diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 437ea54d..b564861d 100644
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -18,6 +18,6 @@ Development information (for developing this module itself)
     pip install -r test-requirements.txt
     pip install -r twisted-requirements.txt
 
-1. Run tests:
+1. Run tests (you'll need Redis running locally on its default port, 6379):
 
     $ py.test testing
diff --git a/README.md b/README.md
index 75b51ead..1e36ff20 100644
--- a/README.md
+++ b/README.md
@@ -13,9 +13,14 @@ Quick setup
 
     pip install ldclient-py
 
-2. Create a new LDClient with your API key:
+2. Configure the library with your API key:
 
-        client = LDClient("your_api_key")
+        import ldclient
+        ldclient.api_key = "your_api_key"
+
+3. Get the client:
+
+        client = ldclient.get()
 
 Your first feature flag
 -----------------------
diff --git a/circle.yml b/circle.yml
index 393d32cf..7250c793 100644
--- a/circle.yml
+++ b/circle.yml
@@ -1,3 +1,6 @@
+machine:
+  services:
+    - redis
 dependencies:
   pre:
     - pyenv shell 2.7.10; $(pyenv which pip) install --upgrade pip setuptools
diff --git a/demo/demo.py b/demo/demo.py
index 4b57bd35..9cf4e3fc 100644
--- a/demo/demo.py
+++ b/demo/demo.py
@@ -1,7 +1,25 @@
 from __future__ import print_function
-from ldclient import LDClient
+
+import logging
+import sys
+
+import ldclient
+
+root = logging.getLogger()
+root.setLevel(logging.DEBUG)
+
+ch = logging.StreamHandler(sys.stdout)
+ch.setLevel(logging.DEBUG)
+formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+ch.setFormatter(formatter)
+root.addHandler(ch)
 
 if __name__ == '__main__':
-    apiKey = 'feefifofum'
-    client = LDClient(apiKey)
-    print(client.api_key)
+    ldclient.api_key = 'api_key'
+    ldclient.start_wait = 10
+    client = ldclient.get()
+
+    user = {u'key': 'userKey'}
+    print(client.toggle("update-app", user, False))
+
+    client.close()
diff --git a/ldclient/__init__.py b/ldclient/__init__.py
index 97ebd5ec..0d80a640 100644
--- a/ldclient/__init__.py
+++ b/ldclient/__init__.py
@@ -1,7 +1,9 @@
-from .client import *
+import logging
+
+from ldclient.rwlock import ReadWriteLock
 from ldclient.version import VERSION
+from .client import *
 from .util import log
-import logging
 
 __version__ = VERSION
 
@@ -11,6 +13,34 @@
                    "firstName", "lastName", "avatar", "name", "anonymous"]
 
+# Module-level settings: set these before the first call to get().
+client = None
+api_key = None
+start_wait = 5
+config = Config()
+
+_lock = ReadWriteLock()
+
+
+def get():
+    global client
+    try:
+        _lock.rlock()
+        if client:
+            return client
+    finally:
+        _lock.runlock()
+
+    try:
+        _lock.lock()
+        if not client:
+            log.debug("Initializing LaunchDarkly Client")
+            client = LDClient(api_key, config, start_wait)
+        return client
+    finally:
+        _lock.unlock()
+
+
 # Add a NullHandler for Python < 2.7 compatibility
 class NullHandler(logging.Handler):
diff --git a/ldclient/client.py b/ldclient/client.py
index 63daefe5..60a0ecd6 100644
--- a/ldclient/client.py
+++ b/ldclient/client.py
@@ -1,12 +1,18 @@
 from __future__ import division, with_statement, absolute_import
-from builtins import object
+
+import threading
 import time
 
-from ldclient.interfaces import FeatureStore
-from ldclient.requests import RequestsStreamProcessor, RequestsEventConsumer, RequestsFeatureRequester
-from ldclient.util import check_uwsgi, _evaluate, log
 import requests
+from builtins import object
+from ldclient.event_consumer import EventConsumerImpl
+from ldclient.feature_requester import FeatureRequesterImpl
+from ldclient.feature_store import InMemoryFeatureStore
+from ldclient.interfaces import FeatureStore
+from ldclient.polling import PollingUpdateProcessor
+from ldclient.streaming import StreamingUpdateProcessor
+from ldclient.util import check_uwsgi, _evaluate, log
 
 # noinspection PyBroadException
 try:
@@ -18,58 +24,66 @@
 from cachecontrol import CacheControl
 from threading import Lock
 
-from ldclient.rwlock import ReadWriteLock
+GET_LATEST_FEATURES_PATH = '/api/eval/latest-features'
+STREAM_FEATURES_PATH = '/features'
 
 
 class Config(object):
-
     def __init__(self,
                  base_uri='https://app.launchdarkly.com',
                  events_uri='https://events.launchdarkly.com',
                  connect_timeout=2,
                  read_timeout=10,
-                 upload_limit=100,
-                 capacity=10000,
+                 events_upload_max_batch_size=100,
+                 events_max_pending=10000,
                  stream_uri='https://stream.launchdarkly.com',
                  stream=True,
-                 verify=True,
+                 verify_ssl=True,
                  defaults=None,
-                 events=True,
-                 stream_processor_class=None,
-                 feature_store_class=None,
+                 events_enabled=True,
+                 update_processor_class=None,
+                 poll_interval=1,
+                 use_ldd=False,
+                 feature_store=None,
                  feature_requester_class=None,
-                 consumer_class=None):
+                 event_consumer_class=None,
+                 offline=False):
        """
-        :param stream_processor_class: A factory for a StreamProcessor implementation taking the api key, config,
+        :param update_processor_class: A factory for an UpdateProcessor implementation taking the api key, config, feature requester,
          and FeatureStore implementation
-        :type stream_processor_class: (str, Config, FeatureStore) -> StreamProcessor
-        :param feature_store_class: A factory for a FeatureStore implementation
-        :type feature_store_class: () -> FeatureStore
+        :type update_processor_class: (str, Config, FeatureRequester, FeatureStore, threading.Event) -> UpdateProcessor
+        :param feature_store: A FeatureStore implementation; defaults to an InMemoryFeatureStore
+        :type feature_store: FeatureStore
        :param feature_requester_class: A factory for a FeatureRequester implementation taking the api key and config
-        :type feature_requester_class: (str, Config) -> FeatureRequester
-        :param consumer_class: A factory for an EventConsumer implementation taking the event queue, api key, and config
+        :type feature_requester_class: (str, Config) -> FeatureRequester
+        :param event_consumer_class: A factory for an EventConsumer implementation taking the event queue, api key, and config
+        :type event_consumer_class: (queue.Queue, str, Config) -> EventConsumer
        """
         if defaults is None:
             defaults = {}
 
         self.base_uri = base_uri.rstrip('\\')
-        self.events_uri = events_uri.rstrip('\\')
-        self.stream_uri = stream_uri.rstrip('\\')
+        self.get_latest_features_uri = self.base_uri + GET_LATEST_FEATURES_PATH
+        self.events_uri = events_uri.rstrip('\\') + '/bulk'
+        self.stream_uri = stream_uri.rstrip('\\') + STREAM_FEATURES_PATH
+        self.update_processor_class = update_processor_class
         self.stream = stream
-        self.stream_processor_class = RequestsStreamProcessor if not stream_processor_class else stream_processor_class
-        self.feature_store_class = InMemoryFeatureStore if not feature_store_class else feature_store_class
-        self.consumer_class = RequestsEventConsumer if not consumer_class else consumer_class
-        self.feature_requester_class = RequestsFeatureRequester if not feature_requester_class else \
-            feature_requester_class
-        self.connect = connect_timeout
-        self.read = read_timeout
-        self.upload_limit = upload_limit
-        self.capacity = capacity
-        self.verify = verify
+        if poll_interval < 1:
+            poll_interval = 1
+        self.poll_interval = poll_interval
+        self.use_ldd = use_ldd
+        self.feature_store = InMemoryFeatureStore() if not feature_store else feature_store
+        self.event_consumer_class = EventConsumerImpl if not event_consumer_class else event_consumer_class
+        self.feature_requester_class = feature_requester_class
+        self.connect_timeout = connect_timeout
+        self.read_timeout = read_timeout
+        self.events_enabled = events_enabled
+        self.events_upload_max_batch_size = events_upload_max_batch_size
+        self.events_max_pending = events_max_pending
+        self.verify_ssl = verify_ssl
         self.defaults = defaults
