In [None]:
# Import python packages
import streamlit as st
import pandas as pd
import snowflake.snowpark.modin.plugin
import modin.pandas as mpd
import snowflake.snowpark.modin.plugin
import warnings; warnings.simplefilter('ignore') # We sometimes get warnings in cell outputs, stop these from being displayed.
import timeit
import snowflake.snowpark.functions as F



from time import perf_counter
from snowflake.snowpark.context import get_active_session

In [None]:
# Obtain the currently active Snowflake session; every later cell talks to Snowflake through it.
session = get_active_session()


# Choose the dataset to benchmark by editing the source table name below.
lineitem_table_name = 'UTILS.PUBLIC.TPCHSF1_LINEITEMS'

# The output table name is derived from the source name so the two cannot drift
# apart when a different source table is selected. For the default source this
# evaluates to 'UTILS.PUBLIC.TPCHSF1_LINEITEMS_WITH_TOTAL'.
lineitem_with_total_table_name = f'{lineitem_table_name}_WITH_TOTAL'

In [None]:
-- Materialise local copies of the shared TPC-H LINEITEM sample tables, one per scale factor.
-- Reading straight from the share would make the Pandas API copy the data first,
-- which would make the timing comparison unfair.
-- NOTE(review): the larger scale factors are presumably expensive to copy - confirm they are all needed.
CREATE OR REPLACE TABLE UTILS.PUBLIC.TPCHSF1_LINEITEMS AS
    SELECT *
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM;


CREATE OR REPLACE TABLE UTILS.PUBLIC.TPCHSF10_LINEITEMS AS
    SELECT *
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF10.LINEITEM;


CREATE OR REPLACE TABLE UTILS.PUBLIC.TPCHSF100_LINEITEMS AS
    SELECT *
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF100.LINEITEM;


CREATE OR REPLACE TABLE UTILS.PUBLIC.TPCHSF1000_LINEITEMS AS
    SELECT *
    FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1000.LINEITEM;

In [None]:
-- Results table: one row per benchmark run, appended by each Python test cell.
-- Running this statement again replaces the table and discards earlier results.
CREATE OR REPLACE TABLE UTILS.PUBLIC.TPCHSF_TEST_RESULTS (
    METHOD              VARCHAR,        -- name of the approach being timed
    DATA_SIZE           VARCHAR,        -- label identifying the dataset used
    DEFINE_TABLE_TIME   NUMBER(12,4),   -- seconds spent defining/loading the input
    TRANSFORMING_TIME   NUMBER(12,4),   -- seconds spent on the price calculation
    WRITE_TABLE_TIME    NUMBER(12,4),   -- seconds spent writing the output table
    TOTAL_TIME          NUMBER(12,4),   -- seconds for the whole test
    RECORDS_WRITTEN     NUMBER(12,0),   -- row count reported by the test
    TEST_TIME           TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP()  -- filled in automatically on insert
);

In [None]:
# Benchmark 1: the Snowpark DataFrame API.
# Each phase is bracketed by perf_counter() readings taken in a fixed order.
start = perf_counter()


# Phase 1 - point a lazily-evaluated Snowpark DataFrame at the source table.
snowpark_df = session.table( lineitem_table_name )
end_defining_table = perf_counter()


# Phase 2 - describe the per-line total: extended price, less discount, plus tax.
# L_DISCOUNT and L_TAX hold fractions (0.04 means 4%). This only builds the
# expression; no work is pushed to Snowflake at this point.
discount_multiplier = 1 - F.col( 'L_DISCOUNT' )
tax_multiplier = 1 + F.col( 'L_TAX' )
snowpark_df = snowpark_df.with_column(
    'L_TOTAL_PRICE',
    F.col( 'L_EXTENDEDPRICE' ) * discount_multiplier * tax_multiplier
)
end_transformations = perf_counter()


# Phase 3 - writing the output table is what actually triggers execution.
snowpark_df.write.mode( 'overwrite' ).save_as_table( lineitem_with_total_table_name )
end_saving_table = perf_counter()


# Close off the overall timer.
end = perf_counter()


# Turn the raw counter readings into per-phase durations (seconds).
time_total = end - start
time_saving = end_saving_table - end_transformations
time_transformationing = end_transformations - end_defining_table
time_defining_table = end_defining_table - start


