diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index b520712..ec33c1d 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,3 +1,3 @@
 [bumpversion]
-current_version = 0.5.7
+current_version = 0.6.0
 files = flowmancer/version.py pyproject.toml
diff --git a/README.md b/README.md
index 0e77722..37758b9 100644
--- a/README.md
+++ b/README.md
@@ -166,6 +166,19 @@ config:
     - internal_flowmancer_package
 ```
 
+### Changing Default File Logger Directory
+The Job Definition accepts an optional `loggers` section; when it is omitted, a `FileLogger` with default settings is used.
+To use the default `FileLogger` with a different configuration, provide the `loggers` block explicitly:
+```yaml
+loggers:
+  my-file-logger:
+    logger: FileLogger
+    parameters:
+      # NOTE: this path is relative to the `.py` file where `Flowmancer().start()` is invoked.
+      base_log_dir: ./my_custom_log_dir  # ./logs is the default, if omitted.
+      retention_days: 3  # 10 is the default, if omitted.
+```
+
 ### Complex Parameters
 While this is mostly used for `Task` implementations, the details outlined here apply for any built-in and custom `Extension` and `Logger` implementations.
 
@@ -228,7 +241,103 @@ In addition to the required `run` method, an implementation of `Task` may option
 Just as with `run`, all lifecycle methods have access to `self.shared_dict` and any parameters.
 
 ### Custom Loggers
-Coming soon.
+Custom implementations of `Logger` may be provided to Flowmancer either to replace the default `FileLogger` or to write to additional destinations alongside it.
+
+A custom implementation must extend the `Logger` class, be decorated with the `logger` decorator, and implement the async `update` method at minimum:
+```python
+import json
+
+import requests
+from flowmancer.eventbus.log import LogEndEvent, LogStartEvent, LogWriteEvent, SerializableLogEvent
+from flowmancer.loggers.logger import Logger, logger
+
+
+@logger
+class SlackMessageLogger(Logger):
+    webhook: str
+
+    def _post_to_slack(self, msg: str) -> None:
+        requests.post(
+            self.webhook,
+            data=json.dumps({'text': msg}),
+            headers={'Content-Type': 'application/json'},
+        )
+
+    async def update(self, evt: SerializableLogEvent) -> None:
+        # The `LogStartEvent` and `LogEndEvent` events only have a `name` property.
+        if isinstance(evt, LogStartEvent):
+            self._post_to_slack(f'[{evt.name}] START: Logging is beginning')
+        elif isinstance(evt, LogEndEvent):
+            self._post_to_slack(f'[{evt.name}] END: Logging is ending')
+        # The `LogWriteEvent` additionally has `severity` and `message` properties.
+        elif isinstance(evt, LogWriteEvent):
+            self._post_to_slack(f'[{evt.name}] {evt.severity.value}: {evt.message}')
+```
+
+A `Logger` implementation may also provide any of the following optional `async` lifecycle methods:
+* `on_create`
+* `on_restart`
+* `on_success`
+* `on_failure`
+* `on_destroy`
+* `on_abort`
+
+To incorporate your custom `Logger` into Flowmancer, ensure that it exists in a module either in `./loggers` or in a directory listed in `config.extension_directories` in the Job Definition.
+
+The logger can then be referenced in the `loggers` section of the Job Definition.
+> :warning: Providing the `loggers` section removes the default logger (`FileLogger`) from your job's configuration.
+> If you want your custom logger to run alongside the default logger, the `FileLogger` must be configured explicitly.
+
+```yaml
+loggers:
+  # Load the default logger with default parameters
+  default-logger:
+    logger: FileLogger
+
+  # Custom logger implementation
+  slack-logger:
+    logger: SlackMessageLogger
+    parameters:
+      webhook: https://some.webhook.url
+```
 
 ### Custom Extensions
 Coming soon.
+
+### Custom Checkpointers
+A custom implementation of `Checkpointer` may be provided to Flowmancer to replace the default `FileCheckpointer`.
+> :warning: Unlike loggers and extensions, only one checkpointer can be configured per Job Definition.
+
+A custom implementation must extend the `Checkpointer` class, be decorated with the `checkpointer` decorator, and implement the `write_checkpoint`, `read_checkpoint`, and `clear_checkpoint` methods:
+```python
+from flowmancer.checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer
+
+
+@checkpointer
+class DatabaseCheckpointer(Checkpointer):
+    host: str
+    port: int
+    username: str
+    password: str
+
+    def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
+        # Store checkpoint state: persist the contents of `CheckpointContents`
+        # in a way that allows it to be reconstructed later.
+        ...
+
+    def read_checkpoint(self, name: str) -> CheckpointContents:
+        # Recall checkpoint state: reconstruct and return `CheckpointContents`
+        # if one exists for `name`. Otherwise raise `NoCheckpointAvailableError`
+        # to indicate that no valid checkpoint exists to restart from.
+        ...
+
+    def clear_checkpoint(self, name: str) -> None:
+        # Remove checkpoint state for `name`.
+        ...
+```
+
+To incorporate your custom `Checkpointer` into Flowmancer, ensure that it exists in a module either in `./extensions` or in a directory listed in `config.extension_directories` in the Job Definition.
+
+The checkpointer can then be configured in the `checkpointer_config` section of the Job Definition:
+```yaml
+checkpointer_config:
+  checkpointer: DatabaseCheckpointer
+  parameters:
+    host: something
+    port: 9999
+    username: user
+    password: 1234
+```
diff --git a/flowmancer/__init__.py b/flowmancer/__init__.py
index 6936872..f99648c 100644
--- a/flowmancer/__init__.py
+++ b/flowmancer/__init__.py
@@ -1,3 +1,3 @@
-from . import checkpoint, extensions, loggers
+from . import checkpointer, extensions, loggers
 from .flowmancer import Flowmancer
 from .jobdefinition import file
diff --git a/flowmancer/checkpoint/__init__.py b/flowmancer/checkpoint/__init__.py
deleted file mode 100644
index a6bc20a..0000000
--- a/flowmancer/checkpoint/__init__.py
+++ /dev/null
@@ -1,6 +0,0 @@
-# noqa: F401
-# Ensure implementations are registered
-from . import file
-from .checkpoint import Checkpoint, CheckpointContents, NoCheckpointAvailableError, checkpoint
-
-__all__ = ['Checkpoint', 'CheckpointContents', 'NoCheckpointAvailableError', 'checkpoint']
diff --git a/flowmancer/checkpoint/file.py b/flowmancer/checkpoint/file.py
deleted file mode 100644
index 5c85217..0000000
--- a/flowmancer/checkpoint/file.py
+++ /dev/null
@@ -1,38 +0,0 @@
-import os
-import pickle
-from pathlib import Path
-
-from .checkpoint import Checkpoint, CheckpointContents, NoCheckpointAvailableError, checkpoint
-
-
-@checkpoint
-class FileCheckpoint(Checkpoint):
-    checkpoint_name: str
-    checkpoint_dir: str = './.flowmancer'
-
-    @property
-    def checkpoint_file_path(self) -> Path:
-        return Path(self.checkpoint_dir) / self.checkpoint_name
-
-    def write_checkpoint(self, content: CheckpointContents) -> None:
-        cdir = Path(self.checkpoint_dir)
-        if not os.path.exists(cdir):
-            os.makedirs(cdir, exist_ok=True)
-        tmp = cdir / (self.checkpoint_name + '.tmp')
-        perm = self.checkpoint_file_path
-        with open(tmp, 'wb') as f:
-            pickle.dump(content, f)
-        if os.path.isfile(perm):
-            os.unlink(perm)
-        os.rename(tmp, perm)
-
-    def read_checkpoint(self) -> CheckpointContents:
-        checkpoint_file = self.checkpoint_file_path
-        if not checkpoint_file.exists():
-            raise NoCheckpointAvailableError(f'Checkpoint file does not exist: {self.checkpoint_file_path}')
-        return pickle.load(open(checkpoint_file, 'rb'))
-
-    def clear_checkpoint(self) -> None:
-        cfile = self.checkpoint_file_path
-        if os.path.isfile(cfile):
-            os.unlink(cfile)
diff --git a/flowmancer/checkpointer/__init__.py b/flowmancer/checkpointer/__init__.py
new file mode 100644
index 0000000..959b79f
--- /dev/null
+++ b/flowmancer/checkpointer/__init__.py
@@ -0,0 +1,6 @@
+# noqa: F401
+# Ensure implementations are registered
+from . import file
+from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer
+
+__all__ = ['Checkpointer', 'CheckpointContents', 'NoCheckpointAvailableError', 'checkpointer']
diff --git a/flowmancer/checkpoint/checkpoint.py b/flowmancer/checkpointer/checkpointer.py
similarity index 56%
rename from flowmancer/checkpoint/checkpoint.py
rename to flowmancer/checkpointer/checkpointer.py
index 9b869b6..f1a92f6 100644
--- a/flowmancer/checkpoint/checkpoint.py
+++ b/flowmancer/checkpointer/checkpointer.py
@@ -8,13 +8,13 @@
 from ..executor import ExecutionStateMap
 
 
-_checkpoint_classes = dict()
+_checkpointer_classes = dict()
 
 
-def checkpoint(t: type[Checkpoint]) -> Any:
-    if not issubclass(t, Checkpoint):
-        raise TypeError(f'Must extend `Checkpoint` type: {t.__name__}')
-    _checkpoint_classes[t.__name__] = t
+def checkpointer(t: type[Checkpointer]) -> Any:
+    if not issubclass(t, Checkpointer):
+        raise TypeError(f'Must extend `Checkpointer` type: {t.__name__}')
+    _checkpointer_classes[t.__name__] = t
     return t
 
 
@@ -29,19 +29,19 @@ class CheckpointContents:
     shared_dict: Dict[Any, Any]
 
 
-class Checkpoint(ABC, BaseModel):
+class Checkpointer(ABC, BaseModel):
     class Config:
         extra = Extra.forbid
         underscore_attrs_are_private = True
 
     @abstractmethod
-    def write_checkpoint(self, content: CheckpointContents) -> None:
+    def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
         pass
 
     @abstractmethod
-    def read_checkpoint(self) -> CheckpointContents:
+    def read_checkpoint(self, name: str) -> CheckpointContents:
         pass
 
     @abstractmethod
-    def clear_checkpoint(self) -> None:
+    def clear_checkpoint(self, name: str) -> None:
         pass
diff --git a/flowmancer/checkpointer/file.py b/flowmancer/checkpointer/file.py
new file mode 100644
index 0000000..0ff8d9a
--- /dev/null
+++ b/flowmancer/checkpointer/file.py
@@ -0,0 +1,33 @@
+import os
+import pickle
+from pathlib import Path
+
+from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer
+
+
+@checkpointer
+class FileCheckpointer(Checkpointer):
+    checkpoint_dir: str = './.flowmancer'
+
+    def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
+        cdir = Path(self.checkpoint_dir)
+        if not os.path.exists(cdir):
+            os.makedirs(cdir, exist_ok=True)
+        tmp = cdir / (name + '.tmp')
+        perm = Path(self.checkpoint_dir) / name
+        with open(tmp, 'wb') as f:
+            pickle.dump(content, f)
+        if os.path.isfile(perm):
+            os.unlink(perm)
+        os.rename(tmp, perm)
+
+    def read_checkpoint(self, name: str) -> CheckpointContents:
+        checkpoint_file = Path(self.checkpoint_dir) / name
+        if not checkpoint_file.exists():
+            raise NoCheckpointAvailableError(f'Checkpoint file does not exist: {checkpoint_file}')
+        return pickle.load(open(checkpoint_file, 'rb'))
+
+    def clear_checkpoint(self, name: str) -> None:
+        cfile = Path(self.checkpoint_dir) / name
+        if os.path.isfile(cfile):
+            os.unlink(cfile)
diff --git a/flowmancer/flowmancer.py b/flowmancer/flowmancer.py
index cf46e20..b797eba 100644
--- a/flowmancer/flowmancer.py
+++ b/flowmancer/flowmancer.py
@@ -6,7 +6,6 @@
 import inspect
 import os
 import pkgutil
-import sys
 import time
 from argparse import ArgumentParser
 from collections import namedtuple
@@ -16,8 +15,9 @@
 
 from pydantic import BaseModel
 
-from .checkpoint import CheckpointContents, NoCheckpointAvailableError
-from .checkpoint.file import FileCheckpoint
+from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError
+from .checkpointer.checkpointer import _checkpointer_classes
+from .checkpointer.file import FileCheckpointer
 from .eventbus import EventBus
 from .eventbus.execution import ExecutionState, ExecutionStateTransition, SerializableExecutionEvent
 from .eventbus.log import SerializableLogEvent
@@ -43,6 +43,14 @@ class NoTasksLoadedError(Exception):
     pass
 
 
+class ExtensionsDirectoryNotFoundError(Exception):
+    pass
+
+
+class NotAPackageError(Exception):
+    pass
+
+
 # Need to explicitly manage loop in case multiple instances of Flowmancer are run.
 @contextlib.contextmanager
 def _create_loop():
@@ -53,15 +61,37 @@ def _create_loop():
         loop.close()
 
 
-def _load_extensions_path(path: str, add_to_path: bool = True, package_chain: List[str] = []):
+def _load_extensions_path(path: str, package_chain: Optional[List[str]] = None):
     if not path.startswith('/'):
-        path = os.path.join(os.path.dirname(os.path.abspath(inspect.stack()[-1][1])), path)
-    if add_to_path:
-        sys.path.append(path)
+        path = os.path.abspath(
+            os.path.join(
+                os.path.dirname(
+                    os.path.abspath(inspect.stack()[-1][1])
+                ),
+                path
+            )
+        )
+
+    if not os.path.exists(path):
+        raise ExtensionsDirectoryNotFoundError(f"No such directory: '{path}'")
+    if os.path.isfile(path):
+        raise NotAPackageError(f"Only packages (directories) are allowed. The following is not a dir: '{path}'")
+    if not os.path.exists(os.path.join(path, '__init__.py')):
+        print(f"WARNING: The '{path}' dir is not a package (no __init__.py file found). Modules will not be imported.")
+
+    if not package_chain:
+        package_chain = [os.path.basename(path)]
+
     for x in pkgutil.iter_modules(path=[path]):
-        importlib.import_module('.'.join(package_chain+[x.name]))
+        try:
+            print(f"importing: {'.'.join(package_chain+[x.name])}")
+            importlib.import_module('.'.join(package_chain+[x.name]))
+        except Exception as e:
+            print(
+                f"WARNING: Skipping import for '{'.'.join(package_chain+[x.name])}' due to {type(e).__name__}: {str(e)}"
+            )
         if x.ispkg:
-            _load_extensions_path(os.path.join(path, x.name), False, package_chain+[x.name])
+            _load_extensions_path(os.path.join(path, x.name), package_chain+[x.name])
 
 
 class Flowmancer:
@@ -77,16 +107,18 @@ def __init__(self, *, test: bool = False, debug: bool = False) -> None:
         self._states = ExecutionStateMap()
         self._registered_extensions: Dict[str, Extension] = dict()
         self._registered_loggers: Dict[str, Logger] = dict()
+        self._checkpointer_instance: Checkpointer = FileCheckpointer()
         self._checkpoint_interval_seconds = 10
         self._tick_interval_seconds = 0.25
 
     def start(self) -> int:
        orig_cwd = os.getcwd()
         try:
-            # Ensure any components, such as file loggers, work with respect to the caller's project dir.
+            # Ensure any components, such as file loggers, work with respect to the .py file in which the `start`
+            # command is invoked, which is usually the project root dir.
             os.chdir(os.path.dirname(os.path.abspath(inspect.stack()[-1][1])))
             if not self._test:
-                self._process_cmd_args()
+                self._process_cmd_args(orig_cwd)
             if not self._executors:
                 raise NoTasksLoadedError(
                     'No Tasks have been loaded! Please check that you have provided a valid Job Definition file.'
@@ -110,7 +142,7 @@ async def _initiate(self) -> int:
         await asyncio.gather(*observer_tasks, *executor_tasks, *logger_tasks, checkpoint_task)
         return len(self._states[ExecutionState.FAILED]) + len(self._states[ExecutionState.DEFAULTED])
 
-    def _process_cmd_args(self) -> None:
+    def _process_cmd_args(self, caller_cwd: str) -> None:
         parser = ArgumentParser(description='Flowmancer job execution options.')
         parser.add_argument('-j', '--jobdef', action='store', dest='jobdef')
         parser.add_argument('-r', '--restart', action='store_true', dest='restart', default=False)
@@ -124,11 +156,12 @@ def _process_cmd_args(self) -> None:
         self._debug = args.debug
 
         if args.jobdef:
-            self.load_job_definition(args.jobdef)
+            jobdef_path = args.jobdef if args.jobdef.startswith('/') else os.path.join(caller_cwd, args.jobdef)
+            self.load_job_definition(jobdef_path)
 
         if args.restart:
             try:
-                cp = FileCheckpoint(checkpoint_name=self._config.name).read_checkpoint()
+                cp = self._checkpointer_instance.read_checkpoint(self._config.name)
                 self._shared_dict.update(cp.shared_dict)
                 for name in cp.states[ExecutionState.FAILED]:
                     self._executors[name].instance.is_restart = True
@@ -160,11 +193,12 @@ def _process_cmd_args(self) -> None:
     # ASYNC INITIALIZATIONS
     def _init_checkpointer(self, root_event) -> asyncio.Task:
         async def _write_checkpoint() -> None:
-            checkpoint = FileCheckpoint(checkpoint_name=self._config.name)
+            checkpointer = self._checkpointer_instance
             last_write = 0
             while True:
                 if (time.time() - last_write) >= self._checkpoint_interval_seconds:
-                    checkpoint.write_checkpoint(
+                    checkpointer.write_checkpoint(
+                        self._config.name,
                         CheckpointContents(
                             name=self._config.name,
                             states=self._states,
@@ -177,7 +211,7 @@ async def _write_checkpoint() -> None:
                     and not self._states[ExecutionState.DEFAULTED]
                     and not self._states[ExecutionState.ABORTED]
                 ):
-                    checkpoint.clear_checkpoint()
+                    checkpointer.clear_checkpoint(self._config.name)
                     break
                 else:
                     await asyncio.sleep(self._tick_interval_seconds)
@@ -313,8 +347,15 @@ def load_job_definition(self, j: Union[JobDefinition, str], filetype: str = 'yam
 
         # Recursively import any modules found in the following paths in order to trigger the registration of any
         # decorated classes.
-        search_paths = ['./tasks', './extensions', './loggers'] + jobdef.config.extension_directories
-        for p in search_paths:
+        for p in ['./tasks', './extensions', './loggers']:
+            try:
+                _load_extensions_path(p)
+            except ExtensionsDirectoryNotFoundError:
+                # Don't error on the absence of dirs that are searched by default.
+                pass
+
+        # Allow for missing dir exceptions for passed-in paths.
+        for p in jobdef.config.extension_directories:
             _load_extensions_path(p)
 
         for p in jobdef.config.extension_packages:
@@ -331,6 +372,11 @@ def load_job_definition(self, j: Union[JobDefinition, str], filetype: str = 'yam
                 parameters=t.parameters
             )
 
+        # Checkpointer
+        self._checkpointer_instance = _checkpointer_classes[jobdef.checkpointer_config.checkpointer](
+            **jobdef.checkpointer_config.parameters
+        )
+
         # Observers
         for n, e in jobdef.extensions.items():
             self._registered_extensions[n] = _extension_classes[e.extension](**e.parameters)
diff --git a/flowmancer/jobdefinition/__init__.py b/flowmancer/jobdefinition/__init__.py
index 6482e98..c7cafeb 100644
--- a/flowmancer/jobdefinition/__init__.py
+++ b/flowmancer/jobdefinition/__init__.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
-from typing import Callable, Dict, List, Type, Union
+from typing import Any, Callable, Dict, List, Type
 
 from pydantic import BaseModel, Extra
 
@@ -26,7 +26,7 @@ class Config:
 
 class LoggerDefinition(JobDefinitionComponent):
     logger: str
-    parameters: Dict[str, Union[int, str]] = dict()
+    parameters: Dict[str, Any] = dict()
 
 
 class TaskDefinition(JobDefinitionComponent):
@@ -34,12 +34,12 @@ class TaskDefinition(JobDefinitionComponent):
     dependencies: List[str] = []
     max_attempts: int = 1
     backoff: int = 0
-    parameters: Dict[str, Union[int, str]] = dict()
+    parameters: Dict[str, Any] = dict()
 
 
 class ExtensionDefinition(JobDefinitionComponent):
     extension: str
-    parameters: Dict[str, Union[int, str]] = dict()
+    parameters: Dict[str, Any] = dict()
 
 
 class Configuration(JobDefinitionComponent):
@@ -49,12 +49,18 @@ class Configuration(JobDefinitionComponent):
     extension_packages: List[str] = []
 
 
+class CheckpointerDefinition(JobDefinitionComponent):
+    checkpointer: str
+    parameters: Dict[str, Any] = dict()
+
+
 class JobDefinition(JobDefinitionComponent):
     version: float = 0.1
     config: Configuration = Configuration()
     tasks: Dict[str, TaskDefinition]
     loggers: Dict[str, LoggerDefinition] = {'file-logger': LoggerDefinition(logger='FileLogger')}
     extensions: Dict[str, ExtensionDefinition] = {'progress-bar': ExtensionDefinition(extension='RichProgressBar')}
+    checkpointer_config: CheckpointerDefinition = CheckpointerDefinition(checkpointer='FileCheckpointer')
 
 
 class SerializableJobDefinition(ABC):
diff --git a/flowmancer/version.py b/flowmancer/version.py
index 1cc82e6..906d362 100644
--- a/flowmancer/version.py
+++ b/flowmancer/version.py
@@ -1 +1 @@
-__version__ = "0.5.7"
+__version__ = "0.6.0"
diff --git a/pyproject.toml b/pyproject.toml
index 37fa8a7..9ab4dc3 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "flowmancer"
-version = "0.5.7"
+version = "0.6.0"
 description = "The Python Thing-Doer"
 authors = ["Nathan Lee "]
 license = "MIT"
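
For orientation only, the sketch below shows how the entry point touched in `flowmancer/flowmancer.py` is typically driven. The `main.py` and `job.yaml` names are illustrative assumptions and not part of this changeset; what is grounded in the diff is that `Flowmancer().start()` returns the count of FAILED/DEFAULTED tasks, and that `_process_cmd_args` accepts `-j/--jobdef` (now resolved relative to the caller's working directory) and `-r/--restart` (which reads state back through the configured `Checkpointer`).

```python
# main.py -- hypothetical driver script; job.yaml is an assumed Job Definition path.
# Fresh run:                    python main.py --jobdef job.yaml
# Resume from last checkpoint:  python main.py --jobdef job.yaml --restart
import sys

from flowmancer import Flowmancer

if __name__ == '__main__':
    # start() returns the number of FAILED/DEFAULTED tasks, so 0 doubles as a success exit code.
    sys.exit(Flowmancer().start())
```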