Changes from all commits
25 commits
3410567
refactor for unicorn threads
thomaszurkan-optimizely Dec 9, 2019
bdb018d
lint errors
thomaszurkan-optimizely Dec 9, 2019
7d457b3
remove rescue Exception.
thomaszurkan-optimizely Dec 9, 2019
33d4ec5
more robust error message on queue full
thomaszurkan-optimizely Dec 9, 2019
4784a79
rename methods for clarity
thomaszurkan-optimizely Dec 9, 2019
2d22d0a
fix loop
thomaszurkan-optimizely Dec 11, 2019
779ed8d
fix for unicorn
thomaszurkan-optimizely Dec 11, 2019
974546a
testing
thomaszurkan-optimizely Dec 11, 2019
0a6febf
fix lint error
thomaszurkan-optimizely Dec 11, 2019
40db209
fix unit tests
thomaszurkan-optimizely Dec 12, 2019
b1f60dc
fix lint errors
thomaszurkan-optimizely Dec 12, 2019
af7fb6f
use mutex and resource to wait for flush interval or be signalled whe…
thomaszurkan-optimizely Dec 12, 2019
245d067
fix lint
thomaszurkan-optimizely Dec 12, 2019
05c8aee
trying to cleanup last few unit tests
thomaszurkan-optimizely Dec 17, 2019
a14c22b
fix failing tests
thomaszurkan-optimizely Dec 18, 2019
b128c36
Merge branch 'master' into refactorForUnicorn
thomaszurkan-optimizely Dec 18, 2019
3bc3cfb
fix lint errors
thomaszurkan-optimizely Dec 18, 2019
2601125
Merge branch 'refactorForUnicorn' of https://github.com/optimizely/ru…
thomaszurkan-optimizely Dec 18, 2019
0f6531a
fix lint error
thomaszurkan-optimizely Dec 18, 2019
8a1349b
fix last tests
thomaszurkan-optimizely Dec 19, 2019
bf2264b
fix timeout
thomaszurkan-optimizely Dec 19, 2019
a94c4e6
fix batch processor test
thomaszurkan-optimizely Dec 19, 2019
061e37c
fix broken tests on different ruby versions
thomaszurkan-optimizely Dec 19, 2019
174d2d9
fill queue.
thomaszurkan-optimizely Dec 19, 2019
2b1f53e
try and pass with 2.6. skip update
thomaszurkan-optimizely Dec 19, 2019
2 changes: 1 addition & 1 deletion .rubocop.yml
@@ -46,7 +46,7 @@ Style/SignalException:
Enabled: false

Lint/RescueException:
Enabled: false
Enabled: true

Layout/EndOfLine:
EnforcedStyle: lf
2 changes: 1 addition & 1 deletion .travis.yml
@@ -9,7 +9,7 @@ rvm:
- 2.5.1
- 2.6.0
before_install:
- gem update --system
# - gem update --system
- gem install bundler
install:
- bundle install
86 changes: 47 additions & 39 deletions lib/optimizely/event/batch_event_processor.rb
@@ -31,7 +31,6 @@ class BatchEventProcessor < EventProcessor
DEFAULT_BATCH_INTERVAL = 30_000 # interval in milliseconds
DEFAULT_QUEUE_CAPACITY = 1000
DEFAULT_TIMEOUT_INTERVAL = 5 # interval in seconds
MAX_NIL_COUNT = 3

FLUSH_SIGNAL = 'FLUSH_SIGNAL'
SHUTDOWN_SIGNAL = 'SHUTDOWN_SIGNAL'
@@ -62,7 +61,7 @@ def initialize(
@notification_center = notification_center
@current_batch = []
@started = false
start!
@stopped = false
end

def start!
@@ -72,26 +71,37 @@ def start!
end
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
@logger.log(Logger::INFO, 'Starting scheduler.')
@thread = Thread.new { run }
if @wait_mutex.nil?
@wait_mutex = Mutex.new
@resource = ConditionVariable.new
end
@thread = Thread.new { run_queue }
@started = true
@stopped = false
end

def flush
@event_queue << FLUSH_SIGNAL
@wait_mutex.synchronize { @resource.signal }
end

def process(user_event)
@logger.log(Logger::DEBUG, "Received userEvent: #{user_event}")

if !@started || !@thread.alive?
# if the processor has been explicitly stopped, don't accept tasks
if @stopped
@logger.log(Logger::WARN, 'Executor shutdown, not accepting tasks.')
return
end

# start if the processor hasn't been started
start! unless @started

begin
@event_queue.push(user_event, true)
rescue Exception
@logger.log(Logger::WARN, 'Payload not accepted by the queue.')
@wait_mutex.synchronize { @resource.signal }
rescue => e
@logger.log(Logger::WARN, 'Payload not accepted by the queue: ' + e.message)
return
end
end
@@ -101,42 +111,20 @@ def stop!

@logger.log(Logger::INFO, 'Stopping scheduler.')
@event_queue << SHUTDOWN_SIGNAL
@wait_mutex.synchronize { @resource.signal }
@thread.join(DEFAULT_TIMEOUT_INTERVAL)
@started = false
@stopped = true
end

private

def run
# if we receive a number of item nils that reach MAX_NIL_COUNT,
# then we hang on the pop via setting use_pop to false
@nil_count = 0
# hang on pop if true
@use_pop = false
loop do
if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline
@logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.')
flush_queue!
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
@use_pop = true if @nil_count > MAX_NIL_COUNT
end

item = @event_queue.pop if @event_queue.length.positive? || @use_pop

if item.nil?
# when nil count is greater than MAX_NIL_COUNT, we hang on the pop until there is an item available.
# this avoids to much spinning of the loop.
@nil_count += 1
next
end

# reset nil_count and use_pop if we have received an item.
@nil_count = 0
@use_pop = false

def process_queue
while @event_queue.length.positive?
item = @event_queue.pop
if item == SHUTDOWN_SIGNAL
@logger.log(Logger::DEBUG, 'Received shutdown signal.')
break
return false
end

if item == FLUSH_SIGNAL
@@ -147,15 +135,35 @@ def run

add_to_batch(item) if item.is_a? Optimizely::UserEvent
end
true
end

def run_queue
loop do
if Helpers::DateTimeUtils.create_timestamp >= @flushing_interval_deadline
@logger.log(Logger::DEBUG, 'Deadline exceeded flushing current batch.')

break unless process_queue

flush_queue!
@flushing_interval_deadline = Helpers::DateTimeUtils.create_timestamp + @flush_interval
end

break unless process_queue

# what is the current interval to flush in seconds
interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) * 0.001

next unless interval.positive?

@wait_mutex.synchronize { @resource.wait(@wait_mutex, interval) }
Contributor:
Can we run into a race condition where flush, stop, or process is called from the main thread and signals the resource while this thread is still processing the queue, so that the wait only happens afterwards? In that scenario the thread would wait for up to the full interval before processing any events, which could become an issue if the interval is long.

Contributor Author:
Each of those calls also pushes an element onto the queue. In the case of stop, the processing loop exits, and a signal received while we are not inside the synchronize block is simply ignored, so the window is limited to the interval calculation. That said, I could check whether the queue length is positive before waiting (see the sketch after this file's diff).

end
rescue SignalException
@logger.log(Logger::ERROR, 'Interrupted while processing buffer.')
rescue Exception => e
rescue => e
@logger.log(Logger::ERROR, "Uncaught exception processing buffer. #{e.message}")
ensure
@logger.log(
Logger::INFO,
'Exiting processing loop. Attempting to flush pending events.'
)
@logger.log(Logger::INFO, 'Exiting processing loop. Attempting to flush pending events.')
flush_queue!
end
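
The guard mentioned in the thread above is not part of this diff. The following is a minimal sketch of how it might look inside BatchEventProcessor, reusing the @event_queue, @wait_mutex, @resource, and @flushing_interval_deadline names from the diff; the method name and exact placement are assumptions for illustration only.

    # Hypothetical helper for run_queue: skip the timed wait when the queue
    # already holds work, so a signal sent while the loop was busy processing
    # is not lost until the next flush interval expires.
    def wait_for_interval_or_work
      # remaining time until the flush deadline, converted from ms to seconds
      interval = (@flushing_interval_deadline - Helpers::DateTimeUtils.create_timestamp) * 0.001
      return unless interval.positive?

      @wait_mutex.synchronize do
        # re-check under the lock: only wait if no items arrived after
        # process_queue drained the queue
        @resource.wait(@wait_mutex, interval) if @event_queue.empty?
      end
    end
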

13 changes: 8 additions & 5 deletions lib/optimizely/event/forwarding_event_processor.rb
@@ -30,11 +30,14 @@ def process(user_event)
log_event = Optimizely::EventFactory.create_log_event(user_event, @logger)

begin
@event_dispatcher.dispatch_event(log_event)
@notification_center&.send_notifications(
NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
log_event
)
@notification_center&.send_notifications(NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT], log_event)
Thread.new do
Contributor:
Why are we introducing a thread here? Isn't the ForwardingEventProcessor meant to work in the legacy style, dispatching one event at a time on the main thread?

Contributor Author:
Dispatching synchronously, one event at a time on the calling thread, doesn't scale, so no one would ever use it. However, we could make this configurable (see the sketch after this file's diff).

Contributor:
I think this makes sense, but would it make more sense to do it in a follow-up PR? It seems like it will break a lot of tests, and that way we can keep just the batching fix in this PR.

begin
@event_dispatcher.dispatch_event(log_event)
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end
end
rescue StandardError => e
@logger.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
end
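
The configurable behaviour the author mentions is not in this PR. The sketch below illustrates one way it could look; the class name, constructor signature, and the async option are assumptions for illustration only, while dispatch_event and the error logging follow the diff above.

    require 'logger'

    # Hypothetical variant of ForwardingEventProcessor that lets callers choose
    # between legacy synchronous dispatch and the fire-and-forget thread added
    # in this diff.
    class ConfigurableForwardingEventProcessor
      def initialize(event_dispatcher, logger: nil, async: true)
        @event_dispatcher = event_dispatcher
        @logger = logger
        @async = async
      end

      def dispatch(log_event)
        # run on a background thread only when async dispatch is requested
        @async ? Thread.new { safe_dispatch(log_event) } : safe_dispatch(log_event)
      end

      private

      def safe_dispatch(log_event)
        @event_dispatcher.dispatch_event(log_event)
      rescue StandardError => e
        @logger&.log(Logger::ERROR, "Error dispatching event: #{log_event} #{e.message}.")
      end
    end
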
50 changes: 28 additions & 22 deletions spec/event/batch_event_processor_spec.rb
@@ -32,6 +32,7 @@
let(:event) { project_config.get_event_from_key('test_event') }

before(:example) do
spy_logger = spy('logger')
@event_queue = SizedQueue.new(100)
@event_dispatcher = Optimizely::EventDispatcher.new
allow(@event_dispatcher).to receive(:dispatch_event).with(instance_of(Optimizely::Event))
@@ -46,6 +47,8 @@
it 'should log waring when service is already started' do
@event_processor = Optimizely::BatchEventProcessor.new(logger: spy_logger)
@event_processor.start!
@event_processor.start!

expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Service already started.').once
end

@@ -62,7 +65,7 @@

@event_processor.process(conversion_event)
# flush interval is set to 100ms. Wait for 300ms and assert that event is dispatched.
sleep 1
sleep 0.2

expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
expect(@notification_center).to have_received(:send_notifications).with(
@@ -76,7 +79,7 @@
@event_processor = Optimizely::BatchEventProcessor.new(
event_dispatcher: @event_dispatcher,
batch_size: 11,
flush_interval: 100_000,
flush_interval: 10_000,
logger: spy_logger
)

@@ -92,8 +95,9 @@
end

# Wait until other thread has processed the event.
until @event_processor.event_queue.empty?; end
until @event_processor.current_batch.empty?; end
sleep 0.1 until @event_processor.event_queue.empty?

sleep 0.1 until @event_processor.current_batch.empty?

expect(Optimizely::EventFactory).to have_received(:create_log_event).with(expected_batch, spy_logger).once
expect(@event_dispatcher).to have_received(:dispatch_event).with(
@@ -109,7 +113,7 @@
@event_processor = Optimizely::BatchEventProcessor.new(
event_queue: @event_queue,
event_dispatcher: @event_dispatcher,
flush_interval: 100_000,
flush_interval: 10_000,
logger: spy_logger
)

@@ -120,8 +124,9 @@
@event_processor.flush

# Wait until other thread has processed the event.
until @event_processor.event_queue.empty?; end
until @event_processor.current_batch.empty?; end
sleep 0.1 until @event_processor.event_queue.empty?

sleep 0.1 until @event_processor.current_batch.empty?

expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).twice
expect(@event_processor.event_queue.length).to eq(0)
@@ -143,13 +148,13 @@
expect(user_event1.event_context[:revision]).to eq('1')
@event_processor.process(user_event1)
# Wait until other thread has processed the event.
while @event_processor.current_batch.length != 1; end
sleep 0.1 while @event_processor.current_batch.length != 1

expect(user_event2.event_context[:revision]).to eq('2')
@event_processor.process(user_event2)
@event_processor.process(user_event2)
# Wait until other thread has processed the event.
while @event_processor.current_batch.length != 2; end
sleep 0.1 while @event_processor.current_batch.length != 2

expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Revisions mismatched: Flushing current batch.').once
@@ -170,13 +175,13 @@
expect(user_event1.event_context[:project_id]).to eq('X')
@event_processor.process(user_event1)
# Wait until other thread has processed the event.
while @event_processor.current_batch.length != 1; end
sleep 0.1 while @event_processor.current_batch.length != 1

expect(user_event2.event_context[:project_id]).to eq('Y')
@event_processor.process(user_event2)
@event_processor.process(user_event2)
# Wait until other thread has processed the event.
while @event_processor.current_batch.length != 2; end
sleep 0.1 while @event_processor.current_batch.length != 2

expect(@event_dispatcher).to have_received(:dispatch_event).with(log_event).once
expect(spy_logger).to have_received(:log).with(Logger::DEBUG, 'Project Ids mismatched: Flushing current batch.').once
@@ -252,10 +257,11 @@
@event_processor.process(conversion_event)

# Wait until other thread has processed the event.
while @event_processor.current_batch.length != 1; end
sleep 0.1 while @event_processor.current_batch.length != 1

@event_processor.flush
# Wait until other thread has processed the event.
until @event_processor.current_batch.empty?; end
sleep 0.1 until @event_processor.current_batch.empty?

expect(@notification_center).to have_received(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
@@ -281,10 +287,11 @@

@event_processor.process(conversion_event)
# Wait until other thread has processed the event.
while @event_processor.current_batch.length != 1; end
sleep 0.1 while @event_processor.current_batch.length != 1

@event_processor.flush
# Wait until other thread has processed the event.
until @event_processor.current_batch.empty?; end
sleep 0.1 until @event_processor.current_batch.empty?

expect(@notification_center).not_to have_received(:send_notifications)
expect(spy_logger).to have_received(:log).once.with(
@@ -315,7 +322,7 @@
end

# Wait until other thread has processed the event.
while @event_processor.current_batch.length != 4; end
sleep 0.1 while @event_processor.current_batch.length != 4
expect(@event_dispatcher).not_to have_received(:dispatch_event)

@event_processor.stop!
@@ -332,29 +339,28 @@
event_queue: SizedQueue.new(10),
event_dispatcher: @event_dispatcher,
batch_size: 100,
flush_interval: 100_000,
flush_interval: 10_000,
logger: spy_logger
)

user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
11.times do
80.times do
@event_processor.process(user_event)
end

# Wait until other thread has processed the event.
while @event_processor.current_batch.length != 10; end
expect(@event_dispatcher).not_to have_received(:dispatch_event)
expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue.').once
expect(spy_logger).to have_received(:log).with(Logger::WARN, 'Payload not accepted by the queue: queue full').at_least(:once)
end

it 'should not process and log when Executor is not running' do
@event_processor = Optimizely::BatchEventProcessor.new(
event_dispatcher: @event_dispatcher,
batch_size: 100,
flush_interval: 100_000,
flush_interval: 10_000,
logger: spy_logger
)

@event_processor.start!
@event_processor.stop!

user_event = Optimizely::UserEventFactory.create_conversion_event(project_config, event, 'test_user', nil, nil)
4 changes: 3 additions & 1 deletion spec/event/forwarding_event_processor_spec.rb
@@ -78,6 +78,8 @@

forwarding_event_processor.process(@conversion_event)

sleep 0.1

expect(notification_center).to have_received(:send_notifications).with(
Optimizely::NotificationCenter::NOTIFICATION_TYPES[:LOG_EVENT],
Optimizely::Event.new(:post, log_url, @expected_conversion_params, post_headers)
@@ -104,7 +106,7 @@

forwarding_event_processor.process(@conversion_event)

expect(notification_center).not_to have_received(:send_notifications)
sleep 0.1

expect(spy_logger).to have_received(:log).once.with(
Logger::ERROR,