chore(use_cases): move the retry functionality to use cases (#61)
This builds on the work started in [#59](#59) and [#60](#60). After this refactor, all retry functionality is moved out of the ``lib`` and ``imp`` levels up to the application level, which should help mitigate the scattering of retry logic across the codebase.
kennedykori committed Oct 14, 2022
1 parent 8068ad1 commit 819f2e3
Showing 4 changed files with 34 additions and 9 deletions.
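
To illustrate the pattern applied in the changes below, here is a minimal, self-contained sketch: a predicate built with ``if_exception_type_factory`` tells the ``Retry`` decorator which exceptions are retryable, and the decorator wraps a use case's ``execute`` method so retries happen at the application level. The simplified ``Retry`` body, its ``max_attempts`` parameter, and the stub classes are illustrative stand-ins assumed for this sketch, not the project's actual implementations in ``app.lib`` and ``app.core``.

```python
from typing import Any, Callable, Type


def if_exception_type_factory(
    *exp_types: Type[BaseException],
) -> Callable[[BaseException], bool]:
    """Return a predicate that is True only for the given exception types."""
    return lambda exp: isinstance(exp, exp_types)


class Retry:
    """Illustrative stand-in for the ``Retry`` decorator in ``app.lib``."""

    def __init__(
        self,
        predicate: Callable[[BaseException], bool],
        max_attempts: int = 3,  # assumed knob for this sketch, not the real API
    ):
        self._predicate = predicate
        self._max_attempts = max_attempts

    def __call__(self, wrapped: Callable[..., Any]) -> Callable[..., Any]:
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            for attempt in range(1, self._max_attempts + 1):
                try:
                    return wrapped(*args, **kwargs)
                except BaseException as exp:
                    # Retry only errors the predicate marks as retryable, and
                    # re-raise once the attempts are exhausted.
                    if not self._predicate(exp) or attempt == self._max_attempts:
                        raise

        return wrapper


class TransportError(Exception):
    """Stand-in for ``app.core.TransportError``."""


class DoFetchDataSources:
    """Use case task whose ``execute`` is retried at the application level."""

    @Retry(predicate=if_exception_type_factory(TransportError))
    def execute(self, transport: Any) -> Any:
        # A TransportError raised here is caught and retried by the decorator;
        # any other exception type propagates immediately.
        return transport.fetch_data_sources()
```

Keeping the retry concern in the use cases means the transport and extraction implementations stay free of retry logic, which is the scattering the commit message refers to.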
app/lib/retry/retry.py (2 changes: 1 addition & 1 deletion)
@@ -223,7 +223,7 @@ def do_retry(self, wrapped: Callable[[], Any]) -> Any:  # noqa
         last_exp=last_exp,
     )
     _LOGGER.debug(
-        "Retrying due to {}, sleeping for {:.1f}s ...".format(
+        'Retrying due to "{}", sleeping for {:.1f}s ...'.format(
             last_exp, sleep
         )
     )
app/use_cases/fetch_metadata.py (10 changes: 9 additions & 1 deletion)
@@ -9,8 +9,14 @@
     ExtractMetadata,
     Task,
     Transport,
+    TransportError,
 )
-from app.lib import ConcurrentExecutor, completed_successfully
+from app.lib import (
+    ConcurrentExecutor,
+    Retry,
+    completed_successfully,
+    if_exception_type_factory,
+)

# =============================================================================
# CONSTANTS
@@ -31,6 +37,7 @@ class DoFetchDataSources(Task[Transport, Sequence[DataSource]]):
     def __init__(self, data_source_type: DataSourceType):
         self._data_source_type: DataSourceType = data_source_type

+    @Retry(predicate=if_exception_type_factory(TransportError))
     def execute(self, an_input: Transport) -> Sequence[DataSource]:
         _LOGGER.info(
             'Fetching data sources for data source type="%s".',
@@ -52,6 +59,7 @@ class DoFetchExtractMetadata(Task[Transport, Sequence[ExtractMetadata]]):
     def __init__(self, data_source: DataSource):
         self._data_source: DataSource = data_source

+    @Retry(predicate=if_exception_type_factory(TransportError))
     def execute(self, an_input: Transport) -> Sequence[ExtractMetadata]:
         _LOGGER.info(
             'Fetching extract metadata for data source="%s".',
app/use_cases/run_extraction.py (19 changes: 13 additions & 6 deletions)
@@ -3,8 +3,18 @@
 from logging import getLogger
 from typing import Any, Sequence, Tuple

-from app.core import DataSource, ExtractMetadata, Task
-from app.lib import ConcurrentExecutor, completed_successfully
+from app.core import (
+    DataSource,
+    ExtractionOperationError,
+    ExtractMetadata,
+    Task,
+)
+from app.lib import (
+    ConcurrentExecutor,
+    Retry,
+    completed_successfully,
+    if_exception_type_factory,
+)

from .types import RunExtractionResult

@@ -36,6 +46,7 @@ class DoExtract(Task[DataSource, RunExtractionResult]):
     def __init__(self, extract_metadata: ExtractMetadata):
         self._extract_metadata: ExtractMetadata = extract_metadata

+    @Retry(predicate=if_exception_type_factory(ExtractionOperationError))
     def execute(self, an_input: DataSource) -> RunExtractionResult:
         # The extract should only be run against its parent data source.
         assert self._extract_metadata.data_source == an_input
@@ -125,7 +136,3 @@ def run_data_source_extracts(
             ),
         )
     )
-
-
-# TODO: Add more tasks here to post process extraction results. E.g, handle
-#  errors if they occurred, etc.
app/use_cases/upload_extracts.py (12 changes: 11 additions & 1 deletion)
@@ -9,10 +9,17 @@
     ExtractMetadata,
     Task,
     Transport,
+    TransportError,
     UploadChunk,
     UploadMetadata,
 )
-from app.lib import ConcurrentExecutor, Consumer, completed_successfully
+from app.lib import (
+    ConcurrentExecutor,
+    Consumer,
+    Retry,
+    completed_successfully,
+    if_exception_type_factory,
+)

from .types import RunExtractionResult, UploadExtractResult

@@ -41,6 +48,7 @@ class DoPostUpload(Task[Transport, _PostedUpload]):
     def __init__(self, extract: RunExtractionResult):
         self._extract: RunExtractionResult = extract

+    @Retry(predicate=if_exception_type_factory(TransportError))
     def execute(self, an_input: Transport) -> _PostedUpload:
         _LOGGER.info(
             'Posting upload metadata for extract metadata="%s".',
@@ -72,6 +80,7 @@ def __init__(
         self._chunk_index: int = chunk_index
         self._chunk_content: bytes = chunk_content

+    @Retry(predicate=if_exception_type_factory(TransportError))
     def execute(self, an_input: Transport) -> UploadChunk:
         _LOGGER.info(
             'Posting upload chunks for upload metadata="%s".',
@@ -91,6 +100,7 @@ class DoMarkUploadAsComplete(Task[Transport, UploadMetadata]):
     def __init__(self, upload: UploadMetadata):
         self._upload: UploadMetadata = upload

+    @Retry(predicate=if_exception_type_factory(TransportError))
     def execute(self, an_input: Transport) -> UploadMetadata:
         _LOGGER.info('Marking upload="%s" as complete.', str(self._upload))
         an_input.mark_upload_as_complete(self._upload)
