Skip to content
Browse files

- Rewrite journal scheme, the prior one has bugs

 - logging
  • Loading branch information...
1 parent 201cc87 commit cbecc5e3ba6930e03089f74c1d183d2b7f747a0b @superisaac committed
Showing with 100 additions and 85 deletions.
  1. +21 −7 client_test.py
  2. +69 −71 redqueue/queue.py
  3. +1 −1 redqueue/server.py
  4. +9 −6 redqueue_server.py
View
28 client_test.py
@@ -9,7 +9,9 @@
####################################################################
import sys, os
import time
+import logging
import memcache
+logging.basicConfig(stream=sys.stdout)
def get_mc():
mc = memcache.Client(['127.0.0.1:12345'])
return mc
@@ -28,12 +30,11 @@ def clean_queue(key):
break
def test_queue():
- clean_queue('abc/def')
+ #clean_queue('abc/def')
mc.set('abc/def', 'I')
mc.set('abc/def', 'really')
mc.set('abc/def', 'love')
mc.set('abc/def', 'it')
-
assert(take('abc/def') == 'I')
assert(take('abc/def') == 'really')
assert(take('abc/def') == 'love')
@@ -116,6 +117,18 @@ def test_get_multi():
{'abc': 'love', 'ghi': 'it'})
print 'test get multi ok'
+def test_delete_multi():
+ clean_queue('abc')
+ clean_queue('def')
+ clean_queue('ghi')
+ clean_queue('jkl')
+
+ mc.set('def', 'I')
+ mc.set('abc', 'love')
+ assert(mc.get('def') == 'I')
+ mc.delete_multi(['abc', 'def', 'ghi', 'jkl'])
+ assert(mc.get_multi(['abc', 'def', 'ghi', 'jkl']) ==
+ {'abc': 'love'})
def test_performance():
for _ in xrange(100):
@@ -125,11 +138,12 @@ def test_performance():
take('perf')
if __name__ == '__main__':
- #test_queue()
- #test_timeout()
- #test_reservation()
- #test_reservation_close()
- #test_get_multi()
+ test_queue()
+ test_timeout()
+ test_reservation()
+ test_reservation_close()
+ test_get_multi()
+ test_delete_multi()
test_server_error()
#test_performance()
View
140 redqueue/queue.py
@@ -13,39 +13,40 @@
import urllib
from collections import deque
-LOG_CAPACITY = 1024 * 1024 # 1 mega bytes for each chunk
+JOURNAL_CAPACITY = 1 #024 * 1024 # 1 mega bytes for each chunk
#TODO: binary log
class Queue(object):
def __init__(self, key):
self.key = key
self._queue = deque()
- self.log = None
- self.borrowing = {}
+ self._jfile = None
+ self._lent = {}
- def addlog(self, w):
+ def addjournal(self, w):
pass
- def rotate_log(self):
+ def rotate_journal(self):
pass
def give_back(self, prot_id):
""" Give the elememt borrowed by prot_id back for future calling"""
- timeout, data = self.borrowing.pop(prot_id)
+ timeout, data = self._lent.pop(prot_id)
self._queue.appendleft((timeout, data))
- self.addlog('R %s\r\n' % prot_id)
+ self.addjournal('R %s\r\n' % prot_id)
def give(self, timeout, data):
self._queue.appendleft((timeout, data))
- self.addlog('S %d %d\r\n%s\r\n' % (timeout,
+ self.addjournal('S %d %d\r\n%s\r\n' % (timeout,
len(data), data))
def use(self, prot_id):
""" Mark the element borrowed by prot_id used
"""
- if prot_id in self.borrowing:
- self.addlog("U %s\r\n" % prot_id)
- del self.borrowing[prot_id]
-
+ if prot_id in self._lent:
+ self.addjournal("U %s\r\n" % prot_id)
+ del self._lent[prot_id]
+ self.rotate_journal()
+
def reserve(self, prot_id):
""" Reserve an element by prot_id and return it later, or the
server will recycle it"""
@@ -54,13 +55,12 @@ def reserve(self, prot_id):
timeout, data = self._queue.pop()
except IndexError:
return None
- self.addlog('B %s\r\n' % prot_id)
+ self.addjournal('B %s\r\n' % prot_id)
- self.rotate_log()
if timeout > 0 and timeout < time.time():
continue
- assert prot_id not in self.borrowing
- self.borrowing[prot_id] = (timeout, data)
+ assert prot_id not in self._lent
+ self._lent[prot_id] = (timeout, data)
return timeout, data
def take(self, prot_id):
@@ -69,95 +69,93 @@ def take(self, prot_id):
self.use(prot_id)
return t
- def load_from_log(self, logpath):
- logfile = open(logpath, 'rb')
+ def load_from_journal(self, jpath):
+ jfile = open(jpath, 'rb')
+ lent = {}
while True:
- line = logfile.readline()
+ line = jfile.readline()
if not line:
break
if line.startswith('B'): # Borrow an item
_, prot_id = line.split()
try:
data = self._queue.pop()
- self.borrowing[prot_id] = data
+ lent[prot_id] = data
except IndexError:
logging.error('Pop from empty stack')
elif line.startswith('U'): # Use an item
_, prot_id = line.split()
- assert prot_id in self.borrowing
- del self.borrowing[prot_id]
+ assert prot_id in lent
+ del lent[prot_id]
elif line.startswith('R'): # Return an item
_, prot_id = line.split()
- assert prot_id in self.borrowing
- t = self.borrowing.pop(prot_id)
+ assert prot_id in lent
+ t = lent.pop(prot_id)
self._queue.appendleft(t)
elif line.startswith('S'):
t, timeout, lendata = line.split()
- data = logfile.read(int(lendata))
- logfile.read(2) # line break
+ data = jfile.read(int(lendata))
+ jfile.read(2) # line break
self._queue.appendleft((int(timeout),
data))
else:
- logging.error('Bad format for log file %s' % logpath)
+ journalging.error('Bad format for journal file %s' % jpath)
- for t in self.borrowing.itervalues():
- self._queue.append(t)
- self.borrowing = {}
- logfile.close()
+ for t in lent.itervalues():
+ self._queue.appendleft(t)
+ self._lent = {}
+ jfile.close()
class ReliableQueue(Queue):
- def addlog(self, w):
- os.write(self.log.fileno(), w)
- self.log.flush()
-
- def addlog_sync(self, w):
- os.write(self.log.fileno(), w)
- self.log.flush()
- #os.fdatasync(self.log.fileno())
- os.fsync(self.log.fileno())
-
- def rotate_log(self):
- if self.log is None or (len(self._queue) == 0 and
- len(self.borrowing) == 0 and
- self.log.tell() >= LOG_CAPACITY):
- if self.log:
- self.log.close()
- fn = os.path.join(self.server.logdir,
- '%s-%d.log' % (urllib.quote_plus(self.key),
- time.time()))
- logging.info('rotate log to %s' % fn)
- self.log = open(fn, 'ab')
+ def addjournal(self, w):
+ os.write(self._jfile.fileno(), w)
+ self._jfile.flush()
+
+ def addjournal_sync(self, w):
+ os.write(self._jfile.fileno(), w)
+ self._jfile.flush()
+ #os.fdatasync(self._jfile.fileno())
+ os.fsync(self._jfile.fileno())
+ def _journal_file_name(self):
+ return os.path.join(self.server.jdir,
+ '%s.log' % urllib.quote_plus(self.key))
+ def rotate_journal(self):
+ if self._jfile is None:
+ self._jfile = open(self._journal_file_name(), 'ab')
+ elif (len(self._queue) == 0 and
+ len(self._lent) == 0 and
+ self._jfile.tell() >= JOURNAL_CAPACITY):
+ self._jfile.close()
+ curr_journal_fn = self._journal_file_name()
+ journal_fn = '%s.%d' % (curr_journal_fn, time.time())
+ os.rename(curr_journal_fn, journal_fn)
+ logging.info('rotate journal to %s' % journal_fn)
+ self._jfile = open(curr_journal_fn, 'ab')
+
class QueueFactory(object):
queue_class = Queue
- def __init__(self, logdir):
+ def __init__(self, jdir):
self.queue_collection = {}
- self.logdir = logdir
+ self.jdir = jdir
def get_queue(self, key, auto_create=True):
if key not in self.queue_collection and auto_create:
q = self.queue_class(key)
q.server = self
- q.rotate_log()
+ q.rotate_journal()
self.queue_collection[key] = q
return self.queue_collection.get(key)
- def scan_logs(self):
- logging.info('Sanning logs ...')
- queue_fns = {}
- for fn in os.listdir(self.logdir):
- m = re.search(r'-(\d+).log$', fn)
+ def scan_journals(self):
+ logging.info('Sanning journals ...')
+ for fn in os.listdir(self.jdir):
+ m = re.search(r'\.log$', fn)
if m:
key = fn[:m.start()]
- tm = int(m.group(1))
- if (key not in queue_fns or
- tm > queue_fns[key]):
- queue_fns[key] = tm
-
- for key, tm in queue_fns.iteritems():
- ukey = urllib.unquote_plus(key)
- logging.info('Restoring queue %s ...' % ukey)
- queue = self.get_queue(ukey)
- queue.load_from_log(os.path.join(self.logdir,
- '%s-%d.log' % (key, tm)))
+ ukey = urllib.unquote_plus(key)
+ logging.info('Restoring queue %s ...' % ukey)
+ queue = self.get_queue(ukey)
+ queue.load_from_journal(os.path.join(self.jdir,
+ '%s.log' % key))
logging.info('Redqueue is ready to serve.')
View
2 redqueue/server.py
@@ -33,7 +33,7 @@ def handle_accept(self, fd, events):
p.server = self
def start(self, host, port):
- self.queue_factory.scan_logs()
+ self.queue_factory.scan_journals()
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
self._sock.setblocking(0)
self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
View
15 redqueue_server.py
@@ -18,16 +18,19 @@
define('host', default="0.0.0.0", help="The binded ip host")
define('port', default=11211, type=int, help='The port to be listened')
-define('logdir', default='log', help='The directory to put logs')
-define('reliable', default='no', help='Store data to log files, options: (no, yes, sync)')
-
+define('jdir', default='journal', help='The directory to put journals')
+define('reliable', default='yes', help='Store data to log files, options: (no, yes, sync)')
+define('logfile', default='', help='Place where logging rows(info, debug, ...) are put.')
def main():
tornado.options.parse_command_line()
- if not os.path.isdir(options.logdir):
- logging.error('Log directory %s does not exist.' % options.logdir)
+ if options.logfile:
+ logging.basicConfig(filename=options.logfile, level=logging.DEBUG)
+
+ if not os.path.isdir(options.jdir):
+ logging.error('Log directory %s does not exist.' % options.jdir)
sys.exit(1)
- server = Server(options.logdir, options.reliable)
+ server = Server(options.jdir, options.reliable)
server.start(options.host, options.port)
task.run_all(server)
ioloop.IOLoop.instance().start()

0 comments on commit cbecc5e

Please sign in to comment.
Something went wrong with that request. Please try again.