<a href="https://colab.research.google.com/github/clearspandex/distributed-ml-ray/blob/main/notebooks/ray_train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install Ray into the notebook runtime before anything imports it.
# NOTE(review): the version is unpinned, while later cells import
# `ray.train.trainer.Trainer` and `ray.train.callbacks.JsonLoggerCallback` --
# confirm the Ray release that gets installed still provides those modules.
!pip install ray

In [1]:
import ray
import ray.train as train
from ray.train.trainer import Trainer
from ray.train.callbacks import JsonLoggerCallback

# Bring up Ray in this process before creating any Train workers.
ray.init()

# Worker-pool settings, named so they are easy to find and tune.
# in practice you will want to spin up a multi-GPU cluster: https://docs.ray.io/en/latest/cluster/quickstart.html
NUM_WORKERS = 2
USE_GPU = False

# Create the torch-backed trainer and start its workers; the logs below show
# one process group member per worker (rank 0 and rank 1, world_size=2).
trainer = Trainer(
    backend="torch",
    num_workers=NUM_WORKERS,
    use_gpu=USE_GPU,
)
trainer.start()

2022-07-01 06:32:28,586	INFO trainer.py:243 -- Trainer logs will be logged in: /root/ray_results/train_2022-07-01_06-32-28
[2m[36m(BaseWorkerMixin pid=2845)[0m 2022-07-01 06:32:37,720	INFO torch.py:347 -- Setting up process group for: env:// [rank=0, world_size=2]
[2m[36m(BaseWorkerMixin pid=2846)[0m 2022-07-01 06:32:37,889	INFO torch.py:347 -- Setting up process group for: env:// [rank=1, world_size=2]


# Ray Train

In [13]:
# define model architecture
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

class LeNet(nn.Module):
    """LeNet-style CNN: two conv/ReLU/max-pool stages, then three linear layers.

    The network takes single-channel images and emits 10 raw scores per
    sample (no softmax is applied). The first linear layer expects 256
    flattened features, i.e. 16 channels x 4 x 4, which is what a 1x28x28
    input produces after the two 5x5 convolutions and two 2x2 poolings.
    """

    def __init__(self):
        super().__init__()

        # Convolutional feature extractor.
        feature_layers = [
            nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        ]

        # Fully connected head operating on the flattened feature maps.
        classifier_layers = [
            nn.Flatten(start_dim=1),
            nn.Linear(in_features=256, out_features=120),
            nn.ReLU(),
            nn.Linear(in_features=120, out_features=84),
            nn.ReLU(),
            nn.Linear(in_features=84, out_features=10),
        ]

        # Single Sequential so sub-module indices (and state_dict keys) run 0..11.
        self.stack = nn.Sequential(*feature_layers, *classifier_layers)

    def forward(self, x):
        """Return the output of the layer stack for the batch ``x``."""
        logits = self.stack(x)
        return logits

In [3]:
# Build both FashionMNIST splits under ~/data with download=True: the training
# split first, then the test split. Each split gets its own ToTensor transform.
training_data, test_data = (
    datasets.FashionMNIST(
        root="~/data",
        train=is_train_split,
        download=True,
        transform=transforms.ToTensor(),
    )
    for is_train_split in (True, False)
)

In [14]:
# Set up functions to train and evaluate the model for one epoch

def train_epoch(dataloader, model, loss_fn, optimizer):
    """Run one training pass over ``dataloader``, updating ``model`` in place.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches; must expose a
            ``dataset`` attribute supporting ``len``.
        model: Module to optimise; it is switched to training mode.
        loss_fn: Callable mapping ``(predictions, targets)`` to a scalar loss.
        optimizer: Optimizer whose ``zero_grad``/``step`` drive the update.

    Returns:
        None. Progress is printed every 100 batches.
    """
    # Only used for the progress display: dataset length split across workers.
    size = len(dataloader.dataset) // train.world_size()
    model.train()

    for step, (inputs, targets) in enumerate(dataloader):
        batch_loss = loss_fn(model(inputs), targets)

        # backpropagate gradients
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        # Report only on every 100th batch (including the very first one).
        if step % 100 != 0:
            continue

        loss = batch_loss.item()
        current = step * len(inputs)

        # print logs to local node
        print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def validate_epoch(dataloader, model, loss_fn):
    """Evaluate ``model`` over ``dataloader`` and return the mean per-batch loss.

    Accuracy is computed against the number of samples this call actually
    iterated over, not an estimate derived from the dataset length and the
    world size. The estimate is off whenever the dataset size is not a
    multiple of the number of workers, or when the loader is not sharded.
    Counting also means the function needs no active distributed session.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches supporting
            ``len`` (number of batches).
        model: Module to evaluate; it is switched to eval mode.
        loss_fn: Callable mapping ``(predictions, targets)`` to a scalar
            tensor.

    Returns:
        float: Sum of the per-batch losses divided by the number of batches.

    Raises:
        ZeroDivisionError: If ``dataloader`` has no batches.
    """
    num_batches = len(dataloader)

    # put PyTorch network in eval mode
    model.eval()
    test_loss, correct, seen = 0, 0, 0

    # No gradients are needed for evaluation.
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            # A prediction is correct when the highest-scoring class matches.
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            # Track the true sample count so accuracy uses the right denominator.
            seen += len(y)

    test_loss /= num_batches
    correct /= seen
    print(
        f"Test Error: \n "
        f"Accuracy: {(100 * correct):>0.1f}%, "
        f"Avg loss: {test_loss:>8f} \n"
    )
    return test_loss

In [15]:
def train_func():
    """Per-worker training entry point: train and validate LeNet for 5 epochs.

    The global batch size of 64 is divided by the world size to get each
    worker's batch size. Data loaders and the model are passed through the
    ``train.torch.prepare_*`` helpers; the run logs show the model being
    moved to the worker's device and wrapped in DDP. After every epoch the
    validation loss is reported via ``train.report``.

    Returns:
        list: The validation loss of each epoch, in order.
    """
    global_batch_size, num_epochs = 64, 5
    per_worker_batch = global_batch_size // train.world_size()

    def _prepared_loader(dataset):
        # Build a plain DataLoader and hand it to Ray Train for preparation.
        plain_loader = DataLoader(dataset, batch_size=per_worker_batch)
        return train.torch.prepare_data_loader(plain_loader)

    # setup DataLoaders
    train_loader = _prepared_loader(training_data)
    test_loader = _prepared_loader(test_data)

    # setup model
    model = train.torch.prepare_model(LeNet())

    criterion = nn.CrossEntropyLoss()
    sgd = torch.optim.SGD(model.parameters(), lr=0.001)

    epoch_losses = []
    for _ in range(num_epochs):
        train_epoch(train_loader, model, criterion, sgd)
        val_loss = validate_epoch(test_loader, model, criterion)
        train.report(loss=val_loss)
        epoch_losses.append(val_loss)

    return epoch_losses

In [16]:
# Execute train_func on the trainer's workers. Per the printed output further
# down, `result` holds one list of per-epoch losses for each worker.
# NOTE(review): JsonLoggerCallback presumably writes the reported metrics as
# JSON under the run's log directory -- confirm against the Ray docs.
json_logger = JsonLoggerCallback()
result = trainer.run(train_func=train_func, callbacks=[json_logger])

2022-07-01 06:38:20,355	INFO trainer.py:249 -- Run results will be logged in: /root/ray_results/train_2022-07-01_06-32-28/run_002
[2m[36m(BaseWorkerMixin pid=2846)[0m 2022-07-01 06:38:20,953	INFO torch.py:98 -- Moving model to device: cpu
[2m[36m(BaseWorkerMixin pid=2846)[0m 2022-07-01 06:38:20,953	INFO torch.py:132 -- Wrapping provided model in DDP.
[2m[36m(BaseWorkerMixin pid=2845)[0m 2022-07-01 06:38:20,949	INFO torch.py:98 -- Moving model to device: cpu
[2m[36m(BaseWorkerMixin pid=2845)[0m 2022-07-01 06:38:20,950	INFO torch.py:132 -- Wrapping provided model in DDP.


[2m[36m(BaseWorkerMixin pid=2846)[0m loss: 2.298295  [    0/30000]
[2m[36m(BaseWorkerMixin pid=2845)[0m loss: 2.301855  [    0/30000]
[2m[36m(BaseWorkerMixin pid=2846)[0m loss: 2.305949  [ 3200/30000]
[2m[36m(BaseWorkerMixin pid=2845)[0m loss: 2.306239  [ 3200/30000]
[2m[36m(BaseWorkerMixin pid=2846)[0m loss: 2.326731  [ 6400/30000]
[2m[36m(BaseWorkerMixin pid=2845)[0m loss: 2.289704  [ 6400/30000]
[2m[36m(BaseWorkerMixin pid=2846)[0m loss: 2.317367  [ 9600/30000]
[2m[36m(BaseWorkerMixin pid=2845)[0m loss: 2.312324  [ 9600/30000]
[2m[36m(BaseWorkerMixin pid=2846)[0m loss: 2.303359  [12800/30000]
[2m[36m(BaseWorkerMixin pid=2845)[0m loss: 2.284207  [12800/30000]
[2m[36m(BaseWorkerMixin pid=2846)[0m loss: 2.298878  [16000/30000]
[2m[36m(BaseWorkerMixin pid=2845)[0m loss: 2.303739  [16000/30000]
[2m[36m(BaseWorkerMixin pid=2846)[0m loss: 2.305996  [19200/30000]
[2m[36m(BaseWorkerMixin pid=2845)[0m loss: 2.300008  [19200/30000]
[2m[36m(BaseWorker

In [17]:
# Shut the trainer down now that the run has finished.
trainer.shutdown()
# `result` is the value returned by trainer.run: the list of losses returned
# by train_func on each worker.
print(f"Loss results: {result}")

[2m[36m(BaseWorkerMixin pid=2846)[0m Test Error: 
[2m[36m(BaseWorkerMixin pid=2846)[0m  Accuracy: 21.0%, Avg loss: 2.272009 
[2m[36m(BaseWorkerMixin pid=2846)[0m 
[2m[36m(BaseWorkerMixin pid=2845)[0m Test Error: 
[2m[36m(BaseWorkerMixin pid=2845)[0m  Accuracy: 21.2%, Avg loss: 2.270097 
[2m[36m(BaseWorkerMixin pid=2845)[0m 
Loss results: [[2.3013921482547834, 2.298111924699917, 2.2931637688047566, 2.2853394526584894, 2.2700969322471862], [2.3025391997805067, 2.2992611675505428, 2.294413719966913, 2.2867955037742664, 2.272008587600319]]
