MID-TERM PROJECT, Katherine Anne Rukavina, prh9ss

IMPORTS

In [898]:
import pymongo
import sqlalchemy
import json
import os
import pandas as pd
import certifi
from sqlalchemy import create_engine, text

CONNECTION VARIABLES- mysql and mongoDB

In [899]:
mysql_args = {
    "uid" : "root",
    "pwd" : "bandit3661",
    "hostname" : "localhost",
    "dbname" : "adventureworks"
}

mongodb_args = {
    "user_name" : "",
    "password" : "",
    "cluster_name" : "",
    "cluster_subnet" : "",
    "cluster_location" : "local", #local
    "db_name" : "adventureworks"
}

SETUP FUNCTIONS- establishing database connections

In [900]:
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

#### EXTRACTING
1. first load source jsons into mongoDB
2. extract into pandas dataframes

In [901]:
#1- first load source jsons into mongoDB

client = get_mongo_client(**mongodb_args)
data_dir = os.path.join(os.getcwd(), "data")

json_files = {
    "customers": "dim_customers.json",
    "employees": "dim_employees.json",
    "products": "dim_products.json",
    "vendors": "dim_vendors.json",
    "fact_purchase_orders": "fact_purchase_orders.json",
    "fact_sales_orders": "fact_sales_orders.json",
    "date": "dim_date.json"
}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

In [902]:
#2. extract collection into pandas dataframes for customers, employees, products, vendors, fact_sales_orders, fact_purchases_orders

client = get_mongo_client(**mongodb_args)
query = {} 
collection = "customers"
df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customers.head(2)

Unnamed: 0,CustomerID,AccountNumber,CustomerType,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,IsOnlyStateProvinceFlag,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,AW00000001,S,Main Office,2251 Elliot Avenue,,Seattle,WA,Washington,0,98104,US,United States,North America,Northwest
1,2,AW00000002,S,Shipping,7943 Walnut Ave,,Renton,WA,Washington,0,98055,US,United States,North America,Northwest


In [903]:
client = get_mongo_client(**mongodb_args)
query = {}
collection = "employees"
df_employees = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_employees.head(2)

Unnamed: 0,EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,14417807,adventure-works\guy1,16.0,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15 00:00:00,M,M,1996-07-31 00:00:00,0,21,30,1
1,2,253022876,adventure-works\kevin0,6.0,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03 00:00:00,S,M,1997-02-26 00:00:00,0,42,41,1


In [904]:
client = get_mongo_client(**mongodb_args)
query = {} 
collection = "products"
df_products = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_products.head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,0.0,...,0,,,,,,,1998-06-01 00:00:00,,
1,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,0.0,...,0,,,,,,,1998-06-01 00:00:00,,


In [905]:
client = get_mongo_client(**mongodb_args)
query = {}
collection = "vendors"
df_vendors = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_vendors.head(2)

Unnamed: 0,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,INTERNAT0001,International,1,1,1,Main Office,683 Larch Ct.,,Salt Lake City,UT,Utah,84101
1,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,Main Office,8547 Catherine Way,,Tacoma,WA,Washington,98403


In [906]:
client = get_mongo_client(**mongodb_args)
query = {}
collection = "fact_sales_orders"
df_fact_sales = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_fact_sales.head(2)

Unnamed: 0,SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,...,CreditCardApprovalCode,SubTotal,TaxAmt,Freight,TotalDue,CarrierTrackingNumber,OrderQty,ProductID,UnitPrice,LineTotal
0,43659,1,2001-07-01 00:00:00,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,...,105041Vi84182,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,4,711,20.1865,80.746
1,43659,1,2001-07-01 00:00:00,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,...,105041Vi84182,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,2,712,5.1865,10.373


In [907]:
client = get_mongo_client(**mongodb_args)
query = {}
collection = "fact_purchase_orders"
df_fact_purchases = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_fact_purchases.head(2)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,EmployeeID,VendorID,ProductID,OrderQty,UnitPrice,LineTotal,OrderDate,...,ShipRate,ShipDate,SubTotal,TaxAmt,Freight,TotalDue,DueDate,ReceivedQty,RejectedQty,StockedQty
0,1,0,4,244,83,1,4,50.26,201.04,2001-05-17 00:00:00,...,2.99,2001-05-26 00:00:00,201.04,16.0832,5.026,222.1492,2001-05-31 00:00:00,3.0,0.0,3.0
1,2,0,1,231,32,360,3,45.5805,136.7415,2001-05-17 00:00:00,...,1.49,2001-05-26 00:00:00,272.1015,21.7681,6.8025,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0


