### Install MLflow and the supporting libraries if you have not done so already

In [1]:
!pip install mlflow snowflake-snowpark-python==1.0.0 pyarrow==8.0.0 scikit-learn==1.1.1

In [None]:
!pip install snowflake_mlflow-0.0.1-py3-none-any.whl

### Import all libraries including snowpark related libraries

In [None]:
# Snowpark Imports
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark.functions import sproc, udf, col, log, when, lit
# Other Imports
import pandas as pd
import mlflow
from mlflow.deployments import get_deploy_client
import json
import joblib
from pprint import pprint

# Import Snowflake Plugin for MLflow
from snowflake.ml.mlflow import create_session

## from DataBricks

import numpy as np
import pandas as pd
from datetime import date
from datetime import datetime
import os
import gc

from scipy.stats.mstats import winsorize   
from sklearn import preprocessing

### Create the Snowflake connection from the connection details in cred.json
### Create internal stages in the database schema for storing models and functions

In [2]:
# Read the Snowflake connection details. The context manager closes the file
# handle as soon as the JSON has been parsed.
with open('cred.json') as cred_file:
    snowflake_connection_cfg = json.load(cred_file)

# Create the Snowpark session from the connection dictionary
mlflow_poc_session = Session.builder.configs(snowflake_connection_cfg).create()

# Create a fresh & new schema.
# WARNING: CREATE OR REPLACE drops an existing MLFLOW_POC_DEMO schema together
# with every table and stage inside it each time this cell is run.
mlflow_poc_session.sql('CREATE OR REPLACE SCHEMA MLFLOW_POC_DEMO').collect()
mlflow_poc_session.use_schema('MLFLOW_POC_DEMO')

# Create the internal stages for functions and models
mlflow_poc_session.sql("CREATE STAGE IF NOT EXISTS FUNCTIONS").collect()
mlflow_poc_session.sql("CREATE STAGE IF NOT EXISTS MODELS").collect()

[Row(status='Stage area MODELS successfully created.')]

### This is a temporary step to load data from csv file to a snowflake table

In [3]:
# Read the local CSV extract into a pandas DataFrame
F_CLV_FEATURES_INPUT_df = pd.read_csv('F_CLV_FEATURES_INPUT.csv')

# Upload the frame to Snowflake, creating the table if needed and overwriting
# its contents. Left as the last expression so the returned Table is displayed.
mlflow_poc_session.write_pandas(
    F_CLV_FEATURES_INPUT_df,
    table_name='F_CLV_FEATURES_INPUT',
    auto_create_table=True,
    overwrite=True,
)

<snowflake.snowpark.table.Table at 0x7fdec88a7d30>

### Read the snowflake table to snowpark dataframe

In [4]:
# Name of the Snowflake table to read (the table written by the CSV upload cell)
table = "F_CLV_FEATURES_INPUT"

# Databricks original, kept for reference:
# input_data = (spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(os.path.join(path, table))).toPandas()
# Snowpark replacement: obtain a Snowpark DataFrame for the table by name
input_data = mlflow_poc_session.table(table)

### Print shape of snowpark dataframe

In [5]:
# pandas original: print(input_data.shape)
# Snowpark version: build the [rows, columns] pair from count() and the column list
n_rows = input_data.count()
n_cols = len(input_data.columns)
print([n_rows, n_cols])

[5000, 95]


### Sample the top nSample rows of the Snowpark DataFrame and print the shape of the data

In [6]:
# Cap the working set at 14,000,000 rows, or at the table's row count if smaller.
# pandas original: nSample = min(14000000, input_data.shape[0])
row_total = input_data.count()
nSample = row_total if row_total < 14000000 else 14000000

# pandas original: data = input_data[:nSample]
data = input_data.limit(nSample)

# pandas original: print(input_data.shape)
sampled_shape = [data.count(), len(data.columns)]
print(sampled_shape)

[5000, 95]


In [7]:
## need to check
# del input_data
# gc.collect()

### Define the features (categorical and numeric) and targets

In [8]:
# Extra columns appended to VAR_TOTAL after features and targets; empty in this notebook.
VAR_PROFILE = []

