Workspace context manager #148

Merged
39 changes: 15 additions & 24 deletions openfl/component/director/director.py
@@ -5,15 +5,14 @@

import asyncio
import logging
import os
import shutil
import time
import typing
from collections import defaultdict
from pathlib import Path

from openfl.federated import Plan
from openfl.protocols import director_pb2
from openfl.utilities.workspace import ExperimentWorkspace

logger = logging.getLogger(__name__)

@@ -71,13 +70,12 @@ async def set_new_experiment(
# TODO: save to file
self.experiment_data[experiment_name] = data_file_path

self.create_workspace(experiment_name, data_file_path)
asyncio.create_task(self._run_aggregator(
asyncio.create_task(self._run_aggregator_in_workspace(
experiment_name=experiment_name,
data_file_name=data_file_path,
experiment_sender=sender_name,
initial_tensor_dict=tensor_dict,
experiment_name=experiment_name,
collaborator_names=collaborator_names,
data_file_name=data_file_path
))

logger.info(f'New experiment {experiment_name} for '
@@ -197,16 +195,16 @@ def get_envoys(self) -> list:

return envoy_infos

@staticmethod
def create_workspace(experiment_name: str, data_file_path: Path):
"""Create the aggregator workspace."""
if os.path.exists(experiment_name):
shutil.rmtree(experiment_name)
os.makedirs(experiment_name)
arch_name = f'{experiment_name}/{experiment_name}' + '.zip'
shutil.copy(data_file_path, arch_name)
shutil.unpack_archive(arch_name, experiment_name)
os.remove(arch_name)
async def _run_aggregator_in_workspace(
self,
*,
experiment_name: str,
data_file_name: Path,
**kwargs
) -> None:
"""Run aggregator in a workspace."""
with ExperimentWorkspace(experiment_name, data_file_name):
await self._run_aggregator(experiment_name=experiment_name, **kwargs)

async def _run_aggregator(
self,
@@ -215,14 +213,10 @@ async def _run_aggregator(
initial_tensor_dict,
experiment_name,
collaborator_names,
data_file_name,
plan_path='plan/plan.yaml'
):
) -> None:
"""Run aggregator."""
cwd = os.getcwd()
os.chdir(f'{cwd}/{experiment_name}')
plan = Plan.parse(plan_config_path=Path(plan_path))

plan.authorized_cols = list(collaborator_names)

logger.info('🧿 Starting the Aggregator Service.')
@@ -246,9 +240,6 @@ async def _run_aggregator(
except KeyboardInterrupt:
pass
finally:
os.chdir(cwd)
shutil.rmtree(experiment_name)
os.remove(data_file_name)
grpc_server.stop(0)
# Temporary solution to free RAM used by TensorDB
aggregator_server.aggregator.tensor_db.clean_up(0)
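
The substance of the director.py change is that _run_aggregator no longer manages its own working directory: _run_aggregator_in_workspace wraps the awaited coroutine in the ExperimentWorkspace context manager, so workspace setup and teardown bracket the whole aggregator run even when it fails. A minimal, self-contained sketch of that wrapping pattern, with illustrative names (workspace, run_experiment) that are not part of this PR:

```python
# Sketch only: a synchronous context manager whose __enter__/__exit__ bracket
# an awaited coroutine, mirroring _run_aggregator_in_workspace above.
import asyncio
from contextlib import contextmanager


@contextmanager
def workspace(name):
    print(f'set up workspace {name}')        # stands in for unpacking the archive and chdir
    try:
        yield
    finally:
        print(f'clean up workspace {name}')  # stands in for ExperimentWorkspace.__exit__


async def run_experiment():
    with workspace('demo'):
        await asyncio.sleep(0)               # stands in for awaiting _run_aggregator(...)


asyncio.run(run_experiment())
```
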
36 changes: 21 additions & 15 deletions openfl/component/envoy/envoy.py
@@ -4,16 +4,16 @@
"""Envoy module."""

import logging
import os
import sys
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from click import echo

from openfl.federated import Plan
from openfl.transport.grpc.director_client import ShardDirectorClient
from openfl.utilities.workspace import ExperimentWorkspace

logger = logging.getLogger(__name__)

@@ -47,21 +47,36 @@ def run(self):
while True:
try:
# Workspace import should not be done by gRPC client!
experiment_name = self.director_client.get_experiment_data()
experiment_name = self.director_client.wait_experiment()
data_stream = self.director_client.get_experiment_data(experiment_name)
except Exception as exc:
logger.error(f'Failed to get experiment: {exc}')
time.sleep(DEFAULT_RETRY_TIMEOUT_IN_SECONDS)
continue
data_file_path = self._save_data_stream_to_file(data_stream)
self.is_experiment_running = True
try:
self._run_collaborator(experiment_name)
with ExperimentWorkspace(
experiment_name, data_file_path, is_install_requirements=True
):
Contributor comment on lines +61 to +63: Maybe rename is_install_requirements => requirements in order to avoid such a long line.

self._run_collaborator(experiment_name)
except Exception as exc:
logger.error(f'Collaborator failed: {exc}')
finally:
# Workspace cleaning should not be done by gRPC client!
self.director_client.remove_workspace(experiment_name)
self.is_experiment_running = False

@staticmethod
def _save_data_stream_to_file(data_stream):
data_file_path = Path(str(uuid.uuid4())).absolute()
with open(data_file_path, 'wb') as data_file:
for response in data_stream:
if response.size == len(response.npbytes):
data_file.write(response.npbytes)
else:
raise Exception('Broken archive')
return data_file_path

def send_health_check(self):
"""Send health check to the director."""
logger.info('The health check sender is started.')
@@ -75,12 +90,6 @@ def send_health_check(self):

def _run_collaborator(self, experiment_name, plan='plan/plan.yaml',):
"""Run the collaborator for the experiment running."""
cwd = os.getcwd()
os.chdir(f'{cwd}/{experiment_name}') # TODO: probably it should be another way

# This is needed for python module finder
sys.path.append(os.getcwd())

plan = Plan.parse(plan_config_path=Path(plan))

# TODO: Need to restructure data loader config file loader
@@ -89,10 +98,7 @@ def _run_collaborator(self, experiment_name, plan='plan/plan.yaml',):

col = plan.get_collaborator(self.name, self.root_ca, self.key,
self.cert, shard_descriptor=self.shard_descriptor)
try:
col.run()
finally:
os.chdir(cwd)
col.run()

def start(self):
"""Start the envoy."""
6 changes: 2 additions & 4 deletions openfl/interface/interactive_api/experiment.py
@@ -296,10 +296,8 @@ def _prepare_plan(self, model_provider, task_keeper, data_loader,
# TaskRunner framework plugin
# ['required_plugin_components'] should be already in the default plan with all the fields
# filled with the default values
plan.config['task_runner'] = {
'required_plugin_components': {
'framework_adapters': model_provider.framework_plugin
}
plan.config['task_runner']['required_plugin_components'] = {
'framework_adapters': model_provider.framework_plugin
}

# API layer
62 changes: 9 additions & 53 deletions openfl/transport/grpc/director_client.py
@@ -4,12 +4,7 @@
"""Director clients module."""

import logging
import os
import shutil
import time
from datetime import datetime
from subprocess import check_call
from sys import executable

import grpc

@@ -69,67 +64,28 @@ def report_shard_info(self, shard_descriptor) -> bool:
acknowledgement = self.stub.AcknowledgeShard(shard_info)
return acknowledgement.accepted

def get_experiment_data(self):
"""Get an experiment data from the director."""
def wait_experiment(self):
"""Wait an experiment data from the director."""
Contributor comment on lines +67 to +68: Maybe wait_experiment should be get_experiment_name?

logger.info('Send WaitExperiment request')
response_iter = self.stub.WaitExperiment(self._get_experiment_data())
logger.info('WaitExperiment response has received')
response = next(response_iter)
experiment_name = response.experiment_name
if not experiment_name:
raise Exception('No experiment')

return experiment_name

def get_experiment_data(self, experiment_name):
"""Get an experiment data from the director."""
logger.info(f'Request experiment {experiment_name}')
request = director_pb2.GetExperimentDataRequest(
experiment_name=experiment_name,
collaborator_name=self.shard_name
)
response_iter = self.stub.GetExperimentData(request)
data_stream = self.stub.GetExperimentData(request)

self.create_workspace(experiment_name, response_iter)

return experiment_name

@staticmethod
def remove_workspace(experiment_name):
"""Remove the workspace."""
shutil.rmtree(experiment_name, ignore_errors=True)

@staticmethod
def create_workspace(experiment_name, response_iter):
"""Create a collaborator workspace for the experiment."""
if os.path.exists(experiment_name):
shutil.rmtree(experiment_name)
os.makedirs(experiment_name)

arch_name = f'{experiment_name}/{experiment_name}' + '.zip'
logger.info(f'arch_name: {arch_name}')
with open(arch_name, 'wb') as content_file:
for response in response_iter:
logger.info(f'Size: {response.size}')
if response.size == len(response.npbytes):
content_file.write(response.npbytes)
else:
raise Exception('Broken archive')

shutil.unpack_archive(arch_name, experiment_name)
os.remove(arch_name)

requirements_filename = f'./{experiment_name}/requirements.txt'

if os.path.isfile(requirements_filename):
attempts = 3
for _ in range(attempts):
try:
check_call([
executable, '-m', 'pip', 'install', '-r', requirements_filename],
shell=False)
except Exception as exc:
logger.error(f'Failed to install requirements: {exc}')
time.sleep(3)
else:
break
else:
logger.error('No ' + requirements_filename + ' file found.')
return data_stream

def _get_experiment_data(self):
"""Generate the experiment data request."""
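
After this change the gRPC client no longer touches the filesystem at all: wait_experiment returns just the experiment name, and get_experiment_data returns the raw response stream, leaving saving and unpacking to the envoy. A hedged sketch of the caller-side flow, mirroring Envoy.run() above (fetch_experiment is a hypothetical helper, not part of the PR):

```python
# Hypothetical helper illustrating the reworked client API; not code from the PR.
def fetch_experiment(director_client):
    """Wait for the director to announce an experiment, then stream its archive."""
    experiment_name = director_client.wait_experiment()  # blocks until an experiment is available
    data_stream = director_client.get_experiment_data(experiment_name)
    # The caller (Envoy) is now responsible for writing the stream to disk
    # and unpacking it via ExperimentWorkspace.
    return experiment_name, data_stream
```
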
31 changes: 0 additions & 31 deletions openfl/utilities/utils.py
@@ -5,7 +5,6 @@
import hashlib
import logging
import os
import shutil
from socket import getfqdn

import numpy as np
@@ -53,36 +52,6 @@ def log_to_root(message, *args, **kwargs):
setattr(logging, method_name, log_to_root)


class UnpackWorkspace:
"""Workspace context manager.

It unpacks the archive at the begining of the experiment and removes the archive.
Then in moving to the workspace's folder.
Exiting it removes the workspace
"""

def __init__(self, archive_path) -> None:
"""Initialize workspace context manager."""
self.cwd = os.getcwd()

self.workspace_name = os.path.basename(archive_path).split('.')[0]
if os.path.exists(self.workspace_name):
shutil.rmtree(self.workspace_name)
os.makedirs(self.workspace_name)
shutil.unpack_archive(archive_path, self.workspace_name)

os.remove(archive_path)

def __enter__(self):
"""Enter workspace context manager."""
os.chdir(f'{self.cwd}/{self.workspace_name}')

def __exit__(self, exc_type, exc_value, traceback):
"""Exit workspace context manager."""
os.chdir(self.cwd)
shutil.rmtree(self.workspace_name)


def split_tensor_dict_into_floats_and_non_floats(tensor_dict):
"""
Split the tensor dictionary into float and non-floating point values.
77 changes: 77 additions & 0 deletions openfl/utilities/workspace.py
@@ -0,0 +1,77 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Workspace utils module."""

import logging
import os
import shutil
import sys
import time
from pathlib import Path
from subprocess import check_call
from sys import executable

logger = logging.getLogger(__name__)


class ExperimentWorkspace:
"""Experiment workspace context manager."""

def __init__(
self,
experiment_name: str,
data_file_path: Path,
is_install_requirements: bool = False

Contributor comment: is_install_requirements -> install_requirements; probably this will look better.

Contributor Author reply: It's a boolean flag, so it would read strangely:

    if self.install_requirements:
        self._install_requirements()

Probably is_install_requirements_required would be better, but too long.

Contributor reply: I also think that requirements would be better. You can still rename it inside __init__.

) -> None:
"""Initialize workspace context manager."""
self.experiment_name = experiment_name
self.data_file_path = data_file_path
self.cwd = os.getcwd()
self.experiment_work_dir = f'{self.cwd}/{self.experiment_name}'
self.is_install_requirements = is_install_requirements

def _install_requirements(self):
"""Install experiment requirements."""
requirements_filename = f'{self.experiment_work_dir}/requirements.txt'

Contributor comment: Maybe better to use pathlib.Path here.

Contributor Author reply: This variable is used in 3 places in this method: some of them convert it to a string, and some would require converting it to a string manually. None of them use the Path type.


if os.path.isfile(requirements_filename):
attempts = 10
for _ in range(attempts):

Contributor comment: @aleksandr-mokrov Shouldn't we raise an exception when all attempts to install the requirements have failed?

try:
check_call([
executable, '-m', 'pip', 'install', '-r', requirements_filename],
shell=False)
except Exception as exc:
logger.error(f'Failed to install requirements: {exc}')
# It's a workaround for cases when collaborators run
# in common virtual environment
time.sleep(5)
else:
break
else:
logger.error('No ' + requirements_filename + ' file found.')

def __enter__(self):
"""Create a collaborator workspace for the experiment."""
if os.path.exists(self.experiment_work_dir):
shutil.rmtree(self.experiment_work_dir)
os.makedirs(self.experiment_work_dir)

shutil.unpack_archive(self.data_file_path, self.experiment_work_dir, format='zip')

if self.is_install_requirements:
self._install_requirements()

os.chdir(self.experiment_work_dir)

# This is needed for python module finder
sys.path.append(self.experiment_work_dir)

def __exit__(self, exc_type, exc_value, traceback):
"""Remove the workspace."""
os.chdir(self.cwd)
shutil.rmtree(self.experiment_name, ignore_errors=True)
if self.experiment_work_dir in sys.path:
sys.path.remove(self.experiment_work_dir)
os.remove(self.data_file_path)
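
Taken together, ExperimentWorkspace gives the director and the envoy one shared lifecycle: on enter it recreates <cwd>/<experiment_name>, unpacks the archive there, optionally pip-installs requirements.txt, then chdirs in and extends sys.path; on exit it restores the original cwd and removes both the workspace and the archive. A minimal usage sketch, where 'demo_experiment' and demo_experiment.zip are made-up names for illustration:

```python
from pathlib import Path

from openfl.utilities.workspace import ExperimentWorkspace

archive = Path('demo_experiment.zip').absolute()
with ExperimentWorkspace('demo_experiment', archive, is_install_requirements=True):
    # cwd is now <original cwd>/demo_experiment and the unpacked workspace
    # is on sys.path; the collaborator or aggregator runs here.
    ...
# On exit the workspace directory and the archive have been removed.
```

One review question above remains open: whether _install_requirements should raise once every pip attempt has failed rather than only logging. A hedged sketch of that alternative (the reviewer's suggestion, not the merged behaviour):

```python
import time
from subprocess import check_call
from sys import executable


def install_requirements_or_fail(requirements_filename, attempts=10, delay=5):
    """Install requirements, raising once every attempt has failed (sketch of the review suggestion)."""
    last_exc = None
    for _ in range(attempts):
        try:
            check_call([executable, '-m', 'pip', 'install', '-r', requirements_filename])
            return
        except Exception as exc:
            last_exc = exc
            time.sleep(delay)  # same workaround as the PR for collaborators sharing a virtual environment
    raise RuntimeError(f'Failed to install requirements from {requirements_filename}') from last_exc
```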