# DSE ↔ PROD Alert & Fraud Overlap — **CSV‑Only, Drag‑and‑Drop Notebook**

This notebook computes:
- Channel summaries for **PROD** (and **DSE** when provided)
- **Alert overlap**: intersection, DSE‑only, PROD‑only (+ percentages)
- **Fraud overlap** within alerts
- **Hour‑of‑day** distributions for intersections and mismatches

**How to use**
1) Fill the **Spark placeholders** in the next cell with your session code (Feedzai or Spark).
2) Set the GCS CSV paths in **CONFIG**: `prod_csv` is required; `dse_csv` and `fraud_tags_csv` are optional.
3) Run all cells. All key tables are shown inline and exported as CSVs under `overlap_outputs/`.

In [None]:
# =============================
# Spark / Feedzai PLACEHOLDERS
# =============================
# Replace this placeholder with your real session code; later cells expect a Spark session bound to the name `spark`.
# Example (Feedzai):
# from ds_api import feedzai, Server
# pulse = Server.connect(address="localhost", port=1899)
# feedzai = feedzai(pulse=pulse)
# new_spark_params = ["spark.executor.memory","8G","spark.dynamicAllocation.maxExecutors","100"]
# spark = feedzai.get_spark_session(new_spark_params)

# --- Optional local fallback for testing (uncomment if you want to run locally) ---
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("DSE_PROD_Overlap").getOrCreate()


In [None]:
# =============================
#            CONFIG
# =============================
from IPython.display import display
from pyspark.sql import functions as F

# Notebook-wide settings: every later cell reads its inputs from this dict.
CONFIG = {
    # Input CSV locations; None means "not provided".
    "paths": {
        "dse_csv": None,
        "prod_csv": "gs://bucket/PROD_BASE.csv",
        "fraud_tags_csv": None,
    },
    # Options handed to the Spark CSV reader.
    "csv_read": {
        "delimiter": ",",
        "header": True,
        "quote": '"',
        "escape": "\\",
        "inferSchema": True,
    },
    # Logical name -> actual column name in the DSE file.
    "dse_cols": {
        "lifecycle_id": "lifecycle_id",
        "raw_score": "fraud_level_true_score",
        "mt_score": "money_trick_score",
        "fraud_label": "fraud_label",
        "event_ts": "event_received_at",
        "channel": "channel",
    },
    # Logical name -> actual column name in the PROD file.
    "prod_cols": {
        "lifecycle_id": "lifecycle_id",
        "raw_score": "shadow_ob_score",
        "mt_score": "mt_score_prod",
        "fraud_label": "fraud_label_prod",
        "event_ts": "event_received_at",
        "channel": "channel",
    },
    # Column names in the optional fraud-tags file.
    "fraud_tag_cols": {
        "lifecycle_id": "lifecycle_id",
        "fraud_label": "fraud_label",
    },
    # When a flag is True the matching score column is multiplied by 1000.0.
    "scaling": {
        "dse_raw_scale": True,
        "dse_mt_scale": True,
        "prod_raw_scale": True,
        "prod_mt_scale": True,
    },
    # Per-channel minimums: RS applies to the raw score, MT to the MT score.
    "thresholds": {
        "DG": {"RS": 980, "MT": 765},
        "FD": {"RS": 980, "MT": 765},
        "CMB": {"RS": 980, "MT": 765},
    },
    # Local directory that receives the exported CSVs.
    "out_dir": "overlap_outputs",
}


In [None]:
# =============================
#            HELPERS
# =============================
from pyspark.sql import functions as F
import os

def ensure_outdir(path):
    """Create directory ``path`` (and any missing parents) if it is absent.

    An already existing directory is left as it is.
    """
    os.makedirs(name=path, exist_ok=True)

