## Set Up

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd
import pymysql

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text
from pymongo import MongoClient

In [2]:
# Connection settings for both databases. Every value can be overridden with an
# environment variable so credentials do not have to be edited into the
# notebook; the literals are fallbacks that keep the original behaviour when
# no variable is set.
# NOTE(review): the fallback passwords are still stored here in plain text --
# remove them (and rotate the secrets) once the environment is configured.
mysql_args = {
    "uid" : os.environ.get("ADVENTUREWORKS_MYSQL_UID", "root"),
    "pwd" : os.environ.get("ADVENTUREWORKS_MYSQL_PWD", "tpwk"),
    "hostname" : os.environ.get("ADVENTUREWORKS_MYSQL_HOST", "localhost"),
    "dbname" : os.environ.get("ADVENTUREWORKS_MYSQL_DB", "adventureworks")
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : os.environ.get("ADVENTUREWORKS_MONGO_USER", "lcliau18_db_user"),
    "password" : os.environ.get("ADVENTUREWORKS_MONGO_PWD", "tpwk"),
    "cluster_name" : os.environ.get("ADVENTUREWORKS_MONGO_CLUSTER", "cluster0"),
    "cluster_subnet" : os.environ.get("ADVENTUREWORKS_MONGO_SUBNET", "ep8zqil"),
    "cluster_location" : os.environ.get("ADVENTUREWORKS_MONGO_LOCATION", "atlas"), # "local"
    "db_name" : os.environ.get("ADVENTUREWORKS_MONGO_DB", "adventureworks")
}

