### Load Snowpark libraries

In [35]:
# Import required libraries
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import avg, sum, col,lit
from snowflake.snowpark.functions import udf, sproc, col
from snowflake.snowpark.types import IntegerType, FloatType, LongType, DoubleType, DecimalType,StringType, BooleanType, Variant
from snowflake.snowpark.types import PandasSeries, PandasDataFrame
from snowflake.snowpark import functions as fn

import sys ,json
import io
import logging
import pandas as pd

import joblib
import pandas as pd
import numpy as np
import json

from snowflake.snowpark import version
print (f"snowflake snowpark version is: {version.VERSION}")

snowflake snowpark version is: (0, 10, 0)


### Connect to Snowflake and establish session

In [36]:
snowflake_connection_cfg = open('cred.json')
snowflake_connection_cfg = snowflake_connection_cfg.read()
snowflake_connection_cfg = json.loads(snowflake_connection_cfg)

# Creating Snowpark Session
tc_session = Session.builder.configs(snowflake_connection_cfg).create()
print('Current Database:', tc_session.get_current_database())
print('Current Schema:', tc_session.get_current_schema())
print('Current Warehouse:', tc_session.get_current_warehouse())

Current Database: "BANK1_CRM_DB"
Current Schema: "PUBLIC"
Current Warehouse: "APP_WH"


### Create stage location for models

In [37]:
tc_session.sql("CREATE OR REPLACE STAGE stage_models").collect()

[Row(status='Stage area STAGE_MODELS successfully created.')]

In [38]:
tc_session.clear_packages()
tc_session.add_packages("snowflake-snowpark-python")
tc_session.add_packages("scikit-learn","pandas","numpy","joblib","cachetools")
tc_session.clear_imports()

### Define function to train test random split

In [39]:
def train_test_split(training_table: str, sample_size_n: int, features:list, Y:str, test_size:float, random_state:int):
    # Loading data into pandas dataframe

    # Define features and label
    X = training_table[features]
    Y = training_table[Y]

    # Splitting data into training and test
    from sklearn.model_selection import train_test_split
    X_train, X_test, y_train, y_test = train_test_split(X, Y, test_size=test_size, random_state=random_state)

    return X_train, X_test, y_train, y_test

### Define function to save trained model

In [40]:
def save_file(session, model, path, dest_filename):
    # logger.debug('#save_file: -- START--')
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    session._conn.upload_stream(input_stream, path, dest_filename)
    return "successfully created file: " + path

### Define Features required to train model

In [41]:
features=['SEPTAL_LENGTH','SEPTAL_WIDTH','PETAL_LENGTH','PETAL_WIDTH']

### Define Model pipeline for Imputer, Standard Scaler and Random Classifier Model

In [42]:
def build_rf_model(p_df: pd.DataFrame,ne,nj,cw, md):
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import RandomForestClassifier
    numeric_features = p_df.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_features = p_df.select_dtypes(include=['object']).columns.tolist()

    feature_names = numeric_features + categorical_features

    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler(with_mean=True,with_std=True))])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)])

    model = Pipeline(steps=[
                ('preprocessor', preprocessor),
                ('classifier'
                    ,RandomForestClassifier(n_estimators=ne, n_jobs=-nj, class_weight=cw,max_depth=md)
                    # ,RandomForestClassifier(n_estimators=4, n_jobs=-1, class_weight='balanced_subsample',max_depth=20)
                    # ,RandomForestClassifier(maxBins=20,featureSubsetStrategy='onethird') need to find the equivalents
                    # of these maxBins and featureSubsetStrategy. For featureSubsetStrategy I do think it is the 
                    # classweight from sklearn based on the documentation. I also think maxBins could be the same as
                    # maxdepth.
                )
            ])

    return model


