In [31]:
import os
import sys
import json
import math
import cachetools
import numpy as np
import pandas as pd
import configparser
from snowflake.snowpark import Session
from copy import copy
from snowflake.snowpark import Row
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import col, lit, sql_expr, get, get_path, udf, udtf, table_function, sproc, seq8, uniform, when_matched, when_not_matched, cast, try_cast, asc, asc_nulls_first, asc_nulls_last, collate, startswith, endswith, equal_nan, is_null, in_, when
from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType, DecimalType, LongType, BooleanType, FloatType, PandasSeries, PandasSeriesType, PandasDataFrame, PandasDataFrameType
from snowflake.snowpark.exceptions import SnowparkJoinException, SnowparkSQLException
from snowflake.snowpark.files import SnowflakeFile
from snowflake.snowpark.column import METADATA_FILENAME, METADATA_FILE_ROW_NUMBER
from collections import Counter
from typing import Iterable, Tuple

# Read Snowflake credentials from a local config file so they never appear in the notebook
credentials_path = 'assets/credentials.cfg'
config = configparser.ConfigParser()

# ConfigParser.read() silently skips files it cannot open and returns the list of
# files it actually parsed. Fail fast with a clear message rather than letting the
# section lookup below raise an uninformative KeyError.
if not config.read(credentials_path):
    raise FileNotFoundError(
        f"Snowflake credentials file not found or unreadable: {credentials_path}"
    )

# Every key below is looked up directly, so each must be present in the
# [SNOWPARKAWS] section even where Snowflake itself treats the setting as optional.
snowflake_settings = config['SNOWPARKAWS']

connection_parameters = dict(
   account   =  snowflake_settings['SNOWFLAKE_ACCOUNT'],
   user      =  snowflake_settings['SNOWFLAKE_USER'],
   password  =  snowflake_settings['SNOWFLAKE_PASSWORD'],
   role      =  snowflake_settings['SNOWFLAKE_ROLE'],  # optional
   warehouse =  snowflake_settings['SNOWFLAKE_WAREHOUSE'],  # optional
   database  =  snowflake_settings['SNOWFLAKE_DATABASE'],  # optional
   schema    =  snowflake_settings['SNOWFLAKE_SCHEMA'],  # optional
)

# Pass this dictionary to the Session.builder.configs method to return a builder object that has these connection parameters.
# Call the create method of the builder to establish the session.
session = Session.builder.configs(connection_parameters).create()

### `User-Defined Table Functions`

- [udtf](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/udtf) : User-defined table functions (UDTFs) in Snowpark.

- `normal UDTF`: default row-by-row processing pattern
- `vectorized Python UDTFs`: enable seamless partition-by-partition processing by operating on partitions as Pandas DataFrames and returning results as Pandas DataFrames or lists of Pandas arrays or Pandas Series.

`Example 1: Create a temporary UDTF and call it:`

In [2]:
from snowflake.snowpark.types import IntegerType, StructField, StructType
from snowflake.snowpark.functions import udtf, lit

class GeneratorUDTF:
    """Row-by-row UDTF handler that emits the integers 0 .. n-1, one per output row."""

    def process(self, n):
        # Each yielded 1-tuple becomes one single-column output row.
        yield from ((value,) for value in range(n))

# Register the handler as a UDTF that takes one integer argument and returns a
# single integer column named "number".
generator_udtf = udtf(
    GeneratorUDTF,
    output_schema=StructType([StructField("number", IntegerType())]),
    input_types=[IntegerType()],
)

In [3]:
# Query it by calling it
session.table_function(generator_udtf(lit(8))).show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
|3         |
|4         |
|5         |
|6         |
|7         |
------------



In [4]:
# Query it by using the name
session.table_function(generator_udtf.name, lit(4)).show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
|3         |
------------



In [5]:
# A UDTF can also be lateral-joined like any other table function:
# each input row's "c" value is passed to the UDTF as its argument.
df = session.create_dataframe([2, 3], schema=["c"])

(
    df.join_table_function(generator_udtf(df["c"]))
    .sort("c", "number")
    .show()
)

------------------
|"C"  |"NUMBER"  |
------------------
|2    |0         |
|2    |1         |
|3    |0         |
|3    |1         |
|3    |2         |
------------------



`Example 2: Create a UDTF with type hints and @udtf decorator and query it:`

