In [1]:
# DELIVERABLE 2
# Using PySpark to determine bias of Vine reviews data
import os

spark_version = "spark-3.3.1"
os.environ['SPARK_VERSION'] = spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.39)] [0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Connecting to security.ubu                                                                               Hit:4 http://archive.ubuntu.com/ubuntu

In [2]:
# Create (or reuse) the SparkSession used by the rest of the notebook
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("M16_Review_Analysis")
    .getOrCreate()
)

In [3]:
# Read the Amazon "Watches" reviews (amazon_reviews_us_Watches_v1_00.tsv.gz)
# into a Spark DataFrame.
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"

# Register the remote file with Spark, then read the copy that SparkFiles
# resolves by file name. The file is tab-separated with a header row.
spark.sparkContext.addFile(url)
df = (
    spark.read
    .option("encoding", "UTF-8")
    .csv(
        SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"),
        sep="\t",
        header=True,
        inferSchema=True,
    )
)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...|2015-08-31 00:00:00|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...| 

In [4]:
# Number of rows in df, the reviews DataFrame as loaded (before any cleaning).
# Serves as the baseline for the row count taken after dropping missing values.
df.count()

960872

In [5]:
# Discard rows that contain missing values; df itself is left unchanged
clean_df = df.na.drop()
clean_df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|    3653882|R3O9SGZBVQBV76|B00FALQ1ZC|     937001370|Invicta Women's 1...|         Watches|          5|            0|          0|   N|                Y|          Five Stars|Absolutely love t...|2015-08-31 00:00:00|
|         US|   14661224| RKH8BNC3L5DLF|B00D3RGO20|     484010722|Kenneth Cole New ...| 

In [6]:
# Number of rows left in clean_df after dropna(); compare with df.count() to
# see how many rows were removed for missing values.
clean_df.count()

960679

In [7]:
# Narrow clean_df down to the columns used in the Vine analysis
vine_df = clean_df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R3O9SGZBVQBV76|          5|            0|          0|   N|                Y|
| RKH8BNC3L5DLF|          5|            0|          0|   N|                Y|
|R2HLE8WKZSU3NL|          2|            1|          1|   N|                Y|
|R31U3UH5AZ42LL|          5|            0|          0|   N|                Y|
|R2SV659OUJ945Y|          4|            0|          0|   N|                Y|
| RA51CP8TR5A2L|          5|            0|          0|   N|                Y|
| RB2Q7DLDN6TH6|          5|            1|          1|   N|                Y|
|R2RHFJV0UYBK3Y|          1|            1|          5|   N|                N|
|R2Z6JOQ94LFHEP|          5|            1|          2|   N|                Y|
| RX27XIIWY5JPB|          4|            0|          0|   N|     

In [8]:
# List the (column name, data type) pairs of vine_df to confirm the vote and
# rating columns are numeric before they are used in comparisons and division.
vine_df.dtypes

