In [1]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd
import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.34
Running PyMongo Version: 4.11.1


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases with which You'll be Working

In [6]:
mysql_args = {
    "uid" : "root",
    "pwd" : "mypassword",
    "hostname" : "localhost",
    "dbname" : "adventureworks"
}
# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "miamccarrick",
    "password" : "mypassword",
    "cluster_name" : "mycluster",
    "cluster_subnet" : "zl4ms",
    "cluster_location" : "atlas", # "local"
    "db_name" : "adventureworks"
}

def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()



#### Getting Data from MongoDB

In [9]:
#Query products to export as JSON
sql_products = """
SELECT ProductID, Name, ProductNumber, Color, SafetyStockLevel,
ReorderPoint, StandardCost, ListPrice, Size, SizeUnitMeasureCode,
ProductLine, Class, Style, ProductSubcategoryID, ProductModelID,
SellStartDate, SellEndDate, DiscontinuedDate FROM Product;
"""
product_data = get_sql_dataframe(sql_products, **mysql_args)
product_data.to_json("products.json", orient="records") 

In [11]:
#Populate MongoDB with product data
client = get_mongo_client(**mongodb_args)

data_dir = os.getcwd()

json_files = {"products" : 'products.json'}

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)  

In [12]:
#Extract Products from MongoDB
client = get_mongo_client(**mongodb_args)

query = {} 
collection = "products"

df_products = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_products.insert(0, "product_key", range(1, df_products.shape[0]+1))
df_products.head(2)

Unnamed: 0,product_key,ProductID,Name,ProductNumber,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,Size,SizeUnitMeasureCode,ProductLine,Class,Style,ProductSubcategoryID,ProductModelID,SellStartDate,SellEndDate,DiscontinuedDate
0,1,1,Adjustable Race,AR-5381,,1000,750,0.0,0.0,,,,,,,,896659200000,,
1,2,2,Bearing Ball,BA-8327,,1000,750,0.0,0.0,,,,,,,,896659200000,,


#### Getting Data from MySQL

In [14]:
#Get Customers data
sql_customers = "SELECT * FROM adventureworks.customer;"
df_customers = get_sql_dataframe(sql_customers, **mysql_args)
#Drop columns I don't need
drop_cols = ["ModifiedDate", "rowguid"]
df_customers.drop(drop_cols, axis=1, inplace=True)
df_customers.insert(0, "customer_key", range(1, df_customers.shape[0]+1))
df_customers.head(2)

Unnamed: 0,customer_key,CustomerID,TerritoryID,AccountNumber,CustomerType
0,1,1,1,AW00000001,S
1,2,2,1,AW00000002,S


In [15]:
#Get Date Data
sql_dim_date = "SELECT date_key, full_date FROM adventureworks.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


#### Getting Data from Local CSV

In [17]:
#Query products to export as CSV
sql_salesperson = """
SELECT SalesPersonID, TerritoryID, SalesQuota, Bonus,
CommissionPct, SalesYTD, SalesLastYear
FROM adventureworks.salesperson;
"""
salesperson_data = get_sql_dataframe(sql_salesperson, **mysql_args)
salesperson_data.to_csv('salesperson.csv', index=False)
#Read in CSV locally
df_salesperson = pd.read_csv('salesperson.csv')
df_salesperson.insert(0, "salesperson_key", range(1, df_salesperson.shape[0]+1))
df_salesperson.head(2)

Unnamed: 0,salesperson_key,SalesPersonID,TerritoryID,SalesQuota,Bonus,CommissionPct,SalesYTD,SalesLastYear
0,1,268,,,0.0,0.0,677558.5,0.0
1,2,275,2.0,300000.0,4100.0,0.012,4557045.0,1750406.0


#### Load the Transformed DataFrames into the New Data Warehouse by Creating New Tables

In [19]:
db_operation = "insert"

tables = [('dim_customers', df_customers, 'customer_key'),
          ('dim_salesperson', df_salesperson, 'salesperson_key'),
          ('dim_products', df_products, 'product_key'),
         ('dim_date', df_dim_date, 'date_key')]

for table_name, dataframe, primary_key in tables:
    set_dataframe(dataframe, table_name, primary_key, db_operation, **mysql_args)

#### Creating Fact Orders Table

