In [None]:
import snowflake.snowpark as snowpark
from snowflake.snowpark.functions import row_number, col
from snowflake.snowpark.window import Window

### Example of splitting a Snowflake Dataframe without shuffling 
Good for ordered data like time series

In [None]:

def main(session: snowpark.Session):
    """Split ``silver.daily_revenue`` into train/test sets without shuffling.

    Every row gets a 1-based position in ascending ``TRANSACTION_DATE`` order.
    Rows whose position is at most 70% of the row count (truncated to an
    integer) form the training set; all later rows form the test set.

    Args:
        session: Active Snowpark session used to read the table.

    Returns:
        The training DataFrame. The test DataFrame is built for illustration
        but is not returned.
    """
    source = session.table('silver.daily_revenue')

    # Number the rows in date order so the split keeps the original ordering.
    by_date = Window.order_by(col("TRANSACTION_DATE"))
    indexed = source.with_column("row_num", row_number().over(by_date))

    # Highest row number that still belongs to the training set.
    cutoff = int(indexed.count() * 0.7)

    train_df = indexed.filter(col("row_num") <= cutoff).drop("row_num")
    test_df = indexed.filter(col("row_num") > cutoff).drop("row_num")

    return train_df


## Create SPROC for model training
Creates a stored procedure that trains a model and saves it to the model registry.

To be used in Snowflake Python Worksheet

All variables that must be changed are tagged with `#edit`.

### Without trace

In [None]:
from snowflake.snowpark import Session
import snowflake.snowpark
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import sproc
import snowflake.snowpark.types as T

# Snowpark ML
from snowflake.ml.registry import registry
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBRegressor
import snowflake.ml.modeling.preprocessing as snowmlpp
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error, mean_squared_error


sproc_name = ''  #edit
stage_name = ''  #edit
@sproc(name=sproc_name, #edit
       stage_location=stage_name,  #edit
       is_permanent=True, 
       replace=True, 
       packages=[
        "snowflake-snowpark-python",
        'snowflake-ml-python', 
        'xgboost',
        'pandas', 
         ])
def train_and_save_model(session: Session, source_table: str, major_version: bool = True) -> str:
    """Train an XGBoost regressor via grid search and log it to the model registry.

    Reads pre-split train and test views, joins date features onto both, fits a
    one-step pipeline (GridSearchCV over XGBRegressor) on the numeric feature
    columns, scores it on the test data, logs the fitted pipeline as a new
    model version and makes that version the model's default.

    Args:
        session: Snowpark session supplied by the stored-procedure runtime.
        source_table: Not used by this variant (train/test data come from the
            views named below); kept so the call signature stays stable.
        major_version: When true the new version is the latest numeric version
            + 1, otherwise + 0.1.

    Returns:
        A success message with the model name, version, MAPE and MSE, or an
        ``Error with ...`` message if registry creation, model logging or
        setting the default version fails.
    """
    import json
    import math

    # setting variables
    model_name = 'silver.daily_revenue_test'  #edit
    train_vw_name = 'silver.vw_daily_revenue_train'  #edit
    # NOTE(review): the original read the test data from an undefined name.
    # This view name is assumed by analogy with the train view; confirm it.
    test_vw_name = 'silver.vw_daily_revenue_test'  #edit
    target_cols = ['TOTAL_AMOUNT']  #edit
    output_cols = ['PREDICTED_AMOUNT']  #edit

    # read in training and test data
    train_df = session.table(train_vw_name)
    test_df = session.table(test_vw_name)

    # Join in date features, then drop the join key so it is not used as a feature
    ts_features = session.table('Retail_demo.Silver.Date_features')
    train_df = train_df.join(ts_features, ['TRANSACTION_DATE'])
    train_df = train_df.drop("Transaction_date")

    test_df = test_df.join(ts_features, ['TRANSACTION_DATE'])
    test_df = test_df.drop("Transaction_date")

    # Model inputs: every numeric column that is not a target column.
    other_cols = [name for name in train_df.schema.names if name not in target_cols]
    numeric_types = [T.DecimalType, T.DoubleType, T.FloatType, T.IntegerType, T.LongType]
    features = [
        field.name
        for field in train_df.schema.fields
        if (type(field.datatype) in numeric_types) and (field.name in other_cols)
    ]

    model_pipe = Pipeline(
        steps=[
            ('grid_search_reg', GridSearchCV(estimator=XGBRegressor(),
                                             param_grid={"n_estimators": [50, 100, 200],
                                                         "learning_rate": [0.01, 0.1, 0.5],
                                                         },
                                             n_jobs=-1,
                                             scoring="neg_mean_squared_error",
                                             input_cols=features,
                                             label_cols=target_cols,
                                             output_cols=output_cols
                                             )
             )
        ]
    )
    model_pipe.fit(train_df)
    results = model_pipe.predict(test_df)

    mape = mean_absolute_percentage_error(df=results, y_true_col_names=target_cols, y_pred_col_names=output_cols)
    mse = mean_squared_error(df=results, y_true_col_names=target_cols, y_pred_col_names=output_cols)

    def set_model_version(registry_object, model_name, major_version=True):
        """Return the next version label ('V<number>') for ``model_name``.

        Returns 'V1' when the registry lists no model with that name or the
        model has no version label of the form 'V<number>'.
        """
        model_list = registry_object.show_models()
        if len(model_list) == 0:
            return 'V1'

        # Compare on the unqualified, upper-cased name: unquoted identifiers
        # are stored upper-case, so an exact match on a lower-case or
        # schema-qualified name would never succeed.
        wanted = model_name.split('.')[-1].strip('"').upper()
        listed = model_list['name'].astype(str).str.strip('"').str.upper()
        model_list_filter = model_list[listed == wanted]
        if len(model_list_filter) == 0:
            return 'V1'

        raw_versions = model_list_filter['versions'].iloc[0]
        version_list = json.loads(raw_versions) if isinstance(raw_versions, str) else list(raw_versions)

        # Keep only labels that parse as 'V<number>'; ignore anything else
        # instead of failing the whole procedure.
        version_numbers = []
        for label in version_list:
            try:
                number = float(str(label).strip('"').upper().replace('V', ''))
            except ValueError:
                continue
            if math.isfinite(number):
                version_numbers.append(number)
        if not version_numbers:
            return 'V1'

        step = 1 if major_version else .1
        return 'V' + str(round(max(version_numbers) + step, 2))

    try:
        model_registry = registry.Registry(session=session, database_name=session.get_current_database(), schema_name='ML_PIPE')

    except Exception as e:
        return (f'Error with creating model registry object: {e}')

    try:
        # A small sample of the non-target columns is logged with the model.
        feature_cols = [i for i in test_df.schema.names if i not in target_cols]
        X = train_df.select(feature_cols).limit(100)
        version_name = set_model_version(model_registry, model_name, major_version=major_version)
        model_version = model_registry.log_model(
            model = model_pipe, 
            model_name = model_name, 
            # Quoted because labels such as V2.0 contain a dot.
            version_name= f'"{version_name}"',
            sample_input_data=X,
            conda_dependencies=['snowflake-snowpark-python','snowflake-ml-python','scikit-learn', 'xgboost']
            )

        model_version.set_metric(metric_name='mean_abs_pct_err', value=mape)
        model_version.set_metric(metric_name='mean_sq_err', value=mse)

    except Exception as e:
        return (f'Error with saving model to registry: {e}')
    try:
        # session.sql() is lazy; collect() is what actually runs the statement.
        session.sql(f'alter model {model_name} set default_version = "{version_name}";').collect()
    except Exception as e:
        return (f'Error with setting default version: {e}')

    return f'Model {model_name} has been logged with version {version_name} and has a MAPE of {mape} and MSE of {mse}'

### With trace

In [None]:
# Imports and create a session
from snowflake.snowpark import Session
import snowflake.snowpark
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import sproc
import snowflake.snowpark.types as T
from snowflake import telemetry

from opentelemetry import trace

# Snowpark ML
from snowflake.ml.registry import registry

from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBRegressor
import snowflake.ml.modeling.preprocessing as snowmlpp
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error, mean_squared_error


sproc_name = ''  #edit
stage_name = ''  #edit
@sproc(name=sproc_name, #edit
       stage_location=stage_name,  #edit
       is_permanent=True, 
       replace=True, 
       packages=[
        "snowflake-snowpark-python",
        'snowflake-ml-python', 
        'xgboost',
        'pandas', 
        "snowflake-telemetry-python",  # Required for tracing
        "opentelemetry-api" # Required for tracing
        ])
