/
process_manager.ex
67 lines (56 loc) · 1.76 KB
/
process_manager.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
defmodule OpentelemetryCommanded.ProcessManager do
@moduledoc false
require OpenTelemetry.Tracer
import OpentelemetryCommanded.Util
alias OpenTelemetry.Tracer
# Attaches this module's telemetry handlers for the `:start`, `:stop` and
# `:exception` phases of `[:commanded, :process_manager, :handle, ...]`.
#
# Each handler is registered under the id `{__MODULE__, phase}` with an empty
# handler config. Handlers are attached in the order start, stop, exception,
# and the value returned is whatever the final `:telemetry.attach/4` call
# returned.
def setup do
  # Handlers are passed as external captures (`&Mod.fun/arity`).
  handlers = [
    start: &__MODULE__.handle_start/4,
    stop: &__MODULE__.handle_stop/4,
    exception: &__MODULE__.handle_exception/4
  ]

  # The accumulator is discarded on every step, so the fold yields the result
  # of the last attach call only.
  Enum.reduce(handlers, nil, fn {phase, handler}, _previous ->
    :telemetry.attach(
      {__MODULE__, phase},
      [:commanded, :process_manager, :handle, phase],
      handler,
      []
    )
  end)
end
# Telemetry handler for `[:commanded, :process_manager, :handle, :start]`.
#
# Reads the recorded event and process manager details from `meta` and starts
# a `:CONSUMER` span named "commanded:process_manager:handle" carrying them as
# attributes. The span's parent is whatever `decode_ctx/1` produces from the
# "trace_ctx" entry of the event metadata.
# NOTE(review): `decode_ctx/1` is not defined in this file; presumably it comes
# from the imported `OpentelemetryCommanded.Util` - confirm.
#
# The event name, measurements and handler config arguments are ignored.
# Returns the result of `Tracer.start_span/2`.
def handle_start(_event, _, meta, _) do
  recorded = meta.recorded_event

  span_attributes = [
    {:application, meta.application},
    {:"process_manager.uuid", meta.process_uuid},
    {:"process_manager.name", meta.process_manager_name},
    {:"process_manager.module", meta.process_manager_module},
    {:"event.id", recorded.event_id},
    {:"event.number", recorded.event_number},
    {:"event.type", recorded.event_type},
    {:"correlation.id", recorded.correlation_id},
    {:"causation.id", recorded.causation_id},
    {:"stream.id", recorded.stream_id},
    {:"stream.version", recorded.stream_version}
  ]

  # Decoded after the attributes are built, matching the original evaluation order.
  parent_ctx = decode_ctx(recorded.metadata["trace_ctx"])

  Tracer.start_span(
    "commanded:process_manager:handle",
    %{kind: :CONSUMER, parent: parent_ctx, attributes: span_attributes}
  )
end
# Telemetry handler for `[:commanded, :process_manager, :handle, :stop]`.
#
# Sets the `command.count` attribute to the number of entries under
# `:commands` in `meta` (zero when the key is absent), then calls
# `Tracer.end_span/0`. The event name, measurements and handler config
# arguments are ignored.
def handle_stop(_event, _measurements, meta, _) do
  command_count =
    meta
    |> Map.get(:commands, [])
    |> Enum.count()

  Tracer.set_attribute(:"command.count", command_count)
  Tracer.end_span()
end
# Telemetry handler for `[:commanded, :process_manager, :handle, :exception]`.
#
# Sets `error: true` and an `error.exception` attribute holding the inspected
# failure term, then calls `Tracer.end_span/0`. The event name, measurements
# and handler config arguments are ignored.
#
# The failure term is taken from `meta[:error]` when present; otherwise it
# falls back to `meta[:reason]`, so the attribute is not recorded as "nil"
# when only `:reason` is supplied.
# NOTE(review): Commanded's telemetry guide appears to list `:kind`, `:reason`
# and `:stacktrace` for exception events - confirm against the Commanded
# version in use.
def handle_exception(_event, _measurements, meta, _) do
  failure =
    case meta[:error] do
      nil -> meta[:reason]
      error -> error
    end

  Tracer.set_attributes(error: true, "error.exception": inspect(failure))
  Tracer.end_span()
end
end