Permalink
Browse files

updated to PR3 draft

  • Loading branch information...
1 parent 578794f commit a1e637f9400a097c5c1f17dfc894aebe1e23f060 Rafael H. Schloming committed May 5, 2010
Showing with 851 additions and 861 deletions.
  1. +8 −5 README
  2. +45 −30 broker
  3. +28 −17 client.py
  4. +7 −83 operations.py → compound.py
  5. +67 −45 connection.py
  6. +29 −65 framing.py
  7. +139 −188 link.py
  8. +36 −0 protocol.py
  9. +8 −6 recv
  10. +216 −188 session.py
  11. +54 −49 test.py
  12. +115 −185 transport.xml
  13. +99 −0 util.py
View
13 README
@@ -1,4 +1,4 @@
-This project contains a partial prototype of AMQP 1-0 PR2. This
+This project contains a partial prototype of AMQP 1-0 PR3 draft. This
currently includes relatively complete components for framing, codec,
connection, session, and link endpoints, as well as a very simple
broker, client library, and send/recv utilities.
@@ -23,8 +23,8 @@ Getting Started:
option which may be used to print a protocol trace:
./send -t raw queue-a trace raw bytes
- ./send -t ops queue-b trace operations before/after encode/decode
- ./send -t "raw ops err" queue-c trace everything
+ ./send -t frm queue-b trace operations before/after encode/decode
+ ./send -t "raw frm err" queue-c trace everything
Files:
@@ -38,6 +38,9 @@ Files:
codec.py -- An implementation of the AMQP type system.
+ compound.py -- Base class for compound types and utilities for
+ loading compound type definitions from the xml.
+
concurrency.py -- Concurrency utilities.
connection.py -- An implementation of an AMQP connection endpoint.
@@ -48,8 +51,8 @@ Files:
mllib/ -- An XML parsing library used to load type definitions.
- operations.py -- Classes representing all the operations and related
- types defined by the AMQP transport specification.
+ protocol.py -- Classes representing the types defined by the
+ protocol specification.
README -- This file.
View
75 broker
@@ -39,30 +39,36 @@ class Broker:
if connection.opening():
connection.open()
- for ssn in connection.sessions.values():
- if ssn.attaching():
- ssn.attach()
+ # XXX
+ for ssn in connection.incoming.values():
+ if ssn.beginning():
+ ssn.begin()
+ for ssn in connection.outgoing.values():
for link in ssn.links.values():
if link.opening():
- if link.local in self.queues:
- link.link()
- self.init[link.direction](link)
+ if self.init[link.direction](link):
+ link.local = link.remote
+ link.attach()
else:
link.local = None
- link.link()
- link.unlink()
- ssn.remove(link)
- continue
+ # XXX
+ link.modified = True
+ link.attach()
+ link.detach()
- self.process[link.direction](link)
+ # XXX
+ if link.local:
+ self.process[link.direction](link)
if link.closing():
- link.unlink()
+ link.detach()
+
+ if link.closed():
ssn.remove(link)
- if ssn.detaching():
- ssn.detach(True)
+ if ssn.ending():
+ ssn.end()
connection.remove(ssn)
if connection.closing():
@@ -71,42 +77,51 @@ class Broker:
connection.tick()
def init_sender(self, link):
- q = self.queues[link.source]
- cursor = q.cursor(DESTRUCTIVE)
- self.cursors[link] = cursor, {}
+ if link.remote.source in self.queues:
+ q = self.queues[link.remote.source]
+ cursor = q.cursor(DESTRUCTIVE)
+ self.cursors[link] = cursor, {}
+ return True
+ else:
+ return False
def init_receiver(self, link):
- link.flow(10)
+ if link.remote.target in self.queues:
+ link.flow(10)
+ return True
+ else:
+ return False
def process_sender(self, link):
cursor, unacked = self.cursors[link]
while link.capacity() > 0:
entry = cursor.get()
if entry is None:
- link.empty()
+ link.drained()
break
elif entry.item is None:
continue
else:
xfr = entry.item
- print "TRANSFER:", xfr
tag = link.send(fragments = xfr.fragments)
+ print "TRANSFER:", tag, xfr
unacked[tag] = entry
- while link.pending():
- t, d = link.get()
- e = unacked[t]
- xfr = e.item
- e.dequeue()
- print "DEQUEUED:", xfr
- del unacked[t]
- link.settle(t)
+
+ for t, r in link.get_remote():
+ if r.settled or r.state is not None:
+ e = unacked[t]
+ xfr = e.item
+ e.dequeue()
+ print "DEQUEUED:", t, r
+ del unacked[t]
+ link.settle(t, "DEQUEUED")
def process_receiver(self, link):
while link.pending():
xfr = link.get()
- self.queues[link.target].put(xfr)
+ self.queues[link.local.target].put(xfr)
print "ENQUEUED:", xfr
- link.ack(xfr)
+ link.settle(xfr.delivery_tag, "ENQUEUED")
if link.capacity() < 10: link.flow(10)
def bind(host, port):
View
45 client.py
@@ -27,7 +27,7 @@
from concurrency import synchronized, Condition, Waiter
from threading import RLock
from uuid import uuid4
-from operations import Fragment
+from protocol import Fragment, Linkage
class Timeout(Exception):
pass
@@ -62,7 +62,7 @@ def session(self, name = None):
name = str(uuid4())
ssn = Session(self, name)
self.add(ssn)
- ssn.attach()
+ ssn.begin()
return ssn
@synchronized
@@ -84,29 +84,29 @@ def wait(self, predicate, timeout=10):
def sender(self, target):
snd = Sender(self.connection, target)
self.add(snd)
- snd.link()
- self.wait(lambda: snd.opened())
- if snd.target is None:
+ snd.attach()
+ self.wait(lambda: snd.opened() or snd.closing())
+ if snd.remote is None:
snd.close()
raise LinkError("no such target: %s" % target)
return snd
@synchronized
- def receiver(self, source, limit=0):
+ def receiver(self, source, limit=0, drain=False):
rcv = Receiver(self.connection, source)
self.add(rcv)
- rcv.link()
+ rcv.attach()
if limit:
- rcv.flow(limit)
- self.wait(lambda: rcv.opened())
- if rcv.source is None:
+ rcv.flow(limit, drain=drain)
+ self.wait(lambda: rcv.opened() or rcv.closing())
+ if rcv.remote is None:
rcv.close()
raise LinkError("no such source: %s" % source)
return rcv
@synchronized
def close(self):
- self.detach(True)
+ self.end()
class Link:
@@ -119,13 +119,13 @@ def wait(self, predicate, timeout=10):
@synchronized
def close(self):
- self.unlink()
- self.wait(lambda: self.closed())
+ self.detach()
+ self.wait(self.closed)
class Sender(BaseSender, Link):
def __init__(self, connection, target):
- BaseSender.__init__(self, str(uuid4()), None, target)
+ BaseSender.__init__(self, str(uuid4()), Linkage(None, target))
Link.__init__(self, connection)
@synchronized
@@ -136,12 +136,23 @@ def send(self, **kwargs):
class Receiver(BaseReceiver, Link):
def __init__(self, connection, source):
- BaseReceiver.__init__(self, str(uuid4()), source, None)
+ BaseReceiver.__init__(self, str(uuid4()), Linkage(source, None))
Link.__init__(self, connection)
@synchronized
def pending(self, block=False, timeout=None):
if block:
- self.wait(lambda: self.capacity() == 0 or BaseReceiver.pending(self) > 0,
- timeout)
+ self.wait(self._pending_unblocked, timeout)
return BaseReceiver.pending(self)
+
+ def _pending_unblocked(self):
+ return self.capacity() == 0 or BaseReceiver.pending(self) > 0
+
+ @synchronized
+ def draining(self, block=False, timeout=None):
+ if block:
+ self.wait(self._draining_unblocked, timeout)
+ return BaseReceiver.draining(self)
+
+ def _draining_unblocked(self):
+ return BaseReceiver.draining(self)
View
90 operations.py → compound.py
@@ -18,8 +18,6 @@
#
import inspect
-
-from framing import ConnectionFrame, SessionFrame, SSN_FRAME
from util import load_xml, pythonize, decode_numeric_desc
class Field:
@@ -64,71 +62,15 @@ def _args(self):
def __repr__(self):
return "%s(%s)" % (self.__class__.__name__, ", ".join(self._args()))
-class Operation(Compound):
- FIELDS = [Field("channel", 0)]
-
- def init(self, frame):
- self.channel = frame.channel
- if frame.type == SSN_FRAME:
- if frame.flags & 0x4:
- self.sync = True
- self.executed = frame.executed
- self.acknowledged = frame.acknowledged
- self.command_id = frame.command_id
- self.capacity = frame.capacity
-
-class ConnectionOp(Operation):
-
- def frame(self, enc):
- return ConnectionFrame(0, self.channel, enc.encode(self))
-
-class SessionOp(Operation):
- FIELDS = [Field("acknowledged"),
- Field("executed"),
- Field("capacity"),
- Field("command_id"),
- Field("sync")]
-
- COMMAND = False
-
- def frame(self, enc):
- flags = 0
- # XXX: hardcoded flags
- if self.COMMAND:
- flags |= 0x1
- if self.executed is None:
- flags |= 0x2
- if self.sync:
- flags |= 0x4
- # XXX: default these all to zero until I build the machinery to
- # set/use them properly
- return SessionFrame(flags, self.channel,
- self.acknowledged or 0,
- self.executed or 0,
- self.capacity or 0,
- self.command_id or 0,
- enc.encode(self))
-
-class EmptyEncoder:
-
- def encode(self, obj):
- return ""
-
-EMPTY_ENC = EmptyEncoder()
-
-class Empty(SessionOp):
- NAME="empty"
- def frame(self, enc):
- return SessionOp.frame(self, EMPTY_ENC)
-
-class Command(SessionOp):
- COMMAND = True
-
-def load_compound(sections, *default_bases, **kwargs):
+def load_compound(types, *default_bases, **kwargs):
result = []
- for nd in sections.query["type", lambda n: n["@class"] == "compound"]:
+ for nd in types:
+ archetype = pythonize(nd["@provides"])
cls_name = pythonize(nd["@name"], camel=True)
- bases = kwargs.get(cls_name, default_bases)
+ if cls_name in kwargs:
+ bases = kwargs[cls_name]
+ else:
+ bases = kwargs.get(archetype, default_bases)
if inspect.isclass(bases):
bases = (bases,)
@@ -148,21 +90,3 @@ def load_compound(sections, *default_bases, **kwargs):
dict["FIELDS"] = fields
result.append(type(cls_name, bases, dict))
return result
-
-TRANSPORT = load_xml("transport.xml")
-def named(name):
- return lambda nd: nd["@name"] == name
-COMPOUND = \
- load_compound(TRANSPORT["amqp/section", named("controls")], Operation,
- Open=ConnectionOp,
- Attach=SessionOp,
- Detach=SessionOp,
- Close=ConnectionOp) + \
- load_compound(TRANSPORT["amqp/section", named("commands")], Command) + \
- load_compound(TRANSPORT["amqp/section", named("definitions")], Compound)
-
-__all__ = ["COMPOUND", "Empty"]
-
-for cls in COMPOUND:
- globals()[cls.__name__] = cls
- __all__.append(cls.__name__)
View
112 connection.py
@@ -18,15 +18,20 @@
#
import inspect, mllib, os, struct
-from operations import *
+from protocol import *
-from framing import CONN_FRAME, FrameDecoder, FrameEncoder
+from framing import AMQP_FRAME, Frame, FrameDecoder, FrameEncoder
from codec import TypeDecoder, TypeEncoder
from util import Buffer, parse
from uuid import uuid4
-PROTO_HDR_SIZE=8
+PROTO_HDR_FMT = "!4sBBBB"
+PROTO_HDR_SIZE = struct.calcsize(PROTO_HDR_FMT)
+assert PROTO_HDR_SIZE == 8
+
+class ConnectionError(Exception):
+ pass
class Connection:
@@ -35,7 +40,7 @@ def __init__(self, factory):
self.factory = factory
self.tracing = set(os.environ.get("AMQP_TRACE", "err").split())
self.input = Buffer()
- self.output = Buffer("AMQP\x00\x01\x00\x00")
+ self.output = Buffer(struct.pack(PROTO_HDR_FMT, "AMQP", 0, 1, 0, 0))
self.state = self.__proto_header
@@ -45,7 +50,7 @@ def __init__(self, factory):
self.type_decoder = TypeDecoder()
self.type_encoder = TypeEncoder()
- for cls in COMPOUND:
+ for cls in CLASSES:
self.type_encoder.deconstructors[cls] = lambda v: (v.DESCRIPTORS[0], v.deconstruct())
for d in cls.DESCRIPTORS:
self.type_decoder.constructors[d] = lambda d, v, c=cls: c(*v)
@@ -55,8 +60,10 @@ def __init__(self, factory):
self.close_rcvd = False
self.close_sent = False
- self.sessions = {}
- self.channels = {}
+ # incoming channel -> session
+ self.incoming = {}
+ # outgoing channel -> session
+ self.outgoing = {}
def trace(self, category, format, *args):
if category in self.tracing:
@@ -70,6 +77,13 @@ def trace(self, category, format, *args):
def opening(self):
return self.open_rcvd and not self.open_sent
+ def opened(self):
+ return self.open_rcvd and self.open_sent and \
+ not (self.close_rcvd or self.close_sent)
+
+ def closed(self):
+ self.close_rcvd and self.close_sent
+
def closing(self):
return self.close_rcvd and not self.close_sent
@@ -81,7 +95,7 @@ def write(self, bytes):
def __proto_header(self):
if self.input.pending() >= PROTO_HDR_SIZE:
hdr = self.input.read(PROTO_HDR_SIZE)
- magic, proto, major, minor, revision = struct.unpack("!4sBBBB", hdr)
+ magic, proto, major, minor, revision = struct.unpack(PROTO_HDR_FMT, hdr)
if (magic, proto, major, minor, revision) == ("AMQP", 0, 1, 0, 0):
return self.__framing
else:
@@ -93,16 +107,13 @@ def __framing(self):
self.process_frame(f)
def process_frame(self, f):
- if f.payload:
- op, _ = self.type_decoder.decode(f.payload)
- else:
- op = Empty()
- op.init(f)
- self.trace("ops", "RECV: %s", op)
- getattr(self, "do_%s" % op.NAME, self.unhandled)(op)
+ body, remainder = self.type_decoder.decode(f.payload)
+ assert remainder == ""
+ self.trace("frm", "RECV[%s]: %s", f.channel, body)
+ getattr(self, "do_%s" % body.NAME, self.unhandled)(f.channel, body)
- def unhandled(self, op):
- ssn = self.channels[op.channel]
+ def unhandled(self, channel, op):
+ ssn = self.incoming[channel]
ssn.write(op)
def read(self, n=None):
@@ -119,24 +130,23 @@ def pending(self):
return self.output.pending()
def tick(self):
- for ssn in self.sessions.values():
+ for ch, ssn in self.outgoing.items():
ssn.tick()
- for op in ssn.read():
- op.channel = ssn.channel
- self.write_op(op)
+ for body in ssn.read():
+ self.post_frame(ssn.channel, body)
- def write_op(self, op):
+ def post_frame(self, channel, body):
assert not self.close_sent
- self.trace("ops", "SENT: %s", op)
- f = op.frame(self.type_encoder)
+ self.trace("frm", "SENT[%s]: %s", channel, body)
+ f = Frame(AMQP_FRAME, channel, None, self.type_encoder.encode(body))
self.frame_encoder.write(f)
self.output.write(self.frame_encoder.read())
def open(self, *args, **kwargs):
- self.write_op(Open(*args, **kwargs))
+ self.post_frame(0, Open(*args, **kwargs))
self.open_sent = True
- def do_open(self, open):
+ def do_open(self, channel, open):
if self.open_rcvd:
self.close(ConnectionError(error_code=501, description="double open"))
else:
@@ -145,34 +155,46 @@ def do_open(self, open):
def close(self, *args, **kwargs):
# avoid stranding frames inside sessions
self.tick()
- self.write_op(Close(*args, **kwargs))
+ self.post_frame(0, Close(*args, **kwargs))
self.close_sent = True
- def do_close(self, close):
+ def do_close(self, channel, close):
if not self.close_rcvd:
self.close_rcvd = True
- def do_attach(self, attach):
- if self.sessions.has_key(attach.name):
- ssn = self.sessions[attach.name]
- else:
- ssn = self.factory(attach.name)
- self.add(ssn)
- self.channels[attach.channel] = ssn
- ssn.write(attach)
-
- def do_detach(self, detach):
- ssn = self.channels.pop(detach.channel)
- ssn.write(detach)
-
def add(self, ssn):
ssn.channel = self.allocate_channel()
- self.sessions[ssn.name] = ssn
+ self.outgoing[ssn.channel] = ssn
def allocate_channel(self):
- return max([-1] + [s.channel for s in self.sessions.values()]) + 1
+ return max([-1] + self.outgoing.keys()) + 1
def remove(self, ssn):
- # avoid stranding frames inside the session
+ # avoid stranding frames inside sessions
self.tick()
- del self.sessions[ssn.name]
+ if ssn.channel in self.outgoing and self.outgoing[ssn.channel] == ssn:
+ del self.outgoing[ssn.channel]
+ ssn.channel = None
+ else:
+ raise ConnectionError("no such session")
+
+ def do_begin(self, channel, begin):
+ if channel in self.incoming:
+ raise ConnectionError("double begin")
+
+ if begin.remote_channel in self.outgoing:
+ ssn = self.outgoing[begin.remote_channel]
+ else:
+ ssn = self.factory(begin.name)
+ ssn.remote_channel = channel
+ self.add(ssn)
+
+ self.incoming[channel] = ssn
+ ssn.write(begin)
+
+ def do_end(self, channel, end):
+ if channel not in self.incoming:
+ raise ConnectionError("double end")
+
+ ssn = self.incoming.pop(channel)
+ ssn.write(end)
View
94 framing.py
@@ -20,67 +20,39 @@
import struct
from util import Buffer, parse
-CONN_FRAME = 0
-SSN_FRAME = 1
-FRAME_HDR = 24
+AMQP_FRAME = 0
+FRAME_HDR_FMT = "!I2BH"
+FRAME_HDR_SIZE = struct.calcsize(FRAME_HDR_FMT)
+assert FRAME_HDR_SIZE == 8
class Frame:
- def __init__(self, type, flags, channel, payload):
+ def __init__(self, type, channel, extended, payload):
self.type = type
- self.flags = flags
self.channel = channel
+ self.extended = extended
self.payload = payload
-class SessionFrame(Frame):
-
- def __init__(self, flags, channel, acknowledged, executed, capacity,
- command_id, payload):
- Frame.__init__(self, SSN_FRAME, flags, channel, payload)
- self.acknowledged = acknowledged
- self.executed = executed
- self.capacity = capacity
- self.command_id = command_id
-
- def __repr__(self):
- args = (self.flags, self.channel, self.executed, self.capacity,
- self.command_id, self.payload)
- return "SessionFrame(%s)" % ", ".join(map(repr, args))
-
-class ConnectionFrame(Frame):
-
- def __init__(self, flags, channel, payload):
- Frame.__init__(self, CONN_FRAME, flags, channel, payload)
-
def __repr__(self):
- args = (self.flags, self.channel, self.payload)
- return "ConnectionFrame(%s)" % ", ".join(map(repr, args))
+ return "Frame(%s, %s, %r, %r)" % \
+ (self.type, self.channel, self.extended, self.payload)
class FrameEncoder:
def __init__(self):
self.buffer = Buffer()
- self.encoders = {
- SSN_FRAME: self.encode_ssn,
- CONN_FRAME: self.encode_conn
- }
def write(self, frame):
self.buffer.write(self.encode(frame))
def encode(self, frame):
- return self.encoders[frame.type](frame)
-
- def encode_ssn(self, frame):
- header = struct.pack("!I2BH4I", FRAME_HDR + len(frame.payload), frame.type,
- frame.flags, frame.channel, frame.acknowledged,
- frame.executed, frame.capacity, frame.command_id)
- return "%s%s" % (header, frame.payload)
-
- def encode_conn(self, frame):
- header = struct.pack("!I2BH16x", FRAME_HDR + len(frame.payload),
- frame.type, frame.flags, frame.channel)
- return "%s%s" % (header, frame.payload)
+ extended = frame.extended or ""
+ padd = len(extended) % 4
+ if padd: extended += "\x00"*(4-padd)
+ size = FRAME_HDR_SIZE + len(extended) + len(frame.payload)
+ doff = (FRAME_HDR_SIZE + len(extended))/4
+ header = struct.pack(FRAME_HDR_FMT, size, doff, frame.type, frame.channel)
+ return "%s%s%s" % (header, extended, frame.payload)
def read(self, n=None):
return self.buffer.read(n)
@@ -94,44 +66,36 @@ def __init__(self):
self.state = self.__frame_header
self.size = None
+ self.doff = None
self.type = None
- self.flags = None
self.channel = None
- self.control = None
-
- self.decoders = {
- SSN_FRAME: self.__decode_ssn,
- CONN_FRAME: self.__decode_conn
- }
+ self.extended = None
def write(self, bytes):
self.input.write(bytes)
self.state = parse(self.state)
def __frame_header(self):
- if self.input.pending() >= FRAME_HDR:
- st = self.input.read(FRAME_HDR)
- self.size, self.type, self.flags, self.channel, self.control = \
- struct.unpack("!I2BH16s", st)
+ if self.input.pending() >= FRAME_HDR_SIZE:
+ st = self.input.read(FRAME_HDR_SIZE)
+ self.size, self.doff, self.type, self.channel = \
+ struct.unpack(FRAME_HDR_FMT, st)
+ return self.__frame_extended
+
+ def __frame_extended(self):
+ size = self.doff*4 - FRAME_HDR_SIZE
+ if self.input.pending() >= size:
+ self.extended = self.input.read(size)
return self.__frame_body
def __frame_body(self):
- size = self.size - FRAME_HDR
+ size = self.size - self.doff*4
if self.input.pending() >= size:
payload = self.input.read(size)
- frame = self.decoders[self.type](payload)
+ frame = Frame(self.type, self.channel, self.extended, payload)
self.output.append(frame)
return self.__frame_header
- def __decode_ssn(self, payload):
- acknowledged, executed, capacity, command_id = \
- struct.unpack("!4I", self.control)
- return SessionFrame(self.flags, self.channel, acknowledged, executed,
- capacity, command_id, payload)
-
- def __decode_conn(self, payload):
- return ConnectionFrame(self.flags, self.channel, payload)
-
def read(self, n=None):
result = self.output[:n]
del self.output[:n]
View
327 link.py
@@ -17,98 +17,143 @@
# under the License.
#
-from operations import Drain, Flow, Link as LinkCmd, Unlink, Transfer, \
- Disposition
+from protocol import Attach, Detach, Transfer, Disposition, Flow, FlowState, \
+ Extent
from util import Constant
from uuid import uuid4
class LinkError(Exception):
pass
-PENDING = Constant("PENDING")
+class State:
-class Link(object):
+ def __init__(self, state=None, settled=False, modified=False):
+ self.state = state
+ self.settled = settled
+ self.modified = modified
- def __init__(self, name, source, target):
- self.name = name
- self.source = source
- self.target = target
+ def __hash__(self):
+ return hash(self.state) ^ hash(self.settled)
- self.session = None
- self.handle = None
+ def __eq__(self, o):
+ return self.state == o.state and self.settled == o.settled
- self.links_rcvd = 0
- self.links_sent = 0
- self.unlinks_rcvd = 0
- self.unlinks_sent = 0
+ def __repr__(self):
+ return "State(%s, %s, %s)" % (self.state, self.settled, self.modified)
- self.limit = 0
- self.count = 0
- self.link_tag = uuid4()
+ATTACHED = Constant("ATTACHED")
+DETACHED = Constant("DETACHED")
- # delivery-tag -> disposition
- self.unsettled = {}
+class Link(object):
- self.init()
+ def __init__(self, name, local, remote=None):
+ self.name = name
+ self.local = local
+ self.remote = remote
- def write_cmd(self, cmd, action=None):
- cmd.handle = self.handle
- self.session.write_cmd(cmd, action)
+ self.handle = None
- def done(self, cmd):
- self.session.done(cmd)
+ # local and remote state can be None, ATTACHED, DETACHED
+ self.local_state = None
+ self.remote_state = None
- def actioned(self, cmd):
- self.session.actioned(cmd)
+ # flow state
+ self.transfer_count = 0
+ self.transfer_limit = 0
+ self.attainable_limit = 0
+ self.drain = False
+ self.modified = False
- def write(self, cmd):
- return self.dispatch(cmd)
+ # used to provide default delivery-tag
+ self.delivery_count = 0
- def dispatch(self, cmd):
- return getattr(self, "do_%s" % cmd.NAME)(cmd)
+ # delivery-tag -> (local_state, remote_state)
+ self.unsettled = {}
+
+ self.init()
def opening(self):
- return self.links_sent < self.links_rcvd
+ return self.local_state is None and self.remote_state is ATTACHED
def closing(self):
- return self.unlinks_sent < self.unlinks_rcvd
+ return self.remote_state is DETACHED and self.local_state is ATTACHED
def opened(self):
- return self.links_rcvd == self.links_sent and self.links_sent > self.unlinks_sent
+ return self.local_state is ATTACHED and self.remote_state is ATTACHED
def closed(self):
- return self.unlinks_rcvd == self.unlinks_sent and self.unlinks_sent == self.links_sent
+ return self.local_state is DETACHED and self.remote_state is DETACHED
def capacity(self):
- return self.limit - self.count
-
- def link(self):
- self.write_cmd(LinkCmd(name = self.name,
- direction = self.direction,
- source = self.source,
- target = self.target))
- self.links_sent += 1
-
- def do_link(self, link_cmd):
- if self.links_rcvd > self.unlinks_rcvd:
- raise LinkError("double link")
- else:
- if self.direction == Sender.direction:
- self.target = link_cmd.target
- else:
- self.source = link_cmd.source
- self.links_rcvd += 1
-
- def unlink(self):
- self.write_cmd(Unlink())
- self.unlinks_sent += 1
- self.handle = None
+ return self.transfer_limit - self.transfer_count
+
+ def write(self, cmd):
+ self.dispatch(cmd)
- def do_unlink(self, unlink):
- if self.unlinks_rcvd > self.links_rcvd:
- raise LinkError("double unlink")
+ def dispatch(self, cmd):
+ return getattr(self, "do_%s" % cmd.NAME)(cmd)
+
+ def attach(self):
+ if self.local_state is ATTACHED:
+ raise LinkError("already attached")
+ self.local_state = ATTACHED
+ self.modified = True
+
+ def do_attach(self, attach):
+ self.remote_state = ATTACHED
+ self.remote = attach.local
+ self.do_flow(attach.flow_state)
+
+ # XXX: closing and errors
+ def detach(self):
+ if self.local_state is not ATTACHED:
+ raise LinkError("not attached")
+ self.local_state = DETACHED
+
+ def do_detach(self, detach):
+ if detach.closing:
+ # XXX: we should probably have a separate state for this
+ self.remote_state = None
else:
- self.unlinks_rcvd += 1
+ self.remote_state = DETACHED
+
+ def do_disposition(self, delivery_tag, state, settled):
+ if delivery_tag in self.unsettled:
+ local, remote = self.unsettled[delivery_tag]
+ remote.state = state
+ remote.settled = settled
+ remote.modified = True
+
+ def _query(self, index, settled=None, modified=None):
+ return [(delivery_tag, pair[index])
+ for delivery_tag, pair in self.unsettled.items()
+ if (settled is None or settled == pair[index].settled) and
+ (modified is None or modified == pair[index].modified)]
+
+ def get_local(self, settled=None, modified=None):
+ return self._query(0, settled, modified)
+
+ def get_remote(self, settled=None, modified=None):
+ return self._query(1, settled, modified)
+
+ def flow_state(self):
+ return FlowState(self.transfer_count, self.transfer_limit, self.attainable_limit, self.drain)
+
+ def disposition(self, delivery_tag, state=None, settled=False):
+ local, remote = self.unsettled[delivery_tag]
+ local.state = state
+ local.settled = settled
+ local.modified = True
+ # XXX
+ if local.settled and self.handle is None:
+ self.unsettled.pop(delivery_tag)
+ return local, remote
+
+ def settle(self, delivery_tag, state=None):
+ if state is None:
+ local, _ = self.unsettled[delivery_tag]
+ state = local.state
+ return self.disposition(delivery_tag, state, settled=True)
class Sender(Link):
@@ -117,103 +162,33 @@ class Sender(Link):
def init(self):
self.outgoing = []
- self.drains = []
-
- def get_local(self):
- return self.source
-
- def set_local(self, value):
- self.source = value
-
- local = property(fget=get_local, fset=set_local)
- def get_remote(self):
- return self.target
+ def do_flow(self, state):
+ self.transfer_limit = state.transfer_limit
+ self.drain = state.drain
- def set_remote(self, value):
- self.target = value
-
- remote = property(fget=get_remote, fset=set_remote)
-
- def do_flow(self, flow):
- if self.drains:
- self.drains[-1][-1] = flow.limit
- else:
- self.limit = flow.limit
- self.done(flow)
-
- def do_drain(self, drain):
- self.drains.append([drain, None])
- if self.limit == self.count:
- self.drain_done()
-
- def empty(self):
- if self.drains:
- self.limit = self.count
- self.drain_done()
-
- def drain_done(self):
- drain, limit = self.drains.pop()
- self.done(drain)
- if limit is not None:
- self.limit = limit
+ def drained(self, flow=True):
+ if self.drain:
+ self.transfer_count = self.transfer_limit
+ self.modified = True
def send(self, **kwargs):
- if self.count >= self.limit:
+ if self.transfer_count >= self.transfer_limit:
raise LinkError("would block")
+ self.transfer_count += 1
xfr = Transfer(**kwargs)
+ # XXX: should we do this in session?
+ xfr.flow_state = self.flow_state()
if xfr.delivery_tag is None:
- xfr.delivery_tag = "%s:%s" % (self.link_tag, self.count)
- self.count += 1
- self.write_cmd(xfr, self.transferred)
+ xfr.delivery_tag = self.delivery_count
+ if not xfr.more:
+ self.delivery_count += 1
self.outgoing.append(xfr)
- self.unsettled[xfr.delivery_tag] = PENDING
- if self.drains and self.limit == self.count:
- self.drain_done()
+ self.unsettled[xfr.delivery_tag] = (State(), State(xfr.state, xfr.settled))
+ if self.transfer_limit == self.transfer_count:
+ self.drained(flow=False)
return xfr.delivery_tag
- def transferred(self, xfr):
- if self.unsettled.get(xfr.delivery_tag) is PENDING:
- self.unsettled[xfr.delivery_tag] = None
- if xfr.delivery_tag not in self.unsettled:
- self.actioned(xfr)
-
- def do_disposition(self, disp):
- on = False
- for xfr in self.outgoing:
- if xfr.delivery_tag == disp.first:
- on = True
- if on:
- self.unsettled[xfr.delivery_tag] = disp.disposition
- if xfr.delivery_tag == disp.last:
- break
- self.done(disp)
-
- def pending(self):
- return len([d for d in self.unsettled.values() if d is not PENDING])
-
- def get(self):
- if self.outgoing:
- xfr = self.outgoing[0]
- d = self.unsettled[xfr.delivery_tag]
- return xfr.delivery_tag, d
- else:
- raise LinkError("empty")
-
- def settle(self, delivery_tag):
- if delivery_tag not in self.unsettled:
- return
- idx = 0
- while idx < len(self.outgoing):
- xfr = self.outgoing[idx]
- if xfr.delivery_tag == delivery_tag:
- del self.outgoing[idx]
- self.unsettled.pop(xfr.delivery_tag)
- self.actioned(xfr)
- break
- else:
- idx += 1
-
class Receiver(Link):
# XXX
@@ -222,37 +197,23 @@ class Receiver(Link):
def init(self):
self.incoming = []
- def get_local(self):
- return self.target
-
- def set_local(self, value):
- self.target = value
-
- local = property(fget=get_local, fset=set_local)
-
- def get_remote(self):
- return self.source
-
- def set_remote(self, value):
- self.source = value
-
- remote = property(fget=get_remote, fset=set_remote)
-
def do_transfer(self, xfr):
- self.count += 1
self.incoming.append(xfr)
- self.unsettled[xfr.delivery_tag] = PENDING
- return lambda id: self.unsettled.pop(xfr.delivery_tag)
+ self.unsettled[xfr.delivery_tag] = (State(), State(xfr.state, xfr.settled))
+ # XXX: should we do this from session?
+ self.do_flow(xfr.flow_state)
- def flow(self, n):
- self.limit += n
- self.write_cmd(Flow(limit=self.limit))
+ def do_flow(self, state):
+ self.transfer_count = state.transfer_count
+ self.attainable_limit = state.attainable_limit
- def drain(self):
- self.write_cmd(Drain(), self.drained)
+ def flow(self, n, drain=False):
+ self.transfer_limit += n
+ self.drain = drain
+ self.modified = True
- def drained(self, drain):
- self.limit = self.count
+ def drain(self):
+ self.flow(0, True)
def pending(self):
return len(self.incoming)
@@ -263,21 +224,11 @@ def get(self):
else:
raise LinkError("empty")
- def ack(self, xfr, **disposition):
- self.done(xfr)
- if disposition:
- self.unsettled[xfr.delivery_tag] = disposition
- self.write_cmd(Disposition(disposition=disposition,
- first=xfr.delivery_tag,
- last=xfr.delivery_tag))
- else:
- self.unsettled[xfr.delivery_tag] = None
-
DIRECTIONS = {
Sender.direction: Sender,
Receiver.direction: Receiver
}
-def link(link_cmd):
- cls = DIRECTIONS[1 - link_cmd.direction]
- return cls(link_cmd.name, link_cmd.source, link_cmd.target)
+def link(attach):
+ cls = DIRECTIONS[1 - attach.direction]
+ return cls(attach.name, attach.remote, attach.local)
View
36 protocol.py
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from compound import Compound, load_compound
+from util import load_xml
+
+class Body(Compound):
+ pass
+
+TRANSPORT = load_xml("transport.xml")
+def equals(attr, value):
+ return lambda nd: nd[attr] == value
+TYPES = TRANSPORT.query["amqp/section/type", equals("@class", "compound")]
+CLASSES = load_compound(TYPES, Compound, frame=Body)
+
+__all__ = ["CLASSES"]
+
+for cls in CLASSES:
+ globals()[cls.__name__] = cls
+ __all__.append(cls.__name__)
View
14 recv
@@ -30,7 +30,7 @@ parser.add_option("-p", "--port", type=int, default=5672,
parser.add_option("-b", "--block", action="store_true",
help="block until messages arrive")
parser.add_option("-c", "--count", type=int, default=1,
- help="number of messages to send (default %default)")
+ help="number of messages to receive (default %default)")
parser.add_option("-t", "--trace", default="err",
help="enable tracing for specified categories")
@@ -51,8 +51,7 @@ if opts.count:
credit = min(10, opts.count)
else:
credit = 10
-lnk = ssn.receiver(addr, limit=credit)
-if not opts.block: lnk.drain()
+lnk = ssn.receiver(addr, limit=credit, drain=not opts.block)
count = 0
try:
@@ -61,21 +60,24 @@ try:
count += 1
print "".join([f.payload for f in xfr.fragments])
sys.stdout.flush()
- lnk.ack(xfr)
+ lnk.disposition(xfr.delivery_tag, "ACCEPTED")
if lnk.capacity() == 0:
if opts.count == 0:
credit = 10
else:
credit = min(10, opts.count - (count + lnk.pending()))
if credit:
- lnk.flow(credit)
- if not opts.block: lnk.drain()
+ lnk.flow(credit, drain=not opts.block)
except KeyboardInterrupt:
pass
lnk.close()
ssn.close()
conn.close()
+# XXX
+for tag, st in lnk.get_remote(settled=True):
+ lnk.settle(tag)
+
if lnk.unsettled:
print lnk.unsettled
View
404 session.py
@@ -17,7 +17,10 @@
# under the License.
#
-from operations import Attach, Detach, Empty
+from link import ATTACHED, DETACHED, Sender, Receiver
+from protocol import Begin, Attach, Flow, Transfer, Disposition, Extent, \
+ Detach, End
+from util import RangeSet
class SessionError(Exception):
pass
@@ -27,225 +30,250 @@ class Session:
def __init__(self, name, factory):
self.name = name
self.factory = factory
+ self.remote_channel = None
self.channel = None
- self.opening = True
- self.timeout = 0
-
- self.attaches_sent = 0
- self.attaches_rcvd = 0
- self.detaches_sent = 0
- self.detaches_rcvd = 0
-
- # XXX: this makes for some confusing terminology
-
- # sender state (from spec)
- self.outgoing = []
- self.command_id = 1
- self.acknowledged = 0
- self.command_limit = None
- # track commands we've 'acknowledged' but can't report yet
- self.ack_deferred = set()
- # command-id -> action to take when command is executed
- self.on_exe = {}
-
- # receiver state (from spec)
- self.received = None
- self.executed = None
- self.capacity = 1024
- self.syncpoint = None
- self.syncedto = None
- # track commands we've executed but can't report yet
- self.exe_deferred = set()
- # command-id -> action to take when command is 'acknowledged'
- self.on_ack = {}
- self.ack_actioned = 0
-
- # link name -> link end
+
+ self.begin_sent = False
+ self.begin_rcvd = False
+ self.end_sent = False
+ self.end_rcvd = False
+
+ self.incoming = Incoming()
+ self.outgoing = Outgoing()
+ self.directions = {}
+ self.directions[Sender.direction] = self.outgoing
+ self.directions[Receiver.direction] = self.incoming
+
+ # link name -> link endpoint
self.links = {}
- # handle -> link end
+ # handle -> link endpoint
self.handles = {}
self.output = []
def write(self, op):
- if op.executed is None:
- self.command_limit = self.command_id + op.capacity
- else:
- self.command_limit = op.executed + op.capacity
- self.do_exe(op.executed)
- self.do_ack(op.acknowledged)
- if op.COMMAND:
- self.received = op.command_id
- result = self.dispatch(op)
- if op.COMMAND and result:
- self.on_ack[op.command_id] = result
- if op.sync:
- if op.COMMAND:
- self.syncpoint = op.command_id
- else:
- # XXX: is this supposed to be correlated?
- self.flush()
- self.tick()
-
- def flush(self):
- self.write_op(Empty())
-
- def do_sync(self):
- if self.syncpoint is None or self.syncpoint > self.executed:
- return
- if self.syncedto < self.syncpoint:
- self.flush()
- self.syncpoint = None
-
- def do_exe(self, executed):
- idx = self.acknowledged
- while idx < executed:
- cmd = self.outgoing[idx - self.acknowledged]
- assert cmd.command_id <= executed
- if cmd.command_id in self.on_exe:
- pre = self.acknowledged
- self.on_exe.pop(cmd.command_id)(cmd)
- post = self.acknowledged
- idx += max(1, post - pre)
- else:
- idx += 1
-
- def do_ack(self, acknowledged):
- while self.ack_actioned < acknowledged:
- next = self.ack_actioned + 1
- if next in self.on_ack:
- self.on_ack.pop(next)(next)
- self.ack_actioned = next
-
- def tick(self):
- pass
-
- def done(self, cmd):
- if cmd.command_id <= self.executed:
- return
-
- self.exe_deferred.add(cmd.command_id)
- while (self.executed + 1) in self.exe_deferred:
- self.executed += 1
- self.exe_deferred.discard(self.executed)
-
- def actioned(self, cmd):
- if cmd.command_id <= self.acknowledged:
- return
-
- self.ack_deferred.add(cmd.command_id)
- self.on_exe.pop(cmd.command_id, None)
- while (self.acknowledged + 1) in self.ack_deferred:
- self.acknowledged += 1
- self.outgoing.pop(0)
- self.ack_deferred.discard(self.acknowledged)
+ self.dispatch(op)
# XXX: dup of read in framing
def read(self, n=None):
- self.do_sync()
result = self.output[:n]
del self.output[:n]
return result
def dispatch(self, op):
- return getattr(self, "do_%s" % op.NAME, self.unhandled)(op)
-
- def unhandled(self, cmd):
- link = self.handles[cmd.handle]
- return link.write(cmd)
-
- def write_op(self, op):
- if not isinstance(op, Attach):
- assert self.attaches_sent > self.detaches_sent
- op.acknowledged = self.acknowledged
- op.executed = self.executed
- op.capacity = self.capacity
- op.command_id = self.command_id
- self.output.append(op)
- self.syncedto = self.executed
+ getattr(self, "do_%s" % op.NAME)(op)
- def write_cmd(self, cmd, action=None):
- if action:
- cmd.sync = True
- self.outgoing.append(cmd)
- self.write_op(cmd)
- self.on_exe[cmd.command_id] = action or self.actioned
- self.command_id += 1
+ def post_frame(self, op):
+ assert self.begin_sent and not self.end_sent
+ self.output.append(op)
- def attaching(self):
- return self.attaches_rcvd > self.attaches_sent
+ def beginning(self):
+ return self.begin_rcvd and not self.begin_sent
- def detaching(self):
- return self.detaches_rcvd > self.detaches_sent
+ def ending(self):
+ return self.end_rcvd and not self.end_sent
- def attach(self):
- if self.attaches_sent > self.detaches_sent:
- raise SessionError("double attach")
- self.write_op(Attach(name = self.name,
- opening = self.opening,
- timeout = self.timeout,
- received = self.received,
- acknowledged = self.acknowledged,
- executed = self.executed,
- capacity = self.capacity,
- command_id = self.command_id))
- self.attaches_sent += 1
-
- def do_attach(self, att):
- if self.attaches_rcvd > self.detaches_rcvd:
- raise SessionError("double attach")
- else:
- self.attaches_rcvd += 1
+ def begin(self):
+ if self.begin_sent:
+ raise SessionError("already begun")
+ self.begin_sent = True
+ self.post_frame(Begin(remote_channel = self.remote_channel,
+ name = self.name))
- if att.opening:
- self.executed = att.command_id - 1
+ def do_begin(self, begin):
+ self.begin_rcvd = True
- def detach(self, closing, exception=None):
- if self.detaches_sent > self.attaches_sent:
- raise SessionError("double detach")
- # process any outstanding work before detaching
+ def end(self, error=None):
+ if self.end_sent:
+ raise SessionError("already ended")
+ # process any outstanding work before ending
self.tick()
- self.write_op(Detach(name = self.name,
- closing = closing,
- acknowledged = self.acknowledged,
- executed = self.executed,
- exception = exception))
- self.detaches_sent += 1
-
- def do_detach(self, det):
- if self.detaches_rcvd > self.attaches_rcvd:
- raise SessionError("double detach")
- else:
- self.detaches_rcvd += 1
-
- def do_empty(self, op):
- pass
-
- def do_link(self, link_cmd):
- if self.links.has_key(link_cmd.name):
- link = self.links[link_cmd.name]
- else:
- link = self.factory(link_cmd)
- self.add(link)
+ self.post_frame(End())
+ self.end_sent = True
- self.handles[link_cmd.handle] = link
- self.done(link_cmd)
- link.write(link_cmd)
-
- def do_unlink(self, unlink):
- link = self.handles.pop(unlink.handle)
- self.done(unlink)
- link.write(unlink)
+ def do_end(self, det):
+ self.end_rcvd = True
def add(self, link):
link.session = self
- link.handle = self.allocate_handle()
self.links[link.name] = link
def allocate_handle(self):
return max([-1] + [l.handle for l in self.links.values()]) + 1
def remove(self, link):
+ # process any outstanding work before removing
+ self.tick()
link.session = None
link.handle = None
del self.links[link.name]
+
+ def do_attach(self, attach):
+ if attach.handle in self.handles:
+ raise SessionError("double attach")
+
+ if self.links.has_key(attach.name):
+ link = self.links[attach.name]
+ else:
+ link = self.factory(attach)
+ self.add(link)
+
+ self.handles[attach.handle] = link
+ link.write(attach)
+
+ def do_flow(self, flow):
+ link = self.handles[flow.handle]
+ link.do_flow(flow.flow_state)
+
+ def do_transfer(self, xfr):
+ link = self.handles[xfr.handle]
+ self.incoming.append(link, xfr)
+ link.write(xfr)
+
+ def do_disposition(self, disp):
+ if disp.extents:
+ # XXX
+ direction = self.directions[1 - disp.direction]
+ for e in disp.extents:
+ start = max(direction.unsettled_lwm, e.first)
+ for id in range(start, e.last+1):
+ delivery = direction.get_delivery(id)
+ if delivery:
+ link, tag = delivery
+ link.do_disposition(tag, e.state, e.settled)
+
+ def do_detach(self, detach):
+ if detach.handle not in self.handles:
+ raise SessionError("double detach")
+
+ link = self.handles.pop(detach.handle)
+ link.write(detach)
+
+ def tick(self):
+ for link in self.links.values():
+ if (link.modified or link.local_state is ATTACHED) and link.handle is None:
+ link.handle = self.allocate_handle()
+ self.post_frame(Attach(name = link.name,
+ handle = link.handle,
+ flow_state = link.flow_state(),
+ direction = link.direction,
+ local = link.local,
+ remote = link.remote))
+ link.modified = False
+
+ if link.handle is not None:
+ self.process_link(link)
+
+ if link.local_state is DETACHED:
+ self.post_frame(Detach(handle=link.handle))
+ link.handle = None
+
+ def process_link(self, l):
+ # XXX
+ if l.direction == Sender.direction:
+ while l.outgoing:
+ xfr = l.outgoing.pop(0)
+ xfr.handle = l.handle
+ self.outgoing.append(l, xfr)
+ self.post_frame(xfr)
+
+ if l.modified:
+ self.post_frame(Flow(handle=l.handle, flow_state=l.flow_state()))
+ l.modified = False
+
+ states = {}
+
+ for dtag, local in l.get_local(modified=True):
+ direction = self.directions[l.direction]
+ ids = direction.transfers[(l, dtag)]
+ if local in states:
+ ranges = states[local]
+ else:
+ ranges = RangeSet()
+ states[local] = ranges
+ for r in ids:
+ ranges.add_range(r)
+
+ if local.settled:
+ direction.settle(l, dtag)
+ l.unsettled.pop(dtag)
+ local.modified = False
+
+ for local, ranges in states.items():
+ extents = []
+ for r in ranges:
+ ext = Extent(r.lower, r.upper, settled=local.settled, state=local.state)
+ extents.append(ext)
+ self.post_frame(Disposition(direction=l.direction, extents=extents))
+
+class DeliveryMap:
+
+ def __init__(self):
+ # (link, delivery_tag) -> ranges
+ self.transfers = {}
+ # transfer_id -> (link, delivery_tag)
+ self.deliveries = []
+ # lowest unsettled transfer_id
+ self.unsettled_lwm = None
+ # highest unsettled transfer_id
+ self.unsettled_hwm = None
+ self.capacity = None
+ self.init()
+
+ def append(self, link, transfer):
+ delivery = (link, transfer.delivery_tag)
+ self.mark(transfer)
+ id = transfer.transfer_id
+ if delivery in self.transfers:
+ ranges = self.transfers[delivery]
+ else:
+ ranges = RangeSet()
+ self.transfers[delivery] = ranges
+ ranges.add(id)
+ self.deliveries.append(delivery)
+
+ def settle(self, link, delivery_tag):
+ delivery = (link, delivery_tag)
+ ranges = self.transfers.pop(delivery)
+ for r in ranges:
+ for id in r:
+ idx = id - self.unsettled_lwm
+ assert self.deliveries[idx] == delivery
+ self.deliveries[idx] = None
+
+ while self.deliveries:
+ if self.deliveries[0] is None:
+ self.deliveries.pop(0)
+ self.unsettled_lwm += 1
+ else:
+ break
+
+ def get_delivery(self, transfer_id):
+ return self.deliveries[transfer_id - self.unsettled_lwm]
+
+ def __repr__(self):
+ return "%s(%r, %r, %s, %s)" % (self.__class__, self.transfers, self.deliveries,
+ self.unsettled_lwm, self.unsettled_hwm)
+
+class Incoming(DeliveryMap):
+
+ def init(self):
+ pass
+
+ def mark(self, transfer):
+ if self.unsettled_lwm is None:
+ self.unsettled_lwm = transfer.transfer_id
+ else:
+ assert transfer.transfer_id == self.unsettled_hwm + 1
+ self.unsettled_hwm = transfer.transfer_id
+
+class Outgoing(DeliveryMap):
+
+ def init(self):
+ self.unsettled_lwm = 1
+ self.unsettled_hwm = 0
+
+ def mark(self, transfer):
+ assert transfer.transfer_id is None
+ self.unsettled_hwm += 1
+ transfer.transfer_id = self.unsettled_hwm
View
103 test.py
@@ -57,12 +57,12 @@ def __repr__(self):
print
-from framing import FrameDecoder, FrameEncoder, ConnectionFrame, SessionFrame
+from framing import Frame, FrameDecoder, FrameEncoder
frenc = FrameEncoder()
-frenc.write(ConnectionFrame(0, 0, "frame1 body"))
-frenc.write(SessionFrame(0, 0, 0, 0, 0, 0, "frame2 body"))
-frenc.write(ConnectionFrame(0, 0, "frame3 body"))
+frenc.write(Frame(0, 1, None, "frame1 body"))
+frenc.write(Frame(0, 2, "extended header", "frame2 body"))
+frenc.write(Frame(1, 0, "e", "frame3 body"))
bytes = frenc.read()
print "Encoded Frames:", repr(bytes)
@@ -80,10 +80,14 @@ def connection():
from connection import Connection
from session import Session
from link import Sender, Receiver, link
- from operations import Fragment
+ from protocol import Fragment, Linkage
a = Connection(lambda n: Session(n, link))
b = Connection(lambda n: Session(n, link))
+ a.id = "A"
+ a.tracing = set(["ops", "err"])
+ b.id = "B"
+ b.tracing = set(["ops", "err"])
def pump():
while a.pending() or b.pending():
@@ -97,20 +101,18 @@ def pump():
a.open(hostname="asdf")
b.open()
- s.attach()
- s2.attach()
- s.detach(False)
- s.attach()
- l = Sender("qwer", source="S", target="T")
+ s.begin()
+ s2.begin()
+ l = Sender("qwer", local=Linkage("S", "T"))
s.add(l)
- l.link()
+ l.attach()
pump()
- bssn = b.sessions["test-ssn"]
- bssn.attach()
+ bssn = [x for x in b.incoming.values() if x.name == "test-ssn"][0]
+ bssn.begin()
bl = bssn.links["qwer"]
- bl.link()
+ bl.attach()
bl.flow(10)
pump()
@@ -123,30 +125,30 @@ def pump():
ln = bssn.links["qwer"]
x = ln.get()
print "INCOMING XFR:", x
- ln.ack(x)
+ ln.disposition(x.delivery_tag, "ACCEPTED")
xfr = ln.get()
print "INCOMING XFR:", xfr
- ln.ack(xfr, key="value")
+ ln.disposition(xfr.delivery_tag, "ASDF")
print "--"
pump()
print "--"
- print "DISPOSITION", l.get()
+ print "DISPOSITION", l.get_remote(modified=True)
l.settle(tag)
- l.unlink()
- bl.unlink()
+ l.detach()
+ bl.detach()
pump()
- s.detach(True)
+ s.end(True)
pump()
- bssn.detach(True)
- s2.detach(True)
+ bssn.end(True)
+ s2.end(True)
a.close()
b.close()
@@ -158,46 +160,49 @@ def session():
from connection import Connection
from session import Session
from link import link, Sender, Receiver
+ from protocol import Fragment, Linkage
- a = Connection(None)
+ a = Connection(lambda n: Session(n, link))
a.tracing = set(["ops", "err"])
a.id = "A"
- b = Connection(None)
+ b = Connection(lambda n: Session(n, link))
b.tracing = set(["err"])
b.id = "B"
ssn = Session("test", link)
a.add(ssn)
- nss = Session("test", link)
- b.add(nss)
+ ssn.begin()
+# nss = Session("test", link)
+# b.add(nss)
def pump():
a.tick()
b.tick()
b.write(a.read())
a.write(b.read())
- ssn.attach()
- nss.attach()
+ pump()
+
+ nss = [s for s in b.incoming.values() if s.name == "test"][0]
+ nss.begin()
snd = Sender("L", "S", "T")
ssn.add(snd)
rcv = Receiver("L", "S", "T")
nss.add(rcv)
- snd.link()
- rcv.link()
+ snd.attach()
+ rcv.attach()
rcv.flow(10)
pump()
- from operations import Fragment
snd.send(fragments=Fragment(True, True, 0, 0, "m1"))
snd.send(fragments=Fragment(True, True, 0, 0, "m2"))
dt3 = snd.send(fragments=Fragment(True, True, 0, 0, "m3"))
pump()
- print rcv.pending()
+ print "PENDING", rcv.pending()
pump()
@@ -211,50 +216,50 @@ def pump():
xfrs.append(x)
print "XFR", x
- rcv.ack(xfrs[-1])
+ rcv.disposition(xfrs[-1].delivery_tag, "ACCEPTED")
pump()
snd.send(fragments=Fragment(True, True, 0, 0, "m5"))
pump()
- print nss.exe_deferred
-
- print nss.executed
- rcv.ack(xfrs[0])
- print nss.executed
- print nss.syncpoint
+ rcv.disposition(xfrs[0].delivery_tag, "ACCEPTED")
print "----------"
pump()
print "----------"
- print nss.executed
+ print "ssn.outgoing:", ssn.outgoing
+ print "snd.unsettled:", snd.unsettled
for xfr in xfrs[1:-1]:
- rcv.ack(xfr)
- print nss.executed, nss.exe_deferred
- print rcv.pending()
- rcv.ack(rcv.get())
+ rcv.disposition(xfr.delivery_tag, "ACCEPTED")
+ print "rcv.unsettled", rcv.unsettled
+ print "rcv.pending()", rcv.pending()
+ rcv.disposition(rcv.get().delivery_tag)
pump()
print "----------"
- print ssn.acknowledged, ssn.ack_deferred
+ print "ssn.outgoing:", ssn.outgoing
+ print "snd.unsettled:", snd.unsettled
+ print "settling:", dt3
snd.settle(dt3)
- print ssn.acknowledged, ssn.ack_deferred
+ print "ssn.outgoing:", ssn.outgoing
+ print "snd.unsettled:", snd.unsettled
for dt in list(snd.unsettled):
snd.settle(dt)
- snd.unlink()
- rcv.unlink()
+ snd.detach()
+ rcv.detach()
pump()
- print ssn.acknowledged, ssn.ack_deferred, ssn.on_exe
+ print "ssn.outgoing:", ssn.outgoing
+ print "snd.unsettled:", snd.unsettled
print "=========="
session()
View
300 transport.xml
@@ -34,8 +34,8 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-->
<amqp name="transport" xmlns="http://www.amqp.org/schema/amqp.xsd">
- <section name="controls">
- <type name="open" class="compound">
+ <section name="frame-bodies">
+ <type name="open" class="compound" source="list" provides="frame">
<descriptor name="amqp:open:list" code="0x00000001:0x00000201"/>
<field name="options" type="options"/>
<field name="container-id" type="string" required="true"/>
@@ -49,223 +49,113 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
<field name="offered-capabilities" type="symbol" multiple="true"/>
<field name="desired-capabilities" type="symbol" multiple="true"/>
</type>
- <type name="attach" class="compound">
- <descriptor name="amqp:attach:list" code="0x00000001:0x00000202"/>
+ <type name="begin" class="compound" source="list" provides="frame">
+ <descriptor name="amqp:begin:list" code="0x00000001:0x00000202"/>
<field name="options" type="options"/>
- <field name="name" type="session-name" required="true"/>
- <field name="opening" type="boolean"/>
- <field name="received" type="sequence-no"/>
- <field name="timeout" type="uint"/>
- <field name="txn-mode" type="txn-level"/>
- <field name="txn-support" type="txn-level"/>
- </type>
- <type name="detach" class="compound">
- <descriptor name="amqp:detach:list" code="0x00000001:0x00000203"/>
- <field name="options" type="options"/>
- <field name="name" type="session-name" required="true"/>
- <field name="closing" type="boolean"/>
- <field name="exception" type="session-error"/>
- </type>
- <type name="close" class="compound">
- <descriptor name="amqp:close:list" code="0x00000001:0x00000204"/>
- <field name="options" type="options"/>
- <field name="exception" type="connection-error"/>
- </type>
- </section>
- <section name="commands">
- <type name="noop" class="compound">
- <descriptor name="amqp:noop:list" code="0x00000001:0x00000301"/>
- <field name="options" type="options"/>
- </type>
- <type name="enlist" class="compound">
- <descriptor name="amqp:enlist:list" code="0x00000001:0x00000302"/>
- <exception name="illegal-state" error-code="illegal-state"/>
- <exception name="already-known" error-code="not-allowed"/>
- <exception name="join-and-resume" error-code="not-allowed"/>
- <exception name="xa-rbrollback" error-code="xa-rbrollback"/>
- <exception name="xa-rbtimeout" error-code="xa-rbtimeout"/>
- <field name="options" type="options"/>
- <field name="xid" type="xid" required="true">
- <exception name="unknown-xid" error-code="not-allowed"/>
- </field>
- <field name="join" type="boolean">
- <exception name="unsupported" error-code="not-implemented"/>
- </field>
- <field name="resume" type="boolean"/>
- </type>
- <type name="txn" class="compound">
- <descriptor name="amqp:txn:list" code="0x00000001:0x00000303"/>
- <exception name="illegal-state" error-code="illegal-state"/>
- <exception name="suspend-and-fail" error-code="not-allowed"/>
- <exception name="xa-rbrollback" error-code="xa-rbrollback"/>
- <exception name="xa-rbtimeout" error-code="xa-rbtimeout"/>
- <field name="options" type="options"/>
- <field name="fail" type="boolean"/>
- <field name="suspend" type="boolean"/>
+ <field name="remote-channel" type="ushort"/>
+ <field name="name" type="string"/>
</type>
- <type name="link" class="compound">
- <descriptor name="amqp:link:list" code="0x00000001:0x00000304"/>
+ <type name="attach" class="compound" source="list" provides="frame">
+ <descriptor name="amqp:attach:list" code="0x00000001:0x00000304"/>
<field name="options" type="options"/>
<field name="name" type="string" required="true"/>
- <field name="scope" type="scope"/>
<field name="handle" type="handle" required="true">
- <exception name="duplicate-link" error-code="invalid-argument"/>
- <exception name="duplicate-handle" error-code="invalid-argument"/>
+ <error name="handle-busy" type="amqp-error" value="invalid-field"/>
</field>
+ <field name="flow-state" type="flow-state" required="true"/>
<field name="direction" type="direction"/>
- <field name="source" type="map"/>
- <field name="target" type="map"/>
+ <field name="local" type="linkage"/>
+ <field name="remote" type="linkage"/>
+ <field name="durable" type="boolean"/>
+ <field name="opening" type="boolean"/>
+ <field name="timeout" type="uint"/>
+ <field name="unsettled" type="map"/>
<field name="transfer-unit" type="ulong"/>
- <field name="resume-tag" type="delivery-tag"/>
- </type>
- <type name="relink" class="compound">
- <descriptor name="amqp:relink:list" code="0x00000001:0x00000305"/>
- <field name="options" type="options"/>
- <field name="handle" type="handle"/>
- <field name="source" type="map"/>
- <field name="target" type="map"/>
+ <field name="error-mode" type="error-mode"/>
</type>
- <type name="unlink" class="compound">
- <descriptor name="amqp:unlink:list" code="0x00000001:0x00000306"/>
- <field name="options" type="options"/>
- <field name="handle" type="handle"/>
- <field name="exception" type="link-error"/>
- </type>
- <type name="flow" class="compound">
+ <type name="flow" class="compound" source="list" provides="frame">
<descriptor name="amqp:flow:list" code="0x00000001:0x00000307"/>
<field name="options" type="options"/>
<field name="handle" type="handle" required="true">
- <exception name="nonexistent-link" error-code="invalid-argument"/>
- <exception name="invalid-directionality" error-code="invalid-argument"/>
+ <error name="unattached-handle" type="amqp-error" value="invalid-field"/>
</field>
- <field name="limit" type="sequence-no"/>
+ <field name="flow-state" type="flow-state" required="true"/>
+ <field name="echo" type="boolean"/>
</type>
- <type name="drain" class="compound">
- <descriptor name="amqp:drain:list" code="0x00000001:0x00000308"/>
- <field name="options" type="options"/>
- <field name="handle" type="handle" required="true">
- <exception name="nonexistent-link" error-code="invalid-argument"/>
- <exception name="invalid-directionality" error-code="invalid-argument"/>
- </field>
- </type>
- <type name="transfer" class="compound">
+ <type name="transfer" class="compound" source="list" provides="frame">
<descriptor name="amqp:transfer:list" code="0x00000001:0x00000309"/>
<field name="options" type="options"/>
<field name="handle" type="handle" required="true">
- <exception name="nonexistent-link" error-code="not-found"/>
+ <error name="unattached-handle" type="amqp-error" value="invalid-field"/>
</field>
+ <field name="flow-state" type="flow-state" required="true"/>
<field name="delivery-tag" type="delivery-tag" required="true"/>
+ <field name="transfer-id" type="transfer-number" required="true"/>
+ <field name="settled" type="boolean"/>
+ <field name="state" type="*"/>
+ <field name="resume" type="boolean"/>
<field name="more" type="boolean"/>
<field name="aborted" type="boolean"/>
+ <field name="batchable" type="boolean"/>
<field name="fragments" type="fragment" multiple="true"/>
</type>
- <type name="bar" class="compound">
- <descriptor name="amqp:bar:list" code="0x00000001:0x00009901"/>
+ <type name="disposition" class="compound" source="list" provides="frame">
+ <descriptor name="amqp:disposition:list" code="0x00000001:0x00009902"/>
<field name="options" type="options"/>
- <field name="handle" type="handle" required="true">
- <exception name="nonexistent-link" error-code="not-found"/>
- </field>
- <field name="barrier" type="delivery-tag"/>
+ <field name="direction" type="direction" required="true"/>
+ <field name="settled" type="transfer-number" required="true"/>
+ <field name="unsettled-limit" type="transfer-number" required="true"/>
+ <field name="batchable" type="boolean"/>
+ <field name="extents" type="extent" multiple="true"/>
+ </type>
+ <type name="detach" class="compound" source="list" provides="frame">
+ <descriptor name="amqp:detach:list" code="0x00000001:0x00000306"/>
+ <field name="options" type="options"/>
+ <field name="handle" type="handle" required="true"/>
+ <field name="closing" type="boolean"/>
+ <field name="error" type="error"/>
</type>
- <type name="disposition" class="compound">
- <descriptor name="amqp:disposition:list" code="0x00000001:0x00009902"/>
+ <type name="end" class="compound" source="list" provides="frame">
+ <descriptor name="amqp:end:list" code="0x00000001:0x00000203"/>
<field name="options" type="options"/>
- <field name="handle" type="handle" required="true">
- <exception name="nonexistent-link" error-code="not-found"/>
- </field>
- <field name="disposition" type="map" required="true"/>
- <field name="first" type="delivery-tag" required="true"/>
- <field name="last" type="delivery-tag"/>
+ <field name="error" type="error"/>
+ </type>
+ <type name="close" class="compound" source="list" provides="frame">
+ <descriptor name="amqp:close:list" code="0x00000001:0x00000204"/>
+ <field name="options" type="options