In [1]:
import os
# Find the latest version of spark 3.2 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.3'
spark_version = 'spark-3.3.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu focal-cran40/ InRelease [3,622 B]
0% [Waiting for headers] [Connecting to security.ubuntu.com] [1 InRelease 0 B/30% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [                                                                               Hit:2 http://archive.ubuntu.com/ubuntu focal InRelease
0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [                                                                               Get:3 http://archive.ubuntu.com/ubuntu focal-updates InRelease [114 kB]
0% [3 InRelease 14.2 kB/114 kB 12%] [Connecting to security.ubuntu.com (185.125                                                                               Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu2004/x86_64  InRelease
0% [3 InRelease 15.6 kB/114 kB 14%] [Connecting to security.ubuntu.com (185.125             

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2023-02-05 11:58:18--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2023-02-05 11:58:18 (11.0 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M17-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

### Load Amazon Data into Spark DataFrame

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_PC_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
# df.show()

### Create DataFrames to match tables

In [5]:
from pyspark.sql.functions import to_date, count
# Clean the data
# print(df.count())
df = df.dropna()
# print(df.count())
df = df.dropDuplicates()
# print(df.count())

In [6]:
# print schema
# df.printSchema()

In [7]:
# Step 1 : Create a new DataFrame to retieve all the rows where the total_votes count is equal to or greater than 20

review_df = df.filter("total_votes >=20")

# review_df.show(20)

In [8]:
# Step 2 : Filter the new DataFram in Step 1 and create a new DataFrame to retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%

review_df_2 = review_df.filter("helpful_votes / total_votes >= 0.5")

# review_df_2.show(20)

In [9]:
# Check review_df and review_df_2 
# print(f'The number of rows where the total_votes count is equal to or greater than 20: ',review_df.count())
# print(f'The number of rows where helpful_votes divided by total_votes is equal to or greater than 50%: ', review_df_2.count())

In [10]:
# Step 3 : Filter the DataFrame in Step 2 and create a new DataFrame to retrieve all the rows where a review was written as part of the Vine program (paid), vine =='Y'

vine_Y_df = review_df_2.filter("vine == 'Y'")

# vine_Y_df.show(20)


In [11]:
# Step 4 : Filter the DataFrame in Step 2 and create a new DataFrame to retrieve all the rows where the review was not part of the Vine program (unpaid), vine =='N'

vine_N_df = review_df_2.filter("vine == 'N'")

# vine_N_df.show(20)

In [12]:
# Step  5 : Determine the total number of reviews, the number of 5-star reviews, 
# and the percentage of 5-star reviews for the two types of review (paid vs unpaid)

# Paid Reviews

total_vine_Y = vine_Y_df.count()
five_star_vine_Y = vine_Y_df.filter("star_rating ==5").count()
five_star_percentage_vine_Y = round(five_star_vine_Y / total_vine_Y,2)

# Unpaid Reviews

total_vine_N = vine_N_df.count()
five_star_vine_N = vine_N_df.filter("star_rating ==5").count()
five_star_percentage_vine_N = round(five_star_vine_N / total_vine_N,2)

In [13]:
# Total number of paid / unpaid reviews
print(f'The total number of paid reviews: ', total_vine_Y)
print(f'The total number of unpaid reviews: ', total_vine_N)

The total number of paid reviews:  1775
The total number of unpaid reviews:  77370


In [14]:
# Number of 5-star paid / unpaid reviews
print(f'The number of 5-star paid reviews: ', five_star_vine_Y)
print(f'The number of 5-star unpaid reviews: ', five_star_vine_N)

The number of 5-star paid reviews:  783
The number of 5-star unpaid reviews:  35944


In [15]:
# Percentage of 5-star unpaid reviews
print(f'The percentage of 5-star paid reviews: ', five_star_percentage_vine_Y)
print(f'The percentage of 5-star unpaid reviews: ', five_star_percentage_vine_N)

The percentage of 5-star paid reviews:  0.44
The percentage of 5-star unpaid reviews:  0.46


In [16]:
# Step  6 : Additional Analysis
# Determine the total number of reviews, the number of 4-star reviews, 
# and the percentage of 4-star reviews for the two types of review (paid vs unpaid)

# Paid Reviews

four_star_vine_Y = vine_Y_df.filter("star_rating ==4").count()
four_star_percentage_vine_Y = round(four_star_vine_Y / total_vine_Y,2)

# Unpaid Reviews

four_star_vine_N = vine_N_df.filter("star_rating ==4").count()
four_star_percentage_vine_N = round(four_star_vine_N / total_vine_N,2)

In [17]:
# Number of 4-star paid / unpaid reviews
print(f'The number of 4-star paid reviews: ', four_star_vine_Y)
print(f'The number of 4-star unpaid reviews: ', four_star_vine_N)

The number of 4-star paid reviews:  602
The number of 4-star unpaid reviews:  12424


In [18]:
# Percentage of 4-star unpaid reviews
print(f'The percentage of 4-star paid reviews: ', four_star_percentage_vine_Y)
print(f'The percentage of 4-star unpaid reviews: ', four_star_percentage_vine_N)

The percentage of 4-star paid reviews:  0.34
The percentage of 4-star unpaid reviews:  0.16
