In [55]:
#Step 1: Import Libraries
import getpass
import glob
import logging
import os
from datetime import datetime

import mysql.connector
import numpy as np
import pandas as pd


In [56]:
#Step 2: Configure Logging
# Root-logger settings: INFO and above go to etl_log.log as
# "timestamp:LEVEL:message" lines.
log_settings = {
    'filename': 'etl_log.log',
    'level': logging.INFO,
    'format': '%(asctime)s:%(levelname)s:%(message)s',
}
logging.basicConfig(**log_settings)


In [57]:
#Step 3: Extract Data from Multiple CSV Files
# Reads every *.csv in data_folder and stacks them into one DataFrame
# (sales_df). Files that fail to parse are logged and skipped.
data_folder = 'data/'

# glob returns matches in arbitrary, OS-dependent order. The later
# drop_duplicates step keeps the first occurrence of each key, so the load
# order decides which duplicate row survives -- sort to make that deterministic.
all_files = sorted(glob.glob(os.path.join(data_folder, "*.csv")))

# Reset first so a failed extract can never silently reuse a frame left over
# from an earlier run of this cell; downstream cells then fail loudly instead.
sales_df = None

if not all_files:
    logging.warning("No CSV files found in the data/ folder.")
    print("⚠️ No CSV files found. Please check the 'data/' folder path and file extensions.")
else:
    df_list = []
    for file in all_files:
        try:
            temp_df = pd.read_csv(file)
            df_list.append(temp_df)
            logging.info(f"Loaded file: {file}")
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole load.
            logging.error(f"Error loading {file}: {e}")

    if df_list:
        sales_df = pd.concat(df_list, ignore_index=True)
        logging.info(f"Combined {len(df_list)} of {len(all_files)} file(s) into {len(sales_df)} rows.")
        print("Combined DataFrame shape:", sales_df.shape)
    else:
        logging.error("All CSV files failed to load.")
        print("⚠️ All files failed to load. Check CSV formatting.")


Combined DataFrame shape: (81, 8)


In [58]:
#Step 4: Transform the Data
# Handle missing values
# Defaults are keyed by *normalized* column name (lower-case, trimmed, spaces
# replaced by "_") and then mapped onto whatever spelling the loaded headers
# actually use. DataFrame.fillna ignores dict keys that match no column, so
# matching on one exact raw spelling would silently fill nothing when the
# headers differ in case or spacing.
fill_defaults = {
    'quantity_sold': 0,
    'unit_price': 0.0,
    'discount_percent': 0.0,
    'payment_mode': 'Unknown'
}

fill_values = {}
for col in sales_df.columns:
    normalized = str(col).lower().strip().replace(" ", "_")
    if normalized in fill_defaults:
        fill_values[col] = fill_defaults[normalized]

if fill_values:
    sales_df = sales_df.fillna(fill_values)


In [59]:
# Normalize column names first: lower-case, trimmed, spaces -> underscores
sales_df = sales_df.rename(
    columns=lambda header: header.lower().strip().replace(" ", "_")
)

# Net sale value per row (quantity x unit price, less the percentage discount),
# plus the date column parsed to datetime; unparseable dates become NaT
discount_factor = 1 - sales_df['discount_percent'] / 100
sales_df = sales_df.assign(
    total_sale_value=sales_df['quantity_sold'] * sales_df['unit_price'] * discount_factor,
    date=pd.to_datetime(sales_df['date'], errors='coerce'),
)

# Remove duplicates: keep the first row for each (store, date, product) key
sales_df = sales_df.drop_duplicates(subset=['store_id', 'date', 'product_id'])

# Categorize sales: label paired with the mask that selects it;
# rows matching no mask (e.g. a missing value) fall back to 'Unknown'
sale_value = sales_df['total_sale_value']
tier_rules = [
    ('High', sale_value.ge(1000)),
    ('Medium', sale_value.ge(500) & sale_value.lt(1000)),
    ('Low', sale_value.lt(500)),
]
sales_df = sales_df.assign(
    sale_category=np.select(
        [mask for _, mask in tier_rules],
        [label for label, _ in tier_rules],
        default='Unknown',
    )
)

print("Transformed DataFrame preview:")
sales_df.head()


Transformed DataFrame preview:


Unnamed: 0,store_id,date,product_id,product_name,quantity_sold,unit_price,discount_percent,payment_mode,total_sale_value,sale_category
0,S001,2023-07-01,P001,Soap,10,25.5,5,Cash,242.25,Low
1,S002,2023-07-01,P002,Shampoo,5,120.0,10,Card,540.0,Medium
2,S003,2023-07-02,P001,Soap,8,25.5,0,Cash,204.0,Low
3,S004,2023-07-02,P003,Toothpaste,12,40.0,2,Card,470.4,Low
4,S005,2023-07-03,P002,Shampoo,6,120.0,5,Cash,684.0,Medium


In [60]:
#Step 5: Load to MySQL
# Connection settings are read from the environment so the password is not
# stored in the notebook. Host and user fall back to the previous defaults;
# the password comes from MYSQL_PASSWORD or, if unset, an interactive prompt.
mysql_host = os.environ.get("MYSQL_HOST", "localhost")
mysql_user = os.environ.get("MYSQL_USER", "Srivarshan")
mysql_password = os.environ.get("MYSQL_PASSWORD")
if mysql_password is None:
    mysql_password = getpass.getpass(f"MySQL password for {mysql_user}@{mysql_host}: ")

try:
    # Connect without specifying the database
    mydb = mysql.connector.connect(
        host=mysql_host,
        user=mysql_user,
        password=mysql_password
    )
    cursor = mydb.cursor()
    # Create database if not exists, then use it
    cursor.execute("CREATE DATABASE IF NOT EXISTS retail")
    cursor.execute("USE retail")
    logging.info("Connected to MySQL and ensured database exists.")
except Exception as e:
    # Record the failure in the ETL log, then re-raise so the run stops here.
    logging.error(f"MySQL connection failed: {e}")
    raise


In [61]:

# Create database and table if not exists
# One row per (store_id, date, product_id); that triple is the primary key.
create_table_query = """
CREATE TABLE IF NOT EXISTS retail_sales (
    store_id VARCHAR(20),
    date DATE,
    product_id VARCHAR(20),
    product_name VARCHAR(100),
    quantity_sold INT,
    unit_price FLOAT,
    discount_percent FLOAT,
    payment_mode VARCHAR(20),
    total_sale_value FLOAT,
    sale_category VARCHAR(10),
    PRIMARY KEY (store_id, date, product_id)
)
"""

# Run the DDL in order: database, schema selection, then the table itself
for ddl_statement in (
    "CREATE DATABASE IF NOT EXISTS retail",
    "USE retail",
    create_table_query,
):
    cursor.execute(ddl_statement)


In [62]:
# Insert data with idempotency
# REPLACE INTO overwrites any existing row with the same primary key, so
# re-running this cell does not create duplicates.

# Columns are selected by name, in the order the query lists them, so the load
# does not depend on the DataFrame's column order. Keep in sync with the query.
insert_columns = [
    'store_id', 'date', 'product_id', 'product_name', 'quantity_sold',
    'unit_price', 'discount_percent', 'payment_mode', 'total_sale_value', 'sale_category'
]
# Primary-key columns of retail_sales; MySQL rejects NULL in any of them.
key_columns = ['store_id', 'date', 'product_id']

insert_query = """
REPLACE INTO retail_sales (
    store_id, date, product_id, product_name, quantity_sold,
    unit_price, discount_percent, payment_mode, total_sale_value, sale_category
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""


def _to_mysql_value(value):
    """Convert one DataFrame cell to a plain Python value for the MySQL driver.

    Missing values (NaN / NaT / None) become None so they are sent as SQL NULL,
    pandas Timestamps become datetime.datetime, and numpy scalars become the
    matching built-in Python type. Anything else is returned unchanged.
    """
    if pd.isna(value):
        return None
    if isinstance(value, pd.Timestamp):
        return value.to_pydatetime()
    if isinstance(value, np.generic):
        return value.item()
    return value


# Rows with a missing key field (e.g. a date that failed to parse) cannot be
# stored under the table's primary key; skip them and record how many.
has_key = sales_df[key_columns].notna().all(axis=1)
skipped_rows = int((~has_key).sum())
if skipped_rows:
    logging.warning(f"Skipping {skipped_rows} row(s) with a missing primary-key value.")

rows_to_insert = [
    tuple(_to_mysql_value(value) for value in record)
    for record in sales_df.loc[has_key, insert_columns].itertuples(index=False, name=None)
]

try:
    if rows_to_insert:
        cursor.executemany(insert_query, rows_to_insert)
    mydb.commit()
except Exception as e:
    # Undo the partial batch so the table is not left half-loaded, then stop.
    mydb.rollback()
    logging.error(f"MySQL insert failed: {e}")
    raise
logging.info("Data inserted into MySQL successfully.")
print("✅ Data loaded into MySQL.")


✅ Data loaded into MySQL.


In [63]:

#Step 6: Analysis & Reporting
# Each report sums total_sale_value over a grouping and is written to its own
# CSV file without the index column.

# Total sales per store
store_totals = sales_df.groupby('store_id')['total_sale_value'].sum()
store_sales = store_totals.reset_index()
store_sales.to_csv('store_sales_summary.csv', index=False)

# Top 5 products by total sales
product_totals = sales_df.groupby('product_name')['total_sale_value'].sum()
top_products = product_totals.nlargest(5).reset_index()
top_products.to_csv('top_5_products.csv', index=False)

# Daily sales trend per store
store_day_totals = sales_df.groupby(['store_id', 'date'])['total_sale_value'].sum()
daily_trend = store_day_totals.reset_index()
daily_trend.to_csv('daily_sales_trend.csv', index=False)

print("📁 Reports exported successfully.")

📁 Reports exported successfully.
