In [51]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.18                                                                               Hit:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
0% [Connecting to archive.ubuntu.com] [Connecting to security.ubuntu.com (91.180% [Release.gpg gpgv 696 B] [Connecting to archive.ubuntu.com] [Connecting to s                                                                               Hit:4 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Hit:5 https://developer.download.nvidia.com/comp

In [52]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-03-07 23:09:16--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2022-03-07 23:09:16 (5.96 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]



In [53]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook, with the Postgres JDBC
# jar placed on the driver's classpath.
spark = (
    SparkSession.builder
    .appName("Vine_Review-Challenge")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

In [54]:
# Load the Amazon Jewelry reviews with PySpark (used here instead of pandas or
# SQL to gain more experience with it).
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Jewelry_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
# Resolve the downloaded file by its own name (the last path component of the
# URL). SparkFiles.get("") resolves to the whole SparkFiles directory, so the
# reader would also load any other file that had been added to this context.
file_name = url.rsplit("/", 1)[-1]
# Tab-separated file with a header row; column types are inferred from the data.
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   50423057|R135Q3VZ4DQN5N|B00JWXFDMG|     657335467|Everbling Purple ...|         Jewelry|          5|            0|          0|   N|                Y|           Beauties!|so beautiful even...| 2015-08-31|
|         US|   11262325|R2N0QQ6R4T7YRY|B00W5T1H9W|      26030170|925 Sterling Silv...|         Jewelry|          5|    

In [55]:
# Show the inferred column types of the raw reviews DataFrame. In the run
# recorded below, star_rating was inferred as a string rather than an integer.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: string (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [56]:
# Remove rows that contain a null in any column, printing the row count
# before and after so the number of dropped rows is visible.
row_count_before = df.count()
print(row_count_before)
df = df.dropna()
row_count_after = df.count()
print(row_count_after)

1767753


In [57]:
from typing import cast  # NOTE(review): not used in this cell; the cast below is Column.cast
from pyspark.sql.types import IntegerType

# Build the vine_table DataFrame from the review columns the analysis needs.
vine_columns = ["review_id", "star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"]
vine_df = df.select(vine_columns)

# star_rating is text in df, but the pgAdmin table definition declares it as an
# integer, so convert it before use.
star_rating_as_int = vine_df["star_rating"].cast(IntegerType())
vine_df = vine_df.withColumn("star_rating", star_rating_as_int)
vine_df.show(truncate=False)

+--------------+-----------+-------------+-----------+----+-----------------+
|review_id     |star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R135Q3VZ4DQN5N|5          |0            |0          |N   |Y                |
|R2N0QQ6R4T7YRY|5          |0            |0          |N   |N                |
|R3N5JE5Y4T6W5M|5          |0            |0          |N   |Y                |
|R2I150CX5IVY9Q|5          |0            |0          |N   |Y                |
|R1RM9ICOOA9MQ3|5          |0            |0          |N   |Y                |
|R2J2KMDL10UMSH|5          |0            |0          |N   |Y                |
|R3R9ZUFA4TB4FQ|5          |0            |0          |N   |Y                |
|R3UQ8VAQN7R6WL|5          |0            |0          |N   |Y                |
|R1FXZ69C01JNQM|5          |0            |0          |N   |Y                |
|RY36LB4OW0FFS |5          |0            |0          |N   |Y    

In [58]:
# Confirm the column types of vine_df; star_rating should now be an integer
# after the cast applied when vine_df was built.
vine_df.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)



In [59]:
# Step 1: keep only the reviews with at least 20 total votes. These reviews are
# more likely to be helpful, and the cut-off also rules out a zero total_votes
# denominator in the ratio computed in the next step.
has_enough_votes = vine_df["total_votes"] >= 20
filtered_df = vine_df.filter(has_enough_votes)
filtered_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
|R16YC6SMGKA8SR|          5|           23|         23|   N|                Y|
| R5O4WE9UM60B0|          1|           29|         29|   N|                Y|
|R2EKRVCRC7U0IY|          1|           22|         24|   N|                Y|
|R2OKV47GETH0L7|          5|           24|         24|   N|                Y|
| R36LII9IITE17|          1|           22|         24|   N|                Y|
|R35VKNE16PFY0H|          5|           22|         24|   N|                N|
|R3TG008LHHZZIN|          5|          123|        125|   N|                Y|
|R2FYQBKCC1XG4M|          5|           20|         20|   N|                Y|
| R63PV336NI5X7|          1|           27|         28|   N|                Y|
|R1R42WPHB5ZSWI|          5|           51|         52|   N|     

In [60]:
# Step 2: from the Step 1 rows, keep those where helpful_votes / total_votes is
# at least 50%. The ratio is stored in the 'greater_than_50_votes' column.
helpful_ratio = filtered_df['helpful_votes'] / filtered_df['total_votes']
with_ratio_df = filtered_df.withColumn('greater_than_50_votes', helpful_ratio)
helpful_votes_df = with_ratio_df.filter(with_ratio_df['greater_than_50_votes'] >= 0.5)
helpful_votes_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|greater_than_50_votes|
+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|R16YC6SMGKA8SR|          5|           23|         23|   N|                Y|                  1.0|
| R5O4WE9UM60B0|          1|           29|         29|   N|                Y|                  1.0|
|R2EKRVCRC7U0IY|          1|           22|         24|   N|                Y|   0.9166666666666666|
|R2OKV47GETH0L7|          5|           24|         24|   N|                Y|                  1.0|
| R36LII9IITE17|          1|           22|         24|   N|                Y|   0.9166666666666666|
|R35VKNE16PFY0H|          5|           22|         24|   N|                N|   0.9166666666666666|
|R3TG008LHHZZIN|          5|          123|        125|   N|                Y|                0.984|


In [61]:
# Step 3: from the Step 2 rows, keep the reviews written as part of the Vine
# program (paid), i.e. vine == 'Y'.
paid_vine_df = helpful_votes_df.filter("vine == 'Y'")
paid_vine_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|greater_than_50_votes|
+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|R15PXA9XU9ZKSU|          4|           29|         29|   Y|                N|                  1.0|
|R2LJ4QYQ4PYDEH|          5|           24|         26|   Y|                N|   0.9230769230769231|
|R2PJWWQZ37WCIC|          5|           95|         96|   Y|                N|   0.9895833333333334|
|R33G2VDKRN2USY|          5|           21|         21|   Y|                N|                  1.0|
|R13R4X37T2U50A|          4|           52|         57|   Y|                N|   0.9122807017543859|
|R1X1J2NAQ39DF6|          4|           29|         34|   Y|                N|   0.8529411764705882|
|R2UV5XHFNMPXOX|          5|           60|         61|   Y|                N|   0.9836065573770492|


In [62]:
# Step 4: same as Step 3, but keep the reviews that were not part of the Vine
# program (unpaid), i.e. vine == 'N'.
unpaid_vine_df = helpful_votes_df.filter("vine == 'N'")
unpaid_vine_df.show()


+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|greater_than_50_votes|
+--------------+-----------+-------------+-----------+----+-----------------+---------------------+
|R16YC6SMGKA8SR|          5|           23|         23|   N|                Y|                  1.0|
| R5O4WE9UM60B0|          1|           29|         29|   N|                Y|                  1.0|
|R2EKRVCRC7U0IY|          1|           22|         24|   N|                Y|   0.9166666666666666|
|R2OKV47GETH0L7|          5|           24|         24|   N|                Y|                  1.0|
| R36LII9IITE17|          1|           22|         24|   N|                Y|   0.9166666666666666|
|R35VKNE16PFY0H|          5|           22|         24|   N|                N|   0.9166666666666666|
|R3TG008LHHZZIN|          5|          123|        125|   N|                Y|                0.984|


In [63]:
# Step 5: for each review type (paid vs unpaid) determine the total number of
# reviews, the number of 5-star reviews, and the percentage of 5-star reviews.
# This cell: total number of paid (Vine) reviews.
paid_review_count_df = paid_vine_df.agg({'review_id': 'count'})
paid_total_num_of_review = paid_review_count_df.withColumnRenamed(
    "count(review_id)", "Paid_Total_NumOf_Vine_Reviews"
)
paid_total_num_of_review.show()


+-----------------------------+
|Paid_Total_NumOf_Vine_Reviews|
+-----------------------------+
|                           21|
+-----------------------------+



In [64]:
# Step 5 (continued): number of 5-star reviews among the paid (Vine) reviews.
paid_5star_df = paid_vine_df.filter('star_rating == 5')
paid_5star_count_df = paid_5star_df.agg({'star_rating': 'count'})
paid_numof_5star_reviews = paid_5star_count_df.withColumnRenamed(
    "count(star_rating)", "Paid_5star_NumOf_Vine_Reviews"
)
paid_numof_5star_reviews.show()

+-----------------------------+
|Paid_5star_NumOf_Vine_Reviews|
+-----------------------------+
|                           11|
+-----------------------------+



In [69]:
# Step 5 (continued): percentage of 5-star reviews among the paid (Vine) reviews.
# Each single-row aggregate is collected once and its count read by column name.
paid_5star_count = paid_numof_5star_reviews.collect()[0]["Paid_5star_NumOf_Vine_Reviews"]
paid_total_count = paid_total_num_of_review.collect()[0]["Paid_Total_NumOf_Vine_Reviews"]
# Guard the division: when no paid review passes the earlier filters the total
# is 0 and a plain division would raise ZeroDivisionError. Report 0.0 instead.
if paid_total_count:
    paid_vine_reviews_percentage = paid_5star_count / paid_total_count * 100
else:
    paid_vine_reviews_percentage = 0.0
round(paid_vine_reviews_percentage, 3)



52.381

In [66]:
# Step 5 (continued): total number of unpaid (non-Vine) reviews.
unpaid_review_count_df = unpaid_vine_df.agg({'review_id': 'count'})
unpaid_total_num_of_review = unpaid_review_count_df.withColumnRenamed(
    "count(review_id)", "Unpaid_Total_NumOf_Vine_Reviews"
)
unpaid_total_num_of_review.show()


+-------------------------------+
|Unpaid_Total_NumOf_Vine_Reviews|
+-------------------------------+
|                           7689|
+-------------------------------+



In [67]:
# Step 5 (continued): number of 5-star reviews among the unpaid (non-Vine) reviews.
unpaid_5star_df = unpaid_vine_df.filter('star_rating == 5')
unpaid_5star_count_df = unpaid_5star_df.agg({'star_rating': 'count'})
unpaid_numof_5star_reviews = unpaid_5star_count_df.withColumnRenamed(
    "count(star_rating)", "Unpaid_5star_NumOf_Vine_Reviews"
)
unpaid_numof_5star_reviews.show()

+-------------------------------+
|Unpaid_5star_NumOf_Vine_Reviews|
+-------------------------------+
|                           4444|
+-------------------------------+



In [70]:
# Step 5 (continued): percentage of 5-star reviews among the unpaid (non-Vine) reviews.
# Each single-row aggregate is collected once and its count read by column name.
unpaid_5star_count = unpaid_numof_5star_reviews.collect()[0]["Unpaid_5star_NumOf_Vine_Reviews"]
unpaid_total_count = unpaid_total_num_of_review.collect()[0]["Unpaid_Total_NumOf_Vine_Reviews"]
# Guard the division: when no unpaid review passes the earlier filters the total
# is 0 and a plain division would raise ZeroDivisionError. Report 0.0 instead.
if unpaid_total_count:
    unpaid_vine_reviews_percentage = unpaid_5star_count / unpaid_total_count * 100
else:
    unpaid_vine_reviews_percentage = 0.0
round(unpaid_vine_reviews_percentage, 3)

57.797