# DS 2002 Midterm Project: ETL Process with a Sample Database

#### Declare all the necessary imports:

In [149]:
import os
import json
import numpy
import datetime
import pymysql
import pandas as pd

import pymongo
from sqlalchemy import create_engine

#### Declare & Assign Connection Variables for the MySQL Server & Databases with which You'll be Working 

In [12]:
host_name = "localhost"
host_ip = "73.251.125.47"
port = "3306"
user_id = "root"
pwd = "Hookey.93002"

atlas_cluster_name = "Cluster0"
atlas_default_dbname = "sample_airbnb"
atlas_user_name = "matt93002"
atlas_password = "Hookey.93002"

conn_str = {
    "sql" : f"mysql+pymysql://{user_id}:{pwd}@{host_name}",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.m9vzszr.mongodb.net/{atlas_default_dbname}?retryWrites=true&w=majority"
}


src_dbname = "classicmodels"
dst_dbname = "classicmodel_dw"

#### Define Functions for Getting Data From and Setting Data Into Databases

In [3]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def get_mongo_dataframe_local(user_id, pwd, host_name, port, db_name, collection, query):
    '''Create a connection to MongoDB, with or without authentication credentials'''
    if user_id and pwd:
        mongo_uri = 'mongodb://%s:%s@%s:%s/%s' % (username, password, host, port, db_name)
        client = pymongo.MongoClient(mongo_uri)
    else:
        conn_str = f"mongodb://{host_name}:{port}/"
        client = pymongo.MongoClient(conn_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        sqlEngine.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

## Collecting data into dataframe

### MongoDB

#### Populate MongoDB with Source Data

In [6]:
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

data_dir = os.path.join(os.getcwd())

json_files = {"employees" : 'employees.json'}

for file in json_files:
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()   

#### Extract Data from the Source MongoDB Collections Into DataFrames

In [7]:
query = {}
collection = "employees"

df_employees = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query)
df_employees.head(2)

Unnamed: 0,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales


### CSV File

#### Load Data from a Comma-Separated Values (CSV) File

In [52]:

data_file = os.path.join(data_dir, 'customers.csv')

df_customers = pd.read_csv(data_file, header=0, index_col=0)
df_customers.head()

Unnamed: 0_level_0,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
customerNumber,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1
103,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
112,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0
114,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,1611.0,117300.0
119,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France,1370.0,118200.0
121,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway,1504.0,81700.0


## MySQL

### Extract Data from the Source Database Tables

In [114]:
sql_offices = "SELECT * FROM classicmodels.offices;"
df_offices = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_offices)
df_offices.head(2)

Unnamed: 0,officeCode,city,phone,addressLine1,addressLine2,state,country,postalCode,territory
0,1,San Francisco,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
1,2,Boston,+1 215 837 0825,1550 Court Place,Suite 102,MA,USA,2107,


In [70]:
sql_orderdetails = "SELECT * FROM classicmodels.orderdetails;"
df_orderdetails = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_orderdetails)
df_orderdetails.head(2)

Unnamed: 0,orderNumber,productCode,quantityOrdered,priceEach,orderLineNumber
0,10100,S18_1749,30,136.0,3
1,10100,S18_2248,50,55.09,2


In [21]:
sql_orders = "SELECT * FROM classicmodels.orders;"
df_orders = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_orders)
df_orders.head(2)

Unnamed: 0,orderNumber,orderDate,requiredDate,shippedDate,status,comments,customerNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,Check on availability.,128


In [89]:
sql_payments = "SELECT * FROM classicmodels.payments;"
df_payments = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_payments)
df_payments.head(2)

Unnamed: 0,customerNumber,checkNumber,paymentDate,amount
0,103,HQ336336,2004-10-19,6066.78
1,103,JM555205,2003-06-05,14571.44


In [23]:
sql_productlines = "SELECT * FROM classicmodels.productlines;"
df_productlines = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_productlines)
df_productlines.head(2)

