Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* BROKEN CHANGES: change names of public method in topic client

## 3.0.1b5 ##
* Remove six package from code and dependencies (remove support python2)
* Use anonymous credentials by default instead of iam metadata (use ydb.driver.credentials_from_env_variables for creds by env var)
Expand Down
14 changes: 6 additions & 8 deletions examples/topic/reader_async_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,12 @@ async def connect():
connection_string="grpc://localhost:2135?database=/local",
credentials=ydb.credentials.AnonymousCredentials(),
)
reader = ydb.TopicClientAsyncIO(db).topic_reader(
"/local/topic", consumer="consumer"
)
reader = ydb.TopicClientAsyncIO(db).reader("/local/topic", consumer="consumer")
return reader


async def create_reader_and_close_with_context_manager(db: ydb.aio.Driver):
with ydb.TopicClientAsyncIO(db).topic_reader(
with ydb.TopicClientAsyncIO(db).reader(
"/database/topic/path", consumer="consumer"
) as reader:
async for message in reader.messages():
Expand Down Expand Up @@ -91,7 +89,7 @@ async def get_one_batch_from_external_loop_async(reader: ydb.TopicReaderAsyncIO)
async def auto_deserialize_message(db: ydb.aio.Driver):
# async, batch work similar to this

async with ydb.TopicClientAsyncIO(db).topic_reader(
async with ydb.TopicClientAsyncIO(db).reader(
"/database/topic/path", consumer="asd", deserializer=json.loads
) as reader:
async for message in reader.messages():
Expand Down Expand Up @@ -133,7 +131,7 @@ def process_batch(batch):


async def connect_and_read_few_topics(db: ydb.aio.Driver):
with ydb.TopicClientAsyncIO(db).topic_reader(
with ydb.TopicClientAsyncIO(db).reader(
[
"/database/topic/path",
ydb.TopicSelector("/database/second-topic", partitions=3),
Expand All @@ -156,7 +154,7 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
print(event.topic)
print(event.offset)

async with ydb.TopicClientAsyncIO(db).topic_reader(
async with ydb.TopicClientAsyncIO(db).reader(
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
) as reader:
async for message in reader.messages():
Expand All @@ -173,7 +171,7 @@ async def on_get_partition_start_offset(
resp.start_offset = 123
return resp

async with ydb.TopicClient(db).topic_reader(
async with ydb.TopicClient(db).reader(
"/local/test",
consumer="consumer",
on_get_partition_start_offset=on_get_partition_start_offset,
Expand Down
12 changes: 6 additions & 6 deletions examples/topic/reader_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ def connect():
connection_string="grpc://localhost:2135?database=/local",
credentials=ydb.credentials.AnonymousCredentials(),
)
reader = ydb.TopicClient(db).topic_reader("/local/topic", consumer="consumer")
reader = ydb.TopicClient(db).reader("/local/topic", consumer="consumer")
return reader


def create_reader_and_close_with_context_manager(db: ydb.Driver):
with ydb.TopicClient(db).topic_reader(
with ydb.TopicClient(db).reader(
"/database/topic/path", consumer="consumer", buffer_size_bytes=123
) as reader:
for message in reader:
Expand Down Expand Up @@ -81,7 +81,7 @@ def get_one_batch_from_external_loop(reader: ydb.TopicReader):
def auto_deserialize_message(db: ydb.Driver):
# async, batch work similar to this

reader = ydb.TopicClient(db).topic_reader(
reader = ydb.TopicClient(db).reader(
"/database/topic/path", consumer="asd", deserializer=json.loads
)
for message in reader.messages():
Expand Down Expand Up @@ -123,7 +123,7 @@ def process_batch(batch):


def connect_and_read_few_topics(db: ydb.Driver):
with ydb.TopicClient(db).topic_reader(
with ydb.TopicClient(db).reader(
[
"/database/topic/path",
ydb.TopicSelector("/database/second-topic", partitions=3),
Expand All @@ -146,7 +146,7 @@ def on_commit(event: ydb.TopicReaderEvents.OnCommit) -> None:
print(event.topic)
print(event.offset)

with ydb.TopicClient(db).topic_reader(
with ydb.TopicClient(db).reader(
"/local", consumer="consumer", commit_batch_time=4, on_commit=on_commit
) as reader:
for message in reader:
Expand All @@ -164,7 +164,7 @@ def on_get_partition_start_offset(
resp.start_offset = 123
return resp

with ydb.TopicClient(db).topic_reader(
with ydb.TopicClient(db).reader(
"/local/test",
consumer="consumer",
on_get_partition_start_offset=on_get_partition_start_offset,
Expand Down
8 changes: 4 additions & 4 deletions examples/topic/writer_async_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@


async def create_writer(db: ydb.aio.Driver):
async with ydb.TopicClientAsyncIO(db).topic_writer(
async with ydb.TopicClientAsyncIO(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
) as writer:
await writer.write(TopicWriterMessage("asd"))


async def connect_and_wait(db: ydb.aio.Driver):
async with ydb.TopicClientAsyncIO(db).topic_writer(
async with ydb.TopicClientAsyncIO(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
) as writer:
writer.wait_init()


async def connect_without_context_manager(db: ydb.aio.Driver):
writer = ydb.TopicClientAsyncIO(db).topic_writer(
writer = ydb.TopicClientAsyncIO(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
)
Expand Down Expand Up @@ -81,7 +81,7 @@ async def send_messages_with_wait_ack(writer: ydb.TopicWriterAsyncIO):


async def send_json_message(db: ydb.aio.Driver):
async with ydb.TopicClientAsyncIO(db).topic_writer(
async with ydb.TopicClientAsyncIO(db).writer(
"/database/path/topic", serializer=json.dumps
) as writer:
writer.write({"a": 123})
Expand Down
10 changes: 5 additions & 5 deletions examples/topic/writer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,31 +13,31 @@ async def connect():
connection_string="grpc://localhost:2135?database=/local",
credentials=ydb.credentials.AnonymousCredentials(),
)
writer = ydb.TopicClientAsyncIO(db).topic_writer(
writer = ydb.TopicClientAsyncIO(db).writer(
"/local/topic",
producer_and_message_group_id="producer-id",
)
await writer.write(TopicWriterMessage("asd"))


def create_writer(db: ydb.Driver):
with ydb.TopicClient(db).topic_writer(
with ydb.TopicClient(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
) as writer:
writer.write(TopicWriterMessage("asd"))


def connect_and_wait(db: ydb.Driver):
with ydb.TopicClient(db).topic_writer(
with ydb.TopicClient(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
) as writer:
writer.wait()


def connect_without_context_manager(db: ydb.Driver):
writer = ydb.TopicClient(db).topic_writer(
writer = ydb.TopicClient(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
)
Expand Down Expand Up @@ -98,7 +98,7 @@ def send_messages_with_wait_ack(writer: ydb.TopicWriter):


def send_json_message(db: ydb.Driver):
with ydb.TopicClient(db).topic_writer(
with ydb.TopicClient(db).writer(
"/database/path/topic", serializer=json.dumps
) as writer:
writer.write({"a": 123})
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ async def topic_path(driver, topic_consumer, database) -> str:
@pytest.fixture()
@pytest.mark.asyncio()
async def topic_with_messages(driver, topic_path):
writer = driver.topic_client.topic_writer(
writer = driver.topic_client.writer(
topic_path, producer_and_message_group_id="fixture-producer-id"
)
await writer.write_with_ack(
Expand Down
5 changes: 2 additions & 3 deletions tests/topics/test_control_plane.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async def test_drop_topic(self, driver, topic_path):
await client.drop_topic(topic_path)

async def test_describe_topic(self, driver, topic_path: str, topic_consumer):
res = await driver.topic_client.describe(topic_path)
res = await driver.topic_client.describe_topic(topic_path)

assert res.self.name == os.path.basename(topic_path)

Expand Down Expand Up @@ -61,8 +61,7 @@ def test_drop_topic(self, driver_sync, topic_path):
client.drop_topic(topic_path)

def test_describe_topic(self, driver_sync, topic_path: str, topic_consumer):
res = driver_sync.topic_client.describe(topic_path)
res.partition_count_limit
res = driver_sync.topic_client.describe_topic(topic_path)

assert res.self.name == os.path.basename(topic_path)

Expand Down
4 changes: 2 additions & 2 deletions tests/topics/test_topic_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ class TestTopicReaderAsyncIO:
async def test_read_message(
self, driver, topic_path, topic_with_messages, topic_consumer
):
reader = driver.topic_client.topic_reader(topic_consumer, topic_path)
reader = driver.topic_client.reader(topic_consumer, topic_path)

assert await reader.receive_batch() is not None
await reader.close()
Expand All @@ -16,7 +16,7 @@ class TestTopicReaderSync:
def test_read_message(
self, driver_sync, topic_path, topic_with_messages, topic_consumer
):
reader = driver_sync.topic_client.topic_reader(topic_consumer, topic_path)
reader = driver_sync.topic_client.reader(topic_consumer, topic_path)

assert reader.receive_batch() is not None
reader.close()
20 changes: 10 additions & 10 deletions tests/topics/test_topic_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
@pytest.mark.asyncio
class TestTopicWriterAsyncIO:
async def test_send_message(self, driver: ydb.aio.Driver, topic_path):
writer = driver.topic_client.topic_writer(
writer = driver.topic_client.writer(
topic_path, producer_and_message_group_id="test"
)
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
await writer.close()

async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
async with driver.topic_client.topic_writer(
async with driver.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
auto_seqno=False,
Expand All @@ -22,7 +22,7 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
ydb.TopicWriterMessage(data="123".encode(), seqno=5)
)

async with driver.topic_client.topic_writer(
async with driver.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
get_last_seqno=True,
Expand All @@ -31,7 +31,7 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
assert init_info.last_seqno == 5

async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
async with driver.topic_client.topic_writer(
async with driver.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
auto_seqno=False,
Expand All @@ -43,7 +43,7 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
ydb.TopicWriterMessage(data=f"msg-{i}", seqno=last_seqno)
)

async with driver.topic_client.topic_writer(
async with driver.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
get_last_seqno=True,
Expand All @@ -54,21 +54,21 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):

class TestTopicWriterSync:
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
writer = driver_sync.topic_client.topic_writer(
writer = driver_sync.topic_client.writer(
topic_path, producer_and_message_group_id="test"
)
writer.write(ydb.TopicWriterMessage(data="123".encode()))
writer.close()

def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path):
with driver_sync.topic_client.topic_writer(
with driver_sync.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
auto_seqno=False,
) as writer:
writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5))

with driver_sync.topic_client.topic_writer(
with driver_sync.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
get_last_seqno=True,
Expand All @@ -77,7 +77,7 @@ def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path):
assert init_info.last_seqno == 5

def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
with driver_sync.topic_client.topic_writer(
with driver_sync.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
auto_seqno=False,
Expand All @@ -87,7 +87,7 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
last_seqno = i + 1
writer.write(ydb.TopicWriterMessage(data=f"msg-{i}", seqno=last_seqno))

with driver_sync.topic_client.topic_writer(
with driver_sync.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
get_last_seqno=True,
Expand Down
14 changes: 8 additions & 6 deletions ydb/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ async def create_topic(
_wrap_operation,
)

async def describe(
async def describe_topic(
self, path: str, include_stats: bool = False
) -> TopicDescription:
args = locals().copy()
Expand All @@ -115,7 +115,7 @@ async def drop_topic(self, path: str):
_wrap_operation,
)

def topic_reader(
def reader(
self,
consumer: str,
topic: str,
Expand All @@ -139,7 +139,7 @@ def topic_reader(
settings = TopicReaderSettings(**args)
return TopicReaderAsyncIO(self._driver, settings)

def topic_writer(
def writer(
self,
topic,
*,
Expand Down Expand Up @@ -215,7 +215,9 @@ def create_topic(
_wrap_operation,
)

def describe(self, path: str, include_stats: bool = False) -> TopicDescription:
def describe_topic(
self, path: str, include_stats: bool = False
) -> TopicDescription:
args = locals().copy()
del args["self"]
req = _ydb_topic_public_types.DescribeTopicRequestParams(**args)
Expand All @@ -236,7 +238,7 @@ def drop_topic(self, path: str):
_wrap_operation,
)

def topic_reader(
def reader(
self,
consumer: str,
topic: str,
Expand All @@ -260,7 +262,7 @@ def topic_reader(
settings = TopicReaderSettings(**args)
return TopicReader(self._driver, settings)

def topic_writer(
def writer(
self,
topic,
producer_and_message_group_id: str,
Expand Down