Skip to content

Commit e890871

Browse files
committed
feat: track ActiveRecord async queries
1 parent 3bb36a6 commit e890871

File tree

8 files changed

+336
-2
lines changed

8 files changed

+336
-2
lines changed

Gemfile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,4 @@ source 'https://rubygems.org'
99
gem 'rake', '~> 13.0'
1010
gem 'rubocop', '~> 1.69.1'
1111
gem 'rubocop-performance', '~> 1.23.0'
12+
gem 'opentelemetry-instrumentation-concurrent_ruby'

instrumentation/active_record/Appraisals

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.1.0')
1616
appraise "activerecord-#{version}" do
1717
gem 'sqlite3', '~> 1.4'
1818
gem 'activerecord', "~> #{version}"
19+
20+
# ActiveRecord async query instrumentation should work with or without this present
21+
remove_gem 'opentelemetry-instrumentation-concurrent_ruby'
1922
end
2023
end
2124

instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/instrumentation.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
module OpenTelemetry
88
module Instrumentation
99
module ActiveRecord
10+
QUERY_SPAN_NAME_KEY = OpenTelemetry::Context.create_key('async_query_span_name')
11+
1012
# The Instrumentation class contains logic to detect and install the ActiveRecord instrumentation
1113
class Instrumentation < OpenTelemetry::Instrumentation::Base
1214
MINIMUM_VERSION = Gem::Version.new('7')
@@ -39,6 +41,7 @@ def require_dependencies
3941
require_relative 'patches/transactions_class_methods'
4042
require_relative 'patches/validations'
4143
require_relative 'patches/relation_persistence'
44+
require_relative 'patches/async_query_context_propagation'
4245
end
4346

4447
def patch_activerecord
@@ -55,6 +58,10 @@ def patch_activerecord
5558
::ActiveRecord::Base.prepend(Patches::Validations)
5659

5760
::ActiveRecord::Relation.prepend(Patches::RelationPersistence)
61+
62+
::ActiveRecord::ConnectionAdapters::ConnectionPool.prepend(Patches::AsyncQueryContextPropagation) unless defined?(OpenTelemetry::Instrumentation::ConcurrentRuby::Instrumentation)
63+
64+
::ActiveRecord::FutureResult.prepend(Patches::FutureResultExtensions)
5865
end
5966
end
6067
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
module OpenTelemetry
8+
module Instrumentation
9+
module ActiveRecord
10+
QUERY_SPAN_NAME_KEY = OpenTelemetry::Context.create_key('async_query_span_name')
11+
12+
module Patches
13+
# Module to prepend to ActiveRecord::ConnectionAdapters::ConnectionPool.
14+
# This is only installed if OpenTelemetry::Instrumentation::ConcurrentRuby is not defined.
15+
module AsyncQueryContextPropagation
16+
def schedule_query(future_result) # :nodoc:
17+
context = OpenTelemetry::Context.current
18+
19+
@async_executor.post do
20+
# This can happen in the request thread, in the case of a busy executor (fallback_action is executed.)
21+
OpenTelemetry::Context.with_current(context) do
22+
future_result.execute_or_skip
23+
end
24+
end
25+
26+
Thread.pass
27+
end
28+
end
29+
30+
# Module to support otel context propagation to ActiveRecord::FutureResults
31+
module FutureResultExtensions
32+
OTEL_QUERY_SPAN_NAME_IVAR = :@__otel_query_span_name
33+
34+
def initialize(...)
35+
super
36+
37+
if (query_span_name = OpenTelemetry::Context.current.value(QUERY_SPAN_NAME_KEY))
38+
instance_variable_set(OTEL_QUERY_SPAN_NAME_IVAR, query_span_name)
39+
end
40+
end
41+
42+
private
43+
44+
def execute_query(connection, async: false)
45+
name = instance_variable_get(OTEL_QUERY_SPAN_NAME_IVAR) || @args[1] || 'execute_query'
46+
Instrumentation.instance.tracer.in_span(name, attributes: { 'async' => async }) do
47+
super
48+
end
49+
end
50+
end
51+
end
52+
end
53+
end
54+
end

