# NVIDIA Holoscan: Fundamentals & Quick Start Tutorial

## What You'll Learn

This hands-on tutorial introduces NVIDIA Holoscan, an SDK for building high-performance, GPU-accelerated streaming AI applications. By the end of this tutorial, you'll understand how to build efficient data processing pipelines that leverage GPU acceleration for real-time AI inference.

## Key Topics Covered

- The role of Holoscan in addressing CPU bottlenecks in AI pipelines
- Building custom operators for data processing
- Creating efficient data flows between operators
- Using pre-built Holoscan operators for common tasks
- Implementing a complete face detection application
- Optimizing video processing pipelines for real-time performance

## Prerequisites

- Basic understanding of Python
- Familiarity with GPU computing concepts
- NVIDIA GPU hardware (for running examples)
- NVIDIA Holoscan SDK installed

Let's get started!

# Mount Google Drive in Colab

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Copy files from your Drive to Colab /content/scripts/ while preserving structure

In [None]:
import os, shutil

# Define paths
base_drive = '/content/drive/My Drive/Colab Notebooks'

# Ensure folders exist in Colab
os.makedirs('/content/scripts/ping', exist_ok=True)
os.makedirs('/content/scripts/tao_peoplenet/data', exist_ok=True)
os.makedirs('/content/images', exist_ok=True)

# Copy scripts and video data
shutil.copy(f'{base_drive}/scripts/ping/ping.yaml', '/content/scripts/ping/ping.yaml')
shutil.copy(f'{base_drive}/scripts/tao_peoplenet/data/people.mp4', '/content/scripts/tao_peoplenet/data/people.mp4')

# Copy images from Google Drive into Colab
shutil.copy(f'{base_drive}/images/face_and_people_detection_app.png', '/content/images/face_and_people_detection_app.png')
shutil.copy(f'{base_drive}/images/MyPingApp.png', '/content/images/MyPingApp.png')

# Environment Initialization

In [None]:
# ⚙️ Environment Initialization for CUDA, PyCUDA & Modern NumPy

import os, ctypes

# 1) Upgrade to a modern NumPy that satisfies JAX/Torch/etc.
!pip install numpy==1.25.2 --quiet --force-reinstall

# 2) Monkey-patch the old alias PyCUDA and Holoscan expect
import numpy as np
np.bool8 = np.bool_

# 3) (Re-)install PyCUDA so it picks up our numpy patch
!pip install pycuda --quiet --force-reinstall

# 4) Point at your CUDA libs and force-load the driver
os.environ["LD_LIBRARY_PATH"] = (
    "/usr/local/cuda/lib64:"
    "/usr/local/cuda/compat:"
    "/usr/local/nvidia/lib:"
    "/usr/local/nvidia/lib64"
)
ctypes.CDLL("libcuda.so.1")  # should print nothing on success
print("✅ CUDA driver loaded, NumPy patched to", np.__version__)

# PyCUDA Verification

In [None]:
# 🔍 Test PyCUDA functionality & confirm monkey-patch

import numpy as np
print("NumPy version:", np.__version__, "| has bool8:", hasattr(np, "bool8"))

try:
    import pycuda.driver as cuda
    import pycuda.autoinit
    print("✅ PyCUDA loaded and initialized.")
    print("GPU Detected:", cuda.Device(0).name())
except Exception as e:
    print("⚠️ PyCUDA not available:", e)

## Table Styling for Left Alignment

**Execute this cell once to ensure proper table formatting throughout the notebook**

This CSS injection overrides Jupyter's default table centering behavior, ensuring all tables align naturally with surrounding text.

In [None]:
# Import necessary dependencies
from IPython.display import display, HTML

display(HTML("""
<style>
table, th, td {
    text-align: left !important;
    margin-left: 0 !important;
}
</style>
"""))

In [None]:
# Import necessary dependencies
from IPython.display import Markdown, Image, display

# Display markdown text and images
display(Markdown("""
# Background & Motivation: Understanding the Data Pipeline

## The Data Flow Challenge
In deep learning workflows, data typically follows this path:
```
Storage → CPU Memory → (Optional) GPU Memory → Processing → Inference
```

This seemingly straightforward pipeline often becomes a major bottleneck in AI/ML applications, limiting overall system performance.

## CPU Bottlenecks in AI/ML Processing Pipelines
1. **Data Loading**: Pre- and post-processing tasks account for roughly 90% of the workload.
2. **Processing Overhead**: The cost of these operations varies by task, with image preprocessing being particularly intensive.
3. **Library Impact**: Common Python libraries, such as Torch or Keras, can inadvertently introduce CPU bottlenecks.

*Conceptual Diagram: AI Imaging Pipeline in Cloud/Data Center Environments*
"""))

# First diagram - CPU bottlenecks
display(Image('/content/drive/My Drive/Colab Notebooks/CPU_Bottlenecks_in_AI_ML_Pipelines-All_CPU.jpg'))

display(Markdown("""
## GPU Acceleration using Accelerator Frameworks
"""))

# Second diagram - GPU acceleration
display(Image('/content/drive/My Drive/Colab Notebooks/CPU_Bottlenecks_in_AI_ML_Pipelines-All_GPU.jpg'))

display(Markdown("""
Moving all intermediate preprocessing stages from CPU to GPU delivers significant benefits:
- **>10x throughput improvement** by keeping the entire pipeline on the same GPU
- **Significant cloud cost savings** with accelerated software
- **Reduced memory requirements** by eliminating intermediate storage products

## NVIDIA Holoscan: A Comparative Advantage
Holoscan addresses common limitations in streaming AI workflows through several key improvements:

### Traditional Approaches vs. Holoscan
| Traditional Approaches          | NVIDIA Holoscan                   |
|:--------------------------------|:----------------------------------|
| CPU bottlenecks and high latency | Specialized streaming architecture |
| Independent pipeline components | End-to-end pipeline optimization  |
| Complex I/O integration         | Seamless GPU acceleration         |
| Manual hardware optimization    | Unified sensor-to-insight stack   |

### General AI Frameworks vs. Holoscan
| General AI Frameworks           | NVIDIA Holoscan                   |
|:--------------------------------|:----------------------------------|
| Optimized for batch processing  | Designed for streaming data       |
| Limited real-time guarantees    | Deterministic low latency         |
| Separate tools for deployment   | End-to-end solution               |
| Training-focused design         | Inference-optimized architecture  |

## Key Differentiators
NVIDIA Holoscan is uniquely positioned for applications where milliseconds matter:
- **Purpose-built for time-critical streaming sensor workflows**
- **Pre-optimized operators for common signal and image processing tasks**
- **Scale from the edge (embedded Jetson) to the datacenter (DGX) seamlessly with the same code**
- **Domain-specific adaptability with 90+ reference applications**

These advantages make Holoscan particularly valuable for applications like surgical robotics, radio astronomy, autonomous systems, and real-time monitoring solutions.

In the following sections, we will explore how to leverage these advantages by building applications with Holoscan, starting with a basic understanding of the system architecture and then progressing to hands-on development.
"""))


## Installing Required Google Colab Python and System Packages

Before we start building Holoscan applications, let's set up our environment by installing the Python and system packages this notebook needs to run successfully in Google Colab:

In [None]:
%%writefile cleaned_requirements.txt
# Environment setup for NVIDIA Holoscan in Google Colab
# Step 1: Write out a pinned requirements file.
# (The %%writefile magic must be the first line of the cell; pip ignores these '#' comment lines.)
anyio==4.9.0
appdirs==1.4.4
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
arrow==1.3.0
asttokens==3.0.0
async-lru==2.0.5
attrs==25.3.0
babel==2.17.0
beautifulsoup4==4.13.4
bleach==6.2.0
certifi==2024.7.4
cffi==1.17.1
charset-normalizer==3.3.2
click==8.1.7
cloudpickle==2.2.1
comm==0.2.2
cupy-cuda12x==12.0.0
debugpy==1.8.14
decorator==5.2.1
defusedxml==0.7.1
exceptiongroup==1.2.2
executing==2.2.0
fastjsonschema==2.21.1
fastrlock==0.8.2
fqdn==1.5.1
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.7
ipython==8.36.0
isoduration==20.11.0
jedi==0.19.2
Jinja2==3.1.3
json5==0.12.0
jsonpointer==3.0.0
jsonschema==4.23.0
jsonschema-specifications==2025.4.1
Mako==1.2.4
markdown-it-py==3.0.0
MarkupSafe==2.1.3
matplotlib-inline==0.1.7
mdurl==0.1.2
mistune==3.1.3
nbclient==0.10.2
nbconvert==7.16.6
nbformat==5.10.4
nest-asyncio==1.6.0
overrides==7.7.0
packaging==23.1
pandocfilters==1.5.1
parso==0.8.4
pexpect==4.9.0
platformdirs==3.10.0
polygraphy==0.49.0
prometheus_client==0.21.1
prompt_toolkit==3.0.51
protobuf==4.23.4
psutil==5.9.6
ptyprocess==0.7.0
pure_eval==0.2.3
pycparser==2.22
pydantic==1.10.17
Pygments==2.18.0
python-dateutil==2.9.0.post0
python-json-logger==3.3.0
pytools==2023.1.1
PyYAML==6.0
pyzmq==26.4.0
referencing==0.36.2
requests==2.31.0
rfc3339-validator==0.1.4
rfc3986-validator==0.1.1
rich==13.7.1
rpds-py==0.24.0
Send2Trash==1.8.3
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
soupsieve==2.7
stack-data==0.6.3
terminado==0.18.1
tinycss2==1.4.0
tomli==2.2.1
tqdm==4.66.4
typer==0.12.3
types-python-dateutil==2.9.0.20241206
typing_extensions==4.7.1
uri-template==1.3.0
urllib3==2.2.2
wcwidth==0.2.13
webcolors==24.11.1
webencodings==0.5.1
websocket-client==1.8.0
wheel-axle-runtime==0.0.6

In [None]:
# Step 2: Install system dependencies
!apt-get update -qq
!apt-get install --no-install-recommends -y ffmpeg libegl1

In [None]:
# Step 3: Install from requirements and additional useful libraries
!pip install -q --no-deps -r cleaned_requirements.txt opencv-python matplotlib

In [None]:
# Step 4: NVIDIA SDK support
!pip install --no-deps nvidia-pyindex
!pip install --no-deps tensorrt
!pip install holoscan

In [None]:
# ✅ IMPORT VALIDATION
print("\n🔍 Validating critical imports...\n")

# Standard libs
try:
    import numpy as np
    import matplotlib.pyplot as plt
    import cv2
    print("✅ numpy, matplotlib, OpenCV")
except Exception as e:
    print("❌ Standard libs failed:", e)

# Holoscan
try:
    import holoscan
    print("✅ Holoscan SDK")
except Exception as e:
    print("❌ Holoscan failed:", e)

# TensorRT
try:
    import tensorrt as trt
    print("✅ TensorRT")
except Exception as e:
    print("⚠️ TensorRT failed:", e)

# PyCUDA
try:
    import pycuda.driver as cuda
    import pycuda.autoinit
    print("✅ PyCUDA")
except Exception as e:
    print("⚠️ PyCUDA failed:", e)

## Importing Holoscan Core Classes

Now, let's import the necessary classes from the Holoscan SDK:

In [None]:
import os
import cv2
import glob
import time
import logging
import threading
import subprocess
import numpy as np
from holoscan.core import Operator, OperatorSpec

# Set up logging configuration early in your script
def configure_logging():
    """Raise Python logging thresholds to WARNING to suppress INFO messages."""
    # Configure the root logger
    logging.getLogger().setLevel(logging.WARNING)

    # Optionally target the video_stream_replayer logger specifically
    logging.getLogger('video_stream_replayer').setLevel(logging.WARNING)

# Apply the configuration (the function has no effect until it is called)
configure_logging()

## Understanding Holoscan's Architecture

Holoscan is built around a few core concepts:

1. **Operators**: The basic processing units that perform specific functions (e.g., data loading, inference, visualization)
2. **Data Flow**: Defines how data moves between operators
3. **Application**: Orchestrates the operators and data flow
4. **Scheduler**: Manages the execution of operators

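Operators exchange data by emitting and receiving messages. The `ValueData` class below is a simple custom Python class (not part of the Holoscan SDK) that we will use as the message payload passed between operators: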

In [None]:
class ValueData:
    """Example of a custom Python class"""

    def __init__(self, value):
        self.data = value

    def __repr__(self):
        return f"ValueData({self.data})"

    def __eq__(self, other):
        return self.data == other.data

    def __hash__(self):
        return hash(self.data)

In this tutorial, the class plays a few simple roles:

- **Data Encapsulation**: Wraps each value in a small, structured object
- **Inter-Operator Communication**: Serves as the message payload that operators emit and receive
- **Consistent Message Type**: Gives every operator the same object type to expect (a convention; Python does not enforce it)
- **Simplicity**: Keeps the first example free of tensors and GPU buffers, so the focus stays on ports and data flow
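To make the class's behavior concrete, here is a small sketch that exercises it outside of any Holoscan pipeline. The class is repeated so the snippet runs on its own; the variable names `a` and `b` are illustrative only.

```python
class ValueData:
    """Copy of the tutorial's payload class, repeated so this snippet is self-contained."""

    def __init__(self, value):
        self.data = value

    def __repr__(self):
        return f"ValueData({self.data})"

    def __eq__(self, other):
        return self.data == other.data

    def __hash__(self):
        return hash(self.data)


a = ValueData(3)
b = ValueData(3)

print(a)             # __repr__ gives a readable form
print(a == b)        # equality compares the wrapped values, not object identity
print(len({a, b}))   # equal objects hash alike, so a set keeps only one of them

a.data *= 2          # a payload can be modified in place, as PingMiddleOp does later
print(a, a == b)
```

Because `__eq__` and `__hash__` both depend on `data`, two payloads wrapping the same value are interchangeable in comparisons and sets until one of them is modified.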

With our environment set up, we're ready to build our first Holoscan application in the next section.

# Our First Holoscan Application

For our first Holoscan application, we will create a simple data processing pipeline that demonstrates the core concepts of operators and data flow.

In [None]:
# Import display dependencies
from IPython.display import Markdown, Image, display

# Display the application overview text
display(Markdown("""
# Application Overview

We'll build a pipeline with three operators:

1. **PingTxOp** – a source operator that generates integers and emits them via two output ports
2. **PingMiddleOp** – a processing operator that receives the integers, multiplies them by a configurable value, and emits the results
3. **PingRxOp** – a sink operator that receives and displays the processed values

Below is a diagram of our application pipeline:
"""))

# Display the application diagram image
display(Image(filename='/content/images/MyPingApp.png', width=1000))

## Creating Custom Operators

Let's implement each of these operators step by step.

### 1. Source Operator: PingTxOp

This operator generates sequential integer values encapsulated in ValueData objects and emits them through two output ports, "out1" and "out2".

In [None]:
class PingTxOp(Operator):
    """Simple transmitter operator.
    This operator has:
        outputs: "out1", "out2"
    On each tick, it transmits a `ValueData` object at each port. The
    transmitted values are even on "out1" and odd on "out2" and increment
    with each call to compute.
    """

    def __init__(self, *args, **kwargs):
        self.index = 0
        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.output("out1")
        spec.output("out2")

    def compute(self, op_input, op_output, context):
        value1 = ValueData(self.index)
        self.index += 1
        op_output.emit(value1, "out1")

        value2 = ValueData(self.index)
        self.index += 1
        op_output.emit(value2, "out2")

### 2. Processing Operator: PingMiddleOp

This operator receives integer values through two input ports, multiplies them by a configurable parameter, and emits the processed results through two output ports.

In [None]:
class PingMiddleOp(Operator):
    """Example of an operator modifying data.
    This operator has:
        inputs:  "in1", "in2"
        outputs: "out1", "out2"
    The data from each input is multiplied by a user-defined value.
    In this demo, the `multiplier` parameter value is read from a "ping.yaml"
    configuration file (near the bottom of this script), overriding the default
    defined in the setup() method below.
    """

    def __init__(self, *args, **kwargs):
        # If `self.multiplier` is set here (e.g., `self.multiplier = 4`), then
        # the default value by `param()` in `setup()` will be ignored.
        # (you can just call `spec.param("multiplier")` in `setup()` to use the
        # default value)

        # self.multiplier = 4
        self.count = 1

        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input("in1")
        spec.input("in2")
        spec.output("out1")
        spec.output("out2")
        spec.param("multiplier", 2)

    def compute(self, op_input, op_output, context):
        value1 = op_input.receive("in1")
        value2 = op_input.receive("in2")
        print(f"Middle message received (count: {self.count})")
        self.count += 1

        print(f"Middle message value1: {value1.data}")
        print(f"Middle message value2: {value2.data}")

        # Multiply the values by the multiplier parameter
        value1.data *= self.multiplier
        value2.data *= self.multiplier

        op_output.emit(value1, "out1")
        op_output.emit(value2, "out2")

### 3. Sink Operator: PingRxOp

This sink operator receives multiple input values through its special "receivers" port, which can accept multiple connections. It collects the incoming values into a single collection and prints them without emitting any outputs.

In [None]:
class PingRxOp(Operator):
    """Simple receiver operator.
    This operator has:
        input: "receivers"
    This is an example of a native operator that can dynamically have any
    number of inputs connected to its "receivers" port.
    """

    def __init__(self, *args, **kwargs):
        self.count = 1
        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.param("receivers", kind="receivers")

    def compute(self, op_input, op_output, context):
        values = op_input.receive("receivers")
        print(f"Rx message received (count: {self.count}, size: {len(values)})")
        self.count += 1
        print(f"Rx message value1: {values[0].data}")
        print(f"Rx message value2: {values[1].data}")

## Anatomy of a Holoscan Operator

Each Holoscan operator follows a consistent structure:

1. **`__init__` Method**: Initializes the operator's internal state, then calls the base-class constructor
2. **`setup` Method**: Declares the operator's input and output ports and its parameters
3. **`compute` Method**: Implements the operator's processing logic

The `compute` method is where the actual data processing happens:
- `op_input.receive(port_name)` receives data from an input port
- `op_output.emit(data, port_name)` sends data to an output port

## Connecting Operators to Form an Application

Now that we have our operators, let's connect them to form a complete application:

In [None]:
from holoscan.conditions import CountCondition
from holoscan.core import Application

class MyPingApp(Application):
    def compose(self):
        # Configure the operators. Here we use CountCondition to terminate
        # execution after a specific number of messages have been sent.
        tx = PingTxOp(self, CountCondition(self, 10), name="tx")
        mx = PingMiddleOp(self, self.from_config("mx"), name="mx")
        rx = PingRxOp(self, name="rx")

        # Connect the operators into the workflow:  tx -> mx -> rx
        self.add_flow(tx, mx, {("out1", "in1"), ("out2", "in2")})
        self.add_flow(mx, rx, {("out1", "receivers"), ("out2", "receivers")})

In the `compose` method, we:
1. Create instances of our three operators
2. Configure the source operator to run 10 times using `CountCondition`
3. Connect the operators using `add_flow` to define the data flow between them
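To see what values this wiring should produce, the following plain-Python sketch mimics the `tx -> mx -> rx` data flow without using Holoscan at all. The function `simulate_ping` is a hypothetical helper written for this illustration, and the multiplier of 2 is the default declared in `setup()`; the actual run uses whatever value `ping.yaml` supplies. The sketch also assumes the two values arrive at `rx` in the order the connections were listed.

```python
def simulate_ping(ticks, multiplier=2):
    """Plain-Python stand-in for the tx -> mx -> rx pipeline (no Holoscan involved)."""
    index = 0
    received = []
    for _ in range(ticks):
        # tx: emit two consecutive integers on "out1" and "out2"
        value1, value2 = index, index + 1
        index += 2

        # mx: scale both values by the multiplier
        value1 *= multiplier
        value2 *= multiplier

        # rx: both connections arrive together on the "receivers" port
        received.append((value1, value2))
    return received


for count, values in enumerate(simulate_ping(ticks=3), start=1):
    print(f"Rx message received (count: {count}, size: {len(values)}): {values}")
```

Comparing this against the real application's printed output is a quick way to confirm that the ports are connected as intended.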

## Running the Application

To run the application, we create an instance of our application class, point it at a YAML configuration file (here, the file that supplies the `mx` operator's settings), and call its `run` method. The source operator then ticks 10 times, sending 10 pairs of values through the pipeline.

In [None]:
if __name__ == "__main__":
    app = MyPingApp()
    app.config("./scripts/ping/ping.yaml") # Optional configuration file
    app.run()
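The contents of `ping.yaml` are not shown in this tutorial. As a rough sketch, assuming the file only needs to supply the `multiplier` parameter for the `mx` operator, it could look like the text written below. The key names come from `self.from_config("mx")` and `spec.param("multiplier", 2)`; the value 3 is purely an illustrative assumption. The snippet writes to a scratch directory so the real configuration file is left untouched.

```python
import os
import tempfile

# Assumed contents: only the keys ("mx", "multiplier") come from the code above.
# The value 3 is an illustrative choice, not the tutorial's actual setting.
ASSUMED_PING_YAML = """\
mx:
  multiplier: 3
"""

# Write to a scratch directory so ./scripts/ping/ping.yaml is not overwritten
scratch_dir = tempfile.mkdtemp()
config_path = os.path.join(scratch_dir, "ping.yaml")
with open(config_path, "w") as f:
    f.write(ASSUMED_PING_YAML)

# Read the file back to show what the application would be given
with open(config_path) as f:
    print(f.read())
```

Passing a file like this to `app.config(...)` would let the `mx` section override the default `multiplier` of 2 declared in `setup()`.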

## Understanding Key Concepts

Let's review some key concepts we've covered:

1. **Operators**: The basic building blocks of a Holoscan application
2. **Ports**: Define how data flows between operators
3. **Data Flow**: Specifies the connections between operators
4. **Application**: Orchestrates the operators and data flow
5. **Conditions**: Control when and how often operators execute

In the next section, we'll explore more advanced operators and build a more complex application that detects faces in video in real time.

# Optional Exercises: Building a Custom Operator

Now that you've seen how to create basic Holoscan operators, let's practice by building a custom operator with specific requirements.

## Optional Exercise 1: Create a Multi-Port Operator

In this exercise, you will build a custom operator named `MyOp` with the following specifications:

- 3 input ports: `in1`, `in2`, and `in3`
- 2 output ports: `out1` and `out2`
- The operator should always emit the data received on `in1` through the `out1` port
- It should emit `in2` through `out2` if the operator tick (tracked in `self.index`) is even, and `in3` through `out2` if the tick is odd

Here's the template code, with the parts you need to fill in marked `<<<FIX ME>>>`. Try implementing this operator on your own before checking the solution shown below.

In [None]:
class <<<FIX ME>>>(<<<FIX ME>>>):

    def __init__(self, *args, **kwargs):
        self.index = 0
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input(<<<FIX ME>>>)
        spec.input(<<<FIX ME>>>)
        spec.input(<<<FIX ME>>>)
        spec.output(<<<FIX ME>>>)
        spec.output(<<<FIX ME>>>)

    def compute(self, op_input, op_output, context):

        # Always emit in1 value in out1 port
        in1_value = op_input.receive(<<<FIX ME>>>)
        op_output.emit(in1_value, <<<FIX ME>>>)

        # Each input needs to be received, regardless of utilization.
        # Even if in2 and in3 won't be utilized at the same time,
        # they still both need to be received
        in2_value = op_input.receive(<<<FIX ME>>>)
        in3_value = op_input.receive(<<<FIX ME>>>)

        # If tick is even
        if <<<FIX ME>>>:
            op_output.emit(in2_value, "out2")
        else:
            op_output.emit(in3_value, "out2")

        self.index += 1

## Solution for Optional Exercise #1

Here is the completed solution:

In [None]:
class MyOp(Operator):

    def __init__(self, *args, **kwargs):
        self.index = 0
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input("in1")
        spec.input("in2")
        spec.input("in3")
        spec.output("out1")
        spec.output("out2")

    def compute(self, op_input, op_output, context):

        # Always emit in1 value in out1 port
        in1_value = op_input.receive("in1")
        op_output.emit(in1_value, "out1")

        # Each input needs to be received, regardless of utilization.
        # Even if in2 and in3 won't be utilized at the same time,
        # they still both need to be received
        in2_value = op_input.receive("in2")
        in3_value = op_input.receive("in3")

        # If tick is even
        if self.index % 2 == 0:
            op_output.emit(in2_value, "out2")
        else:
            op_output.emit(in3_value, "out2")

        self.index += 1
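The routing rule in the solution's `compute` method can be checked in isolation. The sketch below pulls that one decision out into a hypothetical helper, `route_out2`, which is not part of the solution or of Holoscan; it only mirrors the `self.index % 2 == 0` test used above.

```python
def route_out2(tick, in2_value, in3_value):
    """Return the value the solution would emit on "out2" for a given tick."""
    # Even ticks forward in2; odd ticks forward in3
    return in2_value if tick % 2 == 0 else in3_value


for tick in range(4):
    print(tick, route_out2(tick, "from in2", "from in3"))
```

Since `self.index` starts at 0, the first tick counts as even, so the operator's first `out2` message comes from `in2`.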

## Optional Exercise 2: Create a Power-of-Two Operator

For our second exercise, create a new operator called `Pow2on2` that squares the incoming integers if the clock tick is even. Otherwise, pass through the data unchanged.

Here is a starting point:

In [None]:
class Pow2on2(Operator):
    """Operator that squares incoming values on even ticks.
    This operator has:
        inputs:  "in1", "in2"
        outputs: "out1", "out2"
    On even ticks, the input values are squared.
    On odd ticks, the input values are passed through unchanged.
    """

    def __init__(self, *args, **kwargs):
        # Your initialization code here
        pass

    def setup(self, spec: OperatorSpec):
        # Define inputs and outputs
        pass

    def compute(self, op_input, op_output, context):
        # Implement the compute logic
        pass

print("Try implementing this operator and integrating it into the `MyPingApp` application by replacing `PingMiddleOp` with your new `Pow2on2` operator.")

## Solution for Optional Exercise #2

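Here is the completed solution. Note that it names the operator `Pow2On2Op` (rather than `Pow2on2`) and wires it into a new `MyPingPowApp` application in place of `PingMiddleOp`: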

In [None]:
import os
from holoscan.conditions import CountCondition
from holoscan.core import Application, Operator, OperatorSpec

# define a custom class to represent data used in the app

class ValueData:
    """Example of a custom Python class"""

    def __init__(self, value):
        self.data = value

    def __repr__(self):
        return f"ValueData({self.data})"

    def __eq__(self, other):
        return self.data == other.data

    def __hash__(self):
        return hash(self.data)

# define custom Operators for use in the demo

class PingTxOp(Operator):
    """Simple transmitter operator.
    This operator has:
        outputs: "out1", "out2"
    On each tick, it transmits a `ValueData` object at each port. The
    transmitted values are even on "out1" and odd on "out2" and increment
    with each call to compute.
    """

    def __init__(self, *args, **kwargs):
        self.index = 0
        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.output("out1")
        spec.output("out2")

    def compute(self, op_input, op_output, context):
        value1 = ValueData(self.index)
        self.index += 1
        op_output.emit(value1, "out1")

        value2 = ValueData(self.index)
        self.index += 1
        op_output.emit(value2, "out2")

class PingMiddleOp(Operator):
    """Example of an operator modifying data.
    This operator has:
        inputs:  "in1", "in2"
        outputs: "out1", "out2"
    The data from each input is multiplied by a user-defined value.
    In this demo, the `multiplier` parameter value is read from a "ping.yaml"
    configuration file (near the bottom of this script), overriding the default
    defined in the setup() method below.
    """

    def __init__(self, *args, **kwargs):
        # If `self.multiplier` is set here (e.g., `self.multiplier = 4`), then
        # the default value by `param()` in `setup()` will be ignored.
        # (you can just call `spec.param("multiplier")` in `setup()` to use the
        # default value)
        #
        # self.multiplier = 4
        self.count = 1

        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input("in1")
        spec.input("in2")
        spec.output("out1")
        spec.output("out2")
        spec.param("multiplier", 2)

    def compute(self, op_input, op_output, context):
        value1 = op_input.receive("in1")
        value2 = op_input.receive("in2")
        print(f"Middle message received (count: {self.count})")
        self.count += 1

        print(f"Middle message value1: {value1.data}")
        print(f"Middle message value2: {value2.data}")

        # Multiply the values by the multiplier parameter
        value1.data *= self.multiplier
        value2.data *= self.multiplier

        op_output.emit(value1, "out1")
        op_output.emit(value2, "out2")

class Pow2On2Op(Operator):
    """Operator that squares incoming values on even ticks.
    This operator has:
        inputs:  "in1", "in2"
        outputs: "out1", "out2"
    On even ticks, the input values are squared.
    On odd ticks, the input values are passed through unchanged.
    """

    def __init__(self, *args, **kwargs):
        # Tick counter, starting at 0 so that the first tick counts as even
        self.count = 0

        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input("in1")
        spec.input("in2")
        spec.output("out1")
        spec.output("out2")

    def compute(self, op_input, op_output, context):
        value1 = op_input.receive("in1")
        value2 = op_input.receive("in2")

        print(f"Middle message received (count: {self.count})")
        print(f"Middle message value1: {value1.data}")
        print(f"Middle message value2: {value2.data}")

        # If count is even
        if self.count % 2 == 0:
            value1.data *= value1.data
            value2.data *= value2.data

        # Increment count
        self.count += 1

        op_output.emit(value1, "out1")
        op_output.emit(value2, "out2")

class PingRxOp(Operator):
    """Simple receiver operator.
    This operator has:
        input: "receivers"
    This is an example of a native operator that can dynamically have any
    number of inputs connected to its "receivers" port.
    """

    def __init__(self, *args, **kwargs):
        # Need to call the base class constructor last
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.param("receivers", kind="receivers")

    def compute(self, op_input, op_output, context):
        values = op_input.receive("receivers")
        print(f"Rx message value1: {values[0].data}")
        print(f"Rx message value2: {values[1].data}")

class MyPingPowApp(Application):
    def compose(self):
        # Configure the operators. Here we use CountCondition to terminate
        # execution after a specific number of messages have been sent.
        tx = PingTxOp(self, CountCondition(self, 10), name="tx")
        mx = Pow2On2Op(self, name="mx")
        rx = PingRxOp(self, name="rx")

        # Connect the operators into the workflow:  tx -> mx -> rx
        self.add_flow(tx, mx, {("out1", "in1"), ("out2", "in2")})
        self.add_flow(mx, rx, {("out1", "receivers"), ("out2", "receivers")})

if __name__ == "__main__":
    app = MyPingPowApp()
    # no config file
    app.config("")
    app.run()

## Key Learning Points

These exercises help you practice important Holoscan concepts:

1. **Port Configuration**: Setting up input and output ports
2. **Conditional Processing**: Implementing different logic based on conditions
3. **Operator State**: Maintaining state across multiple compute calls
4. **Data Flow**: Understanding how data moves between operators
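
The interplay between operator state and conditional processing can be checked without Holoscan. The sketch below is a plain-Python stand-in for the `compute()` logic of the middle operator above; the class name `EvenCallSquarer` and its `process()` method are hypothetical and exist only for illustration.

```python
class EvenCallSquarer:
    """Plain-Python stand-in for the middle operator's compute() logic."""

    def __init__(self):
        # State that persists across calls, like self.count in the operator
        self.count = 0

    def process(self, value1, value2):
        # Square both values only on even-numbered calls (0, 2, 4, ...)
        if self.count % 2 == 0:
            value1, value2 = value1 * value1, value2 * value2
        self.count += 1
        return value1, value2


squarer = EvenCallSquarer()
results = [squarer.process(v, v + 1) for v in (1, 3, 5, 7)]
print(results)  # [(1, 4), (3, 4), (25, 36), (7, 8)]
```

Because `count` lives on the instance, the first and third calls square their inputs while the second and fourth pass them through unchanged.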

In the next section, we'll explore pre-built Holoscan operators and how to use them in more complex applications.

# Using Pre-Built Holoscan Operators

While creating custom operators is powerful, Holoscan also provides a rich set of pre-built operators for common tasks. These operators are optimized for performance and can significantly reduce development time.

## Core Holoscan Operators

The Holoscan SDK includes several pre-built operators for common tasks:

| Operator | Description |
|----------|-------------|
| **VideoStreamReplayerOp** | Outputs video frames as Holoscan Tensor objects |
| **FormatConverterOp** | Converts data formats and performs operations like resizing |
| **InferenceOp** | Executes AI inference on one or multiple models |
| **HolovizOp** | High-speed viewer for visualization of images and overlay data |
| **AJASourceOp** | Captures video from AJA capture cards, with optional GPUDirect RDMA |

These operators are highly optimized and designed to work together in GPU-accelerated pipelines.

## Benefits of Pre-Built Operators

Using pre-built operators offers several advantages:

1. **Performance**: Optimized for maximum throughput and minimum latency
2. **Reliability**: Thoroughly tested and production-ready
3. **Interoperability**: Designed to work seamlessly with other Holoscan components
4. **Configurability**: Easily customizable through YAML configuration files
5. **GPU Acceleration**: Built to maximize GPU utilization

## Configuring Operators with YAML

Most pre-built operators can be configured using YAML files, which provide a clean separation between code and configuration:

```yaml
# Example operator configuration in YAML
replayer_source:
  directory: ./data
  batch_size: 1
  repeat: false
  realtime: false

preprocessor:
  resize_width: 960
  resize_height: 544
  scale_min: 0
  scale_max: 255

inference:
  backend: "trt"
  model_path_map:
    face_detect: /path/to/model.onnx
```

This approach makes it easy to adjust parameters without changing code, and allows for different configurations in different environments.
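
In the application class later in this tutorial, each YAML section is read with `self.kwargs("<section>")` and unpacked into the matching operator constructor. The following sketch imitates that pattern in plain Python, assuming the YAML has already been parsed into a dictionary; `CONFIG`, `section_kwargs`, and `build_preprocessor` are hypothetical names, not part of the Holoscan API.

```python
# Stand-in for the parsed YAML file (values copied from the example above)
CONFIG = {
    "preprocessor": {
        "resize_width": 960,
        "resize_height": 544,
        "scale_min": 0,
        "scale_max": 255,
    },
}


def section_kwargs(config, section):
    """Return a copy of one config section, or an empty dict if it is missing."""
    return dict(config.get(section, {}))


def build_preprocessor(name, resize_width, resize_height, scale_min=0, scale_max=255):
    """Hypothetical constructor that just records the settings it was given."""
    return {
        "name": name,
        "size": (resize_width, resize_height),
        "range": (scale_min, scale_max),
    }


preprocessor_args = section_kwargs(CONFIG, "preprocessor")
# Code can still override a value from the file before building the operator
preprocessor_args["resize_width"] = 1280
preprocessor = build_preprocessor("preprocessor", **preprocessor_args)
print(preprocessor)
```

Returning a copy of the section keeps code-side overrides from leaking back into the shared configuration.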

## Example: Using Format Converter

Here's a simple example of using the `FormatConverterOp` in a pipeline (**DON'T RUN - Cell is for illustration purposes only**):

In [None]:
from holoscan.operators import FormatConverterOp

# In your Application's compose method:
format_converter = FormatConverterOp(
    self,
    name="preprocessor",
    pool=pool,
    resize_width=960,
    resize_height=544,
    scale_min=0,
    scale_max=255
)

# Connect it to other operators
self.add_flow(source, format_converter)
self.add_flow(format_converter, next_op)

The `FormatConverterOp` takes input images, resizes them to the specified dimensions, and scales pixel values to the specified range.

## Example: Using Inference Operator

The `InferenceOp` is used to run neural network inference (**DON'T RUN - Cell is for illustration purposes only**):

In [None]:
import os

from holoscan.operators import InferenceOp

# In your Application's compose method:
inference = InferenceOp(
    self,
    name="inference",
    allocator=pool,
    backend="trt",  # TensorRT backend
    model_path_map={
        "face_detect": os.path.join(self.data_path, "model.onnx")
    }
)

# Connect it to other operators
self.add_flow(preprocessor, inference, {("", "receivers")})
self.add_flow(inference, postprocessor, {("transmitter", "in")})

The `InferenceOp` takes preprocessed input data, runs inference using the specified model, and outputs the results to the next operator in the pipeline.

In the next section, we'll build a complete application using these pre-built operators to perform real-time face detection on a video stream.

# TAO PeopleNet Detection: Building a Complete Application

In this section, we'll build a complete Holoscan application for face detection using PeopleNet, a pretrained model from NVIDIA's TAO (Train, Adapt, Optimize) Toolkit, formerly the Transfer Learning Toolkit (TLT).

In [None]:
from IPython.display import Markdown, Image, display

# Display descriptive markdown
display(Markdown("""
# Application Overview

Our application will:

1. Load video frames from a source
2. Preprocess the frames for inference
3. Detect faces using the PeopleNet model
4. Postprocess the detection results
5. Visualize the results with bounding boxes
6. Save the processed video

Below is a diagram of our pipeline:
"""))

# Show the face and people detection pipeline image
display(Image(filename='/content/images/face_and_people_detection_app.png', width=1200))

## Required Imports

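We start by preparing the environment (pinning NumPy, exposing the CUDA driver library, and installing a CuPy stand-in if CuPy has not been imported yet) and then import the necessary modules: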

In [None]:
# Step 1: Downgrade NumPy (critical for compatibility)
!pip install -q "numpy==1.23.5" --force-reinstall

# Step 2: Check if NumPy downgrade worked
import numpy as np
print(f"NumPy version after downgrade: {np.__version__}")

# Step 3: Set up a lightweight CUDA bridge directory structure
import os
import sys

# Create a directory for our custom CUDA bridge
!mkdir -p /tmp/cuda_bridge/lib64

# Find existing libcuda.so.1 files
!find / -name "libcuda.so.1" 2>/dev/null

# Link the CUDA compat driver library into the bridge directory
# (the path assumes CUDA 12.5; adjust it to match the output of `find` above)
!ln -sf /usr/local/cuda-12.5/compat/libcuda.so.1 /tmp/cuda_bridge/lib64/libcuda.so.1

# Step 4: Update environment variables to use our bridge
os.environ["LD_LIBRARY_PATH"] = "/tmp/cuda_bridge/lib64:" + os.environ.get("LD_LIBRARY_PATH", "")
os.environ["CUDA_PATH"] = "/usr/local/cuda-12.5"

# Step 5: Create a minimal CuPy mock
# This approach creates a lightweight CuPy substitute that doesn't need GPU drivers
if "cupy" not in sys.modules:
    print("Creating CuPy mock...")

    # Create a mock CuPy module
    from types import ModuleType
    cupy_mock = ModuleType("cupy")
    sys.modules["cupy"] = cupy_mock

    # Setup basic NumPy compatibility
    cupy_mock.ndarray = np.ndarray
    cupy_mock.array = lambda arr, dtype=None: np.array(arr, dtype=dtype)
    cupy_mock.asarray = lambda arr, dtype=None: np.asarray(arr, dtype=dtype)
    cupy_mock.zeros = lambda *args, **kwargs: np.zeros(*args, **kwargs)
    cupy_mock.ones = lambda *args, **kwargs: np.ones(*args, **kwargs)

    # Add math operations
    cupy_mock.add = lambda x, y: np.add(x, y)
    cupy_mock.subtract = lambda x, y: np.subtract(x, y)
    cupy_mock.multiply = lambda x, y: np.multiply(x, y)
    cupy_mock.divide = lambda x, y: np.divide(x, y)

    # Create CUDA mock submodule
    cuda_mock = ModuleType("cuda")
    cupy_mock.cuda = cuda_mock
    cuda_mock.is_available = lambda: False
    cuda_mock.runtime = ModuleType("runtime")

    # Create device mock (the lambdas take `self` because they are looked up
    # as methods on the instance)
    cuda_mock.Device = type("Device", (), {
        "__call__": lambda self, device_id: type("DeviceObject", (), {
            "name": lambda self: "CPU (Emulated)",
            "id": lambda self: device_id,
            "attributes": {}
        })(),
        "count": lambda self: 1
    })()

    # Add array transfer functions
    cupy_mock.get = lambda arr: arr
    cupy_mock.to_gpu = lambda arr: arr

    # Add common attributes
    setattr(cupy_mock, "__version__", "12.2.0-mock")
    setattr(cupy_mock, "float32", np.float32)
    setattr(cupy_mock, "float64", np.float64)
    setattr(cupy_mock, "float_", np.float64)

    print("CuPy mock created successfully")
else:
    print("CuPy already exists in sys.modules")

# Step 6: Install Holoscan again (with our mock cupy already in place)
!pip install -q holoscan

# Step 7: Test if our approach works
try:
    import cupy as cp
    import holoscan

    print(f"CuPy version: {cp.__version__}")
    print(f"Holoscan version: {holoscan.__version__}")

    # Try creating arrays with our mock
    a = cp.array([1, 2, 3])
    b = cp.array([4, 5, 6])
    c = cp.add(a, b)
    print(f"Array test: {a} + {b} = {c}")

    print("CuPy mock is working correctly with NumPy backend")
except Exception as e:
    print(f"Error testing CuPy mock: {e}")
    import traceback
    traceback.print_exc()
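
The mocking step above relies on Python consulting `sys.modules` before it searches for a module on disk. The sketch below isolates that mechanism with a made-up module name (`fake_gpu_lib`) and a hypothetical helper (`install_stub`); it illustrates the technique only and is not a substitute for a working CuPy installation.

```python
import sys
from types import ModuleType


def install_stub(module_name, **attributes):
    """Register a stub module unless a module of that name is already loaded."""
    if module_name in sys.modules:
        return sys.modules[module_name]
    stub = ModuleType(module_name)
    for attr_name, value in attributes.items():
        setattr(stub, attr_name, value)
    sys.modules[module_name] = stub
    return stub


stub = install_stub(
    "fake_gpu_lib",
    __version__="0.0-stub",
    add=lambda x, y: x + y,
)

# The import statement finds the stub in sys.modules and never touches disk
import fake_gpu_lib

print(fake_gpu_lib.__version__, fake_gpu_lib.add(2, 3))
```

A stub of this kind only satisfies the import and the attributes it defines; any code path that needs real device memory still requires the actual library and a working GPU driver.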

In [None]:
# Search for NVIDIA libraries
print("Searching for NVIDIA driver libraries...\n")

# Look for libnvidia-ml.so specifically
print("=== Looking for libnvidia-ml.so ===")
!find / -name "libnvidia-ml.so*" 2>/dev/null

# Look for other key NVIDIA libraries
print("\n=== Looking for other NVIDIA libraries ===")
!find / -name "libcuda.so*" 2>/dev/null
!find / -name "libnvidia-*.so*" 2>/dev/null | head -20

# Check CUDA runtime location
print("\n=== CUDA Runtime Location ===")
!find /usr/local/cuda* -name "libcudart.so*" 2>/dev/null

# Check if we can access NVIDIA driver version
print("\n=== Trying to access driver version ===")
!cat /proc/driver/nvidia/version 2>/dev/null || echo "No NVIDIA driver version file found"

# Check loaded kernel modules
print("\n=== Checking for NVIDIA kernel modules ===")
!lsmod | grep -i nvidia

# Check if CUDA driver is loaded in LD_LIBRARY_PATH
print("\n=== Checking LD_LIBRARY_PATH ===")
import os
print(os.environ.get("LD_LIBRARY_PATH", "Not set"))

# Check if we can create a minimal CUDA context
print("\n=== Testing minimal CUDA context creation ===")
!python -c "import ctypes; cuda = ctypes.CDLL('libcuda.so.1', mode=ctypes.RTLD_GLOBAL); print('CUDA library loaded via ctypes')" 2>&1 || echo "Failed to load CUDA library"

In [None]:
# import cupy as cp
# import holoscan as hs
# import numpy as np
# import os
# import cv2
# import glob
# import time
# import logging
# import threading
# import subprocess
# from holoscan.core import Application, Operator, OperatorSpec
# from holoscan.gxf import Entity
# from holoscan.operators import (
#     VideoStreamReplayerOp,
#     FormatConverterOp,
#     InferenceOp,
#     HolovizOp,
# )
# from holoscan.resources import UnboundedAllocator
# from holoscan.schedulers import GreedyScheduler

## Custom Operators for the Pipeline

We will create three custom operators: two that process the model's input and output, and one that renders and saves the results.

### Preprocessor Operator

This operator prepares the input for inference:

In [None]:
class PreprocessorOp(Operator):
    """Operator to format input image for inference"""
    def setup(self, spec: OperatorSpec):
        spec.input("in")
        spec.output("out")

    def compute(self, op_input, op_output, context):
        # Get input message
        in_message = op_input.receive("in")

        # Transpose
        tensor = cp.asarray(in_message.get("preprocessed")).get()
        # Note: the NumPy round trip and moveaxis are needed to avoid
        # stride issues during inference
        tensor = np.moveaxis(tensor, 2, 0)[None]
        tensor = cp.asarray(tensor)

        # Create output message
        out_message = {"preprocessed": tensor}
        op_output.emit(out_message, "out")
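
The axis shuffle in `compute()` moves the channel axis to the front and adds a batch axis. Assuming the incoming tensor is laid out as height, width, channel (HWC), the pure-Python sketch below spells out the same index mapping as `np.moveaxis(tensor, 2, 0)[None]` on a tiny nested list; `hwc_to_nchw` is a hypothetical helper used only to make the reordering visible.

```python
def hwc_to_nchw(image):
    """Reorder a nested list from [H][W][C] to [1][C][H][W]."""
    height = len(image)
    width = len(image[0])
    channels = len(image[0][0])
    chw = [
        [[image[y][x][c] for x in range(width)] for y in range(height)]
        for c in range(channels)
    ]
    # Wrap in one more list to add the leading batch dimension
    return [chw]


# A 2 x 2 image with 3 channels; each pixel is [R, G, B]
image = [
    [[1, 2, 3], [4, 5, 6]],
    [[7, 8, 9], [10, 11, 12]],
]
batch = hwc_to_nchw(image)
shape = (len(batch), len(batch[0]), len(batch[0][0]), len(batch[0][0][0]))
print(shape)        # (1, 3, 2, 2)
print(batch[0][0])  # first channel: [[1, 4], [7, 10]]
```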

### Postprocessor Operator

This operator processes the model's output to extract face detections:

In [None]:
class PostprocessorOp(Operator):
    """Operator to post-process inference output:
    * Reparameterize bounding boxes
    * Non-max suppression
    * Make boxes compatible with Holoviz
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def setup(self, spec: OperatorSpec):
        spec.input("in")
        spec.output("out")
        spec.param("iou_threshold", 0.15)
        spec.param("score_threshold", 0.3)  # Lower threshold to catch more potential faces
        spec.param("image_width", None)
        spec.param("image_height", None)
        spec.param("box_scale", None)
        spec.param("box_offset", None)
        spec.param("grid_height", None)
        spec.param("grid_width", None)

    def compute(self, op_input, op_output, context):
        # Get input message
        in_message = op_input.receive("in")

        # Convert input to cupy array
        boxes = cp.asarray(in_message.get("boxes"))[0, ...]
        scores = cp.asarray(in_message.get("scores"))[0, ...]

        # PeopleNet has three classes:
        # 0. Person
        # 1. Bag
        # 2. Face
        # Here we only keep the Face class (index 2)
        face_boxes = boxes[[8, 9, 10, 11], ...][None]  # Indices 8-11 correspond to face boxes
        face_scores = scores[[2], ...][None]  # Index 2 corresponds to face scores

        # Create output dictionary with just faces
        out = {"faces": None}

        # Reparameterize face boxes
        out["faces"], scores_nms = self.reparameterize_boxes(
            face_boxes,
            face_scores
        )

        # Non-max suppression
        out["faces"], _ = self.nms(out["faces"], scores_nms)

        # Reshape for HoloViz
        if len(out["faces"]) == 0:
            out["faces"] = np.zeros([1, 2, 2]).astype(np.float32)
        else:
            out["faces"][:, [0, 2]] /= self.image_width
            out["faces"][:, [1, 3]] /= self.image_height
            out["faces"] = cp.reshape(out["faces"][None], (1, -1, 2))

        # Create output message
        op_output.emit(out, "out")

    def nms(self, boxes, scores):
        """Non-max suppression (NMS)

        Parameters
        ----------
        boxes : array (4, n)
        scores : array (n,)

        Returns
        ----------
        boxes : array (m, 4)
        scores : array (m,)

        """
        # boxes has shape (4, n), so check the element count rather than len()
        if boxes.size == 0:
            return cp.asarray([]), cp.asarray([])

        # Get coordinates
        x0, y0, x1, y1 = boxes[0, :], boxes[1, :], boxes[2, :], boxes[3, :]

        # Area of bounding boxes
        area = (x1 - x0 + 1) * (y1 - y0 + 1)

        # Get indices of sorted scores
        indices = cp.argsort(scores)

        # Output boxes and scores
        boxes_out, scores_out = [], []

        # Iterate over bounding boxes
        while len(indices) > 0:
            # Get index with highest score from remaining indices
            index = indices[-1]

            # Pick bounding box with highest score
            boxes_out.append(boxes[:, index])
            scores_out.append(scores[index])

            # Get coordinates
            x00 = cp.maximum(x0[index], x0[indices[:-1]])
            x11 = cp.minimum(x1[index], x1[indices[:-1]])
            y00 = cp.maximum(y0[index], y0[indices[:-1]])
            y11 = cp.minimum(y1[index], y1[indices[:-1]])

            # Compute IOU
            width = cp.maximum(0, x11 - x00 + 1)
            height = cp.maximum(0, y11 - y00 + 1)
            overlap = width * height
            union = area[index] + area[indices[:-1]] - overlap
            iou = overlap / union

            # Threshold and prune
            left = cp.where(iou < self.iou_threshold)
            indices = indices[left]

        # To array
        boxes = cp.asarray(boxes_out)
        scores = cp.asarray(scores_out)

        return boxes, scores

    def reparameterize_boxes(self, boxes, scores):
        """Reparameterize boxes from corner+width+height to corner+corner.

        Parameters
        ----------
        boxes : array (1, 4, grid_height, grid_width)
        scores : array (1, 1, grid_height, grid_width)

        Returns
        ----------
        boxes : array (4, n)
        scores : array (n,)

        """
        cell_height = self.image_height / self.grid_height
        cell_width = self.image_width / self.grid_width

        # Generate the grid coordinates
        mx, my = cp.meshgrid(cp.arange(self.grid_width), cp.arange(self.grid_height))
        mx = mx.astype(np.float32).reshape((1, 1, self.grid_height, self.grid_width))
        my = my.astype(np.float32).reshape((1, 1, self.grid_height, self.grid_width))

        # Compute the box corners
        xmin = -(boxes[0, 0, ...] + self.box_offset) * self.box_scale + mx * cell_width
        ymin = -(boxes[0, 1, ...] + self.box_offset) * self.box_scale + my * cell_height
        xmax = (boxes[0, 2, ...] + self.box_offset) * self.box_scale + mx * cell_width
        ymax = (boxes[0, 3, ...] + self.box_offset) * self.box_scale + my * cell_height
        boxes = cp.concatenate([xmin, ymin, xmax, ymax], axis=1)

        # Select the scores that are above the threshold
        scores_mask = scores > self.score_threshold
        scores = scores[scores_mask]
        scores_mask = cp.repeat(scores_mask, 4, axis=1)
        boxes = boxes[scores_mask]

        # Reshape after masking
        n = boxes.size // 4
        boxes = boxes.reshape(4, n)

        return boxes, scores
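
Greedy non-max suppression is easier to follow on a handful of boxes. The sketch below reimplements the same idea as `nms()` in plain Python, including the `+ 1` pixel convention used above for widths, heights, and areas; the helper names `iou` and `greedy_nms` and the sample boxes are made up for illustration.

```python
def iou(box_a, box_b):
    """Intersection over union of two (x0, y0, x1, y1) boxes, inclusive pixels."""
    ix0, iy0 = max(box_a[0], box_b[0]), max(box_a[1], box_b[1])
    ix1, iy1 = min(box_a[2], box_b[2]), min(box_a[3], box_b[3])
    overlap = max(0, ix1 - ix0 + 1) * max(0, iy1 - iy0 + 1)
    area_a = (box_a[2] - box_a[0] + 1) * (box_a[3] - box_a[1] + 1)
    area_b = (box_b[2] - box_b[0] + 1) * (box_b[3] - box_b[1] + 1)
    return overlap / (area_a + area_b - overlap)


def greedy_nms(boxes, scores, iou_threshold=0.15):
    """Keep the best-scoring box, drop boxes that overlap it, and repeat."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i])  # ascending
    keep = []
    while order:
        best = order.pop()  # index of the highest remaining score
        keep.append(best)
        # Only boxes that overlap the chosen one by less than the threshold survive
        order = [i for i in order if iou(boxes[best], boxes[i]) < iou_threshold]
    return keep


boxes = [(10, 10, 50, 50), (12, 12, 52, 52), (100, 100, 140, 140)]
scores = [0.9, 0.8, 0.7]
kept = greedy_nms(boxes, scores)
print(kept)  # [0, 2]
```

The second box overlaps the first with an IoU of roughly 0.83, well above the 0.15 threshold, so it is suppressed; the third box does not overlap at all and is kept.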

### Visualization Sink Operator

This operator renders the detection results and saves frames:

In [None]:
class VisualizationSinkOp(Operator):
    def __init__(self, fragment, name="visualization_sink",
                 save_frames=False,
                 output_dir="detection_results",
                 output_video="./scripts/tao_peoplenet/data/people_faces_detected.webm",
                 *args, **kwargs):
        import os
        import time
        import cv2
        import subprocess
        import threading

        # Initialize properties
        self.frame_count = 0
        self.save_frames = save_frames
        self.output_dir = output_dir

        # Set up directories and paths
        self.frames_dir = os.path.join(os.path.dirname(output_video), "temp_frames")
        os.makedirs(self.frames_dir, exist_ok=True)

        self.output_video_base = os.path.splitext(output_video)[0]
        self.output_video_webm = f"{self.output_video_base}.webm"

        self.start_time = time.time()
        self.frames_saved = 0
        self.frame_paths = []

        # Call the base class constructor
        super().__init__(fragment, name=name, *args, **kwargs)

        # Check for ffmpeg
        try:
            result = subprocess.run(['ffmpeg', '-version'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE,
                                  text=True)
            self.ffmpeg_available = result.returncode == 0
            print(f"ffmpeg available: {self.ffmpeg_available}")
        except Exception:
            self.ffmpeg_available = False
            print("ffmpeg not available")

        # Create output directory
        output_video_dir = os.path.dirname(self.output_video_webm)
        if output_video_dir:
            os.makedirs(output_video_dir, exist_ok=True)
            print(f"Ensured output directory exists: {output_video_dir}")

    def setup(self, spec):
        spec.input("in")
        spec.input("original_image")

    def compute(self, op_input, op_output, context):
        # Import necessary modules
        import time
        import numpy as np
        import cv2
        import os

        # Start timing this frame
        frame_start_time = time.time()

        # Get the detection results
        detection_data = op_input.receive("in")
        # Get the original image
        original_image = op_input.receive("original_image")

        if detection_data and original_image:
            # Try to get original image data
            img_array = None

            if isinstance(original_image, dict) and '' in original_image:
                img_tensor = original_image['']

                # Try to access the image data using cupy
                try:
                    import cupy as cp
                    if hasattr(img_tensor, '__cuda_array_interface__'):
                        # Convert GPU tensor to cupy array and then to numpy
                        img_array = cp.asarray(img_tensor).get()
                except Exception:  # also covers ImportError
                    # Don't print error for better performance
                    pass

            # If we couldn't get the original image, create a blank one
            if img_array is None:
                h, w = 1080, 1920
                img_array = np.ones((h, w, 3), dtype=np.uint8) * 80  # Medium gray
                print("Created fallback image (original image access failed)")

            # Make a copy for visualization (in RGB format)
            processed_img = img_array.copy()
            h, w = processed_img.shape[:2]

            # Process detection data to extract face boxes
            face_boxes = []

            if isinstance(detection_data, dict) and 'faces' in detection_data:
                try:
                    # Convert to numpy array using cupy
                    faces_data = detection_data['faces']
                    if faces_data is not None and hasattr(faces_data, '__cuda_array_interface__'):
                        import cupy as cp
                        faces_array = cp.asarray(faces_data).get()

                        # Process face detection coordinates
                        if len(faces_array.shape) == 3 and faces_array.shape[0] == 1:
                            # Process pairs of points as bounding boxes
                            for i in range(0, faces_array.shape[1], 2):
                                if i+1 < faces_array.shape[1]:
                                    # Get normalized coordinates
                                    x1 = float(faces_array[0, i, 0])
                                    y1 = float(faces_array[0, i, 1])
                                    x2 = float(faces_array[0, i+1, 0])
                                    y2 = float(faces_array[0, i+1, 1])

                                    # Skip if all coordinates are very close to zero
                                    if not np.allclose([x1, y1, x2, y2], 0, atol=1e-5):
                                        # Convert normalized coordinates to pixel coordinates
                                        px1, py1 = int(x1 * w), int(y1 * h)
                                        px2, py2 = int(x2 * w), int(y2 * h)

                                        # Ensure coordinates are within image bounds
                                        px1 = max(0, min(px1, w-1))
                                        py1 = max(0, min(py1, h-1))
                                        px2 = max(0, min(px2, w-1))
                                        py2 = max(0, min(py2, h-1))

                                        # Ensure proper orientation (x2 > x1, y2 > y1)
                                        if px1 > px2:
                                            px1, px2 = px2, px1
                                        if py1 > py2:
                                            py1, py2 = py2, py1

                                        # Add face box if it has reasonable dimensions
                                        if (px2 - px1) > 10 and (py2 - py1) > 10:
                                            face_boxes.append([px1, py1, px2, py2])
                except Exception:
                    # Don't print error for better performance
                    pass

            # Calculate timing information
            frame_time = (time.time() - frame_start_time) * 1000  # ms
            total_time = time.time() - self.start_time
            fps = self.frame_count / total_time if total_time > 0 else 0

            # Add frame information to the image - using RGB colors
            font = cv2.FONT_HERSHEY_DUPLEX
            # Yellow in RGB is (255, 255, 0)
            cv2.putText(processed_img, f"Frame: {self.frame_count} - Faces: {len(face_boxes)}",
                      (20, 40), font, 1, (255, 255, 0), 2)

            # Add performance metrics
            cv2.putText(processed_img, f"Frame time: {frame_time:.2f}ms",
                      (20, 80), font, 0.6, (0, 255, 255), 1)
            cv2.putText(processed_img, f"Average FPS: {fps:.1f}",
                      (20, 110), font, 0.6, (0, 255, 255), 1)

            # Draw face boxes - using RGB colors
            for i, box in enumerate(face_boxes):
                try:
                    x1, y1, x2, y2 = box

                    # Draw rectangle (Red in RGB is (255, 0, 0))
                    cv2.rectangle(processed_img, (x1, y1), (x2, y2), (255, 0, 0), 3)

                    # Add label (Red in RGB is (255, 0, 0))
                    cv2.putText(processed_img, f"Face {i+1}", (x1, y1-10),
                              font, 0.7, (255, 0, 0), 2)
                except Exception:
                    # Don't print error for better performance
                    pass

            # Convert to BGR for OpenCV operations
            bgr_img = cv2.cvtColor(processed_img, cv2.COLOR_RGB2BGR)

            # Save frame directly to disk with a unique, sequential name
            frame_path = os.path.join(self.frames_dir, f"frame_{self.frame_count:05d}.jpg")

            # Resize to reduce file size (but keep reasonable quality)
            resized_img = cv2.resize(bgr_img, (854, 480))  # 480p resolution

            # Use moderate compression to save space
            cv2.imwrite(frame_path, resized_img, [cv2.IMWRITE_JPEG_QUALITY, 90])

            # Track this frame path
            self.frame_paths.append(frame_path)
            self.frames_saved += 1

            # Periodically log progress
            if self.frame_count % 10 == 0:
                print(f"Processed {self.frame_count} frames with {len(face_boxes)} faces in latest frame. Avg FPS: {fps:.1f}, Saved frames: {self.frames_saved}")

            # Increment frame counter
            self.frame_count += 1

        return True

    def release(self):
        """Properly release resources when the application is shut down"""
        import time
        import os

        # Calculate total elapsed time and average FPS
        total_time = time.time() - self.start_time
        avg_fps = self.frame_count / total_time if total_time > 0 else 0
        print(f"\n\n===== RELEASING RESOURCES =====")
        print(f"Processed {self.frame_count} frames in {total_time:.2f} seconds ({avg_fps:.2f} FPS average)")
        print(f"Saved {self.frames_saved} frames to disk")

        # Store the frame information for access after application completes
        # This will be used by the post-application processing
        global stored_frame_info
        stored_frame_info = {
            "frames_dir": self.frames_dir,
            "frame_count": self.frames_saved,
            "output_video": self.output_video_webm
        }

        # We'll let the post-application code handle video creation to avoid scheduler issues
        print("Frame information stored for video creation after application completes")

        # Call base class release method
        print("Calling base class release method...")
        super().release()
        print("Base class release completed")
        print("===== RELEASE COMPLETED =====\n\n")
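
The coordinate handling inside `compute()` is the part most likely to need adjustment for other models. The sketch below pulls it out into a standalone function, assuming the detections arrive as a flat list of normalized `(x, y)` points in which consecutive pairs form one box; `points_to_pixel_boxes` and `min_size` are hypothetical names.

```python
def points_to_pixel_boxes(points, width, height, min_size=10):
    """Turn consecutive pairs of normalized points into clamped pixel boxes."""
    boxes = []
    for i in range(0, len(points) - 1, 2):
        (x1, y1), (x2, y2) = points[i], points[i + 1]
        # Scale to pixels and clamp to the image bounds
        px1 = max(0, min(int(x1 * width), width - 1))
        py1 = max(0, min(int(y1 * height), height - 1))
        px2 = max(0, min(int(x2 * width), width - 1))
        py2 = max(0, min(int(y2 * height), height - 1))
        # Make sure (px1, py1) is the top-left corner
        px1, px2 = min(px1, px2), max(px1, px2)
        py1, py2 = min(py1, py2), max(py1, py2)
        # Discard degenerate or tiny boxes, including the all-zero placeholder
        if (px2 - px1) > min_size and (py2 - py1) > min_size:
            boxes.append((px1, py1, px2, py2))
    return boxes


points = [
    (0.25, 0.25), (0.5, 0.5),      # a valid box
    (0.0, 0.0), (0.0, 0.0),        # the "no detection" placeholder
    (0.875, 0.875), (1.25, 1.25),  # partly outside the image, clamped
]
pixel_boxes = points_to_pixel_boxes(points, width=1920, height=1080)
print(pixel_boxes)  # [(480, 270, 960, 540), (1680, 945, 1919, 1079)]
```

Keeping this logic in a pure function makes it straightforward to unit-test separately from the operator.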

## Application Class

Now we will create the main application class to connect all the operators:

In [None]:
class PeopleAndFaceDetectApp(Application):
    def __init__(self, data_path, model_path, *args, **kwargs):
        """Initialize the face and people detection application"""
        super().__init__(*args, **kwargs)
        self.name = "People and Face Detection App"
        self.sample_data_path = data_path
        self.model_path = model_path

    def compose(self):
        # Create resource allocator
        pool = UnboundedAllocator(self, name="pool")

        # Configure and create the video source
        replayer_args = self.kwargs("replayer_source")
        replayer_args["repeat"] = False

        source = VideoStreamReplayerOp(
            self,
            name="replayer_source",
            directory=self.sample_data_path,
            **replayer_args
        )

        # Format converter for preprocessing
        preprocessor_args = self.kwargs("preprocessor")
        format_converter = FormatConverterOp(
            self,
            name="preprocessor",
            pool=pool,
            **preprocessor_args,
        )

        # Preprocessor for model input
        preprocessor = PreprocessorOp(
            self,
            name="transpose",
            pool=pool,
        )

        # Inference operator
        inference_args = self.kwargs("inference")
        inference_args["model_path_map"] = {
            "face_detect": os.path.join(self.sample_data_path, "resnet34_peoplenet_int8.onnx")
        }
        inference = InferenceOp(
            self,
            name="inference",
            allocator=pool,
            **inference_args,
        )

        # Postprocessor for detection results
        postprocessor_args = self.kwargs("postprocessor")
        postprocessor_args["image_width"] = preprocessor_args["resize_width"]
        postprocessor_args["image_height"] = preprocessor_args["resize_height"]
        postprocessor = PostprocessorOp(
            self,
            name="postprocessor",
            allocator=pool,
            **postprocessor_args,
        )

        # Visualization sink
        vis_args = self.kwargs("visualization") or {}
        visualization_sink = VisualizationSinkOp(
            self,
            name="visualization_sink",
            **vis_args
        )

        # Connect the operators
        self.add_flow(source, format_converter)
        self.add_flow(source, visualization_sink, {("output", "original_image")})
        self.add_flow(format_converter, preprocessor)
        self.add_flow(preprocessor, inference, {("", "receivers")})
        self.add_flow(inference, postprocessor, {("transmitter", "in")})
        self.add_flow(postprocessor, visualization_sink, {("out", "in")})
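
The six `add_flow` calls define a small directed graph. As a sanity check, the sketch below rebuilds that graph from the operator names used in `compose()` and asks Python's standard `graphlib` module (Python 3.9+) for a valid execution order. It illustrates the topology only and makes no claim about how Holoscan's scheduler works internally.

```python
from graphlib import TopologicalSorter

# (upstream, downstream) pairs, mirroring the add_flow calls in compose()
flows = [
    ("replayer_source", "preprocessor"),
    ("replayer_source", "visualization_sink"),
    ("preprocessor", "transpose"),
    ("transpose", "inference"),
    ("inference", "postprocessor"),
    ("postprocessor", "visualization_sink"),
]

# TopologicalSorter expects each node mapped to the set of its predecessors
predecessors = {}
for upstream, downstream in flows:
    predecessors.setdefault(downstream, set()).add(upstream)
    predecessors.setdefault(upstream, set())

# static_order() raises graphlib.CycleError if the graph contains a cycle
order = list(TopologicalSorter(predecessors).static_order())
print(" -> ".join(order))
```

The sink appears last because it waits for both the original frame and the postprocessed detections, which is why `VisualizationSinkOp` declares two input ports.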

## Video Creation Helper Function

We'll also define a function to create a video from the saved frames:

In [None]:
def create_video_from_frames(frames_dir, output_video_path):
    """Create a video file from a directory of frame images"""
    print(f"\n\n===== CREATING VIDEO FROM FRAMES =====")

    # Ensure the output directory exists (dirname is empty for a bare filename)
    output_dir = os.path.dirname(output_video_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    # Find all frame files
    frame_pattern = os.path.join(frames_dir, "frame_*.jpg")
    frame_paths = sorted(glob.glob(frame_pattern))
    frame_count = len(frame_paths)

    print(f"Found {frame_count} frames in {frames_dir}")

    if frame_count == 0:
        print("No frames found - cannot create video")
        return False

    # Create a temporary file listing all frames
    list_file = os.path.join(frames_dir, "frames_list.txt")
    with open(list_file, 'w') as f:
        for frame_path in frame_paths:
            f.write(f"file '{os.path.abspath(frame_path)}'\n")

    print(f"Created frames list file with {frame_count} entries at: {list_file}")

    # Create WebM with ffmpeg
    webm_cmd = [
        'ffmpeg',
        '-y',                  # Overwrite output file if it exists
        '-f', 'concat',        # Concatenate frames
        '-safe', '0',          # Allow absolute paths
        '-i', list_file,       # Input file list
        '-c:v', 'libvpx',      # VP8 codec
        '-b:v', '2M',          # Bitrate (2Mbps for good quality)
        '-crf', '10',          # Quality factor (lower is better)
        '-r', '30',            # Frame rate
        output_video_path      # Output file
    ]

    print(f"Running FFmpeg command: {' '.join(webm_cmd)}")

    result = subprocess.run(webm_cmd,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                        text=True)

    if result.returncode == 0:
        if os.path.exists(output_video_path) and os.path.getsize(output_video_path) > 0:
            print(f"WebM created successfully: {output_video_path}")
            print(f"WebM file size: {os.path.getsize(output_video_path) / (1024*1024):.2f} MB")
            return True
        else:
            print(f"ERROR: WebM file not created properly (missing or empty)")
            print(f"FFmpeg stderr: {result.stderr}")
    else:
        print(f"WebM creation failed - error code: {result.returncode}")
        print(f"FFmpeg error: {result.stderr}")

        # Try an alternative approach
        print("Trying alternative approach using glob pattern...")
        alt_cmd = [
            'ffmpeg',
            '-y',
            '-framerate', '30',
            '-pattern_type', 'glob',
            '-i', frame_pattern,
            '-c:v', 'libvpx',
            '-b:v', '2M',
            output_video_path
        ]

        print(f"Running alternative FFmpeg command: {' '.join(alt_cmd)}")
        result = subprocess.run(alt_cmd,
                              stdout=subprocess.PIPE,
                              stderr=subprocess.PIPE,
                              text=True)

        if result.returncode == 0:
            print(f"WebM created with alternative approach")
            print(f"WebM file size: {os.path.getsize(output_video_path) / (1024*1024):.2f} MB")
            return True
        else:
            print(f"Alternative approach failed:")
            print(f"FFmpeg stderr: {result.stderr}")

            # Try a third approach with OpenCV
            try:
                print("Trying third approach with OpenCV...")

                # Sort frames numerically to ensure correct order
                sorted_frames = sorted(frame_paths,
                                      key=lambda p: int(os.path.basename(p).split('_')[1].split('.')[0]))

                # Load the first frame to get dimensions
                frame = cv2.imread(sorted_frames[0])
                height, width, _ = frame.shape

                # Create a video writer
                fourcc = cv2.VideoWriter_fourcc(*'VP80')  # WebM format
                video_writer = cv2.VideoWriter(output_video_path,
                                              fourcc,
                                              30.0,  # FPS
                                              (width, height))

                # Add all frames to the video
                for i, frame_path in enumerate(sorted_frames):
                    frame = cv2.imread(frame_path)
                    video_writer.write(frame)
                    if i % 50 == 0:
                        print(f"Processing frame {i}/{len(sorted_frames)}")

                # Release the video writer
                video_writer.release()

                print(f"Video created with OpenCV: {output_video_path}")
                print(f"Video file size: {os.path.getsize(output_video_path) / (1024*1024):.2f} MB")
                return True
            except Exception as e:
                print(f"OpenCV video creation failed: {e}")
                import traceback
                traceback.print_exc()

    return False
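
One subtlety in the function above: `sorted(glob.glob(...))` orders file names as strings, so unpadded names such as `frame_10.jpg` would sort before `frame_2.jpg`. Whether that matters depends on how the frames were named when they were saved. The following is a minimal sketch, assuming frames are named `frame_<number>.jpg`, that sorts numerically and also writes a `duration` directive after each `file` entry, which ffmpeg's concat demuxer accepts, so that frame timing is stated explicitly rather than left to the demuxer's default. The helper names `frame_number` and `write_concat_list` are hypothetical and not part of the tutorial code.

```python
import os
import re
import tempfile


def frame_number(path):
    """Return the integer in a name like 'frame_12.jpg' (assumed naming scheme)."""
    match = re.search(r"frame_(\d+)\.jpg$", os.path.basename(path))
    if match is None:
        raise ValueError(f"Unexpected frame name: {path}")
    return int(match.group(1))


def write_concat_list(frame_paths, list_file, fps=30):
    """Write an ffmpeg concat list in numeric frame order with per-frame durations."""
    ordered = sorted(frame_paths, key=frame_number)
    frame_duration = 1.0 / fps
    with open(list_file, "w") as f:
        for path in ordered:
            f.write(f"file '{os.path.abspath(path)}'\n")
            f.write(f"duration {frame_duration:.6f}\n")
    return ordered


# Small demonstration with empty placeholder files in a temporary directory
with tempfile.TemporaryDirectory() as tmp:
    names = ["frame_10.jpg", "frame_2.jpg", "frame_1.jpg"]
    paths = [os.path.join(tmp, name) for name in names]
    for p in paths:
        open(p, "wb").close()
    ordered = write_concat_list(paths, os.path.join(tmp, "frames_list.txt"))
    print([os.path.basename(p) for p in ordered])
```

If the frames are already zero-padded (for example `frame_000010.jpg`), the plain string sort and the numeric sort give the same order, and only the explicit durations change the result.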

## Running the Application

Finally, we'll set up and run the application:

In [None]:
# Initialize global variable to store frame information
stored_frame_info = None

# Configure logging
configure_logging()

# Define paths
script_dir = os.path.join(os.getcwd(), "scripts/tao_peoplenet")
config_file = os.path.join(script_dir, "tao_peoplenet.yaml")
data_path = os.path.join(script_dir, "data/")
model_path = os.path.join(data_path, "resnet34_peoplenet_int8.onnx")

# Create and configure the application
app = PeopleAndFaceDetectApp(data_path, model_path)
app.config(config_file)

# Set scheduler (max_duration_ms caps the run at 100,000 seconds, roughly 27.8 hours)
scheduler = GreedyScheduler(app, name="greedy_scheduler", max_duration_ms=100_000_000)
app.scheduler(scheduler)

# Run the application
try:
    print("\n\n===== STARTING APPLICATION =====\n\n")
    app.run()
except Exception as e:
    print(f"Error running application: {e}")
    import traceback
    traceback.print_exc()
finally:
    # Create video from frames once the application has stopped
    print("\n\n===== APPLICATION FINISHED =====\n\n")
    print("Waiting briefly before creating the video...")
    time.sleep(5)  # Give any pending operations a moment to complete

    if stored_frame_info:
        frames_dir = stored_frame_info["frames_dir"]
        output_video = stored_frame_info["output_video"]
        create_video_from_frames(frames_dir, output_video)
    else:
        # Try fallback approach
        frames_dir = os.path.join(script_dir, "data/temp_frames")
        output_video = os.path.join(script_dir, "data/people_faces_detected.webm")

        if os.path.exists(frames_dir):
            print(f"Using fallback approach to create video from {frames_dir}")
            create_video_from_frames(frames_dir, output_video)
        else:
            print(f"No frames directory found at {frames_dir} - skipping video creation")
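
Because the run cell depends on several files and on the `ffmpeg` executable, it can help to check for them before starting the application. The sketch below is one way to do that under stated assumptions: the helper name `find_missing_prerequisites` is hypothetical, and the file list simply mirrors the paths used in this tutorial.

```python
import os
import shutil


def find_missing_prerequisites(required_files, required_tools=("ffmpeg",)):
    """Return a list describing every required file or command-line tool that is absent."""
    missing = []
    for path in required_files:
        if not os.path.isfile(path):
            missing.append(f"file: {path}")
    for tool in required_tools:
        # shutil.which returns None when the executable is not on PATH
        if shutil.which(tool) is None:
            missing.append(f"tool: {tool}")
    return missing


# Paths below mirror the ones used in the run cell above
script_dir = os.path.join(os.getcwd(), "scripts/tao_peoplenet")
required = [
    os.path.join(script_dir, "tao_peoplenet.yaml"),
    os.path.join(script_dir, "data", "resnet34_peoplenet_int8.onnx"),
    os.path.join(script_dir, "data", "people.mp4"),
]

problems = find_missing_prerequisites(required)
if problems:
    print("Missing prerequisites:")
    for item in problems:
        print(f"  {item}")
else:
    print("All prerequisites found")
```

Running this check first turns a failure deep inside the pipeline into a short, readable list of what needs to be copied or installed.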

## Viewing the Original Video

First, let's make sure the original, unprocessed video plays correctly so we have a baseline to compare against the output of the face detection application. We will display the original video using IPython's display capabilities:

In [None]:
from IPython.display import Video

# Display the video with absolute path and optional width
Video("/content/scripts/tao_peoplenet/data/people.mp4", embed=True, width=800)

## Viewing the Results

After running the application, we can view the resulting video using IPython's display capabilities:

In [None]:
# Display the detected faces video using absolute path
Video("/content/scripts/tao_peoplenet/data/people_faces_detected.webm", embed=True, width=800)

This will display the processed video with face detections highlighted.
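
With `embed=True`, the video data is stored inside the notebook itself, so a missing, empty, or very large output file is worth catching before display. The following is a minimal sketch of such a guard. The helper name `video_status` and the 50 MB threshold are assumptions chosen for illustration, not values from the Holoscan or IPython documentation.

```python
import os


def video_status(path, max_embed_mb=50):
    """Classify a video file as 'missing', 'empty', 'too_large', or 'ok' for embedding."""
    if not os.path.isfile(path):
        return "missing"
    size_bytes = os.path.getsize(path)
    if size_bytes == 0:
        return "empty"
    if size_bytes > max_embed_mb * 1024 * 1024:
        return "too_large"
    return "ok"


output_path = "/content/scripts/tao_peoplenet/data/people_faces_detected.webm"
status = video_status(output_path)

if status == "ok":
    # Import lazily so the check itself works outside a notebook
    from IPython.display import Video, display
    display(Video(output_path, embed=True, width=800))
else:
    print(f"Not displaying {output_path}: status is '{status}'")
```

A status of `missing` or `empty` usually means the video creation step did not succeed, so the FFmpeg output printed earlier is the first place to look.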

## Summary

In this section, we've built a complete Holoscan application for face detection using:

1. **Pre-built operators** for video input, format conversion, and inference
2. **Custom operators** for preprocessing, postprocessing, and visualization
3. **GPU acceleration** throughout the pipeline
4. **Configuration-driven approach** using YAML files
5. **Video creation** from processed frames

This application shows how Holoscan combines these pieces into a GPU-accelerated AI pipeline for real-time video processing.

# Conclusion and Next Steps

## What We've Learned

In this tutorial, we have explored NVIDIA Holoscan, a powerful SDK for building GPU-accelerated streaming AI pipelines. We have covered:

1. **The Motivation for GPU Acceleration**
   - Understanding CPU bottlenecks in AI workflows
   - The benefits of end-to-end GPU acceleration
   - How Holoscan addresses common data pipeline challenges

2. **Core Holoscan Concepts**
   - Operators as the basic building blocks of processing pipelines
   - Data flow between operators through input and output ports
   - Applications that orchestrate operators and data flow
   - Configuring pipelines using YAML files

3. **Building Custom Operators**
   - Implementing the `setup` method to define ports
   - Implementing the `compute` method for data processing
   - Managing operator state across compute calls

4. **Using Pre-Built Operators**
   - Leveraging optimized operators for common tasks
   - Configuring operators for specific requirements
   - Connecting pre-built and custom operators

5. **Building a Complete Application**
   - Face detection using the PeopleNet model
   - Video processing pipeline with multiple stages
   - Real-time visualization and video output

## Key Takeaways

- **GPU Acceleration**: Holoscan enables end-to-end GPU acceleration, which addresses the CPU bottlenecks that otherwise limit streaming AI applications.
- **Operator Paradigm**: The operator-based architecture provides a clean, modular approach to building complex pipelines.
- **Configurability**: YAML-based configuration allows for flexible adjustment of parameters without code changes.
- **Production-Ready**: Holoscan provides the tools needed to build high-performance, production-ready AI applications.

## Next Steps

To continue your journey with NVIDIA Holoscan, consider exploring these advanced topics:

1. **Custom GPU Operators**: Develop operators that leverage CUDA for maximum performance.
2. **Multi-GPU Processing**: Scale applications across multiple GPUs for higher throughput.
3. **Edge Deployment**: Deploy Holoscan applications on edge devices with NVIDIA hardware.
4. **Real-time Constraints**: Implement applications with strict latency requirements.
5. **Custom Visualization**: Create advanced visualizations for specific domains.

## Resources

- [NVIDIA Holoscan Documentation](https://docs.nvidia.com/holoscan/) - Comprehensive documentation and guides
- [Holoscan GitHub Repository](https://github.com/nvidia-holoscan/holoscan-sdk) - Source code and examples
- [NVIDIA Developer Forums](https://forums.developer.nvidia.com/) - Community support and discussions
- [NGC Catalog](https://catalog.ngc.nvidia.com/) - Pre-trained models and containers

# Community Contribution: HoloHub

## What is HoloHub?

HoloHub is the official repository for community-contributed Holoscan applications and operators. It serves as a "town square" where engineering teams can contribute, share, and reuse new functionality while demonstrating novel applications.

The repository provides:
- A curated collection of sample applications across multiple domains
- Reusable operators that accelerate development
- Quality-graded code contributions with clear metadata

## How to Get Involved

### Explore the Repository
Visit the official HoloHub repository at: [https://github.com/nvidia-holoscan/holohub](https://github.com/nvidia-holoscan/holohub)

### Contribute Your Work
Consider contributing to the Holoscan ecosystem by:

- **Sharing your custom operators** - Help others avoid reinventing the wheel
- **Contributing applications** - Demonstrate innovative use cases
- **Reporting bugs and requesting features** - Improve the platform for everyone
- **Participating in community discussions** - Share knowledge and insights
- **Creating tutorials and examples** - Help new users get started

### Contribution Guidelines

All sample applications and operators in HoloHub are marked with a `metadata.json` file that:
- Describes the contribution
- Grades it for code quality
- Marks supported compute platforms
- Lists dependencies
- Provides domain tags (e.g., medical, industrial, aerospace)
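
To make the list above concrete, here is a rough sketch of what such a file might contain, built as a Python dictionary and printed as JSON. The field names, nesting, and values are all assumptions for illustration and should not be taken as the official schema. The HoloHub repository and its contribution guidelines are the authoritative reference. The helper `missing_metadata_fields` is likewise hypothetical.

```python
import json

# Assumed layout for illustration only; check the HoloHub repository for the real schema.
metadata = {
    "application": {
        "name": "People and Face Detection",          # describes the contribution
        "authors": [{"name": "Your Name", "affiliation": "Your Organization"}],
        "language": "Python",
        "version": "1.0",
        "ranking": 2,                                  # code-quality grade (assumed field name)
        "platforms": ["x86_64", "aarch64"],            # supported compute platforms
        "dependencies": {"libraries": ["opencv-python"]},
        "tags": ["Computer Vision", "Face Detection"], # domain tags
    }
}

# Fields corresponding to the items listed above (names are assumptions)
REQUIRED_FIELDS = ("name", "ranking", "platforms", "dependencies", "tags")


def missing_metadata_fields(meta, required=REQUIRED_FIELDS):
    """Return the required fields that are absent from the 'application' entry."""
    entry = meta.get("application", {})
    return [field for field in required if field not in entry]


print(json.dumps(metadata, indent=2))
print("Missing fields:", missing_metadata_fields(metadata))
```

A quick completeness check like this can catch an omitted field before a contribution is submitted for review.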

Before contributing to HoloHub, please consult the contribution guidelines at: [https://github.com/nvidia-holoscan/holohub/blob/main/CONTRIBUTING.md](https://github.com/nvidia-holoscan/holohub/blob/main/CONTRIBUTING.md)

## Benefits of Participating

By contributing to HoloHub, you can:
- Showcase your expertise to the Holoscan community
- Learn from others' implementations
- Accelerate your development by leveraging existing components
- Connect with other developers working on similar challenges
- Help shape the future of GPU-accelerated streaming AI applications

**Thank you for exploring NVIDIA Holoscan! We hope this tutorial has equipped you with the knowledge and skills to build your own high-performance, GPU-accelerated streaming AI applications. Now, take the next step and become part of the growing Holoscan community through HoloHub!**