Skip to content

Commit

Permalink
LLD implementations of requester and processor
Browse files Browse the repository at this point in the history
* Add RedisLDDRequester to pull features from LDD-populated redis
* Add ExpiringDict from master to add caching to RedisLDDRequester
* Add 'events' config parameter to control whether events are sent to LD
* Add new 'redis' optional deps for sync redis RedisLDDRequester
  • Loading branch information
mrdon committed Sep 21, 2015
1 parent dd2516a commit 650d33a
Show file tree
Hide file tree
Showing 10 changed files with 287 additions and 7 deletions.
2 changes: 2 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
This product includes software (ExpiringDict) developed by
Mailgun (https://github.com/mailgun/expiringdict).
16 changes: 15 additions & 1 deletion ldclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,23 @@ def __init__(self,
stream=False,
verify=True,
defaults=None,
events=True,
stream_processor_class=None,
feature_store_class=None,
feature_requester_class=None,
consumer_class=None):
"""
:param stream_processor_class: A factory for a StreamProcessor implementation taking the api key, config,
and FeatureStore implementation
:type stream_processor_class: (str, Config, FeatureStore) -> StreamProcessor
:param feature_store_class: A factory for a FeatureStore implementation
:type feature_store_class: () -> FeatureStore
:param feature_requester_class: A factory for a FeatureRequester implementation taking the api key and config
:type feature_requester_class: (str, Config) -> FeatureRequester
:param consumer_class: A factory for an EventConsumer implementation taking the event queue, api key, and config
:type consumer_class: (queue.Queue, str, Config) -> EventConsumer
"""
if defaults is None:
defaults = {}

Expand All @@ -53,6 +66,7 @@ def __init__(self,
self.capacity = capacity
self.verify = verify
self.defaults = defaults
self.events = events

def get_default(self, key, default):
return default if key not in self.defaults else self.defaults[key]
Expand Down Expand Up @@ -164,7 +178,7 @@ def _stop_consumers(self):
self._stream_processor.stop()

def _send(self, event):
if self._offline:
if self._offline or not self._config.events:
return
self._check_consumer()
event['creationDate'] = int(time.time() * 1000)
Expand Down
155 changes: 155 additions & 0 deletions ldclient/expiringdict.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
'''
Dictionary with auto-expiring values for caching purposes.
Expiration happens on any access, object is locked during cleanup from expired
values. Can not store more than max_len elements - the oldest will be deleted.
>>> ExpiringDict(max_len=100, max_age_seconds=10)
The values stored in the following way:
{
key1: (value1, created_time1),
key2: (value2, created_time2)
}
NOTE: iteration over dict and also keys() do not remove expired values!
Copied from https://github.com/mailgun/expiringdict/commit/d17d071721dd12af6829819885a74497492d7fb7 under the APLv2
'''

import time
from threading import RLock

try:
from collections import OrderedDict
except ImportError:
# Python < 2.7
from ordereddict import OrderedDict


class ExpiringDict(OrderedDict):
def __init__(self, max_len, max_age_seconds):
assert max_age_seconds >= 0
assert max_len >= 1

OrderedDict.__init__(self)
self.max_len = max_len
self.max_age = max_age_seconds
self.lock = RLock()

def __contains__(self, key):
""" Return True if the dict has a key, else return False. """
try:
with self.lock:
item = OrderedDict.__getitem__(self, key)
if time.time() - item[1] < self.max_age:
return True
else:
del self[key]
except KeyError:
pass
return False

def __getitem__(self, key, with_age=False):
""" Return the item of the dict.
Raises a KeyError if key is not in the map.
"""
with self.lock:
item = OrderedDict.__getitem__(self, key)
item_age = time.time() - item[1]
if item_age < self.max_age:
if with_age:
return item[0], item_age
else:
return item[0]
else:
del self[key]
raise KeyError(key)

def __setitem__(self, key, value):
""" Set d[key] to value. """
with self.lock:
if len(self) == self.max_len:
self.popitem(last=False)
OrderedDict.__setitem__(self, key, (value, time.time()))

def pop(self, key, default=None):
""" Get item from the dict and remove it.
Return default if expired or does not exist. Never raise KeyError.
"""
with self.lock:
try:
item = OrderedDict.__getitem__(self, key)
del self[key]
return item[0]
except KeyError:
return default

def ttl(self, key):
""" Return TTL of the `key` (in seconds).
Returns None for non-existent or expired keys.
"""
key_value, key_age = self.get(key, with_age=True)
if key_age:
key_ttl = self.max_age - key_age
if key_ttl > 0:
return key_ttl
return None

def get(self, key, default=None, with_age=False):
" Return the value for key if key is in the dictionary, else default. "
try:
return self.__getitem__(key, with_age)
except KeyError:
if with_age:
return default, None
else:
return default

def items(self):
""" Return a copy of the dictionary's list of (key, value) pairs. """
r = []
for key in self:
try:
r.append((key, self[key]))
except KeyError:
pass
return r

def values(self):
""" Return a copy of the dictionary's list of values.
See the note for dict.items(). """
r = []
for key in self:
try:
r.append(self[key])
except KeyError:
pass
return r

def fromkeys(self):
" Create a new dictionary with keys from seq and values set to value. "
raise NotImplementedError()

def iteritems(self):
""" Return an iterator over the dictionary's (key, value) pairs. """
raise NotImplementedError()

def itervalues(self):
""" Return an iterator over the dictionary's values. """
raise NotImplementedError()

def viewitems(self):
" Return a new view of the dictionary's items ((key, value) pairs). "
raise NotImplementedError()

def viewkeys(self):
""" Return a new view of the dictionary's keys. """
raise NotImplementedError()

def viewvalues(self):
""" Return a new view of the dictionary's values. """
raise NotImplementedError()
48 changes: 48 additions & 0 deletions ldclient/redis_requester.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import json
from ldclient.expiringdict import ExpiringDict
from ldclient.interfaces import FeatureRequester
import redis


# noinspection PyUnusedLocal
def create_redis_ldd_requester(api_key, config, store, **kwargs):
return RedisLDDRequester(config, **kwargs)


class RedisLDDRequester(FeatureRequester):
"""
Requests features from redis, usually stored via the LaunchDarkly Daemon (LDD). Recommended to be combined
with the ExpiringInMemoryFeatureStore
"""
def __init__(self, config,
expiration=15,
redis_host='localhost',
redis_port=6379,
redis_prefix='launchdarkly'):
"""
:type config: Config
"""
self._redis_host = redis_host
self._redis_port = redis_port
self._features_key = "{}:features".format(redis_prefix)
self._cache = ExpiringDict(max_len=config.capacity, max_age_seconds=expiration)
self._pool = None

def _get_connection(self):
if self._pool is None:
self._pool = redis.ConnectionPool(host=self._redis_host, port=self._redis_port)
return redis.Redis(connection_pool=self._pool)

def get(self, key, callback):
cached = self._cache.get(key)
if cached is not None:
return cached
else:
rd = self._get_connection()
raw = rd.hget(self._features_key, key)
if raw:
val = json.loads(raw)
else:
val = None
self._cache[key] = val
return val
6 changes: 2 additions & 4 deletions ldclient/twisted_impls.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import txrequests


class TwistedFeatureRequester(FeatureRequester):
class TwistedHttpFeatureRequester(FeatureRequester):

def __init__(self, api_key, config):
self._api_key = api_key
Expand Down Expand Up @@ -64,7 +64,7 @@ def __init__(self, *args, **kwargs):
super(TwistedConfig, self).__init__(*args, **kwargs)
self.stream_processor_class = TwistedStreamProcessor
self.consumer_class = TwistedEventConsumer
self.feature_requester_class = TwistedFeatureRequester
self.feature_requester_class = TwistedHttpFeatureRequester


class TwistedStreamProcessor(StreamProcessor):
Expand Down Expand Up @@ -108,10 +108,8 @@ def __init__(self, queue, api_key, config):

self._looping_call = None
""" :type: LoopingCall"""
self._flushed = None

def start(self):
self._flushed = defer.Deferred()
self._looping_call = task.LoopingCall(self._consume)
self._looping_call.start(5)

Expand Down
51 changes: 51 additions & 0 deletions ldclient/twisted_redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import json
from ldclient.interfaces import StreamProcessor
from twisted.internet import task, defer, protocol, reactor
from txredis.client import RedisClient


# noinspection PyUnusedLocal
def create_redis_ldd_processor(api_key, config, store, **kwargs):
return TwistedRedisLDDStreamProcessor(store, **kwargs)


class TwistedRedisLDDStreamProcessor(StreamProcessor):
def __init__(self, store, update_delay=15, redis_host='localhost',
redis_port=6379,
redis_prefix='launchdarkly'):
self._running = False

if update_delay == 0:
update_delay = .5
self._update_delay = update_delay

self._store = store
""" :type: ldclient.interfaces.FeatureStore """

self._features_key = "{}:features".format(redis_prefix)
self._redis_host = redis_host
self._redis_port = redis_port
self._looping_call = None

def start(self):
self._running = True
self._looping_call = task.LoopingCall(self._refresh)
self._looping_call.start(self._update_delay)

def stop(self):
self._looping_call.stop()

def is_alive(self):
return self._looping_call is not None and self._looping_call.running

def _get_connection(self):
client_creator = protocol.ClientCreator(reactor, RedisClient)
return client_creator.connectTCP(self._redis_host, self._redis_port)

@defer.inlineCallbacks
def _refresh(self):
redis = yield self._get_connection()
""" :type: RedisClient """
result = yield redis.hgetall(self._features_key)
data = json.loads(result)
self._store.init(data)
1 change: 1 addition & 0 deletions redis-requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
redis>=2.10
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
install_reqs = parse_requirements('requirements.txt', session=uuid.uuid1())
test_reqs = parse_requirements('test-requirements.txt', session=uuid.uuid1())
twisted_reqs = parse_requirements('twisted-requirements.txt', session=uuid.uuid1())
redis_reqs = parse_requirements('redis-requirements.txt', session=uuid.uuid1())

# reqs is a list of requirement
# e.g. ['django==1.5.1', 'mezzanine==1.4.6']
reqs = [str(ir.req) for ir in install_reqs]
testreqs = [str(ir.req) for ir in test_reqs]
txreqs = [str(ir.req) for ir in twisted_reqs]
redisreqs = [str(ir.req) for ir in redis_reqs]


class PyTest(Command):
Expand Down Expand Up @@ -47,7 +49,8 @@ def run(self):
'Programming Language :: Python :: 2 :: Only',
],
extras_require={
"twisted": txreqs
"twisted": txreqs,
"redis": redisreqs
},
tests_require=testreqs,
cmdclass = {'test': PyTest},
Expand Down
7 changes: 7 additions & 0 deletions testing/test_integration_twisted.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ def test_sse_reconnect(server, stream):
yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim"))


@pytest.inlineCallbacks
def test_toggle_redis_background(server):
server.add_feature("foo", feature("foo", "jim")['foo'])
client = LDClient("apikey", TwistedConfig(base_uri=server.url, ))
yield wait_until(is_equal(lambda: client.toggle("foo", user('xyz'), "blah"), "jim"))


def feature(key, val):
return {
key: {"name": "Feature {}".format(key), "key": key, "kind": "flag", "salt": "Zm9v", "on": val,
Expand Down
3 changes: 2 additions & 1 deletion twisted-requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
txrequests>=0.9
pyOpenSSL>=0.14
pyOpenSSL>=0.14
txredis>=2.3

0 comments on commit 650d33a

Please sign in to comment.