#### TRANSFORMING PART 1- cleaning/standardizing data
1.retrieving date dimension for date lookups
2.cleaning up dataframes
3.using date dimension to assign surrogate date keys to fact tables

In [908]:
#retrieving date dimension for date lookups

sql_dim_date = "SELECT date_key, full_date FROM adventureworks.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


dataframe transformations: renaming variables, adding in surrogate keys, fixing time formating, dropping null columns
for the following dataframes- customers, employees, products, vendors

In [909]:
df_customers.rename(columns={"CustomerID":"customer_id"}, inplace=True)
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0] + 1))
df_customers.head(5)

Unnamed: 0,customer_key,customer_id,AccountNumber,CustomerType,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,IsOnlyStateProvinceFlag,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,S,Main Office,2251 Elliot Avenue,,Seattle,WA,Washington,0,98104,US,United States,North America,Northwest
1,2,2,AW00000002,S,Shipping,7943 Walnut Ave,,Renton,WA,Washington,0,98055,US,United States,North America,Northwest
2,3,2,AW00000002,S,Main Office,3207 S Grady Way,,Renton,WA,Washington,0,98055,US,United States,North America,Northwest
3,4,3,AW00000003,S,Main Office,12345 Sterling Avenue,,Irving,TX,Texas,0,75061,US,United States,North America,Southwest
4,5,4,AW00000004,S,Main Office,800 Interchange Blvd.,Suite 2501,Austin,TX,Texas,0,78701,US,United States,North America,Southwest


In [910]:
df_employees.rename(columns={"EmployeeID":"employee_id"}, inplace=True)
df_employees.insert(0, "employee_key", range(1, df_employees.shape[0] + 1))
df_employees.BirthDate = pd.to_datetime(df_employees['BirthDate']).dt.date
df_employees.HireDate = pd.to_datetime(df_employees['HireDate']).dt.date
df_employees.head(5)

Unnamed: 0,employee_key,employee_id,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,1,14417807,adventure-works\guy1,16.0,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15,M,M,1996-07-31,0,21,30,1
1,2,2,253022876,adventure-works\kevin0,6.0,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03,S,M,1997-02-26,0,42,41,1
2,3,3,509647174,adventure-works\roberto0,12.0,Roberto,,Tamburello,Engineering Manager,roberto0@adventure-works.com,0,212-555-0187,1964-12-13,M,M,1997-12-12,1,2,21,1
3,4,4,112457891,adventure-works\rob0,3.0,Rob,,Walters,Senior Tool Designer,rob0@adventure-works.com,0,612-555-0100,1965-01-23,S,M,1998-01-05,0,48,80,1
4,5,5,480168528,adventure-works\thierry0,263.0,Thierry,B,D'Hers,Tool Designer,thierry0@adventure-works.com,2,168-555-0183,1949-08-29,M,M,1998-01-11,0,9,24,1


In [911]:
df_products.rename(columns={"ProductID":"product_id"}, inplace=True)
df_products.insert(0, "product_key", range(1, df_products.shape[0] + 1))
df_products.SellStartDate = pd.to_datetime(df_products['SellStartDate']).dt.date
df_products.drop(['SellEndDate'], axis=1, inplace=True) #all nulls
df_products.drop(['DiscontinuedDate'], axis=1, inplace=True) #all nulls
df_products.head(5)

Unnamed: 0,product_key,product_id,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,...,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate
0,1,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,...,,,0,,,,,,,1998-06-01
1,2,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,...,,,0,,,,,,,1998-06-01
2,3,3,BB Ball Bearing,BE-2349,1,0,,800,600,0.0,...,,,1,,,,,,,1998-06-01
3,4,4,Headset Ball Bearings,BE-2908,0,0,,800,600,0.0,...,,,0,,,,,,,1998-06-01
4,5,316,Blade,BL-2036,1,0,,800,600,0.0,...,,,1,,,,,,,1998-06-01


