In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 44 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 75.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=687441a8247e17138dc00f4da83d772ae17791778bfafa088596e155dd16e299
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
# download a Postgres driver that will allow Spark to interact with Postgres
!wget https://jdbc.postgresql.org/download/postgresql-42.2.17.jar

--2022-12-16 19:22:56--  https://jdbc.postgresql.org/download/postgresql-42.2.17.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1004734 (981K) [application/java-archive]
Saving to: ‘postgresql-42.2.17.jar’


2022-12-16 19:22:56 (5.18 MB/s) - ‘postgresql-42.2.17.jar’ saved [1004734/1004734]



In [3]:
# start a Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("VineReview").config("spark.driver.extraClassPath","/content/postgresql-42.2.17.jar").getOrCreate()

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!|2015-08-31 00:00:00|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...| 

In [22]:
# Create the vine_table. DataFrame
vine_df = df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"])
vine_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| RDIJS7QYB6XNR|          5|            0|          0|   N|                Y|
|R36ED1U38IELG8|          5|            0|          0|   N|                Y|
| R1UE3RPRGCOLD|          2|            1|          1|   N|                Y|
|R298788GS6I901|          5|            0|          0|   N|                Y|
|  RNX4EXOBBPN5|          1|            1|          1|   N|                Y|
|R3BPETL222LMIM|          5|            0|          0|   N|                Y|
|R3SORMPJZO3F2J|          3|            2|          2|   N|                Y|
|R2RDOJQ0WBZCF6|          5|            0|          0|   N|                Y|
|R2B8VBEPB4YEZ7|          5|            0|          0|   N|                Y|
|R1CB783I7B0U52|          1|            0|          1|   N|     

In [23]:
# Retrieve all the rows where the total_votes count is equal to or greater than 20 
from pyspark.sql.functions import col
vine_votes_df= vine_df.filter(col("total_votes")>=20)
vine_votes_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| ROP6ITXO8K5V2|          5|           23|         27|   N|                Y|
|R3ND1LVU7AXCVF|          1|           21|         21|   N|                Y|
| R9I5FOLKU99RY|          5|           19|         20|   Y|                N|
|R1QS8AOD6HX3ED|          4|           59|         81|   N|                N|
|R3ED60RC69CCQ6|          5|           22|         23|   Y|                N|
|R2MDI1TP46VSYD|          3|            9|         21|   N|                Y|
|R2JM687C525WR9|          3|           33|         33|   Y|                N|
|R2LWX4TZ67FWPT|          4|           50|         50|   N|                N|
|R29IYHPYD14AGI|          3|           84|         84|   N|                Y|
|R1F2I723WRK5QV|          5|           20|         20|   N|     

In [25]:
# Retrieve all the rows where the number of helpful_votes divided by total_votes is equal to or greater than 50%.
votes_ratio_df= vine_df.filter((vine_df.helpful_votes/vine_df.total_votes) >=0.5)
votes_ratio_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R1UE3RPRGCOLD|          2|            1|          1|   N|                Y|
|  RNX4EXOBBPN5|          1|            1|          1|   N|                Y|
|R3SORMPJZO3F2J|          3|            2|          2|   N|                Y|
|R1T96CG98BBA15|          3|            2|          4|   N|                Y|
|R1YS3DS218NNMD|          5|            4|          4|   N|                N|
|R1H1HOVB44808I|          5|            1|          1|   N|                N|
| R4UVQIRZ5T1FM|          4|            1|          2|   N|                Y|
|R3031Q42BKAN7J|          4|            1|          1|   N|                N|
| R44NP0QG6E98W|          3|            1|          1|   N|                Y|
| RKLAK7EPEG5S6|          5|            1|          2|   N|     

