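# Topic Modeling

LDA topic models of Amazon product reviews, fitted separately on reviews written before and after the COVID-19 cutoff date (see the `before` flag below).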

## Prepare Data

In [15]:
# Attach daily COVID statistics to each review by matching review_date to the
# COVID table's `date`. The left outer join keeps every review; reviews with no
# matching COVID row (e.g. the 2019 rows in the preview below) get nulls, which
# are replaced by 0 in the four count columns.
# NOTE(review): assumes covid_cleaned has at most one row per date — otherwise
# the join duplicates reviews. Confirm against how covid_cleaned is built.
df = (
    amazon_cleaned
    .join(covid_cleaned, amazon_cleaned.review_date == covid_cleaned.date, "leftouter")
    .na.fill(value=0, subset=["increase_cases", "increase_deaths", "cases", "deaths"])
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# Days elapsed between the day the notebook runs and each review date.
# withColumn already names the new column, so no alias is needed.
tm_df = df.withColumn('datediff', datediff(current_date(), col("review_date")))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# Flag reviews by whether they fall after the COVID-19 cutoff date.
# NOTE: despite its name, `before` is 1 for reviews dated AFTER the cutoff and
# 0 for reviews on or before it; the before/after split below relies on this.
#
# The cutoff is a fixed calendar date rather than "N days before today": a
# current_date()-relative threshold drifts every day the notebook is re-run
# (and can even shift between Spark actions that straddle midnight).
# 2020-01-21 is the date the 864-day threshold implied on the recorded run:
# the preview shows datediff 1112 for 2019-05-18, i.e. a run on 2022-06-03,
# and 2022-06-03 minus 864 days is 2020-01-21.
from pyspark.sql.functions import col, datediff, lit

COVID_CUTOFF_DATE = "2020-01-21"
tm_df = tm_df.withColumn(
    'before',
    # datediff(end, start) > 0  <=>  review_date is strictly after the cutoff
    (datediff(col("review_date"), lit(COVID_CUTOFF_DATE)) > 0).cast("integer"),
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Preview the first 20 rows (long cells truncated) to sanity-check the COVID
# join, the datediff column and the before/after flag.
tm_df.show(n=20, truncate=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+----------+--------------------+-----+----------+--------------+-------------+----+--------------------+-----------------+-----------+----+-----+------+--------------+---------------+--------+------+
|index|product_id|               title|price|num_rating|overall_rating|helpful_votes|star|                text|  review_location|review_date|date|cases|deaths|increase_cases|increase_deaths|datediff|before|
+-----+----------+--------------------+-----+----------+--------------+-------------+----+--------------------+-----------------+-----------+----+-----+------+--------------+---------------+--------+------+
|    0|B00006IX59|Chuckit! Classic ...| 8.72|     46941|           4.7|          385| 4.0|"After I broke my...|the United States| 2019-05-18|null|    0|     0|             0|              0|    1112|     0|
|    1|B00006IX59|Chuckit! Classic ...| 8.72|     46941|           4.7|          909| 3.0|Some (pointed) th...|the United States| 2015-07-15|null|    0|     0|             

In [19]:
# Split the reviews into the two corpora that get separate topic models.
# The `before` flag is 1 for reviews AFTER the cutoff, so it is inverted
# relative to these variable names.
before = tm_df.where(tm_df["before"] == 0)
after = tm_df.where(tm_df["before"] == 1)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Before: reviews dated on or before the COVID-19 cutoff (`before == 0`)

In [20]:
# reference: https://medium.com/trustyou-engineering/topic-modelling-with-pyspark-and-spark-nlp-a99d063f1a6e
from pyspark.sql import functions as F

# Corpus for this section: the non-null review texts of the pre-cutoff reviews.
text_col = 'text'
data = before
review_text = data.where(F.col(text_col).isNotNull()).select(text_col)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Spark NLP stages: raw text -> document -> tokens -> lowercased/normalized
# tokens -> lemmas, then a Spark ML stop-word filter on the lemma strings.
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer
from sparknlp.annotator import Normalizer
from sparknlp.annotator import LemmatizerModel
from sparknlp.annotator import PerceptronModel
from sparknlp.annotator import Chunker

documentAssembler = DocumentAssembler() \
     .setInputCol(text_col) \
     .setOutputCol('document')

tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')

normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True)

# Downloads the default pretrained lemmatizer (lemma_antbnc in the run log).
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemmatized')

# Convert lemma annotations into a plain string-array column ('lemma') that the
# Spark ML StopWordsRemover below can consume.
finisher_lemma = Finisher()\
      .setInputCols("lemmatized").setOutputCols("lemma")

from pyspark.sql.functions import regexp_replace
# pyspark.ml.feature.Tokenizer is deliberately NOT imported: it would shadow
# the Spark NLP Tokenizer imported above, so any later `Tokenizer()` call would
# silently build a Spark ML stage instead of a Spark NLP annotator.
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF
stopwords_cleaner = StopWordsRemover(inputCol='lemma', outputCol='no_stop_lemmatized')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]

In [22]:
# Optional noun-phrase extraction stages (POS tagging + JJ/NN chunking).
# NOTE(review): none of pos_tagger, chunker or finisher is a stage of the
# pipeline built in the next cell, so the 'pos_anc' download is unused work;
# `finisher` also reads a 'unigrams' column that no stage here produces.
pos_tagger = (
    PerceptronModel.pretrained('pos_anc')
    .setInputCols(['document', 'lemmatized'])
    .setOutputCol('pos')
)

# Chunk patterns: adjective+noun(s) and noun+noun(s).
allowed_tags = ['<JJ>+<NN>', '<NN>+<NN>']
chunker = (
    Chunker()
    .setInputCols(['document', 'pos'])
    .setOutputCol('ngrams')
    .setRegexParsers(allowed_tags)
)

finisher = Finisher().setInputCols(['unigrams', 'ngrams'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]

In [23]:
from pyspark.ml import Pipeline

# Text-cleaning pipeline: assemble -> tokenize -> normalize -> lemmatize ->
# finish to strings -> drop stop words (result column: no_stop_lemmatized).
pipeline = Pipeline(stages=[
    documentAssembler,
    tokenizer,
    normalizer,
    lemmatizer,
    finisher_lemma,
    stopwords_cleaner,
])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Fit the text pipeline on the review corpus and apply it to that same corpus.
# processed_review is not cached here, so downstream actions recompute it.
nlp_model = pipeline.fit(review_text)
processed_review = nlp_model.transform(review_text)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
from pyspark.ml.feature import CountVectorizer, IDF

# Term counts over the stop-word-filtered lemmas. The fitted vocabulary
# (tf_model.vocabulary) maps term indices back to words.
tfizer = CountVectorizer(inputCol='no_stop_lemmatized', outputCol='tf_features')
tf_model = tfizer.fit(processed_review)
tf_result = tf_model.transform(processed_review)

# Re-weight the term counts by inverse document frequency.
# NOTE(review): Spark's LDA is documented as taking term-count vectors; fitting
# it on these TF-IDF weights departs from that — confirm this is intended.
idfizer = IDF(inputCol='tf_features', outputCol='tf_idf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
from pyspark.ml.clustering import LDA

# Topic-model hyper-parameters.
num_topics = 6
max_iter = 50

# NOTE(review): no explicit seed is passed; add seed=... to make the topics
# reproducible independently of Spark's default.
lda = LDA(featuresCol='tf_idf_features', k=num_topics, maxIter=max_iter)
lda_model = lda.fit(tfidf_result)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
import pyspark.sql.types as T

# Vocabulary of the fitted CountVectorizer: position i holds the word for term index i.
vocab = tf_model.vocabulary


def get_words(token_list):
    """Translate a list of CountVectorizer term indices into vocabulary words."""
    words = []
    for token_id in token_list:
        words.append(vocab[token_id])
    return words


udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Top terms per topic: describeTopics yields termIndices/termWeights, and
# udf_to_words adds the matching vocabulary words as topicWords.
num_top_words = 10
topic_terms = lda_model.describeTopics(maxTermsPerTopic=num_top_words)
before_topics = topic_terms.withColumn('topicWords', udf_to_words(F.col('termIndices')))
before_topics.show(truncate=100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----------------------------------------------------+----------------------------------------------------------------------------------------------------+------------------------------------------------------------------+
|topic|                                          termIndices|                                                                                         termWeights|                                                        topicWords|
+-----+-----------------------------------------------------+----------------------------------------------------------------------------------------------------+------------------------------------------------------------------+
|    0|                 [1, 70, 21, 4, 122, 5, 2, 3, 16, 93]|[0.006931628151790679, 0.005946793510152523, 0.005090178446993778, 0.004654223075063661, 0.004631...|         [cat, water, bag, food, filter, one, get, use, eat, open]|
|    1|                  [23, 1, 26, 4, 3, 55, 24, 0, 19, 9]|[0.0116608704095722

In [29]:
# Persist the pre-cutoff topics. mode("overwrite") keeps Restart & Run All
# working; the default save mode raises once the path already exists.
# NOTE(review): s3n:// is a legacy Hadoop S3 scheme — confirm it is still
# supported on the target cluster.
before_topics.write.mode("overwrite").parquet("s3n://lzhang19/lda/before")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def array_to_string(my_list):
    """Render an array as "[a,b,c]" (str() of each element) so it fits in one CSV cell.

    Returns None for a null array instead of raising a TypeError inside the UDF.
    """
    if my_list is None:
        return None
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'


array_to_string_udf = udf(array_to_string, StringType())

# The CSV writer cannot store array columns, so stringify each of them.
# Use a dedicated name rather than rebinding `df`, which still holds the
# joined review data that the "Prepare Data" cells read.
before_topics_csv = before_topics
for array_col in ('termIndices', 'termWeights', 'topicWords'):
    before_topics_csv = before_topics_csv.withColumn(
        array_col, array_to_string_udf(before_topics_csv[array_col])
    )

# coalesce(1) -> a single part file inside the directory "before.csv".
# header=True keeps the column names; overwrite so re-runs do not fail.
before_topics_csv.coalesce(1).write.mode('overwrite').csv(
    's3n://lzhang19/output/before.csv', header=True
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## After: reviews dated after the COVID-19 cutoff (`before == 1`)

In [31]:
# reference: https://medium.com/trustyou-engineering/topic-modelling-with-pyspark-and-spark-nlp-a99d063f1a6e
# Same preprocessing as the "Before" section, applied to the post-cutoff reviews.
from pyspark.sql import functions as F
data = after
text_col = 'text'
review_text = data.select(text_col).filter(F.col(text_col).isNotNull())

from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer
from sparknlp.annotator import Normalizer
from sparknlp.annotator import LemmatizerModel
from sparknlp.annotator import PerceptronModel
from sparknlp.annotator import Chunker

documentAssembler = DocumentAssembler() \
     .setInputCol(text_col) \
     .setOutputCol('document')

tokenizer = Tokenizer() \
     .setInputCols(['document']) \
     .setOutputCol('tokenized')

normalizer = Normalizer() \
     .setInputCols(['tokenized']) \
     .setOutputCol('normalized') \
     .setLowercase(True)

# Downloads the default pretrained lemmatizer (lemma_antbnc in the run log).
lemmatizer = LemmatizerModel.pretrained() \
     .setInputCols(['normalized']) \
     .setOutputCol('lemmatized')

# Convert lemma annotations into a plain string-array column ('lemma') for the
# Spark ML StopWordsRemover below.
finisher_lemma = Finisher()\
      .setInputCols("lemmatized").setOutputCols("lemma")

from pyspark.sql.functions import regexp_replace
# pyspark.ml.feature.Tokenizer is deliberately NOT imported: it would shadow
# the Spark NLP Tokenizer imported above, so any later `Tokenizer()` call would
# silently build a Spark ML stage instead of a Spark NLP annotator.
from pyspark.ml.feature import StopWordsRemover, HashingTF, IDF
stopwords_cleaner = StopWordsRemover(inputCol='lemma', outputCol='no_stop_lemmatized')

# NOTE(review): pos_tagger, chunker and finisher are not stages of `pipeline`
# below, so the 'pos_anc' download is unused work; `finisher` also reads a
# 'unigrams' column that no stage here produces.
pos_tagger = PerceptronModel.pretrained('pos_anc') \
     .setInputCols(['document', 'lemmatized']) \
     .setOutputCol('pos')

allowed_tags = ['<JJ>+<NN>', '<NN>+<NN>']
chunker = Chunker() \
     .setInputCols(['document', 'pos']) \
     .setOutputCol('ngrams') \
     .setRegexParsers(allowed_tags)

finisher = Finisher() \
     .setInputCols(['unigrams', 'ngrams'])

from pyspark.ml import Pipeline
# Text-cleaning pipeline; result column: no_stop_lemmatized.
pipeline = Pipeline() \
     .setStages([documentAssembler,
                 tokenizer,
                 normalizer,
                 lemmatizer,
                 finisher_lemma,
                 stopwords_cleaner])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

lemma_antbnc download started this may take some time.
Approximate size to download 907.6 KB
[OK!]
pos_anc download started this may take some time.
Approximate size to download 3.9 MB
[OK!]

In [32]:
# Fit the text pipeline on the post-cutoff corpus and apply it to that corpus.
# processed_review is not cached here, so downstream actions recompute it.
nlp_model = pipeline.fit(review_text)
processed_review = nlp_model.transform(review_text)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [33]:
from pyspark.ml.feature import CountVectorizer, IDF

# Term counts over the stop-word-filtered lemmas; this section gets its own
# vocabulary (tf_model.vocabulary), independent of the Before section's.
tfizer = CountVectorizer(inputCol='no_stop_lemmatized', outputCol='tf_features')
tf_model = tfizer.fit(processed_review)
tf_result = tf_model.transform(processed_review)

# Re-weight the term counts by inverse document frequency.
# NOTE(review): Spark's LDA is documented as taking term-count vectors; fitting
# it on these TF-IDF weights departs from that — confirm this is intended.
idfizer = IDF(inputCol='tf_features', outputCol='tf_idf_features')
idf_model = idfizer.fit(tf_result)
tfidf_result = idf_model.transform(tf_result)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
from pyspark.ml.clustering import LDA

# Same hyper-parameters as the Before model so the two topic sets are comparable.
num_topics = 6
max_iter = 50

# NOTE(review): no explicit seed is passed; add seed=... to make the topics
# reproducible independently of Spark's default.
lda = LDA(featuresCol='tf_idf_features', k=num_topics, maxIter=max_iter)
lda_model = lda.fit(tfidf_result)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
import pyspark.sql.types as T

# Vocabulary of the post-cutoff CountVectorizer: position i holds the word for term index i.
vocab = tf_model.vocabulary


def get_words(token_list):
    """Translate a list of CountVectorizer term indices into vocabulary words."""
    words = []
    for token_id in token_list:
        words.append(vocab[token_id])
    return words


udf_to_words = F.udf(get_words, T.ArrayType(T.StringType()))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
# Top terms per topic: describeTopics yields termIndices/termWeights, and
# udf_to_words adds the matching vocabulary words as topicWords.
num_top_words = 10
topic_terms = lda_model.describeTopics(maxTermsPerTopic=num_top_words)
after_topics = topic_terms.withColumn('topicWords', udf_to_words(F.col('termIndices')))
after_topics.show(truncate=100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----+-----------------------------------------+----------------------------------------------------------------------------------------------------+-------------------------------------------------------------------+
|topic|                              termIndices|                                                                                         termWeights|                                                         topicWords|
+-----+-----------------------------------------+----------------------------------------------------------------------------------------------------+-------------------------------------------------------------------+
|    0|   [91, 3, 129, 9, 104, 7, 140, 2, 1, 22]|[0.0069457412946933896, 0.006816312454237421, 0.006464533416780946, 0.006368400629049005, 0.00589...|    [hair, use, flea, work, brush, product, spray, get, cat, smell]|
|    1|   [66, 130, 5, 62, 0, 71, 37, 183, 1, 2]|[0.012686116859810342, 0.006044027755829386, 0.0052425687352202, 0.00508208

In [37]:
# Persist the post-cutoff topics. mode("overwrite") keeps Restart & Run All
# working; the default save mode raises once the path already exists.
# NOTE(review): s3n:// is a legacy Hadoop S3 scheme — confirm it is still
# supported on the target cluster.
after_topics.write.mode("overwrite").parquet("s3n://lzhang19/lda/after")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


def array_to_string(my_list):
    """Render an array as "[a,b,c]" (str() of each element) so it fits in one CSV cell.

    Returns None for a null array instead of raising a TypeError inside the UDF.
    """
    if my_list is None:
        return None
    return '[' + ','.join([str(elem) for elem in my_list]) + ']'


array_to_string_udf = udf(array_to_string, StringType())

# The CSV writer cannot store array columns, so stringify each of them.
# Use a dedicated name rather than rebinding `df`, which should keep holding
# the joined review data that the "Prepare Data" cells read.
after_topics_csv = after_topics
for array_col in ('termIndices', 'termWeights', 'topicWords'):
    after_topics_csv = after_topics_csv.withColumn(
        array_col, array_to_string_udf(after_topics_csv[array_col])
    )

# coalesce(1) -> a single part file inside the directory "after.csv".
# header=True keeps the column names; overwrite so re-runs do not fail.
after_topics_csv.coalesce(1).write.mode('overwrite').csv(
    's3n://lzhang19/output/after.csv', header=True
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…