fix(ingestion): release Engine resources on database switch (#27625)#27627
Conversation
* fix(ingestion): release engine resources on database switch

  Close fairies in `_connection_map` and clear `_inspector_map` before `engine.dispose()` in `CommonDbSourceService.set_inspector`/`close`. Dispose alone does not free `Inspector.info_cache` or release checked-out `ConnectionFairies`, leaving the old engine GC-pinned across DB switches and triggering `_finalize_fairy` `RecursionError` at interpreter shutdown.

  Eagerly fetch multi-DB name queries (`MultiDBSource._execute_database_query` and `SnowflakeSource.get_database_names_raw`) so the cursor closes before the caller invokes `set_inspector`, which disposes the engine the cursor was bound to. Also rebind `scoped_session` to the new engine so it doesn't keep the disposed one alive via `sessionmaker.bind`.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* py format

* fix(ingestion): address PR review feedback from gitar-bot and Copilot

  - Set `self.engine = None` after dispose in `_release_engine` (gitar-bot): prevents `close()` from leaving a dangling disposed-engine reference that would produce a confusing pool error on accidental later access.
  - `_FakeSource` now has `close()` and is wrapped in a fixture that cleans up its checked-out connection (Copilot #1): avoids resource warnings and an interfering fairy across test teardown.
  - Rewrite `test_generator_survives_engine_dispose_mid_iteration` as `test_generator_survives_connection_close_mid_iteration` (Copilot #2): `Engine.dispose()` does not close checked-out connections, so the old test did not reproduce what `_release_engine` actually does. The real regression is the explicit `conn.close()` on the fairy in `_connection_map` before dispose. The new test closes the connection mid-iteration, which is what `fetchall()` needs to survive.
  - Switch the query in `_FakeSource.get_database_names_raw` and the seeded INSERT assertions to the TEXT `name` column (Copilot #3): `_execute_database_query` is typed `Iterable[str]`; testing on integer ids obscured the actual contract.
  - Update `test_disposes_pool` to assert `surrogate.engine is None` after release (follows from the new `self.engine = None` behavior) and verify the original pool's `checkedout()` is 0.

  Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
```python
except Exception as exc:  # pylint: disable=broad-except
    logger.warning(f"Failed to dispose engine: {exc}")
```
💡 Quality: _release_engine silently swallows dispose failure details
In _release_engine, the engine.dispose() failure is logged at warning level with only the exception message (f"Failed to dispose engine: {exc}"), while all other failures in the method use logger.debug(…, exc_info=True) to capture the full traceback. For consistency and debuggability, consider adding exc_info=True to the dispose warning as well, since a dispose failure is the most important one to diagnose.
Suggested fix:
```python
except Exception as exc:  # pylint: disable=broad-except
    logger.warning(f"Failed to dispose engine: {exc}", exc_info=True)
```
Pull request overview
This PR addresses SQLAlchemy resource leaks during multi-database ingestion by explicitly releasing checked-out connections/inspectors and rebinding sessions when switching databases, and by avoiding streaming DB-name cursors across engine disposal.
Changes:
- Replace `kill_active_connections()` usage with a new `CommonDbSourceService._release_engine()` that closes all tracked connections, clears inspector/session state, and disposes the engine on database switch/close.
- Make DB-name queries eager by calling `.fetchall()` in `MultiDBSource._execute_database_query()` and `SnowflakeSource.get_database_names_raw()` to avoid invalidated cursors during engine disposal.
- Add unit/acceptance tests covering engine release behavior, GC eligibility, and eager-fetch semantics.
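The eager-fetch change can be illustrated with a stdlib-only sketch (sqlite3 standing in for the real engine; `get_database_names_eager` is a hypothetical helper, not the actual OpenMetadata API): buffering with `.fetchall()` finishes the cursor up front, so the generator keeps yielding even if the underlying connection is closed mid-iteration.

```python
import sqlite3

def get_database_names_eager(conn):
    # Hypothetical stand-in for the eager pattern used in
    # MultiDBSource._execute_database_query: buffer all rows with
    # fetchall() so the cursor is finished before the caller can
    # dispose the engine the cursor was bound to.
    rows = conn.execute("SELECT name FROM dbs ORDER BY name").fetchall()
    for (name,) in rows:
        yield name

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dbs (name TEXT)")
conn.executemany("INSERT INTO dbs VALUES (?)", [("analytics",), ("billing",), ("core",)])

gen = get_database_names_eager(conn)
first = next(gen)   # start iterating...
conn.close()        # ...then simulate the engine being disposed mid-iteration
rest = list(gen)    # survives: rows were already buffered by fetchall()
result = [first] + rest
print(result)
```

A lazy variant that iterated the cursor directly would raise once `conn.close()` ran; the DB-name list is a small finite set, so buffering it is cheap.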
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| ingestion/src/metadata/ingestion/source/database/common_db_source.py | Adds _release_engine() and uses it from set_inspector()/close(); rebinds session on engine switch. |
| ingestion/src/metadata/ingestion/source/database/multi_db_source.py | Eagerly buffers DB-name query results via .fetchall(). |
| ingestion/src/metadata/ingestion/source/database/snowflake/metadata.py | Uses .fetchall() for Snowflake DB-name enumeration to prevent cursor invalidation. |
| ingestion/tests/unit/topology/database/test_common_db_source.py | Adds tests for _release_engine() cleanup, idempotency, and GC eligibility + eager-fetch generator safety. |
| ingestion/tests/unit/topology/database/test_snowflake.py | Adds tests asserting Snowflake DB-name enumeration calls .fetchall() once and yields in order. |
```python
self._release_engine()
logger.info(f"Ingesting from database: {database_name}")

new_service_connection = deepcopy(self.service_connection)
new_service_connection.database = database_name
self.engine = get_connection(new_service_connection)
self.session = create_and_bind_thread_safe_session(self.engine)

self._connection_map = {}  # Lazy init as well

def _release_engine(self) -> None:
    # Close fairies first so _ConnectionRecord drops its pool reference;
    # dispose alone leaves them orphaned and causes _finalize_fairy
    # RecursionErrors at GC time. Clearing _inspector_map is what
    # actually frees Inspector.info_cache — dispose() does not.
    if getattr(self, "engine", None) is None:
        return
    for conn in self._connection_map.values():
        try:
            conn.close()
        except Exception:  # pylint: disable=broad-except
            logger.debug("Connection already closed", exc_info=True)
    self._connection_map = {}
    self._inspector_map = {}
    session = getattr(self, "session", None)
    if session is not None:
        try:
            session.remove()
        except Exception:  # pylint: disable=broad-except
            logger.debug("Session cleanup failed", exc_info=True)
        self.session = None
    try:
        self.engine.dispose()
    except Exception as exc:  # pylint: disable=broad-except
        logger.warning(f"Failed to dispose engine: {exc}")
    self.engine = None
```
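The `engine is None` guard at the top makes the release idempotent. A minimal stdlib surrogate (a hypothetical `_FakeSource`, not the production class; sqlite3 connections stand in for the engine and its fairies) shows that calling the release twice is safe and leaves no dangling references:

```python
import sqlite3

class _FakeSource:
    """Hypothetical surrogate for the release logic above; not the real class."""

    def __init__(self):
        self.engine = sqlite3.connect(":memory:")  # stands in for the Engine
        self._connection_map = {"thread-1": sqlite3.connect(":memory:")}
        self._inspector_map = {"thread-1": object()}

    def _release_engine(self):
        if self.engine is None:  # the guard that makes repeated calls a no-op
            return
        for conn in self._connection_map.values():
            try:
                conn.close()
            except Exception:
                pass  # tolerate already-closed connections
        self._connection_map = {}
        self._inspector_map = {}
        self.engine.close()  # sqlite3 spells dispose() as close()
        self.engine = None

src = _FakeSource()
src._release_engine()
src._release_engine()  # second call must not raise
print(src.engine is None, src._connection_map == {}, src._inspector_map == {})
```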
_release_engine() disposes and nulls self.engine, but CommonDbSourceService.__init__ stores the initial engine in self.connection_obj and it is never updated/cleared on database switches. That strong reference can keep the old Engine (and its pool/Inspector caches) alive even after _release_engine(), undermining the intended leak fix. Consider clearing self.connection_obj in _release_engine() (or before/after dispose) and rebinding it to the new engine in set_inspector() to keep it consistent with self.engine.
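The fix this comment asks for reduces to keeping the two attributes in lockstep. A compressed sketch (the attribute names `engine` and `connection_obj` come from the PR; the class body is otherwise hypothetical): rebind both on switch, clear both on release, so neither attribute outlives the disposed engine.

```python
class Source:
    def __init__(self, engine):
        self.engine = engine
        self.connection_obj = engine  # set once in __init__; must track self.engine

    def set_inspector(self, new_engine):
        self._release_engine()
        self.engine = new_engine
        self.connection_obj = new_engine  # rebind together with engine

    def _release_engine(self):
        if self.engine is None:
            return
        self.engine = None
        self.connection_obj = None  # drop the last strong reference too

old_engine, new_engine = object(), object()
src = Source(old_engine)
src.set_inspector(new_engine)
assert src.connection_obj is src.engine is new_engine  # in sync after switch
src._release_engine()
assert src.connection_obj is None and src.engine is None  # nothing dangling
```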
🟡 Playwright Results — all passed: ✅ 3692 passed · ❌ 0 failed · 🟡 19 flaky (passed on retry) · ⏭️ 89 skipped.

How to debug locally:

```shell
# Download the playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip  # view the trace
```
fix(ingestion): keep connection_obj in sync with engine across DB switches

`self.connection_obj` is set once in `__init__` to the initial engine and never updated. After `set_inspector` rebuilds `self.engine`, `connection_obj` still points at the disposed original engine — pinning its dialect and `compiled_cache` alive for the source's lifetime. Rebind `connection_obj` when creating the new engine in `set_inspector`, and clear it in `_release_engine` so `close()` leaves nothing dangling.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Code Review 👍 Approved with suggestions — 0 resolved / 1 finding.

Releases Engine resources during database switches to prevent memory leaks. Consider logging the underlying exception in `_release_engine` instead of swallowing dispose failures.

💡 Quality: `_release_engine` silently swallows dispose failure details
📄 ingestion/src/metadata/ingestion/source/database/common_db_source.py:185-186



Summary
`CommonDbSourceService.set_inspector`/`close` now close every checked-out connection in `_connection_map` and clear `_inspector_map` before disposing the engine. `engine.dispose()` alone does not release `ConnectionFairy` objects or free `Inspector.info_cache` — prior code dropped the map refs without closing, leaving the old engine pinned via `_ConnectionRecord` → pool → engine. Across many databases this leaks tens of MB per switch (large `info_cache` of reflection results), and at interpreter shutdown the orphaned fairies trigger `RecursionError` in `_finalize_fairy` returning to a disposed pool's Condition lock.

`MultiDBSource._execute_database_query` and `SnowflakeSource.get_database_names_raw` now eagerly `.fetchall()` instead of streaming. Lazy iteration kept a cursor alive across `set_inspector` calls; since `set_inspector` disposes the engine the cursor is bound to, the cursor would have been invalidated when we started closing connections. Fetching the DB-name list up front (a small finite set) closes the cursor before any close can happen.

The `scoped_session` is rebound to the new engine on each `set_inspector` — otherwise `sessionmaker.bind` retains the disposed engine.

Test plan
- `ingestion/tests/unit/topology/database/test_common_db_source.py` and `test_snowflake.py` — 12 tests, all pass (39 total in the two files, 0 regressions).
- `test_old_engine_becomes_gc_eligible_after_release` creates a real SQLAlchemy engine, stashes a fairy in `_connection_map`, takes a `weakref`, calls `_release_engine`, drops strong refs, runs `gc.collect()`, and asserts the weakref is dead. This fails against the prior `kill_active_connections`-only code path and passes with the fix — the direct regression guard.
- `_release_engine` covered against a real in-memory SQLite engine: closes every map entry (including entries keyed by arbitrary worker thread ids), clears the inspector map, removes the session, disposes the pool, is idempotent when `engine is None`, and tolerates already-closed connections.
- `.fetchall()` behavior validated: `test_generator_survives_engine_dispose_mid_iteration` advances the generator, disposes the engine, and confirms the remaining rows still yield — i.e. the cursor is no longer live at the point `set_inspector` disposes the engine.
- Snowflake DB-name enumeration tested by mocking `self.connection` and asserting `.fetchall()` is called exactly once, with results yielded in order.
- The `Exception ignored in: <function _ConnectionRecord.checkout.<locals>.<lambda>>` stderr warning at shutdown no longer appears.

Related symptom
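The GC-eligibility regression guard described above follows a standard weakref pattern. A stdlib-only sketch with surrogate objects (hypothetical `FakeEngine`/`FakeSource`, no real SQLAlchemy engine) shows the shape of the check:

```python
import gc
import weakref

class FakeEngine:
    """Surrogate engine; only its identity matters for the weakref check."""

class FakeSource:
    def __init__(self):
        self.engine = FakeEngine()
        # A tracked connection pins the engine the way a checked-out fairy would.
        self._connection_map = {"worker-1": self.engine}

    def _release_engine(self):
        self._connection_map = {}
        self.engine = None

src = FakeSource()
ref = weakref.ref(src.engine)
src._release_engine()  # drop every strong reference the source held
gc.collect()
print(ref() is None)   # the old engine is now GC-eligible
```

If `_release_engine` forgot to clear either attribute, `ref()` would still return the engine after collection, which is exactly how the test catches the leak.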
A production ingestion against a 39-database Snowflake account exhibited container memory growing from ~1 GiB to ~4 GiB (pod memory limit, OOMKill territory) across a single run, followed by
`RecursionError: maximum recursion depth exceeded` at interpreter shutdown in `sqlalchemy/pool/base.py:_finalize_fairy` and `snowflake/connector/vendored/urllib3/connectionpool.py:_close_pool_connections`. Both tracebacks bottom out in `threading.Condition`/`RLock` acquisition — the signature of weakref finalizers firing on pools that had been disposed while their fairies were still in flight. This PR eliminates the SQLAlchemy-side path by closing fairies explicitly before dispose.

🤖 Generated with Claude Code