# 04_Cost_Aware_Decision_Logic

In [0]:
# ============================================
# 04_Cost_Aware_Decision_Logic.ipynb
# --------------------------------------------
# Purpose:
#   Transform risk predictions into
#   cost-aware investigation decisions
#   under limited operational capacity.
#
# Key Idea:
#   Optimize expected financial loss,
#   NOT prediction accuracy.
#
# Output:
#   - gold_decision_recommendations (Gold Delta table)
# ============================================

### Imports & Configuration

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, row_number, when
from pyspark.sql.window import Window

# Reuse the session that is already running, or create one if none exists.
# `SparkSession.getActiveSession()` returns None when the current thread has
# no active session, which would make every later `spark.` call fail with an
# AttributeError; `builder.getOrCreate()` always yields a usable session.
spark = SparkSession.builder.getOrCreate()

# Catalog and schema that hold the tables read and written by this notebook.
CATALOG = "cost_aware_capstone"
SCHEMA = "risk_decisioning"

In [0]:

# Fully qualified table names (catalog.schema.table). They are derived from
# CATALOG and SCHEMA so the storage location is configured in one place only.

# Inputs: joined on `case_id` in the "Read Inputs" step.
SILVER_TABLE = f"{CATALOG}.{SCHEMA}.silver_cost_aware_features"
RISK_TABLE = f"{CATALOG}.{SCHEMA}.ml_risk_predictions"

# Output: overwritten with the ranked decisions at the end of the notebook.
GOLD_TABLE = f"{CATALOG}.{SCHEMA}.gold_decision_recommendations"

#### Business Constraints

In [0]:
# Upper bound on how many top-ranked cases can be flagged for investigation.
# NOTE(review): the name says "daily", but no per-day grouping is visible in
# this notebook — confirm that each run covers exactly one day of cases.
DAILY_INVESTIGATION_CAPACITY = 50

### Read Inputs

In [0]:
# Load both input tables.
risk_df = spark.table(RISK_TABLE)
silver_df = spark.table(SILVER_TABLE)

# Inner join: only cases present in both tables survive. Joining on the
# column *name* keeps a single `case_id` column in the result.
df = silver_df.join(risk_df, "case_id", "inner")

display(df)

### Expected Loss Modeling

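Formulas:

- Expected loss if NOT investigated: `expected_loss_if_ignored = risk_probability * fraud_loss_if_missed`
- Expected net savings if investigated: `expected_savings_if_investigated = expected_loss_if_ignored - investigation_cost`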

In [0]:
# Probability-weighted loss incurred when a case is left uninvestigated.
expected_loss_expr = col("risk_probability") * col("fraud_loss_if_missed")

# Net benefit of investigating: the loss avoided minus what the investigation
# costs. References the column added just before it, so order matters.
expected_savings_expr = col("expected_loss_if_ignored") - col("investigation_cost")

decision_df = df.withColumn("expected_loss_if_ignored", expected_loss_expr)
decision_df = decision_df.withColumn(
    "expected_savings_if_investigated", expected_savings_expr
)

### Rank Cases by Financial Impact (rank by expected savings)

In [0]:
# Highest expected net savings first.
ranked_df = decision_df.orderBy(
    "expected_savings_if_investigated", ascending=False
)

### Apply Capacity Constraint (Optimization Step)

In [0]:
# Rank all cases by expected net savings and flag the ones to investigate.
#
# * `case_id` is a secondary sort key: `row_number()` assigns arbitrary ranks
#   to rows that tie on the ordering, so without a tie-breaker the ranks (and
#   therefore the decisions) could differ every time the plan is evaluated.
#   This assumes `case_id` is unique per row — TODO confirm.
# * A window without `partitionBy` is evaluated on a single partition; that
#   is inherent to a global ranking and acceptable only for modest volumes.
window_spec = Window.orderBy(
    col("expected_savings_if_investigated").desc(),
    col("case_id").asc(),
)

# A case is investigated only when BOTH conditions hold:
#   1. it fits within the investigation capacity, and
#   2. investigating it is expected to save money.
# Without (2), spare capacity would be spent on cases whose investigation
# cost exceeds the expected loss, which increases the expected financial loss.
within_capacity = col("priority_rank") <= lit(DAILY_INVESTIGATION_CAPACITY)
worth_investigating = col("expected_savings_if_investigated") > lit(0)

final_df = (
    ranked_df
    .withColumn("priority_rank", row_number().over(window_spec))
    .withColumn(
        "decision",
        # 1 = investigate, 0 = skip. `when/otherwise` maps a NULL condition
        # (e.g. missing risk score or cost) to 0 instead of propagating NULL.
        when(within_capacity & worth_investigating, lit(1)).otherwise(lit(0)),
    )
)

### Final Gold output

In [0]:
# Columns published to the Gold table, in output order.
gold_columns = [
    "case_id",
    "risk_probability",
    "investigation_cost",
    "fraud_loss_if_missed",
    "expected_loss_if_ignored",
    "expected_savings_if_investigated",
    "priority_rank",
    "decision",
]

gold_df = final_df.select(*gold_columns)

### Write Gold Table

In [0]:
# Replace the Gold table's contents with this run's recommendations.
gold_df.write.saveAsTable(GOLD_TABLE, format="delta", mode="overwrite")

In [0]:
# Show the recommendations that were just written.
# NOTE(review): `gold_df` is not cached anywhere in this notebook, so this
# presumably re-evaluates the whole plan rather than reading the saved table.
display(gold_df)

### Preview Decisions

In [0]:
# Read back the ten cases with the highest expected savings from the
# persisted Gold table.
preview_query = f"""
    SELECT *
    FROM {GOLD_TABLE}
    ORDER BY expected_savings_if_investigated DESC
    LIMIT 10
"""
display(spark.sql(preview_query))

-----