def read_csv(path:str):
    """Load a CSV into a Spark DataFrame using the ``CONFIG["csv_read"]`` options.

    Returns ``None`` without touching Spark when ``path`` is empty or ``None``.
    Boolean options are passed to the reader as lower-case strings.
    """
    if not path:
        return None

    settings = CONFIG["csv_read"]
    reader_options = {
        "delimiter": settings["delimiter"],
        "header": str(settings["header"]).lower(),
        "quote": settings["quote"],
        "escape": settings["escape"],
        "inferSchema": str(settings.get("inferSchema", True)).lower(),
    }

    reader = spark.read.format("csv")
    for option_name, option_value in reader_options.items():
        reader = reader.option(option_name, option_value)
    return reader.load(path)

def coerce_boolean(df, col, out_col):
    """Add ``out_col`` holding a never-null boolean version of ``col``.

    A value that casts to boolean keeps that cast; otherwise a value whose
    integer cast equals 1 becomes True; everything else (nulls included)
    becomes False.
    """
    source = F.col(col)
    as_bool = source.cast("boolean")
    normalised = (
        F.when(as_bool.isNotNull(), as_bool)
        .when(source.cast("int") == 1, True)
        .otherwise(False)
    )
    return df.withColumn(out_col, normalised)

def ensure_channel(df, channel_col):
    """Return ``df`` unchanged when it already has ``channel_col``.

    Raises:
        ValueError: if the column is absent (no derivation rule is implemented).
    """
    if channel_col not in df.columns:
        raise ValueError("Channel column missing; add your derivation rule in ensure_channel().")
    return df

def scale_if_needed(df, col, flag):
    """Multiply column ``col`` by 1000.0 in place when ``flag`` is truthy.

    ``df`` is returned untouched when ``col`` is ``None``, is not a column of
    ``df``, or ``flag`` is falsy.
    """
    column_present = col is not None and col in df.columns
    if not column_present or not flag:
        return df
    return df.withColumn(col, (F.col(col) * 1000.0))

def build_alert_flag(df, raw_col, mt_col, channel_col, thresholds, out_flag):
    """Add boolean column ``out_flag`` marking rows that breach their channel thresholds.

    A row is flagged when, for some ``channel -> {"RS": ..., "MT": ...}`` entry
    of ``thresholds``, its channel equals that key, its raw score is >= RS and
    its MT score is >= MT. Rows for which the combined condition is false or
    null get False.

    Args:
        df: input DataFrame.
        raw_col: name of the raw score column.
        mt_col: name of the MT score column.
        channel_col: name of the channel column.
        thresholds: mapping of channel value to a dict with "RS" and "MT" keys.
        out_flag: name of the column to add.

    Returns:
        ``df`` with ``out_flag`` added. The flag is False everywhere when either
        score column is missing or when ``thresholds`` is empty/None.
    """
    if raw_col not in df.columns or mt_col not in df.columns:
        return df.withColumn(out_flag, F.lit(False))

    rules = [
        (F.col(channel_col) == ch) & (F.col(raw_col) >= thr["RS"]) & (F.col(mt_col) >= thr["MT"])
        for ch, thr in (thresholds or {}).items()
    ]
    if not rules:
        # Without any channel rule there is no condition to hand to F.when();
        # treat it as "nothing alerts" instead of failing on a None condition.
        return df.withColumn(out_flag, F.lit(False))

    cond = rules[0]
    for rule in rules[1:]:
        cond = cond | rule
    return df.withColumn(out_flag, F.when(cond, True).otherwise(False))

def add_hour(df, ts_col, out_col="hour"):
    """Add ``out_col`` with the hour extracted from timestamp column ``ts_col``.

    When ``ts_col`` is not a column of ``df`` the new column is an integer-typed
    null for every row.
    """
    if ts_col in df.columns:
        hour_value = F.hour(F.to_timestamp(F.col(ts_col)))
    else:
        hour_value = F.lit(None).cast("int")
    return df.withColumn(out_col, hour_value)

