Skip to content

Commit

Permalink
ISPN-6610 Stop using forkInvocationSync
Browse files Browse the repository at this point in the history
Interceptors must use onReturn() for things they need
to do after command.perform(). While forkInvocationSync()
still works, it doesn't allow asynchronous execution.

* Fix forkInvocation-related problems in BaseSequentialInvocationContext.
* Make BaseSequentialInvocationContext.invokeInterceptors invoke
  visitCommand in sequence for all interceptors, to allow inlining.
* Move visit notification from CallInterceptor to EntryWrappingInterceptor
  • Loading branch information
danberindei authored and pruivo committed Jun 2, 2016
1 parent 3a2aa5e commit 5d3d55e
Show file tree
Hide file tree
Showing 82 changed files with 3,460 additions and 2,748 deletions.
Expand Up @@ -38,8 +38,7 @@
@Test(groups = "functional", testName = "cdi.test.event.CacheEventTest") @Test(groups = "functional", testName = "cdi.test.event.CacheEventTest")
public class CacheEventTest extends Arquillian { public class CacheEventTest extends Arquillian {


private final NonTxInvocationContext invocationContext = new NonTxInvocationContext(null, AnyEquivalence.getInstance(), private final NonTxInvocationContext invocationContext = new NonTxInvocationContext(null, AnyEquivalence.getInstance());
null);


@Inject @Inject
@Cache1 @Cache1
Expand Down
@@ -1,7 +1,5 @@
package org.infinispan.container; package org.infinispan.container;


import org.infinispan.distribution.DistributionManager;
import org.infinispan.metadata.Metadata;
import org.infinispan.atomic.Delta; import org.infinispan.atomic.Delta;
import org.infinispan.atomic.DeltaAware; import org.infinispan.atomic.DeltaAware;
import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.Configuration;
Expand All @@ -13,8 +11,10 @@
import org.infinispan.container.entries.RepeatableReadEntry; import org.infinispan.container.entries.RepeatableReadEntry;
import org.infinispan.container.entries.StateChangingEntry; import org.infinispan.container.entries.StateChangingEntry;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.factories.annotations.Inject; import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start; import org.infinispan.factories.annotations.Start;
import org.infinispan.metadata.Metadata;
import org.infinispan.util.TimeService; import org.infinispan.util.TimeService;
import org.infinispan.util.concurrent.IsolationLevel; import org.infinispan.util.concurrent.IsolationLevel;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
Expand All @@ -31,7 +31,7 @@
public class EntryFactoryImpl implements EntryFactory { public class EntryFactoryImpl implements EntryFactory {


private static final Log log = LogFactory.getLog(EntryFactoryImpl.class); private static final Log log = LogFactory.getLog(EntryFactoryImpl.class);
private final boolean trace = log.isTraceEnabled(); private final static boolean trace = log.isTraceEnabled();


private boolean useRepeatableRead; private boolean useRepeatableRead;
private DataContainer container; private DataContainer container;
Expand Down
Expand Up @@ -35,7 +35,7 @@ public InvocationContext createInvocationContext(boolean isWrite, int keyCount)
if (keyCount == 1) { if (keyCount == 1) {
return new SingleKeyNonTxInvocationContext(null, keyEq); return new SingleKeyNonTxInvocationContext(null, keyEq);
} else if (keyCount > 0) { } else if (keyCount > 0) {
return new NonTxInvocationContext(keyCount, null, keyEq, interceptorChain); return new NonTxInvocationContext(keyCount, null, keyEq);
} }
return createInvocationContext(null, false); return createInvocationContext(null, false);
} }
Expand All @@ -47,8 +47,7 @@ public InvocationContext createInvocationContext(Transaction tx, boolean implici


@Override @Override
public NonTxInvocationContext createNonTxInvocationContext() { public NonTxInvocationContext createNonTxInvocationContext() {
NonTxInvocationContext ctx = new NonTxInvocationContext(null, keyEq, interceptorChain); return new NonTxInvocationContext(null, keyEq);
return ctx;
} }


@Override @Override
Expand All @@ -58,8 +57,7 @@ public InvocationContext createSingleKeyNonTxInvocationContext() {


@Override @Override
public NonTxInvocationContext createRemoteInvocationContext(Address origin) { public NonTxInvocationContext createRemoteInvocationContext(Address origin) {
NonTxInvocationContext ctx = new NonTxInvocationContext(origin, keyEq, interceptorChain); return new NonTxInvocationContext(origin, keyEq);
return ctx;
} }


@Override @Override
Expand Down
Expand Up @@ -72,7 +72,7 @@ public interface SequentialInvocationContext {
* synchronously. * synchronously.
* *
* <p>Usually it is easier to use than * <p>Usually it is easier to use than
* {@link #forkInvocation(VisitableCommand, SequentialInterceptor.ReturnHandler)}. * {@link #forkInvocation(VisitableCommand, SequentialInterceptor.ForkReturnHandler)}.
* However, it is not recommended, because any asynchronous work in the remaining interceptors will block * However, it is not recommended, because any asynchronous work in the remaining interceptors will block
* the calling thread.</p> * the calling thread.</p>
* *
Expand Down
Expand Up @@ -68,18 +68,18 @@ public InvocationContext createInvocationContext(Transaction tx, boolean implici
throw new IllegalArgumentException("Cannot create a transactional context without a valid Transaction instance."); throw new IllegalArgumentException("Cannot create a transactional context without a valid Transaction instance.");
} }
LocalTransaction localTransaction = transactionTable.getOrCreateLocalTransaction(tx, implicitTransaction); LocalTransaction localTransaction = transactionTable.getOrCreateLocalTransaction(tx, implicitTransaction);
return new LocalTxInvocationContext(localTransaction, interceptorChain); return new LocalTxInvocationContext(localTransaction);
} }


@Override @Override
public LocalTxInvocationContext createTxInvocationContext(LocalTransaction localTransaction) { public LocalTxInvocationContext createTxInvocationContext(LocalTransaction localTransaction) {
return new LocalTxInvocationContext(localTransaction, interceptorChain); return new LocalTxInvocationContext(localTransaction);
} }


@Override @Override
public RemoteTxInvocationContext createRemoteTxInvocationContext( public RemoteTxInvocationContext createRemoteTxInvocationContext(
RemoteTransaction tx, Address origin) { RemoteTransaction tx, Address origin) {
RemoteTxInvocationContext ctx = new RemoteTxInvocationContext(tx, interceptorChain); RemoteTxInvocationContext ctx = new RemoteTxInvocationContext(tx);
return ctx; return ctx;
} }


Expand All @@ -104,7 +104,7 @@ private Transaction getRunningTx() {
} }


protected final NonTxInvocationContext newNonTxInvocationContext(Address origin) { protected final NonTxInvocationContext newNonTxInvocationContext(Address origin) {
NonTxInvocationContext ctx = new NonTxInvocationContext(origin, keyEq, interceptorChain); NonTxInvocationContext ctx = new NonTxInvocationContext(origin, keyEq);
return ctx; return ctx;
} }
} }
Expand Up @@ -2,7 +2,6 @@


