## Discover Topics with Spark LDA


### Workflow of LDA_Pyspark.ipynb

> **Load Cleaned Data (from Data_Cleaning.py) into a DataFrame**

> **Vectorization: create TF-IDF feature vectors**

In Spark's machine learning library (MLlib), TF-IDF is split into two separate stages, TF and IDF, to keep them flexible. CountVectorizer is therefore used first to generate the term-frequency vectors. IDF then takes the feature vectors produced by CountVectorizer and scales each column (token), down-weighting columns (tokens) that appear in many documents of the corpus.
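
The following is a minimal pure-Python sketch of what the two stages compute, so it needs no Spark. It mimics CountVectorizer, which orders the vocabulary by corpus-wide term frequency and caps it at `vocab_size`. It then applies IDF with the smoothed formula given in the Spark MLlib documentation, idf(t) = log((m + 1) / (df(t) + 1)), where m is the number of documents. The function names are hypothetical. Breaking frequency ties alphabetically is an assumption made only to keep the output deterministic.

```python
import math
from collections import Counter

def count_vectorize(docs, vocab_size=None):
    """Return (vocabulary, tf_vectors). The vocabulary is ordered by corpus-wide
    term frequency (ties broken alphabetically, an assumption), and each document
    becomes a sparse dict {term_index: count}."""
    corpus_counts = Counter(tok for doc in docs for tok in doc)
    vocabulary = [t for t, _ in sorted(corpus_counts.items(), key=lambda kv: (-kv[1], kv[0]))]
    if vocab_size is not None:
        vocabulary = vocabulary[:vocab_size]
    position = {t: i for i, t in enumerate(vocabulary)}
    tf_vectors = []
    for doc in docs:
        counts = Counter(position[t] for t in doc if t in position)
        tf_vectors.append(dict(sorted(counts.items())))
    return vocabulary, tf_vectors

def idf_weights(tf_vectors, vocab_len):
    """Smoothed IDF as documented for Spark MLlib: log((m + 1) / (df + 1))."""
    m = len(tf_vectors)
    doc_freq = Counter(i for vec in tf_vectors for i in vec)
    return [math.log((m + 1) / (doc_freq[i] + 1)) for i in range(vocab_len)]

def tfidf(tf_vectors, idf):
    """Scale each term count by the IDF weight of its column."""
    return [{i: c * idf[i] for i, c in vec.items()} for vec in tf_vectors]

docs = [["opdivo", "keytruda", "results"], ["results", "pain"], ["pain", "results", "pain"]]
vocabulary, tf = count_vectorize(docs)
idf = idf_weights(tf, len(vocabulary))
print(vocabulary)  # ['pain', 'results', 'keytruda', 'opdivo']
print(tfidf(tf, idf))
```

The toy term `results` occurs in every document, so its IDF is log(4/4) = 0 and it contributes nothing to the TF-IDF vectors. That is the down-weighting described above.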



> **Train & Evaluate Base Model: train an LDA model on the complete dataset and evaluate it**
    
Training a base LDA model on the complete dataset yields an initial set of topics, which are then evaluated by human judgement through visualization. These topics can be further improved by enriching the stopword list and tuning the model parameters.
    
> **Model Improvement:**

- Filtering with TF-IDF scores: identify a reasonable threshold and add the tokens scoring below it to the stopword list
- Parameter tuning: the Dirichlet priors alpha and beta, and the number of topics (k)
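
The parameter-tuning step can be organised as a plain grid search. In Spark's LDA, alpha corresponds to the `docConcentration` parameter and beta to `topicConcentration`. In the sketch below, `score_fn` is hypothetical. It stands in for fitting an LDA model with a given `k`, alpha and beta and returning a score such as `logPerplexity` on held-out documents, where lower is better. A toy scoring function replaces Spark so the search logic runs on its own.

```python
from itertools import product

def grid_search(param_grid, score_fn):
    """Evaluate every combination in param_grid; lower scores are better
    (as with log perplexity). Returns (best_params, best_score, all_results)."""
    names = sorted(param_grid)
    results = []
    for values in product(*(param_grid[n] for n in names)):
        params = dict(zip(names, values))
        results.append((params, score_fn(**params)))
    best_params, best_score = min(results, key=lambda r: r[1])
    return best_params, best_score, results

# Hypothetical stand-in for "fit LDA with these settings, return held-out log perplexity"
def toy_score(k, alpha, beta):
    return abs(k - 15) + abs(alpha - 0.1) + abs(beta - 0.01)

grid = {"k": [10, 15, 20], "alpha": [0.05, 0.1, 0.5], "beta": [0.01, 0.1]}
best, score, results = grid_search(grid, toy_score)
print(best, score)
```

Because every combination is fitted, the cost grows multiplicatively with each added parameter value. For a corpus of this size it is worth keeping the grid small.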

### Load Data into Spark DataFrame

In [88]:
import os
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext, Row
from pyspark.sql import functions as F
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.linalg import Vector, Vectors, DenseVector
from pyspark.ml.feature import CountVectorizer, IDF, CountVectorizerModel
from pyspark.ml.clustering import LDA, LDAModel, LocalLDAModel
from pyspark.sql.types import ArrayType, FloatType, StringType, DoubleType, IntegerType

In [12]:
spark = SparkSession.builder.getOrCreate()

#Read from json file (single-line mode by records)
cleaned = spark.read.json("cleaning_test_output.json") 
cleaned.createOrReplaceTempView('cleaned')

#Drop cleaned text of null values(852 rows removed, 134145 rows left)
query = '''SELECT * FROM cleaned WHERE cleaned_text IS NOT NULL''' 
text = spark.sql(query)

#Split cleaned text into column of tokens, create index column
text = text.withColumn('tokens',F.split(F.trim(text.cleaned_text)," "))\
           .withColumn("index", F.monotonically_increasing_id())\
           .select("*")
text.createOrReplaceTempView('cleaned_indexed')
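
A caveat about the `index` column: according to the PySpark documentation, `monotonically_increasing_id()` guarantees unique, increasing IDs but not consecutive ones. The partition ID goes in the upper 31 bits and the record number within the partition goes in the lower 33 bits. The pure-Python sketch below reproduces that layout (the function name is hypothetical). It shows why `index` can jump to 8589934592 when a new partition starts, so the column works as a unique key but not as a row number.

```python
def monotonic_id(partition_id, record_number):
    """Mimic the documented ID layout: partition ID in the upper 31 bits,
    record number within the partition in the lower 33 bits."""
    assert 0 <= record_number < 2 ** 33
    return (partition_id << 33) + record_number

# Three records in partition 0, then two records in partition 1
ids = [monotonic_id(0, r) for r in range(3)] + [monotonic_id(1, r) for r in range(2)]
print(ids)  # [0, 1, 2, 8589934592, 8589934593]
```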

In [3]:
#Schema of the text dataframe
text.printSchema()

root
 |-- body: string (nullable = true)
 |-- cleaned_text: string (nullable = true)
 |-- confidence: double (nullable = true)
 |-- lang: string (nullable = true)
 |-- modr_status: long (nullable = true)
 |-- name: string (nullable = true)
 |-- post_id: long (nullable = true)
 |-- post_key: string (nullable = true)
 |-- post_type: string (nullable = true)
 |-- title: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- index: long (nullable = false)



In [4]:
#Preview of the first entry
text.take(1)

[Row(body='Is anyone pdl1 negative and taking opdivo or keytruda ?  Are you having good results?', cleaned_text='opdivo keytruda results ', confidence=1.0, lang='en', modr_status=2, name='Lung Cancer Survivors', post_id=749195, post_key='disc.749195', post_type='disc', title='Pdl1 negative', tokens=['opdivo', 'keytruda', 'results'], index=0)]

### Create TF-IDF Feature Vectors

#### Tf-Idf Sparse Vector


In [5]:
#Check vocabulary size for setting the vocabSize parameter
unfold_df = text.withColumn('word', F.explode(F.split(text.cleaned_text, r"\s+"))).where('word != ""')
vocab_df = unfold_df.groupBy("word").agg(F.count("word").alias('term_count'))
vocab_df.count() #84084

In [6]:
#Create tfidf vectors (with all words)
# TF
query = '''SELECT tokens, index FROM cleaned_indexed''' 
df_text = spark.sql(query)
cv = CountVectorizer(inputCol="tokens", outputCol="raw_features", vocabSize=84084)
cvModel = cv.fit(df_text)
result_cv = cvModel.transform(df_text)

# IDF
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel = idf.fit(result_cv)
result_tfidf = idfModel.transform(result_cv) 

#Check DocSize and VocabSize
tfidf = result_tfidf.select('index','features')
print(tfidf.count(), len(cvModel.vocabulary))

134145 84084


In [7]:
tfidf.take(1)

[Row(index=0, features=SparseVector(84084, {41: 2.6916, 641: 5.0757, 1623: 5.9602}))]

### Train LDA Model on Complete Dataset and Evaluate

#### Train LDA model with All Tokens

#### Identify Top Terms 

In [50]:
num_topics = 20
max_iterations = 100

lda = LDA(featuresCol="features",k=num_topics, seed=1, optimizer="online", maxIter = max_iterations)
ldaModel=lda.fit(tfidf)
lda_df=ldaModel.transform(tfidf)

ldatopics = ldaModel.describeTopics()
numTopics = ldatopics.count()

vocabulary = cvModel.vocabulary
ListOfIndexToWords = F.udf(lambda wl: list([vocabulary[w] for w in wl]), ArrayType(StringType()))
FormatNumbers = F.udf(lambda nl: [float("{:1.4f}".format(x)) for x in nl], ArrayType(FloatType()))

#Describe top 20 topics(10 top words per topic)
toptopics = ldatopics.withColumn('topic',ldatopics.topic + 1)\
                     .withColumn('words', ListOfIndexToWords(ldatopics.termIndices))\
                     .withColumn('weights', FormatNumbers(ldatopics.termWeights))

toptopics.show(truncate=False, n = numTopics)
print('Topics:', numTopics, 'Vocabulary:', len(vocabulary))

(Output truncated: a table of the 20 topics with columns topic, termIndices, termWeights, words and weights.)

#### Generate Evaluation Metrics

In [109]:
#Check Log Likelihood of base LDA model 
ll = ldaModel.logLikelihood(tfidf)
ll

-190290427.02601972

In [154]:
#Check Log Perplexity of base LDA model
lp = ldaModel.logPerplexity(tfidf)
lp

8.434250933211537
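
The two metrics are related. The sketch below assumes Spark's implementation, as I understand it: `logPerplexity` is the negated `logLikelihood` bound divided by the corpus token count, and the token count is the sum of all feature-vector entries. Because the features here are TF-IDF weights rather than raw counts, that "token count" is a sum of TF-IDF scores. Perplexities computed on different inputs, for example before and after stopword filtering, are therefore not strictly comparable. The function name is hypothetical.

```python
import math

def log_perplexity(log_likelihood, feature_vectors):
    """Negated log-likelihood bound divided by the total token count,
    where the token count is the sum of every feature-vector entry."""
    token_count = sum(sum(vec) for vec in feature_vectors)
    return -log_likelihood / token_count

# Toy numbers: two documents whose feature values sum to 10
lp = log_perplexity(-25.0, [[1.0, 2.0, 3.0], [4.0]])
print(lp, math.exp(lp))  # log perplexity and the corresponding perplexity
```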

#### Save Base LDA Model

In [51]:
#Check if the model is distributed and save the models
# print(ldaModel.isDistributed())
# path = os.getcwd()
# cvModel.save(os.path.join(path, 'CVModel_stem'))
# ldaModel.save(os.path.join(path, 'LDAModel_base'))
# lda.save(os.path.join(path, 'LDA_base'))

False


### Model Improvement (more meaningful topics)

#### Generate TF-IDF-Term List

In [15]:
vocabulary = cvModel.vocabulary
ListOfIndexToWords = F.udf(lambda wl: list([vocabulary[w] for w in wl]), ArrayType(StringType()))
ExtractValues = F.udf(lambda vec: vec.values.tolist(), ArrayType(DoubleType()))
ExtractIndex = F.udf(lambda vec: vec.indices.tolist(), ArrayType(IntegerType()))
result_tfidf = result_tfidf.withColumn('feature_list',ExtractValues(result_tfidf.features))\
                           .withColumn('term_index', ExtractIndex(result_tfidf.features))
result_tfidf = result_tfidf.withColumn('term_list', ListOfIndexToWords(result_tfidf.term_index))

tfidf_termlist = result_tfidf.select('term_list','index','feature_list')\
                           .withColumn("tmp", F.arrays_zip('term_list','feature_list'))\
                           .withColumn('tmp', F.explode('tmp'))\
                           .select('index',F.col('tmp.term_list'),F.col('tmp.feature_list'))\
                           .orderBy('index','feature_list')

#Preview the first entry of TF-IDF-Term list
tfidf_termlist.take(1)

[Row(index=0, term_list='results', feature_list=2.691643845918748)]
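
The UDF plus `arrays_zip`/`explode` pipeline above turns each sparse TF-IDF vector into one `(index, term, score)` row per non-zero entry, ordered by score. Here is the same transformation in plain Python for a single document. The function name and the toy vocabulary and scores are hypothetical.

```python
def sparse_to_term_rows(doc_index, indices, values, vocabulary):
    """Pair each non-zero entry of a sparse vector with its vocabulary term
    (like arrays_zip + explode) and sort by score (like orderBy on feature_list)."""
    rows = [(doc_index, vocabulary[i], v) for i, v in zip(indices, values)]
    return sorted(rows, key=lambda r: r[2])

vocabulary = ["pain", "results", "keytruda", "opdivo"]
rows = sparse_to_term_rows(0, [1, 2, 3], [1.5, 0.4, 2.0], vocabulary)
print(rows)
```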

In [149]:
#Get terms with tfidf <= 2
tfidf_filter_2 = tfidf_termlist.filter(tfidf_termlist.feature_list <= 2).toPandas()['term_list'] #146126 rows with tfidf <= 2
tfidf_filter_2.unique()

array(['months', 'years', 'day', 'cancer', 'year', 'thanks', 'help',
       'time', 'pain'], dtype=object)

In [21]:
#Get terms with 2 < tfidf <= 3
tfidf_filter_3 = tfidf_termlist.filter((tfidf_termlist.feature_list <= 3) & (tfidf_termlist.feature_list > 2)).toPandas()['term_list']
tfidf_filter_3.unique()

array(['results', 'symptoms', 'things', 'blood', 'disease', 'body',
       'family', 'diagnosis', 'problems', 'treatment', 'husband', 'stage',
       'lung', 'doctor', 'days', 'chemo', 'way', 'need', 'hope', 'post',
       'doctors', 'care', 'love', 'share', 'test', 'hospital',
       'information', 'time', 'health', 'weeks', 'advice', 'thing',
       'heart', 'week', 'work', 'area', 'dr', 'question', 'surgery',
       'people', 'lot', 'experience', 'times', 'morning', 'life', 'night',
       'support', 'today', 'past', 'month', 'home', 'end', 'problem',
       'effects', 'use', 'issues'], dtype=object)

In [22]:
#Get terms with 3 < tfidf <= 4
tfidf_filter_4 = tfidf_termlist.filter((tfidf_termlist.feature_list <= 4) & (tfidf_termlist.feature_list > 3)).toPandas()['term_list']
tfidf_filter_4.unique()

array(['help', 'march', 'feet', 'ideas', 'weight', 'appointment',
       'medication', 'daughter', 'condition', 'pain', 'center',
       'thoughts', 'suggestions', 'oncologist', 'water', 'check', 'guess',
       'cell', 'effect', 'monday', 'friday', 'size', 'scan', 'advance',
       'case', 'answer', 'cancer', 'pet', 'place', 'walk', 'kind', 'bed',
       'mind', 'house', 'world', 'biopsy', 'feeling', 'point', 'friends',
       'years', 'yesterday', 'situation', 'god', 'depression', 'head',
       'radiation', 'neck', 'year', 'list', 'chest', 'doc', 'kids',
       'meds', 'talk', 'tests', 'stomach', 'food', 'feel', 'thyroid',
       'son', 'fact', 'breath', 'skin', 'hand', 'july', 'experiences',
       'procedure', 'hours', 'bladder', 'dose', 'fatigue', 'syndrome',
       'process', 'patients', 'follow', 'visit', 'june', 'breast',
       'brain', 'diet', 'story', 'face', 'months', 'minutes', 'children',
       'order', 'medications', 'questions', 'idea', 'pressure', 'lungs',
       'ro
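
Terms such as `help`, `cancer`, `years` and `pain` appear both in the `<= 2` list and in the `(3, 4]` list. This is expected: a term's TF-IDF score is its in-document count times its IDF, so the same word scores differently in different documents. The sketch below uses a hypothetical helper and toy data. It buckets `(term, score)` rows into the same bands used above and shows one term landing in two bands. Ranking terms by IDF alone would give each term a single value instead.

```python
from bisect import bisect_left
from collections import defaultdict

def bucket_terms(term_scores, edges=(2, 3, 4)):
    """Group (term, score) pairs into bands: 0 = (-inf, 2], 1 = (2, 3],
    2 = (3, 4], 3 = (4, inf). Returns {band: sorted unique terms}."""
    bands = defaultdict(set)
    for term, score in term_scores:
        # bisect_left puts a score equal to an edge into the lower band, matching '<='
        bands[bisect_left(edges, score)].add(term)
    return {band: sorted(terms) for band, terms in bands.items()}

# Toy data: idf('pain') = 1.9, so a count of 1 scores 1.9 and a count of 2 scores 3.8
rows = [("pain", 1.9), ("pain", 3.8), ("results", 2.5), ("opdivo", 5.9)]
print(bucket_terms(rows))
```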

#### Add Stopwords & Filter

In [16]:
#Domain stopwords picked from the low TF-IDF terms listed above
noun_stpwd = \
            ['day','month','year','days','months','years','today','everyday',
             'time', 'weeks','dr','times', 'thing','things','way', 'problems','thanks',
            'husband','wife','doctor','doctors','care','results', 'symptoms', 
             'cancer', 'disease', 'body', 'family', 'diagnosis', 'treatment', 
             'husband', 'stage','help', 'need', 'hope', 'post','share', 'test',
             'hospital','information', 'health', 'advice','heart', 'week', 'work', 
             'area', 'dr', 'question', 'surgery','people', 'pain', 'lot', 'experience',
             'morning', 'life','night', 'support', 'past', 'home', 'end','problem', 'effects', 'use', 'issues',
            'cm','mm','wanna', 'hour','info','april','anybody','lots','mom','state',
            'list', 'doc', 'pm','am','october','plan','change','changes','result','options','ones',
            'december','mg','period','start','number','january','patient','want','type','person',
            'treatments','wish','stuff','right','issue','minutes','june','meds','son','july',
            'hours','hour','god','friend','pet','march','daughter','thoughts','suggestions',
            'guess','ideas','appointment','effect','world','feeling','point','monday','friday',
            'bed','answer','kind','case','pharmacy','love','hugs','dad','house','place','child','li']

#Single-character tokens are also treated as stopwords (they are rarely meaningful, even as medical abbreviations)
one_characters = [word for word in cvModel.vocabulary if len(word) == 1]

swRemover = StopWordsRemover(inputCol='tokens', outputCol="filtered")
swRemover.setStopWords(swRemover.getStopWords() + noun_stpwd + one_characters)

df_text_new = swRemover.transform(df_text).select('index','filtered')\
                       .withColumn('tokens', F.col('filtered'))\
                       .select('index','tokens')

#Save filtered dataframe
# path = os.getcwd()
# df_text_new.toPandas().to_json(path +'/filtered_text.json', lines = True, orient = 'records')
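
StopWordsRemover matches case-insensitively by default (`caseSensitive=False`). Here its default English list is extended with the domain nouns and every single-character vocabulary term. The following is a plain-Python equivalent of that filtering step. The function names are hypothetical, and the two-word default list is a tiny stand-in for Spark's real one.

```python
def build_stopwords(default_words, domain_words, vocabulary):
    """Combine default stopwords, domain-specific nouns and single-character terms."""
    one_characters = [w for w in vocabulary if len(w) == 1]
    return {w.lower() for w in list(default_words) + list(domain_words) + one_characters}

def remove_stopwords(tokens, stopwords):
    """Drop stopwords case-insensitively, keeping the original token order."""
    return [t for t in tokens if t.lower() not in stopwords]

stopwords = build_stopwords(["and", "or"], ["results", "doctor"], ["opdivo", "x", "results", "b"])
print(remove_stopwords(["opdivo", "and", "Keytruda", "Results", "x"], stopwords))
# ['opdivo', 'Keytruda']
```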

#### Re-train Model and View Topics

In [12]:
# TF
cv = CountVectorizer(inputCol="tokens", outputCol="raw_features", vocabSize=83920)
cvModel_1 = cv.fit(df_text_new)
result_cv_1 = cvModel_1.transform(df_text_new)

# IDF
idf = IDF(inputCol="raw_features", outputCol="features")
idfModel_1 = idf.fit(result_cv_1)
result_tfidf_1 = idfModel_1.transform(result_cv_1) 

#Keep only the document index and the TF-IDF features
tfidf_new = result_tfidf_1.select('index','features')

In [3]:
#Save new tfidf
# tfidf_new.write.json("tfidf_new")

In [13]:
#Re-train the model
num_topics = 20
max_iterations = 100

lda_1 = LDA(featuresCol="features",k=num_topics, seed=1, optimizer="online", maxIter = max_iterations)
ldaModel_1=lda_1.fit(tfidf_new)
#lda_df_1=ldaModel_1.transform(tfidf_new)

ldatopics_1 = ldaModel_1.describeTopics()
numTopics = ldatopics_1.count()

vocabulary_1 = cvModel_1.vocabulary
ListOfIndexToWords = F.udf(lambda wl: list([vocabulary_1[w] for w in wl]), ArrayType(StringType()))
FormatNumbers = F.udf(lambda nl: [float("{:1.4f}".format(x)) for x in nl], ArrayType(FloatType()))

#Describe top 20 topics(10 top words per topic)
toptopics_1 = ldatopics_1.withColumn('topic',ldatopics_1.topic + 1)\
                     .withColumn('words', ListOfIndexToWords(ldatopics_1.termIndices))\
                     .withColumn('weights', FormatNumbers(ldatopics_1.termWeights))

toptopics_1.show(truncate=False, n = numTopics)
print('Topics:', numTopics, 'Vocabulary:', len(vocabulary_1))

(Output truncated: a table of the 20 topics with columns topic, termIndices, termWeights, words and weights.)

In [20]:
ldaModel_1.topicsMatrix()

DenseMatrix(83920, 20, [7038.5126, 4597.086, 1520.6349, 184.8832, 724.8405, 72.6162, 30.8562, 2221.0819, ..., 0.1113, 5.7495, 0.4829, 0.0944, 0.2156, 0.1037, 0.1086, 0.1097], 0)
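
`topicsMatrix()` is `vocabSize x k` (here 83920 x 20). For this local online model its entries are clearly unnormalized weights, as values such as 7038.5 show. The sketch below assumes that `describeTopics` normalizes each topic column to sum to 1 before ranking terms, which is my understanding of Spark's `LocalLDAModel`. Under that assumption, the top terms can be recovered from the matrix as follows. The helper name and the toy matrix are hypothetical.

```python
def describe_topics(topics_matrix, max_terms=10):
    """topics_matrix: vocab_size rows x k columns of non-negative weights.
    For each topic, normalize its column to sum to 1 and return
    (term_indices, term_weights) for the max_terms largest entries."""
    k = len(topics_matrix[0])
    topics = []
    for j in range(k):
        column = [row[j] for row in topics_matrix]
        total = sum(column)
        weights = [w / total for w in column]
        ranked = sorted(range(len(weights)), key=lambda i: -weights[i])[:max_terms]
        topics.append((ranked, [weights[i] for i in ranked]))
    return topics

# Toy 4-term x 2-topic matrix of unnormalized weights
matrix = [[30.0, 1.0],
          [10.0, 1.0],
          [5.0, 6.0],
          [5.0, 2.0]]
for indices, weights in describe_topics(matrix, max_terms=2):
    print(indices, weights)
```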

In [14]:
#Check if the model is distributed and save the models
print(ldaModel_1.isDistributed())
path = os.getcwd()
cvModel_1.save(os.path.join(path, 'CVModel_1'))
ldaModel_1.save(os.path.join(path, 'LDAModel_1'))
lda_1.save(os.path.join(path, 'LDA_base_1'))

False


In [None]:
#Check Log Likelihood of the re-trained LDA model
ll_1 = ldaModel_1.logLikelihood(tfidf_new)
ll_1

In [16]:
#Check Log Perplexity of the re-trained LDA model
lp = ldaModel_1.logPerplexity(tfidf_new)
lp

8.32607268178269

#### Descriptive Exploration of the Generated Topics

In [22]:
#Load the saved models - ldaModel_1 and cvModel_1
path = os.getcwd()
usedLDAModel = LocalLDAModel.load(os.path.join(path, 'LDAModel_1'))
usedCVModel = CountVectorizerModel.load(os.path.join(path, 'CVModel_1'))
#Load saved filtered data
df_text_new = spark.read.json(os.path.join(path, 'filtered_text.json'))
#Load saved tfidf
from pyspark.sql.types import StructField, StructType, LongType
from pyspark.ml.linalg import VectorUDT
tfidfSchema = StructType([StructField("index", LongType(), True), StructField("features", VectorUDT(), True)])
tfidf_new = spark.read.schema(tfidfSchema).json("tfidf_new").sort(F.col("index"))

In [40]:
#Count docs for each topic (each doc is assigned to its highest-probability topic)
countTopDocs = (usedLDAModel
                .transform(tfidf_new)
                .select("topicDistribution")
                .rdd.map(lambda r: Row(nTopTopic=int(np.argmax(r.topicDistribution.toArray()))))
                .toDF()
                .groupBy("nTopTopic").count()
                .withColumn('topic', F.col('nTopTopic') + 1)
                .sort('topic')
                .select('topic', 'count')
                .toPandas())
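
This way of counting documents per topic assigns each document to its single most probable topic (the argmax of `topicDistribution`) and shifts topic numbers to start at 1. Topics that are never the top topic of any document do not appear in the counts. The same logic in plain Python, with a hypothetical function name and toy distributions:

```python
from collections import Counter

def count_dominant_topics(doc_topic_dists):
    """Assign each document to its highest-probability topic (1-based; ties go to
    the lowest topic number, as with numpy.argmax) and count documents per topic."""
    dominant = [max(range(len(dist)), key=lambda t: dist[t]) + 1 for dist in doc_topic_dists]
    return dict(sorted(Counter(dominant).items()))

dists = [[0.7, 0.2, 0.1],
         [0.1, 0.8, 0.1],
         [0.5, 0.3, 0.2],
         [0.4, 0.4, 0.2]]
print(count_dominant_topics(dists))  # {1: 3, 2: 1}
```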

In [63]:
#Plot "Document Counts by Topic"
#plotly < 4 API; in plotly >= 4 this module moved to chart_studio.plotly
import plotly.plotly as py
import plotly.graph_objs as go

data = [go.Bar(x=countTopDocs['topic'],
            y=countTopDocs['count'].astype('str'))]

py.iplot(data, filename='Topic Distribution across Topics')

High five! You successfully sent some data to your account on plotly. View your plot in your browser at https://plot.ly/~xinyuan_0420/0 or inside your plot.ly account where it is named 'Topic Distribution across Topics'


In [65]:
#PyLDAVis
#Required Input for prepare.py
# topic_term_dists : array-like, shape (`n_topics`, `n_terms`)
#         Matrix of topic-term probabilities. Where `n_terms` is `len(vocab)`.
# doc_topic_dists : array-like, shape (`n_docs`, `n_topics`)
#         Matrix of document-topic probabilities.
# doc_lengths : array-like, shape `n_docs`
#         The length of each document, i.e. the number of words in each document.
#         The order of the numbers should be consistent with the ordering of the
#         docs in `doc_topic_dists`.
# vocab : array-like, shape `n_terms`
#         List of all the words in the corpus used to train the model.
# term_frequency : array-like, shape `n_terms`
#         The count of each particular term over the entire corpus. The ordering
#         of these counts should correspond with `vocab` and `topic_term_dists`.

import pyLDAvis
#topicsMatrix() - 'topic_term_dists'
#topicDistribution - 'doc_topic_dists'
#doc_length - tokens count
#usedCVModel.vocabulary - 'vocab'

usedLDAModel.vocabSize()

83920

In [134]:
#doc length
#cleaned.select('body').take(1)
df_text_new.take(1)

[Row(index=0, tokens=['opdivo', 'keytruda'])]
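
`pyLDAvis.prepare()` expects `topic_term_dists` as `n_topics x n_terms` rows that each sum to 1. `topicsMatrix()`, by contrast, is `n_terms x n_topics` and unnormalized, so it must be transposed and normalized. `doc_lengths` is the token count of each filtered document (2 for the first document above), and `term_frequency` is the corpus-wide count of each vocabulary term. The following is a Spark-free sketch of those three conversions, with hypothetical names and toy data:

```python
from collections import Counter

def topic_term_dists(topics_matrix):
    """Transpose an n_terms x n_topics matrix and normalize each topic row to sum to 1."""
    n_topics = len(topics_matrix[0])
    rows = [[term_row[j] for term_row in topics_matrix] for j in range(n_topics)]
    normalized = []
    for row in rows:
        total = sum(row)
        normalized.append([w / total for w in row])
    return normalized

def doc_lengths(token_lists):
    """Number of tokens in each document, in document order."""
    return [len(tokens) for tokens in token_lists]

def term_frequency(token_lists, vocab):
    """Corpus-wide count of each vocabulary term, aligned with vocab."""
    counts = Counter(tok for tokens in token_lists for tok in tokens)
    return [counts[term] for term in vocab]

docs = [["opdivo", "keytruda"], ["keytruda", "scan", "keytruda"]]
vocab = ["keytruda", "opdivo", "scan"]
print(doc_lengths(docs))            # [2, 3]
print(term_frequency(docs, vocab))  # [3, 1, 1]
print(topic_term_dists([[3.0, 1.0], [1.0, 1.0], [0.0, 2.0]]))
```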

In [None]:
def load_LDA_model(ldamodel, cvmodel, doc_vect, doc_df):
    #Collect the five inputs of pyLDAvis.prepare() from the fitted Spark models
    #ldamodel: fitted LDA model; cvmodel: the CountVectorizerModel used to build its features
    #doc_vect: dataframe with 'index' and the 'features' column the model was trained on
    #doc_df:   dataframe with 'index' and 'tokens' for the same documents

    #topicsMatrix() is vocabSize x k and unnormalized; pyLDAvis wants k x n_terms rows summing to 1
    topic_term = ldamodel.topicsMatrix().toArray().T
    topic_term_dists = (topic_term / topic_term.sum(axis=1, keepdims=True)).tolist()

    #One topic distribution per document, ordered by index
    doc_topic_dists = [row.topicDistribution.toArray().tolist()
                       for row in ldamodel.transform(doc_vect).orderBy('index')
                                          .select('topicDistribution').collect()]

    #Number of tokens in each document, in the same order
    doc_lengths = [row.n for row in doc_df.orderBy('index')
                                          .select(F.size('tokens').alias('n')).collect()]

    vocab = cvmodel.vocabulary

    #Corpus-wide count of each vocabulary term, aligned with vocab
    counts = dict(doc_df.select(F.explode('tokens').alias('word'))
                        .groupBy('word').count().collect())
    term_frequency = [counts.get(w, 0) for w in vocab]

    data = {'topic_term_dists': topic_term_dists,
            'doc_topic_dists': doc_topic_dists,
            'doc_lengths': doc_lengths,
            'vocab': vocab,
            'term_frequency': term_frequency}
    return data

In [None]:
lda_model_data = load_LDA_model(usedLDAModel, usedCVModel, tfidf_new, df_text_new)

print('Topic-Term shape: %s' % str(np.array(lda_model_data['topic_term_dists']).shape))
print('Doc-Topic shape: %s' % str(np.array(lda_model_data['doc_topic_dists']).shape))

#Build and display the interactive visualization
vis = pyLDAvis.prepare(**lda_model_data)
pyLDAvis.display(vis)

In [None]:
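#Identify the top document(s) for each topic using the re-trained model
vocabulary_1 = usedCVModel.vocabulary
ldatopics_1 = usedLDAModel.describeTopics()
numTopics = ldatopics_1.count()
ListOfIndexToWords = F.udf(lambda wl: [vocabulary_1[w] for w in wl], ArrayType(StringType()))
topWords = ldatopics_1.select(ListOfIndexToWords(ldatopics_1.termIndices).alias('words')).collect()

#Topic distribution of every document (the model reads the 'features' column)
df = usedLDAModel.transform(tfidf_new).select("index", "topicDistribution").cache()

#Number of top documents to show per topic
nTopDoc = 1

for ntopic in range(numTopics):
    print('Topic ' + str(ntopic + 1) + '\n')

    #Bind the current topic as a default argument so each lambda keeps its own value
    df_sliced = df.rdd.map(lambda r, t=ntopic: Row(ID=int(r['index']), weight=float(r['topicDistribution'][t]))).toDF()

    DocIDs = df_sliced.sort(df_sliced.weight.desc()).take(nTopDoc)
    print('Top Document(s):', DocIDs)
    for d_id in DocIDs:
        #Title and body live in `text`; df_text only keeps tokens and index
        text.filter(text.index == d_id.ID) \
            .select('title', 'body') \
            .show(truncate=False)

    print('Top terms:')
    print(topWords[ntopic]['words'], '\n')
    print('===================================================')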
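
Finding a topic's top documents amounts to ranking one column of the document-topic matrix. In plain Python this is a `heapq.nlargest` per topic. The function name and toy data below are hypothetical, and topics are numbered from 1 to match the `topic + 1` labels used earlier.

```python
import heapq

def top_docs_per_topic(doc_topic, doc_ids, n_top=1):
    """For each topic (1-based), return the n_top (doc_id, weight) pairs with the largest weight."""
    n_topics = len(doc_topic[0])
    result = {}
    for t in range(n_topics):
        pairs = ((doc_id, dist[t]) for doc_id, dist in zip(doc_ids, doc_topic))
        result[t + 1] = heapq.nlargest(n_top, pairs, key=lambda p: p[1])
    return result

doc_topic = [[0.9, 0.1], [0.3, 0.7], [0.6, 0.4]]
print(top_docs_per_topic(doc_topic, doc_ids=[10, 11, 12], n_top=2))
```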