Skip to content

Commit

Permalink
Merge branch 'master' of github.com:ruby-amqp/amq-protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Klishin committed Apr 10, 2013
2 parents 793cf26 + d618043 commit 9ee82b6
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 75 deletions.
4 changes: 2 additions & 2 deletions Gemfile
@@ -1,13 +1,13 @@
# encoding: utf-8

source :rubygems
source "https://rubygems.org"

group :development do
# excludes Windows, Rubinius and JRuby
gem "perftools.rb", :platform => :mri_18
end

group :test do
gem "rspec", ">= 2.6.0"
gem "rspec", ">= 2.13.0"
gem "effin_utf8"
end
10 changes: 5 additions & 5 deletions codegen/codegen_helpers.py
Expand Up @@ -16,9 +16,9 @@ def genSingleEncode(spec, cValue, unresolved_domain):
elif type == 'long':
buffer.append("buffer << [%s].pack(PACK_UINT32)" % (cValue,))
elif type == 'longlong':
buffer.append("buffer << AMQ::Hacks.pack_64_big_endian(%s)" % (cValue,))
buffer.append("buffer << AMQ::Hacks.pack_uint64_big_endian(%s)" % (cValue,))
elif type == 'timestamp':
buffer.append("buffer << AMQ::Hacks.pack_64_big_endian(%s)" % (cValue,))
buffer.append("buffer << AMQ::Hacks.pack_uint64_big_endian(%s)" % (cValue,))
elif type == 'bit':
raise "Can't encode bit in genSingleEncode"
elif type == 'table':
Expand Down Expand Up @@ -58,7 +58,7 @@ def genSingleDecode(spec, field):
buffer.append("%s = data[offset, 4].unpack(PACK_UINT32).first" % (cLvalue,))
buffer.append("offset += 4")
elif type == 'longlong':
buffer.append("%s = AMQ::Hacks.unpack_64_big_endian(data[offset, 8]).first" % (cLvalue,))
buffer.append("%s = AMQ::Hacks.unpack_uint64_big_endian(data[offset, 8]).first" % (cLvalue,))
buffer.append("offset += 8")
elif type == 'timestamp':
buffer.append("%s = data[offset, 8].unpack(PACK_UINT32_X2).first" % (cLvalue,))
Expand Down Expand Up @@ -91,13 +91,13 @@ def genSingleSimpleDecode(spec, field):
elif type == 'longstr':
buffer.append("data.to_s")
elif type == 'octet':
buffer.append("data.unpack(PACK_CHAR).first")
buffer.append("data.unpack(PACK_INT8).first")
elif type == 'short':
buffer.append("data.unpack(PACK_UINT16).first")
elif type == 'long':
buffer.append("data.unpack(PACK_UINT32).first")
elif type == 'longlong':
buffer.append("AMQ::Hacks.unpack_64_big_endian(data).first")
buffer.append("AMQ::Hacks.unpack_uint64_big_endian(data).first")
elif type == 'timestamp':
buffer.append("Time.at(data.unpack(PACK_UINT32_X2).last)")
elif type == 'bit':
Expand Down
8 changes: 5 additions & 3 deletions codegen/protocol.rb.pytemplate
Expand Up @@ -20,6 +20,7 @@ module AMQ
# caching
EMPTY_STRING = "".freeze

PACK_INT8 = 'c'.freeze
PACK_CHAR = 'C'.freeze
PACK_UINT16 = 'n'.freeze
PACK_UINT16_X2 = 'n2'.freeze
Expand All @@ -30,7 +31,7 @@ module AMQ
PACK_CHAR_UINT16_UINT32 = 'cnN'.freeze

PACK_32BIT_FLOAT = 'f'.freeze
PACK_64BIT_FLOAT = 'd'.freeze
PACK_64BIT_FLOAT = 'G'.freeze



Expand Down Expand Up @@ -262,7 +263,7 @@ module AMQ

