In [None]:
import os
import time

import json

import torch
from random import shuffle
import numpy as np
import cupy as cp
from nvidia.dali.backend import TensorListGPU
from nvidia.dali import pipeline_def, Pipeline
import nvidia.dali.types as types
import nvidia.dali.fn as fn

import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt

import sil

# Number of samples per batch; passed as `batch_size` to every pipeline
# instantiated in the cells below.
BATCH_SIZE = 4 # batch size per GPU
# Upper bound on the size of a single ImageNet image.
# NOTE(review): presumably the encoded file size in bytes — TODO confirm the unit.
IMAGENET_MAX_SIZE = 15737107 # max size of images in the imagenet dataset

In [None]:
#https://docs.nvidia.com/deeplearning/dali/user-guide/docs/examples/image_processing/decoder_examples.html
def show_images(image_batch):
    """Plot every image of a decoded batch in a grid four columns wide.

    The grid is sized from the batch itself instead of the global
    ``BATCH_SIZE``, so a batch with fewer images than one row, or with a
    count that is not a multiple of the column count, is still drawn in full.

    Args:
        image_batch: Host-side batch of decoded images. It must support
            ``len()`` and ``.at(index)`` (e.g. a DALI ``TensorListCPU``).

    Returns:
        None. The images are drawn on a new matplotlib figure.
    """
    columns = 4
    count = len(image_batch)
    if count == 0:
        # Nothing to draw; a zero-row grid would make matplotlib raise.
        return
    rows = -(-count // columns)  # ceiling division: a partial last row still needs a row
    plt.figure(figsize=(32, (32 // columns) * rows))
    gs = gridspec.GridSpec(rows, columns)
    for j in range(count):
        plt.subplot(gs[j])
        plt.axis("off")
        plt.imshow(image_batch.at(j))

def show_pipeline_output(pipe):
    """Build ``pipe``, run it once and plot the images of that batch.

    Args:
        pipe: Pipeline object exposing ``build()`` and ``run()``; ``run()``
            must return exactly two outputs, of which the first holds the
            images. The second output is ignored.
    """
    pipe.build()
    decoded, _unused = pipe.run()
    # Batches living on the GPU are copied to the host before plotting.
    on_host = decoded.as_cpu() if isinstance(decoded, TensorListGPU) else decoded
    show_images(on_host)

In [None]:
@pipeline_def
def dali_pipe(data_dir):
    """Baseline pipeline: DALI's own file reader followed by a "mixed" decoder.

    Args:
        data_dir: Directory handed to ``fn.readers.file`` as ``file_root``.

    Returns:
        ``(images, labels)``: the decoded RGB images and the labels produced
        by the file reader.
    """
    # Single shard, shuffled, with the last batch padded.
    encoded, targets = fn.readers.file(
        file_root=data_dir,
        shard_id=0,
        num_shards=1,
        random_shuffle=True,
        pad_last_batch=True,
        name="FILE",
    )

    decoded = fn.decoders.image(
        encoded,
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, targets

In [None]:
class SILInputIterator(object):
    """Iterator that hands batches obtained from the ``sil`` module to a consumer.

    Each ``next()`` call fetches one batch via ``sil.next()`` and returns
    ``(batch, labels)``: a list of CuPy arrays that wrap the buffers returned
    by ``sil`` without copying, and a list of one-element ``torch.int32``
    tensors. Iteration stops once the cursor, advanced by ``batch_size`` per
    call, reaches the value returned by ``sil.init``.
    """

    def __init__(self, device, data_dir, batch_size, backend, mnt="", gpu_nqueues=6, queue_depth=1024):
        """Initialise ``sil`` and set up the iteration cursor.

        Args:
            device, data_dir, backend, mnt, gpu_nqueues, queue_depth:
                Forwarded unchanged to ``sil.init``.
            batch_size: Forwarded to ``sil.init`` and used as the cursor
                increment per ``next()`` call.
        """
        # Create the cursor here so that next() also works on a fresh instance
        # that has not been passed through iter() yet.
        self.i = 0
        self.batch_size = batch_size
        # Value returned by sil.init; used as the end-of-epoch bound and as len().
        # NOTE(review): presumably the number of samples in data_dir — confirm.
        self.n = sil.init(device, data_dir = data_dir, backend = backend, batch_size = batch_size, gpu_nqueues = gpu_nqueues, queue_depth = queue_depth, mnt = mnt)

    def __iter__(self):
        """Rewind the cursor and return the iterator itself."""
        self.i = 0
        return self

    def __next__(self):
        """Return the next ``(batch, labels)`` pair.

        Raises:
            StopIteration: when the cursor has reached ``self.n``. The cursor
                is rewound first so the same object can be iterated again.
        """
        if self.i >= self.n:
            self.__iter__()
            raise StopIteration

        batch = []
        labels = []
        arr, lab = sil.next()
        for idx in range(len(arr)):
            sample = arr[idx]
            # Wrap the existing buffer instead of copying it. `self` is passed
            # as the owner object of the unowned memory.
            # NOTE(review): the size argument is len(sample), which equals the
            # byte size only for a 1-D buffer of 1-byte elements — confirm.
            # NOTE(review): sample.ctypes.data must be an address the GPU can
            # use for every backend — confirm for the CPU/posix backends.
            memory = cp.cuda.UnownedMemory(sample.ctypes.data, len(sample), self)
            batch.append(cp.ndarray(shape=sample.shape, dtype=sample.dtype, memptr=cp.cuda.MemoryPointer(memory, 0)))
            labels.append(torch.tensor([lab[idx]], dtype=torch.int32))
        self.i += self.batch_size

        return batch, labels

    def __len__(self):
        """Return the value obtained from ``sil.init``."""
        return self.n

    def __del__(self):
        # NOTE(review): the demo cells in this notebook also call sil.term()
        # explicitly; confirm sil.term() tolerates being called twice.
        sil.term()

    # Python 2 style alias for the iterator protocol.
    next = __next__
    
@pipeline_def
def libnvm_cpu_pipe(data_dir):
    """Pipeline fed by ``SILInputIterator`` with the "libnvm-cpu" backend, decoding on the CPU.

    Args:
        data_dir: Dataset directory forwarded to ``SILInputIterator``.

    Returns:
        ``(images, labels)``: the decoded RGB images and the labels coming
        from the external source.
    """
    # The iterator must produce batches of the size the pipeline was built with.
    max_batch = Pipeline.current().max_batch_size

    reader = SILInputIterator(
        device="/dev/libnvm0",
        backend="libnvm-cpu",
        data_dir=data_dir,
        batch_size=max_batch,
        queue_depth=63,
    )
    encoded, targets = fn.external_source(
        source=reader,
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )

    decoded = fn.decoders.image(
        encoded,
        device="cpu",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, targets

@pipeline_def
def libnvm_gpu_pipe(data_dir):
    """Pipeline fed by ``SILInputIterator`` with the "libnvm-gpu" backend and a "mixed" decoder.

    Args:
        data_dir: Dataset directory forwarded to ``SILInputIterator``.

    Returns:
        ``(images, labels)``: the decoded RGB images and the labels coming
        from the external source.
    """
    # The iterator must produce batches of the size the pipeline was built with.
    max_batch = Pipeline.current().max_batch_size

    reader = SILInputIterator(
        device="/dev/libnvm0",
        backend="libnvm-gpu",
        data_dir=data_dir,
        batch_size=max_batch,
    )
    encoded, targets = fn.external_source(
        source=reader,
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )

    decoded = fn.decoders.image(
        encoded,
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, targets

@pipeline_def
def gds_pipe(data_dir):
    """Pipeline fed by ``SILInputIterator`` with the "gds" backend and a "mixed" decoder.

    The iterator is created for device ``/dev/nvme4n1`` with mount point
    ``/data``.

    Args:
        data_dir: Dataset directory forwarded to ``SILInputIterator``.

    Returns:
        ``(images, labels)``: the decoded RGB images and the labels coming
        from the external source.
    """
    # The iterator must produce batches of the size the pipeline was built with.
    max_batch = Pipeline.current().max_batch_size

    reader = SILInputIterator(
        device="/dev/nvme4n1",
        backend="gds",
        mnt="/data",
        data_dir=data_dir,
        batch_size=max_batch,
    )
    encoded, targets = fn.external_source(
        source=reader,
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )

    decoded = fn.decoders.image(
        encoded,
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, targets

@pipeline_def
def posix_pipe(data_dir):
    """Pipeline fed by ``SILInputIterator`` with the "posix" backend and a "mixed" decoder.

    The iterator is created for device ``/dev/nvme4n1`` with mount point
    ``/data``.

    Args:
        data_dir: Dataset directory forwarded to ``SILInputIterator``.

    Returns:
        ``(images, labels)``: the decoded RGB images and the labels coming
        from the external source.
    """
    # The iterator must produce batches of the size the pipeline was built with.
    max_batch = Pipeline.current().max_batch_size

    reader = SILInputIterator(
        device="/dev/nvme4n1",
        backend="posix",
        mnt="/data",
        data_dir=data_dir,
        batch_size=max_batch,
    )
    encoded, targets = fn.external_source(
        source=reader,
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )

    decoded = fn.decoders.image(
        encoded,
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, targets

In [None]:
# Build the GDS-backed pipeline, run one batch and display the decoded images.
with gds_pipe(data_dir="train", batch_size=BATCH_SIZE, num_threads=1, device_id=0) as pipe:
    try:
        show_pipeline_output(pipe)
    finally:
        # Call sil.term() even when building or running the pipeline raises,
        # so a failing cell does not skip the shutdown.
        sil.term()

In [None]:
# Build the POSIX-backed pipeline, run one batch and display the decoded images.
with posix_pipe(data_dir="train", batch_size=BATCH_SIZE, num_threads=1, device_id=0) as pipe:
    try:
        show_pipeline_output(pipe)
    finally:
        # Call sil.term() even when building or running the pipeline raises,
        # so a failing cell does not skip the shutdown.
        sil.term()

In [None]:
# Build the libnvm-gpu-backed pipeline, run one batch and display the decoded images.
with libnvm_gpu_pipe(data_dir="train", batch_size=BATCH_SIZE, num_threads=1, device_id=0) as pipe:
    try:
        show_pipeline_output(pipe)
    finally:
        # Call sil.term() even when building or running the pipeline raises,
        # so a failing cell does not skip the shutdown.
        sil.term()

In [None]:
# Build the libnvm-cpu-backed pipeline, run one batch and display the decoded images.
with libnvm_cpu_pipe(data_dir="train", batch_size=BATCH_SIZE, num_threads=1, device_id=0) as pipe:
    try:
        show_pipeline_output(pipe)
    finally:
        # Call sil.term() even when building or running the pipeline raises,
        # so a failing cell does not skip the shutdown.
        sil.term()