In [7]:
# import cv2
import numpy as np
import math
import matplotlib.pyplot as plt
from PIL import Image
import time
import requests
import tritonclient.http as httpclient
import tritonclient.grpc as grpcclient
from tritonclient.grpc import service_pb2, service_pb2_grpc

## Triton Tutorial

### HTTP Request

In [None]:
def detection_preprocessing(image: cv2.Mat) -> np.ndarray:
    """Turn an OpenCV image into the channels-last blob sent to the detection model.

    The image is resized to 640x480 (width x height), the per-channel means
    (123.68, 116.78, 103.94) are subtracted and the first and last channels
    are swapped (``swapRB=True``). No cropping and no scaling is applied.

    Note: this relies on ``cv2``, whose import is commented out in the
    notebook's import cell.

    Args:
        image: Image as loaded by ``cv2.imread``.

    Returns:
        The blob with the channel axis moved last, i.e. (1, 480, 640, 3);
        ``blobFromImage`` itself yields (1, 3, 480, 640).
    """
    nchw_blob = cv2.dnn.blobFromImage(
        image,
        scalefactor=1.0,
        size=(640, 480),
        mean=(123.68, 116.78, 103.94),
        swapRB=True,
        crop=False,
    )
    # NCHW -> NHWC
    return nchw_blob.transpose(0, 2, 3, 1)

def detection_postprocessing(scores, geometry, preprocessed_image):
    """Unfinished draft of the detection post-processing step.

    Only the perspective-crop helper is defined here; it is never invoked and
    the function implicitly returns ``None``. A complete definition with the
    same name appears in a later cell and replaces this one once executed.
    """

    def fourPointsTransform(frame, vertices):
        """Warp the quadrilateral ``vertices`` of ``frame`` onto a 100x32 patch."""
        patch_w, patch_h = 100, 32
        src_quad = np.asarray(vertices)
        # Destination corners in image coordinates:
        # bottom-left, top-left, top-right, bottom-right.
        dst_quad = np.array(
            [
                [0, patch_h - 1],
                [0, 0],
                [patch_w - 1, 0],
                [patch_w - 1, patch_h - 1],
            ],
            dtype="float32",
        )  # (4, 2)
        homography = cv2.getPerspectiveTransform(src_quad, dst_quad)  # (3, 3)
        return cv2.warpPerspective(frame, homography, (patch_w, patch_h))

In [2]:
# Debug switch; no code visible in this notebook reads it.
# NOTE(review): presumably meant to toggle saving intermediate crops — confirm or remove.
SAVE_INTERMEDIATE_IMAGES = False

