From ec672245a0b6f95f2f48b6505ac728192da1c85a Mon Sep 17 00:00:00 2001 From: mnoman09 Date: Fri, 4 Oct 2019 21:25:35 +0500 Subject: [PATCH 1/2] feat(eventProcessor): Add EventProcessor and BatchEventProcessor (#203) --- optimizely/event/event_processor.py | 269 ++++++++++++++++++++ optimizely/event/log_event.py | 3 + tests/test_event_processor.py | 373 ++++++++++++++++++++++++++++ tox.ini | 2 +- 4 files changed, 646 insertions(+), 1 deletion(-) create mode 100644 optimizely/event/event_processor.py create mode 100644 tests/test_event_processor.py diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py new file mode 100644 index 00000000..db81dbc6 --- /dev/null +++ b/optimizely/event/event_processor.py @@ -0,0 +1,269 @@ +# Copyright 2019 Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import threading +import time + +from datetime import timedelta +from six.moves import queue + +from optimizely import logger as _logging +from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher +from optimizely.helpers import validator +from .event_factory import EventFactory +from .user_event import UserEvent + +ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) + + +class BaseEventProcessor(ABC): + """ Class encapsulating event processing. Override with your own implementation. """ + + @abc.abstractmethod + def process(user_event): + """ Method to provide intermediary processing stage within event production. + Args: + user_event: UserEvent instance that needs to be processed and dispatched. + """ + pass + + +class BatchEventProcessor(BaseEventProcessor): + """ + BatchEventProcessor is an implementation of the BaseEventProcessor that batches events. + The BatchEventProcessor maintains a single consumer thread that pulls events off of + the blocking queue and buffers them for either a configured batch size or for a + maximum duration before the resulting LogEvent is sent to the EventDispatcher. + """ + + _DEFAULT_QUEUE_CAPACITY = 1000 + _DEFAULT_BATCH_SIZE = 10 + _DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30) + _DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5) + _SHUTDOWN_SIGNAL = object() + _FLUSH_SIGNAL = object() + LOCK = threading.Lock() + + def __init__(self, + event_dispatcher, + logger, + start_on_init=False, + event_queue=None, + batch_size=None, + flush_interval=None, + timeout_interval=None): + """ BatchEventProcessor init method to configure event batching. + Args: + event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. + logger: Provides a log method to log messages. By default nothing would be logged. + start_on_init: Optional boolean param which starts the consumer thread if set to True. + Default value is False. + event_queue: Optional component which accumulates the events until dispacthed. + batch_size: Optional param which defines the upper limit on the number of events in event_queue after which + the event_queue will be flushed. + flush_interval: Optional floating point number representing time interval in seconds after which event_queue will + be flushed. + timeout_interval: Optional floating point number representing time interval in seconds before joining the consumer + thread. + """ + self.event_dispatcher = event_dispatcher or default_event_dispatcher + self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) + self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) + self.batch_size = batch_size if self._validate_intantiation_props(batch_size, 'batch_size') \ + else self._DEFAULT_BATCH_SIZE + self.flush_interval = timedelta(seconds=flush_interval) \ + if self._validate_intantiation_props(flush_interval, 'flush_interval') \ + else self._DEFAULT_FLUSH_INTERVAL + self.timeout_interval = timedelta(seconds=timeout_interval) \ + if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \ + else self._DEFAULT_TIMEOUT_INTERVAL + self._current_batch = list() + + if start_on_init is True: + self.start() + + @property + def is_running(self): + """ Property to check if consumer thread is alive or not. """ + return self.executor.isAlive() + + def _validate_intantiation_props(self, prop, prop_name): + """ Method to determine if instantiation properties like batch_size, flush_interval + and timeout_interval are valid. + + Args: + prop: Property value that needs to be validated. + prop_name: Property name. + + Returns: + False if property value is None or less than 1 or not a finite number. + False if property name is batch_size and value is a floating point number. + True otherwise. + """ + if (prop_name == 'batch_size' and not isinstance(prop, int)) or prop is None or prop < 1 or \ + not validator.is_finite_number(prop): + self.logger.info('Using default value for {}.'.format(prop_name)) + return False + + return True + + def _get_time(self, _time=None): + """ Method to return rounded off time as integer in seconds. If _time is None, uses current time. + + Args: + _time: time in seconds that needs to be rounded off. + + Returns: + Integer time in seconds. + """ + if _time is None: + return int(round(time.time())) + + return int(round(_time)) + + def start(self): + """ Starts the batch processing thread to batch events. """ + if hasattr(self, 'executor') and self.is_running: + self.logger.warning('BatchEventProcessor already started.') + return + + self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds()) + self.executor = threading.Thread(target=self._run) + self.executor.setDaemon(True) + self.executor.start() + + def _run(self): + """ Triggered as part of the thread which batches events or flushes event_queue and sleeps + periodically if queue is empty. + """ + try: + while True: + if self._get_time() > self.flushing_interval_deadline: + self._flush_queue() + + try: + item = self.event_queue.get(True, 0.05) + + except queue.Empty: + time.sleep(0.05) + continue + + if item == self._SHUTDOWN_SIGNAL: + self.logger.debug('Received shutdown signal.') + break + + if item == self._FLUSH_SIGNAL: + self.logger.debug('Received flush signal.') + self._flush_queue() + continue + + if isinstance(item, UserEvent): + self._add_to_batch(item) + + except Exception as exception: + self.logger.error('Uncaught exception processing buffer. Error: ' + str(exception)) + + finally: + self.logger.info('Exiting processing loop. Attempting to flush pending events.') + self._flush_queue() + + def flush(self): + """ Adds flush signal to event_queue. """ + + self.event_queue.put(self._FLUSH_SIGNAL) + + def _flush_queue(self): + """ Flushes event_queue by dispatching events. """ + + if len(self._current_batch) == 0: + return + + with self.LOCK: + to_process_batch = list(self._current_batch) + self._current_batch = list() + + log_event = EventFactory.create_log_event(to_process_batch, self.logger) + + try: + self.event_dispatcher.dispatch_event(log_event) + except Exception as e: + self.logger.error('Error dispatching event: ' + str(log_event) + ' ' + str(e)) + + def process(self, user_event): + """ Method to process the user_event by putting it in event_queue. + Args: + user_event: UserEvent Instance. + """ + if not isinstance(user_event, UserEvent): + self.logger.error('Provided event is in an invalid format.') + return + + self.logger.debug('Received user_event: ' + str(user_event)) + + try: + self.event_queue.put_nowait(user_event) + except queue.Full: + self.logger.debug('Payload not accepted by the queue. Current size: {}'.format(str(self.event_queue.qsize()))) + + def _add_to_batch(self, user_event): + """ Method to append received user event to current batch. + Args: + user_event: UserEvent Instance. + """ + if self._should_split(user_event): + self._flush_queue() + self._current_batch = list() + + # Reset the deadline if starting a new batch. + if len(self._current_batch) == 0: + self.flushing_interval_deadline = self._get_time() + \ + self._get_time(self.flush_interval.total_seconds()) + + with self.LOCK: + self._current_batch.append(user_event) + if len(self._current_batch) >= self.batch_size: + self._flush_queue() + + def _should_split(self, user_event): + """ Method to check if current event batch should split into two. + Args: + user_event: UserEvent Instance. + Return Value: + - True, if revision number and project_id of last event in current batch do not match received event's + revision number and project id respectively. + - False, otherwise. + """ + if len(self._current_batch) == 0: + return False + + current_context = self._current_batch[-1].event_context + new_context = user_event.event_context + + if current_context.revision != new_context.revision: + return True + + if current_context.project_id != new_context.project_id: + return True + + return False + + def stop(self): + """ Stops and disposes batch event processor. """ + self.event_queue.put(self._SHUTDOWN_SIGNAL) + self.logger.warning('Stopping Scheduler.') + + self.executor.join(self.timeout_interval.total_seconds()) + + if self.is_running: + self.logger.error('Timeout exceeded while attempting to close for ' + str(self.timeout_interval) + ' ms.') diff --git a/optimizely/event/log_event.py b/optimizely/event/log_event.py index cf7d2b3d..30839faa 100644 --- a/optimizely/event/log_event.py +++ b/optimizely/event/log_event.py @@ -20,3 +20,6 @@ def __init__(self, url, params, http_verb=None, headers=None): self.params = params self.http_verb = http_verb or 'POST' self.headers = headers + + def __str__(self): + return str(self.__class__) + ": " + str(self.__dict__) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py new file mode 100644 index 00000000..2e6f0442 --- /dev/null +++ b/tests/test_event_processor.py @@ -0,0 +1,373 @@ +# Copyright 2019, Optimizely +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import time +from datetime import timedelta +from six.moves import queue + +from . import base +from optimizely.logger import SimpleLogger +from optimizely.event.payload import Decision, Visitor +from optimizely.event.user_event_factory import UserEventFactory +from optimizely.event.event_processor import BatchEventProcessor + + +class CanonicalEvent(object): + + def __init__(self, experiment_id, variation_id, event_name, visitor_id, attributes, tags): + self._experiment_id = experiment_id + self._variation_id = variation_id + self._event_name = event_name + self._visitor_id = visitor_id + self._attributes = attributes or {} + self._tags = tags or {} + + def __eq__(self, other): + if other is None: + return False + + return self.__dict__ == other.__dict__ + + +class TestEventDispatcher(object): + + IMPRESSION_EVENT_NAME = 'campaign_activated' + + def __init__(self, countdown_event=None): + self.countdown_event = countdown_event + self.expected_events = list() + self.actual_events = list() + + def compare_events(self): + if len(self.expected_events) != len(self.actual_events): + return False + + for index, event in enumerate(self.expected_events): + expected_event = event + actual_event = self.actual_events[index] + + if not expected_event == actual_event: + return False + + return True + + def dispatch_event(self, actual_log_event): + visitors = [] + log_event_params = actual_log_event.params + + if 'visitors' in log_event_params: + + for visitor in log_event_params['visitors']: + visitor_instance = Visitor(**visitor) + visitors.append(visitor_instance) + + if len(visitors) == 0: + return + + for visitor in visitors: + for snapshot in visitor.snapshots: + decisions = snapshot.get('decisions') or [Decision(None, None, None)] + for decision in decisions: + for event in snapshot.get('events'): + attributes = visitor.attributes + + self.actual_events.append(CanonicalEvent(decision.experiment_id, decision.variation_id, + event.get('key'), visitor.visitor_id, attributes, + event.get('event_tags'))) + + def expect_impression(self, experiment_id, variation_id, user_id, attributes=None): + self._expect(experiment_id, variation_id, self.IMPRESSION_EVENT_NAME, user_id, None) + + def expect_conversion(self, event_name, user_id, attributes=None, event_tags=None): + self._expect(None, None, event_name, user_id, attributes, event_tags) + + def _expect(self, experiment_id, variation_id, event_name, visitor_id, attributes, tags): + expected_event = CanonicalEvent(experiment_id, variation_id, event_name, visitor_id, attributes, tags) + self.expected_events.append(expected_event) + + +class BatchEventProcessorTest(base.BaseTest): + + DEFAULT_QUEUE_CAPACITY = 1000 + MAX_BATCH_SIZE = 10 + MAX_DURATION_SEC = 1 + MAX_TIMEOUT_INTERVAL_SEC = 5 + + def setUp(self, *args, **kwargs): + base.BaseTest.setUp(self, 'config_dict_with_multiple_experiments') + self.test_user_id = 'test_user' + self.event_name = 'test_event' + self.event_queue = queue.Queue(maxsize=self.DEFAULT_QUEUE_CAPACITY) + self.optimizely.logger = SimpleLogger() + + def tearDown(self): + self._event_processor.stop() + + def _build_conversion_event(self, event_name, project_config=None): + config = project_config or self.project_config + return UserEventFactory.create_conversion_event(config, event_name, self.test_user_id, {}, {}) + + def _set_event_processor(self, event_dispatcher, logger): + self._event_processor = BatchEventProcessor(event_dispatcher, + logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + self.MAX_DURATION_SEC, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + def test_drain_on_stop(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name) + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(5) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush_on_max_timeout(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name) + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush_max_batch_size(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + for i in range(0, self.MAX_BATCH_SIZE): + user_event = self._build_conversion_event(self.event_name) + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(1) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name) + self._event_processor.process(user_event) + self._event_processor.flush() + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + self._event_processor.process(user_event) + self._event_processor.flush() + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush_on_mismatch_revision(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + self.project_config.revision = 1 + self.project_config.project_id = 'X' + + user_event_1 = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event_1) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + self.project_config.revision = 2 + self.project_config.project_id = 'X' + + user_event_2 = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event_2) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_flush_on_mismatch_project_id(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + self.project_config.revision = 1 + self.project_config.project_id = 'X' + + user_event_1 = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event_1) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + self.project_config.revision = 1 + self.project_config.project_id = 'Y' + + user_event_2 = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event_2) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_stop_and_start(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(3) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self._event_processor.stop() + + self._event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + self._event_processor.start() + self.assertStrictTrue(self._event_processor.is_running) + + self._event_processor.stop() + self.assertStrictFalse(self._event_processor.is_running) + + self.assertEqual(0, self._event_processor.event_queue.qsize()) + + def test_init__invalid_batch_size(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + 5.5, + self.MAX_DURATION_SEC, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + # default batch size is 10. + self.assertEqual(self._event_processor.batch_size, 10) + mock_config_logging.info.assert_called_with('Using default value for batch_size.') + + def test_init__NaN_batch_size(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + 'batch_size', + self.MAX_DURATION_SEC, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + # default batch size is 10. + self.assertEqual(self._event_processor.batch_size, 10) + mock_config_logging.info.assert_called_with('Using default value for batch_size.') + + def test_init__invalid_flush_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + mock_config_logging, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 0, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + # default flush interval is 30s. + self.assertEqual(self._event_processor.flush_interval, timedelta(seconds=30)) + mock_config_logging.info.assert_called_with('Using default value for flush_interval.') + + def test_init__NaN_flush_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + True, + self.MAX_TIMEOUT_INTERVAL_SEC + ) + + # default flush interval is 30s. + self.assertEqual(self._event_processor.flush_interval, timedelta(seconds=30)) + mock_config_logging.info.assert_called_with('Using default value for flush_interval.') + + def test_init__invalid_timeout_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + self.MAX_DURATION_SEC, + -100 + ) + + # default timeout interval is 5s. + self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) + mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') + + def test_init__NaN_timeout_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._event_processor = BatchEventProcessor(event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + self.MAX_DURATION_SEC, + False + ) + + # default timeout interval is 5s. + self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) + mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') diff --git a/tox.ini b/tox.ini index 7fb571f6..2c9c6f1c 100644 --- a/tox.ini +++ b/tox.ini @@ -4,6 +4,6 @@ # E121 - continuation line indentation is not a multiple of four # E127 - continuation line over-indented for visual indent # E722 - do not use bare 'except' -ignore = E111,E114,E121,E127, E722 +ignore = E111,E114,E121,E127,E722 exclude = optimizely/lib/pymmh3.py,*virtualenv* max-line-length = 120 From 3731be4542b9208b7d8872a779bde1f45a913c66 Mon Sep 17 00:00:00 2001 From: Owais Akbani Date: Mon, 7 Oct 2019 10:19:47 +0500 Subject: [PATCH 2/2] feat(notification-center): Add LogEvent notification (#213) Going to merge this and run compat tests on master. --- optimizely/event/event_processor.py | 20 ++++++++++++++-- optimizely/helpers/enums.py | 4 ++++ tests/test_event_processor.py | 36 ++++++++++++++++++++++++++--- tests/test_notification_center.py | 16 +++++++++++++ 4 files changed, 71 insertions(+), 5 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index db81dbc6..823dd3f6 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -19,7 +19,9 @@ from six.moves import queue from optimizely import logger as _logging +from optimizely import notification_center as _notification_center from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher +from optimizely.helpers import enums from optimizely.helpers import validator from .event_factory import EventFactory from .user_event import UserEvent @@ -62,8 +64,10 @@ def __init__(self, event_queue=None, batch_size=None, flush_interval=None, - timeout_interval=None): - """ BatchEventProcessor init method to configure event batching. + timeout_interval=None, + notification_center=None): + """ EventProcessor init method to configure event batching. + Args: event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. logger: Provides a log method to log messages. By default nothing would be logged. @@ -76,6 +80,7 @@ def __init__(self, be flushed. timeout_interval: Optional floating point number representing time interval in seconds before joining the consumer thread. + notification_center: Optional instance of notification_center.NotificationCenter. """ self.event_dispatcher = event_dispatcher or default_event_dispatcher self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) @@ -88,8 +93,13 @@ def __init__(self, self.timeout_interval = timedelta(seconds=timeout_interval) \ if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \ else self._DEFAULT_TIMEOUT_INTERVAL + self.notification_center = notification_center self._current_batch = list() + if not validator.is_notification_center_valid(self.notification_center): + self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) + self.notification_center = _notification_center.NotificationCenter() + if start_on_init is True: self.start() @@ -195,6 +205,12 @@ def _flush_queue(self): log_event = EventFactory.create_log_event(to_process_batch, self.logger) + if self.notification_center is not None: + self.notification_center.send_notifications( + enums.NotificationTypes.LOG_EVENT, + log_event + ) + try: self.event_dispatcher.dispatch_event(log_event) except Exception as e: diff --git a/optimizely/helpers/enums.py b/optimizely/helpers/enums.py index 73ecfe54..893538ca 100644 --- a/optimizely/helpers/enums.py +++ b/optimizely/helpers/enums.py @@ -121,8 +121,12 @@ class NotificationTypes(object): TRACK notification listener has the following parameters: str event_key, str user_id, dict attributes (can be None), event_tags (can be None), Event event + + LOG_EVENT notification listener has the following parameter(s): + LogEvent log_event """ ACTIVATE = 'ACTIVATE:experiment, user_id, attributes, variation, event' DECISION = 'DECISION:type, user_id, attributes, decision_info' OPTIMIZELY_CONFIG_UPDATE = 'OPTIMIZELY_CONFIG_UPDATE' TRACK = 'TRACK:event_key, user_id, attributes, event_tags, event' + LOG_EVENT = 'LOG_EVENT:log_event' diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 2e6f0442..09a758b6 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -17,10 +17,12 @@ from six.moves import queue from . import base -from optimizely.logger import SimpleLogger from optimizely.event.payload import Decision, Visitor -from optimizely.event.user_event_factory import UserEventFactory from optimizely.event.event_processor import BatchEventProcessor +from optimizely.event.log_event import LogEvent +from optimizely.event.user_event_factory import UserEventFactory +from optimizely.helpers import enums +from optimizely.logger import SimpleLogger class CanonicalEvent(object): @@ -110,6 +112,7 @@ def setUp(self, *args, **kwargs): self.event_name = 'test_event' self.event_queue = queue.Queue(maxsize=self.DEFAULT_QUEUE_CAPACITY) self.optimizely.logger = SimpleLogger() + self.notification_center = self.optimizely.notification_center def tearDown(self): self._event_processor.stop() @@ -125,7 +128,8 @@ def _set_event_processor(self, event_dispatcher, logger): self.event_queue, self.MAX_BATCH_SIZE, self.MAX_DURATION_SEC, - self.MAX_TIMEOUT_INTERVAL_SEC + self.MAX_TIMEOUT_INTERVAL_SEC, + self.optimizely.notification_center ) def test_drain_on_stop(self): @@ -371,3 +375,29 @@ def test_init__NaN_timeout_interval(self): # default timeout interval is 5s. self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') + + def test_notification_center__on_log_event(self): + + mock_event_dispatcher = mock.Mock() + callback_hit = [False] + + def on_log_event(log_event): + self.assertStrictTrue(isinstance(log_event, LogEvent)) + callback_hit[0] = True + + self.optimizely.notification_center.add_notification_listener( + enums.NotificationTypes.LOG_EVENT, on_log_event + ) + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(mock_event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name, self.project_config) + self._event_processor.process(user_event) + + self._event_processor.stop() + + self.assertEqual(True, callback_hit[0]) + self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[ + enums.NotificationTypes.LOG_EVENT + ])) diff --git a/tests/test_notification_center.py b/tests/test_notification_center.py index eec1abe6..4ed8ba0d 100644 --- a/tests/test_notification_center.py +++ b/tests/test_notification_center.py @@ -34,6 +34,10 @@ def on_track_listener(*args): pass +def on_log_event_listener(*args): + pass + + class NotificationCenterTest(unittest.TestCase): def test_add_notification_listener__valid_type(self): @@ -59,6 +63,11 @@ def test_add_notification_listener__valid_type(self): 4, test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) ) + self.assertEqual( + 5, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, + on_log_event_listener) + ) + def test_add_notification_listener__multiple_listeners(self): """ Test that multiple listeners of the same type can be successfully added. """ @@ -138,6 +147,7 @@ def another_on_activate_listener(*args): self.assertEqual(2, len(test_notification_center.notification_listeners[enums.NotificationTypes.ACTIVATE])) self.assertEqual(1, len(test_notification_center.notification_listeners[enums.NotificationTypes.DECISION])) self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.TRACK])) + self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.LOG_EVENT])) # Remove one of the activate listeners and assert. self.assertTrue(test_notification_center.remove_notification_listener(3)) @@ -164,6 +174,10 @@ def another_on_activate_listener(*args): 3, test_notification_center.add_notification_listener(enums.NotificationTypes.ACTIVATE, another_on_activate_listener) ) + self.assertEqual( + 4, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, + on_log_event_listener) + ) # Try removing a listener which does not exist. self.assertFalse(test_notification_center.remove_notification_listener(42)) @@ -180,6 +194,7 @@ def test_clear_notification_listeners(self): on_config_update_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) + test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener) # Assert all listeners are there: for notification_type in notification_center.NOTIFICATION_TYPES: @@ -210,6 +225,7 @@ def test_clear_all_notification_listeners(self): on_config_update_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener) test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener) + test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener) # Assert all listeners are there: for notification_type in notification_center.NOTIFICATION_TYPES: