In [18]:
import os
import hashlib
import datetime
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError, OperationalError

# ====== CONFIG ======
# Connection settings are read from environment variables so that credentials are not
# stored in the notebook. Non-secret settings fall back to the local development
# defaults. The password has no default: set RETAIL_DB_PASS before running. Treat any
# password that was previously hard-coded here as leaked, and rotate it.
DB_USER = os.environ.get("RETAIL_DB_USER", "root")
DB_PASS = os.environ.get("RETAIL_DB_PASS", "")
DB_HOST = os.environ.get("RETAIL_DB_HOST", "localhost")
DB_PORT = os.environ.get("RETAIL_DB_PORT", "3306")
DB_NAME = os.environ.get("RETAIL_DB_NAME", "retail_analytics")
CSV_CHUNKSIZE = 5000  # rows per pandas chunk during ingestion
# Relative path (the other cells in this notebook read the same file this way) instead of
# an absolute, user-specific path. It can be overridden with RETAIL_SALES_CSV.
csv_file = os.environ.get("RETAIL_SALES_CSV", os.path.join("Data1", "sales_data_sample.csv"))

# Column contract for the sales CSV: used for the header check, for filling in
# missing columns, and as the field order in hash_row.
EXPECTED_COLS = [
    "ORDERNUMBER","QUANTITYORDERED","PRICEEACH","ORDERLINENUMBER","SALES",
    "ORDERDATE","STATUS","QTR_ID","MONTH_ID","YEAR_ID","PRODUCTLINE","MSRP",
    "PRODUCTCODE","CUSTOMERNAME","PHONE","ADDRESSLINE1","ADDRESSLINE2",
    "CITY","STATE","POSTALCODE","COUNTRY","TERRITORY","CONTACTLASTNAME",
    "CONTACTFIRSTNAME","DEALSIZE"
]

# ====== HELPERS ======
def make_engine():
    """Create the SQLAlchemy engine for the retail MySQL database.

    Connection details come from the module-level DB_USER / DB_PASS / DB_HOST /
    DB_PORT / DB_NAME settings. The user name and password are percent-encoded
    before being embedded in the URL, so characters such as ``@``, ``:``, ``/`` or
    ``#`` in a password do not corrupt it. ``quote(..., safe="")`` encodes a space
    as ``%20`` rather than ``+``, because ``+`` would survive URL parsing literally.
    ``pool_pre_ping=True`` turns on SQLAlchemy's liveness check for pooled connections.

    Returns:
        sqlalchemy.engine.Engine (no connection is opened here).
    """
    from urllib.parse import quote

    user = quote(DB_USER, safe="")
    password = quote(DB_PASS, safe="")
    url = f"mysql+mysqlconnector://{user}:{password}@{DB_HOST}:{DB_PORT}/{DB_NAME}?charset=utf8mb4"
    return create_engine(url, pool_pre_ping=True)

def hash_row(row: pd.Series) -> str:
    """Fingerprint one row as a SHA-256 hex digest over the EXPECTED_COLS fields.

    Each expected column is rendered with ``str()``; a column absent from *row*
    contributes an empty string. The pieces are joined with ``|`` in EXPECTED_COLS
    order, and the UTF-8 bytes of the result are hashed.
    """
    joined = "|".join(str(row.get(col, "")) for col in EXPECTED_COLS)
    digest = hashlib.sha256()
    digest.update(joined.encode("utf-8"))
    return digest.hexdigest()

# ====== MAIN ======
# Columns coerced to numbers during ingestion (unparseable text becomes NaN).
_NUMERIC_COLS = [
    "QUANTITYORDERED", "PRICEEACH", "SALES", "MSRP",
    "ORDERLINENUMBER", "QTR_ID", "MONTH_ID", "YEAR_ID",
]


def _utcnow() -> datetime.datetime:
    """Current UTC time as a naive datetime (replaces the deprecated ``datetime.utcnow()``)."""
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


def _to_db_scalar(value):
    """Convert a pandas/numpy cell value into a plain Python value a DB driver can bind.

    numpy scalars such as ``numpy.float64`` are unwrapped with ``.item()``. Missing
    values (None / NaN / NaT / pd.NA) become ``None``, which is bound as SQL NULL.
    """
    import numpy as np  # local import; pandas itself depends on numpy

    if isinstance(value, np.generic):
        value = value.item()
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value


