diff --git a/resotocore/resotocore/analytics/__init__.py b/resotocore/resotocore/analytics/__init__.py index 1ee89b4d4..01c57c16a 100644 --- a/resotocore/resotocore/analytics/__init__.py +++ b/resotocore/resotocore/analytics/__init__.py @@ -39,6 +39,7 @@ class CoreEvent: TaskCompleted = "task-handler.task-completed" ClientError = "error.client" ServerError = "error.server" + ActionError = "error.action" UsageMetricsTurnedOff = "usage-metrics.turned-off" BenchmarkPerformed = "report.benchmark" FirstUserCreated = "user.created.first" diff --git a/resotocore/resotocore/task/task_handler.py b/resotocore/resotocore/task/task_handler.py index a7c3f8194..67d39aaf0 100644 --- a/resotocore/resotocore/task/task_handler.py +++ b/resotocore/resotocore/task/task_handler.py @@ -427,17 +427,21 @@ async def handle_action_error(self, err: ActionError) -> None: if rt := self.tasks.get(err.task_id): await self.handle_action_result(err, lambda wi: wi.handle_error(err)) rt.info_messages.append(err) + name = rt.descriptor.name + await self.event_sender.core_event(CoreEvent.ActionError, {"workflow": name, "message": err.error}) await self.message_bus.emit_event( - CoreMessage.ErrorMessage, {"workflow": rt.descriptor.name, "task": rt.id, "message": err.error} + CoreMessage.ErrorMessage, {"workflow": name, "task": rt.id, "message": err.error} ) async def handle_action_info(self, info: ActionInfo) -> None: if rt := self.tasks.get(info.task_id): rt.handle_info(info) + name = rt.descriptor.name await self.running_task_db.update_state(rt, info) + await self.event_sender.core_event(CoreEvent.ActionError, {"workflow": name, "message": info.message}) if info.level == "error": await self.message_bus.emit_event( - CoreMessage.ErrorMessage, {"workflow": rt.descriptor.name, "task": rt.id, "message": info.message} + CoreMessage.ErrorMessage, {"workflow": name, "task": rt.id, "message": info.message} ) async def handle_action_progress(self, info: ActionProgress) -> None: diff --git a/resotocore/tests/resotocore/task/task_handler_test.py b/resotocore/tests/resotocore/task/task_handler_test.py index 8b4847366..5b3e59b65 100644 --- a/resotocore/tests/resotocore/task/task_handler_test.py +++ b/resotocore/tests/resotocore/task/task_handler_test.py @@ -1,3 +1,4 @@ +import asyncio import logging from datetime import timedelta from typing import List @@ -5,13 +6,14 @@ import pytest from pytest import LogCaptureFixture -from resotocore.analytics import AnalyticsEventSender +from resotocore.analytics import AnalyticsEventSender, InMemoryEventSender from resotocore.cli.cli import CLIService from resotocore.db.jobdb import JobDb from resotocore.db.runningtaskdb import RunningTaskDb from resotocore.dependencies import empty_config from resotocore.ids import SubscriberId, TaskDescriptorId -from resotocore.message_bus import MessageBus, Event, Message, ActionDone, Action +from resotocore.message_bus import MessageBus, Event, Message, ActionDone, Action, ActionInfo, ActionError +from resotocore.task import RunningTaskInfo from resotocore.task.scheduler import Scheduler from resotocore.task.subscribers import SubscriptionHandler from resotocore.task.task_description import ( @@ -149,6 +151,23 @@ async def test_handle_failing_task_command(task_handler: TaskHandlerService, cap assert "Command non_existing_command failed with error" in caplog.records[0].message +@pytest.mark.asyncio +async def test_handle_failing_actor( + task_handler: TaskHandlerService, test_workflow: Workflow, event_sender: InMemoryEventSender +) -> None: + sub = await task_handler.subscription_handler.add_subscription( + SubscriberId("sub_1"), "collect", True, timedelta(seconds=30) + ) + info = await task_handler.start_task(test_workflow, "test") + assert info is not None + await asyncio.sleep(0.1) + task = info.running_task + await task_handler.handle_action_info(ActionInfo("collect", task.id, "act", sub.id, "error", "wrong!")) + assert [e.kind for e in event_sender.events] == ["task-handler.task-started", "error.action"] + await task_handler.handle_action_error(ActionError("collect", task.id, "act", sub.id, "wrong!")) + assert [e.kind for e in event_sender.events] == ["task-handler.task-started", "error.action", "error.action"] + + @pytest.mark.asyncio async def test_default_workflow_triggers() -> None: workflows = {wf.name: wf for wf in TaskHandlerService.known_workflows(empty_config())}