Permalink
0693f4a May 3, 2016
@avishai-ish-shalom @suyograo @electrical @ph @talevy @jsvd
187 lines (165 sloc) 5.23 KB
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "socket"
# Read events over a 0MQ SUB socket.
#
# You need to have the 0mq 2.1.x library installed to be able to use
# this input plugin.
#
# The default settings will create a subscriber binding to `tcp://127.0.0.1:2120`
# waiting for connecting publishers.
#
class LogStash::Inputs::ZeroMQ < LogStash::Inputs::Base
config_name "zeromq"
default :codec, "json"
# 0mq socket address to connect or bind
# Please note that `inproc://` will not work with logstash
# as each we use a context per thread.
# By default, inputs bind/listen
# and outputs connect
config :address, :validate => :array, :default => ["tcp://*:2120"]
# 0mq topology
# The default logstash topologies work as follows:
#
# * pushpull - inputs are pull, outputs are push
# * pubsub - inputs are subscribers, outputs are publishers
# * pair - inputs are clients, inputs are servers
#
# If the predefined topology flows don't work for you,
# you can change the `mode` setting
# TODO (lusis) add req/rep MAYBE
# TODO (lusis) add router/dealer
config :topology, :validate => ["pushpull", "pubsub", "pair"], :required => true
# 0mq topic
# This is used for the `pubsub` topology only
# On inputs, this allows you to filter messages by topic
# On outputs, this allows you to tag a message for routing
# NOTE: ZeroMQ does subscriber side filtering.
# NOTE: All topics have an implicit wildcard at the end
# You can specify multiple topics here
config :topic, :validate => :array
# Event topic field
# This is used for the `pubsub` topology only
# When a message is received on a topic, the topic name on which
# the message was received will saved in this field.
config :topic_field, :validate => :string, :default => "topic"
# mode
# server mode binds/listens
# client mode connects
config :mode, :validate => ["server", "client"], :default => "server"
# sender
# overrides the sender to
# set the source of the event
# default is `zmq+topology://type/`
config :sender, :validate => :string
# 0mq socket options
# This exposes `zmq_setsockopt`
# for advanced tuning
# see http://api.zeromq.org/2-1:zmq-setsockopt for details
#
# This is where you would set values like:
#
# * `ZMQ::HWM` - high water mark
# * `ZMQ::IDENTITY` - named queues
# * `ZMQ::SWAP_SIZE` - space for disk overflow
#
# Example:
# [source,ruby]
# sockopt => {
# "ZMQ::HWM" => 50
# "ZMQ::IDENTITY" => "my_named_queue"
# }
#
# defaults to: `sockopt => { "ZMQ::RCVTIMEO" => "1000" }`, which has the effect of "interrupting"
# the recv operation at least once every second to allow for properly shutdown handling.
config :sockopt, :validate => :hash, :default => { "ZMQ::RCVTIMEO" => "1000" }
public
def register
require "ffi-rzmq"
require "logstash/plugin_mixins/zeromq"
self.class.send(:include, LogStash::PluginMixins::ZeroMQ)
@host = Socket.gethostname
init_socket
end # def register
def init_socket
case @topology
when "pair"
zmq_const = ZMQ::PAIR
when "pushpull"
zmq_const = ZMQ::PULL
when "pubsub"
zmq_const = ZMQ::SUB
end # case socket_type
@zsocket = context.socket(zmq_const)
error_check(@zsocket.setsockopt(ZMQ::LINGER, 1),
"while setting ZMQ::LINGER == 1)")
if @sockopt
setopts(@zsocket, @sockopt)
end
@address.each do |addr|
setup(@zsocket, addr)
end
if @topology == "pubsub"
if @topic.nil?
@logger.debug("ZMQ - No topic provided. Subscribing to all messages")
error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, ""),
"while setting ZMQ::SUBSCRIBE")
else
@topic.each do |t|
@logger.debug("ZMQ subscribing to topic: #{t}")
error_check(@zsocket.setsockopt(ZMQ::SUBSCRIBE, t),
"while setting ZMQ::SUBSCRIBE == #{t}")
end
end
end
end
def close
begin
error_check(@zsocket.close, "while closing the zmq socket")
context.terminate
rescue RuntimeError => e
@logger.error("Failed to properly teardown ZeroMQ")
end
end # def close
def server?
@mode == "server"
end # def server?
def run(output_queue)
begin
while !stop?
handle_message(output_queue)
end
rescue => e
@logger.debug? && @logger.debug("ZMQ Error", :subscriber => @zsocket,
:exception => e)
retry
end # begin
end # def run
private
def build_source_string
id = @address.first.clone
end
def handle_message(output_queue)
# Here's the unified receiver
more = true
parts = []
rc = @zsocket.recv_strings(parts)
error_check(rc, "in recv_strings", true)
return unless ZMQ::Util.resultcode_ok?(rc)
if @topology == "pubsub" && parts.length > 1
# assume topic is a simple string
topic, *parts = parts
else
topic = nil
end
parts.each do |msg|
@codec.decode(msg) do |event|
event.set("host", event.get("host") || @host)
event.set(@topic_field, topic.force_encoding('UTF-8')) unless topic.nil?
decorate(event)
output_queue << event
end
end
end
end # class LogStash::Inputs::ZeroMQ