def detection_postprocessing(scores, geometry, preprocessed_image):
    """Decode the detection model outputs and crop the first surviving text box.

    Args:
        scores: Confidence map in channels-last layout, (1, H, W, 1).
        geometry: Geometry map in channels-last layout, (1, H, W, 5). Channels
            0-3 are per-cell edge distances, channel 4 is the rotation angle
            in radians.
        preprocessed_image: The (1, 480, 640, 3) blob that was sent to the
            detection model; crops are taken from it.

    Returns:
        Array of shape (1, 1, 32, 100): the first crop kept by non-max
        suppression, converted to grayscale and rescaled with
        ``(x / 255 - 0.5) * 2``. Crops of the other surviving boxes are
        computed but not returned.

    Raises:
        ValueError: if no box survives thresholding and non-max suppression.
        AssertionError: if ``scores`` / ``geometry`` do not have the expected
            (1, 1, H, W) / (1, 5, H, W) layout after transposition.
    """

    def fourPointsTransform(frame, vertices):
        """Warp the quadrilateral ``vertices`` of ``frame`` onto a 100x32 patch."""
        vertices = np.asarray(vertices)
        outputSize = (100, 32)  # (width, height)
        # Destination corners in image coordinates:
        # bottom-left, top-left, top-right, bottom-right.
        targetVertices = np.array(
            [
                [0, outputSize[1] - 1],
                [0, 0],
                [outputSize[0] - 1, 0],
                [outputSize[0] - 1, outputSize[1] - 1],
            ],
            dtype="float32",
        )  # (4, 2)

        rotationMatrix = cv2.getPerspectiveTransform(vertices, targetVertices)  # (3, 3)
        return cv2.warpPerspective(frame, rotationMatrix, outputSize)

    def decodeBoundingBoxes(scores, geometry, scoreThresh=0.5):
        """Convert score/geometry maps into rotated rectangles.

        Returns ``[detections, confidences]`` where each detection is
        ``((center_x, center_y), (w, h), angle_in_degrees)``.
        """
        detections = []
        confidences = []

        # Check dimensions and shapes of geometry and scores.
        assert len(scores.shape) == 4, "Incorrect dimensions of scores"
        assert len(geometry.shape) == 4, "Incorrect dimensions of geometry"
        assert scores.shape[0] == 1, "Invalid dimensions of scores"
        assert geometry.shape[0] == 1, "Invalid dimensions of geometry"
        assert scores.shape[1] == 1, "Invalid dimensions of scores"
        assert geometry.shape[1] == 5, "Invalid dimensions of geometry"
        assert (
            scores.shape[2] == geometry.shape[2]
        ), "Invalid dimensions of scores and geometry"
        assert (
            scores.shape[3] == geometry.shape[3]
        ), "Invalid dimensions of scores and geometry"

        height = scores.shape[2]
        width = scores.shape[3]

        for y in range(0, height):
            # One row of each map, each of length `width`.
            scoresData = scores[0][0][y]
            x0_data = geometry[0][0][y]
            x1_data = geometry[0][1][y]
            x2_data = geometry[0][2][y]
            x3_data = geometry[0][3][y]
            anglesData = geometry[0][4][y]

            for x in range(0, width):
                score = scoresData[x]

                # Skip cells below the confidence threshold.
                if score < scoreThresh:
                    continue

                # The maps are 4x smaller than the input blob, so map indices
                # are scaled by 4 to get pixel coordinates.
                offsetX = x * 4.0
                offsetY = y * 4.0
                angle = anglesData[x]

                cosA = math.cos(angle)
                sinA = math.sin(angle)
                h = x0_data[x] + x2_data[x]
                w = x1_data[x] + x3_data[x]

                # Reference corner of the rotated box.
                offset = [
                    offsetX + cosA * x1_data[x] + sinA * x2_data[x],
                    offsetY - sinA * x1_data[x] + cosA * x2_data[x],
                ]

                # Two opposite corners; their midpoint is the box center.
                p1 = (-sinA * h + offset[0], -cosA * h + offset[1])
                p3 = (-cosA * w + offset[0], sinA * w + offset[1])
                center = (0.5 * (p1[0] + p3[0]), 0.5 * (p1[1] + p3[1]))
                detections.append((center, (w, h), -1 * angle * 180.0 / math.pi))
                confidences.append(float(score))

        return [detections, confidences]

    scores = scores.transpose(0, 3, 1, 2)           # NHWC -> NCHW, e.g. (1, 1, 120, 160)
    geometry = geometry.transpose(0, 3, 1, 2)       # e.g. (1, 5, 120, 160)
    frame = np.squeeze(preprocessed_image, axis=0)  # (480, 640, 3)
    frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # (480, 640, 3)
    boxes, confidences = decodeBoundingBoxes(scores, geometry)
    # Non-max suppression (score threshold 0.5, NMS threshold 0.4).
    indices = cv2.dnn.NMSBoxesRotated(boxes, confidences, 0.5, 0.4)

    # Depending on the OpenCV version the indices come back flat, as an
    # (N, 1) array, or as an empty tuple; normalise to a flat int array so
    # they can index the `boxes` list.
    kept = np.asarray(indices, dtype=int).reshape(-1)
    if kept.size == 0:
        raise ValueError("No text regions detected in the image")

    cropped_list = []
    for box_idx in kept:
        # Four corners of the rotated rectangle.
        vertices = cv2.boxPoints(boxes[int(box_idx)])
        cropped = fourPointsTransform(frame, vertices)  # (32, 100, 3)
        cropped = np.expand_dims(cv2.cvtColor(cropped, cv2.COLOR_BGR2GRAY), axis=0)  # (1, 32, 100)
        cropped_list.append(((cropped / 255.0) - 0.5) * 2)  # rescale as (x / 255 - 0.5) * 2

    cropped_arr = np.stack(cropped_list, axis=0)  # (N, 1, 32, 100)
    # Only the first crop is returned, with a leading batch axis: (1, 1, 32, 100).
    return cropped_arr[None, 0]

In [None]:
def recognition_postprocessing(scores: np.ndarray) -> str:
    """Greedy-decode recognition scores into a string.

    The first two axes of ``scores`` are swapped, then for every step along
    the new leading axis the arg-max class of entry 0 on the second axis is
    taken. Class 0 is the blank symbol; class ``k > 0`` maps to
    ``alphabet[k - 1]``. Blanks and characters equal to their immediate
    predecessor are dropped from the result.

    Args:
        scores: 3-D array of per-step class scores.

    Returns:
        The decoded text (digits and lowercase letters only).
    """
    alphabet = "0123456789abcdefghijklmnopqrstuvwxyz"
    blank = "-"

    per_step = np.transpose(scores, (1, 0, 2))

    # Raw per-step symbols, blanks included.
    symbols = []
    for step in per_step:
        best = np.argmax(step[0])
        symbols.append(blank if best == 0 else alphabet[best - 1])

    # Collapse: keep a symbol only if it is not blank and differs from the
    # symbol right before it (the first symbol has no predecessor).
    previous = [None] + symbols[:-1]
    kept = [
        sym
        for prev, sym in zip(previous, symbols)
        if sym != blank and sym != prev
    ]
    return "".join(kept)

In [None]:
if __name__ == "__main__":
    # Setting up client
    client = httpclient.InferenceServerClient(url="localhost:8000")

    # Read image and create input object.
    # cv2.imread returns None (it does not raise) when the file is missing or
    # cannot be decoded, so fail here with a clear message instead of letting
    # the preprocessing step crash on a None input.
    raw_image = cv2.imread("./inputs/sample.jpg")           # (4000, 3000, 3)
    if raw_image is None:
        raise FileNotFoundError("Could not read image './inputs/sample.jpg'")
    preprocessed_image = detection_preprocessing(raw_image) # (1, 480, 640, 3)

    detection_input = httpclient.InferInput(
        "input_images:0", preprocessed_image.shape, datatype="FP32"
    )
    detection_input.set_data_from_numpy(preprocessed_image, binary_data=True)

    # Query the detection model
    detection_response = client.infer(
        model_name="text_detection", inputs=[detection_input]
    )

    # Process responses from detection model
    scores = detection_response.as_numpy("feature_fusion/Conv_7/Sigmoid:0")         # (1, 120, 160, 1), 4x downsampled
    geometry = detection_response.as_numpy("feature_fusion/concat_3:0")             # (1, 120, 160, 5)
    cropped_images = detection_postprocessing(scores, geometry, preprocessed_image) # (1, 1, 32, 100)

    # Create input object for recognition model
    recognition_input = httpclient.InferInput(
        "input.1", cropped_images.shape, datatype="FP32"
    )
    recognition_input.set_data_from_numpy(cropped_images, binary_data=True)

    # Query the recognition model
    recognition_response = client.infer(
        model_name="text_recognition", inputs=[recognition_input]
    )

    # Process response from recognition model ("308" is the output tensor name)
    final_text = recognition_postprocessing(recognition_response.as_numpy("308"))
    print(final_text)

### GRPC for Ensemble Model

In [8]:
class TritonClient:
    """Small gRPC client that sends a file's raw bytes to a Triton model."""

    def __init__(self, url="localhost:8001"):
        """Open a gRPC connection object for the Triton server at ``url``."""
        self.url = url
        self.client = grpcclient.InferenceServerClient(url=self.url)

    def inference(self, model_name=None, input_name=None, input_dtype=None, path=None):
        """Send the bytes of the file at ``path`` to ``model_name`` and return the text output.

        Args:
            model_name: Name of the model to query.
            input_name: Name of the model's input tensor.
            input_dtype: Triton datatype string for the input (e.g. "UINT8").
            path: Path of the file whose raw bytes are sent.

        Returns:
            The "recognized_text" output converted to a NumPy array of ``str``.
            The same array is also printed, as before.

        Raises:
            AssertionError: if any of the four arguments is ``None``.
        """
        assert model_name is not None, "Insert the model for inference"
        assert input_name is not None, "Insert input name for inference"
        assert input_dtype is not None, "Insert data type for inference"
        assert path is not None, "Insert one image for inference"

        # Raw file content as a flat uint8 vector, with a leading batch axis: (1, n_bytes).
        image_data = np.fromfile(path, dtype="uint8")
        image_data = np.expand_dims(image_data, axis=0)

        inputs = [grpcclient.InferInput(input_name, image_data.shape, input_dtype)]
        inputs[0].set_data_from_numpy(image_data)

        results = self.client.infer(model_name=model_name, inputs=inputs)
        output_data = results.as_numpy("recognized_text").astype(str)
        print(output_data)
        # Hand the result back so callers can use it, not just read the printout.
        return output_data

In [None]:
if __name__ == "__main__":
    # Query the ensemble model over gRPC with the sample image; the client
    # sends the file's raw bytes as a UINT8 tensor and prints the model's
    # "recognized_text" output.
    triton_client = TritonClient()
    triton_client.inference(
        model_name="ensemble_model",
        input_name="input_image",
        input_dtype="UINT8",
        path="inputs/sample.jpg"
    )

### Debug the preprocessing

In [None]:
def detection_preprocessing(image: cv2.Mat) -> np.ndarray:
    """Debug variant of the detection preprocessing that keeps the channel order.

    Resizes to 640x480 (width x height) and subtracts the means
    (123.68, 116.78, 103.94) without swapping the first and last channels
    (``swapRB=False``) and without cropping, then moves the channel axis
    last. Running this cell rebinds the name ``detection_preprocessing``,
    replacing the earlier definition that swaps channels.
    """
    target_size = (640, 480)  # (width, height)
    channel_means = (123.68, 116.78, 103.94)

    nchw_blob = cv2.dnn.blobFromImage(image, 1.0, target_size, channel_means, False, False)
    # NCHW -> NHWC
    return np.transpose(nchw_blob, (0, 2, 3, 1))

# Visual check of the cv2-based preprocessing: load the sample, preprocess it
# and show the first (only) image of the resulting batch.
# Note: `image` is rebound from the raw image to the preprocessed blob.
image = cv2.imread("inputs/sample.jpg")
image = detection_preprocessing(image)
plt.imshow(image[0])

In [None]:
def image_loader(image) -> np.ndarray:
    """Resize an image to 640x480, subtract per-channel offsets and add a batch axis.

    Args:
        image: Object exposing ``resize((width, height))`` whose result converts
            to a 3-channel array (e.g. a PIL image).

    Returns:
        ``float32`` array with a leading batch axis of size 1.
    """
    # NOTE(review): these offsets are in a different order than the
    # (123.68, 116.78, 103.94) used by the cv2 preprocessing — confirm which
    # channel order is intended.
    channel_offsets = np.array([103.94, 123.68, 116.78]).reshape(1, 1, 3)
    resized = np.array(image.resize((640, 480)))
    centered = (resized - channel_offsets).astype(np.float32)
    return centered[None, ...]

