In [1]:
from tqdm import tqdm, tqdm_notebook

In [2]:
# Bootstrap Spark for this kernel. findspark.init() is called before
# `import pyspark` on purpose: per the findspark docs it locates the local
# Spark installation and adds pyspark to sys.path — keep this ordering.
import findspark
findspark.init()
# The result of find() is not captured or displayed (it is not the last
# expression of the cell), so this call has no visible effect here.
findspark.find()
import pyspark
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# from config import password 

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp
# Create (or reuse) the SparkSession bound to `spark`, running locally on all
# available cores.
# NOTE(review): extraClassPath is 'postgresql-42.2.11' with no '.jar' suffix or
# directory — confirm it actually resolves to the PostgreSQL JDBC driver jar.
spark = (
    SparkSession.builder
    .appName('pyspark_app')
    .config('spark.driver.extraClassPath', 'postgresql-42.2.11')
    .master('local[*]')
    .getOrCreate()
)


In [4]:
from pyspark import SparkFiles
from pyspark.sql.functions import to_date

In [5]:
# Source archive: gzipped TSV of Amazon US reviews, Video Games category.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_Games_v1_00.tsv.gz"
# Toys category archive (disabled; kept for reference).
# url2 = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz"

In [6]:
# Register the remote archive with the SparkContext so it can be resolved
# later by file name through SparkFiles.get().
spark.sparkContext.addFile(path=url)

In [8]:
# Read the registered archive as a tab-separated file whose first row is the
# header; column types are inferred by Spark rather than declared.
df = (
    spark.read
    .options(header="true", inferSchema=True, sep='\t')
    .csv(SparkFiles.get("amazon_reviews_us_Video_Games_v1_00.tsv.gz"))
)

In [9]:
# Preview the first five rows of the raw reviews frame.
df.show(n=5)

+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|marketplace|customer_id|     review_id|product_id|product_parent|       product_title|product_category|star_rating|helpful_votes|total_votes|vine|verified_purchase|     review_headline|         review_body|        review_date|
+-----------+-----------+--------------+----------+--------------+--------------------+----------------+-----------+-------------+-----------+----+-----------------+--------------------+--------------------+-------------------+
|         US|   12039526| RTIS3L2M1F5SM|B001CXYMFS|     737716809|Thrustmaster T-Fl...|     Video Games|          5|            0|          0|   N|                Y|an amazing joysti...|Used this for Eli...|2015-08-31 00:00:00|
|         US|    9636577| R1ZV7R40OLHKD|B00M920ND6|     569686175|Tonsee 6 buttons ...| 

In [10]:
# df.count() is a Spark action, so building this message scans the dataset.
row_count_message = f'There are {df.count()} rows in the video game dataframe'
print(row_count_message)

There are 1785997 rows in the video game dataframe


In [11]:
# Print the column names and inferred types of the freshly loaded frame.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)



In [12]:
# Add a "date" column holding review_date converted with to_date.
df = df.withColumn("date", to_date("review_date"))

In [13]:
# Print the schema again to check the column added to df.
df.printSchema()

root
 |-- marketplace: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: timestamp (nullable = true)
 |-- date: date (nullable = true)



In [14]:
# Reviews frame: review id, customer/product identifiers and the date column.
review_df = df.select(
    "review_id",
    "customer_id",
    "product_id",
    "product_parent",
    "date",
)
review_df.show(n=5)

+--------------+-----------+----------+--------------+----------+
|     review_id|customer_id|product_id|product_parent|      date|
+--------------+-----------+----------+--------------+----------+
| RTIS3L2M1F5SM|   12039526|B001CXYMFS|     737716809|2015-08-31|
| R1ZV7R40OLHKD|    9636577|B00M920ND6|     569686175|2015-08-31|
|R3BH071QLH8QMC|    2331478|B0029CSOD2|      98937668|2015-08-31|
|R127K9NTSXA2YH|   52495923|B00GOOSV98|      23143350|2015-08-31|
|R32ZWUXDJPW27Q|   14533949|B00Y074JOM|     821342511|2015-08-31|
+--------------+-----------+----------+--------------+----------+
only showing top 5 rows