In [43]:
def build_dtree_model(p_df: pd.DataFrame,cw, md):
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.tree import DecisionTreeClassifier

    numeric_features = p_df.select_dtypes(include=['int64', 'float64']).columns.tolist()
    categorical_features = p_df.select_dtypes(include=['object']).columns.tolist()

    feature_names = numeric_features + categorical_features

    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler(with_mean=True,with_std=True))])

    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)])

    model = Pipeline(steps=[
                ('preprocessor', preprocessor),
                ('classifier'
                    ,DecisionTreeClassifier(class_weight=cw,max_depth=md)
                    # ,RandomForestClassifier(n_estimators=4, n_jobs=-1, class_weight='balanced_subsample',max_depth=20)
                    # ,RandomForestClassifier(maxBins=20,featureSubsetStrategy='onethird') need to find the equivalents
                    # of these maxBins and featureSubsetStrategy. For featureSubsetStrategy I do think it is the 
                    # classweight from sklearn based on the documentation. I also think maxBins could be the same as
                    # maxdepth.
                )
            ])

    return model

In [44]:
def get_classification_report(y_test, y_pred):
    from sklearn import metrics
    report = metrics.classification_report(y_test, y_pred, output_dict=True,target_names=['setosa', 'versicolor', 'virginica'])
    df_classification_report = pd.DataFrame(report).transpose()    
    return df_classification_report

In [45]:
def get_model_info(model_name, test_size, random_state,ne,nj,cw,max_depth):
    data = [[model_name,test_size,random_state,ne,nj,cw,max_depth]]  
    df_model_info = pd.DataFrame(data,columns=['model','test_size','random_state','n_estimator','n_jobs','class_weight','max_depth'])
    return df_model_info

### Train random forest classifier model

In [46]:
def train_rf_model(session: Session, training_table: str, sample_size_n: int, model_name: str,features:list, Y: str,test_size:float,random_state:int,ne:int,nj:int,cw:str, md:int) -> str:
    from sklearn.metrics import accuracy_score, classification_report, precision_score, recall_score
    from sklearn.metrics import accuracy_score, confusion_matrix, classification_report

    training_data = session.table(training_table).sample(n=sample_size_n).toPandas()

    Data_train, Data_test, y_train, y_test = train_test_split(training_data, sample_size_n, features, Y,test_size,random_state)
    from sklearn.ensemble import RandomForestClassifier 
    # Model building
    rf = build_rf_model(Data_train,ne,nj,cw, md)
    rf.fit(Data_train, y_train)

    model_dir = '@stage_models'
    model_fl = model_name+'.joblib'
    save_file(session, rf, model_dir ,model_fl)

    score = rf.score(Data_test, y_test)
    
    y_pred = rf.predict(Data_test)
    df_classification_report = get_classification_report(y_pred,y_test).reset_index().rename(columns={"index": "class"}).reset_index(drop=True)
    df_model_info = get_model_info(model_fl,test_size,random_state,ne,nj,cw,md)
    df_model_info=df_model_info.append([df_model_info]*5,ignore_index=True)
    # tc_session.create_dataframe(df_classification_report.join(df_model_info)).write.mode("overwrite").save_as_table("model_output")
    
    return df_classification_report.join(df_model_info)

In [47]:
def train_dtree_model(session: Session, training_table: str, sample_size_n: int, model_name: str,features:list, Y: str,test_size:float,random_state:int,cw:str, md:int) -> str:
    training_data = session.table(training_table).sample(n=sample_size_n).toPandas()

    Data_train, Data_test, y_train, y_test = train_test_split(training_data, sample_size_n, features, Y,test_size,random_state)
    # Model building
    dtree = build_dtree_model(Data_train,cw, md)
    dtree.fit(Data_train, y_train)

    model_dir = '@stage_models'
    model_fl = model_name+'.joblib'
    save_file(session, dtree, model_dir ,model_fl)
    
    y_pred = dtree.predict(Data_test)
    df_classification_report = get_classification_report(y_pred,y_test).reset_index().rename(columns={"index": "class"}).reset_index(drop=True)
    df_model_info = get_model_info(model_fl,test_size,random_state,None,None,cw,md)
    df_model_info=df_model_info.append([df_model_info]*5,ignore_index=True)
    # tc_session.create_dataframe(df_classification_report.join(df_model_info)).write.mode("overwrite").save_as_table("model_output")
    
    return df_classification_report.join(df_model_info)

