# MNIST Digits Recognition Pipeline

This notebook contains a Kubeflow Pipeline for training a CNN model on the MNIST digits dataset.

## Pipeline Components:
1. **get_data_batch**: Downloads `mnist.npz` from MinIO and uploads the training/test splits back to the bucket as separate arrays
2. **get_latest_data**: Placeholder component for demonstration (it only prints a message)
3. **reshape_data**: Reshapes and normalizes the image data for CNN training
4. **model_building**: Builds, trains, and evaluates a CNN model using Keras/TensorFlow, then exports it to MinIO
5. **model_serving**: Deploys the trained model as a KServe InferenceService

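## Usage:
1. Run the cells below to compile the pipeline to `digits_recognizer_pipeline.yaml` (the last cell also tries to submit a run if a Kubeflow Pipelines endpoint is reachable)
2. Upload the generated YAML file in your Kubeflow Pipelines UI
3. Create a run with the desired parameters (`no_epochs`, `optimizer`)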

## Requirements:
- A MinIO bucket `mlpipeline` containing the `mnist.npz` file
- KServe installed in the cluster
- A service account `sa-minio-kserve` that lets KServe read the model from `s3://mlpipeline/models/detect-digits/`

In [None]:
# 
# Creating an ML pipeline with the MNIST digits dataset
# KFP Example with lightweight Python components
#

import kfp
from kfp import dsl
import kfp.components as components
from typing import NamedTuple

def get_data_batch() -> NamedTuple('Outputs', [('datapoints_training', float),('datapoints_test', float),('dataset_version', str)]):
    """
    Download the raw MNIST archive from MinIO and store each split separately.

    Fetches ``mnist.npz`` from the ``mlpipeline`` bucket and extracts the
    ``x_train``, ``y_train``, ``x_test`` and ``y_test`` arrays. Each array is
    uploaded back to the bucket as its own ``.npy`` object (object name =
    array name) for the downstream steps.

    The function is turned into a KFP component with
    ``create_component_from_func``, so all imports stay inside the body.

    Returns:
        ``Outputs(datapoints_training, datapoints_test, dataset_version)``:
        the number of training and test samples (as floats) and the dataset
        version string.
    """
    print("getting data")
    from collections import namedtuple

    import numpy as np
    from minio import Minio

    # NOTE(review): endpoint and credentials are hard-coded; consider injecting
    # them from a Kubernetes secret instead.
    minio_client = Minio(
        "100.65.11.110:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"

    minio_client.fget_object(minio_bucket, "mnist.npz", "/tmp/mnist.npz")

    # The archive holds plain numeric arrays only. Refusing pickled objects
    # prevents a tampered file in the bucket from executing code on load.
    split_names = ("x_train", "y_train", "x_test", "y_test")
    with np.load("/tmp/mnist.npz", allow_pickle=False) as archive:
        splits = {name: archive[name] for name in split_names}

    # Upload each split as its own object, e.g. "x_train" <- /tmp/x_train.npy.
    for name in split_names:
        local_path = f"/tmp/{name}.npy"
        np.save(local_path, splits[name])
        minio_client.fput_object(minio_bucket, name, local_path)

    dataset_version = "1.0"

    for name in split_names:
        print(f"{name} shape: {splits[name].shape}")

    Outputs = namedtuple('Outputs', ['datapoints_training', 'datapoints_test', 'dataset_version'])
    return Outputs(
        float(splits["x_train"].shape[0]),
        float(splits["x_test"].shape[0]),
        dataset_version,
    )

def get_latest_data():
    """
    Placeholder step standing in for fetching fresh data.

    It only logs a message. It exists to show a second branch that runs
    alongside get_data_batch in the pipeline graph.
    """
    message = "Adding latest data"
    print(message)

def reshape_data():
    """
    Reshape and normalize the image arrays stored in MinIO, in place.

    Downloads the ``x_train`` and ``x_test`` objects from the ``mlpipeline``
    bucket and reshapes each to ``(-1, 28, 28, 1)``: 28x28 pixels with one
    greyscale channel, matching the ``input_shape`` of the model's Conv2D
    layer. Raw 0-255 pixel values are scaled to 0-1, and the result is
    uploaded under the same object name. Label objects are not touched.
    """
    print("reshaping data")

    from minio import Minio
    import numpy as np

    # NOTE(review): endpoint and credentials are hard-coded; consider injecting
    # them from a Kubernetes secret instead.
    minio_client = Minio(
        "100.65.11.110:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"

    for object_name in ("x_train", "x_test"):
        # load data from minio
        local_path = f"/tmp/{object_name}.npy"
        minio_client.fget_object(minio_bucket, object_name, local_path)
        images = np.load(local_path)

        # One greyscale channel per 28x28 image, as required by the Keras API.
        images = images.reshape(-1, 28, 28, 1)

        # The objects are overwritten in place, so this step is not naturally
        # idempotent. If it runs again without get_data_batch re-uploading the
        # raw data (retry, cached upstream step), the pixels are already floats
        # in [0, 1]. Only integer pixel data is scaled, so 255 is never
        # divided out twice.
        if np.issubdtype(images.dtype, np.integer):
            # float32 is Keras' default float type and half the size of the
            # float64 that a plain `/ 255` on integers would produce.
            images = images.astype(np.float32) / 255.0

        # save data back to minio
        np.save(local_path, images)
        minio_client.fput_object(minio_bucket, object_name, local_path)

def model_building(
    no_epochs: int = 1,
    optimizer: str = "adam"
) -> NamedTuple('Output', [('model_accuracy', float), ('model_loss', float)]):
    """
    Build, train and evaluate a CNN with the Keras API and export it to MinIO.

    The steps are:
    - Read the ``x_train``/``y_train``/``x_test``/``y_test`` objects from the
      ``mlpipeline`` bucket.
    - Train for ``no_epochs`` epochs with batch size 20 and evaluate on the
      test split.
    - Save the model with ``keras.models.save_model`` to
      ``/tmp/detect-digits``.
    - Upload that directory to ``models/detect-digits/1/`` in the bucket.
      Version ``1`` is overwritten on every run.

    Args:
        no_epochs: Number of training epochs.
        optimizer: Keras optimizer identifier passed to ``model.compile``
            (e.g. ``"adam"``).

    Returns:
        ``Output(model_accuracy, model_loss)`` measured on the test split.
    """
    import os
    import posixpath
    from collections import namedtuple

    import numpy as np
    from minio import Minio
    from tensorflow import keras

    # NOTE(review): endpoint and credentials are hard-coded; consider injecting
    # them from a Kubernetes secret instead.
    minio_client = Minio(
        "100.65.11.110:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "mlpipeline"

    def load_array(object_name):
        """Download ``object_name`` from the bucket and load it with ``np.load``."""
        local_path = f"/tmp/{object_name}.npy"
        minio_client.fget_object(minio_bucket, object_name, local_path)
        return np.load(local_path)

    def upload_local_directory_to_minio(local_path, bucket_name, minio_path):
        """
        Upload every file below ``local_path`` to ``bucket_name``.

        Object keys are ``minio_path`` followed by the file's path relative to
        ``local_path``, so the directory layout is preserved.
        """
        if not os.path.isdir(local_path):
            raise NotADirectoryError(local_path)
        # Object keys are always '/'-separated. Strip surrounding slashes from
        # the prefix so joining never yields an empty segment ("1//variables").
        prefix = minio_path.strip("/")
        for root, _dirs, files in os.walk(local_path):
            for file_name in files:
                local_file = os.path.join(root, file_name)
                relative_key = os.path.relpath(local_file, local_path).replace(os.sep, "/")
                remote_path = posixpath.join(prefix, relative_key)
                minio_client.fput_object(bucket_name, remote_path, local_file)

    model = keras.models.Sequential()
    model.add(keras.layers.Conv2D(64, (3, 3), activation='relu', input_shape=(28, 28, 1)))
    model.add(keras.layers.MaxPool2D(2, 2))

    model.add(keras.layers.Flatten())
    model.add(keras.layers.Dense(64, activation='relu'))

    model.add(keras.layers.Dense(32, activation='relu'))

    model.add(keras.layers.Dense(10, activation='softmax'))  # one output per digit class 0-9

    # 10-class classification with integer labels -> sparse categorical cross-entropy.
    model.compile(optimizer=optimizer,
                  loss="sparse_categorical_crossentropy",
                  metrics=['accuracy'])

    x_train = load_array("x_train")
    y_train = load_array("y_train")

    model.fit(
        x=x_train,
        y=y_train,
        epochs=no_epochs,
        batch_size=20,
    )

    x_test = load_array("x_test")
    y_test = load_array("y_test")

    # With metrics=['accuracy'], evaluate() returns [loss, accuracy].
    model_loss, model_accuracy = model.evaluate(x=x_test, y=y_test)

    print(f"Model accuracy: {model_accuracy}")
    print(f"Model loss: {model_loss}")

    ### Save model to minIO
    keras.models.save_model(model, "/tmp/detect-digits")
    upload_local_directory_to_minio("/tmp/detect-digits", minio_bucket, "models/detect-digits/1/")

    print("Saved model to minIO")

    Output = namedtuple('Output', ['model_accuracy', 'model_loss'])
    return Output(float(model_accuracy), float(model_loss))

def model_serving():
    """
    Deploy the exported model as a new KServe InferenceService.

    The service has these settings:
    - Name ``digits-recognizer-<timestamp>``, in KServe's default target
      namespace.
    - Istio sidecar injection disabled.
    - Runs under the ``sa-minio-kserve`` service account.
    - A TensorFlow predictor that loads the model from
      ``s3://mlpipeline/models/detect-digits/``.
    """
    from datetime import datetime

    from kubernetes import client
    from kserve import (
        KServeClient,
        V1beta1InferenceService,
        V1beta1InferenceServiceSpec,
        V1beta1PredictorSpec,
        V1beta1TFServingSpec,
        constants,
        utils,
    )

    target_namespace = utils.get_default_target_namespace()

    # A timestamp suffix gives every pipeline run its own service name.
    timestamp = datetime.now().strftime("%Y-%m-%d--%H-%M-%S")
    service_name = "digits-recognizer-{}".format(timestamp)
    api_version = constants.KSERVE_GROUP + "/" + "v1beta1"

    # The service account is presumably what carries the S3 (MinIO)
    # credentials for storage_uri. Confirm against the cluster setup.
    predictor = V1beta1PredictorSpec(
        service_account_name="sa-minio-kserve",
        tensorflow=V1beta1TFServingSpec(
            storage_uri="s3://mlpipeline/models/detect-digits/"
        ),
    )
    metadata = client.V1ObjectMeta(
        name=service_name,
        namespace=target_namespace,
        annotations={"sidecar.istio.io/inject": "false"},
    )
    inference_service = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=metadata,
        spec=V1beta1InferenceServiceSpec(predictor=predictor),
    )

    kserve_client = KServeClient()
    kserve_client.create(inference_service)

# Create component ops.
# Every component runs in the same notebook-server image. It is kept in one
# constant so an upgrade cannot leave components on mismatched images.
BASE_IMAGE = "public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.5.0"

comp_get_data_batch = components.create_component_from_func(
    get_data_batch,
    base_image=BASE_IMAGE,
    packages_to_install=["minio"]
)

comp_get_latest_data = components.create_component_from_func(
    get_latest_data,
    base_image=BASE_IMAGE
)

comp_reshape_data = components.create_component_from_func(
    reshape_data,
    base_image=BASE_IMAGE,
    packages_to_install=["minio"]
)

comp_model_building = components.create_component_from_func(
    model_building,
    base_image=BASE_IMAGE,
    packages_to_install=["minio"]
)

comp_model_serving = components.create_component_from_func(
    model_serving,
    base_image=BASE_IMAGE,
    packages_to_install=['kserve==0.11.0', 'kubernetes']
)

@dsl.pipeline(
    name='digits-recognizer-pipeline',
    description='Detect digits'
)
def digits_recognizer_pipeline(no_epochs: int = 1, optimizer: str = "adam"):
    """
    Wire the components into the training/serving DAG.

    get_data_batch and get_latest_data run in parallel. reshape_data waits for
    both, then model_building runs, then model_serving.

    The steps pass data to each other through the MinIO bucket, not through
    KFP inputs/outputs, so KFP cannot see those dependencies. Ordering is
    therefore enforced with ``.after()``. Execution caching is disabled so no
    step is skipped as "cached" when the MinIO or cluster state it depends on
    or produces has changed. Without this, for example, model_serving (which
    has no inputs) would stop deploying after its first run.

    Args:
        no_epochs: Training epochs forwarded to model_building.
        optimizer: Keras optimizer name forwarded to model_building.
    """
    step1_1 = comp_get_data_batch()
    step1_2 = comp_get_latest_data()

    step2 = comp_reshape_data()
    step2.after(step1_1, step1_2)

    step3 = comp_model_building(no_epochs=no_epochs, optimizer=optimizer)
    step3.after(step2)

    step4 = comp_model_serving()
    step4.after(step3)

    for step in (step1_1, step1_2, step2, step3, step4):
        # "P0D" = zero allowed staleness, i.e. never reuse a cached result.
        step.execution_options.caching_strategy.max_cache_staleness = "P0D"

if __name__ == "__main__":
    import sys

    # Compile pipeline
    kfp.compiler.Compiler().compile(
        pipeline_func=digits_recognizer_pipeline,
        package_path='digits_recognizer_pipeline.yaml'
    )
    print("Pipeline compiled successfully to 'digits_recognizer_pipeline.yaml'")

    # Jupyter also executes cells with __name__ == "__main__". Without this
    # check, running the notebook would submit one run here and a second,
    # identical one from the later submission cell. Auto-submit only when this
    # file is run as a plain script.
    if "ipykernel" not in sys.modules:
        # Optional: Create and run pipeline if client is available
        try:
            client = kfp.Client()

            # Submit pipeline run
            run = client.create_run_from_pipeline_func(
                digits_recognizer_pipeline,
                arguments={
                    "no_epochs": 1,
                    "optimizer": "adam"
                },
                experiment_name="digits-recognizer"
            )

            print(f"Pipeline run submitted: {run.run_id}")

        except Exception as e:
            # Best effort: report the reason and fall back to manual upload.
            print(f"Could not submit run automatically: {e}")
            print("You can upload the compiled YAML file manually to the Kubeflow UI.")

In [None]:
# Compile the pipeline into a YAML package that can be uploaded to the KFP UI.
print("Compiling pipeline...")
pipeline_compiler = kfp.compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=digits_recognizer_pipeline,
    package_path='digits_recognizer_pipeline.yaml',
)
print("✅ Pipeline compiled successfully to 'digits_recognizer_pipeline.yaml'")

# Follow-up instructions, one per output line.
next_steps = (
    "\nYou can now:",
    "1. Upload the YAML file to your Kubeflow Pipelines UI",
    "2. Create a new run with parameters:",
    "   - no_epochs: Number of training epochs (default: 1)",
    "   - optimizer: Optimizer to use (default: 'adam')",
)
print(*next_steps, sep="\n")

In [None]:
# Optional: Try to submit pipeline run (only works when running inside Kubeflow)
import os


def _print_manual_upload_instructions():
    """Explain how to upload the compiled YAML by hand, or report that it is missing."""
    if os.path.exists('digits_recognizer_pipeline.yaml'):
        print("\n✅ Pipeline YAML file created successfully!")
        print("📁 File location: digits_recognizer_pipeline.yaml")
        print("\n📋 Next steps:")
        print("1. Navigate to your Kubeflow Pipelines UI")
        print("2. Click 'Upload pipeline'")
        print("3. Select the 'digits_recognizer_pipeline.yaml' file")
        print("4. Create a new run with these parameters:")
        print("   - no_epochs: 1-10 (number of training epochs)")
        print("   - optimizer: 'adam', 'sgd', or 'rmsprop'")
        print("\n🔧 Make sure your cluster has:")
        print("   - MinIO with 'mlpipeline' bucket containing mnist.npz")
        print("   - KServe installed and configured")
        print("   - Service account 'sa-minio-kserve' with proper permissions")
    else:
        print("❌ Pipeline YAML file was not created")


try:
    client = kfp.Client()
    # Cheap API round-trip to confirm the Pipelines endpoint is reachable
    # before submitting anything.
    client.list_experiments()
    print("✅ Connected to Kubeflow Pipelines!")
except Exception as e:
    print("ℹ️  Could not connect to Kubeflow Pipelines automatically.")
    print("This is expected when running outside of a Kubeflow cluster.")
    print(f"Details: {e}")
    _print_manual_upload_instructions()
else:
    try:
        run = client.create_run_from_pipeline_func(
            digits_recognizer_pipeline,
            arguments={
                "no_epochs": 1,
                "optimizer": "adam"
            },
            experiment_name="digits-recognizer"
        )
    except Exception as e:
        # The connection worked, so report the real submission error instead
        # of claiming we are outside a cluster.
        print(f"❌ Connected, but submitting the pipeline run failed: {e}")
        _print_manual_upload_instructions()
    else:
        print(f"✅ Pipeline run submitted: {run.run_id}")