# This notebook shows how to use `dsl.ParallelFor` for parallelism in the KFP SDK, using a TensorFlow neural-network use case

In [None]:
import kfp
from kfp import dsl
import kfp.components as comp
from kfp.components import func_to_container_op
from typing import NamedTuple

## Loading the MNIST data

- This function downloads the MNIST dataset, saves it under `<log_folder>/data`, and returns the path of that folder.
    parameters
    -------------
    log_folder: str
        The folder under which the `data` subfolder holding the dataset is created
    
    return
    ---------
    datadir: str
        The folder path where the dataset was saved

In [12]:
def load_data(log_folder: str) -> NamedTuple('Outputs', [('datadir', str)]):
    """Download MNIST and store the raw arrays under ``<log_folder>/data``.

    Runs as a KFP component (via func_to_container_op), so every import is
    local to the function.

    Parameters
    ----------
    log_folder : str
        Root folder (the shared volume mount) under which ``data/`` is created.

    Returns
    -------
    datadir : str
        Folder containing ``X_train_full.pkl``, ``y_train_full.pkl``,
        ``X_test.pkl`` and ``y_test.pkl``.
    """
    import os
    import subprocess
    import sys

    # Install into the interpreter that runs this function; check=True makes a
    # failed install fail here instead of surfacing later as an ImportError.
    subprocess.run([sys.executable, "-m", "pip", "install", "joblib"], check=True)
    import joblib
    import tensorflow as tf

    (X_train_full, y_train_full), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

    data_folder = os.path.join(log_folder, 'data')
    # exist_ok avoids the check-then-create race of isdir() + makedirs().
    os.makedirs(data_folder, exist_ok=True)

    for name, array in (
        ('X_train_full', X_train_full),
        ('y_train_full', y_train_full),
        ('X_test', X_test),
        ('y_test', y_test),
    ):
        joblib.dump(array, os.path.join(data_folder, name + '.pkl'))

    return (data_folder,)

## Split the data

-    This function splits the original training set into training and validation sets,
     normalizes the pixel values (including those of the test set), saves the arrays to .pkl files, and returns data_folder.
     
    parameters
    ------------
    data_folder: str
        The path to the folder of the data
    
    returns
    ------------
        The path to the folder of the data

In [13]:
def split_data(data_folder: str) -> NamedTuple('Outputs', [('data_folder', str)]):
    """Carve a validation set off the training data and rescale the inputs.

    Reads the raw arrays written by ``load_data`` from ``data_folder``.
    The first 5000 training samples become the validation set and the rest
    the training set. All image arrays, test set included, are divided by
    255. The results are written back into the same folder.

    Parameters
    ----------
    data_folder : str
        Folder containing ``X_train_full.pkl``, ``y_train_full.pkl`` and
        ``X_test.pkl``.

    Returns
    -------
    data_folder : str
        The same folder, now also holding ``X_train.pkl``, ``X_valid.pkl``,
        ``y_train.pkl``, ``y_valid.pkl`` and a rescaled ``X_test.pkl``.
    """
    import os
    import subprocess
    import sys

    # Install into the interpreter that runs this function; fail loudly.
    subprocess.run([sys.executable, "-m", "pip", "install", "joblib"], check=True)
    import joblib

    def pkl_path(name):
        return os.path.join(data_folder, name + '.pkl')

    # joblib.load accepts a filename and closes the file itself; the previous
    # joblib.load(open(...)) left the handles open.
    X_train_full = joblib.load(pkl_path('X_train_full'))
    y_train_full = joblib.load(pkl_path('y_train_full'))
    X_test = joblib.load(pkl_path('X_test'))

    # Presumably 0-255 pixel intensities; dividing by 255 maps them to [0, 1].
    X_valid, X_train = X_train_full[:5000] / 255, X_train_full[5000:] / 255
    y_valid, y_train = y_train_full[:5000], y_train_full[5000:]

    # NOTE(review): this overwrites the raw X_test.pkl in place, so re-running
    # this step (e.g. on retry) would rescale the test set a second time.
    X_test = X_test / 255

    for name, array in (
        ('X_train', X_train),
        ('X_valid', X_valid),
        ('y_train', y_train),
        ('y_valid', y_valid),
        ('X_test', X_test),
    ):
        joblib.dump(array, pkl_path(name))

    return (data_folder,)

