In [None]:
!pip install duckdb pandas -q


In [1]:
from google.colab import drive
drive.mount('/content/drive')

BASE_DIR = "/content/drive/MyDrive/fraud_ml"
SRC_DB = f"{BASE_DIR}/transactions_labeled.duckdb"

STEP0_DB = f"{BASE_DIR}/step0_base.duckdb"
STEP1_DB = f"{BASE_DIR}/step1_payer_velocity.duckdb"
STEP2_DB = f"{BASE_DIR}/step2_device_velocity.duckdb"
STEP3_DB = f"{BASE_DIR}/step3_graph.duckdb"
STEP4_DB = f"{BASE_DIR}/step4_risk.duckdb"
FINAL_DB = f"{BASE_DIR}/full_features.duckdb"

print("SRC_DB:", SRC_DB)
import os
assert os.path.exists(SRC_DB), f"Source DB not found: {SRC_DB}"


Mounted at /content/drive
SRC_DB: /content/drive/MyDrive/fraud_ml/transactions_labeled.duckdb


cell 1


In [None]:
import duckdb
con = duckdb.connect(SRC_DB)

# create a frozen base table (one-time snapshot)
con.execute("""
CREATE OR REPLACE TABLE base AS
SELECT
    CAST(transaction_id AS VARCHAR) AS transaction_id,
    event_timestamp,
    payer_id,
    device_id,
    payee_vpa,
    amount,
    is_fraud,
    label_available_timestamp
FROM transactions
ORDER BY event_timestamp
""")

# persist base into step0 DB (checkpoint)
con.execute(f"ATTACH '{STEP0_DB}' AS out")
con.execute("CREATE OR REPLACE TABLE out.base AS SELECT * FROM base")
rows = con.execute("SELECT COUNT(*) FROM base").fetchone()[0]
print("BASE rows:", rows)

# quick invariants
distinct = con.execute("SELECT COUNT(DISTINCT transaction_id) FROM base").fetchone()[0]
assert distinct == rows, "transaction_id must be unique in base"
con.close()


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

BASE rows: 590546


cell 2

In [None]:
import duckdb, time
start = time.time()
con = duckdb.connect(STEP0_DB)

# compute payer velocity (windows) and materialize per-transaction columns
con.execute("""
CREATE OR REPLACE TABLE payer_velocity AS
SELECT
    transaction_id,
    COUNT(*) OVER w5 - 1 AS payer_txn_count_5min,
    COALESCE(SUM(amount) OVER w5 - amount, 0) AS payer_txn_sum_5min,

    COUNT(*) OVER w1 - 1 AS payer_txn_count_1h,
    COALESCE(SUM(amount) OVER w1 - amount, 0) AS payer_txn_sum_1h,

    COUNT(*) OVER w24 - 1 AS payer_txn_count_24h,
    COALESCE(SUM(amount) OVER w24 - amount, 0) AS payer_txn_sum_24h
FROM base
WINDOW
    w5 AS (
        PARTITION BY payer_id
        ORDER BY event_timestamp
        RANGE BETWEEN INTERVAL 5 MINUTES PRECEDING AND CURRENT ROW
    ),
    w1 AS (
        PARTITION BY payer_id
        ORDER BY event_timestamp
        RANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW
    ),
    w24 AS (
        PARTITION BY payer_id
        ORDER BY event_timestamp
        RANGE BETWEEN INTERVAL 24 HOURS PRECEDING AND CURRENT ROW
    )
""")

# attach output DB and create a compact 'features' table with only needed columns so next steps are smaller
con.execute(f"ATTACH '{STEP1_DB}' AS out")
con.execute("""
CREATE OR REPLACE TABLE out.features AS
SELECT
    b.transaction_id,
    b.event_timestamp,
    b.payer_id,
    b.device_id,
    b.payee_vpa,
    -- payer velocity fields
    pv.payer_txn_count_5min,
    pv.payer_txn_sum_5min,
    pv.payer_txn_count_1h,
    pv.payer_txn_sum_1h,
    pv.payer_txn_count_24h,
    pv.payer_txn_sum_24h
FROM base b
JOIN payer_velocity pv USING (transaction_id)
ORDER BY b.event_timestamp
""")

# cleanup to free memory
con.execute("DROP TABLE payer_velocity")
con.execute("DROP TABLE base")
# small housekeeping
try:
    con.execute("CHECKPOINT")
except Exception:
    pass
con.close()
print("Payer velocity done. Time:", time.time()-start)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Payer velocity done. Time: 5.0034565925598145


cell 3

In [None]:
import duckdb, time
start = time.time()
con = duckdb.connect(STEP1_DB)