Unnamed: 0,productLine,textDescription,htmlDescription,image
0,Classic Cars,Attention car enthusiasts: Make your wildest c...,,
1,Motorcycles,Our motorcycles are state of the art replicas ...,,


In [98]:
sql_products = "SELECT * FROM classicmodels.products;"
df_products = get_sql_dataframe(user_id, pwd, host_name, src_dbname, sql_products)
df_products.head(2)

Unnamed: 0,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3


We know have a dataframe for each of the tables within the database (customers, employees, offices, orderdetails, orders, payments, productlines, and products)

## Transformations

In [106]:
df_customers.insert(0, 'customers_key', range(0, 0 + len(df_customers)))
df_customers.head()

Unnamed: 0_level_0,customers_key,customerName,contactLastName,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
customerNumber,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1
103,0,Atelier graphique,Schmitt,Carine,40.32.2555,"54, rue Royale",,Nantes,,44000,France,1370.0,21000.0
112,1,Signal Gift Stores,King,Jean,7025551838,8489 Strong St.,,Las Vegas,NV,83030,USA,1166.0,71800.0
114,2,"Australian Collectors, Co.",Ferguson,Peter,03 9520 4555,636 St Kilda Road,Level 3,Melbourne,Victoria,3004,Australia,1611.0,117300.0
119,3,La Rochelle Gifts,Labrune,Janine,40.67.8555,"67, rue des Cinquante Otages",,Nantes,,44000,France,1370.0,118200.0
121,4,Baane Mini Imports,Bergulfsen,Jonas,07-98 9555,Erling Skakkes gate 78,,Stavern,,4110,Norway,1504.0,81700.0


In [112]:
df_employees.rename(columns={"employeeNumber":"employees_key"}, inplace= True)
df_employees.head()

Unnamed: 0,employees_key,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
0,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,President
1,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales
2,1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002.0,VP Marketing
3,1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056.0,Sales Manager (APAC)
4,1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056.0,Sale Manager (EMEA)


In [115]:
df_offices.insert(0, 'offices_key', range(0, 0 + len(df_offices)))
df_offices.head()

Unnamed: 0,offices_key,officeCode,city,phone,addressLine1,addressLine2,state,country,postalCode,territory
0,0,1,San Francisco,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
1,1,2,Boston,+1 215 837 0825,1550 Court Place,Suite 102,MA,USA,02107,
2,2,3,NYC,+1 212 555 3000,523 East 53rd Street,apt. 5A,NY,USA,10022,
3,3,4,Paris,+33 14 723 4404,43 Rue Jouffroy D'abbans,,,France,75017,EMEA
4,4,5,Tokyo,+81 33 224 5000,4-1 Kioicho,,Chiyoda-Ku,Japan,102-8578,Japan


In [74]:
df_orderdetails.insert(0, 'orderdetails_key', range(0, 0 + len(df_orderdetails)))
df_orderdetails.head()

Unnamed: 0,orderdetails_key,orderNumber,productCode,quantityOrdered,priceEach,orderLineNumber
0,0,10100,S18_1749,30,136.0,3
1,1,10100,S18_2248,50,55.09,2
2,2,10100,S18_4409,22,75.46,4
3,3,10100,S24_3969,49,35.29,1
4,4,10101,S18_2325,25,108.06,4


In [77]:
df_orders.drop("comments", axis=1, inplace=True)
df_orders.rename(columns={"orderNumber":"orders_key"}, inplace= True)
df_orders.head()

Unnamed: 0,orders_key,orderDate,requiredDate,shippedDate,status,customerNumber
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363
1,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,128
2,10102,2003-01-10,2003-01-18,2003-01-14,Shipped,181
3,10103,2003-01-29,2003-02-07,2003-02-02,Shipped,121
4,10104,2003-01-31,2003-02-09,2003-02-01,Shipped,141


In [90]:
df_payments.insert(0, 'payments_key', range(0, 0 + len(df_payments)))
df_payments.head()

