# 🛡️ Cybersecurity ML Training & Deployment

This notebook demonstrates advanced machine learning for cybersecurity using **Snowpark ML** and **Snowflake Model Registry**.

## 🎯 What This Notebook Does

1. **Isolation Forest**: Detects anomalous user behavior patterns
2. **K-means Clustering**: Groups users into behavioral personas  
3. **Model Registry**: Enterprise-grade model management
4. **UDF Deployment**: Deploy models as scalable functions
5. **Hybrid Analysis**: Combine multiple ML approaches

## 📋 Prerequisites

Before running this notebook, ensure you have:
1. ✅ Run `sql/01_cybersecurity_schema.sql`
2. ✅ Run `sql/02_sample_data_generation.sql`
3. ✅ Run `sql/03_native_ml_and_cortex.sql`

---


## 🔧 Environment Setup


In [None]:
# Import required libraries
import snowflake.snowpark as snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, lit, when, avg, count, max as max_, min as min_
from snowflake.snowpark.types import StructType, StructField, StringType, FloatType, IntegerType, BooleanType

# Snowpark ML imports
from snowflake.ml.modeling.ensemble import IsolationForest
from snowflake.ml.modeling.cluster import KMeans
from snowflake.ml.modeling.preprocessing import StandardScaler
from snowflake.ml.registry import Registry

# Data science libraries
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import json

print("üì¶ All libraries imported successfully!")


In [None]:
# Reuse the session that Snowflake Notebooks attach automatically; no
# credentials are handled here.
session = snowpark.context.get_active_session()

# Pin the working context (database, schema, warehouse) for every later cell.
for use_stmt in (
    "USE DATABASE CYBERSECURITY_DEMO",
    "USE SCHEMA SECURITY_ANALYTICS",
    "USE WAREHOUSE CYBERSECURITY_WH",
):
    session.sql(use_stmt).collect()

# Echo the effective context so a wrong database/schema is obvious immediately.
print("üîó Connected to Snowflake session")
print(f"üìä Current database: {session.get_current_database()}")
print(f"üìÅ Current schema: {session.get_current_schema()}")


## 📊 Data Preparation for ML

*The ML models require feature engineering to extract behavioral patterns from raw authentication logs.*


In [None]:
# Build the ML_FEATURE_SET view: one row per active user with behavioural,
# temporal, geographic, security and organisational features computed over the
# last 45 days of authentication logs. Explicit ::INTEGER / ::FLOAT casts keep
# NUMBER(p, s) decimals from reaching the ML estimators.
feature_query = """
CREATE OR REPLACE VIEW ML_FEATURE_SET AS
SELECT 
    ual.USERNAME,
    
    -- Behavioral features (cast to avoid decimal precision warnings)
    COUNT(*)::INTEGER as TOTAL_LOGINS,
    AVG(CASE WHEN ual.SUCCESS THEN 1 ELSE 0 END)::FLOAT as SUCCESS_RATE,
    COUNT(CASE WHEN ual.SUCCESS = FALSE THEN 1 END)::INTEGER as FAILED_ATTEMPTS,
    
    -- Temporal features
    -- Off-hours = 22:00-05:59. The window wraps midnight, so it needs OR;
    -- "BETWEEN 22 AND 6" has its lower bound above its upper bound and never matches.
    COUNT(CASE WHEN EXTRACT(hour FROM ual.TIMESTAMP) >= 22
                 OR EXTRACT(hour FROM ual.TIMESTAMP) < 6 THEN 1 END)::INTEGER as OFF_HOURS_LOGINS,
    -- DAYNAME does not depend on the WEEK_START session parameter, unlike
    -- EXTRACT(dow ...), whose numbering shifts when WEEK_START is changed.
    COUNT(CASE WHEN DAYNAME(ual.TIMESTAMP) IN ('Sat', 'Sun') THEN 1 END)::INTEGER as WEEKEND_LOGINS,
    
    -- Geographic features
    COUNT(DISTINCT ual.SOURCE_IP)::INTEGER as UNIQUE_IPS,
    COUNT(DISTINCT ual.LOCATION:country::STRING)::INTEGER as UNIQUE_COUNTRIES,
    
    -- Security features
    AVG(CASE WHEN ual.TWO_FACTOR_USED THEN 1 ELSE 0 END)::FLOAT as TWO_FACTOR_RATE,
    COUNT(DISTINCT ual.USER_AGENT)::INTEGER as UNIQUE_DEVICES,
    
    -- Organizational context
    ed.DEPARTMENT,
    ed.ROLE,
    ed.SECURITY_CLEARANCE,
    DATEDIFF(day, ed.HIRE_DATE, CURRENT_DATE())::INTEGER as TENURE_DAYS
    
FROM USER_AUTHENTICATION_LOGS ual
JOIN EMPLOYEE_DATA ed ON ual.USERNAME = ed.USERNAME
WHERE ual.TIMESTAMP >= DATEADD(day, -45, CURRENT_TIMESTAMP())  -- Extended window for more data
    AND ed.STATUS = 'active'
GROUP BY ual.USERNAME, ed.DEPARTMENT, ed.ROLE, ed.SECURITY_CLEARANCE, ed.HIRE_DATE
HAVING COUNT(*) >= 2  -- Filter users with minimum activity (reduced for demo)
"""

