This repository has been archived by the owner on Dec 1, 2018. It is now read-only.

kafka sink #648

Closed
wants to merge 4 commits into from

Conversation

huangyuqi

Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.
The Kafka sink transforms monitoring metrics and events into Kafka messages published to two topics: timeseriestopic and eventstopic.

To use the Kafka sink, add the following flag:

```
--sink=kafka:<KAFKA_SERVER_URL>[?<OPTIONS>]
```

Normally, a Kafka deployment has multiple brokers, so a broker list needs to be configured for the producer.
We can therefore set KAFKA_SERVER_URL to a dummy value and provide the Kafka broker list in the URL's query string.
In addition, the following options need to be set in the query string:

  • timeseriestopic - Kafka's topic for timeseries
  • eventstopic - Kafka's topic for events

Like this:

```
--sink=kafka:http://kafka/?brokers=0.0.0.0:9092&brokers=0.0.0.0:9093&timeseriestopic=test&eventstopic=test
```
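
For illustration, here is a minimal Go sketch of how a sink URI of this shape could be parsed with the standard net/url package; the sinkOptions type and the function names are illustrative assumptions, not taken from this PR:

```go
// Sketch only: parse a kafka sink URI of the form shown above, assuming
// Go's standard net/url package. All names here are illustrative.
package main

import (
	"fmt"
	"net/url"
)

type sinkOptions struct {
	brokers         []string
	timeseriesTopic string
	eventsTopic     string
}

func parseSinkURI(raw string) (*sinkOptions, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return nil, fmt.Errorf("failed to parse sink URI: %s", err)
	}
	opts := u.Query() // url.Values is a map[string][]string
	if len(opts["brokers"]) < 1 {
		return nil, fmt.Errorf("no kafka brokers were assigned")
	}
	if len(opts["timeseriestopic"]) < 1 || len(opts["eventstopic"]) < 1 {
		return nil, fmt.Errorf("both timeseriestopic and eventstopic must be set")
	}
	return &sinkOptions{
		brokers:         opts["brokers"], // repeated keys collect into one slice
		timeseriesTopic: opts["timeseriestopic"][0],
		eventsTopic:     opts["eventstopic"][0],
	}, nil
}

func main() {
	o, err := parseSinkURI("http://kafka/?brokers=0.0.0.0:9092&brokers=0.0.0.0:9093&timeseriestopic=test&eventstopic=test")
	if err != nil {
		panic(err)
	}
	fmt.Println(o.brokers, o.timeseriesTopic, o.eventsTopic)
}
```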

@cadvisorJenkinsBot

Can one of the admins verify this patch?

@googlebot

We found a Contributor License Agreement for you (the sender of this pull request) and all commit authors, but as best as we can tell these commits were authored by someone else. If that's the case, please add them to this pull request and have them confirm that they're okay with these commits being contributed to Google. If we're mistaken and you did author these commits, just reply here to confirm.


@vishh
Contributor

vishh commented Oct 12, 2015

ok to test

@huangyuqi huangyuqi closed this Oct 13, 2015
@huangyuqi huangyuqi reopened this Oct 13, 2015
@googlebot

CLAs look good, thanks!


@vishh vishh closed this Oct 13, 2015
@vishh vishh reopened this Oct 13, 2015
@vishh vishh closed this Oct 15, 2015
@cadvisorJenkinsBot

Can one of the admins verify this patch?

@vishh vishh reopened this Oct 15, 2015
@vishh
Contributor

vishh commented Oct 15, 2015

ok to test


@huangyuqi
Author

Thanks, @vishh , would you please kindly review this PR? : )

@mvdan
Contributor

mvdan commented Oct 16, 2015

@huangyuqi if this PR is ready on your end, please remove WIP from the title

To use the kafka sink add the following flag:

```
--sink=kafka:<KAFKA_SERVER_URL>[?<OPTIONS>]
```
Contributor

Please indent via four spaces if it's just one line.

Author

@mvdan , thanks so much for your comments. The lines added for the Kafka sink were copied from the sinks above to match their format. Since these are not a single line, I will keep them as they are. : )

Contributor

Note that the format was changed weeks ago; the other sink configuration lines are now indented with four spaces.

Author

Got it, thanks so much. : )

@huangyuqi huangyuqi changed the title kafka sink (WIP) kafka sink Oct 20, 2015
@mvdan
Contributor

mvdan commented Oct 20, 2015

You applied some of the fixes by amending your original commit, i.e. rebasing the history. Please don't do that until the PR is ready, so that the history of changes is kept.

```go
return nil, fmt.Errorf("failed to parse url's query string: %s", err)
}

if len(opts["timeseriestopic"]) < 1 {
```
Contributor

This is really nitpicky, but usually people test whether a string is empty with if s == "".

Author

@mvdan , thanks for your attentive comments. Here, the query-string option is an array (the key can be repeated), so I test it with len(). Do you agree with me? ^_^
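
For context, a short illustration of this point, assuming Go's standard net/url package: url.Values is a map[string][]string, so the len() check covers both the missing-key and the repeated-key cases at once.

```go
package main

import (
	"fmt"
	"net/url"
)

func main() {
	opts, err := url.ParseQuery("brokers=0.0.0.0:9092&brokers=0.0.0.0:9093")
	if err != nil {
		panic(err)
	}
	// A repeated key collects into a slice of values.
	fmt.Println(len(opts["brokers"])) // 2
	// A missing key yields a nil slice, so len() is simply 0.
	fmt.Println(len(opts["timeseriestopic"])) // 0
}
```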

@mvdan
Contributor

mvdan commented Oct 21, 2015

Thanks for all the fixes! The code looks a lot better now. Someone with a better understanding of kafka can probably speak as to the quality of the implementation.

@huangyuqi
Author

Thanks @mvdan for your meticulous comments.

@mvdan
Contributor

mvdan commented Oct 22, 2015

Note that four comments are still standing - they will be hidden once fixed.

@huangyuqi
Author

@mvdan , thanks for your attentive comments. Regarding the four remaining comments:

  1. kafka sink #648 (diff): I have dealt with goimports. ^-^
  2–3. kafka sink #648 (diff), kafka sink #648 (diff): I have removed the corresponding test case, so those comments may not be hidden here. ^-^
  4. kafka sink #648 (diff): I think the query-string option is often an array, so I test it with len().

```go
if err != nil {
    return fmt.Errorf("failed to transform the items to json : %s", err)
}
message := &proto.Message{Value: []byte(string(jsonItems))}
```
Contributor

You are exposing an internal data structure, TimeSeries, over the wire here. If we update the data structure in the future, this will break all clients of this sink. That is not desirable.

What is the minimum amount of data that you need? Are there any standard formats for exposing metrics and events data through kafka?

Author

Thanks for your comments, @vishh .
I need all the ops information, including both metrics and events.
Exposing an internal data structure like TimeSeries over the wire is not well advised. I will pick out the relevant metrics and events information for the kafka sink, including the timestamps and values of metrics and events.

Contributor

SGTM. You can create a versioned structure in this sink.

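For illustration, a minimal sketch of what such versioned structures might look like; every type and field name below is an assumption for illustration, not taken from this PR:

```go
// Sketch only: versioned wire structures, decoupled from Heapster's
// internal TimeSeries type so the format can evolve without breaking
// consumers. All names here are illustrative assumptions.
package kafkasink

import "time"

// kafkaSinkPoint carries only the metric data a consumer needs; bumping
// Version lets the wire format change without breaking existing clients.
type kafkaSinkPoint struct {
	Version          string            `json:"version"`
	MetricsName      string            `json:"metrics_name"`
	MetricsValue     interface{}       `json:"metrics_value"`
	MetricsTags      map[string]string `json:"metrics_tags"`
	MetricsTimestamp time.Time         `json:"metrics_timestamp"`
}

// kafkaSinkEvent is the analogous versioned structure for events.
type kafkaSinkEvent struct {
	Version        string            `json:"version"`
	EventMessage   string            `json:"event_message"`
	EventTags      map[string]string `json:"event_tags"`
	EventTimestamp time.Time         `json:"event_timestamp"`
}
```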

Author

@vishh , sorry for the delay. I have added two internal data structures for the kafka sink. Would you please check again? Thanks ^_^

@vishh
Contributor

vishh commented Oct 24, 2015

Sorry for the delay @huangyuqi! Thanks for the PR! I have posted a couple of comments. Other than that, this PR LGTM. Let's get this in quickly.

@vishh
Contributor

vishh commented Oct 31, 2015

@huangyuqi: Is this PR ready for review? Have you addressed the pending comments?

@huangyuqi
Author

@vishh : this PR is ready; I have addressed all the comments. : )

@huangyuqi
Author

@vishh , would you please review the kafka sink PR for the modifications to the data structure and the Kafka client? : )
BTW, our OPS subsystem uses ES to collect metrics and events, so I have implemented a simple ES sink. May I open another PR for that?

@vishh
Contributor

vishh commented Nov 2, 2015

@vishh , would you please review the kafka sink PR for the modifications to the data structure and the Kafka client? : )

The part about sink setup isn't clear to me yet. Once you clarify the behavior there, this PR is good to go.

BTW, our OPS subsystem uses ES to collect metrics and events, so I have implemented a simple ES sink. May I open another PR for that?

SGTM! Just curious - does ES support retention policies?

@huangyuqi
Author

Thanks, @vishh :
sink setup

  • Kafka runs as a cluster of one or more servers, each of which is called a broker, so we need to pass the broker list to the sink via the brokers query-string parameter.
  • Kafka maintains feeds of messages in categories called topics, so the topics also need to be specified, via the timeseriestopic and eventstopic query-string parameters.
  • In the kafka sink, we use a client to connect to these brokers, with the configuration below:
```go
type BrokerConf struct {
    // Kafka client ID.
    ClientID string

    // LeaderRetryLimit limits the number of connection attempts to a single
    // node before failing. Use LeaderRetryWait to control the wait time
    // between retries.
    // Defaults to 10.
    LeaderRetryLimit int

    // LeaderRetryWait sets a limit to the waiting time when trying to connect
    // to a single node after failure.
    // Defaults to 500ms.
    // Timeout on a connection is controlled by the DialTimeout setting.
    LeaderRetryWait time.Duration

    // AllowTopicCreation enables a last-ditch "send produce request" which
    // happens if we do not know about a topic. This enables topic creation
    // if your Kafka cluster is configured to allow it.
    // Defaults to False.
    AllowTopicCreation bool

    // Any new connection dial timeout.
    // Default is 10 seconds.
    DialTimeout time.Duration

    // DialRetryLimit limits the number of connection attempts to every node in
    // cluster before failing. Use DialRetryWait to control the wait time
    // between retries.
    // Defaults to 10.
    DialRetryLimit int

    // DialRetryWait sets a limit to the waiting time when trying to establish
    // broker connection to single node to fetch cluster metadata.
    // Defaults to 500ms.
    DialRetryWait time.Duration
    ...
}
```

In this sink, I set all retry counts to 1 and the wait times to zero, which means that if the client cannot connect to a broker, it returns immediately.

  • Processes that publish messages to a Kafka topic are called producers, so we need to instantiate a producer based on the broker and use it to send messages to the Kafka server (see the sketch after this list).
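
A rough sketch of that broker and producer setup, assuming the github.com/optiopay/kafka client whose BrokerConf is quoted above; the function names, client ID, and error messages are assumptions, and the retry settings follow the fail-fast behavior described:

```go
// Sketch only: dial the broker list once, failing fast, then build a
// producer for publishing JSON-encoded messages.
package kafkasink

import (
	"encoding/json"
	"fmt"

	"github.com/optiopay/kafka"
	"github.com/optiopay/kafka/proto"
)

// newProducer connects to the brokers with all retry limits set to 1 and
// wait times set to zero, so an unreachable broker fails immediately.
func newProducer(brokers []string) (kafka.Producer, error) {
	conf := kafka.NewBrokerConf("heapster-kafka-sink") // client ID is an assumption
	conf.DialRetryLimit = 1
	conf.DialRetryWait = 0
	conf.LeaderRetryLimit = 1
	conf.LeaderRetryWait = 0
	broker, err := kafka.Dial(brokers, conf)
	if err != nil {
		return nil, fmt.Errorf("failed to connect to kafka cluster: %s", err)
	}
	return broker.Producer(kafka.NewProducerConf()), nil
}

// produceKafkaMessage marshals v to JSON and publishes it to the topic.
func produceKafkaMessage(p kafka.Producer, topic string, v interface{}) error {
	jsonItems, err := json.Marshal(v)
	if err != nil {
		return fmt.Errorf("failed to transform the items to json: %s", err)
	}
	_, err = p.Produce(topic, 0, &proto.Message{Value: jsonItems})
	return err
}
```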

Elasticsearch
Regarding retention policies: ES supports data persistence through its gateway module, which stores the cluster state and shard data. Currently, we use the gateway to save the data to the local file system.

@rvrignaud

@huangyuqi I would really be interested in seeing a PR for an ES sink. Is it for events only, or also for metrics? Would this fix #227?

@huangyuqi
Author

Hi @rvrignaud , Elasticsearch is a distributed full-text search engine; the ES sink is able to save both metrics and events, and we can use the REST API to search them.

@vishh
Contributor

vishh commented Nov 3, 2015

```
$ ./heapster --logtostderr --source=cadvisor:external?standalone=true --sink="kafka:?brokers=localhost:2181&timeseriestopic=test&eventstopic=test"
I1103 10:00:04.764864    2337 heapster.go:55] ./heapster --logtostderr --source=cadvisor:external?standalone=true --sink=kafka:?brokers=localhost:2181&timeseriestopic=test&eventstopic=test
I1103 10:00:04.764964    2337 heapster.go:56] Heapster version 0.18.0
I1103 10:00:04.765003    2337 external.go:88] Running in standalone mode, external nodes source will only use localhost
F1103 10:00:04.766070    2337 heapster.go:62] failed to connect to kafka cluster: cannot connect
```

This is the error that I was suggesting to avoid. I will post a patch against this PR to fix this.

@vishh vishh mentioned this pull request Nov 3, 2015
@vishh
Contributor

vishh commented Nov 3, 2015

@huangyuqi: I opened #683 based on this PR. I tested that against a local kafka cluster and it seems to be resilient against cluster outages. If you are OK with that, we can merge that in.

We need a simple integration test for this client.

@vishh
Contributor

vishh commented Nov 3, 2015

I'm closing this PR.

@vishh vishh closed this Nov 3, 2015
@huangyuqi
Author

@vishh , I think something is wrong with the sink's format: ^_^

yours: --sink=kafka:?brokers=localhost:2181&timeseriestopic=test&eventstopic=test
mine:  --sink=kafka:"http://?brokers=localhost:9092&timeseriestopic=test&eventstopic=test"

There are two issues; let me explain. : )

  • symbol "&"

"&" is used to support multiple sinks, like this:

--sink=influxdb:http://0.0.0.0:80&gcm:http://0.0.0.0:81

so the kafka sink's URL needs quotation marks to avoid confusion. Maybe we can change how the "&" symbol is used.

  • url format

The kafka sink gets the broker list from the URL's query string, so the sink's URL needs to follow the standard URL format.

So when I use --sink=kafka:"http://?brokers=localhost:9092&timeseriestopic=test&eventstopic=test", the sink works normally. Do you have a better suggestion?

@vishh
Contributor

vishh commented Nov 4, 2015

Enclosing the entire sink config inside " should do the trick. --sink="kafka:?brokers=localhost:9092" should work even if you have multiple parameters. Can you try that?

@huangyuqi
Author

```
./heapster --logtostderr --source=cadvisor:external?standalone=true --sink="kafka:?brokers=localhost:9092&timeseriestopic=test&eventstopic=test"
```

It works normally. Enclosing the entire sink config inside "" does the trick 👍 ^_^

@rvrignaud

@huangyuqi Hi. Would you mind opening a PR, or at least sending a gist, for the Elasticsearch sink you mentioned above? It would be very useful to us to get that feature! The issue is described here: #227.

Thanks !

@huangyuqi
Author

@rvrignaud , I am very sorry for the delay. I am working on this and will open a PR within the next two days. I have finished most of the sink code. ^_^

@huangyuqi
Author

Hi @rvrignaud , so sorry for the delay. I have finished the ES sink; would you please help me review it?
If you have any suggestions, please tell me. ^_^
#733

BTW, hi @vishh , I found that I cannot test any sink in standalone mode.

@rvrignaud

@huangyuqi : Hi. Thanks a lot for the PR. Unfortunately I'm not a Golang developer but I'll try to find some time to build and test it.

@vishh
Contributor

vishh commented Nov 30, 2015

@huangyuqi: Why are you not able to test sinks in standalone mode?

@huangyuqi huangyuqi deleted the hw-kafka-sink branch December 1, 2015 00:42