def train_and_save_model(session: Session, source_table: str, major_version: bool = True) -> str:
    """Train, evaluate and register an XGBoost regressor, emitting trace spans.

    Each stage (data loading, feature engineering, pipeline definition,
    train/test split, fitting, evaluation, registration) runs in its own
    OpenTelemetry span under one parent span named after ``sproc_name``.

    Args:
        session: Snowpark session supplied by the stored-procedure runtime.
        source_table: Name of the table to read the modelling data from.
        major_version: When true the new version is the latest numeric version
            + 1, otherwise + 0.1.

    Returns:
        A success message with the model name, version, MAPE and MSE, or an
        ``Error with ...`` message when a guarded stage fails.

    Raises:
        Exception: Any error not handled by a stage-level guard is recorded as
            a ``pipeline_failure`` event and re-raised.
    """
    import json
    import math
    import traceback  # used by the failure handler at the bottom

    model_name = ''  #edit

    tracer = trace.get_tracer("daily_revenue.train")
    with tracer.start_as_current_span(sproc_name):  #edit
        try:
            telemetry.set_span_attribute("model.name", model_name)
            # Data loading
            with tracer.start_as_current_span("data_loading"):
                try:
                    df = session.table(source_table)#.limit(100000) # only need to limit if the data is huge
                    telemetry.set_span_attribute("data.row_count", df.count()) 
                except Exception as e:
                    return (f'Error with getting table data: {e}')

            with tracer.start_as_current_span("feature_engineering"):
                target_cols = ['TOTAL_AMOUNT'] #edit
                feature_cols = [i for i in df.schema.names if i not in target_cols]
                output_cols = ['PREDICTED_AMOUNT'] #edit

                # Optional template: Snowflake numeric types (possibly for scaling, ordinal encoding)
                # numeric_types = [T.DecimalType, T.DoubleType, T.FloatType, T.IntegerType, T.LongType]
                # numeric_columns = [col.name for col in df.schema.fields if (type(col.datatype) in numeric_types) and (col.name in feature_cols)]

                # Optional template: Snowflake categorical types and the columns to one-hot encode
                # categorical_types = [T.StringType]
                # cols_to_ohe = [col.name for col in df.schema.fields if (type(col.datatype) in categorical_types)]
                # ohe_cols_output = [col + '_OHE' for col in cols_to_ohe]

                # Standardize the values in the rows: replace each run of
                # non-alphanumeric characters with '_' and upper-case the result.
                # Only used by the optional template below.
                def fix_values(columnn):
                        return F.upper(F.regexp_replace(F.col(columnn), '[^a-zA-Z0-9]+', '_'))

                # Optional template: fill and standardize the categorical columns
                # try:
                #     for col in cols_to_ohe:
                #             df = df.na.fill('NONE', subset=col)
                #             df = df.withColumn(col, fix_values(col))
                #     telemetry.add_event("feature_engineering_complete")
                #
                # except Exception as e:
                #     return (f'Error with standardizing values: {e}')
                telemetry.add_event("feature_engineering_complete")

            # Model training
            with tracer.start_as_current_span("define_pipeline"):
                try:
                    pipe = Pipeline(
                        steps=[
                            #('imputer', SimpleImputer(input_cols=all_cols)),
                            #('mms', snowmlpp.MinMaxScaler(input_cols=cols_to_scale, output_cols=scale_cols_output)),
                            #('ohe', snowmlpp.OneHotEncoder(input_cols=cols_to_ohe, output_cols=ohe_cols_output, drop_input_cols=True)),
                            ('grid_search_reg', GridSearchCV(estimator=XGBRegressor(),
                                                                param_grid={ "n_estimators":[50, 100, 200], # 25
                                                                            "learning_rate":[0.01, 0.1, 0.5 ], # .5
                                                                            },
                                                                n_jobs = -1,
                                                                scoring="neg_mean_squared_error",
                                                                input_cols=feature_cols,
                                                                label_cols=target_cols,
                                                                output_cols=output_cols
                                                                )
                            )
                        ]      
                    )

                except Exception as e:
                    return (f'Error with defining the pipeline: {e}')

            with tracer.start_as_current_span("train_test_split"):
                # Split the data into training and testing. Previously the
                # split was disabled and train_df/test_df were empty strings,
                # so fitting could never succeed.
                # NOTE(review): a random split shuffles rows; for time-ordered
                # data replace this with an ordered (row-number) split.
                try:
                    train_df, test_df = df.random_split([0.8, 0.2], seed=42)
                except Exception as e:
                    return (f'Error with splitting data: {e}')

            with tracer.start_as_current_span("fit_pipeline"):
                try:
                    pipe.fit(train_df)
                    telemetry.set_span_attribute("training.param_grid", "Fitting done")
                except Exception as e:
                    return (f'Error with fitting pipeline: {e}')

            # Model evaluation
            with tracer.start_as_current_span("model_evaluation"):
                try:
                    results = pipe.predict(test_df)
                except Exception as e:
                    return (f'Error with predicting with pipeline: {e}')

                # Use Snowpark ML metrics to calculate MAPE and MSE
                mape = mean_absolute_percentage_error(df=results, y_true_col_names=target_cols, y_pred_col_names=output_cols)
                mse = mean_squared_error(df=results, y_true_col_names=target_cols, y_pred_col_names=output_cols)
                telemetry.set_span_attribute("model.mape", mape)
                telemetry.set_span_attribute("model.mse", mse)

            # Model registration
            with tracer.start_as_current_span("model_registration"):
                def set_model_version(registry_object, model_name, major_version=True):
                    """Return the next version label ('V<number>') for ``model_name``.

                    Returns 'V1' when the registry lists no model with that
                    name or the model has no label of the form 'V<number>'.
                    """
                    model_list = registry_object.show_models()
                    if len(model_list) == 0:
                        return 'V1'

                    # Compare on the unqualified, upper-cased name: unquoted
                    # identifiers are stored upper-case, so an exact match on
                    # a lower-case or schema-qualified name never succeeds.
                    wanted = model_name.split('.')[-1].strip('"').upper()
                    listed = model_list['name'].astype(str).str.strip('"').str.upper()
                    model_list_filter = model_list[listed == wanted]
                    if len(model_list_filter) == 0:
                        return 'V1'

                    raw_versions = model_list_filter['versions'].iloc[0]
                    version_list = json.loads(raw_versions) if isinstance(raw_versions, str) else list(raw_versions)

                    # Keep only labels that parse as 'V<number>'; ignore the
                    # rest instead of failing the whole procedure.
                    version_numbers = []
                    for label in version_list:
                        try:
                            number = float(str(label).strip('"').upper().replace('V', ''))
                        except ValueError:
                            continue
                        if math.isfinite(number):
                            version_numbers.append(number)
                    if not version_numbers:
                        return 'V1'

                    step = 1 if major_version else .1
                    # This is the version we will use when we log the new model.
                    return 'V' + str(round(max(version_numbers) + step, 2))

                # Create model registry object
                try:
                    model_registry = registry.Registry(session=session, database_name=session.get_current_database(), schema_name='ML_PIPE')

                except Exception as e:
                    return (f'Error with creating model registry object: {e}')
                
                try:
                    # A small sample of the feature columns is logged with the model.
                    X = train_df.select(feature_cols).limit(100)

                    version_name = set_model_version(model_registry, model_name, major_version=major_version)
                    model_version = model_registry.log_model(
                        model = pipe, 
                        model_name = model_name, 
                        # Quoted because labels such as V2.0 contain a dot.
                        version_name= f'"{version_name}"',
                        sample_input_data=X,
                        conda_dependencies=['snowflake-snowpark-python','snowflake-ml-python','scikit-learn', 'xgboost']
                        )

                    model_version.set_metric(metric_name='mean_abs_pct_err', value=mape)
                    model_version.set_metric(metric_name='mean_sq_err', value=mse)
                    telemetry.add_event("model_registered", {"version": version_name})
                
                except Exception as e:
                    return (f'Error with saving model to registry: {e}')
                
                try:
                    # session.sql() is lazy; collect() is what actually runs the statement.
                    session.sql(f'alter model {model_name} set default_version = "{version_name}";').collect()
                except Exception as e:
                    return (f'Error with setting default version: {e}')

            return f'Model {model_name} has been logged with version {version_name} and has a MAPE of {mape} and MSE of {mse}'

        except Exception as e:
            telemetry.add_event("pipeline_failure", {
                "error": str(e),
                "stack_trace": traceback.format_exc()
            })
            raise  # Re-raise to preserve error handling