-        self.events = events
+        self.offline = offline
 
     def get_default(self, key, default):
         return default if key not in self.defaults else self.defaults[key]
@@ -79,117 +93,81 @@ def default(cls):
         return cls()
 
 
-class InMemoryFeatureStore(FeatureStore):
-
-    def __init__(self):
-        self._lock = ReadWriteLock()
-        self._initialized = False
-        self._features = {}
-
-    def get(self, key):
-        try:
-            self._lock.rlock()
-            f = self._features.get(key)
-            if f is None or 'deleted' in f and f['deleted']:
-                return None
-            return f
-        finally:
-            self._lock.runlock()
-
-    def all(self):
-        try:
-            self._lock.rlock()
-            return dict((k, f) for k, f in self._features.items() if ('deleted' not in f) or not f['deleted'])
-        finally:
-            self._lock.runlock()
-
-    def init(self, features):
-        try:
-            self._lock.lock()
-            self._features = dict(features)
-            self._initialized = True
-        finally:
-            self._lock.unlock()
-
-    # noinspection PyShadowingNames
-    def delete(self, key, version):
-        try:
-            self._lock.lock()
-            f = self._features.get(key)
-            if f is not None and f['version'] < version:
-                f['deleted'] = True
-                f['version'] = version
-            elif f is None:
-                f = {'deleted': True, 'version': version}
-                self._features[key] = f
-        finally:
-            self._lock.unlock()
-
-    def upsert(self, key, feature):
-        try:
-            self._lock.lock()
-            f = self._features.get(key)
-            if f is None or f['version'] < feature['version']:
-                self._features[key] = feature
-                log.debug("Updated feature {} to version {}".format(key, feature['version']))
-        finally:
-            self._lock.unlock()
-
-    @property
-    def initialized(self):
-        try:
-            self._lock.rlock()
-            return self._initialized
-        finally:
-            self._lock.runlock()
-
-
 class LDClient(object):
-
-    def __init__(self, api_key, config=None):
+    def __init__(self, api_key, config=None, start_wait=5):
         check_uwsgi()
         self._api_key = api_key
         self._config = config or Config.default()
         self._session = CacheControl(requests.Session())
-        self._queue = queue.Queue(self._config.capacity)
-        self._consumer = None
-        self._offline = False
+        self._queue = queue.Queue(self._config.events_max_pending)
+        self._event_consumer = None
         self._lock = Lock()
 
-        self._store = self._config.feature_store_class()
+        self._store = self._config.feature_store
         """ :type: FeatureStore """
 
-        self._feature_requester = self._config.feature_requester_class(
-            api_key, self._config)
+        if self._config.offline:
+            log.info("Started LaunchDarkly Client in offline mode")
+            return
+
+        if self._config.events_enabled:
+            self._event_consumer = self._config.event_consumer_class(
+                self._queue, self._api_key, self._config)
+            self._event_consumer.start()
+
+        if self._config.use_ldd:
+            if self._store.__class__.__name__ == "RedisFeatureStore":
+                log.info("Started LaunchDarkly Client in LDD mode")
+                return
+            log.error("LDD mode requires a RedisFeatureStore.")
+            return
+
+        if self._config.feature_requester_class:
+            self._feature_requester = self._config.feature_requester_class(
+                api_key, self._config)
+        else:
+            self._feature_requester = FeatureRequesterImpl(api_key, self._config)
         """ :type: FeatureRequester """
 
