Basic test for Nvidia DALI

# Import

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import numpy as np
from PIL import Image
from pathlib import Path
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils, datasets
import torch.optim as optim
from IPython.core.debugger import set_trace
import pandas as pd
import lmdb
import pickle
import h5py
from random import shuffle
from nvidia.dali.pipeline import Pipeline
import nvidia.dali.ops as ops
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch import DALIGenericIterator

In [3]:
import warnings
warnings.filterwarnings('ignore')

# Config

In [4]:
path_data = Path('data')

In [5]:
!ls {path_data}

cifar-10-batches-py  medium_imgs  small_hdf5  small_lmdb
medium_hdf5	     medium_lmdb  small_imgs


# Networks

In [6]:
class FastNet(nn.Module):
    # Just do a single convolution followed by a linear layer
    # Made to be simple to emphasize affect of image loading
    # and augmentation
    def __init__(self, num_cl):
        super(FastNet, self).__init__()
        self.conv = nn.Conv2d(3, 64, 3, stride=2)
        self.fc = nn.Linear(64, num_cl)
        
    def forward(self, x):
        x = self.conv(x)
        x = F.adaptive_avg_pool2d(x, 1)
        x = torch.flatten(x, 1)
        x = self.fc(x)        
        return x

# Pipelines

In [7]:
# Image folder no augmentation pipeline
class ImageNaPipeline(Pipeline):
    def __init__(self, path_imgs, num_batch, num_threads, device_id):
        super(ImageNaPipeline, self).__init__(num_batch, num_threads, device_id)
        # Use FileReader to read images and get labels from class folder  
        self.input = ops.FileReader(file_root=path_imgs, 
                                    random_shuffle=True)
        
        # Attempt to decode on gpu
        self.decode = ops.ImageDecoder(device='mixed', 
                                       output_type=types.RGB)
        
        # This has the important action of transposing channel from last to 2nd dimension
        # Note that normalization is done WRT raw uint8 values (so they are [0, 255])
        self.cmnp = ops.CropMirrorNormalize(device='gpu',
                                            output_dtype=types.FLOAT,
                                            output_layout=types.NCHW,
                                            image_type=types.RGB,
                                            mean=[0.5 * 255,0.5 * 255,0.5 * 255],
                                            std= [0.5 * 255,0.5 * 255,0.5 * 255])

    def define_graph(self):
        imgs, labels = self.input() # Automagically gets labels from class folder
        imgs = self.decode(imgs)    # Images are decoded on gpu (if jpg)
        imgs = self.cmnp(imgs)      # This also converts from NHWC -> NCHW
        return (imgs, labels)

In [8]:
# Image folder with augmentation pipeline
class ImageAugPipeline(Pipeline):
    def __init__(self, path_imgs, num_batch, num_threads, device_id):
        super(ImageAugPipeline, self).__init__(num_batch, num_threads, device_id)
        # Use FileReader to read images and get labels from class folder  
        self.input = ops.FileReader(file_root=path_imgs, 
                                    random_shuffle=True)
        
        # Attempt to decode on gpu
        self.decode = ops.ImageDecoder(device='mixed', 
                                       output_type=types.RGB)
        
        
        # Random rotation
        self.rng = ops.Uniform(range=(-30.0, 30.0))
        self.rotate = ops.Rotate(device='gpu',
                                 fill_value=0.0,
                                 keep_size=True)
        
        # This has the important action of transposing channel from last to 2nd dimension
        # Note that normalization is done WRT raw uint8 values (so they are [0, 255])
        self.cmnp = ops.CropMirrorNormalize(device='gpu',
                                            output_dtype=types.FLOAT,
                                            output_layout=types.NCHW,
                                            image_type=types.RGB,
                                            mean=[0.5 * 255,0.5 * 255,0.5 * 255],
                                            std= [0.5 * 255,0.5 * 255,0.5 * 255])

    def define_graph(self):
        imgs, labels = self.input()          # Automagically gets labels from class folder
        imgs = self.decode(imgs)             # Images are decoded on gpu (if jpg)
        imgs = self.rotate(imgs, 
                           angle=self.rng()) # Rotation on gpu
        imgs = self.cmnp(imgs)               # This also converts from NHWC -> NCHW
        return (imgs, labels)

In [9]:
class ExternalInputIterator(object):
    def __init__(self, path_root, num_batch, subsamples=None):
        self.path_root = path_root
        self.num_batch = num_batch
        self.subsamples = subsamples
        self.samples = self._get_samples()
        
    def _get_samples(self):
        samples = [(p, int(p.parent.stem)) for p in self.path_root.glob('*/*')]
        
        if self.subsamples is not None:
            samples = [samples[i] for i in np.random.choice(len(samples), self.subsamples, replace=False)]
            
        return samples
    
    def __iter__(self):
        self.idx = 0
        shuffle(self.samples)
        return self

    def __next__(self):
        if self.idx >= len(self.samples):
            raise StopIteration
        
        batch = []
        labels = []
        for _ in range(self.num_batch):
            file_img, label = self.samples[self.idx]
            with open(file_img, 'rb') as f:
                batch.append(np.frombuffer(f.read(), dtype=np.uint8))
            labels.append(np.array([label], dtype=np.uint8))
            self.idx = (self.idx + 1)%len(self.samples)
        return (batch, labels)
    
    next = __next__
    
    
class ExternalSourcePipeline(Pipeline):
    def __init__(self, external_data, num_batch, num_threads, device_id):
        super(ExternalSourcePipeline, self).__init__(num_batch,
                                                     num_threads,
                                                     device_id,
                                                     seed=12)
        self.external_data = external_data
        self.es_img = ops.ExternalSource()
        self.es_label = ops.ExternalSource()
        
        self.decode = ops.ImageDecoder(device='mixed',
                                       output_type=types.RGB)
        
        # This has the important action of transposing channel from last to 2nd dimension
        # Note that normalization is done WRT raw uint8 values (so they are [0, 255])
        self.cmnp = ops.CropMirrorNormalize(device='gpu',
                                            output_dtype=types.FLOAT,
                                            output_layout=types.NCHW,
                                            image_type=types.RGB,
                                            mean=[0.5 * 255,0.5 * 255,0.5 * 255],
                                            std= [0.5 * 255,0.5 * 255,0.5 * 255])
        
        # Initialize iterator
        self.iterator = iter(self.external_data)
        
    def define_graph(self):
        self.imgs = self.es_img()
        self.labels = self.es_label()
        imgs = self.decode(self.imgs)
        imgs = self.cmnp(imgs)
        return (imgs, self.labels)

    def iter_setup(self):        
        try:
            (imgs, labels) = self.iterator.next()
            self.feed_input(self.imgs, imgs)
            self.feed_input(self.labels, labels)
        except StopIteration:
            self.iterator = iter(self.external_data)
            raise StopIteration

# Losses

In [10]:
loss = nn.CrossEntropyLoss()

# Train

In [11]:
def train(di, model, loss, opt, num_epochs):
    for epoch in range(num_epochs):
        for i, data in enumerate(di):
            X = data[0]["data"]
            y = data[0]["label"].squeeze().cuda().long()
                        
            opt.zero_grad()    # Zero gradients
            y_hat = model(X)   # Forward pass
            l = loss(y_hat, y) # Loss
            l.backward()       # Compute gradients
            opt.step()         # Step
            
        # print statistics
        print(f'Epoch: {epoch}; Loss: {l.item()}')
        
        # Reset iterator
        di.reset()

# Test Images

### Small size

In [12]:
path_imgs=path_data/'small_imgs'/'png'
num_batch=256
num_threads=12
device_id=0 
num_samples=40000

