Skip to content

Commit

Permalink
feat(runner): Concurrent tests execution in runner
Browse files Browse the repository at this point in the history
  • Loading branch information
Stranger6667 committed Nov 30, 2019
1 parent d0c17b2 commit 3542d91
Show file tree
Hide file tree
Showing 9 changed files with 105 additions and 32 deletions.
6 changes: 6 additions & 0 deletions docs/changelog.rst
Expand Up @@ -6,6 +6,11 @@ Changelog
`Unreleased`_
-------------

Added
~~~~~

- Concurrent test execution in CLI / runner. `#91`_

`0.18.1`_ - 2019-11-28
----------------------

Expand Down Expand Up @@ -531,6 +536,7 @@ Fixed
.. _#98: https://github.com/kiwicom/schemathesis/issues/98
.. _#94: https://github.com/kiwicom/schemathesis/issues/94
.. _#92: https://github.com/kiwicom/schemathesis/issues/92
.. _#91: https://github.com/kiwicom/schemathesis/issues/91
.. _#90: https://github.com/kiwicom/schemathesis/issues/90
.. _#78: https://github.com/kiwicom/schemathesis/issues/78
.. _#75: https://github.com/kiwicom/schemathesis/issues/75
Expand Down
2 changes: 1 addition & 1 deletion src/schemathesis/_hypothesis.py
Expand Up @@ -135,7 +135,7 @@ def get_case_strategy(endpoint: Endpoint) -> st.SearchStrategy:
if value is not None:
if parameter == "path_parameters":
strategies[parameter] = (
from_schema(value).filter(filter_path_parameters).map(quote_all) # type: ignore # type: ignore
from_schema(value).filter(filter_path_parameters).map(quote_all) # type: ignore
)
elif parameter == "headers":
strategies[parameter] = from_schema(value).filter(is_valid_header) # type: ignore
Expand Down
7 changes: 3 additions & 4 deletions src/schemathesis/cli/__init__.py
Expand Up @@ -225,7 +225,6 @@ class OutputStyle(Enum):
def execute(prepared_runner: Generator[events.ExecutionEvent, None, None], workers_num: int) -> None:
"""Execute a prepared runner by drawing events from it and passing to a proper handler."""
handler = get_output_handler(workers_num)
with utils.capture_hypothesis_output() as hypothesis_output:
context = events.ExecutionContext(hypothesis_output, workers_num)
for event in prepared_runner:
handler(context, event)
context = events.ExecutionContext(workers_num=workers_num)
for event in prepared_runner:
handler(context, event)
1 change: 1 addition & 0 deletions src/schemathesis/cli/output/default.py
Expand Up @@ -278,6 +278,7 @@ def handle_event(context: events.ExecutionContext, event: events.ExecutionEvent)
if isinstance(event, events.BeforeExecution):
handle_before_execution(context, event)
if isinstance(event, events.AfterExecution):
context.hypothesis_output.extend(event.hypothesis_output)
handle_after_execution(context, event)
if isinstance(event, events.Finished):
handle_finished(context, event)
Expand Down
1 change: 1 addition & 0 deletions src/schemathesis/cli/output/short.py
Expand Up @@ -19,6 +19,7 @@ def handle_event(context: events.ExecutionContext, event: events.ExecutionEvent)
if isinstance(event, events.Initialized):
default.handle_initialized(context, event)
if isinstance(event, events.AfterExecution):
context.hypothesis_output.extend(event.hypothesis_output)
handle_after_execution(context, event)
if isinstance(event, events.Finished):
default.handle_finished(context, event)
Expand Down
52 changes: 34 additions & 18 deletions src/schemathesis/runner/__init__.py
Expand Up @@ -3,7 +3,7 @@
import time
from contextlib import contextmanager
from queue import Queue
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, Union
from typing import Any, Callable, Dict, Generator, Iterable, List, Optional, Tuple, Union, cast

