Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions tests/aio/test_session_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ async def test_close_basic_logic_case_1(driver):
waiter = asyncio.ensure_future(pool.acquire())

await pool.stop()
waiter_sess = waiter.result()
assert not waiter_sess.initialized()
after_stop = await pool.acquire()
assert not after_stop.initialized()

with pytest.raises(ValueError):
waiter.result()

with pytest.raises(ValueError):
await pool.acquire()

await pool.release(s)
await pool.release(after_stop)
await pool.release(waiter_sess)
assert pool._active_count == 0


Expand Down Expand Up @@ -106,9 +106,9 @@ async def test_close_basic_logic_case_2(driver):

assert pool._active_count == 0

sess = await pool.acquire()
with pytest.raises(ValueError):
await pool.acquire()

assert not sess.initialized()
await pool.stop()


Expand Down
12 changes: 12 additions & 0 deletions tests/aio/test_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import pytest
import ydb.aio


@pytest.mark.asyncio
class TestSessionPool:
async def test_checkout_from_stopped_pool(self, driver):
pool = ydb.aio.SessionPool(driver, 1)
await pool.stop()

with pytest.raises(ValueError):
await pool.acquire()
40 changes: 40 additions & 0 deletions tests/session_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pytest

import ydb


def test_close_basic_logic_case_1(driver_sync):
pool = ydb.SessionPool(driver_sync, 1)
s = pool.acquire()

pool.stop()

with pytest.raises(ValueError):
pool.acquire()

pool.release(s)
assert pool._pool_impl._active_count == 0


def test_close_basic_logic_case_2(driver_sync):
pool = ydb.SessionPool(driver_sync, 10)
acquired = []

for _ in range(10):
acquired.append(pool.acquire())

for _ in range(3):
pool.release(acquired.pop(-1))

pool.stop()
assert pool._pool_impl._active_count == 7

while acquired:
pool.release(acquired.pop(-1))

assert pool._pool_impl._active_count == 0

with pytest.raises(ValueError):
pool.acquire()

pool.stop()
10 changes: 10 additions & 0 deletions tests/table/table_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
import pytest
import ydb


class TestSessionPool:
def test_checkout_from_stopped_pool(self, driver_sync):
pool = ydb.SessionPool(driver_sync, 1)
pool.stop()

with pytest.raises(ValueError):
pool.acquire()


class TestTable:
def test_create_table_with_not_null_primary_key_by_api(self, driver_sync, database):
table_path = database + "/test_table"
Expand Down
4 changes: 4 additions & 0 deletions ydb/_sp_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ def _on_keep_alive(self, session, f):
self._destroy(session, "keep-alive-error")

def acquire(self, blocking=True, timeout=None):
if self._should_stop.is_set():
self._logger.error("Take session from closed session pool")
raise ValueError("Take session from closed session pool.")

waiter = self.subscribe()
has_result = False
if blocking:
Expand Down
4 changes: 2 additions & 2 deletions ydb/aio/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,8 +341,8 @@ async def _get_session_from_queue(self, timeout: float):
async def acquire(self, timeout: float = None, retry_timeout: float = None, retry_num: int = None) -> ydb.ISession:

if self._should_stop.is_set():
self._logger.debug("Acquired not inited session")
return self._create()
self._logger.error("Take session from closed session pool")
raise ValueError("Take session from closed session pool.")

if retry_timeout is None:
retry_timeout = timeout
Expand Down