# Visual check of the PIL-based preprocessing: open the sample, run
# image_loader on it and show the first (only) image of the batch.
# Note: `image` is rebound from the PIL image to the preprocessed array.
image = Image.open("inputs/sample.jpg")
image = image_loader(image)
plt.imshow(image[0])

## Other deployment

### HTTP Request

In [None]:
class TritonClientInference:
    """Plain-HTTP client that posts a JSON inference request to a Triton model endpoint."""

    def __init__(self, url="http://localhost:8000/v2/models/classification/infer", timeout=None):
        """Store the endpoint URL and an optional request timeout.

        Args:
            url: Full inference endpoint URL.
            timeout: Seconds passed to ``requests.post``; ``None`` (the
                default) waits indefinitely, as before.
        """
        self.url = url
        self.timeout = timeout

    def inference(self, input_data):
        """Send ``input_data`` as the FP32 tensor "input" and return the "output" tensor.

        Args:
            input_data: NumPy array to send; its shape and values are
                serialised into the JSON payload.

        Returns:
            The first returned output reshaped to the shape reported by the
            server, or ``None`` when the server does not answer with HTTP 200
            (the status code and body are printed in that case).
        """
        request_payload = {
            "inputs": [
                {
                    "name": "input",
                    # Explicit list so the payload does not depend on how the
                    # JSON encoder treats tuples.
                    "shape": list(input_data.shape),
                    "datatype": "FP32",
                    "data": input_data.tolist(),
                }
            ],
            "outputs": [{"name": "output"}],
        }

        response = requests.post(self.url, json=request_payload, timeout=self.timeout)
        if response.status_code != 200:
            print("Request failed with status code: ", response.status_code)
            print("Response: ", response.text)
            return None

        first_output = response.json()["outputs"][0]
        output_data = np.array(first_output["data"]).reshape(first_output["shape"])
        print("Output Data: ", output_data.shape)
        return output_data

if __name__ == "__main__":
    # Smoke test of the raw-HTTP client with an all-zero (1, 3, 224, 224) input.
    # Note: this rebinds the global `client`, which an earlier cell uses for
    # the tritonclient HTTP client.
    client = TritonClientInference()
    input_data = np.zeros((1, 3, 224, 224), dtype=np.float32)
    client.inference(input_data)

### GRPC Request

In [None]:
class TritonClient:
    """gRPC client for a Triton model with one FP32 input "input" and one output "output"."""

    def __init__(self, url="localhost:8001"):
        """Open a gRPC connection object for the Triton server at ``url``."""
        self.url = url
        self.client = grpcclient.InferenceServerClient(url=self.url)

    def infer(self, model_name, input_data):
        """Run ``input_data`` through ``model_name`` and return the "output" tensor.

        The duration of the server call alone is printed in milliseconds.

        Args:
            model_name: Name of the model to query.
            input_data: NumPy array sent as the FP32 tensor "input".

        Returns:
            The "output" tensor as a NumPy array.
        """
        input_tensor = grpcclient.InferInput("input", input_data.shape, "FP32")
        input_tensor.set_data_from_numpy(input_data)
        requested_output = grpcclient.InferRequestedOutput("output")

        # perf_counter is monotonic and high-resolution, so the measured
        # duration cannot be skewed by system clock adjustments.
        start = time.perf_counter()
        response = self.client.infer(
            model_name=model_name,
            inputs=[input_tensor],
            outputs=[requested_output],
        )
        print("Model inference time", (time.perf_counter() - start)*1000, "ms")

        return response.as_numpy("output")

        
if __name__ == "__main__":
    # Time one end-to-end gRPC call against the "classification" model with an
    # all-zero (1, 3, 224, 224) input.
    model_name = "classification"
    input_data = np.zeros((1, 3, 224, 224), dtype=np.float32)
    
    triton_client = TritonClient()
    # This outer timer also covers building the request tensors, on top of the
    # server call that TritonClient.infer times and prints itself.
    start = time.time()
    output = triton_client.infer(model_name, input_data)
    print("Total call inference time", (time.time() - start)*1000, "ms")
    print(output.shape)