### Define stored proc to register random forest classifier model

In [48]:
# Registering the function as a Stored Procedure
rf_sproc = tc_session.sproc.register(func=train_rf_model, # training function defined above
                                            name='train_rf_model', # training model name to be registered in snowlake
                                            is_permanent=True, # permanent stored proc
                                            replace=True, # replace if existing already
                                            stage_location='@stage_models', # save the model in stage location
                                            packages=['snowflake-snowpark-python','scikit-learn','joblib']) # import model libaries


In [49]:
# Registering the function as a Stored Procedure
dtree_sproc = tc_session.sproc.register(func=train_dtree_model, # training function defined above
                                            name='train_dtree_model', # training model name to be registered in snowlake
                                            is_permanent=True, # permanent stored proc
                                            replace=True, # replace if existing already
                                            stage_location='@stage_models', # save the model in stage location
                                            packages=['snowflake-snowpark-python','scikit-learn','joblib']) # import model libaries


In [52]:
print ("random forest classifier report")
print (rf_sproc('iris_dataset',100, 'rf_iris_model_ts10_md15',features,'LABEL',0.1,43,4,1,'balanced_subsample', 15))

random forest classifier report
          class  precision    recall  ...  n_jobs        class_weight max_depth
0        setosa   1.000000  1.000000  ...       1  balanced_subsample        15
1    versicolor   1.000000  0.666667  ...       1  balanced_subsample        15
2     virginica   0.000000  0.000000  ...       1  balanced_subsample        15
3      accuracy   0.800000  0.800000  ...       1  balanced_subsample        15
4     macro avg   0.666667  0.555556  ...       1  balanced_subsample        15
5  weighted avg   1.000000  0.800000  ...       1  balanced_subsample        15

[6 rows x 12 columns]


In [53]:
print ("random forest classifier report")
print (rf_sproc('iris_dataset',100, 'rf_iris_model_ts25_md20',features,'LABEL',0.25,43,4,1,'balanced_subsample', 20))

random forest classifier report
          class  precision    recall  ...  n_jobs        class_weight max_depth
0        setosa   1.000000  1.000000  ...       1  balanced_subsample        20
1    versicolor   1.000000  0.888889  ...       1  balanced_subsample        20
2     virginica   0.900000  1.000000  ...       1  balanced_subsample        20
3      accuracy   0.960000  0.960000  ...       1  balanced_subsample        20
4     macro avg   0.966667  0.962963  ...       1  balanced_subsample        20
5  weighted avg   0.964000  0.960000  ...       1  balanced_subsample        20

[6 rows x 12 columns]


In [54]:
print ("random forest classifier report")
print (rf_sproc('iris_dataset',100, 'rf_iris_model_ts25_md25',features,'LABEL',0.25,43,4,1,'balanced_subsample', 25))

random forest classifier report
          class  precision    recall  ...  n_jobs        class_weight max_depth
0        setosa   1.000000  1.000000  ...       1  balanced_subsample        25
1    versicolor   1.000000  0.875000  ...       1  balanced_subsample        25
2     virginica   0.888889  1.000000  ...       1  balanced_subsample        25
3      accuracy   0.960000  0.960000  ...       1  balanced_subsample        25
4     macro avg   0.962963  0.958333  ...       1  balanced_subsample        25
5  weighted avg   0.964444  0.960000  ...       1  balanced_subsample        25

[6 rows x 12 columns]


In [55]:
print ("decision tree classifier report")
print (dtree_sproc('iris_dataset',100, 'dtree_iris_model_ts25_md20',features,'LABEL',0.25,43,'balanced', 20))

decision tree classifier report
          class  precision    recall  ...  n_jobs  class_weight max_depth
