# Imports

In [1]:
import os
import shutil
import subprocess
from datetime import datetime
from typing import NamedTuple

import kfp
import kfp.components as components
import kfp.dsl as dsl
import kubernetes as k8s
import requests
from minio import Minio
from minio.error import BucketAlreadyOwnedByYou

In [None]:
# MinIO connection settings. Values are read from the environment so that
# credentials need not be committed with the notebook; the fallbacks are the
# values that were previously hardcoded here and should be overridden outside
# of a throwaway test cluster.
MINIO_URL = os.environ.get("MINIO_URL", "minio-service.kubeflow.svc.cluster.local:9000")
MINIO_USER = os.environ.get("MINIO_USER", "minio")
MINIO_PASS = os.environ.get("MINIO_PASS", "minio123")
# MINIO_SECURE=1/true/yes enables TLS; unset or any other value keeps plain HTTP.
MINIO_SECURE = os.environ.get("MINIO_SECURE", "false").strip().lower() in ("1", "true", "yes")

In [2]:
# Container image shared by every pipeline component; override with the
# KFP_BASE_IMAGE environment variable to try a different build.
base_image = os.environ.get("KFP_BASE_IMAGE", "public.ecr.aws/x6u1q5c1/kflow/pytorch:cpu-1.13.1")

# Steps

## Get Data

In [3]:
def get_data(minio_args: dict, bucket_name):
    """
    Download the raw dataset archive and store it in a MinIO bucket.

    The archive is saved locally as /tmp/intel.zip and uploaded to
    ``bucket_name`` under the object name ``intel.zip``. The bucket is created
    first if it does not exist yet.

    Args:
        minio_args: keyword arguments forwarded unchanged to ``minio.Minio``.
            Annotated as ``dict`` so KFP deserialises the JSON pipeline
            argument instead of handing the component a plain string.
        bucket_name: destination bucket.
    """
    print("initializing")
    # Imports live inside the function: create_component_from_func runs only
    # this function's source inside the component container.
    from minio import Minio
    import urllib.request

    local_path = "/tmp/intel.zip"

    print("getting data")
    # NOTE(review): the host in this URL is a placeholder in the original
    # source and must be filled in before the component can run.
    urllib.request.urlretrieve("http://*/intel.zip", local_path)

    print("uploading data")
    minio_client = Minio(**minio_args)
    if not minio_client.bucket_exists(bucket_name):
        print('creating bucket')
        minio_client.make_bucket(bucket_name)

    minio_client.fput_object(bucket_name, "intel.zip", local_path)


## Preprocess Data

In [4]:
def preprocess_data(minio_args: dict, bucket_name) -> NamedTuple('Outputs', [('class_labels', str),('total_count', int),('train_count', int),('test_count', int)]):
    """
    Split the raw dataset into stratified train/test sets and upload them.

    Downloads ``intel.zip`` from ``bucket_name`` and extracts it to /tmp. It then
    collects every ``/tmp/intel/<class>/*.jpg`` image and runs a stratified
    ``train_test_split`` on the class (parent directory) label. The two splits
    are uploaded as ``train.zip`` and ``test.zip``, and each archive holds one
    sub-directory per class.

    Args:
        minio_args: keyword arguments forwarded unchanged to ``minio.Minio``
            (``dict`` so KFP deserialises the JSON pipeline argument).
        bucket_name: bucket holding ``intel.zip`` and receiving the splits.

    Returns:
        Outputs(class_labels, total_count, train_count, test_count), where
        ``class_labels`` is the sorted, comma-separated set of class names.

    Raises:
        ValueError: if the extracted archive contains no
            ``intel/<class>/*.jpg`` images.
    """
    print("preprocessing data")

    from collections import namedtuple
    from pathlib import Path
    from zipfile import ZipFile
    import shutil

    from minio import Minio
    from sklearn.model_selection import train_test_split

    def write_dataset(image_paths, output_dir):
        """Copy images to output_dir/<class>/<file>; the class is the image's parent dir name."""
        for img_path in image_paths:
            class_dir = output_dir / img_path.parent.stem
            class_dir.mkdir(parents=True, exist_ok=True)
            shutil.copyfile(img_path, class_dir / img_path.name)

    minio_client = Minio(**minio_args)
    minio_bucket = bucket_name

    print("fetching data")
    minio_client.fget_object(minio_bucket, "intel.zip", "/tmp/intel.zip")

    print("extracting data")
    with ZipFile("/tmp/intel.zip", 'r') as zip_file:
        zip_file.extractall(path="/tmp/")

    dataset_full = list(Path("/tmp/intel").glob("*/*.jpg"))
    if not dataset_full:
        # Fail with a clear message rather than an opaque sklearn error on [].
        raise ValueError("no images found under /tmp/intel/<class>/*.jpg after extracting intel.zip")
    labels = [x.parent.stem for x in dataset_full]

    # Stratify on the class label so both splits keep the class proportions.
    d_train, d_test = train_test_split(dataset_full, stratify=labels)

    print("writing data splits")
    write_dataset(d_train, Path("/tmp/dataset/train"))
    write_dataset(d_test, Path("/tmp/dataset/test"))

    print("compressing data splits")
    shutil.make_archive("/tmp/train", 'zip', "/tmp/dataset/train")
    shutil.make_archive("/tmp/test", 'zip', "/tmp/dataset/test")

    print("uploading data splits")
    minio_client.fput_object(minio_bucket, "train.zip", "/tmp/train.zip")
    minio_client.fput_object(minio_bucket, "test.zip", "/tmp/test.zip")

    Outputs = namedtuple('Outputs', ['class_labels', 'total_count', 'train_count', 'test_count'])
    return Outputs(', '.join(sorted(set(labels))), len(dataset_full), len(d_train), len(d_test))

