Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added tests/coordination/__init__.py
Empty file.
95 changes: 95 additions & 0 deletions tests/coordination/test_coordination_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import pytest

import ydb
from ydb import aio

from ydb.coordination import (
NodeConfig,
ConsistencyMode,
RateLimiterCountersMode,
CoordinationClient,
)


class TestCoordination:
def test_coordination_node_lifecycle(self, driver_sync: ydb.Driver):
client = CoordinationClient(driver_sync)
node_path = "/local/test_node_lifecycle"

try:
client.delete_node(node_path)
except ydb.SchemeError:
pass

with pytest.raises(ydb.SchemeError):
client.describe_node(node_path)

initial_config = NodeConfig(
session_grace_period_millis=1000,
attach_consistency_mode=ConsistencyMode.STRICT,
read_consistency_mode=ConsistencyMode.STRICT,
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
self_check_period_millis=0,
)
client.create_node(node_path, initial_config)

node_conf = client.describe_node(node_path)
assert node_conf == initial_config

updated_config = NodeConfig(
session_grace_period_millis=12345,
attach_consistency_mode=ConsistencyMode.STRICT,
read_consistency_mode=ConsistencyMode.RELAXED,
rate_limiter_counters_mode=RateLimiterCountersMode.DETAILED,
self_check_period_millis=10,
)
client.alter_node(node_path, updated_config)

node_conf = client.describe_node(node_path)
assert node_conf == updated_config

client.delete_node(node_path)

with pytest.raises(ydb.SchemeError):
client.describe_node(node_path)

async def test_coordination_node_lifecycle_async(self, aio_connection):
client = aio.CoordinationClient(aio_connection)
node_path = "/local/test_node_lifecycle"

try:
await client.delete_node(node_path)
except ydb.SchemeError:
pass

with pytest.raises(ydb.SchemeError):
await client.describe_node(node_path)

initial_config = NodeConfig(
session_grace_period_millis=1000,
attach_consistency_mode=ConsistencyMode.STRICT,
read_consistency_mode=ConsistencyMode.STRICT,
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
self_check_period_millis=0,
)
await client.create_node(node_path, initial_config)

node_conf = await client.describe_node(node_path)
assert node_conf == initial_config

updated_config = NodeConfig(
session_grace_period_millis=12345,
attach_consistency_mode=ConsistencyMode.STRICT,
read_consistency_mode=ConsistencyMode.RELAXED,
rate_limiter_counters_mode=RateLimiterCountersMode.DETAILED,
self_check_period_millis=10,
)
await client.alter_node(node_path, updated_config)

node_conf = await client.describe_node(node_path)
assert node_conf == updated_config

await client.delete_node(node_path)

with pytest.raises(ydb.SchemeError):
await client.describe_node(node_path)
15 changes: 15 additions & 0 deletions ydb/_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ydb_operation_v1_pb2_grpc,
ydb_topic_v1_pb2_grpc,
ydb_query_v1_pb2_grpc,
ydb_coordination_v1_pb2_grpc,
)

from ._grpc.v4.protos import (
Expand All @@ -22,6 +23,7 @@
ydb_operation_pb2,
ydb_common_pb2,
ydb_query_pb2,
ydb_coordination_pb2,
)

else:
Expand All @@ -33,6 +35,7 @@
ydb_operation_v1_pb2_grpc,
ydb_topic_v1_pb2_grpc,
ydb_query_v1_pb2_grpc,
ydb_coordination_v1_pb2_grpc,
)

from ._grpc.common.protos import (
Expand All @@ -44,6 +47,7 @@
ydb_operation_pb2,
ydb_common_pb2,
ydb_query_pb2,
ydb_coordination_pb2,
)


Expand All @@ -56,6 +60,7 @@
ydb_discovery = ydb_discovery_pb2
ydb_operation = ydb_operation_pb2
ydb_query = ydb_query_pb2
ydb_coordination = ydb_coordination_pb2


