#### Flow of the presentation 

- Data Generation, Fraud detection, logs as well (cortex for summarizing )
- EDA 
- Feature engineering 
- Use Feature Store to track engineered features
- Train & evaluate models (XGBoost, RF, LR)
- Model deployment & monitoring - Low latency (< 1 s) (SPCS & Warehouse)  ####clarify
- App 
- cortex analyst to find answers 


In [None]:
# Standard Python Libraries
import sys
import json
import warnings
from datetime import timedelta

# Data Manipulation and Analysis
import pandas as pd
import numpy as np

# Data Visualization
import matplotlib.pyplot as plt
import seaborn as sns

# Machine Learning
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from xgboost import XGBClassifier

# Snowpark Core
from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.version import VERSION
import snowflake.snowpark.functions as F
from snowflake.snowpark.exceptions import SnowparkSessionException
from snowflake.snowpark.functions import sproc, col, dayname
from snowflake.snowpark import types as T
from snowflake.snowpark.window import Window

# Snowpark ML
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.preprocessing import OrdinalEncoder, OneHotEncoder
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error
from snowflake.ml.registry import Registry

# Snowflake Feature Store
from snowflake.ml.feature_store import (
    FeatureStore, FeatureView, Entity, CreationMode, setup_feature_store
)

# Snowflake Task API
from snowflake.core import Root
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.warehouse import Warehouse
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.core._common import CreateMode

# Streamlit
import streamlit as st

# Suppress warnings
warnings.filterwarnings("ignore")



In [None]:
# Obtain the active Snowpark session and turn on the SQL simplifier.
session = get_active_session()
session.sql_simplifier_enabled = True

# A single query returns both the current user and the server version.
snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION
current_user = snowflake_environment[0][0]
snowflake_version = snowflake_environment[0][1]

# Report the connection context so the reader can confirm where the notebook runs.
print('\nConnection Established with the following parameters:')
print(f'User                        : {current_user}')
print(f'Role                        : {session.get_current_role()}')
print(f'Database                    : {session.get_current_database()}')
print(f'Schema                      : {session.get_current_schema()}')
print(f'Warehouse                   : {session.get_current_warehouse()}')
print(f'Snowflake version           : {snowflake_version}')
print(f'Snowpark for Python version : {snowpark_version[0]}.{snowpark_version[1]}.{snowpark_version[2]}')

- Do some basic EDA

In [None]:
-- Preview two rows of the raw transactions table.
select * from transactions limit 2;

In [None]:
-- Preview two rows of the customer complaints table.
select * from customer_complaints limit 2;

In [None]:
--drop table fraud_analysis;

Merge two datasets

In [None]:
-- Build the working table by attaching complaint details to every transaction.
-- LEFT JOIN keeps transactions whose customer has no complaint (complaint columns are NULL).
-- NOTE(review): the join key is customer_id only, so a customer with several complaints
-- yields one row per (transaction, complaint) pair -- confirm this fan-out is intended.
CREATE OR REPLACE TABLE fraud_analysis AS
SELECT
    t.transaction_id,
    t.customer_id,
    t.transaction_amount,
    t.is_fraud,
    t.merchant_category,
    t.device_type,
    t.location,
    t.transaction_time,
    c.complaint_text,
    c.keywords,
    c.complaint_time
FROM transactions t
LEFT JOIN customer_complaints c
    ON t.customer_id = c.customer_id;

In [None]:
-- Preview two rows of the merged fraud_analysis table.
select * from fraud_analysis limit 2;

In [None]:
# print(fraud_analysis.head())
# print(fraud_analysis.columns)

In [None]:
 --drop table  fraud_analysis;

- Let's do feature engineering

In [None]:
-- ALTER TABLE fraud_analysis 
-- ADD COLUMN computed_sentiment STRING;
-- UPDATE fraud_analysis 
-- SET computed_sentiment = SNOWFLAKE.CORTEX.SENTIMENT(complaint_text);

In [None]:
-- SELECT complaint_text, computed_sentiment 
-- FROM fraud_analysis 
-- LIMIT 10;

