diff --git a/ydb/_errors.py b/ydb/_errors.py new file mode 100644 index 00000000..e8628586 --- /dev/null +++ b/ydb/_errors.py @@ -0,0 +1,62 @@ +from dataclasses import dataclass +from typing import Optional + +from ydb import issues + +_errors_retriable_fast_backoff_types = [ + issues.Unavailable, +] +_errors_retriable_slow_backoff_types = [ + issues.Aborted, + issues.BadSession, + issues.Overloaded, + issues.SessionPoolEmpty, + issues.ConnectionError, +] +_errors_retriable_slow_backoff_idempotent_types = [ + issues.Undetermined, +] + + +def check_retriable_error(err, retry_settings, attempt): + if isinstance(err, issues.NotFound): + if retry_settings.retry_not_found: + return ErrorRetryInfo( + True, retry_settings.fast_backoff.calc_timeout(attempt) + ) + else: + return ErrorRetryInfo(False, None) + + if isinstance(err, issues.InternalError): + if retry_settings.retry_internal_error: + return ErrorRetryInfo( + True, retry_settings.slow_backoff.calc_timeout(attempt) + ) + else: + return ErrorRetryInfo(False, None) + + for t in _errors_retriable_fast_backoff_types: + if isinstance(err, t): + return ErrorRetryInfo( + True, retry_settings.fast_backoff.calc_timeout(attempt) + ) + + for t in _errors_retriable_slow_backoff_types: + if isinstance(err, t): + return ErrorRetryInfo( + True, retry_settings.slow_backoff.calc_timeout(attempt) + ) + + if retry_settings.idempotent: + for t in _errors_retriable_slow_backoff_idempotent_types: + return ErrorRetryInfo( + True, retry_settings.slow_backoff.calc_timeout(attempt) + ) + + return ErrorRetryInfo(False, None) + + +@dataclass +class ErrorRetryInfo: + is_retriable: bool + sleep_timeout_seconds: Optional[float] diff --git a/ydb/table.py b/ydb/table.py index ebc4ece3..6f0a4868 100644 --- a/ydb/table.py +++ b/ydb/table.py @@ -21,6 +21,7 @@ _tx_ctx_impl, tracing, ) +from ._errors import check_retriable_error try: from . import interceptor @@ -916,12 +917,28 @@ class YdbRetryOperationSleepOpt(object): def __init__(self, timeout): self.timeout = timeout + def __eq__(self, other): + return type(self) == type(other) and self.timeout == other.timeout + + def __repr__(self): + return "YdbRetryOperationSleepOpt(%s)" % self.timeout + class YdbRetryOperationFinalResult(object): def __init__(self, result): self.result = result self.exc = None + def __eq__(self, other): + return ( + type(self) == type(other) + and self.result == other.result + and self.exc == other.exc + ) + + def __repr__(self): + return "YdbRetryOperationFinalResult(%s, exc=%s)" % (self.result, self.exc) + def set_exception(self, exc): self.exc = exc @@ -938,56 +955,28 @@ def retry_operation_impl(callee, retry_settings=None, *args, **kwargs): if result.exc is not None: raise result.exc - except ( - issues.Aborted, - issues.BadSession, - issues.NotFound, - issues.InternalError, - ) as e: - status = e - retry_settings.on_ydb_error_callback(e) - - if isinstance(e, issues.NotFound) and not retry_settings.retry_not_found: - raise e - - if ( - isinstance(e, issues.InternalError) - and not retry_settings.retry_internal_error - ): - raise e - - except ( - issues.Overloaded, - issues.SessionPoolEmpty, - issues.ConnectionError, - ) as e: - status = e - retry_settings.on_ydb_error_callback(e) - yield YdbRetryOperationSleepOpt( - retry_settings.slow_backoff.calc_timeout(attempt) - ) - - except issues.Unavailable as e: + except issues.Error as e: status = e retry_settings.on_ydb_error_callback(e) - yield YdbRetryOperationSleepOpt( - retry_settings.fast_backoff.calc_timeout(attempt) - ) - except issues.Undetermined as e: - status = e - retry_settings.on_ydb_error_callback(e) - if not retry_settings.idempotent: - # operation is not idempotent, so we cannot retry. + retriable_info = check_retriable_error(e, retry_settings, attempt) + if not retriable_info.is_retriable: raise - yield YdbRetryOperationSleepOpt( - retry_settings.fast_backoff.calc_timeout(attempt) - ) + skip_yield_error_types = [ + issues.Aborted, + issues.BadSession, + issues.NotFound, + issues.InternalError, + ] - except issues.Error as e: - retry_settings.on_ydb_error_callback(e) - raise + yield_sleep = True + for t in skip_yield_error_types: + if isinstance(e, t): + yield_sleep = False + + if yield_sleep: + yield YdbRetryOperationSleepOpt(retriable_info.sleep_timeout_seconds) except Exception as e: # you should provide your own handler you want diff --git a/ydb/table_test.py b/ydb/table_test.py new file mode 100644 index 00000000..361719be --- /dev/null +++ b/ydb/table_test.py @@ -0,0 +1,135 @@ +from unittest import mock +from ydb import ( + retry_operation_impl, + YdbRetryOperationFinalResult, + issues, + YdbRetryOperationSleepOpt, + RetrySettings, +) + + +def test_retry_operation_impl(monkeypatch): + monkeypatch.setattr("random.random", lambda: 0.5) + monkeypatch.setattr( + issues.Error, + "__eq__", + lambda self, other: type(self) == type(other) and self.message == other.message, + ) + + retry_once_settings = RetrySettings( + max_retries=1, + on_ydb_error_callback=mock.Mock(), + ) + retry_once_settings.unknown_error_handler = mock.Mock() + + def get_results(callee): + res_generator = retry_operation_impl(callee, retry_settings=retry_once_settings) + results = [] + exc = None + try: + for res in res_generator: + results.append(res) + if isinstance(res, YdbRetryOperationFinalResult): + break + except Exception as e: + exc = e + + return results, exc + + class TestException(Exception): + def __init__(self, message): + super(TestException, self).__init__(message) + self.message = message + + def __eq__(self, other): + return type(self) == type(other) and self.message == other.message + + def check_unretriable_error(err_type, call_ydb_handler): + retry_once_settings.on_ydb_error_callback.reset_mock() + retry_once_settings.unknown_error_handler.reset_mock() + + results = get_results( + mock.Mock(side_effect=[err_type("test1"), err_type("test2")]) + ) + yields = results[0] + exc = results[1] + + assert yields == [] + assert exc == err_type("test1") + + if call_ydb_handler: + assert retry_once_settings.on_ydb_error_callback.call_count == 1 + retry_once_settings.on_ydb_error_callback.assert_called_with( + err_type("test1") + ) + + assert retry_once_settings.unknown_error_handler.call_count == 0 + else: + assert retry_once_settings.on_ydb_error_callback.call_count == 0 + + assert retry_once_settings.unknown_error_handler.call_count == 1 + retry_once_settings.unknown_error_handler.assert_called_with( + err_type("test1") + ) + + def check_retriable_error(err_type, backoff): + retry_once_settings.on_ydb_error_callback.reset_mock() + + results = get_results( + mock.Mock(side_effect=[err_type("test1"), err_type("test2")]) + ) + yields = results[0] + exc = results[1] + + if backoff: + assert [ + YdbRetryOperationSleepOpt(backoff.calc_timeout(0)), + YdbRetryOperationSleepOpt(backoff.calc_timeout(1)), + ] == yields + else: + assert [] == yields + + assert exc == err_type("test2") + + assert retry_once_settings.on_ydb_error_callback.call_count == 2 + retry_once_settings.on_ydb_error_callback.assert_any_call(err_type("test1")) + retry_once_settings.on_ydb_error_callback.assert_called_with(err_type("test2")) + + assert retry_once_settings.unknown_error_handler.call_count == 0 + + # check ok + assert get_results(lambda: True) == ([YdbRetryOperationFinalResult(True)], None) + + # check retry error and return result + assert get_results(mock.Mock(side_effect=[issues.Overloaded("test"), True])) == ( + [ + YdbRetryOperationSleepOpt(retry_once_settings.slow_backoff.calc_timeout(0)), + YdbRetryOperationFinalResult(True), + ], + None, + ) + + # check errors + check_retriable_error(issues.Aborted, None) + check_retriable_error(issues.BadSession, None) + + check_retriable_error(issues.NotFound, None) + with mock.patch.object(retry_once_settings, "retry_not_found", False): + check_unretriable_error(issues.NotFound, True) + + check_retriable_error(issues.InternalError, None) + with mock.patch.object(retry_once_settings, "retry_internal_error", False): + check_unretriable_error(issues.InternalError, True) + + check_retriable_error(issues.Overloaded, retry_once_settings.slow_backoff) + check_retriable_error(issues.SessionPoolEmpty, retry_once_settings.slow_backoff) + check_retriable_error(issues.ConnectionError, retry_once_settings.slow_backoff) + + check_retriable_error(issues.Unavailable, retry_once_settings.fast_backoff) + + check_unretriable_error(issues.Undetermined, True) + with mock.patch.object(retry_once_settings, "idempotent", True): + check_retriable_error(issues.Unavailable, retry_once_settings.fast_backoff) + + check_unretriable_error(issues.Error, True) + check_unretriable_error(TestException, False)