fluentd plugin to publish/subscribe records through ZeroMQ.
Ruby
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
lib/fluent/plugin
test
.gitignore
Gemfile
LICENSE
README.rdoc
Rakefile
fluent-plugin-zmq-pub.gemspec

README.rdoc

fluent-plugin-zmq-pub

Overview

Fluentd plugin to publish records to ZeroMQ.

Why this plugin was created?

Sometimes I wanted to 'sniff' fluentd stream – running my own programs to the stream without changing fluentd configuration and restarting fluentd. With this plugin, fluentd records are always published to ZeroMQ regardless of the existance of subscriber. After that I can start and stop my subcriber programs at any time.

Dependence

This plugin use ffi-rzmq to support ZMQ, and need v3.2 or greater version of ZMQ library installed in your system.

Installation

You need to install ZeroMQ libraries before installing this plugin

## (RedHat/CentOS)
# yum install zeromq3 zeromq3-devel
# fluent-gem install fluent-plugin-zmq-pub

Configuration

<match zmq.**>
  type zmq_pub
  pubkey ${tag}:${key1}
  bindaddr tcp://*:5556
  flush_interval 1s
  bulk_send true
</match>
  • 'pubkey' specifies the publish key to ZeroMQ.

    • '${tag}' is replace by fluentd tag. '${name}' is replaced by fluentd record.

    • Actual record to be published is '<pubkey> <reocord.to_msgpack>'.

    • Subscriber can subscribe by '<pubkey>'.

  • 'bindaddr' is the address to which ZeroMQ publisher socket to be bound.

  • If 'bulk_send' is set to true, send multiple records with the same publish key in one 'send_string' method. This improves the performance.

Example usage

Put the configuration above to fluentd.conf, and save this sample code as 'sample_sub.rb'.

#!/usr/bin/env ruby

require 'ffi-rzmq'
require 'msgpack'

context = ZMQ::Context.new(1)
subscriber = context.socket(ZMQ::SUB)
subscriber.connect("tcp://localhost:5556")

if ARGV.length > 0
  ARGV.each{|s|
    subscriber.setsockopt(ZMQ::SUBSCRIBE,s)
  }
else
  subscriber.setsockopt(ZMQ::SUBSCRIBE,"")
end

while true
  msg = ''
  while subscriber.recv_string(msg,ZMQ::DONTWAIT) && msg.size > 0
    record =  MessagePack.unpack(msg.split(" ",2)[1])
    puts "tag: #{record[0]}"
    puts "time: #{record[1]}"
    puts "record: #{record[2]}"
    msg = ''
  end
  sleep(0.1)
end

Run sample_sub.rb. Argument is the key to subscribe. (Correspond to 'pubkey' in zmq_pub configuration). If you give no arguments, all key will be subscribed.

% ./sample_sub.rb zmq.test.tag:aaa

Submit records to fluentd.

% echo '{"key1": "aaa", "key2":"foo"}' | fluent-cat zmq.test.tag
% echo '{"key1": "bbb", "key2":"foo"}' | fluent-cat zmq.test.tag

Then you will get the following output from sample_sub.rb

tag: zmq.test.tag
time: 1376033265
record: {"key1"=>"aaa", "key2"=>"foo"}

(You should not get the second record(“key1”:“bbb”) because the publish key to zmq was “zmq.test.tag:bbb” and specified subscibe key was “zmq.test.tag:aaa”)

The nice thing is that once you put this plugin to your fluentd.conf and start fluentd, you can start and stop any subscriber programs without changing fluentd configuration.

zmq_sub input plugin

Input plugin to subscribe the output of zmq_pub is also included. Here is the example configuration.

<source>
   type zmq_sub
   publisher tcp://127.0.0.1:5556
   bulk_send true
   subkey zmq.,zmq2
</source>
  • If zmq_pub set `bulk_send` to true, zmq_sub also set it to true.

  • `subkey` is a comma separated list of keys to subscribe. In this example, keys starting “zmq.” or “zmq2.” will be subscribed.

Copyright

  • Copyright © 2013- OGIBAYASHI Hironori (@angostura11)

  • License

    • Apache License, Version 2.0