# PyTorch DDP Fashion MNIST Training Example

This example demonstrates how to train a convolutional neural network (CNN) to classify images using the [Fashion MNIST](https://github.com/zalandoresearch/fashion-mnist) dataset and [PyTorch Distributed Data Parallel (DDP)](https://pytorch.org/tutorials/intermediate/ddp_tutorial.html).

This notebook walks you through running the example locally and then scaling PyTorch DDP across multiple nodes with a Kubeflow TrainJob.

## Install the Kubeflow SDK

You need to install the Kubeflow SDK to interact with Kubeflow Trainer APIs:

In [None]:
# !pip install git+https://github.com/kubeflow/sdk.git@main#subdirectory=python

## Install the PyTorch Dependencies

You also need to install PyTorch and Torchvision to be able to run the example locally:

In [None]:
!pip install torch==2.5.0
!pip install torchvision==0.20.0

## Define the Training Function

The first step is to create a function that trains a CNN model on the Fashion MNIST data.

In [2]:
def train_fashion_mnist():
    import os

    import torch
    import torch.distributed as dist
    import torch.nn.functional as F
    from torch import nn
    from torch.utils.data import DataLoader, DistributedSampler
    from torchvision import datasets, transforms

    # Define the PyTorch CNN model to be trained
    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 20, 5, 1)
            self.conv2 = nn.Conv2d(20, 50, 5, 1)
            self.fc1 = nn.Linear(4 * 4 * 50, 500)
            self.fc2 = nn.Linear(500, 10)

        def forward(self, x):
            x = F.relu(self.conv1(x))
            x = F.max_pool2d(x, 2, 2)
            x = F.relu(self.conv2(x))
            x = F.max_pool2d(x, 2, 2)
            x = x.view(-1, 4 * 4 * 50)
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)

    # Use NCCL if a GPU is available, otherwise use Gloo as communication backend.
    device, backend = ("cuda", "nccl") if torch.cuda.is_available() else ("cpu", "gloo")
    print(f"Using Device: {device}, Backend: {backend}")

    # Setup PyTorch distributed.
    local_rank = int(os.getenv("LOCAL_RANK", 0))
    dist.init_process_group(backend=backend)
    print(
        "Distributed Training for WORLD_SIZE: {}, RANK: {}, LOCAL_RANK: {}".format(
            dist.get_world_size(),
            dist.get_rank(),
            local_rank,
        )
    )

    # Create the model and load it into the device.
    device = torch.device(f"{device}:{local_rank}")
    model = nn.parallel.DistributedDataParallel(Net().to(device))
    optimizer = torch.optim.SGD(model.parameters(), lr=0.1, momentum=0.9)

    
    # Download FashionMNIST dataset only on local_rank=0 process.
    if local_rank == 0:
        dataset = datasets.FashionMNIST(
            "./data",
            train=True,
            download=True,
            transform=transforms.Compose([transforms.ToTensor()]),
        )
    dist.barrier()
    dataset = datasets.FashionMNIST(
        "./data",
        train=True,
        download=False,
        transform=transforms.Compose([transforms.ToTensor()]),
    )


    # Shard the dataset across workers.
    train_loader = DataLoader(
        dataset,
        batch_size=100,
        sampler=DistributedSampler(dataset)
    )

    # TODO(astefanutti): add parameters to the training function
    dist.barrier()
    for epoch in range(1, 3):
        model.train()

        # Iterate over mini-batches from the training set
        for batch_idx, (inputs, labels) in enumerate(train_loader):
            # Copy the data to the GPU device if available
            inputs, labels = inputs.to(device), labels.to(device)
            # Forward pass
            outputs = model(inputs)
            loss = F.nll_loss(outputs, labels)
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if batch_idx % 10 == 0 and dist.get_rank() == 0:
                print(
                    "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                        epoch,
                        batch_idx * len(inputs),
                        len(train_loader.dataset),
                        100.0 * batch_idx / len(train_loader),
                        loss.item(),
                    )
                )

    # Wait for the distributed training to complete
    dist.barrier()
    if dist.get_rank() == 0:
        print("Training is finished")

    # Finally clean up PyTorch distributed
    dist.destroy_process_group()
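
The `4 * 4 * 50` input size of `fc1` follows from the layer shapes in `Net`. The sketch below retraces that arithmetic in plain Python, assuming the standard 28x28 Fashion MNIST image size and no padding or dilation; `conv_out` is a hypothetical helper, not part of PyTorch.

```python
def conv_out(size, kernel, stride=1, padding=0):
    """Spatial size after a convolution or pooling layer (no dilation)."""
    return (size + 2 * padding - kernel) // stride + 1


size = 28                                  # Fashion MNIST images are 28x28
size = conv_out(size, kernel=5)            # conv1: 28 -> 24
size = conv_out(size, kernel=2, stride=2)  # max_pool2d: 24 -> 12
size = conv_out(size, kernel=5)            # conv2: 12 -> 8
size = conv_out(size, kernel=2, stride=2)  # max_pool2d: 8 -> 4

flattened = size * size * 50               # conv2 produces 50 channels
print(size, flattened)                     # 4 800
```

The result, 800, matches `nn.Linear(4 * 4 * 50, 500)` and the `x.view(-1, 4 * 4 * 50)` call in `forward`.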

## Dry-run the Training Locally

This step downloads the Fashion MNIST dataset and runs the training function locally in a single process.

In [3]:
import os

# Set the Torch Distributed env variables so the training function can be run locally in the Notebook.
# See https://pytorch.org/docs/stable/elastic/run.html#environment-variables
os.environ["RANK"] = "0"
os.environ["LOCAL_RANK"] = "0"
os.environ["WORLD_SIZE"] = "1"
os.environ["MASTER_ADDR"] = "localhost"
os.environ["MASTER_PORT"] = "1234"

# Run the training function locally.
train_fashion_mnist()

Using Device: cpu, Backend: gloo
Distributed Training for WORLD_SIZE: 1, RANK: 0, LOCAL_RANK: 0
Training is finished
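
The dry run uses a fixed `MASTER_PORT`, which fails if another process already holds that port. One possible workaround, sketched here under the assumption that a small race between picking and using the port is acceptable for a local test, is to ask the operating system for a free port. `find_free_port` is a hypothetical helper name.

```python
import socket


def find_free_port(host="127.0.0.1"):
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


port = find_free_port()
print(port)

# In the dry-run cell this could replace the fixed value, for example:
# os.environ["MASTER_PORT"] = str(find_free_port())
```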


## Scale PyTorch DDP with Kubeflow TrainJob

You can use `TrainerClient()` from the Kubeflow SDK to communicate with Kubeflow Trainer APIs and scale your training function across multiple PyTorch training nodes.

`TrainerClient()` verifies that you have the required access to the Kubernetes cluster.

Kubeflow Trainer creates a `TrainJob` resource and automatically sets the environment variables needed to set up PyTorch in a distributed environment.
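
To make those environment variables concrete, here is a minimal sketch of how a worker process could read them. The variable names are the ones set in the local dry run; `read_dist_env`, the defaults, and the example values are assumptions for illustration, not something Kubeflow Trainer provides.

```python
import os


def read_dist_env(environ=os.environ):
    """Collect the torch.distributed settings visible to a worker process."""
    return {
        "rank": int(environ.get("RANK", 0)),
        "local_rank": int(environ.get("LOCAL_RANK", 0)),
        "world_size": int(environ.get("WORLD_SIZE", 1)),
        "master_addr": environ.get("MASTER_ADDR", "localhost"),
        "master_port": int(environ.get("MASTER_PORT", 1234)),
    }


# Hypothetical values for one worker out of three; real values come from the runtime.
example = {
    "RANK": "1",
    "LOCAL_RANK": "0",
    "WORLD_SIZE": "3",
    "MASTER_ADDR": "example-host",
    "MASTER_PORT": "29500",
}
settings = read_dist_env(example)
print(settings)
```

The training function above only reads `LOCAL_RANK` directly; `dist.init_process_group()` picks up the remaining values from the environment.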



In [4]:
from kubeflow.trainer import CustomTrainer, TrainerClient

client = TrainerClient()

## List the Training Runtimes

You can list the available Training Runtimes and choose one to start your TrainJob.

The output may also show the available accelerator type and the number of available resources.

In [5]:
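torch_runtime = None
for runtime in client.list_runtimes():
    print(runtime)
    if runtime.name == "torch-distributed":
        torch_runtime = runtime
# Fail early if the expected runtime is not available on the cluster.
assert torch_runtime is not None, "Runtime 'torch-distributed' not found"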

Runtime(name='torch-distributed', trainer=Trainer(trainer_type=<TrainerType.CUSTOM_TRAINER: 'CustomTrainer'>, framework=<Framework.TORCH: 'torch'>, entrypoint='torchrun', accelerator='gpu-tesla-v100-16gb', accelerator_count='4.0'), pretrained_model=None)


## Run the Distributed TrainJob

Kubeflow TrainJob will train the above model on 3 PyTorch nodes.

In [6]:
job_name = client.train(
    trainer=CustomTrainer(
        func=train_fashion_mnist,
        # Set how many PyTorch nodes you want to use for distributed training.
        num_nodes=3,
        # Set the resources for each PyTorch node.
        resources_per_node={
            "cpu": 3,
            "memory": "16Gi",
            # Uncomment this to distribute the TrainJob using GPU nodes.
            # "nvidia.com/gpu": 1,
        },
    ),
    runtime=torch_runtime,
)

## Check the TrainJob Steps

You can inspect the components of the TrainJob that was created.

Since the TrainJob performs distributed training across 3 nodes, it generates 3 steps: `trainer-node-0` .. `trainer-node-2`.

You can get the individual status for each of these steps.

In [7]:
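import time

def wait_for_job_running():
    # Poll the TrainJob up to 100 times, 5 seconds apart.
    for _ in range(100):
        trainjob = client.get_job(name=job_name)
        for c in trainjob.steps:
            if c.name == "trainer-node-0" and c.status == "Running":
                return
        print("Waiting for the TrainJob to reach Running status. Sleeping for 5 seconds")
        time.sleep(5)
    # Fail loudly instead of returning silently if the job never starts.
    raise TimeoutError(f"TrainJob {job_name} did not reach Running status")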

In [8]:
# TODO (andreyvelich): Use wait_for_job_status API from TrainerClient() when it is implemented.
wait_for_job_running()
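
The polling pattern used by `wait_for_job_running()` can be generalized. The following is a sketch of a reusable helper with an explicit timeout; `wait_until` and `fake_is_running` are hypothetical names, and the stand-in predicate replaces the real `client.get_job()` check so the example runs without a cluster.

```python
import time


def wait_until(predicate, timeout=500.0, interval=5.0):
    """Poll predicate() until it returns True; raise TimeoutError otherwise."""
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout} seconds")
        time.sleep(interval)


# Stand-in predicate that becomes true on the third poll.
calls = {"n": 0}


def fake_is_running():
    calls["n"] += 1
    return calls["n"] >= 3


wait_until(fake_is_running, timeout=1.0, interval=0.01)
print(calls["n"])  # 3
```

In the notebook, the predicate could wrap the same check on `client.get_job(name=job_name).steps` that `wait_for_job_running()` performs.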

In [9]:
for c in client.get_job(name=job_name).steps:
    print(f"Step: {c.name}, Status: {c.status}, Devices: {c.device} x {c.device_count}\n")

Step: trainer-node-0, Status: Running, Devices: gpu x 1

Step: trainer-node-1, Status: Running, Devices: gpu x 1

Step: trainer-node-2, Status: Running, Devices: gpu x 1



## Watch the TrainJob Logs

You can use the `get_job_logs()` API to get the TrainJob logs.

Since we run training on 3 GPUs, every PyTorch node uses 60,000/3 = 20,000 images from the dataset.
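
The split can be sketched in plain Python. The function below mimics how `DistributedSampler` assigns indices to each rank, with shuffling left out for clarity; `shard_indices` is a hypothetical helper written for this illustration, not a PyTorch API.

```python
import math


def shard_indices(dataset_len, num_replicas, rank):
    """Indices one rank would see, mimicking DistributedSampler without shuffling."""
    num_samples = math.ceil(dataset_len / num_replicas)
    total_size = num_samples * num_replicas
    indices = list(range(dataset_len))
    # Pad by wrapping around so every rank gets the same number of samples.
    indices += indices[: total_size - dataset_len]
    return indices[rank:total_size:num_replicas]


shards = [shard_indices(60_000, 3, rank) for rank in range(3)]
per_rank = len(shards[0])
batches_per_epoch = math.ceil(per_rank / 100)  # batch_size=100 in train_loader
print(per_rank, batches_per_epoch)             # 20000 200
```

Each rank therefore runs 200 batches per epoch. The progress line in the training function prints `len(train_loader.dataset)`, which is the full 60,000 images rather than the per-rank share.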

In [10]:
_ = client.get_job_logs(job_name, follow=True)

[trainer-node-0]: Using Device: cuda, Backend: nccl
[trainer-node-0]: Distributed Training for WORLD_SIZE: 3, RANK: 0, LOCAL_RANK: 0
[trainer-node-0]: Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz
[trainer-node-0]: Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz
100%|██████████| 26.4M/26.4M [00:02<00:00, 12.5MB/s]
[trainer-node-0]: Extracting ./data/FashionMNIST/raw/train-images-idx3-ubyte.gz to ./data/FashionMNIST/raw
[trainer-node-0]: Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz
[trainer-node-0]: Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz to ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz
100%|██████████| 29.5k/29.5k [00:00<00:00, 211kB/s]
[trainer-node-0]: Extracting ./data/FashionMNIST/raw/train-labels-idx1-ubyte.gz to 

## Delete the TrainJob

When the TrainJob is finished, you can delete the resource.


In [None]:
# client.delete_job(job_name)