From 98d53f2cc849ee8c00555e8707175f9bdfe0bd52 Mon Sep 17 00:00:00 2001 From: aliabbasrizvi Date: Wed, 23 Oct 2019 21:51:45 -0700 Subject: [PATCH 1/6] Miscellaneous fixes --- optimizely/event/event_processor.py | 74 ++++++---- optimizely/optimizely.py | 9 +- tests/test_event_processor.py | 215 ++++++++++++++-------------- 3 files changed, 162 insertions(+), 136 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index f00f78a2..000d5923 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -34,7 +34,7 @@ class BaseEventProcessor(ABC): """ Class encapsulating event processing. Override with your own implementation. """ @abc.abstractmethod - def process(user_event): + def process(self, user_event): """ Method to provide intermediary processing stage within event production. Args: user_event: UserEvent instance that needs to be processed and dispatched. @@ -52,26 +52,26 @@ class BatchEventProcessor(BaseEventProcessor): _DEFAULT_QUEUE_CAPACITY = 1000 _DEFAULT_BATCH_SIZE = 10 - _DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30) - _DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5) + _DEFAULT_FLUSH_INTERVAL = 30 + _DEFAULT_TIMEOUT_INTERVAL = 5 _SHUTDOWN_SIGNAL = object() _FLUSH_SIGNAL = object() LOCK = threading.Lock() def __init__(self, event_dispatcher, - logger, + logger=None, start_on_init=False, event_queue=None, batch_size=None, flush_interval=None, timeout_interval=None, notification_center=None): - """ EventProcessor init method to configure event batching. + """ BatchEventProcessor init method to configure event batching. Args: event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it. - logger: Provides a log method to log messages. By default nothing would be logged. + logger: Optional component which provides a log method to log messages. By default nothing would be logged. start_on_init: Optional boolean param which starts the consumer thread if set to True. Default value is False. event_queue: Optional component which accumulates the events until dispacthed. @@ -86,20 +86,28 @@ def __init__(self, self.event_dispatcher = event_dispatcher or default_event_dispatcher self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) - self.batch_size = batch_size if self._validate_intantiation_props(batch_size, 'batch_size') \ + self.batch_size = batch_size if self._validate_intantiation_props(batch_size, + 'batch_size', + self._DEFAULT_BATCH_SIZE) \ else self._DEFAULT_BATCH_SIZE self.flush_interval = timedelta(seconds=flush_interval) \ - if self._validate_intantiation_props(flush_interval, 'flush_interval') \ - else self._DEFAULT_FLUSH_INTERVAL + if self._validate_intantiation_props(flush_interval, + 'flush_interval', + self._DEFAULT_FLUSH_INTERVAL) \ + else timedelta(self._DEFAULT_FLUSH_INTERVAL) self.timeout_interval = timedelta(seconds=timeout_interval) \ - if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \ - else self._DEFAULT_TIMEOUT_INTERVAL - self.notification_center = notification_center + if self._validate_intantiation_props(timeout_interval, + 'timeout_interval', + self._DEFAULT_TIMEOUT_INTERVAL) \ + else timedelta(self._DEFAULT_TIMEOUT_INTERVAL) + + self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) self._current_batch = list() if not validator.is_notification_center_valid(self.notification_center): self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) - self.notification_center = _notification_center.NotificationCenter() + self.logger.debug('Creating notification center for use.') + self.notification_center = _notification_center.NotificationCenter(self.logger) self.executor = None if start_on_init is True: @@ -110,13 +118,14 @@ def is_running(self): """ Property to check if consumer thread is alive or not. """ return self.executor.isAlive() if self.executor else False - def _validate_intantiation_props(self, prop, prop_name): + def _validate_intantiation_props(self, prop, prop_name, default_value): """ Method to determine if instantiation properties like batch_size, flush_interval and timeout_interval are valid. Args: prop: Property value that needs to be validated. prop_name: Property name. + default_value: Default value for property. Returns: False if property value is None or less than or equal to 0 or not a finite number. @@ -132,7 +141,7 @@ def _validate_intantiation_props(self, prop, prop_name): is_valid = False if is_valid is False: - self.logger.info('Using default value for {}.'.format(prop_name)) + self.logger.info('Using default value {} for {}.'.format(default_value, prop_name)) return is_valid @@ -213,11 +222,10 @@ def _flush_queue(self): log_event = EventFactory.create_log_event(to_process_batch, self.logger) - if self.notification_center is not None: - self.notification_center.send_notifications( - enums.NotificationTypes.LOG_EVENT, - log_event - ) + self.notification_center.send_notifications( + enums.NotificationTypes.LOG_EVENT, + log_event + ) try: self.event_dispatcher.dispatch_event(log_event) @@ -226,6 +234,7 @@ def _flush_queue(self): def process(self, user_event): """ Method to process the user_event by putting it in event_queue. + Args: user_event: UserEvent Instance. """ @@ -233,7 +242,9 @@ def process(self, user_event): self.logger.error('Provided event is in an invalid format.') return - self.logger.debug('Received user_event: ' + str(user_event)) + self.logger.debug('Received event of type {} for user {}.'.format( + type(user_event).__name__, user_event.user_id) + ) try: self.event_queue.put_nowait(user_event) @@ -242,6 +253,7 @@ def process(self, user_event): def _add_to_batch(self, user_event): """ Method to append received user event to current batch. + Args: user_event: UserEvent Instance. """ @@ -261,9 +273,11 @@ def _add_to_batch(self, user_event): def _should_split(self, user_event): """ Method to check if current event batch should split into two. + Args: user_event: UserEvent Instance. - Return Value: + + Returns: - True, if revision number and project_id of last event in current batch do not match received event's revision number and project id respectively. - False, otherwise. @@ -311,7 +325,7 @@ def __init__(self, event_dispatcher, logger=None, notification_center=None): """ self.event_dispatcher = event_dispatcher self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) - self.notification_center = notification_center + self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) if not validator.is_notification_center_valid(self.notification_center): self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center')) @@ -319,6 +333,7 @@ def __init__(self, event_dispatcher, logger=None, notification_center=None): def process(self, user_event): """ Method to process the user_event by dispatching it. + Args: user_event: UserEvent Instance. """ @@ -326,15 +341,16 @@ def process(self, user_event): self.logger.error('Provided event is in an invalid format.') return - self.logger.debug('Received user_event: ' + str(user_event)) + self.logger.debug('Received event of type {} for user {}.'.format( + type(user_event).__name__, user_event.user_id) + ) log_event = EventFactory.create_log_event(user_event, self.logger) - if self.notification_center is not None: - self.notification_center.send_notifications( - enums.NotificationTypes.LOG_EVENT, - log_event - ) + self.notification_center.send_notifications( + enums.NotificationTypes.LOG_EVENT, + log_event + ) try: self.event_dispatcher.dispatch_event(log_event) diff --git a/optimizely/optimizely.py b/optimizely/optimizely.py index fba5c5a6..a7a860ab 100644 --- a/optimizely/optimizely.py +++ b/optimizely/optimizely.py @@ -58,7 +58,10 @@ def __init__(self, notification_center: Optional instance of notification_center.NotificationCenter. Useful when providing own config_manager.BaseConfigManager implementation which can be using the same NotificationCenter instance. - event_processor: Processes the given event(s) by creating LogEvent(s) and then dispatching it. + event_processor: Optional component which processes the given event(s). + By default optimizely.event.event_processor.ForwardingEventProcessor is used + which simply forwards events to the event dispatcher. + To enable event batching configure and use optimizely.event.event_processor.BatchEventProcessor. """ self.logger_name = '.'.join([__name__, self.__class__.__name__]) self.is_valid = True @@ -68,8 +71,8 @@ def __init__(self, self.config_manager = config_manager self.notification_center = notification_center or NotificationCenter(self.logger) self.event_processor = event_processor or ForwardingEventProcessor(self.event_dispatcher, - self.logger, - self.notification_center) + logger=self.logger, + notification_center=self.notification_center) try: self._validate_instantiation_options() diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index 65ca1080..e061d5f7 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -11,12 +11,11 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import mock import time -from datetime import timedelta from six.moves import queue -from . import base from optimizely.event.payload import Decision, Visitor from optimizely.event.event_processor import BatchEventProcessor, ForwardingEventProcessor from optimizely.event.event_factory import EventFactory @@ -24,6 +23,7 @@ from optimizely.event.user_event_factory import UserEventFactory from optimizely.helpers import enums from optimizely.logger import SimpleLogger +from . import base class CanonicalEvent(object): @@ -116,14 +116,14 @@ def setUp(self, *args, **kwargs): self.notification_center = self.optimizely.notification_center def tearDown(self): - self._event_processor.stop() + self.event_processor.stop() def _build_conversion_event(self, event_name, project_config=None): config = project_config or self.project_config return UserEventFactory.create_conversion_event(config, event_name, self.test_user_id, {}, {}) def _set_event_processor(self, event_dispatcher, logger): - self._event_processor = BatchEventProcessor(event_dispatcher, + self.event_processor = BatchEventProcessor(event_dispatcher, logger, True, self.event_queue, @@ -140,13 +140,13 @@ def test_drain_on_stop(self): self._set_event_processor(event_dispatcher, mock_config_logging) user_event = self._build_conversion_event(self.event_name) - self._event_processor.process(user_event) + self.event_processor.process(user_event) event_dispatcher.expect_conversion(self.event_name, self.test_user_id) time.sleep(5) self.assertStrictTrue(event_dispatcher.compare_events()) - self.assertEqual(0, self._event_processor.event_queue.qsize()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) def test_flush_on_max_timeout(self): event_dispatcher = TestEventDispatcher() @@ -155,13 +155,13 @@ def test_flush_on_max_timeout(self): self._set_event_processor(event_dispatcher, mock_config_logging) user_event = self._build_conversion_event(self.event_name) - self._event_processor.process(user_event) + self.event_processor.process(user_event) event_dispatcher.expect_conversion(self.event_name, self.test_user_id) time.sleep(3) self.assertStrictTrue(event_dispatcher.compare_events()) - self.assertEqual(0, self._event_processor.event_queue.qsize()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) def test_flush_max_batch_size(self): event_dispatcher = TestEventDispatcher() @@ -171,13 +171,13 @@ def test_flush_max_batch_size(self): for i in range(0, self.MAX_BATCH_SIZE): user_event = self._build_conversion_event(self.event_name) - self._event_processor.process(user_event) + self.event_processor.process(user_event) event_dispatcher.expect_conversion(self.event_name, self.test_user_id) time.sleep(1) self.assertStrictTrue(event_dispatcher.compare_events()) - self.assertEqual(0, self._event_processor.event_queue.qsize()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) def test_flush(self): event_dispatcher = TestEventDispatcher() @@ -186,18 +186,18 @@ def test_flush(self): self._set_event_processor(event_dispatcher, mock_config_logging) user_event = self._build_conversion_event(self.event_name) - self._event_processor.process(user_event) - self._event_processor.flush() + self.event_processor.process(user_event) + self.event_processor.flush() event_dispatcher.expect_conversion(self.event_name, self.test_user_id) - self._event_processor.process(user_event) - self._event_processor.flush() + self.event_processor.process(user_event) + self.event_processor.flush() event_dispatcher.expect_conversion(self.event_name, self.test_user_id) time.sleep(3) self.assertStrictTrue(event_dispatcher.compare_events()) - self.assertEqual(0, self._event_processor.event_queue.qsize()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) def test_flush_on_mismatch_revision(self): event_dispatcher = TestEventDispatcher() @@ -209,20 +209,20 @@ def test_flush_on_mismatch_revision(self): self.project_config.project_id = 'X' user_event_1 = self._build_conversion_event(self.event_name, self.project_config) - self._event_processor.process(user_event_1) + self.event_processor.process(user_event_1) event_dispatcher.expect_conversion(self.event_name, self.test_user_id) self.project_config.revision = 2 self.project_config.project_id = 'X' user_event_2 = self._build_conversion_event(self.event_name, self.project_config) - self._event_processor.process(user_event_2) + self.event_processor.process(user_event_2) event_dispatcher.expect_conversion(self.event_name, self.test_user_id) time.sleep(3) self.assertStrictTrue(event_dispatcher.compare_events()) - self.assertEqual(0, self._event_processor.event_queue.qsize()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) def test_flush_on_mismatch_project_id(self): event_dispatcher = TestEventDispatcher() @@ -234,20 +234,20 @@ def test_flush_on_mismatch_project_id(self): self.project_config.project_id = 'X' user_event_1 = self._build_conversion_event(self.event_name, self.project_config) - self._event_processor.process(user_event_1) + self.event_processor.process(user_event_1) event_dispatcher.expect_conversion(self.event_name, self.test_user_id) self.project_config.revision = 1 self.project_config.project_id = 'Y' user_event_2 = self._build_conversion_event(self.event_name, self.project_config) - self._event_processor.process(user_event_2) + self.event_processor.process(user_event_2) event_dispatcher.expect_conversion(self.event_name, self.test_user_id) time.sleep(3) self.assertStrictTrue(event_dispatcher.compare_events()) - self.assertEqual(0, self._event_processor.event_queue.qsize()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) def test_stop_and_start(self): event_dispatcher = TestEventDispatcher() @@ -256,143 +256,150 @@ def test_stop_and_start(self): self._set_event_processor(event_dispatcher, mock_config_logging) user_event = self._build_conversion_event(self.event_name, self.project_config) - self._event_processor.process(user_event) + self.event_processor.process(user_event) event_dispatcher.expect_conversion(self.event_name, self.test_user_id) time.sleep(3) self.assertStrictTrue(event_dispatcher.compare_events()) - self._event_processor.stop() + self.event_processor.stop() - self._event_processor.process(user_event) + self.event_processor.process(user_event) event_dispatcher.expect_conversion(self.event_name, self.test_user_id) - self._event_processor.start() - self.assertStrictTrue(self._event_processor.is_running) + self.event_processor.start() + self.assertStrictTrue(self.event_processor.is_running) - self._event_processor.stop() - self.assertStrictFalse(self._event_processor.is_running) + self.event_processor.stop() + self.assertStrictFalse(self.event_processor.is_running) - self.assertEqual(0, self._event_processor.event_queue.qsize()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) def test_init__invalid_batch_size(self): event_dispatcher = TestEventDispatcher() with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._event_processor = BatchEventProcessor(event_dispatcher, - self.optimizely.logger, - True, - self.event_queue, - 5.5, - self.MAX_DURATION_SEC, - self.MAX_TIMEOUT_INTERVAL_SEC - ) + self.event_processor = BatchEventProcessor( + event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + 5.5, + self.MAX_DURATION_SEC, + self.MAX_TIMEOUT_INTERVAL_SEC + ) # default batch size is 10. - self.assertEqual(self._event_processor.batch_size, 10) - mock_config_logging.info.assert_called_with('Using default value for batch_size.') + self.assertEqual(10, self.event_processor.batch_size) + mock_config_logging.info.assert_called_with('Using default value 10 for batch_size.') def test_init__NaN_batch_size(self): event_dispatcher = TestEventDispatcher() with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._event_processor = BatchEventProcessor(event_dispatcher, - self.optimizely.logger, - True, - self.event_queue, - 'batch_size', - self.MAX_DURATION_SEC, - self.MAX_TIMEOUT_INTERVAL_SEC - ) + self.event_processor = BatchEventProcessor( + event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + 'batch_size', + self.MAX_DURATION_SEC, + self.MAX_TIMEOUT_INTERVAL_SEC + ) # default batch size is 10. - self.assertEqual(self._event_processor.batch_size, 10) - mock_config_logging.info.assert_called_with('Using default value for batch_size.') + self.assertEqual(10, self.event_processor.batch_size) + mock_config_logging.info.assert_called_with('Using default value 10 for batch_size.') def test_init__invalid_flush_interval(self): event_dispatcher = TestEventDispatcher() with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._event_processor = BatchEventProcessor(event_dispatcher, - mock_config_logging, - True, - self.event_queue, - self.MAX_BATCH_SIZE, - 0, - self.MAX_TIMEOUT_INTERVAL_SEC - ) + self.event_processor = BatchEventProcessor( + event_dispatcher, + mock_config_logging, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 0, + self.MAX_TIMEOUT_INTERVAL_SEC + ) # default flush interval is 30s. - self.assertEqual(self._event_processor.flush_interval, timedelta(seconds=30)) - mock_config_logging.info.assert_called_with('Using default value for flush_interval.') + self.assertEqual(datetime.timedelta(30), self.event_processor.flush_interval) + mock_config_logging.info.assert_called_with('Using default value 30 for flush_interval.') def test_init__bool_flush_interval(self): event_dispatcher = TestEventDispatcher() with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._event_processor = BatchEventProcessor(event_dispatcher, - self.optimizely.logger, - True, - self.event_queue, - self.MAX_BATCH_SIZE, - True, - self.MAX_TIMEOUT_INTERVAL_SEC - ) + self.event_processor = BatchEventProcessor( + event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + True, + self.MAX_TIMEOUT_INTERVAL_SEC + ) # default flush interval is 30s. - self.assertEqual(self._event_processor.flush_interval, timedelta(seconds=30)) - mock_config_logging.info.assert_called_with('Using default value for flush_interval.') + self.assertEqual(datetime.timedelta(30), self.event_processor.flush_interval) + mock_config_logging.info.assert_called_with('Using default value 30 for flush_interval.') def test_init__string_flush_interval(self): event_dispatcher = TestEventDispatcher() with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._event_processor = BatchEventProcessor(event_dispatcher, - self.optimizely.logger, - True, - self.event_queue, - self.MAX_BATCH_SIZE, - "True", - self.MAX_TIMEOUT_INTERVAL_SEC - ) + self.event_processor = BatchEventProcessor( + event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 'True', + self.MAX_TIMEOUT_INTERVAL_SEC + ) # default flush interval is 30s. - self.assertEqual(self._event_processor.flush_interval, timedelta(seconds=30)) - mock_config_logging.info.assert_called_with('Using default value for flush_interval.') + self.assertEqual(datetime.timedelta(30), self.event_processor.flush_interval) + mock_config_logging.info.assert_called_with('Using default value 30 for flush_interval.') def test_init__invalid_timeout_interval(self): event_dispatcher = TestEventDispatcher() with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._event_processor = BatchEventProcessor(event_dispatcher, - self.optimizely.logger, - True, - self.event_queue, - self.MAX_BATCH_SIZE, - self.MAX_DURATION_SEC, - -100 - ) + self.event_processor = BatchEventProcessor( + event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + self.MAX_DURATION_SEC, + -100 + ) # default timeout interval is 5s. - self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) - mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') + self.assertEqual(datetime.timedelta(5), self.event_processor.timeout_interval) + mock_config_logging.info.assert_called_with('Using default value 5 for timeout_interval.') def test_init__NaN_timeout_interval(self): event_dispatcher = TestEventDispatcher() with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._event_processor = BatchEventProcessor(event_dispatcher, - self.optimizely.logger, - True, - self.event_queue, - self.MAX_BATCH_SIZE, - self.MAX_DURATION_SEC, - False - ) + self.event_processor = BatchEventProcessor( + event_dispatcher, + self.optimizely.logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + self.MAX_DURATION_SEC, + False + ) # default timeout interval is 5s. - self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5)) - mock_config_logging.info.assert_called_with('Using default value for timeout_interval.') + self.assertEqual(datetime.timedelta(5), self.event_processor.timeout_interval) + mock_config_logging.info.assert_called_with('Using default value 5 for timeout_interval.') def test_notification_center__on_log_event(self): @@ -411,9 +418,9 @@ def on_log_event(log_event): self._set_event_processor(mock_event_dispatcher, mock_config_logging) user_event = self._build_conversion_event(self.event_name, self.project_config) - self._event_processor.process(user_event) + self.event_processor.process(user_event) - self._event_processor.stop() + self.event_processor.stop() self.assertEqual(True, callback_hit[0]) self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[ @@ -443,7 +450,7 @@ def setUp(self, *args, **kwargs): self.event_dispatcher = TestForwardingEventDispatcher(is_updated=False) with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self._event_processor = ForwardingEventProcessor(self.event_dispatcher, + self.event_processor = ForwardingEventProcessor(self.event_dispatcher, mock_config_logging, self.notification_center ) @@ -475,7 +482,7 @@ def test_event_processor__dispatch_raises_exception(self): def test_event_processor__with_test_event_dispatcher(self): user_event = self._build_conversion_event(self.event_name) - self._event_processor.process(user_event) + self.event_processor.process(user_event) self.assertStrictTrue(self.event_dispatcher.is_updated) def test_notification_center(self): @@ -491,7 +498,7 @@ def on_log_event(log_event): ) user_event = self._build_conversion_event(self.event_name) - self._event_processor.process(user_event) + self.event_processor.process(user_event) self.assertEqual(True, callback_hit[0]) self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[ From d9050bb2d652f3f3d6acbc1b7bda3a23e266f088 Mon Sep 17 00:00:00 2001 From: aliabbasrizvi Date: Wed, 23 Oct 2019 22:41:34 -0700 Subject: [PATCH 2/6] Fixing setting of config --- optimizely/config_manager.py | 13 +++++++++++-- optimizely/event/event_processor.py | 1 + tests/test_config_manager.py | 1 + 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/optimizely/config_manager.py b/optimizely/config_manager.py index 11eb1959..9724fa2f 100644 --- a/optimizely/config_manager.py +++ b/optimizely/config_manager.py @@ -96,7 +96,6 @@ def __init__(self, notification_center=notification_center) self._config = None self.validate_schema = not skip_json_validation - self._config_ready_event = threading.Event() self._set_config(datafile) def _set_config(self, datafile): @@ -135,7 +134,6 @@ def _set_config(self, datafile): return self._config = config - self._config_ready_event.set() self.notification_center.send_notifications(enums.NotificationTypes.OPTIMIZELY_CONFIG_UPDATE) self.logger.debug( 'Received new datafile and updated config. ' @@ -186,6 +184,7 @@ def __init__(self, JSON schema validation will be performed. """ + self._config_ready_event = threading.Event() super(PollingConfigManager, self).__init__(datafile=datafile, logger=logger, error_handler=error_handler, @@ -232,6 +231,16 @@ def get_datafile_url(sdk_key, url, url_template): return url + def _set_config(self, datafile): + """ Looks up and sets datafile and config based on response body. + + Args: + datafile: JSON string representing the Optimizely project. + """ + if datafile or self._config_ready_event.is_set(): + super(PollingConfigManager, self)._set_config(datafile=datafile) + self._config_ready_event.set() + def get_config(self): """ Returns instance of ProjectConfig. Returns immediately if project config is ready otherwise blocks maximum for value of blocking_timeout in seconds. diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 000d5923..293ae36f 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -45,6 +45,7 @@ def process(self, user_event): class BatchEventProcessor(BaseEventProcessor): """ BatchEventProcessor is an implementation of the BaseEventProcessor that batches events. + The BatchEventProcessor maintains a single consumer thread that pulls events off of the blocking queue and buffers them for either a configured batch size or for a maximum duration before the resulting LogEvent is sent to the EventDispatcher. diff --git a/tests/test_config_manager.py b/tests/test_config_manager.py index 905b7a65..832ff88f 100644 --- a/tests/test_config_manager.py +++ b/tests/test_config_manager.py @@ -262,6 +262,7 @@ def test_set_blocking_timeout(self, _): self.assertEqual(5, project_config_manager.blocking_timeout) # Assert get_config should block until blocking timeout. + project_config_manager._config_ready_event.clear() start_time = time.time() project_config_manager.get_config() end_time = time.time() From 2393d6a8b06475654118f4bae78fc30012265789 Mon Sep 17 00:00:00 2001 From: aliabbasrizvi Date: Thu, 24 Oct 2019 11:41:47 -0700 Subject: [PATCH 3/6] Fixing typo --- optimizely/event/event_processor.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 293ae36f..416396fa 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -87,17 +87,17 @@ def __init__(self, self.event_dispatcher = event_dispatcher or default_event_dispatcher self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) - self.batch_size = batch_size if self._validate_intantiation_props(batch_size, + self.batch_size = batch_size if self._validate_instantiation_props(batch_size, 'batch_size', self._DEFAULT_BATCH_SIZE) \ else self._DEFAULT_BATCH_SIZE self.flush_interval = timedelta(seconds=flush_interval) \ - if self._validate_intantiation_props(flush_interval, + if self._validate_instantiation_props(flush_interval, 'flush_interval', self._DEFAULT_FLUSH_INTERVAL) \ else timedelta(self._DEFAULT_FLUSH_INTERVAL) self.timeout_interval = timedelta(seconds=timeout_interval) \ - if self._validate_intantiation_props(timeout_interval, + if self._validate_instantiation_props(timeout_interval, 'timeout_interval', self._DEFAULT_TIMEOUT_INTERVAL) \ else timedelta(self._DEFAULT_TIMEOUT_INTERVAL) @@ -119,7 +119,7 @@ def is_running(self): """ Property to check if consumer thread is alive or not. """ return self.executor.isAlive() if self.executor else False - def _validate_intantiation_props(self, prop, prop_name, default_value): + def _validate_instantiation_props(self, prop, prop_name, default_value): """ Method to determine if instantiation properties like batch_size, flush_interval and timeout_interval are valid. From 521ebf1afc6eea100fe9cdcffb503af6871ebd5f Mon Sep 17 00:00:00 2001 From: aliabbasrizvi Date: Thu, 24 Oct 2019 12:00:15 -0700 Subject: [PATCH 4/6] Lint fixes --- optimizely/event/event_processor.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index 416396fa..4b7bd5f1 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -88,18 +88,18 @@ def __init__(self, self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger()) self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY) self.batch_size = batch_size if self._validate_instantiation_props(batch_size, - 'batch_size', - self._DEFAULT_BATCH_SIZE) \ + 'batch_size', + self._DEFAULT_BATCH_SIZE) \ else self._DEFAULT_BATCH_SIZE self.flush_interval = timedelta(seconds=flush_interval) \ if self._validate_instantiation_props(flush_interval, - 'flush_interval', - self._DEFAULT_FLUSH_INTERVAL) \ + 'flush_interval', + self._DEFAULT_FLUSH_INTERVAL) \ else timedelta(self._DEFAULT_FLUSH_INTERVAL) self.timeout_interval = timedelta(seconds=timeout_interval) \ if self._validate_instantiation_props(timeout_interval, - 'timeout_interval', - self._DEFAULT_TIMEOUT_INTERVAL) \ + 'timeout_interval', + self._DEFAULT_TIMEOUT_INTERVAL) \ else timedelta(self._DEFAULT_TIMEOUT_INTERVAL) self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger) From 153e8f93d014c4d0aef4d742386d00c49b512db3 Mon Sep 17 00:00:00 2001 From: aliabbasrizvi Date: Thu, 24 Oct 2019 12:38:04 -0700 Subject: [PATCH 5/6] More lint fixes --- tests/test_event_processor.py | 41 +++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index e061d5f7..82f0c9ca 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -123,15 +123,16 @@ def _build_conversion_event(self, event_name, project_config=None): return UserEventFactory.create_conversion_event(config, event_name, self.test_user_id, {}, {}) def _set_event_processor(self, event_dispatcher, logger): - self.event_processor = BatchEventProcessor(event_dispatcher, - logger, - True, - self.event_queue, - self.MAX_BATCH_SIZE, - self.MAX_DURATION_SEC, - self.MAX_TIMEOUT_INTERVAL_SEC, - self.optimizely.notification_center - ) + self.event_processor = BatchEventProcessor( + event_dispatcher, + logger, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + self.MAX_DURATION_SEC, + self.MAX_TIMEOUT_INTERVAL_SEC, + self.optimizely.notification_center + ) def test_drain_on_stop(self): event_dispatcher = TestEventDispatcher() @@ -450,18 +451,20 @@ def setUp(self, *args, **kwargs): self.event_dispatcher = TestForwardingEventDispatcher(is_updated=False) with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: - self.event_processor = ForwardingEventProcessor(self.event_dispatcher, - mock_config_logging, - self.notification_center - ) + self.event_processor = ForwardingEventProcessor( + self.event_dispatcher, + mock_config_logging, + self.notification_center + ) def _build_conversion_event(self, event_name): - return UserEventFactory.create_conversion_event(self.project_config, - event_name, - self.test_user_id, - {}, - {} - ) + return UserEventFactory.create_conversion_event( + self.project_config, + event_name, + self.test_user_id, + {}, + {} + ) def test_event_processor__dispatch_raises_exception(self): """ Test that process logs dispatch failure gracefully. """ From b2eb96c99cd7e7fef0652f7ca7a7d54e92476899 Mon Sep 17 00:00:00 2001 From: aliabbasrizvi Date: Thu, 24 Oct 2019 13:41:30 -0700 Subject: [PATCH 6/6] Trying to fix test --- tests/test_config_manager.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/tests/test_config_manager.py b/tests/test_config_manager.py index 832ff88f..38be849d 100644 --- a/tests/test_config_manager.py +++ b/tests/test_config_manager.py @@ -159,6 +159,16 @@ def test_get_config(self): # Assert that config is set. self.assertIsInstance(project_config_manager.get_config(), project_config.ProjectConfig) + def test_get_config_blocks(self): + """ Test that get_config blocks until blocking timeout is hit. """ + start_time = time.time() + project_config_manager = config_manager.PollingConfigManager(sdk_key='sdk_key', + blocking_timeout=5) + # Assert get_config should block until blocking timeout. + project_config_manager.get_config() + end_time = time.time() + self.assertEqual(5, round(end_time - start_time)) + @mock.patch('requests.get') class PollingConfigManagerTest(base.BaseTest): @@ -217,7 +227,8 @@ def test_get_datafile_url__sdk_key_and_url_and_template_provided(self, _): def test_set_update_interval(self, _): """ Test set_update_interval with different inputs. """ - project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key') + with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'): + project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key') # Assert that if invalid update_interval is set, then exception is raised. with self.assertRaisesRegexp(optimizely_exceptions.InvalidInputException, @@ -238,7 +249,8 @@ def test_set_update_interval(self, _): def test_set_blocking_timeout(self, _): """ Test set_blocking_timeout with different inputs. """ - project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key') + with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'): + project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key') # Assert that if invalid blocking_timeout is set, then exception is raised. with self.assertRaisesRegexp(optimizely_exceptions.InvalidInputException, @@ -261,16 +273,10 @@ def test_set_blocking_timeout(self, _): project_config_manager.set_blocking_timeout(5) self.assertEqual(5, project_config_manager.blocking_timeout) - # Assert get_config should block until blocking timeout. - project_config_manager._config_ready_event.clear() - start_time = time.time() - project_config_manager.get_config() - end_time = time.time() - self.assertEqual(5, round(end_time - start_time)) - def test_set_last_modified(self, _): """ Test that set_last_modified sets last_modified field based on header. """ - project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key') + with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'): + project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key') last_modified_time = 'Test Last Modified Time' test_response_headers = {