In [None]:
!pip install --upgrade kubeflow-training

In [2]:
def train_func():
    """Train a small CNN on FashionMNIST with PyTorch DistributedDataParallel.

    The function is passed to ``utils.get_container_spec(train_func=...)`` and
    executed inside the PyTorchJob container, so it is kept self-contained:
    every import lives inside the body.

    ``init_process_group`` reads the rendezvous settings (RANK, WORLD_SIZE,
    MASTER_ADDR, MASTER_PORT) from the environment; the Training Operator is
    expected to set them.
    """
    import os

    import torch
    import torch.nn.functional as F
    from torch.utils.data import DistributedSampler
    from torchvision import datasets, transforms

    # [1] Setup PyTorch DDP. WORLD_SIZE and RANK environments will be set by Training Operator.
    # NCCL only works with CUDA; fall back to gloo so the job can still run on CPU-only nodes.
    use_cuda = torch.cuda.is_available()
    torch.distributed.init_process_group(backend="nccl" if use_cuda else "gloo")
    Distributor = torch.nn.parallel.DistributedDataParallel

    # [2] Create PyTorch CNN Model.
    class Net(torch.nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = torch.nn.Conv2d(1, 20, 5, 1)
            self.conv2 = torch.nn.Conv2d(20, 50, 5, 1)
            self.fc1 = torch.nn.Linear(4 * 4 * 50, 500)
            self.fc2 = torch.nn.Linear(500, 10)

        def forward(self, x):
            x = F.relu(self.conv1(x))
            x = F.max_pool2d(x, 2, 2)
            x = F.relu(self.conv2(x))
            x = F.max_pool2d(x, 2, 2)
            x = x.view(-1, 4 * 4 * 50)
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)

    # [3] Attach model to the device and distributor.
    # Each process must own its own GPU: a bare "cuda" device would put every
    # process on a node onto cuda:0. LOCAL_RANK is set by launchers such as
    # torchrun; with a single process per pod it defaults to 0.
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    if use_cuda:
        torch.cuda.set_device(local_rank)
        device = torch.device("cuda", local_rank)
        model = Distributor(Net().to(device), device_ids=[local_rank])
    else:
        device = torch.device("cpu")
        model = Distributor(Net().to(device))
    optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.5)

    # [4] Setup FashionMNIST dataloader and distribute data across PyTorchJob workers.
    dataset = datasets.FashionMNIST(
        "./data",
        download=True,
        train=True,
        transform=transforms.Compose([transforms.ToTensor()]),
    )
    # Keep a handle on the sampler: set_epoch() must be called on it every epoch.
    sampler = DistributedSampler(dataset)
    train_loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=128,
        sampler=sampler,
    )

    # [5] Start model Training.
    try:
        for epoch in range(3):
            # Reseed the sampler's shuffle; without this every epoch sees the same order.
            sampler.set_epoch(epoch)
            for batch_idx, (data, target) in enumerate(train_loader):
                # Attach Tensors to the device.
                data = data.to(device)
                target = target.to(device)

                optimizer.zero_grad()
                output = model(data)
                loss = F.nll_loss(output, target)
                loss.backward()
                optimizer.step()
                if batch_idx % 10 == 0:
                    print(
                        "Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}".format(
                            epoch,
                            batch_idx * len(data),
                            # Samples this rank sees per epoch (the dataset is sharded across ranks).
                            len(sampler),
                            100.0 * batch_idx / len(train_loader),
                            loss.item(),
                        )
                    )
    finally:
        # Release process-group resources even if training raises.
        torch.distributed.destroy_process_group()

In [4]:
# STEP 1 (DONE)
# Build a minimal PyTorchJob definition, without advanced parameters,
# following the approach of TrainingClient.create_job:
# https://github.com/kubeflow/training-operator/blob/master/sdk/python/kubeflow/training/api/training_client.py
# The job object is only constructed here (not submitted) and may not run as-is; it is a starting point.

from kubeflow.training.utils import utils
import logging
from kubeflow.training import models
from kubeflow.training.constants import constants

