diff --git a/CHANGELOG.md b/CHANGELOG.md index 716b8499..a11b8aa7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -84,12 +84,12 @@ targeting functionality. * Note that for results segmentation in Optimizely results, the user attribute values from one event are automatically applied to all other events in the same session, as long as the events in question were actually received by our backend. This behavior was already in place and is not affected by the 3.0 release. * Support for all types of attribute values, not just strings. * All values are passed through to notification listeners. - * Strings, booleans, and valid numbers are passed to the event dispatcher and can be used for Optimizely results segmentation. A valid number is a finite float or numbers.Integral in the inclusive range \[-2⁵³, 2⁵³\]. + * Strings, booleans, and valid numbers are passed to the event dispatcher and can be used for Optimizely results segmentation. A valid number is a finite float or numbers.Integral in the inclusive range \[-2 ^ 53, 2 ^ 53\]. * Strings, booleans, and valid numbers are relevant for audience conditions. * Support for additional matchers in audience conditions: * An `exists` matcher that passes if the user has a non-null value for the targeted user attribute and fails otherwise. * A `substring` matcher that resolves if the user has a string value for the targeted attribute. - * `gt` (greater than) and `lt` (less than) matchers that resolve if the user has a valid number value for the targeted attribute. A valid number is a finite float or numbers.Integral in the inclusive range \[-2⁵³, 2⁵³\]. + * `gt` (greater than) and `lt` (less than) matchers that resolve if the user has a valid number value for the targeted attribute. A valid number is a finite float or numbers.Integral in the inclusive range \[-2 ^ 53, 2 ^ 53\]. * The original (`exact`) matcher can now be used to target booleans and valid numbers, not just strings. * Support for A/B tests, feature tests, and feature rollouts whose audiences are combined using `"and"` and `"not"` operators, not just the `"or"` operator. * Datafile-version compatibility check: The SDK will remain uninitialized (i.e., will gracefully fail to activate experiments and features) if given a datafile version greater than 4. diff --git a/optimizely/event/event_processor.py b/optimizely/event/event_processor.py index db44c041..3f82a7fe 100644 --- a/optimizely/event/event_processor.py +++ b/optimizely/event/event_processor.py @@ -150,18 +150,18 @@ def _validate_instantiation_props(self, prop, prop_name, default_value): return is_valid def _get_time(self, _time=None): - """ Method to return rounded off time as integer in seconds. If _time is None, uses current time. + """ Method to return time as float in seconds. If _time is None, uses current time. Args: - _time: time in seconds that needs to be rounded off. + _time: time in seconds. Returns: - Integer time in seconds. + Float time in seconds. """ if _time is None: - return int(round(time.time())) + return time.time() - return int(round(_time)) + return _time def start(self): """ Starts the batch processing thread to batch events. """ @@ -182,12 +182,18 @@ def _run(self): while True: if self._get_time() >= self.flushing_interval_deadline: self._flush_queue() + self.flushing_interval_deadline = self._get_time() + \ + self._get_time(self.flush_interval.total_seconds()) + self.logger.debug('Flush interval deadline. Flushed queue.') try: - item = self.event_queue.get(False) + interval = self.flushing_interval_deadline - self._get_time() + item = self.event_queue.get(True, interval) + + if item is None: + continue except queue.Empty: - time.sleep(0.05) continue if item == self._SHUTDOWN_SIGNAL: diff --git a/tests/test_event_processor.py b/tests/test_event_processor.py index e16032fe..a8a954f4 100644 --- a/tests/test_event_processor.py +++ b/tests/test_event_processor.py @@ -173,6 +173,28 @@ def test_flush_on_max_timeout(self): self.assertStrictTrue(event_dispatcher.compare_events()) self.assertEqual(0, self.event_processor.event_queue.qsize()) + def test_flush_once_max_timeout(self): + event_dispatcher = TestEventDispatcher() + + self.optimizely.logger = SimpleLogger(enums.LogLevels.DEBUG) + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self._set_event_processor(event_dispatcher, mock_config_logging) + + user_event = self._build_conversion_event(self.event_name) + self.event_processor.process(user_event) + event_dispatcher.expect_conversion(self.event_name, self.test_user_id) + + time.sleep(1.75) + + self.assertStrictTrue(event_dispatcher.compare_events()) + self.assertEqual(0, self.event_processor.event_queue.qsize()) + self.assertTrue(mock_config_logging.debug.called) + mock_config_logging.debug.assert_any_call('Received event of type ConversionEvent for user test_user.') + mock_config_logging.debug.assert_any_call('Flush interval deadline. Flushed queue.') + self.assertTrue(mock_config_logging.debug.call_count == 2) + self.optimizely.logger = SimpleLogger() + def test_flush_max_batch_size(self): event_dispatcher = TestEventDispatcher() @@ -339,6 +361,40 @@ def test_init__invalid_flush_interval(self): self.assertEqual(datetime.timedelta(seconds=30), self.event_processor.flush_interval) mock_config_logging.info.assert_called_with('Using default value 30 for flush_interval.') + def test_init__float_flush_interval(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self.event_processor = BatchEventProcessor( + event_dispatcher, + mock_config_logging, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 0.5, + self.MAX_TIMEOUT_INTERVAL_SEC, + ) + + # default flush interval is 30s. + self.assertEqual(datetime.timedelta(seconds=0.5), self.event_processor.flush_interval) + + def test_init__float_flush_deadline(self): + event_dispatcher = TestEventDispatcher() + + with mock.patch.object(self.optimizely, 'logger') as mock_config_logging: + self.event_processor = BatchEventProcessor( + event_dispatcher, + mock_config_logging, + True, + self.event_queue, + self.MAX_BATCH_SIZE, + 0.5, + self.MAX_TIMEOUT_INTERVAL_SEC, + ) + + # default flush interval is 30s. + self.assertTrue(isinstance(self.event_processor.flushing_interval_deadline, float)) + def test_init__bool_flush_interval(self): event_dispatcher = TestEventDispatcher()