# UDF/UDTF Examples

This notebook contains examples of how to create UDFs and UDTFs using the Snowpark API. Much of this notebook comes from Mats Stellwall's repo at https://github.com/mstellwa/snowpark_examples. Other places for help include the Snowflake documentation at https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-batch and the Snowflake API documentation at https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/1.8.0/api/snowflake.snowpark.udf.UDFRegistration.


There are three different types of functions in this notebook:
* UDF (Scalar User Defined Function)
    * A scalar function that returns one output row for each input row.
    * The returned row consists of a single column/value.
* Vectorized UDF
    * Uses the Python UDF batch API, which lets a UDF receive batches of input rows as Pandas DataFrames and return batches of results as Pandas arrays or Series.
* UDTF (User Defined Table Function)
    * A tabular function, also called a table function, returns zero, one, or multiple rows for each input row.
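
As a rough, plain-Python analogy of the three shapes listed above (no Snowflake session involved; the function names `scalar_hello`, `batch_hello` and `table_words` are made up for this sketch), the difference is mostly in how many rows go in and come out per call:

```python
from collections import Counter
from typing import Iterable, List, Tuple


# Scalar UDF shape: called once per row, returns exactly one value.
def scalar_hello(name: str) -> str:
    return f"Hello {name}!"


# Vectorized (batch) UDF shape: called once per batch of rows,
# returns one value for every row in the batch.
def batch_hello(names: List[str]) -> List[str]:
    n = len(names)
    return [f"Hello {name}, batch of {n}" for name in names]


# UDTF shape: called per row, may yield zero, one, or many output rows.
def table_words(text: str) -> Iterable[Tuple[str, int]]:
    yield from Counter(text.split()).items()


rows = ["Mats", "Pia"]
print([scalar_hello(r) for r in rows])
print(batch_hello(rows))
print(list(table_words("w1 w2 w2")))
```

The real versions further down use Snowpark decorators and Pandas types, but the input/output shapes are the same.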

## 1. Setup Environment 
I have not put all the imports for the notebook in this section, so it's easier to see what is needed for the UDF/UDTF examples.

In [1]:
# Make sure we do not get line breaks when doing show on wide dataframes
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"))

# Snowpark imports 
import snowflake.snowpark as S
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
from snowflake.snowpark import Window

# Used for reading creds.json
import json

# Print the version of Snowpark we are using
print(f"Using Snowpark: {S.__version__}")

Using Snowpark: 1.11.1


In [2]:
with open('../creds.json') as f:
    connection_parameters = json.load(f)

session = Session.builder.configs(connection_parameters).create()
print("Current role: " + session.get_current_role() + ", Current schema: " + session.get_fully_qualified_current_schema() + ", Current WH: " + session.get_current_warehouse())

Current role: "RAJIV", Current schema: "RAJIV"."PUBLIC", Current WH: "RAJIV"
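
The `creds.json` file itself is not shown in this notebook. A minimal sketch of what it might contain, assuming password authentication, is below. The placeholder values and the helper `missing_connection_keys` are hypothetical; the key names follow the usual Snowpark connection parameters, and other authentication methods need different keys.

```python
import json

# Assumed minimum set of keys for password authentication; adjust as needed.
REQUIRED_KEYS = ("account", "user", "password")


def missing_connection_keys(params: dict) -> list:
    """Return the required keys that are absent or empty in params."""
    return [key for key in REQUIRED_KEYS if not params.get(key)]


# Placeholder values only; never commit real credentials.
example_creds = json.loads("""
{
  "account": "<your_account_identifier>",
  "user": "<your_user>",
  "password": "<your_password>",
  "role": "<your_role>",
  "warehouse": "<your_warehouse>",
  "database": "<your_database>",
  "schema": "<your_schema>"
}
""")

print(missing_connection_keys(example_creds))
print(missing_connection_keys({"account": "x"}))
```

Running a check like this before `Session.builder.configs(...)` gives a clearer message than a failed login when a key is simply missing.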


## 2. UDF - Hello World Example
Start by creating a UDF that returns a string. Setting *is_permanent=False* makes the UDF temporary: it is only available to our user, and only until the active Snowflake session is closed. The function is called once for each input row, i.e. it does not use the batch API. Calling **session.clear_imports()** and **session.clear_packages()** first makes sure that old imports and packages are not included when the UDF is created.

A UDF can be created using the **@udf** decorator, the **udf** function, or the **udf.register** method of the session object. It can be permanent or temporary.

In [3]:
session.clear_imports()
session.clear_packages()
@F.udf(name="hello_udf", is_permanent=False, replace=True, session=session)
def hello_udf(name: str) -> str:
    return f'Hello {name}!'

Create a Snowpark DataFrame

In [4]:
test_name_df = session.create_dataframe([['Mats'], ['Pia']], schema=["name"])
test_name_df.show()

----------
|"NAME"  |
----------
|Mats    |
|Pia     |
----------



Test the function on the Snowpark DataFrame. Yes, your UDF is running inside Snowflake!

In [5]:
test_name_df.select(F.call_function("hello_udf", F.col("name"))).show()

-------------------------
|"HELLO_UDF(""NAME"")"  |
-------------------------
|Hello Pia!             |
|Hello Mats!            |
-------------------------



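If a NULL value is passed to a UDF, it is converted to *None* in the Python function. As the example below shows, `hello_udf` then returns the string `Hello None!` rather than NULL, so handle *None* explicitly if NULLs should stay NULL.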

In [162]:
test_name_with_null_df = session.create_dataframe([['Mats'],[None], ['Pia']], schema=["name"])
test_name_with_null_df.show()

----------
|"NAME"  |
----------
|Mats    |
|NULL    |
|Pia     |
----------



In [163]:
test_name_with_null_df.select(F.call_function("hello_udf", F.col("name"))).show()

-------------------------
|"HELLO_UDF(""NAME"")"  |
-------------------------
|Hello Mats!            |
|Hello None!            |
|Hello Pia!             |
-------------------------
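
If `Hello None!` is not what you want, one option is to check for *None* inside the function and return *None*, which a Python UDF hands back to SQL as NULL. The sketch below is plain Python so it can be tried locally; the name `hello_null_safe` is hypothetical, and the same body could be registered with the decorator used for `hello_udf`.

```python
from typing import Optional


def hello_null_safe(name: Optional[str]) -> Optional[str]:
    # Pass NULL through instead of formatting the string "None"
    if name is None:
        return None
    return f"Hello {name}!"


for value in ["Mats", None, "Pia"]:
    print(repr(hello_null_safe(value)))
```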



### Batch Example of Hello World

Create the same function again using the Python UDF batch API. This is done by changing the parameter type to **PandasDataFrame** or **PandasSeries** and the return type to **PandasSeries**. The benefit of the batch API is that the function is not called once per input row, but once per batch of rows.

In [6]:
session.clear_imports()
session.clear_packages()
@F.udf(name="hello_batch_udf", is_permanent=False, replace=True, session=session)
def hello_batch_udf(ds: T.PandasSeries[str]) -> T.PandasSeries[str]:
    n = len(ds)
    return ds.apply(lambda x: f'Hello {x}, we got {n} rows')

In [165]:
test_name_df.select(F.call_function("hello_batch_udf", F.col("name"))).show()

-------------------------------
|"HELLO_BATCH_UDF(""NAME"")"  |
-------------------------------
|Hello Mats, we got 1 rows    |
|Hello Pia, we got 1 rows     |
-------------------------------



Use a larger dataset for testing.

In [166]:
customers_df = session.table("SFSALESSHARED_SFC_SAMPLES_PROD3_SAMPLE_DATA.tpcds_sf100tcl.customer")  # Name of Snowflake Sample Database might be different...
print(f"Nbr of customers: {customers_df.count():,}")
customers_df.show(5)

Nbr of customers: 100,000,000
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"C_CUSTOMER_SK"  |"C_CUSTOMER_ID"   |"C_CURRENT_CDEMO_SK"  |"C_CURRENT_HDEMO_SK"  |"C_CURRENT_ADDR_SK"  |"C_FIRST_SHIPTO_DATE_SK"  |"C_FIRST_SALES_DATE_SK"  |"C_SALUTATION"  |"C_FIRST_NAME"  |"C_LAST_NAME"  |"C_PREFERRED_CUST_FLAG"  |"C_BIRTH_DAY"  |"C_BIRTH_MONTH"  |"C_BIRTH_YEAR"  |"C_BIRTH_COUNTRY"  |"C_LOGIN"  |"C_EMAIL_ADDRESS"                |"C_LAST_REVIEW_DATE"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

If we test this using **show**, the function only receives 15 rows, since that is the limit we pass to **show**.

In [167]:
customers_df.select(F.col("C_FIRST_NAME")).select(F.call_function("hello_batch_udf", F.col("C_FIRST_NAME"))).show(15)

---------------------------------------
|"HELLO_BATCH_UDF(""C_FIRST_NAME"")"  |
---------------------------------------
|Hello Brian, we got 15 rows          |
|Hello Margurite, we got 15 rows      |
|Hello Keith, we got 15 rows          |
|Hello Jessica, we got 15 rows        |
|Hello Susan, we got 15 rows          |
|Hello Orpha, we got 15 rows          |
|Hello Matthew, we got 15 rows        |
|Hello Jared, we got 15 rows          |
|Hello Brenda, we got 15 rows         |
|Hello William, we got 15 rows        |
|Hello Dennis, we got 15 rows         |
|Hello Donna, we got 15 rows          |
|Hello Kevin, we got 15 rows          |
|Hello Adam, we got 15 rows           |
|Hello John, we got 15 rows           |
---------------------------------------



By using **cache_result** we can temporarily store the result of the query generated by the DataFrame, so the UDF runs over the whole table, and then see that each call to the function receives more rows. You can see we can run 100M rows in less than 1 minute!

In [168]:
batch_udf_df = customers_df.select(F.call_function("hello_batch_udf", F.col("C_FIRST_NAME"))).cache_result()
batch_udf_df.show(15)

---------------------------------------
|"HELLO_BATCH_UDF(""C_FIRST_NAME"")"  |
---------------------------------------
|Hello <NA>, we got 256 rows          |
|Hello Joseph, we got 256 rows        |
|Hello Gregory, we got 256 rows       |
|Hello Melvin, we got 256 rows        |
|Hello Joe, we got 256 rows           |
|Hello Thomas, we got 256 rows        |
|Hello Pearl, we got 256 rows         |
|Hello Ellen, we got 256 rows         |
|Hello Trenton, we got 256 rows       |
|Hello Cynthia, we got 256 rows       |
|Hello Irene, we got 256 rows         |
|Hello Craig, we got 256 rows         |
|Hello Vincent, we got 256 rows       |
|Hello Alan, we got 256 rows          |
|Hello Tony, we got 256 rows          |
---------------------------------------
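
The `we got N rows` text depends entirely on how the rows are split into batches, which Snowflake decides. A minimal local simulation, assuming a fixed batch size (the `run_in_batches` driver and its `batch_size` argument are hypothetical stand-ins for that behaviour), shows why the same function reports different counts:

```python
from typing import Callable, List


def hello_batch(names: List[str]) -> List[str]:
    # Same logic as hello_batch_udf, on a plain list instead of a Series
    n = len(names)
    return [f"Hello {x}, we got {n} rows" for x in names]


def run_in_batches(rows: List[str],
                   batch_fn: Callable[[List[str]], List[str]],
                   batch_size: int) -> List[str]:
    """Call batch_fn once per chunk of at most batch_size rows."""
    out: List[str] = []
    for start in range(0, len(rows), batch_size):
        out.extend(batch_fn(rows[start:start + batch_size]))
    return out


names = [f"name{i}" for i in range(5)]
result = run_in_batches(names, hello_batch, batch_size=2)
for line in result:
    print(line)
```

The last row lands in a batch of its own here, so it reports 1 row while the others report 2.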



## 3. Create a UDF that uses a saved Python object

Create a UDF that uses a saved Python object, in this case a fitted scikit-learn pipeline. **I am using native scikit-learn to show how to use any sort of Python package.**  
For many packages, Snowpark has a distributed version that is much easier to use.  Those versions also allow for easy use of the model registry and predict function.  
The approach here is for Python packages that are not optimized for Snowpark. 

### Build the pipeline

In [9]:
import numpy as np
import pandas as pd
import random
import string

from sklearn.datasets import make_regression
from sklearn.preprocessing import MinMaxScaler, OrdinalEncoder
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor
from sklearn.compose import ColumnTransformer



NUMERICAL_COLS = ["X1", "X2", "X3"]
CATEGORICAL_COLS = ["C1", "C2", "C3"]
FEATURE_COLS = NUMERICAL_COLS + CATEGORICAL_COLS

# Create a dataset with numerical and categorical features
X, y = make_regression(
    n_samples=1000,
    n_features=3,
    noise=0.1,
    random_state=0,
)

def generate_random_string(length):
    return "".join(random.choices(string.ascii_uppercase, k=length))

num_categorical_cols, categorical_feature_length = 3, 2
categorical_features = []
for _ in range(num_categorical_cols):
    categorical_column = [generate_random_string(categorical_feature_length) for _ in range(X.shape[0])]
    categorical_features.append(categorical_column)
X = np.column_stack((X, *categorical_features))
X = pd.DataFrame(X, columns=FEATURE_COLS)
X[NUMERICAL_COLS] = X[NUMERICAL_COLS].astype(float)  # column_stack with strings turns everything into strings; cast the numeric columns back to float

preprocessor = ColumnTransformer(
    transformers=[
        ('num', MinMaxScaler(), NUMERICAL_COLS),
        ('cat', OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=99999), CATEGORICAL_COLS)
    ]
)

# Create the pipeline with the ColumnTransformer and a random forest regressor
pipeline = Pipeline(steps=[('preprocessor', preprocessor),
                       ('regressor', RandomForestRegressor())])

rc_pipeline = pipeline.fit(X,y)

Keep in mind which methods the object you are saving exposes. Models usually have a predict method, which we will want to call later.
Save the fitted pipeline as a local file using joblib.

In [339]:
import joblib

joblib.dump(rc_pipeline, "rc_pipeline.joblib")

['rc_pipeline.joblib']

Upload the file to the Snowflake stage

In [10]:
udf_stage_name = "RAJ_MODELS"  # Put your stage name here
session.file.put("rc_pipeline.joblib", f"@{udf_stage_name}", auto_compress=False, overwrite=True)

[PutResult(source='rc_pipeline.joblib', target='rc_pipeline.joblib', source_size=9161831, target_size=9161840, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

Verify the object is in the stage

In [438]:
session.sql(f"ls @{udf_stage_name}").show()

--------------------------------------------------------------------------------------------------------------
|"name"                         |"size"   |"md5"                             |"last_modified"                |
--------------------------------------------------------------------------------------------------------------
|raj_models/model.joblib.gz     |13216    |abe8081ad8ca6dea523caac2518f7c11  |Tue, 6 Feb 2024 02:39:52 GMT   |
|raj_models/rc_pipeline.joblib  |9161840  |57c818aac2f62e96685e603d2a880d70  |Sun, 11 Feb 2024 03:19:51 GMT  |
--------------------------------------------------------------------------------------------------------------



### Creating the UDF

Create a function to load the file using joblib. Use cachetools so the read from the stage is only done once.

In [12]:

import cachetools

@cachetools.cached(cache={})
def load_joblib_file(filename):
    import joblib
    import sys
    import os

    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m
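
To see what the caching buys, here is a small local sketch that swaps in the standard library's `functools.lru_cache` for `cachetools.cached` and a dummy loader for `joblib.load` (both substitutions are only so it runs anywhere; `load_resource` and `load_calls` are hypothetical names). The expensive load runs once and later calls reuse the same object:

```python
from functools import lru_cache

load_calls = []  # records every real (uncached) load, for demonstration only


@lru_cache(maxsize=None)
def load_resource(filename: str) -> dict:
    # Stand-in for an expensive read such as joblib.load on a staged file
    load_calls.append(filename)
    return {"loaded_from": filename}


first = load_resource("rc_pipeline.joblib")
second = load_resource("rc_pipeline.joblib")

# Same object both times, and the loader body ran only once
print(first is second, len(load_calls))
```

Inside a UDF this matters because the function body runs many times, while the model file only needs to be deserialized once.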


Create the UDF. It is important that the *imports* parameter refers to the stage and file. Also, only the filename is needed for the *load_joblib_file* function.

Since the function depends on **Pandas**, **scikit-learn** and **cachetools**, we need to add them to the *packages* parameter.

We will also make sure the scikit-learn version used by the UDF matches the local one.

In [13]:
from sklearn import __version__ as sk_version
sk_version

'1.3.0'

In [14]:
from pandas import __version__ as pd_version
pd_version

'1.5.3'

If you get an error here like `Cannot add package scikit-learn==1.3.2 because it is not available in Snowflake`, welcome to production data science, where version numbers matter. Go ahead and install a supported version of scikit-learn in your local environment. 
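
The *packages* argument is just a list of strings, each either `name` or `name==version`. A tiny helper along these lines (the name `build_package_specs` is hypothetical, and the versions are the ones printed above) keeps the pinned and unpinned entries in one place:

```python
from typing import Dict, List, Optional


def build_package_specs(versions: Dict[str, Optional[str]]) -> List[str]:
    """Turn {"name": "1.2.3"} into "name==1.2.3"; None means leave unpinned."""
    return [f"{name}=={ver}" if ver else name for name, ver in versions.items()]


specs = build_package_specs({
    "pandas": "1.5.3",
    "scikit-learn": "1.3.0",
    "cachetools": None,  # let Snowflake pick an available version
})
print(specs)
```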

In [27]:
session.clear_imports()
session.clear_packages()
@F.udf(name = "predict_model_udf", is_permanent = False, imports = [f"@{udf_stage_name}/rc_pipeline.joblib"]  
       , packages = [f'pandas=={pd_version}', f'scikit-learn=={sk_version}', 'cachetools'], replace = True, session = session)
def predict_survive(pd_df: T.PandasDataFrame[float, float, float, str, str, str]) -> T.PandasSeries[int]:
    
    pd_df.columns = FEATURE_COLS  # assign the names directly; wrapping them in another list creates a one-level MultiIndex
    model = load_joblib_file('rc_pipeline.joblib') # Only call with the file name!

    return model.predict(pd_df)    

Test that the UDF works.  

In [23]:
features_df = session.create_dataframe(X)
features_df.show()

-------------------------------------------------------------------------------------------
|"X1"                 |"X2"                   |"X3"                  |"C1"  |"C2"  |"C3"  |
-------------------------------------------------------------------------------------------
|-1.1802856063511906  |1.012168295174788      |-0.28044778146745414  |HY    |OH    |XT    |
|0.7335948682293696   |2.0118642631265615     |0.3030110504561574    |NN    |CB    |LR    |
|-0.5139029502799155  |-0.7688491596748099    |0.9882405737426969    |NV    |BG    |DV    |
|1.037585667050634    |0.018791791774257802   |1.3925184494342724    |PB    |WM    |EM    |
|-0.3550287310553741  |-1.8923618933173414    |-0.3004787855854223   |RZ    |GR    |FV    |
|1.183119557331707    |0.7188971655282916     |1.4969109935208271    |QX    |LH    |AA    |
|0.2390336012467649   |-1.0003303489537054    |-0.04932407014757259  |OS    |YG    |KK    |
|0.46637957438197886  |-0.09439250641118496   |-0.47728627040322935  |NO    |VE 

Now you are using the UDF on your Snowflake data!!!

In [28]:
input_cols = [F.col(col) for col in FEATURE_COLS ]
features_df.with_column("PREDICTION", F.call_function("predict_model_udf",  *input_cols)).show()

----------------------------------------------------------------------------------------------------------
|"X1"                 |"X2"                   |"X3"                  |"C1"  |"C2"  |"C3"  |"PREDICTION"  |
----------------------------------------------------------------------------------------------------------
|-1.1802856063511906  |1.012168295174788      |-0.28044778146745414  |HY    |OH    |XT    |-15           |
|0.7335948682293696   |2.0118642631265615     |0.3030110504561574    |NN    |CB    |LR    |257           |
|-0.5139029502799155  |-0.7688491596748099    |0.9882405737426969    |NV    |BG    |DV    |-54           |
|1.037585667050634    |0.018791791774257802   |1.3925184494342724    |PB    |WM    |EM    |168           |
|-0.3550287310553741  |-1.8923618933173414    |-0.3004787855854223   |RZ    |GR    |FV    |-222          |
|1.183119557331707    |0.7188971655282916     |1.4969109935208271    |QX    |LH    |AA    |237           |
|0.2390336012467649   |-1.00033034895

### Batch UDF
A batch UDF can also be called by providing the inputs as an array, for example:

In [29]:
session.clear_imports()
session.clear_packages()

@F.udf(name = "predict_model_array_udf", is_permanent = False, imports = [f"@{udf_stage_name}/rc_pipeline.joblib"]
       , packages = [f'pandas=={pd_version}', f'scikit-learn=={sk_version}', 'cachetools'], replace = True, session = session)
def predict_survive_array(pd_s: T.PandasSeries[list]) -> T.PandasSeries[int]:
    pd_df = pd.DataFrame.from_dict(dict(zip(pd_s.index, pd_s.values))).T
    pd_df.columns = FEATURE_COLS
    model = load_joblib_file('rc_pipeline.joblib') # Only call with the file name!

    return model.predict(pd_df)


In [30]:
input_cols = [F.col(col) for col in FEATURE_COLS ]
features_df.with_column("PREDICTION", F.call_function("predict_model_array_udf", F.array_construct(*input_cols))).show()

----------------------------------------------------------------------------------------------------------
|"X1"                 |"X2"                   |"X3"                  |"C1"  |"C2"  |"C3"  |"PREDICTION"  |
----------------------------------------------------------------------------------------------------------
|-1.1802856063511906  |1.012168295174788      |-0.28044778146745414  |HY    |OH    |XT    |-15           |
|0.7335948682293696   |2.0118642631265615     |0.3030110504561574    |NN    |CB    |LR    |257           |
|-0.5139029502799155  |-0.7688491596748099    |0.9882405737426969    |NV    |BG    |DV    |-54           |
|1.037585667050634    |0.018791791774257802   |1.3925184494342724    |PB    |WM    |EM    |168           |
|-0.3550287310553741  |-1.8923618933173414    |-0.3004787855854223   |RZ    |GR    |FV    |-222          |
|1.183119557331707    |0.7188971655282916     |1.4969109935208271    |QX    |LH    |AA    |237           |
|0.2390336012467649   |-1.00033034895
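
With the array variant, each row arrives as one list of six values, and the UDF has to turn that batch of lists back into named columns. A plain-Python sketch of that reshaping step (the helper `arrays_to_columns` is hypothetical; the UDF above does the equivalent with a Pandas transpose):

```python
from typing import Any, Dict, List

FEATURE_COLS = ["X1", "X2", "X3", "C1", "C2", "C3"]


def arrays_to_columns(rows: List[List[Any]], names: List[str]) -> Dict[str, List[Any]]:
    """Transpose a batch of row arrays into {column_name: [values, ...]}."""
    for row in rows:
        if len(row) != len(names):
            raise ValueError(f"expected {len(names)} values, got {len(row)}")
    return {name: [row[i] for row in rows] for i, name in enumerate(names)}


batch = [
    [-1.18, 1.01, -0.28, "HY", "OH", "XT"],
    [0.73, 2.01, 0.30, "NN", "CB", "LR"],
]
columns = arrays_to_columns(batch, FEATURE_COLS)
print(columns["X1"], columns["C3"])
```

The position of each value in the array is the only link to its column name, so the order passed to `array_construct` has to match `FEATURE_COLS`.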

### Dict as an input into the UDF

In [31]:
session.clear_imports()
session.clear_packages()

@F.udf(name = "predict_model_dict_udf", is_permanent = False, imports = [f"@{udf_stage_name}/rc_pipeline.joblib"]
       , packages = [f'pandas=={pd_version}', f'scikit-learn=={sk_version}', 'cachetools'], replace = True, session = session)
def predict_survive_dict(pd_s: T.PandasSeries[dict]) -> T.PandasSeries[int]:
    pd_df = pd.json_normalize(pd_s)[['X1', 'X2', 'X3', 'C1', 'C2', 'C3']]
    model = load_joblib_file('rc_pipeline.joblib') # Only call with the file name!

    return model.predict(pd_df)

In [32]:
features_df.with_column("PREDICTION", F.call_function("predict_model_dict_udf", F.object_construct('*'))).show()

----------------------------------------------------------------------------------------------------------
|"X1"                 |"X2"                   |"X3"                  |"C1"  |"C2"  |"C3"  |"PREDICTION"  |
----------------------------------------------------------------------------------------------------------
|-1.1802856063511906  |1.012168295174788      |-0.28044778146745414  |HY    |OH    |XT    |-15           |
|0.7335948682293696   |2.0118642631265615     |0.3030110504561574    |NN    |CB    |LR    |257           |
|-0.5139029502799155  |-0.7688491596748099    |0.9882405737426969    |NV    |BG    |DV    |-54           |
|1.037585667050634    |0.018791791774257802   |1.3925184494342724    |PB    |WM    |EM    |168           |
|-0.3550287310553741  |-1.8923618933173414    |-0.3004787855854223   |RZ    |GR    |FV    |-222          |
|1.183119557331707    |0.7188971655282916     |1.4969109935208271    |QX    |LH    |AA    |237           |
|0.2390336012467649   |-1.00033034895
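
With the dict variant, each row arrives keyed by column name, so the UDF selects the features by name rather than by position. The sketch below shows that step in plain Python (the helper `dicts_to_feature_rows` is hypothetical). It also assumes, to my understanding of `OBJECT_CONSTRUCT`, that pairs with a NULL value are left out of the object, so a missing key is treated as *None*:

```python
from typing import Any, Dict, List

FEATURE_COLS = ["X1", "X2", "X3", "C1", "C2", "C3"]


def dicts_to_feature_rows(records: List[Dict[str, Any]],
                          feature_cols: List[str]) -> List[List[Any]]:
    # dict.get returns None for a key that is absent from the object
    return [[rec.get(col) for col in feature_cols] for rec in records]


records = [
    # Key order differs from FEATURE_COLS on purpose
    {"C1": "HY", "C2": "OH", "C3": "XT", "X1": -1.18, "X2": 1.01, "X3": -0.28},
    # C2 is missing, as it would be if its value had been NULL
    {"C1": "NN", "C3": "LR", "X1": 0.73, "X2": 2.01, "X3": 0.30},
]
feature_rows = dicts_to_feature_rows(records, FEATURE_COLS)
for row in feature_rows:
    print(row)
```

Selecting by name makes the UDF independent of key order, at the cost of building an object per row.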

It is possible to return multiple values from a UDF by returning a list or dict. The code below doesn't work for my model, since I built a regressor, which has no predict_proba. But by defining the output as a list, you are able to return multiple values.

In [33]:
# Doesn't work -- my model doesn't support predict_proba
session.clear_imports()
session.clear_packages()

@F.udf(name = "predict_model_array_return_udf", is_permanent = False, imports = [f"@{udf_stage_name}/rc_pipeline.joblib"]
       , packages = [f'pandas=={pd_version}', f'scikit-learn=={sk_version}', 'cachetools'], replace = True, session = session)
def predict_survive_array_return(pd_df: T.PandasDataFrame[float, float, float, str, str, str]) -> T.PandasSeries[list]:
    
    pd_df.columns = FEATURE_COLS
    model = load_joblib_file('rc_pipeline.joblib') # Only call with the file name!
    prediction_proba = model.predict_proba(pd_df)
        
    # Get the label for the highest probability
    predicted_classes_idx = np.argmax(prediction_proba, axis=1)
    classes = model.classes_
    predicted_classes = classes[predicted_classes_idx]

    # Create a list with return values
    return_array = np.column_stack((prediction_proba, predicted_classes))

    return return_array

# Doesn't work -- my model doesn't support predict_proba
#features_df.with_column("RETURN_ARRAY", F.call_function("predict_model_array_return_udf", *input_cols)).show()

## 4. Vectorized UDFs
Vectorized UDFs and standard UDFs are both parallelized the exact same way. The difference between them is that a vectorized UDF receives batches of rows as Pandas DataFrames or Series (backed by NumPy arrays) instead of one row at a time, which can be much more efficient when your UDF uses NumPy-based functions for computation (which is the case with most ML libraries).
The UDF is then run on batches, and each batch must finish within 180 seconds.  

To use a vectorized UDF, you just need to set the proper input and return types (**PandasDataFrame** or **PandasSeries**). Snowpark will then automatically use the batch API. The optional *max_batch_size* parameter caps the number of rows per batch. 

In [34]:
session.clear_imports()
session.clear_packages()
@F.udf(name = "predict_survive_udf", is_permanent = False, imports = [f"@{udf_stage_name}/rc_pipeline.joblib"], max_batch_size = 100
       , packages = [f'pandas=={pd_version}', f'scikit-learn=={sk_version}', 'cachetools'], replace = True, session = session)
def predict_survive(pd_df: T.PandasDataFrame[float, float, float, str, str, str]) -> T.PandasSeries[int]:
    
    pd_df.columns = FEATURE_COLS
    model = load_joblib_file('rc_pipeline.joblib') # Only call with the file name!

    return model.predict(pd_df)    

In [35]:
input_cols = [F.col(col) for col in FEATURE_COLS ]
features_df.with_column("PREDICTION", F.call_function("predict_survive_udf",  *input_cols)).show()

----------------------------------------------------------------------------------------------------------
|"X1"                 |"X2"                   |"X3"                  |"C1"  |"C2"  |"C3"  |"PREDICTION"  |
----------------------------------------------------------------------------------------------------------
|-1.1802856063511906  |1.012168295174788      |-0.28044778146745414  |HY    |OH    |XT    |-15           |
|0.7335948682293696   |2.0118642631265615     |0.3030110504561574    |NN    |CB    |LR    |257           |
|-0.5139029502799155  |-0.7688491596748099    |0.9882405737426969    |NV    |BG    |DV    |-54           |
|1.037585667050634    |0.018791791774257802   |1.3925184494342724    |PB    |WM    |EM    |168           |
|-0.3550287310553741  |-1.8923618933173414    |-0.3004787855854223   |RZ    |GR    |FV    |-222          |
|1.183119557331707    |0.7188971655282916     |1.4969109935208271    |QX    |LH    |AA    |237           |
|0.2390336012467649   |-1.00033034895

## 5. UDTF

A User Defined Table Function (UDTF) is a function that returns zero, one, or multiple rows for each input row.

When creating a UDTF, a Python class has to be used as the handler.

A UDTF handler class implements the following, which Snowflake invokes at run time:
* An **__init__** method. Optional. Invoked to initialize stateful processing of input partitions.
* A **process** method. Required. Invoked for each input row. The method returns a tabular value as tuples.
* An **end_partition** method. Optional. Invoked to finalize processing of input partitions.

A UDTF can be created using the **@udtf** decorator, the **udtf** function, or the **udtf.register** method of the session object. It can be permanent or temporary.
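
To make the call order of the handler methods concrete, here is a rough local simulation in plain Python. The driver `simulate_udtf` and the toy handler `RunningTotal` are hypothetical and only mimic the lifecycle described above: one handler instance per partition, **process** for each row, then **end_partition** once.

```python
from collections import defaultdict
from typing import Any, Iterable, List, Tuple


class RunningTotal:
    """Toy handler: echoes each value, then emits the partition sum."""

    def __init__(self) -> None:
        self.total = 0

    def process(self, value: int) -> Iterable[Tuple[str, int]]:
        self.total += value
        yield ("row", value)

    def end_partition(self) -> Iterable[Tuple[str, int]]:
        yield ("partition_total", self.total)


def simulate_udtf(handler_cls, rows: List[Tuple[Any, int]]) -> List[tuple]:
    """rows are (partition_key, value) pairs; output rows are prefixed with the key."""
    partitions = defaultdict(list)
    for key, value in rows:
        partitions[key].append(value)

    out = []
    for key, values in partitions.items():
        handler = handler_cls()  # a fresh instance for every partition
        for value in values:
            for row in handler.process(value):
                out.append((key, *row))
        if hasattr(handler, "end_partition"):  # end_partition is optional
            for row in handler.end_partition():
                out.append((key, *row))
    return out


result = simulate_udtf(RunningTotal, [("a", 1), ("b", 10), ("a", 2)])
for row in result:
    print(row)
```

State kept on `self` lives for exactly one partition, which is why totals do not leak between partitions `a` and `b`.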



### Python model as UDTF
Take our model and move it to a UDTF. The key here is setting the input and output types. For a UDTF the output is typically multiple rows, so a DataFrame is used here. This is getting to be more code, so I split the handler class for the UDTF from the registration of the UDTF. This allows me to check that the code runs correctly locally before I try to register the UDTF inside Snowflake.

In [42]:
from snowflake.snowpark.functions import udtf, lit
from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType
session.clear_imports()
session.clear_packages()

class predict_model:
    # We load the model from stage at the start of each partition
    def __init__(self) -> None:
        import os
        import joblib
        import sys
        import pandas as pd
        #self.model = joblib.load('rc_pipeline.joblib')
        import_dir = sys._xoptions.get("snowflake_import_directory")
        with open(os.path.join(import_dir, 'rc_pipeline.joblib'), 'rb') as file:
            self.model = joblib.load(file)
        
    # Called after the last row in a partition has been processed; receives the whole partition as a Pandas DataFrame
    def end_partition(self, pd_df):
        # Set the column names of the provided Pandas DataFrame
        pd_df.columns=["X1","X2","X3","C1","C2","C3"]
        results = self.model.predict(pd_df) # One prediction per input row
        print (results)
        df_predictions = pd.DataFrame(results)
        print (df_predictions)
        yield df_predictions

predict_udtf = session.udtf.register(predict_model,is_permanent=False, replace=True, imports = [f"@{udf_stage_name}/rc_pipeline.joblib"]
        , input_types=[T.PandasDataFrameType([T.FloatType(), T.FloatType(),T.FloatType(), T.StringType(), T.StringType(), T.StringType()])]
        , output_schema=PandasDataFrameType([FloatType()], ["PREDICTIONS"])
        , packages = [f'pandas=={pd_version}', f'scikit-learn=={sk_version}', 'cachetools'])

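Run the handler class locally like this. There is no Snowflake import directory on your machine, so temporarily swap in the commented-out `joblib.load` line in `__init__` first. Since `end_partition` is a generator, use `next` to get the DataFrame:
```
predictor = predict_model()
final = next(predictor.end_partition(X))
final
```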

In [43]:
results = features_df.select(predict_udtf("X1","X2","X3","C1","C2","C3").over(partition_by=["X1"]))
results.show()

-----------------------
|"PREDICTIONS"        |
-----------------------
|53.91033785203392    |
|137.7003877591698    |
|-195.21377894882605  |
|-4.268479029560137   |
|2.2400981635396415   |
|-160.1945418373434   |
|103.51438829189482   |
|-109.63587457610556  |
|-152.69719097490136  |
|-6.194146150918908   |
-----------------------



### Word Count UDTF

Start with a simple UDTF that splits a string into words and, for each unique word, returns a row with the word and its number of occurrences in the string. We need to provide the output schema, i.e. the columns of the returned rows. If only names are provided, the data types are inferred from the type hints of the process method.

In [52]:
from collections import Counter
from typing import Iterable, Tuple

session.clear_imports()
session.clear_packages()

@F.udtf(name="word_count_udtf", output_schema=["word", "count"], is_permanent=False, replace=True, session=session)
class MyWordCount:
    # Called once for each partition
    def __init__(self):
        self._total_per_partition = 0
    
    # Called for each row in a partition
    def process(self, s1: str) -> Iterable[Tuple[str, int]]:
        words = s1.split()
        # Accumulate so the total covers every row in the partition
        self._total_per_partition += len(words)
        # Counter returns a dict with the unique words as keys and the number of occurrences as the values
        counter = Counter(words) 
        yield from counter.items()
    
    # Called after the last row in a partition has been processed
    def end_partition(self):
        yield ("partition_total", self._total_per_partition)

Test the UDTF. By using session.table_function we get a new DataFrame with the data generated by the UDTF.

In [53]:
df_udtf = session.table_function("word_count_udtf", F.lit("w1 w2 w2 w3 w3 w3"))
df_udtf.show()

-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------



We can also use it with a DataFrame, using call_table_function.

In [48]:
df_udtf_data = session.create_dataframe([["w1 w2 w2 w3 w3 w3"]], schema=["text"])
df_udtf_data.show()

---------------------
|"TEXT"             |
---------------------
|w1 w2 w2 w3 w3 w3  |
---------------------



In [49]:
df_udtf_data.select(F.call_table_function("word_count_udtf", F.col("TEXT"))).show()

-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------



If we want to do the split/count by a column, the partition_by parameter can be used.

In [50]:
df_udtf_part_data = session.create_dataframe([["1", "w1 w2 w2 w3 w3 w3"], ["2", "w4 w4 w4 w4 w1"]], schema=["partition","text"])
df_udtf_part_data.show()

-----------------------------------
|"PARTITION"  |"TEXT"             |
-----------------------------------
|1            |w1 w2 w2 w3 w3 w3  |
|2            |w4 w4 w4 w4 w1     |
-----------------------------------



In [51]:
df_udtf_part_data.select("partition", F.call_table_function("word_count_udtf", F.col("TEXT")).over(partition_by="partition")).show()

-------------------------------------------
|"PARTITION"  |"WORD"           |"COUNT"  |
-------------------------------------------
|1            |w1               |1        |
|1            |w2               |2        |
|1            |w3               |3        |
|1            |partition_total  |6        |
|2            |w4               |4        |
|2            |w1               |1        |
|2            |partition_total  |5        |
-------------------------------------------
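
In the examples above every partition holds a single row. To see what happens when a partition has several rows, the handler can be exercised locally without Snowflake. The class `LocalWordCount` below is an undecorated copy written for this sketch; it accumulates the word total with `+=` so that `partition_total` covers all rows of the partition.

```python
from collections import Counter
from typing import Iterable, Tuple


class LocalWordCount:
    def __init__(self) -> None:
        self._total_per_partition = 0

    def process(self, s1: str) -> Iterable[Tuple[str, int]]:
        words = s1.split()
        # Add to the running total instead of overwriting it
        self._total_per_partition += len(words)
        yield from Counter(words).items()

    def end_partition(self) -> Iterable[Tuple[str, int]]:
        yield ("partition_total", self._total_per_partition)


# Simulate one partition that contains two input rows
handler = LocalWordCount()
rows = []
rows += list(handler.process("w1 w2 w2"))
rows += list(handler.process("w3 w3 w3"))
rows += list(handler.end_partition())
for row in rows:
    print(row)
```

Word counts are still per input row, while the total is per partition, so a word that appears in two rows of the same partition would show up twice in the output.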

