In [1]:
# Install Java, Spark, and Findspark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [2]:
# Start Spark session
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Week16Challenge").config("spark.driver.extraClassPath","/content/postgresql-42.2.9.jar").getOrCreate()

--2020-06-29 03:21:29--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.4’


2020-06-29 03:21:30 (1.04 MB/s) - ‘postgresql-42.2.9.jar.4’ saved [914037/914037]



In [3]:
# Read in data from S3 Buckets
from pyspark import SparkFiles
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"

spark.sparkContext.addFile(url)
filename = "amazon_reviews_us_Video_Games_v1_00.tsv.gz"
df = spark.read.option("encoding","UTF-8").csv(SparkFiles.get(filename), sep="\t", header=True)

In [4]:
df_count = df.count()
df_count

1785997

In [5]:
# Show DataFrame
df.show()

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-----------+
|         US|   12039526| RTIS3L2M1F5SM|B001CXYMFS|     737716809|Thrustmaster T-Fl...|     Video Games|          5|            0|          0|   N|                Y|an amazing joysti...|Used this for Eli...| 2015-08-31|
|         US|    9636577| R1ZV7R40OLHKD|B00M920ND6|     569686175|Tonsee 6 buttons ...|     Video Games|          5|    

In [23]:
products_df = df.select("product_id", "product_title").distinct()
products_df.show()
products_df.count()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00CJ7IUI6|The Elder Scrolls...|
|B00DHF39KS|Wolfenstein: The ...|
|B00MUTAVH6|Under Night In-Bi...|
|B001AZSEUW|              Peggle|
|B00KVOVBGM|PlayStation 4 Con...|
|B00O9VGH4Y|USPRO&reg; Headph...|
|B004OQNZY4|Phineas and Ferb:...|
|B00ZLN980O|Donop seablue 2.4...|
|B002L8W5V6|Dotop Nintendo Ga...|
|B007AJZ5PY|Nyko Game Case fo...|
|B000AOEU2K|Fire Emblem: Path...|
|B000H8BW7U|Tanarus (PC) (Com...|
|B013RADQOQ|Susenstone® 2400D...|
|B00KQXKUJ2|FIFA 15 (Ultimate...|
|B006W41X2C|Turtle Beach - Ea...|
|B000KCX9M4|Grand Theft Auto:...|
|B00YT90JWC|Red Wii Mini Cons...|
|B0096KG6A8|Wii U Super Mario...|
|B00L6AVLB0|World of Tanks-X3...|
|B000IMYKQ0|Wii Nunchuk Contr...|
+----------+--------------------+
only showing top 20 rows



65792

In [51]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import unix_timestamp
from pyspark.sql.functions import from_unixtime

review_df = df.select("review_id", "customer_id", "product_id", "product_parent", "review_date").distinct()
review_df = review_df.withColumn("customer_id", review_df["customer_id"].cast(IntegerType()))
review_df = review_df.withColumn("product_parent", review_df["product_parent"].cast(IntegerType()))
review_df = review_df.withColumn("review_date", from_unixtime(unix_timestamp('review_date', 'YYYY-MM-dd')).cast("timestamp"))

review_df.show()
review_df.count()

+--------------+-----------+----------+--------------+-------------------+
|     review_id|customer_id|product_id|product_parent|        review_date|
+--------------+-----------+----------+--------------+-------------------+
|R25HN3SIQZIUWF|   19145185|B00MV7KVP4|      92208606|2014-12-28 00:00:00|
|R1DB0RXBWJTSZA|   17033222|B00BU3ZLJQ|     860676261|2014-12-28 00:00:00|
|R17XE0M8S59SQ8|   10589108|B000069TDF|      13629273|2014-12-28 00:00:00|
|R1MK4ENBTCKHCG|    2498811|B00VETEZ34|      53980133|2014-12-28 00:00:00|
|R3VC0HCN87EN7F|   22788662|B00ZSJUTCM|     838780144|2014-12-28 00:00:00|
|R1TEKXJZIWTFNF|   44777724|B00MV7KVP4|      92208606|2014-12-28 00:00:00|
|R3J78BD5M97ZHB|   25766524|B00EZV6HHU|     899419151|2014-12-28 00:00:00|
|R140YF99L4RFAI|   30351861|B000P46NMK|     482320488|2014-12-28 00:00:00|
|R3NLEZBP3TUUGZ|   49915281|B00KM66UFQ|     703572787|2014-12-28 00:00:00|
| RRK4AUSOSHZD6|    3887193|B00KVP78FE|     177244653|2014-12-28 00:00:00|
|R2VRSUZ3YSEU2X|   335535

1785997

In [54]:
vine_df = df.select("review_id", "star_rating", "helpful_votes", "total_votes", "vine").distinct()
vine_df = vine_df.withColumn("star_rating", vine_df["star_rating"].cast(IntegerType()))
vine_df = vine_df.withColumn("helpful_votes", vine_df["helpful_votes"].cast(IntegerType()))
vine_df = vine_df.withColumn("total_votes", vine_df["total_votes"].cast(IntegerType()))
vine_df.show()
vine_df.count()

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
|R2MTHHQM6RSDQK|          4|            0|          1|   N|
|R26I4OP3M9BC2K|          5|            0|          0|   N|
|R2P5HQ648QPWVG|          5|            0|          0|   N|
|R293SZSOOV2DMC|          5|            0|          0|   N|
|R3793DQNKX2INH|          5|            0|          0|   N|
|R1IFTWQW79OCWE|          5|            0|          0|   N|
|R11K7B8ZVWT5J2|          5|            0|          0|   N|
|R1LE118TIRBPKA|          5|            0|          1|   N|
| RB9GZU9BZNUEM|          5|            0|          0|   N|
|R2RO1I6G4TOSYD|          4|            0|          1|   N|
| RXRUAMUV19IY8|          1|            0|          0|   N|
|R2F84MTDEMY5E3|          5|            0|          0|   N|
|R18XB0G8PFU91B|          1|            3|          7|   N|
|R3FVPRCTKXBJO0|          5|            

1785997

In [57]:
from pyspark.sql.functions import countDistinct
customers_df = df.select("customer_id").distinct()
customers_df = df.groupBy("customer_id").agg(countDistinct("customer_id"))
customers_df = customers_df.withColumnRenamed("count(DISTINCT customer_id)", "customer_count")
customers_df = customers_df.withColumn("customer_id", customers_df["customer_id"].cast(IntegerType()))
customers_df.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|     669612|             1|
|     706239|             1|
|    2042590|             1|
|   37372816|             1|
|   26691950|             1|
|   22429746|             1|
|    9748608|             1|
|   18699756|             1|
|   12052612|             1|
|     624822|             1|
|   11599387|             1|
|   45477156|             1|
|     670025|             1|
|   35965063|             1|
|   45871365|             1|
|   20595499|             1|
|   19359440|             1|
|   12002337|             1|
|   43534753|             1|
|     433865|             1|
+-----------+--------------+
only showing top 20 rows



In [37]:
customers_df.count()

1045733

In [30]:
mode = "append"
end_point = "berkeley-data-db-2.cuctescucrhh.us-east-2.rds.amazonaws.com"
db_name = "postgres"
jdbc_url="jdbc:postgresql://" + end_point + ":5432/" + db_name
config = {"user":"postgres", 
          "password": "Pavelkoo1984", 
          "driver":"org.postgresql.Driver"}

In [31]:
products_df.write.jdbc(url=jdbc_url, table='products', mode=mode, properties=config)

In [52]:
review_df.write.jdbc(url=jdbc_url, table='review_id_table', mode=mode, properties=config)

In [55]:
vine_df.write.jdbc(url=jdbc_url, table='vine_table', mode=mode, properties=config)

In [58]:
customers_df.write.jdbc(url=jdbc_url, table='customers', mode=mode, properties=config)