# End to End CLV Model Notebook

# Getting Started with Snowflake Feature Store
We will use the Use-Case to show how Snowflake Feature Store (and Model Registry) can be used to maintain & store features, retrieve them for training and perform micro-batch inference.

In the development (TRAINING) environment we will
- create FeatureViews in the Feature Store that maintain the required customer-behaviour features.
- use these Features to train a model, and save the model in the Snowflake model-registry.
- plot the clusters for the trained model to visually verify. 

In the production (SERVING) environment we will
- re-create the FeatureViews on production data
- generate an Inference FeatureView that uses the saved model to perform incremental inference

# Feature Engineering & Model Training
#### Notebook Packages

In [None]:
# Python packages
import os
import json
import timeit

# SNOWFLAKE
# Snowpark
from snowflake.snowpark import Session, DataFrame, Window, WindowSpec

import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

# Snowflake Feature Store
from snowflake.ml.feature_store import (
    FeatureView,
    Entity
)

### Set up helpful functions

In [None]:
!pip install sqlglot

In [None]:
import os
from snowflake.snowpark import Session, DataFrame, Window, WindowSpec
from os import listdir
from os.path import isfile, join
import json
import numpy as np
from snowflake.snowpark.version import VERSION
import snowflake.snowpark.functions as F
from snowflake.ml.dataset import Dataset
from snowflake.ml._internal.exceptions import (
    dataset_errors
)
from datetime import datetime, timedelta

import ast
def check_and_update(df, model_name):
    """
    Return the next "V_<n>" version label for a model in the Model Registry.

    df         : pandas dataframe from show_models; must provide a "name" column
                 and a "versions" column holding the string form of a list of
                 version names (e.g. '["V_1", "V_2"]')
    model_name : model-name to acquire next version for; a qualified name
                 ("SCHEMA.MODEL") is reduced to its last component

    Returns "V_1" when the model is not listed or has no numbered version,
    otherwise "V_<highest numeric suffix + 1>".
    """
    if "." in model_name:
        model_name = model_name.split(".")[-1]
    if df.empty:
        return "V_1"

    # Only the rows of the requested model may contribute: versions that
    # belong to other models must not influence this model's numbering.
    model_rows = df[df["name"] == model_name]
    if model_rows.empty:
        return "V_1"

    nums = []
    for raw_versions in model_rows["versions"]:
        # Each cell is the string form of a list, e.g. '["V_1", "V_2"]'
        for version in ast.literal_eval(raw_versions):
            suffix = str(version).rsplit("_", 1)[-1]
            # Version names without a numeric suffix do not take part in
            # the V_<n> numbering scheme, so they are skipped.
            if suffix.isdigit():
                nums.append(int(suffix))

    if not nums:
        return "V_1"
    return f"V_{max(nums) + 1}"
    
def dataset_check_and_update(session, dataset_name, schema_name = None):
    """
    Work out the next "V_<n>" version label for a Snowflake Dataset.

    session      : current session; supplies the database (and the schema when
                   schema_name is not given)
    dataset_name : dataset_name to acquire next version for
    schema_name  : schema holding the dataset; defaults to the session's
                   current schema

    Returns "V_1" when the dataset does not exist or has no versions,
    otherwise "V_<highest numeric suffix + 1>".
    """
    target_schema = session.get_current_schema() if schema_name is None else schema_name
    qualified_name = session.get_current_database() + "." + target_schema + "." + dataset_name

    try:
        existing_versions = Dataset.load(session=session, name=qualified_name).list_versions()
    except dataset_errors.DatasetNotExistError:
        # Nothing registered under that name yet: start the numbering
        return "V_1"

    if not existing_versions:
        return "V_1"

    # Compare on the number after the last underscore only
    highest = max(int(label.rsplit("_", 1)[-1]) for label in existing_versions)
    return f"V_{highest + 1}"

def get_latest(df, model_name):
    """
    Return the version label that follows the highest existing version of a model.

    NOTE: despite the function's name, the value returned is the highest
    existing label with its numeric suffix incremented by one (the prefix of
    that label is kept, e.g. "v_10" -> "v_11").

    df         : pandas dataframe from show_models; must provide a "name" column
                 and a "versions" column holding the string form of a list
    model_name : model-name to look up (matched against the "name" column as-is)

    Returns "V_1" when the model is not listed or has no versions.
    """
    if df.empty:
        return "V_1"

    # Use the row of the requested model, selected by position so that a
    # filtered / re-indexed dataframe works as well.
    model_rows = df[df["name"] == model_name]
    if model_rows.empty:
        return "V_1"

    # 1. Parse string to list
    raw_versions = ast.literal_eval(model_rows["versions"].iloc[0])
    if not raw_versions:
        return "V_1"

    # 2. Sort based on the numeric suffix only, so that "v_10" > "v_2"
    ordered = sorted(raw_versions, key=lambda x: int(x.rsplit("_", 1)[-1]))

    # 3. Take the highest value and increment its number
    prefix, num = ordered[-1].rsplit("_", 1)
    return f"{prefix}_{int(num) + 1}"

