In [24]:
# Container image used by every @dsl.component in this notebook. The components
# import their libraries (e.g. torch, torchvision, numpy) inside their bodies at
# run time, so this image must provide them.
BASE_IMAGE = "kubeflownotebookswg/jupyter-pytorch-full:v1.8.0"

In [25]:
import kfp
import kfp.dsl as dsl
from kfp.dsl import Input, Output
from kfp.dsl import Dataset, Artifact
from kfp.dsl import Model, Metrics, ClassificationMetrics

from typing import NamedTuple

In [26]:
@dsl.component(
    base_image=BASE_IMAGE,
)
def load_data(
    x_train_pickle: Output[Dataset],
    y_train_pickle: Output[Dataset],
    x_test_pickle: Output[Dataset],
    y_test_pickle: Output[Dataset],
):
    """Download MNIST and pickle a fixed-size train/test subset as NumPy arrays.

    The first 3000 training samples and the first 1000 test samples are kept.
    Images go through ``ToTensor`` + ``Normalize((0.1307,), (0.3081,))`` and are
    written as float32 arrays of shape ``(N, 28, 28)``. Labels are written as
    int64 arrays of shape ``(N,)``.

    Args:
        x_train_pickle: output artifact receiving the training images.
        y_train_pickle: output artifact receiving the training labels.
        x_test_pickle: output artifact receiving the test images.
        y_test_pickle: output artifact receiving the test labels.
    """
    # Imports live inside the function: KFP lightweight components run only
    # this function's source inside the container.
    import pickle

    import numpy as np
    import torch
    import torchvision

    transform = torchvision.transforms.Compose([
        torchvision.transforms.ToTensor(),
        torchvision.transforms.Normalize((0.1307,), (0.3081,)),
    ])

    def _load_split(train, size):
        """Return the first ``size`` samples of one MNIST split as (images, labels)."""
        dataset = torchvision.datasets.MNIST(
            ".", train=train, download=True, transform=transform
        )
        # Each item is an (image, label) pair. A Subset of such pairs cannot be
        # unpacked directly into (images, labels), so collect both sides explicitly.
        samples = [dataset[i] for i in range(min(size, len(dataset)))]
        images = torch.stack([image for image, _ in samples])  # (N, 1, 28, 28)
        labels = torch.tensor([label for _, label in samples], dtype=torch.int64)
        # Drop the channel axis so downstream code sees (N, 28, 28) arrays.
        return images.squeeze(1).numpy(), labels.numpy()

    x_train, y_train = _load_split(train=True, size=3000)
    x_test, y_test = _load_split(train=False, size=1000)

    # count the number of unique train labels
    unique, counts = np.unique(y_train, return_counts=True)
    print("Train labels: ", dict(zip(unique, counts)))

    # count the number of unique test labels
    unique, counts = np.unique(y_test, return_counts=True)
    print("\nTest labels: ", dict(zip(unique, counts)))

    for array, artifact in (
        (x_train, x_train_pickle),
        (y_train, y_train_pickle),
        (x_test, x_test_pickle),
        (y_test, y_test_pickle),
    ):
        with open(artifact.path, "wb") as file:
            pickle.dump(array, file)

