diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 56d9863..7420f31 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -30,6 +30,10 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base # `client` connects to a server. config :mode, :validate => ["server", "client"], :default => "server" + # Proxy protocol support, only v1 is supported at this time + # http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt + config :proxy_protocol, :validate => :boolean, :default => false + # Enable SSL (must be set for other `ssl_` options to take effect). config :ssl_enable, :validate => :boolean, :default => false @@ -55,6 +59,8 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base HOST_FIELD = "host".freeze PORT_FIELD = "port".freeze + PROXY_HOST_FIELD = "proxy_host".freeze + PROXY_PORT_FIELD = "proxy_port".freeze SSLSUBJECT_FIELD = "sslsubject".freeze def initialize(*args) @@ -153,8 +159,31 @@ def server_connection_thread(output_queue, socket) def handle_socket(socket, client_address, client_port, output_queue, codec) peer = "#{client_address}:#{client_port}" + first_read = true while !stop? - codec.decode(read(socket)) do |event| + tbuf = read(socket) + if @proxy_protocol && first_read + first_read = false + orig_buf = tbuf + pp_hdr, tbuf = tbuf.split("\r\n", 2) + + pp_info = pp_hdr.split(/\s/) + # PROXY proto clientip proxyip clientport proxyport + if pp_info[0] != "PROXY" + @logger.error("invalid proxy protocol header label", :hdr => pp_hdr) + raise IOError + else + proxy_address = pp_info[3] + proxy_port = pp_info[5] + client_address = pp_info[2] + client_port = pp_info[4] + end + end + codec.decode(tbuf) do |event| + if @proxy_protocol + event.set(PROXY_HOST_FIELD, proxy_address) unless event.get(PROXY_HOST_FIELD) + event.set(PROXY_PORT_FIELD, proxy_port) unless event.get(PROXY_PORT_FIELD) + end event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD) event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD) event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil? diff --git a/spec/inputs/tcp_spec.rb b/spec/inputs/tcp_spec.rb index f0e58ac..3a92336 100644 --- a/spec/inputs/tcp_spec.rb +++ b/spec/inputs/tcp_spec.rb @@ -63,6 +63,42 @@ end end + it "should handle PROXY protocol v1 connections" do + event_count = 10 + port = rand(1024..65535) + conf = <<-CONFIG + input { + tcp { + port => #{port} + proxy_protocol => true + } + } + CONFIG + + events = input(conf) do |pipeline, queue| + socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) } + socket.puts("PROXY TCP4 1.2.3.4 5.6.7.8 1234 5678\r"); + socket.flush + event_count.times do |i| + # unicode smiley for testing unicode support! + socket.puts("#{i} ☹") + socket.flush + end + socket.close + + event_count.times.collect {queue.pop} + end + + insist { events.length } == event_count + event_count.times do |i| + insist { events[i].get("message") } == "#{i} ☹" + insist { events[i].get("host") } == "1.2.3.4" + insist { events[i].get("port") } == "1234" + insist { events[i].get("proxy_host") } == "5.6.7.8" + insist { events[i].get("proxy_port") } == "5678" + end + end + it "should read events with plain codec and ISO-8859-1 charset" do port = rand(1024..65535) charset = "ISO-8859-1"