Skip to content

Commit

Permalink
Improve concurrency control for command pools
Browse files Browse the repository at this point in the history
Previously there was only CommandPool.concurrency to control *how many*
commands were allowed to run concurrently; now the caller can control
*which* commands are allowed to run concurrently (using the two new
properties ExternalCommand.{dependencies,group_by}).
  • Loading branch information
xolox committed Jul 9, 2016
1 parent 6f8f0b9 commit f153d8a
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 8 deletions.
37 changes: 36 additions & 1 deletion executor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
unicode = str

# Semi-standard module versioning.
__version__ = '12.0'
__version__ = '13.0'

# Initialize a logger.
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -472,6 +472,23 @@ def decoded_stderr(self):
if value is not None:
return value.decode(self.encoding)

@writable_property(cached=True)
def dependencies(self):
    """
    Commands that have to finish first (a list of :class:`ExternalCommand` objects).

    Setting :attr:`dependencies` gives callers low level concurrency
    control in command pools, because it imposes an order of execution:

    - A command pool will not start a command before the
      :attr:`~.ExternalCommand.is_finished` property of every command in
      its :attr:`~.ExternalCommand.dependencies` is :data:`True`.

    - An empty :attr:`dependencies` list has no effect, in which case
      concurrency is controlled by :attr:`group_by` and
      :attr:`~.CommandPool.concurrency`.

    The computed default is an empty list.
    """
    return list()

@mutable_property
def directory(self):
"""
Expand Down Expand Up @@ -580,6 +597,24 @@ def finish_event(self):
pools.
"""

@mutable_property
def group_by(self):
    """
    Identifier of the group the external command belongs to (any hashable value).

    Setting :attr:`group_by` gives callers high level concurrency control
    in command pools: it decides which commands may run concurrently and
    which have to run serially:

    - Of all the commands that share the same :attr:`group_by` value
      (other than :data:`None`) a command pool will never have more than
      one running at the same time.

    - A :attr:`group_by` value of :data:`None` has no effect, in which
      case concurrency is controlled by :attr:`dependencies` and
      :attr:`~.CommandPool.concurrency`.
    """

@property
def have_superuser_privileges(self):
"""
Expand Down
53 changes: 46 additions & 7 deletions executor/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
The :mod:`executor.concurrent` module defines the :class:`CommandPool` class
which makes it easy to prepare a large number of external commands, group them
together in a pool, start executing a configurable number of external commands
simultaneously and wait for all external commands to finish.
simultaneously and wait for all external commands to finish. For fine grained
concurrency control please refer to the :attr:`~.ExternalCommand.dependencies`
and :attr:`~.ExternalCommand.group_by` properties of the
:class:`.ExternalCommand` class.
"""

# Standard library modules.
Expand Down Expand Up @@ -138,6 +141,20 @@ def num_running(self):
"""The number of currently running commands in the pool (an integer)."""
return sum(cmd.is_running for id, cmd in self.commands)

@property
def running_groups(self):
    """
    The command groups that currently have a running command (a :class:`set`).

    The set is built from the :attr:`~ExternalCommand.group_by` values of
    the commands in the pool whose ``is_running`` property is true.
    Commands whose :attr:`~ExternalCommand.group_by` is :data:`None` don't
    belong to a group, so :data:`None` is never part of the set.
    """
    active_groups = set()
    # Each entry of self.commands is a pair; only the command object (the
    # second element) is relevant here.
    for _, command in self.commands:
        if command.is_running and command.group_by is not None:
            active_groups.add(command.group_by)
    return active_groups

@property
def results(self):
"""
Expand Down Expand Up @@ -290,14 +307,36 @@ def spawn(self):
:returns: The number of external commands that were spawned by this
invocation of :func:`spawn()` (an integer).
The commands to start are picked according to three criteria:
1. The command's :attr:`~.ExternalCommand.was_started` property is
:data:`False`.
2. The command's :attr:`~.ExternalCommand.group_by` value is not
present in :attr:`running_groups`.
3. The :attr:`~.ExternalCommand.is_finished` properties of all of the
command's :attr:`~.ExternalCommand.dependencies` are :data:`True`.
"""
num_started = 0
todo = [cmd for id, cmd in self.commands if not cmd.was_started]
while todo and self.num_running < self.concurrency:
cmd = todo.pop(0)
cmd.start()
num_started += 1
if num_started >= 1:
limit = self.concurrency - self.num_running
if limit > 0:
running_groups = self.running_groups
for id, cmd in self.commands:
# Skip commands that have already been started.
if not cmd.was_started:
# If command groups are being used we'll only
# allow one running command per command group.
if cmd.group_by not in running_groups:
# If a command has any dependencies we won't allow it
# to start until all of its dependencies have finished.
if all(dependency.is_finished for dependency in cmd.dependencies):
cmd.start()
num_started += 1
if cmd.group_by is not None:
running_groups.add(cmd.group_by)
if num_started == limit:
break
if num_started > 0:
logger.debug("Spawned %s ..", pluralize(num_started, "external command"))
return num_started

Expand Down
30 changes: 30 additions & 0 deletions executor/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,36 @@ def test_command_pool_logs_directory(self):
contents = handle.read()
assert filename == ('%s.log' % contents.strip())

def test_concurrency_control_with_groups(self):
    """Verify that ``group_by`` limits a command pool to one running command per group."""
    pool = CommandPool(concurrency=10)
    # Queue ten commands in each of the two groups (all of 'group-a'
    # first, followed by all of 'group-b').
    for group_name in ('group-a', 'group-b'):
        for _ in range(10):
            pool.add(ExternalCommand('sleep 0.1', group_by=group_name))
    while not pool.is_finished:
        pool.spawn()
        # The pool's concurrency of ten would allow more, but with just
        # two command groups there should never be more than two
        # commands running at the same time.
        assert pool.num_running <= 2
        pool.collect()

def test_concurrency_control_with_dependencies(self):
    """Verify that ``dependencies`` makes a command pool wait for earlier commands."""
    pool = CommandPool(concurrency=10)
    # Three waves of five commands: every command in a wave depends on
    # all of the commands in the wave before it.
    first_wave = [ExternalCommand('sleep 0.1') for _ in range(5)]
    second_wave = [ExternalCommand('sleep 0.1', dependencies=first_wave) for _ in range(5)]
    third_wave = [ExternalCommand('sleep 0.1', dependencies=second_wave) for _ in range(5)]
    for command in first_wave + second_wave + third_wave:
        pool.add(command)
    while not pool.is_finished:
        pool.spawn()
        # The pool's concurrency of ten would allow two waves at once, but
        # the dependencies should restrict the running commands to a
        # single wave of five.
        assert pool.num_running <= 5
        pool.collect()

def test_ssh_user_at_host(self):
"""Make sure a username can be injected via an SSH alias."""
cmd = RemoteCommand('root@host', 'true')
Expand Down

0 comments on commit f153d8a

Please sign in to comment.