In [287]:
import pandas as pd
import io

import os
# Find the latest version of spark 2.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
spark_version = 'spark-3.0.2'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
Get:3 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:5 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:6 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:7 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Hit:8 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:11 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:12 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:13 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [74.6 kB]
Hit:15 http://ppa.launchpad.net/grap

In [288]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2021-05-02 14:48:55--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.4’


2021-05-02 14:48:57 (1.21 MB/s) - ‘postgresql-42.2.16.jar.4’ saved [1002883/1002883]



In [289]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Wine-Project").config("spark.driver.extraClassPath","/content/postgresql-42.2.16.jar").getOrCreate()

**Load Data into Spark DataFrame**

In [290]:
from pyspark import SparkFiles
url = "https://green-team-wine.s3.us-east-2.amazonaws.com/winemag-data-130k-v2.csv"
spark.sparkContext.addFile(url)
wine_df = spark.read.csv(SparkFiles.get("winemag-data-130k-v2.csv"), sep=",", header=True, inferSchema=True)
wine_df.show()

+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+------------------+---------------------+--------------------+------------------+-------------------+
|_c0|  country|         description|         designation|points|price|         province|           region_1|         region_2|       taster_name|taster_twitter_handle|               title|           variety|             winery|
+---+---------+--------------------+--------------------+------+-----+-----------------+-------------------+-----------------+------------------+---------------------+--------------------+------------------+-------------------+
|  0|    Italy|Aromas include tr...|        Vulkà Bianco|    87| null|Sicily & Sardinia|               Etna|             null|     Kerin O’Keefe|         @kerinokeefe|Nicosia 2013 Vulk...|       White Blend|            Nicosia|
|  1| Portugal|This is ripe and ...|            Avidagos|    87| 15.0|            Douro|

In [291]:
wine_df.count()

129975

In [292]:
url = "https://green-team-wine.s3.us-east-2.amazonaws.com/US_AL_cseat13t19.csv"
spark.sparkContext.addFile(url)
weather_df = spark.read.csv(SparkFiles.get("US_AL_cseat13t19.csv"), sep=",", header=True, inferSchema=True)
weather_df.show()

+-----------+-------------------+--------+---------+---------+----------+----+----+----+----+----+----+----+----+
|    STATION|               NAME|LATITUDE|LONGITUDE|ELEVATION|      DATE|AWND|PRCP|SN32|SNOW|SX32|TAVG|TMAX|TMIN|
+-----------+-------------------+--------+---------+---------+----------+----+----+----+----+----+----+----+----+
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|    167.6|2013-01-01|null|1.73|null|null|null|null|  51|  33|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|    167.6|2013-01-02|null| 0.0|null|null|null|null|  51|  33|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|    167.6|2013-01-03|null| 0.0|null|null|null|null|  51|  31|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|    167.6|2013-01-04|null| 0.0|null|null|null|null|  45|  28|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|    167.6|2013-01-05|null| 0.0|null|null|null|null|  45|  28|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|    167.6|2013-01-06|null| 0.0|null|

In [293]:
weather_df.count()

116849

**Create DataFrames to match tables**

In [339]:
#Select only the columns that will be stored into the RDS database

from pyspark.sql.functions import col

wine_table_df = wine_df.select(col("country").alias("country"), col("points").alias("points"), col("province").alias("province"), col("region_1").alias("region_1"), col("region_2").alias("region_2"), col("title").alias("title"), col("variety").alias("variety"), col("winery").alias("winery")).where(col("country").isNotNull())
wine_table_df.show()


+---------+------+-----------------+-------------------+-----------------+--------------------+------------------+-------------------+
|  country|points|         province|           region_1|         region_2|               title|           variety|             winery|
+---------+------+-----------------+-------------------+-----------------+--------------------+------------------+-------------------+
|    Italy|    87|Sicily & Sardinia|               Etna|             null|Nicosia 2013 Vulk...|       White Blend|            Nicosia|
| Portugal|    87|            Douro|               null|             null|Quinta dos Avidag...|    Portuguese Red|Quinta dos Avidagos|
|       US|    87|           Oregon|  Willamette Valley|Willamette Valley|Rainstorm 2013 Pi...|        Pinot Gris|          Rainstorm|
|       US|    87|         Michigan|Lake Michigan Shore|             null|St. Julian 2013 R...|          Riesling|         St. Julian|
|       US|    87|           Oregon|  Willamette Valley

In [340]:
wine_table_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- points: string (nullable = true)
 |-- province: string (nullable = true)
 |-- region_1: string (nullable = true)
 |-- region_2: string (nullable = true)
 |-- title: string (nullable = true)
 |-- variety: string (nullable = true)
 |-- winery: string (nullable = true)