session.sql(feature_query).collect()
print("üîß Feature engineering view created with explicit type casting")

# Load feature data into a (lazy) Snowpark DataFrame
feature_df = session.table('ML_FEATURE_SET')
print(f"üìà Feature dataset: {feature_df.count()} users with behavioral features")

# Re-assert numeric types on the Snowpark side as well. The order matches the
# original cast chain; with_column on an existing name may change column
# position, so keeping the order keeps the resulting column layout stable.
_feature_casts = [
    ("TOTAL_LOGINS", IntegerType()),
    ("SUCCESS_RATE", FloatType()),
    ("FAILED_ATTEMPTS", IntegerType()),
    ("OFF_HOURS_LOGINS", IntegerType()),
    ("WEEKEND_LOGINS", IntegerType()),
    ("UNIQUE_IPS", IntegerType()),
    ("UNIQUE_COUNTRIES", IntegerType()),
    ("TWO_FACTOR_RATE", FloatType()),
    ("UNIQUE_DEVICES", IntegerType()),
    ("TENURE_DAYS", IntegerType()),
]
for _cast_col, _cast_type in _feature_casts:
    feature_df = feature_df.with_column(_cast_col, col(_cast_col).cast(_cast_type))

print("‚úÖ Data types explicitly cast to prevent precision warnings")

# Print the resulting types. A bare ``feature_df.dtypes`` expression in the
# middle of a cell is evaluated and discarded, so nothing was displayed.
print("\nüîç Final data types inspection:")
for _column_name, _dtype_name in feature_df.dtypes:
    print(f"  {_column_name}: {_dtype_name}")

# Show sample of features with clean data types
print("\nüìä Sample of clean feature data:")
feature_df.limit(5).show()


## 🤖 Isolation Forest - Anomaly Detection

*Uses unsupervised learning to identify users with anomalous behavior patterns that could indicate security threats.*


In [None]:
# Alternative Isolation Forest Implementation (Fallback)
# This provides a working implementation if the Snowpark ML version has issues

def create_statistical_anomaly_scores(df, features):
    """
    Score rows with a z-score heuristic, used when the Isolation Forest cannot score.

    The mean and sample standard deviation of every feature are computed in a
    single aggregate query. Each row's z-scores are then combined into
    ``-sqrt(mean(z_i ** 2))``, the negated root-mean-square z-score. As on the
    model path, lower (more negative) values mean "more unusual".

    Args:
        df: Snowpark DataFrame containing every column named in ``features``.
        features: Non-empty sequence of numeric column names to score on.

    Returns:
        ``df`` with one extra column, ``ISOLATION_ANOMALY_SCORE``.

    Raises:
        ValueError: if ``features`` is empty.
    """
    # Imported locally: the notebook's import cell does not provide stddev/sqrt,
    # so the original body raised NameError as soon as it ran.
    from snowflake.snowpark.functions import avg, col, lit, sqrt, stddev

    print("üîÑ Using statistical anomaly detection as fallback...")

    features = list(features)
    if not features:
        raise ValueError("create_statistical_anomaly_scores requires at least one feature")

    # One round-trip for all statistics instead of one query per feature.
    agg_exprs = []
    for idx, feature in enumerate(features):
        agg_exprs.append(avg(col(feature)).alias(f"MEAN_{idx}"))
        agg_exprs.append(stddev(col(feature)).alias(f"STD_{idx}"))
    stats = df.select(*agg_exprs).collect()[0]

    squared_z_terms = []
    for idx, feature in enumerate(features):
        raw_mean = stats[f"MEAN_{idx}"]
        raw_std = stats[f"STD_{idx}"]
        mean_val = 0.0 if raw_mean is None else float(raw_mean)
        # STDDEV is NULL for a single row and 0 for a constant column; dividing
        # by zero errors in Snowflake, so both fall back to a unit scale.
        std_val = float(raw_std) if raw_std else 1.0
        z_score = (col(feature) - lit(mean_val)) / lit(std_val)
        # Squaring makes an explicit abs() unnecessary.
        squared_z_terms.append(z_score ** 2)

    sum_sq = sum(squared_z_terms[1:], squared_z_terms[0])
    anomaly_df = df.with_column(
        'ISOLATION_ANOMALY_SCORE',
        -sqrt(sum_sq / lit(len(features))),
    )

    print("‚úÖ Statistical anomaly scores calculated!")
    return anomaly_df


