In [1]:
# Widen the notebook container to 80% of the browser window.
# `display`/`HTML` are imported from the public `IPython.display` module; importing
# `display` from `IPython.core.display` is deprecated.
from IPython.display import display, HTML
display(HTML('<style>.container {width:80% !important;}</style>'))

In [None]:
# Standard library
import os
import pickle
import sys
import threading
from queue import Empty, Full, Queue
from threading import Thread

# Numerics / utilities
import numpy as np
import pandas as pd
from sklearn.utils import shuffle
from tqdm import tqdm

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from torch.utils.data import Dataset
import torchvision
from torchvision import transforms, utils
import torchvision.transforms as transforms
import torchvision.models as models

# augmentation library and fast image decoding
import accimage
import imgaug.augmenters as iaa
from imgaug.augmentables import Keypoint, KeypointsOnImage

## Problem statement
Для того, чтобы было ясно, что происходит в этом коде, нужно сформулировать задачу, для которой запускаются генераторы данных, описываемые ниже.

Задача — обучение автокодировщика. Это означает, что целевая переменная совпадает с признаковым описанием объектов, то есть нейросеть должна выучить тождественное преобразование.

Поэтому генератору данных достаточно выдавать один массив (батч массивов): порождать совпадающий с ним массив целевой переменной нет необходимости.

In [None]:
num_epochs = 100
use_gpu = True
batch_size = 64
# Root directory of the images. Set the IMAGES_BASE_DIR environment variable to
# point the notebook at another location without editing it.
images_base_dir = os.environ.get('IMAGES_BASE_DIR', '/app/vg/')

In [None]:
class thread_killer:
    """Mutable flag used to ask a worker thread to stop.

    A worker that runs an endless loop polls the flag by calling the instance;
    the controlling code flips it with ``set_tokill(True)``. Once the worker
    observes the flag it leaves its loop, its target function returns, and the
    thread terminates.
    """

    def __init__(self):
        # Workers start in the "keep running" state.
        self.set_tokill(False)

    def set_tokill(self, tokill):
        """Store ``tokill`` as the new flag value."""
        self.to_kill = tokill

    def __call__(self):
        """Return the current flag value."""
        return self.to_kill

In [None]:
def threaded_batches_feeder(tokill, batches_queue, dataset_generator, poll_interval=0.5):
    """
    Threaded worker for pre-processing input data.

    Iterates ``dataset_generator`` (starting over whenever it is exhausted) and puts
    every batch, converted to a float32 torch tensor, into ``batches_queue``.
    Returns as soon as ``tokill()`` is true.

    tokill (thread_killer): callable returning True when the thread should terminate
    batches_queue (Queue): a limited size thread-safe Queue instance for train/validation data batches
    dataset_generator (Dataset): training/validation data generator (iterable of array-like batches)
    poll_interval (float): seconds to wait on a full queue before re-checking ``tokill``
    """
    while not tokill():
        for sample_batch in dataset_generator:
            # as_tensor avoids a copy when the batch already is a float32 ndarray.
            batch = torch.as_tensor(sample_batch, dtype=torch.float32)
            # A plain blocking put() never re-checks the kill flag while the queue is
            # full, which would leave this thread hanging forever at shutdown.
            while True:
                if tokill():
                    return
                try:
                    batches_queue.put(batch, block=True, timeout=poll_interval)
                    break
                except Full:
                    continue
            if tokill():
                return

In [None]:
def threaded_cuda_batches(tokill, cuda_batches_queue, batches_queue, target_device=None, poll_interval=0.5):
    """
    Thread worker for transferring pytorch tensors to the training device.

    tokill (thread_killer): callable returning True when the thread should terminate
    cuda_batches_queue (Queue): receives the tensors after they were moved to the device
    batches_queue (Queue): source of CPU torch tensors (e.g. filled by threaded_batches_feeder)
    target_device (torch.device, optional): destination device; when omitted, the
        notebook-level ``device`` variable is used
    poll_interval (float): seconds to wait on an empty/full queue before re-checking ``tokill``
    """
    # The conditional only evaluates the global `device` when no device is passed.
    dev = device if target_device is None else target_device
    while not tokill():
        # Blocking forever on get() would ignore the kill flag once producers stop.
        try:
            sample_batch = batches_queue.get(block=True, timeout=poll_interval)
        except Empty:
            continue
        # autograd.Variable is a no-op wrapper since PyTorch 0.4; move the tensor directly.
        sample_batch = sample_batch.to(dev)
        while True:
            if tokill():
                return
            try:
                cuda_batches_queue.put(sample_batch, block=True, timeout=poll_interval)
                break
            except Full:
                continue