# Target columns (retail and COM variants of LTR and LTV).
VAR_TARGET = ['RETAIL_LTR', 'RETAIL_LTV', 'COM_LTR', 'COM_LTV']

# Categorical feature columns; selected into the output but not log-transformed.
FEATURES_CATEGORICAL = ['RTL_RECENCY_12MO', 'RTL_FREQUENCY_12MO', 'RTL_TENURE_12MO', 
                    'COM_RECENCY_12MO', 'COM_FREQUENCY_12MO', 'COM_TENURE_12MO',
                    'VERTICAL', 'EMP_SIZE',  'IS_TEACHER', 'IS_PARENT',]

# Numeric feature columns; func_numeric_log iterates over this list.
FEATURES_NUMERIC = [
       'RETAIL_NET_SALES', 
       'RETAIL_NET_SALES_SUPPLIES', 
       'RETAIL_NET_SALES_COMPUTERS',
       'RETAIL_NET_SALES_FACILITIES', 'RETAIL_NET_SALES_FOOD',
       'RETAIL_NET_SALES_FURNITURES', 'RETAIL_NET_SALES_INK',
       'RETAIL_NET_SALES_MAIL', 'RETAIL_NET_SALES_SERVICES',
       'RETAIL_NET_SALES_PRINTERS', 'RETAIL_NET_SALES_PAPER',
       'RETAIL_NET_SALES_PRINT', 'RETAIL_NET_SALES_WARRANTIES',
       'RETAIL_NET_SALES_TECH_ACCESSORIES', 'RETAIL_NET_SALES_OTHER',
            
       'COM_NET_SALES',
       'COM_NET_SALES_SUPPLIES', 'COM_NET_SALES_COMPUTERS',
       'COM_NET_SALES_FACILITIES', 'COM_NET_SALES_FOOD',
       'COM_NET_SALES_FURNITURES', 'COM_NET_SALES_INK', 'COM_NET_SALES_MAIL',
       'COM_NET_SALES_SERVICES', 'COM_NET_SALES_PRINTERS',
       'COM_NET_SALES_PAPER', 'COM_NET_SALES_PRINT',
       'COM_NET_SALES_WARRANTIES', 'COM_NET_SALES_TECH_ACCESSORIES',
       'COM_NET_SALES_OTHER', 
    
       'RETAIL_MARGIN', 
       'RETAIL_MARGIN_SUPPLIES', 'RETAIL_MARGIN_COMPUTERS',
       'RETAIL_MARGIN_FACILITIES', 'RETAIL_MARGIN_FOOD',
       'RETAIL_MARGIN_FURNITURES', 'RETAIL_MARGIN_INK', 
       'RETAIL_MARGIN_MAIL',
       'RETAIL_MARGIN_SERVICES', 'RETAIL_MARGIN_PRINTERS',
       'RETAIL_MARGIN_PAPER', 'RETAIL_MARGIN_PRINT',
       'RETAIL_MARGIN_WARRANTIES', 'RETAIL_MARGIN_TECH_ACCESSORIES',
       'RETAIL_MARGIN_OTHER', 
    
       'COM_MARGIN', 'COM_MARGIN_SUPPLIES',
       'COM_MARGIN_COMPUTERS', 'COM_MARGIN_FACILITIES', 'COM_MARGIN_FOOD',
       'COM_MARGIN_FURNITURES', 'COM_MARGIN_INK', 'COM_MARGIN_MAIL',
       'COM_MARGIN_SERVICES', 'COM_MARGIN_PRINTERS', 'COM_MARGIN_PAPER',
       'COM_MARGIN_PRINT', 'COM_MARGIN_WARRANTIES',
       'COM_MARGIN_TECH_ACCESSORIES', 'COM_MARGIN_OTHER', 

        'RETAIL_SFC', 'RETAIL_VFC',
        'COM_SFC', 'COM_VFC',
        
       'RETAIL_NET_SALES_RETURNS', 'RETAIL_MARGIN_RETURNS',
       'COM_NET_SALES_RETURNS', 'COM_MARGIN_RETURNS',
    
       'RETAIL_NET_SALES_KIOSK', 'RETAIL_MARGIN_KIOSK', 
       'COM_NET_SALES_BOPIS', 'COM_MARGIN_BOPIS', 
       
       'RETAIL_SRWSTM', 'RETAIL_SRWINK', 
       'COM_SRWSTM', 'COM_SRWINK',
       'RETAIL_NET_SALES_STAPLES_BRAND', 'RETAIL_MARGIN_STAPLES_BRAND',
       'COM_NET_SALES_STAPLES_BRAND', 'COM_MARGIN_STAPLES_BRAND']

# Every modelling column: categorical features, then numeric features, then targets.
VAR_SUBSET = FEATURES_CATEGORICAL + FEATURES_NUMERIC + VAR_TARGET

# Full column list selected by func_feature_preproc (modelling columns plus profile columns).
VAR_TOTAL = VAR_SUBSET + VAR_PROFILE


In [None]:
### Databricks Code Comment start here

In [9]:
# def func_numeric_log(df):
#     for clsName in FEATURES_NUMERIC:
#         ###change to snowpark
#         df[clsName] = np.where(df[clsName]<0, -np.log(1-df[clsName]), np.log(1+df[clsName]))
#     return df
# def func_feature_preproc(df):
    
#     df_log = func_numeric_log(df=df)
    
#     output = df_log[VAR_TOTAL]
    
#     return output
# df_preproc = func_feature_preproc(df=data.to_pandas())

# # print(df_preproc.shape)

# # del data
# # gc.collect()

In [10]:
# def func_numeric_log(df):
#     for clsName in FEATURES_NUMERIC:
#         df[clsName] = np.where(df[clsName]<0, -np.log(1-df[clsName]), np.log(1+df[clsName]))
#     return df

# def func_feature_preproc(df):
    
#     df_log = func_numeric_log(df=df)
    
#     output = df_log[VAR_TOTAL]
    
#     return output

In [None]:
### Databricks Code Comment end here

### Define same function with snowpark dataframe functions

In [9]:
def func_numeric_log(df):
    """Apply a signed natural-log transform to every FEATURES_NUMERIC column.

    For each column ``x`` the result is ``ln(1 + x)`` when ``x >= 0`` and
    ``-ln(1 - x)`` when ``x < 0``; rows matching neither condition (for
    example NULL values) become 0. This mirrors the pandas original, which
    used ``np.log``.

    Args:
        df: Snowpark DataFrame containing all FEATURES_NUMERIC columns.

    Returns:
        A new Snowpark DataFrame with the numeric columns replaced by their
        transformed values; all other columns are left as they were.
    """
    # Function-scope import keeps this cell independent of the import cell.
    import math

    for clsName in FEATURES_NUMERIC:
        # Resolve the column against `df` itself. The previous version read it
        # from the notebook-global `data`, which only worked when `df` happened
        # to be derived from `data`.
        value = col(clsName)
        # math.e gives a true natural log; the literal 2.71828 was only an
        # approximation of e and diverged slightly from np.log.
        df = df.with_column(
            clsName,
            when(value >= 0, F.log(math.e, 1 + value))
            .when(value < 0, -F.log(math.e, 1 - value))
            .otherwise(lit(0)),
        )

    return df

def func_feature_preproc(df):
    """Log-transform the numeric features of `df` and keep only the VAR_TOTAL columns.

    Args:
        df: Snowpark DataFrame to preprocess.

    Returns:
        The result of func_numeric_log(df) restricted to the columns in VAR_TOTAL.
    """
    return func_numeric_log(df=df).select(VAR_TOTAL)

In [None]:
### Databricks Code start here

In [13]:
# df_preproc = func_feature_preproc(df=data.to_pandas())

# print(df_preproc.shape)

# ### need to find replacement in snowpark
# # del data
# # gc.collect()

In [3]:
### Databricks Code end here

### Apply the Snowpark preprocessing function to the data and store the output as a transient table

In [10]:
# Run the preprocessing on the sampled Snowpark DataFrame
df_preproc = func_feature_preproc(df=data)

# Report [rows, columns] of the preprocessed frame
print([df_preproc.count(), len(df_preproc.columns)])

# Persist the result as a transient table directly from the Snowpark DataFrame.
# The write runs inside Snowflake, so the rows are no longer pulled to the
# client with to_pandas() and uploaded again.
df_preproc.write.mode("overwrite")\
          .save_as_table("staple_df_preproc", table_type="transient")

[5000, 94]


create_temp_table is deprecated. We still respect this parameter when it is True but please consider using `table_type="temporary"` instead.


In [None]:
### The code below registers a separate winsorize stored procedure; it is kept for reference only, because a consolidated stored procedure is built further below

In [16]:
# def func_winsorize_train(df_train_input, features):
#     df_train_winsorize = df_train_input.copy(deep=True)

#     dic_winsorize_min = {} 
#     dic_winsorize_max = {} 

#     for clsName in features:
#         df_train_winsorize[clsName] = winsorize(df_train_input[clsName], limits=[0.01, 0.01]).data

#         dic_winsorize_min[clsName] = df_train_winsorize[clsName].min()
#         dic_winsorize_max[clsName] = df_train_winsorize[clsName].max()
    
#     return df_train_winsorize, dic_winsorize_min, dic_winsorize_max

In [17]:
# @sproc(name='func_winsorize_train',
#        packages=['snowflake-snowpark-python','pandas','scipy'],
#        stage_location='@MODELS',
#        is_permanent=True,
#        replace=True)
# def func_winsorize_train(session: Session, inp_table_name: str ,
#                          out_table_name: str, 
#                          features:list)->T.Variant:
#     from scipy.stats.mstats import winsorize   
#     import pandas as pd
#     df_train_winsorize = session.table(inp_table_name).select(*features).to_pandas().copy(deep=True)

#     dic_winsorize_min = {} 
#     dic_winsorize_max = {} 

#     for clsName in features:
#         df_train_winsorize[clsName] = winsorize(df_train_winsorize[clsName], limits=[0.01, 0.01]).data
        
#         dic_winsorize_min[clsName] = df_train_winsorize[clsName].min()
#         dic_winsorize_max[clsName] = df_train_winsorize[clsName].max()
        
#     session.create_dataframe(df_train_winsorize)\
#     .write.mode("overwrite")\
#     .save_as_table(out_table_name,table_type="transient")

#     return {'winsorize_min':dic_winsorize_min,'winsorize_max':dic_winsorize_max}

In [None]:
### Databricks code starts here

In [18]:
# # COMMAND ----------

# X_winsorize, winsorize_feature_numeric_min, winsorize_feature_numeric_max = func_winsorize_train(df_train_input=df_preproc, features=FEATURES_NUMERIC)
# y_winsorize, winsorize_target_min, winsorize_target_max = func_winsorize_train(df_train_input=df_preproc, features=VAR_TARGET)

# #--
# print(X_winsorize.shape)
# print(y_winsorize.shape)

# # del df_preproc
# # gc.collect()

In [19]:
# # COMMAND ----------
# winsorize_feature_numeric = func_winsorize_train("staple_df_preproc",
#                                                  "staple_df_features_winsorize",
#                                                  FEATURES_NUMERIC)

# winsorize_target = func_winsorize_train("staple_df_preproc",
#                                         "staple_df_target_winsorize",
#                                         VAR_TARGET)

# X_winsorize = mlflow_poc_session.table("staple_df_features_winsorize")
# Y_winsorize = mlflow_poc_session.table("staple_df_target_winsorize")

# winsorize_feature_numeric_min = eval(winsorize_feature_numeric)['winsorize_min']
# winsorize_feature_numeric_max = eval(winsorize_feature_numeric)['winsorize_max']

# winsorize_target_min = eval(winsorize_target)['winsorize_min']
# winsorize_target_max = eval(winsorize_target)['winsorize_max']

# #--
# print([X_winsorize.count(),len(X_winsorize.columns)])
# print([Y_winsorize.count(),len(Y_winsorize.columns)])

# # need to find equivalent in snowpark
# # del df_preproc
# # gc.collect()

In [4]:
### Databricks code end here

