
Task Assigner initial implementation #343

Merged

Conversation


@aleksandr-mokrov aleksandr-mokrov commented Feb 18, 2022

Example of usage in Jupyter.

Getting the registered tasks:

task_keeper = task_interface
tasks = task_keeper.get_registered_tasks()
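
For reference, tasks is assumed here to be a dict keyed by task name; the assigners below look up entries by these names:

print(list(tasks.keys()))
# with the task names used throughout this example:
# ['train', 'locally_tuned_model_validate', 'aggregated_model_validate']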

Creating a random assigner:

def random_assigner(collaborators, round_number, **kwargs):
    """Assign task groups randomly while keeping the target distribution."""
    import random
    random.shuffle(collaborators)
    collaborator_task_map = {}
    for idx, col in enumerate(collaborators):
        # The first 70% of the shuffled collaborators get all tasks
        # (training and validation); the remaining 30% only validate.
        if (idx + 1) / len(collaborators) <= 0.7:
            collaborator_task_map[col] = list(tasks.values())
        else:
            collaborator_task_map[col] = [tasks['aggregated_model_validate']]
    return collaborator_task_map
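
A quick sanity check (the collaborator names are purely illustrative):

collaborator_task_map = random_assigner(['env_one', 'env_two', 'env_three'], round_number=0)
# With three collaborators, the first two of the shuffled list receive the
# full task set and the third receives only aggregated_model_validate.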

Or creating a filtering assigner:

def filter_assigner(collaborators, round_number, **kwargs):
    collaborator_task_map = {}
    exclude_collaborators = ['env_two', 'env_three']
    for collaborator in collaborators:
        if collaborator in exclude_collaborators:
            continue
        collaborator_task_map[collaborator] = [
            tasks['train'], 
            tasks['locally_tuned_model_validate'],
            tasks['aggregated_model_validate']
        ]
    return collaborator_task_map
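
Collaborators filtered out this way get no entry in the map at all, so they receive no tasks for that round.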

Or creating a filtering assigner based on GPU info:

shard_registry = federation.get_shard_registry()

def filter_by_shard_registry_assigner(collaborators, round_number, **kwargs):
    collaborator_task_map = {}
    for collaborator in collaborators:
        col_status = shard_registry.get(collaborator)
        if not col_status or not col_status['is_online']:
            continue
        node_info = col_status['shard_info'].node_info
        # Assign the train task if the collaborator has a GPU with more than 8 GB of total memory
        if len(node_info.cuda_devices) > 0 and node_info.cuda_devices[0].memory_total > 8 * 1024**3:
            collaborator_task_map[collaborator] = [
                tasks['train'], 
                tasks['locally_tuned_model_validate'],
                tasks['aggregated_model_validate'],
            ]
        else:
            collaborator_task_map[collaborator] = [
                tasks['aggregated_model_validate'],
            ]
    return collaborator_task_map
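
The shard_registry entries are assumed to have roughly this shape (field names taken from the snippet above, values illustrative):

# shard_registry.get('env_one') ->
# {
#     'is_online': True,
#     'shard_info': <shard info whose .node_info.cuda_devices is a list of
#                    device descriptors, each exposing .memory_total in bytes>,
# }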

Or creating an assigner with an additional validation round (the extra final round only validates the last aggregated model):

rounds_to_train = 3
total_rounds = rounds_to_train + 1 # fl_experiment.start(..., rounds_to_train=total_rounds,...)

def assigner_with_last_round_validation(collaborators, round_number, **kwargs):
    collaborator_task_map = {}
    for collaborator in collaborators:
        if round_number == total_rounds - 1:
            collaborator_task_map[collaborator] = [
                tasks['aggregated_model_validate'],
            ]
        else:
            collaborator_task_map[collaborator] = [
                tasks['train'], 
                tasks['locally_tuned_model_validate'],
                tasks['aggregated_model_validate']
            ]
    return collaborator_task_map
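
With the settings above (rounds_to_train = 3, total_rounds = 4) and round_number counting from 0, the resulting schedule is:

# rounds 0-2 -> train + locally_tuned_model_validate + aggregated_model_validate
# round 3    -> aggregated_model_validate only (the extra validation round)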

And then pass the assigner into the experiment start call via the task_assigner argument (for the last-round-validation example, rounds_to_train should be total_rounds):

fl_experiment.start(
    model_provider=model_interface, 
    task_keeper=task_interface,
    data_loader=fed_dataset,
    task_assigner=assigner,
    rounds_to_train=1,
    opt_treatment='CONTINUE_GLOBAL',
    device_assignment_policy='CUDA_PREFERRED'
)

Future opportunities with this assigner function interface:
Filtering by collaborator status, with shard_registry delivered through **kwargs rather than captured from the notebook scope:

def filter_by_shard_registry_assigner(collaborators, round_number, **kwargs):
    shard_registry = kwargs.get('shard_registry')
    collaborator_task_map = {}
    for collaborator in collaborators:
        col_status = shard_registry.get(collaborator)
        if not col_status or not col_status['is_online']:
            continue
        collaborator_task_map[collaborator] = [
            tasks['train'], 
            tasks['locally_tuned_model_validate'],
            tasks['aggregated_model_validate']
        ]
    return collaborator_task_map


# There is not enough information to tell whether this is a train or a validate task.
# It looks like we should provide the task type.
kwargs = {}
Contributor

Removing the kwargs resolution from the plan will break the task runner API. Maybe add an if statement for this logic:

    if hasattr(self.task_runner, 'TASK_REGISTRY'):
        func_name = task.function_name

        # There is not enough information to tell whether this is a train or a validate task.
        # It looks like we should provide the task type.
        kwargs = {}
        if func_name == 'validate':
            if task.is_local:
                kwargs['apply'] = 'local'
            else:
                kwargs['apply'] = 'global'
    else:
        func_name = self.task_config[task]['function']
        kwargs = self.task_config[task]['kwargs']
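
In this suggested branch, runners that expose a TASK_REGISTRY (presumably the interactive-API task runner) get their validate kwargs reconstructed on the fly, while plan-defined runners keep reading the function name and kwargs from task_config.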

Contributor Author

Changed

@alexey-gruzdev alexey-gruzdev linked an issue Feb 24, 2022 that may be closed by this pull request
@alexey-gruzdev alexey-gruzdev added the enhancement New feature or request label Feb 24, 2022
@aleksandr-mokrov aleksandr-mokrov marked this pull request as ready for review February 24, 2022 21:46
@@ -854,6 +857,7 @@ def _end_of_round_check(self):
        if self._time_to_quit():
            self.logger.info('Experiment Completed. Cleaning up...')
        else:
            self.round_number += 1
Contributor
@psfoley psfoley Feb 25, 2022

Moving the round increment to the else statement will result in N+1 rounds being executed. This should be changed back.

Contributor Author

No, in fact the difference is only in logging. The real time-to-quit check happens in the get_tasks function. I reverted it, but we should rewrite this part of the code in the future.

@alexey-gruzdev alexey-gruzdev changed the title New assigner sketch Task Assigner initial implementation Feb 28, 2022
@alexey-gruzdev alexey-gruzdev added this to the v1.3 milestone Feb 28, 2022
@alexey-gruzdev alexey-gruzdev merged commit 9dcf0d4 into securefederatedai:develop Feb 28, 2022
@github-actions github-actions bot locked and limited conversation to collaborators Feb 28, 2022
Successfully merging this pull request may close these issues: Task Assigner Entity