In [None]:
# Cell 1: Import Necessary Libraries
import datetime
import json
import os
from urllib.parse import quote_plus

import certifi
import numpy
import pandas as pd
import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [None]:
# Cell 2: Print Library Versions (Optional for Verification)
# Emits one "Running <library> Version: <x.y.z>" line per database driver,
# useful when diagnosing SQLAlchemy 1.x vs 2.x API differences.
print("\n".join(
    f"Running {label} Version: {lib.__version__}"
    for label, lib in (("SQL Alchemy", sqlalchemy), ("PyMongo", pymongo))
))

In [None]:
# Cell 3: Declare & Assign Connection Variables for MySQL and MongoDB
# Credentials and hosts may be supplied through environment variables so they
# don't have to live in the notebook; the literals below are local-dev fallbacks.
# TODO: remove the hardcoded password fallbacks before sharing this notebook.
# Database names are deliberately NOT read from the environment: Cell 5 drops
# and recreates mysql_args["dbname"], so it must never be redirected by accident.
mysql_args = {
    "uid": os.environ.get("MYSQL_USER", "root"),  # MySQL username
    "pwd": os.environ.get("MYSQL_PASSWORD", "new_password"),  # MySQL password
    "hostname": os.environ.get("MYSQL_HOST", "localhost"),  # MySQL host (optionally host:port)
    "dbname": "project1_dw",  # target data warehouse database name
}

mongodb_args = {
    "user_name": os.environ.get("MONGO_USER", "bob"),  # MongoDB username
    "password": os.environ.get("MONGO_PASSWORD", "bob"),  # MongoDB password
    "cluster_name": os.environ.get("MONGO_CLUSTER_NAME", "cluster0"),
    "cluster_subnet": os.environ.get("MONGO_CLUSTER_SUBNET", "nqygc0x"),
    "cluster_location": os.environ.get("MONGO_CLUSTER_LOCATION", "atlas"),  # "atlas" or "local"
    "db_name": "project1_data",  # MongoDB database name for source data
}

