In [None]:
#import necessary notebooks

In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine

In [2]:
#Connect Python to SQL and MongoDB Atlas

In [3]:
mysql_uid = "root"
mysql_pwd = "H3rshey3ch0!"
mysql_hostname = "localhost"

atlas_cluster_name = "cluster0.pbmagvz"
atlas_user_name = "kierakmurphy"
atlas_password = "H3rshey3ch0"

conn_str = {"local" : f"mongodb://localhost:27017/",
    "atlas" : f"mongodb+srv://{atlas_user_name}:{atlas_password}@{atlas_cluster_name}.mongodb.net"
}

src_dbname = "adventureworks"
dst_dbname = "adventureworks_dw"

print(f"Local Connection String: {conn_str['local']}")
print(f"Atlas Connection String: {conn_str['atlas']}")

Local Connection String: mongodb://localhost:27017/
Atlas Connection String: mongodb+srv://kierakmurphy:H3rshey3ch0@cluster0.pbmagvz.mongodb.net


In [4]:
#Define functions

In [5]:
def get_sql_dataframe(user_id, pwd, host_name, db_name, sql_query):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    conn = sqlEngine.connect()
    dframe = pd.read_sql(sql_query, conn);
    conn.close()
    
    return dframe


def get_mongo_dataframe(connect_str, db_name, collection, query):
    '''Create a connection to MongoDB'''
    client = pymongo.MongoClient(connect_str)
    
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    client.close()
    return dframe


def set_dataframe(user_id, pwd, host_name, db_name, df, table_name, pk_column, db_operation):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{user_id}:{pwd}@{host_name}/{db_name}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});")
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()

In [6]:
#Populate MongoDB
client = pymongo.MongoClient(conn_str["atlas"])
db = client[src_dbname]

# Gets the path of the Current Working Directory for this Notebook, and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'scripts')

json_files = {"dim_date" : 'dim_date.json'}

for file in json_files:
    db.drop_collection(file)
    json_file = os.path.join(data_dir, json_files[file])
    with open(json_file, 'r') as openfile:
        json_object = json.load(openfile)
        file = db[file]
        result = file.insert_many(json_object)
        #print(f"{file} was successfully loaded.")

        
client.close()      

In [7]:
#Extract Data from the SQL collection and populate dataframes

In [8]:
sql_products = "SELECT * FROM adventureworks.product;" #select products table from adventureworks database and create df_ products dataframe 
df_products = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, src_dbname, sql_products)
df_products.head(2)

Unnamed: 0,ProductID,Name,ProductNumber,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate,rowguid,ModifiedDate
0,1,Adjustable Race,AR-5381,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,,,,1998-06-01,NaT,,b'\xb7\x15Bi\xf7\x08\rL\xac\xb1\xd74\xbaD\xc0\...,2004-03-11 10:01:36
1,2,Bearing Ball,BA-8327,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,,,,1998-06-01,NaT,,b' <\xaeX:OIG\xa7\xd4\xd5h\x80l\xc57',2004-03-11 10:01:36


In [9]:
sql_location = "SELECT * FROM adventureworks.location;" #select location table from adventureworks database and create df_ products dataframe
df_location = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, src_dbname, sql_location)
df_location.head(2)

Unnamed: 0,LocationID,Name,CostRate,Availability,ModifiedDate
0,1,Tool Crib,0.0,0.0,1998-06-01
1,2,Sheet Metal Racks,0.0,0.0,1998-06-01


In [10]:
sql_department = "SELECT * FROM adventureworks.department;" #select department table from adventureworks database and create df_ products dataframe
df_department = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, src_dbname, sql_department)
df_department.head(2)

Unnamed: 0,DepartmentID,Name,GroupName,ModifiedDate
0,1,Engineering,Research and Development,1998-06-01
1,2,Tool Design,Research and Development,1998-06-01