In [27]:
@dsl.component(base_image=BASE_IMAGE)
def preprocess_data(
    x_train_pickle: Input[Dataset],
    y_train_pickle: Input[Dataset],
    x_test_pickle: Input[Dataset],
    y_test_pickle: Input[Dataset],
    x_train_prep: Output[Dataset],
    y_train_prep: Output[Dataset],
    x_test_prep: Output[Dataset],
    y_test_prep: Output[Dataset],
) -> NamedTuple("outputs", input_size=int, num_labels=int):
    """Flatten images, one-hot encode labels and report feature/label sizes.

    Images are reshaped to float32 rows of shape ``(N, input_size)``. They are
    divided by 255 only when they arrive as raw uint8 pixels. Float images are
    treated as already normalized (``load_data`` applies ``ToTensor`` +
    ``Normalize``), because dividing them again would shrink the inputs ~255x.
    Labels are one-hot encoded as float32, using one shared width for train
    and test.

    Returns:
        outputs(input_size, num_labels): pixels per image and the number of
        distinct training labels.
    """
    import pickle
    from typing import NamedTuple

    import numpy as np

    def _read(artifact):
        # pickle is only acceptable because these artifacts come from the
        # upstream pipeline step, not from an untrusted source.
        with open(artifact.path, "rb") as file:
            return pickle.load(file)

    def _write(obj, artifact):
        with open(artifact.path, "wb") as file:
            pickle.dump(obj, file)

    def _to_rows(images, input_size):
        """Reshape images to float32 rows, rescaling only raw 8-bit pixels."""
        rows = np.reshape(np.asarray(images), [-1, input_size])
        if rows.dtype == np.uint8:
            return rows.astype("float32") / 255
        return rows.astype("float32")

    x_train = np.asarray(_read(x_train_pickle))
    y_train = np.asarray(_read(y_train_pickle), dtype="int64").ravel()
    x_test = np.asarray(_read(x_test_pickle))
    y_test = np.asarray(_read(y_test_pickle), dtype="int64").ravel()

    num_labels = len(np.unique(y_train))

    # One encoding width for both splits. Sizing each split by its own max
    # label can produce train/test one-hot matrices with different widths.
    num_classes = int(max(y_train.max(), y_test.max())) + 1
    one_hot = np.eye(num_classes, dtype="float32")

    # Pixels per image, whether or not a channel axis is present.
    input_size = int(np.prod(x_train.shape[1:]))

    _write(_to_rows(x_train, input_size), x_train_prep)
    _write(one_hot[y_train], y_train_prep)
    _write(_to_rows(x_test, input_size), x_test_prep)
    _write(one_hot[y_test], y_test_prep)

    outputs = NamedTuple("outputs", input_size=int, num_labels=int)
    return outputs(input_size, num_labels)

In [28]:
@dsl.component(base_image=BASE_IMAGE)
def train(
    input_size: int,
    num_labels: int,
    epochs: int,
    x_train_pickle: Input[Dataset],
    y_train_pickle: Input[Dataset],
    model_artifact: Output[Model],
    log: Output[Artifact],
):
    """Train a small CNN on the preprocessed MNIST subset.

    Args:
        input_size: pixels per image. Must be 784, because the CNN below is
            fixed to 28x28 single-channel inputs.
        num_labels: number of distinct labels. Must be at most 10, the
            network's output width.
        epochs: number of passes over the training data.
        x_train_pickle: pickled images (flattened rows or image-shaped).
        y_train_pickle: pickled labels, either class ids or one-hot rows.
        model_artifact: written as a directory holding ``model.pth`` and
            ``optimizer.pth`` state dicts.
        log: text file with one ``<samples_seen>\\t<loss>`` line per logged batch.

    Raises:
        ValueError: if ``input_size`` or ``num_labels`` do not fit the network.
    """
    import os
    import pickle

    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import torch.optim as optim

    class Net(nn.Module):
        # Must match the Net in `evaluate`, which loads this state_dict. It is
        # duplicated because each KFP lightweight component is self-contained.
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
            self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
            self.conv2_drop = nn.Dropout2d()
            self.fc1 = nn.Linear(320, 50)
            self.fc2 = nn.Linear(50, 10)

        def forward(self, x):
            x = F.relu(F.max_pool2d(self.conv1(x), 2))
            x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
            x = x.view(-1, 320)
            x = F.relu(self.fc1(x))
            x = F.dropout(x, training=self.training)
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)

    if input_size != 28 * 28:
        raise ValueError(
            f"Net expects 28x28 images (input_size=784), got input_size={input_size}"
        )
    if num_labels > 10:
        raise ValueError(f"Net has 10 outputs, got num_labels={num_labels}")

    batch_size_train = 100
    learning_rate = 0.01
    momentum = 0.5
    log_interval = 10

    random_seed = 1
    torch.backends.cudnn.enabled = False
    torch.manual_seed(random_seed)

    with open(x_train_pickle.path, "rb") as file:
        x_train = pickle.load(file)

    with open(y_train_pickle.path, "rb") as file:
        y_train = pickle.load(file)

    # The conv net needs (N, 1, 28, 28) inputs, whatever layout upstream used.
    images = torch.as_tensor(x_train, dtype=torch.float32).reshape(-1, 1, 28, 28)
    labels = torch.as_tensor(y_train)
    if labels.dim() > 1:
        # One-hot rows -> class ids, as nll_loss requires.
        labels = labels.argmax(dim=1)
    train_loader = torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(images, labels.long()),
        batch_size=batch_size_train,
        shuffle=True,
    )

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Running on {device}.")
    network = Net().to(device)
    optimizer = optim.SGD(network.parameters(), lr=learning_rate,
                          momentum=momentum)
    train_losses = []
    train_counter = []

    def train_one_epoch(epoch):
        """Run one pass over train_loader, recording the loss every log_interval batches."""
        network.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = network(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()
            if batch_idx % log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                    100. * batch_idx / len(train_loader), loss.item()))
                train_losses.append(loss.item())
                train_counter.append(
                    (batch_idx * batch_size_train)
                    + ((epoch - 1) * len(train_loader.dataset)))

    print("started training")
    for epoch in range(1, epochs + 1):
        train_one_epoch(epoch)

    # model_artifact.path does not exist yet; save it as a directory so
    # `evaluate` can read <path>/model.pth.
    os.makedirs(model_artifact.path, exist_ok=True)
    torch.save(network.state_dict(), os.path.join(model_artifact.path, "model.pth"))
    torch.save(optimizer.state_dict(), os.path.join(model_artifact.path, "optimizer.pth"))

    with open(log.path, "w") as file:
        for seen, loss_value in zip(train_counter, train_losses):
            file.write(f"{seen}\t{loss_value:.6f}\n")

