Skip to content

Commit

Permalink
updates to test proposed transfer changes
Browse files Browse the repository at this point in the history
  • Loading branch information
rhs committed Jun 8, 2011
1 parent c802dcc commit fa1d44d
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 131 deletions.
5 changes: 2 additions & 3 deletions broker
Original file line number Diff line number Diff line change
Expand Up @@ -248,9 +248,8 @@ class Broker:
link.drained()
break
else:
# XXX: we should pass the format through
link.send(delivery_tag = tag, message_format = 0,
fragments = xfr.fragments)
link.send(delivery_tag = tag, message_format = xfr.message_format,
payload = xfr.payload)

for t, _, r in link.get_remote(modified=True):
if r.settled or r.state is not None:
Expand Down
2 changes: 1 addition & 1 deletion client.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ def send(self, message=None, delivery_tag=None, **kwargs):
self.wait(self.capacity)
if message:
kwargs["message_format"] = 0
kwargs["fragments"] = encode(message)
kwargs["payload"] = encode(message)
return self.proto.send(delivery_tag=delivery_tag, **kwargs)

class Receiver(Link):
Expand Down
10 changes: 5 additions & 5 deletions codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,17 +262,17 @@ def __init__(self, encodings=ENCODINGS):
for enc in encodings:
self.encodings[enc.code] = (enc, getattr(self, "dec_%s" % enc.name))
self.constructors = {
UNDESCRIBED: lambda d, v: v
UNDESCRIBED: lambda t, v, d: v
}

def construct(self, descriptor, value):
def construct(self, type, value, descriptor):
constructor = self.constructors.get(descriptor, Value)
return constructor(descriptor, value)
return constructor(type, value, descriptor)

def decode(self, bytes):
descriptor, (encoding, decoder), bytes = self.decode_type(bytes)
value, bytes = decoder(bytes)
return self.construct(descriptor, value), bytes
return self.construct(encoding.type, value, descriptor), bytes

def decode_type(self, bytes):
code, bytes = self.unpack("!B", bytes)
Expand Down Expand Up @@ -411,7 +411,7 @@ def dec_array(self, format, bytes, constructor=identity):
values = []
while count > 0:
element, bytes = decoder(bytes)
values.append(self.construct(descriptor, element))
values.append(self.construct(encoding.type, element, descriptor))
count -= 1

