In [1]:
# define logging and working directory
from ProjectRoot import change_wd_to_project_root
change_wd_to_project_root()
from src.utils.Notebook_imports import *
import numpy as np

search for root_dir and set working directory
Working directory set to: /mnt/ssd/git/wft21_septum_landmark_detection


# Test the main concepts of a DL generator

A Generator yields a tuple of (x,y) and is used to feed data into a deep learning model

Each tuple has the following shape: 

$(batchsize \times inputObjects \times inputShape)$ , $(batchsize \times ioutputObjects \times outputShape)$

--> $inputShape$/$outputShape$ could be:
- for 1D vector-data: $width$
- for 2D images: $height \times width$: 
- for 3D volumes: $depth \times height \times width$


# Create fake data interactively

Please define:

- $examples$ - The total number of patients (the size of the dataset)
- $inputObjects$ / $ouputObjects$ - This could be the timesteps of 4D CMR files or the number of 3D volumes/ different modalities (multi-input model)
- $inputShape$ / $outputShape$ - could be 3/2 or 1D data, e.g.>: 𝑑𝑒𝑝𝑡ℎ∗ℎ𝑒𝑖𝑔ℎ𝑡∗𝑤𝑖𝑑𝑡ℎ.
- $batchsize$ -  Which is the number of entities yielded in one step

Usually the generator would save only the references to the corresponding files.
In this example we create numpy arrays with the desired shape on the fly.

In [2]:
# Define the size of our fake data
upper_example_limit = 200
upper_example_size_limit = 8
upper_batchsize_limit = 20

@interact_manual
def create_fake_data(examples_=(1,upper_example_limit), 
                     inputObjects=(1,upper_example_size_limit),
                     outputObjects=(1,upper_example_size_limit),
                     batchsize_=(1,upper_batchsize_limit), 
                     input_obj_shape='(10,10,10)', 
                     ouptut_obj_shape='(10,10,10)'):
    
    global examples, input_objects, output_objects , batchsize, indexes, X, Y, x_dict, y_dict, batches
    examples = examples_
    batchsize = batchsize_
    input_objects = inputObjects
    output_objects = outputObjects
    
    # make sure the dimensions have the correct formating
    # converts a string of int-tuple into a tuple of int '(10,10,10)' --> (10,10,10)
    x_dim = tuple(map(int, input_obj_shape.replace(')', '').replace('(','').split(',')))
    y_dim = tuple(map(int, ouptut_obj_shape.replace(')', '').replace('(','').split(',')))
    
    # create some fake data
    x_dict = {}
    y_dict = {}  
    for example in range(examples_):
        # create example data (batchsize x input_objects x input_object_shape and batchsize output_objects x output_object_shape)
        # squeeze unused dimensions
        x_dict[example] = np.squeeze(np.stack([np.round(np.random.sample(x_dim),2)+example for i in range(input_objects)]))
        y_dict[example] = np.squeeze(np.stack([np.round(np.random.sample(y_dim),2)+(10*example) for i in range(output_objects)]))
        # testing purpose if lists are faster than dicts
        #X.append(np.stack([np.round(np.random.sample(x_dim),2)+example for i in range(input_objects)]))
    # index our data, we can use the indicies to select one example or a batch of examples from a list or dictionary
    # By this we dont need to shuffle the data itself, we shuffle only the indexes
    indexes = list(range(len(x_dict)))
    batches = int(np.floor(examples/batchsize))-1
    print('Shape of one batch X: {} * {}, Y: {} * {}'.format(batchsize, x_dict[0].shape, batchsize, y_dict[0].shape))

