# Project Overview

Dimensional data mart that represents a simple business involving customers, the products, and the vendors.

In [169]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server

In [170]:

mysql_args = {
    "uid" : "root",
    "pwd" : "Ashwaniis#1!",
    "hostname" : "localhost",
    "dbname" : "adventureworks"
}

mongodb_args = {
    "user_name" : "vaneeshagupta10",
    "password" : "Fdztq26kWFlyBXiE",
    "cluster_name" : "cluster0",
    "cluster_subnet" : "koqso",
    "cluster_location" : "atlas",
    "db_name" : "adventureworks_mongo"
}

#### Define Functions for Getting Data From and Setting Data Into Databases

In [164]:
import pandas as pd
from sqlalchemy import create_engine, text
import pymongo
import certifi
import json
import os

def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database and return a DataFrame'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    # Using context manager (with statement) to automatically manage connection
    with sqlEngine.connect() as connection:
        # Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame
        dframe = pd.read_sql(text(sql_query), connection)
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database and insert/update data'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    
    # Using context manager (with statement) to automatically manage connection
    with sqlEngine.connect() as connection:
        '''Invoke the Pandas DataFrame .to_sql() function to either create, or append to, a table'''
        if db_operation == "insert":
            df.to_sql(table_name, con=connection, index=False, if_exists='replace')
            connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
        elif db_operation == "update":
            df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    # No need to manually close the connection as it's handled by the context manager