In [912]:
df_vendors.rename(columns={"VendorID":"vendor_id"}, inplace=True)
df_vendors.insert(0, "vendor_key", range(1, df_vendors.shape[0] + 1))
df_vendors.head(5)

Unnamed: 0,vendor_key,vendor_id,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,1,INTERNAT0001,International,1,1,1,Main Office,683 Larch Ct.,,Salt Lake City,UT,Utah,84101
1,2,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,Main Office,8547 Catherine Way,,Tacoma,WA,Washington,98403
2,3,3,PREMIER0001,"Premier Sport, Inc.",1,1,1,Main Office,7682 Fern Leaf Lane,,Boston,MA,Massachusetts,2113
3,4,4,COMFORT0001,Comfort Road Bicycles,1,1,1,Main Office,7651 Smiling Tree Court,Space 55,Los Angeles,CA,California,90012
4,5,5,METROSP0001,Metro Sport Equipment,1,1,1,Main Office,60 Oakgrove Rd.,,Lebanon,OR,Oregon,97355


Using date dimension to assign surrogate date keys to fact tables- fact_sales and fact_purchases

In [913]:
df_dim_order_date= df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "OrderDate"})
df_fact_sales.OrderDate = df_fact_sales.OrderDate.astype('datetime64[ns]').dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_order_date, on='OrderDate', how='left')
df_fact_sales.drop(['OrderDate'], axis=1, inplace=True)
df_fact_sales.head(5)

Unnamed: 0,SalesOrderID,RevisionNumber,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,...,SubTotal,TaxAmt,Freight,TotalDue,CarrierTrackingNumber,OrderQty,ProductID,UnitPrice,LineTotal,order_date_key
0,43659,1,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,...,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,4,711,20.1865,80.746,20010701
1,43659,1,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,...,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,2,712,5.1865,10.373,20010701
2,43659,1,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,...,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,6,709,5.7,34.2,20010701
3,43659,1,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,...,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,1,716,28.8404,28.8404,20010701
4,43659,1,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,...,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,3,714,28.8404,86.5212,20010701


In [914]:
df_dim_due_date= df_dim_date.rename(columns={"date_key" : "due_date_key", "full_date" : "DueDate"})
df_fact_sales.DueDate = df_fact_sales.DueDate.astype('datetime64[ns]').dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_due_date, on='DueDate', how='left')
df_fact_sales.drop(['DueDate'], axis=1, inplace=True)
df_fact_sales.head(5)

Unnamed: 0,SalesOrderID,RevisionNumber,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,ContactID,...,TaxAmt,Freight,TotalDue,CarrierTrackingNumber,OrderQty,ProductID,UnitPrice,LineTotal,order_date_key,due_date_key
0,43659,1,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,378,...,1971.5149,616.0984,27231.5495,4911-403C-98,4,711,20.1865,80.746,20010701,20010713
1,43659,1,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,378,...,1971.5149,616.0984,27231.5495,4911-403C-98,2,712,5.1865,10.373,20010701,20010713
2,43659,1,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,378,...,1971.5149,616.0984,27231.5495,4911-403C-98,6,709,5.7,34.2,20010701,20010713
3,43659,1,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,378,...,1971.5149,616.0984,27231.5495,4911-403C-98,1,716,28.8404,28.8404,20010701,20010713
4,43659,1,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,378,...,1971.5149,616.0984,27231.5495,4911-403C-98,3,714,28.8404,86.5212,20010701,20010713


In [915]:
df_dim_ship_date= df_dim_date.rename(columns={"date_key" : "ship_date_key", "full_date" : "ShipDate"})
df_fact_sales.ShipDate = df_fact_sales.ShipDate.astype('datetime64[ns]').dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_ship_date, on='ShipDate', how='left')
df_fact_sales.drop(['ShipDate'], axis=1, inplace=True)
df_fact_sales.head(5)

