In [1]:
from dotenv import load_dotenv
load_dotenv()

True

In [2]:
import pandas as pd
from typing import Tuple, Dict

# ----------------------------------------------------------------------
# 1. Loader ----------------------------------------------------------------
def load_transactions(csv_path: str) -> pd.DataFrame:
    """
    Read the raw AML CSV and parse timestamps.
    """
    df = pd.read_csv(csv_path, parse_dates=["tx_datetime"])
    # Normalise column names if you expect user-supplied files
    return df


# ----------------------------------------------------------------------
# 2. Helper filters --------------------------------------------------------
def window(df: pd.DataFrame, start, end) -> pd.DataFrame:
    """Return rows whose tx_datetime falls inside [start, end]."""
    mask = (df.tx_datetime >= start) & (df.tx_datetime <= end)
    return df.loc[mask].copy()


# ----------------------------------------------------------------------
# 3. KPI engines -----------------------------------------------------------
def totals(df: pd.DataFrame) -> Dict[str, Dict[str, float]]:
    """
    Return count & ZAR value of deposits and withdrawals.
    """
    dep = df[df.amount > 0]
    wd  = df[df.amount < 0]

    return {
        "deposits": {
            "count": len(dep),
            "value": dep.amount.sum()
        },
        "withdrawals": {
            "count": len(wd),
            "value": wd.amount.abs().sum()   # absolute for readability
        }
    }


def hourly_bursts(df: pd.DataFrame) -> pd.Series:
    """
    Transactions per hour (index 0-23).
    """
    return (
        df.assign(hour=df.tx_datetime.dt.hour)
          .groupby("hour")["amount"]
          .size()
          .reindex(range(24), fill_value=0)
    )


def domestic_split(df: pd.DataFrame, home_code: str = "ZA") -> Dict[str, Dict[str, float]]:
    """
    Counts and totals for domestic vs international.
    """
    df = df.assign(is_domestic=df.counterparty_country_code.eq(home_code))
    grp = df.groupby("is_domestic")["amount"].agg(["count", "sum"])

    def row(flag: bool):
        row = grp.loc[flag] if flag in grp.index else {"count": 0, "sum": 0.0}
        return {"count": int(row["count"]), "value": float(abs(row["sum"]))}

    return {
        "domestic":      row(True),
        "international": row(False)
    }


def extremes(df: pd.DataFrame) -> Dict[str, pd.Series]:
    """
    Largest deposit & withdrawal rows (entire records for drill-down).
    """
    largest_dep = df[df.amount > 0].nlargest(1, "amount").squeeze()
    largest_wd  = df[df.amount < 0].nsmallest(1, "amount").squeeze()
    return {"largest_deposit": largest_dep, "largest_withdrawal": largest_wd}


def channel_mix(df: pd.DataFrame) -> pd.DataFrame:
    """
    Count & value by channel, sorted by total value.
    """
    return (
        df.groupby("channel")["amount"]
          .agg(count="size", value="sum")
          .abs()                     # withdrawals should be positive in the KPI
          .sort_values("value", ascending=False)
    )


# ----------------------------------------------------------------------
# 4. Convenience wrapper --------------------------------------------------
def kpi_bundle(df: pd.DataFrame, start, end) -> Dict:
    """
    One-stop shop that returns all required insights for the UI.
    """
    dfw = window(df, start, end)

    return {
        "totals":          totals(dfw),
        "per_hour":        hourly_bursts(dfw).to_dict(),
        "domestic_split":  domestic_split(dfw),
        "extremes":        extremes(dfw),          # full rows for tooltip/table
        "channel_mix":     channel_mix(dfw).to_dict(orient="index")
    }


In [4]:
df = load_transactions("aml_synthetic_transactions.csv")
metrics = kpi_bundle(df, "2025-06-01", "2025-06-30")
print(metrics)

