Skip to content

Commit

Permalink
rename flush_queue to flush_batch and update debug messages; fix one bug where current_batch was being reset without a lock.
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaszurkan-optimizely committed Dec 16, 2019
1 parent 027f277 commit ee08eb8
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 10 deletions.
17 changes: 8 additions & 9 deletions optimizely/event/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ def _run(self):
try:
while True:
if self._get_time() >= self.flushing_interval_deadline:
self._flush_queue()
self._flush_batch()
self.flushing_interval_deadline = self._get_time() + \
self._get_time(self.flush_interval.total_seconds())
self.logger.debug('Flush interval deadline. Flushed queue.')
self.logger.debug('Flush interval deadline. Flushed batch.')

try:
interval = self.flushing_interval_deadline - self._get_time()
Expand All @@ -202,7 +202,7 @@ def _run(self):

if item == self._FLUSH_SIGNAL:
self.logger.debug('Received flush signal.')
self._flush_queue()
self._flush_batch()
continue

if isinstance(item, UserEvent):
Expand All @@ -213,14 +213,14 @@ def _run(self):

finally:
self.logger.info('Exiting processing loop. Attempting to flush pending events.')
self._flush_queue()
self._flush_batch()

def flush(self):
""" Adds flush signal to event_queue. """

self.event_queue.put(self._FLUSH_SIGNAL)

def _flush_queue(self):
def _flush_batch(self):
""" Flushes event_queue by dispatching events. """
batch_len = len(self._current_batch)
if batch_len == 0:
Expand Down Expand Up @@ -270,9 +270,8 @@ def _add_to_batch(self, user_event):
user_event: UserEvent Instance.
"""
if self._should_split(user_event):
self.logger.debug('Flush on split.')
self._flush_queue()
self._current_batch = list()
self.logger.debug('Flushing batch on split.')
self._flush_batch()

# Reset the deadline if starting a new batch.
if len(self._current_batch) == 0:
Expand All @@ -282,7 +281,7 @@ def _add_to_batch(self, user_event):
self._current_batch.append(user_event)
if len(self._current_batch) >= self.batch_size:
self.logger.debug('Flushing on batch size.')
self._flush_queue()
self._flush_batch()

def _should_split(self, user_event):
""" Method to check if current event batch should split into two.
Expand Down
2 changes: 1 addition & 1 deletion tests/test_event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,8 @@ def test_flush_once_max_timeout(self):
self.assertEqual(0, self.event_processor.event_queue.qsize())
self.assertTrue(mock_config_logging.debug.called)
mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.')
mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.')
mock_config_logging.debug.assert_any_call('Flushing batch size 1')
mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed batch.')
self.assertTrue(mock_config_logging.debug.call_count == 3)
self.optimizely.logger = SimpleLogger()

Expand Down

0 comments on commit ee08eb8

Please sign in to comment.