In [8]:

# Snowpark for Python
import snowflake.snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import udf, sproc

cn_params = {
    "user": "USERNAME",
    "password": "PASSWORD",
    "account": "SERVER", 
    "warehouse": "WAREHOUSE",
    "database" : "EDWPRODHH",
    "schema" : "PUB_MBUTLER"
}

snowpark_session = Session.builder.configs(cn_params).create()

snowpark_session.add_packages('snowflake-snowpark-python', 'xgboost', 'pandas', 'numpy', 'joblib', 'cachetools')



def train_emails_v1 (session: Session) -> str:
    
    from joblib import dump
    from sklearn.pipeline import Pipeline
    from sklearn.model_selection import GridSearchCV
    from xgboost import XGBRegressor
    import pandas as pd
    ##  READ DATA
    
#     df_project = session.table("CONTACT_STRATEGY_EMAILS_6")
#     df = df_project.to_pandas()
  
   
    df = session.sql("""
    SELECT *
    FROM EDWPRODHH.PUB_MBUTLER.CONTACT_STRATEGY_EMAILS_6
    """).to_pandas()
    labels = df["DOL_COMMISSION_ATTR"]
    features = df.drop(
        [
            "DOL_COMMISSION_ATTR"
        ],
        axis = 1
    )
    ##  FIT AND TUNE HYPERPARAMETERS
    pipeline = XGBRegressor(
            n_jobs = 16, ##
            random_state = 42,
            
        )
    parameter_grid = {
        "n_estimators": [150], ## [1-Inf]. Number of trees, aka iterations/steps. Controls stopping point.
        "max_depth": [9], ## [0-20]. Depth of tree, where 0 is unlimited.
        "learning_rate": [0.6], ## [0-1]. Lower means more iterations but more accuracy (small steps).
        "gamma": [1],
        "sub_sample": [1], ## [0-1]. % of sample to train on.
        "colsample_bytree": [1] ## [0-1]. % of variables to train on.
    }
    model = GridSearchCV(
        estimator = pipeline,
        param_grid = parameter_grid,
        cv = 5,
    #         scoring = "neg_median_absolute_error",
    #         scoring = "neg_mean_squared_error",
        scoring = "r2",
        verbose = True
    )
    model.fit(features, labels)
    dump_path = "/tmp/train_emails_v1.joblib"
    dump(model, dump_path)
    session.file.put(
        dump_path,
        "@prod_models",
        overwrite = True
    )
    print(model.best_params_)
    print(model.best_score_)
    return ("Best Parameters: " + str(model.best_params_) + " with Best Score: " + str(model.best_score_) + ".")


In [10]:
snowpark_session.sproc.register(
    func = train_emails_v1,
    name = "train_emails_v1",
    replace = True
)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x1878e85e670>

In [11]:
snowpark_session.call("train_emails_v1")

"Best Parameters: {'colsample_bytree': 1, 'gamma': 1, 'learning_rate': 0.6, 'max_depth': 9, 'n_estimators': 150, 'sub_sample': 1} with Best Score: 0.535523093039143."

In [12]:


snowpark_session.add_import("@edwprodhh.hermes.prod_models/train_emails_v1.joblib.gz")

In [13]:
@udf (name = "prod_predict_v1_emails", stage_location = '@prod_models', session = snowpark_session, packages = ["pandas", "joblib", "scikit-learn", "xgboost"], replace = True)
def predict_Email_rev_v1 (inputs: list) -> float:
    
    import sys
    import pandas as pd
    import numpy as np
    from joblib import load
        
        
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    model_file = import_dir + "train_emails_v1.joblib.gz"
    model = load(model_file)
    
    df = pd.DataFrame(
        [inputs],
        columns = [
            'ASSIGNED_AMT',
            'DEBT_AGE',
            'PREVIOUS_CONTACTS',
            'DIALER_AGENT_CALL', 
            'OUTBOUND_MANUAL_CALL', 
            'TEXT_MESSAGE',
            'VOAPP', 
            'DIALER_AGENTLESS_CALL',
            'LETTER',
            'INBOUND_AGENT_CALL',
            'EMAIL', 
            'MEDIAN_HOUSEHOLD_INCOME',
            'EXPERIAN_SCORE' 
        ]
    )
    df['EXPERIAN_SCORE'] = pd.to_numeric(df['EXPERIAN_SCORE'], errors='coerce')
    df['MEDIAN_HOUSEHOLD_INCOME'] = pd.to_numeric(df['MEDIAN_HOUSEHOLD_INCOME'], errors='coerce')
    df['ASSIGNED_AMT'] = pd.to_numeric(df['ASSIGNED_AMT'], errors='coerce')
    
    y_pred = model.predict(df)[0]
    y_pred = np.clip(y_pred, a_min=0, a_max=None)
    return y_pred

The version of package joblib in the local environment is 1.2.0, which does not fit the criteria for the requirement joblib. Your UDF might not work when the package version is different between the server and your local environment
The version of package xgboost in the local environment is 1.7.5, which does not fit the criteria for the requirement xgboost. Your UDF might not work when the package version is different between the server and your local environment