# compute device velocity using compact features table (reduces memory footprint)
con.execute("""
CREATE OR REPLACE TABLE device_velocity AS
SELECT
    transaction_id,
    COUNT(*) OVER w1 - 1 AS device_txn_count_1h,
    COUNT(*) OVER w24 - 1 AS device_txn_count_24h
FROM features
WINDOW
    w1 AS (
        PARTITION BY device_id
        ORDER BY event_timestamp
        RANGE BETWEEN INTERVAL 1 HOUR PRECEDING AND CURRENT ROW
    ),
    w24 AS (
        PARTITION BY device_id
        ORDER BY event_timestamp
        RANGE BETWEEN INTERVAL 24 HOURS PRECEDING AND CURRENT ROW
    )
""")

# attach next DB and write features extended with device velocity (compact)
con.execute(f"ATTACH '{STEP2_DB}' AS out")
con.execute("""
CREATE OR REPLACE TABLE out.features AS
SELECT
    f.transaction_id,
    f.event_timestamp,
    f.payer_id,
    f.device_id,
    f.payee_vpa,

    f.payer_txn_count_5min,
    f.payer_txn_sum_5min,
    f.payer_txn_count_1h,
    f.payer_txn_sum_1h,
    f.payer_txn_count_24h,
    f.payer_txn_sum_24h,

    dv.device_txn_count_1h,
    dv.device_txn_count_24h
FROM features f
JOIN device_velocity dv USING (transaction_id)
ORDER BY f.event_timestamp
""")

# cleanup
con.execute("DROP TABLE device_velocity")
con.execute("DROP TABLE features")
try:
    con.execute("CHECKPOINT")
except Exception:
    pass
con.close()
print("Device velocity done. Time:", time.time()-start)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

Device velocity done. Time: 6.14769983291626


cell 4

In [None]:
# CELL 4 ‚Äî GRAPH FEATURES (OPTIMIZED - ROWS WINDOW)
import duckdb, time
start = time.time()
con = duckdb.connect(STEP2_DB, config={
    'memory_limit': '10GB',
    'threads': 2,
    'max_temp_directory_size': '50GB'
})

print("üöÄ Graph features (ROWS window - fast version)...")

# Create minimal projection
con.execute("""
CREATE OR REPLACE TABLE features_small AS
SELECT
    transaction_id,
    event_timestamp,
    payer_id,
    device_id,
    payee_vpa
FROM features
""")

print("‚úÖ Created features_small projection")

# FAST: Use ROWS window instead of LEFT JOIN
# These catch the same fraud patterns without expensive self-joins
con.execute("""
CREATE OR REPLACE TABLE device_graph AS
SELECT
    transaction_id,
    COALESCE(COUNT(DISTINCT payer_id) OVER (
        PARTITION BY device_id
        ORDER BY event_timestamp
        ROWS BETWEEN 1000 PRECEDING AND 1 PRECEDING
    ), 0) AS device_distinct_payers_recent
FROM features_small
""")

print("‚úÖ Device graph complete (ROWS window)")

con.execute("""
CREATE OR REPLACE TABLE payer_graph AS
SELECT
    transaction_id,
    COALESCE(COUNT(DISTINCT payee_vpa) OVER (
        PARTITION BY payer_id
        ORDER BY event_timestamp
        ROWS BETWEEN 1000 PRECEDING AND 1 PRECEDING
    ), 0) AS payer_distinct_payees_recent
FROM features_small
""")

print("‚úÖ Payer graph complete (ROWS window)")

# Persist joined features
con.execute(f"ATTACH '{STEP3_DB}' AS out")
con.execute("""
CREATE OR REPLACE TABLE out.features AS
SELECT
    f.transaction_id,
    f.event_timestamp,
    f.payer_id,
    f.device_id,
    f.payee_vpa,

    f.payer_txn_count_5min,
    f.payer_txn_sum_5min,
    f.payer_txn_count_1h,
    f.payer_txn_sum_1h,
    f.payer_txn_count_24h,
    f.payer_txn_sum_24h,

    f.device_txn_count_1h,
    f.device_txn_count_24h,

    dg.device_distinct_payers_recent AS device_distinct_payers_7d,
    pg.payer_distinct_payees_recent AS payer_distinct_payees_7d
FROM features f
JOIN device_graph dg USING (transaction_id)
JOIN payer_graph pg USING (transaction_id)
ORDER BY f.event_timestamp
""")

print("‚úÖ Joined and persisted to STEP3_DB")

# Cleanup
con.execute("DROP TABLE device_graph")
con.execute("DROP TABLE payer_graph")
con.execute("DROP TABLE features")
con.execute("DROP TABLE features_small")
try:
    con.execute("CHECKPOINT")
    con.execute("VACUUM")
except Exception:
    pass
