## User Defined Functions (UDF)

There are two different types of UDFs:
* UDF (Scalar User Defined Function)
    * It is a scalar function that returns one output row for each input row.
    * The returned row consists of a single column/value.
    * The Python UDF batch API (vectorized UDFs) lets you define UDFs that receive batches of input rows as Pandas DataFrames and return batches of results as Pandas arrays or Series.
* UDTF (User Defined Tabular Function)
    * A tabular function, also called a table function, returns zero, one, or multiple rows for each input row.
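The difference in row cardinality can be sketched in plain Python, outside Snowflake. The two functions below are hypothetical and only mimic the contract of each function type; they are not Snowpark APIs.

```python
from typing import Iterable, List, Tuple

def scalar_upper(value: str) -> str:
    # Scalar: exactly one output value per input row
    return value.upper()

def tabular_split(value: str) -> Iterable[Tuple[str]]:
    # Tabular: zero, one or many output rows per input row
    for word in value.split():
        yield (word,)

rows = ["a b", "", "c"]
scalar_out: List[str] = [scalar_upper(r) for r in rows]
# The empty string produces no rows at all, "a b" produces two
tabular_out: List[Tuple[str]] = [t for r in rows for t in tabular_split(r)]
```

The scalar version always yields three values for three input rows, while the tabular version yields three rows for reasons that have nothing to do with the input row count.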


A UDF can be created using the **@udf** decorator, the **udf** function, or the **udf.register** method of the session object (**session.udf.register**). It can be permanent or temporary.


In [None]:
# Make sure we do not get line breaks when doing show on wide DataFrames
from IPython.display import display, HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Snowpark imports 
import snowflake.snowpark as S
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark import Window

# Used for reading creds.json
import json

# Used for UDF examples
import cachetools
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

# Used for UDF/UDTF examples
import joblib


# Used for the UDTF examples
from collections import Counter
from typing import Iterable, Tuple

# Print the version of Snowpark we are using
print(f"Using Snowpark: {S.__version__}")

### Connect to Snowflake

This example uses a JSON file with the following structure:
```
{
    "account":"MY SNOWFLAKE ACCOUNT",
    "user": "MY USER",
    "password":"MY PASSWORD",
    "role":"MY ROLE",
    "warehouse":"MY WH",
    "database":"MY DB",
    "schema":"MY SCHEMA"
}

```
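A small guard can make a missing key in creds.json fail with a clear message before the connection attempt. The helper below is hypothetical and checks for every key in the example above, which is stricter than Snowflake itself requires (role, warehouse, database and schema can be omitted when connecting).

```python
import json
from typing import Dict

# Keys used in the creds.json example above
REQUIRED_KEYS = ("account", "user", "password", "role", "warehouse", "database", "schema")

def load_connection_parameters(path: str) -> Dict[str, str]:
    """Hypothetical helper: read creds.json and fail early if a key is missing."""
    with open(path) as f:
        params = json.load(f)
    missing = [key for key in REQUIRED_KEYS if key not in params]
    if missing:
        raise KeyError(f"Missing connection parameters: {', '.join(missing)}")
    return params
```

Usage would then be `Session.builder.configs(load_connection_parameters('../creds.json')).create()`.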

In [None]:
with open('../creds.json') as f:
    connection_parameters = json.load(f)

session = Session.builder.configs(connection_parameters).create()
print("Current role: " + session.get_current_role() + ", Current schema: " + session.get_fully_qualified_current_schema() + ", Current WH: " + session.get_current_warehouse())

## UDF

Start by creating a UDF that returns a string. By setting is_permanent=False, the UDF is temporary: it is only available in the current Snowflake session and is dropped when that session is closed.

In [None]:
session.clear_imports()
session.clear_packages()
@F.udf(name="hello_udf", is_permanent=False, replace=True, session=session)
def hello_udf(name: str) -> str:
    return f'Hello {name}!'

In [None]:
test_name_df = session.create_dataframe([['Mats'], ['Pia']], schema=["name"])

In [None]:
test_name_df.select(F.call_function("hello_udf", F.col("name"))).show()

In [None]:
session.clear_imports()
session.clear_packages()
@F.udf(name="hello_batch_udf", is_permanent=False, replace=True, packages=["pandas"], session=session)
def hello_batch_udf(names: T.PandasSeries[str]) -> T.PandasSeries[str]:
    # names holds one batch of input rows; n is the size of that batch
    n = len(names)
    return names.apply(lambda x: f'Hello {x}, we got {n} rows')

In [None]:
customers_df = session.table("snowflake_sample_data.tpcds_sf100tcl.customer")
print(f"Nbr of customers: {customers_df.count():,}")
customers_df.show()

If we test this using **show**, every row reports a batch of only 15 rows, since **show(15)** limits the query to 15 rows and the UDF therefore never receives more input than that.

In [None]:
customers_df.select(F.col("C_FIRST_NAME")).select(F.call_function("hello_batch_udf", F.col("C_FIRST_NAME"))).show(15)

By using **cache_result**, we can temporarily store the result of the query generated by the DataFrame (in a temporary table). Since the whole table is processed before the result is cached, we can see that each call to the function received more rows.

In [None]:
batch_udf_df = customers_df.select(F.call_function("hello_batch_udf", F.col("C_FIRST_NAME"))).cache_result()
batch_udf_df.show(15)
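Why the reported number changes can be sketched locally. Here the input is split into fixed-size chunks, which is a simplifying assumption: Snowflake chooses the batch boundaries itself (the udf function's max_batch_size parameter only sets an upper limit). iter_batches and run_batched are hypothetical helpers; hello_batch mirrors the logic of hello_batch_udf.

```python
from typing import Iterator, List, Sequence

def iter_batches(rows: Sequence[str], max_batch_size: int) -> Iterator[List[str]]:
    # Hypothetical stand-in for how a vectorized UDF receives its input in chunks
    for start in range(0, len(rows), max_batch_size):
        yield list(rows[start:start + max_batch_size])

def hello_batch(batch: List[str]) -> List[str]:
    # Same logic as hello_batch_udf: every row in a batch reports the batch size
    n = len(batch)
    return [f"Hello {name}, we got {n} rows" for name in batch]

def run_batched(rows: Sequence[str], max_batch_size: int) -> List[str]:
    results: List[str] = []
    for batch in iter_batches(rows, max_batch_size):
        results.extend(hello_batch(batch))
    return results
```

With only 15 input rows (as with **show(15)**) there is at most one batch of 15, however large the allowed batch size is; with the full cached table, the batches fill up.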

### Reading files with UDFs

To read a file in a UDF, the file needs to be added as an import, either with **session.add_import** or with the **imports** parameter of the UDF.

If we do not need to update the file, we can refer to a local file, and that file will be uploaded to Snowflake when the UDF is created. To use an updated file, we would need to recreate the UDF.

By using cachetools, we can make sure that the file is only loaded once: cachetools caches the return value of the function in memory and returns it whenever the function is called again with the same parameter.
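The caching behaviour can be observed with a call counter. This sketch swaps in the standard library's functools.lru_cache(maxsize=None) for cachetools.cached(cache={}); both memoize on the arguments in the memory of the current Python process. read_config is a hypothetical function used only for the demonstration.

```python
import functools

load_count = {"calls": 0}

@functools.lru_cache(maxsize=None)
def read_config(filename: str) -> str:
    # The body only runs on a cache miss; the counter shows how often that happens
    load_count["calls"] += 1
    return f"content of {filename}"

read_config("text_file.txt")
read_config("text_file.txt")   # same argument: served from the cache, body not executed
read_config("other_file.txt")  # new argument: the body runs again
```

Since the cache lives in process memory, each Python process that runs the UDF loads the file once, not once per row.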

In [None]:
data_path = "../data/"
#data_stage_name = "~" # Using the user stage
udf_stage_name = "UDF_DEMO_STAGE"

A function that reads a file the UDF has access to, i.e. a file that has been added using the imports parameter.

In [None]:
@cachetools.cached(cache={})
def read_file_cached(filename):
    import sys
    import os
    
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), "r") as f:
            return f.read()
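Outside Snowflake, sys._xoptions has no snowflake_import_directory entry, so read_file_cached returns None. A hypothetical variant that also accepts an explicit directory makes the same lookup testable locally; caching is left out here so that a test can rewrite the file.

```python
import os
import sys
from typing import Optional

def read_import_file(filename: str, import_dir: Optional[str] = None) -> Optional[str]:
    """Hypothetical helper: read an imported file; import_dir overrides the lookup for local tests."""
    # Inside a Snowflake UDF the import directory is exposed through sys._xoptions
    directory = import_dir or sys._xoptions.get("snowflake_import_directory")
    if not directory:
        return None  # not running inside Snowflake and no override given
    with open(os.path.join(directory, filename), "r") as f:
        return f.read()
```

Locally, a temporary folder containing text_file.txt can be passed as import_dir to check the function before it is used in a UDF.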


Create a UDF where the imports parameter refers to the local file.

In [None]:
session.clear_imports()
session.clear_packages()

@F.udf(name="read_file_static_udf", is_permanent=False, replace=True, packages=["cachetools"], imports=[f"{data_path}text_file.txt"], session=session)
def read_file_static() -> str:
    return read_file_cached('text_file.txt')


In [None]:
session.generator(F.call_function("read_file_static_udf"), rowcount=1).show()

If we want to be able to update the file, we need to store it in a Snowflake stage; the stage can be either internal or external.

Create an internal Snowflake stage.

In [None]:
session.sql(f"create or replace stage {udf_stage_name}").collect()

Upload a local file to the new stage. 

In [None]:
session.file.put(f"{data_path}text_file.txt", f"@{udf_stage_name}", auto_compress=False, overwrite=True)

Check that the file is there.

In [None]:
session.sql(f"ls @{udf_stage_name}").show()

Create a UDF that has access to the file in the stage, using the imports parameter.

In [None]:
session.clear_imports()
session.clear_packages()

@F.udf(name="read_file_stage_udf", is_permanent=False, replace=True, packages=["cachetools"], imports=[f"@{udf_stage_name}/text_file.txt"] ,session=session)
def read_file_stage() -> str:
    return read_file_cached('text_file.txt')


In [None]:
session.generator(F.call_function("read_file_stage_udf"), rowcount=1).show()

If we change text_file.txt (in the data folder), we upload it again:

In [None]:
session.file.put(f"{data_path}text_file.txt", f"@{udf_stage_name}", auto_compress=False, overwrite=True)
session.sql(f"ls @{udf_stage_name}").show()

Rerun the call to the UDF

In [None]:
session.generator(F.call_function("read_file_stage_udf"), rowcount=1).show()

Next, we create a UDF that uses a saved Python object, in this case a fitted scikit-learn pipeline.

Create and fit a pipeline using the Titanic data (use 00_Load_demo_data.ipynb to load the data).

In [None]:
cat_cols = ["EMBARKED", "SEX", "PCLASS"]
num_cols = ["AGE", "FARE"]

train_df = session.table("titanic").select(*cat_cols, *num_cols, "SURVIVED")

train_pd = train_df.to_pandas()

X = train_pd[[*cat_cols, *num_cols]]
y = train_pd["SURVIVED"]

In [None]:
# Imputer and OneHotEncoder for categorical columns
cat_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    # sparse=False is the scikit-learn 1.1.x spelling (renamed sparse_output in 1.2);
    # the local version should match the scikit-learn==1.1.3 pinned in the UDF below
    ('onehot', OneHotEncoder(handle_unknown='ignore', sparse=False))
])
# Imputer and Scaler for numerical columns
num_transformer = Pipeline(steps=[
    ('imputer', KNNImputer(n_neighbors=5)),
    ('scaler', StandardScaler())
])
preprocessor = ColumnTransformer(
    [
        ('num', num_transformer, num_cols),
        ('cat', cat_transformer, cat_cols)
    ],
    verbose_feature_names_out=False,
)

pipe = Pipeline(steps=[('preprocessor', preprocessor),
                       ('classifier', RandomForestClassifier())])
rc_pipeline = pipe.fit(X, y)

In [None]:
rc_pipeline

Save the fitted pipeline as a file locally using joblib

In [None]:
joblib.dump(rc_pipeline, "rc_pipeline.joblib")

Upload the file to the Snowflake stage

In [None]:
session.file.put("rc_pipeline.joblib", f"@{udf_stage_name}", auto_compress=False, overwrite=True)

In [None]:
session.sql(f"ls @{udf_stage_name}").show()

Create a function that loads the file using joblib; cachetools makes sure the file is only read once per Python process.

In [None]:
@cachetools.cached(cache={})
def load_joblib_file(filename):
    import joblib
    import sys
    import os

    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m
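The load-once pattern can be tried without Snowflake. This sketch uses the standard pickle module as a stand-in for joblib (a file written with joblib.dump should still be read with joblib.load), functools.lru_cache as a stand-in for cachetools, a temporary folder as the import directory, and a plain dict as a fake model; load_model is hypothetical.

```python
import functools
import os
import pickle
import tempfile

@functools.lru_cache(maxsize=None)
def load_model(directory: str, filename: str):
    # Deserialize once per (directory, filename); later calls reuse the same object
    with open(os.path.join(directory, filename), "rb") as file:
        return pickle.load(file)

# Simulate the import directory with a temporary folder
import_dir = tempfile.mkdtemp()
with open(os.path.join(import_dir, "model.pkl"), "wb") as file:
    pickle.dump({"classes": [0, 1]}, file)

model_a = load_model(import_dir, "model.pkl")
model_b = load_model(import_dir, "model.pkl")  # cache hit: no second read from disk
```

Both calls return the very same object, which is what keeps a UDF from deserializing the pipeline for every batch.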


Create the UDF. It is important that the imports parameter refers to the stage and the file, while the load_joblib_file function only needs the file name.

In [None]:
@F.udf(name = "predict_survive_udf", is_permanent = False, imports = [f"@{udf_stage_name}/rc_pipeline.joblib"]
       , packages = ['pandas', 'scikit-learn==1.1.3', 'cachetools'], replace = True, session = session)
def predict_survive(pd_df: T.PandasDataFrame[str, str, str, float, float]) -> T.PandasSeries[int]:
    
    pd_df.columns = [*cat_cols, *num_cols]
    model = load_joblib_file('rc_pipeline.joblib') # Only call with the file name!

    return model.predict(pd_df)


Test that the UDF works.

In [None]:
input_cols = [F.col(col) for col in [*cat_cols, *num_cols]]
train_df.with_column("PREDICTION", F.call_function("predict_survive_udf", *input_cols)).show()

## UDTF

A User Defined Table Function (UDTF) is a function that returns zero, one, or multiple rows for each input row.

When creating a UDTF, a Python class has to be used as the handler.

A UDTF handler class implements the following, which Snowflake invokes at run time:
* An **__init__** method. Optional. Invoked to initialize stateful processing of input partitions.
* A **process** method. Required. Invoked for each input row. The method returns a tabular value as tuples.
* An **end_partition** method. Optional. Invoked to finalize processing of input partitions.
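The call order can be mimicked locally to test a handler class. run_udtf_locally is a hypothetical harness, not part of Snowpark; it assumes each partition is handled by a fresh handler instance, and RowCounter is an illustrative handler.

```python
from typing import Any, Dict, Iterable, List, Tuple

class RowCounter:
    """Hypothetical handler: echoes each row and reports a per-partition count."""
    def __init__(self) -> None:
        self.count = 0

    def process(self, value: str) -> Iterable[Tuple[str, int]]:
        self.count += 1
        yield (value, self.count)

    def end_partition(self) -> Iterable[Tuple[str, int]]:
        yield ("total", self.count)

def run_udtf_locally(handler_cls: type, partitions: Dict[Any, List[tuple]]) -> List[tuple]:
    """Mimic the call order: new instance per partition, process per row, then end_partition."""
    output: List[tuple] = []
    for rows in partitions.values():
        handler = handler_cls()  # __init__ runs once per partition
        for row in rows:
            result = handler.process(*row)
            if result is not None:
                output.extend(result)
        if hasattr(handler, "end_partition"):
            result = handler.end_partition()
            if result is not None:
                output.extend(result)
    return output
```

For example, `run_udtf_locally(RowCounter, {"p1": [("a",), ("b",)], "p2": [("c",)]})` produces the rows of p1, a total of 2, then the row of p2 and a total of 1.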

We start with a simple UDTF that splits a string into words and, for each unique word, returns a row with the word and the number of times it occurs in the string.

In [None]:
class MyWordCount:
    # Called once for each partition
    def __init__(self):
        self._total_per_partition = 0
    
    # Called for each row in a partition
    def process(self, s1: str) -> Iterable[Tuple[str, int]]:
        words = s1.split()
        # Accumulate, so the total covers every row in the partition
        self._total_per_partition += len(words)
        # Counter returns a dict with the unique words as keys and the number of occurrences as values
        counter = Counter(words) 
        yield from counter.items()
    
    # Called after the last row in a partition has been processed
    def end_partition(self):
        yield ("partition_total", self._total_per_partition)

Create a temporary UDTF. We need to provide the output schema, i.e. the columns of the returned rows. If only column names are provided, the data types are taken from the return type hints of the **process** method.

In [None]:
word_count_udtf = session.udtf.register(handler=MyWordCount, output_schema=["word", "count"], name="word_count_udtf", is_permanent=False, replace=True)
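How column names and return type hints line up can be illustrated with the standard typing module. This is only a sketch of the pairing, not Snowpark's actual implementation; WordCountHandler and output_types are hypothetical.

```python
from typing import Iterable, List, Tuple, get_args, get_type_hints

class WordCountHandler:
    def process(self, s1: str) -> Iterable[Tuple[str, int]]:
        yield from ()  # body irrelevant here; only the annotation is inspected

def output_types(handler_cls: type) -> Tuple[type, ...]:
    """Hypothetical: read the column types from process()'s return annotation."""
    return_hint = get_type_hints(handler_cls.process)["return"]  # Iterable[Tuple[str, int]]
    (row_hint,) = get_args(return_hint)                          # Tuple[str, int]
    return get_args(row_hint)                                    # (str, int)

column_names: List[str] = ["word", "count"]
schema = list(zip(column_names, output_types(WordCountHandler)))
```

The names passed as output_schema are matched position by position with the types in the tuple annotation, so their count and order must agree.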

Test the UDTF. By using **session.table_function**, we get a new DataFrame with the data generated by the UDTF.

In [None]:
df_udtf = session.table_function("word_count_udtf", F.lit("w1 w2 w2 w3 w3 w3"))
df_udtf.show()

We can also call it on a DataFrame column, using **call_table_function**.

In [None]:
df_udtf_data = session.create_dataframe([["w1 w2 w2 w3 w3 w3"]], schema=["text"])
df_udtf_data.show()

In [None]:
df_udtf_data.select(F.call_table_function("word_count_udtf", F.col("TEXT"))).show()

If we want to do the split/count by a column, the partition_by parameter can be used.

In [None]:
df_udtf_part_data = session.create_dataframe([["1", "w1 w2 w2 w3 w3 w3"], ["2", "w4 w4 w4 w4 w1"]], schema=["partition","text"])
df_udtf_part_data.show()

In [None]:
df_udtf_part_data.select("partition", F.call_table_function("word_count_udtf", F.col("TEXT")).over(partition_by="partition")).show()
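The partitioned call can be checked locally with the same two rows. The sketch below uses a local copy of MyWordCount that accumulates the word total with +=, so partition_total counts every word in the partition; word_count_by_partition is a hypothetical helper that groups rows the way partition_by does. In Snowflake, the order of rows within a partition is not guaranteed.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Tuple

class MyWordCount:
    def __init__(self) -> None:
        self._total_per_partition = 0

    def process(self, s1: str) -> Iterable[Tuple[str, int]]:
        words = s1.split()
        self._total_per_partition += len(words)
        yield from Counter(words).items()

    def end_partition(self) -> Iterable[Tuple[str, int]]:
        yield ("partition_total", self._total_per_partition)

def word_count_by_partition(rows: List[Tuple[str, str]]) -> Dict[str, List[Tuple[str, int]]]:
    # Group rows on the partition column, as .over(partition_by="partition") does
    groups: Dict[str, List[str]] = defaultdict(list)
    for partition, text in rows:
        groups[partition].append(text)
    results: Dict[str, List[Tuple[str, int]]] = {}
    for partition, texts in groups.items():
        handler = MyWordCount()  # one handler instance per partition
        out: List[Tuple[str, int]] = []
        for text in texts:
            out.extend(handler.process(text))
        out.extend(handler.end_partition())
        results[partition] = out
    return results

results = word_count_by_partition([("1", "w1 w2 w2 w3 w3 w3"), ("2", "w4 w4 w4 w4 w1")])
```

Each partition gets its own partition_total row: 6 words for partition "1" and 5 for partition "2".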

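The following UDTF collects the values of a column into a list. Because **process** yields the list after each value is appended, every output row holds the values seen so far in its partition.

In [None]: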
@F.udtf(name="collect_list", is_permanent=False, replace=True, packages=["typing"], output_schema=T.StructType([T.StructField("list", T.ArrayType())]), session=session)
class collect_list_handler:
    def __init__(self) -> None:
        self.list = []
    def process(self, element: float) -> Iterable[Tuple[list]]:
        self.list.append(element)
        # Yield a copy so each row keeps the list as it was at that point
        yield (list(self.list),)


In [None]:
train_df.with_column("collect_list", F.call_table_function("collect_list", F.col("FARE"))).show(5)
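The handler keeps appending to one list and yields it for every row. The local sketch below shows why it can matter whether a row carries that shared list or a snapshot of it: if the rows are collected before they are read, every row that references the shared list shows its final state. Whether Snowflake serializes each row immediately is not established here, so yielding a copy is the safer choice. Both classes and the collect helper are hypothetical.

```python
from typing import Iterable, List, Tuple

class CollectListShared:
    def __init__(self) -> None:
        self.list: List[float] = []

    def process(self, element: float) -> Iterable[Tuple[List[float]]]:
        self.list.append(element)
        yield (self.list,)  # the same list object every time

class CollectListCopy(CollectListShared):
    def process(self, element: float) -> Iterable[Tuple[List[float]]]:
        self.list.append(element)
        yield (list(self.list),)  # a snapshot of the list at this row

def collect(handler_cls: type, values: List[float]) -> List[List[float]]:
    handler = handler_cls()
    rows = [row for value in values for row in handler.process(value)]
    return [row[0] for row in rows]

shared = collect(CollectListShared, [1.0, 2.0, 3.0])
copied = collect(CollectListCopy, [1.0, 2.0, 3.0])
```

Only the copying version produces the intended running list.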

We can use a UDTF for scoring, for example if we want to return multiple columns.

The example below uses the scikit-learn pipeline we trained earlier to return the probabilities for 0 and 1 and the predicted class.

In [None]:
import joblib
import sys
import os
import pandas as pd
import numpy as np

session.clear_imports()
session.clear_packages()
@F.udtf(name="predict_survive_udtf", is_permanent=False, replace=True, packages=['typing', 'pandas', 'numpy', 'joblib', 'scikit-learn==1.1.3'], imports = [f"@{udf_stage_name}/rc_pipeline.joblib"]
        , output_schema=T.StructType([T.StructField("prob_0", T.FloatType()), T.StructField("prob_1", T.FloatType()), T.StructField("prediction", T.StringType())]), session=session)
class predict_survive_handler:
    # We load the model from stage at the start of each partition
    def __init__(self) -> None:
        import_dir = sys._xoptions.get("snowflake_import_directory")
        with open(os.path.join(import_dir, 'rc_pipeline.joblib'), 'rb') as file:
            self.model = joblib.load(file)
        self.classes = self.model.classes_
        
    # Score each input row
    def process(self, embarked: str, sex: str, pclass: str, age: float, fare: float) -> Iterable[Tuple[float, float, str]]:
        # Create a Pandas DataFrame of the input values
        pd_df = pd.DataFrame([[embarked, sex, pclass, age, fare]], columns=["EMBARKED","SEX", "PCLASS", "AGE", "FARE"])
        
        # Get the probabilities for 0/1
        prediction_proba = self.model.predict_proba(pd_df)[0]
        
        # Get the label for the highest probability
        predicted_class_idx = np.argmax(prediction_proba)
        predicted_class = self.classes[predicted_class_idx]
        
        # Create a list with return values
        return_list = prediction_proba.tolist()
        # The prediction column is a StringType, so convert the (NumPy) label to str
        return_list.append(str(predicted_class))
        
        # Return the list as a tuple
        yield tuple(return_list)
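The row-building logic in **process** can be checked without Snowflake or NumPy. build_score_row is a hypothetical helper that assumes two classes, as in the Titanic model; like np.argmax, Python's max with a key returns the first index when probabilities tie.

```python
from typing import Sequence, Tuple

def build_score_row(probabilities: Sequence[float], classes: Sequence[int]) -> Tuple[float, float, str]:
    """Hypothetical helper mirroring the process() body: (prob_0, prob_1, prediction)."""
    # Index of the highest probability (first index wins on ties, as with np.argmax)
    best_idx = max(range(len(probabilities)), key=lambda i: probabilities[i])
    predicted_class = classes[best_idx]
    # The prediction column is declared as StringType, so convert the label explicitly
    return (float(probabilities[0]), float(probabilities[1]), str(predicted_class))

row = build_score_row([0.2, 0.8], [0, 1])
```

The tuple order must follow the output_schema: prob_0, prob_1, prediction.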


In [None]:
train_df = train_df.with_column("PCLASS", F.to_varchar(F.col("PCLASS")))
train_df.select(*input_cols, F.call_table_function("predict_survive_udtf", *input_cols)).show()