# Adventureworks Data Warehouse ETL - Project 1 (Mongo)
**Goal:** Load a small NoSQL dataset into MongoDB, extract to pandas, transform (derive `date_key`, map to `customer_key`), load into MySQL DW table `fact_web_events`, and validate with SQL queries.

#### Small JSON dataset file creation

In [1]:
import os
print(os.getcwd())

/Users/shepherdcrisp/Desktop/DS 2002 /DS-2002-main/CRISP Project 1


In [2]:
import os, json, random
from datetime import datetime, timedelta

# ensure /data directory exists
os.makedirs("data", exist_ok=True)

event_types = ["Website Visit", "Product View", "Add to Cart", "Checkout", "Product Review"]
pages = ["Home", "Mountain-500", "HL Road Frame", "Touring Tire", "Chainring", "Bike Stand",
         "Helmet", "Gloves", "Accessories", "Water Bottle", "Mountain Tire", "Road Tire"]

start_date = datetime(2004, 1, 1)
events = []

for i in range(50):
    customer_id = 11000 + random.randint(0, 49)
    event_type = random.choice(event_types)
    page = random.choice(pages)
    event_time = start_date + timedelta(days=random.randint(0, 180),
                                        hours=random.randint(0, 23),
                                        minutes=random.randint(0, 59))
    events.append({
        "customer_id": customer_id,
        "event_type": event_type,
        "event_time": event_time.strftime("%Y-%m-%dT%H:%M:%S"),
        "page": page
    })

with open("data/web_events.json", "w") as f:
    json.dump(events, f, indent=4)

print("Created data/web_events.json with", len(events), "records")

Created data/web_events.json with 50 records


In [3]:
!ls data

web_events.json


In [4]:
with open("data/web_events.json") as f:
    print(f.read()[:500])

