In [2]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease [1,581 B]
Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:5 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:6 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:8 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:11 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Packages [1,039 kB]
Hit:12 http://ppa.launchpad.net/deadsnakes/ppa/ub

In [3]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-12-01 14:41:38--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2022-12-01 14:41:38 (4.82 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

In [5]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Ebook_Purchase_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get("amazon_reviews_us_Digital_Ebook_Purchase_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)


In [6]:
# Drop incomplete rows
df = df.dropna()
df.show(5)

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   35471030|R1C5OK9AND7PRP|B00AHK07V0|     175130663|Hunter's Moon (A ...|Digital_Ebook_Pur...|          5|            0|          0|   N|                Y|Shugak is like a ...|This is the most ...|2015-08-31 00:00:00|
|         US|   26579324|R333RNBQMPVUFT|B014085WTQ|     859232728|Flying

In [7]:
# Create the vine_table DataFrame
vine_df = df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"])
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R1C5OK9AND7PRP|          5|            0|          0|   N|                Y|
|R333RNBQMPVUFT|          5|            0|          0|   N|                Y|
|R2A2K0GTNSKXM1|          5|            0|          0|   N|                N|
|R365LAQ9REV876|          3|            0|          0|   N|                Y|
|R294OF3SNH6SWZ|          4|            0|          0|   N|                Y|
|R2OMRH58WYEEP9|          5|            0|          0|   N|                N|
| RBFXYUJZIH5JR|          5|            0|          0|   N|                Y|
|R2K2RSBNV42HU4|          5|            0|          0|   N|                N|
|R2CLRFFJ5HJSU3|          4|            0|          0|   N|                N|
| RS1O7V45AADDO|          4|            0|          0|   N|     

In [8]:
# Filter DataFrame for total_votes equal to or greater than 20
high_total_votes_df = vine_df.filter(vine_df.total_votes >= 20)
high_total_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2HQLKREFKG25D|          5|           21|         21|   N|                Y|
| RWIL1XU6YM0U4|          5|           20|         20|   N|                N|
|R28SB7Q1WBJRFM|          5|           25|         27|   N|                N|
|R3PVKAWMZW55U8|          5|          106|        119|   N|                Y|
|R21JS1HIQV1H7W|          3|           17|         22|   N|                Y|
|R2IBPJ9OIDSZHW|          3|            0|         37|   N|                Y|
|R3SR8MPHQATFCJ|          1|            8|         20|   N|                Y|
|R245IE9DATP3ZL|          3|           15|         23|   N|                Y|
| RB1Y3RO4J4854|          3|           23|         30|   N|                Y|
|R23P75H9VX5RLM|          5|           32|         36|   N|     

In [9]:
# Filter DataFrame for helpful_votes ratio equal to or greater than 50%.
most_helpful_votes = high_total_votes_df.filter((high_total_votes_df.helpful_votes / high_total_votes_df.total_votes) >= 0.5)
most_helpful_votes.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2HQLKREFKG25D|          5|           21|         21|   N|                Y|
| RWIL1XU6YM0U4|          5|           20|         20|   N|                N|
|R28SB7Q1WBJRFM|          5|           25|         27|   N|                N|
|R3PVKAWMZW55U8|          5|          106|        119|   N|                Y|
|R21JS1HIQV1H7W|          3|           17|         22|   N|                Y|
|R245IE9DATP3ZL|          3|           15|         23|   N|                Y|
| RB1Y3RO4J4854|          3|           23|         30|   N|                Y|
|R23P75H9VX5RLM|          5|           32|         36|   N|                Y|
|R3JCH5P63TIQ4E|          5|           19|         20|   N|                N|
| RYA7IZ8VHN5P8|          2|           37|         42|   N|     

In [10]:
# Create a new DataFrame that retrieves all the rows where a review was written as part of the Vine program (paid), vine == 'Y'
paid_helpful_df = most_helpful_votes.filter(most_helpful_votes.vine == 'Y')
paid_helpful_df.show()

+---------+-----------+-------------+-----------+----+-----------------+
|review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+---------+-----------+-------------+-----------+----+-----------------+
+---------+-----------+-------------+-----------+----+-----------------+



In [11]:
# Create a new DataFrame that retrieves all the rows where a review was written as not part of the Vine program (unpaid), vine == 'N'
unpaid_helpful_df = most_helpful_votes.filter(most_helpful_votes.vine == 'N')
unpaid_helpful_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R2HQLKREFKG25D|          5|           21|         21|   N|                Y|
| RWIL1XU6YM0U4|          5|           20|         20|   N|                N|
|R28SB7Q1WBJRFM|          5|           25|         27|   N|                N|
|R3PVKAWMZW55U8|          5|          106|        119|   N|                Y|
|R21JS1HIQV1H7W|          3|           17|         22|   N|                Y|
|R245IE9DATP3ZL|          3|           15|         23|   N|                Y|
| RB1Y3RO4J4854|          3|           23|         30|   N|                Y|
|R23P75H9VX5RLM|          5|           32|         36|   N|                Y|
|R3JCH5P63TIQ4E|          5|           19|         20|   N|                N|
| RYA7IZ8VHN5P8|          2|           37|         42|   N|     

In [12]:
# Determine the total number of reviews
count_vine_df = vine_df.count()
print(f'Total number of reviews: {count_vine_df}')

Total number of reviews: 12518702


In [14]:
# Determine the total number of reviews
count_high_votes = high_total_votes_df.count()
print(f'Total number of reviews with high votes: {count_high_votes}')

Total number of reviews with high votes: 70710


In [15]:
# Determine total number of most helpful reviews (helpful_votes ratio equal to or greater than 50%.)
count_most_helpful = most_helpful_votes.count()
print(f'Total number of reviews that are most helpful: {count_most_helpful}')

Total number of reviews that are most helpful: 54079


In [16]:
# Determine the number of 5-star reviews
count_vine_df_5star = vine_df.filter("star_rating=='5'").count()
print(f'Total number of 5 Star reviews: {count_vine_df_5star}')

Total number of 5 Star reviews: 7678551


In [17]:
# Determine the number of 5-star reviews that are most helpful
count_most_helpful_5star = most_helpful_votes.filter("star_rating=='5'").count()
print(f'Total number of 5 Star most helpful reviews: {count_most_helpful_5star}')

Total number of 5 Star most helpful reviews: 20911


In [20]:
# Since there is no paid helpful reviews, we will determine if there are any paids reviews in all data
# Determine total number of paid reviews
paid_df = vine_df.filter(vine_df.vine == 'Y')
paid_df.show(10)

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R139V6UP5XQNJS|          3|            1|          1|   Y|                N|
|R3JJ8JM9S8GO0K|          4|            3|          6|   Y|                N|
| R8Z2F22OCOJ9B|          5|            8|          8|   Y|                Y|
|R3RSQ7WVO2ESQ1|          4|            3|          4|   Y|                N|
| R8B40D83FHHOV|          5|            3|          5|   Y|                N|
| RZ14CPL9FWVSB|          5|            0|          0|   Y|                N|
|R3G73NCIEFIBLV|          5|            0|          0|   Y|                N|
|R1TT7A3BB57KKO|          3|            0|          0|   Y|                N|
|R14LGT1WBV55G8|          5|            2|          2|   Y|                N|
|R1AOFVHMG68TQR|          3|            1|          1|   Y|     

In [28]:
count_paid = paid_df.count()
print(f'Total number of paid reviews: {count_paid}')

Total number of paid reviews: 32


In [29]:
# Determine total number of unpaid reviews
unpaid_df = vine_df.filter(vine_df.vine == 'N')
count_unpaid = unpaid_df.count()
print(f'Total number of unpaid reviews: {count_unpaid}')

Total number of unpaid reviews: 12518670


# **Determine the percentage of 5-star total reviews for the two types of review (paid vs unpaid)**

In [25]:
# Determine the number of paid 5-star reviews 
count_5star_paid = paid_df.filter("star_rating=='5'").count()
print(f'Total number of paid 5-star reviews: {count_5star_paid}')

Total number of paid 5-star reviews: 17


In [33]:
# Determine the percentage of paid 5-star reviews 
percent_5star_paid = (count_5star_paid / count_paid)*100
print(f'Percentage of 5-star reviews for paid reviews: {percent_5star_paid}')

Percentage of 5-star reviews for paid reviews: 53.125


In [26]:
# Determine the number of unpaid 5-star reviews 
count_5star_unpaid = unpaid_df.filter("star_rating=='5'").count()
print(f'Total number of unpaid 5-star reviews: {count_5star_unpaid}')

Total number of unpaid 5-star reviews: 7678534


In [34]:
# Determine the percentage of unpaid 5-star reviews 
percent_5star_unpaid = (count_5star_unpaid/count_unpaid)*100
print(f'Percentage of 5-star reviews for unpaid reviews: {percent_5star_unpaid}')

Percentage of 5-star reviews for unpaid reviews: 61.33665956527331


# **Determine the percentage of 5-star most helpful reviews for the two types of review (paid vs unpaid)**

In [39]:
# Filter paid reviews for helpful_votes ratio equal to or greater than 50%.
# Determine the total number of paid most helpful reviews
most_helpful_paid_df = most_helpful_votes.filter(vine_df.vine == 'Y')
count_most_helpful_paid = most_helpful_paid_df.count()
print(f'Total number of paid most helpful reviews: {count_most_helpful_paid}')

Total number of paid most helpful reviews: 0


In [40]:
# Determine the total number of paid 5-star most helpful reviews
count_5star_most_helpful_paid = most_helpful_paid_df.filter("star_rating=='5'").count()
print(f'Total number of paid 5-star most helpful reviews: {count_5star_most_helpful_paid}')

Total number of paid 5-star most helpful reviews: 0


In [42]:
# Determine the percentage of 5-star most helpful reviews for paid reviews
percent_5star_most_helpful_paid = (count_5star_most_helpful_paid / count_most_helpful_paid)*100
print(f'Percentage of 5-star most helpful reviews for paid reviews: {percent_5star_most_helpful_paid}')

ZeroDivisionError: ignored

In [43]:
# Filter unpaid reviews for helpful_votes ratio equal to or greater than 50%.
# Determine the total number of paid most helpful reviews
most_helpful_unpaid_df = most_helpful_votes.filter(vine_df.vine == 'N')
count_most_helpful_unpaid = most_helpful_unpaid_df.count()
print(f'Total number of unpaid most helpful reviews: {count_most_helpful_unpaid}')

Total number of unpaid most helpful reviews: 54079


In [44]:
# Determine the total number of unpaid 5-star most helpful reviews
count_5star_most_helpful_unpaid = most_helpful_unpaid_df.filter("star_rating=='5'").count()
print(f'Total number of unpaid 5-star most helpful reviews: {count_5star_most_helpful_unpaid}')

Total number of unpaid 5-star most helpful reviews: 20911


In [45]:
# Determine the percentage of 5-star most helpful reviews for unpaid reviews
percent_5star_most_helpful_unpaid = (count_5star_most_helpful_unpaid / count_most_helpful_unpaid)*100
print(f'Percentage of 5-star most helpful reviews for unpaid reviews: {percent_5star_most_helpful_unpaid}')

Percentage of 5-star most helpful reviews for unpaid reviews: 38.667504946467204