In [None]:
# Function to ensure clean data types and prevent decimal warnings
def ensure_clean_datatypes(df, numerical_columns):
    """Cast any DecimalType columns among ``numerical_columns`` to ML-friendly types.

    Two groups of decimal columns become FloatType: StandardScaler outputs (names
    ending in ``_SCALED``) and the rate columns ``SUCCESS_RATE`` and
    ``TWO_FACTOR_RATE``. Every other decimal column becomes IntegerType, and
    non-decimal columns are left as they are. Each inspected column's current
    type is printed.

    Args:
        df: Snowpark DataFrame.
        numerical_columns: Names of columns to inspect; each must exist in ``df``.

    Returns:
        The DataFrame with the conversions applied.

    Raises:
        KeyError: if a name in ``numerical_columns`` is not a column of ``df``.
    """
    from snowflake.snowpark.types import IntegerType, FloatType, DecimalType

    # ``df.dtypes`` yields (name, type-string) pairs, e.g. ('A', 'decimal(38,2)').
    # An isinstance check against DecimalType therefore never matched, and no
    # column was ever converted. The schema carries real DataType objects.
    # It is read once: with_column below only replaces the column being
    # converted, which is not looked up again.
    column_types = {field.name: field.datatype for field in df.schema.fields}
    float_named_columns = {'SUCCESS_RATE', 'TWO_FACTOR_RATE'}

    for col_name in numerical_columns:
        current_type = column_types[col_name]
        print(f"  {col_name}: {current_type}")

        if not isinstance(current_type, DecimalType):
            continue

        # Scaled features and rates are fractional; everything else is a count.
        if col_name.endswith('_SCALED') or col_name in float_named_columns:
            df = df.with_column(col_name, col(col_name).cast(FloatType()))
            print(f"    ‚Üí Converted {col_name} to FloatType")
        else:
            df = df.with_column(col_name, col(col_name).cast(IntegerType()))
            print(f"    ‚Üí Converted {col_name} to IntegerType")

    return df

# Numeric behavioural features used by the Isolation Forest (and by the
# statistical fallback if the model cannot score).
iso_features = [
    'TOTAL_LOGINS', 'SUCCESS_RATE', 'FAILED_ATTEMPTS',
    'OFF_HOURS_LOGINS', 'WEEKEND_LOGINS', 'UNIQUE_IPS',
    'UNIQUE_COUNTRIES', 'TWO_FACTOR_RATE', 'UNIQUE_DEVICES', 'TENURE_DAYS'
]

print("üîß Ensuring clean data types for ML features:")
feature_df = ensure_clean_datatypes(feature_df, iso_features)

# Create the Isolation Forest model.
# NOTE(review): the single output column is named ISOLATION_ANOMALY_SCORE, but an
# sklearn-style IsolationForest.predict emits labels (-1 = anomaly, 1 = normal)
# rather than continuous scores. Confirm what this snowflake-ml-python version
# writes there. Either way, lower values are treated as more anomalous below.
isolation_forest = IsolationForest(
    n_estimators=100,
    contamination=0.1,  # Expect ~10% anomalies
    random_state=42,
    input_cols=iso_features,
    output_cols=['ISOLATION_ANOMALY_SCORE'],
)

print("üå≤ Training Isolation Forest model...")

# Debug: check input data shape and columns
print(f"üìä Input data shape: {feature_df.count()} rows")
print(f"üìã Input columns: {iso_features}")
print(f"üéØ Expected output columns: {isolation_forest.get_output_cols()}")

isolation_model = isolation_forest.fit(feature_df)