logger = logging.getLogger(__name__)

status_logger = utils.StatusLogger(
    header="{:<30.30} {:<20.20} {}".format("NAME", "STATE", "TIME"),
    column_format="{:<30.30} {:<20.20} {}",
)

base_image = "quay.io/shanand/test-train:v0.1"
# NOTE(review): personal namespace -- change to your own Kubeflow profile namespace.
namespace = "shreyanand"
num_workers = 1
# Use the SDK's canonical PyTorchJob container name. The literal string
# "PYTORCHJOB_CONTAINER" is rejected by Kubernetes, because container names must be
# lowercase DNS-1123 labels with no "_". It is also not the name the operator looks for.
container_name = constants.PYTORCHJOB_CONTAINER
name = "test-kfp"

# Container that runs `train_func` on top of `base_image`.
container_spec = utils.get_container_spec(
    name=container_name,
    base_image=base_image,
    train_func=train_func,
)

# Get Pod template spec using the above container.
pod_template_spec = utils.get_pod_template_spec(
    containers=[container_spec],
)

# Assemble the PyTorchJob object (not yet submitted to the cluster).
job = utils.get_pytorchjob_template(
    name=name,
    namespace=namespace,
    worker_pod_template_spec=pod_template_spec,
    num_workers=num_workers,
)

In [31]:
# Sanity check: display the class of the object returned by get_pytorchjob_template.
job.__class__

kubeflow.training.models.kubeflow_org_v1_py_torch_job.KubeflowOrgV1PyTorchJob

In [None]:
# STEP 2a (To do)
# Run the job using a launcher client
# that waits until the job reaches the Succeeded or Failed condition.
# Adapted from https://github.com/kubeflow/pipelines/blob/master/components/kubeflow/pytorch-launcher/src/launch_pytorchjob.py
# (requires that launcher's `launch_crd` module to be importable from this notebook).

import datetime

from kubernetes import client as k8s_client
from kubernetes import config
import launch_crd

# The upstream launcher reads these from argparse (`args.*`). A notebook has
# no `args`, so they are plain variables here -- tune as needed.
# `namespace` and `name` come from the STEP 1 cell that built `job`.
job_group = "kubeflow.org"
job_plural = "pytorchjobs"
job_version = "v1"
job_timeout_minutes = 60 * 24
delete_after_done = True

# Load cluster credentials before creating the API client. In-cluster config
# only works when running inside a pod; otherwise fall back to the local kubeconfig.
try:
    config.load_incluster_config()
except config.ConfigException:
    config.load_kube_config()
api_client = k8s_client.ApiClient()

# Convert the SDK model object into plain JSON-compatible data for submission.
serialized_job = api_client.sanitize_for_serialization(job)

logging.info('Creating launcher client.')
launcher_client = launch_crd.K8sCR(
    group=job_group,
    plural=job_plural,
    version=job_version,
    client=api_client,
)

logging.info('Submitting CR.')
create_response = launcher_client.create(serialized_job)

expected_conditions = ["Succeeded", "Failed"]
logging.info(
    f'Monitoring job until status is any of {expected_conditions}.'
)
launcher_client.wait_for_condition(
    namespace, name, expected_conditions,
    timeout=datetime.timedelta(minutes=job_timeout_minutes))
if delete_after_done:
    logging.info('Deleting job.')
    launcher_client.delete(name, namespace)


# STEP 2b (To do)
# However, the launcher code above is probably outdated.
# It could be rewritten using training_client.create_job()
# together with training_client.is_job_succeeded() and training_client.is_job_failed().
# https://github.com/kubeflow/training-operator/blob/master/sdk/python/kubeflow/training/api/training_client.py

In [None]:
# STEP 3 (to do; can also be done after step 4)

# Run the above as a KFP DSL component,
# using a basic base image with kubeflow-training installed.

In [32]:
# STEP 4

# Completely define the PyTorch job.
# If step 2b is complete, training_client.train() can be used directly for this.
# If there are issues, adapt the training_client.train() code for this use case.
# Once the PyTorch job is fully defined, run it with the launcher from step 2.