Skip to content

Commit

Permalink
Merge branch 'mnoman/AddBatchEP' into mnoman/log_event_notification
Browse files Browse the repository at this point in the history
# Conflicts:
#	optimizely/event/event_processor.py
  • Loading branch information
mariamjamal94 committed Sep 24, 2019
2 parents 3b2cb4a + 01a8ea1 commit 3baf024
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 21 deletions.
4 changes: 2 additions & 2 deletions optimizely/event/event_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
from optimizely.helpers import enums
from optimizely.helpers import event_tag_utils
from optimizely.helpers import validator
from . import user_event
from . import payload
from . import log_event
from . import payload
from . import user_event

CUSTOM_ATTRIBUTE_FEATURE_TYPE = 'custom'

Expand Down
32 changes: 17 additions & 15 deletions optimizely/event/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher
from optimizely.helpers import enums
from optimizely.helpers import validator
from .user_event import UserEvent
from .event_factory import EventFactory
from .user_event import UserEvent

ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})

Expand All @@ -33,12 +33,16 @@ class BaseEventProcessor(ABC):

@abc.abstractmethod
def process(user_event):
""" Method to provide intermediary processing stage within event production.
Args:
user_event: UserEvent instance that needs to be processed and dispatched.
"""
pass


class BatchEventProcessor(BaseEventProcessor):
"""
BatchEventProcessor is a batched implementation of the BaseEventProcessor.
BatchEventProcessor is an implementation of the BaseEventProcessor that batches events.
The BatchEventProcessor maintains a single consumer thread that pulls events off of
the blocking queue and buffers them for either a configured batch size or for a
maximum duration before the resulting LogEvent is sent to the EventDispatcher.
Expand Down Expand Up @@ -89,17 +93,15 @@ def __init__(self,
if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \
else self._DEFAULT_TIMEOUT_INTERVAL
self.notification_center = notification_center

self._is_started = False
self._current_batch = list()

if start_on_init is True:
self.start()

@property
def is_started(self):
def is_running(self):
""" Property to check if consumer thread is alive or not. """
return self._is_started
return self.executor.isAlive()

def _validate_intantiation_props(self, prop, prop_name):
""" Method to determine if instantiation properties like batch_size, flush_interval
Expand Down Expand Up @@ -137,17 +139,15 @@ def _get_time(self, _time=None):

def start(self):
""" Starts the batch processing thread to batch events. """
if self.is_started:
self.logger.warning('Service already started')
if hasattr(self, 'executor') and self.is_running:
self.logger.warning('BatchEventProcessor already started.')
return

self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds())
self.executor = threading.Thread(target=self._run)
self.executor.setDaemon(True)
self.executor.start()

self._is_started = True

def _run(self):
""" Triggered as part of the thread which batches events or flushes event_queue and sleeps
periodically if queue is empty.
Expand Down Expand Up @@ -212,6 +212,10 @@ def _flush_queue(self):
self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e))

def process(self, user_event):
""" Method to process the user_event by putting it in event_queue.
Args:
user_event: UserEvent Instance.
"""
if not isinstance(user_event, UserEvent):
self.logger.error('Provided event is in an invalid format.')
return
Expand Down Expand Up @@ -255,12 +259,10 @@ def _should_split(self, user_event):

def stop(self):
""" Stops and disposes batch event processor. """

self.event_queue.put(self._SHUTDOWN_SIGNAL)
self.logger.warning('Stopping Scheduler.')

self.executor.join(self.timeout_interval.total_seconds())

if self.executor.isAlive():
if self.is_running:
self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.')

self.logger.warning('Stopping Scheduler.')
self._is_started = False
3 changes: 1 addition & 2 deletions tests/test_config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ def test_fetch_datafile(self, _):

def test_is_running(self, _):
""" Test that polling thread is running after instance of PollingConfigManager is created. """
with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile') as mock_fetch_datafile:
with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'):
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')
self.assertTrue(project_config_manager.is_running)
mock_fetch_datafile.assert_called_with()
4 changes: 2 additions & 2 deletions tests/test_event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ def test_stop_and_start(self):
event_dispatcher.expect_conversion(self.event_name, self.test_user_id)

self._event_processor.start()
self.assertStrictTrue(self._event_processor.is_started)
self.assertStrictTrue(self._event_processor.is_running)

self._event_processor.stop()
self.assertStrictFalse(self._event_processor.is_started)
self.assertStrictFalse(self._event_processor.is_running)

self.assertEqual(0, self._event_processor.event_queue.qsize())

Expand Down

0 comments on commit 3baf024

Please sign in to comment.