Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Fixed to handle deadline on topic stream in async driver

## 3.8.0 ##
* Added clients for draft.BaseDynamicConfig service

Expand Down
2 changes: 1 addition & 1 deletion ydb/_grpc/grpcwrapper/common_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ async def receive(self) -> Any:
# todo handle grpc exceptions and convert it to internal exceptions
try:
grpc_message = await self.from_server_grpc.__anext__()
except grpc.RpcError as e:
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
raise connection._rpc_error_handler(self._connection_state, e)

issues._process_response(grpc_message)
Expand Down
9 changes: 7 additions & 2 deletions ydb/connection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
import logging
import copy
import typing
from concurrent import futures
import uuid
import threading
Expand Down Expand Up @@ -61,15 +62,19 @@ def _log_request(rpc_state, request):
logger.debug("%s: request = { %s }", rpc_state, _message_to_string(request))


def _rpc_error_handler(rpc_state, rpc_error, on_disconnected=None):
def _rpc_error_handler(
rpc_state,
rpc_error: typing.Union[grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call],
on_disconnected: typing.Callable[[], None] = None,
):
"""
RPC call error handler, that translates gRPC error into YDB issue
:param rpc_state: A state of rpc
:param rpc_error: an underlying rpc error to handle
:param on_disconnected: a handler to call on disconnected connection
"""
logger.info("%s: received error, %s", rpc_state, rpc_error)
if isinstance(rpc_error, grpc.Call):
if isinstance(rpc_error, (grpc.RpcError, grpc.aio.AioRpcError, grpc.Call, grpc.aio.Call)):
if rpc_error.code() == grpc.StatusCode.UNAUTHENTICATED:
return issues.Unauthenticated(rpc_error.details())
elif rpc_error.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
Expand Down