In [None]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.2.0'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse, via getOrCreate) the "CloudETL" session. The PostgreSQL JDBC
# jar is placed on the driver classpath — presumably for a later JDBC write to
# Postgres; confirm against the cells that follow.
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# Read in data from S3 Buckets
import os

from pyspark import SparkFiles

beauty_url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Beauty_v1_00.tsv.gz"

# Register the remote file with Spark, then read it from the local path that
# SparkFiles assigns under the file's base name (derived from the URL so the
# two cannot drift apart).
spark.sparkContext.addFile(beauty_url)
beauty_df = spark.read.csv(
    SparkFiles.get(os.path.basename(beauty_url)),
    sep="\t",
    header=True,
    inferSchema=True,
    # Java datetime pattern: MM is month; mm would be minute-of-hour.
    # NOTE(review): assumes review_date values look like 2015-08-31 — confirm.
    timestampFormat="yyyy-MM-dd",
)

# Show DataFrame
beauty_df.show()

In [None]:
# Remove incomplete rows, then exact duplicate rows, printing the row count
# before cleaning and after each step so the effect of each step is visible.
print(beauty_df.count())
for cleaning_step in (lambda frame: frame.dropna(), lambda frame: frame.dropDuplicates()):
    beauty_df = cleaning_step(beauty_df)
    print(beauty_df.count())

In [None]:
# Examine the schema: column types come from inferSchema=True at read time,
# so check them before selecting columns for the SQL tables below.
beauty_df.printSchema()

In [None]:
# Review table for SQL: review identifiers, the customer/product ids, and
# the review date, one output row per row of the cleaned beauty_df.
review_columns = (
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    "review_date",
)
beauty_review_df = beauty_df.select(*review_columns)
beauty_review_df.show(5)

In [None]:
# Product table for SQL: one row per product_id with its title.
# A bare groupby(...) returns GroupedData, which can be neither shown nor
# written, so deduplicate on the key instead. If one product_id appears with
# several titles, which title is kept is arbitrary.
beauty_product_df = (
    beauty_df
    .select(["product_id", "product_title"])
    .dropDuplicates(["product_id"])
)
beauty_product_df.show(5)

In [None]:
# Customer table for SQL: number of rows (reviews) per customer_id, named
# customer_count. "customer_count" is not a column of beauty_df, so it cannot
# be an agg() input. Count rows per group and rename the result column instead.
beauty_customer_df = (
    beauty_df
    .groupby("customer_id")
    .count()
    .withColumnRenamed("count", "customer_count")
)
beauty_customer_df.show(5)

In [None]:
# Vine table for SQL: per-review rating, vote counts and the vine flag.
vine_columns = (
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
)
beauty_vine_df = beauty_df.select(*vine_columns)
beauty_vine_df.show(5)