From 9097cf46ca42781daebee9a0c16401979605cee6 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 18 May 2023 18:54:28 +0300 Subject: [PATCH 1/3] Fix hang while write messages to closed driver. Close #296 --- tests/topics/test_topic_writer.py | 54 ++++++++++++++++++++++- ydb/_topic_writer/topic_writer_asyncio.py | 4 +- ydb/_topic_writer/topic_writer_sync.py | 8 +++- ydb/aio/pool.py | 2 + ydb/pool.py | 3 ++ ydb/topic.py | 2 +- 6 files changed, 68 insertions(+), 5 deletions(-) diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py index 327cb81e..336bd7e4 100644 --- a/tests/topics/test_topic_writer.py +++ b/tests/topics/test_topic_writer.py @@ -96,12 +96,38 @@ async def test_write_multi_message_with_ack( None, ], ) - async def test_write_encoded(self, driver: ydb.Driver, topic_path: str, codec): + async def test_write_encoded(self, driver: ydb.aio.Driver, topic_path: str, codec): async with driver.topic_client.writer(topic_path, codec=codec) as writer: await writer.write("a" * 1000) await writer.write("b" * 1000) await writer.write("c" * 1000) + async def test_create_writer_after_stop(self, driver: ydb.aio.Driver, topic_path: str): + await driver.stop() + with pytest.raises(ydb.Error): + async with driver.topic_client.writer(topic_path) as writer: + await writer.write_with_ack("123") + + async def test_send_message_after_stop(self, driver: ydb.aio.Driver, topic_path: str): + writer = driver.topic_client.writer(topic_path) + await driver.stop() + with pytest.raises(ydb.Error): + await writer.write_with_ack("123") + + async def test_preserve_exception_on_cm_close(self, driver: ydb.aio.Driver, topic_path: str): + class TestException(Exception): + pass + + with pytest.raises(TestException): + async with driver.topic_client.writer(topic_path) as writer: + driver.stop() # will raise exception on topic writer __exit__ + try: + writer.write("123") + except ydb.Error: + pass + + raise TestException() + class TestTopicWriterSync: def test_send_message(self, driver_sync: ydb.Driver, topic_path): @@ -212,3 +238,29 @@ def test_start_many_sync_writers_in_parallel(self, driver_sync: ydb.Driver, topi for writer in writers: writer.close() + + def test_create_writer_after_stop(self, driver_sync: ydb.Driver, topic_path: str): + driver_sync.stop() + with pytest.raises(ydb.Error): + with driver_sync.topic_client.writer(topic_path) as writer: + writer.write_with_ack("123") + + def test_send_message_after_stop(self, driver_sync: ydb.Driver, topic_path: str): + writer = driver_sync.topic_client.writer(topic_path) + driver_sync.stop() + with pytest.raises(ydb.Error): + writer.write_with_ack("123") + + def test_preserve_exception_on_cm_close(self, driver_sync: ydb.Driver, topic_path: str): + class TestException(Exception): + pass + + with pytest.raises(TestException): + with driver_sync.topic_client.writer(topic_path) as writer: + driver_sync.stop() # will raise exception on topic writer __exit__ + try: + writer.write("123") + except ydb.Error: + pass + + raise TestException() diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 30ab9fb3..9aea2f30 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -330,7 +330,7 @@ def _prepare_internal_messages(self, messages: List[PublicMessage]) -> List[Inte def _check_stop(self): if self._stop_reason.done(): - raise self._stop_reason.result() + raise self._stop_reason.exception() async def _connection_loop(self): retry_settings = RetrySettings() # todo @@ -543,7 +543,7 @@ def _stop(self, reason: BaseException): if self._stop_reason.done(): return - self._stop_reason.set_result(reason) + self._stop_reason.set_exception(reason) for f in self._messages_future: f.set_exception(reason) diff --git a/ydb/_topic_writer/topic_writer_sync.py b/ydb/_topic_writer/topic_writer_sync.py index 43c4fec9..5e7b542c 100644 --- a/ydb/_topic_writer/topic_writer_sync.py +++ b/ydb/_topic_writer/topic_writer_sync.py @@ -5,6 +5,7 @@ from concurrent.futures import Future from typing import Union, List, Optional +import ydb from .._grpc.grpcwrapper.common_utils import SupportedDriverType from .topic_writer import ( PublicWriterSettings, @@ -56,7 +57,12 @@ def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): - self.close() + try: + self.close() + except ydb.Error: + if exc_val: + raise exc_val + raise def __del__(self): self.close(flush=False) diff --git a/ydb/aio/pool.py b/ydb/aio/pool.py index 08bfaacb..6e95dd6f 100644 --- a/ydb/aio/pool.py +++ b/ydb/aio/pool.py @@ -241,6 +241,8 @@ async def __call__( preferred_endpoint=None, fast_fail=False, ): + if self._stopped: + raise issues.Error("Driver was stopped") wait_timeout = settings.timeout if settings else 10 try: connection = await self._store.get(preferred_endpoint, fast_fail=fast_fail, wait_timeout=wait_timeout) diff --git a/ydb/pool.py b/ydb/pool.py index 736344e3..e0bf2f15 100644 --- a/ydb/pool.py +++ b/ydb/pool.py @@ -429,6 +429,9 @@ def __call__( :return: A result of computation """ + if self._stopped: + raise issues.Error("Driver was stopped") + tracing.trace(self.tracer, {"request": request, "stub": stub, "rpc_name": rpc_name}) try: connection = self._store.get(preferred_endpoint) diff --git a/ydb/topic.py b/ydb/topic.py index abf93903..38ecec26 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -337,7 +337,7 @@ def writer( encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None, encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicWriter: - args = locals() + args = locals().copy() del args["self"] self._check_closed() From dda52b90216eec649f51563b3c29a7fd174d0ccd Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 19 May 2023 17:55:43 +0300 Subject: [PATCH 2/3] Fix hang while write messages to closed driver. Close #296 --- tests/topics/test_topic_writer.py | 22 +++++++++++++--------- ydb/_topic_writer/topic_writer_asyncio.py | 6 +++++- ydb/_topic_writer/topic_writer_sync.py | 7 +++---- ydb/topic.py | 2 +- 4 files changed, 22 insertions(+), 15 deletions(-) diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py index 336bd7e4..3817e34d 100644 --- a/tests/topics/test_topic_writer.py +++ b/tests/topics/test_topic_writer.py @@ -1,4 +1,6 @@ from __future__ import annotations + +import asyncio from typing import List import pytest @@ -120,11 +122,12 @@ class TestException(Exception): with pytest.raises(TestException): async with driver.topic_client.writer(topic_path) as writer: - driver.stop() # will raise exception on topic writer __exit__ - try: - writer.write("123") - except ydb.Error: - pass + await writer.wait_init() + await driver.stop() # will raise exception on topic writer __exit__ + + # ensure writer has exception internally + with pytest.raises((ydb.Error, asyncio.CancelledError)): + await writer.write_with_ack("123") raise TestException() @@ -257,10 +260,11 @@ class TestException(Exception): with pytest.raises(TestException): with driver_sync.topic_client.writer(topic_path) as writer: + writer.wait_init() driver_sync.stop() # will raise exception on topic writer __exit__ - try: - writer.write("123") - except ydb.Error: - pass + + # ensure writer has exception internally + with pytest.raises(ydb.Error): + writer.write_with_ack("123") raise TestException() diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 9aea2f30..83984a64 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -65,7 +65,11 @@ async def __aenter__(self) -> "WriterAsyncIO": return self async def __aexit__(self, exc_type, exc_val, exc_tb): - await self.close() + try: + await self.close() + except BaseException as e: + if exc_val is None: + raise def __del__(self): if self._closed or self._loop.is_closed(): diff --git a/ydb/_topic_writer/topic_writer_sync.py b/ydb/_topic_writer/topic_writer_sync.py index 5e7b542c..76c559dc 100644 --- a/ydb/_topic_writer/topic_writer_sync.py +++ b/ydb/_topic_writer/topic_writer_sync.py @@ -59,10 +59,9 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): try: self.close() - except ydb.Error: - if exc_val: - raise exc_val - raise + except BaseException as e: + if exc_val is None: + raise def __del__(self): self.close(flush=False) diff --git a/ydb/topic.py b/ydb/topic.py index 38ecec26..abf93903 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -337,7 +337,7 @@ def writer( encoders: Optional[Mapping[_ydb_topic_public_types.PublicCodec, Callable[[bytes], bytes]]] = None, encoder_executor: Optional[concurrent.futures.Executor] = None, # default shared client executor pool ) -> TopicWriter: - args = locals().copy() + args = locals() del args["self"] self._check_closed() From 2a4141fae1a2b9bf0d18466075e972a8a585a6c3 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Fri, 19 May 2023 17:57:25 +0300 Subject: [PATCH 3/3] Fix linter --- ydb/_topic_writer/topic_writer_asyncio.py | 2 +- ydb/_topic_writer/topic_writer_sync.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index 83984a64..d83187fc 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -67,7 +67,7 @@ async def __aenter__(self) -> "WriterAsyncIO": async def __aexit__(self, exc_type, exc_val, exc_tb): try: await self.close() - except BaseException as e: + except BaseException: if exc_val is None: raise diff --git a/ydb/_topic_writer/topic_writer_sync.py b/ydb/_topic_writer/topic_writer_sync.py index 76c559dc..a5193caf 100644 --- a/ydb/_topic_writer/topic_writer_sync.py +++ b/ydb/_topic_writer/topic_writer_sync.py @@ -5,7 +5,6 @@ from concurrent.futures import Future from typing import Union, List, Optional -import ydb from .._grpc.grpcwrapper.common_utils import SupportedDriverType from .topic_writer import ( PublicWriterSettings, @@ -59,7 +58,7 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): try: self.close() - except BaseException as e: + except BaseException: if exc_val is None: raise