From d4e7082f6ac884058186662b9c45b05eb3e9ad22 Mon Sep 17 00:00:00 2001 From: Nathan Neulinger Date: Sun, 30 Oct 2016 10:43:26 -0500 Subject: [PATCH 1/9] Add support for proxy protocol --- lib/logstash/inputs/log4j.rb | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/lib/logstash/inputs/log4j.rb b/lib/logstash/inputs/log4j.rb index f5af436..cf79cd0 100644 --- a/lib/logstash/inputs/log4j.rb +++ b/lib/logstash/inputs/log4j.rb @@ -41,6 +41,10 @@ class LogStash::Inputs::Log4j < LogStash::Inputs::Base # When mode is `client`, the port to connect to. config :port, :validate => :number, :default => 4560 + # Proxy protocol support, only v1 is supported at this time + # http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt + config :proxy_protocol, :validate => :boolean, :default => false + # Read timeout in seconds. If a particular TCP connection is # idle for more than this timeout period, we will assume # it is dead and close it. @@ -100,10 +104,25 @@ def create_event(log4j_obj) private def handle_socket(socket, output_queue) begin + peer = socket.peer + if @proxy_protocol + pp_hdr = socket.readline + pp_info = pp_hdr.split(/\s/) + + # PROXY proto clientip proxyip clientport proxyport + if pp_info[0] != "PROXY" + @logger.error("invalid proxy protocol header label", :hdr => pp_hdr) + raise IOError + else + # would be nice to log the proxy host and port data as well, but minimizing changes + peer = pp_info[2] + end + end ois = socket_to_inputstream(socket) + while !stop? event = create_event(ois.readObject) - event.set("host", socket.peer) + event.set("host", peer) decorate(event) output_queue << event end # loop do From ce3b6ca7381c14baafd443b6aff9e3288b31de02 Mon Sep 17 00:00:00 2001 From: Nathan Neulinger Date: Sat, 19 Nov 2016 15:16:47 -0600 Subject: [PATCH 2/9] return instead of killing entire process --- lib/logstash/inputs/log4j.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/inputs/log4j.rb b/lib/logstash/inputs/log4j.rb index cf79cd0..d2bbb76 100644 --- a/lib/logstash/inputs/log4j.rb +++ b/lib/logstash/inputs/log4j.rb @@ -112,7 +112,7 @@ def handle_socket(socket, output_queue) # PROXY proto clientip proxyip clientport proxyport if pp_info[0] != "PROXY" @logger.error("invalid proxy protocol header label", :hdr => pp_hdr) - raise IOError + return else # would be nice to log the proxy host and port data as well, but minimizing changes peer = pp_info[2] From 8e06b89f39a01e2f97f1f389805a6b958f6fd203 Mon Sep 17 00:00:00 2001 From: Nathan Neulinger Date: Thu, 22 Dec 2016 13:02:33 -0600 Subject: [PATCH 3/9] add sample test data for log4j event --- testdata/gen-proxyproto.pl | 9 +++++++++ testdata/log4j-proxyproto.capture | Bin 0 -> 435 bytes testdata/log4j.capture | Bin 0 -> 394 bytes testdata/log4jtest/Makefile | 5 +++++ testdata/log4jtest/README.md | 10 ++++++++++ testdata/log4jtest/log4j.properties | 20 ++++++++++++++++++++ testdata/log4jtest/sender.java | 14 ++++++++++++++ 7 files changed, 58 insertions(+) create mode 100755 testdata/gen-proxyproto.pl create mode 100644 testdata/log4j-proxyproto.capture create mode 100644 testdata/log4j.capture create mode 100644 testdata/log4jtest/Makefile create mode 100644 testdata/log4jtest/README.md create mode 100644 testdata/log4jtest/log4j.properties create mode 100644 testdata/log4jtest/sender.java diff --git a/testdata/gen-proxyproto.pl b/testdata/gen-proxyproto.pl new file mode 100755 index 0000000..6752a0f --- /dev/null +++ b/testdata/gen-proxyproto.pl @@ -0,0 +1,9 @@ +#!/usr/bin/perl + +open(my $s, ">log4j-proxyproto.capture"); +print $s "PROXY TCP4 1.2.3.4 127.0.0.1 12345 2456\r\n"; +open(my $in, "TE>-`-_n>_IP!;dLR~>6cRSYLs5&x2Ar&ckB_EZA zqK|6sO3huWxv1v)8xVflUJF^suX>JcB-|!V29`buhC~|GM(_+NANCECAw%ICmL!z3 zb?n%IVKiK*t&X~2`}l|ElyEo>#rky>jD*(MVLH_OB{ Nl6sl}C>`k8{1^N}j^6+P literal 0 HcmV?d00001 diff --git a/testdata/log4j.capture b/testdata/log4j.capture new file mode 100644 index 0000000000000000000000000000000000000000..2713fdc8f35c698a11fe0b79d3fef49c7153862e GIT binary patch literal 394 zcmZwCJ5B>J5C-6hl7I*x9#Rf~QlvmyH*_f|5K=&fg3yp|*d4qMUfc0bUQKFFLPbl# zaX0`CTma`$1PPbFe$RaK_67Aaz=;bgL62!piE-*|kd$60(kZ2_x*U_`pYM+r@4lZ` zq0xtCk);>T&!ltDBY#K3kq(sg;mBs`-|QN8SQpe|EWCuh6q%F@vs)~P;aCoEjKZK* zqQ@bw>coz5v8A28vQLhoB_mC&XktkdktXCuDSp~sgBY5tnIqOlbg|4C9}UGuHPVJK zhl+%QKsKWYzM-;2MeDf1c_7Rdv~{u{6E|55SXoa0x7=^GLb1emrXJ#}JhVSwUKzH^ a log4j.capture diff --git a/testdata/log4jtest/log4j.properties b/testdata/log4jtest/log4j.properties new file mode 100644 index 0000000..ab536fc --- /dev/null +++ b/testdata/log4jtest/log4j.properties @@ -0,0 +1,20 @@ +log4j.rootLogger=DEBUG, A1, LOGSTASH +log4j.appender.A1=org.apache.log4j.ConsoleAppender +log4j.appender.A1.layout=org.apache.log4j.PatternLayout + +log4j.appender.LOGSTASH=org.apache.log4j.net.SocketAppender +log4j.appender.LOGSTASH.Port=2518 +log4j.appender.LOGSTASH.RemoteHost=localhost +log4j.appender.LOGSTASH.ReconnectionDelay=500 + +# Print the date in ISO 8601 format +log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n + +#Set the threshold for each appender +log4j.appender.LOGSTASH.Threshold=DEBUG +log4j.appender.A1.Threshold=DEBUG + +log4j.logger.org.apache.http=WARN + +log4j.logger.org.apache.pdfbox=ERROR + diff --git a/testdata/log4jtest/sender.java b/testdata/log4jtest/sender.java new file mode 100644 index 0000000..e9dcb26 --- /dev/null +++ b/testdata/log4jtest/sender.java @@ -0,0 +1,14 @@ +import org.apache.log4j.Logger; + +import java.io.*; +import java.util.*; + +public class sender { + + /* Get actual class name to be printed on */ + static Logger log = Logger.getLogger(sender.class.getName()); + + public static void main(String[] args)throws IOException { + log.debug("Hello"); + } +} From ccf76fb6853f9b2b180359c449a9f033302c0068 Mon Sep 17 00:00:00 2001 From: Nathan Neulinger Date: Thu, 22 Dec 2016 13:50:18 -0600 Subject: [PATCH 4/9] draft of attempted testing --- spec/inputs/log4j_spec.rb | 50 ++++++++++++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/spec/inputs/log4j_spec.rb b/spec/inputs/log4j_spec.rb index 6c437b4..0207300 100644 --- a/spec/inputs/log4j_spec.rb +++ b/spec/inputs/log4j_spec.rb @@ -1,16 +1,19 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" +require "socket" require "logstash/inputs/log4j" require "logstash/plugin" +require "stud/try" +require "stud/task" describe LogStash::Inputs::Log4j do it "should register" do - input = LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") + plugin = LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") # register will try to load jars and raise if it cannot find jars or if org.apache.log4j.spi.LoggingEvent class is not present - expect {input.register}.to_not raise_error + expect {plugin.register}.to_not raise_error end context "when interrupting the plugin in server mode" do @@ -35,7 +38,7 @@ end context "reading general information from a org.apache.log4j.spi.LoggingEvent" do - let (:input) { LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") } + let (:plugin) { LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") } let (:log_obj) { org.apache.log4j.spi.LoggingEvent.new( "org.apache.log4j.Logger", @@ -59,7 +62,7 @@ } it "creates event with general information" do - subject = input.create_event(log_obj) + subject = plugin.create_event(log_obj) expect(subject.get("timestamp")).to eq(1426366971) expect(subject.get("path")).to eq("org.apache.log4j.LayoutTest") expect(subject.get("priority")).to eq("INFO") @@ -74,14 +77,49 @@ end it "creates event without stacktrace" do - subject = input.create_event(log_obj) + subject = plugin.create_event(log_obj) expect(subject.get("stack_trace")).to be_nil end it "creates event with stacktrace" do - subject = input.create_event(log_obj_with_stacktrace) + subject = plugin.create_event(log_obj_with_stacktrace) #checks stack_trace is collected, exact value is too monstruous expect(subject.get("stack_trace")).not_to be_empty end + + + it "should instantiate with port and let us send content" do + p "starting my test" + port = rand(1024..65535) + event_counts = 5 + conf = <<-CONFIG + input { + log4j { + port => #{port} + } + } + CONFIG + + events = input(conf) do |pipeline, queue| + p "in pipeline" + event_counts.times do |i| + p "in event count loop" + socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } + p "in event count loop after socket" + data = File.read("testdata/log4j.capture") + p data + socket.puts(data) + socket.flush + socket.close + end + + p "collect" + event_counts.times.collect { queue.pop } + end + + p "after loop" + insist { events.length } == event_counts + insist { events[0].get("message") } == "whatever" + end end end From 37d47722c2f757a263c944a76fe0954aa809cfa9 Mon Sep 17 00:00:00 2001 From: Nathan Neulinger Date: Thu, 22 Dec 2016 14:11:32 -0600 Subject: [PATCH 5/9] another variant, still hanging --- spec/inputs/log4j_spec.rb | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/spec/inputs/log4j_spec.rb b/spec/inputs/log4j_spec.rb index 0207300..04eb88b 100644 --- a/spec/inputs/log4j_spec.rb +++ b/spec/inputs/log4j_spec.rb @@ -91,35 +91,35 @@ it "should instantiate with port and let us send content" do p "starting my test" port = rand(1024..65535) - event_counts = 5 + conf = <<-CONFIG input { log4j { + mode => "server" port => #{port} } } CONFIG + p conf + p "before pipeline" events = input(conf) do |pipeline, queue| - p "in pipeline" - event_counts.times do |i| - p "in event count loop" - socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } - p "in event count loop after socket" - data = File.read("testdata/log4j.capture") - p data - socket.puts(data) - socket.flush - socket.close - end - - p "collect" - event_counts.times.collect { queue.pop } + + p "before socket" + socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } + data = File.read("testdata/log4j.capture") + socket.puts(data) + socket.flush + socket.close + + p "before collect" + 1.times.collect { queue.pop } end + p "after pipeline" p "after loop" - insist { events.length } == event_counts - insist { events[0].get("message") } == "whatever" + insist { events.length } == 1 + insist { events[0].get("logger_name") } == "sender" end end end From 7c4fb9a840034981f16dd24aa98e946da7af50fb Mon Sep 17 00:00:00 2001 From: Nathan Neulinger Date: Thu, 22 Dec 2016 14:23:16 -0600 Subject: [PATCH 6/9] still doesn't work but doesn't hang at least --- spec/inputs/log4j_spec.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spec/inputs/log4j_spec.rb b/spec/inputs/log4j_spec.rb index 04eb88b..236d31d 100644 --- a/spec/inputs/log4j_spec.rb +++ b/spec/inputs/log4j_spec.rb @@ -86,8 +86,9 @@ #checks stack_trace is collected, exact value is too monstruous expect(subject.get("stack_trace")).not_to be_empty end + end - + context "full socket tests" do it "should instantiate with port and let us send content" do p "starting my test" port = rand(1024..65535) @@ -112,8 +113,8 @@ socket.flush socket.close - p "before collect" - 1.times.collect { queue.pop } + #p "before collect" + #1.times.collect { queue.pop } end p "after pipeline" From f748dd5384c0668d3ee8c8372a12c61cceca94a0 Mon Sep 17 00:00:00 2001 From: Nathan Neulinger Date: Thu, 22 Dec 2016 14:42:26 -0600 Subject: [PATCH 7/9] add a timeout --- spec/inputs/log4j_spec.rb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/spec/inputs/log4j_spec.rb b/spec/inputs/log4j_spec.rb index 236d31d..61290df 100644 --- a/spec/inputs/log4j_spec.rb +++ b/spec/inputs/log4j_spec.rb @@ -5,6 +5,7 @@ require "logstash/plugin" require "stud/try" require "stud/task" +require 'timeout' describe LogStash::Inputs::Log4j do @@ -108,13 +109,15 @@ p "before socket" socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } - data = File.read("testdata/log4j.capture") + data = File.binread("testdata/log4j.capture") socket.puts(data) socket.flush socket.close - #p "before collect" - #1.times.collect { queue.pop } + p "before collect" + static = Timeout::timeout(5) { + 1.times.collect { queue.pop } + } end p "after pipeline" From 464a3c559f78cd6a1833570ffcc70fcc4a2a571c Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Mon, 23 Jan 2017 12:08:32 -0800 Subject: [PATCH 8/9] Restructure the integration test to avoid `input` helper This change makes the plugin be invoked directly with the `run` method. --- logstash-input-log4j.gemspec | 1 + spec/fixtures/log4j.payload | Bin 0 -> 397 bytes spec/inputs/log4j_spec.rb | 117 +++++++++++++++++++++++++---------- 3 files changed, 85 insertions(+), 33 deletions(-) create mode 100644 spec/fixtures/log4j.payload diff --git a/logstash-input-log4j.gemspec b/logstash-input-log4j.gemspec index f5f6cb9..c2c1055 100644 --- a/logstash-input-log4j.gemspec +++ b/logstash-input-log4j.gemspec @@ -26,5 +26,6 @@ Gem::Specification.new do |s| s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" s.add_development_dependency 'logstash-devutils' + s.add_development_dependency 'flores' end diff --git a/spec/fixtures/log4j.payload b/spec/fixtures/log4j.payload new file mode 100644 index 0000000000000000000000000000000000000000..c80185857ebac04fe355f268e33e2fca93eeefb8 GIT binary patch literal 397 zcmZwBzfJ-{5C-reV2q$Kn$T+~O>QmhC@hQy&1xYw(ye#nvbeXi?9Bn!_yj(Rtp)Gk ztJvrZxcn(Famp{>%>3TJV5Egi3a`M0BVY3zTCB8fJ5U2L|IhSt?^(!Lp aJwvO9*3!|Gg3TBjxiZdA8vy00pH;sNrHB;( literal 0 HcmV?d00001 diff --git a/spec/inputs/log4j_spec.rb b/spec/inputs/log4j_spec.rb index 61290df..fc82898 100644 --- a/spec/inputs/log4j_spec.rb +++ b/spec/inputs/log4j_spec.rb @@ -6,13 +6,13 @@ require "stud/try" require "stud/task" require 'timeout' +require "flores/random" describe LogStash::Inputs::Log4j do it "should register" do plugin = LogStash::Plugin.lookup("input", "log4j").new("mode" => "client") - # register will try to load jars and raise if it cannot find jars or if org.apache.log4j.spi.LoggingEvent class is not present expect {plugin.register}.to_not raise_error end @@ -89,41 +89,92 @@ end end - context "full socket tests" do - it "should instantiate with port and let us send content" do - p "starting my test" - port = rand(1024..65535) - - conf = <<-CONFIG - input { - log4j { - mode => "server" - port => #{port} - } - } - CONFIG - p conf - - p "before pipeline" - events = input(conf) do |pipeline, queue| - - p "before socket" - socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } - data = File.binread("testdata/log4j.capture") - socket.puts(data) - socket.flush - socket.close - - p "before collect" - static = Timeout::timeout(5) { - 1.times.collect { queue.pop } + context "integration test" do + let(:host) { "127.0.0.1" } + let(:port) do + socket, address, port = Flores::Random.tcp_listener + socket.close + port + end + + let(:config) do + { + "host" => host, + "port" => port + } + end + + subject { LogStash::Inputs::Log4j.new(config) } + + before do + subject.register + end + + let(:thread) do + Thread.new { subject.run(queue) } + end + + let(:queue) do + [] + end + + let(:client) do + Stud.try(5.times) { TCPSocket.new(host, port) } + end + + after do + subject.do_stop + + 10.times do + break unless thread.alive? + sleep(0.1) + end + expect(thread).not_to be_alive + end + + shared_examples "accept events from the network" do |fixture| + before do + thread # make the thread run + File.open(fixture, "rb") do |payload| + IO.copy_stream(payload, client) + end + client.close + + Stud.try(5.times) do + throw StandardError.new("queue was empty, no data?") if queue.empty? + end + expect(queue.size).to be == 1 + end + + it "should accept an event from the network" do + event = queue.first + expect(event.get("message")).to be == "Hello world" + end + end + + context "default behavior" do + include_examples "accept events from the network", "spec/fixtures/log4j.payload" + end + + context "with proxy enabled" do + let(:config) do + { + "host" => host, + "port" => port, + "proxy_protocol" => true } end - p "after pipeline" - p "after loop" - insist { events.length } == 1 - insist { events[0].get("logger_name") } == "sender" + before do + client.write("PROXY TCP4 1.2.3.4 5.6.7.8 1234 5678\r\n") + end + + include_examples "accept events from the network", "spec/fixtures/log4j.payload" do + it "should set proxy_host and proxy_port" do + event = queue.first + expect(event.get("host")).to be == "1.2.3.4" + end + end end end end From f2159aa2693f808e3a277b4f58f3da4f988494eb Mon Sep 17 00:00:00 2001 From: Jordan Sissel Date: Mon, 23 Jan 2017 12:11:13 -0800 Subject: [PATCH 9/9] Remove no-longer-used files. --- testdata/gen-proxyproto.pl | 9 --------- testdata/log4j-proxyproto.capture | Bin 435 -> 0 bytes testdata/log4j.capture | Bin 394 -> 0 bytes 3 files changed, 9 deletions(-) delete mode 100755 testdata/gen-proxyproto.pl delete mode 100644 testdata/log4j-proxyproto.capture delete mode 100644 testdata/log4j.capture diff --git a/testdata/gen-proxyproto.pl b/testdata/gen-proxyproto.pl deleted file mode 100755 index 6752a0f..0000000 --- a/testdata/gen-proxyproto.pl +++ /dev/null @@ -1,9 +0,0 @@ -#!/usr/bin/perl - -open(my $s, ">log4j-proxyproto.capture"); -print $s "PROXY TCP4 1.2.3.4 127.0.0.1 12345 2456\r\n"; -open(my $in, "TE>-`-_n>_IP!;dLR~>6cRSYLs5&x2Ar&ckB_EZA zqK|6sO3huWxv1v)8xVflUJF^suX>JcB-|!V29`buhC~|GM(_+NANCECAw%ICmL!z3 zb?n%IVKiK*t&X~2`}l|ElyEo>#rky>jD*(MVLH_OB{ Nl6sl}C>`k8{1^N}j^6+P diff --git a/testdata/log4j.capture b/testdata/log4j.capture deleted file mode 100644 index 2713fdc8f35c698a11fe0b79d3fef49c7153862e..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 394 zcmZwCJ5B>J5C-6hl7I*x9#Rf~QlvmyH*_f|5K=&fg3yp|*d4qMUfc0bUQKFFLPbl# zaX0`CTma`$1PPbFe$RaK_67Aaz=;bgL62!piE-*|kd$60(kZ2_x*U_`pYM+r@4lZ` zq0xtCk);>T&!ltDBY#K3kq(sg;mBs`-|QN8SQpe|EWCuh6q%F@vs)~P;aCoEjKZK* zqQ@bw>coz5v8A28vQLhoB_mC&XktkdktXCuDSp~sgBY5tnIqOlbg|4C9}UGuHPVJK zhl+%QKsKWYzM-;2MeDf1c_7Rdv~{u{6E|55SXoa0x7=^GLb1emrXJ#}JhVSwUKzH^ a