In [35]:
from pyspark import SparkConf, SparkContext,SQLContext  
from pyspark.sql import SparkSession   
from pyspark.ml.feature import Word2Vec,CountVectorizer  
from pyspark.ml.clustering import LDA, LDAModel  
from pyspark.sql.functions import col, udf  
from pyspark.sql.types import IntegerType,ArrayType,StringType  
import pylab as pl  

In [36]:
def to_word(termIndices):
  """Translate a sequence of vocabulary indices into their words.

  Each index is looked up in ``vocab_broadcast.value``; the words are
  returned as a list in the same order as ``termIndices``.

  NOTE(review): ``vocab_broadcast`` is not defined in this part of the
  notebook -- presumably a Spark broadcast of the CountVectorizer
  vocabulary; confirm it is created before this function is called.
  """
  vocabulary = vocab_broadcast.value
  return [vocabulary[term_id] for term_id in termIndices]

In [42]:
from pyspark.sql.functions import split

# Reuse the active Spark session if there is one, otherwise create it.
# NOTE(review): "spark.some.config.option" looks like the placeholder from the
# Spark quick-start example; kept as-is so session settings do not change.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

# Load the CSV and tokenise its first column (`_c0`) on single spaces into an
# array column named `words`. The select keeps only `words`, so no explicit
# drop of `_c0` is needed afterwards.
spark_df = (
    spark.read.options(inferschema='true')
    .csv("gs://6893_bda/hw2/stream_data.csv")
    .select(split(col("_c0"), " ").alias("words"))
)

spark_df.show()

+--------------------+
|               words|
+--------------------+
|[I, absolutely, A...|
|[Java, Vs, Python...|
|[voulu, un, grec,...|
|[Pareil, Il, pris...|
|[Music, Academy, ...|
|[Tarps,, tents,, ...|
|[voulu, un, grec,...|
|[We, drive, effic...|
|[Check, out, my, ...|
|[Hey,, nice, bone...|
|[lembro, como, so...|
|[WHO, WITH, A, DE...|
|[@Tina69911364, @...|
|[alguem, cria, um...|
|[@Neptvn08, Comme...|
|[une, dinguerie, ...|
|[Y, a, une, gross...|
|[Je, te, cache, p...|
|[@JAPANFESS, seta...|
|[Femme, rechercha...|
+--------------------+
only showing top 20 rows



In [43]:
# CountVectorizer: turn each document's `words` array into a sparse
# term-count vector stored in the `features` column.
#
# NOTE(review): vocabSize=3 caps the vocabulary at three terms, and minDF=6
# additionally filters terms by document frequency; the output below shows
# many documents ending up with an empty vector -- confirm this is intended.
cv = CountVectorizer(
    inputCol="words",
    outputCol="features",
    vocabSize=3,
    minDF=6,
)
model = cv.fit(spark_df)
cvResult = model.transform(spark_df)
cvResult.show(truncate=False)


+----------------------------------------------------------------------------------------------------------------------------------+-------------+
|words                                                                                                                             |features     |
+----------------------------------------------------------------------------------------------------------------------------------+-------------+
|[I, absolutely, ADORED, ETERNALS,, but, I, recognize, not, cup, of, Marvel, tea., This, might, not, make, a, whole, lot, of]      |(3,[0],[1.0])|
|[Java, Vs, Python, For, Data, Science, #cybersecurity, #devops, #100DaysOfCode, #ai, #codenewbie, #machinelearning, #DEVCommunity]|(3,[],[])    |
|[voulu, un, grec, puis, suis, dit, non, manger, la, maison, et, jsuis, donc]                                                      |(3,[],[])    |
|[Pareil, Il, pris, de, ma, poche, et, quand, je, lui, prends, des, mains, il, me, dit]                               

In [44]:
# Train an LDA model that clusters the documents into 10 topics.
# The model is fitted on `cvResult` (the CountVectorizer output with the
# `features` column), the same DataFrame the later cells transform and score.
# The previous input name, `result`, is not defined anywhere in the notebook
# and only worked through leftover kernel state.
lda = LDA(k=10, maxIter=100)
ldaModel = lda.fit(cvResult)


In [45]:
# Run the fitted LDA model over the vectorised documents and keep only the
# per-document topic weights.
transformed = (
    ldaModel
    .transform(cvResult)
    .select("topicDistribution")
)
# Display the full (untruncated) topic distribution of each document.
transformed.show(truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|topicDistribution                                                                                                                                                                                             |
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|[0.6005681603306324,0.0386782389613483,0.09000511094083564,0.03867847382487296,0.03867836980715218,0.03867848781002219,0.038678166703342405,0.038678112780249706,0.03867829452459278,0.038678584316951425]    |
|[0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0]                                                                                                                          

In [46]:
# Model-fit metrics on the vectorised corpus: a higher log-likelihood (ll)
# and a lower log-perplexity (lp) both indicate a better model.
ll, lp = (
    ldaModel.logLikelihood(cvResult),
    ldaModel.logPerplexity(cvResult),
)
print("ll: ", ll)
print("lp: ", lp)

ll:  -1188.4912987714956
lp:  1.9612067636493327


In [47]:
# Print the learned topics. Each topic is a distribution over the vocabulary
# (aligned with the word-count vectors); in the output below the matrix has
# one row per vocabulary term and one column per topic.
topics = ldaModel.topicsMatrix()
print(
    "Learned topics (as distributions over vocab of "
    + str(ldaModel.vocabSize())
    + " words):"
)
print(topics)

Learned topics (as distributions over vocab of 3 words):
DenseMatrix([[23.39368276,  0.19560305,  0.18781267,  0.1991493 ,  0.19822299,
               0.20680644,  0.18875968,  0.1853626 ,  0.19743615,  0.20908859],
             [ 0.18039432,  0.17821403, 14.43101239,  0.21552116,  0.18505439,
               0.19326789,  0.19539444,  0.17922252,  0.17432955,  0.20810173],
             [ 0.17801635,  0.18708805, 14.42455494,  0.19929065,  0.2100472 ,
               0.20094358,  0.18850754,  0.19942027,  0.19639779,  0.20825067]])
