# Amazon reviews LDA topic modeling - for EMR

## The notebook:
- Train LDA model
- Identify top topics

### Create Spark Session

In [1]:
from pyspark.sql import functions as F

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
9,application_1588267976982_0011,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
spark

<pyspark.sql.session.SparkSession object at 0x7fa4a94aaac8>

## Import ML libraries

In [3]:
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import CountVectorizer, IDF,RegexTokenizer, Tokenizer
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import struct
import re
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer

### Load Retail data set

In [4]:
df = spark.read\
          .option("header", "true")\
          .option("inferSchema", "true")\
          .option("basePath", "hdfs:///hive/amazon-reviews-pds/parquet/")\
          .parquet("hdfs:///hive/amazon-reviews-pds/parquet/*")

In [5]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- product_category: string (nullable = true)

In [6]:
#df_ml = df.filter((F.col("product_category")=="Digital_Ebook_Purchase") | (F.col("product_category")=="Books"))
df_ml = df.filter((F.col("product_category")=="Digital_Ebook_Purchase") \
                   & (F.col("year")==2015) \
                   & (F.col("review_date")<'2015-02-01')
                   & (F.col("star_rating")>3))

## Create new DF with only narrative and unique ID

In [7]:
#from pyspark.sql.functions import monotonically_increasing_id, concat

df1 = df_ml.withColumn('review_text', 
                       F.concat(F.col('review_headline'),F.lit(' '), F.col('review_body')))
corpus =df1.select('review_text')

# This will return a new DF with all the columns + id
corpus_df = corpus.withColumn("id", F.monotonically_increasing_id())
# Remove records with no review text
corpus_df = corpus_df.dropna()

In [8]:
corpus_df.persist()
print('Corpus size:', corpus_df.count())
corpus_df.show(5)

Corpus size: 466817
+--------------------+----------+
|         review_text|        id|
+--------------------+----------+
|Great story! Ray ...|8589934592|
|Four Stars It's a...|8589934593|
|Four Stars Good r...|8589934594|
|Really enjoyed th...|8589934595|
|Five Stars Amazin...|8589934596|
+--------------------+----------+
only showing top 5 rows

In [9]:
corpus_df.printSchema()

root
 |-- review_text: string (nullable = true)
 |-- id: long (nullable = false)

## Tokenize Narrative text 

In [10]:
tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
countTokens = udf(lambda words: len(words), IntegerType())
'''
tokenized_df = tokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words").withColumn("tokens", countTokens(col("words"))).show() 
'''
regexTokenizer = RegexTokenizer(inputCol="review_text", 
                                outputCol="words",pattern="\\w+", gaps=False)
# alternatively, pattern="\\w+", gaps(False) pattern="\\W"

tokenized_df = regexTokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words") \
    .withColumn("tokens", countTokens(F.col("words"))).show()

