In [1]:
# Import Dependencies
import os
# Find the latest version of spark 3.0 from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

'apt-get' is not recognized as an internal or external command,
operable program or batch file.
The system cannot find the path specified.
'wget' is not recognized as an internal or external command,
operable program or batch file.
tar: Error opening archive: Failed to open '$SPARK_VERSION-bin-hadoop2.7.tgz'


Exception: Unable to find py4j, your SPARK_HOME may not be configured correctly

In [None]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("M16-Amazon-Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

### Load Amazon Data into Spark DataFrame

In [None]:
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Electronics_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Electronics_v1_00.tsv.gz"), sep="\t", header=True, inferSchema=True)
df.show()

### Create DataFrames to match tables

In [None]:
from pyspark.sql.functions import to_date
# Read in the Review dataset as a DataFrame
review_df = df.select(["review_id","customer_id","product_id","product_parent","product_title","review_date","star_rating", "helpful_votes","total_votes","vine","verified_purchase"])
review_df.show()

In [None]:
# Create the customers_table DataFrame
customers_df = review_df.groupby("customer_id").agg({"customer_id": 'count'}).withColumnRenamed("count(customer_id)", "customer_count")
customers_df.show()

In [None]:
# Create the products_table DataFrame and drop duplicates. 
products_df = review_df.select(["product_id","product_title"]).drop_duplicates(["product_id"])
products_df.show()

In [None]:
# Create the review_id_table DataFrame. 
# Convert the 'review_date' column to a date datatype with to_date("review_date", 'yyyy-MM-dd').alias("review_date")
review_id_df = review_df.select(["review_id","customer_id", "product_id","product_parent", to_date("review_date", 'yyyy-MM-dd').alias("review_date")])
review_id_df.show()

In [None]:
# Create the vine_table. DataFrame
vine_df = review_df.select(["review_id","star_rating","helpful_votes","total_votes","vine","verified_purchase"])
vine_df.show()

### Connect to the AWS RDS instance and write each DataFrame to its table. 

In [None]:
# Configure settings for RDS
mode = "append"
jdbc_url="jdbc:postgresql://databootcamp-weise-postgres.cjl6bkg3rik6.us-west-1.rds.amazonaws.com:5432/Electronics_review_db"
config = {"user":"postgres", 
          "password": "", 
          "driver":"org.postgresql.Driver"}

In [None]:
# Write review_id_df to table in RDS
review_id_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [None]:
# Write products_df to table in RDS
# about 3 min
products_df.write.jdbc(url=jdbc_url, table='products_table', mode=mode, properties=config)

In [None]:
# Write customers_df to table in RDS
# 5 min 14 s
customers_df.write.jdbc(url=jdbc_url, table='customers_table', mode=mode, properties=config)

In [None]:
# Write vine_df to table in RDS
# 11 minutes
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)