In [31]:
# Customers frame: one row per distinct customer_id together with the number
# of rows in df that carry that id (column "count").
custy_df = df.groupby("customer_id").count()

In [36]:
# Give the aggregate column a descriptive name.
custy_df = custy_df.withColumnRenamed(existing='count', new='customer_count')

In [37]:
# Preview the first five rows of the customers frame.
custy_df.show(n=5)

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   48670265|             1|
|   49103216|             2|
|    1131200|             1|
|   43076447|             2|
|   46261368|             1|
+-----------+--------------+
only showing top 5 rows



In [16]:
# Products frame: id and title, reduced to a single row per product_id.
# NOTE(review): when one product_id has several titles, which title survives
# the de-duplication is presumably arbitrary — confirm if that matters.
prod_df = (
    df.select("product_id", "product_title")
    .drop_duplicates(["product_id"])
)

In [17]:
# Number of rows left in prod_df after dropping duplicate product_id values.
prod_df.count()

65792

In [18]:
# Vine frame: per-review rating, vote counts and the vine flag.
vine_df = df.select(
    "review_id",
    "star_rating",
    "helpful_votes",
    "total_votes",
    "vine",
)
vine_df.show(n=5)

+--------------+-----------+-------------+-----------+----+
|     review_id|star_rating|helpful_votes|total_votes|vine|
+--------------+-----------+-------------+-----------+----+
| RTIS3L2M1F5SM|          5|            0|          0|   N|
| R1ZV7R40OLHKD|          5|            0|          0|   N|
|R3BH071QLH8QMC|          1|            0|          1|   N|
|R127K9NTSXA2YH|          3|            0|          0|   N|
|R32ZWUXDJPW27Q|          4|            0|          0|   N|
+--------------+-----------+-------------+-----------+----+
only showing top 5 rows



In [24]:
# Configure settings for RDS
import os
import warnings

# Save mode passed to every JDBC write below: rows are appended to the target
# table, so re-running a write cell inserts the same rows again.
mode = "append"
jdbc_url="jdbc:postgresql://big-data-2.cpclfqan17p9.us-east-1.rds.amazonaws.com:5432/bd-homework"

# Credentials are read from the environment so that no secret is stored in the
# notebook: set RDS_PASSWORD (and optionally RDS_USER) before starting the kernel.
# NOTE(review): the password that used to be hardcoded here should be rotated.
_rds_password = os.environ.get("RDS_PASSWORD", "")
if not _rds_password:
    warnings.warn(
        "RDS_PASSWORD is not set; JDBC writes to RDS will most likely fail to authenticate."
    )

# Connection properties passed as `properties=` to DataFrameWriter.jdbc.
config = {
    "user": os.environ.get("RDS_USER", "postgres"),
    "password": _rds_password,
    "driver": "org.postgresql.Driver",
}

In [21]:
# Write the reviews frame to the 'review_id_table' table over JDBC.
review_df.write.mode(mode).jdbc(url=jdbc_url, table='review_id_table', properties=config)

In [38]:
# Write the customers frame to the 'customers' table over JDBC.
custy_df.write.mode(mode).jdbc(url=jdbc_url, table='customers', properties=config)

Please use `tqdm.notebook.tqdm` instead of `tqdm.tqdm_notebook`
  """Entry point for launching an IPython kernel.


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

|<bar/>| 0/? [00:00<?, ?it/s]

In [39]:
# Write the products frame to the 'products' table over JDBC.
prod_df.write.mode(mode).jdbc(url=jdbc_url, table='products', properties=config)

In [None]:
# Write the vine frame to the 'vine_table' table over JDBC.
vine_df.write.mode(mode).jdbc(url=jdbc_url, table='vine_table', properties=config)