This repository has been archived by the owner on Nov 17, 2020. It is now read-only.

Continuation frames without delivery_id are not handled correctly. #11

Closed
Lutziferbox opened this issue Dec 14, 2017 · 17 comments

@Lutziferbox

Hi,

I am using this rabbitmq-amqp1.0-client in an Elixir project to read AMQP messages from an Azure Event Hub.
Yesterday a broken message appeared in the queue, and the AMQP client crashed while handling it. The message looks something like this: "":null,"CorrelationId":null,..." and the client reacts with the warning log "Unhandled session frame {{:"v1_0.transfer", {:uint, 0}, :undefined, :undefined, :undefined, :undefined, false, :undefined, :undefined, :undefined, :undefined, :undefined},", followed by the frame and "in state {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>,".
Afterwards it terminates with "** State machine #PID<0.5511.0> terminating
** Last event in was {{:"v1_0.transfer", {:uint, 0}, {:uint, 30586}, {:binary, ""}, {:uint, 0}, true,
false, :undefined, :undefined, :undefined, :undefined, true},
<<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101, 113,
117, 101, 110, 99, 101, 45, 110, 117, 109, 98, 101, 114, 129, 0, 0, 0, 0, 0,
12, 20, 56, 163, 12, 120, 45, 111, 112, 116, 45, 111, 102, 102, ...>>}
** When State == :mapped
** Data == {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>,
{:ssl,
{:sslsocket, {:gen_tcp, #Port<0.158324>, :tls_connection, :undefined},
#PID<0.5503.0>}},
%{0 => {:link, "receiver", {:link_ref, :receiver, #PID<0.5511.0>, 0},
:attached, #PID<0.5499.0>, 0, 0, :receiver, :undefined,
{:pid, #PID<0.5499.0>}, 30585, 415, 415, 0, false,
{{:"v1_0.transfer", {:uint, 0}, {:uint, 30585}, {:binary, ""}, {:uint, 0},
true, true, :undefined, :undefined, :undefined, :undefined, true}," followed by the frame again and something like this: "<<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101,
113, 117, 101, 110, ...>>]}, :never}}, %{}, %{0 => 0}, 1, [],
%{address: 'xxx', container_id: "xxx",
hostname: "xxx", notify: #PID<0.5499.0>,
outgoing_max_frame_size: 65536, port: 5671,
sasl: {:plain, "xxx", "xxx"},
tls_opts: {:secure_port, []}, transfer_limit_margin: 0}, %{}, %{},
#PID<0.5499.0>}
** Reason for termination =
** {:bad_return_value, {:primitive_type_unsupported, 176}}"

I did not find a way to react to this error from within my wrapping code.

Could you please help me figure out what exactly is going wrong and what to do about it? Thanks a lot in advance!
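One way to react to this crash from wrapping code is to monitor the session process. The state dump suggests that the terminating state machine is the session itself: the `link_ref` inside it carries `#PID<0.5511.0>`, the same pid that is reported as terminating. The following is a minimal sketch, assuming `:amqp10_client.begin_session/1` returns `{:ok, pid}` for that session; `SessionWatcher` and its `status/1` function are hypothetical names, and the actual reconnect is left as a comment.

```elixir
defmodule SessionWatcher do
  # Hypothetical sketch, not part of amqp10_client: it monitors a session pid
  # and records the exit reason when that process terminates.
  use GenServer
  require Logger

  def start_link(session_pid) when is_pid(session_pid) do
    GenServer.start_link(__MODULE__, session_pid)
  end

  # Returns {status, last_exit_reason}.
  def status(server), do: GenServer.call(server, :status)

  @impl true
  def init(session_pid) do
    # A monitor turns the session's exit into a :DOWN message for this process.
    ref = Process.monitor(session_pid)
    {:ok, %{status: :connected, session_ref: ref, last_reason: nil}}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, reason}, %{session_ref: ref} = state) do
    Logger.warning("AMQP 1.0 session terminated: #{inspect(reason)}")
    # A real receiver would schedule a reconnect here, e.g. send(self(), :connect).
    {:noreply, %{state | status: :disconnected, session_ref: nil, last_reason: reason}}
  end

  @impl true
  def handle_call(:status, _from, state) do
    {:reply, {state.status, state.last_reason}, state}
  end
end
```

A monitor is used rather than a link so that the session's crash arrives as a message the receiver can act on, instead of an exit signal that would take the receiver down as well.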

@Lutziferbox
Author

Lutziferbox commented Dec 15, 2017

Hi again,
First of all, thank you for your answer and your tips on how to get support for my issue.
My problem is that I am not using RabbitMQ at all; I only use this client to connect to an Azure Event Hub.

My client is an Elixir umbrella app running in a Docker container, and it uses the rabbitmq-amqp1.0-client like this:

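```elixir
defmodule EventHubReceiver do
  use GenServer

  def start_link(partition) do
    GenServer.start_link(__MODULE__, partition)
  end

  @impl true
  def init(partition) do
    # Connect asynchronously so init/1 returns immediately.
    send(self(), :connect)
    {:ok, %{status: :disconnected, receiver: nil, partition: partition}}
  end

  @impl true
  def handle_info(:connect, %{status: :disconnected, partition: partition} = state) do
    connection_info = %{
      address: 'xxx',
      hostname: "xxx",
      container_id: "xxx",
      port: 5671,
      # SAS policy name and key (placeholders).
      sasl: {:plain, "xxx-policy", "xxx-key"},
      tls_opts: {:secure_port, []}
    }

    with {:ok, connection} <- :amqp10_client.open_connection(connection_info),
         {:ok, session} <- :amqp10_client.begin_session(connection),
         {:ok, receiver} <- :amqp10_client.attach_receiver_link(session, "receiver", partition),
         # Grant 1000 credits and never renew them automatically.
         :ok <- :amqp10_client.flow_link_credit(receiver, 1000, :never) do
      {:noreply, %{state | status: :connected, receiver: receiver}}
    else
      # Without this clause the error tuple itself would be returned from
      # handle_info/2 and the GenServer would crash with a bad return value.
      # Retry after an arbitrary 5 s delay. (A fuller version would also close
      # any connection opened before the failure.)
      _error ->
        Process.send_after(self(), :connect, 5_000)
        {:noreply, state}
    end
  end

  def handle_info({:amqp10_msg, _link, message}, state) do
    {_message_content, _message_annotations} = do_something_with_message(message)
    {:noreply, state}
  end

  # Placeholder for the application's own message handling.
  defp do_something_with_message(message) do
    {:amqp10_msg.body(message), :amqp10_msg.message_annotations(message)}
  end
end
```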

The code worked fine until recently, when we started to see the following error message:

17:43:18.712 [warn] Unhandled session frame {{:"v1_0.transfer", {:uint, 0}, :undefined, :undefined, :undefined, :undefined, false, :undefined, :undefined, :undefined, :undefined, :undefined}, "xxx-very-long-not-valid-json-string" <> ...} in state {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>, {:ssl, {:sslsocket, {:gen_tcp, #Port<0.158324>, :tls_connection, :undefined}, #PID<0.5503.0>}}, %{0 => {:link, "receiver", {:link_ref, :receiver, #PID<0.5511.0>, 0}, :attached, #PID<0.5499.0>, 0, 0, :receiver, :undefined, {:pid, #PID<0.5499.0>}, 30585, 415, 415, 0, false, {{:"v1_0.transfer", {:uint, 0}, {:uint, 30585}, {:binary, ""}, {:uint, 0}, true, true, :undefined, :undefined, :undefined, :undefined, true}, ["xxx-another-very-long-not-valid-json-string (truncated) 17:43:18.717 [error] ** State machine #PID<0.5511.0> terminating ** Last event in was {{:"v1_0.transfer", {:uint, 0}, {:uint, 30586}, {:binary, ""}, {:uint, 0}, true, false, :undefined, :undefined, :undefined, :undefined, true}, <<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101, 113, 117, 101, 110, 99, 101, 45, 110, 117, 109, 98, 101, 114, 129, 0, 0, 0, 0, 0, 12, 20, 56, 163, 12, 120, 45, 111, 112, 116, 45, 111, 102, 102, ...>>} ** When State == :mapped ** Data == {:state, 1, 1, 30588, 65535, 1, 65535, 5000, 34948, #PID<0.5502.0>, {:ssl, {:sslsocket, {:gen_tcp, #Port<0.158324>, :tls_connection, :undefined}, #PID<0.5503.0>}}, %{0 => {:link, "receiver", {:link_ref, :receiver, #PID<0.5511.0>, 0}, :attached, #PID<0.5499.0>, 0, 0, :receiver, :undefined, {:pid, #PID<0.5499.0>}, 30585, 415, 415, 0, false, {{:"v1_0.transfer", {:uint, 0}, {:uint, 30585}, {:binary, ""}, {:uint, 0}, true, true, :undefined, :undefined, :undefined, :undefined, true}, ["xxx-another-very-long-not-valid-json-string" <> ..., <<0, 83, 114, 193, 90, 6, 163, 21, 120, 45, 111, 112, 116, 45, 115, 101, 113, 117, 101, 110, ...>>]}, :never}}, %{}, %{0 => 0}, 1, [], %{address: 'xxx', container_id: "xxx", 
hostname: "xxx", notify: #PID<0.5499.0>, outgoing_max_frame_size: 65536, port: 5671, sasl: {:plain, "xxx", "xxx"}, tls_opts: {:secure_port, []}, transfer_limit_margin: 0}, %{}, %{}, #PID<0.5499.0>} ** Reason for termination = ** {:bad_return_value, {:primitive_type_unsupported, 176}}

I am still working on a way to reproduce this issue with a short script, but I haven't found a good one yet.

Any help is much appreciated.

@kjnilsson
Contributor

We are happy to answer questions on the mailing list about the wider RabbitMQ ecosystem, which this client is part of.

That said, what would really help in this case is a network capture of the offending frame (tcpdump -w or Wireshark) as well as full stack traces.

@kjnilsson
Contributor

kjnilsson commented Dec 15, 2017

OK, I think I know what is going on. The delivery_id of the transfer frame you received isn't set. The client doesn't expect this; however, according to the spec, delivery_id can be omitted on continuation transfers. I think I can see a partial frame in the state dump, which means this is a continuation transfer. I'm not sure why it results in the parsing error at the end, but there is definitely a bug around continuation transfers that needs fixing.
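The rule behind this diagnosis comes from the AMQP 1.0 `transfer` performative: a message too large for one frame is sent as several transfers, each but the last with `more` set to true, and only the first has to carry `delivery_id`. On continuation transfers it may be omitted, and if present it must match the first one. A minimal Elixir sketch of reassembly under that rule follows. It is not the client's implementation: transfers are modeled as plain maps with hypothetical `:delivery_id`, `:more` and `:payload` keys (with `nil` standing in for the `:undefined` seen in the log), and `TransferAssembler` is a made-up name.

```elixir
defmodule TransferAssembler do
  # Hypothetical sketch of multi-transfer reassembly (not the client's code).
  # A transfer is a map %{delivery_id: id | nil, more: boolean, payload: binary}.
  # The accumulator is nil between deliveries, otherwise {delivery_id, chunks}
  # with chunks stored newest first.

  # First transfer of a delivery: delivery_id must be present.
  def handle_transfer(%{delivery_id: id, more: more, payload: payload}, nil)
      when not is_nil(id) do
    step(id, [payload], more)
  end

  # Continuation: delivery_id is either omitted or equal to the first one.
  def handle_transfer(%{delivery_id: id, more: more, payload: payload}, {current_id, chunks})
      when is_nil(id) or id == current_id do
    step(current_id, [payload | chunks], more)
  end

  def handle_transfer(%{delivery_id: nil}, nil),
    do: {:error, :continuation_without_first_transfer}

  def handle_transfer(%{delivery_id: id}, {current_id, _chunks}),
    do: {:error, {:delivery_id_mismatch, current_id, id}}

  # more: true means further transfers for this delivery will follow.
  defp step(id, chunks, true), do: {:incomplete, {id, chunks}}

  defp step(id, chunks, false) do
    payload = chunks |> Enum.reverse() |> IO.iodata_to_binary()
    {:complete, id, payload, nil}
  end
end
```

Fed the two transfers that kjnilsson reads in the state dump (delivery_id 30585 with `more` set to true, then one with delivery_id `:undefined` and `more` false), this sketch returns one complete payload instead of leaving the second frame unhandled.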

@kjnilsson kjnilsson reopened this Dec 15, 2017
@kjnilsson kjnilsson changed the title Error with Unhandled session frame Continuation frames without delivery_id are not handled correctly. Dec 15, 2017
@Lutziferbox
Author

Lutziferbox commented Dec 15, 2017

Wow, thank you so much for continuing to think about this. Sadly, I am not much help in solving this issue since my Erlang is very basic, but I really appreciate your work here. I will try to encourage some of my colleagues to have a look as well!
Your assessment of the situation looks right to me, since the message in question was very long and might have been bigger than the allowed frame size.

@lukebakken
Contributor

@Lutziferbox, is there a chance you can try out @kjnilsson's fix in the rabbitmq-amqp1.0-client-11 branch? If there is a packaging step or other assistance I can provide, let me know.

@Lutziferbox
Author

Hey Luke,
First of all, thanks for the quick action from all of you.
I pulled the new master and tried processing the broken message again, and it looks like the issue persists. Testing is not easy at the moment, so it will take me some time to provide the full stack trace and all additional information.
I'll update this thread as soon as I have more evidence.

@michaelklishin
Member

michaelklishin commented Dec 20, 2017 via email

@Lutziferbox
Author

My solution is running within an Ubuntu Docker Image.

@kjnilsson
Contributor

I just performed a manual test using the Qpid AMQP broker, which, like Azure Event Hub, omits the delivery_id on continuation frames. The client works fine with it on current master, so I'm hoping the "Unhandled session frame" warning is gone now. That said, there may be some other issue that occurs after that point.

@michaelklishin
Member

3.8.0-alpha.51 should include #13. @Lutziferbox, can you please run your test with that artifact and report back here if it goes well? If it doesn't, let's continue on the mailing list until we know enough to file a follow-up issue.

@michaelklishin
Member

michaelklishin commented Dec 20, 2017

Err, I meant to link to 3.7.1-alpha.49, but they are effectively identical at the moment.

@Lutziferbox
Author

I created two test projects to test the broken messages with your new version and to extract the complete message from the stream. They are not finished yet, so I will need until tomorrow to complete everything and give more detailed feedback. I will write as soon as I have more information.

@Lutziferbox
Author

Lutziferbox commented Dec 21, 2017

I was finally able to reconstruct the full message sent from the Azure Event Hub that cannot be processed correctly. Structurally it is no different from the other messages: multiple JSON objects separated by newlines.
However, the length is extreme: 65 separate JSON objects joined by newlines, resulting in a single message of 124340 characters.
I think the real issue is somewhere in the decoder, perhaps that it cannot handle a buffer that big. The message shouldn't be pasted here, but it is not really confidential, so I could send it to you some other way.
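The size alone makes a multi-frame delivery very likely. As a rough check, assume one byte per character, use 65536 bytes as the frame size (the `outgoing_max_frame_size` in the earlier state dump, which is not necessarily the limit the Event Hub applies to frames it sends), and ignore frame and header overhead, which can only raise the count:

$$
\left\lceil \frac{124340\ \text{bytes}}{65536\ \text{bytes/frame}} \right\rceil = \lceil 1.897\ldots \rceil = 2\ \text{frames}
$$

So under these assumptions the message needs at least one continuation transfer, which is the case kjnilsson identified.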

@Lutziferbox
Author

Regarding 3.7.1-alpha.49: I am not using a RabbitMQ server, only the Azure services, and I have not set up any other infrastructure. That's why it's so hard to dig into this.

@michaelklishin
Member

Moved to #14. We can easily upload builds of just this client. Most people currently use it via the cross-protocol Shovel feature in 3.7.0.

@Lutziferbox
Author

It's a bit late now, but I can confirm that the error is fixed and the newest version of the rabbitmq-amqp1.0-client can handle even very large messages from the Azure Event Hub.
Thank you so much for your support with this issue!

@kjnilsson
Contributor

Great, thanks for reporting back.
