# Install relevant libraries


1.   Update pip
2.   Install the Kubeflow Pipelines SDK (`kfp`)



In [3]:
!pip install --user --upgrade pip



In [4]:
!pip install kfp --upgrade --user --quiet

In [10]:
pwd

'/home/jovyan/KfaaS/kale'

Change to the home directory (or any other working directory of your choice).

In [12]:
cd '/home/jovyan'

/home/jovyan


Create a folder to store the outputs of the local test runs.

In [14]:
mkdir 'datastore'

In [15]:
# You may need to restart your notebook kernel after updating the kfp sdk
! pip show kfp

Name: kfp
Version: 1.8.11
Summary: KubeFlow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: 
License: UNKNOWN
Location: /home/jovyan/.local/lib/python3.6/site-packages
Requires: absl-py, click, cloudpickle, dataclasses, Deprecated, docstring-parser, fire, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, pydantic, PyYAML, requests-toolbelt, strip-hints, tabulate, typer, typing-extensions, uritemplate
Required-by: kubeflow-kale


## Import kubeflow pipeline libraries

In [16]:
import kfp
from kfp import dsl
import kfp.components as comp

In [17]:
# Directory used to store the outputs of the local (non-pipeline) test runs below.
output_dir = "/home/jovyan/datastore"

## Kubeflow pipeline component creation

Step 1: download and unzip the data

In [18]:
# download data step

def download_data(data_path):
    """Download the train/test CSV archives and unpack them into ``data_path``.

    The two zip archives are fetched from the KfaaS GitHub repository, stored
    in ``data_path`` as ``train_csv.zip`` and ``test_csv.zip``, and extracted
    into that same directory.

    Every import lives inside the function body so the function stays
    self-contained when it is converted into a lightweight pipeline component.
    Only the standard library is used, so nothing has to be pip-installed at
    run time, and each download is written to exactly the archive path that is
    opened right afterwards.

    Args:
        data_path: Directory that receives the archives and their extracted
            contents. It is created when it does not exist yet.

    Returns:
        None. ``Done!`` is printed after both archives have been extracted.
    """
    import os
    import urllib.request
    import zipfile

    # (local archive name, download URL) for each file that is fetched.
    archives = [
        ('train_csv.zip',
         'https://github.com/josepholaide/KfaaS/blob/main/kale/data/train.csv.zip?raw=true'),
        ('test_csv.zip',
         'https://github.com/josepholaide/KfaaS/blob/main/kale/data/test.csv.zip?raw=true'),
    ]

    os.makedirs(data_path, exist_ok=True)

    for archive_name, url in archives:
        archive_path = f'{data_path}/{archive_name}'
        # download the archive
        urllib.request.urlretrieve(url, archive_path)
        # unpack it next to the archive
        with zipfile.ZipFile(archive_path, 'r') as zip_ref:
            zip_ref.extractall(data_path)

    print('Done!')

In [19]:
download_data(output_dir)

Done!


Step 2: load the data, preprocess it and split it into train and test sets

In [20]:
# load data

