### Dataframe Comparisons in Snowflake

A DataFrame is a fundamental data structure in data analysis and programming, used to organize data into a two-dimensional table with rows and columns, similar to a spreadsheet or a table in a relational database. 

Using the Pandas library, Python can work with data in a dataframe structure. The Pandas dataframe, however, stores all of its data in memory, which is problematic for large data sets. Snowflake has addressed this with the following two options: Snowpark DataFrame API and Snowpark Pandas DataFrame API.

This tutorial will demonstrate and compare the three dataframe types.
1. Snowpark DataFrame
    - https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/dataframe

2. Snowpark Pandas DataFrame (aka Modin)
    - https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/modin/dataframe

3. Pandas DataFrame
    - https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.html


In [None]:
# Imports
from datetime import datetime
import streamlit as st
import pandas as pd

from snowflake.snowpark.context import get_active_session
from snowflake.core import Root
import modin.pandas as spd  # Snowpark Pandas API
import snowflake.snowpark.modin.plugin  # Required for Snowpark Pandas API to work

In [None]:
# Pick up the Snowpark session that is already active for this notebook
session = get_active_session()

# Attach a query tag to the session so this notebook's activity can be
# identified when troubleshooting or monitoring
session.query_tag = {
    "origin": "sf_sit-is",
    "name": "dataframe_comparisons",
    "version": {"major": 1, "minor": 0},
    "attributes": {
        "is_quickstart": 1,
        "source": "notebook",
        "vignette": "snowpark_pandas",
    },
}

# Root object built on top of the session
root = Root(session)

In [None]:
-- Set DDL: create and select the warehouse, database, and schema that the
-- rest of this notebook works in

-- Warehouses
-- Note: CREATE OR REPLACE recreates TEST_WH if a warehouse of that name already exists
CREATE OR REPLACE WAREHOUSE TEST_WH WAREHOUSE_SIZE = XSMALL, AUTO_SUSPEND = 300, AUTO_RESUME= TRUE;
USE WAREHOUSE TEST_WH;

-- Databases
-- Holds the staging tables; removed again by the teardown cell at the end
CREATE OR ALTER DATABASE dataframe_comparisons;
USE DATABASE dataframe_comparisons;

-- Schemas
CREATE OR ALTER SCHEMA DATA;
USE SCHEMA DATA;


In [None]:
# Fully qualified table names used by the rest of the notebook

# Raw: the source table, in the SNOWFLAKE_SAMPLE_DATA database
raw_db, raw_schema, raw_table = "SNOWFLAKE_SAMPLE_DATA", "TPCH_SF1000", "LINEITEM"
raw_data_path = ".".join([raw_db, raw_schema, raw_table])

# Stage: small / medium / large tables in this notebook's own database
stage_db, stage_schema = "dataframe_comparisons", "DATA"
stage_data_path_small = ".".join([stage_db, stage_schema, "lineitem_small"])
stage_data_path_medium = ".".join([stage_db, stage_schema, "lineitem_medium"])
stage_data_path_large = ".".join([stage_db, stage_schema, "lineitem_large"])

In [None]:
-- Build the three staging tables by copying rows from the raw table, each
-- capped at a different row count (1,000 / 100,000 / 10,000,000).
-- The {{...}} placeholders are presumably substituted from the Python variables
-- of the same name defined in the "Set Paths" cell - confirm.
-- NOTE(review): there is no ORDER BY, so this query does not pin down which
-- rows end up in each table.

-- Small
CREATE OR REPLACE TABLE {{stage_data_path_small}} AS
    SELECT *
    FROM {{raw_data_path}}
    LIMIT 1000;

-- Medium
CREATE OR REPLACE TABLE {{stage_data_path_medium}} AS
    SELECT *
    FROM {{raw_data_path}}
    LIMIT 100000;

-- Large
CREATE OR REPLACE TABLE {{stage_data_path_large}} AS
    SELECT *
    FROM {{raw_data_path}}
    LIMIT 10000000;

In [None]:
def create_dataframe_from_records(name):
    """Build a two-row demo dataframe of the flavour selected by ``name``.

    ``'snowpark_df'`` goes through ``session.create_dataframe``,
    ``'snowpark_pandas_df'`` through ``spd.DataFrame``, and any other value
    through plain ``pd.DataFrame``.
    """
    headers = ['Number', 'Name']
    rows = [(1, 'one'), (2, 'two')]

    if name == 'snowpark_df':
        return session.create_dataframe(rows, schema=headers)

    if name == 'snowpark_pandas_df':
        return spd.DataFrame(rows, columns=headers)

    # Anything else falls back to regular pandas
    return pd.DataFrame(rows, columns=headers)


In [None]:
def create_dataframe_from_table(name, path):
    """Load the table at ``path`` as the dataframe flavour selected by ``name``.

    ``'snowpark_df'`` uses ``session.table``, ``'snowpark_pandas_df'`` uses
    ``spd.read_snowflake``, and any other value reads the table through
    Snowpark and converts the result with ``to_pandas()``.
    """
    if name == 'snowpark_df':
        return session.table(path)

    if name == 'snowpark_pandas_df':
        return spd.read_snowflake(path)

    # Note: plain pandas can't read the table directly from within Snowflake.
    #
    # Option 1: Locally
    #   sql = "SELECT * FROM db.schema.table"
    #   cnxn = ...  # make local connection to database
    #   pd.read_sql(sql, cnxn)
    #
    # Option 2 (used here): Using conversion
    #   (Note: use case for this is limited - think Matplotlib)
    return session.table(path).to_pandas()

In [None]:
def display_dataframe(name, df):
    """Fetch the first 10 rows of ``df`` and return them.

    The preview used to be computed and then thrown away, so the function
    always returned ``None`` and nothing could be shown. Returning it lets a
    caller (or a notebook cell) display the rows; callers that ignore the
    return value behave as before.

    Args:
        name: Dataframe flavour. ``'snowpark_df'`` selects the Snowpark API
            (``df.first``); any other value uses the pandas-style ``df.head``.
        df: The dataframe to preview.

    Returns:
        Whatever ``df.first(10)`` (Snowpark) or ``df.head(10)`` (Snowpark
        pandas / pandas) returns.
    """
    if name == 'snowpark_df':
        return df.first(10)

    # Snowpark pandas and plain pandas are both called through ``head``.
    # NOTE(review): Snowpark pandas may evaluate head() lazily, in which case
    # timing this call would not include fetching the rows - confirm.
    return df.head(10)

In [None]:
def convert_to_snowpark(name, df):
    """Convert ``df`` to a Snowpark DataFrame and return it.

    The converted dataframe used to be bound to a local and never returned,
    so callers always received ``None``.

    Args:
        name: Flavour of ``df``: ``'snowpark_df'``, ``'snowpark_pandas_df'``,
            or anything else for plain pandas.
        df: The dataframe to convert.

    Returns:
        ``df`` itself when it is already a Snowpark DataFrame, otherwise the
        result of the conversion.
    """
    if name == 'snowpark_df':
        return df  # already the target type

    if name == 'snowpark_pandas_df':
        return df.to_snowpark()

    # Plain pandas: hand the frame to the session to build a Snowpark DataFrame
    return session.create_dataframe(df)

def convert_to_snowpark_pandas(name, df):
    """Convert ``df`` to a Snowpark pandas DataFrame and return it.

    The converted dataframe used to be bound to a local and never returned,
    so callers always received ``None``.

    Args:
        name: Flavour of ``df``: ``'snowpark_df'``, ``'snowpark_pandas_df'``,
            or anything else for plain pandas.
        df: The dataframe to convert.

    Returns:
        ``df`` itself when it is already a Snowpark pandas DataFrame,
        otherwise the result of the conversion.
    """
    if name == 'snowpark_df':
        return df.to_snowpark_pandas()

    if name == 'snowpark_pandas_df':
        return df  # already the target type

    # Plain pandas: to Snowpark first, then on to Snowpark pandas
    return session.create_dataframe(df).to_snowpark_pandas()

