In [1]:
from pyspark.sql import functions as F

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1588718502363_0008,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
spark

<pyspark.sql.session.SparkSession object at 0x7ffb555ac890>

In [3]:
# Load Data Set
df = spark.read\
          .option("header", "true")\
          .option("inferSchema", "true")\
          .option("basePath", "hdfs:///hive/amazon-reviews-pds/parquet/")\
          .parquet("hdfs:///hive/amazon-reviews-pds/parquet/*")

In [4]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- product_category: string (nullable = true)

In [5]:
print(df.columns)

['marketplace', 'customer_id', 'review_id', 'product_id', 'product_parent', 'product_title', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase', 'review_headline', 'review_body', 'review_date', 'year', 'product_category']

### Keep only limited number of columns

In [6]:
#Year filter
columns_to_keep = ['customer_id', 'review_id', 'product_id', 'product_parent', 
                   'product_title', 'star_rating', 'helpful_votes', 'total_votes',
                   'verified_purchase', 'review_date', 'year', 'product_category']
df_limited = df.select(columns_to_keep).filter(F.col("year")>2004)

In [7]:
df_limited.printSchema()

root
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- product_category: string (nullable = true)

In [8]:
df_limited.rdd.getNumPartitions()

216

In [9]:
df_limited.count()

71519024

### Exclude Multiple Reviews and consider only one review for every customer_id, product_category and product_id 

In [10]:
from pyspark.sql.window import Window

df_final = df_limited.select("*", F.row_number().over(Window.partitionBy("customer_id","product_category","product_id").orderBy("customer_id")).alias("row_num")).where("row_num = 1")

df_final.show(10)

+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+-----------------+-----------+----+--------------------+-------+
|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|verified_purchase|review_date|year|    product_category|row_num|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+-----------------+-----------+----+--------------------+-------+
|   10000034|R18KORVQBM4KTQ|B0096YOL56|     837621444|Trident Case AEGI...|          5|            0|          1|                Y| 2013-10-27|2013|            Wireless|      1|
|   10000128|R10ZQ3KGV1I2XB|1421663260|     593136478|Golden Retrievers...|          4|            0|          0|                Y| 2011-01-04|2011|               Books|      1|
|   10000166| RKNBFD2B5J2KL|B002DQL34G|     827931632|He-Man and the Ma...|          5|            1|         

In [11]:
#Type
type(df_final)



<class 'pyspark.sql.dataframe.DataFrame'>

In [12]:
#Count records in the dataframe
df_final.count()

65911069

In [13]:
#Drop column row_num
df_final.drop("row_num")

DataFrame[customer_id: string, review_id: string, product_id: string, product_parent: string, product_title: string, star_rating: int, helpful_votes: int, total_votes: int, verified_purchase: string, review_date: date, year: int, product_category: string]

In [14]:
#Persist Dataframe - will keep the data in the memory as much as possible after first action
df_final.persist()
df_final.show(2)

+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+-----------------+-----------+----+----------------+-------+
|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|verified_purchase|review_date|year|product_category|row_num|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+-----------------+-----------+----+----------------+-------+
|   10000034|R18KORVQBM4KTQ|B0096YOL56|     837621444|Trident Case AEGI...|          5|            0|          1|                Y| 2013-10-27|2013|        Wireless|      1|
|   10000128|R10ZQ3KGV1I2XB|1421663260|     593136478|Golden Retrievers...|          4|            0|          0|                Y| 2011-01-04|2011|           Books|      1|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+-----------------

## Question 1

Explore the dataset and provide analysis by product-category and year:

 1. Number of reviews
 2. Number of users
 3. Average and Median review stars
 4. Percentiles of length of the review. Use the following percentiles: [0.1, 0.25, 0.5, 0.75, 0.9, 0.95]
 5. Percentiles for number of reviews per product. For example, 10% of books got 5 or less reviews. Use the following percentiles: [0.1, 0.25, 0.5, 0.75, 0.9, 0.95]
- Digital_Ebook_Purchase
- Books
- Wireless
- PC
- Mobile_Apps
- Video_DVD
- Digital_Video_Download

6. Identify week number (each year has 52 weeks) for each year and product category with most positive reviews (4 and 5 star).


In [15]:
#1. No of reviews in a given year for a particular product category sorted in descending order
df_final.groupby(F.col("product_category"), F.col("year"))\
.agg(F.count(F.col("review_id")).alias("no-of-reviews"))\
.sort("no-of-reviews", ascending =  False)\
.show(10)

+--------------------+----+-------------+
|    product_category|year|no-of-reviews|
+--------------------+----+-------------+
|Digital_Ebook_Pur...|2014|      6723857|
|Digital_Ebook_Pur...|2015|      4609422|
|Digital_Ebook_Pur...|2013|      4569668|
|               Books|2014|      3540849|
|            Wireless|2015|      3000790|
|               Books|2013|      2965949|
|               Books|2015|      2860728|
|            Wireless|2014|      2834083|
|                  PC|2014|      2008492|
|                  PC|2015|      1886149|
+--------------------+----+-------------+
only showing top 10 rows

In [16]:
#2. No of users in a given year for a particular product category sorted in descending order
df_final.groupby(F.col("product_category"), F.col("year"))\
.agg(F.count(F.col("customer_id")).alias("no-of-users"))\
.sort("no-of-users", ascending =  False)\
.show(10)

+--------------------+----+-----------+
|    product_category|year|no-of-users|
+--------------------+----+-----------+
|Digital_Ebook_Pur...|2014|    6723857|
|Digital_Ebook_Pur...|2015|    4609422|
|Digital_Ebook_Pur...|2013|    4569668|
|               Books|2014|    3540849|
|            Wireless|2015|    3000790|
|               Books|2013|    2965949|
|               Books|2015|    2860728|
|            Wireless|2014|    2834083|
|                  PC|2014|    2008492|
|                  PC|2015|    1886149|
+--------------------+----+-----------+
only showing top 10 rows

In [17]:
#3. Average and median review stars

#median review stars
colName = "star_rating"
quantileProbs = [0.5]
relError = 0.05

df_final.stat.approxQuantile("star_rating", quantileProbs, relError) 

[5.0]

In [18]:
#average review stars
df_final.groupby(F.col("product_category"), F.col("year"))\
.agg(F.avg(F.col("star_rating")).alias("avg_star_rating"))\
.sort("avg_star_rating", ascending =  False)\
.show(10)

+--------------------+----+------------------+
|    product_category|year|   avg_star_rating|
+--------------------+----+------------------+
|           Video_DVD|2015| 4.529908628845081|
|               Books|2015| 4.497382134897131|
|           Video_DVD|2014| 4.485465693220808|
|               Books|2014| 4.473276606825086|
|               Books|2013| 4.412498326842437|
|           Video_DVD|2013| 4.409182937819165|
|Digital_Ebook_Pur...|2015| 4.349698291889959|
|Digital_Ebook_Pur...|2014|4.3328144843056595|
|               Books|2012| 4.314687408696959|
|Digital_Ebook_Pur...|2013| 4.301699817142077|
+--------------------+----+------------------+
only showing top 10 rows

In [19]:
#4. Percentiles of length of reviews

sample = df.withColumn("length_of_reviews", F.length(F.col("review_body")))
sample.show(5)

colname = "length_of_reviews"
quantileProbs = [0.1, 0.25, 0.5, 0.75, 0.9, 0.95]
relError = 0.05

sample.stat.approxQuantile(colname, quantileProbs, relError)

+-----------+-----------+--------------+----------+--------------+-----------------------+-----------+-------------+-----------+----+-----------------+--------------------+----------------------------+-----------+----+----------------+-----------------+
|marketplace|customer_id|     review_id|product_id|product_parent|          product_title|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|                 review_body|review_date|year|product_category|length_of_reviews|
+-----------+-----------+--------------+----------+--------------+-----------------------+-----------+-------------+-----------+----+-----------------+--------------------+----------------------------+-----------+----+----------------+-----------------+
|         US|   16868299|R1UQX4QQ4IU00P|B005I6EU7U|      90774528|   Verso Kindle Fire...|          5|            0|          0|   N|                Y|A Perfect Fit and...|        I purchased the P...| 2012-06-29|2012|              PC|   

In [20]:
#5. Percentiles for number of reviews per product

sample1 = df_final.groupby(F.col("product_category"))\
                  .agg(F.count(F.col("review_id")).alias("review_count"))
sample1.show(5)

colname = "review_count"
quantileProbs = [0.1, 0.25, 0.5, 0.75, 0.9, 0.95]
relError = 0.05

sample1.stat.approxQuantile(colname, quantileProbs, relError)

+--------------------+------------+
|    product_category|review_count|
+--------------------+------------+
|                  PC|     6897944|
|            Wireless|     9002606|
|Digital_Video_Dow...|     4115479|
|Digital_Ebook_Pur...|    17923458|
|               Books|    17134822|
+--------------------+------------+
only showing top 5 rows

[4115479.0, 5331078.0, 6897944.0, 17134822.0, 17923458.0, 17923458.0]

In [21]:
#6. Identify week number of year and product category with most positive reviews

df_weekno = df_final.withColumn("week_number", F.date_format(F.to_date("review_date", "yyyy-MM-dd"), "w"))
df_weekno.show(5)

df_weekno.groupby(F.col("product_category"), F.col("year"), F.col("week_number"))\
.agg(F.count(F.expr("star_rating >= 4")).alias("number_of_positive_reviews"))\
.sort("number_of_positive_reviews", ascending = False)\
.show(10)


+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+-----------------+-----------+----+----------------+-------+-----------+
|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|verified_purchase|review_date|year|product_category|row_num|week_number|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+-----------------+-----------+----+----------------+-------+-----------+
|   10000034|R18KORVQBM4KTQ|B0096YOL56|     837621444|Trident Case AEGI...|          5|            0|          1|                Y| 2013-10-27|2013|        Wireless|      1|         44|
|   10000128|R10ZQ3KGV1I2XB|1421663260|     593136478|Golden Retrievers...|          4|            0|          0|                Y| 2011-01-04|2011|           Books|      1|          2|
|   10000166| RKNBFD2B5J2KL|B002DQL34G|     827931632|He-Man and the M

## Question 2

1. Using Spark Pivot functionality, produce DataFrame with following columns:
    1. Year
    2. Month
    3. Total number of reviews for "Digital eBook Purchase" category
    4. Total number of reviews for "Books" category
    5. Average stars for reviews for "Digital eBook Purchase" category
    6. Average stars for reviews for "Books" category

2. Produce two graphs to demonstrate aggregations from #1:
    1. Number of reviews
    2. Average stars

3. Identify similar products (books) in both categories. Use "product_title" to match products. To account for potential differences in naming of products, compare titles after stripping spaces and converting to lower case.
    1. Is there a difference in average rating for the similar books in digital and printed form?
    2. To answer #1, you may calculate number of items with high stars in digital form versus printed form, and vise versa.    Alternatively, you can make the conclusion by using appropriate pairwise statistic.

4. Using provided LDA starter notebook, perform LDA topic modeling for the reviews in Digital_Ebook_Purchase and Books categories. Consider reviews for the January of 2015 only.
    1. Perform LDA separately for reviews with 1/2 stars and reviews with 4/5 stars.
    2. Add stop words to the standard list as needed. In the example notebook, you can see some words like 34, br, p appear in the topics.
    3. Identify 5 top topics for each case (1/2 versus 4/5)
    4. Does topic modeling provides good approximation to number of stars given in the review?


In [22]:
#1. Using Spark Pivot Functionality, produce a dataframe with relevant columns

dfwithMonth = df_final.withColumn("month", F.month(F.col("review_date")))

categories_to_pivot = ['Digital_Ebook_Purchase','Books']

df_pivoted = dfwithMonth.groupby(F.col("year")).pivot("product_category", categories_to_pivot)\
.agg(F.count(F.col("review_id")).alias("number_of_reviews"),
     F.avg(F.col("star_rating")).alias("avg_review_stars"))

df_pivoted.show(10)
df_pivoted.cache()

+----+----------------------------------------+---------------------------------------+-----------------------+----------------------+
|year|Digital_Ebook_Purchase_number_of_reviews|Digital_Ebook_Purchase_avg_review_stars|Books_number_of_reviews|Books_avg_review_stars|
+----+----------------------------------------+---------------------------------------+-----------------------+----------------------+
|2007|                                     508|                      3.938976377952756|                 761029|     4.258168873985091|
|2015|                                 4609422|                      4.349698291889959|                2860728|     4.497382134897131|
|2006|                                      36|                      4.027777777777778|                 568385|     4.196546355023443|
|2013|                                 4569668|                      4.301699817142077|                2965949|     4.412498326842437|
|2014|                                 6723857|        

In [23]:
#2. Produce two graphs to demonstrate aggregations from #1

df_pivoted.coalesce(1).write.csv(path = 'hdfs:////user/livy/df_pivoted.csv', header = 'true')
#hdfs dfs -copyToLocal /user/livy/df_pivoted.csv ~/

In [25]:
#3. part-1

for i in df_final.columns:
    df_final = df_final.withColumn(i, F.ltrim(F.rtrim(df_final[i])))


df_final = df_final.withColumn('product_title', F.lower(F.col('product_title')))
df_final.show(5)

+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+-----------------+-----------+----+----------------+-------+
|customer_id|     review_id|product_id|product_parent|       product_title|star_rating|helpful_votes|total_votes|verified_purchase|review_date|year|product_category|row_num|
+-----------+--------------+----------+--------------+--------------------+-----------+-------------+-----------+-----------------+-----------+----+----------------+-------+
|   10000034|R18KORVQBM4KTQ|B0096YOL56|     837621444|trident case aegi...|          5|            0|          1|                Y| 2013-10-27|2013|        Wireless|      1|
|   10000128|R10ZQ3KGV1I2XB|1421663260|     593136478|golden retrievers...|          4|            0|          0|                Y| 2011-01-04|2011|           Books|      1|
|   10000166| RKNBFD2B5J2KL|B002DQL34G|     827931632|he-man and the ma...|          5|            1|          1|                Y

In [26]:
#3. part-2

df_books = df_final.select("*").where(F.col('product_category').like('%Books%'))
df_books = df_books.groupBy('product_title').agg(F.avg('star_rating'))

df_ebooks = df_final.select("*").where(F.col('product_category').like('%Digital_Ebook_Purchase%'))
df_ebooks = df_ebooks.groupBy('product_title').agg(F.avg('star_rating'))

In [27]:
df_books.show(5)

+--------------------+-----------------+
|       product_title| avg(star_rating)|
+--------------------+-----------------+
|practice makes pe...|            4.625|
|education and the...|              5.0|
|existentialism: a...|              3.5|
|dispatches from t...|4.534351145038168|
|particular scanda...|4.583333333333333|
+--------------------+-----------------+
only showing top 5 rows

In [28]:
df_ebooks.show(5)

+--------------------+-----------------+
|       product_title| avg(star_rating)|
+--------------------+-----------------+
|         xavier grey|              3.0|
|zombies ate my ne...|4.285714285714286|
|ceo's expectant s...|            3.625|
|simon: new orlean...|4.532846715328467|
|deadlocked 1 (dea...| 4.40460947503201|
+--------------------+-----------------+
only showing top 5 rows

In [29]:
#3. part-3

innerjoin = df_books.join(df_ebooks, df_books.product_title == df_ebooks.product_title)
innerjoin.show(5)

+--------------------+-----------------+--------------------+------------------+
|       product_title| avg(star_rating)|       product_title|  avg(star_rating)|
+--------------------+-----------------+--------------------+------------------+
|"rays of light": ...|              5.0|"rays of light": ...|               5.0|
|"the siege of khe...|4.315789473684211|"the siege of khe...| 3.326923076923077|
|          'dem bon'z|              5.0|          'dem bon'z|               5.0|
|   0400 roswell time|              5.0|   0400 roswell time|3.6666666666666665|
|10 smart things g...|              4.8|10 smart things g...| 4.833333333333333|
+--------------------+-----------------+--------------------+------------------+
only showing top 5 rows

There is a difference between average star ratings for the books in digital and printed form. 

## Amazon Reviews LDA

In [30]:
#Import ML Libraries

from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import CountVectorizer, IDF,RegexTokenizer, Tokenizer
from pyspark.sql.types import ArrayType
from pyspark.sql.types import StringType
from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.sql.functions import struct
import re
from pyspark.ml.feature import StopWordsRemover
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer

An error was encountered:
Session 0 unexpectedly reached final status 'dead'. See logs:
stdout: 

stderr: 
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_90 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_6 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_14 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_44 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_17 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_102 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_162 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_36 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_71 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpo

In [31]:
#For star rating 4 and 5

df_ml = df.filter((F.col("product_category")=="Digital_Ebook_Purchase") | (F.col("product_category")=="Books"))
                   & (F.col("year")==2015) \
                   & (F.col("review_date")<'2015-02-01')
                   & (F.col("star_rating")>3))

An error was encountered:
Session 0 unexpectedly reached final status 'dead'. See logs:
stdout: 

stderr: 
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_90 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_6 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_14 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_44 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_17 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_102 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_162 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_36 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_39_71 !
20/05/05 23:45:15 WARN BlockManagerMasterEndpo

## Create new DF with only narrative and unique ID

In [None]:
#from pyspark.sql.functions import monotonically_increasing_id, concat

df1 = df_ml.withColumn('review_text', 
                       F.concat(F.col('review_headline'),F.lit(' '), F.col('review_body')))
corpus =df1.select('review_text')

# This will return a new DF with all the columns + id
corpus_df = corpus.withColumn("id", F.monotonically_increasing_id())
# Remove records with no review text
corpus_df = corpus_df.dropna()

In [None]:
corpus_df.persist()
print('Corpus size:', corpus_df.count())
corpus_df.show(5)

In [None]:
corpus_df.printSchema()

## Tokenize Narrative text

In [None]:
tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
countTokens = udf(lambda words: len(words), IntegerType())
'''
tokenized_df = tokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words").withColumn("tokens", countTokens(col("words"))).show() 
'''
regexTokenizer = RegexTokenizer(inputCol="review_text", 
                                outputCol="words",pattern="\\w+", gaps=False)
# alternatively, pattern="\\w+", gaps(False) pattern="\\W"

tokenized_df = regexTokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words") \
    .withColumn("tokens", countTokens(F.col("words"))).show()

In [None]:
stop_words = ['a', 'about', 'above', 'across', 'after', 'afterwards', 'again', 'against', 'all', 'almost', 'alone', 'along', 'already', 'also', 'although', 'always', 'am', 'among', 'amongst', 'amoungst', 'amount', 'an', 'and', 'another', 'any', 'anyhow', 'anyone', 'anything', 'anyway', 'anywhere', 'are', 'around', 'as', 'at', 'back', 'be', 'became', 'because', 'become', 'becomes', 'becoming', 'been', 'before', 'beforehand', 'behind', 'being', 'below', 'beside', 'besides', 'between', 'beyond', 'bill', 'both', 'bottom', 'but', 'by', 'call', 'can', 'cannot', 'cant', 'co', 'computer', 'con', 'could', 'couldnt', 'cry', 'de', 'describe', 'detail', 'do', 'done', 'down', 'due', 'during', 'each', 'eg', 'eight', 'either', 'eleven', 'else', 'elsewhere', 'empty', 'enough', 'etc', 'even', 'ever', 'every', 'everyone', 'everything', 'everywhere', 'except', 'few', 'fifteen', 'fify', 'fill', 'find', 'fire', 'first', 'five', 'for', 'former', 'formerly', 'forty', 'found', 'four', 'from', 'front', 'full', 'further', 'get', 'give', 'go', 'had', 'has', 'hasnt', 'have', 'he', 'hence', 'her', 'here', 'hereafter', 'hereby', 'herein', 'hereupon', 'hers', 'herself', 'him', 'himself', 'his', 'how', 'however', 'hundred', 'i', 'ie', 'if', 'in', 'inc', 'indeed', 'interest', 'into', 'is', 'it', 'its', 'itself', 'keep', 'last', 'latter', 'latterly', 'least', 'less', 'ltd', 'made', 'many', 'may', 'me', 'meanwhile', 'might', 'mill', 'mine', 'more', 'moreover', 'most', 'mostly', 'move', 'much', 'must', 'my', 'myself', 'name', 'namely', 'neither', 'never', 'nevertheless', 'next', 'nine', 'no', 'nobody', 'none', 'noone', 'nor', 'not', 'nothing', 'now', 'nowhere', 'of', 'off', 'often', 'on', 'once', 'one', 'only', 'onto', 'or', 'other', 'others', 'otherwise', 'our', 'ours', 'ourselves', 'out', 'over', 'own', 'part', 'per', 'perhaps', 'please', 'put', 'rather', 're', 'same', 'see', 'seem', 'seemed', 'seeming', 'seems', 'serious', 'several', 'she', 'should', 'show', 'side', 'since', 'sincere', 'six', 'sixty', 'so', 'some', 'somehow', 'someone', 'something', 'sometime', 'sometimes', 'somewhere', 'still', 'such', 'system', 'take', 'ten', 'than', 'that', 'the', 'their', 'them', 'themselves', 'then', 'thence', 'there', 'thereafter', 'thereby', 'therefore', 'therein', 'thereupon', 'these', 'they', 'thick', 'thin', 'third', 'this', 'those', 'though', 'three', 'through', 'throughout', 'thru', 'thus', 'to', 'together', 'too', 'top', 'toward', 'towards', 'twelve', 'twenty', 'two', 'un', 'under', 'until', 'up', 'upon', 'us', 'very', 'via', 'was', 'we', 'well', 'were', 'what', 'whatever', 'when', 'whence', 'whenever', 'where', 'whereafter', 'whereas', 'whereby', 'wherein', 'whereupon', 'wherever', 'whether', 'which', 'while', 'whither', 'who', 'whoever', 'whole', 'whom', 'whose', 'why', 'will', 'with', 'within', 'without', 'would', 'yet', 'you', 'your', 'yours', 'yourself', 'yourselves', '', 'm', 'ich', 'y', 'zu']
stop_words = stop_words + ['br','book','34']

In [None]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tokenized_df1 = remover.transform(tokenized_df)
tokenized_df1.show(5)

stopwordList = stop_words

remover=StopWordsRemover(inputCol="filtered", outputCol="filtered_more" ,stopWords=stopwordList)
tokenized_df2 = remover.transform(tokenized_df1)
tokenized_df2.show(5)

## Vectorize (convert to numeric)

In [None]:
# Term Frequency Vectorization  - Option 2 (CountVectorizer)    : 
cv = CountVectorizer(inputCol="filtered_more", outputCol="features", vocabSize = 10000)
cvmodel = cv.fit(tokenized_df2)
featurized_df = cvmodel.transform(tokenized_df2)
vocab = cvmodel.vocabulary
featurized_df.select('filtered_more','features','id').show(5)

## This is our DF to train LDA model on

In [None]:
countVectors = featurized_df.select('features','id')
countVectors.persist()
print('Records in the DF:', countVectors.count())

## Train LDA model

In [None]:
#k=10 means 10 words per topic
lda = LDA(k=10, maxIter=10)
model = lda.fit(countVectors)

"""
ll = model.logLikelihood(countVectors)
lp = model.logPerplexity(countVectors)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(countVectors)
transformed.show(truncate=False)
"""

## Display words for top 10 topics

In [None]:
topics = model.describeTopics()   
topics_rdd = topics.rdd

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print ("topic: ", idx)
    print ("----------")
    for word in topic:
       print (word)
    print ("----------")

In [None]:
#For star rating 1 and 2

df_ml = df.filter((F.col("product_category")=="Digital_Ebook_Purchase") | (F.col("product_category")=="Books"))
                   & (F.col("year")==2015) \
                   & (F.col("review_date")<'2015-02-01')
                   & (F.col("star_rating")<3))

