In [1]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Get:9 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [76.0 kB]
Get:10 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:13 https://developer.download.nvi

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("WatchETL").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [3]:
from pyspark import SparkFiles
# Load data from S3 bucket into a DataFrame
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"), inferSchema=True, sep='\t')
df.show(10)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...| 2015-08-31|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...|         Watches|          5|    

In [4]:
# Total no of rows in this dataset
df.count()

960872

## Examine the schema

In [5]:
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [6]:
#Drop rows with null values
df=df.dropna()

In [7]:
#Drop duplicates
df=df.distinct()

In [8]:
#Check row count after dropping duplicates
df.count()

960679

In [9]:
#Change Review date column type from string to date
from pyspark.sql.functions import to_date
 
df = df.withColumn('ReviewDate',to_date(df.review_date, 'yyyy-MM-dd'))
df.printSchema()



root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)
 |-- ReviewDate: date (nullable = true)



In [10]:
#drop old review_date column
df=df.drop('review_date')

In [11]:
# rename new review_date column
df=df.withColumnRenamed('ReviewDate','review_date',)
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)



In [12]:
# Separate dataframe for Vine Customers
df_vine=df.filter(df['vine']=='Y')

In [13]:
#Separate dataframe for Non Vine Customers
df_nonvine=df.filter(df['vine']=='N')

## Analysis for vine customers

In [14]:
#Checking summary statistics of starratings , helpful votes and total votes for vine customers 
vine_summary_analysis_df = df_vine.select(["star_rating","helpful_votes","total_votes"]).describe()

print("Summary statistics for VINE CUSTOMERS")
vine_summary_analysis_df.show()

Summary statistics for VINE CUSTOMERS
+-------+------------------+------------------+------------------+
|summary|       star_rating|     helpful_votes|       total_votes|
+-------+------------------+------------------+------------------+
|  count|              1747|              1747|              1747|
|   mean| 4.034344590726961|2.8580423583285635| 3.712077847738981|
| stddev|0.9366959934006931|11.039313131908221|12.010833577598133|
|    min|                 1|                 0|                 0|
|    max|                 5|               349|               370|
+-------+------------------+------------------+------------------+



In [15]:
# Checking total votes for each star ratings
from pyspark.sql.functions import mean, min, max, count,sum
df_vine.groupBy('star_rating').sum('total_votes').show()


+-----------+----------------+
|star_rating|sum(total_votes)|
+-----------+----------------+
|          1|             172|
|          3|             988|
|          5|            2023|
|          4|            2956|
|          2|             346|
+-----------+----------------+



In [16]:
# Checking helpful votes for each star ratings
df_vine.groupBy('star_rating').sum('helpful_votes').show()

+-----------+------------------+
|star_rating|sum(helpful_votes)|
+-----------+------------------+
|          1|               121|
|          3|               694|
|          5|              1604|
|          4|              2361|
|          2|               213|
+-----------+------------------+



In [17]:
# Checking review count where helpful votes greater than 5
df_vine.select().where(df_vine.helpful_votes>5).count()

188

In [18]:
# Checking review count where total votes greater than 5
df_vine.select().where(df_vine.total_votes>5).count()

272

In [19]:
#checking average star rating for products where helpful votes greater than 5
df_vine_star_helpful=df.filter(df_vine.helpful_votes>5)
df_vine_star_helpful.select(mean('star_rating')).show()

+------------------+
|  avg(star_rating)|
+------------------+
|3.8673150146727138|
+------------------+



In [20]:
#checking average star rating for products where total votes greater than 5
df_vine_star_total=df.filter(df_vine.total_votes>5)
df_vine_star_total.select(mean('star_rating')).show()

+-----------------+
| avg(star_rating)|
+-----------------+
|3.600273761055735|
+-----------------+



In [21]:
#Checking number of reviews where total votes and helpful votes both greater than 5
df_vine.select().where((df_vine.total_votes>5) & (df_vine.helpful_votes>5) ).count()

188

In [22]:
#Checking average starrating for products where total_votes and helpful votes both greater than 5
df_vine_star=df.filter((df_vine.total_votes>5)&(df_vine.helpful_votes>5))
df_vine_star.select(mean('star_rating')).show()

+------------------+
|  avg(star_rating)|
+------------------+
|3.8673150146727138|
+------------------+



In [23]:
# Checking number of reviews where star_rating is 5
df_vine_5star=df_vine.filter(df_vine.star_rating==5)
df_vine_5star.select('review_id').distinct().count()

605

In [24]:
#Checking 20 products where starrating is 1 with helpful votes in descending order
from pyspark.sql.functions import desc

low_vine_helpful_votes = df_vine.orderBy(df_vine.helpful_votes.desc())
low_vine_helpful_votes = low_vine_helpful_votes.filter('star_rating = 1')

print("Worst 20 rated products with the top helpful votes for VINE CUSTOMERS")
low_vine_helpful_votes.show()

Worst 20 rated products with the top helpful votes for VINE CUSTOMERS
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   48164338| REZLEYIW61ZA1|B005DRE2LI|     432683687|Invicta Men's 143...|         Watches|          1|           26|         29|   Y|                N|Very large, easy ...|This watch comes ...| 2011-09-20|
|         US|   43713823|R2UGJVWDAPAFN3|B00542NE98