In [None]:
class Threadsafe_iter:
    """
    Wrap an iterator/generator so several threads can pull from it.

    Every ``next`` call on the wrapper is serialized through one lock, so the
    underlying iterator is never advanced by two threads at the same time.
    """
    def __init__(self, it):
        self.lock = threading.Lock()
        self.it = it

    def __iter__(self):
        return self

    def __next__(self):
        self.lock.acquire()
        try:
            return next(self.it)
        finally:
            # Released on StopIteration and on any other exception as well.
            self.lock.release()

def get_objects_i(objects_count):
    """Cyclic generator of object indices: 0, 1, ..., objects_count - 1, 0, 1, ...

    Raises:
        ValueError: on the first ``next`` if ``objects_count`` is not positive
            (a cycle over zero objects would divide by zero, and a negative
            count would produce negative indices).
    """
    if objects_count <= 0:
        raise ValueError("objects_count must be positive, got {}".format(objects_count))
    while True:
        yield from range(objects_count)

In [None]:
class LandmarksDataset(Dataset):
    """Endless stream of batches of augmented, cropped and resized multi-image samples.

    Every non-empty dict in the annotation pickle becomes one annotation frame
    (one object). Row 0, columns ``0 .. IMAGES_PER_OBJECT - 1`` of a frame hold
    the image file names; each row (part) holds crop coordinates in the columns
    after them. The images are augmented, concatenated along the channel axis,
    cropped per part and resized to ``OUTPUT_SIZE``.

    Iterating yields float32 arrays of shape ``(batch_size, C, *OUTPUT_SIZE)``
    with values in [0, 1], where ``C`` is the total channel count of the
    concatenated images (18 for six 3-channel images). Several threads may
    iterate the same instance: their generators share the object index stream
    and the pending-crop buffer, guarded by locks.
    """

    # Number of images per object; they occupy the first columns of every frame.
    IMAGES_PER_OBJECT = 6
    # (height, width) every crop is resized to.
    OUTPUT_SIZE = (280, 280)

    def __init__(self, pkl_file, root_dir, transform=None, batch_size=8, augment=True):
        """
        Args:
            pkl_file (string): Path to the pkl file with annotations.
                NOTE: unpickling can execute arbitrary code; only load trusted files.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Stored as ``self.transform``; it is not
                applied anywhere in this class.
            batch_size (int, optional): number of crops per yielded batch.
            augment (bool, optional): apply the random imgaug pipeline to every image;
                otherwise an identity augmenter is used.
        """
        with open(pkl_file, 'rb') as f:
            marked_data_loaded = pickle.load(f)
        self.landmarks_frame = []
        # Number of parts (dict entries) over all objects, used as the dataset length.
        self.total_len = 0
        for dicts in marked_data_loaded:
            dict_len = len(dicts)
            self.total_len += dict_len
            if dict_len > 0:
                # Transposed so that the dict's keys become rows (iterated as parts below).
                self.landmarks_frame.append(pd.DataFrame(dicts).T)
        self.root_dir = root_dir
        self.transform = transform

        self.batch_size = batch_size

        # Shared by every generator created by __iter__, so all workers walk one index stream.
        self.objects_id_generator = Threadsafe_iter(get_objects_i(len(self.landmarks_frame)))

        self.lock = threading.Lock()        # guards epoch (re)initialisation
        self.yield_lock = threading.Lock()  # guards the shared pending-crop buffer self.imgs
        self.init_count = 0
        self.augment = augment
        # Decoded HxWxC uint8 images keyed by path.
        # NOTE(review): unbounded - every decoded image stays in memory.
        self.cache = {}

        if self.augment:
            # instantiate augmentations
            self.seq = iaa.Sequential([iaa.Fliplr(0.5),
                                       iaa.Flipud(0.5),
                                       iaa.GaussianBlur(sigma=(0, 5)),
                                       iaa.Cutout(nb_iterations=(1, 3), size=0.3, squared=False, cval=0),
                                       iaa.CoarseDropout((0.0, 0.05), size_percent=(0.02, 0.25)),
                                       iaa.Affine(rotate=(-180, 180),
                                                  translate_percent={'x': (-0.05, 0.05), 'y': (-0.05, 0.05)},
                                                  shear={'x': (-10, 10), 'y': (-10, 10)}),
                                       iaa.ElasticTransformation(alpha=(0, 5.0), sigma=0.25)],
                                      random_order=True)
        else:
            self.seq = iaa.Identity()

    def __len__(self):
        return self.total_len

    def shuffle(self):
        # `shuffle` here is the module-level sklearn.utils.shuffle, not this method.
        self.landmarks_frame = shuffle(self.landmarks_frame)

    def _load_image(self, img_name):
        """Return the image at ``img_name`` as an HxWxC uint8 array, decoding it once."""
        image_np = self.cache.get(img_name)
        if image_np is None:
            img = accimage.Image(img_name)
            chw = np.empty([img.channels, img.height, img.width], dtype=np.uint8)  # CxHxW
            img.copyto(chw)
            # imgaug and the channel-wise concatenation in __iter__ expect HxWxC.
            image_np = np.ascontiguousarray(chw.transpose(1, 2, 0))
            self.cache[img_name] = image_np
        return image_np

    def _resize(self, image_chw):
        """Resize a CxHxW uint8 crop to ``(C, *OUTPUT_SIZE)`` float32 with values in [0, 1].

        The [0, 1] scaling mirrors what skimage.transform.resize does for uint8
        input (presumably what the undefined `transform.resize` referred to).
        Unlike skimage, no anti-aliasing prefilter is applied when downsampling.
        """
        tensor = torch.from_numpy(np.ascontiguousarray(image_chw)).float().div_(255.0)
        resized = F.interpolate(tensor.unsqueeze(0), size=self.OUTPUT_SIZE,
                                mode='bilinear', align_corners=False)
        return resized.squeeze(0).numpy()

    def __iter__(self):
        """Yield float32 batches of ``batch_size`` crops forever (see class docstring)."""
        while True:
            with self.lock:
                if self.init_count == 0:
                    self.shuffle()
                    self.imgs = []
                    self.init_count = 1

            for obj_id in self.objects_id_generator:
                # Read the frame once so images and crop boxes come from the same object.
                frame = self.landmarks_frame[obj_id]
                image_set = []
                for i in range(self.IMAGES_PER_OBJECT):
                    img_name = os.path.join(self.root_dir, frame.iloc[0, i])
                    # imgaug takes its data as a named argument; a copy keeps the cached
                    # original safe from any in-place augmenter.
                    image_set.append(self.seq(image=self._load_image(img_name).copy()))
                # Requires all images of an object to share height and width.
                image_set = np.concatenate(image_set, axis=-1)

                for part_id in range(frame.shape[0]):
                    landmarks = frame.iloc[part_id, self.IMAGES_PER_OBJECT:]
                    landmarks = landmarks.to_numpy(int).reshape(-1, 4)
                    # NOTE(review): only the first box (row 0) is used.
                    x_start, x_stop, y_start, y_stop = landmarks[0]
                    crop = image_set[y_start:y_stop, x_start:x_stop, :].transpose((2, 0, 1))
                    image = self._resize(crop)

                    # Build the batch under the lock but yield it after releasing it:
                    # a generator suspended at `yield` while holding the lock would block
                    # every other worker until its consumer asked for the next batch.
                    batch = None
                    with self.yield_lock:
                        self.imgs.append(image)
                        if len(self.imgs) >= self.batch_size:
                            batch = np.float32(self.imgs)
                            self.imgs = []
                    if batch is not None:
                        yield batch

            # At the end of an epoch we re-init data-structures.
            # NOTE(review): unreachable while objects_id_generator is endless.
            with self.lock:
                self.shuffle()
                self.init_count = 0

