Skip to content
2 changes: 1 addition & 1 deletion splitio/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def set_telemetry_data(self, metric_name, telemetry_runtime_producer):
self._metric_name = metric_name

def is_sdk_endpoint_overridden(self):
return self._urls['sdk'] == SDK_URL
return self._urls['sdk'] != SDK_URL

def _get_headers(self, extra_headers, sdk_key):
headers = _build_basic_headers(sdk_key)
Expand Down
28 changes: 19 additions & 9 deletions splitio/api/splits.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from splitio.models.telemetry import HTTPExceptionsAndLatencies
from splitio.util.time import utctime_ms
from splitio.spec import SPEC_VERSION
from splitio.sync import util

_LOGGER = logging.getLogger(__name__)
_SPEC_1_1 = "1.1"
Expand Down Expand Up @@ -36,15 +37,20 @@ def __init__(self, client, sdk_key, sdk_metadata, telemetry_runtime_producer):
self._spec_version = SPEC_VERSION
self._last_proxy_check_timestamp = 0
self.clear_storage = False
self._old_spec_since = None

def _convert_to_new_spec(self, body):
return {"ff": {"d": body["splits"], "s": body["since"], "t": body["till"]},
"rbs": {"d": [], "s": -1, "t": -1}}

def _check_last_proxy_check_timestamp(self):
def _check_last_proxy_check_timestamp(self, since):
if self._spec_version == _SPEC_1_1 and ((utctime_ms() - self._last_proxy_check_timestamp) >= _PROXY_CHECK_INTERVAL_MILLISECONDS_SS):
_LOGGER.info("Switching to new Feature flag spec (%s) and fetching.", SPEC_VERSION);
self._spec_version = SPEC_VERSION
self._old_spec_since = since

def _check_old_spec_since(self, change_number):
if self._spec_version == _SPEC_1_1 and self._old_spec_since is not None:
since = self._old_spec_since
self._old_spec_since = None
return since
return change_number


class SplitsAPI(SplitsAPIBase): # pylint: disable=too-few-public-methods
Expand Down Expand Up @@ -80,7 +86,9 @@ def fetch_splits(self, change_number, rbs_change_number, fetch_options):
:rtype: dict
"""
try:
self._check_last_proxy_check_timestamp()
self._check_last_proxy_check_timestamp(change_number)
change_number = self._check_old_spec_since(change_number)

query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
response = self._client.get(
'sdk',
Expand All @@ -91,7 +99,7 @@ def fetch_splits(self, change_number, rbs_change_number, fetch_options):
)
if 200 <= response.status_code < 300:
if self._spec_version == _SPEC_1_1:
return self._convert_to_new_spec(json.loads(response.body))
return util.convert_to_new_spec(json.loads(response.body))

self.clear_storage = self._last_proxy_check_timestamp != 0
self._last_proxy_check_timestamp = 0
Expand Down Expand Up @@ -148,7 +156,9 @@ async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
:rtype: dict
"""
try:
self._check_last_proxy_check_timestamp()
self._check_last_proxy_check_timestamp(change_number)
change_number = self._check_old_spec_since(change_number)

query, extra_headers = build_fetch(change_number, fetch_options, self._metadata, rbs_change_number)
response = await self._client.get(
'sdk',
Expand All @@ -159,7 +169,7 @@ async def fetch_splits(self, change_number, rbs_change_number, fetch_options):
)
if 200 <= response.status_code < 300:
if self._spec_version == _SPEC_1_1:
return self._convert_to_new_spec(json.loads(response.body))
return util.convert_to_new_spec(json.loads(response.body))

