In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [1 InRelease 14.2 kB/88.7                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/u

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Vine-Review-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

### Load Amazon Data into Spark DataFrame

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Pet_Products_v1_00.tsv.gz"
spark.sparkContext.addFile(url)


### Create vine table

In [5]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)

In [92]:
# Create the vine_table. DataFrame
vine_df = df.select(['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine', 'verified_purchase'])

### Do whatever is next 

In [93]:
#1. Filter the data and create a new DataFrame or table to retrieve all the rows where the total_votes count is equal to or greater than 20
votes_greater_twenty = vine_df.filter('total_votes >= 20')

In [94]:
#2. Filter the new DataFrame or table created in Step 1 and create a new DataFrame or table to retrieve all the rows 
# where the number of helpful_votes divided by total_votes is equal to or greater than 50%.
import pyspark.sql.functions as F
percent_helpful_over_fifty = votes_greater_twenty.withColumn('helpful_vote_percentage', 
                                                  F.round((votes_greater_twenty.helpful_votes/votes_greater_twenty.total_votes)*100))
percent_helpful_over_fifty.filter('helpful_vote_percentage >= 50')

DataFrame[review_id: string, star_rating: int, helpful_votes: int, total_votes: int, vine: string, verified_purchase: string, helpful_vote_percentage: double]

In [95]:
# 3. Filter the DataFrame or table created in Step 2, and create a new DataFrame or table that retrieves all the rows where 
# a review was written as part of the Vine program (paid), vine == 'Y'.
vine_reviews_yes = percent_helpful_over_fifty.filter("vine == 'Y'")

In [96]:
# 4. Repeat Step 3, but this time retrieve all the rows where the review was not part of the Vine program (unpaid), vine == 'N'.
vine_reviews_no = percent_helpful_over_fifty.filter("vine == 'N'")

In [97]:
# 5. Determine the total number of reviews for all Vine and non-Vine reviews
total_reviews_vine = vine_reviews_yes.count()
total_reviews_not_vine = vine_reviews_no.count()


In [98]:
# Determine the number of 5-star reviews for all Vine and non-Vine reviews 
count_five_star_vine = vine_reviews_yes.filter('star_rating == 5').count()
count_five_star_not_vine = vine_reviews_no.filter('star_rating == 5').count()

In [99]:
# Determine the percentage of 5-star reviews for all Vine and non-Vine reviews.
percent_five_star_vine = round((count_five_star_vine/total_reviews_vine)*100)
percent_five_star_not_vine = round((count_five_star_not_vine/total_reviews_not_vine)*100)

In [101]:
# Create a dataframe to see results
Vine_table = spark.createDataFrame(
    [(total_reviews_vine, count_five_star_vine, percent_five_star_vine)],
    ["Total Vine Reviews", "5-Star Reviews", "% of 5-Star Reviews"]
      )
Non_vine_table = spark.createDataFrame(
    [(total_reviews_not_vine, count_five_star_not_vine, percent_five_star_not_vine)],
    ["Total Non-Vine Reviews", "5-Star Reviews (non-vine)", "% of 5-Star Reviews (non-vine)"]
      )
Vine_table.show()
Non_vine_table.show()

+------------------+--------------+-------------------+
|Total Vine Reviews|5-Star Reviews|% of 5-Star Reviews|
+------------------+--------------+-------------------+
|               170|            65|                 38|
+------------------+--------------+-------------------+

+----------------------+-------------------------+------------------------------+
|Total Non-Vine Reviews|5-Star Reviews (non-vine)|% of 5-Star Reviews (non-vine)|
+----------------------+-------------------------+------------------------------+
|                 37840|                    20612|                            54|
+----------------------+-------------------------+------------------------------+

