In [None]:
# Cell 1 - config
import math
import os
from datetime import date, timedelta

import numpy as np
import pandas as pd
import pyodbc
from scipy import stats

# === EDIT THESE ===
# Credentials come from the HIVE_ODBC_DSN environment variable so they are never saved in the
# notebook; the literal below is only a placeholder fallback.
ODBC_DSN = os.environ.get("HIVE_ODBC_DSN", "DSN=YourHiveDSN;UID=your_user;PWD=your_password")
TXN_TABLE = "db.txns"                            # raw transaction table (partitioned by txn_date)
DAILY_AGG_TABLE = "db.cohort_daily_agg"          # per (cohort, day): n, sum and sum of squares of log(amount+1)
COHORT_STATS_TABLE = "db.cohort_stats_for_test"  # per cohort: recent vs baseline n / mean / variance
COHORT_FLAGS_TABLE = "db.cohort_drift_flags"     # per run: test statistics and drift flag per cohort
# ==================

# Parameters
RECENT_DAYS = 7        # recent window length in days, ending at TODAY (inclusive)
BASELINE_DAYS = 90     # baseline window length in days, immediately before the recent window
MIN_N_RECENT = 30      # cohorts with fewer recent transactions are not tested
MIN_N_BASE = 100       # cohorts with fewer baseline transactions are not tested
ALPHA = 0.01           # significance level for the Welch t-test p-value
MIN_EFFECT_LOG = 0.5   # min rise in mean log(amount+1); exp(0.5) ~ 1.65, i.e. ~65% increase
TODAY = date.today()   # job date - adjust if testing historic runs




In [None]:
# Cell 2 - helpers
def get_conn():
    """Open a fresh autocommit ODBC connection to Hive using ``ODBC_DSN``.

    The caller owns the returned connection and must close it.
    """
    connection = pyodbc.connect(ODBC_DSN, autocommit=True)
    return connection

def run_sql(conn, sql):
    """Execute one SQL statement (DDL / INSERT / SET) on ``conn`` and discard any result.

    Parameters
    ----------
    conn : DB-API connection
        An open connection, e.g. from ``get_conn``; it is left open.
    sql : str
        A single statement. Split multi-statement scripts and call once per statement.

    Returns
    -------
    None

    The cursor is always closed, even when ``execute`` raises; the exception propagates.
    """
    cur = conn.cursor()
    try:
        cur.execute(sql)
    finally:
        cur.close()

def fetch_df(conn, sql):
    """Execute a SELECT on ``conn`` and return the whole result set as a pandas DataFrame."""
    result = pd.read_sql(sql=sql, con=conn)
    return result


In [None]:
# Cell 3 - create/overwrite daily cohort aggregates for the needed window
conn = get_conn()

# Inclusive scan window covering both the baseline and the recent window used by the stats cell.
start_scan_date = (TODAY - timedelta(days=BASELINE_DAYS + RECENT_DAYS - 1)).isoformat()
# Upper bound so a historic TODAY neither scans nor overwrites partitions for later days.
end_scan_date = TODAY.isoformat()

# Target table, created on first run (no-op if it already exists).
# NOTE(review): partition column typed STRING because to_date() returns STRING before Hive 2.1;
# if the table already exists, its non-partition columns must be in this same order.
create_daily_agg_sql = f"""
CREATE TABLE IF NOT EXISTS {DAILY_AGG_TABLE} (
  cohort_key STRING,
  n BIGINT,
  sum_log_amt DOUBLE,
  sum_sq_log_amt DOUBLE
)
PARTITIONED BY (txn_date STRING)
STORED AS PARQUET
"""

# An INSERT whose only partition column is dynamic is rejected in Hive's default strict mode.
dynamic_partition_settings = (
    "SET hive.exec.dynamic.partition=true",
    "SET hive.exec.dynamic.partition.mode=nonstrict",
)

