Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[resotocore][feat] Outer edge collection support - Part 1/4 #877

Merged
merged 7 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions resotocore/resotocore/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from resotocore.cli.model import CLIDependencies
from resotocore.config.config_handler_service import ConfigHandlerService
from resotocore.config.core_config_handler import CoreConfigHandler
from resotocore.merge_outer_edge.merge_outer_edge_handler import MergeOuterEdgesHandler
from resotocore.core_config import config_from_db, CoreConfig, RunConfig
from resotocore.db import SystemData
from resotocore.db.db_access import DbAccess
Expand Down Expand Up @@ -151,6 +152,7 @@ def with_config(
db.running_task_db, db.job_db, message_bus, event_sender, subscriptions, scheduler, cli, config
)
core_config_handler = CoreConfigHandler(config, message_bus, worker_task_queue, config_handler)
merge_outer_edges_handler = MergeOuterEdgesHandler(message_bus, subscriptions, task_handler)
cli_deps.extend(task_handler=task_handler)
api = Api(
db,
Expand Down Expand Up @@ -182,6 +184,7 @@ async def on_start() -> None:
await cli.start()
await task_handler.start()
await core_config_handler.start()
await merge_outer_edges_handler.start()
await cert_handler.start()
await api.start()
if created:
Expand All @@ -205,6 +208,7 @@ async def on_stop() -> None:
await api.stop()
await cert_handler.stop()
await core_config_handler.stop()
await merge_outer_edges_handler.stop()
await task_handler.stop()
await cli.stop()
await event_sender.core_event(CoreEvent.SystemStopped, total_seconds=int(duration.total_seconds()))
Expand Down
5 changes: 3 additions & 2 deletions resotocore/resotocore/cli/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
respond_cytoscape,
)
from resotocore.worker_task_queue import WorkerTask, WorkerTaskName
from resotocore.task.task_description import TaskDescriptorId

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -2578,7 +2579,7 @@ async def delete_job(job_id: str) -> AsyncIterator[str]:
yield f"Job {job_id} deleted." if job else f"No job with this id: {job_id}"

async def run_job(job_id: str) -> AsyncIterator[str]:
task = await self.dependencies.task_handler.start_task_by_descriptor_id(job_id)
task = await self.dependencies.task_handler.start_task_by_descriptor_id(TaskDescriptorId(job_id))
yield f"Job {task.descriptor.id} started with id {task.id}." if task else f"No job with this id: {job_id}"

async def activate_deactivate_job(job_id: str, active: bool) -> AsyncIterator[JsonElement]:
Expand Down Expand Up @@ -3642,7 +3643,7 @@ async def show_workflow(wf_id: str) -> AsyncIterator[JsonElement]:
yield f"No workflow with this id: {wf_id}"

async def run_workflow(wf_id: str) -> AsyncIterator[str]:
task = await self.dependencies.task_handler.start_task_by_descriptor_id(wf_id)
task = await self.dependencies.task_handler.start_task_by_descriptor_id(TaskDescriptorId(wf_id))
yield (
f"Workflow {task.descriptor.id} started with id {task.id}."
if task
Expand Down
4 changes: 2 additions & 2 deletions resotocore/resotocore/db/runningtaskdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from resotocore.db.entitydb import EntityDb, ArangoEntityDb
from resotocore.message_bus import Message
from resotocore.model.typed_model import to_js
from resotocore.task.task_description import RunningTask
from resotocore.task.task_description import RunningTask, TaskDescriptorId
from resotocore.types import Json
from resotocore.util import utc

Expand All @@ -22,7 +22,7 @@ class RunningTaskData:
# id of the related task
id: str
# id of the related task descriptor
task_descriptor_id: str
task_descriptor_id: TaskDescriptorId
# name of the related task descriptor
task_descriptor_name: str
# all messages that have been received by this task
Expand Down
66 changes: 66 additions & 0 deletions resotocore/resotocore/merge_outer_edge/merge_outer_edge_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from resotocore.message_bus import MessageBus, ActionDone, Action
import logging
import asyncio
from asyncio import Task, Future
from typing import Optional
from contextlib import suppress
from datetime import timedelta
from resotocore.task.model import Subscriber
from resotocore.task.task_handler import TaskHandlerService

from resotocore.task.subscribers import SubscriptionHandler

log = logging.getLogger(__name__)

subscriber_id = "resotocore.merge_outer_edges"
meln1k marked this conversation as resolved.
Show resolved Hide resolved
merge_outer_edges = "merge_outer_edges"


class MergeOuterEdgesHandler:
meln1k marked this conversation as resolved.
Show resolved Hide resolved
def __init__(
self,
message_bus: MessageBus,
subscription_handler: SubscriptionHandler,
task_handler_service: TaskHandlerService,
):
self.message_bus = message_bus
self.merge_outer_edges_listener: Optional[Task[None]] = None
self.subscription_handler = subscription_handler
self.subscriber: Optional[Subscriber] = None
self.task_handler_service = task_handler_service

def merge_outer_edges(self, task_id: str) -> None:
meln1k marked this conversation as resolved.
Show resolved Hide resolved
log.info(f"MergeOuterEdgesHandler: Noop outer edge merge for task_id: {task_id}")

async def __handle_events(self, subscription_done: Future[None]) -> None:
async with self.message_bus.subscribe(subscriber_id, [merge_outer_edges]) as events:
subscription_done.set_result(None)
while True:
event = await events.get()
if isinstance(event, Action) and event.message_type == merge_outer_edges:
self.merge_outer_edges(event.task_id)
await self.task_handler_service.handle_action_done(
ActionDone(
merge_outer_edges,
event.task_id,
merge_outer_edges,
self.subscriber.id if self.subscriber else subscriber_id,
)
meln1k marked this conversation as resolved.
Show resolved Hide resolved
)

async def start(self) -> None:
subscription_done = asyncio.get_event_loop().create_future()
self.subscriber = await self.subscription_handler.add_subscription(
subscriber_id, merge_outer_edges, True, timedelta(seconds=30)
)
self.merge_outer_edges_listener = asyncio.create_task(
self.__handle_events(subscription_done), name=subscriber_id
)
await subscription_done

async def stop(self) -> None:
if self.merge_outer_edges_listener:
with suppress(Exception):
self.merge_outer_edges_listener.cancel()
if self.subscriber:
await self.subscription_handler.remove_subscription(subscriber_id, merge_outer_edges)
4 changes: 2 additions & 2 deletions resotocore/resotocore/task/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import Optional, List

from resotocore.task.task_description import Job, RunningTask, Workflow
from resotocore.task.task_description import Job, RunningTask, Workflow, TaskDescriptorId


class TaskHandler(ABC):
Expand All @@ -22,7 +22,7 @@ async def delete_job(self, job_id: str) -> Optional[Job]:
pass

@abstractmethod
async def start_task_by_descriptor_id(self, uid: str) -> Optional[RunningTask]:
async def start_task_by_descriptor_id(self, uid: TaskDescriptorId) -> Optional[RunningTask]:
pass

@abstractmethod
Expand Down
22 changes: 12 additions & 10 deletions resotocore/resotocore/task/task_description.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from abc import ABC
from datetime import timedelta
from enum import Enum
from typing import Optional, Any, Sequence, MutableSequence, Callable, Dict, List, Set, Tuple
from typing import Optional, Any, Sequence, MutableSequence, Callable, Dict, List, Set, Tuple, NewType

from dataclasses import dataclass

Expand All @@ -24,6 +24,8 @@

log = logging.getLogger(__name__)

TaskDescriptorId = NewType("TaskDescriptorId", str)


class StepErrorBehaviour(Enum):
"""
Expand Down Expand Up @@ -142,7 +144,7 @@ class ExecuteOnCLI(TaskCommand):

# endregion

# region Trigger: when an task should be triggered
# region Trigger: when a task should be triggered
class Trigger(ABC):
def __eq__(self, other: object) -> bool:
return self.__dict__ == other.__dict__ if isinstance(other, Trigger) else False
Expand Down Expand Up @@ -199,7 +201,7 @@ def __eq__(self, other: object) -> bool:
class TaskDescription(ABC):
def __init__(
self,
uid: str,
uid: TaskDescriptorId,
name: str,
steps: Sequence[Step],
triggers: Sequence[Trigger],
Expand All @@ -225,7 +227,7 @@ def __eq__(self, other: object) -> bool:
class Job(TaskDescription):
def __init__(
self,
uid: str,
uid: TaskDescriptorId,
command: ExecuteCommand,
timeout: timedelta,
trigger: Optional[Trigger] = None,
Expand All @@ -234,7 +236,7 @@ def __init__(
mutable: bool = True,
active: bool = True,
):
steps = []
steps: List[Step] = []
if wait:
wait_trigger, wait_timeout = wait
action = WaitForEvent(wait_trigger.message_type, wait_trigger.filter_data)
Expand Down Expand Up @@ -289,7 +291,7 @@ class Workflow(TaskDescription):

def __init__(
self,
uid: str,
uid: TaskDescriptorId,
name: str,
steps: Sequence[Step],
triggers: Sequence[Trigger],
Expand Down Expand Up @@ -433,7 +435,7 @@ class PerformActionState(StepState):
def __init__(self, perform: PerformAction, step: Step, instance: RunningTask):
super().__init__(step, instance)
self.perform = perform
self.wait_for = self.instance.subscribers_by_event().get(perform.message_type, [])
self.wait_for: List[Subscriber] = self.instance.subscribers_by_event().get(perform.message_type, [])

def current_step_done(self) -> bool:
"""
Expand Down Expand Up @@ -585,9 +587,9 @@ def empty(
) -> Tuple[RunningTask, Sequence[TaskCommand]]:
assert len(descriptor.steps) > 0, "TaskDescription needs at least one step!"
uid = str(uuid.uuid1())
wi = RunningTask(uid, descriptor, subscriber_by_event)
messages = [SendMessage(Event("task_started", data={"task": descriptor.name})), *wi.move_to_next_state()]
return wi, messages
task = RunningTask(uid, descriptor, subscriber_by_event)
messages = [SendMessage(Event("task_started", data={"task": descriptor.name})), *task.move_to_next_state()]
return task, messages

def __init__(
self, uid: str, descriptor: TaskDescription, subscribers_by_event: Callable[[], Dict[str, List[Subscriber]]]
Expand Down
26 changes: 14 additions & 12 deletions resotocore/resotocore/task/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
StepErrorBehaviour,
RestartAgainStepAction,
Trigger,
TaskDescriptorId,
)
from resotocore.util import first, Periodic, group_by, utc_str, utc

Expand Down Expand Up @@ -125,12 +126,12 @@ def evaluate(step: Step) -> Step:

async def start_task_directly(self, desc: TaskDescription, reason: str) -> RunningTask:
updated = self.evaluate_task_definition(desc)
wi, commands = RunningTask.empty(updated, self.subscription_handler.subscribers_by_event)
log.info(f"Start new task: {updated.name} with id {wi.id}")
task, commands = RunningTask.empty(updated, self.subscription_handler.subscribers_by_event)
log.info(f"Start new task: {updated.name} with id {task.id}")
# store initial state in database
await self.running_task_db.insert(wi)
self.tasks[wi.id] = wi
await self.execute_task_commands(wi, commands)
await self.running_task_db.insert(task)
self.tasks[task.id] = task
await self.execute_task_commands(task, commands)
await self.event_sender.core_event(
CoreEvent.TaskStarted,
{
Expand All @@ -140,7 +141,7 @@ async def start_task_directly(self, desc: TaskDescription, reason: str) -> Runni
"kind": type(updated).__name__,
},
)
return wi
return task

async def start_task(self, desc: TaskDescription, reason: str) -> Optional[RunningTask]:
existing = first(lambda x: x.descriptor.id == desc.id and x.is_active, self.tasks.values())
Expand Down Expand Up @@ -284,7 +285,7 @@ async def stop(self) -> None:
async def running_tasks(self) -> List[RunningTask]:
return list(self.tasks.values())

async def start_task_by_descriptor_id(self, uid: str) -> Optional[RunningTask]:
async def start_task_by_descriptor_id(self, uid: TaskDescriptorId) -> Optional[RunningTask]:
td = first(lambda t: t.id == uid, self.task_descriptions)
if td:
return await self.start_task(td, "direct")
Expand Down Expand Up @@ -504,7 +505,7 @@ def known_jobs() -> List[Job]:

@staticmethod
def known_workflows(config: CoreConfig) -> List[Workflow]:
def workflow(name: str, steps: List[Step]) -> Workflow:
def workflow(name: TaskDescriptorId, steps: List[Step]) -> Workflow:
trigger: List[Trigger] = [EventTrigger(f"start_{name}_workflow")]
wf_config = config.workflows.get(name)
if wf_config:
Expand All @@ -514,6 +515,7 @@ def workflow(name: str, steps: List[Step]) -> Workflow:
collect_steps = [
Step("pre_collect", PerformAction("pre_collect"), timedelta(seconds=10)),
Step("collect", PerformAction("collect"), timedelta(seconds=10)),
Step("merge_outer_edges", PerformAction("merge_outer_edges"), timedelta(seconds=10)),
Step("post_collect", PerformAction("post_collect"), timedelta(seconds=10)),
]
cleanup_steps = [
Expand All @@ -530,10 +532,10 @@ def workflow(name: str, steps: List[Step]) -> Workflow:
Step("post_generate_metrics", PerformAction("post_generate_metrics"), timedelta(seconds=10)),
]
return [
workflow("collect", collect_steps + metrics_steps),
workflow("cleanup", cleanup_steps + metrics_steps),
workflow("metrics", metrics_steps),
workflow("collect_and_cleanup", collect_steps + cleanup_steps + metrics_steps),
workflow(TaskDescriptorId("collect"), collect_steps + metrics_steps),
workflow(TaskDescriptorId("cleanup"), cleanup_steps + metrics_steps),
workflow(TaskDescriptorId("metrics"), metrics_steps),
workflow(TaskDescriptorId("collect_and_cleanup"), collect_steps + cleanup_steps + metrics_steps),
]

# endregion
2 changes: 1 addition & 1 deletion resotocore/resotocore/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def exist(f: Callable[[Any], Any], iterable: Iterable[Any]) -> bool:

# we expect a callable that returns a truthy value.
# Due to limitations of lambda expressions we use Any here.
def first(f: Callable[[Any], Any], iterable: Iterable[AnyT]) -> Optional[AnyT]:
def first(f: Callable[[AnyT], Any], iterable: Iterable[AnyT]) -> Optional[AnyT]:
for a in iterable:
if f(a):
return a
Expand Down
6 changes: 3 additions & 3 deletions resotocore/tests/resotocore/db/jobdb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from resotocore.db.async_arangodb import AsyncArangoDB
from resotocore.db.entitydb import EventEntityDb
from resotocore.db.jobdb import JobDb, EventJobDb
from resotocore.task.task_description import Job, ExecuteCommand, EventTrigger
from resotocore.task.task_description import Job, ExecuteCommand, EventTrigger, TaskDescriptorId

# noinspection PyUnresolvedReferences
from tests.resotocore.analytics import event_sender
Expand All @@ -36,8 +36,8 @@ def event_db(job_db: JobDb, event_sender: InMemoryEventSender) -> EventJobDb:
def jobs() -> List[Job]:
wait = (EventTrigger("wait"), timedelta(seconds=30))
return [
Job("id1", ExecuteCommand("echo hello"), timedelta(seconds=10), EventTrigger("run_job")),
Job("id2", ExecuteCommand("sleep 10"), timedelta(seconds=10), EventTrigger("run_job"), wait),
Job(TaskDescriptorId("id1"), ExecuteCommand("echo hello"), timedelta(seconds=10), EventTrigger("run_job")),
Job(TaskDescriptorId("id2"), ExecuteCommand("sleep 10"), timedelta(seconds=10), EventTrigger("run_job"), wait),
]


Expand Down
7 changes: 5 additions & 2 deletions resotocore/tests/resotocore/db/runningtaskdb_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from resotocore.util import utc
from resotocore.task.model import Subscriber

from resotocore.task.task_description import RunningTask
from resotocore.task.task_description import RunningTask, TaskDescriptorId

# noinspection PyUnresolvedReferences
from tests.resotocore.task.task_description_test import workflow_instance, test_workflow
Expand All @@ -34,7 +34,10 @@ async def running_task_db(test_db: StandardDatabase) -> RunningTaskDb:
def instances() -> List[RunningTaskData]:
messages = [ActionDone(str(a), "test", "bla", "sf") for a in range(0, 10)]
state_data = {"test": 1}
return [RunningTaskData(str(a), str(a), "task_123", messages, "start", state_data, utc()) for a in range(0, 10)]
return [
RunningTaskData(str(a), TaskDescriptorId(str(a)), "task_123", messages, "start", state_data, utc())
for a in range(0, 10)
]


@pytest.mark.asyncio
Expand Down