def summarise_counts(df, flag_col, fraud_col, by=("channel",)):
    """Aggregate row, alert and fraud-in-alert counts per group.

    Args:
        df: input DataFrame.
        flag_col: name of the alert flag column.
        fraud_col: name of the fraud label column.
        by: grouping column name, or an iterable of names. The default is an
            immutable tuple so it cannot be altered between calls.

    Returns:
        DataFrame with the grouping columns plus ``total`` (row count),
        ``alerts`` (rows where the flag is True) and ``fraud_in_alerts``
        (rows where both flag and fraud label are True).
    """
    # A bare string would otherwise be splatted into single characters.
    group_cols = [by] if isinstance(by, str) else list(by)

    # Column comparisons need `==`; `is True` would compare Python objects.
    is_alert = F.col(flag_col) == True  # noqa: E712
    is_fraud = F.col(fraud_col) == True  # noqa: E712

    return (df.groupBy(*group_cols)
            .agg(F.count(F.lit(1)).alias("total"),
                 F.sum(F.when(is_alert, 1).otherwise(0)).alias("alerts"),
                 F.sum(F.when(is_alert & is_fraud, 1).otherwise(0)).alias("fraud_in_alerts")))

def show_and_export(df, csv_name):
    """Display the first 10 rows of ``df`` and write all of it to a CSV file.

    The whole DataFrame is collected to pandas, previewed inline, and saved as
    ``csv_name`` inside ``CONFIG["out_dir"]`` (created when missing).
    """
    target_dir = CONFIG["out_dir"]
    ensure_outdir(target_dir)
    target_file = os.path.join(target_dir, csv_name)

    frame = df.toPandas()
    display(frame.head(10))
    frame.to_csv(target_file, index=False)
    print(f"Saved: {target_file}")


In [None]:
# =============================
#              LOAD
# =============================
paths = CONFIG["paths"]

# Read each configured input in turn; an unset path yields None instead of a DataFrame.
dse, prod, tags = (
    read_csv(paths[name]) if paths[name] else None
    for name in ("dse_csv", "prod_csv", "fraud_tags_csv")
)

have_dse, have_prod, have_tags = (frame is not None for frame in (dse, prod, tags))
print("Loaded -> DSE:", have_dse, "| PROD:", have_prod, "| TAGS:", have_tags)

# Every later cell works on PROD, so stop here when it is absent.
if not have_prod:
    raise RuntimeError("PROD CSV is required to proceed.")


In [None]:
# =============================
#           PREPARE DSE
# =============================
dse_cols = CONFIG["dse_cols"]
if dse is not None:
    # Normalise the join key: string-typed and always named `lifecycle_id`.
    dse = (dse
           .withColumn(dse_cols["lifecycle_id"], F.col(dse_cols["lifecycle_id"]).cast("string"))
           .withColumnRenamed(dse_cols["lifecycle_id"], "lifecycle_id"))

    # Optional x1000 scaling of both score columns (no-op when a column is absent).
    dse = scale_if_needed(dse, dse_cols["raw_score"], CONFIG["scaling"]["dse_raw_scale"])
    dse = scale_if_needed(dse, dse_cols["mt_score"],  CONFIG["scaling"]["dse_mt_scale"])

    # The channel column is mandatory; expose it under the fixed name `channel`.
    dse = ensure_channel(dse, dse_cols["channel"])
    if dse_cols["channel"] != "channel":
        dse = dse.withColumnRenamed(dse_cols["channel"], "channel")

    # Fraud label: coerce to a non-null boolean, or default to False when absent.
    if dse_cols["fraud_label"] in dse.columns:
        dse = coerce_boolean(dse, dse_cols["fraud_label"], "fraud_label_dse")
    else:
        dse = dse.withColumn("fraud_label_dse", F.lit(False))

    dse = build_alert_flag(dse, dse_cols["raw_score"], dse_cols["mt_score"], "channel", CONFIG["thresholds"], "is_alert_dse")
    dse = add_hour(dse, dse_cols["event_ts"], "hour")

    # Guard BOTH score columns: build_alert_flag() tolerates a missing raw score,
    # so the projection must not fail on it either. Missing scores become typed
    # nulls so the output schema stays stable.
    dse_score_cols = [
        (F.col(src) if src in dse.columns else F.lit(None).cast("double")).alias(out)
        for src, out in ((dse_cols["raw_score"], "raw_score_dse"),
                         (dse_cols["mt_score"], "mt_score_dse"))
    ]
    dse = dse.select("lifecycle_id", "channel", "hour",
                     *dse_score_cols,
                     "is_alert_dse", "fraud_label_dse")
    print("DSE preview:")
    show_and_export(dse.limit(1000), "preview_dse.csv")