0        setosa   1.000000  1.000000  ...    None      balanced        20
1    versicolor   0.875000  1.000000  ...    None      balanced        20
2     virginica   1.000000  0.888889  ...    None      balanced        20
3      accuracy   0.960000  0.960000  ...    None      balanced        20
4     macro avg   0.958333  0.962963  ...    None      balanced        20
5  weighted avg   0.965000  0.960000  ...    None      balanced        20

[6 rows x 12 columns]


In [56]:
print ("decision tree classifier report")
print (dtree_sproc('iris_dataset',100, 'dtree_iris_model_ts25_md25',features,'LABEL',0.25,43,'balanced', 25))

decision tree classifier report
          class  precision    recall  ...  n_jobs  class_weight max_depth
0        setosa   1.000000  1.000000  ...    None      balanced        25
1    versicolor   0.833333  0.625000  ...    None      balanced        25
2     virginica   0.625000  0.833333  ...    None      balanced        25
3      accuracy   0.840000  0.840000  ...    None      balanced        25
4     macro avg   0.819444  0.819444  ...    None      balanced        25
5  weighted avg   0.856667  0.840000  ...    None      balanced        25

[6 rows x 12 columns]


In [57]:
print ("decision tree classifier report")
print (dtree_sproc('iris_dataset',100, 'dtree_iris_model_ts25_md30',features,'LABEL',0.25,43,'balanced', 30))

decision tree classifier report
          class  precision    recall  ...  n_jobs  class_weight max_depth
0        setosa   1.000000  1.000000  ...    None      balanced        30
1    versicolor   0.888889  1.000000  ...    None      balanced        30
2     virginica   1.000000  0.800000  ...    None      balanced        30
3      accuracy   0.960000  0.960000  ...    None      balanced        30
4     macro avg   0.962963  0.933333  ...    None      balanced        30
5  weighted avg   0.964444  0.960000  ...    None      balanced        30

[6 rows x 12 columns]


### You can also Train multiple random forest classifier using registered stored proc

In [58]:
# test_size = np.arange(0.30, 0.34, 0.01)
# random_state = np.arange(22, 25, 1)
# max_depth = np.arange(10,35,5)
# for t in test_size:
#     for r in random_state:
#         for m in max_depth:
#             print (f"random forest classifier for test size : {t}, random state : {r} and max_depth :{m}")
#             print (rf_sproc('iris_dataset',150, 'rf_iris_model',features,'LABEL',float(t),int(r),4,1,'balanced_subsample', int(m)))

### Check if the classifier models are saved in stage location.
### Remember if the same model name was used for all the iterations, then only the last trained model will be saved
### for the model name.

In [59]:
tc_session.sql("list @stage_models").collect()

[Row(name='stage_models/dtree_iris_model_ts25_md20.joblib', size=4464, md5='55699d68aef93e8d437bae7c9bca2f6a', last_modified='Mon, 28 Nov 2022 03:30:55 GMT'),
 Row(name='stage_models/dtree_iris_model_ts25_md25.joblib', size=4464, md5='f260ad381d643f7133073755b228a9ea', last_modified='Mon, 28 Nov 2022 03:31:09 GMT'),
 Row(name='stage_models/dtree_iris_model_ts25_md30.joblib', size=4624, md5='0da182b87ef07e4bf7cf7879700ea556', last_modified='Mon, 28 Nov 2022 03:31:25 GMT'),
 Row(name='stage_models/rf_iris_model.joblib', size=7536, md5='dd9c3bb79ef0a030e89b6bce8b3bd716', last_modified='Mon, 28 Nov 2022 03:27:55 GMT'),
 Row(name='stage_models/rf_iris_model_ts10_md15.joblib', size=9456, md5='1d266b594d99225c43631f5836e96879', last_modified='Mon, 28 Nov 2022 03:29:34 GMT'),
 Row(name='stage_models/rf_iris_model_ts25_md20.joblib', size=10576, md5='18cb306df02eb278e265e6762eab5c0e', last_modified='Mon, 28 Nov 2022 03:29:41 GMT'),
 Row(name='stage_models/rf_iris_model_ts25_md25.joblib', size=88

In [60]:
tc_session.close()
print('Finished!!!')

Finished!!!
