diff --git a/activerecord/lib/active_record/future_result.rb b/activerecord/lib/active_record/future_result.rb index a45b1006953de..de9f228030cab 100644 --- a/activerecord/lib/active_record/future_result.rb +++ b/activerecord/lib/active_record/future_result.rb @@ -2,10 +2,34 @@ module ActiveRecord class FutureResult # :nodoc: + class EventBuffer + def initialize(future_result, instrumenter) + @future_result = future_result + @instrumenter = instrumenter + @events = [] + end + + def instrument(name, payload = {}, &block) + event = @instrumenter.new_event(name, payload) + @events << event + event.record(&block) + end + + def flush + events, @events = @events, [] + events.each do |event| + event.payload[:lock_wait] = @future_result.lock_wait + ActiveSupport::Notifications.publish_event(event) + end + end + end + Canceled = Class.new(ActiveRecordError) delegate :empty?, :to_a, to: :result + attr_reader :lock_wait + def initialize(pool, *args, **kwargs) @mutex = Mutex.new @@ -43,7 +67,7 @@ def execute_or_skip return unless @mutex.try_lock begin if pending? - @event_buffer = @instrumenter.buffer + @event_buffer = EventBuffer.new(self, @instrumenter) connection.with_instrumenter(@event_buffer) do execute_query(connection, async: true) end @@ -77,12 +101,17 @@ def canceled? end def execute_or_wait - return unless pending? - - @mutex.synchronize do - if pending? - execute_query(@pool.connection) + if pending? + start = Concurrent.monotonic_time + @mutex.synchronize do + if pending? + execute_query(@pool.connection) + else + @lock_wait = (Concurrent.monotonic_time - start) * 1_000 + end end + else + @lock_wait = 0.0 end end diff --git a/activerecord/lib/active_record/log_subscriber.rb b/activerecord/lib/active_record/log_subscriber.rb index 502d945b5b99e..d3a335d8d307d 100644 --- a/activerecord/lib/active_record/log_subscriber.rb +++ b/activerecord/lib/active_record/log_subscriber.rb @@ -37,9 +37,12 @@ def sql(event) return if IGNORE_PAYLOAD_NAMES.include?(payload[:name]) - name = "#{payload[:name]} (#{event.duration.round(1)}ms)" + name = if payload[:async] + "ASYNC #{payload[:name]} (#{payload[:lock_wait].round(1)}ms) (db time #{event.duration.round(1)}ms)" + else + "#{payload[:name]} (#{event.duration.round(1)}ms)" + end name = "CACHE #{name}" if payload[:cached] - name = "ASYNC #{name}" if payload[:async] sql = payload[:sql] binds = nil diff --git a/activerecord/test/cases/log_subscriber_test.rb b/activerecord/test/cases/log_subscriber_test.rb index b984c59bbd81d..c34eef35625db 100644 --- a/activerecord/test/cases/log_subscriber_test.rb +++ b/activerecord/test/cases/log_subscriber_test.rb @@ -133,6 +133,12 @@ def test_basic_payload_name_logging_coloration_named_sql end end + def test_async_query + logger = TestDebugLogSubscriber.new + logger.sql(Event.new(0.9, sql: "SELECT * from models", name: "Model Load", async: true, lock_wait: 0.01)) + assert_match(/ASYNC Model Load \(0\.0ms\) \(db time 0\.9ms\) SELECT/i, logger.debugs.last) + end + def test_query_logging_coloration_with_nested_select logger = TestDebugLogSubscriber.new logger.colorize_logging = true diff --git a/activerecord/test/cases/relation/load_async_test.rb b/activerecord/test/cases/relation/load_async_test.rb index a8d6dfc61565d..4e4c6fe1a7fac 100644 --- a/activerecord/test/cases/relation/load_async_test.rb +++ b/activerecord/test/cases/relation/load_async_test.rb @@ -62,6 +62,7 @@ def test_notification_forwarding status[:executed] = true status[:async] = event.payload[:async] status[:thread_id] = Thread.current.object_id + status[:lock_wait] = event.payload[:lock_wait] end end @@ -70,6 +71,11 @@ def test_notification_forwarding assert_equal expected_records, deferred_posts.to_a assert_equal Post.connection.supports_concurrent_connections?, status[:async] assert_equal Thread.current.object_id, status[:thread_id] + if Post.connection.supports_concurrent_connections? + assert_instance_of Float, status[:lock_wait] + else + assert_nil status[:lock_wait] + end ensure ActiveSupport::Notifications.unsubscribe(subscriber) if subscriber end diff --git a/activesupport/lib/active_support/notifications/instrumenter.rb b/activesupport/lib/active_support/notifications/instrumenter.rb index 834f6cb1d0d70..8546346ac37d0 100644 --- a/activesupport/lib/active_support/notifications/instrumenter.rb +++ b/activesupport/lib/active_support/notifications/instrumenter.rb @@ -6,26 +6,6 @@ module ActiveSupport module Notifications # Instrumenters are stored in a thread local. class Instrumenter - class Buffer # :nodoc: - def initialize(instrumenter) - @instrumenter = instrumenter - @events = [] - end - - def instrument(name, payload = {}, &block) - event = @instrumenter.new_event(name, payload) - @events << event - event.record(&block) - end - - def flush - events, @events = @events, [] - events.each do |event| - ActiveSupport::Notifications.publish_event(event) - end - end - end - attr_reader :id def initialize(notifier) @@ -55,10 +35,6 @@ def new_event(name, payload = {}) # :nodoc: Event.new(name, nil, nil, @id, payload) end - def buffer # :nodoc: - Buffer.new(self) - end - # Send a start notification with +name+ and +payload+. def start(name, payload) @notifier.start name, @id, payload diff --git a/activesupport/test/log_subscriber_test.rb b/activesupport/test/log_subscriber_test.rb index 0f51c6aaa7da9..d4bd082defac9 100644 --- a/activesupport/test/log_subscriber_test.rb +++ b/activesupport/test/log_subscriber_test.rb @@ -107,17 +107,6 @@ def test_does_not_send_the_event_if_logger_is_nil end end - def test_does_not_send_buffered_events_if_logger_is_nil - ActiveSupport::LogSubscriber.logger = nil - assert_not_called(@log_subscriber, :some_event) do - ActiveSupport::LogSubscriber.attach_to :my_log_subscriber, @log_subscriber - buffer = ActiveSupport::Notifications.instrumenter.buffer - buffer.instrument "some_event.my_log_subscriber" - buffer.flush - wait - end - end - def test_does_not_fail_with_non_namespaced_events ActiveSupport::LogSubscriber.attach_to :my_log_subscriber, @log_subscriber instrument "whatever"