Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Kafka 0.9+ consumer support #2487

Merged
merged 1 commit into from
Jun 8, 2017

Conversation

seuf
Copy link
Contributor

@seuf seuf commented Mar 3, 2017

Plugin Input kafka_consumer

This PR allow the kafka_consumer input plugin to connect directly to kafka brokers using the new kafka consumer offset management.
Here is an example of configuraiton

[[inputs.kafka_consumer]]
  ## kafka servers
  brokers = ["localhost:9092"]
  ## topic(s) to consume
  topics = ["telegraf"]

  ## Optional SSL Config
  # ssl_ca = "/etc/telegraf/ca.pem"
  # ssl_cert = "/etc/telegraf/cert.pem"
  # ssl_key = "/etc/telegraf/key.pem"
  ## Use SSL but skip chain & host verification
  # insecure_skip_verify = false

  ## Optional SASL Config
  # sasl_username = "kafka"
  # sasl_password = "secret"

  ## the name of the consumer group
  consumer_group = "telegraf_metrics_consumers"
  ## Offset (must be either "oldest" or "newest")
  offset = "oldest"

  ## Data format to consume.
  ## Each data format has it's own unique set of configuration options, read
  ## more about them here:
  ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
  data_format = "influx"

Note : the old Zookeeper Connection type is no more supported.

Related Issue : #1312

@sparrc
Copy link
Contributor

sparrc commented Mar 6, 2017

Thanks for submitting this @seuf.

First thing is that you'll need to fix the tests that rely on spotify/docker, which IIRC is a kafka 0.8 cluster. We'll probably need to use https://github.com/wurstmeister/kafka-docker instead.

Secondly, instead of completely removing Kafka 0.8 support, we should instead change the current kafka_consumer to be named kafka_consumer_legacy.

Does this need to get ported to the kafka output plugin as well? did anything change for kafka producers in 0.9+?

Since this is a breaking change we're going to have to be a bit more careful about rolling it out. Probably what we want to do is submit a PR for telegraf 1.3 that will warn kafka_consumer users that support for 0.8 and earlier is going to be changed to kafka_consumer_legacy, and that in Telegraf 1.4 the kafka_consumer plugin will only support kafka 0.9+

cc @danielnelson

@sparrc sparrc added this to the 1.4.0 milestone Mar 6, 2017
@danielnelson
Copy link
Contributor

It looks like the code changes are minimal, what are the reasons we can't support both 0.8 and 0.9 in this plugin?

@sparrc
Copy link
Contributor

sparrc commented Mar 7, 2017

We probably could but two issues are that "zookeeper peers" don't apply to Kafka 0.9+, and there isn't a dynamic way of telling which version we are interacting with.

So it would require having a separate broker list, where if the user specified "zookeeper peers" we would use the old consumer library, and if they specified "brokers" then we use the new consumer library.

AFAICT the Kafka project is trying to heavily push users to use 0.9+ clusters. This migration may take some time but IMO it's better if telegraf attempts to be on the front of this rather than maintaining messy cross-compatibility.

This would mirror how Apache is maintaining the official scala kafka consumer, which defaults to only work with "new" clusters, and a CLI flag has to be used to enable "old" clusters: https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/ConsoleConsumer.scala#L268

@seuf
Copy link
Contributor Author

seuf commented Mar 13, 2017

Hi.
I've updated the PR to use the wurstmeister/kafka and wurstmeister/zookeeper for testing, but the tests are still failing :(
Do I need to rename the old kafka_consumer plugin into kafka_consumer_legacy for 1.3 release ?

@danielnelson
Copy link
Contributor

For the last test run it looks like it failed because you need to run go fmt.

We have this change marked for the 1.4 milestone, so we don't need to rename the current plugin for the 1.3 release, but we do need to update the 1.3 release notes to warn about the breaking change.

@seuf seuf force-pushed the input-kafka-consumer-sasl-ssl branch 3 times, most recently from 9ed44a8 to 7f0946f Compare March 21, 2017 15:46
@seuf seuf force-pushed the input-kafka-consumer-sasl-ssl branch from 7f0946f to d928b8c Compare April 4, 2017 12:47
@seuf seuf force-pushed the input-kafka-consumer-sasl-ssl branch from d928b8c to 5cf8f1a Compare April 14, 2017 13:22
@seuf
Copy link
Contributor Author

seuf commented Apr 14, 2017

Hi, I've rebased the PR to add the commit 0193cbe

@JulienChampseix
Copy link