## Building and training the neural network

-     This function builds and trains the neural network, evaluates it on the test set,
      and returns the result.
    
    parameters
    ---------------
    data_folder: str
        Path to the folder of data
    learning_rate: float
        Learning rate with which the model is trained
        
    return
    --------------
    result: list
        A list containing the test loss and the test accuracy

In [19]:
def create_train_model(data_folder: str, learning_rate: float) -> NamedTuple('Output', [('result', list)]):
    """Build, train and evaluate a dense MNIST classifier for one learning rate.

    Runs as a KFP component: func_to_container_op ships only this function's
    source to the container, so every import and helper lives inside it.

    Parameters
    ----------
    data_folder : str
        Folder holding the pickled arrays from the earlier steps
        (``y_test.pkl`` from ``load_data``, the rest from ``split_data``).
    learning_rate : float
        Learning rate of the SGD optimizer.

    Returns
    -------
    result : list
        ``[test_loss, test_accuracy]`` as returned by ``model.evaluate``.
    """
    import os
    import subprocess
    import sys
    import time
    import uuid

    # Install into the interpreter that runs this function; fail loudly.
    # (pandas and json were previously installed/imported but never used.)
    subprocess.run([sys.executable, "-m", "pip", "install", "joblib"], check=True)
    import joblib
    import tensorflow as tf

    def get_log_path(data_folder: str, learning_rate: float) -> str:
        """Create and return a fresh TensorBoard log directory for this run."""
        # Parent of data_folder (e.g. '/information' for '/information/data').
        # The old data_folder.split('/')[0] returned '' for absolute paths,
        # so logs ended up in '/fit', outside the shared volume.
        volume_root = os.path.dirname(os.path.normpath(data_folder))
        # Parallel training pods can start within the same second, so the
        # timestamp alone may collide; add the learning rate and a random tag.
        stamp = time.asctime().replace(" ", "_").replace(":", "")
        unique = f"{stamp}_lr{learning_rate}_{uuid.uuid4().hex[:8]}"
        log_path = os.path.join(volume_root, 'fit', unique)
        os.makedirs(log_path, exist_ok=True)
        print(f"Saving logs at: {log_path}")
        return log_path

    def load(name: str):
        # joblib.load opens and closes the file itself (no leaked handles).
        return joblib.load(os.path.join(data_folder, name + '.pkl'))

    X_train, y_train = load('X_train'), load('y_train')
    X_valid, y_valid = load('X_valid'), load('y_valid')
    X_test, y_test = load('X_test'), load('y_test')

    model_clf = tf.keras.models.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28), name='inputLayer'),
        tf.keras.layers.Dense(300, activation='relu', name='hiddenLayer1'),
        tf.keras.layers.Dense(100, activation='relu', name='hiddenLayer2'),
        tf.keras.layers.Dense(10, activation='softmax', name='outputLayer'),
    ])
    model_clf.compile(
        loss='sparse_categorical_crossentropy',
        optimizer=tf.keras.optimizers.SGD(learning_rate=learning_rate, name='SGD'),
        metrics=['accuracy'],
    )

    # One checkpoint file per learning rate. With a single shared name, the
    # ParallelFor iterations overwrote each other on the shared volume.
    # NOTE(review): '.keras' suffix because Keras 3 (shipped by recent
    # tensorflow/tensorflow images) rejects other suffixes for full-model
    # checkpoints; older TF versions may save a SavedModel under this name.
    ckpt_path = os.path.join(data_folder, f"model_ckpt_lr{learning_rate}.keras")
    callbacks = [
        tf.keras.callbacks.TensorBoard(log_dir=get_log_path(data_folder, learning_rate)),
        tf.keras.callbacks.EarlyStopping(patience=5, restore_best_weights=True),
        tf.keras.callbacks.ModelCheckpoint(ckpt_path, save_best_only=True),
    ]
    model_clf.fit(
        X_train, y_train,
        epochs=20,
        validation_data=(X_valid, y_valid),
        callbacks=callbacks,
    )

    # evaluate -> [loss, accuracy]; cast to plain Python floats so KFP's JSON
    # serializer for the list output accepts them.
    result = [float(value) for value in model_clf.evaluate(X_test, y_test)]
    return (result,)