interactive(children=(IntSlider(value=100, description='examples_', max=200, min=1), IntSlider(value=4, descri…

# Select one batch and yield the corresponding values/shape $(batchsize \times inputObjects \times inputObjShape),(batchsize \times outputObjects \times outputObjShape)$

In [3]:
@interact
def select_batch(selected_batch = (0,batches), shuffle_indexes=False, debug=False):
    global indexes
    
    if shuffle_indexes:
        random.shuffle(indexes)
    # make sure indexes are correctly initialised
    assert len(indexes) == examples, print('len indexes: {}, number of examples: {}'.format(len(indexes), examples))
    
    # define the lower/upper index slicing borders of the current batch
    start_idx = selected_batch*batchsize
    end_idx = (selected_batch+1)*batchsize
    
    # we slice the indexes of the current batch from the index list
    batch_indexes = indexes[start_idx: end_idx]
    
    # print the restrictions of the current batch
    print('selected batch: {} of {} with a batchsize of {} and total {} examples'.format(selected_batch, batches, batchsize, examples))
    print('start idx: {}, end idx: {}'.format(start_idx, end_idx))
    print('Indexes of the currrent batch: {}'.format(batch_indexes))
    print('-'*40)
    
    # stack the entities of the current batch
    batch_x = np.stack([x_dict[k] for k in batch_indexes])
    batch_y = np.stack([y_dict[k] for k in batch_indexes])
    if debug:
        [print('index: {}: value: {}'.format(k, x_dict[k])) for k in batch_indexes]
        [print('index: {}: value: {}'.format(k, y_dict[k])) for k in batch_indexes]
    
    return([batch_x.shape, batch_y.shape])

interactive(children=(IntSlider(value=4, description='selected_batch', max=9), Checkbox(value=False, descripti…

# Simple random generator, subclassed from tensorflow.keras.utils.Sequence

In [4]:
import tensorflow, random
from time import time
import concurrent.futures
from concurrent.futures import as_completed
import logging
from src.utils.Utils_io import Console_and_file_logger

class BaseGenerator(tensorflow.keras.utils.Sequence):
    """
    Base generator class
    """

    def __init__(self, x=None, y=None, config={}):
        """
        Creates a base datagenerator for a list of nrrd images and a list of nrrd masks
        :param x: list of nrrd image file names
        :param y: list of nrrd mask file names
        :param config:
        """
        # Define standard parameters
        # ###################################################################
        logging.info('Create BaseDataGenerator')
        assert len(x) == len(y)

        self.EXAMPLES = len(x)
        self.INPUTS = config.get('INPUTS', 1)
        self.OUTPUTS = config.get('OUTPUTS', 1)
        self.X_DIM = str(config.get('X_DIM', (256, 256)))
        self.Y_DIM = str(config.get('Y_DIM', (256, 256)))
        self.BATCHSIZE = config.get('BATCHSIZE', 32)
        self.SHUFFLE = config.get('SHUFFLE', True)
        
        # create one worker per image & mask (batchsize) for parallel pre-processing if nothing else is defined
        self.MAX_WORKERS = config.get('MAX_WORKERS', self.BATCHSIZE)
        self.MAX_WORKERS = min(32, self.MAX_WORKERS)

        # Make sure the dimensions have the correct formating
        # converts a string of int-tuple into a tuple of int '(10,10,10)' --> (10,10,10), or 10 --> (10,)
        self.X_DIM = tuple(map(int, self.X_DIM.replace(')', '').replace('(', '').split(',')))
        self.Y_DIM = tuple(map(int, self.Y_DIM.replace(')', '').replace('(', '').split(',')))

        # Create some static fake data
        # #######################################################################
        self.x_dict = {}
        self.y_dict = {}
        for example in range(self.EXAMPLES):
            # create example data (batchsize x input_objects x input_object_shape and batchsize output_objects x output_object_shape)
            # squeeze unused dimensions
            self.x_dict[example] = np.squeeze(
                np.stack([np.round(np.random.sample(self.X_DIM), 2) + example for i in range(self.INPUTS)]))
            self.y_dict[example] = np.squeeze(
                np.stack([np.round(np.random.sample(self.Y_DIM), 2) + (10 * example) for i in range(self.OUTPUTS)]))
        # #######################################################################
        # index our data, we can use the indicies to select one example or a batch of examples from a list or dictionary
        # By this we dont need to shuffle the data itself, we shuffle only the indexes
        
        # We use these indicies to access and shuffle the data
        # #######################################################################
        self.INDICES = list(range(len(self.x_dict)))

        print('Shape of one batch X: {} * {}, Y: {} * {}'.format(self.BATCHSIZE, self.x_dict[0].shape, self.BATCHSIZE,
                                                                 self.y_dict[0].shape))
        
        self.X_SHAPE = np.empty((self.BATCHSIZE, *self.x_dict[0].shape), dtype=np.float32)
        self.Y_SHAPE = np.empty((self.BATCHSIZE, *self.y_dict[0].shape), dtype=np.float32)

    def __len__(self):

        """
        Denotes the number of batches per epoch
        :return: number of batches
        """
        return int(np.floor(len(self.INDICES) / self.BATCHSIZE))

    def __getitem__(self, index):

        """
        Generate the indexes for one batch of data
        This method allows to access the gen by simple indices
        gen = BaseGenerator(...)
        x,y = gen[0]
        :param index: int in the range of  {0: len(dataset)/Batchsize}
        :return: pre-processed batch as x,y tuples
        """

        t0 = time()
        # collect n indices with n = Batchsize
        # starting from the given index parameter
        # which is in the range of  {0: len(dataset)/Batchsize}
        idxs = self.INDICES[index * self.BATCHSIZE: (index + 1) * self.BATCHSIZE]

        return self.__data_generation__(idxs)

    def on_epoch_end(self):

        """
        Shuffle the indexes after each epoch
        :return: None
        """

        if self.SHUFFLE:
            np.random.shuffle(self.INDICES)

    def __data_generation__(self, ids):

        """
        Preprocess one batch, represented by the list of ids
        Could pre-process each entity in parallel
        returns the preprocessed batch

        :param list_IDs_temp:
        :return: X : (batchsize, *dim, n_channels), Y : (batchsize, *dim, number_of_classes)
        """

        # Initialization

        x = np.empty_like(self.X_SHAPE)
        y = np.empty_like(self.Y_SHAPE)
        logging.info('preprocess one batch with: {}, {}'.format(x.shape, y.shape))

        futures = set()

        # spawn one thread per worker
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.MAX_WORKERS) as executor:

            t0 = time()
            # Generate data
            for i, ID in enumerate(ids):

                try:
                    # keep ordering of the shuffled indexes
                    futures.add(executor.submit(self.__preprocess_one_image__, i, ID))

                except Exception as e:
                    logging.error(
                        'Exception {} in datagenerator with: image: {} or mask: {}'.format(str(e), self.x_dict[ID],
                                                                                           self.y_dict[ID]))
        # This is out of the threadPool executor context
        for i, future in enumerate(as_completed(futures)):
            # use the index i to place each processed example in the batch
            # otherwise slower images will always be at the end of the batch
            # Use the ID for exception handling as reference to the file name
            try:
                x_, y_, i, ID, needed_time = future.result()
                x[i,], y[i,] = x_, y_
                logging.info('img finished after {:0.3f} sec.'.format(needed_time))
            except Exception as e:
                logging.error(
                    'Exception {} in datagenerator with: image: {} or mask: {}'.format(str(e), self.x_dict[ID],
                                                                                       self.y_dict[ID]))

        logging.debug('Batchsize: {} preprocessing took: {:0.3f} sec'.format(self.BATCHSIZE, time() - t0))

        return np.array(x.astype(np.float32)), np.array(y.astype(np.float32))

    def __preprocess_one_image__(self, i, ID):
        t0 = time()
        import time as t
        #t.sleep(1) # testing purpose
        # in this function we would load and preprocess the file self.x_dict[ID] and self.y_dict[ID]
        return self.x_dict[ID], self.y_dict[ID], i, ID, time() - t0

# Play here with the generator/config params

The generator should follow the convention over configuration paradigm and provide a standard value for each possible parameter

In [5]:
cfg = {}
cfg['X_DIM'] = 5,5
cfg['Y_DIM'] = 5,5
cfg['BATCHSIZE'] = 5
cfg['MAX_WORKERS'] = 5
files = 10
gen = BaseGenerator(x=[1]*files,y=[1]*files, config=cfg)

Shape of one batch X: 5 * (5, 5), Y: 5 * (5, 5)


In [7]:
@interact
def get_batches_from_generator(i=(0,len(gen)), epoch_end=False):
    global gen
    if epoch_end:gen.on_epoch_end()
    x,y = gen[i]
    print('x-shape: {}, y-shape: {}'.format(x.shape, y.shape))
    print('mean x: {}, mean y: {}'.format(x.mean(), y.mean()))
    return x,y

interactive(children=(IntSlider(value=1, description='i', max=2), Checkbox(value=False, description='epoch_end…

# Performance of different indexing methods

In [55]:
%%timeit -r 1
# process all files of the generator
_ = [(x,y) for x,y in gen]

2.01 s ± 1.09 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)


# Subclassing new generators

In [9]:
class DataGenerator(BaseGenerator):
    """
    Yields (X, Y) / image,mask for 2D and 3D U-net training
    could be used to yield (X, None)
    """
        
    def __preprocess_one_image__(self, i, ID):
        delta = 0.1
        
        # Add here any fancy new 
        
        return self.x_dict[ID]*delta, self.y_dict[ID]*delta, i, ID, time() - t0

In [10]:
gen = DataGenerator(x=[1]*files,y=[1]*files, config=cfg)

Shape of one batch X: 5 * (5, 5), Y: 5 * (5, 5)


In [12]:
gen[7]

(array([[[2.78580580e+33, 3.07556987e-41, 0.00000000e+00, 0.00000000e+00,
                     nan],
         [4.58743078e-41, 1.63729959e-19, 1.14468136e+24, 2.56410238e+29,
          6.13169348e+28],
         [1.94209412e+31, 7.33822581e+34, 6.86081970e+22, 1.27715149e+01,
          3.45812815e+12],
         [1.15299647e+27, 6.65319884e-33, 1.14468136e+24, 5.08488755e+31,
          4.96401942e+28],
         [3.04812811e+32, 1.89427093e+23, 2.01115550e-19, 1.94316151e-19,
          7.68133284e+31]],
 
        [[7.22507385e+28, 2.28396210e+02, 2.00258371e-19, 6.74221921e+22,
          1.75892947e+22],
         [6.86081970e+22, 1.04630877e+27, 1.89808543e+28, 1.76668287e+22,
          9.49574173e+12],
         [2.82191761e+26, 4.61141998e+24, 6.26087042e+22, 4.74281365e+30,
          4.85607420e+33],
         [4.54177956e+30, 4.96401895e+28, 3.24870624e+33, 4.29568637e+24,
          1.13616057e+30],
         [7.15473767e+22, 1.80373090e+28, 1.93680168e+31, 1.95185260e-19,
          7.00

In [None]:
def __init__(self, **kwargs):
        super(self.__class__, self).__init__(**kwargs)
        self.name = 'myfirstgenerator'

In [4]:
from random import randint
# We have a list --> X and a dictionary --> x_dict, 
# len(X) == len(x_dict)
# and X[i] == x_dict[i] for all i in range(len(X))
# We create n random indicies within the range of len(X)
samples = [randint(0, examples-1) for _ in range(10000000)]

In [5]:
%%timeit
# access the list n times, append the value to a new list
temp = None
for i in samples:
    temp = X[i]
    

248 ms ± 2.87 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
%%timeit
# access the dict n times, append the value to a new list
temp = None
for i in samples:
    temp = x_dict[i]

398 ms ± 2.14 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
