From 14371610e6e7c4b83a4ddf3f29f49a0bd0f21c8c Mon Sep 17 00:00:00 2001 From: Will Weber Date: Wed, 29 Jul 2020 10:50:25 -0400 Subject: [PATCH 1/3] Fix SSLSubject regression Attempt to follow the same general flow for tls principal extraction as the beats input plugin: https://github.com/logstash-plugins/logstash-input-beats/blob/5dd54594f65d32aad87d1dfd7b04d0c801216676/lib/logstash/inputs/beats/message_listener.rb#L125-L155 Extract's the ssl subject from inbound messages by: - adjust the interface to pass in the Netty ChannelHandlerContext instead of the Address name the InputHandler's ssl_handler - in the decoder, check if ssl_verify and ssl_enable are true and pull out the subjectname from the context Follow up after review: - Restore ssl-setup code needed for client-mode initialization - Swap conditionals to check for sslsubject extraction --- lib/logstash/inputs/tcp.rb | 5 +++- lib/logstash/inputs/tcp/decoder_impl.rb | 14 ++++++++--- spec/inputs/tcp_spec.rb | 25 +++++++++++++++++++ src/main/java/org/logstash/tcp/Decoder.java | 4 +-- src/main/java/org/logstash/tcp/InputLoop.java | 7 ++++-- version | 2 +- 6 files changed, 48 insertions(+), 9 deletions(-) diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index e2a3ad0..b3bcdaf 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -186,12 +186,15 @@ def close end def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, - proxy_port, tbuf, socket) + proxy_port, tbuf, socket, ssl_subject) codec.decode(tbuf) do |event| if @proxy_protocol event.set(PROXY_HOST_FIELD, proxy_address) unless event.get(PROXY_HOST_FIELD) event.set(PROXY_PORT_FIELD, proxy_port) unless event.get(PROXY_PORT_FIELD) end + if ssl_subject + event.set(SSLSUBJECT_FIELD, ssl_subject) + end enqueue_decorated(event, client_ip_address, client_address, client_port, socket) end end diff --git a/lib/logstash/inputs/tcp/decoder_impl.rb b/lib/logstash/inputs/tcp/decoder_impl.rb index 75af6e0..60ff51c 100644 --- a/lib/logstash/inputs/tcp/decoder_impl.rb +++ b/lib/logstash/inputs/tcp/decoder_impl.rb @@ -11,7 +11,8 @@ def initialize(codec, tcp) @first_read = true end - def decode(channel_addr, data) + def decode(ctx, data) + channel_addr = ctx.channel().remoteAddress() bytes = Java::byte[data.readableBytes].new data.getBytes(0, bytes) data.release @@ -19,8 +20,15 @@ def decode(channel_addr, data) if @first_read tbuf = init_first_read(channel_addr, tbuf) end - @tcp.decode_buffer(@ip_address, @address, @port, @codec, - @proxy_address, @proxy_port, tbuf, nil) + if @tcp.ssl_enable && @tcp.ssl_verify + session = ctx.channel().pipeline().get("ssl-handler").engine().getSession() + sslsubject = session.getPeerPrincipal().getName() + @tcp.decode_buffer(@ip_address, @address, @port, @codec, + @proxy_address, @proxy_port, tbuf, nil, sslsubject) + else + @tcp.decode_buffer(@ip_address, @address, @port, @codec, + @proxy_address, @proxy_port, tbuf, nil, nil) + end end def copy diff --git a/spec/inputs/tcp_spec.rb b/spec/inputs/tcp_spec.rb index 3828548..6493202 100644 --- a/spec/inputs/tcp_spec.rb +++ b/spec/inputs/tcp_spec.rb @@ -519,6 +519,31 @@ def get_port expect(result.first.get("message")).to eq(message) end end + context "with a regular TLS setup" do + let(:config) do + { + "host" => "127.0.0.1", + "port" => port, + "ssl_enable" => true, + "ssl_cert" => chain_of_certificates[:b_cert].path, + "ssl_key" => chain_of_certificates[:b_key].path, + "ssl_extra_chain_certs" => [ chain_of_certificates[:a_cert].path ], + "ssl_certificate_authorities" => [ chain_of_certificates[:root_ca].path ], + "ssl_verify" => true + } + end + it "should be able to extract the sslsubject from connections" do + result = TcpHelpers.pipelineless_input(subject, 1) do + sslsocket.connect + sslsocket.write("#{message}\n") + tcp.flush + sslsocket.close + tcp.close + end + expect(result.size).to eq(1) + expect(result.first.get("sslsubject")).to eq("CN=RubyAA_Cert,DC=ruby-lang,DC=org") + end + end end context "with a poorly-behaving client" do diff --git a/src/main/java/org/logstash/tcp/Decoder.java b/src/main/java/org/logstash/tcp/Decoder.java index 2d7e4c8..5d0a950 100644 --- a/src/main/java/org/logstash/tcp/Decoder.java +++ b/src/main/java/org/logstash/tcp/Decoder.java @@ -1,7 +1,7 @@ package org.logstash.tcp; import io.netty.buffer.ByteBuf; -import java.net.SocketAddress; +import io.netty.channel.ChannelHandlerContext; /** * Decoder bridge to implement in JRuby. @@ -13,7 +13,7 @@ public interface Decoder { * @param key {@link SocketAddress} * @param message Data {@link ByteBuf} for this address */ - void decode(SocketAddress key, ByteBuf message); + void decode(ChannelHandlerContext context, ByteBuf message); /** * Creates a copy of this decoder, that has all internal meta data cleared. diff --git a/src/main/java/org/logstash/tcp/InputLoop.java b/src/main/java/org/logstash/tcp/InputLoop.java index 292b634..5d8a436 100644 --- a/src/main/java/org/logstash/tcp/InputLoop.java +++ b/src/main/java/org/logstash/tcp/InputLoop.java @@ -108,6 +108,7 @@ public void close() { * {@link Decoder}. */ private static final class InputHandler extends ChannelInitializer { + private final String SSL_HANDLER = "ssl-handler"; /** * {@link Decoder} supplied by JRuby. @@ -140,7 +141,7 @@ protected void initChannel(final SocketChannel channel) throws Exception { // if SSL is enabled, the SSL handler must be added to the pipeline first if (sslContext != null) { - channel.pipeline().addLast(sslContext.newHandler(channel.alloc())); + channel.pipeline().addLast(SSL_HANDLER, sslContext.newHandler(channel.alloc())); } channel.pipeline().addLast(new DecoderAdapter(localCopy, logger)); @@ -199,9 +200,11 @@ private static final class DecoderAdapter extends ChannelInboundHandlerAdapter { this.decoder = decoder; } + // 6.07 updated to pass in the full netty ChannelHandlerContext instead of the remoteaddress field + // corresponding interface updated @Override public void channelRead(final ChannelHandlerContext ctx, final Object msg) { - decoder.decode(ctx.channel().remoteAddress(), (ByteBuf) msg); + decoder.decode(ctx, (ByteBuf) msg); } @Override diff --git a/version b/version index b7ff151..089b1e6 100644 --- a/version +++ b/version @@ -1 +1 @@ -6.0.6 +6.0.7 From 1c514d1b7253f7d52347a9d05321e568b7373b69 Mon Sep 17 00:00:00 2001 From: Will Weber Date: Wed, 29 Jul 2020 15:24:22 -0400 Subject: [PATCH 2/3] Move client-mode sslsubject handling to `handle_socket` --- lib/logstash/inputs/tcp.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index b3bcdaf..8bf1048 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -225,6 +225,9 @@ def handle_socket(socket) client_address = socket.peeraddr[3] client_ip_address = socket.peeraddr[2] client_port = socket.peeraddr[1] + + # Client mode sslsubject extraction, server mode happens in DecoderImpl#decode + ssl_subject = socket.peer_cert.subject.to_s if !server? && @ssl_enable && @ssl_verify peer = "#{client_address}:#{client_port}" first_read = true codec = @codec.clone @@ -248,7 +251,7 @@ def handle_socket(socket) end end decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, - proxy_port, tbuf, socket) + proxy_port, tbuf, socket, ssl_subject) end rescue EOFError @logger.debug? && @logger.debug("Connection closed", :client => peer) @@ -271,7 +274,6 @@ def enqueue_decorated(event, client_ip_address, client_address, client_port, soc event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD) event.set(HOST_IP_FIELD, client_ip_address) unless event.get(HOST_IP_FIELD) event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD) - event.set(SSLSUBJECT_FIELD, socket.peer_cert.subject.to_s) if socket && @ssl_enable && @ssl_verify && event.get(SSLSUBJECT_FIELD).nil? decorate(event) @output_queue << event end From ee10d3d3a4d51208823f895fef3dac64f15cb207 Mon Sep 17 00:00:00 2001 From: Will Weber Date: Wed, 29 Jul 2020 15:56:40 -0400 Subject: [PATCH 3/3] Reduce instances where we're passing around socket Reducing the number of times we refer to the socket object so that most of the interaction lives in the `handle_socket` method. Remove redundant conditional in ssl_subject assignment, it's already assumed that we're in client mode at that point. Necessary to also fix up refderences in the DecoderImpl class as well, since the tcp plugin's decode_buffer method changed. --- lib/logstash/inputs/tcp.rb | 17 +++++++++-------- lib/logstash/inputs/tcp/decoder_impl.rb | 6 +++--- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/lib/logstash/inputs/tcp.rb b/lib/logstash/inputs/tcp.rb index 8bf1048..d3a98b2 100644 --- a/lib/logstash/inputs/tcp.rb +++ b/lib/logstash/inputs/tcp.rb @@ -186,7 +186,7 @@ def close end def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, - proxy_port, tbuf, socket, ssl_subject) + proxy_port, tbuf, ssl_subject) codec.decode(tbuf) do |event| if @proxy_protocol event.set(PROXY_HOST_FIELD, proxy_address) unless event.get(PROXY_HOST_FIELD) @@ -195,13 +195,13 @@ def decode_buffer(client_ip_address, client_address, client_port, codec, proxy_a if ssl_subject event.set(SSLSUBJECT_FIELD, ssl_subject) end - enqueue_decorated(event, client_ip_address, client_address, client_port, socket) + enqueue_decorated(event, client_ip_address, client_address, client_port) end end - def flush_codec(codec, client_ip_address, client_address, client_port, socket) + def flush_codec(codec, client_ip_address, client_address, client_port) codec.flush do |event| - enqueue_decorated(event, client_ip_address, client_address, client_port, socket) + enqueue_decorated(event, client_ip_address, client_address, client_port) end end @@ -221,13 +221,14 @@ def run_client() client_socket.close rescue nil end + # only called in client mode def handle_socket(socket) client_address = socket.peeraddr[3] client_ip_address = socket.peeraddr[2] client_port = socket.peeraddr[1] # Client mode sslsubject extraction, server mode happens in DecoderImpl#decode - ssl_subject = socket.peer_cert.subject.to_s if !server? && @ssl_enable && @ssl_verify + ssl_subject = socket.peer_cert.subject.to_s if @ssl_enable && @ssl_verify peer = "#{client_address}:#{client_port}" first_read = true codec = @codec.clone @@ -251,7 +252,7 @@ def handle_socket(socket) end end decode_buffer(client_ip_address, client_address, client_port, codec, proxy_address, - proxy_port, tbuf, socket, ssl_subject) + proxy_port, tbuf, ssl_subject) end rescue EOFError @logger.debug? && @logger.debug("Connection closed", :client => peer) @@ -267,10 +268,10 @@ def handle_socket(socket) ensure # catch all rescue nil on close to discard any close errors or invalid socket socket.close rescue nil - flush_codec(codec, client_ip_address, client_address, client_port, socket) + flush_codec(codec, client_ip_address, client_address, client_port) end - def enqueue_decorated(event, client_ip_address, client_address, client_port, socket) + def enqueue_decorated(event, client_ip_address, client_address, client_port) event.set(HOST_FIELD, client_address) unless event.get(HOST_FIELD) event.set(HOST_IP_FIELD, client_ip_address) unless event.get(HOST_IP_FIELD) event.set(PORT_FIELD, client_port) unless event.get(PORT_FIELD) diff --git a/lib/logstash/inputs/tcp/decoder_impl.rb b/lib/logstash/inputs/tcp/decoder_impl.rb index 60ff51c..fd8eeef 100644 --- a/lib/logstash/inputs/tcp/decoder_impl.rb +++ b/lib/logstash/inputs/tcp/decoder_impl.rb @@ -24,10 +24,10 @@ def decode(ctx, data) session = ctx.channel().pipeline().get("ssl-handler").engine().getSession() sslsubject = session.getPeerPrincipal().getName() @tcp.decode_buffer(@ip_address, @address, @port, @codec, - @proxy_address, @proxy_port, tbuf, nil, sslsubject) + @proxy_address, @proxy_port, tbuf, sslsubject) else @tcp.decode_buffer(@ip_address, @address, @port, @codec, - @proxy_address, @proxy_port, tbuf, nil, nil) + @proxy_address, @proxy_port, tbuf, nil) end end @@ -36,7 +36,7 @@ def copy end def flush - @tcp.flush_codec(@codec, @ip_address, @address, @port, nil) + @tcp.flush_codec(@codec, @ip_address, @address, @port) end private