Permalink
Browse files

learn message length and create large enough buffer

  • Loading branch information...
1 parent d1e1d9d commit 6d309d3c648ff64b1df620603d7b07e53b49e658 @snscaimito committed Feb 16, 2012
Showing with 61 additions and 22 deletions.
  1. +3 −1 lib/rmq/constants.rb
  2. +3 −1 lib/rmq/message_descriptor.rb
  3. +35 −11 lib/rmq/mqclient.rb
  4. +16 −9 lib/rmq/queue.rb
  5. +4 −0 spec/rmq/queue_spec.rb
View
@@ -773,6 +773,8 @@ def decode_completion_code(completion_code)
completion_codes[completion_code]
end
+ MQRC_TRUNCATED_MSG_FAILED = 2080
+
def decode_reason_code(reason_code)
reason_codes = {
0 => :MQRC_NONE,
@@ -781,7 +783,7 @@ def decode_reason_code(reason_code)
2033 => :MQRC_NO_MSG_AVAILABLE,
2037 => :MQRC_NOT_OPEN_FOR_INPUT,
2079 => :MQRC_TRUNCATED_MSG_ACCEPTED,
- 2080 => :MQRC_TRUNCATED_MSG_FAILED,
+ MQRC_TRUNCATED_MSG_FAILED => :MQRC_TRUNCATED_MSG_FAILED,
2186 => :MQRC_GMO_ERROR,
2309 => :MQRC_SELECTOR_NOT_PRESENT,
2320 => :MQRC_HBAG_ERROR,
@@ -9,6 +9,8 @@ class MessageDescriptor < FFI::Struct
MQMT_DATAGRAM = 8
MQPER_PERSISTENT = 1
+ MSG_ID_LENGTH = 24
+
layout :StrucId, [:char, 4],
:Version, :long,
:Report, :long,
@@ -20,7 +22,7 @@ class MessageDescriptor < FFI::Struct
:Format, [:char, 8],
:Priority, :long,
:Persistence, :long,
- :MsgId, [:char, 24],
+ :MsgId, [:char, MSG_ID_LENGTH],
:CorrelId, [:char, 24],
:BackoutCount, :long,
:ReplyToQ, [:char, 48],
View
@@ -252,33 +252,44 @@ def queue_depth(connection_handle, queue_name)
end
def get_message_from_queue(connection_handle, queue_handle, timeout)
+ puts "--- get message from queue"
completion_code_ptr = FFI::MemoryPointer.new :long
reason_code_ptr = FFI::MemoryPointer.new :long
data_length_ptr = FFI::MemoryPointer.new :long
- message_options = prepare_get_message_options(timeout)
+ message_options = prepare_get_message_options(timeout, false)
message_descriptor = prepare_get_message_descriptor
- # TODO determine message length and then reissue call
-# mqget(connection_handle, queue_handle, message_descriptor, message_options, 0, nil, data_length_ptr, completion_code_ptr, reason_code_ptr)
-# raise RMQException.new(completion_code_ptr.read_long, reason_code_ptr.read_long), "Cannot learn message length" if completion_code_ptr.read_long == MQCC_FAILED
-
-# data_length = data_length_ptr.read_long
- data_length = 8192
+ data_length = 8 * 1024
buffer_ptr = FFI::MemoryPointer.new :char, data_length
+
mqget(connection_handle, queue_handle, message_descriptor, message_options, data_length, buffer_ptr, data_length_ptr, completion_code_ptr, reason_code_ptr)
- raise RMQException.new(completion_code_ptr.read_long, reason_code_ptr.read_long), "Cannot learn message length" if completion_code_ptr.read_long == MQCC_FAILED
+
+ if (completion_code_ptr.read_long == MQCC_WARNING && reason_code_ptr.read_long == MQRC_TRUNCATED_MSG_FAILED)
+ msg_id = message_descriptor[:MsgId]
+ data_length = data_length_ptr.read_long
+ buffer_ptr = FFI::MemoryPointer.new :char, data_length
+ message_descriptor = prepare_get_message_descriptor(msg_id)
+ mqget(connection_handle, queue_handle, message_descriptor, message_options, data_length, buffer_ptr, data_length_ptr, completion_code_ptr, reason_code_ptr)
+ raise RMQException.new(completion_code_ptr.read_long, reason_code_ptr.read_long), "Cannot read message again after learning message length" if completion_code_ptr.read_long == MQCC_FAILED
+ else
+ raise RMQException.new(completion_code_ptr.read_long, reason_code_ptr.read_long), "Cannot learn message length" unless completion_code_ptr.read_long == MQCC_OK
+ end
Message.new(buffer_ptr.read_string, message_descriptor)
end
private
- def prepare_get_message_options(timeout)
+ def prepare_get_message_options(timeout, accept_truncated_msg)
message_options = GetMessageOptions.new
message_options[:StrucId] = GetMessageOptions::MQGMO_STRUC_ID
message_options[:Version] = GetMessageOptions::MQGMO_VERSION_1
- message_options[:Options] = GetMessageOptions::MQGMO_ACCEPT_TRUNCATED_MSG
+ message_options[:Options] = 0
+
+ if accept_truncated_msg
+ message_options[:Options] = GetMessageOptions::MQGMO_ACCEPT_TRUNCATED_MSG | message_options[:Options]
+ end
if timeout > 0
message_options[:Options] = GetMessageOptions::MQGMO_WAIT | message_options[:Options]
@@ -288,11 +299,24 @@ def prepare_get_message_options(timeout)
message_options
end
- def prepare_get_message_descriptor
+ def print_msg_id(msg_id)
+ puts "printing msg id"
+ for i in (0..MQClient::MessageDescriptor::MSG_ID_LENGTH-1) do
+ puts "#{i} = #{msg_id[i]}"
+ end
+ end
+
+ def prepare_get_message_descriptor(msg_id = nil)
message_descriptor = MQClient::MessageDescriptor.new
message_descriptor[:StrucId] = MQClient::MessageDescriptor::MQMD_STRUC_ID
message_descriptor[:Version] = MQClient::MessageDescriptor::MQMD_VERSION_1
+ if !msg_id.nil?
+ for i in (0..MQClient::MessageDescriptor::MSG_ID_LENGTH-1) do
+ message_descriptor[:MsgId][i] = msg_id[i]
+ end
+ end
+
message_descriptor
end
View
@@ -9,6 +9,7 @@ def initialize(queue_manager, name)
@queue_name = name
end
+ # Puts a message onto the queue. A reply to queue name can be specified.
def put_message(payload, reply_queue_name = "")
@queue_handle = open_queue(@queue_manager.connection_handle, @queue_name, Constants::MQOO_OUTPUT) if @queue_handle.nil?
@@ -22,16 +23,20 @@ def depth
queue_depth(@queue_manager.connection_handle, @queue_name)
end
- # timeout is in seconds
+ # Gets a message from the queue. A timeout period can be specified in seconds.
def get_message(timeout = 0)
@queue_handle = open_queue(@queue_manager.connection_handle, @queue_name, Constants::MQOO_INPUT_SHARED) if @queue_handle.nil?
- begin_time = Time.now.to_i
- begin
- message = get_message_from_queue(@queue_manager.connection_handle, @queue_handle, timeout)
- rescue RMQException
- end_time = Time.now.to_i
- raise RMQTimeOutError.new if end_time - begin_time >= timeout
+ if (timeout > 0)
+ begin_time = Time.now.to_i
+ begin
+ message = get_message_from_queue(@queue_manager.connection_handle, @queue_handle, timeout)
+ rescue RMQException
+ end_time = Time.now.to_i
+ raise RMQTimeOutError.new if end_time - begin_time >= timeout
+ end
+ else
+ message = get_message_from_queue(@queue_manager.connection_handle, @queue_handle, 0)
end
close_queue(@queue_manager.connection_handle, @queue_handle, Constants::MQCO_NONE)
@@ -40,8 +45,10 @@ def get_message(timeout = 0)
message
end
- def get_message_payload
- message = get_message
+ # Gets a message from the queue and returns the payload only. A timeout period can be specified
+ # in seconds.
+ def get_message_payload(timeout = 0)
+ message = get_message(timeout)
message.payload
end
end
View
@@ -40,4 +40,8 @@
lambda { @queue.get_message(2) }.should raise_error(RMQ::RMQTimeOutError)
end
+ it "should time out waiting for a message and not return any payload" do
+ lambda { @queue.get_message_payload(2) }.should raise_error(RMQ::RMQTimeOutError)
+ end
+
end

0 comments on commit 6d309d3

Please sign in to comment.