In [None]:
### The code below registers a separate standard-scale stored procedure; it is kept for reference only, because a consolidated stored procedure is built further below

In [21]:
# @sproc(name='func_standard_scale_train',
#        packages=['snowflake-snowpark-python','pandas','scikit-learn==1.1.1'],
#        stage_location='@MODELS',
#        is_permanent=True,
#        replace=True)
# def func_standard_scale_train(session: Session, 
#                               inp_table_name: str ,
#                              out_table_name: str, 
#                              features:list)->str:
#     from sklearn import preprocessing   
#     import pandas as pd
#     X_numeric = session.table(inp_table_name).select(*features).to_pandas().copy(deep=True)
    
#     scaler_feature_numeric = preprocessing.StandardScaler()
#     scaler_feature_numeric = scaler_feature_numeric.fit(X_numeric)
        
#     # session.create_dataframe(scaler_feature_numeric)\
#     # .write.mode("overwrite")\
#     # .save_as_table(out_table_name,table_type="transient")

#     return scaler_feature_numeric

In [22]:
# func_standard_scale_train("STAPLE_DF_PREPROC_FEATURES_WINSORIZE",
#                                                  "staple_df_features_std_scale",
#                                                  FEATURES_NUMERIC)

In [None]:
### Databricks Code start here

In [23]:
# del df_preproc
# gc.collect()

# # COMMAND ----------

# X_numeric = X_winsorize[FEATURES_NUMERIC]
# scaler_feature_numeric = preprocessing.StandardScaler()
# scaler_feature_numeric = scaler_feature_numeric.fit(X_numeric)

# #--
# print(X_numeric.shape)

# del X_numeric, X_winsorize, y_winsorize
# gc.collect()

# # COMMAND ----------

# # MAGIC %md #SAVE parameters for winsorize and scaler

# # COMMAND ----------

# import mlflow
# import mlflow.sklearn

# mlflow.set_tracking_uri("databricks")
# mlflow.set_experiment("/Users/adminzhali001@ussicorp5.onmicrosoft.com/LTV_train_score_prd/LTV_Experiment/LTV_TRAIN_DE_14M")

# with mlflow.start_run(nested=True) as run:
#     run_id = run.info.run_id
#     print(run_id)
    
#     mlflow.sklearn.log_model(scaler_feature_numeric,  "scaler_feature_numeric")
#     mlflow.sklearn.log_model(winsorize_feature_numeric_min,  "winsorize_feature_numeric_min")
#     mlflow.sklearn.log_model(winsorize_feature_numeric_max,  "winsorize_feature_numeric_max")
#     mlflow.sklearn.log_model(winsorize_target_min,  "winsorize_target_min")
#     mlflow.sklearn.log_model(winsorize_target_max,  "winsorize_target_max")

# # COMMAND ----------

# # MAGIC %md #SAVE RUN_ID TO DATBRICKS

# # COMMAND ----------

# run_id_hist =  (spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/FileStore/tables/run_id")).toPandas()

# # COMMAND ----------

# df_runId = pd.DataFrame({'name':'winsorize_scale',
#                          'run_day':datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
#                          'run_id': run_id}, index=[0])

# df_runId_full = run_id_hist.append(df_runId)

# df_runId_full = df_runId_full.assign(name = lambda f: f['name'].astype(str))\
#             .assign(run_day = lambda f: f['run_day'].astype(str))\
#             .assign(run_id = lambda f: f['run_id'].astype(str)) \
#             .reset_index(drop=True)

# # COMMAND ----------

# spark_run_id = spark.createDataFrame(df_runId_full)
# spark_run_id.write.mode('overwrite').option("header",True).csv("/FileStore/tables/run_id")

# # COMMAND ----------

# # MAGIC %md # CLEANING CACHE

# # COMMAND ----------

# # MAGIC %reset -f

In [5]:
### Databricks Code end here

In [None]:
### Consolidated stored procedure: winsorize the numeric features and the targets, then standard-scale the numeric features

