## DS-2002 Data Project 1: AdventureWorks ETL Data Processor

This notebook builds a dimensional data mart for a simple business process using the AdventureWorks OLTP database (MySQL), a MongoDB Atlas cluster, and local file data. It demonstrates Extract-Transform-Load (ETL) pipelines from multiple sources into an OLAP-optimized schema, and validates the mart with example analytical queries.

Sources used:
- MySQL
- MongoDB Atlas (supplemental documents)
- Local file system (CSV, JSON)


In [None]:
# Import libraries
import os
import json
import numpy as np
import pandas as pd
import datetime
import sqlalchemy
from sqlalchemy import create_engine, text

import pymongo
import certifi


In [None]:
# Connection configuration.
# Passwords are taken from the environment variables DS2002_MYSQL_PWD and
# DS2002_MONGO_PWD when they are set, so real credentials never have to be
# saved inside the notebook. When a variable is not set, the placeholder is
# used and must be replaced before running.
mysql = {
    "uid": "root",
    "pwd": os.environ.get("DS2002_MYSQL_PWD", "<YOUR_PASSWORD>"),
    "hostname": "localhost",
    "src_db": "adventureworks",      # OLTP source schema
    "dw_db": "adventureworks_dw"     # dimensional mart (load target)
}

mongo = {
    "user_name": "sebastianshirazi",
    "password": os.environ.get("DS2002_MONGO_PWD", "<YOUR_PASSWORD>"),
    "cluster_name": "Cluster0",
    "cluster_subnet": "wanj3eo",
    "cluster_location": "atlas",  # or "local"
    "db_name": "adventureworks_docs"
}

# Local input/output files live in the "data" folder under the current working directory
DATA_DIR = os.path.join(os.getcwd(), "data")


### Helper functions for reading from and writing to the databases

In [45]:
def get_sql_dataframe(sql_query: str, *, db: str = None):
    """Run a query against MySQL and return the result as a DataFrame.

    Args:
        sql_query: SQL text to execute (wrapped in ``sqlalchemy.text``).
        db: Schema to connect to. Falls back to ``mysql["src_db"]`` when
            omitted or empty.

    Returns:
        pandas.DataFrame holding the query result.
    """
    conn_db = db if db else mysql["src_db"]
    conn_str = f"mysql+pymysql://{mysql['uid']}:{mysql['pwd']}@{mysql['hostname']}/{conn_db}"
    engine = create_engine(conn_str, pool_recycle=3600)
    try:
        with engine.connect() as conn:
            return pd.read_sql(text(sql_query), conn)
    finally:
        # A new engine (with its own connection pool) is built on every call,
        # so release it here instead of leaving pooled connections open.
        engine.dispose()


def set_dataframe(df: pd.DataFrame, table_name: str, pk_column: str, *, db: str, if_exists: str = "replace"):
    """Write a DataFrame to a MySQL table and optionally add a primary key.

    Args:
        df: Rows to write; the DataFrame index is not written.
        table_name: Target table name.
        pk_column: Column to declare as PRIMARY KEY. Only applied when the
            table is replaced; pass an empty value to skip it.
        db: Schema that holds the target table.
        if_exists: Passed through to ``DataFrame.to_sql`` ("replace" by default).
    """
    conn_str = f"mysql+pymysql://{mysql['uid']}:{mysql['pwd']}@{mysql['hostname']}/{db}"
    engine = create_engine(conn_str, pool_recycle=3600)
    try:
        # engine.begin() commits when the block exits cleanly, so the load does
        # not depend on implicit commit behavior of the driver or the DDL.
        with engine.begin() as conn:
            df.to_sql(table_name, con=conn, index=False, if_exists=if_exists)
            if if_exists == "replace" and pk_column:
                conn.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
    finally:
        # The engine is created per call; dispose it so its pool is released.
        engine.dispose()


