# Fingerprint recognition pipeline in Kubeflow

In this notebook, the **fingerprint recognition notebook** is segmented into components and executed as a **Kubeflow pipeline** run. A pipeline describes an ML workflow as a set of components, one per step. A pipeline component is a self-contained set of user code, packaged as a Docker image, that performs one step in the pipeline, such as data preprocessing, data transformation, or model training. To run as a Kubeflow pipeline, a conventional data science notebook first has to be brought into a Kubeflow-*friendly* format, which is the purpose of this notebook.

![pics](pics/fingerprint_Kubeflow.JPG)

## Load reusable components, define data location & name, MinIO, and namespace

Reusable components for repetitive steps are loaded first. They are published as **.yaml** files in a coworker's GitHub repository and are loaded by URL. Kubeflow is designed to let data scientists reuse components for steps that occur frequently in ML workflows, for example downloading the data. The other components reused here handle model conversion, model upload, and model deployment. Those components are only compatible with models trained using *TensorFlow*. For models built with other frameworks, other components have to be loaded or defined in the notebook.

The dataset used in this notebook was uploaded to the file hosting service Box. Its URL and file name are defined next, along with the model name. Kubeflow ships with MinIO to store its pipelines, artifacts, and logs. The MinIO URL, username, and password must be specified here.
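
Hard-coding the MinIO credentials is convenient on a demo cluster, but they can also be read from the environment. The following is a minimal sketch, not part of the original notebook. The environment variable names `MINIO_URL`, `MINIO_USER`, and `MINIO_PASS` and the helper `minio_settings` are assumptions made for illustration; the fallbacks are the values used in this notebook.

```python
import os


def minio_settings(env=None):
    """Return (url, user, password), preferring environment variables.

    The variable names are assumptions made for this sketch; the fallbacks
    are the values used in this notebook.
    """
    env = os.environ if env is None else env
    return (
        env.get("MINIO_URL", "minio-service.kubeflow:9000"),
        env.get("MINIO_USER", "minio"),
        env.get("MINIO_PASS", "minio123"),
    )


MINIO_URL, MINIO_USER, MINIO_PASS = minio_settings()
```

Passing a plain dictionary as `env` makes the helper easy to try out without touching the real environment.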

Kubeflow comes with multi-user isolation, which simplifies user operations because each user only views and edits the Kubeflow components and model artifacts defined in their configuration. Isolation uses Kubernetes **Namespaces**. The Namespace needs to be specified before the other steps of the pipeline can be defined. Here it is read from the service account file mounted into the notebook pod.
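
The namespace lookup can be made slightly more defensive. The sketch below assumes a hypothetical helper `read_namespace` with a hypothetical fallback value `"default"` for the case where the file does not exist (for example, when the code runs outside a pod); neither is part of the original notebook.

```python
from pathlib import Path

# Path where Kubernetes mounts the service account namespace inside a pod.
NAMESPACE_FILE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"


def read_namespace(path=NAMESPACE_FILE, default="default"):
    """Return the namespace stored in `path`, or `default` if the file is missing."""
    file = Path(path)
    if not file.is_file():
        return default  # hypothetical fallback, e.g. when running outside a pod
    return file.read_text().strip()  # strip guards against a trailing newline
```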

In [15]:
DOWNLOAD_AND_EXTRACT_COMPONENT_URL = "https://raw.githubusercontent.com/lehrig/kubeflow-ppc64le-components/main/data-extraction/download-and-extract-from-url/component.yaml"
CONVERT_MODEL_TO_ONNX_COMPONENT_URL = "https://raw.githubusercontent.com/lehrig/kubeflow-ppc64le-components/main/model-building/convert-to-onnx/component.yaml"
UPLOAD_MODEL_COMPONENT_URL = "https://raw.githubusercontent.com/lehrig/kubeflow-ppc64le-components/main/model-building/upload-model/component.yaml"
DEPLOY_MODEL_WITH_KSERVE_COMPONENT_URL = "https://raw.githubusercontent.com/lehrig/kubeflow-ppc64le-components/main/model-deployment/deploy-model-with-kserve/component.yaml"

DATASET_URL = "https://ibm.box.com/shared/static/cr1dmse8ehk1ywanxws12gkbf30tsgp5.zip"
DATASET_FILE_NAME = "data.zip"
MODEL_NAME = "fingerprint-classification"

MINIO_URL = "minio-service.kubeflow:9000"
MINIO_USER = "minio"
MINIO_PASS = "minio123"

with open("/var/run/secrets/kubernetes.io/serviceaccount/namespace") as f:
    NAMESPACE = f.read()
NAMESPACE

'user-example-com'

In [16]:
import kfp
import kfp.components as comp
from typing import NamedTuple
import kfp.dsl as dsl

In [17]:
client = kfp.Client()

# Pipeline
## 1.1 Load Dataset
The first component downloads the data and extracts it from a zip file.

In [18]:
download_and_extract_comp = comp.load_component_from_url(
    DOWNLOAD_AND_EXTRACT_COMPONENT_URL
)
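
The implementation of the reusable component is not shown in this notebook. As a rough illustration of what a download-and-extract step does, here is a standard-library sketch. The function name and parameters are hypothetical and do not reflect the component's actual code.

```python
import urllib.request
import zipfile
from pathlib import Path


def download_and_extract(url, file_name, data_dir):
    """Download `url` to `data_dir/file_name` and unpack it if it is a zip archive."""
    target = Path(data_dir)
    target.mkdir(parents=True, exist_ok=True)
    archive = target / file_name
    urllib.request.urlretrieve(url, str(archive))  # fetch the file
    if zipfile.is_zipfile(str(archive)):
        with zipfile.ZipFile(str(archive)) as zf:
            zf.extractall(str(target))  # unpack next to the archive
    return str(target)
```

The returned directory plays the role of the `data_path` output that the next component consumes.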

## 1.2 Preprocessing
In the second component, all preprocessing is done before the data is used to train the model. The data scientist decides which steps qualify as preprocessing and incorporates the corresponding code into this component. In this example, the image arrays are loaded, the `easy`, `medium`, and `hard` sets are concatenated, and a 90/10 split into training and validation data is performed.

Besides the preprocessing code, the component follows a fixed structure: **input** and **output paths** are declared as function parameters, **packages & modules** are imported inside the function, the **data** is loaded, the step's code runs, and the results are saved to a **new data directory**. Finally, the component is assigned a **base image** that contains all the packages the code needs. Every subsequent component follows the same structure.
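
Because a component function only communicates through paths, its logic can be tried out locally before it is packaged. The sketch below uses a hypothetical step `example_step` and hypothetical file names, with temporary directories standing in for the paths Kubeflow would inject. It uses plain `str` parameters instead of the `comp.InputPath` and `comp.OutputPath` annotations so that it runs without the Kubeflow SDK.

```python
import json
import os
import tempfile


def example_step(data_dir: str, out_dir: str):
    # 1. Imports live inside the function, because the function's source
    #    is what gets packaged into the component.
    import json
    import os

    # 2. Read the input produced by the previous step.
    with open(os.path.join(data_dir, "numbers.json")) as f:
        numbers = json.load(f)

    # 3. Do the actual work of the step.
    doubled = [2 * n for n in numbers]

    # 4. Write the result to the output directory for the next step.
    os.makedirs(out_dir, exist_ok=True)
    with open(os.path.join(out_dir, "doubled.json"), "w") as f:
        json.dump(doubled, f)


# Local dry run with temporary directories standing in for pipeline paths.
with tempfile.TemporaryDirectory() as tmp:
    in_dir, out_dir = os.path.join(tmp, "in"), os.path.join(tmp, "out")
    os.makedirs(in_dir)
    with open(os.path.join(in_dir, "numbers.json"), "w") as f:
        json.dump([1, 2, 3], f)
    example_step(in_dir, out_dir)
    with open(os.path.join(out_dir, "doubled.json")) as f:
        result = json.load(f)

print(result)  # [2, 4, 6]
```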

In [19]:
def preprocess_data(
    data_dir: comp.InputPath(str),
    prep_data_dir: comp.OutputPath(str)
):
    from sklearn.model_selection import train_test_split
    import numpy as np
    import os

    x_real = np.load(f'{data_dir}/x_real.npz')['data']
    y_real = np.load(f'{data_dir}/y_real.npy')
    x_easy = np.load(f'{data_dir}/x_easy.npz')['data']
    y_easy = np.load(f'{data_dir}/y_easy.npy')
    x_medium = np.load(f'{data_dir}/x_medium.npz')['data']
    y_medium = np.load(f'{data_dir}/y_medium.npy')
    x_hard = np.load(f'{data_dir}/x_hard.npz')['data']
    y_hard = np.load(f'{data_dir}/y_hard.npy')
    
    x_data = np.concatenate([x_easy, x_medium, x_hard], axis=0)
    label_data = np.concatenate([y_easy, y_medium, y_hard], axis=0)

    x_train, x_val, label_train, label_val = train_test_split(x_data, label_data, test_size=0.1)

    print(x_data.shape, label_data.shape)
    print(x_train.shape, label_train.shape)
    print(x_val.shape, label_val.shape)
    print(len(x_real), len(y_real))
    
    if not os.path.exists(prep_data_dir):
        os.makedirs(prep_data_dir)
            
    # Save each split as an .npz archive; arrays are stored positionally (arr_0, arr_1)
    np.savez(f'{prep_data_dir}/train_data.npz', x_train, label_train)
    np.savez(f'{prep_data_dir}/val_data.npz', x_val, label_val)
    np.savez(f'{prep_data_dir}/real_data.npz', x_real, y_real)
    
    
preprocess_data_comp = kfp.components.create_component_from_func(
    func=preprocess_data,
    output_component_file='prep_data_component.yaml',
    base_image='quay.io/mgiessing/kubeflow-component-data-prep:latest',
)

## 1.3 Train the model
In this component, the model is trained and then saved to the **model directory**. **Data augmentation** is handled here rather than in preprocessing because it directly affects the training data. It is implemented inside the `DataGenerator` class and is applied only by generators created with `shuffle=True`. In the cell below, both the training and the validation generator are created with `shuffle=False`, so pass `shuffle=True` wherever augmented batches are wanted.

In [20]:
def train_model(
    prep_data_dir: comp.InputPath(str),
    model_dir: comp.OutputPath(str)
):
    """Trains a Siamese-style CNN (two inputs sharing one feature extractor) for 3 epochs on the prepared dataset. Once trained, the model is persisted to `model_dir`."""

    import os, random
    import numpy as np
    import tensorflow.keras as keras
    from sklearn.utils import shuffle
    from tensorflow.keras import Sequential
    from tensorflow.keras.models import Model
    from tensorflow.keras.applications import InceptionV3
    from tensorflow.keras import layers
    from tensorflow.keras.layers import Dense, Dropout, GlobalAveragePooling2D, BatchNormalization
    from tensorflow.keras.callbacks import ReduceLROnPlateau, ModelCheckpoint, EarlyStopping
    from imgaug import augmenters as iaa


    train_data = np.load(f'{prep_data_dir}/train_data.npz')
    x_train = train_data[train_data.files[0]]
    label_train = train_data[train_data.files[1]]
    
    val_data = np.load(f'{prep_data_dir}/val_data.npz')
    x_val = val_data[val_data.files[0]]
    label_val = val_data[val_data.files[1]]
    
    real_data = np.load(f'{prep_data_dir}/real_data.npz')
    x_real = real_data[real_data.files[0]]
    y_real = real_data[real_data.files[1]]
    
    label_real_dict = {}
    for i, y in enumerate(y_real):
        key = y.astype(str)
        key = ''.join(key).zfill(6)

        label_real_dict[key] = i
    
    class DataGenerator(keras.utils.Sequence):
        def __init__(self, x, label, x_real, label_real_dict, batch_size=16, shuffle=True):
            'Initialization'
            self.x = x
            self.label = label
            self.x_real = x_real
            self.label_real_dict = label_real_dict

            self.batch_size = batch_size
            self.shuffle = shuffle
            self.on_epoch_end()

        def __len__(self):
            'Denotes the number of batches per epoch'
            return int(np.floor(len(self.x) / self.batch_size))

        def __getitem__(self, index):
            'Generate one batch of data'
            # Generate indexes of the batch
            x1_batch = self.x[index*self.batch_size:(index+1)*self.batch_size]
            label_batch = self.label[index*self.batch_size:(index+1)*self.batch_size]

            x2_batch = np.empty((self.batch_size, 90, 90, 1), dtype=np.float32)
            y_batch = np.zeros((self.batch_size, 1), dtype=np.float32)

            # augmentation
            if self.shuffle:
                seq = iaa.Sequential([
                    iaa.GaussianBlur(sigma=(0, 0.5)),
                    iaa.Affine(
                        scale={"x": (0.9, 1.1), "y": (0.9, 1.1)},
                        translate_percent={"x": (-0.1, 0.1), "y": (-0.1, 0.1)},
                        rotate=(-30, 30),
                        order=[0, 1],
                        cval=255
                    )
                ], random_order=True)

                x1_batch = seq.augment_images(x1_batch)

            # pick matched images(label 1.0) and unmatched images(label 0.0) and put together in batch
            # matched images must be all same, [subject_id(3), gender(1), left_right(1), finger(1)], e.g) 034010
            for i, l in enumerate(label_batch):
                match_key = l.astype(str)
                match_key = ''.join(match_key).zfill(6)

                if random.random() > 0.5:
                    # put matched image
                    x2_batch[i] = self.x_real[self.label_real_dict[match_key]]
                    y_batch[i] = 1.
                else:
                    # put unmatched image
                    while True:
                        unmatch_key, unmatch_idx = random.choice(list(self.label_real_dict.items()))

                        if unmatch_key != match_key:
                            break

                    x2_batch[i] = self.x_real[unmatch_idx]
                    y_batch[i] = 0.

            return [x1_batch.astype(np.float32) / 255., x2_batch.astype(np.float32) / 255.], y_batch

        def on_epoch_end(self):
            if self.shuffle:
                self.x, self.label = shuffle(self.x, self.label)

    train_gen = DataGenerator(x_train, label_train, x_real, label_real_dict, shuffle=False)
    val_gen = DataGenerator(x_val, label_val, x_real, label_real_dict, shuffle=False)
    
    
    x1 = layers.Input(shape=(90, 90, 1))
    x2 = layers.Input(shape=(90, 90, 1))

    # share weights both inputs
    inputs = layers.Input(shape=(90, 90, 1))

    feature = layers.Conv2D(32, kernel_size=3, padding='same', activation='relu')(inputs)
    feature = layers.MaxPooling2D(pool_size=2)(feature)

    feature = layers.Conv2D(32, kernel_size=3, padding='same', activation='relu')(feature)
    feature = layers.MaxPooling2D(pool_size=2)(feature)

    feature_model = Model(inputs=inputs, outputs=feature)

    # 2 feature models that sharing weights
    x1_net = feature_model(x1)
    x2_net = feature_model(x2)

    # subtract features
    net = layers.Subtract()([x1_net, x2_net])

    net = layers.Conv2D(32, kernel_size=3, padding='same', activation='relu')(net)
    net = layers.MaxPooling2D(pool_size=2)(net)

    net = layers.Flatten()(net)

    net = layers.Dense(64, activation='relu')(net)

    net = layers.Dense(1, activation='sigmoid')(net)

    model = Model(inputs=[x1, x2], outputs=net)

    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['acc'])

    model.summary()
    
    if not os.path.exists(model_dir):
        os.makedirs(model_dir)
        
    history = model.fit_generator(train_gen, epochs=3, validation_data=val_gen)
    
    model.save(model_dir)
    
train_model_comp = kfp.components.create_component_from_func(
    func=train_model,
    output_component_file='train_model_component.yaml',
    base_image='quay.io/sabrinakopecki/imageaugmenttraintestplit:1.3'
)
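
The generator in the training cell above pairs each training image with either its matching real fingerprint or a randomly chosen different one, using a six-character key built from the label row. The following standalone sketch reproduces that lookup with plain lists instead of NumPy arrays. The label rows and the helper names `label_to_key` and `pick_pair` are made up for illustration.

```python
import random


def label_to_key(label):
    """Join the label fields into one string and left-pad it to six characters."""
    return "".join(str(v) for v in label).zfill(6)


# Hypothetical label rows: [subject_id, gender, left_right, finger]
real_labels = [[34, 0, 1, 0], [7, 1, 0, 3], [120, 0, 0, 2]]
label_real_dict = {label_to_key(l): i for i, l in enumerate(real_labels)}


def pick_pair(label, rng=random):
    """Return (index into the real images, target): 1.0 for a match, 0.0 otherwise."""
    match_key = label_to_key(label)
    if rng.random() > 0.5:
        return label_real_dict[match_key], 1.0
    # Draw until the key differs, so the pair is a true negative.
    while True:
        key, idx = rng.choice(list(label_real_dict.items()))
        if key != match_key:
            return idx, 0.0


print(label_to_key([34, 0, 1, 0]))  # 034010
```

Roughly half of the pairs end up as matches, which keeps the binary targets balanced for the `binary_crossentropy` loss.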

## 1.4 Model evaluation
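This component evaluates the trained model. It loads the validation and real data from the preprocessing directory and the saved model from the model directory, then predicts a match score for one matching and one non-matching fingerprint pair.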

In [22]:
def evaluate_model(
    prep_data_dir: comp.InputPath(str),
    model_dir: comp.InputPath(str),
):
    """Loads a saved model from file and scores one matching and one non-matching
    fingerprint pair drawn from the prepared dataset. The scores are printed to the
    component log."""
    
    import json, random
    import numpy as np
    import tensorflow as tf
    from sklearn.utils import shuffle
    from imgaug import augmenters as iaa
    from collections import namedtuple

    val_data = np.load(f'{prep_data_dir}/val_data.npz')
    x_val = val_data[val_data.files[0]]
    label_val = val_data[val_data.files[1]]

    real_data = np.load(f'{prep_data_dir}/real_data.npz')
    x_real = real_data[real_data.files[0]]
    y_real = real_data[real_data.files[1]]
    
    model = tf.keras.models.load_model(model_dir)

    label_real_dict = {}
    for i, y in enumerate(y_real):
        key = y.astype(str)
        key = ''.join(key).zfill(6)
        label_real_dict[key] = i
    
    # new user fingerprint input
    # randrange excludes the upper bound; randint(0, len(x_val)) could index out of range
    random_idx = random.randrange(len(x_val))

    random_img = x_val[random_idx]
    random_label = label_val[random_idx]

    seq = iaa.Sequential([
        iaa.GaussianBlur(sigma=(0, 0.5)),
        iaa.Affine(
            scale={"x": (0.9, 1.1), "y": (0.9, 1.1)},
            translate_percent={"x": (-0.1, 0.1), "y": (-0.1, 0.1)},
            rotate=(-30, 30),
            order=[0, 1],
            cval=255
        )
    ], random_order=True)

    random_img = seq.augment_image(random_img).reshape((1, 90, 90, 1)).astype(np.float32) / 255.

    # matched image
    match_key = random_label.astype(str)
    match_key = ''.join(match_key).zfill(6)

    print(len(x_real), len(label_real_dict))
    
    rx = x_real[label_real_dict[match_key]].reshape((1, 90, 90, 1)).astype(np.float32) / 255.
    ry = y_real[label_real_dict[match_key]]

    pred_rx = model.predict([random_img, rx])

    # unmatched image: redraw until the key differs from the matched one
    while True:
        unmatch_key, unmatch_idx = random.choice(list(label_real_dict.items()))
        if unmatch_key != match_key:
            break

    ux = x_real[unmatch_idx].reshape((1, 90, 90, 1)).astype(np.float32) / 255.
    uy = y_real[unmatch_idx]

    pred_ux = model.predict([random_img, ux])
    
    print('matched score:', pred_rx)
    print('unmatched score:', pred_ux)

evaluate_model_comp = kfp.components.create_component_from_func(
    func=evaluate_model,
    output_component_file='evaluate_model_component.yaml',
    base_image='quay.io/sabrinakopecki/imageaugmenttraintestplit:1.3'
)
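
The evaluation cell imports `json` and `namedtuple` but does not export any metrics. As a hedged sketch of how the two prediction scores could be serialized in the Kubeflow Pipelines v1 metrics layout, consider the helper below. The helper name `build_metrics`, the metric names, and the example scores are assumptions made for illustration, not results from this pipeline.

```python
import json


def build_metrics(matched_score, unmatched_score):
    """Serialize two prediction scores in the Kubeflow Pipelines v1 metrics layout."""
    metrics = {
        "metrics": [
            {"name": "matched-score", "numberValue": float(matched_score), "format": "RAW"},
            {"name": "unmatched-score", "numberValue": float(unmatched_score), "format": "RAW"},
        ]
    }
    return json.dumps(metrics)


# Placeholder scores, not measured values.
print(build_metrics(0.97, 0.03))
```

In a function-based component, a string like this would typically be returned through an output named `mlpipeline_metrics` so that the Pipelines UI can display it.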

## 1.5 Convert model to ONNX (by reusing a Kubeflow component)

In [24]:
convert_model_to_onnx_comp = comp.load_component_from_url(
    CONVERT_MODEL_TO_ONNX_COMPONENT_URL
)

## 1.6 Upload model to MinIO artifact store (by reusing a Kubeflow component)

In [26]:
upload_model_comp = comp.load_component_from_url(
    UPLOAD_MODEL_COMPONENT_URL
)

## 1.7 Deploy the model using KServe (by reusing a Kubeflow component)

In [28]:
deploy_model_with_kserve_comp = comp.load_component_from_url(
    DEPLOY_MODEL_WITH_KSERVE_COMPONENT_URL
)

## 2 Pipeline
After all the components have been specified, the pipeline is defined using the **@dsl.pipeline** decorator. The pipeline determines the order in which the components run and which outputs and parameters are passed between them.

In [30]:
@dsl.pipeline(
  name='Fingerprint classification pipeline',
  description='fingerprint pipeline that matches images of fingerprints'
)
def fingerprint_pipeline(dataset_url: str,
                    dataset_file_name: str = "data.zip",
                    data_dir: str = "/train/data",
                    prep_data_dir: str = "/train/prep_data",
                    model_dir: str = "/train/model",
                    model_name: str = "fingerprint-recognition",
                    minio_url: str = MINIO_URL,
                    minio_user: str = MINIO_USER,
                    minio_pass: str = MINIO_PASS):
    download_and_extract_task = download_and_extract_comp(
        url=dataset_url,
        file_name=dataset_file_name
    )

    preprocess_data_task = preprocess_data_comp(
        download_and_extract_task.outputs['data_path']
    )

    train_model_task = train_model_comp(
        preprocess_data_task.output
    ).set_gpu_limit(1)
    
    evaluate_model_task = evaluate_model_comp(
        preprocess_data_task.output,
        train_model_task.output
    ).set_gpu_limit(1)

    convert_model_to_onnx_task = convert_model_to_onnx_comp(
        train_model_task.output
    )

    upload_model_task = upload_model_comp(
        convert_model_to_onnx_task.output,
        minio_url,
        minio_user,
        minio_pass,
        model_name=model_name
    )

    deploy_model_with_kserve_task = deploy_model_with_kserve_comp(
        model_name=model_name
    )

    deploy_model_with_kserve_task.after(upload_model_task)
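
The execution order follows from how the tasks are wired together in the pipeline function above: a task can only start once every task whose output it consumes has finished, and `.after()` adds an explicit dependency where no data is passed. The sketch below mimics that ordering with the standard-library `graphlib` module (Python 3.9 or newer). It is an illustration of the dependency graph, not of how Kubeflow schedules tasks internally.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on, as wired in the pipeline.
dependencies = {
    "download_and_extract": set(),
    "preprocess_data": {"download_and_extract"},
    "train_model": {"preprocess_data"},
    "evaluate_model": {"preprocess_data", "train_model"},
    "convert_model_to_onnx": {"train_model"},
    "upload_model": {"convert_model_to_onnx"},
    "deploy_model_with_kserve": {"upload_model"},  # from the explicit .after()
}

# One valid execution order; independent tasks may appear in either order.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

`evaluate_model` and `convert_model_to_onnx` both depend only on earlier steps, so they are free to run in parallel once training has finished.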

## 3 Run the pipeline within an experiment
After the pipeline arguments are defined, the pipeline run is submitted. Click *Run details*, which appears below the cell, to view the run in the Kubeflow Pipelines UI in the browser.

In [32]:
# Specify argument values for your pipeline run.
arguments = {
    'dataset_url': DATASET_URL,
    'dataset_file_name': DATASET_FILE_NAME,
    'data_dir': '/train/data',
    'prep_data_dir': '/train/prep_data',
    'model_dir': '/train/model',
    'model_name': MODEL_NAME,
    'minio_url': MINIO_URL,
    'minio_user': MINIO_USER,
    'minio_pass': MINIO_PASS
}

client.create_run_from_pipeline_func(
    fingerprint_pipeline,
    arguments=arguments,
    namespace=NAMESPACE
)

RunPipelineResult(run_id=1eca175a-2d54-4dff-942f-d2c55c143554)
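
A mistyped key in the `arguments` dictionary only surfaces when the run is submitted. One way to catch it earlier is to compare the dictionary with the pipeline function's signature. The sketch below is an optional addition, not part of the original notebook: `check_arguments` and `demo_pipeline` are hypothetical names, and the approach assumes that the decorated pipeline function still exposes its original Python signature.

```python
import inspect


def check_arguments(pipeline_func, arguments):
    """Raise ValueError if `arguments` has unknown keys or misses required ones."""
    params = inspect.signature(pipeline_func).parameters
    unknown = sorted(set(arguments) - set(params))
    missing = sorted(
        name for name, p in params.items()
        if p.default is inspect.Parameter.empty and name not in arguments
    )
    if unknown or missing:
        raise ValueError(f"unknown: {unknown}, missing: {missing}")


# Stand-in with a shortened version of the pipeline signature.
def demo_pipeline(dataset_url: str, dataset_file_name: str = "data.zip"):
    pass


check_arguments(demo_pipeline, {"dataset_url": "https://example.com/data.zip"})
```

In the notebook, the same check could be called with `fingerprint_pipeline` and `arguments` right before `client.create_run_from_pipeline_func`.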