# Compute daily cohort aggregates; adapt the cohort definition (inner concat) as needed.
# - The cohort key is computed once in the subquery instead of being repeated in GROUP BY.
# - Hive maps SELECT columns to the table by position and requires dynamic partition columns
#   LAST, so txn_date must be the final column.
# - log(amount + 1) is NULL when amount is NULL or <= -1; such rows used to be counted in n but
#   not in the sums, so they are excluded to keep n / sum / sum of squares over the same rows.
# NOTE(review): TXN_TABLE is documented as partitioned by txn_date, but filtering only on
# to_date(txn_ts) may prevent partition pruning; if the partition date always equals
# to_date(txn_ts), add a matching txn_date predicate.
daily_agg_sql = f"""
INSERT OVERWRITE TABLE {DAILY_AGG_TABLE}
PARTITION (txn_date)
SELECT
  cohort_key,
  count(*) AS n,
  sum(log_amt) AS sum_log_amt,
  sum(log_amt * log_amt) AS sum_sq_log_amt,
  txn_date
FROM (
  SELECT
    concat(
      CASE WHEN days_since_account_opened <= 30 THEN 'new' WHEN days_since_account_opened <= 180 THEN 'mid' ELSE 'old' END,
      '_',
      CASE WHEN monthly_spend < 500 THEN 'low' WHEN monthly_spend < 2000 THEN 'med' ELSE 'high' END,
      '_', channel, '_', merchant_state
    ) AS cohort_key,
    to_date(txn_ts) AS txn_date,
    log(amount + 1) AS log_amt
  FROM {TXN_TABLE}
  WHERE to_date(txn_ts) >= '{start_scan_date}'
    AND to_date(txn_ts) <= '{end_scan_date}'
    AND amount IS NOT NULL
    AND amount > -1
) src
GROUP BY cohort_key, txn_date
"""

print("Submitting daily aggregate job to Hive (this runs in Hive).")
try:
    # One statement per request, on the same session so the SET values apply to the INSERT.
    run_sql(conn, create_daily_agg_sql)
    for setting in dynamic_partition_settings:
        run_sql(conn, setting)
    run_sql(conn, daily_agg_sql)
finally:
    conn.close()
print("Daily aggregates written (check table):", DAILY_AGG_TABLE)


In [None]:
# Cell 4 - compute cohort-level aggregated stats in Hive
conn = get_conn()

# Inclusive ISO date bounds: recent = last RECENT_DAYS days ending at TODAY;
# baseline = the BASELINE_DAYS days immediately before the recent window.
recent_start = (TODAY - timedelta(days=RECENT_DAYS-1)).isoformat()
recent_end = TODAY.isoformat()
baseline_start = (TODAY - timedelta(days=BASELINE_DAYS + RECENT_DAYS -1)).isoformat()
baseline_end = (TODAY - timedelta(days=RECENT_DAYS)).isoformat()

# HiveServer2 executes one statement per request, so the DDL and the INSERT are sent as separate
# statements, without trailing semicolons.
create_stats_sql = f"""
CREATE TABLE IF NOT EXISTS {COHORT_STATS_TABLE} (
  cohort_key STRING,
  recent_n BIGINT, recent_sum DOUBLE, recent_sum_sq DOUBLE,
  base_n BIGINT, base_sum DOUBLE, base_sum_sq DOUBLE,
  recent_mean DOUBLE, base_mean DOUBLE, recent_var DOUBLE, base_var DOUBLE
) STORED AS PARQUET
"""

