In [18]:
from snowflake.snowpark.session import Session
import snowflake.snowpark.types as T

import json
import pandas as pd

In [29]:
! pip install lightgbm



In [30]:
# Save the locally installed versions of the packages of interest as a JSON file
from importlib import metadata

sp_udf_packages = ['snowflake-snowpark-python', 'pandas', 'scikit-learn', 'lightgbm', 'xgboost', 'cachetools', 'joblib', 'imbalanced-learn', 'matplotlib']

# Look each package up individually instead of parsing the string form of every
# installed distribution: the lookup reflects packages installed earlier in this
# kernel session and does not break on distributions with unusual metadata.
packages_version = {}
for package_name in sp_udf_packages:
    try:
        packages_version[package_name] = metadata.version(package_name)
    except metadata.PackageNotFoundError:
        # Not installed locally: leave it out of the snapshot
        continue

with open('packages_version.json', 'w') as outfile:
    json.dump(packages_version, outfile)

In [31]:
# Read the saved package versions back from disk and display them
with open('packages_version.json') as versions_file:
    packages_version = json.loads(versions_file.read())

packages_version

{'snowflake-snowpark-python': '1.3.0',
 'pandas': '1.5.3',
 'scikit-learn': '1.2.2',
 'xgboost': '1.7.6',
 'joblib': '1.2.0',
 'imbalanced-learn': '0.10.1',
 'matplotlib': '3.7.1'}

In [21]:
# Read the Snowflake connection settings from creds.json
with open('creds.json') as creds_file:
    connection_parameters = json.loads(creds_file.read())

In [22]:
# Open the Snowpark session from the loaded connection settings
session_builder = Session.builder.configs(connection_parameters)
session = session_builder.create()

# Report the context the session is running in
current_schema = session.get_fully_qualified_current_schema()
print(f"Current Database and schema: {current_schema}")
current_warehouse = session.get_current_warehouse()
print(f"Current Warehouse: {current_warehouse}")
current_role = session.get_current_role()
print(f"Role : {current_role}")

Current Database and schema: "HOL_DB"."PUBLIC"
Current Warehouse: "HOL_WH"
Role : "ACCOUNTADMIN"


In [23]:
# Create (or replace) the ML_MODELS stage; later cells use it for the stored
# procedure's code and for the trained model file
create_stage_sql = 'CREATE OR REPLACE STAGE ML_MODELS'
session.sql(create_stage_sql).collect()


[Row(status='Stage area ML_MODELS successfully created.')]

In [24]:
# Display the result of SHOW STAGES as a DataFrame
stage_rows = session.sql('SHOW STAGES').collect()
pd.DataFrame(stage_rows)


Unnamed: 0,created_on,name,database_name,schema_name,url,has_credentials,has_encryption_key,owner,comment,region,type,cloud,notification_channel,storage_integration
0,2023-06-27 05:08:44.437000-07:00,ML_MODELS,HOL_DB,PUBLIC,,N,N,ACCOUNTADMIN,,,INTERNAL,,,


In [36]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import time
from imblearn.over_sampling import SMOTE
import opendatasets as od
from sklearn import svm
from sklearn.inspection import DecisionBoundaryDisplay
from sklearn.metrics import f1_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import GridSearchCV, train_test_split, StratifiedKFold, cross_val_score
from sklearn.linear_model import LogisticRegression
from sklearn.neighbors import KNeighborsClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from snowflake.snowpark import version
from snowflake.snowpark.functions import udf, sproc, col
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import IntegerType, FloatType, StringType, BooleanType, Variant

# Report the version of the Snowpark client library in use
snowpark_version = version.VERSION
print(f"Snowflake snowpark version is: {snowpark_version}")




