In [1]:
import org.apache.spark.ml.feature.{CountVectorizer, IDF}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import java.util.Properties
import java.io.ByteArrayInputStream;

import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}

import spark.implicits._

Intitializing Scala interpreter ...

Spark Web UI available at http://localhost:4041
SparkContext available as 'sc' (version = 2.4.6, master = local[*], app id = local-1606995847684)
SparkSession available as 'spark'


import org.apache.spark.ml.feature.{CountVectorizer, IDF}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import java.util.Properties
import java.io.ByteArrayInputStream
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import spark.implicits._


## 데이터 불러오기

In [2]:
sc

res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@38d6685f


In [3]:
val plainText = sc.textFile("Data/proj/D5.csv")

plainText: org.apache.spark.rdd.RDD[String] = Data/proj/D5.csv MapPartitionsRDD[1] at textFile at <console>:41


In [4]:
plainText.take(2)

res1: Array[String] = Array("1  . A computer-implemented method of decentralized block chain voting, the method comprising:  retrieving, by a computing device, polling data including a plurality of polling options and an option identifier associated with each polling option;  generating a customized cryptographic currency address for each of the plurality of polling options, based on the corresponding option identifier associated with each polling option; and  transferring a specified amount of cryptographic tokens to the customized cryptographic currency address for a selected one of the plurality of polling options,  wherein the transfer is broadcast to a cryptographic currency network for confirmation and inclusion within a block chain ledger of the cryptographic currency network.  2...

In [5]:
val stopWords = scala.io.Source.fromFile("Data/practice6/stopwords.txt").getLines().toSet

stopWords: scala.collection.immutable.Set[String] = Set(down, it's, ourselves, that's, for, method, further, she'll, any, there's, this, haven't, in, ought, myself, have, your, off, once, i'll, are, is, his, why, too, why's, am, than, isn't, didn't, himself, but, you're, below, what, would, methods, i'd, if, you'll, own, they'll, up, we're, they'd, so, our, do, all, him, had, nor, before, it, a, she's, as, hadn't, because, has, she, yours, or, above, yourself, herself, she'd, such, they, each, can't, don't, i, until, that, out, he's, cannot, to, we've, hers, you, did, let's, most, here, these, hasn't, was, there, when's, shan't, doing, at, through, been, over, i've, on, being, same, how, whom, my, after, who, itself, me, them, by, then, couldn't, he, should, few, wasn't, again, while, t...

In [6]:
import edu.stanford.nlp.pipeline._
import edu.stanford.nlp.ling.CoreAnnotations._

def plainTextToLemmas(text: String, stopWords: Set[String], pipeline:StanfordCoreNLP): Seq[String] = {
  val doc = new Annotation(text)
  pipeline.annotate(doc)
  val lemmas = new ArrayBuffer[String]()
  val sentences = doc.get(classOf[SentencesAnnotation])
  for (sentence <- sentences; token <- sentence.get(classOf[TokensAnnotation])) {
    val lemma = token.get(classOf[LemmaAnnotation])
    if (lemma.length > 2 && !stopWords.contains(lemma)) {
      lemmas += lemma.toLowerCase
    }
  }
  lemmas
}

val lemmatized = plainText.mapPartitions(strings => {
  val props = new Properties()
  props.put("annotators", "tokenize, ssplit, pos, lemma")
  val pipeline = new StanfordCoreNLP(props)
  strings.map(string => plainTextToLemmas(string, stopWords, pipeline))
})
//lemmatized.foreach(println)

import edu.stanford.nlp.pipeline._
import edu.stanford.nlp.ling.CoreAnnotations._
plainTextToLemmas: (text: String, stopWords: Set[String], pipeline: edu.stanford.nlp.pipeline.StanfordCoreNLP)Seq[String]
lemmatized: org.apache.spark.rdd.RDD[Seq[String]] = MapPartitionsRDD[2] at mapPartitions at <console>:61


In [7]:
val termsDF = lemmatized.toDF("terms")

termsDF: org.apache.spark.sql.DataFrame = [terms: array<string>]


In [8]:
termsDF.show()

+--------------------+
|               terms|
+--------------------+
|[computer, implem...|
|[cryptographic, v...|
|[system, comprise...|
|[comprise, identi...|
|[block, generatio...|
|[process, vote, p...|
|[system, comprise...|
|[carry, least, fi...|
|[system, comprise...|
|[system, secure, ...|
|[node, position, ...|
|[blockchain, cons...|
|[electronic, voti...|
|[electronic, voti...|
|[electronic, voti...|
|[autonomous, vehi...|
|[system, securely...|
|[comprise, receiv...|
|[cryptographic, v...|
|[system, comprise...|
+--------------------+
only showing top 20 rows



## TF-IDF

In [9]:
val titleDF = spark.read.
    option("inferSchema", true).
    option("header", true).
    csv("Data/proj/T5.csv")

titleDF: org.apache.spark.sql.DataFrame = [title: string]


In [10]:
val t1 = termsDF.withColumn("id", monotonically_increasing_id())
val t2 = titleDF.withColumn("id", monotonically_increasing_id())

t1: org.apache.spark.sql.DataFrame = [terms: array<string>, id: bigint]
t2: org.apache.spark.sql.DataFrame = [title: string, id: bigint]


In [11]:
val patentDF = t2.join(t1, "id").drop("id")

patentDF: org.apache.spark.sql.DataFrame = [title: string, terms: array<string>]


In [12]:
patentDF.show()

+--------------------+--------------------+
|               title|               terms|
+--------------------+--------------------+
|CRYPTOGRAPHIC CUR...|[computer, implem...|
|Blockchain-based ...|[cryptographic, v...|
|OFF-CHAIN BLOCKCH...|[system, comprise...|
|Peer voting on a ...|[comprise, identi...|
|BLOCK GENERATION ...|[block, generatio...|
|METHOD AND SYSTEM...|[process, vote, p...|
|Blockchain-based ...|[system, comprise...|
|Methods and appar...|[carry, least, fi...|
|SYSTEMS AND METHO...|[system, comprise...|
|CRYPTOGRAPHICALLY...|[system, secure, ...|
|APPARATUS AND MET...|[node, position, ...|
|METHOD, APPARATUS...|[blockchain, cons...|
|Electronic voting...|[electronic, voti...|
|Electronic voting...|[electronic, voti...|
|Electronic voting...|[electronic, voti...|
|Secure distribute...|[autonomous, vehi...|
|Blockchain fortif...|[system, securely...|
|SYSTEMS AND METHO...|[comprise, receiv...|
|Blockchain-based ...|[cryptographic, v...|
|BLOCKCHAIN COMPUT...|[system, c

In [13]:
val numTerms = 20000
val countVectorizer = new CountVectorizer().
setInputCol("terms").
setOutputCol("termFreqs").
setVocabSize(numTerms)
val vocabModel = countVectorizer.fit(patentDF)
val docTermFreqs = vocabModel.transform(patentDF)

numTerms: Int = 20000
countVectorizer: org.apache.spark.ml.feature.CountVectorizer = cntVec_1481a0fbf72d
vocabModel: org.apache.spark.ml.feature.CountVectorizerModel = cntVec_1481a0fbf72d
docTermFreqs: org.apache.spark.sql.DataFrame = [title: string, terms: array<string> ... 1 more field]


In [14]:
docTermFreqs.cache()

res4: docTermFreqs.type = [title: string, terms: array<string> ... 1 more field]


In [15]:
val idf = new IDF().
setInputCol("termFreqs").
setOutputCol("tfidfVec")
val idfModel = idf.fit(docTermFreqs)
val docTermMatrix = idfModel.transform(docTermFreqs).select("title", "tfidfVec")

idf: org.apache.spark.ml.feature.IDF = idf_83efa156a7de
idfModel: org.apache.spark.ml.feature.IDFModel = idf_83efa156a7de
docTermMatrix: org.apache.spark.sql.DataFrame = [title: string, tfidfVec: vector]


In [16]:
val termIds: Array[String] = vocabModel.vocabulary

termIds: Array[String] = Array(wherein, claim, one, comprise, first, blockchain, second, node, transaction, datum, system, block, plurality, computer, device, least, receive, key, network, include, configure, identifier, processor, determine, base, vote, request, store, associate, information, value, generate, ledger, use, number, record, apparatus, distribute, cryptographic, readable, encrypt, storage, consensus, user, set, message, correspond, server, access, frame, perform, communication, medium, computing, party, voter, security, instruction, transmit, identify, response, execute, memory, voting, control, function, say, media, entity, accord, character, time, select, private, database, trust, program, public, portion, new, token, non-transitory, bit, cause, address, field, compute, ...

In [17]:
val docIds = docTermFreqs.rdd.map(_.getString(0)).
zipWithUniqueId().
map(_.swap).
collect().toMap

docIds: scala.collection.immutable.Map[Long,String] = Map(138 -> DOCUMENT TRANSFER PROCESSING FOR BLOCKCHAINS, 234 -> CRYPTOASSET CUSTODIAL SYSTEM WITH DIFFERENT CRYPTOGRAPHIC KEYS CONTROLLING ACCESS TO SEPARATE GROUPS OF PRIVATE KEYS, 0 -> CRYPTOGRAPHIC CURRENCY BLOCK CHAIN BASED VOTING SYSTEM, 88 -> ENABLING ACCESS ACROSS PRIVATE NETWORKS FOR A MANAGED BLOCKCHAIN SERVICE, 352 -> Efficient clearinghouse transactions with trusted and un-trusted entities, 170 -> VOTING SYSTEM AND METHOD, 276 -> Dynamic content distribution protocol for an enterprise environment, 308 -> BYZANTINE AGREEMENT IN OPEN NETWORKS, 120 -> Blockchain Timeclock System, 202 -> Method and system for copying live entities of source blocks identified by source list for selected destination block to selected destination...

In [18]:
import org.apache.spark.mllib.linalg.{Vectors, Vector => MLLibVector}
import org.apache.spark.ml.linalg.{Vector => MLVector}

val vecRdd = docTermMatrix.select("tfidfVec").rdd.map { row =>
Vectors.fromML(row.getAs[MLVector]("tfidfVec"))
}

import org.apache.spark.mllib.linalg.{Vectors, Vector=>MLLibVector}
import org.apache.spark.ml.linalg.{Vector=>MLVector}
vecRdd: org.apache.spark.rdd.RDD[org.apache.spark.mllib.linalg.Vector] = MapPartitionsRDD[67] at map at <console>:51


## LSA(latent semantic Analysis)

In [19]:
import org.apache.spark.mllib.linalg.distributed.RowMatrix

vecRdd.cache()
val mat = new RowMatrix(vecRdd)
val k = 500
val svd = mat.computeSVD(k, computeU=true)

import org.apache.spark.mllib.linalg.distributed.RowMatrix
mat: org.apache.spark.mllib.linalg.distributed.RowMatrix = org.apache.spark.mllib.linalg.distributed.RowMatrix@4a51aa22
k: Int = 500
svd: org.apache.spark.mllib.linalg.SingularValueDecomposition[org.apache.spark.mllib.linalg.distributed.RowMatrix,org.apache.spark.mllib.linalg.Matrix] =
SingularValueDecomposition(org.apache.spark.mllib.linalg.distributed.RowMatrix@628e68e9,[1729.5240106965148,1383.0347146843299,1299.6098644632875,1223.6731226286024,1122.4545355555204,1099.2715388184295,1002.2768406200446,964.7291645012558,929.9797759987968,927.2915659974697,897.7475721775903,879.4837146115256,849.5819026996015,838.4496239858136,826.8733146926664,796.6586015193386,791.6847810376802,757.6977146003904,738.0340386036961,730.448481420...

In [20]:
import org.apache.spark.mllib.linalg.{Matrix, SingularValueDecomposition}
import org.apache.spark.mllib.linalg.distributed.RowMatrix

def topTermsInTopConcepts(
svd: SingularValueDecomposition[RowMatrix, Matrix],
numConcepts: Int,
numTerms: Int, termIds: Array[String])
:Seq[Seq[(String, Double)]] = {
    val v = svd.V
    val topTerms = new ArrayBuffer[Seq[(String, Double)]]()
    val arr = v.toArray
    for (i <- 0 until numConcepts) {
        val offs = i * v.numRows
        val termWeights = arr.slice(offs, offs + v.numRows).zipWithIndex
        val sorted = termWeights.sortBy(-_._1)
        topTerms += sorted.take(numTerms).map {
            case (score, id) => (termIds(id), score)
        }
    }
    topTerms
}


import org.apache.spark.mllib.linalg.{Matrix, SingularValueDecomposition}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
topTermsInTopConcepts: (svd: org.apache.spark.mllib.linalg.SingularValueDecomposition[org.apache.spark.mllib.linalg.distributed.RowMatrix,org.apache.spark.mllib.linalg.Matrix], numConcepts: Int, numTerms: Int, termIds: Array[String])Seq[Seq[(String, Double)]]


In [21]:
def topDocsInTopConcepts(
svd: SingularValueDecomposition[RowMatrix, Matrix],
numConcepts: Int, numDocs: Int, docIds: Map[Long, String])
: Seq[Seq[(String, Double)]] = {
    val u = svd.U
    val topDocs = new ArrayBuffer[Seq[(String, Double)]]()
for( i <- 0 until numConcepts) {
    val docWeights = u.rows.map(_.toArray(i)).zipWithUniqueId()
    topDocs += docWeights.top(numDocs).map {
        case (score, id) => (docIds(id), score)
    }
}
topDocs}

topDocsInTopConcepts: (svd: org.apache.spark.mllib.linalg.SingularValueDecomposition[org.apache.spark.mllib.linalg.distributed.RowMatrix,org.apache.spark.mllib.linalg.Matrix], numConcepts: Int, numDocs: Int, docIds: Map[Long,String])Seq[Seq[(String, Double)]]


In [24]:
val topConceptTerms = topTermsInTopConcepts(svd, 6, 5, termIds)
val topConceptDocs = topDocsInTopConcepts(svd, 6, 5, docIds)
for((terms, docs) <- topConceptTerms.zip(topConceptDocs)) {
    println("Concept terms: " + terms.map(_._1).mkString(", "))
    println("Concept docs: " + docs.map(_._1).mkString(". "))
    println()
}

Concept terms: elapsed, movable, driveable, trajectory, simply
Concept docs: Distributive networks of groups of moveable autonomous devices. Method for Smart Contract Data Input through a Proof-of-Work Consensus Mechanism. Massively Scalable Blockchain Ledger. Systems and Methods for Dynamic Cypher Key Management. Flexible Blockchain Smart-Contract Deployment

Concept terms: character, directory, name, decode, recite
Concept docs: Exclusive encryption. Exclusive encryption. Exclusive encryption. Exclusive encryption. Method for Smart Contract Data Input through a Proof-of-Work Consensus Mechanism

Concept terms: frame, field, control, party, crosslink
Concept docs: Methods and apparatus for communicating high efficiency control information. Communication apparatus, communication system, and communication control program. Communication apparatus, communication system and communication control program. Efficient clearinghouse transactions with trusted and un-trusted entities. Interlocked

topConceptTerms: Seq[Seq[(String, Double)]] = ArrayBuffer(ArraySeq((elapsed,-7.35774369583185E-7), (movable,-7.357743695857871E-7), (driveable,-7.357743695883892E-7), (trajectory,-7.357743695974965E-7), (simply,-1.147520937130192E-6)), ArraySeq((character,0.04621677006392272), (directory,0.033459889038937346), (name,0.027513886217600397), (decode,0.02596357621401845), (recite,0.021620478949759908)), ArraySeq((frame,0.4557077012816694), (field,0.44240159831639514), (control,0.18860630226458705), (party,0.1759912845645021), (crosslink,0.1548386631537958)), ArraySeq((field,0.3369040896776928), (frame,0.31608290057553606), (control,0.09837342321626272), (grab,0.08798058875313144), (physical,0.07218577996770213)), ArraySeq((hsm,0.4921007526197303), (security,0.46920133841553135), (api,0.2924...