Skip to content

Commit

Permalink
feat(lib): implement the retry functionality
Browse files Browse the repository at this point in the history
Add a proper implementation of the retry functionality making the client more resilient when faced with IO Errors.
  • Loading branch information
kennedykori committed Oct 12, 2022
1 parent 9383fb2 commit b258a6e
Show file tree
Hide file tree
Showing 17 changed files with 836 additions and 24 deletions.
14 changes: 12 additions & 2 deletions .config.template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,22 @@ LOGGING:
level: "INFO"
handlers": [ "console" ]

# The retry module global configuration. The values here will be used for each
# retry instance when not specified.
# This setting is not required and defaults to the values given below.
RETRY:
default_deadline: 60.0
default_initial_delay: 1.0
default_maximum_delay: 60.0
default_multiplicative_factor: 2.0
enable_retries: true

# The different data source types supported by the app. This determines the
# kinds of data that the app can extract and operations that can be performed
# on that data including what is uploaded to the server.
# This setting is not required and defaults to an empty list.
SUPPORTED_DATA_SOURCE_TYPES:
- "app.imp.sql_data.SQLDataSourceType"
#SUPPORTED_DATA_SOURCE_TYPES:
# - "app.imp.sql_data.SQLDataSourceType"



Expand Down
4 changes: 4 additions & 0 deletions app/imp/sql_data/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import pyarrow.parquet as pq
from sqlalchemy import create_engine
from sqlalchemy.engine import Connection, Engine
from sqlalchemy.exc import SQLAlchemyError

import app
from app.core import (
Expand All @@ -22,7 +23,9 @@
ChunkDataFrame,
ImproperlyConfiguredError,
Pipeline,
Retry,
SimpleSQLSelect,
if_exception_type_factory,
)

from .exceptions import SQLDataError, SQLDataSourceDisposedError
Expand Down Expand Up @@ -145,6 +148,7 @@ def dispose(self) -> None:
self._engine.dispose()
self._engine = None

@Retry(predicate=if_exception_type_factory(SQLAlchemyError))
def get_extract_task_args(self) -> Connection:
self._ensure_not_disposed()
assert self._engine is not None
Expand Down
10 changes: 9 additions & 1 deletion app/lib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
from .app_registry import AppRegistry, DefaultTransportFactory
from .checkers import ensure_not_none, ensure_not_none_nor_empty
from .checkers import (
ensure_greater_than,
ensure_not_none,
ensure_not_none_nor_empty,
)
from .config import * # noqa: F401,F403
from .config import __all__ as _all_config
from .module_loading import import_string, import_string_as_klass
from .retry import * # noqa: F401,F403
from .retry import __all__ as _all_retry
from .tasks import * # noqa: F401,F403
from .tasks import __all__ as _all_tasks
from .transports import * # noqa: F401,F403
Expand All @@ -11,11 +17,13 @@
__all__ = [
"AppRegistry",
"DefaultTransportFactory",
"ensure_greater_than",
"ensure_not_none",
"ensure_not_none_nor_empty",
"import_string",
"import_string_as_klass",
]
__all__ += _all_config # type: ignore
__all__ += _all_retry # type: ignore
__all__ += _all_tasks # type: ignore
__all__ += _all_transports # type: ignore
37 changes: 36 additions & 1 deletion app/lib/checkers.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,53 @@
from typing import Optional, Sized, TypeVar
from abc import abstractmethod
from typing import Optional, Protocol, Sized, SupportsFloat, TypeVar

# =============================================================================
# TYPES
# =============================================================================


_C = TypeVar("_C", bound="Comparable")

_S = TypeVar("_S", bound=Sized)

_T = TypeVar("_T")


class Comparable(Protocol):
"""This denotes a type that supports comparisons."""

@abstractmethod
def __lt__(self: _C, other: _C) -> bool:
...


# =============================================================================
# CHECKERS
# =============================================================================


def ensure_greater_than(
value: SupportsFloat,
base_value: SupportsFloat,
message: str = '"value" must greater than "base_value"',
) -> SupportsFloat:
"""Check that the given value is greater that the given base value.
:param value: The value to check for greatness.
:param base_value: The value to compare for greatness against.
:param message: An optional error message to be shown when value is
``None``.
:return: ``value`` if it is greater than ``base_value``.
:raise ValueError: If the given ``value`` is less than or equal to the
given ``base_value``.
"""
if value < base_value: # type: ignore
raise ValueError(message)
return value


def ensure_not_none(
value: Optional[_T], message: str = '"value" cannot be None.'
) -> _T:
Expand Down
11 changes: 11 additions & 0 deletions app/lib/retry/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from .exceptions import RetryError
from .retry import Retry, if_exception_type_factory, if_idr_exception
from .setting_initializers import RetryInitializer

__all__ = [
"Retry",
"RetryError",
"RetryInitializer",
"if_idr_exception",
"if_exception_type_factory",
]
21 changes: 21 additions & 0 deletions app/lib/retry/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from typing import Final

from .types import RetryConfig

RETRY_CONFIG_KEY: Final[str] = "RETRY"

DEFAULT_DEADLINE: Final[float] = 60.0 * 5 # In seconds

DEFAULT_INITIAL_DELAY: Final[float] = 1.0 # In seconds

DEFAULT_MAXIMUM_DELAY: Final[float] = 60.0 # In seconds

DEFAULT_MULTIPLICATIVE_FACTOR: Final[float] = 2.0

DEFAULT_RETRY_CONFIG: Final[RetryConfig] = {
"default_deadline": DEFAULT_DEADLINE,
"default_initial_delay": DEFAULT_INITIAL_DELAY,
"default_maximum_delay": DEFAULT_MAXIMUM_DELAY,
"default_multiplicative_factor": DEFAULT_MULTIPLICATIVE_FACTOR,
"enable_retries": True,
}
9 changes: 9 additions & 0 deletions app/lib/retry/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from app.core import IDRClientException


class RetryError(IDRClientException):
"""An exception used to indicate that a retry failed."""

def __init__(self, exp: BaseException, message="Deadline exceeded."):
self._exp: BaseException = exp
super().__init__(message, self._exp)
Loading

0 comments on commit b258a6e

Please sign in to comment.