### Create features with Feature Store

Initialize Feature Store
Let's first create a feature store client. With CREATE_IF_NOT_EXIST mode, it will try to create a new Feature Store schema and all necessary feature store metadata if it doesn't exist already. It is required for the first time to set up a Feature Store. Afterwards, you can use FAIL_IF_NOT_EXIST mode to connect to an existing Feature Store.

Note that the database being used must already exist. Feature Store will NOT try to create the database even in CREATE_IF_NOT_EXIST mode.

Generate per-customer behavioral features from the transaction and complaint data, such as the average transaction amount, transaction count, fraud count, most common merchant category, device type, location and transaction day, and the sentiment of complaint text. The query aggregates the FRAUD_ANALYSIS table by customer_id and uses SNOWFLAKE.CORTEX.SENTIMENT to score the complaints.

In [None]:
# Per-customer feature query over fraud_analysis (one output row per customer_id):
# amount/count aggregates, fraud count, the most frequent (MODE) merchant category,
# device type, location and weekday, plus Cortex sentiment statistics of complaint_text.
# NOTE(review): if fraud_analysis holds one row per (transaction, complaint) pair,
# COUNT(*) and the AVG/SUM aggregates are weighted by the number of complaints -- confirm.
feature_df = session.sql("""
  SELECT 
    customer_id,
    AVG(transaction_amount) AS avg_transaction_amount,
    COUNT(*) AS transaction_count,
    MAX(transaction_time) AS last_transaction_time,
    SUM(CASE WHEN is_fraud = TRUE THEN 1 ELSE 0 END) AS fraud_count,
    MODE(merchant_category) AS most_common_merchant_category,
    MODE(device_type) AS most_common_device_type,
    MODE(location) AS most_common_location,
    AVG(SNOWFLAKE.CORTEX.SENTIMENT(complaint_text)) AS avg_sentiment_score,
    COUNT(CASE WHEN SNOWFLAKE.CORTEX.SENTIMENT(complaint_text) > 0 THEN 1 END) AS positive_sentiment_count,
    COUNT(CASE WHEN SNOWFLAKE.CORTEX.SENTIMENT(complaint_text) < 0 THEN 1 END) AS negative_sentiment_count,
    MODE(DAYNAME(transaction_time)) AS most_common_transaction_day
  FROM fraud_analysis
  GROUP BY customer_id
""")



In [None]:
# feature_df.write.mode('overwrite').save_as_table('feature_df')
# feature_df=session.sql("select * from feature_df")


In [None]:
# Get the first two rows of the feature query to sanity-check its output.
feature_df.limit(2).collect()

In [None]:
# df = session.table("fraud_analysis")
# df.show(2)

In [None]:
-- Preview two rows of the merged fraud_analysis table.
select * from fraud_analysis limit 2 ;

Creating Entities

An entity is an abstraction over a set of primary keys used for looking up feature data. An Entity represents a real-world "thing" that has data associated with it. The cells below create the Feature Store client and register a Customer entity, keyed on customer_id, in the Feature Store.

