In [62]:
import json
import numpy as np
import pandas as pd
import os
import sys
import time

from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import sproc, col
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

from snowflake.snowpark.types import PandasDataFrameType, IntegerType, StringType, FloatType, Variant
from snowflake.snowpark.exceptions import SnowparkSQLException

import torch
import torch.distributed as dist
from torch import nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
import os
import torch.distributed as dist
from snowflake.ml.fileset import fileset

In [63]:
# Reading Snowflake Connection Details
snowflake_connection_cfg = json.loads(open("/Users/mitaylor/Documents/creds/creds.json").read())

# Creating Snowpark Session
session = Session.builder.configs(snowflake_connection_cfg).create()

# Create a fresh & new schema
session.sql("CREATE OR REPLACE DATABASE PYTORCH_DEMO").collect()
session.sql('''CREATE OR REPLACE STAGE UDF_STAGE''').collect()
session.sql('''CREATE OR REPLACE STAGE FILESET_DEMO
  DIRECTORY = ( ENABLE = true )
  encryption=(type='SNOWFLAKE_SSE')''').collect()

session.sql("CREATE OR REPLACE WAREHOUSE ASYNC_WH WITH WAREHOUSE_SIZE='X-SMALL'").collect()

[Row(status='Warehouse ASYNC_WH successfully created.')]

# 1. Create a Data Set

# 1.1 Load some arbitrary data into Snowflake

In [64]:
from sklearn.datasets import make_classification
import pandas as pd
columns = [str(i) for i in range(0,10)]
X,y = make_classification(n_samples=100000, n_features=10, n_classes=2)
X = np.array(X, dtype=np.float32)
df = pd.DataFrame(X, columns=columns)
feature_cols = ["COL" + i for i in df.columns]
df.columns = feature_cols
df['Y'] = y
session.write_pandas(df, table_name='DUMMY_DATASET', auto_create_table=True, overwrite=True)

<snowflake.snowpark.table.Table at 0x7fd12bab7dc0>

## 1.2 Create a Fileset Snapshot

In [65]:
sdf = session.table('DUMMY_DATASET')
train_sdf, test_sdf = sdf.random_split(weights=[0.8, 0.2], seed=0)
train_sdf.write.mode('overwrite').save_as_table('DUMMY_DATASET_TRAIN')
test_sdf.write.mode('overwrite').save_as_table('DUMMY_DATASET_TEST')

In [66]:
FS_STAGE_NAME = "FILESET_DEMO"
fileset_train_sdf = fileset.FileSet.make(
    target_stage_loc=f"@{session.get_current_database()}.{session.get_current_schema()}.{FS_STAGE_NAME}/",
    name="DUMMY_FILESET_TRAIN",
    snowpark_dataframe=train_sdf,
    shuffle=True,
)

fileset_test_sdf = fileset.FileSet.make(
    target_stage_loc=f"@{session.get_current_database()}.{session.get_current_schema()}.{FS_STAGE_NAME}/",
    name="DUMMY_FILESET_TEST",
    snowpark_dataframe=test_sdf,
    shuffle=True,
)

# 1.3 Get the Filset locally 

In [67]:
session.sql("GET @FILESET_DEMO/DUMMY_FILESET_TRAIN 'file:///Users/mitaylor/Documents/GitHub/AA Cleaned Repos/simple-pytorch-example/data/train' ").collect()
session.sql("GET @FILESET_DEMO/DUMMY_FILESET_TEST 'file:///Users/mitaylor/Documents/GitHub/AA Cleaned Repos/simple-pytorch-example/data/test' ").collect()

[Row(file='data_01b3fe58-0000-e056-0000-f14900af7e92_016_1_0.snappy.parquet', size=1187428, status='DOWNLOADED', message='')]

# 2. Build Neural Net In Pytorch

## 2.1 Prep the Data

In [68]:
X_tens = torch.tensor(X)
y_tens = torch.tensor(y)

In [69]:
# convert into PyTorch tensors
X_tens = torch.empty_like(X_tens).copy_(X_tens)
y_tens = torch.empty_like(y_tens).copy_(y_tens).reshape(-1, 1)
loader = DataLoader(list(zip(X_tens,y_tens)), shuffle=True, batch_size=16)


