
Commit

Merge branch 'mnoman/AddBatchEP' of github.com:optimizely/python-sdk into mnoman/AddBatchEP
oakbani committed Sep 20, 2019
2 parents b3883d6 + 443b7aa commit 655a859
Showing 2 changed files with 53 additions and 36 deletions.
79 changes: 48 additions & 31 deletions optimizely/event/event_processor.py
@@ -18,27 +18,26 @@
from datetime import timedelta
from six.moves import queue

from .user_event import UserEvent
from .event_factory import EventFactory
from optimizely import logger as _logging
from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher
from optimizely.helpers import validator
from .user_event import UserEvent
from .event_factory import EventFactory

ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})


class EventProcessor(ABC):
""" Class encapsulating event_processor functionality. Override with your own processor
providing process method. """
class BaseEventProcessor(ABC):
""" Class encapsulating event processing. Override with your own implementation. """

@abc.abstractmethod
def process(user_event):
pass


class BatchEventProcessor(EventProcessor):
class BatchEventProcessor(BaseEventProcessor):
"""
BatchEventProcessor is a batched implementation of the EventProcessor.
BatchEventProcessor is a batched implementation of the BaseEventProcessor.
The BatchEventProcessor maintains a single consumer thread that pulls events off of
the blocking queue and buffers them for either a configured batch size or for a
maximum duration before the resulting LogEvent is sent to the EventDispatcher.
@@ -53,21 +52,21 @@ class BatchEventProcessor(EventProcessor):
LOCK = threading.Lock()

def __init__(self,
event_dispatcher,
logger,
start_on_init=False,
event_queue=None,
batch_size=None,
flush_interval=None,
timeout_interval=None):
event_dispatcher,
logger,
start_on_init=False,
event_queue=None,
batch_size=None,
flush_interval=None,
timeout_interval=None):
""" BatchEventProcessor init method to configure event batching.
Args:
event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
logger: Provides a log method to log messages. By default nothing would be logged.
start_on_init: Optional boolean param which starts the consumer thread if set to True.
By default thread does not start unless 'start' method is called.
Default value is False.
event_queue: Optional component which accumulates the events until dispatched.
batch_size: Optional param which defines the upper limit of the number of events in event_queue after which
batch_size: Optional param which defines the upper limit on the number of events in event_queue after which
the event_queue will be flushed.
flush_interval: Optional floating point number representing time interval in seconds after which event_queue will
be flushed.
@@ -85,7 +84,6 @@ def __init__(self,
self.timeout_interval = timedelta(seconds=timeout_interval) \
if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \
else self._DEFAULT_TIMEOUT_INTERVAL
self._disposed = False
self._is_started = False
self._current_batch = list()

@@ -94,50 +92,69 @@ def __init__(self,

@property
def is_started(self):
""" Property to check if consumer thread is alive or not. """
return self._is_started

@property
def disposed(self):
return self._disposed

def _validate_intantiation_props(self, prop, prop_name):
""" Method to determine if instantiation properties like batch_size, flush_interval
and timeout_interval are valid.
Args:
prop: Property value that needs to be validated.
prop_name: Property name.
Returns:
False if property value is None or less than 1 or not a finite number.
False if property name is batch_size and value is a floating point number.
True otherwise.
"""
if (prop_name == 'batch_size' and not isinstance(prop, int)) or prop is None or prop < 1 or \
not validator.is_finite_number(prop):
self.logger.info('Using default value for {}.'.format(prop_name))
return False

return True

def _get_time_in_ms(self, _time=None):
def _get_time(self, _time=None):
""" Method to return rounded off time as integer in seconds. If _time is None, uses current time.
Args:
_time: time in seconds that needs to be rounded off.
Returns:
Integer time in seconds.
"""
if _time is None:
return int(round(time.time() * 1000))
return int(round(time.time()))

return int(round(_time * 1000))
return int(round(_time))

def start(self):
if self.is_started and not self.disposed:
""" Starts the batch processing thread to batch events. """
if self.is_started:
self.logger.warning('Service already started')
return

self.flushing_interval_deadline = self._get_time_in_ms() + self._get_time_in_ms(self.flush_interval.total_seconds())
self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds())
self.executor = threading.Thread(target=self._run)
self.executor.setDaemon(True)
self.executor.start()

self._is_started = True

def _run(self):
""" Scheduler method that periodically flushes events queue. """
""" Triggered as part of the thread which batches events or flushes event_queue and sleeps
periodically if queue is empty.
"""
try:
while True:
if self._get_time_in_ms() > self.flushing_interval_deadline:
if self._get_time() > self.flushing_interval_deadline:
self._flush_queue()

try:
item = self.event_queue.get(True, 0.05)

except queue.Empty:
self.logger.debug('Empty queue, sleeping for 50ms.')
time.sleep(0.05)
continue

@@ -201,8 +218,8 @@ def _add_to_batch(self, user_event):

# Reset the deadline if starting a new batch.
if len(self._current_batch) == 0:
self.flushing_interval_deadline = self._get_time_in_ms() + \
self._get_time_in_ms(self.flush_interval.total_seconds())
self.flushing_interval_deadline = self._get_time() + \
self._get_time(self.flush_interval.total_seconds())

with self.LOCK:
self._current_batch.append(user_event)
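The substance of the event_processor.py change above is a switch from millisecond to second granularity when scheduling flushes: _get_time_in_ms is renamed to _get_time, the * 1000 scaling is dropped, and the flush deadline used by start(), _run() and _add_to_batch() is now kept in whole seconds. A minimal sketch of the new deadline arithmetic, assuming a 30-second flush_interval purely for illustration:

import time
from datetime import timedelta

flush_interval = timedelta(seconds=30)  # assumed value for illustration only

def _get_time(_time=None):
    # Mirrors the renamed helper: rounded time as an integer number of seconds.
    if _time is None:
        return int(round(time.time()))
    return int(round(_time))

# Deadline for the next flush, as computed in start() and when a new batch begins.
flushing_interval_deadline = _get_time() + _get_time(flush_interval.total_seconds())

# The consumer thread in _run() flushes the current batch once the deadline passes.
if _get_time() > flushing_interval_deadline:
    print('flush the queue')

The tests below were updated alongside this change to wait 3 seconds instead of 1.5 before asserting that the queue has drained.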
10 changes: 5 additions & 5 deletions tests/test_event_processor.py
@@ -153,7 +153,7 @@ def test_flush_on_max_timeout(self):
self._event_processor.process(user_event)
event_dispatcher.expect_conversion(self.event_name, self.test_user_id)

time.sleep(1.5)
time.sleep(3)

self.assertStrictTrue(event_dispatcher.compare_events())
self.assertEqual(0, self._event_processor.event_queue.qsize())
@@ -189,7 +189,7 @@ def test_flush(self):
self._event_processor.flush()
event_dispatcher.expect_conversion(self.event_name, self.test_user_id)

time.sleep(1.5)
time.sleep(3)

self.assertStrictTrue(event_dispatcher.compare_events())
self.assertEqual(0, self._event_processor.event_queue.qsize())
@@ -214,7 +214,7 @@ def test_flush_on_mismatch_revision(self):
self._event_processor.process(user_event_2)
event_dispatcher.expect_conversion(self.event_name, self.test_user_id)

time.sleep(1.5)
time.sleep(3)

self.assertStrictTrue(event_dispatcher.compare_events())
self.assertEqual(0, self._event_processor.event_queue.qsize())
@@ -239,7 +239,7 @@ def test_flush_on_mismatch_project_id(self):
self._event_processor.process(user_event_2)
event_dispatcher.expect_conversion(self.event_name, self.test_user_id)

time.sleep(1.5)
time.sleep(3)

self.assertStrictTrue(event_dispatcher.compare_events())
self.assertEqual(0, self._event_processor.event_queue.qsize())
@@ -254,7 +254,7 @@ def test_stop_and_start(self):
self._event_processor.process(user_event)
event_dispatcher.expect_conversion(self.event_name, self.test_user_id)

time.sleep(1.5)
time.sleep(3)

self.assertStrictTrue(event_dispatcher.compare_events())
self._event_processor.stop()
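For context, a brief usage sketch of the BatchEventProcessor exercised by these tests. The import path, constructor parameters, and the process/stop calls come from the diff above; the concrete values, the event-building helper, and logger.SimpleLogger are assumptions, since the test-only dispatcher and event factories referenced here (expect_conversion and friends) are not shown in this commit.

import time

from optimizely import logger
from optimizely.event_dispatcher import EventDispatcher
from optimizely.event.event_processor import BatchEventProcessor

user_event = build_user_event()  # hypothetical helper; real events come from the SDK's event factories

processor = BatchEventProcessor(
    EventDispatcher,        # provides a dispatch_event method, per the docstring above
    logger.SimpleLogger(),  # assumed logger implementation from the SDK's logger module
    start_on_init=True,     # start the consumer thread immediately instead of calling start()
    batch_size=10,          # assumed values; the documented defaults apply when these are omitted
    flush_interval=1,       # seconds
    timeout_interval=5,     # seconds
)

processor.process(user_event)  # enqueued, then batched until batch_size or the flush deadline is hit
time.sleep(3)                  # give the consumer thread time to flush, as the tests above do
processor.stop()               # shut down the consumer thread, as test_stop_and_start does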
