diff --git a/CHANGELOG.md b/CHANGELOG.md index 339be6a4..760269a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* BROKEN CHANGES: change names of public method in topic client + ## 3.0.1b5 ## * Remove six package from code and dependencies (remove support python2) * Use anonymous credentials by default instead of iam metadata (use ydb.driver.credentials_from_env_variables for creds by env var) diff --git a/examples/topic/reader_async_example.py b/examples/topic/reader_async_example.py index e702903f..96142921 100644 --- a/examples/topic/reader_async_example.py +++ b/examples/topic/reader_async_example.py @@ -10,14 +10,12 @@ async def connect(): connection_string="grpc://localhost:2135?database=/local", credentials=ydb.credentials.AnonymousCredentials(), ) - reader = ydb.TopicClientAsyncIO(db).topic_reader( - "/local/topic", consumer="consumer" - ) + reader = ydb.TopicClientAsyncIO(db).reader("/local/topic", consumer="consumer") return reader async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver): - with ydb.TopicClientAsyncIO(db).topic_reader( + with ydb.TopicClientAsyncIO(db).reader( "/database/topic/path", consumer="consumer" ) as reader: async for message in reader.messages(): @@ -91,7 +89,7 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO) async def auto_deserialize_message(db: ydb.aio.Driver): # async, batch work similar to this - async with ydb.TopicClientAsyncIO(db).topic_reader( + async with ydb.TopicClientAsyncIO(db).reader( "/database/topic/path", consumer="asd", deserializer=json.loads ) as reader: async for message in reader.messages(): @@ -133,7 +131,7 @@ def process_batch(batch): async def connect_and_read_few_topics(db: ydb.aio.Driver): - with ydb.TopicClientAsyncIO(db).topic_reader( + with ydb.TopicClientAsyncIO(db).reader( [ "/database/topic/path", ydb.TopicSelector("/database/second-topic", partitions=3), @@ -156,7 +154,7 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None: print(event.topic) print(event.offset) - async with ydb.TopicClientAsyncIO(db).topic_reader( + async with ydb.TopicClientAsyncIO(db).reader( "/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit ) as reader: async for message in reader.messages(): @@ -173,7 +171,7 @@ async def on_get_partition_start_offset( resp.start_offset = 123 return resp - async with ydb.TopicClient(db).topic_reader( + async with ydb.TopicClient(db).reader( "/local/test", consumer="consumer", on_get_partition_start_offset=on_get_partition_start_offset, diff --git a/examples/topic/reader_example.py b/examples/topic/reader_example.py index 7cea2a35..183c51d6 100644 --- a/examples/topic/reader_example.py +++ b/examples/topic/reader_example.py @@ -9,12 +9,12 @@ def connect(): connection_string="grpc://localhost:2135?database=/local", credentials=ydb.credentials.AnonymousCredentials(), ) - reader = ydb.TopicClient(db).topic_reader("/local/topic", consumer="consumer") + reader = ydb.TopicClient(db).reader("/local/topic", consumer="consumer") return reader def create_reader_and_close_with_context_manager(db: ydb.Driver): - with ydb.TopicClient(db).topic_reader( + with ydb.TopicClient(db).reader( "/database/topic/path", consumer="consumer", buffer_size_bytes=123 ) as reader: for message in reader: @@ -81,7 +81,7 @@ def get_one_batch_from_external_loop(reader: ydb.TopicReader): def auto_deserialize_message(db: ydb.Driver): # async, batch work similar to this - reader = ydb.TopicClient(db).topic_reader( + reader = ydb.TopicClient(db).reader( "/database/topic/path", consumer="asd", deserializer=json.loads ) for message in reader.messages(): @@ -123,7 +123,7 @@ def process_batch(batch): def connect_and_read_few_topics(db: ydb.Driver): - with ydb.TopicClient(db).topic_reader( + with ydb.TopicClient(db).reader( [ "/database/topic/path", ydb.TopicSelector("/database/second-topic", partitions=3), @@ -146,7 +146,7 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None: print(event.topic) print(event.offset) - with ydb.TopicClient(db).topic_reader( + with ydb.TopicClient(db).reader( "/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit ) as reader: for message in reader: @@ -164,7 +164,7 @@ def on_get_partition_start_offset( resp.start_offset = 123 return resp - with ydb.TopicClient(db).topic_reader( + with ydb.TopicClient(db).reader( "/local/test", consumer="consumer", on_get_partition_start_offset=on_get_partition_start_offset, diff --git a/examples/topic/writer_async_example.py b/examples/topic/writer_async_example.py index 29c79b08..30fbecab 100644 --- a/examples/topic/writer_async_example.py +++ b/examples/topic/writer_async_example.py @@ -8,7 +8,7 @@ async def create_writer(db: ydb.aio.Driver): - async with ydb.TopicClientAsyncIO(db).topic_writer( + async with ydb.TopicClientAsyncIO(db).writer( "/database/topic/path", producer_and_message_group_id="producer-id", ) as writer: @@ -16,7 +16,7 @@ async def create_writer(db: ydb.aio.Driver): async def connect_and_wait(db: ydb.aio.Driver): - async with ydb.TopicClientAsyncIO(db).topic_writer( + async with ydb.TopicClientAsyncIO(db).writer( "/database/topic/path", producer_and_message_group_id="producer-id", ) as writer: @@ -24,7 +24,7 @@ async def connect_and_wait(db: ydb.aio.Driver): async def connect_without_context_manager(db: ydb.aio.Driver): - writer = ydb.TopicClientAsyncIO(db).topic_writer( + writer = ydb.TopicClientAsyncIO(db).writer( "/database/topic/path", producer_and_message_group_id="producer-id", ) @@ -81,7 +81,7 @@ async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO): async def send_json_message(db: ydb.aio.Driver): - async with ydb.TopicClientAsyncIO(db).topic_writer( + async with ydb.TopicClientAsyncIO(db).writer( "/database/path/topic", serializer=json.dumps ) as writer: writer.write({"a": 123}) diff --git a/examples/topic/writer_example.py b/examples/topic/writer_example.py index 27387e11..10f7db21 100644 --- a/examples/topic/writer_example.py +++ b/examples/topic/writer_example.py @@ -13,7 +13,7 @@ async def connect(): connection_string="grpc://localhost:2135?database=/local", credentials=ydb.credentials.AnonymousCredentials(), ) - writer = ydb.TopicClientAsyncIO(db).topic_writer( + writer = ydb.TopicClientAsyncIO(db).writer( "/local/topic", producer_and_message_group_id="producer-id", ) @@ -21,7 +21,7 @@ async def connect(): def create_writer(db: ydb.Driver): - with ydb.TopicClient(db).topic_writer( + with ydb.TopicClient(db).writer( "/database/topic/path", producer_and_message_group_id="producer-id", ) as writer: @@ -29,7 +29,7 @@ def create_writer(db: ydb.Driver): def connect_and_wait(db: ydb.Driver): - with ydb.TopicClient(db).topic_writer( + with ydb.TopicClient(db).writer( "/database/topic/path", producer_and_message_group_id="producer-id", ) as writer: @@ -37,7 +37,7 @@ def connect_and_wait(db: ydb.Driver): def connect_without_context_manager(db: ydb.Driver): - writer = ydb.TopicClient(db).topic_writer( + writer = ydb.TopicClient(db).writer( "/database/topic/path", producer_and_message_group_id="producer-id", ) @@ -98,7 +98,7 @@ def send_messages_with_wait_ack(writer: ydb.TopicWriter): def send_json_message(db: ydb.Driver): - with ydb.TopicClient(db).topic_writer( + with ydb.TopicClient(db).writer( "/database/path/topic", serializer=json.dumps ) as writer: writer.write({"a": 123}) diff --git a/tests/conftest.py b/tests/conftest.py index 09c02977..e94a83dc 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -131,7 +131,7 @@ async def topic_path(driver, topic_consumer, database) -> str: @pytest.fixture() @pytest.mark.asyncio() async def topic_with_messages(driver, topic_path): - writer = driver.topic_client.topic_writer( + writer = driver.topic_client.writer( topic_path, producer_and_message_group_id="fixture-producer-id" ) await writer.write_with_ack( diff --git a/tests/topics/test_control_plane.py b/tests/topics/test_control_plane.py index 8e1d6f23..2446ddcf 100644 --- a/tests/topics/test_control_plane.py +++ b/tests/topics/test_control_plane.py @@ -27,7 +27,7 @@ async def test_drop_topic(self, driver, topic_path): await client.drop_topic(topic_path) async def test_describe_topic(self, driver, topic_path: str, topic_consumer): - res = await driver.topic_client.describe(topic_path) + res = await driver.topic_client.describe_topic(topic_path) assert res.self.name == os.path.basename(topic_path) @@ -61,8 +61,7 @@ def test_drop_topic(self, driver_sync, topic_path): client.drop_topic(topic_path) def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer): - res = driver_sync.topic_client.describe(topic_path) - res.partition_count_limit + res = driver_sync.topic_client.describe_topic(topic_path) assert res.self.name == os.path.basename(topic_path) diff --git a/tests/topics/test_topic_reader.py b/tests/topics/test_topic_reader.py index 8107ac16..21675eb2 100644 --- a/tests/topics/test_topic_reader.py +++ b/tests/topics/test_topic_reader.py @@ -6,7 +6,7 @@ class TestTopicReaderAsyncIO: async def test_read_message( self, driver, topic_path, topic_with_messages, topic_consumer ): - reader = driver.topic_client.topic_reader(topic_consumer, topic_path) + reader = driver.topic_client.reader(topic_consumer, topic_path) assert await reader.receive_batch() is not None await reader.close() @@ -16,7 +16,7 @@ class TestTopicReaderSync: def test_read_message( self, driver_sync, topic_path, topic_with_messages, topic_consumer ): - reader = driver_sync.topic_client.topic_reader(topic_consumer, topic_path) + reader = driver_sync.topic_client.reader(topic_consumer, topic_path) assert reader.receive_batch() is not None reader.close() diff --git a/tests/topics/test_topic_writer.py b/tests/topics/test_topic_writer.py index 799c4d13..5ae976b8 100644 --- a/tests/topics/test_topic_writer.py +++ b/tests/topics/test_topic_writer.py @@ -6,14 +6,14 @@ @pytest.mark.asyncio class TestTopicWriterAsyncIO: async def test_send_message(self, driver: ydb.aio.Driver, topic_path): - writer = driver.topic_client.topic_writer( + writer = driver.topic_client.writer( topic_path, producer_and_message_group_id="test" ) await writer.write(ydb.TopicWriterMessage(data="123".encode())) await writer.close() async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path): - async with driver.topic_client.topic_writer( + async with driver.topic_client.writer( topic_path, producer_and_message_group_id="test", auto_seqno=False, @@ -22,7 +22,7 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path): ydb.TopicWriterMessage(data="123".encode(), seqno=5) ) - async with driver.topic_client.topic_writer( + async with driver.topic_client.writer( topic_path, producer_and_message_group_id="test", get_last_seqno=True, @@ -31,7 +31,7 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path): assert init_info.last_seqno == 5 async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path): - async with driver.topic_client.topic_writer( + async with driver.topic_client.writer( topic_path, producer_and_message_group_id="test", auto_seqno=False, @@ -43,7 +43,7 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path): ydb.TopicWriterMessage(data=f"msg-{i}", seqno=last_seqno) ) - async with driver.topic_client.topic_writer( + async with driver.topic_client.writer( topic_path, producer_and_message_group_id="test", get_last_seqno=True, @@ -54,21 +54,21 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path): class TestTopicWriterSync: def test_send_message(self, driver_sync: ydb.Driver, topic_path): - writer = driver_sync.topic_client.topic_writer( + writer = driver_sync.topic_client.writer( topic_path, producer_and_message_group_id="test" ) writer.write(ydb.TopicWriterMessage(data="123".encode())) writer.close() def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path): - with driver_sync.topic_client.topic_writer( + with driver_sync.topic_client.writer( topic_path, producer_and_message_group_id="test", auto_seqno=False, ) as writer: writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5)) - with driver_sync.topic_client.topic_writer( + with driver_sync.topic_client.writer( topic_path, producer_and_message_group_id="test", get_last_seqno=True, @@ -77,7 +77,7 @@ def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path): assert init_info.last_seqno == 5 def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path): - with driver_sync.topic_client.topic_writer( + with driver_sync.topic_client.writer( topic_path, producer_and_message_group_id="test", auto_seqno=False, @@ -87,7 +87,7 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path): last_seqno = i + 1 writer.write(ydb.TopicWriterMessage(data=f"msg-{i}", seqno=last_seqno)) - with driver_sync.topic_client.topic_writer( + with driver_sync.topic_client.writer( topic_path, producer_and_message_group_id="test", get_last_seqno=True, diff --git a/ydb/topic.py b/ydb/topic.py index 9378d100..3c70b061 100644 --- a/ydb/topic.py +++ b/ydb/topic.py @@ -92,7 +92,7 @@ async def create_topic( _wrap_operation, ) - async def describe( + async def describe_topic( self, path: str, include_stats: bool = False ) -> TopicDescription: args = locals().copy() @@ -115,7 +115,7 @@ async def drop_topic(self, path: str): _wrap_operation, ) - def topic_reader( + def reader( self, consumer: str, topic: str, @@ -139,7 +139,7 @@ def topic_reader( settings = TopicReaderSettings(**args) return TopicReaderAsyncIO(self._driver, settings) - def topic_writer( + def writer( self, topic, *, @@ -215,7 +215,9 @@ def create_topic( _wrap_operation, ) - def describe(self, path: str, include_stats: bool = False) -> TopicDescription: + def describe_topic( + self, path: str, include_stats: bool = False + ) -> TopicDescription: args = locals().copy() del args["self"] req = _ydb_topic_public_types.DescribeTopicRequestParams(**args) @@ -236,7 +238,7 @@ def drop_topic(self, path: str): _wrap_operation, ) - def topic_reader( + def reader( self, consumer: str, topic: str, @@ -260,7 +262,7 @@ def topic_reader( settings = TopicReaderSettings(**args) return TopicReader(self._driver, settings) - def topic_writer( + def writer( self, topic, producer_and_message_group_id: str,