In [11]:
sql_employee = "SELECT * FROM adventureworks.employee;" #select employee table from adventureworks database and create df_ products dataframe
df_employee = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, src_dbname, sql_employee)
df_employee.head(2)

Unnamed: 0,EmployeeID,NationalIDNumber,ContactID,LoginID,ManagerID,Title,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag,rowguid,ModifiedDate
0,1,14417807,1209,adventure-works\guy1,16.0,Production Technician - WC60,1972-05-15,M,M,1996-07-31,b'\x00',21,30,b'\x01',b'J\xd0\xe1\xaa7\xc2tI\xb4\xd5\x93RGsw\x18',2004-07-31
1,2,253022876,1030,adventure-works\kevin0,6.0,Marketing Assistant,1977-06-03,S,M,1997-02-26,b'\x00',42,41,b'\x01',b'@\x02H\x1b\xc0\x95\x0fA\xa7\x17\xeb)\x94<\x8...,2004-07-31


In [12]:
#Perform Transformations and add a new primary key for each variable

In [13]:
drop_cols = ['rowguid','ProductModelID','ProductSubcategoryID', 'ProductLine', 'WeightUnitMeasureCode', 'MakeFlag', 'FinishedGoodsFlag', 'SizeUnitMeasureCode']
df_products.drop(drop_cols, axis=1, inplace=True) #drop unnecessary columns
df_products.insert(0, "product_key", range(1, df_products.shape[0]+1)) #Insert a new column, with an ever-incrementing numeric value, to serve as the primary key. 
df_products.head(2)

Unnamed: 0,product_key,ProductID,Name,ProductNumber,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,Weight,DaysToManufacture,Class,Style,SellStartDate,SellEndDate,DiscontinuedDate,ModifiedDate
0,1,1,Adjustable Race,AR-5381,,1000,750,0.0,0.0,,,0,,,1998-06-01,NaT,,2004-03-11 10:01:36
1,2,2,Bearing Ball,BA-8327,,1000,750,0.0,0.0,,,0,,,1998-06-01,NaT,,2004-03-11 10:01:36


In [14]:
df_location.insert(0, "location_key", range(1, df_location.shape[0]+1)) #Insert a new column, with an ever-incrementing numeric value, to serve as the primary key. 
df_location.head(2)

Unnamed: 0,location_key,LocationID,Name,CostRate,Availability,ModifiedDate
0,1,1,Tool Crib,0.0,0.0,1998-06-01
1,2,2,Sheet Metal Racks,0.0,0.0,1998-06-01


In [15]:
df_department.insert(0, "department_key", range(1, df_department.shape[0]+1)) #Insert a new column, with an ever-incrementing numeric value, to serve as the primary key. 
df_department.head(2)

Unnamed: 0,department_key,DepartmentID,Name,GroupName,ModifiedDate
0,1,1,Engineering,Research and Development,1998-06-01
1,2,2,Tool Design,Research and Development,1998-06-01


In [16]:
drop_cols = ['rowguid', 'CurrentFlag', 'SalariedFlag']
df_employee.drop(drop_cols, axis=1, inplace=True) #drop unnecessary columns
df_employee.insert(0, "employee_key", range(1, df_employee.shape[0]+1)) #Insert a new column, with an ever-incrementing numeric value, to serve as the primary key. 
df_employee.head(2)

Unnamed: 0,employee_key,EmployeeID,NationalIDNumber,ContactID,LoginID,ManagerID,Title,BirthDate,MaritalStatus,Gender,HireDate,VacationHours,SickLeaveHours,ModifiedDate
0,1,1,14417807,1209,adventure-works\guy1,16.0,Production Technician - WC60,1972-05-15,M,M,1996-07-31,21,30,2004-07-31
1,2,2,253022876,1030,adventure-works\kevin0,6.0,Marketing Assistant,1977-06-03,S,M,1997-02-26,42,41,2004-07-31


In [17]:
#Load transformed Data into new datawarehouse

In [18]:
db_operation = "update"

