Browse files

split one file into installable packages

  • Loading branch information...
1 parent 4a539c5 commit de24dc577a781089efe7672c2e8a3ff032e224db @superisaac committed Jan 8, 2010
Showing with 364 additions and 307 deletions.
  1. +3 −0 .gitignore
  2. +5 −4 README
  3. +0 −303 redqueue.py
  4. 0 redqueue/__init__.py
  5. +164 −0 redqueue/queue.py
  6. +149 −0 redqueue/server.py
  7. +34 −0 redqueue_server.py
  8. +9 −0 setup.py
View
3 .gitignore
@@ -0,0 +1,3 @@
+log
+*.pyc
+*~
View
9 README
@@ -7,16 +7,17 @@ of effective code.
Redqueue is free and unencumbered public domain software.
-== INSTALL
+== Install and Run
Install tornado and (optional) python-memcached for client testing
Get the source from
git@github.com:superisaac/redqueue.git
-
- % python redqueue.py
+ % python setup.py install
+ % mkdir -p log
+ % redqueue_server.py
For more options please run
- % python redqueue.py --help
+ % redqueue_server.py --help
== Reserve/delete mode
Now the program can behave like beanstalk to handle client crashes, while still using the memcached protocol
View
303 redqueue.py
@@ -1,303 +0,0 @@
-#!/usr/bin/python
-####################################################################
-#
-# All of the deliverable code in REDQUEUE has been dedicated to the
-# PUBLIC DOMAIN by the authors.
-#
-# Author: Zeng Ke superisaac.ke at gmail dot com
-#
-####################################################################
-import re, os, sys
-import socket
-import logging
-import time
-import urllib
-from collections import deque
-
-from tornado import iostream
-from tornado import ioloop
-import tornado.options
-from tornado.options import define, options
-
-define('host', default="0.0.0.0", help="The binded ip host")
-define('port', default=11211, type=int, help='The port to be listened')
-define('logdir', default='log', help='The directory to put logs')
-define('reliable', default='no', help='Store data to log files, options: (no, yes, sync)')
-LOG_CAPACITY = 1024 * 1024 # 1 mega bytes for each chunk
-
-class Server(object):
- def __init__(self, logdir):
- self.queue_collection = {}
- self.logdir = logdir
- if options.reliable in ('yes', 'sync'):
- self.queue_class = ReliableQueue
- if options.reliable == 'sync':
- ReliableQueue.addlog = ReliableQueue.addlog_sync
- else:
- self.queue_class = Queue
-
- def get_queue(self, key, auto_create=True):
- if key not in self.queue_collection and auto_create:
- self.queue_collection[key] = self.queue_class(key)
- return self.queue_collection.get(key)
-
- def handle_accept(self, fd, events):
- conn, addr = self._sock.accept()
- p = Protocol(iostream.IOStream(conn))
-
- def start(self, host, port):
- self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- self._sock.setblocking(0)
- self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- self._sock.bind((host, port))
- self._sock.listen(128)
- ioloop.IOLoop.instance().add_handler(self._sock.fileno(),
- self.handle_accept,
- ioloop.IOLoop.READ)
- def scan_logs(self):
- logging.info('Sanning logs ...')
- queue_fns = {}
- for fn in os.listdir(self.logdir):
- m = re.search(r'-(\d+).log$', fn)
- if m:
- key = fn[:m.start()]
- tm = int(m.group(1))
- if (key not in queue_fns or
- tm > queue_fns[key]):
- queue_fns[key] = tm
-
- for key, tm in queue_fns.iteritems():
- ukey = urllib.unquote_plus(key)
- logging.info('Restoring queue %s ...' % ukey)
- queue = self.get_queue(ukey)
- queue.load_from_log(os.path.join(self.logdir,
- '%s-%d.log' % (key, tm)))
- #self.queue_collection[ukey] = queue
- logging.info('Redqueue is ready to serve.')
-
-server = None
-
-#TODO: binary log
-class Queue(object):
- def __init__(self, key):
- self.key = key
- self._queue = deque()
- self.log = None
- self.borrowing = {}
- self.rotate_log()
-
- def addlog(self, w):
- pass
- def rotate_log(self):
- pass
-
- def return_(self, prot_id):
- timeout, data = self.borrowing.pop(prot_id)
- self._queue.appendleft((timeout, data))
- self.addlog('R %s\r\n' % prot_id)
-
- def enqueue(self, timeout, data):
- self._queue.appendleft((timeout, data))
-
- self.addlog('S %d %d\r\n%s\r\n' % (timeout,
- len(data), data))
-
- def use(self, prot_id):
- if prot_id in self.borrowing:
- self.addlog("U %s\r\n" % prot_id)
- del self.borrowing[prot_id]
-
- def dequeue(self, prot_id=None):
- while True:
- try:
- timeout, data = self._queue.pop()
- except IndexError:
- return None
- if prot_id is None:
- self.addlog('G\r\n')
- else:
- self.addlog('B %s\r\n' % prot_id)
-
- self.rotate_log()
- if timeout > 0 and timeout < time.time():
- continue
- if prot_id:
- assert prot_id not in self.borrowing
- self.borrowing[prot_id] = (timeout, data)
- return timeout, data
-
- def load_from_log(self, logpath):
- logfile = open(logpath, 'rb')
- while True:
- line = logfile.readline()
- if not line:
- break
- if line.startswith('B'):
- _, prot_id = line.split()
- try:
- data = self._queue.pop()
- self.borrowing[prot_id] = data
- except IndexError:
- logging.error('Pop from empty stack')
- elif line.startswith('U'):
- _, prot_id = line.split()
- assert prot_id in self.borrowing
- del self.borrowing[prot_id]
- elif line.startswith('R'):
- _, prot_id = line.split()
- assert prot_id in self.borrowing
- t = self.borrowing.pop(prot_id)
- self._queue.appendleft(t)
- elif line.startswith('G'):
- try:
- self._queue.pop()
- except IndexError:
- logging.error('Pop from empty stack')
- elif line.startswith('S'):
- t, timeout, lendata = line.split()
- data = logfile.read(int(lendata))
- logfile.read(2) # line break
- self._queue.appendleft((int(timeout),
- data))
- else:
- logging.error('Bad format for log file %s' % logpath)
-
- for t in self.borrowing.itervalues():
- self._queue.append(t)
- self.borrowing = {}
- logfile.close()
-
-class ReliableQueue(Queue):
- def addlog(self, w):
- os.write(self.log.fileno(), w)
- self.log.flush()
-
- def addlog_sync(self, w):
- os.write(self.log.fileno(), w)
- self.log.flush()
- #os.fdatasync(self.log.fileno())
- os.fsync(self.log.fileno())
-
- def rotate_log(self):
- if self.log is None or (len(self._queue) == 0 and
- len(self.borrowing) == 0 and
- self.log.tell() >= LOG_CAPACITY):
- if self.log:
- self.log.close()
- fn = os.path.join(server.logdir,
- '%s-%d.log' % (urllib.quote_plus(self.key),
- time.time()))
- self.log = open(fn, 'ab')
-
-class Protocol(object):
- def __init__(self, stream):
- self.protocol_id = str(id(self))
- self.stream = stream
- self.stream.set_close_callback(self._return_data)
- self.route = {
- 'get': self.handle_get,
- 'gets': self.handle_gets,
- 'set': self.handle_set,
- 'delete': self.handle_delete}
- self.wait_for_line()
- self.reservation = False
- self.resved_keys = set()
-
- def set_reservation(self, value):
- orig_reservation = self.reservation
- self.reservation = value in ('1', 'true')
- if orig_reservation != self.reservation:
- logging.info('Set reservation to be %s' % self.reservation)
- self.use_key()
-
- def use_key(self, key=None):
- if key is None:
- for k in self.resved_keys:
- server.get_queue(k).use(self.protocol_id)
- self.resved_keys = set()
- elif key in self.resved_keys:
- server.get_queue(key).use(self.protocol_id)
- self.resved_keys.remove(key)
-
- def _return_data(self):
- for key in self.resved_keys:
- server.get_queue(key).return_(self.protocol_id)
- self.resved_keys = set()
-
- def wait_for_line(self):
- self.stream.read_until('\r\n', self.line_received)
-
- def line_received(self, line):
- args = line.split()
- data_required = self.route.get(args[0].lower(),
- self.handle_unknown)(*args[1:])
- if not data_required:
- self.wait_for_line()
-
- def handle_unknown(self, *args):
- self.stream.write("CLIENT_ERROR bad command line format\r\n")
-
- def handle_set(self, key, flags, exptime, bytes, *args):
- bytes = int(bytes)
- exptime = int(exptime)
- if exptime > 0:
- exptime = time.time() + exptime
-
- def on_set_data(data):
- data = data[:-2]
- if key == 'config:reserv':
- self.set_reservation(data)
- else:
- server.get_queue(key).enqueue(exptime, data)
- self.stream.write('STORED\r\n')
- self.wait_for_line()
- self.stream.read_bytes(bytes + 2, on_set_data)
- return True
-
- def _get_data(self, key):
- if self.reservation and (key in self.resved_keys):
- return None
- prot_id = None
- if self.reservation:
- prot_id = self.protocol_id
- q = server.get_queue(key)
- t = q and q.dequeue(prot_id=prot_id) or None
- if t:
- if self.reservation:
- self.resved_keys.add(key)
- return t[1] # t is a tuple of (timeout, data)
-
- def handle_get(self, *keys):
- for key in keys:
- data = self._get_data(key)
- if data:
- self.stream.write('VALUE %s 0 %d\r\n%s\r\n' % (key, len(data), data))
- self.stream.write('END\r\n')
-
- def handle_gets(self, *keys):
- """ Gets here is like a poll(), return the first non-empty queue
- number, so that a client can wait several queues.
- """
- for key in keys:
- data = self._get_data(key)
- if data:
- self.stream.write('VALUE %s 0 %d\r\n%s\r\n' % (key, len(data), data))
- break
- self.stream.write('END\r\n')
-
- def handle_delete(self, key, *args):
- if key in self.resved_keys:
- self.use_key(key)
- self.stream.write('DELETED\r\n')
- else:
- self.stream.write('NOT_DELETED\r\n')
-
-if __name__ == '__main__':
- tornado.options.parse_command_line()
- if not os.path.isdir(options.logdir):
- logging.error('Log directory %s does not exist.' % options.logdir)
- sys.exit(1)
- server = Server(options.logdir)
- server.scan_logs()
- server.start(options.host, options.port)
- ioloop.IOLoop.instance().start()
View
0 redqueue/__init__.py
No changes.
View
164 redqueue/queue.py
@@ -0,0 +1,164 @@
+#!/usr/bin/python
+####################################################################
+#
+# All of the deliverable code in REDQUEUE has been dedicated to the
+# PUBLIC DOMAIN by the authors.
+#
+# Author: Zeng Ke superisaac.ke at gmail dot com
+#
+####################################################################
+import re, os, sys
+import logging
+import time
+import urllib
+from collections import deque
+
+from tornado import iostream
+from tornado import ioloop
+
+LOG_CAPACITY = 1024 * 1024 # 1 mega bytes for each chunk
+
+#TODO: binary log
class Queue(object):
    """In-memory FIFO of ``(timeout, data)`` items with reserve/return support.

    Items are pushed on the left end of a deque and served from the right
    end.  A dequeued item can be *borrowed* under a protocol id: it is kept
    in ``borrowing`` until it is either consumed (``use``) or handed back
    (``return_``).

    Every state change is reported to ``addlog`` as a journal record:

    * ``S <timeout> <len>\\r\\n<data>\\r\\n`` -- item enqueued
    * ``G`` -- item popped without reservation
    * ``B <prot_id>`` -- item popped and borrowed by ``prot_id``
    * ``U <prot_id>`` -- borrowed item consumed
    * ``R <prot_id>`` -- borrowed item returned to the queue

    In this base class ``addlog`` and ``rotate_log`` do nothing; subclasses
    override them to persist the journal.
    """

    def __init__(self, key):
        self.key = key
        self._queue = deque()
        self.log = None        # journal file object; never set by this class
        self.borrowing = {}    # prot_id -> (timeout, data) currently reserved

    def addlog(self, w):
        """Record the journal entry ``w``; the base class discards it."""
        pass

    def rotate_log(self):
        """Switch to a new journal file; a no-op without a journal."""
        pass

    def return_(self, prot_id):
        """Give the item borrowed by ``prot_id`` back to the queue.

        The item is re-inserted on the same end that ``enqueue`` uses.
        Raises KeyError when ``prot_id`` holds no borrowed item.
        """
        timeout, data = self.borrowing.pop(prot_id)
        self._queue.appendleft((timeout, data))
        self.addlog('R %s\r\n' % prot_id)

    def enqueue(self, timeout, data):
        """Append ``data`` to the queue.

        ``timeout`` is compared against ``time.time()`` on dequeue; a value
        that is not positive means the item never expires.
        """
        self._queue.appendleft((timeout, data))
        self.addlog('S %d %d\r\n%s\r\n' % (timeout,
                                           len(data), data))

    def use(self, prot_id):
        """Drop the item borrowed by ``prot_id``, if any (it was consumed)."""
        if prot_id in self.borrowing:
            self.addlog("U %s\r\n" % prot_id)
            del self.borrowing[prot_id]

    def dequeue(self, prot_id=None):
        """Pop the next non-expired item.

        When ``prot_id`` is given the item is also recorded in
        ``borrowing`` under that id.  Returns ``(timeout, data)``, or None
        when the queue holds no live item.
        """
        while True:
            try:
                timeout, data = self._queue.pop()
            except IndexError:
                return None
            # Every pop is journaled, expired or not, so that a replay of
            # the journal pops exactly the same items.
            if prot_id is None:
                self.addlog('G\r\n')
            else:
                self.addlog('B %s\r\n' % prot_id)

            expired = timeout > 0 and timeout < time.time()
            if prot_id and not expired:
                assert prot_id not in self.borrowing
                self.borrowing[prot_id] = (timeout, data)
            # Rotate only after the borrow is registered: rotate_log() may
            # start a new journal when nothing is outstanding, and the
            # later U/R record for this item must stay in the same journal
            # as its B record.
            self.rotate_log()
            if expired:
                continue
            return timeout, data

    def load_from_log(self, logpath):
        """Rebuild the queue by replaying the journal file at ``logpath``.

        Items still borrowed at the end of the journal are put back at the
        serving end of the queue.  Records that refer to an unknown
        ``prot_id`` or pop from an empty queue are logged and skipped.
        """
        logfile = open(logpath, 'rb')
        try:
            while True:
                line = logfile.readline()
                if not line:
                    break
                if line.startswith('B'):
                    _, prot_id = line.split()
                    try:
                        data = self._queue.pop()
                        self.borrowing[prot_id] = data
                    except IndexError:
                        logging.error('Pop from empty stack')
                elif line.startswith('U'):
                    _, prot_id = line.split()
                    if prot_id in self.borrowing:
                        del self.borrowing[prot_id]
                    else:
                        logging.error('Use of unknown borrower %s in %s'
                                      % (prot_id, logpath))
                elif line.startswith('R'):
                    _, prot_id = line.split()
                    if prot_id in self.borrowing:
                        self._queue.appendleft(self.borrowing.pop(prot_id))
                    else:
                        logging.error('Return from unknown borrower %s in %s'
                                      % (prot_id, logpath))
                elif line.startswith('G'):
                    try:
                        self._queue.pop()
                    except IndexError:
                        logging.error('Pop from empty stack')
                elif line.startswith('S'):
                    _, timeout, lendata = line.split()
                    # The payload is read by length, so it may itself
                    # contain line breaks.
                    data = logfile.read(int(lendata))
                    logfile.read(2)  # line break
                    self._queue.appendleft((int(timeout),
                                            data))
                else:
                    logging.error('Bad format for log file %s' % logpath)
        finally:
            logfile.close()

        # Nobody can hold a reservation across a restart: hand every
        # outstanding item back to the serving end of the queue.
        for t in self.borrowing.values():
            self._queue.append(t)
        self.borrowing = {}