In [341]:
#Change "points" column datatype from string to integer
from pyspark.sql.types import IntegerType
wine_table_df = wine_table_df.withColumn("points", wine_table_df["points"].cast(IntegerType()))

In [342]:
wine_table_df.printSchema()

root
 |-- country: string (nullable = true)
 |-- points: integer (nullable = true)
 |-- province: string (nullable = true)
 |-- region_1: string (nullable = true)
 |-- region_2: string (nullable = true)
 |-- title: string (nullable = true)
 |-- variety: string (nullable = true)
 |-- winery: string (nullable = true)



In [343]:
wine_table_df.count()

129912

In [344]:
#Select only the columns that will be stored into the RDS database

from pyspark.sql.functions import col
from pyspark.sql.functions import to_date

weather_table_df = weather_df.select(col("STATION").alias("station"), col("NAME").alias("countyname"), col("LATITUDE").alias("latitude"), col("LONGITUDE").alias("longitude"), to_date("DATE", 'yyyy-MM-dd').alias("date"), col("TMAX").alias("tmax"), col("TMIN").alias("tmin"))
weather_table_df.show()

+-----------+-------------------+--------+---------+----------+----+----+
|    station|         countyname|latitude|longitude|      date|tmax|tmin|
+-----------+-------------------+--------+---------+----------+----+----+
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-01|  51|  33|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-02|  51|  33|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-03|  51|  31|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-04|  45|  28|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-05|  45|  28|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-06|  48|  28|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-07|  48|  28|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-08|  61|  34|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-09|  59|  33|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-10|  59|  50|
|USC00012209|DECATUR 4 SE, AL US|34.57

In [345]:
weather_table_df.printSchema()

root
 |-- station: string (nullable = true)
 |-- countyname: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- date: date (nullable = true)
 |-- tmax: integer (nullable = true)
 |-- tmin: integer (nullable = true)



In [346]:
weather_table_df.count()

116849

In [347]:
# Remove rows that have at least 1 null value
weather_table_df = weather_table_df.dropna()
#weather_table_df.head(10)
weather_table_df.count()

82094

**Connect to the AWS RDS instance and write each DataFrame to its table**

In [348]:
# Configure settings for RDS
mode = "append"
#jdbc_url="jdbc:postgresql://<connection string>:5432/<database-name>"
jdbc_url="jdbc:postgresql://wine-final-project.czqkltznl3rl.us-east-2.rds.amazonaws.com/winedb"
#config = {"user":"postgres",
          #"password": "<password>",
          #"driver":"org.postgresql.Driver"}
config = {"user":"wineuser", 
          "password": "############",
          "driver":"org.postgresql.Driver"}

In [349]:
wine_table_df

DataFrame[country: string, points: int, province: string, region_1: string, region_2: string, title: string, variety: string, winery: string]

In [350]:
# Write wine_table_df to table in RDS
wine_table_df.write.jdbc(url=jdbc_url, table='wine', mode=mode, properties=config)

In [351]:
# Write weather_table_df to table in RDS
weather_table_df.write.jdbc(url=jdbc_url, table='weather', mode=mode, properties=config)

In [352]:
# Read wine table from RDS
spark.read.jdbc(url=jdbc_url, table='wine', properties=config).limit(10).show()

+------------+------+------------+--------------------+---------------+--------------------+--------------------+-------------------+
|     country|points|    province|            region_1|       region_2|               title|             variety|             winery|
+------------+------+------------+--------------------+---------------+--------------------+--------------------+-------------------+
|   Australia|    90|    Victoria|        Yarra Valley|           null|Giant Steps 2006 ...|          Chardonnay|        Giant Steps|
|          US|    90|  Washington|        Red Mountain|Columbia Valley|Gorman 2006 The P...|               Syrah|             Gorman|
|          US|    90|  Washington|Columbia Valley (WA)|Columbia Valley|Guardian 2006 Cha...|           Red Blend|           Guardian|
|          US|    90|  Washington|        Red Mountain|Columbia Valley|Barrister 2005 Ar...|              Merlot|          Barrister|
|      France|    90|    Bordeaux|        Saint-Julien|       

In [353]:
# Read weather table from RDS
spark.read.jdbc(url=jdbc_url, table='weather', properties=config).limit(10).show()

+-----------+-------------------+--------+---------+----------+----+----+
|    station|         countyname|latitude|longitude|      date|tmax|tmin|
+-----------+-------------------+--------+---------+----------+----+----+
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-01|51.0|33.0|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-02|51.0|33.0|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-03|51.0|31.0|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-04|45.0|28.0|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-05|45.0|28.0|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-06|48.0|28.0|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-07|48.0|28.0|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-08|61.0|34.0|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-09|59.0|33.0|
|USC00012209|DECATUR 4 SE, AL US|34.57556|-86.93389|2013-01-10|59.0|50.0|
+-----------+-------------------+-----