Skip to content

Commit

Permalink
0.8 (#64)
Browse files Browse the repository at this point in the history
- Added new SQLiteCheckpointer.
- Exposed internal execution loop trigger intervals in the config section of the JobDefinition.
    - `synchro_interval_seconds`
    - `loggers_interval_seconds`
    - `extensions_interval_seconds`
    - `checkpointer_interval_seconds`
- Added basic Checkpoint validation to check checkpoint contents against the given JobDefinition.
- Renamed the `checkpointer_config` section to simply `checkpointer`.
  • Loading branch information
natsunlee committed Mar 2, 2024
1 parent 7a898f0 commit 97b8d89
Show file tree
Hide file tree
Showing 19 changed files with 309 additions and 150 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[bumpversion]
current_version = 0.7.2
current_version = 0.8.0
files = flowmancer/version.py pyproject.toml
8 changes: 6 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ In the `config` block of the Job Definition, the following optional parameters m
|max_concurrency|int|0|Maximum number tasks that can run in parallel. If 0 or less, then there is no limit.|
|extension_directories|List[str]|[]|List of paths, either absolute or relative to driver `.py` file, that contain any `@task`, `@logger`, or `@extension` decorated classes to make accessible to Flowmancer. The `./task`, `./extensions`, and `./loggers` directories are ALWAYS checked by default.|
|extension_packages|List[str]|[]|List of installed Python packages that contain `@task`, `@logger`, or `@extension` decorated classes to make accessible to Flowmancer.|
|synchro_interval_seconds|float|0.25|Core execution loop interval for waking and checking status of tasks and whether loggers/extensions/checkpointer should trigger.|
|loggers_interval_seconds|float|0.25|Interval in seconds to wait before emitting log messages to configured `Logger` instances.|
|extensions_interval_seconds|float|0.25|Interval in seconds to wait before emitting state change information to configured `Extension` instances.|
|checkpointer_interval_seconds|float|10.0|Interval in seconds to wait before writing checkpoint information to the configured `Checkpointer`.|

For example:
```yaml
Expand Down Expand Up @@ -411,9 +415,9 @@ class DatabaseCheckpointer(Checkpointer):

To incorporate your custom `Checkpointer` into Flowmancer, ensure that it exists in a module either in `./extensions` or in a module listed in `config.extension_directories` in the Job Definition.

This allows it to be provided in the `checkpointer_config` section of the Job Definition:
This allows it to be provided in the `checkpointer` section of the Job Definition:
```yaml
checkpointer_config:
checkpointer:
checkpointer: DatabaseCheckpointer
parameters:
host: something
Expand Down
2 changes: 1 addition & 1 deletion flowmancer/checkpointer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# noqa: F401
# Ensure implementations are registered
from . import file
from . import database, file
from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer

__all__ = ['Checkpointer', 'CheckpointContents', 'NoCheckpointAvailableError', 'checkpointer']
14 changes: 7 additions & 7 deletions flowmancer/checkpointer/checkpointer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict
from typing import Any, Dict, Set

from pydantic import BaseModel, Extra

from ..executor import ExecutionStateMap
from ..lifecycle import AsyncLifecycle

_checkpointer_classes = dict()

Expand All @@ -25,23 +25,23 @@ class NoCheckpointAvailableError(Exception):
@dataclass
class CheckpointContents:
name: str
states: ExecutionStateMap
states: Dict[str, Set[str]]
shared_dict: Dict[Any, Any]


class Checkpointer(ABC, BaseModel):
class Checkpointer(ABC, BaseModel, AsyncLifecycle):
class Config:
extra = Extra.forbid
underscore_attrs_are_private = True

@abstractmethod
def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
async def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
pass

@abstractmethod
def read_checkpoint(self, name: str) -> CheckpointContents:
async def read_checkpoint(self, name: str) -> CheckpointContents:
pass

@abstractmethod
def clear_checkpoint(self, name: str) -> None:
async def clear_checkpoint(self, name: str) -> None:
pass
75 changes: 75 additions & 0 deletions flowmancer/checkpointer/database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import pickle
import sqlite3
from contextlib import closing
from typing import Optional
from uuid import UUID, uuid4

from .checkpointer import CheckpointContents, Checkpointer, NoCheckpointAvailableError, checkpointer


@checkpointer
class SQLiteCheckpointer(Checkpointer):
class SQLiteCheckpointerState:
def __init__(self):
self.con: Optional[sqlite3.Connection] = None
self.uuid: UUID = uuid4()

_state: SQLiteCheckpointerState = SQLiteCheckpointerState()
checkpoint_database: str = './.flowmancer/checkpoint.db'

@property
def _con(self) -> sqlite3.Connection:
if not self._state.con:
self._state.con = sqlite3.connect(self.checkpoint_database)
self._state.con.row_factory = sqlite3.Row
return self._state.con

async def on_create(self) -> None:
self._con.execute('''
CREATE TABLE IF NOT EXISTS checkpoint (
id INTEGER PRIMARY KEY AUTOINCREMENT,
uuid TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
start_ts DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
end_ts DATETIME,
checkpoint_contents BLOB
)
''')

async def on_destroy(self) -> None:
if self._state.con:
self._state.con.close()

async def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
dumped_content = sqlite3.Binary(pickle.dumps(content))
params = [str(self._state.uuid), name, dumped_content, dumped_content]
self._con.execute('''
INSERT INTO checkpoint (uuid, name, checkpoint_contents) VALUES (?, ?, ?)
ON CONFLICT(uuid) DO UPDATE SET checkpoint_contents = ?
''', params)
self._con.commit()

async def read_checkpoint(self, name: str) -> CheckpointContents:
with closing(self._con.cursor()) as cur:
# Check if the checkpoint table exists. In the event of first ever execution for the given DB, no such
# table will exist and therefore no checkpoints exist.
cur.execute("SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = 'checkpoint'")
row = cur.fetchone()
if not row:
raise NoCheckpointAvailableError(f'Checkpoint entry does not exist for: {name}')

cur.execute('SELECT * FROM checkpoint WHERE name = ? ORDER BY id DESC LIMIT 1', [name])
row = cur.fetchone()
if row and not row['end_ts']:
self._state.uuid = UUID(row['uuid'])
else:
raise NoCheckpointAvailableError(f'Checkpoint entry does not exist for: {name}')

return pickle.loads(row['checkpoint_contents'])

async def clear_checkpoint(self, _: str) -> None:
self._con.execute(
'UPDATE checkpoint SET end_ts = CURRENT_TIMESTAMP WHERE uuid = ?',
[str(self._state.uuid)]
)
self._con.commit()
6 changes: 3 additions & 3 deletions flowmancer/checkpointer/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
class FileCheckpointer(Checkpointer):
checkpoint_dir: str = './.flowmancer'

def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
async def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
cdir = Path(self.checkpoint_dir)
if not os.path.exists(cdir):
os.makedirs(cdir, exist_ok=True)
Expand All @@ -21,13 +21,13 @@ def write_checkpoint(self, name: str, content: CheckpointContents) -> None:
os.unlink(perm)
os.rename(tmp, perm)

def read_checkpoint(self, name: str) -> CheckpointContents:
async def read_checkpoint(self, name: str) -> CheckpointContents:
checkpoint_file = Path(self.checkpoint_dir) / name
if not checkpoint_file.exists():
raise NoCheckpointAvailableError(f'Checkpoint file does not exist: {checkpoint_file}')
return pickle.load(open(checkpoint_file, 'rb'))

def clear_checkpoint(self, name: str) -> None:
async def clear_checkpoint(self, name: str) -> None:
cfile = Path(self.checkpoint_dir) / name
if os.path.isfile(cfile):
os.unlink(cfile)
40 changes: 40 additions & 0 deletions flowmancer/eventbus/execution.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from __future__ import annotations

from enum import Enum
from typing import Dict, Set, Union

from . import SerializableEvent, serializable_event

Expand All @@ -20,6 +23,43 @@ class ExecutionState(Enum):
INIT = '_'


class ExecutionStateMap:
def __init__(self) -> None:
self.data: Dict[ExecutionState, Set[str]] = dict()

def __getitem__(self, k: Union[str, ExecutionState]) -> Set[str]:
es = ExecutionState(k)
if es not in self.data:
self.data[es] = set()
return self.data[ExecutionState(k)]

def __setitem__(self, k: Union[str, ExecutionState], v: Set[str]) -> None:
es = ExecutionState(k)
self.data[es] = v

def __str__(self):
return str(self.data)

def items(self):
return self.data.items()

def keys(self):
return self.data.keys()

def values(self):
return self.data.values()

@classmethod
def from_simple_dict(cls, data: Dict[str, Set[str]]) -> ExecutionStateMap:
m = ExecutionStateMap()
for k, v in data.items():
m[ExecutionState(k)] = v
return m

def to_simple_dict(self) -> Dict[str, Set[str]]:
return {k.value: v for k, v in self.data.items()}


@serializable_event
class ExecutionStateTransition(SerializableExecutionEvent):
name: str
Expand Down
14 changes: 14 additions & 0 deletions flowmancer/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
class NoTasksLoadedError(Exception):
pass


class ExtensionsDirectoryNotFoundError(Exception):
pass


class NotAPackageError(Exception):
pass


class CheckpointInvalidError(Exception):
pass
25 changes: 1 addition & 24 deletions flowmancer/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from multiprocessing import Process
from multiprocessing.managers import DictProxy
from multiprocessing.sharedctypes import Value
from typing import Any, AsyncIterator, Callable, Coroutine, Dict, Optional, Set, TextIO, Type, Union, cast
from typing import Any, AsyncIterator, Callable, Coroutine, Dict, Optional, TextIO, Type, Union, cast

from .eventbus import EventBus
from .eventbus.execution import ExecutionState, ExecutionStateTransition, SerializableExecutionEvent
Expand All @@ -21,29 +21,6 @@ async def _default_await_dependencies() -> bool:
return True


class ExecutionStateMap:
def __init__(self) -> None:
self.data: Dict[ExecutionState, Set[str]] = dict()

def __getitem__(self, k: Union[str, ExecutionState]) -> Set[str]:
es = ExecutionState(k)
if es not in self.data:
self.data[es] = set()
return self.data[ExecutionState(k)]

def __str__(self):
return str(self.data)

def items(self):
return self.data.items()

def keys(self):
return self.data.keys()

def values(self):
return self.data.values()


class ProcessResult:
def __init__(self) -> None:
self._retcode = Value('i', 0)
Expand Down
2 changes: 1 addition & 1 deletion flowmancer/extensions/notifications/email.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class EmailNotification(Notification):
sender_user: str
sender_host: str

def send_notification(self, title: str, msg: str) -> None:
async def send_notification(self, title: str, msg: str) -> None:
em = EmailMessage()
em['From'] = f'{self.sender_user}@{self.sender_host}'
em['Subject'] = title
Expand Down
10 changes: 5 additions & 5 deletions flowmancer/extensions/notifications/notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,26 @@ async def update(self, _: SerializableExecutionEvent) -> None:
pass

@abstractmethod
def send_notification(self, title: str, msg: str) -> None:
async def send_notification(self, title: str, msg: str) -> None:
pass

async def on_create(self) -> None:
self.send_notification(
await self.send_notification(
"Flowmancer Job Notification: STARTING", f"Job initiated at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)

async def on_success(self) -> None:
self.send_notification(
await self.send_notification(
"Flowmancer Job Notification: SUCCESS",
f"Job completed successfully at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
)

async def on_failure(self) -> None:
self.send_notification(
await self.send_notification(
"Flowmancer Job Notification: FAILURE", f"Job failed at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)

async def on_abort(self) -> None:
self.send_notification(
await self.send_notification(
"Flowmancer Job Notification: ABORTED", f"Job aborted at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
2 changes: 1 addition & 1 deletion flowmancer/extensions/notifications/pushover.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class PushoverNotification(Notification):
app_token: str
user_key: str

def send_notification(self, title: str, msg: str) -> None:
async def send_notification(self, title: str, msg: str) -> None:
headers = {'Content-type': 'application/x-www-form-urlencoded'}
data = {'token': self.app_token, 'user': self.user_key, 'title': title, 'message': msg}
requests.post('https://api.pushover.net/1/messages.json', headers=headers, data=data)
2 changes: 1 addition & 1 deletion flowmancer/extensions/notifications/slack.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
class SlackWebhookNotification(Notification):
webhook: str

def send_notification(self, title: str, msg: str) -> None:
async def send_notification(self, title: str, msg: str) -> None:
requests.post(
self.webhook,
data=json.dumps({'text': title, 'attachments': [{'text': msg}]}),
Expand Down
19 changes: 9 additions & 10 deletions flowmancer/extensions/progressbar.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,17 @@
from .extension import Extension, extension


class RichProgressBarState:
def __init__(self) -> None:
self.state_counts: Dict[ExecutionState, int]
self.start_time: float
self.progress: Progress
self.event: asyncio.Event
self.task: TaskID
self.update_task: asyncio.Task


@extension
class RichProgressBar(Extension):
class RichProgressBarState:
def __init__(self) -> None:
self.state_counts: Dict[ExecutionState, int]
self.start_time: float
self.progress: Progress
self.event: asyncio.Event
self.task: TaskID
self.update_task: asyncio.Task

_state: RichProgressBarState = RichProgressBarState()

def _update_pbar(self, advance: int = 0) -> None:
Expand Down
Loading

0 comments on commit 97b8d89

Please sign in to comment.