# Python Holoscan Tutorial

Welcome to the Holoscan Python API tutorial! By now, you've learned some of the basic motivation, benefits, and architecture guiding Holoscan application development. Now, let's put some of that theory into practice!

## Our First Holoscan Application

For our first Holoscan Application, we are going to create three operators:
1. PingTxOp
2. PingMiddleOp
3. PingRxOp

`PingTxOp` is our 'source' operator and generates integers that it emits via 2 ports, or streams, to the `PingMiddleOp` operator. This middle operator takes the two integers transmitted by the source, multiplies both by a number, and emits both results (again, via 2 ports) to the `PingRxOp` operator. This final 'sink' operator simply prints the results.

Below is a diagram of the overall pipeline:

<img src="images/MyPingApp.png" width=600/>

### Creating a Holoscan Custom Operator

Prior to creating a custom Holoscan Operator, we must import both `Operator` and `OperatorSpec` from `holoscan.core`:
- An operator is the most basic unit of work in this SDK. It receives streaming data at an input port, processes it, and publishes it to one of its output ports.
- An operator can have multiple input and output ports. Further, in the case of a source operator, where the main goal is to generate data, an operator may not have any input ports; conversely, a sink operator may not have any output ports.

A Holoscan Operator is run based on logic defined in the scheduler. Diving into the guts of the scheduler is beyond the scope of this tutorial, but an Operator is 'run' with each clock tick, or iteration, emitted from the scheduler. Let's create a simple source Operator called PingTxOp that will transmit two outputs at each tick. Remember, since this is a source Operator, there are no input ports!

In addition, the following examples will demonstrate the ability of operators to pass and operate on user-defined data types.

#### User-defined data type

In [None]:
class ValueData:
    """Example of a custom Python class"""

    def __init__(self, value):
        self.data = value

    def __repr__(self):
        return f"ValueData({self.data})"

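    def __eq__(self, other):
        # Comparing against a non-ValueData object should not raise AttributeError
        if not isinstance(other, ValueData):
            return NotImplemented
        return self.data == other.data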

    def __hash__(self):
        return hash(self.data)

Here is a simple wrapper for a user-defined data type.

#### Custom Source Operator

In [None]:
from holoscan.core import Operator, OperatorSpec


class PingTxOp(Operator):
    """Simple transmitter operator.
    This operator has:
        outputs: "out1", "out2"
    On each tick, it transmits a `ValueData` object at each port. The
    transmitted values are even on port1 and odd on port2 and increment with
    each call to compute.
    """

    def __init__(self, *args, **kwargs):
        self.index = 0
        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.output("out1")
        spec.output("out2")

    def compute(self, op_input, op_output, context):
        value1 = ValueData(self.index)
        self.index += 1
        op_output.emit(value1, "out1")

        value2 = ValueData(self.index)
        self.index += 1
        op_output.emit(value2, "out2")

To define a Custom Operator in Holoscan, the developer must first create a Python class that inherits from the `holoscan.core.Operator` class. In this case, we are creating an Operator named `PingTxOp`.

Regardless of the Operator's function, custom Holoscan Operators typically contain:
- An optional `__init__` method used to initialize class-specific attributes; as the comment in the example notes, the base class constructor (`super().__init__(*args, **kwargs)`) is called last
- A `setup` method used to define input and output ports (and parameters)
- A `compute` method that defines the work done within that Operator

**More on the Setup Function**
`setup` receives an `OperatorSpec` object (`spec`), which is used directly to define that Operator's inputs, outputs, and parameters.

To create an input port:
`spec.input("name_of_port")` where "name_of_port" is your name for the operator input port

To create an output port:
`spec.output("name_of_port")` where "name_of_port" is your name for the operator output port

Port names, of course, must be unique within an Operator: no two inputs may share a name, and no two outputs may share a name.

**More on the Compute Function**
Most of your application-specific code will live in the Operator's `compute` method. As you can see in the example above, `compute` receives three parameters: `op_input`, `op_output`, and `context`. We will focus on `op_input` and `op_output` in detail.

`op_input` provides the mechanism for receiving upstream messages and data into the custom Holoscan Operator.

To receive data from an upstream operator:
`value = op_input.receive("name_of_port")` where "name_of_port" is the name of the input port on which data is received. Here, `value` could be an integer, tensor, or other Python data object. Later in the tutorial, we will build applications that invoke compute on the incoming data stream.

To emit data and messages from the operator:
`op_output.emit(value, "name_of_port")` where `value` is the data object you want to send out of the operator and "name_of_port" is the port name to emit the data stream.
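Because `compute` only talks to its ports through `op_input.receive` and `op_output.emit`, its port-level logic can be exercised in plain Python. The sketch below is an illustration under stated assumptions, not part of the Holoscan API: `FakeInput`, `FakeOutput`, and `PassThroughLogic` are hypothetical stand-ins that mimic only the two calls described above, so the snippet runs without the SDK or a scheduler.

```python
class FakeInput:
    """Hypothetical stand-in for `op_input`: holds one message per port name."""

    def __init__(self, messages):
        self._messages = dict(messages)

    def receive(self, port):
        # Hand back (and consume) the message waiting on `port`; this stand-in
        # returns None when nothing is waiting
        return self._messages.pop(port, None)


class FakeOutput:
    """Hypothetical stand-in for `op_output`: records what was emitted."""

    def __init__(self):
        self.emitted = {}

    def emit(self, value, port):
        self.emitted[port] = value


class PassThroughLogic:
    """Plain class whose compute() has the same signature as an Operator's."""

    def compute(self, op_input, op_output, context):
        value = op_input.receive("in")  # read from the input port named "in"
        op_output.emit(value, "out")    # send it out of the port named "out"


op_in = FakeInput({"in": 42})
op_out = FakeOutput()
PassThroughLogic().compute(op_in, op_out, context=None)
print(op_out.emitted)  # {'out': 42}
```

A real Operator additionally declares these ports in `setup`, and the scheduler, not your code, decides when `compute` runs.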

### Exercise 1

Now that we have some familiarity with building custom Holoscan Operators, please fix the "FIX ME" placeholders in the code below to construct a custom Operator named `MyOp` that has the following:
- 3 input ports, named `in1`, `in2`, and `in3`
- 2 output ports, named `out1`, `out2`
- In the `compute` function, always emit the data received on `in1` with the `out1` port
- Emit `in2` with `out2` if the Operator tick (hint: leverage `self.index`) is odd and `in3` with `out2` if the tick is even

In [None]:
class <<<FIX ME>>>(<<<FIX ME>>>):

    def __init__(self, *args, **kwargs):
        self.index = 0
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input(<<<FIX ME>>>)
        spec.input(<<<FIX ME>>>)
        spec.input(<<<FIX ME>>>)
        spec.output(<<<FIX ME>>>)
        spec.output(<<<FIX ME>>>)

    def compute(self, op_input, op_output, context):
        
        # Always emit in1 value in out1 port
        in1_value = op_input.receive(<<<FIX ME>>>)
        op_output.emit(in1_value, <<<FIX ME>>>)

        # each input needs to be received, regardless of utilization
        # Even if in2 and in3 won't be utilized at the same time,
        # they still both need to be received
        in2_value = op_input.receive(<<<FIX ME>>>)
        in3_value = op_input.receive(<<<FIX ME>>>)
        
        # If tick is odd, emit in2; otherwise (even tick), emit in3
        if <<<FIX ME>>>:   
            op_output.emit(in2_value, "out2")
        else:
            op_output.emit(in3_value, "out2")
        
        self.index += 1


Click [here](scripts/answers/ex1.py) to view the solution.

#### Custom Middle Operator

In [None]:
class PingMiddleOp(Operator):
    """Example of an operator modifying data.
    This operator has:
        inputs:  "in1", "in2"
        outputs: "out1", "out2"
    The data from each input is multiplied by a user-defined value.
    In this demo, the `multiplier` parameter value is read from a "ping.yaml"
    configuration file, overriding the default defined in the setup() method
    below.
    """

    def __init__(self, *args, **kwargs):
        # If `self.multiplier` is set here (e.g., `self.multiplier = 4`), then
        # the default value by `param()` in `setup()` will be ignored.
        # (you can just call `spec.param("multiplier")` in `setup()` to use the
        # default value)
        
        # self.multiplier = 4
        self.count = 1

        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input("in1")
        spec.input("in2")
        spec.output("out1")
        spec.output("out2")
        spec.param("multiplier", 2)

    def compute(self, op_input, op_output, context):
        value1 = op_input.receive("in1")
        value2 = op_input.receive("in2")
        print(f"Middle message received (count: {self.count})")
        self.count += 1

        print(f"Middle message value1: {value1.data}")
        print(f"Middle message value2: {value2.data}")

        # Multiply the values by the multiplier parameter
        value1.data *= self.multiplier
        value2.data *= self.multiplier

        op_output.emit(value1, "out1")
        op_output.emit(value2, "out2")

This middle operator receives two channels of input data from `PingTxOp` and multiplies each value by the `multiplier` parameter. The resulting values are then emitted from the middle operator via its two output ports.

While `PingMiddleOp` performs more work than the `PingTxOp` Operator, you can immediately see similarities in both structure and implementation between the two. Providing boilerplate mechanisms to build applications is the soul of Holoscan!

Many Holoscan Operators are accompanied by a YAML configuration file that defines specific parameters to be used in that Operator. We will look at more examples later, but for AI inferencing Operators, these YAML files define things like the path to a trained ONNX model, a TensorRT inference engine, and data paths. By using configuration files, we can simplify the application code and improve portability.

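In this example, the `multiplier` parameter is declared with `spec.param` (with a default value of 2), and its value is read from the corresponding [ping.yaml](scripts/ping/ping.yaml) file through `self.from_config("mx")` when the application creates the operator. It could just as easily be defined within the operator, however.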

#### Custom Sink Operator

In [None]:
class PingRxOp(Operator):
    """Simple receiver operator.
    This operator has:
        input: "receivers"
    This is an example of a native operator that can dynamically have any
    number of inputs connected to its "receivers" port.
    """

    def __init__(self, *args, **kwargs):
        self.count = 1
        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.param("receivers", kind="receivers")

    def compute(self, op_input, op_output, context):
        values = op_input.receive("receivers")
        print(f"Rx message received (count: {self.count}, size: {len(values)})")
        self.count += 1
        print(f"Rx message value1: {values[0].data}")
        print(f"Rx message value2: {values[1].data}")

You should be a pro by now! With this `PingRxOp` sink Operator, we won't have any output messages; this Operator is mainly used to print the message received from an upstream operator. Other sink operator examples include a Visualizer (we'll speak more about HoloViz later) or audio/signal playback tools.

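For this specific Operator, we are showing how to programmatically define input ports with the special `receivers` parameter (note the `kind="receivers"` keyword argument); in this case, any number of inputs can be connected to the Operator dynamically. Calling `op_input.receive("receivers")` then returns a sequence with one message per connected upstream port, which is why `compute` indexes `values[0]` and `values[1]`.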

### Connect Operators to Form an Application

In [None]:
from holoscan.conditions import CountCondition
from holoscan.core import Application

In [None]:
class MyPingApp(Application):
    def compose(self):
        # Configure the operators. Here we use CountCondition to terminate
        # execution after a specific number of messages have been sent.
        tx = PingTxOp(self, CountCondition(self, 10), name="tx")
        mx = PingMiddleOp(self, self.from_config("mx"), name="mx")
        rx = PingRxOp(self, name="rx")

        # Connect the operators into the workflow:  tx -> mx -> rx
        self.add_flow(tx, mx, {("out1", "in1"), ("out2", "in2")})
        self.add_flow(mx, rx, {("out1", "receivers"), ("out2", "receivers")})

Now that we have a series of custom Holoscan Operators, we need to define data movement by connecting them all together in an application.

Similar to Holoscan Operators, a Holoscan Application inherits from a base class, `holoscan.core.Application`. It only needs to define a single method, `compose` (an `__init__` method can also be defined, although the example above doesn't need one). Within `compose`, we instantiate our custom (or core, discussed later) operators, name them, and connect them together with `self.add_flow`.

When connecting operators, `add_flow` takes the following arguments:
- Upstream operator
- Downstream operator
- How ports are connected: a Python set of 2-tuples specifying the port names to connect, e.g. `{(source_port_name, destination_port_name)}`

In the example above, `self.add_flow(tx, mx, {("out1", "in1"), ("out2", "in2")})` connects `PingTxOp` to `PingMiddleOp`, mapping the output port `out1` of `PingTxOp` to the input port `in1` of `PingMiddleOp`, and the output port `out2` of `PingTxOp` to the input port `in2` of `PingMiddleOp`.
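Since the port mapping is an ordinary Python set, it can be inspected like any other set. As a sketch, the hypothetical helper `find_port_map_errors` below (not a Holoscan function) checks each `(source_port_name, destination_port_name)` pair against the port names declared in the operators' `setup` methods. Note that two pairs may share a destination, as with `receivers` in `MyPingApp`.

```python
def find_port_map_errors(upstream_outputs, downstream_inputs, port_map):
    """Hypothetical helper (not part of Holoscan): list every pair in
    `port_map` that names a port the operators do not declare."""
    errors = []
    for src, dst in sorted(port_map):  # sorted only for a stable report
        if src not in upstream_outputs:
            errors.append(f"unknown output port {src!r}")
        if dst not in downstream_inputs:
            errors.append(f"unknown input port {dst!r}")
    return errors


# Port names from PingTxOp.setup(), PingMiddleOp.setup() and PingRxOp.setup()
tx_outputs = {"out1", "out2"}
mx_inputs = {"in1", "in2"}
mx_outputs = {"out1", "out2"}
rx_inputs = {"receivers"}

print(find_port_map_errors(tx_outputs, mx_inputs, {("out1", "in1"), ("out2", "in2")}))  # []
print(find_port_map_errors(mx_outputs, rx_inputs, {("out1", "receivers"), ("out2", "receivers")}))  # []
print(find_port_map_errors(tx_outputs, mx_inputs, {("out1", "in3")}))  # ["unknown input port 'in3'"]
```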

One other parameter you may notice is `CountCondition(self, 10)` in the source Operator. Conditions control when, and how many times, an Operator ticks (executes). There are a few control options:
- `CountCondition` - Tick N times, then stop
- `BooleanCondition` - Tick while a boolean flag is enabled; the flag can be disabled programmatically to stop ticking
- `MessageAvailableCondition` - Tick when a message is available on an input port
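To build intuition for how these conditions interact, here is a toy loop in plain Python. It is a deliberately simplified assumption, not how Holoscan's schedulers are implemented: a source gated by a counter (playing the role of `CountCondition`) feeds a queue, and a sink only ticks when a message is waiting (playing the role of `MessageAvailableCondition`). Once neither can tick, the loop ends, roughly as an application finishes once no operator can execute.

```python
from collections import deque


def run_toy_pipeline(count):
    """Toy stand-in for a scheduler (not Holoscan's): returns how many times
    the source and the sink ticked."""
    queue = deque()       # plays the role of the connection between two ports
    remaining = count     # like CountCondition(self, count) on the source
    source_ticks = sink_ticks = 0
    while True:
        progressed = False
        if remaining > 0:           # source may tick while its count lasts
            queue.append(source_ticks)
            source_ticks += 1
            remaining -= 1
            progressed = True
        if queue:                   # like MessageAvailableCondition on the sink
            queue.popleft()
            sink_ticks += 1
            progressed = True
        if not progressed:          # nothing can tick any more: stop
            break
    return source_ticks, sink_ticks


print(run_toy_pipeline(10))  # (10, 10)
```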

### Run Ping Application

To run `MyPingApp`, run the cell below and pay close attention to the output. Note that this ping application loads a YAML configuration file (`ping.yaml`) specified in the source code when launching the application:

```python
import os

if __name__ == "__main__":
    app = MyPingApp()
    app.config(os.path.join(os.path.dirname(__file__), "ping.yaml"))
    app.run()
```

If you don't have a yaml config file, use `app.config("")`. 

In [None]:
!python scripts/ping/ping.py
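You can predict what `PingRxOp` should print with a plain-Python trace of the same arithmetic. This sketch assumes a multiplier of 2, the default declared in `PingMiddleOp.setup()`; if `ping.yaml` sets a different `multiplier`, pass that value instead. It does not use Holoscan at all.

```python
def simulate_ping(num_ticks=10, multiplier=2):
    """Plain-Python trace of tx -> mx -> rx (no Holoscan involved)."""
    index = 0  # mirrors PingTxOp.index
    received = []
    for _ in range(num_ticks):  # CountCondition(self, 10) -> 10 source ticks
        # PingTxOp: even value on "out1", odd value on "out2"
        value1, value2 = index, index + 1
        index += 2
        # PingMiddleOp: multiply both values by the multiplier
        value1 *= multiplier
        value2 *= multiplier
        # PingRxOp: both values arrive on the "receivers" port
        received.append((value1, value2))
    return received


trace = simulate_ping()
print(trace[:3])  # [(0, 2), (4, 6), (8, 10)]
print(trace[-1])  # (36, 38)
```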

### Exercise 2

Now it's your turn to code! Modify [ping-pow.py](scripts/exercises/ping-pow.py) to create a new operator called `Pow2on2` that squares the incoming integers if the clock tick is even and otherwise passes the data through unchanged. Be sure to replace `PingMiddleOp` with `Pow2on2` in the application and examine the output to ensure correctness.

In [None]:
# TO-DO
# 1) edit ping-pow.py (link above)

# 2) Run application using the following command
!python scripts/exercises/ping-pow.py

Click [here](scripts/answers/ex2.py) to view the solution.

## Building Holoscan Applications with GPU-Accelerated Python Packages

Let's move on to a more interesting example. One of the most exciting features of the Holoscan Python API is that it supports passing arbitrary Python objects between operators. This means that developers can easily leverage existing GPU-accelerated Python libraries in their streaming sensor workflows, such as:
- [CuPy](https://cupy.dev) - A GPU-accelerated version of NumPy for numerical computing
- [RAPIDS](https://rapids.ai) - A collection of GPU-accelerated data science libraries
- [PyTorch](https://pytorch.org) - GPU-accelerated deep learning framework for AI training and inferencing
- [JAX](https://github.com/google/jax) - Differentiable numerical computing library with GPU support
- [Numba](https://numba.pydata.org/) - LLVM-based JIT compiler that lets you write GPU kernels directly in Python

Let's look at an example.

### Fourier Transforms with CuPy and Holoscan

In [None]:
import cupy as cp

In [None]:
class FFTOp(Operator):
    def setup(self, spec: OperatorSpec):
        spec.input("time_in")
        spec.output("frequency_out")

    def compute(self, op_input, op_output, context):
        sig = op_input.receive("time_in")
        # Emit the spectrum on the "frequency_out" port declared in setup()
        op_output.emit(cp.fft.fft(sig), "frequency_out")

In this example, we create a custom `FFTOp` Holoscan Operator that accepts an input data stream, runs an FFT with CuPy on that array, and emits the result out of the operator. Note that we can simply run the `cupy.fft.fft` function directly on the data returned by Holoscan's `op_input.receive` function. There are, however, certain cases where the developer needs to use a "Holoscan Tensor", notably when leveraging Operators that are packaged with the Holoscan SDK. We discuss this in depth below.
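For reference, `cp.fft.fft` computes the discrete Fourier transform X[k] = Σₙ x[n]·exp(-2πi·kn/N), the same convention as `numpy.fft.fft`. The naive O(N²) plain-Python version below is only meant to show what `FFTOp` emits for each input array; it is far too slow for real use and is not how CuPy implements the transform.

```python
import cmath
import math


def dft(x):
    """Naive O(N^2) DFT: X[k] = sum_n x[n] * exp(-2j*pi*k*n/N)."""
    n_samples = len(x)
    return [
        sum(x[n] * cmath.exp(-2j * math.pi * k * n / n_samples) for n in range(n_samples))
        for k in range(n_samples)
    ]


# A cosine that completes exactly 2 cycles over 8 samples
signal = [math.cos(2 * math.pi * 2 * n / 8) for n in range(8)]
magnitudes = [round(abs(c), 6) for c in dft(signal)]
print(magnitudes)  # [0.0, 0.0, 4.0, 0.0, 0.0, 0.0, 4.0, 0.0]
```

The energy lands in bin 2 and its mirror image, bin N - 2 = 6, which is the pattern to look for when printing FFT results of real-valued signals.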

### Exercise 3

Create a Holoscan application that consists of three operators:
1. SourceOp - Emits a random array of 32,768 samples on each of 3 ticks
2. FFTOp - Performs FFT on data streaming from SourceOp
3. SinkOp - Prints results of FFTOp

Click here to open [fftapp.py](scripts/exercises/fftapp.py). Please note, we do not need a config file for this example.

As a hint, one can create a random-valued array in CuPy with `cupy.random.randn(size_of_array)`.

In [None]:
# TO-DO
# 1) edit fftapp.py (link above)

# 2) Run application using the following command
!python scripts/exercises/fftapp.py

Click [here](scripts/answers/ex3.py) to view the solution.

### Decorators: turn any function into an operator
Decorators were introduced in Holoscan SDK (HSDK) 2.3.0.

Decorators come in handy when one wishes to turn a data analysis script into a production-ready Holoscan pipeline. With decorators, one can turn any function into an operator, which greatly reduces the refactoring overhead associated with Holoscan adoption.

Quick note: this is still an experimental feature, and the API is still under development. The HSDK team welcomes feedback from the user community and will be happy to update the API in upcoming releases.
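To make the idea concrete, the sketch below is a toy decorator, `toy_create_op`, written purely for illustration. It is an assumption about the general idea and not how `holoscan.decorator.create_op` is implemented: it builds no Operator and involves no scheduler. It only shows the mapping the decorator examples in this section rely on: named input ports become function arguments, a single output port receives the whole return value, and several output ports expect a dict keyed by port name.

```python
import functools


def toy_create_op(inputs=(), outputs=()):
    """Toy, hypothetical decorator (NOT holoscan.decorator.create_op).

    Maps named input ports to function arguments and the return value to
    named output ports; it builds no Operator and needs no scheduler.
    """
    inputs = (inputs,) if isinstance(inputs, str) else tuple(inputs)
    outputs = (outputs,) if isinstance(outputs, str) else tuple(outputs)

    def decorator(func):
        @functools.wraps(func)
        def tick(received):
            # "receive": pull each declared input port's message by name
            result = func(**{port: received[port] for port in inputs})
            # "emit": no outputs -> nothing; one output -> the whole return
            # value; several outputs -> a dict keyed by output port name
            if not outputs:
                return {}
            if len(outputs) == 1:
                return {outputs[0]: result}
            return {port: result[port] for port in outputs}

        tick.inputs, tick.outputs = inputs, outputs
        return tick

    return decorator


@toy_create_op(inputs="signal", outputs="signal_double")
def toy_double(signal):
    return [2 * s for s in signal]


@toy_create_op(outputs=("signal1", "signal2"))
def toy_src():
    return {"signal1": [0.5], "signal2": [1.5]}


print(toy_double({"signal": [1, 2, 3]}))  # {'signal_double': [2, 4, 6]}
print(toy_src({}))                        # {'signal1': [0.5], 'signal2': [1.5]}
```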

Here is an example of the fft app, but written with decorators.

In [None]:
from holoscan.conditions import CountCondition
from holoscan.core import Application
from holoscan.decorator import create_op
import cupy as cp

@create_op(outputs="signal")
def src_func():
    signal = cp.random.randn(32768)
    return signal

@create_op(inputs="signal", outputs="signal_fft")
def fft_func(signal):
    return cp.fft.fft(signal)

@create_op(inputs="signal_fft")
def sink_func(signal_fft):
    print(signal_fft)


class FFTApp_with_decorators(Application):
    def compose(self):
        src = src_func(self, CountCondition(self, 3), name='src_op')
        fft = fft_func(self, name='fft_op')
        sink = sink_func(self, name='sink_op')

        # Connect the operators into the workflow:  src -> fft -> sink
        self.add_flow(src, fft)
        self.add_flow(fft, sink)

if __name__ == "__main__":
    app = FFTApp_with_decorators()
    app.run()

If the pipeline is non-linear, i.e. different data streams move across different operators, decorators can be configured to receive/transmit only relevant variables using helper classes `Input` and `Output`. Here is an example of a pipeline where `signal1` goes through an FFT operator and `signal2` is doubled in amplitude.

In [None]:
from holoscan.decorator import Input, Output
@create_op(outputs=(Output("signal1", tensor_names=("signal1",)),
                    Output("signal2", tensor_names=("signal2",))))
def src_func():
    signal1 = cp.random.randn(32768)
    signal2 = cp.random.randn(32768)
    return {"signal1": signal1,
            "signal2": signal2} # note that this is now a dict

# This one is rewritten to demonstrate handling of input/output variables in the most general case
@create_op(inputs=(Input("signal1", arg_map="signal1"),),
           outputs=(Output("signal1_fft", tensor_names=("signal1_fft",)),))
def fft_func(signal1):
    return {"signal1_fft": cp.fft.fft(signal1)}

# In linear sections of a pipeline, details in the decorator configuration can be omitted
@create_op(inputs="signal2", outputs="signal2_double")
def double_func(signal2):
    return signal2 * 2

@create_op(inputs=(Input("signal1_fft", arg_map="signal1_fft"),
                   Input("signal2_double", arg_map="signal2_double")))
def sink_func(signal1_fft, signal2_double):
    print(f"{signal1_fft=}")
    print(f"{signal2_double=}")


class FFTApp_with_decorators(Application):
    def compose(self):
        src = src_func(self, CountCondition(self, 3), name='src_op')
        fft = fft_func(self, name='proc_op')
        double = double_func(self, name='double_func')
        sink = sink_func(self, name='sink_op')

        # Connect the operators into the workflow: src -> fft -> sink and src -> double -> sink
        self.add_flow(src, fft, {("signal1", "signal1")})
        self.add_flow(src, double, {("signal2", "signal2")})
        self.add_flow(fft, sink, {("signal1_fft", "signal1_fft")})
        self.add_flow(double, sink, {("signal2_double", "signal2_double")})

if __name__ == "__main__":
    app = FFTApp_with_decorators()
    app.run()

### Challenge Exercise

Create a Holoscan application that performs a matrix multiplication between a constant matrix and an updating one. This application should:
1. Include a SourceOp that, for 5 ticks, emits a 1000x10 random matrix that changes with each tick, along with a 1000x1000 random matrix that stays constant.
2. Include a MatMulOp that performs the matrix multiplication
3. Include a SinkOp that prints the last row in the resulting matrix

Click here to open [challenge-exercise.py](scripts/exercises/challenge-exercise.py).

As a hint, one can create a random generator via `rng = cp.random.default_rng()` and then create a single precision random valued matrix with `rng.standard_normal((num_rows, num_cols), dtype=cp.float32)`.

In [None]:
# TO-DO

# 1) Update challenge-exercise.py (link above)

# 2) Run application using the following command
!python scripts/exercises/challenge-exercise.py

Click [here](scripts/answers/ex3-challenge.py) to view the solution.

## Using Pre-Built Holoscan Operators

So far we have only discussed creating custom Operators, but a selection of optimized Operators comes packaged with the Holoscan SDK. These are available in the `holoscan.operators` module and from HoloHub, and include Operators that implement commonly used functions when building real-time streaming AI pipelines.

A few examples:
- **Video Stream Replayer** - Outputs video frames as Holoscan Tensor objects
- **Holoviz** - High-speed viewer that handles composing, blending, and visualization of RGB and RGBA images, masks, geometric primitives, and text
- **AJA Source** - Enables GPUDirect RDMA on Quadro/RTX GPUs with an AJA capture card
- **Inference Module** - Provides capabilities to execute AI inference on one or multiple models

These core operators can be used just like the custom ones above and are typically configured via an application YAML file that specifies items like image size, model location, TensorRT engine location, and other important parameters.

You can find out more about the Core Holoscan Operators by reading the [User Guide](https://docs.nvidia.com/holoscan/sdk-user-guide/holoscan_operators_extensions.html).

Let's see how some of these pre-built Operators can be leveraged in an AI-enabled sensor pipeline.

### TAO PeopleNet Detection Model on V4L2 Video Stream - A Basic Workflow

In this example, we use the TAO PeopleNet model available on NGC to detect faces and people in a video stream; here, the frames come from a recorded video played back by the Video Stream Replayer. This pipeline makes use of 4 core Holoscan Operators, plus the custom `PreprocessorOp` and `PostprocessorOp` defined below:
1. Video Stream Replayer
2. Format Converter - Converts data type of the image from `uint8` to `float32` and resizes the image
3. Inference Module - Runs the PeopleNet detection model with TensorRT
4. Holoviz - Displays the video with the detected face and person bounding boxes overlaid

<img src="images/face_and_people_detection_app.png" width=1000/>

In [None]:
import os
from argparse import ArgumentParser

import cupy as cp
import holoscan as hs
import numpy as np
from holoscan.core import Application, Operator, OperatorSpec
from holoscan.gxf import Entity
from holoscan.operators import (
    VideoStreamReplayerOp,
    FormatConverterOp,
    InferenceOp,
    HolovizOp,
)
from holoscan.resources import UnboundedAllocator
from holoscan.schedulers import GreedyScheduler


In [None]:
class PreprocessorOp(Operator):
    """Operator to format input image for inference"""
    def setup(self, spec: OperatorSpec):
        spec.input("in")
        spec.output("out")

    def compute(self, op_input, op_output, context):
        # Get input message
        in_message = op_input.receive("in")

        # Transpose
        tensor = cp.asarray(in_message.get("preprocessed")).get()
        # OBS: Numpy conversion and moveaxis is needed to avoid strange
        # strides issue when doing inference
        tensor = np.moveaxis(tensor, 2, 0)[None]
        tensor = cp.asarray(tensor)

        # Create output message
        out_message = {"preprocessed": tensor}
        op_output.emit(out_message, "out")

In [None]:
class PostprocessorOp(Operator):
    """Operator to post-process inference output:
    * Reparameterize bounding boxes
    * Non-max suppression
    * Make boxes compatible with Holoviz

    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input("in")
        spec.output("out")
        spec.param("iou_threshold", 0.15)
        spec.param("score_threshold", 0.5)
        spec.param("image_width", None)
        spec.param("image_height", None)
        spec.param("box_scale", None)
        spec.param("box_offset", None)
        spec.param("grid_height", None)
        spec.param("grid_width", None)

    def compute(self, op_input, op_output, context):
        # Get input message
        in_message = op_input.receive("in")

        # Convert input to cupy array
        boxes = cp.asarray(in_message.get("boxes"))[0, ...]
        scores = cp.asarray(in_message.get("scores"))[0, ...]

        # PeopleNet has three classes:
        # 0. Person
        # 1. Bag
        # 2. Face
        # Here we only keep the Person and Face classes
        boxes = boxes[[0, 1, 2, 3, 8, 9, 10, 11], ...][None]
        scores = scores[[0, 2], ...][None]

        # Loop over label classes
        out = {"person": None, "faces": None}
        for i, label in enumerate(out):
            # Reparameterize boxes
            out[label], scores_nms = self.reparameterize_boxes(
                boxes[:, 0 + i * 4 : 4 + i * 4, ...],
                scores[:, i, ...][None],
            )

            # Non-max suppression
            out[label], _ = self.nms(out[label], scores_nms)

            # Reshape for HoloViz
            if len(out[label]) == 0:
                out[label] = np.zeros([1, 2, 2]).astype(np.float32)
            else:
                out[label][:, [0, 2]] /= self.image_width
                out[label][:, [1, 3]] /= self.image_height
                out[label] = cp.reshape(out[label][None], (1, -1, 2))

        # Create output message
        op_output.emit(out, "out")

    def nms(self, boxes, scores):
        """Non-max suppression (NMS)

        Parameters
        ----------
        boxes : array (4, n)
        scores : array (n,)

        Returns
        ----------
        boxes : array (m, 4)
        scores : array (m,)

        """
        # boxes has shape (4, n), so check for n == 0 rather than len(boxes)
        if boxes.size == 0:
            return cp.asarray([]), cp.asarray([])

        # Get coordinates
        x0, y0, x1, y1 = boxes[0, :], boxes[1, :], boxes[2, :], boxes[3, :]

        # Area of bounding boxes
        area = (x1 - x0 + 1) * (y1 - y0 + 1)

        # Get indices of sorted scores
        indices = cp.argsort(scores)

        # Output boxes and scores
        boxes_out, scores_out = [], []

        # Iterate over bounding boxes
        while len(indices) > 0:
            # Get index with highest score from remaining indices
            index = indices[-1]

            # Pick bounding box with highest score
            boxes_out.append(boxes[:, index])
            scores_out.append(scores[index])

            # Get coordinates
            x00 = cp.maximum(x0[index], x0[indices[:-1]])
            x11 = cp.minimum(x1[index], x1[indices[:-1]])
            y00 = cp.maximum(y0[index], y0[indices[:-1]])
            y11 = cp.minimum(y1[index], y1[indices[:-1]])

            # Compute IOU
            width = cp.maximum(0, x11 - x00 + 1)
            height = cp.maximum(0, y11 - y00 + 1)
            overlap = width * height
            union = area[index] + area[indices[:-1]] - overlap
            iou = overlap / union

            # Threshold and prune
            left = cp.where(iou < self.iou_threshold)
            indices = indices[left]

        # To array
        boxes = cp.asarray(boxes_out)
        scores = cp.asarray(scores_out)

        return boxes, scores

    def reparameterize_boxes(self, boxes, scores):
        """Reparameterize boxes from corner+width+height to corner+corner.

        Parameters
        ----------
        boxes : array (1, 4, grid_height, grid_width)
        scores : array (1, 1, grid_height, grid_width)

        Returns
        ----------
        boxes : array (4, n)
        scores : array (n,)

        """
        cell_height = self.image_height / self.grid_height
        cell_width = self.image_width / self.grid_width

        # Generate the grid coordinates
        mx, my = cp.meshgrid(cp.arange(self.grid_width), cp.arange(self.grid_height))
        mx = mx.astype(np.float32).reshape((1, 1, self.grid_height, self.grid_width))
        my = my.astype(np.float32).reshape((1, 1, self.grid_height, self.grid_width))

        # Compute the box corners
        xmin = -(boxes[0, 0, ...] + self.box_offset) * self.box_scale + mx * cell_width
        ymin = -(boxes[0, 1, ...] + self.box_offset) * self.box_scale + my * cell_height
        xmax = (boxes[0, 2, ...] + self.box_offset) * self.box_scale + mx * cell_width
        ymax = (boxes[0, 3, ...] + self.box_offset) * self.box_scale + my * cell_height
        boxes = cp.concatenate([xmin, ymin, xmax, ymax], axis=1)

        # Select the scores that are above the threshold
        scores_mask = scores > self.score_threshold
        scores = scores[scores_mask]
        scores_mask = cp.repeat(scores_mask, 4, axis=1)
        boxes = boxes[scores_mask]

        # Reshape after masking
        n = int(boxes.size / 4)
        boxes = boxes.reshape(4, n)

        return boxes, scores
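The greedy non-maximum suppression in `PostprocessorOp.nms` can be checked by hand on a few boxes. The plain-Python sketch below mirrors its logic (the same inclusive-pixel `+ 1` area convention and the same `iou < iou_threshold` pruning rule) on lists instead of CuPy arrays; the box coordinates and scores are made up for illustration.

```python
def iou(a, b):
    """IoU of two (x0, y0, x1, y1) boxes, using the same inclusive-pixel
    (+1) convention as PostprocessorOp.nms."""
    width = max(0, min(a[2], b[2]) - max(a[0], b[0]) + 1)
    height = max(0, min(a[3], b[3]) - max(a[1], b[1]) + 1)
    overlap = width * height
    area_a = (a[2] - a[0] + 1) * (a[3] - a[1] + 1)
    area_b = (b[2] - b[0] + 1) * (b[3] - b[1] + 1)
    return overlap / (area_a + area_b - overlap)


def greedy_nms(boxes, scores, iou_threshold):
    """Keep the highest-scoring box, drop boxes whose IoU with it is not
    below iou_threshold, and repeat on what remains."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i])  # ascending
    keep = []
    while order:
        best = order.pop()  # highest remaining score
        keep.append(best)
        order = [i for i in order if iou(boxes[best], boxes[i]) < iou_threshold]
    return keep


boxes = [(0, 0, 9, 9), (1, 1, 10, 10), (5, 5, 14, 14)]
scores = [0.9, 0.8, 0.7]
print(round(iou(boxes[0], boxes[2]), 4))              # 0.1429
print(greedy_nms(boxes, scores, iou_threshold=0.15))  # [0, 2]
```

With the `iou_threshold` of 0.15 used in this app, box 1 (IoU of about 0.68 with box 0) is suppressed, while box 2 (IoU 25/175 of about 0.143) survives.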

In [None]:
class PeopleAndFaceDetectApp(Application):
    def __init__(self, data_path, model_path, *args, **kwargs):
        """Initialize the face and people detection application"""
        super().__init__(*args, **kwargs)
        self.name = "People and Face Detection App"
        self.sample_data_path = data_path
        self.model_path = model_path

    def compose(self):
        pool = UnboundedAllocator(self, name="pool")

        # Video source operator
        source = VideoStreamReplayerOp(
            self,
            name="replayer_source",
            directory=self.sample_data_path,
            **self.kwargs("replayer_source"),
        )

        # Format converter operator
        preprocessor_args = self.kwargs("preprocessor")
        format_converter = FormatConverterOp(
            self,
            name="preprocessor",
            pool=pool,
            **preprocessor_args,
        )

        # Preprocessor operator
        preprocessor = PreprocessorOp(
            self,
            name="transpose",
            pool=pool,
        )

        # Inference operator
        inference_args = self.kwargs("inference")
        # Map the model name used in the YAML config to the ONNX model file
        inference_args["model_path_map"] = {"face_detect": self.model_path}

        inference = InferenceOp(
            self,
            name="inference",
            allocator=pool,
            **inference_args,
        )

        # Postprocessor operator
        postprocessor_args = self.kwargs("postprocessor")
        postprocessor_args["image_width"] = preprocessor_args["resize_width"]
        postprocessor_args["image_height"] = preprocessor_args["resize_height"]
        postprocessor = PostprocessorOp(
            self,
            name="postprocessor",
            allocator=pool,
            **postprocessor_args,
        )

        # Visualization operator
        holoviz = HolovizOp(
            self,
            allocator=pool,
            name="holoviz",
            headless=True,  # render without opening a window, e.g. when running on a remote cluster
            **self.kwargs("holoviz"),
        )


        self.add_flow(source, holoviz, {("output", "receivers")})
        self.add_flow(source, format_converter)
        self.add_flow(format_converter, preprocessor)
        self.add_flow(preprocessor, inference, {("", "receivers")})
        self.add_flow(inference, postprocessor, {("transmitter", "in")})
        self.add_flow(postprocessor, holoviz, {("out", "receivers")})


if __name__ == "__main__":
    config_file = os.path.join(os.path.dirname(__file__), "tao_peoplenet.yaml")
    data_path = os.path.join(os.path.dirname(__file__), "data/")
    model_path = os.path.join(os.path.dirname(__file__), "data/resnet34_peoplenet_int8.onnx")

    app = PeopleAndFaceDetectApp(data_path, model_path)
    app.config(config_file)
    scheduler = GreedyScheduler(app, name="greedy_scheduler", max_duration_ms=5000)
    app.scheduler(scheduler)

    app.run()

Let's take a look at the contents of the [config file](scripts/tao_peoplenet/tao_peoplenet.yaml) as well:
```yaml
replayer_source:
  basename: "people"
  frame_rate: 0   # as specified in timestamps
  repeat: true    # default: false
  count: 0        # default: 0 (no frame count restriction)

preprocessor:
  in_dtype: "rgb888" # input data type for format converter
  out_tensor_name: preprocessed
  out_dtype: "float32"
  resize_width: 960
  resize_height: 544
  scale_min: 0.0
  scale_max: 1.0

inference:
  backend: "trt"
  pre_processor_map:
    "face_detect": ["preprocessed"]
  inference_map:
    "face_detect": ["scores", "boxes"]
  device_map:
    "face_detect": "0"
  input_on_cuda: true
  is_engine_path: false

postprocessor:
  iou_threshold: 0.15
  score_threshold: 0.5
  box_scale: 35.0
  box_offset: 0.5
  grid_height: 34
  grid_width: 60

holoviz:
  tensors:
    - name: ""
      type: color
    - name: faces
      type: rectangles
      opacity: 0.5
      line_width: 4
      color: [1.0, 0.0, 0.0, 1.0]
    - name: person
      type: rectangles
      opacity: 0.5
      line_width: 4
      color: [0.0, 1.0, 0.0, 1.0]
```
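The `postprocessor` grid values are not arbitrary. They appear to follow from `resize_width`/`resize_height` in `preprocessor`, assuming PeopleNet's DetectNet_v2 detection head uses an output stride of 16 pixels per grid cell (960 / 16 = 60, 544 / 16 = 34). The sketch below checks that relationship and shows how `box_scale` and `box_offset` are typically used to decode one grid cell's raw box outputs into pixel coordinates. The stride and the decoding formula follow the DetectNet_v2 convention used by TAO/DeepStream parsers; both are assumptions, and the tutorial's `PostprocessorOp` may implement decoding differently.

```python
import math

# Values copied from tao_peoplenet.yaml
RESIZE_WIDTH, RESIZE_HEIGHT = 960, 544
GRID_WIDTH, GRID_HEIGHT = 60, 34
BOX_SCALE, BOX_OFFSET = 35.0, 0.5

# Assumption: DetectNet_v2 output stride of 16 pixels per grid cell
STRIDE = 16


def grid_dims(width, height, stride=STRIDE):
    """Return (grid_width, grid_height) for a network input of the given size."""
    return math.ceil(width / stride), math.ceil(height / stride)


def decode_cell(col, row, raw, stride=STRIDE, scale=BOX_SCALE, offset=BOX_OFFSET):
    """Decode one cell's raw (o1, o2, o3, o4) outputs into (x1, y1, x2, y2) pixels.

    Assumed DetectNet_v2 convention: the cell center is
    (col * stride + offset, row * stride + offset), and each raw value is a
    distance from that center, normalized by `scale`.
    """
    cx = col * stride + offset
    cy = row * stride + offset
    o1, o2, o3, o4 = raw
    return (cx - o1 * scale, cy - o2 * scale, cx + o3 * scale, cy + o4 * scale)


print(grid_dims(RESIZE_WIDTH, RESIZE_HEIGHT))  # (60, 34), matching grid_width/grid_height
print(decode_cell(1, 2, (0.25, 0.25, 0.25, 0.25)))  # (7.75, 23.75, 25.25, 41.25)
```

If you change `resize_width` or `resize_height`, the grid dimensions would need to change with them under this assumption.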

### Results

Now the app can be run! Note that on the first launch, the app builds a TensorRT engine for the specific GPU available on the machine, which can take some time.

HoloViz normally opens an X11 display window, which isn't well suited to a Jupyter Notebook or cluster environment, so the app above runs HoloViz in headless mode (`headless=True`). Since the output cannot be seen, the `GreedyScheduler` stops the app after 5 seconds (`max_duration_ms=5000`). Because building the engine can take longer than this timeout, you may need to run the following cell twice.

In [None]:
!python ./scripts/tao_peoplenet/tao_peoplenet.py

If you ran this code on your own machine with a display (and `headless=False`), you'd see the following results.

In [2]:
from IPython.display import Video
Video("./scripts/tao_peoplenet/data/people_processed.webm")

## Tensor Interoperability and Combining Custom Operators with Core Operators

An operator is the most basic unit of work in this framework. An operator receives streaming data at an input port, processes it, and publishes it to one of its output ports. The operators included in the SDK provide domain-agnostic functionality such as IO, machine learning inference, processing, and visualization, and are optimized for AI streaming pipelines.

When assembling a C++ application, two types of [operators](https://docs.nvidia.com/holoscan/sdk-user-guide/holoscan_create_operator.html#c-operators) can be used: native C++ operators and GXF Operators.
When assembling a Python application, two types of [operators](https://docs.nvidia.com/holoscan/sdk-user-guide/holoscan_create_operator.html#python-operators) can be used: native Python operators and Python wrappings of C++ Operators.

One of Holoscan's core features is interoperability between wrapped C++ operators and native Python operators.

For you, the developer, this means:
- There is zero-copy data movement as one transitions from core Holoscan operators to custom ones.
- Holoscan supports the [`__cuda_array_interface__`](https://numba.readthedocs.io/en/stable/cuda/cuda_array_interface.html), meaning there is zero-copy data movement between C++ and Python operators, as well as between Python libraries that support the interface. With this support, we can use GPU-accelerated Python libraries such as CuPy, PyTorch, and RAPIDS directly in native Holoscan Python operators.
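To make the `__cuda_array_interface__` protocol concrete: a producer (such as a Holoscan tensor) exposes a dictionary describing its device buffer, and a consumer (such as `cp.asarray`) reads that description instead of copying the data. The sketch below is a CPU-only illustration, not Holoscan or CuPy code: `FakeDeviceTensor` is a hypothetical stand-in with a made-up pointer value, and `describe_cai` only reads the metadata a real consumer would use to wrap the same memory. The required keys (`shape`, `typestr`, `data`, `version`) follow the Numba CUDA Array Interface specification (version 3).

```python
class FakeDeviceTensor:
    """Hypothetical producer exposing the CUDA Array Interface (no real GPU memory)."""

    def __init__(self, shape, typestr, ptr):
        self._shape = shape
        self._typestr = typestr
        self._ptr = ptr

    @property
    def __cuda_array_interface__(self):
        return {
            "shape": self._shape,        # tuple of ints
            "typestr": self._typestr,    # e.g. "<f4" = little-endian float32
            "data": (self._ptr, False),  # (device pointer, read-only flag)
            "version": 3,
            "strides": None,             # None means C-contiguous
        }


REQUIRED_KEYS = {"shape", "typestr", "data", "version"}


def describe_cai(obj):
    """Read what a zero-copy consumer needs: pointer, shape, and element type."""
    cai = obj.__cuda_array_interface__
    missing = REQUIRED_KEYS - set(cai)
    if missing:
        raise ValueError(f"not a valid CUDA array interface, missing {sorted(missing)}")
    ptr, read_only = cai["data"]
    return {"ptr": ptr, "read_only": read_only, "shape": cai["shape"], "typestr": cai["typestr"]}


tensor = FakeDeviceTensor(shape=(544, 960, 3), typestr="<f4", ptr=0x7F0000000000)
print(describe_cai(tensor))
```

Because only a pointer and metadata are exchanged, producer and consumer end up viewing the same device buffer, which is what makes the hand-off zero-copy.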

To read a Holoscan/GXF tensor object into an operator and convert it into a CuPy array:

```python
import cupy as cp

def compute(self, op_input, op_output, context):
    # The op_input.receive() method call returns a dict object: in_message
    in_message = op_input.receive("input_tensor")

    # iterate over the dict, where each key is a tensor name and each value is a tensor
    for key, value in in_message.items():
        # each tensor in in_message is turned into a CuPy array (zero-copy)
        cp_array = cp.asarray(value)
```

If we already know the name of the tensor we want from the input message, for example `"name1"`, we can access it directly:

```python
import cupy as cp

def compute(self, op_input, op_output, context):
    in_message = op_input.receive("input_tensor")
    # index the tensor by name, then convert it to a CuPy array
    specific_tensor = in_message["name1"]
    cp_array = cp.asarray(specific_tensor)
```


To convert a CuPy array to a Holoscan/GXF tensor object and emit it from an operator:
```python
import cupy as cp

def compute(self, op_input, op_output, context):
    in_message = op_input.receive("input_tensor")
    # out_message is a dict
    out_message = dict()
    # iterate through in_message, converting and processing each tensor
    for key, value in in_message.items():
        # each tensor in in_message is turned into a CuPy array (zero-copy)
        cp_array = cp.asarray(value)
        # modify each CuPy array with your processing logic here
        # ...
        out_message[key] = cp_array
    # op_output emits a dict object with the same format:
    # the key is the tensor name and the value is the tensor
    op_output.emit(out_message, "output_tensor")
```
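Because `compute()` only touches `op_input.receive()` and `op_output.emit()`, its dict-in/dict-out logic can be exercised on a machine without Holoscan or a GPU. The sketch below uses hypothetical stand-ins (`FakeInput`, `FakeOutput`) that mimic just those two calls, and plain Python lists in place of CuPy arrays; the doubling step is placeholder processing. It is a testing aid under those assumptions, not part of the Holoscan API.

```python
class FakeInput:
    """Hypothetical stand-in for op_input: returns preloaded messages by port name."""

    def __init__(self, messages):
        self._messages = messages

    def receive(self, port):
        return self._messages[port]


class FakeOutput:
    """Hypothetical stand-in for op_output: records what was emitted on each port."""

    def __init__(self):
        self.emitted = {}

    def emit(self, value, port):
        self.emitted[port] = value


def compute(op_input, op_output, context=None):
    """Same structure as the operator's compute(), minus `self` and CuPy."""
    in_message = op_input.receive("input_tensor")
    out_message = {}
    for key, value in in_message.items():
        # In a real operator this would be cp.asarray(value) followed by GPU math
        out_message[key] = [2 * x for x in value]
    op_output.emit(out_message, "output_tensor")


fake_in = FakeInput({"input_tensor": {"a": [1, 2, 3], "b": [10]}})
fake_out = FakeOutput()
compute(fake_in, fake_out)
print(fake_out.emitted)  # {'output_tensor': {'a': [2, 4, 6], 'b': [20]}}
```

Keeping the per-tensor logic free of Holoscan-specific calls like this makes it easy to unit test before wiring it into an application.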

### Exercise 4

Build a custom operator that receives two tensor input streams from a core Holoscan operator at its ports `input_1` and `input_2`. Convert only the tensor named `"original_value"` from `input_1` and the tensor named `"current_value"` from `input_2` to CuPy arrays, regardless of which other tensors may be present on those ports, and take the difference of these two tensors. Emit the result through an output port `output` with the tensor name `"diff"`, keeping in mind that the next operator in the application pipeline is also a core Holoscan operator.

In [None]:
#TO-DO


Click [here](scripts/answers/ex4.py) to view the solution.

## Optimizing Your Application
### Performance Tool - Data Flow Tracking

The Holoscan SDK provides the [Data Flow Tracking](https://docs.nvidia.com/holoscan/sdk-user-guide/flow_tracking.html) APIs as a mechanism to profile your application and analyze the data flow between operators in the graph of a fragment.

Enabling the flow tracker is easy. We modify an existing app definition and run command:
```python
app = MyApp()
app.run()
```
to use data flow tracking:
```python
from holoscan.core import Tracker

app = MyApp()
# set the tracker params
with Tracker(app, filename="logger.log", num_start_messages_to_skip=2, num_last_messages_to_discard=3) as tracker:
    app.run()
    tracker.print() # this is optional
```
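Let's take a closer look at the arguments and calls used above:

`filename="logger.log"`: When logging is enabled by specifying `filename`, every message’s received and sent timestamps at every operator between the root and the leaf operators are logged after a message has been processed at the leaf operator.

`num_start_messages_to_skip=2` and `num_last_messages_to_discard=3`: The number of messages at the beginning and at the end of the run, respectively, that are excluded from the latency statistics, for example to ignore warm-up effects.

`tracker.print()`: Calling `print()` prints all data flow tracking results, including end-to-end latencies and the number of source messages, to the standard output.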
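The effect of `num_start_messages_to_skip` and `num_last_messages_to_discard` is easiest to see on numbers. The toy function below applies the same idea (drop the first N and last M messages, then summarize) to a list of made-up per-message latencies. It is a sketch of the concept, not the Tracker's implementation, and the latency values are invented for illustration.

```python
from statistics import mean


def summarize(latencies_ms, num_start_messages_to_skip=2, num_last_messages_to_discard=3):
    """Drop warm-up and trailing messages, then report simple latency statistics."""
    end = max(0, len(latencies_ms) - num_last_messages_to_discard)
    kept = latencies_ms[num_start_messages_to_skip:end]
    if not kept:
        raise ValueError("no messages left after skipping/discarding")
    return {"count": len(kept), "avg": mean(kept), "min": min(kept), "max": max(kept)}


# Illustrative latencies: slow first messages (warm-up), noisy last ones (shutdown)
latencies = [120.0, 60.0, 12.0, 10.0, 11.0, 13.0, 10.0, 40.0, 55.0, 70.0]
print(summarize(latencies))
print(summarize(latencies, 0, 0))  # without skipping, warm-up and shutdown inflate the average
```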

### Scheduler Selection
It is essential to select the right scheduler for the use case at hand to ensure optimal performance and efficient resource utilization.

The Holoscan SDK offers the following schedulers:
* Greedy Scheduler: This basic single-threaded scheduler tests conditions in a greedy manner. It is suitable for simple use cases and provides predictable execution. However, it may not be ideal for large-scale applications, as it may incur significant overhead in condition execution.

* MultiThread Scheduler: It is designed to handle complex execution patterns in large-scale applications. This scheduler consists of a dispatcher thread that periodically polls the status of each operator and dispatches it to a thread pool of worker threads responsible for executing them. Once execution is complete, worker threads enqueue the operator back on the dispatch queue. For large applications, the MultiThread Scheduler generally offers better performance and scalability than the Greedy Scheduler.

* Event-Based Scheduler: The event-based scheduler is also multi-threaded, but as the name indicates, it is event-based rather than polling-based. Instead of having a thread that constantly polls each operator for execution readiness, it waits for an event indicating that an operator is ready to execute. As a result, it has lower latency than the multi-thread scheduler with a long polling interval (`check_recession_period_ms`), without the high CPU usage of the multi-thread scheduler with a very short polling interval.
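A toy model helps explain this trade-off. With polling, an operator that becomes ready between two checks waits until the next check, so the added dispatch delay can approach the polling period; shrinking the period cuts that delay but multiplies the number of checks (CPU work). With event-based dispatch, the operator is picked up when the readiness event arrives. The sketch below uses integer milliseconds and ignores thread wake-up and execution costs; it is a simplification, not how the Holoscan schedulers are implemented.

```python
def polling_dispatch_ms(ready_ms, period_ms):
    """First polling check at or after ready_ms, with checks at 0, period, 2*period, ..."""
    return -(-ready_ms // period_ms) * period_ms  # integer ceiling division


def event_dispatch_ms(ready_ms):
    """Event-based: dispatch as soon as the readiness event is signaled (idealized)."""
    return ready_ms


for period in (1, 5, 50):
    # Operators becoming ready at every millisecond from 1 to 100
    extra = [polling_dispatch_ms(t, period) - event_dispatch_ms(t) for t in range(1, 101)]
    print(
        f"period={period:>2} ms  polls/s={1000 // period:>4}  "
        f"worst extra delay={max(extra)} ms  mean={sum(extra) / len(extra):.2f} ms"
    )
```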

To use the MultiThread or Event-Based scheduler in your app, create it and pass it to `app.scheduler()`.

Note: Explicitly setting GreedyScheduler is not strictly required as it is the default.

```python
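from holoscan.schedulers import EventBasedScheduler, GreedyScheduler, MultiThreadScheduler

app = MyApp()  # MyApp is your Application subclass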
app.config("config-file-name.yaml")

greedy_scheduler = GreedyScheduler(
    app,
    max_duration_ms=-1, # setting this to -1 will make the app run until all work is done; if positive number is given, the app will run for this amount of time (in ms)
    stop_on_deadlock=True,
    stop_on_deadlock_timeout=500, # in ms
    name="greedy_scheduler",
)

multithread_scheduler = MultiThreadScheduler(
    app,
    worker_thread_number=8,
    check_recession_period_ms=5, # time periods (in ms) between polling operators
    max_duration_ms=-1,
    stop_on_deadlock=True,
    stop_on_deadlock_timeout=500, # in ms
    name="multithread_scheduler",
)

event_based_scheduler = EventBasedScheduler(
    app,
    worker_thread_number=8,
    max_duration_ms=-1,
    stop_on_deadlock=True,
    stop_on_deadlock_timeout=500, # in ms
    name="event_based_scheduler",
)


app.scheduler(multithread_scheduler)
app.run()
```

### Seeing the Impact of Scheduler Selection with Flow Tracker
In this example, we will see the impact of scheduler selection on application latency, as measured by the Flow Tracker. This simple app has one transmitter, a number of parallel delay operators set by `num_delay_ops`, and one receiver. The transmitter emits a single message, after which the app terminates.

<img src="images/multithread.png" width=600/>

The latencies below will differ on each instance depending on the hardware; they are provided for reference only.

With the Greedy Scheduler and 32 delay ops, the average latency is 3220 ms.

With the MultiThread Scheduler, 32 delay ops, 8 threads, and a 5 ms recession period, the average latency is 415 ms.

With the Event-Based Scheduler, 32 delay ops, and 8 threads, the average latency is 410 ms.
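These numbers line up with a back-of-the-envelope estimate. Assuming each of the `num_delay_ops` parallel delay operators sleeps for `delay` seconds and at most `threads` of them can run at once, the critical path is roughly ceil(num_delay_ops / threads) × delay: 32 × 0.1 s = 3.2 s with one thread, and ceil(32 / 8) × 0.1 s = 0.4 s with eight. The sketch below computes that estimate and, as a stand-in for a scheduler's worker threads (not Holoscan itself), times the same pattern with `concurrent.futures.ThreadPoolExecutor` using shorter sleeps. It ignores scheduler overhead, which likely accounts for the extra milliseconds in the measured values above.

```python
import math
import time
from concurrent.futures import ThreadPoolExecutor


def estimated_latency_s(num_delay_ops, delay_s, threads):
    """Lower bound on end-to-end latency when delay ops run in parallel branches."""
    return math.ceil(num_delay_ops / threads) * delay_s


def measured_latency_s(num_delay_ops, delay_s, threads):
    """Run num_delay_ops sleeps on a pool of `threads` workers and time the whole batch."""
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=threads) as pool:
        list(pool.map(lambda _: time.sleep(delay_s), range(num_delay_ops)))
    return time.perf_counter() - start


print(f"greedy (1 thread):   ~{estimated_latency_s(32, 0.1, 1) * 1000:.0f} ms")
print(f"multithread (8):     ~{estimated_latency_s(32, 0.1, 8) * 1000:.0f} ms")

# Quick empirical check with 10 ms sleeps so it finishes in well under a second
print(f"measured, 1 worker:  {measured_latency_s(32, 0.01, 1) * 1000:.0f} ms")
print(f"measured, 8 workers: {measured_latency_s(32, 0.01, 8) * 1000:.0f} ms")
```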

### Exercise 5
Run the application defined in [tracker_and_schedulers.py](./scripts/flow_tracker/tracker_and_schedulers.py) to see the latencies with different configurations on your instance.

In [None]:
# The commands below use 32 parallel delay ops, each delaying 0.1 s; the MultiThread and Event-based runs use 8 threads.
# Uncomment the appropriate line to run the app with a selected scheduler
# Modify the --threads parameter to set the number of threads for the MultiThread or Event-based schedulers.

! python ./scripts/flow_tracker/tracker_and_schedulers.py --scheduler "greedy" --threads 1 --num_delay_ops 32 --delay 0.1 --delay_step 0.00
# ! python ./scripts/flow_tracker/tracker_and_schedulers.py --scheduler "multithread" --threads 8 --recession 5 --num_delay_ops 32 --delay 0.1 --delay_step 0.00
# ! python ./scripts/flow_tracker/tracker_and_schedulers.py --scheduler "event-based" --threads 8 --num_delay_ops 32 --delay 0.1 --delay_step 0.00

## The Ability to Create a Distributed Application
Distributed applications refer to those where the workflow is divided into multiple fragments that may be run on separate nodes. For example, data might be collected via a sensor at the edge, sent to a separate workstation for processing, and then the processed data could be sent back to the edge node for visualization. 

The multi-fragment feature is built on UCX, which can utilize highly optimized network connectivity such as RDMA.

The multi-fragment functionality is useful in scenarios where we need to:
* Scale one application across many copies, on many hardware locations;
* Isolate multiple compute workloads, such as CUDA processing, AI inferencing, and visualization;
* Access additional compute resources;
* Visualize data from sensors that are physically far away.

A single fragment consists of a computation graph built up of operators, and is equivalent to a non-distributed application.

We can define each fragment with its operator(s) similarly to how we define an app: create the operators, use `self.add_operator` to add the operator(s) to each fragment, and use `self.add_flow` to connect operators within each fragment.

```python
class Fragment1(Fragment):
    def compose(self):
        # Configure the operators. Here we use CountCondition to terminate
        # execution after a specific number of messages have been sent.
        up = UpstreamOp(self, CountCondition(self, 10), name="upstream")
        middle = MiddleOp(self, name="middle")

        # Add the operators to the fragment
        self.add_operator(up)
        self.add_operator(middle)
        
        # Add connection
        self.add_flow(up, middle)
```
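Then, in the application, we create each fragment and use `self.add_flow` to connect the fragments. Port mappings between fragments use the `"<operator name>.<port name>"` format, so `"frag1_out_op.out"` refers to port `out` of the operator named `frag1_out_op`.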

```python
class MyDistributedApp(Application):
    def compose(self):
        fragment1 = Fragment1(self, name="fragment1")
        fragment2 = Fragment2(self, name="fragment2")
        self.add_flow(fragment1, fragment2, {("frag1_out_op.out", "frag2_in_op.in")})
```

### Exercise 6
Modify `MyPingApp` defined below to separate `PingTxOp` and `PingRxOp` into two fragments. Assume `PingTxOp` has output port name `out` and `PingRxOp` has input port name `in`. Use `CountCondition` to end the app run after 10 messages have been sent.


```python
from holoscan.conditions import CountCondition
from holoscan.core import Application, Fragment
from holoscan.operators import PingRxOp, PingTxOp


class MyPingApp(Application):
    def compose(self):
        # Define the tx and rx operators, allowing tx to execute 10 times
        tx = PingTxOp(self, CountCondition(self, 10), name="tx")
        rx = PingRxOp(self, name="rx")

        # Define the workflow:  tx -> rx
        self.add_flow(tx, rx)


if __name__ == "__main__":
    app = MyPingApp()
    app.run()
```

In [None]:
# TO-DO


Click [here](scripts/answers/ex6.py) to view the solution.

## Application Packaging

A Holoscan application can be converted into a [Holoscan Application Package (HAP)](https://docs.nvidia.com/holoscan/sdk-user-guide/holoscan_packager.html), which is an extension of the MONAI Application Package (MAP) specification. A HAP is a containerized run-time environment for a Holoscan application or service, and is built using the Holoscan application packager command-line utility (`holoscan package`). Once the application package has been created, the application can be deployed with the same CLI (`holoscan run`).

A HAP is a container image conforming to Holoscan specifications. The primary component of a HAP is the application, which is provided by the developer and incorporated into the HAP using the Holoscan Application Packager. Application code and binaries are located in the `/opt/holoscan/app/` folder, except for any dependencies installed by the Holoscan Application Packager during the creation of the HAP. All AI models (PyTorch, TensorFlow, TensorRT, etc.) should be in separate sub-folders of the `/opt/holoscan/models/` folder or mapped into the HAP container at run-time using the volume-mount command-line options.

### Application Packaging Example

This example shows how to package a Holoscan application into a HAP container using the Holoscan CLI. Note that building and running a HAP requires running Docker commands, and since this lab is likely already running inside a container, the commands cannot be run here. You can, however, download the referenced files and build the application package on your own system. For this example, we will use the `ping` sample Python application.

*Visit the [SDK User Guide](https://docs.nvidia.com/holoscan/sdk-user-guide/) to learn more about the Holoscan Packager.*

#### Set up the Holoscan CLI

Refer to the documentation in the [user guide](https://docs.nvidia.com/holoscan/sdk-user-guide/holoscan_packager.html).

#### Define configurations

The packager will require a `--platform` and a `--platform-config`. We set them here as a prerequisite for this example. Refer to the user guide for other possible configurations.
```
$ export gpu_mode=dgpu
$ export platform=x64-workstation
```

Next, we set the Holoscan install directory, and the path to the application configuration file:
```
$ export holoscan_dir=/opt/nvidia/holoscan
$ export holoscan_app_config_path=/workspace/python/scripts/ping/ping.yaml
```

We then define the path to the application:
```
$ export holoscan_app_path=/workspace/python/scripts/ping/ping.py
```

#### Run the packager

This command builds a Docker container image that includes the application:

```
holoscan package -t ping-app \
  --platform $platform \
  --platform-config $gpu_mode \
  --config $holoscan_app_config_path \
  $holoscan_app_path
```

#### Run the containerized application

To run the packaged application, pass the resulting image to `holoscan run`. Given the configurations above, that would be:

```
$ holoscan run ping-app-x64-workstation-dgpu-linux-amd64:<version-of-image>
```

## Explore More Examples with Holohub

The majority of this tutorial has been focused on processing video data, but as we saw in our very first Holoscan example and some of the numerical computing examples we built at the beginning, Holoscan aims to be sensor and domain agnostic.

[Holohub](https://github.com/nvidia-holoscan/holohub) is a community-driven repository for hosting sample Holoscan applications and operators. It includes many useful tools and examples, ranging from network I/O to AI + sensor processing to chatbots and accelerated computing. Some notable examples:

- [Basic Network Operator](https://github.com/nvidia-holoscan/holohub/tree/main/operators/basic_network) - Moves UDP data to the GPU via Linux sockets
- [Advanced Network Operator](https://github.com/nvidia-holoscan/holohub/tree/main/operators/advanced_network) - Moves UDP data to the GPU via DPDK and GPUDirect RDMA, bypassing the CPU, with additional flow control features (requires a compatible NIC and GPU)
- [Software Defined Radio](https://github.com/nvidia-holoscan/holohub/tree/main/applications/fm_asr) - [FM Demodulation](https://github.com/nvidia-holoscan/holohub/tree/main/applications/sdr_fm_demodulation) and real-time speech to text transcription
- RADAR Pipelines - [Pulse Descriptor Word ID](https://github.com/nvidia-holoscan/holohub/tree/main/applications/simple_pdw_pipeline) and [Simple RADAR Pipeline](https://github.com/nvidia-holoscan/holohub/tree/main/applications/simple_radar_pipeline)
- [Speech to Text LLM](https://github.com/nvidia-holoscan/holohub/tree/main/applications/speech_to_text_llm) - Transcribe audio and summarize output

And much, much more.

### Exercise 7


Take this opportunity to think about the types of applications you could port to Holoscan. Now think about the Operators you'd need to do that work. Please use the cell below to sketch out your vision. If you feel inclined, write some code or ask a TA about next steps! 

In [None]:
# To Do




## Future Work and Thanks!

Thank you so much for your attention and interest in Holoscan. We are excited for you to start building your own applications and contributing your work to Holohub!

## Licensing

Copyright © 2024 OpenACC-Standard.org. This material is released by OpenACC-Standard.org, in collaboration with NVIDIA Corporation, under the Creative Commons Attribution 4.0 International (CC BY 4.0). These materials may include references to hardware and software developed by other entities; all applicable licensing and copyrights apply.