tables = [('dim_department', df_department, 'department_key'),
          ('dim_employee', df_employee, 'employee_key'),
          ('dim_products', df_products, 'product_key'),
          ('dim_location', df_location, 'location_key')]

In [19]:
for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [20]:
#validate dimension tables in new datawarehouse 

In [21]:
sql_dim_products = "SELECT product_key, ProductID FROM adventureworks.dim_products;" #create new product dimension dataframe with just the key and ID
df_dim_products = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_products)
df_dim_products.head(2)

Unnamed: 0,product_key,ProductID
0,1,1
1,2,2


In [22]:
sql_dim_employee = "SELECT employee_key, EmployeeID FROM adventureworks.dim_employee;" #create new employee dimension dataframe with just the key and ID
df_dim_employee = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_employee)
df_dim_employee.head(2)

Unnamed: 0,employee_key,EmployeeID
0,1,1
1,2,2


In [23]:
sql_dim_department = "SELECT department_key, DepartmentID FROM adventureworks.dim_department;" #create new department dimension dataframe with just the key and ID
df_dim_department = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_department)
df_dim_department.head(2)

Unnamed: 0,department_key,DepartmentID
0,1,1
1,2,2


In [24]:
sql_dim_location = "SELECT location_key, LocationID FROM adventureworks.dim_location;" #create new location dimension dataframe with just the key and ID
df_dim_location = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_location)
df_dim_location.head(2)

Unnamed: 0,location_key,LocationID
0,1,1
1,2,2


In [25]:
#Create Date Dimension Table from MongoDB

In [26]:
query = {} # Select all elements (columns), and all documents (rows).
collection = "dim_date" 

df_dim_date = get_mongo_dataframe(conn_str['atlas'], src_dbname, collection, query) #create date dimension dataframe
df_dim_date.head(2)

Unnamed: 0,date_key,full_date,date_name,date_name_us,date_name_eu,day_of_week,day_name_of_week,day_of_month,day_of_year,weekday_weekend,...,is_last_day_of_month,calendar_quarter,calendar_year,calendar_year_month,calendar_year_qtr,fiscal_month_of_year,fiscal_quarter,fiscal_year,fiscal_year_month,fiscal_year_qtr
0,20000101,2000-01-01,2000/01/01,01/01/2000,01/01/2000,7,Saturday,1,1,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3
1,20000102,2000-01-02,2000/01/02,01/02/2000,02/01/2000,1,Sunday,2,2,Weekend,...,N,1,2000,2000-01,2000Q1,7,3,2000,2000-07,2000Q3


In [27]:
sql_dim_date = "SELECT date_key, full_date FROM adventureworks_dw.dim_date;" #extract date key and full date to creat date dimension table
df_dim_date = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_date)
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [28]:
#Merge date dimension with product dimension
df_modified_date = df_dim_date.rename(columns={"date_key" : "modified_date_key", "full_date" : "ModifiedDate"}) #rename date key and full date to match the product modified date
df_products.ModifiedDate = df_products.ModifiedDate.astype('datetime64[ns]').dt.date
df_products = pd.merge(df_products, df_modified_date, on='ModifiedDate', how='left') #merge the modified date table with the products dataframe
df_products.drop(['ModifiedDate'], axis=1, inplace=True) #drop the modified date
df_products.head(2)

Unnamed: 0,product_key,ProductID,Name,ProductNumber,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,Weight,DaysToManufacture,Class,Style,SellStartDate,SellEndDate,DiscontinuedDate,modified_date_key
0,1,1,Adjustable Race,AR-5381,,1000,750,0.0,0.0,,,0,,,1998-06-01,NaT,,20040311
1,2,2,Bearing Ball,BA-8327,,1000,750,0.0,0.0,,,0,,,1998-06-01,NaT,,20040311


In [29]:
#extract puchase order detail table from csv to use for fact purchase table
data_dir = os.path.join(os.getcwd(), 'scripts')
data_file = os.path.join(data_dir, 'purchaseorderdetail.csv')