self.clear_storage = self._last_proxy_check_timestamp != 0
self._last_proxy_check_timestamp = 0
Expand Down
4 changes: 2 additions & 2 deletions splitio/client/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ def _build_in_memory_factory(api_key, cfg, sdk_url=None, events_url=None, # pyl

synchronizers = SplitSynchronizers(
SplitSynchronizer(apis['splits'], storages['splits'], storages['rule_based_segments']),
SegmentSynchronizer(apis['segments'], storages['splits'], storages['segments']),
SegmentSynchronizer(apis['segments'], storages['splits'], storages['segments'], storages['rule_based_segments']),
ImpressionSynchronizer(apis['impressions'], storages['impressions'],
cfg['impressionsBulkSize']),
EventSynchronizer(apis['events'], storages['events'], cfg['eventsBulkSize']),
Expand Down Expand Up @@ -693,7 +693,7 @@ async def _build_in_memory_factory_async(api_key, cfg, sdk_url=None, events_url=

synchronizers = SplitSynchronizers(
SplitSynchronizerAsync(apis['splits'], storages['splits'], storages['rule_based_segments']),
SegmentSynchronizerAsync(apis['segments'], storages['splits'], storages['segments']),
SegmentSynchronizerAsync(apis['segments'], storages['splits'], storages['segments'], storages['rule_based_segments']),
ImpressionSynchronizerAsync(apis['impressions'], storages['impressions'],
cfg['impressionsBulkSize']),
EventSynchronizerAsync(apis['events'], storages['events'], cfg['eventsBulkSize']),
Expand Down
121 changes: 53 additions & 68 deletions splitio/engine/evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
from splitio.models.grammar.condition import ConditionType
from splitio.models.grammar.matchers.misc import DependencyMatcher
from splitio.models.grammar.matchers.keys import UserDefinedSegmentMatcher
from splitio.models.grammar.matchers.rule_based_segment import RuleBasedSegmentMatcher
from splitio.models.grammar.matchers import RuleBasedSegmentMatcher
from splitio.models.rule_based_segments import SegmentType
from splitio.optional.loaders import asyncio

CONTROL = 'control'
EvaluationContext = namedtuple('EvaluationContext', ['flags', 'segment_memberships', 'segment_rbs_memberships', 'segment_rbs_conditions'])
EvaluationContext = namedtuple('EvaluationContext', ['flags', 'segment_memberships', 'rbs_segments'])

_LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -114,47 +115,24 @@ def context_for(self, key, feature_names):
:rtype: EvaluationContext
"""
pending = set(feature_names)
pending_rbs = set()
splits = {}
rb_segments = {}
pending_memberships = set()
pending_rbs_memberships = set()
while pending:
while pending or pending_rbs:
fetched = self._flag_storage.fetch_many(list(pending))
features = filter_missing(fetched)
splits.update(features)
pending = set()
for feature in features.values():
cf, cs, crbs = get_dependencies(feature)
pending.update(filter(lambda f: f not in splits, cf))
pending_memberships.update(cs)
pending_rbs_memberships.update(crbs)

rbs_segment_memberships = {}
rbs_segment_conditions = {}
key_membership = False
segment_memberhsip = False
for rbs_segment in pending_rbs_memberships:
rbs_segment_obj = self._rbs_segment_storage.get(rbs_segment)
pending_memberships.update(rbs_segment_obj.get_condition_segment_names())

key_membership = key in rbs_segment_obj.excluded.get_excluded_keys()
segment_memberhsip = False
for segment_name in rbs_segment_obj.excluded.get_excluded_segments():
if self._segment_storage.segment_contains(segment_name, key):
segment_memberhsip = True
break

rbs_segment_memberships.update({rbs_segment: segment_memberhsip or key_membership})
if not (segment_memberhsip or key_membership):
rbs_segment_conditions.update({rbs_segment: [condition for condition in rbs_segment_obj.conditions]})

fetched_rbs = self._rbs_segment_storage.fetch_many(list(pending_rbs))
features, rbsegments, splits, rb_segments = update_objects(fetched, fetched_rbs, splits, rb_segments)
pending, pending_memberships, pending_rbs = get_pending_objects(features, splits, rbsegments, rb_segments, pending_memberships)

return EvaluationContext(
splits,
{ segment: self._segment_storage.segment_contains(segment, key)
for segment in pending_memberships
},
rbs_segment_memberships,
rbs_segment_conditions
rb_segments
)


class AsyncEvaluationDataFactory:

Expand All @@ -173,60 +151,36 @@ async def context_for(self, key, feature_names):
:rtype: EvaluationContext
"""
pending = set(feature_names)
pending_rbs = set()
splits = {}
rb_segments = {}
pending_memberships = set()
pending_rbs_memberships = set()
while pending:
while pending or pending_rbs:
fetched = await self._flag_storage.fetch_many(list(pending))
features = filter_missing(fetched)
splits.update(features)
pending = set()
for feature in features.values():
cf, cs, crbs = get_dependencies(feature)
pending.update(filter(lambda f: f not in splits, cf))
pending_memberships.update(cs)
pending_rbs_memberships.update(crbs)

rbs_segment_memberships = {}
rbs_segment_conditions = {}
key_membership = False
segment_memberhsip = False
for rbs_segment in pending_rbs_memberships:
rbs_segment_obj = await self._rbs_segment_storage.get(rbs_segment)
pending_memberships.update(rbs_segment_obj.get_condition_segment_names())

key_membership = key in rbs_segment_obj.excluded.get_excluded_keys()
segment_memberhsip = False
for segment_name in rbs_segment_obj.excluded.get_excluded_segments():
if await self._segment_storage.segment_contains(segment_name, key):
segment_memberhsip = True
break

rbs_segment_memberships.update({rbs_segment: segment_memberhsip or key_membership})
if not (segment_memberhsip or key_membership):
rbs_segment_conditions.update({rbs_segment: [condition for condition in rbs_segment_obj.conditions]})
fetched_rbs = await self._rbs_segment_storage.fetch_many(list(pending_rbs))
features, rbsegments, splits, rb_segments = update_objects(fetched, fetched_rbs, splits, rb_segments)
pending, pending_memberships, pending_rbs = get_pending_objects(features, splits, rbsegments, rb_segments, pending_memberships)

segment_names = list(pending_memberships)
segment_memberships = await asyncio.gather(*[
self._segment_storage.segment_contains(segment, key)
for segment in segment_names
])

return EvaluationContext(
splits,
dict(zip(segment_names, segment_memberships)),
rbs_segment_memberships,
rbs_segment_conditions
rb_segments
)


def get_dependencies(feature):
def get_dependencies(object):
"""
:rtype: tuple(list, list)
"""
feature_names = []
segment_names = []
rbs_segment_names = []
for condition in feature.conditions:
for condition in object.conditions:
for matcher in condition.matchers:
if isinstance(matcher,RuleBasedSegmentMatcher):
rbs_segment_names.append(matcher._rbs_segment_name)
Expand All @@ -239,3 +193,34 @@ def get_dependencies(feature):

def filter_missing(features):
return {k: v for (k, v) in features.items() if v is not None}

def get_pending_objects(features, splits, rbsegments, rb_segments, pending_memberships):
pending = set()
pending_rbs = set()
for feature in features.values():
cf, cs, crbs = get_dependencies(feature)
pending.update(filter(lambda f: f not in splits, cf))
pending_memberships.update(cs)
pending_rbs.update(filter(lambda f: f not in rb_segments, crbs))

for rb_segment in rbsegments.values():
cf, cs, crbs = get_dependencies(rb_segment)
pending.update(filter(lambda f: f not in splits, cf))
pending_memberships.update(cs)
for excluded_segment in rb_segment.excluded.get_excluded_segments():
if excluded_segment.type == SegmentType.STANDARD:
pending_memberships.add(excluded_segment.name)
else:
pending_rbs.update(filter(lambda f: f not in rb_segments, [excluded_segment.name]))
pending_rbs.update(filter(lambda f: f not in rb_segments, crbs))

return pending, pending_memberships, pending_rbs

def update_objects(fetched, fetched_rbs, splits, rb_segments):
features = filter_missing(fetched)
rbsegments = filter_missing(fetched_rbs)
splits.update(features)
rb_segments.update(rbsegments)

return features, rbsegments, splits, rb_segments

39 changes: 31 additions & 8 deletions splitio/models/grammar/matchers/rule_based_segment.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Rule based segment matcher classes."""
from splitio.models.grammar.matchers.base import Matcher
from splitio.models.rule_based_segments import SegmentType

class RuleBasedSegmentMatcher(Matcher):

Expand Down Expand Up @@ -29,20 +30,42 @@ def _match(self, key, attributes=None, context=None):
if self._rbs_segment_name == None:
return False

# Check if rbs segment has exclusions
if context['ec'].segment_rbs_memberships.get(self._rbs_segment_name):
rb_segment = context['ec'].rbs_segments.get(self._rbs_segment_name)

if key in rb_segment.excluded.get_excluded_keys():
return False

if self._match_dep_rb_segments(rb_segment.excluded.get_excluded_segments(), key, attributes, context):
return False

for parsed_condition in context['ec'].segment_rbs_conditions.get(self._rbs_segment_name):
if parsed_condition.matches(key, attributes, context):
return True

return False
return self._match_conditions(rb_segment.conditions, key, attributes, context)

def _add_matcher_specific_properties_to_json(self):
"""Return UserDefinedSegment specific properties."""
return {
'userDefinedSegmentMatcherData': {
'segmentName': self._rbs_segment_name
}
}
}

def _match_conditions(self, rbs_segment_conditions, key, attributes, context):
for parsed_condition in rbs_segment_conditions:
if parsed_condition.matches(key, attributes, context):
return True

return False

def _match_dep_rb_segments(self, excluded_rb_segments, key, attributes, context):
for excluded_rb_segment in excluded_rb_segments:
if excluded_rb_segment.type == SegmentType.STANDARD:
if context['ec'].segment_memberships[excluded_rb_segment.name]:
return True
else:
excluded_segment = context['ec'].rbs_segments.get(excluded_rb_segment.name)
if key in excluded_segment.excluded.get_excluded_keys():
return False

if self._match_dep_rb_segments(excluded_segment.excluded.get_excluded_segments(), key, attributes, context):
return True

return self._match_conditions(excluded_segment.conditions, key, attributes, context)
Loading