def convert_to_pandas(name, df):
    """Convert ``df`` to a plain pandas DataFrame and return it.

    The converted dataframe used to be bound to a local and never returned,
    so callers always received ``None``.

    Args:
        name: Flavour of ``df``: ``'snowpark_df'``, ``'snowpark_pandas_df'``,
            or anything else for plain pandas.
        df: The dataframe to convert.

    Returns:
        ``df.to_pandas()`` for the two Snowpark flavours; ``df`` itself
        otherwise.
    """
    # Both Snowpark flavours are converted through the same method name
    if name in ('snowpark_df', 'snowpark_pandas_df'):
        return df.to_pandas()

    return df  # already the target type

In [None]:
# Benchmark driver: time every job for each dataframe flavour and table size,
# collecting one (job, df, table_size, execution_time) record per run.
time_columns = ['job', 'df', 'table_size', 'execution_time']
time_records = []

name_list = ['snowpark_df', 'snowpark_pandas_df', 'pandas_df']
table_list = [
    {'size': 'small', 'path': stage_data_path_small},
    {'size': 'medium', 'path': stage_data_path_medium},
    {'size': 'large', 'path': stage_data_path_large},
]
df_dict = {}


def _run_timed(job_name, df_name, table_size, func, *args, error_prefix=None):
    """Run ``func(*args)``, record its wall-clock time, and return its result.

    A ``(job_name, df_name, table_size, elapsed)`` tuple is appended to
    ``time_records``, with ``elapsed`` as the string form of the timedelta.

    When ``error_prefix`` is given, an exception raised by ``func`` is printed
    as ``"<error_prefix> error: <exception>"`` and the elapsed time is still
    recorded; the return value is then ``None``. Without ``error_prefix`` the
    exception propagates and nothing is recorded.
    """
    result = None
    start = datetime.now()
    try:
        result = func(*args)
    except Exception as e:
        if error_prefix is None:
            raise
        print(f"{error_prefix} error: {e}")
    stop = datetime.now()
    time_records.append((job_name, df_name, table_size, str(stop - start)))
    return result


# Conversion jobs, run in this order for every dataframe / table combination
_conversion_jobs = [
    ('3a convert_to_snowpark', convert_to_snowpark),
    ('3b convert_to_snowpark_pandas', convert_to_snowpark_pandas),
    ('3c convert_to_pandas', convert_to_pandas),
]

for indx, name in enumerate(name_list):
    # The index prefix keeps the flavours in name_list order when sorted
    df_name = f"{indx} {name}"

    # create_dataframe_from_records (no table involved, hence 'n/a')
    _run_timed('0 create_dataframe_from_records', df_name, 'n/a',
               create_dataframe_from_records, name)

    for table in table_list:
        size = table['size']

        # create_dataframe_from_table
        df = _run_timed('1 create_dataframe_from_table', df_name, size,
                        create_dataframe_from_table, name, table['path'])
        # Keyed by flavour only, so each table size overwrites the previous
        # one; after the loop this holds the dataframe for the last table.
        df_dict[name] = df

        # display_dataframe
        _run_timed('2 display_dataframe', df_name, size,
                   display_dataframe, name, df)

        # convert_dataframes: a failed conversion is reported, not fatal
        for job_name, convert in _conversion_jobs:
            _run_timed(job_name, df_name, size, convert, name, df,
                       error_prefix=f"{convert.__name__} {name}")

# Display comparisons
compare_df = pd.DataFrame(time_records, columns=time_columns)
compare_df.sort_values(['job', 'df', 'table_size'], ascending=[True, True, False])

In [None]:
-- Teardown: drop the database created by this notebook.
-- IF EXISTS keeps this cell re-runnable when the database is already gone
-- (for example after a previous teardown or an incomplete setup).
-- NOTE(review): the TEST_WH warehouse created in the setup cell is not
-- dropped here - confirm whether it should be.

DROP DATABASE IF EXISTS dataframe_comparisons;


