Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

pynsq: low-level python client for nsq

  • Loading branch information...
commit e5107e7677f3805a254a684124ef0c338f36e803 1 parent 2f76710
@mreiferson mreiferson authored
View
1  pynsq/.gitignore
@@ -0,0 +1 @@
+*.pyc
View
11 pynsq/nsq/__init__.py
@@ -0,0 +1,11 @@
+from nsq import unpack_response, decode_message, subscribe, ready, finish, requeue, nop
+from nsq import FRAME_TYPE_RESPONSE, FRAME_TYPE_ERROR, FRAME_TYPE_MESSAGE
+from sync import SyncConn
+from async import AsyncConn
+
+
+__version__ = '0.1'
+__author__ = "Matt Reiferson <snakes@gmail.com>"
+__all__ = ["SyncConn", "AsyncConn", "unpack_response", "decode_message",
+ "subscribe", "ready", "finish", "requeue", "nop",
+ "FRAME_TYPE_RESPONSE", "FRAME_TYPE_ERROR", "FRAME_TYPE_MESSAGE"]
View
91 pynsq/nsq/async.py
@@ -0,0 +1,91 @@
+import socket
+import struct
+
+import tornado.iostream
+
+import nsq
+
+
+class AsyncConn(object):
+ def __init__(self, host, port, connect_callback, data_callback, close_callback, timeout=1.0):
+ assert isinstance(host, (str, unicode))
+ assert isinstance(port, int)
+ assert callable(data_callback)
+ assert callable(close_callback)
+ assert isinstance(timeout, float)
+
+ self.connecting = False
+ self.connected = False
+ self.host = host
+ self.port = port
+ self.connect_callback = connect_callback
+ self.data_callback = data_callback
+ self.close_callback = close_callback
+ self.timeout = timeout
+
+ def connect(self):
+ if self.connected or self.connecting:
+ return
+
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.s.settimeout(self.timeout)
+ self.s.setblocking(0)
+
+ self.stream = tornado.iostream.IOStream(self.s)
+ self.stream.set_close_callback(self._socket_close)
+
+ self.connecting = True
+ self.stream.connect((self.host, self.port), self._connect_callback)
+
+ def _connect_callback(self):
+ self.connecting = False
+ self.connected = True
+ self.stream.write(nsq.MAGIC_V2)
+ self.stream.read_bytes(4, self._read_size)
+ self.connect_callback(self)
+
+ def _socket_close(self):
+ self.connected = False
+ self.close_callback(self)
+
+ def close(self):
+ self.connected = False
+ self.stream.close()
+
+ def _read_size(self, data):
+ size = struct.unpack('>l', data)[0]
+ self.stream.read_bytes(size, self._read_body)
+
+ def _read_body(self, data):
+ self.stream.read_bytes(4, self._read_size)
+ self.data_callback(self, data)
+
+ def send(self, data):
+ self.stream.write(data)
+
+ def __str__(self):
+ return self.host + ':' + str(self.port)
+
+
+if __name__ == '__main__':
+ def connect_callback(c):
+ print "connected"
+ c.send(nsq.subscribe('test', 'ch', 'a', 'b'))
+ c.send(nsq.ready(1))
+
+ def close_callback(c):
+ print "connection closed"
+
+ def data_callback(c, data):
+ unpacked = nsq.unpack_response(data)
+ if unpacked[0] == nsq.FRAME_TYPE_MESSAGE:
+ c.send(nsq.ready(1))
+ msg = nsq.decode_message(unpacked[1])
+ print msg.id, msg.body
+ c.send(nsq.finish(msg.id))
+
+ c = AsyncConn("127.0.0.1", 4150, connect_callback, data_callback, close_callback)
+ c.connect()
+
+ tornado.ioloop.IOLoop.instance().start()
+
View
47 pynsq/nsq/nsq.py
@@ -0,0 +1,47 @@
+import struct
+
+
+MAGIC_V2 = " V2"
+NL = "\n"
+
+FRAME_TYPE_RESPONSE = 0
+FRAME_TYPE_ERROR = 1
+FRAME_TYPE_MESSAGE = 2
+
+
+class Message(object):
+ def __init__(self, id, body, timestamp, attempts):
+ self.id = id
+ self.body = body
+ self.timestamp = timestamp
+ self.attempts = attempts
+
+
+def unpack_response(data):
+ frame = struct.unpack('>l', data[:4])[0]
+ return frame, data[4:]
+
+def decode_message(data):
+ timestamp = struct.unpack('>q', data[:8])[0]
+ attempts = struct.unpack('>h', data[8:10])[0]
+ id = data[10:26]
+ body = data[26:]
+ return Message(id, body, timestamp, attempts)
+
+def _command(cmd, *params):
+ return "%s %s%s" % (cmd, ' '.join(params), NL)
+
+def subscribe(topic, channel, short_id, long_id):
+ return _command('SUB', topic, channel, short_id, long_id)
+
+def ready(count):
+ return _command('RDY', str(count))
+
+def finish(id):
+ return _command('FIN', id)
+
+def requeue(id, time_ms):
+ return _command('REQ', id, time_ms)
+
+def nop():
+ return _command('NOP')
View
50 pynsq/nsq/sync.py
@@ -0,0 +1,50 @@
+import socket
+import struct
+
+import nsq
+
+
+class SyncConn(object):
+ def __init__(self, timeout=1.0):
+ self.buffer = ''
+ self.timeout = timeout
+
+ def connect(self, host, port):
+ assert isinstance(host, (str, unicode))
+ assert isinstance(port, int)
+ self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.s.settimeout(self.timeout)
+ self.s.connect((host, port))
+ self.s.send(nsq.MAGIC_V2)
+
+ def _readn(self, size):
+ while True:
+ if len(self.buffer) >= size:
+ break
+ packet = self.s.recv(4096)
+ if not packet:
+ raise Exception("failed to read %d" % size)
+ self.buffer += packet
+ data = self.buffer[:size]
+ self.buffer = self.buffer[size:]
+ return data
+
+ def read_response(self):
+ size = struct.unpack('>l', self._readn(4))[0]
+ return self._readn(size)
+
+ def send(self, data):
+ self.s.send(data)
+
+
+if __name__ == '__main__':
+ c = SyncConn()
+ c.connect("127.0.0.1", 4150)
+ c.send(nsq.subscribe('test', 'ch', 'a', 'b'))
+ for i in xrange(10):
+ c.send(nsq.ready(1))
+ resp = c.read_response()
+ unpacked = nsq.unpack_response(resp)
+ msg = nsq.decode_message(unpacked[1])
+ print msg.id, msg.body
+ c.send(nsq.finish(msg.id))
View
15 pynsq/setup.py
@@ -0,0 +1,15 @@
+from setuptools import setup
+
+version = '0.1'
+
+setup(name='pynsq',
+ version=version,
+ description="a Python reader for NSQ",
+ keywords='python nsq',
+ author='Matt Reiferson',
+ author_email='snakes@gmail.com',
+ url='http://github.com/bitly/nsq/pynsq',
+ packages=['nsq'],
+ include_package_data=True,
+ zip_safe=True,
+ )
Please sign in to comment.
Something went wrong with that request. Please try again.