In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("yarn")\
.config('spark.executor.cores', '2')\
.config('spark.executor.memory', '7G')\
.config('spark.driver.memory','4G')\
.config('spark.executor.instances','8')\
.appName('bdse62')\
.getOrCreate()

In [2]:
abstract=spark.read.csv('hdfs:///bdse71/ABS')

# Feedforward Neural Network

In [3]:
testDF = spark.read.parquet('hdfs:///data/jso_hierarchy.parquet')

In [4]:
testDF.show()

+------------------+--------+-------+-------+
|Application_Number|mono_ipc|tri_ipc|qua_ipc|
+------------------+--------+-------+-------+
| PCT/CA2018/000243|       B|    A61|   B25J|
| PCT/CA2019/050200|       G|    G02|   G02C|
| PCT/CL2016/050019|       G|    G06|   G06Q|
| PCT/CN2012/070253|       A|    A43|   A43B|
| PCT/DE2014/100160|       F|    F41|   F41H|
| PCT/DE2016/100332|       C|    C10|   C10B|
| PCT/DK2005/000435|       A|    A61|   A61K|
| PCT/EP1999/004038|       E|    E21|   E21B|
| PCT/EP1999/008598|       A|    A61|   A61K|
| PCT/EP2000/004724|       A|    A23|   A23G|
| PCT/EP2000/011562|       C|    C11|   C11D|
| PCT/EP2001/002279|       C|    A01|   A01N|
| PCT/EP2001/003614|       B|    F16|   F16B|
| PCT/EP2001/003789|       A|    A22|   A22B|
| PCT/EP2001/010090|       C|    C12|   C12N|
| PCT/EP2001/013071|       C|    C07|   C08F|
| PCT/EP2001/014943|       B|    B60|   B60T|
| PCT/EP2003/007473|       C|    A01|   A01N|
| PCT/EP2003/011110|       A|    A

## TF-IDF計算

In [5]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, RegexTokenizer
from pyspark.ml.feature import StopWordsRemover

In [6]:
abstract=abstract.dropna()
abstract.show()

+-----------------+--------------------+
|              _c0|                 _c1|
+-----------------+--------------------+
|PCT/IB2015/050868|For the productio...|
|PCT/EP2013/003311|A composition com...|
|PCT/AU2010/000148|An agricultural a...|
|PCT/CN2013/072341|A detection metho...|
|PCT/EP2013/058979|The invention rel...|
|PCT/EP2016/063487|The present inven...|
|PCT/GB2009/001774|A compound having...|
|PCT/EP2013/077705|A compound with t...|
|PCT/EP2009/006204|The invention rel...|
|PCT/KR2017/010450|The present inven...|
|PCT/EP2010/000270|The present inven...|
|PCT/EP2013/077695|A compound of for...|
|PCT/KR2012/004595|The present funct...|
|PCT/JP2015/004803|A zoom lens accor...|
|PCT/AT2017/000070|The invention rel...|
|PCT/KR2015/006148|The present inven...|
|PCT/IB2014/001029|A conjugate of fo...|
|PCT/GB2012/052526|A method for prom...|
|PCT/EP2001/009832|Liquid crystal mi...|
|PCT/KR2000/000814|The present inven...|
+-----------------+--------------------+
only showing top

### Join two tables

In [7]:
joinExpression = testDF['Application_Number']==abstract["_c0"]
joinType = "inner"

In [8]:
joinedDF=testDF.join(abstract,joinExpression,joinType).select('Application_Number','mono_ipc','_c1')
joinedDF.show()

+------------------+--------+--------------------+
|Application_Number|mono_ipc|                 _c1|
+------------------+--------+--------------------+
| PCT/EP2010/000270|       A|The present inven...|
| PCT/EP2013/077695|       C|A compound of for...|
| PCT/EP2001/009832|       C|Liquid crystal mi...|
| PCT/KR2000/000814|       F|The present inven...|
| PCT/BR2010/000175|       C|The organic matte...|
| PCT/IL2000/000667|       H|The present inven...|
| PCT/KR2009/007459|       A|The present inven...|
| PCT/DK2001/000750|       C|The invention rel...|
| PCT/EP2008/009031|       A|The invention rel...|
| PCT/EP2001/012972|       B|The invention rel...|
| PCT/EP2007/000257|       B|The invention rel...|
| PCT/EP2003/014296|       C|The present inven...|
| PCT/GB2003/005261|       C|Fungicidal compou...|
| PCT/EP2013/066528|       B|The present inven...|
| PCT/RU2001/000190|       G|The inventive met...|
| PCT/IL2000/000459|       C|The present inven...|
| PCT/KR2006/002440|       B|A 

### stack the model

In [9]:
tokenizer = RegexTokenizer(inputCol="_c1", outputCol="words",pattern="\\W")
remover = StopWordsRemover(inputCol="words", outputCol="removededWords") # stopWords=yourownstopwords
vectorizer = CountVectorizer(inputCol="removededWords", outputCol="rawFeatures")
#idf = IDF(inputCol="rawFeatures", outputCol="features") # minDocFreq=2, TF小於2的就忽略
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=2)
pipeline = Pipeline(stages=[tokenizer,remover ,vectorizer, idf])
model = pipeline.fit(joinedDF)

