Skip to content

Commit

Permalink
TLS Client verification
Browse files Browse the repository at this point in the history
This PR introduce client verification when using `ssl`, when using this
option you need to specify the **Certificate Authority** using the
`ssl_certificate_authorities` option, this configuration take a
certificate to validate the client with it. You also need to specify a
`ssl_verify_mode`, If you want strict ssl verification use `force_peer`,
with this value if the client doesn't provide a valid certificate the
plugins will refuse the client.

On Filebeat you need to configure the client to declare a certificate using
theses options.

```
tls:
  certificate_authorities: ["/etc/server.crt"]
  certificate: /etc/server.crt
  certificate_key: /etc/server.key
```

This change also introduce testing with travis-ci and an integration
suite with filebeat and LSF for the SSL options.

Fix: logstash-plugins#8
  • Loading branch information
ph committed Mar 9, 2016
1 parent 706ce32 commit 24ad7c0
Show file tree
Hide file tree
Showing 13 changed files with 1,126 additions and 100 deletions.
10 changes: 10 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
sudo: false
language: ruby
cache: bundler
rvm:
- jruby-1.7.24
before_script:
- bundle exec rake test:integration:setup
script:
- bundle exec rspec spec
- bundle exec rspec spec --tag integration
57 changes: 57 additions & 0 deletions Rakefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,64 @@
# encoding: utf-8

OS_PLATFORM = RbConfig::CONFIG["host_os"]
VENDOR_PATH = File.expand_path(File.join(File.dirname(__FILE__), "vendor"))
if OS_PLATFORM == "linux"
FILEBEAT_URL = "https://beats-nightlies.s3.amazonaws.com/filebeat/filebeat-5.0.0-nightlylatest-#{RbConfig::CONFIG["host_cpu"]}.tar.gz"
else
FILEBEAT_URL = "https://beats-nightlies.s3.amazonaws.com/filebeat/filebeat-5.0.0-nightlylatest-#{OS_PLATFORM}.tar.gz"
end
LSF_URL = "https://download.elastic.co/logstash-forwarder/binaries/logstash-forwarder_#{OS_PLATFORM}_amd64"

require "fileutils"
@files=[]

task :default do
system("rake -T")
end

require "logstash/devutils/rake"

namespace :test do
namespace :integration do
task :setup do
Rake::Task["test:integration:setup:filebeat"].invoke
Rake::Task["test:integration:setup:lsf"].invoke
end

namespace :setup do
desc "Download lastest stable version of Logstash-forwarder"
task :lsf do
destination = File.join(VENDOR_PATH, "logstash-forwarder")
FileUtils.rm_rf(destination)
FileUtils.mkdir_p(destination)
download_destination = File.join(destination, "logstash-forwarder")
puts "Logstash-forwarder: downloading from #{LSF_URL} to #{download_destination}"
download(LSF_URL, download_destination)
File.chmod(0755, download_destination)
end

desc "Download nigthly filebeat for integration testing"
task :filebeat do
FileUtils.mkdir_p(VENDOR_PATH)
download_destination = File.join(VENDOR_PATH, "filebeat.tar.gz")
destination = File.join(VENDOR_PATH, "filebeat")
FileUtils.rm_rf(download_destination)
FileUtils.rm_rf(destination)
FileUtils.rm_rf(File.join(VENDOR_PATH, "filebeat.tar"))
puts "Filebeat: downloading from #{FILEBEAT_URL} to #{download_destination}"
download(FILEBEAT_URL, download_destination)

untar_all(download_destination, File.join(VENDOR_PATH, "filebeat")) { |e| e }
end
end
end
end

# Uncompress all the file from the archive this only work with
# one level directory structure and its fine for LSF and filebeat packaging.
def untar_all(file, destination)
untar(file) do |entry|
out = entry.full_name.split("/").last
File.join(destination, out)
end
end
43 changes: 33 additions & 10 deletions lib/logstash/inputs/beats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ class InsertingToQueueTakeTooLong < Exception; end
# SSL key passphrase to use.
config :ssl_key_passphrase, :validate => :password

# Validate client certificates against theses authorities
# You can defined multiples files or path, all the certificates will
# be read and added to the trust store.
config :ssl_certificate_authorities, :validate => :array, :default => []

# By default the server dont do any client verification,
#
# `peer` will make the server ask the client to provide a certificate,
# if the client provide the certificate it will be validated.
#
# `force_peer` will make the server ask the client for their certificate, if the clients
# doesn't provide it the connection will be closed.
#
# This option need to be used with `ssl_certificate_authorities` and a defined list of CA.
config :ssl_verify_mode, :validate => ["none", "peer", "force_peer"], :default => "none"

# The number of seconds before we raise a timeout,
# this option is useful to control how much time to wait if something is blocking the pipeline.
config :congestion_threshold, :validate => :number, :default => 5
Expand All @@ -87,9 +103,16 @@ def register
end

@logger.info("Beats inputs: Starting input listener", :address => "#{@host}:#{@port}")
@lumberjack = Lumberjack::Beats::Server.new(:address => @host, :port => @port,
:ssl => @ssl, :ssl_certificate => @ssl_certificate, :ssl_key => @ssl_key,
:ssl_key_passphrase => @ssl_key_passphrase)


@lumberjack = Lumberjack::Beats::Server.new(:address => @host,
:port => @port,
:ssl => @ssl,
:ssl_certificate => @ssl_certificate,
:ssl_key => @ssl_key,
:ssl_key_passphrase => @ssl_key_passphrase,
:ssl_certificate_authorities => @ssl_certificate_authorities,
:ssl_verify_mode => @ssl_verify_mode)

# in 1.5 the main SizeQueue doesnt have the concept of timeout
# We are using a small plugin buffer to move events to the internal queue
Expand All @@ -103,8 +126,8 @@ def register
@codec = LogStash::Codecs::IdentityMapCodec.new(@codec)

