In [1]:
# import required libraries and functions
import requests
from pyspark.sql.functions import countDistinct, avg, stddev
from pyspark.sql import SparkSession
import pandas as pd
# plotting 
import seaborn as sns
import matplotlib.pyplot as plt
sns.set(style="whitegrid")
from pandas import datetime


In [2]:
# download data from source
# data source: https://s3.amazonaws.com/amazon-reviews-pds/tsv/index.txt

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_multilingual_US_v1_00.tsv.gz"
target_path = "amazon_reviews_multilingual_US_v1_00.tsv.gz"
response = requests.get(url, stream=True)
if response.status_code == 200:
    with open(target_path, 'wb') as f:
        f.write(response.raw.read())
else:
  print("Unable to download file")

In [3]:
%sh
gunzip amazon_reviews_multilingual_US_v1_00.tsv.gz

In [4]:
# moving data from driver node to dbfs file system
dbutils.fs.mv("file:/databricks/driver/amazon_reviews_multilingual_US_v1_00.tsv", "dbfs:/tmp/amazon_reviews_multilingual_US_v1_00.tsv")  

In [5]:
# load data on spark cluster
df = spark.read.format("csv").option("header","true").option("inferSchema", "true").option("sep", "\t").load("dbfs:/tmp/amazon_reviews_multilingual_US_v1_00.tsv")

In [6]:
# print first line of the data
df.take(1)

In [7]:
# total no. of reviews
df.count()

In [8]:
# Number of distinct product titles
df.select(countDistinct("product_title").alias("Distinct products")).show()

In [9]:
# Number of different product caetegories
df.select(countDistinct("product_category").alias("Distinct product category")).show()

In [10]:
# total number of unique cutsomers
df.select(countDistinct("customer_id").alias("Number of Customers ")).show()

In [11]:
# check for null values
from pyspark.sql.functions import isnull, when, count, col
display(df.select([count(when(isnull(c), c)).alias(c) for c in df.columns]))

marketplace,customer_id,review_id,product_id,product_parent,product_title,product_category,star_rating,helpful_votes,total_votes,vine,verified_purchase,review_headline,review_body,review_date
0,0,0,0,0,0,1,1,1,1,1,1,12,572,615


In [12]:
df = df.filter(df.star_rating.isNotNull())

In [13]:
# distribution of star ratings
display(df.groupby('star_rating').count())

star_rating,count
3,536717
5,4441940
1,406653
4,1266311
2,279544


In [14]:
display(df.groupby('star_rating').count().orderBy('star_rating'))

star_rating,count
1,406653
2,279544
3,536717
4,1266311
5,4441940


In [15]:
# reviews by product_category
display(df.groupby('product_category').count())

product_category,count
PC,57041
Lawn and Garden,1212
Kitchen,1843
Home Entertainment,36524
Home Improvement,3731
Home,2007
Wireless,22762
Video,46715
Digital_Video_Download,1058097
Luggage,78


In [16]:
# count of reviews by product category 
df_product = df.groupby(['product_category','product_title']).count().orderBy('product_category').toPandas()


In [17]:
# Product with maximum reviews and its product category
display(df_product)

product_category,product_title,count
,"Emma (Large Print)	Books	5	0	0	N	Y	Follows Austen's book closley	I have the movie with Ms Paltrow starring, so I bought the book to see how it matched. Very well done.	2012-12-22",1
Apparel,Swatch Faux Fox Black Dial Plastic Orange Silicone Quartz Men's Watch SUOB709,20
Apparel,Naruto: Frog Plush Coin Purse,2
Apparel,Lansky Standard Knife Sharpening System Kit,1
Apparel,Naruto Frog Coin Purse,6
Apparel,Michael Kors Women's Bradshaw Gold Bracelet Brown Dial Watch MK5696,33
Apparel,RAM Mounts RAM Double Socket Arm for 1in. Ball Bases RAM-B-201U,2
Apparel,Sony LCJHN Two Part Case for HX50 Camera,1
Apparel,Ricoh GC-5 Leather Case for Ricoh GR Camera Black,1
Apparel,Swatch Women's Menthol Tone LK292G Silver Stainless Steel Swiss Quartz with Silver Dial,37


In [18]:
# average star rating for every product category
category_wise_rating = df.select('product_category','star_rating')
category_avg_rating_df = category_wise_rating.groupby('product_category').agg({'star_rating':'avg'}).toPandas()

In [19]:
# average star rating for product category
display(category_avg_rating_df)

product_category,avg(star_rating)
PC,4.197471993829001
Lawn and Garden,4.128712871287129
Kitchen,3.9934888768312535
Home Entertainment,4.036797722045778
Home Improvement,4.131064057893326
Home,4.052316890881913
Wireless,4.031939196907126
Video,4.2412715401905166
Digital_Video_Download,4.241790686487156
Luggage,4.064102564102564


