Skip to content

Commit

Permalink
Runner - support 'pattern' in 'mpi' mode to run tasks in parallel (#430)
Browse files Browse the repository at this point in the history
* add mpi-parallels mode

* update according to comments

* fix and update doc

* update

* merge into 'mpi' mode

* udpate according to comments

* fix testcases

* fix ansible

* regard pattern as field

* udpate

* fix flake8 version

* add flake8 range

* remove map-by from host config

* udpate comments
  • Loading branch information
RyoYang committed Nov 29, 2022
1 parent 3c97381 commit e4eeda0
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 9 deletions.
10 changes: 10 additions & 0 deletions docs/superbench-config.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,7 @@ Some attributes may only be suitable for specific mode.
| `env` ||||
| `mca` ||||
| `parallel` ||||
| `pattern` ||||

* accepted values: `local | torch.distributed | mpi`
* default value: `local`
Expand Down Expand Up @@ -454,3 +455,12 @@ Whether run benchmarks in parallel (all ranks at the same time) or in sequence (
Only available for `local` mode.

* default value: `yes`

### `pattern`

Pattern variables to run benchmarks with nodes in specified traffic pattern combination, in a flatten key-value dictionary.
Only available for `mpi` mode.

Available variables in formatted string includes:
+ `name`
* accepted values: `all-nodes`
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def run(self):
'test': [
'flake8-docstrings>=1.5.0',
'flake8-quotes>=3.2.0',
'flake8>=3.8.4',
'flake8>=3.8.4, <6.0.0',
'mypy>=0.800',
'pydocstyle>=5.1.1',
'pytest-cov>=2.11.1',
Expand Down
2 changes: 2 additions & 0 deletions superbench/common/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from superbench.common.utils.lazy_import import LazyImport
from superbench.common.utils.process import run_command
from superbench.common.utils.topo_aware import gen_topo_aware_config
from superbench.common.utils.gen_traffic_pattern_config import gen_tarffic_pattern_host_group

device_manager = LazyImport('superbench.common.utils.device_manager')

Expand All @@ -24,4 +25,5 @@
'rotate_dir',
'run_command',
'gen_topo_aware_config',
'gen_tarffic_pattern_host_group',
]
65 changes: 65 additions & 0 deletions superbench/common/utils/gen_traffic_pattern_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Utilities for traffic pattern config."""
from superbench.common.utils import logger


def gen_all_nodes_config(n):
"""Generate all nodes config.
Args:
n (int): the number of participants.
Returns:
config (list): the generated config list, each item in the list is a str like "0,1,2,3".
"""
config = []
if n <= 0:
logger.warning('n is not positive')
return config
config = [','.join(map(str, range(n)))]
return config


def __convert_config_to_host_group(config, host_list):
"""Convert config format to host node.
Args:
host_list (list): the list of hostnames read from hostfile.
config (list): the traffic pattern config.
Returns:
host_groups (list): the host groups converted from traffic pattern config.
"""
host_groups = []
for item in config:
groups = item.strip().strip(';').split(';')
host_group = []
for group in groups:
hosts = []
for index in group.split(','):
hosts.append(host_list[int(index)])
host_group.append(hosts)
host_groups.append(host_group)
return host_groups


def gen_tarffic_pattern_host_group(host_list, pattern):
"""Generate host group from specified traffic pattern.
Args:
host_list (list): the list of hostnames read from hostfile.
pattern (DictConfig): the mpi pattern dict.
Returns:
host_group (list): the host group generated from traffic pattern.
"""
config = []
n = len(host_list)
if pattern.name == 'all-nodes':
config = gen_all_nodes_config(n)
else:
logger.error('Unsupported traffic pattern: {}'.format(pattern.name))
host_group = __convert_config_to_host_group(config, host_list)
return host_group
9 changes: 8 additions & 1 deletion superbench/runner/playbooks/check_env.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,16 @@
- '{{ workspace }}/sb.env'
- /tmp/sb.env
become: yes
- name: Updating Hostfile
- name: Updating Hostfile to Remote
copy:
content: "{{ sb_nodes | join('\n') }}\n"
dest: '{{ workspace }}/hostfile'
mode: 0644
become: yes
- name: Generating Hostfile to Local
delegate_to: localhost
run_once: true
copy:
content: "{{ sb_nodes | join('\n') }}\n"
dest: '{{ output_dir }}/hostfile'
mode: 0644
26 changes: 21 additions & 5 deletions superbench/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from joblib import Parallel, delayed
from omegaconf import ListConfig, OmegaConf

from superbench.common.utils import SuperBenchLogger, logger
from superbench.common.utils import SuperBenchLogger, logger, gen_tarffic_pattern_host_group
from superbench.runner.ansible import AnsibleClient
from superbench.benchmarks import ReduceType, Reducer
from superbench.monitor import MonitorRecord
Expand Down Expand Up @@ -109,6 +109,7 @@ def __get_mode_command(self, benchmark_name, mode, timeout=None):
benchmark_name (str): Benchmark name.
mode (DictConfig): Runner mode.
timeout (int): The timeout value in seconds.
host_list (list): The specified Host node list.
Return:
str: Runner command.
Expand Down Expand Up @@ -143,12 +144,13 @@ def __get_mode_command(self, benchmark_name, mode, timeout=None):
'mpirun ' # use default OpenMPI in image
'-tag-output ' # tag mpi output with [jobid,rank]<stdout/stderr> prefix
'-allow-run-as-root ' # allow mpirun to run when executed by root user
'{host_list} ' # use prepared hostfile and launch {proc_num} processes on each node
'{host_list} ' # use prepared hostfile or specify nodes and launch {proc_num} processes on each node
'-bind-to numa ' # bind processes to numa
'{mca_list} {env_list} {command}'
).format(
host_list=f'-host localhost:{mode.proc_num}'
if mode.node_num == 1 else f'-hostfile hostfile -map-by ppr:{mode.proc_num}:node',
host_list=f'-host localhost:{mode.proc_num}' if mode.node_num == 1 else
f'-hostfile hostfile -map-by ppr:{mode.proc_num}:node' if mode.host_list is None else '-host ' +
','.join(f'{host}:{mode.proc_num}' for host in mode.host_list),
mca_list=' '.join(f'-mca {k} {v}' for k, v in mode.mca.items()),
env_list=' '.join(
f'-x {k}={str(v).format(proc_rank=mode.proc_rank, proc_num=mode.proc_num)}'
Expand Down Expand Up @@ -444,7 +446,21 @@ def run(self):
)
ansible_rc = sum(rc_list)
elif mode.name == 'torch.distributed' or mode.name == 'mpi':
ansible_rc = self._run_proc(benchmark_name, mode, {'proc_rank': 0})
if not mode.pattern:
ansible_rc = self._run_proc(benchmark_name, mode, {'proc_rank': 0})
else:
with open(self._output_path / 'hostfile', 'r') as f:
host_list = f.read().splitlines()
pattern_hostx = gen_tarffic_pattern_host_group(host_list, mode.pattern)
for host_groups in pattern_hostx:
para_rc_list = Parallel(n_jobs=len(host_groups))(
delayed(self._run_proc)
(benchmark_name, mode, vars={
'proc_rank': 0,
'host_list': host_group,
}) for host_group in host_groups
)
ansible_rc = ansible_rc + sum(para_rc_list)
else:
logger.warning('Unknown mode %s.', mode.name)
if ansible_rc != 0:
Expand Down
30 changes: 30 additions & 0 deletions tests/common/test_gen_tarffic_pattern_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

"""Tests for traffic pattern config generation module."""
import argparse
import unittest

from superbench.common.utils import gen_tarffic_pattern_host_group


class GenConfigTest(unittest.TestCase):
"""Test the utils for generating config."""
def test_gen_tarffic_pattern_host_group(self):
"""Test the function of generating traffic pattern config from specified mode."""
# test under 8 nodes
hostx = ['node0', 'node1', 'node2', 'node3', 'node4', 'node5', 'node6', 'node7']
parser = argparse.ArgumentParser(
add_help=False,
usage=argparse.SUPPRESS,
allow_abbrev=False,
)
parser.add_argument(
'--name',
type=str,
default='all-nodes',
required=False,
)
pattern, _ = parser.parse_known_args()
expected_host_group = [[['node0', 'node1', 'node2', 'node3', 'node4', 'node5', 'node6', 'node7']]]
self.assertEqual(gen_tarffic_pattern_host_group(hostx, pattern), expected_host_group)
36 changes: 34 additions & 2 deletions tests/runner/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,12 +196,39 @@ def test_get_mode_command(self):
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo'
),
},
{
'benchmark_name':
'foo',
'mode': {
'name': 'mpi',
'proc_num': 8,
'proc_rank': 1,
'mca': {},
'pattern': {
'name': 'all-nodes',
},
'env': {
'PATH': None,
'LD_LIBRARY_PATH': None,
},
},
'expected_command': (
'mpirun -tag-output -allow-run-as-root -host node0:8,node1:8 -bind-to numa '
' -x PATH -x LD_LIBRARY_PATH '
f'sb exec --output-dir {self.sb_output_dir} -c sb.config.yaml -C superbench.enable=foo'
),
},
]

for test_case in test_cases:
with self.subTest(msg='Testing with case', test_case=test_case):
mode = OmegaConf.create(test_case['mode'])
if 'pattern' in test_case['mode']:
mode.update({'host_list': ['node0', 'node1']})
self.assertEqual(
self.runner._SuperBenchRunner__get_mode_command(
test_case['benchmark_name'], OmegaConf.create(test_case['mode'])
test_case['benchmark_name'],
mode,
), test_case['expected_command']
)

Expand All @@ -210,9 +237,14 @@ def test_get_mode_command(self):
index = test_case['expected_command'].find('sb exec')
expected_command = test_case['expected_command'][:index] + timeout_str + test_case['expected_command'][
index:]
mode = OmegaConf.create(test_case['mode'])
if 'pattern' in test_case['mode']:
mode.update({'host_list': ['node0', 'node1']})
self.assertEqual(
self.runner._SuperBenchRunner__get_mode_command(
test_case['benchmark_name'], OmegaConf.create(test_case['mode']), test_case['timeout']
test_case['benchmark_name'],
mode,
test_case['timeout'],
), expected_command
)

Expand Down

0 comments on commit e4eeda0

Please sign in to comment.