Permalink
Browse files

improved handling of connection errors and some txn fixes

  • Loading branch information...
1 parent 29d5c6a commit d1fb33d225f6208e443243e490d1d83ead9cccfe @rhs committed Jul 26, 2011
Showing with 41 additions and 24 deletions.
  1. +19 −18 brokerlib.py
  2. +4 −0 client.py
  3. +5 −0 connection.py
  4. +3 −0 dispatcher.py
  5. +1 −1 send
  6. +9 −5 util.py
View
@@ -51,24 +51,21 @@ def __init__(self):
self.next_id = 0
def declare(self):
- id = buffer(str(self.next_id))
+ id = str(self.next_id)
self.next_id += 1
+ self.transactions[id] = Transaction(id)
return id
def discharge(self, txn_id, fail):
- if txn_id in self.transactions:
- txn = self.transactions.pop(txn_id)
- txn.discharge(fail)
+ txn = self.transactions.pop(txn_id)
+ txn.discharge(fail)
def get_transaction(self, state):
if isinstance(state, TransactionalState):
txn_id = state.txn_id
- if txn_id in self.transactions:
- return self.transactions[txn_id]
- else:
- txn = Transaction(txn_id)
- self.transactions[txn_id] = txn
- return txn
+ return self.transactions[txn_id]
+ else:
+ return None
def target(self):
return TxnTarget(self)
@@ -90,12 +87,14 @@ def put(self, dtag, xfr, owner=None):
return self.dispatch[msg.content.__class__](xfr.state, msg)
def declare(self, state, msg):
- txn_id = self.coordinator.declare()
- return Declared(txn_id)
+ return Declared(buffer(self.coordinator.declare()))
def discharge(self, state, msg):
- self.coordinator.discharge(msg.content.txn_id, msg.content.fail)
- return ACCEPTED
+ try:
+ self.coordinator.discharge(msg.content.txn_id, msg.content.fail)
+ return ACCEPTED
+ except KeyError:
+ return Rejected(Error("amqp:transaction:unknown-id"))
def settle(self, dtag, state):
return state
@@ -225,14 +224,15 @@ def amqp_tick(self, connection):
link.attach()
link.detach()
+ for link in links:
+ self.process[link.role](link)
+
while True:
link = ssn.next_receiver()
if link is None: break
self.process_incoming(link)
for link in links:
- self.process[link.role](link)
-
if link.detaching():
self.detach[link.role](link)
link.detach()
@@ -328,8 +328,9 @@ def doit(t=t, s=r.state):
state = source.settle(t, r.state)
link.settle(t, state)
def undo(t=t):
- state = source.settle(t, None)
- link.settle(t, state)
+ pass
+# state = source.settle(t, None)
+# link.settle(t, state)
if r.state:
txn = self.coordinator.get_transaction(r.state)
else:
View
@@ -87,6 +87,10 @@ def write(self, bytes):
def closed(self):
self.sasl.closed()
+ @synchronized
+ def error(self, exc):
+ self.sasl.error(exc)
+
@synchronized
def tick(self, connection):
self.proto.tick()
View
@@ -103,6 +103,11 @@ def closed(self):
self.close_rcvd = True
self.close_sent = True
+ def error(self, exc):
+ Dispatcher.error(self, exc)
+ self.close_rcvd = True
+ self.close_sent = True
+
def add(self, ssn):
ssn.channel = self.allocate_channel()
ssn.max_frame_size = self.max_frame_size
View
@@ -71,6 +71,9 @@ def write(self, bytes):
def closed(self):
self.trace(("raw", "frm"), "CLOSED")
+ def error(self, exc):
+ pass
+
def __proto_header(self):
if self.input.pending() >= PROTO_HDR_SIZE:
hdr = self.input.read(PROTO_HDR_SIZE)
View
2 send
@@ -82,7 +82,7 @@ try:
else:
content = " ".join(args[1:])
- lnk.send(settled=False, message=Message(content), txn=txn)
+ lnk.send(settled=False, message=Message(content, message_id=count), txn=txn)
count += 1
if opts.sleep:
time.sleep(opts.sleep)
View
14 util.py
@@ -17,7 +17,7 @@
# under the License.
#
-import os, mllib, traceback, time
+import os, sys, mllib, traceback, time
__SELF__ = object()
@@ -134,8 +134,10 @@ def readable(self, selector):
self.connection.closed()
self.tick(self.connection)
except:
- self.connection.trace("err", traceback.format_exc().strip())
- # XXX: need to signal connection so it can cleanup links
+ cls, exc, tb = sys.exc_info()
+ self.connection.trace("err", "".join(traceback.format_exception(cls, exc, tb)).strip())
+ self.connection.error(exc)
+ self.tick(self.connection)
selector.unregister(self)
self.socket.close()
self.socket = None
@@ -146,8 +148,10 @@ def writeable(self, selector):
bytes = self.connection.read(n)
return
except:
- self.connection.trace("err", traceback.format_exc().strip())
- # XXX: need to signal connection so it can cleanup links
+ cls, exc, tb = sys.exc_info()
+ self.connection.trace("err", "".join(traceback.format_exception(cls, exc, tb)).strip())
+ self.connection.error(exc)
+ self.tick(self.connection)
selector.unregister(self)
self.socket.close()
self.socket = None

0 comments on commit d1fb33d

Please sign in to comment.