# DS 2002 Midterm Project ETL Processor

By David Bae (mff3eq)

The source is from a Kaggle dataset that contains CSVs on purchase histories of underwear.

## Import Necessary Libraries

In [96]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

## Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working

In [97]:
host_name = "localhost"
port = "3306"
uid = "root"
pwd = "Passw0rd123"

src_dbname = "under_db"
dst_dbname : "under_db2"


# The 'cluster_location' must either be "atlas" or "local".
#Used local instance of mongodb to test/work on code to avoid complications with setting up account and finding connection details
mongodb_args = {
    "user_name" : "",
    "password" : "",
    "cluster_name" : "",
    "cluster_subnet" : "",
    "cluster_location" : 'local',
    "db_name" : "under_mdb"
}

## Define Functions for Getting Data From and Setting Data into Databases

In [98]:
def get_dataframe(uid, pwd, host_name, db_name, sql_query):
    conn_str = f"mysql+pymysql://{uid}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(sql_query, connection);
    connection.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

# Dimension Table Operations

### Extract Data from Source Databases

In [99]:
#extract data from dim_customers table created in mysql workbench
sql_customers = "SELECT * FROM under_db.dim_customers;"
df_customers = get_dataframe(uid, pwd, host_name, src_dbname, sql_customers)
df_customers.head(2)

Unnamed: 0,CustomerID,CustomerName,Region,Country,PriceCategory,CustomerClass,LeadSource,Discontinued
0,1,C1,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,b'\x00'
1,2,C2,Moscow,Russian Federation,1,Large-Scale Wholesaler-1,Referral by the Central Office,b'\x00'


In [100]:
#extract data from dim_products table created in mysql workbench
sql_products = "SELECT * FROM under_db.dim_products;"
df_products = get_dataframe(uid, pwd, host_name, src_dbname, sql_products)
df_products.head(2)

Unnamed: 0,ProductID,ProductName,Color,ModelDescription,FabricDescription,Category,Gender,ProductLine,Weight,Size,PackSize,Status,InventoryDate,PurchasePrice
0,1,3-182,,AT,182,Undershirts,Girls' Undershirts,Underwear,822,3,Dozen,In Production,7/10/2003,6.6
1,2,3-183,,AT,183,Undershirts,Girls' Undershirts,Underwear,620,3,Dozen,Out of Production,7/10/2003,5.6


### Perform Any Necessary Transformations

In [101]:
#Dropped columns based on input values in source database, such as removing columns with None or NULL values
drop_cols = ['PriceCategory', 'LeadSource', 'Discontinued']
df_customers.drop(drop_cols, axis=1, inplace=True)

df_customers.head(2)

Unnamed: 0,CustomerID,CustomerName,Region,Country,CustomerClass
0,1,C1,Moscow,Russian Federation,Large-Scale Wholesaler-1
1,2,C2,Moscow,Russian Federation,Large-Scale Wholesaler-1


In [102]:
#Dropped columns based on input values in source database, such as removing columns with None or NULL values
drop_cols = ['PackSize', 'Status', 'Category', 'Color']
df_products.drop(drop_cols, axis=1, inplace=True)

df_products.head(2)

Unnamed: 0,ProductID,ProductName,ModelDescription,FabricDescription,Gender,ProductLine,Weight,Size,InventoryDate,PurchasePrice
0,1,3-182,AT,182,Girls' Undershirts,Underwear,822,3,7/10/2003,6.6
1,2,3-183,AT,183,Girls' Undershirts,Underwear,620,3,7/10/2003,5.6


# Fact Orders Tables Operations

### Extract Data from Source Databases

In [103]:
#extract data from orders table created in mysql workbench
sql_orders = "SELECT * FROM under_db.orders;"
df_orders = get_dataframe(uid, pwd, host_name, src_dbname, sql_orders)

df_orders.head(2)

Unnamed: 0,OrderID,CustomerID,EmployeeID,ShippingMethodID,OrderDate,ShipDate,FreightCharge
0,2,1,1,1.0,7/10/2003,7/10/2003,0.0
1,4,2,2,1.0,7/11/2003,7/11/2003,0.0


In [104]:
#extract data from order_details table created in mysql workbench
sql_order_deets = "SELECT * FROM under_db.order_details;"
df_order_deets = get_dataframe(uid, pwd, host_name, src_dbname, sql_order_deets)

df_order_deets.head(2)

Unnamed: 0,OrderDetailID,OrderID,ProductID,QuantitySold,UnitSalesPrice
0,2,2,955,5,7.5
1,3,2,958,5,8.5


### Creating Fact Orders Table

Create a fact orders table based on the shared column OrderID by merging the two tables.

In [105]:
df_fact_orders = pd.merge(df_orders, df_order_deets, on = 'OrderID', how = 'right')
df_fact_orders.head(2)

Unnamed: 0,OrderID,CustomerID,EmployeeID,ShippingMethodID,OrderDate,ShipDate,FreightCharge,OrderDetailID,ProductID,QuantitySold,UnitSalesPrice
0,2,1,1,1.0,7/10/2003,7/10/2003,0.0,2,955,5,7.5
1,2,1,1,1.0,7/10/2003,7/10/2003,0.0,3,958,5,8.5


### Modifying Fact Orders Table