In [29]:
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=["scikit-learn"],
)
def evaluate(
    model_artifact: Input[Model],
    metrics: Output[ClassificationMetrics],
    scalar_metrics: Output[Metrics],
    x_test_pickle: Input[Dataset],
    y_test_pickle: Input[Dataset],
):
    """Evaluate the trained CNN on the preprocessed test split and log metrics.

    Prints a per-class classification report. Logs the 10x10 confusion matrix
    to ``metrics`` and the overall accuracy to ``scalar_metrics``.

    Args:
        model_artifact: directory holding ``model.pth`` (a ``Net`` state_dict).
        metrics: receives the confusion matrix.
        scalar_metrics: receives the ``accuracy`` scalar.
        x_test_pickle: pickled test images (flattened rows or image-shaped).
        y_test_pickle: pickled test labels, either class ids or one-hot rows.
    """
    import os
    import pickle

    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    from sklearn.metrics import accuracy_score, classification_report, confusion_matrix

    batch_size_test = 100
    # Display names for the ten MNIST digit classes, indexed by class id.
    classes = ('zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine')
    class_ids = list(range(len(classes)))

    with open(x_test_pickle.path, "rb") as file:
        x_test = pickle.load(file)

    with open(y_test_pickle.path, "rb") as file:
        y_test = pickle.load(file)

    class Net(nn.Module):
        # Must match the Net in `train`, which produced the saved state_dict.
        # It is duplicated because each KFP lightweight component is self-contained.
        def __init__(self):
            super(Net, self).__init__()
            self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
            self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
            self.conv2_drop = nn.Dropout2d()
            self.fc1 = nn.Linear(320, 50)
            self.fc2 = nn.Linear(50, 10)

        def forward(self, x):
            x = F.relu(F.max_pool2d(self.conv1(x), 2))
            x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
            x = x.view(-1, 320)
            x = F.relu(self.fc1(x))
            x = F.dropout(x, training=self.training)
            x = self.fc2(x)
            return F.log_softmax(x, dim=1)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Running on {device}.")

    # load_state_dict returns the missing/unexpected keys, not the module, so
    # build the network first and load the weights into it. map_location lets
    # GPU-saved weights load on a CPU-only node.
    network = Net()
    state_dict = torch.load(
        os.path.join(model_artifact.path, "model.pth"),
        map_location=device,
        weights_only=True,
    )
    network.load_state_dict(state_dict)
    network.to(device)

    # Set the model to evaluation mode, so dropout is disabled during inference.
    network.eval()

    images = torch.as_tensor(x_test, dtype=torch.float32).reshape(-1, 1, 28, 28)
    labels = torch.as_tensor(y_test)
    if labels.dim() > 1:
        # One-hot rows -> class ids.
        labels = labels.argmax(dim=1)
    test_loader = torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(images, labels.long()),
        batch_size=batch_size_test,
        shuffle=False,
    )

    # Lists to store all predictions and true labels
    all_preds = []
    all_labels = []

    # No gradients are needed during evaluation.
    with torch.no_grad():
        for batch_images, batch_labels in test_loader:
            outputs = network(batch_images.to(device))
            # The class with the highest log-probability is the prediction.
            _, predicted = torch.max(outputs, 1)
            all_preds.extend(predicted.cpu().tolist())
            all_labels.extend(batch_labels.tolist())

    # Passing `labels` keeps the report valid even if some class is absent.
    print(classification_report(all_labels, all_preds, labels=class_ids, target_names=classes))

    matrix = confusion_matrix(all_labels, all_preds, labels=class_ids)
    metrics.log_confusion_matrix(list(classes), matrix.tolist())
    scalar_metrics.log_metric("accuracy", float(accuracy_score(all_labels, all_preds)))


