In [1]:
import snowflake.connector
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import (
    IntegerType,
    StringType,
    StructField,
    StructType,
    DateType,
    BooleanType,
    DecimalType,
    FloatType,
    TimestampType,
    VariantType,
    ArrayType,
)

import toml
import os
import pandas as pd
import numpy as np

# 1. Create needed datasets


In [2]:
data_path = os.path.join(os.path.dirname(os.getcwd()), "data")
df = pd.read_csv(os.path.join(data_path, "heart.csv"))

# Split indices
indices = np.random.permutation(len(df))
split1 = int(len(df) / 3)
split2 = int(2 * len(df) / 3)

# Create the three datasets
df1 = df.iloc[indices[:split1]]
df2 = df.iloc[indices[split1:split2]]
df3 = df.iloc[indices[split2:]]

df1.to_csv(os.path.join(data_path, "dataset1.csv"), index=False)
df2.to_csv(os.path.join(data_path, "dataset2.csv"), index=False)
df3.to_csv(os.path.join(data_path, "dataset3.csv"), index=False)

# 2. Connect to Snowflake and create database


Use snowflake.connector to connect to Snowflake


In [3]:
config = toml.load("../config.toml")
connection_parameters = config["snowflake_connection"]

session = Session.builder.configs(connection_parameters).create()

Now, I will execute SQL commands to create a database and a schema in Snowflake.

I will not create a warehouse because I already have one, but the commented command can be used if a new warehouse is needed in the future.


In [4]:
session.sql("CREATE OR REPLACE DATABASE HEART_DB").collect()
# session.sql("CREATE OR REPLACE WAREHOUSE compute_wh WITH WAREHOUSE_SIZE='X-SMALL'").collect()

[Row(status='Database HEART_DB successfully created.')]

After creating the necessary database and schema, it is essential to set the current schema for subsequent SQL operations. The following command sets the schema to `PUBLIC`:


In [5]:
session.sql("USE SCHEMA PUBLIC;").collect()

[Row(status='Statement executed successfully.')]

To verify the current settings of the Snowflake session, we can execute the following SQL command:


In [6]:
session.sql(
    "SELECT current_warehouse(), current_database(), current_schema(), current_user(), current_role()"
).collect()

[Row(CURRENT_WAREHOUSE()='COMPUTE_WH', CURRENT_DATABASE()='HEART_DB', CURRENT_SCHEMA()='PUBLIC', CURRENT_USER()='JOANABAIAO', CURRENT_ROLE()='ACCOUNTADMIN')]

# 3. Create stages


Creating a stage in Snowflake is an important part of managing data in machine learning workflows. A stage is essentially a location where data files can be stored before they are loaded into a table or processed.

I will create the following stages:

- **LOAD_DATA_STAGE**: To ingest data into Snowflake.
- **MODEL_STAGE**: To store machine learning models that are generated during the project.
- **FUNCTION_STAGE**: To store Python functions
- **PACKAGE_STAGE**: To store any libraries not available in the Anaconda repository.


In [21]:
load_data_stage = "LOAD_DATA_STAGE"
model_stage = "MODEL_STAGE"
function_stage = "FUNCTION_STAGE"
package_stage = "PACKAGE_STAGE"

session.sql(f"CREATE OR REPLACE STAGE {load_data_stage}").collect()
session.sql(f"CREATE OR REPLACE STAGE {model_stage}").collect()
session.sql(f"CREATE OR REPLACE STAGE {function_stage}").collect()
session.sql(f"CREATE OR REPLACE STAGE {package_stage}").collect()

[Row(status='Stage area PACKAGE_STAGE successfully created.')]

I will also create a sequence for generating unique model IDs.


In [8]:
session.sql(
    "CREATE OR REPLACE SEQUENCE MODEL_SEQ START WITH 1 INCREMENT BY 1;"
).collect()

[Row(status='Sequence MODEL_SEQ successfully created.')]

### 3.1. Load data to stage "LOAD_DATA_STAGE"


In [22]:
data_path = os.path.join(os.path.dirname(os.getcwd()), "data")
file_path1 = os.path.join(data_path, "dataset1.csv")
file_path2 = os.path.join(data_path, "dataset2.csv")
file_path3 = os.path.join(data_path, "dataset3.csv")

session.file.put(file_path1, f"@{load_data_stage}")
session.file.put(file_path2, f"@{load_data_stage}")
session.file.put(file_path3, f"@{load_data_stage}")