In [None]:
class ToTensor:
    """Callable that turns an ndarray sample into a torch tensor.

    ``torch.from_numpy`` is used, so the tensor shares memory with the array.
    """
    def __call__(self, sample):
        return torch.from_numpy(sample)

In [None]:
# Build the dataset that streams batches of augmented crops.
dataset = LandmarksDataset(
    pkl_file='./marked_full_data.pkl',
    root_dir=images_base_dir,
    batch_size=batch_size,
    transform=ToTensor(),
)

### Sanity check
The dataset should generate batches of data.

In [None]:
# Pull a single batch from a fresh generator and check its basic layout.
batch = next(iter(dataset))
assert batch.ndim == 4 and batch.shape[0] == batch_size, batch.shape
assert batch.dtype == np.float32, batch.dtype
batch.shape

In [None]:
# Device used for the network optimization: the first CUDA GPU when it is
# requested and available, the CPU otherwise.
if use_gpu and torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
# Batches needed to see every sample once: ceil(len(dataset) / batch_size).
# An exact multiple must not add an extra step, and an empty dataset gives 0 steps
# (instead of one step that would wait forever for a batch).
STEPS_PER_EPOCH = (len(dataset) + batch_size - 1) // batch_size
STEPS_PER_EPOCH

In [None]:
# set to training mode once a model exists, e.g. `model.train()`

# Here we instantiate queues and mutexes, and launch the threads that will preprocess the data and send it into GPU
batches_queue_length = min(STEPS_PER_EPOCH, 24)

train_batches_queue = Queue(maxsize=batches_queue_length)
train_cuda_batches_queue = Queue(maxsize=10)
train_thread_killer = thread_killer()
train_thread_killer.set_tokill(False)
preprocess_workers = 32

# daemon=True: a worker stuck in a blocking queue call must not keep the
# interpreter/kernel alive after this cell is done.
preprocess_threads = []
for _ in range(preprocess_workers):
    thr = Thread(target=threaded_batches_feeder,
                 args=(train_thread_killer, train_batches_queue, dataset),
                 daemon=True)
    thr.start()
    preprocess_threads.append(thr)

train_cuda_transfers_thread_killer = thread_killer()
train_cuda_transfers_thread_killer.set_tokill(False)
train_cudathread = Thread(target=threaded_cuda_batches,
                          args=(train_cuda_transfers_thread_killer, train_cuda_batches_queue, train_batches_queue),
                          daemon=True)
train_cudathread.start()
# Everything is ready for the training

print('Training ...')
try:
    for epoch in range(num_epochs):
        num_batches = 0
        for _ in tqdm(range(STEPS_PER_EPOCH)):
            x = train_cuda_batches_queue.get(block=True)
            x = x.float()

            # Here one usually implements training steps
            # apply model
            # y = model(x)
            # loss = ...

            # backpropagation
            # optimizer.zero_grad()
            # loss.backward()

            # one step of the optimizer (using the gradients from backpropagation)
            # optimizer.step()

            num_batches += 1

        # scheduler.step()
finally:
    # Always signal the workers, also when training is interrupted or raises.
    train_thread_killer.set_tokill(True)
    train_cuda_transfers_thread_killer.set_tokill(True)
    # Workers blocked on a full queue only see the kill flag once their put()
    # returns, so free some slots. The queues are drained independently so that
    # an empty CPU queue does not prevent unblocking the GPU-transfer thread.
    for _ in range(preprocess_workers):
        try:
            train_batches_queue.get(block=True, timeout=1)
        except Empty:
            pass
        try:
            train_cuda_batches_queue.get_nowait()
        except Empty:
            pass