Skip to content

Commit

Permalink
Expiration Reaper defer to async thread pool when in a transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
wburns committed Nov 22, 2019
1 parent 49e74ad commit c965ec9
Showing 1 changed file with 28 additions and 4 deletions.
Expand Up @@ -7,11 +7,12 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

import net.jcip.annotations.ThreadSafe;
import org.infinispan.AdvancedCache;
import org.infinispan.cache.impl.AbstractDelegatingCache;
import org.infinispan.commands.CommandsFactory;
Expand All @@ -25,6 +26,8 @@
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.factories.KnownComponentNames;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.impl.ComponentRef;
import org.infinispan.marshall.core.MarshalledEntry;
Expand All @@ -38,6 +41,8 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

import net.jcip.annotations.ThreadSafe;

/**
* Allows for cluster based expirations to occur. This provides guarantees that when an entry is expired that it will
* expire that entry across the entire cluster at once. This requires obtaining the lock for said entry before
Expand Down Expand Up @@ -67,11 +72,14 @@ public class ClusterExpirationManager<K, V> extends ExpirationManagerImpl<K, V>
@Inject protected ComponentRef<CommandsFactory> cf;
@Inject protected RpcManager rpcManager;
@Inject protected DistributionManager distributionManager;
@ComponentName(KnownComponentNames.ASYNC_OPERATIONS_EXECUTOR)
private ExecutorService asyncExecutor;

private AdvancedCache<K, V> cache;
private Address localAddress;
private long timeout;
private String cacheName;
private boolean transactional;

@Override
public void start() {
Expand All @@ -81,6 +89,8 @@ public void start() {
this.cacheName = cache.getName();
this.localAddress = cache.getCacheManager().getAddress();
this.timeout = cache.getCacheConfiguration().clustering().remoteTimeout();

transactional = configuration.transaction().transactionMode().isTransactional();
}

@Override
Expand Down Expand Up @@ -219,7 +229,14 @@ CompletableFuture<Void> handleLifespanExpireEntry(K key, V value, long lifespan,
log.tracef("Submitting expiration removal for key %s which had lifespan of %s", toStr(key), lifespan);
}
AdvancedCache<K, V> cacheToUse = skipLocking ? cache.withFlags(Flag.SKIP_LOCKING) : cache;
CompletableFuture<Void> future = cacheToUse.removeLifespanExpired(key, value, lifespan);
CompletableFuture<Void> future;
if (transactional) {
// Transactional is still blocking - to be removed later
future = CompletableFuture.supplyAsync(() -> cacheToUse.removeLifespanExpired(key, value, lifespan), asyncExecutor)
.thenCompose(Function.identity());
} else {
future = cacheToUse.removeLifespanExpired(key, value, lifespan);
}
return future.whenComplete((v, t) -> expiring.remove(key, key));
}
return CompletableFutures.completedNull();
Expand All @@ -241,8 +258,15 @@ CompletableFuture<Boolean> actualRemoveMaxIdleExpireEntry(K key, V value, long m
completableFuture.whenComplete((b, t) -> expiring.remove(key, completableFuture));
try {
AdvancedCache<K, V> cacheToUse = skipLocking ? cache.withFlags(Flag.SKIP_LOCKING) : cache;
CompletableFuture<Boolean> expired = cacheToUse.removeMaxIdleExpired(key, value);
expired.whenComplete((b, t) -> {
CompletableFuture<Boolean> future;
if (transactional) {
// Transactional is still blocking - to be removed later
future = CompletableFuture.supplyAsync(() -> cacheToUse.removeMaxIdleExpired(key, value), asyncExecutor)
.thenCompose(Function.identity());
} else {
future = cacheToUse.removeMaxIdleExpired(key, value);
}
future.whenComplete((b, t) -> {
if (t != null) {
completableFuture.completeExceptionally(t);
} else {
Expand Down

0 comments on commit c965ec9

Please sign in to comment.