Unnamed: 0,SalesOrderID,RevisionNumber,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,ContactID,SalesPersonID,...,Freight,TotalDue,CarrierTrackingNumber,OrderQty,ProductID,UnitPrice,LineTotal,order_date_key,due_date_key,ship_date_key
0,43659,1,5,0,SO43659,PO522145787,10-4020-000676,676,378,279.0,...,616.0984,27231.5495,4911-403C-98,4,711,20.1865,80.746,20010701,20010713,20010708
1,43659,1,5,0,SO43659,PO522145787,10-4020-000676,676,378,279.0,...,616.0984,27231.5495,4911-403C-98,2,712,5.1865,10.373,20010701,20010713,20010708
2,43659,1,5,0,SO43659,PO522145787,10-4020-000676,676,378,279.0,...,616.0984,27231.5495,4911-403C-98,6,709,5.7,34.2,20010701,20010713,20010708
3,43659,1,5,0,SO43659,PO522145787,10-4020-000676,676,378,279.0,...,616.0984,27231.5495,4911-403C-98,1,716,28.8404,28.8404,20010701,20010713,20010708
4,43659,1,5,0,SO43659,PO522145787,10-4020-000676,676,378,279.0,...,616.0984,27231.5495,4911-403C-98,3,714,28.8404,86.5212,20010701,20010713,20010708


In [916]:
df_dim_order_date= df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "OrderDate"})
df_fact_purchases.OrderDate = df_fact_purchases.OrderDate.astype('datetime64[ns]').dt.date
df_fact_purchases = pd.merge(df_fact_purchases, df_dim_order_date, on='OrderDate', how='left')
df_fact_purchases.drop(['OrderDate'], axis=1, inplace=True)
df_fact_purchases.head(5)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,EmployeeID,VendorID,ProductID,OrderQty,UnitPrice,LineTotal,ShipMethod,...,ShipDate,SubTotal,TaxAmt,Freight,TotalDue,DueDate,ReceivedQty,RejectedQty,StockedQty,order_date_key
0,1,0,4,244,83,1,4,50.26,201.04,OVERSEAS - DELUXE,...,2001-05-26 00:00:00,201.04,16.0832,5.026,222.1492,2001-05-31 00:00:00,3.0,0.0,3.0,20010517
1,2,0,1,231,32,360,3,45.5805,136.7415,CARGO TRANSPORT 5,...,2001-05-26 00:00:00,272.1015,21.7681,6.8025,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0,20010517
2,2,0,1,231,32,359,3,45.12,135.36,CARGO TRANSPORT 5,...,2001-05-26 00:00:00,272.1015,21.7681,6.8025,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0,20010517
3,3,0,4,241,38,530,550,16.086,8847.3,ZY - EXPRESS,...,2001-05-26 00:00:00,8847.3,707.784,221.1825,9776.2665,2001-05-31 00:00:00,550.0,0.0,550.0,20010517
4,4,0,3,266,85,4,3,57.0255,171.0765,CARGO TRANSPORT 5,...,2001-05-26 00:00:00,171.0765,13.6861,4.2769,189.0395,2001-05-31 00:00:00,2.0,1.0,1.0,20010517


In [917]:
df_dim_due_date= df_dim_date.rename(columns={"date_key" : "due_date_key", "full_date" : "DueDate"})
df_fact_purchases.DueDate = df_fact_purchases.DueDate.astype('datetime64[ns]').dt.date
df_fact_purchases = pd.merge(df_fact_purchases, df_dim_due_date, on='DueDate', how='left')
df_fact_purchases.drop(['DueDate'], axis=1, inplace=True)
df_fact_purchases.head(5)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,EmployeeID,VendorID,ProductID,OrderQty,UnitPrice,LineTotal,ShipMethod,...,ShipDate,SubTotal,TaxAmt,Freight,TotalDue,ReceivedQty,RejectedQty,StockedQty,order_date_key,due_date_key
0,1,0,4,244,83,1,4,50.26,201.04,OVERSEAS - DELUXE,...,2001-05-26 00:00:00,201.04,16.0832,5.026,222.1492,3.0,0.0,3.0,20010517,20010531
1,2,0,1,231,32,360,3,45.5805,136.7415,CARGO TRANSPORT 5,...,2001-05-26 00:00:00,272.1015,21.7681,6.8025,300.6721,3.0,0.0,3.0,20010517,20010531
2,2,0,1,231,32,359,3,45.12,135.36,CARGO TRANSPORT 5,...,2001-05-26 00:00:00,272.1015,21.7681,6.8025,300.6721,3.0,0.0,3.0,20010517,20010531
3,3,0,4,241,38,530,550,16.086,8847.3,ZY - EXPRESS,...,2001-05-26 00:00:00,8847.3,707.784,221.1825,9776.2665,550.0,0.0,550.0,20010517,20010531
4,4,0,3,266,85,4,3,57.0255,171.0765,CARGO TRANSPORT 5,...,2001-05-26 00:00:00,171.0765,13.6861,4.2769,189.0395,2.0,1.0,1.0,20010517,20010531


