Skip to content

Commit

Permalink
Implement work-stealing scheduler (#862)
Browse files Browse the repository at this point in the history
Closes #858
  • Loading branch information
amezin committed Jan 11, 2023
1 parent 9b0b5b1 commit d1dfad3
Show file tree
Hide file tree
Showing 10 changed files with 638 additions and 26 deletions.
1 change: 1 addition & 0 deletions changelog/858.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
New ``worksteal`` scheduler, based on the idea of `work stealing <https://en.wikipedia.org/wiki/Work_stealing>`_. It is similar to the ``load`` scheduler, but it should handle tests with significantly differing durations better and, at the same time, provide similar or better reuse of fixtures.
9 changes: 9 additions & 0 deletions docs/distribution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,13 @@ The test distribution algorithm is configured with the ``--dist`` command-line o
This will make sure ``test1`` and ``TestA::test2`` will run in the same worker.
Tests without the ``xdist_group`` mark are distributed normally as in the ``--dist=load`` mode.

* ``--dist worksteal``: Initially, tests are distributed evenly among all
available workers. When a worker completes most of its assigned tests and
doesn't have enough tests to continue (currently, every worker needs at least
two tests in its queue), an attempt is made to reassign ("steal") a portion
of tests from some other worker's queue. The results should be similar to
the ``load`` method, but ``worksteal`` should handle tests with significantly
differing durations better and, at the same time, provide similar
or better reuse of fixtures.

* ``--dist no``: The normal pytest execution mode, runs one test at a time (no distribution at all).
13 changes: 13 additions & 0 deletions src/xdist/dsession.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
LoadScopeScheduling,
LoadFileScheduling,
LoadGroupScheduling,
WorkStealingScheduling,
)


Expand Down Expand Up @@ -100,6 +101,7 @@ def pytest_xdist_make_scheduler(self, config, log):
"loadscope": LoadScopeScheduling,
"loadfile": LoadFileScheduling,
"loadgroup": LoadGroupScheduling,
"worksteal": WorkStealingScheduling,
}
return schedulers[dist](config, log)

Expand Down Expand Up @@ -282,6 +284,17 @@ def worker_runtest_protocol_complete(self, node, item_index, duration):
"""
self.sched.mark_test_complete(node, item_index, duration)

def worker_unscheduled(self, node, indices):
    """Handle the 'unscheduled' event fired by a worker node.

    The event signals that some tests have been removed from the worker's
    queue and should be sent to some worker again. The node and the
    removed indices are forwarded to the active scheduler's
    ``remove_pending_tests_from_node``.

    This should happen only in response to a 'steal' command, so
    schedulers that never issue 'steal' don't have to implement it.
    """
    scheduler = self.sched
    scheduler.remove_pending_tests_from_node(node, indices)

def worker_collectreport(self, node, rep):
"""Emitted when a node calls the pytest_collectreport hook.
Expand Down
12 changes: 11 additions & 1 deletion src/xdist/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,15 @@ def pytest_addoption(parser):
"--dist",
metavar="distmode",
action="store",
choices=["each", "load", "loadscope", "loadfile", "loadgroup", "no"],
choices=[
"each",
"load",
"loadscope",
"loadfile",
"loadgroup",
"worksteal",
"no",
],
dest="dist",
default="no",
help=(
Expand All @@ -107,6 +115,8 @@ def pytest_addoption(parser):
"loadfile: load balance by sending test grouped by file"
" to any available environment.\n\n"
"loadgroup: like load, but sends tests marked with 'xdist_group' to the same worker.\n\n"
"worksteal: split the test suite between available environments,"
" then rebalance when any worker runs out of tests.\n\n"
"(default) no: run tests inprocess, don't distribute."
),
)
Expand Down
78 changes: 54 additions & 24 deletions src/xdist/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
needs not to be installed in remote environments.
"""

import contextlib
import sys
import os
import time
Expand Down Expand Up @@ -56,14 +57,21 @@ def worker_title(title):


class WorkerInteractor:
SHUTDOWN_MARK = object()

def __init__(self, config, channel):
    """Initialise worker-side state and register this object as a plugin.

    ``config`` must provide ``workerinput`` (a mapping that contains at
    least ``"testrunuid"``; ``"workerid"`` falls back to ``"?"``),
    ``option.debug`` and ``pluginmanager``. ``channel`` is stored on the
    instance and is also used to obtain the queue implementation.
    """
    self.config = config
    worker_input = config.workerinput
    self.workerid = worker_input.get("workerid", "?")
    self.testrunuid = worker_input["testrunuid"]
    self.log = Producer(f"worker-{self.workerid}", enabled=config.option.debug)
    self.channel = channel
    # Queue of pending work for this worker; needs self.channel to be set.
    self.torun = self._make_queue()
    # No upcoming item is known until the first one is taken from the queue.
    self.nextitem_index = None
    config.pluginmanager.register(self)

def _make_queue(self):
    """Return a new, empty queue from the channel gateway's execmodel."""
    queue_module = self.channel.gateway.execmodel.queue
    return queue_module.Queue()