import sqlglot
import sqlglot.optimizer.optimizer
def formatSQL (query_in:str, subq_to_cte = False):
    """
    Prettify the given raw SQL statement to nest/indent appropriately.
    Optionally replace subqueries with CTEs.
    query_in    : The raw SQL query to be prettified (Snowflake dialect)
    subq_to_cte : When TRUE convert nested sub-queries to CTEs

    Returns the pretty-printed SQL text.
    """
    if subq_to_cte:
        # Parse and re-generate with the Snowflake dialect so that the
        # rewritten statement stays consistent with the transpile step below.
        expression = sqlglot.parse_one(query_in, read='snowflake')
        query_in = sqlglot.optimizer.optimizer.eliminate_subqueries(expression).sql(dialect='snowflake')
    return sqlglot.transpile(query_in, read='snowflake', pretty=True)[0]

from snowflake.ml.registry import Registry
from snowflake.ml._internal.utils import identifier  
def create_ModelRegistry(session, database, mr_schema = 'MODEL_1'):
    """
    Create Snowflake Model Registry if not exists and return as reference.
    session   : Snowpark session
    database  : Database to use for Model Registry
    mr_schema : Schema name to create/use for Model Registry

    A failing `create schema` statement is taken to mean that the schema is
    already present; any other kind of error is propagated to the caller.
    """
    from snowflake.snowpark.exceptions import SnowparkSQLException

    # Remember the schema in use so it can be put back once the new schema exists
    previous_schema = session.get_current_schema()

    try:
        session.sql(f''' create schema {mr_schema} ''').collect()
    except SnowparkSQLException:
        print(f"Model Registry ({mr_schema}) already exists")
    else:
        print(f"Model Registry ({mr_schema}) created")
        # Restore the caller's schema before anything else can fail
        if previous_schema is not None:
            session.sql(f''' use schema {previous_schema}''').collect()

    return Registry(session=session, database_name= database, schema_name=mr_schema)

from snowflake.ml.feature_store import (FeatureStore,CreationMode) 
def create_FeatureStore(session, database, fs_schema, warehouse):
    """
    Create Snowflake Feature Store if not exists and return reference
    session   : Snowpark session
    database  : Database to use for Feature Store
    fs_schema : Schema name to create/use to check for Feature Store
    warehouse : Warehouse to use as default for Feature Store
    """

    try:
        # First try to attach to an existing Feature Store
        fs = FeatureStore(session, database, fs_schema, warehouse, creation_mode=CreationMode.FAIL_IF_NOT_EXIST)
    except Exception:
        # Attaching failed: create it. The message is printed only after the
        # creation call has returned, so it is never reported prematurely.
        fs = FeatureStore(session, database, fs_schema, warehouse, creation_mode=CreationMode.CREATE_IF_NOT_EXIST)
        print(f"Feature Store ({fs_schema}) created")
    else:
        print(f"Feature Store ({fs_schema}) already exists")

    return fs

def get_spine_df(dataframe):
    """
    Build a spine dataframe with one row per CUSTOMER_ID.

    dataframe : object exposing a `feature_df` Snowpark dataframe (the notebook
                passes a registered FeatureView)

    Each row carries today's date formatted as 'YYYY-MM-DD' in ASOF_DATE and a
    constant column "col_1".
    """
    today_text = datetime.now().strftime('%Y-%m-%d')
    asof_column = F.lit(today_text).as_('ASOF_DATE')
    per_customer = dataframe.feature_df.group_by('CUSTOMER_ID').agg(asof_column)
    return per_customer.with_column("col_1", F.lit("values1"))

### Setup Snowflake connection and database parameters

In [None]:
# get_active_session must be imported before its first use in this cell;
# the only other import of it sits in a much later cell of the notebook.
from snowflake.snowpark.context import get_active_session

session = get_active_session()
role_name = session.get_current_role()
# Quotes are stripped so the name can be used when building qualified table names
database_name = session.get_current_database().strip('"')
# Schema that holds the source tables used by this notebook
schema_name = "DS"
warehouse_name = session.get_current_warehouse()

## MODEL DEVELOPMENT
* Create Snowflake Model-Registry
* Create Snowflake Feature-Store
* Establish and Create CUSTOMER Entity in the development Snowflake FeatureStore
* Create Source Data references and perform basic data-cleansing
* Create & Run Preprocessing Function to create features
* Create FeatureView_Preprocess from Preprocess Dataframe SQL
* Create training data from FeatureView_Preprocess (asof join)
* Create & Fit Snowpark-ml pipeline 
* Save model in Model Registry
* 'Verify' and approve model
* Create new FeatureView_Model_Inference with Transforms UDF + KMeans model

In [None]:
# Create/Reference Snowflake Model Registry - Common across Environments
# (kept in the _MODELLING schema of the current database)
mr = create_ModelRegistry(session, database_name, '_MODELLING')

# Create/Reference Snowflake Feature Store - Common across Environments
# (kept in the _FEATURE_STORE schema, using the session's current warehouse)
fs = create_FeatureStore(session, database_name, '_FEATURE_STORE', warehouse_name)


In [None]:
# Fully qualified name of the CUSTOMERS source table
cust_tbl = '.'.join([database_name, schema_name,'CUSTOMERS'])
cust_sdf = session.table(cust_tbl)
# Print the table name with its row count, then preview ten rows
print(cust_tbl, cust_sdf.count())
cust_sdf.limit(10).show()