def load_data(data_path,train_data,test_data):
    """Preprocess ``train.csv`` and pickle a train split and a held-out split.

    Reads ``{data_path}/train.csv``, separates the ``label`` column from the
    remaining columns, reshapes the remaining values to ``(-1, 28, 28, 1)``,
    divides them by 255.0 and splits features and labels 90/10 with
    ``random_state=42``. Each split is pickled as an ``(X, y)`` tuple.

    Every import lives inside the function body so the function stays
    self-contained when it is converted into a lightweight pipeline component.

    Args:
        data_path: Directory that contains ``train.csv`` and receives the
            pickle files.
        train_data: File name (inside ``data_path``) for the pickled
            ``(X_train, y_train)`` tuple.
        test_data: File name (inside ``data_path``) for the pickled
            ``(X_test, y_test)`` tuple.

    Returns:
        None. ``Done!`` is printed after both pickle files are written.

    Raises:
        subprocess.CalledProcessError: If a required package is missing and
            installing it with pip fails.
    """
    import importlib
    import pickle
    import subprocess
    import sys

    def _ensure_installed(module_name, package_name):
        # Install `package_name` with the running interpreter's pip, but only
        # when `module_name` cannot be imported yet.
        try:
            importlib.import_module(module_name)
            return
        except ImportError:
            pass
        # Best-effort pip upgrade; a failure here must not stop the install.
        subprocess.run([sys.executable, '-m', 'pip', 'install', '--upgrade', 'pip'])
        subprocess.run([sys.executable, '-m', 'pip', 'install', package_name], check=True)
        # Make the freshly installed package visible to this process.
        importlib.invalidate_caches()

    _ensure_installed('pandas', 'pandas')
    _ensure_installed('sklearn', 'scikit-learn')

    import pandas as pd
    from sklearn.model_selection import train_test_split

    # Loading dataset into pandas
    train_df = pd.read_csv(data_path + '/train.csv')

    # split features and label
    X = train_df.drop('label', axis=1)
    y = train_df.label

    # Reshape image in 3 dimensions (height = 28px, width = 28px , channel = 1)
    X = X.values.reshape(-1,28,28,1)

    # Normalize the data
    X = X / 255.0

    # split into train and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

    # Save the train split as a pickle file to be used by the train component.
    with open(f'{data_path}/{train_data}', 'wb') as f:
        pickle.dump((X_train,  y_train), f)

    # Save the test split as a pickle file to be used by the predict component.
    with open(f'{data_path}/{test_data}', 'wb') as f:
        pickle.dump((X_test,  y_test), f)

    print('Done!')

In [21]:
load_data(output_dir,'train_data','test_data')

Done!


Step 3: train the model

In [24]:
def train(data_path,train_data,model_path):
    """Train the digit classifier on the pickled train split and save it.

    Loads the ``(X_train, y_train)`` tuple from ``{data_path}/{train_data}``,
    builds a Keras ``Sequential`` model (three ``Conv2D`` layers, each followed
    by ``Dropout``, then ``Flatten`` and a 10-unit softmax ``Dense`` layer),
    fits it for 2 epochs with a batch size of 64 and saves it to
    ``{data_path}/{model_path}``.

    Every import lives inside the function body so the function stays
    self-contained when it is converted into a lightweight pipeline component.

    Args:
        data_path: Directory that contains the pickled train split and
            receives the saved model.
        train_data: File name (inside ``data_path``) of the pickled
            ``(X_train, y_train)`` tuple.
        model_path: File or directory name (inside ``data_path``) the model is
            saved under.

    Returns:
        None.

    Raises:
        subprocess.CalledProcessError: If a required package is missing and
            installing it with pip fails.
    """
    import importlib
    import pickle
    import subprocess
    import sys

    def _ensure_installed(module_name, package_name):
        # Install `package_name` with the running interpreter's pip, but only
        # when `module_name` cannot be imported yet.
        try:
            importlib.import_module(module_name)
            return
        except ImportError:
            pass
        # Best-effort pip upgrade; a failure here must not stop the install.
        subprocess.run([sys.executable, '-m', 'pip', 'install', '--upgrade', 'pip'])
        subprocess.run([sys.executable, '-m', 'pip', 'install', package_name], check=True)
        # Make the freshly installed package visible to this process.
        importlib.invalidate_caches()

    # pandas is never called directly here, but the labels pickled by
    # load_data are a pandas Series, so pandas must be importable to unpickle.
    _ensure_installed('pandas', 'pandas')
    _ensure_installed('tensorflow', 'tensorflow')

    from tensorflow.keras import layers
    from tensorflow.keras.models import Sequential

    # loading the train data
    # NOTE: pickle.load can execute arbitrary code; only load files written by
    # the load step of this pipeline.
    with open(f'{data_path}/{train_data}', 'rb') as f:
        X_train, y_train = pickle.load(f)

    # initializing the classifier model with its input, hidden and output layers
    model = Sequential()
    model.add(layers.Conv2D(filters = 56, kernel_size = (5,5), activation ='relu'))
    model.add(layers.Dropout(0.25))
    model.add(layers.Conv2D(filters = 100, kernel_size = (3,3), activation ='relu'))
    model.add(layers.Dropout(0.3))
    model.add(layers.Conv2D(filters = 100, kernel_size = (3,3), activation ='relu'))
    model.add(layers.Dropout(0.3))
    model.add(layers.Flatten())
    model.add(layers.Dense(10, activation = "softmax"))

    # Compiling the classifier model with the Adam optimizer
    model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

    # model fitting
    model.fit(X_train, y_train,
              epochs=2,
              batch_size=64)

    # saving the model
    model.save(f'{data_path}/{model_path}')