@seuf is there a way to test your new version of kafka inputs plugin ? nightly build ?

@seuf
Copy link
Contributor Author

seuf commented Apr 24, 2017

@JulienChampseix I've made a pre release on my github : https://github.com/seuf/telegraf/releases/tag/v1.3.0

@TinaRen
Copy link

TinaRen commented Apr 26, 2017

@seuf I'm able to connect kafka client with kafak broker using SSL, but I‘m failed to communicate telegraf using this kafka-consumer-plugin with the broker.
I have two questions,

  1. in my config of kafka client for ssl security, I use jks cert as below, followed the guide of http://docs.confluent.io/2.0.0/kafka/ssl.html,
    security.protocol=SSL
    ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
    ssl.truststore.password=test1234

but I see in this pull request, I need to specify a different config, are the 3 entries(ssl_ca,ssl_cert,ssl_key) all required? and is it possible for me to change to .jks cert instead in this ssl-kafka-consumer-plugin? Could u pls give me an example to test this plugin?

## Optional SSL Config
# ssl_ca = "/etc/telegraf/ca.pem"
# ssl_cert = "/etc/telegraf/cert.pem"
# ssl_key = "/etc/telegraf/key.pem"
## Use SSL but skip chain & host verification
# insecure_skip_verify = false

@seuf
Copy link
Contributor Author

seuf commented Apr 26, 2017

Hi @TinaRen
the JKS keystore is for Java clients.

The ssl_cert and ssl_key are only required if you want a bi-directional ssl connection. Othewise, just give the ssl_ca parameter with the path of the certificate autority .pem you have imported in your trustore.

For example :

[[inputs.kafka_consumer]]
  brokers = ["localhost:9093"]
  topics = ["telegraf"]
  ssl_ca = "/etc/ssl/certs/ca.pem"
  consumer_group = "telegraf_metrics_consumers"
  offset = "oldest"
  data_format = "influx"

@TinaRen
Copy link

TinaRen commented Apr 28, 2017

Hi @seuf, thanks for your reply.
I'm trying to test the plugin, but I downloaded the source code from pre-release and found your code about ssl is not there, could u pls take a look and made another pre-release? Thanks.
https://github.com/seuf/telegraf/releases/tag/v1.3.0

@ololduck
Copy link

Hey, @seuf, i'm currently testing this MR, and i am running into some error message with the following config:

  [[inputs.kafka_consumer]]
    topics = ["telegraf"]
    brokers = ["ip1:9092", "ip2:9092", ip3:9092]
    consumer_group = "telegraf_metrics_consumers"
    offset = "oldest"
    ssl_ca = "/opt/kafka/kafka.server.truststore.pem"
    ssl_cert = "/opt/kafka/kafka.cer.pem"
    ssl_key = "/opt/kafka/kafka.key.pem"
    insecure_skip_verify = true
    data_format = "influx"

And i am getting the following error:

Could not load TLS client key/certificate from /opt/kafka/kafka.key.pem:/opt/kafka/kafka.cer.pem: tls: failed to parse private key

However, /opt/kafka/kafka.{key,cer}.pem both look correct, and are unencrypted.

What i tried:

  • telnet ip{1,2,3}:9092 works
  • using only the ssl_ca option -> can't connect
  • not using insecure_skip_verify

I am currently in the dark place where you start to wonder if you get the right error message, or if there is a bug in a library.

If you see something obvious, could you point it to me?

Thanks for your work.

@xiaoduo
Copy link

xiaoduo commented May 2, 2017

Hi, @paulollivier I couldn't use this plugin to connet kafka, too.

But my error is different, it always report "kafka ran out of brokers", strangely, the output plugin using the similar library have the same error, too, I even tried directly coding with the Cluster library, seems I couldn't use it and don't know how to use it because my key file has password.

So I re-write this plugin with another library confluent-kafka-go, now it works fine for me, it's in my repositoriy, you could have a try if you also need the SSL connection as kafka consumer.

BTW, I'm using kafka 0.10

@seuf seuf force-pushed the input-kafka-consumer-sasl-ssl branch from 5cf8f1a to 35d1513 Compare May 2, 2017 11:51
@seuf
Copy link
Contributor Author

seuf commented May 2, 2017

Hi @paulollivier Are you sure your kafka is listening on the 9092 port for SSL ? 9092 is the default port for no ssl / no sasl.
I'me able to consume my kafka on 9094 over ssl + sasl with just the ssl_ca option + the sasl_username and the sasl_password. You can try to start telegraf with the --debug option to display more error messages.