### CUSTOMER Entity
Establish and Create CUSTOMER Entity in Snowflake FeatureStore for this Use-Case

In [None]:
# Collect the names of all entities already registered in the Feature Store
entity_names_json = fs.list_entities().select(F.to_json(F.array_agg("NAME", True))).collect()[0][0]
registered_entity_names = json.loads(entity_names_json)

if "CUSTOMER" in registered_entity_names:
    # Re-use the entity registered by an earlier run
    customer_entity = fs.get_entity("CUSTOMER")
else:
    # First run: define the entity and register it
    customer_entity = Entity(name="CUSTOMER", join_keys=["CUSTOMER_ID"],desc="Primary Key for CUSTOMER ORDER")
    fs.register_entity(customer_entity)

fs.list_entities().show()

 ### Create & Load Source Data
 Our Feature engineering pipelines are defined using Snowpark dataframes (or SQL expressions).  In the `QS_feature_engineering_fns.py` file we have created two feature engineering functions to create our pipeline :
* __uc01_load_data__(customer_data: DataFrame, behavior_data: DataFrame) -> DataFrame   
* __uc01_pre_process__(data: DataFrame) -> DataFrame

`uc01_load_data` takes the source tables as dataframe objects and joins them together, performing some data-cleansing by replacing NA's with default values. It returns a dataframe as its output.

`uc01_pre_process`, takes the dataframe output from `uc01_load_data`  and performs aggregation on it to derive some features that will be used in our segmentation model.  It returns a dataframe as output, which we will use to provide the feature-pipeline definition within our FeatureView.

In this way we can build up a complex pipeline step-by-step and use it to derive a FeatureView, that will be maintained as a pipeline in Snowflake.

We will import the functions, and create dataframes from them using the dataframes we created earlier pointing to the tables in our TRAINING (Development) schema.  We will use the last dataframe we create at the end of the pipeline as our input to the FeatureView.


In [None]:
# Feature Engineering Functions
def uc01_load_data(customer_data: DataFrame, behavior_data: DataFrame) -> DataFrame:
    """
    Left-join the customer data with the purchase-behavior data on CUSTOMER_ID
    and return the columns used downstream.

    customer_data      : A dataframe referencing the "CUSTOMER" table in the relevant schema
    behavior_data      : A dataframe referencing the "PURCHASE_BEHAVIOR" table in the relevant schema

    Returns            : Joined dataframe restricted to the required columns
    """
    same_customer = customer_data["CUSTOMER_ID"] == behavior_data["CUSTOMER_ID"]
    joined = customer_data.join(behavior_data, same_customer, "left")

    # UPDATED_AT and CUSTOMER_ID are referenced on both inputs; give them
    # unambiguous names, keeping the customer side's CUSTOMER_ID.
    renamed = joined.rename(
        {
            customer_data["UPDATED_AT"]: "CUSTOMER_UPDATED_AT",
            customer_data["CUSTOMER_ID"]: "CUSTOMER_ID",
            behavior_data["UPDATED_AT"]: "BEHAVIOR_UPDATED_AT"
        })

    output_columns = [
        "CUSTOMER_ID",
        "AGE",
        "GENDER",
        "STATE",
        "ANNUAL_INCOME",
        "LOYALTY_TIER",
        "TENURE_MONTHS",
        "SIGNUP_DATE",
        "CUSTOMER_UPDATED_AT",
        "AVG_ORDER_VALUE",
        "PURCHASE_FREQUENCY",
        "RETURN_RATE",
        "LIFETIME_VALUE",
        "LAST_PURCHASE_DATE",
        "TOTAL_ORDERS",
        "BEHAVIOR_UPDATED_AT",
    ]
    return renamed[output_columns]

def uc01_pre_process(data: DataFrame) -> DataFrame:
    """
    Derive customer-level model input features from the joined source data.

    data         : A dataframe containing the merged data from CUSTOMERS and PURCHASE_BEHAVIOR tables
    result       : The input dataframe with ANNUAL_INCOME rounded to no decimals
                   and five derived feature columns added
    """
    rounded = data.with_column("ANNUAL_INCOME", F.round(F.col("ANNUAL_INCOME"), 0))

    # Building blocks shared by several derived features
    behavior_ts = F.col("BEHAVIOR_UPDATED_AT")
    days_since_last_purchase = F.datediff("day", F.col("LAST_PURCHASE_DATE"), behavior_ts)
    expected_gap_days = F.lit(30) / F.col("PURCHASE_FREQUENCY")

    # Feature name -> expression, in the order the columns are appended
    derived = {
        "AVERAGE_ORDER_PER_MONTH": F.col("TOTAL_ORDERS") / F.col("TENURE_MONTHS"),
        "DAYS_SINCE_LAST_PURCHASE": days_since_last_purchase,
        "DAYS_SINCE_SIGNUP": F.datediff("day", F.col("SIGNUP_DATE"), behavior_ts),
        "EXPECTED_DAYS_BETWEEN_PURCHASES": expected_gap_days,
        "DAYS_SINCE_EXPECTED_LAST_PURCHASE_DATE": F.round(
            days_since_last_purchase - F.lit(expected_gap_days), 0
        ),
    }

    return rounded.with_columns(list(derived.keys()), list(derived.values()))


