In [1]:
import pandas as pd
import json
import dspy
from typing import Literal

In [2]:
# CSV that this notebook reads as its input (loaded with pd.read_csv below).
# Both paths are relative, so they resolve against the kernel's working directory.
SOURCE_FILE_PATH = "../../data/curated/churn_risk_scores.csv"
# CSV that this notebook writes as its final output.
TARGET_FILE_PATH = "../../data/trusted/fact_churn_risk.csv"

In [3]:
# Load the source CSV with pandas defaults, so column dtypes are inferred.
# NOTE(review): if customer_id / site_id can carry leading zeros, dtype inference
# would drop them — confirm against the data.
df = pd.read_csv(SOURCE_FILE_PATH)

---

In [4]:
# Keep only the most recent row for every (customer_id, site_id) pair:
# order the frame by year_month, then take the last row of each group.
# NOTE(review): assumes year_month sorts chronologically as stored
# (e.g. zero-padded "YYYY-MM") — confirm.
site_keys = ["customer_id", "site_id"]
scores_by_month = df.sort_values("year_month")
latest_per_site = scores_by_month.groupby(site_keys, as_index=False).tail(1)
churn_risk_df = latest_per_site.copy()

---

In [5]:
# Language model handle used for every prediction in this notebook.
# The endpoint is on localhost and the API key is an empty string
# (presumably a local server that needs no credentials — confirm).
lm = dspy.LM(
    "ollama_chat/gemma3:4b",
    api_base="http://localhost:11434",
    api_key="",
)

# Register it as DSPy's default LM so modules created later pick it up.
dspy.configure(lm=lm)

In [6]:
# DSPy signature describing the churn-explanation task: one input payload and
# eight output fields.
# IMPORTANT: in DSPy the class docstring of a Signature is used as the task
# instructions sent to the LM, and each field's `desc` is part of the prompt as
# well. Editing either one changes model behavior, not just documentation.
class ChurnExplanationSignature(dspy.Signature):
    """
    You are a churn risk explanation engine.

    The risk_category and probability are already calibrated.
    You MUST NOT change them.
    You MUST NOT recompute risk.

    Your task is ONLY to:
    1) Explain why this site has the assigned risk.
    2) Identify the most relevant deterioration drivers.
    3) Recommend concrete mitigation actions.

    You MUST return structured JSON only.
    Do NOT include text outside the defined fields.
    """

    # Single input: the per-site payload. It carries no type annotation here;
    # the module's forward() passes a dict into it.
    site_data = dspy.InputField(
        desc="Structured JSON containing identity, calibrated churn risk, metrics snapshot, trend signals, and streak signals."
    )

    # --- Echo fields -------------------------------------------------------
    # The next five outputs ask the model to repeat values from the input
    # unchanged. Nothing in this class enforces that they actually match; the
    # requirement exists only as prompt text.
    customer_id: str = dspy.OutputField(
        desc="Must match input customer_id exactly."
    )

    site_id: str = dspy.OutputField(
        desc="Must match input site_id exactly."
    )

    year_month: str = dspy.OutputField(
        desc="Must match input year_month exactly."
    )

    # NOTE(review): typed as plain str although the description lists three
    # allowed values; `Literal` is imported at the top of the notebook but not
    # used in the visible code — possibly intended for this field. Confirm.
    risk_category: str = dspy.OutputField(
        desc="Must match input risk_category exactly (low, medium, high)."
    )

    probability: float = dspy.OutputField(
        desc="Must match input probability exactly."
    )

    # --- Generated fields --------------------------------------------------
    # These three are the content the model is actually asked to produce.
    risk_summary: str = dspy.OutputField(
        desc="Short executive explanation (maximum 2 sentences)."
    )

    # The "exactly 3 strings" length is requested in the description only; the
    # list[str] annotation does not constrain the list length.
    primary_drivers: list[str] = dspy.OutputField(
        desc="A JSON array with exactly 3 strings. Example: [\"driver 1\", \"driver 2\", \"driver 3\"]"
    )

    recommended_actions: list[str] = dspy.OutputField(
        desc="A JSON array with exactly 3 strings. Example: [\"action 1\", \"action 2\", \"action 3\"]"
    )

In [7]:
class ChurnExplanationModule(dspy.Module):
    """DSPy module wrapping one ``dspy.Predict`` over ``ChurnExplanationSignature``."""

    def __init__(self):
        super().__init__()
        # The only sub-module: a predictor bound to the explanation signature.
        self.predict = dspy.Predict(ChurnExplanationSignature)

    def forward(self, site_data: dict):
        """Run the predictor on one site's payload.

        Args:
            site_data: Payload for a single site, forwarded unchanged to the
                signature's ``site_data`` input field.

        Returns:
            Whatever ``self.predict`` returns for that payload.
        """
        prediction = self.predict(site_data=site_data)
        return prediction

In [8]:
# One module instance, reused for every site when the predictions are generated.
explainer = ChurnExplanationModule()