In [918]:
df_dim_ship_date= df_dim_date.rename(columns={"date_key" : "ship_date_key", "full_date" : "ShipDate"})
df_fact_purchases.ShipDate = df_fact_purchases.ShipDate.astype('datetime64[ns]').dt.date
df_fact_purchases = pd.merge(df_fact_purchases, df_dim_ship_date, on='ShipDate', how='left')
df_fact_purchases.drop(['ShipDate'], axis=1, inplace=True)
df_fact_purchases.head(5)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,EmployeeID,VendorID,ProductID,OrderQty,UnitPrice,LineTotal,ShipMethod,...,SubTotal,TaxAmt,Freight,TotalDue,ReceivedQty,RejectedQty,StockedQty,order_date_key,due_date_key,ship_date_key
0,1,0,4,244,83,1,4,50.26,201.04,OVERSEAS - DELUXE,...,201.04,16.0832,5.026,222.1492,3.0,0.0,3.0,20010517,20010531,20010526
1,2,0,1,231,32,360,3,45.5805,136.7415,CARGO TRANSPORT 5,...,272.1015,21.7681,6.8025,300.6721,3.0,0.0,3.0,20010517,20010531,20010526
2,2,0,1,231,32,359,3,45.12,135.36,CARGO TRANSPORT 5,...,272.1015,21.7681,6.8025,300.6721,3.0,0.0,3.0,20010517,20010531,20010526
3,3,0,4,241,38,530,550,16.086,8847.3,ZY - EXPRESS,...,8847.3,707.784,221.1825,9776.2665,550.0,0.0,550.0,20010517,20010531,20010526
4,4,0,3,266,85,4,3,57.0255,171.0765,CARGO TRANSPORT 5,...,171.0765,13.6861,4.2769,189.0395,2.0,1.0,1.0,20010517,20010531,20010526


#### LOADING PART 1- loading back into MySQL
1.loading the transformed dimension/fact tables into MySQL for customers, employees, products, vendors

In [919]:
dataframe = df_customers
table_name = 'dim_customers'
primary_key = 'customer_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [920]:
dataframe = df_employees
table_name = 'dim_employees'
primary_key = 'employee_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [921]:
dataframe = df_products
table_name = 'dim_products'
primary_key = 'product_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [922]:
dataframe = df_vendors
table_name = 'dim_vendors'
primary_key = 'vendor_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### TRANSFORMING PART 2-
4.retrieving surrogate keys from MySQL dim tables for merging
5.renaming variables in fact_sales and fact_purchases for simplicity
6.merging fact tables with dimension keys

retrieving surrogate keys from MySQL dimension tables for merging for customers, employees, products, vendors

In [923]:
sql_dim_customers = "SELECT customer_key, customer_id FROM adventureworks.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_dim_customers, **mysql_args)
df_dim_customers.head(5)

Unnamed: 0,customer_key,customer_id
0,1,1
1,2,2
2,3,2
3,4,3
4,5,4


In [924]:
sql_dim_employees = "SELECT employee_key, employee_id FROM adventureworks.dim_employees;"
df_dim_employees = get_sql_dataframe(sql_dim_employees, **mysql_args)
df_dim_employees.head(5)

Unnamed: 0,employee_key,employee_id
0,1,1
1,2,2
2,3,3
3,4,4
4,5,5


In [925]:
sql_dim_products = "SELECT product_key, product_id FROM adventureworks.dim_products;"
df_dim_products = get_sql_dataframe(sql_dim_products, **mysql_args)
df_dim_products.head(5)

Unnamed: 0,product_key,product_id
0,1,1
1,2,2
2,3,3
3,4,4
4,5,316


In [926]:
sql_dim_vendors = "SELECT vendor_key, vendor_id FROM adventureworks.dim_vendors;"
df_dim_vendors = get_sql_dataframe(sql_dim_vendors, **mysql_args)
df_dim_vendors.head(5)

Unnamed: 0,vendor_key,vendor_id
0,1,1
1,2,2
2,3,3
3,4,4
4,5,5


renaming variables in fact_sales and fact_purchases for simplicity

In [927]:
df_fact_sales.rename(columns={"CustomerID": "customer_id","SalesPersonID": "employee_id", "ProductID": "product_id"}, inplace=True)
df_fact_purchases.rename(columns={"EmployeeID": "employee_id", "ProductID": "product_id", "VendorID": "vendor_id"}, inplace=True)

merging fact tables with dimension keys for fact_sales and fact_purchases

In [928]:
df_fact_sales = pd.merge(df_fact_sales, df_dim_employees, on='employee_id', how='left')
df_fact_sales.drop(['employee_id'], axis=1, inplace=True)

df_fact_sales = pd.merge(df_fact_sales, df_dim_products, on='product_id', how='left')
df_fact_sales.drop(['product_id'], axis=1, inplace=True)

df_fact_sales = pd.merge(df_fact_sales, df_dim_customers, on='customer_id', how='left')
df_fact_sales.drop(['customer_id'], axis=1, inplace=True)

df_fact_sales.head(5)

Unnamed: 0,SalesOrderID,RevisionNumber,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,ContactID,Sales Territory Group,Sales Territory,...,CarrierTrackingNumber,OrderQty,UnitPrice,LineTotal,order_date_key,due_date_key,ship_date_key,employee_key,product_key,customer_key
0,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,North America,Southeast,...,4911-403C-98,4,20.1865,80.746,20010701,20010713,20010708,279.0,216,687.0
1,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,North America,Southeast,...,4911-403C-98,2,5.1865,10.373,20010701,20010713,20010708,279.0,217,687.0
2,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,North America,Southeast,...,4911-403C-98,6,5.7,34.2,20010701,20010713,20010708,279.0,214,687.0
3,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,North America,Southeast,...,4911-403C-98,1,28.8404,28.8404,20010701,20010713,20010708,279.0,221,687.0
4,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,North America,Southeast,...,4911-403C-98,3,28.8404,86.5212,20010701,20010713,20010708,279.0,219,687.0


In [929]:
df_fact_purchases = pd.merge(df_fact_purchases, df_dim_employees, on='employee_id', how='left')
df_fact_purchases.drop(['employee_id'], axis=1, inplace=True)

df_fact_purchases = pd.merge(df_fact_purchases, df_dim_products, on='product_id', how='left')
df_fact_purchases.drop(['product_id'], axis=1, inplace=True)

df_fact_purchases = pd.merge(df_fact_purchases, df_dim_vendors, on='vendor_id', how='left')
df_fact_purchases.drop(['vendor_id'], axis=1, inplace=True)

df_fact_purchases.head(2)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,OrderQty,UnitPrice,LineTotal,ShipMethod,ShipBase,ShipRate,SubTotal,...,TotalDue,ReceivedQty,RejectedQty,StockedQty,order_date_key,due_date_key,ship_date_key,employee_key,product_key,vendor_key
0,1,0,4,4,50.26,201.04,OVERSEAS - DELUXE,29.95,2.99,201.04,...,222.1492,3.0,0.0,3.0,20010517,20010531,20010526,244,1,83
1,2,0,1,3,45.5805,136.7415,CARGO TRANSPORT 5,8.99,1.49,272.1015,...,300.6721,3.0,0.0,3.0,20010517,20010531,20010526,231,39,32


In [930]:
df_fact_sales.insert(0, "fact_sales_order_key", range(1, df_fact_sales.shape[0] + 1))
df_fact_sales.head(2)

Unnamed: 0,fact_sales_order_key,SalesOrderID,RevisionNumber,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,ContactID,Sales Territory Group,...,CarrierTrackingNumber,OrderQty,UnitPrice,LineTotal,order_date_key,due_date_key,ship_date_key,employee_key,product_key,customer_key
0,1,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,North America,...,4911-403C-98,4,20.1865,80.746,20010701,20010713,20010708,279.0,216,687.0
1,2,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,North America,...,4911-403C-98,2,5.1865,10.373,20010701,20010713,20010708,279.0,217,687.0


In [931]:
df_fact_purchases.insert(0, "fact_purchase_orders_key", range(1, df_fact_purchases.shape[0] + 1))
df_fact_purchases.head(5)