Unnamed: 0,payments_key,customerNumber,checkNumber,paymentDate,amount
0,0,103,HQ336336,2004-10-19,6066.78
1,1,103,JM555205,2003-06-05,14571.44
2,2,103,OM314933,2004-12-18,1676.14
3,3,112,BO864823,2004-12-17,14191.12
4,4,112,HQ55022,2003-06-06,32641.98


In [96]:
df_productlines.drop({"htmlDescription","image","textDescription"},  axis=1, inplace=True)
df_productlines.insert(0, 'productlines_key', range(0, 0 + len(df_productlines)))
df_productlines.head()

Unnamed: 0,productlines_key,productLine
0,0,Classic Cars
1,1,Motorcycles
2,2,Planes
3,3,Ships
4,4,Trains


In [99]:
df_products.drop({"productScale","productDescription"},  axis=1, inplace=True)
df_products.insert(0, 'products_key', range(0, 0 + len(df_products)))
df_products.head()

Unnamed: 0,products_key,productCode,productName,productLine,productVendor,quantityInStock,buyPrice,MSRP
0,0,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,Min Lin Diecast,7933,48.81,95.7
1,1,S10_1949,1952 Alpine Renault 1300,Classic Cars,Classic Metal Creations,7305,98.58,214.3
2,2,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,Highway 66 Mini Classics,6625,68.99,118.94
3,3,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,Red Start Diecast,5582,91.02,193.66
4,4,S10_4757,1972 Alfa Romeo GTA,Classic Cars,Motor City Art Classics,3252,85.68,136.0


## Load

In [119]:
sqlEngine = create_engine(conn_str["sql"], pool_recycle=3600)

sqlEngine.execute(f"DROP DATABASE IF EXISTS `{dst_dbname}`;")
sqlEngine.execute(f"CREATE DATABASE `{dst_dbname}`;")
sqlEngine.execute(f"USE {dst_dbname};")
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customers_key'),
          ('dim_employees', df_employees, 'employees_key'),
          ('dim_offices', df_offices, 'offices_key'),
          ('dim_orderdetails', df_orderdetails, 'orderdetails_key'),
          ('dim_orders', df_orders, 'orders_key'),
          ('dim_payments', df_payments, 'payments_key'),
          ('dim_productlines', df_productlines, 'productlines_key'),
          ('dim_products', df_products, 'products_key')
          ]

In [120]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(user_id, pwd, host_name, dst_dbname, dataframe, table_name, primary_key, db_operation)

## Fact Table

Merge orders and order details

In [124]:
ordersanddetails = df_orders.merge(df_orderdetails, how="inner", left_on="orders_key", right_on="orderNumber")
ordersanddetails.drop(columns={"orderNumber", "orderLineNumber"}, inplace= True)
ordersanddetails.head()

Unnamed: 0,orders_key,orderDate,requiredDate,shippedDate,status,customerNumber,orderdetails_key,productCode,quantityOrdered,priceEach
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,0,S18_1749,30,136.0
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,1,S18_2248,50,55.09
2,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,2,S18_4409,22,75.46
3,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,3,S24_3969,49,35.29
4,10101,2003-01-09,2003-01-18,2003-01-11,Shipped,128,4,S18_2325,25,108.06


In [127]:
oodandproducts = ordersanddetails.merge(df_products, how="inner", left_on="productCode", right_on="productCode")
oodandproducts.drop(columns={"productCode","priceEach"}, inplace = True)
oodandproducts.head()

