Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/fixes/#985-create-unique-ids-on-…
Browse files Browse the repository at this point in the history
…generated-tasks' into local
  • Loading branch information
nailend committed Oct 19, 2022
2 parents 353a31d + 8f5f6c4 commit c2dd547
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
2 changes: 1 addition & 1 deletion setup.cfg
Expand Up @@ -25,7 +25,7 @@ python_files =
tests.py
addopts =
-ra
--strict
--strict-markers
--ignore=docs/conf.py
--ignore=setup.py
--ignore=ci
Expand Down
2 changes: 1 addition & 1 deletion src/egon/data/datasets/__init__.py
Expand Up @@ -230,7 +230,7 @@ def __post_init__(self):
# Explicitly create single final task, because we can't know
# which of the multiple tasks finishes last.
name = prefix(self)
name = name if name else f"{self.name}."
name = f"{name if name else f'{self.__module__}.'}{self.name}."
update_version = PythonOperator(
task_id=f"{name}update-version",
# Do nothing, because updating will be added later.
Expand Down
45 changes: 45 additions & 0 deletions tests/test_dataset_class.py
@@ -0,0 +1,45 @@
from dataclasses import dataclass
from typing import Union

from airflow.models.dag import DAG

from egon.data.datasets import Dataset, TaskGraph, Tasks


def test_uniqueness_of_automatically_generated_final_dataset_task():
"""Test that the generated final dataset task is named uniquely.
This is a regression test for issue #985. Having multiple `Dataset`s ending
in parallel tasks doesn't work if those `Dataset`s are in a module below
the `egon.data.datasets` package. In that case the code removing the module
name prefix from task ids and the code generating the final dataset task
which updates the dataset version once all parallel tasks have finished
interact in a way that generates non-distinct task ids so that tasks
generated later clobber the ones generated earlier. This leads to spurious
cycles and other inconsistencies and bugs in the graph.
"""

noops = [(lambda: None) for _ in range(4)]
for i, noop in enumerate(noops):
noop.__name__ = f"noop-{i}"

@dataclass
class Dataset_1(Dataset):
name: str = "DS1"
version: str = "0.0.0"
tasks: Union[Tasks, TaskGraph] = ({noops[0], noops[1]},)

@dataclass
class Dataset_2(Dataset):
name: str = "DS2"
version: str = "0.0.0"
tasks: Union[Tasks, TaskGraph] = ({noops[2], noops[3]},)

Dataset_1.__module__ = "egon.data.datasets.test.datasets"
Dataset_2.__module__ = "egon.data.datasets.test.datasets"
with DAG(dag_id="Test-DAG", default_args={"start_date": "1111-11-11"}):
datasets = [Dataset_1(), Dataset_2()]
ids = [list(dataset.tasks)[-1] for dataset in datasets]
assert (
ids[0] != ids[1]
), "Expected unique names for final tasks of distinct datasets."

0 comments on commit c2dd547

Please sign in to comment.