<a href="https://colab.research.google.com/github/kyliekwann/FinalProject/blob/hankai26/AnalyzeUP_connect_db.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## **Set Up Spark**

In [1]:
import os
# Pick a Spark 3.x release. http://www.apache.org/dist/spark/ only keeps current releases;
# older ones such as 3.0.3 are served from https://archive.apache.org/dist/spark/
spark_version = 'spark-3.0.3'
# spark_version = 'spark-3.2.1'
os.environ['SPARK_VERSION'] = spark_version

# Install Java 11 and Spark (pre-built for Hadoop 2.7)
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set environment variables so findspark and PySpark can locate Java and Spark
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Make the Spark installation importable (the SparkSession itself is created below)
import findspark
findspark.init()


In [2]:
# Download the Postgres driver that will allow Spark to interact with Postgres.
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

--2022-06-19 00:33:42--  https://jdbc.postgresql.org/download/postgresql-42.2.16.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1002883 (979K) [application/java-archive]
Saving to: ‘postgresql-42.2.16.jar.1’


2022-06-19 00:33:43 (5.99 MB/s) - ‘postgresql-42.2.16.jar.1’ saved [1002883/1002883]
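The log above shows the driver being saved as `postgresql-42.2.16.jar.1`: by default wget does not overwrite an existing file, so each re-run adds a numbered copy while the SparkSession keeps pointing at `/content/postgresql-42.2.16.jar`. From the shell, `wget -nc` (no-clobber) skips the download when the file is already there. A minimal plain-Python sketch of the same download-if-missing idea; `ensure_download` is a hypothetical helper, not part of the original notebook:

```python
import os
import urllib.request


def ensure_download(url, dest):
    """Download `url` to `dest` only if `dest` does not exist yet.

    Returns the local path, so re-running never creates numbered copies.
    """
    if not os.path.exists(dest):
        urllib.request.urlretrieve(url, dest)
    return dest
```

Usage in the notebook would be `ensure_download("https://jdbc.postgresql.org/download/postgresql-42.2.16.jar", "/content/postgresql-42.2.16.jar")`.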



In [3]:
from pyspark.sql import SparkSession
spark = (
    SparkSession.builder
    .appName("M16-Amazon-Challenge")
    # Put the Postgres JDBC driver on the driver's classpath
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

## **Load Data from S3 into Spark DataFrame**
- Connect to the data storage, then extract the data into a DataFrame. The mock data (`sample_url`) is stored in an S3 bucket.

- The dataset (`df`) mimics the structure we expect for the final database.

In [4]:
sample_url = "https://analyzeup-hz.s3.us-west-1.amazonaws.com/Charity_Navigator_US_States_and_Territories.csv"
file_name = "Charity_Navigator_US_States_and_Territories.csv"
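`SparkFiles.get()` looks up a file added with `addFile()` by its file name, so `file_name` must match the last path segment of `sample_url`. A small sketch that derives it from the URL instead of typing it twice (standard library only):

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse

sample_url = "https://analyzeup-hz.s3.us-west-1.amazonaws.com/Charity_Navigator_US_States_and_Territories.csv"

# The file name is the last segment of the URL path
file_name = PurePosixPath(urlparse(sample_url).path).name
print(file_name)  # Charity_Navigator_US_States_and_Territories.csv
```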

In [6]:
from pyspark import SparkFiles

spark.sparkContext.addFile(sample_url)
df = spark.read.csv(SparkFiles.get(file_name), sep=',', header=True, inferSchema=True)
df.show(5)


In [7]:
# Import col() for building column expressions, then list the DataFrame's columns
from pyspark.sql.functions import col
df.columns

# df.printSchema()

['charityNavigatorURL',
 'mission',
 'websiteURL',
 'tagLine',
 'charityName',
 'ein',
 'orgID',
 'currentRating__score',
 'currentRating__ratingID',
 'currentRating__publicationDate',
 'currentRating__ratingImage__small',
 'currentRating__ratingImage__large',
 'currentRating__rating',
 'currentRating___rapid_links__related__href',
 'currentRating__financialRating__score',
 'currentRating__financialRating__rating',
 'currentRating__accountabilityRating__score',
 'currentRating__accountabilityRating__rating',
 'category__categoryName',
 'category__categoryID',
 'category__charityNavigatorURL',
 'category__image',
 'cause__causeID',
 'cause__causeName',
 'cause__charityNavigatorURL',
 'cause__image',
 'irsClassification__deductibility',
 'irsClassification__subsection',
 'irsClassification__assetAmount',
 'irsClassification__nteeType',
 'irsClassification__incomeAmount',
 'irsClassification__nteeSuffix',
 'irsClassification__filingRequirement',
 'irsClassification__classification',
 'irs
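Column names such as `currentRating__financialRating__rating` look like nested fields flattened with a double underscore. Assuming `__` is always the nesting separator, grouping the columns by their first segment gives a quick view of which fields belong together when splitting the data into separate tables. A sketch over a subset of the columns listed above; `group_by_prefix` is a hypothetical helper:

```python
from collections import defaultdict

# A subset of df.columns, copied from the output above
columns = [
    "charityName", "ein", "orgID",
    "currentRating__score",
    "currentRating__financialRating__rating",
    "currentRating__accountabilityRating__rating",
    "category__categoryName",
    "cause__causeName",
    "irsClassification__assetAmount",
]


def group_by_prefix(cols, sep="__"):
    """Map each top-level prefix to the columns nested under it."""
    groups = defaultdict(list)
    for c in cols:
        # Columns without the separator are not nested
        prefix = c.split(sep, 1)[0] if sep in c else "(top level)"
        groups[prefix].append(c)
    return dict(groups)


for prefix, cols in group_by_prefix(columns).items():
    print(prefix, len(cols))
```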

## **Connect to the AWS RDS instance and write each DataFrame to its table.**
---



In [8]:
# Prompt for the database password so it is not stored in the notebook
from getpass import getpass
password = getpass('Enter database password (DB instance)')
# Configure settings for RDS. Note: "append" inserts rows on every run,
# so re-running a write cell duplicates data in the target table.
mode = "append"
jdbc_url = "jdbc:postgresql://database-analyzeup.c9mmdejuhxq9.us-west-1.rds.amazonaws.com:5432/analyzeup_project"
config = {"user": "postgres",
          "password": password,
          "driver": "org.postgresql.Driver"}
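Instead of prompting in every session, the password could come from an environment variable, with `getpass` as a fallback. A sketch under assumptions: the variable name `ANALYZEUP_DB_PASSWORD` and the helper `rds_jdbc_settings` are hypothetical; the URL format, port, database, user and driver class mirror the cell above.

```python
import os
from getpass import getpass


def rds_jdbc_settings(host, port=5432, database="analyzeup_project", user="postgres"):
    """Build the JDBC URL and connection properties for spark.read/write.jdbc.

    The password is read from the hypothetical ANALYZEUP_DB_PASSWORD
    environment variable, falling back to an interactive prompt.
    """
    password = os.environ.get("ANALYZEUP_DB_PASSWORD") or getpass("Enter database password: ")
    url = f"jdbc:postgresql://{host}:{port}/{database}"
    properties = {"user": user, "password": password, "driver": "org.postgresql.Driver"}
    return url, properties
```

In the notebook this would be called as `jdbc_url, config = rds_jdbc_settings("database-analyzeup.c9mmdejuhxq9.us-west-1.rds.amazonaws.com")`.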

Enter database password (DB instance)··········


# **The model connects here to read from and write to the tables built from our dataset.**
### **The schema currently has 8 tables. We will keep refining it and adding tables that help with model training.**



In [32]:
# Test writing the full DataFrame to complete_table in the DB
df.write.jdbc(url=jdbc_url, table='complete_table', mode=mode, properties=config)
df.show()


In [9]:
# Test reading complete_table back from the DB
read_df = spark.read.jdbc(url=jdbc_url, table='complete_table', properties=config)
read_df.limit(10).show()


In [10]:
# Load each table of the schema into its own DataFrame
ein_df = spark.read.jdbc(url=jdbc_url, table='ein_table', properties=config)
name_df = spark.read.jdbc(url=jdbc_url, table='name_table', properties=config)
income_df = spark.read.jdbc(url=jdbc_url, table='income_table', properties=config)
publish_date_df = spark.read.jdbc(url=jdbc_url, table='publish_date', properties=config)
rating_df = spark.read.jdbc(url=jdbc_url, table='rating_table', properties=config)
state_df = spark.read.jdbc(url=jdbc_url, table='state_table', properties=config)
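The six reads above differ only in the table name. A sketch of the same step as a loop; `load_tables` and `TABLES` are hypothetical names, and the reader is passed in as a function so the helper can be tried without a database. Only the table names come from the cell above.

```python
from typing import Callable, Dict, Iterable, TypeVar

T = TypeVar("T")


def load_tables(read: Callable[[str], T], tables: Iterable[str]) -> Dict[str, T]:
    """Read each named table with `read` and return {table_name: result}."""
    return {name: read(name) for name in tables}


TABLES = ["ein_table", "name_table", "income_table",
          "publish_date", "rating_table", "state_table"]

# Stand-in reader that just echoes the table name (no database needed)
frames = load_tables(lambda t: f"<DataFrame {t}>", TABLES)
print(frames["rating_table"])  # <DataFrame rating_table>
```

With Spark, the reader would be `lambda t: spark.read.jdbc(url=jdbc_url, table=t, properties=config)`, and `frames["ein_table"].show(5)` would then display one table.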


In [11]:
# DataFrame.show() prints the table and returns None, so call it directly instead of inside print()
ein_df.show(5)
name_df.show(5)

+-----+---+
|orgid|ein|
+-----+---+
+-----+---+

+-----+------------------------+
|orgid|organization_charityname|
+-----+------------------------+
+-----+------------------------+


# **Data Cleaning**

In [28]:
print(f"*************The original dataset contains {df.count()} rows*************************")

*************The original dataset contains 3463 rows*************************


In [12]:
# Select the columns of interest
select_columns = ["orgID",
                  "currentRating__financialRating__rating",
                  "currentRating__accountabilityRating__rating",
                  "irsClassification__assetAmount"]

In [26]:
selected_df = df.select(select_columns)
print(selected_df.count())
selected_df.show(10)

3463
+-----+--------------------------------------+-------------------------------------------+------------------------------+
|orgID|currentRating__financialRating__rating|currentRating__accountabilityRating__rating|irsClassification__assetAmount|
+-----+--------------------------------------+-------------------------------------------+------------------------------+
|15177|                                     2|                                          2|                       1325910|
|16270|                                     4|                                          3|                       3032238|
|15711|                                     1|                                          3|                       1265941|
|10038|                                     4|                                          3|                      57645774|
|18112|                                     4|                                          4|                      38106163|
|13761|            

In [24]:
# cleaned_df.write.csv("cleaned_table.csv")

In [14]:
selected_df.dtypes

[('orgID', 'string'),
 ('currentRating__financialRating__rating', 'string'),
 ('currentRating__accountabilityRating__rating', 'string'),
 ('irsClassification__assetAmount', 'string')]

In [93]:
# cleaned_df.write.jdbc(url=jdbc_url, table='cleaned_table', mode=mode, properties=config)
# cleaned_df.show(3)


+-----+--------------------------------------+-------------------------------------------+------------------------------+
|orgID|currentRating__financialRating__rating|currentRating__accountabilityRating__rating|irsClassification__assetAmount|
+-----+--------------------------------------+-------------------------------------------+------------------------------+
|15177|                                     2|                                          2|                       1325910|
|16270|                                     4|                                          3|                       3032238|
|15711|                                     1|                                          3|                       1265941|
+-----+--------------------------------------+-------------------------------------------+------------------------------+
only showing top 3 rows



In [98]:
# # Convert column orgID into integer
# selected_df_convertINT = selected_df.withColumn("orgID",col("orgID").cast("integer"))
# print(selected_df_convertINT.count())
# selected_df_convertINT.dtypes

3463


[('orgID', 'int'),
 ('currentRating__financialRating__rating', 'string'),
 ('currentRating__accountabilityRating__rating', 'string'),
 ('irsClassification__assetAmount', 'string')]

In [15]:
# Clean data keeping only the rows with "orgID" valid
cleaned_df = selected_df.filter(selected_df["orgID"].cast("int").isNotNull())
print(cleaned_df.count())
cleaned_df.show(10)

3354
+-----+--------------------------------------+-------------------------------------------+------------------------------+
|orgID|currentRating__financialRating__rating|currentRating__accountabilityRating__rating|irsClassification__assetAmount|
+-----+--------------------------------------+-------------------------------------------+------------------------------+
|15177|                                     2|                                          2|                       1325910|
|16270|                                     4|                                          3|                       3032238|
|15711|                                     1|                                          3|                       1265941|
|10038|                                     4|                                          3|                      57645774|
|18112|                                     4|                                          4|                      38106163|
|13761|            
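The filter drops 3463 - 3354 = 109 rows whose `orgID` does not survive the cast. With Spark's default (non-ANSI) casting, `cast("int")` returns null instead of raising an error for strings that are not valid 32-bit integers, which is what lets `isNotNull()` act as a validity check. A rough plain-Python approximation of that check; `castable_to_int` is a hypothetical helper and may disagree with Spark on edge cases such as strings containing a decimal point or underscores:

```python
INT_MIN, INT_MAX = -2**31, 2**31 - 1


def castable_to_int(value):
    """Approximate `col.cast("int").isNotNull()` under Spark's non-ANSI casting."""
    if value is None:
        return False
    try:
        n = int(str(value).strip())
    except ValueError:
        return False
    # Spark's IntegerType is 32-bit; out-of-range values cast to null
    return INT_MIN <= n <= INT_MAX


# Hypothetical orgID strings, including invalid, empty and out-of-range values
org_ids = ["15177", "16270", "not-an-id", None, "", "3000000000"]
kept = [v for v in org_ids if castable_to_int(v)]
print(kept)  # ['15177', '16270']
```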

In [103]:
# Write cleaned_df to the cleaned_table table in the DB
cleaned_df.write.jdbc(url=jdbc_url, table='cleaned_table', mode=mode, properties=config)
cleaned_df.show()

+-----+--------------------------------------+-------------------------------------------+------------------------------+
|orgID|currentRating__financialRating__rating|currentRating__accountabilityRating__rating|irsClassification__assetAmount|
+-----+--------------------------------------+-------------------------------------------+------------------------------+
|15177|                                     2|                                          2|                       1325910|
|16270|                                     4|                                          3|                       3032238|
|15711|                                     1|                                          3|                       1265941|
|10038|                                     4|                                          3|                      57645774|
|18112|                                     4|                                          4|                      38106163|
|13761|                 
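Both `complete_table` and `cleaned_table` are written with `mode = "append"`, which inserts the DataFrame's rows into the existing table on every run. The 3354 rows read back later suggest the cleaned write ran once, but re-running this cell would double the table. The sketch below mimics the difference between Spark's `append` and `overwrite` save modes with an in-memory SQLite database; it is an analogy using the standard library, not Spark's implementation, and the rows are hypothetical.

```python
import sqlite3

# Hypothetical rows standing in for cleaned_df (orgID, financial rating, accountability rating)
ROWS = [(15177, "2", "2"), (16270, "4", "3")]


def write_table(conn, table, rows, mode):
    """Rough sqlite3 analogue of DataFrameWriter.jdbc(mode=...) for 'append' and 'overwrite'.

    `table` is interpolated into the SQL, so only pass trusted, hard-coded names.
    """
    if mode == "overwrite":
        # Spark's JDBC writer drops and recreates the table by default in this mode
        conn.execute(f"DROP TABLE IF EXISTS {table}")
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {table} (orgID INTEGER, fin_rating TEXT, acc_rating TEXT)"
    )
    conn.executemany(f"INSERT INTO {table} VALUES (?, ?, ?)", rows)


def row_count(conn, table):
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


conn = sqlite3.connect(":memory:")
for _ in range(2):  # simulate running the write cell twice
    write_table(conn, "appended", ROWS, "append")
    write_table(conn, "overwritten", ROWS, "overwrite")

print(row_count(conn, "appended"), row_count(conn, "overwritten"))  # 4 2
```

In Spark, `mode="overwrite"` makes the write cell safe to re-run; the JDBC `truncate` option set to `"true"` truncates the existing table instead of dropping it, which keeps its definition.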

In [29]:
print(f"*************The cleaned dataset contains {cleaned_df.count()} rows*************************")

*************The cleaned dataset contains 3354 rows*************************


# **Read cleaned_table (three ways)**

### 1. Read cleaned_table directly from the DB

In [16]:
cleaned_df = spark.read.jdbc(url=jdbc_url, table='cleaned_table', properties=config)
print(cleaned_df.count())
cleaned_df.show()

3354
+-----+--------------------------------------+-------------------------------------------+------------------------------+
|orgID|currentRating__financialRating__rating|currentRating__accountabilityRating__rating|irsClassification__assetAmount|
+-----+--------------------------------------+-------------------------------------------+------------------------------+
| 7602|                                     3|                                          4|                      60251950|
| 4281|                                     2|                                          3|                      56401419|
|14738|                                     3|                                          4|                       2616186|
| 6604|                                     4|                                          4|                       7563723|
| 6412|                                     3|                                          4|                      21520375|
| 7607|            


### 2. Read cleaned_table from S3 in Spark

In [17]:
from pyspark import SparkFiles

spark.sparkContext.addFile("https://analyzeup-hz.s3.us-west-1.amazonaws.com/cleaned_table.csv")
cleaned_df = spark.read.csv(SparkFiles.get("cleaned_table.csv"), sep=',', header=True, inferSchema=True)

cleaned_df.count()

3354

### 3. Read cleaned_table from S3 using Pandas

In [22]:
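import io

import pandas as pd
import requests

clean_table_url = "https://analyzeup-hz.s3.us-west-1.amazonaws.com/cleaned_table.csv"
response = requests.get(clean_table_url)
response.raise_for_status()  # fail loudly on HTTP errors instead of parsing an error page
clean_table_df = pd.read_csv(io.StringIO(response.content.decode('utf-8')))
# If the file contains malformed rows, pass on_bad_lines='skip' (pandas >= 1.3) to read_csv
# count() reports the number of non-null values in each column
print(clean_table_df.count())
clean_table_df.head(5)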

orgID                                          3354
currentRating__financialRating__rating         3354
currentRating__accountabilityRating__rating    3354
irsClassification__assetAmount                 3336
dtype: int64


|   | orgID | currentRating__financialRating__rating | currentRating__accountabilityRating__rating | irsClassification__assetAmount |
|---|---|---|---|---|
| 0 | 7602 | 3.0 | 4 | 60251950 |
| 1 | 4281 | 2.0 | 3 | 56401419 |
| 2 | 14738 | 3.0 | 4 | 2616186 |
| 3 | 6604 | 4.0 | 4 | 7563723 |
| 4 | 6412 | 3.0 | 4 | 21520375 |
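The `count()` output above reports non-null values per column: `irsClassification__assetAmount` has 3336 against 3354 rows, so 18 organizations have no asset amount (pandas reads empty CSV fields as NaN by default). The same per-column tally in plain Python, run on a hypothetical three-row sample that uses the cleaned table's header; `non_null_counts` is an illustrative helper, not a pandas API:

```python
import csv
import io

# Hypothetical sample in the same layout as cleaned_table.csv (second row lacks an asset amount)
sample = """orgID,currentRating__financialRating__rating,currentRating__accountabilityRating__rating,irsClassification__assetAmount
7602,3,4,60251950
4281,2,3,
14738,3,4,2616186
"""


def non_null_counts(text):
    """Count non-empty cells per column, similar in spirit to pandas DataFrame.count()."""
    reader = csv.DictReader(io.StringIO(text))
    counts = {name: 0 for name in reader.fieldnames}
    for row in reader:
        for name, value in row.items():
            if value not in ("", None):
                counts[name] += 1
    return counts


counts = non_null_counts(sample)
print(counts["irsClassification__assetAmount"])  # 2
```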