# Keep a list of active connections so we can flush their codec on shutdown
# Use threadsafe gem, since we have a strict dependency on concurrent-ruby 0.9.2

# Use threadsafe gem, since we have a strict dependency on concurrent-ruby 0.9.2
# in the core
@connections_list = ThreadSafe::Hash.new
end # def register
Expand All @@ -127,13 +150,13 @@ def run(output_queue)
connection = @lumberjack.accept # call that creates a new connection
# if the connection is nil the connection was closed upstream,
# so we will try in another iteration to recover or stop.
next if connection.nil?
next if connection.nil?

Thread.new do
Thread.new do
handle_new_connection(connection)
end
else
@logger.warn("Beats input: the pipeline is blocked, temporary refusing new connection.",
@logger.warn("Beats input: the pipeline is blocked, temporary refusing new connection.",
:reconnect_backoff_sleep => RECONNECT_BACKOFF_SLEEP)
sleep(RECONNECT_BACKOFF_SLEEP)
end
Expand Down Expand Up @@ -186,9 +209,9 @@ def handle_new_connection(connection)
@logger.warn("Beats input: The circuit breaker has detected a slowdown or stall in the pipeline, the input is closing the current connection and rejecting new connection until the pipeline recover.",
:exception => e.class)
rescue Exception => e # If we have a malformed packet we should handle that so the input doesn't crash completely.
@logger.error("Beats input: unhandled exception",
@logger.error("Beats input: unhandled exception",
:exception => e,
:backtrace => e.backtrace)
:backtrace => e.backtrace)
ensure
transformer = LogStash::Inputs::BeatsSupport::EventTransformCommon.new(self)

Expand Down
81 changes: 65 additions & 16 deletions lib/lumberjack/beats/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,21 @@ def initialize(opts={})
:port => 0,
:addresses => [],
:ssl_certificate => nil,
:ssl_certificate_key => nil,
:ssl_certificate_authorities => nil,
:ssl => true,
:json => false,
}.merge(opts)

@opts[:addresses] = [@opts[:addresses]] if @opts[:addresses].class == String
@opts[:addresses] = Array(@opts[:addresses])
raise "Must set a port." if @opts[:port] == 0
raise "Must set atleast one address" if @opts[:addresses].empty? == 0
raise "Must set a ssl certificate or path" if @opts[:ssl_certificate].nil? && @opts[:ssl]

if @opts[:ssl]
if @opts[:ssl_certificate_authorities].nil? && (@opts[:ssl_certificate].nil? || @opts[:ssl_certificate_key].nil?)
raise "Must set a ssl certificate or path"
end
end

@socket = connect
end
Expand Down Expand Up @@ -67,36 +74,78 @@ def initialize(opts={})
@opts = {
:port => 0,
:address => "127.0.0.1",
:ssl_certificate_authorities => [], # use the same naming as beats' TLS options
:ssl_certificate => nil,
:ssl_certificate_key => nil,
:ssl_certificate_password => nil,
:ssl => true,
:json => false,
}.merge(opts)
@host = @opts[:address]

connection_start(opts)
connection_start
end

private
def connection_start(opts)
tcp_socket = TCPSocket.new(opts[:address], opts[:port])
if !opts[:ssl]
def connection_start
tcp_socket = TCPSocket.new(@opts[:address], @opts[:port])

if !@opts[:ssl]
@socket = tcp_socket
else
certificate = OpenSSL::X509::Certificate.new(File.read(opts[:ssl_certificate]))

certificate_store = OpenSSL::X509::Store.new
certificate_store.add_cert(certificate)
@socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, setup_ssl)
@socket.connect
end
end

private
def setup_ssl
ssl_context = OpenSSL::SSL::SSLContext.new

ssl_context = OpenSSL::SSL::SSLContext.new
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
ssl_context.cert_store = certificate_store
ssl_context.cert = certificate
ssl_context.key = private_key
ssl_context.verify_mode = OpenSSL::SSL::VERIFY_PEER
ssl_context.cert_store = trust_store
ssl_context
end

@socket = OpenSSL::SSL::SSLSocket.new(tcp_socket, ssl_context)
@socket.connect
private
def certificate
if @opts[:ssl_certificate]
OpenSSL::X509::Certificate.new(File.open(@opts[:ssl_certificate]))
end
end

private
private
def private_key
OpenSSL::PKey::RSA.new(File.read(@opts[:ssl_certificate_key]), @opts[:ssl_certificate_password]) if @opts[:ssl_certificate_key]
end

private
def trust_store
store = OpenSSL::X509::Store.new

Array(@opts[:ssl_certificate_authorities]).each do |certificate_authority|
if File.file?(certificate_authority)
store.add_file(certificate_authority)
else
# add_path is no implemented under jruby
# so recursively try to load all the certificate from this directory
# https://github.com/jruby/jruby-openssl/blob/master/src/main/java/org/jruby/ext/openssl/X509Store.java#L159
if !!(RUBY_PLATFORM == "java")
Dir.glob(File.join(certificate_authority, "**", "*")).each { |f| store.add_file(f) }
else
store.add_path(certificate_authority)
end
end
end

store
end


private
def inc
@sequence = 0 if @sequence + 1 > Lumberjack::Beats::SEQUENCE_MAX
@sequence = @sequence + 1
Expand Down Expand Up @@ -134,7 +183,7 @@ def write_sync(elements, opts={})
ack(elements.size)
end

private
private
def compress_payload(payload)
compress = Zlib::Deflate.deflate(payload)
["1", "C", compress.bytesize, compress].pack("AANA*")
Expand Down
Loading

0 comments on commit 24ad7c0

Please sign in to comment.