Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a new configuration value to run multiple consumers in separate threads. #160

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

Adithya-copart
Copy link

This is a draft PR to try multiple consumers using threads.

  1. Support multiple consumers using threads when threaded option is passed.
  2. Shutdown all the consumers in different threads.

This is tested using rdkafka-ruby:master branch that supports JRuby.

## test_multiple.rb

Racecar.configure do |config|
  # Each config variable can be set using a writer attribute.
  config.brokers = ['localhost:9092']
end

class TestConsumer1 < Racecar::Consumer
  subscribes_to "topic1"
  group_id = 'test1'

  def process(message)
    p "#{self.class} #{message.value} #{message.offset}"
  end
end

class TestConsumer2 < Racecar::Consumer
  subscribes_to "topic2"
  group_id = 'test2'

  def process(message)
    p "#{self.class} #{message.value} #{message.offset}"
 end
end

Command:

bundle exec racecar --require ./test_multiple.rb --threaded TestConsumer1 TestConsumer2

Result:

$ bundle exec racecar --require ./test_multiple.rb --threaded TestConsumer1 TestConsumer2
=> Starting Racecar consumer TestConsumer1, TestConsumer2...
=> Wrooooom!
=> Ctrl-C to shutdown consumer
Starting consumer TestConsumer1
The signal QUIT is in use by the JVM and will not work correctly on this platform
The signal USR1 is in use by the JVM and will not work correctly on this platform
Starting consumer TestConsumer2
W, [2020-03-10T16:35:13.388905 #372]  WARN -- : ActiveSupport::Notifications not available, instrumentation is disabled
W, [2020-03-10T16:35:13.389168 #372]  WARN -- : ActiveSupport::Notifications not available, instrumentation is disabled
^CI, [2020-03-10T16:35:15.001110 #372]  INFO -- : Gracefully shutting down TestConsumer2
I, [2020-03-10T16:35:15.001110 #372]  INFO -- : Gracefully shutting down TestConsumer1
E, [2020-03-10T16:35:15.006739 #372] ERROR -- : rdkafka: [thrd:GroupCoordinator]: 1/1 brokers are down
I, [2020-03-10T16:35:15.006740 #372]  INFO -- : Graceful shutdown of TestConsumer1 completed
I, [2020-03-10T16:35:15.007616 #372]  INFO -- : Graceful shutdown of TestConsumer2 completed

@dasch
Copy link
Collaborator

dasch commented Sep 15, 2020

@Adithya-copart would you be interested in restarting this work?

@dasch
Copy link
Collaborator

dasch commented Sep 15, 2020

Specifically: scoping out support for running different consumers in a single process, but instead having a single consumer process its assigned topic partitions concurrently.

@adi-pen
Copy link

adi-pen commented Apr 13, 2021

Sorry for the super late reply but I'm no longer using the handle @Adithya-copart.
I'm not using JRuby or racecar anymore and will not be able to do this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants