# Kubeflow Pipelines: PyTorch CIFAR-10 Image Classification

This notebook shows an end-to-end PyTorch CIFAR-10 image classification example using Kubeflow Pipelines.

An example notebook that demonstrates how to:

* Get different tasks needed for the pipeline
* Create a Kubeflow pipeline
* Include Pytorch KFP components to preprocess, train, visualize and deploy the model in the pipeline
* Submit a job for execution
* Query the final deployed model (prediction and explanation)


## Import the necessary packages

In [1]:
import kfp
import json
import os
from kfp.onprem import use_k8s_secret
from kfp import components
from kfp.components import load_component_from_file, load_component_from_url
from kfp import dsl
from kfp import compiler
import kubernetes as k8s
import numpy as np
import logging

# Display the installed KFP SDK version; the pipeline below uses the KFP v1
# API (e.g. `dsl.ContainerOp`, `dsl.PipelineVolume`).
kfp.__version__

'1.8.22'

## Update values for the ingress gateway and auth session

In [2]:
# Connection settings for the Kubeflow deployment. Each value can be
# overridden through an environment variable, so the notebook can target
# another cluster without editing code.
INGRESS_GATEWAY = os.environ.get(
    "KF_INGRESS_GATEWAY",
    "http://istio-ingressgateway.istio-system.svc.cluster.local",
)
# The `authservice_session` cookie value is a credential: never hardcode it in
# the notebook (it would be committed and shared with the file). Export it
# before starting Jupyter, e.g. `export KF_AUTH_SESSION=<cookie value>`.
AUTH = os.environ.get("KF_AUTH_SESSION", "")
if not AUTH:
    print("WARNING: KF_AUTH_SESSION is not set; requests to Kubeflow will not be authenticated.")
NAMESPACE = os.environ.get("KF_NAMESPACE", "kubeflow-user-example-com")
COOKIE = "authservice_session=" + AUTH
EXPERIMENT = os.environ.get("KF_EXPERIMENT", "Default")

## Set the log bucket and TensorBoard image

In [3]:
# MinIO endpoint and bucket for pipeline artifacts and TensorBoard logs,
# plus the container image that serves the TensorBoard viewer.
MINIO_ENDPOINT, LOG_BUCKET, TENSORBOARD_IMAGE = (
    "http://minio-service.kubeflow:9000",
    "mlpipeline",
    "public.ecr.aws/pytorch-samples/tboard:latest",
)

## Set the client and create the experiment

In [4]:
# The KFP API is served under /pipeline behind the ingress gateway; the
# authservice session cookie is sent with every request.
client = kfp.Client(
    host=f"{INGRESS_GATEWAY}/pipeline",
    cookies=COOKIE,
)

In [5]:
# `create_experiment` returns the existing experiment when one with this name
# already exists in the namespace, so this cell is safe to re-run. Using its
# return value (rather than `list_experiments(...).experiments[0]`) guarantees
# we get the experiment named EXPERIMENT, not whichever one is listed first,
# and avoids an IndexError/TypeError when the listing is empty.
my_experiment = client.create_experiment(EXPERIMENT, namespace=NAMESPACE)
my_experiment

{'created_at': datetime.datetime(2023, 10, 31, 5, 13, 13, tzinfo=tzlocal()),
 'description': None,
 'id': '4adf1265-22a3-4aba-9f2a-f96185317c65',
 'name': 'Default',
 'resource_references': [{'key': {'id': 'kubeflow-user-example-com',
                                  'type': 'NAMESPACE'},
                          'name': None,
                          'relationship': 'OWNER'}],
 'storage_state': 'STORAGESTATE_AVAILABLE'}

## Set the inference parameters

In [6]:
# Names used by the deploy step and by the inference requests further down.
DEPLOY_NAME = "torchserve"
MODEL_NAME = "cifar10"
# NOTE(review): the InferenceService in this run reports a different domain
# (see the `kubectl get isvc` output below); confirm the cluster's domain.
ISVC_NAME = f"{DEPLOY_NAME}.{NAMESPACE}.example.com"
INPUT_REQUEST = "input.json"

