Skip to content

Commit

Permalink
Added zmq_sub input plugin which receive records published by out_zmq…
Browse files Browse the repository at this point in the history
…_pub.
  • Loading branch information
Hironori Ogibayashi committed Sep 27, 2014
1 parent a350e41 commit 68b9960
Show file tree
Hide file tree
Showing 4 changed files with 198 additions and 0 deletions.
13 changes: 13 additions & 0 deletions README.rdoc
Expand Up @@ -27,13 +27,15 @@ You need to install ZeroMQ libraries before installing this plugin
pubkey ${tag}:${key1}
bindaddr tcp://*:5556
flush_interval 1s
bulk_send true
</match>

* 'pubkey' specifies the publish key to ZeroMQ.
* '${tag}' is replace by fluentd tag. '${name}' is replaced by fluentd record['name'].
* Actual record to be published is '<pubkey> <reocord.to_msgpack>'.
* Subscriber can subscribe by '<pubkey>'.
* 'bindaddr' is the address to which ZeroMQ publisher socket to be bound.
* If 'bulk_send' is set to true, send multiple records with the same publish key in one 'send_string' method. This improves the performance.

== Example usage

Expand Down Expand Up @@ -87,6 +89,17 @@ Then you will get the following output from sample_sub.rb

The nice thing is that once you put this plugin to your fluentd.conf and start fluentd, you can start and stop any subscriber programs without changing fluentd configuration.

== zmq_sub input plugin

Input plugin to subscribe the output of zmq_pub is also included. Here is the example configuration.

<source>
type zmq_sub
publisher tcp://127.0.0.1:5556
bulk_send true
</source>


== Copyright

* Copyright (c) 2013- OGIBAYASHI Hironori (@angostura11)
Expand Down
87 changes: 87 additions & 0 deletions lib/fluent/plugin/in_zmq_sub.rb
@@ -0,0 +1,87 @@
module Fluent

class ZmqSubInput < Fluent::Input
Fluent::Plugin.register_input('zmq_sub', self)

config_param :subkey, :string, :default => ""
config_param :publisher, :string, :default => "tcp://127.0.0.1:5556"
config_param :bulk_send, :bool, :default => false

attr_reader :subkeys

def initialize
super
require 'ffi-rzmq'
end

def configure(conf)
super
@subkeys = @subkey.split(",")
end

def start
super
@context =ZMQ::Context.new()
@thread = Thread.new(&method(:run))
end

def shutdown
super
Thread.kill(@thread)
@thread.join
@context.terminate
end

def run
begin
@subscriber = @context.socket(ZMQ::SUB)
@subscriber.connect(@publisher)
if @subkeys.size > 0
@subkeys.each do |k|
@subscriber.setsockopt(ZMQ::SUBSCRIBE,k)
end
else
@subscriber.setsockopt(ZMQ::SUBSCRIBE,'')
end
loop do
msg = ''
while @subscriber.recv_string(msg,ZMQ::DONTWAIT) && msg.size > 0
begin
(key, records) = msg.split(" ",2)
records = MessagePack.unpack(records)
if @bulk_send && records[0].class == Array
es = MultiEventStream.new
prev_tag = nil
records.each do |tag, time, record|
if prev_tag && prev_tag != tag
Engine.emit_stream(prev_tag, es)
es = MultiEventStream.new
end
es.add(time, record)
prev_tag = tag
end
Engine.emit_stream(prev_tag, es) if es.to_a.size > 0
else
Engine.emit(*records)
end
rescue => e
log.warn "Error in processing message.",:error_class => e.class, :error => e
log.warn_backtrace
end
msg = ''
end
sleep(0.1)
end
rescue => e
log.error "error occurred while executing plugin.", :error_class => e.class, :error => e
log.warn_backtrace
ensure
if @subscriber
@subscriber.close
end
end
end

end

end
1 change: 1 addition & 0 deletions test/helper.rb
Expand Up @@ -23,6 +23,7 @@ def method_missing(method, *args)
end

require 'fluent/plugin/out_zmq_pub'
require 'fluent/plugin/in_zmq_sub'

class Test::Unit::TestCase
end
97 changes: 97 additions & 0 deletions test/plugin/test_in_zmq_sub.rb
@@ -0,0 +1,97 @@
require 'helper'
require 'ffi-rzmq'

class ZmqSubIutputTest < Test::Unit::TestCase
def setup
Fluent::Test.setup
@context = ZMQ::Context.new()
@publisher = @context.socket(ZMQ::PUB)
@publisher.bind("tcp://*:5556")
end

def teardown
@publisher.close
@context.terminate
end

PUBLISHER = "tcp://127.0.0.1:5556"
CONFIG = %[
publisher #{PUBLISHER}
subkey test1.,test2.
]


def create_driver(conf=CONFIG)
Fluent::Test::InputTestDriver.new(Fluent::ZmqSubInput).configure(conf)
end

def test_configure
d = create_driver(CONFIG + "bulk_send true")
assert_equal PUBLISHER, d.instance.publisher
assert_equal ["test1.","test2."], d.instance.subkeys
assert_equal true, d.instance.bulk_send
end

def test_receive
d = create_driver

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
Fluent::Engine.now = time

d.expect_emit "test1.aa", time, {"a"=>1}
d.expect_emit "test2.bb", time, {"a"=>2}

d.run do
d.expected_emits.each {|tag,time,record|
send_record("dummy",time,record) # This record should not be received.
send_record(tag,time,record)
}
sleep 1
end
end

def test_no_subkey
d = create_driver("")

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
Fluent::Engine.now = time

d.expect_emit "test1.aa", time, {"a"=>1}
d.expect_emit "test2.bb", time, {"a"=>2}

d.run do
d.expected_emits.each {|tag,time,record|
send_record(tag,time,record)
}
sleep 1
end
end


def test_receive_bulk
d = create_driver(CONFIG + "bulk_send true")

time = Time.parse("2011-01-02 13:14:15 UTC").to_i
Fluent::Engine.now = time

d.expect_emit "test3.aa", time, {"a"=>1}
d.expect_emit "test4.bb", time, {"a"=>2}

d.run do
record_to_send = []
d.expected_emits.each {|tag,time,record|
record_to_send << [tag,time,record]
}
send_record_bulk("test1.aa",record_to_send)
sleep 1
end
end

def send_record(tag,time,record)
@publisher.send_string(tag + " " + [tag,time,record].to_msgpack)
end

def send_record_bulk(tag,records)
@publisher.send_string(tag + " " + records.to_msgpack)
end
end

0 comments on commit 68b9960

Please sign in to comment.