In [None]:
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# PyTorch on Vertex Training: MNIST Distributed Training Example

<table align="left">

  <td>
    <a href="https://colab.research.google.com/github/rastringer/vertex-ai-examples/blob/main/pytorch_on_vertex/pytorch_mnist_distributed_training_python_package.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Run in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/rastringer/vertex-ai-examples/blob/main/pytorch_on_vertex/pytorch_mnist_distributed_training_python_package.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
  <td>
    <a href="https://console.cloud.google.com/vertex-ai/workbench/deploy-notebook?download_url=https://raw.githubusercontent.com/rastringer/vertex-ai-examples/main/pytorch_on_vertex/pytorch_mnist_distributed_training_python_package.ipynb">
      <img src="https://lh3.googleusercontent.com/UiNooY4LUgW_oTvpsNhPpQzsstV5W8F7rYgxgGBD85cWJoLmrOzhVs_ksK_vgx40SHs7jCqkTkCk=e14-rj-sc0xffffff-h130-w32" alt="Vertex AI logo">
      Open in Vertex AI Workbench
    </a>
  </td>
</table>

#### Overview

This notebook shows how to create a distributed Vertex Training job by making a modular python package. We use the canonical MNIST image classification example from the [PyTorch GitHub repository](https://github.com/pytorch/examples/blob/main/mnist/main.py) (with thanks to Meta) and show how to run it using [pre-built containers](https://cloud.google.com/vertex-ai/docs/training/pre-built-containers#pytorch) on Vertex AI.

#### Objective

Building on the previous notebook in which we ran a simple training job, we will now run distributed training by making simple changes to the job configuration.

We will use the following Google Cloud services and resources:

- *Vertex AI Workbench, Training and Model Registry*
- *Google Cloud Storage*


#### Dataset

With simplicity in mind, we will load the MNIST dataset using the [PyTorch utils](https://pytorch.org/docs/stable/data.html).

#### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* Cloud Storage

Learn about [Vertex AI pricing](https://cloud.google.com/vertex-ai/pricing),
and [Cloud Storage pricing](https://cloud.google.com/storage/pricing),
and use the [Pricing Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

## Prerequisites

This tutorial requires a GCP project, a Cloud Storage bucket, and the Vertex AI and Cloud Storage APIs to be enabled.

Please follow the steps in the [gcp_setup.ipynb](https://github.com/rastringer/vertex-ai-examples/blob/main/pytorch_on_vertex/gcp_setup.ipynb) notebook first if necessary. 

#### Installation

Install the following packages required to execute this notebook.

In [None]:
# Install the packages
! pip3 install --user --upgrade google-cloud-aiplatform

#### Colab only: Uncomment the following cell to restart the kernel.

In [None]:
# Automatically restart kernel after installs so that your environment can access the new packages
# import IPython

# app = IPython.Application.instance()
# app.kernel.do_shutdown(True)

In [None]:
# Google Cloud project ID passed to `vertexai.init` further down.
# Replace the placeholder with your own project ID before running.
PROJECT_ID = "<your-project-id>"  # @param {type:"string"}

#### Region

You can also change the `REGION` variable used by Vertex AI. Learn more about [Vertex AI regions](https://cloud.google.com/vertex-ai/docs/general/locations).

In [None]:
# Vertex AI location passed to `vertexai.init` further down.
# Replace the placeholder with a region name before running.
REGION = "<your-project-region>"  # @param {type: "string"}

#### Authentication

You may need to authenticate your environment with your GCP account.

**Vertex AI Workbench**
* You are already authenticated; skip ahead to "Add a storage bucket to store artifacts."

**Local JupyterLab instance:** uncomment and run:

In [None]:
# ! gcloud auth login

**Colab**, uncomment and run:

In [None]:
# from google.colab import auth
# auth.authenticate_user()

**Service account** or other
* See how to grant Cloud Storage permissions to your service account at https://cloud.google.com/storage/docs/gsutil/commands/iam#ch-examples.

Add a storage bucket to store artifacts.

In [None]:
# Cloud Storage bucket URI (including the gs:// prefix). It is used as the
# SDK staging bucket and as the destination of the trainer package upload.
BUCKET_URI = "gs://your-unique-bucket-name"  # @param {type:"string"}

Initialize the Vertex AI SDK for Python.

In [None]:
from google.cloud import aiplatform as vertexai

# Point the SDK at the project, region and staging bucket configured above so
# that later SDK calls do not have to repeat them.
vertexai.init(
    project=PROJECT_ID,
    location=REGION,
    staging_bucket=BUCKET_URI,
)

UUID function to distinguish between different training jobs

In [None]:
import random
import string

# Generate a uuid of a specified length
def generate_uuid(length: int = 6) -> str:
    """Return a random identifier made of lowercase letters and digits.

    Args:
        length: Number of characters in the identifier (default 6).

    Returns:
        A string of ``length`` characters drawn, with replacement, from
        ``string.ascii_lowercase + string.digits``.
    """
    alphabet = string.ascii_lowercase + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)


# Identifier generated once per kernel session
UUID = generate_uuid()

Set the variables for the pre-built containers, the Python package, the machine configuration and the model. This example trains and serves on GPU; to use CPU instead, switch the container image URIs to their CPU variants.

### File structure

We will follow the recommended [file structure](https://cloud.google.com/vertex-ai/docs/training/code-requirements) to create a Python source distribution that trains a model and exports it to Cloud Storage. 


![Tree](tree.png)

In [None]:
# ---------------------------------------------------------------------------
# Naming
# ---------------------------------------------------------------------------
# Name for the package application / model / repository
APP_NAME = "pytorch-mnist-modular"
# Display name of the training job; the UUID suffix tells runs apart
JOB_NAME = f"{APP_NAME}-{UUID}"

# Version used for model deployment, and the model's display name / description
VERSION = 1
model_display_name = f"{APP_NAME}-v{VERSION}"
model_description = "PyTorch MNIST classifier"

# ---------------------------------------------------------------------------
# Compute: machine type and accelerators
# ---------------------------------------------------------------------------
TRAIN_MACHINE_TYPE = "n1-standard-8"
TRAIN_ACCELERATOR_TYPE = "NVIDIA_TESLA_T4"
TRAIN_ACCELERATOR_COUNT = 2

# ---------------------------------------------------------------------------
# Pre-built containers
# ---------------------------------------------------------------------------
# Image that runs the custom training code
TRAIN_IMAGE_GPU = "us-docker.pkg.dev/vertex-ai/training/pytorch-gpu.1-13:latest"
# Image used for prediction
PREDICT_IMAGE_GPU = "us-docker.pkg.dev/vertex-ai/prediction/pytorch-gpu.1-12:latest"

# ---------------------------------------------------------------------------
# Python package: local folder, built archive, upload target and entry module
# ---------------------------------------------------------------------------
PYTHON_PACKAGE_APPLICATION_DIR = "python_package"
source_package_file_name = f"{PYTHON_PACKAGE_APPLICATION_DIR}/dist/trainer-0.1.tar.gz"
python_package_gcs_uri = f"{BUCKET_URI}/{APP_NAME}/train/python_package/trainer-0.1.tar.gz"
python_module_name = "trainer.task"

In [None]:
!mkdir -p python_package/trainer
!touch ./python_package/trainer/__init__.py

The cell above creates the `trainer` package directory and an empty `__init__.py` file, which marks the directory as a Python package.

## Model

We use the `%%writefile` cell magic to write the model definition to `python_package/trainer/model.py`.

In [None]:
%%writefile ./python_package/trainer/model.py

"""
Thanks to Meta for the PyTorch example at
https://github.com/pytorch/examples/blob/main/mnist/main.py
"""

import os
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR


class Net(nn.Module):
    """Convolutional classifier: two conv layers followed by two linear layers.

    ``forward`` returns log-probabilities (``log_softmax`` over dim 1) for
    10 classes. ``fc1`` expects 9216 flattened features per sample.
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor: 1 -> 32 -> 64 channels, 3x3 kernels
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1)
        # Dropout applied after pooling and after the first linear layer
        self.dropout1 = nn.Dropout(p=0.25)
        self.dropout2 = nn.Dropout(p=0.5)
        # Classification head
        self.fc1 = nn.Linear(in_features=9216, out_features=128)
        self.fc2 = nn.Linear(in_features=128, out_features=10)

    def forward(self, x):
        """Map a batch of inputs to per-class log-probabilities."""
        features = F.relu(self.conv1(x))
        features = F.relu(self.conv2(features))
        features = self.dropout1(F.max_pool2d(features, 2))

        flat = torch.flatten(features, 1)
        hidden = self.dropout2(F.relu(self.fc1(flat)))

        return F.log_softmax(self.fc2(hidden), dim=1)

In [None]:
%%writefile ./python_package/trainer/task.py

import os
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR

from trainer.model import Net

def train(args, model, device, train_loader, optimizer, epoch):
    """Run one pass over ``train_loader``, updating ``model`` with ``optimizer``.

    Args:
        args: Object providing ``log_interval`` (log every N batches) and
            ``dry_run`` (stop after the first logged batch).
        model: Module whose output is fed to ``F.nll_loss``.
        device: Device that each batch is moved to.
        train_loader: Iterable of ``(data, target)`` batches exposing ``dataset``.
        optimizer: Optimizer stepped once per batch.
        epoch: Epoch number, used only in the progress message.
    """
    model.train()
    progress_template = 'Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'

    for step, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        loss = F.nll_loss(model(inputs), labels)
        loss.backward()
        optimizer.step()

        # Only report progress every `log_interval` batches
        if step % args.log_interval != 0:
            continue

        samples_seen = step * len(inputs)
        percent_done = 100. * step / len(train_loader)
        print(progress_template.format(
            epoch, samples_seen, len(train_loader.dataset),
            percent_done, loss.item()))

        if args.dry_run:
            break

def test(model, device, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)

    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

def _build_arg_parser():
    """Return the ``argparse`` parser that defines every training setting."""
    parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=2, metavar='N',
                        help='number of epochs to train (default: 2)')
    parser.add_argument('--lr', type=float, default=1.0, metavar='LR',
                        help='learning rate (default: 1.0)')
    parser.add_argument('--gamma', type=float, default=0.7, metavar='M',
                        help='Learning rate step gamma (default: 0.7)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--no-mps', action='store_true', default=False,
                        help='disables macOS GPU training')
    parser.add_argument('--dry-run', action='store_true', default=False,
                        help='quickly check a single pass')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--log-interval', type=int, default=10, metavar='N',
                        help='how many batches to wait before logging training status')
    # NOTE: with action='store_true' and default=True this flag is always True.
    parser.add_argument('--save-model', action='store_true', default=True,
                        help='For Saving the current Model')
    return parser


def _select_device(use_cuda, use_mps):
    """Pick the torch device, preferring CUDA, then MPS, then CPU."""
    if use_cuda:
        return torch.device("cuda")
    if use_mps:
        return torch.device("mps")
    return torch.device("cpu")


def _upload_model(model, model_directory, file_name="mnist_cnn.pt"):
    """Write ``model`` with ``torch.save`` to ``model_directory`` on Cloud Storage."""
    from google.cloud import storage

    # Build the object URI with "/" explicitly: it is a URI, not a local path.
    storage_path = model_directory.rstrip("/") + "/" + file_name
    blob = storage.blob.Blob.from_string(storage_path, client=storage.Client())
    with blob.open("wb", ignore_flush=True) as f:
        torch.save(model, f)


def main():
    """Train and evaluate ``Net`` on MNIST, then save the trained model.

    Settings are read from the command line (see ``_build_arg_parser``). The
    model is saved locally as ``mnist_cnn.pt`` and, when the ``AIP_MODEL_DIR``
    environment variable is set, also uploaded to that Cloud Storage location.

    NOTE(review): the model is moved to a single device and is not wrapped in
    ``DataParallel`` / ``DistributedDataParallel``, so additional GPUs or
    replicas are not used by this script as written.
    """
    # Training settings
    args = _build_arg_parser().parse_args()

    use_cuda = not args.no_cuda and torch.cuda.is_available()
    # torch builds without the MPS backend do not expose `torch.backends.mps`
    mps_backend = getattr(torch.backends, "mps", None)
    use_mps = (not args.no_mps
               and mps_backend is not None
               and mps_backend.is_available())

    torch.manual_seed(args.seed)

    device = _select_device(use_cuda, use_mps)

    train_kwargs = {'batch_size': args.batch_size}
    test_kwargs = {'batch_size': args.test_batch_size}
    if use_cuda:
        cuda_kwargs = {'num_workers': 1,
                       'pin_memory': True,
                       'shuffle': True}
        train_kwargs.update(cuda_kwargs)
        test_kwargs.update(cuda_kwargs)

    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
        ])
    dataset1 = datasets.MNIST('../data', train=True, download=True,
                              transform=transform)
    dataset2 = datasets.MNIST('../data', train=False,
                              transform=transform)
    train_loader = torch.utils.data.DataLoader(dataset1, **train_kwargs)
    test_loader = torch.utils.data.DataLoader(dataset2, **test_kwargs)

    model = Net().to(device)
    optimizer = optim.Adadelta(model.parameters(), lr=args.lr)

    # Multiply the learning rate by `gamma` after every epoch
    scheduler = StepLR(optimizer, step_size=1, gamma=args.gamma)
    for epoch in range(1, args.epochs + 1):
        train(args, model, device, train_loader, optimizer, epoch)
        test(model, device, test_loader)
        scheduler.step()

    if args.save_model:
        torch.save(model, "mnist_cnn.pt")

    # Upload the trained model to Cloud storage
    model_directory = os.getenv("AIP_MODEL_DIR")
    if not model_directory:
        # Without this guard a missing variable would raise a TypeError while
        # building the destination path, after training has already finished.
        print("AIP_MODEL_DIR is not set; skipping upload to Cloud Storage")
        return

    _upload_model(model, model_directory)
    print(f"Saved model at {model_directory}")

# Start training only when this file is executed as a program, not on import.
if __name__ == '__main__':
    main()

In [None]:
%%writefile ./{PYTHON_PACKAGE_APPLICATION_DIR}/setup.py

from setuptools import find_packages
from setuptools import setup
import setuptools

from distutils.command.build import build as _build
import subprocess


# Additional packages beyond what the base image provides go in this list.
# Nothing extra is necessary for this tutorial; examples of what could be
# listed are 'transformers' and 'datasets'.
REQUIRED_PACKAGES = []

# The name and version below determine the archive name produced by `sdist`
# (trainer-0.1.tar.gz).
setup(
    name="trainer",
    version="0.1",
    description="Vertex AI | Training | PyTorch | MNIST | Python Package",
    packages=find_packages(),
    include_package_data=True,
    install_requires=REQUIRED_PACKAGES,
)

In [None]:
!cd {PYTHON_PACKAGE_APPLICATION_DIR} && python3 setup.py sdist --formats=gztar


In [None]:
# Copy the built source distribution to its Cloud Storage location.
!gsutil cp {source_package_file_name} {python_package_gcs_uri}


In [None]:
# List the uploaded object to confirm that the copy succeeded.
!gsutil ls -l {python_package_gcs_uri}


### Distributed training

To add more machines, simply adjust 

```
replica_count=1,
```

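To change the number of GPUs attached to each replica, set the global `TRAIN_ACCELERATOR_COUNT` variable (it is set to 2 above).

Note: these settings only provision the hardware. `trainer/task.py` as written moves the model to a single device, so the training code itself must use something like `torch.nn.parallel.DistributedDataParallel` to actually make use of additional GPUs or replicas.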

```
accelerator_count=TRAIN_ACCELERATOR_COUNT
```

In [None]:
# Describe the training job: which package/module to run and in which
# containers to train and serve.
job = vertexai.CustomPythonPackageTrainingJob(
    display_name=JOB_NAME,
    python_package_gcs_uri=python_package_gcs_uri,
    python_module_name=python_module_name,
    container_uri=TRAIN_IMAGE_GPU,
    model_serving_container_image_uri=PREDICT_IMAGE_GPU,
)

MODEL_DISPLAY_NAME = "pytorch_mnist"

# Hyperparameters forwarded to trainer/task.py as command-line flags
EPOCHS = 10
BATCH_SIZE = 128
LEARNING_RATE = 0.01

CMDARGS = [
    f"--epochs={EPOCHS}",
    f"--batch-size={BATCH_SIZE}",
    f"--lr={LEARNING_RATE}",
]

# Start the training
model = job.run(
    model_display_name=MODEL_DISPLAY_NAME,
    args=CMDARGS,
    # To use more machines, increase replica_count
    replica_count=1,
    # Pass the configured machine type explicitly; otherwise the SDK default
    # is used and TRAIN_MACHINE_TYPE has no effect.
    machine_type=TRAIN_MACHINE_TYPE,
    accelerator_type=TRAIN_ACCELERATOR_TYPE,
    # Number of accelerators attached to each replica
    accelerator_count=TRAIN_ACCELERATOR_COUNT,
)

Add training arguments

## Cleaning up

To clean up all Google Cloud resources used in this project, you can [delete the Google Cloud
project](https://cloud.google.com/resource-manager/docs/creating-managing-projects#shutting_down_projects) you used for the tutorial.

Otherwise, you can delete the individual resources you created in this tutorial:

In [None]:
import os

# Delete model resource
# model.delete()

# Delete Cloud Storage objects that were created
# delete_bucket = False
# if delete_bucket or os.getenv("IS_TESTING"):
    # ! gsutil -m rm -r $BUCKET_URI