In [None]:
# Open a Feature Store client that uses the session's current database, schema and
# warehouse. CREATE_IF_NOT_EXIST sets up the Feature Store schema and metadata on
# first use; the database itself must already exist.
fs = FeatureStore(
    session=session, 
    database=session.get_current_database(), 
    name=session.get_current_schema(), 
    default_warehouse=session.get_current_warehouse(),
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

In [None]:
from snowflake.ml.feature_store import Entity

# Declare the CUSTOMER entity: feature rows are looked up by customer_id.
customer_entity = Entity(
    name="CUSTOMER",
    join_keys=["customer_id"],
    desc="Primary Key for Customer"
)
# Register the entity with the feature store so feature views can reference it.
fs.register_entity(customer_entity)

In [None]:
# Show the entities currently registered in the feature store.
fs.list_entities()

# Using Feature Views

A feature view is a group of logically-related features that are refreshed on the same schedule. The FeatureView constructor accepts a Snowpark DataFrame that contains the feature generation logic. The provided DataFrame must contain the join_keys columns specified in the entities associated with the feature view. For time-series features you would also specify the timestamp column name (timestamp_col); the feature view below is a per-customer aggregate and does not set one.

Below cell creates a feature view for the customer features


In [None]:
# Define the customer-level feature view on top of the aggregated feature query.
# refresh_freq="1 day" requests a daily refresh of the feature view.
# NOTE(review): `tags` is kept as in the original call, but it is not a documented
# FeatureView parameter -- confirm it has any effect before relying on it.
fraud_feature_view = FeatureView(
    name="FRAUD_FEATURES",
    entities=[customer_entity],
    feature_df=feature_df,
    desc="Features derived from customer transactions for fraud detection",
    tags={"domain": "fraud_detection", "update_frequency": "daily"},
    refresh_freq="1 day",
)

# Register the feature view with the feature store.
# `overwrite` is an option of register_feature_view, not of the FeatureView
# constructor, so it is passed here to let a re-run of this cell replace version V1.
registered_fv = fs.register_feature_view(
    fraud_feature_view,
    version="V1",
    overwrite=True,
)


In [None]:
# Display the feature view object returned by the registration call.
registered_fv

#### More code to ref later 

In [None]:
# # from snowflake.snowpark.functions import call_udf

# # feature_eng_dict = dict()

# # # New features
# # feature_eng_dict["SENTIMENT_SCORE"] = call_udf("SNOWFLAKE.CORTEX.SENTIMENT", col("complaint_text"))
# # feature_eng_dict["TRANSACTION_DAY"] = dayname(col("transaction_time"))

# # # Apply all features to the DataFrame
# # df = df.with_columns(feature_eng_dict.keys(), feature_eng_dict.values())

# # df.show(2)

# # feature_df = df.select(
# #     "transaction_amount", 
# #     "merchant_category", 
# #     *list(feature_eng_dict.keys())
# # )
# # feature_df.show(5)

# # print(df.explain())
# # #Create a dataframe with just the ID, timestamp, and engineered features. We will use this to define our feature view
# # feature_df = df.select([["transaction_amount","merchant_category"]]+list(feature_eng_dict.keys()))
# # feature_df.show(5)
# # ds = fs.generate_dataset(
# #     name="FRAUD_DETECTION_DATASET_V1",
# #     spine_df=feature_df.drop("TRANSACTION_AMOUNT", "IS_FRAUD", "SENTIMENT_SCORE", "SENTIMENT_CATEGORY"),
# #     features=[fraud_feature_view],
# #     spine_timestamp_col="TRANSACTION_TIME",
# #     spine_label_cols=["IS_FRAUD"]
# # )
# # registered_fv = fs.register_feature_view(fraud_feature_view, version="v2", overwrite=True)
# # print(registered_fv.status)  # This should print FeatureViewStatus.ACTIVE
# # features=[fraud_feature_view]
# # features
# #print(fraud_feature_view.name)

# # fs = FeatureStore(
# #     session=session, 
# #     database=session.get_current_database(), 
# #     name=session.get_current_schema(), 
# #     default_warehouse=session.get_current_warehouse(),
# #     creation_mode=CreationMode.CREATE_IF_NOT_EXIST
# # )

# # from snowflake.ml.feature_store import Entity

# # customer_entity = Entity(
# #     name="CUSTOMER",
# #     join_keys=["customer_id"],
# #     desc="Primary Key for Customer"
# # )
# # fs.register_entity(customer_entity)
# # Delete a specific version of a feature view
# # fs.delete_feature_view(
# #     feature_view="FRAUD_FEATURES",  
# #     version="v2"                   
# # )

# X = fraud_analysis[['TRANSACTION_AMOUNT', 'DEVICE_TYPE','MERCHANT_CATEGORY','LOCATION']]
# y = fraud_analysis['IS_FRAUD']
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


# #Use the Snowpark DataFrame .describe function. You need to need to visualize from a pandas DataFrame
# fraud_analysis.describe()

# # Get transactions dataset and get features from the feature store
# def create_dataset(spine_df, name):
#     train_dataset = fs.generate_dataset(
#     name=name,
#     spine_df=spine_df,
#     features=[customer_fv]
#     )
#     df = train_dataset.read.to_snowpark_dataframe()
#     return df
# # Split into train/validation/test


# # Generate dataset for training
# train_dataset = fs.generate_dataset(
#     name="FRAUD_DETECTION_DATASET",
#     spine_df=spine_df,  # Now properly defined
#     features=[customer_fv],
#     spine_timestamp_col="transaction_time",
#     spine_label_cols=["is_fraud"],
#     version="v1"
# )
 # Ensure using sklearn-compatible XGBoost

# models = {
#     "XGBoost": XGBClassifier(),
#     "RandomForest": RandomForestClassifier(),
#     "LogisticRegression": LogisticRegression()
# }

# best_model = None
# best_score = 0

# for name, model in models.items():
#     print(f"Training {name} model...")
#     model.fit(X_train, y_train)  # Ensure fit method is correct
#     preds = model.predict(X_test)
#     score = accuracy_score(y_test, preds)
#     print(f"{name} Accuracy: {score:.4f}")

#     if score > best_score:
#         best_model = model
#         best_score = score

# print(f"Best model: {best_model} with accuracy {best_score:.4f}")

In [None]:
# Show the feature views currently registered in the feature store.
fs.list_feature_views()

This completes the setup for the database objects and the Feature Store producer workflow. The data and the features that have been generated are available to consumers with the appropriate privileges. Time to head on to the next notebook!

Generating Datasets for Training
We are now ready to generate our training set. We'll define a spine DataFrame to form the backbone of our generated dataset and pass it into FeatureStore.generate_dataset() along with our Feature Views.

NOTE: The spine serves as a request template and specifies the entities, labels and timestamps (when applicable). The feature store then attaches feature values along the spine using an AS-OF join to efficiently combine and serve the relevant, point-in-time correct feature data.

In [None]:
-- Preview two rows of the merged fraud_analysis table.
select * from fraud_analysis limit 2 ;

In [None]:
# (Re)create the TRANSACTIONS_DATA table; all three columns, including IS_FRAUD, are VARCHAR.
session.sql("create or replace TABLE TRANSACTIONS_DATA (customer_id VARCHAR,TRANSACTION_ID VARCHAR(16777216),IS_FRAUD VARCHAR)").collect()

In [None]:
# Copy the distinct (customer_id, TRANSACTION_ID, IS_FRAUD) triples out of fraud_analysis.
session.sql("insert into TRANSACTIONS_DATA(customer_id,TRANSACTION_ID, IS_FRAUD) SELECT distinct customer_id,TRANSACTION_ID, IS_FRAUD FROM fraud_analysis").collect()
# Keep a Snowpark DataFrame handle to the table and print a preview of its rows.
TRANSACTIONS_DATA_df = session.table("TRANSACTIONS_DATA")
TRANSACTIONS_DATA_df.show()

Descriptive statistics include those that summarize the central tendency, dispersion and shape of a dataset’s distribution.

In [None]:
# Snowpark DataFrame over the whole fraud_analysis table.
full_df = session.sql("SELECT * FROM fraud_analysis")
# Summary statistics of the table, shown as the cell output.
full_df.describe()

In [None]:
# List the column names of fraud_analysis.
full_df.columns

Visualization of the fraud and normal data using a bar chart displayed in Streamlit. Shows the total number of distinct transactions for each fraud category.

In [None]:
# Pull the full fraud_analysis table into a pandas DataFrame.
dataset = full_df.toPandas()

# Count the distinct transactions for each IS_FRAUD value.
fraud_label = F.col("IS_FRAUD")
transaction_id = F.col("TRANSACTION_ID")
distinct_transactions = F.count_distinct(transaction_id).alias("TOTAL_FRAUD")
df = (
    TRANSACTIONS_DATA_df
    .select(transaction_id, fraud_label)
    .groupBy(fraud_label)
    .agg(distinct_transactions)
)

# Render the per-class counts as a Streamlit bar chart.
st.bar_chart(df, x="IS_FRAUD", y="TOTAL_FRAUD")

## Feature Store
The feature store contains feature views for customers and transactions. Model features will be accessed from the feature store.

**Snowflake Feature:** Feature Store (PrPr) - Easily find features that work with your data

In [None]:
# Fetch the FRAUD_FEATURES feature view (version V1) back from the feature store
# and print it for inspection.
customer_fv : FeatureView = fs.get_feature_view(
    name='FRAUD_FEATURES',
    version='V1'
)
print(customer_fv)

Generate a training data set with the feature store's generate_dataset method, which enriches a Snowpark DataFrame (the spine) that contains the source data with the derived feature values.

In [None]:
from snowflake.snowpark import functions as F

# Example 1: Use raw data as spine
# The spine carries the entity key (customer_id), the event timestamp and the label.
spine_df = session.table("fraud_analysis").select(
    "customer_id", 
    "transaction_time", 
    "is_fraud"
)
# Display the spine DataFrame as the cell output.
spine_df

In [None]:
# `train_dataset` is only assigned by the fs.generate_dataset(...) call in the
# following cell, so evaluating it here raises NameError on a fresh top-to-bottom
# run. Uncomment the line below to inspect it after that cell has executed.
# train_dataset

In [None]:
# Generate dataset for training: attach the feature view's columns to the spine,
# treating transaction_time as the spine timestamp and is_fraud as the label.
# NOTE(review): no `version` is passed -- confirm how the dataset version is chosen
# when it is omitted and whether re-running with the same name succeeds.
train_dataset = fs.generate_dataset(
    name="FRAUD_DETECTION_DATASET",
    spine_df=spine_df,
    features=[customer_fv],
    spine_timestamp_col="transaction_time",
    spine_label_cols=["is_fraud"]
)

# Convert to pandas DataFrame for the local scikit-learn / XGBoost training below.
Fraud_data = train_dataset.read.to_pandas()
Fraud_data

TODO: see how to incorporate the dataset version.

In [None]:
# Print all column names of the generated training data to verify the feature join.
print(Fraud_data.columns)

# Expected output should include:
# ['AVG_TRANSACTION_AMOUNT', 'MOST_COMMON_DEVICE_TYPE', ...]

In [None]:
# Model inputs: numeric aggregates plus the categorical "most common" columns.
feature_columns = [
    'AVG_TRANSACTION_AMOUNT',
    'TRANSACTION_COUNT',
    'MOST_COMMON_DEVICE_TYPE',
    'MOST_COMMON_LOCATION',
    'MOST_COMMON_MERCHANT_CATEGORY',
    'MOST_COMMON_TRANSACTION_DAY',
    'AVG_SENTIMENT_SCORE',
]
X = Fraud_data[feature_columns]
# Target label.
y = Fraud_data['IS_FRAUD']

# Hold out 20% of the rows for evaluation; the fixed seed keeps the split reproducible.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

In [None]:
# Peek at the first two training feature rows.
X_train.head(2)

In [None]:
# Peek at the first training labels.
y_train.head()

View the training dataset.

This contains the columns except for Ids. The Label is included here as this will be specified in the LABEL field during model training.

# Training the model


In [None]:
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from xgboost import XGBClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Create the Model Registry and register your initial model
from snowflake.ml.registry import Registry

In [None]:
# Column groups, split by how each group is preprocessed.
numerical_features = ['AVG_TRANSACTION_AMOUNT', 'TRANSACTION_COUNT', 'AVG_SENTIMENT_SCORE']
categorical_features = [
    'MOST_COMMON_DEVICE_TYPE',
    'MOST_COMMON_LOCATION',
    'MOST_COMMON_MERCHANT_CATEGORY',
    'MOST_COMMON_TRANSACTION_DAY',
]

# SimpleImputer / OneHotEncoder here are the scikit-learn classes imported in the
# training-imports cell, which rebind the Snowpark ML names imported at the top.
numeric_step = ('num', SimpleImputer(strategy='median'), numerical_features)
categorical_step = ('cat', OneHotEncoder(handle_unknown='ignore'), categorical_features)

# Median-impute the numeric columns; one-hot encode the categoricals, ignoring
# categories that were not seen during fit.
preprocessor = ColumnTransformer(transformers=[numeric_step, categorical_step])


In [None]:
# Define models: one preprocessing + classifier pipeline per candidate algorithm.
from sklearn.base import clone

classifiers = {
    "XGBoost": XGBClassifier(use_label_encoder=False, eval_metric='logloss'),
    "RandomForest": RandomForestClassifier(),
    "LogisticRegression": LogisticRegression(),
}

# Pipeline does not copy its steps, so passing the same `preprocessor` object to
# every pipeline would make them share one transformer that each later fit()
# refits. clone() gives every pipeline its own unfitted copy, so the pipeline that
# ends up being registered owns the preprocessor it was trained with.
models = {
    name: Pipeline([
        ('preprocessor', clone(preprocessor)),
        ('classifier', classifier),
    ])
    for name, classifier in classifiers.items()
}


In [None]:
# Train every candidate pipeline and keep the one with the highest test accuracy.
# best_score starts at -inf (not 0) so that a model is always selected, even if
# every candidate scores 0.0; ties keep the earliest candidate.
best_model = None
best_score = float("-inf")

for name, model in models.items():
    print(f"Training {name} model...")
    model.fit(X_train, y_train)
    preds = model.predict(X_test)
    score = accuracy_score(y_test, preds)
    print(f"{name} Accuracy: {score:.4f}")

    if score > best_score:
        best_model, best_score = model, score

# Fail with a clear message instead of an AttributeError on None below.
if best_model is None:
    raise RuntimeError("No model was trained: `models` is empty.")

print(f"Best model: {type(best_model.named_steps['classifier']).__name__} with accuracy {best_score:.4f}")


In [None]:
import numpy as np

# Print how many rows fall into each label value, for the train and test splits.
for split_name, labels in (("y_train", y_train), ("y_test", y_test)):
    print(f"{split_name} distribution:", np.bincount(labels))


# Logging the model to Model Registry

In [None]:

# Open the model registry in FRAUD_DB.PUBLIC.
# NOTE(review): the database/schema are hard-coded here, while the feature store
# uses the session's current database/schema -- confirm they are meant to differ.
registry = Registry(session=session, database_name="FRAUD_DB", schema_name="PUBLIC")
# Name under which the model versions are logged and looked up.
model_name = "fraud_detection"

In [None]:
# Log the best pipeline to the registry together with its accuracy metric.
# sample_input_data passes X_train as example input for the logged model.
# NOTE(review): get_next_model_version_to_be_registered is not defined in the
# visible cells of this notebook -- confirm it is provided before running this cell.
mv = registry.log_model(best_model,
                   model_name="fraud_detection",
                   version_name=f'V{get_next_model_version_to_be_registered(model_name)}',
                   #conda_dependencies=["scikit-learn"],
                   comment=" ML model",
                   metrics={"accuracy_score": best_score},
                   sample_input_data=X_train)
                  # task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION)

In [None]:
# Let's confirm the model version(s) that were added under model_name.
registry.get_model(model_name).show_versions()

In [None]:
# We can see which version is the default when several versions share the same model name:
registry.get_model(model_name).default.version_name

In [None]:
# Now we can use the default version of the model to perform inference.
# Ask the registry for its default version instead of hard-coding 'V0', which may
# not match the version name generated when the model was logged.
model_ver = registry.get_model(model_name).default
model_ver

In [None]:
# Run the registered model's predict function on the held-out feature rows.
result_sdf = model_ver.run(X_test, function_name="predict")
result_sdf

In [None]:
# X_test is a pandas DataFrame (it comes from train_test_split on Fraud_data), and
# ModelVersion.run returns the same DataFrame type as its input. A pandas DataFrame
# has no .show(), so preview the predictions with head() instead.
result_sdf.head()