# result = [${klass.index}, 0, body_size, flags].pack('n2Qn')
result = [${klass.index}, 0].pack(PACK_UINT16_X2)
result += AMQ::Hacks.pack_64_big_endian(body_size)
result += AMQ::Hacks.pack_uint64_big_endian(body_size)
result += [flags].pack(PACK_UINT16)
result + pieces.join(EMPTY_STRING)
end
Expand Down Expand Up @@ -392,7 +393,8 @@ module AMQ
frames << HeaderFrame.new(properties_payload, channel)
% endif
% if "payload" in method.args():
frames + self.encode_body(payload, channel, frame_size)
frames += self.encode_body(payload, channel, frame_size)
frames
% endif
% else:
MethodFrame.new(buffer, channel)
Expand Down
15 changes: 15 additions & 0 deletions lib/amq/endianness.rb
@@ -0,0 +1,15 @@
module AMQ
module Endianness
BIG_ENDIAN = ([1].pack("s") == "\x00\x01")

def big_endian?
BIG_ENDIAN
end

def little_endian?
!BIG_ENDIAN
end

extend self
end
end
46 changes: 33 additions & 13 deletions lib/amq/hacks.rb
@@ -1,33 +1,53 @@
# encoding: binary

require 'amq/endianness'

# Ruby doesn't support pack to/unpack from
# 64bit string in network byte order.
module AMQ
module Hacks
BIG_ENDIAN = ([1].pack("s") == "\x00\x01")
Q = "Q".freeze
UINT64 = "Q".freeze
INT16 = "c".freeze

if Endianness.big_endian?
def self.pack_uint64_big_endian(long_long)
[long_long].pack(UINT64)
end

if BIG_ENDIAN
def self.pack_64_big_endian(long_long)
[long_long].pack(Q)
def self.unpack_uint64_big_endian(data)
data.unpack(UINT64)
end

def self.unpack_64_big_endian(data)
data.unpack(Q)
def self.pack_int16_big_endian(short)
[long_long].pack(INT16)
end

def self.unpack_int16_big_endian(data)
data.unpack(INT16)
end
else
def self.pack_64_big_endian(long_long)
result = [long_long].pack(Q)
def self.pack_uint64_big_endian(long_long)
result = [long_long].pack(UINT64)
result.bytes.to_a.reverse.map(&:chr).join
end

def self.unpack_uint64_big_endian(data)
data = data.bytes.to_a.reverse.map(&:chr).join
data.unpack(UINT64)
end

def self.pack_int16_big_endian(short)
result = [long_long].pack(INT16)
result.bytes.to_a.reverse.map(&:chr).join
end

def self.unpack_64_big_endian(data)
def self.unpack_int16_big_endian(data)
data = data.bytes.to_a.reverse.map(&:chr).join
data.unpack(Q)
data.unpack(INT16)
end
end
end
end

# AMQ::Hacks.pack_64_big_endian(17)
# AMQ::Hacks.unpack_64_big_endian("\x00\x00\x00\x00\x00\x00\x00\x11")
# AMQ::Hacks.pack_uint64_big_endian(17)
# AMQ::Hacks.unpack_uint64_big_endian("\x00\x00\x00\x00\x00\x00\x00\x11")
26 changes: 12 additions & 14 deletions lib/amq/protocol/client.rb
Expand Up @@ -19,6 +19,7 @@ module Protocol
# caching
EMPTY_STRING = "".freeze

PACK_INT8 = 'c'.freeze
PACK_CHAR = 'C'.freeze
PACK_UINT16 = 'n'.freeze
PACK_UINT16_X2 = 'n2'.freeze
Expand All @@ -29,10 +30,7 @@ module Protocol
PACK_CHAR_UINT16_UINT32 = 'cnN'.freeze

PACK_32BIT_FLOAT = 'f'.freeze
PACK_64BIT_FLOAT = 'd'.freeze

PACK_SIGNED_8BIT = 'c'.freeze
PACK_SIGNED_16BIT = 's'.freeze
PACK_64BIT_FLOAT = 'G'.freeze



Expand Down Expand Up @@ -268,6 +266,7 @@ def self.encode_body(body, channel, frame_size)
array = Array.new
while body
payload, body = body[0, limit], body[limit, body.length - limit]
# array << [0x03, payload]
array << BodyFrame.new(payload, channel)
end

Expand Down Expand Up @@ -1438,7 +1437,7 @@ def self.encode_message_id(value)
# 1 << 6
def self.encode_timestamp(value)
buffer = ''
buffer << AMQ::Hacks.pack_64_big_endian(value)
buffer << AMQ::Hacks.pack_uint64_big_endian(value)
[9, 0x0040, buffer]
end

Expand Down Expand Up @@ -1487,7 +1486,7 @@ def self.encode_properties(body_size, properties)

# result = [60, 0, body_size, flags].pack('n2Qn')
result = [60, 0].pack(PACK_UINT16_X2)
result += AMQ::Hacks.pack_64_big_endian(body_size)
result += AMQ::Hacks.pack_uint64_big_endian(body_size)
result += [flags].pack(PACK_UINT16)
result + pieces.join(EMPTY_STRING)
end
Expand Down Expand Up @@ -1796,7 +1795,6 @@ def self.encode(channel, payload, user_headers, exchange, routing_key, mandatory
properties_payload = Basic.encode_properties(payload.bytesize, properties)
frames << HeaderFrame.new(properties_payload, channel)
frames += self.encode_body(payload, channel, frame_size)

frames
end

Expand Down Expand Up @@ -1856,7 +1854,7 @@ def self.decode(data)
offset += 1
consumer_tag = data[offset, length]
offset += length
delivery_tag = AMQ::Hacks.unpack_64_big_endian(data[offset, 8]).first
delivery_tag = AMQ::Hacks.unpack_uint64_big_endian(data[offset, 8]).first
offset += 8
bit_buffer = data[offset, 1].unpack(PACK_CHAR).first
offset += 1
Expand Down Expand Up @@ -1925,7 +1923,7 @@ class GetOk < Protocol::Method
# @return
def self.decode(data)
offset = 0
delivery_tag = AMQ::Hacks.unpack_64_big_endian(data[offset, 8]).first
delivery_tag = AMQ::Hacks.unpack_uint64_big_endian(data[offset, 8]).first
offset += 8
bit_buffer = data[offset, 1].unpack(PACK_CHAR).first
offset += 1
Expand Down Expand Up @@ -1996,7 +1994,7 @@ class Ack < Protocol::Method
# @return
def self.decode(data)
offset = 0
delivery_tag = AMQ::Hacks.unpack_64_big_endian(data[offset, 8]).first
delivery_tag = AMQ::Hacks.unpack_uint64_big_endian(data[offset, 8]).first
offset += 8
bit_buffer = data[offset, 1].unpack(PACK_CHAR).first
offset += 1
Expand All @@ -2019,7 +2017,7 @@ def self.has_content?
def self.encode(channel, delivery_tag, multiple)
buffer = ''
buffer << @packed_indexes
buffer << AMQ::Hacks.pack_64_big_endian(delivery_tag)
buffer << AMQ::Hacks.pack_uint64_big_endian(delivery_tag)
bit_buffer = 0
bit_buffer = bit_buffer | (1 << 0) if multiple
buffer << [bit_buffer].pack(PACK_CHAR)
Expand All @@ -2044,7 +2042,7 @@ def self.has_content?
def self.encode(channel, delivery_tag, requeue)
buffer = ''
buffer << @packed_indexes
buffer << AMQ::Hacks.pack_64_big_endian(delivery_tag)
buffer << AMQ::Hacks.pack_uint64_big_endian(delivery_tag)
bit_buffer = 0
bit_buffer = bit_buffer | (1 << 0) if requeue
buffer << [bit_buffer].pack(PACK_CHAR)
Expand Down Expand Up @@ -2132,7 +2130,7 @@ class Nack < Protocol::Method
# @return
def self.decode(data)
offset = 0
delivery_tag = AMQ::Hacks.unpack_64_big_endian(data[offset, 8]).first
delivery_tag = AMQ::Hacks.unpack_uint64_big_endian(data[offset, 8]).first
offset += 8
bit_buffer = data[offset, 1].unpack(PACK_CHAR).first
offset += 1
Expand All @@ -2157,7 +2155,7 @@ def self.has_content?
def self.encode(channel, delivery_tag, multiple, requeue)
buffer = ''
buffer << @packed_indexes
buffer << AMQ::Hacks.pack_64_big_endian(delivery_tag)
buffer << AMQ::Hacks.pack_uint64_big_endian(delivery_tag)
bit_buffer = 0
bit_buffer = bit_buffer | (1 << 0) if multiple
bit_buffer = bit_buffer | (1 << 1) if requeue
Expand Down
2 changes: 1 addition & 1 deletion lib/amq/protocol/frame.rb
Expand Up @@ -161,7 +161,7 @@ def decode_payload
# the total size of the content body, that is, the sum of the body sizes for the
# following content body frames. Zero indicates that there are no content body frames.
# So this is NOT related to this very header frame!
@body_size = AMQ::Hacks.unpack_64_big_endian(@payload[4..11]).first
@body_size = AMQ::Hacks.unpack_uint64_big_endian(@payload[4..11]).first
@data = @payload[12..-1]
@properties = Basic.decode_properties(@data)
end
Expand Down
15 changes: 6 additions & 9 deletions lib/amq/protocol/table_value_decoder.rb
@@ -1,5 +1,6 @@
# encoding: binary

require "amq/endianness"
require "amq/protocol/client"
require "amq/protocol/type_constants"
require "amq/protocol/table"
Expand All @@ -20,10 +21,6 @@ class TableValueDecoder
# API
#

BIG_ENDIAN = ([1].pack("s") == "\x00\x01")
Q = "q".freeze


def self.decode_array(data, initial_offset)
array_length = data.slice(initial_offset, 4).unpack(PACK_UINT32).first

Expand Down Expand Up @@ -102,17 +99,17 @@ def self.decode_integer(data, offset)
end # self.decode_integer(data, offset)


if BIG_ENDIAN
if AMQ::Endianness.big_endian?
def self.decode_long(data, offset)
v = data.slice(offset, 8).unpack(Q)
v = data.slice(offset, 8).unpack(PACK_INT64)

offset += 8
[v, offset]
end
else
def self.decode_long(data, offset)
slice = data.slice(offset, 8).bytes.to_a.reverse.map(&:chr).join
v = slice.unpack(Q).first
v = slice.unpack(PACK_INT64).first

offset += 8
[v, offset]
Expand Down Expand Up @@ -177,13 +174,13 @@ def self.decode_hash(data, offset)


def self.decode_short_short(data, offset)
v = data.slice(offset, 1).unpack(PACK_SIGNED_8BIT).first
v = data.slice(offset, 1).unpack(PACK_INT8).first
offset += 1
[v, offset]
end

def self.decode_short(data, offset)
v = data.slice(offset, 2).unpack(PACK_SIGNED_16BIT).first
v = AMQ::Hacks.unpack_int16_big_endian(data.slice(offset, 2)).first
offset += 2
[v, offset]
end
Expand Down
16 changes: 8 additions & 8 deletions lib/amq/protocol/type_constants.rb
Expand Up @@ -5,18 +5,18 @@ module Protocol
module TypeConstants
TYPE_STRING = 'S'.freeze
TYPE_INTEGER = 'I'.freeze
TYPE_HASH = 'F'.freeze
TYPE_TIME = 'T'.freeze
TYPE_DECIMAL = 'D'.freeze
TYPE_BOOLEAN = 't'.freeze
TYPE_SIGNED_8BIT = 'c'.freeze
TYPE_SIGNED_16BIT = 's'.freeze
TYPE_SIGNED_64BIT = 'l'.freeze
TYPE_32BIT_FLOAT = 'f'.freeze
TYPE_HASH = 'F'.freeze
TYPE_ARRAY = 'A'.freeze
TYPE_SIGNED_8BIT = 'b'.freeze
TYPE_64BIT_FLOAT = 'd'.freeze
TYPE_VOID = 'V'.freeze
TYPE_32BIT_FLOAT = 'f'.freeze
TYPE_SIGNED_64BIT = 'l'.freeze
TYPE_SIGNED_16BIT = 's'.freeze
TYPE_BOOLEAN = 't'.freeze
TYPE_BYTE_ARRAY = 'x'.freeze
TYPE_ARRAY = 'A'.freeze
TYPE_VOID = 'V'.freeze
TEN = '10'.freeze

BOOLEAN_TRUE = "\x01".freeze
Expand Down
2 changes: 1 addition & 1 deletion lib/amq/protocol/version.rb
@@ -1,5 +1,5 @@
module AMQ
module Protocol
VERSION = "1.3.0.pre1"
VERSION = "1.4.0.pre1"
end # Protocol
end # AMQ

0 comments on commit 9ee82b6

Please sign in to comment.