-
Notifications
You must be signed in to change notification settings - Fork 336
Closed
Description
- Version of Ruby: 2.3.1
- Version of Kafka: 2.12-0.10.2.2
- Version of ruby-kafka: 0.7.6 and master
I'm new to Kafka, so if I'm doing something wrong, please let me know!
Steps to reproduce
I'm basically trying to stream data into Druid (druid.io) through kafka, using the versions they recommended in their quickstart tutorial. I have verified that I set up the tutorial correctly because their example works. Dumping my json output into a json file and using their producer script also works.
Here's the code I used
# config/initializers/kafka_producer.rb
require "kafka"
# Configure the Kafka client with the broker hosts and the Rails
# logger.
$kafka = Kafka.new(Rubybase.envar.KAFKA_BROKERS.split(','), logger: Rails.logger, client_id: 'dojo')
# Set up an asynchronous producer that delivers its buffered messages
# every ten seconds:
$kafka_producer = $kafka.async_producer(
delivery_interval: 10,
)
# Make sure to shut down the producer when exiting.
at_exit { $kafka_producer.shutdown }
$kafka_producer.produce(JSON.dump(fact), topic: 'publisher_activity')
result of JSON.dump(fact)
{"datetime":"2019-05-02T22:48:20-07:00","program_id":1,"program_name":"AcmeAcmeTimes","user_id":null,"user_first_name":"Admin","user_last_name":"McAdminson","user_role":"super_admin","content_id":761,"content_type":"article","content_title":"A great article.","content_title_length":16,"content_summary":"Listen to me!","content_summary_length":13,"content_body_length":207,"content_source_type":"admin_created","content_featured_at":null,"content_expired_at":null,"content_language":null,"content_published_at":"2019-05-02T23:26:53.539Z","content_publication_state":"published","content_posted_at":null,"content_is_shareable":false,"content_is_video":false,"content_video_duration_in_seconds":null,"content_video_filename":null,"content_is_promoted":null,"content_promotion_type":null,"content_is_submitted":false,"content_template_id":14,"content_template_name":"Modern Overlay","content_is_commentable":null,"content_estimated_reach":null,"content_channels_count":4,"publisher_user_id":1,"publisher_name":"Admin McAdminson"}
Expected outcome
Message should be accepted/consumed.
Actual outcome
Ruby console:
Failed to send messages to publisher_activity/0
/Users/ericchan/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/ruby-kafka-0.7.6/lib/kafka/producer.rb:414:in `deliver_messages_with_retries'
/Users/ericchan/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/ruby-kafka-0.7.6/lib/kafka/producer.rb:251:in `block in deliver_messages'
/Users/ericchan/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/activesupport-4.2.7.1/lib/active_support/notifications.rb:166:in `instrument'
/Users/ericchan/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/ruby-kafka-0.7.6/lib/kafka/instrumenter.rb:21:in `instrument'
/Users/ericchan/.rbenv/versions/2.3.1/lib/ruby/gems/2.3.0/gems/ruby-kafka-0.7.6/lib/kafka/producer.rb:244:in `deliver_messages'
Kafka console:
[2019-05-03 10:25:09,549] ERROR Closing socket for 192.168.1.124:9092-192.168.1.124:51240 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: 0 and apiVersion: 3
Caused by: java.lang.IllegalArgumentException: Invalid version for API key 0: 3
at org.apache.kafka.common.protocol.ProtoUtils.schemaFor(ProtoUtils.java:31)
at org.apache.kafka.common.protocol.ProtoUtils.requestSchema(ProtoUtils.java:50)
at org.apache.kafka.common.protocol.ProtoUtils.parseRequest(ProtoUtils.java:66)
at org.apache.kafka.common.requests.ProduceRequest.parse(ProduceRequest.java:210)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:83)
at kafka.network.RequestChannel$Request.liftedTree2$1(RequestChannel.scala:94)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:89)
at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:515)
at kafka.network.Processor.$anonfun$processCompletedReceives$1$adapted(SocketServer.scala:505)
at scala.collection.Iterator.foreach(Iterator.scala:929)
at scala.collection.Iterator.foreach$(Iterator.scala:929)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1406)
at scala.collection.IterableLike.foreach(IterableLike.scala:71)
at scala.collection.IterableLike.foreach$(IterableLike.scala:70)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.processCompletedReceives(SocketServer.scala:505)
at kafka.network.Processor.run(SocketServer.scala:433)
at java.lang.Thread.run(Thread.java:748)
Metadata
Metadata
Assignees
Labels
No labels