# Mountain America Credit Union - ML Models & Feature Store

## Overview
This notebook demonstrates the creation of ML models and feature store entities for the MACU Intelligence Agent. It covers:
1. Feature Store Setup with Entity and FeatureView definitions
2. Training ML models for loan default risk and member churn prediction
3. Registering models in Snowflake Model Registry
4. Retrieving features from the Feature Store for inference

---

In [None]:
# Import required libraries
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *
import pandas as pd
import numpy as np

# Get active Snowpark session
session = get_active_session()
print(f"Connected to Snowflake: {session.get_current_database()}.{session.get_current_schema()}")

In [None]:
%%sql -r setup_context
USE DATABASE MACU_INTELLIGENCE;
USE SCHEMA FEATURE_STORE;
USE WAREHOUSE MACU_FEATURE_WH;

## Part 1: Feature Store Setup

The Snowflake Feature Store provides a centralized repository for feature management, enabling:
- Feature reuse across ML projects
- Point-in-time correct feature retrieval for training
- Online serving for real-time inference
- Feature versioning and lineage tracking
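
To make "point-in-time correct" concrete, here is a minimal plain-Python sketch of the idea: for each training event, use only the latest feature value recorded at or before that event. The `history` data and the `value_as_of` helper are hypothetical and purely illustrative; this is not how the Feature Store is implemented internally.

```python
from bisect import bisect_right
from datetime import datetime

# Hypothetical feature history for one member: (feature_timestamp, credit_score),
# sorted by timestamp. The values are made up for illustration.
history = [
    (datetime(2024, 1, 1), 680),
    (datetime(2024, 2, 1), 695),
    (datetime(2024, 3, 1), 710),
]

def value_as_of(history, event_time):
    """Return the latest feature value recorded at or before event_time, or None."""
    timestamps = [ts for ts, _ in history]
    idx = bisect_right(timestamps, event_time)
    return history[idx - 1][1] if idx > 0 else None

# A loan originated on 2024-02-15 should see the February value, not the
# March value that was recorded after the event.
print(value_as_of(history, datetime(2024, 2, 15)))   # 695
print(value_as_of(history, datetime(2023, 12, 31)))  # None (no value existed yet)
```

Using a value recorded after the event (710 in this example) would leak future information into the training set, which is the problem point-in-time retrieval avoids.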

In [None]:
# Import Feature Store components
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity
from snowflake.ml.registry import Registry
from datetime import datetime, timedelta

# Initialize Feature Store
fs = FeatureStore(
    session=session,
    database="MACU_INTELLIGENCE",
    name="FEATURE_STORE",
    default_warehouse="MACU_FEATURE_WH"
)

print("Feature Store initialized successfully")

In [None]:
# Define Member Entity
member_entity = Entity(
    name="MEMBER",
    join_keys=["MEMBER_ID"],
    desc="Credit union member entity for all member-level features"
)

# Register the entity
fs.register_entity(member_entity)
print("Member entity registered")

In [None]:
# Create Member Profile Feature View
member_profile_query = """
SELECT
    m.member_id,
    DATEDIFF('day', m.membership_date, CURRENT_DATE()) as tenure_days,
    m.credit_score,
    m.income_verified as verified_income,
    CASE WHEN m.digital_banking_enrolled THEN 1 ELSE 0 END as is_digital,
    COALESCE(acct.num_accounts, 0) as num_accounts,
    COALESCE(acct.total_balance, 0) as total_deposit_balance,
    CURRENT_TIMESTAMP() as feature_timestamp
FROM MACU_INTELLIGENCE.RAW.MEMBERS m
LEFT JOIN (
    SELECT member_id, COUNT(*) as num_accounts, SUM(current_balance) as total_balance
    FROM MACU_INTELLIGENCE.RAW.ACCOUNTS
    WHERE account_status = 'ACTIVE' AND account_type != 'CREDIT_CARD'
    GROUP BY member_id
) acct ON m.member_id = acct.member_id
WHERE m.member_status = 'ACTIVE'
"""

# Create FeatureView
member_profile_fv = FeatureView(
    name="MEMBER_PROFILE_FEATURES",
    entities=[member_entity],
    feature_df=session.sql(member_profile_query),
    timestamp_col="FEATURE_TIMESTAMP",
    refresh_freq="1 day",
    desc="Member demographic and account profile features"
)

# Register the FeatureView
fs.register_feature_view(
    feature_view=member_profile_fv,
    version="v1",
    block=True
)

print("Member Profile FeatureView registered")

In [None]:
# Create Transaction Pattern Feature View
transaction_pattern_query = """
SELECT
    member_id,
    COUNT(*) as txn_count_30d,
    SUM(ABS(amount)) as txn_volume_30d,
    AVG(ABS(amount)) as avg_txn_amount,
    COUNT(CASE WHEN is_international THEN 1 END) as intl_txn_count,
    COUNT(CASE WHEN channel = 'MOBILE_APP' THEN 1 END) as mobile_txn_count,
    COUNT(DISTINCT merchant_name) as unique_merchants,
    MAX(ABS(amount)) as max_txn_amount,
    CURRENT_TIMESTAMP() as feature_timestamp
FROM MACU_INTELLIGENCE.RAW.TRANSACTIONS
WHERE transaction_date >= DATEADD('day', -30, CURRENT_DATE())
    AND status = 'COMPLETED'
GROUP BY member_id
"""

transaction_fv = FeatureView(
    name="TRANSACTION_PATTERN_FEATURES",
    entities=[member_entity],
    feature_df=session.sql(transaction_pattern_query),
    timestamp_col="FEATURE_TIMESTAMP",
    refresh_freq="1 hour",
    desc="30-day transaction pattern features"
)

fs.register_feature_view(
    feature_view=transaction_fv,
    version="v1",
    block=True
)

print("Transaction Pattern FeatureView registered")

In [None]:
# Create Loan Risk Feature View
loan_risk_query = """
SELECT
    m.member_id,
    COALESCE(l.num_active_loans, 0) as num_active_loans,
    COALESCE(l.total_loan_balance, 0) as total_loan_balance,
    COALESCE(l.avg_loan_rate, 0) as avg_loan_rate,
    COALESCE(l.total_late_payments, 0) as total_late_payments,
    COALESCE(l.max_days_past_due, 0) as max_days_past_due,
    CASE WHEN m.income_verified > 0 
         THEN COALESCE(l.total_monthly_payment, 0) / (m.income_verified / 12)
         ELSE 0 END as debt_to_income_ratio,
    CURRENT_TIMESTAMP() as feature_timestamp
FROM MACU_INTELLIGENCE.RAW.MEMBERS m
LEFT JOIN (
    SELECT 
        member_id,
        COUNT(*) as num_active_loans,
        SUM(current_balance) as total_loan_balance,
        AVG(interest_rate) as avg_loan_rate,
        SUM(times_30_days_late + times_60_days_late + times_90_days_late) as total_late_payments,
        MAX(days_past_due) as max_days_past_due,
        SUM(monthly_payment) as total_monthly_payment
    FROM MACU_INTELLIGENCE.RAW.LOANS
    WHERE loan_status = 'ACTIVE'
    GROUP BY member_id
) l ON m.member_id = l.member_id
WHERE m.member_status = 'ACTIVE'
"""

loan_risk_fv = FeatureView(
    name="LOAN_RISK_FEATURES",
    entities=[member_entity],
    feature_df=session.sql(loan_risk_query),
    timestamp_col="FEATURE_TIMESTAMP",
    refresh_freq="1 day",
    desc="Loan portfolio and risk features"
)

fs.register_feature_view(
    feature_view=loan_risk_fv,
    version="v1",
    block=True
)

print("Loan Risk FeatureView registered")

## Part 2: Model Training

Three models are in scope; this notebook trains the loan default and churn models:
1. **Loan Default Risk Model** - Predicts probability of loan default
2. **Fraud Detection Model** - Scores transaction fraud risk (not trained in this notebook)
3. **Member Churn Model** - Predicts member attrition likelihood
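
The trained models output probabilities and are evaluated with ROC-AUC. As a reminder of what that metric measures, the sketch below computes it as the fraction of (positive, negative) pairs in which the positive example gets the higher score. The `roc_auc_pairwise` helper and the toy data are hypothetical; the notebook itself uses `sklearn.metrics.roc_auc_score`.

```python
def roc_auc_pairwise(labels, scores):
    """ROC-AUC as the fraction of (positive, negative) pairs ranked correctly.

    Ties count as half a correct pair. This is O(n_pos * n_neg), so it is
    meant for illustration only, not for large datasets.
    """
    pos = [s for y, s in zip(labels, scores) if y == 1]
    neg = [s for y, s in zip(labels, scores) if y == 0]
    if not pos or not neg:
        raise ValueError("need at least one positive and one negative example")
    wins = sum(1.0 if p > n else 0.5 if p == n else 0.0 for p in pos for n in neg)
    return wins / (len(pos) * len(neg))

# Toy example: two non-defaults (0) and two defaults (1) with predicted scores.
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
print(roc_auc_pairwise(labels, scores))  # 0.75
```

A score of 0.5 corresponds to random ranking and 1.0 to a perfect ranking, so the metric reflects how well a model orders members by risk rather than how well calibrated its probabilities are.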

In [None]:
# Prepare training data for Loan Default Risk Model
loan_training_query = """
SELECT
    l.loan_id,
    m.credit_score,
    l.original_amount,
    l.interest_rate,
    l.term_months,
    DATEDIFF('day', m.membership_date, l.origination_date) as member_tenure_at_origination,
    m.income_verified,
    CASE WHEN m.income_verified > 0 THEN l.monthly_payment / (m.income_verified / 12) ELSE 0 END as dti_ratio,
    m.employment_status,
    l.collateral_type,
    COALESCE(prior.prior_loans, 0) as prior_loan_count,
    COALESCE(prior.prior_defaults, 0) as prior_defaults,
    CASE WHEN l.loan_status = 'DELINQUENT' OR l.days_past_due > 90 THEN 1 ELSE 0 END as is_default
FROM MACU_INTELLIGENCE.RAW.LOANS l
JOIN MACU_INTELLIGENCE.RAW.MEMBERS m ON l.member_id = m.member_id
LEFT JOIN (
    -- Aggregate to one row per loan so the join cannot duplicate loans
    SELECT
        cur.loan_id,
        COUNT(*) as prior_loans,
        SUM(CASE WHEN prev.loan_status = 'DELINQUENT' THEN 1 ELSE 0 END) as prior_defaults
    FROM MACU_INTELLIGENCE.RAW.LOANS cur
    JOIN MACU_INTELLIGENCE.RAW.LOANS prev
        ON cur.member_id = prev.member_id
        AND prev.origination_date < cur.origination_date
    GROUP BY cur.loan_id
) prior ON l.loan_id = prior.loan_id
"""

loan_training_df = session.sql(loan_training_query).to_pandas()
print(f"Loan training data: {len(loan_training_df)} records")
print(f"Default rate: {loan_training_df['IS_DEFAULT'].mean():.2%}")

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import roc_auc_score, precision_recall_curve, classification_report
import warnings
warnings.filterwarnings('ignore')

# Prepare features for loan default model
feature_cols = ['CREDIT_SCORE', 'ORIGINAL_AMOUNT', 'INTEREST_RATE', 'TERM_MONTHS', 
                'MEMBER_TENURE_AT_ORIGINATION', 'INCOME_VERIFIED', 'DTI_RATIO',
                'PRIOR_LOAN_COUNT', 'PRIOR_DEFAULTS']

# Encode categorical variables
le_employment = LabelEncoder()
loan_training_df['EMPLOYMENT_ENCODED'] = le_employment.fit_transform(loan_training_df['EMPLOYMENT_STATUS'].fillna('UNKNOWN'))

le_collateral = LabelEncoder()
loan_training_df['COLLATERAL_ENCODED'] = le_collateral.fit_transform(loan_training_df['COLLATERAL_TYPE'].fillna('NONE'))

feature_cols_extended = feature_cols + ['EMPLOYMENT_ENCODED', 'COLLATERAL_ENCODED']

# Prepare X and y
X = loan_training_df[feature_cols_extended].fillna(0)
y = loan_training_df['IS_DEFAULT']

# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, stratify=y)

print(f"Training set: {len(X_train)} samples")
print(f"Test set: {len(X_test)} samples")

In [None]:
# Train Loan Default Risk Model
loan_default_model = GradientBoostingClassifier(
    n_estimators=100,
    max_depth=5,
    learning_rate=0.1,
    min_samples_split=50,
    random_state=42
)

loan_default_model.fit(X_train, y_train)

# Evaluate model
y_pred_proba = loan_default_model.predict_proba(X_test)[:, 1]
auc_score = roc_auc_score(y_test, y_pred_proba)

print(f"Loan Default Model Performance:")
print(f"ROC-AUC Score: {auc_score:.4f}")
print(f"\nFeature Importances:")
for feat, imp in sorted(zip(feature_cols_extended, loan_default_model.feature_importances_), key=lambda x: -x[1]):
    print(f"  {feat}: {imp:.4f}")

In [None]:
# Prepare Churn Prediction Training Data
churn_training_query = """
SELECT
    m.member_id,
    DATEDIFF('day', m.membership_date, CURRENT_DATE()) as tenure_days,
    m.credit_score,
    CASE WHEN m.digital_banking_enrolled THEN 1 ELSE 0 END as is_digital,
    COALESCE(acct.product_count, 0) as product_count,
    COALESCE(txn.txn_count_30d, 0) as txn_count_30d,
    COALESCE(txn.days_since_last_txn, 999) as days_since_last_txn,
    COALESCE(support.support_count_90d, 0) as support_contacts,
    COALESCE(support.avg_satisfaction, 3) as avg_satisfaction,
    CASE WHEN m.member_status = 'INACTIVE' OR m.member_status = 'CLOSED' THEN 1 ELSE 0 END as churned
FROM MACU_INTELLIGENCE.RAW.MEMBERS m
LEFT JOIN (
    SELECT member_id, COUNT(DISTINCT account_type) as product_count
    FROM MACU_INTELLIGENCE.RAW.ACCOUNTS WHERE account_status = 'ACTIVE'
    GROUP BY member_id
) acct ON m.member_id = acct.member_id
LEFT JOIN (
    SELECT member_id, COUNT(*) as txn_count_30d,
           DATEDIFF('day', MAX(transaction_date), CURRENT_DATE()) as days_since_last_txn
    FROM MACU_INTELLIGENCE.RAW.TRANSACTIONS
    WHERE transaction_date >= DATEADD('day', -30, CURRENT_DATE())
    GROUP BY member_id
) txn ON m.member_id = txn.member_id
LEFT JOIN (
    SELECT member_id, COUNT(*) as support_count_90d, AVG(satisfaction_score) as avg_satisfaction
    FROM MACU_INTELLIGENCE.RAW.SUPPORT_INTERACTIONS
    WHERE interaction_date >= DATEADD('day', -90, CURRENT_DATE())
    GROUP BY member_id
) support ON m.member_id = support.member_id
"""

churn_training_df = session.sql(churn_training_query).to_pandas()
print(f"Churn training data: {len(churn_training_df)} records")
print(f"Churn rate: {churn_training_df['CHURNED'].mean():.2%}")

In [None]:
# Train Churn Prediction Model
churn_features = ['TENURE_DAYS', 'CREDIT_SCORE', 'IS_DIGITAL', 'PRODUCT_COUNT', 
                  'TXN_COUNT_30D', 'DAYS_SINCE_LAST_TXN', 'SUPPORT_CONTACTS', 'AVG_SATISFACTION']

X_churn = churn_training_df[churn_features].fillna(0)
y_churn = churn_training_df['CHURNED']

X_train_churn, X_test_churn, y_train_churn, y_test_churn = train_test_split(
    X_churn, y_churn, test_size=0.2, random_state=42, stratify=y_churn
)

churn_model = GradientBoostingClassifier(
    n_estimators=100,
    max_depth=4,
    learning_rate=0.1,
    random_state=42
)

churn_model.fit(X_train_churn, y_train_churn)

# Evaluate
y_churn_pred = churn_model.predict_proba(X_test_churn)[:, 1]
churn_auc = roc_auc_score(y_test_churn, y_churn_pred)

print(f"Churn Model Performance:")
print(f"ROC-AUC Score: {churn_auc:.4f}")
print(f"\nFeature Importances:")
for feat, imp in sorted(zip(churn_features, churn_model.feature_importances_), key=lambda x: -x[1]):
    print(f"  {feat}: {imp:.4f}")

## Part 3: Model Registry

Register the trained models in Snowflake Model Registry for versioning, deployment, and governance.

In [None]:
# Initialize Model Registry
registry = Registry(session=session, database_name="MACU_INTELLIGENCE", schema_name="ANALYTICS")

print("Model Registry initialized")
print(f"Registry location: MACU_INTELLIGENCE.ANALYTICS")

In [None]:
# Log Loan Default Model to Registry

# Create sample input for schema inference
sample_input = X_train.head(10)

# Log the model
loan_model_version = registry.log_model(
    model=loan_default_model,
    model_name="LOAN_DEFAULT_RISK_MODEL",
    version_name="v1",
    sample_input_data=sample_input,
    comment="Gradient Boosting model for predicting loan default risk",
    metrics={
        "roc_auc": float(auc_score),
        "training_samples": len(X_train),
        "test_samples": len(X_test)
    }
)

print(f"Loan Default Model registered successfully")
print(f"Model: LOAN_DEFAULT_RISK_MODEL, Version: v1")
print(f"ROC-AUC: {auc_score:.4f}")

In [None]:
# Log Churn Model to Registry
churn_sample_input = X_train_churn.head(10)

churn_model_version = registry.log_model(
    model=churn_model,
    model_name="MEMBER_CHURN_MODEL",
    version_name="v1",
    sample_input_data=churn_sample_input,
    comment="Gradient Boosting model for predicting member churn",
    metrics={
        "roc_auc": float(churn_auc),
        "training_samples": len(X_train_churn),
        "test_samples": len(X_test_churn)
    }
)

print(f"Churn Model registered successfully")
print(f"Model: MEMBER_CHURN_MODEL, Version: v1")
print(f"ROC-AUC: {churn_auc:.4f}")

In [None]:
# List all registered models
models = registry.show_models()
print("\nRegistered Models in MACU_INTELLIGENCE.ANALYTICS:")
print(models[['name', 'comment']].to_string())

## Part 4: Feature Store Retrieval Demo

Demonstrate how to retrieve features for inference using the Feature Store.
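
Conceptually, retrieval left-joins each feature view onto a "spine" of entity keys, so every requested member gets one row even when some features are missing. The plain-Python sketch below illustrates that shape with made-up data; the `join_features` helper and the feature dictionaries are hypothetical, and the Feature Store performs the equivalent work inside Snowflake.

```python
# Hypothetical latest feature rows per feature view, keyed by MEMBER_ID.
profile_features = {
    101: {"TENURE_DAYS": 1200, "CREDIT_SCORE": 710},
    102: {"TENURE_DAYS": 300, "CREDIT_SCORE": 640},
}
transaction_features = {
    101: {"TXN_COUNT_30D": 42},
}

def join_features(spine_ids, *feature_tables):
    """Left-join each feature table onto the spine; missing features become None."""
    # Collect every feature column in first-seen order.
    all_columns = []
    for table in feature_tables:
        for row in table.values():
            for col in row:
                if col not in all_columns:
                    all_columns.append(col)
    rows = []
    for member_id in spine_ids:
        # Start with all features set to None, then fill in what exists.
        out = {"MEMBER_ID": member_id, **dict.fromkeys(all_columns)}
        for table in feature_tables:
            out.update(table.get(member_id, {}))
        rows.append(out)
    return rows

rows = join_features([101, 102, 103], profile_features, transaction_features)
for row in rows:
    print(row)
```

Member 103 appears in the output with every feature set to `None`, which mirrors the NULLs a left join produces for members that have no row in a feature view.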

In [None]:
# Example: Retrieve features for a set of members
example_members = session.sql("""
    SELECT DISTINCT member_id 
    FROM MACU_INTELLIGENCE.RAW.MEMBERS 
    WHERE member_status = 'ACTIVE' 
    LIMIT 5
""").to_pandas()

print("Example members for feature retrieval:")
print(example_members)

In [None]:
# Retrieve features from Feature Store
member_spine = session.create_dataframe(example_members)

# Get the registered feature views
member_profile_fv_ref = fs.get_feature_view("MEMBER_PROFILE_FEATURES", "v1")
transaction_fv_ref = fs.get_feature_view("TRANSACTION_PATTERN_FEATURES", "v1")
loan_risk_fv_ref = fs.get_feature_view("LOAN_RISK_FEATURES", "v1")

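# Retrieve the latest feature values for the spine. retrieve_feature_values
# returns a Snowpark DataFrame; generate_dataset is intended for materializing
# training datasets and, in recent snowflake-ml-python releases, requires a name.
feature_df = fs.retrieve_feature_values(
    spine_df=member_spine,
    features=[member_profile_fv_ref, transaction_fv_ref, loan_risk_fv_ref],
    spine_timestamp_col=None,
    include_feature_view_timestamp_col=False
)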

result_df = feature_df.to_pandas()
print("\nRetrieved Features:")
print(result_df.to_string())

## Summary

This notebook demonstrated:

1. **Feature Store Setup**
   - Created Member entity
   - Registered 3 FeatureViews: Member Profile, Transaction Patterns, Loan Risk
   - FeatureViews refresh on their configured schedules (hourly for Transaction Patterns, daily for the other two)

2. **Model Training**
   - Trained Loan Default Risk Model (ROC-AUC shown above)
   - Trained Member Churn Model (ROC-AUC shown above)

3. **Model Registry**
   - Registered both models with versioning
   - Captured training metrics and metadata

4. **Feature Retrieval**
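   - Retrieved the latest feature values for a sample of members (no spine timestamp was supplied, so this was not a point-in-time lookup)
   - The same retrieval pattern can supply features at inference time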

### Next Steps
- Deploy models for real-time inference via SQL functions
- Set up model monitoring dashboards
- Schedule feature refresh pipelines
- Implement A/B testing framework