Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion lib/logstash/inputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base
# `client` connects to a server.
config :mode, :validate => ["server", "client"], :default => "server"

# Proxy protocol support, only v1 is supported at this time
# http://www.haproxy.org/download/1.5/doc/proxy-protocol.txt
config :proxy_protocol, :validate => :boolean, :default => false

# Enable SSL (must be set for other `ssl_` options to take effect).
config :ssl_enable, :validate => :boolean, :default => false

Expand All @@ -55,6 +59,8 @@ class LogStash::Inputs::Tcp < LogStash::Inputs::Base

HOST_FIELD = "host".freeze
PORT_FIELD = "port".freeze
PROXY_HOST_FIELD = "proxy_host".freeze
PROXY_PORT_FIELD = "proxy_port".freeze
SSLSUBJECT_FIELD = "sslsubject".freeze

def initialize(*args)
Expand Down Expand Up @@ -153,8 +159,31 @@ def server_connection_thread(output_queue, socket)

def handle_socket(socket, client_address, client_port, output_queue, codec)
peer = "#{client_address}:#{client_port}"
first_read = true
while !stop?
codec.decode(read(socket)) do |event|
tbuf = read(socket)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that read() returns a payload that has too-few bytes in it to form a complete header.

However, given the following part of the specification, I am willing to accept that this is unlikely my experience with modern networks:

Both formats are designed to fit in the smallest TCP segment that any TCP/IP
host is required to support (576 - 40 = 536 bytes). This ensures that the whole
header will always be delivered at once when the socket buffers are still empty
at the beginning of a connection. The sender must always ensure that the header
is sent at once, so that the transport layer maintains atomicity along the path
to the receiver. The receiver may be tolerant to partial headers or may simply
drop the connection when receiving a partial header. Recommendation is to be
tolerant, but implementation constraints may not always easily permit this

if @proxy_protocol && first_read
first_read = false
orig_buf = tbuf
pp_hdr, tbuf = tbuf.split("\r\n", 2)

pp_info = pp_hdr.split(/\s/)
# PROXY proto clientip proxyip clientport proxyport
if pp_info[0] != "PROXY"
@logger.error("invalid proxy protocol header label", :hdr => pp_hdr)
raise IOError
else
proxy_address = pp_info[3]
proxy_port = pp_info[5]
client_address = pp_info[2]
client_port = pp_info[4]
end
end
codec.decode(tbuf) do |event|
if @proxy_protocol
event.set(PROXY_HOST_FIELD, proxy_address) unless event.get(PROXY_HOST_FIELD)
event.set(PROXY_PORT_FIELD, proxy_port) unless event.get(PROXY_PORT_FIELD)
end
event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD)
event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD)
event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil?
Expand Down
36 changes: 36 additions & 0 deletions spec/inputs/tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,42 @@
end
end

it "should handle PROXY protocol v1 connections" do
event_count = 10
port = rand(1024..65535)
conf = <<-CONFIG
input {
tcp {
port => #{port}
proxy_protocol => true
}
}
CONFIG

events = input(conf) do |pipeline, queue|
socket = Stud::try(5.times) { TCPSocket.new("127.0.0.1", port) }
socket.puts("PROXY TCP4 1.2.3.4 5.6.7.8 1234 5678\r");
socket.flush
event_count.times do |i|
# unicode smiley for testing unicode support!
socket.puts("#{i} ☹")
socket.flush
end
socket.close

event_count.times.collect {queue.pop}
end

insist { events.length } == event_count
event_count.times do |i|
insist { events[i].get("message") } == "#{i} ☹"
insist { events[i].get("host") } == "1.2.3.4"
insist { events[i].get("port") } == "1234"
insist { events[i].get("proxy_host") } == "5.6.7.8"
insist { events[i].get("proxy_port") } == "5678"
end
end

it "should read events with plain codec and ISO-8859-1 charset" do
port = rand(1024..65535)
charset = "ISO-8859-1"
Expand Down