## End to End ML In Snowflake 

#### Data Analysis & Preparation

- Perform exploratory data analysis (EDA) on transaction data
- Engineer fraud detection features using Snowpark & Cortex
- Utilize Feature Store to:
    - Track engineered fraud indicators
    - Store feature definitions for reproducible fraud detection signals

#### Model Development

- Train multiple fraud detection models:
    - SnowML XGBoost with tree booster
    - SnowML XGBoost with linear booster
    - Multiple scikit-learn classification models

- Register all models in Snowflake model registry
- Explore registry capabilities:
    - Metadata tracking
    - Inference for fraud predictions
    - Explainability of fraud determinations

#### Model Evaluation & Monitoring

- Configure Model Monitor to track 1 year of fraud predictions against confirmed fraud cases
- Compute key performance metrics:
    - F1 score (balance between precision and recall)
    - Precision (minimize false positives)
    - Recall (capture as much actual fraud as possible)
- Analyze model drift (track changes in fraud detection patterns day-to-day)
- Compare models side-by-side to determine the best production candidate
- Identify and address data quality issues in fraud detection pipeline


#### Inference

- Inference on a warehouse
- Inference on Snowpark Container Services (SPCS)

#### Lineage & Governance

- Track comprehensive data and model lineage throughout the fraud detection system
- Maintain visibility into:
    - Origin of data used for computed fraud indicators
    - Datasets used for fraud model training
    - Available fraud detection model versions under monitoring

#### Deployment

- Create a Streamlit application that helps fraud analysts make informed decisions about flagged transactions, using Cortex Analyst over the model predictions


In [None]:
-- CREATE OR REPLACE NETWORK RULE allow_all_rule
-- MODE = 'EGRESS'
-- TYPE = 'HOST_PORT'
-- VALUE_LIST = ('0.0.0.0:443','0.0.0.0:80');
-- CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION allow_all_integration
-- ALLOWED_NETWORK_RULES = (allow_all_rule)
-- ENABLED = true;

-- GRANT USAGE ON INTEGRATION allow_all_integration TO ROLE PUBLIC;
--!pip install shap

In [None]:
# Standard Python Libraries
import sys
import json
import warnings
from datetime import timedelta

# Data Manipulation and Analysis
import pandas as pd
import numpy as np

# Data Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Machine Learning
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from snowflake.ml.modeling.xgboost import XGBRegressor, XGBClassifier

# Snowpark Core
from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.version import VERSION
import snowflake.snowpark.functions as F
from snowflake.snowpark.exceptions import SnowparkSessionException
from snowflake.snowpark.functions import (sproc, col, dayname, 
                              to_timestamp,min, max,split
)


from snowflake.snowpark import types as T
from snowflake.snowpark.window import Window

# Snowpark ML
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.preprocessing import OrdinalEncoder, OneHotEncoder
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error
from snowflake.ml.registry import Registry

# Snowflake Feature Store
from snowflake.ml.feature_store import (
    FeatureStore, FeatureView, Entity, CreationMode, setup_feature_store
)

# Snowflake Task API
from snowflake.core import Root
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.warehouse import Warehouse
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.core._common import CreateMode

# Streamlit
import streamlit as st

# Suppress warnings
warnings.filterwarnings("ignore")


from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Create the Model Registry and register your initial model
from snowflake.ml.registry import Registry


In [None]:
# Obtain the notebook's active Snowpark session and report what it is connected to.
session = get_active_session()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION
env_row = snowflake_environment[0]  # (current_user, current_version)

# Each entry pairs a display template with a zero-argument callable returning its
# format arguments; the callables keep every session lookup at print time, in order.
connection_report = (
    ('User                        : {}', lambda: (env_row[0],)),
    ('Role                        : {}', lambda: (session.get_current_role(),)),
    ('Database                    : {}', lambda: (session.get_current_database(),)),
    ('Schema                      : {}', lambda: (session.get_current_schema(),)),
    ('Warehouse                   : {}', lambda: (session.get_current_warehouse(),)),
    ('Snowflake version           : {}', lambda: (env_row[1],)),
    ('Snowpark for Python version : {}.{}.{}', lambda: tuple(snowpark_version[:3])),
)

# Current Environment Details
print('\nConnection Established with the following parameters:')
for template, args_of in connection_report:
    print(template.format(*args_of()))


### Data Generation for Fraud Detection

*   **Purpose:** Generates synthetic data for fraud detection.
*   **Tables:** Creates two tables: `transactions` and `customer_complaints`.
*   **`transactions` Table:**
    *   Simulates 10,000 financial transactions.
    *   Includes randomized transaction amounts, merchant categories, device types, locations, and IP addresses.
    *   Marks 15% of transactions as fraudulent.
*   **`customer_complaints` Table:**
    *   Simulates 2,000 customer-reported issues.
    *   Includes complaint text, sentiment scores, and keywords.
*   **Relationships:**
    *   Tables are linked by `customer_id`.
    *   Fraud keywords in complaints map to fraudulent transactions.
*   **Use Cases:** Supports machine learning, anomaly detection, and temporal analysis for fraud detection.


[Data Generation Script](src/datagneration.sql)

In [None]:
-- Preview two rows of the generated TRANSACTIONS table.
select * from transactions limit 2;

In [None]:
-- Preview two rows of the generated CUSTOMER_COMPLAINTS table.
select * from customer_complaints limit 2;

In [None]:
# Create the table using SQL
# FRAUD_ANALYSIS = every transaction LEFT JOINed to its customer's complaints.
# NOTE(review): the join key is CUSTOMER_ID only, so a customer with k complaints
# produces k rows for each of their transactions (TRANSACTION_ID repeats). This is
# why the EDA reports more rows than generated transactions — confirm it is intended.
session.sql("""
CREATE OR REPLACE TABLE fraud_analysis AS
SELECT 
    t.transaction_id, 
    t.customer_id, 
    t.transaction_amount, 
    t.is_fraud, 
    t.merchant_category,
    t.device_type,
    t.location,
    t.transaction_time,
    c.complaint_text, 
    c.keywords,
    c.complaint_time
FROM transactions t
LEFT JOIN customer_complaints c
ON t.customer_id = c.customer_id
""").collect()

# Create a Snowpark DataFrame from the newly created table
df = session.table("fraud_analysis")


In [None]:
-- Preview two rows of the joined FRAUD_ANALYSIS table.
select * from fraud_analysis limit 2;

In [None]:
import pandas as pd
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, count

# 1. Basic Data Overview
def basic_data_overview(df):
    """Print the shape, column names, dtypes and a 5-row sample of ``df``.

    Args:
        df: Snowpark DataFrame to describe.

    Returns:
        The number of rows in ``df``.
    """
    print("Dataset Shape:")
    n_rows = df.count()
    n_cols = len(df.columns)
    print(f"Rows: {n_rows}, Columns: {n_cols}")

    print("\nColumn Names:")
    print(df.columns)

    print("\nData Types:")
    for name, dtype in df.dtypes:
        print(f"{name}: {dtype}")

    # Sample data
    print("\nSample Data:")
    sample = df.limit(5).toPandas()
    print(sample)

    return n_rows

# 2. Missing Values Analysis
def missing_values_analysis(df, row_count):
    """Return per-column NULL counts and percentages for ``df``.

    All columns are counted in ONE aggregate query. The previous version ran a
    separate ``filter(col.isNull()).count()`` query (a full scan) per column.

    Args:
        df: Snowpark DataFrame to inspect.
        row_count: total number of rows in ``df``; the percentage denominator.

    Returns:
        pandas.DataFrame with columns ``Column``, ``Missing Count`` and
        ``Missing Percentage``, sorted by percentage (descending). When
        ``row_count`` is 0 every percentage is 0.0 instead of NaN.
    """
    from snowflake.snowpark import functions as F

    print("Missing Values Analysis:")

    columns = list(df.columns)
    if columns:
        # SUM(IFF(c IS NULL, 1, 0)) for each column. Positional aliases avoid
        # any quoting issues with the original column names.
        null_exprs = [
            F.sum(F.iff(col(name).is_null(), F.lit(1), F.lit(0))).alias(f"NULLS_{i}")
            for i, name in enumerate(columns)
        ]
        totals = df.select(null_exprs).collect()[0]
        # SUM over an empty table is NULL -> treat as zero missing values.
        missing_counts = [int(totals[i] or 0) for i in range(len(columns))]
    else:
        missing_counts = []

    missing_df = pd.DataFrame({
        'Column': columns,
        'Missing Count': missing_counts,
    })
    if row_count:
        missing_df['Missing Percentage'] = (missing_df['Missing Count'] / row_count) * 100
    else:
        missing_df['Missing Percentage'] = 0.0

    return missing_df.sort_values('Missing Percentage', ascending=False)

# 3. Categorical Features Analysis
def categorical_features_analysis(df, row_count):
    """Summarise the categorical columns of the fraud dataset.

    Args:
        df: Snowpark DataFrame with MERCHANT_CATEGORY, DEVICE_TYPE, LOCATION
            and IS_FRAUD columns.
        row_count: total row count, used as the percentage denominator.

    Returns:
        dict mapping each categorical column name to
        ``{"distribution": ..., "fraud_rate": ...}``. "distribution" is a pandas
        DataFrame of per-value counts plus a "percentage" column, sorted by
        count descending. "fraud_rate" holds raw row counts per
        (value, IS_FRAUD) pair, not a rate, despite its key name.
    """
    print("Categorical Features Analysis:")

    def _summarise(column_name):
        # Value distribution for one column, most frequent first.
        distribution = (
            df.groupBy(column_name)
            .count()
            .withColumn("percentage", col("count") / row_count * 100)
            .orderBy("count", ascending=False)
            .toPandas()
        )
        # Row counts split by the fraud label.
        counts_by_label = df.groupBy(column_name, "IS_FRAUD").count().toPandas()
        return {"distribution": distribution, "fraud_rate": counts_by_label}

    # Upper-case names match the unquoted identifiers of the dataset.
    return {
        name: _summarise(name)
        for name in ("MERCHANT_CATEGORY", "DEVICE_TYPE", "LOCATION")
    }

# 4. Correlation Analysis
def correlation_analysis(df):
    """Return the pandas correlation matrix of TRANSACTION_AMOUNT and IS_FRAUD.

    The two columns are pulled into pandas, so this is only suitable for
    modestly sized data. IS_FRAUD is cast to int before correlating.

    Args:
        df: Snowpark DataFrame containing TRANSACTION_AMOUNT and IS_FRAUD.

    Returns:
        pandas.DataFrame correlation matrix, or None if any step raised (the
        error is printed).
    """
    print("Correlation Analysis:")

    try:
        numeric_pdf = df.select("TRANSACTION_AMOUNT", "IS_FRAUD").toPandas()
        numeric_pdf["IS_FRAUD"] = numeric_pdf["IS_FRAUD"].astype(int)
        # Further numeric columns can be added to the select above.
        return numeric_pdf.corr()
    except Exception as exc:
        print(f"Error in correlation analysis: {exc}")
        print("For large datasets, compute correlations directly in Snowflake")
        return None

# Main function to run the EDA
def run_simplified_fraud_eda(df):
    """Run the simplified fraud EDA on ``df`` and return its key results.

    Prints a data overview, the missing-value table, the correlation matrix
    (when it could be computed) and the overall fraud rate.
    ``categorical_features_analysis`` is not invoked here.

    Args:
        df: Snowpark DataFrame of the fraud dataset.

    Returns:
        dict with "row_count", "missing_values" (pandas DataFrame),
        "correlation" (pandas DataFrame or None) and "fraud_rate" (percentage
        of rows with IS_FRAUD = TRUE; 0 when ``df`` is empty).
    """
    print("=== Simplified Fraud Detection EDA ===")

    total_rows = basic_data_overview(df)

    missing = missing_values_analysis(df, total_rows)
    print("\nMissing Values:")
    print(missing)

    correlations = correlation_analysis(df)
    if correlations is not None:
        print("\nCorrelation Matrix:")
        print(correlations)

    print("\nEDA Summary Report:")
    print(f"- Dataset has {total_rows} transactions")

    # `== True` builds a Snowpark SQL comparison, not a Python bool test.
    fraud_rows = df.filter(col("IS_FRAUD") == True).count()  # noqa: E712
    fraud_pct = 0 if total_rows <= 0 else (fraud_rows / total_rows) * 100
    print(f"- Overall fraud rate: {fraud_pct:.2f}%")

    return {
        "row_count": total_rows,
        "missing_values": missing,
        "correlation": correlations,
        "fraud_rate": fraud_pct,
    }

# Save key insights to a table
def save_eda_summary(session, df, eda_results):
    """Write a one-row EDA summary to table fraud_eda_summary (replacing it).

    Args:
        session: Snowpark session that runs the CREATE OR REPLACE statement.
        df: Snowpark DataFrame whose IS_FRAUD = TRUE rows are counted.
        eda_results: dict returned by run_simplified_fraud_eda; its
            "row_count" and "fraud_rate" entries are written to the table.
    """
    fraud_rows = df.filter(col("IS_FRAUD") == True).count()  # noqa: E712

    ddl_template = """
    CREATE OR REPLACE TABLE fraud_eda_summary AS
    SELECT
        CURRENT_TIMESTAMP() AS analysis_time,
        {} AS total_transactions,
        {} AS fraud_transactions,
        {:.2f} AS fraud_rate_percent
    """
    ddl = ddl_template.format(eda_results["row_count"], fraud_rows, eda_results["fraud_rate"])
    session.sql(ddl).collect()

    print("Basic EDA summary saved to table: fraud_eda_summary")

# Example usage:
# df = session.table("fraud_analysis")
# eda_results = run_simplified_fraud_eda(df)
# save_eda_summary(session, df, eda_results)

### Fraud Detection EDA Results Summary

#### Dataset Overview
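- 13,620 rows with 11 columns. Transactions are LEFT JOINed to complaints on `CUSTOMER_ID` only, so a transaction appears once per complaint filed by its customer, and the row count exceeds the 10,000 generated transactions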
- Key columns include transaction details, customer information, and complaint data

#### Missing Data Patterns
- Complaint-related fields (COMPLAINT_TEXT, COMPLAINT_TIME, KEYWORDS) missing for 26.37% of transactions
- All transaction and customer core data fields are complete (0% missing)

#### Fraud Statistics
- Overall fraud rate: 10.16% of rows (a row-level rate over the joined table, not a per-transaction rate)

#### Correlation Analysis
- Very weak negative correlation (-0.011) between transaction amount and fraud
- This suggests fraud occurs across various transaction sizes rather than being concentrated in larger or smaller transactions

#### Data Completeness
- Transaction core data is complete
- Missing complaint data likely represents transactions without customer complaints

In [None]:
# Run the EDA helpers defined above on FRAUD_ANALYSIS; the returned dict can be
# passed to save_eda_summary.
eda_results = run_simplified_fraud_eda(df)

### Feature Engineering Summary

#### New Features Created

* **TRANSACTION_TIME**
  * Transforms the transaction timestamp string into a proper timestamp data type
  * Enables date/time operations and analysis
  * Essential for time-based fraud pattern detection

* **SENTIMENT_SCORE**
  * Leverages Snowflake's built-in Cortex NLP capabilities
  * Analyzes the sentiment of customer complaint text
  * Negative sentiment may correlate with fraudulent transactions
  * Provides a numeric score that can be used in fraud detection models

* **TRANSACTION_DAY**
  * Extracts the day name (Monday, Tuesday, etc.) from transaction timestamp
  * Allows for analysis of fraud patterns by day of week
  * Helps identify if certain days have higher fraud rates

### Implementation Method
* Builds a dictionary that maps feature names to Snowpark column expressions and applies it with `DataFrame.with_columns`
* Applies all transformations in a single dataframe operation
* More efficient than sequential column additions

In [None]:
# Build the engineered features as {output column name: Snowpark expression}
# and apply them all in a single with_columns call.
from snowflake.snowpark.functions import call_udf
feature_eng_dict = dict()
# Parse the raw timestamp once and reuse it for TRANSACTION_DAY. with_columns
# evaluates every expression against the *input* DataFrame, so
# col("transaction_time") would refer to the unparsed column and depend on an
# implicit cast inside DAYNAME.
parsed_transaction_time = to_timestamp(col("TRANSACTION_TIME"))
feature_eng_dict["TRANSACTION_TIME"] = parsed_transaction_time
# Cortex sentiment of the complaint text. Presumably NULL for rows without a
# complaint (COMPLAINT_TEXT is NULL there) — verify.
feature_eng_dict["SENTIMENT_SCORE"] = call_udf("SNOWFLAKE.CORTEX.SENTIMENT", col("complaint_text"))
feature_eng_dict["TRANSACTION_DAY"] = dayname(parsed_transaction_time)
# with_columns is documented to take lists; pass real lists, not dict views.
df = df.with_columns(list(feature_eng_dict.keys()), list(feature_eng_dict.values()))

In [None]:
# Preview two rows including the engineered feature columns.
df.show(2)

### Feature Engineering & Store Initialization Summary

#### Feature Store Setup
* **Initialization Mode**  
  - Use `CREATE_IF_NOT_EXIST` for first-time setup:  
    ```
    from snowflake.ml.feature_store import FeatureStore, CreationMode
    fs = FeatureStore(session=session, database=..., name=..., default_warehouse=..., creation_mode=CreationMode.CREATE_IF_NOT_EXIST)
    ```
  - Subsequent connections use `FAIL_IF_NOT_EXIST` mode
  - Requires pre-existing database (Feature Store won't create it)

* **Core Components**  
  - **Entities:** Define core business objects (e.g., `Customer`, `Transaction`)  
  - **Feature Definitions:** Create reusable feature transformations  
  - **Metadata:** Track feature lineage and versioning

#### Behavioral Metrics Generation
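* **Data Sources**  
  - Primary table: `FRAUD_ANALYSIS` (transactions LEFT JOINed with customer complaints on `CUSTOMER_ID`)
  - Source tables: `TRANSACTIONS` and `CUSTOMER_COMPLAINTS`

* **Key Transformations**  
  - `TRANSACTION_TIME` parsed to a timestamp, `SENTIMENT_SCORE` from Cortex sentiment on the complaint text, and `TRANSACTION_DAY` from the transaction timestamp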

In [None]:
# Connect to the Feature Store that lives in the current database and is named
# after the current schema, creating it if it does not exist yet. The session's
# current warehouse is used as the feature store's default warehouse.
fs = FeatureStore(
    session=session, 
    database=session.get_current_database(), 
    name=session.get_current_schema(), 
    default_warehouse=session.get_current_warehouse(),
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

In [None]:
# Earliest and latest TRANSACTION_TIME. `min`/`max` are the Snowpark column
# functions imported from snowflake.snowpark.functions in the imports cell, and
# they shadow Python's builtins for the rest of the notebook.
df.select(min('TRANSACTION_TIME'), max('TRANSACTION_TIME'))

In [None]:
# Print the query plan Snowpark built for df (including the engineered features).
df.explain()

### Feature Store Entity Registration (Concise)

*   **Entities Defined:** `CUSTOMER_ID_ENTITY`
*   **Purpose:** Represent key data objects for feature lookup.
*   **`CUSTOMER_ID_ENTITY` Entity:**
    *   `join_keys`: `CUSTOMER_ID`
    *   Links customer data across tables.
*   **Implementation:**
    *   Register entities with `fs.register_entity()`.
*   **Impact:** Enables consistent feature access & lineage tracking.


In [None]:
# First try to retrieve an existing entity definition, if not define a new one and register
try:
    # Retrieve existing entity
    customer_entity = fs.get_entity('CUSTOMER_ID_ENTITY') 
    print('Retrieved existing entity')
except Exception:
    # Lookup failed (entity not registered yet). `except Exception` rather than a
    # bare `except:` so KeyboardInterrupt/SystemExit are not swallowed.
    # Define new entity keyed on CUSTOMER_ID.
    customer_entity = Entity(
        name="CUSTOMER_ID_ENTITY",
        join_keys=["CUSTOMER_ID"],
        desc="Features defined on a per customer level")

    # Register
    fs.register_entity(customer_entity)
    print("Registered new entity")


In [None]:
# List the entities registered in this feature store.
fs.list_entities()

In [None]:
#Create a dataframe with just the ID, timestamp, and engineered features. We will use this to define our feature view
# (the engineered feature names are the keys of feature_eng_dict, which include TRANSACTION_TIME).
feature_df = df.select(["CUSTOMER_ID"]+list(feature_eng_dict.keys()))
feature_df.show(5)

### Feature View Summary: FRAUD_FEATURES

*   **Purpose**: Group logically-related customer features for streamlined access.

*   **Key Components**:

    *   `name`: "FRAUD\_FEATURES"
    *   `entities`: \[`customer_entity`] (links to customer data)
    *   `feature_df`:  Snowpark DataFrame containing feature generation logic.
    *   `timestamp_col`: "TRANSACTION\_TIME" (for time-series data)

*   **Workflow**:

    1.  Define a Snowpark DataFrame (`feature_df`) with the desired feature transformations.
    2.  Create a `FeatureView` instance.
    3.  Register the `FeatureView` using `fs.register_feature_view()`.

*   **Configuration**:

    *   `version`: `"1"` (initial version)
    *   `overwrite`: `True` (allows overwriting existing versions)

*   **Impact**:

    *   Provides a consistent and versioned interface for retrieving customer fraud features.
    *   Simplifies feature retrieval for machine learning models.

* **Requirement**

    * DataFrame should contain join keys column for entity and timestamp column for time-based data.




In [None]:
#define and register feature view
# FRAUD_FEATURES is keyed on customer_entity (CUSTOMER_ID) and time-stamped by
# TRANSACTION_TIME; its feature columns come from feature_df.
fraud_fv = FeatureView(
    name="FRAUD_FEATURES",
    entities=[customer_entity],
    feature_df=feature_df,
    timestamp_col="TRANSACTION_TIME")

# Register as version "1"; overwrite=True replaces an existing version "1".
fraud_fv = fs.register_feature_view(fraud_fv, version="1", overwrite=True)

In [None]:
# Display the registered feature view.
fraud_fv

In [None]:
# List all feature views in this feature store.
fs.list_feature_views()

### Training Dataset Generation Summary

*   **Workflow Completion:** Marks the completion of database object setup and Feature Store Producer workflow.

*   **Data Availability:** Indicates that generated data and features are ready for consumption (with appropriate privileges).

*   **Dataset Generation:** Highlights the transition to generating a training dataset.

*   **Spine DataFrame:**
    *   Defines a "spine" DataFrame.
    *   Serves as a request template for the dataset.
    *   Specifies entities, labels, and timestamps.

*   **FeatureStore.generate\_dataset():**
    *   Employs `FeatureStore.generate_dataset()` to create the training set.
    *   Utilizes Feature Views.

*   **AS-OF Join:**
    *   Feature Store attaches feature values along the spine using an AS-OF join.
    *   Efficiently combines and serves relevant, point-in-time correct feature data.


In [None]:
# Build the training dataset. The spine is df with the engineered columns
# (SENTIMENT_SCORE, TRANSACTION_DAY) and complaint fields removed, so the engineered
# features are attached from fraud_fv by the feature store's point-in-time join on
# TRANSACTION_TIME. IS_FRAUD is carried through as the label column.
ds = fs.generate_dataset(
    name="FRAUD_DETECTION_DATASET_V1",
    spine_df=df.drop("SENTIMENT_SCORE", "complaint_text",
 "TRANSACTION_DAY","KEYWORDS","COMPLAINT_TIME"),
    features=[fraud_fv],
    spine_timestamp_col="TRANSACTION_TIME",
    spine_label_cols=["IS_FRAUD"]
)

In [None]:
# Read the generated dataset back as a Snowpark DataFrame and preview it.
ds_sp = ds.read.to_snowpark_dataframe()
ds_sp.show(5)

#### Feature Engineering: One-Hot Encoding Summary

*   **Objective**: Convert categorical string columns into numerical representations suitable for machine learning.

*   **Implementation**: Utilizes Snowflake ML's `OneHotEncoder`.

*   **Steps:**

    1.  **Identify Categorical Columns:**
        *   Select columns with `StringType` (excluding `CUSTOMER_ID` and `TRANSACTION_ID`).
        *   `OHE_COLS` list stores names of categorical columns.

    2.  **Create Output Column Names:**
        *   `OHE_POST_COLS` list generates new column names by appending "\_OHE" to original names.

    3.  **Instantiate OneHotEncoder:**
        *   `snowml_ohe = snowml.OneHotEncoder(input_cols=OHE_COLS, output_cols=OHE_POST_COLS, drop_input_cols=True)`
        *   `input_cols`: Specifies columns to encode.
        *   `output_cols`: Specifies names for encoded columns.
        *   `drop_input_cols=True`: Removes original categorical columns.

    4.  **Fit and Transform:**
        *   `ds_sp_ohe = snowml_ohe.fit(ds_sp).transform(ds_sp)`
        *   `fit()`: Learns the unique categories from the input data.
        *   `transform()`: Applies the encoding to create new numerical columns.

    5.  **Result:**
        *   `ds_sp_ohe`: New DataFrame with one-hot encoded columns.
        *   Original string columns are replaced by numerical representations.

*   **Benefits:**

    *   Enables machine learning algorithms to process categorical data effectively.
    *   Avoids imposing ordinal relationships on categorical features.


In [None]:
from snowflake.snowpark.types import StringType
import snowflake.ml.modeling.preprocessing as snowml

# Every string-typed column except the identifiers is treated as categorical.
OHE_COLS = []
for field in ds_sp.schema.fields:
    is_string = isinstance(field.datatype, StringType)
    is_identifier = field.name in ('CUSTOMER_ID', 'TRANSACTION_ID')
    if is_string and not is_identifier:
        OHE_COLS.append(field.name)

# Encoded output columns are named "<input>_OHE".
OHE_POST_COLS = [f"{name}_OHE" for name in OHE_COLS]

# One-hot encode the categorical columns and drop the original string columns.
snowml_ohe = snowml.OneHotEncoder(input_cols=OHE_COLS, output_cols=OHE_POST_COLS, drop_input_cols=True)

# Learn the categories on ds_sp, then encode it.
snowml_ohe.fit(ds_sp)
ds_sp_ohe = snowml_ohe.transform(ds_sp)

# Resulting column names
ds_sp_ohe.columns


### Training models with Snowflake ML


In [None]:
# Preview the one-hot encoded dataset.
ds_sp_ohe.show(2)

In [None]:
# 70/30 train/test split; the fixed seed makes the split repeatable.
train, test = ds_sp_ohe.random_split(weights=[0.70, 0.30], seed=216)

In [None]:
# Replace NULLs with 0 in both splits before training/evaluation.
train = train.fillna(0)
test = test.fillna(0)

In [None]:
# Preview the training split.
train.show(2)

In [None]:
# Preview the test split.
test.show(2)

In [None]:
from snowflake.ml.modeling.xgboost import XGBRegressor, XGBClassifier

# Model inputs: every column except the label, the timestamp and the identifiers.
SNOWML_FEATURE_COLS = train.drop(["IS_FRAUD", "TRANSACTION_TIME", "CUSTOMER_ID", "TRANSACTION_ID"]).columns
SNOWML_LABEL_COLS = train.select("IS_FRAUD").columns

# Tree booster with a high learning rate; tree_method / n_estimators are left at
# their library defaults.
snow_xgb_tree = XGBClassifier(
    input_cols=list(SNOWML_FEATURE_COLS),
    label_cols=list(SNOWML_LABEL_COLS),
    output_cols="FRAUD_PREDICTION",
    learning_rate=0.75,
    booster='gbtree',
)

# Linear booster with default hyper-parameters.
snow_xgb_linear = XGBClassifier(
    input_cols=list(SNOWML_FEATURE_COLS),
    label_cols=list(SNOWML_LABEL_COLS),
    output_cols="FRAUD_PREDICTION",
    booster='gblinear',
)

In [None]:

# Fit the tree-booster (gbtree) classifier on the training split.
snow_xgb_tree.fit(train)

In [None]:
# Fit the linear-booster (gblinear) classifier on the training split.
snow_xgb_linear.fit(train)

In [None]:
from sklearn.metrics import f1_score, precision_score, recall_score

# Score both Snow ML models on the held-out split and pull label + prediction locally.
test_preds_tree = snow_xgb_tree.predict(test).select(["IS_FRAUD", "FRAUD_PREDICTION"]).to_pandas()
test_preds_linear = snow_xgb_linear.predict(test).select(["IS_FRAUD", "FRAUD_PREDICTION"]).to_pandas()


def _binary_scores(scored):
    """Return (f1, precision, recall) of FRAUD_PREDICTION against IS_FRAUD."""
    truth, predicted = scored.IS_FRAUD, scored.FRAUD_PREDICTION
    return (
        f1_score(truth, predicted),
        precision_score(truth, predicted),
        recall_score(truth, predicted),
    )


f1_tree, precision_tree, recall_tree = _binary_scores(test_preds_tree)
f1_linear, precision_linear, recall_linear = _binary_scores(test_preds_linear)

print(f'GB Tree: \n f1: {f1_tree} \n precision {precision_tree} \n recall: {recall_tree}')
print(f'GB Linear: \n f1: {f1_linear} \n precision {precision_linear} \n recall: {recall_linear}')


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
# Create the Model Registry and register your initial model
from snowflake.ml.registry import Registry


In [None]:
from sklearn.model_selection import train_test_split

# ds_sp is a Snowpark DataFrame; pull it into pandas for the scikit-learn models.
ds_sp_pandas = ds_sp.to_pandas()

X = ds_sp_pandas[['DEVICE_TYPE', 'MERCHANT_CATEGORY', 'TRANSACTION_DAY', 
                  'TRANSACTION_AMOUNT', 'SENTIMENT_SCORE']]
y = ds_sp_pandas['IS_FRAUD']

# Split into training and testing sets. Stratify on the label so the minority
# fraud class (about 10% of rows per the EDA) keeps the same share in both splits.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y)

# Optional: Display shapes to verify the split
print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)


In [None]:
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import accuracy_score, f1_score
# Imported explicitly so this cell does not depend on which XGBClassifier
# (xgboost vs. snowflake.ml) an earlier cell last bound to the name.
from xgboost import XGBClassifier

# Define categorical and numerical columns
categorical_features = ['DEVICE_TYPE', 'MERCHANT_CATEGORY', 'TRANSACTION_DAY']
numerical_features = ['TRANSACTION_AMOUNT', 'SENTIMENT_SCORE']

# Preprocessing pipeline: numeric columns are median-imputed and standardised;
# categorical columns are most-frequent-imputed and one-hot encoded, ignoring
# categories that were not seen during fit.
preprocessor = ColumnTransformer(
    transformers=[
        ('num', Pipeline([
            ('imputer', SimpleImputer(strategy='median')),
            ('scaler', StandardScaler())
        ]), numerical_features),
        ('cat', Pipeline([
            ('imputer', SimpleImputer(strategy='most_frequent')),
            ('encoder', OneHotEncoder(handle_unknown='ignore'))
        ]), categorical_features)
    ]
)

# Define models.
# NOTE: all three pipelines share the same `preprocessor` instance; each fit
# refits it on the same X_train, so its fitted state ends up identical.
models = {
    "XGBoost": Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', XGBClassifier(use_label_encoder=False, eval_metric='logloss'))
    ]),
    "RandomForest": Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', RandomForestClassifier())
    ]),
    "LogisticRegression": Pipeline([
        ('preprocessor', preprocessor),
        ('classifier', LogisticRegression())
    ])
}

# Select by F1 on the fraud class, matching how the Snow ML models are evaluated.
# With roughly 10% fraud, accuracy rewards a model that seldom predicts fraud.
# best_score starts below any possible F1, so a model is always selected even
# when every F1 is 0.
best_model = None
best_score = -1.0

for name, model in models.items():
    print(f"Training {name} model...")
    model.fit(X_train, y_train)
    preds = model.predict(X_test)
    accuracy = accuracy_score(y_test, preds)
    score = f1_score(y_test, preds, zero_division=0)
    print(f"{name} Accuracy: {accuracy:.4f}, F1: {score:.4f}")

    if score > best_score:
        best_model = model
        best_score = score

print(f"Best model: {type(best_model.named_steps['classifier']).__name__} with F1 {best_score:.4f}")

#### Model Registry, Deployment, and Serving Summary (with Snowpark Container Services)

*   **Model Registry**:
    *   **Purpose**: Stores and manages ML models within Snowflake.
    *   **Functionality**:
        *   Versioning
        *   Metadata Tracking
        *   Model Performance Metrics
    *   **Workflow**:
        1.  Create a registry connected to Snowflake database and schema.
        2.  Log the model to the registry, including model files and any relevant metadata, along with model versions.
        3.  Set performance metrics to track model performance.

*   **Model Deployment (Multiple Options):**
    *   **Purpose**: Make the model available for inference or prediction tasks within Snowflake.
    *   **Deployment Strategies**:
        *   **User-Defined Function (UDF) Deployment**: Package the model as a UDF in Snowflake, allowing direct SQL calls for inference.
        *   **Snowpark Integration**: Use Snowpark to load the model and perform predictions on data within a Snowpark session.
        *   **Snowpark Container Services (SPCS)**: Package the model and inference code into a container, then deploy it as a service in SPCS. This allows for greater flexibility in the inference environment and can support complex models.

*   **Model Serving/Inference**:
    *   **Purpose**: Use the deployed model to generate predictions on new data.
    *   **Methods**:
        *   **SQL Inference**: Call the deployed model using SQL queries (if deployed as UDF).
        *   **Snowpark Inference**: Load and use the model within a Snowpark session to make predictions.
        *   **SPCS Inference**: Send requests to the deployed SPCS service to get predictions. This can be done from external applications or from within Snowflake.
    *   **Framework**: Cortex, Snowpark, SPCS


In [None]:
from snowflake.ml._internal.utils import identifier
#Create a snowflake model registry object 
# db / schema hold the unescaped current database and schema names.
# NOTE(review): they are not used below — Registry receives the raw
# get_current_database()/get_current_schema() values. `identifier` is also a
# private (_internal) snowflake.ml module and may change between releases.
db = identifier._get_unescaped_name(session.get_current_database())
schema = identifier._get_unescaped_name(session.get_current_schema())

# Registry in the current database/schema, with model monitoring enabled.
model_registry = Registry(session=session, 
                    database_name=session.get_current_database(), 
                    schema_name=session.get_current_schema(),
                    options={"enable_monitoring": True})

In [None]:
#Deploy the tree booster model to the model registry
# Reuse the registered version if it exists; otherwise log it and attach the
# hold-out metrics computed earlier.
model_name = "FRAUD_ANALYSIS_XGB"
tree_version_name = 'V2'

try:
    mv_tree = model_registry.get_model(model_name).version(tree_version_name)
    print("Found existing model version!")
except Exception:
    # Model/version lookup failed. `except Exception` (not a bare except) keeps
    # KeyboardInterrupt/SystemExit working.
    print("Logging new model version...")
    mv_tree = model_registry.log_model(
        model_name=model_name,
        model=snow_xgb_tree, 
        version_name=tree_version_name,
        comment = "snow ml model built off feature store using tree booster",
    )
    mv_tree.set_metric(metric_name="F1_score", value=f1_tree)
    mv_tree.set_metric(metric_name="Precision_score", value=precision_tree)
    mv_tree.set_metric(metric_name="Recall_score", value=recall_tree)




In [None]:
#Deploy the linear booster model to the model registry
# Reuse the registered version if it exists; otherwise log it and attach the
# hold-out metrics computed earlier.
model_name = "FRAUD_ANALYSIS_LINEAR"
linear_version_name = 'V1'

try:
    # Bind to mv_linear: assigning mv_tree here would overwrite the tree-model
    # handle and leave mv_linear undefined whenever the version already exists.
    mv_linear = model_registry.get_model(model_name).version(linear_version_name)
    print("Found existing model version!")
except Exception:
    print("Logging new model version...")
    mv_linear = model_registry.log_model(
        model_name=model_name,
        model=snow_xgb_linear, 
        version_name=linear_version_name,
        comment = "snow ml model built off feature store using linear booster",
    )
    mv_linear.set_metric(metric_name="F1_score", value=f1_linear)
    mv_linear.set_metric(metric_name="Precision_score", value=precision_linear)
    mv_linear.set_metric(metric_name="Recall_score", value=recall_linear)

In [None]:
#Deploy the sklearn model to the model registry
# Reuse the registered version if it exists; otherwise log best_model.
# X_train is passed as sample input for the logged model.
model_name = "FRAUD_ANALYSIS"
version_name = 'V1'

try:
    mv = model_registry.get_model(model_name).version(version_name)
    print("Found existing model version!")
except Exception:
    # Model/version lookup failed. Not a bare except, so interrupts still propagate.
    print("Logging new model version...")
    mv = model_registry.log_model(
        model_name=model_name,
        model=best_model, 
        version_name=version_name,
        sample_input_data=X_train,
        comment = "sklearn model build",
    )

In [None]:
# List the models in the registry.
model_registry.show_models()

In [None]:
# Uncomment to delete the FRAUD_ANALYSIS model from the registry.
## model_registry.delete_model("FRAUD_ANALYSIS")

In [None]:
# List the registered versions of the scikit-learn model.
model_registry.get_model("FRAUD_ANALYSIS").show_versions()

In [None]:
# Retrieve the registered versions explicitly from the registry, so the handles
# below do not depend on how (or whether) the registration cells ran.
# scikit-learn fraud model, version V1
reg_model = model_registry.get_model("FRAUD_ANALYSIS").version("v1")
# Snow ML XGBoost (tree booster), version V2
reg_model_tree = model_registry.get_model("FRAUD_ANALYSIS_XGB").version("v2")
# Snow ML XGBoost (linear booster), version V1
reg_model_linear = model_registry.get_model("FRAUD_ANALYSIS_LINEAR").version("v1")

# Later cells call mv / mv_tree; point them at the versions retrieved above.
mv, mv_tree, mv_linear = reg_model, reg_model_tree, reg_model_linear

for registered_version in (reg_model, reg_model_tree, reg_model_linear):
    print(registered_version)
    print(registered_version.show_metrics())

# Batch Inference on a Warehouse

#### We trained two kinds of models (scikit-learn and Snowflake ML); let's predict with both

In [None]:
# Run the registered tree model on the test split: class predictions and class probabilities.
reg_preds_tree = mv_tree.run(test, function_name = "predict")
reg_probs_tree_prob = mv_tree.run(test, function_name="predict_proba") 
reg_preds_tree.show(2)
reg_probs_tree_prob.show(2)

In [None]:
### Extracting customer id, transaction Id, and prediction probabilities
# Scores the full encoded dataset (training and test rows).
# NOTE(review): unlike train/test, ds_sp_ohe was not fillna(0)-ed — confirm the
# model handles NULL inputs as intended.

reg_probs_tree_prob_complete = mv_tree.run(ds_sp_ohe, 
                            function_name="predict_proba").select("CUSTOMER_ID", 
                        "TRANSACTION_ID", "PREDICT_PROBA_0", "PREDICT_PROBA_1" ) 

reg_probs_tree_prob_complete.show(3)

In [None]:
### Earlier data set before split (not one-hot encoded)
ds_sp.show(5)

In [None]:
# Attach the tree model's fraud probabilities to every row of the dataset.
# FRAUD_ANALYSIS LEFT JOINs complaints on CUSTOMER_ID only, so a transaction can
# occur in several rows, and so can its predictions. Joining duplicates to
# duplicates fans each such transaction out k x k. Keep one prediction row per
# (CUSTOMER_ID, TRANSACTION_ID) first; the duplicates are presumably scored
# identically (same spine values), and one is kept arbitrarily.
unique_tree_probs = reg_probs_tree_prob_complete.drop_duplicates("CUSTOMER_ID", "TRANSACTION_ID")

joined_df = ds_sp.join(
    unique_tree_probs,
    on=["CUSTOMER_ID", "TRANSACTION_ID"],
    join_type="left"
)

joined_df.show(2)
# # Save the dataframe (the next SQL cell reads FT_PREDICTION_FINAL)
#joined_df.write.mode("overwrite").save_as_table("FT_PREDICTION_FINAL")

In [None]:
-- Preview the saved predictions. NOTE: FT_PREDICTION_FINAL is only created by the
-- save_as_table call that is commented out in the join cell.
select * from FT_PREDICTION_FINAL limit 2; 

#### Additional code: predict with the scikit-learn model

In [None]:
# scikit-learn model version (mv): class probabilities, then hard predictions, on X_test.
reg_probs_sk, reg_preds_sk = (
    mv.run(X_test, function_name=_fn) for _fn in ("predict_proba", "predict")
)

# Print the first two rows of each result.
for _preview in (reg_probs_sk, reg_preds_sk):
    print(_preview.head(2))


### Feature importance and score 

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Feature names and importance scores are both taken from the fitted RandomForest
# pipeline so that they stay aligned (and match the "RandomForestClassifier" chart
# title). best_model may be a different estimator, whose scores would not correspond
# to these names.
_rf_pipeline = models["RandomForest"]
_rf_preprocessor = _rf_pipeline.named_steps["preprocessor"]

# Feature names from the fitted preprocessor:
#   transformer 0 -> numerical columns, passed by name
#   transformer 1 -> pipeline whose "encoder" step expands categorical_features
feature_names = (
    list(_rf_preprocessor.transformers_[0][2])  # numerical features
    + list(
        _rf_preprocessor.transformers_[1][1]
        .named_steps["encoder"]
        .get_feature_names_out(categorical_features)
    )  # encoded categorical features
)

# Impurity-based importances from the same pipeline's RandomForestClassifier step.
importances = _rf_pipeline.named_steps["classifier"].feature_importances_

# Tabulate, most important first.
feature_importance_df = pd.DataFrame(
    {"FEATURE": feature_names, "IMPORTANCE": importances}
).sort_values(by="IMPORTANCE", ascending=False)

snowpark_df = session.create_dataframe(feature_importance_df)

# Persisting is disabled; the FEATURE_IMPORTANCE_DF table queried below only exists if this has run.
#snowpark_df.write.mode("overwrite").save_as_table("feature_importance_df")

In [None]:
-- Preview the persisted feature importances (the unquoted name resolves to FEATURE_IMPORTANCE_DF).
-- NOTE: this table only exists if the commented-out save_as_table call has been run.
select * from feature_importance_df;

In [None]:
# Horizontal bar chart of the importances. The frame is sorted descending, so the
# y-axis is inverted to put the most important feature at the top.
_fig, _ax = plt.subplots(figsize=(10, 5))
_ax.barh(feature_importance_df["FEATURE"], feature_importance_df["IMPORTANCE"])
_ax.set_xlabel("Importance Score")
_ax.set_ylabel("FEATURE")
_ax.set_title("Feature Importance for RandomForestClassifier Model")
_ax.invert_yaxis()
plt.show()

#### Real-time inference deployment on Snowpark Container Services (SPCS)
#### Batch prediction using SPCS

In [None]:
# SPCS deployment settings.
# NOTE(review): model_name is reassigned here but is not passed to create_service below
# (the service deploys mv_tree); it also overwrites the "FRAUD_ANALYSIS" value set earlier.
model_name = "FRAUD_ANALYSIS_SPCS"
image_repo_name = "AI_ML_REPO"  # image repository that receives the inference image
cp_name = "AI_ML_CP"  # compute pool name
num_spcs_nodes = '3'  # kept as a string: interpolated into SQL; int() is applied for max_instances
spcs_instance_family = 'CPU_X64_L'
service_name = 'FRAUD_DETECTION_SERVICE'

In [None]:
# Current database/schema with any identifier quotes removed, used to build
# fully-qualified DB.SCHEMA.OBJECT names.
current_database = session.get_current_database().replace('"', '')
current_schema = session.get_current_schema().replace('"', '')
extended_image_repo_name = ".".join((current_database, current_schema, image_repo_name))
extended_service_name = ".".join((current_database, current_schema, service_name))

In [None]:
# Recreate the compute pool from scratch: stop whatever runs on it, drop it, then create
# it with a fixed size (min_nodes == max_nodes) that auto-suspends after 5 idle minutes.
for _pool_stmt in (
    f"alter compute pool if exists {cp_name} stop all",
    f"drop compute pool if exists {cp_name}",
    f"create compute pool {cp_name} min_nodes={num_spcs_nodes} max_nodes={num_spcs_nodes} instance_family={spcs_instance_family} auto_resume=True auto_suspend_secs=300",
):
    session.sql(_pool_stmt).collect()

session.sql(f"describe compute pool {cp_name}").show()

In [None]:
# Create the (fully-qualified) image repository if it does not exist yet.
session.sql(f"create image repository if not exists {extended_image_repo_name}").collect()

In [None]:
# Deploy the XGB (tree) model version as an SPCS inference service.
mv_tree.create_service(
    service_name=extended_service_name,  # fully-qualified DB.SCHEMA.SERVICE
    image_repo=extended_image_repo_name,  # repository that receives the built image
    service_compute_pool=cp_name,
    max_instances=int(num_spcs_nodes),  # sized to match the pool's node count
    ingress_enabled=True,  # exposes the endpoint queried via SHOW ENDPOINTS below
    build_external_access_integration="ALLOW_ALL_INTEGRATION",  # egress used during the image build
)

In [None]:
# List the services deployed for this model version.
mv_tree.list_services()

In [None]:
# Flatten the service status JSON into one row per container: status + message.
session.sql(
    "SELECT VALUE:status::VARCHAR as SERVICESTATUS, VALUE:message::VARCHAR as SERVICEMESSAGE "
    f"FROM TABLE(FLATTEN(input => parse_json(system$get_service_status('{service_name}')), outer => true)) f"
).show(100)

In [None]:
# Ingress URL of the first endpoint row returned by SHOW ENDPOINTS (IndexError if none is listed yet).
session.sql(f"show endpoints in service {service_name}").collect()[0]["ingress_url"]

In [None]:
# Preview one test row (the input shape sent to the service below).
test.limit(1).show()

In [None]:
# Hard class predictions for 4 test rows, served by the SPCS service rather than the warehouse.
mv_tree.run(test.limit(4) , service_name=service_name, function_name="predict")


In [None]:
# Class probabilities for 4 test rows via the SPCS service (limit() without ORDER BY need not return the same rows as above).
mv_tree.run(test.limit(4) , service_name=service_name, function_name="predict_proba")

# Model explainability

In [None]:
# Per-feature explanations from the model's `explain` function for a random sample of
# 1000 test rows (no service_name, so this runs on the warehouse; the sample differs per run).
shap_vals = mv_tree.run(test.sample(n=1000), function_name="explain")
shap_vals.show(2)

In [None]:
## Convert the explanations into a pandas DataFrame
shap_pd = shap_vals.to_pandas()
shap_pd.head(2)

In [None]:
# !pip install shap

In [None]:
# column_list = shap_pd.columns.tolist()
# print(column_list)


In [None]:
# column_list = just_input_vals.columns.tolist()
# print(column_list)

In [None]:
# shap_pd.columns.intersection(just_input_vals.columns)

In [None]:
# just_shap = shap_pd.iloc[:, 31:]
# just_input_vals = shap_pd.iloc[:, :31]

In [None]:
# just_input_vals = shap_pd.iloc[:, :31].drop(["CUSTOMER_ID","IS_FRAUD", "TRANSACTION_TIME"], axis=1)
# just_input_vals.shape

In [None]:
# just_shap = shap_pd.iloc[:, 31:]
# just_shap.shape

In [None]:
# import shap 
# just_shap = shap_pd.iloc[:, 30:]
# just_input_vals = shap_pd.iloc[:, :30].drop(["CUSTOMER_ID","IS_FRAUD","TRANSACTION_ID", "TRANSACTION_TIME"], axis=1)
# shap.summary_plot(np.array(just_shap), just_input_vals, feature_names = just_input_vals.columns)

In [None]:
# import seaborn as sns

# sns.scatterplot(data = shap_pd, x ="LOAN_PURPOSE_NAME_Home purchase", y = "LOAN_PURPOSE_NAME_Home purchase_explanation")

In [None]:
# import seaborn as sns

# income_0_to_1M = shap_pd[(shap_pd.INCOME>0) & (shap_pd.INCOME<1000000)]
# sns.scatterplot(data = income_0_to_1M, x ="INCOME", y = "INCOME_explanation")

## Model Monitoring setup

In [None]:
# Persist the train/test splits as tables: they are scored by the inference procedure
# and then serve as the model monitor's baseline and source.
for _split_df, _split_table in ((train, "DEMO_FRAUD_TRAIN"), (test, "DEMO_FRAUD_TEST")):
    _split_df.write.save_as_table(_split_table, mode="overwrite")

In [None]:
from snowflake import snowpark
from snowflake.ml.registry import Registry
import joblib
import os
import logging
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.ml.modeling.preprocessing as pp
from snowflake.snowpark.types import StringType, IntegerType
import snowflake.snowpark.functions as F


def demo_inference_sproc(session: snowpark.Session, table_name: str, modelname: str, modelversion: str) -> str:
    """Score a table with a registered model version and write the predictions back into it.

    Runs the version's ``predict`` function over ``table_name`` and stages the output in
    ``DEMO_TEMP_PREDICTION_RESULTS``. It then rewrites ``table_name`` with an extra
    ``<modelversion>_PREDICTION`` column initialised to the placeholder 9999, and fills that
    column from the staged results.

    Args:
        session: Snowpark session provided by the stored-procedure runtime.
        table_name: Table to score; it is overwritten in place with the prediction column.
        modelname: Model name in the Snowflake model registry.
        modelversion: Version of that model; also used as the prediction-column prefix.

    Returns:
        ``"Success"`` once the UPDATE has executed.
    """
    mv = Registry(session=session).get_model(modelname).version(modelversion)
    pred_col = f'{modelversion}_PREDICTION'
    temp_results_table = "DEMO_TEMP_PREDICTION_RESULTS"

    df = session.table(table_name)

    # Assumes the model's predict output column is named FRAUD_PREDICTION -- TODO confirm
    # against the registered model signature.
    results = mv.run(df, function_name="predict").with_column_renamed("FRAUD_PREDICTION", pred_col)

    # Materialise the predictions before the source table is overwritten below.
    results.write.save_as_table(temp_results_table, mode='overwrite')

    # Rewrite the source table with the prediction column; rows the UPDATE does not match keep 9999.
    df.with_column(pred_col, F.lit(9999)).write.save_as_table(table_name, mode='overwrite')

    # NOTE(review): rows are matched on (CUSTOMER_ID, TRANSACTION_TIME). If that pair is not
    # unique, Snowflake picks an arbitrary matching source row; TRANSACTION_ID may be a safer key.
    update_sql = f"""
    UPDATE {table_name} t
    SET {pred_col} = r.{pred_col}
    FROM {temp_results_table} r
    WHERE t.CUSTOMER_ID = r.CUSTOMER_ID
    AND t.TRANSACTION_TIME=r.TRANSACTION_TIME ;
    """
    session.sql(update_sql).collect()

    return "Success"

# Register demo_inference_sproc as a permanent stored procedure (replacing any previous
# definition); its code is uploaded to @AI_ML_STAGE.
session.sproc.register(
    demo_inference_sproc,
    name="fraud_prediction_inference_sproc",
    return_type=StringType(),
    packages=['joblib', 'snowflake-snowpark-python', 'snowflake-ml-python'],
    stage_location="@AI_ML_STAGE",
    is_permanent=True,
    replace=True,
)


In [None]:
-- Score DEMO_FRAUD_TRAIN with FRAUD_ANALYSIS_XGB version V2 (adds/overwrites column V2_PREDICTION).
CALL fraud_prediction_inference_sproc('DEMO_FRAUD_TRAIN','FRAUD_ANALYSIS_XGB', 'V2');

In [None]:
-- Check that the V2_PREDICTION column was populated.
SELECT * FROM DEMO_FRAUD_TRAIN LIMIT 2; 

In [None]:
-- Score DEMO_FRAUD_TEST with FRAUD_ANALYSIS_XGB version V2 (adds/overwrites column V2_PREDICTION).
CALL fraud_prediction_inference_sproc('DEMO_FRAUD_TEST','FRAUD_ANALYSIS_XGB', 'V2');

In [None]:
-- Check that the V2_PREDICTION column was populated.
SELECT * FROM DEMO_FRAUD_TEST LIMIT 2; 

In [None]:
-- Monitor FRAUD_ANALYSIS_XGB predictions stored in DEMO_FRAUD_TEST against the
-- DEMO_FRAUD_TRAIN baseline. VERSION and the prediction column must match what
-- fraud_prediction_inference_sproc wrote: it was called with 'V2', producing V2_PREDICTION.
CREATE OR REPLACE MODEL MONITOR FRAUD_ANALYSIS_XGB_MODEL_MONITOR
WITH
    MODEL=FRAUD_ANALYSIS_XGB
    VERSION=V2
    FUNCTION=predict
    SOURCE=DEMO_FRAUD_TEST
    BASELINE=DEMO_FRAUD_TRAIN
    TIMESTAMP_COLUMN=TRANSACTION_TIME
    PREDICTION_CLASS_COLUMNS=(V2_PREDICTION)
    ACTUAL_CLASS_COLUMNS=(IS_FRAUD)
    -- NOTE(review): ID columns should uniquely identify a row; CUSTOMER_ID alone likely
    -- repeats across transactions -- consider adding TRANSACTION_ID.
    ID_COLUMNS=(CUSTOMER_ID)
    WAREHOUSE=COMPUTE_WH
    REFRESH_INTERVAL='5 min'
    AGGREGATION_WINDOW='1 day';

In [None]:
SHOW MODEL MONITORS;

Valid aggregation time units: 1 DAY, 1 WEEK, 1 MONTH, 1 QUARTER, 1 YEAR.

Custom Time Periods
The amount must be a positive integer, allowing for custom aggregation periods such as:

7 DAY
14 DAY
3 MONTH

In [None]:
-- Daily Jensen-Shannon drift of TRANSACTION_AMOUNT for the fraud model monitor.
SELECT *
FROM TABLE(MODEL_MONITOR_DRIFT_METRIC(
    'FRAUD_ANALYSIS_XGB_MODEL_MONITOR',  -- model_monitor_name
    'JENSEN_SHANNON',                    -- drift metric
    'TRANSACTION_AMOUNT',                -- monitored column
    '1 DAY'                              -- granularity (last argument: no trailing comma)
))

In [None]:
-- Daily classification accuracy of the monitored model over the last 30 days.
-- NOTE(review): returns no rows if TRANSACTION_TIME values fall outside this window.
SELECT *
FROM TABLE(MODEL_MONITOR_PERFORMANCE_METRIC(
    'FRAUD_ANALYSIS_XGB_MODEL_MONITOR',     -- model_monitor_name
    'CLASSIFICATION_ACCURACY',              -- metric_name
    '1 DAY',                                -- granularity
    DATEADD(day, -30, CURRENT_DATE()),      -- start_time (30 days ago)
    CURRENT_DATE()                          -- end_time (today)
))

In [None]:
-- SELECT *
-- FROM TABLE(MODEL_MONITOR_STAT_METRIC(
--     'FRAUD_ANALYSIS_XGB_MODEL_MONITOR',     -- model_monitor_name
--     CLASSIFICATION_ACCURACY',              -- updated metric_name
--     '1 DAY',                                -- granularity
--     DATEADD(day, -30, CURRENT_DATE()),      -- start_time (30 days ago)
--     CURRENT_DATE()                          -- end_time (today)
-- ))

              ******************************** End of notebook **********************