Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix a possible race where Thread.interrupted was not properly cleared #131

Closed
wants to merge 1 commit into from

Conversation

andrewvc
Copy link
Contributor

@andrewvc andrewvc commented Dec 14, 2017

This should keep perf close enough. It's a little slower, but worth it for the consistency.

Old perf as measured with time:
54.57 real 216.58 user 3.51 sys
New perf as measured with time:
59.58 real 238.70 user 4.21 sys

Test config:

# MULTIPLE grok filters
input {
    generator {
        type => foo
        message => "random message, la la la"
        count => 1000000
    }
}

filter {
    grok {
        match => {
            "message" => "^1 foo 1 bar$"
        }
    }
    grok {
        match => {
            "message" => "^2 foo 2 bar$"
        }
    }
    grok {
        match => {
            "message" => "^3 foo 3 bar$"
        }
    }
    grok {
        match => {
            "message" => "^4 foo 4 bar$"
        }
    }
    grok {
        match => {
            "message" => "^5 foo 5 bar$"
        }
    }
    grok {
        match => {
            "message" => "^6 foo 6 bar$"
        }
    }
    grok {
        match => {
            "message" => "^7 foo 7 bar$"
        }
    }
    grok {
        match => {
            "message" => "^8 foo 8 bar$"
        }
    }
    grok {
        match => {
            "message" => "^9 foo 9 bar$"
        }
    }
    grok {
        match => {
            "message" => "^10 foo 10 bar$"
        }
    }
    grok {
        match => {
            "message" => "^11 foo 11 bar$"
        }
    }
    grok {
        match => {
            "message" => "^12 foo 12 bar$"
        }
    }
    grok {
        match => {
            "message" => "^13 foo 13 bar$"
        }
    }
    grok {
        match => {
            "message" => "^14 foo 14 bar$"
        }
    }
    grok {
        match => {
            "message" => "^15 foo 15 bar$"
        }
    }
    grok {
        match => {
            "message" => "^16 foo 16 bar$"
        }
    }
    grok {
        match => {
            "message" => "^17 foo 17 bar$"
        }
    }
    grok {
        match => {
            "message" => "^18 foo 18 bar$"
        }
    }
    grok {
        match => {
            "message" => "^19 foo 19 bar$"
        }
    }
    grok {
        match => {
            "message" => "^20 foo 20 bar$"
        }
    }

    metrics {
        meter => "events"
        add_tag => "metric"
    }
}

output {
    if "metric" in [tags] {
        stdout {
            codec => line {
                format => "rate_1m: %{[events][rate_1m]}, rate_5m: %{[events][rate_5m]}"
            }
        }
    }
}

rescue java.lang.InterruptedException => e
# NOOP, we don't expect these, but maybe some interruptible thing could be
# added to grok besides regexps
@logger.debug("Unexpected interruptible caught during grok. This isn't a problem most likely")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since the grok has likely been interrupted, shouldn't we either log at a higher, warn or error or bubble up the exception?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, good point, yes, it should be a warn.

@@ -72,31 +68,16 @@ def start_thread_groking(thread)
@threads_to_start_time.put(thread, java.lang.System.nanoTime)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one thing confusing here and in the grok_till_timeout method is that the current thread is passed as the thread parameter but the current thread is also referenced implicitly using the java.lang.Thread.xxx.
AFAIU both should be the same thread so I wonder for clarity sake if we should not use one thread reference notation?

java.lang.Thread.interrupted
@threads_to_start_time.put(java.lang.Thread.currentThread(), java.lang.System.nanoTime)

or

thread.interrupted
@threads_to_start_time.put(thread, java.lang.System.nanoTime)

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on standardizing on the instance

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, this is now done

@colinsurprenant
Copy link
Contributor

Left some minor notes, the thread interruption handling logic seems good.

end
# If the regexp finished, but interrupt was called after, we'll want to
# clear the interrupted status anyway
@threads_to_start_time.remove(thread)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even though this is class-internal, this change introduces something of a mixed abstraction to this method -- we register this thread via start_thread_groking and unregister it by reaching directly into an ivar; if we can keep this method at a single level of abstraction and continue to use the stop_thread_groking that this change also removes to unregister, I believe it will provide greater long-term clarity.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've opted to remove start_thread_groking and just inline its one call. stop_thread_grokking would only be called once from one spot since there isn't a single way to stop it. In one spot we must compute in another remove.

WDYT of how it looks now?

Copy link
Contributor

@yaauie yaauie Dec 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eh, in an ideal world we'd have better encapsulation (e.g., both grok_till_timeout and cancel_time_out! reach into the @threads_to_start_time ivar directly to muck with its internals), but at least this method isn't mixing abstractions.

EDIT: what better encapsulation? this is literally in a class that only encapsulates the timeout enforcement logic. Clearly I need more coffee.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

☕️

