### Importing Snowpark modules and creating a Snowpark session

In [1]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udf, avg, col,lit,call_udf
from snowflake.snowpark.types import IntegerType, FloatType, StringType, BooleanType
import pandas as pd
from config import snowflake_udf_conn_prop

In [2]:
# Print the installed Snowpark client version tuple so the run is tied to a known library release.
from snowflake.snowpark import version
print(version.VERSION)

(0, 5, 0, None)


In [3]:
# Build the Snowpark session from the connection properties imported from the local config module.
session = Session.builder.configs(snowflake_udf_conn_prop).create()
# Sanity check: show which warehouse, database and schema the session is bound to.
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

[Row(CURRENT_WAREHOUSE()='CLUSTER1', CURRENT_DATABASE()='UDFDEMO', CURRENT_SCHEMA()='PUBLIC')]


### Create stages and a sample dataset

In [126]:
# NOTE(review): IntegerType is already imported in the first cell; this re-import is redundant.
from snowflake.snowpark.types import IntegerType
# session.sql('drop stage pythonstage').collect()
# pythonudfstage is passed as stage_location when permanent UDFs are registered further down.
session.sql("create or replace stage pythonudfstage").collect()
# pythonsourcestage holds the Numops.py source that register_from_file later reads from the stage.
session.sql("create or replace stage pythonsourcestage").collect()
# Upload without compression (the recorded result shows the target is still named Numops.py);
# overwrite=True replaces any earlier copy.
session.file.put("utils/Numops.py", "@pythonsourcestage", auto_compress=False,overwrite=True)