In [25]:
train(output_dir,"train_data","model")

Epoch 1/2
Epoch 2/2
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
Instructions for updating:
This property should not be used in TensorFlow 2.0, as updates are applied automatically.
INFO:tensorflow:Assets written to: /home/jovyan/datastore/model/assets


Step 4: evaluate the model and save its predictions

In [26]:
def predict(data_path,test_data,model):
    """Evaluate the saved model on the held-out split and write the results.

    Loads the ``(X_test, y_test)`` tuple from ``{data_path}/{test_data}`` and
    the Keras model from ``{data_path}/{model}``, evaluates the model, and
    writes two text files into ``data_path``:

    * ``performance.txt`` - test loss and test accuracy
    * ``results.txt`` - predicted class per sample next to the actual labels

    Every import lives inside the function body so the function stays
    self-contained when it is converted into a lightweight pipeline component.

    Args:
        data_path: Directory that contains the pickled test split and the
            saved model, and receives the two result files.
        test_data: File name (inside ``data_path``) of the pickled
            ``(X_test, y_test)`` tuple.
        model: File or directory name (inside ``data_path``) of the saved
            model.

    Returns:
        None. ``Done!`` is printed after both result files are written.

    Raises:
        subprocess.CalledProcessError: If a required package is missing and
            installing it with pip fails.
    """
    import importlib
    import pickle
    import subprocess
    import sys

    def _ensure_installed(module_name, package_name):
        # Install `package_name` with the running interpreter's pip, but only
        # when `module_name` cannot be imported yet.
        try:
            importlib.import_module(module_name)
            return
        except ImportError:
            pass
        # Best-effort pip upgrade; a failure here must not stop the install.
        subprocess.run([sys.executable, '-m', 'pip', 'install', '--upgrade', 'pip'])
        subprocess.run([sys.executable, '-m', 'pip', 'install', package_name], check=True)
        # Make the freshly installed package visible to this process.
        importlib.invalidate_caches()

    # pandas is never called directly here, but the labels pickled by
    # load_data are a pandas Series, so pandas must be importable to unpickle.
    _ensure_installed('pandas', 'pandas')
    _ensure_installed('tensorflow', 'tensorflow')

    import numpy as np
    from tensorflow.keras.models import load_model

    # loading the X_test and y_test
    # NOTE: pickle.load can execute arbitrary code; only load files written by
    # the load step of this pipeline.
    with open(f'{data_path}/{test_data}', 'rb') as f:
        X_test, y_test = pickle.load(f)

    # loading the model
    classifier = load_model(f'{data_path}/{model}')

    # Evaluate the model
    test_loss, test_acc = classifier.evaluate(X_test,  y_test, verbose=0)

    # model's prediction on test data
    y_pred = classifier.predict(X_test)

    # convert predictions to positional indices with max values
    y_pred_class = np.argmax(y_pred,axis = 1)

    # saving the test_loss and test_acc
    with open(f'{data_path}/performance.txt', 'w') as f:
        f.write("Test_loss: {}, Test_accuracy: {} ".format(test_loss,test_acc))

    # saving the predictions
    # str() of a long NumPy array or pandas Series is abbreviated with "...",
    # so both are converted to plain lists to keep every value in the file.
    predicted_labels = y_pred_class.tolist()
    actual_labels = np.asarray(y_test).tolist()
    with open(f'{data_path}/results.txt', 'w') as result:
        result.write(" Prediction: {}, Actual: {} ".format(predicted_labels,actual_labels))

    print('Done!')

