## DataFrame Basics

In [None]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.2.1'
spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Hit:1 http://security.ubuntu.com/ubuntu bionic-security InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.152)] [Connected to cloud.r-pro0% [1 InRelease gpgv 88.7 kB] [Connecting to archive.ubuntu.com (91.189.88.152)                                                                               Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [1 InRelease gpgv 88.7 kB] [Waiting for headers] [Waiting for headers] [Wait                                                                               Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:8 https

In [None]:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

--2022-03-06 23:04:23--  https://jdbc.postgresql.org/download/postgresql-42.2.9.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 914037 (893K) [application/java-archive]
Saving to: ‘postgresql-42.2.9.jar.2’


2022-03-06 23:04:23 (5.74 MB/s) - ‘postgresql-42.2.9.jar.2’ saved [914037/914037]



In [None]:
# Create the Spark session (or reuse one that is already running) and hand it
# the downloaded PostgreSQL JDBC jar through the spark.jars setting.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("DataFrameBasics")
    .config("spark.jars", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
# Load the Amazon Pet Products review dump from its S3 URL
from pyspark import SparkFiles

url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Pet_Products_v1_00.tsv.gz"
spark.sparkContext.addFile(url)

# The file is tab-separated with a header row; read it from the path that
# SparkFiles reports for the file registered above.
df = spark.read.csv(
    SparkFiles.get("amazon_reviews_us_Pet_Products_v1_00.tsv.gz"),
    sep="\t",
    header=True,
    quote="",
)

# Preview the loaded rows
df.show()

In [None]:
# Total number of rows read from the reviews file
df.count()

2643619

In [None]:
from pyspark.sql.functions import col

# Build the review_id_table frame: keep the five identifying columns, drop
# exact duplicate rows, then convert the string columns read from the TSV to
# the integer / date types shown by printSchema(). Column order is unchanged.
df_review_id_table = (
    df.select("review_id", "customer_id", "product_id", "product_parent", "review_date")
    .distinct()
    .select(
        "review_id",
        col("customer_id").cast("int").alias("customer_id"),
        "product_id",
        col("product_parent").cast("int").alias("product_parent"),
        col("review_date").cast("date").alias("review_date"),
    )
)
df_review_id_table.printSchema()

root
 |-- review_id: string (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: integer (nullable = true)
 |-- review_date: date (nullable = true)



In [None]:
# Row count after distinct(); compare with df.count() to see whether any duplicate rows were dropped
df_review_id_table.count()


2643619

In [None]:
# Preview the typed review_id_table rows
df_review_id_table.show()

+--------------+-----------+----------+--------------+-----------+
|     review_id|customer_id|product_id|product_parent|review_date|
+--------------+-----------+----------+--------------+-----------+
|R1003WQ2N568UJ|   30984122|B000HCJX2C|     561470701| 2015-07-16|
|R1004EZA3OASMT|   15509407|B0009YYQ4S|     455866213| 2011-12-14|
|R100EXNK731LTK|   37164434|B0038U5WO8|     233057226| 2015-05-25|
|R100PQSJMOLWP0|   26963189|B001ICPAZE|     709838409| 2011-12-28|
|R10168116JX8X2|   30974720|B0018CG2EC|     500296269| 2013-11-12|
|R10173AGXELGVQ|   33160607|B002DX8QNU|     602831505| 2015-08-28|
|R101EV570FB01A|   12321022|B00PUVABGC|     298139833| 2015-03-26|
|R101JMABBJBX6U|   51876859|B000W5SLB8|     956903177| 2014-05-03|
|R102BUG3QZXMY8|   11142660|B001MUPGQ0|     266396365| 2014-04-16|
|R102OI838NN3Z7|   11056168|B000YDUAMQ|     180607962| 2014-01-18|
|R102VGUEHU4MVS|   26684756|B0018CE8LQ|     855508227| 2014-07-16|
|R103E8CGPOZD6B|    8996047|B0053AS5VS|     840020528| 2014-01

In [None]:
# Products frame: one row per distinct (product_id, product_title) pair.
# NOTE(review): a product_id that appears with more than one title stays as
# several rows here — confirm that is acceptable for the target table.
df_products = df.select("product_id", "product_title").dropDuplicates()
df_products.printSchema()

root
 |-- product_id: string (nullable = true)
 |-- product_title: string (nullable = true)



In [None]:
# Preview the products frame (show() prints the top 20 rows and truncates long titles)
df_products.show()

+----------+--------------------+
|product_id|       product_title|
+----------+--------------------+
|B00134HSYS|Special Edition P...|
|B00BS6NPBG|High Tech Pet 6-P...|
|B000F930FS|PetSafe ScoopFree...|
|B000FJ9QTW|Insight ActiviToy...|
|B000ALY0OQ|SmartCat Bootsie'...|
|B00Q8ETIZ0|Dogloveit Rubber ...|
|B002VU2BA4|Kragonfly Interch...|
|B00QA3K3QM|Attmu Retractable...|
|B005DGHUC2|Zoo Med Laborator...|
|B000XY7C7C|All Four Paws, Th...|
|B004UUE26O|Animal Planet PET...|
|B008APML2C|Chuckit Medium Ul...|
|B00ZJN7T8E|1 Half Portion Ja...|
|B00NRZC8LY|Hide-A-Toy Hallow...|
|B001P3NU4E|Virbac C.E.T. Enz...|
|B003TEQ2U6|Jolly Pets Jolly ...|
|B00DJSNF0M|Multipet Lenny th...|
|B00FXVFEQG|Bags on Board Dur...|
|B005ORDWYA|Just One Bite II ...|
|B003E77OG4|  Petmate Sky Kennel|
+----------+--------------------+
only showing top 20 rows



In [None]:
# Customers frame: number of review_id_table rows per customer_id,
# with the aggregate column named customer_count.
df_customers = (
    df_review_id_table
    .groupBy("customer_id")
    .count()
    .withColumnRenamed("count", "customer_count")
)
df_customers.show()

+-----------+--------------+
|customer_id|customer_count|
+-----------+--------------+
|   21651970|             2|
|   13893272|             3|
|   48646654|             8|
|   46748849|             6|
|   43696557|             1|
|   22178798|             1|
|   51124356|             1|
|   22651936|             1|
|   50291430|            17|
|   43056892|             4|
|   27568723|            19|
|   22726580|             1|
|   25607575|             3|
|   52159850|             1|
|   21872019|             1|
|   35535470|             6|
|   47269154|             1|
|   11132915|             2|
|   43220840|             2|
|   14846974|             1|
+-----------+--------------+
only showing top 20 rows



In [None]:
# Configure settings for RDS
# Save mode passed to every JDBC write below.
mode = "append"
jdbc_url = "jdbc:postgresql://mypostgresdb.cvhifjxfcutl.us-east-1.rds.amazonaws.com:5432/my_data_class_db"

# Credentials are read from the environment so real values never have to be
# typed into (and saved with) the notebook. Set RDS_USER and RDS_PASSWORD in
# the kernel's environment; when they are unset, the original placeholder
# values are used.
config = {
    "user": os.environ.get("RDS_USER", "user"),
    "password": os.environ.get("RDS_PASSWORD", "password"),
    "driver": "org.postgresql.Driver",
}


In [None]:
# Write each DataFrame to its table in RDS.
# All three writes share the same JDBC URL, save mode and connection
# properties, and run in the order: review_id_table, products, customers.
for table_name, frame in (
    ("review_id_table", df_review_id_table),
    ("products", df_products),
    ("customers", df_customers),
):
    frame.write.jdbc(url=jdbc_url, table=table_name, mode=mode, properties=config)