<a href="https://colab.research.google.com/github/skotagiri95/BigData_Amazon/blob/master/Amazon_BigData2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [20]:
# Connect to postgresql
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2020-05-08 05:47:05--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.2’


2020-05-08 05:47:06 (1.05 MB/s) - ‘postgresql-42.2.9.jar.2’ saved [914037/914037]



In [0]:
# Start a SparkSession
import findspark
findspark.init()
import pyspark

# Start Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AWS_Mobile_Reviews").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

In [0]:
!wget -q https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Mobile_Electronics_v1_00.tsv.gz
!gunzip amazon_reviews_us_Mobile_Electronics_v1_00.tsv.gz

gzip: amazon_reviews_us_Mobile_Electronics_v1_00.tsv already exists; do you wish to overwrite (y or n)? 

In [0]:
# Load in a sql function to use columns
from pyspark.sql.functions import col
import pyspark.sql.functions as F 

from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Mobile_Electronics_v1_00.tsv.gz"
spark.sparkContext.addFile(url)
mobile_electronics_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Mobile_Electronics_v1_00.tsv.gz"), sep="\t", header=True)

# Show DataFrame
mobile_electronics_df.show()


In [0]:
# original dataset information
print((df.count(), len(df.columns)))
# check data types
df.printSchema

In [0]:
# Import struct fields that we can use
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, DateType

In [0]:
schema = [StructField('marketplace', StringType(), True), StructField('customer_id', IntegerType(), True),
          StructField('review_id', StringType(), True), StructField('product_id', StringType(), True),
          StructField('product_category', StringType(), True), StructField('star_rating', IntegerType(), True),
          StructField('helpful_votes', IntegerType(), True), StructField('total_votes', IntegerType(), True),
          StructField('vine', StringType(), True), StructField('verified_purchase', StringType(), True),
          StructField('review_headline', StringType(), True), StructField('review_body', StringType(), True),
          StructField('review_date', DateType(), True), ]
schema

In [0]:
final=StructType(fields=schema)
me_df = spark.read.csv(SparkFiles.get("amazon_reviews_us_Mobile_Electronics_v1_00.tsv.gz"), schema=final, sep="\t", header=True)
me_df.printSchema()

In [0]:
clean_mobile = me_df.dropna()

In [0]:
review_id_table = clean_mobile.select(['review_id','customer_id','product_id','review_date'])
review_id_table.show()

In [0]:
products = clean_mobile.select(['product_id','product_title'])
products =products.dropDuplicates(['product_id'])
products.show()

In [0]:
vine_table = clean_mobile.select(['review_id','star_rating','helpful_votes','total_votes','vine'])
vine_table.show()

In [0]:
paid_vine_df = vine_table.filter(col('vine')=='Y')
paid_vine_df.orderBy(paid_vine_df['helpful_votes'].desc()).select('star_rating','helpful_votes','total_votes').show()


In [0]:
paid_vine_df.filter('star_rating = 5').select('helpful_votes').count()

In [0]:
# Configure setting for RDS
mode = 'append'
jdbc_url="jdbc:postgresql://dataviz.cbp360boojkd.us-east-2.rds.amazonaws.com:5431/AWS_PC_Reviews"
config = {"user":"skotagiri95",
          "password": "password",
          "driver":"org.postgresql.Driver"}

In [0]:
#Write DataFrame to review_id_table table in RDS
review_id_table.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

# Write DataFrame to products table in RDS
products.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

# Write DataFrame to customers table in RDS
customers.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)

# Write DataFrame to vine_table table in RDS
vine_table.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)