# DS-2002 Data Project 1: ETL Data Pipeline

### **Project Overview**
This project builds a **dimensional data mart** to analyze **retail sales transactions**.  
It integrates **structured (MySQL), semi-structured (MongoDB), and file-based (CSV) data** and transforms them for **OLAP (Online Analytical Processing) analysis**.

### **Business Process Chosen: Retail Sales**
The data mart is designed for a **retail business**, tracking **sales transactions** with information on **customers, products, and purchase dates**.

### **ETL Pipeline**
1. **Extract** data from:
   - MySQL (`AdventureWorks` retail database)
   - MongoDB (`customer_data` NoSQL database)
   - CSV file (`products.csv` from local file system)
2. **Transform** the data (cleaning, renaming, standardizing).
3. **Load** the processed data into `AdventureWorks_DW` (data warehouse).

### **OLTP vs. OLAP**
- **OLTP (Transactional Data)** → Stored in `AdventureWorks` MySQL.
- **OLAP (Aggregated Data for Analysis)** → Processed & loaded into `AdventureWorks_DW`.

### Documentation
- **Extract**: Data is sourced from MySQL (AdventureWorks), MongoDB (customer_data), and a CSV file (products.csv).
- **Transform**: Data is cleaned (removing nulls, standardizing column names) and mapped to the dimensional schema.
- **Load**: Data is loaded into `AdventureWorks_DW` with tables `dim_customers`, `dim_products`, `dim_date`, and `fact_sales`.
- **Deployment**: MySQL and MongoDB are hosted locally. All code is in this notebook, and data files are included in the GitHub repo.
- **Analysis**: SQL queries provide insights into customer spending and product category performance over time.

---

In [1]:
# importing all the necessary libraries 

import os
import re
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo 
from pymongo import MongoClient
import sqlalchemy
from sqlalchemy import create_engine, text

In [2]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")


Running SQL Alchemy Version: 2.0.38
Running PyMongo Version: 4.11.2


#### Declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases

In [3]:
# declare & Assign Connection Variables for the MongoDB Server, the MySQL Server & Databases
mysql_args = {
    "uid": "root",
    "pwd": "Newyear2024!",
    "hostname": "localhost",
    "port": "3306",
    "src_dbname": "adventureworks",
    "dst_dbname": "adventureworks_dw",
}

mongodb_args = {
    "cluster_location": "local",  # Using local MongoDB
    "db_name": "northwind_purchasing"
}

#### Define Functions for Getting Data From and Setting Data Into Databases

In [4]:
# helper function from lab 3 and 4
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL source database and return a Pandas DataFrame.'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}:{args['port']}/{args['src_dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    # Execute Query and Fetch Data
    dframe = pd.read_sql(text(sql_query), connection)
    connection.close()
    
    return dframe


