In [10]:
import os
import pandas as pd
import numpy as np
import mysql.connector
import logging
from datetime import datetime

In [11]:
logging.basicConfig(
    filename='retail_etl.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

In [12]:
# Create the 'data/' folder if it doesn't exist
data_folder = 'data/'

if not os.path.exists(data_folder):
    os.makedirs(data_folder)
    print("✅ 'data/' folder created. Please add CSVs inside it.")
else:
    print("📁 'data/' folder already exists.")

📁 'data/' folder already exists.


In [13]:
import os
import pandas as pd

# Define folder where CSVs are stored
data_folder = 'data/'

# Create the folder if it doesn't exist
if not os.path.exists(data_folder):
    os.makedirs(data_folder)
    print("✅ 'data/' folder created. Add your CSV files into this folder.")
else:
    print("📁 'data/' folder already exists.")

# Combine all CSV files in the folder into a single DataFrame
combined_df = pd.DataFrame()

for file in os.listdir(data_folder):
    if file.endswith(".csv"):
        file_path = os.path.join(data_folder, file)
        temp_df = pd.read_csv(file_path)
        combined_df = pd.concat([combined_df, temp_df], ignore_index=True)

# Display the first few rows
print("✅ Raw data combined:")
print(combined_df.head())


📁 'data/' folder already exists.
✅ Raw data combined:
Empty DataFrame
Columns: []
Index: []


In [17]:
import numpy as np
from datetime import datetime

# ✅ Normalize column names FIRST
combined_df.columns = [col.strip().lower().replace(' ', '_') for col in combined_df.columns]

# 🔍 Step 1: Print available columns
print("📋 Available columns after normalization:")
print(combined_df.columns.tolist())

# ✅ Step 2: Check if required columns exist before proceeding
required_columns = ['quantity_sold', 'unit_price', 'discount_percent', 'payment_mode', 'date', 'store_id', 'product_id', 'product_name']

missing_columns = [col for col in required_columns if col not in combined_df.columns]
if missing_columns:
    print(f"❌ Missing columns in the data: {missing_columns}")
else:
    # ✅ Handle missing values
    combined_df.fillna({
        'quantity_sold': 0,
        'unit_price': 0.0,
        'discount_percent': 0.0,
        'payment_mode': 'Unknown'
    }, inplace=True)

    # ✅ Calculate total sale value
    combined_df['total_sale_value'] = (
        combined_df['quantity_sold'] *
        combined_df['unit_price'] *
        (1 - combined_df['discount_percent'] / 100)
    )

    # ✅ Convert 'date' column to datetime
    combined_df['date'] = pd.to_datetime(combined_df['date'], errors='coerce')

    # ✅ Drop duplicates
    combined_df.drop_duplicates(subset=['store_id', 'date', 'product_id'], inplace=True)

    # ✅ Categorize sales
    combined_df['sale_category'] = np.where(
        combined_df['total_sale_value'] >= 10000, 'High',
        np.where(combined_df['total_sale_value'] >= 5000, 'Medium', 'Low')
    )

    # 👀 Preview the cleaned data
    print("✅ Transformed DataFrame:")
    print(combined_df.head())
    print(combined_df.columns.tolist())


📋 Available columns after normalization:
[]
❌ Missing columns in the data: ['quantity_sold', 'unit_price', 'discount_percent', 'payment_mode', 'date', 'store_id', 'product_id', 'product_name']


In [18]:
import mysql.connector

try:
    # ✅ Connect to MySQL database (update credentials if needed)
    mydb = mysql.connector.connect(
        host="localhost",
        user="root",
        password="root",      # Update if you use a different password
        database="retail"     # Make sure this DB exists. Create manually if needed.
    )

    cursor = mydb.cursor()

    # ✅ Create table if not exists
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS retail_sales (
            store_id VARCHAR(20),
            date DATE,
            product_id VARCHAR(20),
            product_name VARCHAR(100),
            quantity_sold INT,
            unit_price FLOAT,
            discount_percent FLOAT,
            payment_mode VARCHAR(20),
            total_sale_value FLOAT,
            sale_category VARCHAR(20),
            PRIMARY KEY (store_id, date, product_id)
        )
    """)

    # ✅ Insert each row with ON DUPLICATE KEY UPDATE (idempotent insert)
    for _, row in combined_df.iterrows():
        sql = """
            INSERT INTO retail_sales (
                store_id, date, product_id, product_name,
                quantity_sold, unit_price, discount_percent,
                payment_mode, total_sale_value, sale_category
            )
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ON DUPLICATE KEY UPDATE
                quantity_sold=VALUES(quantity_sold),
                unit_price=VALUES(unit_price),
                discount_percent=VALUES(discount_percent),
                payment_mode=VALUES(payment_mode),
                total_sale_value=VALUES(total_sale_value),
                sale_category=VALUES(sale_category)
        """
        values = (
            row['store_id'],
            row['date'],
            row['product_id'],
            row['product_name'],
            int(row['quantity_sold']),
            float(row['unit_price']),
            float(row['discount_percent']),
            row['payment_mode'],
            float(row['total_sale_value']),
            row['sale_category']
        )
        cursor.execute(sql, values)

    # ✅ Commit and close
    mydb.commit()
    cursor.close()
    mydb.close()

    print("✅ Data loaded into MySQL successfully.")

except mysql.connector.Error as err:
    print("❌ MySQL Error:", err)


❌ MySQL Error: 1045 (28000): Access denied for user 'root'@'localhost' (using password: YES)


In [20]:
# 🔍 Check actual column names first
print("📋 Available columns:", combined_df.columns.tolist())

# Map fallback names if store_id is missing
fallbacks = {
    'store_id': None,
    'product_name': None,
    'total_sale_value': None,
    'date': None
}

# Try to detect correct column names dynamically
for col in combined_df.columns:
    if 'store' in col and 'id' in col:
        fallbacks['store_id'] = col
    if 'product' in col and 'name' in col:
        fallbacks['product_name'] = col
    if 'total_sale_value' in col:
        fallbacks['total_sale_value'] = col
    if col == 'date':
        fallbacks['date'] = col

# Validate all required columns found
if None in fallbacks.values():
    print("❌ Missing required column(s):", [k for k, v in fallbacks.items() if v is None])
else:
    # ✅ Total sales per store
    store_sales = (
        combined_df.groupby(fallbacks['store_id'])[fallbacks['total_sale_value']]
        .sum()
        .reset_index()
        .sort_values(by=fallbacks['total_sale_value'], ascending=False)
    )
    store_sales.to_csv('store_sales_summary.csv', index=False)
    print("✅ store_sales_summary.csv created.")

    # ✅ Top 5 products
    top_products = (
        combined_df.groupby(fallbacks['product_name'])[fallbacks['total_sale_value']]
        .sum()
        .sort_values(ascending=False)
        .head(5)
        .reset_index()
    )
    top_products.to_csv('top_products.csv', index=False)
    print("✅ top_products.csv created.")

    # ✅ Daily sales trend
    daily_trend = (
        combined_df.groupby([fallbacks['date'], fallbacks['store_id']])[fallbacks['total_sale_value']]
        .sum()
        .reset_index()
        .sort_values(by=fallbacks['date'])
    )
    daily_trend.to_csv('daily_sales_trend.csv', index=False)
    print("✅ daily_sales_trend.csv created.")


📋 Available columns: []
❌ Missing required column(s): ['store_id', 'product_name', 'total_sale_value', 'date']
