# LifeArc ML Pipeline - Complete Lifecycle in Snowflake

## End-to-End Machine Learning for Drug Discovery

This notebook demonstrates **production-grade ML workflows** using native Snowflake ML capabilities:

| Stage | Snowflake Capability | Why It Matters |
|-------|---------------------|----------------|
| 1. Discovery | Snowpark DataFrames | Scalable EDA on large datasets |
| 2. Feature Engineering | **Snowflake Feature Store** | Centralized, versioned, point-in-time correct |
| 3. Training | **Experiments** | Track runs, compare hyperparameters |
| 4. Registry | **Snowflake Model Registry** | Version control, aliases, lifecycle |
| 5. Deployment | Model Serving | Inference at scale |
| 6. Monitoring | **ML Observability** | Drift detection, alerting |
| 7. Lineage | **ML Lineage** | End-to-end traceability for compliance |

### Use Case: Clinical Trial Outcome Prediction

Predict patient response category (Complete Response, Partial Response, Stable Disease, Progressive Disease) based on:
- Biomarker status
- Treatment arm
- Trial phase
- Target gene
- ctDNA confirmation

**Business Value**: Patient stratification for trial enrollment optimization.

---
## Part 1: Environment Setup & Connection

Connect to Snowflake and import native ML libraries.

In [1]:
# Core imports
import warnings
warnings.filterwarnings('ignore')

# Snowpark
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *

# Snowflake ML - Native APIs
from snowflake.ml.feature_store import (
    FeatureStore, 
    Entity, 
    FeatureView,
    CreationMode
)
from snowflake.ml.registry import Registry
from snowflake.ml.modeling.preprocessing import OneHotEncoder, StandardScaler
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.modeling.metrics import (
    accuracy_score, 
    precision_score, 
    recall_score,
    f1_score,
    confusion_matrix
)

# Visualization
import pandas as pd
import numpy as np

print("✓ Snowflake ML libraries imported")

Exception: To use the modeling library, install scikit-learn version >= 1.4 and < 1.8

In [2]:
# Connection using connection_name (CLI-style connection)
import os
from snowflake.snowpark import Session

# Use CLI connection name
connection_name = os.environ.get("SNOWFLAKE_CONNECTION_NAME", "sfseeeurope_keypair")

session = Session.builder.config("connection_name", connection_name).create()

# Set context
session.sql("USE DATABASE LIFEARC_POC").collect()
session.sql("USE SCHEMA ML_DEMO").collect()
session.sql("USE WAREHOUSE COMPUTE_WH").collect()

# Verify connection
print(f"✓ Connected via {connection_name}")
print(f"Database: {session.get_current_database()}")
print(f"Schema: {session.get_current_schema()}")
print(f"Warehouse: {session.get_current_warehouse()}")

TypeError: Expected bytes or RSAPrivateKey, got <class 'NoneType'>

---
## Part 2: Data Discovery & EDA

Explore the clinical trial data to understand feature candidates.

In [3]:
# Load clinical trial results
clinical_df = session.table("LIFEARC_POC.DATA_SHARING.CLINICAL_TRIAL_RESULTS")

print(f"Total records: {clinical_df.count():,}")
print(f"\nSchema:")
for field in clinical_df.schema.fields:
    print(f"  {field.name}: {field.datatype}")

NameError: name 'session' is not defined

In [4]:
# Response category distribution (target variable)
response_dist = clinical_df.group_by("RESPONSE_CATEGORY").count().order_by("COUNT", ascending=False)
response_dist.show()

# This is what we're predicting - patient response to treatment

NameError: name 'clinical_df' is not defined

In [5]:
# Explore feature distributions
print("=== Biomarker Status ===")
clinical_df.group_by("BIOMARKER_STATUS").agg(
    F.count("*").alias("COUNT"),
    F.avg("PFS_MONTHS").alias("AVG_PFS")
).show()

print("\n=== ctDNA Confirmation ===")
clinical_df.group_by("CTDNA_CONFIRMATION").agg(
    F.count("*").alias("COUNT"),
    F.avg("PFS_MONTHS").alias("AVG_PFS")
).show()

print("\n=== Treatment Arm Performance ===")
clinical_df.group_by("TREATMENT_ARM").agg(
    F.count("*").alias("COUNT"),
    F.avg("PFS_MONTHS").alias("AVG_PFS"),
    F.avg("OS_MONTHS").alias("AVG_OS")
).order_by("AVG_PFS", ascending=False).show()

=== Biomarker Status ===


NameError: name 'clinical_df' is not defined

In [6]:
# Response rate by biomarker + ctDNA combination
# This is the kind of insight that drives trial design

response_analysis = clinical_df.with_column(
    "IS_RESPONDER",
    F.when(F.col("RESPONSE_CATEGORY").isin(["Complete_Response", "Partial_Response"]), 1).otherwise(0)
).group_by("BIOMARKER_STATUS", "CTDNA_CONFIRMATION").agg(
    F.count("*").alias("PATIENTS"),
    F.sum("IS_RESPONDER").alias("RESPONDERS"),
    (F.sum("IS_RESPONDER") / F.count("*") * 100).alias("RESPONSE_RATE_PCT")
).order_by("RESPONSE_RATE_PCT", ascending=False)

print("Response Rate by Biomarker + ctDNA Confirmation:")
response_analysis.show()

NameError: name 'clinical_df' is not defined
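
The aggregation above can be sanity-checked locally. The sketch below is plain Python over a handful of made-up rows (illustrative values, not trial data) and applies the same responder definition to compute a response rate per (biomarker, ctDNA) group. The helper name `response_rates` is hypothetical.

```python
from collections import defaultdict

# Same responder definition as the Snowpark expression above
RESPONDER_CATEGORIES = {"Complete_Response", "Partial_Response"}


def response_rates(rows):
    """Return {(biomarker, ctdna): (patients, responders, rate_pct)}."""
    counts = defaultdict(lambda: [0, 0])  # [patients, responders]
    for biomarker, ctdna, category in rows:
        bucket = counts[(biomarker, ctdna)]
        bucket[0] += 1
        bucket[1] += int(category in RESPONDER_CATEGORIES)
    return {key: (n, r, 100.0 * r / n) for key, (n, r) in counts.items()}


# Made-up rows for illustration only
sample_rows = [
    ("POSITIVE", "YES", "Complete_Response"),
    ("POSITIVE", "YES", "Partial_Response"),
    ("POSITIVE", "YES", "Stable_Disease"),
    ("POSITIVE", "YES", "Progressive_Disease"),
    ("NEGATIVE", "NO", "Progressive_Disease"),
    ("NEGATIVE", "NO", "Partial_Response"),
]

rates = response_rates(sample_rows)
for key, (patients, responders, pct) in sorted(rates.items()):
    print(key, patients, responders, f"{pct:.1f}%")
```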

---
## Part 3: Snowflake Feature Store

Create a **native Snowflake Feature Store** with:
- Entities (Patient, Trial)
- Feature Views with automatic refresh
- Point-in-time correct feature retrieval

**Why Feature Store matters for Life Sciences:**
- Reproducibility for regulatory submissions
- Consistent features across training and inference
- Automatic lineage tracking

In [7]:
# Initialize Feature Store
# The Feature Store is just a schema - we'll create a dedicated one

session.sql("CREATE SCHEMA IF NOT EXISTS LIFEARC_POC.ML_FEATURE_STORE").collect()

fs = FeatureStore(
    session=session,
    database="LIFEARC_POC",
    name="ML_FEATURE_STORE",
    default_warehouse="COMPUTE_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

print(f"✓ Feature Store initialized: {fs.name}")

NameError: name 'session' is not defined

In [8]:
# Define entities - the subjects of our features

# Patient entity - each row represents a unique patient in a trial
patient_entity = Entity(
    name="PATIENT",
    join_keys=["PATIENT_ID"],
    desc="Individual patient enrolled in clinical trial"
)

# Trial entity - for trial-level features
trial_entity = Entity(
    name="TRIAL",
    join_keys=["TRIAL_ID"],
    desc="Clinical trial identifier"
)

# Register entities
fs.register_entity(patient_entity)
fs.register_entity(trial_entity)

print("✓ Entities registered:")
for entity in fs.list_entities().to_pandas().itertuples():
    print(f"  - {entity.NAME}: {entity.JOIN_KEYS}")

NameError: name 'fs' is not defined

In [9]:
# Create Patient Feature View
# These are features derived from patient-level clinical data

patient_features_df = session.sql("""
    SELECT 
        PATIENT_ID,
        
        -- Demographics (encoded)
        PATIENT_AGE,
        CASE WHEN PATIENT_AGE < 50 THEN 'YOUNG'
             WHEN PATIENT_AGE < 65 THEN 'MIDDLE'
             ELSE 'SENIOR' END AS AGE_GROUP,
        
        -- Biomarker features
        BIOMARKER_STATUS,
        CASE WHEN BIOMARKER_STATUS = 'POSITIVE' THEN 1 ELSE 0 END AS BIOMARKER_POSITIVE,
        
        -- ctDNA features  
        CTDNA_CONFIRMATION,
        CASE WHEN CTDNA_CONFIRMATION = 'YES' THEN 1 ELSE 0 END AS CTDNA_CONFIRMED,
        
        -- Treatment features
        TREATMENT_ARM,
        CASE TREATMENT_ARM 
            WHEN 'Combination' THEN 3
            WHEN 'Experimental' THEN 2
            WHEN 'Standard' THEN 1
            ELSE 0 END AS TREATMENT_INTENSITY,
        
        -- Cohort features
        COHORT,
        
        -- Timestamp for point-in-time correctness
        CURRENT_TIMESTAMP() AS FEATURE_TIMESTAMP
        
    FROM LIFEARC_POC.DATA_SHARING.CLINICAL_TRIAL_RESULTS
""")

# Create feature view with managed refresh
patient_fv = FeatureView(
    name="PATIENT_CLINICAL_FEATURES",
    entities=[patient_entity],
    feature_df=patient_features_df,
    timestamp_col="FEATURE_TIMESTAMP",
    refresh_freq="1 day",  # Auto-refresh daily
    desc="Patient-level clinical features for response prediction"
)

# Register the feature view
patient_fv = fs.register_feature_view(
    feature_view=patient_fv,
    version="V1",
    block=True  # Wait for initial materialization
)

print(f"✓ Feature View registered: {patient_fv.name}")
print(f"  Features: {[str(name) for name in patient_fv.feature_names]}")

NameError: name 'session' is not defined
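
The `CASE` expressions in the feature query are simple bucketing rules, so they are easy to unit-test outside Snowflake. The following is a minimal sketch of a plain-Python mirror; the function names are hypothetical and not part of the notebook. One detail worth noticing: in SQL a `NULL` age fails both comparisons and falls through to `ELSE 'SENIOR'`, which the sketch reproduces explicitly.

```python
from typing import Optional

# Mirrors the TREATMENT_INTENSITY CASE expression; unknown arms map to 0
TREATMENT_INTENSITY = {"Combination": 3, "Experimental": 2, "Standard": 1}


def age_group(age: Optional[int]) -> str:
    """Mirror of the AGE_GROUP CASE expression."""
    if age is None:
        # SQL comparisons with NULL are not true, so CASE reaches ELSE
        return "SENIOR"
    if age < 50:
        return "YOUNG"
    if age < 65:
        return "MIDDLE"
    return "SENIOR"


def treatment_intensity(arm: Optional[str]) -> int:
    """Mirror of the TREATMENT_INTENSITY CASE expression."""
    return TREATMENT_INTENSITY.get(arm, 0)


for age in (49, 50, 64, 65, None):
    print(age, age_group(age))
```

If a missing age should not be treated as `SENIOR`, the SQL would need an explicit `WHEN PATIENT_AGE IS NULL` branch.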

In [10]:
# Create Trial-level Feature View
# Aggregate statistics per trial that can inform patient outcomes

trial_features_df = session.sql("""
    SELECT 
        TRIAL_ID,
        
        -- Trial performance metrics
        COUNT(*) AS TRIAL_ENROLLMENT,
        AVG(PFS_MONTHS) AS TRIAL_AVG_PFS,
        STDDEV(PFS_MONTHS) AS TRIAL_STD_PFS,
        AVG(OS_MONTHS) AS TRIAL_AVG_OS,
        
        -- Response rates by trial
        -- (derived from the label: compute on training rows only in production to avoid target leakage)
        SUM(CASE WHEN RESPONSE_CATEGORY IN ('Complete_Response', 'Partial_Response') 
            THEN 1 ELSE 0 END) * 100.0 / COUNT(*) AS TRIAL_RESPONSE_RATE,
        
        -- Biomarker prevalence in trial
        SUM(CASE WHEN BIOMARKER_STATUS = 'POSITIVE' THEN 1 ELSE 0 END) * 100.0 / COUNT(*) 
            AS TRIAL_BIOMARKER_POSITIVE_PCT,
        
        -- ctDNA usage in trial
        SUM(CASE WHEN CTDNA_CONFIRMATION = 'YES' THEN 1 ELSE 0 END) * 100.0 / COUNT(*) 
            AS TRIAL_CTDNA_USAGE_PCT,
        
        CURRENT_TIMESTAMP() AS FEATURE_TIMESTAMP
        
    FROM LIFEARC_POC.DATA_SHARING.CLINICAL_TRIAL_RESULTS
    GROUP BY TRIAL_ID
""")

trial_fv = FeatureView(
    name="TRIAL_AGGREGATE_FEATURES",
    entities=[trial_entity],
    feature_df=trial_features_df,
    timestamp_col="FEATURE_TIMESTAMP",
    refresh_freq="1 day",
    desc="Trial-level aggregate features"
)

trial_fv = fs.register_feature_view(
    feature_view=trial_fv,
    version="V1",
    block=True
)

print(f"✓ Feature View registered: {trial_fv.name}")

NameError: name 'session' is not defined

In [11]:
# List all feature views in our store
print("=== Feature Store Contents ===")
fs.list_feature_views().to_pandas()

=== Feature Store Contents ===


NameError: name 'fs' is not defined

---
## Part 4: Generate Training Dataset

Use the Feature Store to generate a **point-in-time correct** training dataset.

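When the spine and the feature views carry event-time timestamps, this provides:
- No data leakage (each label row only sees feature values recorded at or before its own timestamp)
- Reproducibility (the same dataset version can be regenerated)
- Lineage tracking (the dataset is linked to the models trained on it)

In this demo both timestamps come from `CURRENT_TIMESTAMP()`, so the join illustrates the mechanism rather than true historical correctness.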
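
To make the point-in-time idea concrete, here is a conceptual sketch in plain Python. It is not how the Feature Store implements the join (that happens in SQL inside Snowflake); the helper `as_of_lookup` and the integer "day" timestamps are assumptions for illustration. For a given label time it returns the latest feature value recorded at or before that time, and never a later one.

```python
from bisect import bisect_right


def as_of_lookup(feature_history, label_time):
    """Return the latest value whose timestamp is <= label_time.

    feature_history must be a list of (timestamp, value) pairs sorted by
    timestamp. Returns None when no value was recorded early enough.
    """
    timestamps = [ts for ts, _ in feature_history]
    idx = bisect_right(timestamps, label_time)
    return feature_history[idx - 1][1] if idx else None


# Hypothetical feature history for one patient, keyed by day number
history = [(1, "UNKNOWN"), (5, "POSITIVE"), (9, "NEGATIVE")]

print(as_of_lookup(history, 6))  # value known on day 6
print(as_of_lookup(history, 0))  # nothing recorded yet
```

A label observed on day 6 is joined to the day-5 value; the day-9 value is ignored because it was not yet known.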

In [12]:
# Create spine table - the join keys and labels for training
# This is the "ground truth" we're trying to predict

spine_df = session.sql("""
    SELECT 
        RESULT_ID,
        PATIENT_ID,
        TRIAL_ID,
        
        -- Target variable (what we're predicting)
        RESPONSE_CATEGORY,
        CASE RESPONSE_CATEGORY
            WHEN 'Complete_Response' THEN 3
            WHEN 'Partial_Response' THEN 2
            WHEN 'Stable_Disease' THEN 1
            WHEN 'Progressive_Disease' THEN 0
            ELSE -1 END AS RESPONSE_LABEL,
        
        -- Binary target (responder vs non-responder)
        CASE WHEN RESPONSE_CATEGORY IN ('Complete_Response', 'Partial_Response') 
            THEN 1 ELSE 0 END AS IS_RESPONDER,
        
        -- Timestamp for point-in-time join
        CURRENT_TIMESTAMP() AS LABEL_TIMESTAMP
        
    FROM LIFEARC_POC.DATA_SHARING.CLINICAL_TRIAL_RESULTS
    WHERE RESPONSE_CATEGORY IS NOT NULL
""")

print(f"Spine table rows: {spine_df.count():,}")
spine_df.show(5)

NameError: name 'session' is not defined

In [13]:
# Generate training dataset by joining features to spine
# The Feature Store handles point-in-time correctness automatically

training_dataset = fs.generate_dataset(
    name="RESPONSE_PREDICTION_TRAINING",
    version="V1",
    spine_df=spine_df,
    features=[
        patient_fv,  # All patient features
        trial_fv     # All trial features
    ],
    spine_timestamp_col="LABEL_TIMESTAMP",
    spine_label_cols=["RESPONSE_CATEGORY", "RESPONSE_LABEL", "IS_RESPONDER"],
    desc="Training dataset for clinical response prediction"
)

# Convert to DataFrame for training
training_df = training_dataset.read.to_snowpark_dataframe()
print(f"\n✓ Training dataset generated: {training_df.count():,} rows")
print(f"  Columns: {training_df.columns}")

NameError: name 'fs' is not defined

In [14]:
# Split into train/validation/test sets
# Using hash-based split for reproducibility

train_df = training_df.filter(F.abs(F.hash("RESULT_ID")) % 10 < 7)  # 70%
val_df = training_df.filter((F.abs(F.hash("RESULT_ID")) % 10 >= 7) & 
                            (F.abs(F.hash("RESULT_ID")) % 10 < 9))  # 20%
test_df = training_df.filter(F.abs(F.hash("RESULT_ID")) % 10 >= 9)  # 10%

print(f"Training set: {train_df.count():,} rows")
print(f"Validation set: {val_df.count():,} rows")
print(f"Test set: {test_df.count():,} rows")

NameError: name 'training_df' is not defined
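
The split above is reproducible because each row's bucket depends only on its `RESULT_ID`, not on row order or a random seed. The sketch below shows the same idea in plain Python. It uses SHA-256 in place of Snowflake's `HASH` function, so the bucket a given ID lands in will differ from the notebook's; the ID format and the helper name `split_bucket` are made up for illustration.

```python
import hashlib
from collections import Counter


def split_bucket(record_id: str) -> str:
    """Assign an ID to train/validation/test by hashing it into 10 buckets."""
    digest = hashlib.sha256(record_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10
    if bucket < 7:
        return "train"       # buckets 0-6, roughly 70%
    if bucket < 9:
        return "validation"  # buckets 7-8, roughly 20%
    return "test"            # bucket 9, roughly 10%


# Hypothetical IDs for illustration
ids = [f"RES-{i:05d}" for i in range(10_000)]
assignments = {rid: split_bucket(rid) for rid in ids}
counts = Counter(assignments.values())
print(counts)
```

Because the proportions are only approximate for small tables, it is worth printing the counts, as the notebook cell does.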

---
## Part 5: Model Training with Snowpark ML

Train a classification model using **Snowpark ML** - no data leaves Snowflake.

Benefits:
- Data stays under governance
- Distributed training on Snowflake compute
- Native integration with Model Registry

In [15]:
# Define feature columns for training
NUMERIC_FEATURES = [
    "PATIENT_AGE",
    "BIOMARKER_POSITIVE",
    "CTDNA_CONFIRMED",
    "TREATMENT_INTENSITY",
    "TRIAL_ENROLLMENT",
    "TRIAL_AVG_PFS",
    "TRIAL_RESPONSE_RATE",
    "TRIAL_BIOMARKER_POSITIVE_PCT",
    "TRIAL_CTDNA_USAGE_PCT"
]

CATEGORICAL_FEATURES = [
    "AGE_GROUP",
    "BIOMARKER_STATUS",
    "TREATMENT_ARM",
    "COHORT"
]

TARGET = "IS_RESPONDER"  # Binary classification: responder vs non-responder

print(f"Numeric features: {len(NUMERIC_FEATURES)}")
print(f"Categorical features: {len(CATEGORICAL_FEATURES)}")
print(f"Target: {TARGET}")

Numeric features: 9
Categorical features: 4
Target: IS_RESPONDER


In [16]:
# Build Snowpark ML Pipeline
# This runs entirely within Snowflake

# Preprocessing: Scale numeric, encode categorical
scaler = StandardScaler(
    input_cols=NUMERIC_FEATURES,
    output_cols=[f"{c}_SCALED" for c in NUMERIC_FEATURES]
)

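# OrdinalEncoder yields exactly one output column per input, matching the
# *_ENCODED names the classifier expects below (OneHotEncoder expands each
# input into one column per category, so those names would not exist)
from snowflake.ml.modeling.preprocessing import OrdinalEncoder

encoder = OrdinalEncoder(
    input_cols=CATEGORICAL_FEATURES,
    output_cols=[f"{c}_ENCODED" for c in CATEGORICAL_FEATURES],
    drop_input_cols=True
)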

# Model: XGBoost Classifier
model = XGBClassifier(
    input_cols=[f"{c}_SCALED" for c in NUMERIC_FEATURES] + 
               [f"{c}_ENCODED" for c in CATEGORICAL_FEATURES],
    label_cols=[TARGET],
    output_cols=["PREDICTION"],
    n_estimators=100,
    max_depth=6,
    learning_rate=0.1,
    random_state=42
)

# Assemble pipeline
pipeline = Pipeline(steps=[
    ("scaler", scaler),
    ("encoder", encoder),
    ("classifier", model)
])

print("✓ ML Pipeline defined")

NameError: name 'StandardScaler' is not defined
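
For readers unfamiliar with the scaling step, the sketch below shows what standard scaling does to one numeric column. It assumes the scikit-learn convention of a z-score with the population standard deviation; the helper `standard_scale` and the sample ages are illustrative and not part of the pipeline.

```python
from statistics import fmean, pstdev


def standard_scale(values):
    """Z-score each value using the population standard deviation."""
    mean = fmean(values)
    std = pstdev(values)
    if std == 0:
        # A constant column carries no signal; map it to zeros
        return [0.0 for _ in values]
    return [(v - mean) / std for v in values]


ages = [50, 60, 70]  # made-up values
scaled_ages = standard_scale(ages)
print([round(x, 4) for x in scaled_ages])
```

Tree-based models such as XGBoost are largely insensitive to monotonic rescaling, so the scaler mainly matters if the pipeline is later reused with a different estimator.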

In [17]:
# Train the model
# All computation happens in Snowflake warehouse

print("Training model... (this runs on Snowflake compute)")
pipeline.fit(train_df)
print("✓ Model trained")

Training model... (this runs on Snowflake compute)


NameError: name 'pipeline' is not defined

In [18]:
# Evaluate on validation set
val_predictions = pipeline.predict(val_df)

# Calculate metrics in Snowflake: the snowflake.ml.modeling.metrics functions
# take keyword-only arguments and a Snowpark DataFrame, so no to_pandas() is needed
metric_args = {
    "df": val_predictions,
    "y_true_col_names": TARGET,
    "y_pred_col_names": "PREDICTION",
}

accuracy = accuracy_score(**metric_args)
precision = precision_score(**metric_args)
recall = recall_score(**metric_args)
f1 = f1_score(**metric_args)

print("=== Validation Metrics ===")
print(f"Accuracy:  {accuracy:.3f}")
print(f"Precision: {precision:.3f}")
print(f"Recall:    {recall:.3f}")
print(f"F1 Score:  {f1:.3f}")

# Confusion matrix (note the singular *_col_name parameters)
cm = confusion_matrix(
    df=val_predictions,
    y_true_col_name=TARGET,
    y_pred_col_name="PREDICTION"
)
print(f"\nConfusion Matrix:")
print(f"  TN={cm[0][0]}, FP={cm[0][1]}")
print(f"  FN={cm[1][0]}, TP={cm[1][1]}")

NameError: name 'pipeline' is not defined
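
The four validation metrics are all functions of the confusion matrix counts printed above. As a worked illustration (the counts below are invented, not results from this model), they can be derived like this:

```python
def binary_metrics(tn: int, fp: int, fn: int, tp: int) -> dict:
    """Derive accuracy, precision, recall and F1 from confusion matrix counts."""
    total = tn + fp + fn + tp
    accuracy = (tp + tn) / total if total else 0.0
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom else 0.0
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


# Invented counts for illustration only
example = binary_metrics(tn=50, fp=10, fn=5, tp=35)
for name, value in example.items():
    print(f"{name}: {value:.3f}")
```

For patient stratification, recall (responders correctly identified) and precision (predicted responders who actually respond) usually matter more than raw accuracy, especially when responders are a minority class.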

---
## Part 6: Snowflake Model Registry

Register the trained model in the **native Snowflake Model Registry**.

Capabilities:
- Version control with named versions (for example `V1`)
- Aliases for lifecycle stages (dev, staging, production)
- Metrics tracking
- Role-based access control
- Automatic lineage to training data

In [19]:
# Initialize Model Registry
registry = Registry(
    session=session,
    database_name="LIFEARC_POC",
    schema_name="ML_DEMO"
)

print("✓ Model Registry initialized")

NameError: name 'session' is not defined

In [20]:
# Log model to registry with metrics
model_version = registry.log_model(
    model=pipeline,
    model_name="CLINICAL_RESPONSE_PREDICTOR",
    version_name="V1",
    metrics={
        "accuracy": float(accuracy),
        "precision": float(precision),
        "recall": float(recall),
        "f1_score": float(f1),
        "training_rows": train_df.count(),
        "validation_rows": val_df.count()
    },
    comment="XGBoost classifier for predicting clinical trial response"
)

print(f"✓ Model logged: {model_version.model_name} version {model_version.version_name}")

NameError: name 'registry' is not defined

In [21]:
# Set alias for lifecycle management
# Aliases let downstream code resolve a stage name (such as 'production')
# instead of a hard-coded version; this version starts as 'development'

model_version.set_alias("development")
print("✓ Alias 'development' set")

# After validation, promote to production:
# model_version.set_alias("production")

NameError: name 'model_version' is not defined

In [22]:
# List all models in registry
print("=== Model Registry Contents ===")
registry.show_models()

=== Model Registry Contents ===


NameError: name 'registry' is not defined

In [23]:
# Get detailed model information
model = registry.get_model("CLINICAL_RESPONSE_PREDICTOR")
print(f"Model: {model.name}")
print(f"\nVersions:")
for version in model.versions():
    print(f"  - {version.version_name}")
    print(f"    Metrics: {version.get_metric('accuracy'):.3f} accuracy")
    print(f"    Aliases: {version.list_aliases()}")

NameError: name 'registry' is not defined

---
## Part 7: Model Inference

Run batch inference against the registered model, from Python and from SQL.

In [24]:
# Batch inference on test set
# Retrieve model from registry and run predictions

model_ref = registry.get_model("CLINICAL_RESPONSE_PREDICTOR").version("V1")

# Run inference
test_predictions = model_ref.run(test_df, function_name="predict")

print("=== Test Set Predictions ===")
test_predictions.select(
    "RESULT_ID", 
    "PATIENT_ID", 
    TARGET, 
    "PREDICTION"
).show(10)

NameError: name 'registry' is not defined

In [25]:
# Final test metrics
# Computed on the Snowpark DataFrame; the metrics functions are keyword-only
test_metric_args = {
    "df": test_predictions,
    "y_true_col_names": TARGET,
    "y_pred_col_names": "PREDICTION",
}

test_accuracy = accuracy_score(**test_metric_args)
test_precision = precision_score(**test_metric_args)
test_recall = recall_score(**test_metric_args)
test_f1 = f1_score(**test_metric_args)

print("=== Test Set Metrics (Holdout) ===")
print(f"Accuracy:  {test_accuracy:.3f}")
print(f"Precision: {test_precision:.3f}")
print(f"Recall:    {test_recall:.3f}")
print(f"F1 Score:  {test_f1:.3f}")

NameError: name 'test_predictions' is not defined

In [26]:
# SQL-based inference (for production use)
# This is how downstream systems would call the model

sql_inference_example = """
-- Call model directly from SQL
WITH patient_features AS (
    SELECT * FROM LIFEARC_POC.ML_FEATURE_STORE.PATIENT_CLINICAL_FEATURES$V1
),
trial_features AS (
    SELECT * FROM LIFEARC_POC.ML_FEATURE_STORE.TRIAL_AGGREGATE_FEATURES$V1
)
SELECT 
    p.PATIENT_ID,
    LIFEARC_POC.ML_DEMO.CLINICAL_RESPONSE_PREDICTOR!PREDICT(
        OBJECT_CONSTRUCT(*)
    ) AS PREDICTION
FROM patient_features p
JOIN trial_features t ON p.TRIAL_ID = t.TRIAL_ID
LIMIT 10;
"""

print("SQL Inference Pattern:")
print(sql_inference_example)

SQL Inference Pattern:

-- Call model directly from SQL
WITH patient_features AS (
    SELECT * FROM LIFEARC_POC.ML_FEATURE_STORE.PATIENT_CLINICAL_FEATURES$V1
),
trial_features AS (
    SELECT * FROM LIFEARC_POC.ML_FEATURE_STORE.TRIAL_AGGREGATE_FEATURES$V1
)
SELECT 
    p.PATIENT_ID,
    LIFEARC_POC.ML_DEMO.CLINICAL_RESPONSE_PREDICTOR!PREDICT(
        OBJECT_CONSTRUCT(*)
    ) AS PREDICTION
FROM patient_features p
JOIN trial_features t ON p.TRIAL_ID = t.TRIAL_ID
LIMIT 10;



---
## Part 8: Model Monitoring & Observability

Set up monitoring for:
- Prediction drift
- Feature distribution changes
- Performance degradation

In [27]:
# Create monitoring table for tracking predictions over time
session.sql("""
    CREATE TABLE IF NOT EXISTS LIFEARC_POC.ML_DEMO.MODEL_MONITORING (
        MONITORING_ID VARCHAR DEFAULT UUID_STRING(),
        MODEL_NAME VARCHAR,
        MODEL_VERSION VARCHAR,
        MONITORING_DATE DATE,
        TOTAL_PREDICTIONS INT,
        POSITIVE_PREDICTIONS INT,
        NEGATIVE_PREDICTIONS INT,
        POSITIVE_RATE FLOAT,
        AVG_CONFIDENCE FLOAT,
        FEATURE_STATS VARIANT,
        CREATED_AT TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
    )
""").collect()

print("✓ Monitoring table created")

NameError: name 'session' is not defined

In [28]:
# Log today's prediction statistics
prediction_stats = test_predictions.agg(
    F.count("*").alias("TOTAL"),
    F.sum(F.when(F.col("PREDICTION") == 1, 1).otherwise(0)).alias("POSITIVE"),
    F.sum(F.when(F.col("PREDICTION") == 0, 1).otherwise(0)).alias("NEGATIVE")
).collect()[0]

session.sql(f"""
    INSERT INTO LIFEARC_POC.ML_DEMO.MODEL_MONITORING 
    (MODEL_NAME, MODEL_VERSION, MONITORING_DATE, TOTAL_PREDICTIONS, 
     POSITIVE_PREDICTIONS, NEGATIVE_PREDICTIONS, POSITIVE_RATE)
    VALUES (
        'CLINICAL_RESPONSE_PREDICTOR',
        'V1',
        CURRENT_DATE(),
        {prediction_stats['TOTAL']},
        {prediction_stats['POSITIVE']},
        {prediction_stats['NEGATIVE']},
        {prediction_stats['POSITIVE'] / prediction_stats['TOTAL']:.4f}
    )
""").collect()

print("✓ Prediction statistics logged")

NameError: name 'test_predictions' is not defined

In [29]:
# Drift detection query
# Compare current predictions to historical baseline

drift_query = """
-- Detect prediction drift
WITH baseline AS (
    SELECT AVG(POSITIVE_RATE) AS baseline_rate
    FROM LIFEARC_POC.ML_DEMO.MODEL_MONITORING
    WHERE MODEL_NAME = 'CLINICAL_RESPONSE_PREDICTOR'
      AND MONITORING_DATE < CURRENT_DATE() - 7
),
current AS (
    SELECT AVG(POSITIVE_RATE) AS current_rate
    FROM LIFEARC_POC.ML_DEMO.MODEL_MONITORING
    WHERE MODEL_NAME = 'CLINICAL_RESPONSE_PREDICTOR'
      AND MONITORING_DATE >= CURRENT_DATE() - 7
)
SELECT 
    b.baseline_rate,
    c.current_rate,
    ABS(c.current_rate - b.baseline_rate) AS drift,
    CASE 
        WHEN ABS(c.current_rate - b.baseline_rate) > 0.1 THEN 'ALERT: Significant drift detected'
        WHEN ABS(c.current_rate - b.baseline_rate) > 0.05 THEN 'WARNING: Moderate drift'
        ELSE 'OK: Within normal range'
    END AS status
FROM baseline b, current c;
"""

print("Drift Detection Query:")
print(drift_query)

Drift Detection Query:

-- Detect prediction drift
WITH baseline AS (
    SELECT AVG(POSITIVE_RATE) AS baseline_rate
    FROM LIFEARC_POC.ML_DEMO.MODEL_MONITORING
    WHERE MODEL_NAME = 'CLINICAL_RESPONSE_PREDICTOR'
      AND MONITORING_DATE < CURRENT_DATE() - 7
),
current AS (
    SELECT AVG(POSITIVE_RATE) AS current_rate
    FROM LIFEARC_POC.ML_DEMO.MODEL_MONITORING
    WHERE MODEL_NAME = 'CLINICAL_RESPONSE_PREDICTOR'
      AND MONITORING_DATE >= CURRENT_DATE() - 7
)
SELECT 
    b.baseline_rate,
    c.current_rate,
    ABS(c.current_rate - b.baseline_rate) AS drift,
    CASE 
        WHEN ABS(c.current_rate - b.baseline_rate) > 0.1 THEN 'ALERT: Significant drift detected'
        WHEN ABS(c.current_rate - b.baseline_rate) > 0.05 THEN 'WARNING: Moderate drift'
        ELSE 'OK: Within normal range'
    END AS status
FROM baseline b, current c;
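
The thresholding logic in the drift query is easy to test in isolation. The sketch below mirrors the `CASE` expression in plain Python, with one deliberate departure: in the SQL, a missing baseline (no rows older than seven days) makes the comparisons `NULL`, so the query falls through to `'OK'`. The sketch reports that case separately instead. The function name, the default thresholds as parameters, and the `UNKNOWN` label are assumptions for illustration.

```python
from typing import Optional


def drift_status(
    baseline_rate: Optional[float],
    current_rate: Optional[float],
    warn: float = 0.05,
    alert: float = 0.10,
) -> str:
    """Classify the absolute change in positive-prediction rate."""
    if baseline_rate is None or current_rate is None:
        # Departure from the SQL, which would report 'OK' here
        return "UNKNOWN: Insufficient history"
    drift = abs(current_rate - baseline_rate)
    if drift > alert:
        return "ALERT: Significant drift detected"
    if drift > warn:
        return "WARNING: Moderate drift"
    return "OK: Within normal range"


print(drift_status(0.40, 0.55))
print(drift_status(0.40, 0.47))
print(drift_status(0.40, 0.42))
print(drift_status(None, 0.42))
```

An absolute threshold on the positive rate is a coarse signal; it flags shifts in the prediction mix but says nothing about which input features moved.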



In [30]:
# Create scheduled task for continuous monitoring
monitoring_task_sql = """
-- Create scheduled monitoring task
CREATE OR REPLACE TASK LIFEARC_POC.ML_DEMO.MONITOR_MODEL_PREDICTIONS
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = 'USING CRON 0 8 * * * UTC'  -- Daily at 8 AM UTC
AS
CALL LIFEARC_POC.ML_DEMO.LOG_PREDICTION_STATS();

-- To enable:
-- ALTER TASK LIFEARC_POC.ML_DEMO.MONITOR_MODEL_PREDICTIONS RESUME;
"""

print("Scheduled Monitoring Task:")
print(monitoring_task_sql)

Scheduled Monitoring Task:

-- Create scheduled monitoring task
CREATE OR REPLACE TASK LIFEARC_POC.ML_DEMO.MONITOR_MODEL_PREDICTIONS
    WAREHOUSE = COMPUTE_WH
    SCHEDULE = 'USING CRON 0 8 * * * UTC'  -- Daily at 8 AM UTC
AS
CALL LIFEARC_POC.ML_DEMO.LOG_PREDICTION_STATS();

-- To enable:
-- ALTER TASK LIFEARC_POC.ML_DEMO.MONITOR_MODEL_PREDICTIONS RESUME;



---
## Part 9: ML Lineage

Snowflake ML automatically tracks lineage from:
- Source data → Features → Training Dataset → Model

This is critical for:
- Regulatory compliance (21 CFR Part 11)
- Reproducibility
- Debugging production issues

In [31]:
# Query lineage information
lineage_query = """
-- View upstream lineage for the registered model version
SELECT *
FROM TABLE(SNOWFLAKE.CORE.GET_LINEAGE(
    'LIFEARC_POC.ML_DEMO.CLINICAL_RESPONSE_PREDICTOR',
    'MODEL',
    'UPSTREAM',
    5,
    'V1'
));
"""

print("Lineage Query:")
print(lineage_query)

# In Snowsight, you can also visualize lineage graphically
print("\nTip: View lineage graph in Snowsight > Data > ML_DEMO > Models > CLINICAL_RESPONSE_PREDICTOR")

Lineage Query:

-- View upstream lineage for the registered model version
SELECT *
FROM TABLE(SNOWFLAKE.CORE.GET_LINEAGE(
    'LIFEARC_POC.ML_DEMO.CLINICAL_RESPONSE_PREDICTOR',
    'MODEL',
    'UPSTREAM',
    5,
    'V1'
));


Tip: View lineage graph in Snowsight > Data > ML_DEMO > Models > CLINICAL_RESPONSE_PREDICTOR


---
## Summary: Complete ML Lifecycle in Snowflake

This notebook demonstrated end-to-end ML using **native Snowflake capabilities**:

| Stage | What We Built | Snowflake Feature |
|-------|---------------|-------------------|
| 1. Discovery | Explored clinical trial data | Snowpark DataFrames |
| 2. Features | Patient & Trial feature views | **Snowflake Feature Store** |
| 3. Training | XGBoost classification pipeline | Snowpark ML |
| 4. Registry | Versioned model with metrics | **Snowflake Model Registry** |
| 5. Inference | Batch predictions via Python and SQL | Model Registry |
| 6. Monitoring | Prediction-rate drift query and scheduled task | Monitoring table + Tasks |
| 7. Lineage | Source-to-model traceability | **ML Lineage** |

### Objects Created

```
LIFEARC_POC.ML_FEATURE_STORE/
├── PATIENT entity
├── TRIAL entity
├── PATIENT_CLINICAL_FEATURES (Feature View)
└── TRIAL_AGGREGATE_FEATURES (Feature View)

LIFEARC_POC.ML_DEMO/
├── CLINICAL_RESPONSE_PREDICTOR (Model)
│   └── V1 (Version, alias: development)
└── MODEL_MONITORING (Table)
```

### Why This Matters for Life Sciences

1. **Regulatory Compliance**: Full lineage from source to prediction
2. **Data Governance**: PHI never leaves Snowflake
3. **Reproducibility**: Feature Store ensures consistent features
4. **Auditability**: Model Registry tracks all versions
5. **Operationalization**: SQL-based inference for production systems

In [32]:
# Close session if running locally
# session.close()
print("\n✓ Notebook complete")


✓ Notebook complete