def _prepare_chunk(chunk: pd.DataFrame, source_name: str) -> pd.DataFrame:
    """Normalise one raw CSV chunk and attach lineage metadata.

    The chunk's headers are stripped, any missing EXPECTED_COLS are added as empty
    columns, numeric and ORDERDATE columns are coerced, and the _source_filename,
    _ingested_at and _row_hash columns are added.
    """
    chunk.columns = [c.strip() for c in chunk.columns]
    for col in EXPECTED_COLS:
        if col not in chunk.columns:
            chunk[col] = None
    # Blank cells are already NaN here (the reader passes na_values=[""]).
    for col in _NUMERIC_COLS:
        chunk[col] = pd.to_numeric(chunk[col], errors="coerce")
    # `infer_datetime_format` is deprecated (pandas >= 2 always infers the format), so it is not passed.
    chunk["ORDERDATE"] = pd.to_datetime(chunk["ORDERDATE"].str.strip(), errors="coerce").dt.date
    chunk["_source_filename"] = source_name
    chunk["_ingested_at"] = _utcnow()
    chunk["_row_hash"] = chunk.apply(hash_row, axis=1)
    return chunk


def _validate_chunk(chunk: pd.DataFrame, seen_keys: set):
    """Split a prepared chunk into rows to load and error records for rejected rows.

    Each row gets at most one error. The checks run in this order:
      1. duplicate_pk           - ORDERNUMBER+ORDERLINENUMBER already seen earlier in the
                                  file. ``seen_keys`` is shared across chunks and updated
                                  here; the first occurrence of a key is kept.
      2. invalid_date           - ORDERDATE missing or unparseable
      3. invalid_price          - PRICEEACH missing or negative
      4. invalid_quantity       - QUANTITYORDERED missing, or negative once truncated to int
      5. invalid_quantity_parse - QUANTITYORDERED is +/-inf (cannot become an int)

    Error records for duplicates come first, then the other errors; each group is in row order.

    Returns:
        (good_rows DataFrame, list of error-record dicts ready for ingestion_errors)
    """
    def record(idx, error_type, details):
        row = chunk.loc[idx]
        return {
            "error_type": error_type,
            "ordernumber": _to_db_scalar(row["ORDERNUMBER"]),
            "orderlinenumber": _to_db_scalar(row["ORDERLINENUMBER"]),
            "raw_row": row.to_json(date_format="iso"),
            "error_details": details,
        }

    # 1. Duplicate primary keys across the whole file. Keys are normalised so that
    #    NaN becomes None, which keeps set membership reliable.
    dup_flags = []
    for order_no, line_no in zip(chunk["ORDERNUMBER"].tolist(), chunk["ORDERLINENUMBER"].tolist()):
        key = (_to_db_scalar(order_no), _to_db_scalar(line_no))
        dup_flags.append(key in seen_keys)
        seen_keys.add(key)
    is_dup = pd.Series(dup_flags, index=chunk.index, dtype=bool)

    price = chunk["PRICEEACH"]
    qty = chunk["QUANTITYORDERED"]
    qty_is_inf = qty.abs() == float("inf")

    remaining = ~is_dup
    bad_date = remaining & chunk["ORDERDATE"].isna()
    remaining = remaining & ~bad_date
    bad_price = remaining & (price.isna() | (price < 0))
    remaining = remaining & ~bad_price
    # For finite q, int(q) < 0 is equivalent to q <= -1, because int() truncates toward zero.
    bad_qty = remaining & (qty.isna() | ((qty <= -1) & ~qty_is_inf))
    remaining = remaining & ~bad_qty
    bad_qty_parse = remaining & qty_is_inf

    errors = [
        record(idx, "duplicate_pk", "Duplicate ordernumber+orderlinenumber in same file")
        for idx in chunk.index[is_dup.to_numpy()]
    ]
    other_bad = bad_date | bad_price | bad_qty | bad_qty_parse
    for idx in chunk.index[other_bad.to_numpy()]:
        if bad_date.at[idx]:
            errors.append(record(idx, "invalid_date", "ORDERDATE parse failure or missing"))
        elif bad_price.at[idx]:
            errors.append(record(idx, "invalid_price", f"PRICEEACH invalid: {price.at[idx]}"))
        elif bad_qty.at[idx]:
            errors.append(record(idx, "invalid_quantity", f"QUANTITYORDERED invalid: {qty.at[idx]}"))
        else:
            errors.append(record(idx, "invalid_quantity_parse", f"QUANTITYORDERED parse failed: {qty.at[idx]}"))

    good_rows = chunk[~(is_dup | other_bad)]
    return good_rows, errors


