In [1]:
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:3 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://security.ubuntu.com/ubuntu bionic-security/universe amd64 Packages [1,431 kB]
Get:13 http:

In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-10-12 02:43:51--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2021-10-12 02:43:51 (6.52 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

### Load Amazon Data into Spark DataFrame

In [4]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.option("encoding", "UTF-8").csv(SparkFiles.get(""), sep="\t", header=True, inferSchema=True)
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!| 2015-08-31|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...|            Toys|          5|    

### Create DataFrames to match tables

In [5]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   18778586| RDIJS7QYB6XNR|B00EDBY7X8|     122952789|Monopoly Junior B...|            Toys|          5|            0|          0|   N|                Y|          Five Stars|        Excellent!!!| 2015-08-31|
|         US|   24769659|R36ED1U38IELG8|B00D7JFOPC|     952062646|56 Pieces of Wood...|            Toys|          5|    

In [6]:
# Create the customers_table DataFrame
# customers_df = df.groupby("").agg({""}).withColumnRenamed("", "customer_count")

from pyspark.sql.functions import count    ## need to import this before using agg function 
customers_df =  df.groupby("customer_id").agg(count("customer_id")).withColumnRenamed("count(customer_id)", "customer_count")

In [7]:
customers_df.count()   # check the row count    

2561107

In [8]:
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   16989307|             1|
|   45632184|             2|
|   14703850|            13|
|   49645387|             2|
|   16343477|             1|
|   15554899|             1|
|   17067926|             1|
|   50843047|             2|
|    4051424|             1|
|   11487525|             1|
|   19371753|             1|
|   18634862|             1|
|   14552054|             1|
|   52695798|             1|
|   49438424|             3|
|   10854449|             9|
|   48521319|             1|
|   11839424|             2|
|   27887950|             1|
|   45392827|             3|
+-----------+--------------+
only showing top 20 rows



In [9]:
# Create the products_table DataFrame and drop duplicates. 
products_df = df.select(["product_id","product_title"]).drop_duplicates()   ## with this we get 664063

In [10]:
# Create the products_table DataFrame and drop duplicates  of product_id  rows as table has product_id is primary key and unique
products_df = df.select(["product_id","product_title"]).drop_duplicates(['product_id'])

In [11]:
products_df.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|0545449359|Klutz Fashion For...|
|0615394124|     füdoo Board Kit|
|0615881386|The Revised Crisw...|
|0735328757|Mudpuppy Butterfl...|
|0764955063|The Square of St....|
|0890100039|Kalmbach Building...|
|0954840526|World Prehistoric...|
|0966257545|Ladybug Game NEW ...|
|0972428232| Conversations to Go|
|0980345553|Art of Conversati...|
|0982757751|World of Harmony ...|
|0988179024|All Quiet on the ...|
|1409582485|Listen and Learn ...|
|1554841615|     Cat Smarts Game|
|1556345542|Munchkin 2 - Unna...|
|1567677037|Hot Dots Let’s Le...|
|1572158700|Cat in the Hat Gi...|
|1579823645|MerryMakers Pete ...|
|1585640514|Star Fleet Battle...|
|1589941411|     War of the Ring|
+----------+--------------------+
only showing top 20 rows



In [12]:
products_df.count()

664062

In [None]:
#  tried thi way 
## products_df = products_df.drop_duplicates(subset ='product_id',keep ="first")
#  df.dropDuplicates(['name', 'height']).show()

clean_products_df= products_df.dropDuplicates(['product_id']).show()

In [13]:
# Create the review_id_table DataFrame. 
# Convert the 'review_date' column to a date datatype with to_date("review_date", 'yyyy-MM-dd').alias("review_date")
# review_id_df = df.select([, to_date("review_date", 'yyyy-MM-dd').alias("review_date")])

review_id_df = df.select(['review_id', 'customer_id', 'product_id', 'product_parent', to_date("review_date", 'yyyy-MM-dd').alias("review_date")]) 


In [14]:
review_id_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
| RDIJS7QYB6XNR|   18778586|B00EDBY7X8|     122952789| 2015-08-31|
|R36ED1U38IELG8|   24769659|B00D7JFOPC|     952062646| 2015-08-31|
| R1UE3RPRGCOLD|   44331596|B002LHA74O|     818126353| 2015-08-31|
|R298788GS6I901|   23310293|B00ARPLCGY|     261944918| 2015-08-31|
|  RNX4EXOBBPN5|   38745832|B00UZOPOFW|     717410439| 2015-08-31|
|R3BPETL222LMIM|   13394189|B009B7F6CA|     873028700| 2015-08-31|
|R3SORMPJZO3F2J|    2749569|B0101EHRSM|     723424342| 2015-08-31|
|R2RDOJQ0WBZCF6|   41137196|B00407S11Y|     383363775| 2015-08-31|
|R2B8VBEPB4YEZ7|     433677|B00FGPU7U2|     780517568| 2015-08-31|
|R1CB783I7B0U52|    1297934|B0013OY0S0|     269360126| 2015-08-31|
| R2D90RQQ3V8LH|   52006292|B00519PJTW|     493486387| 2015-08-31|
|R1Y4ZOUGFMJ327|   32071052|B001TCY2DO|     459122467| 2015-08

In [28]:
review_id_df = df.select(['review_id', 'customer_id', 'product_id', 'product_parent', to_date("review_date", 'yyyy-MM-dd').alias("review_date")]).drop_duplicates(['review_id'])

In [29]:
 review_id_df.count()

4864249

In [15]:
# Create the vine_table. DataFrame
vine_df = df.select(['review_id', 'star_rating','helpful_votes','total_votes','vine','verified_purchase'])

In [16]:
#   Drop duplicate entries of review_id = from vine_df 
vine_df =   df.select(["review_id","star_rating","helpful_votes","vine","verified_purchase" ]).drop_duplicates(['review_id'])

In [17]:
vine_df.show()

+--------------+-----------+-------------+----+-----------------+
|     review_id|star_rating|helpful_votes|vine|verified_purchase|
+--------------+-----------+-------------+----+-----------------+
|R1001U2LSX6SNS|          5|            0|   N|                Y|
|R1001UXWB3TQFD|          5|            0|   N|                Y|
|R100GSKB9U47P4|          4|            0|   N|                Y|
|R100VXLBDQG2SS|          5|            0|   N|                Y|
|R1017N6BWQZ2TE|          2|            4|   N|                Y|
|R101H8VGT76VOY|          5|            1|   N|                Y|
|R101KG8CEJW5YL|          5|            0|   N|                Y|
|R101NWAG076S4S|          5|            0|   N|                Y|
|R101XRVBKZ07YQ|          5|            0|   N|                Y|
|R10250IF3T5MLZ|          5|            1|   N|                Y|
|R1025GR0EAZRKZ|          5|            0|   N|                Y|
|R102HS0E0WZXJG|          5|            0|   N|                Y|
|R102OM19P

In [18]:
vine_df.count()

4864249

### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [19]:
# Configure settings for RDS
#  jdbc:sqlserver://[serverName[\instanceName][:portNumber]][;property=value[;property=value]]
mode = "append"
#jdbc_url="jdbc:postgresql://dbmodule16viz.cfp4cgpvvsmv.us-east-2.rds.amazonaws.com:5432/dbmodule16viz"
jdbc_url="jdbc:postgresql://mypostgresdb.cfp4cgpvvsmv.us-east-2.rds.amazonaws.com:5432/my_16_db"

config = {"user":"postgres", 
          "password": "dbPASS16", 
          "driver":"org.postgresql.Driver"}

In [30]:
# Write review_id_df to table in RDS
review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [21]:
review_id_df.count()

4864249

In [22]:
products_df.count()  # check how many rows in dataframe 

664062

In [23]:
# Create the products_table DataFrame and drop duplicates  of product_id  rows as table has product_id is primary key and unique
products_df = df.select(["product_id","product_title"]).drop_duplicates(['product_id'])

In [31]:
# Write products_df to table in RDS
# about 3 min
products_df.write.jdbc(url=jdbc_url, table='products_table', mode=mode, properties=config)      ###  on execution got 664062 rows

In [32]:
# Write customers_df to table in RDS
# 5 min 14 s
customers_df.write.jdbc(url=jdbc_url, table='customers_table', mode=mode, properties=config)    ##  on execution  received 2561107 rows

In [33]:
# Write vine_df to table in RDS
# 11 minutes
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)

