Skip to content

Commit

Permalink
ISPN-7551 Use invokeAsync in CacheImpl.*async methods
Browse files Browse the repository at this point in the history
* Remove FutureMode Param in functional API
  • Loading branch information
rvansa authored and wburns committed Apr 21, 2017
1 parent 2b8fca7 commit fc2156a
Show file tree
Hide file tree
Showing 30 changed files with 383 additions and 1,163 deletions.
@@ -1,13 +1,10 @@
package org.infinispan.commons.api.functional;

import java.util.concurrent.CompletableFuture;

import org.infinispan.commons.util.Experimental;

/**
* An easily extensible parameter that allows functional map operations to be
* tweaked. Apart from {@link org.infinispan.commons.api.functional.Param.FutureMode}, examples
* would include local-only parameter, skip-cache-store parameter and others.
* tweaked. Examples would include local-only parameter, skip-cache-store parameter and others.
*
* <p>What makes {@link Param} different from {@link MetaParam} is that {@link Param}
* values are never stored in the functional map. They merely act as ways to
Expand Down Expand Up @@ -43,52 +40,6 @@ public interface Param<P> {
*/
P get();

/**
* When a method defines {@link CompletableFuture} as a return type, it
* implies the method called will be called asynchronously and that the
* {@link CompletableFuture} returned will be completed once the method's
* work is complete.
*
* <p>So, calling a method that returns {@link CompletableFuture} normally
* implies that the method will allocate a thread to do that job. However,
* there are situations when the user calls a method that returns
* {@link CompletableFuture} and immediately calls {@link CompletableFuture#get()}
* or similar methods to get the result. Calling such methods result in the
* caller thread blocking in which case, having such method spawn another
* thread is a waste of resources. So, for such situations, the caller can
* pass in the {@link #COMPLETED} param so that the internal logic avoids
* creating a separate thread, since the caller thread will block to get
* the result immediately.
*
* <p>By default, all methods returning {@link CompletableFuture} are
* asynchronous, hence using the {@link #ASYNC} future mode.
*
* @since 8.0
*/
@Experimental
enum FutureMode implements Param<FutureMode> {
ASYNC, COMPLETED;

public static final int ID = ParamIds.FUTURE_MODE_ID;

@Override
public int id() {
return ID;
}

@Override
public FutureMode get() {
return this;
}

/**
* Provides default future mode.
*/
public static FutureMode defaultValue() {
return ASYNC;
}
}