+
class ReliableQueue(Queue):
    """Queue that appends every journal record to a log file.

    The log lives in ``self.server.logdir`` and is named
    ``<quote_plus(key)>-<unix time>.log``.  This class never assigns
    ``server`` itself, so whoever creates the queue must set it before
    ``rotate_log`` is first called.
    """

    def addlog(self, w):
        """Append ``w`` to the journal and flush it to the OS."""
        # Write through the file object (not the raw descriptor) so short
        # writes are handled and tell() in rotate_log() sees the data.
        self.log.write(w)
        self.log.flush()

    def addlog_sync(self, w):
        """Like ``addlog`` but also fsync the journal to disk."""
        self.log.write(w)
        self.log.flush()
        os.fsync(self.log.fileno())

    def rotate_log(self):
        """Open a new journal file when none is open or the old one is full.

        An existing journal is only replaced while the queue is completely
        idle (nothing queued, nothing borrowed) and the file has reached
        ``LOG_CAPACITY`` bytes.
        """
        if self.log is not None:
            idle = not self._queue and not self.borrowing
            if not (idle and self.log.tell() >= LOG_CAPACITY):
                return
            self.log.close()
        fn = os.path.join(self.server.logdir,
                          '%s-%d.log' % (urllib.quote_plus(self.key),
                                         time.time()))
        logging.info('rotate log to %s' % fn)
        self.log = open(fn, 'ab')
