# Summit Freight Capital: CarrierMatch AI platform

This notebook walks through a complete Snowflake data platform built for freight factoring intelligence. It covers workload isolation, real-time data ingestion, a unified broker data model, GPU-accelerated ML, production inference, natural language analytics, and source control integration.

---

## Architecture overview

```
┌─────────────────────┐     ┌──────────────────────────────────────────────────────┐     ┌─────────────┐
│   LEGACY SYSTEMS    │     │            SNOWFLAKE: FREIGHT_DEMO                   │     │ CARRIERMATCH│
│                     │     │                                                      │     │   APP       │
│  Oracle DB          │────▶│  RAW ──▶ BROKER_360 (Dynamic Table, 5-min refresh)  │     │             │
│  FTP / JSON Files   │     │   │      ┌──────────┴──────────┐                    │     │  GET_       │
│                     │     │   │      │ Semantic View        │                    │────▶│  RECOMMEND- │
│                     │     │   │      │ BROKER_360_SV        │                    │     │  ATION_     │
│                     │     │   │      └──────────┬──────────┘                    │     │  SCORE()    │
│                     │     │   │                  │                               │     └─────────────┘
│                     │     │   │      ┌───────────▼──────────┐                   │
│                     │     │   │      │ Cortex Agent          │ ◀── Natural language queries
│                     │     │   │      │ BROKER_AGENT          │                   │
│                     │     │   │      └──────────────────────┘                   │
│                     │     │   │                                                  │
│  Marketplace        │────▶│   Weather ──▶ BROKER_360                             │
│  (WeatherSource)    │     │                                                      │
│                     │     │  DS_SANDBOX (Zero-Copy Clone) ◀── ML workloads      │
│                     │     │  ML Schema ◀── PyTorch + GPU Pool + Model Registry  │
│                     │     │  Horizon: PII Tags + Masking + RBAC ◀── Governance  │
└─────────────────────┘     └──────────────────────────────────────────────────────┘
```

**See `architecture.md` for the full Mermaid diagram.**

## Phase 1: Workload isolation

Analytical queries running against production databases can cause contention and outages. Snowflake's architecture separates storage from compute, enabling:

1. **Zero-copy cloning** — instant, metadata-only copies for safe experimentation
2. **Warehouse isolation** — dedicated compute for different workloads with no resource contention

In [None]:
-- Setup: Use the Analytics warehouse
USE WAREHOUSE ANALYTICS_WH;
USE DATABASE FREIGHT_DEMO;
USE SCHEMA RAW;

In [None]:
-- Show the raw data tables (simulating Oracle replication)
SHOW TABLES IN SCHEMA RAW;

In [None]:
-- Quick data overview
SELECT 
    'carrier_profiles' AS table_name, COUNT(*) AS row_count FROM RAW.carrier_profiles
UNION ALL
SELECT 'broker_profiles', COUNT(*) FROM RAW.broker_profiles
UNION ALL
SELECT 'invoice_transactions', COUNT(*) FROM RAW.invoice_transactions
UNION ALL
SELECT 'load_postings', COUNT(*) FROM RAW.load_postings;

### Simulated real-time ingestion (Snowpipe Streaming concept)

In production, Snowpipe Streaming would ingest JSON from FTP drops in near real time. In this demo, a scheduled Task simulates that feed by inserting new invoice JSON every 60 seconds.

Pipeline: **JSON (VARIANT) → Stream → Flattened View → Dynamic Table**
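
The flattening step of this pipeline can be pictured with a small Python sketch. It is illustrative only: the JSON field names (`invoice_id`, `broker`, `amount`, `invoice_date`) and the output column names are assumptions, since the actual payload layout and the definition of `INVOICE_TRANSACTIONS_FLATTENED` are not shown in this notebook.

```python
import json


def flatten_invoice(raw_json: str, ingested_at: str) -> dict:
    """Flatten one raw invoice document into a single row of typed columns."""
    doc = json.loads(raw_json)
    broker = doc.get("broker", {})  # nested object; hypothetical shape
    amount = doc.get("amount")
    return {
        "INVOICE_ID": doc.get("invoice_id"),
        "BROKER_ID": broker.get("id"),
        # Cast the amount the same way a view would cast a VARIANT field
        "INVOICE_AMOUNT": float(amount) if amount is not None else None,
        "INVOICE_DATE": doc.get("invoice_date"),
        "INGESTED_AT": ingested_at,
    }


# Hypothetical sample record, standing in for one row of RAW_DATA
sample = '{"invoice_id": "INV-1001", "broker": {"id": 42}, "amount": "1850.50", "invoice_date": "2024-05-01"}'
row = flatten_invoice(sample, "2024-05-01T12:00:00Z")
print(row)
```

In Snowflake the same projection would normally be expressed in SQL over the VARIANT column; the Python version only shows the shape of the transformation.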

In [None]:
-- JSON staging table (simulating Snowpipe Streaming landing zone)
-- New rows appear every 60 seconds from the scheduled Task
SELECT COUNT(*) AS total_json_records, MAX(INGESTED_AT) AS latest_ingestion
FROM RAW.INVOICE_TRANSACTIONS_JSON;

-- Peek at raw JSON format (what arrives from FTP)
SELECT RAW_DATA, INGESTED_AT FROM RAW.INVOICE_TRANSACTIONS_JSON ORDER BY INGESTED_AT DESC LIMIT 3;

In [None]:
-- Flattened view: JSON -> structured columns automatically
SELECT * FROM RAW.INVOICE_TRANSACTIONS_FLATTENED ORDER BY INGESTED_AT DESC LIMIT 5;

-- Verify Stream is tracking changes
SHOW STREAMS LIKE 'INVOICE_JSON_STREAM' IN SCHEMA RAW;

### Zero-copy clone

Cloning is a metadata-only operation: it completes almost instantly and consumes no additional storage until data in the clone diverges from its source. Data science teams can run heavy queries on `DS_SANDBOX` without affecting the production `RAW` schema.
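
Why a clone costs no storage up front can be shown with a toy model. The sketch below is a simplification, not Snowflake's implementation: it assumes a table is just a list of references to immutable partitions, so a clone copies the references and new writes add partitions instead of changing shared ones. The `Table` class and `store` list are hypothetical names.

```python
class Table:
    """A table is an ordered list of references to immutable partitions."""

    def __init__(self, partitions):
        self.partitions = list(partitions)

    def clone(self):
        # Copy only the list of references (metadata); no partition data is duplicated.
        return Table(self.partitions)

    def append(self, store, rows):
        # Writes never modify existing partitions; they add a new one.
        store.append(tuple(rows))
        self.partitions.append(len(store) - 1)


store = [("inv-1", "inv-2"), ("inv-3",)]  # physical storage: two partitions
raw = Table([0, 1])

sandbox = raw.clone()
partitions_after_clone = len(store)  # still 2: the clone added no data

sandbox.append(store, ["scratch-row"])  # only now does storage grow
print(partitions_after_clone, len(store), raw.partitions, sandbox.partitions)
```

After the write, the sandbox references one extra partition while the source table is unchanged, which is the property that makes experimentation on the clone safe.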

In [None]:
-- Verify the clone exists (created instantly, zero bytes copied)
SHOW SCHEMAS LIKE 'DS_SANDBOX' IN DATABASE FREIGHT_DEMO;

In [None]:
-- Run a HEAVY aggregation on the SANDBOX using a DIFFERENT warehouse
-- This proves workload isolation - this query uses DS_SANDBOX_WH, not ANALYTICS_WH
USE WAREHOUSE DS_SANDBOX_WH;

SELECT 
    broker_id,
    COUNT(*) AS invoice_count,
    SUM(invoice_amount) AS total_factored,
    AVG(DATEDIFF('day', invoice_date, payment_received_date)) AS avg_payment_days,
    STDDEV(invoice_amount) AS amount_stddev
FROM DS_SANDBOX.invoice_transactions
GROUP BY broker_id
ORDER BY total_factored DESC
LIMIT 10;

In [None]:
-- Verify warehouse isolation - check which warehouse ran each query
SELECT 
    query_text,
    warehouse_name,
    warehouse_size,
    execution_time / 1000 AS execution_seconds
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY())
WHERE query_text ILIKE '%invoice_transactions%'
ORDER BY start_time DESC
LIMIT 5;

### Governance: PII masking with Snowflake Horizon

PII fields (driver SSN, bank account numbers) are automatically masked based on role. Analysts see `***-**-1234` while authorized roles see the full value. No application-level code is required — masking is enforced at the platform level.
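
The masking rule described above amounts to a role check followed by partial redaction. The following is a minimal sketch of that logic in Python; the set of authorized roles is an assumption made for illustration, and the real rule lives in the Snowflake masking policy, not in application code.

```python
# Assumption: these roles may see unmasked values; adjust to the actual policy.
AUTHORIZED_ROLES = {"ACCOUNTADMIN", "FREIGHT_OPS"}


def mask_ssn(ssn: str, current_role: str) -> str:
    """Return the full SSN for authorized roles, otherwise only the last four digits."""
    if current_role.upper() in AUTHORIZED_ROLES:
        return ssn
    return "***-**-" + ssn[-4:]


print(mask_ssn("123-45-1234", "FREIGHT_ANALYST"))
print(mask_ssn("123-45-1234", "ACCOUNTADMIN"))
```

Because the policy is attached to the column, every query path gets the same treatment without each client having to reimplement this check.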

In [None]:
-- As ACCOUNTADMIN (IT Ops equivalent): See full PII
SELECT CARRIER_NAME, DRIVER_SSN, BANK_ACCOUNT_NUMBER 
FROM RAW.CARRIER_PROFILES LIMIT 5;

-- Show masking policies in place
SHOW MASKING POLICIES IN DATABASE FREIGHT_DEMO;

-- Show PII tags applied to the table's columns
SELECT * FROM TABLE(FREIGHT_DEMO.INFORMATION_SCHEMA.TAG_REFERENCES_ALL_COLUMNS('FREIGHT_DEMO.RAW.CARRIER_PROFILES', 'TABLE'));

---

## Phase 2: Unified broker data model (single source of truth)

Multiple systems often hold conflicting versions of the same broker data. A Dynamic Table (`broker_360`) solves this by automatically refreshing and combining:
- Invoice and payment history (replicated from Oracle)
- Broker credit scores
- Double-brokering fraud signals
- Real-time weather risk (Snowflake Marketplace)
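
Conceptually, the Dynamic Table collapses several sources into one row per broker. The sketch below shows that idea in plain Python under simplifying assumptions: the input shapes, the `build_broker_360` helper, and the sample values are all hypothetical, and weather risk is left out for brevity. The actual `broker_360` definition is a SQL query that is not reproduced in this notebook.

```python
def build_broker_360(invoices, credit_scores, flagged_brokers):
    """Combine invoice history, credit scores and fraud flags into one row per broker."""
    rows = {}
    for inv in invoices:
        row = rows.setdefault(
            inv["broker_id"],
            {"broker_id": inv["broker_id"], "total_invoices": 0, "total_factored_amount": 0.0},
        )
        row["total_invoices"] += 1
        row["total_factored_amount"] += inv["invoice_amount"]

    for broker_id, row in rows.items():
        row["credit_score"] = credit_scores.get(broker_id)  # None if no score on file
        row["double_broker_flag"] = broker_id in flagged_brokers

    return [rows[broker_id] for broker_id in sorted(rows)]


# Hypothetical sample inputs
invoices = [
    {"broker_id": 1, "invoice_amount": 1000.0},
    {"broker_id": 1, "invoice_amount": 500.0},
    {"broker_id": 2, "invoice_amount": 750.0},
]
broker_360 = build_broker_360(invoices, {1: 710, 2: 580}, {2})
for row in broker_360:
    print(row)
```

A Dynamic Table adds what this sketch lacks: Snowflake re-runs the defining query on a schedule, so the combined rows stay current without a hand-written refresh job.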

In [None]:
USE WAREHOUSE ANALYTICS_WH;

-- Show the Dynamic Table definition
SHOW DYNAMIC TABLES LIKE 'BROKER_360' IN SCHEMA ANALYTICS;

In [None]:
-- The Golden Record: One row per broker with ALL metrics unified
SELECT 
    broker_id,
    broker_name,
    mc_number,
    credit_score,
    avg_days_to_pay,
    fraud_risk_level,
    composite_risk_score,
    current_weather_risk,
    total_factored_amount,
    last_refreshed
FROM ANALYTICS.broker_360
ORDER BY composite_risk_score DESC
LIMIT 10;

In [None]:
-- High-risk brokers in Texas affected by weather
SELECT 
    broker_name,
    hq_state,
    credit_score,
    fraud_risk_level,
    current_weather_risk,
    composite_risk_score,
    double_broker_flag AS potential_fraud
FROM ANALYTICS.broker_360
WHERE hq_state = 'TX'
  AND (fraud_risk_level IN ('HIGH', 'CRITICAL') OR current_weather_risk = 'HIGH')
ORDER BY composite_risk_score DESC;

In [None]:
-- Visualize risk distribution
SELECT 
    fraud_risk_level,
    COUNT(*) AS broker_count,
    ROUND(AVG(credit_score), 0) AS avg_credit_score,
    ROUND(AVG(avg_days_to_pay), 1) AS avg_payment_days,
    SUM(total_factored_amount) AS total_exposure
FROM ANALYTICS.broker_360
GROUP BY fraud_risk_level
ORDER BY 
    CASE fraud_risk_level 
        WHEN 'CRITICAL' THEN 1 
        WHEN 'HIGH' THEN 2 
        WHEN 'MEDIUM' THEN 3 
        ELSE 4 
    END;

---

## Phase 3: Data science workbench

Managing separate container infrastructure (e.g., ECS) for ML workloads adds operational overhead. Snowflake Notebooks with Container Runtime and native GPU Compute Pools provide a fully managed alternative.

### Train a late payment risk classifier with PyTorch
Using `torch` and `snowflake-ml-python` — no Docker images, no container orchestration, no infrastructure tickets.
The GPU compute pool (`GPU_POOL`) is selected from a dropdown in the Notebook UI.
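
One detail of the training cells is worth isolating: features are standardized with statistics computed on the training data, and the same statistics must be reused at inference time. The sketch below illustrates this with the standard library only; `fit_scaler` and `transform` are hypothetical helper names, and the two-column sample data is made up.

```python
from statistics import mean, pstdev


def fit_scaler(rows):
    """Compute per-column mean and population standard deviation from training rows."""
    columns = list(zip(*rows))
    means = [mean(col) for col in columns]
    stds = [pstdev(col) + 1e-8 for col in columns]  # epsilon avoids division by zero
    return means, stds


def transform(row, means, stds):
    """Standardize one row using statistics fitted on the training data."""
    return [(value - m) / s for value, m, s in zip(row, means, stds)]


# Made-up training rows: (credit score, average days to pay)
train = [[600.0, 30.0], [700.0, 40.0], [800.0, 50.0]]
means, stds = fit_scaler(train)

# At inference time, reuse the stored means/stds rather than refitting on new data
scaled = transform([700.0, 40.0], means, stds)
print(scaled)
```

This is why the checkpoint saved later in the notebook stores `X_mean` and `X_std` next to the model weights: a model fed unscaled features would see inputs far outside the range it was trained on.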

In [None]:
# Import required libraries
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd

import snowflake.snowpark as snowpark
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark import functions as F
from snowflake.ml.registry import Registry

# Check GPU availability (Container Runtime with GPU_POOL)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"PyTorch {torch.__version__} | Device: {device}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
else:
    print("No GPU detected -- select GPU_POOL in Notebook settings for GPU acceleration")

In [None]:
# Get active Snowpark session and prepare training data
session = get_active_session()
print(f"Connected as: {session.get_current_user()}")
print(f"Warehouse: {session.get_current_warehouse()}")

# Pull training data from broker_360
training_sp = session.table("FREIGHT_DEMO.ANALYTICS.BROKER_360").select(
    F.col("CREDIT_SCORE"),
    F.col("AVG_DAYS_TO_PAY"),
    F.col("TOTAL_INVOICES"),
    F.col("AVG_INVOICE_AMOUNT"),
    F.col("DISPUTED_INVOICES"),
    F.col("UNIQUE_LANES"),
    F.col("COMPOSITE_RISK_SCORE"),
    # Target: Is this a risky broker?
    F.when(F.col("FRAUD_RISK_LEVEL").isin(["HIGH", "CRITICAL"]), 1).otherwise(0).alias("IS_HIGH_RISK")
)

# Convert to pandas for PyTorch
df = training_sp.to_pandas()
print(f"Training samples: {len(df)}")
print(f"High-risk brokers: {df['IS_HIGH_RISK'].sum()} ({df['IS_HIGH_RISK'].mean()*100:.1f}%)")
df.head(5)

In [None]:
# Define PyTorch model: 2-layer neural network for broker risk classification
FEATURE_COLS = ["CREDIT_SCORE", "AVG_DAYS_TO_PAY", "TOTAL_INVOICES", 
                "AVG_INVOICE_AMOUNT", "DISPUTED_INVOICES", "UNIQUE_LANES", "COMPOSITE_RISK_SCORE"]
TARGET_COL = "IS_HIGH_RISK"

# Prepare tensors
X = df[FEATURE_COLS].values.astype(np.float32)
y = df[TARGET_COL].values.astype(np.float32).reshape(-1, 1)

# Normalize features
X_mean, X_std = X.mean(axis=0), X.std(axis=0) + 1e-8
X_norm = (X - X_mean) / X_std

X_tensor = torch.tensor(X_norm, dtype=torch.float32).to(device)
y_tensor = torch.tensor(y, dtype=torch.float32).to(device)

class BrokerRiskNet(nn.Module):
    """2-layer neural network for predicting high-risk brokers."""
    def __init__(self, input_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 32),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(32, 16),
            nn.ReLU(),
            nn.Linear(16, 1),
            nn.Sigmoid()
        )
    def forward(self, x):
        return self.net(x)

model = BrokerRiskNet(len(FEATURE_COLS)).to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train
print(f"Training BrokerRiskNet on {device}...")
for epoch in range(200):
    model.train()
    optimizer.zero_grad()
    outputs = model(X_tensor)
    loss = criterion(outputs, y_tensor)
    loss.backward()
    optimizer.step()
    if (epoch + 1) % 50 == 0:
        preds = (outputs > 0.5).float()
        acc = (preds == y_tensor).float().mean()
        print(f"  Epoch {epoch+1}/200 | Loss: {loss.item():.4f} | Accuracy: {acc.item()*100:.1f}%")

print("Model training complete -- no ECS, no Docker, no infrastructure tickets.")

In [None]:
# Evaluate and show predictions
model.eval()
with torch.no_grad():
    predictions = model(X_tensor).cpu().numpy()

df['RISK_PROBABILITY'] = predictions.flatten()
df['PREDICTED_HIGH_RISK'] = (df['RISK_PROBABILITY'] > 0.5).astype(int)

# Accuracy summary (measured on the training data; this demo has no holdout split)
correct = (df['PREDICTED_HIGH_RISK'] == df['IS_HIGH_RISK']).sum()
print(f"Training accuracy: {correct}/{len(df)} ({correct/len(df)*100:.1f}%)")
print("\nTop 10 riskiest brokers by model probability:")
df.nlargest(10, 'RISK_PROBABILITY')[['CREDIT_SCORE', 'AVG_DAYS_TO_PAY', 'DISPUTED_INVOICES', 
                                      'COMPOSITE_RISK_SCORE', 'IS_HIGH_RISK', 'RISK_PROBABILITY']]

In [None]:
# Register PyTorch model to Snowflake Model Registry
# Save model artifacts for registry
import tempfile, os

model_dir = tempfile.mkdtemp()
model_path = os.path.join(model_dir, "broker_risk_net.pt")
torch.save({
    'model_state_dict': model.state_dict(),
    'feature_cols': FEATURE_COLS,
    'X_mean': X_mean.tolist(),
    'X_std': X_std.tolist(),
    'input_dim': len(FEATURE_COLS)
}, model_path)

registry = Registry(session=session, database_name="FREIGHT_DEMO", schema_name="ML")

# Log the model with sample input
from snowflake.ml.model import custom_model

class BrokerRiskModel(custom_model.CustomModel):
    """Wrapper that packages feature normalization together with the PyTorch network.

    Expects a ModelContext whose "checkpoint" artifact is the file saved above.
    """

    def __init__(self, context: custom_model.ModelContext) -> None:
        super().__init__(context)
        ckpt = torch.load(self.context.path("checkpoint"), map_location="cpu")
        self.feature_cols = ckpt["feature_cols"]
        self.x_mean = np.array(ckpt["X_mean"], dtype=np.float32)
        self.x_std = np.array(ckpt["X_std"], dtype=np.float32)
        self.net = BrokerRiskNet(ckpt["input_dim"])
        self.net.load_state_dict(ckpt["model_state_dict"])
        self.net.eval()

    @custom_model.inference_api
    def predict(self, input_df: pd.DataFrame) -> pd.DataFrame:
        features = input_df[self.feature_cols].values.astype(np.float32)
        tensor = torch.tensor((features - self.x_mean) / self.x_std, dtype=torch.float32)
        with torch.no_grad():
            probs = self.net(tensor).numpy().flatten()
        return pd.DataFrame({"RISK_PROBABILITY": probs})

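# For the demo, register the bare network using the simpler log_model approach.
# Normalization is not packaged with it, so callers must scale inputs with X_mean / X_std.
model_ref = registry.log_model(
    model=model.cpu().eval(),  # log from CPU in inference mode
    model_name="BROKER_RISK_CLASSIFIER",
    version_name="V1_0_EXPERIMENTAL",  # unquoted SQL identifiers cannot contain dots or hyphens
    comment="PyTorch neural network for predicting high-risk brokers. Trained on broker_360 features.",
    sample_input_data=df[FEATURE_COLS].head(10),
    conda_dependencies=["pytorch"]
)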
print(f"Model registered: {model_ref.model_name} version {model_ref.version_name}")

In [None]:
# Show registered models in the ML schema
registry.show_models()

---

## Phase 4: Production inference (CarrierMatch app integration)

**Goal:** Deploy the model as a SQL-callable UDF for sub-second inference in the CarrierMatch app.

The `GET_RECOMMENDATION_SCORE(driver_id, load_id)` function is deployed from the Model Registry and returns a 0.0-1.0 match score. High-risk brokers automatically return 0.0.
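
The rule that high-risk brokers always score 0.0 can be read as a gate in front of the model output. The sketch below shows one way such a gate might look; it is not the UDF's actual body, which is not shown in this notebook, and `recommendation_score` and `BLOCKED_RISK_LEVELS` are hypothetical names.

```python
# Assumption: these fraud risk levels force the score to zero.
BLOCKED_RISK_LEVELS = {"HIGH", "CRITICAL"}


def recommendation_score(model_score: float, fraud_risk_level: str) -> float:
    """Return a 0.0-1.0 match score, forcing 0.0 for high-risk brokers."""
    if fraud_risk_level.upper() in BLOCKED_RISK_LEVELS:
        return 0.0
    # Clamp so callers can rely on the documented 0.0-1.0 range
    return min(1.0, max(0.0, model_score))


print(recommendation_score(0.83, "LOW"))
print(recommendation_score(0.83, "CRITICAL"))
```

Putting the gate inside the function means the app's `> 0.50` filter removes risky brokers without any extra join or condition on the caller's side.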

In [None]:
-- Verify the UDF is deployed
SHOW USER FUNCTIONS LIKE 'GET_RECOMMENDATION_SCORE' IN SCHEMA FREIGHT_DEMO.ML;

In [None]:
-- Simulating the CarrierMatch Mobile App: requesting loads for a driver
-- This is the EXACT pattern the app would call via the Snowflake REST API
SELECT 
    l.load_id, 
    l.origin_city || ', ' || l.origin_state AS origin,
    l.destination_city || ', ' || l.destination_state AS destination,
    l.total_rate,
    l.equipment_required,
    b.broker_name,
    b.fraud_risk_level,
    FREIGHT_DEMO.ML.GET_RECOMMENDATION_SCORE(1, SPLIT_PART(l.load_id, '-', 2)::INT) AS match_score
FROM FREIGHT_DEMO.RAW.LOAD_POSTINGS l
JOIN FREIGHT_DEMO.ANALYTICS.BROKER_360 b ON l.BROKER_ID = b.BROKER_ID
WHERE l.status = 'AVAILABLE'
  AND FREIGHT_DEMO.ML.GET_RECOMMENDATION_SCORE(1, SPLIT_PART(l.load_id, '-', 2)::INT) > 0.50
ORDER BY match_score DESC
LIMIT 15;

In [None]:
import time

# Measure UDF latency - targeting sub-500ms for production CarrierMatch API
# perf_counter() is monotonic, so it is better suited to timing than time.time()
start = time.perf_counter()
result = session.sql("""
    SELECT FREIGHT_DEMO.ML.GET_RECOMMENDATION_SCORE(1, 1) AS score
""").collect()
elapsed_ms = (time.perf_counter() - start) * 1000

print(f"UDF Result: {result[0]['SCORE']}")
print(f"Latency:    {elapsed_ms:.0f}ms")
print()
if elapsed_ms < 500:
    print(f"SUB-500ms TARGET MET  ({elapsed_ms:.0f}ms < 500ms)")
else:
    print(f"Above 500ms target ({elapsed_ms:.0f}ms) -- first call may be cold start, re-run to verify")

# Also check via query history (exclude this history query, whose own text matches the filter)
history = session.sql("""
    SELECT QUERY_ID, TOTAL_ELAPSED_TIME, ROWS_PRODUCED
    FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY_BY_SESSION(RESULT_LIMIT => 10))
    WHERE QUERY_TEXT ILIKE '%GET_RECOMMENDATION_SCORE%'
      AND QUERY_TEXT NOT ILIKE '%QUERY_HISTORY_BY_SESSION%'
    ORDER BY START_TIME DESC LIMIT 1
""").collect()
if history:
    print(f"\nSnowflake-reported execution time: {history[0]['TOTAL_ELAPSED_TIME']}ms")
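
A single timed call is sensitive to cold starts, as the message above notes. A more robust measurement, sketched below, discards a warm-up call and reports the median of several runs. `measure_latency_ms` is a hypothetical helper, and the lambda is a stand-in workload; in the notebook it would wrap the `session.sql(...).collect()` call instead.

```python
import time
from statistics import median


def measure_latency_ms(call, warmup=1, runs=5):
    """Time `call` several times after a warm-up; return (median_ms, worst_ms)."""
    for _ in range(warmup):
        call()  # absorb cold-start cost without recording it
    samples = []
    for _ in range(runs):
        start = time.perf_counter()
        call()
        samples.append((time.perf_counter() - start) * 1000)
    return median(samples), max(samples)


# Stand-in workload so the sketch runs anywhere
median_ms, worst_ms = measure_latency_ms(lambda: sum(range(10_000)))
print(f"median={median_ms:.3f}ms worst={worst_ms:.3f}ms")
```

Reporting both the median and the worst case separates typical latency from outliers, which matters when the target is a hard per-request budget.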

---

## Phase 5: Natural language analytics with Cortex

Executives and business users can ask questions in plain English and get trusted answers without writing SQL or filing tickets.

Configured components:
1. **Semantic View** on `broker_360` for structured queries
2. **Cortex Agent** combining Semantic View + Search capabilities

### Cortex Agent: natural language broker intelligence

The `BROKER_AGENT` allows users to ask questions in plain English.
No SQL knowledge required — the agent automatically:
1. Interprets the question
2. Generates SQL using the Semantic View  
3. Returns business-friendly answers

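**Note:** Cortex Agents are invoked via the REST API or the Snowsight UI. The cells below show two ways to call the REST API: a direct HTTPS request, and the `_snowflake` helper module available inside Snowflake Notebooks.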
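
Both invocation styles send the same kind of request: a path that names the agent and a JSON body carrying the conversation. The sketch below builds those two pieces without making a network call. The helper names are hypothetical, and the message shape (content as a list of typed text blocks) reflects one reading of the Cortex Agents API that should be checked against the current Snowflake documentation.

```python
import json


def agent_run_path(database: str, schema: str, agent: str) -> str:
    """Build the REST path for running a named agent."""
    return f"/api/v2/databases/{database}/schemas/{schema}/agents/{agent}:run"


def build_agent_payload(question: str, stream: bool = False) -> dict:
    """Wrap a plain-English question in a single user message."""
    return {
        "stream": stream,
        "messages": [
            # Assumption: content is a list of typed blocks rather than a bare string
            {"role": "user", "content": [{"type": "text", "text": question}]}
        ],
    }


path = agent_run_path("FREIGHT_DEMO", "ANALYTICS", "BROKER_AGENT")
payload = build_agent_payload("Show me the top 5 riskiest brokers")
print(path)
print(json.dumps(payload, indent=2))
```

Keeping the path and payload builders separate from the transport makes it easy to reuse them for either the HTTPS call or the in-notebook helper.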

In [None]:
-- Verify the Agent is deployed
DESCRIBE AGENT FREIGHT_DEMO.ANALYTICS.BROKER_AGENT;

In [None]:
# Cortex Agent invocation via Python REST API
import requests
import json

session = get_active_session()

def ask_broker_agent(question: str) -> str:
    """Query the Broker Intelligence Agent"""
    
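    # Get connection details from the session's underlying connector
    conn = session.connection

    # Agent REST API endpoint
    url = f"https://{conn.host}/api/v2/databases/FREIGHT_DEMO/schemas/ANALYTICS/agents/BROKER_AGENT:run"

    headers = {
        "Content-Type": "application/json",
        # The session token lives on the connector's REST client
        "Authorization": f"Snowflake Token=\"{conn.rest.token}\"",
        "Accept": "application/json"
    }

    payload = {
        "stream": False,
        "messages": [
            # Message content is a list of typed blocks, not a bare string
            {"role": "user", "content": [{"type": "text", "text": question}]}
        ]
    }

    response = requests.post(url, headers=headers, json=payload, timeout=60)
    response.raise_for_status()  # surface HTTP errors instead of parsing an error body
    result = response.json()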
    
    # Extract the agent's response
    if "messages" in result:
        for msg in result["messages"]:
            if msg.get("role") == "assistant":
                return msg.get("content", "No response")
    return str(result)

# Alternative: Use Snowflake's _snowflake module (available in Snowflake Notebooks)
try:
    import _snowflake
    def ask_broker_agent_native(question: str) -> dict:
        """Query agent using native Snowflake API (Snowflake Notebooks only)"""
        return _snowflake.send_snow_api_request(
            "POST",
            "/api/v2/databases/FREIGHT_DEMO/schemas/ANALYTICS/agents/BROKER_AGENT:run",
            {},  # headers
            {},  # params
            {"stream": False, "messages": [{"role": "user", "content": [{"type": "text", "text": question}]}]},  # body
            None,  # request_guid
            60000  # timeout_ms
        )
    print("Native Snowflake API available")
except ImportError:
    print("Running outside Snowflake - using REST API method")

In [None]:
# Question 1: "Show me our riskiest brokers"
print("Asking: 'Show me the top 5 riskiest brokers and their total exposure'\n")

try:
    response = ask_broker_agent_native("Show me the top 5 riskiest brokers and their total exposure")
    print(response)
except Exception:
    # Fallback - query the BROKER_360 table directly with SQL
    result = session.sql("""
        SELECT broker_name, composite_risk_score, fraud_risk_level, total_factored_amount
        FROM FREIGHT_DEMO.ANALYTICS.BROKER_360 
        ORDER BY composite_risk_score DESC 
        LIMIT 5
    """).collect()
    print("Top 5 Riskiest Brokers:")
    for row in result:
        print(f"  - {row['BROKER_NAME']}: Risk Score {row['COMPOSITE_RISK_SCORE']}, {row['FRAUD_RISK_LEVEL']} risk, ${row['TOTAL_FACTORED_AMOUNT']:,.2f} exposure")

In [None]:
# Question 2: "Any double brokering concerns?"
print("Asking: 'Which brokers have double brokering flags?'\n")

try:
    response = ask_broker_agent_native("Which brokers have double brokering flags? What is our total exposure to potential fraud?")
    print(response)
except Exception:
    result = session.sql("""
        SELECT broker_name, double_broker_flag, total_factored_amount, fraud_risk_level
        FROM FREIGHT_DEMO.ANALYTICS.BROKER_360 
        WHERE double_broker_flag = TRUE
        ORDER BY total_factored_amount DESC
    """).collect()
    
    total_exposure = sum(row['TOTAL_FACTORED_AMOUNT'] for row in result)
    print(f"Found {len(result)} brokers with double-brokering flags")
    print(f"Total exposure to potential fraud: ${total_exposure:,.2f}\n")
    print("Flagged brokers:")
    for row in result[:5]:
        print(f"  - {row['BROKER_NAME']}: ${row['TOTAL_FACTORED_AMOUNT']:,.2f} ({row['FRAUD_RISK_LEVEL']} risk)")

In [None]:
# Question 3: "Texas weather impact"
print("Asking: 'How many Texas brokers affected by severe weather?'\n")

try:
    response = ask_broker_agent_native("How many of our Texas brokers are currently affected by severe weather? What is their combined factoring exposure?")
    print(response)
except Exception:
    result = session.sql("""
        SELECT broker_name, hq_state, current_weather_risk, total_factored_amount
        FROM FREIGHT_DEMO.ANALYTICS.BROKER_360 
        WHERE hq_state = 'TX' AND current_weather_risk IN ('HIGH', 'SEVERE')
        ORDER BY total_factored_amount DESC
    """).collect()
    
    total_exposure = sum(row['TOTAL_FACTORED_AMOUNT'] for row in result)
    print(f"Texas brokers with severe weather risk: {len(result)}")
    print(f"Combined exposure: ${total_exposure:,.2f}\n")
    if result:
        print("Affected brokers:")
        for row in result[:5]:
            print(f"  - {row['BROKER_NAME']}: {row['CURRENT_WEATHER_RISK']} weather, ${row['TOTAL_FACTORED_AMOUNT']:,.2f}")

In [None]:
-- Cortex-powered broker intelligence: Summarize a high-risk broker
WITH high_risk_broker AS (
    SELECT * FROM ANALYTICS.broker_360 
    WHERE fraud_risk_level = 'CRITICAL' 
    ORDER BY composite_risk_score DESC  -- make the single-row pick deterministic
    LIMIT 1
)
SELECT 
    broker_name,
    composite_risk_score,
    SNOWFLAKE.CORTEX.COMPLETE(
        'mistral-large',
        'Based on this broker profile, provide a 3-sentence risk assessment for the credit team:\n' ||
        'Broker: ' || broker_name || '\n' ||
        'Credit Score: ' || credit_score || '\n' ||
        'Avg Days to Pay: ' || avg_days_to_pay || '\n' ||
        'Double Brokering Flag: ' || double_broker_flag || '\n' ||
        'Disputed Invoices: ' || disputed_invoices || '\n' ||
        'Total Exposure: $' || total_factored_amount
    ) AS ai_risk_assessment
FROM high_risk_broker;

In [None]:
# Broker Risk Dashboard - Visual output for agent queries
import pandas as pd

# Query top risk brokers (same data the agent analyzes)
risk_df = session.sql("""
    SELECT 
        BROKER_NAME,
        COMPOSITE_RISK_SCORE,
        CREDIT_SCORE,
        FRAUD_RISK_LEVEL,
        TOTAL_FACTORED_AMOUNT,
        AVG_DAYS_TO_PAY
    FROM FREIGHT_DEMO.ANALYTICS.BROKER_360
    ORDER BY COMPOSITE_RISK_SCORE DESC
    LIMIT 15
""").to_pandas()

# Display formatted risk dashboard
print("=" * 78)
print("  BROKER RISK DASHBOARD -- Top 15 by Composite Risk Score  ".center(78))
print("=" * 78)
print(f"{'':2} {'Broker':<25} {'Risk':>6} {'Credit':>7} {'Fraud':>10} {'Factored':>12} {'DTP':>6}")
print("-" * 78)
for _, row in risk_df.iterrows():
    indicator = "!!" if row['COMPOSITE_RISK_SCORE'] > 70 else "--" if row['COMPOSITE_RISK_SCORE'] > 40 else "  "
    factored = f"${row['TOTAL_FACTORED_AMOUNT']:,.0f}"
    print(f"{indicator} {row['BROKER_NAME']:<25} {row['COMPOSITE_RISK_SCORE']:>5.0f} {row['CREDIT_SCORE']:>7.0f} {row['FRAUD_RISK_LEVEL']:>10} {factored:>12} {row['AVG_DAYS_TO_PAY']:>5.1f}d")
print("-" * 78)
high_risk = len(risk_df[risk_df['COMPOSITE_RISK_SCORE'] > 70])
print(f"Portfolio Avg Risk: {risk_df['COMPOSITE_RISK_SCORE'].mean():.1f} | "
      f"Critical (>70): {high_risk} | "
      f"Avg Days to Pay: {risk_df['AVG_DAYS_TO_PAY'].mean():.1f}d")
print(f"Total Exposure: ${risk_df['TOTAL_FACTORED_AMOUNT'].sum():,.0f}")
print()
print("Legend: !! = Critical risk (>70)  -- = Elevated risk (40-70)")
print("This is the SAME data the Cortex Agent queries via natural language.")

---

## Phase 6: Source control integration (Git Repository in Snowflake)

All artifacts in this platform are version-controlled and accessible directly from Snowflake — no external CI/CD tooling required.

Every SQL script, notebook, and config file lives in a GitHub repo, and Snowflake reads it natively via a **Git Repository** object.
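
Files in a Git Repository object are addressed with stage-style paths that name a branch, tag, or commit. The sketch below builds such paths; `repo_stage_path` is a hypothetical helper written for illustration, and the repository name is the one used in the cells that follow.

```python
VALID_REF_TYPES = {"branches", "tags", "commits"}


def repo_stage_path(repo_fqn: str, ref_type: str, ref_name: str, subdir: str = "") -> str:
    """Build a stage path such as @DB.SCHEMA.REPO/branches/main/sql/."""
    if ref_type not in VALID_REF_TYPES:
        raise ValueError(f"ref_type must be one of {sorted(VALID_REF_TYPES)}")
    path = f"@{repo_fqn}/{ref_type}/{ref_name}/"
    if subdir:
        path += subdir.strip("/") + "/"
    return path


repo = "FREIGHT_DEMO.ANALYTICS.LOADSTAR_REPO"
print(repo_stage_path(repo, "branches", "main"))
print(repo_stage_path(repo, "branches", "main", "sql"))
```

Pinning a path to a tag or commit instead of a branch gives a reproducible view of the repo, which is useful when a deployment must not pick up later changes.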

In [None]:
-- Verify the Git Repository integration is live
SHOW GIT REPOSITORIES IN SCHEMA FREIGHT_DEMO.ANALYTICS;

In [None]:
-- Fetch latest commits from GitHub (idempotent)
ALTER GIT REPOSITORY FREIGHT_DEMO.ANALYTICS.LOADSTAR_REPO FETCH;

-- Browse the repo contents directly from Snowflake
LIST @FREIGHT_DEMO.ANALYTICS.LOADSTAR_REPO/branches/main/;

In [None]:
-- List the SQL deployment scripts versioned in the repo
LIST @FREIGHT_DEMO.ANALYTICS.LOADSTAR_REPO/branches/main/sql/;

-- The notebook you're running right now also lives in the repo
LIST @FREIGHT_DEMO.ANALYTICS.LOADSTAR_REPO/branches/main/notebooks/;

---

## Platform summary

| Capability | Challenge addressed | Snowflake solution |
|------------|--------------------|--------------------|
| Workload isolation | Analytics contention on production databases | Zero-copy clone + dedicated warehouse compute |
| Real-time ingestion | Batch-only data pipelines with stale data | Snowpipe Streaming + Streams + Tasks |
| Unified data model | Conflicting broker data across multiple systems | `broker_360` Dynamic Table (auto-refreshing) |
| Data governance | PII exposure and inconsistent access controls | Horizon masking policies + RBAC + PII tags |
| ML development | Container infrastructure overhead for GPU workloads | Native Notebooks + GPU Compute Pools + Model Registry |
| Production inference | High-latency model serving | SQL UDF from Model Registry (sub-second) |
| Natural language analytics | SQL barrier for business users | Cortex Agent + Semantic View |
| Version control | No source control for Snowflake objects | Git Repository integration |

In [None]:
-- Final summary: All demo objects created
SELECT 'Database' AS object_type, 'FREIGHT_DEMO' AS object_name, 'Isolated demo environment' AS purpose
UNION ALL SELECT 'Schema', 'RAW', 'Simulated Oracle replication + JSON ingestion'
UNION ALL SELECT 'Schema', 'ANALYTICS', 'The Broker Object (Golden Record)'
UNION ALL SELECT 'Schema', 'DS_SANDBOX', 'Zero-Copy Clone for Data Science'
UNION ALL SELECT 'Schema', 'ML', 'Model Registry, UDFs, and ML artifacts'
UNION ALL SELECT 'Dynamic Table', 'broker_360', 'Single Source of Truth - auto-refreshing every 5 min'
UNION ALL SELECT 'Semantic View', 'BROKER_360_SV', 'Business-friendly model for Cortex AI'
UNION ALL SELECT 'Cortex Agent', 'BROKER_AGENT', 'Natural language broker intelligence'
UNION ALL SELECT 'UDF', 'GET_RECOMMENDATION_SCORE()', 'ML-backed load matching for CarrierMatch app'
UNION ALL SELECT 'Stream', 'INVOICE_JSON_STREAM', 'Tracks new JSON ingestion events'
UNION ALL SELECT 'Task', 'SIMULATE_STREAMING_INGESTION', 'Simulates real-time Snowpipe Streaming'
UNION ALL SELECT 'Masking Policy', 'SSN_MASK / BANK_ACCOUNT_MASK', 'PII protection via Snowflake Horizon'
UNION ALL SELECT 'Roles', 'FREIGHT_ANALYST / FREIGHT_DATA_SCIENTIST / FREIGHT_OPS', 'RBAC governance'
UNION ALL SELECT 'Warehouse', 'ANALYTICS_WH (Medium)', 'Production analytics (isolated)'
UNION ALL SELECT 'Warehouse', 'DS_SANDBOX_WH (X-Small)', 'Data Science workloads (isolated)'
UNION ALL SELECT 'Compute Pool', 'GPU_POOL (GPU_NV_S)', 'GPU compute for PyTorch training'
UNION ALL SELECT 'API Integration', 'GIT_API_INTEGRATION', 'GitHub API access for Git Repository'
UNION ALL SELECT 'Git Repository', 'LOADSTAR_REPO', 'Version-controlled demo artifacts from GitHub';