[
    {
        "customer_id": 11041,
        "event_type": "Checkout",
        "event_time": "2004-04-07T23:32:00",
        "page": "Touring Tire"
    },
    {
        "customer_id": 11034,
        "event_type": "Website Visit",
        "event_time": "2004-05-15T06:38:00",
        "page": "Water Bottle"
    },
    {
        "customer_id": 11036,
        "event_type": "Website Visit",
        "event_time": "2004-03-30T04:18:00",
        "page": "Touring Tire"
    },
    {
        "customer_id": 


#### Import the necessary libraries

In [5]:
%pip install "pymongo[srv]" 
import pymongo, sys
print("pymongo:", pymongo.__version__) 
print("python exe:", sys.executable)

Note: you may need to restart the kernel to use updated packages.
pymongo: 4.15.3
python exe: /opt/anaconda3/envs/py312/bin/python


In [6]:
import os
import json
import numpy
import datetime
import certifi
import pandas as pd

import pymongo
import sqlalchemy
from sqlalchemy import create_engine, text

In [7]:
print(f"Running SQL Alchemy Version: {sqlalchemy.__version__}")
print(f"Running PyMongo Version: {pymongo.__version__}")

Running SQL Alchemy Version: 2.0.44
Running PyMongo Version: 4.15.3


#### Connection

In [8]:
mysql_args = {
    "uid" : "root",
    "pwd" : "shep1234",
    "hostname" : "localhost",
    "dbname" : "adventureworks_dw"
}

# The 'cluster_location' must either be "atlas" or "local".
mongodb_args = {
    "user_name" : "sscrisp2004_db_user",
    "password" : "shep1234",
    "host": "ds2002.yh7sc51.mongodb.net",
    "cluster_name" : "ds2002",
    "cluster_subnet" : "yh7sc51",
    "cluster_location" : "atlas", # "local"
    "db_name" : "adventureworks_mongo"
}

In [9]:
# Mongo helpers 
def get_sql_dataframe(sql_query, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the pd.read_sql() function to query the database, and fill a Pandas DataFrame.'''
    dframe = pd.read_sql(text(sql_query), connection);
    connection.close()
    
    return dframe
    

def set_dataframe(df, table_name, pk_column, db_operation, **args):
    '''Create a connection to the MySQL database'''
    conn_str = f"mysql+pymysql://{args['uid']}:{args['pwd']}@{args['hostname']}/{args['dbname']}"
    sqlEngine = create_engine(conn_str, pool_recycle=3600)
    connection = sqlEngine.connect()
    
    '''Invoke the Pandas DataFrame .to_sql( ) function to either create, or append to, a table'''
    if db_operation == "insert":
        df.to_sql(table_name, con=connection, index=False, if_exists='replace')
        connection.execute(text(f"ALTER TABLE {table_name} ADD PRIMARY KEY ({pk_column});"))
            
    elif db_operation == "update":
        df.to_sql(table_name, con=connection, index=False, if_exists='append')
    
    connection.close()


def get_mongo_client(**args):
    '''Validate proper input'''
    if args["cluster_location"] not in ['atlas', 'local']:
        raise Exception("You must specify either 'atlas' or 'local' for the cluster_location parameter.")
    
    else:
        if args["cluster_location"] == "atlas":
            connect_str = f"mongodb+srv://{args['user_name']}:{args['password']}@"
            connect_str += f"{args['cluster_name']}.{args['cluster_subnet']}.mongodb.net"
            client = pymongo.MongoClient(connect_str, tlsCAFile=certifi.where())
            
        elif args["cluster_location"] == "local":
            client = pymongo.MongoClient("mongodb://localhost:27017/")
        
    return client


def get_mongo_dataframe(mongo_client, db_name, collection, query):
    '''Query MongoDB, and fill a python list with documents to create a DataFrame'''
    db = mongo_client[db_name]
    dframe = pd.DataFrame(list(db[collection].find(query)))
    dframe.drop(['_id'], axis=1, inplace=True)
    mongo_client.close()
    
    return dframe


def set_mongo_collections(mongo_client, db_name, data_directory, json_files):
    db = mongo_client[db_name]
    
    for file in json_files:
        db.drop_collection(file)
        json_file = os.path.join(data_directory, json_files[file])
        with open(json_file, 'r') as openfile:
            json_object = json.load(openfile)
            file = db[file]
            result = file.insert_many(json_object)
        
    mongo_client.close()

##### Load JSON into Mongo

In [10]:
client = get_mongo_client(**mongodb_args)

data_dir   = os.path.join(os.getcwd(), "data")
json_files = {"web_events": "web_events.json"}   # collection : filename

set_mongo_collections(client, mongodb_args["db_name"], data_dir, json_files)
print("Loaded web_events.json into Mongo:", mongodb_args["db_name"], "/ web_events")

Loaded web_events.json into Mongo: adventureworks_mongo / web_events


In [11]:
client = get_mongo_client(**mongodb_args)
db = client[mongodb_args["db_name"]]
print("Collections:", db.list_collection_names())
print("web_events count:", db["web_events"].count_documents({}))
client.close()

Collections: ['web_events']
web_events count: 50


In [12]:
mongo_client = get_mongo_client(**mongodb_args)
df_events = get_mongo_dataframe(mongo_client, mongodb_args["db_name"], "web_events", {})
df_events.head()

Unnamed: 0,customer_id,event_type,event_time,page
0,11041,Checkout,2004-04-07T23:32:00,Touring Tire
1,11034,Website Visit,2004-05-15T06:38:00,Water Bottle
2,11036,Website Visit,2004-03-30T04:18:00,Touring Tire
3,11041,Product View,2004-04-18T13:06:00,HL Road Frame
4,11005,Checkout,2004-02-11T23:56:00,Chainring


#### Transform (derive date_key and map to DW customer_key)

In [13]:
df_events["event_time"] = pd.to_datetime(df_events["event_time"], errors="coerce")
df_events = df_events.dropna(subset=["event_time"]).copy()

df_events["date_key"] = (
    df_events["event_time"].dt.year*10000
    + df_events["event_time"].dt.month*100
    + df_events["event_time"].dt.day
).astype(int)

In [14]:
dim_cust = get_sql_dataframe("SELECT customer_key, customer_id FROM dim_customers", **mysql_args)
df_events = df_events.merge(dim_cust, how="left", on="customer_id")

In [15]:
df_events.insert(0, "fact_web_event_key", range(1, len(df_events)+1))
ordered_cols = ["fact_web_event_key","customer_key","customer_id","event_type","page","event_time","date_key"]
df_events = df_events[[c for c in ordered_cols if c in df_events.columns]]
df_events.head()

Unnamed: 0,fact_web_event_key,customer_key,customer_id,event_type,page,event_time,date_key
0,1,754,11041,Checkout,Touring Tire,2004-04-07 23:32:00,20040407
1,2,747,11034,Website Visit,Water Bottle,2004-05-15 06:38:00,20040515
2,3,749,11036,Website Visit,Touring Tire,2004-03-30 04:18:00,20040330
3,4,754,11041,Product View,HL Road Frame,2004-04-18 13:06:00,20040418
4,5,718,11005,Checkout,Chainring,2004-02-11 23:56:00,20040211


##### Load into MySQL dw

In [16]:
set_dataframe(df_events, table_name="fact_web_events",
              pk_column="fact_web_event_key", db_operation="insert", **mysql_args)
print("Loaded fact_web_events to MySQL DW")

Loaded fact_web_events to MySQL DW


In [17]:
# Validation of new fact table 
sql_validate_web = """
    SELECT COUNT(*) AS row_count,
           MIN(fact_web_event_key) AS min_key,
           MAX(fact_web_event_key) AS max_key
    FROM fact_web_events;
"""
df_validation_web = get_sql_dataframe(sql_validate_web, **mysql_args)
df_validation_web 

Unnamed: 0,row_count,min_key,max_key
0,50,1,50


### DEMOS 

In [18]:
# Events by year and month --> proves that date_key is valid
sql_demo_web_1 = """
    SELECT
      YEAR(dd.full_date)  AS year_num,
      MONTH(dd.full_date) AS month_num,
      COUNT(*)            AS events
    FROM fact_web_events f
    JOIN dim_date dd ON f.date_key = dd.date_key
    GROUP BY year_num, month_num
    ORDER BY year_num, month_num;
"""
df_demo_web_1 = get_sql_dataframe(sql_demo_web_1, **mysql_args)
df_demo_web_1

Unnamed: 0,year_num,month_num,events
0,2004,1,9
1,2004,2,9
2,2004,3,8
3,2004,4,10
4,2004,5,10
5,2004,6,4


In [19]:
# top pages by events 
sql_demo_web_2 = """
    SELECT page, COUNT(*) AS events 
    FROM fact_web_events
    GROUP BY page 
    ORDER BY events DESC; 
    """
df_demo_web_2 = get_sql_dataframe(sql_demo_web_2, **mysql_args)
df_demo_web_2

Unnamed: 0,page,events
0,Mountain Tire,7
1,Chainring,6
2,Touring Tire,5
3,HL Road Frame,5
4,Accessories,5
5,Bike Stand,5
6,Water Bottle,4
7,Home,4
8,Gloves,4
9,Helmet,2


In [20]:
# top customer by event count --> proves that customer_key join works 
sql_demo_web_3 = """
    SELECT
      dc.customer_id,
      COUNT(*) AS events
    FROM fact_web_events f
    JOIN dim_customers dc ON f.customer_key = dc.customer_key
    GROUP BY dc.customer_id
    ORDER BY events DESC
    LIMIT 10;
"""
df_demo_web_3 = get_sql_dataframe(sql_demo_web_3, **mysql_args)
df_demo_web_3

Unnamed: 0,customer_id,events
0,11041,4
1,11031,4
2,11047,3
3,11036,2
4,11016,2
5,11038,2
6,11040,2
7,11001,2
8,11019,2
9,11045,2


In [21]:
# joined sample rows 
sql_demo_web_4 = """
    SELECT
      f.fact_web_event_key,
      dd.full_date,
      dc.customer_id,
      f.event_type,
      f.page
    FROM fact_web_events f
    JOIN dim_date dd       ON f.date_key = dd.date_key
    LEFT JOIN dim_customers dc ON f.customer_key = dc.customer_key
    ORDER BY dd.full_date, f.fact_web_event_key
    LIMIT 10;
"""
df_demo_web_4 = get_sql_dataframe(sql_demo_web_4, **mysql_args)
df_demo_web_4

Unnamed: 0,fact_web_event_key,full_date,customer_id,event_type,page
0,15,2004-01-02,11016,Website Visit,Touring Tire
1,38,2004-01-04,11012,Add to Cart,Chainring
2,18,2004-01-13,11004,Add to Cart,Accessories
3,41,2004-01-14,11031,Add to Cart,Chainring
4,8,2004-01-15,11047,Checkout,Water Bottle
5,34,2004-01-20,11007,Product Review,Bike Stand
6,6,2004-01-21,11038,Website Visit,Home
7,43,2004-01-29,11041,Add to Cart,Accessories
8,31,2004-01-31,11031,Product Review,Bike Stand
9,16,2004-02-06,11004,Product View,Chainring


In [23]:
sql_demo_web_territory = """
SELECT
  YEAR(dd.full_date)  AS year_num,
  MONTH(dd.full_date) AS month_num,
  COALESCE(dc.Sales_Territory, 'Unknown') AS sales_territory,
  COUNT(*) AS events
FROM fact_web_events f
JOIN dim_date dd ON f.date_key = dd.date_key
LEFT JOIN dim_customers dc ON f.customer_key = dc.customer_key
GROUP BY year_num, month_num, sales_territory
ORDER BY year_num, month_num, events DESC;
"""
get_sql_dataframe(sql_demo_web_territory, **mysql_args)


Unnamed: 0,year_num,month_num,sales_territory,events
0,2004,1,Australia,6
1,2004,1,Southwest,2
2,2004,1,Northwest,1
3,2004,2,Australia,6
4,2004,2,Southwest,2
5,2004,2,Northwest,1
6,2004,3,Australia,5
7,2004,3,Southwest,2
8,2004,3,Canada,1
9,2004,4,Australia,5
