

```
# This is formatted as code
```

## Dataframe Basics

In [41]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [1 0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Co0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Get:4 http://security.ubuntu.com/ubuntu bionic-s

In [42]:
# connect to database
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-03-09 09:19:15--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.1’


2022-03-09 09:19:16 (4.48 MB/s) - ‘postgresql-42.2.9.jar.1’ saved [914037/914037]



In [43]:
# Create (or reuse) the Spark session for this notebook.
from pyspark.sql import SparkSession

# The extra driver classpath entry points at the PostgreSQL JDBC jar under /content.
spark = (
    SparkSession.builder
    .appName("HW_GC")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [44]:
# Load the Amazon gift-card reviews (gzipped TSV) from the S3 bucket.
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Gift_Card_v1_00.tsv.gz"

# Register the remote file with Spark, then look up its local path by file name.
spark.sparkContext.addFile(url)
reviews_path = SparkFiles.get("amazon_reviews_us_Gift_Card_v1_00.tsv.gz")

# Tab-separated, first row is the header, column types are inferred from the data.
df = spark.read.csv(
    reviews_path,
    sep="\t",
    header=True,
    inferSchema=True,
)

In [45]:
# Count the rows in the loaded reviews DataFrame; the bare expression displays the result.
df.count()

149086

In [46]:
# Display the (column name, type) pairs of the raw reviews DataFrame.
df.dtypes

[('marketplace', 'string'),
 ('customer_id', 'int'),
 ('review_id', 'string'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('product_title', 'string'),
 ('product_category', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string'),
 ('verified_purchase', 'string'),
 ('review_headline', 'string'),
 ('review_body', 'string'),
 ('review_date', 'string')]

# Transform the dataset to fit the tables in the schema file

In [47]:
from pyspark.sql.types import DateType

# Build the review table: the id columns are taken as-is, and review_date is
# converted from string to a date while keeping its column name and position.
review_df = df.select(
    'review_id',
    'customer_id',
    'product_id',
    'product_parent',
    df['review_date'].cast(DateType()).alias('review_date'),
)
review_df.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R27ZP1F1CD0C3Y|   24371595|B004LLIL5A|     346014806| 2015-08-31|
| RJ7RSBCHUDNNE|   42489718|B004LLIKVU|     473048287| 2015-08-31|
|R1HVYBSKLQJI5S|     861463|B00IX1I3G6|     926539283| 2015-08-31|
|R2HAXF0IIYQBIR|   25283295|B00IX1I3G6|     926539283| 2015-08-31|
| RNYLPX611NB7Q|     397970|B005ESMGV4|     379368939| 2015-08-31|
|R3ALA9XXMBEDZR|   18513645|B004KNWWU4|     326384774| 2015-08-31|
|R3R8PHAVJFTPDF|   22484620|B004LLIKVU|     473048287| 2015-08-31|
|R18WWEK8OIXE30|   14765851|BT00CTP2EE|     775486538| 2015-08-31|
|R1EGUNQON2J277|   18751931|B004LLIKVU|     473048287| 2015-08-31|
|R21Z4M4L98CPU2|   15100528|B004W8D102|     595099956| 2015-08-31|
| R6JH7A117FHFA|    3559726|B004LLIKVU|     473048287| 2015-08-31|
|R1XZHS8M1GCGI7|   23413911|B004KNWWU4|     326384774| 2015-08

In [48]:
# Verify the column types of review_df (review_date was cast to a date when review_df was built).
review_df.dtypes

[('review_id', 'string'),
 ('customer_id', 'int'),
 ('product_id', 'string'),
 ('product_parent', 'int'),
 ('review_date', 'date')]

In [49]:
# Build the product table: one row per distinct (product_id, product_title) pair.
# NOTE(review): a product_id with more than one title would still appear several times —
# confirm whether the target table requires unique product_id values.
product_df = (
    df.select('product_id', 'product_title')
      .distinct()
)
product_df.dtypes

[('product_id', 'string'), ('product_title', 'string')]

In [50]:
# Build the customer table: number of reviews per customer_id.
from pyspark.sql.types import IntegerType

# Rows per customer, ordered by customer_id.
reviews_per_customer = df.groupBy("customer_id").count().orderBy("customer_id")

# Same two columns, with the count converted to an integer column.
counts_df = reviews_per_customer.select(
    'customer_id',
    reviews_per_customer['count'].cast(IntegerType()).alias('count'),
)

# Rename the count column for the customers table.
customers_df = counts_df.withColumnRenamed('count', 'customer_count')
customers_df.dtypes

[('customer_id', 'int'), ('customer_count', 'int')]

In [51]:
# Build the vine table from the review id plus its rating, vote and vine columns.
vine_columns = ['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine']
vine_df = df.select(*vine_columns)
vine_df.dtypes

[('review_id', 'string'),
 ('star_rating', 'int'),
 ('helpful_votes', 'int'),
 ('total_votes', 'int'),
 ('vine', 'string')]

# Postgres Setup

In [52]:
# Configure settings for RDS.
# Credentials are never stored in the notebook: the password is read from the
# RDS_PASSWORD environment variable, or prompted for (without echo) when it is unset.
# The user name can be overridden with RDS_USER and defaults to "root".
# NOTE(review): a password was previously committed in this cell — rotate it.
import os
from getpass import getpass

mode = "append"
jdbc_url="jdbc:postgresql://mypostgresdb.cgjaxat2x40a.us-west-2.rds.amazonaws.com:5432/my_data_class_db"
config = {"user": os.environ.get("RDS_USER", "root"),
          "password": os.environ.get("RDS_PASSWORD") or getpass("RDS password: "),
          "driver":"org.postgresql.Driver"}

In [53]:
# Write the review DataFrame to the gc_review_id_table table over JDBC,
# using the save mode and connection properties configured above.
review_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='gc_review_id_table',
    properties=config,
)

In [54]:
# Write the product DataFrame to the gc_products table over JDBC.
product_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='gc_products',
    properties=config,
)

In [55]:
# Write the customer DataFrame to the gc_customers table over JDBC.
customers_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='gc_customers',
    properties=config,
)

In [56]:
# Write the vine DataFrame to the gc_vine_table table over JDBC.
vine_df.write.mode(mode).jdbc(
    url=jdbc_url,
    table='gc_vine_table',
    properties=config,
)