print("‚úÖ Isolation Forest training completed!")
print(f"üîç Model output columns: {isolation_model.get_output_cols()}")

# Scores strictly below this threshold are flagged as anomalies. -0.1 suits the
# model's output, where anomalous rows are negative.
iso_anomaly_threshold = -0.1
# Records which path produced ISOLATION_ANOMALY_SCORE, for later inspection.
iso_scoring_method = "isolation_forest"

# Apply predictions, falling back progressively if the model path fails.
print("üîÆ Applying Isolation Forest predictions...")
try:
    feature_df_with_iso = isolation_model.predict(feature_df)
    print("‚úÖ Predictions applied successfully!")
except Exception as e:
    print(f"‚ùå Error during prediction: {str(e)}")
    print("üîß Attempting alternative approaches...")

    # Alternative 1: recreate the model with a differently named single output
    try:
        print("üîÑ Attempting approach 1: Recreating Isolation Forest...")
        isolation_forest_fixed = IsolationForest(
            n_estimators=100,
            contamination=0.1,
            random_state=42,
            input_cols=iso_features,
            output_cols=['ANOMALY_SCORE'],  # Different column name to avoid conflicts
        )

        isolation_model_fixed = isolation_forest_fixed.fit(feature_df)
        feature_df_with_iso = isolation_model_fixed.predict(feature_df)

        # Rename the column to match the expected name
        feature_df_with_iso = feature_df_with_iso.with_column_renamed('ANOMALY_SCORE', 'ISOLATION_ANOMALY_SCORE')
        # Keep the model that actually produced the scores, so the registry
        # cell logs this model and not the one whose predict() failed.
        isolation_model = isolation_model_fixed
        print("‚úÖ Alternative approach 1 successful!")

    except Exception as e2:
        print(f"‚ùå Approach 1 also failed: {str(e2)}")
        print("üîÑ Attempting approach 2: Statistical anomaly detection...")

        # Alternative 2: statistical anomaly detection
        feature_df_with_iso = create_statistical_anomaly_scores(feature_df, iso_features)
        # The fallback score is -RMS(z-scores), which is about -1 for a typical
        # user, so -0.1 would flag almost everyone. Require RMS z > 2 instead
        # (tunable).
        iso_anomaly_threshold = -2.0
        iso_scoring_method = "statistical_zscore"
        print("‚úÖ Statistical anomaly detection successful!")

# Derive the binary anomaly flag from the score (lower = more anomalous).
# NULL scores fall through to False.
feature_df_with_iso = feature_df_with_iso.with_column(
    'ISOLATION_IS_ANOMALY',
    when(col('ISOLATION_ANOMALY_SCORE') < lit(iso_anomaly_threshold), lit(True)).otherwise(lit(False)),
)

print("üîç Anomaly detection results:")
anomaly_summary = feature_df_with_iso.group_by('ISOLATION_IS_ANOMALY').agg(count(lit(1)).alias('COUNT'))
anomaly_summary.show()

# Show distribution of anomaly scores
print("\nüìä Anomaly score distribution:")
score_stats = feature_df_with_iso.select(
    min_('ISOLATION_ANOMALY_SCORE').alias('MIN_SCORE'),
    avg('ISOLATION_ANOMALY_SCORE').alias('AVG_SCORE'),
    max_('ISOLATION_ANOMALY_SCORE').alias('MAX_SCORE'),
)
score_stats.show()

# Show sample anomalies
print("\nüö® Sample anomalous users:")
anomalies = feature_df_with_iso.filter(col('ISOLATION_IS_ANOMALY')).select(
    'USERNAME', 'DEPARTMENT', 'TOTAL_LOGINS', 'SUCCESS_RATE',
    'UNIQUE_COUNTRIES', 'ISOLATION_ANOMALY_SCORE',
).limit(10)
anomalies.show()


## 🔍 Data Validation & Troubleshooting

*Validate that we have sufficient data for ML training before proceeding with clustering.*


In [None]:
# Sanity-check the scored feature set before clustering: stop on an empty
# dataset, and warn when there are too few users for a multi-cluster model.
print("üîç Validating data availability for ML training...")

feature_count = feature_df_with_iso.count()
print(f"üìä Total users with features: {feature_count}")

troubleshooting_steps = (
    "   1. Verify that sample data generation (02_sample_data_generation.sql) was run",
    "   2. Check that authentication logs have recent timestamps",
    "   3. Ensure employee data has active users",
)