def _insert_errors(conn, ingestion_id, errors) -> None:
    """Bulk-insert the validation error records for one ingestion run (executemany)."""
    conn.execute(
        text("""
            INSERT INTO ingestion_errors (ingestion_id, error_type, ordernumber, orderlinenumber, raw_row, error_details)
            VALUES (:ingestion_id, :error_type, :ordernumber, :orderlinenumber, :raw_row, :error_details)
        """),
        [{"ingestion_id": ingestion_id, **err} for err in errors],
    )


def _mark_failed(engine, ingestion_id, message) -> None:
    """Best-effort: mark the ingestion log row as finished with an error message."""
    try:
        with engine.begin() as conn:
            conn.execute(
                text("UPDATE ingestion_logs SET run_completed_at=NOW(), error_message=:err WHERE ingestion_id=:iid"),
                {"err": message, "iid": ingestion_id},
            )
    except SQLAlchemyError as log_err:
        # A failure here must not hide the original exception, which the caller re-raises.
        print("Could not record failure in ingestion_logs:", log_err)


def main(csv_path):
    """Load a sales CSV into ``raw_staging_sales``, with row validation and run logging.

    The run opens an ``ingestion_logs`` row, checks the CSV header against
    EXPECTED_COLS, and then reads the file in chunks of CSV_CHUNKSIZE rows. Each chunk
    is cleaned, and its rows are validated (see ``_validate_chunk``). Rejected rows go
    to ``ingestion_errors``; good rows are appended to ``raw_staging_sales``. At the
    end, the log row is updated with the row counts and header warnings.

    Each log/error write runs in its own committed transaction (``engine.begin()``),
    so the bookkeeping persists on SQLAlchemy 2.x, where ``Connection.execute`` no
    longer autocommits.

    Returns:
        0 on success; 1 if the CSV does not exist or MySQL cannot be reached.
    Raises:
        Any other error is re-raised, after being recorded in
        ``ingestion_logs.error_message`` when the log row already exists.
    """
    if not os.path.exists(csv_path):
        print("CSV not found:", csv_path)
        return 1
    source_name = os.path.basename(csv_path)

    engine = make_engine()
    try:
        with engine.connect():  # fail fast on a bad host or bad credentials
            pass
    except OperationalError as e:
        print("MySQL connection failed:", e)
        engine.dispose()
        return 1

    ingestion_id = None
    try:
        # Start ingestion log
        with engine.begin() as conn:
            result = conn.execute(text("""
                INSERT INTO ingestion_logs (run_started_at, source_filename, rows_seen, rows_loaded, rows_rejected, warnings)
                VALUES (:started, :fname, 0, 0, 0, :warnings)
            """), {"started": _utcnow(), "fname": source_name, "warnings": ""})
            ingestion_id = result.lastrowid

        rows_seen = rows_loaded = rows_rejected = 0
        header_warnings = []

        # Header check
        df_head = pd.read_csv(csv_path, nrows=0, encoding="latin1")
        csv_cols = [c.strip() for c in df_head.columns]
        missing = set(EXPECTED_COLS) - set(csv_cols)
        extra = set(csv_cols) - set(EXPECTED_COLS)
        if missing:
            header_warnings.append(f"missing_columns:{sorted(missing)}")
        if extra:
            header_warnings.append(f"extra_columns:{sorted(extra)}")

        # Chunked read; the duplicate-key set persists across chunks.
        seen_keys = set()
        for raw_chunk in pd.read_csv(
            csv_path,
            chunksize=CSV_CHUNKSIZE,
            dtype=str,
            keep_default_na=False,
            na_values=[""],
            encoding="latin1",
        ):
            rows_seen += len(raw_chunk)
            chunk = _prepare_chunk(raw_chunk, source_name)
            good_rows, errors = _validate_chunk(chunk, seen_keys)

            if errors:
                rows_rejected += len(errors)
                with engine.begin() as conn:
                    _insert_errors(conn, ingestion_id, errors)

            if not good_rows.empty:
                df_to_insert = good_rows.rename(columns=str.lower)
                df_to_insert.to_sql('raw_staging_sales', con=engine, if_exists='append', index=False, method='multi', chunksize=1000)
                rows_loaded += len(df_to_insert)

        # Update log
        with engine.begin() as conn:
            conn.execute(text("""
                UPDATE ingestion_logs
                SET run_completed_at = :completed,
                    rows_seen = :seen,
                    rows_loaded = :loaded,
                    rows_rejected = :rejected,
                    warnings = :warnings
                WHERE ingestion_id = :ingestion_id
            """), {
                "completed": _utcnow(),
                "seen": rows_seen,
                "loaded": rows_loaded,
                "rejected": rows_rejected,
                "warnings": ";".join(header_warnings),
                "ingestion_id": ingestion_id,
            })

        print(f"Ingestion complete: seen={rows_seen} loaded={rows_loaded} rejected={rows_rejected}")

    except Exception as e:
        # DBAPI errors wrapped by SQLAlchemy carry the driver exception in `.orig`.
        msg = str(getattr(e, "orig", None) or e)
        if isinstance(e, SQLAlchemyError):
            print("DB error:", msg)
        else:
            print("Ingestion failed:", msg)
        if ingestion_id:
            _mark_failed(engine, ingestion_id, msg)
        raise
    finally:
        engine.dispose()
    return 0

