Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,26 @@ repos:
hooks:
- id: refurb
args: [--ignore, FURB126]
- repo: https://github.com/econchick/interrogate
rev: 1.5.0
hooks:
- id: interrogate
args: [-v, --fail-under=75]
exclude: ^(tests/|docs/|scripts/)
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.0.1'
hooks:
- id: mypy
args: [
--no-strict-optional,
--ignore-missing-imports,
]
additional_dependencies: [
attrs>=21.3.0,
click,
types-setuptools
]
pass_filenames: false
- repo: https://github.com/executablebooks/mdformat
rev: 0.7.16
hooks:
Expand Down Expand Up @@ -82,12 +102,6 @@ repos:
docs/source/tutorials/selecting_tasks.md|
docs/source/tutorials/set_up_a_project.md
)$
- repo: https://github.com/econchick/interrogate
rev: 1.5.0
hooks:
- id: interrogate
args: [-v, --fail-under=75]
exclude: ^(tests/|docs/|scripts/)
- repo: https://github.com/codespell-project/codespell
rev: v2.2.2
hooks:
Expand All @@ -99,20 +113,6 @@ repos:
- id: check-manifest
args: [--no-build-isolation]
additional_dependencies: [setuptools-scm, toml]
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v1.0.1'
hooks:
- id: mypy
args: [
--no-strict-optional,
--ignore-missing-imports,
]
additional_dependencies: [
attrs>=21.3.0,
click,
types-setuptools
]
pass_filenames: false
- repo: meta
hooks:
- id: check-hooks-apply
Expand Down
2 changes: 2 additions & 0 deletions docs/source/changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ releases are available on [PyPI](https://pypi.org/project/pytask) and
- {pull}`354` adds a `-f/--force` flag to execute tasks even though nothing may have
changed.
- {pull}`355` refactors a lot of things related to nodes.
- {pull}`357` add hashing for task files to detect changes when modification times do
not match.

## 0.3.1 - 2023-12-25

Expand Down
27 changes: 21 additions & 6 deletions src/_pytask/dag.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""This module contains code related to resolving dependencies."""
from __future__ import annotations

import hashlib
import itertools
import sys

Expand Down Expand Up @@ -130,16 +131,30 @@ def _have_task_or_neighbors_changed(
def pytask_dag_has_node_changed(session: Session, node: Node, task_name: str) -> bool:
"""Indicate whether a single dependency or product has changed."""
if isinstance(node, (FilePathNode, Task)):
state = session.hook.pytask_node_state(node=node)
if state is None:
# If node does not exist, we receive None.
file_state = session.hook.pytask_node_state(node=node)
if file_state is None:
return True

# If the node is not in the database.
try:
state_in_db = State[task_name, node.name].state # type: ignore[misc]
has_changed = state != state_in_db
name = node.name
db_state = State[task_name, name] # type: ignore[type-arg, valid-type]
except orm.ObjectNotFound:
has_changed = True
return has_changed
return True

# If the modification times match, the node has not been changed.
if file_state == db_state.modification_time:
return False

# If the modification time changed, quickly return for non-tasks.
if isinstance(node, FilePathNode):
return True

# When modification times changed, we are still comparing the hash of the file
# to avoid unnecessary and expensive reexecutions of tasks.
file_hash = hashlib.sha256(node.path.read_bytes()).hexdigest()
return file_hash != db_state.file_hash
return None


Expand Down
31 changes: 25 additions & 6 deletions src/_pytask/database_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
"""This module contains utilities for the database."""
from __future__ import annotations

import hashlib

from _pytask.dag_utils import node_and_neighbors
from _pytask.nodes_utils import Task
from _pytask.session import Session
from pony import orm

Expand All @@ -17,7 +20,8 @@ class State(db.Entity): # type: ignore[name-defined]

task = orm.Required(str)
node = orm.Required(str)
state = orm.Required(str)
modification_time = orm.Required(str)
file_hash = orm.Optional(str)

orm.PrimaryKey(task, node)

Expand All @@ -34,19 +38,34 @@ def create_database(


@orm.db_session
def _create_or_update_state(first_key: str, second_key: str, state: str) -> None:
def _create_or_update_state(
first_key: str, second_key: str, modification_time: str, file_hash: str
) -> None:
"""Create or update a state."""
try:
state_in_db = State[first_key, second_key] # type: ignore[type-arg, valid-type]
except orm.ObjectNotFound:
State(task=first_key, node=second_key, state=state)
State(
task=first_key,
node=second_key,
modification_time=modification_time,
file_hash=file_hash,
)
else:
state_in_db.state = state
state_in_db.modification_time = modification_time
state_in_db.file_hash = file_hash


def update_states_in_database(session: Session, task_name: str) -> None:
"""Update the state for each node of a task in the database."""
for name in node_and_neighbors(session.dag, task_name):
node = session.dag.nodes[name].get("task") or session.dag.nodes[name]["node"]
state = session.hook.pytask_node_state(node=node)
_create_or_update_state(task_name, node.name, state)

modification_time = session.hook.pytask_node_state(node=node)

if isinstance(node, Task):
hash_ = hashlib.sha256(node.path.read_bytes()).hexdigest()
else:
hash_ = ""

_create_or_update_state(task_name, node.name, modification_time, hash_)
1 change: 1 addition & 0 deletions src/_pytask/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class DagReport:

@classmethod
def from_exception(cls, exc_info: ExceptionInfo) -> DagReport:
exc_info = remove_internal_traceback_frames_from_exc_info(exc_info)
return cls(exc_info)


Expand Down
4 changes: 2 additions & 2 deletions tests/test_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ def task_write(produces):
(in_path.as_posix(), in_path),
(out_path.as_posix(), out_path),
):
state = State[task_id, id_].state
assert float(state) == path.stat().st_mtime
modification_time = State[task_id, id_].modification_time
assert float(modification_time) == path.stat().st_mtime


@pytest.mark.end_to_end()
Expand Down
36 changes: 12 additions & 24 deletions tests/test_dry_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@ def task_example(produces):
assert result.exit_code == ExitCode.OK
assert "2 Succeeded" in result.output

tmp_path.joinpath("task_example_first.py").write_text(textwrap.dedent(source))
# Add a new line to the file or the task would be skipped.
tmp_path.joinpath("task_example_first.py").write_text(
textwrap.dedent(source + "\n")
)

result = runner.invoke(cli, ["--dry-run", tmp_path.as_posix()])

Expand All @@ -63,49 +66,34 @@ def task_example(produces):
@pytest.mark.end_to_end()
def test_dry_run_w_subsequent_skipped_task(runner, tmp_path):
"""A skip is more important than a would be run."""
source = """
source_1 = """
import pytask

@pytask.mark.produces("out.txt")
def task_example(produces):
produces.touch()
"""
tmp_path.joinpath("task_example_first.py").write_text(textwrap.dedent(source))
tmp_path.joinpath("task_example_first.py").write_text(textwrap.dedent(source_1))

source = """
source_2 = """
import pytask

@pytask.mark.depends_on("out.txt")
@pytask.mark.produces("out_2.txt")
def task_example(produces):
produces.touch()
"""
tmp_path.joinpath("task_example_second.py").write_text(textwrap.dedent(source))
tmp_path.joinpath("task_example_second.py").write_text(textwrap.dedent(source_2))

result = runner.invoke(cli, [tmp_path.as_posix()])

assert result.exit_code == ExitCode.OK
assert "2 Succeeded" in result.output

source = """
import pytask

@pytask.mark.produces("out.txt")
def task_example(produces):
produces.touch()
"""
tmp_path.joinpath("task_example_first.py").write_text(textwrap.dedent(source))

source = """
import pytask

@pytask.mark.skip
@pytask.mark.depends_on("out.txt")
@pytask.mark.produces("out_2.txt")
def task_example(produces):
produces.touch()
"""
tmp_path.joinpath("task_example_second.py").write_text(textwrap.dedent(source))
tmp_path.joinpath("task_example_first.py").write_text(textwrap.dedent(source_1))
tmp_path.joinpath("task_example_second.py").write_text(
textwrap.dedent(source_2 + "\n")
)

result = runner.invoke(cli, ["--dry-run", tmp_path.as_posix()])

Expand Down
14 changes: 13 additions & 1 deletion tests/test_execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ def task_example():
assert "Collect 1 task"


@pytest.mark.end_to_end()
def test_task_executed_with_force_although_unchanged(tmp_path):
tmp_path.joinpath("task_module.py").write_text("def task_example(): pass")
session = main({"paths": tmp_path})
Expand All @@ -407,3 +406,16 @@ def test_task_executed_with_force_although_unchanged_runner(runner, tmp_path):

assert result.exit_code == ExitCode.OK
assert "1 Succeeded"


@pytest.mark.end_to_end()
def test_task_is_not_reexecuted_when_modification_changed_file_not(runner, tmp_path):
tmp_path.joinpath("task_example.py").write_text("def task_example(): pass")
result = runner.invoke(cli, [tmp_path.as_posix()])
assert result.exit_code == ExitCode.OK
assert "1 Succeeded" in result.output

tmp_path.joinpath("task_example.py").write_text("def task_example(): pass")
result = runner.invoke(cli, [tmp_path.as_posix()])
assert result.exit_code == ExitCode.OK
assert "1 Skipped" in result.output
4 changes: 2 additions & 2 deletions tests/test_persist.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def task_dummy(depends_on, produces):
task_id = tmp_path.joinpath("task_module.py").as_posix() + "::task_dummy"
node_id = tmp_path.joinpath("out.txt").as_posix()

state = State[task_id, node_id].state
assert float(state) == tmp_path.joinpath("out.txt").stat().st_mtime
modification_time = State[task_id, node_id].modification_time
assert float(modification_time) == tmp_path.joinpath("out.txt").stat().st_mtime

session = main({"paths": tmp_path})

Expand Down