feat(eventProcessor): Add EventProcessor and BatchEventProcessor #203
Conversation
Summary
-------
- Introduces an EventProcessor interface.
- Introduces a BatchEventProcessor.

Buffering events within a queue before dispatching is an optimization that should prevent SDK implementations from exhausting resources while increasing throughput. This implementation relies on a BlockingCollection to buffer events received from one-to-many producers. A single consumer thread continuously polls from this queue to build a batch before emitting the batched LogEvent.

Test plan
---------
- Added unit tests.
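The producer/consumer batching design described in the summary can be sketched as follows. This is a simplified, hypothetical illustration, not the PR's actual API: it uses Python's standard queue.Queue in place of the BlockingCollection the summary mentions, and the dispatch callback, sentinel, and method bodies are assumptions made to keep the sketch self-contained.

```python
import queue
import threading

_SHUTDOWN = object()  # sentinel placed on the queue to stop the consumer


class BatchEventProcessor(object):
    """Simplified sketch: many producers call process(); a single consumer
    thread drains the queue and emits batches of up to batch_size events."""

    def __init__(self, dispatch, batch_size=10, timeout=0.05):
        self.dispatch = dispatch            # called with each completed batch
        self.batch_size = batch_size
        self.timeout = timeout
        self.event_queue = queue.Queue(maxsize=1000)
        self._current_batch = []
        self.executor = threading.Thread(target=self._run)
        self.executor.daemon = True
        self.executor.start()

    def process(self, user_event):
        # producer side: hand the event to the consumer thread via the queue
        self.event_queue.put(user_event)

    def _run(self):
        # single consumer: poll with a timeout, build a batch, flush when full
        while True:
            try:
                item = self.event_queue.get(True, self.timeout)
            except queue.Empty:
                continue
            if item is _SHUTDOWN:
                self._flush()
                break
            self._current_batch.append(item)
            if len(self._current_batch) >= self.batch_size:
                self._flush()

    def _flush(self):
        # emit the accumulated batch as one payload
        if self._current_batch:
            self.dispatch(self._current_batch)
            self._current_batch = []

    def stop(self):
        # drain remaining events, then join the consumer thread
        self.event_queue.put(_SHUTDOWN)
        self.executor.join()
```

A usage sketch: producers call `process()` from any thread, and `stop()` flushes whatever remains before shutdown.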
Code looks good mostly, but we need a good amount of documentation.
optimizely/event/event_processor.py
Outdated
class EventProcessor(ABC):
    """ Class encapsulating event_processor functionality. Override with your own processor
nit. What is event_processor here? It does not mean anything. I feel it should be something like: "Class encapsulating event processing. Override with your own implementation."
optimizely/event/event_processor.py
Outdated
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})


class EventProcessor(ABC):
Conventionally (at least in this repo) we should have the word Base, and so I will recommend naming this BaseEventProcessor.
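The ABCMeta line in the hunk above is the Python 2/3 compatible way to declare an abstract base class. A minimal sketch of the pattern with the reviewer's suggested name (the `process` method is a hypothetical abstract member, not necessarily the PR's interface):

```python
import abc

# Instantiating ABCMeta directly produces a base class that works on both
# Python 2 and 3, avoiding the py3-only "class ABC(metaclass=...)" syntax.
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})


class BaseEventProcessor(ABC):
    """ Class encapsulating event processing. Override with your own implementation. """

    @abc.abstractmethod
    def process(self, user_event):
        """ Submit a user event for processing. """
        pass
```

Subclasses that fail to implement `process` cannot be instantiated, which is the point of the abstract base.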
optimizely/event/event_processor.py
Outdated
from datetime import timedelta
from six.moves import queue

from .user_event import UserEvent
nit. Imports from the same package should come last, i.e. on line 26, after the other packages have been imported.
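The grouping the reviewer is asking for follows PEP 8: standard library first, then third-party packages, then local-package imports, each group separated by a blank line. A small sketch (the third-party and relative imports are shown as comments so the snippet stands alone):

```python
# 1. standard library
import abc
import threading
from datetime import timedelta

# 2. third-party packages
# from six.moves import queue

# 3. imports from the local package, placed last
# from .user_event import UserEvent
```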
optimizely/event/event_processor.py
Outdated
LOCK = threading.Lock()

def __init__(self,
             event_dispatcher,
Indentation seems off. Everything should line up with the s of self.
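The alignment the reviewer means is the standard hanging indent, where each continuation argument lines up with the first one. A hypothetical constructor illustrating it (only `event_dispatcher` comes from the hunk above; the other parameters are made up for the sketch):

```python
class BatchEventProcessor(object):
    # continuation arguments line up under `self`,
    # i.e. with the column of the first parameter
    def __init__(self,
                 event_dispatcher,
                 logger=None,
                 batch_size=10):
        self.event_dispatcher = event_dispatcher
        self.logger = logger
        self.batch_size = batch_size
```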
optimizely/event/event_processor.py
Outdated
self._is_started = True

def _run(self):
    """ Scheduler method that periodically flushes events queue. """
This is not entirely true right?
optimizely/event/event_processor.py
Outdated
item = self.event_queue.get(True, 0.05)

except queue.Empty:
    self.logger.debug('Empty queue, sleeping for 50ms.')
nit. This can be a very loud message happening 20 times per second. I'd recommend not having this message.
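The quieter pattern the reviewer is suggesting can be sketched like this: since the 50ms `get` timeout makes `queue.Empty` the normal idle case, the handler simply continues instead of logging. This is a hypothetical standalone helper (using the py3 stdlib `queue` rather than `six.moves`), not the PR's actual loop:

```python
import queue

def run_consumer(event_queue, handle, stop_signal):
    """Consumer loop sketch: block on the queue with a short timeout and
    treat an empty queue as the normal idle case, without logging it."""
    while True:
        try:
            # blocks for up to 50ms waiting for the next event
            item = event_queue.get(True, 0.05)
        except queue.Empty:
            continue  # nothing to do; poll again silently
        if item is stop_signal:
            break
        handle(item)
```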
Looks very close. Some more comments.
optimizely/event/event_processor.py
Outdated
class BatchEventProcessor(BaseEventProcessor):
    """
    BatchEventProcessor is a batched implementation of the BaseEventProcessor.
nit. "batched implementation" does not imply anything. I think more accurate would be: "BatchEventProcessor is an implementation of the BaseEventProcessor that batches events."
optimizely/event/event_processor.py
Outdated
def start(self):
    """ Starts the batch processing thread to batch events. """
    if self.is_started:
        self.logger.warning('Service already started')
nit. This is not a very descriptive message. Perhaps say "BatchEventProcessor already started".
optimizely/event/event_processor.py
Outdated
@property
def is_started(self):
    """ Property to check if consumer thread is alive or not. """
    return self._is_started
Do we need _is_started? Wouldn't checking that self.executor exists and self.executor.isAlive() do the trick?
Also, is_running is probably a better name for the property.
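The reviewer's suggestion, deriving the running state from the consumer thread itself rather than a separate `_is_started` flag, could look like this. The attribute names follow the snippets above, but the body is a hypothetical sketch:

```python
import threading
import time

class BatchEventProcessor(object):
    """Sketch: is_running is computed from the consumer thread's liveness."""

    def __init__(self):
        self.executor = None

    def start(self):
        # spawn the consumer thread (here it just sleeps briefly as a stand-in)
        self.executor = threading.Thread(target=self._run)
        self.executor.daemon = True
        self.executor.start()

    def _run(self):
        time.sleep(0.2)  # placeholder for the real consumer loop

    @property
    def is_running(self):
        # True only while the consumer thread exists and is alive,
        # so no separate _is_started bookkeeping is needed
        return self.executor is not None and self.executor.is_alive()
```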
if len(self._current_batch) >= self.batch_size:
    self._flush_queue()

def _should_split(self, user_event):
nit. This needs docstring.
except queue.Full:
    self.logger.debug('Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize())))

def _add_to_batch(self, user_event):
nit. This needs docstring.
lgtm