In [None]:
# Tables (fully qualified names)
cust_tbl                    = '.'.join([database_name, schema_name,'CUSTOMERS'])
behavior_tbl_name           = '.'.join([database_name, schema_name,'PURCHASE_BEHAVIOR'])

# Snowpark Dataframe
cust_sdf              = session.table(cust_tbl)
# `behavior_tbl` keeps holding the dataframe because later cells pass it on
# as such; the table name lives in its own variable so it can still be printed.
behavior_tbl          = session.table(behavior_tbl_name)

# Row Counts
print(f'''\nTABLE ROW_COUNTS IN {schema_name}''')
print(cust_tbl, cust_sdf.count())
print(behavior_tbl_name, behavior_tbl.count())

In [None]:
# Call load data feature engineering function
raw_data = uc01_load_data(cust_sdf, behavior_tbl)

# Format and print the SQL for the Snowpark Dataframe
# (first generated query, with sub-queries rewritten as CTEs)
rd_sql = formatSQL(raw_data.queries['queries'][0], True)
# Print at most the first 1000 lines of the formatted statement
print(os.linesep.join(rd_sql.split(os.linesep)[:1000]))

In [None]:
# Preview the joined source data
raw_data.show()

### Create & Run Preprocessing Function 

In [None]:
# Derive the model input features from the joined source data and preview them
preprocessed_data = uc01_pre_process(raw_data)
preprocessed_data.show()

In [None]:
# Format and print the SQL for the Snowpark Dataframe
# (first generated query, with sub-queries rewritten as CTEs)
ppd_sql = formatSQL(preprocessed_data.queries['queries'][0], True)
# Print at most the first 1000 lines of the formatted statement
print(os.linesep.join(ppd_sql.split(os.linesep)[:1000]))

### Create Preprocessing FeatureView from Preprocess Dataframe (SQL)

In [None]:
# Define descriptions for the FeatureView's Features.  These will be added as comments to the database object
preprocess_features_desc = {  
   "AVERAGE_ORDER_PER_MONTH":"Average number of orders per month",
   "DAYS_SINCE_LAST_PURCHASE":"Days since last purchase",
   "DAYS_SINCE_SIGNUP":"Days since signup",
   "EXPECTED_DAYS_BETWEEN_PURCHASES":"Expected days between purchases",
   "DAYS_SINCE_EXPECTED_LAST_PURCHASE_DATE":"Days since expected last purchase date from LAST_PURCHASE_DATE"
}

# Name and version under which the FeatureView is registered
ppd_fv_name    = "FV_PREPROCESS"
ppd_fv_version = "V_1"


# Create the FeatureView instance
fv_uc01_preprocess_instance = FeatureView(
  name=ppd_fv_name, 
  entities=[customer_entity], 
  feature_df=preprocessed_data,      # <- We can use the snowpark dataframe as-is from our Python
  # feature_df=preprocessed_data.queries['queries'][0],    # <- Or we can use SQL, in this case linted from the dataframe generated SQL to make more human readable
  timestamp_col="BEHAVIOR_UPDATED_AT",
  refresh_freq="60 minute",            # <- specifying optional refresh_freq creates FeatureView as Dynamic Table, else created as View.
  desc="Customer Modelling Features").attach_feature_desc(preprocess_features_desc)

# Register the FeatureView instance.  Creates  object in Snowflake
fv_uc01_preprocess = fs.register_feature_view(
  feature_view=fv_uc01_preprocess_instance, 
  version=ppd_fv_version, 
  block=True,     # whether function call blocks until initial data is available
  overwrite=True # whether to replace existing feature view with same name/version
)

# The registered FeatureView is what get_spine_df receives later to build the spine
spine = fv_uc01_preprocess

In [None]:
# You can also use the following to retrieve a Feature View instance for use within Python
# ('V_1' is the version the FeatureView was registered with)
FV_UC01_PREPROCESS_V_1 = fs.get_feature_view(ppd_fv_name, 'V_1')

# We can look at the FeatureView's contents with
# (ten rows, highest CUSTOMER_ID first)
FV_UC01_PREPROCESS_V_1.feature_df.sort(F.col("CUSTOMER_ID"), ascending=False).show(10)

### Create training data Dataset from FeatureView_Preprocess

In [None]:
# Create Spine
# (one row per CUSTOMER_ID with today's date in ASOF_DATE)
spine_sdf = get_spine_df(spine)
spine_sdf.sort('CUSTOMER_ID').show(5)

In [None]:
def generate_training_df(spine_sdf, feature_view, feature_store):
    """
    Materialise a new version of the 'TRAINING_DATASET' Dataset and return it
    as a Snowpark dataframe.

    spine_sdf     : spine dataframe; its ASOF_DATE column is used as the spine timestamp
    feature_view  : feature view whose features are joined onto the spine
    feature_store : Feature Store used to generate the dataset

    Returns a tuple (training dataframe, schema name). The schema name is read
    from the first row of the Feature Store's feature-view listing.
    NOTE: the version lookup relies on the notebook-level `session` variable.
    """
    dataset_name = 'TRAINING_DATASET'

    feature_view_listing = feature_store.list_feature_views().to_pandas()
    schema_name = feature_view_listing['SCHEMA_NAME'][0]

    # Next free "V_<n>" label for the dataset in that schema
    next_version = dataset_check_and_update(session, dataset_name, schema_name= schema_name)

    generated_dataset = feature_store.generate_dataset(
        name = dataset_name,
        version = next_version,
        spine_df = spine_sdf,
        features = [feature_view],
        spine_timestamp_col = 'ASOF_DATE'
    )

    # Expose the Dataset as a Snowpark dataframe reference
    return generated_dataset.read.to_snowpark_dataframe(), schema_name