In [17]:
### Register the stored procedure: declare the packages it requires, the stage where it is stored, and the function to be registered.
@sproc(name='func_winsorize_std_scale',
       packages=['snowflake-snowpark-python','pandas','scipy','scikit-learn==1.1.1','joblib','mlflow'],
       stage_location='@MODELS',
       is_permanent=True,
       replace=True)
def func_winsorize_std_scale(session: Session, 
                             inp_table_name: str,
                             features:list,
                             targets:list,
                             stage_name: str,
                             experiment_name: str)->T.Variant:
    """Winsorize features and targets, fit a StandardScaler, and log the run.

    Steps, in order:
      1. Winsorize every column in `features` and in `targets` (1% limit on
         each tail) and save the results to the transient tables
         ``<inp_table_name>_features_winsorize`` and
         ``<inp_table_name>_targets_winsorize``.
      2. Fit a StandardScaler on the winsorized features.
      3. Log the scaler and the per-column min/max dictionaries in an MLflow
         run tracked under ``/tmp/mlruns``.
      4. Upload the scaler as a joblib file to
         ``@<stage_name>/mlflow_models/<experiment_name>.joblib``.
      5. Append a row describing the run to the MLFLOW_LTV_STAPLES_LOG table.

    Args:
        session: Snowpark session supplied by Snowflake when the procedure runs.
        inp_table_name: Name of the table holding the preprocessed data.
        features: Names of the numeric feature columns.
        targets: Names of the target columns.
        stage_name: Stage (without the leading ``@``) that receives the scaler.
        experiment_name: MLflow experiment name; also used as the file name of
            the uploaded scaler.

    Returns:
        The MLflow run as a dictionary, with the extra key
        ``SNOWFLAKE_MODEL_PATH`` holding the stage path of the scaler.
    """
    # All imports are function-local so the procedure body does not rely on
    # names that only exist in the notebook's global namespace.
    import io
    from datetime import datetime

    import joblib
    import mlflow
    import pandas as pd
    from scipy.stats.mstats import winsorize
    from sklearn import preprocessing

    def _winsorize_columns(columns, table_suffix):
        """Winsorize `columns` of the input table, persist them, return (frame, mins, maxs)."""
        frame = session.table(inp_table_name).select(*columns).to_pandas().copy(deep=True)
        col_min = {}
        col_max = {}
        for clsName in columns:
            # Replace the lowest and highest 1% of values with the boundary values
            frame[clsName] = winsorize(frame[clsName], limits=[0.01, 0.01]).data
            col_min[clsName] = frame[clsName].min()
            col_max[clsName] = frame[clsName].max()
        # Store the winsorized columns in a transient table
        session.create_dataframe(frame)\
            .write.mode("overwrite")\
            .save_as_table(inp_table_name + table_suffix, table_type="transient")
        return frame, col_min, col_max

    # set MLflow-path to /tmp
    mlflow.set_tracking_uri('/tmp/mlruns')
    mlflow.sklearn.autolog()
    mlflow.set_experiment(experiment_name=experiment_name)

    # The context manager ends the run even when one of the steps raises.
    with mlflow.start_run() as active_run:
        df_features_winsorize, dic_winsorize_features_min, dic_winsorize_features_max = \
            _winsorize_columns(features, "_features_winsorize")
        _, dic_winsorize_targets_min, dic_winsorize_targets_max = \
            _winsorize_columns(targets, "_targets_winsorize")

        # Define the Standard scaler and fit it on the winsorized features
        scaler_feature_numeric = preprocessing.StandardScaler()
        scaler_feature_numeric = scaler_feature_numeric.fit(df_features_winsorize)

        # Log the scaler and the winsorize bounds through mlflow.
        # NOTE(review): the min/max dictionaries are logged with the sklearn
        # flavour although they are plain dicts; kept as-is so that artifact
        # names and formats do not change.
        mlflow.sklearn.log_model(scaler_feature_numeric,  "scaler_feature_numeric")
        mlflow.sklearn.log_model(dic_winsorize_features_min,  "winsorize_feature_numeric_min")
        mlflow.sklearn.log_model(dic_winsorize_features_max,  "winsorize_feature_numeric_max")
        mlflow.sklearn.log_model(dic_winsorize_targets_min,  "winsorize_target_min")
        mlflow.sklearn.log_model(dic_winsorize_targets_max,  "winsorize_target_max")

    # Re-read the finished run so the returned information reflects its final state
    run = mlflow.get_run(active_run.info.run_id)

    # Save model to Snowflake stage
    input_stream = io.BytesIO()
    joblib.dump(scaler_feature_numeric, input_stream)
    model_path = f'@{stage_name}/mlflow_models/{experiment_name}.joblib'
    session._conn._cursor.upload_stream(input_stream, model_path)

    # Return mlflow tracking and model path
    experiment_run = run.to_dictionary()
    experiment_run['SNOWFLAKE_MODEL_PATH'] = model_path

    # Capture the run information in a one-row pandas dataframe
    df_runId = pd.DataFrame({'name':'winsorize_scale',
                         'run_day':datetime.now().strftime("%d/%m/%Y %H:%M:%S"),
                         'run_id': str(experiment_run)}, index=[0])
    # define the log table
    log_table_name = "MLFLOW_LTV_STAPLES_LOG"

    # Read the history log table and add the new row. pd.concat replaces
    # DataFrame.append, which was removed in pandas 2.0; the 'pandas' package
    # of this procedure is not pinned, so a 2.x version may be resolved.
    run_id_hist = session.table(log_table_name).to_pandas()
    df_runId_full = pd.concat([run_id_hist, df_runId])

    # Store all three columns as strings before writing back.
    # NOTE(review): because the table is rewritten in overwrite mode from
    # string columns, the column types of the original DDL may not be kept.
    df_runId_full = df_runId_full\
        .astype({'name': str, 'run_day': str, 'run_id': str})\
        .reset_index(drop=True)

    # Write the dataframe to the table
    session.create_dataframe(df_runId_full)\
    .write.mode("overwrite")\
    .save_as_table(log_table_name)

    # return the experiment as variant
    return experiment_run