# Means and sample variances come from the daily sufficient statistics (n, sum x, sum x^2):
#   var = (sum_sq - sum^2 / n) / (n - 1)
# This one-pass formula can come out slightly negative from floating-point cancellation.
# The inner JOIN keeps only cohorts present in both windows.
cohort_stats_sql = f"""
INSERT OVERWRITE TABLE {COHORT_STATS_TABLE}
SELECT
  r.cohort_key,
  r.recent_n, r.recent_sum, r.recent_sum_sq,
  b.base_n, b.base_sum, b.base_sum_sq,
  CASE WHEN r.recent_n > 0 THEN r.recent_sum / r.recent_n ELSE NULL END as recent_mean,
  CASE WHEN b.base_n > 0 THEN b.base_sum / b.base_n ELSE NULL END as base_mean,
  CASE WHEN r.recent_n > 1 THEN (r.recent_sum_sq - (r.recent_sum*r.recent_sum)/r.recent_n) / (r.recent_n - 1) ELSE NULL END as recent_var,
  CASE WHEN b.base_n > 1 THEN (b.base_sum_sq - (b.base_sum*b.base_sum)/b.base_n) / (b.base_n - 1) ELSE NULL END as base_var
FROM
  ( SELECT cohort_key,
           sum(n) as recent_n, sum(sum_log_amt) as recent_sum, sum(sum_sq_log_amt) as recent_sum_sq
    FROM {DAILY_AGG_TABLE}
    WHERE txn_date >= '{recent_start}' AND txn_date <= '{recent_end}'
    GROUP BY cohort_key
  ) r
JOIN
  ( SELECT cohort_key,
           sum(n) as base_n, sum(sum_log_amt) as base_sum, sum(sum_sq_log_amt) as base_sum_sq
    FROM {DAILY_AGG_TABLE}
    WHERE txn_date >= '{baseline_start}' AND txn_date <= '{baseline_end}'
    GROUP BY cohort_key
  ) b
ON r.cohort_key = b.cohort_key
"""

print("Running cohort stats job in Hive (small).")
try:
    run_sql(conn, create_stats_sql)
    run_sql(conn, cohort_stats_sql)
finally:
    conn.close()
print("Cohort stats table written:", COHORT_STATS_TABLE)


In [None]:
# Cell 5 - fetch cohort stats to pandas
conn = get_conn()
try:
    # One row per cohort present in both windows (written by the cohort stats cell).
    sql = f"SELECT cohort_key, recent_n, recent_mean, recent_var, base_n, base_mean, base_var FROM {COHORT_STATS_TABLE}"
    df = fetch_df(conn, sql)
finally:
    # Release the connection even if the query fails.
    conn.close()
print("Rows fetched:", len(df))
# cohort_key is the GROUP BY / JOIN key of the query that fills the table, so it must be unique.
assert df['cohort_key'].is_unique, "duplicate cohort_key rows in " + COHORT_STATS_TABLE

# quick sanity
df.head()


In [None]:
# Cell 6 - compute stats and flags
def compute_flags(df, alpha=ALPHA, min_effect=MIN_EFFECT_LOG, min_n_recent=MIN_N_RECENT, min_n_base=MIN_N_BASE):
    """Welch t-test per cohort (recent vs baseline mean log-amount) and flag upward drift.

    Parameters
    ----------
    df : pandas.DataFrame
        One row per cohort with columns recent_n, recent_mean, recent_var,
        base_n, base_mean, base_var. Not modified.
    alpha : float
        Threshold on the two-sided Welch p-value.
    min_effect : float
        Minimum ``recent_mean - base_mean`` required to flag (only increases are flagged).
    min_n_recent, min_n_base : int
        Cohorts with fewer observations in either window are dropped before testing.

    Returns
    -------
    pandas.DataFrame
        The surviving cohorts (index reset) with added columns
        denom, t_stat, df, p_value, mean_diff and drift_flag. Empty input, or input where
        every cohort is filtered out, yields an empty frame with those columns.
    """
    df2 = df[(df['recent_n'] >= min_n_recent) & (df['base_n'] >= min_n_base)].reset_index(drop=True).copy()

    # Guard degenerate variances: missing (n <= 1), zero, or slightly negative from the one-pass
    # sum-of-squares formula would otherwise give a zero or NaN standard error.
    for col in ('recent_var', 'base_var'):
        var = df2[col].astype(float)
        df2[col] = var.where(var > 0, 1e-9)

    # Squared standard errors of each window's mean.
    se2_recent = df2['recent_var'] / df2['recent_n']
    se2_base = df2['base_var'] / df2['base_n']
    mean_diff = df2['recent_mean'] - df2['base_mean']

    df2['denom'] = np.sqrt(se2_recent + se2_base)
    df2['t_stat'] = mean_diff / df2['denom']

    # Welch-Satterthwaite degrees of freedom; fall back to 1 where undefined.
    welch_num = (se2_recent + se2_base) ** 2
    welch_den = se2_recent ** 2 / (df2['recent_n'] - 1) + se2_base ** 2 / (df2['base_n'] - 1)
    df2['df'] = (welch_num / welch_den).replace([np.inf, -np.inf], np.nan).fillna(1.0)

    # Two-sided p-value, vectorised (a row-wise apply breaks on an empty frame).
    df2['p_value'] = 2.0 * stats.t.sf(
        np.abs(df2['t_stat'].to_numpy(dtype=float)),
        df2['df'].to_numpy(dtype=float),
    )
    df2['mean_diff'] = mean_diff
    df2['drift_flag'] = (df2['p_value'] < alpha) & (df2['mean_diff'] >= min_effect)
    return df2