## Generate and load the component YAML files

In [60]:
# Replace every `:task4` suffix with `:task5` in the template mapping
# (presumably a component image tag — confirm). A no-op on re-runs.
# Then remove previously generated output before regenerating it.
# NOTE(review): this removes `yamls`, but the next cells load components from
# `yaml/` — confirm which directory the generator writes to.
! sed -i 's/:task4/:task5/' utils/template_mapping.json
! rm -rf yamls

In [61]:
# Render the component YAML specs from the template mapping; the output lists
# each file processed. The next cell loads them from `yaml/`.
! python utils/generate_templates.py utils/template_mapping.json

Processing deploy_component.yaml
Processing train_component.yaml
Processing tensorboard_component.yaml
Processing minio_component.yaml
Processing prediction_component.yaml
Processing preprocess_component.yaml


In [62]:
# Each pipeline step is a reusable KFP component whose YAML spec was generated
# above. Keep the directory in one place so it is easy to change.
COMPONENT_YAML_DIR = "yaml"


def _load_component(file_name, yaml_dir=COMPONENT_YAML_DIR):
    """Load a KFP component spec from ``<yaml_dir>/<file_name>``."""
    return load_component_from_file(f"{yaml_dir}/{file_name}")


prepare_tensorboard_op = _load_component("tensorboard_component.yaml")
prep_op = _load_component("preprocess_component.yaml")
train_op = _load_component("train_component.yaml")
deploy_op = _load_component("deploy_component.yaml")
pred_op = _load_component("prediction_component.yaml")
minio_op = _load_component("minio_component.yaml")


## Define the pipeline

In [63]:

@dsl.pipeline(
    name="Training Cifar10 pipeline", description="Cifar 10 dataset pipeline"
)
def pytorch_cifar10(  # pylint: disable=too-many-arguments,too-many-locals
    minio_endpoint=MINIO_ENDPOINT,
    log_bucket=LOG_BUCKET,
    log_dir=f"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}",
    mar_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store",
    config_prop_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/config",
    model_uri=f"s3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}",
    tf_image=TENSORBOARD_IMAGE,
    deploy=DEPLOY_NAME,
    isvc_name=ISVC_NAME,
    model=MODEL_NAME,
    namespace=NAMESPACE,
    confusion_matrix_log_dir=f"confusion_matrix/{dsl.RUN_ID_PLACEHOLDER}/",
    checkpoint_dir="checkpoint_dir/cifar10",
    input_req=INPUT_REQUEST,
    cookie=COOKIE,
    ingress_gateway=INGRESS_GATEWAY,
):
    """Define the CIFAR-10 training and deployment pipeline tasks.

    Task order:
      Visualization (TensorBoard viewer) -> Preprocess & Transform -> Training
      -> MinIO pushers (TensorBoard events, checkpoints, .mar archive,
         config.properties) -> Deployer (KServe InferenceService) -> Sleep.

    Args:
        minio_endpoint: S3/MinIO endpoint passed to the TensorBoard viewer.
        log_bucket: Bucket holding TensorBoard events and the confusion matrix.
        log_dir: Folder (under ``log_bucket``) for TensorBoard event files.
        mar_path: MinIO folder receiving ``cifar10_test.mar``.
        config_prop_path: MinIO folder receiving ``config.properties``.
        model_uri: ``storageUri`` of the deployed InferenceService.
        tf_image: Container image of the TensorBoard viewer.
        deploy: Name of the InferenceService to create.
        namespace: Namespace of the InferenceService.
        confusion_matrix_log_dir: Folder (under ``log_bucket``) for the
            confusion matrix written during training.
        checkpoint_dir: MinIO folder receiving the training checkpoints.
        isvc_name, model, input_req, cookie, ingress_gateway: Only consumed by
            the prediction/explanation steps, which are disabled below.

    NOTE(review): ``cookie`` defaults to the session cookie and is presumably
    embedded in the compiled package as a parameter default — do not share
    the package while it holds a live session.
    """
    # RAM-backed /dev/shm for the training container (shared memory for
    # data-loader workers).
    volume_train = dsl.PipelineVolume(volume=k8s.client.V1Volume(
        name="shm",
        empty_dir=k8s.client.V1EmptyDirVolumeSource(medium='Memory')))

    def sleep_op(seconds):
        """Return a ContainerOp that sleeps for ``seconds`` seconds."""
        return dsl.ContainerOp(
            name="Sleep " + str(seconds) + " seconds",
            image="python:alpine3.6",
            command=["sh", "-c"],
            # With `sh -c SCRIPT ARG`, ARG is bound to $0 inside SCRIPT.
            arguments=[
                'python -c "import time; time.sleep($0)"',
                str(seconds)
            ],
        )

    # Pod template for the TensorBoard viewer: MinIO credentials come from
    # the `mlpipeline-minio-artifact` secret; the S3 client settings point it
    # at the plain-HTTP MinIO endpoint.
    pod_template_spec = json.dumps({
        "spec": {
            "containers": [{
                "env": [
                    {
                        "name": "AWS_ACCESS_KEY_ID",
                        "valueFrom": {
                            "secretKeyRef": {
                                "name": "mlpipeline-minio-artifact",
                                "key": "accesskey",
                            }
                        },
                    },
                    {
                        "name": "AWS_SECRET_ACCESS_KEY",
                        "valueFrom": {
                            "secretKeyRef": {
                                "name": "mlpipeline-minio-artifact",
                                "key": "secretkey",
                            }
                        },
                    },
                    {
                        "name": "AWS_REGION",
                        "value": "minio"
                    },
                    {
                        "name": "S3_ENDPOINT",
                        "value": f"{minio_endpoint}",
                    },
                    {
                        "name": "S3_USE_HTTPS",
                        "value": "0"
                    },
                    {
                        "name": "S3_VERIFY_SSL",
                        "value": "0"
                    },
                ]
            }]
        }
    })

    prepare_tb_task = prepare_tensorboard_op(
        log_dir_uri=f"s3://{log_bucket}/{log_dir}",
        image=tf_image,
        pod_template_spec=pod_template_spec,
    ).set_display_name("Visualization")

    prep_task = (
        prep_op().after(prepare_tb_task
                       ).set_display_name("Preprocess & Transform")
    )
    confusion_matrix_url = f"minio://{log_bucket}/{confusion_matrix_log_dir}"
    script_args = "model_name=model.pth," \
                  f"confusion_matrix_url={confusion_matrix_url}"
    # For GPU, set number of devices and strategy type
    ptl_args = "max_epochs=1, profiler=pytorch, accelerator=auto"
    train_task = (
        train_op(
            input_data=prep_task.outputs["output_data"],
            script_args=script_args,
            ptl_arguments=ptl_args
        ).add_pvolumes({'/dev/shm': volume_train})
        .after(prep_task).set_display_name("Training")
        # Resource requests/limits for the training pod; adjust to your cluster.
        .set_memory_request('3000M')
        .set_memory_limit('3200M')
        .set_cpu_request('3000m')
        .set_cpu_limit('4000m')
        # For GPU uncomment below line and set GPU limit and node selector
        # .set_gpu_limit(1).add_node_selector_constraint('cloud.google.com/gke-accelerator','nvidia-tesla-p4')
    )

    # Push TensorBoard events to the same bucket/folder the viewer above
    # reads from (`s3://{log_bucket}/{log_dir}`).
    (
        minio_op(
            bucket_name=log_bucket,
            folder_name=log_dir,
            input_path=train_task.outputs["tensorboard_root"],
            filename="",
        ).after(train_task).set_display_name("Tensorboard Events Pusher")
    )

    # The remaining pushers use "mlpipeline" to match the default `model_uri`
    # (`s3://mlpipeline/mar/...`) served by the InferenceService.
    (
        minio_op(
            bucket_name="mlpipeline",
            folder_name=checkpoint_dir,
            input_path=train_task.outputs["checkpoint_dir"],
            filename="",
        ).after(train_task).set_display_name("checkpoint_dir Pusher")
    )

    minio_mar_upload = (
        minio_op(
            bucket_name="mlpipeline",
            folder_name=mar_path,
            input_path=train_task.outputs["checkpoint_dir"],
            filename="cifar10_test.mar",
        ).after(train_task).set_display_name("Mar Pusher")
    )

    (
        minio_op(
            bucket_name="mlpipeline",
            folder_name=config_prop_path,
            input_path=train_task.outputs["checkpoint_dir"],
            filename="config.properties",
        ).after(train_task).set_display_name("Config Pusher")
    )

    model_uri = str(model_uri)
    # CPU InferenceService: TorchServe (KServe v2 protocol) serving the
    # uploaded model archive, with a RAM-backed /dev/shm volume.
    isvc_yaml = """
    apiVersion: "serving.kserve.io/v1beta1"
    kind: "InferenceService"
    metadata:
      name: {}
      namespace: {}
    spec:
      predictor:
        volumes:
        - name: dshm
          emptyDir:
            medium: Memory
            sizeLimit: "1024Mi"
        serviceAccountName: sa
        pytorch:
          protocolVersion: v2
          volumeMounts:
          - mountPath: /dev/shm
            name: dshm
            readOnly: false
          storageUri: {}
          resources:
            requests: 
              cpu: 4
              memory: 8Gi
            limits:
              cpu: 4
              memory: 8Gi
    """.format(deploy, namespace, model_uri)

    # For GPU inference pass `isvc_gpu_yaml` to the deployer instead.
    # `nodeSelector` is a pod-level field, so it belongs under `predictor`,
    # not under the `pytorch` container.
    gpu_count = "1"
    accelerator = "nvidia-tesla-p4"
    isvc_gpu_yaml = """
    apiVersion: "serving.kserve.io/v1beta1"
    kind: "InferenceService"
    metadata:
      name: {}
      namespace: {}
    spec:
      predictor:
        serviceAccountName: sa
        nodeSelector:
          cloud.google.com/gke-accelerator: {}
        pytorch:
          protocolVersion: v2
          storageUri: {}
          resources:
            requests:
              cpu: 16
              memory: 24Gi
            limits:
              cpu: 16
              memory: 24Gi
              nvidia.com/gpu: {}
    """.format(deploy, namespace, accelerator, model_uri, gpu_count)  # pylint: disable=unused-variable

    # Deploy only after the model archive has been uploaded.
    deploy_task = (
        deploy_op(action="apply", inferenceservice_yaml=isvc_yaml
                  ).after(minio_mar_upload).set_display_name("Deployer"
                )
    )
    # Wait here for model to be loaded in torchserve for inference
    sleep_task = sleep_op(60).after(deploy_task).set_display_name("Sleep")  # pylint: disable=unused-variable
    # Prediction/explanation steps are disabled; uncomment to query the
    # deployed model from inside the pipeline.
    # pred_task = (
    #     pred_op(
    #         host_name=isvc_name,
    #         input_request=input_req,
    #         cookie=cookie,
    #         url=ingress_gateway,
    #         model=model,
    #         inference_type="infer",
    #     ).after(sleep_task).set_display_name("Prediction")
    # )
    # (
    #     pred_op(
    #         host_name=isvc_name,
    #         input_request=input_req,
    #         cookie=cookie,
    #         url=ingress_gateway,
    #         model=model,
    #         inference_type="explain",
    #     ).after(pred_task).set_display_name("Explanation")
    # )

    # Expose the MinIO credentials from the `mlpipeline-minio-artifact` secret
    # as MINIO_SECRET_KEY / MINIO_ACCESS_KEY env vars on every op.
    dsl.get_pipeline_conf().add_op_transformer(
        use_k8s_secret(
            secret_name="mlpipeline-minio-artifact",
            k8s_secret_key_to_env={
                "secretkey": "MINIO_SECRET_KEY",
                "accesskey": "MINIO_ACCESS_KEY",
            },
        )
    )


