In [63]:
from snowflake.snowpark import Session
import configparser
def get_session():
    parser = configparser.ConfigParser()
    # Add the credential file name here
    parser.read('config.ini')

    connection_params = dict(user=parser['Credentials']['user'], 
                         password=parser['Credentials']['password'], 
                         account=parser['Credentials']['account'], 
                         warehouse=parser['Credentials']['warehouse'], 
                         database=parser['Credentials']['database'],
                         schema=parser['Credentials']['schema'], 
                         role=parser['Credentials']['role'])

    session = Session.builder.configs(connection_params).create()
    return session

In [64]:
session = get_session()

### Deploy the model from stage location as a UDF

In [44]:
session.clear_imports()
session.clear_packages()
from snowflake.snowpark.functions import udf,call_udf

# Add trained model as dependency
session.add_import("@models/Predictive_Maintenance_model_20230919_194031.joblib.gz")
import pandas
@udf(name='predict', session=session,is_permanent=True,replace=True,stage_location="@SCORE",packages=["snowflake-snowpark-python","pandas", "joblib","scikit-learn"])
def score(payload: list) -> str:
    import sys
    import pandas as pd
    from joblib import load

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    pipeline_file = import_dir + 'Predictive_Maintenance_model_20230919_194031.joblib.gz'
    pipeline = load(pipeline_file)

    prediction = pipeline.predict(df)[0]
    
    return prediction

The version of package joblib in the local environment is 1.3.2, which does not fit the criteria for the requirement joblib. Your UDF might not work when the package version is different between the server and your local environment


#### Sample Input 

In [62]:
inp =[6,407438,1,3,'S1F0',1,0,26953834,0,0,2.079441541679836,0.0,3.9702919135521215,0.0,2.079441541679836]

Consuming the model one request at a time. For batch inferenece, deploy the model as vectorized UDF

In [59]:
output = session.sql("select PREDICT("+str(inp)+")").collect()

In [60]:
output

[Row(PREDICT([6, 407438, 1, 3, 'S1F0', 1, 0, 26953834, 0, 0, 2.079441541679836, 0.0, 3.9702919135521215, 0.0, 2.079441541679836])='0')]

In [65]:
    #add all imports
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import OneHotEncoder
    from sklearn.metrics import recall_score, f1_score, roc_auc_score, confusion_matrix,classification_report
    from sklearn.linear_model import SGDClassifier
    from sklearn.pipeline import Pipeline
    
    df_final = session.sql("SELECT * from {table}".format(table="predictive_maintenance_final")).to_pandas()
    
    # prepare data for training
    X = df_final.drop(['FAILURE'], axis = 1)
    y = df_final['FAILURE']
    X_train, X_test, Y_train, Y_test = train_test_split(X,y,random_state = 0, test_size=0.25, shuffle = True)
    X_train.reset_index(inplace = True, drop = True)
    Y_train.reset_index(inplace = True, drop = True)

    X_test.reset_index(inplace = True, drop = True)
    Y_test.reset_index(inplace = True, drop = True)
    
    x_train, x_val, y_train, y_val = train_test_split(X_train, Y_train, random_state = 0, test_size=0.25)
    
    pipeline = Pipeline([
        ('OHE', OneHotEncoder(handle_unknown = 'ignore',sparse=False)),
#         ('SCALER', StandardScaler()),
        ('SGD', SGDClassifier(random_state=0))
     ])
    
    pipeline.fit(x_train,y_train)
    y_pred = pipeline.predict(x_val) 
    cf_matrix = confusion_matrix(y_val, y_pred)
    cls_report = classification_report(y_val, y_pred, output_dict=True)
    print(cls_report)



{'0': {'precision': 0.9991432120978452, 'recall': 1.0, 'f1-score': 0.9995714224488921, 'support': 23323.0}, '1': {'precision': 0.0, 'recall': 0.0, 'f1-score': 0.0, 'support': 20.0}, 'accuracy': 0.9991432120978452, 'macro avg': {'precision': 0.4995716060489226, 'recall': 0.5, 'f1-score': 0.49978571122444604, 'support': 23343.0}, 'weighted avg': {'precision': 0.9982871582811996, 'recall': 0.9991432120978452, 'f1-score': 0.9987150017467982, 'support': 23343.0}}


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


In [67]:
import pickle
pickle.dump(pipeline, open("/data/pipeline.pkl", 'wb'))

In [68]:
loaded_pipeline = pickle.load(open("/data/pipeline.pkl", 'rb'))

Write the score function

In [71]:
from refractml import *
from refractml.constants import MLModelFlavours
import requests



In [72]:
@scoring_func
def score(model, request):
    payload = request.json["payload"]
    prediction = pipeline.predict([payload])[0]
    return prediction

In [74]:
req = requests.Request()
req.json = {"payload":[6,407438,1,3,'S1F0',1,0,26953834,0,0,2.079441541679836,0.0,3.9702919135521215,0.0,2.079441541679836]}
print(score(loaded_pipeline,req))

0




### Register on refract

In [80]:
## registering the model in refract.
tmp = register_model(loaded_pipeline, 
               score, 
               name="Predictive_Maintenance", 
               description="By analyzing data patterns and using machine learning algorithms, it predicts when maintenance is needed, allowing organizations to perform repairs or servicing precisely when required, minimizing downtime, reducing costs, and preventing unexpected equipment failures. This approach enhances operational efficiency and extends the lifespan of assets by addressing maintenance needs based on data-driven insights rather than fixed schedules.",
               flavour=MLModelFlavours.sklearn,
               model_type="classification",
               y_true=y_val,
               y_pred=y_pred, 
               features=x_train.columns,
               labels=[0,1],
               feature_names=X_train.columns.tolist(),
               original_features=X_train.columns.tolist(),
               feature_ids=X_train.columns,
               kyd=True, kyd_score = True)