In [39]:
#  Deliverable 2: Determine Bias of Vine Reviews 
# Filter the data and create a new DataFrame or table to retrieve all the rows where the total_votes count is equal to or greater than 20 to pick reviews that are more likely to be helpful and to avoid having division by zero errors later on.
# Create new vine. DataFrame

vdf = df.select(["star_rating", "helpful_votes", "total_votes", "vine", "verified_purchase"])

#vine_df_filtered  = df.filter(df['total_votes'] >= 20).select(['review_id', 'star_rating','helpful_votes','total_votes','vine','verified_purchase'])

In [40]:
#  Filter the data and create a new DataFrame or table to retrieve all the rows where the total_votes count is equal to or greater than 20 to pick reviews that are more likely to be helpful and to avoid having division by zero errors later on.
filtered_vdf = vdf.filter(vdf["total_votes"] >= 20)

In [41]:
# Find the percentage of helpful_votes to total_votes

filtered_vdf_2 = filtered_vdf.filter(filtered_vdf["helpful_votes"]/filtered_vdf["total_votes"] >= 0.5)

In [42]:
filtered_vdf_2.show()

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          5|           23|         27|   N|                Y|
|          1|           21|         21|   N|                Y|
|          5|           19|         20|   Y|                N|
|          4|           59|         81|   N|                N|
|          5|           22|         23|   Y|                N|
|          3|           33|         33|   Y|                N|
|          4|           50|         50|   N|                N|
|          3|           84|         84|   N|                Y|
|          5|           20|         20|   N|                Y|
|          5|           22|         22|   N|                N|
|          1|           28|         36|   N|                Y|
|          5|           42|         43|   N|                N|
|          4|           32|         34|   N|           

