# DS2002 Project 1 - Retail Sales ETL Pipeline
**Fashion Retail Sales Data Mart**

This notebook demonstrates:
- Extract from CSV files + MongoDB
- Transform data (cleaning, joining, key generation)
- Load into MySQL star schema

## 1. Import Libraries

In [1]:
import pandas as pd
import pymongo
import mysql.connector
from sqlalchemy import create_engine
import warnings
warnings.filterwarnings('ignore')

print("✓ Libraries imported successfully")

✓ Libraries imported successfully


## 2. Extract Data from Sources

### 2.1 Extract from CSV Files

In [2]:
# Load CSV files
df_sales = pd.read_csv('../data/Fashion_Retail_Sales.csv')
df_products = pd.read_csv('../data/products.csv')
df_customers = pd.read_csv('../data/customers.csv')

print(f"✓ Sales data: {len(df_sales)} records")
print(f"✓ Products: {len(df_products)} items")
print(f"✓ Customers: {len(df_customers)} profiles")

# Preview sales data
df_sales.head(3)

✓ Sales data: 3400 records
✓ Products: 61 items
✓ Customers: 166 profiles


| | Customer Reference ID | Item Purchased | Purchase Amount (USD) | Date Purchase | Review Rating | Payment Method |
|---|---|---|---|---|---|---|
| 0 | 4018 | Handbag | 4619.0 | 2023-02-05 | NaN | Credit Card |
| 1 | 4115 | Tunic | 2456.0 | 2023-07-11 | 2.0 | Credit Card |
| 2 | 4019 | Tank Top | 2102.0 | 2023-03-23 | 4.1 | Cash |


### 2.2 Extract from MongoDB Atlas

In [3]:
# MongoDB Atlas connection string, read from the environment so that
# credentials are never stored in the notebook.
# MONGODB_ATLAS_URL is an assumed variable name; set it before running.
import os
atlas_url = os.environ["MONGODB_ATLAS_URL"]

try:
    client = pymongo.MongoClient(atlas_url)
    db = client["retail_db"]
    
    # Extract suppliers collection
    df_suppliers = pd.DataFrame(list(db["suppliers"].find()))
    
    # Remove MongoDB's _id column
    if "_id" in df_suppliers.columns:
        df_suppliers = df_suppliers.drop("_id", axis=1)
    
    # Reaching this point means find() succeeded, so the connection works
    print("✓ MongoDB connection successful")
    print(f"✓ Suppliers: {len(df_suppliers)} records")
    
except Exception as e:
    print(f"⚠ MongoDB connection failed: {e}")
    print("Continuing without MongoDB data...")
    df_suppliers = pd.DataFrame()

✓ MongoDB connection successful
✓ Suppliers: 4 records


## 3. Data Exploration & Quality Checks

In [4]:
print("Data Quality Checks:")
print("="*50)

# Check for duplicates
print(f"\n1. Duplicate customers: {df_customers['customer_reference_id'].duplicated().sum()}")
print(f"2. Duplicate products: {df_products['item_name'].duplicated().sum()}")

# Check for missing values
n_missing_ratings = df_sales['Review Rating'].isna().sum()
print(f"\n3. Missing review ratings: {n_missing_ratings} ({n_missing_ratings / len(df_sales):.1%})")

# Check data consistency
sales_customers = set(df_sales['Customer Reference ID'].unique())
csv_customers = set(df_customers['customer_reference_id'].unique())
missing_customers = sales_customers - csv_customers
print(f"\n4. Missing customers in customer file: {len(missing_customers)}")

sales_items = set(df_sales['Item Purchased'].unique())
csv_items = set(df_products['item_name'].unique())
missing_items = sales_items - csv_items
print(f"5. Missing products in product file: {len(missing_items)}")

if missing_items:
    print("   ⚠ WARNING: These items have sales but no product record:")
    # Sort so the sample shown is deterministic (set order is arbitrary)
    for item in sorted(missing_items)[:5]:
        print(f"      - {item}")

Data Quality Checks:

1. Duplicate customers: 0
2. Duplicate products: 0

3. Missing review ratings: 324 (9.5%)

4. Missing customers in customer file: 0
5. Missing products in product file: 0
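
Checks 4 and 5 follow the same pattern: find key values in the sales data that have no match in a reference table. A minimal sketch of that pattern as a reusable helper, using a left merge with `indicator=True` (an anti-join). The helper name `find_orphan_keys` and the toy frames are illustrative assumptions, not part of the project data.

```python
import pandas as pd


def find_orphan_keys(child, child_key, parent, parent_key):
    """Return sorted child key values that have no match in the parent table."""
    child_keys = child[[child_key]].drop_duplicates()
    parent_keys = parent[[parent_key]].drop_duplicates()
    merged = child_keys.merge(
        parent_keys,
        left_on=child_key,
        right_on=parent_key,
        how="left",
        indicator=True,  # adds a _merge column: 'both' or 'left_only'
    )
    orphans = merged.loc[merged["_merge"] == "left_only", child_key]
    return sorted(orphans.tolist())


# Toy data (hypothetical), shaped like the sales and product tables
sales = pd.DataFrame({"Item Purchased": ["Handbag", "Tunic", "Scarf", "Tunic"]})
products = pd.DataFrame({"item_name": ["Handbag", "Tunic"]})

orphan_items = find_orphan_keys(sales, "Item Purchased", products, "item_name")
print(orphan_items)
```

In the notebook, the same call would presumably take `df_sales` and `df_products` (or `df_customers`) in place of the toy frames.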


## 4. Transform Data

### 4.1 Clean Data

In [5]:
# Remove duplicates
df_customers_clean = df_customers.drop_duplicates(subset=['customer_reference_id'])
df_products_clean = df_products.drop_duplicates(subset=['item_name'])

print(f"✓ Cleaned customers: {len(df_customers_clean)} (removed {len(df_customers) - len(df_customers_clean)} duplicates)")
print(f"✓ Cleaned products: {len(df_products_clean)} (removed {len(df_products) - len(df_products_clean)} duplicates)")

✓ Cleaned customers: 166 (removed 0 duplicates)
✓ Cleaned products: 61 (removed 0 duplicates)


### 4.2 Create Customer Dimension

In [6]:
# Create customer dimension with surrogate keys
dim_customer = df_customers_clean.copy()
dim_customer.insert(0, 'customer_id', range(1, len(dim_customer) + 1))

# Combine first and last name
dim_customer['name'] = dim_customer['first_name'] + ' ' + dim_customer['last_name']

# Map loyalty tier to member status
dim_customer['loyalty_member'] = (
    dim_customer['loyalty_tier'].isin(['Gold', 'Platinum']).map({True: 'Yes', False: 'No'})
)

# Select relevant columns for dimension
dim_customer = dim_customer[[
    'customer_id',
    'customer_reference_id', 
    'name',
    'email',
    'city',
    'loyalty_member',
    'age'
]]

print(f"✓ Created customer dimension: {len(dim_customer)} rows")
dim_customer.head(3)

✓ Created customer dimension: 166 rows


| | customer_id | customer_reference_id | name | email | city | loyalty_member | age |
|---|---|---|---|---|---|---|---|
| 0 | 1 | 3957 | Aiden Davis | aiden.davis26@email.com | San Diego | Yes | 20 |
| 1 | 2 | 3958 | Olivia Garcia | olivia.garcia224@email.com | Seattle | No | 46 |
| 2 | 3 | 3959 | Grace Moore | grace.moore829@email.com | Las Vegas | No | 66 |


### 4.3 Create Product Dimension

In [7]:
# Product dimension already has product_id
dim_product = df_products_clean.copy()

print(f"✓ Created product dimension: {len(dim_product)} rows")
dim_product.head(3)

✓ Created product dimension: 61 rows


| | product_id | item_name | category | brand | material | season | gender_target | base_price | stock_quantity | supplier_name | product_introduction_date |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1001 | Handbag | Accessories | Classic Collection | Synthetic | All Season | Women | 4650 | 259 | Elite Textiles | 2020-05-08 |
| 1 | 1002 | Tunic | Tops | Modern Wardrobe | Polyester | Fall/Winter | Women | 4285 | 314 | Premium Manufacturing | 2022-06-18 |
| 2 | 1003 | Tank Top | Tops | Haute Couture | Canvas | All Season | Men | 2245 | 443 | International Imports | 2021-05-29 |


### 4.4 Create Date Dimension

In [8]:
# Extract dates from sales and parse them first, so that de-duplication
# and sorting operate on real dates rather than on strings
dim_date = df_sales[['Date Purchase']].copy()
dim_date.columns = ['purchase_date']
dim_date['purchase_date'] = pd.to_datetime(dim_date['purchase_date'])
dim_date = dim_date.drop_duplicates()

# Add surrogate key in chronological order
dim_date = dim_date.sort_values('purchase_date').reset_index(drop=True)
dim_date.insert(0, 'date_id', range(1, len(dim_date) + 1))

# Derive date components
dim_date['year'] = dim_date['purchase_date'].dt.year
dim_date['month'] = dim_date['purchase_date'].dt.month
dim_date['day'] = dim_date['purchase_date'].dt.day

print(f"✓ Created date dimension: {len(dim_date)} rows")
print(f"  Date range: {dim_date['purchase_date'].min().date()} to {dim_date['purchase_date'].max().date()}")
dim_date.head(3)

✓ Created date dimension: 365 rows
  Date range: 2022-10-02 to 2023-10-01


| | date_id | purchase_date | year | month | day |
|---|---|---|---|---|---|
| 0 | 1 | 2022-10-02 | 2022 | 10 | 2 |
| 1 | 2 | 2022-10-03 | 2022 | 10 | 3 |
| 2 | 3 | 2022-10-04 | 2022 | 10 | 4 |
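
The date dimension is derived from the dates that appear in the sales data. Here every day in the range has at least one sale (365 rows for a 365-day range), but in general a day without sales would be missing. A minimal sketch of an alternative that generates a gap-free calendar with `pd.date_range` and adds a few common attributes. The function name `build_date_dimension` and the extra columns (`quarter`, `day_name`, `is_weekend`) are assumptions for illustration, not part of the project schema.

```python
import pandas as pd


def build_date_dimension(start, end):
    """Build one row per calendar day between start and end (inclusive)."""
    dates = pd.date_range(start=start, end=end, freq="D")
    dim = pd.DataFrame({"purchase_date": dates})
    dim.insert(0, "date_id", range(1, len(dim) + 1))
    dim["year"] = dim["purchase_date"].dt.year
    dim["month"] = dim["purchase_date"].dt.month
    dim["day"] = dim["purchase_date"].dt.day
    dim["quarter"] = dim["purchase_date"].dt.quarter
    dim["day_name"] = dim["purchase_date"].dt.day_name()
    dim["is_weekend"] = dim["purchase_date"].dt.dayofweek >= 5  # Sat=5, Sun=6
    return dim


calendar = build_date_dimension("2022-10-02", "2023-10-01")
print(len(calendar))
print(calendar.head(3))
```

Because the keys are assigned in chronological order from the same start date, `date_id` values from this sketch line up with the ones shown above as long as no day is missing from the sales data.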


### 4.5 Create Payment Dimension

In [9]:
# Extract unique payment methods
dim_payment = df_sales[['Payment Method']].drop_duplicates().copy()
dim_payment.columns = ['payment_method']

# Add surrogate key
dim_payment = dim_payment.sort_values('payment_method').reset_index(drop=True)
dim_payment.insert(0, 'payment_id', range(1, len(dim_payment) + 1))

print(f"✓ Created payment dimension: {len(dim_payment)} rows")
dim_payment

✓ Created payment dimension: 2 rows


| | payment_id | payment_method |
|---|---|---|
| 0 | 1 | Cash |
| 1 | 2 | Credit Card |


### 4.6 Create Sales Fact Table

In [10]:
# Start with sales data
fact_sales = df_sales.copy()

# Convert date for joining
fact_sales['Date Purchase'] = pd.to_datetime(fact_sales['Date Purchase'])

# Join with customer dimension to get customer_id
customer_lookup = dim_customer[['customer_id', 'customer_reference_id']]
fact_sales = fact_sales.merge(
    customer_lookup,
    left_on='Customer Reference ID',
    right_on='customer_reference_id',
    how='left'
)

# Join with product dimension to get product_id
product_lookup = dim_product[['product_id', 'item_name']]
fact_sales = fact_sales.merge(
    product_lookup,
    left_on='Item Purchased',
    right_on='item_name',
    how='left'
)

# Join with date dimension to get date_id
date_lookup = dim_date[['date_id', 'purchase_date']]
fact_sales = fact_sales.merge(
    date_lookup,
    left_on='Date Purchase',
    right_on='purchase_date',
    how='left'
)

# Join with payment dimension to get payment_id
fact_sales = fact_sales.merge(
    dim_payment,
    left_on='Payment Method',
    right_on='payment_method',
    how='left'
)

# Select only fact table columns
fact_sales = fact_sales[[
    'customer_id',
    'product_id',
    'date_id',
    'payment_id',
    'Purchase Amount (USD)',
    'Review Rating'
]].rename(columns={
    'Purchase Amount (USD)': 'purchase_amount',
    'Review Rating': 'review_rating'
})

# Check for any NULL foreign keys
print("Foreign Key Validation:")
print(f"  NULL customer_id: {fact_sales['customer_id'].isna().sum()}")
print(f"  NULL product_id: {fact_sales['product_id'].isna().sum()}")
print(f"  NULL date_id: {fact_sales['date_id'].isna().sum()}")
print(f"  NULL payment_id: {fact_sales['payment_id'].isna().sum()}")

# Remove rows with NULL foreign keys (if any)
fact_sales_clean = fact_sales.dropna(subset=['customer_id', 'product_id', 'date_id', 'payment_id'])

print(f"\n✓ Created sales fact table: {len(fact_sales_clean)} rows")
print(f"  (Removed {len(fact_sales) - len(fact_sales_clean)} rows with missing foreign keys)")
fact_sales_clean.head(3)

Foreign Key Validation:
  NULL customer_id: 0
  NULL product_id: 0
  NULL date_id: 0
  NULL payment_id: 0

✓ Created sales fact table: 3400 rows
  (Removed 0 rows with missing foreign keys)


| | customer_id | product_id | date_id | payment_id | purchase_amount | review_rating |
|---|---|---|---|---|---|---|
| 0 | 62 | 1001 | 127 | 2 | 4619.0 | NaN |
| 1 | 159 | 1002 | 283 | 2 | 2456.0 | 2.0 |
| 2 | 63 | 1003 | 173 | 1 | 2102.0 | 4.1 |


## 5. Load Data into MySQL

### 5.1 Connect to MySQL

In [11]:
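# MySQL connection settings; the password is read from the environment
# (MYSQL_PASSWORD is an assumed variable name) rather than hardcoded
import os
from sqlalchemy import text
from sqlalchemy.engine import URL

mysql_url = URL.create(
    "mysql+mysqlconnector",
    username="root",
    password=os.environ["MYSQL_PASSWORD"],
    host="localhost",
    database="ds2002_retail",
)

# Create SQLAlchemy engine; URL.create escapes special characters in the password
engine = create_engine(mysql_url)

# create_engine is lazy, so run a trivial query to confirm the connection works
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

print("✓ MySQL connection established")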

✓ MySQL connection established


### 5.2 Load Dimension Tables

In [12]:
# Load dimensions (order matters - load dimensions before fact table)
print("Loading dimension tables...")

dim_customer.to_sql('customer_dim', engine, if_exists='replace', index=False)
print(f"  ✓ customer_dim: {len(dim_customer)} rows")

dim_product.to_sql('product_dim', engine, if_exists='replace', index=False)
print(f"  ✓ product_dim: {len(dim_product)} rows")

dim_date.to_sql('date_dim', engine, if_exists='replace', index=False)
print(f"  ✓ date_dim: {len(dim_date)} rows")

dim_payment.to_sql('payment_dim', engine, if_exists='replace', index=False)
print(f"  ✓ payment_dim: {len(dim_payment)} rows")

Loading dimension tables...
  ✓ customer_dim: 166 rows
  ✓ product_dim: 61 rows
  ✓ date_dim: 365 rows
  ✓ payment_dim: 2 rows


### 5.3 Load Fact Table

In [13]:
# Load fact table
print("Loading fact table...")

fact_sales_clean.to_sql('sales_fact', engine, if_exists='replace', index=False)
print(f"  ✓ sales_fact: {len(fact_sales_clean)} rows")

print("\n🎉 ETL Pipeline completed successfully!")

Loading fact table...
  ✓ sales_fact: 3400 rows

🎉 ETL Pipeline completed successfully!
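
`to_sql` with `if_exists='replace'` recreates each table from the DataFrame's columns and does not declare primary or foreign keys, so the database itself does not enforce the star schema's relationships. A minimal sketch of what declared keys would add, run against an in-memory SQLite database so it is self-contained. The two-table schema is a simplified assumption; in MySQL the equivalent constraints would be declared with `CREATE TABLE` or `ALTER TABLE` before loading.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled

conn.execute("""
    CREATE TABLE payment_dim (
        payment_id INTEGER PRIMARY KEY,
        payment_method TEXT NOT NULL UNIQUE
    )
""")
conn.execute("""
    CREATE TABLE sales_fact (
        payment_id INTEGER NOT NULL REFERENCES payment_dim (payment_id),
        purchase_amount REAL
    )
""")

# Dimensions first, then facts
conn.executemany(
    "INSERT INTO payment_dim VALUES (?, ?)", [(1, "Cash"), (2, "Credit Card")]
)
conn.execute("INSERT INTO sales_fact VALUES (?, ?)", (2, 4619.0))

try:
    # payment_id 99 does not exist in payment_dim
    conn.execute("INSERT INTO sales_fact VALUES (?, ?)", (99, 10.0))
    orphan_rejected = False
except sqlite3.IntegrityError:
    orphan_rejected = True

print(orphan_rejected)
```

With constraints like these in place, loading with `if_exists='append'` into pre-created tables would keep them, whereas `'replace'` drops and recreates the table.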


## 6. Validation: Query Loaded Data

In [14]:
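# Check table row counts. For InnoDB tables, TABLE_ROWS is an estimate,
# so it can differ from the number of rows actually loaded.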
query = """
SELECT 
    TABLE_NAME,
    TABLE_ROWS
FROM 
    INFORMATION_SCHEMA.TABLES
WHERE 
    TABLE_SCHEMA = 'ds2002_retail'
ORDER BY 
    TABLE_NAME;
"""

result = pd.read_sql(query, engine)
print("\nTable Row Counts:")
result


Table Row Counts:


| | TABLE_NAME | TABLE_ROWS |
|---|---|---|
| 0 | customer_dim | 72 |
| 1 | date_dim | 365 |
| 2 | payment_dim | 2 |
| 3 | product_dim | 61 |
| 4 | sales_fact | 3400 |
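
`customer_dim` is reported with 72 rows although 166 were loaded. `INFORMATION_SCHEMA.TABLES.TABLE_ROWS` is only an approximation for InnoDB tables, which is the likely cause. A minimal sketch of getting exact counts with `SELECT COUNT(*)` instead. It runs against an in-memory SQLite database so it is self-contained; the helper name `exact_row_counts` and the toy tables are assumptions.

```python
import sqlite3


def exact_row_counts(conn, table_names):
    """Return {table_name: exact row count} using SELECT COUNT(*)."""
    counts = {}
    for name in table_names:
        # Table names cannot be bound as parameters, so only pass trusted,
        # hard-coded names here (never user input).
        cursor = conn.execute(f'SELECT COUNT(*) FROM "{name}"')
        counts[name] = cursor.fetchone()[0]
    return counts


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customer_dim (customer_id INTEGER PRIMARY KEY)")
conn.execute("CREATE TABLE payment_dim (payment_id INTEGER PRIMARY KEY)")
conn.executemany(
    "INSERT INTO customer_dim VALUES (?)", [(i,) for i in range(1, 167)]
)
conn.executemany("INSERT INTO payment_dim VALUES (?)", [(1,), (2,)])

row_counts = exact_row_counts(conn, ["customer_dim", "payment_dim"])
print(row_counts)
```

Against the notebook's MySQL engine, the same idea would be one `pd.read_sql("SELECT COUNT(*) AS n FROM customer_dim", engine)` call per table; note that MySQL quotes identifiers with backticks rather than double quotes.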


In [15]:
# Test query: Total sales by product
query = """
SELECT 
    pd.item_name, 
    SUM(sf.purchase_amount) AS total_sales,
    COUNT(*) AS num_sales,
    AVG(sf.review_rating) AS avg_rating
FROM sales_fact sf
JOIN product_dim pd ON sf.product_id = pd.product_id
GROUP BY pd.item_name
ORDER BY total_sales DESC
LIMIT 10;
"""

result = pd.read_sql(query, engine)
print("\nTop 10 Products by Sales:")
result


Top 10 Products by Sales:


| | item_name | total_sales | num_sales | avg_rating |
|---|---|---|---|---|
| 0 | Tunic | 17275.0 | 61 | 2.406897 |
| 1 | Jeans | 13068.0 | 58 | 3.263462 |
| 2 | Pajamas | 12798.0 | 81 | 2.820548 |
| 3 | Shorts | 12702.0 | 87 | 2.911111 |
| 4 | Handbag | 12668.0 | 72 | 3.169841 |
| 5 | Gloves | 12330.0 | 59 | 3.092593 |
| 6 | Boots | 11891.0 | 65 | 3.224138 |
| 7 | Poncho | 11422.0 | 73 | 2.95303 |
| 8 | Flip-Flops | 11309.0 | 51 | 3.302128 |
| 9 | Slippers | 11284.0 | 58 | 3.086 |


In [16]:
# Test query: Sales by customer segment
query = """
SELECT 
    cd.loyalty_member,
    COUNT(DISTINCT cd.customer_id) AS num_customers,
    SUM(sf.purchase_amount) AS total_sales,
    AVG(sf.purchase_amount) AS avg_sale
FROM sales_fact sf
JOIN customer_dim cd ON sf.customer_id = cd.customer_id
GROUP BY cd.loyalty_member;
"""

result = pd.read_sql(query, engine)
print("\nSales by Loyalty Status:")
result


Sales by Loyalty Status:


| | loyalty_member | num_customers | total_sales | avg_sale |
|---|---|---|---|---|
| 0 | No | 75 | 195086.0 | 153.008627 |
| 1 | Yes | 91 | 235866.0 | 159.909153 |


## Summary

✅ **Extract:**
- CSV files: Fashion_Retail_Sales.csv, customers.csv, products.csv
- MongoDB Atlas: suppliers collection

✅ **Transform:**
- Created surrogate keys
- Dropped duplicate customers and products (none were found)
- Generated date/payment dimensions
- Joined fact table with all dimensions

✅ **Load:**
- 4 dimension tables
- 1 fact table
- All loaded into MySQL star schema

✅ **Validation:**
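- Checked row counts (`TABLE_ROWS` is an estimate for InnoDB, which likely explains `customer_dim` showing 72 rather than 166)
- Tested query joins
- Found no NULL foreign keys in the fact table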