# Final Project Milestone 1
# Part 2: PostgreSQL
## Daisy Pinaroc

In [16]:
HOST = "XXXX"
USER = "XXXX"
PW="XXXX"
DB="XXXX"

In [17]:
import psycopg

def connect():
    
    url = "dbname='{}' user='{}' host={} port=XXXX password={}".format(DB, USER, HOST, PW)
    connection = psycopg.connect(url)

    return connection

**1. Test connection**

In [18]:
import psycopg

connection = connect()

print(connection)

<psycopg.Connection [IDLE] (host=10.65.160.5 database=postgres) at 0x7fc74e31fac0>


**2. Create a schema in Postgres with a table by the name of reservations (and drop it if it already exists)**

In [19]:
# Create a schema called final_project
drop_schema = "DROP SCHEMA IF EXISTS final_project CASCADE "
create_schema = "CREATE SCHEMA IF NOT EXISTS final_project"
try:
    cursor = connection.cursor()
    cursor.execute(drop_schema)
    print("Dropped schema: final_project")
    
    cursor = connection.cursor()
    cursor.execute(create_schema)
    print("Created schema: final_project")
    
    connection.commit()
except Exception as e:
    print("Error occurred while creating final_project schema:", e)

Dropped schema: final_project
Created schema: final_project


In [20]:
# To ensure referential integrity, we get the shopper data generated in final-project-mysql.ipynb
# This data was hard-coded in final-project-mysql.ipynb
from sqlalchemy import create_engine
import pandas as pd
from gcsfs import GCSFileSystem

# GCS path to the CSV file
# gsutil
gcs_path = 'gs://XXXX'

# Using GCSFileSystem to open the file
fs = GCSFileSystem(project='XXXX')
with fs.open(gcs_path, 'rb') as file:
    # Use pandas to read the CSV file 
    shopper_data = pd.read_csv(file, low_memory=False)

# Transfer the data to Postgres database, into final_project schema
try:
    postgres_engine = create_engine('postgresql://XXXX')
    shopper_data.to_sql('shopper', postgres_engine, schema='final_project', if_exists='replace', index=False)
    print("Data synchronized/transferred successfully")
except Exception as e:
    print("Error occurred while synchronizing data:", e)

Data synchronized/transferred successfully


In [21]:
# This code block adds a unique constraint to cust_id, which ensures that all postgres.reservations.cust_id exist in mysql.shopper.cust_id
import psycopg

connection = connect()
print(connection)

try:
    cursor = connection.cursor()

    # SQL statement to add the constraint
    add_unique_constraint = '''
    ALTER TABLE final_project.shopper
    ADD CONSTRAINT unique_cust_id UNIQUE (cust_id);
    '''

    cursor.execute(add_unique_constraint)
    print('Added unique constraint to cust_id in the shopper table')
    connection.commit()  # commit the changes
except Exception as e:
    print("Error occurred while adding unique constraint to cust_id:", e)

<psycopg.Connection [IDLE] (host=10.65.160.5 database=postgres) at 0x7fc74e72e350>
Added unique constraint to cust_id in the shopper table


In [22]:
import psycopg

connection = connect()
print(connection)

drop_reservations = 'DROP TABLE IF EXISTS final_project.reservations'

create_reservations = '''CREATE TABLE final_project.reservations(
    res_id SERIAL PRIMARY KEY,
    cust_id SERIAL NOT NULL,
    prp_nm VARCHAR(100) NOT NULL,
    prp_ch VARCHAR(40) NOT NULL,
    adr_line_1 VARCHAR(41) NOT NULL,
    adr_line_2 VARCHAR(41),
    city VARCHAR(29) NOT NULL,
    state CHAR(2) NOT NULL,
    postal_cd CHAR(5) NOT NULL,
    lat NUMERIC(10, 6) NOT NULL,
    long NUMERIC(10, 6) NOT NULL,
    cnt_code CHAR(2) NOT NULL,
    arr_date DATE NOT NULL,
    dep_date DATE NOT NULL,
    pmt_amt NUMERIC(12, 2) NOT NULL,
    FOREIGN KEY (cust_id) REFERENCES final_project.shopper(cust_id)
)'''

print(drop_reservations)
print(create_reservations)