In [30]:
@dsl.pipeline(
    name="pytorch_mnist_pipeline",
)
def mnist_pipeline(epochs: int):
    """Chain load_data -> preprocess_data -> train -> evaluate into one pipeline.

    Args:
        epochs: number of training epochs, forwarded to the ``train`` step.
    """

    def _with_resources(task, cpu, memory):
        # Request == limit for both CPU and memory, applied in a fixed order.
        return (
            task.set_memory_limit(memory)
            .set_memory_request(memory)
            .set_cpu_limit(cpu)
            .set_cpu_request(cpu)
        )

    load_task = _with_resources(load_data(), cpu="2", memory="4G")

    prep_task = _with_resources(
        preprocess_data(
            x_train_pickle=load_task.outputs["x_train_pickle"],
            y_train_pickle=load_task.outputs["y_train_pickle"],
            x_test_pickle=load_task.outputs["x_test_pickle"],
            y_test_pickle=load_task.outputs["y_test_pickle"],
        ),
        cpu="1",
        memory="4G",
    )
    prep_task.after(load_task)

    # No explicit CPU/memory settings are applied to the training step.
    train_task = train(
        input_size=prep_task.outputs["input_size"],
        num_labels=prep_task.outputs["num_labels"],
        epochs=epochs,
        x_train_pickle=prep_task.outputs["x_train_prep"],
        y_train_pickle=prep_task.outputs["y_train_prep"],
    )
    train_task.after(prep_task)

    eval_task = _with_resources(
        evaluate(
            model_artifact=train_task.outputs["model_artifact"],
            x_test_pickle=prep_task.outputs["x_test_prep"],
            y_test_pickle=prep_task.outputs["y_test_prep"],
        ),
        cpu="1",
        memory="4G",
    )
    eval_task.after(train_task)


# Create a KFP client with its default connection settings and submit one run
# of the pipeline. The returned RunPipelineResult is left as the cell's last
# expression so that it is displayed.
client = kfp.Client()
client.create_run_from_pipeline_func(
    mnist_pipeline,
    experiment_name="mnist_pipeline",
    arguments=dict(epochs=2),
)

RunPipelineResult(run_id=e14396d4-9c36-4847-8d28-696b9aca5166)