{'totals': {'deposits': {'count': 5, 'value': np.float64(103885.31)}, 'withdrawals': {'count': 7, 'value': np.float64(32120.12)}}, 'per_hour': {0: 0, 1: 0, 2: 0, 3: 2, 4: 0, 5: 1, 6: 1, 7: 0, 8: 0, 9: 0, 10: 1, 11: 1, 12: 0, 13: 0, 14: 0, 15: 1, 16: 0, 17: 0, 18: 3, 19: 0, 20: 0, 21: 1, 22: 1, 23: 0}, 'domestic_split': {'domestic': {'count': 6, 'value': 1609.2799999999988}, 'international': {'count': 6, 'value': 70155.90999999999}}, 'extremes': {'largest_deposit': transaction_id                          TX000149
account_id                              ACC12345
tx_datetime                  2025-06-04 03:06:54
amount                                  78697.12
channel                                      EFT
counterparty_country_code                     DE
Name: 148, dtype: object, 'largest_withdrawal': transaction_id                          TX000500
account_id                              ACC12345
tx_datetime                  2025-06-24 21:15:24
amount                                 -10

In [None]:
# ----------  1. RULE-BASED SPOTLIGHTS  ----------
from datetime import timedelta

def spotlights(df_window: pd.DataFrame, df_history: pd.DataFrame) -> dict:
    """
    Return boolean flags + quantitative context for UI badges.
    """
    out = {}
    
    # Burst: compare hourly max of window vs 90-day median of hourly counts
    window_hourly = df_window.tx_datetime.dt.hour.value_counts()
    hist_90d_start = df_window.tx_datetime.max() - timedelta(days=90)
    historic = df_history[df_history.tx_datetime >= hist_90d_start]
    hist_hourly_med = historic.tx_datetime.dt.hour.value_counts().median()
    burst_score = window_hourly.max() / max(hist_hourly_med, 1)  # avoid divide-by-zero
    
    out["hourly_burst"] = {
        "flag": burst_score > 3,
        "score": burst_score,
        "hour": window_hourly.idxmax()
    }
    
    # Inflow/outflow imbalance
    dep_sum = df_window[df_window.amount > 0].amount.sum()
    wd_sum  = abs(df_window[df_window.amount < 0].amount.sum())
    ratio   = wd_sum / dep_sum if dep_sum else float('inf')
    out["imbalance"] = {"flag": ratio > 1.2, "ratio": ratio}
    
    # Extreme deposit percentile
    p95 = df_history[df_history.amount > 0].amount.quantile(0.95)
    max_dep = df_window[df_window.amount > 0].amount.max()
    out["extreme_deposit"] = {"flag": max_dep > p95, "value": max_dep, "p95": p95}
    
    return out


# ----------  2. LLM NARRATIVE (optional)  ----------
import openai   # pip install --upgrade openai
import json

def narrative(metrics: dict, spotlights: dict) -> str:
    """
    Send metrics + spotlight flags to GPT and receive a short compliance note.
    """
    prompt = f"""You are an AML analyst.
    The following JSON contains KPI metrics and spotlight flags for a
    30-day transaction window. Write a concise, professional summary that:
    • Mentions any flagged anomalies.
    • Quantifies them (values, percentages, timestamps).
    • Uses bullet points; max 120 words.

    JSON:
    {json.dumps({"metrics": metrics, "spotlights": spotlights}, indent=2)}
    """
    resp = openai.ChatCompletion.create(
          model="gpt-4o-mini",
          messages=[{"role": "user", "content": prompt}],
          max_tokens=200,
          temperature=0.2
    )
    return resp.choices[0].message.content.strip()


In [None]:
def totals(df):
    by_type = df.groupby("is_deposit")["amount"].agg(['count','sum'])
    return by_type.loc[True], by_type.loc[False]

def hourly(df):
    return df.groupby("hour")["amount"].size()

def home_vs_away(df):
    g = df.groupby("is_domestic")["amount"].agg(['count','sum'])
    return g.loc[True], g.loc[False]

def extremes(df):
    largest_dep = df[df.amount>0].nlargest(1, "amount")
    largest_wd  = df[df.amount<0].nsmallest(1, "amount")
    return largest_dep, largest_wd

def channels(df):
    return df.groupby("channel")["amount"].agg(['count','sum']).sort_values('sum', ascending=False)