Unnamed: 0,orders_key,orderDate,requiredDate,shippedDate,status,customerNumber,orderdetails_key,quantityOrdered,products_key,productName,productLine,productVendor,quantityInStock,buyPrice,MSRP
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,0,30,22,1917 Grand Touring Sedan,Vintage Cars,Welly Diecast Productions,2724,86.7,170.0
1,10110,2003-03-18,2003-03-24,2003-03-20,Shipped,187,103,42,22,1917 Grand Touring Sedan,Vintage Cars,Welly Diecast Productions,2724,86.7,170.0
2,10124,2003-05-21,2003-05-29,2003-05-25,Shipped,112,214,21,22,1917 Grand Touring Sedan,Vintage Cars,Welly Diecast Productions,2724,86.7,170.0
3,10138,2003-07-07,2003-07-16,2003-07-13,Shipped,496,324,33,22,1917 Grand Touring Sedan,Vintage Cars,Welly Diecast Productions,2724,86.7,170.0
4,10149,2003-09-12,2003-09-18,2003-09-17,Shipped,487,443,34,22,1917 Grand Touring Sedan,Vintage Cars,Welly Diecast Productions,2724,86.7,170.0


In [136]:
fact_table = oodandproducts.merge(df_customers, how="inner", left_on="customerNumber", right_on="customerNumber")
fact_table.head()

Unnamed: 0,orders_key,orderDate,requiredDate,shippedDate,status,customerNumber,orderdetails_key,quantityOrdered,products_key,productName,...,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,0,30,22,1917 Grand Touring Sedan,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0
1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,1,50,26,1911 Ford Town Car,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0
2,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,2,22,49,1932 Alfa Romeo 8C2300 Spider Sport,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0
3,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,3,49,79,1936 Mercedes Benz 500k Roadster,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0
4,10322,2004-11-04,2004-11-12,2004-11-10,Shipped,363,2094,50,28,1932 Model A Ford J-Coupe,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0


In [132]:
fact_table.insert(0, 'ID', range(0, 0 + len(fact_table)))
fact_table.head()

Unnamed: 0,ID,orders_key,orderDate,requiredDate,shippedDate,status,customerNumber,orderdetails_key,quantityOrdered,products_key,...,contactFirstName,phone,addressLine1,addressLine2,city,state,postalCode,country,salesRepEmployeeNumber,creditLimit
0,0,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,0,30,22,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0
1,1,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,1,50,26,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0
2,2,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,2,22,49,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0
3,3,10100,2003-01-06,2003-01-13,2003-01-10,Shipped,363,3,49,79,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0
4,4,10322,2004-11-04,2004-11-12,2004-11-10,Shipped,363,2094,50,28,...,Dorothy,6035558647,2304 Long Airport Avenue,,Nashua,NH,62005,USA,1216.0,114200.0


In [133]:
set_dataframe(user_id, pwd, host_name, dst_dbname, fact_table, "fact_table", "ID", "insert")

## Query the fact_table

In [144]:
sqlEngine = create_engine(conn_str["sql"], pool_recycle=3600)
sqlEngine.execute(f"use classicmodel_dw;")
sqlEngine.execute(f"SELECT COUNT(*) as NumberOfSales, officeCity FROM (SELECT employees_key, lastName, firstName, city as officeCity FROM dim_employees as e INNER JOIN dim_offices as o ON e.officeCode = o.officeCode) as oe INNER JOIN fact_table as ft ON oe.employees_key = ft.salesRepEmployeeNumber GROUP BY officeCity;  ")

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x1201bf100>

In [150]:

sql_query = f"SELECT COUNT(*) as NumberOfSales, officeCity FROM (SELECT employees_key, lastName, firstName, city as officeCity FROM dim_employees as e INNER JOIN dim_offices as o ON e.officeCode = o.officeCode) as oe INNER JOIN fact_table as ft ON oe.employees_key = ft.salesRepEmployeeNumber GROUP BY officeCity;  "

def get_pymysql_dataframe(host, user, password, database_name, sql_query_string):
    connection = pymysql.connect(host=host, user=user, password=password, database=database_name)
    dframe = pd.read_sql(sql_query_string, connection)
    connection.close()
    
    return dframe

df_query = get_pymysql_dataframe(host_name, user_id, pwd, "classicmodel_dw", sql_query)
df_query.head()

  dframe = pd.read_sql(sql_query_string, connection)


Unnamed: 0,NumberOfSales,officeCity
0,276,Boston
1,456,London
2,445,San Francisco
3,370,Sydney
4,959,Paris