else:
    print("DSE not provided — continuing with PROD only.")


In [None]:
# =============================
#          PREPARE PROD
# =============================
prod_cols = CONFIG["prod_cols"]

# Normalise the join key: string-typed and always named `lifecycle_id`.
prod = (prod
        .withColumn(prod_cols["lifecycle_id"], F.col(prod_cols["lifecycle_id"]).cast("string"))
        .withColumnRenamed(prod_cols["lifecycle_id"], "lifecycle_id"))

# Optional x1000 scaling of both score columns (no-op when a column is absent).
prod = scale_if_needed(prod, prod_cols["raw_score"], CONFIG["scaling"]["prod_raw_scale"])
prod = scale_if_needed(prod, prod_cols["mt_score"],  CONFIG["scaling"]["prod_mt_scale"])

# The channel column is mandatory; expose it under the fixed name `channel`.
prod = ensure_channel(prod, prod_cols["channel"])
if prod_cols["channel"] != "channel":
    prod = prod.withColumnRenamed(prod_cols["channel"], "channel")

# Attach/normalize fraud label
if prod_cols["fraud_label"] in prod.columns:
    prod = coerce_boolean(prod, prod_cols["fraud_label"], "fraud_label_prod")
elif tags is not None:
    tag_cols = CONFIG["fraud_tag_cols"]
    tags_use = tags.select(
        F.col(tag_cols["lifecycle_id"]).cast("string").alias("lifecycle_id"),
        F.col(tag_cols["fraud_label"]).alias("fraud_label_join"))
    # Same coercion rule as for a label column that ships inside the PROD file.
    tags_use = coerce_boolean(tags_use, "fraud_label_join", "fraud_label_join")
    # Collapse to one row per lifecycle_id so the left join cannot multiply PROD
    # rows (which would inflate every count); an id is fraud if any tag row is.
    tags_use = (tags_use.groupBy("lifecycle_id")
                .agg(F.max(F.col("fraud_label_join").cast("int")).alias("fraud_label_join")))
    prod = (prod.join(tags_use, "lifecycle_id", "left")
               # PROD rows without a tag come back null and are treated as non-fraud.
               .withColumn("fraud_label_prod",
                           F.coalesce(F.col("fraud_label_join") == 1, F.lit(False)))
               .drop("fraud_label_join"))
else:
    prod = prod.withColumn("fraud_label_prod", F.lit(False))

prod = build_alert_flag(prod, prod_cols["raw_score"], prod_cols["mt_score"], "channel", CONFIG["thresholds"], "is_alert_prod")
prod = add_hour(prod, prod_cols["event_ts"], "hour")

# Guard BOTH score columns: build_alert_flag() tolerates a missing raw score,
# so the projection must not fail on it either. Missing scores become typed
# nulls so the output schema stays stable.
prod_score_cols = [
    (F.col(src) if src in prod.columns else F.lit(None).cast("double")).alias(out)
    for src, out in ((prod_cols["raw_score"], "raw_score_prod"),
                     (prod_cols["mt_score"], "mt_score_prod"))
]
prod = prod.select("lifecycle_id", "channel", "hour",
                   *prod_score_cols,
                   "is_alert_prod", "fraud_label_prod")