In [26]:
# Retrieves all the rows where a review was written as part of the Vine program (paid)
paid_review_df= votes_ratio_df.filter(votes_ratio_df.vine == "Y")
paid_review_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R9I5FOLKU99RY|          5|           19|         20|   Y|                N|
|R3ANN81Q02N3IU|          4|            2|          4|   Y|                N|
|R3ED60RC69CCQ6|          5|           22|         23|   Y|                N|
| R3V3AFWPY369L|          4|            1|          1|   Y|                N|
|R2MLZWOSNHXEL6|          3|            2|          3|   Y|                N|
|R2JM687C525WR9|          3|           33|         33|   Y|                N|
|R1WJYFHPPCZ4DE|          5|            4|          5|   Y|                N|
|R3EUSL1NBNEW08|          5|            2|          2|   Y|                N|
| RVXCX0CYLJXOL|          4|            1|          2|   Y|                N|
|R2NMZC8QAIUC1Y|          4|            2|          3|   Y|     

In [27]:
# Retrieves all the rows where a review was not written as part of the Vine program (unpaid)
unpaid_review_df= votes_ratio_df.filter(votes_ratio_df.vine == "N")
unpaid_review_df.show()

+--------------+-----------+-------------+-----------+----+-----------------+
|     review_id|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+--------------+-----------+-------------+-----------+----+-----------------+
| R1UE3RPRGCOLD|          2|            1|          1|   N|                Y|
|  RNX4EXOBBPN5|          1|            1|          1|   N|                Y|
|R3SORMPJZO3F2J|          3|            2|          2|   N|                Y|
|R1T96CG98BBA15|          3|            2|          4|   N|                Y|
|R1YS3DS218NNMD|          5|            4|          4|   N|                N|
|R1H1HOVB44808I|          5|            1|          1|   N|                N|
| R4UVQIRZ5T1FM|          4|            1|          2|   N|                Y|
|R3031Q42BKAN7J|          4|            1|          1|   N|                N|
| R44NP0QG6E98W|          3|            1|          1|   N|                Y|
| RKLAK7EPEG5S6|          5|            1|          2|   N|     

In [28]:

# Determine the total number of reviews, the number of 5-star reviews, and the percentage of 5-star reviews for paid program
from pyspark.sql.functions import count

# Total nb of reviews 
nb_paid_reviews= paid_review_df.count()
print("The total number of reviews for paid program is:")  
print(nb_paid_reviews)


The total number of reviews for paid program is:
16358


In [29]:
# Nnumber of 5 stars reviews for paid program
nb_5star_paid_reviews = paid_review_df.filter(paid_review_df.star_rating == 5).count()
print("The total number of 5-star reviews for paid program is:")  
print(nb_5star_paid_reviews)


The total number of 5-star reviews for paid program is:
6432


In [32]:
# Percentage of 5-star reviews for paid program
perc_5star_paid = float(nb_5star_paid_reviews)/float(nb_paid_reviews) * 100
print("The percentage of 5-star reviews for paid program is:")  
print(perc_5star_paid)

The percentage of 5-star reviews for paid program is:
39.32021029465705


In [33]:
# Determine the total number of reviews, the number of 5-star reviews, and the percentage of 5-star reviews for unpaid program
# Total nb of reviews 
nb_unpaid_reviews= unpaid_review_df.count()
print("The total number of reviews for unpaid program is:")  
print(nb_unpaid_reviews)
     

The total number of reviews for unpaid program is:
1429109


In [34]:
# Nnumber of 5 stars reviews for unpaid program 
nb_5star_unpaid_reviews = unpaid_review_df.filter(unpaid_review_df.star_rating == 5).count()
print("The total number of 5-star reviews for unpaid program is:")  
print(nb_5star_unpaid_reviews)

The total number of 5-star reviews for unpaid program is:
801043


In [35]:

# Percentage of 5-star reviews for unpaid program
perc_5star_unpaid = float(nb_5star_unpaid_reviews)/float(nb_unpaid_reviews) * 100
print("The percentage of 5-star reviews for unpaid program is:")  
print(perc_5star_unpaid)

The percentage of 5-star reviews for unpaid program is:
56.051917663383264
