Permalink
Browse files

- removed take, reserve/delete became the default behaviors

  • Loading branch information...
1 parent 52fae5a commit 201cc87abe6a3c14024cad65af017867873e8ecc @superisaac committed Jan 12, 2010
Showing with 54 additions and 79 deletions.
  1. +39 −33 client_test.py
  2. +8 −21 redqueue/queue.py
  3. +5 −24 redqueue/server.py
  4. +2 −1 redqueue/task.py
View
@@ -15,63 +15,68 @@ def get_mc():
return mc
mc = get_mc()
-def clean_cache(key):
+def take(key):
+ v = mc.get(key)
+ if v is not None:
+ mc.delete(key)
+ return v
+
+def clean_queue(key):
+ mc.delete(key)
while True:
- if mc.get(key) is None:
+ if take(key) is None:
break
def test_queue():
- clean_cache('abc/def')
+ clean_queue('abc/def')
mc.set('abc/def', 'I')
mc.set('abc/def', 'really')
mc.set('abc/def', 'love')
mc.set('abc/def', 'it')
- assert(mc.get('abc/def') == 'I')
- assert(mc.get('abc/def') == 'really')
- assert(mc.get('abc/def') == 'love')
- assert(mc.get('abc/def') == 'it')
- assert(mc.get('abc/def') is None)
+ assert(take('abc/def') == 'I')
+ assert(take('abc/def') == 'really')
+ assert(take('abc/def') == 'love')
+ assert(take('abc/def') == 'it')
+ assert(take('abc/def') is None)
print 'test queue ok'
def test_timeout():
- clean_cache('abc/def')
+ clean_queue('abc/def')
mc.set('abc/def', 'I')
mc.set('abc/def', 'really', 3) # time out is 3 seconds
mc.set('abc/def', 'love')
mc.set('abc/def', 'it')
time.sleep(5)
- assert(mc.get('abc/def') == 'I')
- assert(mc.get('abc/def') == 'love')
- assert(mc.get('abc/def') == 'it')
- assert(mc.get('abc/def') is None)
+ assert(take('abc/def') == 'I')
+ assert(take('abc/def') == 'love')
+ assert(take('abc/def') == 'it')
+ assert(take('abc/def') is None)
print 'test queue timeout ok'
def test_reservation():
- clean_cache('abc')
- clean_cache('def')
+ clean_queue('abc')
+ clean_queue('def')
mc.set('abc', 'I')
mc.set('abc', 'really')
mc.set('config:reserv', 1)
assert(mc.get('abc') == 'I')
assert(mc.get('abc') is None)
mc.delete('abc')
- mc.set('config:reserv', 0)
- assert(mc.get('abc') == 'really')
+ assert(take('abc') == 'really')
print 'test reservation ok'
def test_reservation_close():
global mc
- clean_cache('abc')
+ clean_queue('abc')
mc.set('abc', 'I')
mc.set('abc', 'love')
- mc.set('config:reserv', 1)
assert(mc.get('abc') == 'I')
mc.disconnect_all()
mc = get_mc()
- assert(mc.get('abc') == 'love')
+ assert(take('abc') == 'love')
assert(mc.get('abc') == 'I')
print 'test reservation on close ok'
@@ -86,26 +91,27 @@ def test_server_error():
% python client_test.py
I
% python client_test.py
- None
+ love
+ ...
"""
if sys.argv[1:] == ['send']:
mc.set('xyz', 'I')
mc.set('xyz', 'love')
- mc.set('config:reserv', 1)
print mc.get('xyz')
else:
print mc.get('xyz')
def test_get_multi():
- clean_cache('abc')
- clean_cache('def')
- clean_cache('ghi')
- clean_cache('jkl')
+ clean_queue('abc')
+ clean_queue('def')
+ clean_queue('ghi')
+ clean_queue('jkl')
mc.set('def', 'I')
mc.set('abc', 'love')
mc.set('ghi', 'it')
assert(mc.get('def') == 'I')
+ #print mc.get_multi(['abc', 'def', 'ghi', 'jkl'])
assert(mc.get_multi(['abc', 'def', 'ghi', 'jkl']) ==
{'abc': 'love', 'ghi': 'it'})
print 'test get multi ok'
@@ -116,15 +122,15 @@ def test_performance():
for i in xrange(100):
mc.set('perf', i)
for i in xrange(100):
- mc.get('perf')
+ take('perf')
if __name__ == '__main__':
- test_queue()
- test_timeout()
- test_reservation()
- test_reservation_close()
- test_get_multi()
+ #test_queue()
+ #test_timeout()
+ #test_reservation()
+ #test_reservation_close()
+ #test_get_multi()
test_server_error()
- test_performance()
+ #test_performance()
View
@@ -63,47 +63,34 @@ def reserve(self, prot_id):
self.borrowing[prot_id] = (timeout, data)
return timeout, data
- def take(self):
- """ Take an element away and never return it
- """
- while True:
- try:
- timeout, data = self._queue.pop()
- except IndexError:
- return None
- self.addlog('G\r\n')
- self.rotate_log()
- if timeout > 0 and timeout < time.time():
- continue
- return timeout, data
+ def take(self, prot_id):
+ t = self.reserve(prot_id)
+ if t is not None:
+ self.use(prot_id)
+ return t
def load_from_log(self, logpath):
logfile = open(logpath, 'rb')
while True:
line = logfile.readline()
if not line:
break
- if line.startswith('B'):
+ if line.startswith('B'): # Borrow an item
_, prot_id = line.split()
try:
data = self._queue.pop()
self.borrowing[prot_id] = data
except IndexError:
logging.error('Pop from empty stack')
- elif line.startswith('U'):
+ elif line.startswith('U'): # Use an item
_, prot_id = line.split()
assert prot_id in self.borrowing
del self.borrowing[prot_id]
- elif line.startswith('R'):
+ elif line.startswith('R'): # Return an item
_, prot_id = line.split()
assert prot_id in self.borrowing
t = self.borrowing.pop(prot_id)
self._queue.appendleft(t)
- elif line.startswith('G'):
- try:
- self._queue.pop()
- except IndexError:
- logging.error('Pop from empty stack')
elif line.startswith('S'):
t, timeout, lendata = line.split()
data = logfile.read(int(lendata))
View
@@ -53,16 +53,8 @@ def __init__(self, stream):
'set': self.handle_set,
'delete': self.handle_delete}
self.wait_for_line()
- self.reservation = False
self.resved_keys = set()
- def set_reservation(self, value):
- orig_reservation = self.reservation
- self.reservation = value in ('1', 'true')
- if orig_reservation != self.reservation:
- logging.info('Set reservation to be %s' % self.reservation)
- self.use_key()
-
def use_key(self, key=None):
""" Mark all reserved keys or the specified key as used """
if key is None:
@@ -96,34 +88,23 @@ def handle_set(self, key, flags, exptime, bytes, *args):
exptime = int(exptime)
if exptime > 0:
exptime = time.time() + exptime
-
def on_set_data(data):
data = data[:-2]
- if key == 'config:reserv':
- self.set_reservation(data)
- else:
- self.server.queue_factory.get_queue(key).give(exptime, data)
+ self.server.queue_factory.get_queue(key).give(exptime, data)
self.stream.write('STORED\r\n')
self.wait_for_line()
self.stream.read_bytes(bytes + 2, on_set_data)
return True
def _get_data(self, key):
- if self.reservation and (key in self.resved_keys):
+ if key in self.resved_keys:
return None
- prot_id = None
- if self.reservation:
- prot_id = self.protocol_id
q = self.server.queue_factory.get_queue(key, auto_create=False)
t = None
if q:
- if self.reservation:
- t = q.reserve(prot_id=prot_id)
- else:
- t = q.take()
+ t = q.reserve(prot_id=self.protocol_id)
if t:
- if self.reservation:
- self.resved_keys.add(key)
+ self.resved_keys.add(key)
return t[1] # t is a tuple of (timeout, data)
def handle_get(self, *keys):
@@ -147,7 +128,7 @@ def handle_gets(self, *keys):
def handle_delete(self, key, *args):
if key in self.resved_keys:
self.use_key(key)
- self.stream.write('DELETED\r\n')
+ self.stream.write('DLETED\r\n')
else:
self.stream.write('NOT_DELETED\r\n')
View
@@ -16,10 +16,11 @@ class Task(object):
def __init__(self, factory):
self.timeout = 0.02
self.queue_factory = factory
+ self.prot_id = 'task:%s' % id(self)
def check(self):
q = self.queue_factory.get_queue(self.key)
- t = q.take()
+ t = q.take(self.prot_id)
if t is None:
# time out increase with the null result
self.timeout *= 1.5

0 comments on commit 201cc87

Please sign in to comment.