diff --git a/tests/conftest.py b/tests/conftest.py index 336ffdae..2681037b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ import pytest import ydb import time +import subprocess @pytest.fixture(autouse=True, scope="session") @@ -96,3 +97,21 @@ async def driver(endpoint, database, event_loop): yield driver await driver.stop(timeout=10) + + +@pytest.fixture() +def topic_path(endpoint) -> str: + subprocess.run( + """docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic drop /local/test-topic""" + % endpoint, + shell=True, + ) + res = subprocess.run( + """docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic create /local/test-topic""" + % endpoint, + shell=True, + capture_output=True, + ) + assert res.returncode == 0, res.stderr + res.stdout + + return "/local/test-topic" diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py new file mode 100644 index 00000000..3071c655 --- /dev/null +++ b/tests/topics/test_topic_writer.py @@ -0,0 +1,30 @@ +import pytest + +import ydb.aio + + +@pytest.mark.asyncio +class TestTopicWriterAsyncIO: + async def test_send_message(self, driver: ydb.aio.Driver, topic_path): + writer = driver.topic_client.topic_writer( + topic_path, producer_and_message_group_id="test" + ) + writer.write(ydb.TopicWriterMessage(data="123".encode())) + + async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path): + async with driver.topic_client.topic_writer( + topic_path, + producer_and_message_group_id="test", + auto_seqno=False, + ) as writer: + await writer.write_with_ack( + ydb.TopicWriterMessage(data="123".encode(), seqno=5) + ) + + async with driver.topic_client.topic_writer( + topic_path, + producer_and_message_group_id="test", + get_last_seqno=True, + ) as writer2: + init_info = await writer2.wait_init() + assert init_info.last_seqno == 5 diff --git a/ydb/_topic_writer/topic_writer.py b/ydb/_topic_writer/topic_writer.py index da614bf2..ecc20e10 100644 --- a/ydb/_topic_writer/topic_writer.py +++ b/ydb/_topic_writer/topic_writer.py @@ -165,6 +165,9 @@ class Skipped: pass +PublicWriteResultTypes = Union[PublicWriteResult.Written, PublicWriteResult.Skipped] + + class WriterSettings(PublicWriterSettings): def __init__(self, settings: PublicWriterSettings): self.__dict__ = settings.__dict__.copy() diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index b4373b17..61ab1e8c 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -1,6 +1,5 @@ import asyncio import datetime -import threading from collections import deque from typing import Deque, AsyncIterator, Union, List, Optional, Callable @@ -9,13 +8,13 @@ PublicWriterSettings, WriterSettings, Writer, - PublicWriteResult, PublicMessage, PublicWriterInitInfo, InternalMessage, TopicWriterStopped, TopicWriterError, messages_to_proto_requests, + PublicWriteResultTypes, ) from .. import ( _apis, @@ -35,7 +34,7 @@ class WriterAsyncIO: _loop: asyncio.AbstractEventLoop _reconnector: "WriterAsyncIOReconnector" - _lock: threading.Lock + _lock: asyncio.Lock _closed: bool @property @@ -43,13 +42,14 @@ def last_seqno(self) -> int: raise NotImplementedError() def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings): + self._lock = asyncio.Lock() self._loop = asyncio.get_running_loop() self._closed = False self._reconnector = WriterAsyncIOReconnector( driver=driver, settings=WriterSettings(settings) ) - async def __aenter__(self): + async def __aenter__(self) -> "WriterAsyncIO": return self async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -62,7 +62,7 @@ def __del__(self): self._loop.call_soon(self.close) async def close(self): - with self._lock: + async with self._lock: if self._closed: return self._closed = True @@ -73,7 +73,7 @@ async def write_with_ack( self, messages: Union[Writer.MessageType, List[Writer.MessageType]], *args: Optional[Writer.MessageType], - ) -> Union[PublicWriteResult, List[PublicWriteResult]]: + ) -> Union[PublicWriteResultTypes, List[PublicWriteResultTypes]]: """ IT IS SLOWLY WAY. IT IS BAD CHOISE IN MOST CASES. It is recommended to use write with optionally flush or write_with_ack_futures and receive acks by wait futures.