### Global Variables

#### Make sure to change the global variables below to match your environment!

In [27]:
import os

# Component definition used to launch KFServing InferenceServices.
# NOTE(review): this URL tracks the `master` branch, so the component can change
# underneath the pipeline; consider pinning it to a KFP release tag that matches your cluster.
KF_SERVING_COMPONENT = "https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/kfserving/component.yaml"

NAMESPACE = "kubeflow-user-example-com"  # Kubeflow profile namespace for the experiment and the InferenceService
BUCKET_NAME = "pranava-data"  # S3 bucket holding Iris.csv and every intermediate artefact
EXPERIMENT_NAME = "Hello-World"
MODEL_NAME = "iris1"  # also used as the InferenceService name

BASE_IMAGE = "mesosphere/kubeflow:1.2.0-1.1.0-tensorflow-2.4.0"

# The authservice session cookie is a credential: never hard-code it in the notebook,
# because the value ends up in version control and in shared notebook copies.
# Export it before starting the kernel, e.g. `export KF_AUTHSERVICE_COOKIE=...`.
# When it is unset, requests to the deployed model will not be authenticated.
COOKIE = os.environ.get("KF_AUTHSERVICE_COOKIE", "")

# DATASET_URL = "https://www.kaggle.com/uciml/iris/download"

### Importing modules

In [28]:
from typing import NamedTuple

import kfp
import kfp.components as components
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath

### Building Pipeline Components

In [29]:
def data_preprocess(input_bucket: str) -> NamedTuple("Preprocess", [('features', str),('target', str)]):
    """
    Split the raw Iris dataset stored in S3 into feature and label CSVs.

    Downloads ``Iris.csv`` from ``input_bucket`` and drops its ``Id`` column. It then
    writes the remaining feature columns and the ``Species`` column back to the
    same bucket as two separate CSV objects. All imports are local because a KFP
    lightweight component only ships this function's source into the container.

    :param input_bucket: S3 bucket that holds ``Iris.csv`` and receives the outputs
    :return: ``Preprocess(features, target)`` holding the S3 keys of the written CSVs
    """
    import boto3
    import pandas as pd
    from io import StringIO
    from collections import namedtuple

    Preprocess = namedtuple("Preprocess", ['features', 'target'])
    raw_key = "Iris.csv"
    features_key, target_key = "features.csv", "target.csv"

    # The raw file is fetched under the same name into the container's working directory.
    boto3.client('s3').download_file(input_bucket, raw_key, raw_key)

    iris_frame = pd.read_csv(raw_key).drop(columns='Id')
    to_upload = [
        (features_key, iris_frame.drop(columns='Species')),
        (target_key, iris_frame['Species']),
    ]

    for key, table in to_upload:
        # Default to_csv keeps the index column; the training step skips it when reading back.
        text = StringIO()
        table.to_csv(text)
        boto3.resource('s3').Object(input_bucket, key).put(Body=text.getvalue())

    return Preprocess(features_key, target_key)

In [30]:
def train_model(preprocess:dict, input_bucket:str) -> str:
    """
    Train a decision-tree classifier on the preprocessed Iris data and upload it to S3.

    Imports live inside the function because a KFP lightweight component only
    ships this function's source into the container.

    :param preprocess: mapping with ``"features"`` and ``"target"`` keys whose values
        are the S3 keys of the CSVs written by the preprocessing step
    :param input_bucket: S3 bucket holding the preprocessed data; the model is uploaded there too
    :return: S3 key (also the local file name) of the joblib-serialised model
    """
    import boto3
    import pandas as pd
    from sklearn.tree import DecisionTreeClassifier
    import joblib

    # DecisionTreeClassifier permutes features randomly at each split, so without a
    # fixed seed repeated runs on identical data can yield different trees on tied splits.
    random_state = 0
    model_name = "model"

    s3 = boto3.client('s3')

    def get_data(key: str):
        """
        Download one preprocessed CSV from S3 and load it as a DataFrame.

        :param key: ``"features"`` or ``"target"``, looked up in ``preprocess``
        :return: the CSV contents without the first column (the index column that
            the upstream ``to_csv`` call writes by default)
        """
        filename = preprocess[key]
        s3.download_file(input_bucket, filename, filename)
        return pd.read_csv(filename).iloc[:, 1:]

    x = get_data("features")
    y = get_data("target")

    classifier = DecisionTreeClassifier(random_state=random_state)
    classifier.fit(x, y)

    joblib.dump(classifier, model_name)
    s3.upload_file(model_name, input_bucket, model_name)

    return model_name