import attr
import hypothesis
Expand Down Expand Up @@ -91,7 +91,6 @@ def thread_task(
Pretty similar to the default one-thread flow, but includes communication with the main thread via the events queue.
"""
# pylint: disable=too-many-arguments
# TODO. catch hypothesis output - we should move it to the main thread
with get_session(auth, headers) as session:
with capture_hypothesis_output():
while not tasks_queue.empty():
Expand All @@ -101,13 +100,13 @@ def thread_task(
events_queue.put(event)


class Worker(threading.Thread):
def stop(self) -> None:
"""Raise an error in a thread so it is possible to immediately stop thread execution."""
thread_id = self._ident # type: ignore
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id), ctypes.py_object(SystemExit))
if res > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(thread_id, 0)
def stop_worker(thread_id: int) -> None:
"""Raise an error in a thread so it is possible to asynchronously stop thread execution."""
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(thread_id), ctypes.py_object(SystemExit))


class ThreadInterrupted(Exception):
"""Special exception when worker thread received SIGINT."""


@attr.s(slots=True)
Expand All @@ -123,6 +122,13 @@ def _execute(self, results: TestResultSet) -> Generator[events.ExecutionEvent, N
events_queue: Queue = Queue()
workers = self._init_workers(tasks_queue, events_queue, results)

def stop_workers() -> None:
for worker in workers:
# workers are initialized at this point and `worker.ident` is set with an integer value
ident = cast(int, worker.ident)
stop_worker(ident)
worker.join()

is_finished = False
try:
while not is_finished:
Expand All @@ -132,11 +138,17 @@ def _execute(self, results: TestResultSet) -> Generator[events.ExecutionEvent, N
time.sleep(0.001)
is_finished = all(not worker.is_alive() for worker in workers)
while not events_queue.empty():
yield events_queue.get()
event = events_queue.get()
yield event
if isinstance(event, events.Interrupted):
# Thread received SIGINT
# We could still have events in the queue, but ignore them to keep the logic simple
# for now, could be improved in the future to show more info in such corner cases
raise ThreadInterrupted
except ThreadInterrupted:
stop_workers()
except KeyboardInterrupt:
for worker in workers:
worker.stop()
worker.join()
stop_workers()
yield events.Interrupted(results=results, schema=self.schema)

def _get_tasks_queue(self) -> Queue:
Expand All @@ -145,10 +157,10 @@ def _get_tasks_queue(self) -> Queue:
tasks_queue.queue.extend(self.schema.get_all_endpoints())
return tasks_queue

def _init_workers(self, tasks_queue: Queue, events_queue: Queue, results: TestResultSet) -> List[Worker]:
def _init_workers(self, tasks_queue: Queue, events_queue: Queue, results: TestResultSet) -> List[threading.Thread]:
"""Initialize & start workers that will execute tests."""
workers = [
Worker(
threading.Thread(
target=thread_task,
kwargs={
"tasks_queue": tasks_queue,
Expand Down Expand Up @@ -209,12 +221,14 @@ def run_test(
# pylint: disable=too-many-arguments
result = TestResult(endpoint=endpoint, schema=schema)
yield events.BeforeExecution(results=results, schema=schema, endpoint=endpoint)
hypothesis_output: List[str] = []
try:
if isinstance(test, InvalidSchema):
status = Status.error
result.add_error(test)
else:
test(session, checks, result, request_timeout)
with capture_hypothesis_output() as hypothesis_output:
test(session, checks, result, request_timeout)
status = Status.success
except AssertionError:
status = Status.failure
Expand Down Expand Up @@ -247,8 +261,10 @@ def run_test(
result.seed = getattr(test, "_hypothesis_internal_use_seed", None) or getattr(
test, "_hypothesis_internal_use_generated_seed", None
)
results.append(result) # TODO. make thread safe
yield events.AfterExecution(results=results, schema=schema, endpoint=endpoint, status=status)
results.append(result)
yield events.AfterExecution(
results=results, schema=schema, endpoint=endpoint, status=status, hypothesis_output=hypothesis_output
)


def execute( # pylint: disable=too-many-arguments
Expand Down
3 changes: 2 additions & 1 deletion src/schemathesis/runner/events.py
Expand Up @@ -14,7 +14,7 @@
class ExecutionContext:
"""Storage for the current context of the execution."""

hypothesis_output: List[str] = attr.ib() # pragma: no mutate
hypothesis_output: List[str] = attr.ib(factory=list) # pragma: no mutate
workers_num: int = attr.ib(default=1) # pragma: no mutate
endpoints_processed: int = attr.ib(default=0) # pragma: no mutate
current_line_length: int = attr.ib(default=0) # pragma: no mutate
Expand Down Expand Up @@ -45,6 +45,7 @@ class BeforeExecution(ExecutionEvent):
class AfterExecution(ExecutionEvent):
endpoint: Endpoint = attr.ib() # pragma: no mutate
status: Status = attr.ib() # pragma: no mutate
hypothesis_output: List[str] = attr.ib(factory=list) # pragma: no mutate


@attr.s(slots=True) # pragma: no mutate
Expand Down
12 changes: 9 additions & 3 deletions src/schemathesis/utils.py
Expand Up @@ -54,6 +54,14 @@ def dict_not_none_values(**kwargs: Any) -> Mapping[str, Any]:
return {key: value for key, value in kwargs.items() if value is not None}


IGNORED_PATTERNS = (
"Falsifying example: ",
"You can add @seed",
"Failed to reproduce exception. Expected:",
"Flaky example!",
)


@contextmanager
def capture_hypothesis_output() -> Generator[List[str], None, None]:
"""Capture all output of Hypothesis into a list of strings.
Expand All @@ -74,9 +82,7 @@ def test(i):

def get_output(value: str) -> None:
# Drop messages that could be confusing in the Schemathesis context
if value.startswith(
("Falsifying example: ", "You can add @seed", "Failed to reproduce exception. Expected:", "Flaky example!")
):
if value.startswith(IGNORED_PATTERNS):
return
output.append(value)

Expand Down
53 changes: 48 additions & 5 deletions test/cli/test_commands.py
@@ -1,4 +1,5 @@
import os
import time
from test.utils import HERE, SIMPLE_PATH

import pytest
Expand Down Expand Up @@ -615,8 +616,18 @@ def new_check(response, result):
assert lines[14] == "Custom check failed!"


def assert_threaded_executor_interruption(lines, expected):
# It is possible to have a case when first call without an error will start processing
# But after, another thread will have interruption and will push this event before the
# first thread will finish. Race condition: "" is for this case and "." for the other
# way around
assert lines[10] in expected
assert "!! KeyboardInterrupt !!" in lines[11]
assert "== SUMMARY ==" in lines[13]


@pytest.mark.parametrize("workers", (1, 2))
def test_keyboard_interrupt(testdir, cli, schema_url, base_url, mocker, workers):
def test_keyboard_interrupt(cli, schema_url, base_url, mocker, workers):
# When a Schemathesis run in interrupted by keyboard or via SIGINT
original = Case("/success", "GET", base_url=base_url).call
counter = 0
Expand All @@ -625,6 +636,7 @@ def mocked(*args, **kwargs):
nonlocal counter
counter += 1
if counter > 1:
# For threaded case it emulates SIGINT for the worker thread
raise KeyboardInterrupt
return original(*args, **kwargs)

Expand All @@ -641,10 +653,41 @@ def mocked(*args, **kwargs):
assert "!! KeyboardInterrupt !!" in lines[12]
assert "== SUMMARY ==" in lines[14]
else:
print(lines)
assert lines[10] == "."
assert "!! KeyboardInterrupt !!" in lines[11]
assert "== SUMMARY ==" in lines[13]
assert_threaded_executor_interruption(lines, ("", "."))


def test_keyboard_interrupt_threaded(cli, schema_url, mocker):
# When a Schemathesis run in interrupted by keyboard or via SIGINT
original = time.sleep
counter = 0

def mocked(*args, **kwargs):
nonlocal counter
counter += 1
if counter > 1:
raise KeyboardInterrupt
return original(*args, **kwargs)

mocker.patch("schemathesis.runner.time.sleep", wraps=mocked)
result = cli.run(schema_url, "--workers=2")
# the exit status depends on what thread finished first
assert result.exit_code in (ExitCode.OK, ExitCode.TESTS_FAILED)
# Then execution stops and a message about interruption is displayed
lines = result.stdout.strip().split("\n")
# There are many scenarios possible, depends how many tests will be executed before interruption
# and in what order. it could be no tests at all, some of them or all of them.
assert_threaded_executor_interruption(lines, ("F", ".", "F.", ".F", ""))


@pytest.mark.endpoints("failure")
@pytest.mark.parametrize("workers", (1, 2))
def test_hypothesis_output_capture(mocker, cli, schema_url, workers):
mocker.patch("schemathesis.utils.IGNORED_PATTERNS", ())

result = cli.run(schema_url, f"--workers={workers}")
assert result.exit_code == ExitCode.TESTS_FAILED
assert "= HYPOTHESIS OUTPUT =" in result.stdout
assert "Falsifying example" in result.stdout


async def test_multiple_files_schema(app, testdir, cli, base_url):
Expand Down

0 comments on commit 3542d91

Please sign in to comment.