Unnamed: 0,fact_purchase_orders_key,PurchaseOrderID,RevisionNumber,Status,OrderQty,UnitPrice,LineTotal,ShipMethod,ShipBase,ShipRate,...,TotalDue,ReceivedQty,RejectedQty,StockedQty,order_date_key,due_date_key,ship_date_key,employee_key,product_key,vendor_key
0,1,1,0,4,4,50.26,201.04,OVERSEAS - DELUXE,29.95,2.99,...,222.1492,3.0,0.0,3.0,20010517,20010531,20010526,244,1,83
1,2,2,0,1,3,45.5805,136.7415,CARGO TRANSPORT 5,8.99,1.49,...,300.6721,3.0,0.0,3.0,20010517,20010531,20010526,231,39,32
2,3,2,0,1,3,45.12,135.36,CARGO TRANSPORT 5,8.99,1.49,...,300.6721,3.0,0.0,3.0,20010517,20010531,20010526,231,38,32
3,4,3,0,4,550,16.086,8847.3,ZY - EXPRESS,9.95,1.99,...,9776.2665,550.0,0.0,550.0,20010517,20010531,20010526,241,203,38
4,5,4,0,3,3,57.0255,171.0765,CARGO TRANSPORT 5,8.99,1.49,...,189.0395,2.0,1.0,1.0,20010517,20010531,20010526,266,4,85


#### LOADING Part 2- loading back into MySQL
1.loading the transformed dimension/fact tables into MySQL for fact_sales, fact_purchases

In [932]:
dataframe = df_fact_sales
table_name = 'fact_sales_orders'
primary_key = 'fact_sales_order_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

In [933]:
dataframe = df_fact_purchases
table_name = 'fact_purchase_orders'
primary_key = 'fact_purchase_orders_key'
db_operation = "insert"

set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### VALIDATING- confirming loading was successful
1.read from MySQL to confirm for customers, employees, vendors, products, fact_sales, fact_purcases

In [934]:
sql_customers = "SELECT * FROM adventureworks.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_customers, **mysql_args)
df_dim_customers.head(2)

Unnamed: 0,customer_key,customer_id,AccountNumber,CustomerType,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,IsOnlyStateProvinceFlag,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,S,Main Office,2251 Elliot Avenue,,Seattle,WA,Washington,0,98104,US,United States,North America,Northwest
1,2,2,AW00000002,S,Shipping,7943 Walnut Ave,,Renton,WA,Washington,0,98055,US,United States,North America,Northwest


In [935]:
sql_employees = "SELECT * FROM adventureworks.dim_employees;"
df_dim_employees = get_sql_dataframe(sql_employees, **mysql_args)
df_dim_employees.head(2)

Unnamed: 0,employee_key,employee_id,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,1,14417807,adventure-works\guy1,16.0,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15,M,M,1996-07-31,0,21,30,1
1,2,2,253022876,adventure-works\kevin0,6.0,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03,S,M,1997-02-26,0,42,41,1


In [936]:
sql_products = "SELECT * FROM adventureworks.dim_products;"
df_dim_products = get_sql_dataframe(sql_products, **mysql_args)
df_dim_products.head(2)

Unnamed: 0,product_key,product_id,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,...,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate
0,1,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,...,,,0,,,,,,,1998-06-01
1,2,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,...,,,0,,,,,,,1998-06-01


In [937]:
sql_vendors = "SELECT * FROM adventureworks.dim_vendors;"
df_dim_vendors = get_sql_dataframe(sql_vendors, **mysql_args)
df_dim_vendors.head(2)

Unnamed: 0,vendor_key,vendor_id,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,1,INTERNAT0001,International,1,1,1,Main Office,683 Larch Ct.,,Salt Lake City,UT,Utah,84101
1,2,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,Main Office,8547 Catherine Way,,Tacoma,WA,Washington,98403


In [938]:
sql_fact_sales_orders = "SELECT * FROM adventureworks.fact_sales_orders;"
df_fact_sales_orders = get_sql_dataframe(sql_fact_sales_orders, **mysql_args)
df_fact_sales_orders.head(2)

