In [1]:
import pandas as pd
import sys
# Console display: never truncate cell contents, and allow wide lines so that
# printed frames are not wrapped.
pd.set_option(
    "display.max_colwidth", None,
    "display.width", 800,
)

In [2]:
import pandas as pd

def ingest_csv(file_path):
    """Read a CSV source into a DataFrame and log its dimensions.

    Parameters
    ----------
    file_path : str, path-like or file-like
        Anything ``pd.read_csv`` accepts as its first argument.

    Returns
    -------
    pandas.DataFrame
        The parsed data exactly as returned by ``pd.read_csv``.
    """
    print("[INGEST] Reading CSV...")
    frame = pd.read_csv(file_path)
    n_rows, n_cols = frame.shape
    print(f"[INGEST] Loaded {n_rows} rows with {n_cols} columns.")
    return frame

In [3]:
# Load the raw transaction extract, then show the first rows and the overall shape.
txn = ingest_csv("../data/mock_transactions.csv")
print(txn.head(15), txn.shape, sep="\n")

[INGEST] Reading CSV...
[INGEST] Loaded 10000 rows with 5 columns.
                          transaction_id  bank_id  customer_id  transaction_amount transaction_date
0   595fbc6c-9cc5-44fa-8fc5-6f70e0a2909b     1007         2043             2989.78       2025-07-24
1   fa427870-d966-4398-9b75-3e3fd0bcbf29     1004         2012             2293.15       2025-07-13
2   68c9433b-0235-4ee5-bb4c-0a1f84455418     1008         2013             1710.41       2025-05-20
3   094ac07a-9c89-49f5-9284-768c862113cf     1005         2036             2344.39       2025-07-22
4   189fb491-17c3-4494-bcce-b8ae86d7b567     1007         2039             1563.96       2025-05-29
5   44bedefc-52a3-4a01-a5ae-e9ae9d6394c1     1003         2034             2196.23       2025-06-25
6   7e46562f-712e-41bc-a5cd-2d58f8641bcc     1007         2006             1800.98       2025-06-20
7   8cb85b01-ee2e-4cd9-aaed-2740ce74679b     1008         2038             2835.36       2025-07-06
8   599daa68-0d9e-4a7c-808c-66fd0

In [20]:
from sqlalchemy import create_engine, text
def get_engine(user, password, host, port, db):
    """Create a SQLAlchemy engine for a MySQL database via the PyMySQL driver.

    The URL is assembled with ``sqlalchemy.engine.URL.create`` instead of string
    formatting. Credentials containing URL-reserved characters (``@``, ``:``,
    ``/``, ``%`` ...) are therefore passed through verbatim rather than
    corrupting the connection string.

    Parameters
    ----------
    user, password : str
        Database credentials.
    host : str
        Server hostname or IP address.
    port : int, str or None
        Server port. It is coerced to ``int``; ``None`` omits the port from the URL.
    db : str
        Database (schema) name to connect to.

    Returns
    -------
    sqlalchemy.engine.Engine
    """
    # Local import: only the URL helper is new; create_engine is already in scope.
    from sqlalchemy.engine import URL

    url = URL.create(
        drivername="mysql+pymysql",
        username=user,
        password=password,
        host=host,
        # URL.create requires an int port; accept "3306"-style strings as the
        # old f-string URL did.
        port=int(port) if port is not None else None,
        database=db,
    )
    return create_engine(url)

import os

# Connection settings are read from the environment so real credentials never
# have to be committed with the notebook. The fallbacks are the previous
# local-development values and should not be relied on for shared databases.
engine = get_engine(
    os.environ.get("MYSQL_USER", "root"),
    os.environ.get("MYSQL_PASSWORD", "root"),
    os.environ.get("MYSQL_HOST", "localhost"),
    int(os.environ.get("MYSQL_PORT", "3306")),
    os.environ.get("MYSQL_DB", "eft_db"),
)