In [45]:
#  from above df filter out  not paid records 
not_paid_five_star_df = filtered_vdf_2.filter(col('vine')=='N').select([ 'star_rating','helpful_votes','total_votes','vine','verified_purchase']) 

not_paid_five_star_df.show()

+-----------+-------------+-----------+----+-----------------+
|star_rating|helpful_votes|total_votes|vine|verified_purchase|
+-----------+-------------+-----------+----+-----------------+
|          5|           23|         27|   N|                Y|
|          1|           21|         21|   N|                Y|
|          4|           59|         81|   N|                N|
|          4|           50|         50|   N|                N|
|          3|           84|         84|   N|                Y|
|          5|           20|         20|   N|                Y|
|          5|           22|         22|   N|                N|
|          1|           28|         36|   N|                Y|
|          5|           42|         43|   N|                N|
|          4|           32|         34|   N|                N|
|          5|           29|         33|   N|                N|
|          5|           95|         95|   N|                Y|
|          5|          124|        127|   N|           

In [50]:
not_paid_five_star_count = not_paid_five_star_df[not_paid_five_star_df['star_rating']== 5].count()
not_paid_total_count = not_paid_five_star_df.count()
percentage_five_star_vine_not_paid = float(not_paid_five_star_count) / float(not_paid_total_count) *100
print(not_paid_five_star_count)
print(not_paid_total_count)
print(percentage_five_star_vine_not_paid)

29982
62028
48.33623524859741


In [47]:
#  from above df filter out  paid records 
paid_five_star_df = filtered_vdf_2.filter(col('vine')=='Y').select([ 'star_rating','helpful_votes','total_votes','vine','verified_purchase']) 

In [49]:
paid_five_star_count = paid_five_star_df[paid_five_star_df['star_rating']== 5].count()
paid_total_count = paid_five_star_df.count()
percentage_five_star_vine_paid = float(paid_five_star_count) / float(paid_total_count)  *100
print(paid_five_star_count)
print(paid_total_count)
print(percentage_five_star_vine_paid)

432
1266
34.12322274881517