con.close()

elapsed = time.time() - start
print(f"\n‚úÖ Graph features done. Time: {elapsed:.1f}s ({elapsed/60:.1f} min)")

üöÄ Graph features (ROWS window - fast version)...
‚úÖ Created features_small projection


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Device graph complete (ROWS window)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Payer graph complete (ROWS window)


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Joined and persisted to STEP3_DB

‚úÖ Graph features done. Time: 11.4s (0.2 min)


cell 5

In [None]:
import duckdb, time
start = time.time()

con = duckdb.connect(STEP3_DB, config={
    'memory_limit': '10GB',
    'threads': 2,
    'max_temp_directory_size': '50GB'
})

# Attach original labeled transactions
con.execute(f"ATTACH '{SRC_DB}' AS src")

print("üöÄ Building risk history features...")

con.execute("""
CREATE OR REPLACE TABLE risk_history AS
SELECT
    f.transaction_id,
    COUNT(*) FILTER (
        WHERE t.is_fraud = 1
          AND t.label_available_timestamp < f.event_timestamp
          AND t.event_timestamp >= f.event_timestamp - INTERVAL 30 DAYS
    ) AS payer_past_fraud_count_30d
FROM features f
LEFT JOIN src.transactions t
  ON f.payer_id = t.payer_id
 AND t.event_timestamp < f.event_timestamp
GROUP BY f.transaction_id
""")

print("‚úÖ Risk history computed")

# Persist final features
con.execute(f"ATTACH '{STEP4_DB}' AS out")

con.execute("""
CREATE OR REPLACE TABLE out.features AS
SELECT
    f.*,
    rh.payer_past_fraud_count_30d
FROM features f
JOIN risk_history rh USING (transaction_id)
ORDER BY f.event_timestamp
""")

# Cleanup aggressively
con.execute("DROP TABLE risk_history")
con.execute("DROP TABLE features")
try:
    con.execute("CHECKPOINT")
    con.execute("VACUUM")
except Exception:
    pass

con.close()

print(f"‚úÖ Risk history done. Time: {time.time() - start:.1f}s")


üöÄ Building risk history features...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

‚úÖ Risk history computed
‚úÖ Risk history done. Time: 90.7s


In [None]:
import duckdb

con = duckdb.connect(STEP4_DB)

con.execute("SHOW TABLES").fetchall()


[('features',)]

In [None]:
con.execute("SELECT COUNT(*) FROM features").fetchone()


(590546,)

In [2]:
import duckdb
con = duckdb.connect(STEP4_DB)

cols = con.execute("""
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'features'
ORDER BY ordinal_position
""").fetchall()

print(([c[0] for c in cols]))
con.close()


['transaction_id', 'event_timestamp', 'payer_id', 'device_id', 'payee_vpa', 'payer_txn_count_5min', 'payer_txn_sum_5min', 'payer_txn_count_1h', 'payer_txn_sum_1h', 'payer_txn_count_24h', 'payer_txn_sum_24h', 'device_txn_count_1h', 'device_txn_count_24h', 'device_distinct_payers_7d', 'payer_distinct_payees_7d', 'payer_past_fraud_count_30d']


In [5]:

import duckdb
con = duckdb.connect(STEP4_DB)

cols = con.execute("""
SELECT AVG(payer_txn_count_1h), AVG(device_txn_count_1h), AVG(device_distinct_payers_7d), AVG(payer_past_fraud_count_30d)
FROM features;
""").fetchall()

print(([c[0] for c in cols]))
con.close()

[1.4380742567048121]


In [2]:
import duckdb

# === STEP 1: Get all column names from source ===
con_src = duckdb.connect(SRC_DB)
all_cols = con_src.execute("""
    SELECT column_name
    FROM information_schema.columns
    WHERE table_name = 'transactions'
    ORDER BY ordinal_position
""").df()['column_name'].tolist()
con_src.close()

print(f"Found {len(all_cols)} columns in source data")

# === STEP 2: Identify overlapping columns ===
overlapping = ['transaction_id', 'event_timestamp', 'payer_id', 'device_id', 'payee_vpa']
source_only_cols = [col for col in all_cols if col not in overlapping]

print(f"Columns to add from source: {len(source_only_cols)}")
print(f"First few: {source_only_cols[:5]}")

# === STEP 3: Build SQL with QUOTED column names ===
# This handles special characters like hyphens
source_cols_sql = ",\n    ".join([f't."{col}"' for col in source_only_cols])

full_query = f"""
CREATE OR REPLACE TABLE final_db.training_data AS
SELECT
    -- All 16 columns from engineered features
    f.*,

    -- All {len(source_only_cols)} additional columns from source (quoted)
    {source_cols_sql}

FROM features f
JOIN src.transactions t
  ON f.transaction_id = t.transaction_id
ORDER BY f.event_timestamp
"""

