# Embedded ML Lab - Challenge (Camera example)

This is an example notebook for the camera usage

In [10]:
import sklearn
from faf.tensorrt import CameraTensorrtDisplay
from faf.utils.camera import CameraDisplay
import time
import cv2
from faf.tinyyolov2 import TinyYoloV2
import torch
from faf.inference import InferenceModel
from torchvision.transforms import ToTensor
import onnxruntime
import numpy as np
from faf.utils.yolo import filter_boxes_separate, nms, filter_boxes
from faf.visualization import visualize_result, draw_bbox_opencv
import os

In [None]:
onnxruntime.get_available_providers()

In [None]:
!pip3 install git+https://github.com/onnx/onnx-tensorrt.git

In [2]:
net = TinyYoloV2.from_saved_state_dict("../weights/voc_pretrained.pt", use_constant_padding=True, use_leaky_relu=True)

In [None]:
net

In [3]:
net.eval()
x = torch.randn(1, 3, 320, 320, requires_grad=True)

out_path = os.path.join("./", "test.onnx")

# Export the model
torch.onnx.export(
    net,  # model being run
    x,  # model input (or a tuple for multiple inputs)
    out_path,  # where to save the model (can be a file or file-like object)
    export_params=True,  # store the trained parameter weights inside the model file
    opset_version=10,  # the ONNX version to export the model to
    do_constant_folding=True,  # whether to execute constant folding for optimization
    keep_initializers_as_inputs=True,
    input_names=["input"],  # the model's input names
    output_names=["output"],  # the model's output names
    # dynamic_axes={
    #     "input": {0: "batch_size"},
    #     "output": {0: "batch_size"},
    # },  # variable length axes
)

In [None]:
import onnx
import onnx_tensorrt.backend as backend
import numpy as np
model = onnx.load("../weights/simplified.onnx")

print(onnx.checker.check_model(model))
engine = backend.prepare(model, device='CUDA:0')
input_data = np.random.random(size=(1, 3, 320, 320)).astype(np.float32)
output_data = engine.run(input_data)[0]
print(output_data)
print(output_data.shape)

In [None]:
providers = onnxruntime.get_available_providers()
assert 'CUDAExecutionProvider' in providers, "CUDAExecution Provider is not available."

# Create a session options object
session_options = onnxruntime.SessionOptions()

# Set the GPU memory limit (in bytes, for example 0.5 GB here)
gpu_mem_limit = int(0.5 * 1024 * 1024 * 1024)

# Configuration for the CUDA Execution Provider
cuda_provider_options = {
    "device_id": "0",  # Assuming using GPU device 0
    "gpu_mem_limit": gpu_mem_limit,
    "arena_extend_strategy": "kSameAsRequested",
    "cudnn_conv_algo_search": "HEURISTIC",
    # "cudnn_conv_use_max_workspace": '1'
}

# Create the session with the model and the configured session options
# Replace 'your_model.onnx' with the path to your ONNX model
ort_session = onnxruntime.InferenceSession(
    "../weights/simplified.onnx", 
    sess_options=session_options, 
    providers=['CUDAExecutionProvider'], 
    provider_options=[cuda_provider_options]
)

In [None]:
# net = TinyYoloV2.from_saved_state_dict("../weights/test/final.pt")

# net.to(torch.device("cpu"))
# net = InferenceModel("../weights/test/inference.onnx")

ort_session = onnxruntime.InferenceSession(
    "../weights/simplified.onnx", providers=["CPUExecutionProvider"]
)

In [None]:
import tensorrt as trt
import numpy as np
import pycuda.driver as cuda

In [None]:
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
trt_runtime = trt.Runtime(TRT_LOGGER)

device = cuda.Device(0) 
cuda_context = device.make_context()

In [None]:
# Load your serialized TensorRT engine (model)
def load_engine(trt_runtime, engine_path):
    with open(engine_path, 'rb') as f:
        engine_data = f.read()
    engine = trt_runtime.deserialize_cuda_engine(engine_data)
    return engine

# Allocate buffers for input and output
def allocate_buffers(engine):

    context = engine.create_execution_context()
    input_shape = context.get_binding_shape(0)  # Assuming one input. Adjust if your model differs.
    output_shape = context.get_binding_shape(1)  # Assuming one output. Adjust if your model differs.
    dtype = trt.nptype(engine.get_binding_dtype(0))  # Assuming dtype is the same for input and output
    # Allocate host and device buffers
    d_input = cuda.mem_alloc(int(np.prod(input_shape) * 4))
    d_output = cuda.mem_alloc(int(np.prod(output_shape) * 4))
    h_output = np.empty(output_shape, dtype=dtype)
    bindings = [int(d_input), int(d_output)]
    stream = cuda.Stream()

    return d_input, d_output, h_output, bindings, stream, context