def exec_sql(sql: str, *, db: str = None):
    """Execute one or more SQL statements against MySQL.

    The script is split on ``;`` and each non-empty piece is executed in order
    inside a single transaction that is committed at the end.

    Args:
        sql: One statement, or several separated by ``;``. The split is purely
            textual, so a ``;`` inside a string literal would also split.
        db: Schema to connect to. Falls back to ``mysql["src_db"]`` when
            omitted or empty.
    """
    conn_db = db if db else mysql["src_db"]
    conn_str = f"mysql+pymysql://{mysql['uid']}:{mysql['pwd']}@{mysql['hostname']}/{conn_db}"
    engine = create_engine(conn_str, pool_recycle=3600)
    statements = [s.strip() for s in sql.split(';') if s.strip()]
    try:
        # engine.connect() does not commit by itself under SQLAlchemy 2.x, so
        # INSERT/UPDATE/DELETE work would be rolled back when the connection
        # closes; engine.begin() commits on successful exit.
        with engine.begin() as conn:
            for stmt in statements:
                conn.execute(text(stmt))
    finally:
        # The engine is created per call; dispose it so its pool is released.
        engine.dispose()


def get_mongo_client():
    """Create a MongoDB client for the deployment described by ``mongo``.

    When ``mongo["cluster_location"]`` is "atlas", a ``mongodb+srv`` URI is
    built from the configured user, password, cluster name and subnet, and the
    certifi CA bundle is used for TLS. Any other value connects to a local
    server on the default port.

    The user name and password in ``mongo`` are expected in plain
    (un-escaped) form; they are percent-escaped here before being placed in
    the URI.

    Returns:
        pymongo.MongoClient; the caller is responsible for closing it.
    """
    if mongo["cluster_location"] == "atlas":
        from urllib.parse import quote_plus

        # Characters such as "@", ":" or "/" in the credentials would otherwise
        # corrupt the URI, so escape both parts.
        user = quote_plus(mongo["user_name"])
        secret = quote_plus(mongo["password"])
        uri = (
            f"mongodb+srv://{user}:{secret}@"
            f"{mongo['cluster_name']}.{mongo['cluster_subnet']}.mongodb.net"
        )
        return pymongo.MongoClient(uri, tlsCAFile=certifi.where())
    return pymongo.MongoClient("mongodb://localhost:27017/")


def get_mongo_dataframe(collection: str, query: dict = None, *, db_name: str = None):
    """Fetch documents from a MongoDB collection as a DataFrame.

    Args:
        collection: Name of the collection to read.
        query: Filter passed to ``find``; an empty filter is used when omitted.
        db_name: Database to read from; defaults to ``mongo["db_name"]``.

    Returns:
        pandas.DataFrame of the matching documents, without the ``_id`` column.
    """
    client = get_mongo_client()
    try:
        target_db = db_name or mongo["db_name"]
        cursor = client[target_db][collection].find(query or {})
        frame = pd.DataFrame(list(cursor))
        # Drop Mongo's document id when present (an empty result has no columns)
        if "_id" in frame.columns:
            frame = frame.drop(columns=["_id"])
        return frame
    finally:
        # Always release the client, including when the read fails
        client.close()


### Create / Reset Data Warehouse schema
We create a lightweight dimensional mart: `adventureworks_dw` with required dimensions and a sales fact.


In [46]:
# Create the DW schema if it does not exist yet.
# The connection URL has no schema, so this works before the DW schema exists.
root_conn = f"mysql+pymysql://{mysql['uid']}:{mysql['pwd']}@{mysql['hostname']}"
engine = create_engine(root_conn)
try:
    with engine.begin() as conn:
        # Use the configured schema name so this cell always creates the same
        # schema that the loaders target through mysql["dw_db"].
        # No USE statement is needed: this connection is closed right after,
        # and the helper functions put the schema in their own connection URL.
        conn.execute(text(f"CREATE DATABASE IF NOT EXISTS `{mysql['dw_db']}`;"))
