0.6 (#47)
**JobDefinition:**
* New checkpointer_config section to allow configuring or replacing the default Checkpointer implementation.
* Exception will be raised if extension_directories entry is not a valid path.
* Exception will be raised if extension_directories entry is not a directory.
* Warning will be displayed if extension_directories entry is not a valid package (is a dir, but is missing __init__.py).
* Warning will be displayed if importing a module found in any extension_directories, including default dirs, results in an exception. The offending module will be skipped and module discovery + import will continue.
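The `extension_directories` checks listed above can be sketched roughly as follows. This is an illustration only, not the code from this commit: `validate_extension_dir` and `import_modules` are hypothetical names, and the exception types are assumptions.

```python
import importlib.util
import warnings
from pathlib import Path


def validate_extension_dir(entry: str) -> Path:
    """Hypothetical helper mirroring the checks described above."""
    path = Path(entry)
    if not path.exists():
        raise FileNotFoundError(f'extension_directories entry is not a valid path: {entry}')
    if not path.is_dir():
        raise NotADirectoryError(f'extension_directories entry is not a directory: {entry}')
    if not (path / '__init__.py').is_file():
        # A directory without __init__.py only produces a warning.
        warnings.warn(f'Not a valid package (missing __init__.py): {entry}')
    return path


def import_modules(path: Path) -> list[str]:
    """Import each .py module in `path`; warn and skip any that raise on import."""
    loaded = []
    for file in sorted(path.glob('*.py')):
        if file.name == '__init__.py':
            continue
        spec = importlib.util.spec_from_file_location(file.stem, file)
        if spec is None or spec.loader is None:
            continue
        module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(module)
        except Exception as exc:
            # The offending module is skipped and discovery continues.
            warnings.warn(f'Skipping module {file.name}: {exc!r}')
            continue
        loaded.append(file.stem)
    return loaded
```

Calling `import_modules(validate_extension_dir('./extensions'))` would return the names of the modules that imported cleanly.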

**Misc:**
* Changed behavior of -j and --jobdef arg to interpret path based on CWD, rather than w/ respect to the .py file where Flowmancer().start() is invoked.
natsunlee committed Jan 28, 2024
1 parent cda24bf commit 6fb74b0
Showing 12 changed files with 237 additions and 81 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[bumpversion]
-current_version = 0.5.7
+current_version = 0.6.0
files = flowmancer/version.py pyproject.toml
111 changes: 110 additions & 1 deletion README.md
@@ -166,6 +166,19 @@ config:
- internal_flowmancer_package
```

### Changing Default File Logger Directory
The Job Definition accepts an optional `loggers` section. If left empty, it defaults to a `FileLogger` with default settings.
To use the default `FileLogger` with a different configuration, provide the `loggers` block explicitly:
```yaml
loggers:
  my-file-logger:
    logger: FileLogger
    parameters:
      # NOTE: this path is relative to the `.py` file where `Flowmancer().start()` is invoked.
      base_log_dir: ./my_custom_log_dir  # ./logs is the default, if omitted.
      retention_days: 3  # 10 is the default, if omitted.
```

### Complex Parameters
While this is mostly used for `Task` implementations, the details outlined here apply for any built-in and custom `Extension` and `Logger` implementations.

@@ -228,7 +241,103 @@ In addition to the required `run` method, an implementation of `Task` may option
Just as with `run`, all lifecycle methods have access to `self.shared_dict` and any parameters.

### Custom Loggers
-Coming soon.
Custom `Logger` implementations may be provided to Flowmancer to either replace the default `FileLogger` or write alongside it.

A custom implementation must extend the `Logger` class, be decorated with the `logger` decorator, and implement the async `update` method at minimum:
```python
import json

import requests
from flowmancer.loggers.logger import Logger, logger
# NOTE: assumed import path for the event types; adjust to match your Flowmancer version.
from flowmancer.eventbus.log import LogEndEvent, LogStartEvent, LogWriteEvent, SerializableLogEvent


@logger
class SlackMessageLogger(Logger):
    webhook: str

    def _post_to_slack(self, msg: str) -> None:
        # Send the message as the plain `text` field of the webhook payload.
        requests.post(
            self.webhook,
            data=json.dumps({'text': msg}),
            headers={'Content-Type': 'application/json'},
        )

    async def update(self, evt: SerializableLogEvent) -> None:
        # The `LogStartEvent` and `LogEndEvent` events only have a `name` property.
        if isinstance(evt, LogStartEvent):
            self._post_to_slack(f'[{evt.name}] START: Logging is beginning')
        elif isinstance(evt, LogEndEvent):
            self._post_to_slack(f'[{evt.name}] END: Logging is ending')
        # The `LogWriteEvent` additionally has `severity` and `message` properties.
        elif isinstance(evt, LogWriteEvent):
            self._post_to_slack(f'[{evt.name}] {evt.severity.value}: {evt.message}')
```

The `Logger` implementation may also have the following optional `async` lifecycle methods:
* `on_create`
* `on_restart`
* `on_success`
* `on_failure`
* `on_destroy`
* `on_abort`

To incorporate your custom `Logger` into Flowmancer, ensure that it exists in a module either in `./loggers` or in a module listed in `config.extension_directories` in the Job Definition.

This allows it to be provided in the `loggers` section of the Job Definition.
> :warning: Providing the `loggers` section will remove the default logger (`FileLogger`) from your job's configuration.
> If you want to add your custom logger alongside the default logger, the `FileLogger` must explicitly be configured.
```yaml
loggers:
  # Load the default logger with default parameters
  default-logger:
    logger: FileLogger

  # Custom logger implementation
  slack-logger:
    logger: SlackMessageLogger
    parameters:
      webhook: https://some.webhook.url
```

### Custom Extensions
Coming soon.

### Custom Checkpointers
Custom `Checkpointer` implementations may be provided to Flowmancer to replace the default `FileCheckpointer`.
> :warning: Unlike loggers and extensions, only one checkpointer can be configured per Job Definition.

A custom implementation must extend the `Checkpointer` class, be decorated with the `checkpointer` decorator, and implement the `write_checkpoint`, `read_checkpoint`, and `clear_checkpoint` methods:
```python
from flowmancer.checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer


@checkpointer
class DatabaseCheckpointer(Checkpointer):
    host: str
    port: int
    username: str
    password: str

    def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
        # Store checkpoint state - must be able to store contents of
        # `CheckpointContents` in a way that it can be reconstructed later.
        ...

    def read_checkpoint(self, name: str) -> CheckpointContents:
        # Recall checkpoint state - reconstruct and return `CheckpointContents`
        # if exists for `name`. Otherwise raise `NoCheckpointAvailableError`
        # to indicate no valid checkpoint exists to restart from.
        ...

    def clear_checkpoint(self, name: str) -> None:
        # Remove checkpoint state for `name`.
        ...
```
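
As a rough illustration of how the three method stubs above might be filled in, the sketch below stores pickled checkpoints in SQLite. It is not part of Flowmancer: `SQLiteCheckpointerSketch` and `db_path` are hypothetical, and the class deliberately does not extend `Checkpointer` so that it runs without Flowmancer installed. The local `NoCheckpointAvailableError` is a stand-in for the real exception. In an actual implementation the class would extend `Checkpointer`, carry the `@checkpointer` decorator, and declare its settings as fields.

```python
import pickle
import sqlite3
from typing import Any


class NoCheckpointAvailableError(Exception):
    """Stand-in for Flowmancer's exception of the same name."""


class SQLiteCheckpointerSketch:
    """Illustrative only; mirrors the write/read/clear contract shown above."""

    def __init__(self, db_path: str = ':memory:') -> None:
        self._conn = sqlite3.connect(db_path)
        self._conn.execute(
            'CREATE TABLE IF NOT EXISTS checkpoints (name TEXT PRIMARY KEY, content BLOB)'
        )

    def write_checkpoint(self, name: str, content: Any) -> None:
        # Serialize the contents so they can be reconstructed later.
        blob = pickle.dumps(content)
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(
                'INSERT OR REPLACE INTO checkpoints (name, content) VALUES (?, ?)',
                (name, blob),
            )

    def read_checkpoint(self, name: str) -> Any:
        row = self._conn.execute(
            'SELECT content FROM checkpoints WHERE name = ?', (name,)
        ).fetchone()
        if row is None:
            # Signal that there is nothing to restart from.
            raise NoCheckpointAvailableError(f'No checkpoint stored for: {name}')
        return pickle.loads(row[0])

    def clear_checkpoint(self, name: str) -> None:
        with self._conn:
            self._conn.execute('DELETE FROM checkpoints WHERE name = ?', (name,))
```

Keying every operation on `name` matches the `name: str` argument that each method receives, so one store can hold checkpoints for several jobs.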

To incorporate your custom `Checkpointer` into Flowmancer, ensure that it exists in a module either in `./extensions` or in a module listed in `config.extension_directories` in the Job Definition.

This allows it to be provided in the `checkpointer_config` section of the Job Definition:
```yaml
checkpointer_config:
  checkpointer: DatabaseCheckpointer
  parameters:
    host: something
    port: 9999
    username: user
    password: 1234
```
2 changes: 1 addition & 1 deletion flowmancer/__init__.py
@@ -1,3 +1,3 @@
-from . import checkpoint, extensions, loggers
+from . import checkpointer, extensions, loggers
from .flowmancer import Flowmancer
from .jobdefinition import file
6 changes: 0 additions & 6 deletions flowmancer/checkpoint/__init__.py

This file was deleted.

38 changes: 0 additions & 38 deletions flowmancer/checkpoint/file.py

This file was deleted.

6 changes: 6 additions & 0 deletions flowmancer/checkpointer/__init__.py
@@ -0,0 +1,6 @@
# noqa: F401
# Ensure implementations are registered
from . import file
from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer

__all__ = ['Checkpointer', 'CheckpointContents', 'NoCheckpointAvailableError', 'checkpointer']
Original file line number Diff line number Diff line change
@@ -8,13 +8,13 @@

from ..executor import ExecutionStateMap

-_checkpoint_classes = dict()
+_checkpointer_classes = dict()


-def checkpoint(t: type[Checkpoint]) -> Any:
-    if not issubclass(t, Checkpoint):
-        raise TypeError(f'Must extend `Checkpoint` type: {t.__name__}')
-    _checkpoint_classes[t.__name__] = t
+def checkpointer(t: type[Checkpointer]) -> Any:
+    if not issubclass(t, Checkpointer):
+        raise TypeError(f'Must extend `Checkpointer` type: {t.__name__}')
+    _checkpointer_classes[t.__name__] = t
     return t


@@ -29,19 +29,19 @@ class CheckpointContents:
shared_dict: Dict[Any, Any]


-class Checkpoint(ABC, BaseModel):
+class Checkpointer(ABC, BaseModel):
     class Config:
         extra = Extra.forbid
         underscore_attrs_are_private = True

     @abstractmethod
-    def write_checkpoint(self, content: CheckpointContents) -> None:
+    def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
         pass

     @abstractmethod
-    def read_checkpoint(self) -> CheckpointContents:
+    def read_checkpoint(self, name: str) -> CheckpointContents:
         pass

     @abstractmethod
-    def clear_checkpoint(self) -> None:
+    def clear_checkpoint(self, name: str) -> None:
         pass
33 changes: 33 additions & 0 deletions flowmancer/checkpointer/file.py
@@ -0,0 +1,33 @@
import os
import pickle
from pathlib import Path

from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer


@checkpointer
class FileCheckpointer(Checkpointer):
    checkpoint_dir: str = './.flowmancer'

    def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
        cdir = Path(self.checkpoint_dir)
        if not os.path.exists(cdir):
            os.makedirs(cdir, exist_ok=True)
        tmp = cdir / (name + '.tmp')
        perm = Path(self.checkpoint_dir) / name
        with open(tmp, 'wb') as f:
            pickle.dump(content, f)
        if os.path.isfile(perm):
            os.unlink(perm)
        os.rename(tmp, perm)

    def read_checkpoint(self, name: str) -> CheckpointContents:
        checkpoint_file = Path(self.checkpoint_dir) / name
        if not checkpoint_file.exists():
            raise NoCheckpointAvailableError(f'Checkpoint file does not exist: {checkpoint_file}')
        return pickle.load(open(checkpoint_file, 'rb'))

    def clear_checkpoint(self, name: str) -> None:
        cfile = Path(self.checkpoint_dir) / name
        if os.path.isfile(cfile):
            os.unlink(cfile)
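
The staged write in `write_checkpoint` above (write to `<name>.tmp`, then move it into place) can be sketched in isolation. The helper below is hypothetical and not part of this commit. It swaps the `os.unlink` plus `os.rename` pair for a single `os.replace` call, which overwrites an existing destination, so no window exists in which neither file is present.

```python
import os
import pickle
from pathlib import Path
from typing import Any


def atomic_pickle_write(directory: str, name: str, content: Any) -> Path:
    """Hypothetical helper: pickle `content` to `<directory>/<name>` via a temp file."""
    cdir = Path(directory)
    cdir.mkdir(parents=True, exist_ok=True)
    tmp = cdir / (name + '.tmp')
    perm = cdir / name
    # Write the full payload to the temp file first.
    with open(tmp, 'wb') as f:
        pickle.dump(content, f)
    # Move it over the permanent file in one step, replacing any older checkpoint.
    os.replace(tmp, perm)
    return perm
```

A reader that opens the permanent path therefore sees either the previous checkpoint or the new one, never a partially written file.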