In [None]:
# Generate_Dataset
# (also returns the schema the dataset was created in, needed later to address it)
training_dataset_sdf_v1, fv_schema_name = generate_training_df(spine_sdf, fv_uc01_preprocess, feature_store=fs)
# Display some sample data
training_dataset_sdf_v1.sort('CUSTOMER_ID').show(5)

In [None]:
# Convert the training data to pandas and show its first rows
training_dataset_sdf_v1.to_pandas().head()

In [None]:
import math
# ML
import pandas as pd
import xgboost as xgb
from sklearn.compose import ColumnTransformer
from sklearn.metrics import mean_absolute_error, mean_absolute_percentage_error, r2_score
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder, MinMaxScaler

# Snowpark
from snowflake.ml.data.data_connector import DataConnector
from snowflake.ml.registry import Registry as ModelRegistry
from snowflake.snowpark import Session, Row
from snowflake.ml.dataset import Dataset
from snowflake.ml.dataset import load_dataset
from snowflake.ml.experiment import ExperimentTracking
from snowflake.ml.experiment.callback.xgboost import SnowflakeXgboostCallback
from snowflake.ml.model.model_signature import infer_signature
from snowflake.snowpark.context import get_active_session


def create_data_connector(session, dataset_name) -> DataConnector:
    """
    Load the newest version of a Snowflake Dataset as a DataConnector.

    session      : Snowpark session
    dataset_name : name of the Dataset to load

    "Newest" is the version with the highest number after the last underscore
    (the V_<n> scheme used for datasets in this notebook), so that V_10 is
    preferred over V_9 regardless of how the version list is ordered. When no
    version follows that scheme, the last listed version is used.

    Raises ValueError when the Dataset has no versions.
    """
    ds = Dataset.load(
        session=session,
        name=dataset_name
    )
    versions = [str(version) for version in ds.list_versions()]
    if not versions:
        raise ValueError(f"Dataset {dataset_name} has no versions to load")

    # (number, name) pairs for every version that carries a numeric suffix
    numbered = [
        (int(version.rsplit("_", 1)[-1]), version)
        for version in versions
        if version.rsplit("_", 1)[-1].isdigit()
    ]
    ds_latest_version = max(numbered)[1] if numbered else versions[-1]

    ds_df = load_dataset(
        session,
        dataset_name,
        ds_latest_version
    )

    return DataConnector.from_dataset(ds_df)


def compare_params(input_d, extracted_d):
    """
    Compare the parameters in input_d with those in extracted_d.

    input_d     : mapping of parameter name -> value that was supplied
    extracted_d : mapping of parameter name -> value that was read back

    Returns a list of human-readable mismatch descriptions (empty when all
    compared parameters agree). The 'callbacks' key is never compared. Numeric
    values are compared with math.isclose and two NaNs count as equal.
    """
    skipped = ('callbacks',)  # complex objects that cannot be compared by value
    problems = []

    for name in input_d:
        if name in skipped:
            continue

        if name not in extracted_d:
            problems.append(f"Missing key: {name}")
            continue

        expected = input_d[name]
        actual = extracted_d[name]
        both_numeric = isinstance(expected, (int, float)) and isinstance(actual, (int, float))

        if not both_numeric:
            # Plain equality for strings and everything else
            if expected != actual:
                problems.append(f"{name}: {expected} != {actual}")
            continue

        # NaN never equals NaN, so a NaN pair is accepted explicitly
        if pd.isna(expected) and pd.isna(actual):
            continue
        # Tolerant comparison so that 63 and 63.0 match
        if not math.isclose(expected, actual):
            problems.append(f"{name}: {expected} (Input) != {actual} (Row)")

    return problems

