Skip to content

Commit

Permalink
chore(use_cases): move upload completion to separate task
Browse files Browse the repository at this point in the history
Move marking uploads as complete from the PostUploadChunks` task to a new task, `MarkUploadsAsComplete`. Also rename the task `PrepareUploads` to `PrepareUploadChunks`.
  • Loading branch information
kennedykori committed Aug 25, 2022
1 parent 8273884 commit 1c3d99d
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 30 deletions.
12 changes: 9 additions & 3 deletions app/use_cases/main_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,12 @@
from .fetch_metadata import FetchDataSources, FetchExtractMetadata
from .run_extraction import GroupSiblingExtracts, RunDataSourceExtracts
from .types import RunExtractionResult
from .upload_extracts import PostUploadChunks, PostUploads, PrepareUploads
from .upload_extracts import (
MarkUploadsAsComplete,
PostUploadChunks,
PostUploads,
PrepareUploadChunks,
)


class FetchMetadata(
Expand All @@ -33,11 +38,12 @@ def __init__(self):


class UploadExtracts(Pipeline[Sequence[RunExtractionResult], Any]):
"""Upload the extracted metadata to the remote server."""
"""Upload the extracted metadata to their final destination."""

def __init__(self, transport: Transport):
super().__init__(
PostUploads(transport=transport),
PrepareUploads(),
PrepareUploadChunks(),
PostUploadChunks(transport=transport),
MarkUploadsAsComplete(transport=transport),
)
6 changes: 4 additions & 2 deletions app/use_cases/types.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from typing import Any, Tuple
from typing import Any, Sequence, Tuple

from app.core import ExtractMetadata
from app.core import ExtractMetadata, UploadChunk, UploadMetadata

RunExtractionResult = Tuple[ExtractMetadata, Any]

UploadExtractResult = Tuple[UploadMetadata, Sequence[UploadChunk]]
82 changes: 57 additions & 25 deletions app/use_cases/upload_extracts.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from itertools import chain
from logging import getLogger
from typing import Any, Iterable, Sequence, Tuple, Type

Expand All @@ -12,9 +11,9 @@
UploadChunk,
UploadMetadata,
)
from app.lib import ConcurrentExecutor
from app.lib import ConcurrentExecutor, Consumer

from .types import RunExtractionResult
from .types import RunExtractionResult, UploadExtractResult

# =============================================================================
# CONSTANTS
Expand All @@ -29,7 +28,7 @@

_PostedUpload = Tuple[UploadMetadata, Any]

_PreparedUpload = Tuple[UploadMetadata, Sequence[bytes]]
_PreparedChunks = Tuple[UploadMetadata, Sequence[bytes]]


# =============================================================================
Expand Down Expand Up @@ -87,6 +86,16 @@ def execute(self, an_input: Transport) -> UploadChunk:
return chunk


class DoMarkUploadAsComplete(Task[Transport, UploadMetadata]):
def __init__(self, upload: UploadMetadata):
self._upload: UploadMetadata = upload

def execute(self, an_input: Transport) -> UploadMetadata:
_LOGGER.info('Marking upload="%s" as complete.', str(self._upload))
an_input.mark_upload_as_complete(self._upload)
return self._upload


# =============================================================================
# MAIN TASKS
# =============================================================================
Expand Down Expand Up @@ -117,47 +126,54 @@ def _extraction_results_to_tasks(
return tuple(DoPostUpload(_extract) for _extract in extraction_results)


class PrepareUploads(Task[Sequence[_PostedUpload], Sequence[_PreparedUpload]]):
class PrepareUploadChunks(
Task[Sequence[_PostedUpload], Sequence[_PreparedChunks]]
):
def execute(
self, an_input: Sequence[_PostedUpload]
) -> Sequence[_PreparedUpload]:
_LOGGER.info("Preparing uploads.")
) -> Sequence[_PreparedChunks]:
_LOGGER.info("Preparing chunks.")
return tuple(
self._prepare_upload(_posted_upload) for _posted_upload in an_input
self._prepare_chunks_for_upload(_posted_upload)
for _posted_upload in an_input
)

@staticmethod
def _prepare_upload(posted_upload: _PostedUpload) -> _PreparedUpload:
_LOGGER.info('Preparing upload metadata="%s".', str(posted_upload[0]))
def _prepare_chunks_for_upload(
posted_upload: _PostedUpload,
) -> _PreparedChunks:
_LOGGER.info(
'Preparing chunks for upload metadata="%s".', str(posted_upload[0])
)
upload: UploadMetadata = posted_upload[0]
extract_data: Any = posted_upload[1]
chunks: Sequence[bytes] = upload.to_task().execute(extract_data)
return upload, chunks


class PostUploadChunks(Task[Sequence[_PreparedUpload], Sequence[UploadChunk]]):
class PostUploadChunks(
Task[Sequence[_PreparedChunks], Sequence[UploadExtractResult]]
):
def __init__(self, transport: Transport):
self._transport: Transport = transport

def execute(
self, an_input: Sequence[_PreparedUpload]
) -> Sequence[UploadChunk]:
self, an_input: Sequence[_PreparedChunks]
) -> Sequence[UploadExtractResult]:
_LOGGER.info("Posting upload chunks.")
return tuple(
chain.from_iterable(
self._post_upload_chunks(
upload=_prepared_upload[0],
chunks=_prepared_upload[1],
transport=self._transport,
)
for _prepared_upload in an_input
self._post_upload_chunks(
upload=_prepared_upload[0],
chunks=_prepared_upload[1],
transport=self._transport,
)
for _prepared_upload in an_input
)

@staticmethod
def _post_upload_chunks(
upload: UploadMetadata, chunks: Sequence[bytes], transport: Transport
) -> Sequence[UploadChunk]:
) -> UploadExtractResult:
executor: ConcurrentExecutor[Transport, Sequence[UploadChunk]]
executor = ConcurrentExecutor(
*(
Expand All @@ -169,7 +185,23 @@ def _post_upload_chunks(
initial_value=list(),
)
uploaded_chunks: Sequence[UploadChunk] = executor(transport) # noqa
# TODO: This should be it's own task, refactor this to reflect that.
_LOGGER.info('Marking upload metadata="%s" as complete.', str(upload))
transport.mark_upload_as_complete(upload_metadata=upload)
return uploaded_chunks
return upload, uploaded_chunks


class MarkUploadsAsComplete(Consumer[Sequence[UploadExtractResult]]):
def __init__(self, transport: Transport):
super().__init__(consume=self._mark_uploads_as_complete)
self._transport: Transport = transport

def _mark_uploads_as_complete(
self, posted_uploads: Sequence[UploadExtractResult]
) -> None:
_LOGGER.info("Marking completed upload as so.")
executor: ConcurrentExecutor[Transport, Any]
executor = ConcurrentExecutor(
*(
DoMarkUploadAsComplete(upload=_posted_upload[0])
for _posted_upload in posted_uploads
)
)
executor(an_input=self._transport) # noqa

0 comments on commit 1c3d99d

Please sign in to comment.