# Topic modelling using LDA with Lemmitizer

In [1]:
import os
import re

import numpy as np
import pandas as pd

from pyspark.sql.types import *
from pyspark.sql.functions import expr
from pyspark.sql import Row
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.clustering import LDA
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import functions as F
import sparknlp
from sparknlp import DocumentAssembler, Finisher
from sparknlp.annotator import *
from sparknlp.annotator import StopWordsCleaner
%matplotlib inline
from sparknlp.annotator import PerceptronModel
spark = sparknlp.start()
from pyspark.sql import types as T
from pyspark.ml.feature import CountVectorizer,StringIndexer, RegexTokenizer,StopWordsRemover
from sparknlp.annotator import NGramGenerator

In [2]:
dataPath = "C:/Users/saika/Desktop/Pyspark/data/shopee_reviews.csv"
df = spark.read.csv(dataPath, header='true', inferSchema = 'true')

In [3]:
df.show()

+--------------------+--------------------+
|               label|                text|
+--------------------+--------------------+
|                   5|Looks ok. Not lik...|
|                   5|Tried, the curren...|
|                   5|Item received aft...|
|                   5|Thanks!!! Works a...|
|                   5|Fast delivery con...|
|                   5|Fast delivery goo...|
|                   5|Got my order and ...|
|                   5|Items received in...|
|                   5|Received in good ...|
|                   1|Item doesn’t work . |
|Asked me to send ...| show a non worki...|
|Don’t waste time ...|                null|
|                   5|         Fast. Great|
|                   5|I've tried it, an...|
|                   5|Hub uses it. Musc...|
|                   5|Well received. Fa...|
|                   5|Product received....|
|                   5|             Good.. |
|                   5|box was a little ...|
|                   4|Fast deliv

In [4]:
df.dropna()
text_col = 'text'

text = df.select(text_col).filter(F.col(text_col).isNotNull())

text.limit(5).show(truncate=90)

+------------------------------------------------------------------------------------------+
|                                                                                      text|
+------------------------------------------------------------------------------------------+
|   Looks ok. Not like so durable. Will hv to use a while to recommend others of its worth.|
|Tried, the current can be very powerful depending on the setting, i don't dare to go hi...|
|               Item received after a week. Looks smaller than expected, can’t wait to try!|
|Thanks!!! Works as describe no complaints. Not really expecting any life changing resul...|
|Fast delivery considering it’s from overseas and only tried once. Not sure about the re...|
+------------------------------------------------------------------------------------------+



In [5]:
import nltk
nltk.download('stopwords')

from nltk.corpus import stopwords

eng_stopwords = stopwords.words('english')

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\saika\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [6]:
#lemmatizer = LemmatizerModel.pretrained()\
#   .setInputCols(['token'])\
#    .setOutputCol('lemma')

#norvig_pretrained = NorvigSweetingModel.pretrained()\
 #   .setInputCols(['tokens_filtered'])\
  #  .setOutputCol('norvig_annotations')

In [7]:
assembler = DocumentAssembler()\
    .setInputCol('text')\
    .setOutputCol('document')
sentence = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentences")
tokenizer = Tokenizer()\
    .setInputCols(['sentences'])\
    .setOutputCol('token')

#stemmer = Stemmer()\
 #   .setInputCols(['token'])\
  #  .setOutputCol('stem')
lemmatizer = LemmatizerModel.pretrained()\
   .setInputCols(['token'])\
    .setOutputCol('lemma')
normalizer = Normalizer()\
    .setCleanupPatterns([
        '[^a-zA-Z.-]+', 
        '^[^a-zA-Z]+', 
        '[^a-zA-Z]+$',
    ])\
    .setInputCols(['token'])\
    .setOutputCol('normalized')\
    .setLowercase(True)
sw_clean = StopWordsCleaner() \
     .setInputCols(['lemma']) \
     .setOutputCol('unigrams') \
     .setStopWords(eng_stopwords)

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]


In [8]:
ngrammer = NGramGenerator() \
    .setInputCols(['lemma']) \
    .setOutputCol('ngrams') \
    .setN(3) \
    .setEnableCumulative(True) \
    .setDelimiter('_')

In [9]:
pos_tagger = PerceptronModel.pretrained('pos_anc') \
    .setInputCols(['document', 'lemma']) \
    .setOutputCol('pos')

pos_anc download started this may take some time.
Approximate size to download 4.3 MB
[OK!]


In [10]:
finisher = Finisher() \
     .setInputCols(['unigrams', 'ngrams', 'pos']) \

In [11]:
from pyspark.ml import Pipeline

pipeline = Pipeline() \
     .setStages([assembler, sentence, tokenizer, lemmatizer, normalizer, sw_clean, ngrammer,pos_tagger, finisher])