# Row count of the table that was just written.
records_written = session.table( lineitem_with_total_table_name ).count()


# Show the timings in the notebook output.
for elapsed_seconds, phase_label in (
    ( time_defining_table, ' seconds to define the Snowpark table.' ),
    ( time_transformationing, ' seconds to define the transformations.' ),
    ( time_saving, ' seconds to write to a table.' ),
    ( time_total, ' seconds in total.' ),
):
    st.write( f'{elapsed_seconds:,.4f}{phase_label}' )
st.write( f'{records_written:,.0f} records written.' )


# Persist this run as a single row in the shared results table.
test_data = [
    {
        'METHOD': 'SnowparkCalculatesTotalPrice',
        'DATA_SIZE': lineitem_with_total_table_name,
        'DEFINE_TABLE_TIME': time_defining_table,
        'TRANSFORMING_TIME': time_transformationing,
        'WRITE_TABLE_TIME': time_saving,
        'TOTAL_TIME': time_total,
        'RECORDS_WRITTEN': records_written,
    }
]

# One-row pandas DataFrame whose columns match the results table.
test_results = pd.DataFrame( test_data )

session.write_pandas(
    df=test_results,
    database='UTILS',
    schema='PUBLIC',
    table_name='TPCHSF_TEST_RESULTS'
)

In [None]:
# Benchmark 2: the Snowpark pandas (Modin) API.
# Each phase is bracketed by perf_counter() readings taken in a fixed order.
start = perf_counter()


# Define a Modin pandas DataFrame backed by the source table.
# NOTE(review): the original comment calls this lazily evaluated - confirm whether
# read_snowflake does any work up front, since that affects how this phase is read.
modin_pandas_df = mpd.read_snowflake( lineitem_table_name )
end_defining_table = perf_counter()


# We need to calculate the total price for each line item. Discount and tax are decimals, i.e. 0.04 = 4%.
modin_pandas_df["L_TOTAL_PRICE"] = modin_pandas_df["L_EXTENDEDPRICE"] * \
    ( 1 - modin_pandas_df["L_DISCOUNT"] ) * \
    ( 1 + modin_pandas_df["L_TAX"] )
end_transformations = perf_counter()


# Save the result back to Snowflake with a row_pos column.
# This test writes to its own scratch table, not to lineitem_with_total_table_name.
modin_output_table_name = 'UTILS.PUBLIC.DELETEME'
modin_pandas_df.reset_index(drop=True).to_snowflake(
    modin_output_table_name,
    if_exists='replace',
    index=True,
    index_label=['row_pos']
)
end_saving_table = perf_counter()


# End the whole test.
end = perf_counter()


# Let's calculate some timings.
time_defining_table = end_defining_table - start
time_transformationing = end_transformations - end_defining_table
time_saving = end_saving_table - end_transformations
time_total = end - start


# How many records did we write? Count the table this test actually wrote;
# counting lineitem_with_total_table_name here would report whatever an
# earlier test left in that table instead of this test's output.
records_written = session.table( modin_output_table_name ).count()


# Write the results out.
st.write( str( '{0:,.4f}'.format( time_defining_table ) ) + ' seconds to define the Modin Pandas Dataframe.' )
st.write( str( '{0:,.4f}'.format( time_transformationing ) ) + ' seconds to define the transformations.' )
st.write( str( '{0:,.4f}'.format( time_saving ) ) + ' seconds to write to a table.' )
st.write( str( '{0:,.4f}'.format( time_total ) ) + ' seconds in total.' )
st.write( str( '{0:,.0f}'.format( records_written ) ) + ' records written.' )


# Write the test times to a table.
# DATA_SIZE keeps the same label the other tests use so rows for the same
# dataset can be compared across methods.
test_data = [
    ['PandasAPICalculatesTotalPrice', lineitem_with_total_table_name, time_defining_table, time_transformationing, time_saving, time_total, records_written]
]

# Create the pandas DataFrame
test_results = pd.DataFrame( 
    test_data, 
    columns=['METHOD', 'DATA_SIZE', 'DEFINE_TABLE_TIME', 'TRANSFORMING_TIME', 'WRITE_TABLE_TIME', 'TOTAL_TIME', 'RECORDS_WRITTEN']
)