In [21]:
#Creating fact table with the relevant columns that I want
sql_fact_sales = """
SELECT sod.SalesOrderID,
sod.OrderQty, 
sod.ProductID,
sod.UnitPrice,
sod.SpecialOfferID,
sod.UnitPriceDiscount, 
soh.OrderDate,
soh.DueDate,
soh.ShipDate, 
soh.CustomerID,
soh.SalesPersonID
FROM adventureworks.salesorderdetail sod
LEFT JOIN adventureworks.salesorderheader soh
ON sod.SalesOrderID = soh.SalesOrderID
"""

df_fact_sales = get_sql_dataframe(sql_fact_sales, **mysql_args)
df_fact_sales.head(2)

Unnamed: 0,SalesOrderID,OrderQty,ProductID,UnitPrice,SpecialOfferID,UnitPriceDiscount,OrderDate,DueDate,ShipDate,CustomerID,SalesPersonID
0,43659,1,776,2024.994,1,0.0,2001-07-01,2001-07-13,2001-07-08,676,279.0
1,43659,3,777,2024.994,1,0.0,2001-07-01,2001-07-13,2001-07-08,676,279.0


#### Lookup the DateKeys from the Date Dimension Table.

In [23]:
#Converting dates to proper format
df_dim_order_date = df_dim_date.rename(columns={"date_key" : "order_date_key", "full_date" : "OrderDate"})
df_fact_sales.OrderDate = df_fact_sales.OrderDate.astype('datetime64[ns]').dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_order_date, on='OrderDate', how='left')
df_fact_sales.drop(['OrderDate'], axis=1, inplace=True)

df_dim_due_date = df_dim_date.rename(columns={"date_key" : "due_date_key", "full_date" : "DueDate"})
df_fact_sales.DueDate = df_fact_sales.DueDate.astype('datetime64[ns]').dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_due_date, on='DueDate', how='left')
df_fact_sales.drop(['DueDate'], axis=1, inplace=True)

df_dim_ship_date = df_dim_date.rename(columns={"date_key" : "ship_date_key", "full_date" : "ShipDate"})
df_fact_sales.ShipDate = df_fact_sales.ShipDate.astype('datetime64[ns]').dt.date
df_fact_sales = pd.merge(df_fact_sales, df_dim_ship_date, on='ShipDate', how='left')
df_fact_sales.drop(['ShipDate'], axis=1, inplace=True)


df_fact_sales.head(2)

Unnamed: 0,SalesOrderID,OrderQty,ProductID,UnitPrice,SpecialOfferID,UnitPriceDiscount,CustomerID,SalesPersonID,order_date_key,due_date_key,ship_date_key
0,43659,1,776,2024.994,1,0.0,676,279.0,20010701,20010713,20010708
1,43659,3,777,2024.994,1,0.0,676,279.0,20010701,20010713,20010708


#### Lookup the Primary Keys from the Dimension Tables

In [26]:
sql_dim_products = "SELECT product_key, ProductID FROM adventureworks.dim_products;"
df_dim_products = get_sql_dataframe(sql_dim_products, **mysql_args)
df_dim_products.head(2)

Unnamed: 0,product_key,ProductID
0,1,1
1,2,2


In [27]:
sql_dim_customers = "SELECT customer_key, CustomerID FROM adventureworks.dim_customers;"
df_dim_customers = get_sql_dataframe(sql_dim_customers, **mysql_args)
df_dim_customers.head(2)

Unnamed: 0,customer_key,CustomerID
0,1,1
1,2,2


In [28]:
sql_dim_salesperson = "SELECT salesperson_key, SalesPersonID FROM adventureworks.dim_salesperson;"
df_dim_salesperson = get_sql_dataframe(sql_dim_salesperson, **mysql_args)
df_dim_salesperson.head(2)

Unnamed: 0,salesperson_key,SalesPersonID
0,1,268
1,2,275


In [32]:
df_fact_sales = df_fact_sales.merge(df_dim_products, on='ProductID', how='left')
df_fact_sales = df_fact_sales.drop(columns=['ProductID'])
df_fact_sales.head(2)

Unnamed: 0,SalesOrderID,OrderQty,UnitPrice,SpecialOfferID,UnitPriceDiscount,CustomerID,SalesPersonID,order_date_key,due_date_key,ship_date_key,product_key
0,43659,1,2024.994,1,0.0,676,279.0,20010701,20010713,20010708,281
1,43659,3,2024.994,1,0.0,676,279.0,20010701,20010713,20010708,282


In [34]:
df_fact_sales = df_fact_sales.merge(df_dim_salesperson, on='SalesPersonID', how='left')
df_fact_sales = df_fact_sales.drop(columns=['SalesPersonID'])
df_fact_sales.head(2)

