Merge pull request #169 from zuoxingdong/step_info_trajectory
newly implement run_experiment
zuoxingdong committed May 8, 2019
2 parents 7f0253c + 3706f82 commit 79107a8
Showing 8 changed files with 196 additions and 109 deletions.
12 changes: 0 additions & 12 deletions docs/source/experiment.rst
@@ -18,18 +18,6 @@ Config

.. autoclass:: Config
:members:

Experiment Master
-----------------

.. autoclass:: ExperimentMaster
:members:

Experiment Worker
-----------------

.. autoclass:: ExperimentWorker
:members:

Run experiment
--------------
3 changes: 0 additions & 3 deletions lagom/experiment/__init__.py
@@ -3,7 +3,4 @@
from .config import Condition
from .config import Config

from .experiment_master import ExperimentMaster
from .experiment_worker import ExperimentWorker

from .run_experiment import run_experiment
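With the master/worker classes removed, the public import surface shrinks accordingly. As a reading aid, a minimal sketch of the post-change imports, based on the names kept in this `__init__.py` and those used in the test file below (`Grid` and `Sample` are hyperparameter conditions):

    # names exported by lagom.experiment after this commit (per this diff)
    from lagom.experiment import Condition, Config, Grid, Sample
    from lagom.experiment import run_experiment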
136 changes: 87 additions & 49 deletions lagom/experiment/run_experiment.py
@@ -1,62 +1,71 @@
import os
from shutil import rmtree
from shutil import copyfile
from pathlib import Path
import inspect
from itertools import product
from concurrent.futures import ProcessPoolExecutor

import torch
from lagom.utils import pickle_dump
from lagom.utils import yaml_dump
from lagom.utils import ask_yes_or_no
from lagom.utils import timeit

from .experiment_master import ExperimentMaster
from .experiment_worker import ExperimentWorker
from lagom.utils import CloudpickleWrapper


@timeit(color='green', attribute='bold')
def run_experiment(run, config, seeds, num_worker):
r"""A convenient function to launch a parallelized experiment (Master-Worker).
.. note::
def run_experiment(run, config, seeds, log_dir, max_workers, chunksize=1, use_gpu=False, gpu_ids=None):
r"""A convenient function to parallelize the experiment (master-worker pipeline).
It automatically creates all subfolders for logging the experiment. The topmost
folder is indicated by the logging directory specified in the configuration.
Then all subfolders for each configuration are created with the name of their ID.
Finally, under each configuration subfolder, a set of subfolders is created for each
random seed (the random seed as folder name). Intuitively, an experiment could have
the following directory structure::
- logs
- 0 # ID number
- 123 # random seed
- 345
- 567
- 1
- 123
- 345
- 567
- 2
- 123
- 345
- 567
- 3
- 123
- 345
- 567
- 4
- 123
- 345
- 567
It is implemented using `concurrent.futures.ProcessPoolExecutor`.
It automatically creates a subfolder for each pair of configuration and random seed
to store the logs of the experiment. The root folder is given by the user.
A subfolder is created for each configuration, named by its job ID.
Under each configuration subfolder, a set of subfolders is created, one per
random seed (the random seed as folder name). Intuitively, an experiment could have
the following directory structure::
- logs
- 0 # ID number
- 123 # random seed
- 345
- 567
- 1
- 123
- 345
- 567
- 2
- 123
- 345
- 567
- 3
- 123
- 345
- 567
- 4
- 123
- 345
- 567
Args:
run (function): a function implementing the algorithm to train.
run (function): a function that defines an algorithm; it must take the
arguments `(config, seed, device, logdir)`
config (Config): a :class:`Config` object defining all configuration settings
seeds (list): a list of random seeds
num_worker (int): number of workers
log_dir (str): path to the root directory for storing logs.
max_workers (int): maximum number of worker processes, passed to `ProcessPoolExecutor`.
chunksize (int): chunk size for `Executor.map()`.
use_gpu (bool): if `True`, then use CUDA. Otherwise, use CPU.
gpu_ids (list): if `None`, then use all available GPUs. Otherwise, only use the
GPU devices given in the list.
"""
experiment = ExperimentMaster(ExperimentWorker, num_worker, run, config, seeds)
configs = config.make_configs()

log_path = Path(experiment.configs[0]['log.dir'])
# create logging dir
log_path = Path(log_dir)
if not log_path.exists():
log_path.mkdir(parents=True)
else:
@@ -71,22 +80,51 @@ def run_experiment(run, config, seeds, num_worker):
log_path.mkdir(parents=True)
print(f"The old logging directory is renamed to '{old_log_path.absolute()}'. ")
input('Please press Enter to continue\n>>> ')

# save source files
source_path = Path(log_path / 'source_files/')
source_path.mkdir(parents=True)
[copyfile(s, source_path / s.name) for s in Path(inspect.getsourcefile(run)).parent.glob('*.py')]

# Create a subfolder for each ID, with nested subfolders for each random seed
for config in experiment.configs:
for config in configs:
ID = config['ID']
for seed in experiment.seeds:
for seed in seeds:
p = log_path / f'{ID}' / f'{seed}'
p.mkdir(parents=True)
yaml_dump(obj=config, f=log_path/f'{ID}'/'config', ext='.yml')

pickle_dump(experiment.configs, log_path / 'configs', ext='.pkl')
yaml_dump(obj=config, f=log_path / f'{ID}' / 'config', ext='.yml')

pickle_dump(configs, log_path / 'configs', ext='.pkl')

def _run(args):
config, seed = args
# Very important: limit each worker to a single thread, otherwise the
# processes can get stuck due to thread oversubscription. See:
# https://github.com/pytorch/pytorch/issues/19163
# https://software.intel.com/en-us/intel-threading-building-blocks-openmp-or-native-threads
torch.set_num_threads(1)
if use_gpu:
num_gpu = torch.cuda.device_count()
if gpu_ids is None: # use all GPUs
device_id = config['ID'] % num_gpu
else:
assert all(0 <= i < num_gpu for i in gpu_ids)
device_id = gpu_ids[config['ID'] % len(gpu_ids)]
torch.cuda.set_device(device_id)
device = torch.device(f'cuda:{device_id}')
else:
device = torch.device('cpu')

# Run experiment in parallel
results = experiment()
print(f'@ Experiment: Job ID ({config["ID"]}), PID ({os.getpid()}), seed ({seed}), device ({device})')
print('#'*50)
[print(f'# {key}: {value}') for key, value in config.items()]
print('#'*50)

logdir = log_path / f'{config["ID"]}' / f'{seed}'
result = run(config, seed, device, logdir)
return result

with ProcessPoolExecutor(max_workers=max_workers) as executor:
args = list(product(configs, seeds))
results = list(executor.map(CloudpickleWrapper(_run), args, chunksize=chunksize))
return results
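For orientation, a minimal sketch of calling the new entry point; the function name `my_run` and the hyperparameter key are hypothetical, but the four-argument `run` signature and the `run_experiment` call follow the docstring above and the test below:

    from lagom.experiment import Config, Grid, run_experiment

    def my_run(config, seed, device, logdir):
        # hypothetical algorithm: train with config['network.lr'] on `device`,
        # write outputs under `logdir`
        return config['ID'], seed

    config = Config({'network.lr': Grid([1e-3, 5e-3])}, num_sample=1, keep_dict_order=True)
    results = run_experiment(my_run, config, seeds=[1, 2, 3], log_dir='logs',
                             max_workers=4, chunksize=1, use_gpu=False, gpu_ids=None)

One design note: `ProcessPoolExecutor.map` has to pickle its callable, and the standard `pickle` module cannot serialize `_run`, a closure defined inside `run_experiment`; wrapping it in `CloudpickleWrapper` is what lets it be shipped to the worker processes.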
4 changes: 4 additions & 0 deletions lagom/runner/trajectory.py
Expand Up @@ -63,6 +63,10 @@ def numpy_dones(self):
def numpy_masks(self):
return 1. - self.numpy_dones

@property
def infos(self):
return [step_info.info for step_info in self.step_infos]

def get_all_info(self, key):
return [step_info[key] for step_info in self.step_infos]
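A brief sketch of the new property in use, assuming a collected trajectory object `traj` (hypothetical usage); it is simply shorthand for extracting the raw `info` dict from every step:

    # traj.infos is equivalent to [s.info for s in traj.step_infos]
    for info in traj.infos:
        print(info)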

File renamed without changes.
File renamed without changes.
92 changes: 92 additions & 0 deletions legacy/run_experiment.py
@@ -0,0 +1,92 @@
from shutil import rmtree
from shutil import copyfile
from pathlib import Path
import inspect

from lagom.utils import pickle_dump
from lagom.utils import yaml_dump
from lagom.utils import ask_yes_or_no
from lagom.utils import timeit

from .experiment_master import ExperimentMaster
from .experiment_worker import ExperimentWorker


@timeit(color='green', attribute='bold')
def run_experiment(run, config, seeds, num_worker):
r"""A convenient function to launch a parallelized experiment (Master-Worker).
.. note::
It automatically creates all subfolders for logging the experiment. The topmost
folder is indicated by the logging directory specified in the configuration.
Then all subfolders for each configuration are created with the name of their ID.
Finally, under each configuration subfolder, a set of subfolders is created for each
random seed (the random seed as folder name). Intuitively, an experiment could have
the following directory structure::
- logs
- 0 # ID number
- 123 # random seed
- 345
- 567
- 1
- 123
- 345
- 567
- 2
- 123
- 345
- 567
- 3
- 123
- 345
- 567
- 4
- 123
- 345
- 567
Args:
run (function): a function implementing the algorithm to train.
config (Config): a :class:`Config` object defining all configuration settings
seeds (list): a list of random seeds
num_worker (int): number of workers
"""
experiment = ExperimentMaster(ExperimentWorker, num_worker, run, config, seeds)

log_path = Path(experiment.configs[0]['log.dir'])
if not log_path.exists():
log_path.mkdir(parents=True)
else:
msg = f"Logging directory '{log_path.absolute()}' already existed, do you want to clean it ?"
answer = ask_yes_or_no(msg)
if answer:
rmtree(log_path)
log_path.mkdir(parents=True)
else: # back up
old_log_path = log_path.with_name('old_' + log_path.name)
log_path.rename(old_log_path)
log_path.mkdir(parents=True)
print(f"The old logging directory is renamed to '{old_log_path.absolute()}'. ")
input('Please press Enter to continue\n>>> ')

# save source files
source_path = Path(log_path / 'source_files/')
source_path.mkdir(parents=True)
[copyfile(s, source_path / s.name) for s in Path(inspect.getsourcefile(run)).parent.glob('*.py')]

# Create a subfolder for each ID, with nested subfolders for each random seed
for config in experiment.configs:
ID = config['ID']
for seed in experiment.seeds:
p = log_path / f'{ID}' / f'{seed}'
p.mkdir(parents=True)
yaml_dump(obj=config, f=log_path/f'{ID}'/'config', ext='.yml')

pickle_dump(experiment.configs, log_path / 'configs', ext='.pkl')

# Run experiment in parallel
results = experiment()
return results
58 changes: 13 additions & 45 deletions test/test_experiment.py
@@ -9,8 +9,6 @@
from lagom.experiment import Sample
from lagom.experiment import Condition
from lagom.experiment import Config
from lagom.experiment import ExperimentWorker
from lagom.experiment import ExperimentMaster
from lagom.experiment import run_experiment


@@ -115,63 +113,33 @@ def test_config(num_sample, keep_dict_order):
assert list_config[ID]['ID'] == ID


def run(config, seed, device):
return config['ID'], seed


@pytest.mark.parametrize('num_sample', [1, 5])
@pytest.mark.parametrize('num_worker', [1, 3, 4, 7])
def test_experiment(num_sample, num_worker):
config = Config({'log.dir': 'some path',
'network.lr': Grid([1e-3, 5e-3]),
'network.size': [32, 16],
'env.id': Grid(['CartPole-v1', 'Ant-v2'])},
num_sample=num_sample,
keep_dict_order=True)
experiment = ExperimentMaster(ExperimentWorker, num_worker=num_worker, run=run, config=config, seeds=[1, 2, 3])
assert len(experiment.configs) == 4
assert experiment.num_worker == num_worker
@pytest.mark.parametrize('max_workers', [1, 5, 100])
@pytest.mark.parametrize('chunksize', [1, 7, 40])
def test_run_experiment(num_sample, max_workers, chunksize):
def run(config, seed, device, logdir):
return config['ID'], seed, device, logdir

tasks = experiment.make_tasks()
assert len(tasks) == 4*3
for task in tasks:
assert isinstance(task[0], dict) and list(task[0].keys()) == ['ID'] + list(config.items.keys())
assert task[1] in [1, 2, 3]
assert task[2] is run

results = experiment()
assert len(results) == 4*3
for i in range(0, 4*3, 3):
assert results[i][0] == i//3
assert results[i+1][0] == i//3
assert results[i+2][0] == i//3

assert results[i][1] == 1
assert results[i+1][1] == 2
assert results[i+2][1] == 3


@pytest.mark.parametrize('num_sample', [1, 5])
@pytest.mark.parametrize('num_worker', [1, 3, 4, 7])
def test_run_experiment(num_sample, num_worker):
config = Config({'log.dir': 'some path',
'network.lr': Grid([1e-3, 5e-3]),
config = Config({'network.lr': Grid([1e-3, 5e-3]),
'network.size': [32, 16],
'env.id': Grid(['CartPole-v1', 'Ant-v2'])},
num_sample=num_sample,
keep_dict_order=True)
seeds = [1, 2, 3]
run_experiment(run, config, seeds, num_worker)

p = Path('./some path')
log_dir = './some_path'
run_experiment(run, config, seeds, log_dir, max_workers, chunksize, use_gpu=False, gpu_ids=None)

p = Path('./some_path')
assert p.exists()
assert (p / 'configs.pkl').exists()
assert (p / 'source_files').exists() and (p / 'source_files').is_dir()
# Check all configuration folders with their IDs and subfolders for all random seeds
for i in range(4):
config_p = p / str(i)
assert config_p.exists()
assert (config_p / 'config.yml').exists()
for seed in seeds:
assert (config_p / str(seed)).exists
assert (config_p / str(seed)).exists()
# Clean the logging directory
rmtree(p)
# Test remove
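As a reading aid: the `range(4)` above follows from the cross product of the two `Grid` conditions (2 learning rates x 2 environment IDs = 4 configurations). A sketch of the expected expansion, assuming `make_configs` enumerates the product in order:

    configs = config.make_configs()
    assert len(configs) == 4                       # 2 lrs x 2 env ids
    assert [c['ID'] for c in configs] == [0, 1, 2, 3]
    # with seeds [1, 2, 3], run_experiment launches 4 x 3 = 12 jobs in total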
