In [0]:
# -*- coding: utf-8 -*-
import dataiku
import pandas as pd, numpy as np
from dataiku.snowpark import DkuSnowpark

from snowflake.snowpark import Session
from snowflake.snowpark.functions import udf
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import version
from snowflake.snowpark.types import *
from snowflake.snowpark.functions import when, col, lit, avg, stddev, stddev_pop , round, log

import mlflow
import os

In [0]:
# Export the AWS credentials resolved by the Dataiku "s3" connection as environment
# variables, so that the MLflow model can be read from S3 further down.
client = dataiku.api_client()

s3_conn = client.get_connection("s3")
s3_conn_info = s3_conn.get_info()
aws_credential = s3_conn_info["resolvedAWSCredential"]

os.environ["AWS_ACCESS_KEY_ID"] = aws_credential["accessKey"]
os.environ["AWS_SECRET_ACCESS_KEY"] = aws_credential["secretKey"]

# os.environ only accepts strings, so a missing/None session token would raise here.
# Presumably the token is only provided for temporary (STS) credentials - TODO confirm.
# When it is absent, drop any stale token so it is not paired with the new key pair.
session_token = aws_credential.get("sessionToken")
if session_token:
    os.environ["AWS_SESSION_TOKEN"] = session_token
else:
    os.environ.pop("AWS_SESSION_TOKEN", None)

# NOTE(review): region is hard-coded; confirm it matches the bucket's region.
os.environ["AWS_DEFAULT_REGION"] = "us-west-1"

# Name of the output table, prefixed with the current Dataiku project key.
project_key = dataiku.default_project_key()
final_table_name = project_key + "_UCI_BANK_EVALUATE_SNOWPARK_SCORED"

In [0]:
# Create the Dataiku Snowpark helper. The Snowpark session itself is obtained from
# this object later (via get_session), and packages are registered on that session.
dku_snowpark = DkuSnowpark()

In [0]:
# Packages to register on the session (scikit-learn is pinned to 1.0.2).
udf_packages = ["scikit-learn==1.0.2", "pandas", "numpy"]

# Obtain a Snowpark session through the Dataiku helper ("MLFlow" is the argument
# passed to get_session) and register the packages on it.
session = dku_snowpark.get_session("MLFlow")
session.add_packages(*udf_packages)

In [0]:
# Read input dataset for scoring
input_dataset = dataiku.Dataset("uci_bank_evaluate_sf")
uci_band_evaluate_df = dku_snowpark.get_dataframe(input_dataset, session = session)

# Column names as reported by Snowpark; these may carry surrounding double quotes.
features = list(uci_band_evaluate_df.columns)

# Plain column names for the pandas frame built inside the scoring UDF.
# Only strip the surrounding double quotes when both are actually present, so that
# a column reported without quotes does not lose its first and last character.
features_1 = [
    item[1:-1] if len(item) >= 2 and item.startswith('"') and item.endswith('"') else item
    for item in features
]

In [0]:
# Read in the trained MLFlow model from S3
mlflow_model_folder = dataiku.Folder("Py36")

# Fetch the folder metadata once and reuse it for both the bucket and the root path.
folder_access_info = mlflow_model_folder.get_info()['accessInfo']
s3_bucket = folder_access_info['bucket']
s3_subpath = folder_access_info['root']

# Location of the model artifacts relative to the folder root.
# NOTE(review): the run id is hard-coded; update it when the model is retrained.
model_path = '/python38_models/run_20230224181749/artifacts/RandomForestClassifier-run_20230224181749'
full_model_path = "s3://" + s3_bucket + s3_subpath + model_path

# Reads from S3; relies on the AWS_* environment variables being set beforehand.
model = mlflow.sklearn.load_model(full_model_path)

In [0]:
# Define Snowpark UDF which will take input records and score them using the MLFlow model
@udf(name='predict_loan_payback',is_permanent = True, stage_location = '@UDF', replace=True)
def predict_loan_payback(args: list) -> str:
    """Score a single input record with the loaded model.

    Args:
        args: The feature values of one record, in the same order as ``features_1``.

    Returns:
        The model's prediction for the record, as a plain Python string.
    """
    # Wrap the record in a one-row frame so the model sees named feature columns.
    row = pd.DataFrame([args], columns=features_1)
    # The UDF is declared to return str, so coerce the predicted element (expected
    # to be a NumPy scalar from predict()) to a builtin str before returning it.
    return str(model.predict(row)[0])

In [0]:
# Score the input dataset with the registered UDF and persist the result,
# overwriting the output Snowflake table.

# Input columns carried through unchanged to the output table.
passthrough_names = [
    "age", "job", "marital", "education", "default", "balance",
    "housing", "loan", "contact", "day", "month", "duration",
    "campaign", "pdays", "previous", "poutcome", "y",
]
# Wrap each name in double quotes, as the columns are referenced in quoted form.
passthrough_cols = [col('"' + name + '"') for name in passthrough_names]

# All feature columns are packed into one array argument for the UDF.
prediction_col = F.call_udf(
    "predict_loan_payback", F.array_construct(*features)
).alias('"predicted_loan_payback"')

scored_df = uci_band_evaluate_df.select(*passthrough_cols, prediction_col)
scored_df.write.mode('overwrite').saveAsTable(final_table_name)