In [47]:
def predict_model(bucket: str, model_name: str):
    """
    Load the trained model from S3 and print its prediction for three sample flowers.

    :param bucket: S3 bucket the model was uploaded to
    :param model_name: S3 key (also used as the local file name) of the joblib-serialised model
    :return: None
    """
    import boto3
    import joblib

    # Each row must follow the feature column order used in training
    # (Iris.csv without its Id and Species columns). Values are real numbers, matching
    # the numeric training data, rather than strings that need implicit coercion.
    samples = [
        [5.1, 3.5, 1.4, 0.2],
        [6.4, 3.2, 4.5, 1.5],
        [6.7, 3.3, 5.7, 2.5],
    ]

    s3 = boto3.client('s3')
    s3.download_file(bucket, model_name, model_name)

    # NOTE: joblib.load unpickles the file; only load models from a bucket you control.
    model = joblib.load(model_name)

    # One predict call per sample keeps the original one-array-per-line log output.
    for sample in samples:
        print(model.predict([sample]))

In [32]:
def download_file(model_volume_op, model_name: str, bucket: str = "pranava-data") -> str:
    """
    Copy the trained model from S3 onto the mounted model volume for KFServing.

    The volume must be mounted at ``/mnt/e2e``. The model is written to
    ``/mnt/e2e/iris/model.joblib``, and the ``iris`` directory is created if needed.

    :param model_volume_op: unused; kept so existing callers (which pass ``""``) keep working
    :param model_name: S3 key of the model to download
    :param bucket: S3 bucket containing the model; defaults to the bucket this step
        previously hard-coded, so existing pipelines behave the same
    :return: ``"downloaded"`` once the file is on the volume
    :raises RuntimeError: if the volume is not mounted at ``/mnt/e2e``
    """
    import boto3
    import os

    # Paths stay local: a KFP lightweight component only ships this function's source,
    # so module-level constants would not exist inside the container.
    mount_path = "/mnt/e2e"
    model_dir = os.path.join(mount_path, "iris")

    if not os.path.exists(mount_path):
        raise RuntimeError("Volume not Mounted")

    os.makedirs(model_dir, exist_ok=True)
    boto3.client("s3").download_file(
        bucket,
        model_name,
        os.path.join(model_dir, "model.joblib"),
    )
    return "downloaded"

In [33]:
def create_kfserving_task(model_name, model_namespace, tfjob_op, model_volume_op):
    """
    Create a KFServing sklearn InferenceService that serves the model from a PVC.

    The manifest points the predictor at ``pvc://<volume name>/iris``. That is the
    ``iris`` directory at the root of the volume created by ``model_volume_op``.

    :param model_name: name given to the InferenceService
    :param model_namespace: namespace the InferenceService is created in
    :param tfjob_op: not used in the manifest; any ordering against the model
        download must be set by the caller (e.g. with ``.after``)
    :param model_volume_op: volume op whose ``outputs["name"]`` is the PVC holding the model
    :return: the task produced by ``kfservingLauncherOP`` with ``action="create"``
    """
    pvc_name = str(model_volume_op.outputs["name"])

    manifest_lines = [
        "",
        'apiVersion: "serving.kubeflow.org/v1beta1"',
        'kind: "InferenceService"',
        "metadata:",
        "  name: {name}",
        "  namespace: {namespace}",
        "  annotations:",
        '    "sidecar.istio.io/inject": "false"',
        "spec:",
        "  predictor:",
        "    sklearn:",
        '      storageUri: "pvc://{pvc}/iris"',
        "",
    ]
    inference_service = "\n".join(manifest_lines).format(
        name=model_name,
        namespace=model_namespace,
        pvc=pvc_name,
    )

    return kfservingLauncherOP(action="create", inferenceservice_yaml=inference_service)

