## Level 1 - Big Data

In [None]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download the PostgreSQL JDBC driver; the Spark session below loads it from
# /content/postgresql-42.2.9.jar via spark.driver.extraClassPath.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

In [None]:
# Import modules
from pyspark import SparkFiles
from pyspark.sql import SparkSession

# Build spark session with the PostgreSQL JDBC driver jar on the driver classpath,
# so DataFrame.write.jdbc can load org.postgresql.Driver.
postgres_jar = "/content/postgresql-42.2.9.jar"
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", postgres_jar)
    .getOrCreate()
)

In [None]:
# Load in data
# NOTE(review): the public amazon-reviews-pds bucket may no longer be served;
# confirm this URL still resolves before running.
base_url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/"
file = "amazon_reviews_us_Pet_Products_v1_00.tsv.gz"
url = base_url + file
# Register the file with Spark; SparkFiles.get() below returns its local path.
spark.sparkContext.addFile(url)

# Generate df from the gzipped, tab-separated file whose first line is the header.
# Spark datetime patterns are case-sensitive: "MM" is month, "mm" is minute.
# Assumes review_date values look like 2015-08-31 (year-month-day); confirm
# against the data. With a matching pattern, inferSchema can type the column.
df = spark.read.option('header', 'true').csv(SparkFiles.get(file),
                                             inferSchema = True,
                                             sep = '\t',
                                             timestampFormat = "yyyy-MM-dd")
df.head()

In [None]:
# Clean up data: discard every row that has a null in any column, then collapse
# rows that are exact duplicates of one another.
df = df.dropna().dropDuplicates()

In [None]:
# Count number of rows left after clean-up; the bare name on the last line
# makes the notebook display the value.
row_count = df.count()
row_count

In [None]:
# Check out schema: print each column's name, type (as inferred at load time)
# and nullability as a tree.
df.printSchema()

In [None]:
# Generate Products df: one row per product_id.
# Each review repeats its product, so de-duplicate on the key; otherwise the
# products table receives one copy of a product per review. If a product_id
# appears with differing titles, an arbitrary one of them is kept.
products = df[["product_id", "product_title"]].dropDuplicates(["product_id"])
print(products.head())

# Configure export (mode, jdbc_url and config are reused by the later export cells)
import os

mode = "append"
jdbc_url = "jdbc:postgresql://rdsdb.cgx5vst360bw.us-east-2.rds.amazonaws.com:5432/dbforbigdata"
# Take the password from the environment so it need not be typed into the
# notebook; "###" remains the placeholder when the variable is unset.
config = {"user":"postgres",
          "password": os.environ.get("POSTGRES_PASSWORD", "###"),
          "driver":"org.postgresql.Driver"}

# Export
products.write.jdbc(url = jdbc_url,
                    table = 'products',
                    mode = mode,
                    properties = config)

In [None]:
# Generate Review df: each review's id with its customer, product, product
# parent and review date.
review_cols = ["review_id", "customer_id", "product_id", "product_parent", "review_date"]
review_id_table = df.select(review_cols)
print(review_id_table.head())

# Export to the review_id_table table using the connection settings defined earlier
review_id_table.write.jdbc(jdbc_url, 'review_id_table',
                           mode=mode, properties=config)

In [None]:
# Create customers table: number of review rows per customer_id.
# Group the DataFrame itself; df["customer_id"] is a Column, which has no groupBy().
customers = (df.groupBy("customer_id")
               .count()
               .withColumnRenamed("count", "customer_count"))
print(customers.head())

# Export
customers.write.jdbc(url = jdbc_url,
                     table = 'customers',
                     mode = mode,
                     properties = config)

In [None]:
# Create vine table: per-review star rating, vote counts and the vine column
vine_table = df[["review_id", "star_rating", "helpful_votes", "total_votes", "vine"]]
print(vine_table.head())

# Export using the same JDBC properties as the other tables
vine_table.write.jdbc(url = jdbc_url,
                      table = 'vine_table',
                      mode = mode,
                      properties = config)