## Create new DF with only narrative and unique ID

In [None]:
#from pyspark.sql.functions import monotonically_increasing_id, concat

df1 = df_ml.withColumn('review_text', 
                       F.concat(F.col('review_headline'),F.lit(' '), F.col('review_body')))
corpus =df1.select('review_text')

# This will return a new DF with all the columns + id
corpus_df = corpus.withColumn("id", F.monotonically_increasing_id())
# Remove records with no review text
corpus_df = corpus_df.dropna()

In [None]:
corpus_df.persist()
print('Corpus size:', corpus_df.count())
corpus_df.show(5)

In [None]:
corpus_df.printSchema()

## Tokenize Narrative text

In [None]:
tokenizer = Tokenizer(inputCol="review_text", outputCol="words")
countTokens = udf(lambda words: len(words), IntegerType())
'''
tokenized_df = tokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words").withColumn("tokens", countTokens(col("words"))).show() 
'''
regexTokenizer = RegexTokenizer(inputCol="review_text", 
                                outputCol="words",pattern="\\w+", gaps=False)
# alternatively, pattern="\\w+", gaps(False) pattern="\\W"

tokenized_df = regexTokenizer.transform(corpus_df)
tokenized_df.select("review_text", "words") \
    .withColumn("tokens", countTokens(F.col("words"))).show()