try:
    cursor = connection.cursor()
    cursor.execute(drop_reservations)
    print('Dropped reservations table')

    cursor = connection.cursor()
    cursor.execute(create_reservations)
    print('Created reservations table')

    connection.commit()  # commit the changes
except Exception as e:
    print("Error occurred while creating reservations table:", e)

<psycopg.Connection [IDLE] (host=10.65.160.5 database=postgres) at 0x7fc74e6d5c00>
DROP TABLE IF EXISTS final_project.reservations
CREATE TABLE final_project.reservations(
    res_id SERIAL PRIMARY KEY,
    cust_id SERIAL NOT NULL,
    prp_nm VARCHAR(100) NOT NULL,
    prp_ch VARCHAR(40) NOT NULL,
    adr_line_1 VARCHAR(41) NOT NULL,
    adr_line_2 VARCHAR(41),
    city VARCHAR(29) NOT NULL,
    state CHAR(2) NOT NULL,
    postal_cd CHAR(5) NOT NULL,
    lat NUMERIC(10, 6) NOT NULL,
    long NUMERIC(10, 6) NOT NULL,
    cnt_code CHAR(2) NOT NULL,
    arr_date DATE NOT NULL,
    dep_date DATE NOT NULL,
    pmt_amt NUMERIC(12, 2) NOT NULL,
    FOREIGN KEY (cust_id) REFERENCES final_project.shopper(cust_id)
)
Dropped reservations table
Created reservations table


**3. Insert records into the table**

In [23]:
# DELETE all records from final_project.reservations, if needed
'''
try:
    cursor = connection.cursor()

    # Delete all records from the reservations table
    delete_query = 'DELETE FROM final_project.reservations'
    cursor.execute(delete_query)

    connection.commit()   # commit the changes

    print("All records deleted from the reservations table.")
    
except Exception as e:
    print("Error occurred while deleting records:", e)
'''

All records deleted from the reservations table.


In [24]:
# Insert data into reservations table
from faker import Faker
from random import randrange
from datetime import timedelta

connection = connect()
print(connection)

fake = Faker()

sql = '''
INSERT INTO final_project.reservations (
    cust_id, prp_nm, prp_ch, adr_line_1, adr_line_2, city, state, postal_cd,
    lat, long, cnt_code, arr_date, dep_date, pmt_amt
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
'''

cursor = connection.cursor()

# Query existing cust_id values from the shopper table
cursor.execute("SELECT cust_id FROM final_project.shopper")
existing_cust_ids = [row[0] for row in cursor.fetchall()]

reservations_records = []
num_records = 0

# Create 100 records using Faker(), Insert them into the reservations table
for _ in range(100):
    cust_id = fake.random_element(existing_cust_ids)
    prp_nm = fake.company()
    prp_ch = fake.company_suffix()
    adr_line_1 = fake.street_address()
    adr_line_2 = fake.secondary_address()
    city = fake.city()
    state = fake.state_abbr()
    postal_cd = fake.zipcode()
    lat = fake.latitude()
    long = fake.longitude()
    cnt_code = fake.country_code(representation="alpha-2")
    arr_date = fake.date_between(start_date="-30d", end_date="+30d")
    dep_date = arr_date + timedelta(days=randrange(1, 15))  
    pmt_amt = round(fake.random.uniform(100, 10000), 2) 

    record = (
        cust_id, prp_nm, prp_ch, adr_line_1, adr_line_2, city, state, postal_cd,
        lat, long, cnt_code, arr_date, dep_date, pmt_amt
    )

    reservations_records.append(record)
    
    num_records += 1 
# Insert the records into the reservations table
try:
    cursor.executemany(sql, reservations_records)
    connection.commit()
    print(f"{num_records} records written into reservations table")
except Exception as e:
    print("Failed to insert records into reservations table:", e)

<psycopg.Connection [IDLE] (host=10.65.160.5 database=postgres) at 0x7fc74e37d600>
100 records written into reservations table


In [25]:
# Sanity check: Print records of reservations table
try:
    sql_select_query = '''
    SELECT * FROM final_project.reservations
    '''
    cursor.execute(sql_select_query)
    records = cursor.fetchall()
    for record in records:
        print(record)
except Exception as e:
    print("Error occurred while fetching records:", e)

