# Spark MLlib

In [None]:
import findspark

# findspark.init() has to run before the pyspark import below.
findspark.init()

from pyspark.sql import SparkSession

# Local session using every available core; console progress bars are
# switched off to keep notebook output readable.
spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.memory", "4g")
    .config("spark.ui.showConsoleProgress", "false")
    .appName("MLlib")
    .getOrCreate()
)
sc = spark.sparkContext
# sc.setLogLevel("ERROR")
print(spark)
print(sc)

### 数据读取

我们依然使用弹幕数据：

In [None]:
# Longer episode list kept for reference; only its first five ids are used.
# cids = [144541892, 144541943, 160377038, 148952771, 150894103, 153392221, 156629080, 159982308, 162395026]
cids = [
    144541892,
    144541943,
    160377038,
    148952771,
    150894103,
]
# One danmu JSON file per episode id.
jsons = list(map("data/lec14-danmu-{}.json".format, cids))
jsons

In [None]:
# Each file holds a single multi-line JSON document, hence multiLine.
df = spark.read.option("multiLine", True).json(jsons)
df.count()

出于演示目的，我们随机抽取一小部分数据并缓存：

In [None]:
# Draw a 0.1% sample without replacement (fixed seed) and cache it.
df_small = df.sample(withReplacement=False, fraction=0.001, seed=123).cache()
df_small.show(n=15)
df_small.count()

### 数据转换与特征提取

除了 PySpark 自带的变换操作，我们还可以直接编写 Python 函数，并将其包装为 UDF（用户自定义函数）对数据进行变换，如对弹幕进行分词。（此处使用的是逐行执行的普通 `udf`；基于 Pandas 的向量化版本为 `pandas_udf`。）

In [None]:
import jieba
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

def seg_words(danmu):
    """Segment one danmu comment into space-separated words with jieba.

    Args:
        danmu: Raw comment text, or ``None`` when the row has no content.

    Returns:
        The segmented words joined by single spaces. ``None``, empty and
        whitespace-only input all yield ``""`` without calling jieba.
    """
    # A missing (null) content value is passed to the UDF as None; calling a
    # str method on it would raise AttributeError, so map it to "" instead.
    if danmu is None:
        return ""
    # Danmu are often padded with spaces to stand out, which breaks up words.
    # Remove every whitespace character (ASCII space, full-width space, tab,
    # newline, ...) before segmenting.
    danmu = "".join(danmu.split())
    if not danmu:
        return ""
    # lcut returns a list of tokens; cut_all=False selects accurate mode.
    words = jieba.lcut(danmu, cut_all=False)
    # Join the tokens with single spaces so they can be split again later.
    return " ".join(words)

# Wrap the segmenter as a Spark SQL UDF that returns a string column.
seg_words_udf = udf(seg_words, returnType=StringType())

In [None]:
# Add the segmented text as column "seg" and cache the result.
df_seg = df_small.withColumn("seg", seg_words_udf(df_small["content"])).cache()
df_seg.show(n=10)

接下来使用 MLlib 中的 `Tokenizer` 转换器来将词语转为列表：

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Turn the space-separated "seg" string into a list column "words".
tok = Tokenizer().setInputCol("seg").setOutputCol("words")
df_tok = tok.transform(df_seg)
df_tok.select(["seg", "words"]).show(n=10)

移除停用词：

In [None]:
# Function words and punctuation to drop from the token lists.
# NOTE(review): "." is listed twice — possibly a different mark was intended.
stop_words = [
    "的", "了", "是",
    "，", "。", "？", "！", "：",
    "（", "）", "“", "”",
    ".", "…", ".",
]
rmstop = (
    StopWordsRemover()
    .setInputCol("words")
    .setOutputCol("filtered")
    .setStopWords(stop_words)
)
df_rmstop = rmstop.transform(df_tok)
df_rmstop.select(["words", "filtered"]).show(n=10)

进而使用 `CountVectorizer` 来统计词频：

In [None]:
from pyspark.ml.feature import CountVectorizer, IDF

# minDF: minimum number of sentences (danmu) a word must appear in to enter
#        the vocabulary.
# vocabSize: keep only this many of the most frequent words as the vocabulary.
counter = CountVectorizer(
    minDF=10,
    vocabSize=500,
    inputCol="filtered",
    outputCol="features",
)

# Learn the vocabulary from the filtered tokens and display it.
counter_model = counter.fit(df_rmstop)
print(counter_model.vocabulary)

将词频统计器应用到 `DataFrame` 上：

In [None]:
# Map every token list to its term-count vector in column "features".
df_freq = counter_model.transform(df_rmstop)
df_freq.select(["filtered", "features"]).show()

再用 `IDF`（逆文档频率）对词频进行加权缩放：

In [None]:
# Fit IDF weights on the count vectors, then rescale them into
# "scaled_features".
idf = IDF().setInputCol("features").setOutputCol("scaled_features")
idf_model = idf.fit(df_freq)
df_scaled = idf_model.transform(df_freq)
df_scaled.select(["content", "scaled_features"]).show(n=15, truncate=50)

### 模型训练

我们利用得到的特征对弹幕进行 K-means 聚类：

In [None]:
from pyspark.ml.clustering import KMeans

# Cluster the scaled feature vectors into 10 groups. Every parameter,
# including the output column name, is set on the estimator so that the
# fitted model inherits it; setters on the fitted model itself were only
# added in Spark 3.0, so configuring the estimator is the portable form.
kmeans = KMeans(
    k=10,
    seed=123,  # fixed seed so repeated runs give the same clustering
    maxIter=100,
    featuresCol="scaled_features",
    predictionCol="cluster_labels",
)

kmeans_model = kmeans.fit(df_scaled)

# Append each row's cluster index as column "cluster_labels".
df_pred = kmeans_model.transform(df_scaled)

此时 `df_pred` 中包含了最终的聚类结果和若干中间变量：

In [None]:
# Print the column names and types of the prediction DataFrame.
df_pred.printSchema()

查看聚类结果：

In [None]:
# Number of danmu assigned to each cluster.
df_pred.groupBy("cluster_labels").count().show()

最后将聚类结果与原始弹幕和剧集相对照：

In [None]:
# Episode metadata; only the id and the title are needed for the lookup.
video_info = spark.read.option("multiLine", True).json("data/lec14-video-data.json")
video_title = video_info.select(["cid", "title"])
video_title.show()

In [None]:
# NOTE(review): cluster 0 is excluded by this filter — presumably on purpose;
# confirm.
df_res = df_pred.select(["cid", "content", "cluster_labels"]).where("cluster_labels > 0")
# Attach the episode title via cid, then hide the id and list by cluster.
(
    df_res.join(video_title, on="cid", how="inner")
    .drop("cid")
    .sort("cluster_labels")
    .show(n=300)
)