Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Workspace context manager #148

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 15 additions & 24 deletions openfl/component/director/director.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,14 @@

import asyncio
import logging
import os
import shutil
import time
import typing
from collections import defaultdict
from pathlib import Path

from openfl.federated import Plan
from openfl.protocols import director_pb2
from .workspace import AggregatorWorkspace

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,13 +70,12 @@ async def set_new_experiment(
# TODO: save to file
self.experiment_data[experiment_name] = data_file_path

self.create_workspace(experiment_name, data_file_path)
asyncio.create_task(self._run_aggregator(
asyncio.create_task(self._run_aggregator_in_workspace(
experiment_name=experiment_name,
data_file_name=data_file_path,
experiment_sender=sender_name,
initial_tensor_dict=tensor_dict,
experiment_name=experiment_name,
collaborator_names=collaborator_names,
data_file_name=data_file_path
))

logger.info(f'New experiment {experiment_name} for '
Expand Down Expand Up @@ -197,16 +195,16 @@ def get_envoys(self) -> list:

return envoy_infos

@staticmethod
def create_workspace(experiment_name: str, data_file_path: Path):
"""Create the aggregator workspace."""
if os.path.exists(experiment_name):
shutil.rmtree(experiment_name)
os.makedirs(experiment_name)
arch_name = f'{experiment_name}/{experiment_name}' + '.zip'
shutil.copy(data_file_path, arch_name)
shutil.unpack_archive(arch_name, experiment_name)
os.remove(arch_name)
async def _run_aggregator_in_workspace(
        self,
        *,
        experiment_name: str,
        data_file_name: Path,
        **kwargs
) -> None:
    """Set up an aggregator workspace and run the aggregator inside it.

    The workspace context manager unpacks the experiment archive, chdirs
    into it for the duration of the run, and cleans everything up afterwards.
    """
    workspace = AggregatorWorkspace(experiment_name, data_file_name)
    with workspace:
        await self._run_aggregator(experiment_name=experiment_name, **kwargs)

async def _run_aggregator(
self,
Expand All @@ -215,14 +213,10 @@ async def _run_aggregator(
initial_tensor_dict,
experiment_name,
collaborator_names,
data_file_name,
plan_path='plan/plan.yaml'
):
) -> None:
"""Run aggregator."""
cwd = os.getcwd()
os.chdir(f'{cwd}/{experiment_name}')
plan = Plan.parse(plan_config_path=Path(plan_path))

plan.authorized_cols = list(collaborator_names)

logger.info('🧿 Starting the Aggregator Service.')
Expand All @@ -246,9 +240,6 @@ async def _run_aggregator(
except KeyboardInterrupt:
pass
finally:
os.chdir(cwd)
shutil.rmtree(experiment_name)
os.remove(data_file_name)
grpc_server.stop(0)
# Temporary solution to free RAM used by TensorDB
aggregator_server.aggregator.tensor_db.clean_up(0)
42 changes: 42 additions & 0 deletions openfl/component/director/workspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Workspace module."""

import os
import shutil
from pathlib import Path


class AggregatorWorkspace:
    """Aggregator workspace context manager.

    On enter, creates a clean directory named after the experiment, unpacks
    the experiment archive into it and chdirs there. On exit, chdirs back,
    removes the workspace directory and deletes the experiment data file.
    """

    def __init__(self, experiment_name: str, data_file_path: Path) -> None:
        """Initialize workspace context manager.

        Args:
            experiment_name: Name of the experiment; also used as the
                workspace directory name.
            data_file_path: Path to the zipped experiment archive.
        """
        self.experiment_name = experiment_name
        self.data_file_path = data_file_path
        self.cwd = os.getcwd()
        self.experiment_work_dir = f'{self.cwd}/{self.experiment_name}'

    def _get_experiment_data(self) -> None:
        """Copy the experiment archive into the workspace and unpack it."""
        arch_name = f'{self.experiment_name}/{self.experiment_name}.zip'
        shutil.copy(self.data_file_path, arch_name)
        shutil.unpack_archive(arch_name, self.experiment_name)
        # The copied archive is no longer needed once extracted.
        os.remove(arch_name)

    def __enter__(self) -> 'AggregatorWorkspace':
        """Prepare the aggregator workspace and chdir into it."""
        # Start from a clean directory even if a stale workspace exists.
        if os.path.exists(self.experiment_name):
            shutil.rmtree(self.experiment_name)
        os.makedirs(self.experiment_name)

        self._get_experiment_data()

        os.chdir(self.experiment_work_dir)
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Leave and remove the workspace; delete the experiment data file."""
        os.chdir(self.cwd)
        # Best-effort cleanup so a removal failure cannot mask an exception
        # raised inside the workspace (matches the envoy-side workspace).
        shutil.rmtree(self.experiment_name, ignore_errors=True)
        os.remove(self.data_file_path)
21 changes: 6 additions & 15 deletions openfl/component/envoy/envoy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
"""Envoy module."""

import logging
import os
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
Expand All @@ -14,6 +12,7 @@

from openfl.federated import Plan
from openfl.transport.grpc.director_client import ShardDirectorClient
from .workspace import CollaboratorWorkspace

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -47,19 +46,20 @@ def run(self):
while True:
try:
# Workspace import should not be done by gRPC client!
experiment_name = self.director_client.get_experiment_data()
experiment_name = self.director_client.wait_experiment()
data_stream = self.director_client.get_experiment_data(experiment_name)
except Exception as exc:
logger.error(f'Failed to get experiment: {exc}')
time.sleep(DEFAULT_RETRY_TIMEOUT_IN_SECONDS)
continue
self.is_experiment_running = True
try:
self._run_collaborator(experiment_name)
with CollaboratorWorkspace(experiment_name, data_stream):
self._run_collaborator(experiment_name)
except Exception as exc:
logger.error(f'Collaborator failed: {exc}')
finally:
# Workspace cleaning should not be done by gRPC client!
self.director_client.remove_workspace(experiment_name)
self.is_experiment_running = False

def send_health_check(self):
Expand All @@ -75,12 +75,6 @@ def send_health_check(self):

def _run_collaborator(self, experiment_name, plan='plan/plan.yaml',):
"""Run the collaborator for the experiment running."""
cwd = os.getcwd()
os.chdir(f'{cwd}/{experiment_name}') # TODO: probably it should be another way

# This is needed for python module finder
sys.path.append(os.getcwd())

plan = Plan.parse(plan_config_path=Path(plan))

# TODO: Need to restructure data loader config file loader
Expand All @@ -89,10 +83,7 @@ def _run_collaborator(self, experiment_name, plan='plan/plan.yaml',):

col = plan.get_collaborator(self.name, self.root_ca, self.key,
self.cert, shard_descriptor=self.shard_descriptor)
try:
col.run()
finally:
os.chdir(cwd)
col.run()

def start(self):
"""Start the envoy."""
Expand Down
79 changes: 79 additions & 0 deletions openfl/component/envoy/workspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright (C) 2020-2021 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""Workspace module."""

import logging
import os
import shutil
import sys
import time
from subprocess import check_call
from sys import executable

logger = logging.getLogger(__name__)


class CollaboratorWorkspace:
    """Collaborator workspace context manager.

    On enter, creates a clean directory named after the experiment, downloads
    and unpacks the experiment archive, installs the experiment requirements
    and chdirs into the workspace. On exit, chdirs back and removes the
    workspace.
    """

    def __init__(self, experiment_name, data_stream):
        """Initialize workspace context manager.

        Args:
            experiment_name: Name of the experiment; also used as the
                workspace directory name.
            data_stream: Iterable of gRPC responses carrying the zipped
                experiment archive (each response has `size` and `npbytes`).
        """
        self.experiment_name = experiment_name
        self.data_stream = data_stream
        self.cwd = os.getcwd()
        self.experiment_work_dir = f'{self.cwd}/{self.experiment_name}'

    def _get_experiment_data(self):
        """Download the experiment archive and unpack it into the workspace."""
        arch_name = f'{self.experiment_name}/{self.experiment_name}.zip'
        with open(arch_name, 'wb') as content_file:
            for response in self.data_stream:
                # A size mismatch means the chunk was truncated in transit.
                if response.size == len(response.npbytes):
                    content_file.write(response.npbytes)
                else:
                    raise Exception('Broken archive')
        shutil.unpack_archive(arch_name, self.experiment_name)
        # The downloaded archive is no longer needed once extracted.
        os.remove(arch_name)

    def _install_requirements(self):
        """Install experiment requirements, retrying on transient failures."""
        requirements_filename = f'./{self.experiment_name}/requirements.txt'

        if os.path.isfile(requirements_filename):
            attempts = 10
            for _ in range(attempts):
                try:
                    check_call([
                        executable, '-m', 'pip', 'install', '-r', requirements_filename],
                        shell=False)
                except Exception as exc:
                    logger.error(f'Failed to install requirements: {exc}')
                    # It's a workaround for cases when collaborators run
                    # in common virtual environment
                    time.sleep(5)
                else:
                    break
        else:
            logger.error(f'No {requirements_filename} file found.')

    def __enter__(self):
        """Create a collaborator workspace for the experiment."""
        # Start from a clean directory even if a stale workspace exists.
        if os.path.exists(self.experiment_name):
            shutil.rmtree(self.experiment_name)
        os.makedirs(self.experiment_name)

        self._get_experiment_data()
        self._install_requirements()

        os.chdir(self.experiment_work_dir)

        # This is needed for python module finder
        sys.path.append(self.experiment_work_dir)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave and remove the workspace; undo the sys.path change."""
        os.chdir(self.cwd)
        shutil.rmtree(self.experiment_name, ignore_errors=True)
        if self.experiment_work_dir in sys.path:
            sys.path.remove(self.experiment_work_dir)
6 changes: 2 additions & 4 deletions openfl/interface/interactive_api/experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -296,10 +296,8 @@ def _prepare_plan(self, model_provider, task_keeper, data_loader,
# TaskRunner framework plugin
# ['required_plugin_components'] should be already in the default plan with all the fields
# filled with the default values
plan.config['task_runner'] = {
'required_plugin_components': {
'framework_adapters': model_provider.framework_plugin
}
plan.config['task_runner']['required_plugin_components'] = {
'framework_adapters': model_provider.framework_plugin
}

# API layer
Expand Down
62 changes: 9 additions & 53 deletions openfl/transport/grpc/director_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@
"""Director clients module."""

import logging
import os
import shutil
import time
from datetime import datetime
from subprocess import check_call
from sys import executable

import grpc

Expand Down Expand Up @@ -69,67 +64,28 @@ def report_shard_info(self, shard_descriptor) -> bool:
acknowledgement = self.stub.AcknowledgeShard(shard_info)
return acknowledgement.accepted

def get_experiment_data(self):
"""Get an experiment data from the director."""
def wait_experiment(self):
    """Block until the director announces an experiment; return its name."""
    logger.info('Send WaitExperiment request')
    response_stream = self.stub.WaitExperiment(self._get_experiment_data())
    logger.info('WaitExperiment response has received')
    experiment_name = next(response_stream).experiment_name
    if not experiment_name:
        raise Exception('No experiment')

    return experiment_name

def get_experiment_data(self, experiment_name):
    """Request the experiment data archive stream from the director."""
    logger.info(f'Request experiment {experiment_name}')
    request = director_pb2.GetExperimentDataRequest(
        experiment_name=experiment_name,
        collaborator_name=self.shard_name
    )
    return self.stub.GetExperimentData(request)

def _get_experiment_data(self):
"""Generate the experiment data request."""
Expand Down