df_flags = compute_flags(df)
# compute_flags drops cohorts below the minimum counts, so report both fetched and tested sizes.
print("Cohorts fetched:", len(df), "Cohorts evaluated:", len(df_flags), "Flags:", int(df_flags['drift_flag'].sum()))
df_flags.sort_values('p_value').head(10)


In [None]:
# Cell 7 - write small flags back to Hive via batched INSERTs
def write_flags_odbc(df_flags, flags_table=COHORT_FLAGS_TABLE, run_date=TODAY.isoformat(), batch=200):
    """Append per-cohort test results to a Hive table using batched ``INSERT ... VALUES``.

    Parameters
    ----------
    df_flags : pandas.DataFrame
        Result of ``compute_flags``; must contain cohort_key, recent_n, base_n, recent_mean,
        base_mean, recent_var, base_var, t_stat, df, p_value, mean_diff and drift_flag.
    flags_table : str
        Target table; created as Parquet if it does not exist.
    run_date : str
        ISO date stored in every row's run_date column. The default is captured when this
        cell is executed, not when the function is called.
    batch : int
        Maximum rows per INSERT statement (values below 1 behave as 1).

    Notes
    -----
    Rows are appended with INSERT INTO, so re-running for the same run_date adds duplicates.
    """
    double_cols = ('recent_mean', 'base_mean', 'recent_var', 'base_var',
                   't_stat', 'df', 'p_value', 'mean_diff')

    def _str_lit(value):
        # HiveQL string literals use C-style backslash escapes; a doubled '' is not an escaped quote.
        escaped = str(value).replace("\\", "\\\\").replace("'", "\\'")
        return "'" + escaped + "'"

    def _double_lit(value):
        # NaN/inf would render as the bare words nan/inf, which Hive cannot parse as numbers.
        value = float(value)
        return "{:.12g}".format(value) if math.isfinite(value) else "NULL"

    def _row_lit(rec):
        # Column order must match the CREATE TABLE statement below.
        parts = [_str_lit(rec['cohort_key']), _str_lit(run_date),
                 str(int(rec['recent_n'])), str(int(rec['base_n']))]
        parts.extend(_double_lit(rec[c]) for c in double_cols)
        parts.append('true' if bool(rec['drift_flag']) else 'false')
        return "(" + ",".join(parts) + ")"

    create_sql = f"""
    CREATE TABLE IF NOT EXISTS {flags_table} (
      cohort_key STRING, run_date DATE, recent_n BIGINT, base_n BIGINT,
      recent_mean DOUBLE, base_mean DOUBLE, recent_var DOUBLE, base_var DOUBLE,
      t_stat DOUBLE, df DOUBLE, p_value DOUBLE, mean_diff DOUBLE, drift_flag BOOLEAN
    ) STORED AS PARQUET
    """
    # Render every row first so a conversion error aborts before anything is written.
    value_rows = [_row_lit(rec) for rec in df_flags.to_dict('records')]
    step = max(int(batch), 1)

    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(create_sql)
            for start in range(0, len(value_rows), step):
                values = ",".join(value_rows[start:start + step])
                cur.execute(f"INSERT INTO TABLE {flags_table} VALUES {values}")
        finally:
            cur.close()
    finally:
        conn.close()
    print("Wrote flags to Hive table:", flags_table)