## Compile the pipeline

In [64]:
# Compile the pipeline function into a package that the client can submit.
compiler.Compiler().compile(
    pipeline_func=pytorch_cifar10,
    package_path='pytorch.tar.gz',
    type_check=True,
)

## Execute the pipeline

In [106]:
# Submit a run of the compiled package under the experiment obtained above.
run = client.run_pipeline(
    experiment_id=my_experiment.id,
    job_name='pytorch-cifar10',
    pipeline_package_path='pytorch.tar.gz',
)

## Wait for the inference service below to reach the READY=True state

In [66]:
!kubectl get isvc $DEPLOY

NAME         URL                                                    READY   PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION                  AGE
torchserve   http://torchserve.kubeflow-user-example-com.emlo.mmg   True           100                              torchserve-predictor-default-00001   37s


## Get the inference service name

In [67]:
# Read the InferenceService's `status.url` and keep only the host part:
# `cut -d "/" -f 3` takes the third "/"-separated field of `http://<host>`.
# `!` captures the shell output as a list of lines.
# NOTE(review): if kubectl fails, line 0 is an error message, not a host name.
INFERENCE_SERVICE_LIST = ! kubectl get isvc {DEPLOY_NAME} -n {NAMESPACE} -o json | python3 -c "import sys, json; print(json.load(sys.stdin)['status']['url'])"| tr -d '"' | cut -d "/" -f 3
INFERENCE_SERVICE_NAME = INFERENCE_SERVICE_LIST[0]
INFERENCE_SERVICE_NAME

