In [1]:
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.ml.clustering import LDA, LDAModel
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, Tokenizer, Word2Vec
import pylab as pl

In [2]:
def to_word(termIndices, vocab=None):
  """Map vocabulary indices to their terms.

  Intended to be wrapped as a Spark UDF over ``describeTopics().termIndices``.

  Args:
    termIndices: iterable of integer indices into the vocabulary, or None.
    vocab: optional sequence of terms to index into. When omitted, the
      broadcast variable ``vocab_broadcast`` (created in a later cell) is
      used, which keeps the original one-argument UDF call working while
      allowing the function to be called without a SparkContext.

  Returns:
    List of terms in the same order as ``termIndices``, or None when
    ``termIndices`` is None (a null array maps to a null result instead of
    raising TypeError inside the task).
  """
  if termIndices is None:
    return None
  if vocab is None:
    # Looked up at call time, so the broadcast only has to exist before the UDF runs.
    vocab = vocab_broadcast.value
  return [vocab[termID] for termID in termIndices]

In [3]:
#Load your document dataframe here
#================your code here==================

# Reuse the active SparkSession if there is one, otherwise start it.
spark = SparkSession.builder.appName('CSV_Handler').getOrCreate()

# Read the CSV (first line is a header, column types inferred) and drop rows
# whose 'sentence' is null so the tokenizer below only receives text.
spark_df = (
    spark.read
    .csv('./OutputStreaming.csv', header=True, inferSchema=True)
    .na.drop(subset=['sentence'])
)

#==================================================
spark_df.show(5)

+--------------------+
|            sentence|
+--------------------+
|RT @intuslegens: ...|
|@GlNGERHEAD moi n...|
|RT @moussa_drc: P...|
|amo amo amo que i...|
|@TMZ Why would @A...|
+--------------------+
only showing top 5 rows



In [4]:
#CountVectorizer
#================your code here==================

# Tokenizer lower-cases each sentence and splits it on whitespace (the output
# shows "RT" -> "rt"); punctuation stays attached, so "@user:" is its own term.
# NOTE(review): no stop-word removal is applied, so very common tokens may dominate topics.
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(spark_df)

wordsData.show(5)

# Maximum vocabulary size: only the VOCAB_SIZE most frequent terms across the corpus are kept.
VOCAB_SIZE = 100
cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=VOCAB_SIZE)
cvModel = cv.fit(wordsData)

# cvResult feeds LDA training plus transform / logLikelihood / logPerplexity in the
# following cells; caching it avoids re-reading and re-tokenizing the CSV for each job.
cvResult = cvModel.transform(wordsData).cache()
cvResult.show(5, truncate=False)

#==================================================