@ololduck
Copy link

ololduck commented May 3, 2017

Hi @seuf.

Yeah, i have SSL://0.0.0.0:9092 defined in listeners.

What is event stranger is that my clients can connect, using the same pem files. All clients but telegraf.

I did not enable SASL.

@xiaoduo: i will have a look at your project.

@jackzampolin
Copy link
Contributor

jackzampolin commented May 25, 2017

I've tried this with the default brew install of kafka (0.10.2.0) and this branch of telegraf:

brew install kafka
brew services start kafka
brew services start zookeeper
telegraf -sample-config -input-filter cpu:mem:disk -output-filter kafka > producer.conf
telegraf -sample-config -input-filter kafka_consumer -output-filter influxdb > consumer.conf
influxd
telegraf -config producer.conf -debug
telegraf -config consumer.conf -debug

Works great!

@danielnelson
Copy link
Contributor

@paulollivier The Could not load TLS client key/certificate from log message comes from the shared TLS loader used across all of Telegraf. Since I haven't heard other issues with keys I think it must be caused by the private key you are using. The paths are essentially passed into https://golang.org/pkg/crypto/tls/#LoadX509KeyPair, I hope this helps with debugging.

@danielnelson
Copy link
Contributor

@seuf Now that 1.3 is out we can go forward with renaming the old version, include a note in the new README pointing 0.8 users to it. We should also add the new dependency to the list of dependency licenses.

@ololduck
Copy link

@danielnelson: triple-checked, my keys are in x509 pem format :/
i will try with a whole new arch/keys, thanks for the input.

@seuf
Copy link
Contributor Author

seuf commented May 29, 2017

@danielnelson OK, I have :

  • renamed the kafka_consumer from the master branch to kafka_consumer_legacy,
  • Added a message in the new README file to point 0.8 users to it,
  • Updated the list of dependencies,
  • Updated the changelog,
  • rebased + squashed over origin master

Don't know why, but the circle tests are failling when kafka_consumer_legacy produce a message to kafka.
kafka_consumer and kafka_consumer_legacy are using the same code for message producing in integration tests. Any Idea ?

@seuf seuf force-pushed the input-kafka-consumer-sasl-ssl branch 3 times, most recently from 06e63e6 to ded031c Compare May 29, 2017 09:33
@nfirvine
Copy link
Contributor

nfirvine commented May 30, 2017

Can we clarify the name of this bug to mention the new kafka consumer format? (To me at least, that's the more important change.)

(Also, thanks @seuf for your hard work :) )

@danielnelson
Copy link
Contributor

@seuf Thanks for doing this, code looks awesome. I'll try to figure out the unit tests tomorrow, can you come up with a better name for the issue and to use in the changelog though? It's okay if we list the item twice if needed.

@seuf seuf changed the title Added SSL and SASL support for input plugin kafka_consumer Add Kafka 0.9+ consumer support May 31, 2017
@seuf seuf force-pushed the input-kafka-consumer-sasl-ssl branch from ded031c to 7a6606b Compare May 31, 2017 09:52
@danielnelson
Copy link
Contributor

Didn't have enough time to test but maybe we need to add back the spotify/docker container for the legacy version?

Use wurstmeister/kafka docker images for input kafka_consumer tests
@seuf seuf force-pushed the input-kafka-consumer-sasl-ssl branch from 7a6606b to 1e7666a Compare June 1, 2017 09:48
@danielnelson
Copy link
Contributor

I tried to run both containers but for some reason I can't get it to work on circleci by just changing the ports, despite having no problem on my laptop. I'm going to add an unconditional skip to the legacy test after merging as I'm not sure how to get them both working at this time. Perhaps when we move to circleci 2.0 it will work again.

@danielnelson danielnelson merged commit 2092443 into influxdata:master Jun 8, 2017
jeichorn pushed a commit to jeichorn/telegraf that referenced this pull request Jul 24, 2017
maxunt pushed a commit that referenced this pull request Jun 26, 2018
@bigdatasunil
Copy link

I have Kerberos kafka cluster, would like use telegraf/kafka_consumer input plug in for consuming messages to influx db. I could not be able to find right config for this, there are lot of questions on this none of them are solving my problem.
Could anyone give me right configuration ??
i am using telegraf version 1.14.4
I also read a lot about sarama api which supports GSSAPI_Kerberos. My doubt is my current version of telegraf will support this sarama reafure or not?
if yes, what would the telegraf configuration for this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

10 participants