diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py new file mode 100644 index 00000000..db81dbc6 --- /dev/null +++ b/optimizely/event/event_processor.py @@ -0,0 +1,269 @@ +# Copyright 2019 Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import threading +import time + +from datetime import timedelta +from six.moves import queue + +from optimizely import logger as _logging +from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher +from optimizely.helpers import validator +from .event_factory import EventFactory +from .user_event import UserEvent + +ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) + + +class BaseEventProcessor(ABC): + """ Class encapsulating event processing. Override with your own implementation. """ + + @abc.abstractmethod + def process(user_event): + """ Method to provide intermediary processing stage within event production. + Args: + user_event: UserEvent instance that needs to be processed and dispatched. + """ + pass + + +class BatchEventProcessor(BaseEventProcessor): + """ + BatchEventProcessor is an implementation of the BaseEventProcessor that batches events. + The BatchEventProcessor maintains a single consumer thread that pulls events off of + the blocking queue and buffers them for either a configured batch size or for a + maximum duration before the resulting LogEvent is sent to the EventDispatcher. + """ + + _DEFAULT_QUEUE_CAPACITY = 1000 + _DEFAULT_BATCH_SIZE = 10 + _DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30) + _DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5) + _SHUTDOWN_SIGNAL = object() + _FLUSH_SIGNAL = object() + LOCK = threading.Lock() + + def __init__(self, + event_dispatcher, + logger, + start_on_init=False, + event_queue=None, + batch_size=None, + flush_interval=None, + timeout_interval=None): + """ BatchEventProcessor init method to configure event batching. + Args: + event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. + logger: Provides a log method to log messages. By default nothing would be logged. + start_on_init: Optional boolean param which starts the consumer thread if set to True. + Default value is False. + event_queue: Optional component which accumulates the events until dispacthed. + batch_size: Optional param which defines the upper limit on the number of events in event_queue after which + the event_queue will be flushed. + flush_interval: Optional floating point number representing time interval in seconds after which event_queue will + be flushed. + timeout_interval: Optional floating point number representing time interval in seconds before joining the consumer + thread. + """ + self.event_dispatcher = event_dispatcher or default_event_dispatcher + self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) + self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) + self.batch_size = batch_size if self._validate_intantiation_props(batch_size, 'batch_size') \ + else self._DEFAULT_BATCH_SIZE + self.flush_interval = timedelta(seconds=flush_interval) \ + if self._validate_intantiation_props(flush_interval, 'flush_interval') \ + else self._DEFAULT_FLUSH_INTERVAL + self.timeout_interval = timedelta(seconds=timeout_interval) \ + if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \ + else self._DEFAULT_TIMEOUT_INTERVAL + self._current_batch = list() + + if start_on_init is True: + self.start() + + @property + def is_running(self): + """ Property to check if consumer thread is alive or not. """ + return self.executor.isAlive() + + def _validate_intantiation_props(self, prop, prop_name): + """ Method to determine if instantiation properties like batch_size, flush_interval + and timeout_interval are valid. + + Args: + prop: Property value that needs to be validated. + prop_name: Property name. + + Returns: + False if property value is None or less than 1 or not a finite number. + False if property name is batch_size and value is a floating point number. + True otherwise. + """ + if (prop_name == 'batch_size' and not isinstance(prop, int)) or prop is None or prop < 1 or \ + not validator.is_finite_number(prop): + self.logger.info('Using default value for {}.'.format(prop_name)) + return False + + return True + + def _get_time(self, _time=None): + """ Method to return rounded off time as integer in seconds. If _time is None, uses current time. + + Args: + _time: time in seconds that needs to be rounded off. + + Returns: + Integer time in seconds. + """ + if _time is None: + return int(round(time.time())) + + return int(round(_time)) + + def start(self): + """ Starts the batch processing thread to batch events. """ + if hasattr(self, 'executor') and self.is_running: + self.logger.warning('BatchEventProcessor already started.') + return + + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) + self.executor = threading.Thread(target=self._run) + self.executor.setDaemon(True) + self.executor.start() + + def _run(self): + """ Triggered as part of the thread which batches events or flushes event_queue and sleeps + periodically if queue is empty. + """ + try: + while True: + if self._get_time() > self.flushing_interval_deadline: + self._flush_queue() + + try: + item = self.event_queue.get(True, 0.05) + + except queue.Empty: + time.sleep(0.05) + continue + + if item == self._SHUTDOWN_SIGNAL: + self.logger.debug('Received shutdown signal.') + break + + if item == self._FLUSH_SIGNAL: + self.logger.debug('Received flush signal.') + self._flush_queue() + continue + + if isinstance(item, UserEvent): + self._add_to_batch(item) + + except Exception as exception: + self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception)) + + finally: + self.logger.info('Exiting processing loop. Attempting to flush pending events.') + self._flush_queue() + + def flush(self): + """ Adds flush signal to event_queue. """ + + self.event_queue.put(self._FLUSH_SIGNAL) + + def _flush_queue(self): + """ Flushes event_queue by dispatching events. """ + + if len(self._current_batch) == 0: + return + + with self.LOCK: + to_process_batch = list(self._current_batch) + self._current_batch = list() + + log_event = EventFactory.create_log_event(to_process_batch, self.logger) + + try: + self.event_dispatcher.dispatch_event(log_event) + except Exception as e: + self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e)) + + def process(self, user_event): + """ Method to process the user_event by putting it in event_queue. + Args: + user_event: UserEvent Instance. + """ + if not isinstance(user_event, UserEvent): + self.logger.error('Provided event is in an invalid format.') + return + + self.logger.debug('Received user_event: ' + str(user_event)) + + try: + self.event_queue.put_nowait(user_event) + except queue.Full: + self.logger.debug('Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize()))) + + def _add_to_batch(self, user_event): + """ Method to append received user event to current batch. + Args: + user_event: UserEvent Instance. + """ + if self._should_split(user_event): + self._flush_queue() + self._current_batch = list() + + # Reset the deadline if starting a new batch. + if len(self._current_batch) == 0: + self.flushing_interval_deadline = self._get_time() + \ + self._get_time(self.flush_interval.total_seconds()) + + with self.LOCK: + self._current_batch.append(user_event) + if len(self._current_batch) >= self.batch_size: + self._flush_queue() + + def _should_split(self, user_event): + """ Method to check if current event batch should split into two. + Args: + user_event: UserEvent Instance. + Return Value: + - True, if revision number and project_id of last event in current batch do not match received event's + revision number and project id respectively. + - False, otherwise. + """ + if len(self._current_batch) == 0: + return False + + current_context = self._current_batch[-1].event_context + new_context = user_event.event_context + + if current_context.revision != new_context.revision: + return True + + if current_context.project_id != new_context.project_id: + return True + + return False + + def stop(self): + """ Stops and disposes batch event processor. """ + self.event_queue.put(self._SHUTDOWN_SIGNAL) + self.logger.warning('Stopping Scheduler.') + + self.executor.join(self.timeout_interval.total_seconds()) + + if self.is_running: + self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.') diff --git a/optimizely/event/log_event.py b/optimizely/event/log_event.py index cf7d2b3d..30839faa 100644 --- a/optimizely/event/log_event.py +++ b/optimizely/event/log_event.py @@ -20,3 +20,6 @@ def __init__(self, url, params, http_verb=None, headers=None): self.params = params self.http_verb = http_verb or 'POST' self.headers = headers + + def __str__(self): + return str(self.__class__) + ": " + str(self.__dict__) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py new file mode 100644 index 00000000..2e6f0442 --- /dev/null +++ b/tests/test_event_processor.py @@ -0,0 +1,373 @@ +# Copyright 2019, Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import time +from datetime import timedelta +from six.moves import queue + +from . import base +from optimizely.logger import SimpleLogger +from optimizely.event.payload import Decision, Visitor +from optimizely.event.user_event_factory import UserEventFactory +from optimizely.event.event_processor import BatchEventProcessor + + +class CanonicalEvent(object): + + def __init__(self, experiment_id, variation_id, event_name, visitor_id, attributes, tags): + self._experiment_id = experiment_id + self._variation_id = variation_id + self._event_name = event_name + self._visitor_id = visitor_id + self._attributes = attributes or {} + self._tags = tags or {} + + def __eq__(self, other): + if other is None: + return False + + return self.__dict__ == other.__dict__ + + +class TestEventDispatcher(object): + + IMPRESSION_EVENT_NAME = 'campaign_activated' + + def __init__(self, countdown_event=None): + self.countdown_event = countdown_event + self.expected_events = list() + self.actual_events = list() + + def compare_events(self): + if len(self.expected_events) != len(self.actual_events): + return False + + for index, event in enumerate(self.expected_events): + expected_event = event + actual_event = self.actual_events[index] + + if not expected_event == actual_event: + return False + + return True + + def dispatch_event(self, actual_log_event): + visitors = [] + log_event_params = actual_log_event.params + + if 'visitors' in log_event_params: + + for visitor in log_event_params['visitors']: + visitor_instance = Visitor(**visitor) + visitors.append(visitor_instance) + + if len(visitors) == 0: + return + + for visitor in visitors: + for snapshot in visitor.snapshots: + decisions = snapshot.get('decisions') or [Decision(None, None, None)] + for decision in decisions: + for event in snapshot.get('events'): + attributes = visitor.attributes + + self.actual_events.append(CanonicalEvent(decision.experiment_id, decision.variation_id, + event.get('key'), visitor.visitor_id, attributes, + event.get('event_tags'))) + + def expect_impression(self, experiment_id, variation_id, user_id, attributes=None): + self._expect(experiment_id, variation_id, self.IMPRESSION_EVENT_NAME, user_id, None) + + def expect_conversion(self, event_name, user_id, attributes=None, event_tags=None): + self._expect(None, None, event_name, user_id, attributes, event_tags) + + def _expect(self, experiment_id, variation_id, event_name, visitor_id, attributes, tags): + expected_event = CanonicalEvent(experiment_id, variation_id, event_name, visitor_id, attributes, tags) + self.expected_events.append(expected_event) + + +class BatchEventProcessorTest(base.BaseTest): + + DEFAULT_QUEUE_CAPACITY = 1000 + MAX_BATCH_SIZE = 10 + MAX_DURATION_SEC = 1 + MAX_TIMEOUT_INTERVAL_SEC = 5 + + def setUp(self, *args, **kwargs): + base.BaseTest.setUp(self, 'config_dict_with_multiple_experiments') + self.test_user_id = 'test_user' + self.event_name = 'test_event' + self.event_queue = queue.Queue(maxsize=self.DEFAULT_QUEUE_CAPACITY) + self.optimizely.logger = SimpleLogger() + + def tearDown(self): + self._event_processor.stop() + + def _build_conversion_event(self, event_name, project_config=None): + config = project_config or self.project_config + return UserEventFactory.create_conversion_event(config, event_name, self.test_user_id, {}, {}) + + def _set_event_processor(self, event_dispatcher, logger): + self._event_processor = BatchEventProcessor(event_dispatcher, + logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + self.MAX_DURATION_SEC, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + def test_drain_on_stop(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name) + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(5) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush_on_max_timeout(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name) + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush_max_batch_size(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + for i in range(0, self.MAX_BATCH_SIZE): + user_event = self._build_conversion_event(self.event_name) + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(1) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name) + self._event_processor.process(user_event) + self._event_processor.flush() + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + self._event_processor.process(user_event) + self._event_processor.flush() + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush_on_mismatch_revision(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + self.project_config.revision = 1 + self.project_config.project_id = 'X' + + user_event_1 = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event_1) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + self.project_config.revision = 2 + self.project_config.project_id = 'X' + + user_event_2 = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event_2) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush_on_mismatch_project_id(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + self.project_config.revision = 1 + self.project_config.project_id = 'X' + + user_event_1 = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event_1) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + self.project_config.revision = 1 + self.project_config.project_id = 'Y' + + user_event_2 = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event_2) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_stop_and_start(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self._event_processor.stop() + + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + self._event_processor.start() + self.assertStrictTrue(self._event_processor.is_running) + + self._event_processor.stop() + self.assertStrictFalse(self._event_processor.is_running) + + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_init__invalid_batch_size(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + 5.5, + self.MAX_DURATION_SEC, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + # default batch size is 10. + self.assertEqual(self._event_processor.batch_size, 10) + mock_config_logging.info.assert_called_with('Using default value for batch_size.') + + def test_init__NaN_batch_size(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + 'batch_size', + self.MAX_DURATION_SEC, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + # default batch size is 10. + self.assertEqual(self._event_processor.batch_size, 10) + mock_config_logging.info.assert_called_with('Using default value for batch_size.') + + def test_init__invalid_flush_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + mock_config_logging, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 0, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + # default flush interval is 30s. + self.assertEqual(self._event_processor.flush_interval, timedelta(seconds=30)) + mock_config_logging.info.assert_called_with('Using default value for flush_interval.') + + def test_init__NaN_flush_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + True, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + # default flush interval is 30s. + self.assertEqual(self._event_processor.flush_interval, timedelta(seconds=30)) + mock_config_logging.info.assert_called_with('Using default value for flush_interval.') + + def test_init__invalid_timeout_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + self.MAX_DURATION_SEC, + -100 + ) + + # default timeout interval is 5s. + self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) + mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') + + def test_init__NaN_timeout_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + self.MAX_DURATION_SEC, + False + ) + + # default timeout interval is 5s. + self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) + mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') diff --git a/tox.ini b/tox.ini index 7fb571f6..2c9c6f1c 100644 --- a/tox.ini +++ b/tox.ini @@ -4,6 +4,6 @@ # E121 - continuation line indentation is not a multiple of four # E127 - continuation line over-indented for visual indent # E722 - do not use bare 'except' -ignore = E111,E114,E121,E127, E722 +ignore = E111,E114,E121,E127,E722 exclude = optimizely/lib/pymmh3.py,*virtualenv* max-line-length = 120