In [1]:
import os, sys, math, io
import numpy as np
import pandas as pd
import multiprocessing as mp
import bson
import struct
from PIL import Image
import time
import shutil

# %matplotlib inline
import matplotlib.pyplot as plt
from multiprocessing import Pool
import time

In [2]:
import torch
import torch.nn as nn
from torch.nn import init
from torch.autograd import Variable
import torchvision
import torchvision.transforms as T
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.utils.data import sampler
from torch.utils.data import Dataset

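The standard PyTorch `DataLoader` cannot be used with multiple workers here. Every sample is read by random access (`seek` + `read`) on one shared handle to the BSON file on disk. When several workers do this concurrently, a new read request can arrive before the previous read has finished, and the result is read/decoding errors. To still benefit from multiprocessing, this notebook's custom `Dataloader` does not put indices into a queue for workers that each read the BSON file simultaneously. Instead, the main process reads and decodes the BSON records sequentially to obtain each image's raw (still encoded) bytes. A pool of worker processes then decodes those bytes into images and transforms them into torch Tensors.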

In [3]:
class BSONIterator(Dataset):
    """Map-style dataset returning the raw encoded bytes of one image per index.

    Each row of ``images_df`` describes one image through its ``product_id``,
    ``img_idx`` and ``category_idx`` columns. The product's BSON document is
    located via ``offsets_df`` (indexed by product id, with ``offset`` and
    ``length`` columns), read from ``bson_file`` and decoded.

    Args:
        bson_file: binary file object opened on the BSON dump. The same handle
            is seeked/read on every access, so it must not be used concurrently.
        images_df: one row per image.
        offsets_df: byte offset and length of each product's BSON document.
        transform: stored only; no transform is applied by this class.
        train: stored flag; not used by this class.
    """

    def __init__(self, bson_file, images_df, offsets_df, transform, train=True):
        super(BSONIterator, self).__init__()
        self.file, self.images_df, self.offsets_df = bson_file, images_df, offsets_df
        self.transform, self.train = transform, train

    def __len__(self):
        return len(self.images_df)

    def __getitem__(self, idx):
        """Return ``(picture_bytes, category_idx)`` for the image at row ``idx``."""
        row = self.images_df.iloc[idx]
        location = self.offsets_df.loc[row["product_id"]]
        # Jump straight to this product's document instead of scanning the file.
        self.file.seek(location["offset"])
        document = bson.BSON.decode(self.file.read(location["length"]))
        picture = document["imgs"][row["img_idx"]]["picture"]
        return picture, row["category_idx"]

In [4]:
class TensorGenerator():
    """Picklable callable turning encoded image bytes into a transformed tensor.

    Instances are passed to ``Pool.map`` by the data loader, so ``__call__``
    runs inside the worker processes.

    Args:
        transformer: callable applied to the decoded PIL image, e.g. a
            ``torchvision.transforms.Compose`` pipeline.
    """

    def __init__(self, transformer):
        self.transformer = transformer

    def __call__(self, data):
        """Decode ``data`` (encoded image bytes) and return ``transformer(image)``.

        The image is converted to RGB first so that every sample has three
        channels. A grayscale/CMYK/palette image would otherwise produce a
        tensor whose channel count does not match the 3-channel normalisation
        statistics or the rest of the batch, making ``torch.stack`` fail.
        """
        # Context manager closes the PIL image; convert() loads the pixels first.
        with Image.open(io.BytesIO(data)) as img:
            rgb = img.convert("RGB")
        return self.transformer(rgb)
        

In [5]:
class BatchSampler(object):
    """Wraps another sampler to yield a mini-batch of indices.

    Args:
        sampler (Sampler): Base sampler; any iterable of indices that also
            supports ``len()``.
        batch_size (int): Size of mini-batch. Must be a positive integer.
        drop_last (bool): If ``True``, the sampler will drop the last batch if
            its size would be less than ``batch_size``

    Raises:
        ValueError: If ``batch_size`` is not a positive integer.

    Example:
        >>> list(BatchSampler(range(10), batch_size=3, drop_last=False))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
        >>> list(BatchSampler(range(10), batch_size=3, drop_last=True))
        [[0, 1, 2], [3, 4, 5], [6, 7, 8]]
    """

    def __init__(self, sampler, batch_size, drop_last):
        # A size that len(batch) can never equal (0, negative, fractional) would
        # make __iter__ silently emit one giant batch and __len__ divide by
        # zero or return a meaningless count.
        if batch_size != int(batch_size) or batch_size <= 0:
            raise ValueError(
                "batch_size should be a positive integer value, "
                "but got batch_size={}".format(batch_size))
        self.sampler = sampler
        self.batch_size = batch_size
        self.drop_last = drop_last

    def __iter__(self):
        """Yield lists of up to ``batch_size`` indices, in sampler order."""
        batch = []
        for idx in self.sampler:
            batch.append(idx)
            if len(batch) == self.batch_size:
                yield batch
                batch = []
        # Trailing partial batch, unless the caller asked to drop it.
        if batch and not self.drop_last:
            yield batch

    def __len__(self):
        """Number of batches ``__iter__`` yields (floor or ceil of len(sampler) / batch_size)."""
        if self.drop_last:
            return len(self.sampler) // self.batch_size
        return (len(self.sampler) + self.batch_size - 1) // self.batch_size


In [6]:
class Dataloader(object):
    """Minimal batch loader: sequential dataset reads, pooled per-sample conversion.

    Unlike ``torch.utils.data.DataLoader``, indices are never handed to worker
    processes. Each batch is read from ``dataset`` in the calling process, and
    only the per-sample ``generator`` calls are distributed over ``pool``.

    Args:
        batch_size (int): samples per batch.
        dataset: indexable; ``dataset[idx]`` must return ``(raw_sample, label)``.
        generator: picklable callable mapping a raw sample to a tensor.
        sampler: iterable of dataset indices supporting ``len()``
            (e.g. ``torch.utils.data.sampler.RandomSampler``).
        pool: object with a ``map(func, iterable)`` method, e.g. a
            ``multiprocessing.Pool``. It is not closed by the loader.
        train (bool): if ``True`` batches are ``(images, labels)``, otherwise
            ``images`` only.
        drop_last (bool): drop the final incomplete batch.
    """

    def __init__(self, batch_size, dataset, generator, sampler, pool, train=True, drop_last=False):
        self.batch_size = batch_size
        self.dataset = dataset
        self.generator = generator
        self.train = train
        self.sampler = sampler
        self.batch_sampler = BatchSampler(sampler, batch_size, drop_last)
        self.drop_last = drop_last
        self.pool = pool

    def __iter__(self):
        """Start a new pass over the dataset."""
        return DataloaderIter(self)

    def __len__(self):
        """Number of batches per pass.

        This matches ``len(iter(loader))`` and the convention of
        ``torch.utils.data.DataLoader``. It previously returned the number of
        samples.
        """
        return len(self.batch_sampler)
    
    
class DataloaderIter(object):
    """Single pass over a ``Dataloader``'s dataset, in the order of its batch sampler.

    Each ``next()`` reads one batch of ``(raw_sample, label)`` pairs from the
    dataset in the calling process. It then maps ``generator`` over the raw
    samples via ``pool`` and stacks the results along a new first dimension.
    """

    def __init__(self, loader):
        # Snapshot the loader's components; the index iterator is created once per pass.
        self.dataset, self.generator = loader.dataset, loader.generator
        self.pool, self.train = loader.pool, loader.train
        self.batch_sampler = loader.batch_sampler
        self.sample_iter = iter(self.batch_sampler)

    def __iter__(self):
        return self

    def __len__(self):
        """Total number of batches in the pass (not the number remaining)."""
        return len(self.batch_sampler)

    def __next__(self):
        """Return the next batch: ``(images, labels)`` when training, else ``images``.

        Raises:
            StopIteration: once the batch sampler is exhausted.
        """
        batch_indices = next(self.sample_iter)
        encoded, targets = [], []
        # Dataset reads stay sequential in this process (in this notebook the
        # dataset seeks/reads a single shared file handle).
        for position in batch_indices:
            raw_sample, target = self.dataset[position]
            encoded.append(raw_sample)
            targets.append(target)
        # Only the generator calls are distributed over the pool.
        images = torch.stack(self.pool.map(self.generator, encoded), dim=0)
        if not self.train:
            return images
        return images, torch.from_numpy(np.array(targets))

In [7]:
# Index tables read from the working directory:
# - train_offsets_df: per-product byte offset/length into the BSON file.
# - train/val_images_df: one row per image for each split.
train_offsets_df = pd.read_csv("train_offsets.csv", index_col=0)
train_images_df = pd.read_csv("train_images_withlevel.csv", index_col=0)
val_images_df = pd.read_csv("val_images_withlevel.csv", index_col=0)

data_dir = "./input/"
train_bson_path = os.path.join(data_dir, "train_example.bson")
# Left open on purpose: BSONIterator seeks/reads this handle for every sample.
# Close it once the loaders built on it are no longer needed.
train_bson_file = open(train_bson_path, "rb")

In [8]:
# Per-channel normalisation constants (the standard ImageNet RGB mean / std).
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Training adds a random horizontal flip for augmentation; validation only
# converts to a tensor and normalises.
transformer_train = T.Compose([
    T.RandomHorizontalFlip(),
    T.ToTensor(),
    T.Normalize(mean=mean, std=std),
])
transformer_val = T.Compose([
    T.ToTensor(),
    T.Normalize(mean=mean, std=std),
])

In [9]:
# The dataset only yields raw (still encoded) image bytes plus labels; the
# augmentation pipeline is applied later by TensorGenerator in the worker pool.
dataset = BSONIterator(
    train_bson_file,
    train_images_df,
    train_offsets_df,
    transformer_train,
)
train_gen = TensorGenerator(transformer=transformer_train)

In [10]:
# Number of worker processes that decode/transform the images of each batch.
Ncore = 4
pool = Pool(Ncore)

In [11]:
# Visit the training images in random order.
train_sampler = sampler.RandomSampler(dataset)
loader_train = Dataloader(
    batch_size=96,
    dataset=dataset,
    generator=train_gen,
    sampler=train_sampler,
    pool=pool,
    train=True,
    drop_last=False,
)

In [12]:
# One iterator = one pass over the randomly ordered training samples.
itr = iter(loader_train)

In [13]:
%time bx, by = next(itr)
# Shut down the worker pool: close() stops accepting new tasks and join() waits
# for the workers to exit (join must be called after close). loader_train uses
# this same pool, so it cannot produce further batches after this cell.
pool.close()
pool.join()

CPU times: user 44 ms, sys: 0 ns, total: 44 ms
Wall time: 123 ms


In [14]:
def load_test(Ncore, n_batches=50, batch_size=96):
    """Benchmark the custom Dataloader with a pool of ``Ncore`` worker processes.

    Builds a fresh pool and loader over the notebook-level ``dataset`` and
    ``train_gen``, then fetches ``n_batches`` batches. It prints the tensor
    shapes of the last batch and the mean wall-clock seconds per batch. Each
    timed step creates a new iterator, so the timing includes starting a new
    pass of the sampler.

    Args:
        Ncore (int): number of worker processes.
        n_batches (int): number of batches to time; must be >= 1.
        batch_size (int): samples per batch.

    Raises:
        ValueError: if ``n_batches`` < 1 (there would be no batch to report).
    """
    if n_batches < 1:
        raise ValueError("n_batches must be >= 1, got {}".format(n_batches))
    pool = Pool(processes=Ncore)
    try:
        loader_train = Dataloader(batch_size=batch_size, dataset=dataset, generator=train_gen,
                                  sampler=sampler.RandomSampler(dataset),
                                  pool=pool, train=True, drop_last=False)
        # perf_counter is monotonic and high-resolution, unlike time.time().
        start = time.perf_counter()
        for _ in range(n_batches):
            bx, by = next(iter(loader_train))
        elapsed = time.perf_counter() - start
    finally:
        # Always release the worker processes, even if loading fails.
        pool.close()
        pool.join()
    print(bx.size(), by.size())
    print(elapsed / n_batches)

In [15]:
load_test(Ncore=1)

torch.Size([96, 3, 180, 180]) torch.Size([96])
0.12584909915924072


In [16]:
load_test(Ncore=2)

torch.Size([96, 3, 180, 180]) torch.Size([96])
0.08236462593078614


In [17]:
load_test(Ncore=3)

torch.Size([96, 3, 180, 180]) torch.Size([96])
0.06693346500396728


In [18]:
load_test(Ncore=4)

torch.Size([96, 3, 180, 180]) torch.Size([96])
0.06852519989013672
