In [1]:
from pyspark.sql import SparkSession, Row
from pyspark import SparkConf, SparkContext

# Configuration shared by both Spark entry points: run locally on every
# available core under the application name "week8".
conf = SparkConf().setMaster("local[*]").setAppName("week8")

# Entry point for the RDD API.  getOrCreate() hands back the context that is
# already active in this kernel (or creates one), so re-running this cell no
# longer fails with "Cannot run multiple SparkContexts at once".
sc = SparkContext.getOrCreate(conf=conf)

# Entry point for the DataFrame API.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [18]:
# Labelled training corpus: every entry is [document text, class label].
trainingData = [
    ["Chinese Beijing Chinese", "c"],
    ["Chinese Chinese Nanjing", "c"],
    ["Chinese Macao", "c"],
    ["Australia Sydney Chinese", "o"],
]

# Unlabelled document(s) to be classified later on.
testData = ["Chinese Chinese Chinese Australia Sydney"]

type(trainingData)

list

In [19]:
# Distribute both Python lists as RDDs (training list first, then test list).
trainRDD, testRDD = sc.parallelize(trainingData), sc.parallelize(testData)
trainRDD

ParallelCollectionRDD[66] at parallelize at PythonRDD.scala:195

In [20]:
# Convert every record into a Row so that a DataFrame can be built from it.
#
# The Row RDDs are derived from the source lists instead of re-mapping
# trainRDD/testRDD onto themselves.  The self-referential form was not
# idempotent: running the cell a second time applied the lambdas to Row
# objects (indexing Row fields / nesting a Row inside a Row) and silently
# corrupted the data used by every later cell.
trainRDD = sc.parallelize(trainingData).map(
    lambda record: Row(descript=record[0], category=record[1])
)
testRDD = sc.parallelize(testData).map(lambda text: Row(descript=text))
trainRDD.collect()

[Row(category='c', descript='Chinese Beijing Chinese'),
 Row(category='c', descript='Chinese Chinese Nanjing'),
 Row(category='c', descript='Chinese Macao'),
 Row(category='o', descript='Australia Sydney Chinese')]

In [21]:
# Turn the Row RDDs into DataFrames (training set first, then test set).
trainDF, testDF = (spark.createDataFrame(rows) for rows in (trainRDD, testRDD))
trainDF.show(truncate=False)

+--------+------------------------+
|category|descript                |
+--------+------------------------+
|c       |Chinese Beijing Chinese |
|c       |Chinese Chinese Nanjing |
|c       |Chinese Macao           |
|o       |Australia Sydney Chinese|
+--------+------------------------+



In [22]:
# Peek at the raw text column only.
trainDF.select(trainDF["descript"]).show()

+--------------------+
|            descript|
+--------------------+
|Chinese Beijing C...|
|Chinese Chinese N...|
|       Chinese Macao|
|Australia Sydney ...|
+--------------------+



In [23]:
# Number of training documents per class label.
trainDF.groupBy("category").count().show()

+--------+-----+
|category|count|
+--------+-----+
|       o|    1|
|       c|    3|
+--------+-----+



In [25]:
## Tokenizer
from pyspark.ml.feature import Tokenizer
from pyspark.sql.functions import col

# Transformer that reads the `descript` text column and writes the token
# list to a new `words` column.
tokenizer = Tokenizer().setInputCol("descript").setOutputCol("words")

# Tokenize the training documents.
tokenizedDF = tokenizer.transform(dataset=trainDF)
tokenizedDF.show(truncate=False)

+--------+------------------------+----------------------------+
|category|descript                |words                       |
+--------+------------------------+----------------------------+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |
|c       |Chinese Macao           |[chinese, macao]            |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|
+--------+------------------------+----------------------------+



In [26]:
# Run the same tokenizer over the unlabelled test documents.
testTokenizedDF = tokenizer.transform(dataset=testDF)
testTokenizedDF.show(truncate=False)

+----------------------------------------+----------------------------------------------+
|descript                                |words                                         |
+----------------------------------------+----------------------------------------------+
|Chinese Chinese Chinese Australia Sydney|[chinese, chinese, chinese, australia, sydney]|
+----------------------------------------+----------------------------------------------+



In [28]:
# CountVectorizer
from pyspark.ml.feature import CountVectorizer

