From 34f68dc758ad36f429d4c1186da1acc542792e03 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 3 Jun 2021 18:32:57 +0100 Subject: [PATCH 1/9] support async scenarios --- pyproject.toml | 4 + pytest_bdd/__init__.py | 4 +- pytest_bdd/_async/__init__.py | 0 pytest_bdd/_async/scenario.py | 307 ++++++++++++++++++++++++ pytest_bdd/scenario.py | 327 ++------------------------ setup.cfg | 2 +- setup.py | 11 +- tests/feature/test_async_scenarios.py | 104 ++++++++ tox.ini | 5 + 9 files changed, 454 insertions(+), 310 deletions(-) create mode 100644 pytest_bdd/_async/__init__.py create mode 100644 pytest_bdd/_async/scenario.py create mode 100644 tests/feature/test_async_scenarios.py diff --git a/pyproject.toml b/pyproject.toml index f383bfde..544a65fb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,7 @@ +[build-system] +requires = ["setuptools", "wheel", "unasync~=0.5.0"] +build-backend = "setuptools.build_meta" + [tool.black] line-length = 120 target-version = ['py36', 'py37', 'py38'] diff --git a/pytest_bdd/__init__.py b/pytest_bdd/__init__.py index b835e1b3..7c9ed65d 100644 --- a/pytest_bdd/__init__.py +++ b/pytest_bdd/__init__.py @@ -1,8 +1,8 @@ """pytest-bdd public API.""" from pytest_bdd.steps import given, when, then -from pytest_bdd.scenario import scenario, scenarios +from pytest_bdd.scenario import scenario, scenarios, async_scenario, async_scenarios __version__ = "4.0.2" -__all__ = [given.__name__, when.__name__, then.__name__, scenario.__name__, scenarios.__name__] +__all__ = ["given", "when", "then", "scenario", "scenarios", "async_scenario", "async_scenarios"] diff --git a/pytest_bdd/_async/__init__.py b/pytest_bdd/_async/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pytest_bdd/_async/scenario.py b/pytest_bdd/_async/scenario.py new file mode 100644 index 00000000..a98ce1af --- /dev/null +++ b/pytest_bdd/_async/scenario.py @@ -0,0 +1,307 @@ +"""Scenario implementation. + +The pytest will collect the test case and the steps will be executed +line by line. + +Example: + +test_publish_article = scenario( + feature_name="publish_article.feature", + scenario_name="Publishing the article", +) +""" +import collections +import os +import re + +import pytest +from _pytest.fixtures import FixtureLookupError + +from .. import exceptions +from ..feature import get_feature, get_features +from ..steps import get_step_fixture_name, inject_fixture +from ..utils import CONFIG_STACK, get_args, get_caller_module_locals, get_caller_module_path + +PYTHON_REPLACE_REGEX = re.compile(r"\W") +ALPHA_REGEX = re.compile(r"^\d+_*") + + +def find_argumented_step_fixture_name(name, type_, fixturemanager, request=None): + """Find argumented step fixture name.""" + # happens to be that _arg2fixturedefs is changed during the iteration so we use a copy + for fixturename, fixturedefs in list(fixturemanager._arg2fixturedefs.items()): + for fixturedef in fixturedefs: + parser = getattr(fixturedef.func, "parser", None) + if parser is None: + continue + match = parser.is_matching(name) + if not match: + continue + + converters = getattr(fixturedef.func, "converters", {}) + for arg, value in parser.parse_arguments(name).items(): + if arg in converters: + value = converters[arg](value) + if request: + inject_fixture(request, arg, value) + parser_name = get_step_fixture_name(parser.name, type_) + if request: + try: + request.getfixturevalue(parser_name) + except FixtureLookupError: + continue + return parser_name + + +def _find_step_function(request, step, scenario): + """Match the step defined by the regular expression pattern. + + :param request: PyTest request object. + :param step: Step. + :param scenario: Scenario. + + :return: Function of the step. + :rtype: function + """ + name = step.name + try: + # Simple case where no parser is used for the step + return request.getfixturevalue(get_step_fixture_name(name, step.type)) + except FixtureLookupError: + try: + # Could not find a fixture with the same name, let's see if there is a parser involved + name = find_argumented_step_fixture_name(name, step.type, request._fixturemanager, request) + if name: + return request.getfixturevalue(name) + raise + except FixtureLookupError: + raise exceptions.StepDefinitionNotFoundError( + f"Step definition is not found: {step}. " + f'Line {step.line_number} in scenario "{scenario.name}" in the feature "{scenario.feature.filename}"' + ) + + +async def _execute_step_function(request, scenario, step, step_func): + """Execute step function. + + :param request: PyTest request. + :param scenario: Scenario. + :param step: Step. + :param function step_func: Step function. + :param example: Example table. + """ + kw = dict(request=request, feature=scenario.feature, scenario=scenario, step=step, step_func=step_func) + + request.config.hook.pytest_bdd_before_step(**kw) + + kw["step_func_args"] = {} + try: + # Get the step argument values. + kwargs = {arg: request.getfixturevalue(arg) for arg in get_args(step_func)} + kw["step_func_args"] = kwargs + + request.config.hook.pytest_bdd_before_step_call(**kw) + target_fixture = getattr(step_func, "target_fixture", None) + # Execute the step. + return_value = await step_func(**kwargs) + if target_fixture: + inject_fixture(request, target_fixture, return_value) + + request.config.hook.pytest_bdd_after_step(**kw) + except Exception as exception: + request.config.hook.pytest_bdd_step_error(exception=exception, **kw) + raise + + +async def _execute_scenario(feature, scenario, request): + """Execute the scenario. + + :param feature: Feature. + :param scenario: Scenario. + :param request: request. + :param encoding: Encoding. + """ + request.config.hook.pytest_bdd_before_scenario(request=request, feature=feature, scenario=scenario) + + try: + # Execute scenario steps + for step in scenario.steps: + try: + step_func = _find_step_function(request, step, scenario) + except exceptions.StepDefinitionNotFoundError as exception: + request.config.hook.pytest_bdd_step_func_lookup_error( + request=request, feature=feature, scenario=scenario, step=step, exception=exception + ) + raise + await _execute_step_function(request, scenario, step, step_func) + finally: + request.config.hook.pytest_bdd_after_scenario(request=request, feature=feature, scenario=scenario) + + +FakeRequest = collections.namedtuple("FakeRequest", ["module"]) + + +def _get_scenario_decorator(feature, feature_name, scenario, scenario_name): + # HACK: Ideally we would use `def decorator(fn)`, but we want to return a custom exception + # when the decorator is misused. + # Pytest inspect the signature to determine the required fixtures, and in that case it would look + # for a fixture called "fn" that doesn't exist (if it exists then it's even worse). + # It will error with a "fixture 'fn' not found" message instead. + # We can avoid this hack by using a pytest hook and check for misuse instead. + def decorator(*args): + if not args: + raise exceptions.ScenarioIsDecoratorOnly( + "scenario function can only be used as a decorator. Refer to the documentation." + ) + [fn] = args + args = get_args(fn) + function_args = list(args) + for arg in scenario.get_example_params(): + if arg not in function_args: + function_args.append(arg) + + @pytest.mark.usefixtures(*function_args) + async def scenario_wrapper(request): + await _execute_scenario(feature, scenario, request) + return await fn(*[request.getfixturevalue(arg) for arg in args]) + + for param_set in scenario.get_params(): + if param_set: + scenario_wrapper = pytest.mark.parametrize(*param_set)(scenario_wrapper) + for tag in scenario.tags.union(feature.tags): + config = CONFIG_STACK[-1] + config.hook.pytest_bdd_apply_tag(tag=tag, function=scenario_wrapper) + + scenario_wrapper.__doc__ = f"{feature_name}: {scenario_name}" + scenario_wrapper.__scenario__ = scenario + scenario.test_function = scenario_wrapper + return scenario_wrapper + + return decorator + + +def scenario(feature_name, scenario_name, encoding="utf-8", example_converters=None, features_base_dir=None): + """Scenario decorator. + + :param str feature_name: Feature file name. Absolute or relative to the configured feature base path. + :param str scenario_name: Scenario name. + :param str encoding: Feature file encoding. + :param dict example_converters: optional `dict` of example converter function, where key is the name of the + example parameter, and value is the converter function. + """ + + scenario_name = str(scenario_name) + caller_module_path = get_caller_module_path() + + # Get the feature + if features_base_dir is None: + features_base_dir = get_features_base_dir(caller_module_path) + feature = get_feature(features_base_dir, feature_name, encoding=encoding) + + # Get the scenario + try: + scenario = feature.scenarios[scenario_name] + except KeyError: + feature_name = feature.name or "[Empty]" + raise exceptions.ScenarioNotFound( + f'Scenario "{scenario_name}" in feature "{feature_name}" in {feature.filename} is not found.' + ) + + scenario.example_converters = example_converters + + # Validate the scenario + scenario.validate() + + return _get_scenario_decorator( + feature=feature, feature_name=feature_name, scenario=scenario, scenario_name=scenario_name + ) + + +def get_features_base_dir(caller_module_path): + default_base_dir = os.path.dirname(caller_module_path) + return get_from_ini("bdd_features_base_dir", default_base_dir) + + +def get_from_ini(key, default): + """Get value from ini config. Return default if value has not been set. + + Use if the default value is dynamic. Otherwise set default on addini call. + """ + config = CONFIG_STACK[-1] + value = config.getini(key) + return value if value != "" else default + + +def make_python_name(string): + """Make python attribute name out of a given string.""" + string = re.sub(PYTHON_REPLACE_REGEX, "", string.replace(" ", "_")) + return re.sub(ALPHA_REGEX, "", string).lower() + + +def make_python_docstring(string): + """Make a python docstring literal out of a given string.""" + return '"""{}."""'.format(string.replace('"""', '\\"\\"\\"')) + + +def make_string_literal(string): + """Make python string literal out of a given string.""" + return "'{}'".format(string.replace("'", "\\'")) + + +def get_python_name_generator(name): + """Generate a sequence of suitable python names out of given arbitrary string name.""" + python_name = make_python_name(name) + suffix = "" + index = 0 + + def get_name(): + return f"test_{python_name}{suffix}" + + while True: + yield get_name() + index += 1 + suffix = f"_{index}" + + +def scenarios(*feature_paths, **kwargs): + """Parse features from the paths and put all found scenarios in the caller module. + + :param *feature_paths: feature file paths to use for scenarios + """ + caller_locals = get_caller_module_locals() + caller_path = get_caller_module_path() + + features_base_dir = kwargs.get("features_base_dir") + if features_base_dir is None: + features_base_dir = get_features_base_dir(caller_path) + + abs_feature_paths = [] + for path in feature_paths: + if not os.path.isabs(path): + path = os.path.abspath(os.path.join(features_base_dir, path)) + abs_feature_paths.append(path) + found = False + + module_scenarios = frozenset( + (attr.__scenario__.feature.filename, attr.__scenario__.name) + for name, attr in caller_locals.items() + if hasattr(attr, "__scenario__") + ) + + for feature in get_features(abs_feature_paths): + for scenario_name, scenario_object in feature.scenarios.items(): + # skip already bound scenarios + if (scenario_object.feature.filename, scenario_name) not in module_scenarios: + + @scenario(feature.filename, scenario_name, **kwargs) + async def _scenario(): + pass # pragma: no cover + + for test_name in get_python_name_generator(scenario_name): + if test_name not in caller_locals: + # found an unique test name + caller_locals[test_name] = _scenario + break + found = True + if not found: + raise exceptions.NoScenariosFound(abs_feature_paths) diff --git a/pytest_bdd/scenario.py b/pytest_bdd/scenario.py index 2b9c6f2c..51c5deaf 100644 --- a/pytest_bdd/scenario.py +++ b/pytest_bdd/scenario.py @@ -1,307 +1,22 @@ -"""Scenario implementation. - -The pytest will collect the test case and the steps will be executed -line by line. - -Example: - -test_publish_article = scenario( - feature_name="publish_article.feature", - scenario_name="Publishing the article", +__all__ = [ + "async_scenario", + "async_scenarios", + "scenario", + "scenarios", + "find_argumented_step_fixture_name", + "make_python_docstring", + "make_python_name", + "make_string_literal", + "get_python_name_generator", +] + +from ._async.scenario import scenario as async_scenario, scenarios as async_scenarios +from ._sync.scenario import ( + scenario, + scenarios, + find_argumented_step_fixture_name, + make_python_docstring, + make_python_name, + make_string_literal, + get_python_name_generator, ) -""" -import collections -import os -import re - -import pytest -from _pytest.fixtures import FixtureLookupError - -from . import exceptions -from .feature import get_feature, get_features -from .steps import get_step_fixture_name, inject_fixture -from .utils import CONFIG_STACK, get_args, get_caller_module_locals, get_caller_module_path - -PYTHON_REPLACE_REGEX = re.compile(r"\W") -ALPHA_REGEX = re.compile(r"^\d+_*") - - -def find_argumented_step_fixture_name(name, type_, fixturemanager, request=None): - """Find argumented step fixture name.""" - # happens to be that _arg2fixturedefs is changed during the iteration so we use a copy - for fixturename, fixturedefs in list(fixturemanager._arg2fixturedefs.items()): - for fixturedef in fixturedefs: - parser = getattr(fixturedef.func, "parser", None) - if parser is None: - continue - match = parser.is_matching(name) - if not match: - continue - - converters = getattr(fixturedef.func, "converters", {}) - for arg, value in parser.parse_arguments(name).items(): - if arg in converters: - value = converters[arg](value) - if request: - inject_fixture(request, arg, value) - parser_name = get_step_fixture_name(parser.name, type_) - if request: - try: - request.getfixturevalue(parser_name) - except FixtureLookupError: - continue - return parser_name - - -def _find_step_function(request, step, scenario): - """Match the step defined by the regular expression pattern. - - :param request: PyTest request object. - :param step: Step. - :param scenario: Scenario. - - :return: Function of the step. - :rtype: function - """ - name = step.name - try: - # Simple case where no parser is used for the step - return request.getfixturevalue(get_step_fixture_name(name, step.type)) - except FixtureLookupError: - try: - # Could not find a fixture with the same name, let's see if there is a parser involved - name = find_argumented_step_fixture_name(name, step.type, request._fixturemanager, request) - if name: - return request.getfixturevalue(name) - raise - except FixtureLookupError: - raise exceptions.StepDefinitionNotFoundError( - f"Step definition is not found: {step}. " - f'Line {step.line_number} in scenario "{scenario.name}" in the feature "{scenario.feature.filename}"' - ) - - -def _execute_step_function(request, scenario, step, step_func): - """Execute step function. - - :param request: PyTest request. - :param scenario: Scenario. - :param step: Step. - :param function step_func: Step function. - :param example: Example table. - """ - kw = dict(request=request, feature=scenario.feature, scenario=scenario, step=step, step_func=step_func) - - request.config.hook.pytest_bdd_before_step(**kw) - - kw["step_func_args"] = {} - try: - # Get the step argument values. - kwargs = {arg: request.getfixturevalue(arg) for arg in get_args(step_func)} - kw["step_func_args"] = kwargs - - request.config.hook.pytest_bdd_before_step_call(**kw) - target_fixture = getattr(step_func, "target_fixture", None) - # Execute the step. - return_value = step_func(**kwargs) - if target_fixture: - inject_fixture(request, target_fixture, return_value) - - request.config.hook.pytest_bdd_after_step(**kw) - except Exception as exception: - request.config.hook.pytest_bdd_step_error(exception=exception, **kw) - raise - - -def _execute_scenario(feature, scenario, request): - """Execute the scenario. - - :param feature: Feature. - :param scenario: Scenario. - :param request: request. - :param encoding: Encoding. - """ - request.config.hook.pytest_bdd_before_scenario(request=request, feature=feature, scenario=scenario) - - try: - # Execute scenario steps - for step in scenario.steps: - try: - step_func = _find_step_function(request, step, scenario) - except exceptions.StepDefinitionNotFoundError as exception: - request.config.hook.pytest_bdd_step_func_lookup_error( - request=request, feature=feature, scenario=scenario, step=step, exception=exception - ) - raise - _execute_step_function(request, scenario, step, step_func) - finally: - request.config.hook.pytest_bdd_after_scenario(request=request, feature=feature, scenario=scenario) - - -FakeRequest = collections.namedtuple("FakeRequest", ["module"]) - - -def _get_scenario_decorator(feature, feature_name, scenario, scenario_name): - # HACK: Ideally we would use `def decorator(fn)`, but we want to return a custom exception - # when the decorator is misused. - # Pytest inspect the signature to determine the required fixtures, and in that case it would look - # for a fixture called "fn" that doesn't exist (if it exists then it's even worse). - # It will error with a "fixture 'fn' not found" message instead. - # We can avoid this hack by using a pytest hook and check for misuse instead. - def decorator(*args): - if not args: - raise exceptions.ScenarioIsDecoratorOnly( - "scenario function can only be used as a decorator. Refer to the documentation." - ) - [fn] = args - args = get_args(fn) - function_args = list(args) - for arg in scenario.get_example_params(): - if arg not in function_args: - function_args.append(arg) - - @pytest.mark.usefixtures(*function_args) - def scenario_wrapper(request): - _execute_scenario(feature, scenario, request) - return fn(*[request.getfixturevalue(arg) for arg in args]) - - for param_set in scenario.get_params(): - if param_set: - scenario_wrapper = pytest.mark.parametrize(*param_set)(scenario_wrapper) - for tag in scenario.tags.union(feature.tags): - config = CONFIG_STACK[-1] - config.hook.pytest_bdd_apply_tag(tag=tag, function=scenario_wrapper) - - scenario_wrapper.__doc__ = f"{feature_name}: {scenario_name}" - scenario_wrapper.__scenario__ = scenario - scenario.test_function = scenario_wrapper - return scenario_wrapper - - return decorator - - -def scenario(feature_name, scenario_name, encoding="utf-8", example_converters=None, features_base_dir=None): - """Scenario decorator. - - :param str feature_name: Feature file name. Absolute or relative to the configured feature base path. - :param str scenario_name: Scenario name. - :param str encoding: Feature file encoding. - :param dict example_converters: optional `dict` of example converter function, where key is the name of the - example parameter, and value is the converter function. - """ - - scenario_name = str(scenario_name) - caller_module_path = get_caller_module_path() - - # Get the feature - if features_base_dir is None: - features_base_dir = get_features_base_dir(caller_module_path) - feature = get_feature(features_base_dir, feature_name, encoding=encoding) - - # Get the scenario - try: - scenario = feature.scenarios[scenario_name] - except KeyError: - feature_name = feature.name or "[Empty]" - raise exceptions.ScenarioNotFound( - f'Scenario "{scenario_name}" in feature "{feature_name}" in {feature.filename} is not found.' - ) - - scenario.example_converters = example_converters - - # Validate the scenario - scenario.validate() - - return _get_scenario_decorator( - feature=feature, feature_name=feature_name, scenario=scenario, scenario_name=scenario_name - ) - - -def get_features_base_dir(caller_module_path): - default_base_dir = os.path.dirname(caller_module_path) - return get_from_ini("bdd_features_base_dir", default_base_dir) - - -def get_from_ini(key, default): - """Get value from ini config. Return default if value has not been set. - - Use if the default value is dynamic. Otherwise set default on addini call. - """ - config = CONFIG_STACK[-1] - value = config.getini(key) - return value if value != "" else default - - -def make_python_name(string): - """Make python attribute name out of a given string.""" - string = re.sub(PYTHON_REPLACE_REGEX, "", string.replace(" ", "_")) - return re.sub(ALPHA_REGEX, "", string).lower() - - -def make_python_docstring(string): - """Make a python docstring literal out of a given string.""" - return '"""{}."""'.format(string.replace('"""', '\\"\\"\\"')) - - -def make_string_literal(string): - """Make python string literal out of a given string.""" - return "'{}'".format(string.replace("'", "\\'")) - - -def get_python_name_generator(name): - """Generate a sequence of suitable python names out of given arbitrary string name.""" - python_name = make_python_name(name) - suffix = "" - index = 0 - - def get_name(): - return f"test_{python_name}{suffix}" - - while True: - yield get_name() - index += 1 - suffix = f"_{index}" - - -def scenarios(*feature_paths, **kwargs): - """Parse features from the paths and put all found scenarios in the caller module. - - :param *feature_paths: feature file paths to use for scenarios - """ - caller_locals = get_caller_module_locals() - caller_path = get_caller_module_path() - - features_base_dir = kwargs.get("features_base_dir") - if features_base_dir is None: - features_base_dir = get_features_base_dir(caller_path) - - abs_feature_paths = [] - for path in feature_paths: - if not os.path.isabs(path): - path = os.path.abspath(os.path.join(features_base_dir, path)) - abs_feature_paths.append(path) - found = False - - module_scenarios = frozenset( - (attr.__scenario__.feature.filename, attr.__scenario__.name) - for name, attr in caller_locals.items() - if hasattr(attr, "__scenario__") - ) - - for feature in get_features(abs_feature_paths): - for scenario_name, scenario_object in feature.scenarios.items(): - # skip already bound scenarios - if (scenario_object.feature.filename, scenario_name) not in module_scenarios: - - @scenario(feature.filename, scenario_name, **kwargs) - def _scenario(): - pass # pragma: no cover - - for test_name in get_python_name_generator(scenario_name): - if test_name not in caller_locals: - # found an unique test name - caller_locals[test_name] = _scenario - break - found = True - if not found: - raise exceptions.NoScenariosFound(abs_feature_paths) diff --git a/setup.cfg b/setup.cfg index 6553a080..8b3755b6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,7 +35,7 @@ install_requires = pytest>=4.3 tests_require = tox -packages = pytest_bdd +packages = find: include_package_data = True [options.entry_points] diff --git a/setup.py b/setup.py index 60684932..f7439e1c 100755 --- a/setup.py +++ b/setup.py @@ -1,3 +1,12 @@ +import unasync from setuptools import setup -setup() +setup( + cmdclass={ + "build_py": unasync.cmdclass_build_py( + rules=[ + unasync.Rule("/pytest_bdd/_async/", "/pytest_bdd/_sync/"), + ] + ) + } +) diff --git a/tests/feature/test_async_scenarios.py b/tests/feature/test_async_scenarios.py new file mode 100644 index 00000000..bd68fd5c --- /dev/null +++ b/tests/feature/test_async_scenarios.py @@ -0,0 +1,104 @@ +"""Test scenarios shortcut.""" +import textwrap + +from tests.utils import assert_outcomes + + +def test_scenarios(testdir, pytest_params): + """Test scenarios shortcut (used together with @scenario for individual test override).""" + testdir.makeini( + """ + [pytest] + console_output_style=classic + """ + ) + testdir.makeconftest( + """ + import pytest + from pytest_bdd import given + + import anyio + + @given('I have a bar') + async def i_have_bar(): + await anyio.sleep(0) + print('bar!') + return 'bar' + """ + ) + features = testdir.mkdir("features") + features.join("test.feature").write_text( + textwrap.dedent( + """ + @anyio + Scenario: Test scenario + Given I have a bar + """ + ), + "utf-8", + ensure=True, + ) + features.join("subfolder", "test.feature").write_text( + textwrap.dedent( + """ + @anyio + Scenario: Test subfolder scenario + Given I have a bar + + @anyio + Scenario: Test failing subfolder scenario + Given I have a failing bar + + @anyio + Scenario: Test already bound scenario + Given I have a bar + + @anyio + Scenario: Test scenario + Given I have a bar + """ + ), + "utf-8", + ensure=True, + ) + testdir.makepyfile( + """ + import pytest + from pytest_bdd import async_scenarios, async_scenario + + @pytest.mark.anyio + @async_scenario('features/subfolder/test.feature', 'Test already bound scenario') + async def test_already_bound(): + pass + + async_scenarios('features') + """ + ) + result = testdir.runpytest_subprocess("-v", "-s", *pytest_params) + assert_outcomes(result, passed=8, failed=2) + result.stdout.fnmatch_lines(["*collected 10 items"]) + result.stdout.fnmatch_lines(["*test_test_subfolder_scenario[[]asyncio[]] *bar!", "PASSED"]) + result.stdout.fnmatch_lines(["*test_test_subfolder_scenario[[]trio[]] *bar!", "PASSED"]) + result.stdout.fnmatch_lines(["*test_test_scenario[[]asyncio[]] *bar!", "PASSED"]) + result.stdout.fnmatch_lines(["*test_test_scenario[[]trio[]] *bar!", "PASSED"]) + result.stdout.fnmatch_lines(["*test_test_failing_subfolder_scenario[[]asyncio[]] *FAILED"]) + result.stdout.fnmatch_lines(["*test_test_failing_subfolder_scenario[[]trio[]] *FAILED"]) + result.stdout.fnmatch_lines(["*test_already_bound[[]asyncio[]] *bar!", "PASSED"]) + result.stdout.fnmatch_lines(["*test_already_bound[[]trio[]] *bar!", "PASSED"]) + result.stdout.fnmatch_lines(["*test_test_scenario_1[[]asyncio[]] *bar!", "PASSED"]) + result.stdout.fnmatch_lines(["*test_test_scenario_1[[]trio[]] *bar!", "PASSED"]) + + +def test_scenarios_none_found(testdir, pytest_params): + """Test scenarios shortcut when no scenarios found.""" + testpath = testdir.makepyfile( + """ + import pytest + from pytest_bdd import async_scenarios + + async_scenarios('.') + """ + ) + result = testdir.runpytest_subprocess(testpath, *pytest_params) + assert_outcomes(result, errors=1) + result.stdout.fnmatch_lines(["*NoScenariosFound*"]) diff --git a/tox.ini b/tox.ini index 219b1cbb..901be7ab 100644 --- a/tox.ini +++ b/tox.ini @@ -1,4 +1,8 @@ [tox] +minversion=3.23.1 +requires= + virtualenv>=20.4.7 +isolated_build=true distshare = {homedir}/.tox/distshare envlist = py38-pytestlatest-linters, py39-pytest{43,44,45,46,50,51,52,53,54,60,61,62, latest}-coverage, @@ -28,6 +32,7 @@ deps = coverage: coverage xdist: pytest-xdist -r{toxinidir}/requirements-testing.txt + anyio[trio] commands = {env:_PYTEST_CMD:pytest} {env:_PYTEST_MORE_ARGS:} {posargs:-vvl} ; Black doesn't support >py38 now From 677a47f3684d08477301b25f36ef5228217d9bbb Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 3 Jun 2021 18:38:24 +0100 Subject: [PATCH 2/9] fix tox -e *-coverage running against source files --- tox.ini | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tox.ini b/tox.ini index 901be7ab..81fc237e 100644 --- a/tox.ini +++ b/tox.ini @@ -12,7 +12,7 @@ skip_missing_interpreters = true [testenv] setenv = - coverage: _PYTEST_CMD=coverage run --append -m pytest + coverage: _PYTEST_CMD=pytest --cov xdist: _PYTEST_MORE_ARGS=-n3 -rfsxX deps = pytestlatest: pytest @@ -29,7 +29,7 @@ deps = pytest44: pytest~=4.4.0 pytest43: pytest~=4.3.0 - coverage: coverage + coverage: pytest-cov xdist: pytest-xdist -r{toxinidir}/requirements-testing.txt anyio[trio] From abbf6699f64aa861f092c188032f9ab7d2f01971 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 3 Jun 2021 18:44:59 +0100 Subject: [PATCH 3/9] xfail async scenarios test on py3.10 --- tests/feature/test_async_scenarios.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/feature/test_async_scenarios.py b/tests/feature/test_async_scenarios.py index bd68fd5c..4bae0d14 100644 --- a/tests/feature/test_async_scenarios.py +++ b/tests/feature/test_async_scenarios.py @@ -1,9 +1,15 @@ """Test scenarios shortcut.""" +import sys import textwrap +import pytest + from tests.utils import assert_outcomes +@pytest.mark.xfail( + sys.version_info >= (3, 10), reason="trio does not support py3.10 https://github.com/python-trio/trio/pull/1921" +) def test_scenarios(testdir, pytest_params): """Test scenarios shortcut (used together with @scenario for individual test override).""" testdir.makeini( From 750e3dcc15017297f7225bbc87081c1f3a36766c Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 3 Jun 2021 18:51:22 +0100 Subject: [PATCH 4/9] collect reporting from pytest_bdd package --- .coveragerc | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/.coveragerc b/.coveragerc index 91f6be48..56cf47fa 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,5 +1,10 @@ [run] branch = true -include = - pytest_bdd/* - tests/* +source_pkgs = pytest_bdd +source = tests + +[paths] +source = + . + .tox/*/lib/*/site-packages/ + .tox\\*\\Lib\\site-packages\\ From 7e35215217ac31b8fc692d627a45a87e7cda20e3 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Thu, 3 Jun 2021 18:51:32 +0100 Subject: [PATCH 5/9] ignore coverage part files --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 8ddc92e6..89e984ff 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,7 @@ pip-log.txt # Unit test / coverage reports .coverage +.coverage.* .tox nosetests.xml From c1e1ee1f31f71775e80e0a1cbad63efe8cbe485a Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Wed, 16 Jun 2021 23:22:54 +0100 Subject: [PATCH 6/9] upgrade xdist for sys.path fix --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 81fc237e..89b84563 100644 --- a/tox.ini +++ b/tox.ini @@ -30,7 +30,7 @@ deps = pytest43: pytest~=4.3.0 coverage: pytest-cov - xdist: pytest-xdist + xdist: pytest-xdist>=2.3.0 -r{toxinidir}/requirements-testing.txt anyio[trio] commands = {env:_PYTEST_CMD:pytest} {env:_PYTEST_MORE_ARGS:} {posargs:-vvl} From 72be7f2e5719f3722761818b5893bc12cfb054da Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 27 Jun 2021 11:39:40 +0100 Subject: [PATCH 7/9] Revert "xfail async scenarios test on py3.10" This reverts commit abbf6699f64aa861f092c188032f9ab7d2f01971. --- tests/feature/test_async_scenarios.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/feature/test_async_scenarios.py b/tests/feature/test_async_scenarios.py index 4bae0d14..bd68fd5c 100644 --- a/tests/feature/test_async_scenarios.py +++ b/tests/feature/test_async_scenarios.py @@ -1,15 +1,9 @@ """Test scenarios shortcut.""" -import sys import textwrap -import pytest - from tests.utils import assert_outcomes -@pytest.mark.xfail( - sys.version_info >= (3, 10), reason="trio does not support py3.10 https://github.com/python-trio/trio/pull/1921" -) def test_scenarios(testdir, pytest_params): """Test scenarios shortcut (used together with @scenario for individual test override).""" testdir.makeini( From 92bac990418f9e9b08aaba1dc3780541f9a53212 Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 27 Jun 2021 12:04:06 +0100 Subject: [PATCH 8/9] remove unasync --- pyproject.toml | 2 +- pytest_bdd/__init__.py | 4 +- pytest_bdd/_async/__init__.py | 0 pytest_bdd/_async/scenario.py | 307 ---------------------- pytest_bdd/scenario.py | 360 ++++++++++++++++++++++++-- setup.py | 11 +- tests/feature/test_async_scenarios.py | 10 +- 7 files changed, 348 insertions(+), 346 deletions(-) delete mode 100644 pytest_bdd/_async/__init__.py delete mode 100644 pytest_bdd/_async/scenario.py diff --git a/pyproject.toml b/pyproject.toml index 544a65fb..675cf18d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ["setuptools", "wheel", "unasync~=0.5.0"] +requires = ["setuptools", "wheel"] build-backend = "setuptools.build_meta" [tool.black] diff --git a/pytest_bdd/__init__.py b/pytest_bdd/__init__.py index 7c9ed65d..521513de 100644 --- a/pytest_bdd/__init__.py +++ b/pytest_bdd/__init__.py @@ -1,8 +1,8 @@ """pytest-bdd public API.""" from pytest_bdd.steps import given, when, then -from pytest_bdd.scenario import scenario, scenarios, async_scenario, async_scenarios +from pytest_bdd.scenario import scenario, scenarios __version__ = "4.0.2" -__all__ = ["given", "when", "then", "scenario", "scenarios", "async_scenario", "async_scenarios"] +__all__ = ["given", "when", "then", "scenario", "scenarios"] diff --git a/pytest_bdd/_async/__init__.py b/pytest_bdd/_async/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/pytest_bdd/_async/scenario.py b/pytest_bdd/_async/scenario.py deleted file mode 100644 index a98ce1af..00000000 --- a/pytest_bdd/_async/scenario.py +++ /dev/null @@ -1,307 +0,0 @@ -"""Scenario implementation. - -The pytest will collect the test case and the steps will be executed -line by line. - -Example: - -test_publish_article = scenario( - feature_name="publish_article.feature", - scenario_name="Publishing the article", -) -""" -import collections -import os -import re - -import pytest -from _pytest.fixtures import FixtureLookupError - -from .. import exceptions -from ..feature import get_feature, get_features -from ..steps import get_step_fixture_name, inject_fixture -from ..utils import CONFIG_STACK, get_args, get_caller_module_locals, get_caller_module_path - -PYTHON_REPLACE_REGEX = re.compile(r"\W") -ALPHA_REGEX = re.compile(r"^\d+_*") - - -def find_argumented_step_fixture_name(name, type_, fixturemanager, request=None): - """Find argumented step fixture name.""" - # happens to be that _arg2fixturedefs is changed during the iteration so we use a copy - for fixturename, fixturedefs in list(fixturemanager._arg2fixturedefs.items()): - for fixturedef in fixturedefs: - parser = getattr(fixturedef.func, "parser", None) - if parser is None: - continue - match = parser.is_matching(name) - if not match: - continue - - converters = getattr(fixturedef.func, "converters", {}) - for arg, value in parser.parse_arguments(name).items(): - if arg in converters: - value = converters[arg](value) - if request: - inject_fixture(request, arg, value) - parser_name = get_step_fixture_name(parser.name, type_) - if request: - try: - request.getfixturevalue(parser_name) - except FixtureLookupError: - continue - return parser_name - - -def _find_step_function(request, step, scenario): - """Match the step defined by the regular expression pattern. - - :param request: PyTest request object. - :param step: Step. - :param scenario: Scenario. - - :return: Function of the step. - :rtype: function - """ - name = step.name - try: - # Simple case where no parser is used for the step - return request.getfixturevalue(get_step_fixture_name(name, step.type)) - except FixtureLookupError: - try: - # Could not find a fixture with the same name, let's see if there is a parser involved - name = find_argumented_step_fixture_name(name, step.type, request._fixturemanager, request) - if name: - return request.getfixturevalue(name) - raise - except FixtureLookupError: - raise exceptions.StepDefinitionNotFoundError( - f"Step definition is not found: {step}. " - f'Line {step.line_number} in scenario "{scenario.name}" in the feature "{scenario.feature.filename}"' - ) - - -async def _execute_step_function(request, scenario, step, step_func): - """Execute step function. - - :param request: PyTest request. - :param scenario: Scenario. - :param step: Step. - :param function step_func: Step function. - :param example: Example table. - """ - kw = dict(request=request, feature=scenario.feature, scenario=scenario, step=step, step_func=step_func) - - request.config.hook.pytest_bdd_before_step(**kw) - - kw["step_func_args"] = {} - try: - # Get the step argument values. - kwargs = {arg: request.getfixturevalue(arg) for arg in get_args(step_func)} - kw["step_func_args"] = kwargs - - request.config.hook.pytest_bdd_before_step_call(**kw) - target_fixture = getattr(step_func, "target_fixture", None) - # Execute the step. - return_value = await step_func(**kwargs) - if target_fixture: - inject_fixture(request, target_fixture, return_value) - - request.config.hook.pytest_bdd_after_step(**kw) - except Exception as exception: - request.config.hook.pytest_bdd_step_error(exception=exception, **kw) - raise - - -async def _execute_scenario(feature, scenario, request): - """Execute the scenario. - - :param feature: Feature. - :param scenario: Scenario. - :param request: request. - :param encoding: Encoding. - """ - request.config.hook.pytest_bdd_before_scenario(request=request, feature=feature, scenario=scenario) - - try: - # Execute scenario steps - for step in scenario.steps: - try: - step_func = _find_step_function(request, step, scenario) - except exceptions.StepDefinitionNotFoundError as exception: - request.config.hook.pytest_bdd_step_func_lookup_error( - request=request, feature=feature, scenario=scenario, step=step, exception=exception - ) - raise - await _execute_step_function(request, scenario, step, step_func) - finally: - request.config.hook.pytest_bdd_after_scenario(request=request, feature=feature, scenario=scenario) - - -FakeRequest = collections.namedtuple("FakeRequest", ["module"]) - - -def _get_scenario_decorator(feature, feature_name, scenario, scenario_name): - # HACK: Ideally we would use `def decorator(fn)`, but we want to return a custom exception - # when the decorator is misused. - # Pytest inspect the signature to determine the required fixtures, and in that case it would look - # for a fixture called "fn" that doesn't exist (if it exists then it's even worse). - # It will error with a "fixture 'fn' not found" message instead. - # We can avoid this hack by using a pytest hook and check for misuse instead. - def decorator(*args): - if not args: - raise exceptions.ScenarioIsDecoratorOnly( - "scenario function can only be used as a decorator. Refer to the documentation." - ) - [fn] = args - args = get_args(fn) - function_args = list(args) - for arg in scenario.get_example_params(): - if arg not in function_args: - function_args.append(arg) - - @pytest.mark.usefixtures(*function_args) - async def scenario_wrapper(request): - await _execute_scenario(feature, scenario, request) - return await fn(*[request.getfixturevalue(arg) for arg in args]) - - for param_set in scenario.get_params(): - if param_set: - scenario_wrapper = pytest.mark.parametrize(*param_set)(scenario_wrapper) - for tag in scenario.tags.union(feature.tags): - config = CONFIG_STACK[-1] - config.hook.pytest_bdd_apply_tag(tag=tag, function=scenario_wrapper) - - scenario_wrapper.__doc__ = f"{feature_name}: {scenario_name}" - scenario_wrapper.__scenario__ = scenario - scenario.test_function = scenario_wrapper - return scenario_wrapper - - return decorator - - -def scenario(feature_name, scenario_name, encoding="utf-8", example_converters=None, features_base_dir=None): - """Scenario decorator. - - :param str feature_name: Feature file name. Absolute or relative to the configured feature base path. - :param str scenario_name: Scenario name. - :param str encoding: Feature file encoding. - :param dict example_converters: optional `dict` of example converter function, where key is the name of the - example parameter, and value is the converter function. - """ - - scenario_name = str(scenario_name) - caller_module_path = get_caller_module_path() - - # Get the feature - if features_base_dir is None: - features_base_dir = get_features_base_dir(caller_module_path) - feature = get_feature(features_base_dir, feature_name, encoding=encoding) - - # Get the scenario - try: - scenario = feature.scenarios[scenario_name] - except KeyError: - feature_name = feature.name or "[Empty]" - raise exceptions.ScenarioNotFound( - f'Scenario "{scenario_name}" in feature "{feature_name}" in {feature.filename} is not found.' - ) - - scenario.example_converters = example_converters - - # Validate the scenario - scenario.validate() - - return _get_scenario_decorator( - feature=feature, feature_name=feature_name, scenario=scenario, scenario_name=scenario_name - ) - - -def get_features_base_dir(caller_module_path): - default_base_dir = os.path.dirname(caller_module_path) - return get_from_ini("bdd_features_base_dir", default_base_dir) - - -def get_from_ini(key, default): - """Get value from ini config. Return default if value has not been set. - - Use if the default value is dynamic. Otherwise set default on addini call. - """ - config = CONFIG_STACK[-1] - value = config.getini(key) - return value if value != "" else default - - -def make_python_name(string): - """Make python attribute name out of a given string.""" - string = re.sub(PYTHON_REPLACE_REGEX, "", string.replace(" ", "_")) - return re.sub(ALPHA_REGEX, "", string).lower() - - -def make_python_docstring(string): - """Make a python docstring literal out of a given string.""" - return '"""{}."""'.format(string.replace('"""', '\\"\\"\\"')) - - -def make_string_literal(string): - """Make python string literal out of a given string.""" - return "'{}'".format(string.replace("'", "\\'")) - - -def get_python_name_generator(name): - """Generate a sequence of suitable python names out of given arbitrary string name.""" - python_name = make_python_name(name) - suffix = "" - index = 0 - - def get_name(): - return f"test_{python_name}{suffix}" - - while True: - yield get_name() - index += 1 - suffix = f"_{index}" - - -def scenarios(*feature_paths, **kwargs): - """Parse features from the paths and put all found scenarios in the caller module. - - :param *feature_paths: feature file paths to use for scenarios - """ - caller_locals = get_caller_module_locals() - caller_path = get_caller_module_path() - - features_base_dir = kwargs.get("features_base_dir") - if features_base_dir is None: - features_base_dir = get_features_base_dir(caller_path) - - abs_feature_paths = [] - for path in feature_paths: - if not os.path.isabs(path): - path = os.path.abspath(os.path.join(features_base_dir, path)) - abs_feature_paths.append(path) - found = False - - module_scenarios = frozenset( - (attr.__scenario__.feature.filename, attr.__scenario__.name) - for name, attr in caller_locals.items() - if hasattr(attr, "__scenario__") - ) - - for feature in get_features(abs_feature_paths): - for scenario_name, scenario_object in feature.scenarios.items(): - # skip already bound scenarios - if (scenario_object.feature.filename, scenario_name) not in module_scenarios: - - @scenario(feature.filename, scenario_name, **kwargs) - async def _scenario(): - pass # pragma: no cover - - for test_name in get_python_name_generator(scenario_name): - if test_name not in caller_locals: - # found an unique test name - caller_locals[test_name] = _scenario - break - found = True - if not found: - raise exceptions.NoScenariosFound(abs_feature_paths) diff --git a/pytest_bdd/scenario.py b/pytest_bdd/scenario.py index 51c5deaf..44dfed90 100644 --- a/pytest_bdd/scenario.py +++ b/pytest_bdd/scenario.py @@ -1,22 +1,340 @@ -__all__ = [ - "async_scenario", - "async_scenarios", - "scenario", - "scenarios", - "find_argumented_step_fixture_name", - "make_python_docstring", - "make_python_name", - "make_string_literal", - "get_python_name_generator", -] - -from ._async.scenario import scenario as async_scenario, scenarios as async_scenarios -from ._sync.scenario import ( - scenario, - scenarios, - find_argumented_step_fixture_name, - make_python_docstring, - make_python_name, - make_string_literal, - get_python_name_generator, +"""Scenario implementation. + +The pytest will collect the test case and the steps will be executed +line by line. + +Example: + +test_publish_article = scenario( + feature_name="publish_article.feature", + scenario_name="Publishing the article", ) +""" +import contextlib +import collections +import os +import re + +import pytest +from _pytest.fixtures import FixtureLookupError + +from . import exceptions +from .feature import get_feature, get_features +from .steps import get_step_fixture_name, inject_fixture +from .utils import CONFIG_STACK, get_args, get_caller_module_locals, get_caller_module_path + +PYTHON_REPLACE_REGEX = re.compile(r"\W") +ALPHA_REGEX = re.compile(r"^\d+_*") + + +def find_argumented_step_fixture_name(name, type_, fixturemanager, request=None): + """Find argumented step fixture name.""" + # happens to be that _arg2fixturedefs is changed during the iteration so we use a copy + for fixturename, fixturedefs in list(fixturemanager._arg2fixturedefs.items()): + for fixturedef in fixturedefs: + parser = getattr(fixturedef.func, "parser", None) + if parser is None: + continue + match = parser.is_matching(name) + if not match: + continue + + converters = getattr(fixturedef.func, "converters", {}) + for arg, value in parser.parse_arguments(name).items(): + if arg in converters: + value = converters[arg](value) + if request: + inject_fixture(request, arg, value) + parser_name = get_step_fixture_name(parser.name, type_) + if request: + try: + request.getfixturevalue(parser_name) + except FixtureLookupError: + continue + return parser_name + + +def _find_step_function(request, step, scenario): + """Match the step defined by the regular expression pattern. + + :param request: PyTest request object. + :param step: Step. + :param scenario: Scenario. + + :return: Function of the step. + :rtype: function + """ + name = step.name + try: + # Simple case where no parser is used for the step + return request.getfixturevalue(get_step_fixture_name(name, step.type)) + except FixtureLookupError: + try: + # Could not find a fixture with the same name, let's see if there is a parser involved + name = find_argumented_step_fixture_name(name, step.type, request._fixturemanager, request) + if name: + return request.getfixturevalue(name) + raise + except FixtureLookupError: + raise exceptions.StepDefinitionNotFoundError( + f"Step definition is not found: {step}. " + f'Line {step.line_number} in scenario "{scenario.name}" in the feature "{scenario.feature.filename}"' + ) + + +async def _execute_step_function(request, scenario, step, step_func, sync): + """Execute step function. + + :param request: PyTest request. + :param scenario: Scenario. + :param step: Step. + :param function step_func: Step function. + :param example: Example table. + """ + kw = dict(request=request, feature=scenario.feature, scenario=scenario, step=step, step_func=step_func) + + request.config.hook.pytest_bdd_before_step(**kw) + + kw["step_func_args"] = {} + try: + # Get the step argument values. + kwargs = {arg: request.getfixturevalue(arg) for arg in get_args(step_func)} + kw["step_func_args"] = kwargs + + request.config.hook.pytest_bdd_before_step_call(**kw) + target_fixture = getattr(step_func, "target_fixture", None) + # Execute the step. + if sync: + return_value = step_func(**kwargs) + else: + return_value = await step_func(**kwargs) + if target_fixture: + inject_fixture(request, target_fixture, return_value) + + request.config.hook.pytest_bdd_after_step(**kw) + except Exception as exception: + request.config.hook.pytest_bdd_step_error(exception=exception, **kw) + raise + + +async def _execute_scenario(feature, scenario, request, sync): + """Execute the scenario. + + :param feature: Feature. + :param scenario: Scenario. + :param request: request. + :param encoding: Encoding. + """ + request.config.hook.pytest_bdd_before_scenario(request=request, feature=feature, scenario=scenario) + + try: + # Execute scenario steps + for step in scenario.steps: + try: + step_func = _find_step_function(request, step, scenario) + except exceptions.StepDefinitionNotFoundError as exception: + request.config.hook.pytest_bdd_step_func_lookup_error( + request=request, feature=feature, scenario=scenario, step=step, exception=exception + ) + raise + await _execute_step_function(request, scenario, step, step_func, sync) + finally: + request.config.hook.pytest_bdd_after_scenario(request=request, feature=feature, scenario=scenario) + + +FakeRequest = collections.namedtuple("FakeRequest", ["module"]) + + +def await_(fn, *args): + v = fn(*args) + with contextlib.closing(v.__await__()) as gen: + try: + gen.send(None) + except StopIteration as e: + return e.value + else: + raise RuntimeError("coro did not stop") + + +def _get_scenario_decorator(feature, feature_name, scenario, scenario_name, *, sync): + # HACK: Ideally we would use `def decorator(fn)`, but we want to return a custom exception + # when the decorator is misused. + # Pytest inspect the signature to determine the required fixtures, and in that case it would look + # for a fixture called "fn" that doesn't exist (if it exists then it's even worse). + # It will error with a "fixture 'fn' not found" message instead. + # We can avoid this hack by using a pytest hook and check for misuse instead. + def decorator(*args): + if not args: + raise exceptions.ScenarioIsDecoratorOnly( + "scenario function can only be used as a decorator. Refer to the documentation." + ) + [fn] = args + args = get_args(fn) + function_args = list(args) + for arg in scenario.get_example_params(): + if arg not in function_args: + function_args.append(arg) + + if sync: + + @pytest.mark.usefixtures(*function_args) + def scenario_wrapper(request): + await_(_execute_scenario, feature, scenario, request, sync) + return fn(*[request.getfixturevalue(arg) for arg in args]) + + else: + + @pytest.mark.usefixtures(*function_args) + async def scenario_wrapper(request): + await _execute_scenario(feature, scenario, request, sync) + return await fn(*[request.getfixturevalue(arg) for arg in args]) + + for param_set in scenario.get_params(): + if param_set: + scenario_wrapper = pytest.mark.parametrize(*param_set)(scenario_wrapper) + for tag in scenario.tags.union(feature.tags): + config = CONFIG_STACK[-1] + config.hook.pytest_bdd_apply_tag(tag=tag, function=scenario_wrapper) + + scenario_wrapper.__doc__ = f"{feature_name}: {scenario_name}" + scenario_wrapper.__scenario__ = scenario + scenario.test_function = scenario_wrapper + return scenario_wrapper + + return decorator + + +def scenario(feature_name, scenario_name, encoding="utf-8", example_converters=None, features_base_dir=None, sync=True): + """Scenario decorator. + + :param str feature_name: Feature file name. Absolute or relative to the configured feature base path. + :param str scenario_name: Scenario name. + :param str encoding: Feature file encoding. + :param dict example_converters: optional `dict` of example converter function, where key is the name of the + example parameter, and value is the converter function. + """ + + scenario_name = str(scenario_name) + caller_module_path = get_caller_module_path() + + # Get the feature + if features_base_dir is None: + features_base_dir = get_features_base_dir(caller_module_path) + feature = get_feature(features_base_dir, feature_name, encoding=encoding) + + # Get the scenario + try: + scenario = feature.scenarios[scenario_name] + except KeyError: + feature_name = feature.name or "[Empty]" + raise exceptions.ScenarioNotFound( + f'Scenario "{scenario_name}" in feature "{feature_name}" in {feature.filename} is not found.' + ) + + scenario.example_converters = example_converters + + # Validate the scenario + scenario.validate() + + return _get_scenario_decorator( + feature=feature, feature_name=feature_name, scenario=scenario, scenario_name=scenario_name, sync=sync + ) + + +def get_features_base_dir(caller_module_path): + default_base_dir = os.path.dirname(caller_module_path) + return get_from_ini("bdd_features_base_dir", default_base_dir) + + +def get_from_ini(key, default): + """Get value from ini config. Return default if value has not been set. + + Use if the default value is dynamic. Otherwise set default on addini call. + """ + config = CONFIG_STACK[-1] + value = config.getini(key) + return value if value != "" else default + + +def make_python_name(string): + """Make python attribute name out of a given string.""" + string = re.sub(PYTHON_REPLACE_REGEX, "", string.replace(" ", "_")) + return re.sub(ALPHA_REGEX, "", string).lower() + + +def make_python_docstring(string): + """Make a python docstring literal out of a given string.""" + return '"""{}."""'.format(string.replace('"""', '\\"\\"\\"')) + + +def make_string_literal(string): + """Make python string literal out of a given string.""" + return "'{}'".format(string.replace("'", "\\'")) + + +def get_python_name_generator(name): + """Generate a sequence of suitable python names out of given arbitrary string name.""" + python_name = make_python_name(name) + suffix = "" + index = 0 + + def get_name(): + return f"test_{python_name}{suffix}" + + while True: + yield get_name() + index += 1 + suffix = f"_{index}" + + +def scenarios(*feature_paths, sync=True, **kwargs): + """Parse features from the paths and put all found scenarios in the caller module. + + :param *feature_paths: feature file paths to use for scenarios + """ + caller_locals = get_caller_module_locals() + caller_path = get_caller_module_path() + + features_base_dir = kwargs.get("features_base_dir") + if features_base_dir is None: + features_base_dir = get_features_base_dir(caller_path) + + abs_feature_paths = [] + for path in feature_paths: + if not os.path.isabs(path): + path = os.path.abspath(os.path.join(features_base_dir, path)) + abs_feature_paths.append(path) + found = False + + module_scenarios = frozenset( + (attr.__scenario__.feature.filename, attr.__scenario__.name) + for name, attr in caller_locals.items() + if hasattr(attr, "__scenario__") + ) + + for feature in get_features(abs_feature_paths): + for scenario_name, scenario_object in feature.scenarios.items(): + # skip already bound scenarios + if (scenario_object.feature.filename, scenario_name) not in module_scenarios: + + decorator = scenario(feature.filename, scenario_name, sync=sync, **kwargs) + if sync: + + @decorator + def _scenario(): + pass # pragma: no cover + + else: + + @decorator + async def _scenario(): + pass # pragma: no cover + + for test_name in get_python_name_generator(scenario_name): + if test_name not in caller_locals: + # found an unique test name + caller_locals[test_name] = _scenario + break + found = True + if not found: + raise exceptions.NoScenariosFound(abs_feature_paths) diff --git a/setup.py b/setup.py index f7439e1c..60684932 100755 --- a/setup.py +++ b/setup.py @@ -1,12 +1,3 @@ -import unasync from setuptools import setup -setup( - cmdclass={ - "build_py": unasync.cmdclass_build_py( - rules=[ - unasync.Rule("/pytest_bdd/_async/", "/pytest_bdd/_sync/"), - ] - ) - } -) +setup() diff --git a/tests/feature/test_async_scenarios.py b/tests/feature/test_async_scenarios.py index bd68fd5c..8c2dff4c 100644 --- a/tests/feature/test_async_scenarios.py +++ b/tests/feature/test_async_scenarios.py @@ -64,14 +64,14 @@ async def i_have_bar(): testdir.makepyfile( """ import pytest - from pytest_bdd import async_scenarios, async_scenario + from pytest_bdd import scenarios, scenario @pytest.mark.anyio - @async_scenario('features/subfolder/test.feature', 'Test already bound scenario') + @scenario('features/subfolder/test.feature', 'Test already bound scenario', sync=False) async def test_already_bound(): pass - async_scenarios('features') + scenarios('features', sync=False) """ ) result = testdir.runpytest_subprocess("-v", "-s", *pytest_params) @@ -94,9 +94,9 @@ def test_scenarios_none_found(testdir, pytest_params): testpath = testdir.makepyfile( """ import pytest - from pytest_bdd import async_scenarios + from pytest_bdd import scenarios - async_scenarios('.') + scenarios('.', sync=False) """ ) result = testdir.runpytest_subprocess(testpath, *pytest_params) From a33ad338f75e58c5ce24b86428c7e1fe9102390d Mon Sep 17 00:00:00 2001 From: Thomas Grainger Date: Sun, 27 Jun 2021 13:10:50 +0100 Subject: [PATCH 9/9] avoid _PYTEST_CMD --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 89b84563..f193cfe8 100644 --- a/tox.ini +++ b/tox.ini @@ -12,7 +12,7 @@ skip_missing_interpreters = true [testenv] setenv = - coverage: _PYTEST_CMD=pytest --cov + coverage: _PYTEST_MORE_ARGS=--cov xdist: _PYTEST_MORE_ARGS=-n3 -rfsxX deps = pytestlatest: pytest