In [1]:
import os
# Find the latest version of spark 3.0  from http://www-us.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.0'
spark_version = 'spark-3.0.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install psycopg2-binary

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

# Import our dependencies
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.linear_model import LogisticRegression
import pandas as pd
import numpy as np
import tensorflow as tf
from sqlalchemy import create_engine

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:9 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease
Get:13 http://archive.ubuntu.com/ubuntu bionic-backports

In [2]:
# Download the PostgreSQL JDBC driver (version 42.2.16) into the current working
# directory; the jar is what allows Spark to interact with Postgres:
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2020-10-30 00:22:28--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar’


2020-10-30 00:22:29 (1.01 MB/s) - ‘postgresql-42.2.16.jar’ saved [1002883/1002883]



In [3]:
# Create (or reuse) the Spark session, with the downloaded Postgres driver jar
# added to the driver's classpath.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Wine_Weather")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)


In [4]:
# Load the cleaned red-wine CSV from its S3 bucket
from pyspark import SparkFiles

url = "https://wine5-bucket.s3.us-east-2.amazonaws.com/Red_Wine_Cleaned.csv"
# Register the remote file with Spark, then read it back through the local
# path that SparkFiles reports for it.
spark.sparkContext.addFile(url)
Red_Wine_df = spark.read.csv(
    SparkFiles.get("Red_Wine_Cleaned.csv"),
    sep=",",
    header=True,
    inferSchema=True,
)

# Preview the DataFrame
Red_Wine_df.show()

+---+--------------------+-------+--------------------+-----+----------+-------+-------+-----------+-----+----------------+----------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+----------+--------------------+-----------+--------------------+-----------+-------------+-------------+--------------------+----------------+--------------------+--------------+
|_c0|                wine|wine_id|         appellation|color|   regions|country|vintage|is_primeurs|score|confidence_index|journalist_count|     avgPrcpFebruary|avgTempFebruary|        avgPrcpMarch|avgTempMarch|        avgPrcpApril|avgTempApril|          avgPrcpMay|avgTempMay|         avgPrcpJune|avgTempJune|         avgPrcpJuly|avgTempJuly|avgPrcpAugust|avgTempAugust|    avgPrcpSeptember|avgTempSeptember|      avgPrcpOctober|avgTempOctober|
+---+--------------------+-------+--------------------+-----+----------+-------+-------+-----------+-----+--

In [5]:
# Number of rows in the red-wine DataFrame
Red_Wine_df.count()

4098

In [14]:
# Column names of the red-wine DataFrame (DataFrame.columns is already a list)
Red_Wine_df.columns

['_c0',
 'wine',
 'wine_id',
 'appellation',
 'color',
 'regions',
 'country',
 'vintage',
 'is_primeurs',
 'score',
 'confidence_index',
 'journalist_count',
 'avgPrcpFebruary',
 'avgTempFebruary',
 'avgPrcpMarch',
 'avgTempMarch',
 'avgPrcpApril',
 'avgTempApril',
 'avgPrcpMay',
 'avgTempMay',
 'avgPrcpJune',
 'avgTempJune',
 'avgPrcpJuly',
 'avgTempJuly',
 'avgPrcpAugust',
 'avgTempAugust',
 'avgPrcpSeptember',
 'avgTempSeptember',
 'avgPrcpOctober',
 'avgTempOctober']

In [6]:
# Drop null values (step currently disabled; uncomment the lines below to run it)
# Red_Wine_dropna_df = Red_Wine_df.dropna()
# Red_Wine_dropna_df.show()

+---+--------------------+-------+--------------------+-----+----------+-------+-------+-----------+-----+----------------+----------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+----------+--------------------+-----------+--------------------+-----------+-------------+-------------+--------------------+----------------+--------------------+--------------+
|_c0|                wine|wine_id|         appellation|color|   regions|country|vintage|is_primeurs|score|confidence_index|journalist_count|     avgPrcpFebruary|avgTempFebruary|        avgPrcpMarch|avgTempMarch|        avgPrcpApril|avgTempApril|          avgPrcpMay|avgTempMay|         avgPrcpJune|avgTempJune|         avgPrcpJuly|avgTempJuly|avgPrcpAugust|avgTempAugust|    avgPrcpSeptember|avgTempSeptember|      avgPrcpOctober|avgTempOctober|
+---+--------------------+-------+--------------------+-----+----------+-------+-------+-----------+-----+--

