Permalink
Browse files

Merge pull request #25 from felinx/master

Add async Writer to support pub&mpub
  • Loading branch information...
2 parents 89ba22d + 764944a commit 9ad4f3039bbef9d03d37d605bf13bbe68c385c13 @mreiferson mreiferson committed Apr 30, 2013
Showing with 285 additions and 6 deletions.
  1. +2 −2 nsq/Reader.py
  2. +256 −0 nsq/Writer.py
  3. +5 −4 nsq/__init__.py
  4. +14 −0 nsq/nsq.py
  5. +8 −0 tests/test_command.py
View
@@ -421,7 +421,7 @@ def _identify_response_callback(self, conn, data):
try:
data = json.loads(data)
- except json.JSONDecodeError:
+ except ValueError:
logging.warning("[%s] failed to parse JSON from nsqd: %r", conn.id, data)
return
@@ -461,7 +461,7 @@ def _finish_query_lookupd(self, response, endpoint):
try:
lookup_data = json.loads(response.body)
- except json.JSONDecodeError:
+ except ValueError:
logging.warning("[%s] failed to parse JSON from lookupd: %r", endpoint, response.body)
return
View
@@ -0,0 +1,256 @@
+# -*- coding: utf-8 -*-
+"""
+high-level NSQ writer class built on top of a Tornado IOLoop supporting async
+pub & mpub messages to nsqds.
+
+PUB message in a Tornado web handler ex.
+ # -*- coding: utf-8 -*-
+ import functools
+ import tornado.httpserver
+ import tornado.ioloop
+ import tornado.options
+ import tornado.web
+ from nsq import Writer, Error
+
+ from tornado.options import define, options
+
+ define("port", default=8888, help="run on the given port", type=int)
+
+ class MainHandler(tornado.web.RequestHandler):
+ @property
+ def nsq(self):
+ return self.application.nsq
+
+ def get(self):
+ topic = "log"
+ msg = "Hello world"
+ msg_cn = "Hello 世界"
+
+ self.nsq.pub(topic, msg) # pub
+ self.nsq.mpub(topic, [msg, msg_cn]) # mpub
+
+ # customize callback
+ callback = functools.partial(self.finish_pub, topic=topic, msg=msg)
+ self.nsq.pub(topic, msg, callback=callback)
+
+ self.write(msg)
+
+ def finish_pub(self, conn, data, topic, msg):
+ if isinstance(data, Error):
+ # try to re-pub message again if pub failed
+ self.nsq.pub(topic, msg)
+
+
+ class Application(tornado.web.Application):
+ def __init__(self, handlers, **settings):
+ self.nsq = Writer(["127.0.0.1:4150", ])
+ super(Application, self).__init__(handlers, **settings)
+
+
+ def main():
+ tornado.options.parse_command_line()
+ application = Application([
+ (r"/", MainHandler),
+ ])
+ http_server = tornado.httpserver.HTTPServer(application)
+ http_server.listen(options.port)
+
+ tornado.ioloop.IOLoop.instance().start()
+
+
+ if __name__ == "__main__":
+ main()
+"""
+import logging
+try:
+ import simplejson as json
+except ImportError:
+ import json # pyflakes.ignore
+import time
+import socket
+import functools
+import random
+
+import tornado.ioloop
+
+import nsq
+import async
+
+
+class Writer(object):
+ def __init__(self, nsqd_tcp_addresses, heartbeat_interval=30):
+ """
+ Writer pub&mpub messages to the specified ``nsqd_tcp_addresses`` in async modes.
+
+ ``nsqd_tcp_addresses`` a sequence of (addresses, port) of the nsqd instances this writer
+ should connect to
+ """
+ assert isinstance(heartbeat_interval, (int, float)) and heartbeat_interval >= 1
+ if not isinstance(nsqd_tcp_addresses, (list, set, tuple)):
+ assert isinstance(nsqd_tcp_addresses, (str, unicode))
+ nsqd_tcp_addresses = [nsqd_tcp_addresses]
+ assert nsqd_tcp_addresses
+
+ self.nsqd_tcp_addresses = nsqd_tcp_addresses
+ self.heartbeat_interval = int(heartbeat_interval * 1000)
+ self.hostname = socket.gethostname()
+ self.short_hostname = self.hostname.split('.')[0]
+ self.conns = {}
+
+ logging.info("starting writer...")
+ self.connect()
+
+ tornado.ioloop.PeriodicCallback(self.check_last_recv_timestamps, 60 * 1000).start()
+
+ def pub(self, topic, msg, callback=None):
+ self._pub("pub", topic, msg, callback)
+
+ def mpub(self, topic, msg, callback=None):
+ if isinstance(msg, (str, unicode)):
+ msg = [msg]
+ assert isinstance(msg, (list, set, tuple))
+
+ self._pub("mpub", topic, msg, callback)
+
+ def _pub(self, command, topic, msg, callback):
+ if not callback:
+ callback = functools.partial(self.finish_pub, command=command,
+ topic=topic, msg=msg)
+
+ conn = random.choice(self.conns.values())
+ try:
+ cmd = getattr(nsq, command)
+ conn.send(cmd(topic, msg))
+
+ conn.callback_queue.append(callback)
+ except Exception, error:
+ logging.exception('[%s] failed to send %s' % (conn.id, command))
+ conn.close()
+
+ callback(conn, SendError(error))
+
+ def _data_callback(self, conn, raw_data):
+ do_callback = False
+ conn.last_recv_timestamp = time.time()
+ frame, data = nsq.unpack_response(raw_data)
+ if frame == nsq.FRAME_TYPE_RESPONSE and data == "_heartbeat_":
+ logging.info("[%s] received heartbeat", conn.id)
+ self.heartbeat(conn)
+ conn.send(nsq.nop())
+ elif frame == nsq.FRAME_TYPE_RESPONSE:
+ do_callback = True
+ elif frame == nsq.FRAME_TYPE_ERROR:
+ logging.error("[%s] ERROR: %s", conn.id, data)
+ data = DataError(data)
+ do_callback = True
+
+ if do_callback and conn.callback_queue:
+ callback = conn.callback_queue.pop(0)
+ callback(conn, data)
+
+ def connect(self):
+ for addr in self.nsqd_tcp_addresses:
+ host, port = addr.split(':')
+ self.connect_to_nsqd(host, int(port))
+
+ def connect_to_nsqd(self, host, port):
+ assert isinstance(host, (str, unicode))
+ assert isinstance(port, int)
+
+ conn_id = host + ':' + str(port)
+ if conn_id in self.conns:
+ return
+
+ logging.info("[%s] connecting to nsqd", conn_id)
+ conn = async.AsyncConn(host, port, self._connect_callback,
+ self._data_callback, self._close_callback)
+ conn.connect()
+
+ conn.id = conn_id
+ conn.last_recv_timestamp = time.time()
+ conn.callback_queue = []
+
+ self.conns[conn_id] = conn
+
+ def _connect_callback(self, conn):
+ try:
+ identify_data = {
+ 'short_id': self.short_hostname,
+ 'long_id': self.hostname,
+ 'heartbeat_interval': self.heartbeat_interval,
+ 'feature_negotiation': True,
+ }
+ logging.info("[%s] IDENTIFY sent %r", conn.id, identify_data)
+ conn.send(nsq.identify(identify_data))
+ conn.callback_queue.append(self._identify_response_callback)
+ except Exception:
+ conn.close()
+ logging.exception('[%s] failed to bootstrap connection' % conn.id)
+
+ def _identify_response_callback(self, conn, data):
+ if data == 'OK' or isinstance(data, nsq.Error):
+ return
+
+ try:
+ data = json.loads(data)
+ except ValueError:
+ logging.warning("[%s] failed to parse JSON from nsqd: %r", conn.id, data)
+ return
+
+ logging.info('[%s] IDENTIFY received %r', conn.id, data)
+
+ def _close_callback(self, conn):
+ if conn.id in self.conns:
+ del self.conns[conn.id]
+
+ for callback in conn.callback_queue:
+ try:
+ callback(conn, ConnectionClosedError())
+ except Exception, error:
+ logging.exception("[%s] failed to callback: %s", conn.id, error)
+
+ logging.warning("[%s] connection closed", conn.id)
+ logging.info("[%s] attempting to reconnect in 15s", conn.id)
+ reconnect_callback = functools.partial(self.connect_to_nsqd,
+ host=conn.host, port=conn.port)
+ tornado.ioloop.IOLoop.instance().add_timeout(time.time() + 15, reconnect_callback)
+
+ def check_last_recv_timestamps(self):
+ now = time.time()
+ for conn_id, conn in self.conns.iteritems():
+ timestamp = conn.last_recv_timestamp
+ if (now - timestamp) > ((self.heartbeat_interval * 2) / 1000.0):
+ # this connection hasnt received data beyond
+ # the configured heartbeat interval, close it
+ logging.warning("[%s] connection is stale (%.02fs), closing", conn.id, (now - timestamp))
+ conn.close()
+
+ #
+ # subclass overwriteable
+ #
+ def heartbeat(self, conn):
+ pass
+
+ def finish_pub(self, conn, data, command, topic, msg):
+ """Default pub&mpub callback, overwrite it to do re-pub etc"""
+ if isinstance(data, nsq.Error):
+ logging.error('[%s] failed to %s (%s, %s), data is %s',
+ conn.id, command, topic, msg, data)
+
+
class DataError(nsq.Error):
    """Error wrapping the payload of an error frame received from nsqd."""

    def __init__(self, data):
        # initialise the base exception too, so ``args`` and ``repr()`` carry
        # the payload instead of being empty
        nsq.Error.__init__(self, data)
        self.data = data

    def __str__(self):
        return "DataError: %s" % self.data