In [20]:
# plot product category average star rating sorted in desceding order
sns.set(rc={'figure.figsize':(20, 10)})
sorted_df = category_avg_rating_df.sort_values(['avg(star_rating)'], ascending=False).reset_index(drop=True)
sns.barplot(x="avg(star_rating)", y="product_category", data=sorted_df,
            label="Total", color="green")
display(sns.despine())

In [21]:

prod_cat_votes_df = df.groupby('product_category').agg({'total_votes':'avg', 'helpful_votes':'avg'}).toPandas()
prod_cat_votes_sorted_df = prod_cat_votes_df.sort_values(['avg(total_votes)'], ascending=False).reset_index(drop=True)
prod_cat_votes_sorted_df_20 = prod_cat_votes_sorted_df[:20]
display(prod_cat_votes_sorted_df_20)



product_category,avg(total_votes),avg(helpful_votes)
Mobile_Electronics,9.076086956521738,6.967391304347826
Video,6.967055549609333,3.915102215562453
Books,6.0892584644730565,4.022758702908917
Video DVD,4.690281396608216,2.541025229604535
Music,4.437005027577838,2.728022807096397
Office Products,3.894509295287505,3.41980112408128
Kitchen,3.252306022788931,2.8616386326641345
Mobile_Apps,3.243587509146653,2.5243312855227544
Electronics,3.1278471070774065,2.3934025656689064
Health & Personal Care,3.118580765639589,2.6022408963585435


In [22]:
# Average votes and helpful votes proportion for top 10 most voted product category
sns.set(rc={'figure.figsize':(16, 8)})
#fig, axs = plt.subplots(1,2,sharex='col', sharey='row')
prod_cat_hfvotes_sorted_df = prod_cat_votes_df.sort_values(['avg(helpful_votes)'], ascending=False).reset_index(drop=True)
prod_cat_hfvotes_sorted_df_10 = prod_cat_hfvotes_sorted_df[:10]
prod_cat_votes_sorted_df_10 = prod_cat_votes_sorted_df[:10]
p1 = sns.barplot(x="avg(total_votes)", y="product_category", data=prod_cat_votes_sorted_df_10, label = "Total",color="blue")
#display(sns.despine())
p2 = sns.barplot(x="avg(helpful_votes)", y="product_category", data=prod_cat_hfvotes_sorted_df_10, label = "Total",color="green")
display(sns.despine())

In [23]:
# number of reviews over the 20 years
from pyspark.sql.functions import col, year
display(df.withColumn("review_date", year(col("review_date"))).groupBy("review_date").count().orderBy('review_date'))

review_date,count
,615
1995.0,9
1996.0,192
1997.0,1753
1998.0,11274
1999.0,38859
2000.0,79138
2001.0,70521
2002.0,77795
2003.0,90169


In [24]:
#from pyspark.mllib.feature import HashingTF, IDF
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer

In [25]:
#parsed_review_headline_text = df.rdd.map(lambda line: line['review_headline'].lower().split(' ') if line['review_headline'] else [])
#parsed_review_headline_text.take(5)
tokenizer = Tokenizer(inputCol="review_headline", outputCol="headline_words")
tokenized = tokenizer.transform(df.filter(df.review_headline.isNotNull()))
remover = StopWordsRemover(inputCol="headline_words", outputCol="headline_words_filtered")
filtered_set = remover.transform(tokenized)

In [26]:
counts = filtered_set.rdd.flatMap(lambda line:line['headline_words_filtered']) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
headline_word_counts = counts.collect()

In [27]:
# top 20 most frequent words in review headline
display(sorted(headline_word_counts, key= lambda x:x[1], reverse=True)[:20])

_1,_2
stars,970184
five,707029
great,641899
good,357925
love,266289
book,220111
best,209509
movie,206368
game,191459
fun,187522


In [28]:
# tokenize the words in the review body for length distribution
body_tokenizer = Tokenizer(inputCol="review_body", outputCol="body_words")
tokenized_body = body_tokenizer.transform(df.filter(df.review_body.isNotNull()))

In [29]:
buckets = [b for b in range(0, 200, 10)]
rdd_histogram_data = tokenized_body.rdd.map(lambda line:len(line['body_words'])).histogram(buckets)

In [30]:
# distribution of length of words used by the customers in reviews
sns.set(rc={'figure.figsize':(16, 8)})
plt.hist(rdd_histogram_data[0][:-1], bins=rdd_histogram_data[0], weights=rdd_histogram_data[1])
display(plt.show())
