Skip to content

Commit

Permalink
feat(usecases): wire up data upload functionality
Browse files Browse the repository at this point in the history
Connect the different modules/components on the main application pipeline to implement upload of extracted data to the server.
  • Loading branch information
kennedykori committed Aug 17, 2022
1 parent 6ba5789 commit b74ffdc
Show file tree
Hide file tree
Showing 4 changed files with 201 additions and 26 deletions.
14 changes: 4 additions & 10 deletions app/use_cases/fetch_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,12 +100,8 @@ def _data_source_types_to_tasks(
data_source_types: Iterable[DataSourceType],
) -> Sequence[DoFetchDataSourceTypeSources]:
return tuple(
(
DoFetchDataSourceTypeSources(
data_source_type=_data_source_type
)
for _data_source_type in data_source_types
)
DoFetchDataSourceTypeSources(data_source_type=_data_source_type)
for _data_source_type in data_source_types
)


Expand Down Expand Up @@ -139,8 +135,6 @@ def _data_sources_to_tasks(
data_sources: Iterable[DataSource],
) -> Sequence[DoFetchDataSourceExtracts]:
return tuple(
(
DoFetchDataSourceExtracts(data_source=_data_source)
for _data_source in data_sources
)
DoFetchDataSourceExtracts(data_source=_data_source)
for _data_source in data_sources
)
23 changes: 8 additions & 15 deletions app/use_cases/main_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from typing import Any, Sequence

from app.core import DataSourceType, ExtractMetadata, Task, Transport
from app.core import DataSourceType, ExtractMetadata, Transport
from app.lib import Pipeline

from .fetch_metadata import FetchDataSources, FetchExtractMetadata
from .run_extraction import GroupSiblingExtracts, RunDataSourceExtracts
from .types import RunExtractionResult
from .upload_extracts import PostUploadChunks, PostUploads, PrepareUploads


class FetchMetadata(
Expand All @@ -31,20 +32,12 @@ def __init__(self):
super().__init__(GroupSiblingExtracts(), RunDataSourceExtracts())


class UploadExtracts(Task[Sequence[RunExtractionResult], Any]):
class UploadExtracts(Pipeline[Sequence[RunExtractionResult], Any]):
"""Upload the extracted metadata to the remote server."""

def __init__(self, transport: Transport):
self._transport: Transport = transport

def execute(self, an_input: Sequence[RunExtractionResult]) -> Any:
# TODO: Add proper implementation.
for _extract in an_input:
print("==========================================================")
print(_extract[0].name)
print("==========================================================")
print("\n", _extract[1], "\n")
print("----------------------------------------------------------")
print("\n")

return an_input
super().__init__(
PostUploads(transport=transport),
PrepareUploads(),
PostUploadChunks(transport=transport),
)
19 changes: 18 additions & 1 deletion app/use_cases/run_extraction.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from itertools import chain, groupby
from logging import getLogger
from typing import Any, Sequence, Tuple

from app.core import DataSource, ExtractMetadata, Task
Expand All @@ -7,9 +8,15 @@
from .types import RunExtractionResult

# =============================================================================
# TYPES
# CONSTANTS
# =============================================================================

_LOGGER = getLogger(__name__)


# =============================================================================
# TYPES
# =============================================================================

_GroupedSiblingExtracts = Tuple[DataSource, Sequence[ExtractMetadata]]

Expand All @@ -31,6 +38,11 @@ def __init__(self, extract_metadata: ExtractMetadata):
def execute(self, an_input: DataSource) -> RunExtractionResult:
# The extract should only be run against its parent data source.
assert self._extract_metadata.data_source == an_input

_LOGGER.info(
'Running extraction for extract metadata="%s".',
str(self._extract_metadata),
)
task_args: Any = an_input.get_extract_task_args()
extract: Any = self._extract_metadata.to_task().execute(task_args)
return self._extract_metadata, extract
Expand All @@ -52,6 +64,7 @@ class GroupSiblingExtracts(
def execute(
self, an_input: Sequence[ExtractMetadata]
) -> Sequence[_GroupedSiblingExtracts]:
_LOGGER.debug("Grouping extracts.")
# Sort the given extracts by their parent data source's id.
extracts: Sequence[ExtractMetadata] = sorted(
an_input, key=lambda _e: _e.data_source.id
Expand All @@ -75,6 +88,7 @@ class RunDataSourceExtracts(
def execute(
self, an_input: Sequence[_GroupedSiblingExtracts]
) -> Sequence[RunExtractionResult]:
_LOGGER.debug("Running extraction for all data sources.")
return tuple(
chain.from_iterable(
self.run_data_source_extracts(
Expand All @@ -88,6 +102,9 @@ def execute(
def run_data_source_extracts(
data_source: DataSource, extracts: Sequence[ExtractMetadata]
) -> Sequence[RunExtractionResult]:
_LOGGER.info(
'Running extraction for data source="%s"', str(data_source)
)
with data_source:
executor: ConcurrentExecutor[
DataSource, Sequence[RunExtractionResult]
Expand Down
171 changes: 171 additions & 0 deletions app/use_cases/upload_extracts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
from itertools import chain
from logging import getLogger
from typing import Any, Iterable, Sequence, Tuple, Type

import app
from app.core import (
DataSource,
DataSourceType,
ExtractMetadata,
Task,
Transport,
UploadChunk,
UploadMetadata,
)
from app.lib import ConcurrentExecutor

from .types import RunExtractionResult

# =============================================================================
# CONSTANTS
# =============================================================================

_LOGGER = getLogger(__name__)


# =============================================================================
# TYPES
# =============================================================================

_PostedUpload = Tuple[UploadMetadata, Any]

_PreparedUpload = Tuple[UploadMetadata, Sequence[bytes]]


# =============================================================================
# HELPER TASKS
# =============================================================================


class DoPostUpload(Task[Transport, _PostedUpload]):
def __init__(self, extract: RunExtractionResult):
self._extract: RunExtractionResult = extract

def execute(self, an_input: Transport) -> _PostedUpload:
_LOGGER.info(
'Posting upload metadata for extract metadata="%s".',
str(self._extract[0]),
)
extract_meta: ExtractMetadata = self._extract[0]
parent_ds: DataSource = extract_meta.data_source
parent_dst: DataSourceType = parent_ds.data_source_type
upload_meta_klass: Type[
UploadMetadata
] = parent_dst.imp_upload_metadata_klass()
content_type: str = upload_meta_klass.get_content_type()

upload_meta: UploadMetadata = an_input.post_upload_metadata(
extract_metadata=extract_meta,
content_type=content_type,
org_unit_code=app.settings.ORG_UNIT_CODE,
org_unit_name=app.settings.ORG_UNIT_NAME,
extra_init_kwargs=extract_meta.get_upload_meta_extra_init_kwargs(),
)
return upload_meta, self._extract[1]


class DoPostChunk(Task[Transport, UploadChunk]):
def __init__(
self, upload: UploadMetadata, chunk_index: int, chunk_content: bytes
):
self._upload: UploadMetadata = upload
self._chunk_index: int = chunk_index
self._chunk_content: bytes = chunk_content

def execute(self, an_input: Transport) -> UploadChunk:
_LOGGER.info(
'Posting upload chunks for upload metadata="%s".',
str(self._upload),
)
extra_init_kwargs = self._upload.get_upload_chunk_extra_init_kwargs()
chunk: UploadChunk = an_input.post_upload_chunk(
upload_metadata=self._upload,
chunk_index=self._chunk_index,
chunk_content=self._chunk_content,
extra_init_kwargs=extra_init_kwargs,
)
return chunk


# =============================================================================
# MAIN TASKS
# =============================================================================


class PostUploads(
Task[Sequence[RunExtractionResult], Sequence[_PostedUpload]]
):
def __init__(self, transport: Transport):
self._transport: Transport = transport

def execute(
self, an_input: Sequence[RunExtractionResult]
) -> Sequence[_PostedUpload]:
_LOGGER.info("Posting uploads.")
executor: ConcurrentExecutor[Transport, Sequence[_PostedUpload]]
executor = ConcurrentExecutor(
*self._extraction_results_to_tasks(an_input), initial_value=list()
)
uploads: Sequence[_PostedUpload]
uploads = executor(self._transport) # noqa
return uploads

@staticmethod
def _extraction_results_to_tasks(
extraction_results: Iterable[RunExtractionResult],
) -> Sequence[DoPostUpload]:
return tuple(DoPostUpload(_extract) for _extract in extraction_results)


class PrepareUploads(Task[Sequence[_PostedUpload], Sequence[_PreparedUpload]]):
def execute(
self, an_input: Sequence[_PostedUpload]
) -> Sequence[_PreparedUpload]:
_LOGGER.info("Preparing uploads.")
return tuple(
self._prepare_upload(_posted_upload) for _posted_upload in an_input
)

@staticmethod
def _prepare_upload(posted_upload: _PostedUpload) -> _PreparedUpload:
_LOGGER.info('Preparing upload metadata="%s".', str(posted_upload[0]))
upload: UploadMetadata = posted_upload[0]
extract_data: Any = posted_upload[1]
chunks: Sequence[bytes] = upload.to_task().execute(extract_data)
return upload, chunks


class PostUploadChunks(Task[Sequence[_PreparedUpload], Sequence[UploadChunk]]):
def __init__(self, transport: Transport):
self._transport: Transport = transport

def execute(
self, an_input: Sequence[_PreparedUpload]
) -> Sequence[UploadChunk]:
_LOGGER.info("Posting upload chunks.")
return tuple(
chain.from_iterable(
self._post_upload_chunks(
upload=_prepared_upload[0],
chunks=_prepared_upload[1],
transport=self._transport,
)
for _prepared_upload in an_input
)
)

@staticmethod
def _post_upload_chunks(
upload: UploadMetadata, chunks: Sequence[bytes], transport: Transport
) -> Sequence[UploadChunk]:
executor: ConcurrentExecutor[Transport, Sequence[UploadChunk]]
executor = ConcurrentExecutor(
*(
DoPostChunk(
upload=upload, chunk_index=_index, chunk_content=_chunk
)
for _index, _chunk in enumerate(chunks)
),
initial_value=list(),
)
return executor(transport) # noqa

0 comments on commit b74ffdc

Please sign in to comment.