In [3]:
import os
import json
import pandas as pd
import sqlite3
from datetime import datetime

In [4]:
#load Configuration
def load_config(path='config.json'):
    if not os.path.exists(path):
     raise FileNotFoundError(f"Configuration file not found at {path}")
    with open(path, 'r') as f:
        return json.load(f)

In [5]:
#Read Data from csv
def read_raw(file_path):
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    if file_path.lower().endswith(".csv"):
        try:
            return pd.read_csv(file_path, low_memory=False)
        except UnicodeDecodeError:
            return pd.read_csv(file_path, encoding="latin1", low_memory=False)

    elif file_path.lower().endswith(".xlsx") or file_path.lower().endswith(".xls"):
        return pd.read_excel(file_path)

    else:
        raise ValueError("Unsupported file type. Use CSV or Excel.")


In [6]:
def parse_invoice_date(df):
    # common date column names
    possible_cols = ["InvoiceDate", "invoice_date", "Invoice Date", "Date"]

    for col in possible_cols:
        if col in df.columns:
            df["invoice_date"] = pd.to_datetime(df[col], errors="coerce")
            return df

    # fallback: try to detect automatically
    for col in df.columns:
        if df[col].dtype == object:
            try:
                df["invoice_date"] = pd.to_datetime(df[col], errors="coerce")
                return df
            except:
                pass

    # if still not found
    df["invoice_date"] = pd.NaT
    return df


In [7]:
#normalize columns name to lower case and replace spaces with underscores

def normalize_columns(df):
    col_map = {}
    for c in df.columns:
        lc = c.lower().replace(" ","")
        if lc in ("invoiceno","invoice"):
            col_map[c] ="invoice_no"
        if lc == "quantity":
            col_map[c] ="quantity"
        if lc in ("unitprice","unit_price"):
            col_map[c] = "unit_price"
        if lc in ("description","product","productname"):
            col_map[c] = "description"
        if lc == "country":
            col_map[c] = "country"
    return df.rename(columns=col_map)
    

In [8]:
def clean_numeric(df):
    # Fix quantity column
    if "quantity" in df.columns:
        df["quantity"] = pd.to_numeric(df["quantity"], errors='coerce').fillna(0).astype(int)
    else:
        df["quantity"] = 0

    # Fix unit_price column
    if "unit_price" in df.columns:
        df["unit_price"] = pd.to_numeric(df["unit_price"], errors='coerce').fillna(0.0)
    else:
        df["unit_price"] = 0.0

    return df


In [9]:
def derive_revenue(df, simulate_cost_pct=0.6):
    df["revenue"] = df["quantity"] * df["unit_price"]
    df["cost_per_unit"] = df["unit_price"] * simulate_cost_pct
    df["total_cost"] = df["cost_per_unit"] * df["quantity"]
    df["profit"] = df["revenue"] - df["total_cost"]

    # create invoice_date_only safely
    if "invoice_date" in df.columns:
        df["invoice_date_only"] = pd.to_datetime(df["invoice_date"].dt.date)
    else:
        raise ValueError("invoice_date is missing. Cannot create invoice_date_only.")

    return df


In [10]:
#here we delete duplicate
def duplicates(df):
    if "invoice_no" in df.columns and  "quantity" in df.columns:

        df =df.drop_duplicates(subset=["invoice_no","quantity"])
    else:
        df = df.drop_duplicates()
    return df

In [11]:
def compute_daily_kpis(df):
    """
    Create daily KPI summary from cleaned transaction data.

    Requires in df:
      - 'invoice_date_only' (date per row)
      - 'revenue' (numeric)
      - optionally 'invoice_no' (for order count)
    """
    import pandas as pd

    # safety checks
    if "invoice_date_only" not in df.columns:
        raise ValueError("invoice_date_only is missing. Run derive_revenue() before compute_daily_kpis().")
    if "revenue" not in df.columns:
        raise ValueError("revenue column is missing. Make sure derive_revenue() created it.")

    # make sure date-only is treated as date
    df["invoice_date_only"] = pd.to_datetime(df["invoice_date_only"]).dt.date

    # group by day
    kpi = (
        df.groupby("invoice_date_only")
          .agg(
              total_revenue=("revenue", "sum"),
              total_orders=("invoice_no", lambda s: s.nunique() if "invoice_no" in df.columns else len(s))
          )
          .reset_index()
          .sort_values("invoice_date_only")
    )

    # previous day's revenue
    kpi["prev_revenue"] = kpi["total_revenue"].shift(1)

    # safe percent change (avoid divide by zero)
    kpi["revenue_change_pct"] = 0.0
    mask = kpi["prev_revenue"].notna() & (kpi["prev_revenue"] != 0)
    kpi.loc[mask, "revenue_change_pct"] = (
        (kpi.loc[mask, "total_revenue"] - kpi.loc[mask, "prev_revenue"])
        / kpi.loc[mask, "prev_revenue"] * 100
    ).round(2)

    return kpi


