diff --git a/.circleci/config.yml b/.circleci/config.yml index 05cb973c..92699a3c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -40,28 +40,34 @@ jobs: docker: - image: circleci/python:2.7-jessie - image: redis + - image: amazon/dynamodb-local test-3.3: <<: *test-template docker: - image: circleci/python:3.3-jessie - image: redis + - image: amazon/dynamodb-local test-3.4: <<: *test-template docker: - image: circleci/python:3.4-jessie - image: redis + - image: amazon/dynamodb-local test-3.5: <<: *test-template docker: - image: circleci/python:3.5-jessie - image: redis + - image: amazon/dynamodb-local test-3.6: <<: *test-template docker: - image: circleci/python:3.6-jessie - image: redis + - image: amazon/dynamodb-local test-3.7: <<: *test-template docker: - image: circleci/python:3.7-stretch - image: redis + - image: amazon/dynamodb-local diff --git a/.gitignore b/.gitignore index 0d1700ee..d988c61f 100644 --- a/.gitignore +++ b/.gitignore @@ -44,6 +44,7 @@ nosetests.xml coverage.xml *,cover .hypothesis/ +.pytest_cache # Translations *.mo diff --git a/README.md b/README.md index edef13e6..d25ee307 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,6 @@ Or it can be set from within python: os.environ["https_proxy"] = "https://web-proxy.domain.com:8080" ``` - If your proxy requires authentication then you can prefix the URN with your login information: ``` export HTTPS_PROXY=http://user:pass@web-proxy.domain.com:8080 @@ -75,12 +74,19 @@ Your first feature flag # the code to run if the feature is off Supported Python versions ----------- +------------------------- + The SDK is tested with the most recent patch releases of Python 2.7, 3.3, 3.4, 3.5, and 3.6. Python 2.6 is no longer supported. +Database integrations +--------------------- + +Feature flag data can be kept in a persistent store using Redis or DynamoDB. These adapters are implemented in the `DynamoDB` and `Redis` classes in `ldclient.integrations`; to use them, call the `new_feature_store` method in the appropriate class, and put the returned object in the `feature_store` property of your client configuration. See [`ldclient.integrations`](https://github.com/launchdarkly/python-client/blob/master/ldclient/integrations.py) and the [SDK reference guide](https://docs.launchdarkly.com/v2.0/docs/using-a-persistent-feature-store) for more information. + Using flag data from a file --------------------------- -For testing purposes, the SDK can be made to read feature flag state from a file or files instead of connecting to LaunchDarkly. See [`file_data_source.py`](https://github.com/launchdarkly/python-client/blob/master/ldclient/file_data_source.py) for more details. + +For testing purposes, the SDK can be made to read feature flag state from a file or files instead of connecting to LaunchDarkly. See [`file_data_source.py`](https://github.com/launchdarkly/python-client/blob/master/ldclient/file_data_source.py) and the [SDK reference guide](https://docs.launchdarkly.com/v2.0/docs/reading-flags-from-a-file) for more details. Learn more ----------- @@ -100,7 +106,7 @@ Contributing See [CONTRIBUTING](CONTRIBUTING.md) for more information. About LaunchDarkly ------------ +------------------ * LaunchDarkly is a continuous delivery platform that provides feature flags as a service and allows developers to iterate quickly and safely. We allow you to easily flag your features and manage them from the LaunchDarkly dashboard.
With LaunchDarkly, you can: * Roll out a new feature to a subset of your users (like a group of users who opt-in to a beta tester group), gathering feedback and bug reports from real-world use cases. diff --git a/dynamodb-requirements.txt b/dynamodb-requirements.txt new file mode 100644 index 00000000..b72b66b6 --- /dev/null +++ b/dynamodb-requirements.txt @@ -0,0 +1 @@ +boto3>=1.9.71 diff --git a/ldclient/client.py b/ldclient/client.py index 039fad52..30c37e53 100644 --- a/ldclient/client.py +++ b/ldclient/client.py @@ -10,8 +10,10 @@ from ldclient.config import Config as Config from ldclient.event_processor import NullEventProcessor from ldclient.feature_requester import FeatureRequesterImpl +from ldclient.feature_store import _FeatureStoreDataSetSorter from ldclient.flag import EvaluationDetail, evaluate, error_reason from ldclient.flags_state import FeatureFlagsState +from ldclient.interfaces import FeatureStore from ldclient.polling import PollingUpdateProcessor from ldclient.streaming import StreamingUpdateProcessor from ldclient.util import check_uwsgi, log @@ -27,6 +29,35 @@ from threading import Lock +class _FeatureStoreClientWrapper(FeatureStore): + """Provides additional behavior that the client requires before or after feature store operations. + Currently this just means sorting the data set for init(). In the future we may also use this + to provide an update listener capability. + """ + + def __init__(self, store): + self.store = store + + def init(self, all_data): + return self.store.init(_FeatureStoreDataSetSorter.sort_all_collections(all_data)) + + def get(self, kind, key, callback): + return self.store.get(kind, key, callback) + + def all(self, kind, callback): + return self.store.all(kind, callback) + + def delete(self, kind, key, version): + return self.store.delete(kind, key, version) + + def upsert(self, kind, item): + return self.store.upsert(kind, item) + + @property + def initialized(self): + return self.store.initialized + + class LDClient(object): def __init__(self, sdk_key=None, config=None, start_wait=5): """Constructs a new LDClient instance. @@ -55,7 +86,7 @@ def __init__(self, sdk_key=None, config=None, start_wait=5): self._event_processor = None self._lock = Lock() - self._store = self._config.feature_store + self._store = _FeatureStoreClientWrapper(self._config.feature_store) """ :type: FeatureStore """ if self._config.offline or not self._config.send_events: @@ -243,7 +274,14 @@ def send_event(value, variation=None, flag=None, reason=None): if user is not None and user.get('key', "") == "": log.warn("User key is blank. 
Flag evaluation will proceed, but the user will not be stored in LaunchDarkly.") - flag = self._store.get(FEATURES, key, lambda x: x) + try: + flag = self._store.get(FEATURES, key, lambda x: x) + except Exception as e: + log.error("Unexpected error while retrieving feature flag \"%s\": %s" % (key, repr(e))) + log.debug(traceback.format_exc()) + reason = error_reason('EXCEPTION') + send_event(default, None, None, reason) + return EvaluationDetail(default, None, reason) if not flag: reason = error_reason('FLAG_NOT_FOUND') send_event(default, None, None, reason) @@ -264,7 +302,7 @@ def send_event(value, variation=None, flag=None, reason=None): send_event(detail.value, detail.variation_index, flag, detail.reason) return detail except Exception as e: - log.error("Unexpected error while evaluating feature flag \"%s\": %s" % (key, e)) + log.error("Unexpected error while evaluating feature flag \"%s\": %s" % (key, repr(e))) log.debug(traceback.format_exc()) reason = error_reason('EXCEPTION') send_event(default, None, flag, reason) @@ -328,7 +366,7 @@ def all_flags_state(self, user, **kwargs): if flags_map is None: raise ValueError("feature store error") except Exception as e: - log.error("Unable to read flags for all_flag_state: %s" % e) + log.error("Unable to read flags for all_flag_state: %s" % repr(e)) return FeatureFlagsState(False) for key, flag in flags_map.items(): @@ -339,7 +377,7 @@ def all_flags_state(self, user, **kwargs): state.add_flag(flag, detail.value, detail.variation_index, detail.reason if with_reasons else None, details_only_if_tracked) except Exception as e: - log.error("Error evaluating flag \"%s\" in all_flags_state: %s" % (key, e)) + log.error("Error evaluating flag \"%s\" in all_flags_state: %s" % (key, repr(e))) log.debug(traceback.format_exc()) reason = {'kind': 'ERROR', 'errorKind': 'EXCEPTION'} state.add_flag(flag, None, None, reason if with_reasons else None, details_only_if_tracked) diff --git a/ldclient/dynamodb_feature_store.py b/ldclient/dynamodb_feature_store.py new file mode 100644 index 00000000..23ca3fce --- /dev/null +++ b/ldclient/dynamodb_feature_store.py @@ -0,0 +1,191 @@ +import json + +have_dynamodb = False +try: + import boto3 + have_dynamodb = True +except ImportError: + pass + +from ldclient import log +from ldclient.feature_store import CacheConfig +from ldclient.feature_store_helpers import CachingStoreWrapper +from ldclient.interfaces import FeatureStore, FeatureStoreCore + +# +# Internal implementation of the DynamoDB feature store. +# +# Implementation notes: +# +# * Feature flags, segments, and any other kind of entity the LaunchDarkly client may wish +# to store, are all put in the same table. The only two required attributes are "key" (which +# is present in all storeable entities) and "namespace" (a parameter from the client that is +# used to disambiguate between flags and segments). +# +# * Because of DynamoDB's restrictions on attribute values (e.g. empty strings are not +# allowed), the standard DynamoDB marshaling mechanism with one attribute per object property +# is not used. Instead, the entire object is serialized to JSON and stored in a single +# attribute, "item". The "version" property is also stored as a separate attribute since it +# is used for updates. +# +# * Since DynamoDB doesn't have transactions, the init() method - which replaces the entire data +# store - is not atomic, so there can be a race condition if another process is adding new data +# via upsert(). 
To minimize this, we don't delete all the data at the start; instead, we update +# the items we've received, and then delete all other items. That could potentially result in +# deleting new data from another process, but that would be the case anyway if the init() +# happened to execute later than the upsert(); we are relying on the fact that normally the +# process that did the init() will also receive the new data shortly and do its own upsert(). +# +# * DynamoDB has a maximum item size of 400KB. Since each feature flag or user segment is +# stored as a single item, this mechanism will not work for extremely large flags or segments. +# + +class _DynamoDBFeatureStoreCore(FeatureStoreCore): + PARTITION_KEY = 'namespace' + SORT_KEY = 'key' + VERSION_ATTRIBUTE = 'version' + ITEM_JSON_ATTRIBUTE = 'item' + + def __init__(self, table_name, prefix, dynamodb_opts): + if not have_dynamodb: + raise NotImplementedError("Cannot use DynamoDB feature store because AWS SDK (boto3 package) is not installed") + self._table_name = table_name + self._prefix = None if prefix == "" else prefix + self._client = boto3.client('dynamodb', **dynamodb_opts) + + def init_internal(self, all_data): + # Start by reading the existing keys; we will later delete any of these that weren't in all_data. + unused_old_keys = self._read_existing_keys(all_data.keys()) + requests = [] + num_items = 0 + inited_key = self._inited_key() + + # Insert or update every provided item + for kind, items in all_data.items(): + for key, item in items.items(): + encoded_item = self._marshal_item(kind, item) + requests.append({ 'PutRequest': { 'Item': encoded_item } }) + combined_key = (self._namespace_for_kind(kind), key) + unused_old_keys.discard(combined_key) + num_items = num_items + 1 + + # Now delete any previously existing items whose keys were not in the current data + for combined_key in unused_old_keys: + if combined_key[0] != inited_key: + requests.append({ 'DeleteRequest': { 'Key': self._make_keys(combined_key[0], combined_key[1]) } }) + + # Now set the special key that we check in initialized_internal() + requests.append({ 'PutRequest': { 'Item': self._make_keys(inited_key, inited_key) } }) + + _DynamoDBHelpers.batch_write_requests(self._client, self._table_name, requests) + log.info('Initialized table %s with %d items', self._table_name, num_items) + + def get_internal(self, kind, key): + resp = self._get_item_by_keys(self._namespace_for_kind(kind), key) + return self._unmarshal_item(resp.get('Item')) + + def get_all_internal(self, kind): + items_out = {} + paginator = self._client.get_paginator('query') + for resp in paginator.paginate(**self._make_query_for_kind(kind)): + for item in resp['Items']: + item_out = self._unmarshal_item(item) + items_out[item_out['key']] = item_out + return items_out + + def upsert_internal(self, kind, item): + encoded_item = self._marshal_item(kind, item) + try: + req = { + 'TableName': self._table_name, + 'Item': encoded_item, + 'ConditionExpression': 'attribute_not_exists(#namespace) or attribute_not_exists(#key) or :version > #version', + 'ExpressionAttributeNames': { + '#namespace': self.PARTITION_KEY, + '#key': self.SORT_KEY, + '#version': self.VERSION_ATTRIBUTE + }, + 'ExpressionAttributeValues': { + ':version': { 'N': str(item['version']) } + } + } + self._client.put_item(**req) + except self._client.exceptions.ConditionalCheckFailedException: + # The item was not updated because there's a newer item in the database. 
We must now + # read the item that's in the database and return it, so CachingStoreWrapper can cache it. + return self.get_internal(kind, item['key']) + return item + + def initialized_internal(self): + resp = self._get_item_by_keys(self._inited_key(), self._inited_key()) + return resp.get('Item') is not None and len(resp['Item']) > 0 + + def _prefixed_namespace(self, base): + return base if self._prefix is None else (self._prefix + ':' + base) + + def _namespace_for_kind(self, kind): + return self._prefixed_namespace(kind.namespace) + + def _inited_key(self): + return self._prefixed_namespace('$inited') + + def _make_keys(self, namespace, key): + return { + self.PARTITION_KEY: { 'S': namespace }, + self.SORT_KEY: { 'S': key } + } + + def _make_query_for_kind(self, kind): + return { + 'TableName': self._table_name, + 'ConsistentRead': True, + 'KeyConditions': { + self.PARTITION_KEY: { + 'AttributeValueList': [ + { 'S': self._namespace_for_kind(kind) } + ], + 'ComparisonOperator': 'EQ' + } + } + } + + def _get_item_by_keys(self, namespace, key): + return self._client.get_item(TableName=self._table_name, Key=self._make_keys(namespace, key)) + + def _read_existing_keys(self, kinds): + keys = set() + for kind in kinds: + req = self._make_query_for_kind(kind) + req['ProjectionExpression'] = '#namespace, #key' + req['ExpressionAttributeNames'] = { + '#namespace': self.PARTITION_KEY, + '#key': self.SORT_KEY + } + paginator = self._client.get_paginator('query') + for resp in paginator.paginate(**req): + for item in resp['Items']: + namespace = item[self.PARTITION_KEY]['S'] + key = item[self.SORT_KEY]['S'] + keys.add((namespace, key)) + return keys + + def _marshal_item(self, kind, item): + json_str = json.dumps(item) + ret = self._make_keys(self._namespace_for_kind(kind), item['key']) + ret[self.VERSION_ATTRIBUTE] = { 'N': str(item['version']) } + ret[self.ITEM_JSON_ATTRIBUTE] = { 'S': json_str } + return ret + + def _unmarshal_item(self, item): + if item is None: + return None + json_attr = item.get(self.ITEM_JSON_ATTRIBUTE) + return None if json_attr is None else json.loads(json_attr['S']) + + +class _DynamoDBHelpers(object): + @staticmethod + def batch_write_requests(client, table_name, requests): + batch_size = 25 + for batch in (requests[i:i+batch_size] for i in range(0, len(requests), batch_size)): + client.batch_write_item(RequestItems={ table_name: batch }) diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py index 155743ea..fccef5b5 100644 --- a/ldclient/feature_store.py +++ b/ldclient/feature_store.py @@ -1,7 +1,55 @@ -from collections import defaultdict +from collections import OrderedDict, defaultdict from ldclient.util import log from ldclient.interfaces import FeatureStore from ldclient.rwlock import ReadWriteLock +from six import iteritems + + +class CacheConfig: + """Encapsulates caching parameters for feature store implementations that support local caching. + """ + + DEFAULT_EXPIRATION = 15 + DEFAULT_CAPACITY = 1000 + + def __init__(self, + expiration = DEFAULT_EXPIRATION, + capacity = DEFAULT_CAPACITY): + """Constructs an instance of CacheConfig. + :param float expiration: The cache TTL, in seconds. Items will be evicted from the cache after + this amount of time from the time when they were originally cached. If the time is less than or + equal to zero, caching is disabled. + :param int capacity: The maximum number of items that can be in the cache at a time. 
+ """ + self._expiration = expiration + self._capacity = capacity + + @staticmethod + def default(): + """Returns an instance of CacheConfig with default properties. By default, caching is enabled. + This is the same as calling the constructor with no parameters. + :rtype: CacheConfig + """ + return CacheConfig() + + @staticmethod + def disabled(): + """Returns an instance of CacheConfig specifying that caching should be disabled. + :rtype: CacheConfig + """ + return CacheConfig(expiration = 0) + + @property + def enabled(self): + return self._expiration > 0 + + @property + def expiration(self): + return self._expiration + + @property + def capacity(self): + return self._capacity class InMemoryFeatureStore(FeatureStore): @@ -79,3 +127,54 @@ def initialized(self): return self._initialized finally: self._lock.runlock() + + +class _FeatureStoreDataSetSorter: + """ + Implements a dependency graph ordering for data to be stored in a feature store. We must use this + on every data set that will be passed to the feature store's init() method. + """ + @staticmethod + def sort_all_collections(all_data): + """ Returns a copy of the input data that has the following guarantees: the iteration order of the outer + dictionary will be in ascending order by the VersionDataKind's :priority property (if any), and for each + data kind that has a "get_dependency_keys" function, the inner dictionary will have an iteration order + where B is before A if A has a dependency on B. + """ + outer_hash = OrderedDict() + kinds = list(all_data.keys()) + def priority_order(kind): + if hasattr(kind, 'priority'): + return kind.priority + return len(kind.namespace) # use arbitrary order if there's no priority + kinds.sort(key=priority_order) + for kind in kinds: + items = all_data[kind] + outer_hash[kind] = _FeatureStoreDataSetSorter._sort_collection(kind, items) + return outer_hash + + @staticmethod + def _sort_collection(kind, input): + if len(input) == 0 or not hasattr(kind, 'get_dependency_keys'): + return input + dependency_fn = kind.get_dependency_keys + if dependency_fn is None or len(input) == 0: + return input + remaining_items = input.copy() + items_out = OrderedDict() + while len(remaining_items) > 0: + # pick a random item that hasn't been updated yet + for key, item in iteritems(remaining_items): + _FeatureStoreDataSetSorter._add_with_dependencies_first(item, dependency_fn, remaining_items, items_out) + break + return items_out + + @staticmethod + def _add_with_dependencies_first(item, dependency_fn, remaining_items, items_out): + key = item.get('key') + del remaining_items[key] # we won't need to visit this item again + for dep_key in dependency_fn(item): + dep_item = remaining_items.get(dep_key) + if dep_item is not None: + _FeatureStoreDataSetSorter._add_with_dependencies_first(dep_item, dependency_fn, remaining_items, items_out) + items_out[key] = item diff --git a/ldclient/feature_store_helpers.py b/ldclient/feature_store_helpers.py new file mode 100644 index 00000000..2ba83713 --- /dev/null +++ b/ldclient/feature_store_helpers.py @@ -0,0 +1,103 @@ +from expiringdict import ExpiringDict + +from ldclient.interfaces import FeatureStore + + +class CachingStoreWrapper(FeatureStore): + """CachingStoreWrapper is a partial implementation of :class:ldclient.interfaces.FeatureStore that + delegates the basic functionality to an implementation of :class:ldclient.interfaces.FeatureStoreCore - + while adding optional caching behavior and other logic that would otherwise be repeated in every + feature store 
implementation. This makes it easier to create new database integrations by implementing + only the database-specific logic. + """ + __INITED_CACHE_KEY__ = "$inited" + + def __init__(self, core, cache_config): + self._core = core + if cache_config.enabled: + self._cache = ExpiringDict(max_len=cache_config.capacity, max_age_seconds=cache_config.expiration) + else: + self._cache = None + self._inited = False + + def init(self, all_data): + self._core.init_internal(all_data) + if self._cache is not None: + self._cache.clear() + for kind, items in all_data.items(): + self._cache[self._all_cache_key(kind)] = self._items_if_not_deleted(items) + for key, item in items.items(): + self._cache[self._item_cache_key(kind, key)] = [item] # note array wrapper + self._inited = True + + def get(self, kind, key, callback=lambda x: x): + if self._cache is not None: + cache_key = self._item_cache_key(kind, key) + cached_item = self._cache.get(cache_key) + # note, cached items are wrapped in an array so we can cache None values + if cached_item is not None: + return callback(self._item_if_not_deleted(cached_item[0])) + item = self._core.get_internal(kind, key) + if self._cache is not None: + self._cache[cache_key] = [item] + return callback(self._item_if_not_deleted(item)) + + def all(self, kind, callback=lambda x: x): + if self._cache is not None: + cache_key = self._all_cache_key(kind) + cached_items = self._cache.get(cache_key) + if cached_items is not None: + return callback(cached_items) + items = self._items_if_not_deleted(self._core.get_all_internal(kind)) + if self._cache is not None: + self._cache[cache_key] = items + return callback(items) + + def delete(self, kind, key, version): + deleted_item = { "key": key, "version": version, "deleted": True } + self.upsert(kind, deleted_item) + + def upsert(self, kind, item): + new_state = self._core.upsert_internal(kind, item) + if self._cache is not None: + self._cache[self._item_cache_key(kind, item.get('key'))] = [new_state] + self._cache.pop(self._all_cache_key(kind), None) + + @property + def initialized(self): + if self._inited: + return True + if self._cache is None: + result = bool(self._core.initialized_internal()) + else: + result = self._cache.get(CachingStoreWrapper.__INITED_CACHE_KEY__) + if result is None: + result = bool(self._core.initialized_internal()) + self._cache[CachingStoreWrapper.__INITED_CACHE_KEY__] = result + if result: + self._inited = True + return result + + @staticmethod + def _item_cache_key(kind, key): + return "{0}:{1}".format(kind.namespace, key) + + @staticmethod + def _all_cache_key(kind): + return kind.namespace + + @staticmethod + def _item_if_not_deleted(item): + if item is not None and item.get('deleted', False): + return None + return item + + @staticmethod + def _items_if_not_deleted(items): + results = {} + if items is not None: + for key, item in items.items(): + if not item.get('deleted', False): + results[key] = item + return results + \ No newline at end of file diff --git a/ldclient/integrations.py b/ldclient/integrations.py new file mode 100644 index 00000000..63c01202 --- /dev/null +++ b/ldclient/integrations.py @@ -0,0 +1,71 @@ +from ldclient.feature_store import CacheConfig +from ldclient.feature_store_helpers import CachingStoreWrapper +from ldclient.dynamodb_feature_store import _DynamoDBFeatureStoreCore +from ldclient.redis_feature_store import _RedisFeatureStoreCore + + +class DynamoDB(object): + """Provides factory methods for integrations between the LaunchDarkly SDK and DynamoDB. 
+ """ + + @staticmethod + def new_feature_store(table_name, + prefix=None, + dynamodb_opts={}, + caching=CacheConfig.default()): + """Creates a DynamoDB-backed implementation of `:class:ldclient.feature_store.FeatureStore`. + + To use this method, you must first install the `boto3` package containing the AWS SDK for Python. + Then, put the object returned by this method into the `feature_store` property of your + client configuration (:class:ldclient.config.Config). + + Note that the DynamoDB table must already exist; the LaunchDarkly SDK does not create the table + automatically, because it has no way of knowing what additional properties (such as permissions + and throughput) you would want it to have. The table must have a partition key called + "namespace" and a sort key called "key", both with a string type. + + By default, the DynamoDB client will try to get your AWS credentials and region name from + environment variables and/or local configuration files, as described in the AWS SDK documentation. + You may also pass configuration settings in `dynamodb_opts`. + + :param string table_name: The name of an existing DynamoDB table + :param string prefix: An optional namespace prefix to be prepended to all DynamoDB keys + :param dict dynamodb_opts: Optional parameters for configuring the DynamoDB client, as defined in + the boto3 API; see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html#boto3.session.Session.client + :param CacheConfig caching: Specifies whether local caching should be enabled and if so, + sets the cache properties; defaults to `CacheConfig.default()` + """ + core = _DynamoDBFeatureStoreCore(table_name, prefix, dynamodb_opts) + return CachingStoreWrapper(core, caching) + + +class Redis(object): + """Provides factory methods for integrations between the LaunchDarkly SDK and Redis. + """ + DEFAULT_URL = 'redis://localhost:6379/0' + DEFAULT_PREFIX = 'launchdarkly' + DEFAULT_MAX_CONNECTIONS = 16 + + @staticmethod + def new_feature_store(url='redis://localhost:6379/0', + prefix='launchdarkly', + max_connections=16, + caching=CacheConfig.default()): + """Creates a Redis-backed implementation of `:class:ldclient.feature_store.FeatureStore`. + + To use this method, you must first install the `redis` package. Then, put the object + returned by this method into the `feature_store` property of your client configuration + (:class:ldclient.config.Config). + + :param string url: The URL of the Redis host; defaults to `DEFAULT_URL` + :param string prefix: A namespace prefix to be prepended to all Redis keys; defaults to + `DEFAULT_PREFIX` + :param int max_connections: The maximum number of Redis connections to keep in the + connection pool; defaults to `DEFAULT_MAX_CONNECTIONS` + :param CacheConfig caching: Specifies whether local caching should be enabled and if so, + sets the cache properties; defaults to `CacheConfig.default()` + """ + core = _RedisFeatureStoreCore(url, prefix, max_connections) + wrapper = CachingStoreWrapper(core, caching) + wrapper.core = core # exposed for testing + return wrapper diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index 39898408..9556bdfc 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -3,64 +3,86 @@ class FeatureStore(object): """ - Stores and retrieves the state of feature flags and related data + A versioned store for feature flags and related objects received from LaunchDarkly. + Implementations should permit concurrent access and updates.
+ + An "object", for `FeatureStore`, is simply a dict of arbitrary data which must have at least + three properties: "key" (its unique key), "version" (the version number provided by + LaunchDarkly), and "deleted" (True if this is a placeholder for a deleted object). + + Delete and upsert requests are versioned-- if the version number in the request is less than + the currently stored version of the object, the request should be ignored. + + These semantics support the primary use case for the store, which synchronizes a collection + of objects based on update messages that may be received out-of-order. """ __metaclass__ = ABCMeta @abstractmethod - def get(self, kind, key, callback): + def get(self, kind, key, callback=lambda x: x): """ - Gets a feature and calls the callback with the feature data to return the result - :param kind: Denotes which collection to access - one of the constants in versioned_data_kind + Retrieves the object to which the specified key is mapped, or None if the key is not found + or the associated object has a "deleted" property of True. The retrieved object, if any (a + dict) can be transformed by the specified callback. + + :param kind: The kind of object to get :type kind: VersionedDataKind - :param key: The key of the object + :param key: The key whose associated object is to be returned :type key: str - :param callback: The function that accepts the retrieved data and returns a transformed value - :type callback: Function that processes the retrieved object once received. - :return: The result of executing callback. + :param callback: A function that accepts the retrieved data and returns a transformed value + :type callback: function + :return: The result of executing callback """ @abstractmethod - def all(self, callback): + def all(self, kind, callback=lambda x: x): """ - Returns all feature flags and their data - :param kind: Denotes which collection to access - one of the constants in versioned_data_kind + Retrieves a dictionary of all associated objects of a given kind. The retrieved dict of keys + to objects can be transformed by the specified callback. + + :param kind: The kind of objects to get :type kind: VersionedDataKind - :param callback: The function that accepts the retrieved data and returns a transformed value - :type callback: Function that processes the retrieved objects once received. - :rtype: The result of executing callback. + :param callback: A function that accepts the retrieved data and returns a transformed value + :type callback: function + :rtype: The result of executing callback """ @abstractmethod def init(self, all_data): """ - Initializes the store with a set of objects. Meant to be called by the UpdateProcessor + Initializes (or re-initializes) the store with the specified set of objects. Any existing entries + will be removed. Implementations can assume that this set of objects is up to date-- there is no + need to perform individual version comparisons between the existing objects and the supplied data. - :param all_data: The features and their data as provided by LD + :param all_data: All objects to be stored :type all_data: dict[VersionedDataKind, dict[str, dict]] """ @abstractmethod def delete(self, kind, key, version): """ - Marks an object as deleted + Deletes the object associated with the specified key, if it exists and its version is less than + the specified version. The object should be replaced in the data store by a + placeholder with the specified version and a "deleted" property of True.
- :param kind: Denotes which collection to access - one of the constants in versioned_data_kind + :param kind: The kind of object to delete :type kind: VersionedDataKind - :param key: The object key + :param key: The key of the object to be deleted :type key: str - :param version: The version of the object to mark as deleted + :param version: The version for the delete operation :type version: int """ @abstractmethod def upsert(self, kind, item): """ - Inserts an object if its version is newer or missing + Updates or inserts the object associated with the specified key. If an item with the same key + already exists, it should update it only if the new item's version property is greater than + the old one. - :param kind: Denotes which collection to access - one of the constants in versioned_data_kind + :param kind: The kind of object to update :type kind: VersionedDataKind - :param item: The object to be inserted or updated - must have key and version properties + :param item: The object to update or insert :type feature: dict """ @@ -73,6 +95,85 @@ def initialized(self): """ +class FeatureStoreCore(object): + """ + `FeatureStoreCore` is an interface for a simplified subset of the functionality of :class:`FeatureStore`, + to be used in conjunction with :class:`feature_store_helpers.CachingStoreWrapper`. This allows + developers of custom `FeatureStore` implementations to avoid repeating logic that would + commonly be needed in any such implementation, such as caching. Instead, they can implement + only `FeatureStoreCore` and then create a `CachingStoreWrapper`. + """ + __metaclass__ = ABCMeta + + @abstractmethod + def get_internal(self, kind, key): + """ + Returns the object to which the specified key is mapped, or None if no such item exists. + The method should not attempt to filter out any items based on their deleted property, + nor to cache any items. + + :param kind: The kind of object to get + :type kind: VersionedDataKind + :param key: The key of the object + :type key: str + :return: The object to which the specified key is mapped, or None + :rtype: dict + """ + + @abstractmethod + def get_all_internal(self, kind): + """ + Returns a dictionary of all associated objects of a given kind. The method should not attempt + to filter out any items based on their deleted property, nor to cache any items. + + :param kind: The kind of objects to get + :type kind: VersionedDataKind + :return: A dictionary of keys to items + :rtype: dict[str, dict] + """ + + @abstractmethod + def init_internal(self, all_data): + """ + Initializes (or re-initializes) the store with the specified set of objects. Any existing entries + will be removed. Implementations can assume that this set of objects is up to date-- there is no + need to perform individual version comparisons between the existing objects and the supplied + data. + + :param all_data: A dictionary of data kinds to item collections + :type all_data: dict[VersionedDataKind, dict[str, dict]] + """ + + @abstractmethod + def upsert_internal(self, kind, item): + """ + Updates or inserts the object associated with the specified key. If an item with the same key + already exists, it should update it only if the new item's version property is greater than + the old one. It should return the final state of the item, i.e.
if the update succeeded then + it returns the item that was passed in, and if the update failed due to the version check + then it returns the item that is currently in the data store (this ensures that + `CachingStoreWrapper` will update the cache correctly). + + :param kind: The kind of object to update + :type kind: VersionedDataKind + :param item: The object to update or insert + :type item: dict + :return: The state of the object after the update + :rtype: dict + """ + + @abstractmethod + def initialized_internal(self): + """ + Returns true if this store has been initialized. In a shared data store, it should be able to + detect this even if init_internal was called in a different process, i.e. the test should be + based on looking at what is in the data store. The method does not need to worry about caching + this value; `CachingStoreWrapper` will only call it when necessary. + + :rtype: bool + """ + + class BackgroundOperation(object): """ Performs a task in the background diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py index 71b7261b..27139567 100644 --- a/ldclient/redis_feature_store.py +++ b/ldclient/redis_feature_store.py @@ -1,45 +1,75 @@ import json -from pprint import pprint -from expiringdict import ExpiringDict -import redis +have_redis = False +try: + import redis + have_redis = True +except ImportError: + pass from ldclient import log -from ldclient.interfaces import FeatureStore -from ldclient.memoized_value import MemoizedValue +from ldclient.feature_store import CacheConfig +from ldclient.feature_store_helpers import CachingStoreWrapper +from ldclient.interfaces import FeatureStore, FeatureStoreCore from ldclient.versioned_data_kind import FEATURES -class ForgetfulDict(dict): - def __setitem__(self, key, value): - pass - +# Note that this class is now just a facade around CachingStoreWrapper, which is in turn delegating +# to _RedisFeatureStoreCore where the actual database logic is. This class was retained for historical +# reasons, to support existing code that calls the RedisFeatureStore constructor. In the future, we +# will migrate away from exposing these concrete classes and use only the factory methods. class RedisFeatureStore(FeatureStore): + """A Redis-backed implementation of :class:`ldclient.feature_store.FeatureStore`. + + This implementation class is deprecated and may be changed or removed in the future. Please use + :func:`ldclient.integrations.Redis.new_feature_store()`.
+ """ def __init__(self, url='redis://localhost:6379/0', prefix='launchdarkly', max_connections=16, expiration=15, capacity=1000): + if not have_redis: + raise NotImplementedError("Cannot use Redis feature store because redis package is not installed") + self.core = _RedisFeatureStoreCore(url, prefix, max_connections) # exposed for testing + self._wrapper = CachingStoreWrapper(self.core, CacheConfig(expiration=expiration, capacity=capacity)) + + def get(self, kind, key, callback = lambda x: x): + return self._wrapper.get(kind, key, callback) + + def all(self, kind, callback): + return self._wrapper.all(kind, callback) + + def init(self, all_data): + return self._wrapper.init(all_data) + + def upsert(self, kind, item): + return self._wrapper.upsert(kind, item) + + def delete(self, kind, key, version): + return self._wrapper.delete(kind, key, version) + + @property + def initialized(self): + return self._wrapper.initialized + +class _RedisFeatureStoreCore(FeatureStoreCore): + def __init__(self, url, prefix, max_connections): + self._prefix = prefix - self._cache = ForgetfulDict() if expiration == 0 else ExpiringDict(max_len=capacity, - max_age_seconds=expiration) self._pool = redis.ConnectionPool.from_url(url=url, max_connections=max_connections) - self._inited = MemoizedValue(lambda: self._query_init()) + self.test_update_hook = None # exposed for testing log.info("Started RedisFeatureStore connected to URL: " + url + " using prefix: " + prefix) def _items_key(self, kind): return "{0}:{1}".format(self._prefix, kind.namespace) - def _cache_key(self, kind, key): - return "{0}:{1}".format(kind.namespace, key) - - def init(self, all_data): + def init_internal(self, all_data): pipe = redis.Redis(connection_pool=self._pool).pipeline() - self._cache.clear() all_count = 0 for kind, items in all_data.items(): @@ -48,85 +78,34 @@ def init(self, all_data): for key, item in items.items(): item_json = json.dumps(item) pipe.hset(base_key, key, item_json) - self._cache[self._cache_key(kind, key)] = item all_count = all_count + len(items) - try: - pipe.execute() - except: - self._cache.clear() - raise + pipe.execute() log.info("Initialized RedisFeatureStore with %d items", all_count) - self._inited.set(True) - def all(self, kind, callback): + def get_all_internal(self, kind): r = redis.Redis(connection_pool=self._pool) - try: - all_items = r.hgetall(self._items_key(kind)) - except BaseException as e: - log.error("RedisFeatureStore: Could not retrieve '%s' from Redis with error: %s. Returning None.", - kind.namespace, e) - return callback(None) + all_items = r.hgetall(self._items_key(kind)) if all_items is None or all_items is "": - log.warn("RedisFeatureStore: call to get all '%s' returned no results. Returning None.", kind.namespace) - return callback(None) + all_items = {} results = {} for key, item_json in all_items.items(): key = key.decode('utf-8') # necessary in Python 3 - item = json.loads(item_json.decode('utf-8')) - if item.get('deleted', False) is False: - results[key] = item - return callback(results) - - def get(self, kind, key, callback=lambda x: x): - item = self._get_even_if_deleted(kind, key, check_cache=True) - if item is not None and item.get('deleted', False) is True: - log.debug("RedisFeatureStore: get returned deleted item %s in '%s'. 
Returning None.", key, kind.namespace) - return callback(None) - return callback(item) - - def _get_even_if_deleted(self, kind, key, check_cache = True): - cacheKey = self._cache_key(kind, key) - if check_cache: - item = self._cache.get(cacheKey) - if item is not None: - # reset ttl - self._cache[cacheKey] = item - return item - - try: - r = redis.Redis(connection_pool=self._pool) - item_json = r.hget(self._items_key(kind), key) - except BaseException as e: - log.error("RedisFeatureStore: Could not retrieve key %s from '%s' with error: %s", - key, kind.namespace, e) - return None + results[key] = json.loads(item_json.decode('utf-8')) + return results + + def get_internal(self, kind, key): + r = redis.Redis(connection_pool=self._pool) + item_json = r.hget(self._items_key(kind), key) if item_json is None or item_json is "": log.debug("RedisFeatureStore: key %s not found in '%s'. Returning None.", key, kind.namespace) return None - item = json.loads(item_json.decode('utf-8')) - self._cache[cacheKey] = item - return item - - def delete(self, kind, key, version): - deleted_item = { "key": key, "version": version, "deleted": True } - self._update_with_versioning(kind, deleted_item) - - def upsert(self, kind, item): - self._update_with_versioning(kind, item) + return json.loads(item_json.decode('utf-8')) - @property - def initialized(self): - return self._inited.get() - - def _query_init(self): - r = redis.Redis(connection_pool=self._pool) - return r.exists(self._items_key(FEATURES)) - - def _update_with_versioning(self, kind, item): + def upsert_internal(self, kind, item): r = redis.Redis(connection_pool=self._pool) base_key = self._items_key(kind) key = item['key'] @@ -135,14 +114,15 @@ def _update_with_versioning(self, kind, item): while True: pipeline = r.pipeline() pipeline.watch(base_key) - old = self._get_even_if_deleted(kind, key, check_cache=False) - self._before_update_transaction(base_key, key) + old = self.get_internal(kind, key) + if self.test_update_hook is not None: + self.test_update_hook(base_key, key) if old and old['version'] >= item['version']: log.debug('RedisFeatureStore: Attempted to %s key: %s version %d with a version that is the same or older: %d in "%s"', 'delete' if item.get('deleted') else 'update', key, old['version'], item['version'], kind.namespace) pipeline.unwatch() - break + return old else: pipeline.multi() pipeline.hset(base_key, key, item_json) @@ -153,8 +133,11 @@ def _update_with_versioning(self, kind, item): except redis.exceptions.WatchError: log.debug("RedisFeatureStore: concurrent modification detected, retrying") continue - self._cache[self._cache_key(kind, key)] = item - break + return item + + def initialized_internal(self): + r = redis.Redis(connection_pool=self._pool) + return r.exists(self._items_key(FEATURES)) def _before_update_transaction(self, base_key, key): # exposed for testing diff --git a/ldclient/versioned_data_kind.py b/ldclient/versioned_data_kind.py index 6df96a32..04acce43 100644 --- a/ldclient/versioned_data_kind.py +++ b/ldclient/versioned_data_kind.py @@ -7,13 +7,22 @@ to add a corresponding constant here and the existing store should be able to handle it. 
""" +# Note that VersionedDataKind without the extra attributes is no longer used in the SDK, +# but it's preserved here for backward compatibility just in case someone else used it VersionedDataKind = namedtuple('VersionedDataKind', ['namespace', 'request_api_path', 'stream_api_path']) -FEATURES = VersionedDataKind(namespace = "features", +VersionedDataKindWithOrdering = namedtuple('VersionedDataKindWithOrdering', + ['namespace', 'request_api_path', 'stream_api_path', 'priority', 'get_dependency_keys']) + +FEATURES = VersionedDataKindWithOrdering(namespace = "features", request_api_path = "/sdk/latest-flags", - stream_api_path = "/flags/") + stream_api_path = "/flags/", + priority = 1, + get_dependency_keys = lambda flag: (p.get('key') for p in flag.get('prerequisites', []))) -SEGMENTS = VersionedDataKind(namespace = "segments", +SEGMENTS = VersionedDataKindWithOrdering(namespace = "segments", request_api_path = "/sdk/latest-segments", - stream_api_path = "/segments/") + stream_api_path = "/segments/", + priority = 0, + get_dependency_keys = None) diff --git a/test-requirements.txt b/test-requirements.txt index 413ef355..88cbbc2e 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,6 +1,7 @@ mock>=2.0.0 pytest>=2.8 redis>=2.10.5 +boto3>=1.9.71 coverage>=4.4 pytest-capturelog>=0.7 pytest-cov>=2.4.0 diff --git a/testing/stub_util.py b/testing/stub_util.py index bcb45ef2..80e53af6 100644 --- a/testing/stub_util.py +++ b/testing/stub_util.py @@ -1,14 +1,13 @@ from email.utils import formatdate from requests.structures import CaseInsensitiveDict -from ldclient.interfaces import EventProcessor, FeatureRequester, UpdateProcessor +from ldclient.interfaces import EventProcessor, FeatureRequester, FeatureStore, UpdateProcessor class MockEventProcessor(EventProcessor): def __init__(self, *_): self._running = False self._events = [] - mock_event_processor = self def stop(self): self._running = False @@ -103,3 +102,27 @@ def is_alive(self): def initialized(self): return True + +class CapturingFeatureStore(FeatureStore): + def init(self, all_data): + self.data = all_data + + def get(self, kind, key, callback=lambda x: x): + pass + + def all(self, kind, callback=lambda x: x): + pass + + def delete(self, kind, key, version): + pass + + def upsert(self, kind, item): + pass + + @property + def initialized(self): + return True + + @property + def received_data(self): + return self.data diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py index 245341ec..8ab8c422 100644 --- a/testing/test_feature_store.py +++ b/testing/test_feature_store.py @@ -1,43 +1,139 @@ +import boto3 import json -from mock import patch import pytest import redis +import time -from ldclient.feature_store import InMemoryFeatureStore +from ldclient.dynamodb_feature_store import _DynamoDBFeatureStoreCore, _DynamoDBHelpers +from ldclient.feature_store import CacheConfig, InMemoryFeatureStore +from ldclient.integrations import DynamoDB, Redis from ldclient.redis_feature_store import RedisFeatureStore from ldclient.versioned_data_kind import FEATURES -def get_log_lines(caplog): - loglines = caplog.records - if callable(loglines): - # records() is a function in older versions of the caplog plugin - loglines = loglines() - return loglines +class InMemoryTester(object): + def init_store(self): + return InMemoryFeatureStore() -class TestFeatureStore: +class RedisTester(object): redis_host = 'localhost' redis_port = 6379 - def in_memory(self): - return InMemoryFeatureStore() + def __init__(self, cache_config): 
+ self._cache_config = cache_config + + def init_store(self): + self._clear_data() + return Redis.new_feature_store(caching=self._cache_config) - def redis_with_local_cache(self): + def _clear_data(self): r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0) r.delete("launchdarkly:features") - return RedisFeatureStore() - def redis_no_local_cache(self): - r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0) - r.delete("launchdarkly:features") - return RedisFeatureStore(expiration=0) - params = [in_memory, redis_with_local_cache, redis_no_local_cache] +class RedisWithDeprecatedConstructorTester(RedisTester): + def init_store(self): + self._clear_data() + return RedisFeatureStore(expiration=(30 if self._cache_config.enabled else 0)) + + +class DynamoDBTester(object): + table_name = 'LD_DYNAMODB_TEST_TABLE' + table_created = False + options = { + 'aws_access_key_id': 'key', # not used by local DynamoDB, but still required + 'aws_secret_access_key': 'secret', + 'endpoint_url': 'http://localhost:8000', + 'region_name': 'us-east-1' + } + + def __init__(self, cache_config): + self._cache_config = cache_config + + def init_store(self): + self._create_table() + self._clear_data() + return DynamoDB.new_feature_store(self.table_name, dynamodb_opts=self.options) + + def _create_table(self): + if self.table_created: + return + client = boto3.client('dynamodb', **self.options) + try: + client.describe_table(TableName=self.table_name) + self.table_created = True + return + except client.exceptions.ResourceNotFoundException: + pass + req = { + 'TableName': self.table_name, + 'KeySchema': [ + { + 'AttributeName': _DynamoDBFeatureStoreCore.PARTITION_KEY, + 'KeyType': 'HASH', + }, + { + 'AttributeName': _DynamoDBFeatureStoreCore.SORT_KEY, + 'KeyType': 'RANGE' + } + ], + 'AttributeDefinitions': [ + { + 'AttributeName': _DynamoDBFeatureStoreCore.PARTITION_KEY, + 'AttributeType': 'S' + }, + { + 'AttributeName': _DynamoDBFeatureStoreCore.SORT_KEY, + 'AttributeType': 'S' + } + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 1, + 'WriteCapacityUnits': 1 + } + } + client.create_table(**req) + while True: + try: + client.describe_table(TableName=self.table_name) + self.table_created = True + return + except client.exceptions.ResourceNotFoundException: + time.sleep(0.5) + + def _clear_data(self): + client = boto3.client('dynamodb', **self.options) + delete_requests = [] + req = { + 'TableName': self.table_name, + 'ConsistentRead': True, + 'ProjectionExpression': '#namespace, #key', + 'ExpressionAttributeNames': { + '#namespace': _DynamoDBFeatureStoreCore.PARTITION_KEY, + '#key': _DynamoDBFeatureStoreCore.SORT_KEY + } + } + for resp in client.get_paginator('scan').paginate(**req): + for item in resp['Items']: + delete_requests.append({ 'DeleteRequest': { 'Key': item } }) + _DynamoDBHelpers.batch_write_requests(client, self.table_name, delete_requests) + + +class TestFeatureStore: + params = [ + InMemoryTester(), + RedisTester(CacheConfig.default()), + RedisTester(CacheConfig.disabled()), + RedisWithDeprecatedConstructorTester(CacheConfig.default()), + RedisWithDeprecatedConstructorTester(CacheConfig.disabled()), + DynamoDBTester(CacheConfig.default()), + DynamoDBTester(CacheConfig.disabled()) + ] @pytest.fixture(params=params) def store(self, request): - return request.param(self) + return request.param.init_store() @staticmethod def make_feature(key, ver): @@ -69,6 +165,9 @@ def base_initialized_store(self, store): }) return store + def 
test_not_initialized_before_init(self, store): + assert store.initialized is False + def test_initialized(self, store): store = self.base_initialized_store(store) assert store.initialized is True @@ -133,8 +232,7 @@ def test_upsert_older_version_after_delete(self, store): class TestRedisFeatureStoreExtraTests: - @patch.object(RedisFeatureStore, '_before_update_transaction') - def test_upsert_race_condition_against_external_client_with_higher_version(self, mock_method): + def test_upsert_race_condition_against_external_client_with_higher_version(self): other_client = redis.StrictRedis(host='localhost', port=6379, db=0) store = RedisFeatureStore() store.init({ FEATURES: {} }) @@ -144,7 +242,7 @@ def hook(base_key, key): if other_version['version'] <= 4: other_client.hset(base_key, key, json.dumps(other_version)) other_version['version'] = other_version['version'] + 1 - mock_method.side_effect = hook + store.core.test_update_hook = hook feature = { u'key': 'flagkey', u'version': 1 } @@ -152,8 +250,7 @@ def hook(base_key, key): result = store.get(FEATURES, 'flagkey', lambda x: x) assert result['version'] == 2 - @patch.object(RedisFeatureStore, '_before_update_transaction') - def test_upsert_race_condition_against_external_client_with_lower_version(self, mock_method): + def test_upsert_race_condition_against_external_client_with_lower_version(self): other_client = redis.StrictRedis(host='localhost', port=6379, db=0) store = RedisFeatureStore() store.init({ FEATURES: {} }) @@ -163,32 +260,10 @@ def hook(base_key, key): if other_version['version'] <= 4: other_client.hset(base_key, key, json.dumps(other_version)) other_version['version'] = other_version['version'] + 1 - mock_method.side_effect = hook + store.core.test_update_hook = hook feature = { u'key': 'flagkey', u'version': 5 } store.upsert(FEATURES, feature) result = store.get(FEATURES, 'flagkey', lambda x: x) assert result['version'] == 5 - - def test_exception_is_handled_in_get(self, caplog): - # This just verifies the fix for a bug that caused an error during exception handling in Python 3 - store = RedisFeatureStore(url='redis://bad') - feature = store.get(FEATURES, 'flagkey') - assert feature is None - loglines = get_log_lines(caplog) - assert len(loglines) == 2 - message = loglines[1].message - assert message.startswith("RedisFeatureStore: Could not retrieve key flagkey from 'features' with error:") - assert "connecting to bad:6379" in message - - def test_exception_is_handled_in_all(self, caplog): - # This just verifies the fix for a bug that caused an error during exception handling in Python 3 - store = RedisFeatureStore(url='redis://bad') - all = store.all(FEATURES, lambda x: x) - assert all is None - loglines = get_log_lines(caplog) - assert len(loglines) == 2 - message = loglines[1].message - assert message.startswith("RedisFeatureStore: Could not retrieve 'features' from Redis") - assert "connecting to bad:6379" in message diff --git a/testing/test_feature_store_helpers.py b/testing/test_feature_store_helpers.py new file mode 100644 index 00000000..77ccb6f8 --- /dev/null +++ b/testing/test_feature_store_helpers.py @@ -0,0 +1,332 @@ +import pytest +from time import sleep + +from ldclient.feature_store import CacheConfig +from ldclient.feature_store_helpers import CachingStoreWrapper +from ldclient.versioned_data_kind import VersionedDataKind + +THINGS = VersionedDataKind(namespace = "things", request_api_path = "", stream_api_path = "") +WRONG_THINGS = VersionedDataKind(namespace = "wrong", request_api_path = "", stream_api_path 
= "") + +def make_wrapper(core, cached): + return CachingStoreWrapper(core, CacheConfig(expiration=30) if cached else CacheConfig.disabled()) + +class MockCore: + def __init__(self): + self.data = {} + self.inited = False + self.inited_query_count = 0 + self.error = None + + def init_internal(self, all_data): + self._maybe_throw() + self.data = {} + for kind, items in all_data.items(): + self.data[kind] = items.copy() + + def get_internal(self, kind, key): + self._maybe_throw() + items = self.data.get(kind) + return None if items is None else items.get(key) + + def get_all_internal(self, kind): + self._maybe_throw() + return self.data.get(kind) + + def upsert_internal(self, kind, item): + self._maybe_throw() + key = item.get('key') + items = self.data.get(kind) + if items is None: + items = {} + self.data[kind] = items + old_item = items.get(key) + if old_item is None or old_item.get('version') < item.get('version'): + items[key] = item + return item + return old_item + + def initialized_internal(self): + self._maybe_throw() + self.inited_query_count = self.inited_query_count + 1 + return self.inited + + def _maybe_throw(self): + if self.error is not None: + raise self.error + + def force_set(self, kind, item): + items = self.data.get(kind) + if items is None: + items = {} + self.data[kind] = items + items[item.get('key')] = item + + def force_remove(self, kind, key): + items = self.data.get(kind) + if items is not None: + items.pop(key, None) + +class CustomError(Exception): + pass + +class TestCachingStoreWrapper: + @pytest.mark.parametrize("cached", [False, True]) + def test_get_item(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + key = "flag" + itemv1 = { "key": key, "version": 1 } + itemv2 = { "key": key, "version": 2 } + + core.force_set(THINGS, itemv1) + assert wrapper.get(THINGS, key) == itemv1 + + core.force_set(THINGS, itemv2) + assert wrapper.get(THINGS, key) == (itemv1 if cached else itemv2) # if cached, we will not see the new underlying value yet + + @pytest.mark.parametrize("cached", [False, True]) + def test_get_deleted_item(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + key = "flag" + itemv1 = { "key": key, "version": 1, "deleted": True } + itemv2 = { "key": key, "version": 2 } + + core.force_set(THINGS, itemv1) + assert wrapper.get(THINGS, key) is None # item is filtered out because deleted is true + + core.force_set(THINGS, itemv2) + assert wrapper.get(THINGS, key) == (None if cached else itemv2) # if cached, we will not see the new underlying value yet + + @pytest.mark.parametrize("cached", [False, True]) + def test_get_missing_item(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + key = "flag" + item = { "key": key, "version": 1 } + + assert wrapper.get(THINGS, key) is None + + core.force_set(THINGS, item) + assert wrapper.get(THINGS, key) == (None if cached else item) # the cache can retain a nil result + + @pytest.mark.parametrize("cached", [False, True]) + def test_get_with_lambda(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + key = "flag" + item = { "key": key, "version": 1 } + modified_item = { "key": key, "version": 99 } + + core.force_set(THINGS, item) + assert wrapper.get(THINGS, key, lambda x: modified_item) == modified_item + + def test_cached_get_uses_values_from_init(self): + core = MockCore() + wrapper = make_wrapper(core, True) + item1 = { "key": "flag1", "version": 1 } + item2 = { "key": "flag2", "version": 1 } + + wrapper.init({ THINGS: { 
item1["key"]: item1, item2["key"]: item2 } }) + core.force_remove(THINGS, item1["key"]) + assert wrapper.get(THINGS, item1["key"]) == item1 + + @pytest.mark.parametrize("cached", [False, True]) + def test_get_can_throw_exception(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + core.error = CustomError() + with pytest.raises(CustomError, message="expected exception"): + wrapper.get(THINGS, "key", lambda x: x) + + @pytest.mark.parametrize("cached", [False, True]) + def test_get_all(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + item1 = { "key": "flag1", "version": 1 } + item2 = { "key": "flag2", "version": 1 } + + core.force_set(THINGS, item1) + core.force_set(THINGS, item2) + assert wrapper.all(THINGS) == { item1["key"]: item1, item2["key"]: item2 } + + core.force_remove(THINGS, item2["key"]) + if cached: + assert wrapper.all(THINGS) == { item1["key"]: item1, item2["key"]: item2 } + else: + assert wrapper.all(THINGS) == { item1["key"]: item1 } + + @pytest.mark.parametrize("cached", [False, True]) + def test_get_all_removes_deleted_items(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + item1 = { "key": "flag1", "version": 1 } + item2 = { "key": "flag2", "version": 1, "deleted": True } + + core.force_set(THINGS, item1) + core.force_set(THINGS, item2) + assert wrapper.all(THINGS) == { item1["key"]: item1 } + + @pytest.mark.parametrize("cached", [False, True]) + def test_get_all_changes_None_to_empty_dict(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + + assert wrapper.all(WRONG_THINGS) == {} + + @pytest.mark.parametrize("cached", [False, True]) + def test_get_all_iwith_lambda(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + extra = { "extra": True } + item1 = { "key": "flag1", "version": 1 } + item2 = { "key": "flag2", "version": 1 } + core.force_set(THINGS, item1) + core.force_set(THINGS, item2) + assert wrapper.all(THINGS, lambda x: dict(x, **extra)) == { + item1["key"]: item1, item2["key"]: item2, "extra": True + } + + def test_cached_get_all_uses_values_from_init(self): + core = MockCore() + wrapper = make_wrapper(core, True) + item1 = { "key": "flag1", "version": 1 } + item2 = { "key": "flag2", "version": 1 } + both = { item1["key"]: item1, item2["key"]: item2 } + + wrapper.init({ THINGS: both }) + core.force_remove(THINGS, item1["key"]) + assert wrapper.all(THINGS) == both + + @pytest.mark.parametrize("cached", [False, True]) + def test_get_all_can_throw_exception(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + core.error = CustomError() + with pytest.raises(CustomError, message="expected exception"): + wrapper.all(THINGS) + + @pytest.mark.parametrize("cached", [False, True]) + def test_upsert_successful(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + key = "flag" + itemv1 = { "key": key, "version": 1 } + itemv2 = { "key": key, "version": 2 } + + wrapper.upsert(THINGS, itemv1) + assert core.data[THINGS][key] == itemv1 + + wrapper.upsert(THINGS, itemv2) + assert core.data[THINGS][key] == itemv2 + + # if we have a cache, verify that the new item is now cached by writing a different value + # to the underlying data - Get should still return the cached item + if cached: + itemv3 = { "key": key, "version": 3 } + core.force_set(THINGS, itemv3) + + assert wrapper.get(THINGS, key) == itemv2 + + def test_cached_upsert_unsuccessful(self): + # This is for an upsert where the data in the store has a higher 
version. In an uncached + # store, this is just a no-op as far as the wrapper is concerned so there's nothing to + # test here. In a cached store, we need to verify that the cache has been refreshed + # using the data that was found in the store. + core = MockCore() + wrapper = make_wrapper(core, True) + key = "flag" + itemv1 = { "key": key, "version": 1 } + itemv2 = { "key": key, "version": 2 } + + wrapper.upsert(THINGS, itemv2) + assert core.data[THINGS][key] == itemv2 + + wrapper.upsert(THINGS, itemv1) + assert core.data[THINGS][key] == itemv2 # value in store remains the same + + itemv3 = { "key": key, "version": 3 } + core.force_set(THINGS, itemv3) # bypasses cache so we can verify that itemv2 is in the cache + assert wrapper.get(THINGS, key) == itemv2 + + @pytest.mark.parametrize("cached", [False, True]) + def test_upsert_can_throw_exception(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + core.error = CustomError() + with pytest.raises(CustomError, message="expected exception"): + wrapper.upsert(THINGS, { "key": "x", "version": 1 }) + + @pytest.mark.parametrize("cached", [False, True]) + def test_delete(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + key = "flag" + itemv1 = { "key": key, "version": 1 } + itemv2 = { "key": key, "version": 2, "deleted": True } + itemv3 = { "key": key, "version": 3 } + + core.force_set(THINGS, itemv1) + assert wrapper.get(THINGS, key) == itemv1 + + wrapper.delete(THINGS, key, 2) + assert core.data[THINGS][key] == itemv2 + + core.force_set(THINGS, itemv3) # make a change that bypasses the cache + assert wrapper.get(THINGS, key) == (None if cached else itemv3) + + @pytest.mark.parametrize("cached", [False, True]) + def test_delete_can_throw_exception(self, cached): + core = MockCore() + wrapper = make_wrapper(core, cached) + core.error = CustomError() + with pytest.raises(CustomError, message="expected exception"): + wrapper.delete(THINGS, "x", 1) + + def test_uncached_initialized_queries_state_only_until_inited(self): + core = MockCore() + wrapper = make_wrapper(core, False) + + assert wrapper.initialized is False + assert core.inited_query_count == 1 + + core.inited = True + assert wrapper.initialized is True + assert core.inited_query_count == 2 + + core.inited = False + assert wrapper.initialized is True + assert core.inited_query_count == 2 + + def test_uncached_initialized_does_not_query_state_if_init_was_called(self): + core = MockCore() + wrapper = make_wrapper(core, False) + + assert wrapper.initialized is False + assert core.inited_query_count == 1 + + wrapper.init({}) + + assert wrapper.initialized is True + assert core.inited_query_count == 1 + + def test_cached_initialized_can_cache_false_result(self): + core = MockCore() + wrapper = CachingStoreWrapper(core, CacheConfig(expiration=0.2)) # use a shorter cache TTL for this test + + assert wrapper.initialized is False + assert core.inited_query_count == 1 + + core.inited = True + assert wrapper.initialized is False + assert core.inited_query_count == 1 + + sleep(0.5) + + assert wrapper.initialized is True + assert core.inited_query_count == 2 + + # From this point on it should remain true and the method should not be called + assert wrapper.initialized is True + assert core.inited_query_count == 2 diff --git a/testing/test_ldclient.py b/testing/test_ldclient.py index 1766386b..a31d2324 100644 --- a/testing/test_ldclient.py +++ b/testing/test_ldclient.py @@ -2,10 +2,10 @@ from ldclient.client import LDClient, Config from 
ldclient.event_processor import NullEventProcessor from ldclient.feature_store import InMemoryFeatureStore -from ldclient.interfaces import FeatureRequester, FeatureStore, UpdateProcessor -from ldclient.versioned_data_kind import FEATURES +from ldclient.interfaces import UpdateProcessor +from ldclient.versioned_data_kind import FEATURES, SEGMENTS import pytest -from testing.stub_util import MockEventProcessor, MockUpdateProcessor +from testing.stub_util import CapturingFeatureStore, MockEventProcessor, MockUpdateProcessor from testing.sync_util import wait_until try: @@ -259,3 +259,58 @@ def test_event_for_existing_feature_with_no_user_key(): def test_secure_mode_hash(): user = {'key': 'Message'} assert offline_client.secure_mode_hash(user) == "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597" + + +dependency_ordering_test_data = { + FEATURES: { + "a": { "key": "a", "prerequisites": [ { "key": "b" }, { "key": "c" } ] }, + "b": { "key": "b", "prerequisites": [ { "key": "c" }, { "key": "e" } ] }, + "c": { "key": "c" }, + "d": { "key": "d" }, + "e": { "key": "e" }, + "f": { "key": "f" } + }, + SEGMENTS: { + "o": { "key": "o" } + } +} + +class DependencyOrderingDataUpdateProcessor(UpdateProcessor): + def __init__(self, config, store, ready): + store.init(dependency_ordering_test_data) + ready.set() + + def start(self): + pass + + def initialized(self): + return True + + +def test_store_data_set_ordering(): + store = CapturingFeatureStore() + config = Config(sdk_key = 'SDK_KEY', send_events=False, feature_store=store, + update_processor_class=DependencyOrderingDataUpdateProcessor) + LDClient(config=config) + + data = store.received_data + assert data is not None + assert len(data) == 2 + keys = list(data.keys()) + values = list(data.values()) + + assert keys[0] == SEGMENTS + assert len(values[0]) == len(dependency_ordering_test_data[SEGMENTS]) + + assert keys[1] == FEATURES + flags_map = values[1] + flags_list = list(flags_map.values()) + assert len(flags_list) == len(dependency_ordering_test_data[FEATURES]) + for item_index, item in enumerate(flags_list): + for prereq in item.get("prerequisites", []): + prereq_item = flags_map[prereq["key"]] + prereq_index = flags_list.index(prereq_item) + if prereq_index > item_index: + all_keys = (f["key"] for f in flags_list) + raise Exception("%s depends on %s, but %s was listed first; keys in order are [%s]" % + (item["key"], prereq["key"], item["key"], ", ".join(all_keys))) diff --git a/testing/test_ldclient_evaluation.py b/testing/test_ldclient_evaluation.py index 46c48756..be925a5c 100644 --- a/testing/test_ldclient_evaluation.py +++ b/testing/test_ldclient_evaluation.py @@ -4,6 +4,7 @@ from ldclient.client import LDClient, Config from ldclient.feature_store import InMemoryFeatureStore from ldclient.flag import EvaluationDetail +from ldclient.interfaces import FeatureStore from ldclient.versioned_data_kind import FEATURES from testing.stub_util import MockEventProcessor, MockUpdateProcessor from testing.test_ldclient import make_off_flag_with_value @@ -28,6 +29,26 @@ 'debugEventsUntilDate': 1000 } +class ErroringFeatureStore(FeatureStore): + def get(self, kind, key, callback=lambda x: x): + raise NotImplementedError() + + def all(self, kind, callback=lambda x: x): + raise NotImplementedError() + + def upsert(self, kind, item): + pass + + def delete(self, key, version): + pass + + def init(self, data): + pass + + @property + def initialized(self): + return True + def make_client(store): return LDClient(config=Config(sdk_key='SDK_KEY', 
base_uri='http://test', @@ -35,6 +56,14 @@ def make_client(store): update_processor_class=MockUpdateProcessor, feature_store=store)) +def get_log_lines(caplog, level): + loglines = caplog.records + if callable(loglines): + # records() is a function in older versions of the caplog plugin + loglines = loglines() + return [line.message for line in loglines if line.levelname == level] + + def test_variation_for_existing_feature(): feature = make_off_flag_with_value('feature.key', 'value') store = InMemoryFeatureStore() @@ -116,6 +145,23 @@ def test_variation_detail_for_flag_that_evaluates_to_none(): assert expected == actual assert actual.is_default_value() == True +def test_variation_when_feature_store_throws_error(caplog): + store = ErroringFeatureStore() + client = make_client(store) + assert client.variation('feature.key', { "key": "user" }, default='default') == 'default' + errlog = get_log_lines(caplog, 'ERROR') + assert errlog == [ 'Unexpected error while retrieving feature flag "feature.key": NotImplementedError()' ] + +def test_variation_detail_when_feature_store_throws_error(caplog): + store = ErroringFeatureStore() + client = make_client(store) + expected = EvaluationDetail('default', None, {'kind': 'ERROR', 'errorKind': 'EXCEPTION'}) + actual = client.variation_detail('feature.key', { "key": "user" }, default='default') + assert expected == actual + assert actual.is_default_value() == True + errlog = get_log_lines(caplog, 'ERROR') + assert errlog == [ 'Unexpected error while retrieving feature flag "feature.key": NotImplementedError()' ] + def test_all_flags_returns_values(): store = InMemoryFeatureStore() store.init({ FEATURES: { 'key1': flag1, 'key2': flag2 } }) @@ -137,6 +183,13 @@ def test_all_flags_returns_none_if_user_has_no_key(): result = client.all_flags({ }) assert result is None +def test_all_flags_returns_none_if_feature_store_throws_error(caplog): + store = ErroringFeatureStore() + client = make_client(store) + assert client.all_flags({ "key": "user" }) is None + errlog = get_log_lines(caplog, 'ERROR') + assert errlog == [ 'Unable to read flags for all_flag_state: NotImplementedError()' ] + def test_all_flags_state_returns_state(): store = InMemoryFeatureStore() store.init({ FEATURES: { 'key1': flag1, 'key2': flag2 } }) @@ -297,3 +350,11 @@ def test_all_flags_state_returns_empty_state_if_user_has_no_key(): client = make_client(store) state = client.all_flags_state({ }) assert state.valid == False + +def test_all_flags_returns_empty_state_if_feature_store_throws_error(caplog): + store = ErroringFeatureStore() + client = make_client(store) + state = client.all_flags_state({ "key": "user" }) + assert state.valid == False + errlog = get_log_lines(caplog, 'ERROR') + assert errlog == [ 'Unable to read flags for all_flag_state: NotImplementedError()' ]
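
Note on the store-wrapper protocol exercised above: the `MockCore` test double in `test_feature_store_helpers.py` implements the same five methods (`init_internal`, `get_internal`, `get_all_internal`, `upsert_internal`, `initialized_internal`) that a real database-backed core exposes to `CachingStoreWrapper`. The sketch below is illustrative only and is not part of this change set; `DictStoreCore` and its in-memory layout are hypothetical stand-ins for a real database client, while `CachingStoreWrapper`, `CacheConfig`, and `FEATURES` are the classes used in the tests above.

```python
# Minimal sketch of plugging a custom store "core" into CachingStoreWrapper,
# following the protocol the MockCore test double exercises above.
# DictStoreCore is a hypothetical example, not an existing ldclient API.
from ldclient.feature_store import CacheConfig
from ldclient.feature_store_helpers import CachingStoreWrapper
from ldclient.versioned_data_kind import FEATURES


class DictStoreCore(object):
    """Hypothetical core that keeps data in a plain dict, standing in for a real
    database client. CachingStoreWrapper layers caching, deleted-item filtering,
    and 'initialized' bookkeeping on top of these five methods."""

    def __init__(self):
        self._data = {}
        self._inited = False

    def init_internal(self, all_data):
        # Replace all stored data with the given set, keyed by data kind.
        self._data = {kind: dict(items) for kind, items in all_data.items()}
        self._inited = True

    def get_internal(self, kind, key):
        return self._data.get(kind, {}).get(key)

    def get_all_internal(self, kind):
        return self._data.get(kind, {})

    def upsert_internal(self, kind, item):
        # Only overwrite an existing item if the new version is higher;
        # return whichever item ends up in the store.
        items = self._data.setdefault(kind, {})
        old = items.get(item['key'])
        if old is None or old['version'] < item['version']:
            items[item['key']] = item
            return item
        return old

    def initialized_internal(self):
        return self._inited


# Wrap the core; reads are then served from a 30-second local cache.
store = CachingStoreWrapper(DictStoreCore(), CacheConfig(expiration=30))
store.init({FEATURES: {'my-flag': {'key': 'my-flag', 'version': 1}}})
print(store.get(FEATURES, 'my-flag'))  # -> {'key': 'my-flag', 'version': 1}
```

Under this arrangement the wrapper, not the core, is responsible for local caching, filtering out deleted items, and remembering a positive `initialized` result, which is the division of labor the tests in `test_feature_store_helpers.py` verify.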