In [None]:
!pip install -r 'requirements.txt' 

In [None]:
import os
import re as re

import matplotlib.pyplot as plt
import nltk
import numpy as np
import pandas as pd
import pyLDAvis
import pyspark
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer, IDF, StopWordsRemover
from pyspark.mllib.clustering import LDA as MLlibLDA
from pyspark.mllib.linalg import Vector, Vectors as MLlibVectors
from pyspark.sql.functions import udf, col, size, explode, regexp_replace, trim, lower, lit

nltk.download('stopwords')

In [None]:
# Cluster endpoint. Set the SPARK_MASTER_URL environment variable to point the
# notebook at a different master without editing this cell; the default is the
# address that was previously hard-coded here.
SPARK_MASTER_URL = os.environ.get("SPARK_MASTER_URL", "spark://54.72.254.85:7077")

conf = pyspark.SparkConf().setAppName("TEST").setMaster(SPARK_MASTER_URL)
conf.set("spark.executor.memory", "3g")
conf.set("spark.driver.memory", "3g")

# getOrCreate() hands back the already-running context when there is one, so
# re-executing this cell no longer fails with
# "Cannot run multiple SparkContexts at once".
# NOTE: when a context already exists, `conf` is not re-applied to it.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
print(sc.version)

In [None]:
# SQL entry point bound to the running SparkContext.
sqlContext = pyspark.sql.SQLContext(sc)

# Read data/imdb_master.csv: the first row is treated as the header and
# column types are inferred from the contents.
csv_path = os.path.realpath("data/imdb_master.csv")
csv_reader = sqlContext.read.format("csv").options(header='true', inferschema='true')
data = csv_reader.load(csv_path)

# Row count first, then a preview of the leading rows.
row_count = data.count()
print(row_count)
data.show()

In [None]:
# Pull the 'review' field out of every row, then discard rows where it is missing.
review_column = data.rdd.map(lambda row: row['review'])
reviews = review_column.filter(lambda text: text is not None)


In [None]:
# A frozenset gives O(1) membership tests; the plain list returned by NLTK
# would be scanned linearly for every single token.
StopWords = frozenset(nltk.corpus.stopwords.words("english"))


def tokenize(document):
    """Turn one review into a list of lower-case content words.

    The text is split on runs of non-word characters, so punctuation attached
    to a word ("movie.", "great,") no longer causes the whole word to be
    thrown away by the ``isalpha`` filter, which is what happened when the
    text was split on single spaces only.

    A token is kept when it is purely alphabetic, longer than three
    characters, and not an English stop word.
    """
    candidates = re.split(r"\W+", document.strip().lower())
    return [
        word
        for word in candidates
        if word.isalpha() and len(word) > 3 and word not in StopWords
    ]


# Pair each token list with its position in the RDD; the position is used as
# the document id further down.
tokens = reviews.map(tokenize).zipWithIndex()

In [None]:
# Wrap the (token list, position) pairs in a DataFrame for the ML feature stages.
df_txts = sqlContext.createDataFrame(tokens, ["list_of_words",'index'])

# Term frequencies: count vocabulary terms per document.
cv = CountVectorizer(
    inputCol="list_of_words",
    outputCol="raw_features",
    vocabSize=5000,
    minDF=10.0,
)
cvmodel = cv.fit(df_txts)
result_cv = cvmodel.transform(df_txts)
result_cv.show()

# Inverse document frequency: re-weight the raw counts into the "features" column.
idf = IDF(
    inputCol="raw_features",
    outputCol="features",
)
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv)

In [None]:
# Debug aid: print the Python type of df_txts.
print(type(df_txts))

In [None]:
# Hyper-parameters for LDA training.
num_topics = 10
max_iterations = 100