The version of package joblib in the local environment is 1.2.0, which does not fit the criteria for the requirement joblib. Your UDF might not work when the package version is different between the server and your local environment
The version of package mlflow in the local environment is 2.1.1, which does not fit the criteria for the requirement mlflow. Your UDF might not work when the package version is different between the server and your local environment


### Create mlflow log table if you have not created already

In [22]:
# Create the log table only when it is missing, so earlier run history is not
# wiped each time this cell is executed.
mlflow_poc_session.sql('create table if not exists MLFLOW_LTV_STAPLES_LOG ("name" string,"run_day" timestamp, "run_id" variant)').collect();

In [23]:
# input table as string to read
inp_table_name = 'staple_df_preproc'
# stage location as string for model to save
stage_name = 'MODELS'
# numeric features as list
features = FEATURES_NUMERIC
# experiment name as string (a trailing comma here previously turned it into a 1-tuple)
experiment_name = "LTV_STAPLES"
# targets as list
targets = VAR_TARGET
# registered model name
registered_model_name = 'LTV_STAPLES_MODEL'

# just in case an mlflow run is already in progress
mlflow.end_run()

# Start a local mlflow run; the context manager ends it even if the call fails.
with mlflow.start_run() as run:
    # Run the stored procedure with the variables declared above (instead of
    # repeating their values as literals); it returns the mlflow run as JSON.
    mlflow_dict = json.loads(func_winsorize_std_scale(inp_table_name,
                                                      features,
                                                      targets,
                                                      stage_name,
                                                      experiment_name))

# Last expression of the cell, so the run dictionary is actually displayed
mlflow_dict


In [24]:
show = ['run_id']

# Display the log table in which each experiment run is captured as a variant.
# For the full run details, open a Snowflake worksheet and inspect the variant there.
log_rows = mlflow_poc_session.sql("SELECT * FROM MLFLOW_LTV_STAPLES_LOG")
log_rows.show()

----------------------------------------------------------------------------------------------
|"name"           |"run_day"            |"run_id"                                            |
----------------------------------------------------------------------------------------------
|winsorize_scale  |17/02/2023 14:25:48  |{'info': {'artifact_uri': '/tmp/mlruns/44181700...  |
----------------------------------------------------------------------------------------------

