Skip to content

Commit

Permalink
Merge unique ids for generated tasks into dev
Browse files Browse the repository at this point in the history
Fixes #985.
  • Loading branch information
gnn committed Jan 24, 2023
2 parents 0f6fd2e + eafaea3 commit 5f0f521
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -677,9 +677,13 @@ Bug Fixes
* Fix aggregation of DSM-components
`#1058 <https://github.com/openego/eGon-data/issues/1069>`_
* Fix URL of TYNDP scenario dataset
* Automatically generated tasks now get unique :code:`task_id`\s.
Fixes issue `#985`_ via PR `#986`_.

.. _PR #692: https://github.com/openego/eGon-data/pull/692
.. _#343: https://github.com/openego/eGon-data/issues/343
.. _#556: https://github.com/openego/eGon-data/issues/556
.. _#641: https://github.com/openego/eGon-data/issues/641
.. _#669: https://github.com/openego/eGon-data/issues/669
.. _#985: https://github.com/openego/eGon-data/issues/985
.. _#986: https://github.com/openego/eGon-data/pull/986
2 changes: 1 addition & 1 deletion setup.cfg
Expand Up @@ -25,7 +25,7 @@ python_files =
tests.py
addopts =
-ra
--strict
--strict-markers
--ignore=docs/conf.py
--ignore=setup.py
--ignore=ci
Expand Down
2 changes: 1 addition & 1 deletion src/egon/data/datasets/__init__.py
Expand Up @@ -230,7 +230,7 @@ def __post_init__(self):
# Explicitly create single final task, because we can't know
# which of the multiple tasks finishes last.
name = prefix(self)
name = name if name else f"{self.name}."
name = f"{name if name else f'{self.__module__}.'}{self.name}."
update_version = PythonOperator(
task_id=f"{name}update-version",
# Do nothing, because updating will be added later.
Expand Down
45 changes: 45 additions & 0 deletions tests/test_dataset_class.py
@@ -0,0 +1,45 @@
from dataclasses import dataclass
from typing import Union

from airflow.models.dag import DAG

from egon.data.datasets import Dataset, TaskGraph, Tasks


def test_uniqueness_of_automatically_generated_final_dataset_task():
"""Test that the generated final dataset task is named uniquely.
This is a regression test for issue #985. Having multiple `Dataset`s ending
in parallel tasks doesn't work if those `Dataset`s are in a module below
the `egon.data.datasets` package. In that case the code removing the module
name prefix from task ids and the code generating the final dataset task
which updates the dataset version once all parallel tasks have finished
interact in a way that generates non-distinct task ids so that tasks
generated later clobber the ones generated earlier. This leads to spurious
cycles and other inconsistencies and bugs in the graph.
"""

noops = [(lambda: None) for _ in range(4)]
for i, noop in enumerate(noops):
noop.__name__ = f"noop-{i}"

@dataclass
class Dataset_1(Dataset):
name: str = "DS1"
version: str = "0.0.0"
tasks: Union[Tasks, TaskGraph] = ({noops[0], noops[1]},)

@dataclass
class Dataset_2(Dataset):
name: str = "DS2"
version: str = "0.0.0"
tasks: Union[Tasks, TaskGraph] = ({noops[2], noops[3]},)

Dataset_1.__module__ = "egon.data.datasets.test.datasets"
Dataset_2.__module__ = "egon.data.datasets.test.datasets"
with DAG(dag_id="Test-DAG", default_args={"start_date": "1111-11-11"}):
datasets = [Dataset_1(), Dataset_2()]
ids = [list(dataset.tasks)[-1] for dataset in datasets]
assert (
ids[0] != ids[1]
), "Expected unique names for final tasks of distinct datasets."

0 comments on commit 5f0f521

Please sign in to comment.