diff --git a/lib/logstash/inputs/log4j.rb b/lib/logstash/inputs/log4j.rb index f5af436..d2bbb76 100644 --- a/lib/logstash/inputs/log4j.rb +++ b/lib/logstash/inputs/log4j.rb @@ -41,6 +41,10 @@ class LogStash::Inputs::Log4j < LogStash::Inputs::Base # When mode is `client`, the port to connect to. config :port, :validate => :number, :default => 4560 + # Proxy protocol support, only v1 is supported at this time + # http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt + config :proxy_protocol, :validate => :boolean, :default => false + # Read timeout in seconds. If a particular TCP connection is # idle for more than this timeout period, we will assume # it is dead and close it. @@ -100,10 +104,25 @@ def create_event(log4j_obj) private def handle_socket(socket, output_queue) begin + peer = socket.peer + if @proxy_protocol + pp_hdr = socket.readline + pp_info = pp_hdr.split(/\s/) + + # PROXY proto clientip proxyip clientport proxyport + if pp_info[0] != "PROXY" + @logger.error("invalid proxy protocol header label", :hdr => pp_hdr) + return + else + # would be nice to log the proxy host and port data as well, but minimizing changes + peer = pp_info[2] + end + end ois = socket_to_inputstream(socket) + while !stop? event = create_event(ois.readObject) - event.set("host", socket.peer) + event.set("host", peer) decorate(event) output_queue << event end # loop do diff --git a/spec/inputs/log4j_spec.rb b/spec/inputs/log4j_spec.rb index 6c437b4..61290df 100644 --- a/spec/inputs/log4j_spec.rb +++ b/spec/inputs/log4j_spec.rb @@ -1,16 +1,20 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" +require "socket" require "logstash/inputs/log4j" require "logstash/plugin" +require "stud/try" +require "stud/task" +require 'timeout' describe LogStash::Inputs::Log4j do it "should register" do - input = LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") + plugin = LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") # register will try to load jars and raise if it cannot find jars or if org.apache.log4j.spi.LoggingEvent class is not present - expect {input.register}.to_not raise_error + expect {plugin.register}.to_not raise_error end context "when interrupting the plugin in server mode" do @@ -35,7 +39,7 @@ end context "reading general information from a org.apache.log4j.spi.LoggingEvent" do - let (:input) { LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") } + let (:plugin) { LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") } let (:log_obj) { org.apache.log4j.spi.LoggingEvent.new( "org.apache.log4j.Logger", @@ -59,7 +63,7 @@ } it "creates event with general information" do - subject = input.create_event(log_obj) + subject = plugin.create_event(log_obj) expect(subject.get("timestamp")).to eq(1426366971) expect(subject.get("path")).to eq("org.apache.log4j.LayoutTest") expect(subject.get("priority")).to eq("INFO") @@ -74,14 +78,52 @@ end it "creates event without stacktrace" do - subject = input.create_event(log_obj) + subject = plugin.create_event(log_obj) expect(subject.get("stack_trace")).to be_nil end it "creates event with stacktrace" do - subject = input.create_event(log_obj_with_stacktrace) + subject = plugin.create_event(log_obj_with_stacktrace) #checks stack_trace is collected, exact value is too monstruous expect(subject.get("stack_trace")).not_to be_empty end end + + context "full socket tests" do + it "should instantiate with port and let us send content" do + p "starting my test" + port = rand(1024..65535) + + conf = <<-CONFIG + input { + log4j { + mode => "server" + port => #{port} + } + } + CONFIG + p conf + + p "before pipeline" + events = input(conf) do |pipeline, queue| + + p "before socket" + socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } + data = File.binread("testdata/log4j.capture") + socket.puts(data) + socket.flush + socket.close + + p "before collect" + static = Timeout::timeout(5) { + 1.times.collect { queue.pop } + } + end + p "after pipeline" + + p "after loop" + insist { events.length } == 1 + insist { events[0].get("logger_name") } == "sender" + end + end end diff --git a/testdata/gen-proxyproto.pl b/testdata/gen-proxyproto.pl new file mode 100755 index 0000000..6752a0f --- /dev/null +++ b/testdata/gen-proxyproto.pl @@ -0,0 +1,9 @@ +#!/usr/bin/perl + +open(my $s, ">log4j-proxyproto.capture"); +print $s "PROXY TCP4 1.2.3.4 127.0.0.1 12345 2456\r\n"; +open(my $in, " log4j.capture diff --git a/testdata/log4jtest/log4j.properties b/testdata/log4jtest/log4j.properties new file mode 100644 index 0000000..ab536fc --- /dev/null +++ b/testdata/log4jtest/log4j.properties @@ -0,0 +1,20 @@ +log4j.rootLogger=DEBUG, A1, LOGSTASH +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.A1.layout=org.apache.log4j.PatternLayout + +log4j.appender.LOGSTASH=org.apache.log4j.net.SocketAppender +log4j.appender.LOGSTASH.Port=2518 +log4j.appender.LOGSTASH.RemoteHost=localhost +log4j.appender.LOGSTASH.ReconnectionDelay=500 + +# Print the date in ISO 8601 format +log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n + +#Set the threshold for each appender +log4j.appender.LOGSTASH.Threshold=DEBUG +log4j.appender.A1.Threshold=DEBUG + +log4j.logger.org.apache.http=WARN + +log4j.logger.org.apache.pdfbox=ERROR + diff --git a/testdata/log4jtest/sender.java b/testdata/log4jtest/sender.java new file mode 100644 index 0000000..e9dcb26 --- /dev/null +++ b/testdata/log4jtest/sender.java @@ -0,0 +1,14 @@ +import org.apache.log4j.Logger; + +import java.io.*; +import java.util.*; + +public class sender { + + /* Get actual class name to be printed on */ + static Logger log = Logger.getLogger(sender.class.getName()); + + public static void main(String[] args)throws IOException { + log.debug("Hello"); + } +}