In [1]:
# =====================================================
# FlexiMart ETL Pipeline (FINAL - ERROR FREE)
# Handles: transaction date, data quality report
# =====================================================

import pandas as pd
import mysql.connector

# =====================================================
# DATABASE CONNECTION
# =====================================================
conn = mysql.connector.connect(
    host="localhost",
    user="root",
    password="sachin@1234",   # üî¥ CHANGE THIS
    database="fleximart",
    autocommit=False
)
cursor = conn.cursor()
print("‚úÖ Database connected")

# =====================================================
# READ CSV FILES
# =====================================================
customers = pd.read_csv("../data/customers_raw.csv")
products = pd.read_csv("../data/products_raw.csv")
sales = pd.read_csv("../data/sales_raw.csv")

print("‚úÖ CSV files loaded")

# =====================================================
# STANDARDIZE COLUMN NAMES
# =====================================================
customers.columns = customers.columns.str.strip().str.lower()
products.columns = products.columns.str.strip().str.lower()
sales.columns = (
    sales.columns
    .str.strip()
    .str.lower()
    .str.replace(" ", "_")
)

# =====================================================
# DATA QUALITY METRICS (BEFORE CLEANING)
# =====================================================
customer_count = len(customers)
customer_duplicates = customers.duplicated().sum()
missing_emails = customers["email"].isna().sum()

product_count = len(products)
missing_prices = products["price"].isna().sum()

sales_count = len(sales)
sales_duplicates = sales.duplicated().sum()

# =====================================================
# TRANSFORM - CUSTOMERS
# =====================================================
customers.drop_duplicates(inplace=True)

customers["email"] = customers["email"].fillna("unknown@email.com")

customers["phone"] = (
    customers["phone"]
    .astype(str)
    .str.replace(r"\D", "", regex=True)
    .str[-10:]
)
customers["phone"] = "+91-" + customers["phone"]

customers["registration_date"] = pd.to_datetime(
    customers["registration_date"],
    errors="coerce",
    dayfirst=True
)

# =====================================================
# TRANSFORM - PRODUCTS
# =====================================================
products["price"] = products["price"].fillna(products["price"].mean())
products["stock_quantity"] = products["stock_quantity"].fillna(0)
products["category"] = products["category"].str.title()

# =====================================================
# TRANSFORM - SALES (transaction date FIX)
# =====================================================
if "transaction_date" not in sales.columns:
    print("‚ùå Available columns in sales:", sales.columns.tolist())
    raise Exception("‚ùå transaction_date column not found in sales CSV")

sales.rename(columns={"transaction_date": "order_date"}, inplace=True)

sales.drop_duplicates(inplace=True)

sales["order_date"] = pd.to_datetime(
    sales["order_date"],
    errors="coerce",
    dayfirst=True
)

print("‚úÖ transaction_date successfully converted to order_date")

# =====================================================
# LOAD - CUSTOMERS (BULK INSERT)
# =====================================================
customer_data = []
for _, row in customers.iterrows():
    customer_data.append((
        row["first_name"],
        row["last_name"],
        row["email"],
        row["phone"],
        row["city"],
        row["registration_date"]
    ))

cursor.executemany("""
    INSERT IGNORE INTO customers
    (first_name, last_name, email, phone, city, registration_date)
    VALUES (%s, %s, %s, %s, %s, %s)
""", customer_data)

conn.commit()
print("‚úÖ Customers loaded")

# =====================================================
# LOAD - PRODUCTS
# =====================================================
product_data = []
for _, row in products.iterrows():
    product_data.append((
        row["product_name"],
        row["category"],
        row["price"],
        int(row["stock_quantity"])
    ))

cursor.executemany("""
    INSERT INTO products
    (product_name, category, price, stock_quantity)
    VALUES (%s, %s, %s, %s)
""", product_data)

conn.commit()
print("‚úÖ Products loaded")

# =====================================================
# DATA QUALITY REPORT (AUTO GENERATED)
# =====================================================
with open("data_quality_report.txt", "w") as f:
    f.write(f"Customers Processed: {customer_count}\n")
    f.write(f"Duplicates Removed: {customer_duplicates}\n")
    f.write(f"Missing Emails Filled: {missing_emails}\n")
    f.write(f"Products Processed: {product_count}\n")
    f.write(f"Missing Prices Filled: {missing_prices}\n")
    f.write(f"Sales Processed: {sales_count}\n")
    f.write(f"Duplicate Orders Removed: {sales_duplicates}\n")
    f.write("Records Loaded Successfully\n")

print("üìÑ Data quality report generated")

# =====================================================
# CLOSE CONNECTION
# =====================================================
cursor.close()
conn.close()

print("üéâ ETL PIPELINE COMPLETED SUCCESSFULLY")


‚úÖ Database connected
‚úÖ CSV files loaded
‚úÖ transaction_date successfully converted to order_date
‚úÖ Customers loaded
‚úÖ Products loaded
üìÑ Data quality report generated
üéâ ETL PIPELINE COMPLETED SUCCESSFULLY


  customers["registration_date"] = pd.to_datetime(
  sales["order_date"] = pd.to_datetime(
