## Lab 3 - Vectorized UDTFs for Batching

### Setup

Below are the imports needed for this lab. These packages are included in the Anaconda channel provided by Snowflake.


In [None]:
# Importing snowpark libraries
import snowflake.snowpark
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import avg, stddev, udf
from snowflake.snowpark.types import FloatType, StringType, PandasSeries

# Others
import json
import re
import pandas as pd
from datetime import datetime, timedelta

Next, import snowpark and verify by printing the version.

In [None]:
# Confirm the Snowpark client library is importable and report which version is installed.
from snowflake.snowpark import version
snowpark_version = version.VERSION
print(snowpark_version)

Connect to active Snowflake session.

In [None]:
# This is the same as in previous sections of this lab:
# reuse the session that the Snowflake notebook already has open.
from snowflake.snowpark.context import get_active_session
session = get_active_session()


# Testing the session.
# session.sql() only builds a lazy DataFrame; .collect() is what actually sends the
# statement to Snowflake. Without it the CREATE DATABASE below would never run.
session.sql("CREATE DATABASE IF NOT EXISTS SNOWPARK_BEST_PRACTICES_LABS").collect()
session.sql("SELECT current_warehouse(), current_database(), current_schema()").show()

Before continuing, we will disable caching to ensure proper performance comparisons.

In [None]:
# Turn off the query result cache for this session so that every timed query below is
# really executed, which keeps the timings comparable.
disable_cache_sql = "ALTER SESSION SET USE_CACHED_RESULT = FALSE"
session.sql(disable_cache_sql).collect()

### Create Dataframes

As mentioned earlier, our data (the TPC-H dataset) is available from Snowflake via an inbound data share. We use the CUSTOMER table at two scale factors: SF100 and SF1000.

Let's create all of these datasets as dataframes.

In [None]:
# Reference the TPC-H CUSTOMER table at both scale factors (SF100 and SF1000).
df_customer_100 = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer")
df_customer_1000 = session.table("SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer")

# Show a couple of rows from the smaller table as a quick sanity check.
preview_row_count = 2
df_customer_100.limit(preview_row_count).show()

Next, let's check on the data volumes we will be working with.

In [None]:
# Report how many rows (in millions) each dataset holds.
rows_sf100_millions = df_customer_100.count() / 1000000
rows_sf1000_millions = df_customer_1000.count() / 1000000
print(f"df_customer_100 = {rows_sf100_millions} M, df_customer_1000 = {rows_sf1000_millions} M")

### Setup warehouse
- As mentioned earlier, we will be using 2 different Warehouse Sizes:
    - For *_100 datasets, we will use Small and Medium Warehouse sizes
    - For *_1000 datasets, we will use Medium and Large Warehouse sizes

In [None]:
# Start from a Small standard warehouse for the SF100 baseline.
resize_sql = "ALTER WAREHOUSE compute_wh SET warehouse_size='Small'"
session.sql(resize_sql).collect()

# Verify the size: SHOW WAREHOUSES produces a result set that RESULT_SCAN can then
# project down to just the name and size columns.
show_sql = "SHOW WAREHOUSES LIKE 'COMPUTE_WH'"
session.sql(show_sql).collect()
session.sql('SELECT "name", "size" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))').show()

### Use case 1: Numerical Computation using Regular UDTF

- We have a simple numerical computation use case: for each nation (`c_nationkey`), we compute the mean and the (population) standard deviation of `c_acctbal` using a regular UDTF.
- In addition, we filter out NULL values so that they do not distort the results.

In [None]:
# Register a regular (row-at-a-time) Python UDTF. process() buffers each partition's
# C_ACCTBAL values and end_partition() emits one (mean, population stddev) row per partition.
session.sql("""
CREATE OR REPLACE FUNCTION compute_mean_stddev_udtf(c_acctbal NUMBER(12, 2))
RETURNS TABLE (mean_acctbal NUMBER(12, 2), stddev_acctbal NUMBER(12, 2))
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('numpy')
HANDLER='ComputeMeanStddevUDTF'
AS $$
import numpy as np

class ComputeMeanStddevUDTF:
    def __init__(self):
        self.acctbal_values = []

    def process(self, c_acctbal):
        # Buffer non-NULL values only; NULL inputs are ignored.
        if c_acctbal is not None:
            self.acctbal_values.append(float(c_acctbal))

    def end_partition(self):
        if not self.acctbal_values:
            # No non-NULL input: return SQL NULLs. NaN cannot be stored in a NUMBER column.
            yield (None, None)
            return

        c_acctbal = np.array(self.acctbal_values)
        # np.std defaults to ddof=0, i.e. the population standard deviation.
        mean_acctbal = np.mean(c_acctbal)
        stddev_acctbal = np.std(c_acctbal)
        yield (round(float(mean_acctbal), 2), round(float(stddev_acctbal), 2))
$$
;
""").collect()

In [None]:
# First, let's get the results of the UDTF
session.sql("""
SELECT c_nationkey, compute_mean_stddev_udtf.mean_acctbal, compute_mean_stddev_udtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_udtf(c_acctbal) OVER (PARTITION BY c_nationkey));
""").collect()

In [None]:

# Time the regular UDTF on the SF100 dataset with a Small warehouse.
print("Regular UDTF: Numeric Computation with Small WH and 100x dataset")
timed_query = """
SELECT c_nationkey, compute_mean_stddev_udtf.mean_acctbal, compute_mean_stddev_udtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_udtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""
st = datetime.now()
session.sql(timed_query).collect()
et = datetime.now()
elapsed_seconds = (et - st).total_seconds()
print(f"Total duration={elapsed_seconds}")

This takes around 10-12 seconds to return the results for TPCH_SF100 customer dataset using a Small Warehouse.

### Numerical Computation using Vectorised UDTF

Next, we will perform the same numerical computation using an end partition method Vectorised UDTF.

In [None]:
# Register a UDTF whose handler uses a vectorized end_partition method: it receives the
# partition's rows as a pandas DataFrame and returns a one-row DataFrame holding the
# partition's mean and population standard deviation (np.std defaults to ddof=0).
# NOTE(review): the handler reads df['C_ACCTBAL'], i.e. it assumes the DataFrame columns
# are named after the upper-cased UDTF argument names — confirm against the Snowflake docs.
session.sql("""
CREATE OR REPLACE FUNCTION compute_mean_stddev_evudtf(c_acctbal NUMBER(12, 2))
RETURNS TABLE (mean_acctbal NUMBER(12, 2), stddev_acctbal NUMBER(12, 2))
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('numpy', 'pandas')
HANDLER='ComputeMeanStddevVUDTF'
AS $$
import numpy as np
import pandas as pd
from _snowflake import vectorized

class ComputeMeanStddevVUDTF:
    @vectorized(input=pd.DataFrame)
    def end_partition(self, df):
        # Extract the c_acctbal column
        c_acctbal = df['C_ACCTBAL'].values.astype(float)
        
        # Handle NaN values by removing them before computation
        c_acctbal = c_acctbal[~np.isnan(c_acctbal)]

        if len(c_acctbal) == 0:
            # If there are no valid values left after removing NaNs, return NaNs for all statistics
            return pd.DataFrame([[np.nan, np.nan]], columns=['mean_acctbal', 'stddev_acctbal'])
        else:
            # Compute mean and standard deviation
            mean_acctbal = np.mean(c_acctbal)
            stddev_acctbal = np.std(c_acctbal)
            # Return the results as a DataFrame
            return pd.DataFrame([[round(mean_acctbal, 2), round(stddev_acctbal, 2)]], columns=['mean_acctbal', 'stddev_acctbal'])
$$
;
""").collect()

In [None]:
# Time the end_partition-vectorized UDTF on SF100 with a Small warehouse.
print("End parition method Vectorized UDTF: Numeric Computation with Small WH and 100x dataset")
evudtf_query = """
SELECT c_nationkey, compute_mean_stddev_evudtf.mean_acctbal, compute_mean_stddev_evudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_evudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""
st = datetime.now()
session.sql(evudtf_query).collect()
et = datetime.now()
elapsed_seconds = (et - st).total_seconds()
print(f"Total duration={elapsed_seconds}")

- This takes just short of 12 seconds to return the results for TPCH_SF100 customer dataset using a Small Warehouse.
- So, at this data size, any performance difference between the regular and the Vectorised UDTF is marginal at best.

Next, we will perform the same numerical computation using a process method Vectorised UDTF.

In [None]:
# First attempt at a UDTF with a vectorized process method. This version is intentionally
# wrong: process() returns a single summary row per input batch, but a vectorized process
# method must return one output row per input row, so querying it in the next cell fails.
session.sql("""
CREATE OR REPLACE FUNCTION compute_mean_stddev_pvudtf(c_acctbal NUMBER(12, 2))
RETURNS TABLE (
    mean_acctbal NUMBER(12, 2),
    stddev_acctbal NUMBER(12, 2)
)
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('numpy', 'pandas')
HANDLER='ComputeMeanStddevVUDTF'
AS $$
import numpy as np
import pandas as pd
from _snowflake import vectorized

class ComputeMeanStddevVUDTF:
    @vectorized(input=pd.DataFrame)
    def process(self, df):
        # Extract the c_acctbal column
        c_acctbal = df['C_ACCTBAL'].values.astype(float)
        
        # Handle NaN values by removing them before computation
        c_acctbal = c_acctbal[~np.isnan(c_acctbal)]

        if len(c_acctbal) == 0:
            # If there are no valid values left after removing NaNs, return NaNs for all statistics
            return pd.DataFrame([[np.nan, np.nan]], columns=['mean_acctbal', 'stddev_acctbal'])
        else:
            # Compute mean and standard deviation
            mean_acctbal = np.mean(c_acctbal)
            stddev_acctbal = np.std(c_acctbal)
            # Return the results as a DataFrame
            return pd.DataFrame([[round(mean_acctbal, 2), round(stddev_acctbal, 2)]], columns=['mean_acctbal', 'stddev_acctbal'])
$$
;
""").collect()

In [None]:
# This query is EXPECTED to fail: the process method registered above returns one row per
# input batch instead of one row per input row. Catch the error and print it so that the
# rest of the notebook can still be run top to bottom.
from snowflake.snowpark.exceptions import SnowparkSQLException

try:
    session.sql("""SELECT c_nationkey, compute_mean_stddev_pvudtf.mean_acctbal, compute_mean_stddev_pvudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_pvudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
""").collect()
except SnowparkSQLException as err:
    print(f"Query failed as expected:\n{err}")

With the above, you should get the following error:
- 100357 (P0000): Expected 4096 rows in the output given 4096 rows in the input, but received 1 in function COMPUTE_MEAN_STDDEV_PVUDTF with handler ComputeMeanStddevVUDTF

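- A vectorized `process` method must return exactly one output row per input row, whereas a regular UDTF (or a vectorized `end_partition` method) can emit any number of rows. We will modify the UDTF to return the computed statistics for each input row, though this repeats the same values across every row of a batch.
- Note also that `process` receives the data in batches (4096 rows in the error above). The statistics therefore cover only the rows of each batch, not the whole partition, so the results will differ from those of the other two UDTFs.
- This will also severely degrade performance. All in all, it is a bad choice to implement Vectorised UDTFs in this way for this use case.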

In [None]:
# Corrected process-method UDTF: the summary statistics are repeated once per input row, so
# each output batch has the same length as its input batch.
# NOTE: each process() call sees only one batch (the error above shows 4096-row batches), so
# these statistics are per batch, not per partition, and generally differ from the results
# of the regular and end_partition UDTFs.
session.sql("""
CREATE OR REPLACE FUNCTION compute_mean_stddev_pvudtf(c_acctbal NUMBER(12, 2))
RETURNS TABLE (
    mean_acctbal NUMBER(12, 2),
    stddev_acctbal NUMBER(12, 2)
)
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('numpy', 'pandas')
HANDLER='ComputeMeanStddevVUDTF'
AS $$
import numpy as np
import pandas as pd
from _snowflake import vectorized

class ComputeMeanStddevVUDTF:
    @vectorized(input=pd.DataFrame)
    def process(self, df):
        # Extract the c_acctbal column
        c_acctbal = df['C_ACCTBAL'].values.astype(float)
        
        # Handle NaN values by removing them before computation
        c_acctbal = c_acctbal[~np.isnan(c_acctbal)]

        if len(c_acctbal) == 0:
            # If there are no valid values left after removing NaNs, return NaNs for all statistics
            mean_acctbal = np.nan
            stddev_acctbal = np.nan
        else:
            # Compute mean and standard deviation
            mean_acctbal = np.mean(c_acctbal)
            stddev_acctbal = np.std(c_acctbal)
        
        # Create a DataFrame with the same number of rows as the input
        result_df = pd.DataFrame({
            'mean_acctbal': [round(mean_acctbal, 2)] * len(df),
            'stddev_acctbal': [round(stddev_acctbal, 2)] * len(df)
        })
        
        return result_df
$$
;
""").collect()

In [None]:
# Time the corrected process-vectorized UDTF while still partitioning by nation.
pvudtf_partitioned_query = """
SELECT c_nationkey, compute_mean_stddev_pvudtf.mean_acctbal, compute_mean_stddev_pvudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_pvudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""
st = datetime.now()
session.sql(pvudtf_partitioned_query).collect()
et = datetime.now()
elapsed_seconds = (et - st).total_seconds()
print(f"Total duration={elapsed_seconds}")

If you take a look at the [documentation](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-tabular-vectorized#udtfs-with-a-vectorized-process-method), we must remove explicit partitioning for process method vectorisations. This does give a marginal improvement in performance. But overall, this is just bad practice. Head on back to the documentation to understand when it is best to implement a Process Method Vectorised UDTF. Anyway, let's rewrite the call as below:

In [None]:
# Time the process-vectorized UDTF again, this time without an OVER (PARTITION BY ...) clause.
pvudtf_unpartitioned_query = """
SELECT c_nationkey, compute_mean_stddev_pvudtf.mean_acctbal, compute_mean_stddev_pvudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_pvudtf(c_acctbal));
"""
st = datetime.now()
session.sql(pvudtf_unpartitioned_query).collect()
et = datetime.now()
elapsed_seconds = (et - st).total_seconds()
print(f"Total duration={elapsed_seconds}")

### Numerical Computation using variable compute and dataset sizes

In [None]:
# Medium warehouse, SF100 dataset: time the three UDTF variants one after another.
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()

print("Using Medium WH and 100x dataset")
udtf_runs = (
    ("Regular UDTF", """
SELECT c_nationkey, compute_mean_stddev_udtf.mean_acctbal, compute_mean_stddev_udtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_udtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""),
    ("End partitioned method vectorized UDTF", """
SELECT c_nationkey, compute_mean_stddev_evudtf.mean_acctbal, compute_mean_stddev_evudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_evudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""),
    ("Process method vectorized UDTF", """
SELECT c_nationkey, compute_mean_stddev_pvudtf.mean_acctbal, compute_mean_stddev_pvudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_pvudtf(c_acctbal));
"""),
)
for label, query in udtf_runs:
    st = datetime.now()
    session.sql(query).collect()
    et = datetime.now()
    print(f"{label}={(et - st).total_seconds()}")

In [None]:
# Medium warehouse (unchanged from the previous cell), SF1000 dataset: time all three UDTFs.
print("Using Medium WH and 1000x dataset")

udtf_runs = (
    ("Regular UDTF", """
SELECT c_nationkey, compute_mean_stddev_udtf.mean_acctbal, compute_mean_stddev_udtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer, TABLE(compute_mean_stddev_udtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""),
    ("End partitioned method vectorized UDTF", """
SELECT c_nationkey, compute_mean_stddev_evudtf.mean_acctbal, compute_mean_stddev_evudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer, TABLE(compute_mean_stddev_evudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""),
    ("Process method vectorized UDTF", """
SELECT c_nationkey, compute_mean_stddev_pvudtf.mean_acctbal, compute_mean_stddev_pvudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer, TABLE(compute_mean_stddev_pvudtf(c_acctbal));
"""),
)
for label, query in udtf_runs:
    st = datetime.now()
    session.sql(query).collect()
    et = datetime.now()
    print(f"{label}={(et - st).total_seconds()}")

In [None]:
# Large warehouse, SF1000 dataset: time all three UDTF variants.
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Large'").collect()
print("Using Large WH and 1000x dataset")

udtf_runs = (
    ("Regular UDTF", """
SELECT c_nationkey, compute_mean_stddev_udtf.mean_acctbal, compute_mean_stddev_udtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer, TABLE(compute_mean_stddev_udtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""),
    ("End partitioned method vectorized UDTF", """
SELECT c_nationkey, compute_mean_stddev_evudtf.mean_acctbal, compute_mean_stddev_evudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer, TABLE(compute_mean_stddev_evudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""),
    ("Process method vectorized UDTF", """
SELECT c_nationkey, compute_mean_stddev_pvudtf.mean_acctbal, compute_mean_stddev_pvudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer, TABLE(compute_mean_stddev_pvudtf(c_acctbal));
"""),
)
for label, query in udtf_runs:
    st = datetime.now()
    session.sql(query).collect()
    et = datetime.now()
    print(f"{label}={(et - st).total_seconds()}")

### Numerical Computation running concurrent tasks with Worker Processes for Regular UDTF

In [None]:
# Return the standard warehouse to Small before the joblib-based experiments.
small_wh_sql = "ALTER WAREHOUSE compute_wh SET warehouse_size='Small'"
session.sql(small_wh_sql).collect()

# Verify the size by scanning the result set produced by SHOW WAREHOUSES.
show_sql = "SHOW WAREHOUSES LIKE 'COMPUTE_WH'"
session.sql(show_sql).collect()
session.sql('SELECT "name", "size" FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))').show()

In [None]:
# Register a regular UDTF that buffers each partition's C_ACCTBAL values and, in
# end_partition(), computes the mean and population standard deviation in parallel
# chunks with joblib worker processes.
session.sql("""
CREATE OR REPLACE FUNCTION compute_mean_stddev_cudtf(c_acctbal NUMBER(12, 2))
RETURNS TABLE (mean_acctbal NUMBER(12, 2), stddev_acctbal NUMBER(12, 2))
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('numpy', 'joblib')
HANDLER='ComputeMeanStddevUDTF'
AS $$
import numpy as np
import joblib

class ComputeMeanStddevUDTF:
    def __init__(self):
        self.acctbal_values = []

    def process(self, c_acctbal):
        # Buffer non-NULL values only; NULL inputs are ignored.
        if c_acctbal is not None:
            self.acctbal_values.append(float(c_acctbal))

    def end_partition(self):
        if not self.acctbal_values:
            # No non-NULL input: return SQL NULLs. NaN cannot be stored in a NUMBER column.
            yield (None, None)
            return

        c_acctbal = np.array(self.acctbal_values)

        # Per-chunk count, mean and population variance: enough to combine chunks exactly.
        def compute_stats(values):
            return len(values), np.mean(values), np.var(values)

        # At most one chunk per CPU, and never more chunks than values, so no chunk is empty.
        # (A step of len // cpu_count() is 0 for small partitions and makes range() raise.)
        n_chunks = min(joblib.cpu_count(), len(c_acctbal))
        chunks = np.array_split(c_acctbal, n_chunks)

        # Run the computation in parallel using joblib
        results = joblib.Parallel(n_jobs=-1)(joblib.delayed(compute_stats)(chunk) for chunk in chunks)

        # Combine with the law of total variance. Averaging per-chunk means or per-chunk
        # standard deviations does not give the statistics of the whole partition.
        counts, means, variances = (np.asarray(col, dtype=float) for col in zip(*results))
        total_n = counts.sum()
        total_mean = np.sum(counts * means) / total_n
        total_var = np.sum(counts * (variances + (means - total_mean) ** 2)) / total_n
        total_std = np.sqrt(total_var)

        # Yield the final aggregated statistics
        yield (round(float(total_mean), 2), round(float(total_std), 2))
$$
;
""").collect()

In [None]:
# Now let's time it
print("Using Small WH and 100x dataset")
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Small'").collect()
st=datetime.now()
session.sql("""
SELECT c_nationkey, compute_mean_stddev_cudtf.mean_acctbal, compute_mean_stddev_cudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_cudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
""").collect()
et=datetime.now()
print(f"Concurrent tasks with worker process for Regular UDTF={(et-st).total_seconds()}")


print("Using Medium WH and 100x dataset")
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
st=datetime.now()
session.sql("""
SELECT c_nationkey, compute_mean_stddev_cudtf.mean_acctbal, compute_mean_stddev_cudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.customer, TABLE(compute_mean_stddev_cudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
""").collect()
et=datetime.now()
print(f"Concurrent tasks with worker process for Regular UDTF={(et-st).total_seconds()}")


print("Using Medium WH and 1000x dataset")
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Medium'").collect()
st=datetime.now()
session.sql("""
SELECT c_nationkey, compute_mean_stddev_cudtf.mean_acctbal, compute_mean_stddev_cudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer, TABLE(compute_mean_stddev_cudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
""").collect()
et=datetime.now()
print(f"Concurrent tasks with worker process for Regular UDTF={(et-st).total_seconds()}")


print("Using Large WH and 1000x dataset")
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Large'").collect()
st=datetime.now()
session.sql("""
SELECT c_nationkey, compute_mean_stddev_cudtf.mean_acctbal, compute_mean_stddev_cudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer, TABLE(compute_mean_stddev_cudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
""").collect()
et=datetime.now()
print(f"Concurrent tasks with worker process for Regular UDTF={(et-st).total_seconds()}")

Now let's override joblib's default backend with 'loky'.

In [None]:
# Re-register the joblib-based UDTF, this time forcing joblib's 'loky' (process-based) backend.
# end_partition() computes the mean and population standard deviation in parallel chunks.
session.sql("""
CREATE OR REPLACE FUNCTION compute_mean_stddev_cudtf(c_acctbal NUMBER(12, 2))
RETURNS TABLE (mean_acctbal NUMBER(12, 2), stddev_acctbal NUMBER(12, 2))
LANGUAGE PYTHON
RUNTIME_VERSION=3.8
PACKAGES=('numpy', 'joblib')
HANDLER='ComputeMeanStddevUDTF'
AS $$
import numpy as np
import joblib
joblib.parallel_backend('loky')

class ComputeMeanStddevUDTF:
    def __init__(self):
        self.acctbal_values = []

    def process(self, c_acctbal):
        # Buffer non-NULL values only; NULL inputs are ignored.
        if c_acctbal is not None:
            self.acctbal_values.append(float(c_acctbal))

    def end_partition(self):
        if not self.acctbal_values:
            # No non-NULL input: return SQL NULLs. NaN cannot be stored in a NUMBER column.
            yield (None, None)
            return

        c_acctbal = np.array(self.acctbal_values)

        # Per-chunk count, mean and population variance: enough to combine chunks exactly.
        def compute_stats(values):
            return len(values), np.mean(values), np.var(values)

        # At most one chunk per CPU, and never more chunks than values, so no chunk is empty.
        # (A step of len // cpu_count() is 0 for small partitions and makes range() raise.)
        n_chunks = min(joblib.cpu_count(), len(c_acctbal))
        chunks = np.array_split(c_acctbal, n_chunks)

        # Name the backend explicitly as well: the module-level parallel_backend() setting is
        # kept in thread-local state, so it may not apply if this method runs on another thread.
        results = joblib.Parallel(n_jobs=-1, backend='loky')(
            joblib.delayed(compute_stats)(chunk) for chunk in chunks
        )

        # Combine with the law of total variance. Averaging per-chunk means or per-chunk
        # standard deviations does not give the statistics of the whole partition.
        counts, means, variances = (np.asarray(col, dtype=float) for col in zip(*results))
        total_n = counts.sum()
        total_mean = np.sum(counts * means) / total_n
        total_var = np.sum(counts * (variances + (means - total_mean) ** 2)) / total_n
        total_std = np.sqrt(total_var)

        # Yield the final aggregated statistics
        yield (round(float(total_mean), 2), round(float(total_std), 2))
$$
;
""").collect()

In [None]:
# Time the loky-backed UDTF across warehouse sizes and dataset scale factors.
# An earlier cell left compute_wh at Large, so the first run explicitly resizes it to Small.
# Without that resize, the "Small WH" measurement would actually run on a Large warehouse.
loky_runs = (
    ("loky backend: Using Small WH and 100x dataset", "Small", "100"),
    ("Using Medium WH and 100x dataset", "Medium", "100"),
    ("Using Medium WH and 1000x dataset", "Medium", "1000"),
    ("Using Large WH and 1000x dataset", "Large", "1000"),
)
for label, wh_size, scale in loky_runs:
    print(label)
    session.sql(f"ALTER WAREHOUSE compute_wh SET warehouse_size='{wh_size}'").collect()
    query = f"""
SELECT c_nationkey, compute_mean_stddev_cudtf.mean_acctbal, compute_mean_stddev_cudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF{scale}.customer, TABLE(compute_mean_stddev_cudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""
    st = datetime.now()
    session.sql(query).collect()
    et = datetime.now()
    print(f"Concurrent tasks with worker process for Regular UDTF={(et - st).total_seconds()}")

In [None]:
# Create a MEDIUM Snowpark-optimized warehouse for the final comparison.
# NOTE(review): the next cell issues the same CREATE OR REPLACE again, so this cell is redundant.
opt_wh_banner = "Using Snowpark Optimized WH and 1000x dataset"
print(opt_wh_banner)
create_opt_wh_sql = "CREATE OR REPLACE WAREHOUSE snowpark_opt_wh WITH warehouse_size='MEDIUM' warehouse_type='SNOWPARK-OPTIMIZED';"
session.sql(create_opt_wh_sql).collect()

In [None]:
# Create the Snowpark-optimized warehouse, then time the joblib UDTF on SF1000 at
# Medium and at Large size.
print("Using Snowpark Optimized WH and 1000x dataset")
session.sql("CREATE OR REPLACE WAREHOUSE snowpark_opt_wh WITH warehouse_size='MEDIUM' warehouse_type='SNOWPARK-OPTIMIZED';").collect()

opt_wh_query = """
SELECT c_nationkey, compute_mean_stddev_cudtf.mean_acctbal, compute_mean_stddev_cudtf.stddev_acctbal
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.customer, TABLE(compute_mean_stddev_cudtf(c_acctbal) OVER (PARTITION BY c_nationkey));
"""
for wh_size in ("Medium", "Large"):
    print(f"Using Snowpark Optimized {wh_size} WH and 1000x dataset")
    session.sql(f"ALTER WAREHOUSE snowpark_opt_wh SET warehouse_size='{wh_size}'").collect()
    st = datetime.now()
    session.sql(opt_wh_query).collect()
    et = datetime.now()
    print(f"Concurrent tasks with worker process for Regular UDTF={(et - st).total_seconds()}")

### Cleanup

The following code block cleans up what was executed in this lab by:
 - Resetting the compute warehouse to SMALL
 - Suspending the Snowpark-optimized warehouse

In [None]:
# CREATE WAREHOUSE made snowpark_opt_wh the session's current warehouse, so switch back to
# compute_wh first. Otherwise the next query in this session would run on (and resume) the
# warehouse suspended below.
session.sql("USE WAREHOUSE compute_wh").collect()
# Reset the standard warehouse to its starting size.
session.sql("ALTER WAREHOUSE compute_wh SET warehouse_size='Small'").collect()
# Suspend the Snowpark-optimized warehouse created earlier in the lab.
session.sql("ALTER WAREHOUSE snowpark_opt_wh SUSPEND").collect()
# Restore the session's default result-cache behaviour, which was disabled at the start of the lab.
session.sql("ALTER SESSION UNSET USE_CACHED_RESULT").collect()