added IFF feature to async branch #492

Merged: 3 commits, Jan 10, 2024
4 changes: 2 additions & 2 deletions setup.py
@@ -21,7 +21,7 @@
'pyyaml>=5.4',
'docopt>=0.6.2',
'enum34;python_version<"3.4"',
'bloom-filter2>=2.0.0',
'bloom-filter2>=2.0.0'
]

with open(path.join(path.abspath(path.dirname(__file__)), 'splitio', 'version.py')) as f:
@@ -44,7 +44,7 @@
'uwsgi': ['uwsgi>=2.0.0'],
'cpphash': ['mmh3cffi==0.2.1'],
},
setup_requires=['pytest-runner'],
setup_requires=['pytest-runner', 'pluggy==1.0.0;python_version<"3.7"'],
classifiers=[
'Environment :: Console',
'Intended Audience :: Developers',
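The new `'pluggy==1.0.0;python_version<"3.7"'` entry in `setup_requires` uses a PEP 508 environment marker, so the pin only takes effect on interpreters older than 3.7. A minimal sketch of how such a marker evaluates, using the third-party `packaging` library purely for illustration (the library is not part of this diff):

```python
# Illustration only: evaluate a PEP 508 environment marker like the one
# added to setup_requires. Requires the third-party "packaging" package.
from packaging.markers import Marker

marker = Marker('python_version < "3.7"')

# True on Python 3.6 and older, False on 3.7+; setuptools applies the same
# rule when deciding whether the pinned pluggy==1.0.0 is needed.
print(marker.evaluate())
```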
4 changes: 0 additions & 4 deletions splitio/client/client.py
@@ -292,7 +292,6 @@ def _get_treatment(self, method, key, feature, attributes=None):
result = self._evaluator.eval_with_context(key, bucketing, feature, attributes, ctx)
except Exception as e: # toto narrow this
_LOGGER.error('Error getting treatment for feature flag')
_LOGGER.error(str(e))
_LOGGER.debug('Error: ', exc_info=True)
self._telemetry_evaluation_producer.record_exception(method)
result = self._FAILED_EVAL_RESULT
@@ -382,7 +381,6 @@ def _get_treatments(self, key, features, method, attributes=None):
results = self._evaluator.eval_many_with_context(key, bucketing, features, attributes, ctx)
except Exception as e: # toto narrow this
_LOGGER.error('Error getting treatment for feature flag')
_LOGGER.error(str(e))
_LOGGER.debug('Error: ', exc_info=True)
self._telemetry_evaluation_producer.record_exception(method)
results = {n: self._FAILED_EVAL_RESULT for n in features}
@@ -572,7 +570,6 @@ async def _get_treatment(self, method, key, feature, attributes=None):
result = self._evaluator.eval_with_context(key, bucketing, feature, attributes, ctx)
except Exception as e: # toto narrow this
_LOGGER.error('Error getting treatment for feature flag')
_LOGGER.error(str(e))
_LOGGER.debug('Error: ', exc_info=True)
await self._telemetry_evaluation_producer.record_exception(method)
result = self._FAILED_EVAL_RESULT
@@ -662,7 +659,6 @@ async def _get_treatments(self, key, features, method, attributes=None):
results = self._evaluator.eval_many_with_context(key, bucketing, features, attributes, ctx)
except Exception as e: # toto narrow this
_LOGGER.error('Error getting treatment for feature flag')
_LOGGER.error(str(e))
_LOGGER.debug('Error: ', exc_info=True)
await self._telemetry_evaluation_producer.record_exception(method)
results = {n: self._FAILED_EVAL_RESULT for n in features}
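Dropping the `_LOGGER.error(str(e))` calls leaves one concise ERROR line for operators and keeps the full traceback at DEBUG via `exc_info=True`. A minimal sketch of that logging pattern in isolation (the function name is a placeholder, not part of the SDK):

```python
import logging

_LOGGER = logging.getLogger(__name__)

def evaluate_with_fallback(evaluate, fallback):
    """Run an evaluation callable, logging failures the way the client does."""
    try:
        return evaluate()
    except Exception:  # broad on purpose, mirroring the client's catch-all
        # One concise ERROR line for operators...
        _LOGGER.error('Error getting treatment for feature flag')
        # ...and the full traceback only at DEBUG level.
        _LOGGER.debug('Error: ', exc_info=True)
        return fallback

# Usage: evaluate_with_fallback(lambda: 1 / 0, fallback='control') returns 'control'.
```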
2 changes: 1 addition & 1 deletion splitio/client/factory.py
@@ -410,7 +410,7 @@ async def block_until_ready(self, timeout=None):
await asyncio.wait_for(asyncio.shield(self._sdk_ready_flag.wait()), timeout)
except asyncio.TimeoutError as e:
_LOGGER.error("Exception initializing SDK")
_LOGGER.error(str(e))
_LOGGER.debug(str(e))
await self._telemetry_init_producer.record_bur_time_out()
raise TimeoutException('SDK Initialization: time of %d exceeded' % timeout)

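`block_until_ready` wraps the ready event in `asyncio.shield` before `asyncio.wait_for`, so a timeout cancels only this caller's wait, not the shared event wait; the exception detail now goes to DEBUG instead of ERROR. A small self-contained sketch of that pattern, with illustrative names rather than the factory's actual attributes:

```python
import asyncio
import logging

_LOGGER = logging.getLogger(__name__)

class ReadyTimeout(Exception):
    """Illustrative stand-in for the SDK's TimeoutException."""

async def block_until_ready(ready_flag: asyncio.Event, timeout: float):
    try:
        # shield() keeps the underlying wait() from being cancelled when
        # wait_for() times out; only this caller gives up.
        await asyncio.wait_for(asyncio.shield(ready_flag.wait()), timeout)
    except asyncio.TimeoutError as e:
        _LOGGER.error("Exception initializing SDK")
        _LOGGER.debug(str(e))
        raise ReadyTimeout('SDK Initialization: time of %d exceeded' % timeout)

async def main():
    flag = asyncio.Event()
    asyncio.get_running_loop().call_later(0.1, flag.set)  # becomes ready shortly
    await block_until_ready(flag, timeout=1)

asyncio.run(main())
```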
18 changes: 17 additions & 1 deletion splitio/engine/telemetry.py
@@ -6,7 +6,7 @@
_LOGGER = logging.getLogger(__name__)

from splitio.storage.inmemmory import InMemoryTelemetryStorage
from splitio.models.telemetry import CounterConstants
from splitio.models.telemetry import CounterConstants, UpdateFromSSE

class TelemetryStorageProducerBase(object):
"""Telemetry storage producer base class."""
@@ -212,6 +212,9 @@ def record_session_length(self, session):
"""Record session length."""
self._telemetry_storage.record_session_length(session)

def record_update_from_sse(self, event):
"""Record update from sse."""
self._telemetry_storage.record_update_from_sse(event)

class TelemetryRuntimeProducerAsync(object):
"""Telemetry runtime producer async class."""
@@ -260,6 +263,9 @@ async def record_session_length(self, session):
"""Record session length."""
await self._telemetry_storage.record_session_length(session)

async def record_update_from_sse(self, event):
"""Record update from sse."""
await self._telemetry_storage.record_update_from_sse(event)

class TelemetryStorageConsumerBase(object):
"""Telemetry storage consumer base class."""
@@ -539,6 +545,10 @@ def pop_streaming_events(self):
"""Get and reset streaming events."""
return self._telemetry_storage.pop_streaming_events()

def pop_update_from_sse(self, event):
"""Get and reset update from sse."""
return self._telemetry_storage.pop_update_from_sse(event)

def get_session_length(self):
"""Get session length"""
return self._telemetry_storage.get_session_length()
@@ -561,6 +571,7 @@ def pop_formatted_stats(self):
'eQ': self.get_events_stats(CounterConstants.EVENTS_QUEUED),
'eD': self.get_events_stats(CounterConstants.EVENTS_DROPPED),
'lS': self._last_synchronization_to_json(last_synchronization),
'ufs': {event.value: self.pop_update_from_sse(event) for event in UpdateFromSSE},
't': self.pop_tags(),
'hE': self._http_errors_to_json(http_errors),
'hL': self._http_latencies_to_json(http_latencies),
@@ -615,6 +626,10 @@ async def pop_streaming_events(self):
"""Get and reset streaming events."""
return await self._telemetry_storage.pop_streaming_events()

async def pop_update_from_sse(self, event):
"""Get and reset update from sse."""
return await self._telemetry_storage.pop_update_from_sse(event)

async def get_session_length(self):
"""Get session length"""
return await self._telemetry_storage.get_session_length()
@@ -636,6 +651,7 @@ async def pop_formatted_stats(self):
'iDr': await self.get_impressions_stats(CounterConstants.IMPRESSIONS_DROPPED),
'eQ': await self.get_events_stats(CounterConstants.EVENTS_QUEUED),
'eD': await self.get_events_stats(CounterConstants.EVENTS_DROPPED),
'ufs': {event.value: await self.pop_update_from_sse(event) for event in UpdateFromSSE},
'lS': self._last_synchronization_to_json(last_synchronization),
't': await self.pop_tags(),
'hE': self._http_errors_to_json(http_errors['httpErrors']),
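The consumer builds the new `'ufs'` entry of the formatted stats by iterating the `UpdateFromSSE` enum and popping each counter. A minimal sketch of that shape with a stand-in storage object; only the dict-comprehension pattern and the enum value come from this diff:

```python
from enum import Enum

class UpdateFromSSE(Enum):
    """Mirrors the enum added in splitio/models/telemetry.py."""
    SPLIT_UPDATE = 'sp'

class FakeTelemetryStorage:
    """Stand-in storage: counts per event value, reset on pop."""
    def __init__(self):
        self._update_from_sse = {UpdateFromSSE.SPLIT_UPDATE.value: 3}

    def pop_update_from_sse(self, event):
        value = self._update_from_sse.get(event.value, 0)
        self._update_from_sse[event.value] = 0
        return value

storage = FakeTelemetryStorage()

# The 'ufs' block of the formatted stats, built the same way the consumer does.
ufs = {event.value: storage.pop_update_from_sse(event) for event in UpdateFromSSE}
print(ufs)  # {'sp': 3} the first time, {'sp': 0} afterwards
```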
53 changes: 49 additions & 4 deletions splitio/models/telemetry.py
@@ -133,6 +133,10 @@ class OperationMode(Enum):
CONSUMER = 'consumer'
PARTIAL_CONSUMER = 'partial_consumer'

class UpdateFromSSE(Enum):
"""Update from sse constants"""
SPLIT_UPDATE = 'sp'

def get_latency_bucket_index(micros):
"""
Find the bucket index for a measured latency.
@@ -856,6 +860,7 @@ def _reset_all(self):
self._auth_rejections = 0
self._token_refreshes = 0
self._session_length = 0
self._update_from_sse = {}

@abc.abstractmethod
def record_impressions_value(self, resource, value):
@@ -959,22 +964,42 @@ def record_events_value(self, resource, value):
else:
return

def record_update_from_sse(self, event):
"""
Increment the update from sse resource by one.
"""
with self._lock:
if event.value not in self._update_from_sse:
self._update_from_sse[event.value] = 0
self._update_from_sse[event.value] += 1

def record_auth_rejections(self):
"""
Increament the auth rejection resource by one.
Increment the auth rejection resource by one.

"""
with self._lock:
self._auth_rejections += 1

def record_token_refreshes(self):
"""
Increament the token refreshes resource by one.
Increment the token refreshes resource by one.

"""
with self._lock:
self._token_refreshes += 1

def pop_update_from_sse(self, event):
"""
Pop update from sse
:return: update from sse value
:rtype: int
"""
with self._lock:
update_from_sse = self._update_from_sse[event.value]
self._update_from_sse[event.value] = 0
return update_from_sse

def record_session_length(self, session):
"""
Set the session length value
@@ -1094,22 +1119,42 @@ async def record_events_value(self, resource, value):
else:
return

async def record_update_from_sse(self, event):
"""
Increment the update from sse resource by one.
"""
async with self._lock:
if event.value not in self._update_from_sse:
self._update_from_sse[event.value] = 0
self._update_from_sse[event.value] += 1

async def record_auth_rejections(self):
"""
Increament the auth rejection resource by one.
Increment the auth rejection resource by one.

"""
async with self._lock:
self._auth_rejections += 1

async def record_token_refreshes(self):
"""
Increament the token refreshes resource by one.
Increment the token refreshes resource by one.

"""
async with self._lock:
self._token_refreshes += 1

async def pop_update_from_sse(self, event):
"""
Pop update from sse
:return: update from sse value
:rtype: int
"""
async with self._lock:
update_from_sse = self._update_from_sse[event.value]
self._update_from_sse[event.value] = 0
return update_from_sse

async def record_session_length(self, session):
"""
Set the session length value
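Both storages keep the per-event counters in a plain dict guarded by a lock: record increments lazily, pop returns the count and resets it. A compact, illustrative sketch of the async variant of that record/pop pair outside the real storage class (class and method names are invented here):

```python
import asyncio
from enum import Enum

class UpdateFromSSE(Enum):
    SPLIT_UPDATE = 'sp'

class SSEUpdateCounter:
    """Illustrative async counter mirroring record/pop_update_from_sse."""
    def __init__(self):
        self._lock = asyncio.Lock()
        self._update_from_sse = {}

    async def record(self, event):
        async with self._lock:
            if event.value not in self._update_from_sse:
                self._update_from_sse[event.value] = 0
            self._update_from_sse[event.value] += 1

    async def pop(self, event):
        async with self._lock:
            # .get() avoids a KeyError when nothing was recorded yet
            # (the storage in this diff indexes the dict directly).
            value = self._update_from_sse.get(event.value, 0)
            self._update_from_sse[event.value] = 0
            return value

async def main():
    counter = SSEUpdateCounter()
    await counter.record(UpdateFromSSE.SPLIT_UPDATE)
    print(await counter.pop(UpdateFromSSE.SPLIT_UPDATE))  # 1
    print(await counter.pop(UpdateFromSSE.SPLIT_UPDATE))  # 0

asyncio.run(main())
```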
5 changes: 2 additions & 3 deletions splitio/push/manager.py
@@ -1,5 +1,4 @@
"""Push subsystem manager class and helpers."""

import logging
from threading import Timer
import abc
@@ -67,7 +66,7 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetr
"""
self._auth_api = auth_api
self._feedback_loop = feedback_loop
self._processor = MessageProcessor(synchronizer)
self._processor = MessageProcessor(synchronizer, telemetry_runtime_producer)
self._status_tracker = PushStatusTracker(telemetry_runtime_producer)
self._event_handlers = {
EventType.MESSAGE: self._handle_message,
@@ -300,7 +299,7 @@ def __init__(self, auth_api, synchronizer, feedback_loop, sdk_metadata, telemetr
"""
self._auth_api = auth_api
self._feedback_loop = feedback_loop
self._processor = MessageProcessorAsync(synchronizer)
self._processor = MessageProcessorAsync(synchronizer, telemetry_runtime_producer)
self._status_tracker = PushStatusTrackerAsync(telemetry_runtime_producer)
self._event_handlers = {
EventType.MESSAGE: self._handle_message,
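The only change here is that the message processor now receives the telemetry runtime producer, presumably so it can record updates that arrive over SSE. A hedged sketch of that wiring; the real MessageProcessor internals are not shown in this diff, so the handler below is purely illustrative:

```python
from enum import Enum

class UpdateFromSSE(Enum):
    SPLIT_UPDATE = 'sp'  # same value the telemetry model defines

class MessageProcessorSketch:
    """Illustrative only: why the constructor now takes the telemetry producer."""

    def __init__(self, synchronizer, telemetry_runtime_producer):
        self._synchronizer = synchronizer
        self._telemetry_runtime_producer = telemetry_runtime_producer

    def handle_split_update(self, update):
        """Hypothetical handler: after applying the update, count it as an SSE update."""
        # ...the real processor would hand the update to the synchronizer here...
        self._telemetry_runtime_producer.record_update_from_sse(UpdateFromSSE.SPLIT_UPDATE)
```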
58 changes: 44 additions & 14 deletions splitio/push/parser.py
@@ -277,7 +277,7 @@ def __str__(self):


class BaseUpdate(BaseMessage, metaclass=abc.ABCMeta):
"""Split data update notification."""
"""Feature flag data update notification."""

def __init__(self, channel, timestamp, change_number):
"""
@@ -324,11 +324,14 @@ def change_number(self):


class SplitChangeUpdate(BaseUpdate):
"""Split Change notification."""
"""Feature flag Change notification."""

def __init__(self, channel, timestamp, change_number):
def __init__(self, channel, timestamp, change_number, previous_change_number, feature_flag_definition, compression):
"""Class constructor."""
BaseUpdate.__init__(self, channel, timestamp, change_number)
self._previous_change_number = previous_change_number
self._feature_flag_definition = feature_flag_definition
self._compression = compression

@property
def update_type(self): # pylint:disable=no-self-use
@@ -340,18 +343,45 @@ def update_type(self): # pylint:disable=no-self-use
"""
return UpdateType.SPLIT_UPDATE

@property
def previous_change_number(self): # pylint:disable=no-self-use
"""
Return previous change number
:returns: The previous change number
:rtype: int
"""
return self._previous_change_number

@property
def feature_flag_definition(self): # pylint:disable=no-self-use
"""
Return feature flag definition
:returns: The new feature flag definition
:rtype: str
"""
return self._feature_flag_definition

@property
def compression(self): # pylint:disable=no-self-use
"""
Return previous compression type
:returns: The compression type
:rtype: int
"""
return self._compression

def __str__(self):
"""Return string representation."""
return "SplitChange - changeNumber=%d" % (self.change_number)


class SplitKillUpdate(BaseUpdate):
"""Split Kill notification."""
"""Feature flag Kill notification."""

def __init__(self, channel, timestamp, change_number, split_name, default_treatment): # pylint:disable=too-many-arguments
def __init__(self, channel, timestamp, change_number, feature_flag_name, default_treatment): # pylint:disable=too-many-arguments
"""Class constructor."""
BaseUpdate.__init__(self, channel, timestamp, change_number)
self._split_name = split_name
self._feature_flag_name = feature_flag_name
self._default_treatment = default_treatment

@property
@@ -365,14 +395,14 @@ def update_type(self): # pylint:disable=no-self-use
return UpdateType.SPLIT_KILL

@property
def split_name(self):
def feature_flag_name(self):
"""
Return the name of the killed split.
Return the name of the killed feature flag.

:returns: name of the killed split
:returns: name of the killed feature flag
:rtype: str
"""
return self._split_name
return self._feature_flag_name

@property
def default_treatment(self):
@@ -387,7 +417,7 @@ def default_treatment(self):
def __str__(self):
"""Return string representation."""
return "SplitKill - changeNumber=%d, name=%s, defaultTreatment=%s" % \
(self.change_number, self.split_name, self.default_treatment)
(self.change_number, self.feature_flag_name, self.default_treatment)


class SegmentChangeUpdate(BaseUpdate):
@@ -471,9 +501,9 @@ def _parse_update(channel, timestamp, data):
"""
update_type = UpdateType(data['type'])
change_number = data['changeNumber']
if update_type == UpdateType.SPLIT_UPDATE:
return SplitChangeUpdate(channel, timestamp, change_number)
elif update_type == UpdateType.SPLIT_KILL:
if update_type == UpdateType.SPLIT_UPDATE and change_number is not None:
return SplitChangeUpdate(channel, timestamp, change_number, data.get('pcn'), data.get('d'), data.get('c'))
elif update_type == UpdateType.SPLIT_KILL and change_number is not None:
return SplitKillUpdate(channel, timestamp, change_number,
data['splitName'], data['defaultTreatment'])
elif update_type == UpdateType.SEGMENT_UPDATE:
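With the new constructor arguments, a SPLIT_UPDATE notification can carry the previous change number ('pcn'), the feature flag definition payload ('d'), and the compression type ('c'), and _parse_update only builds the richer object when changeNumber is present. A minimal sketch that mirrors the parsing branch shown above on a sample payload; it assumes the splitio package from this branch is installed, and the payload values are invented for illustration:

```python
# SplitChangeUpdate and its new constructor arguments are the ones added in this file.
from splitio.push.parser import SplitChangeUpdate

# Sample notification body. The keys ('type', 'changeNumber', 'pcn', 'd', 'c')
# match what _parse_update reads; the values are made up for this example.
data = {
    'type': 'SPLIT_UPDATE',
    'changeNumber': 1686165617166,
    'pcn': 1686165617000,        # previous change number
    'd': 'eJzEUtFu2jAU/Zd...',   # feature flag definition payload (truncated, illustrative)
    'c': 2,                      # compression type
}

# _parse_update only constructs the richer update when changeNumber is present.
if data['changeNumber'] is not None:
    update = SplitChangeUpdate(
        'splits',                # channel (illustrative)
        1686165617200,           # timestamp (illustrative)
        data['changeNumber'],
        data.get('pcn'),
        data.get('d'),
        data.get('c'),
    )
    print(update.change_number, update.previous_change_number, update.compression)
```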