In [15]:
# Build the red-wine DataFrame whose columns match the 'red_table' layout.
# The selection reads from Red_Wine_df (the frame loaded from S3 above); the
# previously referenced Red_Wine_cleaned_df is never defined in this notebook,
# so the cell failed with a NameError on a fresh kernel.
Red_wine_table_df = Red_Wine_df.select([
    '_c0', 'wine', 'wine_id', 'appellation', 'color', 'regions', 'country',
    'vintage', 'is_primeurs', 'score', 'confidence_index', 'journalist_count',
    # Monthly weather averages, February through October
    'avgPrcpFebruary', 'avgTempFebruary',
    'avgPrcpMarch', 'avgTempMarch',
    'avgPrcpApril', 'avgTempApril',
    'avgPrcpMay', 'avgTempMay',
    'avgPrcpJune', 'avgTempJune',
    'avgPrcpJuly', 'avgTempJuly',
    'avgPrcpAugust', 'avgTempAugust',
    'avgPrcpSeptember', 'avgTempSeptember',
    'avgPrcpOctober', 'avgTempOctober',
])
Red_wine_table_df.show()

+---+--------------------+-------+--------------------+-----+----------+-------+-------+-----------+-----+----------------+----------------+--------------------+---------------+--------------------+------------+--------------------+------------+--------------------+----------+--------------------+-----------+--------------------+-----------+-------------+-------------+--------------------+----------------+--------------------+--------------+
|_c0|                wine|wine_id|         appellation|color|   regions|country|vintage|is_primeurs|score|confidence_index|journalist_count|     avgPrcpFebruary|avgTempFebruary|        avgPrcpMarch|avgTempMarch|        avgPrcpApril|avgTempApril|          avgPrcpMay|avgTempMay|         avgPrcpJune|avgTempJune|         avgPrcpJuly|avgTempJuly|avgPrcpAugust|avgTempAugust|    avgPrcpSeptember|avgTempSeptember|      avgPrcpOctober|avgTempOctober|
+---+--------------------+-------+--------------------+-----+----------+-------+-------+-----------+-----+--

In [16]:
# Write the red-wine table to Postgres.
# The connection URL embeds the database password, so it is read from the
# WINE_DB_URL environment variable instead of being hardcoded in the notebook.
# Expected form:
#   postgresql://<user>:<password>@database-1.cslpjur96f9r.us-east-2.rds.amazonaws.com:5432
db_url = os.environ.get("WINE_DB_URL")
if not db_url:
    raise RuntimeError(
        "Set the WINE_DB_URL environment variable to the Postgres connection URL "
        "before running this cell."
    )
engine = create_engine(db_url)
# to_sql keeps pandas' default if_exists='fail', so this raises if 'red_table'
# already exists in the database.
Red_wine_table_df.toPandas().to_sql('red_table', engine)

In [17]:
# Load the cleaned white-wine CSV from its S3 bucket
from pyspark import SparkFiles

url = "https://wine5-bucket.s3.us-east-2.amazonaws.com/White_Wine_Cleaned.csv"
# Register the remote file with Spark, then read it back through the local
# path that SparkFiles reports for it.
spark.sparkContext.addFile(url)
White_Wine_df = spark.read.csv(
    SparkFiles.get("White_Wine_Cleaned.csv"),
    sep=",",
    header=True,
    inferSchema=True,
)

# Preview the DataFrame
White_Wine_df.show()

+---+--------------------+-------+--------------------+-----+---------+----------+-------+-------+-----------+-----+----------------+----------------+-------------------+---------------+--------------------+------------+--------------------+------------+--------------------+----------+--------------------+-----------+--------------------+-----------+--------------------+-------------+--------------------+----------------+--------------------+--------------+
|_c0|                wine|wine_id|         appellation|color|wine_type|   regions|country|vintage|is_primeurs|score|confidence_index|journalist_count|    avgPrcpFebruary|avgTempFebruary|        avgPrcpMarch|avgTempMarch|        avgPrcpApril|avgTempApril|          avgPrcpMay|avgTempMay|         avgPrcpJune|avgTempJune|         avgPrcpJuly|avgTempJuly|       avgPrcpAugust|avgTempAugust|    avgPrcpSeptember|avgTempSeptember|      avgPrcpOctober|avgTempOctober|
+---+--------------------+-------+--------------------+-----+---------+-----

