From b7eb886608413704d3af90ed7f2032ff5d0fdeae Mon Sep 17 00:00:00 2001 From: David Grove Date: Thu, 27 Mar 2025 12:17:25 -0400 Subject: [PATCH] Details of PyTorch sample --- setup.KubeConEU25/README.md | 424 ++++++++++++++++- .../sample-jobs/pytorch-training.ipynb | 450 ++++++++++++++++++ 2 files changed, 870 insertions(+), 4 deletions(-) create mode 100644 setup.KubeConEU25/sample-jobs/pytorch-training.ipynb diff --git a/setup.KubeConEU25/README.md b/setup.KubeConEU25/README.md index dcd9998..e2d208c 100644 --- a/setup.KubeConEU25/README.md +++ b/setup.KubeConEU25/README.md @@ -805,14 +805,430 @@ kubectl delete --as alice -n blue appwrapper batch-inference ### Pre-Training with PyTorch -In this example, `alice` uses the [Kubeflow Training -Operator](https://github.com/kubeflow/training-operator) to run a job that uses -[PyTorch](https://pytorch.org) to train a machine learning model. +In this example, `alice` uses the [Kubeflow Trainer](https://github.com/kubeflow/trainer) +to run a job that uses [PyTorch](https://pytorch.org) to train a machine learning model.
-TODO
+This example was constructed by converting a [PyTorch tutorial on FSDP](https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html)
+into a Kubeflow Trainer [notebook](./sample-jobs/pytorch-training.ipynb) that we used to generate
+the YAML for a `PyTorchJob`. The YAML generated by running the notebook was then wrapped in an
+`AppWrapper` using MLBatch's [awpack tool](../tools/appwrapper-packager/awpack.py) to produce the final YAML,
+which we apply by executing the command below.
+```sh
+kubectl apply --as alice -n blue -f- << EOF
+apiVersion: workload.codeflare.dev/v1beta2
+kind: AppWrapper
+metadata:
+  name: pytorch-mnist-training
+  labels:
+    kueue.x-k8s.io/queue-name: default-queue
+spec:
+  components:
+  - template:
+      apiVersion: kubeflow.org/v1
+      kind: PyTorchJob
+      metadata:
+        name: mnist-training
+      spec:
+        nprocPerNode: "2"
+        pytorchReplicaSpecs:
+          Master:
+            replicas: 1
+            template:
+              metadata:
+                annotations:
+                  sidecar.istio.io/inject: "false"
+              spec:
+                containers:
+                - args:
+                  - |2-
+                    program_path=$(mktemp -d)
+                    read -r -d '' SCRIPT << EOM
+                    def train_function(parameters):
+                        import os
+                        import time
+                        import functools
+                        import torch
+                        import torch.nn as nn
+                        import torch.nn.functional as F
+                        import torch.optim as optim
+                        from torchvision import datasets, transforms
+                        from torch.optim.lr_scheduler import StepLR
+                        import torch.distributed as dist
+                        import torch.multiprocessing as mp
+                        from torch.nn.parallel import DistributedDataParallel as DDP
+                        from torch.utils.data.distributed import DistributedSampler
+                        from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
+                        from torch.distributed.fsdp.fully_sharded_data_parallel import (
+                            CPUOffload,
+                            BackwardPrefetch,
+                        )
+                        from torch.distributed.fsdp.wrap import (
+                            size_based_auto_wrap_policy,
+                            enable_wrap,
+                            wrap,
+                        )
+                        class Net(nn.Module):
+                            def __init__(self):
+                                super(Net, self).__init__()
+                                self.conv1 = nn.Conv2d(1, 32, 3, 1)
+                                self.conv2 = nn.Conv2d(32, 64, 3, 1)
+                                self.dropout1 = nn.Dropout(0.25)
+                                self.dropout2 = nn.Dropout(0.5)
+                                self.fc1 = nn.Linear(9216, 128)
+                                self.fc2 = nn.Linear(128, 10)
+                            def forward(self, x):
+                                x = self.conv1(x)
+                                x = F.relu(x)
+                                x = self.conv2(x)
+                                x = F.relu(x)
+                                x = F.max_pool2d(x, 2)
+                                x = self.dropout1(x)
+                                x = torch.flatten(x, 1)
+                                x = self.fc1(x)
+                                x = F.relu(x)
+                                x = self.dropout2(x)
+                                x = self.fc2(x)
+                                output = F.log_softmax(x, dim=1)
+                                return output
+                        def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None):
+                            model.train()
+                            ddp_loss = torch.zeros(2).to(rank)
+                            if sampler:
+                                sampler.set_epoch(epoch)
+                            for batch_idx, (data, target) in enumerate(train_loader):
+                                data, target = data.to(rank), target.to(rank)
+                                optimizer.zero_grad()
+                                output = model(data)
+                                loss = F.nll_loss(output, target, reduction='sum')
+                                loss.backward()
+                                optimizer.step()
+                                ddp_loss[0] += loss.item()
+                                ddp_loss[1] += len(data)
+                            dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)
+                            if rank == 0:
+                                print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1]))
+                        def test(model, rank, world_size, test_loader):
+                            model.eval()
+                            correct = 0
+                            ddp_loss = torch.zeros(3).to(rank)
+                            with torch.no_grad():
+                                for data, target in test_loader:
+                                    data, target = data.to(rank), target.to(rank)
+                                    output = model(data)
+                                    ddp_loss[0] += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
+                                    pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
+                                    ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item()
+                                    ddp_loss[2] += len(data)
+                            dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)
+                            if rank == 0:
+                                test_loss = ddp_loss[0] / ddp_loss[2]
+                                print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
+                                    test_loss, int(ddp_loss[1]), int(ddp_loss[2]),
+                                    100. * ddp_loss[1] / ddp_loss[2]))
+                        # [1] Setup PyTorch distributed and get the distributed parameters.
+                        torch.manual_seed(parameters["seed"])
+                        dist.init_process_group("nccl")
+                        local_rank = int(os.environ["LOCAL_RANK"])
+                        rank = dist.get_rank()
+                        world_size = dist.get_world_size()
+                        # Local rank identifies the GPU number inside the pod.
+                        torch.cuda.set_device(local_rank)
+                        print(
+                            f"FSDP Training for WORLD_SIZE: {world_size}, RANK: {rank}, LOCAL_RANK: {local_rank}"
+                        )
+                        transform=transforms.Compose([
+                            transforms.ToTensor(),
+                            transforms.Normalize((0.1307,), (0.3081,))
+                        ])
+                        dataset1 = datasets.MNIST('/tmp/data', train=True, download=True,
+                                            transform=transform)
+                        dataset2 = datasets.MNIST('/tmp/data', train=False,
+                                            transform=transform)
+                        sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True)
+                        sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size)
+                        train_kwargs = {'batch_size': parameters["batch-size"], 'sampler': sampler1}
+                        test_kwargs = {'batch_size': parameters["test-batch-size"], 'sampler': sampler2}
+                        cuda_kwargs = {'num_workers': 2,
+                                       'pin_memory': True,
+                                       'shuffle': False}
+                        train_kwargs.update(cuda_kwargs)
+                        test_kwargs.update(cuda_kwargs)
+                        train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
+                        test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
+                        my_auto_wrap_policy = functools.partial(
+                            size_based_auto_wrap_policy, min_num_params=100
+                        )
+                        init_start_event = torch.cuda.Event(enable_timing=True)
+                        init_end_event = torch.cuda.Event(enable_timing=True)
+                        model = Net().to(local_rank)
+                        model = FSDP(model)
+                        optimizer = optim.Adadelta(model.parameters(), lr=parameters["lr"])
+                        scheduler = StepLR(optimizer, step_size=1, gamma=parameters["gamma"])
+                        init_start_event.record()
+                        for epoch in range(1, parameters["epochs"] + 1):
+                            train(parameters, model, local_rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
+                            test(model, local_rank, world_size, test_loader)
+                            scheduler.step()
+                        init_end_event.record()
+                        if rank == 0:
+                            init_end_event.synchronize()
+                            print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec")
+                            print(f"{model}")
+                        if parameters["save-model"]:
+                            # use a barrier to make sure training is done on all ranks
+                            dist.barrier()
+                            states = model.state_dict()
+                            if rank == 0:
+                                torch.save(states, "mnist_cnn.pt")
+                    train_function({'batch-size': 64, 'test-batch-size': 1000, 'epochs': 10, 'lr': 1.0, 'gamma': 0.7, 'seed': 1, 'save-model': False})
+                    EOM
+                    printf "%s" "$SCRIPT" > "$program_path/ephemeral_script.py"
+                    torchrun "$program_path/ephemeral_script.py"
+                  command:
+                  - bash
+                  - -c
+                  image: docker.io/pytorch/pytorch:2.1.2-cuda11.8-cudnn8-runtime
+                  name: pytorch
+                  resources:
+                    limits:
+                      nvidia.com/gpu: "2"
+                    requests:
+                      nvidia.com/gpu: "2"
+          Worker:
+            replicas: 1
+            template:
+              metadata:
+                annotations:
+                  sidecar.istio.io/inject: "false"
+              spec:
+                containers:
+                - args:
+                  - |2-
+                    program_path=$(mktemp -d)
+                    read -r -d '' SCRIPT << EOM
+                    def train_function(parameters):
+                        import os
+                        import time
+                        import functools
+                        import torch
+                        import torch.nn as nn
+                        import torch.nn.functional as F
+                        import torch.optim as optim
+                        from torchvision import datasets, transforms
+                        from torch.optim.lr_scheduler import StepLR
+                        import torch.distributed as dist
+                        import torch.multiprocessing as mp
+                        from torch.nn.parallel import DistributedDataParallel as DDP
+                        from torch.utils.data.distributed import DistributedSampler
+                        from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
+                        from torch.distributed.fsdp.fully_sharded_data_parallel import (
+                            CPUOffload,
+                            BackwardPrefetch,
+                        )
+                        from torch.distributed.fsdp.wrap import (
+                            size_based_auto_wrap_policy,
+                            enable_wrap,
+                            wrap,
+                        )
+                        class Net(nn.Module):
+                            def __init__(self):
+                                super(Net, self).__init__()
+                                self.conv1 = nn.Conv2d(1, 32, 3, 1)
+                                self.conv2 = nn.Conv2d(32, 64, 3, 1)
+                                self.dropout1 = nn.Dropout(0.25)
+                                self.dropout2 = nn.Dropout(0.5)
+                                self.fc1 = nn.Linear(9216, 128)
+                                self.fc2 = nn.Linear(128, 10)
+                            def forward(self, x):
+                                x = self.conv1(x)
+                                x = F.relu(x)
+                                x = self.conv2(x)
+                                x = F.relu(x)
+                                x = F.max_pool2d(x, 2)
+                                x = self.dropout1(x)
+                                x = torch.flatten(x, 1)
+                                x = self.fc1(x)
+                                x = F.relu(x)
+                                x = self.dropout2(x)
+                                x = self.fc2(x)
+                                output = F.log_softmax(x, dim=1)
+                                return output
+                        def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None):
+                            model.train()
+                            ddp_loss = torch.zeros(2).to(rank)
+                            if sampler:
+                                sampler.set_epoch(epoch)
+                            for batch_idx, (data, target) in enumerate(train_loader):
+                                data, target = data.to(rank), target.to(rank)
+                                optimizer.zero_grad()
+                                output = model(data)
+                                loss = F.nll_loss(output, target, reduction='sum')
+                                loss.backward()
+                                optimizer.step()
+                                ddp_loss[0] += loss.item()
+                                ddp_loss[1] += len(data)
+                            dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)
+                            if rank == 0:
+                                print('Train Epoch: {} \tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1]))
+                        def test(model, rank, world_size, test_loader):
+                            model.eval()
+                            correct = 0
+                            ddp_loss = torch.zeros(3).to(rank)
+                            with torch.no_grad():
+                                for data, target in test_loader:
+                                    data, target = data.to(rank), target.to(rank)
+                                    output = model(data)
+                                    ddp_loss[0] += F.nll_loss(output, target, reduction='sum').item() # sum up batch loss
+                                    pred = output.argmax(dim=1, keepdim=True) # get the index of the max log-probability
+                                    ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item()
+                                    ddp_loss[2] += len(data)
+                            dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)
+                            if rank == 0:
+                                test_loss = ddp_loss[0] / ddp_loss[2]
+                                print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\n'.format(
+                                    test_loss, int(ddp_loss[1]), int(ddp_loss[2]),
+                                    100. * ddp_loss[1] / ddp_loss[2]))
+                        # [1] Setup PyTorch distributed and get the distributed parameters.
+                        torch.manual_seed(parameters["seed"])
+                        dist.init_process_group("nccl")
+                        local_rank = int(os.environ["LOCAL_RANK"])
+                        rank = dist.get_rank()
+                        world_size = dist.get_world_size()
+                        # Local rank identifies the GPU number inside the pod.
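+                        # Added note (not in the upstream tutorial): with this sample's topology
+                        # (1 Master Pod + 1 Worker Pod, each running nprocPerNode=2 processes on
+                        # 2 GPUs), torchrun sets WORLD_SIZE=4, RANK in 0..3 across the job, and
+                        # LOCAL_RANK in 0..1 within each Pod. The process must be pinned to its
+                        # LOCAL_RANK GPU before any CUDA work so NCCL uses the right device.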
+                        torch.cuda.set_device(local_rank)
+                        print(
+                            f"FSDP Training for WORLD_SIZE: {world_size}, RANK: {rank}, LOCAL_RANK: {local_rank}"
+                        )
+                        transform=transforms.Compose([
+                            transforms.ToTensor(),
+                            transforms.Normalize((0.1307,), (0.3081,))
+                        ])
+                        dataset1 = datasets.MNIST('/tmp/data', train=True, download=True,
+                                            transform=transform)
+                        dataset2 = datasets.MNIST('/tmp/data', train=False,
+                                            transform=transform)
+                        sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True)
+                        sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size)
+                        train_kwargs = {'batch_size': parameters["batch-size"], 'sampler': sampler1}
+                        test_kwargs = {'batch_size': parameters["test-batch-size"], 'sampler': sampler2}
+                        cuda_kwargs = {'num_workers': 2,
+                                       'pin_memory': True,
+                                       'shuffle': False}
+                        train_kwargs.update(cuda_kwargs)
+                        test_kwargs.update(cuda_kwargs)
+                        train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)
+                        test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)
+                        my_auto_wrap_policy = functools.partial(
+                            size_based_auto_wrap_policy, min_num_params=100
+                        )
+                        init_start_event = torch.cuda.Event(enable_timing=True)
+                        init_end_event = torch.cuda.Event(enable_timing=True)
+                        model = Net().to(local_rank)
+                        model = FSDP(model)
+                        optimizer = optim.Adadelta(model.parameters(), lr=parameters["lr"])
+                        scheduler = StepLR(optimizer, step_size=1, gamma=parameters["gamma"])
+                        init_start_event.record()
+                        for epoch in range(1, parameters["epochs"] + 1):
+                            train(parameters, model, local_rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)
+                            test(model, local_rank, world_size, test_loader)
+                            scheduler.step()
+                        init_end_event.record()
+                        if rank == 0:
+                            init_end_event.synchronize()
+                            print(f"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec")
+                            print(f"{model}")
+                        if parameters["save-model"]:
+                            # use a barrier to make sure training is done on all ranks
+                            dist.barrier()
+                            states = model.state_dict()
+                            if rank == 0:
+                                torch.save(states, "mnist_cnn.pt")
+                    train_function({'batch-size': 64, 'test-batch-size': 1000, 'epochs': 10, 'lr': 1.0, 'gamma': 0.7, 'seed': 1, 'save-model': False})
+                    EOM
+                    printf "%s" "$SCRIPT" > "$program_path/ephemeral_script.py"
+                    torchrun "$program_path/ephemeral_script.py"
+                  command:
+                  - bash
+                  - -c
+                  image: docker.io/pytorch/pytorch:2.1.2-cuda11.8-cudnn8-runtime
+                  name: pytorch
+                  resources:
+                    limits:
+                      nvidia.com/gpu: "2"
+                    requests:
+                      nvidia.com/gpu: "2"
+        runPolicy:
+          suspend: false
+EOF
+```
+
+This will create 2 Pods, each requesting 2 GPUs. On our cluster, it will take about 30 seconds
+to execute this `PyTorchJob`. We can check the status of the `PyTorchJob` with:
+
+```sh
+kubectl get pytorchjob -n blue
+```
+
+After the job completes, we can get the log of the worker Pod with
+
+```sh
+kubectl logs mnist-training-worker-0 -n blue
+```
+
+At the beginning of the log, we can see messages from each Python process
+with its rank information:
+```sh
+...
+FSDP Training for WORLD_SIZE: 4, RANK: 3, LOCAL_RANK: 1
+...
+FSDP Training for WORLD_SIZE: 4, RANK: 2, LOCAL_RANK: 0
+```
+And at the end of the log, we can see the messages from the `LOCAL_RANK` `0`
+process summarizing each epoch:
+```sh
+...
+
+Train Epoch: 1 	Loss: 0.247396
+Test set: Average loss: 0.0498, Accuracy: 9824/10000 (98.24%)
+
+Train Epoch: 2 	Loss: 0.070375
+Test set: Average loss: 0.0355, Accuracy: 9874/10000 (98.74%)
+
+Train Epoch: 3 	Loss: 0.047944
+Test set: Average loss: 0.0291, Accuracy: 9900/10000 (99.00%)
+
+Train Epoch: 4 	Loss: 0.038316
+Test set: Average loss: 0.0282, Accuracy: 9906/10000 (99.06%)
+
+Train Epoch: 5 	Loss: 0.032751
+Test set: Average loss: 0.0276, Accuracy: 9906/10000 (99.06%)
+
+Train Epoch: 6 	Loss: 0.028068
+Test set: Average loss: 0.0275, Accuracy: 9905/10000 (99.05%)
+
+Train Epoch: 7 	Loss: 0.028161
+Test set: Average loss: 0.0254, Accuracy: 9916/10000 (99.16%)
+
+Train Epoch: 8 	Loss: 0.025051
+Test set: Average loss: 0.0260, Accuracy: 9911/10000 (99.11%)
+
+Train Epoch: 9 	Loss: 0.023851
+Test set: Average loss: 0.0264, Accuracy: 9916/10000 (99.16%)
+
+Train Epoch: 10 	Loss: 0.023334
+Test set: Average loss: 0.0255, Accuracy: 9916/10000 (99.16%)
+```
+
+When we are all done, we can delete the completed `AppWrapper` with:
+
+```sh
+kubectl delete appwrapper pytorch-mnist-training -n blue
+```
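+
+As a final check (a convenience we suggest here; it assumes only the MLBatch setup from the
+earlier sections of this document), we can verify that deleting the `AppWrapper` also cleaned
+up the `PyTorchJob` and the corresponding Kueue `Workload`, releasing the GPUs for other
+workloads:
+
+```sh
+# each command should report that no resources are found
+kubectl get appwrappers,pytorchjobs -n blue
+kubectl get workloads -n blue
+```
+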
### Fine-Tuning with Ray

diff --git a/setup.KubeConEU25/sample-jobs/pytorch-training.ipynb b/setup.KubeConEU25/sample-jobs/pytorch-training.ipynb
new file mode 100644
index 0000000..2fd86b6
--- /dev/null
+++ b/setup.KubeConEU25/sample-jobs/pytorch-training.ipynb
@@ -0,0 +1,450 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Tune a Model on the MNIST Dataset using PyTorchJob and FSDP"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This Notebook will tune a small model on the MNIST dataset using FSDP.\n",
+    "\n",
+    "This Notebook will use **4** GPUs to train the model on 2 nodes. This example is based on [the official PyTorch FSDP tutorial](https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html)."
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## FSDP with multi-node multi-worker training"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This Notebook demonstrates multi-node, multi-worker distributed training with Fully Sharded Data Parallel (FSDP) and PyTorchJob.\n",
+    "\n",
+    "When a model is trained with FSDP, the GPU memory footprint is smaller compared to Distributed Data Parallel (DDP),\n",
+    "as the model parameters are sharded across GPU devices.\n",
+    "\n",
+    "This enables training of very large models that would otherwise be impossible to fit on a single GPU device.\n",
+    "\n",
+    "Check this guide to learn more about PyTorch FSDP: https://pytorch.org/tutorials/intermediate/FSDP_tutorial.html\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "tags": []
+   },
+   "source": [
+    "## Install the required packages\n",
+    "\n",
+    "Install the Kubeflow Training Python SDK."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# TODO (andreyvelich): Use the release version of SDK.\n",
+    "!pip install git+https://github.com/kubeflow/training-operator.git#subdirectory=sdk/python"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Create a script to train on MNIST using FSDP\n",
+    "\n",
+    "We need to wrap our fine-tuning script in a function to create a Kubeflow PyTorchJob.\n",
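+    "\n",
+    "Note: when the job is created, the Training SDK extracts the body of this function into a standalone script and launches it with `torchrun` in each Pod, which is why all imports appear inside the function: the extracted script must be self-contained."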
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "def train_function(parameters):\n",
+    "    import os\n",
+    "    import time\n",
+    "    import functools\n",
+    "\n",
+    "    import torch\n",
+    "    import torch.nn as nn\n",
+    "    import torch.nn.functional as F\n",
+    "    import torch.optim as optim\n",
+    "    from torchvision import datasets, transforms\n",
+    "\n",
+    "    from torch.optim.lr_scheduler import StepLR\n",
+    "\n",
+    "    import torch.distributed as dist\n",
+    "    import torch.multiprocessing as mp\n",
+    "    from torch.nn.parallel import DistributedDataParallel as DDP\n",
+    "    from torch.utils.data.distributed import DistributedSampler\n",
+    "    from torch.distributed.fsdp import FullyShardedDataParallel as FSDP\n",
+    "    from torch.distributed.fsdp.fully_sharded_data_parallel import (\n",
+    "        CPUOffload,\n",
+    "        BackwardPrefetch,\n",
+    "    )\n",
+    "    from torch.distributed.fsdp.wrap import (\n",
+    "        size_based_auto_wrap_policy,\n",
+    "        enable_wrap,\n",
+    "        wrap,\n",
+    "    )\n",
+    "\n",
+    "    class Net(nn.Module):\n",
+    "        def __init__(self):\n",
+    "            super(Net, self).__init__()\n",
+    "            self.conv1 = nn.Conv2d(1, 32, 3, 1)\n",
+    "            self.conv2 = nn.Conv2d(32, 64, 3, 1)\n",
+    "            self.dropout1 = nn.Dropout(0.25)\n",
+    "            self.dropout2 = nn.Dropout(0.5)\n",
+    "            self.fc1 = nn.Linear(9216, 128)\n",
+    "            self.fc2 = nn.Linear(128, 10)\n",
+    "\n",
+    "        def forward(self, x):\n",
+    "\n",
+    "            x = self.conv1(x)\n",
+    "            x = F.relu(x)\n",
+    "            x = self.conv2(x)\n",
+    "            x = F.relu(x)\n",
+    "            x = F.max_pool2d(x, 2)\n",
+    "            x = self.dropout1(x)\n",
+    "            x = torch.flatten(x, 1)\n",
+    "            x = self.fc1(x)\n",
+    "            x = F.relu(x)\n",
+    "            x = self.dropout2(x)\n",
+    "            x = self.fc2(x)\n",
+    "            output = F.log_softmax(x, dim=1)\n",
+    "            return output\n",
+    "    \n",
+    "\n",
+    "    def train(args, model, rank, world_size, train_loader, optimizer, epoch, sampler=None):\n",
+    "        model.train()\n",
+    "        ddp_loss = torch.zeros(2).to(rank)\n",
+    "        if sampler:\n",
+    "            sampler.set_epoch(epoch)\n",
+    "        for batch_idx, (data, target) in enumerate(train_loader):\n",
+    "            data, target = data.to(rank), target.to(rank)\n",
+    "            optimizer.zero_grad()\n",
+    "            output = model(data)\n",
+    "            loss = F.nll_loss(output, target, reduction='sum')\n",
+    "            loss.backward()\n",
+    "            optimizer.step()\n",
+    "            ddp_loss[0] += loss.item()\n",
+    "            ddp_loss[1] += len(data)\n",
+    "\n",
+    "        dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)\n",
+    "        if rank == 0:\n",
+    "            print('Train Epoch: {} \\tLoss: {:.6f}'.format(epoch, ddp_loss[0] / ddp_loss[1]))\n",
+    "    \n",
+    "    def test(model, rank, world_size, test_loader):\n",
+    "        model.eval()\n",
+    "        correct = 0\n",
+    "        ddp_loss = torch.zeros(3).to(rank)\n",
+    "        with torch.no_grad():\n",
+    "            for data, target in test_loader:\n",
+    "                data, target = data.to(rank), target.to(rank)\n",
+    "                output = model(data)\n",
+    "                ddp_loss[0] += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss\n",
+    "                pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability\n",
+    "                ddp_loss[1] += pred.eq(target.view_as(pred)).sum().item()\n",
+    "                ddp_loss[2] += len(data)\n",
+    "\n",
+    "        dist.all_reduce(ddp_loss, op=dist.ReduceOp.SUM)\n",
+    "\n",
+    "        if rank == 0:\n",
+    "            test_loss = ddp_loss[0] / ddp_loss[2]\n",
+    "            print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.2f}%)\\n'.format(\n",
+    "                test_loss, int(ddp_loss[1]), int(ddp_loss[2]),\n",
+    "                100. * ddp_loss[1] / ddp_loss[2]))\n",
+    "\n",
+    "\n",
+    "    # [1] Setup PyTorch distributed and get the distributed parameters.\n",
+    "    torch.manual_seed(parameters[\"seed\"])\n",
+    "    dist.init_process_group(\"nccl\")\n",
+    "    local_rank = int(os.environ[\"LOCAL_RANK\"])\n",
+    "    rank = dist.get_rank()\n",
+    "    world_size = dist.get_world_size()\n",
+    "\n",
+    "    # Local rank identifies the GPU number inside the pod.\n",
+    "    torch.cuda.set_device(local_rank)\n",
+    "\n",
+    "    print(\n",
+    "        f\"FSDP Training for WORLD_SIZE: {world_size}, RANK: {rank}, LOCAL_RANK: {local_rank}\"\n",
+    "    )\n",
+    "\n",
+    "    transform=transforms.Compose([\n",
+    "        transforms.ToTensor(),\n",
+    "        transforms.Normalize((0.1307,), (0.3081,))\n",
+    "    ])\n",
+    "\n",
+    "    dataset1 = datasets.MNIST('../data', train=True, download=True,\n",
+    "                        transform=transform)\n",
+    "    dataset2 = datasets.MNIST('../data', train=False,\n",
+    "                        transform=transform)\n",
+    "\n",
+    "    sampler1 = DistributedSampler(dataset1, rank=rank, num_replicas=world_size, shuffle=True)\n",
+    "    sampler2 = DistributedSampler(dataset2, rank=rank, num_replicas=world_size)\n",
+    "\n",
+    "    train_kwargs = {'batch_size': parameters[\"batch-size\"], 'sampler': sampler1}\n",
+    "    test_kwargs = {'batch_size': parameters[\"test-batch-size\"], 'sampler': sampler2}\n",
+    "    cuda_kwargs = {'num_workers': 2,\n",
+    "                    'pin_memory': True,\n",
+    "                    'shuffle': False}\n",
+    "    train_kwargs.update(cuda_kwargs)\n",
+    "    test_kwargs.update(cuda_kwargs)\n",
+    "\n",
+    "    train_loader = torch.utils.data.DataLoader(dataset1,**train_kwargs)\n",
+    "    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)\n",
+    "    my_auto_wrap_policy = functools.partial(\n",
+    "        size_based_auto_wrap_policy, min_num_params=100\n",
+    "    )\n",
+    "\n",
+    "    init_start_event = torch.cuda.Event(enable_timing=True)\n",
+    "    init_end_event = torch.cuda.Event(enable_timing=True)\n",
+    "\n",
+    "    model = Net().to(local_rank)\n",
+    "\n",
+    "    model = FSDP(model)\n",
+    "\n",
+    "    optimizer = optim.Adadelta(model.parameters(), lr=parameters[\"lr\"])\n",
+    "\n",
+    "    scheduler = StepLR(optimizer, step_size=1, gamma=parameters[\"gamma\"])\n",
+    "    init_start_event.record()\n",
+    "    for epoch in range(1, parameters[\"epochs\"] + 1):\n",
+    "        train(parameters, model, local_rank, world_size, train_loader, optimizer, epoch, sampler=sampler1)\n",
+    "        test(model, local_rank, world_size, test_loader)\n",
+    "        scheduler.step()\n",
+    "\n",
+    "    init_end_event.record()\n",
+    "\n",
+    "    if rank == 0:\n",
+    "        init_end_event.synchronize()\n",
+    "        print(f\"CUDA event elapsed time: {init_start_event.elapsed_time(init_end_event) / 1000}sec\")\n",
+    "        print(f\"{model}\")\n",
+    "\n",
+    "    if parameters[\"save-model\"]:\n",
+    "        # use a barrier to make sure training is done on all ranks\n",
+    "        dist.barrier()\n",
+    "        states = model.state_dict()\n",
+    "        if rank == 0:\n",
+    "            torch.save(states, \"mnist_cnn.pt\")\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "## Create a Kubeflow PyTorchJob to train on MNIST with FSDP\n",
+    "\n",
+    "Use `TrainingClient()` to create a PyTorchJob which will train on **2 workers** using **2 GPUs** for each worker.\n",
+    "\n",
+    "If you don't have enough GPU resources, you can decrease the number of workers or the number of GPUs per worker.\n",
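+    "\n",
+    "Note: with `num_workers=2` and `num_procs_per_worker=2`, the SDK generates a PyTorchJob with 1 Master replica and 1 Worker replica (2 Pods in total), each running 2 processes, so the training world size is 4."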
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 19,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "from kubeflow.training import TrainingClient\n",
+    "\n",
+    "job_name = \"mnist-training\"\n",
+    "\n",
+    "parameters = {\n",
+    "    \"batch-size\": 64,\n",
+    "    \"test-batch-size\": 1000,\n",
+    "    \"epochs\": 10,\n",
+    "    \"lr\": 1.0,\n",
+    "    \"gamma\": 0.7,\n",
+    "    \"seed\": 1,\n",
+    "    \"save-model\": False,\n",
+    "}\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "# Create the PyTorchJob.\n",
+    "TrainingClient().create_job(\n",
+    "    name=job_name,\n",
+    "    train_func=train_function,\n",
+    "    parameters=parameters,\n",
+    "    num_workers=2,  # You can modify the number of workers or the number of GPUs.\n",
+    "    num_procs_per_worker=2,\n",
+    "    resources_per_worker={\"gpu\": 2},\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "tags": []
+   },
+   "source": [
+    "### Check the PyTorchJob conditions\n",
+    "\n",
+    "Use `TrainingClient()` APIs to get information about the created PyTorchJob."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "print(\"PyTorchJob Conditions\")\n",
+    "print(TrainingClient().get_job_conditions(job_name))\n",
+    "print(\"-\" * 40)\n",
+    "\n",
+    "# Wait until PyTorchJob has the Running condition.\n",
+    "job = TrainingClient().wait_for_job_conditions(\n",
+    "    job_name,\n",
+    "    expected_conditions={\"Running\"},\n",
+    ")\n",
+    "print(\"PyTorchJob is running\")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Get the PyTorchJob pod names\n",
+    "\n",
+    "Since we define 2 workers, PyTorchJob will create 1 master pod and 1 worker pod to run FSDP fine-tuning."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 21,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [
+    {
+     "data": {
+      "text/plain": [
+       "['mnist-training-master-0', 'mnist-training-worker-0']"
+      ]
+     },
+     "execution_count": 21,
+     "metadata": {},
+     "output_type": "execute_result"
+    }
+   ],
+   "source": [
+    "TrainingClient().get_job_pod_names(job_name)\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "execution": {
+     "iopub.status.busy": "2022-09-01T20:10:25.759950Z",
+     "iopub.status.idle": "2022-09-01T20:10:25.760581Z",
+     "shell.execute_reply": "2022-09-01T20:10:25.760353Z",
+     "shell.execute_reply.started": "2022-09-01T20:10:25.760328Z"
+    },
+    "tags": []
+   },
+   "source": [
+    "### Get the PyTorchJob training logs\n",
+    "\n",
+    "Model parameters are sharded across all workers and GPU devices."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "logs, _ = TrainingClient().get_job_logs(job_name, follow=True)\n"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {
+    "execution": {
+     "iopub.execute_input": "2024-03-01T23:44:15.511173Z",
+     "iopub.status.busy": "2024-03-01T23:44:15.510932Z",
+     "iopub.status.idle": "2024-03-01T23:44:15.539921Z",
+     "shell.execute_reply": "2024-03-01T23:44:15.539352Z",
+     "shell.execute_reply.started": "2024-03-01T23:44:15.511155Z"
+    },
+    "tags": []
+   },
+   "source": [
+    "## Delete the PyTorchJob\n",
+    "\n",
+    "You can delete the created PyTorchJob.\n",
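+    "\n",
+    "Note: in the MLBatch setup described in the accompanying README, the YAML generated from this notebook is wrapped in an `AppWrapper`, so it is submitted and deleted with `kubectl` rather than with the Training SDK."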
+ ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": { + "tags": [] + }, + "outputs": [], + "source": [ + "TrainingClient().delete_job(name=job_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "pt-demo", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}