+
class QueueFactory(object):
    """Registry that creates queues on demand and restores them from logs.

    ``queue_class`` selects the implementation used for new queues; it can
    be overridden on the class or on an instance.
    """
    queue_class = Queue

    def __init__(self, logdir):
        self.queue_collection = {}   # key -> queue instance
        self.logdir = logdir

    def get_queue(self, key, auto_create=True):
        """Return the queue registered under ``key``.

        A missing queue is created (and its journal opened through
        ``rotate_log``) when ``auto_create`` is true; otherwise None is
        returned.
        """
        q = self.queue_collection.get(key)
        if q is None and auto_create:
            q = self.queue_class(key)
            # The queue reads ``server.logdir`` to locate its journal.
            q.server = self
            q.rotate_log()
            self.queue_collection[key] = q
        return q

    def scan_logs(self):
        """Restore every queue from the newest ``<key>-<timestamp>.log``."""
        logging.info('Scanning logs ...')
        log_name = re.compile(r'-(\d+)\.log$')
        queue_fns = {}   # quoted key -> newest timestamp seen
        for fn in os.listdir(self.logdir):
            m = log_name.search(fn)
            if m:
                key = fn[:m.start()]
                tm = int(m.group(1))
                if (key not in queue_fns or
                        tm > queue_fns[key]):
                    queue_fns[key] = tm

        for key, tm in queue_fns.items():
            ukey = urllib.unquote_plus(key)
            logging.info('Restoring queue %s ...' % ukey)
            # NOTE(review): get_queue() opens a fresh journal through
            # rotate_log() before the old one is replayed, and the replayed
            # items are not written to it -- verify that items restored
            # here survive a second restart.
            queue = self.get_queue(ukey)
            queue.load_from_log(os.path.join(self.logdir,
                                             '%s-%d.log' % (key, tm)))
        logging.info('Redqueue is ready to serve.')