In [9]:
def build_explanation_json(row: pd.Series) -> dict:
    """
    Build structured input JSON matching the ChurnExplanationSignature contract.

    Parameters
    ----------
    row : pd.Series
        One row of the churn-risk frame. It must contain the identity columns
        (``customer_id``, ``site_id``, ``year_month``), ``churn_risk``,
        ``calibrated_probability``, ``behavior_deterioration_score`` and every
        metric, ``*_slope_3m`` and ``*_consec_*`` column read below.

    Returns
    -------
    dict
        Identity and risk fields at the top level plus three nested dicts:
        ``metrics_snapshot``, ``trend_signals`` and ``streak_signals``.
        Numeric values are converted to built-in ``float`` / ``int``. A missing
        value (NaN, None, pd.NA) becomes ``None`` so that the payload stays
        valid JSON and a missing streak counter does not raise.

    Raises
    ------
    KeyError
        If an expected column is absent from ``row``.
    """

    def _float_or_none(column: str):
        # float(NaN) succeeds but yields NaN, which is not valid JSON.
        value = row[column]
        return None if pd.isna(value) else float(value)

    def _int_or_none(column: str):
        # int(NaN) raises ValueError, so missing counters must be guarded.
        value = row[column]
        return None if pd.isna(value) else int(value)

    return {
        # Identity fields are passed through untouched.
        "customer_id": row["customer_id"],
        "site_id": row["site_id"],
        "year_month": row["year_month"],

        # Risk fields; note the rename churn_risk -> risk_category.
        "risk_category": row["churn_risk"],
        "probability": _float_or_none("calibrated_probability"),
        "behavior_deterioration_score": _float_or_none("behavior_deterioration_score"),

        "metrics_snapshot": {
            "usage_volume": _float_or_none("usage_volume"),
            "bot_utilization": _float_or_none("bot_utilization"),
            "bot_performance": _float_or_none("bot_performance"),
            "bot_uptime": _float_or_none("bot_uptime"),
            "support_sla": _float_or_none("support_sla"),
            "nps": _float_or_none("nps"),
        },

        "trend_signals": {
            # Payload key is "usage_slope_3m" while the column is usage_volume_slope_3m.
            "usage_slope_3m": _float_or_none("usage_volume_slope_3m"),
            "bot_performance_slope_3m": _float_or_none("bot_performance_slope_3m"),
            "bot_uptime_slope_3m": _float_or_none("bot_uptime_slope_3m"),
            "support_sla_slope_3m": _float_or_none("support_sla_slope_3m"),
            "nps_slope_3m": _float_or_none("nps_slope_3m"),
        },

        "streak_signals": {
            "nps_consecutive_drops": _int_or_none("nps_consec_down"),
            "usage_consecutive_drops": _int_or_none("usage_volume_consec_down"),
            "upgrade_failures_consecutive_rise": _int_or_none("upgrade_failures_consec_up"),
            "lack_of_rca_consecutive_rise": _int_or_none("lack_of_rca_consec_up"),
        },
    }

In [10]:
# Turn each latest-month row into the nested payload, one dict per site.
# Row-wise apply yields a Series of dicts that keeps churn_risk_df's index.
structured_inputs = churn_risk_df.apply(build_explanation_json, axis="columns")

In [11]:
%%time

results = structured_inputs.apply(
    lambda j: explainer(site_data=j)
)

CPU times: user 1.14 s, sys: 74.4 ms, total: 1.22 s
Wall time: 2.42 s


In [12]:
def prediction_to_dict(r):
    """Flatten one prediction object into a plain dict.

    Args:
        r: Object exposing the attributes customer_id, site_id, year_month,
            risk_category, probability, risk_summary, primary_drivers and
            recommended_actions.

    Returns:
        dict with those eight keys in that order. ``probability`` is rounded
        to 5 decimal places; every other value is copied as-is.
    """
    record = {
        field: getattr(r, field)
        for field in ("customer_id", "site_id", "year_month", "risk_category")
    }
    record["probability"] = round(r.probability, 5)
    for field in ("risk_summary", "primary_drivers", "recommended_actions"):
        record[field] = getattr(r, field)
    return record

In [13]:
# One flat record per site, built from the model's predictions.
records = results.apply(prediction_to_dict).tolist()

# `results` was produced element-by-element from `structured_inputs`, so the
# two line up positionally; fail loudly if that ever stops being true.
assert len(records) == len(structured_inputs), "predictions and inputs are misaligned"

# customer_id, site_id, year_month, risk_category and probability are only
# *echoed* by the LM. The prompt asks it to repeat them exactly, but nothing
# enforces that, so a mis-echo would silently corrupt the fact table.
# Overwrite the echoes with the values that were actually sent to the model.
ECHOED_FIELDS = ("customer_id", "site_id", "year_month", "risk_category")
for record, site_json in zip(records, structured_inputs):
    for field in ECHOED_FIELDS:
        record[field] = site_json[field]
    calibrated = site_json["probability"]
    # Same 5-decimal rounding as prediction_to_dict; tolerate a missing value.
    record["probability"] = calibrated if calibrated is None else round(calibrated, 5)

# Columns and their order are unchanged: only existing keys were overwritten.
output = pd.DataFrame(records)

---

In [14]:
# Persist the explanation table to TARGET_FILE_PATH, omitting the DataFrame index.
# NOTE(review): primary_drivers / recommended_actions are declared as list[str];
# presumably to_csv writes such cells using their Python string form rather than
# JSON — confirm that downstream readers expect this (json is imported at the
# top of the notebook but unused in the visible code).
output.to_csv(TARGET_FILE_PATH, index=False)