# [Database options]

# ---------------------------------------------------------------------------------------------------------------

# Using AWS RDS, Spark, and Google Colab

## Set up Relational Database and connect to pgAdmin

In [None]:
# Create an AWS Relational Database 16.7.2
# Connecting AWS to pgAdmin 16.7.3

## CRUD with AWS - Changing database stored in AWS cloud

## [In pgAdmin]

### Create

In [None]:
# CREATE TABLE doctors (
#  id INT PRIMARY KEY NOT NULL,
#  speciality TEXT,
#  taking_patients BOOLEAN
# );
# CREATE TABLE patients (
#  id INT NOT NULL,
#  doctor_id INT NOT NULL,
#  health_status TEXT,
#  PRIMARY KEY (id, doctor_id),
#  FOREIGN KEY (doctor_id) REFERENCES doctors (id)
# );

# INSERT INTO doctors(id, speciality, taking_patients)
# VALUES
# (1, 'cardiology', TRUE),
# (2, 'orthopedics', FALSE),
# (3, 'pediatrics', TRUE);
# INSERT INTO patients (id, doctor_id, health_status)
# VALUES
# (1, 2, 'healthy'),
# (2, 3, 'sick'),
# (3, 2, 'sick'),
# (4, 1, 'healthy'),
# (5, 1, 'sick');

### Read

In [None]:
# -- Read tables (run in pgAdmin's Query Tool; commented out so the Python kernel can Run All)
# SELECT * FROM doctors;
# SELECT * FROM patients;

### Update

In [None]:
# -- Update rows (run in pgAdmin's Query Tool; commented out so the Python kernel can Run All)
# -- NOTE: patients has a composite primary key (id, doctor_id), so `WHERE id = 1` matches every
# -- patients row with id 1 regardless of doctor; add `AND doctor_id = ...` to target a single row.
# UPDATE doctors
# SET taking_patients = FALSE
# WHERE id = 1;
# UPDATE patients
# SET health_status = 'healthy'
# WHERE id = 1;

### Delete

In [None]:
# -- Delete row (run in pgAdmin's Query Tool; commented out so the Python kernel can Run All)
# -- NOTE: patients has a composite primary key (id, doctor_id), so `WHERE id = 1` deletes every
# -- patients row with id 1 regardless of doctor; add `AND doctor_id = ...` to target a single row.
# DELETE FROM patients
# WHERE id = 1;

In [None]:
# load files into bucket
# 16.9.1 PySpark ETL Full example saved

 # PySpark ETL - using Google Colab and Spark

In [None]:
# Store data with AWS S3 16.8.2

## [In pgAdmin]
### Create tables 

In [None]:
# -- Create Active User Table in pgAdmin
# CREATE TABLE active_user (
#  id INT PRIMARY KEY NOT NULL,
#  first_name TEXT,
#  last_name TEXT,
#  username TEXT
# );

# CREATE TABLE billing_info (
#  billing_id INT PRIMARY KEY NOT NULL,
#  street_address TEXT,
#  state TEXT,
#  username TEXT
# );

# CREATE TABLE payment_info (
#  billing_id INT PRIMARY KEY NOT NULL,
#  cc_encrypted TEXT
# );

## [In Google Colab]
### PySpark - Imports, Drivers, Required Installs, environment variables, create spark session

In [None]:
import os
# Find the latest version of spark 3.0  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.0.3'
spark_version = 'spark-3.0.3'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop2.7.tgz
!tar xf $SPARK_VERSION-bin-hadoop2.7.tgz
!pip install -q findspark

# Set Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop2.7"

# Start a SparkSession
import findspark
findspark.init()

In [None]:
# Download postgres driver to allow Spark to interact with Posgres
!wget https://jdbc.postgresql.org/download/postgresql-42.2.16.jar

In [None]:
# Start a Spark session, adding the Postgres JDBC driver jar to the driver's classpath
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("CloudETL")
    .config("spark.driver.extraClassPath", "/content/postgresql-42.2.16.jar")
    .getOrCreate()
)

### PySpark ETL - EXTRACT

In [None]:
# Extract: register the user-data CSV from the S3 bucket with Spark, then load it as a DataFrame
from pyspark import SparkFiles

url = "https://kwporras-bucket.s3.amazonaws.com/user_data.csv"
spark.sparkContext.addFile(url)
user_data_df = spark.read.csv(
    SparkFiles.get("user_data.csv"),
    header=True,
    inferSchema=True,
    sep=",",
)

In [None]:
# Preview the extracted user data (first rows only)
user_data_df.show()

In [None]:
# Extract: register the user-payment CSV from the S3 bucket with Spark, then load it as a DataFrame
url = "https://kwporras-bucket.s3.amazonaws.com/user_payment.csv"
spark.sparkContext.addFile(url)
user_payment_df = spark.read.csv(
    SparkFiles.get("user_payment.csv"),
    header=True,
    inferSchema=True,
    sep=",",
)

# Preview the extracted payment data (first rows only)
user_payment_df.show()

### PySpark ETL - TRANSFORM

In [None]:
# Transform: inner-join users to their payment records on the shared "username" column
joined_df = user_data_df.join(user_payment_df, "username", "inner")
joined_df.show()

