## DataFrame Basics

In [None]:
import os
# Choose the Spark release to install. Browse http://www.apache.org/dist/spark/
# for the releases that are currently published and set spark_version to one
# of them, for example:
# spark_version = 'spark-3.2.1'
spark_version = 'spark-3.2.1'
# Exported so the shell commands below can expand $SPARK_VERSION.
os.environ['SPARK_VERSION'] = spark_version

# Install Java 8 (headless JDK), download and unpack the Spark build packaged
# for Hadoop 2.7, and install the findspark helper package.
# NOTE(review): wget runs with -q, so a failed download prints nothing here;
# presumably it would first show up as an error from the tar step — confirm.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
# JAVA_HOME is presumably where apt places the JDK installed above, and
# SPARK_HOME assumes the archive was unpacked under /content — TODO confirm
# both paths for the environment this notebook runs in.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Initialise findspark so the Spark installation can be used from Python.
# This does not start a SparkSession; the session is built in a later cell.
import findspark
findspark.init()

In [None]:
# Download the PostgreSQL JDBC driver jar (version 42.2.9) into the current
# working directory.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.9.jar

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) the session named "DataFrameBasics", pointing spark.jars
# at the PostgreSQL JDBC driver jar under /content.
sparks = (
    SparkSession.builder
    .appName("DataFrameBasics")
    .config("spark.jars", "/content/postgresql-42.2.9.jar")
    .getOrCreate()
)

In [None]:
from pyspark import SparkFiles

# Source of the gzipped, tab-separated Amazon pet-product reviews file.
url = "https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Pet_Products_v1_00.tsv.gz"

# Register the URL with the Spark context, then look the file up by name.
sparks.sparkContext.addFile(url)
local_path = SparkFiles.get("amazon_reviews_us_Pet_Products_v1_00.tsv.gz")

# The first line is the header; fields are split on tabs and the quote
# character is set to the empty string.
data = sparks.read.csv(local_path, sep="\t", header=True, quote="")

data.show()

In [None]:
# Count the rows in the reviews DataFrame loaded above.
data.count()

In [None]:
from pyspark.sql.functions import col

# Keep only the identifier/date columns and drop duplicate rows.
id_columns = ['review_id', 'customer_id', 'product_id', 'product_parent', 'review_date']
review_ids = data.select(id_columns).distinct()

# Cast the selected columns in place, in this order, to the listed types.
for column_name, target_type in (
    ("customer_id", "int"),
    ("product_parent", "int"),
    ("review_date", "date"),
):
    review_ids = review_ids.withColumn(column_name, col(column_name).cast(target_type))

review_ids.printSchema()

In [None]:
# Count the rows in review_ids.
review_ids.count()


In [None]:
# Print a preview of the review_ids rows.
review_ids.show()

In [None]:
# One row per distinct (product_id, product_title) pair found in the data.
product_columns = data.select('product_id', 'product_title')
products = product_columns.distinct()
products.printSchema()

In [None]:
# Print a preview of the products rows.
products.show()

In [None]:
# Count review_ids rows per customer_id, then rename the generated 'count'
# column to 'customer_count'.
rows_per_customer = review_ids.groupBy("customer_id").count()
customers = rows_per_customer.withColumnRenamed('count', 'customer_count')
customers.show()

In [None]:
# Settings shared by the JDBC writes: save mode, target database URL, and
# connection properties.
mode = "append"
jdbc_url = (
    "jdbc:postgresql://"
    "mypostgresdb.cvhifjxfcutl.us-east-1.rds.amazonaws.com:5432"
    "/my_data_class_db"
)
# NOTE(review): "user"/"password" look like placeholders — presumably they
# must be replaced with real credentials before writing; confirm.
config = dict(
    user="user",
    password="password",
    driver="org.postgresql.Driver",
)


In [None]:
# Write each DataFrame to its table over JDBC, in the same order as before,
# using the jdbc_url, mode and config values defined in the settings cell.
for frame, table_name in (
    (review_ids, 'review_id'),
    (products, 'products'),
    (customers, 'customers'),
):
    frame.write.jdbc(url=jdbc_url, table=table_name, mode=mode, properties=config)