Skip to content

Commit

Permalink
Fix a possible race where Thread.interrupted was not properly cleared
Browse files Browse the repository at this point in the history
This should keep perf even
  • Loading branch information
andrewvc committed Dec 14, 2017
1 parent 9931349 commit 3edfa23
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 36 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 4.0.1
- Fix a potential race

## 4.0.0
- Major performance improvements due to reduced locking

Expand Down
51 changes: 16 additions & 35 deletions lib/logstash/filters/grok/timeout_enforcer.rb
Expand Up @@ -19,19 +19,15 @@ def grok_till_timeout(grok, field, value)
grok.execute(value)
rescue InterruptedRegexpError => e
raise ::LogStash::Filters::Grok::TimeoutException.new(grok, field, value)
rescue java.lang.InterruptedException => e
# NOOP, we don't expect these, but maybe some interruptible thing could be
# added to grok besides regexps
@logger.debug("Unexpected interruptible caught during grok. This isn't a problem most likely")
ensure
unless stop_thread_groking(thread)
@cancel_mutex.lock
begin
# Clear any interrupts from any previous invocations that were not caught by Joni
# It may appear that this should go in #stop_thread_groking but that would actually
# break functionality! If this were moved there we would clear the interrupt
# immediately after setting it in #cancel_timed_out, hence this MUST be here
java.lang.Thread.interrupted
ensure
@cancel_mutex.unlock
end
end
# If the regexp finished, but interrupt was called after, we'll want to
# clear the interrupted status anyway
@threads_to_start_time.remove(thread)
java.lang.Thread.interrupted
end
end

Expand Down Expand Up @@ -72,31 +68,16 @@ def start_thread_groking(thread)
@threads_to_start_time.put(thread, java.lang.System.nanoTime)
end

# Returns falsy in case there was no Grok execution in progress for the thread
def stop_thread_groking(thread)
@threads_to_start_time.remove(thread)
end

def cancel_timed_out!
now = java.lang.System.nanoTime # save ourselves some nanotime calls
@threads_to_start_time.entry_set.each do |entry|
start_time = entry.get_value
if start_time < now && now - start_time > @timeout_nanos
thread = entry.get_key
# Ensure that we never attempt to cancel this thread unless a Grok execution is in progress
# Theoretically there is a race condition here in case the entry's grok action changed
# between evaluating the above condition on the start_time and calling stop_thread_groking
# Practically this is impossible, since it would require a whole loop of writing to an
# output, pulling new input events and starting a new Grok execution in worker thread
# in between the above `if start_time < now && now - start_time > @timeout_nanos` and
# the call to `stop_thread_groking`.
if stop_thread_groking(thread)
@cancel_mutex.lock
begin
thread.interrupt()
ensure
@cancel_mutex.unlock
end
@threads_to_start_time.forEach do |thread, start_time|
# Use compute to lock this value
@threads_to_start_time.compute(thread) do |thread, start_time|
if start_time && start_time < now && now - start_time > @timeout_nanos
thread.interrupt
nil # Delete the key
else
start_time # preserve the key
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-filter-grok.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-grok'
s.version = '4.0.0'
s.version = '4.0.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Parses unstructured event data into fields"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down

0 comments on commit 3edfa23

Please sign in to comment.