# Kubeflow Trainer: Local Training

This notebook demonstrates how to run single-node training using the **Local Process Backend**.

## Local Process Backend

- **Container Runtime**: None (native Python subprocess)
- **Use Case**: Quick testing, debugging, rapid iteration
- **Prerequisites**: Python 3.9+ only

This example trains a CNN on the classic [MNIST](http://yann.lecun.com/exdb/mnist/) handwritten digit dataset using PyTorch.

## Install the Kubeflow SDK

You need to install the Kubeflow SDK to interact with Kubeflow Trainer APIs:

In [None]:
# Uncomment to install
# %pip install -U kubeflow

## Define the Training Function

The first step is to define a function that trains a CNN model on the MNIST data.

In [1]:
def train_mnist():
    import torch
    import torch.nn.functional as F
    from torch import nn, optim
    from torch.utils.data import DataLoader
    from torchvision import datasets, transforms

    # Define the PyTorch CNN model to be trained
    class Net(nn.Module):
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 20, 5, 1)
            self.conv2 = nn.Conv2d(20, 50, 5, 1)
            self.fc1 = nn.Linear(4 * 4 * 50, 500)
            self.fc2 = nn.Linear(500, 10)

        def forward(self, x):
            x = F.relu(self.conv1(x))
            x = F.max_pool2d(x, 2, 2)
            x = F.relu(self.conv2(x))
            x = F.max_pool2d(x, 2, 2)
            x = x.view(-1, 4 * 4 * 50)
            x = F.relu(self.fc1(x))
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)

    # Pick the best available device (CUDA, then Apple MPS, then CPU) and create the model
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")
    model = Net().to(device)
    
    # Load MNIST dataset
    dataset = datasets.MNIST(
        './data',
        train=True,
        download=True,
        transform=transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    )
    train_loader = DataLoader(dataset, batch_size=64, shuffle=True)
    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
    
    for epoch in range(1, 3):
        model.train()
        
        # Iterate over mini-batches from the training set
        for batch_idx, (data, target) in enumerate(train_loader):
            # Move the batch to the device, then run the forward pass
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            loss = F.nll_loss(outputs, target)
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            if batch_idx % 100 == 0:
                print(
                    "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
                        epoch,
                        batch_idx * len(data),
                        len(train_loader.dataset),
                        100.0 * batch_idx / len(train_loader),
                        loss.item(),
                    )
                )

    torch.save(model.state_dict(), "mnist_cnn.pt")
    print("Training is finished")
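
The `4 * 4 * 50` input size of `fc1` follows from how each layer shrinks the 28x28 MNIST image. The sketch below traces that arithmetic in plain Python. The helper names `conv_out` and `flattened_features` are made up for illustration and are not part of PyTorch or Kubeflow.

```python
def conv_out(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    """Output side length of a square convolution or pooling window."""
    return (size + 2 * padding - kernel) // stride + 1


def flattened_features(image_size: int = 28) -> int:
    """Trace a square input through conv1 -> pool -> conv2 -> pool."""
    side = conv_out(image_size, kernel=5)       # conv1: 28 -> 24
    side = conv_out(side, kernel=2, stride=2)   # max_pool2d: 24 -> 12
    side = conv_out(side, kernel=5)             # conv2: 12 -> 8
    side = conv_out(side, kernel=2, stride=2)   # max_pool2d: 8 -> 4
    return side * side * 50                     # conv2 emits 50 channels


print(flattened_features())  # 800, which equals 4 * 4 * 50
```

If you change a kernel size or the input resolution, rerunning this calculation gives the value that `fc1` and the `x.view(...)` call need.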

## Configure Local Process Backend

Initialize the Local Process Backend configuration:

In [1]:
from kubeflow.trainer import TrainerClient, LocalProcessBackendConfig

# Configure Local Process Backend
backend_config = LocalProcessBackendConfig(
    cleanup_venv=True  # Auto-cleanup virtual environments after job completes
)
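
To give an intuition for what `cleanup_venv=True` implies, here is a rough sketch of the general pattern: create a throwaway virtual environment, run code with its interpreter, then delete it. This is not the Kubeflow implementation. The function `run_in_temp_venv` and the `local-job-` prefix are hypothetical, and the sketch skips pip so that it runs quickly.

```python
import os
import shutil
import subprocess
import tempfile
import venv
from pathlib import Path


def run_in_temp_venv(code: str, cleanup_venv: bool = True):
    """Run `code` with the interpreter of a throwaway virtual environment.

    Returns (stdout, workdir) so the caller can check whether workdir survived.
    """
    workdir = Path(tempfile.mkdtemp(prefix="local-job-"))
    try:
        env_dir = workdir / "venv"
        # with_pip=False keeps the sketch fast; a real job needs pip to install packages.
        venv.create(env_dir, with_pip=False, symlinks=(os.name != "nt"))
        if os.name == "nt":
            python = env_dir / "Scripts" / "python.exe"
        else:
            python = env_dir / "bin" / "python"
        result = subprocess.run(
            [str(python), "-c", code], capture_output=True, text=True, check=True
        )
        return result.stdout, workdir
    finally:
        # Mirrors the idea behind cleanup_venv: remove the environment when done.
        if cleanup_venv:
            shutil.rmtree(workdir, ignore_errors=True)


output, workdir = run_in_temp_venv("import sys; print(sys.prefix)")
print(output.strip())
print("workdir still exists:", workdir.exists())
```

Keeping the environment (`cleanup_venv=False` in this sketch) can help when debugging a failed run, at the cost of leftover files on disk.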

## Initialize Client

Initialize the TrainerClient with the Local Process Backend:

In [1]:
client = TrainerClient(backend_config=backend_config)

## List the Training Runtimes

List the available Training Runtimes and pick the one to use for your TrainJob.

In [1]:
for runtime in client.list_runtimes():
    print(runtime)
    if runtime.name == "torch-distributed":
        torch_runtime = runtime

Runtime(name='torch-distributed', trainer=RuntimeTrainer(trainer_type=<TrainerType.CUSTOM_TRAINER: 'CustomTrainer'>, framework='torch', image='local', num_nodes=1, device='Unknown', device_count='Unknown'), pretrained_model=None)
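
The loop above selects the runtime by name. A small helper can make a missing runtime fail with a clear message instead of a later `NameError`. This is a minimal sketch: `find_runtime` is a hypothetical helper, and `FakeRuntime` is a stand-in that only mimics the `name` attribute shown in the output above.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass
class FakeRuntime:
    """Hypothetical stand-in exposing only the `name` attribute."""
    name: str


def find_runtime(runtimes: Iterable, name: str):
    """Return the first runtime whose name matches, or raise a clear error."""
    seen = []
    for runtime in runtimes:
        if runtime.name == name:
            return runtime
        seen.append(runtime.name)
    raise LookupError(f"Runtime {name!r} not found; available: {seen}")


# Demo with stand-in objects instead of a real client.
runtimes = [FakeRuntime("other-runtime"), FakeRuntime("torch-distributed")]
print(find_runtime(runtimes, "torch-distributed").name)
```

In the notebook, the equivalent call would be `torch_runtime = find_runtime(client.list_runtimes(), "torch-distributed")`.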


## Run the TrainJob

Submit the training job to the Local Process Backend:

In [1]:
from kubeflow.trainer import CustomTrainer

job_name = client.train(
    trainer=CustomTrainer(
        func=train_mnist,
        packages_to_install=["pip-system-certs", "torch", "torchvision"],
    ),
    runtime=torch_runtime,
)

## Check the TrainJob Status

You can check the status of the TrainJob you just created.

In [1]:
job = client.get_job(job_name)
print("Job: {}, Status: {}".format(job.name, job.status))

Job: u61c13e8364f, Status: Running
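
A single status check only gives a snapshot. To block until the job finishes, you could poll the status in a loop, as in the sketch below. The helper `wait_for_status` is hypothetical, and the terminal status names `"Complete"` and `"Failed"` are assumptions; only `"Running"` appears in the output above, so check the values your SDK version reports.

```python
import time
from typing import Callable, Iterable


def wait_for_status(
    get_status: Callable[[], str],
    terminal: Iterable[str],
    timeout: float = 600.0,
    interval: float = 2.0,
) -> str:
    """Poll get_status() until it returns a terminal status or the timeout expires."""
    terminal = set(terminal)
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in terminal:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Last status was {status!r} after {timeout} seconds")
        time.sleep(interval)


# Demo with a scripted status sequence instead of a real client.
fake_statuses = iter(["Running", "Running", "Complete"])
print(wait_for_status(lambda: next(fake_statuses), {"Complete", "Failed"}, interval=0.0))
```

With the client from this notebook, the callable could be `lambda: client.get_job(job_name).status`.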


## Watch the TrainJob Logs

You can use the `get_job_logs()` API to stream the TrainJob logs.

In [1]:
for logline in client.get_job_logs(job_name, follow=True):
    print(logline, end='')

Operating inside /var/folders/tx/51dj585d29d554dgxchlcnvh0000gn/T/u61c13e8364f2amq0j64
Looking in links: /tmp/tmpq35odxzx
Processing /tmp/tmpq35odxzx/setuptools-65.5.0-py3-none-any.whl
Processing /tmp/tmpq35odxzx/pip-24.0-py3-none-any.whl
Installing collected packages: setuptools, pip
Successfully installed pip-24.0 setuptools-65.5.0
Collecting pip-system-certs
  Using cached pip_system_certs-5.3-py3-none-any.whl.metadata (3.9 kB)
Collecting torch
  Downloading torch-2.9.1-cp311-none-macosx_11_0_arm64.whl.metadata (30 kB)
Collecting torchvision
  Downloading torchvision-0.24.1-cp311-cp311-macosx_11_0_arm64.whl.metadata (5.9 kB)
Collecting pip>=24.2 (from pip-system-certs)
  Using cached pip-25.3-py3-none-any.whl.metadata (4.7 kB)
Collecting filelock (from torch)
  Using cached filelock-3.20.0-py3-none-any.whl.metadata (2.1 kB)
Collecting typing-extensions>=4.10.0 (from torch)
  Using cached typing_extensions-4.15.0-py3-none-any.whl.metadata (3.3 kB)
Collecting sympy>=1.13.3 (from torch
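
Once the package installation finishes, the log stream carries the progress lines printed by `train_mnist`. The sketch below shows one way to pull the epoch and loss out of such lines with a regular expression that matches the format string used in the training function. The helper `parse_progress` is hypothetical, and the sample line is built from made-up values rather than taken from a real run.

```python
import re
from typing import Optional

# Matches the "Train Epoch: ..." format string used in train_mnist.
PROGRESS = re.compile(r"Train Epoch: (\d+) \[(\d+)/(\d+) \((\d+)%\)\]\tLoss: (\S+)")


def parse_progress(line: str) -> Optional[dict]:
    """Return the parsed fields of a progress line, or None for other log lines."""
    match = PROGRESS.search(line)
    if match is None:
        return None
    epoch, seen, total, percent, loss = match.groups()
    return {
        "epoch": int(epoch),
        "seen": int(seen),
        "total": int(total),
        "percent": int(percent),
        "loss": float(loss),
    }


# Illustrative line with made-up values, produced by the same format string.
sample = "Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(
    1, 6400, 60000, 100.0 * 100 / 938, 0.5
)
print(parse_progress(sample))
print(parse_progress("Collecting torch"))
```

Inside the log loop, you could call `parse_progress(logline)` on each line and collect the non-`None` results to plot the loss afterwards.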

## Delete the TrainJob

When the TrainJob is finished, you can delete the resource.

In [None]:
client.delete_job(job_name)