+--------------------+--------------------+
|            sentence|               words|
+--------------------+--------------------+
|RT @intuslegens: ...|[rt, @intuslegens...|
|@GlNGERHEAD moi n...|[@glngerhead, moi...|
|RT @moussa_drc: P...|[rt, @moussa_drc:...|
|amo amo amo que i...|[amo, amo, amo, q...|
|@TMZ Why would @A...|[@tmz, why, would...|
+--------------------+--------------------+
only showing top 5 rows

+--------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------+
|sentence                                                                                                                                    |words                                                    

In [5]:
#train LDA model, cluster the documents into 10 topics 
#================your code here==================

# Expectation-Maximization optimizer, 10 topics, 100 iterations, fixed seed.
lda = LDA(k=10, maxIter=100, seed=1, optimizer="em")
ldaModel = lda.fit(cvResult)
# NOTE(review): the fitted model presumably already carries seed=1 from the estimator;
# the explicit pin is kept, and as the cell's last expression it also displays the model.
ldaModel.setSeed(1)

#==================================================

DistributedLDAModel: uid=LDA_57890a30e6b6, k=10, numFeatures=100

In [6]:
# Score every document with the fitted model and keep only its per-topic weight vector.
transformed = (
    ldaModel
    .transform(cvResult)
    .select("topicDistribution")
)
transformed.show(n=5, truncate=False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topicDistribution                                                                                                                                                                                        |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[0.09379907872832405,0.09349974504736815,0.10739761411376647,0.09440674500305143,0.094104708504797,0.11510354577858058,0.09090933904371971,0.12108492766210116,0.09386228511348478,0.09583201100480668]  |
|[0.08937191573578426,0.152754320212833,0.09537803175476514,0.09933178149096641,0.09048311408148614,0.11010332810982244,0.08695681850431071,0.0869588230515238,0.10160144471056043,0.087

In [7]:
# Fit quality: a higher log-likelihood and a lower log-perplexity mean a better fit.
# Both are evaluated on cvResult, the same documents the model was trained on,
# so they measure training fit rather than held-out performance.
ll = ldaModel.logLikelihood(cvResult)
lp = ldaModel.logPerplexity(cvResult)
print(f"ll:  {ll}")
print(f"lp:  {lp}")

ll:  -282444.9106044657
lp:  4.0796872920682015


In [8]:
wordNumbers = 10
vocabArray = cvModel.vocabulary

sc = SparkContext.getOrCreate()

# Top `wordNumbers` term indices (with their weights) for every topic.
topicIndices = ldaModel.describeTopics(maxTermsPerTopic = wordNumbers)
# to_word() reads the vocabulary through this broadcast, so it must exist before the UDF runs.
vocab_broadcast = sc.broadcast(vocabArray)
udf_to_word = udf(to_word, ArrayType(StringType()))

# Named topicWords (not `topics`) because the next cell reuses `topics` for the topic matrix.
topicWords = topicIndices.withColumn("words", udf_to_word(topicIndices.termIndices))
# describeTopics yields one row per topic; show all k of them, not only the first 5.
topicWords.show(ldaModel.getK(), truncate=False)

+-----+----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------+
|topic|termIndices                             |termWeights                                                                                                                                                                                                        |words                                            |
+-----+----------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------+
|0    |[4, 15, 1, 5, 37, 73, 74, 19, 12, 29]   |[0.3489735459621439

In [9]:
# Output the topic-term matrix: vocabSize rows (one per vocabulary term) x k columns (one per topic).
# With optimizer="em" the entries are not normalized (the printed values exceed 1), so each
# column holds a topic's term weights rather than a probability distribution over the vocab.
print("Learned topics (term weights: " + str(ldaModel.vocabSize()) + " vocab words x "
      + str(ldaModel.getK()) + " topics):")
topics = ldaModel.topicsMatrix()
print(topics)

Learned topics (as distributions over vocab of 100 words):
DenseMatrix([[4.19123430e-01, 2.12756105e-01, 3.83722738e-01, 2.34273925e-01,
              3.80921237e-01, 2.60662272e-01, 8.11219306e+03, 2.81637406e-01,
              3.32917700e-01, 8.30092936e+00],
             [8.82509938e+02, 6.82618810e-01, 5.88934396e+02, 7.67100860e+00,
              7.90961388e+02, 3.26656707e+01, 3.43525646e-01, 6.40765669e+00,
              9.56127940e+02, 1.47469586e+03],
             [9.63615339e+00, 1.80745605e+00, 8.39144715e+00, 7.50685217e+01,
              7.85507059e+00, 1.00647579e+03, 2.28004975e-01, 3.03335411e-01,
              2.58602259e+03, 1.15211638e+02],
             [5.68151557e+01, 3.61887904e-01, 1.46037366e+02, 5.30150732e-01,
              1.22618166e+02, 8.99607947e-01, 2.21134423e-01, 2.46037445e+03,
              1.37029247e+01, 1.04391545e+01],
             [2.19766405e+03, 1.40271041e-01, 4.95317878e+01, 1.51574137e-01,
              1.50279216e+02, 1.67817952e-01, 1.302