finally:
    # Release the pool of this one-off engine
    engine.dispose()


### Build Date Dimension
The Date dimension enables time-based analysis (year, quarter, month, day).


In [47]:
# Date range for the dimension: earliest order date through the latest ship/due date in the OLTP source
sql_minmax = """
SELECT 
  MIN(DATE(OrderDate)) AS min_date,
  GREATEST(MAX(DATE(ShipDate)), MAX(DATE(DueDate))) AS max_date
FROM salesorderheader;
"""
mm = get_sql_dataframe(sql_minmax, db=mysql["src_db"])
start_date, end_date = (
    pd.to_datetime(mm.loc[0, bound]).date() for bound in ("min_date", "max_date")
)

# Fall back to a fixed range when either bound came back null
if any(pd.isna(bound) for bound in (start_date, end_date)):
    start_date, end_date = datetime.date(2000, 1, 1), datetime.date(2010, 12, 31)

# One row per calendar day, both bounds included
all_days = pd.date_range(start=start_date, end=end_date, freq="D")

# Final column order of the dimension
cols = ["date_key", "full_date", "year", "quarter", "month", "day", "day_name", "month_name", "week_of_year", "is_weekend"]

df_date = (
    pd.DataFrame({"full_date": all_days})
    .assign(
        # Integer key in YYYYMMDD form
        date_key=lambda t: t["full_date"].dt.strftime("%Y%m%d").astype(int),
        year=lambda t: t["full_date"].dt.year,
        quarter=lambda t: t["full_date"].dt.quarter,
        month=lambda t: t["full_date"].dt.month,
        day=lambda t: t["full_date"].dt.day,
        day_name=lambda t: t["full_date"].dt.day_name(),
        month_name=lambda t: t["full_date"].dt.month_name(),
        # ISO week number
        week_of_year=lambda t: t["full_date"].dt.isocalendar().week.astype(int),
        is_weekend=lambda t: t["day_name"].isin(["Saturday", "Sunday"]),
    )
    .loc[:, cols]
)

df_date.head(3)


Unnamed: 0,date_key,full_date,year,quarter,month,day,day_name,month_name,week_of_year,is_weekend
0,20010701,2001-07-01,2001,3,7,1,Sunday,July,26,True
1,20010702,2001-07-02,2001,3,7,2,Monday,July,27,False
2,20010703,2001-07-03,2001,3,7,3,Tuesday,July,27,False


In [None]:
# Write the date dimension to the warehouse (keyed on date_key), then show how many rows were loaded
set_dataframe(
    df=df_date,
    table_name="dim_date",
    pk_column="date_key",
    db=mysql["dw_db"],
    if_exists="replace",
)
get_sql_dataframe(sql_query="SELECT COUNT(*) AS num_rows FROM dim_date;", db=mysql["dw_db"])


Unnamed: 0,num_rows
0,1139


### Dimensional model and business process

- Business process: Retail sales orders (from AdventureWorks).
- Fact: `fact_sales` at line-item grain (one row per `SalesOrderID` + `SalesOrderDetailID`).
- Dimensions:
  - `dim_date` (existing): calendar attributes keyed by `date_key` (YYYYMMDD).
  - `dim_product`: product details with category and subcategory, enriched with file and Mongo data.
  - `dim_customer`: unified customer entity (store or individual) with territory attributes.

Sources used:
- MySQL OLTP (AdventureWorks): core sales, product, customer tables.
- File system (CSV): `product_attributes.csv` with marketing attributes.
- MongoDB Atlas: `product_review_aggregates` with per-product review aggregates (review count and average rating) exported from the MySQL `productreview` table.

Load targets (MySQL schema `adventureworks_dw`):
- `dim_date`, `dim_product`, `dim_customer`, `fact_sales`.

Validation: example analytical queries aggregate `fact_sales` by date, product category, and customer.