In [12]:
processed = pipeline.fit(text).transform(text)


In [13]:
processed.limit(10).show()


+--------------------+--------------------+--------------------+--------------------+
|                text|   finished_unigrams|     finished_ngrams|        finished_pos|
+--------------------+--------------------+--------------------+--------------------+
|Looks ok. Not lik...|[Looks, ok, ., li...|[Looks, ok, ., Lo...|[NNP, JJ, ., RB, ...|
|Tried, the curren...|[Tried, ,, curren...|[Tried, ,, the, c...|[NNP, ,, DT, JJ, ...|
|Item received aft...|[Item, receive, w...|[Item, receive, a...|[NNP, VB, IN, DT,...|
|Thanks!!! Works a...|[Thanks, !!!, Wor...|[Thanks, !!!, Tha...|[NNS, ., NNP, IN,...|
|Fast delivery con...|[Fast, delivery, ...|[Fast, delivery, ...|[RB, NN, NN, NN, ...|
|Fast delivery goo...|[Fast, delivery, ...|[Fast, delivery, ...|    [RB, NN, JJ, NN]|
|Got my order and ...|[Got, order, come...|[Got, i, order, a...|[NNP, NNP, NN, CC...|
|Items received in...|[Items, receive, ...|[Items, receive, ...|[NNS, VBP, IN, DT...|
|Received in good ...|[Received, good, ...|[Received, 

In [14]:

from pyspark.ml.feature import CountVectorizer

tf = CountVectorizer(inputCol='finished_unigrams', outputCol='tf_features')
tf_model = tf.fit(processed)
tf_result = tf_model.transform(processed)

In [15]:
tf_result.select('tf_features').limit(10).show()

+--------------------+
|         tf_features|
+--------------------+
|(189545,[0,25,34,...|
|(189545,[0,2,7,13...|
|(189545,[0,2,3,5,...|
|(189545,[0,3,33,3...|
|(189545,[0,4,15,2...|
|(189545,[1,4,15,1...|
|(189545,[0,1,3,8,...|
|(189545,[0,2,3,5,...|
|(189545,[0,1,2,6,...|
|(189545,[0,14,27,...|
+--------------------+



In [16]:

from pyspark.ml.feature import IDF

idfizer = IDF(inputCol='tf_features', outputCol='tf_idf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

In [17]:
tfidf_result.limit(10).show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                text|   finished_unigrams|     finished_ngrams|        finished_pos|         tf_features|     tf_idf_features|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Looks ok. Not lik...|[Looks, ok, ., li...|[Looks, ok, ., Lo...|[NNP, JJ, ., RB, ...|(189545,[0,25,34,...|(189545,[0,25,34,...|
|Tried, the curren...|[Tried, ,, curren...|[Tried, ,, the, c...|[NNP, ,, DT, JJ, ...|(189545,[0,2,7,13...|(189545,[0,2,7,13...|
|Item received aft...|[Item, receive, w...|[Item, receive, a...|[NNP, VB, IN, DT,...|(189545,[0,2,3,5,...|(189545,[0,2,3,5,...|
|Thanks!!! Works a...|[Thanks, !!!, Wor...|[Thanks, !!!, Tha...|[NNS, ., NNP, IN,...|(189545,[0,3,33,3...|(189545,[0,3,33,3...|
|Fast delivery con...|[Fast, delivery, ...|[Fast, delivery, ...|[RB, NN, NN, NN, ...|(189545,[0,4,15,2..

In [18]:

from pyspark.ml.clustering import LDA

num_topics = 5
max_iter = 10

lda = LDA(k=num_topics, maxIter=max_iter, featuresCol='tf_idf_features')
lda_model = lda.fit(tfidf_result)

In [19]:

vocab = tf_model.vocabulary

def get_words(token_list):
     return [vocab[token_id] for token_id in token_list]
       
udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

# Top 5 topics with words

In [20]:
num_top_words = 10

topics = lda_model.describeTopics(num_top_words).withColumn('topicWords', udf_to_words(F.col('termIndices')))
topics.select('topic', 'topicWords').show(truncate=90)

+-----+--------------------------------------------------------------------+
|topic|                                                          topicWords|
+-----+--------------------------------------------------------------------+
|    0|               [., !, ,, good, receive, item, fast, week, buy, take]|
|    1|    [., !, good, ,, delivery, quality, Fast, fast, condition, price]|
|    2|[., good, !, ,, material, receive, Thanks, condition, Item, picture]|
|    3|       [., ,, receive, !, order, seller, delivery, well, good, time]|
|    4|          [., ,, good, !, product, buy, small, size, use, condition]|
+-----+--------------------------------------------------------------------+

