# Snowpark Basics HoL Part 4 - Stored Procedures and Functions

## 4.1 Setup

### Imports

In [None]:
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

import sys
import json
import pandas as pd
import numpy as np

# Make sure we do not get line breaks when doing show on wide dataframes
from IPython.display import HTML, display
display(HTML("<style>pre { white-space: pre !important; }</style>"))

### Create Snowpark Session

In [None]:
with open('creds.json') as f:
    connection_parameters = json.load(f)
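
The contents of creds.json are not shown in this lab. As a rough sketch, assuming the commonly used Snowpark connection parameter names and purely hypothetical placeholder values, the loaded dictionary could be checked before building the session like this:

```python
import json

# Assumed key names: commonly used Snowpark connection parameters.
# Which ones you actually need depends on your account and authentication method.
EXPECTED_KEYS = ["account", "user", "password", "role", "warehouse", "database", "schema"]

# Hypothetical placeholder values; replace them with your own before use.
example_creds = {
    "account": "<account_identifier>",
    "user": "<user_name>",
    "password": "<password>",
    "role": "<role_name>",
    "warehouse": "<warehouse_name>",
    "database": "<database_name>",
    "schema": "<schema_name>",
}

def missing_keys(params: dict) -> list:
    """Return the expected keys that are absent from params, in EXPECTED_KEYS order."""
    return [key for key in EXPECTED_KEYS if key not in params]

# Round-trip through JSON, mimicking what json.load does when reading creds.json
loaded = json.loads(json.dumps(example_creds))
print(missing_keys(loaded))
```

An empty list means every expected key is present. The helper `missing_keys` is illustrative only and is not part of Snowpark.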

In [None]:
session = Session.builder.configs(connection_parameters).create()
print(f"Current Database and schema: {session.get_fully_qualified_current_schema()}")
print(f"Current Warehouse: {session.get_current_warehouse()}")

### Create Stages
These stages will hold the code for the stored procedure and UDFs registered later in this notebook.

In [None]:
session.sql('CREATE OR REPLACE STAGE PROCSTAGE').collect()

In [None]:
session.sql('CREATE OR REPLACE STAGE UDFSTAGE').collect()

### Snowpark DataFrames from Tables

In [None]:
# Creating a Snowpark DataFrame
snowpark_truck_df = session.table('TRUCK')
snowpark_header_df = session.table('ORDER_HEADER')
snowpark_detail_df = session.table('ORDER_DETAIL')
snowpark_location_df = session.table('LOCATION')
snowpark_menu_df = session.table('MENU')

## 4.2 Using Stored Procedures
Stored procedures may seem less natural to typical Python users, but they are useful for several reasons:
<br>They provide a way to capture and run a set of Python commands, potentially with parameterisation.
<br>They can include complex Python logic, and can access code and other files stored in Snowflake stages.
<br>They run server-side, so if they use functionality such as Pandas, or train a model outside the SQL engine, the CPU and memory used are on the server, not the client.

One consideration is that procedures themselves run on a single node. However, if they invoke DataFrame API processing, including UDFs, that processing can parallelize across multiple nodes.

### Stored Procedure as Script
Let's turn the Part 2 solution into a stored procedure which takes a Menu Item Category, Year and Month and creates a summary table for that combination.

First we define the Python function. Note that in this code the indentation becomes important.

In [None]:
def category_sproc(session: Session, category: str, selectyear: int, selectmonth: int) -> None:
    # Define table based dataframes
    header_df = session.table('RAW_POS.ORDER_HEADER').select(['ORDER_ID', 'LOCATION_ID', 'ORDER_TS'])
    detail_df = session.table('RAW_POS.ORDER_DETAIL').select(['ORDER_DETAIL_ID','ORDER_ID','MENU_ITEM_ID','QUANTITY'])
    location_df = session.table('RAW_POS.LOCATION').select(['LOCATION_ID','ISO_COUNTRY_CODE'])
    menu_df = session.table('RAW_POS.MENU').select(['MENU_ITEM_ID','MENU_ITEM_NAME','ITEM_CATEGORY'])

    # Filter based on inputs
    header_df = header_df.filter((F.year(F.col('ORDER_TS'))== selectyear ) & (F.month(F.col('ORDER_TS'))==selectmonth))
    menu_df = menu_df.filter(F.col('ITEM_CATEGORY') == category) 
    
    # Combine and aggregate
    combined_df = detail_df.join(header_df,"ORDER_ID").join(menu_df,"MENU_ITEM_ID").join(location_df,"LOCATION_ID")
    output_df = combined_df.groupBy(['MENU_ITEM_ID', 'MENU_ITEM_NAME', 'ISO_COUNTRY_CODE']).agg(F.sum('QUANTITY').alias('TOTAL_QUANTITY'))
    
    # Set up table name and write to table
    outputtable = category.upper() + str(selectyear) + str(selectmonth).zfill(2)
    output_df.write.save_as_table(table_name=outputtable, mode='overwrite')

Next we register the function as a stored procedure. Note that for a permanent stored procedure you must name a stage. 
(You may want to watch this in Snowsight Query History.)

In [None]:
category_sproc = session.sproc.register(
    func=category_sproc, # the Python function defined in the cell above
    name='category_sproc', # the name of the stored procedure in Snowflake
    is_permanent=True, # store it permanently rather than for this session only
    replace=True, # replace any existing procedure with this name
    stage_location='@PROCSTAGE', # the stage where the procedure code is uploaded
    packages=['snowflake-snowpark-python']) # the packages the procedure uses

Finally we call the procedure. You may also want to try this from SQL in Snowsight; there you omit the session argument, for example `CALL category_sproc('Dessert', 2022, 4);`.

In [None]:
category_sproc(session, 'Dessert', 2022, 4)
session.table('DESSERT202204').show()
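
The table name queried above follows from the procedure's arguments. A minimal local sketch of the same naming rule (the helper name `summary_table_name` is hypothetical and only mirrors the expression used inside `category_sproc`):

```python
def summary_table_name(category: str, year: int, month: int) -> str:
    """Mirror the naming rule in category_sproc: CATEGORY + YYYY + zero-padded MM."""
    return category.upper() + str(year) + str(month).zfill(2)

# The call made above: category 'Dessert', year 2022, month 4
print(summary_table_name('Dessert', 2022, 4))
```

Note that a category value containing spaces or punctuation would not give a valid unquoted table name under this rule, so the procedure as written is best suited to single-word categories.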

## 4.3 Using Functions

### Calling Built-In Functions
While many standard built-in functions in Snowflake have matching Snowpark **functions** methods, given the rate at which Snowflake adds new functionality, there will always be functions you might need outside that list.
<br>For these you can use **functions.call_builtin** or **functions.call_function**.  

Let's use this to try a simple geospatial function. We'll start from a different Tasty Bytes view which brings together enhanced location data and cross-join it with itself.

In [None]:
location_df1 = session.table('ANALYTICS.LOCATION_DETAIL_V').select('LOCATION_ID','LOCATION_NAME', 'LATITUDE', 'LONGITUDE', 'CITY')\
                        .filter(F.col('CITY') == 'Paris')
location_df1.show(5)
location_df2 = location_df1

In [None]:
locationpairs_df = location_df1.cross_join(location_df2, lsuffix='_DF1',rsuffix='_DF2').filter(F.col('LOCATION_ID_DF1') > F.col('LOCATION_ID_DF2'))
locationpairs_df.columns
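
The filter on `LOCATION_ID_DF1 > LOCATION_ID_DF2` is what keeps the cross join manageable: it drops self-pairs and keeps each unordered pair only once. A small plain-Python sketch of the same idea, using made-up location IDs:

```python
def unique_pairs(ids):
    """Cross join ids with itself, keeping only pairs where the first id is larger."""
    return [(a, b) for a in ids for b in ids if a > b]

location_ids = [101, 102, 103, 104]  # hypothetical IDs, for illustration only
pairs = unique_pairs(location_ids)

# n locations give n * (n - 1) / 2 pairs instead of n * n
print(len(pairs), pairs)
```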

Now let's invoke the ST_MAKEPOINT and ST_DISTANCE functions to find the proximity between locations, and then display the locations closest to each other.

In [None]:
locationdist_df = locationpairs_df.select(F.col('LOCATION_NAME_DF1'), F.col('LOCATION_NAME_DF2'), 
                    F.call_builtin('ST_MAKEPOINT',F.col('LONGITUDE_DF1'), F.col('LATITUDE_DF1')).alias('GEO_POINT_DF1'),
                    F.call_builtin('ST_MAKEPOINT',F.col('LONGITUDE_DF2'), F.col('LATITUDE_DF2')).alias('GEO_POINT_DF2'))
locationdist_df = locationdist_df.select(F.col('LOCATION_NAME_DF1'), F.col('LOCATION_NAME_DF2'), 
                     (F.call_builtin('ST_DISTANCE',F.col('GEO_POINT_DF1'), F.col('GEO_POINT_DF2'))/1000).alias('DISTANCE_KM'))
# Sort ascending so the closest pairs are listed first; show() returns None, so its result is not assigned
locationdist_df.sort(F.col('DISTANCE_KM').asc()).show()

We can simplify this further. The **functions.function** method returns a Python callable bound to a named Snowflake function, so it can be reused like a local function.

In [None]:
geo_makepoint = F.function('ST_MAKEPOINT')
geo_distance = F.function('ST_DISTANCE')

locationdist_df = locationpairs_df.select(F.col('LOCATION_NAME_DF1'), F.col('LOCATION_NAME_DF2'), 
                    geo_makepoint(F.col('LONGITUDE_DF1'), F.col('LATITUDE_DF1')).alias('GEO_POINT_DF1'),
                    geo_makepoint(F.col('LONGITUDE_DF2'), F.col('LATITUDE_DF2')).alias('GEO_POINT_DF2'))
locationdist_df = locationdist_df.select(F.col('LOCATION_NAME_DF1'), F.col('LOCATION_NAME_DF2'), 
                     (geo_distance(F.col('GEO_POINT_DF1'), F.col('GEO_POINT_DF2'))/1000).alias('DISTANCE_KM'))
# Sort ascending so the closest pairs are listed first; show() returns None, so its result is not assigned
locationdist_df.sort(F.col('DISTANCE_KM').asc()).show()
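
To get a feel for the numbers, the distance can be approximated locally. For geography points, ST_DISTANCE returns a distance in metres, which is why the cells above divide by 1000. The sketch below uses the haversine formula on a spherical Earth. This is an assumption made for illustration, not Snowflake's implementation, so results may differ slightly. The argument order (longitude first) follows ST_MAKEPOINT, and the sample coordinates are approximate.

```python
import math

EARTH_RADIUS_M = 6_371_008.8  # mean Earth radius in metres (assumed spherical model)

def haversine_km(long1, lat1, long2, lat2):
    """Great-circle distance in km between two (longitude, latitude) points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(long2 - long1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a)) / 1000

# Two approximate points in Paris, given as (longitude, latitude)
print(haversine_km(2.2945, 48.8584, 2.3376, 48.8606))
```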

### Existing UDFs
Let's capture the distance calculation above in a SQL UDF. Note that both SQL and Python UDFs can be created with a SQL CREATE FUNCTION statement.

In [None]:
session.sql("""
            create or replace function longlatdistance (long1 float, lat1 float, long2 float, lat2 float) returns float
            as 
            'st_distance(st_makepoint(long1, lat1),st_makepoint(long2, lat2))' 
            """).collect()

Now we can call that UDF using **functions.call_udf**.

In [None]:
locationdist_df = locationpairs_df.select(F.col('LOCATION_NAME_DF1'), F.col('LOCATION_NAME_DF2'),
            (F.call_udf("longlatdistance", F.col('LONGITUDE_DF1'), F.col('LATITUDE_DF1'),F.col('LONGITUDE_DF2'), F.col('LATITUDE_DF2'))/1000)
                .alias('DISTANCE_KM'))
# Sort ascending so the closest pairs are listed first
locationdist_df.sort(F.col('DISTANCE_KM').asc()).show()

## 4.4 Creating Functions

### Creating a Python UDF in SQL
We can define a Python UDF inline within SQL. This code could also be run in a Snowsight SQL Worksheet.
<br>(Note that this very simple example could equally be written as a SQL UDF, so it is not necessarily a good real-world use of a Python UDF: it forces the data through the Python runtime, whereas the optimizer can 'extract' the SQL from a SQL UDF and combine it with other SQL clauses.)

In [None]:
session.sql("""
CREATE OR REPLACE FUNCTION profit_margin(
 cost decimal(38,4)
,sale decimal(38,4)
           )
returns decimal(38,2) not null
language python
runtime_version = '3.9'
handler = 'profit_margin'
as
$$    
import decimal
def profit_margin(
    cost: decimal.Decimal
  , sale: decimal.Decimal
    ):
    if cost != 0:
        return round(((sale - cost)/cost)*100,2)
    else:
        return 0
$$
;

            """).collect()

In [None]:
menu_df = snowpark_menu_df.select(F.col('MENU_ID'), F.col('COST_OF_GOODS_USD'), F.col('SALE_PRICE_USD'), 
                                  F.call_udf('profit_margin', F.col('COST_OF_GOODS_USD'), F.col('SALE_PRICE_USD')).alias('PROFIT_MARGIN'))
menu_df.show(20)

### Creating a Python UDF in Snowpark
We can also define the UDF in Snowpark.

In [None]:
# Define Python function locally
import decimal
def profit_margin(
    cost: decimal.Decimal
  , sale: decimal.Decimal
    ):
    if cost != 0:
        return round(((sale - cost)/cost)*100,2)
    else:
        return 0
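
Because this is plain Python, the logic can be checked locally before it is registered. The sketch below repeats the function so that it runs on its own; the cost and sale values are made up for illustration.

```python
from decimal import Decimal

def profit_margin(cost: Decimal, sale: Decimal):
    """Percentage margin over cost, rounded to 2 places; 0 when cost is zero."""
    if cost != 0:
        return round(((sale - cost) / cost) * 100, 2)
    else:
        return 0

# A normal case, a loss-making case, and the zero-cost guard
print(profit_margin(Decimal('2.00'), Decimal('5.00')))
print(profit_margin(Decimal('4.00'), Decimal('3.00')))
print(profit_margin(Decimal('0'), Decimal('5.00')))
```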

Then we need to register the UDF. 
<br>Note that we need to use the types from snowflake.snowpark.types, and that input_types expects a list, even if it contains just one element.
<br>The stage location has to be provided, but if the generated code is small enough to be stored inline in metadata, you may see nothing added to the stage.

In [None]:
# Upload UDF to Snowflake
session.udf.register(
    func = profit_margin
  , return_type = T.DecimalType(38,2)
  , input_types = [T.DecimalType(38,4), T.DecimalType(38,4)]
  , is_permanent = True
  , name = 'profit_margin'
  , replace = True
  , stage_location = '@UDFSTAGE'
)

In [None]:
menu_df = snowpark_menu_df.select(F.col('MENU_ID'), F.col('COST_OF_GOODS_USD'), F.col('SALE_PRICE_USD'), 
                                  F.call_udf('profit_margin', F.col('COST_OF_GOODS_USD'), F.col('SALE_PRICE_USD')).alias('PROFIT_MARGIN'))
menu_df.show(20)

### Using the @udf Decorator
Finally, we can do the same using a Python decorator approach. You may see this in code examples.

In [None]:
import decimal
from snowflake.snowpark.functions import udf

@udf(return_type = T.DecimalType(38,2), input_types = [T.DecimalType(38,4), T.DecimalType(38,4)], 
     is_permanent = True, name = "profit_margin", replace = True, stage_location = '@UDFSTAGE', session=session)
def profit_margin(cost: decimal.Decimal, sale: decimal.Decimal) -> decimal.Decimal:
    if cost != 0:
        return round(((sale - cost)/cost)*100,2)
    else:
        return 0

In [None]:
menu_df = snowpark_menu_df.select(F.col('MENU_ID'), F.col('COST_OF_GOODS_USD'), F.col('SALE_PRICE_USD'), 
                                  F.call_udf('profit_margin', F.col('COST_OF_GOODS_USD'), F.col('SALE_PRICE_USD')).alias('PROFIT_MARGIN'))
menu_df.show(20)

### Vectorised UDFs
If you examine the Query Profile of the previous query, you will see that the UDF appears as an Extension Function, with Statistics including Total Python UDF handler invocations: 20 and Total Python UDF rows processed: 20.
<br>In this case the timings are tiny, but when processing large numbers of rows, or when using a library such as Pandas that is designed to process batches of data efficiently, we can set up the UDF to receive a batch of rows at a time, using the Pandas API.
<br>Note that this example uses a lambda function to avoid decimal division-by-zero errors.

In [None]:
import decimal
from snowflake.snowpark.functions import udf

@udf(return_type = T.DecimalType(38,2), input_types = [T.DecimalType(38,4), T.DecimalType(38,4)], 
     is_permanent = True, name = "profit_margin_batch", replace = True, stage_location = '@UDFSTAGE', session=session)
def profit_margin_batch(pdf: T.PandasDataFrame[decimal.Decimal, decimal.Decimal]) -> T.PandasSeries[decimal.Decimal]:
    pdf.columns = ["cost", "sale"]
    pdf["result"] = (pdf.apply(lambda x: (x['sale'] - x['cost']) / x['cost'] if x['cost'] != 0 else decimal.Decimal('NaN'), axis=1))*100
    return pdf["result"]

In [None]:
menu_df = snowpark_menu_df.select(F.col('MENU_ID'), F.col('COST_OF_GOODS_USD'), F.col('SALE_PRICE_USD'), 
                                  F.call_udf('profit_margin_batch', F.col('COST_OF_GOODS_USD'), F.col('SALE_PRICE_USD')).alias('PROFIT_MARGIN'))
menu_df.show(20)

If you examine the Query Profile of the previous query, you will see that the UDF appears as an Extension Function, with Statistics including Total Python UDF handler invocations: 1 and Total Python UDF rows processed: 20.
<br>If we processed all the rows in the table, the number of rows per invocation would be even higher. You can set a maximum batch size (the max_batch_size argument when registering the UDF) to avoid an invocation timing out at 60 seconds.
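
The relationship between rows, batch size and handler invocations can be sketched in plain Python. This is a conceptual model only: Snowflake chooses the actual batch sizes itself, and a maximum batch size is an upper bound, so real invocation counts may be higher. The helper names and the sample rows are hypothetical.

```python
def run_in_batches(rows, handler, max_batch_size):
    """Call handler once per batch of at most max_batch_size rows; return (results, invocations)."""
    results, invocations = [], 0
    for start in range(0, len(rows), max_batch_size):
        batch = rows[start:start + max_batch_size]
        results.extend(handler(batch))
        invocations += 1
    return results, invocations

def margin_batch(batch):
    """Toy handler: percentage margin for each (cost, sale) pair, None when cost is zero."""
    return [(sale - cost) / cost * 100 if cost != 0 else None for cost, sale in batch]

rows = [(2.0, 5.0)] * 20  # 20 hypothetical (cost, sale) rows

results, per_row = run_in_batches(rows, margin_batch, max_batch_size=1)   # one row per call
_, one_batch = run_in_batches(rows, margin_batch, max_batch_size=20)      # all rows in one call
_, capped = run_in_batches(rows, margin_batch, max_batch_size=8)          # capped batch size
print(per_row, one_batch, capped)
```

The first two counts correspond to the handler invocation statistics seen for the scalar and vectorised UDFs above; the third shows how a smaller maximum batch size trades more invocations for less work per invocation.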

## 4.X YOUR TURN!

Here is the challenge: 
<br>You realise that the line in the solution to Part 1
<br>`F.concat(F.to_char(F.date_part("year",'ORDER_TS')), F.to_char(F.date_part("month",'ORDER_TS'),'FM09'))`
<br>could also be written generically in Python as:
<br>`return str(ts.year) + str(ts.month).zfill(2)`
<br>where ts is a datetime value and zfill(2) reproduces the zero-padded month that 'FM09' produces.
<br>(If that was already part of your solution to Part 1, congratulations!)
<br>Create and register a Python UDF char_month to implement this, and use it to reproduce the answer to Part 1. Start by defining a function and registering it separately. Then move on to decorators and vectorized UDFs if you wish...
<br>Hint: you will need to import datetime from the datetime module.

### Define the function



In [None]:
# Define Python function locally
from datetime import datetime
def char_month(
    ts: datetime
    ):
    # Zero-pad the month so the result matches the 'FM09' format used in Part 1 (e.g. 202204)
    return str(ts.year) + str(ts.month).zfill(2)
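
A quick local check of the month formatting, assuming the target is the zero-padded year-month string that the 'FM09' format yields in Part 1. The function is repeated so the sketch runs on its own, and the sample timestamps are made up.

```python
from datetime import datetime

def char_month(ts: datetime) -> str:
    """Return the year followed by the two-digit month, e.g. 202204."""
    return str(ts.year) + str(ts.month).zfill(2)

april = datetime(2022, 4, 15, 12, 30)
december = datetime(2021, 12, 1)

print(char_month(april), char_month(december))

# For four-digit years, strftime gives the same string in one call
print(april.strftime('%Y%m'))
```

Without the padding, April 2022 would come out as a five-character string and would no longer match the Part 1 result.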

### Register the function



In [None]:
from snowflake.snowpark.types import StringType, TimestampType

# Upload UDF to Snowflake
session.udf.register(
    func = char_month
  , return_type = T.StringType()
  , input_types = [T.TimestampType()]
  , is_permanent = True
  , name = 'char_month'
  , replace = True
  , stage_location = '@UDFSTAGE'
)


### Test with Order Header

In [None]:
exercise_header_df = session.table("ORDER_HEADER")
header_df1 = exercise_header_df.select(F.col('ORDER_ID'), F.col('LOCATION_ID'), 
        F.col('ORDER_AMOUNT').cast(T.DecimalType(36,2)).alias("ORDER_AMOUNT"),
        F.call_udf('char_month',F.col('ORDER_TS')).alias('ORDER_MONTH'))
header_df1.show()

In [None]:
session.close()