diff --git a/lib/logstash/inputs/log4j.rb b/lib/logstash/inputs/log4j.rb index f5af436..d2bbb76 100644 --- a/lib/logstash/inputs/log4j.rb +++ b/lib/logstash/inputs/log4j.rb @@ -41,6 +41,10 @@ class LogStash::Inputs::Log4j < LogStash::Inputs::Base # When mode is `client`, the port to connect to. config :port, :validate => :number, :default => 4560 + # Proxy protocol support, only v1 is supported at this time + # http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt + config :proxy_protocol, :validate => :boolean, :default => false + # Read timeout in seconds. If a particular TCP connection is # idle for more than this timeout period, we will assume # it is dead and close it. @@ -100,10 +104,25 @@ def create_event(log4j_obj) private def handle_socket(socket, output_queue) begin + peer = socket.peer + if @proxy_protocol + pp_hdr = socket.readline + pp_info = pp_hdr.split(/\s/) + + # PROXY proto clientip proxyip clientport proxyport + if pp_info[0] != "PROXY" + @logger.error("invalid proxy protocol header label", :hdr => pp_hdr) + return + else + # would be nice to log the proxy host and port data as well, but minimizing changes + peer = pp_info[2] + end + end ois = socket_to_inputstream(socket) + while !stop? event = create_event(ois.readObject) - event.set("host", socket.peer) + event.set("host", peer) decorate(event) output_queue << event end # loop do diff --git a/logstash-input-log4j.gemspec b/logstash-input-log4j.gemspec index f5f6cb9..c2c1055 100644 --- a/logstash-input-log4j.gemspec +++ b/logstash-input-log4j.gemspec @@ -26,5 +26,6 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_development_dependency 'logstash-devutils' + s.add_development_dependency 'flores' end diff --git a/spec/fixtures/log4j.payload b/spec/fixtures/log4j.payload new file mode 100644 index 0000000..c801858 Binary files /dev/null and b/spec/fixtures/log4j.payload differ diff --git a/spec/inputs/log4j_spec.rb b/spec/inputs/log4j_spec.rb index 6c437b4..fc82898 100644 --- a/spec/inputs/log4j_spec.rb +++ b/spec/inputs/log4j_spec.rb @@ -1,16 +1,20 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" +require "socket" require "logstash/inputs/log4j" require "logstash/plugin" +require "stud/try" +require "stud/task" +require 'timeout' +require "flores/random" describe LogStash::Inputs::Log4j do it "should register" do - input = LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") - + plugin = LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") # register will try to load jars and raise if it cannot find jars or if org.apache.log4j.spi.LoggingEvent class is not present - expect {input.register}.to_not raise_error + expect {plugin.register}.to_not raise_error end context "when interrupting the plugin in server mode" do @@ -35,7 +39,7 @@ end context "reading general information from a org.apache.log4j.spi.LoggingEvent" do - let (:input) { LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") } + let (:plugin) { LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") } let (:log_obj) { org.apache.log4j.spi.LoggingEvent.new( "org.apache.log4j.Logger", @@ -59,7 +63,7 @@ } it "creates event with general information" do - subject = input.create_event(log_obj) + subject = plugin.create_event(log_obj) expect(subject.get("timestamp")).to eq(1426366971) expect(subject.get("path")).to eq("org.apache.log4j.LayoutTest") expect(subject.get("priority")).to eq("INFO") @@ -74,14 +78,103 @@ end it "creates event without stacktrace" do - subject = input.create_event(log_obj) + subject = plugin.create_event(log_obj) expect(subject.get("stack_trace")).to be_nil end it "creates event with stacktrace" do - subject = input.create_event(log_obj_with_stacktrace) + subject = plugin.create_event(log_obj_with_stacktrace) #checks stack_trace is collected, exact value is too monstruous expect(subject.get("stack_trace")).not_to be_empty end end + + context "integration test" do + let(:host) { "127.0.0.1" } + let(:port) do + socket, address, port = Flores::Random.tcp_listener + socket.close + port + end + + let(:config) do + { + "host" => host, + "port" => port + } + end + + subject { LogStash::Inputs::Log4j.new(config) } + + before do + subject.register + end + + let(:thread) do + Thread.new { subject.run(queue) } + end + + let(:queue) do + [] + end + + let(:client) do + Stud.try(5.times) { TCPSocket.new(host, port) } + end + + after do + subject.do_stop + + 10.times do + break unless thread.alive? + sleep(0.1) + end + expect(thread).not_to be_alive + end + + shared_examples "accept events from the network" do |fixture| + before do + thread # make the thread run + File.open(fixture, "rb") do |payload| + IO.copy_stream(payload, client) + end + client.close + + Stud.try(5.times) do + throw StandardError.new("queue was empty, no data?") if queue.empty? + end + expect(queue.size).to be == 1 + end + + it "should accept an event from the network" do + event = queue.first + expect(event.get("message")).to be == "Hello world" + end + end + + context "default behavior" do + include_examples "accept events from the network", "spec/fixtures/log4j.payload" + end + + context "with proxy enabled" do + let(:config) do + { + "host" => host, + "port" => port, + "proxy_protocol" => true + } + end + + before do + client.write("PROXY TCP4 1.2.3.4 5.6.7.8 1234 5678\r\n") + end + + include_examples "accept events from the network", "spec/fixtures/log4j.payload" do + it "should set proxy_host and proxy_port" do + event = queue.first + expect(event.get("host")).to be == "1.2.3.4" + end + end + end + end end diff --git a/testdata/log4jtest/Makefile b/testdata/log4jtest/Makefile new file mode 100644 index 0000000..d28534b --- /dev/null +++ b/testdata/log4jtest/Makefile @@ -0,0 +1,5 @@ +sender.class: sender.java + javac -cp log4j.jar sender.java + +test: + java -cp .:log4j.jar sender diff --git a/testdata/log4jtest/README.md b/testdata/log4jtest/README.md new file mode 100644 index 0000000..b950e94 --- /dev/null +++ b/testdata/log4jtest/README.md @@ -0,0 +1,10 @@ +log4jtest +========= + +copy log4j.jar into current dir +make +make test + +will connect to localhost:2518 and send a logging message + +To capture: nc -l 2518 > log4j.capture diff --git a/testdata/log4jtest/log4j.properties b/testdata/log4jtest/log4j.properties new file mode 100644 index 0000000..ab536fc --- /dev/null +++ b/testdata/log4jtest/log4j.properties @@ -0,0 +1,20 @@ +log4j.rootLogger=DEBUG, A1, LOGSTASH +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.A1.layout=org.apache.log4j.PatternLayout + +log4j.appender.LOGSTASH=org.apache.log4j.net.SocketAppender +log4j.appender.LOGSTASH.Port=2518 +log4j.appender.LOGSTASH.RemoteHost=localhost +log4j.appender.LOGSTASH.ReconnectionDelay=500 + +# Print the date in ISO 8601 format +log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n + +#Set the threshold for each appender +log4j.appender.LOGSTASH.Threshold=DEBUG +log4j.appender.A1.Threshold=DEBUG + +log4j.logger.org.apache.http=WARN + +log4j.logger.org.apache.pdfbox=ERROR + diff --git a/testdata/log4jtest/sender.java b/testdata/log4jtest/sender.java new file mode 100644 index 0000000..e9dcb26 --- /dev/null +++ b/testdata/log4jtest/sender.java @@ -0,0 +1,14 @@ +import org.apache.log4j.Logger; + +import java.io.*; +import java.util.*; + +public class sender { + + /* Get actual class name to be printed on */ + static Logger log = Logger.getLogger(sender.class.getName()); + + public static void main(String[] args)throws IOException { + log.debug("Hello"); + } +}