# Imports

In [1]:
import kfp
import kfp.components as components
import requests
import kfp.dsl as dsl

from typing import NamedTuple

In [2]:
# Container image that every pipeline component defined below is built on.
base_image = "public.ecr.aws/x6u1q5c1/kflow/pytorch:cpu-1.13.1"

# Steps

## Get Data

In [3]:
def get_data():
    """
    Download the dataset archive and upload it to the MinIO bucket.

    Fetches ``intel.zip`` over HTTP into ``/tmp`` and stores it as the
    ``intel.zip`` object in the ``pytorch`` bucket, creating the bucket first
    when it does not exist yet. Returns nothing.
    """
    print("initializing")
    # Imports stay inside the function because it is packaged as a
    # self-contained pipeline component via create_component_from_func.
    from minio import Minio
    import urllib.request

    minio_bucket = "pytorch"
    archive_name = "intel.zip"
    local_archive = "/tmp/intel.zip"

    print("getting data")
    urllib.request.urlretrieve("http://*/intel.zip", local_archive)

    print("uploading data")
    # NOTE(review): endpoint and credentials are hard-coded; consider injecting
    # them through environment variables or a Kubernetes secret.
    minio_client = Minio(
        "minio-service.kubeflow.svc.cluster.local:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    if not minio_client.bucket_exists(minio_bucket):
        print('creating bucket')
        minio_client.make_bucket(minio_bucket)

    minio_client.fput_object(minio_bucket, archive_name, local_archive)


## Preprocess Data

In [4]:
def preprocess_data() -> NamedTuple('Outputs', [('class_labels', str),('total_count', int),('train_count', int),('test_count', int)]):
    """
    Split the raw image dataset into train/test sets and upload both to MinIO.

    Downloads ``intel.zip`` from the ``pytorch`` bucket, extracts it under
    ``/tmp``, performs a stratified train/test split over
    ``/tmp/intel/*/*.jpg`` (the parent directory name is used as the label),
    and uploads the two splits as ``train.zip`` and ``test.zip``.

    Returns:
        An ``Outputs`` named tuple with:
          - class_labels: the sorted distinct labels joined by ``", "``
          - total_count: number of images found
          - train_count: number of images in the train split
          - test_count: number of images in the test split
    """
    print("preprocessing data")

    # Imports stay inside the function because it is packaged as a
    # self-contained pipeline component via create_component_from_func.
    from minio import Minio
    from zipfile import ZipFile
    from pathlib import Path
    import shutil
    from sklearn.model_selection import train_test_split
    from collections import namedtuple

    def write_dataset(image_paths, output_dir):
        """Copy each image into output_dir/<label>/, creating folders as needed."""
        for img_path in image_paths:
            class_dir = output_dir / img_path.parent.stem
            class_dir.mkdir(parents=True, exist_ok=True)
            shutil.copyfile(img_path, class_dir / img_path.name)

    # NOTE(review): endpoint and credentials are hard-coded; consider injecting
    # them through environment variables or a Kubernetes secret.
    minio_client = Minio(
        "minio-service.kubeflow.svc.cluster.local:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "pytorch"

    # load data from minio
    print("fetching data")
    minio_client.fget_object(minio_bucket, "intel.zip", "/tmp/intel.zip")

    print("extracting data")
    with ZipFile("/tmp/intel.zip", 'r') as archive:
        archive.extractall(path="/tmp/")

    dataset_full = list(Path("/tmp/intel").glob("*/*.jpg"))
    labels = [img_path.parent.stem for img_path in dataset_full]

    # Fixed seed so repeated pipeline runs produce the same split.
    d_train, d_test = train_test_split(dataset_full, stratify=labels, random_state=42)

    print("writing data splits")
    write_dataset(d_train, Path("/tmp/dataset/train"))
    write_dataset(d_test, Path("/tmp/dataset/test"))

    print("compressing data splits")
    shutil.make_archive("/tmp/train", 'zip', "/tmp/dataset/train")
    shutil.make_archive("/tmp/test", 'zip', "/tmp/dataset/test")

    # save data to minio
    print("uploading data splits")
    minio_client.fput_object(minio_bucket, "train.zip", "/tmp/train.zip")
    minio_client.fput_object(minio_bucket, "test.zip", "/tmp/test.zip")

    # Return an actual named tuple matching the declared output signature;
    # it is still indexable by position like the list returned previously.
    outputs = namedtuple('Outputs', ['class_labels', 'total_count', 'train_count', 'test_count'])
    return outputs(
        class_labels=', '.join(sorted(set(labels))),
        total_count=len(dataset_full),
        train_count=len(d_train),
        test_count=len(d_test),
    )

## Model Train

In [5]:
from minio import Minio

# Notebook-side MinIO client used to stage the training and conversion code.
# NOTE(review): endpoint and credentials are hard-coded; consider reading them
# from the environment instead.
minio_client = Minio(
    "minio-service.kubeflow.svc.cluster.local:9000",
    access_key="minio",
    secret_key="minio123",
    secure=False
)
minio_bucket = "pytorch"
# make_bucket raises when the bucket already exists, so only create it when it
# is missing; this keeps the cell re-runnable.
if not minio_client.bucket_exists(minio_bucket):
    minio_client.make_bucket(minio_bucket)

In [6]:
# Fetch the training and conversion code archives and stage them in MinIO,
# where the model_training and model_generation steps download them from.
!git clone https://github.com/mmg10/ptkflow
minio_client.fput_object("pytorch","traincode.zip","./ptkflow/traincode.zip")
minio_client.fput_object("pytorch","convertcode.zip","./ptkflow/convertcode.zip")

Cloning into 'ptkflow'...
remote: Enumerating objects: 4, done.[K
remote: Counting objects: 100% (4/4), done.[K
remote: Compressing objects: 100% (4/4), done.[K
remote: Total 4 (delta 0), reused 4 (delta 0), pack-reused 0[K
Unpacking objects: 100% (4/4), 45.38 KiB | 5.04 MiB/s, done.


<minio.helpers.ObjectWriteResult at 0x7fcd27307f40>

In [7]:
def model_training():
    """
    Train the model with the staged training code and upload the result.

    Downloads ``test.zip`` and ``traincode.zip`` from the ``pytorch`` bucket,
    unpacks them under ``/tmp``, runs ``main()`` from the unpacked ``train``
    module, and uploads ``/tmp/model.pt`` as the ``model.pt`` object.
    Returns nothing.
    """
    # Imports stay inside the function because it is packaged as a
    # self-contained pipeline component via create_component_from_func.
    from minio import Minio
    import shutil
    import os
    import sys

    # Set before the training code is imported.
    # presumably redirects torch's model cache to a writable path - TODO confirm
    os.environ['TORCH_HOME'] = '/tmp/models/'

    # NOTE(review): endpoint and credentials are hard-coded; consider injecting
    # them through environment variables or a Kubernetes secret.
    minio_client = Minio(
        "minio-service.kubeflow.svc.cluster.local:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "pytorch"

    print("getting data")
    # NOTE(review): only the test split is fetched here although the
    # preprocessing step also uploads train.zip - confirm this matches what
    # the training code expects.
    minio_client.fget_object(minio_bucket, "test.zip", "/tmp/test.zip")
    shutil.unpack_archive("/tmp/test.zip", "/tmp/test/", "zip")

    print("getting code")
    minio_client.fget_object(minio_bucket, "traincode.zip", "/tmp/traincode.zip")
    shutil.unpack_archive("/tmp/traincode.zip", "/tmp/code/", "zip")

    print("training model")
    # Make the unpacked archive importable so its train module can be loaded.
    sys.path.insert(0, '/tmp/code')
    from train import main
    main()

    print("uploading model")
    minio_client.fput_object(minio_bucket, "model.pt", "/tmp/model.pt")

    print("Saved model to minIO")
    

## Model Generation

In [18]:
def model_generation():
    """
    Package the trained model as a TorchServe MAR archive and upload it.

    This can be used to convert a model from one format to another; here a
    MAR file is created for serving. Downloads ``model.pt`` and
    ``convertcode.zip`` from the ``pytorch`` bucket, runs
    ``torch-model-archiver`` and uploads the resulting ``cifar34.mar`` together
    with ``config.properties`` under the ``cifar34/`` prefix.

    Raises:
        subprocess.CalledProcessError: if ``torch-model-archiver`` exits with
            a non-zero status.
    """
    # Imports stay inside the function because it is packaged as a
    # self-contained pipeline component via create_component_from_func.
    from minio import Minio
    import shutil
    import os
    import subprocess

    os.environ['TORCH_HOME'] = '/tmp/models/'

    # NOTE(review): endpoint and credentials are hard-coded; consider injecting
    # them through environment variables or a Kubernetes secret.
    minio_client = Minio(
        "minio-service.kubeflow.svc.cluster.local:9000",
        access_key="minio",
        secret_key="minio123",
        secure=False
    )
    minio_bucket = "pytorch"

    print("getting model")
    minio_client.fget_object(minio_bucket, "model.pt", "/tmp/model.pt")

    print("getting code")
    minio_client.fget_object(minio_bucket, "convertcode.zip", "/tmp/convertcode.zip")
    shutil.unpack_archive("/tmp/convertcode.zip", "/tmp/code/", "zip")

    print("converting model")
    archiver_cmd = [
        "torch-model-archiver",
        "--model-name", "cifar34",
        "--version", "1.0",
        "--serialized-file", "/tmp/model.pt",
        "--handler", "/tmp/code/ts/torch_handler/cifar34_handler.py",
        "--extra-files", "/tmp/code/index_to_name.json",
        "--export-path", "/tmp/",
    ]
    # check=True makes a failed conversion stop the step here instead of
    # surfacing later as a missing-file error during upload; the archiver's
    # output is left attached to the step's own stdout/stderr.
    subprocess.run(archiver_cmd, check=True)

    print("uploading model")
    minio_client.fput_object(minio_bucket, "cifar34/config/config.properties", "/tmp/code/config.properties")
    minio_client.fput_object(minio_bucket, "cifar34/model-store/cifar34.mar", "/tmp/cifar34.mar")

    print("Saved model to minIO")
    

## Model Serve

In [19]:
def model_serving():
    """
    Create the KServe InferenceService that serves the archived model.

    Builds a ``v1beta1`` InferenceService named ``cifar34`` in the default
    target namespace, with a PyTorch predictor reading its model from
    ``s3://pytorch/cifar34/`` under the ``sa-minio-kserve`` service account,
    and submits it with ``KServeClient.create``. Returns nothing.
    """
    # Imports stay inside the function because it is packaged as a
    # self-contained pipeline component via create_component_from_func.
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1TorchServeSpec

    namespace = utils.get_default_target_namespace()

    name = 'cifar34'
    kserve_version = 'v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            annotations={'sidecar.istio.io/inject': 'false'},
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                service_account_name="sa-minio-kserve",
                # The pytorch predictor takes the TorchServe spec, not the
                # TensorFlow Serving one.
                pytorch=V1beta1TorchServeSpec(
                    storage_uri="s3://pytorch/cifar34/"
                ),
            )
        ),
    )

    kserve_client = KServeClient()
    kserve_client.create(isvc)


# Create Pipeline

## Defining Components

In [20]:
# Wrap get_data as a pipeline component running on the shared base image.
comp_get_data = components.create_component_from_func(get_data, base_image=base_image)

In [21]:
# Wrap preprocess_data as a pipeline component running on the shared base image.
comp_preprocess_data = components.create_component_from_func(preprocess_data, base_image=base_image)

In [22]:
# Wrap model_training as a pipeline component running on the shared base image.
comp_model_training = components.create_component_from_func(model_training, base_image=base_image)

In [23]:
# Wrap model_generation as a pipeline component; the step shells out to
# torch-model-archiver, so that package is installed on top of the base image.
comp_model_generation = components.create_component_from_func(
    model_generation,
    base_image=base_image,
    packages_to_install=['torch-model-archiver==0.7.1'],
)

In [24]:
# Wrap model_serving as a pipeline component running on the shared base image.
comp_model_serving = components.create_component_from_func(model_serving, base_image=base_image)

## Defining Pipeline

In [27]:
@dsl.pipeline(name='intel-pipeline', description='Classify images')
def output_test(no_epochs,optimizer):
    # Runs the five components strictly one after another:
    # get data -> preprocess -> train -> generate archive -> serve.
    # NOTE(review): no_epochs and optimizer are accepted as pipeline
    # parameters but are not passed to any step.
    stages = [
        comp_get_data(),
        comp_preprocess_data(),
        comp_model_training(),
        comp_model_generation(),
        comp_model_serving(),
    ]

    # No step consumes another step's output, so the execution order has to be
    # declared explicitly between each consecutive pair.
    for upstream, downstream in zip(stages, stages[1:]):
        downstream.after(upstream)
    

# Executing Pipeline

In [28]:
if __name__ == "__main__":
    client = kfp.Client()

    arguments = {"no_epochs": 1, "optimizer": "adam"}

    # 1 -> submit a run right away; any other value -> compile the pipeline to
    # YAML and upload it as a new pipeline version instead.
    run_directly = 1

    if run_directly != 1:
        kfp.compiler.Compiler().compile(
            pipeline_func=output_test,
            package_path='output_test.yaml',
        )
        client.upload_pipeline_version(
            pipeline_package_path='output_test.yaml',
            pipeline_version_name="0.4",
            pipeline_name="pipeline test",
            description="just for testing",
        )
    else:
        client.create_run_from_pipeline_func(
            output_test,
            arguments=arguments,
            experiment_name="intel-pt",
        )

# Inference

In [29]:

from kubernetes import client 
from kserve import KServeClient
from kserve import constants
from kserve import utils
from kserve import V1beta1InferenceService
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1PredictorSpec
from kserve import V1beta1SKLearnSpec
import numpy as np
import json
import pandas as pd



In [30]:
# Query the model endpoint of the deployed service to check that it is ready.
!curl http://cifar34.kubeflow-user-example-com.svc.cluster.local/v1/models/cifar34

{"name": "cifar34", "ready": true}

In [35]:
# Download the sample image (153.jpg) that is sent to the model for inference.
!curl -LO https://github.com/mmg10/ptkflow/raw/main/153.jpg

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 11013  100 11013    0     0  30762      0 --:--:-- --:--:-- --:--:-- 30762


In [32]:
# Address of the cifar34 inference service and its v1 ":predict" endpoint.
model_name = 'cifar34'
actual_domain = 'http://cifar34.kubeflow-user-example-com.svc.cluster.local'
service_url = '{}/v1/models/{}:predict'.format(actual_domain, model_name)

In [33]:
import requests
import base64

# Read the sample image and base64-encode it so the raw bytes can be embedded
# in the JSON request body.
with open('153.jpg', 'rb') as image:  # context manager closes the file handle
    image_read = image.read()
image_64_encode = base64.b64encode(image_read)
bytes_array = image_64_encode.decode('utf-8')
request = {
    "instances": [
        {"data": bytes_array}
    ]
}

In [34]:
response = requests.post(service_url, json=request)
# Surface HTTP errors directly instead of failing later while decoding a
# non-JSON error body.
response.raise_for_status()
print(response.json())

{'predictions': [{'mountain': 0.48141807317733765, 'sea': 0.27487021684646606, 'glacier': 0.1335740089416504, 'buildings': 0.06491722911596298, 'street': 0.0399005189538002}]}
