Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import pytest
import ydb
import time
import subprocess


@pytest.fixture(autouse=True, scope="session")
Expand Down Expand Up @@ -96,3 +97,21 @@ async def driver(endpoint, database, event_loop):
yield driver

await driver.stop(timeout=10)


@pytest.fixture()
def topic_path(endpoint) -> str:
subprocess.run(
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic drop /local/test-topic"""
% endpoint,
shell=True,
)
res = subprocess.run(
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic create /local/test-topic"""
% endpoint,
shell=True,
capture_output=True,
)
assert res.returncode == 0, res.stderr + res.stdout

return "/local/test-topic"
30 changes: 30 additions & 0 deletions tests/topics/test_topic_writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import pytest

import ydb.aio


@pytest.mark.asyncio
class TestTopicWriterAsyncIO:
async def test_send_message(self, driver: ydb.aio.Driver, topic_path):
writer = driver.topic_client.topic_writer(
topic_path, producer_and_message_group_id="test"
)
writer.write(ydb.TopicWriterMessage(data="123".encode()))

async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
async with driver.topic_client.topic_writer(
topic_path,
producer_and_message_group_id="test",
auto_seqno=False,
) as writer:
await writer.write_with_ack(
ydb.TopicWriterMessage(data="123".encode(), seqno=5)
)

async with driver.topic_client.topic_writer(
topic_path,
producer_and_message_group_id="test",
get_last_seqno=True,
) as writer2:
init_info = await writer2.wait_init()
assert init_info.last_seqno == 5
3 changes: 3 additions & 0 deletions ydb/_topic_writer/topic_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ class Skipped:
pass


PublicWriteResultTypes = Union[PublicWriteResult.Written, PublicWriteResult.Skipped]


class WriterSettings(PublicWriterSettings):
def __init__(self, settings: PublicWriterSettings):
self.__dict__ = settings.__dict__.copy()
Expand Down
12 changes: 6 additions & 6 deletions ydb/_topic_writer/topic_writer_asyncio.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import datetime
import threading
from collections import deque
from typing import Deque, AsyncIterator, Union, List, Optional, Callable

Expand All @@ -9,13 +8,13 @@
PublicWriterSettings,
WriterSettings,
Writer,
PublicWriteResult,
PublicMessage,
PublicWriterInitInfo,
InternalMessage,
TopicWriterStopped,
TopicWriterError,
messages_to_proto_requests,
PublicWriteResultTypes,
)
from .. import (
_apis,
Expand All @@ -35,21 +34,22 @@
class WriterAsyncIO:
_loop: asyncio.AbstractEventLoop
_reconnector: "WriterAsyncIOReconnector"
_lock: threading.Lock
_lock: asyncio.Lock
_closed: bool

@property
def last_seqno(self) -> int:
raise NotImplementedError()

def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings):
self._lock = asyncio.Lock()
self._loop = asyncio.get_running_loop()
self._closed = False
self._reconnector = WriterAsyncIOReconnector(
driver=driver, settings=WriterSettings(settings)
)

async def __aenter__(self):
async def __aenter__(self) -> "WriterAsyncIO":
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
Expand All @@ -62,7 +62,7 @@ def __del__(self):
self._loop.call_soon(self.close)

async def close(self):
with self._lock:
async with self._lock:
if self._closed:
return
self._closed = True
Expand All @@ -73,7 +73,7 @@ async def write_with_ack(
self,
messages: Union[Writer.MessageType, List[Writer.MessageType]],
*args: Optional[Writer.MessageType],
) -> Union[PublicWriteResult, List[PublicWriteResult]]:
) -> Union[PublicWriteResultTypes, List[PublicWriteResultTypes]]:
"""
IT IS SLOWLY WAY. IT IS BAD CHOISE IN MOST CASES.
It is recommended to use write with optionally flush or write_with_ack_futures and receive acks by wait futures.
Expand Down