[PutResult(source='dataset3.csv', target='dataset3.csv.gz', source_size=12723, target_size=2832, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]

Confirm that the file is in the "LOAD_DATA_STAGE" stage.


In [23]:
files = session.sql(f"LIST @{load_data_stage}").collect()
for file in files:
    print(file)

Row(name='load_data_stage/dataset1.csv.gz', size=2880, md5='111cdba7ac2932838eca831a839746b5', last_modified='Wed, 22 Jan 2025 11:33:36 GMT')
Row(name='load_data_stage/dataset2.csv.gz', size=2976, md5='cb5026952be0389eee7588b433f696c7', last_modified='Wed, 22 Jan 2025 11:33:37 GMT')
Row(name='load_data_stage/dataset3.csv.gz', size=2832, md5='ba71e2057c5e5cec0c5d153034eeeca6', last_modified='Wed, 22 Jan 2025 11:33:37 GMT')


# 4. Create needed tables


In this project, we will create several internal tables within Snowflake to manage and store key information related to the heart attack prediction models and patient data. These tables will be integral for storing model metadata, performance metrics, and patient health data, which can then be used for analysis and model evaluation.


## 4.1 Create the `MODEL_TRAINING_INFO` table


This table will store details about each model generated, such as model name, training data, and the scoring results.


In [11]:
schema_log = StructType(
    [
        StructField("training_date", TimestampType()),
        StructField("model_id", StringType()),
        StructField("model_name", StringType()),
        StructField("optimization", BooleanType()),
        StructField("training_table", StringType()),
        StructField("feature_columns", ArrayType(StringType())),
        StructField("accuracy", FloatType()),
        StructField("precision", FloatType()),
        StructField("recall", FloatType()),
        StructField("f1_score", FloatType()),
        StructField("auc_roc", FloatType()),
        StructField("TN", IntegerType()),
        StructField("FP", IntegerType()),
        StructField("FN", IntegerType()),
        StructField("TP", IntegerType()),
    ]
)

df_log = session.create_dataframe([], schema=schema_log)
df_log.write.mode("overwrite").save_as_table("MODEL_TRAINING_INFO")

## 3.2. Create the `INFERENCE_RESULTS` table

This table will be used to store the details of each prediction made by the deployed models. It will include information like the model used, the input data for inference, and the prediction results.


In [25]:
schema_inference = StructType(
    [
        StructField("inference_date", TimestampType()),
        StructField("model_id", StringType()),
        StructField("training_table", StringType()),
        StructField("test_table", StringType()),
        #StructField("predictions_table", StringType()),
        StructField("accuracy", FloatType()),
        StructField("precision", FloatType()),
        StructField("recall", FloatType()),
        StructField("f1_score", FloatType()),
        StructField("auc_roc", FloatType()),
        StructField("TN", IntegerType()),
        StructField("FP", IntegerType()),
        StructField("FN", IntegerType()),
        StructField("TP", IntegerType()),
    ]
)

df_inference = session.create_dataframe([], schema=schema_inference)
df_inference.write.mode("overwrite").save_as_table("INFERENCE_RESULTS")

## 3.3. Create the `MODEL_CATALOG` table


We will create a table to store the names of the available models. The Streamlit app will query this table to present the list of model names to the user, allowing them to choose a model for further analysis.


In [13]:
schema_model = StructType([StructField("model_name", StringType(), True)])

# Create a df with model names
model_names = [
    ["Random Forest"],
    ["XGBoost"],
    ["K-Nearest Neighbors"],
    ["Support Vector Machine"]
]
df_models_table = session.create_dataframe(model_names, schema=schema_model)

# Write the df to the models table
df_models_table.write.mode("overwrite").save_as_table("MODEL_CATALOG")

## 3.4. Create tables with the patients data


### 3.4.1. Test data upload process


Before automating the process, we first load a sample table manually to ensure everything works correctly.

**Steps:**

1. **Schema Definition:** I will define the schema to ensure that the data is correctly parsed when loading the CSV file into Snowflake.
2. **Load the Data:** We load the data from the staging area into a Snowflake DataFrame using the schema.
3. **Copy Data Into Snowflake Table:** Once the data is loaded into a Snowflake DataFrame, we copy it into a Snowflake table. This process ensures that the data is available in the correct table for further processing or analysis.
4. **Check Data in Table:** To ensure the data is present in the Snowflake table, query it and display a sample. The `session.table()` command fetches the data from the specified table, and result_df.show(5) displays the first 5 rows of the table to verify that the data has been successfully loaded.


In [14]:
schema = StructType(
    [
        StructField("age", IntegerType()),
        StructField("sex", IntegerType()),
        StructField("cp", IntegerType()),
        StructField("trestbps", DecimalType()),
        StructField("chol", IntegerType()),
        StructField("fbs", DecimalType()),
        StructField("restecg", DecimalType()),
        StructField("thalach", DecimalType()),
        StructField("exang", DecimalType()),
        StructField("oldpeak", DecimalType()),
        StructField("slope", DecimalType()),
        StructField("ca", DecimalType()),
        StructField("thal", IntegerType()),
        StructField("target", IntegerType()),
    ]
)

In [15]:
# Define variables for the destination table and file name
file_name = "dataset1.csv"  # Path to the CSV file in the staging area
table_name = "TEST_TABLE"

# Define snowflake dataframe
df_heart = (
    session.read.schema(schema)
    .options({"FIELD_DELIMITER": ",", "SKIP_HEADER": 1})
    .csv(f"@{load_data_stage}/{file_name}")
)
# df_heart.show(5)

# Copy data into table
copied_into_result = df_heart.copy_into_table(
    table_name, force=True, on_error="CONTINUE"
)

# Check data in table
df_heart_test = session.table(table_name)
df_heart_test.limit(5).to_pandas()

Unnamed: 0,AGE,SEX,CP,TRESTBPS,CHOL,FBS,RESTECG,THALACH,EXANG,OLDPEAK,SLOPE,CA,THAL,TARGET
0,44,1,1,120,263,0,1,173,0,0,2,0,3,1
1,64,0,2,140,313,0,1,133,0,0,2,0,3,1
2,44,1,0,112,290,0,0,153,0,0,2,1,2,0
3,57,1,2,150,126,1,1,173,0,0,2,1,3,1
4,58,1,0,114,318,0,2,140,0,4,0,3,1,0


### 3.4.2. Automate the Process with a Function


Now that the manual process has been validated, I will create a reusable function, copy_into(), that can be used in the Streamlit app to load staged data into Snowflake tables. This function will allow us to load data automatically based on the CSV file provided by the user.


In [16]:
def load_data_to_table(session: Session, file_name: str, table_name: str) -> str:
    """
    Loads CSV data from the staging area into a Snowflake table.
    """

    # Import required modules inside the function to ensure portability
    from snowflake.snowpark.types import (
        StructType,
        StructField,
        IntegerType,
        DecimalType,
    )

    # Define schema for patient heart data
    schema_heart = StructType(
        [
            StructField("age", IntegerType()),
            StructField("sex", IntegerType()),
            StructField("cp", IntegerType()),
            StructField("trestbps", DecimalType(10, 2)),
            StructField("chol", IntegerType()),
            StructField("fbs", DecimalType(10, 2)),
            StructField("restecg", DecimalType(10, 2)),
            StructField("thalach", DecimalType(10, 2)),
            StructField("exang", DecimalType(10, 2)),
            StructField("oldpeak", DecimalType(10, 2)),
            StructField("slope", DecimalType(10, 2)),
            StructField("ca", DecimalType(10, 2)),
            StructField("thal", IntegerType()),
            StructField("target", IntegerType()),
        ]
    )

    try:

        # Read the CSV file and load it into the specified table
        session.read.option("FIELD_DELIMITER", ",").option("SKIP_HEADER", 1).option(
            "ON_ERROR", "CONTINUE"
        ).schema(schema_heart).csv(file_name).copy_into_table(table_name)

        return f"{file_name} data successfully copied into table '{table_name}'"

    except Exception as e:
        return f"Error occurred while copying data into table '{table_name}': {str(e)}"

Test if the function is working correctly and verify if table was created


In [17]:
# Define variables for the destination table and file name
file_name = "@load_data_stage/dataset1.csv"  # Path to the CSV file in the staging area
table_name = "TEST_TABLE2"

load_data_to_table(session, file_name, table_name)

df_heart_test = session.table(table_name)
df_heart_test.limit(5).to_pandas()

Unnamed: 0,AGE,SEX,CP,TRESTBPS,CHOL,FBS,RESTECG,THALACH,EXANG,OLDPEAK,SLOPE,CA,THAL,TARGET
0,44,1,1,120.0,263,0.0,1.0,173.0,0.0,0.0,2.0,0.0,3,1
1,64,0,2,140.0,313,0.0,1.0,133.0,0.0,0.2,2.0,0.0,3,1
2,44,1,0,112.0,290,0.0,0.0,153.0,0.0,0.0,2.0,1.0,2,0
3,57,1,2,150.0,126,1.0,1.0,173.0,0.0,0.2,2.0,1.0,3,1
4,58,1,0,114.0,318,0.0,2.0,140.0,0.0,4.4,0.0,3.0,1,0


### 3.4.3. Register the copy_into() function as a stored procedure


To optimize the data loading process, we register the _copy_into()_ function as a **stored procedure** in Snowflake. This allows the function to be executed directly within Snowflake, improving performance, centralizing logic, and making the operation reusable across different platforms (like Streamlit).


In [18]:
session.sproc.register(
    func=load_data_to_table,
    name="load_data_to_table",
    packages=["snowflake-snowpark-python"],
    is_permanent=True,
    stage_location=f"@{function_stage}",
    replace=True,
)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x10eea4c50>

To execute the stored procedure using Snowpark, we can use the `session.call` method.

The `session.call()` method triggers the stored procedure, passing the file path and target table name as parameters.


In [19]:
session.call("load_data_to_table", f"@{load_data_stage}/dataset1.csv", "DATA_TABLE_1")
session.call("load_data_to_table", f"@{load_data_stage}/dataset2.csv", "DATA_TABLE_2")
session.call("load_data_to_table", f"@{load_data_stage}/dataset3.csv", "DATA_TABLE_3")

"@LOAD_DATA_STAGE/dataset3.csv data successfully copied into table 'DATA_TABLE_3'"

In [20]:
session.sql(f"DROP TABLE IF EXISTS TEST_TABLE").collect()
session.sql(f"DROP TABLE IF EXISTS TEST_TABLE2").collect()

[Row(status='TEST_TABLE2 successfully dropped.')]