# Project 2, Part 4, Validate data in the staging tables using SQL

University of California, Berkeley

Master of Information and Data Science (MIDS) program

w205 - Fundamentals of Data Engineering

Student: Stephanie Cabanela

Year: 2022

Semester: Spring

Section: 7


# Included Modules and Packages

Code cell containing your includes for modules and packages

In [1]:
import math
import numpy as np
import pandas as pd
import psycopg2
import json
import csv
from datetime import datetime as dt

# Supporting code

Code cells containing any supporting code, such as connecting to the database, any functions, etc.  

Remember you can freely use any code from the labs. You do not need to cite code from the labs.

In [2]:
connection = psycopg2.connect(
    user = "postgres",
    password = "ucb",
    host = "postgres",
    port = "5432",
    database = "postgres"
)

cursor = connection.cursor()

def my_select_query_pandas(query, rollback_before_flag, rollback_after_flag):
    "function to run a select query and return rows in a pandas dataframe"
    
    if rollback_before_flag:
        connection.rollback()
    
    df = pd.read_sql_query(query, connection)
    
    if rollback_after_flag:
        connection.rollback()
    
    # fix the float columns that really should be integers
    
    for column in df:
    
        if df[column].dtype == "float64":

            fraction_flag = False

            for value in df[column].values:
                
                if not np.isnan(value):
                    if value - math.floor(value) != 0:
                        fraction_flag = True

            if not fraction_flag:
                df[column] = df[column].astype('Int64')
    
    return(df)

In [3]:
# drop existing staging tables
connection.rollback()

query = """

drop table if exists stage_1_peak_sales;
drop table if exists stage_1_peak_stores;
drop table if exists stage_1_peak_customers;
drop table if exists stage_1_peak_line_items;

"""

cursor.execute(query)

connection.commit()

In [4]:
# create staging tables
connection.rollback()

query = """


create table stage_1_peak_sales (
  stage_id serial,
  sale_id varchar(100),
  sale_date varchar(100),
  sub_total varchar(100),
  tax varchar(100),
  total_amount varchar(100)
);

create table stage_1_peak_stores (
  stage_id serial,
  sale_id varchar(100),
  location_id varchar(100),
  name varchar(100),
  street varchar(100),
  city varchar(100),
  state varchar(100),
  zip varchar(100)
);

create table stage_1_peak_customers (
  stage_id serial,
  sale_id varchar(100),
  customer_id varchar(100),
  first_name varchar(100),
  last_name varchar(100),
  street varchar(100),
  city varchar(100),
  state varchar(100),
  zip varchar(100)
);

create table stage_1_peak_line_items (
  stage_id serial,
  sale_id varchar(100),
  line_item_id varchar(100),
  product_id varchar(100),
  price varchar(100),
  quantity varchar(100),
  taxable varchar(100)
);

"""

cursor.execute(query)

connection.commit()

# 2.4.1 Validate the data types in the staging table stage_1_peak_sales

Generally, we do not expect any issues with data types.  Write a simple query that validates the numeric and date columns.

* sale_id - validate that it is numeric
* sale_date - validate that it is a date
* sub_total - validate that it is numeric
* tax - validate that it is numeric
* total_amount - validate that it is numeric

Hint: make use of the operators: 
* xxxx::numeric
* xxxx::date

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.
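
A cast-based query stops at the first value that fails to convert, so it confirms that a problem exists but not which staged rows are affected. The sketch below is one way to run a similar check in plain Python and list every offending value. It is only an illustration: `is_numeric`, `is_iso_date`, `find_bad_rows`, and the sample rows are hypothetical, and it assumes dates arrive in ISO `YYYY-MM-DD` form, which the real staging data may not use.

```python
from datetime import date
from decimal import Decimal, InvalidOperation


def is_numeric(text):
    """Return True if text parses as a finite decimal number."""
    try:
        return Decimal(text.strip()).is_finite()
    except (InvalidOperation, AttributeError):
        return False


def is_iso_date(text):
    """Return True if text parses as an ISO date (assumed format)."""
    try:
        date.fromisoformat(text.strip())
        return True
    except (ValueError, AttributeError):
        return False


def find_bad_rows(rows, numeric_columns, date_columns):
    """Collect (stage_id, column, value) for every value that fails its check."""
    bad = []
    for row in rows:
        for column in numeric_columns:
            if not is_numeric(row[column]):
                bad.append((row["stage_id"], column, row[column]))
        for column in date_columns:
            if not is_iso_date(row[column]):
                bad.append((row["stage_id"], column, row[column]))
    return bad


# made-up rows shaped like stage_1_peak_sales; the second row is deliberately broken
sample_rows = [
    {"stage_id": 1, "sale_id": "1001", "sale_date": "2020-10-01",
     "sub_total": "24", "tax": "0", "total_amount": "24"},
    {"stage_id": 2, "sale_id": "1002", "sale_date": "10/32/2020",
     "sub_total": "twelve", "tax": "0", "total_amount": "12"},
]

bad_rows = find_bad_rows(
    sample_rows,
    numeric_columns=["sale_id", "sub_total", "tax", "total_amount"],
    date_columns=["sale_date"],
)
print(bad_rows)
```

An empty list here plays the same role as a cast query that completes without raising an error.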

In [7]:
connection.rollback()

query = """

select sale_id::numeric,
       sale_date::date,
       sub_total::numeric, 
       tax::numeric,
       total_amount::numeric
from stage_1_peak_sales
order by stage_id

"""

cursor.execute(query)

connection.commit()

# 2.4.2 Validate the data types in the staging table stage_1_peak_stores

Generally, we do not expect any issues with data types. Write a simple query that validates the numeric columns.

* sale_id - validate that it is numeric
* location_id - validate that it is numeric

Hint: make use of the operator xxxx::numeric

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [9]:
connection.rollback()

query = """

select sale_id::numeric,
       location_id::numeric
from stage_1_peak_stores
order by stage_id

"""

cursor.execute(query)

connection.commit()

# 2.4.3 Validate the data types in the staging table stage_1_peak_customers

Generally, we do not expect any issues with data types. Write a simple query that validates the numeric columns.

* sale_id - validate that it is numeric
* customer_id - validate that it is numeric

Hint: make use of the operator xxxx::numeric

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [10]:
connection.rollback()

query = """

select sale_id::numeric,
       customer_id::numeric
from stage_1_peak_customers
order by stage_id

"""

cursor.execute(query)

connection.commit()

# 2.4.4 Validate the data types in the staging table stage_1_peak_line_items

Generally, we do not expect any issues with data types. Write a simple query that validates the numeric columns.

* sale_id - validate that it is numeric
* line_item_id - validate that it is numeric
* product_id - validate that it is numeric
* price - validate that it is numeric
* quantity - validate that it is numeric

Hint: make use of the operator xxxx::numeric

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [12]:
connection.rollback()

query = """

select sale_id::numeric,
       line_item_id::numeric,
       product_id::numeric,
       price::numeric,
       quantity::numeric
from stage_1_peak_line_items
order by stage_id

"""

cursor.execute(query)

connection.commit()

# 2.4.5 Validate the math on sub_total, tax, and total_amount in stage_1_peak_sales

Generally, we do not expect any issues with the math.  Write a simple query that validates the math:

total_amount = sub_total + tax

It's usually best to write a query that will return rows with errors.  In our case, the query should return nothing.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.
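
The "return only the rows with errors" idea can be tried without the course database. The sketch below uses Python's built-in `sqlite3` as a stand-in for Postgres, so the casts are written as `cast(... as numeric)` instead of `::numeric`. The table `demo_sales` and its rows are made up for illustration, and the second row is deliberately wrong.

```python
import sqlite3

# in-memory database standing in for Postgres (assumption for this sketch)
demo = sqlite3.connect(":memory:")
demo.execute(
    "create table demo_sales "
    "(stage_id integer, sub_total text, tax text, total_amount text)"
)
demo.executemany(
    "insert into demo_sales values (?, ?, ?, ?)",
    [
        (1, "24", "0", "24"),  # consistent row
        (2, "36", "0", "35"),  # total_amount is off by one
    ],
)

# the where clause keeps only rows where the math does not add up
error_rows = demo.execute(
    """
    select stage_id, sub_total, tax, total_amount
    from demo_sales
    where cast(total_amount as numeric)
          <> cast(sub_total as numeric) + cast(tax as numeric)
    order by stage_id
    """
).fetchall()

print(error_rows)
```

With clean data the list is empty, which is easier to read than scanning a column of true/false values for a single false.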

In [15]:
connection.rollback()

query = """

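select stage_id,
       sub_total,
       tax,
       total_amount
from stage_1_peak_sales
where total_amount::numeric <> sub_total::numeric + tax::numeric
order by stage_id

"""

# display only the rows that fail the check; an empty dataframe means the math is valid
my_select_query_pandas(query, True, True)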

# 2.4.6 Validate the math between stage_1_peak_sales and stage_1_peak_line_items

Generally, we do not expect any issues with the math.  Write a simple query that validates the math:

total_sales in stage_1_peak_sales matches the sum of (price x quantity) in stage_1_peak_line_items

It's usually best to write a query that will return rows with errors.  In our case, the query should return nothing.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.
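
Comparing a sale total with the sum of its line items needs a join, a `group by` on the sale, and a `having` clause that keeps only the mismatches. The sketch below shows that shape using Python's built-in `sqlite3` as a stand-in for Postgres. The tables `demo_sales` and `demo_line_items` and their rows are made up, and the sale total is assumed to live in a `total_amount` column.

```python
import sqlite3

# in-memory database standing in for Postgres (assumption for this sketch)
demo = sqlite3.connect(":memory:")
demo.executescript(
    """
    create table demo_sales (stage_id integer, sale_id text, total_amount text);
    create table demo_line_items
        (stage_id integer, sale_id text, price text, quantity text);

    insert into demo_sales values (1, '1001', '36'), (2, '1002', '24');

    -- sale 1001 adds up to 36; sale 1002 adds up to 12, not 24
    insert into demo_line_items values
        (1, '1001', '12', '2'),
        (2, '1001', '12', '1'),
        (3, '1002', '12', '1');
    """
)

# one output row per sale whose total disagrees with its line items
mismatches = demo.execute(
    """
    select sa.stage_id,
           sa.sale_id,
           sa.total_amount,
           sum(cast(li.price as numeric) * cast(li.quantity as numeric))
               as line_item_total
    from demo_sales as sa
        join demo_line_items as li on li.sale_id = sa.sale_id
    group by sa.stage_id, sa.sale_id, sa.total_amount
    having cast(sa.total_amount as numeric)
           <> sum(cast(li.price as numeric) * cast(li.quantity as numeric))
    order by sa.stage_id
    """
).fetchall()

print(mismatches)
```

An inner join like this one skips sales that have no line items at all; a left join would be needed if those should also be reported.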

In [33]:
connection.rollback()

query = """

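-- total_amount is used as the sale total (the instructions call it total_sales)
select sa.stage_id,
       sa.sale_id,
       sa.total_amount,
       sum(li.price::numeric * li.quantity::numeric) as line_item_total
from stage_1_peak_sales as sa
    join stage_1_peak_line_items as li on li.sale_id = sa.sale_id
group by sa.stage_id, sa.sale_id, sa.total_amount
having sa.total_amount::numeric <> sum(li.price::numeric * li.quantity::numeric)
order by sa.stage_id

"""

# display only the sales whose total disagrees with their line items
my_select_query_pandas(query, True, True)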


# 2.4.7 Validate the tax is always zero in stage_1_peak_sales

It's usually best to write a query that will return rows with errors.  In our case, the query should return nothing.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [18]:
connection.rollback()

query = """

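select stage_id,
       sale_id,
       tax
from stage_1_peak_sales
where tax::numeric <> 0
order by stage_id

"""

# display only the rows with a non-zero tax; an empty dataframe means the check passes
my_select_query_pandas(query, True, True)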

# 2.4.8 Validate the price is always 12 in stage_1_peak_line_items

It's usually best to write a query that will return rows with errors.  In our case, the query should return nothing.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [21]:
connection.rollback()

query = """

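select stage_id,
       sale_id,
       line_item_id,
       price
from stage_1_peak_line_items
where price::numeric <> 12
order by stage_id

"""

# display only the rows whose price is not 12; an empty dataframe means the check passes
my_select_query_pandas(query, True, True)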

# 2.4.9 Validate taxable is always N in stage_1_peak_line_items

It's usually best to write a query that will return rows with errors.  In our case, the query should return nothing.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.

In [25]:
connection.rollback()

query = """

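-- "is distinct from" also reports rows where taxable is null
select stage_id,
       sale_id,
       line_item_id,
       taxable
from stage_1_peak_line_items
where taxable is distinct from 'N'
order by stage_id

"""

# display only the rows where taxable is not N; an empty dataframe means the check passes
my_select_query_pandas(query, True, True)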

# 2.4.10 Validate the store is the same for all in stage_1_peak_stores

It's usually best to write a query that will return rows with errors.  In our case, the query should return nothing.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.
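
One way to check that every row describes the same store, without hard-coding an expected id, is to group on the store columns and count the groups: a single group means a single store. The sketch below uses Python's built-in `sqlite3` as a stand-in for Postgres. The table `demo_stores`, the store names, and the id `99999` are made up for illustration, and only two of the store columns are shown.

```python
import sqlite3

# in-memory database standing in for Postgres (assumption for this sketch)
demo = sqlite3.connect(":memory:")
demo.executescript(
    """
    create table demo_stores
        (stage_id integer, sale_id text, location_id text, name text);

    -- the third row deliberately points at a different store
    insert into demo_stores values
        (1, '1001', '12573', 'Demo Store'),
        (2, '1002', '12573', 'Demo Store'),
        (3, '1003', '99999', 'Other Store');
    """
)

# one output row per distinct store, most frequent first
distinct_stores = demo.execute(
    """
    select location_id, name, count(*) as row_count
    from demo_stores
    group by location_id, name
    order by row_count desc
    """
).fetchall()

print(distinct_stores)
print("all rows share one store:", len(distinct_stores) == 1)
```

Adding the remaining address columns to the `select` and `group by` would extend the same check to street, city, state, and zip.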

In [28]:
connection.rollback()

query = """

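select stage_id,
       sale_id,
       location_id
from stage_1_peak_stores
where location_id::numeric <> 12573
order by stage_id

"""

# display only the rows for a different store; an empty dataframe means the check passes
my_select_query_pandas(query, True, True)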

# 2.4.11 Validate the product id in stage_1_peak_line_items against peak_product_mapping

It's usually best to write a query that will return rows with errors.  In our case, the query should return nothing.

Remember that with staging tables, we need to convert varchar to numeric using column::numeric before math will work.

Sort by stage_id

Pattern your code after the examples in the labs.  You may use as many code cells as you need.
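
Checking staged ids against a mapping table is usually done with an anti-join: left join to the mapping and keep the rows where no match was found. The sketch below shows the pattern using Python's built-in `sqlite3` as a stand-in for Postgres. The layout of `peak_product_mapping` is not shown in this notebook, so the table `demo_product_mapping`, its column names `peak_product_id` and `product_id`, and all rows are hypothetical.

```python
import sqlite3

# in-memory database standing in for Postgres (assumption for this sketch)
demo = sqlite3.connect(":memory:")
demo.executescript(
    """
    create table demo_line_items
        (stage_id integer, sale_id text, product_id text);

    -- hypothetical mapping layout: source system id -> internal product id
    create table demo_product_mapping
        (peak_product_id integer, product_id integer);

    insert into demo_line_items values
        (1, '1001', '42'),
        (2, '1001', '43'),
        (3, '1002', '77');   -- 77 has no mapping entry

    insert into demo_product_mapping values (42, 1), (43, 2);
    """
)

# rows that survive the "is null" filter have no matching mapping entry
unmapped = demo.execute(
    """
    select li.stage_id, li.product_id
    from demo_line_items as li
        left join demo_product_mapping as pm
            on pm.peak_product_id = cast(li.product_id as integer)
    where pm.peak_product_id is null
    order by li.stage_id
    """
).fetchall()

print(unmapped)
```

In Postgres the cast would be written with `::numeric`, and the join column would be whichever column of `peak_product_mapping` holds the source system's product id.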