-        self._stream_processor = None
-        if self._config.stream:
-            
self._stream_processor = self._config.stream_processor_class( - api_key, self._config, self._store) - self._stream_processor.start() + update_processor_ready = threading.Event() + + if self._config.update_processor_class: + self._update_processor = self._config.update_processor_class( + api_key, self._config, self._feature_requester, self._store, update_processor_ready) + else: + if self._config.stream: + self._update_processor = StreamingUpdateProcessor( + api_key, self._config, self._feature_requester, self._store, update_processor_ready) + else: + self._update_processor = PollingUpdateProcessor( + api_key, self._config, self._feature_requester, self._store, update_processor_ready) + """ :type: UpdateProcessor """ + + self._update_processor.start() + log.info("Waiting up to " + str(start_wait) + " seconds for LaunchDarkly client to initialize...") + update_processor_ready.wait(start_wait) + + if self._update_processor.initialized: + log.info("Started LaunchDarkly Client: OK") + else: + log.info("Initialization timeout exceeded for LaunchDarkly Client. Feature Flags may not yet be available.") @property def api_key(self): return self._api_key - def _check_consumer(self): - with self._lock: - if not self._consumer or not self._consumer.is_alive(): - self._consumer = self._config.consumer_class( - self._queue, self._api_key, self._config) - self._consumer.start() - - def _stop_consumers(self): - if self._consumer and self._consumer.is_alive(): - self._consumer.stop() - if self._stream_processor and self._stream_processor.is_alive(): - self._stream_processor.stop() - - def _send(self, event): - if self._offline or not self._config.events: + def close(self): + log.info("Closing LaunchDarkly client..") + if self.is_offline(): + return + if self._event_consumer and self._event_consumer.is_alive(): + self._event_consumer.stop() + if self._update_processor and self._update_processor.is_alive(): + self._update_processor.stop() + + def _send_event(self, event): + if self._config.offline or not self._config.events_enabled: return - self._check_consumer() event['creationDate'] = int(time.time() * 1000) if self._queue.full(): log.warning("Event queue is full-- dropped an event") @@ -198,64 +176,61 @@ def _send(self, event): def track(self, event_name, user, data=None): self._sanitize_user(user) - self._send({'kind': 'custom', 'key': event_name, + self._send_event({'kind': 'custom', 'key': event_name, 'user': user, 'data': data}) def identify(self, user): self._sanitize_user(user) - self._send({'kind': 'identify', 'key': user['key'], 'user': user}) - - def set_offline(self): - self._offline = True - self._stop_consumers() - - def set_online(self): - self._offline = False - self._check_consumer() + self._send_event({'kind': 'identify', 'key': user['key'], 'user': user}) def is_offline(self): - return self._offline + return self._config.offline def flush(self): - if self._offline: + if self._config.offline or not self._config.events_enabled: return - self._check_consumer() - return self._consumer.flush() + return self._event_consumer.flush() def get_flag(self, key, user, default=False): return self.toggle(key, user, default) def toggle(self, key, user, default=False): - self._sanitize_user(user) default = self._config.get_default(key, default) - if self._offline: + def send_event(value): + self._send_event({'kind': 'feature', 'key': key, + 'user': user, 'value': value, 'default': default}) + + if self._config.offline: return default - def cb(feature): - if feature is None: - val = default - else: - val = 
_evaluate(feature, user) - if val is None: - val = default - self._send({'kind': 'feature', 'key': key, - 'user': user, 'value': val, 'default': default}) - return val - - if self._config.stream and self._store.initialized: - return cb(self._store.get(key)) + self._sanitize_user(user) + + if 'key' in user and user['key']: + feature = self._store.get(key) + else: + send_event(default) + log.warning("Missing or empty User key when evaluating Feature Flag key: " + key + ". Returning default.") + return default + + if feature: + val = _evaluate(feature, user) else: - # noinspection PyBroadException - try: - return self._feature_requester.get(key, cb) - except Exception: - log.exception( - 'Unhandled exception. Returning default value for flag.') - return cb(None) + log.warning("Feature Flag key: " + key + " not found in Feature Store. Returning default.") + send_event(default) + return default + + if val is None: + send_event(default) + log.warning("Feature Flag key: " + key + " evaluation returned None. Returning default.") + return default + + send_event(val) + return val def _sanitize_user(self, user): if 'key' in user: user['key'] = str(user['key']) + __all__ = ['LDClient', 'Config'] diff --git a/ldclient/event_consumer.py b/ldclient/event_consumer.py new file mode 100644 index 00000000..98e94dc9 --- /dev/null +++ b/ldclient/event_consumer.py @@ -0,0 +1,102 @@ +from __future__ import absolute_import + +import errno +import json +from threading import Thread + +import requests +from requests.packages.urllib3.exceptions import ProtocolError + +from ldclient.interfaces import EventConsumer +from ldclient.util import _headers +from ldclient.util import log + + +class EventConsumerImpl(Thread, EventConsumer): + def __init__(self, event_queue, api_key, config): + Thread.__init__(self) + self._session = requests.Session() + self.daemon = True + self._api_key = api_key + self._config = config + self._queue = event_queue + self._running = True + + def run(self): + log.debug("Starting event consumer") + self._running = True + while self._running: + self.send() + + def stop(self): + self._running = False + + def flush(self): + self._queue.join() + + def send_batch(self, events): + def do_send(should_retry): + # noinspection PyBroadException + try: + if isinstance(events, dict): + body = [events] + else: + body = events + hdrs = _headers(self._api_key) + uri = self._config.events_uri + r = self._session.post(uri, + headers=hdrs, + timeout=(self._config.connect_timeout, self._config.read_timeout), + data=json.dumps(body)) + r.raise_for_status() + except ProtocolError as e: + inner = e.args[1] + if inner.errno == errno.ECONNRESET and should_retry: + log.warning( + 'ProtocolError exception caught while sending events. Retrying.') + do_send(False) + else: + log.exception( + 'Unhandled exception in event consumer. Analytics events were not processed.') + except: + log.exception( + 'Unhandled exception in event consumer. 
Analytics events were not processed.') + + try: + do_send(True) + finally: + for _ in events: + self._queue.task_done() + + def send(self): + events = self.next() + + if len(events) == 0: + return + else: + self.send_batch(events) + + def next(self): + q = self._queue + items = [] + + item = self.next_item() + if item is None: + return items + + items.append(item) + while len(items) < self._config.events_upload_max_batch_size and not q.empty(): + item = self.next_item() + if item: + items.append(item) + + return items + + def next_item(self): + q = self._queue + # noinspection PyBroadException + try: + item = q.get(block=True, timeout=5) + return item + except Exception: + return None diff --git a/ldclient/feature_requester.py b/ldclient/feature_requester.py new file mode 100644 index 00000000..1c72c34a --- /dev/null +++ b/ldclient/feature_requester.py @@ -0,0 +1,34 @@ +from __future__ import absolute_import + +import requests +from cachecontrol import CacheControl + +from ldclient.interfaces import FeatureRequester +from ldclient.util import _headers + + +class FeatureRequesterImpl(FeatureRequester): + def __init__(self, api_key, config): + self._api_key = api_key + self._session = CacheControl(requests.Session()) + self._config = config + + def get_all(self): + hdrs = _headers(self._api_key) + uri = self._config.get_latest_features_uri + r = self._session.get(uri, headers=hdrs, timeout=( + self._config.connect_timeout, self._config.read_timeout)) + r.raise_for_status() + features = r.json() + return features + + def get_one(self, key): + hdrs = _headers(self._api_key) + uri = self._config.get_latest_features_uri + '/' + key + r = self._session.get(uri, + headers=hdrs, + timeout=(self._config.connect_timeout, + self._config.read_timeout)) + r.raise_for_status() + feature = r.json() + return feature diff --git a/ldclient/feature_store.py b/ldclient/feature_store.py new file mode 100644 index 00000000..f24335d2 --- /dev/null +++ b/ldclient/feature_store.py @@ -0,0 +1,73 @@ +from ldclient.util import log +from ldclient.interfaces import FeatureStore +from ldclient.rwlock import ReadWriteLock + + +class InMemoryFeatureStore(FeatureStore): + + def __init__(self): + self._lock = ReadWriteLock() + self._initialized = False + self._features = {} + + def get(self, key): + try: + self._lock.rlock() + f = self._features.get(key) + if f is None: + log.debug("Attempted to get missing feature: " + str(key) + " Returning None") + return None + if 'deleted' in f and f['deleted']: + log.debug("Attempted to get deleted feature: " + str(key) + " Returning None") + return None + return f + finally: + self._lock.runlock() + + def all(self): + try: + self._lock.rlock() + return dict((k, f) for k, f in self._features.items() if ('deleted' not in f) or not f['deleted']) + finally: + self._lock.runlock() + + def init(self, features): + try: + self._lock.lock() + self._features = dict(features) + self._initialized = True + log.debug("Initialized feature store with " + str(len(features)) + " features") + finally: + self._lock.unlock() + + # noinspection PyShadowingNames + def delete(self, key, version): + try: + self._lock.lock() + f = self._features.get(key) + if f is not None and f['version'] < version: + f['deleted'] = True + f['version'] = version + elif f is None: + f = {'deleted': True, 'version': version} + self._features[key] = f + finally: + self._lock.unlock() + + def upsert(self, key, feature): + try: + self._lock.lock() + f = self._features.get(key) + if f is None or f['version'] < feature['version']: + 
self._features[key] = feature + log.debug("Updated feature {} to version {}".format(key, feature['version'])) + finally: + self._lock.unlock() + + @property + def initialized(self): + try: + self._lock.rlock() + return self._initialized + finally: + self._lock.runlock() diff --git a/ldclient/interfaces.py b/ldclient/interfaces.py index 5e919d1f..d6504503 100644 --- a/ldclient/interfaces.py +++ b/ldclient/interfaces.py @@ -29,7 +29,7 @@ def all(self): @abstractmethod def init(self, features): """ - Initializes the store with a set of feature flags. Meant to be called by the optional StreamProcessor + Initializes the store with a set of feature flags. Meant to be called by the UpdateProcessor :param features: The features and their data as provided by LD :type features: dict[str, dict] @@ -93,12 +93,18 @@ def is_alive(self): return True -class StreamProcessor(BackgroundOperation): +class UpdateProcessor(BackgroundOperation): """ - Populates a store from an external data source + Responsible for retrieving Feature Flag updates from LaunchDarkly and saving them to the feature store """ __metaclass__ = ABCMeta + def initialized(self): + """ + Returns whether the update processor has received feature flags and has initialized its feature store. + :rtype: bool + """ + class EventConsumer(BackgroundOperation): """ @@ -115,18 +121,19 @@ def flush(self): class FeatureRequester(object): """ - Requests features if they aren't in the store + Requests features. """ __metaclass__ = ABCMeta - @abstractmethod - def get(self, key, callback): + def get_all(self): """ - Gets a feature and calls the callback with the feature data to return the result + Gets all feature flags. + """ + pass - :param key: The feature key - :type key: str - :param callback: The function that accepts the feature data and returns the feature value - :type callback: function - :return: The feature value. 
None if not found
+    def get_one(self, key):
         """
+        Gets a single feature flag by key.
+        :return: the feature flag data
+        """
+        pass
diff --git a/ldclient/noop.py b/ldclient/noop.py
deleted file mode 100644
index 4b497088..00000000
--- a/ldclient/noop.py
+++ /dev/null
@@ -1,10 +0,0 @@
-from ldclient.interfaces import FeatureRequester
-
-
-class NoOpFeatureRequester(FeatureRequester):
-
-    def __init__(self, *_):
-        pass
-
-    def get(self, key, callback):
-        return None
diff --git a/ldclient/polling.py b/ldclient/polling.py
new file mode 100644
index 00000000..dace8724
--- /dev/null
+++ b/ldclient/polling.py
@@ -0,0 +1,38 @@
+from threading import Thread
+
+from ldclient.interfaces import UpdateProcessor
+from ldclient.util import log
+import time
+
+
+class PollingUpdateProcessor(Thread, UpdateProcessor):
+    def __init__(self, api_key, config, requester, store, ready):
+        Thread.__init__(self)
+        self.daemon = True
+        self._api_key = api_key
+        self._config = config
+        self._requester = requester
+        self._store = store
+        self._running = False
+        self._ready = ready
+
+    def run(self):
+        if not self._running:
+            log.info("Starting PollingUpdateProcessor with request interval: " + str(self._config.poll_interval))
+            self._running = True
+            while self._running:
+                start_time = time.time()
+                self._store.init(self._requester.get_all())
+                if not self._ready.is_set() and self._store.initialized:
+                    log.info("PollingUpdateProcessor initialized ok")
+                    self._ready.set()
+                elapsed = time.time() - start_time
+                if elapsed < self._config.poll_interval:
+                    time.sleep(self._config.poll_interval - elapsed)
+
+    def initialized(self):
+        return self._running and self._ready.is_set() and self._store.initialized
+
+    def stop(self):
+        log.info("Stopping PollingUpdateProcessor")
+        self._running = False
diff --git a/ldclient/redis_feature_store.py b/ldclient/redis_feature_store.py
new file mode 100644
index 00000000..ddd615ed
--- /dev/null
+++ b/ldclient/redis_feature_store.py
@@ -0,0 +1,114 @@
+import json
+
+import redis
+
+from ldclient.expiringdict import ExpiringDict
+from ldclient.interfaces import FeatureStore
+
+INIT_KEY = "$initialized$"
+
+
+class ForgetfulDict(dict):
+    def __setitem__(self, key, value):
+        pass
+
+
+class RedisFeatureStore(FeatureStore):
+    def __init__(self,
+                 url='redis://localhost:6379/0',
+                 prefix='launchdarkly',
+                 max_connections=16,
+                 expiration=15,
+                 capacity=1000):
+
+        self._features_key = "{}:features".format(prefix)
+        self._cache = ForgetfulDict() if expiration == 0 else ExpiringDict(max_len=capacity,
+                                                                           max_age_seconds=expiration)
+        self._pool = redis.ConnectionPool.from_url(url=url, max_connections=max_connections)
+
+    def init(self, features):
+        pipe = redis.Redis(connection_pool=self._pool).pipeline()
+        pipe.delete(self._features_key)
+
+        self._cache.clear()
+
+        for k, f in features.items():
+            f_json = json.dumps(f)
+            pipe.hset(self._features_key, k, f_json)
+            self._cache[k] = f
+        pipe.execute()
+
+    def all(self):
+        r = redis.Redis(connection_pool=self._pool)
+        all_features = r.hgetall(self._features_key)
+        results = {}
+        for f_json in all_features.values():
+            f = json.loads(f_json.decode('utf-8'))
+            if not f.get('deleted', False):
+                results[f['key']] = f
+        return results
+
+    def get(self, key):
+        f = self._cache.get(key)
+        if f:
+            # reset ttl
+            self._cache[key] = f
+            if 'deleted' in f and f['deleted']:
+                return None
+            return f
+
+        r = redis.Redis(connection_pool=self._pool)
+        f_json = r.hget(self._features_key, key)
+        if f_json:
+            f = json.loads(f_json.decode('utf-8'))
+            if f:
+                if 'deleted' in f and f['deleted']:
+                    return 
None + self._cache[key] = f + return f + + return None + + def delete(self, key, version): + r = redis.Redis(connection_pool=self._pool) + r.watch(self._features_key) + f_json = r.hget(self._features_key, key) + if f_json: + f = json.loads(f_json.decode('utf-8')) + if f is not None and f['version'] < version: + f['deleted'] = True + f['version'] = version + elif f is None: + f = {'deleted': True, 'version': version} + f_json = json.dumps(f) + r.hset(self._features_key, key, f_json) + self._cache[key] = f + r.unwatch() + + @property + def initialized(self): + initialized = self._cache.get(INIT_KEY) + if initialized: + # reset ttl + self._cache[INIT_KEY] = True + return True + + r = redis.Redis(connection_pool=self._pool) + if r.exists(self._features_key): + self._cache[INIT_KEY] = True + return True + return False + + def upsert(self, key, feature): + r = redis.Redis(connection_pool=self._pool) + r.watch(self._features_key) + old = self.get(key) + if old: + if old['version'] >= feature['version']: + r.unwatch() + return + + feature_json = json.dumps(feature) + r.hset(self._features_key, key, feature_json) + self._cache[key] = feature + r.unwatch() diff --git a/ldclient/redis_requester.py b/ldclient/redis_requester.py deleted file mode 100644 index 74a2a352..00000000 --- a/ldclient/redis_requester.py +++ /dev/null @@ -1,57 +0,0 @@ -import json -from ldclient.expiringdict import ExpiringDict -from ldclient.interfaces import FeatureRequester -import redis - - -# noinspection PyUnusedLocal -def create_redis_ldd_requester(api_key, config, **kwargs): - return RedisLDDRequester(config, **kwargs) - - -class ForgetfulDict(dict): - - def __setitem__(self, key, value): - pass - - -class RedisLDDRequester(FeatureRequester): - """ - Requests features from redis, usually stored via the LaunchDarkly Daemon (LDD). 
Recommended to be combined - with the ExpiringInMemoryFeatureStore - """ - - def __init__(self, config, - expiration=15, - redis_host='localhost', - redis_port=6379, - redis_prefix='launchdarkly'): - """ - :type config: Config - """ - self._redis_host = redis_host - self._redis_port = redis_port - self._features_key = "{}:features".format(redis_prefix) - self._cache = ForgetfulDict() if expiration == 0 else ExpiringDict(max_len=config.capacity, - max_age_seconds=expiration) - self._pool = None - - def _get_connection(self): - if self._pool is None: - self._pool = redis.ConnectionPool( - host=self._redis_host, port=self._redis_port) - return redis.Redis(connection_pool=self._pool) - - def get(self, key, callback): - cached = self._cache.get(key) - if cached is not None: - return callback(cached) - else: - rd = self._get_connection() - raw = rd.hget(self._features_key, key) - if raw: - val = json.loads(raw.decode('utf-8')) - else: - val = None - self._cache[key] = val - return callback(val) diff --git a/ldclient/requests.py b/ldclient/requests.py deleted file mode 100644 index ee56296f..00000000 --- a/ldclient/requests.py +++ /dev/null @@ -1,186 +0,0 @@ -from __future__ import absolute_import -import errno -import json -from threading import Thread -from cachecontrol import CacheControl -from ldclient.util import log -from ldclient.interfaces import FeatureRequester, StreamProcessor, EventConsumer -from ldclient.util import _headers, _stream_headers -import requests -from requests.packages.urllib3.exceptions import ProtocolError -from sseclient import SSEClient - - -class RequestsFeatureRequester(FeatureRequester): - - def __init__(self, api_key, config): - self._api_key = api_key - self._session = CacheControl(requests.Session()) - self._config = config - - def get(self, key, callback): - # return callback(do_toggle(key)) - - def do_toggle(should_retry): - # noinspection PyBroadException,PyUnresolvedReferences - try: - val = self._toggle(key) - return val - except ProtocolError as e: - inner = e.args[1] - if inner.errno == errno.ECONNRESET and should_retry: - log.warning( - 'ProtocolError exception caught while getting flag. Retrying.') - return do_toggle(False) - else: - log.exception( - 'Unhandled exception. Returning default value for flag.') - return None - except Exception: - log.exception( - 'Unhandled exception. 
Returning default value for flag.') - return None - - return callback(do_toggle(True)) - - def _toggle(self, key): - hdrs = _headers(self._api_key) - uri = self._config.base_uri + '/api/eval/features/' + key - r = self._session.get(uri, headers=hdrs, timeout=( - self._config.connect, self._config.read)) - r.raise_for_status() - feature = r.json() - return feature - - -class RequestsStreamProcessor(Thread, StreamProcessor): - - def __init__(self, api_key, config, store): - Thread.__init__(self) - self.daemon = True - self._api_key = api_key - self._config = config - self._store = store - self._running = False - - def run(self): - log.debug("Starting stream processor") - self._running = True - hdrs = _stream_headers(self._api_key) - uri = self._config.stream_uri + "/features" - messages = SSEClient(uri, verify=self._config.verify, headers=hdrs) - for msg in messages: - if not self._running: - break - self.process_message(self._store, msg) - - def stop(self): - self._running = False - - @staticmethod - def process_message(store, msg): - payload = json.loads(msg.data) - log.debug("Recieved stream event {}".format(msg.event)) - if msg.event == 'put': - store.init(payload) - elif msg.event == 'patch': - key = payload['path'][1:] - feature = payload['data'] - log.debug("Updating feature {}".format(key)) - store.upsert(key, feature) - elif msg.event == 'delete': - key = payload['path'][1:] - # noinspection PyShadowingNames - version = payload['version'] - store.delete(key, version) - else: - log.warning('Unhandled event in stream processor: ' + msg.event) - - -class RequestsEventConsumer(Thread, EventConsumer): - - def __init__(self, event_queue, api_key, config): - Thread.__init__(self) - self._session = requests.Session() - self.daemon = True - self._api_key = api_key - self._config = config - self._queue = event_queue - self._running = False - - def run(self): - log.debug("Starting event consumer") - self._running = True - while self._running: - self.send() - - def stop(self): - self._running = False - - def flush(self): - self._queue.join() - - def send_batch(self, events): - def do_send(should_retry): - # noinspection PyBroadException - try: - if isinstance(events, dict): - body = [events] - else: - body = events - hdrs = _headers(self._api_key) - uri = self._config.events_uri + '/bulk' - r = self._session.post(uri, headers=hdrs, timeout=(self._config.connect, self._config.read), - data=json.dumps(body)) - r.raise_for_status() - except ProtocolError as e: - inner = e.args[1] - if inner.errno == errno.ECONNRESET and should_retry: - log.warning( - 'ProtocolError exception caught while sending events. Retrying.') - do_send(False) - else: - log.exception( - 'Unhandled exception in event consumer. Analytics events were not processed.') - except: - log.exception( - 'Unhandled exception in event consumer. 
Analytics events were not processed.') - - try: - do_send(True) - finally: - for _ in events: - self._queue.task_done() - - def send(self): - events = self.next() - - if len(events) == 0: - return - else: - self.send_batch(events) - - def next(self): - q = self._queue - items = [] - - item = self.next_item() - if item is None: - return items - - items.append(item) - while len(items) < self._config.upload_limit and not q.empty(): - item = self.next_item() - if item: - items.append(item) - - return items - - def next_item(self): - q = self._queue - # noinspection PyBroadException - try: - item = q.get(block=True, timeout=5) - return item - except Exception: - return None diff --git a/ldclient/streaming.py b/ldclient/streaming.py new file mode 100644 index 00000000..f7e66632 --- /dev/null +++ b/ldclient/streaming.py @@ -0,0 +1,68 @@ +import json +from threading import Thread + +from sseclient import SSEClient + +from ldclient.interfaces import UpdateProcessor +from ldclient.util import _stream_headers, log + + +class StreamingUpdateProcessor(Thread, UpdateProcessor): + + def __init__(self, api_key, config, requester, store, ready): + Thread.__init__(self) + self.daemon = True + self._api_key = api_key + self._config = config + self._requester = requester + self._store = store + self._running = False + self._ready = ready + + def run(self): + log.info("Starting StreamingUpdateProcessor connecting to uri: " + self._config.stream_uri) + self._running = True + hdrs = _stream_headers(self._api_key) + uri = self._config.stream_uri + messages = SSEClient(uri, verify=self._config.verify_ssl, headers=hdrs) + for msg in messages: + if not self._running: + break + self.process_message(self._store, self._requester, msg, self._ready) + + def stop(self): + log.info("Stopping StreamingUpdateProcessor") + self._running = False + + def initialized(self): + return self._running and self._ready.is_set() and self._store.initialized + + @staticmethod + def process_message(store, requester, msg, ready): + payload = json.loads(msg.data) + log.debug("Received stream event {}".format(msg.event)) + if msg.event == 'put': + store.init(payload) + if not ready.is_set() and store.initialized: + ready.set() + log.info("StreamingUpdateProcessor initialized ok") + elif msg.event == 'patch': + key = payload['path'][1:] + feature = payload['data'] + log.debug("Updating feature {}".format(key)) + store.upsert(key, feature) + elif msg.event == "indirect/patch": + key = payload['data'] + store.upsert(key, requester.get_one(key)) + elif msg.event == "indirect/put": + store.init(requester.get_all()) + if not ready.is_set() and store.initialized: + ready.set() + log.info("StreamingUpdateProcessor initialized ok") + elif msg.event == 'delete': + key = payload['path'][1:] + # noinspection PyShadowingNames + version = payload['version'] + store.delete(key, version) + else: + log.warning('Unhandled event in stream processor: ' + msg.event) \ No newline at end of file diff --git a/ldclient/twisted_impls.py b/ldclient/twisted_impls.py index c1835c43..acf299d2 100644 --- a/ldclient/twisted_impls.py +++ b/ldclient/twisted_impls.py @@ -6,8 +6,8 @@ from cachecontrol import CacheControl from ldclient.client import Config, LDClient -from ldclient.interfaces import FeatureRequester, StreamProcessor, EventConsumer -from ldclient.requests import RequestsStreamProcessor +from ldclient.interfaces import FeatureRequester, EventConsumer, UpdateProcessor +from ldclient.streaming import StreamingUpdateProcessor from ldclient.twisted_sse import 
TwistedSSEClient
 from ldclient.util import _headers, _stream_headers, log
 from requests.packages.urllib3.exceptions import ProtocolError
@@ -22,40 +22,33 @@ def __init__(self, api_key, config):
         self._session = CacheControl(txrequests.Session())
         self._config = config
 
-    def get(self, key, callback):
-        d = self.toggle(key)
-        d.addBoth(callback)
-        return d
-
-    def toggle(self, key):
+    def get_all(self):
         @defer.inlineCallbacks
         def run(should_retry):
             # noinspection PyBroadException
             try:
-                val = yield self._toggle(key)
+                val = yield self._get_all()
                 defer.returnValue(val)
             except ProtocolError as e:
                 inner = e.args[1]
                 if inner.errno == errno.ECONNRESET and should_retry:
                     log.warning(
-                        'ProtocolError exception caught while getting flag. Retrying.')
+                        'ProtocolError exception caught while getting flags. Retrying.')
                     d = yield run(False)
                     defer.returnValue(d)
                 else:
-                    log.exception(
-                        'Unhandled exception. Returning default value for flag.')
+                    log.exception('Unhandled exception.')
                     defer.returnValue(None)
             except Exception:
-                log.exception(
-                    'Unhandled exception. Returning default value for flag.')
+                log.exception('Unhandled exception.')
                 defer.returnValue(None)
 
         return run(True)
 
     @defer.inlineCallbacks
-    def _toggle(self, key):
+    def _get_all(self):
         hdrs = _headers(self._api_key)
-        uri = self._config.base_uri + '/api/eval/features/' + key
+        uri = self._config.get_latest_features_uri
-        r = yield self._session.get(uri, headers=hdrs, timeout=(self._config.connect, self._config.read))
+        r = yield self._session.get(uri, headers=hdrs, timeout=(self._config.connect_timeout, self._config.read_timeout))
         r.raise_for_status()
         feature = r.json()
@@ -65,20 +58,27 @@ def _get_all(self):
 
 class TwistedConfig(Config):
     def __init__(self, *args, **kwargs):
-        self.stream_processor_class = TwistedStreamProcessor
-        self.consumer_class = TwistedEventConsumer
-        self.feature_requester_class = TwistedHttpFeatureRequester
-        super(TwistedConfig, self).__init__(*args, **kwargs)
+        super(TwistedConfig, self).__init__(*args, **kwargs)
+        self.update_processor_class = TwistedStreamProcessor
+        self.event_consumer_class = TwistedEventConsumer
+        self.feature_requester_class = TwistedHttpFeatureRequester
 
 
-class TwistedStreamProcessor(StreamProcessor):
+class TwistedStreamProcessor(UpdateProcessor):
+    def close(self):
+        self.sse_client.stop()
 
-    def __init__(self, api_key, config, store):
+    def __init__(self, api_key, config, requester, store, ready):
         self._store = store
-        self.sse_client = TwistedSSEClient(config.stream_uri + "/", headers=_stream_headers(api_key,
-                                           "PythonTwistedClient"),
-                                           verify=config.verify,
-                                           on_event=partial(RequestsStreamProcessor.process_message, self._store))
+        self._requester = requester
+        self._ready = ready
+        self.sse_client = TwistedSSEClient(config.stream_uri,
+                                           headers=_stream_headers(api_key, "PythonTwistedClient"),
+                                           verify_ssl=config.verify_ssl,
+                                           on_event=lambda msg: StreamingUpdateProcessor.process_message(
+                                               self._store,
+                                               self._requester,
+                                               msg, self._ready))
         self.running = False
 
     def start(self):
@@ -88,14 +88,11 @@ def start(self):
     def stop(self):
         self.sse_client.stop()
 
-    def get_feature(self, key):
-        return self._store.get(key)
-
     def initialized(self):
-        return self._store.initialized()
+        return self._ready.is_set() and self._store.initialized()
 
     def is_alive(self):
-        return self.running
+        return self.running and self._store.initialized()
 
 
 class TwistedEventConsumer(EventConsumer):
@@ -149,8 +146,9 @@ def do_send(should_retry):
                 else:
                     body = events
                 hdrs = _headers(self._api_key)
-                uri = self._config.events_uri + '/bulk'
-                r = yield self._session.post(uri, headers=hdrs, timeout=(self._config.connect, self._config.read),
+                r = yield self._session.post(self._config.events_uri,
+                                             headers=hdrs,
+                                             timeout=(self._config.connect_timeout, self._config.read_timeout),
data=json.dumps(body)) r.raise_for_status() except ProtocolError as e: diff --git a/ldclient/twisted_redis.py b/ldclient/twisted_redis.py deleted file mode 100644 index c4558a59..00000000 --- a/ldclient/twisted_redis.py +++ /dev/null @@ -1,58 +0,0 @@ -import json -from ldclient.interfaces import StreamProcessor -from twisted.internet import task, defer, protocol, reactor -from txredis.client import RedisClient - - -# noinspection PyUnusedLocal -def create_redis_ldd_processor(api_key, config, store, **kwargs): - return TwistedRedisLDDStreamProcessor(store, **kwargs) - - -class TwistedRedisLDDStreamProcessor(StreamProcessor): - - def __init__(self, store, update_delay=15, redis_host='localhost', - redis_port=6379, - redis_prefix='launchdarkly'): - self._running = False - - if update_delay == 0: - update_delay = .5 - self._update_delay = update_delay - - self._store = store - """ :type: ldclient.interfaces.FeatureStore """ - - self._features_key = "{}:features".format(redis_prefix) - self._redis_host = redis_host - self._redis_port = redis_port - self._looping_call = None - - def start(self): - self._running = True - self._looping_call = task.LoopingCall(self._refresh) - self._looping_call.start(self._update_delay) - - def stop(self): - self._looping_call.stop() - - def is_alive(self): - return self._looping_call is not None and self._looping_call.running - - def _get_connection(self): - client_creator = protocol.ClientCreator(reactor, RedisClient) - return client_creator.connectTCP(self._redis_host, self._redis_port) - - @defer.inlineCallbacks - def _refresh(self): - redis = yield self._get_connection() - """ :type: RedisClient """ - result = yield redis.hgetall(self._features_key) - if result: - data = {} - for key, value in result.items(): - if value: - data[key] = json.loads(value.decode('utf-8')) - self._store.init(data) - else: - self._store.init({}) diff --git a/ldclient/twisted_sse.py b/ldclient/twisted_sse.py index 745d7f20..b78c98ef 100644 --- a/ldclient/twisted_sse.py +++ b/ldclient/twisted_sse.py @@ -17,9 +17,9 @@ def getContext(self, *_): class TwistedSSEClient(object): - def __init__(self, url, headers, verify, on_event): - self.url = url + "/features" - self.verify = verify + def __init__(self, url, headers, verify_ssl, on_event): + self.url = url + self.verify_ssl = verify_ssl self.headers = headers self.on_event = on_event self.on_error_retry = 30 @@ -53,7 +53,7 @@ def connect(self, last_id=None): headers = dict([(x, [y.encode('utf-8')]) for x, y in headers.items()]) url = self.url.encode('utf-8') from twisted.internet import reactor - if self.verify: + if self.verify_ssl: agent = Agent(reactor) else: agent = Agent(reactor, NoValidationContextFactory()) diff --git a/ldclient/util.py b/ldclient/util.py index 20e4c0af..d67a1f82 100644 --- a/ldclient/util.py +++ b/ldclient/util.py @@ -100,11 +100,8 @@ def check_uwsgi(): import uwsgi if not uwsgi.opt.get('enable-threads'): - log.warning('The LaunchDarkly client requires the enable-threads option ' - 'be passed to uWSGI. If enable-threads is not provided, no ' - 'threads will run and event data will not be sent to LaunchDarkly. ' - 'To learn more, see ' - 'http://docs.launchdarkly.com/v1.0/docs/python-sdk-reference#configuring-uwsgi') + log.error('The LaunchDarkly client requires the enable-threads option be passed to uWSGI. 
' + 'To learn more, see http://docs.launchdarkly.com/v1.0/docs/python-sdk-reference#configuring-uwsgi') def _evaluate(feature, user): diff --git a/ldclient/version.py b/ldclient/version.py index c95652e6..3277f64c 100644 --- a/ldclient/version.py +++ b/ldclient/version.py @@ -1 +1 @@ -VERSION = "0.20.3" +VERSION = "1.0.0" diff --git a/ldd/test_ldd.py b/ldd/test_ldd.py index 46bb9e44..e661d88d 100644 --- a/ldd/test_ldd.py +++ b/ldd/test_ldd.py @@ -1,10 +1,12 @@ from functools import partial import sys + +from ldclient.redis_feature_store import RedisFeatureStore + sys.path.append("..") sys.path.append("../testing") from ldclient.util import Event -from ldclient.redis_requester import create_redis_ldd_requester import logging from ldclient.client import Config, LDClient import pytest @@ -27,8 +29,9 @@ def fin(): def test_sse_init(stream): stream.queue.put(Event(event="put", data=feature("foo", "jim"))) - client = LDClient("apikey", Config(feature_requester_class=partial(create_redis_ldd_requester, expiration=0), - events=False)) + client = LDClient("apikey", Config(use_ldd=True, + feature_store=RedisFeatureStore(), + events_enabled=False)) wait_until(lambda: client.toggle( "foo", user('xyz'), "blah") == "jim", timeout=10) diff --git a/ldd/test_ldd_twisted.py b/ldd/test_ldd_twisted.py index 00253338..cb33a139 100644 --- a/ldd/test_ldd_twisted.py +++ b/ldd/test_ldd_twisted.py @@ -29,7 +29,7 @@ def fin(): @pytest.inlineCallbacks def test_sse_init(stream): stream.queue.put(Event(event="put", data=feature("foo", "jim"))) - client = LDClient("apikey", TwistedConfig(stream=True, stream_processor_class=create_redis_ldd_processor, + client = LDClient("apikey", TwistedConfig(stream=True, update_processor_class=create_redis_ldd_processor, feature_requester_class=NoOpFeatureRequester, events=False)) yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim")) diff --git a/pytest.ini b/pytest.ini index df0d38d0..b86adf8e 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,2 +1,3 @@ [pytest] +# enables pytest-twisted twisted = 1 \ No newline at end of file diff --git a/redis-requirements.txt b/redis-requirements.txt index dc4f9bfd..e3fc618b 100644 --- a/redis-requirements.txt +++ b/redis-requirements.txt @@ -1 +1 @@ -redis>=2.10 +redis>=2.10.5 \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 5295651d..4cdeaa9a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ CacheControl>=0.10.2 -requests>=2.4.0 -future>=0.14.3 -sseclient>=0.0.9 \ No newline at end of file +requests>=2.10.0 +sseclient>=0.0.12 +future>=0.15.2 \ No newline at end of file diff --git a/setup.py b/setup.py index 0f700076..49b2f794 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ def run(self): setup( name='ldclient-py', - version='0.20.3', + version='1.0.0', author='Catamorphic Co.', author_email='team@catamorphic.com', packages=['ldclient'], diff --git a/test-requirements.txt b/test-requirements.txt index 2b820b06..1e455c0c 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,2 +1,4 @@ -pytest==2.7.2 +pytest>=2.8 pytest-twisted==1.5 +pytest-timeout>=1.0 +redis>=2.10.5 diff --git a/testing/server_util.py b/testing/server_util.py index 980a7759..b2d3e629 100644 --- a/testing/server_util.py +++ b/testing/server_util.py @@ -91,14 +91,14 @@ def do_nothing(handler): self.post_paths["/bulk"] = do_nothing return q - def add_feature(self, key, data): + def add_feature(self, data): def handle(handler): handler.send_response(200) 
handler.send_header('Content-type', 'application/json') handler.end_headers() handler.wfile.write(json.dumps(data).encode('utf-8')) - self.get("/api/eval/features/{}".format(key), handle) + self.get("/api/eval/latest-features", handle) def get(self, path, func): """ @@ -150,7 +150,6 @@ def feed_forever(handler): if event: lines = "event: {event}\ndata: {data}\n\n".format(event=event.event, data=json.dumps(event.data)) - print("returning {}".format(lines)) handler.wfile.write(lines.encode('utf-8')) except Empty: pass diff --git a/testing/test_feature_store.py b/testing/test_feature_store.py new file mode 100644 index 00000000..96bb140c --- /dev/null +++ b/testing/test_feature_store.py @@ -0,0 +1,108 @@ +import pytest +import redis + +from ldclient.feature_store import InMemoryFeatureStore +from ldclient.redis_feature_store import RedisFeatureStore + + +class TestFeatureStore: + redis_host = 'localhost' + redis_port = 6379 + + def in_memory(self): + return InMemoryFeatureStore() + + def redis_with_local_cache(self): + r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0) + r.delete("launchdarkly:features") + return RedisFeatureStore() + + def redis_no_local_cache(self): + r = redis.StrictRedis(host=self.redis_host, port=self.redis_port, db=0) + r.delete("launchdarkly:features") + return RedisFeatureStore(expiration=0) + + params = [in_memory, redis_with_local_cache, redis_no_local_cache] + + @pytest.fixture(params=params) + def store(self, request): + return request.param(self) + + @staticmethod + def make_feature(key, ver): + return { + u'key': key, + u'version': ver, + u'salt': u'abc', + u'on': True, + u'variations': [ + { + u'value': True, + u'weight': 100, + u'targets': [] + }, + { + u'value': False, + u'weight': 0, + u'targets': [] + } + ] + } + + def base_initialized_store(self, store): + store.init({ + 'foo': self.make_feature('foo', 10), + 'bar': self.make_feature('bar', 10), + }) + return store + + def test_not_initially_initialized(self, store): + assert store.initialized is False + + def test_initialized(self, store): + store = self.base_initialized_store(store) + assert store.initialized is True + + def test_get_existing_feature(self, store): + store = self.base_initialized_store(store) + expected = self.make_feature('foo', 10) + assert store.get('foo') == expected + + def test_get_nonexisting_feature(self, store): + store = self.base_initialized_store(store) + assert store.get('biz') is None + + def test_upsert_with_newer_version(self, store): + store = self.base_initialized_store(store) + new_ver = self.make_feature('foo', 11) + store.upsert('foo', new_ver) + assert store.get('foo') == new_ver + + def test_upsert_with_older_version(self, store): + store = self.base_initialized_store(store) + new_ver = self.make_feature('foo', 9) + expected = self.make_feature('foo', 10) + store.upsert('foo', new_ver) + assert store.get('foo') == expected + + def test_upsert_with_new_feature(self, store): + store = self.base_initialized_store(store) + new_ver = self.make_feature('biz', 1) + store.upsert('biz', new_ver) + assert store.get('biz') == new_ver + + def test_delete_with_newer_version(self, store): + store = self.base_initialized_store(store) + store.delete('foo', 11) + assert store.get('foo') is None + + def test_delete_unknown_feature(self, store): + store = self.base_initialized_store(store) + store.delete('biz', 11) + assert store.get('biz') is None + + def test_delete_with_older_version(self, store): + store = self.base_initialized_store(store) + 
store.delete('foo', 9) + expected = self.make_feature('foo', 10) + assert store.get('foo') == expected diff --git a/testing/test_inmemoryfeaturestore.py b/testing/test_inmemoryfeaturestore.py deleted file mode 100644 index d46dbbfa..00000000 --- a/testing/test_inmemoryfeaturestore.py +++ /dev/null @@ -1,82 +0,0 @@ -from ldclient.client import InMemoryFeatureStore -import pytest - -def make_feature(key, ver): - return { - u'key': key, - u'version': ver, - u'salt': u'abc', - u'on': True, - u'variations': [ - { - u'value': True, - u'weight': 100, - u'targets': [] - }, - { - u'value': False, - u'weight': 0, - u'targets': [] - } - ] - } - -def base_initialized_store(): - store = InMemoryFeatureStore() - store.init({ - 'foo': make_feature('foo', 10), - 'bar': make_feature('bar', 10), - }) - return store - -def test_not_initially_initialized(): - store = InMemoryFeatureStore() - assert store.initialized == False - -def test_initialized(): - store = base_initialized_store() - assert store.initialized == True - -def test_get_existing_feature(): - store = base_initialized_store() - expected = make_feature('foo', 10) - assert store.get('foo') == expected - -def test_get_nonexisting_feature(): - store = base_initialized_store() - assert store.get('biz') is None - -def test_upsert_with_newer_version(): - store = base_initialized_store() - new_ver = make_feature('foo', 11) - store.upsert('foo', new_ver) - assert store.get('foo') == new_ver - -def test_upsert_with_older_version(): - store = base_initialized_store() - new_ver = make_feature('foo', 9) - expected = make_feature('foo', 10) - store.upsert('foo', new_ver) - assert store.get('foo') == expected - -def test_upsert_with_new_feature(): - store = base_initialized_store() - new_ver = make_feature('biz', 1) - store.upsert('biz', new_ver) - assert store.get('biz') == new_ver - -def test_delete_with_newer_version(): - store = base_initialized_store() - store.delete('foo', 11) - assert store.get('foo') is None - -def test_delete_unknown_feature(): - store = base_initialized_store() - store.delete('biz', 11) - assert store.get('biz') is None - -def test_delete_with_older_version(): - store = base_initialized_store() - store.delete('foo', 9) - expected = make_feature('foo', 10) - assert store.get('foo') == expected \ No newline at end of file diff --git a/testing/test_integration.py b/testing/test_integration.py index ca747ce9..3dd7bbba 100644 --- a/testing/test_integration.py +++ b/testing/test_integration.py @@ -30,18 +30,10 @@ def fin(): return server -def test_toggle(server): - server.add_feature("foo", feature("foo", "jim")['foo']) - client = LDClient("apikey", Config(base_uri=server.url, events_uri=server.url)) - wait_until(lambda: client.toggle("foo", user('xyz'), "blah") == "jim") - - -def test_sse_init(server, stream): - stream.queue.put(Event(event="put", data=feature("foo", "jim"))) - client = LDClient("apikey", Config( - stream=True, base_uri=server.url, events_uri=server.url, stream_uri=stream.url)) - wait_until(lambda: client.toggle("foo", user('xyz'), "blah") == "jim") - +def test_toggle(server, stream): + stream.queue.put(Event(event="put", data=feature("foo", True))) + client = LDClient("apikey", Config(stream=True, base_uri=server.url, events_uri=server.url, stream_uri=stream.url)) + wait_until(lambda: client.toggle("foo", user('xyz'), False) is True) # Doesn't seem to handle disconnects? 
# def test_sse_reconnect(server, stream): diff --git a/testing/test_integration_twisted.py b/testing/test_integration_twisted.py index a1b1107e..8f7d3a7f 100644 --- a/testing/test_integration_twisted.py +++ b/testing/test_integration_twisted.py @@ -1,5 +1,5 @@ import logging -from ldclient import TwistedConfig, TwistedLDClient, LDClient +from ldclient import LDClient, TwistedLDClient, TwistedConfig from ldclient.twisted_sse import Event import pytest from testing.server_util import SSEServer, GenericServer @@ -29,14 +29,6 @@ def fin(): request.addfinalizer(fin) return server - -@pytest.inlineCallbacks -def test_toggle(server): - server.add_feature("foo", feature("foo", "jim")['foo']) - client = TwistedLDClient("apikey", TwistedConfig(base_uri=server.url)) - yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim")) - - @pytest.inlineCallbacks def test_sse_init(server, stream): stream.queue.put(Event(event="put", data=feature("foo", "jim"))) diff --git a/testing/test_ldclient.py b/testing/test_ldclient.py index 1c8c0b31..c85abd63 100644 --- a/testing/test_ldclient.py +++ b/testing/test_ldclient.py @@ -1,6 +1,7 @@ from builtins import object from ldclient.client import LDClient, Config -from ldclient.interfaces import FeatureRequester +from ldclient.feature_store import InMemoryFeatureStore +from ldclient.interfaces import FeatureRequester, FeatureStore import pytest from testing.sync_util import wait_until @@ -10,14 +11,29 @@ import Queue as queue -class MockFeatureRequester(FeatureRequester): +class MockFeatureStore(FeatureStore): + def delete(self, key, version): + pass + + @property + def initialized(self): + pass + + def init(self, features): + pass + + def all(self): + pass + + def upsert(self, key, feature): + pass def __init__(self, *_): pass - def get(self, key, callback): + def get(self, key): if key == "feature.key": - return callback({ + return { u'key': u'feature.key', u'salt': u'abc', u'on': True, @@ -33,13 +49,13 @@ def get(self, key, callback): u'targets': [] } ] - }) + } else: - return callback(None) + return None -client = LDClient("API_KEY", Config("http://localhost:3000", - feature_requester_class=MockFeatureRequester)) +client = LDClient("API_KEY", Config("http://localhost:3000", feature_store=MockFeatureStore())) +offline_client = LDClient("API_KEY", Config("http://localhost:3000", feature_store=MockFeatureStore(), offline=True)) user = { u'key': u'xyz', @@ -59,7 +75,6 @@ def get(self, key, callback): class MockConsumer(object): - def __init__(self, *_): self._running = False @@ -76,6 +91,14 @@ def flush(self): pass +class MockFeatureRequester(FeatureRequester): + def __init__(self, *_): + pass + + def get_all(self): + pass + + def mock_consumer(): return MockConsumer() @@ -92,14 +115,8 @@ def setup_function(function): u'bizzle': u'def' } } - client.set_online() client._queue = queue.Queue(10) - client._consumer = mock_consumer() - - -@pytest.fixture(autouse=True) -def noop_check_consumer(monkeypatch): - monkeypatch.setattr(client, '_check_consumer', noop_consumer) + client._event_consumer = mock_consumer() def wait_for_event(c, cb): @@ -107,51 +124,42 @@ def wait_for_event(c, cb): return cb(e) -def test_set_offline(): - client.set_offline() - assert client.is_offline() == True - - -def test_set_online(): - client.set_offline() - client.set_online() - assert client.is_offline() == False - - def test_toggle(): assert client.toggle('feature.key', user, default=None) == True def test_toggle_offline(): - client.set_offline() - assert 
client.toggle('feature.key', user, default=None) == None
+    assert offline_client.toggle('feature.key', user, default=None) == None
 
 
 def test_toggle_event():
     client.toggle('feature.key', user, default=None)
 
     def expected_event(e):
-        return e['kind'] == 'feature' and e['key'] == 'feature.key' and e['user'] == user and e['value'] == True and e['default'] == None
+        return e['kind'] == 'feature' and e['key'] == 'feature.key' and e['user'] == user and e['value'] == True \
+            and e['default'] == None
 
     assert expected_event(client._queue.get(False))
 
+
 def test_sanitize_user():
     client._sanitize_user(numeric_key_user)
     assert numeric_key_user == sanitized_numeric_key_user
 
+
 def test_toggle_event_numeric_user_key():
     client.toggle('feature.key', numeric_key_user, default=None)
 
     def expected_event(e):
-        return e['kind'] == 'feature' and e['key'] == 'feature.key' and e['user'] == sanitized_numeric_key_user and e['value'] == True and e['default'] == None
+        return e['kind'] == 'feature' and e['key'] == 'feature.key' and e['user'] == sanitized_numeric_key_user \
+            and e['value'] == True and e['default'] == None
 
     assert expected_event(client._queue.get(False))
 
 
 def test_toggle_event_offline():
-    client.set_offline()
-    client.toggle('feature.key', user, default=None)
-    assert client._queue.empty()
+    offline_client.toggle('feature.key', user, default=None)
+    assert offline_client._queue.empty()
 
 
 def test_identify():
@@ -173,9 +181,8 @@ def expected_event(e):
 
 
 def test_identify_offline():
-    client.set_offline()
-    client.identify(user)
-    assert client._queue.empty()
+    offline_client.identify(user)
+    assert offline_client._queue.empty()
 
 
 def test_track():
@@ -191,62 +197,62 @@ def test_track_numeric_key_user():
     client.track('my_event', numeric_key_user, 42)
 
     def expected_event(e):
-        return e['kind'] == 'custom' and e['key'] == 'my_event' and e['user'] == sanitized_numeric_key_user and e['data'] == 42
+        return e['kind'] == 'custom' and e['key'] == 'my_event' and e['user'] == sanitized_numeric_key_user \
+            and e['data'] == 42
 
     assert expected_event(client._queue.get(False))
 
 
 def test_track_offline():
-    client.set_offline()
-    client.track('my_event', user, 42)
-    assert client._queue.empty()
+    offline_client.track('my_event', user, 42)
+    assert offline_client._queue.empty()
 
 
 def test_defaults():
     client = LDClient("API_KEY", Config(
-        "http://localhost:3000", defaults={"foo": "bar"}))
-    client.set_offline()
+        "http://localhost:3000", defaults={"foo": "bar"}, offline=True))
     assert "bar" == client.toggle('foo', user, default=None)
 
 
 def test_defaults_and_online():
