-
Notifications
You must be signed in to change notification settings - Fork 149
/
tracer_middleware.rb
75 lines (67 loc) · 3.22 KB
/
tracer_middleware.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# frozen_string_literal: true
# Copyright The OpenTelemetry Authors
#
# SPDX-License-Identifier: Apache-2.0
module OpenTelemetry
  module Instrumentation
    module Sidekiq
      module Middlewares
        module Server
          # TracerMiddleware propagates context and instruments Sidekiq requests
          # by way of its middleware system.
          #
          # For every job it extracts the propagated context from the job payload,
          # opens a consumer span around the job execution and records the job's
          # 'created_at' and 'enqueued_at' timestamps as span events.
          class TracerMiddleware
            include ::Sidekiq::ServerMiddleware if defined?(::Sidekiq::ServerMiddleware)

            # Wraps the execution of a single job in a span.
            #
            # @param _worker [Object] the worker instance (unused)
            # @param msg [Hash] the job payload; reads the 'wrapped', 'class', 'jid',
            #   'queue', 'created_at' and 'enqueued_at' keys and is also handed to
            #   the propagator as the carrier
            # @param _queue [String] the queue name (unused; msg['queue'] is used instead)
            # @yield runs the rest of the middleware chain / the job itself
            # @return [Object] whatever the context/span wrappers return for the block
            # @raise [Exception] any exception raised by the block is re-raised
            def call(_worker, msg, _queue)
              config = instrumentation_config
              # 'wrapped' takes precedence over 'class' when present.
              job_class = msg['wrapped']&.to_s || msg['class']

              attributes = {
                SemanticConventions::Trace::MESSAGING_SYSTEM => 'sidekiq',
                'messaging.sidekiq.job_class' => job_class,
                SemanticConventions::Trace::MESSAGING_MESSAGE_ID => msg['jid'],
                SemanticConventions::Trace::MESSAGING_DESTINATION => msg['queue'],
                SemanticConventions::Trace::MESSAGING_DESTINATION_KIND => 'queue',
                SemanticConventions::Trace::MESSAGING_OPERATION => 'process'
              }
              peer_service = config[:peer_service]
              attributes[SemanticConventions::Trace::PEER_SERVICE] = peer_service if peer_service

              span_name = case config[:span_naming]
                          when :job_class then "#{job_class} process"
                          else "#{msg['queue']} process"
                          end

              extracted_context = OpenTelemetry.propagation.extract(msg)
              OpenTelemetry::Context.with_current(extracted_context) do
                if config[:propagation_style] == :child
                  # Span is started while the extracted context is current.
                  tracer.in_span(span_name, attributes: attributes, kind: :consumer) do |span|
                    add_lifecycle_events(span, msg)
                    yield
                  end
                else
                  # A root span is started instead; with the :link style the extracted
                  # span context is attached as a link when it is valid.
                  links = []
                  span_context = OpenTelemetry::Trace.current_span(extracted_context).context
                  links << OpenTelemetry::Trace::Link.new(span_context) if config[:propagation_style] == :link && span_context.valid?
                  span = tracer.start_root_span(span_name, attributes: attributes, links: links, kind: :consumer)
                  OpenTelemetry::Trace.with_span(span) do
                    add_lifecycle_events(span, msg)
                    yield
                  rescue Exception => e # rubocop:disable Lint/RescueException
                    # The span is managed manually here, so the failure has to be
                    # recorded by hand before it is propagated.
                    span.record_exception(e)
                    span.status = OpenTelemetry::Trace::Status.error("Unhandled exception of type: #{e.class}")
                    raise e
                  ensure
                    span.finish
                  end
                end
              end
            end

            private

            # Records the job's 'created_at' and 'enqueued_at' timestamps as span events.
            def add_lifecycle_events(span, msg)
              span.add_event('created_at', timestamp: time_from_timestamp(msg['created_at']))
              span.add_event('enqueued_at', timestamp: time_from_timestamp(msg['enqueued_at']))
            end

            # Normalizes a job timestamp to seconds since the epoch.
            #
            # Integer values are treated as epoch milliseconds (the format Sidekiq 8
            # documents for job timestamps) and converted to an exact Rational number
            # of seconds. Any other value (Float epoch seconds from older Sidekiq
            # versions, or nil when the key is absent) is returned unchanged.
            def time_from_timestamp(timestamp)
              timestamp.is_a?(Integer) ? Rational(timestamp, 1000) : timestamp
            end

            # Configuration hash of the Sidekiq instrumentation singleton.
            def instrumentation_config
              Sidekiq::Instrumentation.instance.config
            end

            # Tracer of the Sidekiq instrumentation singleton.
            def tracer
              Sidekiq::Instrumentation.instance.tracer
            end
          end
        end
      end
    end
  end
end