def generate_train_val_set(dataframe: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Split the data 80/20 into a training and a validation frame.

    dataframe : pandas dataframe holding the feature columns listed below and
                the LIFETIME_VALUE target

    Returns (train_df, val_df); each frame holds the feature columns followed
    by LIFETIME_VALUE. The split uses a fixed random_state of 42.
    """
    feature_columns = [
        'AGE',
        'GENDER',
        'LOYALTY_TIER',
        'TENURE_MONTHS',
        'AVG_ORDER_VALUE',
        'PURCHASE_FREQUENCY',
        'RETURN_RATE',
        'TOTAL_ORDERS',
        'ANNUAL_INCOME',
        'AVERAGE_ORDER_PER_MONTH',
        'DAYS_SINCE_LAST_PURCHASE',
        'DAYS_SINCE_SIGNUP',
        'EXPECTED_DAYS_BETWEEN_PURCHASES',
        'DAYS_SINCE_EXPECTED_LAST_PURCHASE_DATE',
    ]
    features = dataframe[feature_columns]
    target = dataframe["LIFETIME_VALUE"]

    features_train, features_val, target_train, target_val = train_test_split(
        features, target, test_size=0.2, random_state=42
    )
    print("Splitted data")

    # Re-attach the target to each split
    return (
        pd.concat([features_train, target_train], axis=1),
        pd.concat([features_val, target_val], axis=1),
    )

def build_pipeline(**model_params) -> Pipeline:
    """
    Create pipeline with preprocessors and model.

    model_params : keyword arguments forwarded to xgb.XGBRegressor. A single
                   `model_params=<dict>` keyword is also accepted and unpacked,
                   so that a dict of hyper-parameters handed over that way is
                   applied to the regressor instead of being passed on as one
                   unknown parameter.

    Returns a Pipeline with a "preprocessor" step (MinMax scaling of numeric
    columns, one-hot encoding of GENDER, ordinal encoding of LOYALTY_TIER) and
    a "regressor" step.
    """
    # Unpack a dict passed as `model_params=...`; explicit keywords win
    nested_params = model_params.pop("model_params", None)
    if isinstance(nested_params, dict):
        model_params = {**nested_params, **model_params}
    elif nested_params is not None:
        model_params["model_params"] = nested_params

    # Define column types
    ordinal_feature_cols = ['LOYALTY_TIER']
    categorical_feature_cols = ['GENDER']
    numerical_feature_cols = [
        'AGE',
        'TENURE_MONTHS',
        'AVG_ORDER_VALUE',
        'PURCHASE_FREQUENCY',
        'RETURN_RATE',
        'TOTAL_ORDERS',
        'ANNUAL_INCOME',
        'AVERAGE_ORDER_PER_MONTH',
        'DAYS_SINCE_LAST_PURCHASE',
        'DAYS_SINCE_SIGNUP',
        'EXPECTED_DAYS_BETWEEN_PURCHASES',
        'DAYS_SINCE_EXPECTED_LAST_PURCHASE_DATE'
    ]

    # Create preprocessing steps
    # Explicit category order so the encoded tier values are low < medium < high
    tier_order = ['low', 'medium', 'high']
    explicit_categories = [tier_order]
    ordinal_encoder = OrdinalEncoder(categories=explicit_categories, dtype=int)

    preprocessor = ColumnTransformer(
        transformers=[
            ('NUM', MinMaxScaler(), numerical_feature_cols),
            ('CAT', OneHotEncoder(), categorical_feature_cols),
            ('ORD', ordinal_encoder, ordinal_feature_cols)
        ],
        remainder='passthrough',
    )

    model = xgb.XGBRegressor(**model_params)

    return Pipeline([("preprocessor", preprocessor), ("regressor", model)])


def evaluate_model(model: Pipeline, X_test: pd.DataFrame, y_test: pd.DataFrame):
    """
    Score a fitted model on held-out data.

    model  : fitted pipeline offering predict()
    X_test : feature columns of the evaluation data
    y_test : true target values of the evaluation data

    Returns a dict with mean_absolute_error, mean_absolute_percentage_error
    and r2_score.
    """
    predictions = model.predict(X_test)

    # Metric name -> scoring function, evaluated in this order
    scorers = (
        ("mean_absolute_error", mean_absolute_error),
        ("mean_absolute_percentage_error", mean_absolute_percentage_error),
        ("r2_score", r2_score),
    )
    return {metric_name: scorer(y_test, predictions) for metric_name, scorer in scorers}


def train():
    """
    Train, evaluate and log one hyper-parameter trial.

    Meant to be run by the tuner: hyper-parameters and the train/val data are
    taken from the tuner context. The entries "model_name", "mr_schema_name"
    and "experiment_name" are removed from the hyper-parameters and used for
    experiment tracking; everything that remains is passed to the regressor.
    Parameters, metrics and the fitted model are logged to the experiment, and
    the metrics (plus the run name under "run_name") are reported to the tuner.
    """
    from snowflake.ml.modeling import tune

    session = get_active_session()
    # Get tuner context
    tuner_context = tune.get_tuner_context()
    params = tuner_context.get_hyper_params()
    dm = tuner_context.get_dataset_map()
    # Bookkeeping entries travel with the search space; take them out so that
    # only regressor parameters are left in `params`
    model_name = params.pop("model_name")
    mr_schema_name = params.pop("mr_schema_name")
    experiment_name = params.pop("experiment_name")

    # Initialize experiment tracking for this trial
    exp = ExperimentTracking(session=session, schema_name=mr_schema_name)
    exp.set_experiment(experiment_name)

    with exp.start_run() as run:
        # Load data
        train_data = dm["train"].to_pandas()
        val_data = dm["val"].to_pandas()

        # Separate features and target
        X_train = train_data.drop('LIFETIME_VALUE', axis=1)
        y_train = train_data['LIFETIME_VALUE']
        X_val = val_data.drop('LIFETIME_VALUE', axis=1)
        y_val = val_data['LIFETIME_VALUE']

        # Train model
        sig = infer_signature(X_train, y_train)
        callback = SnowflakeXgboostCallback(
            exp, model_name=model_name, model_signature=sig
        )
        params['callbacks'] = [callback]

        # Unpack the parameters: build_pipeline takes them as keywords, and
        # passing the dict under one keyword would not apply them
        model = build_pipeline(**params)
        # Log model parameters with the log_param(...) or log_params(...) methods
        exp.log_params(params)

        print("Training model...", end="")
        model.fit(X_train, y_train)

        # Evaluate model
        print("Evaluating model...", end="")
        metrics = evaluate_model(
            model,
            X_val,
            y_val,
        )

        print("Log metrics...", end="")
        # Log model metrics with the log_metric(...) or log_metrics(...) methods
        exp.log_metrics(metrics)

        # Carried along so the best trial can be mapped back to its run
        metrics['run_name'] = run.name

        # Log model; the run name doubles as the model version name
        exp.log_model(
            model=model,
            model_name=model_name,
            version_name=run.name,
            sample_input_data=X_train
        )

        # Report to HPO framework (the tuner picks the metric to optimise
        # from its own configuration)
        tuner_context.report(
            metrics=metrics,
            model=model
        )

In [None]:
from snowflake.ml.modeling import tune
from snowflake.ml.modeling.tune.search import RandomSearch, BayesOpt
def train_remote(
        source_dataset: str, 
        model_name: str, 
        mr_schema_name: str,
        experiment_name: str,
        session
    ):
    """
    Run a hyper-parameter search for the XGBoost pipeline on a Snowflake Dataset.

    source_dataset  : name of the Dataset to train on
    model_name      : model name handed to every trial
    mr_schema_name  : schema name handed to every trial for experiment tracking
    experiment_name : experiment name handed to every trial
    session         : Snowpark session

    Returns the result object of tuner.run (10 random-search trials that
    minimise mean_absolute_percentage_error).
    """
    print("Loading data...", end="", flush=True)
    source_pdf = create_data_connector(session, dataset_name=source_dataset).to_pandas()

    print("Building train/val data")
    train_df, val_df = generate_train_val_set(source_pdf)

    # One DataConnector per split, keyed the way train() looks them up
    dataset_map = {
        split_name: DataConnector.from_dataframe(session.create_dataframe(split_frame))
        for split_name, split_frame in (("train", train_df), ("val", val_df))
    }

    # Values every trial needs but that are not tuned
    trial_context = {
        'mr_schema_name': mr_schema_name,
        'model_name': model_name,
        'experiment_name': experiment_name,
    }
    # XGBoost search space (intentionally mixes strong and weak configs)
    tunable = {
        # model capacity
        'max_depth': tune.choice([1, 4, 6, 10]),
        # learning rate: very small underfits, very large can explode
        'eta': tune.choice([0.01, 0.1, 0.8]),
        # boosting rounds: low for underfit, high for stronger models
        'n_estimators': tune.choice([10, 150, 500]),
        # row/column subsampling to vary bias/variance
        'subsample': tune.choice([0.5, 0.7, 1.0]),
        # regularization knobs to swing between over/under-fitting
        'reg_lambda': tune.choice([0.1, 1, 10]),
        'random_state': tune.choice([42]),
    }
    search_space = {**trial_context, **tunable}

    # Tuner setup (increase trials to see both good and bad configs)
    tuner = tune.Tuner(
        train_func=train,
        search_space=search_space,
        tuner_config=tune.TunerConfig(
            metric='mean_absolute_percentage_error',
            mode='min',
            search_alg=RandomSearch(),
            num_trials=10,
        ),
    )

    print("HPO starting")
    results = tuner.run(dataset_map=dataset_map)
    print("HPO DONE")
    return results

In [None]:
# Run the hyper-parameter search on the TRAINING_DATASET generated earlier;
# the dataset is addressed by database, feature-view schema and dataset name.
results = train_remote(
    source_dataset=database_name+"."+fv_schema_name+"."+"TRAINING_DATASET",
    model_name = "_MODELLING.UC01_SNOWFLAKEML_RF_REGRESSOR_MODEL",
    mr_schema_name = "_MODELLING",
    experiment_name="MY_EXPERIMENT",
    session=session
)


# Productionise Best Model

In [None]:
best_result = results.best_result
# 'run_name' is the entry train() adds to the metrics it reports for a trial
best_run_name = best_result[best_result['run_name'].notnull()]['run_name'].iloc[0]
all_results = results.results
all_results

In [None]:
best_run = all_results[all_results["run_name"] == best_run_name]['run_name'].iloc[0]
# The configured model name has the form "<schema>.<model>"; keep the model part
best_model_name = all_results[all_results["run_name"] == best_run_name]['config/model_name'].iloc[0].split(".")[1]

# Look up the model in the registry and select the version named after the best run
model_object = mr.get_model(best_model_name)
model_versions = model_object.show_versions()
best_version = model_object.version(best_run)
best_version_df = model_versions[model_versions['name'] == best_run]
best_version_df

In [None]:
# Setting model for production there are 4 options:
# Using the default version
model_object.default = best_run

# Using aliases
try:
    best_version.set_alias("PROD")
except Exception as exc:
    # Best effort: setting the alias may fail (e.g. on a re-run); report the
    # reason instead of hiding it and carry on with the other options
    print(f"Alias PROD not set on version {best_run}: {exc}")

# Using tags
# Snowpark only executes a statement once an action such as collect() is
# called; IF NOT EXISTS keeps the cell re-runnable once the tag exists.
session.sql(f"""CREATE TAG IF NOT EXISTS {mr._database_name}.{mr._schema_name}.live_model_version;""").collect()

In [None]:
# Record the best run as the value of the live_model_version tag on the model
model_object.set_tag(f"""{mr._database_name}.{mr._schema_name}.live_model_version""", best_run)

In [None]:
# Using multiple schemas
# NOTE: CREATE OR REPLACE re-creates PROD_SCHEMA on every run.
# collect() is required for the statement to be executed at all.
session.sql(f"""CREATE OR REPLACE SCHEMA {mr._database_name}.{"PROD_SCHEMA"};""").collect()

In [None]:
# Copy the best version of the model into PROD_SCHEMA.
# collect() is required for the statement to be executed at all.
session.sql(f"""
CREATE OR REPLACE MODEL {mr._database_name}.{"PROD_SCHEMA"}.{best_model_name}
WITH VERSION {best_run}
FROM MODEL {mr._database_name}.{mr._schema_name}.{best_model_name}
VERSION {best_run};
"""
).collect()

In [None]:
# List the tags currently set on the model
print(model_object.show_tags())

In [None]:
# Fetch the model from the registry again and show the row of the best version
model_object_v2 = mr.get_model(best_model_name)
model_versions_v2 = model_object_v2.show_versions()
best_version_v2 = model_versions_v2[model_versions_v2['name'] == best_run]
best_version_v2

In [None]:
# Deploy the model's DEFAULT version (set to the best run earlier) as the
# service "clv_service" on the CLV_MODEL_POOL_CPU compute pool, without GPUs
model_object_v2.version("DEFAULT").create_service(service_name="clv_service",
                  service_compute_pool="CLV_MODEL_POOL_CPU",
                  ingress_enabled=True,
                  gpu_requests=None)

In [None]:
# Get signature of the inference function in Python
# NOTE(review): the return value is neither stored nor printed here
model_object_v2.version("DEFAULT").show_functions()
# Call the function in Python
# (scores the training dataframe through the deployed clv_service)
service_prediction = model_object_v2.version("DEFAULT").run(
    training_dataset_sdf_v1,
    function_name="predict",
    service_name="clv_service")

In [None]:
def serve(featurevector, km4_purchases) -> DataFrame:
    """
    Score a set of features with a model version through the "clv_service" service.

    featurevector : data passed to the model version's run() call
    km4_purchases : model version object offering run()

    Returns whatever run() returns for the "predict" function.
    """
    predictions = km4_purchases.run(featurevector, function_name="predict", service_name="clv_service")
    return predictions

# Test Inference process
# Score the contents of the registered preprocessing FeatureView with the
# DEFAULT model version and expose the model output column as PREDICTION
preprocessed_data_df = fs.get_feature_view(ppd_fv_name, 'V_1').feature_df
inference_result_sdf = serve(
    preprocessed_data_df, 
    model_object_v2.version("DEFAULT")
).with_column_renamed('"output_feature_0"', "PREDICTION")


In [None]:
# Switch to the DS schema so the unqualified table name below resolves there
session.use_schema("DS")

# Persist the predictions, replacing the table if it already exists
inference_result_sdf.write.mode("overwrite").save_as_table("prediction_table")


In [None]:
# Reference the stored predictions (resolved in the current schema)
session.table("prediction_table")

In [None]:
from snowflake.ml.monitoring.entities.model_monitor_config import ModelMonitorConfig, ModelMonitorSourceConfig

monitor_name = f"""{best_model_name}_monitor"""

# The predictions were written to DS.prediction_table in the session's
# database, so the source is addressed through database_name rather than a
# fixed database name.
source_config = ModelMonitorSourceConfig(
    source=f"{database_name}.DS.prediction_table",
    timestamp_column="BEHAVIOR_UPDATED_AT",
    id_columns=["CUSTOMER_ID"],
    prediction_score_columns=["PREDICTION"],
    actual_score_columns=["LIFETIME_VALUE"],
)

# Set up config for ModelMonitor.
model_monitor_config = ModelMonitorConfig(
    model_version=model_object_v2.version("DEFAULT"),
    model_function_name="predict",
    background_compute_warehouse_name="COMPUTE_WH",
    refresh_interval="1 hour",
    aggregation_window="1 day"
)

# Remove the monitor left by a previous run; on a first run there is nothing
# to delete, which must not stop the cell.
try:
    mr.delete_monitor(monitor_name)
except Exception as exc:
    print(f"No existing monitor {monitor_name} removed: {exc}")

session.use_schema("_MODELLING")
# Add a new ModelMonitor
model_monitor = mr.add_monitor(
    name=monitor_name,
    source_config=source_config,
    model_monitor_config=model_monitor_config,
)
model_monitor

## CLEAN UP

In [None]:
# session.close()

In [None]:
from datetime import datetime
from zoneinfo import ZoneInfo

# Current wall-clock time in Melbourne, e.g. "Monday, January 01, 2024 09:30:00 AM AEDT"
melbourne_now = datetime.now(ZoneInfo("Australia/Melbourne"))
formatted_time = melbourne_now.strftime("%A, %B %d, %Y %I:%M:%S %p %Z")

print(f"The last run time in Melbourne is: {formatted_time}")