# If the regexp finished, but interrupt was called after, we'll want to
# clear the interrupted status anyway
@threads_to_start_time.remove(thread)
java.lang.Thread.interrupted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we be sending java.lang.Thread#isInterrupted() to our thread? It's odd that we both use a reference for the current thread and also rely on static methods that target the current thread.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isInterrupted() does not clear the interrupted state, interrupted() does. In other words, if the thread is in an interrupted state, calling twice interrupted() will return true then false.

I agree we should use thread.interrupted() clarity/consistency ... but practically this has no impact since thread is the current thread too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm +1 on using the local thread object for clarity

@threads_to_start_time.compute(thread) do |thread, start_time|
if start_time && start_time < now && now - start_time > @timeout_nanos
thread.interrupt
nil # Delete the key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a little unsure about the jruby/java boundary here -- nil is not null, and while I can find documentation that the coersion happens from Java to Ruby, I'm not finding anything explicit about the other direction.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

$ irb
irb(main):001:0> require "java"
=> false
irb(main):002:0> h = java.util.concurrent.ConcurrentHashMap.new
=> {}
irb(main):003:0> h.inspect
=> "{}"
irb(main):004:0> h.put("a", 1)
=> nil
irb(main):005:0> h.inspect
=> "{\"a\"=>1}"
irb(main):006:0> h.compute("a") {|v| nil}
=> nil
irb(main):007:0> h.inspect
=> "{}"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for confirming this behavior colin!

@colinsurprenant
Copy link
Contributor

nit: I noticed that @cancel_mutex is useless now, we could remove it.

@colinsurprenant
Copy link
Contributor

@andrewvc @jordansissel following up on our conversation about ConcurrentHashMap forEach behaviour in this situation:

    @threads_to_start_time.forEach do |thread, start_time|
      @threads_to_start_time.compute(thread) do |thread, start_time|
      ...
      end
    end

From what I can read in https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/package-summary.html#Weakly

Most concurrent Collection implementations (including most Queues) also differ from the usual java.util conventions in that their Iterators and Spliterators provide weakly consistent rather than fast-fail traversal:

they may proceed concurrently with other operations
they will never throw ConcurrentModificationException
they are guaranteed to traverse elements as they existed upon construction exactly once, and may (but are not guaranteed to) reflect any modifications subsequent to construction.

So I am not sure about the behaviour of forEach but I would suspect it is also weakly consistent so I think instead of using @threads_to_start_time.compute we could simply use @threads_to_start_time.computeIfPresent and that would account for the possibility of having the thread removed from the collection while in the forEach loop.

@andrewvc
Copy link
Contributor Author

@colinsurprenant makes sense to move to computeIfPresent, I'll make that improvement.

@andrewvc
Copy link
Contributor Author

I changed this in a few ways:

  1. Inlined start_thread_grokking method for simplicity (it was called in one spot)
  2. Removed the warning for non grok interrupted errors, we'll just report those as regexp interrupted errors, this is a boundary condition that occurs only when regexps take too long
  3. Switched from compute to computeIfPresent
  4. Removed an overly conservative call to thread.interrupted before we start grokking. This is unnecessary

@colinsurprenant
Copy link
Contributor

@andrewvc nit: just noticed that @cancel_mutex is still defined but unused

@andrewvc
Copy link
Contributor Author

@colinsurprenant just removed that extra line, good catch!

@colinsurprenant
Copy link
Contributor

Another observation: this is not part of the change set but might be a good idea to modify: the @running bool is used across threads to control termination, I'd suggest we make it an AtomicBoolean to make it explicitly threadsafe.

@colinsurprenant
Copy link
Contributor

@andrewvc I leave it to you to decide for @running since I believe this will not have practical impact.
LGTM!
Really good job on this!!

@andrewvc
Copy link
Contributor Author

@colinsurprenant moved @running to an atomic boolean, good catch. Apparently it didn't cause a problem before, but it definitely wasn't right.

Copy link
Contributor

@yaauie yaauie left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

end
# If the regexp finished, but interrupt was called after, we'll want to
# clear the interrupted status anyway
@threads_to_start_time.remove(thread)
Copy link
Contributor

@yaauie yaauie Dec 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eh, in an ideal world we'd have better encapsulation (e.g., both grok_till_timeout and cancel_time_out! reach into the @threads_to_start_time ivar directly to muck with its internals), but at least this method isn't mixing abstractions.

EDIT: what better encapsulation? this is literally in a class that only encapsulates the timeout enforcement logic. Clearly I need more coffee.

@elasticsearch-bot
Copy link

Andrew Cholakian merged this into the following branches!

Branch Commits
master 82ec779

@PhaedrusTheGreek
Copy link

@andrewvc , Is this fix available in LS 5.5.2?

Seems like I have only 3.4.4, and If I understand correctly, this fix is in 4.0.1 ?

$  bin/logstash-plugin update logstash-filter-grok
Updating logstash-filter-grok
Updated logstash-filter-grok 3.4.2 to 3.4.4

@PhaedrusTheGreek
Copy link

Just confirmed I was able to upgrade in LS 5.6.3

$ bin/logstash-plugin update logstash-filter-grok
Updating logstash-filter-grok
Updated logstash-filter-grok 4.0.0 to 4.0.1

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants