Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
* BROKEN CHANGES: change names of public method in topic client
* BROKEN CHANGES: rename parameter producer_and_message_group_id to producer_id
* producer_id is optional now

## 3.0.1b5 ##
* Remove six package from code and dependencies (remove support python2)
Expand Down
6 changes: 3 additions & 3 deletions examples/topic/writer_async_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,23 +10,23 @@
async def create_writer(db: ydb.aio.Driver):
async with ydb.TopicClientAsyncIO(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
producer_id="producer-id",
) as writer:
await writer.write(TopicWriterMessage("asd"))


async def connect_and_wait(db: ydb.aio.Driver):
async with ydb.TopicClientAsyncIO(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
producer_id="producer-id",
) as writer:
writer.wait_init()


async def connect_without_context_manager(db: ydb.aio.Driver):
writer = ydb.TopicClientAsyncIO(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
producer_id="producer-id",
)
try:
pass # some code
Expand Down
8 changes: 4 additions & 4 deletions examples/topic/writer_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,31 @@ async def connect():
)
writer = ydb.TopicClientAsyncIO(db).writer(
"/local/topic",
producer_and_message_group_id="producer-id",
producer_id="producer-id",
)
await writer.write(TopicWriterMessage("asd"))


def create_writer(db: ydb.Driver):
with ydb.TopicClient(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
producer_id="producer-id",
) as writer:
writer.write(TopicWriterMessage("asd"))


def connect_and_wait(db: ydb.Driver):
with ydb.TopicClient(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
producer_id="producer-id",
) as writer:
writer.wait()


def connect_without_context_manager(db: ydb.Driver):
writer = ydb.TopicClient(db).writer(
"/database/topic/path",
producer_and_message_group_id="producer-id",
producer_id="producer-id",
)
try:
pass # some code
Expand Down
19 changes: 16 additions & 3 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,24 @@ async def topic_path(driver, topic_consumer, database) -> str:
@pytest.fixture()
@pytest.mark.asyncio()
async def topic_with_messages(driver, topic_path):
writer = driver.topic_client.writer(
topic_path, producer_and_message_group_id="fixture-producer-id"
)
writer = driver.topic_client.writer(topic_path, producer_id="fixture-producer-id")
await writer.write_with_ack(
ydb.TopicWriterMessage(data="123".encode()),
ydb.TopicWriterMessage(data="456".encode()),
)
await writer.close()


@pytest.fixture()
@pytest.mark.asyncio()
async def topic_reader(driver, topic_consumer, topic_path) -> ydb.TopicReaderAsyncIO:
reader = driver.topic_client.reader(topic=topic_path, consumer=topic_consumer)
yield reader
await reader.close()


@pytest.fixture()
def topic_reader_sync(driver_sync, topic_consumer, topic_path) -> ydb.TopicReader:
reader = driver_sync.topic_client.reader(topic=topic_path, consumer=topic_consumer)
yield reader
reader.close()
57 changes: 39 additions & 18 deletions tests/topics/test_topic_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,14 @@
@pytest.mark.asyncio
class TestTopicWriterAsyncIO:
async def test_send_message(self, driver: ydb.aio.Driver, topic_path):
writer = driver.topic_client.writer(
topic_path, producer_and_message_group_id="test"
)
writer = driver.topic_client.writer(topic_path, producer_id="test")
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
await writer.close()

async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
async with driver.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
producer_id="test",
auto_seqno=False,
) as writer:
await writer.write_with_ack(
Expand All @@ -24,16 +22,28 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):

async with driver.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
get_last_seqno=True,
producer_id="test",
) as writer2:
init_info = await writer2.wait_init()
assert init_info.last_seqno == 5

async def test_random_producer_id(
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
):
async with driver.topic_client.writer(topic_path) as writer:
await writer.write(ydb.TopicWriterMessage(data="123".encode()))
async with driver.topic_client.writer(topic_path) as writer:
await writer.write(ydb.TopicWriterMessage(data="123".encode()))

batch1 = await topic_reader.receive_batch()
batch2 = await topic_reader.receive_batch()

assert batch1.messages[0].producer_id != batch2.messages[0].producer_id

async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):
async with driver.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
producer_id="test",
auto_seqno=False,
) as writer:
last_seqno = 0
Expand All @@ -45,41 +55,37 @@ async def test_auto_flush_on_close(self, driver: ydb.aio.Driver, topic_path):

async with driver.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
get_last_seqno=True,
producer_id="test",
) as writer:
init_info = await writer.wait_init()
assert init_info.last_seqno == last_seqno


class TestTopicWriterSync:
def test_send_message(self, driver_sync: ydb.Driver, topic_path):
writer = driver_sync.topic_client.writer(
topic_path, producer_and_message_group_id="test"
)
writer = driver_sync.topic_client.writer(topic_path, producer_id="test")
writer.write(ydb.TopicWriterMessage(data="123".encode()))
writer.close()

def test_wait_last_seqno(self, driver_sync: ydb.Driver, topic_path):
with driver_sync.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
producer_id="test",
auto_seqno=False,
) as writer:
writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5))

with driver_sync.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
get_last_seqno=True,
producer_id="test",
) as writer2:
init_info = writer2.wait_init()
assert init_info.last_seqno == 5

def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):
with driver_sync.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
producer_id="test",
auto_seqno=False,
) as writer:
last_seqno = 0
Expand All @@ -89,8 +95,23 @@ def test_auto_flush_on_close(self, driver_sync: ydb.Driver, topic_path):

with driver_sync.topic_client.writer(
topic_path,
producer_and_message_group_id="test",
get_last_seqno=True,
producer_id="test",
) as writer:
init_info = writer.wait_init()
assert init_info.last_seqno == last_seqno

def test_random_producer_id(
self,
driver_sync: ydb.aio.Driver,
topic_path,
topic_reader_sync: ydb.TopicReader,
):
with driver_sync.topic_client.writer(topic_path) as writer:
writer.write(ydb.TopicWriterMessage(data="123".encode()))
with driver_sync.topic_client.writer(topic_path) as writer:
writer.write(ydb.TopicWriterMessage(data="123".encode()))

batch1 = topic_reader_sync.receive_batch()
batch2 = topic_reader_sync.receive_batch()

assert batch1.messages[0].producer_id != batch2.messages[0].producer_id
41 changes: 25 additions & 16 deletions ydb/_topic_writer/topic_writer.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import datetime
import enum
import uuid
from dataclasses import dataclass
from enum import Enum
from typing import List, Union, TextIO, BinaryIO, Optional, Callable, Mapping, Any, Dict
from typing import List, Union, TextIO, BinaryIO, Optional, Any, Dict

import typing

Expand All @@ -16,21 +17,31 @@

@dataclass
class PublicWriterSettings:
"""
Settings for topic writer.

order of fields IS NOT stable, use keywords only
"""

topic: str
producer_and_message_group_id: str
producer_id: Optional[str] = None
session_metadata: Optional[Dict[str, str]] = None
encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
serializer: Union[Callable[[Any], bytes], None] = None
send_buffer_count: Optional[int] = 10000
send_buffer_bytes: Optional[int] = 100 * 1024 * 1024
partition_id: Optional[int] = None
codec: Optional[int] = None
codec_autoselect: bool = True
auto_seqno: bool = True
auto_created_at: bool = True
get_last_seqno: bool = False
retry_policy: Optional["RetryPolicy"] = None
update_token_interval: Union[int, float] = 3600
# get_last_seqno: bool = False
# encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None
# serializer: Union[Callable[[Any], bytes], None] = None
# send_buffer_count: Optional[int] = 10000
# send_buffer_bytes: Optional[int] = 100 * 1024 * 1024
# codec: Optional[int] = None
# codec_autoselect: bool = True
# retry_policy: Optional["RetryPolicy"] = None
# update_token_interval: Union[int, float] = 3600

def __post_init__(self):
if self.producer_id is None:
self.producer_id = uuid.uuid4().hex


@dataclass
Expand All @@ -55,18 +66,16 @@ def __init__(self, settings: PublicWriterSettings):
def create_init_request(self) -> StreamWriteMessage.InitRequest:
return StreamWriteMessage.InitRequest(
path=self.topic,
producer_id=self.producer_and_message_group_id,
producer_id=self.producer_id,
write_session_meta=self.session_metadata,
partitioning=self.get_partitioning(),
get_last_seq_no=self.get_last_seqno,
get_last_seq_no=True,
)

def get_partitioning(self) -> StreamWriteMessage.PartitioningType:
if self.partition_id is not None:
return StreamWriteMessage.PartitioningPartitionID(self.partition_id)
return StreamWriteMessage.PartitioningMessageGroupID(
self.producer_and_message_group_id
)
return StreamWriteMessage.PartitioningMessageGroupID(self.producer_id)


class SendMode(Enum):
Expand Down
4 changes: 2 additions & 2 deletions ydb/_topic_writer/topic_writer_asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def default_settings(self) -> WriterSettings:
return WriterSettings(
PublicWriterSettings(
topic="/local/topic",
producer_and_message_group_id="test-producer",
producer_id="test-producer",
auto_seqno=False,
auto_created_at=False,
)
Expand Down Expand Up @@ -487,7 +487,7 @@ async def close(self):
def default_settings(self) -> PublicWriterSettings:
return PublicWriterSettings(
topic="/local/topic",
producer_and_message_group_id="producer-id",
producer_id="producer-id",
)

@pytest.fixture(autouse=True)
Expand Down
23 changes: 4 additions & 19 deletions ydb/topic.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import datetime
from typing import List, Callable, Union, Mapping, Any, Optional, Dict
from typing import List, Union, Mapping, Optional, Dict

from . import aio, Credentials, _apis

Expand Down Expand Up @@ -143,19 +143,11 @@ def writer(
self,
topic,
*,
producer_and_message_group_id: str,
producer_id: Optional[str] = None, # default - random
session_metadata: Mapping[str, str] = None,
encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
serializer: Union[Callable[[Any], bytes], None] = None,
send_buffer_count: Union[int, None] = 10000,
send_buffer_bytes: Union[int, None] = 100 * 1024 * 1024,
partition_id: Union[int, None] = None,
codec: Union[int, None] = None,
codec_autoselect: bool = True,
auto_seqno: bool = True,
auto_created_at: bool = True,
get_last_seqno: bool = False,
retry_policy: Union["TopicWriterRetryPolicy", None] = None,
) -> TopicWriterAsyncIO:
args = locals()
del args["self"]
Expand Down Expand Up @@ -265,19 +257,12 @@ def reader(
def writer(
self,
topic,
producer_and_message_group_id: str,
*,
producer_id: Optional[str] = None, # default - random
session_metadata: Mapping[str, str] = None,
encoders: Union[Mapping[int, Callable[[bytes], bytes]], None] = None,
serializer: Union[Callable[[Any], bytes], None] = None,
send_buffer_count: Union[int, None] = 10000,
send_buffer_bytes: Union[int, None] = 100 * 1024 * 1024,
partition_id: Union[int, None] = None,
codec: Union[int, None] = None,
codec_autoselect: bool = True,
auto_seqno: bool = True,
auto_created_at: bool = True,
get_last_seqno: bool = False,
retry_policy: Union["TopicWriterRetryPolicy", None] = None,
) -> TopicWriter:
args = locals()
del args["self"]
Expand Down