# Primeros pasos con la Observabilidad de ML en Snowflake

## Resumen

MLOps es una función central de la ingeniería de ML y se enfoca en agilizar el proceso de llevar modelos de machine learning a producción, y luego mantenerlos y monitorearlos de manera efectiva. A diferencia del software tradicional, los modelos de machine learning pueden cambiar su comportamiento con el tiempo debido a varios factores, incluyendo la deriva de entrada (input drift), suposiciones obsoletas del entrenamiento del modelo, problemas en los pipelines de datos (data pipelines), y desafíos estándar como los entornos de hardware/software y el tráfico. Estos factores pueden llevar a una disminución en el rendimiento del modelo y a un comportamiento inesperado que necesita ser monitoreado muy de cerca.

Snowflake ML proporciona a las organizaciones un conjunto integrado de capacidades para machine learning de extremo a extremo en una única plataforma sobre datos gobernados. El comportamiento del modelo puede cambiar con el tiempo debido a la deriva de entrada (input drift), suposiciones de entrenamiento obsoletas, y problemas en los pipelines de datos (data pipelines), así como los factores habituales, incluyendo cambios en el hardware y software subyacentes y la naturaleza fluida del tráfico. La característica de Observabilidad de ML de Snowflake te permite rastrear la calidad de los modelos en producción que has desplegado a través del Snowflake Model Registry a través de múltiples dimensiones, como rendimiento, deriva y volumen.

Este repositorio contiene un notebook que te guía a través de la construcción, despliegue y monitoreo de un modelo de predicción de abandono de clientes (customer churn prediction) en Snowflake.

