From cbfb86abf53b4c6a889049d036d198f06d436fab Mon Sep 17 00:00:00 2001 From: Timofey Koolin Date: Thu, 2 Mar 2023 12:53:49 +0300 Subject: [PATCH] rename producer_and_message_group_id to producer_id and make it optional --- CHANGELOG.md | 2 + examples/topic/writer_async_example.py | 6 +- examples/topic/writer_example.py | 8 +-- tests/conftest.py | 19 ++++++- tests/topics/test_topic_writer.py | 57 +++++++++++++------ ydb/_topic_writer/topic_writer.py | 41 +++++++------ .../topic_writer_asyncio_test.py | 4 +- ydb/topic.py | 23 ++------ 8 files changed, 95 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 760269a3..3043520c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ * BROKEN CHANGES: change names of public method in topic client +* BROKEN CHANGES: rename parameter producer_and_message_group_id to producer_id +* producer_id is optional now ## 3.0.1b5 ## * Remove six package from code and dependencies (remove support python2) diff --git a/examples/topic/writer_async_example.py b/examples/topic/writer_async_example.py index 30fbecab..548ef1aa 100644 --- a/examples/topic/writer_async_example.py +++ b/examples/topic/writer_async_example.py @@ -10,7 +10,7 @@ async def create_writer(db: ydb.aio.Driver): async with ydb.TopicClientAsyncIO(db).writer( "/database/topic/path", - producer_and_message_group_id="producer-id", + producer_id="producer-id", ) as writer: await writer.write(TopicWriterMessage("asd")) @@ -18,7 +18,7 @@ async def create_writer(db: ydb.aio.Driver): async def connect_and_wait(db: ydb.aio.Driver): async with ydb.TopicClientAsyncIO(db).writer( "/database/topic/path", - producer_and_message_group_id="producer-id", + producer_id="producer-id", ) as writer: writer.wait_init() @@ -26,7 +26,7 @@ async def connect_and_wait(db: ydb.aio.Driver): async def connect_without_context_manager(db: ydb.aio.Driver): writer = ydb.TopicClientAsyncIO(db).writer( "/database/topic/path", - producer_and_message_group_id="producer-id", + producer_id="producer-id", ) try: pass # some code diff --git a/examples/topic/writer_example.py b/examples/topic/writer_example.py index 10f7db21..e95107d1 100644 --- a/examples/topic/writer_example.py +++ b/examples/topic/writer_example.py @@ -15,7 +15,7 @@ async def connect(): ) writer = ydb.TopicClientAsyncIO(db).writer( "/local/topic", - producer_and_message_group_id="producer-id", + producer_id="producer-id", ) await writer.write(TopicWriterMessage("asd")) @@ -23,7 +23,7 @@ async def connect(): def create_writer(db: ydb.Driver): with ydb.TopicClient(db).writer( "/database/topic/path", - producer_and_message_group_id="producer-id", + producer_id="producer-id", ) as writer: writer.write(TopicWriterMessage("asd")) @@ -31,7 +31,7 @@ def create_writer(db: ydb.Driver): def connect_and_wait(db: ydb.Driver): with ydb.TopicClient(db).writer( "/database/topic/path", - producer_and_message_group_id="producer-id", + producer_id="producer-id", ) as writer: writer.wait() @@ -39,7 +39,7 @@ def connect_and_wait(db: ydb.Driver): def connect_without_context_manager(db: ydb.Driver): writer = ydb.TopicClient(db).writer( "/database/topic/path", - producer_and_message_group_id="producer-id", + producer_id="producer-id", ) try: pass # some code diff --git a/tests/conftest.py b/tests/conftest.py index e94a83dc..6fa1f174 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -131,11 +131,24 @@ async def topic_path(driver, topic_consumer, database) -> str: @pytest.fixture() @pytest.mark.asyncio() async def topic_with_messages(driver, topic_path): - writer = driver.topic_client.writer( - topic_path, producer_and_message_group_id="fixture-producer-id" - ) + writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id") await writer.write_with_ack( ydb.TopicWriterMessage(data="123".encode()), ydb.TopicWriterMessage(data="456".encode()), ) await writer.close() + + +@pytest.fixture() +@pytest.mark.asyncio() +async def topic_reader(driver, topic_consumer, topic_path) -> ydb.TopicReaderAsyncIO: + reader = driver.topic_client.reader(topic=topic_path, consumer=topic_consumer) + yield reader + await reader.close() + + +@pytest.fixture() +def topic_reader_sync(driver_sync, topic_consumer, topic_path) -> ydb.TopicReader: + reader = driver_sync.topic_client.reader(topic=topic_path, consumer=topic_consumer) + yield reader + reader.close() diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py index 5ae976b8..e7db0e23 100644 --- a/tests/topics/test_topic_writer.py +++ b/tests/topics/test_topic_writer.py @@ -6,16 +6,14 @@ @pytest.mark.asyncio class TestTopicWriterAsyncIO: async def test_send_message(self, driver: ydb.aio.Driver, topic_path): - writer = driver.topic_client.writer( - topic_path, producer_and_message_group_id="test" - ) + writer = driver.topic_client.writer(topic_path, producer_id="test") await writer.write(ydb.TopicWriterMessage(data="123".encode())) await writer.close() async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path): async with driver.topic_client.writer( topic_path, - producer_and_message_group_id="test", + producer_id="test", auto_seqno=False, ) as writer: await writer.write_with_ack( @@ -24,16 +22,28 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path): async with driver.topic_client.writer( topic_path, - producer_and_message_group_id="test", - get_last_seqno=True, + producer_id="test", ) as writer2: init_info = await writer2.wait_init() assert init_info.last_seqno == 5 + async def test_random_producer_id( + self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO + ): + async with driver.topic_client.writer(topic_path) as writer: + await writer.write(ydb.TopicWriterMessage(data="123".encode())) + async with driver.topic_client.writer(topic_path) as writer: + await writer.write(ydb.TopicWriterMessage(data="123".encode())) + + batch1 = await topic_reader.receive_batch() + batch2 = await topic_reader.receive_batch() + + assert batch1.messages[0].producer_id != batch2.messages[0].producer_id + async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path): async with driver.topic_client.writer( topic_path, - producer_and_message_group_id="test", + producer_id="test", auto_seqno=False, ) as writer: last_seqno = 0 @@ -45,8 +55,7 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path): async with driver.topic_client.writer( topic_path, - producer_and_message_group_id="test", - get_last_seqno=True, + producer_id="test", ) as writer: init_info = await writer.wait_init() assert init_info.last_seqno == last_seqno @@ -54,24 +63,21 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path): class TestTopicWriterSync: def test_send_message(self, driver_sync: ydb.Driver, topic_path): - writer = driver_sync.topic_client.writer( - topic_path, producer_and_message_group_id="test" - ) + writer = driver_sync.topic_client.writer(topic_path, producer_id="test") writer.write(ydb.TopicWriterMessage(data="123".encode())) writer.close() def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path): with driver_sync.topic_client.writer( topic_path, - producer_and_message_group_id="test", + producer_id="test", auto_seqno=False, ) as writer: writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5)) with driver_sync.topic_client.writer( topic_path, - producer_and_message_group_id="test", - get_last_seqno=True, + producer_id="test", ) as writer2: init_info = writer2.wait_init() assert init_info.last_seqno == 5 @@ -79,7 +85,7 @@ def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path): def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path): with driver_sync.topic_client.writer( topic_path, - producer_and_message_group_id="test", + producer_id="test", auto_seqno=False, ) as writer: last_seqno = 0 @@ -89,8 +95,23 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path): with driver_sync.topic_client.writer( topic_path, - producer_and_message_group_id="test", - get_last_seqno=True, + producer_id="test", ) as writer: init_info = writer.wait_init() assert init_info.last_seqno == last_seqno + + def test_random_producer_id( + self, + driver_sync: ydb.aio.Driver, + topic_path, + topic_reader_sync: ydb.TopicReader, + ): + with driver_sync.topic_client.writer(topic_path) as writer: + writer.write(ydb.TopicWriterMessage(data="123".encode())) + with driver_sync.topic_client.writer(topic_path) as writer: + writer.write(ydb.TopicWriterMessage(data="123".encode())) + + batch1 = topic_reader_sync.receive_batch() + batch2 = topic_reader_sync.receive_batch() + + assert batch1.messages[0].producer_id != batch2.messages[0].producer_id diff --git a/ydb/_topic_writer/topic_writer.py b/ydb/_topic_writer/topic_writer.py index 75858324..aa147558 100644 --- a/ydb/_topic_writer/topic_writer.py +++ b/ydb/_topic_writer/topic_writer.py @@ -1,8 +1,9 @@ import datetime import enum +import uuid from dataclasses import dataclass from enum import Enum -from typing import List, Union, TextIO, BinaryIO, Optional, Callable, Mapping, Any, Dict +from typing import List, Union, TextIO, BinaryIO, Optional, Any, Dict import typing @@ -16,21 +17,31 @@ @dataclass class PublicWriterSettings: + """ + Settings for topic writer. + + order of fields IS NOT stable, use keywords only + """ + topic: str - producer_and_message_group_id: str + producer_id: Optional[str] = None session_metadata: Optional[Dict[str, str]] = None - encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None - serializer: Union[Callable[[Any], bytes], None] = None - send_buffer_count: Optional[int] = 10000 - send_buffer_bytes: Optional[int] = 100 * 1024 * 1024 partition_id: Optional[int] = None - codec: Optional[int] = None - codec_autoselect: bool = True auto_seqno: bool = True auto_created_at: bool = True - get_last_seqno: bool = False - retry_policy: Optional["RetryPolicy"] = None - update_token_interval: Union[int, float] = 3600 + # get_last_seqno: bool = False + # encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None + # serializer: Union[Callable[[Any], bytes], None] = None + # send_buffer_count: Optional[int] = 10000 + # send_buffer_bytes: Optional[int] = 100 * 1024 * 1024 + # codec: Optional[int] = None + # codec_autoselect: bool = True + # retry_policy: Optional["RetryPolicy"] = None + # update_token_interval: Union[int, float] = 3600 + + def __post_init__(self): + if self.producer_id is None: + self.producer_id = uuid.uuid4().hex @dataclass @@ -55,18 +66,16 @@ def __init__(self, settings: PublicWriterSettings): def create_init_request(self) -> StreamWriteMessage.InitRequest: return StreamWriteMessage.InitRequest( path=self.topic, - producer_id=self.producer_and_message_group_id, + producer_id=self.producer_id, write_session_meta=self.session_metadata, partitioning=self.get_partitioning(), - get_last_seq_no=self.get_last_seqno, + get_last_seq_no=True, ) def get_partitioning(self) -> StreamWriteMessage.PartitioningType: if self.partition_id is not None: return StreamWriteMessage.PartitioningPartitionID(self.partition_id) - return StreamWriteMessage.PartitioningMessageGroupID( - self.producer_and_message_group_id - ) + return StreamWriteMessage.PartitioningMessageGroupID(self.producer_id) class SendMode(Enum): diff --git a/ydb/_topic_writer/topic_writer_asyncio_test.py b/ydb/_topic_writer/topic_writer_asyncio_test.py index 7f19a4dd..32bb3de5 100644 --- a/ydb/_topic_writer/topic_writer_asyncio_test.py +++ b/ydb/_topic_writer/topic_writer_asyncio_test.py @@ -238,7 +238,7 @@ def default_settings(self) -> WriterSettings: return WriterSettings( PublicWriterSettings( topic="/local/topic", - producer_and_message_group_id="test-producer", + producer_id="test-producer", auto_seqno=False, auto_created_at=False, ) @@ -487,7 +487,7 @@ async def close(self): def default_settings(self) -> PublicWriterSettings: return PublicWriterSettings( topic="/local/topic", - producer_and_message_group_id="producer-id", + producer_id="producer-id", ) @pytest.fixture(autouse=True) diff --git a/ydb/topic.py b/ydb/topic.py index 3c70b061..45b8b073 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -1,5 +1,5 @@ import datetime -from typing import List, Callable, Union, Mapping, Any, Optional, Dict +from typing import List, Union, Mapping, Optional, Dict from . import aio, Credentials, _apis @@ -143,19 +143,11 @@ def writer( self, topic, *, - producer_and_message_group_id: str, + producer_id: Optional[str] = None, # default - random session_metadata: Mapping[str, str] = None, - encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None, - serializer: Union[Callable[[Any], bytes], None] = None, - send_buffer_count: Union[int, None] = 10000, - send_buffer_bytes: Union[int, None] = 100 * 1024 * 1024, partition_id: Union[int, None] = None, - codec: Union[int, None] = None, - codec_autoselect: bool = True, auto_seqno: bool = True, auto_created_at: bool = True, - get_last_seqno: bool = False, - retry_policy: Union["TopicWriterRetryPolicy", None] = None, ) -> TopicWriterAsyncIO: args = locals() del args["self"] @@ -265,19 +257,12 @@ def reader( def writer( self, topic, - producer_and_message_group_id: str, + *, + producer_id: Optional[str] = None, # default - random session_metadata: Mapping[str, str] = None, - encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None, - serializer: Union[Callable[[Any], bytes], None] = None, - send_buffer_count: Union[int, None] = 10000, - send_buffer_bytes: Union[int, None] = 100 * 1024 * 1024, partition_id: Union[int, None] = None, - codec: Union[int, None] = None, - codec_autoselect: bool = True, auto_seqno: bool = True, auto_created_at: bool = True, - get_last_seqno: bool = False, - retry_policy: Union["TopicWriterRetryPolicy", None] = None, ) -> TopicWriter: args = locals() del args["self"]