In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
# specify substep parameters for interactive run
# this cell will be replaced during job run with the parameters from json within params subfolder
# the dict defined here is passed as the third argument to NotebookSubstep(...) later in this notebook
substep_params={
}

In [None]:
# load pipeline and step parameters - do not edit
from sinara.substep import get_pipeline_params, get_step_params
# NOTE(review): pprint=True presumably echoes the loaded parameters to the cell output - confirm against sinara docs
pipeline_params = get_pipeline_params(pprint=True)
step_params = get_step_params(pprint=True)

In [None]:
# define substep interface
from sinara.substep import NotebookSubstep, ENV_NAME, PIPELINE_NAME, ZONE_NAME, STEP_NAME, RUN_ID, ENTITY_NAME, ENTITY_PATH, SUBSTEP_NAME

substep = NotebookSubstep(pipeline_params, step_params, substep_params)

# Both inputs are "bento_service" entities produced by the "model_pack" step
# of two different pipelines.
substep.interface(
    inputs=[
        # bentoservice file from the pack step of pipeline face_detect_rest
        {
            PIPELINE_NAME: "face_detect_rest",
            STEP_NAME: "model_pack",
            ENTITY_NAME: "bento_service",
        },
        # bentoservice file from the pack step of pipeline facemask_classification_rest
        {
            PIPELINE_NAME: "facemask_classification_rest",
            STEP_NAME: "model_pack",
            ENTITY_NAME: "bento_service",
        },
    ]
)

substep.print_interface_info()
substep.exit_in_visualize_mode()

In [None]:
import numpy as np
import os.path as osp
from pathlib import Path
import json
import atexit
import requests
import base64
import cv2
import matplotlib.pyplot as plt

In [None]:
# run spark
from sinara.spark import SinaraSpark

# NOTE(review): the meaning of the positional argument 0 is not visible in this notebook - confirm against SinaraSpark docs
spark = SinaraSpark.run_session(0)
# last expression of the cell, so its return value is shown as the cell output
SinaraSpark.ui_url()

### Loading REST Bentoservice 

In [None]:
from sinara.bentoml import load_bentoservice

# resolve the "model_pack" inputs of both upstream pipelines
inputs_face_detect = substep.inputs(
    step_name="model_pack",
    pipeline_name="face_detect_rest",
)
inputs_facemask_classification = substep.inputs(
    step_name="model_pack",
    pipeline_name="facemask_classification_rest",
)

# load one bentoservice per pipeline from its bento_service entity
face_detect_service = load_bentoservice(
    inputs_face_detect.bento_service,
    bentoservice_name="face_detect_service",
)
facemask_classification_service = load_bentoservice(
    inputs_facemask_classification.bento_service,
    bentoservice_name="facemask_classification_service",
)

In [None]:
# display the version reported by the loaded face_detect_service object (no REST call involved)
face_detect_service.service_version()

In [None]:
# display the version reported by the loaded facemask_classification_service object (no REST call involved)
facemask_classification_service.service_version()

### Start Bentoservice

In [None]:
from sinara.bentoml import start_dev_bentoservice, stop_dev_bentoservice

In [None]:
%%capture cap --no-stderr
# Stop a dev model server if running
stop_dev_bentoservice(face_detect_service)
stop_dev_bentoservice(facemask_classification_service)

# Start a dev model server to test out the API endpoint locally
start_dev_bentoservice(face_detect_service, use_popen=True, debug=False, port = 5001)
start_dev_bentoservice(facemask_classification_service, use_popen=True, debug=False, port = 5002)
_=atexit.register(stop_dev_bentoservice)

In [None]:
# example REST API get service_version face_detect_service
face_detect_version_response = requests.post("http://127.0.0.1:5001/service_version", json={})
# fail with the HTTP status right here instead of a confusing JSON error (or a printed error body)
face_detect_version_response.raise_for_status()
service_version = face_detect_version_response.json()
print(f"service_version face_detect_service: {service_version}")

In [None]:
# example REST API get service_version facemask_classification_service
facemask_version_response = requests.post("http://127.0.0.1:5002/service_version", json={})
# fail with the HTTP status right here instead of a confusing JSON error (or a printed error body)
facemask_version_response.raise_for_status()
service_version = facemask_version_response.json()
print(f"service_version facemask_classification_service: {service_version}")

### Predict test_data with an ensemble of face_detect_service and facemask_classification_service

#### Get test_data from face_detect_service
(the test image is stored in the bento service and can be obtained via the test_data method)

