# Big Data Assignment

__Summary__

Many Amazon shoppers depend on product reviews when deciding what to buy.
Amazon makes its review data sets publicly available. However, they are
large and can exceed what a local machine can handle: one data set alone
contains over 1.5 million rows.

__Goal__

Perform the whole `ETL process` `in the cloud`: extract two Amazon customer review
data sets, transform them into DataFrames that match the database tables, and load
them into an `RDS instance`.

__Data sets__

- Shoes s3a://amazon-reviews-pds/tsv/amazon_reviews_us_Shoes_v1_00.tsv.g

- Apparel s3a://amazon-reviews-pds/tsv/amazon_reviews_us_Apparel_v1_00.tsv.gz


__Notes__

- The code below was run in an Apache Zeppelin notebook to load the __Apparel__
data set. Apache Zeppelin is a web-based notebook that brings data exploration,
visualization, sharing and collaboration features to Spark.

- The Shoes data set was loaded into the database first, so this notebook adds
validations to ensure that the table constraints are not violated, such as the
uniqueness of `product_id` in `products` and of `customer_id` in `customers`.



In [None]:
%pyspark
# Load the Apparel data from S3 into a DataFrame (gzip-compressed TSV with a header row)
apparel_df = spark.read.csv(
    "s3a://amazon-reviews-pds/tsv/amazon_reviews_us_Apparel_v1_00.tsv.gz",
    header=True, inferSchema=True, sep="\t")
apparel_df.show(10)
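
Spark takes care of the gzip decompression, the tab separator and the header row in the cell above. To see what those options mean on a tiny scale, here is a plain-Python sketch using only the standard library; the two sample rows are made up for illustration and show only a few of the real columns.

```python
from __future__ import annotations

import csv
import gzip
import io

# Hypothetical two-row sample in the same layout as the review files:
# gzip-compressed, tab-separated, with a header row.
sample_tsv = (
    "review_id\tcustomer_id\tproduct_id\tstar_rating\treview_date\n"
    "R1\t101\tB0001\t5\t2015-08-31\n"
    "R2\t102\tB0002\t3\t2015-08-30\n"
)
compressed = gzip.compress(sample_tsv.encode("utf-8"))


def read_gzipped_tsv(data: bytes) -> list[dict[str, str]]:
    """Decompress, split each line on tabs and use the first line as column names."""
    with gzip.open(io.BytesIO(data), mode="rt", encoding="utf-8", newline="") as handle:
        return list(csv.DictReader(handle, delimiter="\t"))


rows = read_gzipped_tsv(compressed)
print(rows[0]["product_id"], rows[1]["star_rating"])
```

Unlike Spark with `inferSchema=True`, `csv.DictReader` leaves every value as a string.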

In [None]:
%pyspark
from pyspark.sql.functions import to_date

# Parse review_date as yyyy-MM-dd (in Spark datetime patterns MM is the month; mm would be minutes)
date_df = apparel_df.withColumn("date", to_date("review_date", "yyyy-MM-dd"))
date_df.show(10)
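
In Spark's datetime patterns, `MM` is the month and `mm` is minute-of-hour, so a pattern written as `yyyy-mm-dd` never reads the month from the string. Python's `datetime.strptime` uses different letters (`%m` for month, `%M` for minute), but the trap is analogous. This is a plain-Python illustration with a made-up date, not Spark code:

```python
from datetime import datetime

raw = "2015-08-31"  # hypothetical review_date value

# Correct: %m is the month (Spark's equivalent letter is MM).
good = datetime.strptime(raw, "%Y-%m-%d")

# Wrong: %M is minute-of-hour (Spark's equivalent letter is mm).
# The month is missing from the pattern, so strptime falls back to January.
bad = datetime.strptime(raw, "%Y-%M-%d")

print(good.date(), bad.date(), bad.minute)
```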

In [None]:
%pyspark
from pyspark.sql.functions import col
# Create a DataFrame that matches review_id_table:
# CREATE TABLE review_id_table (review_id TEXT PRIMARY KEY NOT NULL, customer_id INTEGER, product_id TEXT, product_parent INTEGER,
# review_date DATE -- this should be in the format yyyy-mm-dd
# );

review_df = date_df.select(["review_id", "customer_id", "product_id", "product_parent", col("date").alias("review_date")])
review_df.show(10)

In [None]:
%pyspark
# Number of reviews
review_df.count()  # => 5906333

In [None]:
%pyspark
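# Create a DataFrame that matches the products table.
# distinct() removes duplicate (product_id, product_title) pairs; a product_id listed
# with more than one title still appears more than once.
# CREATE TABLE products (product_id TEXT PRIMARY KEY NOT NULL UNIQUE, product_title TEXT);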

products_df = date_df.select(["product_id", "product_title"]).distinct()
products_df.show(10)
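
`distinct()` compares whole rows, so it only guarantees unique `(product_id, product_title)` pairs. A product listed under two different titles survives twice, which would violate the primary key on `products`. A plain-Python sketch with made-up rows:

```python
# Hypothetical rows: the same product_id listed with two slightly different titles.
rows = [
    ("B0001", "Running Shoe"),
    ("B0001", "Running Shoe "),   # trailing space makes it a different title
    ("B0002", "Cotton T-Shirt"),
    ("B0002", "Cotton T-Shirt"),  # exact duplicate, removed by distinct()
]

distinct_pairs = set(rows)  # what select(...).distinct() keeps
distinct_ids = {product_id for product_id, _ in distinct_pairs}

print(len(distinct_pairs), len(distinct_ids))
```

Three pairs survive but only two product IDs exist, so one ID would be inserted twice.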

In [None]:
%pyspark
# Number of products
products_df.count()   # => 2305630

In [None]:
%pyspark
# Create a DataFrame that matches the customers table: one row per customer_id
# with the number of reviews that customer wrote in this data set
# CREATE TABLE customers (customer_id INT PRIMARY KEY NOT NULL UNIQUE, customer_count INT);

counts_df = date_df.groupBy("customer_id").count().orderBy("customer_id")
counts_df.show(10)
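
`groupBy("customer_id").count()` yields one row per customer, with the number of review rows for that customer in a column named `count`. The same aggregation in plain Python, on a made-up list of IDs, is a `Counter`:

```python
from collections import Counter

# Hypothetical customer_id column of a few review rows.
customer_ids = [101, 205, 101, 330, 205, 101]

# One (customer_id, number_of_reviews) pair per customer,
# sorted by customer_id like orderBy("customer_id").
customer_counts = sorted(Counter(customer_ids).items())

print(customer_counts)  # [(101, 3), (205, 2), (330, 1)]
```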

In [None]:
%pyspark
# Check the data types
counts_df.dtypes

In [None]:
%pyspark
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col

# Rename count to customer_count and cast it from LongType (bigint) to IntegerType to match the INT column
customers_df = counts_df.select(["customer_id", col("count").cast(IntegerType()).alias("customer_count")])

In [None]:
%pyspark
# Check the changes
customers_df.dtypes

In [None]:
%pyspark
# Number of customers
customers_df.count()  # => 3228415

In [None]:
%pyspark
# Created data frame to match vine table
# CREATE TABLE vine_table (review_id TEXT PRIMARY KEY, star_rating INTEGER, helpful_votes INTEGER, total_votes INTEGER, vine TEXT);

vine_df = date_df.select(["review_id", "star_rating", "helpful_votes", "total_votes", "vine"])
vine_df.show(10)

In [None]:
%pyspark
# Number of rows for vine_table (one per review)
vine_df.count()

In [None]:
%pyspark
from pyspark.sql.functions import col
# Split vine_df by star_rating to keep each database insert smaller: 1-star reviews

vine1_df = vine_df.filter(col("star_rating") == 1)
vine1_df.count()  # => 445456

In [None]:
%pyspark
# Split vine_df by star_rating to keep each database insert smaller: 2-star reviews
vine2_df = vine_df.filter(col("star_rating") == 2)
vine2_df.count()  # => 369601

In [None]:
%pyspark
# Split vine_df by star_rating to keep each database insert smaller: 3-star reviews
vine3_df = vine_df.filter(col("star_rating") == 3)
vine3_df.count()   # => 623471

In [None]:
%pyspark
from pyspark.sql.functions import col
# Split vine_df by star_rating to keep each database insert smaller: 4-star reviews
vine4_df = vine_df.filter(col("star_rating") == 4)
vine4_df.count()   # => 1147237

In [None]:
%pyspark
# Split vine_df by star_rating to keep each database insert smaller: 5-star reviews
vine5_df = vine_df.filter(col("star_rating") == 5)
vine5_df.count()  # => 3320557
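
The five filters only cover star ratings 1 to 5. Adding up the counts recorded above and comparing them with the 5,906,333 reviews (`vine_df` selects columns from the same rows as `review_df`) shows whether any rows fall outside the splits:

```python
# Row counts recorded in this notebook for each star_rating split.
split_counts = {1: 445456, 2: 369601, 3: 623471, 4: 1147237, 5: 3320557}
total_reviews = 5906333  # review_df.count(); vine_df has the same rows

covered = sum(split_counts.values())
missing = total_reviews - covered

print(covered, missing)  # 5906322 11
```

So 11 rows would never reach `vine_table_app`. They are presumably rows whose `star_rating` is null or outside 1 to 5; in the notebook, `vine_df.filter(col("star_rating").isNull() | ~col("star_rating").isin(1, 2, 3, 4, 5)).count()` would count them.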

In [None]:
%pyspark
# Configure settings for RDS
mode = "append"
jdbc_url = "jdbc:postgresql://<endpoint>:<port>/<db>"
config = {"user":"<user>", 
          "password": "<pwd>", 
          "driver":"org.postgresql.Driver"}
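
Hard-coding the password in a shared notebook is risky. A minimal sketch that builds the same `jdbc_url` and `config` from environment variables instead; the variable names `RDS_ENDPOINT`, `RDS_PORT`, `RDS_DB`, `RDS_USER` and `RDS_PASSWORD` are hypothetical and would have to be defined on the cluster.

```python
from __future__ import annotations

import os
from typing import Mapping, Optional


def build_jdbc_settings(env: Optional[Mapping[str, str]] = None) -> tuple[str, dict[str, str]]:
    """Build the JDBC URL and connection properties from (hypothetical) environment variables."""
    env = os.environ if env is None else env
    port = env.get("RDS_PORT", "5432")  # 5432 is PostgreSQL's default port
    url = f"jdbc:postgresql://{env['RDS_ENDPOINT']}:{port}/{env['RDS_DB']}"
    properties = {
        "user": env["RDS_USER"],
        "password": env["RDS_PASSWORD"],
        "driver": "org.postgresql.Driver",
    }
    return url, properties
```

In the notebook this would replace the literal values with `jdbc_url, config = build_jdbc_settings()`.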


In [None]:
%pyspark
# Append DataFrame to review_id_table in RDS
review_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

__Records from the Shoes and Apparel data sets__

![Review](../Images/rev_sho_app.png)

In [None]:
%pyspark
# Read the products already in the database (loaded from the Shoes data set)
dbproducts_df = spark.read.jdbc(url=jdbc_url, table='products', properties=config)
dbproducts_df.show(10)

In [None]:
%pyspark
# Left anti join: keep only the products from this data set whose product_id is not already in the database
new_prod_df = products_df.join(dbproducts_df, on="product_id", how="leftanti")
new_prod_df.show(10)
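
`how="leftanti"` keeps the rows of the left DataFrame that have no match on the right and adds no columns from the right side. The semantics in plain Python, with made-up product data:

```python
def left_anti_join(left_rows, right_keys, key):
    """Keep the rows of left_rows whose `key` value does not appear in right_keys."""
    right = set(right_keys)
    return [row for row in left_rows if row[key] not in right]


# Hypothetical data: products from the new data set and IDs already in the database.
apparel_products = [
    {"product_id": "B0001", "product_title": "Running Shoe"},
    {"product_id": "B0002", "product_title": "Cotton T-Shirt"},
    {"product_id": "B0003", "product_title": "Wool Socks"},
]
db_product_ids = ["B0001", "B0009"]

new_products = left_anti_join(apparel_products, db_product_ids, "product_id")
print([row["product_id"] for row in new_products])  # ['B0002', 'B0003']
```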

In [None]:
%pyspark
# Inspect the product that raised an error during the insert: the same product_id appears with more than one product_title
new_prod_df.filter(new_prod_df.product_id == 'B00IVCPVNK').show(truncate=False)

In [None]:
%pyspark
# Keep a single row per product_id so the insert does not violate the primary key (which title is kept is not specified)
new_prod_df = new_prod_df.select(['product_id', 'product_title']).dropDuplicates(['product_id'])
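
`dropDuplicates(['product_id'])` keeps one row per key but does not promise which of the duplicates survives. A plain-Python sketch of the same idea on made-up rows; unlike Spark, it deterministically keeps the first occurrence:

```python
def drop_duplicates_by_key(rows, key):
    """Keep one row per value of `key`; this sketch keeps the first occurrence."""
    seen = set()
    unique_rows = []
    for row in rows:
        if row[key] not in seen:
            seen.add(row[key])
            unique_rows.append(row)
    return unique_rows


# Hypothetical rows: one product_id listed with two titles.
rows = [
    {"product_id": "B0002", "product_title": "Cotton T-Shirt"},
    {"product_id": "B0002", "product_title": "Cotton T-Shirt, Blue"},
    {"product_id": "B0003", "product_title": "Wool Socks"},
]
deduped = drop_duplicates_by_key(rows, "product_id")
print([(r["product_id"], r["product_title"]) for r in deduped])
```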

In [None]:
%pyspark
# Check the same product again: it should now appear only once
new_prod_df.filter(new_prod_df.product_id == 'B00IVCPVNK').show()

In [None]:
%pyspark
# Number of new unique products
new_prod_df.count()   # => 1798371 (1798374 was also recorded, presumably before dropDuplicates)

In [None]:
%pyspark
# Write dataframe to products table in RDS
new_prod_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

__Records from the Shoes and Apparel data sets__

![Product](../Images/prod_sho_app.png)

In [None]:
%pyspark
# Read the customers already in the database (loaded from the Shoes data set)
dbcustomers_df = spark.read.jdbc(url=jdbc_url, table="customers", properties=config)
dbcustomers_df.show(10)

In [None]:
%pyspark
dbcustomers_df.count()  # => 2816830

In [None]:
%pyspark
# Left anti join: keep only the customers from this data set that are not already in the database
new_cust_df = customers_df.join(dbcustomers_df, on="customer_id", how="leftanti")
new_cust_df.show(10)

In [None]:
%pyspark
# Make sure there are no duplicate customer_id values before inserting the records
if new_cust_df.count() > new_cust_df.dropDuplicates(['customer_id']).count():
    raise ValueError('Data has duplicates')
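
The check above only says whether duplicates exist. When it fires, it helps to know which IDs are affected; in Spark that would be something like `new_cust_df.groupBy("customer_id").count().filter(col("count") > 1)`. The same idea in plain Python, on made-up IDs:

```python
from collections import Counter


def duplicated_keys(keys):
    """Return (key, count) pairs for keys that occur more than once, sorted by key."""
    return sorted((key, n) for key, n in Counter(keys).items() if n > 1)


# Hypothetical customer_id values about to be inserted.
print(duplicated_keys([101, 205, 101, 330, 205, 101]))  # [(101, 3), (205, 2)]
```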

In [None]:
%pyspark
new_cust_df.count()  # => 2366059

In [None]:
%pyspark
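# Prepare the Apparel customer counts for the overlap check: rename customer_count to cdb_count
# so it does not clash with the customer_count column read from the database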
same_cust_df = customers_df.select('customer_id', 'customer_count')
same_cust_df = same_cust_df.withColumnRenamed('customer_count', 'cdb_count')
same_cust_df.show(10)

In [None]:
%pyspark
# Inner join: customers that appear both in the database and in this data set
same_c_df = same_cust_df.join(dbcustomers_df, on="customer_id", how="inner")
same_c_df.show(10)

In [None]:
%pyspark
same_c_df.count()   # => 862356
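
Because `customer_id` is unique on both sides, the left anti join and the inner join split the Apparel customers into two disjoint groups, so their sizes should add up to `customers_df.count()`. A quick check with the counts recorded in this notebook:

```python
# Counts recorded in this notebook.
apparel_customers = 3228415      # customers_df.count()
new_customers = 2366059          # new_cust_df.count(): not yet in the database
overlapping_customers = 862356   # same_c_df.count(): already in the database

# Every Apparel customer is either new or already present, never both.
assert new_customers + overlapping_customers == apparel_customers
```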

In [None]:
%pyspark
# Write dataframe to customers table in RDS
new_cust_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

__Records from the Shoes and Apparel data sets__

![Customer](../Images/cust_sho_app.png)

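In [None]:
%pyspark
# Write the customers already in the database to a separate repcustomers table in RDS
# (appending them to customers would violate its customer_id primary key)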
same_c_df.write.jdbc(url=jdbc_url, table='repcustomers', mode=mode, properties=config)

In [None]:
%pyspark
# Write dataframe to vine_table_app table in RDS
vine1_df.write.jdbc(url=jdbc_url, table='vine_table_app', mode=mode, properties=config)

In [None]:
%pyspark
# Write dataframe to vine_table_app table in RDS
vine2_df.write.jdbc(url=jdbc_url, table='vine_table_app', mode=mode, properties=config)

In [None]:
%pyspark
# Write dataframe to vine_table_app table in RDS
vine3_df.write.jdbc(url=jdbc_url, table='vine_table_app', mode=mode, properties=config)

In [None]:
%pyspark
# Write dataframe to vine_table_app table in RDS
vine4_df.write.jdbc(url=jdbc_url, table='vine_table_app', mode=mode, properties=config)

In [None]:
%pyspark
# Write dataframe to vine_table_app table in RDS
vine5_df.write.jdbc(url=jdbc_url, table='vine_table_app', mode=mode, properties=config)
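
Splitting `vine_df` by rating keeps each write smaller, but Spark's JDBC data source also documents write options for this: `batchsize` (rows per JDBC batch insert, default 1000) and `numPartitions` (maximum number of concurrent connections; Spark coalesces down to it if the DataFrame has more partitions). A minimal sketch, assuming these options are forwarded when passed through the `properties` dict of `write.jdbc`; the values 10000 and 8 are placeholders, not tuned recommendations.

```python
def jdbc_write_properties(base, batchsize=10000, num_partitions=8):
    """Return a copy of the connection properties with JDBC write tuning options added.

    Values are strings because the properties end up in a java.util.Properties object.
    """
    props = dict(base)  # do not modify the shared config dict
    props["batchsize"] = str(batchsize)
    props["numPartitions"] = str(num_partitions)
    return props


# Hypothetical usage in the notebook, instead of five per-rating writes:
# vine_df.write.jdbc(url=jdbc_url, table="vine_table_app", mode=mode,
#                    properties=jdbc_write_properties(config))
```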

__Only records from the Apparel data set__

![Vine](../Images/vine_app.png)