In [None]:
# Cell 4: Define Functions for Getting/Setting Data in MySQL and MongoDB
def get_sql_dataframe(sql_query, **args):
    '''Run a SQL query against MySQL and return the result as a pandas DataFrame.

    Parameters
    ----------
    sql_query : str
        SQL text to execute; it is wrapped in ``sqlalchemy.text()``.
    **args
        Connection settings; must contain 'uid', 'pwd', 'hostname' and 'dbname'.

    Returns
    -------
    pandas.DataFrame
        The query result.
    '''
    # Credentials are URL-escaped so characters such as '@' or ':' in a
    # password cannot corrupt the connection URL.
    conn_str = (
        f"mysql+pymysql://{quote_plus(str(args['uid']))}:{quote_plus(str(args['pwd']))}"
        f"@{args['hostname']}/{args['dbname']}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # The context manager closes the connection even if the query raises.
        with sqlEngine.connect() as connection:
            dframe = pd.read_sql(text(sql_query), connection)
    finally:
        # This engine only serves this call; release its connection pool.
        sqlEngine.dispose()

    return dframe

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Write a DataFrame to a MySQL table.

    Parameters
    ----------
    df : pandas.DataFrame
        Rows to write (the DataFrame index is not written).
    table_name : str
        Target table name.
    pk_column : str
        Column (or comma-separated column list) to make the PRIMARY KEY;
        only used when ``db_operation == "insert"``.
    db_operation : {"insert", "update"}
        "insert": (re)create the table from ``df`` (``if_exists='replace'``)
        and then add the primary key.
        "update": append ``df`` to the existing table.
    **args
        Connection settings; must contain 'uid', 'pwd', 'hostname' and 'dbname'.

    Raises
    ------
    ValueError
        If ``db_operation`` is not "insert" or "update". Previously such a
        call silently wrote nothing.
    '''
    if db_operation not in ("insert", "update"):
        raise ValueError(
            f"db_operation must be 'insert' or 'update', got {db_operation!r}"
        )

    # Credentials are URL-escaped so special characters cannot break the URL.
    conn_str = (
        f"mysql+pymysql://{quote_plus(str(args['uid']))}:{quote_plus(str(args['pwd']))}"
        f"@{args['hostname']}/{args['dbname']}"
    )
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    try:
        # engine.begin() commits on success and rolls back on error, so the
        # write does not depend on driver autocommit behaviour.
        with sqlEngine.begin() as connection:
            if db_operation == "insert":
                df.to_sql(table_name, con=connection, index=False, if_exists='replace')
                # Backtick-quote the table name, matching how to_sql treats it as one identifier.
                connection.execute(text(f"ALTER TABLE `{table_name}` ADD PRIMARY KEY ({pk_column});"))
            else:
                df.to_sql(table_name, con=connection, index=False, if_exists='append')
    finally:
        sqlEngine.dispose()

def get_mongo_client(**args):
    '''Create a pymongo.MongoClient for either a MongoDB Atlas cluster or a local server.

    Keyword arguments
    -----------------
    cluster_location : {"atlas", "local"}
        "local" connects to mongodb://localhost:27017/.
        "atlas" builds a mongodb+srv URI and additionally needs
        user_name, password, cluster_name and cluster_subnet.

    Returns
    -------
    pymongo.MongoClient

    Raises
    ------
    ValueError
        If cluster_location is neither "atlas" nor "local". ValueError is an
        Exception subclass, so existing ``except Exception`` handlers still work.
    '''
    location = args["cluster_location"]
    if location not in ("atlas", "local"):
        raise ValueError("You must specify either 'atlas' or 'local' for the cluster_location parameter.")

    if location == "local":
        return pymongo.MongoClient("mongodb://localhost:27017/")

    # PyMongo requires the username and password to be percent-escaped in the
    # URI; otherwise characters such as '@', ':' or '/' break parsing.
    connect_str = (
        f"mongodb+srv://{quote_plus(str(args['user_name']))}:{quote_plus(str(args['password']))}@"
        f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
    )
    # certifi supplies the CA bundle used to verify Atlas' TLS certificate.
    return pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())

def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query a MongoDB collection and return the matching documents as a DataFrame.

    Parameters
    ----------
    mongo_client : pymongo.MongoClient
        Open client. It is closed before this function returns, whether or
        not the query succeeds.
    db_name : str
        Database name.
    collection : str
        Collection name.
    query : dict
        Filter passed to ``find()``; ``{}`` selects every document.

    Returns
    -------
    pandas.DataFrame
        One row per document, without Mongo's ``_id`` column.
    '''
    try:
        documents = list(mongo_client[db_name][collection].find(query))
    finally:
        mongo_client.close()

    # errors='ignore': an empty result (or a projection excluding _id) has no
    # _id column, and dropping it must not raise KeyError.
    return pd.DataFrame(documents).drop(columns=['_id'], errors='ignore')

def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''(Re)load JSON files into MongoDB collections, one collection per file.

    Parameters
    ----------
    mongo_client : pymongo.MongoClient
        Open client. It is closed before this function returns, even on error.
    db_name : str
        Target database name.
    data_directory : str
        Directory containing the JSON files.
    json_files : dict[str, str]
        Maps collection name -> JSON file name, relative to ``data_directory``.

    Each collection is dropped and then refilled from its file. A JSON array is
    inserted with ``insert_many``; an empty array is skipped because
    ``insert_many`` rejects empty lists. A single top-level JSON object is
    inserted with ``insert_one``.
    '''
    try:
        db = mongo_client[db_name]
        for collection_name, file_name in json_files.items():
            db.drop_collection(collection_name)
            json_path = os.path.join(data_directory, file_name)
            # JSON is UTF-8 by specification; don't depend on the locale encoding.
            with open(json_path, 'r', encoding='utf-8') as json_file:
                json_object = json.load(json_file)

            if isinstance(json_object, dict):
                db[collection_name].insert_one(json_object)
            elif json_object:
                db[collection_name].insert_many(json_object)
    finally:
        mongo_client.close()

In [None]:
# Cell 5: (Re)create the Data Warehouse Database
# WARNING: destructive. Any existing database named mysql_args['dbname'] is
# DROPPED and recreated empty, so every run starts from a clean warehouse.
# The connection URL has no database part because the target DB may not exist yet.
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
try:
    # begin() commits on exit; the connection is closed even if a statement fails.
    with sqlEngine.begin() as connection:
        connection.execute(text(f"DROP DATABASE IF EXISTS `{mysql_args['dbname']}`;"))
        connection.execute(text(f"CREATE DATABASE `{mysql_args['dbname']}`;"))
        # No `USE` here: the connection closes immediately, so it would have no
        # effect. Later cells put the database name in their connection URL instead.
finally:
    sqlEngine.dispose()

In [None]:
# Cell 6: Load Customers Dimension from CSV File
# Source file: 'customers.csv' in the notebook's current working directory.
# The rename map and the dropped column are placeholders; adjust them to the
# real CSV header. A missing column is ignored by both steps.
df_customers = (
    pd.read_csv('customers.csv')
    .rename(columns={
        'LastName': 'last_name',
        'FirstName': 'first_name',
    })
    .drop(columns=['UnnecessaryColumn'], errors='ignore')
)

# Surrogate key: 1..N in file order, placed as the leading column.
df_customers.insert(0, 'customer_key', range(1, len(df_customers) + 1))

# Write to MySQL as dim_customers (table replaced) with customer_key as the primary key.
table_name, primary_key, db_operation = 'dim_customers', 'customer_key', 'insert'
set_dataframe(df_customers, table_name, primary_key, db_operation, **mysql_args)

# Verify
df_customers.head(2)

In [None]:
# Cell 7: Upload Products JSON to MongoDB
# Loads products.json from the notebook's current working directory (the same
# place Cell 6 reads customers.csv from) into the 'products' collection.
# Any existing collection is replaced. set_mongo_collections closes the client.
# The products dimension itself is built in Cell 8.
client = get_mongo_client(**mongodb_args)

# To keep source files in a 'data' subfolder instead, use
# os.path.join(os.getcwd(), 'data') here.
data_dir = os.getcwd()

json_files = {"products": 'products.json'}  # collection name -> JSON file name

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)

In [None]:
# Cell 8: Extract Products from MongoDB into DataFrame and Load to Dim Products
# Pulls every document from the 'products' collection. get_mongo_dataframe drops
# Mongo's _id and closes the client. The documents are then reshaped and loaded as dim_products.
client = get_mongo_client(**mongodb_args)

query = {}  # empty filter -> all documents, all fields
collection = "products"

# The rename map and the dropped field are placeholders; adjust them to the
# real JSON field names. A missing field is ignored by both steps.
df_products = (
    get_mongo_dataframe(client, mongodb_args["db_name"], collection, query)
    .rename(columns={
        'ProductName': 'product_name',
        'Category': 'category',
    })
    .drop(columns=['UnnecessaryField'], errors='ignore')
)

# Surrogate key: 1..N in retrieval order, placed as the leading column.
df_products.insert(0, 'product_key', range(1, len(df_products) + 1))

# Write to MySQL as dim_products (table replaced) with product_key as the primary key.
table_name, primary_key, db_operation = 'dim_products', 'product_key', 'insert'
set_dataframe(df_products, table_name, primary_key, db_operation, **mysql_args)

# Verify
df_products.head(2)

In [None]:
# Cell 9: Create and Populate the Date Dimension (dim_date)
# Rebuilds dim_date from scratch:
#   1. drops and creates the table,
#   2. (re)defines the PopulateDimDate stored procedure,
#   3. calls it to insert one row per calendar day in the requested range.
# After the rows are inserted, the procedure flags Dec 25 as a holiday
# ('Christmas') and Feb 14 as a special day.
print("Starting ETL for Date Dimension (Robust Method)...")
drop_table_sql = "DROP TABLE IF EXISTS `dim_date`"
create_table_sql = """
CREATE TABLE `dim_date` (
   `DateKey` INT NOT NULL, `Date` DATE NOT NULL, `Day` TINYINT NOT NULL, `DaySuffix` CHAR(2) NOT NULL,
   `Weekday` TINYINT NOT NULL, `WeekDayName` VARCHAR(10) NOT NULL, `WeekDayName_Short` CHAR(3) NOT NULL,
   `WeekDayName_FirstLetter` CHAR(1) NOT NULL, `DOWInMonth` TINYINT NOT NULL, `DayOfYear` SMALLINT NOT NULL,
   `WeekOfMonth` TINYINT NOT NULL, `WeekOfYear` TINYINT NOT NULL, `Month` TINYINT NOT NULL,
   `MonthName` VARCHAR(10) NOT NULL, `MonthName_Short` CHAR(3) NOT NULL, `MonthName_FirstLetter` CHAR(1) NOT NULL,
   `Quarter` TINYINT NOT NULL, `QuarterName` VARCHAR(6) NOT NULL, `Year` INT NOT NULL, `MMYYYY` CHAR(6) NOT NULL,
   `MonthYear` CHAR(8) NOT NULL, `IsWeekend` BIT NOT NULL, `IsHoliday` BIT NOT NULL,
   `HolidayName` VARCHAR(50) NULL, `SpecialDays` VARCHAR(50) NULL, `FirstDateofYear` DATE NULL,
   `LastDateofYear` DATE NULL, `FirstDateofMonth` DATE NULL, `LastDateofMonth` DATE NULL,
   PRIMARY KEY (`DateKey`)
)
"""
drop_procedure_sql = "DROP PROCEDURE IF EXISTS PopulateDimDate"
# DOWInMonth is the ordinal occurrence of this weekday within the month
# (e.g. 3 for the 3rd Tuesday): FLOOR((day - 1) / 7) + 1. Using DAYOFMONTH()
# here would merely duplicate the `Day` column.
# NOTE(review): WeekOfMonth uses the same 7-day-block formula, so both columns
# currently hold identical values. Switch WeekOfMonth to a calendar-week
# formula if Sunday/Monday-aligned weeks are wanted.
create_procedure_sql = """
CREATE PROCEDURE PopulateDimDate(IN StartDate DATE, IN EndDate DATE)
BEGIN
    DECLARE CurrentDate DATE;
    SET CurrentDate = StartDate;
    WHILE CurrentDate <= EndDate DO
        INSERT INTO `dim_date` (
            `DateKey`, `Date`, `Day`, `DaySuffix`, `Weekday`, `WeekDayName`, `WeekDayName_Short`, `WeekDayName_FirstLetter`,
            `DOWInMonth`, `DayOfYear`, `WeekOfMonth`, `WeekOfYear`, `Month`, `MonthName`, `MonthName_Short`, `MonthName_FirstLetter`,
            `Quarter`, `QuarterName`, `Year`, `MMYYYY`, `MonthYear`, `IsWeekend`, `IsHoliday`, `FirstDateofYear`, `LastDateofYear`,
            `FirstDateofMonth`, `LastDateofMonth`
        )
        SELECT
            DATE_FORMAT(CurrentDate, '%Y%m%d'), CurrentDate, DAY(CurrentDate),
            CASE WHEN DAY(CurrentDate) IN (1, 21, 31) THEN 'st' WHEN DAY(CurrentDate) IN (2, 22) THEN 'nd' WHEN DAY(CurrentDate) IN (3, 23) THEN 'rd' ELSE 'th' END,
            DAYOFWEEK(CurrentDate), DAYNAME(CurrentDate), UPPER(LEFT(DAYNAME(CurrentDate), 3)), LEFT(DAYNAME(CurrentDate), 1),
            FLOOR((DAYOFMONTH(CurrentDate) - 1) / 7) + 1, DAYOFYEAR(CurrentDate), FLOOR((DAYOFMONTH(CurrentDate) - 1) / 7) + 1, WEEKOFYEAR(CurrentDate),
            MONTH(CurrentDate), MONTHNAME(CurrentDate), UPPER(LEFT(MONTHNAME(CurrentDate), 3)), LEFT(MONTHNAME(CurrentDate), 1),
            QUARTER(CurrentDate),
            CASE QUARTER(CurrentDate) WHEN 1 THEN 'First' WHEN 2 THEN 'Second' WHEN 3 THEN 'Third' WHEN 4 THEN 'Fourth' END,
            YEAR(CurrentDate), DATE_FORMAT(CurrentDate, '%m%Y'), DATE_FORMAT(CurrentDate, '%Y-%b'),
            CASE WHEN DAYNAME(CurrentDate) IN ('Saturday', 'Sunday') THEN 1 ELSE 0 END,
            0, MAKEDATE(YEAR(CurrentDate), 1), STR_TO_DATE(CONCAT('12/31/', YEAR(CurrentDate)), '%m/%d/%Y'),
            DATE_FORMAT(CurrentDate, '%Y-%m-01'), LAST_DAY(CurrentDate);
        SET CurrentDate = DATE_ADD(CurrentDate, INTERVAL 1 DAY);
    END WHILE;
    UPDATE dim_date SET IsHoliday = 1, HolidayName = 'Christmas' WHERE Month = 12 AND Day = 25;
    UPDATE dim_date SET SpecialDays = 'Valentines Day' WHERE Month = 2 AND Day = 14;
END
"""
# The range must cover every OrderDate in the source data. Orders outside it
# find no DateKey during the fact load (Cell 11) and end up with date_key 0.
call_procedure_sql = "CALL PopulateDimDate('2000-01-01', '2030-12-31');"
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}/{mysql_args['dbname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)
try:
    with sqlEngine.connect() as connection:
        for step_label, statement in (
            ("DROP TABLE", drop_table_sql),
            ("CREATE TABLE", create_table_sql),
            ("DROP PROCEDURE", drop_procedure_sql),
            ("CREATE PROCEDURE", create_procedure_sql),
            ("CALL PROCEDURE", call_procedure_sql),
        ):
            print(f"Executing: {step_label}...")
            connection.execute(text(statement))
        # Commit the procedure's INSERT/UPDATE work; MySQL commits DDL implicitly.
        connection.commit()
finally:
    sqlEngine.dispose()
print("\nSuccessfully created and populated 'dim_date' table.")

# Verify by querying a few rows from dim_date
df_date = get_sql_dataframe("SELECT * FROM dim_date LIMIT 2;", **mysql_args)
df_date

In [None]:
# Cell 10: Verify Structure of fact_sales_orders_vw View in adventureworks
# Read-only sanity check of the source view before the fact load in Cell 11:
# prints its column description and five sample rows. It uses its own engine
# variable, so the shared `sqlEngine` name is not rebound to another database.
print("Verifying structure of fact_sales_orders_vw view in adventureworks...")
conn_str_adventureworks = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}/adventureworks"
sqlEngine_adventureworks = create_engine(conn_str_adventureworks, pool_recycle=3600)
try:
    with sqlEngine_adventureworks.connect() as connection:
        df_view_structure = pd.read_sql(text("DESCRIBE fact_sales_orders_vw;"), connection)
        print("Columns in fact_sales_orders_vw view:")
        print(df_view_structure)

        # Optional: view a sample of the data
        df_view_sample = pd.read_sql(text("SELECT * FROM fact_sales_orders_vw LIMIT 5;"), connection)
        print("\nSample data from fact_sales_orders_vw view:")
        print(df_view_sample)
finally:
    # Release the pool; this engine is only needed for this check.
    sqlEngine_adventureworks.dispose()

print("Verification complete.")

In [None]:
# Cell 11: Extract and Transform Data for Fact Table
# Builds df_fact by looking up warehouse surrogate keys for every row of
# adventureworks.fact_sales_orders_vw. Rows whose CustomerID, ProductID or
# OrderDate has no dimension match get key 0. No dimension row has key 0, so
# inner joins against the dimensions will exclude those rows.

# --- Extract from adventureworks.fact_sales_orders_vw ---
conn_str_adventureworks = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}/adventureworks"
sqlEngine_adventureworks = create_engine(conn_str_adventureworks, pool_recycle=3600)
try:
    with sqlEngine_adventureworks.connect() as connection:
        df_orders = pd.read_sql(text("SELECT * FROM fact_sales_orders_vw;"), connection)
finally:
    sqlEngine_adventureworks.dispose()
print("fact_sales_orders_vw columns and dtypes:")
print(df_orders.dtypes)
print("\nfact_sales_orders_vw sample (first 5 rows):")
print(df_orders.head())

# --- Surrogate-key lookup tables from the warehouse ---
# These use their own names so the full dimension frames built in earlier
# cells (df_customers, df_products, df_date) are not overwritten.
customer_keys = get_sql_dataframe("SELECT CustomerID, customer_key FROM dim_customers;", **mysql_args)
product_keys = get_sql_dataframe("SELECT ProductID, product_key FROM dim_products;", **mysql_args)
date_keys = get_sql_dataframe("SELECT DateKey, Date FROM dim_date WHERE Date >= '2000-01-01' AND Date <= '2030-12-31';", **mysql_args)

# Align the date join keys: both sides become datetime64[ns] at midnight.
date_keys['Date'] = pd.to_datetime(date_keys['Date'])
df_orders['OrderDate'] = pd.to_datetime(df_orders['OrderDate']).dt.normalize()

# --- Look up surrogate keys (left joins keep every order row) ---
# validate='many_to_one' fails fast if a natural key is duplicated in a
# dimension. Without it, duplicated keys would silently multiply fact rows.
df_fact = df_orders.merge(customer_keys, on='CustomerID', how='left', validate='many_to_one')
print("\nAfter customer merge:")
print(df_fact[['CustomerID', 'customer_key']].head())
df_fact = df_fact.merge(product_keys, on='ProductID', how='left', validate='many_to_one')
print("\nAfter product merge:")
print(df_fact[['ProductID', 'product_key']].head())
df_fact = df_fact.merge(date_keys, left_on='OrderDate', right_on='Date', how='left', validate='many_to_one')
print("\nAfter date merge:")
print(df_fact[['OrderDate', 'Date', 'DateKey']].head())

# Surface unmatched lookups before they are defaulted to 0 below.
for key_column in ('customer_key', 'product_key', 'DateKey'):
    n_unmatched = int(df_fact[key_column].isna().sum())
    if n_unmatched:
        print(f"WARNING: {n_unmatched} fact rows have no matching {key_column}; it will be set to 0.")

# --- Transform: keep the fact columns under warehouse names ---
# .copy() makes df_fact an independent frame, so the column inserts and
# assignments below don't write into a slice (SettingWithCopyWarning).
df_fact = (
    df_fact[['SalesOrderID', 'customer_key', 'product_key', 'DateKey', 'OrderQty', 'UnitPrice']]
    .copy()
    .rename(columns={
        'SalesOrderID': 'order_id',
        'DateKey': 'date_key',
        'OrderQty': 'quantity',
        'UnitPrice': 'unit_price',
    })
)
df_fact.insert(0, 'fact_sales_key', range(1, df_fact.shape[0] + 1))

# Unmatched keys -> 0, then cast to int (left merges may yield float keys when NaN is present).
for key_column in ('customer_key', 'product_key', 'date_key'):
    df_fact[key_column] = df_fact[key_column].fillna(0).astype(int)

# Derived measure: line amount.
df_fact['total_amount'] = df_fact['quantity'] * df_fact['unit_price']

print("\nFinal fact table data before load:")
print(df_fact.head())

In [None]:
# Cell 12: Load Fact Table into Data Warehouse
# Writes df_fact (built in Cell 11) to fact_sales, replacing any existing
# table, with fact_sales_key as its primary key. Then reads back a sample
# from MySQL to confirm the load.
table_name, primary_key, db_operation = 'fact_sales', 'fact_sales_key', 'insert'

set_dataframe(
    df_fact,
    table_name=table_name,
    pk_column=primary_key,
    db_operation=db_operation,
    **mysql_args,
)
print(f"Fact table '{table_name}' loaded successfully.")

# Read back the first 10 rows as written by MySQL.
df_fact_result = get_sql_dataframe(f"SELECT * FROM {table_name} LIMIT 10;", **mysql_args)
print("\nVerification of fact_sales after reload:")
print(df_fact_result)

In [None]:
# Cell 13: Demonstrate SQL Queries
# Two sample star-schema aggregations over the data mart.
# NOTE: both queries INNER JOIN the dimensions, so fact rows whose surrogate
# key defaulted to 0 in the fact load (no matching dimension row) are excluded
# from the totals.
print("Executing SQL queries to demonstrate data mart functionality...")

# Query 1: top-5 customers by total sales amount, with their AccountNumber.
sql_query1 = """
SELECT 
    c.customer_key, c.AccountNumber, 
    SUM(f.quantity) AS total_quantity, 
    SUM(f.total_amount) AS total_sales_amount
FROM fact_sales f
JOIN dim_customers c ON f.customer_key = c.customer_key
JOIN dim_date d ON f.date_key = d.DateKey
GROUP BY c.customer_key, c.AccountNumber
ORDER BY total_sales_amount DESC
LIMIT 5;
"""

# Query 2: top-5 products by total quantity sold, with their ProductCategory.
sql_query2 = """
SELECT 
    p.product_key, p.ProductCategory, 
    SUM(f.quantity) AS total_quantity
FROM fact_sales f
JOIN dim_products p ON f.product_key = p.product_key
JOIN dim_date d ON f.date_key = d.DateKey
GROUP BY p.product_key, p.ProductCategory
ORDER BY total_quantity DESC
LIMIT 5;
"""

# Run each query and print its result before moving on to the next one.
df_query1 = get_sql_dataframe(sql_query1, **mysql_args)
print("\nQuery 1 Result: Total sales per customer with AccountNumber (top 5):")
print(df_query1)

df_query2 = get_sql_dataframe(sql_query2, **mysql_args)
print("\nQuery 2 Result: Total quantity per product with category (top 5):")
print(df_query2)

print("SQL queries executed successfully.")