# The RDD-based trainer is fed [document id, mllib vector] records:
#   * keep only the id and the TF-IDF vector of each row,
#   * convert the ml-package vector to its mllib counterpart,
#   * turn every (id, vector) pair into a list.
# MLlibVectors / MLlibLDA come from pyspark.mllib and must be imported in the
# notebook's import cell for this cell to run.
toTrain = (
    result_tfidf.select('index', 'features')
    .rdd
    .mapValues(MLlibVectors.fromML)
    .map(list)
)
lda_model = MLlibLDA.train(toTrain, k=num_topics, maxIterations=max_iterations)

In [None]:
# Number of terms to list for each topic.
wordNumbers = 10

# One entry per topic; topic_render reads element 0 of an entry as that
# topic's term indices. The entries are distributed as an RDD.
described_topics = lda_model.describeTopics(maxTermsPerTopic=wordNumbers)
topicIndices = sc.parallelize(described_topics)

# Vocabulary of the fitted CountVectorizer model, used to turn term indices into words.
vocabArray = cvmodel.vocabulary
def topic_render(topic, vocabulary=None, max_terms=None):
    """Translate a topic's term indices into the corresponding words.

    Parameters
    ----------
    topic : sequence
        Topic description whose first element is a sequence of term indices.
    vocabulary : sequence of str, optional
        Index-to-word lookup. Defaults to the notebook-level ``vocabArray``.
    max_terms : int, optional
        Upper bound on the number of words returned. Defaults to the
        notebook-level ``wordNumbers``.

    Returns
    -------
    list of str
        The words for the first ``max_terms`` indices. When the topic holds
        fewer indices than ``max_terms``, all of them are returned instead of
        raising ``IndexError``.
    """
    if vocabulary is None:
        vocabulary = vocabArray
    if max_terms is None:
        max_terms = wordNumbers

    # Slicing caps the length without indexing past the end of a short topic.
    return [vocabulary[term_index] for term_index in topic[0][:max_terms]]

# Resolve each topic's term indices to words and collect the result on the driver.
topics_final = topicIndices.map(topic_render).collect()

# Print every topic as a header followed by one word per line.
for topic_number, topic_terms in enumerate(topics_final):
    print("Topic" + str(topic_number) + ":")
    for word in topic_terms:
        print(word)
    print('\n')

In [None]:
# DataFrame-based LDA with num_topics topics and at most max_iterations iterations,
# fitted on the TF-IDF frame.
lda = LDA(
    k=num_topics,
    maxIter=max_iterations,
)
model = lda.fit(result_tfidf)

In [None]:
# Save the fitted model. write().overwrite() replaces an earlier save at the
# same path; a plain model.save() raises when the path already exists, which
# made this cell fail on every run after the first.
# NOTE: despite the ".h5" suffix, Spark ML persists models as a directory, not an HDF5 file.
model.write().overwrite().save("saved_model.h5")

In [None]:
# Apply the fitted LDA model to the TF-IDF frame; format_data_to_pyldavis
# later reads the "topicDistribution" column of the result.
transformed = model.transform(result_tfidf)

