<img src="./img/hpe_logo.png" alt="HPE Logo" width="300">

<h1>Request Prediction from KServe InferenceService</h1>

<h5>Date: 07/26/23</h5>
<h5>Version: 1.0</h5>
<h5>Author(s): andrew.mendez@hpe.com</h5>



<img src="./img/platform_step0.png" alt="Enterprise Machine Learning platform architecture" width="850">

<h3>Import modules and define functions</h3>
The cell below imports all modules and libraries required to run the demo.

In [1]:
# !pip install -q ipywidgets

In [2]:
# imports
import sys
import glob
import base64
import json
import requests
import matplotlib.pyplot as plt

from skimage import io
from PIL import Image, ImageDraw
from ipywidgets import interact, interactive
import ipywidgets as widgets
import io

from tqdm import tqdm
from multiprocessing import Pool

<h3>Step 1: Setting up connection details to KServe and define image directory</h3>

In [3]:
# Set direcotry ./img/pred for images and get files with .jpg extention
images = r"./../e2e_blogposts/ngc_blog/xview_dataset/train_images_rgb_no_neg_filt_32/train_images_640_02_filt_32/*.jpg"
endpoint_name='detection-deploy'
model_name='sat-detection'
ingress_host = "{}.models.mlds-kserve.us.rdlabs.hpecorp.net".format(endpoint_name)
ingress_port = "80"
service_hostname = "{}.models.mlds-kserve.us.rdlabs.hpecorp.net".format(endpoint_name)
print(ingress_host)

detection-deploy.models.mlds-kserve.us.rdlabs.hpecorp.net


In [4]:
import torchvision
import torch
def run_nms(pred_d):
    cat2id = {'Fixed-wing Aircraft': 1, 'Cargo Plane':2}
    id2cat = {v:k for k,v in cat2id.items()}
    bboxes = []
    scores = []
    classes = []
    for pred in pred_d:
        cl_id = list(pred.keys())[0]
        classes.append(cat2id[cl_id])
        scores.append(pred['score'])
        bboxes.append(pred[cl_id])
    classes = torch.LongTensor(classes)
    bboxes = torch.FloatTensor(bboxes)
    scores = torch.FloatTensor(scores)
    # print(classes)
    idxs = torchvision.ops.batched_nms(bboxes, scores, classes, iou_threshold=0.2)
    # print(idxs)
    final_classes = classes[idxs].tolist()
    final_bboxes = bboxes[idxs].tolist()
    final_scores = scores[idxs].tolist()
    # print(final_classes)

    final_d = []
    for cl,bbox,s in zip(final_classes,final_bboxes,final_scores):

        #{'Cargo Plane': [226.26126098632812, 554.0189208984375, 307.98333740234375, 623.9032592773438], 'score': 0.30342113971710205}
        # print(id2cat[cl])
        final_d.append({id2cat[cl]:bbox, 'score':s })
        # break
    # print(final_d)
    return final_d
def plot_pred(im,pred_d,thres=0.15):
    '''
    '''
    draw = ImageDraw.Draw(im)
    try:
        for pred in pred_d['predictions'][0]:
            assert len(list(pred.keys())) == 2
            cl_name = list(pred.keys())[0]
            bboxes = pred[cl_name]
            if pred['score'] > thres:
                draw.rectangle([bboxes[0],bboxes[1],bboxes[2],bboxes[3]],outline=(255,0,0),fill=None,width=1)
                draw.text([bboxes[0],bboxes[1]-10],"{} :{:.2f}".format(cl_name,pred['score']),fill=(250,0,0))
        plt.figure(figsize=(8,8))
    except Exception as e:
        print(e)
        pass
    plt.imshow(im)
    plt.show()
    return im

def predict(args):
    '''
    Function to base64encode image and send to API
    '''
    image= Image.open(args[0])
    with io.BytesIO() as buffer:
        image.save(buffer, format='JPEG')  # You can replace 'JPEG' with other formats like 'PNG' if needed
        image_bytes = buffer.getvalue()
    image_64_encode = base64.b64encode(image_bytes)
    bytes_array = image_64_encode.decode("utf-8")
    
    # Format the request in json
    request = {
      "instances":[
        {
          "data": bytes_array
        }
      ]
    }
    ingress_host = args[1]
    ingress_port = args[2]
    model_name = args[3]
    service_hostname = args[4]
    # Create request for Prediction (header, URL, payload)
    url = str("http://") + str(ingress_host) + ":" + str(ingress_port) + "/v1/models/" + str(model_name) + ":predict"
    headers = {'Host': service_hostname}
    payload = json.dumps(request)
    # print(request)
    response = requests.post(url, data=payload, headers=headers)
    res = response.json()
    # print("Running NMS...")
    final_res = run_nms(res['predictions'][0])
    # print("Done!")
    res['predictions'][0] = final_res
    # print(len(res['predictions'][0]))
    return res

def visualize(idx,thres=0.15):
    '''
    Visualize predicted results from resps
    '''
    print(idx,thres)
    output = resps[idx]
    im = Image.open(imgs[idx])
    plot_pred(im,output,thres)
    
def run_apply_async_multiprocessing(func, argument_list, num_processes):
    '''
    Use multiprocessing.apply_async to send simultaneous requests
    '''
    pool = Pool(processes=num_processes)

    jobs = [pool.apply_async(func=func, args=(*argument,)) if isinstance(argument, tuple) else pool.apply_async(func=func, args=(argument,)) for argument in argument_list]
    pool.close()
    result_list_tqdm = []
    for job in tqdm(jobs):
        result_list_tqdm.append(job.get())

    return result_list_tqdm

In [5]:
imgs = [img for img in glob.glob(images, recursive=True)]

In [6]:
# predict([imgs[0],ingress_host,ingress_port,model_name,service_hostname])

<h3>Step 2: Request prediction from KServe InferenceService and display results</h3>

In [7]:
resps = run_apply_async_multiprocessing(predict,[[imgs[i],ingress_host,ingress_port,model_name,service_hostname] for i in range(len(imgs))],num_processes=4)

100%|██████████| 33/33 [01:07<00:00,  2.04s/it]


In [8]:
interact(visualize, idx=widgets.IntSlider(min=0, max=len(resps), step=1, value=0),thres =widgets.FloatSlider(min=0, max=1.0, step=0.1, value=0.00));

interactive(children=(IntSlider(value=0, description='idx', max=33), FloatSlider(value=0.0, description='thres…