In [None]:
# Import python packages
import streamlit as st
import pandas as pd

# Snowpark session used by every SQL / DataFrame call in this notebook.
# get_active_session() returns the session the notebook is already running in,
# so no credentials are handled here.
from snowflake.snowpark.context import get_active_session
session = get_active_session()


In [None]:
from snowflake.ml.modeling.ensemble import RandomForestClassifier
from snowflake.ml.modeling import metrics as snowml_metrics
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity, CreationMode
from snowflake.ml.registry import registry
from snowflake.ml._internal.utils import identifier

# ML packages  
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, OrdinalEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import f1_score
from xgboost import XGBClassifier

In [None]:
import pandas as pd
from sklearn import datasets

# Binary subset of the iris data: keep only target classes 0 and 1 so the
# classifier trained later is a two-class problem.
iris = datasets.load_iris()
df_iris = (
    pd.DataFrame(iris.data, columns=['SEP_LEN', 'SEP_WIDTH', 'PET_LEN', 'PET_WIDTH'])
    .assign(IRIS_TYPE=iris.target)
    .query("IRIS_TYPE != 2")
)
df_iris

In [None]:
from datetime import datetime, timedelta
# Function to add an artificial timestamp
def add_timestamp(df, start_date):
    """Return a copy of ``df`` with a synthetic ``DUMMY_TS`` column.

    The n-th row (0-based, in the frame's current order) is stamped
    ``start_date + n minutes``. The input frame is left untouched.

    Args:
        df: pandas DataFrame to stamp.
        start_date: ``datetime`` assigned to the first row.

    Returns:
        A new DataFrame with the extra ``DUMMY_TS`` column.
    """
    stamped = df.copy()
    stamped['DUMMY_TS'] = list(
        map(lambda n: start_date + timedelta(minutes=n), range(len(stamped)))
    )
    return stamped

# Create three snapshots of the same rows, one hour apart; within a snapshot each
# row is one minute after the previous one (see add_timestamp).
# NOTE(review): if a snapshot has more than 60 rows, consecutive snapshots
# overlap in time.
TS_BASE = datetime(2023, 1, 1, 10, 0, 0)
df1 = add_timestamp(df_iris, TS_BASE)
df2 = add_timestamp(df_iris, TS_BASE + timedelta(hours=1))
df3 = add_timestamp(df_iris, TS_BASE + timedelta(hours=2))

# Concatenate the DataFrames
df_evolving = pd.concat([df1, df2, df3], ignore_index=True)

# DUMMY_TS is not uploaded as a datetime column (to avoid type issues on upload).
# Instead it is shipped as an integer minute offset from TS_BASE, and the
# TIMESTAMP_NTZ column is rebuilt in SQL, so the per-snapshot timestamps computed
# above are what ends up in the table.
df_upload = df_evolving.drop('DUMMY_TS', axis=1)
df_upload['IRIS_ID'] = range(1, len(df_upload) + 1)
df_upload['TS_OFFSET_MIN'] = (
    (df_evolving['DUMMY_TS'] - TS_BASE) // pd.Timedelta(minutes=1)
).astype('int64')

# Staging table: measurements, label and the integer timestamp offset.
table_name = "IRIS_EVOLVING_DATA"
session.sql(f"""
    CREATE OR REPLACE TABLE {table_name} (
        IRIS_ID INT,
        SEP_LEN FLOAT,
        SEP_WIDTH FLOAT,
        PET_LEN FLOAT,
        PET_WIDTH FLOAT,
        IRIS_TYPE INT,
        TS_OFFSET_MIN INT
    )
""").collect()

session.write_pandas(df_upload, table_name, auto_create_table=False, overwrite=True)

# Materialize the real DUMMY_TS column from the offset (dropping the helper column).
ts_base_literal = TS_BASE.strftime('%Y-%m-%d %H:%M:%S')
session.sql(f"""
    CREATE OR REPLACE TABLE {table_name}_WITH_TS AS
    SELECT
        IRIS_ID, SEP_LEN, SEP_WIDTH, PET_LEN, PET_WIDTH, IRIS_TYPE,
        DATEADD(minute, TS_OFFSET_MIN, '{ts_base_literal}'::TIMESTAMP_NTZ(9)) AS DUMMY_TS
    FROM {table_name}
""").collect()