/**
* When a persistence store is attached to a cache, by default all write
* operations, regardless of whether they are inserts, updates or removes,
Expand Down
Expand Up @@ -10,8 +10,7 @@
@Experimental
public final class ParamIds {

public static final int FUTURE_MODE_ID = 0;
public static final int PERSISTENCE_MODE_ID = 1;
public static final int PERSISTENCE_MODE_ID = 0;

private ParamIds() {
// Cannot be instantiated, it's just a holder class
Expand Down
271 changes: 108 additions & 163 deletions core/src/main/java/org/infinispan/cache/impl/CacheImpl.java

Large diffs are not rendered by default.

@@ -1,5 +1,7 @@
package org.infinispan.functional.impl;

import java.util.concurrent.CompletableFuture;

import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
Expand All @@ -9,6 +11,7 @@
import org.infinispan.commons.CacheException;
import org.infinispan.commons.api.functional.FunctionalMap;
import org.infinispan.commons.api.functional.Status;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.infinispan.configuration.cache.Configuration;
Expand All @@ -24,18 +27,21 @@ abstract class AbstractFunctionalMap<K, V> implements FunctionalMap<K, V> {
private static final Log log = LogFactory.getLog(FunctionalMap.class);

protected final FunctionalMapImpl<K, V> fmap;
protected final Params params;

private final boolean transactional;
private final boolean autoCommit;
private final BatchContainer batchContainer;
private final TransactionManager transactionManager;

protected AbstractFunctionalMap(FunctionalMapImpl<K, V> fmap) {
protected AbstractFunctionalMap(Params params, FunctionalMapImpl<K, V> fmap) {
this.fmap = fmap;
Configuration config = fmap.cache.getCacheConfiguration();
transactional = config.transaction().transactionMode().isTransactional();
autoCommit = config.transaction().autoCommit();
transactionManager = transactional ? fmap.cache.getTransactionManager() : null;
batchContainer = transactional && config.invocationBatching().enabled() ? fmap.cache.getBatchContainer() : null;
this.params = params;
}

@Override
Expand Down Expand Up @@ -69,36 +75,13 @@ protected InvocationContext getInvocationContext(boolean isWrite, int keyCount)
transaction = getOngoingTransaction();
txInjected = true;
}
invocationContext = fmap.invCtxFactory().createInvocationContext(transaction, txInjected);
invocationContext = fmap.invCtxFactory.createInvocationContext(transaction, txInjected);
} else {
invocationContext = fmap.invCtxFactory().createInvocationContext(isWrite, keyCount);
invocationContext = fmap.invCtxFactory.createInvocationContext(isWrite, keyCount);
}
return invocationContext;
}

protected void commitIfNeeded(InvocationContext ctx) {
if (ctx.isInTxScope() && ((TxInvocationContext) ctx).isImplicitTransaction()) {
if (transactionManager != null) {
try {
transactionManager.commit();
} catch (Throwable e) {
log.couldNotCompleteInjectedTransaction(e);
throw new CacheException("Could not commit implicit transaction", e);
}
}
}
}

protected void rollbackIfNeeded(InvocationContext ctx) {
if (ctx.isInTxScope() && ((TxInvocationContext) ctx).isImplicitTransaction()) {
try {
if (transactionManager != null) transactionManager.rollback();
} catch (Throwable t) {
log.trace("Could not rollback", t);//best effort
}
}
}

private Transaction getOngoingTransaction() {
try {
Transaction transaction = null;
Expand All @@ -114,20 +97,53 @@ private Transaction getOngoingTransaction() {
}
}

protected Object invoke(InvocationContext ctx, VisitableCommand cmd) {
Object result;
protected <T> CompletableFuture<T> invokeAsync(InvocationContext ctx, VisitableCommand cmd) {
CompletableFuture<T> cf;
boolean isImplicitTx = ctx.isInTxScope() && ((TxInvocationContext) ctx).isImplicitTransaction();
final Transaction implicitTransaction;
try {
result = fmap.chain().invoke(ctx, cmd);
// interceptors must not access thread-local transaction anyway
if (isImplicitTx) {
implicitTransaction = transactionManager.suspend();
assert implicitTransaction != null;
} else {
implicitTransaction = null;
}
cf = (CompletableFuture<T>) fmap.chain.invokeAsync(ctx, cmd);
} catch (SystemException e) {
throw new CacheException("Cannot suspend implicit transaction", e);
} catch (Throwable t) {
try {
rollbackIfNeeded(ctx);
} catch (Throwable t2) {
t2.addSuppressed(t);
throw t2;
if (isImplicitTx) {
try {
if (transactionManager != null) transactionManager.rollback();
} catch (Throwable t2) {
log.trace("Could not rollback", t2);//best effort
t.addSuppressed(t2);
}
}
throw t;
}
commitIfNeeded(ctx);
return result;
if (isImplicitTx) {
return cf.handle((result, throwable) -> {
if (throwable != null) {
try {
implicitTransaction.rollback();
} catch (SystemException e) {
log.trace("Could not rollback", e);
throwable.addSuppressed(e);
}
throw CompletableFutures.asCompletionException(throwable);
}
try {
implicitTransaction.commit();
} catch (Exception e) {
log.couldNotCompleteInjectedTransaction(e);
throw CompletableFutures.asCompletionException(e);
}
return result;
});
} else {
return cf;
}
}
}
@@ -1,17 +1,14 @@
package org.infinispan.functional.impl;

import static org.infinispan.factories.KnownComponentNames.ASYNC_OPERATIONS_EXECUTOR;

import java.util.concurrent.ExecutorService;

import org.infinispan.AdvancedCache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commons.api.functional.FunctionalMap;
import org.infinispan.commons.api.functional.Param;
import org.infinispan.commons.api.functional.Status;
import org.infinispan.commons.util.Experimental;
import org.infinispan.context.InvocationContextFactory;
import org.infinispan.interceptors.InterceptorChain;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.lifecycle.ComponentStatus;

/**
Expand All @@ -24,6 +21,10 @@ public final class FunctionalMapImpl<K, V> implements FunctionalMap<K, V> {

final Params params;
final AdvancedCache<K, V> cache;
final AsyncInterceptorChain chain;
final CommandsFactory commandsFactory;
final InvocationContextFactory invCtxFactory;
final FunctionalNotifier notifier;

public static <K, V> FunctionalMapImpl<K, V> create(Params params, AdvancedCache<K, V> cache) {
return new FunctionalMapImpl<>(params, cache);
Expand All @@ -36,26 +37,11 @@ public static <K, V> FunctionalMapImpl<K, V> create(AdvancedCache<K, V> cache) {
private FunctionalMapImpl(Params params, AdvancedCache<K, V> cache) {
this.params = params;
this.cache = cache;
}

InvocationContextFactory invCtxFactory() {
return cache.getComponentRegistry().getComponent(InvocationContextFactory.class);
}

CommandsFactory cmdFactory() {
return cache.getComponentRegistry().getComponent(CommandsFactory.class);
}

InterceptorChain chain() {
return cache.getComponentRegistry().getComponent(InterceptorChain.class);
}

ExecutorService asyncExec() {
return cache.getComponentRegistry().getComponent(ExecutorService.class, ASYNC_OPERATIONS_EXECUTOR);
}

FunctionalNotifier<K, V> notifier() {
return cache.getComponentRegistry().getComponent(FunctionalNotifier.class);
ComponentRegistry componentRegistry = cache.getComponentRegistry();
chain = componentRegistry.getComponent(AsyncInterceptorChain.class);
invCtxFactory = componentRegistry.getComponent(InvocationContextFactory.class);
commandsFactory = componentRegistry.getComponent(CommandsFactory.class);
notifier = componentRegistry.getComponent(FunctionalNotifier.class);
}

@Override
Expand Down
22 changes: 0 additions & 22 deletions core/src/main/java/org/infinispan/functional/impl/Params.java
Expand Up @@ -5,12 +5,8 @@
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import org.infinispan.commons.api.functional.Param;
import org.infinispan.commons.api.functional.Param.FutureMode;
import org.infinispan.commons.api.functional.Param.PersistenceMode;
import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.commons.util.Experimental;
Expand All @@ -32,7 +28,6 @@
public final class Params {

private static final Param<?>[] DEFAULTS = new Param<?>[]{
FutureMode.defaultValue(),
PersistenceMode.defaultValue(),
};
// TODO: as Params are immutable and there's only limited number of them,
Expand Down Expand Up @@ -99,23 +94,6 @@ public static Params from(Param<?>... ps) {
return new Params(paramsAll);
}

static <T> CompletableFuture<T> withFuture(Param<FutureMode> futureParam,
ExecutorService asyncExec, Supplier<T> s) {
switch (futureParam.get()) {
case COMPLETED:
// If completed, complete the future directly with the result.
// No separate thread or executor is instantiated.
return CompletableFuture.completedFuture(s.get());
case ASYNC:
// If async, execute the supply function asynchronously,
// and return a future that's completed when the supply
// function returns.
return CompletableFuture.supplyAsync(s, asyncExec);
default:
throw new IllegalStateException();
}
}

public static void writeObject(ObjectOutput output, Params params) throws IOException {
// There's no point in sending FutureMode over wire
MarshallUtil.marshallEnum((PersistenceMode) params.get(PersistenceMode.ID).get(), output);
Expand Down
@@ -1,7 +1,5 @@
package org.infinispan.functional.impl;

import static org.infinispan.functional.impl.Params.withFuture;

import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
Expand All @@ -15,7 +13,6 @@
import org.infinispan.commons.api.functional.EntryView.ReadEntryView;
import org.infinispan.commons.api.functional.FunctionalMap.ReadOnlyMap;
import org.infinispan.commons.api.functional.Param;
import org.infinispan.commons.api.functional.Param.FutureMode;
import org.infinispan.commons.api.functional.Traversable;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.commons.util.Experimental;
Expand All @@ -32,11 +29,9 @@
@Experimental
public final class ReadOnlyMapImpl<K, V> extends AbstractFunctionalMap<K, V> implements ReadOnlyMap<K, V> {
private static final Log log = LogFactory.getLog(ReadOnlyMapImpl.class);
private final Params params;

private ReadOnlyMapImpl(Params params, FunctionalMapImpl<K, V> functionalMap) {
super(functionalMap);
this.params = params;
super(params, functionalMap);
}

public static <K, V> ReadOnlyMap<K, V> create(FunctionalMapImpl<K, V> functionalMap) {
Expand All @@ -50,18 +45,17 @@ private static <K, V> ReadOnlyMap<K, V> create(Params params, FunctionalMapImpl<
@Override
public <R> CompletableFuture<R> eval(K key, Function<ReadEntryView<K, V>, R> f) {
log.tracef("Invoked eval(k=%s, %s)", key, params);
Param<FutureMode> futureMode = params.get(FutureMode.ID);
ReadOnlyKeyCommand cmd = fmap.cmdFactory().buildReadOnlyKeyCommand(key, f);
InvocationContext ctx = fmap.invCtxFactory().createInvocationContext(false, 1);
return withFuture(futureMode, fmap.asyncExec(), () -> (R) fmap.chain().invoke(ctx, cmd));
ReadOnlyKeyCommand cmd = fmap.commandsFactory.buildReadOnlyKeyCommand(key, f);
InvocationContext ctx = fmap.invCtxFactory.createInvocationContext(false, 1);
return (CompletableFuture<R>) fmap.chain.invokeAsync(ctx, cmd);
}

@Override
public <R> Traversable<R> evalMany(Set<? extends K> keys, Function<ReadEntryView<K, V>, R> f) {
log.tracef("Invoked evalMany(m=%s, %s)", keys, params);
ReadOnlyManyCommand<K, V, R> cmd = fmap.cmdFactory().buildReadOnlyManyCommand(keys, f);
InvocationContext ctx = fmap.invCtxFactory().createInvocationContext(false, keys.size());
return Traversables.of((Stream<R>) fmap.chain().invoke(ctx, cmd));
ReadOnlyManyCommand<K, V, R> cmd = fmap.commandsFactory.buildReadOnlyManyCommand(keys, f);
InvocationContext ctx = fmap.invCtxFactory.createInvocationContext(false, keys.size());
return Traversables.of((Stream<R>) fmap.chain.invokeAsync(ctx, cmd).join());
}

@Override
Expand Down

0 comments on commit fc2156a

Please sign in to comment.