In [3]:
def get_sql_dataframe(sql_query, **args):
    '''Run a SQL query against MySQL and return the result as a DataFrame.

    Parameters
    ----------
    sql_query : str
        SQL text; it is wrapped in sqlalchemy.text() before execution.
    **args
        Connection settings. The keys 'uid', 'pwd', 'hostname' and 'dbname'
        are read to build the mysql+pymysql connection URL.

    Returns
    -------
    pandas.DataFrame
        The result of pd.read_sql() for the query.

    Raises
    ------
    KeyError
        If one of the four connection keys is missing from args.
    '''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when the query raises.
        with sqlEngine.connect() as connection:
            return pd.read_sql(text(sql_query), connection)
    finally:
        # A new engine is created on every call, so release its pool here
        # instead of leaving it to garbage collection.
        sqlEngine.dispose()
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a DataFrame to a MySQL table.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to write; the DataFrame index is not written.
    table_name : str
        Target table name.
    pk_column : str
        Column declared as PRIMARY KEY after an "insert"; unused for "update".
    db_operation : str
        "insert" replaces the table with df and then adds the primary key;
        "update" appends df to the existing table.
    **args
        Connection settings. The keys 'uid', 'pwd', 'hostname' and 'dbname'
        are read to build the mysql+pymysql connection URL.

    Raises
    ------
    ValueError
        If db_operation is neither "insert" nor "update". Previously such a
        call connected, did nothing, and returned silently.
    KeyError
        If one of the four connection keys is missing from args.
    '''
    # Validate before connecting so a typo fails loudly and opens nothing.
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"db_operation must be 'insert' or 'update', got {db_operation!r}"
        )

    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)

    try:
        # The context manager closes the connection even when a write fails.
        with sqlEngine.connect() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        # A new engine is created on every call, so release its pool here.
        sqlEngine.dispose()


def get_mongo_client(**args):
    '''Build a pymongo.MongoClient for either an Atlas cluster or localhost.

    Parameters
    ----------
    **args
        'cluster_location' is always read and must be "atlas" or "local".
        For "atlas" the keys 'user_name', 'password', 'cluster_name' and
        'cluster_subnet' are also read to build the mongodb+srv URI.

    Returns
    -------
    pymongo.MongoClient

    Raises
    ------
    ValueError
        If cluster_location is not "atlas" or "local". ValueError is a
        subclass of Exception, so callers catching Exception still work.
    KeyError
        If a required key is missing from args.

    Notes
    -----
    The user name and password are percent-escaped before they are placed in
    the URI, so pass them in plain (not already escaped) form.
    '''
    # Local import keeps this helper self-contained.
    from urllib.parse import quote_plus

    location = args["cluster_location"]
    if location not in ('atlas', 'local'):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # Characters such as '@', ':' or '/' in credentials would otherwise be
    # parsed as URI delimiters.
    user = quote_plus(str(args['user_name']))
    password = quote_plus(str(args['password']))
    connect_str = f"mongodb+srv://{user}:{password}@"
    connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query a MongoDB collection and return the documents as a DataFrame.

    Parameters
    ----------
    mongo_client : pymongo.MongoClient
        Open client. It is closed before this function returns, whether or
        not the query succeeds, so a new client is needed for the next call.
    db_name : str
        Database name.
    collection : str
        Collection name.
    query : dict
        Filter passed to find().

    Returns
    -------
    pandas.DataFrame
        One row per matching document, without the '_id' column. The frame
        is empty when nothing matches.
    '''
    try:
        db = mongo_client[db_name]
        dframe = pd.DataFrame(list(db[collection].find(query)))
    finally:
        mongo_client.close()

    # errors='ignore': an empty result has no '_id' column, and a plain drop
    # would raise KeyError.
    return dframe.drop(columns=['_id'], errors='ignore')


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Replace MongoDB collections with the contents of JSON files.

    Parameters
    ----------
    mongo_client : pymongo.MongoClient
        Open client. It is closed before this function returns, whether or
        not the load succeeds.
    db_name : str
        Database that receives the collections.
    data_directory : str
        Directory containing the JSON files.
    json_files : dict
        Maps collection name to a JSON file name inside data_directory.

    Notes
    -----
    Each file is parsed before its collection is dropped, so a missing or
    malformed file leaves the existing collection untouched. A file that
    holds an empty list produces an empty (dropped) collection.
    '''
    db = mongo_client[db_name]

    try:
        for collection_name, file_name in json_files.items():
            json_path = os.path.join(data_directory, file_name)
            # Explicit UTF-8 so non-ASCII text does not depend on the
            # platform's default encoding.
            with open(json_path, 'r', encoding='utf-8') as openfile:
                documents = json.load(openfile)

            db.drop_collection(collection_name)
            # insert_many() rejects an empty document list.
            if documents:
                db[collection_name].insert_many(documents)
    finally:
        mongo_client.close()

## Populate MongoDB with Source Data

In [4]:
# New client for the load; set_mongo_collections() closes it when it finishes.
client = get_mongo_client(**mongodb_args)

# The source JSON files are expected in a 'data' folder under the notebook's
# current working directory.
data_dir = os.path.join(os.getcwd(), 'data')

# Target collection name -> JSON file that seeds it.
json_files = {
    "customers": 'customers.json',
    "employees": 'employees.json',
    "vendors": 'vendors.json',
    "products": 'products.json',
    "fact_purchase_orders": 'fact_purchase_orders.json',
    "fact_sales_orders": 'fact_sales_orders.json',
}

set_mongo_collections(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    data_directory=data_dir,
    json_files=json_files,
)

## Creating and Populating the New Dimension Tables

In [52]:
# An empty filter selects every document in the "customers" collection.
query = {}
collection = "customers"
# get_mongo_dataframe() closes the client it receives, so open a new one here.
client = get_mongo_client(**mongodb_args)
df_dim_customers = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    collection=collection,
    query=query,
)
df_dim_customers.head(n=2)

InvalidOperation: Cannot use MongoClient after close

In [6]:
# An empty filter selects every document in the "employees" collection.
query = {}
collection = "employees"
# get_mongo_dataframe() closes the client it receives, so open a new one here.
client = get_mongo_client(**mongodb_args)
df_dim_employees = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    collection=collection,
    query=query,
)
df_dim_employees.head(n=2)

Unnamed: 0,EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,14417807,adventure-works\guy1,16.0,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15 00:00:00,M,M,1996-07-31 00:00:00,0,21,30,1
1,2,253022876,adventure-works\kevin0,6.0,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03 00:00:00,S,M,1997-02-26 00:00:00,0,42,41,1


In [7]:
# An empty filter selects every document in the "vendors" collection.
query = {}
collection = "vendors"
# get_mongo_dataframe() closes the client it receives, so open a new one here.
client = get_mongo_client(**mongodb_args)
df_dim_vendors = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    collection=collection,
    query=query,
)
df_dim_vendors.head(n=2)

Unnamed: 0,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,INTERNAT0001,International,1,1,1,Main Office,683 Larch Ct.,,Salt Lake City,UT,Utah,84101
1,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,Main Office,8547 Catherine Way,,Tacoma,WA,Washington,98403


In [8]:
# An empty filter selects every document in the "products" collection.
query = {}
collection = "products"
# get_mongo_dataframe() closes the client it receives, so open a new one here.
client = get_mongo_client(**mongodb_args)
df_dim_products = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    collection=collection,
    query=query,
)
df_dim_products.head(n=2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,0.0,...,0,,,,,,,1998-06-01 00:00:00,,
1,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,0.0,...,0,,,,,,,1998-06-01 00:00:00,,


In [9]:
# Load the date dimension's key/date pairs from MySQL.
sql_dim_date = "SELECT date_key, full_date FROM adventureworks.dim_date;"
df_dim_date = get_sql_dataframe(sql_query=sql_dim_date, **mysql_args)
# Reduce full_date to plain datetime.date values.
df_dim_date["full_date"] = df_dim_date["full_date"].astype('datetime64[ns]').dt.date
df_dim_date.head(n=2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


## Transforming the Data Frames

In [10]:
# rename() returns a new frame and the insert is guarded, so re-running this
# cell no longer raises "cannot insert customer_key, already exists".
df_dim_customers = df_dim_customers.rename(columns={"id": "CustomerID"})
if "customer_key" not in df_dim_customers.columns:
    # Surrogate key: 1..N in the current row order, placed as the first column.
    df_dim_customers.insert(0, "customer_key", range(1, len(df_dim_customers) + 1))
df_dim_customers.head(2)

Unnamed: 0,customer_key,CustomerID,AccountNumber,CustomerType,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,IsOnlyStateProvinceFlag,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,S,Main Office,2251 Elliot Avenue,,Seattle,WA,Washington,0,98104,US,United States,North America,Northwest
1,2,2,AW00000002,S,Shipping,7943 Walnut Ave,,Renton,WA,Washington,0,98055,US,United States,North America,Northwest


In [11]:
# rename() returns a new frame and the insert is guarded, so re-running this
# cell no longer raises "cannot insert employee_key, already exists".
df_dim_employees = df_dim_employees.rename(columns={"id": "EmployeeID"})
if "employee_key" not in df_dim_employees.columns:
    # Surrogate key: 1..N in the current row order, placed as the first column.
    df_dim_employees.insert(0, "employee_key", range(1, len(df_dim_employees) + 1))
df_dim_employees.head(2)

Unnamed: 0,employee_key,EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,1,14417807,adventure-works\guy1,16.0,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15 00:00:00,M,M,1996-07-31 00:00:00,0,21,30,1
1,2,2,253022876,adventure-works\kevin0,6.0,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03 00:00:00,S,M,1997-02-26 00:00:00,0,42,41,1


In [12]:
# rename() returns a new frame and the insert is guarded, so re-running this
# cell no longer raises "cannot insert vendor_key, already exists".
df_dim_vendors = df_dim_vendors.rename(columns={"id": "VendorID"})
if "vendor_key" not in df_dim_vendors.columns:
    # Surrogate key: 1..N in the current row order, placed as the first column.
    df_dim_vendors.insert(0, "vendor_key", range(1, len(df_dim_vendors) + 1))
df_dim_vendors.head(2)

Unnamed: 0,vendor_key,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,1,INTERNAT0001,International,1,1,1,Main Office,683 Larch Ct.,,Salt Lake City,UT,Utah,84101
1,2,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,Main Office,8547 Catherine Way,,Tacoma,WA,Washington,98403


In [13]:
# rename() returns a new frame and the insert is guarded, so re-running this
# cell no longer raises "cannot insert product_key, already exists".
df_dim_products = df_dim_products.rename(columns={"id": "ProductID"})
if "product_key" not in df_dim_products.columns:
    # Surrogate key: 1..N in the current row order, placed as the first column.
    df_dim_products.insert(0, "product_key", range(1, len(df_dim_products) + 1))
df_dim_products.head(2)

Unnamed: 0,product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01 00:00:00,,
1,2,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01 00:00:00,,


## Loading the Transformed DataFrames into the New Data Warehouse; Validating Creation

In [14]:
# "insert" makes set_dataframe() replace dim_customers and then declare
# customer_key as its primary key.
dataframe = df_dim_customers
table_name, primary_key = 'dim_customers', 'customer_key'
db_operation = "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

In [15]:
# "insert" makes set_dataframe() replace dim_employees and then declare
# employee_key as its primary key.
dataframe = df_dim_employees
table_name, primary_key = 'dim_employees', 'employee_key'
db_operation = "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

In [16]:
# "insert" makes set_dataframe() replace dim_vendors and then declare
# vendor_key as its primary key.
dataframe = df_dim_vendors
table_name, primary_key = 'dim_vendors', 'vendor_key'
db_operation = "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

In [17]:
# "insert" makes set_dataframe() replace dim_products and then declare
# product_key as its primary key.
dataframe = df_dim_products
table_name, primary_key = 'dim_products', 'product_key'
db_operation = "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

In [18]:
# Read dim_customers back from MySQL to check what was written.
sql_dim_customers = "SELECT * FROM adventureworks.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_query=sql_dim_customers, **mysql_args)
df_dim_customers.head(n=2)

Unnamed: 0,customer_key,CustomerID,AccountNumber,CustomerType,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,IsOnlyStateProvinceFlag,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,1,AW00000001,S,Main Office,2251 Elliot Avenue,,Seattle,WA,Washington,0,98104,US,United States,North America,Northwest
1,2,2,AW00000002,S,Shipping,7943 Walnut Ave,,Renton,WA,Washington,0,98055,US,United States,North America,Northwest


In [19]:
# Read dim_employees back from MySQL to check what was written.
sql_dim_employees = "SELECT * FROM adventureworks.dim_employees;"
df_dim_employees = get_sql_dataframe(sql_query=sql_dim_employees, **mysql_args)
df_dim_employees.head(n=2)

Unnamed: 0,employee_key,EmployeeID,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailAddress,EmailPromotion,Phone,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,1,14417807,adventure-works\guy1,16.0,Guy,R,Gilbert,Production Technician - WC60,guy1@adventure-works.com,0,320-555-0195,1972-05-15 00:00:00,M,M,1996-07-31 00:00:00,0,21,30,1
1,2,2,253022876,adventure-works\kevin0,6.0,Kevin,F,Brown,Marketing Assistant,kevin0@adventure-works.com,2,150-555-0189,1977-06-03 00:00:00,S,M,1997-02-26 00:00:00,0,42,41,1


In [20]:
# Read dim_vendors back from MySQL to check what was written.
sql_dim_vendors = "SELECT * FROM adventureworks.dim_vendors;"
df_dim_vendors = get_sql_dataframe(sql_query=sql_dim_vendors, **mysql_args)
df_dim_vendors.head(n=2)

Unnamed: 0,vendor_key,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,PostalCode
0,1,1,INTERNAT0001,International,1,1,1,Main Office,683 Larch Ct.,,Salt Lake City,UT,Utah,84101
1,2,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,Main Office,8547 Catherine Way,,Tacoma,WA,Washington,98403


In [21]:
# Read dim_products back from MySQL to check what was written.
sql_dim_products = "SELECT * FROM adventureworks.dim_products;"
df_dim_products = get_sql_dataframe(sql_query=sql_dim_products, **mysql_args)
df_dim_products.head(n=2)

Unnamed: 0,product_key,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,...,DaysToManufacture,ProductLine,Class,Style,ProductCategory,ProductSubcategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,1,Adjustable Race,AR-5381,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01 00:00:00,,
1,2,2,Bearing Ball,BA-8327,0,0,,1000,750,0.0,...,0,,,,,,,1998-06-01 00:00:00,,


## Creating and Populating the New Fact Tables

In [22]:
# An empty filter selects every document in "fact_purchase_orders".
query = {}
collection = "fact_purchase_orders"
# get_mongo_dataframe() closes the client it receives, so open a new one here.
client = get_mongo_client(**mongodb_args)
df_fact_purchase_orders = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    collection=collection,
    query=query,
)
df_fact_purchase_orders.head(n=2)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,EmployeeID,VendorID,ProductID,OrderQty,UnitPrice,LineTotal,OrderDate,...,ShipRate,ShipDate,SubTotal,TaxAmt,Freight,TotalDue,DueDate,ReceivedQty,RejectedQty,StockedQty
0,1,0,4,244,83,1,4,50.26,201.04,2001-05-17 00:00:00,...,2.99,2001-05-26 00:00:00,201.04,16.0832,5.026,222.1492,2001-05-31 00:00:00,3.0,0.0,3.0
1,2,0,1,231,32,360,3,45.5805,136.7415,2001-05-17 00:00:00,...,1.49,2001-05-26 00:00:00,272.1015,21.7681,6.8025,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0


In [23]:
# An empty filter selects every document in "fact_sales_orders".
query = {}
collection = "fact_sales_orders"
# get_mongo_dataframe() closes the client it receives, so open a new one here.
client = get_mongo_client(**mongodb_args)
df_fact_sales_orders = get_mongo_dataframe(
    mongo_client=client,
    db_name=mongodb_args["db_name"],
    collection=collection,
    query=query,
)
df_fact_sales_orders.head(n=2)

Unnamed: 0,SalesOrderID,RevisionNumber,OrderDate,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,...,CreditCardApprovalCode,SubTotal,TaxAmt,Freight,TotalDue,CarrierTrackingNumber,OrderQty,ProductID,UnitPrice,LineTotal
0,43659,1,2001-07-01 00:00:00,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,...,105041Vi84182,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,4,711,20.1865,80.746
1,43659,1,2001-07-01 00:00:00,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,...,105041Vi84182,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,2,712,5.1865,10.373


In [24]:
# Reload the date dimension's key/date pairs for the fact-table date lookups.
sql_dim_date = "SELECT date_key, full_date FROM adventureworks.dim_date;"
df_dim_date = get_sql_dataframe(sql_query=sql_dim_date, **mysql_args)
# Reduce full_date to plain datetime.date values.
df_dim_date["full_date"] = df_dim_date["full_date"].astype('datetime64[ns]').dt.date
df_dim_date.head(n=2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [25]:
# Swap OrderDate for the matching date_key, exposed as sales_order_date_key.
df_dim_sales_date = df_dim_date.rename(
    columns={"date_key": "sales_order_date_key", "full_date": "OrderDate"}
)
# Both sides of the merge must hold datetime.date values to match.
df_fact_sales_orders["OrderDate"] = df_fact_sales_orders["OrderDate"].astype('datetime64[ns]').dt.date
df_fact_sales_orders = (
    df_fact_sales_orders
    .merge(df_dim_sales_date, on='OrderDate', how='left')
    .drop(columns=['OrderDate'])
)
df_fact_sales_orders.head(n=2)

Unnamed: 0,SalesOrderID,RevisionNumber,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,...,SubTotal,TaxAmt,Freight,TotalDue,CarrierTrackingNumber,OrderQty,ProductID,UnitPrice,LineTotal,sales_order_date_key
0,43659,1,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,...,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,4,711,20.1865,80.746,20010701
1,43659,1,2001-07-13 00:00:00,2001-07-08 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,...,24643.9362,1971.5149,616.0984,27231.5495,4911-403C-98,2,712,5.1865,10.373,20010701


In [26]:
# Swap ShipDate for the matching date_key, exposed as ship_date_key.
df_dim_ship_date = df_dim_date.rename(
    columns={"date_key": "ship_date_key", "full_date": "ShipDate"}
)
# Both sides of the merge must hold datetime.date values to match.
df_fact_sales_orders["ShipDate"] = df_fact_sales_orders["ShipDate"].astype('datetime64[ns]').dt.date
df_fact_sales_orders = (
    df_fact_sales_orders
    .merge(df_dim_ship_date, on='ShipDate', how='left')
    .drop(columns=['ShipDate'])
)
df_fact_sales_orders.head(n=2)

Unnamed: 0,SalesOrderID,RevisionNumber,DueDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,ContactID,...,TaxAmt,Freight,TotalDue,CarrierTrackingNumber,OrderQty,ProductID,UnitPrice,LineTotal,sales_order_date_key,ship_date_key
0,43659,1,2001-07-13 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,378,...,1971.5149,616.0984,27231.5495,4911-403C-98,4,711,20.1865,80.746,20010701,20010708
1,43659,1,2001-07-13 00:00:00,5,0,SO43659,PO522145787,10-4020-000676,676,378,...,1971.5149,616.0984,27231.5495,4911-403C-98,2,712,5.1865,10.373,20010701,20010708


In [27]:
# Swap DueDate for the matching date_key, exposed as due_date_key.
df_dim_due_date = df_dim_date.rename(
    columns={"date_key": "due_date_key", "full_date": "DueDate"}
)
# Both sides of the merge must hold datetime.date values to match.
df_fact_sales_orders["DueDate"] = df_fact_sales_orders["DueDate"].astype('datetime64[ns]').dt.date
df_fact_sales_orders = (
    df_fact_sales_orders
    .merge(df_dim_due_date, on='DueDate', how='left')
    .drop(columns=['DueDate'])
)
df_fact_sales_orders.head(n=2)

Unnamed: 0,SalesOrderID,RevisionNumber,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,CustomerID,ContactID,SalesPersonID,...,Freight,TotalDue,CarrierTrackingNumber,OrderQty,ProductID,UnitPrice,LineTotal,sales_order_date_key,ship_date_key,due_date_key
0,43659,1,5,0,SO43659,PO522145787,10-4020-000676,676,378,279.0,...,616.0984,27231.5495,4911-403C-98,4,711,20.1865,80.746,20010701,20010708,20010713
1,43659,1,5,0,SO43659,PO522145787,10-4020-000676,676,378,279.0,...,616.0984,27231.5495,4911-403C-98,2,712,5.1865,10.373,20010701,20010708,20010713


In [28]:
# Swap OrderDate for the matching date_key, exposed as purchase_date_key.
df_dim_purchase_date = df_dim_date.rename(
    columns={"date_key": "purchase_date_key", "full_date": "OrderDate"}
)
# Both sides of the merge must hold datetime.date values to match.
df_fact_purchase_orders["OrderDate"] = df_fact_purchase_orders["OrderDate"].astype('datetime64[ns]').dt.date
df_fact_purchase_orders = (
    df_fact_purchase_orders
    .merge(df_dim_purchase_date, on='OrderDate', how='left')
    .drop(columns=['OrderDate'])
)
df_fact_purchase_orders.head(n=2)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,EmployeeID,VendorID,ProductID,OrderQty,UnitPrice,LineTotal,ShipMethod,...,ShipDate,SubTotal,TaxAmt,Freight,TotalDue,DueDate,ReceivedQty,RejectedQty,StockedQty,purchase_date_key
0,1,0,4,244,83,1,4,50.26,201.04,OVERSEAS - DELUXE,...,2001-05-26 00:00:00,201.04,16.0832,5.026,222.1492,2001-05-31 00:00:00,3.0,0.0,3.0,20010517
1,2,0,1,231,32,360,3,45.5805,136.7415,CARGO TRANSPORT 5,...,2001-05-26 00:00:00,272.1015,21.7681,6.8025,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0,20010517


## Looking up Primary Keys from the Dimension Tables

In [29]:
# Fetch only the surrogate key / business key pair used to resolve CustomerID.
sql_dim_customers = "SELECT customer_key, CustomerID FROM adventureworks.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_query=sql_dim_customers, **mysql_args)
df_dim_customers.head(n=2)

Unnamed: 0,customer_key,CustomerID
0,1,1
1,2,2


In [30]:
# Fetch only the surrogate key / business key pair used to resolve EmployeeID.
sql_dim_employees = "SELECT employee_key, EmployeeID FROM adventureworks.dim_employees;"
df_dim_employees = get_sql_dataframe(sql_query=sql_dim_employees, **mysql_args)
df_dim_employees.head(n=2)

Unnamed: 0,employee_key,EmployeeID
0,1,1
1,2,2


In [31]:
# Fetch only the surrogate key / business key pair used to resolve VendorID.
sql_dim_vendors = "SELECT vendor_key, VendorID FROM adventureworks.dim_vendors;"
df_dim_vendors = get_sql_dataframe(sql_query=sql_dim_vendors, **mysql_args)
df_dim_vendors.head(n=2)

Unnamed: 0,vendor_key,VendorID
0,1,1
1,2,2


In [32]:
# Fetch only the surrogate key / business key pair used to resolve ProductID.
sql_dim_products = "SELECT product_key, ProductID FROM adventureworks.dim_products;"
df_dim_products = get_sql_dataframe(sql_query=sql_dim_products, **mysql_args)
df_dim_products.head(n=2)

Unnamed: 0,product_key,ProductID
0,1,1
1,2,2


## Looking up Corresponding Surrogate Primary Keys in the Dimension Tables

In [33]:
# Replace EmployeeID with employee_key. The inner join keeps only fact rows
# whose EmployeeID exists in dim_employees.
df_fact_purchase_orders = (
    df_fact_purchase_orders
    .merge(df_dim_employees[['EmployeeID', 'employee_key']], on='EmployeeID', how='inner')
    .drop(columns=['EmployeeID'])
)
df_fact_purchase_orders.head(n=2)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,VendorID,ProductID,OrderQty,UnitPrice,LineTotal,ShipMethod,ShipBase,...,SubTotal,TaxAmt,Freight,TotalDue,DueDate,ReceivedQty,RejectedQty,StockedQty,purchase_date_key,employee_key
0,1,0,4,83,1,4,50.26,201.04,OVERSEAS - DELUXE,29.95,...,201.04,16.0832,5.026,222.1492,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,244
1,2,0,1,32,360,3,45.5805,136.7415,CARGO TRANSPORT 5,8.99,...,272.1015,21.7681,6.8025,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,231


In [34]:
# Replace VendorID with vendor_key. The inner join keeps only fact rows whose
# VendorID exists in dim_vendors.
df_fact_purchase_orders = (
    df_fact_purchase_orders
    .merge(df_dim_vendors[['VendorID', 'vendor_key']], on='VendorID', how='inner')
    .drop(columns=['VendorID'])
)
df_fact_purchase_orders.head(n=2)

Unnamed: 0,PurchaseOrderID,RevisionNumber,Status,ProductID,OrderQty,UnitPrice,LineTotal,ShipMethod,ShipBase,ShipRate,...,TaxAmt,Freight,TotalDue,DueDate,ReceivedQty,RejectedQty,StockedQty,purchase_date_key,employee_key,vendor_key
0,1,0,4,1,4,50.26,201.04,OVERSEAS - DELUXE,29.95,2.99,...,16.0832,5.026,222.1492,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,244,83
1,2,0,1,360,3,45.5805,136.7415,CARGO TRANSPORT 5,8.99,1.49,...,21.7681,6.8025,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,231,32


In [35]:
# Surrogate key for the fact table: 1..N in the current row order. The guard
# lets the cell be re-run; an unguarded insert() raises ValueError when the
# column already exists.
if 'fact_purchase_order_key' not in df_fact_purchase_orders.columns:
    df_fact_purchase_orders.insert(0, 'fact_purchase_order_key', range(1, len(df_fact_purchase_orders) + 1))
df_fact_purchase_orders.head(2)

Unnamed: 0,fact_purchase_order_key,PurchaseOrderID,RevisionNumber,Status,ProductID,OrderQty,UnitPrice,LineTotal,ShipMethod,ShipBase,...,TaxAmt,Freight,TotalDue,DueDate,ReceivedQty,RejectedQty,StockedQty,purchase_date_key,employee_key,vendor_key
0,1,1,0,4,1,4,50.26,201.04,OVERSEAS - DELUXE,29.95,...,16.0832,5.026,222.1492,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,244,83
1,2,2,0,1,360,3,45.5805,136.7415,CARGO TRANSPORT 5,8.99,...,21.7681,6.8025,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,231,32


In [36]:
# Replace ProductID with product_key. The inner join keeps only fact rows
# whose ProductID exists in dim_products.
df_fact_purchase_orders = (
    df_fact_purchase_orders
    .merge(df_dim_products[['ProductID', 'product_key']], on='ProductID', how='inner')
    .drop(columns=['ProductID'])
)
df_fact_purchase_orders.head(n=2)

Unnamed: 0,fact_purchase_order_key,PurchaseOrderID,RevisionNumber,Status,OrderQty,UnitPrice,LineTotal,ShipMethod,ShipBase,ShipRate,...,Freight,TotalDue,DueDate,ReceivedQty,RejectedQty,StockedQty,purchase_date_key,employee_key,vendor_key,product_key
0,1,1,0,4,4,50.26,201.04,OVERSEAS - DELUXE,29.95,2.99,...,5.026,222.1492,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,244,83,1
1,2,2,0,1,3,45.5805,136.7415,CARGO TRANSPORT 5,8.99,1.49,...,6.8025,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,231,32,39


In [37]:
# Swap ShipDate for the matching date_key, exposed as ship_date_key_purchase_orders.
df_dim_ship_date_purchase_orders = df_dim_date.rename(
    columns={"date_key": "ship_date_key_purchase_orders", "full_date": "ShipDate"}
)
# Both sides of the merge must hold datetime.date values to match.
df_fact_purchase_orders['ShipDate'] = df_fact_purchase_orders['ShipDate'].astype('datetime64[ns]').dt.date
df_fact_purchase_orders = pd.merge(
    df_fact_purchase_orders,
    df_dim_ship_date_purchase_orders,
    on='ShipDate',
    how='left',
).drop(columns=['ShipDate'])
df_fact_purchase_orders.head(n=2)

Unnamed: 0,fact_purchase_order_key,PurchaseOrderID,RevisionNumber,Status,OrderQty,UnitPrice,LineTotal,ShipMethod,ShipBase,ShipRate,...,TotalDue,DueDate,ReceivedQty,RejectedQty,StockedQty,purchase_date_key,employee_key,vendor_key,product_key,ship_date_key_purchase_orders
0,1,1,0,4,4,50.26,201.04,OVERSEAS - DELUXE,29.95,2.99,...,222.1492,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,244,83,1,20010526
1,2,2,0,1,3,45.5805,136.7415,CARGO TRANSPORT 5,8.99,1.49,...,300.6721,2001-05-31 00:00:00,3.0,0.0,3.0,20010517,231,32,39,20010526


In [38]:
# Swap DueDate for the matching date_key, exposed as due_date_key_purchase_orders.
df_dim_due_date_purchase_orders = df_dim_date.rename(
    columns={"date_key": "due_date_key_purchase_orders", "full_date": "DueDate"}
)
# Both sides of the merge must hold datetime.date values to match.
df_fact_purchase_orders['DueDate'] = df_fact_purchase_orders['DueDate'].astype('datetime64[ns]').dt.date
df_fact_purchase_orders = pd.merge(
    df_fact_purchase_orders,
    df_dim_due_date_purchase_orders,
    on='DueDate',
    how='left',
).drop(columns=['DueDate'])
df_fact_purchase_orders.head(n=2)

Unnamed: 0,fact_purchase_order_key,PurchaseOrderID,RevisionNumber,Status,OrderQty,UnitPrice,LineTotal,ShipMethod,ShipBase,ShipRate,...,TotalDue,ReceivedQty,RejectedQty,StockedQty,purchase_date_key,employee_key,vendor_key,product_key,ship_date_key_purchase_orders,due_date_key_purchase_orders
0,1,1,0,4,4,50.26,201.04,OVERSEAS - DELUXE,29.95,2.99,...,222.1492,3.0,0.0,3.0,20010517,244,83,1,20010526,20010531
1,2,2,0,1,3,45.5805,136.7415,CARGO TRANSPORT 5,8.99,1.49,...,300.6721,3.0,0.0,3.0,20010517,231,32,39,20010526,20010531


In [39]:
# Replace CustomerID with customer_key. The inner join keeps only fact rows
# whose CustomerID exists in dim_customers.
df_fact_sales_orders = (
    df_fact_sales_orders
    .merge(df_dim_customers[['CustomerID', 'customer_key']], on='CustomerID', how='inner')
    .drop(columns=['CustomerID'])
)
df_fact_sales_orders.head(n=2)

Unnamed: 0,SalesOrderID,RevisionNumber,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,ContactID,SalesPersonID,Sales Territory Group,...,TotalDue,CarrierTrackingNumber,OrderQty,ProductID,UnitPrice,LineTotal,sales_order_date_key,ship_date_key,due_date_key,customer_key
0,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,279.0,North America,...,27231.5495,4911-403C-98,4,711,20.1865,80.746,20010701,20010708,20010713,687
1,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,279.0,North America,...,27231.5495,4911-403C-98,2,712,5.1865,10.373,20010701,20010708,20010713,687


In [40]:
# Replace ProductID with product_key. The inner join keeps only fact rows
# whose ProductID exists in dim_products.
df_fact_sales_orders = (
    df_fact_sales_orders
    .merge(df_dim_products[['ProductID', 'product_key']], on='ProductID', how='inner')
    .drop(columns=['ProductID'])
)
df_fact_sales_orders.head(n=2)

Unnamed: 0,SalesOrderID,RevisionNumber,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,ContactID,SalesPersonID,Sales Territory Group,...,TotalDue,CarrierTrackingNumber,OrderQty,UnitPrice,LineTotal,sales_order_date_key,ship_date_key,due_date_key,customer_key,product_key
0,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,279.0,North America,...,27231.5495,4911-403C-98,4,20.1865,80.746,20010701,20010708,20010713,687,216
1,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,279.0,North America,...,27231.5495,4911-403C-98,2,5.1865,10.373,20010701,20010708,20010713,687,217


In [41]:
# Surrogate key for the fact table: 1..N in the current row order. The guard
# lets the cell be re-run; an unguarded insert() raises ValueError when the
# column already exists.
if 'fact_sales_orders_key' not in df_fact_sales_orders.columns:
    df_fact_sales_orders.insert(0, 'fact_sales_orders_key', range(1, len(df_fact_sales_orders) + 1))
df_fact_sales_orders.head(2)

Unnamed: 0,fact_sales_orders_key,SalesOrderID,RevisionNumber,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,ContactID,SalesPersonID,...,TotalDue,CarrierTrackingNumber,OrderQty,UnitPrice,LineTotal,sales_order_date_key,ship_date_key,due_date_key,customer_key,product_key
0,1,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,279.0,...,27231.5495,4911-403C-98,4,20.1865,80.746,20010701,20010708,20010713,687,216
1,2,43659,1,5,0,SO43659,PO522145787,10-4020-000676,378,279.0,...,27231.5495,4911-403C-98,2,5.1865,10.373,20010701,20010708,20010713,687,217


## Loading Newly Transformed MongoDB Data into the AdventureWorks Data Warehouse

In [42]:
# "insert" makes set_dataframe() replace fact_sales_orders and then declare
# fact_sales_orders_key as its primary key.
dataframe = df_fact_sales_orders
table_name, primary_key = 'fact_sales_orders', 'fact_sales_orders_key'
db_operation = "insert"

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

In [43]:
# "insert" makes set_dataframe() replace fact_purchase_orders and then declare
# fact_purchase_order_key as its primary key.
dataframe = df_fact_purchase_orders
table_name, primary_key = 'fact_purchase_orders', 'fact_purchase_order_key'
db_operation = 'insert'

set_dataframe(
    df=dataframe,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)

## Demonstrating the New Data Warehouse Exists and Contains Correct Data

In [44]:
# Employee report from dim_employees: name, title and sick-leave hours,
# ordered by last name. Rows are grouped by employee_key, so SUM() adds
# SickLeaveHours across the rows that share one key.
sql_employee_find = """
    SELECT 
        e.FirstName AS first_name,
        e.LastName AS last_name,
        e.Title AS job_title,
        SUM(e.SickLeaveHours) AS sick_leave_hours
        from adventureworks.dim_employees AS e
    GROUP BY e.employee_key
    ORDER BY e.LastName
"""

In [45]:
# Run the employee report and show its first ten rows.
df_fact_employee_find = get_sql_dataframe(sql_query=sql_employee_find, **mysql_args)
df_fact_employee_find.head(n=10)

Unnamed: 0,first_name,last_name,job_title,sick_leave_hours
0,Syed,Abbas,Pacific Sales Manager,30.0
1,Kim,Abercrombie,Production Technician - WC60,32.0
2,Hazem,Abolrous,Quality Assurance Manager,60.0
3,Pilar,Ackerman,Shipping and Receiving Supervisor,66.0
4,Jay,Adams,Production Technician - WC60,36.0
5,François,Ajenstat,Database Administrator,53.0
6,Amy,Alberts,European Sales Manager,30.0
7,Greg,Alderson,Production Technician - WC45,62.0
8,Sean,Alexander,Quality Assurance Technician,61.0
9,Gary,Altman,Facilities Manager,63.0


In [46]:
# Per-product purchase summary: dim_products joined to fact_purchase_orders on
# product_key, giving the number of fact rows with a PurchaseOrderID and the
# summed OrderQty for each product, ordered by product_key.
sql_fact_purchase_orders = """
    SELECT 
        p.Name AS product_name,
        COUNT(f.PurchaseOrderID) AS total_transactions,
        SUM(f.OrderQty) AS total_quantity
    FROM adventureworks.dim_products AS p
    JOIN adventureworks.fact_purchase_orders AS f
        ON f.product_key = p.product_key
    GROUP BY p.product_key, p.Name
    ORDER BY p.product_key;
"""

In [47]:
# Keep the summary in its own variable. Assigning it to df_fact_purchase_orders
# would overwrite the fact DataFrame that the load cell writes to MySQL, so
# re-running that cell afterwards would replace the fact table with this summary.
df_purchase_order_summary = get_sql_dataframe(sql_fact_purchase_orders, **mysql_args)
df_purchase_order_summary

Unnamed: 0,product_name,total_transactions,total_quantity
0,Adjustable Race,7,22.0
1,Bearing Ball,6,18.0
2,Headset Ball Bearings,7,21.0
3,LL Crankarm,9,4950.0
4,ML Crankarm,9,4950.0
...,...,...,...
206,ML Road Pedal,9,4950.0
207,HL Road Pedal,5,2750.0
208,Touring Pedal,7,3850.0
209,Front Brakes,5,2750.0


In [48]:
# Per-customer sales summary: dim_customers joined to fact_sales_orders on
# customer_key, giving the number of fact rows with a SalesOrderID for each
# customer, ordered by customer_key.
# NOTE(review): the alias 'total_price' is fed by MAX(s.SubTotal), i.e. the
# largest SubTotal among the customer's rows rather than a sum -- confirm that
# this is the intended measure.
sql_fact_sales_orders = """
    SELECT 
        c.AccountNumber AS account_number,
        COUNT(s.SalesOrderID) AS total_transactions,
        MAX(s.SubTotal) AS total_price
    FROM adventureworks.dim_customers AS c
    JOIN adventureworks.fact_sales_orders AS s
        ON s.customer_key = c.customer_key
    GROUP BY c.customer_key, c.AccountNumber
    ORDER BY c.customer_key;
"""

In [49]:
# Keep the summary in its own variable. Assigning it to df_fact_sales_orders
# would overwrite the fact DataFrame that the load cell writes to MySQL, so
# re-running that cell afterwards would replace the fact table with this summary.
df_sales_order_summary = get_sql_dataframe(sql_fact_sales_orders, **mysql_args)
df_sales_order_summary.head(10)

Unnamed: 0,account_number,total_transactions,total_price
0,AW00000001,12,13216.0537
1,AW00000011,6,31625.9064
2,AW00000017,5,17040.8246
3,AW00000018,10,39677.4848
4,AW00000022,1,503.3507
5,AW00000027,8,47343.2903
6,AW00000029,9,39078.9348
7,AW00000036,13,16463.34
8,AW00000073,10,24713.954
9,AW00000078,28,49671.5619