In [13]:
pipe = ImageNaPipeline(path_imgs=path_imgs,
                       num_batch=num_batch,
                       num_threads=num_threads,
                       device_id=device_id)

pipe.build()
di = DALIGenericIterator(pipe, ['data', 'label'], num_samples)

model = FastNet(10).cuda()
opt = optim.SGD(model.parameters(), lr=0.001)
%time train(di, model, loss, opt, 1)

Epoch: 0; Loss: 2.325206756591797
CPU times: user 2.95 s, sys: 859 ms, total: 3.81 s
Wall time: 765 ms


Very fast; add dali-style augmentation

In [14]:
pipe = ImageAugPipeline(path_imgs=path_imgs,
                        num_batch=num_batch,
                        num_threads=num_threads,
                        device_id=device_id)

pipe.build()
di = DALIGenericIterator(pipe, ['data', 'label'], num_samples)

model = FastNet(10).cuda()
opt = optim.SGD(model.parameters(), lr=0.001)
%time train(di, model, loss, opt, 1)

Epoch: 0; Loss: 2.3065807819366455
CPU times: user 2.74 s, sys: 761 ms, total: 3.5 s
Wall time: 691 ms


Try external source pipeline

In [15]:
pipe = ExternalSourcePipeline(external_data=ExternalInputIterator(path_imgs, num_batch, num_samples),
                              num_batch=num_batch, 
                              num_threads=num_threads, 
                              device_id=device_id)

pipe.build()
di = DALIGenericIterator(pipe, 
                         ['data', 'label'], 
                         num_samples,
                         fill_last_batch=False,
                         last_batch_padded=True)

model = FastNet(10).cuda()
opt = optim.SGD(model.parameters(), lr=0.001)
%time train(di, model, loss, opt, 1)

Epoch: 0; Loss: 2.279061794281006
CPU times: user 2.73 s, sys: 630 ms, total: 3.36 s
Wall time: 1.06 s


### Medium size

In [16]:
path_imgs=path_data/'medium_imgs'/'png'
num_batch=64
num_threads=12
device_id=0 
num_samples=4000

In [17]:
pipe = ImageNaPipeline(path_imgs=path_imgs,
                       num_batch=num_batch,
                       num_threads=num_threads,
                       device_id=device_id)

pipe.build()
di = DALIGenericIterator(pipe, ['data', 'label'], num_samples)

model = FastNet(10).cuda()
opt = optim.SGD(model.parameters(), lr=0.001)
%time train(di, model, loss, opt, 1)

Epoch: 0; Loss: 2.311649799346924
CPU times: user 4.17 s, sys: 305 ms, total: 4.48 s
Wall time: 895 ms


Incredibly fast

In [18]:
pipe = ImageAugPipeline(path_imgs=path_imgs,
                        num_batch=num_batch,
                        num_threads=num_threads,
                        device_id=device_id)

pipe.build()
di = DALIGenericIterator(pipe, ['data', 'label'], num_samples)

model = FastNet(10).cuda()
opt = optim.SGD(model.parameters(), lr=0.001)
%time train(di, model, loss, opt, 1)

Epoch: 0; Loss: 2.3008930683135986
CPU times: user 4.13 s, sys: 296 ms, total: 4.43 s
Wall time: 861 ms


In [19]:
pipe = ExternalSourcePipeline(external_data=ExternalInputIterator(path_imgs, num_batch, num_samples),
                              num_batch=num_batch, 
                              num_threads=num_threads, 
                              device_id=device_id)

pipe.build()
di = DALIGenericIterator(pipe, ['data', 'label'], num_samples)

model = FastNet(10).cuda()
opt = optim.SGD(model.parameters(), lr=0.001)
%time train(di, model, loss, opt, 1)

Epoch: 0; Loss: 2.2975008487701416
CPU times: user 6.86 s, sys: 861 ms, total: 7.72 s
Wall time: 2.68 s


# Test