In [12]:
from snowflake.snowpark.types import IntegerType, StructField, StructType, Iterable
from snowflake.snowpark.functions import udtf, lit
from typing import Tuple

@udtf(output_schema=["number"])
class generator_udtf:
    """UDTF that emits the integers 0 .. n-1 as single-column rows.

    No explicit input/output types are passed to the decorator; the type hints
    on ``process`` describe them instead.
    """

    def process(self, n: int) -> Iterable[Tuple[int]]:
        # One 1-tuple per value; each tuple is one output row.
        yield from ((value,) for value in range(n))

In [13]:
# Query it by calling it
session.table_function(generator_udtf(lit(3))).show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
------------



In [14]:
# Query it by using the name
session.table_function(generator_udtf.name, lit(3)).show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
------------



`Example 3: Create a permanent UDTF with a name and call it in SQL:`

In [15]:
from snowflake.snowpark.types import IntegerType, StructField, StructType
from snowflake.snowpark.functions import udtf, lit

class GeneratorUDTF:
    """UDTF handler producing one row per integer in ``range(n)``."""

    def process(self, n):
        # zip() over a single iterable wraps every value in a 1-tuple,
        # which is the row shape expected for a one-column output.
        yield from zip(range(n))

# Register GeneratorUDTF as a permanent UDTF named "generator_udtf", replacing any
# existing definition, so that it can be called from SQL by name.
generator_udtf = udtf(
    GeneratorUDTF,
    name="generator_udtf",
    is_permanent=True,
    replace=True,
    stage_location="@sf_udf_int_stg",
    input_types=[IntegerType()],
    output_schema=StructType([StructField("number", IntegerType())]),
)

In [16]:
session.sql("select * from table(generator_udtf(3))").show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
------------



In [18]:
session.table_function(generator_udtf(lit(3))).show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
------------



`Example 4: Create a UDTF with type hints:`

In [19]:
from snowflake.snowpark.types import IntegerType, StructField, StructType, Iterable
from snowflake.snowpark.functions import udtf, lit
from typing import Tuple

@udtf(output_schema=["n1","n2"])
class generator_udtf:
    """UDTF that emits ``n`` two-column rows of the form ``(i, i + 1)``."""

    def process(self, n: int) -> Iterable[Tuple[int, int]]:
        for first in range(n):
            second = first + 1
            # Each pair is one output row: n1 = first, n2 = second.
            yield (first, second)

In [20]:
session.table_function(generator_udtf(lit(3))).show()

---------------
|"N1"  |"N2"  |
---------------
|0     |1     |
|1     |2     |
|2     |3     |
---------------



`Example 5: Create a UDTF with type hints by using ... for multiple columns of the same type:`

In [21]:
from snowflake.snowpark.types import IntegerType, StructField, StructType, Iterable
from snowflake.snowpark.functions import udtf, lit
from typing import Tuple

@udtf(output_schema=["n1","n2"])
class generator_udtf:
    """UDTF that emits ``n`` rows of ``(i, i + 1)``.

    The return hint uses ``Tuple[int, ...]`` to declare that every output
    column is an int, instead of listing each column type separately.
    """

    def process(self, n: int) -> Iterable[Tuple[int, ...]]:
        # Pair each i in 0..n-1 with its successor i + 1.
        yield from zip(range(n), range(1, n + 1))

In [22]:
session.table_function(generator_udtf(lit(3))).show()

---------------
|"N1"  |"N2"  |
---------------
|0     |1     |
|1     |2     |
|2     |3     |
---------------



`Example 6: Create a UDTF with UDF-level imports and type hints:`

In [23]:
from scripts.mod5 import mod5
from snowflake.snowpark.types import IntegerType, StructField, StructType, Iterable
from snowflake.snowpark.functions import udtf, lit
from typing import Tuple

@udtf(output_schema=["number"], imports=[("scripts/mod5.py", "scripts.mod5")])
class generator_udtf:
    """UDTF that applies the imported ``mod5`` helper to every integer in ``range(n)``.

    The ``imports`` argument lists scripts/mod5.py (as module ``scripts.mod5``)
    for this UDTF.
    """

    def process(self, n: int)->Iterable[Tuple[int]]:
        # NOTE(review): judging by the sample output, mod5(i) is i modulo 5 - confirm in scripts/mod5.py.
        for remainder in map(mod5, range(n)):
            yield (remainder,)