# Estimator that reads the `words` column and writes term-count vectors to
# a new `features` column.
cv = CountVectorizer(
    inputCol="words",
    outputCol="features",
)

# Fitting the estimator on the training tokens yields the model, which is
# then applied to those same rows.
cvModel = cv.fit(dataset=tokenizedDF)
featuredDF = cvModel.transform(dataset=tokenizedDF)
featuredDF.show(truncate=False)



+--------+------------------------+----------------------------+-------------------------+
|category|descript                |words                       |features                 |
+--------+------------------------+----------------------------+-------------------------+
|c       |Chinese Beijing Chinese |[chinese, beijing, chinese] |(6,[0,3],[2.0,1.0])      |
|c       |Chinese Chinese Nanjing |[chinese, chinese, nanjing] |(6,[0,2],[2.0,1.0])      |
|c       |Chinese Macao           |[chinese, macao]            |(6,[0,1],[1.0,1.0])      |
|o       |Australia Sydney Chinese|[australia, sydney, chinese]|(6,[0,4,5],[1.0,1.0,1.0])|
+--------+------------------------+----------------------------+-------------------------+



In [30]:
# Vectorise the test tokens with the model fitted on the training set.
testFeaturedDF = cvModel.transform(dataset=testTokenizedDF)
testFeaturedDF.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+
|            descript|               words|            features|
+--------------------+--------------------+--------------------+
|Chinese Chinese C...|[chinese, chinese...|(6,[0,4,5],[3.0,1...|
+--------------------+--------------------+--------------------+



In [31]:
# StringIndexer
from pyspark.ml.feature import StringIndexer

# Estimator that reads the string `category` column and writes a numeric
# `label` column.
indexer = StringIndexer(inputCol="category", outputCol="label")

# Fit on the training rows, then append the `label` column to them.
indexerModel = indexer.fit(featuredDF)
indexedDF = indexerModel.transform(featuredDF)

indexedDF.show(n=20, truncate=True)

+--------+--------------------+--------------------+--------------------+-----+
|category|            descript|               words|            features|label|
+--------+--------------------+--------------------+--------------------+-----+
|       c|Chinese Beijing C...|[chinese, beijing...| (6,[0,3],[2.0,1.0])|  0.0|
|       c|Chinese Chinese N...|[chinese, chinese...| (6,[0,2],[2.0,1.0])|  0.0|
|       c|       Chinese Macao|    [chinese, macao]| (6,[0,1],[1.0,1.0])|  0.0|
|       o|Australia Sydney ...|[australia, sydne...|(6,[0,4,5],[1.0,1...|  1.0|
+--------+--------------------+--------------------+--------------------+-----+



In [38]:
# Naive Bayes (using the library implementation)
from pyspark.ml.classification import NaiveBayes

nb = NaiveBayes(
    featuresCol='features',
    labelCol='label',
    predictionCol='nb_prediction',
    smoothing=1.0,
    modelType='multinomial',
)
nb_model = nb.fit(indexedDF)

# Prediction stored in `nb_prediction` for the first test row.
testPredictionDF = nb_model.transform(testFeaturedDF)
testPredictionDF.head()["nb_prediction"]

0.0

In [40]:
# Using a Pipeline: display the raw training DataFrame it will be fitted on.
trainDF.show(n=20, truncate=True)

+--------+--------------------+
|category|            descript|
+--------+--------------------+
|       c|Chinese Beijing C...|
|       c|Chinese Chinese N...|
|       c|       Chinese Macao|
|       o|Australia Sydney ...|
+--------+--------------------+



In [41]:
# Display the raw test DataFrame.
testDF.show(n=20, truncate=True)

+--------------------+
|            descript|
+--------------------+
|Chinese Chinese C...|
+--------------------+



In [42]:
from pyspark.ml import Pipeline

# Chain tokenizer -> count vectorizer -> label indexer -> Naive Bayes so that
# a single fit() call on the raw training DataFrame trains every stage.
nb_pipeLine = Pipeline().setStages([tokenizer, cv, indexer, nb])

pipeModel = nb_pipeLine.fit(trainDF)

# Run the fitted pipeline on the raw test DataFrame.
resDF = pipeModel.transform(testDF)

# Prediction stored in `nb_prediction` for the first test row.
resDF.head()["nb_prediction"]

0.0