return Array(encoding.type, values), bytes
Expand Down
6 changes: 3 additions & 3 deletions composite.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ def __init__(self, name, key, type, source, descriptor, mandatory, multiple,
self.default = default

def __repr__(self):
return "Field(%r, %r, %r, %r, %r, %r, %r)" % \
(self.name, self.type, self.source, self.mandatory, self.multiple,
self.category, self.default)
return "Field(%r, %r, %r, %r, %r, %r, %r, %r, %r)" % \
(self.name, self.key, self.type, self.source, self.descriptor,
self.mandatory, self.multiple, self.category, self.default)

OPENS = "[("
CLOSES = ")]"
Expand Down
6 changes: 4 additions & 2 deletions dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,15 @@ def __framing(self):

def process_frame(self, f):
body, remainder = self.type_decoder.decode(f.payload)
assert remainder == ""
body.payload = remainder
self.trace("frm", "RECV[%s]: %s", f.channel, body.format(self.multiline))
return getattr(self, "do_%s" % body.NAME, self.unhandled)(f.channel, body)

def post_frame(self, channel, body):
self.trace("frm", "SENT[%s]: %s", channel, body.format(self.multiline))
f = Frame(self.frame_type, channel, None, self.type_encoder.encode(body))
encoded = self.type_encoder.encode(body)
if body.payload: encoded += body.payload
f = Frame(self.frame_type, channel, None, encoded)
self.output.write(encode(f))

def read(self, n=None):
Expand Down
54 changes: 17 additions & 37 deletions link.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
# under the License.
#

from protocol import Attach, Flow, Transfer, Fragment, Disposition, Detach
from protocol import Attach, Flow, Transfer, Disposition, Detach
from util import Constant, RangeSet
from uuid import uuid4

Expand Down Expand Up @@ -281,39 +281,18 @@ def send(self, **kwargs):
def xfr_overhead(self, xfr):
return 192 + len(xfr.delivery_tag or "")

def frag_overhead(self, frag):
return 48

def fragment(self, **kwargs):
fragments = list(kwargs.pop("fragments", []))
payload = kwargs.pop("payload", "")
result = []
xfr = Transfer(**kwargs)
xfr.fragments = []
result.append(xfr)
remaining = self.session.max_frame_size - self.xfr_overhead(xfr)
while fragments:
f = fragments.pop(0)
remaining -= self.frag_overhead(f)
if len(f.payload) > remaining:
if remaining > 0:
p1 = f.payload[:remaining]
p2 = f.payload[remaining:]
f1 = Fragment(f.first, False, f.section_code, f.section_number,
f.section_offset, p1)
f2 = Fragment(False, f.last, f.section_code, f.section_number,
f.section_offset + remaining, p2)
fragments.insert(0, f2)
xfr.fragments.append(f1)
else:
fragments.insert(0, f)
xfr.more = True
xfr = Transfer(**kwargs)
result.append(xfr)
xfr.fragments = []
remaining = self.session.max_frame_size - self.xfr_overhead(xfr)
else:
xfr.fragments.append(f)
remaining -= len(f.payload)
while True:
xfr = Transfer(**kwargs)
max_size = self.session.max_frame_size - self.xfr_overhead(xfr)
xfr.payload = payload[:max_size]
payload = payload[max_size:]
if result: result[-1].more = True
result.append(xfr)
if not payload:
break
return result

class Receiver(Link):
Expand All @@ -324,7 +303,7 @@ class Receiver(Link):
def init(self):
self.incoming = []
self.tag = None
self.fragments = []
self.payloads = []

def do_transfer(self, xfr):
self.session.incoming.append(self, xfr)
Expand All @@ -333,14 +312,15 @@ def do_transfer(self, xfr):
elif self.tag != xfr.delivery_tag:
raise ValueError("mismatched tags: %s, %s" % (self.tag, xfr.delivery_tag))

self.fragments.extend(xfr.fragments)
if xfr.payload:
self.payloads.extend(xfr.payload)

if not xfr.more:
self.tag = None
fragments = self.fragments
payload = "".join(self.payloads)
self.tag = None
self.fragments = []
xfr.fragments = fragments
self.payloads = []
xfr.payload = payload
self.incoming.append(xfr)
self.unsettled[xfr.delivery_tag] = (State(), State(xfr.state, xfr.settled))
self.link_credit -= 1
Expand Down
113 changes: 70 additions & 43 deletions messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
# under the License.
#

from protocol import Fragment, Header, Properties, Footer
from connection import Connection
from codec import Value
from protocol import Header, Properties, Footer, PROTOCOL_DECODER, PROTOCOL_ENCODER

class Message:

Expand Down Expand Up @@ -47,59 +47,86 @@ def __repr__(self):
args.append("%s=%r" % (f, v))
return "Message(%s)" % ", ".join(args)

# XXX: encode(message) -> fragments, decode(transfer) -> message
# XXX: encode: message -> str, decode: transfer -> message

def encode(message):
encoder = Connection.type_encoder
# XXX: constants
head = Fragment(True, True, 0, 0, 0, encoder.encode(message.header))
prop = Fragment(True, True, 3, 1, 0, encoder.encode(message.properties))
body = Fragment(True, True, 5, 2, 0, message.content)
foot = Fragment(True, True, 9, 3, 0, encoder.encode(message.footer))
return (head, prop, body, foot)
encoder = PROTOCOL_ENCODER
encoded = ""
if message.header:
encoded += encoder.encode(message.header)
if message.properties:
encoded += encoder.encode(message.properties)
if message.content is not None:
# XXX: should dispatch
if isinstance(message.content, str):
encoded += encoder.encode(Value("binary", message.content,
Value("long", 0x75)))
elif isinstance(message.content, dict):
encoded += encoder.encode(Value("map", message.content,
Value("long", 0x77)))
else:
encoded += encoder.encode(Value("list", [message.content],
Value("long", 0x76)))
if message.footer:
encoded += encoder.encode(message.footer)
return encoded

def process_header(msg, bytes):
msg.header = Connection.type_decoder.decode(bytes)[0]
def process_delivery_annotations(msg, bytes):
def process_header(msg, header):
msg.header = header
def process_properties(msg, props):
msg.properties = props

def process_delivery_annotations(msg, ann):
# XXX: we drop these
print "warning, ignoring delivery annotations"
def process_message_annotations(msg, bytes):
msg.annotations = Connection.type_decoder.decode(bytes)[0]
def process_properties(msg, bytes):
msg.properties = Connection.type_decoder.decode(bytes)[0]
def process_application_properties(msg, bytes):
print "warning, ignoring delivery annotations", ann
def process_message_annotations(msg, ann):
msg.annotations = ann
def process_application_properties(msg, props):
# XXX: we don't do anything with this yet
print "warning, ignoring app properties"
def process_data(msg, bytes):
msg.content = bytes
def process_amqp_data(msg, bytes):
msg.content = Connection.type_decoder.decode(bytes)[0]
def process_footer(msg, bytes):
msg.footer = Connection.type_decoder.decode(bytes)[0]
print "warning, ignoring app properties", props
def process_data(msg, v):
msg.content = v.value
def process_sequence(msg, v):
if msg.content is None:
msg.content = []
else:
msg.content.extend(v.value)
def process_mappings(msg, v):
if msg.content is None:
msg.content = {}
else:
msg.content.update(v.value)

VALUE_PROCESSORS = {
0x71: process_delivery_annotations,
0x72: process_message_annotations,
0x74: process_application_properties,
0x75: process_data,
0x76: process_sequence,
0x77: process_mappings
}

def process_value(msg, value):
VALUE_PROCESSORS[value.descriptor](msg, value)
def process_footer(msg, footer):
msg.footer = footer

SECTION_PROCESSORS = {
0: process_header,
1: process_delivery_annotations,
2: process_message_annotations,
3: process_properties,
4: process_application_properties,
5: process_data,
6: process_amqp_data,
7: process_amqp_data,
8: process_amqp_data,
9: process_footer
Header: process_header,
Properties: process_properties,
Value: process_value,
Footer: process_footer
}

def decode(transfer):
message = Message()
message.delivery_tag = transfer.delivery_tag
fragments = transfer.fragments
remaining = transfer.payload
sections = []
for f in fragments:
if f.first:
sections.append([f.section_code, ""])
sections[-1][-1] += f.payload
while remaining:
sect, remaining = PROTOCOL_DECODER.decode(remaining)
sections.append(sect)
while sections:
code, payload = sections.pop(0)
SECTION_PROCESSORS[code](message, payload)
sect = sections.pop(0)
SECTION_PROCESSORS[sect.__class__](message, sect)
return message
39 changes: 22 additions & 17 deletions messaging.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,22 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

<amqp name="messaging" xmlns="http://www.amqp.org/schema/amqp.xsd">
<section name="message-format">
<type name="section-codes" class="restricted" source="uint">
<choice name="header" value="0"/>
<choice name="delivery-annotations" value="1"/>
<choice name="message-annotations" value="2"/>
<choice name="properties" value="3"/>
<choice name="application-properties" value="4"/>
<choice name="data" value="5"/>
<choice name="amqp-data" value="6"/>
<choice name="amqp-map" value="7"/>
<choice name="amqp-list" value="8"/>
<choice name="footer" value="9"/>
</type>
<type name="header" class="composite" source="list" provides="section">
<descriptor name="amqp:header:list" code="0x00000000:0x00000020"/>
<descriptor name="amqp:header:list" code="0x00000000:0x00000070"/>
<field name="durable" type="boolean"/>
<field name="priority" type="ubyte"/>
<field name="ttl" type="milliseconds"/>
<field name="first-acquirer" type="boolean"/>
<field name="delivery-count" type="uint"/>
</type>
<type name="delivery-annotations" class="restricted" source="annotations" provides="section">
<descriptor name="amqp:delivery-annotations:map" code="0x00000000:0x00000071"/>
</type>
<type name="message-annotations" class="restricted" source="annotations" provides="section">
<descriptor name="amqp:message-annotations:map" code="0x00000000:0x00000072"/>
</type>
<type name="properties" class="composite" source="list" provides="section">
<descriptor name="amqp:properties:list" code="0x00000000:0x00000021"/>
<descriptor name="amqp:properties:list" code="0x00000000:0x00000073"/>
<field name="message-id" type="*" requires="message-id"/>
<field name="user-id" type="binary"/>
<field name="to" type="*" requires="address"/>
Expand All @@ -70,18 +64,29 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
<field name="group-sequence" type="sequence-no"/>
<field name="reply-to-group-id" type="string"/>
</type>
<type name="application-properties" class="restricted" source="map" provides="section">
<descriptor name="amqp:application-properties:map" code="0x00000000:0x00000074"/>
</type>
<type name="data" class="restricted" source="binary" provides="section">
<descriptor name="amqp:data:binary" code="0x00000000:0x00000075"/>
</type>
<type name="sequence" class="restricted" source="list" provides="section">
<descriptor name="amqp:sequence:list" code="0x00000000:0x00000076"/>
</type>
<type name="mappings" class="restricted" source="map" provides="section">
<descriptor name="amqp:mappings:map" code="0x00000000:0x00000077"/>
</type>
<type name="footer" class="composite" source="list" provides="section">
<descriptor name="amqp:footer:list" code="0x00000000:0x00000022"/>
<descriptor name="amqp:footer:list" code="0x00000000:0x00000078"/>
<field name="message-attrs" type="map"/>
<field name="delivery-attrs" type="map"/>
</type>
<type name="annotations" class="restricted" source="map"/>
<type name="message-id-ulong" class="restricted" source="ulong" provides="message-id"/>
<type name="message-id-uuid" class="restricted" source="uuid" provides="message-id"/>
<type name="message-id-binary" class="restricted" source="binary" provides="message-id"/>
<type name="message-id-string" class="restricted" source="string" provides="message-id"/>
<type name="address-string" class="restricted" source="string" provides="address"/>
<type name="annotations" class="restricted" source="fields"/>
<type name="application-properties" class="restricted" source="map"/>
<definition name="MESSAGE-FORMAT" value="0"/>
</section>
<section name="delivery-state">
Expand Down
Loading

0 comments on commit fa1d44d

Please sign in to comment.