In [18]:
# Number of rows in the white-wine DataFrame
White_Wine_df.count()

732

In [19]:
# Drop null values (step currently disabled; uncomment the lines below to run it)
# White_Wine_dropna_df = White_Wine_df.dropna()
# White_Wine_dropna_df.show()

+---+--------------------+-------+--------------------+-----+---------+----------+-------+-------+-----------+-----+----------------+----------------+-------------------+---------------+--------------------+------------+--------------------+------------+--------------------+----------+--------------------+-----------+--------------------+-----------+--------------------+-------------+--------------------+----------------+--------------------+--------------+
|_c0|                wine|wine_id|         appellation|color|wine_type|   regions|country|vintage|is_primeurs|score|confidence_index|journalist_count|    avgPrcpFebruary|avgTempFebruary|        avgPrcpMarch|avgTempMarch|        avgPrcpApril|avgTempApril|          avgPrcpMay|avgTempMay|         avgPrcpJune|avgTempJune|         avgPrcpJuly|avgTempJuly|       avgPrcpAugust|avgTempAugust|    avgPrcpSeptember|avgTempSeptember|      avgPrcpOctober|avgTempOctober|
+---+--------------------+-------+--------------------+-----+---------+-----

In [20]:
# White_Wine_dropna_df.count()

732

In [22]:
# Build the white-wine DataFrame for the database table. The column list is the
# same one used for the red-wine table, so White_Wine_df's extra 'wine_type'
# column is left out.
white_table_columns = [
    '_c0', 'wine', 'wine_id', 'appellation', 'color', 'regions', 'country',
    'vintage', 'is_primeurs', 'score', 'confidence_index', 'journalist_count',
    # Monthly weather averages, February through October
    'avgPrcpFebruary', 'avgTempFebruary',
    'avgPrcpMarch', 'avgTempMarch',
    'avgPrcpApril', 'avgTempApril',
    'avgPrcpMay', 'avgTempMay',
    'avgPrcpJune', 'avgTempJune',
    'avgPrcpJuly', 'avgTempJuly',
    'avgPrcpAugust', 'avgTempAugust',
    'avgPrcpSeptember', 'avgTempSeptember',
    'avgPrcpOctober', 'avgTempOctober',
]
White_wine_table_df = White_Wine_df.select(white_table_columns)
White_wine_table_df.show()

+---+--------------------+-------+--------------------+-----+----------+-------+-------+-----------+-----+----------------+----------------+-------------------+---------------+--------------------+------------+--------------------+------------+--------------------+----------+--------------------+-----------+--------------------+-----------+--------------------+-------------+--------------------+----------------+--------------------+--------------+
|_c0|                wine|wine_id|         appellation|color|   regions|country|vintage|is_primeurs|score|confidence_index|journalist_count|    avgPrcpFebruary|avgTempFebruary|        avgPrcpMarch|avgTempMarch|        avgPrcpApril|avgTempApril|          avgPrcpMay|avgTempMay|         avgPrcpJune|avgTempJune|         avgPrcpJuly|avgTempJuly|       avgPrcpAugust|avgTempAugust|    avgPrcpSeptember|avgTempSeptember|      avgPrcpOctober|avgTempOctober|
+---+--------------------+-------+--------------------+-----+----------+-------+-------+--------

In [23]:
# Write the white-wine table to Postgres.
# The connection URL embeds the database password, so it is read from the
# WINE_DB_URL environment variable instead of being hardcoded in the notebook.
# Expected form:
#   postgresql://<user>:<password>@database-1.cslpjur96f9r.us-east-2.rds.amazonaws.com:5432
db_url = os.environ.get("WINE_DB_URL")
if not db_url:
    raise RuntimeError(
        "Set the WINE_DB_URL environment variable to the Postgres connection URL "
        "before running this cell."
    )
engine = create_engine(db_url)
# to_sql keeps pandas' default if_exists='fail', so this raises if 'white_table'
# already exists in the database.
White_wine_table_df.toPandas().to_sql('white_table', engine)