In [None]:
def format_data_to_pyldavis(df_filtered, count_vectorizer, transformed, lda_model):
    """Assemble the keyword arguments passed to ``pyLDAvis.prepare``.

    Parameters
    ----------
    df_filtered : DataFrame
        Must have a ``words_filtered`` array column (one token list per document).
    count_vectorizer : fitted CountVectorizer model
        Its ``vocabulary`` defines the order of terms in every per-term output.
    transformed : DataFrame
        Must have a ``topicDistribution`` vector column (one row per document).
    lda_model : fitted LDA model
        Must expose ``topicsMatrix()``.

    Returns
    -------
    dict
        Keys ``topic_term_dists``, ``doc_topic_dists``, ``doc_lengths``,
        ``vocab`` and ``term_frequency``.

    Notes
    -----
    ``doc_lengths`` and ``doc_topic_dists`` are collected from two different
    DataFrames; this assumes both return documents in the same order —
    TODO confirm.
    """
    vocabulary = count_vectorizer.vocabulary

    # Corpus-wide occurrence count of every token in words_filtered.
    token_counts = (
        df_filtered
        .select(explode(df_filtered.words_filtered).alias("words"))
        .groupby("words")
        .count()
    )
    counts_by_word = {r['words']: r['count'] for r in token_counts.collect()}

    # One count per vocabulary entry, in vocabulary order. Previously this was
    # overwritten with the vocabulary itself, i.e. a list of words rather than
    # numbers. A vocabulary word that never occurs in words_filtered (for
    # example one that was stripped out when words_filtered was built) gets 0
    # instead of raising KeyError.
    term_frequency = [counts_by_word.get(word, 0) for word in vocabulary]

    # topicsMatrix() is transposed so that each row is one topic. Rows are then
    # rescaled to sum to 1, which is a no-op if they already do.
    # NOTE(review): pyLDAvis is understood to require normalised rows — confirm.
    topic_term_dists = np.array(lda_model.topicsMatrix().toArray()).T
    topic_term_dists = topic_term_dists / topic_term_dists.sum(axis=1, keepdims=True)

    doc_topic_dists = np.array([
        vec.toArray()
        for vec in transformed.select(["topicDistribution"]).toPandas()['topicDistribution']
    ])

    doc_lengths = [
        r[0] for r in df_filtered.select(size(df_filtered.words_filtered)).collect()
    ]

    return {
        'topic_term_dists': topic_term_dists,
        'doc_topic_dists': doc_topic_dists,
        'doc_lengths': doc_lengths,
        'vocab': vocabulary,
        'term_frequency': term_frequency,
    }

def filter_bad_docs(data):
    """Drop documents whose topic distribution is not a valid probability vector.

    A document is kept only when the entries of its topic distribution sum to
    1 within floating-point tolerance. All-zero vectors, vectors containing
    NaN and vectors with any other total are discarded, together with the
    matching entry of ``data['doc_lengths']``.

    The comparison uses ``np.isclose`` rather than ``!= 1``: an exact equality
    test rejects valid distributions whose sum differs from 1 only by
    rounding error.

    Parameters
    ----------
    data : dict
        Must contain ``'doc_topic_dists'`` and ``'doc_lengths'`` as parallel
        sequences. Both entries are replaced in place by filtered lists.

    Returns
    -------
    int
        Number of documents that were removed.
    """
    kept_dists = []
    kept_lengths = []
    bad = 0

    for dist, length in zip(data['doc_topic_dists'], data['doc_lengths']):
        # A NaN anywhere makes the total NaN, and isclose(NaN, 1) is False,
        # so NaN vectors are rejected by the same test as wrong totals.
        if np.isclose(np.sum(dist), 1.0):
            kept_dists.append(dist)
            kept_lengths.append(length)
        else:
            bad += 1

    data['doc_topic_dists'] = kept_dists
    data['doc_lengths'] = kept_lengths
    return bad

In [None]:
# FORMAT DATA AND PASS IT TO PYLDAVIS
remover = StopWordsRemover(inputCol="list_of_words", outputCol="words_filtered")

# df_txts is reassigned here, so on a second run of this cell it already has a
# "words_filtered" column and transform() would reject the duplicate output
# column. Dropping it first (a no-op when the column is absent) makes the cell
# safe to re-run.
df_txts = remover.transform(df_txts.drop("words_filtered"))
df_txts.show()

In [None]:
# Build the dict of pyLDAvis inputs.
# NOTE: this rebinds `data`, which earlier in the notebook held the DataFrame loaded from CSV.
data = format_data_to_pyldavis(df_txts, cvmodel, transformed, model)
# Some documents come back with an all-zero topic vector, or with one that does
# not sum to 1; filter_bad_docs removes those documents from `data` in place.
filter_bad_docs(data)


In [None]:
# Hand every entry of the `data` dict to pyLDAvis.prepare as a keyword argument.
py_lda_prepared_data = pyLDAvis.prepare(**data)

In [None]:
# Render the prepared visualisation as this cell's output.
pyLDAvis.display(py_lda_prepared_data)