Mark both client and server as not worker safe #14

Closed


andrewvc

This is due to IO#select not being threadsafe in JRuby

Fixes #13

@@ -38,6 +38,9 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
# event will be written as a single line.
config :message_format, :validate => :string, :deprecated => true

# respond_to? check needed for backwards compatibility with < 2.2 Logstashes
declare_workers_not_supported! if self.respond_to?(:workers_not_supported!)
Member

this should be :workers_not_supported without !, right?

~/projects/logstash (git)-[2.2] % grep workers_not_supported\! . -r
./docs/static/include/pluginbody.asciidoc:  declare_workers_not_supported! if self.respond_to?(:workers_not_supported!)
./docs/static/include/pluginbody.asciidoc:    # Does the same thing as declare_workers_not_supported!
./logstash-core/lib/logstash/outputs/base.rb:    declare_workers_not_supported!
./logstash-core/lib/logstash/outputs/base.rb:  def self.declare_workers_not_supported!(message=nil)
./logstash-core/lib/logstash/outputs/base.rb:    self.class.declare_workers_not_supported!(message)

and in 2.1.1

/tmp/logstash-2.1.1 % grep workers_not_supported\! . -r
/tmp/logstash-2.1.1 % 

Member

it seems pluginbody.asciidoc also needs to be fixed.

Author

@jsvd nope! workers_not_supported is the legacy instance method. That's why we call both in this plugin. In v3.0.0 we can remove the legacy calls.

Author

@jsvd I do see a bug here though, it should be declare_workers_not_supported! if self.respond_to?(:declare_workers_not_supported!). I need to fix that in the asciidocs as well.
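
A minimal sketch of why the guard has to name the same method it protects. `LegacyBase`, `ModernBase`, and `build_plugin` are hypothetical stand-ins made up for illustration, not the real Logstash classes; only the method names come from the discussion above.

```ruby
# Hypothetical stand-in for an older base class that only offers the
# legacy instance method. NOT the real LogStash::Outputs::Base.
class LegacyBase
  def workers_not_supported(_message = nil)
    @workers_not_supported = true
  end
end

# Hypothetical stand-in for a newer base class with the class-level call.
class ModernBase
  def self.declare_workers_not_supported!(_message = nil)
    @workers_not_supported = true
  end

  def self.workers_not_supported?
    @workers_not_supported == true
  end
end

# Builds a plugin class whose body runs the guarded declaration.
# guard_name is the symbol passed to respond_to?.
def build_plugin(base, guard_name)
  Class.new(base) do
    declare_workers_not_supported! if respond_to?(guard_name)
  end
end

# Guard matches the method being called: the declaration runs.
matched = build_plugin(ModernBase, :declare_workers_not_supported!)

# Guard names a method that does not exist: the declaration is silently skipped.
mismatched = build_plugin(ModernBase, :workers_not_supported!)

# On the older base the matching guard is simply a no-op, so the class still loads.
legacy = build_plugin(LegacyBase, :declare_workers_not_supported!)
```

With the mismatched guard nothing raises, which is why this kind of bug can go unnoticed: the plugin loads fine but is never flagged.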

@jordansissel
Contributor

Server is threaded per-connection and appears to be synchronous (doesn't use IO.select). The main thread uses Socket#accept, by my reading anyway.

Client does use IO.select, but with the old pipeline (before your filter-output merge), output workers were clones of output instances, so this was safe.

@jordansissel
Contributor

Oops, submitted before I was done.

Continuing from above "so this was safe" - can you update the commit message to reflect the cause of the tcp output w/ client mode no longer being threadsafe? (cause being the pipeline change, or never was threadsafe, or whatever?)

@@ -1,5 +1,6 @@
## 2.0.3
- Declare plugin as not worker safe for both server and client mode. (IO#select is not threadsafe)
Contributor

I think server mode is worker safe. Can you add a comment in the code linking to where IO#select is noted as not threadsafe in JRuby (docs somewhere on jruby, if possible?)

Author

The server mode was already marked as workers_not_supported before my change. Is this not the case? Server mode shouldn't be worker safe, AFAICT, given that only one worker can bind to a port.

@jsvd
Member

jsvd commented Jan 11, 2016

Note that this plugin hasn't been worker safe for the client mode before, but since the logstash-core patch that increased the number of output workers to match the filter workers, this limitation has become evident.

logstash 2.1.1:

/tmp/logstash-2.1.1 % bin/logstash -e "input { generator { count => 1000  } } output { tcp { workers => 4 port => 3333 host => localhost codec => plain { format => '%{sequence}' } } }"
Settings: Default filter workers: 2
Logstash startup completed
tcp output exception {:host=>"localhost", :port=>3333, :exception=>java.nio.channels.IllegalBlockingModeException, :backtrace=>["java.nio.channels.spi.AbstractSelectableChannel.register(AbstractSelectableChannel.java:201)", "org.jruby.util.io.SelectBlob.registerSelect(SelectBlob.java:519)", "org.jruby.util.io.SelectBlob.trySelectRead(SelectBlob.java:163)", "org.jruby.util.io.SelectBlob.processReads(SelectBlob.java:135)", "org.jruby.util.io.SelectBlob.goForIt(SelectBlob.java:63)", "org.jruby.RubyIO.select_static(RubyIO.java:3686)", "org.jruby.RubyIO.select(RubyIO.java:3682)", "org.jruby.RubyIO$INVOKER$s$0$3$select.call(RubyIO$INVOKER$s$0$3$select.gen)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:70)", "org.jruby.ast.CallManyArgsNode.interpret(CallManyArgsNode.java:59)", "org.jruby.ast.MultipleAsgn19Node.interpret(MultipleAsgn19Node.java:104)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.ast.RescueNode.executeBody(RescueNode.java:221)", "org.jruby.ast.RescueNode.interpret(RescueNode.java:116)", "org.jruby.ast.BeginNode.interpret(BeginNode.java:83)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)", "org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)", "org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)", "org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)", "org.jruby.runtime.Block.call(Block.java:101)", "org.jruby.RubyProc.call(RubyProc.java:300)", "org.jruby.RubyProc.call19(RubyProc.java:281)", "org.jruby.RubyProc$INVOKER$i$0$0$call19.call(RubyProc$INVOKER$i$0$0$call19.gen)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:218)", "org.jruby.internal.runtime.methods.DynamicMethod.call(DynamicMethod.java:214)", 
"org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:202)", "org.jruby.ast.CallTwoArgNode.interpret(CallTwoArgNode.java:59)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.IfNode.interpret(IfNode.java:116)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.FCallOneArgNode.interpret(FCallOneArgNode.java:36)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.evaluator.ASTInterpreter.INTERPRET_METHOD(ASTInterpreter.java:74)", "org.jruby.internal.runtime.methods.InterpretedMethod.call(InterpretedMethod.java:182)", "org.jruby.internal.runtime.methods.DefaultMethod.call(DefaultMethod.java:203)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:168)", "org.jruby.ast.CallOneArgNode.interpret(CallOneArgNode.java:57)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", "org.jruby.ast.WhileNode.interpret(WhileNode.java:131)", "org.jruby.ast.NewlineNode.interpret(NewlineNode.java:105)", "org.jruby.ast.BlockNode.interpret(BlockNode.java:71)", 
"org.jruby.evaluator.ASTInterpreter.INTERPRET_BLOCK(ASTInterpreter.java:112)", "org.jruby.runtime.Interpreted19Block.evalBlockBody(Interpreted19Block.java:206)", "org.jruby.runtime.Interpreted19Block.yield(Interpreted19Block.java:194)", "org.jruby.runtime.Interpreted19Block.call(Interpreted19Block.java:125)", "org.jruby.runtime.Block.call(Block.java:101)", "org.jruby.RubyProc.call(RubyProc.java:300)", "org.jruby.RubyProc.call(RubyProc.java:230)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:99)", "java.lang.Thread.run(Thread.java:745)"], :level=>:warn}
tcp output exception {:host=>"localhost", :port=>3333, :exception=>#<Errno::EBADF: Bad file descriptor - Bad file descriptor>, :backtrace=>["org/jruby/RubyIO.java:3682:in `select'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-output-tcp-2.0.2/lib/logstash/outputs/tcp.rb:101:in `register'", "org/jruby/RubyProc.java:281:in `call'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-codec-plain-2.0.2/lib/logstash/codecs/plain.rb:41:in `encode'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-output-tcp-2.0.2/lib/logstash/outputs/tcp.rb:143:in `receive'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-core-2.1.1-java/lib/logstash/outputs/base.rb:81:in `handle'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-core-2.1.1-java/lib/logstash/outputs/base.rb:71:in `worker_setup'"], :level=>:warn}
Failed (Connection refused - Connection refused). Sleeping for 0.02Failed (Connection refused - Connection refused). Sleeping for 0.02
Failed (Connection refused - Connection refused). Sleeping for 0.04
Failed (Connection refused - Connection refused). Sleeping for 0.04
Failed (Connection refused - Connection refused). Sleeping for 2.0
Failed (Connection refused - Connection refused). Sleeping for 2.0
tcp output exception {:host=>"localhost", :port=>3333, :exception=>#<ConcurrencyError: can not set IO blocking after select; concurrent select detected?>, :backtrace=>["org/jruby/RubyIO.java:3682:in `select'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-output-tcp-2.0.2/lib/logstash/outputs/tcp.rb:101:in `register'", "org/jruby/RubyProc.java:281:in `call'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-codec-plain-2.0.2/lib/logstash/codecs/plain.rb:41:in `encode'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-output-tcp-2.0.2/lib/logstash/outputs/tcp.rb:143:in `receive'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-core-2.1.1-java/lib/logstash/outputs/base.rb:81:in `handle'", "/tmp/logstash-2.1.1/vendor/bundle/jruby/1.9/gems/logstash-core-2.1.1-java/lib/logstash/outputs/base.rb:71:in `worker_setup'"], :level=>:warn}
^CSIGINT received. Shutting down the pipeline. {:level=>:warn}
^CSIGINT received. Terminating immediately.. {:level=>:fatal}

@jordansissel
Contributor

@jsvd how odd! Thanks for showing me the exception. :)

@jsvd
Member

jsvd commented Jan 11, 2016

Same in 1.5.4:

/tmp/logstash-1.5.4 % bin/logstash -e "input { generator { count => 1000  } } output { tcp { workers => 4 port => 3333 host => localhost codec => plain { format => '%{sequence}' } } }"
Logstash startup completed
Failed (Connection refused - Connection refused). Sleeping for 0.02Failed (Connection refused - Connection refused). Sleeping for 0.02
...
Failed (Connection refused - Connection refused). Sleeping for 0.04
...
Failed (Connection refused - Connection refused). Sleeping for 0.16
Failed (Connection refused - Connection refused). Sleeping for 0.32Failed (Connection refused - Connection refused). Sleeping for 0.32Failed (Connection refused - Connection refused). Sleeping for 0.32Failed (Connection refused - Connection refused). Sleeping for 0.32
Failed (Connection refused - Connection refused). Sleeping for 0.64
Failed (Connection reset by peer). Sleeping for 1.28
Failed (Connection reset by peer). Sleeping for 1.28
tcp output exception {:host=>"localhost", :port=>3333, :exception=>#<ConcurrencyError: can not set IO blocking after select; concurrent select detected?>, :backtrace=>["org/jruby/RubyIO.java:3682:in `select'", "/tmp/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-tcp-1.0.0/lib/logstash/outputs/tcp.rb:101:in `register'", "org/jruby/RubyProc.java:271:in `call'", "/tmp/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-codec-plain-1.0.0/lib/logstash/codecs/plain.rb:41:in `encode'", "/tmp/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-output-tcp-1.0.0/lib/logstash/outputs/tcp.rb:143:in `receive'", "/tmp/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/outputs/base.rb:88:in `handle'", "/tmp/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/outputs/base.rb:79:in `worker_setup'"], :level=>:warn}
^CSIGINT received. Shutting down the pipeline. {:level=>:warn}
^CSIGINT received. Terminating immediately.. {:level=>:fatal}

@andrewvc
Author

@jordansissel looking at the 2.1 codebase, it looks like we were just creating new instances, not cloning (see here).

I think this has always been a bug.

@andrewvc
Author

@jsvd has confirmed that 2.1 had this same issue.

@jsvd
Member

jsvd commented Jan 11, 2016

I cannot get 1.4.2 to do the same, so it seems the logic indeed changed from 1.4 to 1.5

@jsvd
Member

jsvd commented Jan 12, 2016

I'm trying to understand why the client isn't threadsafe: so multiple pipeline workers (N threads) will reuse the same output plugin which has multiple workers (M) to send data out.

So, potentially, all N threads can, at some point, use the same worker out of M and that is why more than 1 thread does a select on the same tcp socket?

Shouldn't the access to the output worker pool be synchronized?
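
For illustration only, here is one way a pool could hand each worker to a single thread at a time, using a `SizedQueue` as the checkout mechanism. This is a sketch under assumptions, not the actual Logstash pipeline code; `Worker`, `handle`, and the counts are hypothetical.

```ruby
require "thread"

# Hypothetical worker that records whether two threads ever used it at once.
class Worker
  attr_reader :overlap, :handled

  def initialize
    @busy = false
    @overlap = false
    @handled = 0
  end

  def handle(_event)
    @overlap = true if @busy   # another thread is already inside
    @busy = true
    sleep 0.001                # widen the window for a potential overlap
    @handled += 1
    @busy = false
  end
end

worker_count = 2   # M output workers (assumed value)
thread_count = 4   # N pipeline threads (assumed value)

workers = Array.new(worker_count) { Worker.new }
pool = SizedQueue.new(worker_count)
workers.each { |w| pool.push(w) }

threads = Array.new(thread_count) do
  Thread.new do
    20.times do |i|
      worker = pool.pop          # blocks until a worker is free
      begin
        worker.handle(i)
      ensure
        pool.push(worker)        # always return the worker to the pool
      end
    end
  end
end
threads.each(&:join)
```

Because a worker is out of the queue while one thread holds it, no second thread can reach the same worker until it is pushed back.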

@andrewvc
Author

@jsvd yes, but I assumed that, under the JMM, using the SizedQueue synchronizes all the variables in the thread. I need to reread https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.4.5 but IIRC that is the case.

I could be wrong however. Maybe we do need to have a per-output synchronized block.

HOWEVER I doubt that because the problem goes away with one worker. The object is shared across threads in that case as well.
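
A rough sketch of what a per-output synchronized block could look like, in case it helps the discussion. This is not what this patch does (the patch declares workers as unsupported), and `SynchronizedOutput` and the `writer` lambda are hypothetical names standing in for the plugin and its socket write.

```ruby
require "thread"

# Hypothetical wrapper that serializes calls into a writer that is
# assumed not to be thread-safe.
class SynchronizedOutput
  def initialize(writer)
    @writer = writer
    @lock = Mutex.new
  end

  def receive(event)
    # Only one thread at a time may run the writer.
    @lock.synchronize { @writer.call(event) }
  end
end

# Stand-in writer that records whether two threads were ever inside it at once.
in_flight = 0
overlap_seen = false
written = []
writer = lambda do |event|
  in_flight += 1
  overlap_seen = true if in_flight > 1
  sleep 0.001            # widen the window for a potential overlap
  written << event
  in_flight -= 1
end

output = SynchronizedOutput.new(writer)
threads = Array.new(4) do |n|
  Thread.new { 25.times { |i| output.receive("#{n}-#{i}") } }
end
threads.each(&:join)
```

The trade-off is that every write is serialized through one lock, so extra workers add no throughput for this output.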

@jsvd jsvd added the P2 label Apr 26, 2016
@andrewvc andrewvc force-pushed the declare_workers_not_supported branch from 315d561 to 6f27ea0 Compare June 2, 2016 19:20
@andrewvc andrewvc force-pushed the declare_workers_not_supported branch from 6f27ea0 to 5c3e73d Compare June 2, 2016 19:21
@andrewvc
Author

andrewvc commented Jun 2, 2016

@jsvd @jordansissel can you guys help follow-up with me on this patch here?

@jordansissel
Contributor

LGTM - +1 on marking this plugin (both server and client) as not worker/thread-safe until we can make it safe.

@jsvd
Member

jsvd commented Oct 12, 2016

this was addressed in #21

@jsvd jsvd closed this Oct 12, 2016