# Persist every evaluated cohort, not only flagged ones; drift_flag marks those that tripped the test.
write_flags_odbc(df_flags, flags_table=COHORT_FLAGS_TABLE)


In [None]:
# Cell 8 - quick backtest SQL (runs inside Hive). Edit FRAUD_TXN_TABLE to your fraud-labeled txn table.
FRAUD_TXN_TABLE = "db.txns"   # if your txns contain fraud_flag column use this; otherwise adjust

W = 14  # lookahead days
# Flags written for this run date are backtested against fraud from that date through W days
# later (inclusive). With TODAY = date.today() most of that window lies in the future, so counts
# are only meaningful when TODAY in the config cell is set to a historic date.
backtest_run_date = TODAY.isoformat()
window_end = (TODAY + timedelta(days=W)).isoformat()

# Both join inputs are filtered before joining: only this run's flags, and only fraud txns inside
# the window, pre-aggregated to one row per cohort (equi-join on cohort_key only).
# DISTINCT collapses duplicate flag rows left by re-running the writer for the same run_date,
# which would otherwise multiply the fraud counts.
backtest_sql = f"""
SELECT f.cohort_key AS cohort_key,
       f.run_date AS run_date,
       f.drift_flag AS drift_flag,
       COALESCE(fr.fraud_count, 0) AS fraud_count_in_window
FROM (
  SELECT DISTINCT cohort_key, run_date, drift_flag
  FROM {COHORT_FLAGS_TABLE}
  WHERE run_date = '{backtest_run_date}'
) f
LEFT JOIN (
  SELECT cohort_key, count(*) AS fraud_count
  FROM (
    SELECT concat(
             CASE WHEN t.days_since_account_opened <= 30 THEN 'new' WHEN t.days_since_account_opened <= 180 THEN 'mid' ELSE 'old' END,
             '_',
             CASE WHEN t.monthly_spend < 500 THEN 'low' WHEN t.monthly_spend < 2000 THEN 'med' ELSE 'high' END,
             '_', t.channel, '_', t.merchant_state
           ) AS cohort_key
    FROM {FRAUD_TXN_TABLE} t
    WHERE t.fraud_flag = 1
      AND to_date(t.txn_ts) BETWEEN '{backtest_run_date}' AND '{window_end}'
  ) ft
  GROUP BY cohort_key
) fr
  ON fr.cohort_key = f.cohort_key
"""

print("Submitting backtest SQL to Hive; returns a cohort-level fraud count in the lookahead window.")
conn = get_conn()
try:
    df_backtest = fetch_df(conn, backtest_sql)
finally:
    conn.close()
# Hive may label result columns as "alias.column" (hive.resultset.use.unique.column.names); keep the column part.
df_backtest.columns = [str(c).split('.')[-1] for c in df_backtest.columns]
print(df_backtest.head())

# The result holds every evaluated cohort; compare the fraud hit rate of flagged vs unflagged ones.
is_flagged = df_backtest['drift_flag'].fillna(False).astype(bool)
has_fraud = df_backtest['fraud_count_in_window'] > 0
print("Flagged cohorts with any fraud in lookahead:", int((is_flagged & has_fraud).sum()), "out of", int(is_flagged.sum()))
print("Unflagged cohorts with any fraud in lookahead:", int((~is_flagged & has_fraud).sum()), "out of", int((~is_flagged).sum()))
