Skip to content

Commit

Permalink
Incorporate various PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewvc committed Dec 15, 2017
1 parent 3edfa23 commit d213869
Showing 1 changed file with 5 additions and 21 deletions.
26 changes: 5 additions & 21 deletions lib/logstash/filters/grok/timeout_enforcer.rb
Expand Up @@ -9,25 +9,20 @@ def initialize(logger, timeout_nanos)
# Stores running matches with their start time, this is used to cancel long running matches
# Is a map of Thread => start_time
@threads_to_start_time = java.util.concurrent.ConcurrentHashMap.new
@cancel_mutex = Mutex.new
end

def grok_till_timeout(grok, field, value)
begin
thread = java.lang.Thread.currentThread()
start_thread_groking(thread)
@threads_to_start_time.put(thread, java.lang.System.nanoTime)
grok.execute(value)
rescue InterruptedRegexpError => e
rescue InterruptedRegexpError, java.lang.InterruptedException => e
raise ::LogStash::Filters::Grok::TimeoutException.new(grok, field, value)
rescue java.lang.InterruptedException => e
# NOOP, we don't expect these, but maybe some interruptible thing could be
# added to grok besides regexps
@logger.debug("Unexpected interruptible caught during grok. This isn't a problem most likely")
ensure
# If the regexp finished, but interrupt was called after, we'll want to
# clear the interrupted status anyway
@threads_to_start_time.remove(thread)
java.lang.Thread.interrupted
thread.interrupted
end
end

Expand Down Expand Up @@ -57,23 +52,12 @@ def stop!

private

# These methods are private in large part because if they aren't called
# in specific sequence and used together in specific ways the interrupt
# behavior will be incorrect. Do NOT use or modify these methods unless
# you know what you are doing!

def start_thread_groking(thread)
# Clear any interrupts from any previous invocations that were not caught by Joni
java.lang.Thread.interrupted
@threads_to_start_time.put(thread, java.lang.System.nanoTime)
end

def cancel_timed_out!
now = java.lang.System.nanoTime # save ourselves some nanotime calls
@threads_to_start_time.forEach do |thread, start_time|
# Use compute to lock this value
@threads_to_start_time.compute(thread) do |thread, start_time|
if start_time && start_time < now && now - start_time > @timeout_nanos
@threads_to_start_time.computeIfPresent(thread) do |thread, start_time|
if start_time < now && now - start_time > @timeout_nanos
thread.interrupt
nil # Delete the key
else
Expand Down

0 comments on commit d213869

Please sign in to comment.