From 3edfa2336ebf953953d1b7fb6550e62b2629c5f1 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 13 Dec 2017 21:14:12 -0600 Subject: [PATCH] Fix a possible race where Thread.interrupted was not properly cleared This should keep perf even --- CHANGELOG.md | 3 ++ lib/logstash/filters/grok/timeout_enforcer.rb | 51 ++++++------------- logstash-filter-grok.gemspec | 2 +- 3 files changed, 20 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 178a55c..79d080a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.0.1 + - Fix a potential race + ## 4.0.0 - Major performance improvements due to reduced locking diff --git a/lib/logstash/filters/grok/timeout_enforcer.rb b/lib/logstash/filters/grok/timeout_enforcer.rb index f826038..7d5145b 100644 --- a/lib/logstash/filters/grok/timeout_enforcer.rb +++ b/lib/logstash/filters/grok/timeout_enforcer.rb @@ -19,19 +19,15 @@ def grok_till_timeout(grok, field, value) grok.execute(value) rescue InterruptedRegexpError => e raise ::LogStash::Filters::Grok::TimeoutException.new(grok, field, value) + rescue java.lang.InterruptedException => e + # NOOP, we don't expect these, but maybe some interruptible thing could be + # added to grok besides regexps + @logger.debug("Unexpected interruptible caught during grok. This isn't a problem most likely") ensure - unless stop_thread_groking(thread) - @cancel_mutex.lock - begin - # Clear any interrupts from any previous invocations that were not caught by Joni - # It may appear that this should go in #stop_thread_groking but that would actually - # break functionality! If this were moved there we would clear the interrupt - # immediately after setting it in #cancel_timed_out, hence this MUST be here - java.lang.Thread.interrupted - ensure - @cancel_mutex.unlock - end - end + # If the regexp finished, but interrupt was called after, we'll want to + # clear the interrupted status anyway + @threads_to_start_time.remove(thread) + java.lang.Thread.interrupted end end @@ -72,31 +68,16 @@ def start_thread_groking(thread) @threads_to_start_time.put(thread, java.lang.System.nanoTime) end - # Returns falsy in case there was no Grok execution in progress for the thread - def stop_thread_groking(thread) - @threads_to_start_time.remove(thread) - end - def cancel_timed_out! now = java.lang.System.nanoTime # save ourselves some nanotime calls - @threads_to_start_time.entry_set.each do |entry| - start_time = entry.get_value - if start_time < now && now - start_time > @timeout_nanos - thread = entry.get_key - # Ensure that we never attempt to cancel this thread unless a Grok execution is in progress - # Theoretically there is a race condition here in case the entry's grok action changed - # between evaluating the above condition on the start_time and calling stop_thread_groking - # Practically this is impossible, since it would require a whole loop of writing to an - # output, pulling new input events and starting a new Grok execution in worker thread - # in between the above `if start_time < now && now - start_time > @timeout_nanos` and - # the call to `stop_thread_groking`. - if stop_thread_groking(thread) - @cancel_mutex.lock - begin - thread.interrupt() - ensure - @cancel_mutex.unlock - end + @threads_to_start_time.forEach do |thread, start_time| + # Use compute to lock this value + @threads_to_start_time.compute(thread) do |thread, start_time| + if start_time && start_time < now && now - start_time > @timeout_nanos + thread.interrupt + nil # Delete the key + else + start_time # preserve the key end end end diff --git a/logstash-filter-grok.gemspec b/logstash-filter-grok.gemspec index 2d4389d..e48d5dd 100644 --- a/logstash-filter-grok.gemspec +++ b/logstash-filter-grok.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-filter-grok' - s.version = '4.0.0' + s.version = '4.0.1' s.licenses = ['Apache License (2.0)'] s.summary = "Parses unstructured event data into fields" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"