Skip to content

Commit

Permalink
Report async queries lock wait duration
Browse files Browse the repository at this point in the history
This duration is very important to figure wether the `load_async`
actually improved something.
  • Loading branch information
byroot committed Jun 25, 2021
1 parent 6fa41fc commit 99913f6
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 43 deletions.
41 changes: 35 additions & 6 deletions activerecord/lib/active_record/future_result.rb
Expand Up @@ -2,10 +2,34 @@

module ActiveRecord
class FutureResult # :nodoc:
class EventBuffer
def initialize(future_result, instrumenter)
@future_result = future_result
@instrumenter = instrumenter
@events = []
end

def instrument(name, payload = {}, &block)
event = @instrumenter.new_event(name, payload)
@events << event
event.record(&block)
end

def flush
events, @events = @events, []
events.each do |event|
event.payload[:lock_wait] = @future_result.lock_wait
ActiveSupport::Notifications.publish_event(event)
end
end
end

Canceled = Class.new(ActiveRecordError)

delegate :empty?, :to_a, to: :result

attr_reader :lock_wait

def initialize(pool, *args, **kwargs)
@mutex = Mutex.new

Expand Down Expand Up @@ -43,7 +67,7 @@ def execute_or_skip
return unless @mutex.try_lock
begin
if pending?
@event_buffer = @instrumenter.buffer
@event_buffer = EventBuffer.new(self, @instrumenter)
connection.with_instrumenter(@event_buffer) do
execute_query(connection, async: true)
end
Expand Down Expand Up @@ -77,12 +101,17 @@ def canceled?
end

def execute_or_wait
return unless pending?

@mutex.synchronize do
if pending?
execute_query(@pool.connection)
if pending?
start = Concurrent.monotonic_time
@mutex.synchronize do
if pending?
execute_query(@pool.connection)
else
@lock_wait = (Concurrent.monotonic_time - start) * 1_000
end
end
else
@lock_wait = 0.0
end
end

Expand Down
7 changes: 5 additions & 2 deletions activerecord/lib/active_record/log_subscriber.rb
Expand Up @@ -37,9 +37,12 @@ def sql(event)

return if IGNORE_PAYLOAD_NAMES.include?(payload[:name])

name = "#{payload[:name]} (#{event.duration.round(1)}ms)"
name = if payload[:async]
"ASYNC #{payload[:name]} (#{payload[:lock_wait].round(1)}ms) (db time #{event.duration.round(1)}ms)"
else
"#{payload[:name]} (#{event.duration.round(1)}ms)"
end
name = "CACHE #{name}" if payload[:cached]
name = "ASYNC #{name}" if payload[:async]
sql = payload[:sql]
binds = nil

Expand Down
6 changes: 6 additions & 0 deletions activerecord/test/cases/log_subscriber_test.rb
Expand Up @@ -133,6 +133,12 @@ def test_basic_payload_name_logging_coloration_named_sql
end
end

def test_async_query
logger = TestDebugLogSubscriber.new
logger.sql(Event.new(0.9, sql: "SELECT * from models", name: "Model Load", async: true, lock_wait: 0.01))
assert_match(/ASYNC Model Load \(0\.0ms\) \(db time 0\.9ms\) SELECT/i, logger.debugs.last)
end

def test_query_logging_coloration_with_nested_select
logger = TestDebugLogSubscriber.new
logger.colorize_logging = true
Expand Down
6 changes: 6 additions & 0 deletions activerecord/test/cases/relation/load_async_test.rb
Expand Up @@ -62,6 +62,7 @@ def test_notification_forwarding
status[:executed] = true
status[:async] = event.payload[:async]
status[:thread_id] = Thread.current.object_id
status[:lock_wait] = event.payload[:lock_wait]
end
end

Expand All @@ -70,6 +71,11 @@ def test_notification_forwarding
assert_equal expected_records, deferred_posts.to_a
assert_equal Post.connection.supports_concurrent_connections?, status[:async]
assert_equal Thread.current.object_id, status[:thread_id]
if Post.connection.supports_concurrent_connections?
assert_instance_of Float, status[:lock_wait]
else
assert_nil status[:lock_wait]
end
ensure
ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber
end
Expand Down
24 changes: 0 additions & 24 deletions activesupport/lib/active_support/notifications/instrumenter.rb
Expand Up @@ -6,26 +6,6 @@ module ActiveSupport
module Notifications
# Instrumenters are stored in a thread local.
class Instrumenter
class Buffer # :nodoc:
def initialize(instrumenter)
@instrumenter = instrumenter
@events = []
end

def instrument(name, payload = {}, &block)
event = @instrumenter.new_event(name, payload)
@events << event
event.record(&block)
end

def flush
events, @events = @events, []
events.each do |event|
ActiveSupport::Notifications.publish_event(event)
end
end
end

attr_reader :id

def initialize(notifier)
Expand Down Expand Up @@ -55,10 +35,6 @@ def new_event(name, payload = {}) # :nodoc:
Event.new(name, nil, nil, @id, payload)
end

def buffer # :nodoc:
Buffer.new(self)
end

# Send a start notification with +name+ and +payload+.
def start(name, payload)
@notifier.start name, @id, payload
Expand Down
11 changes: 0 additions & 11 deletions activesupport/test/log_subscriber_test.rb
Expand Up @@ -107,17 +107,6 @@ def test_does_not_send_the_event_if_logger_is_nil
end
end

def test_does_not_send_buffered_events_if_logger_is_nil
ActiveSupport::LogSubscriber.logger = nil
assert_not_called(@log_subscriber, :some_event) do
ActiveSupport::LogSubscriber.attach_to :my_log_subscriber, @log_subscriber
buffer = ActiveSupport::Notifications.instrumenter.buffer
buffer.instrument "some_event.my_log_subscriber"
buffer.flush
wait
end
end

def test_does_not_fail_with_non_namespaced_events
ActiveSupport::LogSubscriber.attach_to :my_log_subscriber, @log_subscriber
instrument "whatever"
Expand Down

0 comments on commit 99913f6

Please sign in to comment.