Skip to content

Commit

Permalink
Merge b2eb96c into 77ec185
Browse files Browse the repository at this point in the history
  • Loading branch information
aliabbasrizvi committed Oct 24, 2019
2 parents 77ec185 + b2eb96c commit 73d510a
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 164 deletions.
13 changes: 11 additions & 2 deletions optimizely/config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ def __init__(self,
notification_center=notification_center)
self._config = None
self.validate_schema = not skip_json_validation
self._config_ready_event = threading.Event()
self._set_config(datafile)

def _set_config(self, datafile):
Expand Down Expand Up @@ -135,7 +134,6 @@ def _set_config(self, datafile):
return

self._config = config
self._config_ready_event.set()
self.notification_center.send_notifications(enums.NotificationTypes.OPTIMIZELY_CONFIG_UPDATE)
self.logger.debug(
'Received new datafile and updated config. '
Expand Down Expand Up @@ -186,6 +184,7 @@ def __init__(self,
JSON schema validation will be performed.
"""
self._config_ready_event = threading.Event()
super(PollingConfigManager, self).__init__(datafile=datafile,
logger=logger,
error_handler=error_handler,
Expand Down Expand Up @@ -232,6 +231,16 @@ def get_datafile_url(sdk_key, url, url_template):

return url

def _set_config(self, datafile):
""" Looks up and sets datafile and config based on response body.
Args:
datafile: JSON string representing the Optimizely project.
"""
if datafile or self._config_ready_event.is_set():
super(PollingConfigManager, self)._set_config(datafile=datafile)
self._config_ready_event.set()

def get_config(self):
""" Returns instance of ProjectConfig. Returns immediately if project config is ready otherwise
blocks maximum for value of blocking_timeout in seconds.
Expand Down
75 changes: 46 additions & 29 deletions optimizely/event/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class BaseEventProcessor(ABC):
""" Class encapsulating event processing. Override with your own implementation. """

@abc.abstractmethod
def process(user_event):
def process(self, user_event):
""" Method to provide intermediary processing stage within event production.
Args:
user_event: UserEvent instance that needs to be processed and dispatched.
Expand All @@ -45,33 +45,34 @@ def process(user_event):
class BatchEventProcessor(BaseEventProcessor):
"""
BatchEventProcessor is an implementation of the BaseEventProcessor that batches events.
The BatchEventProcessor maintains a single consumer thread that pulls events off of
the blocking queue and buffers them for either a configured batch size or for a
maximum duration before the resulting LogEvent is sent to the EventDispatcher.
"""

_DEFAULT_QUEUE_CAPACITY = 1000
_DEFAULT_BATCH_SIZE = 10
_DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30)
_DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5)
_DEFAULT_FLUSH_INTERVAL = 30
_DEFAULT_TIMEOUT_INTERVAL = 5
_SHUTDOWN_SIGNAL = object()
_FLUSH_SIGNAL = object()
LOCK = threading.Lock()

def __init__(self,
event_dispatcher,
logger,
logger=None,
start_on_init=False,
event_queue=None,
batch_size=None,
flush_interval=None,
timeout_interval=None,
notification_center=None):
""" EventProcessor init method to configure event batching.
""" BatchEventProcessor init method to configure event batching.
Args:
event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
logger: Provides a log method to log messages. By default nothing would be logged.
logger: Optional component which provides a log method to log messages. By default nothing would be logged.
start_on_init: Optional boolean param which starts the consumer thread if set to True.
Default value is False.
event_queue: Optional component which accumulates the events until dispacthed.
Expand All @@ -86,20 +87,28 @@ def __init__(self,
self.event_dispatcher = event_dispatcher or default_event_dispatcher
self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY)
self.batch_size = batch_size if self._validate_intantiation_props(batch_size, 'batch_size') \
self.batch_size = batch_size if self._validate_instantiation_props(batch_size,
'batch_size',
self._DEFAULT_BATCH_SIZE) \
else self._DEFAULT_BATCH_SIZE
self.flush_interval = timedelta(seconds=flush_interval) \
if self._validate_intantiation_props(flush_interval, 'flush_interval') \
else self._DEFAULT_FLUSH_INTERVAL
if self._validate_instantiation_props(flush_interval,
'flush_interval',
self._DEFAULT_FLUSH_INTERVAL) \
else timedelta(self._DEFAULT_FLUSH_INTERVAL)
self.timeout_interval = timedelta(seconds=timeout_interval) \
if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \
else self._DEFAULT_TIMEOUT_INTERVAL
self.notification_center = notification_center
if self._validate_instantiation_props(timeout_interval,
'timeout_interval',
self._DEFAULT_TIMEOUT_INTERVAL) \
else timedelta(self._DEFAULT_TIMEOUT_INTERVAL)

self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger)
self._current_batch = list()

if not validator.is_notification_center_valid(self.notification_center):
self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center'))
self.notification_center = _notification_center.NotificationCenter()
self.logger.debug('Creating notification center for use.')
self.notification_center = _notification_center.NotificationCenter(self.logger)

self.executor = None
if start_on_init is True:
Expand All @@ -110,13 +119,14 @@ def is_running(self):
""" Property to check if consumer thread is alive or not. """
return self.executor.isAlive() if self.executor else False

def _validate_intantiation_props(self, prop, prop_name):
def _validate_instantiation_props(self, prop, prop_name, default_value):
""" Method to determine if instantiation properties like batch_size, flush_interval
and timeout_interval are valid.
Args:
prop: Property value that needs to be validated.
prop_name: Property name.
default_value: Default value for property.
Returns:
False if property value is None or less than or equal to 0 or not a finite number.
Expand All @@ -132,7 +142,7 @@ def _validate_intantiation_props(self, prop, prop_name):
is_valid = False

if is_valid is False:
self.logger.info('Using default value for {}.'.format(prop_name))
self.logger.info('Using default value {} for {}.'.format(default_value, prop_name))

return is_valid

Expand Down Expand Up @@ -213,11 +223,10 @@ def _flush_queue(self):

log_event = EventFactory.create_log_event(to_process_batch, self.logger)

if self.notification_center is not None:
self.notification_center.send_notifications(
enums.NotificationTypes.LOG_EVENT,
log_event
)
self.notification_center.send_notifications(
enums.NotificationTypes.LOG_EVENT,
log_event
)

try:
self.event_dispatcher.dispatch_event(log_event)
Expand All @@ -226,14 +235,17 @@ def _flush_queue(self):

def process(self, user_event):
""" Method to process the user_event by putting it in event_queue.
Args:
user_event: UserEvent Instance.
"""
if not isinstance(user_event, UserEvent):
self.logger.error('Provided event is in an invalid format.')
return

self.logger.debug('Received user_event: ' + str(user_event))
self.logger.debug('Received event of type {} for user {}.'.format(
type(user_event).__name__, user_event.user_id)
)

try:
self.event_queue.put_nowait(user_event)
Expand All @@ -242,6 +254,7 @@ def process(self, user_event):

def _add_to_batch(self, user_event):
""" Method to append received user event to current batch.
Args:
user_event: UserEvent Instance.
"""
Expand All @@ -261,9 +274,11 @@ def _add_to_batch(self, user_event):

def _should_split(self, user_event):
""" Method to check if current event batch should split into two.
Args:
user_event: UserEvent Instance.
Return Value:
Returns:
- True, if revision number and project_id of last event in current batch do not match received event's
revision number and project id respectively.
- False, otherwise.
Expand Down Expand Up @@ -311,30 +326,32 @@ def __init__(self, event_dispatcher, logger=None, notification_center=None):
"""
self.event_dispatcher = event_dispatcher
self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
self.notification_center = notification_center
self.notification_center = notification_center or _notification_center.NotificationCenter(self.logger)

if not validator.is_notification_center_valid(self.notification_center):
self.logger.error(enums.Errors.INVALID_INPUT.format('notification_center'))
self.notification_center = _notification_center.NotificationCenter()

def process(self, user_event):
""" Method to process the user_event by dispatching it.
Args:
user_event: UserEvent Instance.
"""
if not isinstance(user_event, UserEvent):
self.logger.error('Provided event is in an invalid format.')
return

self.logger.debug('Received user_event: ' + str(user_event))
self.logger.debug('Received event of type {} for user {}.'.format(
type(user_event).__name__, user_event.user_id)
)

log_event = EventFactory.create_log_event(user_event, self.logger)

if self.notification_center is not None:
self.notification_center.send_notifications(
enums.NotificationTypes.LOG_EVENT,
log_event
)
self.notification_center.send_notifications(
enums.NotificationTypes.LOG_EVENT,
log_event
)

try:
self.event_dispatcher.dispatch_event(log_event)
Expand Down
9 changes: 6 additions & 3 deletions optimizely/optimizely.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ def __init__(self,
notification_center: Optional instance of notification_center.NotificationCenter. Useful when providing own
config_manager.BaseConfigManager implementation which can be using the
same NotificationCenter instance.
event_processor: Processes the given event(s) by creating LogEvent(s) and then dispatching it.
event_processor: Optional component which processes the given event(s).
By default optimizely.event.event_processor.ForwardingEventProcessor is used
which simply forwards events to the event dispatcher.
To enable event batching configure and use optimizely.event.event_processor.BatchEventProcessor.
"""
self.logger_name = '.'.join([__name__, self.__class__.__name__])
self.is_valid = True
Expand All @@ -68,8 +71,8 @@ def __init__(self,
self.config_manager = config_manager
self.notification_center = notification_center or NotificationCenter(self.logger)
self.event_processor = event_processor or ForwardingEventProcessor(self.event_dispatcher,
self.logger,
self.notification_center)
logger=self.logger,
notification_center=self.notification_center)

try:
self._validate_instantiation_options()
Expand Down
25 changes: 16 additions & 9 deletions tests/test_config_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,16 @@ def test_get_config(self):
# Assert that config is set.
self.assertIsInstance(project_config_manager.get_config(), project_config.ProjectConfig)

def test_get_config_blocks(self):
""" Test that get_config blocks until blocking timeout is hit. """
start_time = time.time()
project_config_manager = config_manager.PollingConfigManager(sdk_key='sdk_key',
blocking_timeout=5)
# Assert get_config should block until blocking timeout.
project_config_manager.get_config()
end_time = time.time()
self.assertEqual(5, round(end_time - start_time))


@mock.patch('requests.get')
class PollingConfigManagerTest(base.BaseTest):
Expand Down Expand Up @@ -217,7 +227,8 @@ def test_get_datafile_url__sdk_key_and_url_and_template_provided(self, _):

def test_set_update_interval(self, _):
""" Test set_update_interval with different inputs. """
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')
with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'):
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')

# Assert that if invalid update_interval is set, then exception is raised.
with self.assertRaisesRegexp(optimizely_exceptions.InvalidInputException,
Expand All @@ -238,7 +249,8 @@ def test_set_update_interval(self, _):

def test_set_blocking_timeout(self, _):
""" Test set_blocking_timeout with different inputs. """
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')
with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'):
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')

# Assert that if invalid blocking_timeout is set, then exception is raised.
with self.assertRaisesRegexp(optimizely_exceptions.InvalidInputException,
Expand All @@ -261,15 +273,10 @@ def test_set_blocking_timeout(self, _):
project_config_manager.set_blocking_timeout(5)
self.assertEqual(5, project_config_manager.blocking_timeout)

# Assert get_config should block until blocking timeout.
start_time = time.time()
project_config_manager.get_config()
end_time = time.time()
self.assertEqual(5, round(end_time - start_time))

def test_set_last_modified(self, _):
""" Test that set_last_modified sets last_modified field based on header. """
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')
with mock.patch('optimizely.config_manager.PollingConfigManager.fetch_datafile'):
project_config_manager = config_manager.PollingConfigManager(sdk_key='some_key')

last_modified_time = 'Test Last Modified Time'
test_response_headers = {
Expand Down

0 comments on commit 73d510a

Please sign in to comment.