diff --git a/tests/coordination/__init__.py b/tests/coordination/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/coordination/test_coordination_client.py b/tests/coordination/test_coordination_client.py new file mode 100644 index 00000000..98fb6768 --- /dev/null +++ b/tests/coordination/test_coordination_client.py @@ -0,0 +1,95 @@ +import pytest + +import ydb +from ydb import aio + +from ydb.coordination import ( + NodeConfig, + ConsistencyMode, + RateLimiterCountersMode, + CoordinationClient, +) + + +class TestCoordination: + def test_coordination_node_lifecycle(self, driver_sync: ydb.Driver): + client = CoordinationClient(driver_sync) + node_path = "/local/test_node_lifecycle" + + try: + client.delete_node(node_path) + except ydb.SchemeError: + pass + + with pytest.raises(ydb.SchemeError): + client.describe_node(node_path) + + initial_config = NodeConfig( + session_grace_period_millis=1000, + attach_consistency_mode=ConsistencyMode.STRICT, + read_consistency_mode=ConsistencyMode.STRICT, + rate_limiter_counters_mode=RateLimiterCountersMode.UNSET, + self_check_period_millis=0, + ) + client.create_node(node_path, initial_config) + + node_conf = client.describe_node(node_path) + assert node_conf == initial_config + + updated_config = NodeConfig( + session_grace_period_millis=12345, + attach_consistency_mode=ConsistencyMode.STRICT, + read_consistency_mode=ConsistencyMode.RELAXED, + rate_limiter_counters_mode=RateLimiterCountersMode.DETAILED, + self_check_period_millis=10, + ) + client.alter_node(node_path, updated_config) + + node_conf = client.describe_node(node_path) + assert node_conf == updated_config + + client.delete_node(node_path) + + with pytest.raises(ydb.SchemeError): + client.describe_node(node_path) + + async def test_coordination_node_lifecycle_async(self, aio_connection): + client = aio.CoordinationClient(aio_connection) + node_path = "/local/test_node_lifecycle" + + try: + await client.delete_node(node_path) + except ydb.SchemeError: + pass + + with pytest.raises(ydb.SchemeError): + await client.describe_node(node_path) + + initial_config = NodeConfig( + session_grace_period_millis=1000, + attach_consistency_mode=ConsistencyMode.STRICT, + read_consistency_mode=ConsistencyMode.STRICT, + rate_limiter_counters_mode=RateLimiterCountersMode.UNSET, + self_check_period_millis=0, + ) + await client.create_node(node_path, initial_config) + + node_conf = await client.describe_node(node_path) + assert node_conf == initial_config + + updated_config = NodeConfig( + session_grace_period_millis=12345, + attach_consistency_mode=ConsistencyMode.STRICT, + read_consistency_mode=ConsistencyMode.RELAXED, + rate_limiter_counters_mode=RateLimiterCountersMode.DETAILED, + self_check_period_millis=10, + ) + await client.alter_node(node_path, updated_config) + + node_conf = await client.describe_node(node_path) + assert node_conf == updated_config + + await client.delete_node(node_path) + + with pytest.raises(ydb.SchemeError): + await client.describe_node(node_path) diff --git a/ydb/_apis.py b/ydb/_apis.py index 827a71a4..97f64b90 100644 --- a/ydb/_apis.py +++ b/ydb/_apis.py @@ -11,6 +11,7 @@ ydb_operation_v1_pb2_grpc, ydb_topic_v1_pb2_grpc, ydb_query_v1_pb2_grpc, + ydb_coordination_v1_pb2_grpc, ) from ._grpc.v4.protos import ( @@ -22,6 +23,7 @@ ydb_operation_pb2, ydb_common_pb2, ydb_query_pb2, + ydb_coordination_pb2, ) else: @@ -33,6 +35,7 @@ ydb_operation_v1_pb2_grpc, ydb_topic_v1_pb2_grpc, ydb_query_v1_pb2_grpc, + ydb_coordination_v1_pb2_grpc, ) from ._grpc.common.protos import ( @@ -44,6 +47,7 @@ ydb_operation_pb2, ydb_common_pb2, ydb_query_pb2, + ydb_coordination_pb2, ) @@ -56,6 +60,7 @@ ydb_discovery = ydb_discovery_pb2 ydb_operation = ydb_operation_pb2 ydb_query = ydb_query_pb2 +ydb_coordination = ydb_coordination_pb2 class CmsService(object): @@ -134,3 +139,13 @@ class QueryService(object): ExecuteQuery = "ExecuteQuery" ExecuteScript = "ExecuteScript" FetchScriptResults = "FetchScriptResults" + + +class CoordinationService(object): + Stub = ydb_coordination_v1_pb2_grpc.CoordinationServiceStub + + Session = "Session" + CreateNode = "CreateNode" + AlterNode = "AlterNode" + DropNode = "DropNode" + DescribeNode = "DescribeNode" diff --git a/ydb/_grpc/grpcwrapper/ydb_coordination.py b/ydb/_grpc/grpcwrapper/ydb_coordination.py new file mode 100644 index 00000000..176e4e02 --- /dev/null +++ b/ydb/_grpc/grpcwrapper/ydb_coordination.py @@ -0,0 +1,57 @@ +import typing +from dataclasses import dataclass + +from .ydb_coordination_public_types import NodeConfig + +if typing.TYPE_CHECKING: + from ..v4.protos import ydb_coordination_pb2 +else: + from ..common.protos import ydb_coordination_pb2 + +from .common_utils import IToProto + + +@dataclass +class CreateNodeRequest(IToProto): + path: str + config: typing.Optional[NodeConfig] + + def to_proto(self) -> ydb_coordination_pb2.CreateNodeRequest: + cfg_proto = self.config.to_proto() if self.config else None + return ydb_coordination_pb2.CreateNodeRequest( + path=self.path, + config=cfg_proto, + ) + + +@dataclass +class AlterNodeRequest(IToProto): + path: str + config: NodeConfig + + def to_proto(self) -> ydb_coordination_pb2.AlterNodeRequest: + cfg_proto = self.config.to_proto() if self.config else None + return ydb_coordination_pb2.AlterNodeRequest( + path=self.path, + config=cfg_proto, + ) + + +@dataclass +class DescribeNodeRequest(IToProto): + path: str + + def to_proto(self) -> ydb_coordination_pb2.DescribeNodeRequest: + return ydb_coordination_pb2.DescribeNodeRequest( + path=self.path, + ) + + +@dataclass +class DropNodeRequest(IToProto): + path: str + + def to_proto(self) -> ydb_coordination_pb2.DropNodeRequest: + return ydb_coordination_pb2.DropNodeRequest( + path=self.path, + ) diff --git a/ydb/_grpc/grpcwrapper/ydb_coordination_public_types.py b/ydb/_grpc/grpcwrapper/ydb_coordination_public_types.py new file mode 100644 index 00000000..a3580974 --- /dev/null +++ b/ydb/_grpc/grpcwrapper/ydb_coordination_public_types.py @@ -0,0 +1,57 @@ +from dataclasses import dataclass +from enum import IntEnum +import typing + + +if typing.TYPE_CHECKING: + from ..v4.protos import ydb_coordination_pb2 +else: + from ..common.protos import ydb_coordination_pb2 + + +class ConsistencyMode(IntEnum): + UNSET = ydb_coordination_pb2.ConsistencyMode.CONSISTENCY_MODE_UNSET + STRICT = ydb_coordination_pb2.ConsistencyMode.CONSISTENCY_MODE_STRICT + RELAXED = ydb_coordination_pb2.ConsistencyMode.CONSISTENCY_MODE_RELAXED + + +class RateLimiterCountersMode(IntEnum): + UNSET = ydb_coordination_pb2.RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_UNSET + AGGREGATED = ydb_coordination_pb2.RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_AGGREGATED + DETAILED = ydb_coordination_pb2.RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_DETAILED + + +@dataclass +class NodeConfig: + attach_consistency_mode: ConsistencyMode + rate_limiter_counters_mode: RateLimiterCountersMode + read_consistency_mode: ConsistencyMode + self_check_period_millis: int + session_grace_period_millis: int + + @staticmethod + def from_proto(msg: ydb_coordination_pb2.Config) -> "NodeConfig": + return NodeConfig( + attach_consistency_mode=msg.attach_consistency_mode, + rate_limiter_counters_mode=msg.rate_limiter_counters_mode, + read_consistency_mode=msg.read_consistency_mode, + self_check_period_millis=msg.self_check_period_millis, + session_grace_period_millis=msg.session_grace_period_millis, + ) + + def to_proto(self) -> ydb_coordination_pb2.Config: + return ydb_coordination_pb2.Config( + attach_consistency_mode=self.attach_consistency_mode, + rate_limiter_counters_mode=self.rate_limiter_counters_mode, + read_consistency_mode=self.read_consistency_mode, + self_check_period_millis=self.self_check_period_millis, + session_grace_period_millis=self.session_grace_period_millis, + ) + + +class DescribeResult: + @staticmethod + def from_proto(msg: ydb_coordination_pb2.DescribeNodeResponse) -> "NodeConfig": + result = ydb_coordination_pb2.DescribeNodeResult() + msg.operation.result.Unpack(result) + return NodeConfig.from_proto(result.config) diff --git a/ydb/aio/__init__.py b/ydb/aio/__init__.py index 1c9c887c..d38d9e73 100644 --- a/ydb/aio/__init__.py +++ b/ydb/aio/__init__.py @@ -1,3 +1,4 @@ from .driver import Driver # noqa from .table import SessionPool, retry_operation # noqa from .query import QuerySessionPool, QuerySession, QueryTxContext # noqa +from .coordination_client import CoordinationClient # noqa diff --git a/ydb/aio/coordination_client.py b/ydb/aio/coordination_client.py new file mode 100644 index 00000000..9aab6785 --- /dev/null +++ b/ydb/aio/coordination_client.py @@ -0,0 +1,39 @@ +from typing import Optional + +from ydb._grpc.grpcwrapper.ydb_coordination import ( + CreateNodeRequest, + DescribeNodeRequest, + AlterNodeRequest, + DropNodeRequest, +) +from ydb._grpc.grpcwrapper.ydb_coordination_public_types import NodeConfig +from ydb.coordination.base_coordination_client import BaseCoordinationClient + + +class CoordinationClient(BaseCoordinationClient): + async def create_node(self, path: str, config: Optional[NodeConfig] = None, settings=None): + return await self._call_create( + CreateNodeRequest(path=path, config=config).to_proto(), + settings=settings, + ) + + async def describe_node(self, path: str, settings=None) -> NodeConfig: + return await self._call_describe( + DescribeNodeRequest(path=path).to_proto(), + settings=settings, + ) + + async def alter_node(self, path: str, new_config: NodeConfig, settings=None): + return await self._call_alter( + AlterNodeRequest(path=path, config=new_config).to_proto(), + settings=settings, + ) + + async def delete_node(self, path: str, settings=None): + return await self._call_delete( + DropNodeRequest(path=path).to_proto(), + settings=settings, + ) + + async def lock(self): + raise NotImplementedError("Will be implemented in future release") diff --git a/ydb/coordination/__init__.py b/ydb/coordination/__init__.py new file mode 100644 index 00000000..55834e89 --- /dev/null +++ b/ydb/coordination/__init__.py @@ -0,0 +1,10 @@ +from .coordination_client import CoordinationClient + +from ydb._grpc.grpcwrapper.ydb_coordination_public_types import ( + NodeConfig, + ConsistencyMode, + RateLimiterCountersMode, + DescribeResult, +) + +__all__ = ["CoordinationClient", "NodeConfig", "ConsistencyMode", "RateLimiterCountersMode", "DescribeResult"] diff --git a/ydb/coordination/base_coordination_client.py b/ydb/coordination/base_coordination_client.py new file mode 100644 index 00000000..d2b5bf8c --- /dev/null +++ b/ydb/coordination/base_coordination_client.py @@ -0,0 +1,65 @@ +from ydb import _apis, issues +from ydb._grpc.grpcwrapper.ydb_coordination_public_types import NodeConfig, DescribeResult +import logging + + +logger = logging.getLogger(__name__) + + +def wrapper_create_node(rpc_state, response_pb): + issues._process_response(response_pb.operation) + + +def wrapper_describe_node(rpc_state, response_pb) -> NodeConfig: + issues._process_response(response_pb.operation) + return DescribeResult.from_proto(response_pb) + + +def wrapper_delete_node(rpc_state, response_pb): + issues._process_response(response_pb.operation) + + +def wrapper_alter_node(rpc_state, response_pb): + issues._process_response(response_pb.operation) + + +class BaseCoordinationClient: + def __init__(self, driver): + logger.warning("Experimental API: interface may change in future releases.") + self._driver = driver + + def _call_create(self, request, settings=None): + return self._driver( + request, + _apis.CoordinationService.Stub, + _apis.CoordinationService.CreateNode, + wrap_result=wrapper_create_node, + settings=settings, + ) + + def _call_describe(self, request, settings=None): + return self._driver( + request, + _apis.CoordinationService.Stub, + _apis.CoordinationService.DescribeNode, + wrap_result=wrapper_describe_node, + settings=settings, + ) + + def _call_alter(self, request, settings=None): + return self._driver( + request, + _apis.CoordinationService.Stub, + _apis.CoordinationService.AlterNode, + wrap_result=wrapper_alter_node, + settings=settings, + ) + + def _call_delete(self, request, settings=None): + return self._driver( + request, + _apis.CoordinationService.Stub, + _apis.CoordinationService.DropNode, + wrap_result=wrapper_delete_node, + settings=settings, + ) diff --git a/ydb/coordination/coordination_client.py b/ydb/coordination/coordination_client.py new file mode 100644 index 00000000..24dd999d --- /dev/null +++ b/ydb/coordination/coordination_client.py @@ -0,0 +1,39 @@ +from typing import Optional + +from ydb._grpc.grpcwrapper.ydb_coordination import ( + CreateNodeRequest, + DescribeNodeRequest, + AlterNodeRequest, + DropNodeRequest, +) +from ydb._grpc.grpcwrapper.ydb_coordination_public_types import NodeConfig +from ydb.coordination.base_coordination_client import BaseCoordinationClient + + +class CoordinationClient(BaseCoordinationClient): + def create_node(self, path: str, config: Optional[NodeConfig], settings=None): + return self._call_create( + CreateNodeRequest(path=path, config=config).to_proto(), + settings=settings, + ) + + def describe_node(self, path: str, settings=None) -> NodeConfig: + return self._call_describe( + DescribeNodeRequest(path=path).to_proto(), + settings=settings, + ) + + def alter_node(self, path: str, new_config: NodeConfig, settings=None): + return self._call_alter( + AlterNodeRequest(path=path, config=new_config).to_proto(), + settings=settings, + ) + + def delete_node(self, path: str, settings=None): + return self._call_delete( + DropNodeRequest(path=path).to_proto(), + settings=settings, + ) + + def lock(self): + raise NotImplementedError("Will be implemented in future release")