refactor: select in rag_pipeline#34495
Conversation
Pyrefly DiffNo changes detected. |
Pyrefly DiffNo changes detected. |
There was a problem hiding this comment.
Pull request overview
This PR migrates ORM usage in the RAG pipeline service from SQLAlchemy 1.x Query patterns to SQLAlchemy 2.x select()/Session.scalar()/Session.scalars() patterns to improve typing and modernize query style (part of #22668).
Changes:
- Refactor multiple
db.session.query(...)call sites inrag_pipeline.pytoselect()-style statements, includingget()for PK lookups andselect(func.count(...))for counts. - Replace
.first()and.count()patterns withdb.session.scalar(...)anddb.session.scalars(...).all()equivalents. - Update unit test mocks in
test_rag_pipeline_service.pyto match the new session APIs.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| api/services/rag_pipeline/rag_pipeline.py | Replaces legacy Query usage with SQLAlchemy 2.x select()/scalar()/scalars() and session.get() patterns in the RAG pipeline service. |
| api/tests/unit_tests/services/rag_pipeline/test_rag_pipeline_service.py | Adjusts mocks/patches to align with the refactored scalar() / get() / scalars() access patterns. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -1656,8 +1602,7 @@ def _event_generator(): | |||
|
|
|||
| document = SimpleNamespace(indexing_status="waiting", error=None) | |||
| query = mocker.Mock() | |||
There was a problem hiding this comment.
Unused query mock left over after switching to db.session.get(...); remove it to avoid confusing test setup and to keep mocks minimal.
| query = mocker.Mock() |
|
|
||
|
|
||
| def test_get_pipeline_raises_when_dataset_missing(mocker, rag_pipeline_service) -> None: | ||
| query = mocker.Mock() |
There was a problem hiding this comment.
query = mocker.Mock() is now unused after patching db.session.scalar; remove the unused mock to keep the test focused and reduce noise.
| @@ -1742,8 +1682,7 @@ def test_get_pipeline_raises_when_dataset_missing(mocker, rag_pipeline_service) | |||
| def test_get_pipeline_raises_when_pipeline_missing(mocker, rag_pipeline_service) -> None: | |||
| dataset = SimpleNamespace(pipeline_id="p1") | |||
| query = mocker.Mock() | |||
There was a problem hiding this comment.
query = mocker.Mock() is unused in this test after migrating away from session.query(...); it can be deleted.
| @@ -1783,8 +1722,7 @@ def test_get_pipeline_templates_builtin_en_us_no_fallback(mocker) -> None: | |||
| def test_update_customized_pipeline_template_commits_when_name_empty(mocker) -> None: | |||
| template = SimpleNamespace(name="old", description="old", icon={}, updated_by=None) | |||
| query = mocker.Mock() | |||
There was a problem hiding this comment.
Unused query mock: the test patches db.session.scalar directly now, so this variable can be removed.
| @@ -2011,8 +1949,7 @@ def test_run_free_workflow_node_delegates_to_handle_result(mocker, rag_pipeline_ | |||
| def test_publish_customized_pipeline_template_raises_when_workflow_missing(mocker, rag_pipeline_service) -> None: | |||
| pipeline = SimpleNamespace(id="p1", tenant_id="t1", workflow_id="wf-1") | |||
| query = mocker.Mock() | |||
There was a problem hiding this comment.
query = mocker.Mock() is unused after switching this test to patch db.session.get; remove it to avoid dead code in the test.
| @@ -2069,8 +2001,8 @@ def test_retry_error_document_raises_when_workflow_missing(mocker, rag_pipeline_ | |||
| exec_log = SimpleNamespace(pipeline_id="p1") | |||
| pipeline = SimpleNamespace(id="p1") | |||
| query = mocker.Mock() | |||
There was a problem hiding this comment.
Unused query mock in this test after refactor to db.session.scalar/get; please remove it.
| @@ -2086,8 +2018,7 @@ def test_get_datasource_plugins_returns_empty_for_non_datasource_nodes(mocker, r | |||
| graph_dict={"nodes": [{"id": "n1", "data": {"type": "start"}}]}, rag_pipeline_variables=[] | |||
| ) | |||
| query = mocker.Mock() | |||
There was a problem hiding this comment.
query = mocker.Mock() is unused here now that the test patches db.session.scalar directly; remove to reduce unnecessary setup.
| query = mocker.Mock() |
| @@ -2250,8 +2181,7 @@ def test_get_datasource_plugins_handles_empty_datasource_data_and_non_published( | |||
| rag_pipeline_variables=[{"variable": "v1", "belong_to_node_id": "shared"}], | |||
| ) | |||
| query = mocker.Mock() | |||
There was a problem hiding this comment.
Unused query mock remains after switching this test to patch db.session.scalar; remove it to keep the test setup minimal.
| @@ -2291,8 +2221,7 @@ def test_get_datasource_plugins_extracts_user_inputs_and_credentials(mocker, rag | |||
| ], | |||
| ) | |||
| query = mocker.Mock() | |||
There was a problem hiding this comment.
Unused query mock: this test now patches db.session.scalar directly, so the query variable can be deleted.
| @@ -2310,8 +2239,7 @@ def test_get_pipeline_returns_pipeline_when_found(mocker, rag_pipeline_service) | |||
| dataset = SimpleNamespace(pipeline_id="p1") | |||
| pipeline = SimpleNamespace(id="p1") | |||
| query = mocker.Mock() | |||
There was a problem hiding this comment.
Unused query mock in this test after migrating to db.session.scalar mocking; please remove it.
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Summary
db.session.query()calls to SQLAlchemy 2.xselect()style inrag_pipeline/rag_pipeline.py.where().first()withdb.session.scalar(select().where().limit(1))session.get()for primary key lookups onPipeline,Workflow, andDocument.count()withdb.session.scalar(select(func.count())).query(func.max()).scalar()withdb.session.scalar(select(func.max())).query().all()withdb.session.scalars(stmt).all()test_rag_pipeline_service.pyto match new patterns (unavoidable)Test plan
Part of #22668