Skip to content

Commit

Permalink
ISPN-11109 Deprecate and remove usages of state transfer executor
Browse files Browse the repository at this point in the history
  • Loading branch information
danberindei authored and tristantarrant committed Jan 17, 2020
1 parent abe858d commit d3c1eb0
Show file tree
Hide file tree
Showing 25 changed files with 139 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ public String persistenceExecutor() {
return attributes.attribute(PERSISTENCE_EXECUTOR).get();
}

/**
* @deprecated Since 10.1, no longer used.
*/
@Deprecated
public String stateTransferExecutor() {
return attributes.attribute(STATE_TRANSFER_EXECUTOR).get();
}
Expand Down Expand Up @@ -170,6 +174,11 @@ public ThreadPoolConfiguration persistenceThreadPool() {
return threads.persistenceThreadPool();
}

/**
* @return An empty {@code ThreadPoolConfiguration}.
* @deprecated Since 10.1, no longer used.
*/
@Deprecated
public ThreadPoolConfiguration stateTransferThreadPool() {
return threads.stateTransferThreadPool();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ public ThreadPoolConfigurationBuilder persistenceThreadPool() {
return threads.persistenceThreadPool();
}

/**
* @deprecated Since 10.1, no longer used.
*/
@Deprecated
@Override
public ThreadPoolConfigurationBuilder stateTransferThreadPool() {
return threads.stateTransferThreadPool();
Expand Down Expand Up @@ -173,6 +177,10 @@ public CacheContainerConfigurationBuilder persistenceExecutor(String name) {
return this;
}

/**
* @deprecated Since 10.1, no longer used.
*/
@Deprecated
public CacheContainerConfigurationBuilder stateTransferExecutor(String name) {
attributes.attribute(STATE_TRANSFER_EXECUTOR).set(name);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ public ThreadPoolConfiguration persistenceThreadPool() {
return cacheContainerConfiguration.persistenceThreadPool();
}

/**
* @return An empty {@code ThreadPoolConfiguration}.
* @deprecated Since 10.1, no longer used.
*/
@Deprecated
public ThreadPoolConfiguration stateTransferThreadPool() {
return cacheContainerConfiguration.stateTransferThreadPool();
}
Expand Down Expand Up @@ -153,6 +158,10 @@ public String persistenceThreadPoolName() {
return cacheContainer().persistenceExecutor();
}

/**
* @deprecated Since 10.1, no longer used.
*/
@Deprecated
public String stateTransferThreadPoolName() {
return cacheContainer().stateTransferExecutor();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ public GlobalConfigurationBuilder persistenceThreadPoolName(String name) {
return this;
}

/**
* @deprecated Since 10.1, no longer used.
*/
@Deprecated
public GlobalConfigurationBuilder stateTransferThreadPoolName(String name) {
cacheContainer().stateTransferExecutor(name);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ public ThreadPoolConfiguration remoteThreadPool() {
return remoteThreadPool;
}

/**
* @return An empty {@code ThreadPoolConfiguration}.
* @deprecated Since 10.1, no longer used.
*/
@Deprecated
public ThreadPoolConfiguration stateTransferThreadPool() {
return stateTransferThreadPool;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ public ThreadPoolConfigurationBuilder remoteCommandThreadPool() {
return remoteCommandThreadPool;
}

/**
* @deprecated Since 10.1, no longer used.
*/
@Deprecated
@Override
public ThreadPoolConfigurationBuilder stateTransferThreadPool() {
return stateTransferThreadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import static org.infinispan.factories.KnownComponentNames.EXPIRATION_SCHEDULED_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.PERSISTENCE_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.REMOTE_COMMAND_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.STATE_TRANSFER_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.shortened;
import static org.infinispan.util.logging.Log.CONFIG;

Expand Down Expand Up @@ -631,8 +630,7 @@ private void parseContainer(XMLExtendedStreamReader reader, ConfigurationBuilder
break;
}
case STATE_TRANSFER_EXECUTOR: {
builder.stateTransferThreadPoolName(value);
builder.stateTransferThreadPool().read(createThreadPoolConfiguration(value, STATE_TRANSFER_EXECUTOR, holder));
ignoreAttribute(reader, Attribute.STATE_TRANSFER_EXECUTOR);
break;
}
case STATISTICS: {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.infinispan.conflict.impl;

import static org.infinispan.factories.KnownComponentNames.STATE_TRANSFER_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR;
import static org.infinispan.util.logging.Log.CLUSTER;

import java.util.Collection;
Expand Down Expand Up @@ -90,8 +90,8 @@ public class DefaultConflictManager<K, V> implements InternalConflictManager<K,
@Inject Configuration cacheConfiguration;
@Inject CommandsFactory commandsFactory;
@Inject DistributionManager distributionManager;
@Inject @ComponentName(STATE_TRANSFER_EXECUTOR)
ExecutorService stateTransferExecutor;
@Inject @ComponentName(ASYNC_TRANSPORT_EXECUTOR)
ExecutorService transportExecutor;
@Inject InvocationContextFactory invocationContextFactory;
@Inject RpcManager rpcManager;
@Inject ComponentRef<StateConsumer> stateConsumer;
Expand Down Expand Up @@ -123,7 +123,7 @@ public void start() {
this.conflictTimeout = cacheConfiguration.clustering().stateTransfer().timeout();

// Limit the number of concurrent tasks to ensure that internal CR operations can never overlap
this.resolutionExecutor = new LimitedExecutor("ConflictManager-" + cacheName, stateTransferExecutor, 1);
this.resolutionExecutor = new LimitedExecutor("ConflictManager-" + cacheName, transportExecutor, 1);
this.running = true;
if (trace) log.tracef("Cache %s starting %s. isRunning=%s", cacheName, getClass().getSimpleName(), !running);
}
Expand Down Expand Up @@ -282,7 +282,7 @@ private void doResolveConflicts(final LocalizedCacheTopology topology, final Ent
final Phaser phaser = new Phaser(1);
getConflicts(topology).forEach(conflictMap -> {
phaser.register();
stateTransferExecutor.execute(() -> {
transportExecutor.execute(() -> {
if (trace) log.tracef("Cache %s conflict detected %s", cacheName, conflictMap);

Collection<CacheEntry<K, V>> entries = conflictMap.values();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.infinispan.conflict.impl;

import static org.infinispan.factories.KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.CACHE_NAME;
import static org.infinispan.factories.KnownComponentNames.STATE_TRANSFER_EXECUTOR;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -52,12 +52,12 @@ public class StateReceiverImpl<K, V> implements StateReceiver<K, V> {

@ComponentName(CACHE_NAME)
@Inject String cacheName;
@Inject CacheNotifier cacheNotifier;
@Inject CacheNotifier<K, V> cacheNotifier;
@Inject CommandsFactory commandsFactory;
@Inject InternalDataContainer<K, V> dataContainer;
@Inject RpcManager rpcManager;
@Inject @ComponentName(STATE_TRANSFER_EXECUTOR)
ExecutorService stateTransferExecutor;
@Inject @ComponentName(ASYNC_TRANSPORT_EXECUTOR)
ExecutorService transportExecutor;

private LimitedExecutor stateReceiverExecutor;

Expand All @@ -66,7 +66,7 @@ public class StateReceiverImpl<K, V> implements StateReceiver<K, V> {
@Start
public void start() {
cacheNotifier.addListener(this);
stateReceiverExecutor = new LimitedExecutor("StateReceiver-" + cacheName, stateTransferExecutor, 1);
stateReceiverExecutor = new LimitedExecutor("StateReceiver-" + cacheName, transportExecutor, 1);
}

@Stop
Expand All @@ -85,7 +85,7 @@ public void cancelRequests() {

@DataRehashed
@SuppressWarnings("WeakerAccess")
public void onDataRehash(DataRehashedEvent dataRehashedEvent) {
public void onDataRehash(DataRehashedEvent<K, V> dataRehashedEvent) {
if (dataRehashedEvent.isPre()) {
if (trace) log.tracef("Cache %s received event: %s", cacheName, dataRehashedEvent);
for (SegmentRequest request : requestMap.values())
Expand Down Expand Up @@ -144,7 +144,7 @@ class SegmentRequest {
this.segmentId = segmentId;
this.topology = topology;
this.timeout = timeout;
this.replicaHosts = topology.getDistributionForSegment(segmentId).writeOwners();
this.replicaHosts = topology.getSegmentDistribution(segmentId).writeOwners();
}

synchronized CompletableFuture<List<Map<Address, CacheEntry<K, V>>>> requestState() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package org.infinispan.factories;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

Expand Down Expand Up @@ -31,6 +29,10 @@ public class KnownComponentNames {
public static final String MODULE_COMMAND_INITIALIZERS ="org.infinispan.modules.command.initializers";
public static final String MODULE_COMMAND_FACTORIES ="org.infinispan.modules.command.factories";
public static final String CLASS_LOADER = "java.lang.ClassLoader";
/**
* @deprecated Since 10.1, no longer used
*/
@Deprecated
public static final String STATE_TRANSFER_EXECUTOR = "org.infinispan.executors.stateTransferExecutor";
public static final String TRANSACTION_VERSION_GENERATOR = "org.infinispan.transaction.versionGenerator";
public static final String ASYNC_OPERATIONS_EXECUTOR = "org.infinispan.executors.async";
Expand All @@ -40,12 +42,6 @@ public class KnownComponentNames {
public static final String INTERNAL_MARSHALLER = "org.infinispan.marshaller.internal";
public static final String PERSISTENCE_MARSHALLER = "org.infinispan.marshaller.persistence";

// Please make sure this is kept up to date
public static final Collection<String> ALL_KNOWN_COMPONENT_NAMES = Arrays.asList(
MODULE_COMMAND_INITIALIZERS, MODULE_COMMAND_FACTORIES, CLASS_LOADER,
TRANSACTION_VERSION_GENERATOR, INTERNAL_MARSHALLER, PERSISTENCE_MARSHALLER
);

private static final Map<String, Integer> DEFAULT_THREAD_COUNT = new HashMap<>(7);
private static final Map<String, Integer> DEFAULT_QUEUE_SIZE = new HashMap<>(7);
private static final Map<String, Integer> DEFAULT_THREAD_PRIORITY = new HashMap<>(8);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.infinispan.scattered.impl;

import static org.infinispan.factories.KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR;
import static org.infinispan.util.logging.Log.CONTAINER;
import static org.infinispan.util.logging.Log.PERSISTENCE;

Expand All @@ -17,7 +16,6 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;

import org.infinispan.commands.remote.ClusteredGetAllCommand;
Expand All @@ -36,7 +34,6 @@
import org.infinispan.container.versioning.SimpleClusteredVersion;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.lifecycle.ComponentStatus;
import org.infinispan.metadata.Metadata;
Expand Down Expand Up @@ -73,8 +70,6 @@ public class ScatteredStateConsumerImpl extends StateConsumerImpl {
protected static final long SKIP_OWNERSHIP_FLAGS = FlagBitSets.SKIP_OWNERSHIP_CHECK;

@Inject protected InternalEntryFactory entryFactory;
@Inject @ComponentName(ASYNC_TRANSPORT_EXECUTOR)
protected ExecutorService asyncExecutor;
@Inject protected ScatteredVersionManager<?> svm;

@GuardedBy("transferMapsLock")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,28 +413,27 @@ private void tryRegularInvalidations(boolean force) {
} finally {
writeLock.unlock();
}
executorService.execute(() -> {
// we'll invalidate all keys in one run
// we don't have to keep any topology lock, because the versions increase monotonically
int numKeys = scheduledKeys.size();
Object[] keys = new Object[numKeys];
int[] topologyIds = new int[numKeys];
long[] versions = new long[numKeys];
boolean[] isRemoved = new boolean[numKeys];
int numRemoved = 0;
int i = 0;
for (Map.Entry<K, InvalidationInfo> entry : scheduledKeys.entrySet()) {
keys[i] = entry.getKey();
topologyIds[i] = entry.getValue().topologyId;
versions[i] = entry.getValue().version;
if (isRemoved[i] = entry.getValue().removal) { // intentional assignment
numRemoved++;
}
++i;

// we'll invalidate all keys in one run
// we don't have to keep any topology lock, because the versions increase monotonically
int numKeys = scheduledKeys.size();
Object[] keys = new Object[numKeys];
int[] topologyIds = new int[numKeys];
long[] versions = new long[numKeys];
boolean[] isRemoved = new boolean[numKeys];
int numRemoved = 0;
int i = 0;
for (Map.Entry<K, InvalidationInfo> entry : scheduledKeys.entrySet()) {
keys[i] = entry.getKey();
topologyIds[i] = entry.getValue().topologyId;
versions[i] = entry.getValue().version;
if (isRemoved[i] = entry.getValue().removal) { // intentional assignment
numRemoved++;
}
InvalidateVersionsCommand command = commandsFactory.buildInvalidateVersionsCommand(-1, keys, topologyIds, versions, false);
sendRegularInvalidations(command, keys, topologyIds, versions, numRemoved, isRemoved, force);
});
++i;
}
InvalidateVersionsCommand command = commandsFactory.buildInvalidateVersionsCommand(-1, keys, topologyIds, versions, false);
sendRegularInvalidations(command, keys, topologyIds, versions, numRemoved, isRemoved, force);
}

private void sendRegularInvalidations(InvalidateVersionsCommand command, Object[] keys, int[] topologyIds, long[] versions, int numRemoved, boolean[] isRemoved, boolean force) {
Expand Down Expand Up @@ -489,20 +488,19 @@ private void tryRemovedInvalidations() {
} finally {
writeLock.unlock();
}
executorService.execute(() -> {
int numKeys = removedKeys.size();
Object[] keys = new Object[numKeys];
int[] topologyIds = new int[numKeys];
long[] versions = new long[numKeys];
int i = 0;
for (Map.Entry<K, InvalidationInfo> entry : removedKeys.entrySet()) {
keys[i] = entry.getKey();
topologyIds[i] = entry.getValue().topologyId;
versions[i] = entry.getValue().version;
}
InvalidateVersionsCommand removeCommand = commandsFactory.buildInvalidateVersionsCommand(-1, keys, topologyIds, versions, true);
sendRemoveInvalidations(removeCommand);
});

int numKeys = removedKeys.size();
Object[] keys = new Object[numKeys];
int[] topologyIds = new int[numKeys];
long[] versions = new long[numKeys];
int i = 0;
for (Map.Entry<K, InvalidationInfo> entry : removedKeys.entrySet()) {
keys[i] = entry.getKey();
topologyIds[i] = entry.getValue().topologyId;
versions[i] = entry.getValue().version;
}
InvalidateVersionsCommand removeCommand = commandsFactory.buildInvalidateVersionsCommand(-1, keys, topologyIds, versions, true);
sendRemoveInvalidations(removeCommand);
}

private void sendRemoveInvalidations(InvalidateVersionsCommand removeCommand) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import static org.infinispan.context.Flag.SKIP_REMOTE_LOOKUP;
import static org.infinispan.context.Flag.SKIP_SHARED_CACHE_STORE;
import static org.infinispan.context.Flag.SKIP_XSITE_BACKUP;
import static org.infinispan.factories.KnownComponentNames.STATE_TRANSFER_EXECUTOR;
import static org.infinispan.factories.KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR;
import static org.infinispan.persistence.manager.PersistenceManager.AccessMode.PRIVATE;
import static org.infinispan.reactive.RxJavaInterop.completionStageToCompletable;
import static org.infinispan.util.concurrent.CompletionStages.handleAndCompose;
Expand Down Expand Up @@ -147,8 +147,8 @@ public class StateConsumerImpl implements StateConsumer {
@Inject @ComponentName(KnownComponentNames.REMOTE_COMMAND_EXECUTOR)
protected BlockingTaskAwareExecutorService remoteCommandsExecutor;
@Inject protected CommitManager commitManager;
@Inject @ComponentName(STATE_TRANSFER_EXECUTOR)
protected ExecutorService stateTransferExecutor;
@Inject @ComponentName(ASYNC_TRANSPORT_EXECUTOR)
protected ExecutorService transportExecutor;
@Inject protected CommandAckCollector commandAckCollector;
@Inject protected TriangleOrderManager triangleOrderManager;
@Inject protected DistributionManager distributionManager;
Expand Down Expand Up @@ -827,7 +827,7 @@ public void start() {

rpcOptions = new RpcOptions(DeliverOrder.NONE, timeout, TimeUnit.MILLISECONDS);

stateRequestExecutor = new LimitedExecutor("StateRequest-" + cacheName, stateTransferExecutor, 1);
stateRequestExecutor = new LimitedExecutor("StateRequest-" + cacheName, transportExecutor, 1);
running = true;
}

Expand Down
Loading

0 comments on commit d3c1eb0

Please sign in to comment.