df_purchaseorderdetail = pd.read_csv(data_file, header=0, index_col=0) #create purchase order detail dataframe that will be used to create fact purchase
df_purchaseorderdetail.reset_index(inplace = True) 
df_purchaseorderdetail.insert(0, "purchaseorderdetail_key", range(1, df_purchaseorderdetail.shape[0]+1)) #Insert a new column, with an ever-incrementing numeric value, to serve as the primary key. 
df_purchaseorderdetail.head(2)

Unnamed: 0,purchaseorderdetail_key,PurchaseOrderID,PurchaseOrderDetailID,DueDate,OrderQty,ProductID,UnitPrice,LineTotal,ReceivedQty,RejectedQty,StockedQty,ModifiedDate
0,1,1,1,2001-05-31 00:00:00,4,1,50.26,201.04,3.0,0.0,3.0,2001-05-24 00:00:00
1,2,2,2,2001-05-31 00:00:00,3,359,45.12,135.36,3.0,0.0,3.0,2001-05-24 00:00:00


In [30]:
#Upload the purchase order detail dataframe to the new datawarehouse in SQL
db_operation = "update"

tables = [('dim_purchaseorderdetail', df_purchaseorderdetail, 'purchaseorderdetail_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [31]:
#CREATE FACT PURCHASE TABLE

In [32]:
sql_dim_purchaseorderdetail = "SELECT PurchaseOrderID, ProductID FROM adventureworks_dw.dim_purchaseorderdetail;" #extract PurchaseOrderID and ProductId from purchaseorderdetail dimension table
dim_purchaseorderdetail = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_dim_purchaseorderdetail)
dim_purchaseorderdetail.insert(0, "purchaseorderdetail_key", range(1, dim_purchaseorderdetail.shape[0]+1)) #Insert a new column, with an ever-incrementing numeric value, to serve as the primary key. 
dim_purchaseorderdetail.head(2)

Unnamed: 0,purchaseorderdetail_key,PurchaseOrderID,ProductID
0,1,1,1
1,2,2,359


In [33]:
sql_purchaseorderheader = "SELECT PurchaseOrderID, EmployeeID FROM adventureworks.purchaseorderheader;" #extract PurchaseOrderID and EmployeeId from purchaseorderheader table
df_purchaseorderheader = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, sql_purchaseorderheader)
df_purchaseorderheader.insert(0, "purchaseorderheader_key", range(1, df_purchaseorderheader.shape[0]+1)) #Insert a new column, with an ever-incrementing numeric value, to serve as the primary key. 
df_purchaseorderheader.head(2)

Unnamed: 0,purchaseorderheader_key,PurchaseOrderID,EmployeeID
0,1,1,244
1,2,2,231


In [35]:
df_fact_purchase = pd.merge(dim_purchaseorderdetail, df_dim_products, on='ProductID', how='left') #create fact purchase table by merging 
                                                                                                    #the purchase order detail and products dimension tables on the product ID
df_fact_purchase.head()

Unnamed: 0,purchaseorderdetail_key,PurchaseOrderID,ProductID,product_key
0,1,1,1,1
1,1,1,1,1
2,1,1,1,1
3,1,1,1,1
4,2,2,359,38


In [36]:
df_fact_purchase = pd.merge(df_fact_purchase, df_purchaseorderheader, on='PurchaseOrderID', how='left') #add purchaseorderheader key to the fact purchase table by merging the 
                                                                                                        #fact purchase table and purchase order header dimension table on the purchase order ID
df_fact_purchase.head()

Unnamed: 0,purchaseorderdetail_key,PurchaseOrderID,ProductID,product_key,purchaseorderheader_key,EmployeeID
0,1,1,1,1,1,244
1,1,1,1,1,1,244
2,1,1,1,1,1,244
3,1,1,1,1,1,244
4,2,2,359,38,2,231