def main(session : Session, 
         table_name : str,
         target : str,
         clf : str) -> str :
    """Tune and evaluate one classifier on a Snowflake table and stage the best model.

    The table is loaded into pandas, split into train/test sets, the training
    set is oversampled with SMOTE, and a grid search (5-fold stratified CV,
    recall scoring) is run over a pipeline of polynomial features, scaling,
    PCA and the selected estimator. The best pipeline is serialized with
    joblib and uploaded to the ``@ml_models`` stage.

    Args:
        session: Snowpark session used to read the table and upload the model.
        table_name: Name of the table holding the features and the target.
        target: Name of the target column in that table.
        clf: Classifier key, one of ``'lr'``, ``'svm'``, ``'dt'``, ``'rf'``, ``'lgb'``.

    Returns:
        A JSON string with the keys ``"Classifier tested"``,
        ``"Recall score on Train"`` (mean cross-validated recall of the best
        parameter set) and ``"Recall score on Test"``.

    Raises:
        ValueError: If ``clf`` is not one of the supported keys.
    """
    # These names are used below but are not imported anywhere else in the
    # notebook, so they are imported here to make the function self-contained.
    import json
    import os

    import lightgbm as lgb
    from joblib import dump
    from sklearn.compose import ColumnTransformer
    from sklearn.decomposition import PCA
    from sklearn.metrics import recall_score
    from sklearn.preprocessing import PolynomialFeatures

    # Fail fast on an unknown key, before any data is loaded
    supported = ('lr', 'svm', 'dt', 'rf', 'lgb')
    if clf not in supported:
        raise ValueError(f"Unsupported classifier {clf!r}; expected one of {supported}")

    # One seed for the split, the oversampling and the CV folds so runs are repeatable
    seed = 42

    # Load features
    df = session.table(table_name).to_pandas()
    print(df.head(), flush=True)
    X = df.drop(target, axis = 1)
    y = df[target]

    # Split dataset into training and test; only the training part is oversampled
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=1/3, random_state = seed)
    X_train, y_train = SMOTE(random_state = seed).fit_resample(X_train, y_train)

    # Preprocess numeric columns. They are selected from X (not df) so the
    # target column can never end up in the list of feature columns.
    numeric_features = X.select_dtypes(include=['float64']).columns
    numeric_transformer = Pipeline(steps=[('poly', PolynomialFeatures(degree = 2)), ('scaler', StandardScaler())])
    preprocessor = ColumnTransformer(transformers=[('num', numeric_transformer, numeric_features)])

    def score_classifier(estimator, param_grid, X_train, y_train, X_test, y_test):
        """Grid-search `estimator` inside the shared pipeline and score it on the test set.

        Returns a 6-tuple: (best pipeline, best parameters, test predictions,
        best mean CV recall, recall on the test set, confusion matrix).
        """
        pipeline = Pipeline(steps=[('preprocessor', preprocessor), ('pca', PCA(n_components = 0.9)), ('estimator', estimator)])

        # search for best parameters
        cv = StratifiedKFold(n_splits = 5, shuffle = True, random_state = seed)
        search = GridSearchCV(pipeline, param_grid, scoring = 'recall', verbose = 1, cv = cv)
        search.fit(X_train, y_train)

        # evaluate best estimator
        best_estimator = search.best_estimator_
        parameters = search.best_params_
        train_score = search.best_score_
        print(f'Average score of the best estimator (training dataset) : {train_score}')
        y_pred = search.predict(X_test)  # same as best_estimator.predict(X_test)
        cm = confusion_matrix(y_test, y_pred)
        test_score = recall_score(y_test, y_pred)
        disp = ConfusionMatrixDisplay(cm)
        disp.plot()
        title = str(estimator)+'\n Recall score on test dataset : '+str(test_score)
        plt.title(title)
        plt.show()
        return (best_estimator, parameters, y_pred, train_score, test_score, cm)

    # Pick the estimator and its search grid
    if clf == 'lr':
        estimator = LogisticRegression()
        param_grid = {
            'estimator__penalty': ['l1', 'l2'],  # Regularization penalty: L1 or L2
            'estimator__C': [0.1, 1, 10],  # Inverse of regularization strength
            'estimator__solver': ['liblinear', 'saga']  # Solver algorithm: 'liblinear' for small datasets, 'saga' for large datasets
        }
    elif clf == 'svm':
        estimator = svm.SVC()
        param_grid = {
            'estimator__C': [0.1, 1, 10],  # Regularization parameter
            'estimator__kernel': ['linear', 'rbf'],  # Kernel type: linear or radial basis function (RBF)
            'estimator__gamma': ['scale', 'auto'],  # Kernel coefficient for RBF
        }
    elif clf == 'dt':
        # DecisionTreeClassifier takes no `verbose` argument
        estimator = DecisionTreeClassifier()
        param_grid = {
            'estimator__criterion': ['gini', 'entropy'],  # Split criterion: Gini impurity or information gain
            'estimator__max_depth': [5, 10],  # Maximum depth of the tree
            'estimator__min_samples_split': [2, 5, 10],  # Minimum number of samples required to split an internal node
            'estimator__min_samples_leaf': [1, 2, 4],  # Minimum number of samples required to be at a leaf node
            'estimator__max_features': [0.99, 'sqrt'],  # Number of features to consider when looking for the best split
        }
    elif clf == 'rf':
        estimator = RandomForestClassifier(verbose = 1)
        param_grid = {
            'estimator__n_estimators': [10, 100],  # Number of trees in the random forest
            'estimator__max_depth': [5, 10],  # Maximum depth of each tree
            'estimator__min_samples_split': [2, 5, 10],  # Minimum number of samples required to split an internal node
            'estimator__min_samples_leaf': [1, 2, 4],  # Minimum number of samples required to be at a leaf node
            'estimator__max_features': [0.99, 'sqrt'],  # Number of features to consider when looking for the best split
            'estimator__bootstrap': [True, False]  # Whether bootstrap samples are used when building trees
        }
    else:  # 'lgb' (the key was validated above)
        estimator = lgb.LGBMClassifier()
        param_grid = {
            'estimator__boosting_type': ['gbdt', 'dart'],  # Boosting type: Gradient Boosting Decision Tree (gbdt) or Dart
            'estimator__num_leaves': [31, 63, 127],  # Maximum number of leaves in one tree
            'estimator__learning_rate': [1.0, 0.1, 0.01],  # Learning rate
            'estimator__subsample': [0.8, 1.0],  # Subsample ratio of the training instances
            'estimator__colsample_bytree': [0.8, 1.0],  # Subsample ratio of columns when constructing each tree
            'estimator__reg_alpha': [0.0, 0.1, 0.5],  # L1 regularization term
            'estimator__reg_lambda': [0.0, 0.1, 0.5]  # L2 regularization term
        }

    # Every classifier goes through the same call, so the scores used in the
    # return value are always bound
    best_estimator, _, _, train_score, test_score, _ = score_classifier(
        estimator, param_grid, X_train, y_train, X_test, y_test)

    # Upload trained model to a stage
    model_file = os.path.join('/tmp', str(estimator)+'.joblib')
    dump(best_estimator, model_file)
    session.file.put(model_file, "@ml_models",overwrite=True)

    # Serialize the summary so the value matches the declared `str` return type
    return json.dumps({
        "Classifier tested": str(estimator),
        "Recall score on Train": float(train_score),
        "Recall score on Test": float(test_score),
    })


