# Overview of Client

The pipeline for loading data for machine learning training is a critical process that requires thoughtful and deliberate planning. Key considerations that need to be accounted for include:

* keeping track of all data and/or label files for each individual training case
* consistent stratified sampling between training and validation splits
* consistent stratified sampling between different data cohorts (e.g. from different hospitals or sources)
* randomization of data loading order between epochs
* any "real time" data preprocessing (e.g. normalization)

Other advanced functionality may include:

* in-memory loading of all data before training starts (if dataset is small)
* asynchronous loading of data (if dataset is large)

In this tutorial, we will walk through the basics of creating a `client` for loading data, including much of the key functionality described above. For each individual project, you must create your **own individualized** client for loading data. To help you get started, a fully functional template `client` is available in this repository at `/dl_core/clients/client.py`. Example usage of this module is shown at the end of this tutorial.

## Key Concepts

All terms in **bold** will reference specific concepts that are reused throughout this tutorial. Please review these terms before proceeding further.

Machine learning algorithms require data to be **split** into *training* and *validation* sets. All training paradigms *require* this baseline division of data. For the purposes of this tutorial, a **split** represents the current usage of any particular example as *training* or *validation* data. Note that in the setting of cross-validation, all data will be used as *both* training and validation cases at different points during algorithm development.

For certain problems, it is necessary to further subdivide data into **cohorts**. For the purposes of this tutorial, a **cohort** represents *any arbitrary* division of data into user-defined subgroups. Why are **cohorts** important? Stratified sampling (i.e., selecting data at fixed rates from specific cohorts) often improves training dynamics for heterogeneous datasets, including those commonly seen in medical problems. For example, given the low prevalence of most disease states, it is often beneficial to load data at an even 50/50 distribution between positive and negative examples (i.e., oversample from the positive category).

Given the above, two different sampling **rates** are defined:

* **training_rate**: represents the rate of randomly selecting training / validation cases
* **sampling_rate**: represents the rate at which each individual **cohort** is sampled from

# Creating a Data Summary

Given the need to carefully subselect splits or cohorts of data, it is often valuable to extract key characteristics of the data as an independent step, *before* loading any data for training. In this tutorial, the implementation will include the following steps:

* find all data and/or label files for each individual training case
* extract summary information about each training case
* aggregate all summarized data
* determine training / validation splits
* serialize summary as a Pickle file

## Finding Data

This portion of the code will be the most variable, depending on the directory hierarchy of the data in your project. In general, the goal will be to create a list of dictionaries, one per training case, where each key-value pair maps a key of your choosing to a single full file path. For example:

```
d = {
    'dat_0': '/full/path/to/dat/0',
    'lbl_0': '/full/path/to/lbl/0',
    'lbl_1': '/full/path/to/lbl/1'}  # ... more keys as needed
```

Note that the keys you choose can be arbitrary, as long as you remember what is what!

In our example, the data is currently organized as follows:

```
/hdfs/[ID-...0]/dat.hdf5
/hdfs/[ID-...0]/bet.hdf5
/hdfs/[ID-...1]/dat.hdf5
/hdfs/[ID-...1]/bet.hdf5
...
```

Assuming that your data is organized in a similar way, here is a simple method to generate such a list of dictionaries:

In [None]:
import os, glob

def find_data(query):
    """
    Method to find all matching files for each case and return a list of dictionaries

    :params

      (dict) query : {

        'root': '/path/to/root/dir',
        [key_00]: [query_00],
        [key_01]: [query_01], ...

      }
      
    """
    assert 'root' in query
    assert len(query) > 1

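    # --- Work on a copy so that the caller's query dict is not modified
    query = dict(query)
    root = query.pop('root')
    keys = sorted(query.keys())

    # --- Sort matches so that case order is reproducible
    q = query.pop(keys[0])
    matches = sorted(glob.glob('%s/**/%s' % (root, q), recursive=True))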

    DATA = []
    
    for n, m in enumerate(matches):

        print('FINDING DATA (%07i/%07i): %s' % (n + 1, len(matches), m))

        d = {keys[0]: m}
        b = os.path.dirname(m)

        # --- Find other matches
        for key in keys[1:]:
            ms = glob.glob('%s/%s' % (b, query[key]))

            if len(ms) == 1: 
                d[key] = ms[0]
                
        if len(d) == len(keys):
            DATA.append(d)
    
    return DATA

The following code demonstrates example usage:

In [None]:
# --- Set query
query = {
    'root': '../../data/hdfs',
    'dat': 'dat.hdf5',
    'bet': 'bet.hdf5'}

# --- Find data
data = find_data(query)
print(data)
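To see how the matching behaves without the real dataset, here is a minimal, self-contained sketch. It uses a hypothetical, simplified `find_cases()` helper (not part of `dl_core`) with the same matching logic as `find_data()`, run against a throwaway directory of empty placeholder files. Note that a case missing any requested file is silently dropped.

```python
import glob
import os
import tempfile


def find_cases(root, primary, others):
    """
    Hypothetical, simplified variant of find_data()

    primary : (key, filename) pair searched for recursively, e.g. ('dat', 'dat.hdf5')
    others  : {key: filename} pairs expected in the same folder, e.g. {'bet': 'bet.hdf5'}

    """
    key0, pattern0 = primary
    cases = []

    for m in sorted(glob.glob(os.path.join(root, '**', pattern0), recursive=True)):
        case = {key0: m}
        folder = os.path.dirname(m)

        # --- Look for each companion file next to the primary match
        for key, pattern in others.items():
            ms = glob.glob(os.path.join(folder, pattern))
            if len(ms) == 1:
                case[key] = ms[0]

        # --- Keep the case only if every requested file was found
        if len(case) == 1 + len(others):
            cases.append(case)

    return cases


# --- Throwaway tree: case_a has both files, case_b is missing bet.hdf5
root = tempfile.mkdtemp()
for name, files in {'case_a': ['dat.hdf5', 'bet.hdf5'], 'case_b': ['dat.hdf5']}.items():
    os.makedirs(os.path.join(root, name))
    for f in files:
        open(os.path.join(root, name, f), 'w').close()

cases = find_cases(root, ('dat', 'dat.hdf5'), {'bet': 'bet.hdf5'})
print(len(cases))  # 1 (case_b is dropped)
```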

## Extracting Slice Location

Next, we need to identify information about **each slice** of data that will be used for algorithm training. To do so, we will first create a system to reference each individual slice of data using an **index** and a **coord** variable:

* **index**: an integer in `[0, n - 1]` identifying which of the n volumes (exams) in the dataset the slice belongs to
* **coord**: a *normalized* coordinate `[0, 1]` that represents the z-position of the slice

After all the data has been loaded and summarized, we should have two vectors, `index` and `coord`, *equal in size* to the total number of slices of all data. For example, if we had ten volumes, each with 10 slices, then:

```
index = [0, 0, 0 ..., 1, 1, 1 ..., 9, 9, 9, ... 9, 9, 9]
coord = [0, 1, 2 ..., 0, 1, 2 ..., 0, 1, 2, ... 7, 8, 9] / 9
```

Assuming `data` contains a 4D NumPy volume with slices along the first axis, the following snippet of pseudocode will accomplish this:

```
index = []
coord = []

for INDEX in range(len(EXAMS)):

    [... load data ...]
    
    index.append(np.ones(data.shape[0], dtype='int') * INDEX)
    coord.append(np.arange(data.shape[0]) / (data.shape[0] - 1))
```
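Below is a runnable version of the pseudocode above, using three small synthetic volumes (hypothetical stand-ins for loaded exams, slices along the first axis) in place of real data. One deliberate deviation: dividing by `max(n - 1, 1)` avoids a division by zero for single-slice volumes, which would otherwise produce `nan` coordinates.

```python
import numpy as np

# --- Synthetic stand-ins for three loaded 4D volumes: (slices, rows, cols, channels)
volumes = [np.zeros((3, 8, 8, 1)), np.zeros((2, 8, 8, 1)), np.zeros((4, 8, 8, 1))]

index = []
coord = []

for INDEX, vol in enumerate(volumes):
    n = vol.shape[0]
    # --- Which volume each slice belongs to
    index.append(np.full(n, INDEX, dtype='int'))
    # --- Normalized z-position in [0, 1]; guard n == 1 to avoid dividing by zero
    coord.append(np.arange(n) / max(n - 1, 1))

# --- Flatten to one entry per slice across the whole dataset
index = np.concatenate(index)
coord = np.concatenate(coord)

print(index.tolist())  # [0, 0, 0, 1, 1, 2, 2, 2, 2]
print(coord)
```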

See below for a rough implementation of the above concepts. Keep in mind we need an additional variable, `LABELED`, which references the *key* in `query` from which to load data and use for calculations (in our case, `bet`). To load `*.hdf5` files we will use the `dl_core.io.hdf5` library. See the dedicated notebook for more information.

Both `index` and `coord` vectors are embedded in a return dictionary `META`. For all subsequent code, we will use this convention and simply continue adding more vectors of information to the same dictionary variable.

In [None]:
import os, glob
import numpy as np

import sys
PATH = '../../'
if PATH not in sys.path:
    sys.path.append(PATH)
from dl_core.io import hdf5

In [None]:
def make_summary(query, LABELED):
    """
    Method to read all data and make summary dictionary 

    :params

      (dict) query : {

        'root': '/path/to/root/dir',
        [key_00]: [query_00],
        [key_01]: [query_01], ...

      }

      (str) LABELED : key in query whose file is used to compute per-slice information

    """
    assert 'root' in query
    assert len(query) > 1
    assert LABELED in query

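    # --- Work on a copy so that the caller's query dict is not modified
    query = dict(query)
    root = query.pop('root')
    keys = sorted(query.keys())

    # --- Sort matches so that case order (and thus index) is reproducible
    q = query.pop(keys[0])
    matches = sorted(glob.glob('%s/**/%s' % (root, q), recursive=True))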

    DATA = []
    META = {}
    META['index'] = []
    META['coord'] = []

    for n, m in enumerate(matches):

        d = {keys[0]: m}
        b = os.path.dirname(m)

        # --- Find other matches
        for key in keys[1:]:
            ms = glob.glob('%s/%s' % (b, query[key]))

            if len(ms) == 1: 
                d[key] = ms[0]

        # --- Calculate summary meta information from LABELED
        if len(d) == len(keys):

            data, _ = hdf5.load(d[LABELED])

            # --- Aggregate information
            META['index'].append(np.ones(data.shape[0], dtype='int') * len(DATA))
            META['coord'].append(np.arange(data.shape[0]) / (data.shape[0] - 1))
            DATA.append(d)

    # --- Concatenate all vectors
    META = {k: np.concatenate(v) for k, v in META.items()}
    
    return DATA, META

The following code demonstrates example usage:

In [None]:
# --- Set query
query = {
    'root': '../../data/hdfs',
    'dat': 'dat.hdf5',
    'bet': 'bet.hdf5'}

# --- Find data
data, meta = make_summary(query, LABELED='bet')
print(data)
print(meta)

## Extracting Slice Data

In addition to `index` and `coord`, we may want to extract additional information that will be useful when stratifying data loading in the future. Often, this information will be based on the label values present at each slice, as this allows us to strategically load data at varying rates based on the presence or absence of a disease entity.

In our example, the labels contain brain masks (1 == background, 2 == brain). Hypothetically, we may choose to load data such that 50% of slices contain background, and 50% of slices contain brain. Thus, we will now create one additional boolean vector per label value from `0` to `CLASSES` (three in our example: `0`, `1` and `2`), each equal in size to the total number of slices of all data (same as `index` and `coord`) and indicating whether each slice contains at least one voxel of that value. These vectors will be added to the `META` return variable.

The following snippet of pseudocode will accomplish this for an arbitrary number of `CLASSES`:

```
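META = {c: [] for c in range(CLASSES + 1)}
for INDEX in range(len(EXAMS)):

    [... load data ...]

    for c in range(CLASSES + 1):
        # --- True for each slice that contains at least one voxel of class c
        s = np.sum(data == c, axis=(1, 2, 3)) > 0
        META[c].append(s)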
```
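A runnable version of the pseudocode above on a toy label volume (three 2 x 2 slices with a trailing channel axis; the values are made up for illustration). It uses `np.any`, which for a boolean comparison is equivalent to `np.sum(...) > 0` and reads more directly.

```python
import numpy as np

CLASSES = 2

# --- Toy label volume (slices, rows, cols, channels): 1 = background, 2 = brain
labels = np.array([
    [[1, 1], [1, 1]],   # slice 0: background only
    [[1, 2], [1, 1]],   # slice 1: background and brain
    [[2, 2], [2, 2]],   # slice 2: brain only
])[..., np.newaxis]

META = {c: [] for c in range(CLASSES + 1)}

for c in range(CLASSES + 1):
    # --- True for each slice that contains at least one voxel of class c
    META[c].append(np.any(labels == c, axis=(1, 2, 3)))

META = {c: np.concatenate(v) for c, v in META.items()}

print(META[1].tolist())  # [True, True, False]
print(META[2].tolist())  # [False, True, True]
```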

**IMPORTANT NOTE**: In your specific project, you may want to stratify data loading in different ways (e.g. from different hospitals, etc). Whichever method you choose to divide up your cohorts, you **must** extract the relevant information at this step. In subsequent steps, we will use this information in some way to formally divide up cohorts.

See below for a rough implementation of the above concepts. 

In [None]:
import os, glob
import numpy as np

import sys
PATH = '../../'
if PATH not in sys.path:
    sys.path.append(PATH)
from dl_core.io import hdf5

In [None]:
def make_summary(query, LABELED, CLASSES=2):
    """
    Method to read all data and make summary dictionary 

    :params

      (dict) query : {

        'root': '/path/to/root/dir',
        [key_00]: [query_00],
        [key_01]: [query_01], ...

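      }

      (str) LABELED : key in query whose file is used to compute per-slice information
      (int) CLASSES : highest label value; one boolean vector is made per value 0..CLASSES

    """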
    assert 'root' in query
    assert len(query) > 1
    assert LABELED in query

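    # --- Work on a copy so that the caller's query dict is not modified
    query = dict(query)
    root = query.pop('root')
    keys = sorted(query.keys())

    # --- Sort matches so that case order (and thus index) is reproducible
    q = query.pop(keys[0])
    matches = sorted(glob.glob('%s/**/%s' % (root, q), recursive=True))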

    DATA = []
    META = {c: [] for c in range(CLASSES + 1)}
    META['index'] = []
    META['coord'] = []

    for n, m in enumerate(matches):

        d = {keys[0]: m}
        b = os.path.dirname(m)

        # --- Find other matches
        for key in keys[1:]:
            ms = glob.glob('%s/%s' % (b, query[key]))

            if len(ms) == 1: 
                d[key] = ms[0]

        # --- Calculate summary meta information from LABELED
        if len(d) == len(keys):

            # --- Aggregate slice-by-slice label information
            data, _ = hdf5.load(d[LABELED])

            for c in range(CLASSES + 1):
                s = np.sum(data == c, axis=(1, 2, 3)) > 0
                META[c].append(s)

            # --- Aggregate information
            META['index'].append(np.ones(data.shape[0], dtype='int') * len(DATA))
            META['coord'].append(np.arange(data.shape[0]) / (data.shape[0] - 1))
            DATA.append(d)

    # --- Concatenate all vectors
    META = {k: np.concatenate(v) for k, v in META.items()}
    
    return DATA, META

The following code demonstrates example usage:

In [None]:
# --- Set query
query = {
    'root': '../../data/hdfs',
    'dat': 'dat.hdf5',
    'bet': 'bet.hdf5'}

# --- Find data
data, meta = make_summary(query, LABELED='bet', CLASSES=2)
print(data)
print(meta)

## Creating Train / Valid Splits

Before we finish creating the required summary information, we must divide the data into training / validation splits. The easiest strategy here is to randomly assign each case an integer in `[0, n - 1]`, where `n` represents the total number of splits (folds) desired. During training, all cases that match a given **fold** will be set to validation data, while the remaining cases will be set to training data.

As a simple example, to create an 80%-20% training/validation split, we set *n = 5*, i.e., all cases are randomly assigned a number from `[0, 4]`. In the first round of training, all cases assigned to `0` will be used for validation (~20%) while the remaining cases (~80%) will be used for training. From this, cross-validation is easy to implement: simply change the validation **fold** from 0 to 4, which in turn updates the training/validation cases at each step.

The following snippet of pseudocode will accomplish this for `N` cases and an arbitrary number of folds, `N_FOLDS`:

```
valid = np.arange(N) % N_FOLDS
valid = valid[np.random.permutation(valid.size)]
```
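A runnable sketch of the fold assignment, assuming 12 cases, `N_FOLDS = 5`, and made-up slice counts per case. It uses NumPy's `default_rng` with a fixed seed (a swap for the global `np.random` calls, so the split is reproducible), then expands case-level folds to slice level with `np.repeat`, which is what `META['valid']` holds. Assigning folds per case rather than per slice keeps all slices of one exam in the same split, so near-identical neighboring slices cannot leak between training and validation.

```python
import numpy as np

N, N_FOLDS = 12, 5
rng = np.random.default_rng(seed=0)  # fixed seed -> reproducible split

# --- Case-level folds: near-equal counts per fold, then shuffled
valid = rng.permutation(np.arange(N) % N_FOLDS)

# --- Hypothetical slice counts per case; every slice inherits its case's fold
slices_per_case = rng.integers(low=20, high=40, size=N)
valid_per_slice = np.repeat(valid, slices_per_case)

print(np.bincount(valid, minlength=N_FOLDS).tolist())  # [3, 3, 2, 2, 2]
```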

See below for a rough implementation of the above concepts. 

In [None]:
import os, glob
import numpy as np

import sys
PATH = '../../'
if PATH not in sys.path:
    sys.path.append(PATH)
from dl_core.io import hdf5

In [None]:
def make_summary(query, LABELED, CLASSES=2, N_FOLDS=5):
    """
    Method to read all data and make summary dictionary 

    :params

      (dict) query : {

        'root': '/path/to/root/dir',
        [key_00]: [query_00],
        [key_01]: [query_01], ...

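      }

      (str) LABELED : key in query whose file is used to compute per-slice information
      (int) CLASSES : highest label value; one boolean vector is made per value 0..CLASSES
      (int) N_FOLDS : number of folds that cases are randomly assigned to

    """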
    assert 'root' in query
    assert len(query) > 1
    assert LABELED in query

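    # --- Work on a copy so that the caller's query dict is not modified
    query = dict(query)
    root = query.pop('root')
    keys = sorted(query.keys())

    # --- Sort matches so that case order (and thus index) is reproducible
    q = query.pop(keys[0])
    matches = sorted(glob.glob('%s/**/%s' % (root, q), recursive=True))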

    DATA = []
    META = {c: [] for c in range(CLASSES + 1)}
    META['index'] = []
    META['coord'] = []

    for n, m in enumerate(matches):

        d = {keys[0]: m}
        b = os.path.dirname(m)

        # --- Find other matches
        for key in keys[1:]:
            ms = glob.glob('%s/%s' % (b, query[key]))

            if len(ms) == 1: 
                d[key] = ms[0]

        # --- Calculate summary meta information from LABELED
        if len(d) == len(keys):

            # --- Aggregate slice-by-slice label information
            data, _ = hdf5.load(d[LABELED])

            for c in range(CLASSES + 1):
                s = np.sum(data == c, axis=(1, 2, 3)) > 0
                META[c].append(s)

            # --- Aggregate information
            META['index'].append(np.ones(data.shape[0], dtype='int') * len(DATA))
            META['coord'].append(np.arange(data.shape[0]) / (data.shape[0] - 1))
            DATA.append(d)

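    # --- Set validation fold (N-folds): one fold per case, not per slice, so that
    #     all slices of a case land in the same split (META['index'] is still a
    #     list of per-case arrays at this point, so its length is the number of cases)
    valid = np.arange(len(META['index'])) % N_FOLDS
    valid = valid[np.random.permutation(valid.size)]
    META['valid'] = [np.ones(c.size, dtype='int') * v for c, v in zip(META['coord'], valid)]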

    # --- Concatenate all vectors
    META = {k: np.concatenate(v) for k, v in META.items()}
    
    return DATA, META

The following code demonstrates example usage:

In [None]:
# --- Set query
query = {
    'root': '../../data/hdfs',
    'dat': 'dat.hdf5',
    'bet': 'bet.hdf5'}

# --- Find data
data, meta = make_summary(query, LABELED='bet', CLASSES=2)
print(data)
print(meta)

At this point, the variable `meta` contains a dictionary with multiple N-element vectors where N represents the total number of slices in the entire dataset.

```
meta['index'] = [0, 0, 0 ..., 1, 1, 1 ..., 9, 9, 9, ... 9, 9, 9]
meta['coord'] = [0, 1, 2 ..., 0, 1, 2 ..., 0, 1, 2, ... 7, 8, 9] / 9
meta[0] = [... presence or absence of 0 at this slice ...]
meta[1] = [... presence or absence of 1 at this slice ...]
meta[2] = [... presence or absence of 2 at this slice ...]
meta['valid'] = [... validation fold of the case this slice belongs to ...]
```
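The last step listed under *Creating a Data Summary*, serializing the summary as a Pickle file, is not shown in the code above. A minimal sketch with Python's standard `pickle` module follows; the `save_summary` / `load_summary` helpers and the `{'data': ..., 'meta': ...}` layout are hypothetical, and the template client in `dl_core` may store its summary differently.

```python
import os
import pickle
import tempfile

import numpy as np


def save_summary(path, data, meta):
    """Hypothetical helper: store DATA (list of dicts) and META (dict of arrays) in one pickle."""
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, 'wb') as f:
        pickle.dump({'data': data, 'meta': meta}, f)


def load_summary(path):
    """Hypothetical helper: inverse of save_summary()."""
    with open(path, 'rb') as f:
        summary = pickle.load(f)
    return summary['data'], summary['meta']


# --- Round-trip a tiny fake summary (names chosen to avoid overwriting data / meta)
demo_path = os.path.join(tempfile.mkdtemp(), 'pkls', 'summary.pkl')
demo_data = [{'dat': '/fake/case_0/dat.hdf5', 'bet': '/fake/case_0/bet.hdf5'}]
demo_meta = {'index': np.zeros(3, dtype='int'), 'coord': np.linspace(0, 1, 3)}

save_summary(demo_path, demo_data, demo_meta)
loaded_data, loaded_meta = load_summary(demo_path)
```

Only unpickle files you created yourself: loading a pickle can execute arbitrary code.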

# Preparing Cohorts

Now that we have information extracted at all slice locations, we are ready to separate our data into cohorts. The cohorts will be split based on train / valid status as well as any cohort-specific requirements of a particular project. In this example, we separate slices into those containing: (1) background only; (2) brain. Thus we will have a total of four separate cohorts:

* train: background; brain
* valid: background; brain

Assuming that we have a series of N-element vectors, the easiest way to define these four cohorts is to create four sets (or lists) of indices, each containing the positions (within the N-element vectors) of the slices that belong to that cohort. Note that it is not necessary for the sets to be mutually exclusive.

**IMPORTANT NOTE**: In your specific project, you may want to stratify data loading in different ways (e.g. from different hospitals, etc). Based on the information you extracted from data above, you **must** use that information now to properly create cohorts at this step. 
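To make "cohorts as sets of indices" concrete, here is a toy example with six slices from three cases (all values made up), using the same masking logic as `prepare_cohorts()` below with `fold = 0`. Each cohort is an array of positions into the N-element vectors, not a copy of the data.

```python
import numpy as np

# --- Toy per-slice vectors: 6 slices from 3 cases (2 slices each)
toy_meta = {
    'valid': np.array([0, 0, 1, 1, 2, 2]),                 # fold of each slice's case
    1: np.array([True, True, True, False, True, True]),    # slice contains background
    2: np.array([False, True, False, True, True, False]),  # slice contains brain
}

fold = 0
toy_cohorts = {}

for split, mask in [('train', toy_meta['valid'] != fold), ('valid', toy_meta['valid'] == fold)]:
    toy_cohorts[split] = {
        # --- Any brain
        2: np.nonzero(toy_meta[2] & mask)[0],
        # --- Background only (no brain)
        1: np.nonzero(toy_meta[1] & ~toy_meta[2] & mask)[0],
    }

print(toy_cohorts)
```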

See below for a rough implementation of the above concepts. 

In [None]:
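def prepare_cohorts(data, meta, fold):
    """
    Method to divide slice positions into cohorts for each split

    Returns {'train': {cohort: positions}, 'valid': {cohort: positions}}, where
    positions index into the N-element vectors in meta (data is not used here)

    """
    cohorts = {'train': {}, 'valid': {}}

    for split in ['train', 'valid']:

        # --- Determine mask corresponding to current split
        if split == 'train':
            mask = meta['valid'] != fold
        else:
            mask = meta['valid'] == fold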

        # --- Find slices with class == 2
        cohorts[split][2] = np.nonzero(meta[2] & mask)[0]

        # --- Find slices with class == 1 (and not class == 2)
        cohorts[split][1] = np.nonzero(meta[1] & ~meta[2] & mask)[0]
    
    return cohorts

The following code demonstrates example usage:

In [None]:
cohorts = prepare_cohorts(data, meta, fold=0)
print(cohorts)

## Training Rates

Now that the data is split into cohorts, we can easily control the rate at which we sample from each split. We will keep track of the desired ratio using a simple dictionary:

In [None]:
training_rates = {'train': 0.8, 'valid': 0.2}
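The rate is applied per draw: each time a sample is requested, a uniform random number decides whether it comes from the training or the validation split, regardless of how many cases each split holds. Below is a quick, seeded simulation of that rule; it uses `np.random.default_rng`, a swap for the global `np.random.rand` that the loader uses later.

```python
import numpy as np

training_rates = {'train': 0.8, 'valid': 0.2}
rng = np.random.default_rng(seed=0)

# --- Draw u ~ Uniform[0, 1) and pick 'train' when u < training_rates['train']
draws = ['train' if rng.random() < training_rates['train'] else 'valid' for _ in range(10000)]
frac_train = draws.count('train') / len(draws)

print(frac_train)  # close to 0.8
```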

## Sampling Rates

In addition to the rate at which we select training and validation cases, we would like to specify the rate at which we sample from each of our defined cohorts. In our example, we have two cohorts (`1` and `2`) which represent slices that contain background only and brain, respectively. Although our example uses a simple two-class division, there may in general be many more classes. To make the sampling step simple, we will parameterize this distribution by keeping track of the lower / upper bounds of an interval for each desired class, where the width of each interval equals that class's sampling rate.

For example, if we had three classes (A, B and C) and desired a sampling distribution of 0.1, 0.4 and 0.5 respectively, we would have the following parameterization:

```
(class)  (lower)  (upper)  
A        0.0      0.1
B        0.1      0.5
C        0.5      1.0
```
Given a random number drawn uniformly from `[0, 1)`, it is now easy to "pick" one of these three classes: simply find the interval that contains it.

In [None]:
def set_sampling_rates(cohorts, rates):

    assert set(cohorts['train'].keys()) == set(rates.keys())
    assert np.isclose(sum(rates.values()), 1)

    keys = sorted(rates.keys())
    vals = np.array([rates[k] for k in keys], dtype='float')

    # --- Cumulative bounds: cohort keys[k] owns the interval [lower[k], upper[k])
    upper = np.cumsum(vals)
    upper[-1] = 1.0  # guard against floating-point round-off
    lower = np.concatenate(([0.0], upper[:-1]))

    return {
        'cohorts': keys,
        'lower': lower,
        'upper': upper}

The following code demonstrates example usage:

In [None]:
sampling_rates = set_sampling_rates(cohorts, rates={1: 0.5, 2: 0.5}) 
print(sampling_rates)
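To double-check the bounds, here is the three-class example from the table above worked through with a cumulative sum: each class's upper bound is the running total of the rates, and its lower bound is the previous class's upper bound. The `pick()` helper is hypothetical and mirrors the interval test used when sampling.

```python
import numpy as np

rates = {'A': 0.1, 'B': 0.4, 'C': 0.5}
classes = sorted(rates)
vals = np.array([rates[k] for k in classes])

# --- Cumulative bounds: class k owns the interval [lower[k], upper[k])
upper = np.cumsum(vals)
upper[-1] = 1.0  # guard against floating-point round-off in the running sum
lower = np.concatenate(([0.0], upper[:-1]))


def pick(u):
    """Hypothetical helper: return the class whose interval contains u in [0, 1)."""
    hit = (u >= lower) & (u < upper)
    return classes[np.nonzero(hit)[0][0]]


print(lower.tolist(), upper.tolist())
print(pick(0.05), pick(0.3), pick(0.9))  # A B C
```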

## Randomizing Data

Next, we need a mechanism for randomly selecting data from each cohort. To do so, for each cohort of size N, we create a corresponding randomly permuted vector of indices (`0` to `N - 1`) that sets the order in which that cohort is sampled. In our example, with 4 total cohorts, we need a total of 4 separate vectors of random indices.

We will also create an additional dictionary variable, `current`, that keeps track of the current position (`count`) within each cohort's vector, as well as the current epoch number (`epoch`) for each cohort.

Let's see this in action:

In [None]:
def prepare_next_epoch(cohorts, indices, current, split, cohort):
    
    assert cohort in cohorts[split]
    
    indices[split][cohort] = np.random.permutation(cohorts[split][cohort].size)
    current[split][cohort]['epoch'] += 1
    current[split][cohort]['count'] = 0 
    
    return indices, current

Before any training begins, we initialize empty values for `indices` and `current`:

In [None]:
indices = {'train': {}, 'valid': {}}
current = {'train': {}, 'valid': {}}

Now we loop through each of our defined cohorts and create randomized indices: 

In [None]:
for split in ['train', 'valid']:
    for cohort in cohorts[split]:
        current[split][cohort] = {'epoch': -1, 'count': 0}
        indices, current = prepare_next_epoch(cohorts, indices, current, split, cohort)
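The `indices` / `current` bookkeeping amounts to sampling each cohort without replacement and reshuffling once it is exhausted. For intuition only, here is the same idea as a compact Python generator (a swapped-in formulation, not how the template client is written); the explicit dictionaries above have the advantage that every cohort's state can be inspected and shared across functions.

```python
import numpy as np


def epoch_sampler(n, rng):
    """Yield (epoch, position) pairs, visiting positions 0..n-1 in a fresh random order each epoch."""
    epoch = 0
    while True:
        for pos in rng.permutation(n):
            yield epoch, int(pos)
        epoch += 1


rng = np.random.default_rng(seed=0)
sampler = epoch_sampler(4, rng)
first_two_epochs = [next(sampler) for _ in range(8)]
print(first_two_epochs)
```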

## Putting It All Together

Let us now quickly review all the variables that we have prepared:

* `cohorts`
* `indices`
* `current`
* `training_rates`
* `sampling_rates`

Now let's see how we use all this information to load data from any given specified `split` and/or `cohort`, or to choose randomly if not defined:

In [None]:
def prepare_next_array(split=None, cohort=None):

    # --- Randomly choose split (train vs. valid) based on training_rates
    if split is None:
        split = 'train' if np.random.rand() < training_rates['train'] else 'valid'

    # --- Randomly choose cohort based on sampling_rates (or default to the first cohort)
    if cohort is None:
        if sampling_rates is not None:
            r = np.random.rand()
            hits = (r < sampling_rates['upper']) & (r >= sampling_rates['lower'])
            cohort = sampling_rates['cohorts'][np.nonzero(hits)[0][0]]
        else:
            cohort = sorted(cohorts[split].keys())[0]

    c = current[split][cohort]
    i = indices[split][cohort]

    # --- Reshuffle (start a new epoch) once every slice in this cohort has been used
    if c['count'] > i.size - 1:
        prepare_next_epoch(cohorts, indices, current, split, cohort)
        c = current[split][cohort]
        i = indices[split][cohort]

    # --- Map shuffled position -> position in meta vectors -> volume index and coord
    pos = cohorts[split][cohort][i[c['count']]]
    index = meta['index'][pos]
    coord = meta['coord'][pos]
    data_ = data[index]

    # --- Increment counter
    c['count'] += 1

    return data_, {'coord': coord, 'split': split, 'cohort': cohort}

The following code demonstrates example usage:

In [None]:
for i in range(10):
    data_, meta_ = prepare_next_array()
    print(data_)
    print(meta_)

# Loading Data

## Loading HDF5 Files

In [None]:
import os, glob
import numpy as np

import sys
PATH = '../../'
if PATH not in sys.path:
    sys.path.append(PATH)
from dl_core.io import hdf5

In [None]:
def load_hdf5(fname, **kwargs):

    infos = kwargs.get('infos', None)

    return hdf5.load(fname, infos)

## Wrapper for Generic Data

In [None]:
def load(data, **kwargs):

    # --- Already-loaded data passes through unchanged (with empty metadata)
    if not isinstance(data, str):
        return data, {}

    LOAD_FUNC = {
        'hdf5': load_hdf5}

    ext = data.split('.')[-1]

    if ext in LOAD_FUNC:
        return LOAD_FUNC[ext](data, **kwargs)

    else:
        print('ERROR provided extension is not supported: %s' % ext)
        return None, {} 
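To support another file type, you would add an entry to `LOAD_FUNC`. A self-contained sketch for NumPy `.npy` files follows. The names `load_npy` and `load_any` are hypothetical; each loader returns an `(array, metadata)` pair to match the `(data, meta)` pair that `hdf5.load` returns in the code above. Unlike `load()`, this version raises on unknown extensions instead of printing, so a mistyped path fails loudly.

```python
import os
import tempfile

import numpy as np


def load_npy(fname, **kwargs):
    """Hypothetical loader for .npy files; returns (array, {}) like the other loaders."""
    return np.load(fname), {}


def load_any(data, **kwargs):
    """Hypothetical variant of load() with an extra 'npy' entry."""
    if not isinstance(data, str):
        return data, {}  # already an array: pass through

    LOAD_FUNC = {'npy': load_npy}  # in the real wrapper, keep 'hdf5': load_hdf5 here too
    ext = data.split('.')[-1]

    if ext not in LOAD_FUNC:
        raise ValueError('Unsupported extension: %s' % ext)

    return LOAD_FUNC[ext](data, **kwargs)


fname = os.path.join(tempfile.mkdtemp(), 'dat.npy')
np.save(fname, np.arange(6).reshape(1, 2, 3))

arr, info = load_any(fname)
print(arr.shape)  # (1, 2, 3)
```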

## Wrapper for Dictionary of Files

In [None]:
def load_dict(data, **kwargs):

    assert isinstance(data, dict)

    arrays = {}
    for key, val in data.items():
        arrays[key], _ = load(data=val, **kwargs)

    return arrays

# Usage of Template Client

In this section, we will explore example usage of the template `client` provided in this repository.

**IMPORTANT**: This `client` has been written to load 1 x 512 x 512 (i.e., single-slice) arrays from the provided example head CT. You *will need to modify* this code for your own individual projects.

## Import dl_core

To use the `dl_core` library, you need to ensure that the repository path has been set. If you are using the Python interpreter directly (e.g. from the command line), you will need to add the repository path to the `$PYTHONPATH` environment variable. If you are using an IPython interface (including Jupyter), you will need to set the path using the `sys` module:

In [None]:
# --- Set PATH to dl_core library path
PATH = '../../' 
import sys
if PATH not in sys.path:
    sys.path.append(PATH)

## Import client

In [None]:
from dl_core.clients import Client

In [None]:
# --- Set default path locations
SUMMARY_PATH = '../../data/pkls/summary.pkl'
HDFS_PATH = '../../data/hdfs'

## Creating a summary of the data

Recall that in order to properly handle stratified sampling requirements, we need to know more information about the underlying data (e.g. which slice(s) are positive, etc). This information will then be used to randomize and organize cohorts for future data loading pipelines. See the sections above for more information.

In [None]:
client = Client(SUMMARY_PATH=SUMMARY_PATH)

client.make_summary(
    query={
        'root': HDFS_PATH,
        'dat': 'dat.hdf5',
        'bet': 'bet.hdf5'},
    LABELED='bet',
    CLASSES=2
)

## Preparing a client

Prior to loading data, we need to prepare the client with specifications regarding the desired cohort and sampling rates. See the sections above for more information.

In [None]:
client = Client(SUMMARY_PATH=SUMMARY_PATH)
client.load_summary()

client.prepare_cohorts(fold=0)
client.set_sampling_rates(rates={
    1: 0.5,
    2: 0.5
})

## Loading data

At last, we are ready to use the client to load data.

In [None]:
for i in range(10):
    arrays = client.get()
    print(arrays['dat'].shape)