Skip to content

Commit

Permalink
Merge 9ebf1a4 into 498db55
Browse files Browse the repository at this point in the history
  • Loading branch information
mnoman09 committed Oct 3, 2019
2 parents 498db55 + 9ebf1a4 commit 4557d0c
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 5 deletions.
20 changes: 18 additions & 2 deletions optimizely/event/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
from six.moves import queue

from optimizely import logger as _logging
from optimizely import notification_center as _notification_center
from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher
from optimizely.helpers import enums
from optimizely.helpers import validator
from .event_factory import EventFactory
from .user_event import UserEvent
Expand Down Expand Up @@ -62,8 +64,10 @@ def __init__(self,
event_queue=None,
batch_size=None,
flush_interval=None,
timeout_interval=None):
""" BatchEventProcessor init method to configure event batching.
timeout_interval=None,
notification_center=None):
""" EventProcessor init method to configure event batching.
Args:
event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
logger: Provides a log method to log messages. By default nothing would be logged.
Expand All @@ -76,6 +80,7 @@ def __init__(self,
be flushed.
timeout_interval: Optional floating point number representing time interval in seconds before joining the consumer
thread.
notification_center: Optional instance of notification_center.NotificationCenter.
"""
self.event_dispatcher = event_dispatcher or default_event_dispatcher
self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
Expand All @@ -88,8 +93,13 @@ def __init__(self,
self.timeout_interval = timedelta(seconds=timeout_interval) \
if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \
else self._DEFAULT_TIMEOUT_INTERVAL
self.notification_center = notification_center
self._current_batch = list()

if not validator.is_notification_center_valid(self.notification_center):
self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center'))
self.notification_center = _notification_center.NotificationCenter()

if start_on_init is True:
self.start()

Expand Down Expand Up @@ -195,6 +205,12 @@ def _flush_queue(self):

log_event = EventFactory.create_log_event(to_process_batch, self.logger)

if self.notification_center is not None:
self.notification_center.send_notifications(
enums.NotificationTypes.LOG_EVENT,
log_event
)

try:
self.event_dispatcher.dispatch_event(log_event)
except Exception as e:
Expand Down
4 changes: 4 additions & 0 deletions optimizely/helpers/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,12 @@ class NotificationTypes(object):
TRACK notification listener has the following parameters:
str event_key, str user_id, dict attributes (can be None), event_tags (can be None), Event event
LOG_EVENT notification listener has the following parameter(s):
LogEvent log_event
"""
ACTIVATE = 'ACTIVATE:experiment, user_id, attributes, variation, event'
DECISION = 'DECISION:type, user_id, attributes, decision_info'
OPTIMIZELY_CONFIG_UPDATE = 'OPTIMIZELY_CONFIG_UPDATE'
TRACK = 'TRACK:event_key, user_id, attributes, event_tags, event'
LOG_EVENT = 'LOG_EVENT:log_event'
36 changes: 33 additions & 3 deletions tests/test_event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
from six.moves import queue

from . import base
from optimizely.logger import SimpleLogger
from optimizely.event.payload import Decision, Visitor
from optimizely.event.user_event_factory import UserEventFactory
from optimizely.event.event_processor import BatchEventProcessor
from optimizely.event.log_event import LogEvent
from optimizely.event.user_event_factory import UserEventFactory
from optimizely.helpers import enums
from optimizely.logger import SimpleLogger


class CanonicalEvent(object):
Expand Down Expand Up @@ -110,6 +112,7 @@ def setUp(self, *args, **kwargs):
self.event_name = 'test_event'
self.event_queue = queue.Queue(maxsize=self.DEFAULT_QUEUE_CAPACITY)
self.optimizely.logger = SimpleLogger()
self.notification_center = self.optimizely.notification_center

def tearDown(self):
self._event_processor.stop()
Expand All @@ -125,7 +128,8 @@ def _set_event_processor(self, event_dispatcher, logger):
self.event_queue,
self.MAX_BATCH_SIZE,
self.MAX_DURATION_SEC,
self.MAX_TIMEOUT_INTERVAL_SEC
self.MAX_TIMEOUT_INTERVAL_SEC,
self.optimizely.notification_center
)

def test_drain_on_stop(self):
Expand Down Expand Up @@ -371,3 +375,29 @@ def test_init__NaN_timeout_interval(self):
# default timeout interval is 5s.
self.assertEqual(self._event_processor.timeout_interval, timedelta(seconds=5))
mock_config_logging.info.assert_called_with('Using default value for timeout_interval.')

def test_notification_center__on_log_event(self):

mock_event_dispatcher = mock.Mock()
callback_hit = [False]

def on_log_event(log_event):
self.assertStrictTrue(isinstance(log_event, LogEvent))
callback_hit[0] = True

self.optimizely.notification_center.add_notification_listener(
enums.NotificationTypes.LOG_EVENT, on_log_event
)

with mock.patch.object(self.optimizely, 'logger') as mock_config_logging:
self._set_event_processor(mock_event_dispatcher, mock_config_logging)

user_event = self._build_conversion_event(self.event_name, self.project_config)
self._event_processor.process(user_event)

self._event_processor.stop()

self.assertEqual(True, callback_hit[0])
self.assertEqual(1, len(self.optimizely.notification_center.notification_listeners[
enums.NotificationTypes.LOG_EVENT
]))
16 changes: 16 additions & 0 deletions tests/test_notification_center.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ def on_track_listener(*args):
pass


def on_log_event_listener(*args):
pass


class NotificationCenterTest(unittest.TestCase):

def test_add_notification_listener__valid_type(self):
Expand All @@ -59,6 +63,11 @@ def test_add_notification_listener__valid_type(self):
4, test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener)
)

self.assertEqual(
5, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT,
on_log_event_listener)
)

def test_add_notification_listener__multiple_listeners(self):
""" Test that multiple listeners of the same type can be successfully added. """

Expand Down Expand Up @@ -138,6 +147,7 @@ def another_on_activate_listener(*args):
self.assertEqual(2, len(test_notification_center.notification_listeners[enums.NotificationTypes.ACTIVATE]))
self.assertEqual(1, len(test_notification_center.notification_listeners[enums.NotificationTypes.DECISION]))
self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.TRACK]))
self.assertEqual(0, len(test_notification_center.notification_listeners[enums.NotificationTypes.LOG_EVENT]))

# Remove one of the activate listeners and assert.
self.assertTrue(test_notification_center.remove_notification_listener(3))
Expand All @@ -164,6 +174,10 @@ def another_on_activate_listener(*args):
3, test_notification_center.add_notification_listener(enums.NotificationTypes.ACTIVATE,
another_on_activate_listener)
)
self.assertEqual(
4, test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT,
on_log_event_listener)
)

# Try removing a listener which does not exist.
self.assertFalse(test_notification_center.remove_notification_listener(42))
Expand All @@ -180,6 +194,7 @@ def test_clear_notification_listeners(self):
on_config_update_listener)
test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener)
test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener)
test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener)

# Assert all listeners are there:
for notification_type in notification_center.NOTIFICATION_TYPES:
Expand Down Expand Up @@ -210,6 +225,7 @@ def test_clear_all_notification_listeners(self):
on_config_update_listener)
test_notification_center.add_notification_listener(enums.NotificationTypes.DECISION, on_decision_listener)
test_notification_center.add_notification_listener(enums.NotificationTypes.TRACK, on_track_listener)
test_notification_center.add_notification_listener(enums.NotificationTypes.LOG_EVENT, on_log_event_listener)

# Assert all listeners are there:
for notification_type in notification_center.NOTIFICATION_TYPES:
Expand Down

0 comments on commit 4557d0c

Please sign in to comment.