Skip to content

Fixes #27148: Eliminate N+1 is_paused queries in AirflowSource.get_pipelines_list#27149

Open
RajdeepKushwaha5 wants to merge 2 commits intoopen-metadata:mainfrom
RajdeepKushwaha5:fix/airflow-n1-is-paused-get-pipelines-list
Open

Fixes #27148: Eliminate N+1 is_paused queries in AirflowSource.get_pipelines_list#27149
RajdeepKushwaha5 wants to merge 2 commits intoopen-metadata:mainfrom
RajdeepKushwaha5:fix/airflow-n1-is-paused-get-pipelines-list

Conversation

@RajdeepKushwaha5
Copy link
Copy Markdown
Contributor

Describe your changes:

Fixes #27148

AirflowSource.get_pipelines_list was issuing one extra SELECT is_paused FROM dag WHERE dag_id = ? query for every DAG returned by the paginated main query. With N DAGs, the ingestion run performed N+1 DB round-trips where 1 would suffice.

Root cause: The original main SELECT did not include DagModel.is_paused. A separate per-row sub-query was introduced as a workaround.

Fix:

Code path Before After
Airflow 2.x, includeUnDeployedPipelines=True SerializedDagModel queried alone + DagModel per row Added LEFT OUTER JOIN dag on DagModel + is_paused in SELECT
Airflow 2.x, includeUnDeployedPipelines=False select_from(join(SDM, DagModel)) for the filter + DagModel per row Same outerjoin now in base query; select_from(join(...)) removed (redundant)
Airflow 3.x, either setting DagModel already joined for fileloc; is_paused fetched per row DagModel.is_paused added to SELECT — no extra join needed

The inner try/except block that fired self.session.query(DagModel.is_paused).filter(...).scalar() per row is deleted. is_paused is now read from serialized_dag[3] (the new 4th column in all query paths).

The unused join import from sqlalchemy is removed.

How I tested:

  • Syntax verified with ast.parse
  • Manually traced all four query-construction branches ensuring DagModel is in the FROM clause via the appropriate join type before DagModel.is_paused is referenced in the filter

Type of change:

  • Bug fix

Checklist:

  • I have read the CONTRIBUTING document.
  • My PR title is Fixes <issue-number>: Eliminate N+1 is_paused queries in AirflowSource.get_pipelines_list
  • I have commented on my code, particularly in hard-to-understand areas.
  • For JSON Schema changes: I updated the migration scripts or explained why it is not needed.
  • I have added a test that covers the exact scenario we are fixing.

@RajdeepKushwaha5 RajdeepKushwaha5 requested a review from a team as a code owner April 8, 2026 01:05
Copilot AI review requested due to automatic review settings April 8, 2026 01:05
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 8, 2026

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Eliminates an N+1 query pattern in AirflowSource.get_pipelines_list() by selecting DagModel.is_paused as part of the main paginated query instead of issuing a per-DAG lookup.

Changes:

  • Airflow 2.x path: adds a LEFT OUTER JOIN to DagModel and selects DagModel.is_paused in the main query.
  • Airflow 3.x path: adds DagModel.is_paused to the selected columns (no additional join).
  • Removes the per-row DagModel.is_paused scalar query and drops the unused sqlalchemy.join import.

Comment thread ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py Outdated
Comment thread ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py Outdated
@RajdeepKushwaha5 RajdeepKushwaha5 force-pushed the fix/airflow-n1-is-paused-get-pipelines-list branch from 94ab23d to dea826d Compare April 8, 2026 01:15
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 8, 2026

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

Comment thread ingestion/tests/unit/topology/pipeline/test_airflow.py Outdated
Copilot AI review requested due to automatic review settings April 8, 2026 04:35
@RajdeepKushwaha5 RajdeepKushwaha5 force-pushed the fix/airflow-n1-is-paused-get-pipelines-list branch from dea826d to 8a5e277 Compare April 8, 2026 04:35
@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 8, 2026

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