# Entry point. In a notebook kernel __name__ is "__main__", so this runs when the cell executes.
if __name__ == "__main__":
    main(csv_path=csv_file)


  chunk["ORDERDATE"] = pd.to_datetime(


Ingestion complete: seen=2823 loaded=2823 rejected=0


In [21]:
import pandas as pd
# Reuse the loader's column contract (EXPECTED_COLS, defined in the ingestion cell)
# rather than a hand-copied list that can silently drift out of sync with it.
expected = list(EXPECTED_COLS)
df = pd.read_csv("Data1/sales_data_sample.csv", nrows=0, encoding="latin1")  # header only
cols = [c.strip() for c in df.columns]
missing = set(expected) - set(cols)
extra = set(cols) - set(expected)
print("missing", missing)
print("extra", extra)


missing set()
extra set()


In [8]:
import os
import pandas as pd

ORDERS_CSV = "Data1/sales_data_sample.csv"
BAD_DATES_CSV = "Data1/bad_orderdate_rows.csv"

# Read every field as raw text, so the exported rows show exactly what was in the file.
df = pd.read_csv(ORDERS_CSV, dtype=str, keep_default_na=False, encoding="latin1")

# Parse ORDERDATE the same way the ingestion loader does: strip whitespace, and turn
# failures into NaT. read_csv(parse_dates=...) is not used here, because if any value
# fails to parse it returns the whole column unchanged as strings, and `.isna()` would
# then never flag the bad rows.
order_dates = pd.to_datetime(df["ORDERDATE"].str.strip(), errors="coerce")

# Filter bad dates (missing or unparseable)
bad_dates = df[order_dates.isna()]

# Ensure the folder the file is written to exists
os.makedirs(os.path.dirname(BAD_DATES_CSV), exist_ok=True)

# Save the bad rows
bad_dates.to_csv(BAD_DATES_CSV, index=False)


In [4]:
import pandas as pd
import numpy as np

# Assumption (not in the data): unit cost is taken to be 60% of MSRP.
COST_TO_MSRP_RATIO = 0.6

# Load dataset from your CSV file
df = pd.read_csv("Data1/sales_data_sample.csv", encoding="latin1")

# Lower-case numeric copies of the raw columns: unparseable values become NaN, and a
# missing quantity becomes 0. These are followed by the derived per-line metrics. Each
# lambda sees the columns added before it.
df = df.assign(
    msrp=lambda d: pd.to_numeric(d['MSRP'], errors='coerce'),
    priceeach=lambda d: pd.to_numeric(d['PRICEEACH'], errors='coerce'),
    quantityordered=lambda d: pd.to_numeric(d['QUANTITYORDERED'], errors='coerce').fillna(0).astype(int),
    cost_each=lambda d: d['msrp'].fillna(0) * COST_TO_MSRP_RATIO,
    profit_est=lambda d: (d['priceeach'] - d['cost_each']) * d['quantityordered'],
    # Discount is relative to MSRP; rows with a non-positive or missing MSRP get 0.
    discount_pct=lambda d: np.where(d['msrp'] > 0, (d['msrp'] - d['priceeach']) / d['msrp'], 0),
)

# Preview
preview_cols = ['MSRP', 'PRICEEACH', 'quantityordered', 'cost_each', 'profit_est', 'discount_pct']
print(df[preview_cols].head())



   MSRP  PRICEEACH  quantityordered  cost_each  profit_est  discount_pct
0    95      95.70               30       57.0     1161.00     -0.007368
1    95      81.35               34       57.0      827.90      0.143684
2    95      94.74               41       57.0     1547.34      0.002737
3    95      83.26               45       57.0     1181.70      0.123579
4    95     100.00               49       57.0     2107.00     -0.052632