In [27]:
predict(output_dir,"test_data","model")

Done!


Create lightweight Kubeflow pipeline components from the functions above; each component runs in the given base container image.

In [28]:
# Wrap each step function in a lightweight component. The base image is the
# container the step runs in: a plain Python image for the data steps, a
# TensorFlow image for training and prediction.
_python_image = "python:3.7.1"
_tensorflow_image = "tensorflow/tensorflow:latest"

download_op = comp.create_component_from_func(func=download_data, base_image=_python_image)
load_op = comp.create_component_from_func(func=load_data, base_image=_python_image)
train_op = comp.create_component_from_func(func=train, base_image=_tensorflow_image)
predict_op = comp.create_component_from_func(func=predict, base_image=_tensorflow_image)

## Kubeflow pipeline creation

In [29]:
# create client that would enable communication with the Pipelines API server 
client = kfp.Client()

In [30]:
# define pipeline
@dsl.pipeline(
    name="digit recognizer Pipeline",
    description="Performs Preprocessing, training and prediction of digits",
)
def digit_recognize_pipeline(data_path: str,
                             train_data: str,
                             test_data: str,
                             model_path: str):
    """Chain the download, load, train, predict and print steps on one volume.

    Each step mounts the volume at ``data_path`` and takes it from the
    ``pvolume`` of the step before it, which is what orders the steps.

    Args:
        data_path: Mount point of the shared volume inside every step.
        train_data: File name of the pickled train split.
        test_data: File name of the pickled test split.
        model_path: Name the trained model is saved under.
    """
    # Volume used to share data between the steps.
    shared_volume = dsl.VolumeOp(
        name="data_volume",
        resource_name="data-volume",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # Step 1: download the data onto the shared volume.
    download_task = download_op(data_path).add_pvolumes(
        {data_path: shared_volume.volume})

    # Step 2: load, preprocess and split the data.
    load_task = load_op(data_path, train_data, test_data).add_pvolumes(
        {data_path: download_task.pvolume})

    # Step 3: train the model.
    train_task = train_op(data_path, train_data, model_path).add_pvolumes(
        {data_path: load_task.pvolume})

    # Step 4: evaluate the model and write the predictions.
    predict_task = predict_op(data_path, test_data, model_path).add_pvolumes(
        {data_path: train_task.pvolume})

    # Step 5: print the result of the prediction.
    print_task = dsl.ContainerOp(
        name="print_prediction",
        image='library/bash:4.4.23',
        pvolumes={data_path: predict_task.pvolume},
        arguments=['cat', f'{data_path}/results.txt'],
    )

In [31]:
# Values passed to the pipeline parameters for this run.
DATA_PATH = "/mnt"
TRAIN_DATA = "train_data"
TEST_DATA = "test_data"
MODEL_FILE = "model.h5"

pipeline_func = digit_recognize_pipeline

experiment_name = 'digit_recognize_lightweight'
run_name = f'{pipeline_func.__name__} run'

arguments = dict(
    data_path=DATA_PATH,
    train_data=TRAIN_DATA,
    test_data=TEST_DATA,
    model_path=MODEL_FILE,
)

# Compile the pipeline into a compressed definition named after the experiment.
kfp.compiler.Compiler().compile(pipeline_func, f'{experiment_name}.zip')

# Submit a run directly from the pipeline function.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)


