# Applied Data Finance ML Models

This Snowflake Notebook trains and registers three models used by the ADF Intelligence Agent:

1. Payment volume forecast for servicing cash-flow planning
2. Borrower risk classification to flag likely delinquencies
3. Collections promise-to-pay success estimator

Run the cells sequentially after executing the SQL setup scripts so the underlying tables contain synthetic data.


In [None]:
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import col, date_trunc
from snowflake.ml.modeling.linear_model import LinearRegression
from snowflake.ml.modeling.ensemble import RandomForestClassifier
from snowflake.ml.modeling.linear_model import LogisticRegression
from snowflake.ml.registry import Registry
from sklearn.model_selection import train_test_split
import pandas as pd

session = get_active_session()
reg = Registry(session)

DATABASE = "ADF_INTELLIGENCE"
SCHEMA = "RAW"
ANALYTICS_SCHEMA = "ANALYTICS"
WAREHOUSE = "ADF_SI_WH"

print(f"Using database={DATABASE}, schema={SCHEMA}, warehouse={WAREHOUSE}")


## 1. Payment Volume Forecast


In [None]:
payment_sf = session.sql(
    f"""
    SELECT
        DATE_TRUNC('month', payment_date) AS payment_month,
        COUNT(DISTINCT loan_id) AS active_loans,
        SUM(amount) AS total_payments
    FROM {DATABASE}.{SCHEMA}.PAYMENT_HISTORY
    GROUP BY 1
    ORDER BY 1
    """
)

payment_pd = payment_sf.to_pandas()
payment_pd["MONTH_INDEX"] = (payment_pd["PAYMENT_MONTH"] - payment_pd["PAYMENT_MONTH"].min()).dt.days / 30
payment_pd


In [None]:
X = payment_pd[["MONTH_INDEX", "ACTIVE_LOANS"]]
y = payment_pd["TOTAL_PAYMENTS"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

payment_model = LinearRegression()
payment_model.fit(X_train, y_train)
print("R^2 on holdout:", payment_model.score(X_test, y_test))


In [None]:
payment_model.save(session=session,
                   name="PAYMENT_VOLUME_FORECASTER",
                   version_name="v1",
                   replace=True)

prediction_example = payment_model.predict(X_test.head(5))
pd.DataFrame({
    "MONTH_INDEX": X_test.head(5)["MONTH_INDEX"],
    "ACTIVE_LOANS": X_test.head(5)["ACTIVE_LOANS"],
    "PREDICTED_PAYMENTS": prediction_example
})


## 2. Borrower Risk Classification


In [None]:
risk_sf = session.sql(
    f"""
    SELECT
        c.customer_id,
        c.credit_score,
        c.annual_income,
        COALESCE(SUM(l.outstanding_principal), 0) AS total_outstanding,
        COUNT_IF(l.servicing_status = 'DELINQUENT') AS delinquent_loans
    FROM {DATABASE}.{SCHEMA}.CUSTOMERS c
    LEFT JOIN {DATABASE}.{SCHEMA}.LOAN_ACCOUNTS l ON c.customer_id = l.customer_id
    GROUP BY 1,2,3
    HAVING COUNT(*) > 0
    """
)

risk_pd = risk_sf.to_pandas()
risk_pd["TARGET_AT_RISK"] = (risk_pd["DELINQUENT_LOANS"] > 0).astype(int)
risk_pd.head()


In [None]:
feature_cols = ["CREDIT_SCORE", "ANNUAL_INCOME", "TOTAL_OUTSTANDING"]
X = risk_pd[feature_cols]
y = risk_pd["TARGET_AT_RISK"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=7, stratify=y)

risk_model = RandomForestClassifier(n_estimators=200, max_depth=6, random_state=7)
risk_model.fit(X_train, y_train)
print("Accuracy:", risk_model.score(X_test, y_test))


In [None]:
risk_model.save(session=session,
                 name="BORROWER_RISK_MODEL",
                 version_name="v1",
                 replace=True)

risk_probs = risk_model.predict_proba(X_test.head(5))[:, 1]
pd.DataFrame({
    "CUSTOMER_ID": risk_pd.iloc[X_test.head(5).index]["CUSTOMER_ID"],
    "P_DEFAULT": risk_probs
})


## 3. Collections Promise-to-Pay Success


In [None]:
collections_sf = session.sql(
    f"""
    SELECT
        l.loan_id,
        l.delinquency_bucket,
        l.outstanding_principal,
        COUNT(DISTINCT p.payment_id) AS payment_count,
        AVG(p.amount) AS avg_payment,
        COUNT_IF(c.promise_to_pay_date IS NOT NULL) AS ptp_events,
        COUNT_IF(c.outcome = 'PAYMENT_MADE') AS immediate_payments,
        MAX(CASE WHEN c.promise_to_pay_date IS NOT NULL THEN 1 ELSE 0 END) AS target_ptp
    FROM {DATABASE}.{SCHEMA}.LOAN_ACCOUNTS l
    LEFT JOIN {DATABASE}.{SCHEMA}.PAYMENT_HISTORY p ON l.loan_id = p.loan_id
    LEFT JOIN {DATABASE}.{SCHEMA}.COLLECTION_EVENTS c ON l.loan_id = c.loan_id
    WHERE l.servicing_status <> 'CHARGED_OFF'
    GROUP BY 1,2,3
    HAVING payment_count > 0
    """
)

collections_pd = collections_sf.to_pandas()
collections_pd.head()


In [None]:
collections_features = pd.get_dummies(collections_pd, columns=["DELINQUENCY_BUCKET"], dummy_na=True)
feature_cols = [col for col in collections_features.columns if col.startswith("DELINQUENCY_BUCKET_")]
feature_cols += ["OUTSTANDING_PRINCIPAL", "PAYMENT_COUNT", "AVG_PAYMENT", "PTP_EVENTS", "IMMEDIATE_PAYMENTS"]

X = collections_features[feature_cols]
y = collections_features["TARGET_PTP"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=11, stratify=y)

ptp_model = LogisticRegression(max_iter=200)
ptp_model.fit(X_train, y_train)
print("ROC-AUC:", ptp_model.score(X_test, y_test))


In [None]:
ptp_model.save(session=session,
                name="COLLECTION_SUCCESS_MODEL",
                version_name="v1",
                replace=True)

ptp_scores = ptp_model.predict_proba(X_test.head(5))[:, 1]
pd.DataFrame({
    "LOAN_ID": collections_pd.iloc[X_test.head(5).index]["LOAN_ID"],
    "P_PROMISE_TO_PAY": ptp_scores
})


### Notebook Completion

After all cells run successfully:

1. Confirm the three models (`PAYMENT_VOLUME_FORECASTER`, `BORROWER_RISK_MODEL`, `COLLECTION_SUCCESS_MODEL`) exist in the Snowflake Model Registry.
2. Execute `sql/ml/07_create_model_wrapper_functions.sql` to expose them as stored procedures.
3. Re-run `sql/agent/08_create_intelligence_agent.sql` if you want the agent to immediately reference the refreshed models.