In [34]:
def kfserve_predict(model_name: str, namespace: str, cookie: str):
    """
    Query the deployed InferenceService with three sample flowers and print the reply.

    The cluster-internal URL
    ``http://<model_name>.<namespace>.svc.cluster.local/v1/models/<model_name>:predict``
    is used, so this only works from inside the cluster.

    :param model_name: InferenceService name (also the model name in the predict path)
    :param namespace: namespace where the InferenceService was created
    :param cookie: value of the ``authservice_session`` cookie used to authenticate
    :raises requests.HTTPError: if the service answers with a 4xx/5xx status
    """
    import requests

    # Seconds to wait for the service before failing the step instead of hanging forever.
    request_timeout = 60

    url = "http://{0}.{1}.svc.cluster.local/v1/models/{0}:predict".format(
        model_name,
        namespace,
    )
    # Numeric instances, in the same column order the model was trained on.
    data = {
        "instances": [
            [5.1, 3.5, 1.4, 0.2],
            [6.4, 3.2, 4.5, 1.5],
            [6.7, 3.3, 5.7, 2.5],
        ]
    }

    res = requests.post(
        url,
        json=data,
        cookies={'authservice_session': cookie},
        timeout=request_timeout,
    )
    print(res.text)
    # Print the body first so it is visible in the step logs, then fail the step on
    # an error status (e.g. an expired cookie) instead of silently reporting success.
    res.raise_for_status()

### Creating components from the above functions

In [35]:
def _lightweight_op(func):
    """Wrap a self-contained Python function as a KFP container op that runs on BASE_IMAGE."""
    return components.func_to_container_op(func, base_image=BASE_IMAGE)


dataPreprocessOP = _lightweight_op(data_preprocess)
modelTrainOP = _lightweight_op(train_model)
predictModelOP = _lightweight_op(predict_model)
downloadOP = _lightweight_op(download_file)

# The KFServing launcher is a prebuilt component fetched from KF_SERVING_COMPONENT.
kfservingLauncherOP = components.load_component_from_url(KF_SERVING_COMPONENT)

kfservePredictionOP = _lightweight_op(kfserve_predict)


### Defining the pipeline

In [46]:
@dsl.pipeline(
    name="Sample Hello world pipeline",
    description="A sample pipeline to demonstrate multi-step model training, evaluation, export using Iris data classification",
)
def sample_pipeline(
    input_bucket: str = BUCKET_NAME,
    model_name: str = MODEL_NAME,
    namespace: str = NAMESPACE
):
    """
    Wire the Iris steps into one pipeline.

    Flow: preprocess -> train -> batch predict, and in parallel train -> copy the
    model onto a fresh volume -> deploy it with KFServing -> query the deployed service.

    :param input_bucket: S3 bucket holding the raw data and all intermediate artefacts
    :param model_name: name of the InferenceService (and model) to deploy
    :param namespace: namespace the InferenceService is created in
    """
    preprocess_task = dataPreprocessOP(input_bucket)
    train_task = modelTrainOP(preprocess_task.outputs, input_bucket)

    # Batch prediction inside the pipeline; no later step depends on it.
    predictModelOP(input_bucket, train_task.output)

    model_volume = dsl.VolumeOp(
        name="model-volume-demo-lscd",
        resource_name="model-volume-demo-mock-lscd",
        size="1Gi",
        modes=dsl.VOLUME_MODE_RWO,
    )

    # Copy the trained model onto the volume, mounted at /mnt/e2e inside the step.
    # "P0D" staleness means a cached result is never reused for this step.
    download_task = downloadOP("", train_task.output)
    download_task.add_pvolumes({"/mnt/e2e": model_volume.volume})
    download_task.execution_options.caching_strategy.max_cache_staleness = "P0D"

    # Deploy only after the model file is on the volume.
    serve_task = create_kfserving_task(model_name, namespace, download_task, model_volume)
    serve_task.execution_options.caching_strategy.max_cache_staleness = "P0D"
    serve_task.after(download_task)

    query_task = kfservePredictionOP(model_name, namespace, COOKIE)
    query_task.after(serve_task)

### Connect to the KFP client

In [37]:
# host=None (the default) targets the in-cluster KFP endpoint, which suits a notebook
# running inside the Kubeflow cluster; pass an explicit host URL when running elsewhere.
client = kfp.Client(host=None)

### Create an experiment

In [38]:
experiment_description = (
    "A sample pipeline to demonstrate multi-step model training, "
    "evaluation, export using Iris data classification"
)

# The experiment lives in the user's profile namespace.
experiment = client.create_experiment(
    name=EXPERIMENT_NAME,
    description=experiment_description,
    namespace=NAMESPACE,
)

### Create a Run

In [39]:
# Pass the namespace explicitly so the run resolves EXPERIMENT_NAME in the same profile
# namespace where the experiment was created, not the client's default namespace.
client.create_run_from_pipeline_func(
    sample_pipeline,
    arguments={},
    run_name="sample-iris-e2e",
    experiment_name=EXPERIMENT_NAME,
    namespace=NAMESPACE,
)

RunPipelineResult(run_id=c6b32dfc-e184-40d2-a719-3203f3954eae)