In [49]:
# Build base dim_product from MySQL (product + subcategory + category).
# Both joins are LEFT JOINs, so every row of `product` is kept; products
# without a subcategory get NULL subcategory and category columns.
sql_dim_product_base = """
SELECT 
  p.ProductID,
  p.Name AS product_name,
  p.ProductNumber,
  p.Color,
  p.ListPrice,
  p.ProductSubcategoryID,
  sc.Name AS subcategory_name,
  sc.ProductCategoryID,
  c.Name AS category_name
FROM product p
LEFT JOIN productsubcategory sc ON p.ProductSubcategoryID = sc.ProductSubcategoryID
LEFT JOIN productcategory c ON sc.ProductCategoryID = c.ProductCategoryID;
"""

# Read from the OLTP source schema and preview the first rows
prod_base = get_sql_dataframe(sql_dim_product_base, db=mysql["src_db"])
prod_base.head(3)


Unnamed: 0,ProductID,product_name,ProductNumber,Color,ListPrice,ProductSubcategoryID,subcategory_name,ProductCategoryID,category_name
0,1,Adjustable Race,AR-5381,,0.0,,,,
1,2,Bearing Ball,BA-8327,,0.0,,,,
2,3,BB Ball Bearing,BE-2349,,0.0,,,,


In [None]:
# Enrich base with local CSV attributes and load dim_product
csv_path = os.path.join(DATA_DIR, "product_attributes.csv")
attrs = pd.read_csv(csv_path)

# validate="many_to_one" makes the merge fail immediately if the CSV lists a
# ProductID more than once. Without it the join would silently duplicate
# products, and the problem would only show up as a failed ADD PRIMARY KEY
# after the warehouse table had already been replaced.
prod_enriched = prod_base.merge(attrs, on="ProductID", how="left", validate="many_to_one")

# Products with no CSV row get OnlineOnly = False
prod_enriched["OnlineOnly"] = prod_enriched["OnlineOnly"].fillna(False).astype(bool)

# Final columns of the dimension (the subcategory/category ID columns are dropped)
cols = [
    "ProductID", "product_name", "ProductNumber", "Color", "ListPrice",
    "subcategory_name", "category_name",
    "MarketingSegment", "BrandTier", "OnlineOnly", "Season", "LaunchYear"
]
prod_final = prod_enriched[cols].copy()

# Load to DW (keyed on ProductID) and show the loaded row count
set_dataframe(prod_final, "dim_product", "ProductID", db=mysql["dw_db"], if_exists="replace")
get_sql_dataframe("SELECT COUNT(*) AS num_rows FROM dim_product;", db=mysql["dw_db"])


Unnamed: 0,num_rows
0,504


### Export SQL data to local JSON (for MongoDB load)
We will export product review aggregates to JSON files under `data/` and then load them into MongoDB Atlas.


In [None]:
# Aggregate product reviews in MySQL (count and average rating per product) and export them to a local JSON file
sql_reviews = """
SELECT 
  ProductID,
  COUNT(*) AS num_reviews_sql,
  AVG(Rating) AS avg_rating_sql
FROM productreview
GROUP BY ProductID;
"""

# Average rating is rounded to two decimals before export
reviews_df = get_sql_dataframe(sql_reviews, db=mysql["src_db"]).assign(
    avg_rating_sql=lambda t: t["avg_rating_sql"].round(2)
)

# One JSON object per product
reviews_records = reviews_df.to_dict(orient="records")
json_path = os.path.join(DATA_DIR, "product_reviews.json")
with open(json_path, "w") as f:
    f.write(json.dumps(reviews_records, indent=2))

(len(reviews_records), json_path)


(3, '/Users/sebastian/ds2002-midterm/data/product_reviews.json')

### Load local JSON into MongoDB Atlas
This uses the Mongo connection above. It writes documents to a collection `product_review_aggregates` in database `adventureworks_docs`.


