Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.6 #47

Merged
merged 5 commits into from
Jan 28, 2024
Merged

0.6 #47

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[bumpversion]
current_version = 0.5.7
current_version = 0.6.0
files = flowmancer/version.py pyproject.toml
111 changes: 110 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,19 @@ config:
- internal_flowmancer_package
```

### Changing Default File Logger Directory
The Job Definition accepts an optional `loggers` section, which if left empty will default to using a `FileLogger` with default settings.
To utilize the default `FileLogger`, but with a different configuration, explicitly provide the `loggers` block:
```yaml
loggers:
my-file-logger:
logger: FileLogger
parameters:
# NOTE: this path is relative to the `.py` file where `Flowmancer().start()` is invoked.
base_log_dir: ./my_custom_log_dir # ./logs is the default, if omitted.
retention_days: 3 # 10 is the default, if omitted.
```

### Complex Parameters
While this is mostly used for `Task` implementations, the details outlined here apply for any built-in and custom `Extension` and `Logger` implementations.

Expand Down Expand Up @@ -228,7 +241,103 @@ In addition to the required `run` method, an implementation of `Task` may option
Just as with `run`, all lifecycle methods have access to `self.shared_dict` and any parameters.

### Custom Loggers
Coming soon.
Custom implementations of the `Logger` may be provided to Flowmancer to either replace OR write to in addition to the default `FileLogger`.

A custom implementation must extend the `Logger` class, be decorated with the `logger` decorator, and implement the async `update` method at minimum:
```python
@logger
import json
import requests
from flowmancer.loggers.logger import Logger, logger

class SlackMessageLogger(Logger):
webhook: str

def _post_to_slack(self, msg: str) -> None:
requests.post(
self.webhook,
data=json.dumps({'text': title, 'attachments': [{'text': msg}]}),
headers={'Content-Type': 'application/json'},
)

async def update(self, evt: SerializableLogEvent) -> None:
# The `LogStartEvent` and `LogEndEvent` events only have a `name` property.
if isinstance(evt, LogStartEvent):
self._post_to_slack(f'[{evt.name}] START: Logging is beginning')
elif isinstance(evt, LogEndEvent):
self._post_to_slack(f'[{evt.name}] END: Logging is ending')
# The `LogWriteEvent` additionally has `severity` and `message` properties.
elif isinstance(evt, LogWriteEvent):
self._post_to_slack(f'[{evt.name}] {evt.severity.value}: {evt.message}')
```

The `Logger` implementation may also have the following optional `async` lifecycle methods:
* `on_create`
* `on_restart`
* `on_success`
* `on_failure`
* `on_destroy`
* `on_abort`

To incorporate your custom `Logger` into Flowmancer, ensure that it exists in a module either in `./loggers` or in a module listed in `config.extension_directories` in the Job Definition.

This allows it to be provided in the `loggers` section of the Job Definition.
> :warning: Providing the `loggers` section will remove the default logger (`FileLogger`) from your job's configuration.
> If you want to add your custom logger alongside the default logger, the `FileLogger` must explicitly be configured.

```yaml
loggers:
# Load the default logger with default parameters
default-logger:
logger: FileLogger

# Custom logger implementation
slack-logger:
logger: SlackMessageLogger
parameters:
webhook: https://some.webhook.url
```

### Custom Extensions
Coming soon.

### Custom Checkpointers
Custom implementations of the `Checkpointer` may be provided to Flowmancer to replace the default `FileCheckpointer`.
> :warning: Unlike loggers and extensions, only one checkpointer can be configured per Job Definition.

A custom implementation must extend the `Checkpointer` class, be decorated with the `checkpointer` decorator, and implement the async `update` method at minimum:
```python
from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer

@checkpointer
class DatabaseCheckpointer(Checkpointer):
host: str
port: int
username: str
password: str

def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
# Store checkpoint state - must be able to store contents of
# `CheckpointContents` in a way that it can be reconstructed later.

