Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 3.1.2
- Fix: eliminate high CPU usage when data timeout is disabled and no data is available on the socket [#30](https://github.com/logstash-plugins/logstash-input-unix/pull/30)

## 3.1.1
- Fix: unable to stop plugin (on LS 6.x) [#29](https://github.com/logstash-plugins/logstash-input-unix/pull/29)
- Refactor: plugin internals got reviewed for `data_timeout => ...` to work reliably
Expand Down
48 changes: 38 additions & 10 deletions lib/logstash/inputs/unix.rb
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,15 @@ def handle_socket(socket, output_queue)
begin
hostname = Socket.gethostname
while !stop?
data = socket.read_nonblock(16384, exception: false)

if data == :wait_readable
if @data_timeout == -1 || IO.select([socket], nil, nil, @data_timeout)
next # retry socket read
else
# socket not ready after @data_timeout seconds
@logger.info("Closing connection after read timeout", :path => @path)
return
end
data = io_interruptable_readpartial(socket, 16384, @data_timeout)

if data == :data_timeout
# socket not ready after @data_timeout seconds
@logger.info("Closing connection after read timeout", :path => @path)
return
elsif data == :stopping
@logger.trace("Shutdown in progress", :path => @path)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we make the log info level for better visibility?

Copy link
Contributor Author

@yaauie yaauie Oct 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I intentionally added at trace level because this is noise about the plugin stopping reads from the socket that is not likely to be useful when a user is going about their normal business and already has signal about the plugin and pipeline getting closed.

next # let next loop handle graceful stop
end

@codec.decode(data) do |event|
Expand All @@ -118,6 +117,35 @@ def handle_socket(socket, output_queue)
end
end

##
# Emulates `IO#readpartial` with a timeout and our plugin's stop-condition,
# limiting blocking calls to windows of 10s or less to ensure it can be interrupted.
#
# @param readable_io [IO] the IO to read from
# @param maxlen [Integer] the max bytes to be read
# @param timeout [Number] the maximum number of seconds to , or -1 to disable timeouts
#
# @return [:data_timeout] if timeout was reached before bytes were available
# @return [:stopping] if plugin stop-condition was detected before bytes were available
# @return [String] a non-empty string if bytes became available before the timeout was reached
def io_interruptable_readpartial(readable_io, maxlen, timeout)

data_timeout_deadline = timeout < 0 ? nil : Time.now + timeout
maximum_blocking_seconds = timeout < 0 || timeout > 10 ? 10 : timeout

loop do
return :stopping if stop?
result = readable_io.read_nonblock(maxlen, exception: false)

return result if result.kind_of?(String)
raise EOFError if result.nil?
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would be a case where esult.nil? is true?

Copy link

@mashhurs mashhurs Oct 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on IO#read_nonblock doc:
At EOF, it will return nil instead of raising EOFError.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. When using IO#read_nonblock with exception: false we must handle all of the failure conditions ourselves.


return :data_timeout if (data_timeout_deadline && data_timeout_deadline < Time.now)
IO.select([readable_io], nil, nil, maximum_blocking_seconds)
end
end
private :io_interruptable_readpartial

private
def server?
@mode == "server"
Expand Down
2 changes: 1 addition & 1 deletion logstash-input-unix.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-input-unix'
s.version = '3.1.1'
s.version = '3.1.2'
s.licenses = ['Apache License (2.0)']
s.summary = "Reads events over a UNIX socket"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down