diff --git a/CHANGELOG.md b/CHANGELOG.md index c6028cd..4072926 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 3.1.2 + - Fix: eliminate high CPU usage when data timeout is disabled and no data is available on the socket [#30](https://github.com/logstash-plugins/logstash-input-unix/pull/30) + ## 3.1.1 - Fix: unable to stop plugin (on LS 6.x) [#29](https://github.com/logstash-plugins/logstash-input-unix/pull/29) - Refactor: plugin internals got reviewed for `data_timeout => ...` to work reliably diff --git a/lib/logstash/inputs/unix.rb b/lib/logstash/inputs/unix.rb index ebaf26e..2863f27 100644 --- a/lib/logstash/inputs/unix.rb +++ b/lib/logstash/inputs/unix.rb @@ -84,16 +84,15 @@ def handle_socket(socket, output_queue) begin hostname = Socket.gethostname while !stop? - data = socket.read_nonblock(16384, exception: false) - - if data == :wait_readable - if @data_timeout == -1 || IO.select([socket], nil, nil, @data_timeout) - next # retry socket read - else - # socket not ready after @data_timeout seconds - @logger.info("Closing connection after read timeout", :path => @path) - return - end + data = io_interruptable_readpartial(socket, 16384, @data_timeout) + + if data == :data_timeout + # socket not ready after @data_timeout seconds + @logger.info("Closing connection after read timeout", :path => @path) + return + elsif data == :stopping + @logger.trace("Shutdown in progress", :path => @path) + next # let next loop handle graceful stop end @codec.decode(data) do |event| @@ -118,6 +117,35 @@ def handle_socket(socket, output_queue) end end + ## + # Emulates `IO#readpartial` with a timeout and our plugin's stop-condition, + # limiting blocking calls to windows of 10s or less to ensure it can be interrupted. + # + # @param readable_io [IO] the IO to read from + # @param maxlen [Integer] the max bytes to be read + # @param timeout [Number] the maximum number of seconds to , or -1 to disable timeouts + # + # @return [:data_timeout] if timeout was reached before bytes were available + # @return [:stopping] if plugin stop-condition was detected before bytes were available + # @return [String] a non-empty string if bytes became available before the timeout was reached + def io_interruptable_readpartial(readable_io, maxlen, timeout) + + data_timeout_deadline = timeout < 0 ? nil : Time.now + timeout + maximum_blocking_seconds = timeout < 0 || timeout > 10 ? 10 : timeout + + loop do + return :stopping if stop? + result = readable_io.read_nonblock(maxlen, exception: false) + + return result if result.kind_of?(String) + raise EOFError if result.nil? + + return :data_timeout if (data_timeout_deadline && data_timeout_deadline < Time.now) + IO.select([readable_io], nil, nil, maximum_blocking_seconds) + end + end + private :io_interruptable_readpartial + private def server? @mode == "server" diff --git a/logstash-input-unix.gemspec b/logstash-input-unix.gemspec index 68b2678..bfed81e 100644 --- a/logstash-input-unix.gemspec +++ b/logstash-input-unix.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-input-unix' - s.version = '3.1.1' + s.version = '3.1.2' s.licenses = ['Apache License (2.0)'] s.summary = "Reads events over a UNIX socket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"