## Print the result

-   Prints the result returned by the neural-network training step
    parameter
    -----------
    result: list
         A list containing the test loss and the test accuracy

In [20]:
def print_result(result):
    """Echo the training step's result to stdout (visible in the pod log).

    Parameters
    ----------
    result
        The ``result`` output of the training component (test loss and
        accuracy). The parameter is unannotated, so it presumably arrives in
        its serialized string form.
    """
    output = result
    print(output)

## Building the pipeline
- Here the pipeline is defined: components are created from the functions above, and a shared volume is attached to each component.

    vop is a volume object created with dsl.VolumeOp that can be attached to different components
    
    func_to_container_op converts a Python function into a pipeline component
    
    add_pvolumes attaches the shared volume vop to a component
    
    dsl.ParallelFor runs the training component in parallel with different learning rates; its parallelism argument
    controls the maximum number of components that run concurrently.
    

In [21]:
import kfp.dsl as dsl
@dsl.pipeline(
    name='Mnist pipeline',
    description='A pipeline that trains on mnist dataset'
)
def mnist_pipeline():
    """Wire the MNIST components into one pipeline sharing a single volume.

    Flow: load_data -> split_data -> ParallelFor over learning rates of
    (create_train_model -> print_result). Every task mounts the same PVC at
    ``/information`` so the steps can exchange files.
    """
    log_folder = '/information'
    pvc_name = "mnist-model"
    image = 'tensorflow/tensorflow'

    # One PVC shared by every step.
    # NOTE(review): ReadWriteOnce volumes can be mounted by a single node only;
    # the parallel training pods may fail to schedule if placed on different
    # nodes. Consider dsl.VOLUME_MODE_RWM if the storage class supports it.
    vop = dsl.VolumeOp(
        name=pvc_name,
        resource_name="mnist-model",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    def on_shared_volume(task):
        # Mount the shared PVC at log_folder; add_pvolumes returns the task.
        return task.add_pvolumes({log_folder: vop.volume})

    # Component factories built from the plain Python functions.
    load_data_op = func_to_container_op(load_data, base_image=image)
    split_data_op = func_to_container_op(split_data, base_image=image)
    training_op = func_to_container_op(create_train_model, base_image=image)
    print_result_op = func_to_container_op(print_result)

    load_task = on_shared_volume(load_data_op(log_folder))
    split_data_task = on_shared_volume(split_data_op(load_task.outputs['datadir']))

    learning_rates = [0.001, 0.01, 0.1, 1]
    # parallelism caps how many loop iterations (pods) run at the same time.
    with dsl.ParallelFor(learning_rates, parallelism=10) as learning_rate:
        train_model_task = on_shared_volume(
            training_op(split_data_task.outputs['data_folder'], learning_rate)
        )
        on_shared_volume(print_result_op(train_model_task.outputs['result']))

## Run the pipeline
- Here we compile the pipeline, create an experiment, upload the pipeline, and finally start a run


In [22]:
import uuid
from datetime import datetime

# Compile the pipeline, get or create the experiment, and start a run.
EXPERIMENT_NAME = 'Mnist'
arguments = {}  # run parameters; mnist_pipeline declares none
pipeline_func = mnist_pipeline

# uuid suffix gives each compiled file a unique name (presumably so the
# pipeline name derived from it on upload does not collide).
pipeline_filename = pipeline_func.__name__ + f'{uuid.uuid1()}.pipeline.yaml'
kfp.compiler.Compiler().compile(pipeline_func, pipeline_filename)

client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)
# Timestamped run name (the earlier 'mnist_training' value was dead: it was
# always overwritten here). strftime already returns a str.
run_name = pipeline_func.__name__ + datetime.now().strftime("%d-%m-%Y-%H-%M-%S")
client.upload_pipeline(pipeline_filename)
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)