In [61]:
# Insert local JSON records into MongoDB Atlas
client = get_mongo_client()
try:
    db_m = client[mongo["db_name"]]
    coll = db_m["product_review_aggregates"]

    # Read and clean the file first, so a missing or malformed file cannot
    # leave the collection emptied.
    with open(os.path.join(DATA_DIR, "product_reviews.json"), "r") as f:
        docs = json.load(f)

    # Normalise types; missing values get an explicit default
    # (ProductID -> None, num_reviews_sql -> 0, avg_rating_sql -> None).
    for d in docs:
        d["ProductID"] = int(d["ProductID"]) if pd.notna(d["ProductID"]) else None
        d["num_reviews_sql"] = int(d["num_reviews_sql"]) if pd.notna(d["num_reviews_sql"]) else 0
        d["avg_rating_sql"] = float(d["avg_rating_sql"]) if pd.notna(d["avg_rating_sql"]) else None

    # Drop and reload for idempotency
    coll.drop()
    if docs:  # insert_many rejects an empty list
        coll.insert_many(docs)
    num_loaded = coll.count_documents({})
finally:
    client.close()

# Display the number of documents now in the collection
num_loaded


### Read from MongoDB and enrich dim_product
We’ll fetch the review aggregates and join to the product dimension before finalizing the enriched dim table.


In [62]:
# Pull Mongo docs into DataFrame and enrich dim_product
reviews_mongo = get_mongo_dataframe("product_review_aggregates")

# An empty collection comes back as a frame with no columns at all, which
# would make the merge on ProductID fail. Use a typed, empty frame instead so
# every product simply ends up with null review columns.
if reviews_mongo.empty:
    reviews_mongo = pd.DataFrame({
        "ProductID": pd.Series(dtype="int64"),
        "num_reviews_sql": pd.Series(dtype="float64"),
        "avg_rating_sql": pd.Series(dtype="float64"),
    })

# Rebuild the product + CSV attributes frame from the sources, so this cell
# does not depend on variables left over from earlier cells.
# validate="many_to_one" stops the run before the warehouse table is replaced
# if either side of a merge would duplicate products.
prod_base = get_sql_dataframe(sql_dim_product_base, db=mysql["src_db"]).merge(
    pd.read_csv(os.path.join(DATA_DIR, "product_attributes.csv")),
    on="ProductID", how="left", validate="many_to_one"
)
prod_base["OnlineOnly"] = prod_base["OnlineOnly"].fillna(False).astype(bool)

# Left join: products without review aggregates keep null review columns
prod_enriched2 = prod_base.merge(reviews_mongo, on="ProductID", how="left", validate="many_to_one")

cols2 = [
    "ProductID", "product_name", "ProductNumber", "Color", "ListPrice",
    "subcategory_name", "category_name",
    "MarketingSegment", "BrandTier", "OnlineOnly", "Season", "LaunchYear",
    "num_reviews_sql", "avg_rating_sql"
]
prod_final2 = prod_enriched2[cols2].copy()

# Replace dim_product with the review-enriched version and report coverage
set_dataframe(prod_final2, "dim_product", "ProductID", db=mysql["dw_db"], if_exists="replace")
get_sql_dataframe("SELECT COUNT(*) AS num_rows, COUNT(avg_rating_sql) AS num_with_reviews FROM dim_product;", db=mysql["dw_db"])


Unnamed: 0,num_rows,num_with_reviews
0,504,3


### Build dim_customer (individuals or stores) with territory
We’ll join `customer` to either `store` or `individual + contact`, then add `salesterritory` details.


In [None]:
# Build dim_customer from MySQL: store customers and individual customers share one column layout
sql_store = """
SELECT c.CustomerID,
       c.AccountNumber,
       c.CustomerType,
       c.TerritoryID,
       s.Name AS store_name,
       NULL AS first_name,
       NULL AS last_name
FROM customer c
JOIN store s ON s.CustomerID = c.CustomerID
"""

