In [None]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.1'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()




In [None]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the session, adding the PostgreSQL JDBC jar to the driver
# classpath so DataFrames can be written to Postgres via .write.jdbc().
spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"

# addFile makes the file available to the cluster; SparkFiles.get resolves it
# by its bare file name (not the URL path), so strip the directory part.
spark.sparkContext.addFile(url)
file_name = url.rsplit("/", 1)[-1]

# The dataset is tab-separated (.tsv), so split on tabs rather than commas.
user_data_df = spark.read.csv(
    SparkFiles.get(file_name), sep="\t", header=True, inferSchema=True
)

# Show DataFrame
user_data_df.show()

In [None]:
# Number of rows in the loaded review DataFrame (displayed as the cell output).
user_data_df.count()

In [None]:
from pyspark.sql.functions import to_date

# Review-level table: identifiers plus the review date parsed from a
# 'yyyy-MM-dd' string into a date column of the same name.
reviewdf = user_data_df.select(
    'review_id',
    'customer_id',
    'product_id',
    'product_parent',
    to_date('review_date', 'yyyy-MM-dd').alias('review_date'),
)
reviewdf.show()

In [None]:
# Product table: each distinct (product_id, product_title) pair once.
productdf = (
    user_data_df
    .select('product_id', 'product_title')
    .dropDuplicates()
)
productdf.show()

In [None]:
from pyspark.sql import functions as F

# Customer table: number of rows (reviews) per customer_id, named customer_count.
customerdf = (
    user_data_df
    .groupBy('customer_id')
    .agg(F.count('customer_id').alias('customer_count'))
)
customerdf.show()

In [None]:
# Vine table: rating and vote columns per review, plus the 'vine' flag column.
vine_columns = ['review_id', 'star_rating', 'helpful_votes', 'total_votes', 'vine']
vinedf = user_data_df.select(vine_columns)
vinedf.show()

In [None]:
import os
from getpass import getpass

# Save mode for every JDBC export below. 'append' adds rows, so re-running the
# export cells writes the data again; 'overwrite' would replace the table contents.
mode = 'append'
jdbc_url = 'jdbc:postgresql://mypostgresdb.cquiod3wcd3k.us-east-1.rds.amazonaws.com:5432/postgres'

# Credentials are read from the environment (or prompted for) so they are never
# stored in the notebook. POSTGRES_USER falls back to the previous 'root' user.
config = {
    'user': os.environ.get('POSTGRES_USER', 'root'),
    'password': os.environ.get('POSTGRES_PASSWORD') or getpass('PostgreSQL password: '),
    'driver': 'org.postgresql.Driver',
}

In [None]:
# Export the review table to Postgres table 'review_id' using the shared save mode.
(reviewdf.write
    .mode(mode)
    .jdbc(url=jdbc_url, table='review_id', properties=config))

In [None]:
# Export the product table to Postgres table 'product_id' using the shared save mode.
(productdf.write
    .mode(mode)
    .jdbc(url=jdbc_url, table='product_id', properties=config))

In [None]:
# Export the per-customer counts to Postgres table 'customer_id' using the shared save mode.
(customerdf.write
    .mode(mode)
    .jdbc(url=jdbc_url, table='customer_id', properties=config))

In [None]:
# Export the vine table to Postgres table 'vine' using the shared save mode.
(vinedf.write
    .mode(mode)
    .jdbc(url=jdbc_url, table='vine', properties=config))