Consuming messages

Maciej Mensfeld edited this page Mar 23, 2018 · 13 revisions

Depending on your application and/or consumer group settings, Karafka's consumer can consume messages in two modes:

  • Batch messages consuming - allows you to use the #params_batch method, which contains an array of messages.
  • Single message consuming - allows you to use the #params method, which always contains a single message. You can think of this mode as the equivalent of standard HTTP request processing.

Which mode you decide to use strongly depends on your business logic.

Note: batch_consuming and batch_fetching aren't the same. Please visit the config section of this Wiki for an explanation.

Batch messages consuming

When batch consuming mode is enabled, a single #consume call receives a batch of messages from Kafka (always from a single partition of a single topic). You can access them using the #params_batch method:

class UsersConsumer < ApplicationConsumer
  def consume
    params_batch.each do |message|
      User.create!(message[:user])
    end
  end
end
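To make the "single partition of a single topic" guarantee concrete, here is a minimal plain-Ruby sketch, not Karafka's actual implementation. It assumes a hypothetical `Message` struct and a hypothetical `group_into_batches` helper that splits fetched messages by topic and partition, so that each resulting batch could be handed to one #consume call.

```ruby
# Hypothetical stand-in for a raw Kafka message (not a Karafka class)
Message = Struct.new(:topic, :partition, :offset, :payload)

# Hypothetical helper: split fetched messages so that every batch comes
# from exactly one partition of one topic, keeping offset order inside it.
def group_into_batches(messages)
  messages
    .group_by { |m| [m.topic, m.partition] }
    .transform_values { |batch| batch.sort_by(&:offset) }
end

fetched = [
  Message.new('users', 0, 11, '{"user":{"name":"a"}}'),
  Message.new('users', 1, 5, '{"user":{"name":"b"}}'),
  Message.new('users', 0, 10, '{"user":{"name":"c"}}')
]

group_into_batches(fetched).each do |(topic, partition), batch|
  # Each batch here would correspond to one #consume call in batch mode
  puts "#{topic}/#{partition}: offsets #{batch.map(&:offset).inspect}"
end
```

Ordering by offset within a partition mirrors the fact that Kafka only guarantees ordering per partition, which is why a batch is never mixed across partitions in this sketch.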

Keep in mind that params_batch is not a simple array. The messages inside are lazily parsed upon first use, so you shouldn't flush them directly into the DB. To do that, use the #parsed method of params_batch to parse all the messages:

class EventsConsumer < ApplicationConsumer
  def consume
    EventStore.store(params_batch.parsed)
  end
end

Parsing is also performed automatically if you map the parameters (or use any other Enumerable method):

class EventsConsumer < ApplicationConsumer
  def consume
    EventStore.store(params_batch.map { |param| param[:user] })
  end
end

It was implemented this way because there are cases in which, based on some external parameters, you may want to skip consuming a given batch. In that case, there is no point in parsing its messages in the first place. :)
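The lazy-parsing behavior described above can be modeled with a short plain-Ruby sketch. This is a simplified illustration, not Karafka's real params classes: `LazyParams`, `LazyParamsBatch`, `parse!` and `any_parsed?` are hypothetical names, and JSON with symbolized keys is assumed as the payload format. It shows that nothing is parsed until a message is used, that any Enumerable method (such as `map`) triggers parsing, and that `parsed` forces it for the whole batch.

```ruby
require 'json'

# Hypothetical, simplified model of lazily parsed params (not Karafka's code)
class LazyParams
  attr_reader :raw

  def initialize(raw)
    @raw = raw
    @parse_done = false
  end

  def parsed?
    @parse_done
  end

  # Parses the raw JSON payload only on first access, then memoizes it
  def parse!
    return @parsed if @parse_done

    @parsed = JSON.parse(raw, symbolize_names: true)
    @parse_done = true
    @parsed
  end

  def [](key)
    parse![key]
  end
end

# Hypothetical batch wrapper: including Enumerable means map, select, etc.
# all go through #each, which parses each message as it is yielded.
class LazyParamsBatch
  include Enumerable

  def initialize(raw_messages)
    @params = raw_messages.map { |raw| LazyParams.new(raw) }
  end

  def each
    @params.each { |message| yield message.tap(&:parse!) }
  end

  # Forces parsing of every message and returns the parsed payloads
  def parsed
    map(&:parse!)
  end

  def any_parsed?
    @params.any?(&:parsed?)
  end
end

batch = LazyParamsBatch.new(['{"user":{"name":"a"}}', '{"user":{"name":"b"}}'])
# Nothing is parsed until the payloads are actually used
puts batch.any_parsed?   # prints false
names = batch.map { |params| params[:user][:name] }
puts names.inspect       # prints ["a", "b"]
puts batch.any_parsed?   # prints true
```

If a consumer returns early (for example, after checking some external condition) before touching the batch, `JSON.parse` is never called in this model, which is the saving the paragraph above refers to.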

Single message consuming

In this mode, Karafka's consumer consumes messages separately, one after another. You can think of it as the equivalent of a typical HTTP request handler. Inside your #consume method, you can use the #params method, which contains a single Kafka message.

class UsersConsumer < ApplicationConsumer
  def consume
    puts params #=> { 'parsed' => true, 'topic' => 'example', 'partition' => 0, ... }
    User.create!(params[:user])
  end
end
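The difference between the two modes comes down to how many messages each #consume call sees. Below is a minimal plain-Ruby sketch of that mapping; `RecordingConsumer` and `dispatch` are hypothetical names (the `batch_consuming:` keyword is only named after the setting), and real Karafka consumer lifecycle details are not modeled.

```ruby
# Hypothetical consumer that records what each #consume call received
class RecordingConsumer
  attr_accessor :params, :params_batch
  attr_reader :received

  def initialize
    @received = []
  end

  def consume
    # In batch mode params_batch is set; in single mode only params is set
    @received << (params_batch || params)
  end
end

# Hypothetical dispatcher showing how each mode maps messages to #consume calls
def dispatch(consumer, messages, batch_consuming:)
  if batch_consuming
    consumer.params_batch = messages
    consumer.consume
  else
    messages.each do |message|
      consumer.params = message
      consumer.consume
    end
  end
end

messages = [{ user: 'a' }, { user: 'b' }, { user: 'c' }]

batch_consumer = RecordingConsumer.new
dispatch(batch_consumer, messages, batch_consuming: true)
puts batch_consumer.received.size   # prints 1 (one call with all three messages)

single_consumer = RecordingConsumer.new
dispatch(single_consumer, messages, batch_consuming: false)
puts single_consumer.received.size  # prints 3 (one call per message)
```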

Backends

Because Kafka messages can be processed in many different scenarios, Karafka supports backends that let you choose where your data is consumed:

  • :inline - default mode that consumes messages right after Karafka receives them
  • :sidekiq - mode in which Karafka schedules a Sidekiq job to consume the given message(s) in a background worker. To use the Sidekiq backend, follow the instructions in the README of the Karafka Sidekiq Backend repository.

You can set the backend in your config file (to make it the default) or per topic in your routing:

# Per app
class App < Karafka::App
  setup do |config|
    config.backend = :inline
  end
end

# Per topic
App.consumer_groups.draw do
  consumer_group :group_name do
    topic :example do
      consumer ExampleConsumer
      backend :sidekiq
    end

    topic :example2 do
      consumer Example2Consumer
      backend :inline
    end
  end
end
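Conceptually, the backend only decides whether consumption happens immediately or is deferred to a worker. The sketch below models that in plain Ruby; it is not the Karafka or Sidekiq API. `process`, `run_worker`, `JOB_QUEUE`, `CountingConsumer` and the `:queued` backend are all hypothetical, with an in-memory array standing in for Sidekiq's queue, and `consume` takes the messages as an argument only to keep the sketch short.

```ruby
# Records everything consumed so the effect of each backend is visible
CONSUMED = []

# Hypothetical consumer; simplified to receive messages as an argument
class CountingConsumer
  def consume(messages)
    CONSUMED.concat(messages)
  end
end

# In-memory stand-in for a background job queue such as Sidekiq's
JOB_QUEUE = []

# Hypothetical backend switch: :inline consumes now, :queued defers to a worker
def process(backend, consumer_class, messages)
  case backend
  when :inline
    consumer_class.new.consume(messages)
  when :queued
    # Enqueue the class name rather than an object, so the job is serializable
    JOB_QUEUE << [consumer_class.name, messages]
    :enqueued
  else
    raise ArgumentError, "unknown backend: #{backend.inspect}"
  end
end

# Simulated background worker draining the queue
def run_worker
  until JOB_QUEUE.empty?
    class_name, messages = JOB_QUEUE.shift
    Object.const_get(class_name).new.consume(messages)
  end
end

process(:inline, CountingConsumer, [{ user: 'a' }])
puts CONSUMED.size   # prints 1: consumed immediately
process(:queued, CountingConsumer, [{ user: 'b' }])
puts CONSUMED.size   # prints 1: only enqueued so far
run_worker
puts CONSUMED.size   # prints 2: the worker consumed the queued job
```

Passing a class name and plain data instead of live objects reflects the usual Sidekiq guidance that job arguments should be simple, JSON-serializable values.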