+--------------------+--------------------+------+
|         review_text|               words|tokens|
+--------------------+--------------------+------+
|Great story! Ray ...|[great, story, ra...|    29|
|Four Stars It's a...|[four, stars, it,...|    17|
|Four Stars Good r...|[four, stars, goo...|     6|
|Really enjoyed th...|[really, enjoyed,...|    29|
|Five Stars Amazin...|[five, stars, ama...|     7|
|Five Stars excell...|[five, stars, exc...|     4|
|Good read The las...|[good, read, the,...|    24|
|Five Stars Great ...|[five, stars, gre...|     8|
|Five Stars Best h...|[five, stars, bes...|     8|
|Loved it I read e...|[loved, it, i, re...|    17|
|Four Stars Easy r...|[four, stars, eas...|    11|
|wow I Should have...|[wow, i, should, ...|    23|
|Five Stars If you...|[five, stars, if,...|    14|
|Really Great Read...|[really, great, r...|    20|
|Inspirational. Fo...|[inspirational, f...|    53|
|Five Stars This i...|[five, stars, thi...|    13|
|Good read! Very g...|[good, re

## Get stop words list

In [11]:
stop_words = ['a', 'about', 'above', 'across', 'after', 'afterwards', 'again', 'against', 'all', 'almost', 'alone', 'along', 'already', 'also', 'although', 'always', 'am', 'among', 'amongst', 'amoungst', 'amount', 'an', 'and', 'another', 'any', 'anyhow', 'anyone', 'anything', 'anyway', 'anywhere', 'are', 'around', 'as', 'at', 'back', 'be', 'became', 'because', 'become', 'becomes', 'becoming', 'been', 'before', 'beforehand', 'behind', 'being', 'below', 'beside', 'besides', 'between', 'beyond', 'bill', 'both', 'bottom', 'but', 'by', 'call', 'can', 'cannot', 'cant', 'co', 'computer', 'con', 'could', 'couldnt', 'cry', 'de', 'describe', 'detail', 'do', 'done', 'down', 'due', 'during', 'each', 'eg', 'eight', 'either', 'eleven', 'else', 'elsewhere', 'empty', 'enough', 'etc', 'even', 'ever', 'every', 'everyone', 'everything', 'everywhere', 'except', 'few', 'fifteen', 'fify', 'fill', 'find', 'fire', 'first', 'five', 'for', 'former', 'formerly', 'forty', 'found', 'four', 'from', 'front', 'full', 'further', 'get', 'give', 'go', 'had', 'has', 'hasnt', 'have', 'he', 'hence', 'her', 'here', 'hereafter', 'hereby', 'herein', 'hereupon', 'hers', 'herself', 'him', 'himself', 'his', 'how', 'however', 'hundred', 'i', 'ie', 'if', 'in', 'inc', 'indeed', 'interest', 'into', 'is', 'it', 'its', 'itself', 'keep', 'last', 'latter', 'latterly', 'least', 'less', 'ltd', 'made', 'many', 'may', 'me', 'meanwhile', 'might', 'mill', 'mine', 'more', 'moreover', 'most', 'mostly', 'move', 'much', 'must', 'my', 'myself', 'name', 'namely', 'neither', 'never', 'nevertheless', 'next', 'nine', 'no', 'nobody', 'none', 'noone', 'nor', 'not', 'nothing', 'now', 'nowhere', 'of', 'off', 'often', 'on', 'once', 'one', 'only', 'onto', 'or', 'other', 'others', 'otherwise', 'our', 'ours', 'ourselves', 'out', 'over', 'own', 'part', 'per', 'perhaps', 'please', 'put', 'rather', 're', 'same', 'see', 'seem', 'seemed', 'seeming', 'seems', 'serious', 'several', 'she', 'should', 'show', 'side', 'since', 'sincere', 'six', 'sixty', 'so', 'some', 'somehow', 'someone', 'something', 'sometime', 'sometimes', 'somewhere', 'still', 'such', 'system', 'take', 'ten', 'than', 'that', 'the', 'their', 'them', 'themselves', 'then', 'thence', 'there', 'thereafter', 'thereby', 'therefore', 'therein', 'thereupon', 'these', 'they', 'thick', 'thin', 'third', 'this', 'those', 'though', 'three', 'through', 'throughout', 'thru', 'thus', 'to', 'together', 'too', 'top', 'toward', 'towards', 'twelve', 'twenty', 'two', 'un', 'under', 'until', 'up', 'upon', 'us', 'very', 'via', 'was', 'we', 'well', 'were', 'what', 'whatever', 'when', 'whence', 'whenever', 'where', 'whereafter', 'whereas', 'whereby', 'wherein', 'whereupon', 'wherever', 'whether', 'which', 'while', 'whither', 'who', 'whoever', 'whole', 'whom', 'whose', 'why', 'will', 'with', 'within', 'without', 'would', 'yet', 'you', 'your', 'yours', 'yourself', 'yourselves', '']
stop_words = stop_words + ['br','book','34']

In [12]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tokenized_df1 = remover.transform(tokenized_df)
tokenized_df1.show(5)

stopwordList = stop_words

remover=StopWordsRemover(inputCol="filtered", outputCol="filtered_more" ,stopWords=stopwordList)
tokenized_df2 = remover.transform(tokenized_df1)
tokenized_df2.show(5)

+--------------------+----------+--------------------+--------------------+
|         review_text|        id|               words|            filtered|
+--------------------+----------+--------------------+--------------------+
|Great story! Ray ...|8589934592|[great, story, ra...|[great, story, ra...|
|Four Stars It's a...|8589934593|[four, stars, it,...|[four, stars, gre...|
|Four Stars Good r...|8589934594|[four, stars, goo...|[four, stars, goo...|
|Really enjoyed th...|8589934595|[really, enjoyed,...|[really, enjoyed,...|
|Five Stars Amazin...|8589934596|[five, stars, ama...|[five, stars, ama...|
+--------------------+----------+--------------------+--------------------+
only showing top 5 rows

+--------------------+----------+--------------------+--------------------+--------------------+
|         review_text|        id|               words|            filtered|       filtered_more|
+--------------------+----------+--------------------+--------------------+--------------------+


## Vectorize (convert to numeric)

In [13]:
# Term Frequency Vectorization  - Option 2 (CountVectorizer)    : 
cv = CountVectorizer(inputCol="filtered_more", outputCol="features", vocabSize = 10000)
cvmodel = cv.fit(tokenized_df2)
featurized_df = cvmodel.transform(tokenized_df2)
vocab = cvmodel.vocabulary
featurized_df.select('filtered_more','features','id').show(5)

+--------------------+--------------------+----------+
|       filtered_more|            features|        id|
+--------------------+--------------------+----------+
|[great, story, ra...|(10000,[1,2,6,7,2...|8589934592|
|[stars, great, ge...|(10000,[0,2,9,32,...|8589934593|
|[stars, good, rea...|(10000,[0,4,9,137...|8589934594|
|[really, enjoyed,...|(10000,[7,10,11,1...|8589934595|
|[stars, amazing, ...|(10000,[0,9,29,97...|8589934596|
+--------------------+--------------------+----------+
only showing top 5 rows

## This is our DF to train LDA model on

In [14]:
countVectors = featurized_df.select('features','id')
countVectors.persist()
print('Records in the DF:', countVectors.count())

Records in the DF: 466817

## Train LDA model

In [15]:
#k=10 means 10 words per topic
lda = LDA(k=10, maxIter=10)
model = lda.fit(countVectors)

"""
ll = model.logLikelihood(countVectors)
lp = model.logPerplexity(countVectors)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(countVectors)
transformed.show(truncate=False)
"""

'\nll = model.logLikelihood(countVectors)\nlp = model.logPerplexity(countVectors)\nprint("The lower bound on the log likelihood of the entire corpus: " + str(ll))\nprint("The upper bound on perplexity: " + str(lp))\n\n# Describe topics.\ntopics = model.describeTopics(3)\nprint("The topics described by their top-weighted terms:")\ntopics.show(truncate=False)\n\n# Shows the result\ntransformed = model.transform(countVectors)\ntransformed.show(truncate=False)\n'

## Display words for top 10 topics

In [16]:
topics = model.describeTopics()   
topics_rdd = topics.rdd

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print ("topic: ", idx)
    print ("----------")
    for word in topic:
       print (word)
    print ("----------")

topic:  0
----------
love
story
read
loved
like
really
characters
good
life
m
----------
topic:  1
----------
read
great
story
love
series
good
books
reading
stars
characters
----------
topic:  2
----------
read
series
like
books
story
great
mystery
good
author
new
----------
topic:  3
----------
story
read
good
series
really
like
time
life
love
little
----------
topic:  4
----------
read
story
great
best
love
history
books
god
reading
wonderful
----------
topic:  5
----------
read
great
life
amazing
god
time
m
reading
story
stars
----------
topic:  6
----------
read
good
like
great
life
really
reading
author
people
time
----------
topic:  7
----------
great
read
recipes
easy
life
like
really
recommend
love
way
----------
topic:  8
----------
stars
good
loved
story
excellent
awesome
y
read
reading
series
----------
topic:  9
----------
read
characters
story
loved
love
reading
author
series
great
books
----------