In [13]:
import numpy as np
for i in ['A','B','C','D','E','F','G','H']:
    total_counts = model.transform(joinedDF.filter(joinedDF.mono_ipc==i)).select('rawFeatures').rdd.map(lambda row: row['rawFeatures'].toArray()).reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])
    vocabList = model.stages[2].vocabulary
    d = {'vocabList':vocabList,'counts':total_counts}
    spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).limit(200).sort('counts',ascending=False).write.csv(f'hdfs:///data/wob_{i}',mode='overwrite')

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1212 tasks (1025.5 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB)
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2008)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2007)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2007)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:973)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2239)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2188)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2177)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:775)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2164)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1004)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1003)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:168)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:834)


In [11]:
import numpy as np
total_counts = model.transform(joinedDF.filter(joinedDF.mono_ipc=='A')).select('rawFeatures').rdd.map(lambda row: row['rawFeatures'].toArray()).reduce(lambda x,y: [x[i]+y[i] for i in range(len(y))])
vocabList = model.stages[2].vocabulary
d = {'vocabList':vocabList,'counts':total_counts}
spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).show()

+----------+------+
| vocabList|counts|
+----------+------+
| invention|3494.0|
|      said|2348.0|
|       one|2238.0|
|     least|1949.0|
|   relates|1886.0|
|    method|1742.0|
|comprising|1563.0|
|    device|1280.0|
|     means|1267.0|
| comprises|1026.0|
|     first|1022.0|
|   surface| 961.0|
|    system| 955.0|
|       use| 946.0|
|      also| 942.0|
|  material| 910.0|
|    second| 910.0|
|  provided| 893.0|
|   present| 833.0|
|   wherein| 811.0|
+----------+------+
only showing top 20 rows



In [36]:
spark.createDataFrame(np.array(list(d.values())).T.tolist(),list(d.keys())).write.csv('hdfs:///data/words_Of_Bad_Jan12.csv',header=True,mode='overwrite')

In [12]:
tfidf=model.transform(joinedDF)

In [13]:
tfidf.createOrReplaceTempView("table1")
spark.sql('select count(*) as num_of_sample from table1').show()

+-------------+
|num_of_sample|
+-------------+
|         4091|
+-------------+



In [14]:
tfidf.columns

['Application_Number',
 'mono_ipc',
 '_c1',
 'words',
 'removededWords',
 'rawFeatures',
 'features']

In [15]:
tfidfForTrain=spark.sql('select mono_ipc,rawFeatures, features from table1')

In [16]:
tfidfForTrain.show()

