In [58]:
import pymysql
import pymongo
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
import datetime
import decimal
import pandas as pd

### Step 1: Connect to MySQL

In [None]:
mysql_conn = pymysql.connect(
    host='mdsmysql.sci.pitt.edu',
    user='mdsGlobalUser',
    password='mds$uper$ecurePassword123',
    database='classicmodels'
)
mysql_cursor = mysql_conn.cursor(pymysql.cursors.DictCursor)

with mysql_conn.cursor() as cursor:
    cursor.execute("SELECT VERSION()")
    version = cursor.fetchone()
    print("MySQL version:", version[0])
    

MySQL version: 8.4.4


### Step 2: Connect to MongoDB

In [57]:
username = "etladmin"
password = "securePassword123"
host = "mdsmongodb.sci.pitt.edu"
port = 27017
auth_db = "etl"  # or the database where the user is defined

# Format the connection string
uri = f"mongodb://{username}:{password}@{host}:{port}/?authSource={auth_db}"

mongo_client = MongoClient(uri)
mongo_db = mongo_client['etl']
mongo_client.admin.command('ping')  # Trigger connection
print("✅ Successfully connected to MongoDB with authentication.")
print("Available databases:", mongo_client.list_database_names())

✅ Successfully connected to MongoDB with authentication.
Available databases: ['etl']


### Step 3: Create a Helper Function to Sanitize Data for MongoDB

* MongoDB is unable to serialize decimal.Decimal objects. 
* MongoDB (via PyMongo) expects standard Python types like float, int, str, etc., and Decimal is not supported by default.
* **Fix**: Convert Decimal to float before inserting into MongoDB by creating a recursive function to walk through your document and convert all Decimal objects to float (and optionally also convert datetime.date to datetime.datetime if needed).

In [46]:
def sanitize_for_mongo(doc):
    if isinstance(doc, dict):
        return {k: sanitize_for_mongo(v) for k, v in doc.items()}
    elif isinstance(doc, list):
        return [sanitize_for_mongo(item) for item in doc]
    elif isinstance(doc, decimal.Decimal):
        return float(doc)
    elif isinstance(doc, datetime.date) and not isinstance(doc, datetime.datetime):
        return datetime.datetime.combine(doc, datetime.time())
    else:
        return doc


### Step 4: Define helper function to fetch, sanitize, and insert collections

In [47]:
def migrate_table_to_collection(sql_query, mongo_collection, transform_func=None):
    mysql_cursor.execute(sql_query)
    rows = mysql_cursor.fetchall()
    if transform_func:
        rows = [transform_func(row) for row in rows]
    sanitized_rows = [sanitize_for_mongo(row) for row in rows]
    if sanitized_rows:
        mongo_collection.insert_many(sanitized_rows)

### Step 5: ETL - Extract, Transform, Load Data

In [48]:
# --- Migrate Employees (Referenced) ---
migrate_table_to_collection("SELECT * FROM employees", mongo_db.employees)

In [49]:
# --- Migrate Offices (Referenced) ---
migrate_table_to_collection("SELECT * FROM offices", mongo_db.offices)


In [50]:
# --- Migrate Customers and Embed Payments ---
mysql_cursor.execute("SELECT * FROM customers")
customers = mysql_cursor.fetchall()
for customer in customers:
    # Embed payments into customer
    mysql_cursor.execute("SELECT * FROM payments WHERE customerNumber = %s", (customer['customerNumber'],))
    payments = mysql_cursor.fetchall()
    customer['payments'] = payments

    # Embed sales rep name
    mysql_cursor.execute("SELECT lastName, firstName FROM employees WHERE employeeNumber = %s", 
                         (customer['salesRepEmployeeNumber'],))
    rep = mysql_cursor.fetchone()
    if rep:
        customer['salesRep'] = rep

    # Insert sanitized customer document
    mongo_db.customers.insert_one(sanitize_for_mongo(customer))

In [51]:
# --- Migrate Orders and Embed OrderDetails ---
mysql_cursor.execute("SELECT * FROM orders")
orders = mysql_cursor.fetchall()
for order in orders:
    # Embed order details
    mysql_cursor.execute("SELECT * FROM orderdetails WHERE orderNumber = %s", (order['orderNumber'],))
    order_details = mysql_cursor.fetchall()
    order['orderDetails'] = order_details

    mongo_db.orders.insert_one(sanitize_for_mongo(order))

In [52]:
# --- Migrate Products (Referenced) ---
migrate_table_to_collection("SELECT * FROM products", mongo_db.products)


In [53]:
# --- Migrate ProductLines and Embed Products ---
mysql_cursor.execute("SELECT * FROM productlines")
productlines = mysql_cursor.fetchall()
for pl in productlines:
    mysql_cursor.execute("SELECT * FROM products WHERE productLine = %s", (pl['productLine'],))
    products = mysql_cursor.fetchall()
    pl['products'] = products
    mongo_db.productlines.insert_one(sanitize_for_mongo(pl))

### Verify ETL Results

In [59]:
# Employees
employees_df = pd.DataFrame(list(mongo_db.employees.find()))
employees_df.head()