'torchserve.kubeflow-user-example-com.emlo.mmg'

## See the deployed models

In [68]:
!curl \
    -H "Host: torchserve.kubeflow-user-example-com.emlo.mmg" \
    -H "Cookie: $COOKIE" \
    "http://istio-ingressgateway.istio-system.svc.cluster.local/v2/models"

curl: /opt/conda/lib/libcurl.so.4: no version information available (required by curl)
{"models": ["cifar10"]}

## Convert a sample image into a JSON request and send it to the deployed model for prediction

In [98]:
# Convert the sample image into a request payload.
# NOTE(review): the later cells send `img/kitten.json`, presumably written by
# this script — confirm.
!python utils/tobytes.py img/kitten.png

In [99]:
# Show the gateway URL that the requests below are sent to.
print(INGRESS_GATEWAY)

http://istio-ingressgateway.istio-system.svc.cluster.local


In [100]:
!curl  \
    -H "Host: $INFERENCE_SERVICE_NAME" \
    -H "Cookie: $COOKIE" \
    "$INGRESS_GATEWAY/v2/models"

curl: /opt/conda/lib/libcurl.so.4: no version information available (required by curl)
{"models": ["cifar10"]}

In [101]:
!curl  \
    -H "Host: $INFERENCE_SERVICE_NAME" \
    -H "Cookie: $COOKIE" \
    "http://istio-ingressgateway.istio-system.svc.cluster.local/v2/models"


curl: /opt/conda/lib/libcurl.so.4: no version information available (required by curl)
{"models": ["cifar10"]}

In [102]:
!curl \
    -H "Host: torchserve.kubeflow-user-example-com.emlo.mmg" \
    -H "Cookie: $COOKIE" \
    "http://istio-ingressgateway.istio-system.svc.cluster.local/v2/models/cifar10/infer" \
    -d @./input.json

