ISPN-11385 Convert Remote Command Executor to Non blocking/blocking t… #7997
Preview to confirm other CI modules pass. With this, all the regular thread pools are combined into the non-blocking and blocking ones!
please run performance tests please
core/src/main/java/org/infinispan/commands/functional/AbstractWriteManyCommand.java
75e1aa2
to
0574783
Compare
run performance tests please
Only one related test failure in the previous run, which should now be fixed (the AsyncInvocationTest thread leak). Rerunning with that fix and with the canBlock method deprecated.
9c073f6
to
57a1afb
Compare
CI is all good: https://ci.infinispan.org/job/Infinispan/job/PR-7997/6/
core/src/main/java/org/infinispan/globalstate/impl/GlobalConfigurationManagerImpl.java
core/src/main/java/org/infinispan/manager/DefaultCacheManager.java
core/src/main/java/org/infinispan/manager/impl/ReplicableManagerFunctionCommand.java
core/src/main/java/org/infinispan/remoting/inboundhandler/GlobalInboundInvocationHandler.java
core/src/main/java/org/infinispan/topology/ClusterTopologyManagerImpl.java
core/src/main/java/org/infinispan/util/CoreBlockHoundIntegration.java
core/src/test/java/org/infinispan/util/CoreTestBlockHoundIntegration.java
@@ -67,6 +73,10 @@ private static void allowTestsToBlock(BlockHound.Builder builder) {
       CommonsBlockHoundIntegration.allowPublicMethodsToBlock(builder, NotifierLatch.class);
       CommonsBlockHoundIntegration.allowPublicMethodsToBlock(builder, TestBlocking.class);
+
+      CommonsBlockHoundIntegration.allowMethodsToBlock(builder, Class.forName(ReplListener.class.getName() + "$ReplListenerInterceptor"), false);
Assuming this is just for the Thread.sleep() call, I think it would be better to add an executor parameter to TestingUtil.delayed() and to inject the non-blocking executor in ReplListenerInterceptor.
Actually, I'm not sure why I didn't just add TestingUtil#sleepThread to the exception list. Let me try that instead.
Ah, it was because of logCommand, which acquires a lock. I think I will leave it as is for now.
But I can also add TestingUtil#sleepThread as okay to block.
+1 to add TestingUtil#sleepThread
Seems to work fine without the ReplListenerInterceptor exception now.
b3bd615
to
ad99dea
Compare
Actually, now that the persistence checks are back in, they found more issues, so I am still working on this.
It looks like it should be fixed now; I will let CI sort it out :)
@@ -1139,11 +1138,14 @@ public ClusterExecutor executor() {
      if (transport != null) {
         long time = configurationManager.getGlobalConfiguration().transport().distributedSyncTimeout();
         return ClusterExecutors.allSubmissionExecutor(null, this, transport, time, TimeUnit.MILLISECONDS,
               globalComponentRegistry.getComponent(ExecutorService.class, KnownComponentNames.REMOTE_COMMAND_EXECUTOR),
               // This can run arbitrary code, including user - such commands can block
No longer necessary?
No, I put it here mostly because it can block :)
Should it be using the non-blocking executor then?
Unfortunately until the other JIRA is fixed, we don't have a great solution. And cluster executor isn't that widely used afaik. But we should hopefully get it fixed before people use it like this.
Unfortunately I think the few users of cluster executor may be doing exactly blocking cache operations, because there's no way to return a value asynchronously.
I'm starting to think that the proper solution is
> change PersistenceManagerImpl to detect if it is a blocking thread and run it inline and if non blocking thread to run the command in a blocking thread.
In fact, I would go even further, and change continueOnCPUExecutor to also continue on the caller thread if the caller thread was blocking. Otherwise, for cluster executor tasks doing cache.put(k1, v1), where the put requires 1 store operation to read the previous value and 1 store operation to store the value, the store read would happen on the task's initial blocking thread, but the store write would need another blocking thread. If the size of the blocking thread pool is N and you have N simultaneous tasks like this, there's no free thread to process the store writes.
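The starvation scenario described above can be reproduced with plain java.util.concurrent, independent of Infinispan. This is a minimal sketch under stated assumptions: the class name, the method name and the empty "store write" task are hypothetical stand-ins, and the fixed thread pool only plays the role of the blocking executor. Each outer task holds one pool thread while it waits for a second task submitted to the same pool.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration, not Infinispan code.
final class BlockingPoolStarvation {

   /**
    * Starts `tasks` outer tasks on a pool of `poolSize` threads. Each outer task
    * waits until all of them are running, then submits a nested "store write" to
    * the same pool and blocks on it. Returns true only if every nested write
    * completed within the timeout.
    */
   static boolean allWritesComplete(int tasks, int poolSize, long timeoutMillis) {
      if (poolSize < tasks) {
         // Otherwise the start-up latch below could never be released.
         throw new IllegalArgumentException("poolSize must be >= tasks");
      }
      ExecutorService blocking = Executors.newFixedThreadPool(poolSize);
      CountDownLatch allRunning = new CountDownLatch(tasks);
      try {
         List<Future<Boolean>> outcomes = new ArrayList<>();
         for (int i = 0; i < tasks; i++) {
            outcomes.add(blocking.submit(() -> {
               // Make sure every outer task occupies a thread at the same time.
               allRunning.countDown();
               allRunning.await();
               // The nested write needs another thread from the same pool.
               Future<?> storeWrite = blocking.submit(() -> { });
               try {
                  storeWrite.get(timeoutMillis, TimeUnit.MILLISECONDS);
                  return true;
               } catch (TimeoutException e) {
                  return false;
               }
            }));
         }
         boolean all = true;
         for (Future<Boolean> outcome : outcomes) {
            all &= outcome.get();
         }
         return all;
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      } finally {
         blocking.shutdownNow();
      }
   }

   public static void main(String[] args) {
      System.out.println("N tasks, N threads:     " + allWritesComplete(2, 2, 300));
      System.out.println("N tasks, N + 1 threads: " + allWritesComplete(2, 3, 300));
   }
}
```

With as many threads as tasks, no nested write can start until some outer task gives up, so at least one write times out. One spare thread is enough for this toy case, which is why continuing on the caller thread avoids the problem without growing the pool.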
Just an FYI but the only way currently we have to detect if it is a blocking thread is to check the thread name, which is quite brittle.
Also your put case, I don't see how the read and write would need concurrent blocking threads. The read would be done before then the write would be done afterwards, synchronously.
> Just an FYI but the only way currently we have to detect if it is a blocking thread is to check the thread name, which is quite brittle.
Can't we do !(Thread.currentThread() instanceof ISPNNonBlockingThread)?
> Also your put case, I don't see how the read and write would need concurrent blocking threads. The read would be done before then the write would be done afterwards, synchronously.
If the cluster executor task does a blocking cache.put(k, v), it needs a (blocking) thread for the entire duration of the cache operation. The read would run on the same thread, but then continueOnCPUExecutor() would submit a task to the non-blocking executor, and the next PersistenceManagerImpl call would submit a task to the blocking executor.
> Can't we do !(Thread.currentThread() instanceof ISPNNonBlockingThread)?
No, unfortunately. This would include user threads, JGroups threads, etc.
> If the cluster executor task does a blocking cache.put(k, v), it needs a (blocking) thread for the entire duration of the cache operation. The read would run on the same thread, but then continueOnCPUExecutor() would submit a task to the non-blocking executor, and the next PersistenceManagerImpl call would submit a task to the blocking executor.
Oh, okay, you were not referring to the read and then the write. I agree: if a blocking operation is invoked on a blocking thread, then yes, it would use more than one.
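To make the two detection strategies from this exchange concrete, here is a small sketch. Everything in it is hypothetical: NonBlockingMarker stands in for ISPNNonBlockingThread, and the "blocking-" name prefix is an assumed naming convention, not the real Infinispan thread name. It only shows why the negated instanceof check also matches user threads, and why the name check depends entirely on a convention.

```java
// Hypothetical illustration, not Infinispan code.
final class ThreadKindCheck {

   /** Stand-in for a marker type carried by non-blocking pool threads. */
   interface NonBlockingMarker { }

   static final class NonBlockingThread extends Thread implements NonBlockingMarker {
      NonBlockingThread(String name) {
         super(name);
      }
   }

   /** "Anything that is not a non-blocking thread": also true for user and transport threads. */
   static boolean blockingByExclusion(Thread thread) {
      return !(thread instanceof NonBlockingMarker);
   }

   /** Name-based check: works only while the assumed prefix convention holds. */
   static boolean blockingByName(Thread thread) {
      return thread.getName().startsWith("blocking-");
   }

   public static void main(String[] args) {
      Thread user = new Thread(() -> { }, "user-thread");
      System.out.println("user thread, by exclusion: " + blockingByExclusion(user));
      System.out.println("user thread, by name:      " + blockingByName(user));
   }
}
```

A positive marker on the blocking pool's own threads would avoid both problems, but that is a design option, not something this PR is stated to implement.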
core/src/main/java/org/infinispan/manager/impl/ReplicableManagerFunctionCommand.java
core/src/test/java/org/infinispan/stream/DistributedSequentialNonRehashStreamTest.java
core/src/main/java/org/infinispan/remoting/inboundhandler/GlobalInboundInvocationHandler.java
There are some startup failures in CI.
Yeah, I figured there might be and that my local run was just a fluke.
So the failure is because we commit the transaction in a blocking thread, as the API is blocking. However, the store write can touch the cache store, which doesn't want you to run it on the blocking thread. I am not sure how to fix this other than to change PersistenceManagerImpl to detect if it is a blocking thread and run it inline and if non blocking thread to run the command in a blocking thread. I think this is how it should be long term, but sadly this check is finding lots of bugs as it is now :)
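The "run inline on a blocking thread, otherwise hand off to the blocking executor" idea can be sketched as follows. This is not the PersistenceManagerImpl code: the class and method names are hypothetical, and the callerMayBlock flag is an assumption that hides the hard part, namely how to detect the kind of the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical illustration, not Infinispan code.
final class InlineOrOffload {

   /**
    * Runs a potentially blocking store operation.
    * If the caller is already allowed to block, the operation runs on the caller
    * thread and no extra pool thread is consumed. Otherwise it is submitted to
    * the blocking executor and the caller gets a stage to continue from.
    */
   static <T> CompletionStage<T> runStoreOperation(Supplier<T> storeOp,
                                                   boolean callerMayBlock,
                                                   Executor blockingExecutor) {
      if (!callerMayBlock) {
         return CompletableFuture.supplyAsync(storeOp, blockingExecutor);
      }
      CompletableFuture<T> result = new CompletableFuture<>();
      try {
         result.complete(storeOp.get());
      } catch (Throwable t) {
         // Report failures through the stage in both branches.
         result.completeExceptionally(t);
      }
      return result;
   }
}
```

The inline branch returns an already completed stage, so callers can treat both paths uniformly.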
4730be1
to
bf6f39a
Compare
There are some test failures from the tx changes where I missed something; guessing I am not joining somewhere :)
9797610
to
5ae68c5
Compare
21db4df
to
fdefde6
Compare
Updated to fix the 2 blocking test failures.
core/src/main/java/org/infinispan/cache/impl/InvocationHelper.java
core/src/main/java/org/infinispan/manager/impl/ReplicableManagerFunctionCommand.java
core/src/main/java/org/infinispan/transaction/impl/TransactionCoordinator.java
      }
   }

   private <T> CompletionStage<T> handleRollbackFailure(Throwable t, LocalTransaction localTransaction) {
AFAICT handleRollbackFailure and handleCommitFailure don't need to return a CompletionStage.
They don't, but the users of it require it to be a CompletionStage, so less code overall. :)
If you return void and throw the exception directly, the callers can use handle instead of handleAndCompose.
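A rough sketch of the two shapes being compared, using only the JDK. The names are hypothetical: TxFailure is a stand-in for a checked exception such as XAException, and handle followed by thenCompose is only an approximation of CompletionStages.handleAndCompose. Because the function passed to handle cannot throw checked exceptions, the handle-only variant has to wrap the failure in a CompletionException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;

// Hypothetical illustration, not Infinispan code.
final class HandleVsCompose {

   /** Checked exception standing in for XAException. */
   static final class TxFailure extends Exception {
      TxFailure(String message) {
         super(message);
      }
   }

   private static TxFailure prepareFailure(Throwable rollbackFailure) {
      TxFailure failure = new TxFailure("prepare failed");
      if (rollbackFailure != null) {
         failure.addSuppressed(rollbackFailure);
      }
      return failure;
   }

   /** Compose variant: the callback builds and returns a failed stage. */
   static CompletionStage<Void> viaCompose(CompletionStage<Void> rollback) {
      return rollback.<CompletionStage<Void>>handle((ignore, rollbackFailure) -> {
         CompletableFuture<Void> failed = new CompletableFuture<>();
         failed.completeExceptionally(prepareFailure(rollbackFailure));
         return failed;
      }).thenCompose(stage -> stage);
   }

   /** Handle-only variant: the callback throws, wrapping the checked exception. */
   static CompletionStage<Void> viaHandle(CompletionStage<Void> rollback) {
      return rollback.<Void>handle((ignore, rollbackFailure) -> {
         throw new CompletionException(prepareFailure(rollbackFailure));
      });
   }

   /** Returns the cause reported by join(), or null if the stage succeeded. */
   static Throwable failureOf(CompletionStage<Void> stage) {
      try {
         stage.toCompletableFuture().join();
         return null;
      } catch (CompletionException e) {
         return e.getCause();
      }
   }
}
```

In this sketch a caller that joins the resulting stage sees the same cause from both variants; the difference is only in how much plumbing the callback needs.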
core/src/main/java/org/infinispan/transaction/xa/XaTransactionTable.java
core/src/main/java/org/infinispan/transaction/xa/XaTransactionTable.java
core/src/main/java/org/infinispan/transaction/xa/recovery/RecoveryManagerImpl.java
core/src/main/java/org/infinispan/transaction/xa/recovery/RecoveryManagerImpl.java
@@ -41,6 +43,7 @@ public void test() throws Exception {
      cache(2).put(key, "value");

      ControlledRpcManager rpcManager = ControlledRpcManager.replaceRpcManager(cache(2));
      rpcManager.excludeCommands(StateResponseCommand.class, StateTransferStartCommand.class);
It's so weird that it wasn't a problem before, I have to debug the test to see how it's passing on master :)
Yeah, I agree. But I wasn't quite sure what was going on.
d028787
to
7010d95
Compare

      //rollback transaction before throwing the exception as there's no guarantee the TM calls XAResource.rollback
      //after prepare failed.
      return CompletionStages.handleAndCompose(rollback(localTransaction), (ignore2, rollbackThrowable) -> {
No need for handleAndCompose
Not sure why. We still need to handle the case when it was not an error and wrap it with an XAException. And we still want to catch the rollback exception to suppress or rethrow it.
Yes, but none of those need to return a CompletionStage, so you can use handle instead of handleAndCompose.
I was trying to keep all the exceptions as bare XAException. If I do the other I would have to use CompletionException wrapping XAException and all callers would have to pay attention to that including TransactionXAAdapter, but I can do that.
Actually I don't feel comfortable changing this right now. I can revisit later if we need.
> I was trying to keep all the exceptions as bare XAException. If I do the other I would have to use CompletionException wrapping XAException and all callers would have to pay attention to that including TransactionXAAdapter, but I can do that.
Not sure what you mean. When you do CompletableFuture.join() it will wrap the exception in a CompletionException anyway, so you have to be prepared to extract the exception with CompletableFutures.extractException.
But I'm ok with revisiting this later.
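The wrapping behaviour mentioned here is plain JDK behaviour and easy to check in isolation. In the sketch below, unwrap is a hypothetical helper written for illustration; it is not the Infinispan CompletableFutures.extractException implementation, and TxFailure is again a stand-in for XAException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration, not Infinispan code.
final class JoinWrapping {

   /** Checked exception standing in for XAException. */
   static final class TxFailure extends Exception {
      TxFailure(String message) {
         super(message);
      }
   }

   /** Completes a future with the bare exception and returns what join() throws. */
   static Throwable thrownByJoin(Throwable original) {
      CompletableFuture<Void> future = new CompletableFuture<>();
      future.completeExceptionally(original);
      try {
         future.join();
         return null;
      } catch (CompletionException e) {
         return e;
      }
   }

   /** Strips CompletionException and ExecutionException wrappers, if any. */
   static Throwable unwrap(Throwable t) {
      while ((t instanceof CompletionException || t instanceof ExecutionException)
            && t.getCause() != null) {
         t = t.getCause();
      }
      return t;
   }

   public static void main(String[] args) {
      Throwable seen = thrownByJoin(new TxFailure("prepare failed"));
      System.out.println("join() threw: " + seen.getClass().getSimpleName());
      System.out.println("unwrapped:    " + unwrap(seen).getClass().getSimpleName());
   }
}
```

So even when a stage is completed with a bare exception, a caller that joins it still has to unwrap before it can rethrow the original type.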
core/src/main/java/org/infinispan/transaction/impl/TransactionCoordinator.java
core/src/main/java/org/infinispan/transaction/xa/TransactionXaAdapter.java
7010d95
to
3173ad9
Compare
Fixed latest comments.
514d861
to
6030cc1
Compare
core/src/main/java/org/infinispan/transaction/xa/recovery/RecoveryManagerImpl.java
…hread executor
* Invoke commands that block on blocking executor
* Invoke other commands by caller
* Use non blocking executor instead of remote in other places
… on a blocking thread
6030cc1
to
0caa5b7
Compare
Merged, thanks Will!
+1
…hread executor
https://issues.redhat.com/browse/ISPN-11385
https://issues.redhat.com/browse/ISPN-11473
https://issues.redhat.com/browse/ISPN-11489