class CmsService(object):
Expand Down Expand Up @@ -134,3 +139,13 @@ class QueryService(object):
ExecuteQuery = "ExecuteQuery"
ExecuteScript = "ExecuteScript"
FetchScriptResults = "FetchScriptResults"


class CoordinationService(object):
Stub = ydb_coordination_v1_pb2_grpc.CoordinationServiceStub

Session = "Session"
CreateNode = "CreateNode"
AlterNode = "AlterNode"
DropNode = "DropNode"
DescribeNode = "DescribeNode"
57 changes: 57 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_coordination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import typing
from dataclasses import dataclass

from .ydb_coordination_public_types import NodeConfig

if typing.TYPE_CHECKING:
from ..v4.protos import ydb_coordination_pb2
else:
from ..common.protos import ydb_coordination_pb2

from .common_utils import IToProto


@dataclass
class CreateNodeRequest(IToProto):
path: str
config: typing.Optional[NodeConfig]

def to_proto(self) -> ydb_coordination_pb2.CreateNodeRequest:
cfg_proto = self.config.to_proto() if self.config else None
return ydb_coordination_pb2.CreateNodeRequest(
path=self.path,
config=cfg_proto,
)


@dataclass
class AlterNodeRequest(IToProto):
path: str
config: NodeConfig

def to_proto(self) -> ydb_coordination_pb2.AlterNodeRequest:
cfg_proto = self.config.to_proto() if self.config else None
return ydb_coordination_pb2.AlterNodeRequest(
path=self.path,
config=cfg_proto,
)


@dataclass
class DescribeNodeRequest(IToProto):
path: str

def to_proto(self) -> ydb_coordination_pb2.DescribeNodeRequest:
return ydb_coordination_pb2.DescribeNodeRequest(
path=self.path,
)


@dataclass
class DropNodeRequest(IToProto):
path: str

def to_proto(self) -> ydb_coordination_pb2.DropNodeRequest:
return ydb_coordination_pb2.DropNodeRequest(
path=self.path,
)
57 changes: 57 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_coordination_public_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from dataclasses import dataclass
from enum import IntEnum
import typing


if typing.TYPE_CHECKING:
from ..v4.protos import ydb_coordination_pb2
else:
from ..common.protos import ydb_coordination_pb2


class ConsistencyMode(IntEnum):
UNSET = ydb_coordination_pb2.ConsistencyMode.CONSISTENCY_MODE_UNSET
STRICT = ydb_coordination_pb2.ConsistencyMode.CONSISTENCY_MODE_STRICT
RELAXED = ydb_coordination_pb2.ConsistencyMode.CONSISTENCY_MODE_RELAXED


class RateLimiterCountersMode(IntEnum):
UNSET = ydb_coordination_pb2.RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_UNSET
AGGREGATED = ydb_coordination_pb2.RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_AGGREGATED
DETAILED = ydb_coordination_pb2.RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_DETAILED


@dataclass
class NodeConfig:
attach_consistency_mode: ConsistencyMode
rate_limiter_counters_mode: RateLimiterCountersMode
read_consistency_mode: ConsistencyMode
self_check_period_millis: int
session_grace_period_millis: int

@staticmethod
def from_proto(msg: ydb_coordination_pb2.Config) -> "NodeConfig":
return NodeConfig(
attach_consistency_mode=msg.attach_consistency_mode,
rate_limiter_counters_mode=msg.rate_limiter_counters_mode,
read_consistency_mode=msg.read_consistency_mode,
self_check_period_millis=msg.self_check_period_millis,
session_grace_period_millis=msg.session_grace_period_millis,
)

def to_proto(self) -> ydb_coordination_pb2.Config:
return ydb_coordination_pb2.Config(
attach_consistency_mode=self.attach_consistency_mode,
rate_limiter_counters_mode=self.rate_limiter_counters_mode,
read_consistency_mode=self.read_consistency_mode,
self_check_period_millis=self.self_check_period_millis,
session_grace_period_millis=self.session_grace_period_millis,
)


