# NVIDIA GTC 2023
## Developer Breakout
### Accelerating Enterprise Workflows With Triton Server and DALI 
*[SE52140]*

## Overview

NVIDIA Data Loading Library (DALI) is a collection of highly optimized building blocks and an execution engine that accelerates the data pipeline for computer vision and audio deep learning applications.

Input and augmentation pipelines provided by deep learning frameworks typically fall into one of two categories:

* fast, but inflexible - written in C++, they are exposed as a single monolithic Python object that provides a very specific set and ordering of operations
* slow, but flexible - a set of building blocks written in either C++ or Python that can be composed into arbitrary data pipelines, which end up being slow. One of the biggest overheads for this type of data pipeline is the Global Interpreter Lock (GIL) in Python. This forces developers to use multiprocessing, complicating the design of efficient input pipelines.
 
DALI stands out by providing both performance and flexibility for accelerating different data pipelines. It achieves this by exposing optimized building blocks that are executed by a simple and efficient engine, and by enabling operations to be offloaded to the GPU (which in turn enables scaling to multi-GPU systems).

It is a single library that can be easily integrated into different deep learning training and inference applications.

DALI offers ease of use and flexibility across GPU-enabled systems with direct framework plugins, multiple input data formats, and configurable graphs. DALI can help speed up deep learning workflows that are bottlenecked on I/O pipelines by the limited CPU cycles available. Typically, systems with a high GPU-to-CPU ratio are constrained by the host CPU, which leaves the available GPU compute under-utilized. DALI accelerates input processing on such dense GPU configurations, improving overall throughput.

In [None]:
from nvidia.dali import fn, math, ops, pipeline_def, types
import numpy as np
import os
from PIL import Image
import psutil
from matplotlib import gridspec, patches
import matplotlib.pyplot as plt
from timeit import default_timer as timer

%matplotlib inline
%config InlineBackend.figure_format='retina'
plt.style.use("dark_background")

___
## Pipeline

At the core of data processing with DALI lies the concept of a data processing pipeline. It is composed of multiple operations connected in a directed graph and contained in an object of class `nvidia.dali.Pipeline`. This class provides the functions necessary for defining, building and running data processing pipelines.

Let us start with defining a very simple pipeline for a classification task determining whether a picture contains a dog or a kitten. We prepared a directory structure containing COCO data.

Our simple pipeline will read images from this directory, decode them and return (image, label) pairs.

The easiest way to create a pipeline is by using the `pipeline_def` decorator. In the `simple_pipeline` function we define the operations to be performed and the flow of the computation between them.

1. Use `fn.readers.coco` to read jpegs (encoded images) and labels.

2. Use the `fn.decoders.image` operation to decode images from jpeg to RGB.

3. Specify which of the intermediate variables should be returned as the outputs of the pipeline.

In [None]:
def show_images(image_batch, title=None, index=0, dpi=60):
    fig, ax = plt.subplots(dpi=dpi)
    ax.imshow(image_batch[index])
    plt.axis("off")
    plt.title(title)
    plt.show()

In [None]:
def coco_reader_def(ratio=False):
    inputs, bboxes, labels, polygons, vertices = fn.readers.coco(
        file_root=coco_file_root,
        annotations_file=coco_annotations_file,
        polygon_masks=True,  # Load segmentation mask data as polygons
        ratio=ratio,  # Bounding box and mask polygons to be expressed in relative coordinates
        ltrb=True,  # Bounding boxes to be expressed as left, top, right, bottom coordinates
    )
    return inputs, bboxes, labels, polygons, vertices

In [None]:
def speedtest(pipeline, batch, n_threads, device_id=0):
    pipe = pipeline(batch_size=batch, num_threads=n_threads, device_id=device_id)
    pipe.build()
    # warmup
    for i in range(5):
        pipe.run()
    # test
    n_test = 10
    t_start = timer()
    for i in range(n_test):
        pipe.run()
    t = timer() - t_start
    return "{:,.0f} items/s".format((n_test * batch) / t)

In [None]:
@pipeline_def
def cpu_pipeline():
    encoded, bboxes, labels, polygons, vertices = coco_reader_def()
    images = fn.decoders.image(encoded, device="cpu")
    return images


@pipeline_def
def gpu_pipeline():
    encoded, bboxes, labels, polygons, vertices = coco_reader_def()
    images = fn.decoders.image(encoded, device="mixed", hw_decoder_load=0.75)
    return images

In [None]:
@pipeline_def
def cpu_augmentation_pipeline():
    encoded, bboxes, labels, polygons, vertices = coco_reader_def()
    original = fn.decoders.image(encoded, device="cpu")
    sliced = fn.slice(
        original, rel_start=[0.3, 0.2], rel_shape=[0.5, 0.6], axis_names="HW"
    )
    sphered = fn.sphere(original)
    rotated = fn.rotate(original, angle=30)
    warped = fn.warp_affine(original, matrix=[1.0, 0.8, 0.0, 0.0, 1.2, 0.0])
    hflip = fn.flip(original, vertical=0, horizontal=1)
    bced = fn.brightness_contrast(original, brightness=0.5, contrast=1.5)
    dist = fn.jpeg_compression_distortion(original, quality=5)
    water = fn.water(original)

    return original, sliced, sphered, rotated, warped, hflip, bced, dist, water


@pipeline_def
def gpu_augmentation_pipeline():
    encoded, bboxes, labels, polygons, vertices = coco_reader_def()
    original = fn.decoders.image(encoded, device="mixed", hw_decoder_load=0.75)
    sliced = fn.slice(
        original, rel_start=[0.3, 0.2], rel_shape=[0.5, 0.6], axis_names="HW"
    )
    sphered = fn.sphere(original)
    rotated = fn.rotate(original, angle=30)
    warped = fn.warp_affine(original, matrix=[1.0, 0.8, 0.0, 0.0, 1.2, 0.0])
    hflip = fn.flip(original, vertical=0, horizontal=1)
    bced = fn.brightness_contrast(original, brightness=0.5, contrast=1.5)
    dist = fn.jpeg_compression_distortion(original, quality=5)
    water = fn.water(original)

    return original, sliced, sphered, rotated, warped, hflip, bced, dist, water

In [None]:
dali_extra_dir = "/data"
coco_file_root = "/data/db/coco/images"
coco_annotations_file = "/data/db/coco/instances.json"
batch_size = 4
seed = 42
initial_fill = 12
n_threads = psutil.cpu_count()
dpi = 130

run_speedtest = True
speedtest_batch_size = psutil.cpu_count(logical=False)

___
## COCO Reader with Augmentations

Here is an example that demonstrates how to combine the COCO Reader, which loads data from a COCO dataset, with some of the typical augmentations used in image detection and segmentation use cases. The COCO dataset consists of a directory with images and an annotations file containing information about bounding boxes, labels and segmentation masks.

A typical augmentation applied in detection and segmentation use cases is a random crop of the image with the restriction that at least one ground truth box is present in the cropped image. In DALI, we use RandomBBoxCrop for that. The RandomBBoxCrop operator takes as input the bounding boxes and the labels associated with them, and a set of constraints for the cropping operation. The results are the cropping window anchor and shape, as well as the processed bounding boxes and labels. The anchor and shape outputs, expressed in relative coordinates, can be directly fed into DALI's Slice operator to extract the region of interest of the image. The output bounding boxes and labels are processed to contain only the ones within the cropping window, and the coordinates are mapped to the new coordinate space. RandomBBoxCrop does not process segmentation masks, so the mask coordinates need to be mapped to the new coordinate space separately.
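
To make the coordinate mapping concrete, here is a minimal pure-Python sketch of what happens to the boxes once a crop window has been chosen. It is not DALI's implementation: the helper `remap_boxes_to_crop` is hypothetical, and the rule "keep a box when its centre lies inside the window" is an assumption made for illustration. All coordinates are relative (0 to 1) and boxes use the left, top, right, bottom layout.

```python
def remap_boxes_to_crop(boxes, labels, anchor, shape):
    """Filter boxes by a crop window and re-express them relative to it.

    Hypothetical helper: keeps a box when its centre is inside the window
    (an assumed rule), clips it to the window, then shifts and scales it.
    """
    ax, ay = anchor
    w, h = shape
    out_boxes, out_labels = [], []
    for (l, t, r, b), label in zip(boxes, labels):
        cx, cy = (l + r) / 2, (t + b) / 2
        if not (ax <= cx <= ax + w and ay <= cy <= ay + h):
            continue  # box centre is outside the window: drop box and label
        # Clip the box to the window
        l, r = max(l, ax), min(r, ax + w)
        t, b = max(t, ay), min(b, ay + h)
        # Map to the coordinate space of the cropped image
        out_boxes.append(((l - ax) / w, (t - ay) / h, (r - ax) / w, (b - ay) / h))
        out_labels.append(label)
    return out_boxes, out_labels


boxes = [(0.1, 0.1, 0.3, 0.3), (0.5, 0.5, 0.9, 0.9)]
labels = [1, 2]
crop_boxes, crop_labels = remap_boxes_to_crop(
    boxes, labels, anchor=(0.25, 0.25), shape=(0.5, 0.5)
)
print(crop_boxes, crop_labels)
```

The same shift-and-scale has to be applied to the mask vertices by hand, since the operator only handles boxes and labels.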

In [None]:
gpu_augmentation_pipe = gpu_augmentation_pipeline(
    batch_size=batch_size, num_threads=n_threads, device_id=0, seed=seed
)
gpu_augmentation_pipe.build()
(
    original,
    sliced,
    sphered,
    rotated,
    warped,
    hflip,
    bced,
    dist,
    water,
) = gpu_augmentation_pipe.run()

image_augmentations = {
    "original": original,
    "sliced": sliced,
    "sphered": sphered,
    "rotated": rotated,
    "warped": warped,
    "horizontal flip": hflip,
    "brightness & contrast": bced,
    "jpg distortion": dist,
    "water": water,
}

for k, v in image_augmentations.items():
    show_images(v.as_cpu(), title=k, index=3, dpi=dpi)

In [None]:
if run_speedtest:
    pipelines_speedtest = {
        "cpu_pipeline": cpu_pipeline,
        "gpu_pipeline": gpu_pipeline,
        "cpu_augmentation_pipeline": cpu_augmentation_pipeline,
        "gpu_augmentation_pipeline": gpu_augmentation_pipeline,
    }

    for k, v in pipelines_speedtest.items():
        print(
            "{}: {}".format(
                k, speedtest(v, speedtest_batch_size, n_threads, device_id=1)
            )
        )

In [None]:
def plot_coco_sample(
    image,
    bboxes,
    labels,
    mask_polygons,
    mask_vertices,
    relative_coords=False,
    title=None,
    dpi=60,
):
    H, W = image.shape[0], image.shape[1]
    fig, ax = plt.subplots(dpi=dpi)
    ax.imshow(image)
    plt.axis("off")
    if title:
        plt.title(title)

    # Bounding boxes
    for bbox, label in zip(bboxes, labels):
        l, t, r, b = bbox * [W, H, W, H] if relative_coords else bbox
        rect = patches.Rectangle(
            (l, t),
            width=(r - l),
            height=(b - t),
            linewidth=1,
            edgecolor="#76b900",
            facecolor="none",
        )
        ax.add_patch(rect)

    # Segmentation masks
    for polygon in mask_polygons:
        mask_idx, start_vertex, end_vertex = polygon
        polygon_vertices = mask_vertices[
            start_vertex:end_vertex
        ]  # Select polygon vertices
        # Scale relative coordinates to the image dimensions, if necessary
        polygon_vertices = (
            polygon_vertices * [W, H] if relative_coords else polygon_vertices
        )
        poly = patches.Polygon(
            xy=polygon_vertices, closed=True, facecolor="#76b900", alpha=0.7
        )
        ax.add_patch(poly)

    plt.show()


def show(outputs, relative_coords=False, title=None, index=0, dpi=60):
    images, bboxes, labels, mask_polygons, mask_vertices = outputs
    plot_coco_sample(
        images.as_cpu().at(index),
        bboxes.at(index),
        labels.at(index),
        mask_polygons.at(index),
        mask_vertices.at(index),
        relative_coords=relative_coords,
        title=title,
        dpi=dpi,
    )

In [None]:
@pipeline_def
def coco_pipeline():
    encoded, bboxes, labels, polygons, vertices = coco_reader_def()
    images = fn.decoders.image(
        encoded, device="mixed", output_type=types.RGB, hw_decoder_load=0.75
    )
    return images, bboxes, labels, polygons, vertices


coco_pipe = coco_pipeline(
    batch_size=batch_size, num_threads=n_threads, device_id=0, seed=seed
)
coco_pipe.build()
outputs = coco_pipe.run()

show(outputs, title="coco_pipeline", index=3, dpi=dpi)

In [None]:
@pipeline_def
def coco_pipeline_bbox_flip():
    encoded, bboxes, labels, polygons, vertices = coco_reader_def(ratio=True)
    orig_shape = fn.peek_image_shape(encoded)
    images = fn.decoders.image(
        encoded, device="mixed", output_type=types.RGB, hw_decoder_load=0.75
    )
    px = fn.random.uniform(range=(0, 1))
    py = fn.random.uniform(range=(0, 1))
    ratio = fn.random.uniform(range=(1, 2))
    images = fn.paste(images, paste_x=px, paste_y=py, ratio=ratio, fill_value=(0, 0, 0))
    bboxes = fn.bbox_paste(bboxes, paste_x=px, paste_y=py, ratio=ratio, ltrb=True)

    scale = 1.0 / ratio
    margin = ratio - 1.0
    px_1 = scale * px * margin
    py_1 = scale * py * margin
    ver_x = scale * fn.slice(vertices, 0, 1, axes=[1]) + px_1
    ver_y = scale * fn.slice(vertices, 1, 1, axes=[1]) + py_1
    vertices = fn.cat(ver_x, ver_y, axis=1)

    should_flip = fn.random.coin_flip(
        probability=1.0
    )  # 100% probability for demo purposes
    images = fn.flip(images, horizontal=should_flip)
    bboxes = fn.bb_flip(bboxes, horizontal=should_flip, ltrb=True)
    vertices = fn.coord_flip(vertices, flip_x=should_flip)

    return images, bboxes, labels, polygons, vertices


coco_pipe_bbox_flip = coco_pipeline_bbox_flip(
    batch_size=batch_size, num_threads=n_threads, device_id=0, seed=seed
)
coco_pipe_bbox_flip.build()
outputs = coco_pipe_bbox_flip.run()
show(outputs, relative_coords=True, title="coco_pipeline_bbox_flip", index=3, dpi=dpi)

In [None]:
@pipeline_def
def coco_pipeline_bbox_random():
    encoded, bboxes, labels, polygons, vertices = coco_reader_def(ratio=True)
    images = fn.decoders.image(
        encoded, device="mixed", output_type=types.RGB, hw_decoder_load=0.75
    )
    input_shape = fn.peek_image_shape(encoded, dtype=types.INT32)[:2]
    anchor_rel, shape_rel, bboxes, labels, bbox_indices = fn.random_bbox_crop(
        bboxes,
        labels,
        aspect_ratio=[0.5, 2],
        thresholds=[0.0],
        allow_no_crop=False,
        scaling=[0.3, 0.6],
        seed=seed,
        bbox_layout="xyXY",
        output_bbox_indices=True,
    )

    # Partial decoding of the image
    images = fn.decoders.image_slice(
        encoded,
        anchor_rel,
        shape_rel,
        normalized_anchor=True,
        normalized_shape=True,
        device="cpu",
    )
    # Cropped image dimensions
    crop_shape = fn.shapes(images, dtype=types.FLOAT)
    crop_h = fn.slice(crop_shape, 0, 1, axes=[0])
    crop_w = fn.slice(crop_shape, 1, 1, axes=[0])

    images = images.gpu()

    # Adjust masks coordinates to the coordinate space of the cropped image, while also converting
    # relative to absolute coordinates by mapping the top-left corner (anchor_rel_x, anchor_rel_y), to (0, 0)
    # and the bottom-right corner (anchor_rel_x+shape_rel_x, anchor_rel_y+shape_rel_y) to (crop_w, crop_h)
    MT_vertices = fn.transforms.crop(
        from_start=anchor_rel,
        from_end=(anchor_rel + shape_rel),
        to_start=(0.0, 0.0),
        to_end=fn.cat(crop_w, crop_h),
    )
    vertices = fn.coord_transform(vertices, MT=MT_vertices)

    # Convert bounding boxes to absolute coordinates
    MT_bboxes = fn.transforms.crop(
        to_start=(0.0, 0.0, 0.0, 0.0), to_end=fn.cat(crop_w, crop_h, crop_w, crop_h)
    )
    bboxes = fn.coord_transform(bboxes, MT=MT_bboxes)

    return images, bboxes, labels, polygons, vertices


coco_pipe_bbox_random = coco_pipeline_bbox_random(
    batch_size=batch_size, num_threads=n_threads, device_id=0, seed=seed
)
coco_pipe_bbox_random.build()
outputs = coco_pipe_bbox_random.run()
show(outputs, title="coco_pipeline_bbox_random", index=3, dpi=dpi)

In [None]:
if run_speedtest:
    pipelines_speedtest = {
        "coco_pipeline": coco_pipeline,
        "coco_pipeline_bbox_flip": coco_pipeline_bbox_flip,
        "coco_pipeline_bbox_random": coco_pipeline_bbox_random,
    }

    for k, v in pipelines_speedtest.items():
        print(
            "{}: {}".format(
                k, speedtest(v, speedtest_batch_size, n_threads, device_id=1)
            )
        )

___
## Numpy Reader with Volumetric Data

This example shows how to read Numpy array files (*.npy), with DALI's ``readers.numpy`` reader. This notebook also shows how to use DALI to load numpy files directly to GPU memory, thanks to NVIDIA GPUDirect Storage, and how to use the region-of-interest (ROI) API to load regions of the array.

The reader extracts the shape and the data type information directly from the files. Please note that only Numpy v1 (and not v2) files are currently supported. Numpy v1 files are the most commonly used. See the [numpy file format specification](https://numpy.org/neps/nep-0001-npy-format.html) for more details.

The operator returns arrays with shapes taken from the files. DALI tensors are always stored in C (row-major) order. If the files contain the data in FORTRAN (column-major) order, the operator will automatically transpose the data to C order. This transposition adds significant time to the loading process. Therefore, we recommend storing files in C order when possible.
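
The following sketch shows one way to check which order a file would be stored in, and how to convert an array to C order before saving it. It uses only NumPy; the helper `npy_header` is hypothetical and writes to an in-memory buffer instead of a real file.

```python
import io

import numpy as np


def npy_header(array):
    """Save `array` in .npy format to memory and read back its header."""
    buffer = io.BytesIO()
    np.save(buffer, array)
    buffer.seek(0)
    version = np.lib.format.read_magic(buffer)  # (major, minor) format version
    shape, fortran_order, _dtype = np.lib.format.read_array_header_1_0(buffer)
    return version, shape, fortran_order


# A column-major (FORTRAN order) volume, as some tools produce
volume_f = np.asfortranarray(np.arange(24, dtype=np.float32).reshape(2, 3, 4))
# Same values and shape, but row-major (C order) memory layout
volume_c = np.ascontiguousarray(volume_f)

header_f = npy_header(volume_f)
header_c = npy_header(volume_c)
print("FORTRAN order file:", header_f)
print("C order file:      ", header_c)
```

Converting once with `np.ascontiguousarray` before saving moves the transposition cost out of the loading loop.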

In [None]:
data_dir_2d = os.path.join(
    dali_extra_dir, "db", "3D", "MRI", "Knee", "npy_2d_slices", "STU00001"
)
data_dir_3d = os.path.join(
    dali_extra_dir, "db", "3D", "MRI", "Knee", "npy_3d", "STU00001"
)

In [None]:
def plot_batch(np_arrays, nsamples=None, dpi=60):
    if nsamples is None:
        nsamples = len(np_arrays)
    fig, axvec = plt.subplots(
        nrows=1, ncols=nsamples, figsize=(10, 10 * nsamples), dpi=dpi
    )
    for i in range(nsamples):
        ax = axvec[i]
        ax.tick_params(left=False, bottom=False, labelleft=False, labelbottom=False)
        ax.imshow(Image.fromarray(np_arrays[i]))
    plt.tight_layout()


def run(p):
    p.build()  # build the pipeline
    outputs = p.run()  # Run once
    # Getting the batch as a list of numpy arrays, for displaying
    batch = [np.array(outputs[0][s]) for s in range(batch_size)]
    return batch

In [None]:
ser00001_2d = [
    np.load(os.path.join(data_dir_2d, "SER00001", f"{i}.npy")) for i in range(4)
]
plot_batch(ser00001_2d, dpi=dpi)

In [None]:
ser00001_3d = np.load(os.path.join(data_dir_3d, "SER00001.npy"))
plot_batch([ser00001_3d[0], ser00001_3d[1], ser00001_3d[2], ser00001_3d[3]], dpi=dpi)

#### Region-of-interest (ROI) API

In the example shown above, we see that the relevant data is concentrated in the upper left quadrant of the image and the rest does not contain useful information. This is not true for all the images in the dataset, but it will serve us as a good example to demonstrate ROI reading.

Numpy reader allows the user to specify a region of interest, equivalent to the arguments specified to ``slice`` operation. The benefit is that the reader will only read the relevant part of the file, saving I/O bandwidth and memory utilization. Note that setting the ``dont_use_mmap`` argument to False will negate that performance benefit, with the entire file being read first and then sliced.

The ROI can be specified in absolute or relative terms, and can be specified on a subset of the array's axes. For dimensions not specified in the ROI, the whole extent of the array shall be used. (see arguments ``roi_start``, ``rel_roi_start``, ``roi_end``, ``rel_roi_end``, ``roi_shape``, ``rel_roi_shape``, ``axes``).
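
As a rough illustration of how a relative ROI relates to array indices, the sketch below converts relative start and end coordinates into slices on an in-memory array. The helper `roi_slices` is hypothetical, and rounding to the nearest index is an assumption; the reader's exact rounding may differ. Unlike the reader, this does not save any I/O, it only shows the coordinate arithmetic.

```python
import numpy as np


def roi_slices(shape, rel_start, rel_end, axes=None):
    """Turn a relative ROI into a tuple of slices (hypothetical helper).

    Axes that are not listed in `axes` keep their whole extent.
    """
    axes = range(len(shape)) if axes is None else axes
    slices = [slice(None)] * len(shape)
    for axis, lo, hi in zip(axes, rel_start, rel_end):
        extent = shape[axis]
        # Assumed rounding rule: nearest integer index
        slices[axis] = slice(round(lo * extent), round(hi * extent))
    return tuple(slices)


image = np.zeros((512, 512), dtype=np.uint8)
roi_2d = image[roi_slices(image.shape, [0.1, 0.01], [0.4, 0.5])]

# ROI given on a subset of axes: the first axis is kept whole
volume = np.zeros((8, 512, 512), dtype=np.uint8)
roi_3d = volume[roi_slices(volume.shape, [0.1, 0.01], [0.4, 0.5], axes=(1, 2))]

print(roi_2d.shape, roi_3d.shape)
```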

In [None]:
@pipeline_def(batch_size=batch_size, num_threads=n_threads, device_id=0)
def pipe_roi1():
    data = fn.readers.numpy(
        device="cpu",
        file_root=samp_2d_data_dir,
        files=samp_2d_files,
        rel_roi_start=[0.1, 0.01],
        rel_roi_end=[0.4, 0.5],
        read_ahead=True,
        dont_use_mmap=True,
        pad_last_batch=True,
    )
    return data


samp_2d_data_dir = os.path.join(data_dir_2d, "SER00001")
samp_2d_files = ["0.npy"]
data_roi1 = run(pipe_roi1())

In [None]:
plot_batch(data_roi1, dpi=dpi)

#### GPUDirect Storage Support
DALI Numpy Reader supports [GPUDirect Storage (GDS)](https://developer.nvidia.com/gpudirect-storage) via libcufile. GDS enables a direct data path between storage and GPU memory and avoids extra copies through a bounce buffer in the CPU's memory.

In order to enable GDS support in DALI, make sure GDS is installed. On systems with CUDA 11.4 or newer, GDS is already installed as part of the CUDA SDK. For older CUDA releases, please install GDS separately (follow the link above for instructions).

Once GDS is installed, it can be used by simply switching the device of the reader to ``"gpu"``. Note that if GDS is not available, you will likely see a CUDA Driver API error when trying to execute the pipeline.

In [None]:
@pipeline_def(batch_size=batch_size, num_threads=n_threads, device_id=0)
def pipe_gds():
    data = fn.readers.numpy(
        device="gpu",
        file_root=samp_2d_data_dir,
        files=samp_2d_files,
        rel_roi_start=[0.1, 0.01],
        rel_roi_end=[0.4, 0.5],
        read_ahead=True,
        dont_use_mmap=True,
        pad_last_batch=True,
    )
    return data


samp_2d_data_dir = os.path.join(data_dir_2d, "SER00001")
samp_2d_files = ["0.npy"]
p = pipe_gds()
p.build()
pipe_out = p.run()

data_gds = pipe_out[0].as_cpu().as_array()

In [None]:
plot_batch(data_gds, dpi=dpi)

In [None]:
@pipeline_def(batch_size=batch_size, num_threads=n_threads, device_id=0)
def pipe_3d_gds():
    data = fn.readers.numpy(
        device="gpu",
        file_root=samp_3d_data_dir,
        files=samp_3d_files,
        #         rel_roi_start=[0, 0.1, 0.01],
        #         rel_roi_end=[1, 0.4, 0.5],
        read_ahead=True,
        dont_use_mmap=True,
        pad_last_batch=True,
    )

    data = fn.transpose(data, perm=[1, 2, 0])
    data = fn.resize(
        data,
        resize_x=320,
        resize_y=320,
        mode="stretch",
        interp_type=types.INTERP_LANCZOS3,
        mag_filter=types.INTERP_LANCZOS3,
    )
    data = fn.transpose(data, perm=[2, 0, 1])

    return data


# Tensors in the list must have the same shape
samp_3d_data_dir = data_dir_3d
samp_3d_files = ["SER00005.npy"]

p = pipe_3d_gds()
p.build()
pipe_out = p.run()

data_gds = pipe_out[0].as_cpu().as_array()

In [None]:
plot_batch(data_gds[0], dpi=dpi)

In [None]:
if run_speedtest:
    pipelines_speedtest = {
        "pipe_roi1": pipe_roi1,
        "pipe_gds": pipe_gds,
    }

    for k, v in pipelines_speedtest.items():
        print(
            "{}: {}".format(
                k, speedtest(v, speedtest_batch_size, n_threads, device_id=1)
            )
        )

___
## Audio Spectrogram

In this example we will go through the steps to build a DALI audio processing pipeline, including the calculation of a spectrogram. A spectrogram is a representation of a signal (e.g. an audio signal) that shows the evolution of the frequency spectrum in time.

Typically, a spectrogram is calculated by computing the fast Fourier transform (FFT) over a series of overlapping windows extracted from the original signal. The process of dividing the signal into short-term sequences of fixed size and applying the FFT to each of them independently is called the short-time Fourier transform (STFT). The spectrogram is then calculated as the (typically squared) complex magnitude of the STFT.

Extracting short-term windows of the original signal affects the calculated spectrum by introducing artifacts, an effect often called spectral leakage. To control or reduce spectral leakage, we apply a window function when extracting the windows. Some examples of window functions are Hann, Hamming, etc.

It is beyond the scope of this example to go deeper into the details of the signal processing concepts we mentioned above. More information can be found here:
- [STFT](https://en.wikipedia.org/wiki/Short-time_Fourier_transform)
- [Window functions](https://en.wikipedia.org/wiki/Window_function)
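
The steps described above can be sketched in a few lines of NumPy. This is an illustration only, not how librosa or DALI implement it: the function `power_spectrogram` is hypothetical, it uses NumPy's symmetric Hann window (`np.hanning`), and it pads the signal by reflection so that frames are centred, which is an assumed convention.

```python
import numpy as np


def power_spectrogram(signal, n_fft=2048, hop_length=512):
    """Squared magnitude of the STFT, shape (n_fft // 2 + 1, n_frames)."""
    window = np.hanning(n_fft)
    # Reflect-pad so that each frame is centred on a multiple of hop_length
    padded = np.pad(signal, n_fft // 2, mode="reflect")
    n_frames = 1 + (len(padded) - n_fft) // hop_length
    # Overlapping windows of length n_fft, taken every hop_length samples
    frames = np.stack(
        [padded[i * hop_length : i * hop_length + n_fft] for i in range(n_frames)]
    )
    # FFT of each windowed frame (real input, so only non-negative frequencies)
    spectrum = np.fft.rfft(frames * window, axis=1)
    return (np.abs(spectrum) ** 2).T


# One second of a 1 kHz tone sampled at 16 kHz
sample_rate = 16000
t = np.arange(sample_rate) / sample_rate
tone = np.sin(2 * np.pi * 1000 * t)

spec = power_spectrogram(tone)
peak_bin = int(np.argmax(spec[:, spec.shape[1] // 2]))
print(spec.shape, peak_bin * sample_rate / 2048)
```

With these settings each frequency bin is 16000 / 2048 = 7.8125 Hz wide, so the 1 kHz tone falls exactly on bin 128.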

In [None]:
import librosa
import librosa.display

In [None]:
def show_spectrogram(spec, title, sr, hop_length, y_axis="log", x_axis="time", dpi=60):
    fig, ax = plt.subplots(dpi=dpi)
    librosa.display.specshow(
        spec, sr=sr, y_axis=y_axis, x_axis=x_axis, hop_length=hop_length
    )

    plt.title(title)
    plt.colorbar(format="%+2.0f dB")
    plt.tight_layout()
    plt.show()

In [None]:
sample_data = os.path.join(dali_extra_dir, "db", "audio", "wav", "237-134500-0000.wav")

# Size of the FFT, which will also be used as the window length
n_fft = 2048

# Step or stride between windows. If the step is smaller than the window length, the windows will overlap
hop_length = 512

# Load sample audio file
y, sr = librosa.load(sample_data)

# Calculate the spectrogram as the square of the complex magnitude of the STFT
spectrogram_librosa = (
    np.abs(
        librosa.stft(
            y,
            n_fft=n_fft,
            hop_length=hop_length,
            win_length=n_fft,
            window="hann",
            pad_mode="reflect",
        )
    )
    ** 2
)

In [None]:
spectrogram_librosa_db = librosa.power_to_db(spectrogram_librosa, ref=np.max)
show_spectrogram(
    spectrogram_librosa_db, "Librosa power spectrogram", sr, hop_length, dpi=dpi
)

### Calculating the Spectrogram using DALI

To demonstrate DALI's [spectrogram](../../operations/nvidia.dali.fn.spectrogram.html) operator we will define a DALI pipeline. For demonstration purposes, we can just feed the same input in every iteration, as we will be only calculating one spectrogram.

In [None]:
@pipeline_def
def spectrogram_pipe(nfft, window_length, window_step, device="cpu"):
    audio = types.Constant(device=device, value=audio_data)
    spectrogram = fn.spectrogram(
        audio,
        device=device,
        nfft=nfft,
        window_length=window_length,
        window_step=window_step,
    )
    return spectrogram


audio_data = np.array(y, dtype=np.float32)

pipe = spectrogram_pipe(
    device="gpu",
    batch_size=1,
    num_threads=n_threads,
    device_id=0,
    nfft=n_fft,
    window_length=n_fft,
    window_step=hop_length,
)
pipe.build()
outputs = pipe.run()
spectrogram_dali = outputs[0][0].as_cpu()

In [None]:
spectrogram_dali_db = librosa.power_to_db(spectrogram_dali, ref=np.max)
show_spectrogram(spectrogram_dali_db, "DALI power spectrogram", sr, hop_length, dpi=dpi)

___
## Geometric Transforms

In this example we demonstrate the operators from `transforms` module and how they can be used for transforming images and point clouds.



### Warp Operators
All warp operators work by calculating the output pixels by sampling the source image at transformed coordinates:

${Out}(x, y) = {In}(x_{src}, y_{src})$

This way each output pixel is calculated exactly once.

If the source coordinates do not point exactly to pixel centers, the values of neighboring pixels are interpolated or the nearest pixel is taken, depending on the interpolation method specified in the `interp_type` argument.
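
A minimal NumPy sketch of this backward mapping with nearest-neighbor sampling is shown below. It is an illustration under simplifying assumptions, not DALI's warp kernel: the function `warp_nearest` is hypothetical, it handles single-channel images only, and it treats integer coordinates as pixel centers.

```python
import numpy as np


def warp_nearest(image, dst_to_src, fill_value=0):
    """Compute Out(x, y) = In(x_src, y_src) for a 2D image.

    `dst_to_src` is a 2x3 matrix mapping output (x, y, 1) to source
    coordinates. Pixels that map outside the source get `fill_value`.
    """
    h, w = image.shape
    ys, xs = np.mgrid[0:h, 0:w]
    coords = np.stack([xs.ravel(), ys.ravel(), np.ones(h * w)])
    src = np.asarray(dst_to_src, dtype=float) @ coords
    # Nearest neighbor: round source coordinates to the closest pixel
    src_x = np.rint(src[0]).astype(int)
    src_y = np.rint(src[1]).astype(int)
    valid = (src_x >= 0) & (src_x < w) & (src_y >= 0) & (src_y < h)
    out = np.full(h * w, fill_value, dtype=image.dtype)
    out[valid] = image[src_y[valid], src_x[valid]]  # each output pixel set once
    return out.reshape(h, w)


image = np.arange(12).reshape(3, 4)
# Shift the content one pixel to the right: Out(x, y) = In(x - 1, y)
shifted = warp_nearest(image, [[1, 0, -1], [0, 1, 0]])
print(shifted)
```

Iterating over output pixels rather than input pixels is what guarantees that the result has no holes and no pixel is written twice.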

### Affine Transform

The operators from `transforms` module can generate and combine transform matrices for different kinds of affine transforms. An affine transform is defined by the formula:

$
X_{out}
= \begin{vmatrix}
M & T
\end{vmatrix}
\begin{vmatrix}
X_{in} \\
1
\end{vmatrix}
$

Where $X_{in}$ is an input point, $X_{out}$ - the corresponding output, $M$ - linear part of the transformation and $T$ - a translation vector.

If the points are in 2D space, the formula can be written as:

$
\begin{vmatrix}
x_{out} \\
y_{out}
\end{vmatrix}
= \begin{vmatrix}
m_{00} & m_{01} & t_x \\
m_{10} & m_{11} & t_y
\end{vmatrix}
\begin{vmatrix}
x_{in} \\
y_{in} \\
1
\end{vmatrix}
$
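
The 2D formula can be checked numerically with a short NumPy sketch. The helper `apply_affine` is hypothetical and only mirrors the equation above: the first two columns of the 2x3 matrix form the linear part $M$ and the last column is the translation $T$.

```python
import numpy as np


def apply_affine(points, matrix):
    """Apply X_out = M @ X_in + T to an (N, 2) array of points."""
    points = np.asarray(points, dtype=float)
    matrix = np.asarray(matrix, dtype=float)
    M, T = matrix[:, :2], matrix[:, 2]
    return points @ M.T + T


# Scale x by 2 and y by 3, then translate by (10, -5)
matrix = [[2.0, 0.0, 10.0],
          [0.0, 3.0, -5.0]]
points = [[0.0, 0.0], [1.0, 1.0], [2.0, -1.0]]

transformed = apply_affine(points, matrix)
print(transformed)
```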

### Transform Catalogue

There are several transforms available in `transforms` module. Each of these operators can generate an affine transform matrix and combine it with a pre-existing transform. Here's the list of available transforms:

* `rotation` - rotate by given angle (in degrees) around given point and axis (for 3D only)
* `translation` - translate by given offset
* `scale` - scale by given factor
* `shear` - shear by given factors or angles; there are 2 shear factors for 2D and 6 factors for 3D
* `crop` - translates and scales so that input corners (`from_start`, `from_end`) map to output corners (`to_start`, `to_end`).

The documentation of the operators contains the detailed information about their parameters.

There's also the operator `combine` which combines multiple affine transforms.
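
To illustrate what combining affine transforms means, the sketch below builds a rotation around a center point from three simpler transforms using 3x3 homogeneous matrices in NumPy. The helpers `translation`, `rotation`, `compose` and `transform_point` are hypothetical stand-ins, not the DALI operators, and the rotation sign follows the usual mathematical convention, which may differ from DALI's.

```python
import numpy as np


def translation(offset):
    tx, ty = offset
    return np.array([[1.0, 0.0, tx], [0.0, 1.0, ty], [0.0, 0.0, 1.0]])


def rotation(angle_deg):
    a = np.deg2rad(angle_deg)
    c, s = np.cos(a), np.sin(a)
    return np.array([[c, -s, 0.0], [s, c, 0.0], [0.0, 0.0, 1.0]])


def compose(*transforms):
    """Combine transforms so that they are applied in the order given."""
    result = np.eye(3)
    for t in transforms:
        result = t @ result  # a later transform multiplies from the left
    return result


def transform_point(matrix, point):
    x, y, _ = matrix @ np.array([point[0], point[1], 1.0])
    return np.array([x, y])


center = (1.0, 1.0)
# Move the center to the origin, rotate by 90 degrees, move back
rotate_about_center = compose(
    translation((-center[0], -center[1])), rotation(90), translation(center)
)

moved = transform_point(rotate_about_center, (2.0, 1.0))
fixed = transform_point(rotate_about_center, center)
print(moved, fixed)
```

Because the combined result is again a single affine matrix, it can be applied to images and keypoints in one step instead of three.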

In [None]:
def encoded_images_sizes(jpegs):
    shapes = fn.peek_image_shape(jpegs)  # the shapes are HWC
    h, w = shapes[0], shapes[1]  # extract H and W ...
    return fn.stack(w, h)  # ...and concatenate

In [None]:
dali_extra_dir = os.environ["DALI_EXTRA_PATH"]
root_dir = os.path.join(dali_extra_dir, "db", "face_landmark")
image_files = ["{}.jpeg".format(i) for i in range(6)]
keypoint_files = ["{}.npy".format(i) for i in range(6)]

In [None]:
def show(images, landmarks, title="", dpi=60):
    if hasattr(images, "as_cpu"):
        images = images.as_cpu()
    batch_size = len(images)

    fig = plt.figure(figsize=(16, 14), dpi=dpi)
    plt.suptitle(None)
    columns = 3
    rows = int(batch_size / columns)
    gs = gridspec.GridSpec(rows, columns)
    for i in range(batch_size):
        ax = plt.subplot(gs[i])
        plt.axis("off")
        plt.title(title)
        img = images.at(i)
        r = 0.002 * max(img.shape[0], img.shape[1])
        lm = 0
        for p in landmarks.at(i):
            circle = patches.Circle(p, r, color=(0, 1, 0, 1))
            ax.add_patch(circle)
            # Uncomment the next line to label each keypoint with its index.
            # plt.text(p[0], p[1], lm, fontsize=6)
            lm += 1
        plt.imshow(img)


def gallery(pipe_out, titles, dpi=60):
    pipe_out = [x.as_cpu() if hasattr(x, "as_cpu") else x for x in pipe_out]

    batch_size = len(pipe_out[0])

    fig = plt.figure(figsize=(16, 24), dpi=dpi)
    plt.suptitle(None)
    columns = batch_size
    rows = len(pipe_out) // 2
    gs = gridspec.GridSpec(rows, columns)
    flat = 0
    for j in range(0, len(pipe_out), 2):
        for i in range(batch_size):
            ax = plt.subplot(gs[flat])
            plt.axis("off")
            plt.title("")
            img = pipe_out[j].at(i)
            r = 0.002 * max(img.shape[0], img.shape[1])
            for p in pipe_out[j + 1].at(i):
                circle = patches.Circle(p, r, color=(0, 1, 0, 1))
                ax.add_patch(circle)
            plt.imshow(img)
            if i == 0:
                plt.title(titles[j // 2])
            flat += 1

In [None]:
@pipeline_def
def transforms_gallery_pipe():
    jpegs, _ = fn.readers.file(file_root=root_dir, files=image_files)
    images = fn.decoders.image(jpegs, device="mixed", hw_decoder_load=0.75)
    keypoints = fn.readers.numpy(file_root=root_dir, files=keypoint_files)

    size = encoded_images_sizes(jpegs)
    center = size / 2

    outputs = []

    transforms = [
        fn.transforms.translation(offset=fn.random.uniform(range=(-100, 100), shape=2)),
        fn.transforms.rotation(angle=fn.random.uniform(range=(-45, 45)), center=center),
        fn.transforms.scale(
            scale=fn.random.uniform(range=(0.5, 2), shape=[2]), center=center
        ),
        fn.transforms.shear(
            shear=fn.random.uniform(range=(-1, 1), shape=[2]), center=center
        ),
        fn.transforms.crop(
            from_start=size * 0.1,
            from_end=size * 0.8,
            to_start=[0, 0],
            to_end=size * 1.0,
        ),
    ]

    for mt in transforms:
        out_img = fn.warp_affine(images, matrix=mt, fill_value=0, inverse_map=False)
        out_kp = fn.coord_transform(keypoints, MT=mt)
        outputs += [out_img, out_kp]
    return tuple(outputs)


pipe = transforms_gallery_pipe(
    batch_size=6, num_threads=n_threads, device_id=0, seed=seed
)
pipe.build()
pipe_out = pipe.run()

In [None]:
gallery(pipe_out, ["translation", "rotation", "scale", "shear", "crop"], dpi=dpi)

### Facial Landmark Alignment with DALI

In [None]:
def get_face_rotation(points):
    center_of_eyes = points[27]
    left_eye = points[36]
    right_eye = points[45]
    dY = right_eye[1] - left_eye[1]
    dX = right_eye[0] - left_eye[0]
    tan = dY / dX

    # radians to degrees: 180/pi
    rotation = -1 * (math.atan(tan) * 57.29577951308232)
    return rotation, center_of_eyes


@pipeline_def
def facial_landmark_align_pipe():
    jpegs, _ = fn.readers.file(file_root=root_dir, files=image_files)
    images = fn.decoders.image(
        jpegs, device="mixed", output_type=types.RGB, hw_decoder_load=0.75
    )
    keypoints = fn.readers.numpy(file_root=root_dir, files=keypoint_files)
    rotation, center = get_face_rotation(keypoints)
    mt = fn.transforms.rotation(angle=rotation, center=center)
    images = fn.warp_affine(images, matrix=mt, fill_value=0, inverse_map=False)
    keypoints = fn.coord_transform(keypoints, MT=mt)
    return images, keypoints, rotation


pipe = facial_landmark_align_pipe(
    batch_size=6, num_threads=n_threads, device_id=0, seed=seed
)
pipe.build()
images, keypoints, rotation = pipe.run()

show(images, keypoints, title="Face Alignment with Warp Affine", dpi=dpi)
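
The angle arithmetic in `get_face_rotation` can be checked outside DALI with plain Python. The sketch below is a hypothetical stand-alone version for a single face: `face_rotation_degrees` and the eye coordinates are made up for illustration. It uses `math.atan2` instead of `atan(dY / dX)`, which is an assumption on our side: it gives the same result whenever `dX > 0` and avoids a division by zero when the eyes are vertically aligned.

```python
import math


def face_rotation_degrees(left_eye, right_eye):
    """Return the negated eye-line angle in degrees, as get_face_rotation does.

    Hypothetical helper for a single face; points are (x, y) pixel pairs.
    """
    d_y = right_eye[1] - left_eye[1]
    d_x = right_eye[0] - left_eye[0]
    # atan2 matches atan(d_y / d_x) for d_x > 0 and is defined for d_x == 0.
    return -math.degrees(math.atan2(d_y, d_x))


# Made-up landmarks: the right eye sits 10 px below the left eye.
angle = face_rotation_degrees(left_eye=(30.0, 50.0), right_eye=(70.0, 60.0))
print(f"rotation: {angle:.2f} degrees")
```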

In [None]:
if run_speedtest:
    pipelines_speedtest = {
        "transforms_gallery_pipe": transforms_gallery_pipe,
        "facial_landmark_align_pipe": facial_landmark_align_pipe,
    }

    for k, v in pipelines_speedtest.items():
        print(
            "{}: {}".format(
                k, speedtest(v, speedtest_batch_size, n_threads, device_id=1)
            )
        )

___
## Simple Video Pipeline Reading From Multiple Files

In this example, we build a pipeline around the `readers.video` operator. The pipeline returns a batch of frame sequences, where each sequence is made up of an arbitrary number of frames (images). The difference is that images have the layout HWC, whereas sequences have the layout FHWC.
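
To make the two layouts concrete, the NumPy sketch below builds a dummy sequence and picks one frame out of it. The sizes are arbitrary placeholders, not values produced by DALI.

```python
import numpy as np

# Arbitrary illustrative sizes: 8 frames of 720x1280 RGB video.
frames, height, width, channels = 8, 720, 1280, 3

# A sequence sample is laid out as FHWC (frames, height, width, channels).
sequence = np.zeros((frames, height, width, channels), dtype=np.uint8)

# Indexing along the leading F axis yields a single image in HWC layout.
image = sequence[0]

print(sequence.shape)  # (8, 720, 1280, 3)
print(image.shape)     # (720, 1280, 3)
```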

For more information on the `readers.video` parameters, please refer to the documentation.

We need some video containers to process. We can use the [Sintel](https://en.wikipedia.org/wiki/Sintel) trailer, which is an mp4 container holding an h264 video, distributed under the Creative Commons license. Let's split it into 10s clips in order to check how `readers.video` handles multiple video files. This can be done easily with the `ffmpeg` standalone tool.
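
One way to do the split from Python is to call `ffmpeg` with its segment muxer. This is a sketch under assumptions: the file names are placeholders, `ffmpeg` must be on the `PATH`, and because streams are copied rather than re-encoded, cuts land on keyframes, so the clips are only approximately 10s long.

```python
import os
import shutil
import subprocess


def build_split_command(source, output_pattern, seconds=10):
    """Build an ffmpeg command that cuts `source` into fixed-length clips."""
    return [
        "ffmpeg",
        "-i", source,
        "-c", "copy",              # copy streams, no re-encoding
        "-map", "0",               # keep every stream from the input
        "-f", "segment",           # use the segment muxer
        "-segment_time", str(seconds),
        "-reset_timestamps", "1",  # each clip starts at timestamp 0
        output_pattern,
    ]


# Placeholder names; point these at a real trailer file and output location.
command = build_split_command("sintel_trailer.mp4", "sintel_clip_%03d.mp4")
print(" ".join(command))

# Run only when ffmpeg is installed and the placeholder input actually exists.
if shutil.which("ffmpeg") and os.path.exists("sintel_trailer.mp4"):
    subprocess.run(command, check=True)
```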

Then we can set the parameters that will be used in the pipeline. The `sequence_length` parameter defines how many frames we want in each sequence sample.

We can replace `video_directory` with any other directory containing video container files recognized by __FFmpeg__.

In [None]:
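def show_video(sequence, dpi=60):
    # Lay out the frames of one sequence on a grid, four per row.
    columns = 4
    n_frames = len(sequence)
    rows = (n_frames + columns - 1) // columns  # ceiling division
    fig = plt.figure(figsize=(32, (16 // columns) * rows), dpi=dpi)
    gs = gridspec.GridSpec(rows, columns)
    # Iterate over the actual frames so a partly filled last row is safe.
    for j in range(n_frames):
        plt.subplot(gs[j])
        plt.axis("off")
        plt.imshow(sequence[j])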

In [None]:
batch_size = 2
sequence_length = 8
initial_prefetch_size = 16
video_directory = os.path.join(
    os.environ["DALI_EXTRA_PATH"], "db", "video", "sintel", "video_files"
)
video_files = [os.path.join(video_directory, f) for f in os.listdir(video_directory)]
n_iter = 6

In [None]:
@pipeline_def
def video_pipe():
    videos = fn.readers.video(
        device="gpu",
        filenames=video_files,
        sequence_length=sequence_length,
        shard_id=0,
        num_shards=1,
        random_shuffle=True,
        initial_fill=initial_prefetch_size,
        file_list_include_preceding_frame=True,
    )

    resized = fn.resize(
        videos,
        size=[900, 1600],
        mode="stretch",
        interp_type=types.INTERP_LANCZOS3,
        mag_filter=types.INTERP_LANCZOS3,
    )

    flipped = fn.flip(resized)
    motion = fn.optical_flow(videos)

    return flipped, motion


pipe = video_pipe(batch_size=batch_size, num_threads=n_threads, device_id=0, seed=seed)
pipe.build()
pipe_out = pipe.run()
sequences_out = pipe_out[0].as_cpu().as_array()
motion_out = pipe_out[1].as_cpu().as_array()

In [None]:
show_video(sequences_out[0], dpi=dpi)

In [None]:
if run_speedtest:
    pipelines_speedtest = {
        "video_pipe": video_pipe,
    }

    for k, v in pipelines_speedtest.items():
        print(
            "{}: {}".format(
                k, speedtest(v, speedtest_batch_size, n_threads, device_id=1)
            )
        )

___
## Triton Model Serialization & Example Performance Delta

![](https://developer.nvidia.com/sites/default/files/akamai/triton.png)
- __Client preprocessing__: Samples are decoded, resized, and normalized in parallel using OpenCV.
- __Server preprocessing__: The Python client script sends encoded images to the server, where the whole DALI preprocessing happens.

![](https://developer-blogs.nvidia.com/wp-content/uploads/2021/04/throughput-vs-latency-1.png)

<div class="alert alert-info">
<sub>Throughput vs. latency plots for both scenarios with batch sizes of 1, 4, 8, and 32. Results closer to the top-left corner are better. The performance results were collected on a DGX A100 machine.</sub>
</div>

### Sample Model Preprocessing
Face detection using [NVIDIA's pre-trained FaceDetect model](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/tao/models/facenet)

### Model Overview
The model described in this card detects one or more faces in the given image / video. Compared to the FaceirNet model, this model gives better results on RGB images and smaller faces.

### Model Architecture
The model is based on the NVIDIA DetectNet_v2 detector with ResNet18 as a feature extractor. This architecture, also known as GridBox object detection, uses bounding-box regression on a uniform grid over the input image. The GridBox system divides an input image into a grid, and each grid cell predicts four normalized bounding-box parameters (xc, yc, w, h) and a confidence value per output class.

The raw normalized bounding-box and confidence detections need to be post-processed by a clustering algorithm such as DBSCAN or NMS to produce the final bounding-box coordinates and category labels.
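
As an illustration of that post-processing step, here is a minimal greedy NMS sketch in plain Python. It is not the post-processor shipped with the model; the function names, the 0.5 IoU threshold, and the sample boxes are assumptions chosen for the example. Boxes use the (xc, yc, w, h) parameterization mentioned above.

```python
def to_corners(box):
    """Convert (xc, yc, w, h) to (x1, y1, x2, y2)."""
    xc, yc, w, h = box
    return (xc - w / 2, yc - h / 2, xc + w / 2, yc + h / 2)


def iou(a, b):
    """Intersection over union of two (xc, yc, w, h) boxes."""
    ax1, ay1, ax2, ay2 = to_corners(a)
    bx1, by1, bx2, by2 = to_corners(b)
    inter_w = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    inter_h = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = inter_w * inter_h
    union = a[2] * a[3] + b[2] * b[3] - inter
    return inter / union if union > 0 else 0.0


def nms(boxes, scores, iou_threshold=0.5):
    """Return indices of boxes kept by greedy non-maximum suppression."""
    order = sorted(range(len(boxes)), key=lambda i: scores[i], reverse=True)
    keep = []
    for i in order:
        # Keep a box only if it does not overlap a higher-scoring kept box.
        if all(iou(boxes[i], boxes[k]) <= iou_threshold for k in keep):
            keep.append(i)
    return keep


# Two near-duplicate detections of one face plus a separate face.
boxes = [
    (0.30, 0.30, 0.20, 0.20),
    (0.31, 0.30, 0.20, 0.20),
    (0.70, 0.60, 0.10, 0.10),
]
scores = [0.90, 0.75, 0.80]
print(nms(boxes, scores))
```

The lower-scoring duplicate is suppressed, while the non-overlapping box survives regardless of its score.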

### Input
- Grayscale image whose values in the RGB channels are the same, 736 X 416 X 3
- Channel ordering of the input: NCHW, where N = batch size, C = number of channels (3), H = height of images (416), W = width of images (736)
- Input scale: 1/255.0
- Mean subtraction: None
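
The layout and scaling requirements can be expressed as a small NumPy reference. This sketch assumes the image has already been decoded to grayscale and resized to 416 x 736; the random array stands in for real pixel data, and `to_model_input` is a hypothetical helper name.

```python
import numpy as np


def to_model_input(gray_hw):
    """Turn an HxW uint8 grayscale image into a 1x3xHxW float32 tensor."""
    # Replicate the single channel three times: HxW -> HxWx3.
    hwc = np.repeat(gray_hw[:, :, np.newaxis], 3, axis=2)
    # Reorder HWC -> CHW, then apply the 1/255 input scale.
    chw = hwc.transpose(2, 0, 1).astype(np.float32) * (1 / 255.0)
    # Add the leading batch dimension: CHW -> NCHW with N = 1.
    return chw[np.newaxis]


# Stand-in for a decoded, resized frame (H = 416, W = 736).
rng = np.random.default_rng(0)
gray = rng.integers(0, 256, size=(416, 736), dtype=np.uint8)

batch = to_model_input(gray)
print(batch.shape, batch.dtype)  # (1, 3, 416, 736) float32
```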

### Triton Ensemble Model Example

```sh
model_repository
    ├── facedetect
    │   ├── 1
    │   │   └── model.trt
    │   └── config.pbtxt
    ├── facedetect_ensemble
    │   ├── 1
    │   └── config.pbtxt
    ├── facedetect_postprocess
    │   ├── 1
    │   │   ├── model.py
    │   │   └── postprocessing
    │   │       ├── clustering_config_facedetect.prototxt
    │   │       ├── facenet_postprocessor.py
    │   │       ├── kitti.py
    │   │       ├── postprocessor_config_pb2.py
    │   │       ├── postprocessor_config.proto
    │   │       ├── postprocessor.py
    │   │       ├── preprocess_input.py
    │   │       ├── types
    │   │       │   ├── annotation.py
    │   │       │   ├── frame.py
    │   │       │   ├── __init__.py
    │   │       │   └── user_data.py
    │   │       └── utils.py
    │   └── config.pbtxt
    └── facedetect_preprocess
        ├── 1
        │   └── model.dali
        └── config.pbtxt
```

The example below uses DALI as a GPU-accelerated preprocessing step, so that every client submission reaches the FaceDetect model in the expected format and is handled as efficiently as possible.



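Example `config.pbtxt` for the DALI preprocessing model. Note that Triton expects the `name` field, when present, to match the model's directory name in the model repository: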

```sh
name: "facenet_preprocess"
backend: "dali"
default_model_filename: "model.dali"
max_batch_size: 32
dynamic_batching {
  preferred_batch_size: [ 1, 16, 32 ]
  max_queue_delay_microseconds: 500
}

instance_group [
  {
    count: 1
    kind: KIND_GPU
  }
]
  
input [
    {
      name: "input_image_data"
      data_type: TYPE_UINT8
      dims: [ -1 ]
    }
]
 
output [
    {
      name: "input_1"
      data_type: TYPE_FP32
      dims: [ 3, 416, 736 ]
    },
    {
      name: "true_image_size"
      data_type: TYPE_INT64
      dims: [ 3 ]
    }
]

parameters {
    key: "model_description"
    value: {
        string_value: "Reshapes a full size image. Grayscale Image whose values in RGB channels are the same. 736 X 416 X 3 Channel Ordering of the Input: NCHW, where N = Batch Size, C = number of channels (3), H = Height of images (416), W = Width of the images (736) Input scale: 1/255.0 Mean subtraction: None"
    }
}
parameters {        
    key: "license"
    value: {
        string_value: "Apache 2.0 license: https://www.apache.org/licenses/LICENSE-2.0"
    }
}

parameters: [
  {
    key: "num_threads"
    value: { string_value: "8" }
  }
]
```
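
Because `input_image_data` is declared as a variable-length 1-D UINT8 tensor, a client that sends several encoded images in one request has to pack them into a single rectangular array. One possible approach, sketched below, is to zero-pad each encoded buffer to the length of the longest one. This assumes the image decoder ignores trailing bytes after the end of the encoded stream; `pad_encoded_batch` and the byte strings are illustrative placeholders, not real image data.

```python
import numpy as np


def pad_encoded_batch(encoded_images):
    """Stack variable-length byte buffers into one [N, max_len] uint8 array."""
    arrays = [np.frombuffer(buf, dtype=np.uint8) for buf in encoded_images]
    max_len = max(a.shape[0] for a in arrays)
    # Zero-pad every buffer on the right so all rows share the same length.
    padded = [np.pad(a, (0, max_len - a.shape[0])) for a in arrays]
    return np.stack(padded)


# Placeholder byte strings standing in for encoded JPEG files.
fake_files = [b"\xff\xd8abc\xff\xd9", b"\xff\xd8abcdefg\xff\xd9"]
batch = pad_encoded_batch(fake_files)
print(batch.shape, batch.dtype)  # (2, 11) uint8
```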

In [None]:
SAVE_AS = "/workspace/triton_models/facenet_preprocess/1/model.dali"


class FacenetPipeline:
    """Grayscale Image whose values in RGB channels are the same. 736 X 416 X 3
    Channel Ordering of the Input: NCHW, where N = Batch Size, C = number of
    channels (3), H = Height of images (416), W = Width of the images (736)
    Input scale: 1/255.0 Mean subtraction: None
    https://catalog.ngc.nvidia.com/orgs/nvidia/teams/tao/models/facenet
    """

    def __init__(self):
        self.raw_image_tensor = fn.external_source(name="input_image_data")
        self.shapes = fn.peek_image_shape(self.raw_image_tensor)
        self.one_over_255 = 1 / 255.0

    def load_images(self):
        self.image_tensor = fn.decoders.image(
            self.raw_image_tensor, output_type=types.GRAY, device="mixed"
        )

    def color_space_conversion(self):
        self.image_tensor = fn.color_space_conversion(
            self.image_tensor, image_type=types.GRAY, output_type=types.RGB
        )

    def resize_images(self):
        self.image_tensor = fn.resize(
            self.image_tensor,
            resize_x=736,
            resize_y=416,
            interp_type=types.DALIInterpType.INTERP_LANCZOS3,
        )

    def transpose_images(self):
        self.image_tensor = fn.transpose(self.image_tensor, perm=[2, 0, 1])

    @pipeline_def(batch_size=32, num_threads=8)
    def facenet_reshape(self):
        self.load_images()
        self.color_space_conversion()
        self.resize_images()
        self.transpose_images()

        return self.image_tensor * self.one_over_255, self.shapes


facenet_pipeline = FacenetPipeline()
# NOT RUN
# _ = facenet_pipeline.facenet_reshape().serialize(filename=SAVE_AS)

___