In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.3.0'
spark_version = 'spark-3.3.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Create the Spark session for this notebook (an already-running session is reused)
from pyspark.sql import SparkSession

session_builder = SparkSession.builder.appName("M16Deliverable2")
spark = session_builder.getOrCreate()

In [None]:
from pyspark import SparkFiles

# Register the remote review dataset with Spark so it is downloaded locally
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Musical_Instruments_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# Read the downloaded copy as a tab-separated file with a header row,
# letting Spark infer the column types
local_reviews_path = SparkFiles.get("amazon_reviews_us_Musical_Instruments_v1_00.tsv.gz")
utf8_reader = spark.read.option("encoding", "UTF-8")
df = utf8_reader.csv(local_reviews_path, sep="\t", header=True, inferSchema=True)
df.show()

In [None]:
# Build the vine_table DataFrame: keep only the columns used by the Vine analysis
vine_columns = [
    'review_id',
    'star_rating',
    'helpful_votes',
    'total_votes',
    'vine',
    'verified_purchase',
]
vine_df = df.select(vine_columns)
vine_df.show()

In [None]:
# 1. Keep only the reviews with at least 20 total votes
min_votes_condition = 'total_votes>=20'
filtered_df = vine_df.where(min_votes_condition)
filtered_df.show()

In [None]:
# 2. Filter the DataFrame created in Step 1 and create a new DataFrame with all the rows where
# helpful_votes divided by total_votes is equal to or greater than 50%.
# The ratio is built from filtered_df's own columns (the DataFrame being filtered) rather than
# from the ancestor df, so the predicate does not rely on cross-DataFrame column resolution.
helpful_ratio = filtered_df['helpful_votes'] / filtered_df['total_votes']
helpful_votes_df = filtered_df.filter(helpful_ratio >= 0.5)
helpful_votes_df.show()

In [None]:
# Confirm the above table by adding a "helpful_votes/total_votes" column.
# Start from filtered_df (total_votes >= 20), the same input Step 2 used, so the result is
# comparable with helpful_votes_df; the new column holds the actual ratio so it can be inspected.
ratio_df = filtered_df.withColumn(
    'helpful_votes/total_votes',
    filtered_df['helpful_votes'] / filtered_df['total_votes'],
)
helpful_votes_df2 = ratio_df.filter(ratio_df['helpful_votes/total_votes'] >= 0.5)
helpful_votes_df2.show()

In [None]:
# 3. From the Step 2 DataFrame, keep the reviews written as part of the
# Vine program (paid), i.e. vine == 'Y'
is_vine_review = helpful_votes_df['vine'] == 'Y'
vine_paid_df = helpful_votes_df.where(is_vine_review)
vine_paid_df.show()

In [None]:
# Same as Step 3, but keep the reviews that were not part of the
# Vine program (unpaid), i.e. vine == 'N'
is_non_vine_review = helpful_votes_df['vine'] == 'N'
vine_unpaid_df = helpful_votes_df.where(is_non_vine_review)
vine_unpaid_df.show()

In [None]:
# Total number of reviews: row count of the Step 2 DataFrame
# (reviews that passed both the vote-count and the helpfulness filters)
rows = helpful_votes_df.count()
print(
    f"DataFrame Rows count : {rows}"
)

In [None]:
# Count the 5-star reviews among the helpful reviews
is_five_star = helpful_votes_df['star_rating'] == 5
five_star_reviews = helpful_votes_df.where(is_five_star).count()
print(f"There are {five_star_reviews} 5-star reviews.")

In [None]:
# Overall review counts for the paid (Vine) and unpaid (non-Vine) groups
paid, unpaid = vine_paid_df.count(), vine_unpaid_df.count()
print(f"There are {paid} paid reviews and {unpaid} unpaid reviews.")

In [None]:
# 5-star review counts within the paid and the unpaid groups
paid_is_five_star = vine_paid_df['star_rating'] == 5
unpaid_is_five_star = vine_unpaid_df['star_rating'] == 5
five_star_paid = vine_paid_df.where(paid_is_five_star).count()
five_star_unpaid = vine_unpaid_df.where(unpaid_is_five_star).count()
print(f"There are {five_star_paid} paid 5-star reviews and {five_star_unpaid} unpaid 5-star reviews.")

In [None]:
# Determine the percentage of 5-star reviews for the two types of review (paid vs unpaid).
# Each percentage is taken within its own group (5-star reviews of that type / all reviews of
# that type), so the paid and unpaid rates are directly comparable even though the groups
# differ in size. An empty group reports 0.0 instead of raising ZeroDivisionError.
five_star_paid_percent = five_star_paid / paid * 100 if paid else 0.0
five_star_unpaid_percent = five_star_unpaid / unpaid * 100 if unpaid else 0.0
print(f"{five_star_paid_percent:.2f}% of paid reviews were 5-star and {five_star_unpaid_percent:.2f}% of unpaid reviews were 5-star.")