In [37]:
df_fact_purchase = pd.merge(df_fact_purchase, df_dim_employee, on='EmployeeID', how='left') #add employee key to the fact purchase table by merging the 
                                                                                             #fact purchase order and employee dimension tables on the employee ID
df_fact_purchase.head()

Unnamed: 0,purchaseorderdetail_key,PurchaseOrderID,ProductID,product_key,purchaseorderheader_key,EmployeeID,employee_key
0,1,1,1,1,1,244,244
1,1,1,1,1,1,244,244
2,1,1,1,1,1,244,244
3,1,1,1,1,1,244,244
4,1,1,1,1,1,244,244


In [38]:
drop_cols = ['PurchaseOrderID','ProductID','EmployeeID'] #drop ID columns
df_fact_purchase.drop(drop_cols, axis=1, inplace=True)
df_fact_purchase.head()

Unnamed: 0,purchaseorderdetail_key,product_key,purchaseorderheader_key,employee_key
0,1,1,1,244
1,1,1,1,244
2,1,1,1,244
3,1,1,1,244
4,1,1,1,244


In [39]:
df_fact_purchase.insert(0, "fact_purchase_key", range(1, df_fact_purchase.shape[0]+1)) #Insert a new column, with an ever-incrementing numeric value, to serve as the primary key.
df_fact_purchase.head()

Unnamed: 0,fact_purchase_key,purchaseorderdetail_key,product_key,purchaseorderheader_key,employee_key
0,1,1,1,1,244
1,2,1,1,1,244
2,3,1,1,1,244
3,4,1,1,1,244
4,5,1,1,1,244


In [40]:
db_operation = "update" #upload fact purchase table to SQL

tables = [('dim_fact_purchase', df_fact_purchase, 'fact_purchase_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(mysql_uid, mysql_pwd, mysql_hostname, dst_dbname, dataframe, table_name, primary_key, db_operation)

In [41]:
#SQL TEST STATEMENTS

In [42]:
sql_test = """
    SELECT `dim_employee`.`Title` AS `Job Title`,
        SUM(`dim_employee`.`employee_key`) AS `Total Quantity`
    FROM `adventureworks_dw`.`dim_employee` 
    GROUP BY `dim_employee`.`Title`
    ORDER BY `Total Quantity` DESC;
""".format(src_dbname)

df_test = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, src_dbname, sql_test)
df_test.head()

Unnamed: 0,Job Title,Total Quantity
0,Production Technician - WC60,23898.0
1,Sales Representative,23688.0
2,Production Technician - WC40,22650.0
3,Production Technician - WC20,20214.0
4,Production Technician - WC50,18162.0


In [43]:
sql_test = """
    SELECT `dim_products`.`Name` AS `Product Name`,
        SUM(`dim_products`.`product_key`) AS `Total Quantity`
    FROM `adventureworks_dw`.`dim_products` 
    GROUP BY `dim_products`.`Name`
    ORDER BY `Total Quantity` DESC;
""".format(src_dbname)

df_test = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, src_dbname, sql_test)
df_test.head()

Unnamed: 0,Product Name,Total Quantity
0,"Road-750 Black, 52",2016.0
1,"Road-750 Black, 48",2012.0
2,"Road-750 Black, 44",2008.0
3,HL Bottom Bracket,2004.0
4,ML Bottom Bracket,2000.0


In [44]:
sql_test = """
    SELECT `dim_fact_purchase`.`product_key` AS `Product Key` ,
        `dim_fact_purchase`.`employee_key` AS `Employee Key` 
    FROM `adventureworks_dw`.`dim_fact_purchase` 
""".format(src_dbname)

df_test = get_sql_dataframe(mysql_uid, mysql_pwd, mysql_hostname, src_dbname, sql_test)
df_test.head()

Unnamed: 0,Product Key,Employee Key
0,1,244
1,1,244
2,1,244
3,1,244
4,1,244