def get_mongo_client(**args):
    '''Validate proper input and return a MongoClient instance'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    if args["cluster_location"] == "atlas":
        connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
        connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
        client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
        
    elif args["cluster_location"] == "local":
        client = pymongo.MongoClient("mongodb://localhost:27017/")
    
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB and return the results as a DataFrame'''
    db = mongo_client[db_name]
    
    # Query MongoDB and convert the results into a DataFrame
    dframe = pd.DataFrame(list(db[collection].find(query)))
    
    # Drop the '_id' column as it's not needed for the DataFrame
    if '_id' in dframe.columns:
        dframe.drop(['_id'], axis=1, inplace=True)
    
    # Return the DataFrame
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Insert data into MongoDB collections from JSON files'''
    db = mongo_client[db_name]
    
    for file in json_files:
        # Drop the collection if it exists
        db.drop_collection(file)
        
        json_file = os.path.join(data_directory, json_files[file])
        
        # Load data from the JSON file
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            
            # Insert the data into the collection
            file_collection = db[file]
            result = file_collection.insert_many(json_object)
    
    # MongoClient remains open; it's not closed at the end of this function


#### Create the New Data Warehouse database, and to Use it, Switch the Connection Context.

In [165]:
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
connection = sqlEngine.connect()

dst_dbname = 'data_mart'

connection.execute(text(f"DROP DATABASE IF EXISTS `{dst_dbname}`;"))
connection.execute(text(f"CREATE DATABASE `{dst_dbname}`;"))
connection.execute(text(f"USE {dst_dbname};"))

connection.close()

#### Data Extraction

In [166]:
# MySQL Data Extraction (product)
sql_product = "SELECT ProductID, Name, ProductNumber, ListPrice FROM product;"
df_product = get_sql_dataframe(sql_product, **mysql_args)
df_product.head()


Unnamed: 0,ProductID,Name,ProductNumber,ListPrice
0,1,Adjustable Race,AR-5381,0.0
1,2,Bearing Ball,BA-8327,0.0
2,3,BB Ball Bearing,BE-2349,0.0
3,4,Headset Ball Bearings,BE-2908,0.0
4,316,Blade,BL-2036,0.0


In [None]:
client = get_mongo_client(**mongodb_args)

# Gets the path of the Current Working Directory for this Notebook,
# and then Appends the 'data' directory.
data_dir = os.path.join(os.getcwd(), 'data')

# Add customers.json to the dictionary
json_files = {
    "customers": "customer.json"
}

# Populate MongoDB
set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)


In [None]:
# MongoDB Data Extraction (customer)

client = get_mongo_client(**mongodb_args)

query = {} # Select all elements (columns), and all documents (rows).
collection = "customers"

df_customers = get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
df_customers.head()

Unnamed: 0,CustomerID,TerritoryID,AccountNumber,CustomerType,rowguid,ModifiedDate
0,1,1,AW00000001,S,,2004-10-13 11:15:07
1,2,1,AW00000002,S,,2004-10-13 11:15:07
2,3,4,AW00000003,S,,2004-10-13 11:15:07
3,4,4,AW00000004,S,,2004-10-13 11:15:07
4,5,4,AW00000005,S,,2004-10-13 11:15:07


In [183]:
# Local file data extraction (vendor)

# Define the path to the CSV file
data_dir = os.path.join(os.getcwd(), 'data')
csv_file_path = os.path.join(data_dir, 'vendor.csv')

# Read CSV into DataFrame
df_vendor = pd.read_csv(csv_file_path)

# Display first few rows
df_vendor.head()

Unnamed: 0,VendorID,AccountNumber,Name,CreditRating,PreferredVendorStatus,ActiveFlag,PurchasingWebServiceURL,ModifiedDate
0,1,INTERNAT0001,International,1,1,1,,2002-02-25 00:00:00
1,2,ELECTRON0002,Electronic Bike Repair & Supplies,1,1,1,,2002-02-17 00:00:00
2,3,PREMIER0001,"Premier Sport, Inc.",1,1,1,,2002-03-05 00:00:00
3,4,COMFORT0001,Comfort Road Bicycles,1,1,1,,2002-01-24 00:00:00
4,5,METROSP0001,Metro Sport Equipment,1,1,1,,2002-03-01 00:00:00


In [None]:
# Extracting date dimension table
sql_dim_date = "SELECT date_key, full_date FROM data_mart.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head()

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02
2,20000103,2000-01-03
3,20000104,2000-01-04
4,20000105,2000-01-05


#### Fact Table

In [None]:
# Extracting SalesOrderHeader (fact table)
sql_sales_order_header = "SELECT SalesOrderID, CustomerID, OrderDate, TotalDue FROM adventureworks.SalesOrderHeader;"
df_sales_order_header = get_sql_dataframe(sql_sales_order_header, **mysql_args)

# Preview the first 2 rows to verify the data
df_sales_order_header.head(2)



Unnamed: 0,SalesOrderID,CustomerID,OrderDate,TotalDue
0,43659,676,2001-07-01,27231.5495
1,43660,117,2001-07-01,1716.1794


#### Looking up Primary Keys and Merging Fact Table with the Dimension Tables

In [None]:
# Extracting SalesOrderDetail to get the ProductID
df_sales_order_detail = get_sql_dataframe("SELECT SalesOrderID, ProductID, OrderQty, LineTotal FROM adventureworks.SalesOrderDetail;", **mysql_args)
df_fact_orders = pd.merge(df_sales_order_header, df_sales_order_detail, on='SalesOrderID', how='left')

# Check the merged result
df_fact_orders.head(2)




Unnamed: 0,SalesOrderID,CustomerID,OrderDate,TotalDue,ProductID,OrderQty,LineTotal
0,43659,676,2001-07-01,27231.5495,776,1,2024.994
1,43659,676,2001-07-01,27231.5495,777,3,6074.982


In [None]:
# Rename for consistency
df_fact_orders.rename(columns={"CustomerID": "customer_id"}, inplace=True)
df_customers.rename(columns={"CustomerID": "customer_id"}, inplace=True)

# Merge the SalesOrderHeader (fact table) with the Customer dimension
df_sales_order_header = pd.merge(df_fact_orders, df_customers, on='customer_id', how='left')

# Check the result
df_sales_order_header.head(2)





Unnamed: 0,SalesOrderID,customer_id,OrderDate,TotalDue,ProductID,OrderQty,LineTotal,TerritoryID,AccountNumber,CustomerType,rowguid,ModifiedDate
0,43659,676,2001-07-01,27231.5495,776,1,2024.994,5.0,AW00000676,S,,2004-10-13 11:15:07
1,43659,676,2001-07-01,27231.5495,777,3,6074.982,5.0,AW00000676,S,,2004-10-13 11:15:07


In [None]:
# Convert 'OrderDate' and 'full_date' to datetime64[ns] for both DataFrames
df_fact_orders['OrderDate'] = pd.to_datetime(df_fact_orders['OrderDate'])
df_dim_date['full_date'] = pd.to_datetime(df_dim_date['full_date'])

# Merge the fact orders DataFrame with the Date dimension
df_fact_orders = pd.merge(df_fact_orders, df_dim_date, left_on='OrderDate', right_on='full_date', how='left')

# Drop 'full_date' column (since redundant)
df_fact_orders.drop(['full_date'], axis=1, inplace=True)

# Check the result
df_fact_orders.head(2)




Unnamed: 0,SalesOrderID,customer_id,OrderDate,TotalDue,ProductID,OrderQty,LineTotal,date_key
0,43659,676,2001-07-01,27231.5495,776,1,2024.994,20010701
1,43659,676,2001-07-01,27231.5495,777,3,6074.982,20010701


In [None]:
# Define the SQL query to retrieve ProductID and VendorID from the productvendor table
sql_product_vendor = "SELECT ProductID, VendorID FROM productvendor;"

# Execute the SQL query to fetch the data into a DataFrame
df_product_vendor = get_sql_dataframe(sql_product_vendor, **mysql_args)

# Merge df_fact_orders with df_product_vendor on 'ProductID' to add the 'VendorID' column
df_fact_orders = pd.merge(df_fact_orders, df_product_vendor, on='ProductID', how='left')

# Drop redundant 'VendorID_x' and 'VendorID_y' columns after the merge and keep the 'VendorID_y' from productvendor (rename it to 'VendorID')
df_fact_orders['VendorID'] = df_fact_orders['VendorID_y']

# Drop the 'VendorID_y' column, as it's no longer needed
df_fact_orders.drop(['VendorID_x', 'VendorID_y'], axis=1, inplace=True)

# Check the final DataFrame
df_fact_orders.head()



Unnamed: 0,SalesOrderID,customer_id,OrderDate,TotalDue,ProductID,OrderQty,LineTotal,date_key,VendorID
0,43659,676,2001-07-01,27231.5495,776,1,2024.994,20010701,
1,43659,676,2001-07-01,27231.5495,777,3,6074.982,20010701,
2,43659,676,2001-07-01,27231.5495,778,1,2024.994,20010701,
3,43659,676,2001-07-01,27231.5495,771,1,2039.994,20010701,
4,43659,676,2001-07-01,27231.5495,772,1,2039.994,20010701,


#### More Transformations

In [185]:
# Define the columns to keep for each dimension table
dim_customer_cols = ['customer_id', 'CustomerType', 'AccountNumber']
dim_vendor_cols = ['VendorID','Name']
dim_product_cols = ['ProductID', 'Name', 'ProductNumber', 'ListPrice']

# Drop unnecessary columns
df_dim_customer = df_customers[dim_customer_cols]
df_dim_vendor = df_vendor[dim_vendor_cols]
df_dim_product = df_product[dim_product_cols]

# Display first few rows to verify
df_dim_customer.head(), df_dim_vendor.head(), df_dim_product.head()

(   customer_id CustomerType AccountNumber
 0            1            S    AW00000001
 1            2            S    AW00000002
 2            3            S    AW00000003
 3            4            S    AW00000004
 4            5            S    AW00000005,
    VendorID                               Name
 0         1                      International
 1         2  Electronic Bike Repair & Supplies
 2         3                Premier Sport, Inc.
 3         4              Comfort Road Bicycles
 4         5              Metro Sport Equipment,
    ProductID                   Name ProductNumber  ListPrice
 0          1        Adjustable Race       AR-5381        0.0
 1          2           Bearing Ball       BA-8327        0.0
 2          3        BB Ball Bearing       BE-2349        0.0
 3          4  Headset Ball Bearings       BE-2908        0.0
 4        316                  Blade       BL-2036        0.0)

In [189]:
# Standardizing ID column names
df_fact_orders.rename(columns={
    "SalesOrderID": "sales_order_id",
    "CustomerID": "customer_id",
    "ProductID": "product_id",
    "VendorID": "vendor_id",
    "OrderDate": "order_date",
    "OrderQty": "order_qty",
    "LineTotal": "line_total",
    "TotalDue": "total_due",
    "date_key": "order_date_key"
}, inplace=True)

# Rordering the columns
column_order = ['sales_order_id', 'customer_id', 'order_date', 'order_date_key', 'product_id', 
                'vendor_id', 'order_qty', 'line_total', 'total_due']
df_fact_orders = df_fact_orders[column_order]


df_fact_orders


Unnamed: 0,sales_order_id,customer_id,order_date,order_date_key,product_id,vendor_id,order_qty,line_total,total_due
0,43659,676,2001-07-01,20010701,776,0.0,1,2024.9940,27231.5495
12,43660,117,2001-07-01,20010701,762,0.0,1,419.4589,1716.1794
14,43661,442,2001-07-01,20010701,745,0.0,1,809.7600,43561.4424
29,43662,227,2001-07-01,20010701,764,0.0,3,1258.3767,38331.9613
51,43663,510,2001-07-01,20010701,760,0.0,1,419.4589,556.2026
...,...,...,...,...,...,...,...,...,...
684964,75119,11981,2004-07-31,20040731,921,98.0,1,4.9900,46.7194
685030,75120,18749,2004-07-31,20040731,878,0.0,1,21.9800,93.8808
685033,75121,15251,2004-07-31,20040731,921,98.0,1,4.9900,82.8529
685099,75122,15868,2004-07-31,20040731,878,0.0,1,21.9800,34.2219


#### Write the DataFrames Back to the Database

In [190]:
table_name = "fact_orders"
primary_key = "sales_order_id"
db_operation = "insert"

# Modify the database to point to the destination database
mysql_args["dbname"] = "data_mart"

# Handle missing data
df_fact_orders['vendor_id'].fillna(0, inplace=True)  # Replace NaN with 0
df_fact_orders['order_qty'].fillna(0, inplace=True)  # Replace NaN with 0

# Remove duplicates based on the primary key column
df_fact_orders = df_fact_orders.drop_duplicates(subset='sales_order_id', keep='first')

# Ensure that the column types are correct
df_fact_orders['order_qty'] = df_fact_orders['order_qty'].astype(int)

# Call the set_dataframe function to insert the data into the correct table in 'data_mart'
set_dataframe(df_fact_orders, table_name, primary_key, db_operation, **mysql_args)



The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_fact_orders['vendor_id'].fillna(0, inplace=True)  # Replace NaN with 0
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_fact_orders['order_qty'].fillna(0, inplace=True)  # Replace NaN with 0


In [192]:
set_dataframe(df_dim_customer, table_name="dim_customer", pk_column="customer_id", db_operation="insert", **mysql_args)
set_dataframe(df_dim_vendor, table_name="dim_vendor", pk_column="VendorID", db_operation="insert", **mysql_args)
set_dataframe(df_dim_product, table_name="dim_product", pk_column="ProductID", db_operation="insert", **mysql_args)

#### SQL Queries

In [207]:
sql_sales = """
SELECT
    c.customer_id,
    d.full_date AS OrderDate,
    SUM(fo.total_due) AS TotalSalesAmount,
    AVG(fo.order_qty) AS AvgOrderQuantity,
    COUNT(fo.sales_order_id) AS TotalOrders
FROM
    fact_orders fo
JOIN
    dim_customer c ON fo.customer_id = c.customer_id
JOIN
    dim_date d ON fo.order_date_key = d.date_key
GROUP BY
    c.customer_id, d.full_date
ORDER BY
    d.full_date;


"""

# Execute the query
df_sales = get_sql_dataframe(sql_sales, **mysql_args)
df_sales


Unnamed: 0,customer_id,OrderDate,TotalSalesAmount,AvgOrderQuantity,TotalOrders
0,676,2001-07-01,27231.5495,1.0,1
1,117,2001-07-01,1716.1794,1.0,1
2,442,2001-07-01,43561.4424,1.0,1
3,227,2001-07-01,38331.9613,3.0,1
4,510,2001-07-01,556.2026,1.0,1
...,...,...,...,...,...
4821,11118,2004-07-29,8.7848,1.0,1
4822,11185,2004-07-29,87.8144,1.0,1
4823,11176,2004-07-30,27.6140,1.0,1
4824,11078,2004-07-31,132.6000,1.0,1