View
149 redqueue/server.py
@@ -0,0 +1,149 @@
+#!/usr/bin/python
+####################################################################
+#
+# All of the deliverable code in REDQUEUE has been dedicated to the
+# PUBLIC DOMAIN by the authors.
+#
+# Author: Zeng Ke superisaac.ke at gmail dot com
+#
+####################################################################
+import re, os, sys
+import socket
+import logging
+import time
+
+from tornado import iostream
+from tornado import ioloop
+
+from redqueue.queue import QueueFactory, Queue, ReliableQueue
+
class MemcacheServer(object):
    """TCP front end that hands each accepted connection to MemcacheProtocol.

    ``reliable`` selects the queue implementation: ``'yes'`` journals every
    operation, ``'sync'`` additionally fsyncs each journal write, and any
    other value keeps queues in memory only.
    """

    def __init__(self, logdir, reliable='no'):
        self.queue_factory = QueueFactory(logdir)
        if reliable == 'sync':
            # Use a subclass instead of patching ReliableQueue.addlog, so
            # the choice stays local to this server instance.
            class SyncReliableQueue(ReliableQueue):
                """ReliableQueue whose every journal write is fsync'ed."""
                def addlog(self, w):
                    self.addlog_sync(w)
            self.queue_factory.queue_class = SyncReliableQueue
        elif reliable == 'yes':
            self.queue_factory.queue_class = ReliableQueue
        else:
            self.queue_factory.queue_class = Queue

    def handle_accept(self, fd, events):
        """IOLoop callback: accept one pending connection."""
        import errno
        try:
            conn, addr = self._sock.accept()
        except socket.error:
            # The listening socket is non-blocking; a wake-up with nothing
            # left to accept is not an error.
            err = sys.exc_info()[1]
            if err.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return
            raise
        p = MemcacheProtocol(iostream.IOStream(conn))
        p.server = self

    def start(self, host, port):
        """Restore queues from the logs, then listen on ``host:port``."""
        self.queue_factory.scan_logs()
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self._sock.setblocking(0)
        self._sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._sock.bind((host, port))
        self._sock.listen(128)
        ioloop.IOLoop.instance().add_handler(self._sock.fileno(),
                                             self.handle_accept,
                                             ioloop.IOLoop.READ)