Unnamed: 0,SalesOrderID,OrderQty,UnitPrice,SpecialOfferID,UnitPriceDiscount,CustomerID,order_date_key,due_date_key,ship_date_key,product_key,salesperson_key
0,43659,1,2024.994,1,0.0,676,20010701,20010713,20010708,281,6.0
1,43659,3,2024.994,1,0.0,676,20010701,20010713,20010708,282,6.0


In [35]:
df_fact_sales = df_fact_sales.merge(df_dim_customers, on='CustomerID', how='left')
df_fact_sales = df_fact_sales.drop(columns=['CustomerID'])
df_fact_sales.head(2)

Unnamed: 0,SalesOrderID,OrderQty,UnitPrice,SpecialOfferID,UnitPriceDiscount,order_date_key,due_date_key,ship_date_key,product_key,salesperson_key,customer_key
0,43659,1,2024.994,1,0.0,20010701,20010713,20010708,281,6.0,676
1,43659,3,2024.994,1,0.0,20010701,20010713,20010708,282,6.0,676


In [37]:
#Adding a primary key for my fact table
df_fact_sales.insert(0, "fact_sales_key", range(1, df_fact_sales.shape[0]+1))
df_fact_sales.head(2)

Unnamed: 0,fact_sales_key,SalesOrderID,OrderQty,UnitPrice,SpecialOfferID,UnitPriceDiscount,order_date_key,due_date_key,ship_date_key,product_key,salesperson_key,customer_key
0,1,43659,1,2024.994,1,0.0,20010701,20010713,20010708,281,6.0,676
1,2,43659,3,2024.994,1,0.0,20010701,20010713,20010708,282,6.0,676


#### Write Dataframe back to the Database

In [39]:
set_dataframe(df_fact_sales, 'dim_fact_sales', 'fact_sales_key', 'insert', **mysql_args)

####  Demonstrate that the New Data Warehouse Exists and Contains the Correct Data

In [41]:
#This query can tell us what products are producing the most sales
sql_product_totals = """
SELECT 
    p.Name AS ProductName,
    SUM(fs.OrderQty * fs.UnitPrice) AS TotalSalesAmount
FROM 
    dim_fact_sales fs
JOIN 
    dim_products p ON fs.product_key = p.product_key
GROUP BY 
    p.Name
ORDER BY 
    TotalSalesAmount DESC;
"""
df_product_totals = get_sql_dataframe(sql_product_totals, **mysql_args)
df_product_totals.head(5)

Unnamed: 0,ProductName,TotalSalesAmount
0,"Mountain-200 Black, 38",4406151.0
1,"Mountain-200 Black, 42",4014068.0
2,"Mountain-200 Silver, 38",3696486.0
3,"Mountain-200 Silver, 42",3441293.0
4,"Mountain-200 Silver, 46",3436091.0


In [42]:
#This query can tell us which customers are purchasing the most
sql_customer_totals = """
SELECT 
    c.CustomerID,
    SUM(fs.OrderQty * fs.UnitPrice) AS TotalSalesAmount
FROM 
    dim_fact_sales fs
JOIN 
    dim_customers c ON fs.customer_key = c.customer_key
GROUP BY 
    c.CustomerID
ORDER BY 
    TotalSalesAmount DESC;
"""
df_customer_totals = get_sql_dataframe(sql_customer_totals, **mysql_args)
df_customer_totals.head(5)

Unnamed: 0,CustomerID,TotalSalesAmount
0,697,882276.4966
1,678,860147.511
2,170,853850.6395
3,328,817127.8029
4,514,803769.8509


In [43]:
#This query can tell us which salesperson is handling the most customers
sql_salesperson_sales = """
SELECT 
    sp.SalesPersonID,
    COUNT(DISTINCT f.customer_key) AS num_customers_handled
FROM 
    dim_fact_sales f
INNER JOIN 
    dim_salesperson sp ON f.salesperson_key = sp.salesperson_key
WHERE 
    f.customer_key IS NOT NULL
GROUP BY 
    sp.SalesPersonID
ORDER BY 
    num_customers_handled DESC;
"""
df_salesperson_sales = get_sql_dataframe(sql_salesperson_sales, **mysql_args)
df_salesperson_sales.head(10)

Unnamed: 0,SalesPersonID,num_customers_handled
0,277,121
1,275,118
2,279,74
3,276,69
4,282,67
5,285,62
6,268,44
7,278,38
8,281,35
9,286,34