session.write_pandas( 
    df=test_results, 
    database='UTILS', 
    schema='PUBLIC', 
    table_name='TPCHSF_TEST_RESULTS'
)

In [None]:
# Benchmark 3: plain (client-side) pandas.
# Each phase is bracketed by perf_counter() readings taken in a fixed order.
start = perf_counter()


# Load the source table into an in-memory pandas DataFrame. Unlike the Snowpark
# test, to_pandas() returns real data, so the load cost is paid in this phase.
pandas_df = session.table( lineitem_table_name ).to_pandas()
end_defining_table = perf_counter()


# We need to calculate the total price for each line item. Discount and tax are decimals, i.e. 0.04 = 4%.
# This is ordinary pandas arithmetic on the DataFrame loaded above.
pandas_df['L_TOTAL_PRICE'] = pandas_df['L_EXTENDEDPRICE'] * \
    ( 1 - pandas_df["L_DISCOUNT"] ) * \
    ( 1 + pandas_df["L_TAX"] )
end_transformations = perf_counter()


# Write the results to a Snowflake table.
# The fully qualified name is split into database / schema / table and passed
# separately, matching the other write_pandas calls in this notebook. Passing
# the dotted name as table_name risks it being quoted as one identifier, which
# would create a table literally named "DB.SCHEMA.TABLE" in the current schema.
name_parts = lineitem_with_total_table_name.split( '.' )
target_table = name_parts[-1]
target_schema = name_parts[-2] if len( name_parts ) >= 2 else None
target_database = name_parts[-3] if len( name_parts ) >= 3 else None

# Note: pandas_df is rebound to the value returned by write_pandas, so after
# this statement it no longer refers to the in-memory pandas DataFrame.
pandas_df = session.write_pandas(
    pandas_df,
    table_name=target_table,
    database=target_database,
    schema=target_schema,
    auto_create_table=True,
    overwrite=True,
    table_type='' # An empty string means to create a permanent table.
)
end_saving_table = perf_counter()


# End the whole test.
end = perf_counter()


# Let's calculate some timings.
time_defining_table = end_defining_table - start
time_transformationing = end_transformations - end_defining_table
time_saving = end_saving_table - end_transformations
time_total = end - start


# How many records did we write?
records_written = session.table( lineitem_with_total_table_name ).count()


# Write the results out.
st.write( str( '{0:,.4f}'.format( time_defining_table ) ) + ' seconds to define the Pandas Dataframe.' )
st.write( str( '{0:,.4f}'.format( time_transformationing ) ) + ' seconds to define the transformations.' )
st.write( str( '{0:,.4f}'.format( time_saving ) ) + ' seconds to write to a table.' )
st.write( str( '{0:,.4f}'.format( time_total ) ) + ' seconds in total.' )
st.write( str( '{0:,.0f}'.format( records_written ) ) + ' records written.' )


# Write the test times to a table.
test_data = [
    ['PandasCalculatesTotalPrice', lineitem_with_total_table_name, time_defining_table, time_transformationing, time_saving, time_total, records_written]
]

# Create the pandas DataFrame
test_results = pd.DataFrame( 
    test_data, 
    columns=['METHOD', 'DATA_SIZE', 'DEFINE_TABLE_TIME', 'TRANSFORMING_TIME', 'WRITE_TABLE_TIME', 'TOTAL_TIME', 'RECORDS_WRITTEN']
)

session.write_pandas( 
    df=test_results, 
    database='UTILS', 
    schema='PUBLIC', 
    table_name='TPCHSF_TEST_RESULTS'
)

In [None]:
# Read every recorded benchmark row back from Snowflake and render it as a table.
# The casts in the query pin each column to the type declared on the results table.
results_query = """
    select 
        METHOD::varchar,
        DATA_SIZE::varchar,
        DEFINE_TABLE_TIME::number(12,4),
        TRANSFORMING_TIME::number(12,4),
        WRITE_TABLE_TIME::number(12,4),
        TOTAL_TIME::number(12,4),
        RECORDS_WRITTEN::number(12,0),
        TEST_TIME
        
    from UTILS.PUBLIC.TPCHSF_TEST_RESULTS
"""
results_frame = session.sql( results_query ).to_pandas()
st.dataframe( results_frame )