In [None]:
# fetch the base64-encoded test image from face_detect_service
test_data_api_endpoint = 'http://127.0.0.1:5001/test_data'
test_data_response = requests.request("POST", test_data_api_endpoint, json={})
# surface HTTP errors here instead of failing later on a missing 'b64' key
test_data_response.raise_for_status()

content = test_data_response.json()
encoded_test_data = content['b64']
test_data = base64.b64decode(encoded_test_data)

# cv2.imdecode expects the encoded file bytes as an unsigned 8-bit buffer
image_array = np.frombuffer(test_data, np.uint8)
bgr_test_data = cv2.imdecode(image_array, cv2.IMREAD_COLOR)
# imdecode reports an undecodable buffer by returning None rather than raising
if bgr_test_data is None:
    raise ValueError("test_data returned by face_detect_service could not be decoded as an image")
# convert the decoded BGR image to RGB channel order
numpy_test_data = cv2.cvtColor(bgr_test_data, cv2.COLOR_BGR2RGB )

#### Predict test_data by face_detect_service
(sending the test image test_data to the predict method)

In [None]:
# PNG-encode the test image and send the raw bytes to the predict endpoint of face_detect_service
# NOTE(review): numpy_test_data was converted with COLOR_BGR2RGB earlier in this notebook, while
# cv2.imencode treats its input as BGR - confirm which channel order the service expects
encode_ok, encoded_test_image = cv2.imencode('.png', numpy_test_data)
if not encode_ok:
    raise ValueError("test image could not be encoded as PNG")

predict_face_detect_response = requests.post('http://127.0.0.1:5001/predict', 
                                             headers={'Content-Type': 'application/octet-stream'}, 
                                             data=encoded_test_image.tobytes())
# surface HTTP errors here instead of failing later on a missing key in the JSON
predict_face_detect_response.raise_for_status()
predict_face_detect_result = predict_face_detect_response.json()

### Predict each cropped image of the detected objects with facemask_classification_service

In [None]:
# Minimal detection score for a face to be passed on to the mask classifier
FACE_SCORE_THRESHOLD = 0.5

# Build a fresh result: same top-level fields as the detector output,
# with "annotations" re-filled below from the confident detections only.
predict_result = predict_face_detect_result.copy()
predict_result["annotations"] = []

image_height, image_width = numpy_test_data.shape[:2]

for ann_obj in predict_face_detect_result["annotations"]:
    if ann_obj["score"] < FACE_SCORE_THRESHOLD:
        continue

    # coco format: xywh (x_top_left, y_top_left, w_object, h_object)
    x_tl, y_tl, w_obj, h_obj = ann_obj["bbox"]

    # Slice bounds must be integers (bbox values may arrive as floats) and must stay
    # inside the image: a negative start would wrap around to the other side of the array.
    x_start = min(max(int(round(x_tl)), 0), image_width)
    y_start = min(max(int(round(y_tl)), 0), image_height)
    x_end = min(max(int(round(x_tl + w_obj)), 0), image_width)
    y_end = min(max(int(round(y_tl + h_obj)), 0), image_height)
    if x_end <= x_start or y_end <= y_start:
        # an empty crop cannot be PNG-encoded, so report and skip it instead of crashing
        print(f"skip annotation {ann_obj.get('id')}: bbox {ann_obj['bbox']} is empty or outside the image")
        continue

    # get crop image of face in test_image
    crop_image_obj = numpy_test_data[y_start:y_end, x_start:x_end]

    # predict facemask_classification_service
    predict_facemask_classification_response = requests.post("http://127.0.0.1:5002/predict", 
                                                             headers={'Content-Type': 'application/octet-stream'}, 
                                                             data=cv2.imencode('.png', crop_image_obj)[1].tobytes())
    # surface HTTP errors here instead of failing below on a missing key
    predict_facemask_classification_response.raise_for_status()
    predict_facemask_classification_result = predict_facemask_classification_response.json()

    attribute_obj = {"face_class": predict_facemask_classification_result["class_names"][0],
                     "face_class_scores": predict_facemask_classification_result["class_scores"]}
    # Build a new annotation dict rather than updating ann_obj in place, so the detector
    # output in predict_face_detect_result stays untouched and the cell can be re-run safely.
    predict_result["annotations"].append({**ann_obj, "attribute": attribute_obj})

In [None]:
# Stop spark
# NOTE(review): the dev bentoservices started earlier are not stopped in this cell;
# their shutdown is left to the atexit hook registered when they were started
SinaraSpark.stop_session()