
Error using airflow x ODD: insert or update on table "data_entity_task_run" violates foreign key constraint "data_entity_task_run_task_oddrn_fk" #8

Closed
thiagopirex opened this issue Aug 2, 2023 · 4 comments

@thiagopirex

I configured my Airflow to use the ODD Platform, setting up the "odd http" connection with the URL, port, and password (taken from the collector token).

After running the DAG, the ODD Platform raises some errors in the log: odd_plataform.log

DAG code: dag_example.txt

Airflow version: 2.6.3

@RamanDamayeu

Hi! Unfortunately, the problem is that the task entities themselves are created when the on_dag_run_running hook is called: at the moment the DAG is launched, information about its tasks is also sent to the ODD platform. However, information is ingested only for tasks whose "inlets" or "outlets" attribute is non-empty (these are old Airflow concepts for lineage tracking, and that concept is reused here).

The inconsistency arises because this check for the presence of inlets or outlets is not repeated when the tasks start (i.e. when the task run entry is created in ODD). In other words, we never created the task, but we try to create a run for it, which violates the foreign key constraint.
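
To make the described inconsistency concrete, here is a plain-Python simulation (the class and method names below are illustrative, not the actual odd-airflow internals):

```python
# Illustrative sketch: the platform only registers a task entity when
# inlets/outlets are non-empty, but registers a task run unconditionally,
# hence the foreign-key violation.

class ForeignKeyViolation(Exception):
    pass

class FakeOddPlatform:
    def __init__(self):
        self.tasks = set()      # registered task ODDRNs
        self.task_runs = []     # (task_oddrn, run_id) pairs

    def on_dag_run_running(self, tasks):
        # Mirrors the described behaviour: only tasks with non-empty
        # lineage attributes are ingested as task entities.
        for task in tasks:
            if task.get("inlets") or task.get("outlets"):
                self.tasks.add(task["oddrn"])

    def on_task_run_start(self, task_oddrn, run_id):
        # No inlets/outlets check here, so a run may reference a task
        # that was never created.
        if task_oddrn not in self.tasks:
            raise ForeignKeyViolation(
                f"data_entity_task_run_task_oddrn_fk: unknown task {task_oddrn}"
            )
        self.task_runs.append((task_oddrn, run_id))

platform = FakeOddPlatform()
platform.on_dag_run_running([{"oddrn": "//airflow/dag/t1", "inlets": []}])
try:
    platform.on_task_run_start("//airflow/dag/t1", "run-1")
except ForeignKeyViolation as e:
    print("FK violation reproduced:", e)
```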

In general, we need to change this logic a bit, and we will. In the meantime, a workaround is to give the task an inlets or outlets list containing an empty string:

    test_task = BashOperator(
        task_id="test_task",
        bash_command=command,
        inlets=[''],
    )

By the way, inlets in this context are designed to list the ODDRNs of Datasets that are considered inputs for the task, and outlets, in turn, are its outputs. That way we can build lineage for the task. At the moment there is no automation to create inlets and outlets, so we have to specify them manually for each task. Of course, these attributes are templated fields, so we could utilize the full power of templating.
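
As an illustration of that input/output relationship, a plain-Python sketch can show how per-task inlets/outlets would translate into lineage edges (the ODDRNs and the lineage_edges helper are made up for this example and are not part of ODD's API):

```python
# Hypothetical task specs: each task lists input and output dataset ODDRNs.
tasks = {
    "extract": {
        "inlets": [],
        "outlets": ["//postgresql/host/db/schemas/public/tables/raw"],
    },
    "transform": {
        "inlets": ["//postgresql/host/db/schemas/public/tables/raw"],
        "outlets": ["//postgresql/host/db/schemas/public/tables/clean"],
    },
}

def lineage_edges(tasks):
    """Yield (input_dataset, task, output_dataset) triples; None marks a missing side."""
    for name, spec in tasks.items():
        for inlet in spec["inlets"] or [None]:
            for outlet in spec["outlets"] or [None]:
                yield (inlet, name, outlet)

for edge in lineage_edges(tasks):
    print(edge)
```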

We'll also add more information about inlets and outlets to the readme file to reduce confusion!

@RamanDamayeu

#10

@RamanDamayeu

@ValeriyWorld could I please ask you to check whether inlets/outlets are in the list of template_fields? I suspect that they are no longer templated by default...

@ValeriyWorld

@RamanDamayeu
As of now (tested with both the older apache-airflow==2.5.3 and the newer apache-airflow==2.7.3), inlets and outlets are not templated by default. Moreover, if we try to inherit from BaseOperator and create a custom operator with inlets/outlets included in template_fields, it does not work.
For example, I created a simple custom operator:

class CustomPythonOperator(BaseOperator):
    # Attempt to make the lineage attributes templated; this is what fails.
    template_fields = ("inlets", "outlets")

    def execute(self, context):
        message = "Hello World!"
        print(message)
        return message

Afterwards, I tried to create a task using CustomPythonOperator, but in the Airflow UI I got an error:

Broken DAG: [/opt/airflow/dags/test_dags/test_issue.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 829, in serialize_operator
    return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 860, in _serialize_node
    raise AirflowException(f"Cannot template BaseOperator fields: {template_field}")
airflow.exceptions.AirflowException: Cannot template BaseOperator fields: inlets

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1401, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 1316, in serialize_dag
    raise SerializationError(f"Failed to serialize DAG {dag.dag_id!r}: {e}")
airflow.exceptions.SerializationError: Failed to serialize DAG 'test_issue': Cannot template BaseOperator fields: inlets

The conclusion: inlets and outlets cannot be included in template_fields.

I guess that certain fields, including inlets and outlets, cannot be templated directly in Apache Airflow because inlets and outlets are special BaseOperator attributes that are processed during task instantiation, i.e. before the templating (render) phase that runs just prior to execution.
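
A simplified sketch of that serialization-time check (illustrative plain Python, not Airflow's actual code; BASE_OPERATOR_FIELDS below is a made-up subset):

```python
# Rough simulation of the validation that produces the error above:
# fields owned by BaseOperator itself are rejected from template_fields.
BASE_OPERATOR_FIELDS = {"inlets", "outlets", "task_id", "owner"}  # illustrative subset

class AirflowException(Exception):
    pass

def check_template_fields(template_fields):
    for field in template_fields:
        # BaseOperator-level fields are consumed before the render phase,
        # so declaring them templated is rejected at serialization time.
        if field in BASE_OPERATOR_FIELDS:
            raise AirflowException(f"Cannot template BaseOperator fields: {field}")

check_template_fields(("bash_command",))   # operator-level field: accepted
try:
    check_template_fields(("inlets", "outlets"))
except AirflowException as e:
    print(e)   # Cannot template BaseOperator fields: inlets
```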
