From 9578c400914dc88ba352be3edf68e3c62d8aa092 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 24 Jan 2023 18:01:59 +0300 Subject: [PATCH 1/3] sync --- tests/conftest.py | 22 +++++++++++++++++++--- tests/topics/test_topic_writer_async.py | 4 ++++ 2 files changed, 23 insertions(+), 3 deletions(-) create mode 100644 tests/topics/test_topic_writer_async.py diff --git a/tests/conftest.py b/tests/conftest.py index 336ffdae..7a168717 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -4,6 +4,7 @@ import pytest import ydb import time +import subprocess @pytest.fixture(autouse=True, scope="session") @@ -56,9 +57,9 @@ def secure_endpoint(pytestconfig, session_scoped_container_getter): assert os.path.exists(ca_path) os.environ["YDB_SSL_ROOT_CERTIFICATES_FILE"] = ca_path with ydb.Driver( - endpoint="grpcs://localhost:2135", - database="/local", - root_certificates=ydb.load_ydb_root_certificate(), + endpoint="grpcs://localhost:2135", + database="/local", + root_certificates=ydb.load_ydb_root_certificate(), ) as driver: wait_container_ready(driver) yield "localhost:2135" @@ -96,3 +97,18 @@ async def driver(endpoint, database, event_loop): yield driver await driver.stop(timeout=10) + + +@pytest.fixture() +def topic_path() -> str: + subprocess.run( + """docker-compose exec ydb /ydb -e grpc://localhost:2136 -d /local topic drop /local/test-topic""", + shell=True, + ) + res = subprocess.run( + """exec ydb /ydb -e grpc://localhost:2136 -d /local topic create /local/test-topic""", + shell=True, + ) + assert res.returncode == 0 + + return "/local/test-topic" diff --git a/tests/topics/test_topic_writer_async.py b/tests/topics/test_topic_writer_async.py new file mode 100644 index 00000000..6fd7d329 --- /dev/null +++ b/tests/topics/test_topic_writer_async.py @@ -0,0 +1,4 @@ + + +def test_write_single_message(driver, topic_path): + print(topic_path) From 28e87f58f1ee204086c3d207df1c07cca2167774 Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Mon, 30 Jan 2023 17:44:54 +0300 Subject: [PATCH 2/3] sync --- tests/topics/test_topic_writer_async.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/topics/test_topic_writer_async.py b/tests/topics/test_topic_writer_async.py index 6fd7d329..f823ffd4 100644 --- a/tests/topics/test_topic_writer_async.py +++ b/tests/topics/test_topic_writer_async.py @@ -1,4 +1,8 @@ +import pytest -def test_write_single_message(driver, topic_path): - print(topic_path) +@pytest.mark.asyncio +class TesttopicWriter: + async def test_send_message(self, driver): + pass + From 994eef3b09d19be470de9cd433ab72c4c7dbdb0c Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Tue, 31 Jan 2023 14:14:53 +0300 Subject: [PATCH 3/3] use anonymous credentials by default --- tests/conftest.py | 17 +++++++------ tests/topics/test_topic_writer.py | 30 +++++++++++++++++++++++ tests/topics/test_topic_writer_async.py | 8 ------ ydb/_topic_writer/topic_writer.py | 3 +++ ydb/_topic_writer/topic_writer_asyncio.py | 12 ++++----- 5 files changed, 49 insertions(+), 21 deletions(-) create mode 100644 tests/topics/test_topic_writer.py delete mode 100644 tests/topics/test_topic_writer_async.py diff --git a/tests/conftest.py b/tests/conftest.py index 7a168717..2681037b 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -57,9 +57,9 @@ def secure_endpoint(pytestconfig, session_scoped_container_getter): assert os.path.exists(ca_path) os.environ["YDB_SSL_ROOT_CERTIFICATES_FILE"] = ca_path with ydb.Driver( - endpoint="grpcs://localhost:2135", - database="/local", - root_certificates=ydb.load_ydb_root_certificate(), + endpoint="grpcs://localhost:2135", + database="/local", + root_certificates=ydb.load_ydb_root_certificate(), ) as driver: wait_container_ready(driver) yield "localhost:2135" @@ -100,15 +100,18 @@ async def driver(endpoint, database, event_loop): @pytest.fixture() -def topic_path() -> str: +def topic_path(endpoint) -> str: subprocess.run( - """docker-compose exec ydb /ydb -e grpc://localhost:2136 -d /local topic drop /local/test-topic""", + """docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic drop /local/test-topic""" + % endpoint, shell=True, ) res = subprocess.run( - """exec ydb /ydb -e grpc://localhost:2136 -d /local topic create /local/test-topic""", + """docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic create /local/test-topic""" + % endpoint, shell=True, + capture_output=True, ) - assert res.returncode == 0 + assert res.returncode == 0, res.stderr + res.stdout return "/local/test-topic" diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py new file mode 100644 index 00000000..3071c655 --- /dev/null +++ b/tests/topics/test_topic_writer.py @@ -0,0 +1,30 @@ +import pytest + +import ydb.aio + + +@pytest.mark.asyncio +class TestTopicWriterAsyncIO: + async def test_send_message(self, driver: ydb.aio.Driver, topic_path): + writer = driver.topic_client.topic_writer( + topic_path, producer_and_message_group_id="test" + ) + writer.write(ydb.TopicWriterMessage(data="123".encode())) + + async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path): + async with driver.topic_client.topic_writer( + topic_path, + producer_and_message_group_id="test", + auto_seqno=False, + ) as writer: + await writer.write_with_ack( + ydb.TopicWriterMessage(data="123".encode(), seqno=5) + ) + + async with driver.topic_client.topic_writer( + topic_path, + producer_and_message_group_id="test", + get_last_seqno=True, + ) as writer2: + init_info = await writer2.wait_init() + assert init_info.last_seqno == 5 diff --git a/tests/topics/test_topic_writer_async.py b/tests/topics/test_topic_writer_async.py deleted file mode 100644 index f823ffd4..00000000 --- a/tests/topics/test_topic_writer_async.py +++ /dev/null @@ -1,8 +0,0 @@ -import pytest - - -@pytest.mark.asyncio -class TesttopicWriter: - async def test_send_message(self, driver): - pass - diff --git a/ydb/_topic_writer/topic_writer.py b/ydb/_topic_writer/topic_writer.py index da614bf2..ecc20e10 100644 --- a/ydb/_topic_writer/topic_writer.py +++ b/ydb/_topic_writer/topic_writer.py @@ -165,6 +165,9 @@ class Skipped: pass +PublicWriteResultTypes = Union[PublicWriteResult.Written, PublicWriteResult.Skipped] + + class WriterSettings(PublicWriterSettings): def __init__(self, settings: PublicWriterSettings): self.__dict__ = settings.__dict__.copy() diff --git a/ydb/_topic_writer/topic_writer_asyncio.py b/ydb/_topic_writer/topic_writer_asyncio.py index b4373b17..61ab1e8c 100644 --- a/ydb/_topic_writer/topic_writer_asyncio.py +++ b/ydb/_topic_writer/topic_writer_asyncio.py @@ -1,6 +1,5 @@ import asyncio import datetime -import threading from collections import deque from typing import Deque, AsyncIterator, Union, List, Optional, Callable @@ -9,13 +8,13 @@ PublicWriterSettings, WriterSettings, Writer, - PublicWriteResult, PublicMessage, PublicWriterInitInfo, InternalMessage, TopicWriterStopped, TopicWriterError, messages_to_proto_requests, + PublicWriteResultTypes, ) from .. import ( _apis, @@ -35,7 +34,7 @@ class WriterAsyncIO: _loop: asyncio.AbstractEventLoop _reconnector: "WriterAsyncIOReconnector" - _lock: threading.Lock + _lock: asyncio.Lock _closed: bool @property @@ -43,13 +42,14 @@ def last_seqno(self) -> int: raise NotImplementedError() def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings): + self._lock = asyncio.Lock() self._loop = asyncio.get_running_loop() self._closed = False self._reconnector = WriterAsyncIOReconnector( driver=driver, settings=WriterSettings(settings) ) - async def __aenter__(self): + async def __aenter__(self) -> "WriterAsyncIO": return self async def __aexit__(self, exc_type, exc_val, exc_tb): @@ -62,7 +62,7 @@ def __del__(self): self._loop.call_soon(self.close) async def close(self): - with self._lock: + async with self._lock: if self._closed: return self._closed = True @@ -73,7 +73,7 @@ async def write_with_ack( self, messages: Union[Writer.MessageType, List[Writer.MessageType]], *args: Optional[Writer.MessageType], - ) -> Union[PublicWriteResult, List[PublicWriteResult]]: + ) -> Union[PublicWriteResultTypes, List[PublicWriteResultTypes]]: """ IT IS SLOWLY WAY. IT IS BAD CHOISE IN MOST CASES. It is recommended to use write with optionally flush or write_with_ack_futures and receive acks by wait futures.