NullPointerException on consumer #167

Open
r0qs opened this issue Jan 20, 2017 · 2 comments

r0qs commented Jan 20, 2017

Hello, I updated my brokers to Kafka 0.10.1.1 and now the plugin throws the errors below when it starts.

  • Version: logstash-input-kafka 6.2.x, Logstash 2.4, and Kafka brokers at version 0.10.1.1
  • Operating System: Amazon Linux AMI release 2015.03
  • Sample Data:

$ tail -n100 /var/log/logstash/logstash.err

Exception in thread "Ruby-0-Thread-36: /opt/logstash/vendor/local_gems/5a1a2485/logstash-input-kafka-6.2.2/lib/logstash/inputs/kafka.rb:231" java.lang.NullPointerException
	at org.apache.kafka.common.record.ByteBufferInputStream.read(org/apache/kafka/common/record/ByteBufferInputStream.java:34)
	at java.util.zip.CheckedInputStream.read(java/util/zip/CheckedInputStream.java:59)
	at java.util.zip.GZIPInputStream.readUByte(java/util/zip/GZIPInputStream.java:266)
	at java.util.zip.GZIPInputStream.readUShort(java/util/zip/GZIPInputStream.java:258)
	at java.util.zip.GZIPInputStream.readHeader(java/util/zip/GZIPInputStream.java:164)
	at java.util.zip.GZIPInputStream.<init>(java/util/zip/GZIPInputStream.java:79)
	at java.util.zip.GZIPInputStream.<init>(java/util/zip/GZIPInputStream.java:91)
	at org.apache.kafka.common.record.Compressor.wrapForInput(org/apache/kafka/common/record/Compressor.java:280)
	at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.<init>(org/apache/kafka/common/record/MemoryRecords.java:247)
	at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(org/apache/kafka/common/record/MemoryRecords.java:316)
	at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(org/apache/kafka/common/record/MemoryRecords.java:222)
	at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(org/apache/kafka/common/utils/AbstractIterator.java:79)
	at org.apache.kafka.common.utils.AbstractIterator.hasNext(org/apache/kafka/common/utils/AbstractIterator.java:45)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(org/apache/kafka/clients/consumer/internals/Fetcher.java:685)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(org/apache/kafka/clients/consumer/internals/Fetcher.java:424)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(org/apache/kafka/clients/consumer/KafkaConsumer.java:1045)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:979)
	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
	at RUBY.thread_runner(/opt/logstash/vendor/local_gems/5a1a2485/logstash-input-kafka-6.2.2/lib/logstash/inputs/kafka.rb:241)
	at java.lang.Thread.run(java/lang/Thread.java:745)
Exception in thread "Ruby-0-Thread-23: /opt/logstash/vendor/local_gems/5a1a2485/logstash-input-kafka-6.2.2/lib/logstash/inputs/kafka.rb:231" java.lang.NullPointerException
	at org.apache.kafka.common.record.ByteBufferInputStream.read(org/apache/kafka/common/record/ByteBufferInputStream.java:34)
	at java.util.zip.CheckedInputStream.read(java/util/zip/CheckedInputStream.java:59)
	at java.util.zip.GZIPInputStream.readUByte(java/util/zip/GZIPInputStream.java:266)
	at java.util.zip.GZIPInputStream.readUShort(java/util/zip/GZIPInputStream.java:258)
	at java.util.zip.GZIPInputStream.readHeader(java/util/zip/GZIPInputStream.java:164)
	at java.util.zip.GZIPInputStream.<init>(java/util/zip/GZIPInputStream.java:79)
	at java.util.zip.GZIPInputStream.<init>(java/util/zip/GZIPInputStream.java:91)
	at org.apache.kafka.common.record.Compressor.wrapForInput(org/apache/kafka/common/record/Compressor.java:280)
	at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.<init>(org/apache/kafka/common/record/MemoryRecords.java:247)
	at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(org/apache/kafka/common/record/MemoryRecords.java:316)
	at org.apache.kafka.common.record.MemoryRecords$RecordsIterator.makeNext(org/apache/kafka/common/record/MemoryRecords.java:222)
	at org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(org/apache/kafka/common/utils/AbstractIterator.java:79)
	at org.apache.kafka.common.utils.AbstractIterator.hasNext(org/apache/kafka/common/utils/AbstractIterator.java:45)
	at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(org/apache/kafka/clients/consumer/internals/Fetcher.java:685)
	at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(org/apache/kafka/clients/consumer/internals/Fetcher.java:424)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(org/apache/kafka/clients/consumer/KafkaConsumer.java:1045)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(org/apache/kafka/clients/consumer/KafkaConsumer.java:979)
	at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
	at RUBY.thread_runner(/opt/logstash/vendor/local_gems/5a1a2485/logstash-input-kafka-6.2.2/lib/logstash/inputs/kafka.rb:241)
	at java.lang.Thread.run(java/lang/Thread.java:745)

And the Kafka brokers report this error:

$ tail -n100 /opt/kafka/logs/kafkaServer.out

[2017-01-20 15:40:00,115] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
	at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690)
	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47)
	at org.apache.kafka.common.network.Selector.close(Selector.java:487)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:368)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
	at kafka.network.Processor.poll(SocketServer.scala:476)
	at kafka.network.Processor.run(SocketServer.scala:416)
	at java.lang.Thread.run(Thread.java:745)
[2017-01-20 15:40:00,400] WARN Failed to send SSL Close message  (org.apache.kafka.common.network.SslTransportLayer)
java.io.IOException: Broken pipe
	at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
	at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
	at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
	at sun.nio.ch.IOUtil.write(IOUtil.java:65)
	at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
	at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195)
	at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163)
	at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690)
	at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47)
	at org.apache.kafka.common.network.Selector.close(Selector.java:487)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:368)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
	at kafka.network.Processor.poll(SocketServer.scala:476)
	at kafka.network.Processor.run(SocketServer.scala:416)
	at java.lang.Thread.run(Thread.java:745)

My Logstash configuration:

  kafka {
    codec => "json"
    auto_commit_interval_ms => "5000"
    auto_offset_reset => "earliest"
    bootstrap_servers => "broker-kafka001.aws.zup.com.br:9092,broker-kafka002.aws.zup.com.br:9092,broker-kafka003.aws.zup.com.br:9092"
    client_id => "logstash"
    connections_max_idle_ms => "30000"
    consumer_threads => 25
    enable_auto_commit => "true"
    fetch_max_wait_ms => "500"
    fetch_min_bytes => "1000"
    group_id => "event_handler"
    heartbeat_interval_ms => "10000"
    max_partition_fetch_bytes => "60000000"
    session_timeout_ms => "30000"
    poll_timeout_ms => 600
    security_protocol => "SSL"
    ssl_truststore_type => "JKS"
    ssl_truststore_location => "/var/private/ssl/logstash.truststore.jks"
    ssl_truststore_password => "test123"
    ssl_keystore_type => "JKS"
    ssl_keystore_location => "/var/private/ssl/logstash.keystore.jks"
    ssl_keystore_password => "test123"
    ssl_key_password => ""
    topics => ["my-gateway"]
  }

And the configuration of one of the Kafka brokers:

[2017-01-18 17:20:39,490] INFO KafkaConfig values: 
        advertised.host.name = null
        advertised.listeners = SSL://broker-kafka003.aws.zup.com.br:9092
        advertised.port = null
        authorizer.class.name = 
        auto.create.topics.enable = false
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = -1
        broker.id.generation.enable = true
        broker.rack = null
        compression.type = producer
        connections.max.idle.ms = 600000
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        default.replication.factor = 2
        delete.topic.enable = true
        fetch.purgatory.purge.interval.requests = 1000
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        host.name = 
        inter.broker.protocol.version = 0.10.1.1
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listeners = SSL://:9092
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /data/kafka_vol01,/data/kafka_vol02
        log.flush.interval.messages = 20000
        log.flush.interval.ms = 10000
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.format.version = 0.10.1.1
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides = 
        message.max.bytes = 30000000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 8
        num.partitions = 8
        num.recovery.threads.per.data.dir = 1
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 1440
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 3
        offsets.topic.segment.bytes = 104857600
        port = 9092
        principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
        producer.purgatory.purge.interval.requests = 1000
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 300000000
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism.inter.broker.protocol = GSSAPI
        security.inter.broker.protocol = SSL
        socket.receive.buffer.bytes = 1048576
        socket.request.max.bytes = 600000000
        socket.send.buffer.bytes = 1048576
        ssl.cipher.suites = null
        ssl.client.auth = required
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = HTTPS
        ssl.key.password = [hidden]
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = /var/private/ssl/kafka.server.keystore.jks
        ssl.keystore.password = [hidden]
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = /var/private/ssl/kafka.server.truststore.jks
        ssl.truststore.password = [hidden]
        ssl.truststore.type = JKS
        unclean.leader.election.enable = true
        zookeeper.connect =zookeeper-zk001.aws.zup.com.br:2181
        zookeeper.connection.timeout.ms = 6000
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
  • Steps to Reproduce:
  1. Start the Kafka brokers with SSL support (I followed this documentation: https://kafka.apache.org/documentation/#security)
  2. Start logstash-input-kafka. I also tested versions 6.2.0, 6.2.2, and 6.2.4, and I get the error in all of them (see the plain-Java consumer sketch below, which exercises the same poll path).
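
For reference, the failing path in the stack trace is inside KafkaConsumer.poll(), so it can be exercised outside Logstash with a plain Java consumer. This is only a minimal sketch under assumptions: the topic and SSL keystore/truststore paths mirror the Logstash input configuration above, the group id is a placeholder, and it targets the 0.10.x Java client API (poll(long)).

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SslConsumerCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker-kafka001.aws.zup.com.br:9092");
        props.put("group.id", "npe-check"); // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        // SSL settings mirroring the Logstash input configuration above
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", "/var/private/ssl/logstash.truststore.jks");
        props.put("ssl.truststore.password", "test123");
        props.put("ssl.keystore.location", "/var/private/ssl/logstash.keystore.jks");
        props.put("ssl.keystore.password", "test123");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-gateway"));
            while (true) {
                // poll() is where the NullPointerException in the stack trace above is raised
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}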

Does anyone have a hint about what causes this problem?


r0qs commented Jan 20, 2017

I added the following configuration to all brokers and the problem seems to have disappeared:

inter.broker.protocol.version=0.10.1

But this "solution" doesn't make much sense to me.

@codefritz

I had the same issue when consuming a topic with compaction enabled and the value is null. I will create a fix/PR for it.
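
For context, a tombstone in a compacted topic is just a record with a non-null key and a null value. A minimal sketch of producing one with the plain Kafka Java producer, using placeholder topic and broker names and GZIP compression (the codec seen in the decompression stack trace above), could look like this:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Compression matters here: the NPE above is thrown while the consumer
        // decompresses a GZIP-wrapped message set.
        props.put("compression.type", "gzip");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A record with a non-null key and a null value is a tombstone; on a topic
            // with cleanup.policy=compact it marks the key for deletion.
            producer.send(new ProducerRecord<>("my-compacted-topic", "some-key", null)).get();
        }
    }
}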