# Loading the TRT engine
engine_path = '../weights/simplified.trt'  # Provide the path to your .trt model file
engine = load_engine(trt_runtime, engine_path)

In [None]:
# Allocate buffers
d_input, d_output, h_output, bindings, stream, context = allocate_buffers(engine)

In [None]:
def do_inference(context, bindings, input_data, d_input, d_output, h_output, stream):
    """
    Execute the model inference.
    :param context: The TensorRT execution context.
    :param bindings: The bindings for the input and output memory pointers.
    :param input_data: The input data for the model.
    :param d_input: Device input buffer.
    :param d_output: Device output buffer.
    :param h_output: Host output buffer.
    :param stream: CUDA stream for asynchronous execution.
    :return: The model's output data.
    """
    # Transfer input data to the GPU.
    cuda.memcpy_htod_async(d_input, input_data, stream)
    # Execute the model.
    context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
    # Transfer predictions back from the GPU.
    cuda.memcpy_dtoh_async(h_output, d_output, stream)
    # Synchronize the stream
    stream.synchronize()
    # Return the host output.
    return h_output

# Assuming your model expects a single input with shape (1, C, H, W) where C, H, W are channel, height, and width.
# Update this to match your model's expected input shape.
input_shape = (1, 3, 320, 320)  # Replace C, H, W with actual values.
dtype = np.float32  # Replace with the actual dtype your model expects (e.g., np.float16, np.float32)

# Generate a dummy input. Replace this with real data when you run your model.
input_data = np.random.random_sample(input_shape).astype(dtype)

# Prepare input data for the model (ensure it matches the expected input shape and type)
input_data = np.ascontiguousarray(input_data)

# Perform inference
output_data = do_inference(context, bindings, input_data, d_input, d_output, h_output, stream)

print("Inference output:", output_data)

In [None]:
cuda_context.pop()

In [4]:
def draw_boxes(image, targets):
    image_size = image.shape[:2]
    pad = 0
    
    for target in targets:
        if target[-1] >= 0:
            x_min = (
                int(
                    target[0] * image_size[0]
                    - target[2] * image_size[0] / 2
                )
                + pad
                )
            y_min = (
                int(
                    target[1] * image_size[1]
                    - target[3] * image_size[1] / 2
                )
                + pad
            )
            x_max = pad + int(x_min + target[2] * image_size[0])
            y_max = pad + int(y_min + target[3] * image_size[1])

            draw_bbox_opencv(
                image,
                (x_min, y_min, x_max, y_max),
                "person",
                target[4],
                color=(255, 0, 0),
            )

In [None]:
def perform_inference_onnx(frame, ort_session, cuda=False):
    io_binding = ort_session.io_binding()
    
    device = "cuda" if cuda else "cpu"

    # Adjust the shape of your frame to match the input shape of the model
    # This might involve adding a batch dimension or resizing if necessary
    # For example, if your model expects a batch dimension:
    frame = np.array(frame)
    frame = (frame.T / 255.0).astype(np.float32)
    frame = np.expand_dims(frame, axis=0)

    # Ensure the frame is the expected dtype (e.g., float32)
    frame = frame.astype(np.float32)

    # Create a tensor from the frame for binding
    frame_tensor = onnxruntime.OrtValue.ortvalue_from_numpy(frame, device, 0) # Assuming device_id is 0
    output_tensor = onnxruntime.OrtValue.ortvalue_from_shape_and_type([1, 5, 10, 10, 6], np.float32, device, 0)  # Change the shape to the actual shape of the output being bound
    # Bind the input tensor to the input name
    io_binding.bind_ortvalue_input("input", frame_tensor)
    io_binding.bind_ortvalue_output("output", output_tensor)
    ort_session.run_with_iobinding(io_binding)
    
    # import pdb; pdb.set_trace()

    # Retrieve the outputs
    ort_outs = io_binding.copy_outputs_to_cpu()[0]

    outputs = torch.tensor(ort_outs)
    # print(outputs)
    # outputs = filter_boxes_separate(outputs, 0.5, 0.3)
    outputs = filter_boxes(outputs, 0.3)
    outputs = nms(outputs, 0.25)
    outputs = [np.array(output) for output in outputs]
    targets = np.stack(outputs, axis=0)[0]
    return targets

