
Question : How can I consume my kafka topic as fast as possible ? #37

Open

pocman opened this issue May 20, 2015 · 22 comments

@pocman

pocman commented May 20, 2015

Hi,

I'm using Kafka between Ruby and Scala, and I noticed that my Ruby HermannConsumer is consuming messages faster than my AkkaConsumer.

I can set commitConfig = CommitConfig(commitInterval = Some(10 millis)) in kafkaProps, but I'm not sure that it's the right thing to do.

How can I consume my Kafka topic as fast as possible?

@pocman
Author

pocman commented May 21, 2015

In other words, when producing a constant flow of messages to my topic, my receiver only receives messages about once per second.
How can I increase this rate?

@coreyauger
Contributor

I am having a similar issue. Did you manage to find a solution to yours? Thanks :)

@sclasen
Owner

sclasen commented Feb 3, 2016

When you commit every 10 milliseconds, you are writing to ZooKeeper every 10 milliseconds. Your commit interval should be much, much larger. Try 10 seconds?
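For reference, a sketch of a more relaxed commit configuration using this library's CommitConfig (the exact field values shown are suggestions, not prescribed defaults):

```scala
import scala.concurrent.duration._
import akka.util.Timeout
import com.sclasen.akka.kafka.CommitConfig

// Commit at most every 10 seconds, or after 10000 in-flight messages,
// whichever comes first -- far fewer ZooKeeper writes than every 10 millis.
val commitConfig = CommitConfig(
  commitInterval = Some(10.seconds),
  commitAfterMsgCount = Some(10000),
  commitTimeout = Timeout(5.seconds)
)
```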

@sclasen
Owner

sclasen commented Feb 3, 2016

And @pocman, sorry for the huge lag; Gmail started putting my GitHub notifications in spam.

@coreyauger
Contributor

Thanks for the reply @sclasen. I see about a 1 to 3 second delay before a message reaches my receiver actor.

val props = AkkaConsumerProps.forSystemWithFilter(
      system = actorSystem,
      zkConnect = config.zookeeperHosts,
      topicFilter = filter,
      group = subscriberSet.map(_.name) getOrElse UUID.randomUUID().toString,
      streams = config.numStreams,
      keyDecoder = new StringDecoder(),
      msgDecoder = new KafkaJsonDecoder[T](mapper),
      receiver = actorSystem.actorOf(Props(classOf[ConsumerActor[T]], consumer))
    )

    val akkaConsumer = new AkkaConsumer(props)
    akkaConsumer.start()
    consumerRegistry.put(topicPattern.pattern, akkaConsumer)

And then the ConsumerActor

class ConsumerActor[T](consumer: Consumer[T]) extends Actor with Logging {
  def receive = {
    case event: PublishedEvent[T] =>
      logger.trace(s"Received Event [$event]")
      try {
        consumer(event)
        logger.trace(s"Event Processing complete for [$event]")
      } catch {
        case ex: Exception =>
          logger.error(s"Error Processing Event(id=${event.eventId})", ex)
      }
      // Ack to the stream FSM even on failure, so the stream is not stalled.
      sender ! StreamFSM.Processed
  }
}

Any idea where that would be occurring? Thanks for your work :)

@sclasen
Owner

sclasen commented Feb 3, 2016

@coreyauger can you set the log level to DEBUG and send me the output?

@coreyauger
Contributor

Hi @sclasen .. thanks again for your help in this.
I have put a small dump of the log file here
https://gist.github.com/coreyauger/0ce0b743c4e987218238

The log explodes fast as akka is logging "heartbeats" for the cluster and other things that turn my local log file into a mess. I hope there is something in there that helps.

There are a lot of messages similar to this:
stream=stream1 state=Unused msg=Drain outstanding=0

I suspect that this is more what you are looking for?

Let me know if I can provide anything else :)

@sclasen
Owner

sclasen commented Feb 3, 2016

This is helpful. Things appear to be working as expected to me.

When you see things like this:

[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-27] [akka.tcp://core@127.0.0.1:44691/user/$B/stream3] stream=stream3 at=transition from=Processing to=Unused
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-25] [akka.tcp://core@127.0.0.1:44691/user/$B/stream0] stream=stream0 at=transition from=Processing to=Unused
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-30] [akka.tcp://core@127.0.0.1:44691/user/$B/stream1] stream=stream1 at=transition from=Processing to=Unused
[DEBUG] [02/03/2016 13:56:27.251] [core-akka.actor.default-dispatcher-19] [akka.tcp://core@127.0.0.1:44691/user/$B/stream2] stream=stream2 at=transition from=Processing to=Unused

it means that the underlying Kafka consumer iterator's hasNext() is returning false, meaning there are no messages to consume. When this happens the stream goes into 'Unused' and will not attempt to consume again until the next commit.

What is your use case here? akka-kafka is designed more for high throughput while still giving some processing/commit guarantees. Commit is a blocking operation: when a commit happens, we wait until all in-flight messages are processed. Latencies for low-throughput topics are not expected to be very low.

@sclasen
Owner

sclasen commented Feb 3, 2016

related #23

@coreyauger
Contributor

There are a few use cases, but one of them is IM (chat) based messaging, so low latency is certainly required for this.

In this case, am I better off adapting a new kind of consumer, or are you saying that I should not use Kafka for this?

I found this
http://stackoverflow.com/questions/20520492/how-to-minimize-the-latency-involved-in-kafka-messaging-framework

which led me to believe that newer versions of Kafka can achieve this low latency.

Thanks again :)

@sclasen
Owner

sclasen commented Feb 3, 2016

Well, I'm not sure about Kafka being suitable for low latency. Quoting the highest-ranked answer at the link provided: "I wouldn't use Kafka for anything where latency needed to be consistently less than 1-2 seconds."

I suppose it depends on how you design and tune things, but generally I don't think akka-kafka is a good fit for a low-latency, low-throughput chat application.

How do the topics and partitions work? Is there a consumer for each user?

I think you will have a much better chance of success for this particular use case if you just use the Kafka consumer directly (this library uses it under the covers) with auto commit enabled: it will keep polling on the iterator during commits, and it has no notion of an unused state. You may lose the odd message during hard crashes, but that seems totally fine for a chat app imo.
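To make that concrete, here is a minimal sketch of driving the 0.8-era high-level consumer directly with auto commit, roughly what this library wraps. The ZooKeeper address, group id, and topic pattern are placeholders for this example:

```scala
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
import kafka.serializer.StringDecoder

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")   // placeholder
props.put("group.id", "chat-consumer")             // placeholder
props.put("auto.commit.enable", "true")            // periodic offset commits, no blocking drain
props.put("auto.commit.interval.ms", "1000")

val connector = Consumer.create(new ConsumerConfig(props))
val streams = connector.createMessageStreamsByFilter(
  new Whitelist("chat-.*"), 1, new StringDecoder(), new StringDecoder())

// With consumer.timeout.ms at its default of -1, iteration blocks
// until the next message arrives, so latency is bounded by fetch, not commit.
for (msg <- streams.head) {
  println(s"key=${msg.key()} value=${msg.message()}")
}
```

With auto commit, offsets may lag what you have actually processed, hence the possibility of a duplicate or lost message around a hard crash.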

@coreyauger
Contributor

I will take a look at this. Would there be any interest if I submitted a PR for another type of low-latency consumer for this lib?

Or is this use case best left for my own lib?

Let me know either way. Your help is much appreciated :) Thanks again!

@sclasen
Owner

sclasen commented Feb 3, 2016

Sure, happy to take a look at a PR, please do!


@coreyauger
Contributor

Hi @sclasen ,

I have built an initial actor that is working well for my use case. It is modelled on the Kafka high-level consumer.
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

Here is the code I have so far.
https://github.com/coreyauger/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/AkkaHighLevelConsumer.scala

Please take a look, and if it is something you think we should have, I will clean it up and add it to the documentation. I will also branch the code and fold the commits into a single commit before I open a PR.

Thanks :)

@sclasen
Owner

sclasen commented Feb 7, 2016

Thanks! I think you can get rid of the hasNext stuff: since it is only used to avoid blocking, you can just loop on calling next(), which will block until messages arrive, and that is fine in this usage.

Otherwise I fear the code will spin on hasNext and burn CPU.
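A sketch of the blocking loop being suggested, against the 0.8 ConsumerIterator. The shuttingDown flag and handle callback are hypothetical names for this example; the ConsumerTimeoutException branch only fires if consumer.timeout.ms is set to a positive value:

```scala
import kafka.consumer.{ConsumerIterator, ConsumerTimeoutException}

@volatile var shuttingDown = false  // hypothetical shutdown signal

def consumeLoop[K, V](it: ConsumerIterator[K, V])(handle: (K, V) => Unit): Unit = {
  while (!shuttingDown) {
    try {
      // next() blocks until a message arrives when consumer.timeout.ms is -1,
      // so there is no hasNext() spin and no wasted CPU.
      val msg = it.next()
      handle(msg.key(), msg.message())
    } catch {
      case _: ConsumerTimeoutException =>
        // No message within consumer.timeout.ms; loop to re-check the shutdown flag.
    }
  }
}
```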


@coreyauger
Contributor

That was indeed my original implementation, but I kept getting the timeout exception. There is most likely a setting I can pass in for an infinite wait; I will have a look for that. :)

@sclasen
Owner

sclasen commented Feb 8, 2016

Yeah, you will still have to handle the timeout exception in the next() call. Which is probably good: you can then check whether the consumer has been shut down and exit.


@coreyauger
Contributor

Adding this as a reference:
http://mail-archives.apache.org/mod_mbox/kafka-users/201408.mbox/%3CCACim9RkJ6+6jJKzP3UQKQPbsOUsboifBoySpOYPm=2RPp9XUcw@mail.gmail.com%3E

> If you want to restart the consumer in handling the timeout exception, then you should probably just increase the timeout value in the configs to avoid it throwing the timeout exception.

Config:

consumer.timeout.ms (default -1): by default the consumer blocks indefinitely if no new message is available for consumption. Setting a positive value makes the consumer throw a timeout exception if no message is available within that many milliseconds.

The blocking in my case will happen on numStreams threads for each topic.
http://doc.akka.io/docs/akka/2.2.3/general/actor-systems.html#Blocking_Needs_Careful_Management
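Per the Akka docs linked above, blocking threads like these are best isolated on a dedicated dispatcher so they cannot starve the actor system's default dispatcher. A minimal HOCON sketch (the dispatcher name and pool size are illustrative assumptions):

```
# application.conf: dedicated pool for blocking Kafka stream threads
kafka-blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 8   # size at least numStreams per consumed topic
  }
  throughput = 1
}
```

Actors doing the blocking next() calls would then be deployed with `.withDispatcher("kafka-blocking-dispatcher")`.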

@coreyauger
Contributor

Since a timeout and auto commit are required for the high-level consumer to work, would it be best to append these settings when the actor creates the config? That way we can leave the settings intact for the other types of actors.

I plan to use the system with services that run both kinds of actors, with different goals.

@sclasen
Owner

sclasen commented Feb 9, 2016

Want to open a PR? Easier to review/discuss.

On Mon, Feb 8, 2016 at 1:16 PM, Corey Auger wrote:

> Updated
> https://github.com/coreyauger/akka-kafka/blob/master/src/main/scala/com/sclasen/akka/kafka/AkkaHighLevelConsumer.scala

@coreyauger
Contributor

Sounds good :)
