In [None]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the PostgreSQL JDBC driver jar (version 42.2.16) that will allow Spark to
# interact with Postgres. wget saves it into the current working directory.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook, with the downloaded
# Postgres JDBC jar placed on the driver's classpath.
spark = (
    SparkSession.builder
    .appName("M16-Amazon-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### Load Amazon Data into Spark DataFrame

In [None]:
from pyspark import SparkFiles

# Gzipped, tab-separated Amazon review dataset (US Video Games category).
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
# addFile() stores the download under the last path segment of the URL.
file_name = url.rsplit("/", 1)[-1]
spark.sparkContext.addFile(url)

# Resolve the path of that specific file. Passing an empty name to SparkFiles.get()
# yields the SparkFiles root directory, which would make Spark read every file that
# has ever been added to it instead of just this dataset.
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True)
df.show()

### Create DataFrames to match tables

In [None]:
from pyspark.sql.functions import to_date

# Read in the Review dataset as a DataFrame
# review_df is simply a second name bound to the same DataFrame object as df;
# no data is copied or re-read here.
review_df = df
review_df.show()

In [None]:
# Create the customers_table DataFrame: one row per customer_id with the number of
# reviews that customer wrote.
# The dict-style agg() names its output column "count(customer_id)" (closing
# parenthesis included), so that exact name must be used for the rename to take effect.
customers_df = (
    df.groupby("customer_id")
    .agg({"customer_id": "count"})
    .withColumnRenamed("count(customer_id)", "customer_count")
)
customers_df.show()

In [None]:
# Build the products_table DataFrame: keep only the product id and title columns,
# then collapse repeated (product_id, product_title) rows into one.
products_df = df.select("product_id", "product_title").dropDuplicates()
products_df.show()

In [None]:
# Build the review_id_table DataFrame.
# 'review_date' is parsed with the 'yyyy-MM-dd' pattern into a date column and keeps
# its original name via alias().
review_id_df = df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    to_date("review_date", 'yyyy-MM-dd').alias("review_date"),
)
review_id_df.show()

In [None]:
# Build the vine_table DataFrame: the per-review rating, vote counts and the
# Vine / verified-purchase flags.
vine_df = review_df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
    "verified_purchase",
)
vine_df.show()

### Determine the Bias of Reviews

In [None]:
# Keep only the reviews that received at least 20 votes in total.
twenty_plus_df = vine_df.where(vine_df["total_votes"] >= 20)
twenty_plus_df.show()

In [None]:
# Collect all the rows where the # of helpful votes / total_votes >= 50%.
# The filter is applied to twenty_plus_df (not vine_df) so that the
# total_votes >= 20 threshold from the previous step is kept.
helpful_votes_df = twenty_plus_df.filter(
    (twenty_plus_df.helpful_votes / twenty_plus_df.total_votes) >= .50)

helpful_votes_df.show()

In [None]:
# Reviews written as part of the Vine Program (vine flag equals 'Y').
paid_review_df = helpful_votes_df.where(helpful_votes_df["vine"] == 'Y')
paid_review_df.show()

In [None]:
# Reviews written outside the Vine Program (vine flag equals 'N').
unpaid_review_df = helpful_votes_df.where(helpful_votes_df["vine"] == 'N')
unpaid_review_df.show()

In [None]:
# Determine, for Vine (paid) and non-Vine (unpaid) reviews separately:

# The total # of reviews in each group
review_paid = paid_review_df.count()
review_unpaid = unpaid_review_df.count()

# The # of 5 star reviews in each group
paid_5star = paid_review_df.filter(paid_review_df.star_rating == '5').count()
unpaid_5star = unpaid_review_df.filter(unpaid_review_df.star_rating == '5').count()

# The share of 5 star reviews in each group, stored as a fraction between 0 and 1
# (i.e. NOT yet multiplied by 100).
# A group with no reviews has no defined share, so it is recorded as NaN instead of
# raising ZeroDivisionError.
review_5star_paid = paid_5star / review_paid if review_paid else float("nan")
review_5star_unpaid = unpaid_5star / review_unpaid if review_unpaid else float("nan")

In [None]:
# Print the summary of review counts and 5-star shares for both groups.
# review_5star_paid / review_5star_unpaid hold fractions (5-star count / total count),
# so the `.1%` format spec is used: it multiplies by 100, keeps one decimal place and
# appends the percent sign.
# print() joins its arguments with a single space, which is why only the strings that
# follow an implicit concatenation carry an explicit leading space.
print(
    f' Total Number of reviews through the Vine Program: {review_paid}\n',
    f'Total Number of reviews outside the Vine Program: {review_unpaid}\n',
    f"-------------------------\n"
    f' Number of 5 Star Reviews through the Vine Program: {paid_5star}\n',
    f'Number of 5 Star Reviews outside the Vine Program: {unpaid_5star}\n',
    f"-------------------------\n"
    f' Percent of 5 Star Reviews through the Vine Program: {review_5star_paid:.1%}\n',
    f'Percent of 5 Star Reviews outside the Vine Program: {review_5star_unpaid:.1%}')