In [None]:
stop_words = ['a', 'about', 'above', 'across', 'after', 'afterwards', 'again', 'against', 'all', 'almost', 'alone', 'along', 'already', 'also', 'although', 'always', 'am', 'among', 'amongst', 'amoungst', 'amount', 'an', 'and', 'another', 'any', 'anyhow', 'anyone', 'anything', 'anyway', 'anywhere', 'are', 'around', 'as', 'at', 'back', 'be', 'became', 'because', 'become', 'becomes', 'becoming', 'been', 'before', 'beforehand', 'behind', 'being', 'below', 'beside', 'besides', 'between', 'beyond', 'bill', 'both', 'bottom', 'but', 'by', 'call', 'can', 'cannot', 'cant', 'co', 'computer', 'con', 'could', 'couldnt', 'cry', 'de', 'describe', 'detail', 'do', 'done', 'down', 'due', 'during', 'each', 'eg', 'eight', 'either', 'eleven', 'else', 'elsewhere', 'empty', 'enough', 'etc', 'even', 'ever', 'every', 'everyone', 'everything', 'everywhere', 'except', 'few', 'fifteen', 'fify', 'fill', 'find', 'fire', 'first', 'five', 'for', 'former', 'formerly', 'forty', 'found', 'four', 'from', 'front', 'full', 'further', 'get', 'give', 'go', 'had', 'has', 'hasnt', 'have', 'he', 'hence', 'her', 'here', 'hereafter', 'hereby', 'herein', 'hereupon', 'hers', 'herself', 'him', 'himself', 'his', 'how', 'however', 'hundred', 'i', 'ie', 'if', 'in', 'inc', 'indeed', 'interest', 'into', 'is', 'it', 'its', 'itself', 'keep', 'last', 'latter', 'latterly', 'least', 'less', 'ltd', 'made', 'many', 'may', 'me', 'meanwhile', 'might', 'mill', 'mine', 'more', 'moreover', 'most', 'mostly', 'move', 'much', 'must', 'my', 'myself', 'name', 'namely', 'neither', 'never', 'nevertheless', 'next', 'nine', 'no', 'nobody', 'none', 'noone', 'nor', 'not', 'nothing', 'now', 'nowhere', 'of', 'off', 'often', 'on', 'once', 'one', 'only', 'onto', 'or', 'other', 'others', 'otherwise', 'our', 'ours', 'ourselves', 'out', 'over', 'own', 'part', 'per', 'perhaps', 'please', 'put', 'rather', 're', 'same', 'see', 'seem', 'seemed', 'seeming', 'seems', 'serious', 'several', 'she', 'should', 'show', 'side', 'since', 'sincere', 'six', 'sixty', 'so', 'some', 'somehow', 'someone', 'something', 'sometime', 'sometimes', 'somewhere', 'still', 'such', 'system', 'take', 'ten', 'than', 'that', 'the', 'their', 'them', 'themselves', 'then', 'thence', 'there', 'thereafter', 'thereby', 'therefore', 'therein', 'thereupon', 'these', 'they', 'thick', 'thin', 'third', 'this', 'those', 'though', 'three', 'through', 'throughout', 'thru', 'thus', 'to', 'together', 'too', 'top', 'toward', 'towards', 'twelve', 'twenty', 'two', 'un', 'under', 'until', 'up', 'upon', 'us', 'very', 'via', 'was', 'we', 'well', 'were', 'what', 'whatever', 'when', 'whence', 'whenever', 'where', 'whereafter', 'whereas', 'whereby', 'wherein', 'whereupon', 'wherever', 'whether', 'which', 'while', 'whither', 'who', 'whoever', 'whole', 'whom', 'whose', 'why', 'will', 'with', 'within', 'without', 'would', 'yet', 'you', 'your', 'yours', 'yourself', 'yourselves', '', 'm', 'ich', 'y', 'zu']
stop_words = stop_words + ['br','book','34']

