In [None]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.2.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the PostgreSQL JDBC driver (version 42.2.9) into the current working directory.
# The SparkSession cell expects the jar at /content/postgresql-42.2.9.jar, so this
# assumes the notebook's working directory is /content — TODO confirm outside Colab.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the "CloudETL" session with the downloaded PostgreSQL JDBC jar
# on the driver's classpath.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

# **Load Reviews**

In [None]:
from pyspark import SparkFiles

# Register the Watches review archive with Spark, then parse the fetched copy as a
# tab-separated file whose first row holds the column names.
url = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz'
spark.sparkContext.addFile(url)
watches_local_path = SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz")
df = spark.read.csv(watches_local_path, header=True, sep="\t")

# Preview the first rows.
df.show()

In [None]:
from pyspark import SparkFiles

# Register the Software review archive with Spark, then parse the fetched copy as a
# tab-separated file whose first row holds the column names.
url = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Software_v1_00.tsv.gz'
spark.sparkContext.addFile(url)
software_local_path = SparkFiles.get("amazon_reviews_us_Software_v1_00.tsv.gz")
Softwaredf = spark.read.csv(software_local_path, header=True, sep="\t")

# Preview the first rows.
Softwaredf.show()

# **Drop Unused Data**

In [None]:
# Discard Watches rows containing any null value, then remove exact duplicate rows.
df = df.dropna().dropDuplicates()

# Number of rows remaining after cleaning.
df.count()

In [None]:
# Discard Software rows containing any null value, then remove exact duplicate rows.
Softwaredf = Softwaredf.dropna().dropDuplicates()

# Number of rows remaining after cleaning.
Softwaredf.count()

# **Filter data**

In [None]:
# Split the Watches reviews on the `vine` column: 'Y' rows go to paidDF, 'N' rows to unpaidDF.
paidDF = df.where(df.vine == 'Y')
unpaidDF = df.where(df.vine == 'N')

# Print the size of each subset, vine reviews first.
for watches_subset in (paidDF, unpaidDF):
    print(watches_subset.count())

In [None]:
# Split the Software reviews on the `vine` column: 'Y' rows go to SoftwarepaidDF,
# 'N' rows to SoftwareunpaidDF.
SoftwarepaidDF = Softwaredf.where(Softwaredf.vine == 'Y')
SoftwareunpaidDF = Softwaredf.where(Softwaredf.vine == 'N')

# Print the size of each subset, vine reviews first.
for software_subset in (SoftwarepaidDF, SoftwareunpaidDF):
    print(software_subset.count())

# **Find and Print Results**

In [None]:
# Five-star share for the Watches data set, computed separately for Vine reviews
# (paidDF) and non-Vine reviews (unpaidDF). An empty subset yields 0.0 percent
# rather than raising ZeroDivisionError.

# Vine Program Stats
paidTotal = paidDF.count()
paid5star = paidDF[paidDF['star_rating']==5].count()
PaidPercentage = (paid5star/paidTotal)*100 if paidTotal else 0.0

# Normal Reviews Program Stats
unpaidTotal = unpaidDF.count()
unpaid5star = unpaidDF[unpaidDF['star_rating']==5].count()
UnpaidPercentage = (unpaid5star/unpaidTotal)*100 if unpaidTotal else 0.0

In [None]:
# Five-star share for the Software data set, computed separately for Vine reviews
# (SoftwarepaidDF) and non-Vine reviews (SoftwareunpaidDF). An empty subset yields
# 0.0 percent rather than raising ZeroDivisionError.

# Vine Program Stats
SoftwarepaidTotal = SoftwarepaidDF.count()
Softwarepaid5star = SoftwarepaidDF[SoftwarepaidDF['star_rating']==5].count()
SoftwarePaidPercentage = (Softwarepaid5star/SoftwarepaidTotal)*100 if SoftwarepaidTotal else 0.0

# Normal Reviews Program Stats
SoftwareunpaidTotal = SoftwareunpaidDF.count()
Softwareunpaid5star = SoftwareunpaidDF[SoftwareunpaidDF['star_rating']==5].count()
SoftwareUnpaidPercentage = (Softwareunpaid5star/SoftwareunpaidTotal)*100 if SoftwareunpaidTotal else 0.0

In [None]:
# Report the Watches Vine-program results: the five-star count, the total number of
# Vine reviews (paidDF.count() is evaluated again here), and the five-star
# percentage formatted to two decimal places.
print(f'Watches Vine Program \nNumber of 5 Star Reviews: {paid5star} \n\
Number of Reviews: {paidDF.count()} \n\
Percentage 5 Star Reviews: {PaidPercentage:.2f}%')

In [None]:
# Report the Watches non-Vine results: the five-star count, the total number of
# non-Vine reviews (unpaidDF.count() is evaluated again here), and the five-star
# percentage formatted to two decimal places.
print(f'Watches Normal Reviews \nNumber of 5 Star Reviews: {unpaid5star} \n\
Number of Reviews: {unpaidDF.count()} \n\
Percentage 5 Star Reviews: {UnpaidPercentage:.2f}%')

In [None]:
# Report the Software Vine-program results: the five-star count, the total number of
# Vine reviews (SoftwarepaidDF.count() is evaluated again here), and the five-star
# percentage formatted to two decimal places.
print(f'Software Vine Program \nNumber of 5 Star Reviews: {Softwarepaid5star} \n\
Number of Reviews: {SoftwarepaidDF.count()} \n\
Percentage 5 Star Reviews: {SoftwarePaidPercentage:.2f}%')

In [None]:
# Report the Software non-Vine results: the five-star count, the total number of
# non-Vine reviews (SoftwareunpaidDF.count() is evaluated again here), and the
# five-star percentage formatted to two decimal places.
print(f'Software Normal Reviews \nNumber of 5 Star Reviews: {Softwareunpaid5star} \n\
Number of Reviews: {SoftwareunpaidDF.count()} \n\
Percentage 5 Star Reviews: {SoftwareUnpaidPercentage:.2f}%')