From 33fb7f56fe91c9870e3d1b11e40e9f89494aa93f Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 18 Jan 2018 15:30:08 -0800 Subject: [PATCH 01/26] support segments --- ldclient/config.py | 2 +- ldclient/feature_store.py | 73 ------------- ldclient/flag.py | 61 ++++++++--- ldclient/in_memory_store.py | 101 ++++++++++++++++++ ldclient/interfaces.py | 68 +++++++++++- ...{redis_feature_store.py => redis_store.py} | 0 ldclient/twisted_redis_feature_store.py | 2 +- testing/test_feature_store.py | 4 +- testing/test_ldclient.py | 2 +- 9 files changed, 222 insertions(+), 91 deletions(-) delete mode 100644 ldclient/feature_store.py create mode 100644 ldclient/in_memory_store.py rename ldclient/{redis_feature_store.py => redis_store.py} (100%) diff --git a/ldclient/config.py b/ldclient/config.py index 1ec6a82a..7d42a31b 100644 --- a/ldclient/config.py +++ b/ldclient/config.py @@ -1,5 +1,5 @@ from ldclient.event_consumer import EventConsumerImpl -from ldclient.feature_store import InMemoryFeatureStore +from ldclient.in_memory_store import InMemoryFeatureStore from ldclient.util import log GET_LATEST_FEATURES_PATH = '/sdk/latest-flags' diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py deleted file mode 100644 index 9daf5f9c..00000000 --- a/ldclient/feature_store.py +++ /dev/null @@ -1,73 +0,0 @@ -from ldclient.util import log -from ldclient.interfaces import FeatureStore -from ldclient.rwlock import ReadWriteLock - - -class InMemoryFeatureStore(FeatureStore): - - def __init__(self): - self._lock = ReadWriteLock() - self._initialized = False - self._features = {} - - def get(self, key, callback): - try: - self._lock.rlock() - f = self._features.get(key) - if f is None: - log.debug("Attempted to get missing feature: " + str(key) + " Returning None") - return callback(None) - if 'deleted' in f and f['deleted']: - log.debug("Attempted to get deleted feature: " + str(key) + " Returning None") - return callback(None) - return callback(f) - finally: - 
self._lock.runlock() - - def all(self, callback): - try: - self._lock.rlock() - return callback(dict((k, f) for k, f in self._features.items() if ('deleted' not in f) or not f['deleted'])) - finally: - self._lock.runlock() - - def init(self, features): - try: - self._lock.lock() - self._features = dict(features) - self._initialized = True - log.debug("Initialized feature store with " + str(len(features)) + " features") - finally: - self._lock.unlock() - - # noinspection PyShadowingNames - def delete(self, key, version): - try: - self._lock.lock() - f = self._features.get(key) - if f is not None and f['version'] < version: - f['deleted'] = True - f['version'] = version - elif f is None: - f = {'deleted': True, 'version': version} - self._features[key] = f - finally: - self._lock.unlock() - - def upsert(self, key, feature): - try: - self._lock.lock() - f = self._features.get(key) - if f is None or f['version'] < feature['version']: - self._features[key] = feature - log.debug("Updated feature {0} to version {1}".format(key, feature['version'])) - finally: - self._lock.unlock() - - @property - def initialized(self): - try: - self._lock.rlock() - return self._initialized - finally: - self._lock.runlock() diff --git a/ldclient/flag.py b/ldclient/flag.py index 34211c8e..38daf973 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -24,18 +24,18 @@ def evaluate(flag, user, store): return _get_off_variation(flag), prereq_events -def _evaluate(flag, user, store, prereq_events=None): +def _evaluate(flag, user, feature_store, segment_store, prereq_events=None): events = prereq_events or [] failed_prereq = None prereq_value = None for prereq in flag.get('prerequisites') or []: - prereq_flag = store.get(prereq.get('key'), lambda x: x) + prereq_flag = feature_store.get(prereq.get('key'), lambda x: x) if prereq_flag is None: log.warn("Missing prereq flag: " + prereq.get('key')) failed_prereq = prereq break if prereq_flag.get('on', False) is True: - prereq_value, events = 
_evaluate(prereq_flag, user, store, events) + prereq_value, events = _evaluate(prereq_flag, user, feature_store, segment_store, events) variation = _get_variation(prereq_flag, prereq.get('variation')) if prereq_value is None or not prereq_value == variation: failed_prereq = prereq @@ -49,11 +49,11 @@ def _evaluate(flag, user, store, prereq_events=None): if failed_prereq is not None: return None, events - index = _evaluate_index(flag, user) + index = _evaluate_index(flag, user, segment_store) return _get_variation(flag, index), events -def _evaluate_index(feature, user): +def _evaluate_index(feature, user, segment_store): # Check to see if any user targets match: for target in feature.get('targets') or []: for value in target.get('values') or []: @@ -62,7 +62,7 @@ def _evaluate_index(feature, user): # Now walk through the rules to see if any match for rule in feature.get('rules') or []: - if _rule_matches_user(rule, user): + if _rule_matches_user(rule, user, segment_store): return _variation_index_for_user(feature, rule, user) # Walk through fallthrough and see if it matches @@ -103,7 +103,7 @@ def _variation_index_for_user(feature, rule, user): bucket_by = 'key' if rule['rollout'].get('bucketBy') is not None: bucket_by = rule['rollout']['bucketBy'] - bucket = _bucket_user(user, feature, bucket_by) + bucket = _bucket_user(user, feature['key'], feature['salt'], bucket_by) sum = 0.0 for wv in rule['rollout'].get('variations') or []: sum += wv.get('weight', 0.0) / 100000.0 @@ -113,7 +113,7 @@ def _variation_index_for_user(feature, rule, user): return None -def _bucket_user(user, feature, bucket_by): +def _bucket_user(user, key, salt, bucket_by): u_value, should_pass = _get_user_attribute(user, bucket_by) if should_pass is True or not isinstance(u_value, six.string_types): return 0.0 @@ -121,21 +121,31 @@ def _bucket_user(user, feature, bucket_by): id_hash = u_value if user.get('secondary') is not None: id_hash = id_hash + '.' 
+ user['secondary'] - hash_key = '%s.%s.%s' % (feature['key'], feature['salt'], id_hash) + hash_key = '%s.%s.%s' % (key, salt, id_hash) hash_val = int(hashlib.sha1(hash_key.encode('utf-8')).hexdigest()[:15], 16) result = hash_val / __LONG_SCALE__ return result -def _rule_matches_user(rule, user): +def _rule_matches_user(rule, user, segment_store): for clause in rule.get('clauses') or []: if clause.get('attribute') is not None: - if not _clause_matches_user(clause, user): + if not _clause_matches_user(clause, user, segment_store): return False return True -def _clause_matches_user(clause, user): +def _clause_matches_user(clause, user, segment_store): + if clause.get('op') == 'segmentMatch': + for seg_key in clause.get('values') or []: + segment = segment_store.get(seg_key) + if segment and _segment_matches_user(segment, user): + return _maybe_negate(clause, true) + return _maybe_negate(clause, false) + else: + return _clause_matches_user_no_segments(clause, user) + +def _clause_matches_user_no_segments(clause, user): u_value, should_pass = _get_user_attribute(user, clause.get('attribute')) if should_pass is True: return False @@ -151,6 +161,33 @@ def _clause_matches_user(clause, user): else: return _maybe_negate(clause, _match_any(op_fn, u_value, clause.get('values') or [])) +def _segment_matches_user(segment, user): + if user.get('key'): + key = user['key'] + if key in (segment.get('included') or []): + return true + if key in (segment.get('excluded') or []): + return false + for rule in segment.get('rules') or []: + if _segment_rule_matches_user(rule, user, segment.get('key'), segment.get('salt')): + return true + return false + +def _segment_rule_matches_user(rule, user, segment_key, salt): + for clause in rule.get('clauses') or []: + if not _clause_matches_user_no_segments(clause, user): + return false + + # If the weight is absent, this rule matches + if not rule.get('weight'): + return true + + # All of the clauses are met. 
See if the user buckets in + bucket_by = 'key' if rule.get('bucketBy') is None else rule['bucketBy'] + bucket = _bucket_user(user, segment_key, salt, bucket_by) + weight = rule['weight'] / 100000.0 + return bucket < weight + def _match_any(op_fn, u, vals): for v in vals: diff --git a/ldclient/in_memory_store.py b/ldclient/in_memory_store.py new file mode 100644 index 00000000..543837ea --- /dev/null +++ b/ldclient/in_memory_store.py @@ -0,0 +1,101 @@ +from abc import ABCMeta, abstractmethod + +from ldclient.util import log +from ldclient.interfaces import FeatureStore, SegmentStore +from ldclient.rwlock import ReadWriteLock + + +class InMemoryStoreBase(object): + """ + Abstract base class for in-memory data stores. + """ + __metaclass__ = ABCMeta + + def __init__(self): + self._lock = ReadWriteLock() + self._initialized = False + self._items = {} + + def get(self, key, callback): + try: + self._lock.rlock() + item = self._items.get(key) + if item is None: + log.debug("Attempted to get missing %s: %s, returning None", self.item_name(), key) + return callback(None) + if 'deleted' in item and item['deleted']: + log.debug("Attempted to get deleted %s: %s, returning None", self.item_name(), key) + return callback(None) + return callback(item) + finally: + self._lock.runlock() + + def all(self, callback): + try: + self._lock.rlock() + return callback(dict((k, i) for k, i in self._items.items() if ('deleted' not in i) or not i['deleted'])) + finally: + self._lock.runlock() + + def init(self, items): + try: + self._lock.lock() + self._items = dict(items) + self._initialized = True + log.debug("Initialized %s store with %d items", self.item_name(), len(items)) + finally: + self._lock.unlock() + + # noinspection PyShadowingNames + def delete(self, key, version): + try: + self._lock.lock() + i = self._items.get(key) + if i is not None and i['version'] < version: + i['deleted'] = True + i['version'] = version + elif i is None: + i = {'deleted': True, 'version': version} + 
self._items[key] = i + finally: + self._lock.unlock() + + def upsert(self, key, item): + try: + self._lock.lock() + i = self._items.get(key) + if i is None or i['version'] < item['version']: + self._items[key] = item + log.debug("Updated %s %s to version %d", self.item_name(), key, item['version']) + finally: + self._lock.unlock() + + @property + def initialized(self): + try: + self._lock.rlock() + return self._initialized + finally: + self._lock.runlock() + + @abstractmethod + def item_name(self): + """ + Returns a description of the kind of item held in this store (feature or segment). + """ + + +class InMemoryFeatureStore(InMemoryStoreBase, FeatureStore): + def __init__(self): + InMemoryStoreBase.__init__(self) + + def item_name(self): + return 'feature' + + +class InMemorySegmentStore(InMemoryStoreBase, SegmentStore): + def __init__(self): + InMemoryStoreBase.__init__(self) + + def item_name(self): + return 'segment' diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index 80ae7a8c..7e505f1d 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -52,7 +52,7 @@ def upsert(self, key, feature): """ Inserts a feature flag if its version is newer or missing - :param key: The feature flag + :param key: The feature flag key :type key: str :param feature: The feature information :type feature: dict @@ -67,6 +67,72 @@ def initialized(self): """ +class SegmentStore(object): + """ + Stores and retrieves the state of user segments + """ + __metaclass__ = ABCMeta + + @abstractmethod + def get(self, key, callback): + """ + Gets a segment and calls the callback with the segment data to return the result + :param key: The segment key + :type key: str + :param callback: The function that accepts the segment data and returns the segment value + :type callback: Function that processes the segment flag once received. + :return: The result of executing callback. 
+ """ + + @abstractmethod + def all(self, callback): + """ + Returns all user segments and their data + :param callback: The function that accepts the segment data + :type callback: Function that processes the segments once received. + :rtype: The result of executing callback. + """ + + @abstractmethod + def init(self, features): + """ + Initializes the store with a set of user segments. Meant to be called by the UpdateProcessor + + :param features: The segments and their data as provided by LD + :type features: dict[str, dict] + """ + + @abstractmethod + def delete(self, key, version): + """ + Marks a segment as deleted + + :param key: The segment key + :type key: str + :param version: The version of the segment to mark as deleted + :type version: str + """ + + @abstractmethod + def upsert(self, key, feature): + """ + Inserts a segment if its version is newer or missing + + :param key: The segment key + :type key: str + :param feature: The segment information + :type feature: dict + """ + + @abstractproperty + def initialized(self): + """ + Returns whether the store has been initialized yet or not + + :rtype: bool + """ + + class BackgroundOperation(object): """ Performs a task in the background diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_store.py similarity index 100% rename from ldclient/redis_feature_store.py rename to ldclient/redis_store.py diff --git a/ldclient/twisted_redis_feature_store.py b/ldclient/twisted_redis_feature_store.py index de2566ed..43c34d94 100644 --- a/ldclient/twisted_redis_feature_store.py +++ b/ldclient/twisted_redis_feature_store.py @@ -9,7 +9,7 @@ from ldclient.expiringdict import ExpiringDict from ldclient.interfaces import FeatureStore -from ldclient.redis_feature_store import ForgetfulDict, INIT_KEY +from ldclient.redis_store import ForgetfulDict, INIT_KEY from ldclient.util import log diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py index ef458986..8976aef5 100644 --- 
a/testing/test_feature_store.py +++ b/testing/test_feature_store.py @@ -1,8 +1,8 @@ import pytest import redis -from ldclient.feature_store import InMemoryFeatureStore -from ldclient.redis_feature_store import RedisFeatureStore +from ldclient.in_memory_store import InMemoryFeatureStore +from ldclient.redis_store import RedisFeatureStore class TestFeatureStore: diff --git a/testing/test_ldclient.py b/testing/test_ldclient.py index b6585362..ecd96d89 100644 --- a/testing/test_ldclient.py +++ b/testing/test_ldclient.py @@ -1,6 +1,6 @@ from builtins import object from ldclient.client import LDClient, Config -from ldclient.feature_store import InMemoryFeatureStore +from ldclient.in_memory_store import InMemoryFeatureStore from ldclient.interfaces import FeatureRequester, FeatureStore import pytest from testing.sync_util import wait_until From 91cd0131377621b2ae9c8d002cd1992022b15dbd Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 13:08:58 -0800 Subject: [PATCH 02/26] genericized feature store + misc fixes --- ldclient/client.py | 5 +- ldclient/config.py | 8 ++ ldclient/feature_requester.py | 35 +++++---- ldclient/flag.py | 37 ++++----- ldclient/in_memory_store.py | 78 ++++++++----------- ldclient/polling.py | 3 +- ldclient/redis_store.py | 131 +++++++++++++++++--------------- ldclient/streaming.py | 55 +++++++++----- ldclient/versioned_data_kind.py | 34 +++++++++ testing/test_feature_store.py | 51 +++++++------ 10 files changed, 255 insertions(+), 182 deletions(-) create mode 100644 ldclient/versioned_data_kind.py diff --git a/ldclient/client.py b/ldclient/client.py index bb293297..14a87e04 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -14,6 +14,7 @@ from ldclient.polling import PollingUpdateProcessor from ldclient.streaming import StreamingUpdateProcessor from ldclient.util import check_uwsgi, log +from ldclient.versioned_data_kind import FEATURES, SEGMENTS # noinspection PyBroadException try: @@ -184,7 +185,7 @@ def cb(flag): return 
default - return self._store.get(key, cb) + return self._store.get(FEATURES, key, cb) def _evaluate(self, flag, user): return evaluate(flag, user, self._store) @@ -223,7 +224,7 @@ def cb(all_flags): log.error("Exception caught in all_flags: " + e.message + " for user: " + str(user)) return {} - return self._store.all(cb) + return self._store.all(FEATURES, cb) def _evaluate_multi(self, user, flags): return dict([(k, self._evaluate(v, user)[0]) for k, v in flags.items() or {}]) diff --git a/ldclient/config.py b/ldclient/config.py index 7d42a31b..a557ccdf 100644 --- a/ldclient/config.py +++ b/ldclient/config.py @@ -135,6 +135,10 @@ def get_default(self, key, default): def sdk_key(self): return self.__sdk_key + @property + def base_uri(self): + return self.__base_uri + @property def get_latest_flags_uri(self): return self.__base_uri + GET_LATEST_FEATURES_PATH @@ -143,6 +147,10 @@ def get_latest_flags_uri(self): def events_uri(self): return self.__events_uri + '/bulk' + @property + def stream_base_uri(self): + return self.__stream_uri + @property def stream_uri(self): return self.__stream_uri + STREAM_FLAGS_PATH diff --git a/ldclient/feature_requester.py b/ldclient/feature_requester.py index 6b71f99d..96106793 100644 --- a/ldclient/feature_requester.py +++ b/ldclient/feature_requester.py @@ -6,6 +6,10 @@ from ldclient.interfaces import FeatureRequester from ldclient.util import _headers from ldclient.util import log +from ldclient.versioned_data_kind import FEATURES, SEGMENTS + + +LATEST_ALL_URI = '/sdk/latest-all' class FeatureRequesterImpl(FeatureRequester): @@ -14,32 +18,35 @@ def __init__(self, config): self._session_no_cache = requests.Session() self._config = config - def get_all(self): + def get_all_data(self): hdrs = _headers(self._config.sdk_key) - uri = self._config.get_latest_flags_uri + uri = self._config.base_uri + LATEST_ALL_URI r = self._session_cache.get(uri, headers=hdrs, timeout=( self._config.connect_timeout, self._config.read_timeout)) 
r.raise_for_status() - flags = r.json() - versions_summary = list(map(lambda f: "{0}:{1}".format(f.get("key"), f.get("version")), flags.values())) - log.debug("Get All flags response status:[{0}] From cache?[{1}] ETag:[{2}] flag versions: {3}" - .format(r.status_code, r.from_cache, r.headers.get('ETag'), versions_summary)) - return flags + allData = r.json() + log.debug("Get All flags response status:[%d] From cache?[%s] ETag:[%s]", + r.status_code, r.from_cache, r.headers.get('ETag')) + return { + FEATURES: allData['flags'], + SEGMENTS: allData['segments'] + } - def get_one(self, key): + def get_one(self, kind, key): hdrs = _headers(self._config.sdk_key) - uri = self._config.get_latest_flags_uri + '/' + key - log.debug("Getting one feature flag using uri: " + uri) + path = kind.request_api_path + '/' + key + uri = self._config.base_uri + path + log.debug("Getting %s from %s using uri: %s", key, kind.namespace, uri) r = self._session_no_cache.get(uri, headers=hdrs, timeout=( self._config.connect_timeout, self._config.read_timeout)) r.raise_for_status() - flag = r.json() - log.debug("Get one flag response status:[{0}] Flag key:[{1}] version:[{2}]" - .format(r.status_code, key, flag.get("version"))) - return flag + obj = r.json() + log.debug("%s response status:[%d] key:[%s] version:[%d]", + path, r.status_code, key, obj.get("version")) + return obj diff --git a/ldclient/flag.py b/ldclient/flag.py index 38daf973..c10b851c 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -5,6 +5,7 @@ import sys from ldclient import operators +from ldclient.versioned_data_kind import FEATURES, SEGMENTS __LONG_SCALE__ = float(0xFFFFFFFFFFFFFFF) @@ -24,18 +25,18 @@ def evaluate(flag, user, store): return _get_off_variation(flag), prereq_events -def _evaluate(flag, user, feature_store, segment_store, prereq_events=None): +def _evaluate(flag, user, feature_store, prereq_events=None): events = prereq_events or [] failed_prereq = None prereq_value = None for prereq in 
flag.get('prerequisites') or []: - prereq_flag = feature_store.get(prereq.get('key'), lambda x: x) + prereq_flag = feature_store.get(FEATURES, prereq.get('key'), lambda x: x) if prereq_flag is None: log.warn("Missing prereq flag: " + prereq.get('key')) failed_prereq = prereq break if prereq_flag.get('on', False) is True: - prereq_value, events = _evaluate(prereq_flag, user, feature_store, segment_store, events) + prereq_value, events = _evaluate(prereq_flag, user, feature_store, events) variation = _get_variation(prereq_flag, prereq.get('variation')) if prereq_value is None or not prereq_value == variation: failed_prereq = prereq @@ -49,11 +50,11 @@ def _evaluate(flag, user, feature_store, segment_store, prereq_events=None): if failed_prereq is not None: return None, events - index = _evaluate_index(flag, user, segment_store) + index = _evaluate_index(flag, user, feature_store) return _get_variation(flag, index), events -def _evaluate_index(feature, user, segment_store): +def _evaluate_index(feature, user, store): # Check to see if any user targets match: for target in feature.get('targets') or []: for value in target.get('values') or []: @@ -62,7 +63,7 @@ def _evaluate_index(feature, user, segment_store): # Now walk through the rules to see if any match for rule in feature.get('rules') or []: - if _rule_matches_user(rule, user, segment_store): + if _rule_matches_user(rule, user, store): return _variation_index_for_user(feature, rule, user) # Walk through fallthrough and see if it matches @@ -127,21 +128,21 @@ def _bucket_user(user, key, salt, bucket_by): return result -def _rule_matches_user(rule, user, segment_store): +def _rule_matches_user(rule, user, store): for clause in rule.get('clauses') or []: if clause.get('attribute') is not None: - if not _clause_matches_user(clause, user, segment_store): + if not _clause_matches_user(clause, user, store): return False return True -def _clause_matches_user(clause, user, segment_store): +def _clause_matches_user(clause, 
user, store): if clause.get('op') == 'segmentMatch': for seg_key in clause.get('values') or []: - segment = segment_store.get(seg_key) + segment = store.get(SEGMENTS, seg_key, lambda x: x) if segment and _segment_matches_user(segment, user): - return _maybe_negate(clause, true) - return _maybe_negate(clause, false) + return _maybe_negate(clause, True) + return _maybe_negate(clause, False) else: return _clause_matches_user_no_segments(clause, user) @@ -165,22 +166,22 @@ def _segment_matches_user(segment, user): if user.get('key'): key = user['key'] if key in (segment.get('included') or []): - return true + return True if key in (segment.get('excluded') or []): - return false + return False for rule in segment.get('rules') or []: if _segment_rule_matches_user(rule, user, segment.get('key'), segment.get('salt')): - return true - return false + return True + return False def _segment_rule_matches_user(rule, user, segment_key, salt): for clause in rule.get('clauses') or []: if not _clause_matches_user_no_segments(clause, user): - return false + return False # If the weight is absent, this rule matches if not rule.get('weight'): - return true + return True # All of the clauses are met. See if the user buckets in bucket_by = 'key' if rule.get('bucketBy') is None else rule['bucketBy'] diff --git a/ldclient/in_memory_store.py b/ldclient/in_memory_store.py index 543837ea..05ee785e 100644 --- a/ldclient/in_memory_store.py +++ b/ldclient/in_memory_store.py @@ -1,72 +1,78 @@ -from abc import ABCMeta, abstractmethod - from ldclient.util import log from ldclient.interfaces import FeatureStore, SegmentStore from ldclient.rwlock import ReadWriteLock -class InMemoryStoreBase(object): +class InMemoryFeatureStore(object): """ - Abstract base class for in-memory data stores. + In-memory implementation of a store that holds feature flags and related data received from the streaming API. 
""" - __metaclass__ = ABCMeta def __init__(self): self._lock = ReadWriteLock() self._initialized = False self._items = {} - def get(self, key, callback): + def get(self, kind, key, callback): try: self._lock.rlock() - item = self._items.get(key) + itemsOfKind = self._items.get(kind, {}) + item = itemsOfKind.get(key) if item is None: - log.debug("Attempted to get missing %s: %s, returning None", self.item_name(), key) + log.debug("Attempted to get missing key %s in '%s', returning None", key, kind.namespace) return callback(None) if 'deleted' in item and item['deleted']: - log.debug("Attempted to get deleted %s: %s, returning None", self.item_name(), key) + log.debug("Attempted to get deleted key %s in '%s', returning None", key, kind.namespace) return callback(None) return callback(item) finally: self._lock.runlock() - def all(self, callback): + def all(self, kind, callback): try: self._lock.rlock() - return callback(dict((k, i) for k, i in self._items.items() if ('deleted' not in i) or not i['deleted'])) + itemsOfKind = self._items.get(kind, {}) + return callback(dict((k, i) for k, i in itemsOfKind.items() if ('deleted' not in i) or not i['deleted'])) finally: self._lock.runlock() - def init(self, items): + def init(self, allData): try: self._lock.lock() - self._items = dict(items) + self._items = dict(allData) self._initialized = True - log.debug("Initialized %s store with %d items", self.item_name(), len(items)) + for k in allData: + log.debug("Initialized '%s' store with %d items", k.namespace, len(allData[k])) finally: self._lock.unlock() # noinspection PyShadowingNames - def delete(self, key, version): + def delete(self, kind, key, version): try: self._lock.lock() - i = self._items.get(key) - if i is not None and i['version'] < version: - i['deleted'] = True - i['version'] = version - elif i is None: + itemsOfKind = self._items.get(kind) + if itemsOfKind is None: + itemsOfKind = dict() + self._items[kind] = itemsOfKind + i = itemsOfKind.get(key) + if i is 
None or i['version'] < version: i = {'deleted': True, 'version': version} - self._items[key] = i + itemsOfKind[key] = i finally: self._lock.unlock() - def upsert(self, key, item): + def upsert(self, kind, item): + key = item['key'] try: self._lock.lock() - i = self._items.get(key) + itemsOfKind = self._items.get(kind) + if itemsOfKind is None: + itemsOfKind = dict() + self._items[kind] = itemsOfKind + i = itemsOfKind.get(key) if i is None or i['version'] < item['version']: - self._items[key] = item - log.debug("Updated %s %s to version %d", self.item_name(), key, item['version']) + itemsOfKind[key] = item + log.debug("Updated %s in '%s' to version %d", key, kind.namespace, item['version']) finally: self._lock.unlock() @@ -77,25 +83,3 @@ def initialized(self): return self._initialized finally: self._lock.runlock() - - @abstractmethod - def item_name(self): - """ - Returns a description of the kind of item held in this store (feature or segment). - """ - - -class InMemoryFeatureStore(InMemoryStoreBase, FeatureStore): - def __init__(self): - InMemoryStoreBase.__init__(self) - - def item_name(self): - return 'feature' - - -class InMemorySegmentStore(InMemoryStoreBase, SegmentStore): - def __init__(self): - InMemoryStoreBase.__init__(self) - - def item_name(self): - return 'segment' diff --git a/ldclient/polling.py b/ldclient/polling.py index 3e6bec4a..85a25af3 100644 --- a/ldclient/polling.py +++ b/ldclient/polling.py @@ -23,7 +23,8 @@ def run(self): while self._running: start_time = time.time() try: - self._store.init(self._requester.get_all()) + allData = self._requester.get_all_data() + self._store.init(allData) if not self._ready.is_set() is True and self._store.initialized is True: log.info("PollingUpdateProcessor initialized ok") self._ready.set() diff --git a/ldclient/redis_store.py b/ldclient/redis_store.py index 111811dd..864ccd4a 100644 --- a/ldclient/redis_store.py +++ b/ldclient/redis_store.py @@ -7,6 +7,7 @@ from ldclient.expiringdict import ExpiringDict 
from ldclient.interfaces import FeatureStore from ldclient.memoized_value import MemoizedValue +from ldclient.versioned_data_kind import FEATURES class ForgetfulDict(dict): @@ -22,92 +23,100 @@ def __init__(self, expiration=15, capacity=1000): - self._features_key = "{0}:features".format(prefix) + self._prefix = prefix self._cache = ForgetfulDict() if expiration == 0 else ExpiringDict(max_len=capacity, max_age_seconds=expiration) self._pool = redis.ConnectionPool.from_url(url=url, max_connections=max_connections) self._inited = MemoizedValue(lambda: self._query_init()) log.info("Started RedisFeatureStore connected to URL: " + url + " using prefix: " + prefix) - def init(self, features): - pipe = redis.Redis(connection_pool=self._pool).pipeline() - pipe.delete(self._features_key) + def _items_key(self, kind): + return "{0}:{1}".format(self._prefix, kind.namespace) - self._cache.clear() + def _cache_key(self, kind, key): + return "{0}:{1}".format(kind.namespace, key) - for k, f in features.items(): - f_json = json.dumps(f) - pipe.hset(self._features_key, k, f_json) - self._cache[k] = f + def init(self, allData): + pipe = redis.Redis(connection_pool=self._pool).pipeline() + + self._cache.clear() + all_count = 0 + + for kind, items in allData.items(): + base_key = self._items_key(kind) + pipe.delete(base_key) + for key, item in items.items(): + item_json = json.dumps(item) + pipe.hset(base_key, key, item_json) + self._cache[self._cache_key(kind, key)] = item + all_count = all_count + len(items) pipe.execute() - log.info("Initialized RedisFeatureStore with " + str(len(features)) + " feature flags") + log.info("Initialized RedisFeatureStore with %d items", all_count) self._inited.set(True) - def all(self, callback): + def all(self, kind, callback): r = redis.Redis(connection_pool=self._pool) try: - all_features = r.hgetall(self._features_key) + all_items = r.hgetall(self._items_key(kind)) except BaseException as e: - log.error("RedisFeatureStore: Could not retrieve all 
flags from Redis with error: " - + e.message + " Returning None") + log.error("RedisFeatureStore: Could not retrieve '%s' from Redis with error: %s. Returning None.", + kind.namespace, e.message) return callback(None) - if all_features is None or all_features is "": - log.warn("RedisFeatureStore: call to get all flags returned no results. Returning None.") + if all_items is None or all_items is "": + log.warn("RedisFeatureStore: call to get all '%s' returned no results. Returning None.", kind.namespace) return callback(None) results = {} - for k, f_json in all_features.items() or {}: - f = json.loads(f_json.decode('utf-8')) - if 'deleted' in f and f['deleted'] is False: - results[f['key']] = f + for key, item_json in all_items.items(): + item = json.loads(item_json.decode('utf-8')) + if item.get('deleted', False) is False: + results[key] = item return callback(results) - def get(self, key, callback=lambda x: x): - f = self._get_even_if_deleted(key) - if f is not None: - if f.get('deleted', False) is True: - log.debug("RedisFeatureStore: get returned deleted flag from Redis. Returning None.") + def get(self, kind, key, callback=lambda x: x): + item = self._get_even_if_deleted(kind, key) + if item is not None: + if item.get('deleted', False) is True: + log.debug("RedisFeatureStore: get returned deleted item %s in '%s'. 
Returning None.", key, kind.namespace) return callback(None) - return callback(f) + return callback(item) - def _get_even_if_deleted(self, key): - f = self._cache.get(key) - if f is not None: + def _get_even_if_deleted(self, kind, key): + cacheKey = self._cache_key(kind, key) + item = self._cache.get(cacheKey) + if item is not None: # reset ttl - self._cache[key] = f - return f + self._cache[cacheKey] = item + return item try: r = redis.Redis(connection_pool=self._pool) - f_json = r.hget(self._features_key, key) + item_json = r.hget(self._items_key(kind), key) except BaseException as e: - log.error("RedisFeatureStore: Could not retrieve flag from redis with error: " + e.message - + ". Returning None for key: " + key) + log.error("RedisFeatureStore: Could not retrieve key %s from '%s' with error: %s", + key, kind.namespace, e.message) return None - if f_json is None or f_json is "": - log.debug("RedisFeatureStore: feature flag with key: " + key + " not found in Redis. Returning None.") + if item_json is None or item_json is "": + log.debug("RedisFeatureStore: key %s not found in '%s'. 
Returning None.", key, kind.namespace) return None - f = json.loads(f_json.decode('utf-8')) - self._cache[key] = f - return f + item = json.loads(item_json.decode('utf-8')) + self._cache[cacheKey] = item + return item - def delete(self, key, version): + def delete(self, kind, key, version): r = redis.Redis(connection_pool=self._pool) - r.watch(self._features_key) - f_json = r.hget(self._features_key, key) - if f_json: - f = json.loads(f_json.decode('utf-8')) - if f is not None and f['version'] < version: - f['deleted'] = True - f['version'] = version - elif f is None: - f = {'deleted': True, 'version': version} - f_json = json.dumps(f) - r.hset(self._features_key, key, f_json) - self._cache[key] = f + baseKey = self._items_key(kind) + r.watch(baseKey) + item_json = r.hget(baseKey, key) + item = None if item_json is None else json.loads(item_json.decode('utf-8')) + if item is None or item['version'] < version: + deletedItem = { "deleted": True, "version": version } + item_json = json.dumps(deletedItem) + r.hset(baseKey, key, item_json) + self._cache[self._cache_key(kind, key)] = deletedItem r.unwatch() @property @@ -116,18 +125,20 @@ def initialized(self): def _query_init(self): r = redis.Redis(connection_pool=self._pool) - return r.exists(self._features_key) + return r.exists(self._items_key(FEATURES)) - def upsert(self, key, feature): + def upsert(self, kind, item): r = redis.Redis(connection_pool=self._pool) - r.watch(self._features_key) - old = self._get_even_if_deleted(key) + baseKey = self._items_key(kind) + key = item['key'] + r.watch(baseKey) + old = self._get_even_if_deleted(kind, key) if old: - if old['version'] >= feature['version']: + if old['version'] >= item['version']: r.unwatch() return - feature_json = json.dumps(feature) - r.hset(self._features_key, key, feature_json) - self._cache[key] = feature + item_json = json.dumps(item) + r.hset(baseKey, key, item_json) + self._cache[self._cache_key(kind, key)] = item r.unwatch() diff --git 
a/ldclient/streaming.py b/ldclient/streaming.py index 0f6a29f3..bbb14d5b 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -10,17 +10,20 @@ from ldclient.interfaces import UpdateProcessor from ldclient.sse_client import SSEClient from ldclient.util import _stream_headers, log +from ldclient.versioned_data_kind import FEATURES, SEGMENTS # allows for up to 5 minutes to elapse without any data sent across the stream. The heartbeats sent as comments on the # stream will keep this from triggering stream_read_timeout = 5 * 60 +STREAM_ALL_PATH = '/all' + class StreamingUpdateProcessor(Thread, UpdateProcessor): def __init__(self, config, requester, store, ready): Thread.__init__(self) self.daemon = True - self._uri = config.stream_uri + self._uri = config.stream_base_uri + STREAM_ALL_PATH self._config = config self._requester = requester self._store = store @@ -83,34 +86,50 @@ def initialized(self): @staticmethod def process_message(store, requester, msg): if msg.event == 'put': - flags = json.loads(msg.data) - versions_summary = list(map(lambda f: "{0}:{1}".format(f.get("key"), f.get("version")), flags.values())) - log.debug("Received put event with {0} flags and versions: {1}".format(len(flags), versions_summary)) - store.init(flags) + allData = json.loads(msg.data) + initData = { + FEATURES: allData['data']['flags'], + SEGMENTS: allData['data']['segments'] + } + log.debug("Received put event with %d flags and %d segments", + len(initData[FEATURES]), len(initData[SEGMENTS])) + store.init(initData) return True elif msg.event == 'patch': payload = json.loads(msg.data) - key = payload['path'][1:] - flag = payload['data'] - log.debug("Received patch event for flag key: [{0}] New version: [{1}]" - .format(flag.get("key"), str(flag.get("version")))) - store.upsert(key, flag) + path = payload['path'] + obj = payload['data'] + log.debug("Received patch event for %s, New version: [%d]", path, obj.get("version")) + for kind in [FEATURES, SEGMENTS]: + key = 
_get_key_from_path(kind, path) + if key: + store.upsert(kind, obj) elif msg.event == "indirect/patch": - key = msg.data - log.debug("Received indirect/patch event for flag key: " + key) - store.upsert(key, requester.get_one(key)) + path = msg.data + log.debug("Received indirect/patch event for %s", path) + for kind in [FEATURES, SEGMENTS]: + key = _get_key_from_path(kind, path) + if key: + store.upsert(kind, requester.get_one(kind, key)) elif msg.event == "indirect/put": log.debug("Received indirect/put event") - store.init(requester.get_all()) + store.init(requester.get_all_data()) return True elif msg.event == 'delete': payload = json.loads(msg.data) - key = payload['path'][1:] + path = payload['path'] # noinspection PyShadowingNames version = payload['version'] - log.debug("Received delete event for flag key: [{0}] New version: [{1}]" - .format(key, version)) - store.delete(key, version) + log.debug("Received delete event for %s, New version: [%d]", path, version) + for kind in [FEATURES, SEGMENTS]: + key = _get_key_from_path(kind, path) + if key: + store.delete(kind, key, version) else: log.warning('Unhandled event in stream processor: ' + msg.event) return False + + def _get_key_from_path(self, kind, path): + if path.startsWith(kind.stream_api_path): + return path.substring(len(kind.stream_api_path)) + return None diff --git a/ldclient/versioned_data_kind.py b/ldclient/versioned_data_kind.py new file mode 100644 index 00000000..716bd222 --- /dev/null +++ b/ldclient/versioned_data_kind.py @@ -0,0 +1,34 @@ + + +""" +These objects denote the types of data that can be stored in the feature store and +referenced in the API. If we add another storable data type in the future, as long as it +follows the same pattern (having "key", "version", and "deleted" properties), we only need +to add a corresponding constant here and the existing store should be able to handle it. 
+""" + +class VersionedDataKind(object): + def __init__(self, namespace, request_api_path, stream_api_path): + self.__namespace = namespace + self.__request_api_path = request_api_path + self.__stream_api_path = stream_api_path + + @property + def namespace(self): + return self.__namespace + + @property + def request_api_path(self): + return self.__request_api_path + + @property + def stream_api_path(self): + return self.__stream_api_path + +FEATURES = VersionedDataKind("features", + "/sdk/latest-flags", + "/flags/") + +SEGMENTS = VersionedDataKind("segments", + "/sdk/latest-segments", + "/segments/") diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py index 4dbaf8bb..793b74d7 100644 --- a/testing/test_feature_store.py +++ b/testing/test_feature_store.py @@ -3,6 +3,7 @@ from ldclient.in_memory_store import InMemoryFeatureStore from ldclient.redis_store import RedisFeatureStore +from ldclient.versioned_data_kind import FEATURES class TestFeatureStore: @@ -51,14 +52,13 @@ def make_feature(key, ver): def base_initialized_store(self, store): store.init({ - 'foo': self.make_feature('foo', 10), - 'bar': self.make_feature('bar', 10), + FEATURES: { + 'foo': self.make_feature('foo', 10), + 'bar': self.make_feature('bar', 10), + } }) return store - def test_not_initially_initialized(self, store): - assert store.initialized is False - def test_initialized(self, store): store = self.base_initialized_store(store) assert store.initialized is True @@ -66,50 +66,57 @@ def test_initialized(self, store): def test_get_existing_feature(self, store): store = self.base_initialized_store(store) expected = self.make_feature('foo', 10) - assert store.get('foo', lambda x: x) == expected + assert store.get(FEATURES, 'foo', lambda x: x) == expected def test_get_nonexisting_feature(self, store): store = self.base_initialized_store(store) - assert store.get('biz', lambda x: x) is None + assert store.get(FEATURES, 'biz', lambda x: x) is None + + def 
test_get_all_versions(self, store): + store = self.base_initialized_store(store) + result = store.all(FEATURES, lambda x: x) + assert len(result) is 2 + assert result.get('foo') == self.make_feature('foo', 10) + assert result.get('bar') == self.make_feature('bar', 10) def test_upsert_with_newer_version(self, store): store = self.base_initialized_store(store) new_ver = self.make_feature('foo', 11) - store.upsert('foo', new_ver) - assert store.get('foo', lambda x: x) == new_ver + store.upsert(FEATURES, new_ver) + assert store.get(FEATURES, 'foo', lambda x: x) == new_ver def test_upsert_with_older_version(self, store): store = self.base_initialized_store(store) new_ver = self.make_feature('foo', 9) expected = self.make_feature('foo', 10) - store.upsert('foo', new_ver) - assert store.get('foo', lambda x: x) == expected + store.upsert(FEATURES, new_ver) + assert store.get(FEATURES, 'foo', lambda x: x) == expected def test_upsert_with_new_feature(self, store): store = self.base_initialized_store(store) new_ver = self.make_feature('biz', 1) - store.upsert('biz', new_ver) - assert store.get('biz', lambda x: x) == new_ver + store.upsert(FEATURES, new_ver) + assert store.get(FEATURES, 'biz', lambda x: x) == new_ver def test_delete_with_newer_version(self, store): store = self.base_initialized_store(store) - store.delete('foo', 11) - assert store.get('foo', lambda x: x) is None + store.delete(FEATURES, 'foo', 11) + assert store.get(FEATURES, 'foo', lambda x: x) is None def test_delete_unknown_feature(self, store): store = self.base_initialized_store(store) - store.delete('biz', 11) - assert store.get('biz', lambda x: x) is None + store.delete(FEATURES, 'biz', 11) + assert store.get(FEATURES, 'biz', lambda x: x) is None def test_delete_with_older_version(self, store): store = self.base_initialized_store(store) - store.delete('foo', 9) + store.delete(FEATURES, 'foo', 9) expected = self.make_feature('foo', 10) - assert store.get('foo', lambda x: x) == expected + assert 
store.get(FEATURES, 'foo', lambda x: x) == expected def test_upsert_older_version_after_delete(self, store): store = self.base_initialized_store(store) - store.delete('foo', 11) + store.delete(FEATURES, 'foo', 11) old_ver = self.make_feature('foo', 9) - store.upsert('foo', old_ver) - assert store.get('foo', lambda x: x) is None + store.upsert(FEATURES, old_ver) + assert store.get(FEATURES, 'foo', lambda x: x) is None From 7ae9b3ae975c5f03a3275f305fa16fd02877f1f8 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 13:35:58 -0800 Subject: [PATCH 03/26] unit tests, misc cleanup --- ldclient/flag.py | 8 +-- testing/test_segment.py | 118 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 4 deletions(-) create mode 100644 testing/test_segment.py diff --git a/ldclient/flag.py b/ldclient/flag.py index c10b851c..4d2cbd49 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -165,11 +165,11 @@ def _clause_matches_user_no_segments(clause, user): def _segment_matches_user(segment, user): if user.get('key'): key = user['key'] - if key in (segment.get('included') or []): + if key in segment.get('included', []): return True - if key in (segment.get('excluded') or []): + if key in segment.get('excluded', []): return False - for rule in segment.get('rules') or []: + for rule in segment.get('rules', []): if _segment_rule_matches_user(rule, user, segment.get('key'), segment.get('salt')): return True return False @@ -180,7 +180,7 @@ def _segment_rule_matches_user(rule, user, segment_key, salt): return False # If the weight is absent, this rule matches - if not rule.get('weight'): + if not 'weight' in rule: return True # All of the clauses are met. 
See if the user buckets in diff --git a/testing/test_segment.py b/testing/test_segment.py new file mode 100644 index 00000000..785d2c5a --- /dev/null +++ b/testing/test_segment.py @@ -0,0 +1,118 @@ +import pytest + +from ldclient.flag import _segment_matches_user + + +def test_explicit_include_user(): + s = { + "key": "test", + "included": [ "foo" ], + "version": 1 + } + u = { "key": "foo" } + assert _segment_matches_user(s, u) is True + +def test_explicit_exclude_user(): + s = { + "key": "test", + "excluded": [ "foo" ], + "version": 1 + } + u = { "key": "foo" } + assert _segment_matches_user(s, u) is False + +def test_explicit_include_has_precedence(): + s = { + "key": "test", + "included": [ "foo" ], + "excluded": [ "foo" ], + "version": 1 + } + u = { "key": "foo" } + assert _segment_matches_user(s, u) is True + +def test_matching_rule_with_full_rollout(): + s = { + "key": "test", + "rules": [ + { + "clauses": [ + { + "attribute": "email", + "op": "in", + "values": [ "test@example.com" ] + } + ], + "weight": 100000 + } + ] + } + u = { "key": "foo", "email": "test@example.com" } + assert _segment_matches_user(s, u) is True + +def test_matching_rule_with_zero_rollout(): + s = { + "key": "test", + "rules": [ + { + "clauses": [ + { + "attribute": "email", + "op": "in", + "values": [ "test@example.com" ] + } + ], + "weight": 0 + } + ] + } + u = { "key": "foo", "email": "test@example.com" } + assert _segment_matches_user(s, u) is False + +def test_matching_rule_with_multiple_clauses(): + s = { + "key": "test", + "rules": [ + { + "clauses": [ + { + "attribute": "email", + "op": "in", + "values": [ "test@example.com" ] + }, + { + "attribute": "name", + "op": "in", + "values": [ "bob" ] + } + ], + "weight": 100000 + } + ] + } + u = { "key": "foo", "email": "test@example.com", "name": "bob" } + assert _segment_matches_user(s, u) is True + +def test_non_matching_rule_with_multiple_clauses(): + s = { + "key": "test", + "rules": [ + { + "clauses": [ + { + "attribute": 
"email", + "op": "in", + "values": [ "test@example.com" ] + }, + { + "attribute": "name", + "op": "in", + "values": [ "bill" ] + } + ], + "weight": 100000 + } + ] + } + u = { "key": "foo", "email": "test@example.com", "name": "bob" } + assert _segment_matches_user(s, u) is False From 6aaa7e89d326784595d632cd8f63398630f72a92 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Thu, 1 Feb 2018 13:44:57 -0800 Subject: [PATCH 04/26] undo renaming of modules --- ldclient/config.py | 2 +- ldclient/{in_memory_store.py => feature_store.py} | 0 ldclient/{redis_store.py => redis_feature_store.py} | 0 ldclient/twisted_redis_feature_store.py | 2 +- testing/test_feature_store.py | 4 ++-- testing/test_ldclient.py | 2 +- 6 files changed, 5 insertions(+), 5 deletions(-) rename ldclient/{in_memory_store.py => feature_store.py} (100%) rename ldclient/{redis_store.py => redis_feature_store.py} (100%) diff --git a/ldclient/config.py b/ldclient/config.py index a557ccdf..8abd96a8 100644 --- a/ldclient/config.py +++ b/ldclient/config.py @@ -1,5 +1,5 @@ from ldclient.event_consumer import EventConsumerImpl -from ldclient.in_memory_store import InMemoryFeatureStore +from ldclient.feature_store import InMemoryFeatureStore from ldclient.util import log GET_LATEST_FEATURES_PATH = '/sdk/latest-flags' diff --git a/ldclient/in_memory_store.py b/ldclient/feature_store.py similarity index 100% rename from ldclient/in_memory_store.py rename to ldclient/feature_store.py diff --git a/ldclient/redis_store.py b/ldclient/redis_feature_store.py similarity index 100% rename from ldclient/redis_store.py rename to ldclient/redis_feature_store.py diff --git a/ldclient/twisted_redis_feature_store.py b/ldclient/twisted_redis_feature_store.py index 43c34d94..de2566ed 100644 --- a/ldclient/twisted_redis_feature_store.py +++ b/ldclient/twisted_redis_feature_store.py @@ -9,7 +9,7 @@ from ldclient.expiringdict import ExpiringDict from ldclient.interfaces import FeatureStore -from ldclient.redis_store import 
ForgetfulDict, INIT_KEY +from ldclient.redis_feature_store import ForgetfulDict, INIT_KEY from ldclient.util import log diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py index 793b74d7..cabc40df 100644 --- a/testing/test_feature_store.py +++ b/testing/test_feature_store.py @@ -1,8 +1,8 @@ import pytest import redis -from ldclient.in_memory_store import InMemoryFeatureStore -from ldclient.redis_store import RedisFeatureStore +from ldclient.feature_store import InMemoryFeatureStore +from ldclient.redis_feature_store import RedisFeatureStore from ldclient.versioned_data_kind import FEATURES diff --git a/testing/test_ldclient.py b/testing/test_ldclient.py index ecd96d89..b6585362 100644 --- a/testing/test_ldclient.py +++ b/testing/test_ldclient.py @@ -1,6 +1,6 @@ from builtins import object from ldclient.client import LDClient, Config -from ldclient.in_memory_store import InMemoryFeatureStore +from ldclient.feature_store import InMemoryFeatureStore from ldclient.interfaces import FeatureRequester, FeatureStore import pytest from testing.sync_util import wait_until From 55baede29643b98b35a9f159f6a4ad23c854e47c Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 5 Feb 2018 17:24:29 -0800 Subject: [PATCH 05/26] more test coverage --- testing/test_flag.py | 62 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 testing/test_flag.py diff --git a/testing/test_flag.py b/testing/test_flag.py new file mode 100644 index 00000000..d2c56c45 --- /dev/null +++ b/testing/test_flag.py @@ -0,0 +1,62 @@ +import pytest + +from ldclient.feature_store import InMemoryFeatureStore +from ldclient.flag import evaluate +from ldclient.versioned_data_kind import SEGMENTS + + +def test_segment_match_clause_retrieves_segment_from_store(): + store = InMemoryFeatureStore() + segment = { + "key": "segkey", + "included": [ "foo" ], + "version": 1 + } + store.upsert(SEGMENTS, segment) + + user = { "key": "foo" } + flag = { + "key": 
"test", + "variations": [ False, True ], + "fallthrough": { "variation": 0 }, + "on": True, + "rules": [ + { + "clauses": [ + { + "attribute": "", + "op": "segmentMatch", + "values": [ "segkey" ] + } + ], + "variation": 1 + } + ] + } + + assert evaluate(flag, user, store) == (True, []) + +def test_segment_match_clause_falls_through_with_no_errors_if_segment_not_found(): + store = InMemoryFeatureStore() + + user = { "key": "foo" } + flag = { + "key": "test", + "variations": [ False, True ], + "fallthrough": { "variation": 0 }, + "on": True, + "rules": [ + { + "clauses": [ + { + "attribute": "", + "op": "segmentMatch", + "values": [ "segkey" ] + } + ], + "variation": 1 + } + ] + } + + assert evaluate(flag, user, store) == (False, []) From 983ae6007b5adb08268794363aa805f05ad71ece Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 12:10:28 -0800 Subject: [PATCH 06/26] misc cleanup --- ldclient/feature_requester.py | 6 +- ldclient/feature_store.py | 22 +++--- ldclient/flag.py | 8 +-- ldclient/interfaces.py | 114 ++++++++------------------------ ldclient/polling.py | 4 +- ldclient/redis_feature_store.py | 17 +++-- ldclient/streaming.py | 12 ++-- 7 files changed, 63 insertions(+), 120 deletions(-) diff --git a/ldclient/feature_requester.py b/ldclient/feature_requester.py index 96106793..c29d4d79 100644 --- a/ldclient/feature_requester.py +++ b/ldclient/feature_requester.py @@ -27,12 +27,12 @@ def get_all_data(self): self._config.connect_timeout, self._config.read_timeout)) r.raise_for_status() - allData = r.json() + all_data = r.json() log.debug("Get All flags response status:[%d] From cache?[%s] ETag:[%s]", r.status_code, r.from_cache, r.headers.get('ETag')) return { - FEATURES: allData['flags'], - SEGMENTS: allData['segments'] + FEATURES: all_data['flags'], + SEGMENTS: all_data['segments'] } def get_one(self, kind, key): diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py index 05ee785e..547c9a51 100644 --- a/ldclient/feature_store.py +++ 
b/ldclient/feature_store.py @@ -1,5 +1,5 @@ from ldclient.util import log -from ldclient.interfaces import FeatureStore, SegmentStore +from ldclient.interfaces import FeatureStore from ldclient.rwlock import ReadWriteLock @@ -36,20 +36,20 @@ def all(self, kind, callback): finally: self._lock.runlock() - def init(self, allData): + def init(self, all_data): try: - self._lock.lock() - self._items = dict(allData) + self._lock.rlock() + self._items = dict(all_data) self._initialized = True - for k in allData: - log.debug("Initialized '%s' store with %d items", k.namespace, len(allData[k])) + for k in all_data: + log.debug("Initialized '%s' store with %d items", k.namespace, len(all_data[k])) finally: - self._lock.unlock() + self._lock.runlock() # noinspection PyShadowingNames def delete(self, kind, key, version): try: - self._lock.lock() + self._lock.rlock() itemsOfKind = self._items.get(kind) if itemsOfKind is None: itemsOfKind = dict() @@ -59,12 +59,12 @@ def delete(self, kind, key, version): i = {'deleted': True, 'version': version} itemsOfKind[key] = i finally: - self._lock.unlock() + self._lock.runlock() def upsert(self, kind, item): key = item['key'] try: - self._lock.lock() + self._lock.rlock() itemsOfKind = self._items.get(kind) if itemsOfKind is None: itemsOfKind = dict() @@ -74,7 +74,7 @@ def upsert(self, kind, item): itemsOfKind[key] = item log.debug("Updated %s in '%s' to version %d", key, kind.namespace, item['version']) finally: - self._lock.unlock() + self._lock.runlock() @property def initialized(self): diff --git a/ldclient/flag.py b/ldclient/flag.py index 4d2cbd49..ffc14d34 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -140,7 +140,7 @@ def _clause_matches_user(clause, user, store): if clause.get('op') == 'segmentMatch': for seg_key in clause.get('values') or []: segment = store.get(SEGMENTS, seg_key, lambda x: x) - if segment and _segment_matches_user(segment, user): + if segment is not None and _segment_matches_user(segment, user): return 
_maybe_negate(clause, True) return _maybe_negate(clause, False) else: @@ -163,8 +163,8 @@ def _clause_matches_user_no_segments(clause, user): return _maybe_negate(clause, _match_any(op_fn, u_value, clause.get('values') or [])) def _segment_matches_user(segment, user): - if user.get('key'): - key = user['key'] + key = user.get('key') + if key is not None: if key in segment.get('included', []): return True if key in segment.get('excluded', []): @@ -180,7 +180,7 @@ def _segment_rule_matches_user(rule, user, segment_key, salt): return False # If the weight is absent, this rule matches - if not 'weight' in rule: + if 'weight' not in rule: return True # All of the clauses are met. See if the user buckets in diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index 7e505f1d..af1caa86 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -3,18 +3,20 @@ class FeatureStore(object): """ - Stores and retrieves the state of feature flags + Stores and retrieves the state of feature flags and related data """ __metaclass__ = ABCMeta @abstractmethod - def get(self, key, callback): + def get(self, kind, key, callback): """ Gets a feature and calls the callback with the feature data to return the result - :param key: The feature key + :param kind: Denotes which collection to access - one of the constants in versioned_data_kind + :type kind: VersionedDataKind + :param key: The key of the object :type key: str - :param callback: The function that accepts the feature data and returns the feature value - :type callback: Function that processes the feature flag once received. + :param callback: The function that accepts the retrieved data and returns a transformed value + :type callback: Function that processes the retrieved object once received. :return: The result of executing callback. 
""" @@ -22,105 +24,43 @@ def get(self, key, callback): def all(self, callback): """ Returns all feature flags and their data - :param callback: The function that accepts the feature data and returns the feature value - :type callback: Function that processes the feature flags once received. + :param kind: Denotes which collection to access - one of the constants in versioned_data_kind + :type kind: VersionedDataKind + :param callback: The function that accepts the retrieved data and returns a transformed value + :type callback: Function that processes the retrieved objects once received. :rtype: The result of executing callback. """ @abstractmethod - def init(self, features): + def init(self, all_data): """ - Initializes the store with a set of feature flags. Meant to be called by the UpdateProcessor + Initializes the store with a set of objects. Meant to be called by the UpdateProcessor - :param features: The features and their data as provided by LD - :type features: dict[str, dict] + :param all_data: The features and their data as provided by LD + :type all_data: dict[VersionedDataKind, dict[str, dict]] """ @abstractmethod - def delete(self, key, version): + def delete(self, kind, key, version): """ - Marks a feature flag as deleted + Marks an object as deleted - :param key: The feature flag key + :param kind: Denotes which collection to access - one of the constants in versioned_data_kind + :type kind: VersionedDataKind + :param key: The object key :type key: str - :param version: The version of the flag to mark as deleted - :type version: str + :param version: The version of the object to mark as deleted + :type version: int """ @abstractmethod - def upsert(self, key, feature): + def upsert(self, kind, item): """ - Inserts a feature flag if its version is newer or missing + Inserts an object if its version is newer or missing - :param key: The feature flag key - :type key: str - :param feature: The feature information - :type feature: dict - """ - - 
@abstractproperty - def initialized(self): - """ - Returns whether the store has been initialized yet or not - - :rtype: bool - """ - - -class SegmentStore(object): - """ - Stores and retrieves the state of user segments - """ - __metaclass__ = ABCMeta - - @abstractmethod - def get(self, key, callback): - """ - Gets a segment and calls the callback with the segment data to return the result - :param key: The segment key - :type key: str - :param callback: The function that accepts the segment data and returns the segment value - :type callback: Function that processes the segment flag once received. - :return: The result of executing callback. - """ - - @abstractmethod - def all(self, callback): - """ - Returns all user segments and their data - :param callback: The function that accepts the segment data - :type callback: Function that processes the segments once received. - :rtype: The result of executing callback. - """ - - @abstractmethod - def init(self, features): - """ - Initializes the store with a set of user segments. 
Meant to be called by the UpdateProcessor - - :param features: The segments and their data as provided by LD - :type features: dict[str, dict] - """ - - @abstractmethod - def delete(self, key, version): - """ - Marks a segment as deleted - - :param key: The segment key - :type key: str - :param version: The version of the segment to mark as deleted - :type version: str - """ - - @abstractmethod - def upsert(self, key, feature): - """ - Inserts a segment if its version is newer or missing - - :param key: The segment key - :type key: str - :param feature: The segment information + :param kind: Denotes which collection to access - one of the constants in versioned_data_kind + :type kind: VersionedDataKind + :param item: The object to be inserted or updated - must have key and version properties :type feature: dict """ diff --git a/ldclient/polling.py b/ldclient/polling.py index 85a25af3..4b71f668 100644 --- a/ldclient/polling.py +++ b/ldclient/polling.py @@ -23,8 +23,8 @@ def run(self): while self._running: start_time = time.time() try: - allData = self._requester.get_all_data() - self._store.init(allData) + all_data = self._requester.get_all_data() + self._store.init(all_data) if not self._ready.is_set() is True and self._store.initialized is True: log.info("PollingUpdateProcessor initialized ok") self._ready.set() diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index 864ccd4a..f3850cbe 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -36,13 +36,13 @@ def _items_key(self, kind): def _cache_key(self, kind, key): return "{0}:{1}".format(kind.namespace, key) - def init(self, allData): + def init(self, all_data): pipe = redis.Redis(connection_pool=self._pool).pipeline() self._cache.clear() all_count = 0 - for kind, items in allData.items(): + for kind, items in all_data.items(): base_key = self._items_key(kind) pipe.delete(base_key) for key, item in items.items(): @@ -50,7 +50,11 @@ def init(self, 
allData): pipe.hset(base_key, key, item_json) self._cache[self._cache_key(kind, key)] = item all_count = all_count + len(items) - pipe.execute() + try: + pipe.execute() + except: + self._cache.clear() + raise log.info("Initialized RedisFeatureStore with %d items", all_count) self._inited.set(True) @@ -76,10 +80,9 @@ def all(self, kind, callback): def get(self, kind, key, callback=lambda x: x): item = self._get_even_if_deleted(kind, key) - if item is not None: - if item.get('deleted', False) is True: - log.debug("RedisFeatureStore: get returned deleted item %s in '%s'. Returning None.", key, kind.namespace) - return callback(None) + if item is not None and item.get('deleted', False) is True: + log.debug("RedisFeatureStore: get returned deleted item %s in '%s'. Returning None.", key, kind.namespace) + return callback(None) return callback(item) def _get_even_if_deleted(self, kind, key): diff --git a/ldclient/streaming.py b/ldclient/streaming.py index bbb14d5b..0c9cf640 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -86,14 +86,14 @@ def initialized(self): @staticmethod def process_message(store, requester, msg): if msg.event == 'put': - allData = json.loads(msg.data) - initData = { - FEATURES: allData['data']['flags'], - SEGMENTS: allData['data']['segments'] + all_data = json.loads(msg.data) + init_data = { + FEATURES: all_data['data']['flags'], + SEGMENTS: all_data['data']['segments'] } log.debug("Received put event with %d flags and %d segments", - len(initData[FEATURES]), len(initData[SEGMENTS])) - store.init(initData) + len(init_data[FEATURES]), len(init_data[SEGMENTS])) + store.init(init_data) return True elif msg.event == 'patch': payload = json.loads(msg.data) From 21b07ba944e90a0ee89675d43c93b887fbe2716b Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 12:16:59 -0800 Subject: [PATCH 07/26] cleaner path-parsing logic --- ldclient/streaming.py | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 
deletions(-) diff --git a/ldclient/streaming.py b/ldclient/streaming.py index 0c9cf640..541f64b5 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -100,17 +100,19 @@ def process_message(store, requester, msg): path = payload['path'] obj = payload['data'] log.debug("Received patch event for %s, New version: [%d]", path, obj.get("version")) - for kind in [FEATURES, SEGMENTS]: - key = _get_key_from_path(kind, path) - if key: - store.upsert(kind, obj) + target = _parse_path(path) + if target: + store.upsert(target[0], obj) + else: + log.warning("Patch for unknown path: %s", path) elif msg.event == "indirect/patch": path = msg.data log.debug("Received indirect/patch event for %s", path) - for kind in [FEATURES, SEGMENTS]: - key = _get_key_from_path(kind, path) - if key: - store.upsert(kind, requester.get_one(kind, key)) + target = _parse_path(path) + if target: + store.upsert(target[0], requester.get_one(target[0], target[1])) + else: + log.warning("Indirect patch for unknown path: %s", path) elif msg.event == "indirect/put": log.debug("Received indirect/put event") store.init(requester.get_all_data()) @@ -121,15 +123,17 @@ def process_message(store, requester, msg): # noinspection PyShadowingNames version = payload['version'] log.debug("Received delete event for %s, New version: [%d]", path, version) - for kind in [FEATURES, SEGMENTS]: - key = _get_key_from_path(kind, path) - if key: - store.delete(kind, key, version) + target = _parse_path(path) + if target: + store.delete(target[0], target[1], version) + else: + log.warning("Delete for unknown path: %s", path) else: log.warning('Unhandled event in stream processor: ' + msg.event) return False - def _get_key_from_path(self, kind, path): - if path.startsWith(kind.stream_api_path): - return path.substring(len(kind.stream_api_path)) + def _parse_path(self, path): + for kind in [FEATURES, SEGMENTS]: + if path.startsWith(kind.stream_api_path): + return (kind, path.substring(len(kind.stream_api_path))) 
return None From 796a1fcde732df591b7ce6681e3d7a2b8f30dd33 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 12:21:08 -0800 Subject: [PATCH 08/26] InMemoryFeatureStore should implement FeatureStore --- ldclient/feature_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py index 547c9a51..34701c3e 100644 --- a/ldclient/feature_store.py +++ b/ldclient/feature_store.py @@ -3,7 +3,7 @@ from ldclient.rwlock import ReadWriteLock -class InMemoryFeatureStore(object): +class InMemoryFeatureStore(FeatureStore): """ In-memory implementation of a store that holds feature flags and related data received from the streaming API. """ From 745b3b928816993364078a2a96303403ff72df18 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 14:11:12 -0800 Subject: [PATCH 09/26] add more unit test coverage of flag evals --- testing/test_flag.py | 123 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 122 insertions(+), 1 deletion(-) diff --git a/testing/test_flag.py b/testing/test_flag.py index d2c56c45..2d5435a3 100644 --- a/testing/test_flag.py +++ b/testing/test_flag.py @@ -2,9 +2,130 @@ from ldclient.feature_store import InMemoryFeatureStore from ldclient.flag import evaluate -from ldclient.versioned_data_kind import SEGMENTS +from ldclient.versioned_data_kind import FEATURES, SEGMENTS +def test_flag_returns_off_variation_if_flag_is_off(): + store = InMemoryFeatureStore() + flag = { + 'key': 'feature', + 'on': False, + 'offVariation': 1, + 'fallthrough': { 'variation': 0 }, + 'variations': ['a', 'b', 'c'] + } + user = { 'key': 'x' } + assert evaluate(flag, user, store) == ('b', []) + +def test_flag_returns_none_if_flag_is_off_and_off_variation_is_unspecified(): + store = InMemoryFeatureStore() + flag = { + 'key': 'feature', + 'on': False, + 'fallthrough': { 'variation': 0 }, + 'variations': ['a', 'b', 'c'] + } + user = { 'key': 'x' } + assert evaluate(flag, user, store) == (None, 
[]) + +def test_flag_returns_off_variation_if_prerequisite_not_found(): + store = InMemoryFeatureStore() + flag = { + 'key': 'feature0', + 'on': True, + 'prerequisites': [{'key': 'badfeature', 'variation': 1}], + 'fallthrough': { 'variation': 0 }, + 'offVariation': 1, + 'variations': ['a', 'b', 'c'] + } + user = { 'key': 'x' } + assert evaluate(flag, user, store) == ('b', []) + +def test_flag_returns_off_variation_and_event_if_prerequisite_is_not_met(): + store = InMemoryFeatureStore() + flag = { + 'key': 'feature0', + 'on': True, + 'prerequisites': [{'key': 'feature1', 'variation': 1}], + 'fallthrough': { 'variation': 0 }, + 'offVariation': 1, + 'variations': ['a', 'b', 'c'], + 'version': 1 + } + flag1 = { + 'key': 'feature1', + 'on': True, + 'fallthrough': { 'variation': 0 }, + 'variations': ['d', 'e'], + 'version': 2 + } + store.upsert(FEATURES, flag1) + user = { 'key': 'x' } + events_should_be = [{'kind': 'feature', 'key': 'feature1', 'value': 'd', 'version': 2, + 'user': user, 'prereqOf': 'feature0'}] + assert evaluate(flag, user, store) == ('b', events_should_be) + +def test_flag_returns_fallthrough_and_event_if_prereq_is_met_and_there_are_no_rules(): + store = InMemoryFeatureStore() + flag = { + 'key': 'feature0', + 'on': True, + 'prerequisites': [{ 'key': 'feature1', 'variation': 1 }], + 'fallthrough': { 'variation': 0 }, + 'offVariation': 1, + 'variations': ['a', 'b', 'c'], + 'version': 1 + } + flag1 = { + 'key': 'feature1', + 'on': True, + 'fallthrough': { 'variation': 1 }, + 'variations': ['d', 'e'], + 'version': 2 + } + store.upsert(FEATURES, flag1) + user = { 'key': 'x' } + events_should_be = [{'kind': 'feature', 'key': 'feature1', 'value': 'e', 'version': 2, + 'user': user, 'prereqOf': 'feature0'}] + assert evaluate(flag, user, store) == ('a', events_should_be) + +def test_flag_matches_user_from_targets(): + store = InMemoryFeatureStore() + flag = { + 'key': 'feature0', + 'on': True, + 'targets': [{ 'values': ['whoever', 'userkey'], 'variation': 2 }], 
+ 'fallthrough': { 'variation': 0 }, + 'offVariation': 1, + 'variations': ['a', 'b', 'c'] + } + user = { 'key': 'userkey' } + assert evaluate(flag, user, store) == ('c', []) + +def test_flag_matches_user_from_rules(): + store = InMemoryFeatureStore() + flag = { + 'key': 'feature0', + 'on': True, + 'rules': [ + { + 'clauses': [ + { + 'attribute': 'key', + 'op': 'in', + 'values': [ 'userkey' ] + } + ], + 'variation': 2 + } + ], + 'fallthrough': { 'variation': 0 }, + 'offVariation': 1, + 'variations': ['a', 'b', 'c'] + } + user = { 'key': 'userkey' } + assert evaluate(flag, user, store) == ('c', []) + def test_segment_match_clause_retrieves_segment_from_store(): store = InMemoryFeatureStore() segment = { From f03aaa1398cc7dbebd14a22d9038789f10de19d0 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 14:11:31 -0800 Subject: [PATCH 10/26] fix bug in flag evals - putting wrong flag in "prereqOf" --- ldclient/flag.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ldclient/flag.py b/ldclient/flag.py index ffc14d34..1fe81529 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -44,7 +44,7 @@ def _evaluate(flag, user, feature_store, prereq_events=None): failed_prereq = prereq event = {'kind': 'feature', 'key': prereq.get('key'), 'user': user, - 'value': prereq_value, 'version': prereq_flag.get('version'), 'prereqOf': prereq.get('key')} + 'value': prereq_value, 'version': prereq_flag.get('version'), 'prereqOf': flag.get('key')} events.append(event) if failed_prereq is not None: From d245ef202d636e8596efcc346df96c569b321c03 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 14:41:39 -0800 Subject: [PATCH 11/26] use namedtuple --- ldclient/versioned_data_kind.py | 33 +++++++++------------------------ 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/ldclient/versioned_data_kind.py b/ldclient/versioned_data_kind.py index 716bd222..6df96a32 100644 --- a/ldclient/versioned_data_kind.py +++ 
b/ldclient/versioned_data_kind.py @@ -1,4 +1,4 @@ - +from collections import namedtuple """ These objects denote the types of data that can be stored in the feature store and @@ -7,28 +7,13 @@ to add a corresponding constant here and the existing store should be able to handle it. """ -class VersionedDataKind(object): - def __init__(self, namespace, request_api_path, stream_api_path): - self.__namespace = namespace - self.__request_api_path = request_api_path - self.__stream_api_path = stream_api_path - - @property - def namespace(self): - return self.__namespace - - @property - def request_api_path(self): - return self.__request_api_path - - @property - def stream_api_path(self): - return self.__stream_api_path +VersionedDataKind = namedtuple('VersionedDataKind', + ['namespace', 'request_api_path', 'stream_api_path']) -FEATURES = VersionedDataKind("features", - "/sdk/latest-flags", - "/flags/") +FEATURES = VersionedDataKind(namespace = "features", + request_api_path = "/sdk/latest-flags", + stream_api_path = "/flags/") -SEGMENTS = VersionedDataKind("segments", - "/sdk/latest-segments", - "/segments/") +SEGMENTS = VersionedDataKind(namespace = "segments", + request_api_path = "/sdk/latest-segments", + stream_api_path = "/segments/") From 8fdfd409ce08b73f4f733bd88bc77c318adfd216 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 15:30:42 -0800 Subject: [PATCH 12/26] use namedtuple again --- ldclient/streaming.py | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/ldclient/streaming.py b/ldclient/streaming.py index 541f64b5..86b1ddd1 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +from collections import namedtuple import json from threading import Thread @@ -18,6 +19,8 @@ STREAM_ALL_PATH = '/all' +KindAndKey = namedtuple('KindAndKey', ['kind', 'key']) + class StreamingUpdateProcessor(Thread, UpdateProcessor): def __init__(self, config, requester, 
store, ready): @@ -101,16 +104,16 @@ def process_message(store, requester, msg): obj = payload['data'] log.debug("Received patch event for %s, New version: [%d]", path, obj.get("version")) target = _parse_path(path) - if target: - store.upsert(target[0], obj) + if target is not None: + store.upsert(target.kind, obj) else: log.warning("Patch for unknown path: %s", path) elif msg.event == "indirect/patch": path = msg.data log.debug("Received indirect/patch event for %s", path) target = _parse_path(path) - if target: - store.upsert(target[0], requester.get_one(target[0], target[1])) + if target is not None: + store.upsert(target.kind, requester.get_one(target.kind, target.key)) else: log.warning("Indirect patch for unknown path: %s", path) elif msg.event == "indirect/put": @@ -124,8 +127,8 @@ def process_message(store, requester, msg): version = payload['version'] log.debug("Received delete event for %s, New version: [%d]", path, version) target = _parse_path(path) - if target: - store.delete(target[0], target[1], version) + if target is not None: + store.delete(target.kind, target.key, version) else: log.warning("Delete for unknown path: %s", path) else: @@ -135,5 +138,5 @@ def process_message(store, requester, msg): def _parse_path(self, path): for kind in [FEATURES, SEGMENTS]: if path.startsWith(kind.stream_api_path): - return (kind, path.substring(len(kind.stream_api_path))) + return KindAndKey(kind = kind, key = path.substring(len(kind.stream_api_path))) return None From 51853eb6f352db012ca4db58b4845e2e06c6c02c Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 16:07:43 -0800 Subject: [PATCH 13/26] misc cleanup --- ldclient/flag.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ldclient/flag.py b/ldclient/flag.py index 1fe81529..56e7bfa2 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -25,18 +25,18 @@ def evaluate(flag, user, store): return _get_off_variation(flag), prereq_events -def _evaluate(flag, user, 
feature_store, prereq_events=None): +def _evaluate(flag, user, store, prereq_events=None): events = prereq_events or [] failed_prereq = None prereq_value = None for prereq in flag.get('prerequisites') or []: - prereq_flag = feature_store.get(FEATURES, prereq.get('key'), lambda x: x) + prereq_flag = store.get(FEATURES, prereq.get('key'), lambda x: x) if prereq_flag is None: log.warn("Missing prereq flag: " + prereq.get('key')) failed_prereq = prereq break if prereq_flag.get('on', False) is True: - prereq_value, events = _evaluate(prereq_flag, user, feature_store, events) + prereq_value, events = _evaluate(prereq_flag, user, store, events) variation = _get_variation(prereq_flag, prereq.get('variation')) if prereq_value is None or not prereq_value == variation: failed_prereq = prereq @@ -50,7 +50,7 @@ def _evaluate(flag, user, feature_store, prereq_events=None): if failed_prereq is not None: return None, events - index = _evaluate_index(flag, user, feature_store) + index = _evaluate_index(flag, user, store) return _get_variation(flag, index), events From 21389b6dd7c354d0743891edab6c79c3b151f22b Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 16:07:52 -0800 Subject: [PATCH 14/26] use defaultdict --- ldclient/feature_store.py | 20 ++++++++------------ 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py index 34701c3e..155743ea 100644 --- a/ldclient/feature_store.py +++ b/ldclient/feature_store.py @@ -1,3 +1,4 @@ +from collections import defaultdict from ldclient.util import log from ldclient.interfaces import FeatureStore from ldclient.rwlock import ReadWriteLock @@ -11,12 +12,12 @@ class InMemoryFeatureStore(FeatureStore): def __init__(self): self._lock = ReadWriteLock() self._initialized = False - self._items = {} + self._items = defaultdict(dict) def get(self, kind, key, callback): try: self._lock.rlock() - itemsOfKind = self._items.get(kind, {}) + itemsOfKind = self._items[kind] item = 
itemsOfKind.get(key) if item is None: log.debug("Attempted to get missing key %s in '%s', returning None", key, kind.namespace) @@ -31,7 +32,7 @@ def get(self, kind, key, callback): def all(self, kind, callback): try: self._lock.rlock() - itemsOfKind = self._items.get(kind, {}) + itemsOfKind = self._items[kind] return callback(dict((k, i) for k, i in itemsOfKind.items() if ('deleted' not in i) or not i['deleted'])) finally: self._lock.runlock() @@ -39,7 +40,8 @@ def all(self, kind, callback): def init(self, all_data): try: self._lock.rlock() - self._items = dict(all_data) + self._items.clear() + self._items.update(all_data) self._initialized = True for k in all_data: log.debug("Initialized '%s' store with %d items", k.namespace, len(all_data[k])) @@ -50,10 +52,7 @@ def init(self, all_data): def delete(self, kind, key, version): try: self._lock.rlock() - itemsOfKind = self._items.get(kind) - if itemsOfKind is None: - itemsOfKind = dict() - self._items[kind] = itemsOfKind + itemsOfKind = self._items[kind] i = itemsOfKind.get(key) if i is None or i['version'] < version: i = {'deleted': True, 'version': version} @@ -65,10 +64,7 @@ def upsert(self, kind, item): key = item['key'] try: self._lock.rlock() - itemsOfKind = self._items.get(kind) - if itemsOfKind is None: - itemsOfKind = dict() - self._items[kind] = itemsOfKind + itemsOfKind = self._items[kind] i = itemsOfKind.get(key) if i is None or i['version'] < item['version']: itemsOfKind[key] = item From 74beca352d23c9c108bdbe4693351e5d1a1c4d92 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 6 Feb 2018 20:55:29 -0800 Subject: [PATCH 15/26] change class name --- ldclient/streaming.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ldclient/streaming.py b/ldclient/streaming.py index 86b1ddd1..55957405 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -19,7 +19,7 @@ STREAM_ALL_PATH = '/all' -KindAndKey = namedtuple('KindAndKey', ['kind', 'key']) +ParsedPath = 
namedtuple('ParsedPath', ['kind', 'key']) class StreamingUpdateProcessor(Thread, UpdateProcessor): @@ -138,5 +138,5 @@ def process_message(store, requester, msg): def _parse_path(self, path): for kind in [FEATURES, SEGMENTS]: if path.startsWith(kind.stream_api_path): - return KindAndKey(kind = kind, key = path.substring(len(kind.stream_api_path))) + return ParsedPath(kind = kind, key = path[len(kind.stream_api_path):]) return None From 7e02fa229a490b509518a894572ca25c5b1d610e Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 7 Feb 2018 10:51:38 -0800 Subject: [PATCH 16/26] fix merge --- testing/test_flag.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/testing/test_flag.py b/testing/test_flag.py index 9f778967..8b9740aa 100644 --- a/testing/test_flag.py +++ b/testing/test_flag.py @@ -236,18 +236,17 @@ def _make_bool_flag_from_clause(clause): def test_bucket_by_user_key(): - feature = { u'key': u'hashKey', u'salt': u'saltyA' } user = { u'key': u'userKeyA' } - bucket = _bucket_user(user, feature, 'key') + bucket = _bucket_user(user, 'hashKey', 'saltyA', 'key') assert bucket == pytest.approx(0.42157587) user = { u'key': u'userKeyB' } - bucket = _bucket_user(user, feature, 'key') + bucket = _bucket_user(user, 'hashKey', 'saltyA', 'key') assert bucket == pytest.approx(0.6708485) user = { u'key': u'userKeyC' } - bucket = _bucket_user(user, feature, 'key') + bucket = _bucket_user(user, 'hashKey', 'saltyA', 'key') assert bucket == pytest.approx(0.10343106) def test_bucket_by_int_attr(): @@ -259,9 +258,9 @@ def test_bucket_by_int_attr(): u'stringAttr': u'33333' } } - bucket = _bucket_user(user, feature, 'intAttr') + bucket = _bucket_user(user, 'hashKey', 'saltyA', 'intAttr') assert bucket == pytest.approx(0.54771423) - bucket2 = _bucket_user(user, feature, 'stringAttr') + bucket2 = _bucket_user(user, 'hashKey', 'saltyA', 'stringAttr') assert bucket2 == bucket def test_bucket_by_float_attr_not_allowed(): @@ -272,5 +271,5 @@ 
def test_bucket_by_float_attr_not_allowed(): u'floatAttr': 33.5 } } - bucket = _bucket_user(user, feature, 'floatAttr') + bucket = _bucket_user(user, 'hashKey', 'saltyA', 'floatAttr') assert bucket == 0.0 From 2018a25e709f3551c8fff0ff5aeaa19c1ea46105 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Tue, 13 Feb 2018 11:22:59 -0800 Subject: [PATCH 17/26] fix & test edge case of weight=None --- ldclient/flag.py | 2 +- testing/test_segment.py | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+), 1 deletion(-) diff --git a/ldclient/flag.py b/ldclient/flag.py index b25a24a7..06787de9 100644 --- a/ldclient/flag.py +++ b/ldclient/flag.py @@ -190,7 +190,7 @@ def _segment_rule_matches_user(rule, user, segment_key, salt): return False # If the weight is absent, this rule matches - if 'weight' not in rule: + if 'weight' not in rule or rule['weight'] is None: return True # All of the clauses are met. See if the user buckets in diff --git a/testing/test_segment.py b/testing/test_segment.py index 785d2c5a..02b9ecfa 100644 --- a/testing/test_segment.py +++ b/testing/test_segment.py @@ -31,6 +31,43 @@ def test_explicit_include_has_precedence(): u = { "key": "foo" } assert _segment_matches_user(s, u) is True +def test_matching_rule_with_no_weight(): + s = { + "key": "test", + "rules": [ + { + "clauses": [ + { + "attribute": "email", + "op": "in", + "values": [ "test@example.com" ] + } + ] + } + ] + } + u = { "key": "foo", "email": "test@example.com" } + assert _segment_matches_user(s, u) is True + +def test_matching_rule_with_none_weight(): + s = { + "key": "test", + "rules": [ + { + "clauses": [ + { + "attribute": "email", + "op": "in", + "values": [ "test@example.com" ] + } + ], + "weight": None + } + ] + } + u = { "key": "foo", "email": "test@example.com" } + assert _segment_matches_user(s, u) is True + def test_matching_rule_with_full_rollout(): s = { "key": "test", From 29a05b64c3cd70508d05f58a835e1e7b5c6aa95e Mon Sep 17 00:00:00 2001 From: Eli Bishop 
Date: Wed, 21 Feb 2018 14:02:25 -0800 Subject: [PATCH 18/26] remove all Twisted support --- MANIFEST.in | 1 - README.md | 27 ----- ldclient/twisted_client.py | 80 -------------- ldclient/twisted_event_consumer.py | 91 ---------------- ldclient/twisted_redis_feature_store.py | 133 ------------------------ setup.py | 4 - twisted-requirements.txt | 5 - 7 files changed, 341 deletions(-) delete mode 100644 ldclient/twisted_client.py delete mode 100644 ldclient/twisted_event_consumer.py delete mode 100644 ldclient/twisted_redis_feature_store.py delete mode 100644 twisted-requirements.txt diff --git a/MANIFEST.in b/MANIFEST.in index 2bd71dcc..4ec6f0b0 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1,6 +1,5 @@ include requirements.txt include README.txt include test-requirements.txt -include twisted-requirements.txt include redis-requirements.txt include python2.6-requirements.txt \ No newline at end of file diff --git a/README.md b/README.md index 8074e1ea..a480b616 100644 --- a/README.md +++ b/README.md @@ -48,32 +48,6 @@ Python 2.6 is supported for polling mode only and requires an extra dependency. 1. Due to Python 2.6's lack of SNI support, LaunchDarkly's streaming flag updates are not available. Set the `stream=False` option in the client config to disable it. You'll still receive flag updates, but via a polling mechanism with efficient caching. Here's an example: `config = ldclient.Config(stream=False, sdk_key="SDK_KEY")` - -Twisted -------- -Twisted is supported for LDD mode only. To run in Twisted/LDD mode, - -1. Use this dependency: - - ``` - ldclient-py[twisted]>=3.0.1 - ``` -2. Configure the client: - - ``` - feature_store = TwistedRedisFeatureStore(url='YOUR_REDIS_URL', redis_prefix="ldd-restwrapper", expiration=0) - ldclient.config.feature_store = feature_store - - ldclient.config = ldclient.Config( - use_ldd=use_ldd, - event_consumer_class=TwistedEventConsumer, - ) - ldclient.sdk_key = 'YOUR_SDK_KEY' - ``` -3. 
Get the client: - - ```client = ldclient.get()``` - Learn more ----------- @@ -104,7 +78,6 @@ About LaunchDarkly * [JavaScript](http://docs.launchdarkly.com/docs/js-sdk-reference "LaunchDarkly JavaScript SDK") * [PHP](http://docs.launchdarkly.com/docs/php-sdk-reference "LaunchDarkly PHP SDK") * [Python](http://docs.launchdarkly.com/docs/python-sdk-reference "LaunchDarkly Python SDK") - * [Python Twisted](http://docs.launchdarkly.com/docs/python-twisted-sdk-reference "LaunchDarkly Python Twisted SDK") * [Go](http://docs.launchdarkly.com/docs/go-sdk-reference "LaunchDarkly Go SDK") * [Node.JS](http://docs.launchdarkly.com/docs/node-sdk-reference "LaunchDarkly Node SDK") * [.NET](http://docs.launchdarkly.com/docs/dotnet-sdk-reference "LaunchDarkly .Net SDK") diff --git a/ldclient/twisted_client.py b/ldclient/twisted_client.py deleted file mode 100644 index 90ce50dc..00000000 --- a/ldclient/twisted_client.py +++ /dev/null @@ -1,80 +0,0 @@ -from functools import partial - -from twisted.internet import defer -from twisted.internet.defer import DeferredList - -from ldclient import LDClient -from ldclient import log -from ldclient.flag import _get_variation, _evaluate_index, _get_off_variation - - -class TwistedLDClient(LDClient): - @defer.inlineCallbacks - def _evaluate_and_send_events(self, flag, user, default): - value = yield self._evaluate(flag, user) - if value is None: - value = default - log.info("value: " + str(value)) - self._send_event({'kind': 'feature', 'key': flag.get('key'), 'user': user, 'value': value, - 'default': default, 'version': flag.get('version')}) - defer.returnValue(value) - - def _evaluate(self, flag, user): - if flag.get('on', False): - def cb(result): - if result is not None: - return result - return _get_off_variation(flag) - - value = self._evaluate_internal(flag, user) - value.addBoth(cb) - return value - - return _get_off_variation(flag) - - def _evaluate_internal(self, flag, user): - def check_prereq_results(result): - prereq_ok = True - 
for (success, prereq_ok) in result: - if success is False or prereq_ok is False: - prereq_ok = False - - if prereq_ok is True: - index = _evaluate_index(flag, user) - variation = _get_variation(flag, index) - return variation - return None - - results = DeferredList(map(partial(self._evaluate_prereq, user), flag.get('prerequisites') or [])) - results.addBoth(check_prereq_results) - return results - - # returns False if the prereq failed or there was an error evaluating it. Otherwise returns True - def _evaluate_prereq(self, user, prereq): - - @defer.inlineCallbacks - def eval_prereq(prereq_flag): - if prereq_flag is None: - log.warn("Missing prereq flag: " + prereq.get('key')) - defer.returnValue(False) - if prereq_flag.get('on', False) is True: - prereq_value = yield self._evaluate_internal(prereq_flag, user) - variation = _get_variation(prereq_flag, prereq.get('variation')) - if prereq_value is None or not prereq_value == variation: - ok = False - else: - ok = True - else: - ok = False - defer.returnValue(ok) - - result = self._store.get(prereq.get('key'), eval_prereq) - return result - - @defer.inlineCallbacks - def _evaluate_multi(self, user, flags): - results = {} - for k, v in flags.items() or {}: - r = yield self._evaluate(v, user) - results[k] = r - defer.returnValue(results) diff --git a/ldclient/twisted_event_consumer.py b/ldclient/twisted_event_consumer.py deleted file mode 100644 index e2f69266..00000000 --- a/ldclient/twisted_event_consumer.py +++ /dev/null @@ -1,91 +0,0 @@ -from __future__ import absolute_import - -import errno -import json - -import txrequests -from cachecontrol import CacheControl -from queue import Empty -from requests.packages.urllib3.exceptions import ProtocolError -from twisted.internet import task, defer - -from ldclient.event_serializer import EventSerializer -from ldclient.interfaces import EventConsumer -from ldclient.util import _headers, log - - -class TwistedEventConsumer(EventConsumer): - - def __init__(self, queue, 
config): - self._queue = queue - """ :type: queue.Queue """ - - self._session = CacheControl(txrequests.Session()) - """ :type: txrequests.Session """ - - self._config = config - """ :type: ldclient.twisted.TwistedConfig """ - - self._serializer = EventSerializer(config) - - self._looping_call = None - """ :type: LoopingCall""" - - def start(self): - self._looping_call = task.LoopingCall(self._consume) - self._looping_call.start(5) - - def stop(self): - self._looping_call.stop() - - def is_alive(self): - return self._looping_call is not None and self._looping_call.running - - def flush(self): - return self._consume() - - def _consume(self): - items = [] - try: - while True: - items.append(self._queue.get_nowait()) - except Empty: - pass - - if items: - return self.send_batch(items) - - @defer.inlineCallbacks - def send_batch(self, events): - @defer.inlineCallbacks - def do_send(should_retry): - # noinspection PyBroadException - try: - json_body = self._serializer.serialize_events(events) - hdrs = _headers(self._config.sdk_key) - r = yield self._session.post(self._config.events_uri, - headers=hdrs, - timeout=(self._config.connect_timeout, self._config.read_timeout), - data=json_body) - if r.status_code == 401 - log.error('Received 401 error, no further events will be posted since SDK key is invalid') - self.stop() - return - r.raise_for_status() - except ProtocolError as e: - inner = e.args[1] - if inner.errno == errno.ECONNRESET and should_retry: - log.warning( - 'ProtocolError exception caught while sending events. Retrying.') - yield do_send(False) - else: - log.exception( - 'Unhandled exception in event consumer. Analytics events were not processed.') - except: - log.exception( - 'Unhandled exception in event consumer. 
Analytics events were not processed.') - try: - yield do_send(True) - finally: - for _ in events: - self._queue.task_done() diff --git a/ldclient/twisted_redis_feature_store.py b/ldclient/twisted_redis_feature_store.py deleted file mode 100644 index de2566ed..00000000 --- a/ldclient/twisted_redis_feature_store.py +++ /dev/null @@ -1,133 +0,0 @@ -from __future__ import absolute_import - -import json -import urlparse - -from twisted.internet import defer -from twisted.internet import protocol, reactor -from txredis.client import RedisClient - -from ldclient.expiringdict import ExpiringDict -from ldclient.interfaces import FeatureStore -from ldclient.redis_feature_store import ForgetfulDict, INIT_KEY -from ldclient.util import log - - -class TwistedRedisFeatureStore(FeatureStore): - def __init__(self, - url='redis://localhost:6379/0', - expiration=15, - capacity=1000, - redis_prefix='launchdarkly'): - self._url = url - parsed_url = urlparse.urlparse(url) - self._redis_host = parsed_url.hostname - self._redis_port = parsed_url.port - self._features_key = "{0}:features".format(redis_prefix) - self._cache = ForgetfulDict() if expiration == 0 else ExpiringDict(max_len=capacity, - max_age_seconds=expiration) - log.info("Created TwistedRedisFeatureStore with url: " + url + " using key: " + self._features_key) - - def _get_connection(self): - client_creator = protocol.ClientCreator(reactor, RedisClient) - return client_creator.connectTCP(self._redis_host, self._redis_port) - - def initialized(self): - initialized = self._cache.get(INIT_KEY) - if initialized: - # reset ttl - self._cache[INIT_KEY] = True - return True - - @defer.inlineCallbacks - def redis_initialized(): - r = yield self._get_connection() - """ :type: RedisClient """ - i = yield r.exists(self._features_key) - if i: - # reset ttl - self._cache[INIT_KEY] = True - defer.returnValue(i) - - initialized = redis_initialized() - return initialized - - def upsert(self, key, feature): - raise NotImplementedError() - - 
def all(self, callback): - @defer.inlineCallbacks - def redis_get_all(): - r = None - try: - r = yield self._get_connection() - """ :type: RedisClient """ - all_features = yield r.hgetall(self._features_key) - if all_features is None or all_features is "": - log.warn("TwistedRedisFeatureStore: call to get all flags returned no results. Returning None.") - defer.returnValue(None) - - results = {} - for k, f_json in all_features.items() or {}: - f = json.loads(f_json.decode('utf-8')) - if 'deleted' in f and f['deleted'] is False: - results[f['key']] = f - defer.returnValue(results) - except Exception as e: - log.error("Could not connect to Redis using url: " + self._url + " with error message: " + e.message) - defer.returnValue(None) - finally: - if r: - r.quit() - defer.returnValue(None) - - all_flags = redis_get_all() - all_flags.addBoth(callback) - return all_flags - - def delete(self, key, version): - raise NotImplementedError() - - def init(self, features): - raise NotImplementedError() - - def get(self, key, callback): - @defer.inlineCallbacks - def redis_get(): - r = None - try: - r = yield self._get_connection() - """ :type: RedisClient """ - get_result = yield r.hget(self._features_key, key) - if not get_result: - log.warn("Didn't get response from redis for key: " + key + " Returning None.") - defer.returnValue(None) - f_json = get_result.get(key) - if f_json is None or f_json is "": - log.warn( - "TwistedRedisFeatureStore: feature flag with key: " + key + " not found in Redis. Returning None.") - defer.returnValue(None) - - f = json.loads(f_json.decode('utf-8')) - if f.get('deleted', False) is True: - log.warn("TwistedRedisFeatureStore: get returned deleted flag from Redis. 
Returning None.") - defer.returnValue(None) - self._cache[key] = f - defer.returnValue(f) - except Exception as e: - log.error("Could not connect to Redis using url: " + self._url + " with error message: " + e.message) - defer.returnValue(None) - finally: - if r: - r.quit() - defer.returnValue(None) - - cached = self._cache.get(key) - if cached is not None: - # reset ttl - self._cache[key] = cached - return callback(cached) - - f = redis_get() - f.addBoth(callback) - return f diff --git a/setup.py b/setup.py index 79856397..f7549f5c 100644 --- a/setup.py +++ b/setup.py @@ -14,8 +14,6 @@ install_reqs = parse_requirements('requirements.txt', session=uuid.uuid1()) python26_reqs = parse_requirements('python2.6-requirements.txt', session=uuid.uuid1()) test_reqs = parse_requirements('test-requirements.txt', session=uuid.uuid1()) -twisted_reqs = parse_requirements( - 'twisted-requirements.txt', session=uuid.uuid1()) redis_reqs = parse_requirements('redis-requirements.txt', session=uuid.uuid1()) # reqs is a list of requirement @@ -23,7 +21,6 @@ reqs = [str(ir.req) for ir in install_reqs] python26reqs = [str(ir.req) for ir in python26_reqs] testreqs = [str(ir.req) for ir in test_reqs] -txreqs = [str(ir.req) for ir in twisted_reqs] redisreqs = [str(ir.req) for ir in redis_reqs] @@ -66,7 +63,6 @@ def run(self): 'Topic :: Software Development :: Libraries', ], extras_require={ - "twisted": txreqs, "redis": redisreqs, "python2.6": python26reqs }, diff --git a/twisted-requirements.txt b/twisted-requirements.txt deleted file mode 100644 index e99d9e35..00000000 --- a/twisted-requirements.txt +++ /dev/null @@ -1,5 +0,0 @@ -cryptography>=1.0 -pyOpenSSL>=0.14 -service_identity>=16.0 -txredis>=2.4 -txrequests>=0.9.2 From 35c787a1e66b4979a369a7a6f2fe2a2c11e1279a Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 21 Feb 2018 14:39:00 -0800 Subject: [PATCH 19/26] update readme: we do support streaming for Python 2.6 --- README.md | 5 +---- 1 file changed, 1 insertion(+), 4 
deletions(-) diff --git a/README.md b/README.md index a480b616..aed91f43 100644 --- a/README.md +++ b/README.md @@ -40,14 +40,11 @@ Your first feature flag Python 2.6 ---------- -Python 2.6 is supported for polling mode only and requires an extra dependency. Here's how to set it up: +Python 2.6 requires an extra dependency. Here's how to set it up: 1. Use the `python2.6` extra in your requirements.txt: `ldclient-py[python2.6]` -1. Due to Python 2.6's lack of SNI support, LaunchDarkly's streaming flag updates are not available. Set the `stream=False` option in the client config to disable it. You'll still receive flag updates, but via a polling mechanism with efficient caching. Here's an example: - `config = ldclient.Config(stream=False, sdk_key="SDK_KEY")` - Learn more ----------- From 739cf75247d118e93a63d25f0f6ed5044a0e5ee8 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 21 Feb 2018 17:42:47 -0800 Subject: [PATCH 20/26] fix ridiculous mistakes that broke the stream --- ldclient/streaming.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ldclient/streaming.py b/ldclient/streaming.py index 55957405..be90e175 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -103,7 +103,7 @@ def process_message(store, requester, msg): path = payload['path'] obj = payload['data'] log.debug("Received patch event for %s, New version: [%d]", path, obj.get("version")) - target = _parse_path(path) + target = StreamingUpdateProcessor._parse_path(path) if target is not None: store.upsert(target.kind, obj) else: @@ -111,7 +111,7 @@ def process_message(store, requester, msg): elif msg.event == "indirect/patch": path = msg.data log.debug("Received indirect/patch event for %s", path) - target = _parse_path(path) + target = StreamingUpdateProcessor._parse_path(path) if target is not None: store.upsert(target.kind, requester.get_one(target.kind, target.key)) else: @@ -126,7 +126,7 @@ def process_message(store, requester, msg): # noinspection 
PyShadowingNames version = payload['version'] log.debug("Received delete event for %s, New version: [%d]", path, version) - target = _parse_path(path) + target = StreamingUpdateProcessor._parse_path(path) if target is not None: store.delete(target.kind, target.key, version) else: @@ -135,8 +135,9 @@ def process_message(store, requester, msg): log.warning('Unhandled event in stream processor: ' + msg.event) return False - def _parse_path(self, path): + @staticmethod + def _parse_path(path): for kind in [FEATURES, SEGMENTS]: - if path.startsWith(kind.stream_api_path): - return ParsedPath(kind = kind, key = path.substring(len(kind.stream_api_path))) + if path.startswith(kind.stream_api_path): + return ParsedPath(kind = kind, key = path[:len(kind.stream_api_path)]) return None From d52ab9def74825158bdca4ab2b38e7ccdc4994dd Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Wed, 21 Feb 2018 18:36:29 -0800 Subject: [PATCH 21/26] fix further breakage in StreamProcessor --- ldclient/streaming.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ldclient/streaming.py b/ldclient/streaming.py index be90e175..58356f34 100644 --- a/ldclient/streaming.py +++ b/ldclient/streaming.py @@ -139,5 +139,5 @@ def process_message(store, requester, msg): def _parse_path(path): for kind in [FEATURES, SEGMENTS]: if path.startswith(kind.stream_api_path): - return ParsedPath(kind = kind, key = path[:len(kind.stream_api_path)]) + return ParsedPath(kind = kind, key = path[len(kind.stream_api_path):]) return None From 9a73a16c8ff6824096872ab6430782007b557e91 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 26 Mar 2018 11:03:01 -0700 Subject: [PATCH 22/26] fix Redis store to use optimistic locking and retry as needed --- ldclient/redis_feature_store.py | 70 +++++++++++++++++++-------------- test-requirements.txt | 1 + testing/test_feature_store.py | 42 ++++++++++++++++++++ 3 files changed, 83 insertions(+), 30 deletions(-) diff --git a/ldclient/redis_feature_store.py 
b/ldclient/redis_feature_store.py index f3850cbe..01c1cb3d 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -79,19 +79,20 @@ def all(self, kind, callback): return callback(results) def get(self, kind, key, callback=lambda x: x): - item = self._get_even_if_deleted(kind, key) + item = self._get_even_if_deleted(kind, key, True) if item is not None and item.get('deleted', False) is True: log.debug("RedisFeatureStore: get returned deleted item %s in '%s'. Returning None.", key, kind.namespace) return callback(None) return callback(item) - def _get_even_if_deleted(self, kind, key): + def _get_even_if_deleted(self, kind, key, check_cache = True): cacheKey = self._cache_key(kind, key) - item = self._cache.get(cacheKey) - if item is not None: - # reset ttl - self._cache[cacheKey] = item - return item + if check_cache: + item = self._cache.get(cacheKey) + if item is not None: + # reset ttl + self._cache[cacheKey] = item + return item try: r = redis.Redis(connection_pool=self._pool) @@ -110,17 +111,11 @@ def _get_even_if_deleted(self, kind, key): return item def delete(self, kind, key, version): - r = redis.Redis(connection_pool=self._pool) - baseKey = self._items_key(kind) - r.watch(baseKey) - item_json = r.hget(baseKey, key) - item = None if item_json is None else json.loads(item_json.decode('utf-8')) - if item is None or item['version'] < version: - deletedItem = { "deleted": True, "version": version } - item_json = json.dumps(deletedItem) - r.hset(baseKey, key, item_json) - self._cache[self._cache_key(kind, key)] = deletedItem - r.unwatch() + deleted_item = { "key": key, "version": version, "deleted": True } + self._update_with_versioning(kind, deleted_item) + + def upsert(self, kind, item): + self._update_with_versioning(kind, item) @property def initialized(self): @@ -130,18 +125,33 @@ def _query_init(self): r = redis.Redis(connection_pool=self._pool) return r.exists(self._items_key(FEATURES)) - def upsert(self, kind, item): + def 
_update_with_versioning(self, kind, item): r = redis.Redis(connection_pool=self._pool) - baseKey = self._items_key(kind) + base_key = self._items_key(kind) key = item['key'] - r.watch(baseKey) - old = self._get_even_if_deleted(kind, key) - if old: - if old['version'] >= item['version']: - r.unwatch() - return - item_json = json.dumps(item) - r.hset(baseKey, key, item_json) - self._cache[self._cache_key(kind, key)] = item - r.unwatch() + + try_again = True + while try_again: + try_again = False + pipeline = r.pipeline() + pipeline.watch(base_key) + old = self._get_even_if_deleted(kind, key, False) + self._before_update_transaction(base_key, key) + if old and old['version'] >= item['version']: + pipeline.unwatch() + else: + try: + pipeline.multi() + pipeline.hset(base_key, key, item_json) + pipeline.execute() + # Unlike Redis implementations for other platforms, in redis-py a failed WATCH + # produces an exception rather than a null result from execute(). + self._cache[self._cache_key(kind, key)] = item + except redis.exceptions.WatchError: + log.debug("RedisFeatureStore: concurrent modification detected, retrying") + try_again = True + + def _before_update_transaction(self, base_key, key): + # exposed for testing + pass diff --git a/test-requirements.txt b/test-requirements.txt index a01536a9..a75fc427 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,3 +1,4 @@ +mock>=2.0.0 pytest>=2.8 pytest-timeout>=1.0 redis>=2.10.5 diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py index cabc40df..71fabfad 100644 --- a/testing/test_feature_store.py +++ b/testing/test_feature_store.py @@ -1,3 +1,5 @@ +import json +from mock import patch import pytest import redis @@ -120,3 +122,43 @@ def test_upsert_older_version_after_delete(self, store): old_ver = self.make_feature('foo', 9) store.upsert(FEATURES, old_ver) assert store.get(FEATURES, 'foo', lambda x: x) is None + + +class TestRedisFeatureStoreExtraTests: + 
@patch.object(RedisFeatureStore, '_before_update_transaction') + def test_upsert_race_condition_against_external_client_with_higher_version(self, mock_method): + other_client = redis.StrictRedis(host='localhost', port=6379, db=0) + store = RedisFeatureStore() + store.init({ FEATURES: {} }) + + other_version = {u'key': u'flagkey', u'version': 2} + def hook(base_key, key): + if other_version['version'] <= 4: + other_client.hset(base_key, key, json.dumps(other_version)) + other_version['version'] = other_version['version'] + 1 + mock_method.side_effect = hook + + feature = { u'key': 'flagkey', u'version': 1 } + + store.upsert(FEATURES, feature) + result = store.get(FEATURES, 'flagkey', lambda x: x) + assert result['version'] == 2 + + @patch.object(RedisFeatureStore, '_before_update_transaction') + def test_upsert_race_condition_against_external_client_with_lower_version(self, mock_method): + other_client = redis.StrictRedis(host='localhost', port=6379, db=0) + store = RedisFeatureStore() + store.init({ FEATURES: {} }) + + other_version = {u'key': u'flagkey', u'version': 2} + def hook(base_key, key): + if other_version['version'] <= 4: + other_client.hset(base_key, key, json.dumps(other_version)) + other_version['version'] = other_version['version'] + 1 + mock_method.side_effect = hook + + feature = { u'key': 'flagkey', u'version': 5 } + + store.upsert(FEATURES, feature) + result = store.get(FEATURES, 'flagkey', lambda x: x) + assert result['version'] == 5 From aee760698b89585ef05ca0ec4e3788f31c7f095b Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 26 Mar 2018 12:14:00 -0700 Subject: [PATCH 23/26] make parameter name explicit --- ldclient/redis_feature_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index 01c1cb3d..a5618af4 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -79,7 +79,7 @@ def all(self, kind, callback): return 
callback(results) def get(self, kind, key, callback=lambda x: x): - item = self._get_even_if_deleted(kind, key, True) + item = self._get_even_if_deleted(kind, key, check_cache=True) if item is not None and item.get('deleted', False) is True: log.debug("RedisFeatureStore: get returned deleted item %s in '%s'. Returning None.", key, kind.namespace) return callback(None) @@ -136,7 +136,7 @@ def _update_with_versioning(self, kind, item): try_again = False pipeline = r.pipeline() pipeline.watch(base_key) - old = self._get_even_if_deleted(kind, key, False) + old = self._get_even_if_deleted(kind, key, check_cache=False) self._before_update_transaction(base_key, key) if old and old['version'] >= item['version']: pipeline.unwatch() From 57255c7ba29775e6fcce3fd4949424c8cf78f38a Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 26 Mar 2018 12:16:16 -0700 Subject: [PATCH 24/26] narrower try block --- ldclient/redis_feature_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index a5618af4..70fdf01e 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -141,9 +141,9 @@ def _update_with_versioning(self, kind, item): if old and old['version'] >= item['version']: pipeline.unwatch() else: + pipeline.multi() + pipeline.hset(base_key, key, item_json) try: - pipeline.multi() - pipeline.hset(base_key, key, item_json) pipeline.execute() # Unlike Redis implementations for other platforms, in redis-py a failed WATCH # produces an exception rather than a null result from execute(). 
From 243bf5b824052e644ed2a3aa88541b4842da51b2 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 26 Mar 2018 13:44:52 -0700 Subject: [PATCH 25/26] use break/continue --- ldclient/redis_feature_store.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index 70fdf01e..582ea6b1 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -131,15 +131,14 @@ def _update_with_versioning(self, kind, item): key = item['key'] item_json = json.dumps(item) - try_again = True - while try_again: - try_again = False + while True: pipeline = r.pipeline() pipeline.watch(base_key) old = self._get_even_if_deleted(kind, key, check_cache=False) self._before_update_transaction(base_key, key) if old and old['version'] >= item['version']: pipeline.unwatch() + break else: pipeline.multi() pipeline.hset(base_key, key, item_json) @@ -147,10 +146,11 @@ def _update_with_versioning(self, kind, item): pipeline.execute() # Unlike Redis implementations for other platforms, in redis-py a failed WATCH # produces an exception rather than a null result from execute(). 
- self._cache[self._cache_key(kind, key)] = item except redis.exceptions.WatchError: log.debug("RedisFeatureStore: concurrent modification detected, retrying") - try_again = True + continue + self._cache[self._cache_key(kind, key)] = item + break def _before_update_transaction(self, base_key, key): # exposed for testing From 8071ace07a25183ab8609c24e18e2d113016cb44 Mon Sep 17 00:00:00 2001 From: Eli Bishop Date: Mon, 26 Mar 2018 16:30:22 -0700 Subject: [PATCH 26/26] add debug logging for out-of-order update --- ldclient/redis_feature_store.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index 582ea6b1..3206884d 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -137,6 +137,9 @@ def _update_with_versioning(self, kind, item): old = self._get_even_if_deleted(kind, key, check_cache=False) self._before_update_transaction(base_key, key) if old and old['version'] >= item['version']: + log.debug('RedisFeatureStore: Attempted to %s key: %s version %d with a version that is the same or older: %d in "%s"', + 'delete' if item.get('deleted') else 'update', + key, old['version'], item['version'], kind.namespace) pipeline.unwatch() break else: