-
Notifications
You must be signed in to change notification settings - Fork 298
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
changed line endings from \n\r to \n in entire project
- Loading branch information
Showing
13 changed files
with
3,055 additions
and
3,055 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,132 +1,132 @@ | ||
""" | ||
This module implements the core class hierarchy for implementing EO tasks. An EO task is any class the inherits | ||
from the abstract EOTask class. Each EO task has to implement the execute method; invoking __call__ on a EO task | ||
instance invokes the execute method. EO tasks are meant primarily to operate on EO patches (i.e. instances of EOPatch). | ||
EO task classes are generally lightweight (i.e. not too complicated), short, and do one thing well. For example, an | ||
EO task might take as input an EOPatch containing cloud mask and return as a result the cloud coverage for that mask. | ||
""" | ||
|
||
import sys | ||
import logging | ||
import datetime | ||
import inspect | ||
from collections import OrderedDict | ||
from abc import ABC, abstractmethod | ||
|
||
import attr | ||
|
||
from .utilities import FeatureParser | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class EOTask(ABC): | ||
|
||
"""Base class for EOTask.""" | ||
def __new__(cls, *args, **kwargs): | ||
"""Stores initialization parameters and the order to the instance attribute `init_args`.""" | ||
self = super().__new__(cls) | ||
|
||
init_args = OrderedDict() | ||
for arg, value in zip(inspect.getfullargspec(self.__init__).args[1: len(args) + 1], args): | ||
init_args[arg] = repr(value) | ||
for arg in inspect.getfullargspec(self.__init__).args[len(args) + 1:]: | ||
if arg in kwargs: | ||
init_args[arg] = repr(kwargs[arg]) | ||
|
||
self.private_task_config = _PrivateTaskConfig(init_args=init_args) | ||
|
||
return self | ||
|
||
def __mul__(self, other): | ||
"""Creates a composite task of this and passed task.""" | ||
return CompositeTask(other, self) | ||
|
||
def __call__(self, *eopatches, monitor=False, **kwargs): | ||
"""Executes the task.""" | ||
# if monitor: | ||
# return self.execute_and_monitor(*eopatches, **kwargs) | ||
|
||
return self._execute_handling(*eopatches, **kwargs) | ||
|
||
def execute_and_monitor(self, *eopatches, **kwargs): | ||
""" In the current version nothing additional happens in this method | ||
""" | ||
return self._execute_handling(*eopatches, **kwargs) | ||
|
||
def _execute_handling(self, *eopatches, **kwargs): | ||
""" Handles measuring execution time and error propagation | ||
""" | ||
self.private_task_config.start_time = datetime.datetime.now() | ||
|
||
caught_exception = None | ||
try: | ||
return_value = self.execute(*eopatches, **kwargs) | ||
except BaseException as exception: | ||
caught_exception = exception, sys.exc_info()[2] | ||
|
||
if caught_exception is not None: # Exception is not raised in except statement to prevent duplicated traceback | ||
exception, traceback = caught_exception | ||
raise type(exception)('During execution of task {}: {}'.format(self.__class__.__name__, | ||
exception)).with_traceback(traceback) | ||
|
||
self.private_task_config.end_time = datetime.datetime.now() | ||
return return_value | ||
|
||
@abstractmethod | ||
def execute(self, *eopatches, **kwargs): | ||
""" Implement execute function | ||
""" | ||
raise NotImplementedError | ||
|
||
@staticmethod | ||
def _parse_features(features, new_names=False, rename_function=None, default_feature_type=None, | ||
allowed_feature_types=None): | ||
""" See eolearn.core.utilities.FeatureParser class. | ||
""" | ||
return FeatureParser(features, new_names=new_names, rename_function=rename_function, | ||
default_feature_type=default_feature_type, allowed_feature_types=allowed_feature_types) | ||
|
||
|
||
@attr.s(cmp=False) | ||
class _PrivateTaskConfig: | ||
""" A container for general EOTask parameters required during EOWorkflow and EOExecution | ||
:param init_args: A dictionary of parameters and values used for EOTask initialization | ||
:type init_args: OrderedDict | ||
:param uuid: An unique hexadecimal identifier string a task gets in EOWorkflow | ||
:type uuid: str or None | ||
:param start_time: Time when task execution started | ||
:type start_time: datetime.datetime or None | ||
:param end_time: Time when task execution ended | ||
:type end_time: datetime.datetime or None | ||
""" | ||
init_args = attr.ib() | ||
uuid = attr.ib(default=None) | ||
start_time = attr.ib(default=None) | ||
end_time = attr.ib(default=None) | ||
|
||
def __add__(self, other): | ||
return _PrivateTaskConfig(init_args=OrderedDict(list(self.init_args.items()) + list(other.init_args.items()))) | ||
|
||
|
||
class CompositeTask(EOTask): | ||
"""Creates a task that is composite of two tasks. | ||
Note: Instead of directly using this task it might be more convenient to use `'*'` operation between tasks. | ||
Example: `composite_task = task1 * task2` | ||
:param eotask1: Task which will be executed first | ||
:type eotask1: EOTask | ||
:param eotask2: Task which will be executed on results of first task | ||
:type eotask2: EOTask | ||
""" | ||
def __init__(self, eotask1, eotask2): | ||
self.eotask1 = eotask1 | ||
self.eotask2 = eotask2 | ||
|
||
self.private_task_config = eotask1.private_task_config + eotask2.private_task_config | ||
|
||
def execute(self, *eopatches, **kwargs): | ||
return self.eotask2.execute(self.eotask1.execute(*eopatches, **kwargs)) | ||
""" | ||
This module implements the core class hierarchy for implementing EO tasks. An EO task is any class the inherits | ||
from the abstract EOTask class. Each EO task has to implement the execute method; invoking __call__ on a EO task | ||
instance invokes the execute method. EO tasks are meant primarily to operate on EO patches (i.e. instances of EOPatch). | ||
EO task classes are generally lightweight (i.e. not too complicated), short, and do one thing well. For example, an | ||
EO task might take as input an EOPatch containing cloud mask and return as a result the cloud coverage for that mask. | ||
""" | ||
|
||
import sys | ||
import logging | ||
import datetime | ||
import inspect | ||
from collections import OrderedDict | ||
from abc import ABC, abstractmethod | ||
|
||
import attr | ||
|
||
from .utilities import FeatureParser | ||
|
||
LOGGER = logging.getLogger(__name__) | ||
|
||
|
||
class EOTask(ABC): | ||
|
||
"""Base class for EOTask.""" | ||
def __new__(cls, *args, **kwargs): | ||
"""Stores initialization parameters and the order to the instance attribute `init_args`.""" | ||
self = super().__new__(cls) | ||
|
||
init_args = OrderedDict() | ||
for arg, value in zip(inspect.getfullargspec(self.__init__).args[1: len(args) + 1], args): | ||
init_args[arg] = repr(value) | ||
for arg in inspect.getfullargspec(self.__init__).args[len(args) + 1:]: | ||
if arg in kwargs: | ||
init_args[arg] = repr(kwargs[arg]) | ||
|
||
self.private_task_config = _PrivateTaskConfig(init_args=init_args) | ||
|
||
return self | ||
|
||
def __mul__(self, other): | ||
"""Creates a composite task of this and passed task.""" | ||
return CompositeTask(other, self) | ||
|
||
def __call__(self, *eopatches, monitor=False, **kwargs): | ||
"""Executes the task.""" | ||
# if monitor: | ||
# return self.execute_and_monitor(*eopatches, **kwargs) | ||
|
||
return self._execute_handling(*eopatches, **kwargs) | ||
|
||
def execute_and_monitor(self, *eopatches, **kwargs): | ||
""" In the current version nothing additional happens in this method | ||
""" | ||
return self._execute_handling(*eopatches, **kwargs) | ||
|
||
def _execute_handling(self, *eopatches, **kwargs): | ||
""" Handles measuring execution time and error propagation | ||
""" | ||
self.private_task_config.start_time = datetime.datetime.now() | ||
|
||
caught_exception = None | ||
try: | ||
return_value = self.execute(*eopatches, **kwargs) | ||
except BaseException as exception: | ||
caught_exception = exception, sys.exc_info()[2] | ||
|
||
if caught_exception is not None: # Exception is not raised in except statement to prevent duplicated traceback | ||
exception, traceback = caught_exception | ||
raise type(exception)('During execution of task {}: {}'.format(self.__class__.__name__, | ||
exception)).with_traceback(traceback) | ||
|
||
self.private_task_config.end_time = datetime.datetime.now() | ||
return return_value | ||
|
||
@abstractmethod | ||
def execute(self, *eopatches, **kwargs): | ||
""" Implement execute function | ||
""" | ||
raise NotImplementedError | ||
|
||
@staticmethod | ||
def _parse_features(features, new_names=False, rename_function=None, default_feature_type=None, | ||
allowed_feature_types=None): | ||
""" See eolearn.core.utilities.FeatureParser class. | ||
""" | ||
return FeatureParser(features, new_names=new_names, rename_function=rename_function, | ||
default_feature_type=default_feature_type, allowed_feature_types=allowed_feature_types) | ||
|
||
|
||
@attr.s(cmp=False) | ||
class _PrivateTaskConfig: | ||
""" A container for general EOTask parameters required during EOWorkflow and EOExecution | ||
:param init_args: A dictionary of parameters and values used for EOTask initialization | ||
:type init_args: OrderedDict | ||
:param uuid: An unique hexadecimal identifier string a task gets in EOWorkflow | ||
:type uuid: str or None | ||
:param start_time: Time when task execution started | ||
:type start_time: datetime.datetime or None | ||
:param end_time: Time when task execution ended | ||
:type end_time: datetime.datetime or None | ||
""" | ||
init_args = attr.ib() | ||
uuid = attr.ib(default=None) | ||
start_time = attr.ib(default=None) | ||
end_time = attr.ib(default=None) | ||
|
||
def __add__(self, other): | ||
return _PrivateTaskConfig(init_args=OrderedDict(list(self.init_args.items()) + list(other.init_args.items()))) | ||
|
||
|
||
class CompositeTask(EOTask): | ||
"""Creates a task that is composite of two tasks. | ||
Note: Instead of directly using this task it might be more convenient to use `'*'` operation between tasks. | ||
Example: `composite_task = task1 * task2` | ||
:param eotask1: Task which will be executed first | ||
:type eotask1: EOTask | ||
:param eotask2: Task which will be executed on results of first task | ||
:type eotask2: EOTask | ||
""" | ||
def __init__(self, eotask1, eotask2): | ||
self.eotask1 = eotask1 | ||
self.eotask2 = eotask2 | ||
|
||
self.private_task_config = eotask1.private_task_config + eotask2.private_task_config | ||
|
||
def execute(self, *eopatches, **kwargs): | ||
return self.eotask2.execute(self.eotask1.execute(*eopatches, **kwargs)) |
Oops, something went wrong.