sql_individual = """
SELECT c.CustomerID,
       c.AccountNumber,
       c.CustomerType,
       c.TerritoryID,
       NULL AS store_name,
       ct.FirstName AS first_name,
       ct.LastName  AS last_name
FROM customer c
JOIN individual i ON i.CustomerID = c.CustomerID
JOIN contact ct ON ct.ContactID = i.ContactID
"""

# Territory attributes to attach to each customer
sql_terr = "SELECT TerritoryID, Name AS territory_name, CountryRegionCode, `Group` AS terr_group FROM salesterritory;"

# Run the three source queries against the OLTP schema
stores_df, inds_df, terr = (
    get_sql_dataframe(query, db=mysql["src_db"])
    for query in (sql_store, sql_individual, sql_terr)
)

# Stack stores and individuals into a single customer frame
cust_all = pd.concat([stores_df, inds_df], ignore_index=True)

# The source CustomerID (natural key) is used directly as the dimension key
cols = [
    "CustomerID", "AccountNumber", "CustomerType", "store_name", "first_name", "last_name",
    "TerritoryID", "territory_name", "CountryRegionCode", "terr_group"
]

# Left join keeps customers whose TerritoryID has no match in salesterritory
cust_dim = cust_all.merge(terr, on="TerritoryID", how="left").loc[:, cols]

set_dataframe(cust_dim, "dim_customer", "CustomerID", db=mysql["dw_db"], if_exists="replace")
get_sql_dataframe("SELECT COUNT(*) AS num_rows FROM dim_customer;", db=mysql["dw_db"])


Unnamed: 0,num_rows
0,19185


### Build fact_sales from salesorderdetail + salesorderheader
One row per order line (`SalesOrderID`, `SalesOrderDetailID`). We link to dimensions by `ProductID`, `CustomerID`, and `date_key` from `OrderDate`.


In [55]:
# Build fact_sales: one row per order detail line, joined to its order header
# NOTE(review): SubTotal, TaxAmt, Freight and TotalDue are selected from the
# header (alias h), so the join repeats them on every detail line of an order;
# summing them across fact rows would count an order once per line.
sql_fact = """
SELECT 
  d.SalesOrderID,
  d.SalesOrderDetailID,
  h.OrderDate,
  h.CustomerID,
  d.ProductID,
  d.OrderQty,
  d.UnitPrice,
  d.UnitPriceDiscount,
  d.LineTotal,
  h.SubTotal,
  h.TaxAmt,
  h.Freight,
  h.TotalDue
FROM salesorderdetail d
JOIN salesorderheader h ON h.SalesOrderID = d.SalesOrderID
"""

fact = get_sql_dataframe(sql_fact, db=mysql["src_db"])

# Add the YYYYMMDD integer key that links each line to dim_date via OrderDate
df = fact.assign(
    date_key=lambda t: pd.to_datetime(t["OrderDate"]).dt.strftime("%Y%m%d").astype(int)
)

# Final fact columns (OrderDate itself is replaced by date_key)
cols = [
    "SalesOrderID", "SalesOrderDetailID", "date_key", "CustomerID", "ProductID",
    "OrderQty", "UnitPrice", "UnitPriceDiscount", "LineTotal", "SubTotal", "TaxAmt", "Freight", "TotalDue"
]
fact_final = df[cols]

set_dataframe(fact_final, "fact_sales", "SalesOrderDetailID", db=mysql["dw_db"], if_exists="replace")
get_sql_dataframe("SELECT COUNT(*) AS num_rows FROM fact_sales;", db=mysql["dw_db"])


Unnamed: 0,num_rows
0,121317


### Analytical queries (SELECT with GROUP BY)
We’ll validate by aggregating sales across the fact and at least two dimensions.