## Analysis for non_vine custimers

In [25]:
#Checking summary statistics of starratings , helpful votes and total votes for nonvine customers 
nonvine_summary_analysis_df = df_nonvine.select(["star_rating","helpful_votes","total_votes"]).describe()

print("Summary statistics for NON_VINE CUSTOMERS")
nonvine_summary_analysis_df.show()

Summary statistics for NON_VINE CUSTOMERS
+-------+------------------+------------------+-----------------+
|summary|       star_rating|     helpful_votes|      total_votes|
+-------+------------------+------------------+-----------------+
|  count|            958932|            958932|           958932|
|   mean| 4.138437344879512|1.1928822898808258|1.556204193832305|
| stddev|1.2938410587847065| 8.153765015489055|9.061132029872812|
|    min|                 1|                 0|                0|
|    max|                 5|              4004|             4249|
+-------+------------------+------------------+-----------------+



In [26]:
# Checking total votes for each star ratings for nonvine customers
from pyspark.sql.functions import mean, min, max, count,sum
df_nonvine.groupBy('star_rating').sum('total_votes').show()

+-----------+----------------+
|star_rating|sum(total_votes)|
+-----------+----------------+
|          1|          253208|
|          3|          140270|
|          5|          753157|
|          4|          238579|
|          2|          107080|
+-----------+----------------+



In [27]:
# Checking helpful votes for each star ratings for nonvine customers
df_nonvine.groupBy('star_rating').sum('helpful_votes').show()

+-----------+------------------+
|star_rating|sum(helpful_votes)|
+-----------+------------------+
|          1|            152988|
|          3|             97523|
|          5|            629342|
|          4|            195129|
|          2|             68911|
+-----------+------------------+



In [28]:
# Checking review count where helpful votes greater than 5 for nonvine customers
df_nonvine.select().where(df_nonvine.helpful_votes>5).count()

41045

In [29]:
# Checking review count where total votes greater than 5 for nonvine customers
df_nonvine.select().where(df_nonvine.total_votes>5).count()

56712

In [30]:
#checking average star rating for products where helpful votes greater than 5
df_nonvine_star_helpful=df.filter(df_nonvine.helpful_votes>5)
df_nonvine_star_helpful.select(mean('star_rating')).show()

+------------------+
|  avg(star_rating)|
+------------------+
|3.8673150146727138|
+------------------+



In [31]:
#checking average star rating for products where total votes greater than 5
df_nonvine_star_total=df_nonvine.filter(df_nonvine.total_votes>5)
df_nonvine_star_total.select(mean('star_rating')).show()

+-----------------+
| avg(star_rating)|
+-----------------+
|3.598532938355198|
+-----------------+



In [32]:
#Checking number of reviews where total votes and helpful votes both greater than 5
df_nonvine.select().where((df_nonvine.total_votes>5) & (df_nonvine.helpful_votes>5) ).count()

41045

In [33]:
#Checking average starrating for products where total_votes and helpful votes both greater than 5
df_nonvine_star=df_nonvine.filter((df_nonvine.total_votes>5)&(df_nonvine.helpful_votes>5))
df_nonvine_star.select(mean('star_rating')).show()

+-----------------+
| avg(star_rating)|
+-----------------+
|3.866634181995371|
+-----------------+



In [34]:
# Checking number of reviews where star_rating is 5
df_nonvine_5star=df_nonvine.filter(df_nonvine.star_rating==5)
df_nonvine_5star.count()

570888

In [35]:
#Checking 20 products where starrating is 1 with helpful votes in descending order
from pyspark.sql.functions import desc

low_nonvine_helpful_votes = df_nonvine.orderBy(df_nonvine.helpful_votes.desc())
low_nonvine_helpful_votes = low_nonvine_helpful_votes.filter('star_rating = 1')

print("Worst 20 rated products with the top helpful votes for NON VINE CUSTOMERS")
low_nonvine_helpful_votes.show()

Worst 20 rated products with the top helpful votes for NON VINE CUSTOMERS
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   18765196|R3PA20VZXRWHTJ|B001K3IXW8|     728413681|Zenith Men's 96.0...|         Watches|          1|         1372|       1498|   N|                N|$9.95 shipping......|I had decided on ...| 2010-06-21|
|         US|   51277649|R1CCAPD7CHQ6V7|B001K3

## Analysis on Amazon Vine Program:

1. We can see that the percentage of 5-star reviews in Vine is not very close to non-Vine reviews (34.63% for vine to 59.53% for nonvine).

2. Both average star rating for vine and non vine customers is near 4%

3. Both or vine and nonvine customers, numbers of reviews with only helpful votes  greater than 5 and with both helpful votes and total votes greater than 5 are same.

4. Apart from 5 products, all other products rated 1 don't have helpful vote greater than 5 in case of vine customers.

5. In case of both vine and non vine customers average starrating is around 3.5 for products with helpful and total votes greater than 5. 


So based on the above results it can be considered that Amzon Vine Reviews are not biased.