Commit

Merge branch 'mnoman/log_event_notification' into rashid/forwarding_event_processor
mariamjamal94 committed Sep 18, 2019
2 parents 2d37cbe + 4c8b7f3 commit 9d917a4
Showing 10 changed files with 118 additions and 129 deletions.
25 changes: 0 additions & 25 deletions optimizely/closeable.py

This file was deleted.

4 changes: 2 additions & 2 deletions optimizely/config_manager.py
@@ -239,8 +239,8 @@ def set_update_interval(self, update_interval):
         'Invalid update_interval "{}" provided.'.format(update_interval)
       )
 
-    # If polling interval is less than minimum allowed interval then set it to default update interval.
-    if update_interval < enums.ConfigManager.MIN_UPDATE_INTERVAL:
+    # If polling interval is less than or equal to 0 then set it to default update interval.
+    if update_interval <= 0:
       self.logger.debug('update_interval value {} too small. Defaulting to {}'.format(
         update_interval,
         enums.ConfigManager.DEFAULT_UPDATE_INTERVAL)
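In short: a non-positive update_interval now falls back to the five-minute default, and the old one-second floor is gone (see the enums.py change below), so sub-second polling intervals are accepted. A minimal sketch of the new rule; resolve_update_interval is an illustrative helper, not part of the SDK:

from optimizely.helpers import enums

def resolve_update_interval(update_interval):
  # Mirrors the new check: anything <= 0 falls back to DEFAULT_UPDATE_INTERVAL (5 * 60 seconds).
  if update_interval <= 0:
    return enums.ConfigManager.DEFAULT_UPDATE_INTERVAL
  return update_interval

assert resolve_update_interval(0) == 300    # non-positive values use the default
assert resolve_update_interval(0.5) == 0.5  # sub-second values are no longer bumped up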
90 changes: 46 additions & 44 deletions optimizely/event/event_factory.py
@@ -11,10 +11,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .user_event import ConversionEvent, ImpressionEvent
-from .payload import Decision, EventBatch, Snapshot, SnapshotEvent, Visitor, VisitorAttribute
-from .log_event import LogEvent
-from optimizely.helpers import enums, event_tag_utils, validator
+from optimizely.helpers import enums
+from optimizely.helpers import event_tag_utils
+from optimizely.helpers import validator
+from . import user_event
+from . import payload
+from . import log_event
 
 CUSTOM_ATTRIBUTE_FEATURE_TYPE = 'custom'
 
@@ -47,15 +49,15 @@ def create_log_event(cls, user_events, logger):
 
     visitors = []
 
-    for user_event in user_events:
-      visitor = cls._create_visitor(user_event, logger)
+    for event in user_events:
+      visitor = cls._create_visitor(event, logger)
 
       if visitor:
         visitors.append(visitor)
 
-      user_context = user_event.event_context
+      user_context = event.event_context
 
-    event_batch = EventBatch(
+    event_batch = payload.EventBatch(
       user_context.account_id,
       user_context.project_id,
       user_context.revision,
@@ -72,58 +74,58 @@ def create_log_event(cls, user_events, logger):
 
     event_params = event_batch.get_event_params()
 
-    return LogEvent(cls.EVENT_ENDPOINT, event_params, cls.HTTP_VERB, cls.HTTP_HEADERS)
+    return log_event.LogEvent(cls.EVENT_ENDPOINT, event_params, cls.HTTP_VERB, cls.HTTP_HEADERS)
 
   @classmethod
-  def _create_visitor(cls, user_event, logger):
+  def _create_visitor(cls, event, logger):
     """ Helper method to create Visitor instance for event_batch.
 
     Args:
-      user_event: Instance of UserEvent.
+      event: Instance of UserEvent.
       logger: Provides a logger instance.
 
     Returns:
       Instance of Visitor. None if:
-      - user_event is invalid.
+      - event is invalid.
     """
 
-    if isinstance(user_event, ImpressionEvent):
-      decision = Decision(
-        user_event.experiment.layerId,
-        user_event.experiment.id,
-        user_event.variation.id,
+    if isinstance(event, user_event.ImpressionEvent):
+      decision = payload.Decision(
+        event.experiment.layerId,
+        event.experiment.id,
+        event.variation.id,
       )
 
-      snapshot_event = SnapshotEvent(
-        user_event.experiment.layerId,
-        user_event.uuid,
+      snapshot_event = payload.SnapshotEvent(
+        event.experiment.layerId,
+        event.uuid,
         cls.ACTIVATE_EVENT_KEY,
-        user_event.timestamp
+        event.timestamp
       )
 
-      snapshot = Snapshot([snapshot_event], [decision])
+      snapshot = payload.Snapshot([snapshot_event], [decision])
 
-      visitor = Visitor([snapshot], user_event.visitor_attributes, user_event.user_id)
+      visitor = payload.Visitor([snapshot], event.visitor_attributes, event.user_id)
 
       return visitor
 
-    elif isinstance(user_event, ConversionEvent):
-      revenue = event_tag_utils.get_revenue_value(user_event.event_tags)
-      value = event_tag_utils.get_numeric_value(user_event.event_tags, logger)
+    elif isinstance(event, user_event.ConversionEvent):
+      revenue = event_tag_utils.get_revenue_value(event.event_tags)
+      value = event_tag_utils.get_numeric_value(event.event_tags, logger)
 
-      snapshot_event = SnapshotEvent(
-        user_event.event.id,
-        user_event.uuid,
-        user_event.event.key,
-        user_event.timestamp,
+      snapshot_event = payload.SnapshotEvent(
+        event.event.id,
+        event.uuid,
+        event.event.key,
+        event.timestamp,
         revenue,
         value,
-        user_event.event_tags
+        event.event_tags
       )
 
-      snapshot = Snapshot([snapshot_event])
+      snapshot = payload.Snapshot([snapshot_event])
 
-      visitor = Visitor([snapshot], user_event.visitor_attributes, user_event.user_id)
+      visitor = payload.Visitor([snapshot], event.visitor_attributes, event.user_id)
 
       return visitor
 
@@ -156,22 +158,22 @@ def build_attribute_list(attributes, project_config):
           attribute_id = project_config.get_attribute_id(attribute_key)
           if attribute_id:
             attributes_list.append(
-              VisitorAttribute(
-                attribute_id,
-                attribute_key,
-                CUSTOM_ATTRIBUTE_FEATURE_TYPE,
-                attribute_value)
+              payload.VisitorAttribute(
+                attribute_id,
+                attribute_key,
+                CUSTOM_ATTRIBUTE_FEATURE_TYPE,
+                attribute_value)
             )
 
     # Append Bot Filtering Attribute
     bot_filtering_value = project_config.get_bot_filtering_value()
     if isinstance(bot_filtering_value, bool):
       attributes_list.append(
-        VisitorAttribute(
-          enums.ControlAttributes.BOT_FILTERING,
-          enums.ControlAttributes.BOT_FILTERING,
-          CUSTOM_ATTRIBUTE_FEATURE_TYPE,
-          bot_filtering_value)
+        payload.VisitorAttribute(
+          enums.ControlAttributes.BOT_FILTERING,
+          enums.ControlAttributes.BOT_FILTERING,
+          CUSTOM_ATTRIBUTE_FEATURE_TYPE,
+          bot_filtering_value)
       )
 
     return attributes_list
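With the module-level imports above, the factory is called the same way as before. A minimal usage sketch, assuming user_events holds at least one ImpressionEvent/ConversionEvent built elsewhere (e.g. by UserEventFactory) and reusing the NoOpLogger already referenced in this changeset:

from optimizely import logger as _logging
from optimizely.event.event_factory import EventFactory

user_events = [...]  # ImpressionEvent / ConversionEvent instances built elsewhere

log_evt = EventFactory.create_log_event(user_events, _logging.NoOpLogger())

# The returned LogEvent wraps the batched payload and request metadata.
print(log_evt.url)        # EventFactory.EVENT_ENDPOINT
print(log_evt.http_verb)  # EventFactory.HTTP_VERB
params = log_evt.params   # EventBatch params produced by get_event_params()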
50 changes: 32 additions & 18 deletions optimizely/event/event_processor.py
@@ -21,9 +21,9 @@
 from .user_event import UserEvent
 from .event_factory import EventFactory
 from optimizely import logger as _logging
-from optimizely.closeable import Closeable
 from optimizely.event_dispatcher import EventDispatcher as default_event_dispatcher
-from optimizely.helpers import validator, enums
+from optimizely.helpers import enums
+from optimizely.helpers import validator
 
 ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
 
@@ -37,7 +37,7 @@ def process(user_event):
     pass
 
 
-class BatchEventProcessor(EventProcessor, Closeable):
+class BatchEventProcessor(EventProcessor):
   """
   BatchEventProcessor is a batched implementation of the EventProcessor.
@@ -48,38 +48,53 @@ class BatchEventProcessor(EventProcessor, Closeable):
 
   _DEFAULT_QUEUE_CAPACITY = 1000
   _DEFAULT_BATCH_SIZE = 10
-  _DEFAULT_FLUSH_INTERVAL = timedelta(milliseconds=30000)
-  _DEFAULT_TIMEOUT_INTERVAL = timedelta(milliseconds=5000)
+  _DEFAULT_FLUSH_INTERVAL = timedelta(seconds=30)
+  _DEFAULT_TIMEOUT_INTERVAL = timedelta(seconds=5)
   _SHUTDOWN_SIGNAL = object()
   _FLUSH_SIGNAL = object()
   LOCK = threading.Lock()
 
   def __init__(self,
                event_dispatcher,
                logger,
-               default_start=False,
+               start_on_init=False,
                event_queue=None,
                batch_size=None,
                flush_interval=None,
                timeout_interval=None,
                notification_center=None):
+    """ EventProcessor init method to configure event batching.
+    Args:
+      event_dispatcher: Provides a dispatch_event method which if given a URL and params sends a request to it.
+      logger: Provides a log method to log messages. By default nothing would be logged.
+      start_on_init: Optional boolean param which starts the consumer thread if set to True.
+                     By default thread does not start unless 'start' method is called.
+      event_queue: Optional component which accumulates the events until dispatched.
+      batch_size: Optional param which defines the upper limit of the number of events in event_queue after which
+                  the event_queue will be flushed.
+      flush_interval: Optional floating point number representing time interval in seconds after which event_queue will
+                      be flushed.
+      timeout_interval: Optional floating point number representing time interval in seconds before joining the consumer
+                        thread.
+      notification_center: Optional instance of notification_center.NotificationCenter.
+    """
     self.event_dispatcher = event_dispatcher or default_event_dispatcher
     self.logger = _logging.adapt_logger(logger or _logging.NoOpLogger())
     self.event_queue = event_queue or queue.Queue(maxsize=self._DEFAULT_QUEUE_CAPACITY)
     self.batch_size = batch_size if self._validate_intantiation_props(batch_size, 'batch_size') \
       else self._DEFAULT_BATCH_SIZE
-    self.flush_interval = timedelta(milliseconds=flush_interval) \
+    self.flush_interval = timedelta(seconds=flush_interval) \
       if self._validate_intantiation_props(flush_interval, 'flush_interval') \
       else self._DEFAULT_FLUSH_INTERVAL
-    self.timeout_interval = timedelta(milliseconds=timeout_interval) \
+    self.timeout_interval = timedelta(seconds=timeout_interval) \
       if self._validate_intantiation_props(timeout_interval, 'timeout_interval') \
       else self._DEFAULT_TIMEOUT_INTERVAL
     self.notification_center = notification_center
     self._disposed = False
     self._is_started = False
     self._current_batch = list()
 
-    if default_start is True:
+    if start_on_init is True:
       self.start()
 
   @property
@@ -98,18 +113,18 @@ def _validate_intantiation_props(self, prop, prop_name):
 
     return True
 
-  def _get_time_in_ms(self, _time=None):
+  def _get_time(self, _time=None):
     if _time is None:
-      return int(round(time.time() * 1000))
+      return int(round(time.time()))
 
-    return int(round(_time * 1000))
+    return int(round(_time))
 
   def start(self):
     if self.is_started and not self.disposed:
       self.logger.warning('Service already started')
       return
 
-    self.flushing_interval_deadline = self._get_time_in_ms() + self._get_time_in_ms(self.flush_interval.total_seconds())
+    self.flushing_interval_deadline = self._get_time() + self._get_time(self.flush_interval.total_seconds())
     self.executor = threading.Thread(target=self._run)
     self.executor.setDaemon(True)
     self.executor.start()
@@ -120,7 +135,7 @@ def _run(self):
     """ Scheduler method that periodically flushes events queue. """
     try:
       while True:
-        if self._get_time_in_ms() > self.flushing_interval_deadline:
+        if self._get_time() > self.flushing_interval_deadline:
           self._flush_queue()
 
         try:
@@ -197,8 +212,8 @@ def _add_to_batch(self, user_event):
 
     # Reset the deadline if starting a new batch.
     if len(self._current_batch) == 0:
-      self.flushing_interval_deadline = self._get_time_in_ms() + \
-        self._get_time_in_ms(self.flush_interval.total_seconds())
+      self.flushing_interval_deadline = self._get_time() + \
+        self._get_time(self.flush_interval.total_seconds())
 
     with self.LOCK:
       self._current_batch.append(user_event)
@@ -220,9 +235,8 @@ def _should_split(self, user_event):
 
     return False
 
-  def close(self):
+  def stop(self):
     """ Stops and disposes batch event processor. """
-    self.logger.info('Start close.')
 
     self.event_queue.put(self._SHUTDOWN_SIGNAL)
     self.executor.join(self.timeout_interval.total_seconds())
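Taken together: the processor drops the Closeable base (close() becomes stop()), default_start becomes start_on_init, and flush_interval/timeout_interval are now interpreted as seconds rather than milliseconds. A hedged construction sketch; the dispatcher and logger choices and the placeholder event are illustrative, not part of this diff:

from optimizely import logger as _logging
from optimizely.event.event_processor import BatchEventProcessor
from optimizely.event_dispatcher import EventDispatcher

processor = BatchEventProcessor(
  EventDispatcher,          # anything exposing dispatch_event; this class is also the fallback default
  _logging.SimpleLogger(),
  start_on_init=True,       # renamed from default_start; starts the consumer thread immediately
  batch_size=10,
  flush_interval=30,        # seconds (previously milliseconds)
  timeout_interval=5,       # seconds to wait when joining the consumer thread
)

processor.process(some_user_event)  # some_user_event: a UserEvent built elsewhere (placeholder)
processor.stop()                    # renamed from close(); signals shutdown and joins the thread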
2 changes: 1 addition & 1 deletion optimizely/event/log_event.py
@@ -18,7 +18,7 @@ class LogEvent(object):
   def __init__(self, url, params, http_verb=None, headers=None):
     self.url = url
     self.params = params
-    self.http_verb = http_verb or 'GET'
+    self.http_verb = http_verb or 'POST'
     self.headers = headers
 
   def __str__(self):
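A quick check of the new default; the URLs and params below are illustrative only:

from optimizely.event.log_event import LogEvent

evt = LogEvent('https://logx.optimizely.com/v1/events', {'visitors': []})
print(evt.http_verb)      # 'POST' is now the fallback when no verb is given

evt_get = LogEvent('https://example.com/ping', {}, http_verb='GET')
print(evt_get.http_verb)  # an explicit verb is still honored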
16 changes: 8 additions & 8 deletions optimizely/event/user_event_factory.py
@@ -11,8 +11,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .user_event import EventContext, ConversionEvent, ImpressionEvent
-from .event_factory import EventFactory
+from . import event_factory
+from . import user_event
 
 
 class UserEventFactory(object):
@@ -40,18 +40,18 @@ def create_impression_event(cls, project_config, activated_experiment, variation
     experiment_key = activated_experiment.key
     variation = project_config.get_variation_from_id(experiment_key, variation_id)
 
-    event_context = EventContext(
+    event_context = user_event.EventContext(
       project_config.account_id,
       project_config.project_id,
       project_config.revision,
       project_config.anonymize_ip
     )
 
-    return ImpressionEvent(
+    return user_event.ImpressionEvent(
       event_context,
       user_id,
       activated_experiment,
-      EventFactory.build_attribute_list(user_attributes, project_config),
+      event_factory.EventFactory.build_attribute_list(user_attributes, project_config),
       variation,
       project_config.get_bot_filtering_value()
     )
@@ -71,18 +71,18 @@ def create_conversion_event(cls, project_config, event_key, user_id, user_attrib
       Event object encapsulating the conversion event.
     """
 
-    event_context = EventContext(
+    event_context = user_event.EventContext(
       project_config.account_id,
       project_config.project_id,
       project_config.revision,
       project_config.anonymize_ip
     )
 
-    return ConversionEvent(
+    return user_event.ConversionEvent(
      event_context,
       project_config.get_event(event_key),
       user_id,
-      EventFactory.build_attribute_list(user_attributes, project_config),
+      event_factory.EventFactory.build_attribute_list(user_attributes, project_config),
       event_tags,
       project_config.get_bot_filtering_value()
     )
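End to end, the user-event factory feeds the event factory. A hedged sketch: both create_* signatures are truncated in the hunk headers above, so the argument order shown for create_conversion_event (project_config, event_key, user_id, user_attributes, event_tags) is an assumption, and the config, keys, and values are placeholders:

from optimizely import logger as _logging
from optimizely.event.user_event_factory import UserEventFactory
from optimizely.event.event_factory import EventFactory

# project_config: an already-parsed ProjectConfig instance (obtained elsewhere).
conversion = UserEventFactory.create_conversion_event(
  project_config,
  'purchase',              # event key defined in the datafile (placeholder)
  'user_1',                # user id (placeholder)
  {'browser': 'chrome'},   # user attributes (placeholder)
  {'revenue': 4200},       # event tags (placeholder)
)

log_evt = EventFactory.create_log_event([conversion], _logging.NoOpLogger())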
2 changes: 0 additions & 2 deletions optimizely/helpers/enums.py
@@ -40,8 +40,6 @@ class ConfigManager(object):
   DATAFILE_URL_TEMPLATE = 'https://cdn.optimizely.com/datafiles/{sdk_key}.json'
   # Default config update interval of 5 minutes
   DEFAULT_UPDATE_INTERVAL = 5 * 60
-  # Minimum config update interval of 1 second
-  MIN_UPDATE_INTERVAL = 1
   # Time in seconds before which request for datafile times out
   REQUEST_TIMEOUT = 10
 
(The diffs for the remaining three changed files are not shown.)