In [12]:
from datetime import datetime
import os

def save_csv(df_clean, kpi_df, folder="data/processed"):
    """
    Save cleaned transaction data and daily KPI summary to CSV files.
    """
    os.makedirs(folder, exist_ok=True)

    # unique name per run
    date_str = datetime.now().strftime("%Y%m%d_%H%M%S")

    clean_path = os.path.join(folder, f"sales_clean_{date_str}.csv")
    kpi_path  = os.path.join(folder, f"daily_kpis_{date_str}.csv")

    df_clean.to_csv(clean_path, index=False)
    kpi_df.to_csv(kpi_path, index=False)

    print(f"Cleaned data saved to: {clean_path}")
    print(f"KPI data saved to:     {kpi_path}")

    return clean_path, kpi_path


In [24]:
import mysql.connector

def save_kpi_to_mysql(kpi_df):
    conn = mysql.connector.connect(
        host="localhost",
        port = 3306,
        user="root",
        password="Rasik@2005",
        database="retail_etl"
    )
    
    cursor = conn.cursor()

    insert_sql = """
    INSERT INTO daily_kpis (invoice_date, total_revenue, total_orders, revenue_change_pct)
    VALUES (%s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
        total_revenue = VALUES(total_revenue),
        total_orders = VALUES(total_orders),
        revenue_change_pct = VALUES(revenue_change_pct);
    """

    records = []
    for _, row in kpi_df.iterrows():
        records.append((
            row["invoice_date_only"],
            row["total_revenue"],
            row["total_orders"],
            row["revenue_change_pct"]
        ))

    cursor.executemany(insert_sql, records)
    conn.commit()
    cursor.close()
    conn.close()

    print("KPI data saved to MySQL table 'daily_kpis'.")
    


In [15]:
def check_alert_print(kpi_df, threshold):
    """
    Print alerts when % revenue change exceeds +/- threshold.
    threshold: percent drop or increase considered significant
    """
    latest = kpi_df.iloc[-1]       # last date
    prev = kpi_df.iloc[-2] if len(kpi_df) > 1 else None

    change = latest["revenue_change_pct"]

    print("\n>>> Alert Check")
    print(f"Latest Date: {latest['invoice_date_only']}, Change: {change}%")

    if change <= -threshold:
        print(f"ðŸš¨ Sales dropped by {abs(change)}% vs previous day!")
    elif change >= threshold:
        print(f"ðŸ“ˆ Sales increased by {change}% vs previous day!")
    else:
        print("âœ“ No major sales change detected.")

def main():
    # Load config
    config = load_config()

    # Read raw data
    raw_path = config.get("data_file", "data/raw/sales_data.csv")
    df_raw = read


In [25]:
def etl_process(config):
    raw_csv          = config["raw_csv"]
    processed_folder = config["processed_folder"]
    threshold        = config["sales_drop_threshold_pct"]

    print(">>> Reading data")
    df = read_raw(raw_csv)

    print(">>> Parsing date")
    df = parse_invoice_date(df)

    print(">>> Normalizing columns")
    df = normalize_columns(df)

    print(">>> Cleaning numeric values")
    df = clean_numeric(df)

    print(">>> Calculating revenue & profit")
    df_clean = derive_revenue(df)

    print(">>> Removing duplicates")
    df_clean = duplicates(df_clean)

    print(">>> Computing daily KPIs")
    kpi_df = compute_daily_kpis(df_clean)

    print(">>> Saving cleaned & KPI CSVs")
    save_csv(df_clean, kpi_df, processed_folder)

    print(">>> Saving KPIs to MySQL")
    save_kpi_to_mysql(kpi_df)   # ðŸ‘ˆ HERE we push to SQL

    print(">>> Checking alerts")
    check_alert_print(kpi_df, threshold)

    print("\nPipeline completed successfully!")

    
    
    
    
    
    
if __name__ == "__main__":
    cfg = load_config()
    etl_process(cfg)



>>> Reading data
>>> Parsing date
>>> Normalizing columns
>>> Cleaning numeric values
>>> Calculating revenue & profit
>>> Removing duplicates
>>> Computing daily KPIs
>>> Saving cleaned & KPI CSVs


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["invoice_date_only"] = pd.to_datetime(df["invoice_date_only"]).dt.date


Cleaned data saved to: data/processed\sales_clean_20251120_134813.csv
KPI data saved to:     data/processed\daily_kpis_20251120_134813.csv
>>> Saving KPIs to MySQL
KPI data saved to MySQL table 'daily_kpis'.
>>> Checking alerts

>>> Alert Check
Latest Date: 2010-12-09, Change: 0.0%
âœ“ No major sales change detected.

Pipeline completed successfully!