+
class MemcacheProtocol(object):
    """One client connection speaking a subset of the memcached text protocol.

    ``set`` enqueues a value on the queue named by the key, ``get`` and
    ``gets`` dequeue from it.  Storing ``1`` or ``true`` under the special
    key ``config:reserv`` switches the connection to reservation mode: a
    fetched item then stays borrowed until the client sends ``delete`` for
    its key, and is returned to the queue if the connection closes first.

    The creator must assign ``server`` (an object exposing
    ``queue_factory``) before any command is processed.
    """

    def __init__(self, stream):
        self.protocol_id = str(id(self))
        self.stream = stream
        # Initialise all state before registering callbacks, so a callback
        # that fires early never sees a half-built object.
        self.reservation = False
        self.resved_keys = set()   # keys with an item borrowed by us
        self.route = {
            'get': self.handle_get,
            'gets': self.handle_gets,
            'set': self.handle_set,
            'delete': self.handle_delete}
        self.stream.set_close_callback(self._return_data)
        self.wait_for_line()

    def set_reservation(self, value):
        """Turn reservation mode on for ``'1'``/``'true'``, off otherwise.

        Whenever the mode actually changes, every item currently borrowed
        by this connection is marked as used.
        """
        orig_reservation = self.reservation
        self.reservation = value in ('1', 'true')
        if orig_reservation != self.reservation:
            logging.info('Set reservation to be %s' % self.reservation)
            self.use_key()

    def use_key(self, key=None):
        """Mark the item borrowed for ``key`` (or for all keys) as consumed."""
        if key is None:
            for k in self.resved_keys:
                self.server.queue_factory.get_queue(k).use(self.protocol_id)
            self.resved_keys = set()
        elif key in self.resved_keys:
            self.server.queue_factory.get_queue(key).use(self.protocol_id)
            self.resved_keys.remove(key)

    def _return_data(self):
        """Close callback: hand every borrowed item back to its queue."""
        for key in self.resved_keys:
            self.server.queue_factory.get_queue(key).return_(self.protocol_id)
        self.resved_keys = set()

    def wait_for_line(self):
        """Ask the stream for the next command line."""
        self.stream.read_until('\r\n', self.line_received)

    def line_received(self, line):
        """Dispatch one command line to its handler.

        Blank lines, unknown commands and commands with missing or
        non-numeric arguments are answered with ``CLIENT_ERROR`` instead
        of raising, so the connection keeps reading.
        """
        args = line.split()
        if not args:
            self.handle_unknown()
            self.wait_for_line()
            return
        handler = self.route.get(args[0].lower(), self.handle_unknown)
        try:
            data_required = handler(*args[1:])
        except (TypeError, ValueError):
            # Wrong argument count or a non-integer where a number is
            # expected: the client sent a malformed command.
            logging.exception('Malformed command line %r' % line)
            self.handle_unknown()
            data_required = False
        # A handler returns true when it has scheduled its own read (the
        # data block of ``set``) and will resume line reading afterwards.
        if not data_required:
            self.wait_for_line()

    def handle_unknown(self, *args):
        """Reject a command that is not understood."""
        self.stream.write("CLIENT_ERROR bad command line format\r\n")

    def handle_set(self, key, flags, exptime, bytes, *args):
        """Read the data block of a ``set`` command and enqueue it.

        A positive ``exptime`` is relative and converted to an absolute
        time.  Returns True because the data block is read asynchronously.
        Raises ValueError when ``bytes`` or ``exptime`` is not an integer.
        """
        bytes = int(bytes)
        exptime = int(exptime)
        if exptime > 0:
            exptime = time.time() + exptime

        def on_set_data(data):
            data = data[:-2]   # strip the trailing \r\n of the data block
            if key == 'config:reserv':
                self.set_reservation(data)
            else:
                self.server.queue_factory.get_queue(key).enqueue(exptime, data)
            self.stream.write('STORED\r\n')
            self.wait_for_line()
        self.stream.read_bytes(bytes + 2, on_set_data)
        return True

    def _get_data(self, key):
        """Dequeue one value from ``key``; None when nothing is available.

        In reservation mode the item is borrowed under this connection's
        id, and a key that already has a borrowed item yields None.
        """
        if self.reservation and (key in self.resved_keys):
            return None
        prot_id = None
        if self.reservation:
            prot_id = self.protocol_id
        q = self.server.queue_factory.get_queue(key)
        t = None
        if q is not None:
            t = q.dequeue(prot_id=prot_id)
        if t is None:
            return None
        if self.reservation:
            self.resved_keys.add(key)
        return t[1]  # t is a tuple of (timeout, data)

    def handle_get(self, *keys):
        """Answer ``get``: one value from every listed queue that has one."""
        for key in keys:
            data = self._get_data(key)
            # Compare with None: an empty value is still a dequeued item.
            if data is not None:
                self.stream.write('VALUE %s 0 %d\r\n%s\r\n' % (key, len(data), data))
        self.stream.write('END\r\n')

    def handle_gets(self, *keys):
        """Answer ``gets``, which here works like a poll(): only the first
        listed queue that has an item is served, so that a client can wait
        on several queues.
        """
        for key in keys:
            data = self._get_data(key)
            if data is not None:
                self.stream.write('VALUE %s 0 %d\r\n%s\r\n' % (key, len(data), data))
                break
        self.stream.write('END\r\n')

    def handle_delete(self, key, *args):
        """Answer ``delete``: confirm consumption of the item borrowed for ``key``."""
        if key in self.resved_keys:
            self.use_key(key)
            self.stream.write('DELETED\r\n')
        else:
            self.stream.write('NOT_DELETED\r\n')
