# FeTS Challenge

Contributing Authors (alphabetical order):
- Brandon Edwards (Intel)
- Patrick Foley (Intel)
- Alexey Gruzdev (Intel)
- Sarthak Pati (University of Pennsylvania)
- Micah Sheller (Intel)
- Ilya Trushkin (Intel)

In [2]:
import numpy as np

from fets_challenge import run_challenge_experiment
from fets_challenge.experiment import logger

# Adding custom functionality to the experiment
Within this notebook there are **four** functional areas that you can adjust to improve upon the challenge reference code:

- [Validation functions](#Custom-Validation-Functions)
- [Custom aggregation logic](#Custom-Aggregation-Functions)
- [Selection of training hyperparameters by round](#Custom-hyperparameters-for-training)
- [Collaborator training selection by round](#Custom-Collaborator-Training-Selection)


# Custom Validation Functions

Any of the standard PyTorch [validation metrics](https://torchmetrics.readthedocs.io/en/latest/references/modules.html#classification-metrics) can be used to evaluate the model. Any user-defined validation function should conform to the following interface:

    def validation_fun_interface(targets, predictions):
        """Validation function interface.
    
        Args:
            targets: numpy array of target values
            predictions: numpy array of values predicted by the model
        Returns:
            val_score: float
        """
        val_score = ...  # compute a scalar score from targets and predictions
        return val_score
        
        
Accuracy is defined below as a reference implementation. Metrics that take arguments beyond ```(targets, predictions)``` can still be used: [functools.partial](https://docs.python.org/3/library/functools.html#functools.partial) fixes some of a function's arguments and returns a new function that takes the rest. In the example below, F1 score is added as a custom metric via ```partial```, alongside the existing accuracy metric:

    from functools import partial
    from sklearn.metrics import f1_score
    validation_functions=[('acc', accuracy), ('f1_score', partial(f1_score, average='macro'))]
    
Two validation functions are applied here: accuracy and F1 score. The string in the first element (index 0) of each tuple (e.g. 'acc') is the name under which that metric is stored in the aggregator's database. More information about using these stored metrics for custom aggregation can be found [here](#Using-validation-metrics-for-filtering).
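
The same ```partial``` pattern works for your own metrics. The sketch below is hypothetical: `dice_score` and its `smooth` argument are illustrative names, not part of the challenge code, and it assumes binary numpy masks. Fixing `smooth` with `partial` leaves a function with the `(targets, predictions)` interface:

```python
from functools import partial

import numpy as np


def dice_score(targets, predictions, smooth=1.0):
    """Hypothetical Dice coefficient for binary masks.

    `smooth` is an extra argument, so the raw function does not match the
    (targets, predictions) interface until it is fixed with partial.
    """
    targets = np.asarray(targets, dtype=bool)
    predictions = np.asarray(predictions, dtype=bool)
    intersection = np.logical_and(targets, predictions).sum()
    total = targets.sum() + predictions.sum()
    return float((2.0 * intersection + smooth) / (total + smooth))


def accuracy(targets, predictions):
    """Fraction of predictions that match the targets."""
    return float((np.asarray(targets) == np.asarray(predictions)).mean())


# (name, function) pairs; every function now takes exactly (targets, predictions)
validation_functions = [
    ('acc', accuracy),
    ('dice_no_smoothing', partial(dice_score, smooth=0.0)),
]

targets = np.array([1, 1, 0, 0])
predictions = np.array([1, 0, 0, 0])
for name, fn in validation_functions:
    print(name, fn(targets, predictions))
```

With `smooth=0.0` the score is undefined when both masks are empty, which is one reason to keep a small nonzero default.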



In [3]:
def accuracy(targets, predictions):
    """Calculates the average of all correct predictions
    
    Args:
        targets: numpy array of target values
        predictions: numpy array of values predicted by the model
    """
    return (targets == predictions).mean()

# Getting access to historical weights, metrics, and more
The **db_iterator** parameter gives full access to all of the tensors and metrics stored by the aggregator. Participants can use these records to build advanced aggregation methods, per-round training hyperparameters, and novel logic for selecting which collaborators participate in a given training round. See below for details about how data is stored internally, along with several examples.

## Basic Form
Each record yielded by the db_iterator contains the following fields:

<table>
    <thead>
        <tr>
            <th colspan=5>TensorKey</th>
            <th>|</th>
            <th>Tensor</th>
        </tr>
    </thead>
    <tbody>
        <tr>
            <td>'tensor_name'</td>
            <td>'origin'</td>
            <td>'round'</td>
            <td>'report'</td>
            <td>'tags'</td>
            <td>|</td>
            <td>'nparray'</td>
        </tr>
    </tbody>
</table>

All records, whether model weights, metrics, or hyperparameters, are stored internally as numpy arrays.

Detailed field explanation:
- **'tensor_name'** (str): The model layer name (e.g. 'conv2d') or the name of the metric reported by a collaborator (e.g. 'accuracy'). The built-in validation functions used to evaluate the challenge are given the prefix 'challenge_metric_\*'. The names you provide with custom validation metrics to the ```run_challenge_experiment``` function remain unchanged.
- **'origin'** (str): Where the numpy array came from. Possible values are any of the collaborator names (e.g. 'col1') or the aggregator.
- **'round'** (int): The round that produced the tensor. If your experiment has N rounds, possible values are 0 to N-1.
- **'report'** (boolean): One of the ways a metric can be denoted; for the purpose of aggregation, this field can be ignored.
- **'tags'** (tuple(str)): Unstructured information that can be used to create complex data flows. For example, model layer weights have the same 'tensor_name' and 'round' before and after training, so the tag 'trained' denotes that the numpy array corresponds to a layer of a locally trained model. Tags also capture metric information: aggregated_model_validation assigns the tags 'metric' and 'validate_agg' to mark a validation score of the latest aggregated model, whereas the tags 'metric' and 'validate_local' mark metrics produced by validation after training on a collaborator's local data.
- **'nparray'** (numpy array) : This contains the value of the tensor. May contain the model weights, metrics, or hyperparameters as a numpy array.
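
As a sketch of how these fields combine, the snippet below stands in for `db_iterator` with a plain list of dicts (an assumption made only so the example runs on its own; the real iterator is supplied by the challenge code) and collects one collaborator's validation history. It follows the convention used in the filtering example further down, where a collaborator's metric records carry the collaborator name among their tags. The helper name, collaborator names, and values are made up.

```python
import numpy as np


def metric_history(db_iterator, metric_name, collaborator, stage='validate_agg'):
    """Return {round: value} for one collaborator's metric.

    Assumes each record behaves like a dict with the fields described above and
    that metric records are tagged with 'metric', the validation stage
    ('validate_agg' or 'validate_local'), and the collaborator name.
    """
    required_tags = {'metric', stage, collaborator}
    history = {}
    for record in db_iterator:
        if record['tensor_name'] == metric_name and required_tags <= set(record['tags']):
            # metrics are stored as numpy arrays, so convert to a plain float
            history[record['round']] = float(record['nparray'])
    return history


# made-up records standing in for the aggregator's database
records = [
    {'tensor_name': 'acc', 'origin': 'col1', 'round': 0, 'report': False,
     'tags': ('metric', 'validate_agg', 'col1'), 'nparray': np.array(0.61)},
    {'tensor_name': 'acc', 'origin': 'col1', 'round': 0, 'report': False,
     'tags': ('metric', 'validate_local', 'col1'), 'nparray': np.array(0.70)},
    {'tensor_name': 'acc', 'origin': 'col2', 'round': 0, 'report': False,
     'tags': ('metric', 'validate_agg', 'col2'), 'nparray': np.array(0.55)},
    {'tensor_name': 'acc', 'origin': 'col1', 'round': 1, 'report': False,
     'tags': ('metric', 'validate_agg', 'col1'), 'nparray': np.array(0.66)},
]

print(metric_history(records, 'acc', 'col1'))  # {0: 0.61, 1: 0.66}
```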


# Custom Collaborator Training Selection
By default, all collaborators are selected for training each round, but you can easily add your own logic to select a different set of collaborators based on custom criteria. An example is provided below (one_collaborator_on_odd_rounds) that, on odd rounds, selects only the collaborator with the fastest training time in the previous round.

In [4]:
def all_collaborators_train(collaborators,
                            db_iterator,
                            fl_round,
                            collaborators_chosen_each_round,
                            collaborator_times_per_round):
    """Chooses which collaborators will train for a given round.
    
    Args:
        collaborators: list of strings of collaborator names
        db_iterator: iterator over history of all tensors.
            Columns: ['tensor_name', 'round', 'tags', 'nparray']
        fl_round: round number
        collaborators_chosen_each_round: a dictionary of {round: list of collaborators}. Each list indicates which collaborators trained in that given round.
        collaborator_times_per_round: a dictionary of {round: {collaborator: total_time_taken_in_round}}.  
    """
    return collaborators

# This is not a good algorithm, but it demonstrates how fl_round and
# collaborator_times_per_round can be used to choose collaborators.
def one_collaborator_on_odd_rounds(collaborators,
                                   db_iterator,
                                   fl_round,
                                   collaborators_chosen_each_round,
                                   collaborator_times_per_round):
    """Chooses which collaborators will train for a given round.
    
    Args:
        collaborators: list of strings of collaborator names
        db_iterator: iterator over history of all tensors.
            Columns: ['tensor_name', 'round', 'tags', 'nparray']
        fl_round: round number
        collaborators_chosen_each_round: a dictionary of {round: list of collaborators}. Each list indicates which collaborators trained in that given round.
        collaborator_times_per_round: a dictionary of {round: {collaborator: total_time_taken_in_round}}.  
    """
    # on odd rounds, choose the fastest from the previous round
    if fl_round % 2 == 1:
        training_collaborators = None
        fastest_time = np.inf
        for col, t in collaborator_times_per_round[fl_round - 1].items():
            if t < fastest_time:
                fastest_time = t
                training_collaborators = [col]
    else:
        training_collaborators = collaborators
    return training_collaborators
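
The odd-round example above generalizes to picking several collaborators. The sketch below is hypothetical (`make_fastest_k_selector` is not part of the challenge code): it returns a function with the same five-argument signature, chooses the `k` collaborators with the lowest recorded times from the previous round, and falls back to everyone when no timing data exists yet (for example in round 0).

```python
def make_fastest_k_selector(k):
    """Build a hypothetical collaborator-selection function that picks the k fastest collaborators."""

    def fastest_k_collaborators(collaborators,
                                db_iterator,
                                fl_round,
                                collaborators_chosen_each_round,
                                collaborator_times_per_round):
        previous_times = collaborator_times_per_round.get(fl_round - 1, {})
        # only rank collaborators that are still available this round
        timed = [col for col in collaborators if col in previous_times]
        if not timed:
            # no timing information yet: let every collaborator train
            return list(collaborators)
        timed.sort(key=lambda col: previous_times[col])
        return timed[:k]

    return fastest_k_collaborators


select_two_fastest = make_fastest_k_selector(2)
times = {0: {'col1': 12.0, 'col2': 7.5, 'col3': 9.1}}
print(select_two_fastest(['col1', 'col2', 'col3'], [], 1, {}, times))  # ['col2', 'col3']
print(select_two_fastest(['col1', 'col2', 'col3'], [], 0, {}, {}))     # ['col1', 'col2', 'col3']
```

Collaborators that did not train in the previous round have no timing entry and are never picked by this sketch, so a practical strategy would likely need some way to rotate them back in.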

# Custom hyperparameters for training

The function that sets the training hyperparameters for a round should return a dictionary whose keys are drawn from **'epochs_per_round'**, **'batches_per_round'** (which takes priority over 'epochs_per_round'), and **'learning_rate'**, as listed in the docstrings below; any other key will raise an exception. Returning an empty dictionary, as training_hyper_parameters_for_round does, uses the defaults. These hyperparameters apply to every collaborator chosen for that round. For example, if you set epochs_per_round to 2.0, all collaborators selected by the [collaborator training selection criteria](#Custom-Collaborator-Training-Selection) will train for two epochs. Hyperparameters can change from round to round, but within a round they are the same for all chosen collaborators; in other words, collaborators cannot have different hyperparameters in the same round.

In [5]:
def wacky_hyper_parameters_for_round(collaborators,
                                     db_iterator, # this will actually contain the hyper-parameter history as well
                                     fl_round,
                                     collaborators_chosen_each_round,
                                     collaborator_times_per_round):
    """Set the training hyper-parameters for the round.
    
    Args:
        collaborators: list of strings of collaborator names
        db_iterator: iterator over history of all tensors.
            Columns: ['tensor_name', 'round', 'tags', 'nparray']
        fl_round: round number
        collaborators_chosen_each_round: a dictionary of {round: list of collaborators}. Each list indicates which collaborators trained in that given round.
        collaborator_times_per_round: a dictionary of {round: {collaborator: total_time_taken_in_round}}.  
    Returns:
        a dictionary with optional keys from:
            - epochs_per_round 
            - batches_per_round (takes priority over "epochs_per_round")
            - learning_rate
    """
    # adaptive epochs
    if fl_round == 0:
        epochs = 0.5
    elif fl_round < 3:
        epochs = 1.5
    else:
        epochs = 1.0
        
    # a near-zero learning rate makes round 0 effectively a no-op
    if fl_round == 0:
        learning_rate = 1e-10
    else:
        learning_rate = 1e-4
    
    return {'epochs_per_round': epochs, 'learning_rate': learning_rate}

def training_hyper_parameters_for_round(collaborators,
                                        db_iterator, # this will actually contain the hyper-parameter history as well
                                        fl_round,
                                        collaborators_chosen_each_round,
                                        collaborator_times_per_round):
    """Set the training hyper-parameters for the round.
    
    Args:
        collaborators: list of strings of collaborator names
        db_iterator: iterator over history of all tensors.
            Columns: ['tensor_name', 'round', 'tags', 'nparray']
        fl_round: round number
        collaborators_chosen_each_round: a dictionary of {round: list of collaborators}. Each list indicates which collaborators trained in that given round.
        collaborator_times_per_round: a dictionary of {round: {collaborator: total_time_taken_in_round}}.  
    Returns:
        a dictionary with optional keys from:
            - epochs_per_round 
            - batches_per_round (takes priority over "epochs_per_round")
            - learning_rate
    """
    # an empty dictionary means the default hyperparameters are used
    return {}
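
A common alternative to the fixed schedule in `wacky_hyper_parameters_for_round` is to decay the learning rate as training progresses. The sketch below is hypothetical: the function name, the base learning rate, and the halving interval are illustrative choices, and it returns only the 'epochs_per_round' and 'learning_rate' keys.

```python
def step_decay_hyper_parameters_for_round(collaborators,
                                          db_iterator,
                                          fl_round,
                                          collaborators_chosen_each_round,
                                          collaborator_times_per_round,
                                          base_learning_rate=1e-4,
                                          halve_every=10):
    """Hypothetical schedule: halve the learning rate every `halve_every` rounds, one epoch per round."""
    learning_rate = base_learning_rate * (0.5 ** (fl_round // halve_every))
    return {'epochs_per_round': 1.0, 'learning_rate': learning_rate}


for fl_round in (0, 9, 10, 25):
    print(fl_round, step_decay_hyper_parameters_for_round([], [], fl_round, {}, {}))
```

The extra keyword arguments have defaults, so the function can still be called with only the five standard arguments.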

# Custom Aggregation Functions
Standard aggregation methods allow for simple layer-wise combination (via weighted_mean, mean, median, etc.); however, more complex aggregation methods can be supported by evaluating collaborator metrics, weights from prior rounds, etc. User-provided custom aggregation functions should implement the [**AggregationFunctionInterface**](https://github.com/intel/openfl/blob/fets/openfl/component/aggregation_functions/interface.py).

[**LocalTensors**](https://github.com/intel/openfl/blob/fets/openfl/utilities/types.py#L13) are named tuples with the fields ('col_name', 'tensor', 'weight'): the collaborator's name, its tensor, and its weight in aggregation. Your custom aggregation function will be passed a list of LocalTensors containing an entry for each collaborator that participated in the prior training round. The [**db_iterator**](#Getting-access-to-historical-weights,-metrics,-and-more) can be used to construct complex aggregation methods. A few examples are included below.
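
To make the LocalTensor structure concrete, here is a minimal sketch of plain weighted averaging over a list of them. It defines a stand-in `NamedTuple` with the field names used by the code in this notebook (`col_name`, `tensor`, `weight`) so it runs without OpenFL; inside a real aggregation function you would receive OpenFL's own LocalTensor objects instead.

```python
from typing import NamedTuple

import numpy as np


class LocalTensor(NamedTuple):
    """Stand-in for openfl.utilities.LocalTensor, for illustration only."""
    col_name: str
    tensor: np.ndarray
    weight: float


def weighted_mean(local_tensors):
    """Layer-wise weighted mean of the collaborators' tensors."""
    tensors = [lt.tensor for lt in local_tensors]
    weights = [lt.weight for lt in local_tensors]
    return np.average(tensors, weights=weights, axis=0)


local_tensors = [
    LocalTensor('col1', np.array([1.0, 2.0]), 3.0),
    LocalTensor('col2', np.array([5.0, 6.0]), 1.0),
]
print(weighted_mean(local_tensors))  # [2. 3.]
```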

## db_iterator aggregation examples
### Using prior layer weights
Here is an example of how to extract the layer weights from the prior round. The 'aggregated' tag marks the aggregated model's weights, and records tagged 'delta' are skipped:
    
    previous_tensor_value = None
    for record in db_iterator:
        if (
            record['round'] == (fl_round - 1)
            and record['tensor_name'] == tensor_name
            and 'aggregated' in record['tags']
            and 'delta' not in record['tags']
        ):
            previous_tensor_value = record['nparray']
            break

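### Using validation metrics for filtering

The example below keeps only the local tensors of collaborators whose accuracy ('acc'), computed when validating the aggregated model ('validate_agg') in the current round, meets a threshold that grows with the round number:
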
    threshold = fl_round * 0.3 + 0.5
    metric_name = 'acc'
    base_tags = ('metric', 'validate_agg')
    selected_tensors = []
    selected_weights = []
    for record in db_iterator:
        for local_tensor in local_tensors:
            # a collaborator's metric record also carries the collaborator name as a tag
            required_tags = set(base_tags + (local_tensor.col_name,))
            if (
                required_tags <= set(record['tags'])
                and record['round'] == fl_round
                and record['tensor_name'] == metric_name
                and record['nparray'] >= threshold
            ):
                selected_tensors.append(local_tensor.tensor)
                selected_weights.append(local_tensor.weight)
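
The filtering loop above only collects the surviving tensors; the aggregation function still has to combine them and decide what happens when nobody passes the threshold. One possible way to finish it is sketched below, under the assumption that falling back to all collaborators is acceptable. The helper name `aggregate_selected` and the namedtuple stand-in for LocalTensor are hypothetical.

```python
from collections import namedtuple

import numpy as np


def aggregate_selected(selected_tensors, selected_weights, local_tensors):
    """Weighted mean of the selected tensors, or of all local tensors if none passed."""
    if not selected_tensors:
        # no collaborator met the threshold: fall back to every local tensor
        selected_tensors = [lt.tensor for lt in local_tensors]
        selected_weights = [lt.weight for lt in local_tensors]
    return np.average(selected_tensors, weights=selected_weights, axis=0)


# hypothetical stand-in with the field names used above
LocalTensor = namedtuple('LocalTensor', ['col_name', 'tensor', 'weight'])
local_tensors = [
    LocalTensor('col1', np.array([1.0]), 1.0),
    LocalTensor('col2', np.array([3.0]), 1.0),
]
print(aggregate_selected([np.array([3.0])], [1.0], local_tensors))  # [3.]
print(aggregate_selected([], [], local_tensors))                    # [2.]
```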

In [6]:
from openfl.component.aggregation_functions import AggregationFunctionInterface
import numpy as np

class ClippedAveraging(AggregationFunctionInterface):
    def __init__(self, ratio):
        self.ratio = ratio
        
    def call(self,
             local_tensors,
             db_iterator,
             tensor_name,
             fl_round,
             *__):
        """Aggregate tensors.

        Args:
            local_tensors(list[openfl.utilities.LocalTensor]): List of local tensors to aggregate.
            db_iterator: iterator over history of all tensors.
                Columns: ['tensor_name', 'round', 'tags', 'nparray']
            tensor_name: name of the tensor
            fl_round: round number
            tags: tuple of tags for this tensor
        """
        clipped_tensors = []
        previous_tensor_value = None
        for record in db_iterator:
            if (
                record['round'] == (fl_round - 1)
                and record['tensor_name'] == tensor_name
                and 'aggregated' in record['tags']
                and 'delta' not in record['tags']
               ):
                previous_tensor_value = record['nparray']
                break
        weights = []
        for local_tensor in local_tensors:
            prev_tensor = previous_tensor_value if previous_tensor_value is not None else local_tensor.tensor
            delta = local_tensor.tensor - prev_tensor
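            # limit each element's change from the previous aggregated value to [-ratio, ratio]
            new_tensor = prev_tensor + np.clip(delta, -self.ratio, self.ratio)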
            clipped_tensors.append(new_tensor)
            weights.append(local_tensor.weight)

        return np.average(clipped_tensors, weights=weights, axis=0)
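
The clipping step in `ClippedAveraging` can be checked in isolation. The sketch below reproduces the per-collaborator logic on plain numpy arrays, assuming `ratio` is the maximum absolute per-element change allowed relative to the previous aggregated value; the function name `clipped_average` and the numbers are illustrative.

```python
import numpy as np


def clipped_average(previous, local_values, weights, ratio):
    """Clip each collaborator's change from `previous` to [-ratio, ratio], then take the weighted mean."""
    clipped = [previous + np.clip(value - previous, -ratio, ratio) for value in local_values]
    return np.average(clipped, weights=weights, axis=0)


previous = np.array([0.0, 0.0])
local_values = [np.array([0.5, 3.0]), np.array([-2.0, 0.2])]
print(clipped_average(previous, local_values, weights=[1.0, 1.0], ratio=1.0))
```

Here the clipped result is [-0.25, 0.6], whereas a plain mean of the unclipped values would be [-0.75, 1.6]; clipping limits how far any single collaborator can pull the aggregate in one round.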

# Running the Experiment

```run_challenge_experiment``` is the single interface through which your custom methods are passed.

- ```aggregation_function```, ```choose_training_collaborators```, ```training_hyper_parameters_for_round```, and ```validation_functions``` correspond to the [list of configurable functions](#Adding-custom-functionality-to-the-experiment) described within this notebook.
- ```institution_split_csv_filename``` : Describes how the data should be split between all collaborators. Extended documentation about configuring the splits in the ```institution_split_csv_filename``` parameter can be found in the [README.md](https://github.com/FETS-AI/Challenge/blob/main/Task_1/README.md).
- ```brats_training_data_parent_dir``` : The directory containing the BraTS training data.
- ```db_store_rounds``` : The number of rounds of metrics and weights the aggregator keeps before deleting them. A value of -1 retains all historical data, but memory usage will likely increase.
- ```rounds_to_train``` : Defines how many rounds will occur in the experiment.
- ```device``` : Which device to use for training and validation (e.g. 'cuda').

In [7]:
run_challenge_experiment(aggregation_function=ClippedAveraging(ratio=1.0),
                         choose_training_collaborators=all_collaborators_train,
                         training_hyper_parameters_for_round=training_hyper_parameters_for_round,
                         validation_functions=[('acc', accuracy)],
                         institution_split_csv_filename='small_split.csv',
                         brats_training_data_parent_dir='/raid/datasets/BraTS20/MICCAI_BraTS2020_TrainingData',
                         db_store_rounds=5,
                         rounds_to_train=5,
                         device='cuda')

No 'TrainOrVal' column found in split_subdirs csv, so performing automated split using percent_train of 0.8



out_conv will be using final activation:  sigmoid

out_conv will be using sigmoid_input_multiplier:  20.0