[PutResult(source='Numops.py', target='Numops.py', source_size=202, target_size=208, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [99]:
# NOTE: the value returned by get_imports() is not displayed because it is not the cell's last expression.
session.get_imports()
# Drop all session-level imports registered so far.
session.clear_imports()

In [150]:
# Small in-memory sample: three rows, two columns (shown upper-cased as COL1/COL2 in the outputs).
df = session.create_dataframe([[10,20],[30,40],[60,70]], schema=["Col1","Col2"])
# Displayed as the cell output to show this is a Snowpark DataFrame, not a pandas one.
type(df)

snowflake.snowpark.dataframe.DataFrame

In [151]:
# Pull the sample rows to the client as a pandas DataFrame for display.
df.toPandas()

Unnamed: 0,COL1,COL2
0,10,20
1,30,40
2,60,70


In [102]:
# Three ways to reference a column: col("..."), attribute access, and indexing (here combined with alias()).
df.select(col("Col1"),df.Col2,df['col1'].alias("col1_new")).show()

--------------------------------
|"COL1"  |"COL2"  |"COL1_NEW"  |
--------------------------------
|10      |20      |10          |
|30      |40      |30          |
--------------------------------



In [103]:
# NOTE(review): this statement is identical to the one in the previous cell; one of the two could be removed.
df.select(col("Col1"),df.Col2,df['col1'].alias("col1_new")).show()

--------------------------------
|"COL1"  |"COL2"  |"COL1_NEW"  |
--------------------------------
|10      |20      |10          |
|30      |40      |30          |
--------------------------------



#### Creating Temporary and Permanent Python Scalar UDFs
     1. Create a temporary Snowpark Python UDF in Snowflake.
     2. Use the Snowpark Python UDF in Snowflake DataFrame API queries.
     3. Create a permanent Snowpark Python UDF in Snowflake using:
         - udf()
         - session.udf.register()

         

In [104]:
# Temporary UDF (no explicit name, not permanent); it is called directly in the DataFrame query below.
@udf(replace=True, return_type=FloatType())
def MultipleNums(col1: float, col2: float) -> float:
    """Return the product of ``col1`` and ``col2``."""
    product = col1 * col2
    return product

In [97]:
# Call the temporary UDF like a regular function. The arguments are passed as (Col2, Col1),
# which is harmless here because multiplication is commutative.
df.select(df.Col1,df.Col2,MultipleNums(df.col2,df.col1).alias("ProdOfNums")).show()

----------------------------------
|"COL1"  |"COL2"  |"PRODOFNUMS"  |
----------------------------------
|30      |40      |1200.0        |
|10      |20      |200.0         |
----------------------------------



In [106]:
# Permanent UDF: registered under an explicit name, with its code stored on @pythonudfstage.
# NOTE: this rebinds the Python name MultipleNums that the temporary UDF was using.
@udf(
    name="udf_MultipleNums",
    is_permanent=True,
    stage_location="@pythonudfstage",
    return_type=FloatType(),
    replace=True,
)
def MultipleNums(col1: float, col2: float) -> float:
    """Return the product of ``col1`` and ``col2``."""
    product = col1 * col2
    return product
    

In [107]:
# Call the permanent UDF registered as "udf_MultipleNums" from the DataFrame API with call_udf(),
# which takes the function name followed by the parameters to pass to the function.

df.select(df.Col1,df.Col2,call_udf("udf_MultipleNums", df.col1,df.col2).alias("ProdOfNums")).show()



----------------------------------
|"COL1"  |"COL2"  |"PRODOFNUMS"  |
----------------------------------
|10      |20      |200.0         |
|30      |40      |1200.0        |
----------------------------------



In [108]:
def UDFNums(col1: float, col2: float) -> float:
    """Multiply the two inputs and return the product."""
    product = col1 * col2
    return product

In [109]:
# Register the plain Python function UDFNums as a permanent UDF stored on @pythonudfstage.
# The SQL name is spelled "Regsiter"; callers must use that exact spelling.
session.udf.register(
    UDFNums,
    input_types=[FloatType(), FloatType()],
    name="udf_Regsiter_MultipleNums",
    stage_location="@pythonudfstage",
    is_permanent=True,
    replace=True,
)

<snowflake.snowpark.udf.UserDefinedFunction at 0x7ff00bf3b400>

In [24]:
# Invoke the UDF by its registered SQL name (spelled "Regsiter", exactly as it was registered).
df.select(df.Col1,df.Col2,call_udf("udf_Regsiter_MultipleNums",df.col2,df.col1).alias("ProdOfNums")).show()

----------------------------------
|"COL1"  |"COL2"  |"PRODOFNUMS"  |
----------------------------------
|30      |40      |1200.0        |
|10      |20      |200.0         |
----------------------------------



In [110]:
# List the files on the source stage.
# NOTE(review): the recorded output is empty; this cell ran as In [110], i.e. before the upload
# cell ran as In [126], so the output presumably predates the upload — confirm with a fresh run.
session.sql('list @pythonsourcestage').collect()

[]

#### Using existing modules and packages in Snowflake

In [114]:
# Import only the helper the UDF calls (no wildcard import) and ship the Numops module with the UDF.
# The import path is given in dotted form (utils.Numops) so it mirrors the local import below.
from utils.Numops import AddNums

@udf(imports=[("utils/Numops.py", "utils.Numops")])
def lookup_function(col1: float, col2: float) -> float:
    """Return AddNums(col1, col2), delegating to the uploaded utils.Numops module."""
    return AddNums(col1, col2)
    

In [115]:
# Apply the temporary UDF that delegates to AddNums; the result column is aliased to SumofValues.
df.select(df.Col1,df.Col2,lookup_function(df.col1,df.col2).alias("SumofValues")).show()

-----------------------------------
|"COL1"  |"COL2"  |"SUMOFVALUES"  |
-----------------------------------
|30      |40      |70.0           |
|10      |20      |30.0           |
-----------------------------------



In [152]:
# Permanent variant of the AddNums UDF. Import only the helper that is used (no wildcard import)
# and give the import path in dotted form (utils.Numops) so it mirrors the local import.
from utils.Numops import AddNums

@udf(
    name="udf_LookupFunction",
    is_permanent=True,
    stage_location="@pythonudfstage",
    replace=True,
    imports=[("utils/Numops.py", "utils.Numops")],
)
def lookup_function(col1: float, col2: float) -> float:
    """Return AddNums(col1, col2), delegating to the uploaded utils.Numops module."""
    return AddNums(col1, col2)
    

In [153]:
# Call the permanent UDF by its registered SQL name.
df.select(df.Col1,df.Col2,call_udf("udf_LookupFunction",df.col1,df.col2).alias("SumofValues")).show()

-----------------------------------
|"COL1"  |"COL2"  |"SUMOFVALUES"  |
-----------------------------------
|10      |20      |30.0           |
|60      |70      |130.0          |
|30      |40      |70.0           |
-----------------------------------



In [120]:
# Drop the stage if it exists; "if exists" keeps this from failing when the stage is absent.
session.sql("drop stage if exists newstage").collect()

[Row(status='NEWSTAGE successfully dropped.')]

In [154]:
# (Re)create the stage that is used as stage_location for udf_basiccalFunction.
session.sql("create or replace stage newstage").collect()

[Row(status='Stage area NEWSTAGE successfully created.')]

In [155]:
# basiccal comes from the local utils package; it is aliased to bs for use inside the UDF body.
from utils.SampleCalc.getValues import basiccal as bs
import sys
# Register the whole utils directory as a session-level import.
session.add_import("utils")
# session.add_import("utils/SampleCalc/getValues.py", import_path="utils.SampleCalc.getValues")
# session.add_import("utils/MathCalculations.py", import_path="utils.MathCalculations")

@udf(
    name="udf_basiccalFunction",
    stage_location="@newstage",
    is_permanent=True,
    replace=True,
)
def lookup_function2(col1: float, col2: float, funcname: str) -> float:
    """Delegate to basiccal, passing both numbers and the funcname string through unchanged."""
    return bs(col1, col2, funcname)

In [123]:
# Call the permanent UDF with "MulNums" as the function-name argument.
# NOTE(review): the recorded output still shows ADDVALUES and DIVVALUES columns, but the calls
# producing them are commented out below, so that output is stale with respect to this code.
df.select(df.Col1,df.Col2,call_udf("udf_basiccalFunction",df.col1,df.col2,"MulNums").alias("CalcValues") \
          # ,call_udf("udf_basiccalFunction",df.col1,df.col2,"AddNums").alias("AddValues"),\
          # ,call_udf("udf_basiccalFunction",df.col1,df.col2,"DivNums").alias("DivValues")
         ).show()

--------------------------------------------------------------
|"COL1"  |"COL2"  |"CALCVALUES"  |"ADDVALUES"  |"DIVVALUES"  |
--------------------------------------------------------------
|10      |20      |200.0         |30.0         |0.5          |
|30      |40      |1200.0        |70.0         |0.75         |
--------------------------------------------------------------



In [124]:
# Register AddNums as a permanent UDF straight from the local source file.
ops_udf = session.udf.register_from_file(
    file_path="utils/Numops.py",
    func_name="AddNums",
    name="udf_AddNums",
    input_types=[IntegerType(), FloatType()],
    return_type=FloatType(),
    stage_location="@pythonudfstage",
    is_permanent=True,
    replace=True,
)

In [55]:
# Use the object returned by register_from_file directly as a callable in the query.
df.select(df.Col1,df.Col2,ops_udf(df.col1,df.col2).alias("SumOfNums")).show()

---------------------------------
|"COL1"  |"COL2"  |"SUMOFNUMS"  |
---------------------------------
|10      |20      |30.0         |
|30      |40      |70.0         |
---------------------------------



In [160]:
# Same AddNums registration, but the source file is read from @pythonsourcestage instead of local disk.
ops_stage_udf = session.udf.register_from_file(
    file_path="@pythonsourcestage/Numops.py",
    func_name="AddNums",
    name="udf_AddNums_stage",
    input_types=[IntegerType(), FloatType()],
    return_type=FloatType(),
    stage_location="@pythonudfstage",
    is_permanent=True,
    replace=True,
)

In [128]:
# Inspect what the permanent-UDF registrations have uploaded to the UDF stage.
session.sql('list @pythonudfstage').collect()

[Row(name='pythonudfstage/110a53a7991f246ebd3b562f19b14dda/utils.zip', size=2288, md5='cb6bad09e4269e6378988036ba883408', last_modified='Mon, 2 May 2022 08:21:41 GMT'),
 Row(name='pythonudfstage/c24df528a7513b79ddf85ee7df8a2006/Numops.py.zip', size=336, md5='d993f6df7c6122ce43de56856734755c', last_modified='Mon, 2 May 2022 08:19:36 GMT'),
 Row(name='pythonudfstage/udf_AddNums/Numops.py', size=208, md5='1365fd0afeadd3d94e4d8021e7c2cfd7', last_modified='Mon, 2 May 2022 08:21:42 GMT')]

In [161]:
# Apply the UDF that was registered from the staged copy of Numops.py.
df.select(df.Col1,df.Col2,ops_stage_udf(df.col1,df.col2).alias("SumOfNums")).show()

---------------------------------
|"COL1"  |"COL2"  |"SUMOFNUMS"  |
---------------------------------
|30      |40      |70.0         |
|60      |70      |130.0        |
|10      |20      |30.0         |
---------------------------------



In [131]:
# Register DivNums from the staged Numops.py as a permanent UDF taking two floats.
div_udf = session.udf.register_from_file(
    file_path="@pythonsourcestage/Numops.py",
    func_name="DivNums",
    name="udf_DivNums",
    input_types=[FloatType(), FloatType()],
    return_type=FloatType(),
    stage_location="@pythonudfstage",
    is_permanent=True,
    replace=True,
)

In [132]:
# Divide Col1 by Col2 through the UDF and fetch the result as a pandas DataFrame.
df.select(df.Col1,df.Col2,div_udf(df.col1,df.col2).alias("DivOfNums(Col1/Col2)")).toPandas()

Unnamed: 0,COL1,COL2,DivOfNums(Col1/Col2)
0,10,20,0.5
1,30,40,0.75


In [133]:
# Show the functions visible through information_schema, including the SNOWPARK_TEMP_FUNCTION_*
# entries that appear in the recorded output.
session.sql("select * from information_schema.functions").toPandas()

Unnamed: 0,FUNCTION_CATALOG,FUNCTION_SCHEMA,FUNCTION_NAME,FUNCTION_OWNER,ARGUMENT_SIGNATURE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH,CHARACTER_OCTET_LENGTH,NUMERIC_PRECISION,NUMERIC_PRECISION_RADIX,...,IS_EXTERNAL,API_INTEGRATION,CONTEXT_HEADERS,MAX_BATCH_ROWS,REQUEST_TRANSLATOR,RESPONSE_TRANSLATOR,COMPRESSION,IMPORTS,HANDLER,TARGET_PATH
0,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_1JEW784TXD,ACCOUNTADMIN,"(ARG1 FLOAT, ARG2 FLOAT)",FLOAT,,,,,...,NO,,,,,,,"[""@UDFDEMO.PUBLIC.SNOWPARK_TEMP_STAGE_DLSEP0J6...",compute,
1,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_9QDV2AMME6,ACCOUNTADMIN,"(ARG1 FLOAT, ARG2 FLOAT)",FLOAT,,,,,...,NO,,,,,,,,compute,
2,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_CP6CJQAVI7,ACCOUNTADMIN,"(ARG1 FLOAT, ARG2 FLOAT)",FLOAT,,,,,...,NO,,,,,,,,compute,
3,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_FHRGOF050D,ACCOUNTADMIN,"(ARG1 FLOAT, ARG2 FLOAT)",FLOAT,,,,,...,NO,,,,,,,"[""@UDFDEMO.PUBLIC.SNOWPARK_TEMP_STAGE_DLSEP0J6...",compute,
4,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_HUTMQRSKCL,ACCOUNTADMIN,"(ARG1 FLOAT, ARG2 FLOAT)",FLOAT,,,,,...,NO,,,,,,,"[""@UDFDEMO.PUBLIC.SNOWPARK_TEMP_STAGE_DLSEP0J6...",compute,
5,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_LDG4QW4B61,ACCOUNTADMIN,"(ARG1 FLOAT, ARG2 FLOAT)",FLOAT,,,,,...,NO,,,,,,,"[""@UDFDEMO.PUBLIC.SNOWPARK_TEMP_STAGE_DLSEP0J6...",compute,
6,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_Q4BWKVKMK5,ACCOUNTADMIN,"(ARG1 FLOAT, ARG2 FLOAT)",FLOAT,,,,,...,NO,,,,,,,"[""@UDFDEMO.PUBLIC.SNOWPARK_TEMP_STAGE_DLSEP0J6...",compute,
7,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_Q4OXZ053BR,ACCOUNTADMIN,(ARG1 FLOAT),FLOAT,,,,,...,NO,,,,,,,"[""@UDFDEMO.PUBLIC.SNOWPARK_TEMP_STAGE_DLSEP0J6...",compute,
8,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_QQLYQV27T6,ACCOUNTADMIN,"(ARG1 FLOAT, ARG2 FLOAT)",FLOAT,,,,,...,NO,,,,,,,,compute,
9,UDFDEMO,PUBLIC,SNOWPARK_TEMP_FUNCTION_SPFRC0HPVB,ACCOUNTADMIN,"(ARG1 FLOAT, ARG2 FLOAT)",FLOAT,,,,,...,NO,,,,,,,"[""@UDFDEMO.PUBLIC.SNOWPARK_TEMP_STAGE_DLSEP0J6...",compute,


In [None]:
# List the UDF stage again. NOTE: this cell has no execution count, i.e. it has no recorded run.
session.sql("list @pythonudfstage").collect()

In [134]:
# Show the session-level imports currently registered (the recorded output lists the local utils directory).
session.get_imports()
# session.clear_imports()

['/Users/praj/Documents/GitHub/snowpark-python-examples/Fraud-Detection-Demo/utils']

In [None]:
# NOTE(review): this cell has no execution count. It hard-codes an absolute local target directory
# and a stage path whose hash folder does not appear in the recorded listing of @newstage, so it
# will likely need adjusting before it can run on another machine or session.
session.file.get("@newstage/8abbbb3422c34e6d24fcbe8094ee740d/MathCalculations.py.zip", "/Users/praj/Documents/")

In [137]:
# List @newstage; the recorded output shows the zipped utils package uploaded for udf_basiccalFunction.
session.sql('list @newstage').collect()

[Row(name='newstage/110a53a7991f246ebd3b562f19b14dda/utils.zip', size=2288, md5='872a48b833f28b2d663b1a1c9e09c6b0', last_modified='Mon, 2 May 2022 08:21:06 GMT')]

### Including Packages

In [63]:
# NOTE(review): udf is already imported in the first cell, and math is not used in the visible cells.
from snowflake.snowpark.functions import udf
import numpy as np
import math
# packages=["numpy"] declares numpy as a package dependency of this UDF.
@udf(packages=["numpy"])
def exp_udf(x: float) -> float:
    """Return e**x computed with numpy.exp."""
    return np.exp(x)


In [64]:
# Apply exp_udf to Col1; with no alias, the result column is labelled with the generated function name.
df.select(exp_udf("Col1")).show()

------------------------------------------------------
|"""UDFDEMO"".""PUBLIC"".SNOWPARK_TEMP_FUNCTION_...  |
------------------------------------------------------
|22026.465794806718                                  |
|10686474581524.463                                  |
------------------------------------------------------



In [139]:
# One-column frame holding the integers 1..5, used to exercise exp_udf.
df11 = session.create_dataframe(list(range(1, 6)), schema=["Col"])
# Iterating a pandas DataFrame yields its column labels, so this prints the generated column name.
print(list(df11.select(exp_udf("Col")).toPandas()))

['"UDFDEMO"."PUBLIC".SNOWPARK_TEMP_FUNCTION_Q4OXZ053BR("COL")']


In [140]:
# The single result column is labelled "<db>"."<schema>".<FUNCTION>("COL"): take the third
# dot-separated piece and cut it at the opening parenthesis to get the bare function name.
function_name = list(df11.select(exp_udf("Col")).toPandas())[0].split('.')[2].split('(')[0]
print(f"Temp Function Name = {function_name}")

# Look the temporary function up in information_schema by that name.
function_query=f"select * from information_schema.functions  where function_name='{function_name}'"

red = session.sql(function_query).toPandas()
red['ARGUMENT_SIGNATURE']

Temp Function Name = SNOWPARK_TEMP_FUNCTION_Q4OXZ053BR


0    (ARG1 FLOAT)
Name: ARGUMENT_SIGNATURE, dtype: object

In [141]:
# function_name=df.select(exp_udf("Col")).toPandas().columns[0]
# Fetch the DDL of the temporary function found above; the "(float)" suffix supplies the argument
# signature that get_ddl is given for the function.
ddl_query=f"select get_ddl('function', '{function_name}(float)')" 
print(ddl_query)
session.sql(ddl_query).collect()
# session.sql(function_query).toPandas()

select get_ddl('function', 'SNOWPARK_TEMP_FUNCTION_Q4OXZ053BR(float)')


[Row(GET_DDL('FUNCTION', 'SNOWPARK_TEMP_FUNCTION_Q4OXZ053BR(FLOAT)')='CREATE OR REPLACE FUNCTION "SNOWPARK_TEMP_FUNCTION_Q4OXZ053BR"("ARG1" FLOAT)\nRETURNS FLOAT\nLANGUAGE PYTHON\nRUNTIME_VERSION = \'3.8\'\nPACKAGES = (\'numpy\',\'cloudpickle==2.0.0\')\nHANDLER = \'compute\'\nIMPORTS = (\'@UDFDEMO.PUBLIC.SNOWPARK_TEMP_STAGE_DLSEP0J6AU/110a53a7991f246ebd3b562f19b14dda/utils.zip\')\nAS \'\nimport pickle\n\nfunc = pickle.loads(bytes.fromhex(\'\'8005951a020000000000008c17636c6f75647069636b6c652e636c6f75647069636b6c65948c0d5f6275696c74696e5f747970659493948c0a4c616d6264615479706594859452942868028c08436f6465547970659485945294284b014b004b004b014b034b43430a7400a0017c00a1015300944e85948c026e70948c036578709486948c01789485948c4e2f7661722f666f6c646572732f73642f6a36676a707a6d64306762396c6a7a686d6c737a7873633830303030676e2f542f6970796b65726e656c5f38373130352f323236333130343037312e7079948c076578705f756466944b0443020002942929749452947d94288c0b5f5f7061636b6167655f5f944e8c085f5f6e616d655f5f948c085f5f6d61

In [142]:
# Make sure @pythonstage exists (it is the stage_location of udf_getoperationJson) and upload
# MathCalculations.py to it uncompressed, replacing any earlier copy.
session.sql("create stage if not exists pythonstage").collect()
session.file.put("utils/MathCalculations.py", "@pythonstage", auto_compress=False,overwrite=True)

[PutResult(source='MathCalculations.py', target='MathCalculations.py', source_size=262, target_size=272, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

In [143]:
from utils.MathCalculations import BasicOperations as bo
# Maps the short operation keyword accepted by performcalculation to the name of the
# BasicOperations attribute that implements it. This dict is the single source of truth
# for which operations are valid.
operationfunctiondict = {
    'Add': 'AddNums',
    'Sub': 'SubNums',
    'Mul': 'MulNums',
    'Div': 'DivNums',
}


def performcalculation(val1:float, val2:float,operation:str)-> object:
    """Run one basic arithmetic operation and describe the result.

    Args:
        val1: First operand.
        val2: Second operand.
        operation: One of the keys of ``operationfunctiondict``:
            'Add', 'Sub', 'Mul' or 'Div'.

    Returns:
        A dict with 'FunctionName' (the BasicOperations attribute that was
        called) and 'value' (what it returned), or None when ``operation``
        is not a known keyword.
    """
    function_name = operationfunctiondict.get(operation)
    if function_name is None:
        # Unknown keyword: signal it with None instead of raising.
        return None
    ops = getattr(bo, function_name)
    return {'FunctionName': function_name, 'value': ops(val1, val2)}


In [74]:
# Local smoke test of performcalculation before it is wrapped in a UDF.
print(performcalculation(10,20,'Mul'))

{'FunctionName': 'MulNums', 'value': 200}


In [75]:
# session.add_import("utils/MathCalculations.py", import_path="utils/MathCalculations")

In [146]:
# Permanent UDF (SQL name udf_getoperationJson, stored on @pythonstage) wrapping performcalculation.
# NOTE(review): no imports are passed here, so the utils package that performcalculation needs is
# presumably supplied by the earlier session.add_import("utils") call — confirm on a fresh session.
@udf(name='udf_getoperationJson',stage_location='@pythonstage',replace=True,is_permanent=True)
def getoperationJson(v1: float,v2:float,operation:str) -> dict:
    """Return performcalculation(v1, v2, operation): a dict describing the result, or None."""
    return performcalculation(v1,v2,operation)

In [163]:
# Re-display the sample rows before applying the JSON-returning UDF.
df.toPandas()

Unnamed: 0,COL1,COL2
0,10,20
1,30,40
2,60,70


In [162]:
# Call the permanent UDF through call_udf() by its registered SQL name. No cell binds a Python
# variable called udf_getoperationJson, so calling that name directly fails on a fresh kernel.
df.select(df.Col1,df.Col2,call_udf("udf_getoperationJson",df.col1,df.col2,lit('Add')).alias("AddJson")).toPandas()

Unnamed: 0,COL1,COL2,ADDJSON
0,10,20,"{\n ""FunctionName"": ""AddNums"",\n ""value"": 30\n}"
1,30,40,"{\n ""FunctionName"": ""AddNums"",\n ""value"": 70\n}"
2,60,70,"{\n ""FunctionName"": ""AddNums"",\n ""value"": 13..."


In [164]:
# Build one JSON column per operation. The UDF is invoked through call_udf() by its registered
# SQL name, because no cell binds a Python variable called udf_getoperationJson.
df_res = df.select(
    df.Col1,
    df.Col2,
    call_udf("udf_getoperationJson", df.col1, df.col2, lit('Add')).alias("AddJson"),
    call_udf("udf_getoperationJson", df.col1, df.col2, lit('Mul')).alias("MultiplyJson"),
)

In [165]:
# Materialise the two JSON result columns as a pandas DataFrame for display.
df_res.toPandas()

Unnamed: 0,COL1,COL2,ADDJSON,MULTIPLYJSON
0,10,20,"{\n ""FunctionName"": ""AddNums"",\n ""value"": 30\n}","{\n ""FunctionName"": ""MulNums"",\n ""value"": 20..."
1,30,40,"{\n ""FunctionName"": ""AddNums"",\n ""value"": 70\n}","{\n ""FunctionName"": ""MulNums"",\n ""value"": 12..."
2,60,70,"{\n ""FunctionName"": ""AddNums"",\n ""value"": 13...","{\n ""FunctionName"": ""MulNums"",\n ""value"": 42..."


In [85]:
# Persist the result as table UDF_TABLE, replacing any existing table of that name.
df_res.write.mode("overwrite").saveAsTable("UDF_TABLE")

In [166]:
# Read the saved table back.
# NOTE(review): the recorded output has two rows while df has three; the save cell ran as In [85],
# before df was last created as In [150], so the table presumably reflects an older df — confirm.
session.sql("select * from UDF_TABLE;").toPandas()

Unnamed: 0,COL1,COL2,ADDJSON,MULTIPLYJSON
0,30,40,"{\n ""FunctionName"": ""AddNums"",\n ""value"": 70\n}","{\n ""FunctionName"": ""MulNums"",\n ""value"": 12..."
1,10,20,"{\n ""FunctionName"": ""AddNums"",\n ""value"": 30\n}","{\n ""FunctionName"": ""MulNums"",\n ""value"": 20..."


In [167]:
# Pull the "value" field out of each JSON column with the column:field path syntax and cast it
# to an integer, giving plain numeric SumofValues / ProductofValues columns.
session.sql("select COL1,COL2, \
                ADDJSON:value::int as SumofValues,\
                multiplyjson:value::int as ProductofValues \
             from UDF_TABLE;").toPandas()

Unnamed: 0,COL1,COL2,SUMOFVALUES,PRODUCTOFVALUES
0,30,40,70,1200
1,10,20,30,200