[('review_id', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string')]

In [9]:
# Keep only the reviews that received at least 20 total votes
from pyspark.sql.functions import col

total_votes_df = vine_df.where(col("total_votes") >= 20)
total_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R14W2VCHHK5V7W|          5|           19|         20|   N|                Y|
|R1S3T57O3OYT5S|          5|           19|         20|   N|                Y|
|R1BTWIBLYYVOV7|          5|           30|         30|   N|                Y|
| R6F9VY91ADPLA|          1|            8|         30|   N|                N|
|R3PXNV89DFIXKV|          5|           35|         37|   N|                Y|
|R2ZF9NYVT3J7D6|          5|           19|         22|   N|                Y|
|R20NYA5V0UF9NE|          5|           27|         28|   N|                Y|
|R2X8FZRUOS8R8C|          4|           25|         26|   N|                Y|
|R2D8IMBVX3XCLF|          1|           14|         20|   N|                Y|
|R25UD9TA63L3Q8|          5|           25|         27|   N|     

In [10]:
# Number of reviews with total_votes >= 20 (rows in total_votes_df)
total_votes_df.count()

9612

In [11]:
# From the reviews with total_votes >= 20, keep those where
# helpful_votes / total_votes is at least 50%.
# The ratio is stored in a new "percent_helpful_votes" column (a 0-1 fraction);
# withColumn names that column, so no DataFrame-level .alias() is needed --
# the earlier .alias() call aliased the whole DataFrame, not the column.
percent_helpful_votes_df = (
    total_votes_df
    .withColumn("percent_helpful_votes", col("helpful_votes") / col("total_votes"))
    .filter(col("percent_helpful_votes") >= 0.5)
)
percent_helpful_votes_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|percent_helpful_votes|
+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|R14W2VCHHK5V7W|          5|           19|         20|   N|                Y|                 0.95|
|R1S3T57O3OYT5S|          5|           19|         20|   N|                Y|                 0.95|
|R1BTWIBLYYVOV7|          5|           30|         30|   N|                Y|                  1.0|
|R3PXNV89DFIXKV|          5|           35|         37|   N|                Y|   0.9459459459459459|
|R2ZF9NYVT3J7D6|          5|           19|         22|   N|                Y|   0.8636363636363636|
|R20NYA5V0UF9NE|          5|           27|         28|   N|                Y|   0.9642857142857143|
|R2X8FZRUOS8R8C|          4|           25|         26|   N|                Y|   0.9615384615384616|


In [12]:
# Number of reviews with total_votes >= 20 whose helpful_votes / total_votes
# ratio is at least 0.5 (rows in percent_helpful_votes_df)
percent_helpful_votes_df.count()

8390

In [13]:
# Paid reviews: rows of percent_helpful_votes_df that were written as part of
# the Vine program (vine == 'Y').
paid_reviews_df = percent_helpful_votes_df.where(col("vine") == "Y")
paid_reviews_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|percent_helpful_votes|
+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|R1B7M0OP3UNP6O|          5|           49|         52|   Y|                N|   0.9423076923076923|
|R2UUV4UGGYMQG8|          5|           34|         39|   Y|                N|   0.8717948717948718|
| R9K0LZV2BK9YY|          4|           37|         39|   Y|                N|   0.9487179487179487|
|R2OVFLNEUEGTJM|          3|           18|         25|   Y|                N|                 0.72|
| RBE09ELJ77LQ0|          5|           44|         45|   Y|                N|   0.9777777777777777|
|R3867T8AIJJHM6|          5|           26|         27|   Y|                N|   0.9629629629629629|
|R1FNVUXPU63WOZ|          4|           43|         48|   Y|                N|   0.8958333333333334|


In [14]:
# Number of paid (vine == 'Y') reviews among those with total_votes >= 20 and
# a helpful-vote ratio of at least 0.5
paid_reviews_df.count()

47

In [15]:
# Unpaid reviews: rows of percent_helpful_votes_df that were NOT part of the
# Vine program (vine == 'N').
unpaid_reviews_df = percent_helpful_votes_df.where(col("vine") == "N")
unpaid_reviews_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|percent_helpful_votes|
+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|R14W2VCHHK5V7W|          5|           19|         20|   N|                Y|                 0.95|
|R1S3T57O3OYT5S|          5|           19|         20|   N|                Y|                 0.95|
|R1BTWIBLYYVOV7|          5|           30|         30|   N|                Y|                  1.0|
|R3PXNV89DFIXKV|          5|           35|         37|   N|                Y|   0.9459459459459459|
|R2ZF9NYVT3J7D6|          5|           19|         22|   N|                Y|   0.8636363636363636|
|R20NYA5V0UF9NE|          5|           27|         28|   N|                Y|   0.9642857142857143|
|R2X8FZRUOS8R8C|          4|           25|         26|   N|                Y|   0.9615384615384616|


In [16]:
# Number of unpaid (vine == 'N') reviews among those with total_votes >= 20
# and a helpful-vote ratio of at least 0.5
unpaid_reviews_df.count()

8343

In [17]:
# Summarize the filtered reviews per review type (paid vs unpaid, i.e. grouped by "vine"):
#   total_reviews             - count(col("vine"))
#   five_star_reviews         - count(when(col("star_rating") == 5, True))
#   percent_five_star_reviews - five_star_reviews / total_reviews * 100
from pyspark.sql.functions import count, when

# when() without otherwise() yields null for non-matching rows, and count()
# skips nulls, so counting this expression counts only the 5-star rows.
five_star_flag = when(col("star_rating") == 5, True)

review_summary_df = (
    percent_helpful_votes_df
    .groupBy("vine")
    .agg(
        count(col("vine")).alias("total_reviews"),
        count(five_star_flag).alias("five_star_reviews"),
        (count(five_star_flag) / count(col("vine")) * 100).alias("percent_five_star_reviews"),
    )
)
review_summary_df.show()

+----+-------------+-----------------+-------------------------+
|vine|total_reviews|five_star_reviews|percent_five_star_reviews|
+----+-------------+-----------------+-------------------------+
|   Y|           47|               15|       31.914893617021278|
|   N|         8343|             4318|        51.75596308282392|
+----+-------------+-----------------+-------------------------+



In [18]:
# Deliverable 3: Further analysis
# Case 1: the same five-star summary as above, but computed on all of vine_df,
# i.e. WITHOUT the total_votes >= 20 and helpful-vote-ratio filters.
case_one_df = (
    vine_df
    .groupBy("vine")
    .agg(
        count(col("vine")).alias("total_reviews"),
        count(when(col("star_rating") == 5, True)).alias("five_star_reviews"),
        (
            count(when(col("star_rating") == 5, True)) / count(col("vine")) * 100
        ).alias("percent_five_star_reviews"),
    )
)
case_one_df.show()

+----+-------------+-----------------+-------------------------+
|vine|total_reviews|five_star_reviews|percent_five_star_reviews|
+----+-------------+-----------------+-------------------------+
|   Y|         1747|              605|       34.630795649685176|
|   N|       958932|           570888|         59.5337312760446|
+----+-------------+-----------------+-------------------------+



In [19]:
# Case 2: for each review type (vine == 'Y' paid, vine == 'N' unpaid), compute
# the row count and the average star_rating over all of vine_df (no vote-based
# filtering). Dict form of agg: each key is a column name and each value is the
# aggregate applied to it; result columns are named avg(star_rating) and count(vine).
case_two_df = vine_df.groupBy("vine").agg({"vine":"count","star_rating":"avg"})
case_two_df.show()

+----+-----------------+-----------+
|vine| avg(star_rating)|count(vine)|
+----+-----------------+-----------+
|   Y|4.034344590726961|       1747|
|   N|4.138437344879512|     958932|
+----+-----------------+-----------+

