In [None]:
from snowflake.snowpark import Session
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import DoubleType, Variant, PandasDataFrame, PandasSeries
from dotenv import load_dotenv
from sklearn.feature_extraction import text as txt
from sklearn import svm
from joblib import dump
from snowflake.snowpark.functions import sproc,udf,call_udf,col
import cachetools

In [None]:
# Load variables from a local .env file into the process environment.
# NOTE(review): presumably this supplies settings used when connecting to Snowflake — confirm.
load_dotenv()

In [None]:
# Look up the login parameters for the connection named "test_conn".
pars=SnowflakeLoginOptions("test_conn")

In [None]:
# Point the connection at the IMDB database and its public schema,
# overriding whatever the named connection specified for these two keys.
pars.update(database='imdb',schema='public')

In [None]:
# Open a Snowpark session using the connection parameters held in `pars`.
session=Session.builder.configs(pars).create()

In [None]:
# Tag the queries issued through this session with 'sentiment-serving'.
session.query_tag='sentiment-serving'

In [None]:
# Read the source table and derive a numeric label column:
# SENTIMENT_FLAG is 1 where SENTIMENT equals 'positive' and 2 for every other value.
# NOTE(review): the variable is named `test_dataset` but the table read is
# "train_dataset" — confirm which one is intended.
test_dataset=session.table("train_dataset")
df_flag=test_dataset.with_column(
    "SENTIMENT_FLAG",
    F.when(test_dataset['SENTIMENT']=='positive',F.lit(1)).otherwise(F.lit(2)),
)

In [None]:
# Preview the first 5 rows, including the derived SENTIMENT_FLAG column.
df_flag.show(5)

In [None]:
# Reset the session-level imports, then register the staged model and
# vectoriser artefacts so server-side code can read them.
session.clear_imports()
for staged_artifact in ("@models/model_review1.joblib","@models/vect_review1.joblib"):
    session.add_import(staged_artifact)

# Reset the session-level packages, then declare the ones server-side code needs.
# NOTE(review): the UDF cells in this notebook also pass their own packages=/imports=
# lists with the same entries — confirm whether both declarations are required.
session.clear_packages()
session.add_packages([
    'snowflake-snowpark-python==1.39.0',
    'snowflake-ml-python',
    'scikit-learn==1.2.2',
    'pandas==1.5.3',
    'numpy==1.23.5',
    'joblib==1.2.0',
    'cachetools',
])


In [None]:
@cachetools.cached(cache={})
def load_file(filename):
    """Load a joblib-serialised object from the UDF import directory.

    The result is memoised per ``filename`` by ``cachetools.cached``, so the
    file is read and deserialised only on the first call for a given name
    within a Python process.

    Args:
        filename: Base name of the imported file, e.g. ``'model_review1.joblib'``.

    Returns:
        The object deserialised by ``joblib.load``.

    Raises:
        RuntimeError: If the ``snowflake_import_directory`` option is not set.
            Previously this case returned (and cached) ``None``, which only
            surfaced later as an ``AttributeError`` in the caller.
    """
    import os, sys, joblib
    # Files added as UDF imports are exposed under the directory named by this option.
    import_dir=sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        raise RuntimeError(
            f"Cannot load {filename!r}: 'snowflake_import_directory' is not set; "
            "load_file must be called from code running inside Snowflake."
        )
    with open(os.path.join(import_dir,filename),"rb") as file:
        # NOTE: joblib.load unpickles the file, so only trusted artefacts should be staged.
        return joblib.load(file)

In [None]:
# Packages and staged artefacts made available to the UDF at run time.
udf_packages=['pandas==1.5.3','numpy==1.23.5','joblib==1.2.0','cachetools','scikit-learn==1.2.2',
              'snowflake-snowpark-python==1.39.0','snowflake-ml-python']
udf_imports=["@models/model_review1.joblib","@models/vect_review1.joblib"]
@udf(name="predict_review",is_permanent=True,stage_location="@files",
     replace=True,packages=udf_packages,imports=udf_imports)
def predict_review(args:list) -> float:
    """Predict the sentiment label for a single review.

    Args:
        args: A list whose first element is the REVIEW text. Callers in this
            notebook also pass SENTIMENT_FLAG as a second element; it is
            accepted for compatibility but is not used for the prediction.

    Returns:
        The model's predicted label for the review, converted to ``float``.
    """
    review_text=args[0]
    # Vectorise the one review directly; building a one-row DataFrame on every
    # call added per-row overhead without changing what the vectoriser receives.
    bow_test=load_file(filename='vect_review1.joblib').transform([review_text])

    prediction=load_file(filename='model_review1.joblib').predict(bow_test)
    return float(prediction[0])

In [None]:
# Score each row with the scalar UDF: REVIEW and SENTIMENT_FLAG are packed into
# one array argument and the result is exposed as PREDICTED_REVIEW.
query=df_flag.select(
    *[df_flag[name] for name in ("REVIEW","SENTIMENT","SENTIMENT_FLAG")],
    call_udf(
        "predict_review",
        F.array_construct(col("REVIEW"),col("SENTIMENT_FLAG")),
    ).alias("PREDICTED_REVIEW"),
)

# Run the query and print the first rows.
query.show()

In [None]:
# Inspect the SQL statements Snowpark generated for `query`.
query.queries

In [None]:
# Create and test an alternative vectorized UDF for batch inference.
udf_packages=['pandas==1.5.3','numpy==1.23.5','joblib==1.2.0','cachetools','scikit-learn==1.2.2',
              'snowflake-snowpark-python==1.39.0','snowflake-ml-python']
udf_imports=["@models/model_review1.joblib","@models/vect_review1.joblib"]
@udf(name="predict_review_batch",is_permanent=True,stage_location="@files",
     replace=True,packages=udf_packages,imports=udf_imports)
def predict_review_batch(df:PandasDataFrame[str]) -> PandasSeries[float]:
    """Predict sentiment labels for a batch of reviews.

    Args:
        df: Batch of input rows as a pandas DataFrame; column ``0`` holds the
            review text.

    Returns:
        A float64 pandas Series with one predicted label per input row, in
        input order. The cast matches the declared ``PandasSeries[float]``
        return type and the ``float`` conversion done by the scalar
        ``predict_review`` UDF, instead of returning whatever dtype the
        model's ``predict`` happens to produce.
    """
    import pandas as pd
    vec=load_file(filename='vect_review1.joblib')
    bowTest=vec.transform(df[0].values)

    model=load_file(filename="model_review1.joblib")
    return pd.Series(model.predict(bowTest),dtype="float64")

In [None]:
# Score the rows with the vectorized UDF, which takes the REVIEW column
# directly, and expose the result as PREDICTED_REVIEW.
query=df_flag.select(
    *[df_flag[name] for name in ("REVIEW","SENTIMENT","SENTIMENT_FLAG")],
    call_udf("predict_review_batch",F.col("REVIEW")).alias("PREDICTED_REVIEW"),
)

# Run the query and print the first rows.
query.show()

In [None]:
# Inspect the SQL statements Snowpark generated for the batch-UDF `query`.
query.queries