+
+Server = MemcacheServer
View
34 redqueue_server.py
@@ -0,0 +1,34 @@
+#!/usr/bin/python
+####################################################################
+#
+# All of the deliverable code in REDQUEUE has been dedicated to the
+# PUBLIC DOMAIN by the authors.
+#
+# Author: Zeng Ke superisaac.ke at gmail dot com
+#
+####################################################################
+import re, os, sys
+import logging
+
+from tornado import ioloop
+import tornado.options
+from tornado.options import define, options
+from redqueue.server import Server
+
# Command line options understood by the server script.
define('host',
       default="0.0.0.0",
       help="The binded ip host")
define('port',
       default=11211,
       type=int,
       help='The port to be listened')
define('logdir',
       default='log',
       help='The directory to put logs')
define('reliable',
       default='no',
       help='Store data to log files, options: (no, yes, sync)')


def main():
    """Parse the command line, start the queue server and run the IO loop.

    Exits with status 1 when the configured log directory does not exist.
    """
    tornado.options.parse_command_line()
    if os.path.isdir(options.logdir):
        queue_server = Server(options.logdir, options.reliable)
        queue_server.start(options.host, options.port)
        ioloop.IOLoop.instance().start()
        return
    logging.error('Log directory %s does not exist.' % options.logdir)
    sys.exit(1)


if __name__ == '__main__':
    main()
View
9 setup.py
@@ -0,0 +1,9 @@
+from distutils.core import setup
+
# Package metadata: installs the ``redqueue`` package and the
# ``redqueue_server.py`` launcher script.
setup(name='REDqueue',
      version='0.0.1',
      description='A lightweight queue server in python',
      author='Zeng Ke',
      packages=['redqueue'],
      scripts=['redqueue_server.py'],
      )

0 comments on commit de24dc5

Please sign in to comment.