diff --git a/lib/ldclient-rb/events.rb b/lib/ldclient-rb/events.rb index 69563572..f57287a4 100644 --- a/lib/ldclient-rb/events.rb +++ b/lib/ldclient-rb/events.rb @@ -4,6 +4,23 @@ require "thread" require "time" +# +# Analytics event processing in the SDK involves several components. The purpose of this design is to +# minimize overhead on the application threads that are generating analytics events. +# +# EventProcessor receives an analytics event from the SDK client, on an application thread. It places +# the event in a bounded queue, the "inbox", and immediately returns. +# +# On a separate worker thread, EventDispatcher consumes events from the inbox. These are considered +# "input events" because they may or may not actually be sent to LaunchDarkly; most flag evaluation +# events are not sent, but are counted and the counters become part of a single summary event. +# EventDispatcher updates those counters, creates "index" events for any users that have not been seen +# recently, and places any events that will be sent to LaunchDarkly into the "outbox" queue. +# +# When it is time to flush events to LaunchDarkly, the contents of the outbox are handed off to +# another worker thread which sends the HTTP request. +# + module LaunchDarkly MAX_FLUSH_WORKERS = 5 CURRENT_SCHEMA_VERSION = 3 @@ -68,28 +85,30 @@ class StopMessage < SynchronousMessage # @private class EventProcessor def initialize(sdk_key, config, client = nil) - @queue = Queue.new + @logger = config.logger + @inbox = SizedQueue.new(config.capacity) @flush_task = Concurrent::TimerTask.new(execution_interval: config.flush_interval) do - @queue << FlushMessage.new + post_to_inbox(FlushMessage.new) end @flush_task.execute @users_flush_task = Concurrent::TimerTask.new(execution_interval: config.user_keys_flush_interval) do - @queue << FlushUsersMessage.new + post_to_inbox(FlushUsersMessage.new) end @users_flush_task.execute @stopped = Concurrent::AtomicBoolean.new(false) - - EventDispatcher.new(@queue, sdk_key, config, client) + @inbox_full = Concurrent::AtomicBoolean.new(false) + + EventDispatcher.new(@inbox, sdk_key, config, client) end def add_event(event) event[:creationDate] = (Time.now.to_f * 1000).to_i - @queue << EventMessage.new(event) + post_to_inbox(EventMessage.new(event)) end def flush # flush is done asynchronously - @queue << FlushMessage.new + post_to_inbox(FlushMessage.new) end def stop @@ -97,9 +116,11 @@ def stop if @stopped.make_true @flush_task.shutdown @users_flush_task.shutdown - @queue << FlushMessage.new + # Note that here we are not calling post_to_inbox, because we *do* want to wait if the inbox + # is full; an orderly shutdown can't happen unless these messages are received. + @inbox << FlushMessage.new stop_msg = StopMessage.new - @queue << stop_msg + @inbox << stop_msg stop_msg.wait_for_completion end end @@ -107,14 +128,30 @@ def stop # exposed only for testing def wait_until_inactive sync_msg = TestSyncMessage.new - @queue << sync_msg + @inbox << sync_msg sync_msg.wait_for_completion end + + private + + def post_to_inbox(message) + begin + @inbox.push(message, non_block=true) + rescue ThreadError + # If the inbox is full, it means the EventDispatcher thread is seriously backed up with not-yet-processed + # events. This is unlikely, but if it happens, it means the application is probably doing a ton of flag + # evaluations across many threads-- so if we wait for a space in the inbox, we risk a very serious slowdown + # of the app. To avoid that, we'll just drop the event. 
+        # The log warning about this will only be shown once.
+        if @inbox_full.make_true
+          @logger.warn { "[LDClient] Events are being produced faster than they can be processed; some events will be dropped" }
+        end
+      end
+    end
   end
 
   # @private
   class EventDispatcher
-    def initialize(queue, sdk_key, config, client)
+    def initialize(inbox, sdk_key, config, client)
       @sdk_key = sdk_key
       @config = config
 
@@ -129,10 +166,10 @@ def initialize(queue, sdk_key, config, client)
       @disabled = Concurrent::AtomicBoolean.new(false)
       @last_known_past_time = Concurrent::AtomicReference.new(0)
 
-      buffer = EventBuffer.new(config.capacity, config.logger)
+      outbox = EventBuffer.new(config.capacity, config.logger)
       flush_workers = NonBlockingThreadPool.new(MAX_FLUSH_WORKERS)
 
-      Thread.new { main_loop(queue, buffer, flush_workers) }
+      Thread.new { main_loop(inbox, outbox, flush_workers) }
     end
 
     private
 
@@ -141,16 +178,16 @@ def now_millis()
       (Time.now.to_f * 1000).to_i
     end
 
-    def main_loop(queue, buffer, flush_workers)
+    def main_loop(inbox, outbox, flush_workers)
       running = true
       while running do
         begin
-          message = queue.pop
+          message = inbox.pop
           case message
           when EventMessage
-            dispatch_event(message.event, buffer)
+            dispatch_event(message.event, outbox)
           when FlushMessage
-            trigger_flush(buffer, flush_workers)
+            trigger_flush(outbox, flush_workers)
           when FlushUsersMessage
             @user_keys.clear
           when TestSyncMessage
@@ -181,11 +218,11 @@ def synchronize_for_testing(flush_workers)
       flush_workers.wait_all
     end
 
-    def dispatch_event(event, buffer)
+    def dispatch_event(event, outbox)
       return if @disabled.value
 
       # Always record the event in the summary.
-      buffer.add_to_summary(event)
+      outbox.add_to_summary(event)
 
       # Decide whether to add the event to the payload. Feature events may be added twice, once for
       # the event (if tracked) and once for debugging.
@@ -205,7 +242,7 @@
       # an identify event for that user.
       if !(will_add_full_event && @config.inline_users_in_events)
         if event.has_key?(:user) && !notice_user(event[:user]) && event[:kind] != "identify"
-          buffer.add_event({
+          outbox.add_event({
             kind: "index",
             creationDate: event[:creationDate],
             user: event[:user]
@@ -213,8 +250,8 @@
         end
       end
 
-      buffer.add_event(event) if will_add_full_event
-      buffer.add_event(debug_event) if !debug_event.nil?
+      outbox.add_event(event) if will_add_full_event
+      outbox.add_event(debug_event) if !debug_event.nil?
     end
 
     # Add to the set of users we've noticed, and return true if the user was already known to us.
@@ -236,12 +273,12 @@ def should_debug_event(event)
       end
     end
 
-    def trigger_flush(buffer, flush_workers)
+    def trigger_flush(outbox, flush_workers)
       if @disabled.value
         return
       end
 
-      payload = buffer.get_payload
+      payload = outbox.get_payload
       if !payload.events.empty? || !payload.summary.counters.empty?
         # If all available worker threads are busy, success will be false and no job will be queued.
         success = flush_workers.post do
@@ -252,7 +289,7 @@
             Util.log_exception(@config.logger, "Unexpected error in event processor", e)
           end
         end
-        buffer.clear if success # Reset our internal state, these events now belong to the flush worker
+        outbox.clear if success # Reset our internal state, these events now belong to the flush worker
       end
     end
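The non-blocking inbox hand-off added in this diff can be illustrated with a small, self-contained Ruby sketch. The names here (TinyInbox, drain, the :stop symbol) are hypothetical, not part of the SDK; what the sketch mirrors is the mechanism in post_to_inbox and stop: SizedQueue#push with non_block set raises ThreadError when the queue is full, and Concurrent::AtomicBoolean#make_true returns true only on the first transition, so the warning is logged once.

require "concurrent"
require "logger"

class TinyInbox
  def initialize(capacity, logger)
    @queue = SizedQueue.new(capacity)
    @logger = logger
    @full_warned = Concurrent::AtomicBoolean.new(false)
    @worker = Thread.new { drain }
  end

  # Called from application threads; never blocks the caller.
  def post(message)
    @queue.push(message, true) # non_block = true: raise ThreadError instead of waiting
  rescue ThreadError
    # make_true succeeds only once, so the warning is not repeated.
    @logger.warn("inbox full; dropping messages") if @full_warned.make_true
  end

  # Shutdown uses a blocking push, as in EventProcessor#stop: an orderly
  # shutdown must not drop its own control message.
  def stop
    @queue << :stop
    @worker.join
  end

  private

  def drain
    loop do
      message = @queue.pop
      break if message == :stop
      # consume the message: update summary counters, append to an outbox, etc.
    end
  end
end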
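The index-event logic in dispatch_event reduces to a similar sketch. TinyDispatcher and @seen_keys are hypothetical names; the real code bounds the set of noticed users and clears it whenever a FlushUsersMessage arrives, while this version uses a plain Set for brevity.

require "set"

class TinyDispatcher
  def initialize(outbox)
    @outbox = outbox
    @seen_keys = Set.new
  end

  def dispatch(event)
    user = event[:user]
    # Set#add? returns nil if the key was already present, so the body runs
    # only for users not seen recently. Identify events already carry the
    # full user, so they never need a separate index event.
    if user && @seen_keys.add?(user[:key]) && event[:kind] != "identify"
      @outbox.add_event(kind: "index", creationDate: event[:creationDate], user: user)
    end
    @outbox.add_event(event)
  end
end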
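Finally, the flush hand-off depends on a pool whose post reports whether a worker was free, which is what lets trigger_flush clear the outbox only when the payload has actually changed hands. One plausible way to build such a pool on concurrent-ruby (BoundedPool is a hypothetical name; the SDK's actual NonBlockingThreadPool may be implemented differently) is a semaphore guarding a FixedThreadPool:

require "concurrent"

class BoundedPool
  def initialize(threads)
    @semaphore = Concurrent::Semaphore.new(threads)
    @pool = Concurrent::FixedThreadPool.new(threads)
  end

  # Returns false immediately if every worker is busy; otherwise runs the
  # block on a worker thread and returns true.
  def post(&job)
    return false unless @semaphore.try_acquire(1)
    @pool.post do
      begin
        job.call
      ensure
        @semaphore.release(1)
      end
    end
    true
  end
end

The caller-side contract then matches the comment in trigger_flush: success = pool.post { send_payload(payload) }, and the buffer is cleared only if success, since those events now belong to the flush worker.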