From 0cd872be39e019e7cd4b0d5a9607df323edd43a2 Mon Sep 17 00:00:00 2001 From: Elias Levy Date: Wed, 22 Jun 2016 18:49:43 -0700 Subject: [PATCH] KafkaProducer: Publishing messages with timestamps. (#52) * KafkaProducer: Publishing messages with timestamps. Allow passing in a timestamp for messages at publishing time. * Fix broken test. --- lib/jruby-kafka/kafka-producer.rb | 20 ++++++++++++++---- test/test_kafka-producer.rb | 35 ++++++++++++++++++++++++++++++- test/util/kafka-producer.rb | 10 +++++++++ 3 files changed, 60 insertions(+), 5 deletions(-) diff --git a/lib/jruby-kafka/kafka-producer.rb b/lib/jruby-kafka/kafka-producer.rb index 2240542..a25b87a 100644 --- a/lib/jruby-kafka/kafka-producer.rb +++ b/lib/jruby-kafka/kafka-producer.rb @@ -40,12 +40,24 @@ def initialize(opts = {}) java_alias :send_method , :send, [ProducerRecord] java_alias :send_cb_method, :send, [ProducerRecord, Callback.java_class] - # throws FailedToSendMessageException or if not connected, StandardError. - def send_msg(topic, partition, key, value, &block) + # Send a message to the cluster. + # + # @param [String] topic The topic to send the message to. + # @param [Integer,nil] partition The topic partition to send the message to, or nil to allow + # the configured partitioner class to select the partition. + # @param [String,nil] key The message key, if there is one. Otherwise, nil. + # @param [String] value The message value. + # @param [Integer,nil] timestamp The message timestamp in milliseconds. If nil, the + # producer will assign it the current time. + # + # @raise [FailedToSendMessageException] if it can't send the message + def send_msg(topic, partition, key, value, timestamp=nil, &block) + record = ProducerRecord.new(topic, partition, timestamp, key, value) + if block - send_cb_method ProducerRecord.new(topic, partition, key, value), RubyCallback.new(block) + send_cb_method record, RubyCallback.new(block) else - send_method ProducerRecord.new(topic, partition, key, value) + send_method record end end end diff --git a/test/test_kafka-producer.rb b/test/test_kafka-producer.rb index 9da4f14..7e4379b 100644 --- a/test/test_kafka-producer.rb +++ b/test/test_kafka-producer.rb @@ -19,7 +19,6 @@ def test_01_send_message assert(future.isDone(), 'expected message to be done') assert(future.get().topic(), topic) assert_equal(future.get().partition(), 0) - end def test_02_send_msg_with_cb @@ -63,4 +62,38 @@ def test_03_get_sent_msg end assert(found.include?('test message'), 'expected to find message: test message') end + + def test_04_send_message_with_ts + topic = 'test_send' + future = send_kafka_producer_msg_ts topic, (Time.now.to_i * 1000) + assert_not_nil(future) + begin + timeout(30) do + until future.isDone() do + next + end + end + end + assert(future.isDone(), 'expected message to be done') + assert(future.get().topic(), topic) + assert_equal(future.get().partition(), 0) + end + + def test_05_send_msg_with_ts_and_cb + metadata = exception = nil + future = send_kafka_producer_msg_ts_cb(Time.now.to_i * 1000) { |md,e| metadata = md; exception = e } + assert_not_nil(future) + begin + timeout(30) do + while metadata.nil? && exception.nil? do + next + end + end + end + assert_not_nil(metadata) + assert_instance_of(Java::OrgApacheKafkaClientsProducer::RecordMetadata, metadata) + assert_nil(exception) + assert(future.isDone(), 'expected message to be done') + end + end diff --git a/test/util/kafka-producer.rb b/test/util/kafka-producer.rb index 45177dd..39215a2 100644 --- a/test/util/kafka-producer.rb +++ b/test/util/kafka-producer.rb @@ -15,3 +15,13 @@ def send_kafka_producer_msg_cb(&block) producer = Kafka::KafkaProducer.new(KAFKA_PRODUCER_OPTIONS) producer.send_msg('test',nil, nil, 'test message', &block) end + +def send_kafka_producer_msg_ts(topic, timestamp) + producer = Kafka::KafkaProducer.new(KAFKA_PRODUCER_OPTIONS) + producer.send_msg(topic,nil, nil, 'test message', timestamp) +end + +def send_kafka_producer_msg_ts_cb(timestamp, &block) + producer = Kafka::KafkaProducer.new(KAFKA_PRODUCER_OPTIONS) + producer.send_msg('test',nil, nil, 'test message', timestamp, &block) +end