In [3]:
import os
# Find the latest version of spark 3.2  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
spark_version = 'spark-3.2.2'
# spark_version = 'spark-3.'

os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

# Start a SparkSession
import findspark
findspark.init()


0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Waiting for headers] [Waiting for headers] [Waiting for headers] [Connectin0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Ign:2 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:3 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:4 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                           

In [4]:
# Download the PostgreSQL JDBC driver jar into the current working directory.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-11-30 00:15:42--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar’


2022-11-30 00:15:43 (5.70 MB/s) - ‘postgresql-42.2.9.jar’ saved [914037/914037]



In [5]:
# Create (or reuse) the Spark session, with the PostgreSQL JDBC jar on the
# driver classpath.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [8]:
# Dataset 1: Major Appliances reviews (gzipped, tab-separated, with a header row).
from pyspark import SparkFiles

url1 = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Major_Appliances_v1_00.tsv.gz"

# Register the remote file with Spark, then read the fetched copy by file name.
spark.sparkContext.addFile(url1)
df_1 = spark.read.options(sep="\t", header=True).csv(
    SparkFiles.get("amazon_reviews_us_Major_Appliances_v1_00.tsv.gz")
)

# Preview the first rows.
df_1.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   16199106|R203HPW78Z7N4K|B0067WNSZY|     633038551|FGGF3032MW Galler...|Major Appliances|          5|            0|          0|   N|                Y|If you need a new...|What a great stov...| 2015-08-31|
|         US|   16374060|R2EAIGVLEALSP3|B002QSXK60|     811766671|Best Hand Clothes...|Major Appliances|          5|    

In [9]:
# Number of rows in the Major Appliances dataset (df_1).
df_1.count()

96901

In [10]:
# Dataset 2: Personal Care Appliances reviews (gzipped, tab-separated, with a header row).
url2 = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Personal_Care_Appliances_v1_00.tsv.gz"

# Register the remote file with Spark, then read the fetched copy by file name.
spark.sparkContext.addFile(url2)
df_2 = spark.read.options(sep="\t", header=True).csv(
    SparkFiles.get("amazon_reviews_us_Personal_Care_Appliances_v1_00.tsv.gz")
)

# Preview the first rows.
df_2.show()

+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|    product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+--------------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   32114233|R1QX6706ZWJ1P5|B00OYRW4UE|     223980852|Elite Sportz Exer...|Personal_Care_App...|          5|            0|          0|   N|                Y|Good quality. Shi...|Exactly as descri...| 2015-08-31|
|         US|   18125776|R3QWMLJHIW6P37|B0000537JQ|     819771537|     Ezy Dose Weekly|Personal_Care_App

In [11]:
# Number of rows in the Personal Care Appliances dataset (df_2).
df_2.count()

85981

In [12]:
# Stack both review datasets into a single DataFrame and report its row count.
df_combo = df_1.union(df_2)
df_combo.count()

182882

In [13]:
# Data cleaning: keep only distinct rows (whole-row duplicates are removed),
# then report how many rows remain.
df = df_combo.distinct()
df.count()

182882

In [14]:
# Inspect the column types of the combined DataFrame. The CSV reads above did
# not request schema inference, so the numeric-looking columns need explicit
# casts before use.
df_combo.dtypes

[('marketplace', 'string'),
 ('customer_id', 'string'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'string'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'string'),
 ('helpful_votes', 'string'),
 ('total_votes', 'string'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

In [15]:
# Convert the numeric columns from string to integer, then re-check the schema.
from pyspark.sql.types import IntegerType

for column_name in (
    "customer_id",
    "product_parent",
    "star_rating",
    "helpful_votes",
    "total_votes",
):
    # Replace each column in place with its integer-cast version.
    df = df.withColumn(column_name, df[column_name].cast(IntegerType()))

df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

In [16]:
# review_id table: one row per review with its customer, product and date.
from pyspark.sql.functions import to_date

df_id = df.select([
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    # Parse the string date into a DateType column. The pattern uses a
    # four-letter year ("yyyy"); the previous "yyy" was a typo.
    to_date("review_date", "yyyy-MM-dd").alias("review_date"),
]).dropDuplicates(["review_id"])  # keep a single row per review_id

df_id.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R109P3IYHMUH31|   40588923|B000QF91YA|     228913016| 2009-11-30|
|R10RGSX89PKNLC|   24289428|B002YOJ70Y|     347542369| 2013-08-19|
|R10SFLSDK0T4VU|   18108829|B0002L5A8Y|     276226768| 2007-10-24|
|R10XNDOQPEWTCX|   19063567|B000SB4O8U|     251630444| 2011-11-12|
|R113T2WEL9HHS9|   10645016|B003NDS2EM|     888323415| 2013-01-09|
|R118R1V2ZS83U6|   39034871|B0057YO0IM|     611955694| 2013-06-19|
|R11C1CUSB1M017|   27670090|B007MU1X4I|     309878602| 2013-12-30|
|R11I56KYNUSNMQ|   30974418|B002HORJY2|     732980220| 2011-11-07|
|R11XA3W8K9XNM7|   10137785|B00K2MZJS4|     328081602| 2014-08-03|
|R124YO8LNNP361|   46412401|B00JG8B42K|     672722354| 2015-06-19|
|R12AALHK486N33|   43105079|B000EE5DTK|     315552854| 2009-08-18|
|R12ECLCPII68JO|   19930639|B001BLCZZU|     293902826| 2008-10

In [18]:
# Confirm the review_id table schema (review_date should now be a date).
df_id.dtypes

[('review_id', 'string'),
 ('customer_id', 'int'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('review_date', 'date')]

In [27]:
# Number of rows in the review_id table (one per distinct review_id).
df_id.count()

182882

In [20]:
# products table: one row per product_id with its title.
product_columns = df.select("product_id", "product_title")
df_products = product_columns.dropDuplicates(["product_id"])
df_products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B0000532OW|Ezy Dose Ezy Crus...|
|B0000AHSA2|Conair FB14S Ultr...|
|B00012FJ9O|Lobob Soft Contac...|
|B00013KQ3M|Ladies Tel Time G...|
|B00013WIH4|Eye Stic - Gunmet...|
|B00013WQ2G|OPTX 20/20 Lenses...|
|B0001AGMYM|The Invisible Clo...|
|B0006FUD1K|Purrfect Opener (...|
|B0006ON8LI|LeClaire & Bayot ...|
|B0008D7NDQ|Healing Dreams Mu...|
|B0009I571A|Littmann Lightwei...|
|B000AEFDP8|Revlon RV476C 187...|
|B000AQG5R6|NAIL ESQUE ACRYLI...|
|B000BMR820|3M Littmann Cardi...|
|B000BY382C|Resistance Chair ...|
|B000BYVFAY|Lady Bug Massager...|
|B000CMG10O|Coleman Pet Bone ...|
|B000CSWAEE|   Nylon Medical Bag|
|B000EP66MW|Hand Helper Thera...|
|B000F0WHTM|52 lb POSTAL STAM...|
+----------+--------------------+
only showing top 20 rows



In [25]:
# Confirm the products table schema.
df_products.dtypes

[('product_id', 'string'), ('product_title', 'string')]

In [26]:
# Number of rows in the products table (one per distinct product_id).
df_products.count()

29256

In [21]:
# customers table: one row per customer_id with the number of reviews written.
# groupBy().count() already produces exactly one row per customer_id, so no
# further column selection or de-duplication is needed; only the generated
# "count" column has to be renamed.
df_numb_customer = df.groupBy("customer_id").count()
df_customer = df_numb_customer.withColumnRenamed("count", "customer_count")
df_customer.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   34760180|             1|
|   52436945|             1|
|   52619924|             1|
|   36351443|             1|
|   33266873|             2|
|   36569642|             1|
|   29221210|             1|
|   49667311|             1|
|   42260446|             1|
|   50296144|             1|
|   43967957|             1|
|   26923982|             1|
|   52763081|             1|
|   15675430|             1|
|   39416583|             1|
|   18273933|             1|
|   15864798|             2|
|   46424876|             1|
|    1810767|             1|
|   32117655|             1|
+-----------+--------------+
only showing top 20 rows



In [23]:
# Confirm the customers table schema.
df_customer.dtypes

[('customer_id', 'int'), ('customer_count', 'bigint')]

In [24]:
# Number of rows in the customers table (one per distinct customer_id).
df_customer.count()

172925

In [22]:
# vine table: rating and vote columns plus the vine flag, one row per review_id.
vine_columns = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
)
df_vine = vine_columns.dropDuplicates(["review_id"])
df_vine.show()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R109P3IYHMUH31|          2|            2|          2|   N|
|R10RGSX89PKNLC|          5|            1|          1|   N|
|R10SFLSDK0T4VU|          5|            5|          5|   N|
|R10XNDOQPEWTCX|          5|            0|          0|   N|
|R113T2WEL9HHS9|          2|           12|         16|   N|
|R118R1V2ZS83U6|          1|            4|          6|   N|
|R11C1CUSB1M017|          1|            1|          1|   N|
|R11I56KYNUSNMQ|          5|            0|          0|   N|
|R11XA3W8K9XNM7|          5|            2|          2|   N|
|R124YO8LNNP361|          5|            0|          0|   N|
|R12AALHK486N33|          5|            0|          0|   N|
|R12ECLCPII68JO|          5|            0|         11|   N|
|R12I5DGPWF7EG3|          5|            0|          0|   N|
|R133TQ7PP5FL3K|          2|           1

In [28]:
# Confirm the vine table schema.
df_vine.dtypes

[('review_id', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string')]

In [29]:
# Number of rows in the vine table (one per distinct review_id).
df_vine.count()

182882

In [None]:
# Connection settings for the PostgreSQL (RDS) target database.
# NOTE: per the author, pgAdmin kept crashing locally, so this cell and the
# write cells that follow were not executed against a live database.
# Credentials and host come from a local `config` module rather than being
# hard-coded in the notebook.
from config import username, password, host

database_name = "db_1"
port = 5432
mode = "append"  # save mode passed to every JDBC write

jdbc_url = f'jdbc:postgresql://{host}:{port}/{database_name}'

# JDBC connection properties handed to DataFrameWriter.jdbc.
config = dict(
    user=f'{username}',
    password=f'{password}',
    driver="org.postgresql.Driver",
)

In [None]:
# Write df_id to the 'review_id' table. Best-effort: a failed write is
# printed rather than raised, so the remaining cells can still run.
try:
    df_id.write.mode(mode).jdbc(url=jdbc_url, table='review_id', properties=config)
except Exception as write_error:
    print(write_error)

In [None]:
# Write df_products to the 'products' table. Best-effort: a failed write is
# printed rather than raised, so the remaining cells can still run.
try:
    df_products.write.mode(mode).jdbc(url=jdbc_url, table='products', properties=config)
except Exception as write_error:
    print(write_error)

In [None]:
# Write df_customer to the 'customers' table. Best-effort: a failed write is
# printed rather than raised, so the remaining cells can still run.
try:
    df_customer.write.mode(mode).jdbc(url=jdbc_url, table='customers', properties=config)
except Exception as write_error:
    print(write_error)

In [None]:
# Write df_vine to the 'vine_info' table. Best-effort: a failed write is
# printed rather than raised, so the notebook does not stop on an error.
try:
    df_vine.write.mode(mode).jdbc(url=jdbc_url, table='vine_info', properties=config)
except Exception as write_error:
    print(write_error)