diff --git a/openfl/component/director/director.py b/openfl/component/director/director.py
index 6dedd0dce5..0ac6d70147 100644
--- a/openfl/component/director/director.py
+++ b/openfl/component/director/director.py
@@ -5,8 +5,6 @@
 
 import asyncio
 import logging
-import os
-import shutil
 import time
 import typing
 from collections import defaultdict
@@ -14,6 +12,7 @@
 
 from openfl.federated import Plan
 from openfl.protocols import director_pb2
+from openfl.utilities.workspace import ExperimentWorkspace
 
 logger = logging.getLogger(__name__)
@@ -71,13 +70,12 @@ async def set_new_experiment(
         # TODO: save to file
         self.experiment_data[experiment_name] = data_file_path
 
-        self.create_workspace(experiment_name, data_file_path)
-        asyncio.create_task(self._run_aggregator(
+        asyncio.create_task(self._run_aggregator_in_workspace(
+            experiment_name=experiment_name,
+            data_file_name=data_file_path,
             experiment_sender=sender_name,
             initial_tensor_dict=tensor_dict,
-            experiment_name=experiment_name,
             collaborator_names=collaborator_names,
-            data_file_name=data_file_path
         ))
 
         logger.info(f'New experiment {experiment_name} for '
@@ -197,16 +195,16 @@ def get_envoys(self) -> list:
 
         return envoy_infos
 
-    @staticmethod
-    def create_workspace(experiment_name: str, data_file_path: Path):
-        """Create the aggregator workspace."""
-        if os.path.exists(experiment_name):
-            shutil.rmtree(experiment_name)
-        os.makedirs(experiment_name)
-        arch_name = f'{experiment_name}/{experiment_name}' + '.zip'
-        shutil.copy(data_file_path, arch_name)
-        shutil.unpack_archive(arch_name, experiment_name)
-        os.remove(arch_name)
+    async def _run_aggregator_in_workspace(
+            self,
+            *,
+            experiment_name: str,
+            data_file_name: Path,
+            **kwargs
+    ) -> None:
+        """Run the aggregator in an experiment workspace."""
+        with ExperimentWorkspace(experiment_name, data_file_name):
+            await self._run_aggregator(experiment_name=experiment_name, **kwargs)
 
     async def _run_aggregator(
             self,
@@ -215,14 +213,10 @@
             experiment_sender,
             initial_tensor_dict,
             experiment_name,
             collaborator_names,
-            data_file_name,
             plan_path='plan/plan.yaml'
-    ):
+    ) -> None:
         """Run aggregator."""
-        cwd = os.getcwd()
-        os.chdir(f'{cwd}/{experiment_name}')
         plan = Plan.parse(plan_config_path=Path(plan_path))
-
         plan.authorized_cols = list(collaborator_names)
         logger.info('🧿 Starting the Aggregator Service.')
@@ -246,9 +240,6 @@ async def _run_aggregator(
         except KeyboardInterrupt:
             pass
         finally:
-            os.chdir(cwd)
-            shutil.rmtree(experiment_name)
-            os.remove(data_file_name)
             grpc_server.stop(0)
             # Temporary solution to free RAM used by TensorDB
             aggregator_server.aggregator.tensor_db.clean_up(0)
diff --git a/openfl/component/envoy/envoy.py b/openfl/component/envoy/envoy.py
index de3b036779..2fabc75029 100644
--- a/openfl/component/envoy/envoy.py
+++ b/openfl/component/envoy/envoy.py
@@ -4,9 +4,8 @@
 """Envoy module."""
 
 import logging
-import os
-import sys
 import time
+import uuid
 from concurrent.futures import ThreadPoolExecutor
 from pathlib import Path
@@ -14,6 +13,7 @@
 
 from openfl.federated import Plan
 from openfl.transport.grpc.director_client import ShardDirectorClient
+from openfl.utilities.workspace import ExperimentWorkspace
 
 logger = logging.getLogger(__name__)
@@ -49,21 +49,36 @@ def run(self):
         while True:
             try:
                 # Workspace import should not be done by gRPC client!
-                experiment_name = self.director_client.get_experiment_data()
+                experiment_name = self.director_client.wait_experiment()
+                data_stream = self.director_client.get_experiment_data(experiment_name)
             except Exception as exc:
                 logger.error(f'Failed to get experiment: {exc}')
                 time.sleep(DEFAULT_RETRY_TIMEOUT_IN_SECONDS)
                 continue
+            data_file_path = self._save_data_stream_to_file(data_stream)
             self.is_experiment_running = True
             try:
-                self._run_collaborator(experiment_name)
+                with ExperimentWorkspace(
+                        experiment_name, data_file_path, is_install_requirements=True
+                ):
+                    self._run_collaborator(experiment_name)
             except Exception as exc:
                 logger.error(f'Collaborator failed: {exc}')
             finally:
                 # Workspace cleaning should not be done by gRPC client!
-                self.director_client.remove_workspace(experiment_name)
                 self.is_experiment_running = False
 
+    @staticmethod
+    def _save_data_stream_to_file(data_stream):
+        data_file_path = Path(str(uuid.uuid4())).absolute()
+        with open(data_file_path, 'wb') as data_file:
+            for response in data_stream:
+                if response.size == len(response.npbytes):
+                    data_file.write(response.npbytes)
+                else:
+                    raise Exception('Broken archive')
+        return data_file_path
+
     def send_health_check(self):
         """Send health check to the director."""
         logger.info('The health check sender is started.')
@@ -77,12 +92,6 @@ def send_health_check(self):
 
     def _run_collaborator(self, experiment_name, plan='plan/plan.yaml', ):
         """Run the collaborator for the experiment running."""
-        cwd = os.getcwd()
-        os.chdir(f'{cwd}/{experiment_name}')  # TODO: probably it should be another way
-
-        # This is needed for python module finder
-        sys.path.append(os.getcwd())
-
         plan = Plan.parse(plan_config_path=Path(plan))
 
         # TODO: Need to restructure data loader config file loader
@@ -91,10 +100,7 @@ def _run_collaborator(self, experiment_name, plan='plan/plan.yaml', ):
         col = plan.get_collaborator(self.name, self.root_certificate, self.private_key,
                                     self.certificate,
                                     shard_descriptor=self.shard_descriptor)
-        try:
-            col.run()
-        finally:
-            os.chdir(cwd)
+        col.run()
 
     def start(self):
         """Start the envoy."""
diff --git a/openfl/interface/interactive_api/experiment.py b/openfl/interface/interactive_api/experiment.py
index 01aa86d059..15521344f6 100644
--- a/openfl/interface/interactive_api/experiment.py
+++ b/openfl/interface/interactive_api/experiment.py
@@ -296,10 +296,8 @@ def _prepare_plan(self, model_provider, task_keeper, data_loader,
         # TaskRunner framework plugin
         # ['required_plugin_components'] should be already in the default plan with all the fields
         # filled with the default values
-        plan.config['task_runner'] = {
-            'required_plugin_components': {
-                'framework_adapters': model_provider.framework_plugin
-            }
+        plan.config['task_runner']['required_plugin_components'] = {
+            'framework_adapters': model_provider.framework_plugin
         }
 
         # API layer
diff --git a/openfl/transport/grpc/director_client.py b/openfl/transport/grpc/director_client.py
index a21485d29b..eaf3b3ffae 100644
--- a/openfl/transport/grpc/director_client.py
+++ b/openfl/transport/grpc/director_client.py
@@ -4,12 +4,7 @@
 """Director clients module."""
 
 import logging
-import os
-import shutil
-import time
 from datetime import datetime
-from subprocess import check_call
-from sys import executable
 
 import grpc
@@ -69,8 +64,8 @@ def report_shard_info(self, shard_descriptor) -> bool:
         acknowledgement = self.stub.AcknowledgeShard(shard_info)
         return acknowledgement.accepted
 
-    def get_experiment_data(self):
-        """Get an experiment data from the director."""
+    def wait_experiment(self):
"""Wait an experiment data from the director.""" logger.info('Send WaitExperiment request') response_iter = self.stub.WaitExperiment(self._get_experiment_data()) logger.info('WaitExperiment response has received') @@ -78,58 +73,19 @@ def get_experiment_data(self): experiment_name = response.experiment_name if not experiment_name: raise Exception('No experiment') + + return experiment_name + + def get_experiment_data(self, experiment_name): + """Get an experiment data from the director.""" logger.info(f'Request experiment {experiment_name}') request = director_pb2.GetExperimentDataRequest( experiment_name=experiment_name, collaborator_name=self.shard_name ) - response_iter = self.stub.GetExperimentData(request) + data_stream = self.stub.GetExperimentData(request) - self.create_workspace(experiment_name, response_iter) - - return experiment_name - - @staticmethod - def remove_workspace(experiment_name): - """Remove the workspace.""" - shutil.rmtree(experiment_name, ignore_errors=True) - - @staticmethod - def create_workspace(experiment_name, response_iter): - """Create a collaborator workspace for the experiment.""" - if os.path.exists(experiment_name): - shutil.rmtree(experiment_name) - os.makedirs(experiment_name) - - arch_name = f'{experiment_name}/{experiment_name}' + '.zip' - logger.info(f'arch_name: {arch_name}') - with open(arch_name, 'wb') as content_file: - for response in response_iter: - logger.info(f'Size: {response.size}') - if response.size == len(response.npbytes): - content_file.write(response.npbytes) - else: - raise Exception('Broken archive') - - shutil.unpack_archive(arch_name, experiment_name) - os.remove(arch_name) - - requirements_filename = f'./{experiment_name}/requirements.txt' - - if os.path.isfile(requirements_filename): - attempts = 3 - for _ in range(attempts): - try: - check_call([ - executable, '-m', 'pip', 'install', '-r', requirements_filename], - shell=False) - except Exception as exc: - logger.error(f'Failed to install requirements: {exc}') - time.sleep(3) - else: - break - else: - logger.error('No ' + requirements_filename + ' file found.') + return data_stream def _get_experiment_data(self): """Generate the experiment data request.""" diff --git a/openfl/utilities/utils.py b/openfl/utilities/utils.py index 450d6b4335..ff9d461dc7 100644 --- a/openfl/utilities/utils.py +++ b/openfl/utilities/utils.py @@ -5,7 +5,6 @@ import hashlib import logging import os -import shutil from socket import getfqdn import numpy as np @@ -53,36 +52,6 @@ def log_to_root(message, *args, **kwargs): setattr(logging, method_name, log_to_root) -class UnpackWorkspace: - """Workspace context manager. - - It unpacks the archive at the begining of the experiment and removes the archive. - Then in moving to the workspace's folder. 
-    Exiting it removes the workspace
-    """
-
-    def __init__(self, archive_path) -> None:
-        """Initialize workspace context manager."""
-        self.cwd = os.getcwd()
-
-        self.workspace_name = os.path.basename(archive_path).split('.')[0]
-        if os.path.exists(self.workspace_name):
-            shutil.rmtree(self.workspace_name)
-        os.makedirs(self.workspace_name)
-        shutil.unpack_archive(archive_path, self.workspace_name)
-
-        os.remove(archive_path)
-
-    def __enter__(self):
-        """Enter workspace context manager."""
-        os.chdir(f'{self.cwd}/{self.workspace_name}')
-
-    def __exit__(self, exc_type, exc_value, traceback):
-        """Exit workspace context manager."""
-        os.chdir(self.cwd)
-        shutil.rmtree(self.workspace_name)
-
-
 def split_tensor_dict_into_floats_and_non_floats(tensor_dict):
     """
     Split the tensor dictionary into float and non-floating point values.
diff --git a/openfl/utilities/workspace.py b/openfl/utilities/workspace.py
new file mode 100644
index 0000000000..77a6c345d4
--- /dev/null
+++ b/openfl/utilities/workspace.py
@@ -0,0 +1,77 @@
+# Copyright (C) 2020-2021 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+"""Workspace utils module."""
+
+import logging
+import os
+import shutil
+import sys
+import time
+from pathlib import Path
+from subprocess import check_call
+from sys import executable
+
+logger = logging.getLogger(__name__)
+
+
+class ExperimentWorkspace:
+    """Experiment workspace context manager."""
+
+    def __init__(
+            self,
+            experiment_name: str,
+            data_file_path: Path,
+            is_install_requirements: bool = False
+    ) -> None:
+        """Initialize workspace context manager."""
+        self.experiment_name = experiment_name
+        self.data_file_path = data_file_path
+        self.cwd = os.getcwd()
+        self.experiment_work_dir = f'{self.cwd}/{self.experiment_name}'
+        self.is_install_requirements = is_install_requirements
+
+    def _install_requirements(self):
+        """Install experiment requirements."""
+        requirements_filename = f'{self.experiment_work_dir}/requirements.txt'
+
+        if os.path.isfile(requirements_filename):
+            attempts = 10
+            for _ in range(attempts):
+                try:
+                    check_call([
+                        executable, '-m', 'pip', 'install', '-r', requirements_filename],
+                        shell=False)
+                except Exception as exc:
+                    logger.error(f'Failed to install requirements: {exc}')
+                    # This is a workaround for cases when collaborators run
+                    # in a common virtual environment
+                    time.sleep(5)
+                else:
+                    break
+        else:
+            logger.error(f'No {requirements_filename} file found.')
+
+    def __enter__(self):
+        """Create the experiment workspace and enter it."""
+        if os.path.exists(self.experiment_work_dir):
+            shutil.rmtree(self.experiment_work_dir)
+        os.makedirs(self.experiment_work_dir)
+
+        shutil.unpack_archive(self.data_file_path, self.experiment_work_dir, format='zip')
+
+        if self.is_install_requirements:
+            self._install_requirements()
+
+        os.chdir(self.experiment_work_dir)
+
+        # This is needed for the Python module finder
+        sys.path.append(self.experiment_work_dir)
+
+    def __exit__(self, exc_type, exc_value, traceback):
+        """Restore the initial working directory and remove the workspace."""
+        os.chdir(self.cwd)
+        shutil.rmtree(self.experiment_name, ignore_errors=True)
+        if self.experiment_work_dir in sys.path:
+            sys.path.remove(self.experiment_work_dir)
+        os.remove(self.data_file_path)
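
The new `ExperimentWorkspace` context manager centralizes the workspace handling that was previously duplicated across the director, the envoy, and the gRPC client. A minimal usage sketch follows; the archive name and the `run_experiment` call are hypothetical, while the constructor arguments and the enter/exit behavior follow the class added above:

```python
from pathlib import Path

from openfl.utilities.workspace import ExperimentWorkspace

# Hypothetical archive, e.g. as saved by Envoy._save_data_stream_to_file.
data_file_path = Path('experiment_archive.zip').absolute()

with ExperimentWorkspace('my_experiment', data_file_path, is_install_requirements=True):
    # Inside the block the archive is unpacked into ./my_experiment,
    # the process has chdir'ed into it, and the directory is on sys.path,
    # so 'plan/plan.yaml' and any workspace modules resolve locally.
    run_experiment()  # hypothetical workload

# On exit the original cwd is restored, and both the workspace
# directory and the source archive are deleted.
```

Note that the manager mutates process-wide state (the working directory and `sys.path`), so only one experiment workspace should be active per process at a time.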
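Splitting the old client-side `get_experiment_data` into `wait_experiment` plus `get_experiment_data` also separates waiting, downloading, and workspace setup on the envoy side. A condensed sketch of the resulting sequence from `Envoy.run` above, with retry and error handling omitted; `director_client` is a `ShardDirectorClient` and `envoy` an `Envoy` instance (both names are illustrative, and the underscore-prefixed methods are normally called internally):

```python
# 1. Block until the director announces an experiment for this shard.
experiment_name = director_client.wait_experiment()

# 2. Download the workspace archive as a raw gRPC message stream.
data_stream = director_client.get_experiment_data(experiment_name)

# 3. Persist the stream to a uuid-named file, verifying each chunk size.
data_file_path = envoy._save_data_stream_to_file(data_stream)

# 4. Unpack, install requirements, and run the collaborator inside
#    a disposable workspace that is cleaned up on exit.
with ExperimentWorkspace(experiment_name, data_file_path, is_install_requirements=True):
    envoy._run_collaborator(experiment_name)
```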