
Problems detecting when Kafka server is down #48

Open · porcobravo opened this issue Mar 11, 2016 · 13 comments

Comments

@porcobravo

Hello,

Recently the Kafka server that Logstash was outputting to went down (only Kafka; Zookeeper was still running). I'm using the http input plugin, and I expected a timeout, since the Logstash internal queue fills up when the kafka output plugin can't deliver messages to Kafka. Instead it returned status 200, so the client pushing events thinks everything is OK, resulting in a loss of data.

I already asked about this in the http input plugin project (see logstash-plugins/logstash-input-http#48), but it seems the problem could be related to this plugin.
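
For context, my pipeline is roughly the following; the port, broker address and topic below are placeholders, not my real values:

input {
    http {
        port => 8080
    }
}
output {
    kafka {
        bootstrap_servers => "kafka-host:9092"
        topic_id => "my_topic"
    }
}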

Thanks for your help.

@porcobravo
Author

Hello,

Did anyone manage to reproduce the issue?

Thanks.

@joekiller
Collaborator

My guess is that with retries at 0 the sends fail and Kafka drops the message with an exception, but since the plugin catches errors and only prints a warning, you don't see it unless logging is verbose.

The exceptions should print to error instead to make error detection easier.
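
For what it's worth, running Logstash with debug logging enabled should surface those warnings; something like the following should do it (the config path is a placeholder):

bin/logstash -f your-pipeline.conf --debug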

@porcobravo
Author

Thanks for the answer. I tested with --debug so I could see what happened.

Indeed there was an exception:

log4j, [2016-04-11T13:49:08.353]  WARN: org.apache.kafka.common.network.Selector: Error in I/O with datapipe-queue-01.cern.ch/188.184.65.67
java.net.ConnectException: Connection refused
        at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
        at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
        at java.lang.Thread.run(Thread.java:745)
log4j, [2016-04-11T13:49:08.361] DEBUG: org.apache.kafka.clients.NetworkClient: Node 0 disconnected.

Error in reactor loop escaped: Bad file descriptor - Bad file descriptor (Errno::EBADF)
org/jruby/RubyIO.java:3682:in `select'
/opt/logstash/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/reactor.rb:29:in `run_internal'
/opt/logstash/vendor/bundle/jruby/1.9/gems/puma-2.16.0-java/lib/puma/reactor.rb:138:in `run_in_thread'

But the fact is that the client doesn't notice the error, so I could potentially lose messages. From the error messages, would you say this is something related to the Kafka output plugin, or is it related to the input plugin?

@talevy
Contributor

talevy commented Apr 13, 2016

The input plugin is relying on the underlying Kafka 0.8 Java Consumer Client. This updated client runs connection attempts in the background in a loop and catches this exception, and then attempts to reconnect. The first thing it does in this loop is to check for new metadata.

If you set metadata_max_age_ms to something much smaller than a minute (the default), you should see that error out and Logstash will appropriately catch the exception and fail.

It is unfortunate, but that is how it is built under the hood. I am working on a way around this so we can handle this situation more gracefully.

let me know if that makes sense!
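
For example, a kafka output with metadata_max_age_ms set well below the one-minute default might look like this (the broker address and topic are placeholders):

output {
    kafka {
        bootstrap_servers => "kafka-host:9092"
        topic_id => "my_topic"
        metadata_max_age_ms => 5000
    }
}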

@JNachtwey

We have the same issue with Logstash and the logstash-output-kafka plugin.
Logstash loses messages when the Kafka-Broker is not available.
This issue is reproducible.

Our environment and tested versions:

Logstash 2.3.0, 2.3.2, 2.3.4
Logstash-output-kafka: 2.0.3 and 3.0.0.beta1
Kafka-Broker: 0.9.0.1
Filebeat 1.2.2

Tested pipeline (only single instances):
Filebeat -> Logstash -> Kafka-Broker -> Kafka console consumer

Filebeat tails a file (also tested with the Logstash file input plugin).
Logstash config for the output:

output {
    kafka {
        bootstrap_servers => "servername"
        client_id => "logstash"
        topic_id => "topic_name"
    }
}

Tested scenarios:

Scenario: Pipeline is working
pipe message to file
Filebeat catches message
Logstash receives message
Kafka-Broker receives message
Kafka console consumer receives the sent message
==> Working

Scenario: Shut down the Kafka-Broker from the scenario above, then start it again
shut down Kafka-Broker (kafka stop)
pipe message to file
Filebeat catches message
Logstash receives message
Kafka-Broker is down
Kafka console consumer receives no message

start Kafka-Broker (kafka start)
Kafka console consumer still receives no message
==> the sent message is not received

We expected that:

  • no messages would be lost
  • the beats input plugin in Logstash would block Filebeat.

We also tried different logstash configurations.

output {
    kafka {
        bootstrap_servers => "servername"
        client_id => "logstash"
        topic_id => "topic_name"
        acks => "all"
        block_on_buffer_full => true
        metadata_max_age_ms => 1000
        retries => 10000
        retry_backoff_ms => 1000
        timeout_ms => 1000
    }
}

==> retries in combination with retry_backoff_ms allow messages to be buffered during the retry process
==> still no blocking of Filebeat
==> an ugly workaround to avoid losing messages

We also tried block_on_buffer_full => true in combination with a small buffer_memory size (100 bytes); a rough sketch of that configuration is shown after this list. We expected that:

  • no messages would be lost
  • the beats input plugin in Logstash would block Filebeat.

But all messages got lost and Filebeat was not blocked. block_on_buffer_full is not working.
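
For reference, the output configuration for that test looked roughly like this (server and topic names are placeholders, as in the configs above):

output {
    kafka {
        bootstrap_servers => "servername"
        client_id => "logstash"
        topic_id => "topic_name"
        block_on_buffer_full => true
        buffer_memory => 100
    }
}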

@JNachtwey

We also tested the following scenario:

Scenario: Kafka-Broker starts after Logstash and Filebeat are running
Kafka-Broker is stopped
start Filebeat
start Logstash
pipe message to file
Filebeat catches message
Logstash receives message
Logstash detects that the Kafka-Broker is not running
Logstash rejects new messages from Filebeat
Log from Logstash:

CircuitBreaker::rescuing exceptions {"msg and metadata", :name=>"Beats input", :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:warn, :file=>"logstash/inputs/beats_support/circuit_breaker.rb", :line=>"53", :method=>"execute"}
LogStash::Inputs::Beats::InsertingToQueueTakeTooLong {:errors_count=>2, :error_threshold=>5, :exception=>LogStash::Inputs::Beats::InsertingToQueueTakeTooLong, :level=>:debug, :file=>"logstash/inputs/beats_support/circuit_breaker.rb", :line=>"85", :method=>"increment_errors"}
Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover. {:exception=>LogStash::Inputs::BeatsSupport::CircuitBreaker::HalfOpenBreaker, :level=>:warn, :file=>"logstash/inputs/beats.rb", :line=>"220", :method=>"handle_new_connection"}

After the Kafka-Broker is started, Logstash sends the buffered messages and reopens the connection to Filebeat.
But in our test we sent 6500 messages through Filebeat and received about 7000 messages in Kafka.
We haven't tested yet whether all 6500 unique messages were sent to Kafka or whether Logstash lost messages.

@talevy talevy self-assigned this Jul 19, 2016
@talevy
Contributor

talevy commented Jul 19, 2016

thank you for reporting this issue! I will take a look at this shortly

@anuragbsb

Any update on this?

@jordansissel
Contributor

@anuragbsb This issue is used to track all activity we do on this issue, so no comments yet probably means no updates yet.

Also, it appears the description of this issue is specific to the http input behavior, not the Kafka output. I would like to close this issue because it is not a Kafka issue.

@anuragbsb

anuragbsb commented Feb 4, 2017

@jordansissel it seems to be a Logstash issue. How do I prevent Logstash from processing further input (from a file, in my case) when my output (Kafka) is down?
Presently this causes data loss, as Logstash keeps updating the sincedb for the file even if the message is not delivered to Kafka.

@anuragbsb

anuragbsb commented Feb 5, 2017

A temporary solution for this, until Logstash implements 'acking':
buffer_memory => 56384 (keep this small; 56384 in my case, but > 16384 if batch_size is kept at its default). The memory buffer is exhausted quickly and stops accepting new records, since block_on_buffer_full is true by default, so the sincedb won't get updated.
retries => 99999 (you can even set it to the maximum value). This keeps retrying until it finds a broker that is up, and by then the buffer memory will be exhausted if your input rate is high, so Logstash will stop accepting new records.
You can also change retry_backoff_ms and keep acks => "all" if required, and find a proper trade-off between buffer_memory and retries :)

output {
    kafka {
        bootstrap_servers => "localhost:9092"
        key_serializer => "org.apache.kafka.common.serialization.StringSerializer"
        value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
        topic_id => "test123"
        codec => plain {
            format => "%{message}"
        }
        buffer_memory => 56384
        retries => 99999
    }
}

@talevy talevy removed their assignment May 15, 2017
@magazov

magazov commented Feb 6, 2018

@jordansissel, does #151 solve the issue?

@spattnaik2311

Any update on this? What can I do to prevent data loss in Logstash if the Kafka output is down?