Unnamed: 0,_id,employeeNumber,lastName,firstName,extension,email,officeCode,reportsTo,jobTitle
0,67fd2d6ac7e3c946d00b3030,1002,Murphy,Diane,x5800,dmurphy@classicmodelcars.com,1,,Probation
1,67fd2d6ac7e3c946d00b3031,1056,Patterson,Mary,x4611,mpatterso@classicmodelcars.com,1,1002.0,VP Sales
2,67fd2d6ac7e3c946d00b3032,1076,Firrelli,Jeff,x9273,jfirrelli@classicmodelcars.com,1,1002.0,VP Marketing
3,67fd2d6ac7e3c946d00b3033,1088,Patterson,William,x4871,wpatterson@classicmodelcars.com,6,1056.0,Sales Manager (APAC)
4,67fd2d6ac7e3c946d00b3034,1102,Bondur,Gerard,x5408,gbondur@classicmodelcars.com,4,1056.0,Sale Manager (EMEA)


In [60]:
# Offices
offices_df = pd.DataFrame(list(mongo_db.offices.find()))
offices_df.head()

Unnamed: 0,_id,officeCode,city,phone,addressLine1,addressLine2,state,country,postalCode,territory
0,67fd2d73c7e3c946d00b3047,1,San Francisco,+1 650 219 4782,100 Market Street,Suite 300,CA,USA,94080,
1,67fd2d73c7e3c946d00b3048,2,Boston,+1 215 837 0825,1550 Court Place,Suite 102,MA,USA,02107,
2,67fd2d73c7e3c946d00b3049,3,NYC,+1 212 555 3000,523 East 53rd Street,apt. 5A,NY,USA,10022,
3,67fd2d73c7e3c946d00b304a,4,Paris,+33 14 723 4404,43 Rue Jouffroy D'abbans,,,France,75017,EMEA
4,67fd2d73c7e3c946d00b304b,5,Tokyo,+81 33 224 5000,4-1 Kioicho,,Chiyoda-Ku,Japan,102-8578,Japan


In [61]:
# Customers (nested 'payments' and 'salesRep' will be embedded dicts/lists)
customers_df = pd.DataFrame(list(mongo_db.customers.find()))
customers_df[['customerNumber', 'customerName', 'payments']].head()


Unnamed: 0,customerNumber,customerName,payments
0,103,Atelier graphique,"[{'customerNumber': 103, 'checkNumber': 'HQ336..."
1,112,Signal Gift Stores,"[{'customerNumber': 112, 'checkNumber': 'BO864..."
2,114,"Australian Collectors, Co.","[{'customerNumber': 114, 'checkNumber': 'GG314..."
3,119,La Rochelle Gifts,"[{'customerNumber': 119, 'checkNumber': 'DB933..."
4,121,Baane Mini Imports,"[{'customerNumber': 121, 'checkNumber': 'DB889..."


In [62]:
# Orders (nested 'orderDetails' will be lists of dicts)
orders_df = pd.DataFrame(list(mongo_db.orders.find()))
orders_df[['orderNumber', 'orderDate', 'orderDetails']].head()


Unnamed: 0,orderNumber,orderDate,orderDetails
0,10100,2003-01-06,"[{'orderNumber': 10100, 'productCode': 'S18_17..."
1,10101,2003-01-09,"[{'orderNumber': 10101, 'productCode': 'S18_23..."
2,10102,2003-01-10,"[{'orderNumber': 10102, 'productCode': 'S18_13..."
3,10103,2003-01-29,"[{'orderNumber': 10103, 'productCode': 'S10_19..."
4,10104,2003-01-31,"[{'orderNumber': 10104, 'productCode': 'S12_31..."


In [63]:
# Products
products_df = pd.DataFrame(list(mongo_db.products.find()))
products_df.head()

Unnamed: 0,_id,productCode,productName,productLine,productScale,productVendor,productDescription,quantityInStock,buyPrice,MSRP
0,67fd2daec7e3c946d00b320e,S10_1678,1969 Harley Davidson Ultimate Chopper,Motorcycles,1:10,Min Lin Diecast,"This replica features working kickstand, front...",7933,48.81,95.7
1,67fd2daec7e3c946d00b320f,S10_1949,1952 Alpine Renault 1300,Classic Cars,1:10,Classic Metal Creations,Turnable front wheels; steering function; deta...,7305,98.58,214.3
2,67fd2daec7e3c946d00b3210,S10_2016,1996 Moto Guzzi 1100i,Motorcycles,1:10,Highway 66 Mini Classics,"Official Moto Guzzi logos and insignias, saddl...",6625,68.99,118.94
3,67fd2daec7e3c946d00b3211,S10_4698,2003 Harley-Davidson Eagle Drag Bike,Motorcycles,1:10,Red Start Diecast,"Model features, official Harley Davidson logos...",5582,91.02,193.66
4,67fd2daec7e3c946d00b3212,S10_4757,1972 Alfa Romeo GTA,Classic Cars,1:10,Motor City Art Classics,Features include: Turnable front wheels; steer...,3252,85.68,136.0


In [64]:
# Product Lines (with embedded products)
productlines_df = pd.DataFrame(list(mongo_db.productlines.find()))
productlines_df[['productLine', 'products']].head()

Unnamed: 0,productLine,products
0,Classic Cars,"[{'productCode': 'S10_1949', 'productName': '1..."
1,Motorcycles,"[{'productCode': 'S10_1678', 'productName': '1..."
2,Planes,"[{'productCode': 'S18_1662', 'productName': '1..."
3,Ships,"[{'productCode': 'S18_3029', 'productName': '1..."
4,Trains,"[{'productCode': 'S18_3259', 'productName': 'C..."


### Close Connections

In [65]:
# --- Close Connections ---
mysql_cursor.close()
mysql_conn.close()
mongo_client.close()