In [70]:
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.model = nn.Sequential(
            nn.Linear(10, 10),
            nn.ReLU(),
            nn.Linear(10, 1),
            nn.ReLU(),
        )

    def forward(self, tensor:torch.Tensor):
        return self.model(tensor)

# 3. Train the Neural Network

In [71]:
def train_model():
    n_epochs = 5
    device = 'cpu'
    model = MyModel()
    model = model.to(device)
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.1)
    start_time = time.time()
    
    # Training step
    for epoch in range(n_epochs):
        current_loss = 0.0
    
        for batch, (X, y) in enumerate(loader):
    
            X_batch, y_batch = X.to(device), y.to(device)
            # forward pass
            y_pred = model(X_batch)
    
            # compute loss
            loss = loss_fn(y_pred.float(), y_batch.float())
    
            # backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    
            current_loss += loss.item()
    
        if epoch % 10 == 0:
           print(f"Loss after epoch {epoch}: {current_loss}")
           for param in model.parameters():
                print(param.data)
    
    end_time = time.time()
    print('Model training complete.')
    print(f'Training time: {end_time-start_time}')
    return model

In [72]:
model = train_model()

Loss after epoch 0: 637.2545770109864
tensor([[-1.8317e-02,  1.5106e-01, -1.4328e-02, -1.9379e-01, -6.5999e-03,
         -2.1097e-02,  1.3912e-01,  4.0608e-03,  5.8638e-02,  1.1382e-03],
        [-2.4535e-01,  1.5937e-01, -5.2283e-02,  2.4372e-01,  7.2789e-02,
          1.1512e-01, -1.5283e-01,  1.9260e-01,  7.1760e-02, -9.3361e-02],
        [ 3.7031e-02,  7.0855e-02,  5.0988e-02,  5.4159e-02,  1.9298e-02,
         -5.1789e-02,  1.3391e-01, -1.4499e-01, -3.7774e-02,  2.0743e-01],
        [-1.0262e-02,  2.1683e-01,  7.9308e-03,  2.0104e-01, -2.0156e-01,
          1.7413e-04, -1.0076e-02, -7.0204e-03, -6.5704e-03,  1.5473e-02],
        [ 5.6176e-03,  5.3171e-01, -1.5312e-02, -3.5281e-02,  4.7479e-01,
          4.5552e-01,  8.1175e-03,  2.8066e-02, -2.9128e-02,  2.6518e-02],
        [-2.0953e-01, -2.5814e-02, -2.0038e-01, -1.6099e-01,  1.7992e-01,
         -1.6004e-01,  1.4637e-01, -1.1547e-01,  1.0821e-01, -1.4588e-01],
        [ 1.8236e-01, -5.4621e-02, -1.4506e-01,  1.9343e-01,  1.9273

# 4. Deploy model (into Registry, then into a UDF)

In [75]:
from snowflake.ml.registry import registry

REGISTRY_DATABASE_NAME = "PYTORCH_DEMO"
REGISTRY_SCHEMA_NAME = "PUBLIC"
native_registry = registry.Registry(session=session, database_name=REGISTRY_DATABASE_NAME, schema_name=REGISTRY_SCHEMA_NAME)

In [76]:
model_ref = native_registry.log_model(
    model,
    model_name="torchModel",
    version_name="v1",
    sample_input_data=[X_tens],
)

  return next(self.gen)


In [77]:
model_ref.show_functions()

[{'name': 'FORWARD',
  'target_method': 'forward',
  'signature': ModelSignature(
                      inputs=[
                          FeatureSpec(dtype=DataType.FLOAT, name='input_feature_0', shape=(10,))
                      ],
                      outputs=[
                          FeatureSpec(dtype=DataType.FLOAT, name='output_feature_0', shape=(1,))
                      ]
                  )}]

In [78]:
model_ref.run([X_tens])

Unnamed: 0,output_feature_0
0,[0.35751473903656006]
1,[0.7751520872116089]
2,[0.26887667179107666]
3,[0]
4,[0.9140706658363342]
...,...
99995,[0.5455130338668823]
99996,[0.997458279132843]
99997,[0]
99998,[0.3630709648132324]


In [88]:
input_data_df = session.sql("select DUMMY_DATASET")
input_data_df = input_data_df.drop("y")
input_data_df = input_data_df.with_column('"input_feature_0"', F.
                                          ('*'))
input_data_df.limit(1).show()

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"0"                   |"1"                    |"2"                 |"3"                  |"4"                 |"5"                  |"6"                  |"7"                  |"8"                  |"9"                 |"y"  |"input_feature_0"          |
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|-0.29623791575431824  |-0.042623672634363174  |0.7249094843864441  |-0.4209217131137848  |0.4089137613773346  |0.26310214400291443  |0.40178823471069336  |0.12115471065044403  |-1.9882298707962036  |0.7772805094718933  |1    |[    

In [81]:
predictions_df = model_ref.run(input_data_df)


In [82]:
predictions_df.show()

SnowparkSQLException: (1304): 01b3fe5a-0000-e048-0000-f14900af8612: 100357 (P0000): Python Interpreter Error:
Traceback (most recent call last):
  File "/home/udf/4048102905/forward.py", line 78, in infer
    predictions_df = runner(input_df[input_cols])
  File "/usr/lib/python_udf/c0ec591262b5905d2537dfcb5a546dbd94104d87acd8f3fb45088ac394f55e81/lib/python3.10/site-packages/snowflake/ml/model/_packager/model_handlers/pytorch.py", line 186, in fn
    res = getattr(raw_model, target_method)(*t)
  File "/var/folders/97/8vc6xcbx4zd06p75xg9frdrw0000gn/T/ipykernel_28105/452384569.py", line 12, in forward
  File "/usr/lib/python_udf/c0ec591262b5905d2537dfcb5a546dbd94104d87acd8f3fb45088ac394f55e81/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1511, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/usr/lib/python_udf/c0ec591262b5905d2537dfcb5a546dbd94104d87acd8f3fb45088ac394f55e81/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1520, in _call_impl
    return forward_call(*args, **kwargs)
  File "/usr/lib/python_udf/c0ec591262b5905d2537dfcb5a546dbd94104d87acd8f3fb45088ac394f55e81/lib/python3.10/site-packages/torch/nn/modules/container.py", line 217, in forward
    input = module(input)
  File "/usr/lib/python_udf/c0ec591262b5905d2537dfcb5a546dbd94104d87acd8f3fb45088ac394f55e81/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1511, in _wrapped_call_impl
    return self._call_impl(*args, **kwargs)
  File "/usr/lib/python_udf/c0ec591262b5905d2537dfcb5a546dbd94104d87acd8f3fb45088ac394f55e81/lib/python3.10/site-packages/torch/nn/modules/module.py", line 1520, in _call_impl
    return forward_call(*args, **kwargs)
  File "/usr/lib/python_udf/c0ec591262b5905d2537dfcb5a546dbd94104d87acd8f3fb45088ac394f55e81/lib/python3.10/site-packages/torch/nn/modules/linear.py", line 116, in forward
    return F.linear(input, self.weight, self.bias)
RuntimeError: mat1 and mat2 shapes cannot be multiplied (10x11 and 10x10)
 in function FORWARD with handler forward.infer

# 5. Run it on a Fileset in Snowflake

In [41]:
help(model_ref)

Help on ModelVersion in module snowflake.ml.model._client.model.model_version_impl object:

class ModelVersion(builtins.object)
 |  ModelVersion() -> None
 |  
 |  Model Version Object representing a specific version of the model that could be run.
 |  
 |  Methods defined here:
 |  
 |  __eq__(self, _ModelVersion__value: object) -> bool
 |      Return self==value.
 |  
 |  __init__(self) -> None
 |      Initialize self.  See help(type(self)) for accurate signature.
 |  
 |  delete_metric(self, metric_name: str) -> None
 |      Delete a metric from metric storage.
 |      
 |      Args:
 |          metric_name: The name of the metric to be deleted.
 |      
 |      Raises:
 |          KeyError: When the requested metric name does not exist.
 |  
 |  get_metric(self, metric_name: str) -> Any
 |      Get the value of a specific metric.
 |      
 |      Args:
 |          metric_name: The name of the metric.
 |      
 |      Raises:
 |          KeyError: When the requested metric name does

# Do the next cell in a sproc or UDF for server side inference

In [44]:
# Use FileSet to get data from a Snowflake table in the form of files in an internal server-side excrypted stage







from torch.utils.data import DataLoader


#write to a fileset

STAGE_NAME = "FILESET_DEMO"
fileset_test_df = fileset.FileSet(
    target_stage_loc=f"@{session.get_current_database()}.{session.get_current_schema()}.{STAGE_NAME}/",
    name="DUMMY_FILESET_TEST",
    snowpark_session=session,
)

pipe = fileset_test_df.to_torch_datapipe(
    batch_size=4,
    shuffle=True,
    drop_last_batch=True)

for batch in DataLoader(pipe, batch_size=None, num_workers=0):
    print(batch)
    break




{'0': tensor([-0.5607, -0.2020, -0.1885,  0.5232]), '1': tensor([0.3300, 0.7217, 1.0068, 0.5628]), '2': tensor([-0.2890,  0.3478, -0.2592, -0.5748]), '3': tensor([0.4056, 0.8343, 0.5201, 0.6205]), '4': tensor([ 0.9908, -1.1942,  0.5714, -1.9183]), '5': tensor([-0.8134, -0.4245, -0.0218,  0.5493]), '6': tensor([-0.9227, -0.3730, -2.2369,  1.1993]), '7': tensor([-0.5810,  0.3710,  0.2800, -0.8917]), '8': tensor([ 1.7019,  0.6239,  0.5516, -1.5708]), '9': tensor([-1.1930, -0.7236,  0.1613,  0.6446]), 'y': tensor([1, 1, 0, 0])}


  return torch.as_tensor(data)


In [40]:
type(test_dp)

snowflake.ml.fileset.torch_datapipe.ReadAndParseParquet

In [None]:
model_ = native_registry.get_model(MODEL_NAME).version(MODEL_VERSION)
model_.run(sdf_filt_test, function_name="predict").write.save_as_table("ML_PREDICT", mode="overwrite")


@udf(...)
def predict_with_pytorch()
    model_ = native_registry.get_model(MODEL_NAME).version(MODEL_VERSION)
    turns it into tensors
    results = model_ref.run([X_tens])
    returns results

In [None]:
test_sdf.with_column('PREDICTION', pytorch_udf_model(*feature_cols)).to_pandas()
#where the internals convert it into a fileset/tensor and 

In [36]:
# or sproc this

#Use FileSet to get data from a Snowflake table in the form of files in an internal server-side excrypted stage
fileset_train_df = fileset.FileSet(
    target_stage_loc=f"@{session.get_current_database()}.{session.get_current_schema()}.{STAGE_NAME}/",
    name="diamonds_train",
    snowpark_session=session,
)

# Feed Training FileSet to Pytorch
# Get PyTorch DataPipe
train_dp = fileset_train_df.to_torch_datapipe(batch_size=32*4, shuffle=True, drop_last_batch=False)
# Shard the training DataPipe so each GPU gets a different subset of data
train_dp = train_dp.sharding_filter()

# Pass PyTorch DataPipe to Pytorch DataLoader
dataloader = DataLoader(train_dp, batch_size=None, num_workers=0)


# Next Steps

Do the wrapper function

Do it in a SProc

Do it in SPCS

Do it in SPCS with DDP

# Appendix

In [3]:
# Get Training + Testing Datasets
#with FileLock("/tmp/data.lock"):
training_data = datasets.FashionMNIST(root="/tmp/data", train=True, download=True, transform=ToTensor(),)
test_data = datasets.FashionMNIST(root="/tmp/data", train=False, download=True, transform=ToTensor(),)

In [7]:
training_data.data.numpy().shape

(60000, 28, 28)

In [3]:
# Model Definition & Training
class MyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.two_hidden_layer_nn = nn.Sequential(
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )

    def forward(self, x):
        x = self.flatten(x)
        digits = self.two_hidden_layer_nn(x)
        return digits

def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)
        pred = model(X)
        loss = loss_fn(pred, y)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            print(f"loss: {loss.item()} [{(batch+1)* len(X)}/{size}]")

def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

rank = int(os.environ['LOCAL_RANK']) # Used to identify the local node
world_size = int(os.environ['WORLD_SIZE']) # Total number of CPUs/GPUs available
dist.init_process_group("gloo", rank=rank, world_size=world_size)

model = MyModel()
batch_size = 64
device = 'cpu'
model = DDP(model)

print(model)
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

# Create data loaders.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
test_dataloader = DataLoader(test_data, batch_size=batch_size)

for X, y in train_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

epochs = 3
for ep in range(epochs):
    print(f"Epoch {ep}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)

In [None]:
class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.model = nn.Sequential(nn.Linear(26, 1), nn.ReLU())

    def forward(self, tensor:torch.Tensor):
        return self.model(tensor)


def get_batch(batch):
    X_batch = torch.column_stack(
        (
            batch["CUT_OE_IDEAL"],
            batch["CUT_OE_PREMIUM"],
            batch["CUT_OE_VERY_GOOD"],
            batch["CUT_OE_GOOD"],
            batch["CUT_OE_FAIR"],
            batch["COLOR_OE_D"],
            batch["COLOR_OE_E"],
            batch["COLOR_OE_F"],
            batch["COLOR_OE_G"],
            batch["COLOR_OE_H"],
            batch["COLOR_OE_I"],
            batch["COLOR_OE_J"],
            batch["CLARITY_OE_IF"],
            batch["CLARITY_OE_VVS1"],
            batch["CLARITY_OE_VVS2"],
            batch["CLARITY_OE_VS1"],
            batch["CLARITY_OE_VS2"],
            batch["CLARITY_OE_SI1"],
            batch["CLARITY_OE_SI2"],
            batch["CLARITY_OE_I1"],
            batch["CLARITY_OE_I2"],
            batch["CLARITY_OE_I3"],
            batch["CARAT"],
            batch["X"],
            batch["Y"],
            batch["Z"],
        )
    )

    return X_batch

STAGE_NAME = 'FILESET_DEMO'
device = 'cpu'

print(f"rank = {rank}, using device {device}")

# Use FileSet to get data from a Snowflake table in the form of files in an internal server-side excrypted stage
fileset_train_df = fileset.FileSet(
    target_stage_loc=f"@{session.get_current_database()}.{session.get_current_schema()}.{STAGE_NAME}/",
    name="diamonds_train",
    snowpark_session=session,
)

# Feed Training FileSet to Pytorch
# Get PyTorch DataPipe
train_dp = fileset_train_df.to_torch_datapipe(batch_size=32*4, shuffle=True, drop_last_batch=False)
# Shard the training DataPipe so each GPU gets a different subset of data
train_dp = train_dp.sharding_filter()

# Pass PyTorch DataPipe to Pytorch DataLoader
dataloader = DataLoader(train_dp, batch_size=None, num_workers=0)

# Define model & training params
model = MyModel()
model = model.to(device)

n_epochs = 100
loss_fn = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)

start_time = time.time()
# Training step
for epoch in range(n_epochs):
    current_loss = 0.0

    for i, batch in enumerate(dataloader):
        X_batch = get_batch(batch)
        y_batch = torch.column_stack((batch["PRICE"],))

        X_batch = X_batch.to(device)
        y_batch = y_batch.to(device)

        # forward pass
        y_pred = model(X_batch)

        # compute loss
        loss = loss_fn(y_pred, y_batch)

        # backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        current_loss += loss.item()

    if epoch % 10 == 0:
       print(f"[RANK = {rank}] Loss after epoch {epoch}: {current_loss / 32*4}")
       for param in model.parameters():
            print(param.data)

end_time = time.time()