In [None]:
remover = StopWordsRemover(inputCol="words", outputCol="filtered")
tokenized_df1 = remover.transform(tokenized_df)
tokenized_df1.show(5)

stopwordList = stop_words

remover=StopWordsRemover(inputCol="filtered", outputCol="filtered_more" ,stopWords=stopwordList)
tokenized_df2 = remover.transform(tokenized_df1)
tokenized_df2.show(5)

## Vectorize (convert to numeric)

In [None]:
# Term Frequency Vectorization  - Option 2 (CountVectorizer)    : 
cv = CountVectorizer(inputCol="filtered_more", outputCol="features", vocabSize = 10000)
cvmodel = cv.fit(tokenized_df2)
featurized_df = cvmodel.transform(tokenized_df2)
vocab = cvmodel.vocabulary
featurized_df.select('filtered_more','features','id').show(5)

## This is our DF to train LDA model on

In [None]:
countVectors = featurized_df.select('features','id')
countVectors.persist()
print('Records in the DF:', countVectors.count())

## Train LDA model

In [None]:
#k=10 means 10 words per topic
lda = LDA(k=10, maxIter=10)
model = lda.fit(countVectors)

"""
ll = model.logLikelihood(countVectors)
lp = model.logPerplexity(countVectors)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

# Describe topics.
topics = model.describeTopics(3)
print("The topics described by their top-weighted terms:")
topics.show(truncate=False)

# Shows the result
transformed = model.transform(countVectors)
transformed.show(truncate=False)
"""

## Display words for top 10 topics

In [None]:
topics = model.describeTopics()   
topics_rdd = topics.rdd

topics_words = topics_rdd\
       .map(lambda row: row['termIndices'])\
       .map(lambda idx_list: [vocab[idx] for idx in idx_list])\
       .collect()

for idx, topic in enumerate(topics_words):
    print ("topic: ", idx)
    print ("----------")
    for word in topic:
       print (word)
    print ("----------")