instrumentation/active_record/lib/opentelemetry/instrumentation/active_record/patches/querying.rb

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,11 @@ module ClassMethods
2121
method_name = ::ActiveRecord.version >= Gem::Version.new('7.0.0') ? :_query_by_sql : :find_by_sql
2222

2323
define_method(method_name) do |*args, **kwargs, &block|
24-
tracer.in_span("#{self} query") do
25-
super(*args, **kwargs, &block)
24+
query_span_name = "#{self} query"
25+
OpenTelemetry::Context.with_value(QUERY_SPAN_NAME_KEY, query_span_name) do
26+
tracer.in_span(kwargs[:async] ? "schedule #{query_span_name}" : query_span_name) do
27+
super(*args, **kwargs, &block)
28+
end
2629
end
2730
end
2831

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
# frozen_string_literal: true
2+
3+
# Copyright The OpenTelemetry Authors
4+
#
5+
# SPDX-License-Identifier: Apache-2.0
6+
7+
require 'test_helper'
8+
9+
require_relative '../../../../lib/opentelemetry/instrumentation/active_record'
10+
require_relative '../../../../lib/opentelemetry/instrumentation/active_record/patches/async_query_context_propagation'
11+
12+
ASYNC_TEST_LOGGER = Logger.new($stdout).tap { |logger| logger.level = Logger::WARN }
13+
14+
describe OpenTelemetry::Instrumentation::ActiveRecord::Patches::AsyncQueryContextPropagation do
15+
let(:exporter) { EXPORTER }
16+
let(:unfiltered_spans) { exporter.finished_spans }
17+
let(:instrumentation) { OpenTelemetry::Instrumentation::ActiveRecord::Instrumentation.instance }
18+
let(:logger) { ASYNC_TEST_LOGGER }
19+
20+
before do
21+
exporter.reset
22+
setup_asynchronous_queries_session
23+
User.create!
24+
end
25+
26+
after do
27+
teardown_asynchronous_queries_session
28+
29+
ActiveRecord::Base.subclasses.each do |model|
30+
model.connection.truncate(model.table_name)
31+
end
32+
end
33+
34+
def setup_asynchronous_queries_session
35+
@_async_queries_session = ActiveRecord::Base.asynchronous_queries_tracker.start_session
36+
end
37+
38+
def teardown_asynchronous_queries_session
39+
args = ActiveRecord::VERSION::MAJOR >= 8 ? [true] : []
40+
ActiveRecord::Base.asynchronous_queries_tracker.finalize_session(*args) if @_async_queries_session
41+
end
42+
43+
def run_async_load
44+
logger.debug('>>> run async load')
45+
in_new_trace do
46+
OpenTelemetry::Context.with_value(SpanThreadIdTracking::TRACK_THREAD_ID, true) do
47+
instrumentation.tracer.in_span('test_wrapper') do
48+
if block_given?
49+
yield
50+
else
51+
users = User.all.load_async
52+
sleep(0.5)
53+
logger.debug('>>> async #to_a')
54+
users.to_a
55+
end
56+
end
57+
end
58+
end
59+
end
60+
61+
def in_new_trace(&block)
62+
OpenTelemetry::Context.with_current(OpenTelemetry::Context::ROOT, &block)
63+
end
64+
65+
def spans
66+
test_wrapper_span = unfiltered_spans.find { |span| span.name == 'test_wrapper' }
67+
unfiltered_spans.select { |span| span.trace_id == test_wrapper_span.trace_id }
68+
end
69+
70+
def span_names
71+
spans.map(&:name).sort
72+
end
73+
74+
# call with block_queue: true to completely block the executor (no tasks can be enqueued),
75+
# or with block_queue: false to block the workers only (tasks still accepted in the queue)
76+
def with_busy_executor(block_queue: true)
77+
_(ActiveRecord.async_query_executor).must_equal :global_thread_pool
78+
79+
mutex = Mutex.new
80+
condvar = ConditionVariable.new
81+
executor = ActiveRecord.instance_variable_get(:@global_thread_pool_async_query_executor)
82+
83+
task_count = executor.max_length
84+
task_count += executor.max_queue if block_queue
85+
86+
awaiting_signals = (0...task_count).to_a
87+
88+
# Fill up the max thread count and queue with tasks that
89+
# will never complete until they are signaled to do so.
90+
task_count.times do |n|
91+
executor.post do
92+
mutex.synchronize do
93+
ASYNC_TEST_LOGGER.debug("task #{n} waiting...")
94+
condvar.wait(mutex)
95+
ASYNC_TEST_LOGGER.debug("task #{n} got the signal")
96+
awaiting_signals.delete(n)
97+
end
98+
end
99+
end
100+
101+
logger.debug("yielding... block_queue=#{block_queue}")
102+
yield
103+
logger.debug('...done!')
104+
ensure
105+
logger.debug('cleaning up...')
106+
# clean up the queue
107+
mutex.synchronize { condvar.signal } until awaiting_signals.empty?
108+
end
109+
110+
def current_thread_id
111+
Thread.current.object_id
112+
end
113+
114+
def execute_query_span
115+
spans.find { |span| span.name == 'User query' }
116+
end
117+
118+
it 'async_query' do
119+
run_async_load
120+
121+
_(span_names).must_equal(['test_wrapper', 'User query', 'schedule User query'].sort)
122+
_(execute_query_span.attributes['__test_only_thread_id']).wont_equal(current_thread_id)
123+
_(execute_query_span.attributes['async']).must_equal(true)
124+
end
125+
126+
describe 'no executor' do
127+
before do
128+
@async_query_executor_was = ActiveRecord.async_query_executor
129+
ActiveRecord.async_query_executor = nil
130+
end
131+
132+
after do
133+
ActiveRecord.async_query_executor = @async_query_executor_was
134+
end
135+
136+
it 'is not actually async' do
137+
run_async_load # sic
138+
139+
_(spans.map(&:name)).wont_include('Schedule User query')
140+
_(spans.map(&:name)).must_include('User query')
141+
142+
user_query = spans.find { |span| span.name == 'User query' }
143+
_(user_query.attributes['async']).must_equal(false) if user_query.attributes.key?('async')
144+
_(span_names).must_equal(['User query', 'test_wrapper'].sort)
145+
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id)
146+
end
147+
end
148+
149+
it 'async_query_blocked_executor' do
150+
with_busy_executor { run_async_load }
151+
152+
# In this case the wrapped task is executed as the 'fallback_action' by the thread pool executor,
153+
# so we get the async span, even though it is not actually async.
154+
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id)
155+
156+
skip(<<~SKIP)
157+
`async` _should_ be false here, but it's executed as a fallback action and
158+
is incorrectly set to `true`.
159+
160+
Whether or not this is actually an issue is up for debate;
161+
it's true that the query would have been async if the global pool load was lower,
162+
so it could be said that the benefit of attempting to enqueue the task
163+
is measured in degrees, ranging from no benefit to saving the entire time of the query.
164+
165+
However, the _other_ scenario in which the task is enqueued but not yet worked on
166+
causes `async` to be false.
167+
168+
Ultimately, however, this is a bug in Rails's instrumentation around async queries,
169+
so it doesn't feel particularly pressing to solve it here with a bunch of
170+
otherwise unecessary patches.
171+
SKIP
172+
173+
_(execute_query_span.attributes['async']).must_equal(false)
174+
end
175+
176+
it 'async_query_slow_executor' do
177+
# executor accepts task, but doesn't fulfill it before the waiter
178+
with_busy_executor(block_queue: false) do
179+
run_async_load
180+
end
181+
182+
# When #to_a is called, the query is still pending and hasn't been picked up,
183+
# so AR executes is synchronously. The executor task is cancelled (or should be?),
184+
# so this span won't be here.
185+
_(execute_query_span.attributes['async']).must_equal(false)
186+
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id)
187+
_(span_names).must_equal(['User query', 'schedule User query', 'test_wrapper'])
188+
end
189+
190+
it 'async_query_no_wait' do
191+
run_async_load do
192+
User.all.load_async.to_a
193+
end
194+
195+
# here we call #to_a inline, so it (maybe) executes before the async scheduler
196+
# could assign the task to a worker. This expectation will not always pass,
197+
# but it remains here to exhaust the possible async execution scenarios.
198+
skip('this expectation is allowed to fail') if execute_query_span.attributes['async']
199+
200+
_(execute_query_span.attributes['async']).must_equal(false)
201+
_(execute_query_span.attributes['__test_only_thread_id']).must_equal(current_thread_id)
202+
end
203+
204+
it 'async_count' do
205+
if User.respond_to?(:async_count)
206+
run_async_load do
207+
count = User.async_count
208+
sleep(0.5)
209+
count.value
210+
end
211+
212+
count_span = spans.find { |span| span.name == 'User Count' }
213+
_(count_span.attributes['async']).must_equal(true)
214+
else
215+
skip("async_count not supported in ActiveRecord #{ActiveRecord::VERSION::STRING}")
216+
end
217+
end
218+
219+
it 'works with concurrent queries' do
220+
Account.create!
221+
222+
run_async_load do
223+
users = User.all.load_async
224+
accounts = Account.all.load_async
225+
226+
sleep(0.5)
227+
228+
users.to_a
229+
accounts.to_a
230+
end
231+
232+
user_schedule_span = spans.find { |span| span.name == 'schedule User query' }
233+
account_schedule_span = spans.find { |span| span.name == 'schedule Account query' }
234+
user_query_span = spans.find { |span| span.name == 'User query' }
235+
account_query_span = spans.find { |span| span.name == 'Account query' }
236+
test_wrapper_span = spans.find { |span| span.name == 'test_wrapper' }
237+
238+
_(user_schedule_span.parent_span_id).must_equal(test_wrapper_span.span_id)
239+
_(account_schedule_span.parent_span_id).must_equal(test_wrapper_span.span_id)
240+
241+
_(user_query_span.parent_span_id).must_equal(user_schedule_span.span_id)
242+
_(account_query_span.parent_span_id).must_equal(account_schedule_span.span_id)
243+
244+
_(user_query_span.attributes['async']).must_equal(true)
245+
_(account_query_span.attributes['async']).must_equal(true)
246+
end
247+
end