curl: /opt/conda/lib/libcurl.so.4: no version information available (required by curl)
{"id": "ec0ca1b5-153d-4e10-b46f-02ce0167faa2", "model_name": "cifar10_test", "model_version": "1", "outputs": [{"name": "predict", "shape": [], "datatype": "BYTES", "data": [{"plane": 0.276592880487442, "truck": 0.25541818141937256, "car": 0.12587128579616547, "bird": 0.08175255358219147, "horse": 0.07523645460605621}]}]}

In [103]:
!curl \
    -H "Host: torchserve.kubeflow-user-example-com.emlo.mmg" \
    -H "Cookie: $COOKIE" \
    "http://istio-ingressgateway.istio-system.svc.cluster.local/v2/models/cifar10/infer" \
    -d @./img/kitten.json

curl: /opt/conda/lib/libcurl.so.4: no version information available (required by curl)
{"id": "293200f1-038e-49b8-88bf-62165f7ed0b3", "model_name": "cifar10_test", "model_version": "1", "outputs": [{"name": "predict", "shape": [], "datatype": "BYTES", "data": [{"plane": 0.276592880487442, "truck": 0.25541818141937256, "car": 0.12587128579616547, "bird": 0.08175255358219147, "horse": 0.07523645460605621}]}]}

In [105]:
import requests

# Same KServe v2 inference request as the curl cells, sent from Python. The
# Host header routes the call through the ingress gateway to the
# InferenceService; the Cookie header carries the auth session.
URL = f"{INGRESS_GATEWAY}/v2/models/{MODEL_NAME}/infer"
headers = {
    'Host': INFERENCE_SERVICE_NAME,
    'Cookie': COOKIE,
    # The body is a JSON document, not form-encoded data.
    'Content-Type': 'application/json',
}

# Strip CR/LF from the payload, as `curl -d @file` does.
with open('./img/kitten.json') as f:
    data = f.read().replace('\n', '').replace('\r', '').encode()

# A timeout keeps the cell from hanging if the gateway never answers.
response = requests.post(URL, headers=headers, data=data, timeout=60)
print(response.text)
# Surface HTTP errors instead of silently printing an error body.
response.raise_for_status()

{"id": "b75d69fb-8df3-4c67-b575-2868d83fc2de", "model_name": "cifar10_test", "model_version": "1", "outputs": [{"name": "predict", "shape": [], "datatype": "BYTES", "data": [{"plane": 0.276592880487442, "truck": 0.25541818141937256, "car": 0.12587128579616547, "bird": 0.08175255358219147, "horse": 0.07523645460605621}]}]}


In [84]:
import requests

# Inference request built from the notebook variables. INGRESS_GATEWAY already
# includes the scheme, so it must not be prefixed with another "http://".
URL = f"{INGRESS_GATEWAY}/v2/models/{MODEL_NAME}/infer"

# Strip CR/LF from the payload, as `curl -d @file` does.
with open('./img/kitten.json') as f:
    payload = f.read().replace('\n', '').replace('\r', '').encode()

headers = {
    'Host': INFERENCE_SERVICE_NAME,
    'Cookie': COOKIE,
    'Content-Type': 'application/json'
}

# A timeout keeps the cell from hanging if the gateway never answers.
response = requests.request("POST", URL, headers=headers, data=payload, timeout=60)

print(response.text)
# Surface HTTP errors instead of silently printing an error body.
response.raise_for_status()


{"id": "de07c894-c8ac-4a43-b313-6a8e177488b8", "model_name": "cifar10_test", "model_version": "1", "outputs": [{"name": "predict", "shape": [], "datatype": "BYTES", "data": [{"plane": 0.276592880487442, "truck": 0.25541818141937256, "car": 0.12587128579616547, "bird": 0.08175255358219147, "horse": 0.07523645460605621}]}]}