+
class SendError(nsq.Error):
    """Error wrapping whatever prevented a command from being sent."""

    def __init__(self, error):
        # initialise the base exception too, so ``args`` and ``repr()`` carry
        # the underlying error instead of being empty
        nsq.Error.__init__(self, error)
        self.error = error

    def __str__(self):
        return "SendError: %s" % self.error
+
class ConnectionClosedError(nsq.Error):
    """Error handed to callbacks still pending when their connection closes."""
View
@@ -2,13 +2,14 @@
import tornado.ioloop
import logging
-from nsq import Message, unpack_response, decode_message, valid_topic_name, valid_channel_name
-from nsq import identify, subscribe, ready, finish, touch, requeue, nop
+from nsq import Message, Error, unpack_response, decode_message, valid_topic_name, valid_channel_name
+from nsq import identify, subscribe, ready, finish, touch, requeue, nop, pub, mpub
from nsq import FRAME_TYPE_RESPONSE, FRAME_TYPE_ERROR, FRAME_TYPE_MESSAGE, TOUCH, FIN, REQ
from BackoffTimer import BackoffTimer
from sync import SyncConn
from async import AsyncConn
from Reader import Reader
+from Writer import Writer
def _handle_term_signal(sig_num, frame):
@@ -23,9 +24,9 @@ def run():
__version__ = '0.4.1'
__author__ = "Matt Reiferson <snakes@gmail.com>"
-__all__ = ["Reader", "run", "BackoffTimer", "Message",
+__all__ = ["Reader", "Writer", "run", "BackoffTimer", "Message", "Error",
"SyncConn", "AsyncConn", "unpack_response", "decode_message",
- "identify", "subscribe", "ready", "finish", "touch", "requeue", "nop",
+ "identify", "subscribe", "ready", "finish", "touch", "requeue", "nop","pub", "mpub",
"valid_topic_name", "valid_channel_name",
"FRAME_TYPE_RESPONSE", "FRAME_TYPE_ERROR", "FRAME_TYPE_MESSAGE",
"TOUCH", "FIN", "REQ"]
View
@@ -51,6 +51,10 @@ def touch(self):
self.respond(TOUCH)
class Error(Exception):
    """Base exception type for errors reported by this package."""
+
+
def unpack_response(data):
    """
    Split a raw frame into ``(frame_type, payload)``.

    The first four bytes are decoded as a big-endian signed 32-bit integer;
    everything after them is returned unchanged.
    """
    header, payload = data[:4], data[4:]
    (frame_type,) = struct.unpack('>l', header)
    return frame_type, payload
@@ -97,6 +101,16 @@ def touch(id):
def nop():
    """Build the NOP command, which carries no body."""
    command = _command('NOP', None)
    return command
def pub(topic, data):
    """Build the PUB command that publishes ``data`` as one message to ``topic``."""
    command = _command('PUB', data, topic)
    return command
+
def mpub(topic, data):
    """
    Build the MPUB command that publishes several messages to ``topic``.

    ``data`` a list, set or tuple of message bodies

    The body is a big-endian 32-bit message count followed by each message
    prefixed with its own big-endian 32-bit length.
    """
    # tuples are accepted as well as sets and lists, since any sized iterable
    # of message bodies can be framed the same way
    assert isinstance(data, (set, list, tuple))
    parts = [struct.pack('>l', len(data))]
    for m in data:
        parts.append(struct.pack('>l', len(m)))
        parts.append(m)
    # join once instead of growing the body with repeated concatenation
    return _command('MPUB', b''.join(parts), topic)
+
def valid_topic_name(topic):
if not 0 < len(topic) < 33:
return False
View
@@ -16,6 +16,8 @@
def pytest_generate_tests(metafunc):
identify_body = json.dumps({'a': 1, 'b': 2})
+ msgs = ['asdf', 'ghjk', 'abcd']
+ mpub_body = struct.pack('>l', len(msgs)) + ''.join(struct.pack('>l', len(m)) + m for m in msgs)
if metafunc.function == test_command:
for cmd_method, kwargs, result in [
(nsq.identify,
@@ -42,6 +44,12 @@ def pytest_generate_tests(metafunc):
(nsq.nop,
{},
'NOP\n'),
+ (nsq.pub,
+ {'topic': 'test', 'data': msgs[0]},
+ 'PUB test\n' + struct.pack('>l', len(msgs[0])) + msgs[0]),
+ (nsq.mpub,
+ {'topic': 'test', 'data': msgs},
+ 'MPUB test\n' + struct.pack('>l', len(mpub_body)) + mpub_body)
]:
metafunc.addcall(funcargs=dict(cmd_method=cmd_method, kwargs=kwargs, result=result))

0 comments on commit 9ad4f30

Please sign in to comment.