Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Add pipeline method #2

Closed
wants to merge 6 commits into from

2 participants

@pdc
pdc commented

Here’s a fake version of the Redis-Py Pipeline class that wraps use of the WATCH, MULTI, and EXECUTE commands.

This should be enough to run code that uses the pipeline protocol in a test against a fake Redis. It makes some attempts to raise the right exception when WATCH conflicts occur, but probably would need to be extended to do detailed testing of potential race conditions.

@jamesls
Owner

Thanks for this. I've merged in your changes.

@jamesls jamesls closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 197 additions and 0 deletions.
  1. +102 −0 fakeredis.py
  2. +95 −0 test_fakeredis.py
View
102 fakeredis.py
@@ -1,5 +1,6 @@
import random
import warnings
+import copy
from ctypes import CDLL, c_double
from ctypes.util import find_library
@@ -870,3 +871,104 @@ def _list_or_args(self, keys, args):
if args:
keys.extend(args)
return keys
+
+ def pipeline(self, transaction=True):
+ """Return an object that can be used to issue Redis commands in a batch.
+
+ Arguments --
+ transaction (bool) -- whether the buffered commands
+ are issued atomically. True by default.
+ """
+ return FakePipeline(self, transaction)
+
+ def transaction(self, func, *keys):
+ # We use a for loop instead of while
+ # because if the test this is being used in
+ # goes wrong we don’t want an infinite loop!
+ with self.pipeline() as p:
+ for _ in range(5):
+ try:
+ p.watch(*keys)
+ func(p)
+ return p.execute()
+ except redis.WatchError:
+ continue
+ raise redis.WatchError('Could not run transaction after 5 tries')
+
+
+class FakePipeline(object):
+ """Helper class for FakeRedis to implement pipelines.
+
+ A pipeline is a collection of commands that
+ are buffered until you call ``execute``, at which
+ point they are called sequentially and a list
+ of their return values is returned.
+ """
+
+ # Now wondering whether the real Pipeline class
+ # could be made to work with FakeRedis and
+ # save me some work. Too late now!
+
+ def __init__(self, owner, transaction=True):
+ """Create a pipeline for the specified FakeRedis instance.
+
+ Arguments --
+ owner -- a FakeRedis instance.
+ """
+ self.owner = owner
+ self.transaction = transaction
+ self.commands = []
+ self.need_reset = False
+ self.is_immediate = False
+ self.watching = {}
+
+ def __getattr__(self, name):
+ """Magic method to allow FakeRedis commands to be called.
+
+ Returns a method that records the command for later.
+ """
+ if not hasattr(self.owner, name):
+ raise AttributeError('%r: does not have attribute %r' % (self.owner, name))
+ def meth(*args, **kwargs):
+ if self.is_immediate:
+ # Special mode during watch…multi sequence.
+ return getattr(self.owner, name)(*args, **kwargs)
+ self.commands.append((name, args, kwargs))
+ return self
+ setattr(self, name, meth)
+ return meth
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ self.reset()
+
+ def execute(self):
+ """Run all the commands in the pipeline and return the results."""
+ if self.watching:
+ mismatches = [(k, v, u)
+ for (k, v, u) in [(k, v, self.owner._db.get(k)) for (k, v) in self.watching.items()]
+ if v != u]
+ if mismatches:
+ self.commands = []
+ raise redis.WatchError('Watched key%s %s changed'
+ % ('' if len(mismatches) == 1 else 's', ', '.join(k for (k, _, _) in mismatches)))
+ return [getattr(self.owner, name)(*args, **kwargs)
+ for name, args, kwargs in self.commands]
+
+ def watch(self, *keys):
+ self.watching.update((key, copy.deepcopy(self.owner._db.get(key))) for key in keys)
+ self.need_reset = True
+ self.is_immediate = True
+ pass
+
+ def multi(self):
+ self.is_immediate = False
+
+ def reset(self):
+ self.need_reset = False
+
+
+
+
View
95 test_fakeredis.py
@@ -6,6 +6,7 @@
from nose.plugins.skip import SkipTest
import redis
+import redis.client
import fakeredis
@@ -1033,6 +1034,100 @@ def test_sort_with_hash(self):
self.redis.sort('foo', by='record_*->age', get='record_*->name'),
['baby', 'teen', 'adult'])
+ def test_pipeline(self):
+ # The pipeline method returns ann object for
+ # issuing multiple commands in a batch.
+ p = self.redis.pipeline()
+ p.set('foo', 'bar').get('foo')
+ p.lpush('baz', 'quux')
+ p.lpush('baz', 'quux2').lrange('baz', 0, -1)
+ res = p.execute()
+
+ # Check return values returned as list.
+ self.assertEqual([True, 'bar', 1, 2, ['quux2', 'quux']], res)
+
+ # Check side effects happened as expected.
+ self.assertEqual(['quux2', 'quux'], self.redis.lrange('baz', 0, -1))
+
+ def test_pipeline_non_transational(self):
+ # For our simple-minded model I don’t think
+ # there is any observable difference.
+ p = self.redis.pipeline(transaction=False)
+ res = p.set('baz', 'quux').get('baz').execute()
+
+ self.assertEqual([True, 'quux'], res)
+
+ def test_pipeline_raises_when_watched_key_changed(self):
+ self.redis.set('foo', 'bar')
+ self.redis.rpush('greet', 'hello')
+ p = self.redis.pipeline()
+ try:
+ p.watch('greet', 'foo', 'quux')
+ nextf = p.get('foo') + 'baz'
+ # simulate change happening on another thread:
+ self.redis.rpush('greet', 'world')
+ p.multi() # begin pipelining
+ p.set('foo', nextf)
+
+ self.assertRaises(redis.WatchError, p.execute)
+ finally:
+ p.reset()
+
+ def test_pipeline_succeeds_despite_unwatched_key_changed(self):
+ # Same setup as before except for the params to the WATCH command.
+ self.redis.set('foo', 'bar')
+ self.redis.rpush('greet', 'hello')
+ p = self.redis.pipeline()
+ try:
+ p.watch('foo') # only watch one of the 2 keys
+ nextf = p.get('foo') + 'baz'
+ # simulate change happening on another thread:
+ self.redis.rpush('greet', 'world')
+ p.multi() # begin pipelining
+ p.set('foo', nextf)
+ p.execute()
+
+ # Check the commands were executed.
+ self.assertEqual('barbaz', self.redis.get('foo'))
+ finally:
+ p.reset()
+
+ def test_pipeline_as_context_manager(self):
+ self.redis.set('foo', 'bar')
+ with self.redis.pipeline() as p:
+ p.watch('foo')
+ self.assertTrue(isinstance(p, redis.client.BasePipeline) or p.need_reset)
+ p.multi() # begin pipelining
+ p.set('foo', 'baz')
+ p.execute()
+
+ # Usually you would consider the pipeline to
+ # have been destroyed
+ # after the with statement, but we need to check
+ # it was reset properly:
+ self.assertTrue(isinstance(p, redis.client.BasePipeline) or not p.need_reset)
+
+ def test_pipeline_transaction_shortcut(self):
+ # This example taken pretty much from the redis-py documetnation.
+ self.redis.set('OUR-SEQUENCE-KEY', 13)
+ calls = []
+ def client_side_incr(pipe):
+ calls.append((pipe,))
+ current_value = pipe.get('OUR-SEQUENCE-KEY')
+ next_value = int(current_value) + 1
+
+ if len(calls) < 3:
+ # Simulate a change from another thread.
+ self.redis.set('OUR-SEQUENCE-KEY', next_value)
+
+ pipe.multi()
+ pipe.set('OUR-SEQUENCE-KEY', next_value)
+ res = self.redis.transaction(client_side_incr, 'OUR-SEQUENCE-KEY')
+
+ self.assertEqual([True], res)
+ self.assertEqual(16, int(self.redis.get('OUR-SEQUENCE-KEY')))
+ self.assertEqual(3, len(calls))
+
@redis_must_be_running
class TestRealRedis(TestFakeRedis):
Something went wrong with that request. Please try again.