[Fuente](https://quickstarts.snowflake.com/guide/getting-started-with-ml-observability-in-snowflake/index.html#0)


![](https://www.snowflake.com/wp-content/themes/snowflake/assets/img/brand-guidelines/logo-sno-blue-example.svg)

# Construye, Despliega y Monitoriza tu Modelo en Snowflake

En esta laboratorio veremos cómo es un ciclo de vida completo de un modelo en Snowflake.
Utilizaremos las siguientes funcionalidades de Snowflake:

* Snowflake ML Python SDK
* Registro de Modelos
* Observabilidad de ML
* Alertas
* Monitorización de Deriva

![](https://drive.google.com/file/d/1jWryVEAjyetHMRgTTMo_bnx_BZRdeNuC/view?usp=sharing)

>**Caso de uso:** Su empresa está teniendo problamas con la pérdida de clientes a manos de la competencia. Deseas comprender la probabilidad de que cada uno de los clientes abandone, con el objetivo, de tomar las medidas necesarias para los usuarios con alta probabilidad de abandono. Con el tiempo, se observa una nueva tendencia en la pérdida de clientes que debe abordarse de inmediato para obtener una ventaja competitiva. La institución financiera aprovecha el panel de Observabilidad de ML para ver las métricas y los factores de precisión. Puede tomar medidas proactivas para reentrenar el modelo y también comparar diferentes versiones del modelo monitorizando el rendimiento del modelo.

### **Características**

* **CREDITSCORE:** Puntuación de crédito del cliente basada en su comportamiento y gestión crediticia histórica
* **GEOGRAPHY:** País de residencia
* **GENDER:** Género del cliente
* **AGE:** Edad del cliente
* **TENURE:** Duración en años que han sido cliente
* **BALANCE:** Saldo actual de su cuenta bancaria
* **NUMOFPRODUCTS:** Número de productos comprados al banco
* **HASCRCARD:** ¿Tiene el cliente tarjeta de crédito? - 1 si la tiene, 0 si no
* **ISACTIVEMEMBER:** ¿Ha utilizado el cliente su cuenta bancaria en los últimos 3 meses? - 1 si lo hizo, 0 si no lo hizo
* **ESTIMATEDSALARY:** Salario estimado del cliente
* **DEBTTOINCOME:** Relación deuda-ingresos

### **Modelo**

Construiremos un modelo de clasificación utilizando el framework XGBoost con la API de ML de Snowflake y registraremos el modelo en el registro. A lo largo del proceso, crearemos monitores de modelo y veremos el rendimiento del modelo en el panel de Snowsight.

#### Agrega los siguientes paquetes: `snowflake-ml-python`,`snowflake-snowpark-python`

In [58]:
# Import python packages
import streamlit as st
import pandas as pd

from snowflake.snowpark.context import get_active_session
session = get_active_session()
from datetime import datetime, timedelta
from snowflake.ml.registry import Registry
import joblib
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.ml.modeling.preprocessing as pp
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.snowpark.types import StringType, IntegerType
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import col, current_date, dateadd, random, floor,current_date, datediff
import warnings
warnings.filterwarnings('ignore')

session.query_tag = {"origin":"sf_sit-is", "name":"mlops_customerchurn", "version":{"major":1, "minor":0}}

import snowflake.snowpark.functions as F
from IPython.display import Markdown, display

solution_prefix = session.get_current_warehouse()
solution_prefix

### Cargar datos sintéticos del data_stage a una tabla de Snowflake usando un comando COPY INTO.

In [66]:
-- Create csv format
CREATE FILE FORMAT IF NOT EXISTS CSVFORMAT 
    SKIP_HEADER = 1 
    TYPE = 'CSV';
    
CREATE OR REPLACE STAGE data_stage
    FILE_FORMAT = (TYPE = 'CSV') 
    URL = 's3://sfquickstarts/sfguide_getting_started_with_ml_observability_in_snowflake/mlops_customerchurn.csv';
    
-- Inspect content of stage
LS @data_stage;


Total exited customers: 1714 (Target: ~2000)
   CustomerId  Surname  CreditScore Geography  Gender  Age  Tenure    Balance  \
0           1    Johns          402    France    Male   55       9   91944.03   
1           2  Schultz          735     Spain    Male   59       8  126536.56   
2           3    Jones          570     Spain    Male   54       7  191357.66   
3           4    Baker          406    France  Female   73       3  125263.00   
4           5  Aguirre          371     Spain    Male   88       9  195626.75   

   NumOfProducts  HasCrCard  IsActiveMember  EstimatedSalary  Exited  \
0              1          1               1         36899.18       0   
1              2          0               0         33120.74       0   
2              2          1               1         34751.09       1   
3              4          0               0        169844.77       0   
4              4          0               1         13787.72       0   

  TransactionTimestamp  debttoincom

### Leer un archivo CSV usando Snowpark desde un stage en Snowflake a un DataFrame.

In [67]:
spdf = session.read.options({"field_delimiter": ",",
                                    "field_optionally_enclosed_by": '"',
                                    "infer_schema": True,
                                    "parse_header": True}).csv("@data_stage")



In [None]:
from snowflake.snowpark.types import DecimalType, FloatType, DoubleType

# Get schema of the DataFrame
schema = spdf.schema.fields

# Identify columns that are of type NUMBER (DecimalType)
num_columns = [col.name for col in schema if isinstance(col.datatype, DecimalType)]

# Convert columns to FLOAT
for col in num_columns:
    spdf = spdf.with_column(col, spdf[col].cast(DoubleType()))



In [None]:
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, current_date, dateadd, to_date,lit

# Step 1: Get today's date
todays_date = datetime.now()

latest_date = max(spdf.select('TRANSACTIONTIMESTAMP').collect())[0]

# Step 3: Calculate the difference in days
diff_days = (todays_date - latest_date).days - 1

df = spdf.with_column(
    "TRANSACTIONTIMESTAMP", 
    dateadd("day", lit(diff_days), col("TRANSACTIONTIMESTAMP"))
)


df = df.with_column(
    "CREDITSCORE", col("CREDITSCORE").cast("float")
)
df = df.with_column(
    "PREDICTED_CHURN", F.lit(9999)
)
df.show()

df.write.mode("overwrite").save_as_table("CUSTOMERS")


In [75]:
spdf= df.drop('ROWNUMBER')



--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CUSTOMERID"  |"CREDITSCORE"  |"GEOGRAPHY"  |"GENDER"  |"AGE"  |"TENURE"  |"BALANCE"  |"NUMOFPRODUCTS"  |"HASCRCARD"  |"ISACTIVEMEMBER"  |"ESTIMATEDSALARY"  |"EXITED"  |"TRANSACTIONTIMESTAMP"  |"DEBTTOINCOME"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|1             |402            |France       |Male      |55     |9         |91944.03   |1                |1            |1                 |36899.18           |0         |2022-01-09 14:08:54     |23              |
|2             |735            |Spain        |Male      |59     |8         |126536.56  |2                |0            |0                 |33120.74 

#### Define un pipeline de preprocesamiento usando Pipeline con dos pasos: Ordinal Encoding (Codificación Ordinal) para columnas categóricas y Escalado Min-Max para columnas numéricas. Luego divide los datos en conjuntos de entrenamiento y prueba, aplica los pasos de preprocesamiento a los datos de entrenamiento y guarda el pipeline como un archivo joblib (preprocessing_pipeline.joblib).

In [None]:
num_cols = ['ESTIMATEDSALARY', 'BALANCE', 'CREDITSCORE','AGE','TENURE','DEBTTOINCOME']
output_cols=['EstimatedSalary_SS', 'Balance_SS', 'CreditScore_SS','Age_SS','Tenure_SS','debttoincome_SS']

cat_cols = ['HasCrCard', 'IsActiveMember', 'Geography','Gender', 'NumOfProducts']
string_columns = ['GEOGRAPHY', 'GENDER']
string_columns_oe = ['GEOGRAPHY_oe', 'GENDER_oe']
preprocessing_pipeline = Pipeline(
    steps=[
            (
                "OE",
                pp.OrdinalEncoder(
                    input_cols=string_columns,
                    output_cols=string_columns_oe,
                    drop_input_cols= False,
                )
                
            ),
            (
                "MMS",
                pp.MinMaxScaler(
                    clip=True,
                    input_cols=num_cols,
                    output_cols=output_cols,
                    drop_input_cols= False,
                )
            )
    ]
)

PIPELINE_FILE = '/tmp/preprocessing_pipeline.joblib'
joblib.dump(preprocessing_pipeline, PIPELINE_FILE) # We are just pickling it locally first
training, testing = spdf.random_split(weights=[0.8, 0.2], seed=111)
training_spdf = preprocessing_pipeline.fit(training).transform(training)
testing_spdf=preprocessing_pipeline.fit(testing).transform(testing)

#### Almacenar el archivo del pipeline en un stage

In [None]:
session.sql("CREATE or replace stage ML_STAGE").collect()
session.file.put(PIPELINE_FILE, "@ML_STAGE", overwrite=True)

In [None]:
ls @ML_STAGE

## Construir el modelo XGBClassifier y entrenarlo usando los datos de entrenamiento

In [83]:
num_cols = ['EstimatedSalary', 'Balance', 'CreditScore','Age','Tenure','debttoincome']

cat_cols = ['HasCrCard', 'IsActiveMember', 'GEOGRAPHY','GENDER', 'NumOfProducts']
Target = ["EXITED"]

feature_names_input = [c for c in training_spdf.columns if c not in ["EXITED", "TRANSACTIONTIMESTAMP", "CUSTOMERID","ESTIMATEDSALARY", "BALANCE", "CREDITSCORE","AGE","TENURE","DEBTTOINCOME","GEOGRAPHY","GENDER","PREDICTED_CHURN"]]


training_spdf = training_spdf.with_column(
    "CREDITSCORE_SS", col("CREDITSCORE_SS").cast("float")
)
output_label = ["PREDICTED_CHURN"]
# Initialize a XGBClassifier object with input, label, and output column names
model = XGBClassifier(
    input_cols=feature_names_input,
    label_cols=Target,
    output_cols=output_label
    
)

# Train the classifier model using the training set
_ = model.fit(training_spdf)



<snowflake.snowpark.dataframe.DataFrame at 0x33cc21a60>

### Inicializar el Snowflake Model Registry

Registrar y gestionar el modelo de aprendizaje automático entrenado en Snowflake.

Observa que task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION: Especifica que esta es una tarea de clasificación binaria (predecir el churn: Yes/No).

In [None]:
from snowflake.ml.registry import Registry
from snowflake.ml.model import type_hints

reg = Registry(session=session)

MODEL_NAME = "QS_CustomerChurn_classifier"
MODEL_VERSION = "v1"

mv = reg.log_model(model,
                   model_name=MODEL_NAME,
                   version_name=MODEL_VERSION,
                   options={'relax_version': True},
                   task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION)
reg.show_models()


## Inferencia Continua
La función de inferencia realiza predicciones utilizando un modelo preentranado de aprendizaje automático dentro de Snowflake. Esta función utiliza el pipeline de preprocesamiento creado anteriormente para asegurar que los datos se transformen de manera consistente.
✅ Ejecuta predicciones utilizando una versión registrada del modelo.
✅ Actualiza las predicciones directamente en la tabla de Snowflake.
✅ Maneja eficientemente la inferencia en batch con actualizaciones SQL.

In [None]:
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.ml.modeling.preprocessing as pp
import snowflake.snowpark.functions as F

def inference(table_name, modelname, modelversion) -> str:
    reg = Registry(session=session)
    m = reg.get_model(modelname)
    mv = m.version(modelversion)
    
    # Load preprocessing pipeline from a file
    session.file.get('@ML_STAGE/preprocessing_pipeline.joblib.gz', '/tmp')
    pipeline_file = '/tmp/preprocessing_pipeline.joblib.gz'
    
    
    preprocessing_pipeline = joblib.load(pipeline_file)
    
    df = session.table(table_name)
    
    # Apply preprocessing
    testing_spdf = preprocessing_pipeline.fit(df).transform(df)
    testing_spdf = testing_spdf.with_column(
    "CREDITSCORE_SS", col("CREDITSCORE_SS").cast("float")
)
    # Perform prediction
    results = mv.run(testing_spdf, function_name="predict")
    results =results.drop("CREDITSCORE_SS", "BALANCE_SS", "DEBTTOINCOME_SS", "TENURE_SS", "AGE_SS", "ESTIMATEDSALARY_SS", "GENDER_OE", "GEOGRAPHY_OE")
    #results.write.save_as_table("customer_churn", mode="overwrite")
    results.create_or_replace_temp_view("results_temp")
    update_statement = f"""
    UPDATE {table_name} t
    SET PREDICTED_CHURN = r.PREDICTED_CHURN
    FROM results_temp r
    WHERE t.CUSTOMERID = r.CUSTOMERID
    AND t.TRANSACTIONTIMESTAMP=r.TRANSACTIONTIMESTAMP;
"""

    # Execute the merge statement
    session.sql(update_statement).collect()
        
    return "Success"


Ejecuta el modelo entrenado sobre el DataFrame testing_spdf usando mv.run(). function_name="predict" especifica que la función a utilizar para la inferencia es "predict". La salida es un DataFrame que contiene las predicciones.

In [None]:
testing_spdf = testing_spdf.with_column(
    "CREDITSCORE_SS", col("CREDITSCORE_SS").cast("float")
)
# Perform prediction
results = mv.run(testing_spdf, function_name="predict")
results

## DATA DRIFT (DERIVA DE DATOS) Y OBSERVABILIDAD EN EL PANEL DE OBSERVABILIDAD DE ML


Ahora veamos cómo se puede monitorizar una deriva de datos y cómo una acción proactiva podría ayudar a la firma financiera a prevenir el abandono de clientes.

In [None]:
-- Create csv format
CREATE FILE FORMAT IF NOT EXISTS CSVFORMAT 
    SKIP_HEADER = 1 
    TYPE = 'CSV';
    
CREATE OR REPLACE STAGE data_stage
    FILE_FORMAT = (TYPE = 'CSV') 
    URL = 's3://sfquickstarts/sfguide_getting_started_with_ml_observability_in_snowflake/CUSTOMERS_DRIFTED.csv';
    
-- Inspect content of stage
LS @data_stage;




Los datos en el archivo CUSTOMERS_DRIFTED contienen nuevas tendencias de clientes que llevan a una baja precisión de la inferencia usando la versión v1 del modelo.

In [None]:
spdf = session.read.options({"field_delimiter": ",",
                                    "field_optionally_enclosed_by": '"',
                                    "infer_schema": True,
                                    "parse_header": True}).csv("@data_stage")

from snowflake.snowpark.types import DecimalType, FloatType

# Get schema of the DataFrame
schema = spdf.schema.fields

# Identify columns that are of type NUMBER (DecimalType)
num_columns = [col.name for col in schema if isinstance(col.datatype, DecimalType)]

# Convert columns to FLOAT
for col in num_columns:
    spdf = spdf.with_column(col, spdf[col].cast(FloatType()))


from snowflake.snowpark.functions import col, current_date, dateadd, to_date, lit,to_timestamp
from datetime import datetime

# Step 1: Get today's date
todays_date = datetime.now()

# Ensure TRANSACTIONTIMESTAMP is stored as a string first
spdf = spdf.with_column("TRANSACTIONTIMESTAMP", col("TRANSACTIONTIMESTAMP").cast("string"))

# Get the latest date from the dataset
latest_date_str = max(spdf.select('TRANSACTIONTIMESTAMP').collect())[0]

# Convert latest_date to datetime
latest_date = datetime.strptime(latest_date_str, '%m/%d/%y %H:%M')

# Step 3: Calculate the difference in days
diff_days = (todays_date - latest_date).days - 1

# Apply date adjustment
df = spdf.with_column(
    "TRANSACTIONTIMESTAMP",
    dateadd("day", lit(diff_days), to_timestamp(col("TRANSACTIONTIMESTAMP"), 'MM/DD/YY HH24:MI'))
)

# Cast CREDIT SCORE to float
df = df.with_column("CREDITSCORE", col("CREDITSCORE").cast("float"))

# Add PREDICTED_CHURN column
spdf_drift = df.with_column("PREDICTED_CHURN", lit(9999))

spdf_drift.show()


La firma financiera realiza las predicciones con la deriva de datos usando la versión v1 del modelo. Los datos derivados se guardan en la tabla CUSTOMERS_DRIFTED. Una copia de los mismos datos se guarda en la tabla CUSTOMERS_EVAL para mostrar cómo se monitorizó esa deriva y se reentrenó un nuevo modelo de forma proactiva para evitar la toma de decisiones inexactas.

In [None]:

current_columns = spdf_drift.columns 
new_columns = [col.strip('"') for col in current_columns] 

spdf_drift = spdf_drift.select([spdf_drift[col].alias(new_col) for col, new_col in zip(current_columns, new_columns)])
spdf_drift = spdf_drift.with_column(
    "CREDITSCORE", col("CREDITSCORE").cast("float")
)
spdf_drift.write.mode("overwrite").save_as_table("CUSTOMERS_DRIFTED")
spdf_drift.write.mode("overwrite").save_as_table("CUSTOMERS_EVAL")


# Habilitar la Monitorización
Crea un monitor para sus modelos utilizando el comando CREATE MODEL MONITOR. El objeto monitor refresca automáticamente los registros del monitor consultando los datos de origen y actualiza los informes de monitorización basándose en los registros. El primero que está comentado muestra el método Pythonic y la siguiente celda muestra cómo monitorizar usando SQL.

In [None]:
'''
reg = Registry(session=session,options={"enable_monitoring": True})
modelname='QS_CustomerChurn_classifier'
modelversion='v1'
m = reg.get_model(modelname)
mv = m.version(modelversion)

# Fetch model version that will be monitored
model_version = mv

from snowflake.ml.monitoring.entities.model_monitor_config import ModelMonitorConfig, ModelMonitorSourceConfig
source_config = ModelMonitorSourceConfig(
    source="CUSTOMERS_DRIFTED",
    baseline="CUSTOMERS",
    timestamp_column="TRANSACTIONTIMESTAMP",
    prediction_class_columns=["PREDICTED_CHURN"],
    actual_class_columns=["EXITED"],
    id_columns=["CUSTOMERID"],
)

# Set up config for ModelMonitor.
model_monitor_config = ModelMonitorConfig(
    model_version=model_version,
    model_function_name="predict",
    background_compute_warehouse_name="ml_wh",
)

# Add a new ModelMonitor
model_monitor = reg.add_monitor(
    name=f"CHURN_MODEL_MONITOR", 
    source_config=source_config,
    model_monitor_config=model_monitor_config,
)
model_monitor
'''

## Versión SQL para crear el monitor del modelo

In [None]:
query = f"""
CREATE OR REPLACE MODEL MONITOR CHURN_MODEL_MONITOR
WITH
    MODEL=QS_CustomerChurn_classifier
    VERSION=v1
    FUNCTION=predict
    SOURCE=CUSTOMERS_DRIFTED
    BASELINE=CUSTOMERS
    TIMESTAMP_COLUMN=TRANSACTIONTIMESTAMP
    PREDICTION_CLASS_COLUMNS=(PREDICTED_CHURN)  
    ACTUAL_CLASS_COLUMNS=(EXITED)
    ID_COLUMNS=(CUSTOMERID)
    WAREHOUSE=ML_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';
"""
session.sql(query).collect()


## Predecir el churn (abandono) para las nuevas tendencias de clientes

In [None]:
status= inference('CUSTOMERS_DRIFTED','QS_CUSTOMERCHURN_CLASSIFIER', 'v1');

Veamos cómo se ven las predicciones de abandono y las métricas. 

Abre el Panel navegando a Studio->Models.

Haz clic en tu modelo y elige el monitor que acabas de añadir arriba. Cambia el rango de fechas a "Últimos 3 meses".

In [None]:
-- Create csv format
CREATE FILE FORMAT IF NOT EXISTS CSVFORMAT 
    SKIP_HEADER = 1 
    TYPE = 'CSV';
    
CREATE OR REPLACE STAGE data_stage
    FILE_FORMAT = (TYPE = 'CSV') 
    URL = 's3://sfquickstarts/sfguide_getting_started_with_ml_observability_in_snowflake/CUSTOMERS_TRAINING.csv';
    
-- Inspect content of stage
LS @data_stage;


## Se puede encontrar que la deriva de datos y la deriva de concepto han impactado significativamente el rendimiento del modelo con el tiempo.

Para mantener la precisión, los modelos requieren reentrenamiento periódicamente. A continuación se muestra cómo se maneja esto en el pipeline de predicción de abandono de clientes. Ahora reentrenemos los datos con las nuevas tendencias.

In [None]:
spdf = session.read.options({"field_delimiter": ",",
                                    "field_optionally_enclosed_by": '"',
                                    "infer_schema": True,
                                    "parse_header": True}).csv("@data_stage")

from snowflake.snowpark.types import DecimalType, FloatType

# Get schema of the DataFrame
schema = spdf.schema.fields

# Identify columns that are of type NUMBER (DecimalType)
num_columns = [col.name for col in schema if isinstance(col.datatype, DecimalType)]

# Convert columns to FLOAT
for col in num_columns:
    spdf = spdf.with_column(col, spdf[col].cast(FloatType()))


from snowflake.snowpark.functions import col, current_date, dateadd, to_date, lit,to_timestamp
from datetime import datetime

# Step 1: Get today's date
todays_date = datetime.now()

# Ensure TRANSACTIONTIMESTAMP is stored as a string first
spdf = spdf.with_column("TRANSACTIONTIMESTAMP", col("TRANSACTIONTIMESTAMP").cast("string"))

# Get the latest date from the dataset
latest_date_str = max(spdf.select('TRANSACTIONTIMESTAMP').collect())[0]

# Convert latest_date to datetime
latest_date = datetime.strptime(latest_date_str, '%m/%d/%y %H:%M')

# Step 3: Calculate the difference in days
diff_days = (todays_date - latest_date).days - 1

# Apply date adjustment
df = spdf.with_column(
    "TRANSACTIONTIMESTAMP",
    dateadd("day", lit(diff_days), to_timestamp(col("TRANSACTIONTIMESTAMP"), 'MM/DD/YY HH24:MI'))
)

# Cast CREDIT SCORE to float
df = df.with_column("CREDITSCORE", col("CREDITSCORE").cast("float"))

# Add PREDICTED_CHURN column
spdf_drift = df.with_column("PREDICTED_CHURN", lit(9999))

spdf_drift.show()

current_columns = spdf_drift.columns 
new_columns = [col.strip('"') for col in current_columns] 

spdf_drift = spdf_drift.select([spdf_drift[col].alias(new_col) for col, new_col in zip(current_columns, new_columns)])
spdf_drift = spdf_drift.with_column(
    "CREDITSCORE", col("CREDITSCORE").cast("float")
)


spdf_drift.write.mode("overwrite").save_as_table("CUSTOMERS_TRAINING")


In [None]:
# Load preprocessing pipeline from a file
session.file.get('@ML_STAGE/preprocessing_pipeline.joblib.gz', '/tmp')
pipeline_file = '/tmp/preprocessing_pipeline.joblib.gz'


preprocessing_pipeline = joblib.load(pipeline_file)

# Apply preprocessing
training_spdf = preprocessing_pipeline.fit(spdf_drift).transform(spdf_drift)
training_spdf = training_spdf.with_column(
"CREDITSCORE_SS", col("CREDITSCORE_SS").cast("float")
)

num_cols = ['EstimatedSalary', 'Balance', 'CreditScore','Age','Tenure','debttoincome']

cat_cols = ['HasCrCard', 'IsActiveMember', 'GEOGRAPHY','GENDER', 'NumOfProducts']
Target = ["EXITED"]

feature_names_input = [c for c in training_spdf.columns if c not in ["EXITED", "TRANSACTIONTIMESTAMP", "CUSTOMERID","ESTIMATEDSALARY", "BALANCE", "CREDITSCORE","AGE","TENURE","DEBTTOINCOME","GEOGRAPHY","GENDER","PREDICTED_CHURN","DATASET_TYPE"]]

output_label = ["PREDICTED_CHURN"]
# Initialize a XGBClassifier object with input, label, and output column names
model = XGBClassifier(
    input_cols=feature_names_input,
    label_cols=Target,
    output_cols=output_label
    
)

# Train the classifier model using the training set
_ = model.fit(training_spdf)


## Registrar el modelo como una nueva versión V2 con el mismo nombre de modelo QS_CustomerChurn_classifier

In [None]:
from snowflake.ml.registry import Registry
from snowflake.ml.model import type_hints

reg = Registry(session=session)

MODEL_NAME = "QS_CustomerChurn_classifier"
MODEL_VERSION = "v2"

mv = reg.log_model(model,
                   model_name=MODEL_NAME,
                   version_name=MODEL_VERSION,
                   options={'relax_version': True},
                   task=type_hints.Task.TABULAR_BINARY_CLASSIFICATION)
reg.show_models()



## Crear un monitor para la nueva versión del modelo

In [None]:
query = f"""
CREATE OR REPLACE MODEL MONITOR CHURN_MODEL_MONITOR_NEW
WITH
    MODEL=QS_CustomerChurn_classifier
    VERSION=v2
    FUNCTION=predict
    SOURCE=CUSTOMERS_EVAL
    BASELINE=CUSTOMERS_TRAINING
    TIMESTAMP_COLUMN=TRANSACTIONTIMESTAMP
    PREDICTION_CLASS_COLUMNS=(PREDICTED_CHURN)  
    ACTUAL_CLASS_COLUMNS=(EXITED)
    ID_COLUMNS=(CUSTOMERID)
    WAREHOUSE=ML_WH
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';
"""
session.sql(query).collect()

## Predecir el abandono con los nuevos datos de clientes utilizando el modelo reentrenado

In [None]:
status= inference('CUSTOMERS_EVAL','QS_CUSTOMERCHURN_CLASSIFIER', 'v2');

### Recuperar los resúmenes estadísticos de una característica, etiqueta o predicción del modelo de un modelo monitorizado a lo largo del tiempo.
🔹 Caso de Uso: Ayuda a analizar tendencias en el rendimiento del modelo, el comportamiento de las características y la distribución de las predicciones. El Nombre de la Métrica podría ser {'COUNT', 'COUNT_NULL'}
### La granularidad puede ser de cualquier forma ‘<num> {DAY, WEEK, MONTH, QUARTER, YEAR}’, ALL o NULL

In [None]:

SELECT * FROM 
TABLE(
MODEL_MONITOR_STAT_METRIC(
'CHURN_MODEL_MONITOR', 'COUNT', 'PREDICTED_CHURN', '1 DAY', TO_TIMESTAMP_TZ('2024-11-01'), TO_TIMESTAMP_TZ('2025-02-06'))
) as a
JOIN (SELECT * FROM 
TABLE(
MODEL_MONITOR_STAT_METRIC(
'CHURN_MODEL_MONITOR_NEW', 'COUNT', 'PREDICTED_CHURN', '1 DAY', TO_TIMESTAMP_TZ('2024-11-01'), TO_TIMESTAMP_TZ('2025-02-06'))
)) as b ON a.EVENT_TIMESTAMP = b.EVENT_TIMESTAMP;


### Calcular métricas de deriva para una característica, etiqueta o predicción del modelo especificada durante un período de tiempo dado.
Esto ayuda a detectar cambios en las distribuciones de datos (deriva de características) o cambios en las predicciones (deriva de concepto).

In [None]:

SELECT * FROM TABLE(MODEL_MONITOR_DRIFT_METRIC(
'CHURN_MODEL_MONITOR_NEW', 'DIFFERENCE_OF_MEANS', 'PREDICTED_CHURN', '1 DAY', TO_TIMESTAMP_TZ('2025-02-01'), TO_TIMESTAMP_TZ('2025-02-04')))

### Propósito: Obtiene métricas de rendimiento para un modelo monitorizado en un rango de tiempo especificado.
🔹 Caso de Uso: Permite realizar un seguimiento de cómo ha evolucionado el rendimiento del modelo (por ejemplo, caídas de precisión o degradación del rendimiento).

In [None]:
SELECT * FROM TABLE(MODEL_MONITOR_PERFORMANCE_METRIC(
'CHURN_MODEL_MONITOR_NEW', 'PRECISION', '1 DAY', TO_TIMESTAMP_TZ('2024-11-01'), TO_TIMESTAMP_TZ('2025-02-05')))

### Configuración de Alertas

Configurar Alertas para recibir notificaciones cuando una métrica determinada supera el límite de umbral.

In [None]:
query=f'''CREATE or replace TABLE TEST_NOTIFICATION(
    notification varchar (100),
    created_at timestamp
);'''

session.sql(query).collect()

In [None]:
CREATE OR REPLACE ALERT high_drift_alert
    WAREHOUSE = ML_WH
    SCHEDULE = '60 minutes'
    IF ( EXISTS (SELECT * FROM TABLE(MODEL_MONITOR_DRIFT_METRIC(
    'CHURN_MODEL_MONITOR', 'DIFFERENCE_OF_MEANS', 'PREDICTED_CHURN', '1 MONTH', TO_TIMESTAMP_TZ('2024-01-01'), TO_TIMESTAMP_TZ('2025-02-04')))))
    THEN
        INSERT INTO TEST_NOTIFICATION (notification, created_at) VALUES ('ALERT',(SELECT CURRENT_TIMESTAMP));

## Fin del Notebook