<a href="https://colab.research.google.com/github/jennyschilling/DataProject1_DS2002/blob/main/ETL_Processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os
import numpy
import pandas as pd
from sqlalchemy import create_engine

In [None]:
host_name = "localhost"
port = "3306"
user_id = "root"
pwd = "Passw0rd123"

src_dbname = "classicmodels"
dst_dbname = "retail_sales_dw"

In [None]:
def get_dataframe(user_id, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()

    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()

    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")

    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')

    connection.close()

In [None]:
conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

# connection.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
# connection.execute(f"CREATE DATABASE `{dst_dbname}`;")
connection.execute(f"USE {dst_dbname};")

connection.close()

In [None]:
sql_customers = "SELECT * FROM northwind.customers;"
df_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

In [None]:
sql_employees = "SELECT * FROM northwind.employees;"
df_employees = get_dataframe(user_id, pwd, host_name, src_dbname, sql_customers)
df_employees.head(2)

In [None]:
# 1. Create a List that enumerates the names of each column you wish to remove (drop) from the Pandas DataFrame
drop_cols = ['email_address','home_phone','mobile_phone','web_page','notes','attachments']
df_customers.drop(drop_cols, axis=1, inplace=True)

# 2. Rename the "id" column to reflect the entity as it will serve as the business key for lookup operations
df_customers.rename(columns={"id":"customer_id"}, inplace=True)

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))

# 4. Display the first 2 rows of the dataframe to validate your work
df_customers.head(2)

In [None]:
drop_cols = ['mobile_phone','notes','attachments']
df_employees.drop(drop_cols, axis=1, inplace=True)
df_employees.rename(columns={"id":"employee_id"}, inplace=True)
df_employees.insert(0, "employee_key", range(1, df_employees.shape[0]+1))
df_employees.head(2)

In [None]:
drop_cols = ['supplier_ids','description','attachments']
df_products.drop(drop_cols, axis=1, inplace=True)
df_products.rename(columns={"id":"product_id"}, inplace=True)
df_products.insert(0, "product_key", range(1, df_products.shape[0]+1))
df_products.head(2)

In [None]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_employees', df_employees, 'employee_key'),
          ('dim_products', df_products, 'product_key'),
          ('dim_shippers', df_shippers, 'shipper_key')]

In [None]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [None]:
sql_fact_orders = """
    SELECT o.id AS order_id,
        od.id AS order_detail_id,
        o.customer_id,
        o.employee_id,
        od.product_id,
        o.shipper_id,
        o.order_date,
        o.paid_date,
        o.shipped_date,
        o.payment_type,
        od.quantity,
        od.unit_price,
        od.discount,
        o.shipping_fee,
        o.taxes,
        o.tax_rate,
        os.status_name AS order_status,
        ods.status_name AS order_details_status
    FROM northwind.orders AS o
    INNER JOIN northwind.orders_status AS os
    ON o.status_id = os.id
    RIGHT OUTER JOIN northwind.order_details AS od
    ON o.id = od.order_id
    INNER JOIN northwind.order_details_status AS ods
    ON od.status_id = ods.id;
"""

df_fact_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_fact_orders)
df_fact_orders.head(2)

In [None]:
sql_orders = "SELECT * FROM northwind.orders;"
df_orders = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders)
df_orders.rename(columns={"id":"order_id"}, inplace=True)
df_orders.head(2)

In [None]:
sql_orders_status = "SELECT * FROM northwind.orders_status;"
df_orders_status = get_dataframe(user_id, pwd, host_name, src_dbname, sql_orders_status)
df_orders_status.rename(columns={"id":"status_id"}, inplace=True)
df_orders_status.head(2)

In [None]:
df_orders = pd.merge(df_orders, df_orders_status, on='status_id', how='inner')
df_orders.rename(columns={"status_name":"order_status"}, inplace=True)
df_orders.drop(['status_id'], axis=1, inplace=True)
df_orders.head(2)

In [None]:
df_order_details = pd.merge(df_order_details, df_order_details_status, on='status_id', how='inner')
df_order_details.rename(columns={"status_name":"order_details_status"}, inplace=True)
df_order_details.drop(['status_id'], axis=1, inplace=True)
df_order_details.head(2)

In [None]:
df_fact_orders = pd.merge(df_orders, df_order_details, on='order_id', how='right')
df_fact_orders.head(2)

In [None]:
df_fact_orders.shape

In [None]:
sql_dim_customers = "SELECT customer_key, customer_id FROM northwind_dw2.dim_customers;"
df_dim_customers = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_customers)
df_dim_customers.head(2)

In [None]:
sql_dim_employees = "SELECT employee_key, employee_id FROM northwind_dw2.dim_employees;"
df_dim_employees = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_employees)
df_dim_employees.head(2)

In [None]:
df_fact_orders = pd.merge(df_fact_orders, df_dim_customers, on='customer_id', how='inner')
df_fact_orders.drop(['customer_id'], axis=1, inplace=True)
df_fact_orders.head(2)

In [None]:
sql_dim_date = "SELECT date_key, full_date FROM northwind_dw.dim_date;"
df_dim_date = get_dataframe(user_id, pwd, host_name, src_dbname, sql_dim_date)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

In [None]:
# Lookup the Surrogate Primary Key (date_key) that Corresponds to the "order_date" Column.
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "order_date"})
df_fact_orders.order_date = df_fact_orders.order_date.astype('datetime64[ns]').dt.date

df_fact_orders = pd.merge(df_fact_orders, df_dim_order_date, on='order_date', how='left')
df_fact_orders.drop(['order_date'], axis=1, inplace=True)
df_fact_orders.head(2)

In [None]:
# 1. Drop the columns of no particular interest
drop_columns = ['notes', 'tax_status_id', 'purchase_order_id', 'inventory_id', 'date_allocated',
               'ship_name', 'ship_address', 'ship_city', 'ship_state_province', 'ship_zip_postal_code', 'ship_country_region']
df_fact_orders.drop(drop_columns, axis=1, inplace=True)

# 2. Reorder the remaining columns
ordered_columns = ['order_id', 'order_detail_id', 'customer_key', 'employee_key', 'product_key', 'shipper_key',
                  'order_date_key', 'paid_date_key', 'shipped_date_key', 'payment_type', 'quantity', 'unit_price', 'discount',
                  'shipping_fee', 'taxes', 'tax_rate', 'order_status', 'order_details_status']
df_fact_orders = df_fact_orders[ordered_columns]

# 3. Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_fact_orders.insert(0, "fact_order_key", range(1, df_fact_orders.shape[0]+1))
# 4. Display the first 2 rows of the dataframe to validate your work
df_fact_orders.head(2)

In [None]:
table_name = "fact_orders"
primary_key = "fact_order_key"
db_operation = "insert"

set_dataframe(user_id, pwd, host_name, dst_dbname, df_fact_orders, table_name, primary_key, db_operation)