Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

try/except for callbacks + various small refactoring

  • Loading branch information...
commit 626274f811952cd2d5d5be34799864d6d7de979f 1 parent 29f5b96
@paolo-losi authored
View
1  TODO
@@ -1,4 +1,5 @@
* review of all methods
+* check conn/channel status before sending frames
* logging
* handling of callback failures
* README
View
8 stormed/connection.py
@@ -24,6 +24,7 @@ def __init__(self, host, username='guest', password='guest', vhost='/',
self.vhost = vhost
self.heartbeat = heartbeat
self.last_received_frame = None
+ self.frame_max = 0
self.io_loop = io_loop or IOLoop.instance()
self.stream = None
self.status = status.CLOSED
@@ -117,3 +118,10 @@ def on_closed_stream(self):
if self.status != status.CLOSED:
if self.on_disconnect:
self.on_disconnect()
+
+ def reset(self):
+ for c in self.channels:
+ if c is not self:
+ c.reset()
+ super(Connection, self).reset()
+ self.close_stream()
View
45 stormed/frame.py
@@ -1,6 +1,6 @@
import struct
-from stormed.util import Enum
+from stormed.util import Enum, AmqpError, logger
from stormed.message import MessageBuilder
from stormed.serialization import parse_method, dump_method, \
parse_content_header, dump_content_header
@@ -65,22 +65,22 @@ def __repr__(self):
self.channel,
self.size)
-def from_method(method, channel=0):
+def from_method(method, ch):
payload = dump_method(method)
- header = frame_header.pack('\x01', channel, len(payload))
+ header = frame_header.pack('\x01', ch.channel_id, len(payload))
return '%s%s%s' % (header, payload, '\xCE')
-def content_header_from_msg(msg, channel):
+def content_header_from_msg(msg, ch):
payload = dump_content_header(msg)
- header = frame_header.pack('\x02', channel, len(payload))
+ header = frame_header.pack('\x02', ch.channel_id, len(payload))
return '%s%s%s' % (header, payload, '\xCE')
-def body_frames_from_msg(msg, channel):
- max_size = 2**16 #FIXME should be set by connection negotiation
+def body_frames_from_msg(msg, ch):
+ max_size = ch.conn.frame_max
frames = []
for offset in range(0, len(msg.body), max_size):
payload = msg.body[offset:offset + max_size]
- header = frame_header.pack('\x03', channel, len(payload))
+ header = frame_header.pack('\x03', ch.channel_id, len(payload))
frames.append('%s%s%s' % (header, payload, '\xCE'))
return frames
@@ -104,8 +104,13 @@ def callback(self):
return self._pending_cb
def invoke_callback(self, *args, **kargs):
- self._pending_cb(*args, **kargs)
- self._pending_cb = None
+ if self._pending_cb:
+ try:
+ self._pending_cb(*args, **kargs)
+ except Exception:
+ logger.error('Error in callback for %s', self._pending_meth,
+ exc_info=True)
+ self._pending_cb = None
def process_frame(self, frame):
processor = getattr(self, 'process_'+frame.frame_type)
@@ -131,12 +136,16 @@ def process_heartbeat(self, hb):
self.conn.stream.write(HEARTBEAT)
def handle_method(self, method):
- if hasattr(method, 'handle'):
- method.handle(self)
pending_meth = self._pending_meth
+ if hasattr(method, 'handle'):
+ try:
+ method.handle(self)
+ except AmqpError:
+ logger.error('Error while handling %s', method, exc_info=True)
+ self.reset()
+ return
if pending_meth and method._name.startswith(pending_meth._name):
- if self._pending_cb:
- self.invoke_callback()
+ self.invoke_callback()
self._flush()
def send_method(self, method, callback=None, message=None):
@@ -160,16 +169,16 @@ def _flush(self):
callback()
def write_method(self, method):
- f = from_method(method, self.channel_id)
+ f = from_method(method, self)
self.conn.stream.write(f)
def write_msg(self, msg):
frames = []
- frames.append(content_header_from_msg(msg, self.channel_id))
- frames.extend(body_frames_from_msg(msg, self.channel_id))
+ frames.append(content_header_from_msg(msg, self))
+ frames.extend(body_frames_from_msg(msg, self))
self.conn.stream.write(''.join(frames))
- def hard_reset(self):
+ def reset(self):
self.status = status.CLOSED
self._method_queue = []
self._pending_meth = None
View
2  stormed/method/channel.py
@@ -28,7 +28,7 @@ def handle(self, channel):
except:
method = None
raise
- channel.hard_reset()
+ channel.reset()
error_code = id2constant.get(self.reply_code, '')
if channel.on_error:
channel.on_error(ChannelError(error_code, self.reply_text, method))
View
24 stormed/method/connection.py
@@ -1,4 +1,4 @@
-from stormed.util import add_method
+from stormed.util import add_method, AmqpError
from stormed.serialization import table2str
from stormed.heartbeat import HeartbeatMonitor
from stormed.frame import status
@@ -8,12 +8,12 @@
@add_method(Start)
def handle(self, conn):
- #FIXME handle missing AMQPLAIN mechanism
- assert 'AMQPLAIN' in self.mechanisms.split(' ')
- assert 'en_US' in self.locales.split(' ')
+ if 'AMQPLAIN' not in self.mechanisms.split(' '):
+ raise AmqpError("'AMQPLAIN' not in mechanisms")
+ if 'en_US' not in self.locales.split(' '):
+ raise AmqpError("'en_US' not in locales")
response = table2str(dict(LOGIN = conn.username,
PASSWORD = conn.password))
- #TODO more client_properties
client_properties = {'client': 'stormed-amqp'}
start_ok = StartOk(client_properties=client_properties,
@@ -23,7 +23,8 @@ def handle(self, conn):
@add_method(Tune)
def handle(self, conn):
- tune_ok = TuneOk(frame_max = self.frame_max,
+ conn.frame_max = self.frame_max or 2**16
+ tune_ok = TuneOk(frame_max = 2**16,
channel_max = self.channel_max,
heartbeat = conn.heartbeat)
conn.write_method(tune_ok)
@@ -37,11 +38,15 @@ def handle(self, conn):
conn.status = status.OPENED
if conn.heartbeat:
HeartbeatMonitor(conn).start()
- conn.on_connect()
+ try:
+ conn.on_connect()
+ except Exception:
+ logger.error('ERROR in on_connect() callback', exc_info=True)
@add_method(CloseOk)
def handle(self, conn):
- conn.close_stream()
+ conn.invoke_callback()
+ conn.reset()
class ConnectionError(object):
@@ -57,8 +62,7 @@ def handle(self, conn):
method = getattr(mod, 'id2method')[self.method_id]
except:
method = None
- for c in conn.channels:
- c.hard_reset()
+ conn.reset()
error_code = id2constant.get(self.reply_code, '')
if conn.on_error:
conn.on_error(ConnectionError(error_code, self.reply_text, method))
View
15 stormed/util.py
@@ -1,3 +1,9 @@
+import logging
+
+logger = logging.getLogger('stormed-amqp')
+
+class AmqpError(Exception): pass
+
class Enum(object):
def __init__(self, *names):
@@ -16,10 +22,17 @@ def __init__(self, **kargs):
fnames = [ fname for fname, ftype in self._fields ]
unvalid_kargs = set(kargs.keys()) - set(fnames)
if unvalid_kargs:
- raise AttributeError('unvalid field name/s: %s' % unvalid_kargs)
+ raise AttributeError('invalid field name/s: %s' % unvalid_kargs)
for fn in fnames:
setattr(self, fn, kargs.get(fn))
+ def __repr__(self):
+ fs = ['%s=%r' % (f, getattr(self,f)) for f, _ in self._fields]
+ t = type(self)
+ klass = (getattr(self, '_name', None) or
+ '%s.%s' % (t.__module__, t.__name__))
+ return '%s(%s)' % (klass, ', '.join(fs))
+
def add_method(klass):
def decorator(f):
setattr(klass, f.__name__, f)
Please sign in to comment.
Something went wrong with that request. Please try again.