import org.infinispan.commands.write.WriteCommand; import org.infinispan.commands.write.WriteCommand;
import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.CacheEntry;
import org.infinispan.interceptors.SequentialInterceptorChain;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.transaction.impl.AbstractCacheTransaction; import org.infinispan.transaction.impl.AbstractCacheTransaction;
import org.infinispan.transaction.xa.GlobalTransaction; import org.infinispan.transaction.xa.GlobalTransaction;
Expand All @@ -25,8 +24,7 @@ public abstract class AbstractTxInvocationContext<T extends AbstractCacheTransac


private final T cacheTransaction; private final T cacheTransaction;


protected AbstractTxInvocationContext(T cacheTransaction, Address origin, protected AbstractTxInvocationContext(T cacheTransaction, Address origin) {
SequentialInterceptorChain interceptorChain) {
super(origin); super(origin);
if (cacheTransaction == null) { if (cacheTransaction == null) {
throw new NullPointerException("CacheTransaction cannot be null"); throw new NullPointerException("CacheTransaction cannot be null");
Expand Down
Expand Up @@ -5,7 +5,6 @@
import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.SequentialInterceptor; import org.infinispan.interceptors.SequentialInterceptor;
import org.infinispan.interceptors.impl.BaseSequentialInvocationContext;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;


import java.util.Collections; import java.util.Collections;
Expand All @@ -19,7 +18,7 @@
* *
* @author Sanne Grinovero <sanne@infinispan.org> (C) 2011 Red Hat Inc. * @author Sanne Grinovero <sanne@infinispan.org> (C) 2011 Red Hat Inc.
*/ */
public final class ImmutableContext extends BaseSequentialInvocationContext { public final class ImmutableContext implements InvocationContext {


public static final ImmutableContext INSTANCE = new ImmutableContext(); public static final ImmutableContext INSTANCE = new ImmutableContext();


Expand Down
@@ -1,6 +1,5 @@
package org.infinispan.context.impl; package org.infinispan.context.impl;


import org.infinispan.interceptors.SequentialInterceptorChain;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;
import org.infinispan.transaction.impl.LocalTransaction; import org.infinispan.transaction.impl.LocalTransaction;


Expand All @@ -20,9 +19,8 @@
*/ */
public class LocalTxInvocationContext extends AbstractTxInvocationContext<LocalTransaction> { public class LocalTxInvocationContext extends AbstractTxInvocationContext<LocalTransaction> {


public LocalTxInvocationContext(LocalTransaction localTransaction, public LocalTxInvocationContext(LocalTransaction localTransaction) {
SequentialInterceptorChain interceptorChain) { super(localTransaction, null);
super(localTransaction, null, interceptorChain);
} }


@Override @Override
Expand Down
Expand Up @@ -4,7 +4,6 @@
import org.infinispan.commons.equivalence.Equivalence; import org.infinispan.commons.equivalence.Equivalence;
import org.infinispan.commons.util.CollectionFactory; import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.container.entries.CacheEntry; import org.infinispan.container.entries.CacheEntry;
import org.infinispan.interceptors.SequentialInterceptorChain;
import org.infinispan.remoting.transport.Address; import org.infinispan.remoting.transport.Address;


import java.util.Collections; import java.util.Collections;
Expand All @@ -27,15 +26,13 @@ public class NonTxInvocationContext extends AbstractInvocationContext {
private Object lockOwner; private Object lockOwner;




public NonTxInvocationContext(int numEntries, Address origin, Equivalence<Object> keyEq, public NonTxInvocationContext(int numEntries, Address origin, Equivalence<Object> keyEq) {
SequentialInterceptorChain interceptorChain) {
super(origin); super(origin);
lookedUpEntries = CollectionFactory.makeMap(CollectionFactory.computeCapacity(numEntries), keyEq, AnyEquivalence.getInstance()); lookedUpEntries = CollectionFactory.makeMap(CollectionFactory.computeCapacity(numEntries), keyEq, AnyEquivalence.getInstance());
this.keyEq = keyEq; this.keyEq = keyEq;
} }


public NonTxInvocationContext(Address origin, Equivalence<Object> keyEq, public NonTxInvocationContext(Address origin, Equivalence<Object> keyEq) {
SequentialInterceptorChain interceptorChain) {
super(origin); super(origin);
lookedUpEntries = CollectionFactory.makeMap(INITIAL_CAPACITY, keyEq, AnyEquivalence.getInstance()); lookedUpEntries = CollectionFactory.makeMap(INITIAL_CAPACITY, keyEq, AnyEquivalence.getInstance());
this.keyEq = keyEq; this.keyEq = keyEq;
Expand Down
@@ -1,6 +1,5 @@
package org.infinispan.context.impl; package org.infinispan.context.impl;


import org.infinispan.interceptors.SequentialInterceptorChain;
import org.infinispan.transaction.impl.RemoteTransaction; import org.infinispan.transaction.impl.RemoteTransaction;


import javax.transaction.Transaction; import javax.transaction.Transaction;
Expand All @@ -15,9 +14,8 @@
*/ */
public class RemoteTxInvocationContext extends AbstractTxInvocationContext<RemoteTransaction> { public class RemoteTxInvocationContext extends AbstractTxInvocationContext<RemoteTransaction> {


public RemoteTxInvocationContext(RemoteTransaction cacheTransaction, public RemoteTxInvocationContext(RemoteTransaction cacheTransaction) {
SequentialInterceptorChain interceptorChain) { super(cacheTransaction, cacheTransaction.getGlobalTransaction().getAddress());
super(cacheTransaction, cacheTransaction.getGlobalTransaction().getAddress(), interceptorChain);
} }


@Override @Override
Expand Down
Expand Up @@ -817,7 +817,11 @@ public void buildInjectionMethodsList() throws ClassNotFoundException {
} }
Method m = meta.getMethod(); Method m = meta.getMethod();
if (m == null) { if (m == null) {
m = ReflectionUtil.findMethod(clazz, meta.getMethodName(), parameterClasses); try {
m = ReflectionUtil.findMethod(clazz, meta.getMethodName(), parameterClasses);
} catch (CacheException e) {
throw new CacheException("Injection method not found in class " + clazz + ": " + meta.getMethodName() + Arrays.toString(parameterClasses), e);
}
meta.setMethod(m); meta.setMethod(m);
} }
} }
Expand Down Expand Up @@ -848,9 +852,8 @@ public boolean equals(Object o) {
PrioritizedMethod that = (PrioritizedMethod) o; PrioritizedMethod that = (PrioritizedMethod) o;


if (component != null ? !component.equals(that.component) : that.component != null) return false; if (component != null ? !component.equals(that.component) : that.component != null) return false;
if (metadata != null ? !metadata.equals(that.metadata) : that.metadata != null) return false; return metadata != null ? metadata.equals(that.metadata) : that.metadata == null;


return true;
} }


@Override @Override
Expand Down
Expand Up @@ -29,7 +29,7 @@ public interface SequentialInterceptor {
* <li>The interceptor can call {@link SequentialInvocationContext#shortCircuit(Object)} in order to skip * <li>The interceptor can call {@link SequentialInvocationContext#shortCircuit(Object)} in order to skip
* the execution of the rest of the chain (and the command itself).</li> * the execution of the rest of the chain (and the command itself).</li>
* <li>The interceptor can call * <li>The interceptor can call
* {@link SequentialInvocationContext#forkInvocation(VisitableCommand, ReturnHandler)} in order to invoke * {@link SequentialInvocationContext#forkInvocation(VisitableCommand, ForkReturnHandler)} in order to invoke
* a new command, starting with the next interceptor in the chain. The return handler then behaves as * a new command, starting with the next interceptor in the chain. The return handler then behaves as
* another {@code visitCommand} invocation: it can allow the invocation of the original command to continue * another {@code visitCommand} invocation: it can allow the invocation of the original command to continue
* with the next interceptor, short-circuit the invocation, or fork another command.</li> * with the next interceptor, short-circuit the invocation, or fork another command.</li>
Expand All @@ -39,8 +39,9 @@ public interface SequentialInterceptor {
* </ul> * </ul>
* *
* <p>Thread safety: The interceptor must only invoke methods on the context or command in the thread * <p>Thread safety: The interceptor must only invoke methods on the context or command in the thread
* calling {@link #visitCommand(InvocationContext, VisitableCommand)} or * calling {@code visitCommand(InvocationContext, VisitableCommand)},
* {@link ReturnHandler#handle(InvocationContext, VisitableCommand, Object, Throwable)}.</p> * {@link ReturnHandler#handle(InvocationContext, VisitableCommand, Object, Throwable)}, or
* {@link ForkReturnHandler#handle(InvocationContext, VisitableCommand, Object, Throwable)}.</p>
*/ */
CompletableFuture<Void> visitCommand(InvocationContext ctx, VisitableCommand command) throws Throwable; CompletableFuture<Void> visitCommand(InvocationContext ctx, VisitableCommand command) throws Throwable;


Expand All @@ -53,20 +54,23 @@ public interface SequentialInterceptor {
* {@code CompletableFuture}. Returning {@code null} is allowed, and will continue the invocation with * {@code CompletableFuture}. Returning {@code null} is allowed, and will continue the invocation with
* the next return handler, without modifying the return value and without creating a new * the next return handler, without modifying the return value and without creating a new
* {@code CompletableFuture} instance.</p> * {@code CompletableFuture} instance.</p>
*
* TODO Have different interfaces for processing the return value and for finally-like handlers that don't have access to the return value
* TODO At least prohibit null return values and require an already-completed CF with a known value instead.
*/ */
interface ReturnHandler { interface ReturnHandler {
CompletableFuture<Object> handle(InvocationContext ctx, VisitableCommand command, Object rv, CompletableFuture<Object> handle(InvocationContext rCtx, VisitableCommand rCommand, Object rv,
Throwable throwable) throws Throwable; Throwable throwable) throws Throwable;
} }


/* /**
* A return handler installed with * A return handler installed with
* {@link SequentialInvocationContext#forkInvocation(VisitableCommand, ForkHandler)}. * {@link SequentialInvocationContext#forkInvocation(VisitableCommand, ForkReturnHandler)}.
* *
* <p>It must behave just like {@link #visitCommand(InvocationContext, VisitableCommand)}.</p> * <p>It must behave just like {@link #visitCommand(InvocationContext, VisitableCommand)}.</p>
*/ */
interface ForkReturnHandler { interface ForkReturnHandler {
CompletableFuture<Void> handle(InvocationContext ctx, VisitableCommand command, Object rv, CompletableFuture<Void> handle(InvocationContext rCtx, VisitableCommand rCommand, Object rv,
Throwable throwable) throws Throwable; Throwable throwable) throws Throwable;
} }
} }
Expand Up @@ -3,6 +3,7 @@
import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.VisitableCommand;
import org.infinispan.commons.util.Experimental; import org.infinispan.commons.util.Experimental;
import org.infinispan.context.InvocationContext; import org.infinispan.context.InvocationContext;
import org.infinispan.context.SequentialInvocationContext;


import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
Expand All @@ -17,21 +18,24 @@
*/ */
@Experimental @Experimental
public interface SequentialInterceptorChain { public interface SequentialInterceptorChain {
/**
* @return An immutable list of the current interceptors.
*/
List<SequentialInterceptor> getInterceptors(); List<SequentialInterceptor> getInterceptors();


/** /**
* Inserts the given interceptor at the specified position in the chain (o based indexing). * Inserts the given interceptor at the specified position in the chain (0 based indexing).
* *
* @throws IllegalArgumentException if the position is invalid (e.g. 5 and there are only 2 interceptors in the * @throws IllegalArgumentException if the position is invalid (e.g. 5 and there are only 2 interceptors
* chain) * in the chain)
*/ */
void addInterceptor(SequentialInterceptor interceptor, int position); void addInterceptor(SequentialInterceptor interceptor, int position);


/** /**
* Removes the interceptor at the given postion. * Removes the interceptor at the given position.
* *
* @throws IllegalArgumentException if the position is invalid (e.g. 5 and there are only 2 interceptors in the * @throws IllegalArgumentException if the position is invalid (e.g. 5 and there are only 2 interceptors
* chain) * in the chain)
*/ */
void removeInterceptor(int position); void removeInterceptor(int position);


Expand All @@ -41,28 +45,29 @@ public interface SequentialInterceptorChain {
int size(); int size();


/** /**
* Removes all the occurences of supplied interceptor type from the chain. * Removes all the occurrences of supplied interceptor type from the chain.
*/ */
void removeInterceptor(Class<? extends SequentialInterceptor> clazz); void removeInterceptor(Class<? extends SequentialInterceptor> clazz);


/** /**
* Adds a new interceptor in list after an interceptor of a given type. * Adds a new interceptor in list after an interceptor of a given type.
* *
* @return true if the interceptor was added; i.e. the afterInterceptor exists * @return true if the interceptor was added; i.e. the {@code afterInterceptor} exists
*/ */
boolean addInterceptorAfter(SequentialInterceptor toAdd, Class<? extends boolean addInterceptorAfter(SequentialInterceptor toAdd, Class<? extends
SequentialInterceptor> afterInterceptor); SequentialInterceptor> afterInterceptor);


/** /**
* Adds a new interceptor in list after an interceptor of a given type. * Adds a new interceptor in list before an interceptor of a given type.
* *
* @return true if the interceptor was added; i.e. the afterInterceptor exists * @return true if the interceptor was added; i.e. the {@code beforeInterceptor} exists
*/ */
boolean addInterceptorBefore(SequentialInterceptor toAdd, boolean addInterceptorBefore(SequentialInterceptor toAdd,
Class<? extends SequentialInterceptor> beforeInterceptor); Class<? extends SequentialInterceptor> beforeInterceptor);


/** /**
* Replaces an existing interceptor of the given type in the interceptor chain with a new interceptor instance passed as parameter. * Replaces an existing interceptor of the given type in the interceptor chain with a new interceptor
* instance passed as parameter.
* *
* @param replacingInterceptor the interceptor to add to the interceptor chain * @param replacingInterceptor the interceptor to add to the interceptor chain
* @param toBeReplacedInterceptorType the type of interceptor that should be swapped with the new one * @param toBeReplacedInterceptorType the type of interceptor that should be swapped with the new one
Expand All @@ -80,11 +85,11 @@ boolean replaceInterceptor(SequentialInterceptor replacingInterceptor,
* Walks the command through the interceptor chain. The received ctx is being passed in. * Walks the command through the interceptor chain. The received ctx is being passed in.
* *
* <p>Note: Reusing the context for multiple invocations is allowed. However, the two invocations * <p>Note: Reusing the context for multiple invocations is allowed. However, the two invocations
* must not overlap, so calling {@link #invoke(InvocationContext, VisitableCommand)} from an interceptor * must not overlap, so calling {@code invoke(InvocationContext, VisitableCommand)} from an interceptor
* is not allowed. If an interceptor wants to invoke a new command and cannot use {@link org.infinispan * is not allowed.
* .context.SequentialInvocationContext#forkInvocation(VisitableCommand, SequentialInterceptor * If an interceptor wants to invoke a new command and cannot use
* .ForkReturnHandler)} or * {@link SequentialInvocationContext#forkInvocation(VisitableCommand, SequentialInterceptor.ForkReturnHandler)}
* {@link org.infinispan.context.SequentialInvocationContext#forkInvocationSync(VisitableCommand)}, * or {@link SequentialInvocationContext#forkInvocationSync(VisitableCommand)},
* it must first copy the invocation context with {@link InvocationContext#clone()}.</p> * it must first copy the invocation context with {@link InvocationContext#clone()}.</p>
*/ */
Object invoke(InvocationContext ctx, VisitableCommand command); Object invoke(InvocationContext ctx, VisitableCommand command);
Expand Down

0 comments on commit 5d3d55e

Please sign in to comment.