In [21]:
def transform_customer_txns(df):
    """Aggregate raw transactions into one row per customer per day.

    The input is copied before any column is coerced, so the caller's
    DataFrame keeps its original columns and dtypes after the call.

    Cleaning steps, in order:
      1. ``transaction_date`` is parsed to datetime and floored to midnight.
         ``transaction_amount`` and ``customer_id`` are parsed to numbers.
         Unparseable values become NaT/NaN.
      2. Rows with a null date, amount or customer id are dropped.
      3. Rows with a negative amount are dropped.
      4. ``customer_id`` is downcast to the smallest integer dtype that holds it.
         Ids with a fractional part stay float.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``customer_id``, ``transaction_date``,
        ``transaction_amount`` and ``transaction_id``. Other columns are ignored.

    Returns
    -------
    pandas.DataFrame
        Columns ``customer_id``, ``agg_date``, ``total_amount`` and
        ``num_transactions``, with one row per (customer_id, day).

    Raises
    ------
    ValueError
        If any required column is missing.
    """
    print("[TRANSFORM-CUSTOMER] Starting transformation...")

    required_cols = ["customer_id", "transaction_date", "transaction_amount", "transaction_id"]
    missing = [c for c in required_cols if c not in df.columns]
    if missing:
        raise ValueError(f"Missing columns: {missing}")

    # Work on a copy: assigning the coerced columns back into ``df`` would
    # otherwise silently rewrite the caller's frame.
    df = df.copy()
    # normalize() floors timestamps to midnight, so rows that carry a time of
    # day still land in a single per-day bucket.
    df["transaction_date"] = pd.to_datetime(df["transaction_date"], errors="coerce").dt.normalize()
    df["transaction_amount"] = pd.to_numeric(df["transaction_amount"], errors="coerce")
    df["customer_id"] = pd.to_numeric(df["customer_id"], errors="coerce")

    before_rows = len(df)
    df = df.dropna(subset=["transaction_date", "transaction_amount", "customer_id"])
    print(f"[TRANSFORM-CUSTOMER] Dropped {before_rows - len(df)} null rows.")

    before_rows = len(df)
    df = df[df["transaction_amount"] >= 0]
    print(f"[TRANSFORM-CUSTOMER] Removed {before_rows - len(df)} rows with negative amounts.")

    # Downcast only once nulls are gone. While a NaN is present the column is
    # float and an integer downcast is a no-op, which would leak float ids
    # (e.g. 2043.0) into the output.
    df = df.assign(customer_id=pd.to_numeric(df["customer_id"], downcast="integer"))

    agg_df = (
        df.groupby(["customer_id", "transaction_date"], as_index=False)
          .agg(
              total_amount=("transaction_amount", "sum"),
              # NOTE(review): "count" skips null transaction_id values, but those
              # rows' amounts are still included in total_amount - confirm intent.
              num_transactions=("transaction_id", "count"),
          )
          .rename(columns={"transaction_date": "agg_date"})
    )
    print(f"[TRANSFORM-CUSTOMER] Aggregated into {len(agg_df)} rows.")

    return agg_df


In [22]:
# Build the per-customer, per-day summary from the raw transactions.
cust_df = transform_customer_txns(df=txn)

[TRANSFORM-CUSTOMER] Starting transformation...
[TRANSFORM-CUSTOMER] Dropped 0 null rows.
[TRANSFORM-CUSTOMER] Removed 0 rows with negative amounts.
[TRANSFORM-CUSTOMER] Aggregated into 4023 rows.


In [23]:
def load_to_mysql(engine, df, table_name, truncate=False):
    """Append ``df`` to ``table_name``, optionally emptying the table first.

    The optional TRUNCATE and the insert both run on the single connection
    opened by ``engine.begin()``. That transaction commits when the ``with``
    block exits normally and rolls back if it raises.

    Parameters
    ----------
    engine : sqlalchemy.engine.Engine
        Engine for the target database.
    df : pandas.DataFrame
        Rows to insert; the index is not written. An empty frame is a no-op:
        nothing is inserted and the table is not truncated, even when
        ``truncate`` is True.
    table_name : str
        Target table, treated as a single identifier (as ``DataFrame.to_sql`` does).
    truncate : bool, default False
        If True, run ``TRUNCATE TABLE`` before inserting.

    Notes
    -----
    Per the MySQL manual, TRUNCATE TABLE causes an implicit commit. On MySQL,
    therefore, a failed insert will not bring the truncated rows back.
    """
    if df.empty:
        print(f"[LOAD] No data to load for {table_name}.")
        return

    with engine.begin() as conn:
        if truncate:
            print(f"[LOAD] Truncating table {table_name}...")
            # Quote through the dialect so reserved words or special characters
            # in the name work, matching how to_sql quotes the same name.
            quoted_name = conn.dialect.identifier_preparer.quote(table_name)
            conn.execute(text(f"TRUNCATE TABLE {quoted_name}"))

        # Insert on the connection (and transaction) opened above. Passing
        # ``engine`` would make to_sql check out a second connection and run
        # outside this transaction.
        df.to_sql(table_name, conn, if_exists="append", index=False)
        print(f"[LOAD] Inserted {len(df)} rows into {table_name}.")

In [25]:
# Replace the contents of the daily summary table with the freshly aggregated rows.
load_to_mysql(
    engine=engine,
    df=cust_df,
    table_name="ana_customer_daily_summary",
    truncate=True,
)

[LOAD] Truncating table ana_customer_daily_summary...
[LOAD] Inserted 4023 rows into ana_customer_daily_summary.
