# Readme

This is a workflow showing how to build an LDA topic model end to end: reading files, pre-processing the data (cleaning the text and preparing it for NLP), building the LDA model, and visualizing the LDA results.

In [None]:
# import useful packages
import operator
from pyspark.sql import functions as fn
from pyspark.ml.feature import Tokenizer
import nltk
from pyspark.ml.feature import StopWordsRemover
from nltk.stem import WordNetLemmatizer
from pyspark.sql import Row
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml import Pipeline
from pyspark.ml.clustering import LDA
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Reading Files into Spark

In [None]:
# read files into spark, producing a DataFrame with two columns: FILENAME | CONTENT
# `schema` is not defined anywhere else in this notebook, so declare it explicitly
# here. This assumes the CSV's columns are FILENAME then CONTENT -- TODO confirm
# against input_file.csv. `spark` is assumed to be the notebook's SparkSession.
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("FILENAME", StringType(), True),
    StructField("CONTENT", StringType(), True),
])
df = spark.read.csv(
    "input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)


# Data Pre-processing

Before building the LDA model, we should preprocess the text to get it ready for analysis. First, count the number of words in each document, and then remove the documents with too few words.

In [None]:
# add a column of word count to remove the rows with too few number of words.
# Collapse every run of whitespace (spaces, tabs, newlines) to a single space and
# trim the ends before splitting. A plain split on " " would count each extra space
# as an empty "word" and would not split words separated only by newlines or tabs.
df_wc = df.withColumn(
    'wordCount',
    fn.size(fn.split(fn.trim(fn.regexp_replace(fn.col('CONTENT'), r'\s+', ' ')), ' '))
)

To better view the distribution of word counts, let's register the DataFrame as a temp view so it can be queried with SQL.

In [None]:
# Expose the word-count DataFrame to the %sql cell below under the view name "df"
# (replacing any earlier view with that name).
df_wc.createOrReplaceTempView(name="df")

In [None]:
%sql
-- to see the distribution of word count so that documents with too few words can be removed
-- Word counts are bucketed to the nearest multiple of 5: round(wordCount/5)*5.
-- For each bucket l.wordCount:
--   subtotal = cumulative number of documents in buckets <= l.wordCount
--              (the self-join on l.wordCount >= s.wordCount turns the per-bucket
--              counts into a running total)
--   percent  = that running total as a percentage of all counted documents
--              (count(filename) skips rows whose filename is null)
SELECT l.wordCount
       ,sum(s.count) as subtotal
       ,sum(s.count)*100.0/(select sum(count) from (select round(wordCount*1.0/5)*5 as wordCount,count(filename) as count from df group by round(wordCount*1.0/5)*5)) as percent      
FROM
(
-- l: number of documents per word-count bucket
select round(wordCount*1.0/5)*5 as wordCount
       ,count(filename) as count
from df
group by round(wordCount*1.0/5)*5
)l
LEFT JOIN
(
-- s: the same per-bucket counts, joined to every bucket at or above it
select round(wordCount*1.0/5)*5 as wordCount
       ,count(filename) as count
from df
group by round(wordCount*1.0/5)*5
)s on l.wordCount >= s.wordCount
group by l.wordCount
order by l.wordCount

In my sample documents, about 5% of the documents have fewer than 25 words. Let's remove them by keeping only the documents with more than 25 words.

In [None]:
# Remove the shortest documents: keep only documents with more than `min_word_count`
# words (in the author's sample this dropped roughly 5% of the documents).
min_word_count = 25
df_wc_gt25 = df_wc.where(df_wc.wordCount > min_word_count)
# Single-argument print(...) behaves the same on Python 2 and Python 3.
print("before filtering wordCount, there are %s txt files" % df_wc.count())
print("after filtering out wordCount <= %s, there are %s txt files"
      % (min_word_count, df_wc_gt25.count()))

After removing the unnecessary documents, the text must be tokenized before it can be analyzed. Now, let's tokenize the corpus.

In [None]:
# Tokenizer: split each document's CONTENT into an array of tokens in 'words',
# then preview the result.
tokenizer = Tokenizer(inputCol='CONTENT', outputCol='words')
tokenized = tokenizer.transform(df_wc_gt25)
tokenized.show()

After tokenizing the text, we should remove stopwords such as "a", "an", "the", "we're", etc. Let's download the stopwords list from NLTK. We can also add customized stopwords.

In [None]:
# construct stopwords list
# Fetch NLTK's stopword corpus, then take a mutable copy of the English list so
# custom words can be appended to it.
nltk.download("stopwords")
stop_words = list(nltk.corpus.stopwords.words("english"))

Add customized stopwords to the stopwords list

# Customized stopwords: spelled-out numbers, month names and weekday names.
# The tens include u'fifty' and u'ninety' so the number words are complete.
stop_words.extend([u"twenty", u"thirty", u"forty", u"fifty", u'sixty', u'seventy', u'eighty', u'ninety',
                   u'eleven', u'twelve', u'thirteen', u'fourteen', u'fifteen', u'sixteen', u'seventeen',
                   u'eighteen', u'nineteen'])

stop_words.extend([u'january', u'february', u'march', u'april', u'may', u'june', u'july', u'august',
                   u'september', u'october', u'november', u'december'])

stop_words.extend([u'monday', u'tuesday', u'wednesday', u'thursday', u'friday', u'saturday', u'sunday'])

In [None]:
# remove stopwords
sw_filter = StopWordsRemover()\
  .setStopWords(stop_words)\
  .setCaseSensitive(False)\
  .setInputCol("words")\
  .setOutputCol("filtered")
sw_filteredDF = sw_filter.transform(tokenized)

After removing stopwords, let's lemmatize the filtered tokens.

Lemmatization is the process of grouping together the different inflected forms of a word so they can be analysed as a single item. Lemmatization is similar to stemming but it brings context to the words. So it links words with similar meaning to one word. https://www.geeksforgeeks.org/python-lemmatization-with-nltk/

In [None]:
# build class to lemmatize the filtered token
class lemmatizer():
  """Append a column of WordNet-lemmatized tokens to a Spark DataFrame.

  Usage: ``lemmatizer().transform(df, "filtered", "lemmatized")`` returns a new
  DataFrame with every original column plus ``Output``, the list of lemmas of the
  token list stored in column ``Input``.
  """

  def lemmatize(self, row, Input, Output, wnl=None):
    """Return a new Row equal to ``row`` plus column ``Output`` = lemmas of ``row[Input]``.

    ``wnl`` is an optional, already-constructed ``WordNetLemmatizer``. When it is
    omitted, the WordNet corpus is downloaded and a lemmatizer is built for this
    single call. That is expensive, so ``transform`` passes one in.
    """
    if wnl is None:
      nltk.download('wordnet')
      wnl = WordNetLemmatizer()
    # Row is immutable, so copy it into a dict, add the new column, and rebuild it.
    rowDict = row.asDict()
    rowDict[Output] = [wnl.lemmatize(word) for word in rowDict[Input]]
    return Row(**rowDict)

  def _lemmatize_partition(self, rows, Input, Output):
    """Lemmatize an iterator of Rows, downloading WordNet and building the lemmatizer once."""
    nltk.download('wordnet')
    wnl = WordNetLemmatizer()
    for row in rows:
      yield self.lemmatize(row, Input, Output, wnl)

  def transform(self, data, Input, Output):
    """Return ``data`` with an extra array column ``Output`` holding lemmas of column ``Input``."""
    # mapPartitions (instead of map) amortizes the WordNet download and lemmatizer
    # construction over all rows of a partition rather than repeating it per row.
    lemmatizedDF = data.rdd.mapPartitions(
      lambda rows: self._lemmatize_partition(rows, Input, Output)).toDF()
    return lemmatizedDF

In [None]:
# Lemmatize the tokens in 'filtered' into a new 'lemmatized' column, then preview.
lemmatizedDF = lemmatizer().transform(data=sw_filteredDF, Input="filtered", Output="lemmatized")
lemmatizedDF.show()

After the above steps, let's build TF-IDF. For more information about TF-IDF, please see https://en.wikipedia.org/wiki/Tf%E2%80%93idf

In [None]:
# TF
# Count term occurrences of the lemmatized tokens into 'tf'. The vocabulary is capped
# at 2**17 terms, and minDF=5 keeps only terms that appear in at least 5 documents,
# so words appearing in 4 or fewer documents are dropped.
cv = CountVectorizer(vocabSize=2**17)\
  .setInputCol("lemmatized")\
  .setOutputCol("tf")\
  .setMinDF(5)

In [None]:
# IDF: re-weight the term counts in 'tf' into TF-IDF scores in 'tfidf'
idf = IDF().setInputCol("tf").setOutputCol("tfidf")

Combine TF and IDF into a pipeline and fit it

In [None]:
# TF & IDF after lemmatizer: fit the CountVectorizer and IDF stages, in order, on
# the lemmatized tokens. (The variable keeps its spelling; later cells use it.)
pipepline = Pipeline().setStages([cv, idf]).fit(lemmatizedDF)

In [None]:
# Apply the fitted TF-IDF pipeline, adding the 'tf' and 'tfidf' columns.
result_tfidf = pipepline.transform(dataset=lemmatizedDF)

# LDA Model

For more information about LDA model, please refer to https://en.wikipedia.org/wiki/Latent_Dirichlet_allocation

Now, let's build the LDA model -- keep tuning num_of_topics and maxIter

In [None]:
# Number of topics; also reused later when expanding each document's topic distribution.
num_of_topics = 7
# EM-optimized LDA over the TF-IDF vectors; per-document topic mixtures are written
# to 'topicDistributionCol'.
lda = LDA(
  k=num_of_topics,
  seed=12344,
  optimizer="em",
  featuresCol="tfidf",
  maxIter=100,
  topicDistributionCol="topicDistributionCol",
)
ldamodel = lda.fit(result_tfidf)
transformed = ldamodel.transform(result_tfidf)

After building the LDA model, the next step is to visualize its results.

Since each word is represented by an index, we need to convert the index back to the word. So, let's get the vocabulary of the CountVectorizer and turn it into a dictionary for easy use later.

In [None]:
# Term-index -> word lookup built from the fitted CountVectorizer (pipeline stage 0).
vocab = pipepline.stages[0].vocabulary
vocab_dict = dict(enumerate(vocab))

In [None]:
# topics X words
class topic_describe():
  """Render an LDA model's topics as rows of (topic, top words, top weights)."""

  # Number of top-weighted words reported per topic.
  wordNumber = 5
  # Optional term-index -> word mapping; when None, the notebook-level `vocab_dict` is used.
  vocab = None

  def setWordNumber(self, num):
    """Set how many top-weighted words to report per topic."""
    self.wordNumber = num

  def setVocab(self, vocab):
    """Set the term-index -> word mapping used by ``topic_render``."""
    self.vocab = vocab

  def topic_render(self, row):
    """Convert one ``describeTopics`` row into ``[[topic], words, weights]``.

    ``words`` and ``weights`` are the ``wordNumber`` highest-weighted terms, in
    descending order of weight. The topic id is wrapped in a one-element list, so
    the resulting ``topic`` column is an array.
    """
    vocab = self.vocab if self.vocab is not None else vocab_dict
    # Keep (index, weight) pairs in a list: rebuilding a dict from the sorted pairs
    # would lose the descending-weight order on Python 2.
    pairs = sorted(zip(row.termIndices, row.termWeights),
                   key=operator.itemgetter(1), reverse=True)[:self.wordNumber]
    termwords = [vocab[index] for index, _ in pairs]
    termweights = [weight for _, weight in pairs]
    return [[row.topic], termwords, termweights]

  def topic_by_word(self, ldamodel):
    """Return a DataFrame with columns topic, termwords, termweights for every topic."""
    # describeTopics returns only 10 terms per topic by default; request wordNumber
    # so that setWordNumber values above 10 are honored.
    topicIndices = ldamodel.describeTopics(maxTermsPerTopic=self.wordNumber)
    topics_final = topicIndices.rdd.map(self.topic_render).toDF(["topic", "termwords", "termweights"])
    return topics_final


Now let's display topics X words

In [None]:
# Show the 7 highest-weighted words of every topic as a pandas table.
topicresult = topic_describe()
topicresult.setWordNumber(7)  # setting how many words to describe topics
topics_pdf = topicresult.topic_by_word(ldamodel).toPandas()
display(topics_pdf)

We also need to get Documents X Topics by FILENAME

In [None]:
class docuByTopicRender():
  """Turn an LDA topic-distribution vector column into per-document topic dicts.

  Configure with the setters, then call ``docuByTopic(df)``. It returns a DataFrame
  with columns ``FILENAME`` and ``OutputCol``. ``OutputCol`` maps ``'Topic i'`` to
  the document's value for topic ``i``, keeping only entries >= ``cutoff``.
  """
  # Number of topics to expand (columns 'Topic 0' .. 'Topic <TopicNumber-1>').
  TopicNumber = 0
  # Name of the vector column holding each document's topic distribution.
  InputCol = ''
  # Name of the output column holding the filtered {topic: probability} dict.
  OutputCol = ''
  # Minimum probability for a topic to be kept in the output dict.
  cutoff = 0

  def setTopicNumber(self, num):
    """Set how many topics to read from the distribution vector."""
    self.TopicNumber = num

  def setInputCol(self, name):
    """Set the name of the topic-distribution vector column."""
    self.InputCol = name

  def setOutputCol(self, name):
    """Set the name of the resulting {topic: probability} column."""
    self.OutputCol = name

  def setCutoff(self, number):
    """Set the minimum probability a topic needs to be listed."""
    self.cutoff = number

  def topicDistConvertCols(self, data):
    """Return ``data`` with float columns 'Topic 0'..'Topic N-1' taken from ``InputCol``."""
    def element_getter(index):
      # A fresh closure per call, so each UDF keeps its own index. A lambda written
      # directly in the loop would read the loop variable late and could pick up a
      # later iteration's value.
      return lambda v: float(v[index])

    for i in range(self.TopicNumber):
      ithelement = udf(element_getter(i), FloatType())
      data = data.withColumn('Topic %s' % i, ithelement(self.InputCol))
    return data

  def topicColsCombined(self, row):
    """Return ``[row['FILENAME'], {topic: prob}]`` with only topics whose prob >= cutoff."""
    topicDoc = {}
    for i in range(self.TopicNumber):
      topic = 'Topic %s' % i
      prob = row[topic]
      if prob >= self.cutoff:
        topicDoc[topic] = prob
    return [row['FILENAME'], topicDoc]

  def docuByTopic(self, data):
    """Return a DataFrame of (FILENAME, OutputCol) built from ``data``'s topic distributions."""
    transformedWithTopics = self.topicDistConvertCols(data)
    return transformedWithTopics.rdd.map(self.topicColsCombined).toDF(['FILENAME', self.OutputCol])


Let's display Documents X Topics

In [None]:
# Configure the renderer: which vector column to read, what to call the result,
# how many topics to expand, and the minimum probability a topic needs to be listed.
docAndTopic = docuByTopicRender()
docAndTopic.setInputCol('topicDistributionCol')
docAndTopic.setOutputCol('topicDoc')
docAndTopic.setTopicNumber(num_of_topics)
docAndTopic.setCutoff(0.1)  # only list topics whose probability is at least this cutoff
dff = docAndTopic.docuByTopic(transformed)
display(dff.toPandas())

We haven't finished yet: we should keep tuning the LDA model based on the results until it is optimized.