In [97]:
import kfp
import kfp.dsl as dsl
from kfp.dsl import Input, Output
from kfp.dsl import Dataset, Artifact
from kfp.dsl import Model, Metrics, ClassificationMetrics

from typing import NamedTuple

In [98]:
import tensorflow as tf

# Pick the TensorFlow container tag used by every pipeline component.
# NOTE(review): the GPU probe runs on the machine executing this notebook,
# not on the cluster nodes that will run the pods, and no component requests
# a GPU accelerator — confirm this is the intended selection rule.
devices = tf.config.list_physical_devices('GPU')
BASE_IMAGE = "tensorflow/tensorflow" + (":latest-gpu" if devices else ":latest")

In [99]:
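## NOTICE: this class cannot be declared outside of a component — KFP lightweight components serialize only the decorated function's source, so helper classes must be defined inside the component that uses them (see `train`).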
# class MyCallback(tf.keras.callbacks.Callback):
#   def on_epoch_end(self, epoch, logs={}):
#     if(logs.get('accuracy')>0.995):
#       print("\nReached 99.5% accuracy so cancelling training!")
#       self.model.stop_training = True

In [100]:
@dsl.component(
    base_image = BASE_IMAGE,
)
def load_data(
    x_train_input: Output[Dataset],
    y_train_input: Output[Dataset],
    x_test_input: Output[Dataset],
    y_test_input: Output[Dataset],
):
    """Download MNIST and pickle each train/test array into its own output artifact."""
    import pickle
    from keras.datasets import mnist

    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # Pair each output artifact with the array it should hold, then write them in order.
    destinations = (
        (x_train_input, x_train),
        (y_train_input, y_train),
        (x_test_input, x_test),
        (y_test_input, y_test),
    )
    for artifact, array in destinations:
        with open(artifact.path, "wb") as handle:
            pickle.dump(array, handle)

In [101]:
@dsl.component(
    base_image=BASE_IMAGE,
)
def preprocess_data(
    x_train_input: Input[Dataset],
    y_train_input: Input[Dataset],
    x_test_input: Input[Dataset],
    y_test_input: Input[Dataset],
    x_train_pre: Output[Dataset],
    y_train_pre: Output[Dataset],
    x_test_pre: Output[Dataset],
    y_test_pre: Output[Dataset],
) -> NamedTuple("Preprocessed", input_shape=str, unique_classes=int):
    """Append a channel axis to the images and divide pixels by 255; labels pass through.

    Returns:
        input_shape: ``str()`` of the full preprocessed training-image array shape
            (the sample count is included).
        unique_classes: number of distinct values in the training labels.
    """
    import pickle
    import numpy as np

    def _read(artifact):
        # Unpickle the value stored at an input artifact's path.
        with open(artifact.path, "rb") as handle:
            return pickle.load(handle)

    def _write(artifact, value):
        # Pickle a value to an output artifact's path.
        with open(artifact.path, "wb") as handle:
            pickle.dump(value, handle)

    def _scale(images):
        # (N, H, W) -> (N, H, W, 1), then divide by 255.0
        # (presumably 8-bit pixels, as produced by the MNIST loader).
        with_channel = images.reshape(images.shape[0], images.shape[1], images.shape[2], 1)
        return with_channel / 255.0

    x_train = _read(x_train_input)
    y_train = _read(y_train_input)
    x_test = _read(x_test_input)
    y_test = _read(y_test_input)

    x_train = _scale(x_train)
    x_test = _scale(x_test)

    _write(x_train_pre, x_train)
    _write(y_train_pre, y_train)
    _write(x_test_pre, x_test)
    _write(y_test_pre, y_test)

    Preprocessed = NamedTuple("Preprocessed", input_shape=str, unique_classes=int)
    return Preprocessed(str(x_train.shape), len(np.unique(y_train)))
    

In [102]:
@dsl.component(
    base_image = BASE_IMAGE,
)
def train(
    x_train_pre: Input[Dataset],
    y_train_pre: Input[Dataset],
    model_artifact: Output[Model],
    log: Output[Artifact],
    *,
    batch_size:int = 64,
    epochs:int = 10,
    model_version:str = "1",
):
    """Train the MNIST CNN and save it as ``<model_version>_model.keras``.

    Args:
        x_train_pre: pickled preprocessed training images.
        y_train_pre: pickled training labels (integer class ids, used with a
            sparse categorical loss).
        model_artifact: output directory; the model file is written inside it.
        log: output artifact whose path is used as the root of the TensorBoard
            log directory.
        batch_size: mini-batch size passed to ``model.fit``.
        epochs: maximum number of epochs; training may stop earlier once the
            accuracy threshold in ``MyCallback`` is exceeded.
        model_version: prefix of the saved ``.keras`` file name.
    """
    # Lightweight components serialize only this function's source, so every
    # import and helper class must live inside it.
    from datetime import datetime
    import os
    import pickle

    import tensorflow as tf
    from tensorflow.keras.callbacks import TensorBoard
    from tensorflow.keras.optimizers import SGD

    class MyCallback(tf.keras.callbacks.Callback):
        """Stop training once an epoch's training accuracy exceeds 99.5%."""

        target_accuracy = 0.995

        def on_epoch_end(self, epoch, logs=None):
            # `logs` may be None or lack "accuracy"; comparing None > float
            # would raise TypeError, so skip the check in that case.
            accuracy = (logs or {}).get('accuracy')
            if accuracy is not None and accuracy > self.target_accuracy:
                print("\nReached 99.5% accuracy so cancelling training!")
                self.model.stop_training = True

    log_dir = f"{log.path}/logs/fit/{datetime.now().strftime('%Y%m%d-%H%M%S')}"
    tensorboard_callback = TensorBoard(log_dir=log_dir, histogram_freq=1)

    with open(x_train_pre.path, "rb") as file:
        x_train = pickle.load(file)
    with open(y_train_pre.path, "rb") as file:
        y_train = pickle.load(file)

    # Two valid 5x5 conv + 2x2 pool stages shrink 28x28 -> 4x4 with 20 filters,
    # i.e. 4*4*20 = 320 features; hence the hard-coded Reshape target.
    # Only the first layer's input_shape matters; later layers infer theirs.
    model = tf.keras.models.Sequential([
        tf.keras.layers.Conv2D(filters=10, kernel_size=5, strides=1, padding='valid', input_shape=(28, 28, 1)),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Activation('relu'),
        tf.keras.layers.Conv2D(filters=20, kernel_size=5, strides=1, padding='valid'),
        tf.keras.layers.SpatialDropout2D(0.5),
        tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
        tf.keras.layers.Activation('relu'),
        tf.keras.layers.Reshape((320,)),
        tf.keras.layers.Dense(units=50),
        tf.keras.layers.Activation('relu'),
        tf.keras.layers.Dropout(0.5),
        tf.keras.layers.Dense(units=10),
        tf.keras.layers.Activation('softmax')
    ])

    learning_rate = 0.01
    momentum = 0.9
    validation_split = 0.1
    optimizer = SGD(learning_rate=learning_rate, momentum=momentum)

    model.compile(
        optimizer=optimizer,
        loss=tf.keras.losses.SparseCategoricalCrossentropy(),
        metrics=['accuracy']
    )
    model.fit(
        x_train, y_train,
        batch_size=batch_size,
        epochs=epochs,
        validation_split=validation_split,
        callbacks=[MyCallback(), tensorboard_callback],
    )

    # NOTICE: Keras 3 cannot save a model if the parent folder does not exist.
    os.makedirs(model_artifact.path, exist_ok=True)
    model.save(f"{model_artifact.path}/{model_version}_model.keras")
    # tf.saved_model.save(model, f"{model_artifact.path}/{model_version}")

In [105]:
@dsl.component(
    base_image=BASE_IMAGE,
    packages_to_install=["scikit-learn"],
)
def evaluate(
    model_artifact: Input[Model],
    metrics: Output[ClassificationMetrics],
    scalar_metrics: Output[Metrics],
    x_test_input: Input[Dataset],
    y_test_input: Input[Dataset],
    unique_labels:int,
    model_version:str = "1",
):
    """Evaluate the saved model on the test set and log classification metrics.

    Logs a confusion matrix to ``metrics`` and accuracy, loss and precision to
    ``scalar_metrics``.

    Args:
        model_artifact: directory containing ``<model_version>_model.keras``.
        x_test_input: pickled preprocessed test images.
        y_test_input: pickled test labels (integer class ids).
        unique_labels: number of distinct training labels. Currently unused:
            the class count is taken from the model's output width instead.
        model_version: prefix of the model file to load.
    """
    import pickle

    from sklearn.metrics import confusion_matrix
    from tensorflow.keras.metrics import Precision
    from tensorflow.keras.models import load_model
    from tensorflow.keras.utils import to_categorical

    batch_size = 128

    model = load_model(f"{model_artifact.path}/{model_version}_model.keras")

    with open(x_test_input.path, "rb") as file:
        x_test = pickle.load(file)
    with open(y_test_input.path, "rb") as file:
        y_test = pickle.load(file)

    # Assumes the model ends in a softmax over classes (as built by `train`),
    # so predict() yields one probability row per sample.
    probabilities = model.predict(x_test, batch_size=batch_size)
    num_classes = probabilities.shape[1]
    class_ids = list(range(num_classes))

    # argmax on raw probabilities: thresholding at 0.5 first would turn rows
    # with no confident class into all-zeros, which argmax reports as class 0.
    predicted_labels = probabilities.argmax(axis=1)

    # Passing labels= keeps the matrix num_classes x num_classes even when a
    # class never appears, so it always matches the category names.
    metrics.log_confusion_matrix(
        [str(class_id) for class_id in class_ids],
        confusion_matrix(y_test, predicted_labels, labels=class_ids).tolist(),
    )

    # Precision applies its own 0.5 threshold to the probabilities and counts
    # over all one-hot entries.
    precision = Precision()
    precision.update_state(to_categorical(y_test, num_classes=num_classes), probabilities)

    # The model was compiled with a sparse loss, so integer labels go in directly.
    loss, acc = model.evaluate(x_test, y_test, batch_size=batch_size)
    scalar_metrics.log_metric("accuracy", acc)
    scalar_metrics.log_metric("loss", loss)
    scalar_metrics.log_metric("precision", precision.result().numpy().tolist())

In [106]:
def _with_resources(task, memory: str, cpu: str = "1"):
    """Set equal memory/CPU requests and limits on a pipeline task and return it."""
    return (
        task
        .set_memory_limit(memory)
        .set_memory_request(memory)
        .set_cpu_limit(cpu)
        .set_cpu_request(cpu)
    )


@dsl.pipeline(
    name="mnist_pipeline",
)
def mnist_pipeline(epochs: int, model_version: str):
    """Load -> preprocess -> train -> evaluate MNIST.

    Args:
        epochs: maximum training epochs passed to ``train``.
        model_version: file-name prefix for the saved model. It is forwarded to
            both ``train`` and ``evaluate`` so they agree on the file path.
    """
    # Task ordering follows from the artifact/parameter wiring below, so no
    # explicit .after() calls are needed.
    data = _with_resources(load_data(), "4G")
    preprocess = _with_resources(
        preprocess_data(
            x_train_input=data.outputs["x_train_input"],
            y_train_input=data.outputs["y_train_input"],
            x_test_input=data.outputs["x_test_input"],
            y_test_input=data.outputs["y_test_input"],
        ),
        "4G",
    )
    train_task = _with_resources(
        train(
            x_train_pre=preprocess.outputs["x_train_pre"],
            y_train_pre=preprocess.outputs["y_train_pre"],
            epochs=epochs,
            model_version=model_version,
        ),
        "6G",
    )
    _with_resources(
        evaluate(
            model_artifact=train_task.outputs["model_artifact"],
            x_test_input=preprocess.outputs["x_test_pre"],
            y_test_input=preprocess.outputs["y_test_pre"],
            unique_labels=preprocess.outputs["unique_classes"],
            model_version=model_version,
        ),
        "4G",
    )


# Submitting is a side effect: every Run All launches a new run on whatever
# cluster kfp.Client() resolves to by default.
run_arguments = {"epochs": 10, "model_version": "1"}
client = kfp.Client()
client.create_run_from_pipeline_func(
    mnist_pipeline,
    experiment_name="mnist_pipeline",
    arguments=run_arguments,
)

RunPipelineResult(run_id=58ffc6b4-7071-46b2-8be8-1c84d8a76b38)

## Compile the pipeline into a YAML package for uploading

In [107]:
from kfp import compiler

In [108]:
# Write the compiled pipeline spec so it can be uploaded manually.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=mnist_pipeline, package_path="mnist_pipeline.yaml")