# Imports

In [None]:
# Make sure to restart kernel after package installation
%pip install --q -e ../ ipywidgets

In [None]:
# Import python packages
import pandas as pd
import json
from datetime import date

# ML packages
import xgboost as xgb
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_recall_fscore_support

# Snowpark
from snowflake.snowpark import functions as F
from snowflake.snowpark import Window
from snowflake.snowpark.context import get_active_session

# Experiment Tracking API
from snowflake.ml.experiment.experiment_tracking import ExperimentTracking
from snowflake.ml.experiment.callback.xgboost import SnowflakeXgboostCallback

# Feature Store API
from snowflake.ml.feature_store import FeatureStore, CreationMode, Entity, FeatureView, OnlineConfig

# Model Registry API
from snowflake.ml.registry import Registry
from snowflake.ml.model.model_signature import infer_signature
from snowflake.ml.monitoring.entities.model_monitor_config import ModelMonitorConfig, ModelMonitorSourceConfig

# Demo-specific functions
import demo_functions

# Demo configuration: target database/schema and the warehouse used for compute
database = 'AI_DEMOS'
schema = 'IOT_PREDICTIVE_MAINTENANCE'
warehouse = 'AI_WH'
current_date = str(date.today())  # ISO-8601 string, YYYY-MM-DD

# Reuse the session opened by the notebook runtime and point it at the demo database
session = get_active_session()
session.use_database(database)

# Demo bootstrap (helper in demo_functions; presumably creates the schema and source tables)
demo_functions.setup(session, schema)

# 1 - Set Up Feature Store and Model Registry

In [None]:
# Feature Store in schema <schema>_FEATURE_STORE: created on the first run, reused afterwards
my_feature_store = FeatureStore(
    session=session,
    database=database,
    name=f"{schema}_FEATURE_STORE",
    default_warehouse=warehouse,
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

# Model Registry in schema <schema>_MODEL_REGISTRY, with model monitoring enabled
my_model_registry = Registry(
    session=session,
    database_name=database,
    schema_name=f"{schema}_MODEL_REGISTRY",
    options=dict(enable_monitoring=True),
)

# 2 - Explore Data

In [None]:
# Raw inputs: the sensor readings and the machine failure log
sensor_data = session.table(f'{database}.{schema}.MACHINE_SENSORS')
machine_failures = session.table(f'{database}.{schema}.MACHINE_FAILURES')

print('Sensor Data:')
display(sensor_data.limit(5))

print('Machine Failures:')
display(machine_failures.limit(5))

In [None]:
# Daily aggregation: one row per machine per calendar day holding the mean of each sensor
daily_sensor_data = (
    sensor_data
    .with_column('DATE', F.date_trunc('DAY', 'SENSOR_TIMESTAMP').cast('date'))
    .group_by('MACHINE_ID', 'DATE')
    .agg(*[
        F.avg(raw_col).alias(avg_col)
        for raw_col, avg_col in (
            ('SENSOR_1', 'SENSOR_1_DAILY_AVERAGE'),
            ('SENSOR_2', 'SENSOR_2_DAILY_AVERAGE'),
            ('SENSOR_3', 'SENSOR_3_DAILY_AVERAGE'),
        )
    ])
)

display(daily_sensor_data.limit(5))

## Visualize Data

In [None]:
# One machine's daily averages, shown with the notebook's built-in visualizations
daily_sensor_data.where(F.col('MACHINE_ID') == 'MACHINE_0000')

In [None]:
# Daily averages plus failure flags, pulled into pandas for plotting.
# The left join keeps every machine-day, including days with no failure row.
viz_df = (
    daily_sensor_data
    .join(machine_failures, on=['MACHINE_ID', 'DATE'], how='left')
    .order_by('MACHINE_ID', 'DATE')
    .to_pandas()
)

viz_df.head()

In [None]:
# Plotly chart of a single machine's data from viz_df (helper lives in demo_functions)
demo_functions.plot_machine_data(
    viz_df,
    'MACHINE_0000',
)

# 3 - Feature Engineering

In [None]:
n_lags = 3  # number of previous days of each daily average to expose as features

# Per machine, ordered by day: add lagged copies of each daily average (lags 1..n_lags),
# then drop the same-day averages so only past values remain as features.
lag_features = daily_sensor_data.analytics.compute_lag(
    cols=['SENSOR_1_DAILY_AVERAGE', 'SENSOR_2_DAILY_AVERAGE', 'SENSOR_3_DAILY_AVERAGE'],
    lags=[*range(1, n_lags + 1)],
    order_by=['DATE'],
    group_by=['MACHINE_ID'],
).drop(['SENSOR_1_DAILY_AVERAGE', 'SENSOR_2_DAILY_AVERAGE', 'SENSOR_3_DAILY_AVERAGE'])

display(lag_features.limit(10))

# 4 - Register Features

In [None]:
# Entity: features are keyed by machine
machine_entity = Entity(
    name="MACHINE",
    join_keys=["MACHINE_ID"],
    desc="Unique Machine ID",
)
my_feature_store.register_entity(machine_entity)

# Register the lag features as version 1 of a feature view. It is refreshed incrementally
# every minute and is offline only. The registered view returned by the store is kept.
lag_features_fv = my_feature_store.register_feature_view(
    feature_view=FeatureView(
        name='MACHINE_SENSORS_LAG_FEATURES',
        entities=[machine_entity],
        feature_df=lag_features,
        timestamp_col='DATE',
        refresh_freq='1 minute',
        refresh_mode='INCREMENTAL',
        online_config=OnlineConfig(enable=False),
        desc='Lag Features for Machine Sensors',
    ),
    version='1',
    overwrite=True,
)

# 5 - Generate Training Dataset 

In [None]:
def generate_training_dataset(session, feature_store, training_start_date, training_end_date, features=None):
    """Build a labelled training Dataset for one date window and return it as a Snowpark DataFrame.

    The spine is every distinct (MACHINE_ID, DATE) pair with sensor readings between
    ``training_start_date`` and ``training_end_date`` (both inclusive, via BETWEEN). Each spine
    row is labelled FAILURE_IN_1_DAY with the MACHINE_FAILURES.FAILURE value recorded for the
    same machine one day later, or 0 when there is no matching failure row. Feature values are
    looked up using DATE as the spine timestamp.

    Args:
        session: Snowpark session used to read the source tables in ``{database}.{schema}``
            (``database`` and ``schema`` are notebook globals).
        feature_store: FeatureStore used to generate the Dataset.
        training_start_date: First spine date, e.g. ``'2025-01-04'``.
        training_end_date: Last spine date, e.g. ``'2025-04-01'``.
        features: Feature views to attach to the spine. Defaults to ``[lag_features_fv]``,
            the lag feature view registered earlier in the notebook.

    Returns:
        Snowpark DataFrame reading the generated Dataset (spine columns, label and features).
    """
    # Resolved at call time so existing callers keep using the notebook's registered view
    if features is None:
        features = [lag_features_fv]

    # Spine: one row per machine per day that has sensor data inside the window
    machines_df = (
        session.table(f'{database}.{schema}.MACHINE_SENSORS')
            .with_column('DATE', F.date_trunc('DAY', 'SENSOR_TIMESTAMP').cast('date'))
            .filter(F.col('DATE').between(training_start_date, training_end_date))
            .select('MACHINE_ID', 'DATE')
            .distinct()
    )

    # Labels: shift each failure back one day so day D carries "fails on D+1"
    failure_labels = (
        session.table(f'{database}.{schema}.MACHINE_FAILURES')
            .with_column('DATE', F.date_add(F.col('DATE'), F.lit(-1)))
            .rename({'FAILURE': 'FAILURE_IN_1_DAY'})
    )

    # Left join keeps machine-days without a failure row; those are labelled 0
    spine_df = (
        machines_df
            .join(failure_labels, how='left', on=['MACHINE_ID', 'DATE'])
            .order_by('MACHINE_ID', 'DATE')
            .fillna(0, subset=['FAILURE_IN_1_DAY'])
    )
    print('Created Spine DataFrame:')
    display(spine_df.limit(10))

    # Attach features to the spine and materialise a versioned Dataset in the registry schema
    print('Generating Dataset ...')
    training_dataset = feature_store.generate_dataset(
        name=f'{database}.{schema}_MODEL_REGISTRY.PREDICTIVE_MAINTENANCE_DATASET',
        spine_df=spine_df,
        features=features,
        spine_timestamp_col='DATE',
        spine_label_cols=['FAILURE_IN_1_DAY'],
        desc='Training dataset to predict machine failures.'
    )

    training_dataset_df = training_dataset.read.to_snowpark_dataframe()
    print('Created Dataset:')
    display(training_dataset_df.limit(10).to_pandas())
    return training_dataset_df

In [None]:
training_start_date = '2025-01-04'
training_end_date = '2025-04-01'

training_dataset_df = generate_training_dataset(session, my_feature_store, training_start_date, training_end_date)

# 6 - Train Model

In [None]:
def _fit_and_evaluate(exp, run_name, model, X_train, y_train, X_test, y_test, **fit_kwargs):
    """Fit ``model`` inside one experiment run and log its weighted precision/recall/F1 on the test set.

    Args:
        exp: ExperimentTracking instance whose experiment has already been set.
        run_name: Name of the run opened for this model.
        model: Unfitted scikit-learn-compatible classifier.
        X_train, y_train: Training features and labels.
        X_test, y_test: Held-out features and labels used for the reported metrics.
        **fit_kwargs: Extra keyword arguments forwarded to ``model.fit``.

    Returns:
        ``{'model': <fitted model>, 'metrics': {'precision': ..., 'recall': ..., 'f1': ...}}``.
    """
    with exp.start_run(run_name=run_name):
        model.fit(X_train, y_train, **fit_kwargs)
        y_pred = model.predict(X_test)
        # Support-weighted averages over classes; zero_division=0.0 scores a never-predicted class as 0
        precision, recall, f1, _ = precision_recall_fscore_support(
            y_test, y_pred, average='weighted', zero_division=0.0
        )
        metrics = {
            "precision": precision,
            "recall": recall,
            "f1": f1
        }
        exp.log_metrics(metrics)
    return {'model': model, 'metrics': metrics}


def train_models(train_df, test_df, experiment_name):
    """Train an XGBoost and a logistic-regression classifier and track both in one experiment.

    Features are every column except FAILURE_IN_1_DAY (the target), MACHINE_ID and DATE.
    Each model is fitted in its own run under ``experiment_name``, and its weighted precision,
    recall and F1 on ``test_df`` are logged. Experiment tracking uses the notebook-global
    Snowpark ``session``.

    Args:
        train_df: Snowpark DataFrame with the training rows.
        test_df: Snowpark DataFrame with the held-out rows (same columns as ``train_df``).
        experiment_name: Experiment under which both runs are logged.

    Returns:
        dict with ``feature_columns`` and ``target_column``, plus ``xgboost`` and
        ``logisticregression`` entries of the form ``{'model': ..., 'metrics': ...}``.
    """
    train_pdf = train_df.to_pandas()
    test_pdf = test_df.to_pandas()

    # Define target and features
    target_column = 'FAILURE_IN_1_DAY'
    unused_columns = [target_column, 'MACHINE_ID', 'DATE']
    feature_columns = list(train_pdf.drop(unused_columns, axis=1).columns)

    X_train, y_train = train_pdf[feature_columns], train_pdf[target_column]
    X_test, y_test = test_pdf[feature_columns], test_pdf[target_column]

    # Setup Experiment Tracking; the callback logs XGBoost training metrics every 2 rounds
    # (log_model=False: the model itself is not logged by the callback)
    exp = ExperimentTracking(session=session)
    exp.set_experiment(experiment_name)
    callback = SnowflakeXgboostCallback(exp, log_every_n_epochs=2, log_model=False)

    # NOTE(review): early stopping watches aucpr on the *test* set passed as eval_set, so the
    # reported test metrics are somewhat optimistic; a separate validation split would avoid that.
    xgb_model = xgb.XGBClassifier(
        tree_method="hist",
        callbacks=[callback],
        eval_metric='aucpr',
        early_stopping_rounds=2,
        n_estimators=10
    )
    logreg_model = LogisticRegression(solver='liblinear', random_state=42)

    training_results = {
        'feature_columns': feature_columns,
        'target_column': target_column,
    }
    training_results['xgboost'] = _fit_and_evaluate(
        exp, 'XGBoost_Classifier', xgb_model, X_train, y_train, X_test, y_test,
        eval_set=[(X_test, y_test)], verbose=0,
    )
    training_results['logisticregression'] = _fit_and_evaluate(
        exp, 'Logistic_Regression', logreg_model, X_train, y_train, X_test, y_test,
    )
    return training_results

In [None]:
# Chronological split of the training window: train through 2025-02-28, hold out 2025-03-01 onward.
# The outer bounds reuse training_start_date / training_end_date so the split always matches the generated window.
train_dataset_df = training_dataset_df.filter(F.col('DATE').between(training_start_date, '2025-02-28'))
test_datatset_df = training_dataset_df.filter(F.col('DATE').between('2025-03-01', training_end_date))

training_results = train_models(train_dataset_df, test_datatset_df, experiment_name='PREDICTIVE_MAINTENANCE_MODELS')

# Retrieve what later cells need: feature/target names plus the XGBoost model and its test metrics
feature_columns = training_results['feature_columns']
target_column = training_results['target_column']
xgb_model = training_results['xgboost']['model']
metrics = training_results['xgboost']['metrics']

In [None]:
# XGBoost feature importances, sorted ascending so the most important feature is drawn at the top
importance_df = pd.DataFrame({
    'feature': xgb_model.get_booster().feature_names,
    'importance': xgb_model.feature_importances_,
}).sort_values(by='importance', ascending=True)

# Single series: the legend adds nothing, but a title and axis labels make the chart self-explanatory
plotfig = importance_df.plot.barh(x='feature', y='importance', figsize=(4, 4), legend=False)
plotfig.set(title='XGBoost feature importance', xlabel='importance', ylabel='feature');

# 7 - Register Model to Model Registry

In [None]:
# Model signature: the feature columns go in, one FAILURE_IN_1_DAY_PREDICTION column comes out.
# Both sides are inferred from a 100-row sample of the training split.
signature = infer_signature(
    input_data=train_dataset_df.select(feature_columns).limit(100),
    output_data=(
        train_dataset_df
        .select(target_column)
        .rename({'FAILURE_IN_1_DAY': 'FAILURE_IN_1_DAY_PREDICTION'})
        .limit(100)
    ),
)

# Log the XGBoost model as V1, runnable in the warehouse and in SPCS.
# sample_input_data is drawn from the generated Dataset (training_dataset_df), not the split;
# presumably this is what links the version to the Dataset for the lineage() query below.
registered_model = my_model_registry.log_model(
    xgb_model,
    model_name="PREDICTIVE_MAINTENANCE_MODEL",
    version_name='V1',
    comment="Model trained using XGBoost to predict machine failures",
    metrics=metrics,
    signatures={'predict': signature},
    sample_input_data=training_dataset_df.select(feature_columns).limit(100),
    conda_dependencies=['xgboost'],
    target_platforms=['WAREHOUSE', 'SNOWPARK_CONTAINER_SERVICES'],
    options={"relax_version": True, "enable_explainability": False},
)

In [None]:
# Display the ModelVersion returned by log_model (rich repr as the cell's last expression)
registered_model

# 8 - Set Model to Production

In [None]:
# Mark this version as the production model
registered_model.set_alias('PRODUCTION')

# Pipelines resolve the production model by alias rather than by a fixed version name
production_model = (
    my_model_registry
    .get_model('PREDICTIVE_MAINTENANCE_MODEL')
    .version('PRODUCTION')
)

In [None]:
# Walk lineage upstream from the model to its first upstream node (assumed to be the training
# Dataset), then upstream again, keeping only feature views: these are the features the model needs.
featureviews = (
    production_model
    .lineage(direction='upstream')[0]
    .lineage(domain_filter=['feature_view'], direction='upstream')
)
for fv in featureviews:
    print(f'Feature View Name: {fv.name}')
    print('Feature Names:')
    for feature_name in fv.feature_names:
        print(feature_name)

# 9 - Test Model

In [None]:
# Score the held-out test split with the production model; cache_result() stores the scores so the
# preview below and the table writes in the next cell reuse them instead of re-running inference
baseline_predictions = (
    production_model
    .run(test_datatset_df, function_name='predict')
    .cache_result()
)
display(baseline_predictions.limit(10))

# 10 - Model Monitoring

In [None]:
# Persist the test-set predictions from the previous cell twice: as the monitor's baseline, and as
# the initial contents of the source table that later cells append new predictions to
baseline_table = f'{database}.{schema}_MODEL_REGISTRY.PREDICTIVE_MAINTENANCE_MODEL_BASELINE_V1'
source_table = f'{database}.{schema}_MODEL_REGISTRY.PREDICTIVE_MAINTENANCE_MODEL_SOURCE_V1'
baseline_predictions.write.save_as_table(baseline_table, mode='overwrite')
baseline_predictions.write.save_as_table(source_table, mode='overwrite')

# Tell the monitor which columns hold the timestamp, ids, predictions and actuals
source_config = ModelMonitorSourceConfig(
    baseline=baseline_table,
    source=source_table,
    timestamp_column='DATE',
    id_columns=['MACHINE_ID'],
    prediction_class_columns=['FAILURE_IN_1_DAY_PREDICTION'],
    actual_class_columns=['FAILURE_IN_1_DAY']
)

# Background refresh runs on the warehouse configured at the top of the notebook, not a second hard-coded name
monitor_config = ModelMonitorConfig(
    model_version=production_model,
    model_function_name='predict',
    background_compute_warehouse_name=warehouse,
    refresh_interval='1 minute',
    aggregation_window='1 day'
)

# Create model monitor
model_monitor = my_model_registry.add_monitor(
    name=f'{database}.{schema}_MODEL_REGISTRY.PREDICTIVE_MAINTENANCE_MODEL_MM_V1',
    source_config=source_config,
    model_monitor_config=monitor_config
)

# 11 - Simulate Future Data and Model Predictions

In [None]:
# Append simulated machine data for 2025-04-01 .. 2026-01-20 (demo helper, mode='append')
demo_functions.generate_machine_data(session, schema, start_date='2025-04-01', end_date='2026-01-20', mode='append')

# The feature view refreshes on its own schedule (refresh_freq='1 minute'); refresh it now for the demo
# so the new days are available to the next cells immediately
my_feature_store.refresh_feature_view(lag_features_fv)

In [None]:
def create_feature_df(start_date, end_date, feature_store, features=None):
    """Return feature rows for every machine-day with sensor data between ``start_date`` and ``end_date``.

    Builds a (MACHINE_ID, DATE) spine from MACHINE_SENSORS (dates inclusive, via BETWEEN) and
    retrieves feature values for it using DATE as the spine timestamp. Unlike
    ``generate_training_dataset``, no label is attached and no Dataset is created; the result is
    meant for scoring. Reads the notebook globals ``session``, ``database`` and ``schema``.

    Args:
        start_date: First date, e.g. ``'2025-04-01'``.
        end_date: Last date, e.g. ``'2026-01-20'``.
        feature_store: FeatureStore used to retrieve the feature values.
        features: Feature views to retrieve. Defaults to ``[lag_features_fv]``,
            the lag feature view registered earlier in the notebook.

    Returns:
        Snowpark DataFrame with the spine columns plus the retrieved features.
    """
    # Resolved at call time so existing callers keep using the notebook's registered view
    if features is None:
        features = [lag_features_fv]

    # Create spine dataframe: one row per machine per day with sensor data in the window
    spine_df = (
        session.table(f'{database}.{schema}.MACHINE_SENSORS')
            .with_column('DATE', F.date_trunc('DAY', 'SENSOR_TIMESTAMP').cast('date'))
            .filter(F.col('DATE').between(start_date, end_date))
            .select('MACHINE_ID', 'DATE')
            .distinct()
            .order_by('MACHINE_ID', 'DATE')
    )

    # Retrieve features
    feature_df = feature_store.retrieve_feature_values(
        spine_df=spine_df,
        features=features,
        spine_timestamp_col="DATE"
    )

    return feature_df

# Features for every machine-day after the V1 training window, through the end of the simulated data
feature_df = create_feature_df(start_date='2025-04-01', end_date='2026-01-20', feature_store=my_feature_store)
display(feature_df.limit(10))

In [None]:
# Score model
predictions = production_model.run(feature_df, function_name='predict').cache_result()

# Actuals: a failure on day D+1 is the label for day D, so shift failure dates back one day.
# Use a dedicated name so the raw `machine_failures` table loaded in the exploration cells
# is not silently replaced by this shifted/renamed frame.
failure_labels = (
    session.table(f'{database}.{schema}.MACHINE_FAILURES')
        .with_column('DATE', F.date_add(F.col('DATE'), F.lit(-1)))
        .rename({'FAILURE': 'FAILURE_IN_1_DAY'})
)

# Machine-days without a failure row get label 0
predictions = predictions.join(failure_labels, how='left', on=['MACHINE_ID', 'DATE']).fillna(0, subset=['FAILURE_IN_1_DAY'])

# Append predictions and actuals to the V1 monitor's source table (columns matched by name)
predictions.write.save_as_table(
    f'{database}.{schema}_MODEL_REGISTRY.PREDICTIVE_MAINTENANCE_MODEL_SOURCE_V1',
    mode='append',
    column_order='name'
)

# 12 - Train a New Model

## 12.1 Generate Dataset

In [None]:
# Second training window: two months of data after the V1 model's training period
training_start_date, training_end_date = '2025-06-01', '2025-07-31'

training_dataset_df = generate_training_dataset(
    session,
    my_feature_store,
    training_start_date,
    training_end_date,
)

## 12.2 Train Model

In [None]:
# Chronological split of the training window: June for training, July held out.
# The outer bounds reuse training_start_date / training_end_date so the split always matches the generated window.
train_dataset_df = training_dataset_df.filter(F.col('DATE').between(training_start_date, '2025-06-30'))
test_datatset_df = training_dataset_df.filter(F.col('DATE').between('2025-07-01', training_end_date))

training_results = train_models(train_dataset_df, test_datatset_df, experiment_name='PREDICTIVE_MAINTENANCE_MODELS_V2')

# Retrieve what later cells need: feature/target names plus the XGBoost model and its test metrics
feature_columns = training_results['feature_columns']
target_column = training_results['target_column']
xgb_model = training_results['xgboost']['model']
metrics = training_results['xgboost']['metrics']

## 12.3 Register Model

In [None]:
# Log the retrained XGBoost model as V2 of the same registered model.
# The signature inferred for V1 is reused; this assumes the feature columns are unchanged.
# sample_input_data comes from the newly generated Dataset.
registered_model = my_model_registry.log_model(
    xgb_model,
    model_name="PREDICTIVE_MAINTENANCE_MODEL",
    version_name='V2',
    comment="Model trained using XGBoost to predict machine failures",
    metrics=metrics,
    signatures={'predict': signature},
    sample_input_data=training_dataset_df.select(feature_columns).limit(100),
    conda_dependencies=['xgboost'],
    target_platforms=['WAREHOUSE', 'SNOWPARK_CONTAINER_SERVICES'],
    options={"relax_version": True, "enable_explainability": False},
)

## 12.4 Replace Production Model

In [None]:
# Move the PRODUCTION alias: remove it from the current production version, then put it on V2
production_model.unset_alias('PRODUCTION')
registered_model.set_alias('PRODUCTION')

# Re-resolve by alias, as a pipeline would, and show the new production version's logged metrics
production_model = (
    my_model_registry
    .get_model('PREDICTIVE_MAINTENANCE_MODEL')
    .version('PRODUCTION')
)
production_model.show_metrics()

## 12.5 Create Model Monitor

In [None]:
# Score the V2 held-out split with the new production model and persist the result twice: as the
# monitor's baseline, and as the initial contents of the source table that later predictions are appended to
baseline_predictions = production_model.run(test_datatset_df, function_name='predict').cache_result()
baseline_table = f'{database}.{schema}_MODEL_REGISTRY.PREDICTIVE_MAINTENANCE_MODEL_BASELINE_V2'
source_table = f'{database}.{schema}_MODEL_REGISTRY.PREDICTIVE_MAINTENANCE_MODEL_SOURCE_V2'
baseline_predictions.write.save_as_table(baseline_table, mode='overwrite')
baseline_predictions.write.save_as_table(source_table, mode='overwrite')

# Tell the monitor which columns hold the timestamp, ids, predictions and actuals
source_config = ModelMonitorSourceConfig(
    baseline=baseline_table,
    source=source_table,
    timestamp_column='DATE',
    id_columns=['MACHINE_ID'],
    prediction_class_columns=['FAILURE_IN_1_DAY_PREDICTION'],
    actual_class_columns=['FAILURE_IN_1_DAY']
)

# Background refresh runs on the warehouse configured at the top of the notebook, not a second hard-coded name
monitor_config = ModelMonitorConfig(
    model_version=production_model,
    model_function_name='predict',
    background_compute_warehouse_name=warehouse,
    refresh_interval='1 minute',
    aggregation_window='1 day'
)

# Create model monitor
model_monitor = my_model_registry.add_monitor(
    name=f'{database}.{schema}_MODEL_REGISTRY.PREDICTIVE_MAINTENANCE_MODEL_MM_V2',
    source_config=source_config,
    model_monitor_config=monitor_config
)

## 12.6 Generate Predictions

In [None]:
# Features for every machine-day after the V2 training window, through the end of the simulated data
feature_df = create_feature_df(start_date='2025-08-01', end_date='2026-01-20', feature_store=my_feature_store)

# Score with the current production model (V2 after the alias swap above)
predictions = production_model.run(feature_df, function_name='predict').cache_result()

# Actuals: a failure on day D+1 is the label for day D, so shift failure dates back one day.
# Use a dedicated name so the raw `machine_failures` table loaded in the exploration cells
# is not silently replaced by this shifted/renamed frame.
failure_labels = (
    session.table(f'{database}.{schema}.MACHINE_FAILURES')
        .with_column('DATE', F.date_add(F.col('DATE'), F.lit(-1)))
        .rename({'FAILURE': 'FAILURE_IN_1_DAY'})
)

# Machine-days without a failure row get label 0
predictions = predictions.join(failure_labels, how='left', on=['MACHINE_ID', 'DATE']).fillna(0, subset=['FAILURE_IN_1_DAY'])

# Append predictions and actuals to the V2 monitor's source table (columns matched by name)
predictions.write.save_as_table(
    f'{database}.{schema}_MODEL_REGISTRY.PREDICTIVE_MAINTENANCE_MODEL_SOURCE_V2',
    mode='append',
    column_order='name'
)

# 13 - Deploy Model as Inference Service in SPCS

In [None]:
# Deploy the registered version as an SPCS inference service on the CPU compute pool
# (no GPUs requested) with an HTTP ingress endpoint for the REST call further below
registered_model.create_service(
    service_name="my_pred_maintenance_prediction_service",
    service_compute_pool="SYSTEM_COMPUTE_POOL_CPU",
    gpu_requests=None,
    ingress_enabled=True,
)

# 14 - Call Model via Python and REST API

In [None]:
# Score through the SPCS service rather than the warehouse by passing service_name
service_prediction = registered_model.run(
    feature_df,
    service_name="my_pred_maintenance_prediction_service",
    function_name="predict",
)
display(service_prediction.limit(10))

In [None]:
# List the service's endpoints; the ingress_url column is used to build the REST URL in the next cell
endpoint = session.sql(
    "SHOW ENDPOINTS IN SERVICE my_pred_maintenance_prediction_service"
).collect()
pd.DataFrame(data=endpoint)

In [None]:
import requests

# Get Programmatic Access Token (PAT) for the REST call. DAYS_TO_EXPIRY = 1 bounds its lifetime,
# and the `finally` below removes it even if the request or JSON decoding fails.
pat_rows = session.sql("ALTER USER ADD PROGRAMMATIC ACCESS TOKEN demo_token DAYS_TO_EXPIRY = 1;").collect()
try:
    pat_token = pat_rows[0]['token_secret']

    # Endpoint URL (from SHOW ENDPOINTS in the previous cell) and PAT auth header
    URL = f"https://{endpoint[0]['ingress_url']}/predict"
    headers = {"Authorization": f'Snowflake Token="{pat_token}"'}

    # Each row is [row index, feature values...].
    # NOTE(review): the feature values are assumed to follow the model signature's input order
    # (feature_columns); confirm this whenever the feature set changes.
    payload_data = {
        'data': [
            # normal condition
            [0, 1, 1, 1, 1, 1, 1, 1, 1, 1],
            # failure condition
            [
                1,
                2.051990032196045,
                2.684382438659668,
                2.0562567710876465,
                1.9639095067977905,
                1.320631742477417,
                1.7546310424804688,
                1.9142093658447266,
                1.3605632781982422,
                1.601170897483825
            ],
        ]
    }

    print('Input Data:')
    print(json.dumps(payload_data, indent=2))

    print('Output:')
    r = requests.post(URL, json=payload_data, headers=headers, timeout=60)
    r.raise_for_status()  # surface HTTP errors instead of a confusing JSON decode failure
    print(json.dumps(r.json(), indent=2))
finally:
    # Remove PAT
    _ = session.sql("ALTER USER REMOVE PROGRAMMATIC ACCESS TOKEN demo_token;").collect()

In [None]:
from snowflake.snowpark.exceptions import SnowparkSQLException

# Safety-net cleanup for the demo PAT. The REST cell above already removes it, so on a normal run
# this statement presumably fails because the token no longer exists; report that instead of
# leaving the cell in an error state.
try:
    _ = session.sql("ALTER USER REMOVE PROGRAMMATIC ACCESS TOKEN demo_token;").collect()
except SnowparkSQLException as exc:
    print(f'demo_token was not removed (it is most likely already gone): {exc}')