class DescribeResult:
@staticmethod
def from_proto(msg: ydb_coordination_pb2.DescribeNodeResponse) -> "NodeConfig":
result = ydb_coordination_pb2.DescribeNodeResult()
msg.operation.result.Unpack(result)
return NodeConfig.from_proto(result.config)
1 change: 1 addition & 0 deletions ydb/aio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from .driver import Driver # noqa
from .table import SessionPool, retry_operation # noqa
from .query import QuerySessionPool, QuerySession, QueryTxContext # noqa
from .coordination_client import CoordinationClient # noqa
39 changes: 39 additions & 0 deletions ydb/aio/coordination_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from typing import Optional

from ydb._grpc.grpcwrapper.ydb_coordination import (
CreateNodeRequest,
DescribeNodeRequest,
AlterNodeRequest,
DropNodeRequest,
)
from ydb._grpc.grpcwrapper.ydb_coordination_public_types import NodeConfig
from ydb.coordination.base_coordination_client import BaseCoordinationClient


class CoordinationClient(BaseCoordinationClient):
async def create_node(self, path: str, config: Optional[NodeConfig] = None, settings=None):
return await self._call_create(
CreateNodeRequest(path=path, config=config).to_proto(),
settings=settings,
)

async def describe_node(self, path: str, settings=None) -> NodeConfig:
return await self._call_describe(
DescribeNodeRequest(path=path).to_proto(),
settings=settings,
)

async def alter_node(self, path: str, new_config: NodeConfig, settings=None):
return await self._call_alter(
AlterNodeRequest(path=path, config=new_config).to_proto(),
settings=settings,
)

async def delete_node(self, path: str, settings=None):
return await self._call_delete(
DropNodeRequest(path=path).to_proto(),
settings=settings,
)

async def lock(self):
raise NotImplementedError("Will be implemented in future release")
10 changes: 10 additions & 0 deletions ydb/coordination/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from .coordination_client import CoordinationClient

from ydb._grpc.grpcwrapper.ydb_coordination_public_types import (
NodeConfig,
ConsistencyMode,
RateLimiterCountersMode,
DescribeResult,
)

__all__ = ["CoordinationClient", "NodeConfig", "ConsistencyMode", "RateLimiterCountersMode", "DescribeResult"]
65 changes: 65 additions & 0 deletions ydb/coordination/base_coordination_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from ydb import _apis, issues
from ydb._grpc.grpcwrapper.ydb_coordination_public_types import NodeConfig, DescribeResult
import logging


logger = logging.getLogger(__name__)


def wrapper_create_node(rpc_state, response_pb):
issues._process_response(response_pb.operation)


def wrapper_describe_node(rpc_state, response_pb) -> NodeConfig:
issues._process_response(response_pb.operation)
return DescribeResult.from_proto(response_pb)


def wrapper_delete_node(rpc_state, response_pb):
issues._process_response(response_pb.operation)


def wrapper_alter_node(rpc_state, response_pb):
issues._process_response(response_pb.operation)


class BaseCoordinationClient:
def __init__(self, driver):
logger.warning("Experimental API: interface may change in future releases.")
self._driver = driver

def _call_create(self, request, settings=None):
return self._driver(
request,
_apis.CoordinationService.Stub,
_apis.CoordinationService.CreateNode,
wrap_result=wrapper_create_node,
settings=settings,
)

def _call_describe(self, request, settings=None):
return self._driver(
request,
_apis.CoordinationService.Stub,
_apis.CoordinationService.DescribeNode,
wrap_result=wrapper_describe_node,
settings=settings,
)

def _call_alter(self, request, settings=None):
return self._driver(
request,
_apis.CoordinationService.Stub,
_apis.CoordinationService.AlterNode,
wrap_result=wrapper_alter_node,
settings=settings,
)

def _call_delete(self, request, settings=None):
return self._driver(
request,
_apis.CoordinationService.Stub,
_apis.CoordinationService.DropNode,
wrap_result=wrapper_delete_node,
settings=settings,
)
Loading
Loading