print("PROD preview:")
show_and_export(prod.limit(1000), "preview_prod.csv")


In [None]:
# =============================
#       CHANNEL SUMMARIES
# =============================
# Both summaries go through summarise_counts() so the PROD and DSE tables are
# guaranteed to be computed by the same rule (total / alerts / fraud_in_alerts).
prod_summary = summarise_counts(prod, "is_alert_prod", "fraud_label_prod", by=["channel"])
print("PROD summary by channel:")
show_and_export(prod_summary.orderBy("channel"), "prod_summary_by_channel.csv")

# `dse` may be undefined if the cells were run out of order, hence the globals() check.
if 'dse' in globals() and dse is not None:
    dse_summary = summarise_counts(dse, "is_alert_dse", "fraud_label_dse", by=["channel"])
    print("DSE summary by channel:")
    show_and_export(dse_summary.orderBy("channel"), "dse_summary_by_channel.csv")


In [None]:
# =============================
#         ALERT OVERLAPS
# =============================
if 'dse' in globals() and dse is not None:
    dse_alerts  = dse.filter(F.col("is_alert_dse")==True).select("lifecycle_id","channel","hour")
    prod_alerts = prod.filter(F.col("is_alert_prod")==True).select("lifecycle_id","channel","hour")

    # Both sides carry `channel` and `hour`; joining them as-is leaves two columns
    # of each name, and any later reference to them is ambiguous. Rename the PROD
    # copies up front so `channel_dse` and `hour` (the DSE values) stay unique.
    prod_alerts_side = prod_alerts.select(
        "lifecycle_id",
        F.col("channel").alias("channel_prod"),
        F.col("hour").alias("hour_prod"))
    inter = (dse_alerts.join(prod_alerts_side, "lifecycle_id", "inner")
             .withColumnRenamed("channel", "channel_dse"))
    dse_only  = dse_alerts.join(prod_alerts.select("lifecycle_id"), "lifecycle_id", "left_anti")
    prod_only = prod_alerts.join(dse_alerts.select("lifecycle_id"), "lifecycle_id", "left_anti")

    dse_alert_counts  = dse_alerts.groupBy("channel").agg(F.count("*").alias("dse_alerts"))
    prod_alert_counts = prod_alerts.groupBy("channel").agg(F.count("*").alias("prod_alerts"))
    # The intersection is attributed to the DSE-side channel.
    inter_counts = (inter.groupBy("channel_dse").agg(F.count("*").alias("intersection"))
                    .withColumnRenamed("channel_dse", "channel"))

    # Start from every channel that alerted on either side so a channel with no
    # common alerts is reported as intersection 0 rather than silently dropped.
    # A ratio is null when that side has no alerts for the channel.
    intersection_or_zero = F.coalesce(F.col("intersection"), F.lit(0))
    overlap_pct = (dse_alert_counts
        .join(prod_alert_counts, "channel", "full_outer")
        .join(inter_counts, "channel", "left")
        .select("channel",
                intersection_or_zero.alias("intersection"),
                (intersection_or_zero/F.col("dse_alerts")).alias("overlap_vs_dse"),
                (intersection_or_zero/F.col("prod_alerts")).alias("overlap_vs_prod"))
        .orderBy("channel"))

    print("Overlap % by channel:")
    show_and_export(overlap_pct, "overlap_percentages.csv")
    print("DSE‑only alerts by channel:")
    show_and_export(dse_only.groupBy("channel").agg(F.count("*").alias("dse_only_alerts")).orderBy("channel"),
                    "dse_only_alerts.csv")
    print("PROD‑only alerts by channel:")
    show_and_export(prod_only.groupBy("channel").agg(F.count("*").alias("prod_only_alerts")).orderBy("channel"),
                    "prod_only_alerts.csv")