def sendevent(self, name, **kwargs):
self.log("sending", name, kwargs)
self.channel.send((name, kwargs))
Expand Down Expand Up @@ -92,38 +100,60 @@ def pytest_sessionfinish(self, exitstatus):
def pytest_collection(self, session):
self.sendevent("collectionstart")

def handle_command(self, command):
    """Process one incoming command.

    ``command`` is either ``self.SHUTDOWN_MARK`` itself or a
    ``(name, kwargs)`` pair. Recognised names:

    * ``"runtests"``: enqueue every index in ``kwargs["indices"]``.
    * ``"runtests_all"``: enqueue the index of every item in
      ``self.session.items``.
    * ``"shutdown"``: enqueue ``self.SHUTDOWN_MARK``.
    * ``"steal"``: delegate ``kwargs["indices"]`` to ``self.steal``.

    Any other name is logged and otherwise ignored.
    """
    if command is self.SHUTDOWN_MARK:
        # The bare marker is passed straight through to the run queue.
        self.torun.put(self.SHUTDOWN_MARK)
        return

    name, kwargs = command
    self.log("received command", name, kwargs)

    if name == "steal":
        self.steal(kwargs["indices"])
    elif name == "shutdown":
        self.torun.put(self.SHUTDOWN_MARK)
    elif name in ("runtests", "runtests_all"):
        if name == "runtests":
            to_enqueue = kwargs["indices"]
        else:
            to_enqueue = range(len(self.session.items))
        for index in to_enqueue:
            self.torun.put(index)

def steal(self, indices):
    """Pull the requested indices out of the pending queue and report them.

    ``self.torun`` is replaced by a fresh queue. The previous queue is
    then drained without blocking: entries contained in ``indices`` are
    collected, and every other entry is moved to the new queue in its
    original order. Finally an ``"unscheduled"`` event is sent carrying
    the collected entries (in queue order); requested indices that were
    not queued are simply absent from that list.
    """
    wanted = set(indices)
    taken = []

    drained = self.torun
    self.torun = self._make_queue()
    empty_error = self.channel.gateway.execmodel.queue.Empty

    def next_pending():
        # None doubles as the "queue exhausted" sentinel for iter() below.
        try:
            return drained.get_nowait()
        except empty_error:
            return None

    for entry in iter(next_pending, None):
        if entry not in wanted:
            self.torun.put(entry)
        else:
            taken.append(entry)

    self.sendevent("unscheduled", indices=taken)

@pytest.hookimpl
def pytest_runtestloop(self, session):
self.log("entering main loop")
torun = []
while 1:
try:
name, kwargs = self.channel.receive()
except EOFError:
return True
self.log("received command", name, kwargs)
if name == "runtests":
torun.extend(kwargs["indices"])
elif name == "runtests_all":
torun.extend(range(len(session.items)))
self.log("items to run:", torun)
# only run if we have an item and a next item
while len(torun) >= 2:
self.run_one_test(torun)
if name == "shutdown":
if torun:
self.run_one_test(torun)
break
self.channel.setcallback(self.handle_command, endmarker=self.SHUTDOWN_MARK)
self.nextitem_index = self.torun.get()
while self.nextitem_index is not self.SHUTDOWN_MARK:
self.run_one_test()
return True

def run_one_test(self, torun):
def run_one_test(self):
items = self.session.items
self.item_index = torun.pop(0)
self.item_index, self.nextitem_index = self.nextitem_index, self.torun.get()
item = items[self.item_index]
if torun:
nextitem = items[torun[0]]
else:
if self.nextitem_index is self.SHUTDOWN_MARK:
nextitem = None
else:
nextitem = items[self.nextitem_index]

worker_title("[pytest-xdist running] %s" % item.nodeid)

Expand Down
1 change: 1 addition & 0 deletions src/xdist/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
from xdist.scheduler.loadfile import LoadFileScheduling # noqa
from xdist.scheduler.loadscope import LoadScopeScheduling # noqa
from xdist.scheduler.loadgroup import LoadGroupScheduling # noqa
from xdist.scheduler.worksteal import WorkStealingScheduling # noqa
Loading

0 comments on commit d1dfad3

Please sign in to comment.