(1, 13, 'Wilson, Cooper and Brewer', 'and Sons', '92579 Jamie Locks', 'Apt. 262', 'West Timothy', 'MO', '64095', Decimal('71.876170'), Decimal('-154.609189'), 'EG', datetime.date(2023, 11, 27), datetime.date(2023, 12, 3), Decimal('6243.60'))
(2, 20, 'Ramos Inc', 'Ltd', '32601 Murray Alley Suite 713', 'Apt. 345', 'West Derekborough', 'RI', '57910', Decimal('59.807628'), Decimal('61.185516'), 'BR', datetime.date(2023, 11, 21), datetime.date(2023, 11, 22), Decimal('2363.32'))
(3, 3, 'Miller-Smith', 'and Sons', '514 Walker Village Apt. 922', 'Apt. 975', 'Lozanostad', 'NE', '39982', Decimal('17.988926'), Decimal('-152.458124'), 'BH', datetime.date(2023, 12, 22), datetime.date(2023, 12, 25), Decimal('8309.81'))
(4, 47, 'Riley-Kline', 'Inc', '57968 Jonathan Neck Apt. 209', 'Suite 808', 'South Michelle', 'WY', '08308', Decimal('24.000295'), Decimal('53.266712'), 'AL', datetime.date(2023, 11, 12), datetime.date(2023, 11, 24), Decimal('481.70'))
(5, 34, 'Hill, Weaver and Baldwin', 'and Sons', '5

**Hard-coding the reservations data to later ensure referential integrity between the tables**

Making sure that:
* postgres.reservations.cust_id exist in mysql.shopper.cust_id
* postgres.reservations.cnt_code exist in bigquery.currency.cnt_code

In [26]:
# importing necessary libraries for hard-coding
import csv
from io import StringIO
from google.cloud import storage
import psycopg2

In [27]:
# Select data from reservations table

connection = connect()
print(connection)

try:
    cursor = connection.cursor()
    select_query = "SELECT * FROM final_project.reservations"
    cursor.execute(select_query)
    records = cursor.fetchall()

    # Get column names
    column_names = [desc[0] for desc in cursor.description]

    # Convert records to a list of dictionaries
    records_as_dicts = [dict(zip(column_names, record)) for record in records]
    
    print("Successfully fetched records")
    
    cursor.close()
except Exception as e:
    print("Error occurred while fetching records:", e)

<psycopg.Connection [IDLE] (host=10.65.160.5 database=postgres) at 0x7fc74e37cca0>
Successfully fetched records


In [28]:
# Write data to a CSV file in memory
csv_data = StringIO()
try:
    fieldnames = ['res_id', 'cust_id', 'prp_nm', 'prp_ch', 'adr_line_1', 'adr_line_2', 'city', 'state', 'postal_cd',
                  'lat', 'long', 'cnt_code', 'arr_date', 'dep_date', 'pmt_amt']
    writer = csv.DictWriter(csv_data, fieldnames=fieldnames)

    writer.writeheader()
    for record in records_as_dicts:
        writer.writerow(record)
    print("CSV file containing reservations data written successfully")
except Exception as e:
    print("Error occurred while writing to CSV file in memory:", e)

CSV file containing reservations data written successfully


In [29]:
# Upload CSV data to a Google Cloud Storage bucket
bucket_name = 'XXXX'
blob_name = 'reservations_data_postgresql.csv'
try:
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(blob_name)

    # Convert CSV data to bytes and upload it to the blob
    blob.upload_from_string(csv_data.getvalue().encode('utf-8'), content_type='text/csv')

    print(f'Data exported to gs://{bucket_name}/{blob_name}')
except Exception as e:
    print("Error occurred while uploading to Google Cloud Storage:", e)

Data exported to gs://cs327e-final-project/reservations_data_postgresql.csv


In [30]:
# Check if the csv file's correct - download to jupyter notebook instance
import subprocess

# Define Google Cloud bucket_name and file_path
bucket_name = 'XXXX'
file_path = 'reservations_data_postgresql.csv'

# Download the file using gsutil
# Official documentation here: https://cloud.google.com/storage/docs/gsutil_install
subprocess.run(['gsutil', 'cp', f'gs://{bucket_name}/{file_path}', 'reservations_data_postgresql.csv'])
print('File downloaded successfully.')

Copying gs://cs327e-final-project/reservations_data_postgresql.csv...
/ [1 files][ 13.7 KiB/ 13.7 KiB]                                                
Operation completed over 1 objects/13.7 KiB.                                     


File downloaded successfully.
