diff --git a/README.rdoc b/README.rdoc index 4fcdb0d..5c8893f 100644 --- a/README.rdoc +++ b/README.rdoc @@ -27,6 +27,7 @@ You need to install ZeroMQ libraries before installing this plugin pubkey ${tag}:${key1} bindaddr tcp://*:5556 flush_interval 1s + bulk_send true * 'pubkey' specifies the publish key to ZeroMQ. @@ -34,6 +35,7 @@ You need to install ZeroMQ libraries before installing this plugin * Actual record to be published is ' '. * Subscriber can subscribe by ''. * 'bindaddr' is the address to which ZeroMQ publisher socket to be bound. +* If 'bulk_send' is set to true, send multiple records with the same publish key in one 'send_string' method. This improves the performance. == Example usage @@ -87,6 +89,17 @@ Then you will get the following output from sample_sub.rb The nice thing is that once you put this plugin to your fluentd.conf and start fluentd, you can start and stop any subscriber programs without changing fluentd configuration. +== zmq_sub input plugin + +Input plugin to subscribe the output of zmq_pub is also included. Here is the example configuration. + + + type zmq_sub + publisher tcp://127.0.0.1:5556 + bulk_send true + + + == Copyright * Copyright (c) 2013- OGIBAYASHI Hironori (@angostura11) diff --git a/lib/fluent/plugin/in_zmq_sub.rb b/lib/fluent/plugin/in_zmq_sub.rb new file mode 100644 index 0000000..4f51c88 --- /dev/null +++ b/lib/fluent/plugin/in_zmq_sub.rb @@ -0,0 +1,87 @@ +module Fluent + + class ZmqSubInput < Fluent::Input + Fluent::Plugin.register_input('zmq_sub', self) + + config_param :subkey, :string, :default => "" + config_param :publisher, :string, :default => "tcp://127.0.0.1:5556" + config_param :bulk_send, :bool, :default => false + + attr_reader :subkeys + + def initialize + super + require 'ffi-rzmq' + end + + def configure(conf) + super + @subkeys = @subkey.split(",") + end + + def start + super + @context =ZMQ::Context.new() + @thread = Thread.new(&method(:run)) + end + + def shutdown + super + Thread.kill(@thread) + @thread.join + @context.terminate + end + + def run + begin + @subscriber = @context.socket(ZMQ::SUB) + @subscriber.connect(@publisher) + if @subkeys.size > 0 + @subkeys.each do |k| + @subscriber.setsockopt(ZMQ::SUBSCRIBE,k) + end + else + @subscriber.setsockopt(ZMQ::SUBSCRIBE,'') + end + loop do + msg = '' + while @subscriber.recv_string(msg,ZMQ::DONTWAIT) && msg.size > 0 + begin + (key, records) = msg.split(" ",2) + records = MessagePack.unpack(records) + if @bulk_send && records[0].class == Array + es = MultiEventStream.new + prev_tag = nil + records.each do |tag, time, record| + if prev_tag && prev_tag != tag + Engine.emit_stream(prev_tag, es) + es = MultiEventStream.new + end + es.add(time, record) + prev_tag = tag + end + Engine.emit_stream(prev_tag, es) if es.to_a.size > 0 + else + Engine.emit(*records) + end + rescue => e + log.warn "Error in processing message.",:error_class => e.class, :error => e + log.warn_backtrace + end + msg = '' + end + sleep(0.1) + end + rescue => e + log.error "error occurred while executing plugin.", :error_class => e.class, :error => e + log.warn_backtrace + ensure + if @subscriber + @subscriber.close + end + end + end + + end + +end diff --git a/test/helper.rb b/test/helper.rb index 075f22e..a1d92a1 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -23,6 +23,7 @@ def method_missing(method, *args) end require 'fluent/plugin/out_zmq_pub' +require 'fluent/plugin/in_zmq_sub' class Test::Unit::TestCase end diff --git a/test/plugin/test_in_zmq_sub.rb b/test/plugin/test_in_zmq_sub.rb new file mode 100644 index 0000000..c1aacbf --- /dev/null +++ b/test/plugin/test_in_zmq_sub.rb @@ -0,0 +1,97 @@ +require 'helper' +require 'ffi-rzmq' + +class ZmqSubIutputTest < Test::Unit::TestCase + def setup + Fluent::Test.setup + @context = ZMQ::Context.new() + @publisher = @context.socket(ZMQ::PUB) + @publisher.bind("tcp://*:5556") + end + + def teardown + @publisher.close + @context.terminate + end + + PUBLISHER = "tcp://127.0.0.1:5556" + CONFIG = %[ + publisher #{PUBLISHER} + subkey test1.,test2. + ] + + + def create_driver(conf=CONFIG) + Fluent::Test::InputTestDriver.new(Fluent::ZmqSubInput).configure(conf) + end + + def test_configure + d = create_driver(CONFIG + "bulk_send true") + assert_equal PUBLISHER, d.instance.publisher + assert_equal ["test1.","test2."], d.instance.subkeys + assert_equal true, d.instance.bulk_send + end + + def test_receive + d = create_driver + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + Fluent::Engine.now = time + + d.expect_emit "test1.aa", time, {"a"=>1} + d.expect_emit "test2.bb", time, {"a"=>2} + + d.run do + d.expected_emits.each {|tag,time,record| + send_record("dummy",time,record) # This record should not be received. + send_record(tag,time,record) + } + sleep 1 + end + end + + def test_no_subkey + d = create_driver("") + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + Fluent::Engine.now = time + + d.expect_emit "test1.aa", time, {"a"=>1} + d.expect_emit "test2.bb", time, {"a"=>2} + + d.run do + d.expected_emits.each {|tag,time,record| + send_record(tag,time,record) + } + sleep 1 + end + end + + + def test_receive_bulk + d = create_driver(CONFIG + "bulk_send true") + + time = Time.parse("2011-01-02 13:14:15 UTC").to_i + Fluent::Engine.now = time + + d.expect_emit "test3.aa", time, {"a"=>1} + d.expect_emit "test4.bb", time, {"a"=>2} + + d.run do + record_to_send = [] + d.expected_emits.each {|tag,time,record| + record_to_send << [tag,time,record] + } + send_record_bulk("test1.aa",record_to_send) + sleep 1 + end + end + + def send_record(tag,time,record) + @publisher.send_string(tag + " " + [tag,time,record].to_msgpack) + end + + def send_record_bulk(tag,records) + @publisher.send_string(tag + " " + records.to_msgpack) + end +end