In [5]:
def perform_inference_torch(frame, net, cuda=False):    
    device = "cuda" if cuda else "cpu"

    # Adjust the shape of your frame to match the input shape of the model
    # This might involve adding a batch dimension or resizing if necessary
    # For example, if your model expects a batch dimension:
    frame = np.array(frame)
    frame = (frame.T / 255.0).astype(np.float32)
    frame = np.expand_dims(frame, axis=0)

    # Ensure the frame is the expected dtype (e.g., float32)
    frame = frame.astype(np.float32)
    frame = torch.tensor(frame)

    outputs = net(frame)
    
    # print(outputs)
    # outputs = filter_boxes_separate(outputs, 0.5, 0.3)
    outputs = filter_boxes(outputs, 0.1)
    outputs = nms(outputs, 0.25)
    outputs = [np.array(output.detach().cpu().numpy()) for output in outputs]
    targets = np.stack(outputs, axis=0)[0]
    return targets

In [None]:
def perform_inference_tensorrt_backend(frame, engine, cuda=False):
    # Adjust the shape of your frame to match the input shape of the model
    # This might involve adding a batch dimension or resizing if necessary
    # For example, if your model expects a batch dimension:
    frame = np.array(frame)
    frame = (frame.T / 255.0).astype(np.float32)
    frame = np.expand_dims(frame, axis=0)

    # Ensure the frame is the expected dtype (e.g., float32)
    frame = frame.astype(np.float32)

    trt_out = engine.run(frame)[0]
    
    outputs = torch.tensor(trt_out)
    # outputs = filter_boxes_separate(outputs, 0.5, 0.3)
    # print(outputs)
    outputs = filter_boxes(outputs, 0.3)
    outputs = nms(outputs, 0.25)
    outputs = [np.array(output) for output in outputs]
    targets = np.stack(outputs, axis=0)[0]
    return targets

In [None]:
def perform_inference_tensorrt(frame):
    # Adjust the shape of your frame to match the input shape of the model
    # This might involve adding a batch dimension or resizing if necessary
    # For example, if your model expects a batch dimension:
    frame = np.array(frame)
    frame = (frame.T / 255.0).astype(np.float32)
    frame = np.expand_dims(frame, axis=0)

    # Ensure the frame is the expected dtype (e.g., float32)
    frame = frame.astype(np.float32)

    # Prepare input data for the model (ensure it matches the expected input shape and type)
    input_data = np.ascontiguousarray(frame)

    # Perform inference
    trt_out = do_inference(context, bindings, input_data, d_input, d_output, h_output, stream)
    
    outputs = torch.tensor(trt_out)
    # outputs = filter_boxes_separate(outputs, 0.5, 0.3)
    # print(outputs)
    outputs = filter_boxes(outputs, 0.1)
    outputs = nms(outputs, 0.25)
    outputs = [np.array(output) for output in outputs]
    targets = np.stack(outputs, axis=0)[0]
    return targets

In [6]:
# Define a callback function (your detection pipeline)
# Make sure to first load all your pipeline code and only at the end init the camera
now = time.time()
def callback(image):
    global now

    fps = f"{int(1/(time.time() - now))}"
    now = time.time()
    
    crop_to_fill = False
    
    if crop_to_fill:
        image = image[0:320, 0:320, :]
    else:
        width = 640
        height = 360
        scale = min(320/width, 320/height)

        image = cv2.resize(image, fx=scale, fy=scale, dsize=None)

        pad_height = max(0, 320 - scale*height)
        top_pad = int(pad_height // 2)
        bottom_pad = int(pad_height - top_pad)

        # Pad the image
        image = cv2.copyMakeBorder(image, top_pad, bottom_pad, 0, 0, cv2.BORDER_CONSTANT, value=(0, 0, 0))


    targets = perform_inference_torch(image, net, False)    
    # targets = perform_inference_onnx(image, ort_session, False)
    # targets = perform_inference_tensorrt_backend(image, engine, False)
    # targets = perform_inference_tensorrt(image)

    draw_boxes(image, targets)
   
    cv2.putText(image, "fps="+fps, (2, 25), cv2.FONT_HERSHEY_SIMPLEX, 1,
                (100, 255, 0), 2, cv2.LINE_AA)
    return image

In [7]:
callback(np.zeros((320, 320, 3), np.uint8)).shape

(300, 160, 3)

In [11]:
# Initialize the camera with the callback
cam = CameraTensorrtDisplay(engine_path="../weights/simplified.trt", crop_to_fill=False)
# cam = CameraDisplay(callback)

Initializing camera...


Image(value=b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x0…

In [13]:
# The camera stream can be started with cam.start()
# The callback gets asynchronously called (can be stopped with cam.stop())
# cam.run()
cam.start()

In [14]:
# The camera should always be stopped and released for a new camera is instantiated (calling CameraDisplay(callback) again)
#cam.stop()
cam.release()

Camera released