In [None]:
# Discard every row that contains a null in any column
dropna_df = joined_df.dropna(how="any")
dropna_df.show()

In [None]:
# col() builds a column reference by name for use in filter expressions
from pyspark.sql.functions import col

# Keep only the rows that belong to active users
cleaned_df = dropna_df.where(col("active_user") == True)
cleaned_df.show()


In [None]:
# Project the columns that make up the active_user table
clean_user_df = cleaned_df.select("id", "first_name", "last_name", "username")
clean_user_df.show()

In [None]:
# Project the columns that make up the billing_info table.
# NOTE(review): the billing_info table declares a lowercase `state` column; "State" relies on
# Spark's default case-insensitive column resolution — confirm against the CSV header.
clean_billing_df = cleaned_df.select("billing_id", "street_address", "State", "username")
clean_billing_df.show()

In [None]:
# Project the columns that make up the payment_info table
clean_payment_df = cleaned_df.select("billing_id", "cc_encrypted")
clean_payment_df.show()

### PySpark ETL - LOAD

In [None]:
# Prompt for the RDS password at run time so it never has to be written into the notebook
from getpass import getpass
password = getpass('Enter database password')

# JDBC settings for the RDS Postgres database; "append" adds rows to the existing tables
mode = "append"
jdbc_url = "jdbc:postgresql://dataviz.czshayekq14i.us-east-2.rds.amazonaws.com:5432/my_data_class_db"
config = dict(
    user="postgres",
    password=password,
    driver="org.postgresql.Driver",
)

In [None]:
# Load: append the cleaned user rows into the active_user table on RDS
clean_user_df.write.mode(mode).jdbc(url=jdbc_url, table="active_user", properties=config)

In [None]:
# Load: append the cleaned billing rows into the billing_info table on RDS
clean_billing_df.write.mode(mode).jdbc(url=jdbc_url, table="billing_info", properties=config)

In [None]:
# Load: append the cleaned payment rows into the payment_info table on RDS
clean_payment_df.write.mode(mode).jdbc(url=jdbc_url, table="payment_info", properties=config)

## [In pgAdmin]
### -- Query database to check successful upload

In [None]:
# SELECT * FROM active_user;
# SELECT * FROM billing_info;
# SELECT * FROM payment_info;

# ---------------------------------------------------------------------------------------------------------------

# Alternative - Increase user accessibility to the database by avoiding PostgreSQL
### Use sqlalchemy and sqlite  

In [None]:
# Import Dependencies
import numpy as np
import pandas as pd
import datetime as dt

# Python SQL toolkit and Object Relational Mapper
import sqlalchemy
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import Session
from sqlalchemy import create_engine, func
from sqlalchemy import extract

In [None]:
# Set up the database engine for the existing SQLite file (path is relative to the working directory).
# SQLite silently creates a new, empty database when the file is missing; reflection would then find
# no tables and `Base.classes.measurement` would fail with a confusing AttributeError — so check first.
from pathlib import Path
SQLITE_PATH = Path("hawaii.sqlite")
if not SQLITE_PATH.exists():
    raise FileNotFoundError(f"SQLite database not found: {SQLITE_PATH.resolve()}")
engine = create_engine(f"sqlite:///{SQLITE_PATH}")

# Reflect the existing database schema into automapped classes
Base = automap_base()
# NOTE: `reflect=True` is the legacy spelling; on SQLAlchemy 1.4+ `Base.prepare(autoload_with=engine)`
# is the preferred form. Kept as-is for compatibility with older SQLAlchemy versions.
Base.prepare(engine, reflect=True)

# Save references to each table
Measurement = Base.classes.measurement
Station = Base.classes.station

In [None]:
# Open an ORM session bound to the SQLite engine; call session.close() when finished querying
session = Session(bind=engine)

# ---------------------------------------------------------------------------------------------------------------

# Connecting Pandas and SQL 8.5.1

### Create a Database

In [None]:
# Make sure a database has been created in SQL, as covered above

### Import Modules

In [None]:
from sqlalchemy import create_engine
from config import db_password
import psycopg2

### Create the Database Engine

In [None]:
# "postgresql://[user]:[password]@[location]:[port]/[database]"

In [None]:
# Best practice: keep the password in a config.py file and place config.py in the .gitignore file.
# config.py only needs a single line, for example:
#     db_password = 'YOUR_PASSWORD_HERE'
# The real value is brought in by `from config import db_password`; do NOT reassign db_password in
# the notebook, or the placeholder overwrites the imported password and the connection fails.

In [None]:
# Build the connection string for the RDS Postgres instance:
#   postgresql://[user]:[password]@[location]:[port]/[database]
# The password is inserted exactly once. quote_plus() escapes characters such as "@", ":" or "/"
# that would otherwise be parsed as URL delimiters and break the connection string.
from urllib.parse import quote_plus
db_string = f"postgresql://postgres:{quote_plus(db_password)}@housing-prices.ctpruadwlamv.us-east-2.rds.amazonaws.com:5432/housing-prices"


In [None]:
# Build the SQLAlchemy engine for the housing-prices database from the db_string connection URL
engine = create_engine(db_string)

# ---------------------------------------------------------------------------------------------------------------