In [24]:
session.table_function(generator_udtf(lit(6))).show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
|3         |
|4         |
|0         |
------------



`Example 7: Create a UDTF with UDF-level packages and type hints:`

In [25]:
from scripts.mod5 import mod5
from snowflake.snowpark.types import IntegerType, StructField, StructType, Iterable
from snowflake.snowpark.functions import udtf, lit
from typing import Tuple
import numpy as np

@udtf(output_schema=["number"], packages=["numpy"])
class generator_udtf:
    """UDTF that emits the integers 0 .. n-1, generated with ``numpy.arange``.

    ``packages=["numpy"]`` declares numpy as a package dependency of this UDTF.
    """

    def process(self, n: int)->Iterable[Tuple[int]]:
        for i in np.arange(n):
            # np.arange yields numpy integer scalars, not built-in ints. Convert so
            # the emitted value matches the declared Tuple[int] row type.
            yield (int(i),)

In [26]:
session.table_function(generator_udtf(lit(6))).show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
|3         |
|4         |
|5         |
------------



`Example 8: Creating a UDTF with the constructor and end_partition method.`

In [33]:
from collections import Counter
from typing import Iterable, Tuple
from snowflake.snowpark.functions import lit

class MyWordCount:
    """UDTF handler that counts words per input row and reports a running word total.

    ``process`` emits one ``(word, count)`` row for each distinct word in the
    input string; ``end_partition`` emits a final ``("partition_total", total)``
    row covering every row this instance has processed.
    """

    def __init__(self) -> None:
        # Running number of words seen across all rows processed by this instance.
        self._total_per_partition = 0

    def process(self, s1: str) -> Iterable[Tuple[str, int]]:
        """Yield ``(word, occurrences)`` for each distinct whitespace-separated word in ``s1``."""
        words = s1.split()
        # Accumulate rather than overwrite: when several rows are processed, the
        # total must cover all of them, not just the most recent row.
        self._total_per_partition += len(words)
        yield from Counter(words).items()

    def end_partition(self):
        """Yield the summary row carrying the accumulated word total."""
        yield ("partition_total", self._total_per_partition)

In [34]:
udtf_name = "word_count_udtf"

# Register MyWordCount as a non-permanent UDTF under udtf_name (replacing any
# existing one); the list names its two output columns.
word_count_udtf = session.udtf.register(
    MyWordCount,
    output_schema=["word", "count"],
    name=udtf_name,
    replace=True,
    is_permanent=False,
)

In [30]:
# Invoke the UDTF through the name it was registered under
df1 = session.table_function(
    udtf_name,
    lit("w1 w2 w2 w3 w3 w3"),
)
df1.show()

-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------



In [35]:
# Invoke the UDTF through the callable object returned by register()
df2 = session.table_function(
    word_count_udtf(lit("w1 w2 w2 w3 w3 w3")),
)
df2.show()

-----------------------------
|"WORD"           |"COUNT"  |
-----------------------------
|w1               |1        |
|w2               |2        |
|w3               |3        |
|partition_total  |6        |
-----------------------------



`Example 9: Creating a UDTF from a local Python file:`

In [37]:
from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType, BooleanType, DecimalType, LongType
from snowflake.snowpark.functions import udtf, lit

# Register the GeneratorUDTF handler found in a local Python source file.
generator_udtf = session.udtf.register_from_file(
    file_path="scripts/test_udtf_file.py",
    handler_name="GeneratorUDTF",
    input_types=[IntegerType()],
    output_schema=StructType([StructField("number", IntegerType())]),
)

In [38]:
session.table_function(generator_udtf(lit(3))).show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
------------



`Example 10: Creating a UDTF from a Python file on an internal stage:`

In [39]:
# Upload the handler source file to the Snowflake stage, uncompressed,
# overwriting any copy that is already there
session.file.put(
    'scripts/test_udtf_file.py',
    "@SF_INT_STG",
    overwrite=True,
    auto_compress=False,
)