Comment on lines +481 to +501
# Airflow 2.x: fileloc is on SerializedDagModel.
# Always LEFT OUTER JOIN DagModel so we can select is_paused in the
# main query and avoid an extra DB round-trip per DAG (N+1).
session_query = (
self.session.query(
SerializedDagModel.dag_id,
json_data_column,
SerializedDagModel.fileloc,
DagModel.is_paused,
)
.join(
latest_dag_subquery,
and_(
SerializedDagModel.dag_id == latest_dag_subquery.c.dag_id,
timestamp_column == latest_dag_subquery.c.max_timestamp,
),
)
.outerjoin(
DagModel,
SerializedDagModel.dag_id == DagModel.dag_id,
)
Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Airflow 2.x query now always uses a LEFT OUTER JOIN to DagModel, even when includeUnDeployedPipelines is false. Since you then filter on DagModel.is_paused, the outer join becomes effectively an inner join anyway; using an explicit inner join in the includeUnDeployedPipelines=false path would be simpler and may yield a better query plan.

Copilot uses AI. Check for mistakes.
Comment on lines +419 to +432
# Build a mock that chains through any SQLAlchemy query method and returns
# our fake rows on the first .all() call, then [] to stop pagination.
mock_q = MagicMock()
for method in ("join", "outerjoin", "filter", "order_by", "limit", "offset", "group_by"):
getattr(mock_q, method).return_value = mock_q
mock_q.subquery.return_value = MagicMock()
mock_q.all.side_effect = [
[active_row, inactive_row, null_row],
[],
]

mock_session = MagicMock()
mock_session.query.return_value = mock_q

Copy link

Copilot AI Apr 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test intends to ensure there is no per-DAG is_paused lookup, but it doesn't explicitly assert that no additional Session.query()/scalar() calls are made during row iteration. Consider adding assertions like mock_session.query.call_count == 2 (subquery + main query) and/or mock_q.scalar.assert_not_called() to make the regression protection explicit and more robust.

Copilot uses AI. Check for mistakes.
Fixes the Python Checkstyle CI failure by applying black's line-length
formatting to the mock method tuple. Also addresses Copilot review feedback
by adding explicit regression guards:
  - mock_q.scalar.assert_not_called() - proves no per-row scalar lookup
  - mock_session.query.call_count == 2 - proves only subquery + main query
@github-actions
Copy link
Copy Markdown
Contributor

Hi there 👋 Thanks for your contribution!

The OpenMetadata team will review the PR shortly! Once it has been labeled as safe to test, the CI workflows
will start executing and we'll be able to make sure everything is working as expected.

Let us know if you need any help!

@gitar-bot
Copy link
Copy Markdown

gitar-bot bot commented Apr 17, 2026

Code Review ✅ Approved 2 resolved / 2 findings

Eliminates N+1 is_paused queries in AirflowSource.get_pipelines_list by ensuring test compatibility with read-only properties and correctly handling undeployed DAGs during INNER JOIN operations.

✅ 2 resolved
Bug: Test sets read-only property; will raise AttributeError

📄 ingestion/tests/unit/topology/pipeline/test_airflow.py:433-438 📄 ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py:223-231
The new test test_get_pipelines_list_derives_state_from_row assigns to self.airflow.session (line 434, 438), but session is a read-only @property with no setter (metadata.py:223-231). This will raise AttributeError: can't set attribute at runtime, meaning the test always fails.

The test should set the backing _session attribute instead.

Edge Case: Airflow 3.x uses INNER JOIN; undeployed DAGs silently dropped

📄 ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py:520-523
In the Airflow 3.x branch (line 520-523), DagModel is joined with an INNER JOIN (.join()). If includeUnDeployedPipelines=True is set, DAGs present in serialized_dag but not yet in the dag table would be silently excluded. The Airflow 2.x branch correctly uses .outerjoin() for this scenario.

This is a pre-existing behavior (the join was already there for fileloc), but now that is_paused also depends on it, it's worth noting. Consider switching to .outerjoin() for consistency with the 2.x path.

Options

Display: compact → Showing less information.

Comment with these commands to change:

Compact
gitar display:verbose         

Was this helpful? React with 👍 / 👎 | Gitar

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Airflow ingestion fires N extra DB queries (one per DAG) to fetch is_paused inside get_pipelines_list

2 participants