# Drop the staging table and rename the new one into its place.
# NOTE: this replaces IRIS_EVOLVING_DATA, so table properties such as
# CHANGE_TRACKING must be set again after re-running this cell.
session.sql(f"DROP TABLE {table_name}").collect()
session.sql(f"ALTER TABLE {table_name}_WITH_TS RENAME TO {table_name}").collect()

print("Data uploaded and timestamp column created successfully!")

In [None]:
# Preview the rebuilt table, which now includes the SQL-generated DUMMY_TS column.
session.table('IRIS_EVOLVING_DATA')

In [None]:
-- Enable change tracking on the source table. This is presumably needed so the
-- feature views registered below (refresh_freq="1 minute") can refresh from it.
-- The upload cell recreates this table, so re-run this statement after it.
ALTER TABLE IRIS_EVOLVING_DATA SET CHANGE_TRACKING = TRUE;


In [None]:
# Feature store backed by the SSML.PUBLIC schema; created if it does not exist yet.
fs = FeatureStore(
    session=session,
    database="SSML",
    name="PUBLIC",
    default_warehouse="MBDA_WH",
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

# IRIS entity: feature views and spines are joined on IRIS_ID.
entity = Entity(name="IRIS", join_keys=["IRIS_ID"])
fs.register_entity(entity)

# Show what is registered.
fs.list_entities().show()

In [None]:
# Engineered feature: sepal length + sepal width.
iris_table = session.table('IRIS_EVOLVING_DATA')
sdf = iris_table.with_column("MY_NEW_FEATURE", iris_table["SEP_LEN"] + iris_table["SEP_WIDTH"])
sdf.limit(3).show()

In [None]:
# Version 1 of IRIS_FEATURES: the raw measurements plus the engineered sum.
v1_columns = ['IRIS_ID', 'SEP_LEN', 'SEP_WIDTH', 'PET_LEN', 'PET_WIDTH', 'MY_NEW_FEATURE']
v1_feature_desc = {
    "SEP_LEN": "The Sepal length",
    "SEP_WIDTH": "The sepal width",
    "PET_LEN": "The petal length",
    "PET_WIDTH": "The petal width",
    "MY_NEW_FEATURE": "The sepal length + the sepal width",
}

fv = FeatureView(
    name="IRIS_FEATURES",
    entities=[entity],
    feature_df=sdf.select(v1_columns),
    refresh_freq="1 minute",
    desc="iris features",
)
fv.attach_feature_desc(v1_feature_desc)

# The registered view replaces the local draft object.
fv = fs.register_feature_view(feature_view=fv, version="V1", block=True)

fs.read_feature_view(fv).limit(3).show()

In [None]:
# Version 2 of IRIS_FEATURES: only the sepal length * sepal width product.
sdf_v2 = sdf.with_column("MY_NEW_FEATURE2", sdf["SEP_LEN"] * sdf["SEP_WIDTH"])

fv2 = FeatureView(
    name="IRIS_FEATURES",
    entities=[entity],
    feature_df=sdf_v2.select(['IRIS_ID', 'MY_NEW_FEATURE2']),
    refresh_freq="1 minute",  # can also be a cron schedule - * * * * * America/Los_Angeles
    desc="iris features",
)

# The registered view replaces the local draft object.
fv2 = fs.register_feature_view(feature_view=fv2, version="V2", block=True)

fs.read_feature_view(fv2).limit(3).show()

In [None]:
# Spine: the IRIS_ID join key plus the IRIS_TYPE label; selected feature slices
# from both feature-view versions are attached to it.
spine_df = session.table("IRIS_EVOLVING_DATA").select("IRIS_ID", "IRIS_TYPE")
selected_features = [
    fv.slice(["SEP_LEN", "SEP_WIDTH", "MY_NEW_FEATURE"]),
    fv2.slice(["MY_NEW_FEATURE2"]),
]
training_data = fs.generate_dataset(
    name="MY_DATASET",
    spine_df=spine_df,
    features=selected_features,
)

training_data.read.to_snowpark_dataframe().limit(3).show()

In [None]:
# Train initial model
print("🔄 Training initial iris classification model...")

# Feature / label configuration read by train_model and the registry cells.
# This dataset has no categorical features.
categorical_cols = []
numerical_cols = ['SEP_LEN', 'SEP_WIDTH', 'MY_NEW_FEATURE', 'MY_NEW_FEATURE2']
feature_cols = categorical_cols + numerical_cols
target_col = "IRIS_TYPE"

def train_model(feature_df, pipeline):
    """Fit an XGBoost classifier on ``feature_df`` and report F1 scores.

    Reads the module-level ``feature_cols`` and ``target_col``. When a pipeline
    is requested, it also reads ``categorical_cols`` and ``numerical_cols``.

    Args:
        feature_df: Snowpark DataFrame holding the feature and target columns.
            It is pulled into pandas with ``to_pandas()``.
        pipeline: truthy to wrap the model in a preprocessing ``Pipeline``
            (ordinal-encode categoricals, standard-scale numerics); falsy to
            fit a bare ``XGBClassifier``.

    Returns:
        dict with ``'model'`` (the fitted estimator), ``'train_f1_score'`` and
        ``'test_f1_score'`` (weighted F1 on an 80/20 split).
    """
    frame = feature_df.to_pandas()
    train_part, test_part = train_test_split(frame, test_size=0.2, random_state=111)
    X_train, y_train = train_part[feature_cols], train_part[target_col]
    X_test, y_test = test_part[feature_cols], test_part[target_col]

    if not pipeline:
        estimator = XGBClassifier(random_state=42)
    else:
        print('pipeline used')
        estimator = Pipeline(
            steps=[
                ("preprocessor", ColumnTransformer(
                    transformers=[
                        ("ordinal", OrdinalEncoder(), categorical_cols),
                        ("scaler", StandardScaler(), numerical_cols),
                    ]
                )),
                ("model", XGBClassifier(random_state=42)),
            ]
        )
    estimator.fit(X_train, y_train)

    # NOTE(review): the source table repeats the same measurements in several
    # snapshots under different IRIS_IDs, so identical rows can land in both
    # splits. test_f1_score is therefore likely optimistic.
    train_f1 = f1_score(y_train, estimator.predict(X_train), average='weighted')
    test_f1 = f1_score(y_test, estimator.predict(X_test), average='weighted')

    return {'model': estimator, 'train_f1_score': train_f1, 'test_f1_score': test_f1}

# Pull the generated dataset and fit the bare XGBoost model (no preprocessing pipeline).
input_df = training_data.read.to_snowpark_dataframe()
model_result = train_model(input_df, False)

print("✅ Model training completed:")
for label, key in (("Training", "train_f1_score"), ("Test", "test_f1_score")):
    print(f"   {label} F1 Score: {model_result[key]:.4f}")

In [None]:
from snowflake.ml.registry import registry

# Model Registry in the same SSML.PUBLIC schema used by the feature store.
REGISTRY_DATABASE_NAME = "SSML"
REGISTRY_SCHEMA_NAME = "PUBLIC"
mr = registry.Registry(
    session=session,
    database_name=REGISTRY_DATABASE_NAME,
    schema_name=REGISTRY_SCHEMA_NAME,
)

In [None]:
# Register model in Model Registry
print("🔄 Registering model in Model Registry...")

# Name/version under which the model is logged. Later cells (get_model,
# CREATE MODEL MONITOR) must refer to these same values.
model_name = "IrisDetector"
version_name = "explainv8"

# Log the trained model. enable_explainability is what makes the "explain"
# function used further down available on this version.
baseline_model = mr.log_model(
    model=model_result['model'],
    model_name=model_name,
    version_name=version_name,
    conda_dependencies=["snowflake-ml-python", "xgboost", "scikit-learn"],
    sample_input_data=input_df[feature_cols].limit(100),
    options={"enable_explainability": True},
    target_platforms=["WAREHOUSE", "SNOWPARK_CONTAINER_SERVICES"],
    comment="Baseline model for iris type classification",
)

# Record the evaluation metrics on the model version.
baseline_model.set_metric(metric_name="train_f1_score", value=model_result['train_f1_score'])
baseline_model.set_metric(metric_name="test_f1_score", value=model_result['test_f1_score'])

# Report what was actually logged (this cell does not change the default version).
print("✅ Baseline model registered and metrics recorded")
print(f"   Model name: {model_name}")
print(f"   Version: {version_name}")

In [None]:
# Fetch the logged model version back from the registry.
# mv is a snowflake.ml.model.ModelVersion object
model_name, version_name = "IrisDetector", "explainv8"
m = mr.get_model(model_name)   # registry Model handle
mv = m.version(version_name)   # the specific ModelVersion used below

In [None]:
# Run the model's "explain" function (enabled when the model was logged)
# on the training features.
explain_input = input_df[feature_cols]
explanations = mv.run(explain_input, function_name="explain")
explanations

In [None]:
# Warehouse inference on the same feature columns the model was trained on.
# input_df / feature_cols come from the training cells; feature_df only exists as
# a parameter inside train_model and is not defined at notebook level.
remote_prediction = mv.run(input_df[feature_cols], function_name="predict")
remote_prediction.show()

In [None]:
# Build IRIS_TRAIN, the table the model monitor reads: IRIS_ID key, IRIS_TYPE
# label, DUMMY_TS (the monitor's TIMESTAMP_COLUMN) and the model's features.
# Distinct variable names keep the training cell's spine_df / training_data intact
# if cells are re-run.
monitor_spine_df = session.table("IRIS_EVOLVING_DATA").select("IRIS_ID", "IRIS_TYPE", "DUMMY_TS")
monitor_data = fs.generate_dataset(
    name="MY_DATASET",
    spine_df=monitor_spine_df,
    features=[
        fv.slice(["SEP_LEN", "SEP_WIDTH", "MY_NEW_FEATURE"]),
        fv2.slice(["MY_NEW_FEATURE2"]),
    ],
)

# Sort before limiting so the same 100 rows are saved on every run.
(
    monitor_data.read.to_snowpark_dataframe()
    .sort("IRIS_ID")
    .limit(100)
    .write.save_as_table("IRIS_TRAIN", mode="overwrite")
)

In [None]:
-- Monitor the logged IrisDetector model. VERSION must name a version that was
-- actually logged; the registry cell logs version "explainv8".
-- NOTE(review): SOURCE and BASELINE are the same table, so drift between them is
-- trivially zero. No PREDICTION_CLASS_COLUMNS / ACTUAL_CLASS_COLUMNS are given,
-- and IRIS_TRAIN holds no prediction column. Confirm against the
-- CREATE MODEL MONITOR reference.
CREATE OR REPLACE MODEL MONITOR IRIS_MODEL_MONITOR_SQL
WITH
    MODEL=IrisDetector
    VERSION=explainv8
    FUNCTION=predict
    SOURCE=IRIS_TRAIN
    BASELINE=IRIS_TRAIN
    TIMESTAMP_COLUMN=DUMMY_TS
    ID_COLUMNS=(IRIS_ID)
    WAREHOUSE=ALERTS_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';

In [None]:
-- List existing model monitors, e.g. to confirm IRIS_MODEL_MONITOR_SQL was created.
SHOW MODEL MONITORS;

In [None]:
# Rows of the class that was held out of training (the training data kept
# IRIS_TYPE != 2), presumably for simulating drift. These rows are stored under a
# new name so df_iris, the input of the upload cell, is not replaced by
# class-2-only data if cells are re-run.
iris = datasets.load_iris()
df_iris_drift = pd.DataFrame(iris.data, columns=['SEP_LEN', 'SEP_WIDTH', 'PET_LEN', 'PET_WIDTH'])
df_iris_drift['IRIS_TYPE'] = iris.target
df_iris_drift = df_iris_drift[df_iris_drift['IRIS_TYPE'] == 2]
df_iris_drift

In [None]:
# Get signature of the inference function in Python.
# mv is a snowflake.ml.model.ModelVersion object.
# The result is printed explicitly: only a cell's last expression is rendered,
# so a bare call here would be discarded.
print(mv.show_functions())

# Deploy the model version as an SPCS service. Inference calls must pass this
# same service_name ("myservice").
# NOTE(review): depending on the snowflake-ml-python version, create_service may
# also require image_repo. Confirm for the installed version.
mv.create_service(service_name="myservice",
                  service_compute_pool="TUTORIAL_COMPUTE_POOL",
                  ingress_enabled=True,
                  gpu_requests=None)

In [None]:
# Inference through the SPCS service created above, on the same feature columns
# the model was trained on. service_name must match the name given to
# create_service ("myservice").
mv.run(
    input_df[feature_cols],
    function_name="predict",
    service_name="myservice")