# Setup Environment

## Import Dependencies and Create Session

In [None]:
from snowflake.snowpark import Session, functions as F
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.ml import dataset

In [None]:
# Names of the database and schema that hold every object created by this demo.
TEST_DATASET_DB = "DATASET_DEMO_DB"
TEST_DATASET_SCHEMA = "DATASET_DEMO_SCHEMA"

# Open a Snowpark session from the parameters returned by SnowflakeLoginOptions().
session = Session.builder.configs(SnowflakeLoginOptions()).create()
print(session)

# Create the demo database and schema if they are missing, then make them
# the session's current database/schema.
setup_statements = (
    f"CREATE DATABASE IF NOT EXISTS {TEST_DATASET_DB}",
    f"""
    CREATE SCHEMA IF NOT EXISTS 
    {TEST_DATASET_DB}.{TEST_DATASET_SCHEMA}""",
)
for statement in setup_statements:
    session.sql(statement).collect()
session.use_database(TEST_DATASET_DB)
session.use_schema(TEST_DATASET_SCHEMA)

# Prepare test data

We will use the [diamond price dataset](https://ggplot2.tidyverse.org/reference/diamonds.html) for this demo. The data can be downloaded from https://raw.githubusercontent.com/tidyverse/ggplot2/main/data-raw/diamonds.csv

In [None]:
import pandas as pd
from sklearn.preprocessing import StandardScaler, LabelEncoder

# Download the diamonds CSV into a pandas DataFrame.
data_url = "https://raw.githubusercontent.com/tidyverse/ggplot2/main/data-raw/diamonds.csv"
data_pd = pd.read_csv(data_url)

# Encode categorical variables (cut, color, clarity) as integer labels.
# One encoder instance is re-fitted for each column in turn.
label_encoder = LabelEncoder()
for categorical_col in ('cut', 'color', 'clarity'):
    data_pd[categorical_col] = label_encoder.fit_transform(data_pd[categorical_col])

# Scale numerical features (carat, x, y, z, depth, table) with StandardScaler.
numerical_features = ['carat', 'x', 'y', 'z', 'depth', 'table']
scaler = StandardScaler()
scaled_values = scaler.fit_transform(data_pd[numerical_features])
data_pd[numerical_features] = scaled_values

# Turn the prepared pandas data into a Snowpark DataFrame and preview it.
df = session.create_dataframe(data_pd)
df.show()

Let's create a Snowflake Dataset from the prepared (encoded and scaled) data.

In [None]:
# Fully qualified name and initial version label of the Dataset.
# Named after the diamonds data it actually stores.
ds_name = f"{TEST_DATASET_DB}.{TEST_DATASET_SCHEMA}.diamond_data"
ds_version = "v1"

# Start from a clean slate so that re-running the cell does not collide with
# a Dataset left over from a previous run.
session.sql(f"DROP DATASET IF EXISTS {ds_name}").collect()

# Create the Dataset from the Snowpark DataFrame, marking "price" as the label.
ds = dataset.create_from_dataframe(
    session,
    name=ds_name,
    version=ds_version,
    input_dataframe=df,
    label_cols=["price"],
)

# Report what was created.
print(f"Dataset: {ds.fully_qualified_name}")
print(f"Selected version: {ds.selected_version.name} ({ds.selected_version})")
print(f"Available versions: {ds.list_versions()}")

The Dataset object includes various connectors under the `read` property which we can use to inspect or consume the Dataset.

In [None]:
# Collect the public (no leading underscore) callable attributes of the reader.
connector_names = []
for attr_name in dir(ds.read):
    if attr_name.startswith('_'):
        continue
    if callable(getattr(ds.read, attr_name)):
        connector_names.append(attr_name)
print(connector_names)

# Print the result of the files() connector and the shape of the Dataset
# when loaded as a pandas DataFrame.
print(ds.read.files())
print(ds.read.to_pandas().shape)

In [None]:
# Display the concrete class of the object returned by the `read` property.
type(ds.read)

We could use this dataset as-is and do any train/test split at runtime if needed. However, we might want to guarantee consistent splitting by saving the pre-split dataset as versions of our Snowflake Dataset.

In [None]:
# Fraction of rows that should end up in the "test" version.
test_ratio = 0.2

# Each row gets a helper value drawn from [uniform_min, uniform_max]; rows whose
# value is <= pivot go to "test", the rest go to "train". The pivot is offset by
# uniform_min so the ratio still holds if the range does not start at 1.
uniform_min, uniform_max = 1, 10
pivot = uniform_min - 1 + (uniform_max - uniform_min + 1) * test_ratio

# Materialize the helper column once with cache_result(). Without this, each
# create_version() call would re-run the query and draw fresh random values,
# so the "train" and "test" filters would not be complementary (rows could
# appear in both versions or in neither).
df_aug = df.with_column(
    "_UNIFORM", F.uniform(uniform_min, uniform_max, F.random())
).cache_result()
uniform_col = df_aug.col("_UNIFORM")

ds.create_version(
    version="train",
    input_dataframe=df_aug.where(uniform_col > pivot).drop(uniform_col),
    label_cols=["price"],
)
ds.create_version(
    version="test",
    input_dataframe=df_aug.where(uniform_col <= pivot).drop(uniform_col),
    label_cols=["price"],
)

print(ds.list_versions())

# Handles to the two new versions, used by the training cells below.
train_ds = ds.select_version("train")
test_ds = ds.select_version("test")

print("train rows:", train_ds.read.to_snowpark_dataframe().count())
print("test rows:", test_ds.read.to_snowpark_dataframe().count())

# Model Training

Let's train and evaluate a basic scikit-learn model using our newly created Snowflake Datasets.

In [None]:
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

# Load the "train" version into pandas and separate features from the label.
train_pd = train_ds.read.to_pandas()
X_train, y_train = train_pd.drop(columns=["price"]), train_pd["price"]

# Fit a random forest regressor with a fixed seed.
rf_regressor = RandomForestRegressor(n_estimators=100, random_state=42)
rf_regressor.fit(X_train, y_train)

# Load the "test" version the same way and predict its prices.
test_pd = test_ds.read.to_pandas()
X_test, y_test = test_pd.drop(columns=["price"]), test_pd["price"]
y_pred = rf_regressor.predict(X_test)

# Report the mean squared error of the predictions.
mse = mean_squared_error(y_test, y_pred)
print("Mean Squared Error:", mse)

We can run this same model in a stored procedure

In [None]:
import os
from pathlib import Path

# Root directory that contains the `snowflake` package whose sub-packages are
# uploaded to the stored procedure. A value defined earlier in the notebook is
# kept; otherwise it is derived from the already-imported `dataset` package.
try:
    snowml_path
except NameError:
    # NOTE(review): assumes `dataset` is a regular package located at
    # <root>/snowflake/ml/dataset/__init__.py - confirm for your installation.
    snowml_path = str(Path(dataset.__file__).resolve().parents[3])

# (local directory, import path) pairs of the client code the procedure needs.
local_code_imports = [
    (os.path.join(snowml_path, 'snowflake', 'ml', '_internal'), 'snowflake.ml._internal'),
    (os.path.join(snowml_path, 'snowflake', 'ml', 'fileset'), 'snowflake.ml.fileset'),
    (os.path.join(snowml_path, 'snowflake', 'ml', 'dataset'), 'snowflake.ml.dataset'),
]

# Packages requested for the stored procedure.
deps = [
    "snowflake-snowpark-python",
    "snowflake-ml-python",
    "cryptography",
]

try:
    for import_args in local_code_imports:
        session.add_import(*import_args, whole_file_hash=True)

    @F.sproc(session=session, packages=deps)
    def ds_sproc(session: Session) -> float:
        """Train a random forest on the "train" version and score it on "test".

        Args:
            session: Session passed to `dataset.load_dataset`.

        Returns:
            Mean squared error of the predicted prices on the "test" version.
        """
        train_ds = dataset.load_dataset(session, ds_name, "train")
        test_ds = dataset.load_dataset(session, ds_name, "test")

        train_pd = train_ds.read.to_pandas()
        X_train = train_pd.drop(columns=["price"])
        y_train = train_pd["price"]
        rf_regressor = RandomForestRegressor(n_estimators=100, random_state=42)
        rf_regressor.fit(X_train, y_train)

        # Evaluate the Model
        test_pd = test_ds.read.to_pandas()
        X_test = test_pd.drop(columns=["price"])
        y_test = test_pd["price"]
        y_pred = rf_regressor.predict(X_test)

        # Calculate the Mean Squared Error
        return mean_squared_error(y_test, y_pred)

    print("Mean Squared Error:", ds_sproc(session))
finally:
    # Always remove the session-level imports, even if registration or the
    # call fails, so later cells are not affected by them.
    session.clear_imports()

We can also use Dataset's connector APIs to integrate with ML frameworks like PyTorch

In [None]:
import numpy as np
import torch
from torch import nn, optim

class DiamondPricePredictor(nn.Module):
    """Three-layer fully connected network mapping 9 diamond features to 1 output.

    Layer sizes are 9 -> 64 -> 32 -> 1 with ReLU after the first two layers.
    """

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(9, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 1)
        self.relu = nn.ReLU()

    def forward(self, carat, cut, color, clarity, depth, table, x, y, z):
        """Run the network on one batch.

        Each argument is one feature column as a tensor that can be
        concatenated with the others along dimension 1 (the callers in this
        notebook pass tensors unsqueezed to a trailing dimension of 1).

        Returns:
            The output of the last linear layer (one value per row).
        """
        features = (carat, cut, color, clarity, depth, table, x, y, z)
        # Cast every column to float32 before concatenating: integer-encoded or
        # float64 columns would otherwise yield a tensor whose dtype does not
        # match the float32 weights of the linear layers.
        X = torch.cat([feature.to(torch.float32) for feature in features], dim=1)
        X = self.relu(self.fc1(X))
        X = self.relu(self.fc2(X))
        X = self.fc3(X)
        return X


def train_model(model: nn.Module, ds: dataset.Dataset, batch_size: int = 32, num_epochs: int = 10, learning_rate: float = 1e-3):
    """Train `model` on `ds` with MSE loss and the Adam optimizer.

    Args:
        model: Module called with one keyword argument per feature column.
        ds: Dataset whose `read.to_torch_datapipe` yields batches as dicts
            that include a "price" entry.
        batch_size: Batch size passed to the datapipe.
        num_epochs: Number of full passes over the datapipe.
        learning_rate: Learning rate for Adam.

    Returns:
        The same `model` object after training.
    """
    loss_fn = nn.MSELoss()
    adam = optim.Adam(model.parameters(), lr=learning_rate)
    model.train()

    for _ in range(num_epochs):
        for batch in ds.read.to_torch_datapipe(batch_size=batch_size):
            # "price" is the label; everything left in the batch is a feature.
            price = batch.pop("price")
            targets = torch.from_numpy(price).unsqueeze(1).to(torch.float32)
            inputs = {}
            for name, values in batch.items():
                inputs[name] = torch.from_numpy(values).unsqueeze(1)

            # Forward pass and loss.
            loss = loss_fn(model(**inputs), targets)

            # Backward pass and parameter update.
            adam.zero_grad()
            loss.backward()
            adam.step()

    return model

def eval_model(model: nn.Module, ds: dataset.Dataset, batch_size: int = 32) -> float:
    """Compute the mean squared error of `model` over every row of `ds`.

    The squared error is summed over all batches and divided by the number of
    target values, so the result does not depend on `batch_size` or on the
    number of batches (summing per-batch means would grow with the batch count).

    Args:
        model: Module called with one keyword argument per feature column.
        ds: Dataset whose `read.to_torch_datapipe` yields batches as dicts
            that include a "price" entry.
        batch_size: Batch size passed to the datapipe.

    Returns:
        Mean squared error over all rows, or NaN if the datapipe yields no rows.
    """
    model.eval()
    squared_error_sum = 0.0
    num_targets = 0
    with torch.no_grad():
        for batch in ds.read.to_torch_datapipe(batch_size=batch_size):
            targets = torch.from_numpy(batch.pop("price")).unsqueeze(1).to(torch.float32)
            inputs = {k: torch.from_numpy(v).unsqueeze(1) for k, v in batch.items()}

            outputs = model(**inputs)
            # Accumulate the raw sum so a smaller final batch is weighted correctly.
            squared_error_sum += nn.functional.mse_loss(outputs, targets, reduction="sum").item()
            num_targets += targets.numel()

    if num_targets == 0:
        # The mean over zero rows is undefined.
        return float("nan")
    return squared_error_sum / num_targets

# Train a fresh predictor on the "train" version (train_model returns the model
# it was given), then evaluate it on the "test" version.
model = train_model(DiamondPricePredictor(), train_ds)
eval_model(model, test_ds)

(WIP) We can pass the Datasets into SnowML modeling APIs using either Snowpark DataFrame or Pandas DataFrame

In [None]:
from snowflake.ml.modeling.xgboost import XGBRegressor

# Label column and the feature columns fed to the regressor.
LABEL_COLS = ["price"]
FEATURE_COLS = ["carat", "cut", "color", "clarity", "depth", "table", "x", "y", "z"]

# Fit the SnowML XGBoost regressor on the "train" version, read as a
# Snowpark DataFrame.
xgboost_model = XGBRegressor(input_cols=FEATURE_COLS, label_cols=LABEL_COLS)
train_sdf = train_ds.read.to_snowpark_dataframe()
xgboost_model.fit(train_sdf)

# Predict on the "test" version, also read as a Snowpark DataFrame.
test_sdf = test_ds.read.to_snowpark_dataframe()
predictions = xgboost_model.predict(test_sdf)

# Future Work

There are several features which are still on the horizon for the Dataset client API, such as:
1. Adding multi-version Dataset support
2. Adding exclude_cols handling to all connectors (`to_pandas()`, `to_torch_datapipe()`, etc)
3. Consolidating FileSet functionality (reading from internal stage) into dataset.DataReader

# Clean Up Resources

In [None]:
try:
    # Qualify the schema with its database so the drop cannot hit a schema of
    # the same name in whichever database happens to be current.
    session.sql(f"DROP SCHEMA IF EXISTS {TEST_DATASET_DB}.{TEST_DATASET_SCHEMA}").collect()
    session.sql(f"DROP DATABASE IF EXISTS {TEST_DATASET_DB}").collect()
finally:
    # Close the session even if one of the drops fails.
    session.close()