In [37]:
# Register `main` as a permanent stored procedure named 'main'

# Packages requested for the procedure: pinned to the versions recorded in
# packages_version, except lightgbm, which is requested without a version
sproc_packages = [
    f'snowflake-snowpark-python=={packages_version["snowflake-snowpark-python"]}',
    f'scikit-learn=={packages_version["scikit-learn"]}',
    f'joblib=={packages_version["joblib"]}',
    f'imbalanced-learn=={packages_version["imbalanced-learn"]}',
    f'pandas=={packages_version["pandas"]}',
    f'matplotlib=={packages_version["matplotlib"]}',
    'lightgbm',
]

registration_options = dict(
    name='main',
    is_permanent=True,
    replace=True,
    stage_location='@ML_MODELS',
    packages=sproc_packages,
)
sproc_main = session.sproc.register(func=main, **registration_options)

# Alternative registration (disabled): requests only scikit-learn and joblib

# sproc_main = session.sproc.register(func = main,
#                                     name ='main',
#                                     is_permanent = True,
#                                     replace=True, 
#                                     stage_location='@ML_MODELS', 
#                                     packages=[f'scikit-learn=={packages_version["scikit-learn"]}'
#                                                 ,f'joblib=={packages_version["joblib"]}'])

In [38]:
# Source table and the name of its label column
table_name = 'CREDIT_RECORD'
# table_name = session.get_fully_qualified_current_schema()+'.CREDIT_RECORD'
target = 'FRAUD_BOOL'

# Run the stored procedure with the 'lgb' classifier key and show what it returns
classifier_key = 'lgb'
msg = sproc_main(table_name, target, classifier_key, session=session)
print(msg)

SnowparkSQLException: (1304): 390114 (08001): Authentication token has expired.  The user must authenticate again.

In [None]:
# The trained model is uploaded to the ML_MODELS stage; list the stage contents
staged_files = session.sql('LIST @ML_MODELS').collect()
pd.DataFrame(staged_files)

Unnamed: 0,name,size,md5,last_modified
0,ml_models/LogisticRegression().joblib.gz,9072,20a1f9c32fa77e892516a93efca76e06,"Mon, 26 Jun 2023 16:17:59 GMT"
1,ml_models/main/udf_py_1294080697.zip,3888,70f912261607487dcdd13d85c44222c2,"Mon, 26 Jun 2023 15:43:33 GMT"
2,ml_models/main/udf_py_1413835883.zip,3888,ebfe916dd4837f3d81e59e066997a5f3,"Mon, 26 Jun 2023 15:27:50 GMT"
3,ml_models/main/udf_py_2012092536.zip,3888,1d0ddda64fc44815587270e7f1d112f2,"Mon, 26 Jun 2023 15:54:43 GMT"
4,ml_models/main/udf_py_92652747.zip,3872,452f6594eb16baab7eb3123b11a43d7d,"Mon, 26 Jun 2023 16:10:12 GMT"
