From 1fac5bdeb5d7ba76cc29b0530f46ea0436067677 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 11 Apr 2023 18:54:35 +0300 Subject: [PATCH 1/3] raise exception if aquire session from stopped pool --- tests/aio/test_table.py | 12 ++++++++++++ tests/table/table_test.py | 10 ++++++++++ ydb/_sp_impl.py | 4 ++++ ydb/aio/table.py | 2 +- 4 files changed, 27 insertions(+), 1 deletion(-) create mode 100644 tests/aio/test_table.py diff --git a/tests/aio/test_table.py b/tests/aio/test_table.py new file mode 100644 index 00000000..67d19dd9 --- /dev/null +++ b/tests/aio/test_table.py @@ -0,0 +1,12 @@ +import pytest +import ydb.aio + + +@pytest.mark.asyncio +class TestSessionPool: + async def test_checkout_from_stopped_pool(self, driver): + pool = ydb.aio.SessionPool(driver, 1) + await pool.stop() + + with pytest.raises(ValueError): + await pool.acquire() diff --git a/tests/table/table_test.py b/tests/table/table_test.py index 97bdcc88..9f81a37a 100644 --- a/tests/table/table_test.py +++ b/tests/table/table_test.py @@ -1,6 +1,16 @@ +import pytest import ydb +class TestSessionPool: + def test_checkout_from_stopped_pool(self, driver_sync): + pool = ydb.SessionPool(driver_sync, 1) + pool.stop() + + with pytest.raises(ValueError): + pool.acquire() + + class TestTable: def test_create_table_with_not_null_primary_key_by_api(self, driver_sync, database): table_path = database + "/test_table" diff --git a/ydb/_sp_impl.py b/ydb/_sp_impl.py index 6c305b80..455f1d07 100644 --- a/ydb/_sp_impl.py +++ b/ydb/_sp_impl.py @@ -335,6 +335,10 @@ def _on_keep_alive(self, session, f): self._destroy(session, "keep-alive-error") def acquire(self, blocking=True, timeout=None): + if self._should_stop.is_set(): + self._logger.debug("Acquired not inited session") + raise ValueError("Take session from closed session pool.") + waiter = self.subscribe() has_result = False if blocking: diff --git a/ydb/aio/table.py b/ydb/aio/table.py index 01786db9..eed02020 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -342,7 +342,7 @@ async def acquire(self, timeout: float = None, retry_timeout: float = None, retr if self._should_stop.is_set(): self._logger.debug("Acquired not inited session") - return self._create() + raise ValueError("Take session from closed session pool.") if retry_timeout is None: retry_timeout = timeout From f1e382610cdcb46bb4f6a1be7dc56208c240cfd4 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 13 Apr 2023 19:00:11 +0300 Subject: [PATCH 2/3] Fix tests --- tests/aio/test_session_pool.py | 16 +++++++------- tests/session_pool.py | 40 ++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 8 deletions(-) create mode 100644 tests/session_pool.py diff --git a/tests/aio/test_session_pool.py b/tests/aio/test_session_pool.py index 0fa3d8f5..c2875ba3 100644 --- a/tests/aio/test_session_pool.py +++ b/tests/aio/test_session_pool.py @@ -62,14 +62,14 @@ async def test_close_basic_logic_case_1(driver): waiter = asyncio.ensure_future(pool.acquire()) await pool.stop() - waiter_sess = waiter.result() - assert not waiter_sess.initialized() - after_stop = await pool.acquire() - assert not after_stop.initialized() + + with pytest.raises(ValueError): + waiter.result() + + with pytest.raises(ValueError): + await pool.acquire() await pool.release(s) - await pool.release(after_stop) - await pool.release(waiter_sess) assert pool._active_count == 0 @@ -106,9 +106,9 @@ async def test_close_basic_logic_case_2(driver): assert pool._active_count == 0 - sess = await pool.acquire() + with pytest.raises(ValueError): + await pool.acquire() - assert not sess.initialized() await pool.stop() diff --git a/tests/session_pool.py b/tests/session_pool.py new file mode 100644 index 00000000..4398b803 --- /dev/null +++ b/tests/session_pool.py @@ -0,0 +1,40 @@ +import pytest + +import ydb + + +def test_close_basic_logic_case_1(driver_sync): + pool = ydb.SessionPool(driver_sync, 1) + s = pool.acquire() + + pool.stop() + + with pytest.raises(ValueError): + pool.acquire() + + pool.release(s) + assert pool._pool_impl._active_count == 0 + + +def test_close_basic_logic_case_2(driver_sync): + pool = ydb.SessionPool(driver_sync, 10) + acquired = [] + + for _ in range(10): + acquired.append(pool.acquire()) + + for _ in range(3): + pool.release(acquired.pop(-1)) + + pool.stop() + assert pool._pool_impl._active_count == 7 + + while acquired: + pool.release(acquired.pop(-1)) + + assert pool._pool_impl._active_count == 0 + + with pytest.raises(ValueError): + pool.acquire() + + pool.stop() From 4216727b3dbb6979e4fa2dc1e234d604045dea58 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 14 Apr 2023 18:02:56 +0300 Subject: [PATCH 3/3] fix log message --- ydb/_sp_impl.py | 2 +- ydb/aio/table.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/ydb/_sp_impl.py b/ydb/_sp_impl.py index 455f1d07..dfea89f9 100644 --- a/ydb/_sp_impl.py +++ b/ydb/_sp_impl.py @@ -336,7 +336,7 @@ def _on_keep_alive(self, session, f): def acquire(self, blocking=True, timeout=None): if self._should_stop.is_set(): - self._logger.debug("Acquired not inited session") + self._logger.error("Take session from closed session pool") raise ValueError("Take session from closed session pool.") waiter = self.subscribe() diff --git a/ydb/aio/table.py b/ydb/aio/table.py index eed02020..2a33cf78 100644 --- a/ydb/aio/table.py +++ b/ydb/aio/table.py @@ -341,7 +341,7 @@ async def _get_session_from_queue(self, timeout: float): async def acquire(self, timeout: float = None, retry_timeout: float = None, retry_num: int = None) -> ydb.ISession: if self._should_stop.is_set(): - self._logger.debug("Acquired not inited session") + self._logger.error("Take session from closed session pool") raise ValueError("Take session from closed session pool.") if retry_timeout is None: