
Karafka Sidekiq Backend


Karafka Sidekiq Backend provides support for consuming (processing) received Kafka messages inside of Sidekiq workers.


Add this to your Gemfile:

gem 'karafka-sidekiq-backend'

and create a file called application_worker.rb inside of your app/workers directory, that looks like this:

class ApplicationWorker < Karafka::BaseWorker
end

and you are ready to go. Karafka Sidekiq Backend integrates with Karafka automatically.

Note: You can name your application worker base class with any name you want. The only thing that is required is a direct inheritance from the Karafka::BaseWorker class.


If you want to process messages with the Sidekiq backend, you need to tell Karafka to use it.

To do so, you can either configure that in a configuration block:

class App < Karafka::App
  setup do |config|
    config.backend = :sidekiq
    # Other config options...
  end
end

or on a per topic level:

App.routes.draw do
  consumer_group :videos_consumer do
    topic :binary_video_details do
      consumer Videos::DetailsConsumer
      worker Workers::DetailsWorker
      interchanger Interchangers::MyCustomInterchanger
    end
  end
end

You don't need to do anything beyond that. Karafka will know that you want to run your consumer's #consume method in a background job.


There are two options you can set inside of the topic block:

| Option       | Value type | Description                                                                            |
|--------------|------------|----------------------------------------------------------------------------------------|
| worker       | Class      | Name of a worker class that we want to use to schedule the perform code                 |
| interchanger | Class      | Name of an interchanger class that we want to use to pass the incoming data to Sidekiq  |


Karafka by default will build a worker that corresponds to each of your consumers (so you will have a pair: a consumer and a worker). All of them will inherit from ApplicationWorker and will share all its settings.

To run Sidekiq, you should have a sidekiq.yml file in the config folder. An example sidekiq.yml file will be generated to config/sidekiq.yml.example once you run bundle exec karafka install.
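A minimal config/sidekiq.yml might look like this (values are illustrative; adjust them to your workload):

```yaml
# config/sidekiq.yml
:concurrency: 5
:queues:
  - default
```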

However, if you want to use a raw Sidekiq worker (without any additional Karafka magic), or you want to use Sidekiq Pro (or any other queuing engine with the same API as Sidekiq), you can assign your own custom worker:

topic :incoming_messages do
  consumer MessagesConsumer
  worker MyCustomWorker
end

Note that even then, you need to specify a consumer that will schedule a background task.

Custom workers need to provide a #perform_async method. It needs to accept two arguments:

  • topic_id - first argument is a current topic id from which a given message comes
  • params_batch - all the params that came from Kafka + additional metadata. This data format might be changed if you use custom interchangers. Otherwise, it will be an instance of Karafka::Params::ParamsBatch.
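The contract above can be sketched as follows. This is a hypothetical stand-in worker (the class name and recording behavior are not part of Karafka); the only requirement it satisfies is exposing a .perform_async class method with that two-argument signature:

```ruby
# A minimal sketch of the custom-worker contract. Karafka calls
# YourWorker.perform_async(topic_id, params_batch), so any object that
# responds to that class method can act as a worker.
class RecordingWorker
  def self.perform_async(topic_id, params_batch)
    # A real worker would enqueue a Sidekiq job here; this sketch only
    # records what it was asked to schedule.
    enqueued << { topic: topic_id, batch: params_batch }
  end

  def self.enqueued
    @enqueued ||= []
  end
end

RecordingWorker.perform_async('incoming_messages', [{ 'id' => 1 }])
RecordingWorker.enqueued.first[:topic] # => "incoming_messages"
```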

Note: If you use custom interchangers, keep in mind that params inside the params batch might be in two states when passed to #perform_async: parsed or unparsed. This means that if you use custom interchangers and/or custom workers, you might want to look into Karafka's sources to see exactly how it works.


Custom interchangers target issues with non-standard (binary, etc.) data that we want to store when we do #perform_async. This data might be corrupted when fetched in a worker (see this issue). With custom interchangers, you can encode/compress data before it is passed to scheduling and decode/decompress it when it reaches the worker.

To specify the interchanger for a topic, specify the interchanger inside routes like this:

App.routes.draw do
  consumer_group :videos_consumer do
    topic :binary_video_details do
      consumer Videos::DetailsConsumer
      interchanger Interchangers::MyCustomInterchanger
    end
  end
end

Each custom interchanger should define #encode to encode params before they get stored in Redis, and #decode to convert them back to a hash format, as shown below:

require 'base64'

class Base64Interchanger
  class << self
    def encode(params_batch)
      # Note that you need to cast the params_batch to an array
      # in order to get it to work in Sidekiq later
      Base64.encode64(Marshal.dump(params_batch.to_a))
    end

    def decode(params_string)
      Marshal.load(Base64.decode64(params_string))
    end
  end
end

Warning: if you decide to use slow interchangers, they might significantly slow down Karafka.
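As an alternative sketch, here is a self-contained interchanger that round-trips a batch through JSON and Base64. The class name is hypothetical, and it assumes the batch contains only JSON-safe data (plain hashes, arrays, strings, numbers), since Sidekiq ultimately serializes job arguments to JSON anyway:

```ruby
require 'base64'
require 'json'

# A round-trip safe interchanger sketch, assuming the params batch can be
# represented as plain JSON-friendly structures.
class JsonBase64Interchanger
  class << self
    def encode(params_batch)
      # Cast to an array first, then serialize and Base64-encode so the
      # payload survives storage as a plain string
      Base64.strict_encode64(JSON.generate(params_batch.to_a))
    end

    def decode(params_string)
      JSON.parse(Base64.strict_decode64(params_string))
    end
  end
end

batch = [{ 'video_id' => 42, 'payload' => 'raw bytes here' }]
JsonBase64Interchanger.decode(JsonBase64Interchanger.encode(batch)) == batch # => true
```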


Note on contributions

First, thank you for considering contributing to Karafka! It's people like you that make the open source community great!

Each pull request must pass all the RSpec specs and meet our quality requirements.

To check if everything is as it should be, we use Coditsu that combines multiple linters and code analyzers for both code and documentation. Once you're done with your changes, submit a pull request.

Coditsu will automatically check your work against our quality standards. You can find your commit check results on the builds page of Karafka Sidekiq Backend repository.

