## Big Data Challenge notebook #2
### Goals

1) Perform the ETL process completely in the cloud and upload a dataframe to an RDS instance

2) Use PySpark or SQL to perform a statistical analysis of selected data.

3) I am using my local machine to run spark so I can load the data to my local postgres database and avoid paying Amazon. 

In [17]:
# https://medium.com/beeranddiapers/installing-apache-spark-on-mac-os-ce416007d79f
# installed spark on mac using brew and modified bash_profile 
import os
SPARK_VERSION = 'spark-3.3.1'
import findspark
findspark.init()
from dotenv import load_dotenv
from pyspark.sql.functions import col,to_date,count
DB_HOST = os.getenv('DB_HOST')
DB_NAME = os.getenv('DB_NAME')
DB_USER = os.getenv('DB_USER')
DB_PASS = os.getenv('DB_PASS')
load_dotenv()

True

In [18]:
# Start Spark session on local machine
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("CloudETLProject").getOrCreate()

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark write to postgres example") \
    .config("spark.jars", "/usr/local/Cellar/apache-spark/3.2.0/libexec/jars/postgresql-42.2.9.jar") \
    .getOrCreate()


In [19]:
from pyspark import SparkFiles
# Load in user_data.csv from S3 into a DataFrame
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Watches_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

In [20]:
# 
df = spark.read.option('header', 'true').csv(SparkFiles.get("amazon_reviews_us_Watches_v1_00.tsv.gz"), inferSchema=True, sep='\t', timestampFormat="mm/dd/yy")
# df.show(10)

In [21]:
# Count number of records in the spark dataframe
df.count()

960872

In [22]:
# Print schema
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: string (nullable = true)



In [23]:
# Create review dataframe
watch_reviews = df.select("customer_id","product_id","product_parent","review_date")
watch_reviews.show()

+-----------+----------+--------------+-----------+
|customer_id|product_id|product_parent|review_date|
+-----------+----------+--------------+-----------+
|    3653882|B00FALQ1ZC|     937001370| 2015-08-31|
|   14661224|B00D3RGO20|     484010722| 2015-08-31|
|   27324930|B00DKYC7TK|     361166390| 2015-08-31|
|    7211452|B000EQS1JW|     958035625| 2015-08-31|
|   12733322|B00A6GFD7S|     765328221| 2015-08-31|
|    6576411|B00EYSOSE8|     230493695| 2015-08-31|
|   11811565|B00WM0QA3M|     549298279| 2015-08-31|
|   49401598|B00A4EYBR0|     844009113| 2015-08-31|
|   45925069|B00MAMPGGE|     263720892| 2015-08-31|
|   44751341|B004LBPB7Q|     124278407| 2015-08-31|
|    9962330|B00KGTVGKS|      28017857| 2015-08-31|
|   16097204|B0039UT5OU|     685450910| 2015-08-31|
|   51330346|B00MPF0XJQ|     767769082| 2015-08-31|
|    4201739|B003P1OHHS|     648595227| 2015-08-31|
|   26339765|B00R70YEOE|     457338020| 2015-08-31|
|    2692576|B000FVE3BG|     824370661| 2015-08-31|
|   44713366

In [24]:
watch_reviews.write \
    .mode("overwrite") \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/big_data") \
    .option("dbtable", "watch_reviews") \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .save()

In [25]:
# Create products dataframe
watch_products = df.select("product_id","product_title")

In [26]:
watch_products.write \
    .mode("overwrite") \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/big_data") \
    .option("dbtable", "watch_products") \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .save()

In [28]:
# Create vine dataframe
watches_vine = df.select("review_id","star_rating","helpful_votes","total_votes","vine")

In [29]:
watches_vine.write \
    .mode("overwrite") \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/big_data") \
    .option("dbtable", "watches_vine") \
    .option("user", DB_USER) \
    .option("password", DB_PASS) \
    .option("driver", "org.postgresql.Driver") \
    .save()

In [None]:
# customer counts
watch_customers = df.select(col('customer_id').cast('int')).groupBy('customer_id').agg(count('customer_id').alias("customer_count") )
watch_customers.show()