In [None]:
# Total sales by year and product category.
# Sums LineTotal from fact_sales, grouped by the order year (dim_date) and the
# product category (dim_product). Both joins are inner joins, so only fact
# rows with a matching date and product are counted.
q1 = """
SELECT d.year,
       p.category_name,
       ROUND(SUM(f.LineTotal), 2) AS total_sales
FROM fact_sales f
JOIN dim_date d       ON f.date_key = d.date_key
JOIN dim_product p    ON f.ProductID = p.ProductID
GROUP BY d.year, p.category_name
ORDER BY d.year, p.category_name;
"""
# Run against the warehouse schema and show the first 10 groups
get_sql_dataframe(q1, db=mysql["dw_db"]).head(10)


Unnamed: 0,year,category_name,total_sales
0,2001,Accessories,20235.36
1,2001,Bikes,10661722.28
2,2001,Clothing,34376.34
3,2001,Components,615474.98
4,2002,Accessories,92735.35
5,2002,Bikes,26486358.2
6,2002,Clothing,485587.15
7,2002,Components,3610092.47
8,2003,Accessories,590257.59
9,2003,Bikes,34923280.24


In [None]:
# Average line-item discount by territory group and year.
# AVG(UnitPriceDiscount) is an unweighted mean over order lines (one value per
# fact row), grouped by the order year (dim_date) and the customer's
# territory group (dim_customer).
q2 = """
SELECT d.year,
       c.terr_group,
       ROUND(AVG(f.UnitPriceDiscount), 4) AS avg_discount
FROM fact_sales f
JOIN dim_date d        ON f.date_key = d.date_key
JOIN dim_customer c    ON f.CustomerID = c.CustomerID
GROUP BY d.year, c.terr_group
ORDER BY d.year, c.terr_group;
"""
# Run against the warehouse schema and show the first 10 groups
get_sql_dataframe(q2, db=mysql["dw_db"]).head(10)


Unnamed: 0,year,terr_group,avg_discount
0,2001,Europe,0.0
1,2001,North America,0.0002
2,2001,Pacific,0.0
3,2002,Europe,0.0021
4,2002,North America,0.0052
5,2002,Pacific,0.0
6,2003,Europe,0.0036
7,2003,North America,0.0033
8,2003,Pacific,0.0032
9,2004,Europe,0.0012


In [None]:
# Top 10 products by total sales, shown with their review rating.
# Ranking uses total_sales only; avg_rating is displayed alongside it.
# NOTE(review): COALESCE turns a missing avg_rating_sql into 0, so a product
# with no review data is shown as 0.0, the same as a genuinely zero-rated one.
q3 = """
SELECT p.product_name,
       p.category_name,
       COALESCE(p.avg_rating_sql, 0) AS avg_rating,
       ROUND(SUM(f.LineTotal), 2) AS total_sales
FROM fact_sales f
JOIN dim_product p ON f.ProductID = p.ProductID
GROUP BY p.product_name, p.category_name, p.avg_rating_sql
ORDER BY total_sales DESC
LIMIT 10;
"""
# Run against the warehouse schema; LIMIT already caps the result at 10 rows
get_sql_dataframe(q3, db=mysql["dw_db"])


Unnamed: 0,product_name,category_name,avg_rating,total_sales
0,"Mountain-200 Black, 38",Bikes,0.0,4400592.8
1,"Mountain-200 Black, 42",Bikes,0.0,4009494.76
2,"Mountain-200 Silver, 38",Bikes,0.0,3693678.03
3,"Mountain-200 Silver, 42",Bikes,0.0,3438478.86
4,"Mountain-200 Silver, 46",Bikes,0.0,3434256.94
5,"Mountain-200 Black, 46",Bikes,0.0,3309673.22
6,"Road-250 Black, 44",Bikes,0.0,2516857.31
7,"Road-250 Black, 48",Bikes,0.0,2347655.95
8,"Road-250 Black, 52",Bikes,0.0,2012447.78
9,"Road-150 Red, 56",Bikes,0.0,1847818.63
