Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 57 additions & 1 deletion tests/topics/test_topic_writer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from __future__ import annotations

import asyncio
from typing import List

import pytest
Expand Down Expand Up @@ -96,12 +98,39 @@ async def test_write_multi_message_with_ack(
None,
],
)
async def test_write_encoded(self, driver: ydb.Driver, topic_path: str, codec):
async def test_write_encoded(self, driver: ydb.aio.Driver, topic_path: str, codec):
async with driver.topic_client.writer(topic_path, codec=codec) as writer:
await writer.write("a" * 1000)
await writer.write("b" * 1000)
await writer.write("c" * 1000)

async def test_create_writer_after_stop(self, driver: ydb.aio.Driver, topic_path: str):
await driver.stop()
with pytest.raises(ydb.Error):
async with driver.topic_client.writer(topic_path) as writer:
await writer.write_with_ack("123")

async def test_send_message_after_stop(self, driver: ydb.aio.Driver, topic_path: str):
writer = driver.topic_client.writer(topic_path)
await driver.stop()
with pytest.raises(ydb.Error):
await writer.write_with_ack("123")

async def test_preserve_exception_on_cm_close(self, driver: ydb.aio.Driver, topic_path: str):
class TestException(Exception):
pass

with pytest.raises(TestException):
async with driver.topic_client.writer(topic_path) as writer:
await writer.wait_init()
await driver.stop() # will raise exception on topic writer __exit__

# ensure writer has exception internally
with pytest.raises((ydb.Error, asyncio.CancelledError)):
await writer.write_with_ack("123")

raise TestException()


class TestTopicWriterSync:
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
Expand Down Expand Up @@ -212,3 +241,30 @@ def test_start_many_sync_writers_in_parallel(self, driver_sync: ydb.Driver, topi

for writer in writers:
writer.close()

def test_create_writer_after_stop(self, driver_sync: ydb.Driver, topic_path: str):
driver_sync.stop()
with pytest.raises(ydb.Error):
with driver_sync.topic_client.writer(topic_path) as writer:
writer.write_with_ack("123")

def test_send_message_after_stop(self, driver_sync: ydb.Driver, topic_path: str):
writer = driver_sync.topic_client.writer(topic_path)
driver_sync.stop()
with pytest.raises(ydb.Error):
writer.write_with_ack("123")

def test_preserve_exception_on_cm_close(self, driver_sync: ydb.Driver, topic_path: str):
class TestException(Exception):
pass

with pytest.raises(TestException):
with driver_sync.topic_client.writer(topic_path) as writer:
writer.wait_init()
driver_sync.stop() # will raise exception on topic writer __exit__

# ensure writer has exception internally
with pytest.raises(ydb.Error):
writer.write_with_ack("123")

raise TestException()
10 changes: 7 additions & 3 deletions ydb/_topic_writer/topic_writer_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ async def __aenter__(self) -> "WriterAsyncIO":
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
try:
await self.close()
except BaseException:
if exc_val is None:
raise

def __del__(self):
if self._closed or self._loop.is_closed():
Expand Down Expand Up @@ -330,7 +334,7 @@ def _prepare_internal_messages(self, messages: List[PublicMessage]) -> List[Inte

def _check_stop(self):
if self._stop_reason.done():
raise self._stop_reason.result()
raise self._stop_reason.exception()

async def _connection_loop(self):
retry_settings = RetrySettings() # todo
Expand Down Expand Up @@ -543,7 +547,7 @@ def _stop(self, reason: BaseException):
if self._stop_reason.done():
return

self._stop_reason.set_result(reason)
self._stop_reason.set_exception(reason)

for f in self._messages_future:
f.set_exception(reason)
Expand Down
6 changes: 5 additions & 1 deletion ydb/_topic_writer/topic_writer_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
try:
self.close()
except BaseException:
if exc_val is None:
raise

def __del__(self):
self.close(flush=False)
Expand Down
2 changes: 2 additions & 0 deletions ydb/aio/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ async def __call__(
preferred_endpoint=None,
fast_fail=False,
):
if self._stopped:
raise issues.Error("Driver was stopped")
wait_timeout = settings.timeout if settings else 10
try:
connection = await self._store.get(preferred_endpoint, fast_fail=fast_fail, wait_timeout=wait_timeout)
Expand Down
3 changes: 3 additions & 0 deletions ydb/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,9 @@ def __call__(

:return: A result of computation
"""
if self._stopped:
raise issues.Error("Driver was stopped")

tracing.trace(self.tracer, {"request": request, "stub": stub, "rpc_name": rpc_name})
try:
connection = self._store.get(preferred_endpoint)
Expand Down