In [106]:
# 1. Modify 'df_fact_orders' by merging it with 'df_customers' on the 'CustomerID' column
# 2. Drop the 'CustomerID' column
# 3. Display the first 2 rows of the dataframe to validate your work
df_fact_orders = pd.merge(df_fact_orders, df_customers, on='CustomerID', how='inner')
df_fact_orders.drop(['CustomerID'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,OrderID,EmployeeID,ShippingMethodID,OrderDate,ShipDate,FreightCharge,OrderDetailID,ProductID,QuantitySold,UnitSalesPrice,CustomerName,Region,Country,CustomerClass
0,2,1,1.0,7/10/2003,7/10/2003,0.0,2,955,5,7.5,C1,Moscow,Russian Federation,Large-Scale Wholesaler-1
1,2,1,1.0,7/10/2003,7/10/2003,0.0,3,958,5,8.5,C1,Moscow,Russian Federation,Large-Scale Wholesaler-1


In [107]:
# Repeat for the Product dimension
df_fact_orders = pd.merge(df_fact_orders, df_products, on='ProductID', how='inner')
df_fact_orders.drop(['ProductID'], axis=1, inplace=True)
df_fact_orders.head(2)

Unnamed: 0,OrderID,EmployeeID,ShippingMethodID,OrderDate,ShipDate,FreightCharge,OrderDetailID,QuantitySold,UnitSalesPrice,CustomerName,...,CustomerClass,ProductName,ModelDescription,FabricDescription,Gender,ProductLine,Weight,Size,InventoryDate,PurchasePrice
0,2,1,1.0,7/10/2003,7/10/2003,0.0,2,5,7.5,C1,...,Large-Scale Wholesaler-1,XXL-PCL29,PCL,29,Women's Panties,Underwear,997,XXL,7/10/2003,7.15
1,467,7,1.0,1/21/2004,1/21/2004,0.0,11608,5,7.9,C1,...,Large-Scale Wholesaler-1,XXL-PCL29,PCL,29,Women's Panties,Underwear,997,XXL,7/10/2003,7.15


## Date Dimension Table

### Populate MongoDB with Source Data

In [108]:
#set the connection to Mongo DB
client = get_mongo_client(**mongodb_args)

#set diretory path
data_dir = os.path.join(os.getcwd(), 'Downloads')

#Grab JSON Files to read and store in Mongo DB instance
json_files = {"customers" : 'dim_customers.json',
              "products" : 'dim_products.json',
              "orders" : 'orders.json',
              "order_details" : 'order_details.json'
             }

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)   

In [109]:
#extract data from source MongoDB Collections into Dataframes
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "customers"

df_customers_mongo = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customers_mongo.head(2)

Unnamed: 0,ProductID,ProductName,Color,ModelDescription,FabricDescription,Category,Gender,ProductLine,Weight,Size,PackSize,Status,InventoryDate,PurchasePrice
0,1,3-182,,AT,182,Undershirts,Girls' Undershirts,Underwear,822,3,Dozen,In Production,7/10/2003,6.6
1,2,3-183,,AT,183,Undershirts,Girls' Undershirts,Underwear,620,3,Dozen,Out of Production,7/10/2003,5.6


In [110]:
#extract data from source MongoDB Collections into Dataframes
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "products"

df_products_mongo = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_products_mongo.head(2)

Unnamed: 0,ProductID,ProductName,Color,ModelDescription,FabricDescription,Category,Gender,ProductLine,Weight,Size,PackSize,Status,InventoryDate,PurchasePrice
0,1,3-182,,AT,182,Undershirts,Girls' Undershirts,Underwear,822,3,Dozen,In Production,7/10/2003,6.6
1,2,3-183,,AT,183,Undershirts,Girls' Undershirts,Underwear,620,3,Dozen,Out of Production,7/10/2003,5.6


In [111]:
#extract data from source MongoDB Collections into Dataframes
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "order_details"

df_od_mongo = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_od_mongo.head(2)

Unnamed: 0,OrderDetailID,OrderID,ProductID,QuantitySold,UnitSalesPrice
0,2,2,955,5,7.5
1,3,2,958,5,8.5


In [112]:
#extract data from source MongoDB Collections into Dataframes
client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "orders"

df_orders_mongo = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_orders_mongo.head(2)

Unnamed: 0,OrderID,CustomerID,EmployeeID,ShippingMethodID,OrderDate,ShipDate,FreightCharge
0,2,1,1,1.0,7/10/2003,7/10/2003,0.0
1,4,2,2,1.0,7/11/2003,7/11/2003,0.0


In [113]:
#extract data from dim_date table
sql_dim_date = "SELECT date_key, full_date FROM under_db.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


## Perform Operations on Date Dimension Tables from MongoDB

In [114]:
df_dim_inventory_date = df_dim_date.rename(columns={"date_key" : "inventory_date_key", "full_date" : "InventoryDate"})
df_dim_inventory_date.InventoryDate = df_products.InventoryDate.astype('datetime64[ns]').dt.date
df_products = pd.merge(df_products, df_dim_inventory_date, on='InventoryDate', how='left')
df_products.drop(['InventoryDate'], axis=1, inplace=True)
df_products.head(2)

Unnamed: 0,ProductID,ProductName,ModelDescription,FabricDescription,Gender,ProductLine,Weight,Size,PurchasePrice,inventory_date_key
0,1,3-182,AT,182,Girls' Undershirts,Underwear,822,3,6.6,
1,2,3-183,AT,183,Girls' Undershirts,Underwear,620,3,5.6,


## Insert Transformed Dataframes into New Warehouse

In [115]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_products', df_products, 'product_key')]

## SQL Queries to Get Total and Average Purchase Prices

In [116]:
#SQL Query
sql_fact_products = """

SUM(products.PurchasePrice) AS 'TotalPurchasePrice'
FROM under_db.fact_products AS po
INNER JOIN under_db.dim_products AS s
ON po.ProductID = s.ProductID
GROUP BY s.ProductID

"""

In [117]:
#SQL Query
sql_fact_products = """

AVG(products.PurchasePrice) AS 'AveragePurchasePrice'
FROM under_db.fact_products AS po
INNER JOIN under_db.dim_products AS s
ON po.ProductID = s.ProductID
GROUP BY s.ProductID

"""