In [None]:
import gc
import json
import os
import time
from ctypes import CFUNCTYPE, POINTER, c_uint8, c_void_p, byref, pointer, cast, c_uint32
from random import shuffle

import torch
import numpy as np
import cupy as cp
from nvidia.dali.backend import TensorListGPU
from nvidia.dali import pipeline_def, Pipeline
import nvidia.dali.types as types
import nvidia.dali.fn as fn
import matplotlib.gridspec as gridspec
import matplotlib.pyplot as plt

import xnvme.ctypes_bindings as xnvme
from xnvme.ctypes_bindings.api import char_pointer_cast


# Dataset root; the file-based readers below load images from DATADIR/train
DATADIR = "/data"
# JSON block map read by the xNVMe GDS/BaM iterators ("startblock"/"endblock" per file)
BMAP = "imagenet_train_bmap.json"
BATCH_SIZE = 4 # batch size per GPU
IMAGENET_MAX_SIZE = 15737107 # max size of images in the imagenet dataset; used as the size of each I/O buffer
GDS_QD = 63 # queue depth handed to xnvme_queue_init by the GDS iterators
NLB = 7 # forwarded as the `nlb` argument of the range-submit calls; presumably zero-based blocks per command — TODO confirm

In [None]:
#https://docs.nvidia.com/deeplearning/dali/user-guide/docs/examples/image_processing/decoder_examples.html
def show_images(image_batch):
    """Plot every image of ``image_batch`` on a grid that is four columns wide.

    The number of rows is derived from the batch itself (rounded up), so batches
    smaller than four images, or whose size is not a multiple of four, are shown
    in full instead of failing or being truncated.

    Args:
        image_batch: Host-side batch supporting ``len()`` and ``at(index)``.
    """
    columns = 4
    count = len(image_batch)
    if count == 0:
        # Nothing to draw; a grid with zero rows is rejected by GridSpec.
        return
    rows = -(-count // columns)  # ceiling division
    plt.figure(figsize=(32, (32 // columns) * rows))
    gs = gridspec.GridSpec(rows, columns)
    for j in range(count):
        plt.subplot(gs[j])
        plt.axis("off")
        plt.imshow(image_batch.at(j))

def show_pipeline_output(pipe):
    """Build ``pipe``, run it once and plot the images of the produced batch.

    Args:
        pipe: Pipeline whose ``run()`` returns exactly two outputs; the first is
            treated as the image batch, the second is ignored.
    """
    pipe.build()
    decoded, _labels = pipe.run()
    # A GPU-resident batch has to be copied to the host before it can be plotted.
    needs_copy = isinstance(decoded, TensorListGPU)
    show_images(decoded.as_cpu() if needs_copy else decoded)

In [None]:
@pipeline_def
def dali_pipe(datadir):
    """Reference pipeline: read ``<datadir>/train`` with ``fn.readers.file`` and decode.

    Args:
        datadir: Dataset root containing a ``train`` directory.

    Returns:
        The ``(images, labels)`` outputs of the pipeline; images are decoded to RGB
        by the "mixed" image decoder.
    """
    reader_options = dict(
        file_root=os.path.join(datadir, "train"),
        shard_id=0,
        num_shards=1,
        random_shuffle=True,
        pad_last_batch=True,
        name="FILE",
    )
    encoded, labels = fn.readers.file(**reader_options)

    decoder_options = dict(
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    decoded = fn.decoders.image(encoded, **decoder_options)
    return decoded, labels

In [None]:
# Based on https://github.com/NVIDIA/DALI/blob/main/docs/examples/frameworks/pytorch/pytorch-external_input.ipynb
class FileInputIterator(object):
    """Batch iterator that feeds encoded image files to a DALI external source.

    This is essentially fn.readers.file reimplemented as an external source:
    every sub-directory of ``<datadir>/train`` is one class, labelled by its
    position in the sorted list of sub-directory names.

    Each ``next()`` call returns ``(batch, labels)``: ``batch`` is a list of
    ``batch_size`` uint8 NumPy arrays holding the raw file bytes and ``labels``
    is a list of one-element int32 torch tensors. The last batch of an epoch
    wraps around to the start of the shuffled file list so it is always full.

    Args:
        datadir: Dataset root containing a ``train`` directory.
        batch_size: Number of samples returned per ``next()`` call.
    """

    def __init__(self, datadir, batch_size):
        self.images_dir = os.path.join(datadir, "train")
        self.batch_size = batch_size
        # Only sub-directories are classes. Plain files placed next to them must
        # neither consume a label index nor be looked up as a class directory
        # (which used to raise KeyError).
        class_dirs = sorted(
            entry for entry in os.listdir(self.images_dir)
            if os.path.isdir(os.path.join(self.images_dir, entry))
        )
        # Walk each class directory separately so that files in nested folders
        # inherit the label of their top-level class directory.
        self.files = [
            (os.path.join(root, name), label)
            for label, class_dir in enumerate(class_dirs)
            for root, _, names in os.walk(os.path.join(self.images_dir, class_dir))
            for name in names
        ]
        self.n = len(self.files)
        # Defined up front so next() also works before the first iter() call.
        self.i = 0

    def __iter__(self):
        """Start a new epoch: rewind the cursor and reshuffle the file list."""
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        """Return the next ``(batch, labels)`` pair or raise StopIteration."""
        batch = []
        labels = []

        if self.i >= self.n:
            # Re-arm for the next epoch before signalling the end of this one.
            self.__iter__()
            raise StopIteration

        for _ in range(self.batch_size):
            # The modulo pads the final batch with samples from the start.
            jpeg, label = self.files[self.i % self.n]

            batch.append(
                np.fromfile(jpeg, dtype=np.uint8)
            )

            labels.append(
                torch.tensor([label], dtype=torch.int32)
            )

            self.i += 1
        return (batch, labels)

    def __len__(self):
        """Number of files (samples), not the number of batches."""
        return self.n

    next = __next__
    
@pipeline_def
def file_pipe(datadir):
    """Pipeline that reads images through ``FileInputIterator`` and decodes them.

    Args:
        datadir: Dataset root forwarded to ``FileInputIterator``.

    Returns:
        The ``(images, labels)`` outputs of the pipeline.
    """
    # The iterator must produce batches of the pipeline's own batch size.
    source = FileInputIterator(
        datadir=datadir,
        batch_size=Pipeline.current().max_batch_size,
    )
    encoded, labels = fn.external_source(
        source=source,
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )
    decoded = fn.decoders.image(
        encoded,
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, labels

In [None]:
class XNVMEFileInputIterator(object):
    """Batch iterator that reads image files through the xNVMe file API.

    Every sub-directory of ``<datadir>/train`` is one class, labelled by its
    position in the sorted list of sub-directory names. One xNVMe buffer of
    ``IMAGENET_MAX_SIZE`` bytes is allocated per batch slot and reused for every
    batch, so the arrays returned by ``next()`` are views into those buffers and
    are overwritten by the following ``next()`` call.

    Args:
        datadir: Dataset root containing a ``train`` directory.
        batch_size: Number of samples returned per ``next()`` call.

    Raises:
        RuntimeError: If the scratch file cannot be opened through xNVMe.
        MemoryError: If an I/O buffer cannot be allocated.
    """

    def __init__(self, datadir, batch_size):
        # Defined first so that __del__ is safe when construction fails part-way.
        self.dev = None
        self.buffers = None
        self.views = []
        self.i = 0

        self.images_dir = os.path.join(datadir, "train")
        self.batch_size = batch_size
        # Only sub-directories are classes; stray files beside them must neither
        # consume a label index nor trigger a KeyError during the walk.
        class_dirs = sorted(
            entry for entry in os.listdir(self.images_dir)
            if os.path.isdir(os.path.join(self.images_dir, entry))
        )
        self.files = [
            (os.path.join(root, name), label)
            for label, class_dir in enumerate(class_dirs)
            for root, _, names in os.walk(os.path.join(self.images_dir, class_dir))
            for name in names
        ]
        self.n = len(self.files)

        opts = xnvme.xnvme_opts()
        xnvme.xnvme_opts_set_defaults(byref(opts))
        # Make tmp file to link buffers to
        file_path = "/tmp/xnvme_file"
        if not os.path.exists(file_path):
            with open(file_path, 'w') as file:
                file.write("tmp")
        self.dev = xnvme.xnvme_file_open(char_pointer_cast(file_path), byref(opts))
        if not self.dev:
            raise RuntimeError(f"xNVMe failed to open {file_path}")

        self.buf_size = IMAGENET_MAX_SIZE
        self.buffers = (c_void_p * self.batch_size)()
        for slot in range(self.batch_size):
            buf = xnvme.xnvme_buf_alloc(self.dev, self.buf_size)
            if not buf:
                raise MemoryError(f"xNVMe failed to allocate a {self.buf_size} byte buffer")
            self.buffers[slot] = cast(buf, c_void_p)
            # NumPy view over the raw buffer; no copy is made.
            self.views.append(
                np.ctypeslib.as_array(
                    cast(buf, POINTER(c_uint8)),
                    shape=(self.buf_size,),
                )
            )

    def __iter__(self):
        """Start a new epoch: rewind the cursor and reshuffle the file list."""
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        """Read the next batch into the slot buffers.

        Returns:
            ``(batch, labels)``: a list of uint8 views (one per slot, trimmed to
            the file size) and a list of one-element int32 torch tensors.

        Raises:
            StopIteration: When the epoch is exhausted.
            IOError: If a file cannot be opened or read, or does not fit into
                its slot buffer.
        """
        batch = []
        labels = []

        if self.i >= self.n:
            # Re-arm for the next epoch before signalling the end of this one.
            self.__iter__()
            raise StopIteration

        opts = xnvme.xnvme_opts()
        xnvme.xnvme_opts_set_defaults(byref(opts))
        for slot in range(self.batch_size):
            # The modulo pads the final batch with samples from the start.
            jpeg, label = self.files[self.i % self.n]

            file = xnvme.xnvme_file_open(char_pointer_cast(jpeg), byref(opts))
            if not file:
                raise IOError(f"xNVMe failed to open {jpeg}")
            try:
                size = xnvme.xnvme_dev_get_geo(file).contents.tbytes
                if size > self.buf_size:
                    # Reading would write past the end of the slot buffer.
                    raise IOError(f"{jpeg} is {size} bytes, larger than the {self.buf_size} byte buffer")
                ctx = xnvme.xnvme_file_get_cmd_ctx(file)
                err = xnvme.xnvme_file_pread(byref(ctx), self.buffers[slot], size, 0)
                if err:
                    raise IOError(f"Err: {err}")
            finally:
                # Always release the per-image handle, also on a failed read.
                xnvme.xnvme_file_close(file)

            batch.append(
                self.views[slot][:size]
            )

            labels.append(
                torch.tensor([label], dtype=torch.int32)
            )

            self.i += 1

        return (batch, labels)

    def __len__(self):
        """Number of files (samples), not the number of batches."""
        return self.n

    def __del__(self):
        """Free the slot buffers and close the scratch-file handle, if they exist."""
        dev = getattr(self, "dev", None)
        if not dev:
            return
        buffers = getattr(self, "buffers", None)
        if buffers is not None:
            for buf in buffers:
                if buf:  # slots that were never allocated read back as None
                    xnvme.xnvme_buf_free(dev, buf)
        xnvme.xnvme_file_close(dev)

    next = __next__

@pipeline_def
def xnvme_file_pipe(datadir):
    """Pipeline that reads images through ``XNVMEFileInputIterator`` and decodes them.

    Args:
        datadir: Dataset root forwarded to ``XNVMEFileInputIterator``.

    Returns:
        The ``(images, labels)`` outputs of the pipeline.
    """
    # The iterator must produce batches of the pipeline's own batch size.
    source = XNVMEFileInputIterator(
        datadir=datadir,
        batch_size=Pipeline.current().max_batch_size,
    )
    encoded, labels = fn.external_source(
        source=source,
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )
    decoded = fn.decoders.image(
        encoded,
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, labels

In [None]:
class XNVMECPUInputIterator(object):
    """Batch iterator using the xNVMe GDS-like backend with CPU buffers.

    Files are located through a JSON block map: each key is a path whose second
    ``/``-separated component names the class, each value provides
    ``"startblock"`` and ``"endblock"``. Labels are the index of the class name
    in the sorted list of class names.

    One buffer per batch slot is allocated once and reused, so the arrays
    returned by ``next()`` are views that the following ``next()`` call
    overwrites. Returned views cover whole blocks
    (``(endblock - startblock + 1) * block size`` bytes).

    Args:
        dev: Open xNVMe device handle; it must outlive this object because
            ``__del__`` frees the buffers against it.
        bmap: Path of the JSON block map.
        batch_size: Number of samples returned per ``next()`` call.

    Raises:
        MemoryError: If an I/O buffer cannot be allocated.
        RuntimeError: If the xNVMe queue cannot be initialised.
    """

    def __init__(self, dev, bmap, batch_size):
        # Defined first so that __del__ is safe when construction fails part-way.
        self.dev = dev
        self.batch_size = batch_size
        self.buffers = None
        self.views = []
        self.queue = None
        self.i = 0

        with open(bmap) as f:
            d = json.load(f)
        # Map class name -> label once instead of a list.index() call per file.
        class_names = sorted({k.split("/")[1] for k in d})
        label_of = {name: idx for idx, name in enumerate(class_names)}
        for k, entry in d.items():
            entry["label"] = label_of[k.split("/")[1]]
        self.files = list(d.values())
        self.n = len(self.files)

        self.nsid = xnvme.xnvme_dev_get_nsid(self.dev)
        geo = xnvme.xnvme_dev_get_geo(self.dev)
        self.nlb = NLB
        self.nbytes = geo.contents.nbytes
        # Reads transfer whole blocks, so a buffer of exactly IMAGENET_MAX_SIZE
        # bytes is too small for the largest file once its length is rounded up
        # to a block boundary. Rounding up to a multiple of (nlb + 1) blocks is a
        # conservative choice that also covers a full final command — this
        # assumes nlb is zero-based; TODO confirm.
        chunk = (self.nlb + 1) * self.nbytes
        self.buf_size = -(-IMAGENET_MAX_SIZE // chunk) * chunk
        self.qd = GDS_QD
        self.buffers = (c_void_p * self.batch_size)()
        for slot in range(self.batch_size):
            buf = xnvme.xnvme_buf_alloc(dev, self.buf_size)
            if not buf:
                raise MemoryError(f"xNVMe failed to allocate a {self.buf_size} byte buffer")
            self.buffers[slot] = cast(buf, c_void_p)
            # NumPy view over the raw buffer; no copy is made.
            self.views.append(
                np.ctypeslib.as_array(
                    cast(buf, POINTER(c_uint8)),
                    shape=(self.buf_size,),
                )
            )
        self.queue = POINTER(xnvme.xnvme_queue)()
        err = xnvme.xnvme_queue_init(self.dev, self.qd, 0, byref(self.queue))
        if err:
            raise RuntimeError(f"Failed to init queue: {err}")

    def __iter__(self):
        """Start a new epoch: rewind the cursor and reshuffle the file list."""
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        """Read the block ranges of the next batch into the slot buffers.

        Returns:
            ``(batch, labels)``: a list of uint8 views (one per slot, trimmed to
            the block range that was read) and a list of one-element int32
            torch tensors.

        Raises:
            StopIteration: When the epoch is exhausted.
            ValueError: If a block range is inverted or does not fit in 32 bits.
            IOError: If a block range exceeds the slot buffer or the read fails.
        """
        if self.i >= self.n:
            # Re-arm for the next epoch before signalling the end of this one.
            self.__iter__()
            raise StopIteration

        labels = []
        lengths = []
        slbas = (c_uint32 * self.batch_size)()
        elbas = (c_uint32 * self.batch_size)()

        for slot in range(self.batch_size):
            # The modulo pads the final batch with samples from the start.
            file = self.files[self.i % self.n]
            slba, elba = file["startblock"], file["endblock"]
            if not 0 <= slba <= elba <= 0xFFFFFFFF:
                # c_uint32 would silently truncate out-of-range addresses.
                raise ValueError(f"Invalid block range {slba}-{elba}")
            length = (elba - slba + 1) * self.nbytes
            if length > self.buf_size:
                # Reading would write past the end of the slot buffer.
                raise IOError(f"Block range {slba}-{elba} needs {length} bytes, buffer holds {self.buf_size}")
            labels.append(
                torch.tensor([file["label"]], dtype=torch.int32)
            )
            slbas[slot] = c_uint32(slba)
            elbas[slot] = c_uint32(elba)
            lengths.append(length)
            self.i += 1

        err = xnvme.xnvme_io_range_submit(self.queue, xnvme.XNVME_SPEC_NVM_OPC_READ, slbas, elbas, self.nlb, self.nbytes, self.buffers, self.batch_size)
        if err:
            raise IOError(f"Err: {err}")

        batch = [self.views[slot][:lengths[slot]] for slot in range(self.batch_size)]
        return (batch, labels)

    def __len__(self):
        """Number of files (samples), not the number of batches."""
        return self.n

    def __del__(self):
        """Free the slot buffers and terminate the queue, if they exist."""
        buffers = getattr(self, "buffers", None)
        if buffers is not None:
            dev = getattr(self, "dev", None)
            for buf in buffers:
                if buf:  # slots that were never allocated read back as None
                    xnvme.xnvme_buf_free(dev, buf)
        queue = getattr(self, "queue", None)
        if queue:  # a NULL queue pointer means xnvme_queue_init never succeeded
            xnvme.xnvme_queue_term(queue)

    next = __next__

@pipeline_def
def xnvme_gds_cpu_pipe(dev, bmap):
    """Pipeline that reads images through ``XNVMECPUInputIterator`` and decodes them.

    Args:
        dev: Open xNVMe device handle forwarded to the iterator.
        bmap: Path of the JSON block map forwarded to the iterator.

    Returns:
        The ``(images, labels)`` outputs of the pipeline.
    """
    # The iterator must produce batches of the pipeline's own batch size.
    source = XNVMECPUInputIterator(
        dev=dev,
        bmap=bmap,
        batch_size=Pipeline.current().max_batch_size,
    )
    encoded, labels = fn.external_source(
        source=source,
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )
    decoded = fn.decoders.image(
        encoded,
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, labels

In [None]:
class XNVMEGPUInputIterator(object):
    """Batch iterator using the xNVMe GDS-like backend with GPU buffers.

    Files are located through a JSON block map: each key is a path whose second
    ``/``-separated component names the class, each value provides
    ``"startblock"`` and ``"endblock"``. Labels are the index of the class name
    in the sorted list of class names.

    One buffer per batch slot is allocated once and wrapped in a CuPy array
    without taking ownership; the arrays returned by ``next()`` are views that
    the following ``next()`` call overwrites. Returned views cover whole blocks
    (``(endblock - startblock + 1) * block size`` bytes).

    Args:
        dev: Open xNVMe device handle; it must outlive this object because
            ``__del__`` frees the buffers against it.
        bmap: Path of the JSON block map.
        batch_size: Number of samples returned per ``next()`` call.

    Raises:
        MemoryError: If an I/O buffer cannot be allocated.
        RuntimeError: If the xNVMe queue cannot be initialised.
    """

    def __init__(self, dev, bmap, batch_size):
        # Defined first so that __del__ is safe when construction fails part-way.
        self.dev = dev
        self.batch_size = batch_size
        self.buffers = None
        self.views = []
        self.queue = None
        self.i = 0

        with open(bmap) as f:
            d = json.load(f)
        # Map class name -> label once instead of a list.index() call per file.
        class_names = sorted({k.split("/")[1] for k in d})
        label_of = {name: idx for idx, name in enumerate(class_names)}
        for k, entry in d.items():
            entry["label"] = label_of[k.split("/")[1]]
        self.files = list(d.values())
        self.n = len(self.files)

        self.nsid = xnvme.xnvme_dev_get_nsid(self.dev)
        geo = xnvme.xnvme_dev_get_geo(self.dev)
        self.nlb = NLB
        self.nbytes = geo.contents.nbytes
        # Reads transfer whole blocks, so a buffer of exactly IMAGENET_MAX_SIZE
        # bytes is too small for the largest file once its length is rounded up
        # to a block boundary. Rounding up to a multiple of (nlb + 1) blocks is a
        # conservative choice that also covers a full final command — this
        # assumes nlb is zero-based; TODO confirm.
        chunk = (self.nlb + 1) * self.nbytes
        self.buf_size = -(-IMAGENET_MAX_SIZE // chunk) * chunk
        self.qd = GDS_QD
        self.buffers = (c_void_p * self.batch_size)()
        for slot in range(self.batch_size):
            buf = xnvme.xnvme_buf_alloc(dev, self.buf_size)
            if not buf:
                raise MemoryError(f"xNVMe failed to allocate a {self.buf_size} byte buffer")
            self.buffers[slot] = cast(buf, c_void_p)
            # Wrap the allocation in a CuPy array; UnownedMemory never frees it
            # and keeps `dev` referenced for as long as the array lives.
            memory = cp.cuda.UnownedMemory(buf, self.buf_size, dev)
            self.views.append(
                cp.ndarray(
                    shape=(self.buf_size,),
                    dtype=np.uint8,
                    memptr=cp.cuda.MemoryPointer(memory, 0),
                )
            )
        self.queue = POINTER(xnvme.xnvme_queue)()
        err = xnvme.xnvme_queue_init(self.dev, self.qd, 0, byref(self.queue))
        if err:
            raise RuntimeError(f"Failed to init queue: {err}")

    def __iter__(self):
        """Start a new epoch: rewind the cursor and reshuffle the file list."""
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        """Read the block ranges of the next batch into the slot buffers.

        Returns:
            ``(batch, labels)``: a list of uint8 CuPy views (one per slot,
            trimmed to the block range that was read) and a list of one-element
            int32 torch tensors.

        Raises:
            StopIteration: When the epoch is exhausted.
            ValueError: If a block range is inverted or does not fit in 32 bits.
            IOError: If a block range exceeds the slot buffer or the read fails.
        """
        if self.i >= self.n:
            # Re-arm for the next epoch before signalling the end of this one.
            self.__iter__()
            raise StopIteration

        labels = []
        lengths = []
        slbas = (c_uint32 * self.batch_size)()
        elbas = (c_uint32 * self.batch_size)()

        for slot in range(self.batch_size):
            # The modulo pads the final batch with samples from the start.
            file = self.files[self.i % self.n]
            slba, elba = file["startblock"], file["endblock"]
            if not 0 <= slba <= elba <= 0xFFFFFFFF:
                # c_uint32 would silently truncate out-of-range addresses.
                raise ValueError(f"Invalid block range {slba}-{elba}")
            length = (elba - slba + 1) * self.nbytes
            if length > self.buf_size:
                # Reading would write past the end of the slot buffer.
                raise IOError(f"Block range {slba}-{elba} needs {length} bytes, buffer holds {self.buf_size}")
            labels.append(
                torch.tensor([file["label"]], dtype=torch.int32)
            )
            slbas[slot] = c_uint32(slba)
            elbas[slot] = c_uint32(elba)
            lengths.append(length)
            self.i += 1

        err = xnvme.xnvme_io_range_submit(self.queue, xnvme.XNVME_SPEC_NVM_OPC_READ, slbas, elbas, self.nlb, self.nbytes, self.buffers, self.batch_size)
        if err:
            raise IOError(f"Err: {err}")

        batch = [self.views[slot][:lengths[slot]] for slot in range(self.batch_size)]
        return (batch, labels)

    def __len__(self):
        """Number of files (samples), not the number of batches."""
        return self.n

    def __del__(self):
        """Free the slot buffers and terminate the queue, if they exist."""
        buffers = getattr(self, "buffers", None)
        if buffers is not None:
            dev = getattr(self, "dev", None)
            for buf in buffers:
                if buf:  # slots that were never allocated read back as None
                    xnvme.xnvme_buf_free(dev, buf)
        queue = getattr(self, "queue", None)
        if queue:  # a NULL queue pointer means xnvme_queue_init never succeeded
            xnvme.xnvme_queue_term(queue)

    next = __next__

@pipeline_def
def xnvme_gds_gpu_pipe(dev, bmap):
    """Pipeline that reads images through ``XNVMEGPUInputIterator`` and decodes them.

    Args:
        dev: Open xNVMe device handle forwarded to the iterator.
        bmap: Path of the JSON block map forwarded to the iterator.

    Returns:
        The ``(images, labels)`` outputs of the pipeline.
    """
    # The iterator must produce batches of the pipeline's own batch size.
    source = XNVMEGPUInputIterator(
        dev=dev,
        bmap=bmap,
        batch_size=Pipeline.current().max_batch_size,
    )
    # NOTE(review): the iterator yields CuPy arrays while external_source is left
    # on its default device — confirm DALI accepts this combination.
    encoded, labels = fn.external_source(
        source=source,
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )
    decoded = fn.decoders.image(
        encoded,
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, labels

In [None]:
class XNVMEBAMInputIterator(object):
    """Batch iterator using the xNVMe BaM-like backend.

    Files are located through a JSON block map: each key is a path whose second
    ``/``-separated component names the class, each value provides
    ``"startblock"`` and ``"endblock"``. Labels are the index of the class name
    in the sorted list of class names.

    One buffer per batch slot is allocated once and wrapped in a CuPy array
    without taking ownership; the arrays returned by ``next()`` are views that
    the following ``next()`` call overwrites. Returned views cover whole blocks
    (``(endblock - startblock + 1) * block size`` bytes).

    Args:
        dev: Open xNVMe device handle; it must outlive this object because
            ``__del__`` frees the buffers against it.
        bmap: Path of the JSON block map.
        batch_size: Number of samples returned per ``next()`` call.

    Raises:
        MemoryError: If an I/O buffer cannot be allocated.
    """

    def __init__(self, dev, bmap, batch_size):
        # Defined first so that __del__ is safe when construction fails part-way.
        self.dev = dev
        self.batch_size = batch_size
        self.buffers = None
        self.views = []
        self.i = 0

        with open(bmap) as f:
            d = json.load(f)
        # Map class name -> label once instead of a list.index() call per file.
        class_names = sorted({k.split("/")[1] for k in d})
        label_of = {name: idx for idx, name in enumerate(class_names)}
        for k, entry in d.items():
            entry["label"] = label_of[k.split("/")[1]]
        self.files = list(d.values())
        self.n = len(self.files)

        self.nsid = xnvme.xnvme_dev_get_nsid(self.dev)
        geo = xnvme.xnvme_dev_get_geo(self.dev)
        self.nlb = NLB
        self.nbytes = geo.contents.nbytes
        # Reads transfer whole blocks, so a buffer of exactly IMAGENET_MAX_SIZE
        # bytes is too small for the largest file once its length is rounded up
        # to a block boundary. Rounding up to a multiple of (nlb + 1) blocks is a
        # conservative choice that also covers a full final command — this
        # assumes nlb is zero-based; TODO confirm.
        chunk = (self.nlb + 1) * self.nbytes
        self.buf_size = -(-IMAGENET_MAX_SIZE // chunk) * chunk

        # Pointer table handed to the kernel launcher. It is created before any
        # allocation so that __del__ can release a partially filled table; the
        # pointer-sized unsigned dtype matches the c_void_p layout it is cast to.
        self.buffers = np.zeros(self.batch_size, dtype=np.uintp)
        self.buffers_ref = self.buffers.ctypes.data_as(POINTER(c_void_p))
        for slot in range(self.batch_size):
            buf = xnvme.xnvme_buf_alloc(dev, self.buf_size)
            if not buf:
                raise MemoryError(f"xNVMe failed to allocate a {self.buf_size} byte buffer")
            self.buffers[slot] = buf
            # Wrap the allocation in a CuPy array; UnownedMemory never frees it
            # and keeps `dev` referenced for as long as the array lives.
            memory = cp.cuda.UnownedMemory(buf, self.buf_size, dev)
            self.views.append(
                cp.ndarray(
                    shape=(self.buf_size,),
                    dtype=np.uint8,
                    memptr=cp.cuda.MemoryPointer(memory, 0),
                )
            )

    def __iter__(self):
        """Start a new epoch: rewind the cursor and reshuffle the file list."""
        self.i = 0
        shuffle(self.files)
        return self

    def __next__(self):
        """Read the block ranges of the next batch into the slot buffers.

        Returns:
            ``(batch, labels)``: a list of uint8 CuPy views (one per slot,
            trimmed to the block range that was read) and a list of one-element
            int32 torch tensors.

        Raises:
            StopIteration: When the epoch is exhausted.
            ValueError: If a block range is inverted or does not fit in 32 bits.
            IOError: If a block range exceeds the slot buffer or the read fails.
        """
        if self.i >= self.n:
            # Re-arm for the next epoch before signalling the end of this one.
            self.__iter__()
            raise StopIteration

        labels = []
        lengths = []
        slbas = np.zeros([self.batch_size], np.uint32)
        elbas = np.zeros([self.batch_size], np.uint32)

        for slot in range(self.batch_size):
            # The modulo pads the final batch with samples from the start.
            file = self.files[self.i % self.n]
            slba, elba = file["startblock"], file["endblock"]
            if not 0 <= slba <= elba <= 0xFFFFFFFF:
                # The address arrays are uint32; larger values cannot be stored.
                raise ValueError(f"Invalid block range {slba}-{elba}")
            # Computed with Python ints so the byte count cannot wrap in uint32.
            length = (elba - slba + 1) * self.nbytes
            if length > self.buf_size:
                # Reading would write past the end of the slot buffer.
                raise IOError(f"Block range {slba}-{elba} needs {length} bytes, buffer holds {self.buf_size}")
            labels.append(
                torch.tensor([file["label"]], dtype=torch.int32)
            )
            slbas[slot] = slba
            elbas[slot] = elba
            lengths.append(length)

            self.i += 1

        # First two arguments of xnvme_kernels_range_submit; presumably the
        # kernel launch dimensions — TODO confirm against the xNVMe backend.
        y = 64
        x = 2048
        err = xnvme.xnvme_kernels_range_submit(x, y, self.dev, xnvme.XNVME_SPEC_NVM_OPC_READ, slbas.ctypes.data_as(POINTER(c_uint32)), elbas.ctypes.data_as(POINTER(c_uint32)), self.nlb, self.nbytes, self.buffers_ref, self.batch_size)
        if err:
            raise IOError(f"Err: {err}")

        batch = [self.views[slot][:lengths[slot]] for slot in range(self.batch_size)]
        return (batch, labels)

    def __len__(self):
        """Number of files (samples), not the number of batches."""
        return self.n

    def __del__(self):
        """Free the slot buffers that were successfully allocated."""
        buffers = getattr(self, "buffers", None)
        if buffers is None:
            return
        dev = getattr(self, "dev", None)
        for ptr in buffers:
            if ptr:  # zero entries were never allocated
                xnvme.xnvme_buf_free(dev, int(ptr))

    next = __next__
    
@pipeline_def
def xnvme_bam_pipe(dev, bmap):
    """Pipeline that reads images through ``XNVMEBAMInputIterator`` and decodes them.

    Args:
        dev: Open xNVMe device handle forwarded to the iterator.
        bmap: Path of the JSON block map forwarded to the iterator.

    Returns:
        The ``(images, labels)`` outputs of the pipeline.
    """
    # The iterator must produce batches of the pipeline's own batch size.
    source = XNVMEBAMInputIterator(
        dev=dev,
        bmap=bmap,
        batch_size=Pipeline.current().max_batch_size,
    )
    encoded, labels = fn.external_source(
        source=source,
        num_outputs=2,
        dtype=[types.UINT8, types.INT32],
    )
    decoded = fn.decoders.image(
        encoded,
        device="mixed",
        output_type=types.RGB,
        device_memory_padding=211025920,
        host_memory_padding=140544512,
    )
    return decoded, labels

In [None]:
# The reference using DALI fn.readers.file
# Builds the pipeline, runs it once and plots the decoded batch.
with dali_pipe(datadir=DATADIR, batch_size=BATCH_SIZE, num_threads=1, device_id=0) as pipe:
    show_pipeline_output(pipe)

In [None]:
# Replacing fn.readers.file with an external source (FileInputIterator)
# Builds the pipeline, runs it once and plots the decoded batch.
with file_pipe(datadir=DATADIR, batch_size=BATCH_SIZE, num_threads=2, device_id=0) as pipe:
    show_pipeline_output(pipe)

In [None]:
# Using the xNVMe file API as the external source (XNVMEFileInputIterator)
# Builds the pipeline, runs it once and plots the decoded batch.
with xnvme_file_pipe(datadir=DATADIR, batch_size=BATCH_SIZE, num_threads=1, device_id=0) as pipe:
    show_pipeline_output(pipe)

In [None]:
# Using xNVMe with GDS-like backend (CPU buffers)
opts = xnvme.xnvme_opts()
xnvme.xnvme_opts_set_defaults(byref(opts))
opts.be = char_pointer_cast("gds")
opts.mem = char_pointer_cast("cpu")
dev = xnvme.xnvme_dev_open(char_pointer_cast("/dev/libnvm0"), byref(opts))
if not dev:
    raise RuntimeError("xNVMe failed to open device")
pipe = None
try:
    xnvme.xnvme_dev_derive_geo(dev)
    with xnvme_gds_cpu_pipe(dev=dev, bmap=BMAP, batch_size=BATCH_SIZE, num_threads=1, device_id=0) as pipe:
        show_pipeline_output(pipe)
finally:
    # The iterator owned by the pipeline frees its xNVMe buffers and queue in
    # __del__, which needs a live device handle. Drop the pipeline and collect
    # it before closing the device, and close the device even if the run failed.
    pipe = None
    gc.collect()
    xnvme.xnvme_dev_close(dev)

In [None]:
# Using xNVMe with GDS-like backend (GPU buffers)
opts = xnvme.xnvme_opts()
xnvme.xnvme_opts_set_defaults(byref(opts))
opts.be = char_pointer_cast("gds")
opts.mem = char_pointer_cast("gpu")
dev = xnvme.xnvme_dev_open(char_pointer_cast("/dev/libnvm0"), byref(opts))
if not dev:
    raise RuntimeError("xNVMe failed to open device")
pipe = None
try:
    xnvme.xnvme_dev_derive_geo(dev)
    with xnvme_gds_gpu_pipe(dev=dev, bmap=BMAP, batch_size=BATCH_SIZE, num_threads=1, device_id=0) as pipe:
        show_pipeline_output(pipe)
finally:
    # The iterator owned by the pipeline frees its xNVMe buffers and queue in
    # __del__, which needs a live device handle. Drop the pipeline and collect
    # it before closing the device, and close the device even if the run failed.
    pipe = None
    gc.collect()
    xnvme.xnvme_dev_close(dev)

In [None]:
# Using xNVMe with BaM-like backend
opts = xnvme.xnvme_opts()
xnvme.xnvme_opts_set_defaults(byref(opts))
opts.be = char_pointer_cast("bam")
dev = xnvme.xnvme_dev_open(char_pointer_cast("/dev/libnvm0"), byref(opts))
if not dev:
    raise RuntimeError("xNVMe failed to open device")
pipe = None
try:
    xnvme.xnvme_dev_derive_geo(dev)
    with xnvme_bam_pipe(dev=dev, bmap=BMAP, batch_size=BATCH_SIZE, num_threads=1, device_id=0) as pipe:
        show_pipeline_output(pipe)
finally:
    # The iterator owned by the pipeline frees its xNVMe buffers in __del__,
    # which needs a live device handle. Drop the pipeline and collect it before
    # closing the device, and close the device even if the run failed.
    pipe = None
    gc.collect()
    xnvme.xnvme_dev_close(dev)