def read_checkpoint(self, name: str) -> CheckpointContents:
# Recall checkpoint state - reconstruct and return `CheckpointContents`
# if exists for `name`. Otherwise raise `NoCheckpointAvailableError`
# to indicate no valid checkpoint exists to restart from.

def clear_checkpoint(self, name: str) -> None:
# Remove checkpoint state for `name`.
```

To incorporate your custom `Checkpointer` into Flowmancer, ensure that it exists in a module either in `./extensions` or in a module listed in `config.extension_directories` in the Job Definition.

This allows it to be provided in the `checkpointer_config` section of the Job Definition:
```yaml
checkpointer_config:
checkpointer: DatabaseCheckpointer
parameters:
host: something
port: 9999
username: user
password: 1234
```
2 changes: 1 addition & 1 deletion flowmancer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from . import checkpoint, extensions, loggers
from . import checkpointer, extensions, loggers
from .flowmancer import Flowmancer
from .jobdefinition import file
6 changes: 0 additions & 6 deletions flowmancer/checkpoint/__init__.py

This file was deleted.

38 changes: 0 additions & 38 deletions flowmancer/checkpoint/file.py

This file was deleted.

6 changes: 6 additions & 0 deletions flowmancer/checkpointer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# noqa: F401
# Ensure implementations are registered
from . import file
from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer

__all__ = ['Checkpointer', 'CheckpointContents', 'NoCheckpointAvailableError', 'checkpointer']
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

from ..executor import ExecutionStateMap

_checkpoint_classes = dict()
_checkpointer_classes = dict()


def checkpoint(t: type[Checkpoint]) -> Any:
if not issubclass(t, Checkpoint):
raise TypeError(f'Must extend `Checkpoint` type: {t.__name__}')
_checkpoint_classes[t.__name__] = t
def checkpointer(t: type[Checkpointer]) -> Any:
if not issubclass(t, Checkpointer):
raise TypeError(f'Must extend `Checkpointer` type: {t.__name__}')
_checkpointer_classes[t.__name__] = t
return t


Expand All @@ -29,19 +29,19 @@ class CheckpointContents:
shared_dict: Dict[Any, Any]


class Checkpoint(ABC, BaseModel):
class Checkpointer(ABC, BaseModel):
class Config:
extra = Extra.forbid
underscore_attrs_are_private = True

@abstractmethod
def write_checkpoint(self, content: CheckpointContents) -> None:
def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
pass

@abstractmethod
def read_checkpoint(self) -> CheckpointContents:
def read_checkpoint(self, name: str) -> CheckpointContents:
pass

@abstractmethod
def clear_checkpoint(self) -> None:
def clear_checkpoint(self, name: str) -> None:
pass
33 changes: 33 additions & 0 deletions flowmancer/checkpointer/file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import os
import pickle
from pathlib import Path

from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer


@checkpointer
class FileCheckpointer(Checkpointer):
checkpoint_dir: str = './.flowmancer'

def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
cdir = Path(self.checkpoint_dir)
if not os.path.exists(cdir):
os.makedirs(cdir, exist_ok=True)
tmp = cdir / (name + '.tmp')
perm = Path(self.checkpoint_dir) / name
with open(tmp, 'wb') as f:
pickle.dump(content, f)
if os.path.isfile(perm):
os.unlink(perm)
os.rename(tmp, perm)

def read_checkpoint(self, name: str) -> CheckpointContents:
checkpoint_file = Path(self.checkpoint_dir) / name
if not checkpoint_file.exists():
raise NoCheckpointAvailableError(f'Checkpoint file does not exist: {checkpoint_file}')
return pickle.load(open(checkpoint_file, 'rb'))

def clear_checkpoint(self, name: str) -> None:
cfile = Path(self.checkpoint_dir) / name
if os.path.isfile(cfile):
os.unlink(cfile)
Loading