18 changes: 18 additions & 0 deletions README.md
@@ -40,5 +40,23 @@ Time goes by, new tests are added and old ones are removed/renamed during develop
Thus, there's no need to store durations after changing the test suite.
However, when there are major changes in the suite compared to what's stored in .test_durations, it's recommended to update the duration information with `--store-durations` to ensure that the splitting is in balance.

The splitting algorithm can be controlled with the `--splitting-algorithm` CLI option and defaults to `duration_based_chunks`. For more information about the different algorithms and their tradeoffs, please see the section below.
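For example, running the first of two groups with the `least_duration` algorithm amounts to passing `--splits 2 --group 1 --splitting-algorithm least_duration` to pytest. The snippet below is a minimal illustrative sketch of the same invocation via `pytest.main`, using the option names defined by the plugin:

```python
# Illustrative sketch: run group 1 of 2 using the least_duration algorithm.
# Equivalent to: pytest --splits 2 --group 1 --splitting-algorithm least_duration
import pytest

exit_code = pytest.main(
    ["--splits", "2", "--group", "1", "--splitting-algorithm", "least_duration"]
)
```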

## Splitting algorithms
The plugin supports multiple algorithms to split tests into groups.
Each algorithm makes different tradeoffs, but generally `least_duration` should give more balanced groups.

| Algorithm | Maintains Absolute Order | Maintains Relative Order | Split Quality |
|----------------|--------------------------|--------------------------|---------------|
| duration_based_chunks | :heavy_check_mark: | :heavy_check_mark: | Good |
| least_duration | :heavy_multiplication_x: | :heavy_check_mark: | Better |

Explanation of the terms in the table:
* Absolute Order: whether each group contains all tests between its first and last element, in the same order as in the original list of tests
* Relative Order: whether each test in a group keeps the same order relative to its neighbours as in the original list of tests

For example, with an original list `[a, b, c, d]`, the group `[b, c]` maintains absolute order, whereas the group `[a, c]` maintains only relative order.

The `duration_based_chunks` algorithm aims to find optimal boundaries for the list of tests, so that each test group contains all tests between its start and end boundary.
**Owner:** I think it would be valuable to also mention in the usage section that one can specify the splitting algorithm via a command-line arg, and also mention what the default behaviour is.

**Author:** Added a small note about it.

The `least_duration` algorithm walks the list of tests and assigns each test to the group with the smallest current duration.
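As a concrete illustration, here is a small sketch that mirrors how this PR's own tests drive the algorithms; the `Item` namedtuple is only a stand-in for pytest's collected test items:

```python
# Minimal sketch: compare the two splitting algorithms on four fake tests.
from collections import namedtuple

from pytest_split.algorithms import Algorithms

Item = namedtuple("Item", "nodeid")  # stand-in for a collected pytest item

durations = {"test_a": 5.0, "test_b": 3.0, "test_c": 2.0, "test_d": 2.0}
items = [Item(name) for name in durations]

for algo in (Algorithms.duration_based_chunks, Algorithms.least_duration):
    groups = algo.value(splits=2, items=items, durations=durations)
    print(algo.name, [(g.duration, [i.nodeid for i in g.selected]) for g in groups])

# duration_based_chunks keeps the original order and cuts at ~6s per group:
#   group 1: test_a, test_b (8s)    group 2: test_c, test_d (4s)
# least_duration assigns each test to the currently smallest group:
#   group 1: test_a, test_d (7s)    group 2: test_b, test_c (5s)
```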


[**Demo with GitHub Actions**](https://github.com/jerry-git/pytest-split-gh-actions-demo)
119 changes: 119 additions & 0 deletions src/pytest_split/algorithms.py
@@ -0,0 +1,119 @@
import enum
import functools
import heapq
from typing import TYPE_CHECKING, NamedTuple

if TYPE_CHECKING:
from typing import Dict, List, Tuple

from _pytest import nodes


class TestGroup(NamedTuple):
selected: "List[nodes.Item]"
deselected: "List[nodes.Item]"
duration: float


def least_duration(splits: int, items: "List[nodes.Item]", durations: "Dict[str, float]") -> "List[TestGroup]":
"""
Split tests into groups by runtime.
Walks the collected test items in order and assigns each one to the group
with the smallest summed duration so far.

:param splits: How many groups we're splitting in.
:param items: Test items passed down by Pytest.
:param durations: Our cached test runtimes. Timings for tests that are not among the collected items are ignored.
:return:
List of groups
"""
durations = _remove_irrelevant_durations(items, durations)
avg_duration_per_test = _get_avg_duration_per_test(durations)

selected: "List[List[nodes.Item]]" = [[] for i in range(splits)]
deselected: "List[List[nodes.Item]]" = [[] for i in range(splits)]
duration: "List[float]" = [0 for i in range(splits)]

# create a heap of the form (summed_durations, group_index)
heap: "List[Tuple[float, int]]" = [(0, i) for i in range(splits)]
heapq.heapify(heap)
for item in items:
item_duration = durations.get(item.nodeid, avg_duration_per_test)

# get group with smallest sum
summed_durations, group_idx = heapq.heappop(heap)
new_group_durations = summed_durations + item_duration

# store assignment
selected[group_idx].append(item)
duration[group_idx] = new_group_durations
for i in range(splits):
if i != group_idx:
deselected[i].append(item)

# store new duration - in case of ties it sorts by the group_idx
heapq.heappush(heap, (new_group_durations, group_idx))

return [TestGroup(selected=selected[i], deselected=deselected[i], duration=duration[i]) for i in range(splits)]


def duration_based_chunks(splits: int, items: "List[nodes.Item]", durations: "Dict[str, float]") -> "List[TestGroup]":
"""
Split tests into groups by runtime.
Ensures tests are split into non-overlapping groups.
The original list of test items is split into groups by finding boundary indices i_0, i_1, i_2, ...
and creating group_1 = items[0:i_0], group_2 = items[i_0:i_1], group_3 = items[i_1:i_2], ...

:param splits: How many groups we're splitting in.
:param items: Test items passed down by Pytest.
:param durations: Our cached test runtimes. Timings for tests that are not among the collected items are ignored.
:return: List of TestGroup
"""
durations = _remove_irrelevant_durations(items, durations)
avg_duration_per_test = _get_avg_duration_per_test(durations)

tests_and_durations = {item: durations.get(item.nodeid, avg_duration_per_test) for item in items}
time_per_group = sum(tests_and_durations.values()) / splits

selected: "List[List[nodes.Item]]" = [[] for i in range(splits)]
deselected: "List[List[nodes.Item]]" = [[] for i in range(splits)]
duration: "List[float]" = [0 for i in range(splits)]

group_idx = 0
for item in items:
# move to the next group once this one has reached its time budget, but never
# past the last group (avoids an index error when trailing tests have near-zero duration)
if duration[group_idx] >= time_per_group and group_idx < splits - 1:
group_idx += 1

selected[group_idx].append(item)
for i in range(splits):
if i != group_idx:
deselected[i].append(item)
duration[group_idx] += tests_and_durations.pop(item)

return [TestGroup(selected=selected[i], deselected=deselected[i], duration=duration[i]) for i in range(splits)]


def _get_avg_duration_per_test(durations: "Dict[str, float]") -> float:
if durations:
avg_duration_per_test = sum(durations.values()) / len(durations)
else:
# If there are no durations, give every test the same arbitrary value
avg_duration_per_test = 1
return avg_duration_per_test


def _remove_irrelevant_durations(items: "List[nodes.Item]", durations: "Dict[str, float]") -> "Dict[str, float]":
# Filtering down durations to relevant ones ensures the avg isn't skewed by irrelevant data
test_ids = [item.nodeid for item in items]
durations = {name: durations[name] for name in test_ids if name in durations}
return durations


class Algorithms(enum.Enum):
# values have to be wrapped inside functools.partial to avoid them being treated as method definitions
duration_based_chunks = functools.partial(duration_based_chunks)
least_duration = functools.partial(least_duration)

@staticmethod
def names() -> "List[str]":
return [x.name for x in Algorithms]
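A brief usage sketch of the `Algorithms` enum above, mirroring how `plugin.py` resolves the value of `--splitting-algorithm`; wrapping the functions in `functools.partial` keeps them from being bound as enum methods, so `.value` remains directly callable:

```python
# Usage sketch for the Algorithms enum defined above.
from pytest_split.algorithms import Algorithms

print(Algorithms.names())  # ['duration_based_chunks', 'least_duration']

algo = Algorithms["least_duration"].value  # a functools.partial wrapping least_duration
# algo(splits, items, durations) returns one TestGroup per CI group,
# which is what pytest_collection_modifyitems consumes.
```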
87 changes: 28 additions & 59 deletions src/pytest_split/plugin.py
@@ -6,8 +6,10 @@
from _pytest.config import create_terminal_writer, hookimpl
from _pytest.reports import TestReport

from pytest_split import algorithms

if TYPE_CHECKING:
from typing import List, Optional, Tuple, Union
from typing import Dict, List, Optional, Union

from _pytest import nodes
from _pytest.config import Config
@@ -53,6 +55,14 @@ def pytest_addoption(parser: "Parser") -> None:
type=int,
help="The group of tests that should be executed (first one is 1)",
)
group.addoption(
"--splitting-algorithm",
dest="splitting_algorithm",
type=str,
help=f"Algorithm used to split the tests. Choices: {algorithms.Algorithms.names()}",
default="duration_based_chunks",
choices=algorithms.Algorithms.names(),
)


@pytest.mark.tryfirst
@@ -119,8 +129,6 @@ class PytestSplitPlugin(Base):
def __init__(self, config: "Config"):
super().__init__(config)

self._messages: "List[str]" = []

if not self.cached_durations:
message = self.writer.markup(
"\n[pytest-split] No test durations found. Pytest-split will "
@@ -136,66 +144,27 @@ def pytest_collection_modifyitems(self, config: "Config", items: "List[nodes.Ite
Collect and select the tests we want to run, and deselect the rest.
"""
splits: int = config.option.splits
group: int = config.option.group
group_idx: int = config.option.group

selected_tests, deselected_tests = self._split_tests(splits, group, items, self.cached_durations)
algo = algorithms.Algorithms[config.option.splitting_algorithm].value
groups = algo(splits, items, self.cached_durations)
group = groups[group_idx - 1]

items[:] = selected_tests
config.hook.pytest_deselected(items=deselected_tests)
items[:] = group.selected
config.hook.pytest_deselected(items=group.deselected)

self.writer.line(self.writer.markup(f"\n\n[pytest-split] Running group {group}/{splits}\n"))
self.writer.line(
self.writer.markup(
f"\n\n[pytest-split] Splitting tests with algorithm: {config.option.splitting_algorithm}"
)
)
self.writer.line(
self.writer.markup(
f"[pytest-split] Running group {group_idx}/{splits} (estimated duration: {group.duration:.2f}s)\n"
)
)
return None

@staticmethod
def _split_tests(
splits: int,
group: int,
items: "List[nodes.Item]",
stored_durations: dict,
) -> "Tuple[list, list]":
"""
Split tests into groups by runtime.

:param splits: How many groups we're splitting in.
:param group: Which group this run represents.
:param items: Test items passed down by Pytest.
:param stored_durations: Our cached test runtimes.
:return:
Tuple of two lists.
The first list represents the tests we want to run,
while the other represents the tests we want to deselect.
"""
# Filtering down durations to relevant ones ensures the avg isn't skewed by irrelevant data
test_ids = [item.nodeid for item in items]
durations = {k: v for k, v in stored_durations.items() if k in test_ids}

if durations:
avg_duration_per_test = sum(durations.values()) / len(durations)
else:
# If there are no durations, give every test the same arbitrary value
avg_duration_per_test = 1

tests_and_durations = {item: durations.get(item.nodeid, avg_duration_per_test) for item in items}
time_per_group = sum(tests_and_durations.values()) / splits
selected, deselected = [], []

for _group in range(1, splits + 1):
group_tests, group_runtime = [], 0

for item in dict(tests_and_durations):
if group_runtime > time_per_group:
break

group_tests.append(item)
group_runtime += tests_and_durations.pop(item)

if _group == group:
selected = group_tests
else:
deselected.extend(group_tests)

return selected, deselected


class PytestSplitCachePlugin(Base):
"""
@@ -208,7 +177,7 @@ def pytest_sessionfinish(self) -> None:
https://github.com/pytest-dev/pytest/blob/main/src/_pytest/main.py#L308
"""
terminal_reporter = self.config.pluginmanager.get_plugin("terminalreporter")
test_durations = {}
test_durations: "Dict[str, float]" = {}

for test_reports in terminal_reporter.stats.values():
for test_report in test_reports:
85 changes: 85 additions & 0 deletions tests/test_algorithms.py
@@ -0,0 +1,85 @@
from collections import namedtuple

import pytest

from pytest_split.algorithms import Algorithms

item = namedtuple("item", "nodeid")


class TestAlgorithms:
@pytest.mark.parametrize("algo_name", Algorithms.names())
def test__split_test(self, algo_name):
**Owner:** Let's keep consistency, other test modules seem to use a single _ after test 🙂

**Author:** It is consistent in the sense that I use the format: test_{func}_{does_something}_{when}. And in this case the func is called _split_test. Please let me know if you still prefer the change.

durations = {"a": 1, "b": 1, "c": 1}
items = [item(x) for x in durations.keys()]
algo = Algorithms[algo_name].value
first, second, third = algo(splits=3, items=items, durations=durations)

# each split should have one test
assert first.selected == [item("a")]
assert first.deselected == [item("b"), item("c")]
assert first.duration == 1

assert second.selected == [item("b")]
assert second.deselected == [item("a"), item("c")]
assert second.duration == 1

assert third.selected == [item("c")]
assert third.deselected == [item("a"), item("b")]
assert third.duration == 1

@pytest.mark.parametrize("algo_name", Algorithms.names())
def test__split_tests_handles_tests_in_durations_but_missing_from_items(self, algo_name):
durations = {"a": 1, "b": 1}
items = [item(x) for x in ["a"]]
algo = Algorithms[algo_name].value
splits = algo(splits=2, items=items, durations=durations)

first, second = splits
assert first.selected == [item("a")]
assert second.selected == []

@pytest.mark.parametrize("algo_name", Algorithms.names())
def test__split_tests_handles_tests_with_missing_durations(self, algo_name):
durations = {"a": 1}
items = [item(x) for x in ["a", "b"]]
algo = Algorithms[algo_name].value
splits = algo(splits=2, items=items, durations=durations)

first, second = splits
assert first.selected == [item("a")]
assert second.selected == [item("b")]

@pytest.mark.parametrize("algo_name", Algorithms.names())
@pytest.mark.skip("current algorithm does not handle this well")
def test__split_test_handles_large_duration_at_end(self, algo_name):
durations = {"a": 1, "b": 1, "c": 1, "d": 3}
items = [item(x) for x in ["a", "b", "c", "d"]]
algo = Algorithms[algo_name].value
splits = algo(splits=2, items=items, durations=durations)

first, second = splits
assert first.selected == [item("d")]
assert second.selected == [item(x) for x in ["a", "b", "c"]]

@pytest.mark.parametrize(
"algo_name, expected",
[
("duration_based_chunks", [[item("a"), item("b")], [item("c"), item("d")]]),
("least_duration", [[item("a"), item("c")], [item("b"), item("d")]]),
],
)
def test__split_tests_calculates_avg_test_duration_only_on_present_tests(self, algo_name, expected):
# If the algo includes test e's duration to calculate the average then
# a will be expected to take a long time, and so 'a' will become its
# own group. Intended behaviour is that a gets estimated duration 1 and
# this will create more balanced groups.
durations = {"b": 1, "c": 1, "d": 1, "e": 10000}
items = [item(x) for x in ["a", "b", "c", "d"]]
algo = Algorithms[algo_name].value
splits = algo(splits=2, items=items, durations=durations)

first, second = splits
expected_first, expected_second = expected
assert first.selected == expected_first
assert second.selected == expected_second