Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a possible race where Thread.interrupted was not properly cleared #131

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 4.0.1
- Fix a potential race

## 4.0.0
- Major performance improvements due to reduced locking

Expand Down
77 changes: 22 additions & 55 deletions lib/logstash/filters/grok/timeout_enforcer.rb
@@ -1,44 +1,37 @@
class LogStash::Filters::Grok::TimeoutEnforcer
attr_reader :running

def initialize(logger, timeout_nanos)
@logger = logger
@running = false
@running = java.util.concurrent.atomic.AtomicBoolean.new(false)
@timeout_nanos = timeout_nanos

# Stores running matches with their start time, this is used to cancel long running matches
# Is a map of Thread => start_time
@threads_to_start_time = java.util.concurrent.ConcurrentHashMap.new
@cancel_mutex = Mutex.new
end

def running
@running.get()
end

def grok_till_timeout(grok, field, value)
begin
thread = java.lang.Thread.currentThread()
start_thread_groking(thread)
@threads_to_start_time.put(thread, java.lang.System.nanoTime)
grok.execute(value)
rescue InterruptedRegexpError => e
rescue InterruptedRegexpError, java.lang.InterruptedException => e
raise ::LogStash::Filters::Grok::TimeoutException.new(grok, field, value)
ensure
unless stop_thread_groking(thread)
@cancel_mutex.lock
begin
# Clear any interrupts from any previous invocations that were not caught by Joni
# It may appear that this should go in #stop_thread_groking but that would actually
# break functionality! If this were moved there we would clear the interrupt
# immediately after setting it in #cancel_timed_out, hence this MUST be here
java.lang.Thread.interrupted
ensure
@cancel_mutex.unlock
end
end
# If the regexp finished, but interrupt was called after, we'll want to
# clear the interrupted status anyway
@threads_to_start_time.remove(thread)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even though this is class-internal, this change introduces something of a mixed abstraction to this method -- we register this thread via start_thread_groking and unregister it by reaching directly into an ivar; if we can keep this method at a single level of abstraction and continue to use the stop_thread_groking that this change also removes to unregister, I believe it will provide greater long-term clarity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opted to remove start_thread_groking and just inline its one call. stop_thread_grokking would only be called once from one spot since there isn't a single way to stop it. In one spot we must compute in another remove.

WDYT of how it looks now?

Copy link
Contributor

@yaauie yaauie Dec 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eh, in an ideal world we'd have better encapsulation (e.g., both grok_till_timeout and cancel_time_out! reach into the @threads_to_start_time ivar directly to muck with its internals), but at least this method isn't mixing abstractions.

EDIT: what better encapsulation? this is literally in a class that only encapsulates the timeout enforcement logic. Clearly I need more coffee.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

☕️

thread.interrupted
end
end

def start!
@running = true
@running.set(true)
@timer_thread = Thread.new do
while @running
while @running.get()
begin
cancel_timed_out!
rescue Exception => e
Expand All @@ -54,49 +47,23 @@ def start!
end

def stop!
@running = false
@running.set(false)
# Check for the thread mostly for a fast start/shutdown scenario
@timer_thread.join if @timer_thread
end

private

# These methods are private in large part because if they aren't called
# in specific sequence and used together in specific ways the interrupt
# behavior will be incorrect. Do NOT use or modify these methods unless
# you know what you are doing!

def start_thread_groking(thread)
# Clear any interrupts from any previous invocations that were not caught by Joni
java.lang.Thread.interrupted
@threads_to_start_time.put(thread, java.lang.System.nanoTime)
end

# Returns falsy in case there was no Grok execution in progress for the thread
def stop_thread_groking(thread)
@threads_to_start_time.remove(thread)
end

def cancel_timed_out!
now = java.lang.System.nanoTime # save ourselves some nanotime calls
@threads_to_start_time.entry_set.each do |entry|
start_time = entry.get_value
if start_time < now && now - start_time > @timeout_nanos
thread = entry.get_key
# Ensure that we never attempt to cancel this thread unless a Grok execution is in progress
# Theoretically there is a race condition here in case the entry's grok action changed
# between evaluating the above condition on the start_time and calling stop_thread_groking
# Practically this is impossible, since it would require a whole loop of writing to an
# output, pulling new input events and starting a new Grok execution in worker thread
# in between the above `if start_time < now && now - start_time > @timeout_nanos` and
# the call to `stop_thread_groking`.
if stop_thread_groking(thread)
@cancel_mutex.lock
begin
thread.interrupt()
ensure
@cancel_mutex.unlock
end
@threads_to_start_time.forEach do |thread, start_time|
# Use compute to lock this value
@threads_to_start_time.computeIfPresent(thread) do |thread, start_time|
if start_time < now && now - start_time > @timeout_nanos
thread.interrupt
nil # Delete the key
else
start_time # preserve the key
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion logstash-filter-grok.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-filter-grok'
s.version = '4.0.0'
s.version = '4.0.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Parses unstructured event data into fields"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down