def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Insert or update data in the MySQL destination database using Pandas DataFrame.'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}:{args['port']}/{args['dst_dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate MongoDB connection and return a client instance.'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = MongoClient(connect_str)
            
        elif args["cluster_location"] == "local":
            client = MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query={}):
    '''Query MongoDB, retrieve documents, and return a Pandas DataFrame.'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    
    # Drop MongoDB's default `_id` column
    if "_id" in dframe.columns:
        dframe.drop(["_id"], axis=1, inplace=True)
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    '''Load JSON data from a directory and insert into MongoDB collections.'''
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)  # Drop existing collection to avoid duplicates
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            file.insert_many(json_object)
        
    #mongo_client.close()
    


In [5]:
mongo_client = get_mongo_client(**mongodb_args)
df_products = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], "products", {})
mongo_client.close() 

In [6]:
# tests adventureworks_dw exists before testing connection
engine = create_engine(f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}:{mysql_args['port']}")
with engine.connect() as connection:
    connection.execute(text("CREATE DATABASE IF NOT EXISTS adventureworks_dw"))
print(" 'adventureworks_dw' created successfully!")

 'adventureworks_dw' created successfully!


#### Test MongoDB Connection

In [7]:
# Test MySQL Connections for Source and Destination Databases
def test_mysql_connection(db_name, db_type="source"):
    """Test MySQL connection for a given database."""
    try:
        conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}:{mysql_args['port']}/{db_name}"
        with create_engine(conn_str).connect() as connection:
            print(f"Successfully connected to MySQL {db_type} database: {db_name}")
    except Exception as e:
        print(f"Error connecting to MySQL {db_type} database ({db_name}):", e)

# Test Source Database
test_mysql_connection(mysql_args['src_dbname'], db_type="source")

# Test Destination Database
test_mysql_connection(mysql_args['dst_dbname'], db_type="destination")


# Step 2: Ensure the Destination Database Exists and Use It
# Correct MySQL connection string with port included
conn_str = f"mysql+pymysql://{mysql_args['uid']}:{mysql_args['pwd']}@{mysql_args['hostname']}:{mysql_args['port']}/{mysql_args['src_dbname']}"
sqlEngine = create_engine(conn_str, pool_recycle=3600)

# Connect to MySQL and ensure the destination database exists
with sqlEngine.connect() as connection:
    # Create the database if it doesn't exist, then use it
    connection.execute(text(f"CREATE DATABASE IF NOT EXISTS `{mysql_args['dst_dbname']}`;"))
    connection.execute(text(f"USE `{mysql_args['dst_dbname']}`;"))

print("Database and schema setup completed successfully!")

# Step 3: Test MongoDB Connection
def test_mongo_connection(db_name):
    """Test MongoDB connection."""
    try:
        client = MongoClient("mongodb://localhost:27017/")  # Adjust if using a different URI
        db = client[db_name]
        db.command("ping")  # Ping MongoDB server
        print(f"Successfully connected to MongoDB database: {db_name}")
        client.close()
    except Exception as e:
        print(f"Error connecting to MongoDB database ({db_name}):", e)

# Test MongoDB Connection
test_mongo_connection(mongodb_args["db_name"])


Successfully connected to MySQL source database: adventureworks
Successfully connected to MySQL destination database: adventureworks_dw
Database and schema setup completed successfully!
Successfully connected to MongoDB database: northwind_purchasing


### Create and Populate Dimension tables

#### extract data

In [8]:
# SQL query to extract date_key and full_date from adventureworks_dw
sql_query = """
    SELECT date_key, full_date
    FROM adventureworks_dw.dim_date;
"""

# Extract data using the get_sql_dataframe function
df_dim_date = get_sql_dataframe(sql_query, **mysql_args)

# Convert 'full_date' to datetime with proper error handling and create a new column for the cleaned date
df_dim_date['cleaned_date'] = pd.to_datetime(df_dim_date['full_date'], errors='coerce').dt.date

# Display the first few rows for validation (showing both original and cleaned dates)
df_dim_date[['date_key', 'full_date', 'cleaned_date']].head(2)


Unnamed: 0,date_key,full_date,cleaned_date
0,20000101,2000-01-01,2000-01-01
1,20000102,2000-01-02,2000-01-02


In [9]:
# SQL query to extract customer data from the view
sql_customer_query = "SELECT * FROM dim_customers_vw;"

# Fetch the customer data using the get_sql_dataframe function
df_customer = get_sql_dataframe(sql_customer_query, **mysql_args)

# Check the columns of the dataframe
print("Columns in df_customer:", df_customer.columns)




Columns in df_customer: Index(['CustomerID', 'AccountNumber', 'CustomerType', 'AddressType',
       'AddressLine1', 'AddressLine2', 'City', 'StateProvinceCode',
       'State_Province', 'IsOnlyStateProvinceFlag', 'PostalCode',
       'CountryRegionCode', 'Country_Region', 'Sales Territory Group',
       'Sales Territory'],
      dtype='object')


#### Transformation

In [10]:
# transformation
df_customer.dropna(inplace=True)  

# Drop unnecessary columns (update based on actual columns in df_customer)
# Since columns like 'email_address', 'home_phone', etc., do not exist in df_customer,
# you can choose to drop any other columns you don't need.
drop_cols_customer = ['accountnumber', 'salesterritorygroup', 'salesterritory']  # Just an example
existing_cols_to_drop = [col for col in drop_cols_customer if col in df_customer.columns]
df_customer.drop(existing_cols_to_drop, axis=1, inplace=True)

# Rename columns (e.g., 'CustomerID' -> 'customer_id')
df_customer.rename(columns={"customerid": "customer_id"}, inplace=True)

# Insert surrogate keys (add a unique key for each row)
df_customer.insert(0, "customer_key", range(1, df_customer.shape[0] + 1))

# Display the first few rows to verify the transformation
df_customer.head(2)

Unnamed: 0,customer_key,CustomerID,AccountNumber,CustomerType,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,IsOnlyStateProvinceFlag,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
4,1,4,AW00000004,S,Main Office,800 Interchange Blvd.,Suite 2501,Austin,TX,Texas,b'\x00',78701,US,United States,North America,Southwest
36,2,34,AW00000034,S,Main Office,"Science Park South, Birchwood",Stanford House,Warrington,ENG,England,b'\x01',WA3 7BH,GB,United Kingdom,Europe,United Kingdom


In [11]:
# SQL query to extract product data from the view
sql_product_query = "SELECT * FROM dim_products_vw;"

# Fetch the product data using the get_sql_dataframe function
df_product = get_sql_dataframe(sql_product_query, **mysql_args)



In [12]:
# transformation:
# Check the columns of the dataframe to verify what we have
print("Columns in df_product:", df_product.columns)

# Drop unnecessary columns from the product dimension (update with actual columns to drop)
# Adjusted based on available columns in the view
drop_cols_product = ['ProductNumber', 'Style', 'ProductSubcategory']  # Example: Modify as needed

# Drop columns only if they exist in the DataFrame
for col in drop_cols_product:
    if col in df_product.columns:
        df_product.drop(col, axis=1, inplace=True)
    else:
        print(f"Column '{col}' not found in the DataFrame.")

# Rename the "ProductID" column to reflect the entity (product) and use it as the business key
df_product.rename(columns={"ProductID": "product_id"}, inplace=True)

# Insert a new column with an ever-incrementing numeric value for the surrogate primary key
df_product.insert(0, "product_key", range(1, df_product.shape[0] + 1))

# Display the first 2 rows to validate
df_product.head(2)

Columns in df_product: Index(['ProductID', 'Name', 'ProductNumber', 'MakeFlag', 'FinishedGoodsFlag',
       'Color', 'SafetyStockLevel', 'ReorderPoint', 'StandardCost',
       'ListPrice', 'Size', 'SizeUnitMeasureCode', 'WeightUnitMeasureCode',
       'Weight', 'DaysToManufacture', 'ProductLine', 'Class', 'Style',
       'ProductCategory', 'ProductSubcategory', 'ProductModel',
       'SellStartDate', 'SellEndDate', 'DiscontinuedDate'],
      dtype='object')


Unnamed: 0,product_key,product_id,Name,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,ProductCategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,1,Adjustable Race,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,0,,,,,1998-06-01,NaT,
1,2,2,Bearing Ball,b'\x00',b'\x00',,1000,750,0.0,0.0,...,,,0,,,,,1998-06-01,NaT,


In [13]:
# SQL query to extract employee data from the view
sql_employee_query = "SELECT * FROM dim_employee_vw;"

# Fetch the employee data using the get_sql_dataframe function
df_employee = get_sql_dataframe(sql_employee_query, **mysql_args)




In [14]:
# transformation:
# Check the columns of the dataframe to verify what we have
print("Columns in df_employee:", df_employee.columns)

# Drop unnecessary columns from the employee dimension (update with actual columns)
# For example, drop 'EmailAddress' and 'Phone' (adjust according to what you want to drop)
drop_cols_employee = ['EmailAddress', 'Phone']  # Update as necessary

# Drop columns only if they exist in the DataFrame
for col in drop_cols_employee:
    if col in df_employee.columns:
        df_employee.drop(col, axis=1, inplace=True)
    else:
        print(f"Column '{col}' not found in the DataFrame.")

# Rename the "EmployeeID" column to reflect the entity (employee) and use it as the business key
df_employee.rename(columns={"EmployeeID": "employee_id"}, inplace=True)

# Insert a new column with an ever-incrementing numeric value for the surrogate primary key
df_employee.insert(0, "employee_key", range(1, df_employee.shape[0] + 1))

# Display the first 2 rows to validate
df_employee.head(2)

Columns in df_employee: Index(['EmployeeID', 'NationalIDNumber', 'LoginID', 'ManagerID', 'FirstName',
       'MiddleName', 'LastName', 'Title', 'EmailAddress', 'EmailPromotion',
       'Phone', 'BirthDate', 'MaritalStatus', 'Gender', 'HireDate',
       'SalariedFlag', 'VacationHours', 'SickLeaveHours', 'CurrentFlag'],
      dtype='object')


Unnamed: 0,employee_key,employee_id,NationalIDNumber,LoginID,ManagerID,FirstName,MiddleName,LastName,Title,EmailPromotion,BirthDate,MaritalStatus,Gender,HireDate,SalariedFlag,VacationHours,SickLeaveHours,CurrentFlag
0,1,1,14417807,adventure-works\guy1,16.0,Guy,R,Gilbert,Production Technician - WC60,0,1972-05-15,M,M,1996-07-31,b'\x00',21,30,b'\x01'
1,2,2,253022876,adventure-works\kevin0,6.0,Kevin,F,Brown,Marketing Assistant,2,1977-06-03,S,M,1997-02-26,b'\x00',42,41,b'\x01'


In [15]:
# SQL query to extract vendor data from the view
sql_vendor_query = "SELECT * FROM dim_vendors_vw;"

# Fetch the vendor data using the get_sql_dataframe function
df_vendor = get_sql_dataframe(sql_vendor_query, **mysql_args)




In [16]:
# Check the columns of the dataframe to verify what we have
print("Columns in df_vendor:", df_vendor.columns)

# Drop unnecessary columns from the vendor dimension (update based on available columns)
# Since the columns like 'email_address', 'phone', 'fax_number', 'notes', 'attachments' are not available,
# let's remove the relevant ones like 'AccountNumber' or 'AddressLine2' that may not be needed in your analysis.
drop_cols_vendor = ['AccountNumber', 'AddressLine2']  # Adjust as needed

# Drop columns only if they exist in the DataFrame
for col in drop_cols_vendor:
    if col in df_vendor.columns:
        df_vendor.drop(col, axis=1, inplace=True)
    else:
        print(f"Column '{col}' not found in the DataFrame.")

# Rename the "VendorID" column to reflect the entity (vendor) and use it as the business key
df_vendor.rename(columns={"VendorID": "vendor_id"}, inplace=True)

# Insert a new column with an ever-incrementing numeric value for the surrogate primary key
df_vendor.insert(0, "vendor_key", range(1, df_vendor.shape[0] + 1))

# Display the first 2 rows to validate
df_vendor.head(2)

Columns in df_vendor: Index(['VendorID', 'AccountNumber', 'Name', 'CreditRating',
       'PreferredVendorStatus', 'ActiveFlag', 'AddressType', 'AddressLine1',
       'AddressLine2', 'City', 'StateProvinceCode', 'State_Province',
       'PostalCode'],
      dtype='object')


Unnamed: 0,vendor_key,vendor_id,Name,CreditRating,PreferredVendorStatus,ActiveFlag,AddressType,AddressLine1,City,StateProvinceCode,State_Province,PostalCode
0,1,1,International,1,b'\x01',b'\x01',Main Office,683 Larch Ct.,Salt Lake City,UT,Utah,84101
1,2,2,Electronic Bike Repair & Supplies,1,b'\x01',b'\x01',Main Office,8547 Catherine Way,Tacoma,WA,Washington,98403


### Loading the transformed DataFrames in a new Data Warehouse

In [17]:
# List of DataFrames to load with their respective table names and primary key columns
tables_to_load = [
    ('dim_customer', df_customer, 'customer_key'),
    ('dim_product', df_product, 'product_key'),
    ('dim_vendor', df_vendor, 'vendor_key'),
    ('dim_employee', df_employee, 'employee_key'),
    ('dim_date', df_dim_date, 'date_key')
]

# Loop through each table, DataFrame, and primary key column to load the data
for table_name, dataframe, primary_key in tables_to_load:
    # Call the set_dataframe function to load the data into the new MySQL database
    set_dataframe(dataframe, table_name, primary_key, db_operation="insert", **mysql_args)

print("All transformed data successfully loaded into the new data warehouse!")


All transformed data successfully loaded into the new data warehouse!


### Fact Sales Order Tables

In [18]:
# Fact Sales Order Tables
# get data for salesorderheader
sql_sales_order_header = "SELECT * FROM adventureworks.salesorderheader;"
df_sales_order_header = get_sql_dataframe(sql_sales_order_header, **mysql_args)
df_sales_order_header.rename(columns={"salesorderid": "order_id"}, inplace=True)

# get data for salesorderdetail
sql_sales_order_detail = "SELECT * FROM adventureworks.salesorderdetail;"
df_sales_order_detail = get_sql_dataframe(sql_sales_order_detail, **mysql_args)
df_sales_order_detail.rename(columns={"salesorderdetailid": "order_detail_id"}, inplace=True)

# Re-run the query to get the customer dimension data
sql_dim_customer = "SELECT customer_key, CustomerID FROM adventureworks_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_dim_customer, **mysql_args)




In [19]:
# Merge df_sales_order_header and df_sales_order_detail on SalesOrderID
df_fact_sales = pd.merge(df_sales_order_header, df_sales_order_detail, on='SalesOrderID', how='inner')

# Insert surrogate keys (for each row in the fact table)
df_fact_sales.insert(0, "sales_id", range(1, df_fact_sales.shape[0] + 1))

# Calculate total sales (OrderQty * UnitPrice)
df_fact_sales['total_sales'] = df_fact_sales['OrderQty'] * df_fact_sales['UnitPrice']


In [20]:
# Drop any unnecessary columns that might not be needed in the final fact table
# For example, if `rowguid_x` and `rowguid_y` are not needed:
df_fact_sales.drop(columns=["rowguid_x", "rowguid_y"], inplace=True)


In [21]:
# Fetch the Primary Key and Business Key from the Date Dimension Table
sql_dim_date = "SELECT date_key, full_date FROM adventureworks_dw.dim_date;"
df_dim_date = get_sql_dataframe(sql_dim_date, **mysql_args)

# Convert full_date to datetime and extract the date portion
df_dim_date.full_date = df_dim_date.full_date.astype('datetime64[ns]').dt.date
df_dim_date.head(2)

Unnamed: 0,date_key,full_date
0,20000101,2000-01-01
1,20000102,2000-01-02


In [22]:
# Ensure the 'OrderDate' column is in datetime format
df_fact_sales['OrderDate'] = pd.to_datetime(df_fact_sales['OrderDate']).dt.date  # Convert 'OrderDate' to datetime format

# Verify the columns in df_dim_date
print(df_dim_date.columns)  # Check if 'full_date' is the correct column

# Rename columns in df_dim_date for clarity
df_dim_order_date = df_dim_date.rename(columns={"date_key": "order_date_key", "full_date": "order_date"})

# Merge with the date dimension to get the surrogate primary key for the order date
df_fact_sales = pd.merge(df_fact_sales, df_dim_order_date, left_on='OrderDate', right_on='order_date', how='left')

# Drop the original 'OrderDate' column now that we have the surrogate key
df_fact_sales.drop(['OrderDate'], axis=1, inplace=True)

# Display the first few rows to verify the transformation
df_fact_sales.head(2)


Index(['date_key', 'full_date'], dtype='object')


Unnamed: 0,sales_id,SalesOrderID,RevisionNumber,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,...,OrderQty,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,ModifiedDate_y,total_sales,order_date_key,order_date
0,1,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,1,776,1,2024.994,0.0,2024.994,2001-07-01,2024.994,20010701,2001-07-01
1,2,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,3,777,1,2024.994,0.0,6074.982,2001-07-01,6074.982,20010701,2001-07-01


In [23]:
print(df_fact_sales.columns)

Index(['sales_id', 'SalesOrderID', 'RevisionNumber', 'DueDate', 'ShipDate',
       'Status', 'OnlineOrderFlag', 'SalesOrderNumber', 'PurchaseOrderNumber',
       'AccountNumber', 'CustomerID', 'ContactID', 'SalesPersonID',
       'TerritoryID', 'BillToAddressID', 'ShipToAddressID', 'ShipMethodID',
       'CreditCardID', 'CreditCardApprovalCode', 'CurrencyRateID', 'SubTotal',
       'TaxAmt', 'Freight', 'TotalDue', 'Comment', 'ModifiedDate_x',
       'SalesOrderDetailID', 'CarrierTrackingNumber', 'OrderQty', 'ProductID',
       'SpecialOfferID', 'UnitPrice', 'UnitPriceDiscount', 'LineTotal',
       'ModifiedDate_y', 'total_sales', 'order_date_key', 'order_date'],
      dtype='object')


In [24]:
# Lookup the Surrogate Primary Key for "order_date" (adjust as needed)
df_fact_sales['order_date'] = pd.to_datetime(df_fact_sales['order_date']).dt.date  # Convert 'order_date' to datetime and extract date part

# Rename the Date Dimension table columns for clarity
df_dim_order_date = df_dim_date.rename(columns={"date_key": "order_date_key", "full_date": "order_date"})

# Merge df_fact_sales with df_dim_order_date to get the surrogate key for order_date
df_fact_sales = pd.merge(df_fact_sales, df_dim_order_date, on='order_date', how='left')

# Drop the original "order_date" column after merging
df_fact_sales.drop(['order_date'], axis=1, inplace=True)

# Display the updated dataframe to verify the transformation
print(df_fact_sales.head(2))


   sales_id  SalesOrderID  RevisionNumber    DueDate   ShipDate  Status  \
0         1         43659               1 2001-07-13 2001-07-08       5   
1         2         43659               1 2001-07-13 2001-07-08       5   

  OnlineOrderFlag SalesOrderNumber PurchaseOrderNumber   AccountNumber  ...  \
0         b'\x00'          SO43659         PO522145787  10-4020-000676  ...   
1         b'\x00'          SO43659         PO522145787  10-4020-000676  ...   

   OrderQty  ProductID  SpecialOfferID  UnitPrice  UnitPriceDiscount  \
0         1        776               1   2024.994                0.0   
1         3        777               1   2024.994                0.0   

   LineTotal  ModifiedDate_y  total_sales order_date_key_x  order_date_key_y  
0   2024.994      2001-07-01     2024.994         20010701          20010701  
1   6074.982      2001-07-01     6074.982         20010701          20010701  

[2 rows x 38 columns]


In [25]:
print(df_fact_sales.columns)

Index(['sales_id', 'SalesOrderID', 'RevisionNumber', 'DueDate', 'ShipDate',
       'Status', 'OnlineOrderFlag', 'SalesOrderNumber', 'PurchaseOrderNumber',
       'AccountNumber', 'CustomerID', 'ContactID', 'SalesPersonID',
       'TerritoryID', 'BillToAddressID', 'ShipToAddressID', 'ShipMethodID',
       'CreditCardID', 'CreditCardApprovalCode', 'CurrencyRateID', 'SubTotal',
       'TaxAmt', 'Freight', 'TotalDue', 'Comment', 'ModifiedDate_x',
       'SalesOrderDetailID', 'CarrierTrackingNumber', 'OrderQty', 'ProductID',
       'SpecialOfferID', 'UnitPrice', 'UnitPriceDiscount', 'LineTotal',
       'ModifiedDate_y', 'total_sales', 'order_date_key_x',
       'order_date_key_y'],
      dtype='object')


In [26]:
# Merge with df_fact_sales using 'CustomerID' as the business key
df_fact_sales = pd.merge(df_fact_sales, df_dim_customer, left_on='CustomerID', right_on='CustomerID', how='left')

# Drop the 'CustomerID' column after the merge if needed
df_fact_sales.drop(['CustomerID'], axis=1, inplace=True)

# Display the updated dataframe to verify the transformation
df_fact_sales.head(2)


Unnamed: 0,sales_id,SalesOrderID,RevisionNumber,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,...,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,ModifiedDate_y,total_sales,order_date_key_x,order_date_key_y,customer_key
0,1,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,776,1,2024.994,0.0,2024.994,2001-07-01,2024.994,20010701,20010701,
1,2,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,777,1,2024.994,0.0,6074.982,2001-07-01,6074.982,20010701,20010701,


In [27]:
print(df_fact_sales.columns)

Index(['sales_id', 'SalesOrderID', 'RevisionNumber', 'DueDate', 'ShipDate',
       'Status', 'OnlineOrderFlag', 'SalesOrderNumber', 'PurchaseOrderNumber',
       'AccountNumber', 'ContactID', 'SalesPersonID', 'TerritoryID',
       'BillToAddressID', 'ShipToAddressID', 'ShipMethodID', 'CreditCardID',
       'CreditCardApprovalCode', 'CurrencyRateID', 'SubTotal', 'TaxAmt',
       'Freight', 'TotalDue', 'Comment', 'ModifiedDate_x',
       'SalesOrderDetailID', 'CarrierTrackingNumber', 'OrderQty', 'ProductID',
       'SpecialOfferID', 'UnitPrice', 'UnitPriceDiscount', 'LineTotal',
       'ModifiedDate_y', 'total_sales', 'order_date_key_x', 'order_date_key_y',
       'customer_key'],
      dtype='object')


In [28]:
# Query to get the customer key
sql_dim_customer = "SELECT customer_key FROM adventureworks_dw.dim_customer;"
df_dim_customer = get_sql_dataframe(sql_dim_customer, **mysql_args)

# Check the column names in the DataFrame
print(df_dim_customer.columns)

# Merge with df_fact_sales using 'customer_key' as the business key
df_fact_sales = pd.merge(df_fact_sales, df_dim_customer, left_on='customer_key', right_on='customer_key', how='left')

# Drop the 'customer_id' column after the merge if needed (it's not in df_fact_sales)
if 'customer_id' in df_fact_sales.columns:
    df_fact_sales.drop(['customer_id'], axis=1, inplace=True)
df_fact_sales.head(2)


Index(['customer_key'], dtype='object')


Unnamed: 0,sales_id,SalesOrderID,RevisionNumber,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,...,ProductID,SpecialOfferID,UnitPrice,UnitPriceDiscount,LineTotal,ModifiedDate_y,total_sales,order_date_key_x,order_date_key_y,customer_key
0,1,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,776,1,2024.994,0.0,2024.994,2001-07-01,2024.994,20010701,20010701,
1,2,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,777,1,2024.994,0.0,6074.982,2001-07-01,6074.982,20010701,20010701,


In [29]:
# Example for the employee dimension
sql_dim_employee = "SELECT employee_key, employee_id FROM adventureworks_dw.dim_employee;"
df_dim_employee = get_sql_dataframe(sql_dim_employee, **mysql_args)

# Merge with df_fact_sales using the SalesPersonID as the business key
df_fact_sales = pd.merge(df_fact_sales, df_dim_employee, left_on='SalesPersonID', right_on='employee_id', how='left')
df_fact_sales.drop(['SalesPersonID'], axis=1, inplace=True)

# Display the updated dataframe
df_fact_sales.head(2)


Unnamed: 0,sales_id,SalesOrderID,RevisionNumber,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,...,UnitPrice,UnitPriceDiscount,LineTotal,ModifiedDate_y,total_sales,order_date_key_x,order_date_key_y,customer_key,employee_key,employee_id
0,1,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,2024.994,0.0,2024.994,2001-07-01,2024.994,20010701,20010701,,279.0,279.0
1,2,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,2024.994,0.0,6074.982,2001-07-01,6074.982,20010701,20010701,,279.0,279.0


In [30]:
print(df_fact_sales.columns)

Index(['sales_id', 'SalesOrderID', 'RevisionNumber', 'DueDate', 'ShipDate',
       'Status', 'OnlineOrderFlag', 'SalesOrderNumber', 'PurchaseOrderNumber',
       'AccountNumber', 'ContactID', 'TerritoryID', 'BillToAddressID',
       'ShipToAddressID', 'ShipMethodID', 'CreditCardID',
       'CreditCardApprovalCode', 'CurrencyRateID', 'SubTotal', 'TaxAmt',
       'Freight', 'TotalDue', 'Comment', 'ModifiedDate_x',
       'SalesOrderDetailID', 'CarrierTrackingNumber', 'OrderQty', 'ProductID',
       'SpecialOfferID', 'UnitPrice', 'UnitPriceDiscount', 'LineTotal',
       'ModifiedDate_y', 'total_sales', 'order_date_key_x', 'order_date_key_y',
       'customer_key', 'employee_key', 'employee_id'],
      dtype='object')


In [31]:
# Example for the product dimension
sql_dim_product = "SELECT product_key, product_id FROM adventureworks_dw.dim_product;"
df_dim_product = get_sql_dataframe(sql_dim_product, **mysql_args)

# Merge with df_fact_sales using 'ProductID' from df_fact_sales and 'product_id' from df_dim_product
df_fact_sales = pd.merge(df_fact_sales, df_dim_product, left_on='ProductID', right_on='product_id', how='left')

# Drop the 'ProductID' column after the merge if needed
df_fact_sales.drop(['ProductID'], axis=1, inplace=True)
df_fact_sales.head(2)

Unnamed: 0,sales_id,SalesOrderID,RevisionNumber,DueDate,ShipDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,...,LineTotal,ModifiedDate_y,total_sales,order_date_key_x,order_date_key_y,customer_key,employee_key,employee_id,product_key,product_id
0,1,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,2024.994,2001-07-01,2024.994,20010701,20010701,,279.0,279.0,281,776
1,2,43659,1,2001-07-13,2001-07-08,5,b'\x00',SO43659,PO522145787,10-4020-000676,...,6074.982,2001-07-01,6074.982,20010701,20010701,,279.0,279.0,282,777


### SQL Queries

In [32]:
sql_show_tables_query = "SHOW TABLES;"
df_tables = get_sql_dataframe(sql_show_tables_query, **mysql_args)
df_tables.head()  # Display the first few rows of the result


Unnamed: 0,Tables_in_adventureworks
0,address
1,addresstype
2,awbuildversion
3,billofmaterials
4,contact


In [33]:
# SQL query to calculate total sales per customer and product, with correct reference to ProductID
sql_sales_query = """
SELECT c.customer_key, c.CustomerID, p.product_key, p.Name AS product_name, 
       COUNT(*) AS total_orders, SUM(fs.total_sales) AS total_revenue
FROM adventureworks_dw.fact_sales AS fs
INNER JOIN adventureworks_dw.dim_customer AS c ON fs.customer_key = c.customer_key
INNER JOIN adventureworks_dw.dim_product AS p ON fs.ProductID = p.product_id  -- Corrected the join
GROUP BY c.customer_key, c.CustomerID, p.product_key, p.Name
ORDER BY total_revenue DESC
LIMIT 10;
"""

# Fetch the results and display the first 5 rows
df_sales_summary = get_sql_dataframe(sql_sales_query, **mysql_args)
display(df_sales_summary)


Unnamed: 0,customer_key,CustomerID,product_key,product_name,total_orders,total_revenue
0,1,4,298,"Road-250 Black, 44",8,51781.5675
1,1,4,301,"Road-250 Black, 58",8,46022.2425
2,18,340,288,"Mountain-200 Black, 42",8,45981.7635
3,1,4,300,"Road-250 Black, 52",8,43090.2225
4,1,4,299,"Road-250 Black, 48",8,42461.9325
5,7,90,298,"Road-250 Black, 44",5,37426.8868
6,7,90,481,"Road-350-W Yellow, 48",4,36316.1365
7,1,4,297,"Road-250 Red, 58",8,33613.515
8,7,90,294,"Road-250 Red, 44",3,30786.21
9,18,340,285,"Mountain-200 Silver, 42",5,29729.0148


In [34]:
# SQL query to calculate total sales, average order quantity per customer and product
sql_sales_query = """
SELECT 
    c.customer_key, 
    c.CustomerID, 
    p.product_key, 
    p.Name AS product_name, 
    COUNT(*) AS total_orders, 
    SUM(fs.TotalDue) AS total_revenue,
    AVG(fs.OrderQty) AS avg_order_qty
FROM 
    adventureworks_dw.fact_sales AS fs
INNER JOIN 
    adventureworks_dw.dim_customer AS c ON fs.customer_key = c.customer_key
INNER JOIN 
    adventureworks_dw.dim_product AS p ON fs.ProductID = p.product_id  -- Adjusted join to use ProductID
GROUP BY 
    c.customer_key, c.CustomerID, p.product_key, p.Name
ORDER BY 
    total_revenue DESC
LIMIT 10;
"""

# Fetch the results and display the first 5 rows
df_sales_summary = get_sql_dataframe(sql_sales_query, **mysql_args)
display(df_sales_summary)


Unnamed: 0,customer_key,CustomerID,product_key,product_name,total_orders,total_revenue,avg_order_qty
0,1,4,305,"Road-550-W Yellow, 44",8,780035.2121,3.375
1,1,4,297,"Road-250 Red, 58",8,780035.2121,3.0
2,1,4,298,"Road-250 Black, 44",8,780035.2121,4.75
3,1,4,299,"Road-250 Black, 48",8,780035.2121,3.875
4,1,4,300,"Road-250 Black, 52",8,780035.2121,3.875
5,1,4,301,"Road-250 Black, 58",8,780035.2121,4.125
6,1,4,302,"Road-550-W Yellow, 38",8,780035.2121,3.875
7,1,4,303,"Road-550-W Yellow, 40",8,780035.2121,3.25
8,1,4,304,"Road-550-W Yellow, 42",8,780035.2121,2.75
9,5,54,213,"Sport-100 Helmet, Black",12,713834.6957,6.8333


### Mongo DB

In [35]:
#mango client connection
mongo_client = get_mongo_client(**mongodb_args)

In [36]:
# MySQL Data Extraction for fact_sales, dim_customer, and dim_product
sql_fact_sales = "SELECT * FROM adventureworks_dw.fact_sales;"
df_fact_sales = get_sql_dataframe(sql_fact_sales, **mysql_args)

sql_dim_customer = "SELECT * FROM adventureworks_dw.dim_customer;"  
df_dim_customer = get_sql_dataframe(sql_dim_customer, **mysql_args) 

sql_dim_product = "SELECT * FROM adventureworks_dw.dim_product;"
df_dim_product = get_sql_dataframe(sql_dim_product, **mysql_args)


In [37]:
# Create a data directory and export MySQL data to JSON
data_dir = os.path.join(os.getcwd(), 'data')
os.makedirs(data_dir, exist_ok=True)

json_files = {
    "fact_sales_orders": "fact_sales_orders.json",
    "dim_customers": "dim_customers.json",
    "dim_products": "dim_products.json"
}

fact_sales_path = os.path.join(data_dir, json_files["fact_sales_orders"])
dim_customers_path = os.path.join(data_dir, json_files["dim_customers"])
dim_products_path = os.path.join(data_dir, json_files["dim_products"])

# Export data to JSON files
df_fact_sales.to_json(fact_sales_path, orient="records", indent=4)
df_dim_customer.to_json(dim_customers_path, orient="records", indent=4)  # Now this will work
df_dim_product.to_json(dim_products_path, orient="records", indent=4)

print(f" JSON Export Complete: {fact_sales_path} & {dim_customers_path} & {dim_products_path}")

 JSON Export Complete: /Users/pratistha/data/fact_sales_orders.json & /Users/pratistha/data/dim_customers.json & /Users/pratistha/data/dim_products.json


In [38]:
# Load JSON files into MongoDB collections
set_mongo_collections(mongo_client, mongodb_args["db_name"], data_dir, json_files)


In [39]:
# Confirm insertion by counting records in MongoDB collections
fact_sales_count = mongo_client[mongodb_args["db_name"]]["fact_sales_orders"].count_documents({})
dim_customers_count = mongo_client[mongodb_args["db_name"]]["dim_customers"].count_documents({})
dim_products_count = mongo_client[mongodb_args["db_name"]]["dim_products"].count_documents({})

In [40]:
# Retrieve a sample from MongoDB collections
df_mongo_sales = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], "fact_sales_orders", {})
df_mongo_customers = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], "dim_customers", {})
df_mongo_products = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], "dim_products", {})

In [41]:
display(df_mongo_sales.head(), df_mongo_customers.head(), df_mongo_products.head())


Unnamed: 0,sales_id,SalesOrderID,RevisionNumber,DueDate,Status,OnlineOrderFlag,SalesOrderNumber,PurchaseOrderNumber,AccountNumber,ContactID,...,UnitPriceDiscount,LineTotal,ModifiedDate_y,total_sales,order_date_key_x,order_date_key_y,shipped_date_key,customer_key,employee_key,employee_id
0,1,43659,1,994982400000,5,�,SO43659,PO522145787,10-4020-000676,378,...,0.0,2024.994,993945600000,2024.994,20010701,20010701,20010708,,279.0,279.0
1,2,43659,1,994982400000,5,�,SO43659,PO522145787,10-4020-000676,378,...,0.0,6074.982,993945600000,6074.982,20010701,20010701,20010708,,279.0,279.0
2,3,43659,1,994982400000,5,�,SO43659,PO522145787,10-4020-000676,378,...,0.0,2024.994,993945600000,2024.994,20010701,20010701,20010708,,279.0,279.0
3,4,43659,1,994982400000,5,�,SO43659,PO522145787,10-4020-000676,378,...,0.0,2039.994,993945600000,2039.994,20010701,20010701,20010708,,279.0,279.0
4,5,43659,1,994982400000,5,�,SO43659,PO522145787,10-4020-000676,378,...,0.0,2039.994,993945600000,2039.994,20010701,20010701,20010708,,279.0,279.0


Unnamed: 0,customer_key,CustomerID,AccountNumber,CustomerType,AddressType,AddressLine1,AddressLine2,City,StateProvinceCode,State_Province,IsOnlyStateProvinceFlag,PostalCode,CountryRegionCode,Country_Region,Sales Territory Group,Sales Territory
0,1,4,AW00000004,S,Main Office,800 Interchange Blvd.,Suite 2501,Austin,TX,Texas,�,78701,US,United States,North America,Southwest
1,2,34,AW00000034,S,Main Office,"Science Park South, Birchwood",Stanford House,Warrington,ENG,England,,WA3 7BH,GB,United Kingdom,Europe,United Kingdom
2,3,50,AW00000050,S,Main Office,Bundesallee 9571,Rechnungsstelle - C 035,Berlin,BB,Brandenburg,�,14197,DE,Germany,Europe,Germany
3,4,51,AW00000051,S,Main Office,2-252 Beauchamp Road,Botany Bay Industrial Estate,Cloverdale,SA,South Australia,�,6105,AU,Australia,Pacific,Australia
4,5,54,AW00000054,S,Main Office,Attn: Accounts Payable,44 Main Place,Melville,NY,New York,�,11747,US,United States,North America,Northeast


Unnamed: 0,product_key,product_id,Name,MakeFlag,FinishedGoodsFlag,Color,SafetyStockLevel,ReorderPoint,StandardCost,ListPrice,...,WeightUnitMeasureCode,Weight,DaysToManufacture,ProductLine,Class,ProductCategory,ProductModel,SellStartDate,SellEndDate,DiscontinuedDate
0,1,1,Adjustable Race,�,�,,1000,750,0.0,0.0,...,,,0,,,,,896659200000,,
1,2,2,Bearing Ball,�,�,,1000,750,0.0,0.0,...,,,0,,,,,896659200000,,
2,3,3,BB Ball Bearing,,�,,800,600,0.0,0.0,...,,,1,,,,,896659200000,,
3,4,4,Headset Ball Bearings,�,�,,800,600,0.0,0.0,...,,,0,,,,,896659200000,,
4,5,316,Blade,,�,,800,600,0.0,0.0,...,,,1,,,,,896659200000,,


#### Export the data

In [42]:
# Ensure 'data' directory exists
os.makedirs(data_dir, exist_ok=True)

# Base filename
base_filename = "dim_customers"
existing_files = [f for f in os.listdir(data_dir) if re.match(rf"{base_filename}_v\d+\.csv", f)]

# Determine the next version number
if existing_files:
    existing_versions = [int(re.search(r"_v(\d+)\.csv", f).group(1)) for f in existing_files]
    next_version = max(existing_versions) + 1
else:
    next_version = 1

# Define the file path with versioned filename
csv_file_path = os.path.join(data_dir, f"{base_filename}_v{next_version}.csv")

# Extract dim_customers from MySQL
sql_dim_customers = "SELECT * FROM adventureworks_dw.dim_customer;"
df_dim_customers = get_sql_dataframe(sql_dim_customers, **mysql_args)

# Save as CSV with a versioned filename and escape character for handling special characters
df_dim_customers.to_csv(csv_file_path, index=False, escapechar='\\')

# Print confirmation message
print(f"CSV Export Complete: {csv_file_path}")


CSV Export Complete: /Users/pratistha/data/dim_customers_v4.csv