Unnamed: 0,fact_sales_order_key,SalesOrderID,RevisionNumber,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,ContactID,Sales Territory Group,...,CarrierTrackingNumber,OrderQty,UnitPrice,LineTotal,order_date_key,due_date_key,ship_date_key,employee_key,product_key,customer_key
0,1,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,North America,...,4911-403C-98,4,20.1865,80.746,20010701,20010713,20010708,279.0,216,687.0
1,2,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,North America,...,4911-403C-98,2,5.1865,10.373,20010701,20010713,20010708,279.0,217,687.0


In [939]:
sql_fact_purchase_orders = "SELECT * FROM adventureworks.fact_purchase_orders;"
df_fact_purchase_orders = get_sql_dataframe(sql_fact_purchase_orders, **mysql_args)
df_fact_purchase_orders.head(2)

Unnamed: 0,fact_purchase_orders_key,PurchaseOrderID,RevisionNumber,Status,OrderQty,UnitPrice,LineTotal,ShipMethod,ShipBase,ShipRate,...,TotalDue,ReceivedQty,RejectedQty,StockedQty,order_date_key,due_date_key,ship_date_key,employee_key,product_key,vendor_key
0,1,1,0,4,4,50.26,201.04,OVERSEAS - DELUXE,29.95,2.99,...,222.1492,3.0,0.0,3.0,20010517,20010531,20010526,244,1,83
1,2,2,0,1,3,45.5805,136.7415,CARGO TRANSPORT 5,8.99,1.49,...,300.6721,3.0,0.0,3.0,20010517,20010531,20010526,231,39,32


#### DEMONSTRATION- show proper functionality

In [940]:
sql_total_product_sold_by_state = """
SELECT 
    customers.State_Province AS state,
    products.Name AS product_name,
    SUM(fact_sales.OrderQty) AS total_qty_sold
FROM adventureworks.fact_sales_orders AS fact_sales
JOIN adventureworks.dim_products AS products
    ON fact_sales.product_key = products.product_key
JOIN adventureworks.dim_customers AS customers
    ON fact_sales.customer_key = customers.customer_key
GROUP BY products.Name, customers.State_Province
ORDER BY total_qty_sold DESC;
"""

In [941]:
df_total_product_sold_by_state = get_sql_dataframe(sql_total_product_sold_by_state, **mysql_args)
df_total_product_sold_by_state

Unnamed: 0,state,product_name,total_qty_sold
0,Florida,"Mountain-100 Silver, 44",30.0
1,Minnesota,"Mountain-100 Silver, 42",28.0
2,Florida,"Mountain-100 Black, 38",26.0
3,Illinois,"Mountain Bike Socks, M",21.0
4,Minnesota,"Mountain-100 Black, 44",20.0
...,...,...,...
568,Alberta,"LL Road Frame - Red, 44",1.0
569,Alberta,"Road-450 Red, 60",1.0
570,Alberta,"Road-650 Red, 52",1.0
571,Alberta,"HL Mountain Frame - Black, 38",1.0


In [944]:
sql_total_product_purchased_by_vendor = """
SELECT 
    vendors.Name AS vendor,
    products.Name AS product_name,
    SUM(fact_purchases.OrderQty) AS total_qty_purchased
FROM adventureworks.fact_purchase_orders AS fact_purchases
JOIN adventureworks.dim_products AS products
    ON fact_purchases.product_key = products.product_key
JOIN adventureworks.dim_vendors AS vendors
    ON fact_purchases.vendor_key = vendors.vendor_key
GROUP BY vendors.Name, products.Name
ORDER BY total_qty_purchased DESC;
"""

In [945]:
df_total_product_purchased_by_vendor = get_sql_dataframe(sql_total_product_purchased_by_vendor, **mysql_args)
df_total_product_purchased_by_vendor

Unnamed: 0,vendor,product_name,total_qty_purchased
0,SUPERSALES INC.,Decal 2,6250.0
1,SUPERSALES INC.,Decal 1,6250.0
2,Allenson Cycles,Seat Post,3850.0
3,American Bikes,HL Road Rim,3850.0
4,Anderson's Custom Bikes,Touring Rim,3850.0
...,...,...,...
401,Ready Rentals,Thin-Jam Lock Nut 11,3.0
402,Ready Rentals,Thin-Jam Lock Nut 12,3.0
403,Ready Rentals,Thin-Jam Lock Nut 8,3.0
404,Ready Rentals,Thin-Jam Lock Nut 7,3.0