# Guard clause: nothing to train on at all.
if feature_count == 0:
    print("‚ùå ERROR: No users found in feature dataset!")
    print("üîß Troubleshooting steps:")
    for step in troubleshooting_steps:
        print(step)
    raise Exception("Insufficient data for ML training")

if feature_count < 4:
    print(f"‚ö†Ô∏è  WARNING: Only {feature_count} users available for clustering")
    print("üîß Consider running with more sample data for better clustering results")
else:
    print(f"‚úÖ Sufficient data available: {feature_count} users")

# Spot-check a few scored rows.
preview_columns = [
    'USERNAME', 'DEPARTMENT', 'TOTAL_LOGINS', 'SUCCESS_RATE',
    'UNIQUE_COUNTRIES', 'ISOLATION_IS_ANOMALY',
]
print("\nüìã Sample feature data:")
feature_df_with_iso.select(*preview_columns).show(5)


## 👥 K-means Clustering - User Personas

*Groups users into behavioral clusters to understand normal patterns and identify deviations from expected behavior.*


In [None]:
# Behavioural features used for K-means (a subset of the Isolation Forest features).
cluster_features = [
    'TOTAL_LOGINS', 'SUCCESS_RATE', 'OFF_HOURS_LOGINS',
    'WEEKEND_LOGINS', 'UNIQUE_IPS', 'UNIQUE_COUNTRIES', 'TWO_FACTOR_RATE'
]
# StandardScaler output column names; K-means trains on these. They are built
# once, with a loop variable that does not shadow the Snowpark ``col`` function.
scaled_feature_cols = [f"{feature}_SCALED" for feature in cluster_features]

print("üîß Ensuring clean data types for clustering features:")
feature_df_with_iso = ensure_clean_datatypes(feature_df_with_iso, cluster_features)

# Standardise features so no single feature dominates the K-means distance.
scaler = StandardScaler(
    input_cols=cluster_features,
    output_cols=scaled_feature_cols,
)
scaled_df = scaler.fit(feature_df_with_iso).transform(feature_df_with_iso)

# Fix decimal precision warnings from StandardScaler output
print("üîß Ensuring clean data types for scaled features:")
scaled_df = ensure_clean_datatypes(scaled_df, scaled_feature_cols)

sample_count = scaled_df.count()
print(f"üìä Sample count for clustering: {sample_count}")
if sample_count == 0:
    # KMeans cannot be built with zero clusters; fail with a clear message.
    raise ValueError("No users available for K-means clustering")

