## pyspark LDA

In [None]:
import jieba
import time
import pyLDAvis
import pandas as pd
import numpy as np
import findspark
findspark.init('/usr/local/spark')
# 載入必要module
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
from pyspark.sql import types
from pyspark.ml.clustering import LDA, LDAModel
from pyspark.ml.feature import CountVectorizer
# Open the Spark session.
# The PySpark session currently needs the parameters it uses to be set by hand:
#   spark.executor.memory       worker memory used for computation / caching
#   spark.driver.memory         driver memory; it affects task dispatching and has a
#                               large impact on overall speed
#   spark.driver.maxResultSize  upper bound on distributed data collected back to the
#                               driver; the SparkR setup removed this limit in its config
#                               file, so it has to be set explicitly here
spark_settings = [
    ("spark.cores.max", "4"),
    ("spark.executor.memory", "12g"),
    ("spark.driver.memory", "24g"),
    ("spark.driver.maxResultSize", "16g"),
]
session_builder = SparkSession.builder.master("spark://192.168.1.52:7077").appName("talk.tw LDA")
for conf_key, conf_value in spark_settings:
    session_builder = session_builder.config(conf_key, conf_value)
ss = session_builder.getOrCreate()
# Load the talk.tw dump (tab-separated .tsv.gz on HDFS), name its columns,
# drop rows with no url or no text, and spread the data over 4 partitions.
raw_talk_tw = ss.read.csv(
    path="hdfs://192.168.1.53:9000/corpus/ATS/talk.tw/20180905.tsv.gz",
    sep="\t",
)
talk_tw = (
    raw_talk_tw
    .toDF('category', 'url', 'author', 'tag', 'createTime', 'title', 'text')
    .where("url IS NOT NULL AND text IS NOT NULL")
    .repartition(4)
)
# Load the stop-word list (one word per line) into a set for O(1) lookups.
stopWord_set = set()
stopWords_path = '/home/stat_jerry/stop_word_test1800.txt'
# 'utf-8-sig' drops a leading BOM that would otherwise stick to the first word.
# strip() removes '\r' from CRLF files as well as '\n'. With only strip('\n'),
# such entries never equal a jieba token. Blank lines are skipped.
with open(stopWords_path, 'r', encoding='utf-8-sig') as stopwords:
    stopWord_set.update(line.strip() for line in stopwords if line.strip())
# UDF that tokenizes text with jieba and filters the tokens; output is array<string>.
def cut_stopWord(w):
    """Tokenize *w* with jieba and keep only the useful tokens.

    A token is kept when it is longer than one character, is not made up
    solely of digits (``str.isdigit``), and is not in ``stopWord_set``.
    Returns the kept tokens as a list of str, in jieba's output order.
    """
    return [
        token
        for token in jieba.cut(w)
        if len(token) > 1 and not token.isdigit() and token not in stopWord_set
    ]
cut_udf = f.udf(cut_stopWord, returnType=types.ArrayType(types.StringType()))
# Tokenize the text column, keeping the metadata columns alongside the tokens.
talk_tw_cut = talk_tw.select( 'category', 'url', 'author', 'tag', 'title', cut_udf('text').alias('words'), 'createTime' )
# Drop documents whose token list ended up empty (words[0] is NULL for an empty
# array under Spark's default non-ANSI SQL settings).
# cache(): the jieba UDF is the most expensive step, and this DataFrame is re-read
# by CountVectorizer.fit, its transform, LDA training and every later collect.
# Without caching, each of those actions re-runs the tokenization over the corpus.
talk_tw_cut = talk_tw_cut.where("words[0] IS NOT NULL").cache()

### LDA 模型訓練

In [None]:
# Turn each token list into a term-count vector (vocabulary capped at 2**20 terms).
cv = CountVectorizer(inputCol='words', outputCol='features', vocabSize=2**20)
cv_model = cv.fit(talk_tw_cut)
result = cv_model.transform(talk_tw_cut)

# Train a 15-topic LDA model with the EM optimizer, then score every document.
# NOTE: Spark ML documents learningOffset and learningDecay as online-optimizer
# parameters. They are kept for reference but have no effect with optimizer="em".
lda_params = dict(
    featuresCol="features",
    k=15,
    maxIter=100,
    optimizer="em",
    learningOffset=1024,
    learningDecay=0.7,
    seed=110,
)
lda = LDA(**lda_params)
lda_model = lda.fit(result)
lda_result = lda_model.transform(result)

# Persist both fitted models to HDFS, replacing any previous copies.
for fitted_model, model_path in (
    (lda_model, "hdfs://192.168.1.53:9000/corpus/ml/LDA_model_20180905"),
    (cv_model, "hdfs://192.168.1.53:9000/corpus/ml/CountVectorizer_model_20180905"),
):
    fitted_model.write().overwrite().save(model_path)

