# In [None]:
import math
import os

import pandas as pd
from sqlalchemy import create_engine
from word2number import w2n

# SQLAlchemy connection string. Prefer exporting it as the DB_URI environment
# variable so credentials are not saved alongside the notebook; the fallback
# literal below can still be filled in for quick local runs.
# NOTE(review): an empty string is not a valid SQLAlchemy URL, so create_engine
# is expected to fail here until a URI is supplied one way or the other.
DB_URI = os.environ.get("DB_URI", "")  # Fill in the DB URI (or export DB_URI)
engine = create_engine(DB_URI)


# CLEANING UTILS

def convert_to_number(val):
    """Best-effort conversion of a raw cell value to a number.

    Parsing is attempted in this order:

    1. ``int(val)`` -- plain integers such as ``"12"`` or ``7``. A float
       *object* is truncated here (``int(3.7) == 3``); decimal *strings*
       like ``"3.7"`` are not accepted by ``int`` and fall through.
    2. ``float(val)`` -- decimal strings such as ``"3.5"`` or ``"1e3"``.
       Without this step they reached the word parser and came back as
       ``None``. Non-finite results (``"inf"``, ``"nan"``) are rejected.
    3. ``w2n.word_to_num`` -- spelled-out numbers such as ``"twelve"``.

    Args:
        val: Any value; typically a string read from a CSV/Excel cell.

    Returns:
        An ``int`` or ``float`` on success, otherwise ``None``.
    """
    try:
        return int(val)
    except (ValueError, TypeError):
        pass

    try:
        as_float = float(val)
    except (ValueError, TypeError):
        pass
    else:
        if math.isfinite(as_float):
            return as_float

    try:
        return w2n.word_to_num(str(val).lower())
    except Exception:
        # Deliberately best-effort: any ordinary parse failure means "not a
        # number". Unlike a bare ``except:``, KeyboardInterrupt and SystemExit
        # still propagate.
        return None

def clean_numeric_column(series):
    """Return a new Series where each string element is parsed with
    ``convert_to_number``; non-string elements are passed through as-is."""
    def _normalise(cell):
        if not isinstance(cell, str):
            return cell
        return convert_to_number(cell)

    return series.apply(_normalise)


# EXTRACT + TRANSFORM
def load_customers(path):
    """Read the customers JSON source at *path* with lower-cased column names."""
    raw = pd.read_json(path)
    return raw.set_axis(raw.columns.str.lower(), axis=1)

def load_products(path):
    """Read the products CSV at *path*, lower-case its column names and
    coerce ``price`` to numeric (unparseable prices become NaN)."""
    raw = pd.read_csv(path)
    products = raw.set_axis(raw.columns.str.lower(), axis=1)
    return products.assign(price=pd.to_numeric(products['price'], errors='coerce'))

def load_inventory(path):
    """Read the inventory CSV at *path*, lower-case its column names and run
    the stock/reorder/sales columns through ``clean_numeric_column`` so that
    spelled-out numbers become numeric values."""
    raw = pd.read_csv(path)
    inventory = raw.set_axis(raw.columns.str.lower(), axis=1)
    numeric_columns = ('stock_level', 'reorder_level', 'avg_daily_sales', 'days_until_reorder')
    return inventory.assign(
        **{name: clean_numeric_column(inventory[name]) for name in numeric_columns}
    )

def _is_sales_workbook(filename):
    """Return True for ``.xlsx`` names (any case) that are not Excel ``~$`` lock files."""
    return filename.lower().endswith('.xlsx') and not filename.startswith('~$')


def _clean_sales_frame(df):
    """Normalise one raw sales sheet: lower-case columns, coerce types, add totals."""
    df.columns = df.columns.str.lower()
    df['sale_date'] = pd.to_datetime(df['sale_date'], errors='coerce')
    # clean_numeric_column turns number words into numbers; to_numeric then
    # guarantees a numeric dtype so the multiplication below cannot hit a
    # leftover None/str object and raise TypeError.
    df['quantity'] = pd.to_numeric(clean_numeric_column(df['quantity']), errors='coerce')
    df['product_price'] = pd.to_numeric(df['product_price'], errors='coerce')
    df['total_sale_amount'] = df['quantity'] * df['product_price']
    return df


