Skip to content

Commit

Permalink
feat: Add initial implementation of data extraction (#22)
Browse files Browse the repository at this point in the history
  • Loading branch information
kennedykori committed Jul 15, 2022
1 parent 637f922 commit 09c6664
Show file tree
Hide file tree
Showing 36 changed files with 1,315 additions and 299 deletions.
28 changes: 27 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,18 @@ env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

jobs:
build:
build:
runs-on: ubuntu-latest
services:
mysql:
image: mysql:5.6
env:
MYSQL_ALLOW_EMPTY_PASSWORD: yes
ports:
- 3306/tcp
# needed because the mysql container does not provide a healthcheck
options: --health-cmd="mysqladmin ping" --health-interval=10s --health-timeout=10s --health-retries=10

strategy:
matrix:
python-version: ["3.9", "3.10"]
Expand All @@ -33,6 +43,14 @@ jobs:
run: |
pytest
coveralls --service=github
env:
# use localhost for the host here because we are running the job on the VM.
# If we were running the job in a container, this would be mysql
MYSQL_TEST_DB_HOST: localhost
MYSQL_TEST_DB_PORT: ${{ job.services.mysql.ports[3306] }} # get randomly assigned published port
MYSQL_TEST_DB_NAME: mysql
MYSQL_TEST_DB_USERNAME: root
MYSQL_TEST_DB_PASSWORD: ""

- name: Coveralls Finished
uses: coverallsapp/github-action@master
Expand All @@ -42,3 +60,11 @@ jobs:

- name: Test with tox
run: tox -r
env:
# use localhost for the host here because we are running the job on the VM.
# If we were running the job in a container, this would be mysql
MYSQL_TEST_DB_HOST: localhost
MYSQL_TEST_DB_PORT: ${{ job.services.mysql.ports[3306] }} # get randomly assigned published port
MYSQL_TEST_DB_NAME: mysql
MYSQL_TEST_DB_USERNAME: root
MYSQL_TEST_DB_PASSWORD: ""
2 changes: 1 addition & 1 deletion app/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def _load_settings_initializers(
initializer_klass = import_string_as_klass(
_initializer_dotted_path, SettingInitializer
)
initializers.append(initializer_klass())
initializers.append(initializer_klass()) # type: ignore
except ImportError as exp:
raise ImproperlyConfiguredError(
message='"%s" does not seem to be a valid path.'
Expand Down
16 changes: 8 additions & 8 deletions app/__main__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from argparse import ArgumentParser
from typing import Any, Mapping, Optional
from typing import Any, Optional, Sequence

import app
from app.core import DataSourceType, Transport
from app.lib import Pipeline
from app.use_cases.main_pipeline import (
FetchMetadata,
ProcessExtracts,
RunExtraction,
UploadExtracts,
)
Expand Down Expand Up @@ -77,7 +76,7 @@ def argparse_factory(prog_name: str = __title__) -> ArgumentParser:

def main_pipeline_factory(
transport: Optional[Transport] = None,
) -> Pipeline[Mapping[str, DataSourceType], Any]:
) -> Pipeline[Sequence[DataSourceType], Any]:
"""A factory for the main application pipeline.
Returns a fully initialized pipeline ready for use. An optional
Expand All @@ -103,7 +102,6 @@ def main_pipeline_factory(
return Pipeline(
FetchMetadata(transport=_transport),
RunExtraction(),
ProcessExtracts(),
UploadExtracts(transport=_transport),
)

Expand All @@ -124,10 +122,12 @@ def main() -> None: # pragma: no cover
args = parser.parse_args()

app.setup(config_file_path=args.config)
main_pipeline: Pipeline[
Mapping[str, DataSourceType], Any
] = main_pipeline_factory()
main_pipeline.execute(app.registry.data_source_types)
transport_factory = app.registry.get_default_transport_factory_or_raise()
with transport_factory() as transport:
main_pipeline: Pipeline[
Sequence[DataSourceType], Any
] = main_pipeline_factory(transport=transport)
main_pipeline.execute(tuple(app.registry.data_source_types.values()))
print("Done ...")


Expand Down
9 changes: 8 additions & 1 deletion app/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@
DataSource,
DataSourceType,
ExtractMetadata,
IdentifiableDomainObject,
)
from .exceptions import (
IDRClientException,
TransportClosedError,
TransportError,
)
from .exceptions import IDRClientException, TransportError
from .mixins import InitFromMapping, ToMapping, ToTask
from .task import Task
from .transport import Transport, TransportOptions
Expand All @@ -15,11 +20,13 @@
"DataSourceType",
"ExtractMetadata",
"IDRClientException",
"IdentifiableDomainObject",
"InitFromMapping",
"Task",
"ToMapping",
"ToTask",
"Transport",
"TransportClosedError",
"TransportError",
"TransportOptions",
]
127 changes: 104 additions & 23 deletions app/core/domain.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@

from typing_inspect import is_optional_type

# from .mixins import ToTask
from .mixins import Disposable, InitFromMapping, ToTask

# =============================================================================
# TYPES
# =============================================================================


_ADO = TypeVar("_ADO", bound="AbstractDomainObject")
_DS = TypeVar("_DS", bound="DataSource", covariant=True)
_IN = TypeVar("_IN")
_RT = TypeVar("_RT")


# =============================================================================
Expand Down Expand Up @@ -113,7 +115,52 @@ def get_required_fields(cls) -> Sequence[str]:
return _get_required_fields_names(cls)


class DataSource(AbstractDomainObject, metaclass=ABCMeta):
class IdentifiableDomainObject(AbstractDomainObject, metaclass=ABCMeta):
"""Describes a domain object that has an id property."""

id: str


class ExtractMetadata(
Generic[_IN, _RT],
IdentifiableDomainObject,
InitFromMapping,
ToTask[_IN, _RT],
metaclass=ABCMeta,
):
"""
An interface that represents metadata describing the data to be extracted
from a :class:`DataSource`.
"""

name: str
description: Optional[str]
preferred_uploads_name: Optional[str]

def __str__(self) -> str:
return "%s::%s" % (self.id, self.name)

@classmethod
def of_mapping(cls, mapping: Mapping[str, Any]) -> "ExtractMetadata":
"""
Initialize and return an extract metadata instance from a mapping
describing its state.
:param mapping: A mapping that describes the state of an extract
metadata.
:return: The initialized extract metadata instance.
"""
return cls(**mapping)


class DataSource(
IdentifiableDomainObject,
Disposable,
InitFromMapping,
Generic[_RT],
metaclass=ABCMeta,
):
"""An interface representing an entity that contains data of interest."""

id: str
Expand All @@ -122,7 +169,7 @@ class DataSource(AbstractDomainObject, metaclass=ABCMeta):

@property
@abstractmethod
def extract_metadata(self) -> Mapping[str, "ExtractMetadata"]:
def extract_metadata(self) -> Mapping[str, ExtractMetadata[_RT, Any]]:
"""
Return a readonly mapping of the extract metadata instances that
operate on this data source.
Expand All @@ -135,7 +182,7 @@ def extract_metadata(self) -> Mapping[str, "ExtractMetadata"]:
@extract_metadata.setter
@abstractmethod
def extract_metadata(
self, extract_metadata: Mapping[str, "ExtractMetadata"]
self, extract_metadata: Mapping[str, ExtractMetadata]
) -> None:
"""Set the extract metadata instances that belong to this data source.
Expand All @@ -150,11 +197,41 @@ def extract_metadata(
"""
...

def get_extract_task_args(self) -> _RT:
"""
Return an argument to be passed to an :class:`ExtractMetadata` task.
This method is called before performing extraction for each of the
extract metadata instances belonging to this data source.
:return: An argument to be passed to an extract metadata task.
"""
# TODO: Add a better API for this method.
...

def __str__(self) -> str:
return "%s::%s" % (self.id, self.name)

@classmethod
def of_mapping(cls, mapping: Mapping[str, Any]) -> "DataSource":
"""
Initialize and return a data source from a mapping describing its
state.
:param mapping: A mapping that describes the state of a data source.
:return: The initialized data source instance.
"""
return cls(**mapping)


class UploadMetadata(
IdentifiableDomainObject, InitFromMapping, metaclass=ABCMeta
):
"""An interface that defines a data upload to an IDR Server."""


class DataSourceType(Generic[_DS], AbstractDomainObject, metaclass=ABCMeta):
class DataSourceType(AbstractDomainObject, metaclass=ABCMeta):
"""
An interface representing the different kinds of supported data sources.
"""
Expand All @@ -175,7 +252,7 @@ def code(self) -> str:

@property
@abstractmethod
def data_sources(self) -> Mapping[str, _DS]:
def data_sources(self) -> Mapping[str, DataSource]:
"""
Return a readonly mapping of all data sources that belong to this data
source type.
Expand All @@ -187,7 +264,7 @@ def data_sources(self) -> Mapping[str, _DS]:

@data_sources.setter
@abstractmethod
def data_sources(self, data_sources: Mapping[str, _DS]) -> None:
def data_sources(self, data_sources: Mapping[str, DataSource]) -> None:
"""Set the data sources that belong to this data source type.
.. note::
Expand All @@ -204,21 +281,25 @@ def data_sources(self, data_sources: Mapping[str, _DS]) -> None:
def __str__(self) -> str:
return "%s::%s" % (self.code, self.name)

@classmethod
@abstractmethod
def imp_data_source_klass(cls) -> Type[DataSource]:
"""
Return the :class:`DataSource` concrete implementation class for this
data source type.
class ExtractMetadata(
Generic[_DS],
AbstractDomainObject,
# ToTask,
metaclass=ABCMeta,
):
"""
Metadata describing the data to be extracted from a :class:`DataSource`.
"""
:return: The ``DataSource`` implementation for this data source type.
"""
...

id: str
name: str
description: Optional[str]
preferred_uploads_name: Optional[str]
@classmethod
@abstractmethod
def imp_extract_metadata_klass(cls) -> Type[ExtractMetadata]:
"""
Return the :class:`ExtractMetadata` concrete implementation class for
this data source type.
def __str__(self) -> str:
return "%s::%s" % (self.id, self.name)
:return: The ``ExtractMetadata`` implementation for this data source
type.
"""
...
15 changes: 15 additions & 0 deletions app/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,18 @@ class TransportError(IDRClientException):
An exception indicating that some error occurred during transport of data
from the client to an IDR Server or vice versa.
"""


class TransportClosedError(TransportError):
"""
An exception indicating that an erroneous usage of a closed transport was
made.
"""

def __init__(self, message: Optional[str] = "Transport closed.", *args):
"""Initialize a ``TransportClosedError`` with the given parameters.
:param message: An optional error message.
:param args: Additional args to forward to the base exception.
"""
super().__init__(message, *args)
Loading

0 comments on commit 09c6664

Please sign in to comment.