Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
- {pull}`709` add uv pre-commit check.
- {pull}`713` removes uv as a test dependency. Closes {issue}`712`. Thanks to {user}`erooke`!
- {pull}`718` fixes {issue}`717` by properly parsing the `pdbcls` configuration option from config files. Thanks to {user}`MImmesberger` for the report!
- {pull}`719` fixes the collection of repeated tasks with the same function name in the programmatic interface so that all of them are executed.

## 0.5.5 - 2025-07-25

Expand Down
269 changes: 252 additions & 17 deletions docs/source/how_to_guides/functional_interface.ipynb

Large diffs are not rendered by default.

34 changes: 26 additions & 8 deletions src/_pytask/collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from _pytask.shared import to_list
from _pytask.shared import unwrap_task_function
from _pytask.task_utils import COLLECTED_TASKS
from _pytask.task_utils import parse_collected_tasks_with_task_marker
from _pytask.task_utils import task as task_decorator
from _pytask.typing import is_task_function

Expand Down Expand Up @@ -108,6 +109,10 @@ def _collect_from_paths(session: Session) -> None:

def _collect_from_tasks(session: Session) -> None:
"""Collect tasks from user provided tasks via the functional interface."""
# First pass: collect and group tasks by path
tasks_by_path: dict[Path | None, list[Any]] = {}
non_task_objects = []

for raw_task in to_list(session.config.get("tasks", ())):
if is_task_function(raw_task):
if not hasattr(raw_task, "pytask_meta"):
Expand All @@ -117,18 +122,31 @@ def _collect_from_tasks(session: Session) -> None:
name = raw_task.pytask_meta.name

if has_mark(raw_task, "task"):
# When tasks with @task are passed to the programmatic interface multiple
# times, they are deleted from ``COLLECTED_TASKS`` in the first iteration
# and are missing in the later. See #625.
# When tasks with @task are passed to the programmatic interface
# multiple times, they are deleted from ``COLLECTED_TASKS`` in the first
# iteration and are missing in later ones. See #625.
with suppress(ValueError):
COLLECTED_TASKS[path].remove(raw_task)

# When a task is not a callable, it can be anything or a PTask. Set arbitrary
# values and it will pass without errors and not collected.
# Group tasks by path for parametrization
if path not in tasks_by_path:
tasks_by_path[path] = []
tasks_by_path[path].append(raw_task)
else:
name = ""
path = None

# When a task is not a callable, it can be anything or a PTask. Set
# arbitrary values so that it passes without errors and is not collected.
non_task_objects.append((raw_task, None, ""))

# Second pass: apply parametrization to grouped tasks
parametrized_tasks = []
for path, tasks in tasks_by_path.items():
# Apply the same parametrization logic as file-based collection
name_to_function = parse_collected_tasks_with_task_marker(tasks)
for name, function in name_to_function.items():
parametrized_tasks.append((function, path, name))

# Third pass: collect all tasks
for raw_task, path, name in parametrized_tasks + non_task_objects:
report = session.hook.pytask_collect_task_protocol(
session=session,
reports=session.collection_reports,
Expand Down
58 changes: 58 additions & 0 deletions tests/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,64 @@ def test_pass_non_task_to_functional_api_that_are_ignored():
assert len(session.tasks) == 0


@pytest.mark.skipif(
    sys.platform == "win32" and os.environ.get("CI") == "true",
    reason="Windows does not pick up the right Python interpreter.",
)
def test_repeated_tasks_via_functional_interface(tmp_path):
    """Check that same-named tasks passed to ``build`` are all executed.

    A standalone script is written to ``tmp_path`` and run in a subprocess. The
    script defines three functions called ``create_data`` in a loop, hands them to
    ``pytask.build`` and asserts on its own that

    - the session finishes with ``ExitCode.OK``,
    - three tasks are collected and three execution reports exist,
    - every task wrote its expected output file, and
    - the task names are unique and contain ``create_data[``.

    The script exits with the session's exit code, so this test only needs to
    inspect the exit code of the subprocess.
    """
    script = """
    from pathlib import Path
    from typing import Annotated
    from pytask import Product, task, build, ExitCode
    import sys

    # Create repeated tasks with the same function name
    tasks = []
    for i in range(3):
        def create_data(
            value: int = i * 10,
            produces: Annotated[Path, Product] = Path(f"output_{i}.txt")
        ) -> None:
            '''Generate data based on a value.'''
            produces.write_text(str(value))

        tasks.append(create_data)

    if __name__ == "__main__":
        session = build(tasks=tasks)

        # Verify all tasks were collected and executed
        assert session.exit_code == ExitCode.OK, f"Exit code: {session.exit_code}"
        assert len(session.tasks) == 3, f"Expected 3 tasks, got {len(session.tasks)}"
        assert len(session.execution_reports) == 3

        # Verify each task executed and produced the correct output
        assert Path("output_0.txt").read_text() == "0"
        assert Path("output_1.txt").read_text() == "10"
        assert Path("output_2.txt").read_text() == "20"

        # Verify tasks have unique names with repeated task IDs
        task_names = [task.name for task in session.tasks]
        assert len(task_names) == len(set(task_names)), "Task names should be unique"
        assert all("create_data[" in name for name in task_names), \\
            f"Task names should contain repeated task IDs: {task_names}"

        sys.exit(session.exit_code)
    """
    # Materialize the script; dedent strips the indentation shared with this test.
    script_path = tmp_path.joinpath("task_module.py")
    script_path.write_text(textwrap.dedent(script))

    # Run from ``tmp_path`` so the relative output paths resolve inside it.
    command = (sys.executable, script_path.as_posix())
    outcome = run_in_subprocess(command, cwd=tmp_path)
    assert outcome.exit_code == ExitCode.OK


def test_multiple_product_annotations(runner, tmp_path):
source = """
from pytask import Product
Expand Down
87 changes: 87 additions & 0 deletions tests/test_jupyter/test_repeated_tasks_functional_interface.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"id": "0",
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"from typing import Annotated\n",
"\n",
"import pytask\n",
"from pytask import ExitCode\n",
"from pytask import Product"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1",
"metadata": {},
"outputs": [],
"source": [
"# Create repeated tasks with the same function name\n",
"tasks = []\n",
"for i in range(3):\n",
"\n",
" def create_data(\n",
" value: int = i * 10,\n",
" produces: Annotated[Path, Product] = Path(f\"data_{i}.txt\"),\n",
" ):\n",
" \"\"\"Generate data based on a value.\"\"\"\n",
" produces.write_text(str(value))\n",
"\n",
" tasks.append(create_data)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "2",
"metadata": {},
"outputs": [],
"source": [
"# Test that all tasks execute correctly\n",
"session = pytask.build(tasks=tasks)\n",
"assert session.exit_code == ExitCode.OK\n",
"assert len(session.tasks) == 3, f\"Expected 3 tasks, got {len(session.tasks)}\"\n",
"assert len(session.execution_reports) == 3, (\n",
" f\"Expected 3 execution reports, got {len(session.execution_reports)}\"\n",
")\n",
"\n",
"# Verify each file was created with the correct content\n",
"assert Path(\"data_0.txt\").read_text() == \"0\"\n",
"assert Path(\"data_1.txt\").read_text() == \"10\"\n",
"assert Path(\"data_2.txt\").read_text() == \"20\"\n",
"\n",
"# Clean up\n",
"Path(\"data_0.txt\").unlink()\n",
"Path(\"data_1.txt\").unlink()\n",
"Path(\"data_2.txt\").unlink()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}