[PutResult(source='test_udtf_file.py', target='test_udtf_file.py', source_size=96, target_size=112, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [42]:
session.sql("list @SF_INT_STG").show(50)

------------------------------------------------------------------------------------------------------------------------------------
|"name"                                              |"size"    |"md5"                             |"last_modified"                |
------------------------------------------------------------------------------------------------------------------------------------
|sf_int_stg/avro_dataset.avro                        |19114464  |71cf879b3e591be4443b9bbee339d394  |Thu, 14 Dec 2023 15:07:51 GMT  |
|sf_int_stg/car_sales.json                           |848       |d69652cd0d10f4651db31c180bd8dff6  |Fri, 8 Dec 2023 17:25:35 GMT   |
|sf_int_stg/car_sales1.json                          |656       |04c4262b096991965a9887a75393ef8d  |Sat, 9 Dec 2023 00:59:33 GMT   |
|sf_int_stg/cities.parquet                           |880       |66d6d49b068c5062c4ac8e7c7aaeef8b  |Thu, 14 Dec 2023 15:07:56 GMT  |
|sf_int_stg/copyloc/PLOGANATHAN/data_01b0fc8e-06...  |18000     |4afc

In [43]:
from snowflake.snowpark.types import StructType, StructField, StringType, IntegerType, BooleanType, DecimalType, LongType
from snowflake.snowpark.functions import udtf, lit

# Register the GeneratorUDTF handler from the copy of the file on the stage.
generator_udtf = session.udtf.register_from_file(
    file_path="@sf_int_stg/test_udtf_file.py",
    handler_name="GeneratorUDTF",
    input_types=[IntegerType()],
    output_schema=StructType([StructField("number", IntegerType())]),
)

In [44]:
session.table_function(generator_udtf(lit(3))).show()

------------
|"NUMBER"  |
------------
|0         |
|1         |
|2         |
------------



`Example 11: Creating a vectorized UDTF by specifying a PandasDataFrameType as input_types and a PandasDataFrameType with column names as output_schema.`

In [49]:
from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType

class multiply:
    """Vectorized UDTF handler that scales two columns of a partition.

    ``end_partition`` receives the partition as a DataFrame and yields that
    same frame after multiplying ``col1`` and ``col2`` by ``self.multiplier``.
    """

    def __init__(self):
        # Constant factor applied to col1 and col2.
        self.multiplier = 10

    def end_partition(self, df):
        factor = self.multiplier
        # Rescale in place; all other columns pass through unchanged.
        df.col1 = df.col1 * factor
        df.col2 = df.col2 * factor
        yield df

In [50]:
# Register the vectorized UDTF: input and output are both described as
# DataFrames of (string, integer, float) columns.
# NOTE(review): input_names are double-quoted, presumably so the lowercase names
# match df.col1 / df.col2 inside the handler - confirm.
multiply_udtf = session.udtf.register(
    multiply,
    input_names=['"id"', '"col1"', '"col2"'],
    input_types=[PandasDataFrameType([StringType(), IntegerType(), FloatType()])],
    output_schema=PandasDataFrameType(
        [StringType(), IntegerType(), FloatType()],
        ["id_", "col1_", "col2_"],
    ),
)

In [52]:
# Two sample rows with distinct ids; partitioning by "id" hands them to the UDTF separately.
df = session.create_dataframe(
    [['x', 3, 35.9], ['y', 9, 20.5]],
    schema=["id", "col1", "col2"],
)
(
    df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"]))
    .sort("col1_")
    .show()
)

-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|y      |90       |205.0    |
-----------------------------



`Example 12: Creating a vectorized UDTF by specifying PandasDataFrame with nested types as type hints and a list of column names as output_schema.`

In [53]:
from snowflake.snowpark.types import PandasDataFrame

class multiply:
    """Vectorized UDTF handler whose input/output column types come from type hints.

    ``end_partition`` is annotated with ``PandasDataFrame[str, int, float]`` for
    both its argument and its result; it multiplies ``col1`` and ``col2`` by
    ``self.multiplier`` and yields the same frame.
    """

    def __init__(self):
        # Constant factor applied to the numeric columns.
        self.multiplier = 10

    def end_partition(self, df: PandasDataFrame[str, int, float]) -> PandasDataFrame[str, int, float]:
        # Rescale each numeric column in place, in order: col1 first, then col2.
        for column_name in ("col1", "col2"):
            setattr(df, column_name, getattr(df, column_name) * self.multiplier)
        yield df

In [54]:
# No input_types are passed here: the handler's type hints carry the column types,
# so only the output column names and the input column names are supplied.
multiply_udtf = session.udtf.register(
    multiply,
    input_names=['"id"', '"col1"', '"col2"'],
    output_schema=["id_", "col1_", "col2_"],
)

In [55]:
# Two sample rows with distinct ids; partitioning by "id" hands them to the UDTF separately.
df = session.create_dataframe(
    [['x', 3, 35.9], ['y', 9, 20.5]],
    schema=["id", "col1", "col2"],
)
(
    df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"]))
    .sort("col1_")
    .show()
)

-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|y      |90       |205.0    |
-----------------------------



`Example 13: Creating a vectorized UDTF by specifying a pandas.DataFrame as type hints and a StructType with type information and column names as output_schema.`

In [57]:
import pandas as pd
from snowflake.snowpark.types import IntegerType, StringType, FloatType, StructType, StructField

class multiply:
    """Vectorized UDTF handler annotated with plain ``pd.DataFrame`` type hints.

    ``end_partition`` multiplies ``col1`` and ``col2`` of the incoming frame by
    ``self.multiplier`` and yields that same frame.
    """

    def __init__(self):
        # Constant factor applied to the numeric columns.
        self.multiplier = 10

    def end_partition(self, df: pd.DataFrame) -> pd.DataFrame:
        scale = self.multiplier
        # In-place rescale of the two numeric columns; "id" is left as is.
        df.col1 = df.col1 * scale
        df.col2 = df.col2 * scale
        yield df

In [58]:
# The pd.DataFrame hints carry no column types, so they are given explicitly:
# input_types per input column, and a StructType naming and typing each output column.
multiply_udtf = session.udtf.register(
    multiply,
    input_names=['"id"', '"col1"', '"col2"'],
    input_types=[StringType(), IntegerType(), FloatType()],
    output_schema=StructType(
        [
            StructField("id_", StringType()),
            StructField("col1_", IntegerType()),
            StructField("col2_", FloatType()),
        ]
    ),
)

In [59]:
# Two sample rows with distinct ids; partitioning by "id" hands them to the UDTF separately.
df = session.create_dataframe(
    [['x', 3, 35.9], ['y', 9, 20.5]],
    schema=["id", "col1", "col2"],
)
(
    df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"]))
    .sort("col1_")
    .show()
)

-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|y      |90       |205.0    |
-----------------------------



`Example 14: Same as Example 11, but does not specify input_names and instead sets the column names in end_partition.`

In [60]:
from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType

class multiply:
    """Vectorized UDTF handler that names the input columns itself.

    ``end_partition`` first assigns the column names ``id``, ``col1`` and
    ``col2`` to the incoming frame, then multiplies ``col1`` and ``col2`` by
    ``self.multiplier`` and yields the same frame.
    """

    def __init__(self):
        # Constant factor applied to the numeric columns.
        self.multiplier = 10

    def end_partition(self, df):
        # Name the columns here so the attribute access below works regardless
        # of what the incoming columns were called.
        df.columns = ["id", "col1", "col2"]
        for column_name in ("col1", "col2"):
            setattr(df, column_name, getattr(df, column_name) * self.multiplier)
        yield df

In [61]:
# No input_names are given here; the handler assigns the column names itself in end_partition.
multiply_udtf = session.udtf.register(
    multiply,
    input_types=[PandasDataFrameType([StringType(), IntegerType(), FloatType()])],
    output_schema=PandasDataFrameType(
        [StringType(), IntegerType(), FloatType()],
        ["id_", "col1_", "col2_"],
    ),
)

In [62]:
# Two sample rows with distinct ids; partitioning by "id" hands them to the UDTF separately.
df = session.create_dataframe(
    [['x', 3, 35.9], ['y', 9, 20.5]],
    schema=["id", "col1", "col2"],
)
(
    df.select(multiply_udtf("id", "col1", "col2").over(partition_by=["id"]))
    .sort("col1_")
    .show()
)

-----------------------------
|"ID_"  |"COL1_"  |"COL2_"  |
-----------------------------
|x      |30       |359.0    |
|y      |90       |205.0    |
-----------------------------



In [63]:
# Close the Snowpark session created at the top of the notebook; cells that use
# `session` cannot be re-run after this without creating a new session.
session.close()