# Project 2, Part 3, Create and load staging tables

University of California, Berkeley

Master of Information and Data Science (MIDS) program

w205 - Fundamentals of Data Engineering

Student: Jack Galvin

Year: 2022

Semester: Spring

Section: 9


# Included Modules and Packages

Code cell containing your includes for modules and packages

In [1]:
import pandas as pd
import numpy as np
import math
import psycopg2

# Supporting code

Code cells containing any supporting code, such as connecting to the database, any functions, etc.  

Remember you can freely use any code from the labs. You do not need to cite code from the labs.

In [2]:
connection = psycopg2.connect(
    user = "postgres",
    password = "ucb",
    host = "postgres",
    port = "5432",
    database = "postgres"
)

In [3]:
cursor = connection.cursor()

In [16]:
# Function to run a select query and return the rows in a pandas dataframe
# pandas reads numeric values from Postgres as floats
# If every value in a column is a whole number, convert the column to integer


def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    """Run a select query and return the rows in a pandas dataframe."""
    
    if rollback_before_flag:
        connection.rollback()
    
    df = pd.read_sql_query(query, connection)
    
    if rollback_after_flag:
        connection.rollback()
    
    # fix the float columns that really should be integers
    
    for column in df:
    
        if df[column].dtype == "float64":

            fraction_flag = False

            for value in df[column].values:
                
                if not np.isnan(value):
                    if value - math.floor(value) != 0:
                        fraction_flag = True

            if not fraction_flag:
                df[column] = df[column].astype('Int64')
    
    return df
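
The per-value loop in `my_select_query_pandas` can also be written with vectorized pandas operations. The following is a minimal sketch rather than the lab's code: the helper name `floats_to_nullable_ints` is hypothetical, and it assumes a pandas version that supports the nullable `Int64` dtype.

```python
import numpy as np
import pandas as pd


def floats_to_nullable_ints(df):
    """Return a copy of df with whole-number float64 columns cast to Int64.

    Hypothetical helper: a vectorized take on the loop in
    my_select_query_pandas. NaN values are ignored by the check and
    become <NA> after the cast.
    """
    out = df.copy()
    for column in out.columns:
        if out[column].dtype != "float64":
            continue
        values = out[column].dropna()
        # x % 1 is 0 only for whole numbers; inf % 1 is NaN, so infinite
        # values keep the column as float, matching the original loop.
        if ((values % 1) == 0).all():
            out[column] = out[column].astype("Int64")
    return out


# Small in-memory example; no database connection is needed.
example = pd.DataFrame({
    "whole": [1.0, 2.0, np.nan],
    "fractional": [1.5, 2.0, np.nan],
})
converted = floats_to_nullable_ints(example)
print(converted.dtypes)
```

Working on a copy leaves the input dataframe untouched, which makes the conversion easier to check in isolation.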

# 2.3.1 Drop the staging table stage_1_peak_sales if it exists

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [4]:
# Drop the stage_1_peak_sales table if it exists

connection.rollback()

query = """

drop table if exists stage_1_peak_sales

"""

cursor.execute(query)

connection.commit()

# 2.3.2 Drop the staging table stage_1_peak_stores if it exists

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [5]:
# Drop the stage_1_peak_stores table if it exists

connection.rollback()

query = """

drop table if exists stage_1_peak_stores

"""

cursor.execute(query)

connection.commit()

# 2.3.3 Drop the staging table stage_1_peak_customers if it exists

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [6]:
# Drop the stage_1_peak_customers table if it exists

connection.rollback()

query = """

drop table if exists stage_1_peak_customers

"""

cursor.execute(query)

connection.commit()

# 2.3.4 Drop the staging table stage_1_peak_line_items if it exists

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [7]:
# Drop the stage_1_peak_line_items table if it exists

connection.rollback()

query = """

drop table if exists stage_1_peak_line_items

"""

cursor.execute(query)

connection.commit()
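
The four drop cells above repeat the same rollback, execute, commit pattern, so a loop could express it once. This is a minimal sketch under stated assumptions: `drop_staging_tables` and `STAGING_TABLES` are hypothetical names, and the demonstration uses an in-memory SQLite database from the standard library as a stand-in for the Postgres connection so that it runs anywhere. In the notebook, the existing `connection` object would be passed instead.

```python
import sqlite3

STAGING_TABLES = [
    "stage_1_peak_sales",
    "stage_1_peak_stores",
    "stage_1_peak_customers",
    "stage_1_peak_line_items",
]


def drop_staging_tables(connection, table_names):
    """Drop each table if it exists, using rollback / execute / commit.

    Hypothetical helper. Table names are interpolated into the SQL text,
    so they must come from a trusted, hard-coded list like STAGING_TABLES.
    """
    cursor = connection.cursor()
    for table_name in table_names:
        connection.rollback()  # clear any failed transaction first
        cursor.execute(f"drop table if exists {table_name}")
        connection.commit()


# Demonstration against an in-memory SQLite database (an assumption made
# so the sketch runs without Postgres).
demo_connection = sqlite3.connect(":memory:")
demo_connection.execute("create table stage_1_peak_sales (sale_id text)")
demo_connection.commit()

drop_staging_tables(demo_connection, STAGING_TABLES)

remaining = demo_connection.execute(
    "select name from sqlite_master where type = 'table'"
).fetchall()
print(remaining)
```

Because of `if exists`, the three tables that were never created in the demonstration are skipped without raising an error.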

# 2.3.5 Create the staging table stage_1_peak_sales

Use the same technique we used in the labs.

The first column should be stage_id as serial.

For each field in the CSV file, create a column as varchar(100).

Remember we do not set a primary key in staging tables.

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [8]:
# Create stage_1_peak_sales table with all varchar(100)

connection.rollback()

query = """


create table stage_1_peak_sales (
  stage_id serial,
  sale_id varchar(100),
  sale_date varchar(100),
  sub_total varchar(100),
  tax varchar(100),
  total_amount varchar(100)
);


"""

cursor.execute(query)

connection.commit()

# 2.3.6 Create the staging table stage_1_peak_stores

Use the same technique we used in the labs.

The first column should be stage_id as serial.

For each field in the CSV file, create a column as varchar(100).

Remember we do not set a primary key in staging tables.

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [9]:
# Create stage_1_peak_stores table with all varchar(100)

connection.rollback()

query = """


create table stage_1_peak_stores (
  stage_id serial,
  sale_id varchar(100),
  location_id varchar(100),
  name varchar(100),
  street varchar(100),
  city varchar(100),
  state varchar(100),
  zip varchar(100)
);


"""

cursor.execute(query)

connection.commit()

# 2.3.7 Create the staging table stage_1_peak_customers

Use the same technique we used in the labs.

The first column should be stage_id as serial.

For each field in the CSV file, create a column as varchar(100).

Remember we do not set a primary key in staging tables.

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [10]:
# Create stage_1_peak_customers table with all varchar(100)

connection.rollback()

query = """


create table stage_1_peak_customers (
  stage_id serial,
  sale_id varchar(100),
  customer_id varchar(100),
  first_name varchar(100),
  last_name varchar(100),
  street varchar(100),
  city varchar(100),
  state varchar(100),
  zip varchar(100)
);


"""

cursor.execute(query)

connection.commit()

# 2.3.8 Create the staging table stage_1_peak_line_items

Use the same technique we used in the labs.

The first column should be stage_id as serial.

For each field in the CSV file, create a column as varchar(100).

Remember we do not set a primary key in staging tables.

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [11]:
# Create stage_1_peak_line_items table with all varchar(100)

connection.rollback()

query = """


create table stage_1_peak_line_items (
  stage_id serial,
  sale_id varchar(100),
  line_item_id varchar(100),
  product_id varchar(100),
  price varchar(100),
  quantity varchar(100),
  taxable varchar(100)
);


"""

cursor.execute(query)

connection.commit()
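
The rules used for the four create statements above (a serial `stage_id` first, one `varchar(100)` column per CSV field, no primary key) are mechanical enough to generate from a CSV header line. The sketch below is one possible way to do that, not the lab's method. The helper name `build_staging_ddl` is hypothetical, and the header text is assumed to match the columns used for `stage_1_peak_sales`.

```python
import csv
import io
import re

IDENTIFIER = re.compile(r"^[a-z_][a-z0-9_]*$")


def build_staging_ddl(table_name, header_line):
    """Build a create-table statement for a staging table.

    Hypothetical helper: stage_id serial comes first, every CSV field
    becomes varchar(100), and no primary key is declared.
    """
    fields = next(csv.reader(io.StringIO(header_line)))
    for name in [table_name] + fields:
        # Reject anything that is not a plain lowercase identifier.
        if not IDENTIFIER.match(name):
            raise ValueError(f"unexpected identifier: {name!r}")
    columns = ["  stage_id serial"]
    columns += [f"  {field} varchar(100)" for field in fields]
    return f"create table {table_name} (\n" + ",\n".join(columns) + "\n);"


# Header assumed to match the columns used for stage_1_peak_sales above.
ddl = build_staging_ddl(
    "stage_1_peak_sales", "sale_id,sale_date,sub_total,tax,total_amount"
)
print(ddl)
```

The identifier check is there because the names are pasted directly into SQL text; a header with spaces or punctuation raises an error instead of producing a malformed statement.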

# 2.3.9 Load the CSV file peak_sales.csv into the table stage_1_peak_sales

Use the same technique we used in the labs.

Remember that the stage_id column is a serial and will auto-populate, so we leave it off the column list for the load.

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [12]:
# Load peak_sales.csv into stage_1_peak_sales

connection.rollback()

query = """

copy stage_1_peak_sales (sale_id, sale_date, sub_total, tax, total_amount)
from '/user/projects/ucb_mids_w205_project_2/peak_sales.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

# 2.3.10 Load the CSV file peak_stores.csv into the table stage_1_peak_stores

Use the same technique we used in the labs.

Remember that the stage_id column is a serial and will auto-populate, so we leave it off the column list for the load.

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [13]:
# Load peak_stores.csv into stage_1_peak_stores

connection.rollback()

query = """

copy stage_1_peak_stores (sale_id, location_id, name, street, city, state, zip)
from '/user/projects/ucb_mids_w205_project_2/peak_stores.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

# 2.3.11 Load the CSV file peak_customers.csv into the table stage_1_peak_customers

Use the same technique we used in the labs.

Remember that the stage_id column is a serial and will auto-populate, so we leave it off the column list for the load.

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [14]:
# Load peak_customers.csv into stage_1_peak_customers

connection.rollback()

query = """

copy stage_1_peak_customers (sale_id, customer_id, first_name, last_name, street, city, state, zip)
from '/user/projects/ucb_mids_w205_project_2/peak_customers.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()

# 2.3.12 Load the CSV file peak_line_items.csv into the table stage_1_peak_line_items

Use the same technique we used in the labs.

Remember that the stage_id column is a serial and will auto-populate, so we leave it off the column list for the load.

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [15]:
# Load peak_line_items.csv into stage_1_peak_line_items

connection.rollback()

query = """

copy stage_1_peak_line_items (sale_id, line_item_id, product_id, price, quantity, taxable)
from '/user/projects/ucb_mids_w205_project_2/peak_line_items.csv' delimiter ',' NULL '' csv header;

"""

cursor.execute(query)

connection.commit()
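
Since `stage_id` is filled in by Postgres, the column list in each copy statement above is the table's column list with `stage_id` removed. The sketch below builds such a statement from a full column list. The helper name `build_copy_statement` is hypothetical, and the inputs are assumed to be trusted, hard-coded values because they are placed directly into the SQL text.

```python
def build_copy_statement(table_name, table_columns, csv_path):
    """Build a Postgres copy statement that skips the serial stage_id.

    Hypothetical helper: table_columns is the full column list of the
    staging table; stage_id is left off so Postgres fills it in.
    Inputs are assumed to be trusted, hard-coded values.
    """
    load_columns = [c for c in table_columns if c != "stage_id"]
    column_list = ", ".join(load_columns)
    return (
        f"copy {table_name} ({column_list})\n"
        f"from '{csv_path}' delimiter ',' NULL '' csv header;"
    )


statement = build_copy_statement(
    "stage_1_peak_line_items",
    ["stage_id", "sale_id", "line_item_id", "product_id",
     "price", "quantity", "taxable"],
    "/user/projects/ucb_mids_w205_project_2/peak_line_items.csv",
)
print(statement)
```

The resulting string could be passed to `cursor.execute` in the same rollback, execute, commit pattern used in the cells above.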

# 2.3.13 Verify the load of stage_1_peak_sales by querying the entire table

Query the entire table into a Pandas dataframe.  

Remember to sort by stage_id so the rows are in the same order as the CSV file to make comparison easier.

Inspect and compare the first few rows and the last few rows to the CSV file to ensure they loaded correctly.  

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [17]:
# Verify the load of stage_1_peak_sales

rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from stage_1_peak_sales
order by stage_id;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

| | stage_id | sale_id | sale_date | sub_total | tax | total_amount |
|---|---|---|---|---|---|---|
| 0 | 1 | 5763728874 | 2020-10-03 | 12 | 0 | 12 |
| 1 | 2 | 5763729036 | 2020-10-03 | 72 | 0 | 72 |
| 2 | 3 | 5763728904 | 2020-10-03 | 24 | 0 | 24 |
| 3 | 4 | 5763728973 | 2020-10-03 | 96 | 0 | 96 |
| 4 | 5 | 5763728757 | 2020-10-03 | 108 | 0 | 108 |
| ... | ... | ... | ... | ... | ... | ... |
| 92 | 93 | 5763728927 | 2020-10-03 | 72 | 0 | 72 |
| 93 | 94 | 5763729096 | 2020-10-03 | 48 | 0 | 48 |
| 94 | 95 | 5763729268 | 2020-10-03 | 84 | 0 | 84 |
| 95 | 96 | 5763729237 | 2020-10-03 | 60 | 0 | 60 |
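
Comparing the first and last rows against the CSV file can be partly automated. The sketch below uses only the standard library; `csv_head_tail` and `matches_staging` are hypothetical helpers. The sample text reuses a few rows from the output above, with a header line assumed to match the staging table's column names, and it is not the full file. A real check would open `peak_sales.csv` and compare against the rows returned by the query.

```python
import csv
import io


def csv_head_tail(csv_file, n=5):
    """Return (row_count, first n rows, last n rows) of a CSV with a header.

    Hypothetical helper; csv_file is any open text file object.
    """
    reader = csv.reader(csv_file)
    next(reader)  # skip the header line, as "csv header" does in the load
    rows = list(reader)
    return len(rows), rows[:n], rows[-n:]


def matches_staging(csv_rows, staging_rows):
    """Compare CSV rows with staging rows after dropping stage_id (column 0).

    Staging columns are varchar, so both sides are compared as strings.
    Empty CSV fields load as NULL, so None is treated as an empty string.
    """
    trimmed = [
        ["" if value is None else str(value) for value in row[1:]]
        for row in staging_rows
    ]
    return csv_rows == trimmed


# Sample text built from rows shown in the output above; the header line
# is an assumption based on the staging table's column names.
sample = io.StringIO(
    "sale_id,sale_date,sub_total,tax,total_amount\n"
    "5763728874,2020-10-03,12,0,12\n"
    "5763729036,2020-10-03,72,0,72\n"
    "5763728904,2020-10-03,24,0,24\n"
)
count, head, tail = csv_head_tail(sample, n=2)

# Rows as they might come back from "select * ... order by stage_id".
staging_head = [
    (1, "5763728874", "2020-10-03", "12", "0", "12"),
    (2, "5763729036", "2020-10-03", "72", "0", "72"),
]
print(count, matches_staging(head, staging_head))
```

The row count is returned as well, so it can be compared with the number of rows in the staging table before looking at individual values.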


# 2.3.14 Verify the load of stage_1_peak_stores by querying the entire table

Query the entire table into a Pandas dataframe.  

Remember to sort by stage_id so the rows are in the same order as the CSV file to make comparison easier.

Inspect and compare the first few rows and the last few rows to the CSV file to ensure they loaded correctly.  

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [18]:
# Verify the load of stage_1_peak_stores

rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from stage_1_peak_stores
order by stage_id;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

| | stage_id | sale_id | location_id | name | street | city | state | zip |
|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 5763728874 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 1 | 2 | 5763729036 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 2 | 3 | 5763728904 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 3 | 4 | 5763728973 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 4 | 5 | 5763728757 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 92 | 93 | 5763728927 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 93 | 94 | 5763729096 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 94 | 95 | 5763729268 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |
| 95 | 96 | 5763729237 | 12573 | Acme Gourmet Meals | 3000 Telegraph Ave | Berkeley | CA | 94705 |


# 2.3.15 Verify the load of stage_1_peak_customers by querying the entire table

Query the entire table into a Pandas dataframe.  

Remember to sort by stage_id so the rows are in the same order as the CSV file to make comparison easier.

Inspect and compare the first few rows and the last few rows to the CSV file to ensure they loaded correctly.  

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [19]:
# Verify the load of stage_1_peak_customers

rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from stage_1_peak_customers
order by stage_id;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

| | stage_id | sale_id | customer_id | first_name | last_name | street | city | state | zip |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 5763728874 | 3728404 | Darrelle | Dohrmann | 46 Farwell Terrace | Oakland | CA | 94609 |
| 1 | 2 | 5763729036 | 3729309 | Moria | Greenwood | 8803 Delaware Crossing | Berkeley | CA | 94705 |
| 2 | 3 | 5763728904 | 3728508 | Josiah | Hulett | 6755 Melby Plaza | Oakland | CA | 94612 |
| 3 | 4 | 5763728973 | 3728534 | Gayle | MacGarrity | 286 Onsgard Center | Berkeley | CA | 94703 |
| 4 | 5 | 5763728757 | 3729188 | Courtenay | Shirrell | 75 West Park | Emeryville | CA | 94608 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 92 | 93 | 5763728927 | 3728568 | Valina | Spring | 119 Sachtjen Junction | Berkeley | CA | 94702 |
| 93 | 94 | 5763729096 | 3728990 | Claire | Mebes | 358 Oxford Road | Albany | CA | 94706 |
| 94 | 95 | 5763729268 | 3728901 | Freddy | Mumford | 6 Transport Lane | Orinda | CA | 94563 |
| 95 | 96 | 5763729237 | 3729019 | Arv | Doret | 2120 Mesta Circle | Emeryville | CA | 94608 |


# 2.3.16 Verify the load of stage_1_peak_line_items by querying the entire table

Query the entire table into a Pandas dataframe.  

Remember to sort by stage_id so the rows are in the same order as the CSV file to make comparison easier.

Inspect and compare the first few rows and the last few rows to the CSV file to ensure they loaded correctly.  

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [20]:
# Verify the load of stage_1_peak_line_items

rollback_before_flag = True
rollback_after_flag = True

query = """

select * 
from stage_1_peak_line_items
order by stage_id;

"""

my_select_query_pandas(query, rollback_before_flag, rollback_after_flag)

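| | stage_id | sale_id | line_item_id | product_id | price | quantity | taxable |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 5763728874 | 1 | 42314780 | 12 | 1 | N |
| 1 | 2 | 5763729036 | 1 | 42314677 | 12 | 1 | N |
| 2 | 3 | 5763729036 | 2 | 42314782 | 12 | 3 | N |
| 3 | 4 | 5763729036 | 3 | 42314784 | 12 | 2 | N |
| 4 | 5 | 5763728904 | 1 | 42314780 | 12 | 1 | N |
| ... | ... | ... | ... | ... | ... | ... | ... |
| 347 | 348 | 5763729237 | 2 | 42314678 | 12 | 2 | N |
| 348 | 349 | 5763729237 | 3 | 42314782 | 12 | 2 | N |
| 349 | 350 | 5763728673 | 1 | 42314677 | 12 | 2 | N |
| 350 | 351 | 5763728673 | 2 | 42314678 | 12 | 1 | N |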