else:
    print("DSE not provided — overlaps will compute once DSE CSV is set.")


In [None]:
# =============================
#   FRAUD OVERLAP (IN ALERTS)
# =============================
if 'dse' in globals() and dse is not None:
    # Alerts that are also labelled fraud, per side.
    dse_alerts_fraud  = dse.filter((F.col("is_alert_dse")==True) & (F.col("fraud_label_dse")==True)).select("lifecycle_id","channel")
    prod_alerts_fraud = prod.filter((F.col("is_alert_prod")==True) & (F.col("fraud_label_prod")==True)).select("lifecycle_id","channel")

    # Join against the PROD ids only: bringing PROD's `channel` along would leave
    # two `channel` columns and make the rename/groupBy below ambiguous. The
    # intersection is attributed to the DSE-side channel.
    fraud_inter = (dse_alerts_fraud.join(prod_alerts_fraud.select("lifecycle_id"), "lifecycle_id", "inner")
                   .withColumnRenamed("channel","channel_dse")
                   .groupBy("channel_dse").agg(F.count("*").alias("fraud_intersection")).orderBy("channel_dse"))
    print("Fraud intersection (alerts) by channel:")
    show_and_export(fraud_inter, "fraud_intersection_by_channel.csv")

    # Fraud alerts raised by DSE with no matching PROD fraud alert.
    dse_only_fraud = (dse_alerts_fraud.join(prod_alerts_fraud.select("lifecycle_id"), "lifecycle_id", "left_anti")
                      .groupBy("channel").agg(F.count("*").alias("dse_only_fraud_alerts")).orderBy("channel"))
    print("DSE-only fraud-in-alerts by channel:")
    show_and_export(dse_only_fraud, "dse_only_fraud_alerts.csv")

    # Fraud alerts raised by PROD with no matching DSE fraud alert.
    prod_only_fraud = (prod_alerts_fraud.join(dse_alerts_fraud.select("lifecycle_id"), "lifecycle_id", "left_anti")
                       .groupBy("channel").agg(F.count("*").alias("prod_only_fraud_alerts")).orderBy("channel"))
    print("PROD-only fraud-in-alerts by channel:")
    show_and_export(prod_only_fraud, "prod_only_fraud_alerts.csv")


In [None]:
# =============================
#    HOUR-OF-DAY DISTRIBUTIONS
# =============================
if 'dse' in globals() and dse is not None:
    # Build the intersection from the DSE alert hours and the PROD alert ids only.
    # A frame that joins both sides' `hour` columns has two columns of that name,
    # so grouping it by "hour" is ambiguous; here the DSE hour is the only one.
    dse_alert_hours = dse.filter(F.col("is_alert_dse")==True).select("lifecycle_id","hour")
    prod_alert_ids  = prod.filter(F.col("is_alert_prod")==True).select("lifecycle_id")
    inter_hours   = (dse_alert_hours.join(prod_alert_ids, "lifecycle_id", "inner")
                     .groupBy("hour").agg(F.count("*").alias("alerts_intersection_by_hour")).orderBy("hour"))

    # `dse_only` / `prod_only` come from the ALERT OVERLAPS cell (anti-joins keep
    # only the left side's columns, so their `hour` is unambiguous).
    dse_only_hours  = dse_only.groupBy("hour").agg(F.count("*").alias("dse_only_by_hour")).orderBy("hour")
    prod_only_hours = prod_only.groupBy("hour").agg(F.count("*").alias("prod_only_by_hour")).orderBy("hour")
    print("Alerts intersection by hour:")
    show_and_export(inter_hours, "alerts_intersection_by_hour.csv")
    print("DSE-only alerts by hour:")
    show_and_export(dse_only_hours, "dse_only_by_hour.csv")
    print("PROD-only alerts by hour:")
    show_and_export(prod_only_hours, "prod_only_by_hour.csv")