# Use up to 4 clusters while keeping >= 2 samples per cluster; with fewer than
# 4 samples, use min(sample_count, 2) clusters.
max_clusters = max(1, min(4, sample_count // 2))
actual_clusters = max_clusters if sample_count >= 4 else min(sample_count, 2)

print(f"üéØ Using {actual_clusters} clusters for {sample_count} samples")

# NOTE(review): two output columns are requested, but sklearn's KMeans.predict
# returns only the label. Confirm this snowflake-ml-python version actually
# fills CLUSTER_DISTANCE, which the results-table cell reads.
kmeans = KMeans(
    n_clusters=actual_clusters,  # Dynamic clusters based on data availability
    random_state=42,
    input_cols=scaled_feature_cols,
    output_cols=['CLUSTER_LABEL', 'CLUSTER_DISTANCE'],
)

print("üîÑ Training K-means clustering model...")
kmeans_model = kmeans.fit(scaled_df)
print("‚úÖ K-means clustering completed!")

# Apply clustering
final_df = kmeans_model.predict(scaled_df)

print("üë• User cluster distribution:")

# Cast aggregated inputs to avoid decimal precision warnings in aggregations
print("üîß Ensuring clean data types for cluster analysis...")
final_df_clean = (
    final_df.with_column('TOTAL_LOGINS', col('TOTAL_LOGINS').cast(IntegerType()))
    .with_column('SUCCESS_RATE', col('SUCCESS_RATE').cast(FloatType()))
    .with_column('UNIQUE_COUNTRIES', col('UNIQUE_COUNTRIES').cast(IntegerType()))
)

cluster_summary = final_df_clean.group_by('CLUSTER_LABEL').agg(
    count(lit(1)).alias('USER_COUNT'),
    avg('TOTAL_LOGINS').alias('AVG_LOGINS'),
    avg('SUCCESS_RATE').alias('AVG_SUCCESS_RATE'),
    avg('UNIQUE_COUNTRIES').alias('AVG_COUNTRIES'),
)
cluster_summary.show()

print("\nüìä Cluster characteristics by department:")

# Pivot on the labels actually present in the predictions. The previous code
# assumed string labels '0'..'k-1' without reading the data.
cluster_labels = sorted(
    row['CLUSTER_LABEL']
    for row in final_df_clean.select('CLUSTER_LABEL').distinct().collect()
    if row['CLUSTER_LABEL'] is not None
)
print(f"üìã Cluster labels: {cluster_labels}")

if len(cluster_labels) > 1:
    # One row per department, one column per cluster label holding the user
    # count. The previous ``.agg({'0': 'sum', ...})`` named columns that do not
    # exist before the pivot; the pivot needs to aggregate the COUNT column.
    dept_clusters = (
        final_df_clean.group_by('DEPARTMENT', 'CLUSTER_LABEL')
        .agg(count(lit(1)).alias('COUNT'))
        .pivot('CLUSTER_LABEL', cluster_labels)
        .sum('COUNT')
        .fillna(0)
    )
    dept_clusters.show()
else:
    print("‚ö†Ô∏è Only one cluster created - no department pivot analysis available")
    simple_dept = final_df_clean.group_by('DEPARTMENT').agg(count(lit(1)).alias('USER_COUNT'))
    simple_dept.show()


## 📊 Model Registry - Enterprise ML Management

*Deploy trained models to Snowflake Model Registry for enterprise-grade model lifecycle management.*


In [None]:
# Initialize the Model Registry in the demo schema.
registry = Registry(session=session, database_name="CYBERSECURITY_DEMO", schema_name="SECURITY_ANALYTICS")

print("üìã Registering models in Snowflake Model Registry...")

# Registry.log_model() has no ``tags`` keyword (passing one raises TypeError).
# The descriptive metadata is therefore stored as version metrics, readable via
# ModelVersion.show_metrics(). Governance tags would instead need pre-created
# Snowflake TAG objects and Model.set_tag().
# NOTE(review): logging the same version_name twice fails; bump "v1.0" on re-runs.

# Register the Isolation Forest model
iso_model_ref = registry.log_model(
    model=isolation_model,
    model_name="cybersecurity_isolation_forest",
    version_name="v1.0",
    comment="Isolation Forest for detecting anomalous user behavior patterns",
    metrics={"use_case": "anomaly_detection", "model_type": "isolation_forest", "department": "security"},
)
print(f"‚úÖ Isolation Forest registered: {iso_model_ref.fully_qualified_model_name}")

# Register the K-means model
kmeans_model_ref = registry.log_model(
    model=kmeans_model,
    model_name="cybersecurity_user_clustering",
    version_name="v1.0",
    comment="K-means clustering for user behavioral personas",
    metrics={"use_case": "user_clustering", "model_type": "kmeans", "department": "security"},
)
print(f"‚úÖ K-means model registered: {kmeans_model_ref.fully_qualified_model_name}")

# Register the (already fitted) feature scaler
scaler_ref = registry.log_model(
    model=scaler,
    model_name="cybersecurity_feature_scaler",
    version_name="v1.0",
    comment="StandardScaler for normalizing cybersecurity features",
    metrics={"use_case": "preprocessing", "model_type": "scaler", "department": "security"},
)
print(f"‚úÖ Feature scaler registered: {scaler_ref.fully_qualified_model_name}")

# List all registered models
print("\nüìö All models in registry:")
models = registry.list_models()
for model in models:
    print(f"  ü§ñ {model}")

print("\nüéØ Model Registry deployment completed!")


## ⚡ UDF Deployment - Production ML Functions

*Deploy models as SQL UDFs for real-time scoring in production workloads.*


In [None]:
# Score with the registered models.
# ModelVersion.run() executes one of the logged model's own methods; its
# ``function_name`` argument selects that method (e.g. "predict", "transform").
# It does not publish a SQL UDF under a new name. The previous calls passed
# function_name="DETECT_USER_ANOMALIES" / "CLASSIFY_USER_BEHAVIOR", which are
# not model methods, and the follow-up SQL called functions that were never
# created.
# NOTE(review): registered model methods are also callable from SQL via
# Snowflake's model-method syntax (e.g. CYBERSECURITY_ISOLATION_FOREST!PREDICT(...));
# confirm the exact form for your account before relying on it.
ml_feature_table = session.table('ML_FEATURE_SET')

# Anomaly scoring: raw features in, Isolation Forest output appended.
iso_input_df = ml_feature_table.select('USERNAME', 'DEPARTMENT', *iso_features)
iso_udf = iso_model_ref.run(iso_input_df, function_name="predict")
print("üö® Isolation Forest scored via registered model method: predict")

# Clustering: the scaler's transform produces the *_SCALED inputs K-means was
# trained on; its predict then assigns the cluster.
cluster_input_df = ml_feature_table.select('USERNAME', 'DEPARTMENT', *cluster_features)
scaled_input_df = scaler_ref.run(cluster_input_df, function_name="transform")
cluster_udf = kmeans_model_ref.run(scaled_input_df, function_name="predict")
print("üë• K-means clustering scored via registered model methods: transform -> predict")

print("\nüß™ Testing deployed UDFs...")

# Show identifying columns plus whatever columns each model appended.
iso_output_cols = [c for c in iso_udf.columns if c not in iso_input_df.columns]
anomaly_test = iso_udf.select('USERNAME', 'DEPARTMENT', 'TOTAL_LOGINS', *iso_output_cols).limit(5)

print("üîç Anomaly Detection UDF test:")
anomaly_test.show()

cluster_output_cols = [c for c in cluster_udf.columns if c not in scaled_input_df.columns]
cluster_test = cluster_udf.select('USERNAME', 'DEPARTMENT', *cluster_output_cols).limit(5)

print("\nüë• User Clustering UDF test:")
cluster_test.show()

print("\n‚úÖ UDF deployment and testing completed!")


## 🔄 Update Snowpark ML Results Tables

*Populate the ML results tables that the Streamlit app uses for hybrid analysis.*


In [None]:
# Persist Snowpark ML predictions into the tables and view the Streamlit app reads.
print("üìä Creating SNOWPARK_ML_ANOMALY_RESULTS table...")

# Materialise the predictions once into a session-scoped temporary table that
# the CREATE TABLE ... AS statements can join against. The previous code spliced
# ``DataFrame.queries['queries'][0]`` into the SQL. That is only the first of
# possibly several statements Snowpark needs to evaluate the DataFrame, so the
# embedded subquery could be incomplete.
ml_predictions_table = "SNOWPARK_ML_PREDICTIONS_TMP"
final_df.select(
    'USERNAME', 'ISOLATION_ANOMALY_SCORE', 'ISOLATION_IS_ANOMALY',
    'CLUSTER_LABEL', 'CLUSTER_DISTANCE',
).write.mode("overwrite").save_as_table(ml_predictions_table, table_type="temporary")

# Columns are qualified because USERNAME exists on both sides of the join; the
# previous unqualified reference was ambiguous.
snowpark_anomaly_query = f"""
CREATE OR REPLACE TABLE SNOWPARK_ML_ANOMALY_RESULTS AS
SELECT 
    f.USERNAME,
    ml.ISOLATION_ANOMALY_SCORE,
    ml.ISOLATION_IS_ANOMALY,
    CURRENT_TIMESTAMP() as ANALYSIS_TIMESTAMP
FROM ML_FEATURE_SET f
JOIN {ml_predictions_table} ml ON f.USERNAME = ml.USERNAME
"""

session.sql(snowpark_anomaly_query).collect()

# Create Snowpark ML clustering results table
print("üë• Creating SNOWPARK_ML_USER_CLUSTERS table...")

snowpark_cluster_query = f"""
CREATE OR REPLACE TABLE SNOWPARK_ML_USER_CLUSTERS AS
SELECT 
    f.USERNAME,
    f.DEPARTMENT,
    f.TOTAL_LOGINS,
    f.SUCCESS_RATE,
    f.UNIQUE_COUNTRIES,
    f.WEEKEND_LOGINS,
    ml.CLUSTER_LABEL,
    ml.CLUSTER_DISTANCE,
    CURRENT_TIMESTAMP() as ANALYSIS_TIMESTAMP
FROM ML_FEATURE_SET f
JOIN {ml_predictions_table} ml ON f.USERNAME = ml.USERNAME
"""

session.sql(snowpark_cluster_query).collect()

# Update the ML_MODEL_COMPARISON view with real Snowpark ML results
print("üîÑ Updating ML_MODEL_COMPARISON view...")

# NOTE(review): the 'MEDIUM' rule tests ISOLATION_ANOMALY_SCORE > 0.5. Elsewhere
# in this notebook lower scores mean more anomalous, so this picks the *least*
# anomalous users, and any row with a native FORECAST also becomes MEDIUM.
# Confirm the intended rule.
update_comparison_view = """
CREATE OR REPLACE VIEW ML_MODEL_COMPARISON AS
SELECT
    n.USERNAME,
    ed.DEPARTMENT,
    ed.ROLE,
    CURRENT_TIMESTAMP() as ANALYSIS_DATE,

    -- Native ML Results 
    COALESCE(n.IS_ANOMALY, FALSE) as NATIVE_IS_ANOMALY,
    COALESCE(n.FORECAST, 0) as NATIVE_ANOMALY_SCORE,

    -- Snowpark ML Results (now with real data!)
    COALESCE(s.ISOLATION_IS_ANOMALY, FALSE) as ISOLATION_FOREST_ANOMALY,
    COALESCE(s.ISOLATION_ANOMALY_SCORE, 0.0) as ISOLATION_FOREST_SCORE,
    COALESCE(c.CLUSTER_LABEL, 0) as CLUSTER_LABEL,
    COALESCE(c.CLUSTER_DISTANCE, 0.0) as CLUSTER_DISTANCE,

    -- Enhanced Risk Assessment using both models
    CASE
        WHEN COALESCE(n.IS_ANOMALY, FALSE) = TRUE AND COALESCE(s.ISOLATION_IS_ANOMALY, FALSE) = TRUE THEN 'CRITICAL'
        WHEN COALESCE(n.IS_ANOMALY, FALSE) = TRUE OR COALESCE(s.ISOLATION_IS_ANOMALY, FALSE) = TRUE THEN 'HIGH'
        WHEN n.FORECAST IS NOT NULL OR s.ISOLATION_ANOMALY_SCORE > 0.5 THEN 'MEDIUM'
        ELSE 'LOW'
    END as RISK_LEVEL,

    -- Model agreement analysis
    CASE
        WHEN COALESCE(n.IS_ANOMALY, FALSE) = TRUE AND COALESCE(s.ISOLATION_IS_ANOMALY, FALSE) = TRUE THEN 'BOTH_AGREE_ANOMALY'
        WHEN COALESCE(n.IS_ANOMALY, FALSE) = FALSE AND COALESCE(s.ISOLATION_IS_ANOMALY, FALSE) = FALSE THEN 'BOTH_AGREE_NORMAL'
        WHEN COALESCE(n.IS_ANOMALY, FALSE) = TRUE THEN 'NATIVE_ONLY'
        WHEN COALESCE(s.ISOLATION_IS_ANOMALY, FALSE) = TRUE THEN 'SNOWPARK_ONLY'
        ELSE 'INSUFFICIENT_DATA'
    END as MODEL_AGREEMENT

FROM NATIVE_ML_ANOMALY_RESULTS n
FULL OUTER JOIN SNOWPARK_ML_ANOMALY_RESULTS s ON n.USERNAME = s.USERNAME  
FULL OUTER JOIN SNOWPARK_ML_USER_CLUSTERS c ON COALESCE(n.USERNAME, s.USERNAME) = c.USERNAME
JOIN EMPLOYEE_DATA ed ON COALESCE(n.USERNAME, s.USERNAME, c.USERNAME) = ed.USERNAME
WHERE ed.STATUS = 'active'
"""

session.sql(update_comparison_view).collect()

print("‚úÖ Snowpark ML tables created and ML_MODEL_COMPARISON view updated!")
print("üéØ The Streamlit app now has access to real ML predictions!")

# Show summary statistics
print("\nüìà Summary of ML Results:")

summary_stats = session.sql("""
    SELECT 
        COUNT(*) as TOTAL_USERS,
        SUM(CASE WHEN NATIVE_IS_ANOMALY THEN 1 ELSE 0 END) as NATIVE_ANOMALIES,
        SUM(CASE WHEN ISOLATION_FOREST_ANOMALY THEN 1 ELSE 0 END) as SNOWPARK_ANOMALIES,
        SUM(CASE WHEN RISK_LEVEL = 'CRITICAL' THEN 1 ELSE 0 END) as CRITICAL_RISK,
        SUM(CASE WHEN MODEL_AGREEMENT = 'BOTH_AGREE_ANOMALY' THEN 1 ELSE 0 END) as MODELS_AGREE
    FROM ML_MODEL_COMPARISON
""")

summary_stats.show()