### pyLDAvis 視覺化與結果輸出

In [None]:
# Build the inputs that pyLDAvis.prepare expects.

# topic_term_dists: k x V, each topic's term weights normalized to sum to 1.
# topicsMatrix() is V x k. For the EM optimizer's distributed model it gathers the
# whole matrix on the driver, so fetch it once instead of twice.
topic_term = lda_model.topicsMatrix().toArray().T
ttd = (topic_term / topic_term.sum(axis=1)[:, None]).tolist()

# doc_topic_dists and doc_lengths, collected in a single pass over lda_result so
# entry i of both lists always describes the same document.
# pyLDAvis wants a document's length in tokens, i.e. the sum of its term counts.
# numNonzeros() would only count distinct terms.
count_udf = f.udf( lambda vec: int(vec.values.sum()), types.IntegerType() )
doc_rows = lda_result.select('topicDistribution', count_udf('features').alias('doc_lengths')).collect()
dtd = [row['topicDistribution'].toArray().tolist() for row in doc_rows]
dl = [row['doc_lengths'] for row in doc_rows]

# vocab
v = cv_model.vocabulary

# term_frequency: corpus-wide count of every vocabulary term.
# CountVectorizer emits sparse vectors. Each partition adds its documents' non-zero
# entries into one dense array, and the partition arrays are then summed. This
# avoids densifying every document to vocabSize and summing Python lists element
# by element.
vocab_size = len(v)

def _partition_term_counts(rows):
    """Yield the summed term counts of one partition as a dense numpy array."""
    totals = np.zeros(vocab_size)
    for row in rows:
        counts = row['features']
        # indices are unique within a sparse vector, so fancy-index += is safe
        totals[counts.indices] += counts.values
    yield totals

tf = result.select('features').rdd \
    .mapPartitions(_partition_term_counts) \
    .treeReduce(lambda a, b: a + b) \
    .tolist()

# Render with pyLDAvis and save the HTML page.
ldavis = pyLDAvis.prepare( topic_term_dists=ttd, doc_topic_dists=dtd, doc_lengths=dl, vocab=v, term_frequency=tf,  sort_topics=False )
pyLDAvis.save_html(ldavis, '/home/stat_jerry/talk_LDAvis_0905.html')

# Bring every document's topic distribution back to pandas and find its dominant topic.
doc_topic_pd = lda_result.select('url','title','topicDistribution').toPandas()
# Build the documents x topics matrix in one call. Concatenating one frame per
# document in a loop is quadratic in the number of documents. The topic count is
# read from the data rather than hard-coded, so it follows the LDA k.
topic_matrix = np.array([dist.toArray() for dist in doc_topic_pd['topicDistribution']])
n_topics = topic_matrix.shape[1]
topic_df = pd.DataFrame(topic_matrix, columns=["topic_" + str(i) for i in range(1, n_topics + 1)])
dominant_topic = np.argmax(topic_matrix, axis=1)
# 1-based topic ids, matching the topic_1..topic_k column names.
doc_topic_pd['dominant_topic'] = (dominant_topic + 1)
doc_topic = pd.concat( [doc_topic_pd[['url', 'dominant_topic']], topic_df], axis=1)
# Write url, dominant_topic and topic_1..topic_k to HDFS as one gzip CSV part file.
ss.createDataFrame(doc_topic)\
  .repartition(1)\
  .write.csv( path = "hdfs://192.168.1.53:9000/ATS/Preprocess_Data/2018M08/docs_topic_table/talk_tw.csv",
              mode = "overwrite",
              compression = "gzip" )
# Export the top-30 terms of every topic to a local CSV for later use.
# The file has no header and one term per line; topics appear in order, with up to
# 30 terms each.
topics = lda_model.describeTopics(30)
# describeTopics yields only k small rows, so collect them and map term indices to
# words on the driver. Mapping inside an RDD lambda would pickle the whole vocabulary
# `v` into every executor task.
top_terms = [v[idx] for row in topics.collect() for idx in row['termIndices']]
pd.DataFrame(top_terms).to_csv("LDA_kw_top30_0906.csv", header = None, index = None)
ss.stop()

# # How to load the saved models back (paths match the save() calls in the training cell)
# from pyspark.ml.clustering import DistributedLDAModel
# from pyspark.ml.feature import CountVectorizerModel
# lda_model_2 = DistributedLDAModel.load( "hdfs://192.168.1.53:9000/corpus/ml/LDA_model_20180905" )
# cv_model_2 = CountVectorizerModel.load("hdfs://192.168.1.53:9000/corpus/ml/CountVectorizer_model_20180905")