instrumentation/active_record/test/instrumentation/active_record/patches/querying_test.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
let(:spans) { exporter.finished_spans }
1515

1616
before { exporter.reset }
17+
1718
after do
1819
ActiveRecord::Base.subclasses.each do |model|
1920
model.connection.truncate(model.table_name)

instrumentation/active_record/test/test_helper.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
ActiveRecord::Base.logger = logger
3030
ActiveRecord::Migration.verbose = false
3131

32+
ActiveRecord.async_query_executor = :global_thread_pool
33+
3234
ActiveRecord::Base.establish_connection(
3335
adapter: 'sqlite3',
3436
database: 'db/development.sqlite3'
@@ -84,3 +86,19 @@ def change
8486
end
8587

8688
Minitest.after_run { CreateUserTable.migrate(:down) }
89+
90+
# Used in async tests to determine what thread spawned which span
91+
module SpanThreadIdTracking
92+
TRACK_THREAD_ID = OpenTelemetry::Context.create_key(:track_thread_id)
93+
94+
def internal_start_span(name, kind, attributes, links, start_timestamp, parent_context, instrumentation_scope) # rubocop: disable Metrics/ParameterLists
95+
if parent_context.value(TRACK_THREAD_ID)
96+
attributes ||= {}
97+
attributes['__test_only_thread_id'] = Thread.current.object_id
98+
end
99+
100+
super
101+
end
102+
end
103+
104+
OpenTelemetry::SDK::Trace::TracerProvider.prepend(SpanThreadIdTracking)

0 commit comments

Comments
 (0)