Description
I recently came across the MessageSizeTooLarge error response from Kafka, which I think is not being handled properly by ruby-kafka, at least in the async call from the producer. Currently it seems to disconnect the producer, which triggers a bunch of other issues and ultimately 500s. The synchronous call seems to handle the error fine.
- Version of Ruby: 2.2.3, 2.3.3
- Version of Kafka: 0.10.1.0
- Version of ruby-kafka: 0.3, master
Steps to reproduce
When the message is delivered synchronously, the exception is raised properly without affecting the Kafka connection:
2.2.3 (main):0 > k = Kafka.new(logger: Logger.new(STDOUT), seed_brokers: [ 'localhost'], client_id: "my-application")
2.2.3 (main):0 > k.deliver_message("A"*1500000, topic: "development_events")
I, [2017-05-24T17:36:14.573805 #70030] INFO -- : New topics added to target list: development_events
I, [2017-05-24T17:36:14.573922 #70030] INFO -- : Fetching cluster metadata from kafka://localhost:9092
D, [2017-05-24T17:36:14.574013 #70030] DEBUG -- : Opening connection to localhost:9092 with client id my-application...
D, [2017-05-24T17:36:14.576325 #70030] DEBUG -- : Sending request 1 to localhost:9092
D, [2017-05-24T17:36:14.576523 #70030] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2017-05-24T17:36:14.580413 #70030] DEBUG -- : Received response 1 from localhost:9092
I, [2017-05-24T17:36:14.580479 #70030] INFO -- : Discovered cluster metadata; nodes: localhost:9092 (node_id=0)
D, [2017-05-24T17:36:14.580559 #70030] DEBUG -- : Closing socket to localhost:9092
D, [2017-05-24T17:36:14.580825 #70030] DEBUG -- : Current leader for development_events/0 is node localhost:9092 (node_id=0)
I, [2017-05-24T17:36:14.580913 #70030] INFO -- : Sending 1 messages to localhost:9092 (node_id=0)
D, [2017-05-24T17:36:14.580956 #70030] DEBUG -- : Opening connection to localhost:9092 with client id my-application...
D, [2017-05-24T17:36:14.581860 #70030] DEBUG -- : Sending request 1 to localhost:9092
D, [2017-05-24T17:36:14.587700 #70030] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2017-05-24T17:36:14.597992 #70030] DEBUG -- : Received response 1 from localhost:9092
Kafka::MessageSizeTooLarge: Kafka::MessageSizeTooLarge
from /Users/joao/dev/ruby-kafka/lib/kafka/protocol.rb:50:in `handle_error'
(...)
However, when calling the async version, the exception gets raised and the connection to the broker is terminated:
2.2.3 (main):0 > k = Kafka.new(logger: Logger.new(STDOUT), seed_brokers: [ 'localhost'], client_id: "my-application")
2.2.3 (main):0 > p = k.async_producer
2.2.3 (main):0 > p.produce("A"*1500000, topic: "development_events")
=> nil
2.2.3 (main):0 > p.deliver_messages
I, [2017-05-24T17:46:22.724993 #70268] INFO -- : New topics added to target list: development_events
I, [2017-05-24T17:46:22.725138 #70268] INFO -- : Fetching cluster metadata from kafka://localhost:9092
D, [2017-05-24T17:46:22.725322 #70268] DEBUG -- : Opening connection to localhost:9092 with client id my-application...
=> nil
2.2.3 (main):0 > D, [2017-05-24T17:46:22.727822 #70268] DEBUG -- : Sending request 1 to localhost:9092
D, [2017-05-24T17:46:22.728048 #70268] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2017-05-24T17:46:22.731578 #70268] DEBUG -- : Received response 1 from localhost:9092
I, [2017-05-24T17:46:22.731628 #70268] INFO -- : Discovered cluster metadata; nodes: localhost:9092 (node_id=0)
D, [2017-05-24T17:46:22.731651 #70268] DEBUG -- : Closing socket to localhost:9092
D, [2017-05-24T17:46:22.731870 #70268] DEBUG -- : Current leader for development_events/0 is node localhost:9092 (node_id=0)
I, [2017-05-24T17:46:22.731937 #70268] INFO -- : Sending 1 messages to localhost:9092 (node_id=0)
D, [2017-05-24T17:46:22.731989 #70268] DEBUG -- : Opening connection to localhost:9092 with client id my-application...
D, [2017-05-24T17:46:22.732612 #70268] DEBUG -- : Sending request 1 to localhost:9092
D, [2017-05-24T17:46:22.738458 #70268] DEBUG -- : Waiting for response 1 from localhost:9092
D, [2017-05-24T17:46:22.747448 #70268] DEBUG -- : Received response 1 from localhost:9092
I, [2017-05-24T17:46:22.747647 #70268] INFO -- : Disconnecting broker 0
D, [2017-05-24T17:46:22.747681 #70268] DEBUG -- : Closing socket to localhost:9092
Error: Kafka::MessageSizeTooLarge
/Users/joao/dev/ruby-kafka/lib/kafka/protocol.rb:50:in `handle_error'(...)
Expected outcome
When using the async call, the connection shouldn't be terminated when Kafka returns a MessageSizeTooLarge error. Behavior should be similar to the non-async version.
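To make the expectation concrete, here is a toy model of the distinction I have in mind: a per-message rejection is reported but leaves the connection open, and only a genuine connection failure closes it. Every class and method name below (`MessageRejected`, `ConnectionLost`, `ToyConnection`, `deliver_all`) is hypothetical and does not mirror ruby-kafka internals, and the 1,000,000 byte limit is an assumed placeholder.

```ruby
# Toy model only: none of these names exist in ruby-kafka.
class MessageRejected < StandardError; end  # stands in for Kafka::MessageSizeTooLarge
class ConnectionLost < StandardError; end   # stands in for a genuine network failure

class ToyConnection
  attr_reader :open

  def initialize
    @open = true
  end

  def close
    @open = false
  end

  # Rejects payloads above an assumed 1,000,000 byte limit.
  def send_message(value)
    raise MessageRejected, "message too large" if value.bytesize > 1_000_000
    :ok
  end
end

# Sends every value and returns the errors seen along the way.
# The connection is closed only when ConnectionLost is raised.
def deliver_all(connection, values)
  errors = []
  values.each do |value|
    begin
      connection.send_message(value)
    rescue MessageRejected => e
      errors << e       # per-message failure: report it, keep the connection
    rescue ConnectionLost => e
      errors << e
      connection.close  # only a real connection failure warrants disconnecting
      break
    end
  end
  errors
end
```

With this model, an oversized message in the middle of a batch produces one recorded error while the messages around it are still sent over the same connection.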
Actual outcome
When using the async call, the connection gets terminated when Kafka returns a MessageSizeTooLarge error.
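Until the async path handles this error, one possible workaround is to reject oversized payloads on the client before they reach the producer. This is a minimal sketch under assumptions: `MAX_MESSAGE_BYTES`, `OversizedMessageError` and `safe_produce` are hypothetical names that are not part of ruby-kafka, and the 1,000,000 byte limit is a placeholder for whatever the broker's `message.max.bytes` setting actually is. The check looks only at the value's byte size, so it ignores the key and protocol overhead and should be treated as approximate.

```ruby
# Assumed placeholder; the real limit comes from the broker's message.max.bytes setting.
MAX_MESSAGE_BYTES = 1_000_000

# Hypothetical error class, not part of ruby-kafka.
class OversizedMessageError < StandardError; end

# Wraps any object that responds to #produce(value, topic:), such as the
# async producer in the steps above, and rejects oversized payloads early.
def safe_produce(producer, value, topic:, max_bytes: MAX_MESSAGE_BYTES)
  size = value.bytesize
  if size > max_bytes
    raise OversizedMessageError, "message is #{size} bytes, limit is #{max_bytes}"
  end
  producer.produce(value, topic: topic)
end
```

In the reproduction above, `safe_produce(p, "A"*1500000, topic: "development_events")` would raise `OversizedMessageError` in the calling thread, so the request never reaches the broker.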