## Model Train

In [6]:
# !git clone https://github.com/mmg10/ptkflow
# minio_client.fput_object("pytorch","traincode.zip","./ptkflow/traincode.zip")
# minio_client.fput_object("pytorch","convertcode.zip","./ptkflow/convertcode.zip")

Cloning into 'ptkflow'...
remote: Enumerating objects: 4, done.[K
remote: Counting objects: 100% (4/4), done.[K
remote: Compressing objects: 100% (4/4), done.[K
remote: Total 4 (delta 0), reused 4 (delta 0), pack-reused 0[K
Unpacking objects: 100% (4/4), 45.38 KiB | 5.04 MiB/s, done.


<minio.helpers.ObjectWriteResult at 0x7fcd27307f40>

In [7]:
def model_training(
    no_epochs: int,
    accelerator,
    minio_args: dict,
    bucket_name
    ) -> NamedTuple('Output', [('mlpipeline_ui_metadata', 'UI_metadata'),('mlpipeline_metrics', 'Metrics')]):
    """
    Train the model with the code shipped in ``traincode.zip``.

    Downloads the train/test splits written by ``preprocess_data`` and the
    training code from ``bucket_name``. It calls ``train.main`` exactly once,
    then uploads the model file, checkpoints and TensorBoard logs back to the
    same bucket.

    Args:
        no_epochs: forwarded as ``trainer_args['epochs']`` (``int`` so KFP
            converts the pipeline argument instead of passing a string).
        accelerator: forwarded as ``trainer_args['accelerator']`` (e.g. "cpu").
        minio_args: keyword arguments forwarded unchanged to ``minio.Minio``.
        bucket_name: bucket holding data/code and receiving the outputs.

    Returns:
        Whatever ``train.main`` returns. NOTE(review): the KFP annotation
        expects (UI metadata, metrics); confirm ``main`` returns that pair.
    """
    from minio import Minio
    import shutil
    import os
    import sys

    # Keep any downloaded pretrained weights inside the writable /tmp tree.
    os.environ['TORCH_HOME'] = '/tmp/models/'

    minio_client = Minio(**minio_args)
    minio_bucket = bucket_name

    def upload_directory(bucket_name, local_path, remote_prefix=""):
        """Upload every file under local_path, keeping its relative path under remote_prefix."""
        for root, _dirs, files in os.walk(local_path):
            for file in files:
                local_file_path = os.path.join(root, file)
                remote_object_name = os.path.join(remote_prefix, os.path.relpath(local_file_path, local_path))
                minio_client.fput_object(bucket_name, remote_object_name, local_file_path)

    data_dir = "/tmp/data"
    checkpoint_dir = "/tmp/checkpoints"
    tensorboard_root = "/tmp/tensorboard"

    print("fetching data")
    # preprocess_data uploads the splits as train.zip / test.zip (no object
    # named data.zip exists); unpack them into the same train/ and test/ layout.
    for split in ("train", "test"):
        archive_path = f"/tmp/{split}.zip"
        minio_client.fget_object(minio_bucket, f"{split}.zip", archive_path)
        shutil.unpack_archive(archive_path, os.path.join(data_dir, split), "zip")

    print("fetching code")
    minio_client.fget_object(minio_bucket, "traincode.zip", "/tmp/traincode.zip")
    shutil.unpack_archive("/tmp/traincode.zip", "/tmp/code/", "zip")

    print("training model")
    # Make the unpacked training code importable as top-level modules.
    sys.path.insert(0, '/tmp/code')
    from train import main

    trainer_args = {
        'epochs': no_epochs,
        'profiler': 'pytorch',
        'accelerator': accelerator,
    }
    data_module_args = {"train_num_workers": 2}

    # Single training run with the full argument set.
    train_output = main(
        trainer_args=trainer_args,
        tensorboard_root=tensorboard_root,
        checkpoint_dir=checkpoint_dir,
        # assumes main reads data_path/train and data_path/test — TODO confirm
        data_path="/tmp/data/",
        data_module_args=data_module_args,
    )

    print("uploading model")
    # NOTE(review): assumes train.main writes the serialized model to /tmp/model.pt.
    minio_client.fput_object(minio_bucket, "model.pt", "/tmp/model.pt")

    print("uploading checkpoints")
    upload_directory(minio_bucket, checkpoint_dir, 'checkpoints')

    print("uploading tensorboard logs")
    upload_directory(minio_bucket, tensorboard_root, 'tensorboard')

    print("Saved model to minIO")

    return train_output
    

## Model Generation

In [18]:
def model_generation(minio_args: dict, bucket_name, model_name):
    """
    Package the trained model as a TorchServe MAR archive and upload it.

    Downloads ``model.pt`` and ``convertcode.zip`` from ``bucket_name`` and runs
    ``torch-model-archiver``. The resulting ``<model_name>.mar`` is uploaded to
    ``<model_name>/model-store/`` and ``config.properties`` to
    ``<model_name>/config/`` in the same bucket.

    Args:
        minio_args: keyword arguments forwarded unchanged to ``minio.Minio``
            (``dict`` so KFP deserialises the JSON pipeline argument).
        bucket_name: bucket holding the model/code and receiving the MAR.
        model_name: value for ``--model-name``; also the object-key prefix.

    Raises:
        subprocess.CalledProcessError: if torch-model-archiver exits non-zero.
    """
    from minio import Minio
    import shutil
    import os
    import subprocess

    os.environ['TORCH_HOME'] = '/tmp/models/'

    minio_client = Minio(**minio_args)
    minio_bucket = bucket_name

    print("getting model")
    minio_client.fget_object(minio_bucket, "model.pt", "/tmp/model.pt")

    print("getting code")
    minio_client.fget_object(minio_bucket, "convertcode.zip", "/tmp/convertcode.zip")
    shutil.unpack_archive("/tmp/convertcode.zip", "/tmp/code/", "zip")

    print("converting model")
    # An argument list (instead of str.split) keeps model_name a single token.
    # check=True stops this step on archiver failure rather than failing later
    # with a confusing "file not found" on the missing .mar.
    archiver_cmd = [
        "torch-model-archiver",
        "--model-name", model_name,
        "--version", "1.0",
        "--serialized-file", "/tmp/model.pt",
        "--handler", "/tmp/code/ts/torch_handler/cifar34_handler.py",
        "--extra-files", "/tmp/code/index_to_name.json",
        "--export-path", "/tmp/",
    ]
    subprocess.run(archiver_cmd, check=True)

    print("uploading model")
    minio_client.fput_object(minio_bucket, f"{model_name}/config/config.properties", "/tmp/code/config.properties")
    minio_client.fput_object(minio_bucket, f"{model_name}/model-store/{model_name}.mar", f"/tmp/{model_name}.mar")

    print("Saved model to minIO")
    

## Model Serve

In [19]:
def model_serving(minio_args, bucket_name, model_name):
    """
    Create the KServe InferenceService that serves the packaged model.

    The predictor is a TorchServe (``pytorch``) predictor whose storage URI is
    ``s3://<bucket_name>/<model_name>/``. That is the prefix under which
    ``model_generation`` uploads ``config/config.properties`` and
    ``model-store/<model_name>.mar``.

    Args:
        minio_args: not used here. It is accepted so the signature matches the
            pipeline call ``comp_model_serving(minio_args, bucket_name,
            model_name)``. S3 access presumably comes from the
            ``sa-minio-kserve`` service account — confirm.
        bucket_name: bucket holding the model artefacts.
        model_name: object prefix the MAR and config were uploaded under.
    """
    from kubernetes import client
    from kserve import KServeClient
    from kserve import constants
    from kserve import utils
    from kserve import V1beta1InferenceService
    from kserve import V1beta1InferenceServiceSpec
    from kserve import V1beta1PredictorSpec
    from kserve import V1beta1TorchServeSpec

    namespace = utils.get_default_target_namespace()

    # Presumably this name determines the in-cluster host used by the
    # inference cells (cifar34.<namespace>.svc.cluster.local).
    name = 'cifar34'
    kserve_version = 'v1beta1'
    api_version = constants.KSERVE_GROUP + '/' + kserve_version

    # Interpolate the real bucket; the previous literal "s3://bucket_name/..."
    # pointed at a bucket literally named "bucket_name".
    storage_uri = f"s3://{bucket_name}/{model_name}/"

    isvc = V1beta1InferenceService(
        api_version=api_version,
        kind=constants.KSERVE_KIND,
        metadata=client.V1ObjectMeta(
            name=name,
            namespace=namespace,
            annotations={'sidecar.istio.io/inject': 'false'},
        ),
        spec=V1beta1InferenceServiceSpec(
            predictor=V1beta1PredictorSpec(
                service_account_name="sa-minio-kserve",
                # The ``pytorch`` predictor field takes a TorchServe spec.
                pytorch=V1beta1TorchServeSpec(storage_uri=storage_uri),
            )
        ),
    )

    KServe = KServeClient()
    KServe.create(isvc)


## Create Pipeline

## Defining Components

In [20]:
# Lightweight KFP component wrapping get_data, run in the shared base image.
comp_get_data = components.create_component_from_func(
    get_data,
    base_image=base_image,
)

In [21]:
# Lightweight KFP component wrapping preprocess_data, run in the shared base image.
comp_preprocess_data = components.create_component_from_func(
    preprocess_data,
    base_image=base_image,
)

In [22]:
# Lightweight KFP component wrapping model_training, run in the shared base image.
comp_model_training = components.create_component_from_func(
    model_training,
    base_image=base_image,
)

In [23]:
# MAR packaging needs torch-model-archiver, installed (pinned) at container start.
archiver_packages = ['torch-model-archiver==0.7.1']
comp_model_generation = components.create_component_from_func(
    model_generation,
    base_image=base_image,
    packages_to_install=archiver_packages,
)

In [24]:
# Lightweight KFP component wrapping model_serving.
# NOTE(review): model_serving imports kserve; confirm the base image ships it.
comp_model_serving = components.create_component_from_func(
    model_serving,
    base_image=base_image,
)

## Defining Pipeline

In [27]:
@dsl.pipeline(
    name='pytorch-pipeline',
    description='kubeflow pipeline'
)
def pipeline(no_epochs, accelerator, minio_args, bucket_name, model_name):
    """
    Wire the five components into a linear pipeline.

    Order: fetch data -> preprocess -> train -> MAR generation -> serving.
    Every step talks to MinIO through ``minio_args`` / ``bucket_name``.
    """
    # Memory-backed emptyDir mounted at /dev/shm for the training step.
    # PyTorch DataLoader workers exchange batches through shared memory, and
    # the container default /dev/shm is typically small. This volume presumably
    # counts against the step's memory limit.
    shm_volume = dsl.PipelineVolume(volume=k8s.client.V1Volume(
        name="shm",
        empty_dir=k8s.client.V1EmptyDirVolumeSource(medium='Memory')))

    # get_data needs the MinIO settings and bucket like every other step.
    get_data_task = comp_get_data(minio_args, bucket_name)
    get_data_task.set_display_name("Fetching Data")

    preprocess_task = comp_preprocess_data(minio_args, bucket_name)
    preprocess_task.after(get_data_task).set_display_name("Preprocessing Data")

    training_task = (
        comp_model_training(no_epochs, accelerator, minio_args, bucket_name)
        .after(preprocess_task)
        .add_pvolumes({'/dev/shm': shm_volume})
        .set_display_name("Training")
        .set_memory_request('3000M')
        .set_memory_limit('3200M')
        .set_cpu_request('3000m')
        .set_cpu_limit('4000m')
    )

    generation_task = comp_model_generation(minio_args, bucket_name, model_name)
    generation_task.after(training_task).set_display_name("MAR Generation")

    serving_task = comp_model_serving(minio_args, bucket_name, model_name)
    serving_task.after(generation_task).set_display_name("Model Serving")
    

# Executing Pipeline

In [28]:
if __name__ == "__main__":
    kfp_client = kfp.Client()

    # S3 bucket names may contain only lowercase letters, digits, '.' and '-',
    # so the timestamp must not include spaces or colons.
    formatted_datetime = datetime.now().strftime("%Y-%m-%d-%H-%M")

    repo_url = "https://github.com/exampleuser/example-repo.git"
    repo_dir = "gitrepo"

    bucket_name = "pytorch" + "-" + formatted_datetime
    model_name = "timm_vit"

    # Keys are minio.Minio(...) keyword arguments, because every component
    # builds its client with Minio(**minio_args).
    minio_args = {
        "endpoint": MINIO_URL,
        "access_key": MINIO_USER,
        "secret_key": MINIO_PASS,
        "secure": MINIO_SECURE,
    }

    minio_client = Minio(**minio_args)

    # Check-then-create is idempotent and does not depend on SDK-version-specific
    # exception classes; any other error propagates instead of being swallowed.
    if minio_client.bucket_exists(bucket_name):
        print(f"Bucket '{bucket_name}' already exists.")
    else:
        minio_client.make_bucket(bucket_name)
        print("Bucket created successfully.")

    # Start from a clean checkout: cloning into an existing directory fails,
    # and reusing a stale one would upload outdated code.
    shutil.rmtree(repo_dir, ignore_errors=True)
    subprocess.run(["git", "clone", repo_url, repo_dir], check=True)
    print(f"Repository cloned successfully to {repo_dir}")

    shutil.make_archive("traincode", 'zip', f"{repo_dir}/traincode")
    shutil.make_archive("convertcode", 'zip', f"{repo_dir}/convertcode")

    minio_client.fput_object(bucket_name, "traincode.zip", "./traincode.zip")
    minio_client.fput_object(bucket_name, "convertcode.zip", "./convertcode.zip")

    arguments = {
        "no_epochs": 1,
        "accelerator": "cpu",
        "minio_args": minio_args,
        "bucket_name": bucket_name,
        "model_name": model_name,
    }

    experiment_name = "default"
    # True: submit a run now; False: compile and upload as a pipeline version.
    run_directly = True

    if run_directly:
        kfp_client.create_run_from_pipeline_func(pipeline, arguments=arguments, experiment_name=experiment_name)
    else:
        kfp.compiler.Compiler().compile(pipeline_func=pipeline, package_path='output_test.yaml')
        kfp_client.upload_pipeline_version(pipeline_package_path='output_test.yaml', pipeline_version_name="0.4", pipeline_name="pipeline test", description="just for testing")

# Inference

In [29]:

from kubernetes import client 
from kserve import KServeClient
from kserve import constants
from kserve import utils
from kserve import V1beta1InferenceService
from kserve import V1beta1InferenceServiceSpec
from kserve import V1beta1PredictorSpec
from kserve import V1beta1SKLearnSpec
import numpy as np
import json
import pandas as pd



In [30]:
!curl http://cifar34.kubeflow-user-example-com.svc.cluster.local/v1/models/cifar34

{"name": "cifar34", "ready": true}

In [35]:
!curl -LO https://github.com/mmg10/ptkflow/raw/main/153.jpg

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100 11013  100 11013    0     0  30762      0 --:--:-- --:--:-- --:--:-- 30762


In [32]:
# Cluster-local address of the "cifar34" InferenceService and its v1 predict route.
model_name = 'cifar34'
actual_domain = 'http://cifar34.kubeflow-user-example-com.svc.cluster.local'
service_url = '{}/v1/models/{}:predict'.format(actual_domain, model_name)

In [33]:
import requests
import base64

# Read the sample image and base64-encode it as text. The body uses the
# "instances" list expected by the /v1/models/<name>:predict route, with the
# image under "data".
with open('153.jpg', 'rb') as image_file:  # closed automatically
    image_bytes = image_file.read()
bytes_array = base64.b64encode(image_bytes).decode('utf-8')
request = {
  "instances": [
    {
      "data": bytes_array
    }
  ]
}

In [34]:
# Bound the wait so an unresponsive endpoint cannot hang the kernel, and fail
# loudly on HTTP errors instead of trying to JSON-decode an error page.
response = requests.post(service_url, json=request, timeout=60)
response.raise_for_status()
print(response.json())

{'predictions': [{'mountain': 0.48141807317733765, 'sea': 0.27487021684646606, 'glacier': 0.1335740089416504, 'buildings': 0.06491722911596298, 'street': 0.0399005189538002}]}