+--------+--------------------+--------------------+
|mono_ipc|         rawFeatures|            features|
+--------+--------------------+--------------------+
|       B|(17518,[0,3,4,6,7...|(17518,[0,3,4,6,7...|
|       A|(17518,[6,29,95,1...|(17518,[6,29,95,1...|
|       A|(17518,[0,3,4,5,1...|(17518,[0,3,4,5,1...|
|       G|(17518,[0,4,5,14,...|(17518,[0,4,5,14,...|
|       G|(17518,[0,1,7,15,...|(17518,[0,1,7,15,...|
|       H|(17518,[0,3,4,5,8...|(17518,[0,3,4,5,8...|
|       C|(17518,[0,1,4,5,2...|(17518,[0,1,4,5,2...|
|       C|(17518,[5,6,10,16...|(17518,[5,6,10,16...|
|       F|(17518,[1,17,71,8...|(17518,[1,17,71,8...|
|       C|(17518,[0,2,4,19,...|(17518,[0,2,4,19,...|
|       H|(17518,[0,2,3,4,5...|(17518,[0,2,3,4,5...|
|       C|(17518,[0,2,3,4,5...|(17518,[0,2,3,4,5...|
|       C|(17518,[0,4,5,15,...|(17518,[0,4,5,15,...|
|       C|(17518,[0,2,3,4,6...|(17518,[0,2,3,4,6...|
|       A|(17518,[0,2,3,4,6...|(17518,[0,2,3,4,6...|
|       C|(17518,[0,4,14,18...|(17518,[0,4,14,

In [17]:
from pyspark.ml.linalg import Vectors # !!!!caution: not from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [18]:
labelIndexer = StringIndexer(inputCol='mono_ipc',outputCol='indexedLabel').fit(tfidfForTrain)
labelIndexer.transform(tfidfForTrain).show(5, True)
stage1=labelIndexer.transform(tfidfForTrain)

+--------+--------------------+--------------------+------------+
|mono_ipc|         rawFeatures|            features|indexedLabel|
+--------+--------------------+--------------------+------------+
|       B|(17518,[0,3,4,6,7...|(17518,[0,3,4,6,7...|         2.0|
|       A|(17518,[6,29,95,1...|(17518,[6,29,95,1...|         1.0|
|       A|(17518,[0,3,4,5,1...|(17518,[0,3,4,5,1...|         1.0|
|       G|(17518,[0,4,5,14,...|(17518,[0,4,5,14,...|         3.0|
|       G|(17518,[0,1,7,15,...|(17518,[0,1,7,15,...|         3.0|
+--------+--------------------+--------------------+------------+
only showing top 5 rows



In [19]:
(trainingData, testData) = stage1.randomSplit([0.6, 0.4])
#trainingData.show(2,truncate=False)
#testData.show(2,truncate=False)

In [28]:
from pyspark.ml.classification import MultilayerPerceptronClassifier
layers = [17518, 5, 4, 3, 8]
FNN = MultilayerPerceptronClassifier(featuresCol="features" ,labelCol="indexedLabel", maxIter=300, layers=layers)
FNNModel = FNN.fit(trainingData).transform(testData)

In [29]:
FNNModel.show()

+--------+--------------------+--------------------+------------+--------------------+--------------------+----------+
|mono_ipc|         rawFeatures|            features|indexedLabel|       rawPrediction|         probability|prediction|
+--------+--------------------+--------------------+------------+--------------------+--------------------+----------+
|       C|(17518,[0,2,3,4,6...|(17518,[0,2,3,4,6...|         0.0|[7.86917423544461...|[0.42011571546086...|       1.0|
|       C|(17518,[0,2,4,19,...|(17518,[0,2,4,19,...|         0.0|[5.67270138747262...|[0.96143216333270...|       0.0|
|       C|(17518,[0,4,14,18...|(17518,[0,4,14,18...|         0.0|[8.27611533230929...|[0.70972009652484...|       0.0|
|       C|(17518,[5,6,10,16...|(17518,[5,6,10,16...|         0.0|[0.45946124204758...|[0.15760435821545...|       2.0|
|       F|(17518,[1,17,71,8...|(17518,[1,17,71,8...|         5.0|[-3.6533299451317...|[7.18238339490464...|       3.0|
|       G|(17518,[0,4,5,14,...|(17518,[0,4,5,14,

In [33]:
prob_FNN=FNNModel.select('probability').collect();prob_FNN[:10]

[Row(probability=DenseVector([0.4201, 0.577, 0.0006, 0.0, 0.0, 0.0017, 0.0, 0.0006])),
 Row(probability=DenseVector([0.9614, 0.0034, 0.0132, 0.0003, 0.0014, 0.0177, 0.0, 0.0025])),
 Row(probability=DenseVector([0.7097, 0.287, 0.0008, 0.0, 0.0, 0.0019, 0.0, 0.0005])),
 Row(probability=DenseVector([0.1576, 0.0671, 0.2503, 0.219, 0.0869, 0.1011, 0.0186, 0.0994])),
 Row(probability=DenseVector([0.0007, 0.0124, 0.028, 0.7732, 0.0799, 0.0136, 0.0558, 0.0364])),
 Row(probability=DenseVector([0.0009, 0.0132, 0.0322, 0.7633, 0.0792, 0.0143, 0.0586, 0.0384])),
 Row(probability=DenseVector([0.2849, 0.7071, 0.0037, 0.0, 0.0, 0.0023, 0.0, 0.002])),
 Row(probability=DenseVector([0.002, 0.9881, 0.006, 0.0002, 0.0001, 0.0003, 0.0, 0.0034])),
 Row(probability=DenseVector([0.9989, 0.0001, 0.0007, 0.0, 0.0, 0.0004, 0.0, 0.0])),
 Row(probability=DenseVector([0.0007, 0.0124, 0.0286, 0.7724, 0.0791, 0.0135, 0.0567, 0.0365]))]

In [30]:
labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel",labels=labelIndexer.labels)
labelConverter.transform(FNNModel).select('features','mono_ipc','predictedLabel').show()

+--------------------+--------+--------------+
|            features|mono_ipc|predictedLabel|
+--------------------+--------+--------------+
|(17518,[0,2,3,4,6...|       C|             A|
|(17518,[0,2,4,19,...|       C|             C|
|(17518,[0,4,14,18...|       C|             C|
|(17518,[5,6,10,16...|       C|             B|
|(17518,[1,17,71,8...|       F|             G|
|(17518,[0,4,5,14,...|       G|             G|
|(17518,[0,2,9,11,...|       A|             A|
|(17518,[0,4,9,25,...|       A|             A|
|(17518,[0,30,37,3...|       A|             C|
|(17518,[2,5,10,14...|       A|             G|
|(17518,[0,4,9,25,...|       B|             A|
|(17518,[0,1,2,4,9...|       C|             B|
|(17518,[0,1,4,13,...|       C|             C|
|(17518,[0,1,4,19,...|       C|             C|
|(17518,[0,4,14,18...|       C|             B|
|(17518,[0,4,12,13...|       H|             H|
|(17518,[2,5,13,14...|       H|             H|
|(17518,[0,1,4,5,1...|       C|             B|
|(17518,[0,2,

In [31]:
labelConverter.transform(FNNModel).select('indexedLabel','prediction').createOrReplaceTempView('table2')
predictionAndTarget = spark.sql('select indexedLabel as label, prediction from table2')

evaluatorMulti = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction")
acc = evaluatorMulti.evaluate(predictionAndTarget)

In [32]:
print(acc)

0.39263883094815843