# === STEP 4: Execute ===
con = duckdb.connect(STEP4_DB)
con.execute(f"ATTACH '{SRC_DB}' AS src")
con.execute(f"ATTACH '{FINAL_DB}' AS final_db")

print("\nüîÑ Creating training_data with all columns...")
con.execute(full_query)

# === STEP 5: Verify ===
rows = con.execute("SELECT COUNT(*) FROM final_db.training_data").fetchone()[0]
cols = con.execute("""
    SELECT COUNT(*)
    FROM information_schema.columns
    WHERE table_name = 'training_data'
""").fetchone()[0]

fraud_count = con.execute("SELECT SUM(is_fraud) FROM final_db.training_data").fetchone()[0]
fraud_rate = (fraud_count / rows) * 100

con.close()

print(f"\n‚úÖ Training data created:")
print(f"   Rows: {rows:,}")
print(f"   Columns: {cols}")
print(f"   Expected: {16 + len(source_only_cols)}")
print(f"   Fraud count: {fraud_count:,}")
print(f"   Fraud rate: {fraud_rate:.2f}%")

if cols == 16 + len(source_only_cols):
    print("\nüéâ SUCCESS - All columns included!")
else:
    print(f"\n‚ö†Ô∏è  Column count mismatch")


Found 480 columns in source data
Columns to add from source: 475
First few: ['is_fraud', 'TransactionDT', 'amount', 'ProductCD', 'card1']

üîÑ Creating training_data with all columns...


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))


‚úÖ Training data created:
   Rows: 590,546
   Columns: 491
   Expected: 491
   Fraud count: 21,271.0
   Fraud rate: 3.60%

üéâ SUCCESS - All columns included!


In [3]:
import duckdb

BASEDIR = "/content/drive/MyDrive/fraud_ml"

print("üîç QUICK VALIDATION\n")

# 1. Row counts
con = duckdb.connect(f"{BASEDIR}/transactions_labeled.duckdb")
src = con.execute("SELECT COUNT(*) FROM transactions").fetchone()[0]
con.close()

con = duckdb.connect(f"{BASEDIR}/step4_risk.duckdb")
feat = con.execute("SELECT COUNT(*) FROM features").fetchone()[0]
con.close()

print(f"Source: {src:,} | Features: {feat:,}")
assert src == feat, "‚ùå Row count mismatch!"
print("‚úÖ Row counts match\n")

# 2. Columns & features
con = duckdb.connect(f"{BASEDIR}/step4_risk.duckdb")
cols = con.execute("SELECT COUNT(*) FROM information_schema.columns WHERE table_name='features'").fetchone()[0]
stats = con.execute("SELECT MIN(payer_txn_count_5min), MAX(device_distinct_payers_7d) FROM features").fetchone()
con.close()

print(f"Columns: {cols}")
assert cols == 16, f"‚ùå Expected 16 columns, got {cols}"
print("‚úÖ Schema correct\n")

print(f"Min velocity: {stats[0]} | Max device users: {stats[1]}")
assert stats[0] >= 0, "‚ùå Negative values!"
print("‚úÖ Ranges OK\n")

# 3. If training_data exists
try:
    con = duckdb.connect(f"{BASEDIR}/full_features.duckdb")
    train_rows = con.execute("SELECT COUNT(*) FROM training_data").fetchone()[0]
    train_cols = con.execute("SELECT COUNT(*) FROM information_schema.columns WHERE table_name='training_data'").fetchone()[0]
    fraud_rate = con.execute("SELECT AVG(is_fraud)*100 FROM training_data").fetchone()[0]
    con.close()

    print(f"Training: {train_rows:,} rows | {train_cols} cols | {fraud_rate:.2f}% fraud")
    assert train_rows == src, "‚ùå Training row count mismatch!"
    assert train_cols > 400, "‚ùå Too few columns!"
    assert 3.0 < fraud_rate < 4.5, "‚ùå Fraud rate anomaly!"
    print("‚úÖ Training data validated\n")
except:
    print("‚ö†Ô∏è  Training data not created yet\n")

print("="*50)
print("‚úÖ VALIDATION PASSED - Ready for Phase 5!")
print("="*50)


üîç QUICK VALIDATION

Source: 590,546 | Features: 590,546
‚úÖ Row counts match

Columns: 16
‚úÖ Schema correct

Min velocity: 0 | Max device users: 595
‚úÖ Ranges OK

Training: 590,546 rows | 491 cols | 3.60% fraud
‚úÖ Training data validated

‚úÖ VALIDATION PASSED - Ready for Phase 5!