-    client = LDClient("API_KEY", Config("http://localhost:3000", defaults={"foo": "bar"},
-                                        feature_requester_class=MockFeatureRequester,
-                                        consumer_class=MockConsumer))
-    assert "bar" == client.toggle('foo', user, default="jim")
-    assert wait_for_event(client, lambda e: e['kind'] == 'feature' and e[
-        'key'] == u'foo' and e['user'] == user)
+    expected = "bar"
+    my_client = LDClient("API_KEY", Config("http://localhost:3000",
+                                           defaults={"foo": expected},
+                                           event_consumer_class=MockConsumer,
+                                           feature_requester_class=MockFeatureRequester,
+                                           feature_store=InMemoryFeatureStore()))
+    actual = my_client.toggle('foo', user, default="originalDefault")
+    assert actual == expected
+    assert wait_for_event(my_client, lambda e: e['kind'] == 'feature' and e['key'] == u'foo' and e['user'] == user)
 
 
 def test_defaults_and_online_no_default():
-    client = 
LDClient("API_KEY", Config("http://localhost:3000", + defaults={"foo": "bar"}, + event_consumer_class=MockConsumer, + feature_requester_class=MockFeatureRequester)) assert "jim" == client.toggle('baz', user, default="jim") - assert wait_for_event(client, lambda e: e['kind'] == 'feature' and e[ - 'key'] == u'baz' and e['user'] == user) + assert wait_for_event(client, lambda e: e['kind'] == 'feature' and e['key'] == u'baz' and e['user'] == user) def test_exception_in_retrieval(): class ExceptionFeatureRequester(FeatureRequester): - def __init__(self, *_): pass - def get(self, key, callback): + def get_all(self): raise Exception("blah") client = LDClient("API_KEY", Config("http://localhost:3000", defaults={"foo": "bar"}, + feature_store=InMemoryFeatureStore(), feature_requester_class=ExceptionFeatureRequester, - consumer_class=MockConsumer)) + event_consumer_class=MockConsumer)) assert "bar" == client.toggle('foo', user, default="jim") - assert wait_for_event(client, lambda e: e['kind'] == 'feature' and e[ - 'key'] == u'foo' and e['user'] == user) + assert wait_for_event(client, lambda e: e['kind'] == 'feature' and e['key'] == u'foo' and e['user'] == user) def test_no_defaults(): - client.set_offline() - assert "bar" == client.toggle('foo', user, default="bar") + assert "bar" == offline_client.toggle('foo', user, default="bar") def drain(queue): @@ -262,11 +268,3 @@ def test_flush_empties_queue(): drain(client._queue) client.flush() assert client._queue.empty() - - -def test_flush_offline_does_not_empty_queue(): - client.track('my_event', user, 42) - client.track('my_event', user, 33) - client.set_offline() - client.flush() - assert not client._queue.empty() diff --git a/twisted-requirements.txt b/twisted-requirements.txt index 96f39790..787ab140 100644 --- a/twisted-requirements.txt +++ b/twisted-requirements.txt @@ -1,4 +1,4 @@ txrequests>=0.9 pyOpenSSL>=0.14 -txredis>=2.3 -cryptography>=1.0 \ No newline at end of file +cryptography>=1.0 +service_identity>=16.0 \ No newline at end of file