From 341056738dfa6c98c24ef622bf54a24d2a272506 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Mon, 9 Dec 2019 14:24:49 -0800 Subject: [PATCH 01/23] refactor for unicorn threads --- lib/optimizely/event/batch_event_processor.rb | 54 +++++++++---------- 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index b49a55f7..fc613d25 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -107,36 +107,12 @@ def stop! private - def run - # if we receive a number of item nils that reach MAX_NIL_COUNT, - # then we hang on the pop via setting use_pop to false - @nil_count = 0 - # hang on pop if true - @use_pop = false - loop do - if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline - @logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.') - flush_queue! - @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval - @use_pop = true if @nil_count > MAX_NIL_COUNT - end - - item = @event_queue.pop if @event_queue.length.positive? || @use_pop - - if item.nil? - # when nil count is greater than MAX_NIL_COUNT, we hang on the pop until there is an item available. - # this avoids to much spinning of the loop. - @nil_count += 1 - next - end - - # reset nil_count and use_pop if we have received an item. - @nil_count = 0 - @use_pop = false - + def process_events + while @event_queue.length.positive? + item = @event_queue.pop if item == SHUTDOWN_SIGNAL @logger.log(Logger::DEBUG, 'Received shutdown signal.') - break + return false end if item == FLUSH_SIGNAL @@ -147,9 +123,29 @@ def run add_to_batch(item) if item.is_a? Optimizely::UserEvent end + return true + end + + def run + loop do + if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline + @logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.') + + break unless process_events + + flush_queue! + @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval + end + + break unless process_events + + interval = (Helpers::DateTimeUtils.create_timestamp - @flushing_interval_deadline)/1.0 + + sleep interval if interval > 0 + end rescue SignalException @logger.log(Logger::ERROR, 'Interrupted while processing buffer.') - rescue Exception => e + rescue => e @logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}") ensure @logger.log( From bdb018daf239bea62eab7748e02a7152ecf81aea Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Mon, 9 Dec 2019 14:36:37 -0800 Subject: [PATCH 02/23] lint errors --- lib/optimizely/event/batch_event_processor.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index fc613d25..3584a68d 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -123,7 +123,7 @@ def process_events add_to_batch(item) if item.is_a? Optimizely::UserEvent end - return true + true end def run @@ -139,9 +139,9 @@ def run break unless process_events - interval = (Helpers::DateTimeUtils.create_timestamp - @flushing_interval_deadline)/1.0 + interval = (Helpers::DateTimeUtils.create_timestamp - @flushing_interval_deadline) / 1.0 - sleep interval if interval > 0 + sleep interval if interval.positive? end rescue SignalException @logger.log(Logger::ERROR, 'Interrupted while processing buffer.') From 7d457b3979aedc3c616de6ec82c2f1369dfcd0ed Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Mon, 9 Dec 2019 14:53:56 -0800 Subject: [PATCH 03/23] remove rescue Exception. --- .rubocop.yml | 2 +- lib/optimizely/event/batch_event_processor.rb | 4 ++-- spec/event/batch_event_processor_spec.rb | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.rubocop.yml b/.rubocop.yml index d05c3a81..65bf129f 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -46,7 +46,7 @@ Style/SignalException: Enabled: false Lint/RescueException: - Enabled: false + Enabled: true Layout/EndOfLine: EnforcedStyle: lf diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 3584a68d..061f0805 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -90,8 +90,8 @@ def process(user_event) begin @event_queue.push(user_event, true) - rescue Exception - @logger.log(Logger::WARN, 'Payload not accepted by the queue.') + rescue => e + @logger.log(Logger::WARN, 'Payload not accepted by the queue. ' + e.message) return end end diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 7cd86d0b..7878fb0a 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -62,7 +62,7 @@ @event_processor.process(conversion_event) # flush interval is set to 100ms. Wait for 300ms and assert that event is dispatched. - sleep 1 + sleep 100 expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once expect(@notification_center).to have_received(:send_notifications).with( From 33d4ec59f56dd84b73efb8a2092a479ba35c59dd Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Mon, 9 Dec 2019 15:07:15 -0800 Subject: [PATCH 04/23] more robust error message on queue full --- lib/optimizely/event/batch_event_processor.rb | 2 +- spec/event/batch_event_processor_spec.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 061f0805..c7103470 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -91,7 +91,7 @@ def process(user_event) begin @event_queue.push(user_event, true) rescue => e - @logger.log(Logger::WARN, 'Payload not accepted by the queue. ' + e.message) + @logger.log(Logger::WARN, 'Payload not accepted by the queue: ' + e.message) return end end diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 7878fb0a..9f9eb8b4 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -344,7 +344,7 @@ # Wait until other thread has processed the event. while @event_processor.current_batch.length != 10; end expect(@event_dispatcher).not_to have_received(:dispatch_event) - expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue.').once + expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue: queue full').once end it 'should not process and log when Executor is not running' do From 4784a7944e7ec752e4d5ebf7328c411aa94312ed Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Mon, 9 Dec 2019 15:12:07 -0800 Subject: [PATCH 05/23] rename methods for clarity --- lib/optimizely/event/batch_event_processor.rb | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index c7103470..f47f3a28 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -72,7 +72,7 @@ def start! end @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval @logger.log(Logger::INFO, 'Starting scheduler.') - @thread = Thread.new { run } + @thread = Thread.new { run_queue } @started = true end @@ -107,7 +107,7 @@ def stop! private - def process_events + def process_queue while @event_queue.length.positive? item = @event_queue.pop if item == SHUTDOWN_SIGNAL @@ -126,18 +126,18 @@ def process_events true end - def run + def run_queue loop do if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline @logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.') - break unless process_events + break unless process_queue flush_queue! @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval end - break unless process_events + break unless process_queue interval = (Helpers::DateTimeUtils.create_timestamp - @flushing_interval_deadline) / 1.0 From 2d22d0a8c727efe3d274a669058f40b0c822d1ad Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 11 Dec 2019 10:16:31 -0800 Subject: [PATCH 06/23] fix loop --- lib/optimizely/event/batch_event_processor.rb | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index f47f3a28..f74a20ff 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -83,7 +83,7 @@ def flush def process(user_event) @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}") - if !@started || !@thread.alive? + unless @started @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.') return end @@ -139,7 +139,7 @@ def run_queue break unless process_queue - interval = (Helpers::DateTimeUtils.create_timestamp - @flushing_interval_deadline) / 1.0 + interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) / 1000.0 sleep interval if interval.positive? end @@ -148,10 +148,7 @@ def run_queue rescue => e @logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}") ensure - @logger.log( - Logger::INFO, - 'Exiting processing loop. Attempting to flush pending events.' - ) + @logger.log(Logger::INFO, 'Exiting processing loop. Attempting to flush pending events.') flush_queue! end From 779ed8da47c410b186bcfe984b1c33d7f65baf60 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 11 Dec 2019 12:35:26 -0800 Subject: [PATCH 07/23] fix for unicorn --- lib/optimizely/event/batch_event_processor.rb | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index f74a20ff..b61b1ed1 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -31,7 +31,6 @@ class BatchEventProcessor < EventProcessor DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds DEFAULT_QUEUE_CAPACITY = 1000 DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds - MAX_NIL_COUNT = 3 FLUSH_SIGNAL = 'FLUSH_SIGNAL' SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL' @@ -62,7 +61,7 @@ def initialize( @notification_center = notification_center @current_batch = [] @started = false - start! + @stopped = false end def start! @@ -74,6 +73,7 @@ def start! @logger.log(Logger::INFO, 'Starting scheduler.') @thread = Thread.new { run_queue } @started = true + @stopped = false end def flush @@ -83,10 +83,7 @@ def flush def process(user_event) @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}") - unless @started - @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.') - return - end + start! unless @started || @stopped begin @event_queue.push(user_event, true) @@ -103,6 +100,7 @@ def stop! @event_queue << SHUTDOWN_SIGNAL @thread.join(DEFAULT_TIMEOUT_INTERVAL) @started = false + @stopped = true end private @@ -139,7 +137,10 @@ def run_queue break unless process_queue - interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) / 1000.0 + interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) + + interval = interval / 5.0 if interval == @flush_interval + interval = interval * 0.001 sleep interval if interval.positive? end From 974546a3bce4317157c7f9eaacfc7c577d100a59 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 11 Dec 2019 13:56:08 -0800 Subject: [PATCH 08/23] testing --- lib/optimizely/event/batch_event_processor.rb | 17 ++++++++++++++--- .../event/forwarding_event_processor.rb | 2 +- spec/event/batch_event_processor_spec.rb | 6 +++++- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index b61b1ed1..9d2dd837 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -83,7 +83,14 @@ def flush def process(user_event) @logger.log(Logger::DEBUG, "Received userEvent: #{user_event}") - start! unless @started || @stopped + # if the processor has been explicitly stopped. Don't accept tasks + if @stopped + @logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.') + return + end + + # start if the processor hasn't been started + start! unless @started begin @event_queue.push(user_event, true) @@ -137,10 +144,14 @@ def run_queue break unless process_queue + # what is the current interval to flush interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) - interval = interval / 5.0 if interval == @flush_interval - interval = interval * 0.001 + # lets sleep a quarter + interval /= 4.0 + + # convert to seconds from milliseconds + interval *= 0.001 sleep interval if interval.positive? end diff --git a/lib/optimizely/event/forwarding_event_processor.rb b/lib/optimizely/event/forwarding_event_processor.rb index 8970f301..113193d8 100644 --- a/lib/optimizely/event/forwarding_event_processor.rb +++ b/lib/optimizely/event/forwarding_event_processor.rb @@ -30,7 +30,7 @@ def process(user_event) log_event = Optimizely::EventFactory.create_log_event(user_event, @logger) begin - @event_dispatcher.dispatch_event(log_event) + Thread.new { @event_dispatcher.dispatch_event(log_event) } @notification_center&.send_notifications( NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], log_event diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 9f9eb8b4..02163131 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -27,11 +27,12 @@ describe Optimizely::BatchEventProcessor do let(:config_body_JSON) { OptimizelySpec::VALID_CONFIG_BODY_JSON } let(:error_handler) { Optimizely::NoOpErrorHandler.new } - let(:spy_logger) { spy('logger') } + let(:spy_logger) { spy('logger')} let(:project_config) { Optimizely::DatafileProjectConfig.new(config_body_JSON, spy_logger, error_handler) } let(:event) { project_config.get_event_from_key('test_event') } before(:example) do + spy_logger = spy('logger') @event_queue = SizedQueue.new(100) @event_dispatcher = Optimizely::EventDispatcher.new allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) @@ -46,6 +47,8 @@ it 'should log waring when service is already started' do @event_processor = Optimizely::BatchEventProcessor.new(logger: spy_logger) @event_processor.start! + @event_processor.start! + expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Service already started.').once end @@ -355,6 +358,7 @@ logger: spy_logger ) + @event_processor.start! @event_processor.stop! user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) From 0a6febf509007204779242255bbf01092e73c940 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 11 Dec 2019 14:55:08 -0800 Subject: [PATCH 09/23] fix lint error --- lib/optimizely/event/batch_event_processor.rb | 3 +-- spec/event/batch_event_processor_spec.rb | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 9d2dd837..8a49dac5 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -147,8 +147,7 @@ def run_queue # what is the current interval to flush interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) - # lets sleep a quarter - interval /= 4.0 + interval /= 5.0 # convert to seconds from milliseconds interval *= 0.001 diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 02163131..14e7d081 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -27,7 +27,7 @@ describe Optimizely::BatchEventProcessor do let(:config_body_JSON) { OptimizelySpec::VALID_CONFIG_BODY_JSON } let(:error_handler) { Optimizely::NoOpErrorHandler.new } - let(:spy_logger) { spy('logger')} + let(:spy_logger) { spy('logger') } let(:project_config) { Optimizely::DatafileProjectConfig.new(config_body_JSON, spy_logger, error_handler) } let(:event) { project_config.get_event_from_key('test_event') } From 40db2091aaca5393aceae62101e86f6d0084fdbd Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 11 Dec 2019 17:00:30 -0800 Subject: [PATCH 10/23] fix unit tests --- lib/optimizely/event/batch_event_processor.rb | 2 +- .../event/forwarding_event_processor.rb | 17 +++-- spec/event/batch_event_processor_spec.rb | 64 +++++++++++++------ spec/event/forwarding_event_processor_spec.rb | 2 + 4 files changed, 59 insertions(+), 26 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 8a49dac5..97260135 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -147,7 +147,7 @@ def run_queue # what is the current interval to flush interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) - interval /= 5.0 + interval /= 10.0 # convert to seconds from milliseconds interval *= 0.001 diff --git a/lib/optimizely/event/forwarding_event_processor.rb b/lib/optimizely/event/forwarding_event_processor.rb index 113193d8..7ddbd2cb 100644 --- a/lib/optimizely/event/forwarding_event_processor.rb +++ b/lib/optimizely/event/forwarding_event_processor.rb @@ -30,11 +30,18 @@ def process(user_event) log_event = Optimizely::EventFactory.create_log_event(user_event, @logger) begin - Thread.new { @event_dispatcher.dispatch_event(log_event) } - @notification_center&.send_notifications( - NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], - log_event - ) + Thread.new { + begin + @event_dispatcher.dispatch_event(log_event) + @notification_center&.send_notifications( + NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], + log_event + ) + rescue StandardError => e + @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") + end + + } rescue StandardError => e @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") end diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 14e7d081..aae94a9f 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -65,7 +65,7 @@ @event_processor.process(conversion_event) # flush interval is set to 100ms. Wait for 300ms and assert that event is dispatched. - sleep 100 + sleep 0.1 expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once expect(@notification_center).to have_received(:send_notifications).with( @@ -79,7 +79,7 @@ @event_processor = Optimizely::BatchEventProcessor.new( event_dispatcher: @event_dispatcher, batch_size: 11, - flush_interval: 100_000, + flush_interval: 10_000, logger: spy_logger ) @@ -95,8 +95,12 @@ end # Wait until other thread has processed the event. - until @event_processor.event_queue.empty?; end - until @event_processor.current_batch.empty?; end + until @event_processor.event_queue.empty?; + sleep 0.1 + end + until @event_processor.current_batch.empty?; + sleep 0.1 + end expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once expect(@event_dispatcher).to have_received(:dispatch_event).with( @@ -112,7 +116,7 @@ @event_processor = Optimizely::BatchEventProcessor.new( event_queue: @event_queue, event_dispatcher: @event_dispatcher, - flush_interval: 100_000, + flush_interval: 10_000, logger: spy_logger ) @@ -123,8 +127,12 @@ @event_processor.flush # Wait until other thread has processed the event. - until @event_processor.event_queue.empty?; end - until @event_processor.current_batch.empty?; end + until @event_processor.event_queue.empty?; + sleep 0.1 + end + until @event_processor.current_batch.empty?; + sleep 0.1 + end expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice expect(@event_processor.event_queue.length).to eq(0) @@ -146,13 +154,17 @@ expect(user_event1.event_context[:revision]).to eq('1') @event_processor.process(user_event1) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 1; end + while @event_processor.current_batch.length != 1; + sleep 0.1 + end expect(user_event2.event_context[:revision]).to eq('2') @event_processor.process(user_event2) @event_processor.process(user_event2) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 2; end + while @event_processor.current_batch.length != 2; + sleep 0.1 + end expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once @@ -173,13 +185,17 @@ expect(user_event1.event_context[:project_id]).to eq('X') @event_processor.process(user_event1) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 1; end + while @event_processor.current_batch.length != 1; + sleep 0.1 + end expect(user_event2.event_context[:project_id]).to eq('Y') @event_processor.process(user_event2) @event_processor.process(user_event2) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 2; end + while @event_processor.current_batch.length != 2; + sleep 0.1 + end expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once @@ -255,10 +271,14 @@ @event_processor.process(conversion_event) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 1; end + while @event_processor.current_batch.length != 1; + sleep 0.1 + end @event_processor.flush # Wait until other thread has processed the event. - until @event_processor.current_batch.empty?; end + until @event_processor.current_batch.empty?; + sleep 0.1 + end expect(@notification_center).to have_received(:send_notifications).with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], @@ -284,10 +304,14 @@ @event_processor.process(conversion_event) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 1; end + while @event_processor.current_batch.length != 1; + sleep 0.1 + end @event_processor.flush # Wait until other thread has processed the event. - until @event_processor.current_batch.empty?; end + until @event_processor.current_batch.empty?; + sleep 0.1 + end expect(@notification_center).not_to have_received(:send_notifications) expect(spy_logger).to have_received(:log).once.with( @@ -318,7 +342,9 @@ end # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 4; end + while @event_processor.current_batch.length != 4; + sleep(0.1) + end expect(@event_dispatcher).not_to have_received(:dispatch_event) @event_processor.stop! @@ -335,7 +361,7 @@ event_queue: SizedQueue.new(10), event_dispatcher: @event_dispatcher, batch_size: 100, - flush_interval: 100_000, + flush_interval: 10_000, logger: spy_logger ) @@ -344,8 +370,6 @@ @event_processor.process(user_event) end - # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 10; end expect(@event_dispatcher).not_to have_received(:dispatch_event) expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue: queue full').once end @@ -354,7 +378,7 @@ @event_processor = Optimizely::BatchEventProcessor.new( event_dispatcher: @event_dispatcher, batch_size: 100, - flush_interval: 100_000, + flush_interval: 10_000, logger: spy_logger ) diff --git a/spec/event/forwarding_event_processor_spec.rb b/spec/event/forwarding_event_processor_spec.rb index f58d0e90..24e38b8d 100644 --- a/spec/event/forwarding_event_processor_spec.rb +++ b/spec/event/forwarding_event_processor_spec.rb @@ -104,6 +104,8 @@ forwarding_event_processor.process(@conversion_event) + sleep 0.1 + expect(notification_center).not_to have_received(:send_notifications) expect(spy_logger).to have_received(:log).once.with( From b1f60dc4dc3d381a82728bfe0e46a045e1a744be Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 11 Dec 2019 17:31:50 -0800 Subject: [PATCH 11/23] fix lint errors --- .../event/forwarding_event_processor.rb | 11 ++-- spec/event/batch_event_processor_spec.rb | 56 ++++++------------- 2 files changed, 21 insertions(+), 46 deletions(-) diff --git a/lib/optimizely/event/forwarding_event_processor.rb b/lib/optimizely/event/forwarding_event_processor.rb index 7ddbd2cb..36034013 100644 --- a/lib/optimizely/event/forwarding_event_processor.rb +++ b/lib/optimizely/event/forwarding_event_processor.rb @@ -30,18 +30,15 @@ def process(user_event) log_event = Optimizely::EventFactory.create_log_event(user_event, @logger) begin - Thread.new { + Thread.new do begin @event_dispatcher.dispatch_event(log_event) - @notification_center&.send_notifications( - NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], - log_event - ) + + @notification_center&.send_notifications(NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], log_event) rescue StandardError => e @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") end - - } + end rescue StandardError => e @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") end diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index aae94a9f..1b9b8a35 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -95,12 +95,9 @@ end # Wait until other thread has processed the event. - until @event_processor.event_queue.empty?; - sleep 0.1 - end - until @event_processor.current_batch.empty?; - sleep 0.1 - end + sleep 0.1 until @event_processor.event_queue.empty? + + sleep 0.1 until @event_processor.current_batch.empty? expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once expect(@event_dispatcher).to have_received(:dispatch_event).with( @@ -127,12 +124,9 @@ @event_processor.flush # Wait until other thread has processed the event. - until @event_processor.event_queue.empty?; - sleep 0.1 - end - until @event_processor.current_batch.empty?; - sleep 0.1 - end + sleep 0.1 until @event_processor.event_queue.empty? + + sleep 0.1 until @event_processor.current_batch.empty? expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice expect(@event_processor.event_queue.length).to eq(0) @@ -154,17 +148,13 @@ expect(user_event1.event_context[:revision]).to eq('1') @event_processor.process(user_event1) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 1; - sleep 0.1 - end + sleep 0.1 while @event_processor.current_batch.length != 1 expect(user_event2.event_context[:revision]).to eq('2') @event_processor.process(user_event2) @event_processor.process(user_event2) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 2; - sleep 0.1 - end + sleep 0.1 while @event_processor.current_batch.length != 2 expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once @@ -185,17 +175,13 @@ expect(user_event1.event_context[:project_id]).to eq('X') @event_processor.process(user_event1) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 1; - sleep 0.1 - end + sleep 0.1 while @event_processor.current_batch.length != 1 expect(user_event2.event_context[:project_id]).to eq('Y') @event_processor.process(user_event2) @event_processor.process(user_event2) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 2; - sleep 0.1 - end + sleep 0.1 while @event_processor.current_batch.length != 2 expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once @@ -271,14 +257,11 @@ @event_processor.process(conversion_event) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 1; - sleep 0.1 - end + sleep 0.1 while @event_processor.current_batch.length != 1 + @event_processor.flush # Wait until other thread has processed the event. - until @event_processor.current_batch.empty?; - sleep 0.1 - end + sleep 0.1 until @event_processor.current_batch.empty? expect(@notification_center).to have_received(:send_notifications).with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], @@ -304,14 +287,11 @@ @event_processor.process(conversion_event) # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 1; - sleep 0.1 - end + sleep 0.1 while @event_processor.current_batch.length != 1 + @event_processor.flush # Wait until other thread has processed the event. - until @event_processor.current_batch.empty?; - sleep 0.1 - end + sleep 0.1 until @event_processor.current_batch.empty? expect(@notification_center).not_to have_received(:send_notifications) expect(spy_logger).to have_received(:log).once.with( @@ -342,9 +322,7 @@ end # Wait until other thread has processed the event. - while @event_processor.current_batch.length != 4; - sleep(0.1) - end + sleep 0.1 while @event_processor.current_batch.length != 4 expect(@event_dispatcher).not_to have_received(:dispatch_event) @event_processor.stop! From af7fb6f70edb4129d1185c4a3490e577c6730b0d Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 11:36:51 -0800 Subject: [PATCH 12/23] use mutex and resource to wait for flush interval or be signalled when there is something to process --- lib/optimizely/event/batch_event_processor.rb | 29 +++++++++++++------ spec/event/batch_event_processor_spec.rb | 2 +- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 97260135..53153409 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -71,6 +71,10 @@ def start! end @flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval @logger.log(Logger::INFO, 'Starting scheduler.') + if @wait_mutex.nil? + @wait_mutex = Mutex.new + @resource = ConditionVariable.new + end @thread = Thread.new { run_queue } @started = true @stopped = false @@ -78,6 +82,9 @@ def start! def flush @event_queue << FLUSH_SIGNAL + @wait_mutex.synchronize { + @resource.signal + } end def process(user_event) @@ -94,6 +101,9 @@ def process(user_event) begin @event_queue.push(user_event, true) + @wait_mutex.synchronize { + @resource.signal + } rescue => e @logger.log(Logger::WARN, 'Payload not accepted by the queue: ' + e.message) return @@ -105,6 +115,9 @@ def stop! @logger.log(Logger::INFO, 'Stopping scheduler.') @event_queue << SHUTDOWN_SIGNAL + @wait_mutex.synchronize { + @resource.signal + } @thread.join(DEFAULT_TIMEOUT_INTERVAL) @started = false @stopped = true @@ -144,15 +157,13 @@ def run_queue break unless process_queue - # what is the current interval to flush - interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) - - interval /= 10.0 - - # convert to seconds from milliseconds - interval *= 0.001 - - sleep interval if interval.positive? + # what is the current interval to flush in seconds + interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) * 0.001 + if interval.positive? + @wait_mutex.synchronize { + @resource.wait(@wait_mutex, interval) + } + end end rescue SignalException @logger.log(Logger::ERROR, 'Interrupted while processing buffer.') diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 1b9b8a35..28553fff 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -344,7 +344,7 @@ ) user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - 11.times do + 12.times do @event_processor.process(user_event) end From 245d067246d0813d7129c88c85b160e061f6d463 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Thu, 12 Dec 2019 11:47:09 -0800 Subject: [PATCH 13/23] fix lint --- lib/optimizely/event/batch_event_processor.rb | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/lib/optimizely/event/batch_event_processor.rb b/lib/optimizely/event/batch_event_processor.rb index 53153409..76e49293 100644 --- a/lib/optimizely/event/batch_event_processor.rb +++ b/lib/optimizely/event/batch_event_processor.rb @@ -82,9 +82,7 @@ def start! def flush @event_queue << FLUSH_SIGNAL - @wait_mutex.synchronize { - @resource.signal - } + @wait_mutex.synchronize { @resource.signal } end def process(user_event) @@ -101,9 +99,7 @@ def process(user_event) begin @event_queue.push(user_event, true) - @wait_mutex.synchronize { - @resource.signal - } + @wait_mutex.synchronize { @resource.signal } rescue => e @logger.log(Logger::WARN, 'Payload not accepted by the queue: ' + e.message) return @@ -115,9 +111,7 @@ def stop! @logger.log(Logger::INFO, 'Stopping scheduler.') @event_queue << SHUTDOWN_SIGNAL - @wait_mutex.synchronize { - @resource.signal - } + @wait_mutex.synchronize { @resource.signal } @thread.join(DEFAULT_TIMEOUT_INTERVAL) @started = false @stopped = true @@ -159,11 +153,10 @@ def run_queue # what is the current interval to flush in seconds interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) * 0.001 - if interval.positive? - @wait_mutex.synchronize { - @resource.wait(@wait_mutex, interval) - } - end + + next unless interval.positive? + + @wait_mutex.synchronize { @resource.wait(@wait_mutex, interval) } end rescue SignalException @logger.log(Logger::ERROR, 'Interrupted while processing buffer.') From 05c8aee6fc7ac17ece2732f3b3cf5e1aece4ef3e Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Tue, 17 Dec 2019 14:52:01 -0800 Subject: [PATCH 14/23] trying to cleanup last few unit tests --- spec/event/forwarding_event_processor_spec.rb | 2 + spec/project_spec.rb | 51 ++++++++++++++++--- 2 files changed, 46 insertions(+), 7 deletions(-) diff --git a/spec/event/forwarding_event_processor_spec.rb b/spec/event/forwarding_event_processor_spec.rb index 24e38b8d..22e340c1 100644 --- a/spec/event/forwarding_event_processor_spec.rb +++ b/spec/event/forwarding_event_processor_spec.rb @@ -78,6 +78,8 @@ forwarding_event_processor.process(@conversion_event) + sleep 0.1 + expect(notification_center).to have_received(:send_notifications).with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers) diff --git a/spec/project_spec.rb b/spec/project_spec.rb index ecfa7da3..b0c1af9a 100644 --- a/spec/project_spec.rb +++ b/spec/project_spec.rb @@ -187,6 +187,7 @@ class InvalidErrorHandler; end stub_request(:post, impression_log_url).with(query: params) expect(project_instance.activate('test_experiment', 'test_user')).to eq('control') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, params, post_headers)).once expect(project_instance.decision_service.bucketer).to have_received(:bucket).once end @@ -205,6 +206,7 @@ class InvalidErrorHandler; end stub_request(:post, impression_log_url).with(query: params) expect(project_instance.activate('test_experiment', 'test_user')).to eq('control') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, params, post_headers)).once end @@ -296,6 +298,7 @@ class InvalidErrorHandler; end # Should be included via exact match string audience with id '3468206642' expect(@project_typed_audience_instance.activate('typed_audience_experiment', 'test_user', 'house' => 'Gryffindor')) .to eq('A') + sleep 0.1 expect(@project_typed_audience_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, params, post_headers)).once expect(@project_typed_audience_instance.decision_service.bucketer).to have_received(:bucket).once end @@ -371,6 +374,7 @@ class InvalidErrorHandler; end expect(@project_typed_audience_instance.activate('audience_combinations_experiment', 'test_user', user_attributes)) .to eq('A') + sleep 0.1 expect(@project_typed_audience_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, params, post_headers)).once expect(@project_typed_audience_instance.decision_service.bucketer).to have_received(:bucket).once end @@ -432,6 +436,7 @@ class InvalidErrorHandler; end expect(project_instance.activate('test_experiment_with_audience', 'test_user', attributes)) .to eq('control_with_audience') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, params, post_headers)).once expect(project_instance.decision_service.bucketer).to have_received(:bucket).once end @@ -492,6 +497,7 @@ class InvalidErrorHandler; end project_instance.decision_service.set_forced_variation(project_config, 'test_experiment_with_audience', 'test_user', 'variation_with_audience') variation_to_return = project_instance.decision_service.get_forced_variation(project_config, 'test_experiment', 'test_user') allow(project_instance.decision_service.bucketer).to receive(:bucket).and_return(variation_to_return) + sleep 0.1 allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) expect(project_instance.activate('test_experiment_with_audience', 'test_user', 'browser_type' => 'firefox')) @@ -595,6 +601,7 @@ def callback(_args); end method(:callback) ) variation_to_return = project_config.get_variation_from_id('test_experiment', '111128') + sleep 0.1 allow(project_instance.decision_service.bucketer).to receive(:bucket).and_return(variation_to_return) allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) allow(project_config).to receive(:get_audience_ids_for_experiment) @@ -602,6 +609,8 @@ def callback(_args); end .and_return([]) experiment = project_config.get_experiment_from_key('test_experiment') + # sleep because forwarder is wrapped in a thread + sleep 0.1 # Decision listener expect(project_instance.notification_center).to receive(:send_notifications).with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:DECISION], any_args @@ -621,6 +630,8 @@ def callback(_args); end project_instance.activate('test_experiment', 'test_user') + sleep 0.1 + expect(spy_logger).to have_received(:log).once.with(Logger::INFO, "Activating user 'test_user' in experiment 'test_experiment'.") end @@ -634,6 +645,7 @@ def callback(_args); end allow(project_instance.decision_service.bucketer).to receive(:bucket).and_return(variation_to_return) allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(any_args).and_raise(RuntimeError) project_instance.activate('test_experiment', 'test_user') + sleep 0.1 expect(spy_logger).to have_received(:log).once.with(Logger::ERROR, "Error dispatching event: #{log_event} RuntimeError.") end @@ -661,6 +673,7 @@ def callback(_args); end expect(project_instance.activate('test_experiment_with_audience', 'forced_audience_user', 'browser_type' => 'wrong_browser')) .to eq('variation_with_audience') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, params, post_headers)).once expect(Optimizely::Audience).to_not have_received(:user_in_experiment?) end @@ -712,6 +725,7 @@ def callback(_args); end expect(project_instance.notification_center).to receive(:send_notifications).ordered project_instance.activate('test_experiment', 'test_user') + sleep 0.1 end end @@ -720,7 +734,7 @@ def callback(_args); end WebMock.allow_net_connect! notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler) - expect(notification_center).to receive(:send_notifications).with( + expect(notification_center).to receive(:send_notifications).once.with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:OPTIMIZELY_CONFIG_UPDATE] ).ordered @@ -728,8 +742,9 @@ def callback(_args); end expect(notification_center).to receive(:send_notifications).ordered http_project_config_manager = Optimizely::HTTPProjectConfigManager.new( + logger:spy_logger, url: 'https://cdn.optimizely.com/datafiles/QBw9gFM8oTn7ogY9ANCC1z.json', - notification_center: notification_center + notification_center: notification_center, ) project_instance = Optimizely::Project.new( @@ -741,18 +756,20 @@ def callback(_args); end expect(http_project_config_manager.config).not_to eq(nil) expect(project_instance.activate('checkout_flow_experiment', 'test_user')).not_to eq(nil) + sleep 0.5 end it 'should update config, send update notification when sdk key is provided' do WebMock.allow_net_connect! notification_center = Optimizely::NotificationCenter.new(spy_logger, error_handler) + allow(notification_center).to receive(:send_notifications) expect(notification_center).to receive(:send_notifications).with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:OPTIMIZELY_CONFIG_UPDATE] - ).ordered + ) - expect(notification_center).to receive(:send_notifications).ordered - expect(notification_center).to receive(:send_notifications).ordered + expect(notification_center).to receive(:send_notifications) + expect(notification_center).to receive(:send_notifications) http_project_config_manager = Optimizely::HTTPProjectConfigManager.new( sdk_key: 'QBw9gFM8oTn7ogY9ANCC1z', @@ -768,6 +785,7 @@ def callback(_args); end expect(http_project_config_manager.config).not_to eq(nil) expect(project_instance.activate('checkout_flow_experiment', 'test_user')).not_to eq(nil) + sleep 0.5 end end @@ -792,6 +810,7 @@ def callback(_args); end expect(project_instance.is_valid).to be true expect(project_instance.activate('checkout_flow_experiment', 'test_user')).not_to eq(nil) + sleep 0.5 end end end @@ -859,6 +878,7 @@ def callback(_args); end allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) project_instance.track('test_event', 'test_user') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once end @@ -866,6 +886,7 @@ def callback(_args); end project_instance.decision_service.set_forced_variation(project_config, 'test_experiment', 'test_user', 'variation') allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) project_instance.track('test_event', 'test_user') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, @expected_track_event_params, post_headers)).once end @@ -878,6 +899,7 @@ def callback(_args); end project_instance.decision_service.set_forced_variation(project_config, 'test_experiment', 'test_user', 'variation') allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) project_instance.track('test_event', 'test_user', nil, revenue: 42) + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once end @@ -889,6 +911,7 @@ def callback(_args); end allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(any_args).and_raise(RuntimeError) project_instance.track('test_event', 'test_user') + sleep 0.1 expect(spy_logger).to have_received(:log).once.with(Logger::ERROR, "Error dispatching event: #{log_event} RuntimeError.") end @@ -917,6 +940,7 @@ def callback(_args); end ).ordered project_instance.track('test_event', 'test_user', nil, 'revenue' => 42) + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once end @@ -933,6 +957,7 @@ def callback(_args); end allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) project_instance.track('test_event_with_audience', 'test_user', 'browser_type' => 'firefox') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once end @@ -980,6 +1005,7 @@ def callback(_args); end # Should be included via substring match string audience with id '3988293898' allow(@project_typed_audience_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) @project_typed_audience_instance.track('item_bought', 'test_user', 'house' => 'Welcome to Slytherin!') + sleep 0.1 expect(@project_typed_audience_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, @expected_event_params, post_headers)).once end @@ -988,6 +1014,7 @@ def callback(_args); end params[:visitors][0][:attributes][0][:value] = 'Welcome to Hufflepuff!' allow(@project_typed_audience_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) @project_typed_audience_instance.track('item_bought', 'test_user', 'house' => 'Welcome to Hufflepuff!') + sleep 0.1 expect(@project_typed_audience_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once end @@ -1018,6 +1045,7 @@ def callback(_args); end params[:visitors][0][:snapshots][0][:events][0][:key] = 'user_signed_up' allow(@project_typed_audience_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) @project_typed_audience_instance.track('user_signed_up', 'test_user', user_attributes) + sleep 0.1 expect(@project_typed_audience_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once end end @@ -1035,6 +1063,7 @@ def callback(_args); end allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) project_instance.track('test_event_with_audience', 'test_user', 'browser_type' => 'cyberdog') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once end @@ -1044,6 +1073,7 @@ def callback(_args); end params[:visitors][0][:snapshots][0][:events][0][:key] = 'test_event_not_running' allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) project_instance.track('test_event_not_running', 'test_user') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, conversion_log_url, params, post_headers)).once end @@ -1055,6 +1085,7 @@ def callback(_args); end ) allow(project_instance.event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event)) project_instance.track('test_event', 'test_user', nil, 'revenue' => 42) + sleep 0.1 expect(spy_logger).to have_received(:log).with(Logger::INFO, "Tracking event 'test_event' for user 'test_user'.") end @@ -1066,6 +1097,7 @@ def callback(_args); end it 'should return false when called with attributes in an invalid format' do expect(project_instance.error_handler).to receive(:handle_error).with(any_args).once.and_return(nil) project_instance.track('test_event', 'test_user', 'invalid') + sleep 0.1 end it 'should raise an exception when called with event tags in an invalid format' do @@ -1538,6 +1570,7 @@ def callback(_args); end allow(project_instance.decision_service).to receive(:get_variation_for_feature).and_return(decision_to_return) expect(project_instance.is_feature_enabled('multi_variate_feature', 'test_user')).to be false + sleep 0.2 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(instance_of(Optimizely::Event)).once expect(spy_logger).to have_received(:log).once.with(Logger::INFO, "Feature 'multi_variate_feature' is not enabled for user 'test_user'.") end @@ -1560,7 +1593,7 @@ def callback(_args); end # Decision listener called when the user is in experiment with variation feature on. expect(variation_to_return['featureEnabled']).to be true - expect(project_instance.notification_center).to receive(:send_notifications).once.with( + expect(project_instance.notification_center).to receive(:send_notifications).with( Optimizely::NotificationCenter::NOTIFICATION_TYPES[:DECISION], 'feature', 'test_user', {}, feature_enabled: true, @@ -1573,6 +1606,7 @@ def callback(_args); end ).ordered project_instance.is_feature_enabled('multi_variate_feature', 'test_user') + sleep 0.5 end it 'should call decision listener when user is bucketed into a feature experiment with featureEnabled property is false' do @@ -1604,6 +1638,7 @@ def callback(_args); end ) project_instance.is_feature_enabled('multi_variate_feature', 'test_user', 'browser_type' => 'chrome') + sleep 0.5 end it 'should call decision listener when user is bucketed into rollout with featureEnabled property is true' do @@ -1628,6 +1663,7 @@ def callback(_args); end ) project_instance.is_feature_enabled('boolean_single_variable_feature', 'test_user', 'browser_type' => 'firefox') + sleep 0.1 end it 'should call decision listener when user is bucketed into rollout with featureEnabled property is false' do @@ -1644,6 +1680,7 @@ def callback(_args); end ) project_instance.is_feature_enabled('boolean_single_variable_feature', 'test_user') + sleep 0.1 end it 'call decision listener when the user is not bucketed into any experiment or rollout' do @@ -2814,7 +2851,7 @@ def callback(_args); end project_instance = Optimizely::Project.new(nil, nil, nil, nil, true, nil, nil, config_manager, nil, event_processor) expect(config_manager.stopped).to be false - expect(event_processor.started).to be true + expect(event_processor.started).to be false project_instance.close From a14c22bebfafd7c98dfa8715a3109eac35d24873 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 18 Dec 2019 14:02:17 -0800 Subject: [PATCH 15/23] fix failing tests --- lib/optimizely/event/forwarding_event_processor.rb | 2 +- spec/event/forwarding_event_processor_spec.rb | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/optimizely/event/forwarding_event_processor.rb b/lib/optimizely/event/forwarding_event_processor.rb index 36034013..010e9d44 100644 --- a/lib/optimizely/event/forwarding_event_processor.rb +++ b/lib/optimizely/event/forwarding_event_processor.rb @@ -30,11 +30,11 @@ def process(user_event) log_event = Optimizely::EventFactory.create_log_event(user_event, @logger) begin + @notification_center&.send_notifications(NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], log_event) Thread.new do begin @event_dispatcher.dispatch_event(log_event) - @notification_center&.send_notifications(NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], log_event) rescue StandardError => e @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") end diff --git a/spec/event/forwarding_event_processor_spec.rb b/spec/event/forwarding_event_processor_spec.rb index 22e340c1..59bf62d7 100644 --- a/spec/event/forwarding_event_processor_spec.rb +++ b/spec/event/forwarding_event_processor_spec.rb @@ -108,8 +108,6 @@ sleep 0.1 - expect(notification_center).not_to have_received(:send_notifications) - expect(spy_logger).to have_received(:log).once.with( Logger::ERROR, "Error dispatching event: #{log_event} Timeout::Error." From 3bc3cfbdcbd7bbffb216b25655ba5461768be1fb Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 18 Dec 2019 14:12:58 -0800 Subject: [PATCH 16/23] fix lint errors --- spec/project_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/project_spec.rb b/spec/project_spec.rb index b0c1af9a..f15b6cb2 100644 --- a/spec/project_spec.rb +++ b/spec/project_spec.rb @@ -742,9 +742,9 @@ def callback(_args); end expect(notification_center).to receive(:send_notifications).ordered http_project_config_manager = Optimizely::HTTPProjectConfigManager.new( - logger:spy_logger, + logger: spy_logger, url: 'https://cdn.optimizely.com/datafiles/QBw9gFM8oTn7ogY9ANCC1z.json', - notification_center: notification_center, + notification_center: notification_center ) project_instance = Optimizely::Project.new( From 0f6531a8b81462930d7810a97419d52a799de691 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 18 Dec 2019 15:33:21 -0800 Subject: [PATCH 17/23] fix lint error --- lib/optimizely/event/forwarding_event_processor.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/optimizely/event/forwarding_event_processor.rb b/lib/optimizely/event/forwarding_event_processor.rb index 010e9d44..421ce5b7 100644 --- a/lib/optimizely/event/forwarding_event_processor.rb +++ b/lib/optimizely/event/forwarding_event_processor.rb @@ -34,7 +34,6 @@ def process(user_event) Thread.new do begin @event_dispatcher.dispatch_event(log_event) - rescue StandardError => e @logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.") end From 8a1349b6a448ec9f35eb63d9fd9504d06ea4b01b Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 18 Dec 2019 16:10:05 -0800 Subject: [PATCH 18/23] fix last tests --- spec/event/batch_event_processor_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 28553fff..4a593ef3 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -65,7 +65,7 @@ @event_processor.process(conversion_event) # flush interval is set to 100ms. Wait for 300ms and assert that event is dispatched. - sleep 0.1 + sleep 0.175 expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once expect(@notification_center).to have_received(:send_notifications).with( @@ -344,7 +344,7 @@ ) user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - 12.times do + 11.times do @event_processor.process(user_event) end From bf2264b5decac2a60e678d7d51a3bd7cbc820cd3 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 18 Dec 2019 16:21:53 -0800 Subject: [PATCH 19/23] fix timeout --- spec/event/batch_event_processor_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 4a593ef3..ae510bab 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -65,7 +65,7 @@ @event_processor.process(conversion_event) # flush interval is set to 100ms. Wait for 300ms and assert that event is dispatched. - sleep 0.175 + sleep 0.2 expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once expect(@notification_center).to have_received(:send_notifications).with( From a94c4e6a0666cb1c915afa476c0da9a84b900c57 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 18 Dec 2019 17:02:03 -0800 Subject: [PATCH 20/23] fix bath processor test --- spec/event/batch_event_processor_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index ae510bab..0b680ef7 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -344,12 +344,12 @@ ) user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - 11.times do + 20.times do @event_processor.process(user_event) end expect(@event_dispatcher).not_to have_received(:dispatch_event) - expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue: queue full').once + expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue: queue full').at_least(:once) end it 'should not process and log when Executor is not running' do From 061e37cd0d12f83637181502505a96e81e543bc5 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 18 Dec 2019 17:05:16 -0800 Subject: [PATCH 21/23] fix broken tests on different ruby versions --- spec/project_spec.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/project_spec.rb b/spec/project_spec.rb index f15b6cb2..e29bd16f 100644 --- a/spec/project_spec.rb +++ b/spec/project_spec.rb @@ -502,6 +502,7 @@ class InvalidErrorHandler; end expect(project_instance.activate('test_experiment_with_audience', 'test_user', 'browser_type' => 'firefox')) .to eq('variation_with_audience') + sleep 0.1 expect(project_instance.event_dispatcher).to have_received(:dispatch_event).with(Optimizely::Event.new(:post, impression_log_url, params, post_headers)).once end From 174d2d9851ee11812ce6c54b415cde024cb80f58 Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 18 Dec 2019 17:26:13 -0800 Subject: [PATCH 22/23] fill queue. --- spec/event/batch_event_processor_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/event/batch_event_processor_spec.rb b/spec/event/batch_event_processor_spec.rb index 0b680ef7..ffb0b92a 100644 --- a/spec/event/batch_event_processor_spec.rb +++ b/spec/event/batch_event_processor_spec.rb @@ -344,7 +344,7 @@ ) user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil) - 20.times do + 80.times do @event_processor.process(user_event) end From 2b1f53e471d3cd8c040c6dd58de01041041227cb Mon Sep 17 00:00:00 2001 From: Thomas Zurkan Date: Wed, 18 Dec 2019 17:42:20 -0800 Subject: [PATCH 23/23] try and pass with 2.6. skip update --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 9f807a80..c1c9d474 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,7 +9,7 @@ rvm: - 2.5.1 - 2.6.0 before_install: - - gem update --system +# - gem update --system - gem install bundler install: - bundle install