Skip to content

Commit

Permalink
[resotocore][feat] Send analytics event on action error (#1619)
Browse files Browse the repository at this point in the history
* [resotocore][feat] Send analytics event on action error

* add task handler test
  • Loading branch information
aquamatthias committed Jun 1, 2023
1 parent e2c3a39 commit a9cb6df
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 4 deletions.
1 change: 1 addition & 0 deletions resotocore/resotocore/analytics/__init__.py
Expand Up @@ -39,6 +39,7 @@ class CoreEvent:
TaskCompleted = "task-handler.task-completed"
ClientError = "error.client"
ServerError = "error.server"
ActionError = "error.action"
UsageMetricsTurnedOff = "usage-metrics.turned-off"
BenchmarkPerformed = "report.benchmark"
FirstUserCreated = "user.created.first"
Expand Down
8 changes: 6 additions & 2 deletions resotocore/resotocore/task/task_handler.py
Expand Up @@ -427,17 +427,21 @@ async def handle_action_error(self, err: ActionError) -> None:
if rt := self.tasks.get(err.task_id):
await self.handle_action_result(err, lambda wi: wi.handle_error(err))
rt.info_messages.append(err)
name = rt.descriptor.name
await self.event_sender.core_event(CoreEvent.ActionError, {"workflow": name, "message": err.error})
await self.message_bus.emit_event(
CoreMessage.ErrorMessage, {"workflow": rt.descriptor.name, "task": rt.id, "message": err.error}
CoreMessage.ErrorMessage, {"workflow": name, "task": rt.id, "message": err.error}
)

async def handle_action_info(self, info: ActionInfo) -> None:
if rt := self.tasks.get(info.task_id):
rt.handle_info(info)
name = rt.descriptor.name
await self.running_task_db.update_state(rt, info)
await self.event_sender.core_event(CoreEvent.ActionError, {"workflow": name, "message": info.message})
if info.level == "error":
await self.message_bus.emit_event(
CoreMessage.ErrorMessage, {"workflow": rt.descriptor.name, "task": rt.id, "message": info.message}
CoreMessage.ErrorMessage, {"workflow": name, "task": rt.id, "message": info.message}
)

async def handle_action_progress(self, info: ActionProgress) -> None:
Expand Down
23 changes: 21 additions & 2 deletions resotocore/tests/resotocore/task/task_handler_test.py
@@ -1,17 +1,19 @@
import asyncio
import logging
from datetime import timedelta
from typing import List

import pytest
from pytest import LogCaptureFixture

from resotocore.analytics import AnalyticsEventSender
from resotocore.analytics import AnalyticsEventSender, InMemoryEventSender
from resotocore.cli.cli import CLIService
from resotocore.db.jobdb import JobDb
from resotocore.db.runningtaskdb import RunningTaskDb
from resotocore.dependencies import empty_config
from resotocore.ids import SubscriberId, TaskDescriptorId
from resotocore.message_bus import MessageBus, Event, Message, ActionDone, Action
from resotocore.message_bus import MessageBus, Event, Message, ActionDone, Action, ActionInfo, ActionError
from resotocore.task import RunningTaskInfo
from resotocore.task.scheduler import Scheduler
from resotocore.task.subscribers import SubscriptionHandler
from resotocore.task.task_description import (
Expand Down Expand Up @@ -149,6 +151,23 @@ async def test_handle_failing_task_command(task_handler: TaskHandlerService, cap
assert "Command non_existing_command failed with error" in caplog.records[0].message


@pytest.mark.asyncio
async def test_handle_failing_actor(
task_handler: TaskHandlerService, test_workflow: Workflow, event_sender: InMemoryEventSender
) -> None:
sub = await task_handler.subscription_handler.add_subscription(
SubscriberId("sub_1"), "collect", True, timedelta(seconds=30)
)
info = await task_handler.start_task(test_workflow, "test")
assert info is not None
await asyncio.sleep(0.1)
task = info.running_task
await task_handler.handle_action_info(ActionInfo("collect", task.id, "act", sub.id, "error", "wrong!"))
assert [e.kind for e in event_sender.events] == ["task-handler.task-started", "error.action"]
await task_handler.handle_action_error(ActionError("collect", task.id, "act", sub.id, "wrong!"))
assert [e.kind for e in event_sender.events] == ["task-handler.task-started", "error.action", "error.action"]


@pytest.mark.asyncio
async def test_default_workflow_triggers() -> None:
workflows = {wf.name: wf for wf in TaskHandlerService.known_workflows(empty_config())}
Expand Down

0 comments on commit a9cb6df

Please sign in to comment.