def load_sales_folder(folder_path, limit=None):
    """Load, clean and concatenate every ``.xlsx`` sales workbook in *folder_path*.

    Files are read in sorted path order. The ``.xlsx`` suffix match is
    case-insensitive. Excel lock files (names starting with ``~$``, created
    while a workbook is open) and sub-directories are skipped, because
    ``pd.read_excel`` cannot read them.

    Args:
        folder_path: Directory containing the sales workbooks.
        limit: If truthy, only the first *limit* files (after sorting) are
            loaded; ``None`` or ``0`` loads every file.

    Returns:
        A single DataFrame with all rows (fresh 0..n-1 index), lower-cased
        column names, parsed ``sale_date``, numeric ``quantity`` and
        ``product_price``, and a derived ``total_sale_amount``.

    Raises:
        ValueError: If no sales workbook is found to load.
    """
    all_files = sorted(
        path
        for path in (os.path.join(folder_path, name)
                     for name in os.listdir(folder_path)
                     if _is_sales_workbook(name))
        if os.path.isfile(path)
    )
    if limit:
        all_files = all_files[:limit]
    if not all_files:
        # pd.concat([]) would raise an opaque "No objects to concatenate".
        raise ValueError(f"No .xlsx sales files found in {folder_path!r}")

    sales_data = [_clean_sales_frame(pd.read_excel(path)) for path in all_files]
    return pd.concat(sales_data, ignore_index=True)

def transform_and_join(sales, customers, products, inventory):
    """Left-join sales with customers (on ``customer_id``), then products and
    inventory (both on ``product_id``).

    Every sales row is kept. A sale whose key matches several rows in a lookup
    table is repeated once per match; this applies in particular to inventory
    holding several rows for the same ``product_id``.
    NOTE(review): a warehouse/date-aware inventory match may be needed to
    avoid that fan-out.
    """
    joined = (
        sales
        .merge(customers, on='customer_id', how='left')
        .merge(products, on='product_id', how='left')
        .merge(inventory, on='product_id', how='left')
    )
    return joined


# LOAD INTO DATABASE
def load_to_sql(df, table_name, engine, if_exists='append', chunksize=None):
    """Write *df* (without its index) to the SQL table *table_name*.

    Rows are sent with multi-row ``INSERT ... VALUES (...), (...)`` statements
    (``method='multi'``). Each statement binds one parameter per cell, and
    databases cap bind parameters per statement (e.g. 999 in SQLite builds
    before 3.32, 2100 in SQL Server). Sending the whole frame as one statement
    can therefore fail for large frames, so rows are written in chunks.

    Args:
        df: DataFrame to write.
        table_name: Destination table name.
        engine: Connectable accepted by ``DataFrame.to_sql`` (SQLAlchemy engine
            or a ``sqlite3`` connection).
        if_exists: Passed to ``DataFrame.to_sql``; the default ``'append'`` adds
            rows to an existing table.
        chunksize: Rows per INSERT statement. ``None`` (default) picks the
            largest chunk that keeps each statement within 999 parameters.
    """
    if chunksize is None:
        max_bind_params = 999
        # Multi-row INSERT binds one parameter per column per row.
        chunksize = max(1, max_bind_params // max(1, len(df.columns)))
    df.to_sql(
        table_name,
        con=engine,
        index=False,
        if_exists=if_exists,
        method='multi',
        chunksize=chunksize,
    )
    print(f"Loaded {len(df)} records into '{table_name}'.")


# MAIN ETL FUNCTION


def run_etl_pipeline(
    sales_folder="sales/",
    customers_path="customers.json",
    products_path="products.csv",
    inventory_path="inventory.csv",
    output_path="output/cleaned_sales_data.csv",
    limit_sales_files=None,
    write_joined_csv=False,
    db_engine=None,
):
    """Extract, clean and load the customer, product, inventory and sales data.

    Each source is read and cleaned by its ``load_*`` helper. The four frames
    are then written to the SQL tables ``customers``, ``products``,
    ``inventory`` and ``sales`` via ``load_to_sql`` (append mode, so re-running
    the pipeline appends the rows again).

    Args:
        sales_folder: Directory holding the ``.xlsx`` sales workbooks.
        customers_path: Customers JSON file.
        products_path: Products CSV file.
        inventory_path: Inventory CSV file.
        output_path: Destination CSV for the joined data; only used when
            *write_joined_csv* is true.
        limit_sales_files: Optional cap on how many sales workbooks to read.
        write_joined_csv: If true, also join sales with customers, products
            and inventory (``transform_and_join``) and save the result to
            *output_path*. Off by default, so existing callers are unaffected.
        db_engine: Connectable to write to; defaults to the module-level
            ``engine``.
    """
    target_engine = engine if db_engine is None else db_engine

    print("Loading raw data...")
    customers = load_customers(customers_path)
    products = load_products(products_path)
    inventory = load_inventory(inventory_path)
    sales = load_sales_folder(sales_folder, limit=limit_sales_files)

    if write_joined_csv:
        print("Transforming and joining...")
        final_df = transform_and_join(sales, customers, products, inventory)

        print(f"Saving output to {output_path}")
        output_dir = os.path.dirname(output_path)
        if output_dir:  # os.makedirs('') raises for a bare filename
            os.makedirs(output_dir, exist_ok=True)
        final_df.to_csv(output_path, index=False)

    print("Loading to SQL database...")
    load_to_sql(customers, "customers", target_engine)
    load_to_sql(products, "products", target_engine)
    load_to_sql(inventory, "inventory", target_engine)
    load_to_sql(sales, "sales", target_engine)

    print(" ETL Pipeline Completed!")