Skip to content

Commit

Permalink
ISPN-9784 Remove Distributed Executor
Browse files Browse the repository at this point in the history
* Refactor DistributedExecutor to ClusterExecutor
  • Loading branch information
wburns authored and danberindei committed Feb 5, 2019
1 parent 3c84f87 commit d2bb008
Show file tree
Hide file tree
Showing 37 changed files with 754 additions and 706 deletions.
Expand Up @@ -1059,7 +1059,9 @@ public Health getHealth() {


@Override @Override
public ClusterExecutor executor() { public ClusterExecutor executor() {
if (globalComponentRegistry.getStatus() != ComponentStatus.RUNNING) { // Allow INITIALIZING state so ClusterExecutor can be used by components in a @Start method.
if (globalComponentRegistry.getStatus() != ComponentStatus.RUNNING &&
globalComponentRegistry.getStatus() != ComponentStatus.INITIALIZING) {
throw new IllegalStateException("CacheManager must be started before retrieving a ClusterExecutor!"); throw new IllegalStateException("CacheManager must be started before retrieving a ClusterExecutor!");
} }
JGroupsTransport transport = (JGroupsTransport) globalComponentRegistry.getComponent(Transport.class); JGroupsTransport transport = (JGroupsTransport) globalComponentRegistry.getComponent(Transport.class);
Expand Down
Expand Up @@ -32,7 +32,7 @@ class LocalClusterExecutor implements ClusterExecutor {
LocalClusterExecutor(Predicate<? super Address> predicate, EmbeddedCacheManager manager, Executor localExecutor, LocalClusterExecutor(Predicate<? super Address> predicate, EmbeddedCacheManager manager, Executor localExecutor,
long time, TimeUnit unit, ScheduledExecutorService timeoutExecutor) { long time, TimeUnit unit, ScheduledExecutorService timeoutExecutor) {
this.predicate = predicate; this.predicate = predicate;
this.manager = Objects.requireNonNull(manager); this.manager = new UnwrappingEmbeddedCacheManager(Objects.requireNonNull(manager));
this.localExecutor = Objects.requireNonNull(localExecutor); this.localExecutor = Objects.requireNonNull(localExecutor);
if (time <= 0) { if (time <= 0) {
throw new IllegalArgumentException("time must be greater than 0"); throw new IllegalArgumentException("time must be greater than 0");
Expand Down

Large diffs are not rendered by default.

Expand Up @@ -5,12 +5,12 @@
import java.util.List; import java.util.List;


import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.interceptors.AsyncInterceptor; import org.infinispan.interceptors.AsyncInterceptor;
import org.infinispan.interceptors.AsyncInterceptorChain; import org.infinispan.interceptors.AsyncInterceptorChain;
import org.infinispan.manager.ClusterExecutor;
import org.infinispan.security.Security; import org.infinispan.security.Security;
import org.infinispan.security.actions.GetCacheInterceptorChainAction; import org.infinispan.security.actions.GetCacheInterceptorChainAction;
import org.infinispan.security.actions.GetDefaultExecutorServiceAction; import org.infinispan.security.actions.GetClusterExecutorAction;


/** /**
* SecurityActions for the org.infinispan.notifications.cachelistener package. * SecurityActions for the org.infinispan.notifications.cachelistener package.
Expand All @@ -30,8 +30,8 @@ private static <T> T doPrivileged(PrivilegedAction<T> action) {
} }
} }


static DefaultExecutorService getDefaultExecutorService(final Cache<?, ?> cache) { static ClusterExecutor getClusterExecutor(final Cache<?, ?> cache) {
GetDefaultExecutorServiceAction action = new GetDefaultExecutorServiceAction(cache); GetClusterExecutorAction action = new GetClusterExecutorAction(cache);
return doPrivileged(action); return doPrivileged(action);
} }


Expand Down
Expand Up @@ -3,7 +3,6 @@
import java.util.Collection; import java.util.Collection;
import java.util.UUID; import java.util.UUID;


import org.infinispan.distexec.DistributedCallable;
import org.infinispan.notifications.cachelistener.CacheNotifier; import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;


Expand All @@ -29,5 +28,5 @@ public interface ClusterCacheNotifier<K, V> extends CacheNotifier<K, V> {
* the existing cluster listeners that are already installed. * the existing cluster listeners that are already installed.
* @return A collection of callables that should be invoked on the new node to properly install cluster listener information * @return A collection of callables that should be invoked on the new node to properly install cluster listener information
*/ */
Collection<DistributedCallable> retrieveClusterListenerCallablesToInstall(); Collection<ClusterListenerReplicateCallable<K, V>> retrieveClusterListenerCallablesToInstall();
} }
Expand Up @@ -7,10 +7,11 @@
import java.util.Collections; import java.util.Collections;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function;


import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.commons.marshall.AbstractExternalizer; import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.distexec.DistributedCallable; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.core.Ids; import org.infinispan.marshall.core.Ids;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;
Expand All @@ -22,42 +23,41 @@
* @author wburns * @author wburns
* @since 7.0 * @since 7.0
*/ */
public class ClusterEventCallable<K, V> implements DistributedCallable<K, V, Void> { public class ClusterEventCallable<K, V> implements Function<EmbeddedCacheManager, Void> {


private static final Log log = LogFactory.getLog(ClusterEventCallable.class); private static final Log log = LogFactory.getLog(ClusterEventCallable.class);
private static final boolean trace = log.isTraceEnabled(); private static final boolean trace = log.isTraceEnabled();


private transient ClusterCacheNotifier clusterCacheNotifier; private transient ClusterCacheNotifier clusterCacheNotifier;


private final String cacheName;
private final UUID identifier; private final UUID identifier;
private final Collection<? extends ClusterEvent<K, V>> events; private final Collection<? extends ClusterEvent<K, V>> events;


public ClusterEventCallable(UUID identifier, ClusterEvent<K, V> event) { public ClusterEventCallable(String cacheName, UUID identifier, ClusterEvent<K, V> event) {
this(identifier, Collections.singleton(event)); this(cacheName, identifier, Collections.singleton(event));
} }


public ClusterEventCallable(UUID identifier, Collection<? extends ClusterEvent<K, V>> events) { public ClusterEventCallable(String cacheName, UUID identifier, Collection<? extends ClusterEvent<K, V>> events) {
this.cacheName = cacheName;
this.identifier = identifier; this.identifier = identifier;
this.events = events; this.events = events;
} }


@Override @Override
public Void call() throws Exception { public Void apply(EmbeddedCacheManager embeddedCacheManager) {
Cache<K, V> cache = embeddedCacheManager.getCache(cacheName);
ClusterCacheNotifier<K, V> clusterCacheNotifier = cache.getAdvancedCache().getComponentRegistry().getComponent(ClusterCacheNotifier.class);
for (ClusterEvent event : events) {
event.cache = cache;
}
if (trace) { if (trace) {
log.tracef("Received cluster event(s) %s, notifying cluster listener with id %s", events, identifier); log.tracef("Received cluster event(s) %s, notifying cluster listener with id %s", events, identifier);
} }
clusterCacheNotifier.notifyClusterListeners(events, identifier); clusterCacheNotifier.notifyClusterListeners(events, identifier);
return null; return null;
} }


@Override
public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys) {
this.clusterCacheNotifier = cache.getAdvancedCache().getComponentRegistry().getComponent(ClusterCacheNotifier.class);
for (ClusterEvent event : events) {
event.cache = cache;
}
}

@Override @Override
public String toString() { public String toString() {
final StringBuilder sb = new StringBuilder("ClusterEventCallable{"); final StringBuilder sb = new StringBuilder("ClusterEventCallable{");
Expand All @@ -75,13 +75,15 @@ public Set<Class<? extends ClusterEventCallable>> getTypeClasses() {


@Override @Override
public void writeObject(ObjectOutput output, ClusterEventCallable object) throws IOException { public void writeObject(ObjectOutput output, ClusterEventCallable object) throws IOException {
output.writeObject(object.cacheName);
output.writeObject(object.identifier); output.writeObject(object.identifier);
output.writeObject(object.events); output.writeObject(object.events);
} }


@Override @Override
public ClusterEventCallable readObject(ObjectInput input) throws IOException, ClassNotFoundException { public ClusterEventCallable readObject(ObjectInput input) throws IOException, ClassNotFoundException {
return new ClusterEventCallable((UUID)input.readObject(), (Collection<? extends ClusterEvent>)input.readObject()); return new ClusterEventCallable((String) input.readObject(), (UUID)input.readObject(),
(Collection<? extends ClusterEvent>)input.readObject());
} }


@Override @Override
Expand Down
Expand Up @@ -6,10 +6,11 @@
import java.util.Collections; import java.util.Collections;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.Function;


import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.commons.marshall.AbstractExternalizer; import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.distexec.DistributedCallable; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.marshall.core.Ids; import org.infinispan.marshall.core.Ids;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;
Expand All @@ -21,25 +22,21 @@
* @author wburns * @author wburns
* @since 7.0 * @since 7.0
*/ */
public class ClusterListenerRemoveCallable<K, V> implements DistributedCallable<K, V, Void> { public class ClusterListenerRemoveCallable implements Function<EmbeddedCacheManager, Void> {
private static final Log log = LogFactory.getLog(ClusterListenerRemoveCallable.class); private static final Log log = LogFactory.getLog(ClusterListenerRemoveCallable.class);
private static final boolean trace = log.isTraceEnabled(); private static final boolean trace = log.isTraceEnabled();


private transient Cache<K, V> cache; private final String cacheName;

private final UUID identifier; private final UUID identifier;


public ClusterListenerRemoveCallable(UUID identifier) { public ClusterListenerRemoveCallable(String cacheName, UUID identifier) {
this.cacheName = cacheName;
this.identifier = identifier; this.identifier = identifier;
} }


@Override @Override
public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys) { public Void apply(EmbeddedCacheManager embeddedCacheManager) {
this.cache = cache; Cache<Object, Object> cache = embeddedCacheManager.getCache(cacheName);
}

@Override
public Void call() throws Exception {
// Remove the listener from the cache now // Remove the listener from the cache now
Set<Object> listeners = cache.getListeners(); Set<Object> listeners = cache.getListeners();
for (Object listener : listeners) { for (Object listener : listeners) {
Expand All @@ -64,12 +61,13 @@ public Set<Class<? extends ClusterListenerRemoveCallable>> getTypeClasses() {


@Override @Override
public void writeObject(ObjectOutput output, ClusterListenerRemoveCallable object) throws IOException { public void writeObject(ObjectOutput output, ClusterListenerRemoveCallable object) throws IOException {
output.writeObject(object.cacheName);
output.writeObject(object.identifier); output.writeObject(object.identifier);
} }


@Override @Override
public ClusterListenerRemoveCallable readObject(ObjectInput input) throws IOException, ClassNotFoundException { public ClusterListenerRemoveCallable readObject(ObjectInput input) throws IOException, ClassNotFoundException {
return new ClusterListenerRemoveCallable((UUID)input.readObject()); return new ClusterListenerRemoveCallable((String) input.readObject(), (UUID)input.readObject());
} }


@Override @Override
Expand Down
Expand Up @@ -8,12 +8,12 @@
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.UUID; import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Function;


import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.commons.marshall.AbstractExternalizer; import org.infinispan.commons.marshall.AbstractExternalizer;
import org.infinispan.commons.marshall.MarshallUtil; import org.infinispan.commons.marshall.MarshallUtil;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.distexec.DistributedExecutorService;
import org.infinispan.encoding.DataConversion; import org.infinispan.encoding.DataConversion;
import org.infinispan.factories.ComponentRegistry; import org.infinispan.factories.ComponentRegistry;
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
Expand All @@ -36,17 +36,12 @@
* @author wburns * @author wburns
* @since 7.0 * @since 7.0
*/ */
public class ClusterListenerReplicateCallable<K, V> implements DistributedCallable<K, V, Void> { public class ClusterListenerReplicateCallable<K, V> implements Function<EmbeddedCacheManager, Void>,
BiConsumer<EmbeddedCacheManager, Cache<K, V>> {
private static final Log log = LogFactory.getLog(ClusterListenerReplicateCallable.class); private static final Log log = LogFactory.getLog(ClusterListenerReplicateCallable.class);
private static final boolean trace = log.isTraceEnabled(); private static final boolean trace = log.isTraceEnabled();


private transient EmbeddedCacheManager cacheManager; private final String cacheName;
private transient CacheNotifier cacheNotifier;
private transient CacheManagerNotifier cacheManagerNotifier;
private transient DistributedExecutorService distExecutor;
private transient Address ourAddress;
private transient ClusterEventManager<K, V> eventManager;

private final UUID identifier; private final UUID identifier;
private final CacheEventFilter<K, V> filter; private final CacheEventFilter<K, V> filter;
private final CacheEventConverter<K, V, ?> converter; private final CacheEventConverter<K, V, ?> converter;
Expand All @@ -57,10 +52,11 @@ public class ClusterListenerReplicateCallable<K, V> implements DistributedCallab
private final DataConversion valueDataConversion; private final DataConversion valueDataConversion;
private final boolean useStorageFormat; private final boolean useStorageFormat;


public ClusterListenerReplicateCallable(UUID identifier, Address origin, CacheEventFilter<K, V> filter, public ClusterListenerReplicateCallable(String cacheName, UUID identifier, Address origin, CacheEventFilter<K, V> filter,
CacheEventConverter<K, V, ?> converter, boolean sync, CacheEventConverter<K, V, ?> converter, boolean sync,
Set<Class<? extends Annotation>> filterAnnotations, Set<Class<? extends Annotation>> filterAnnotations,
DataConversion keyDataConversion, DataConversion valueDataConversion, boolean useStorageFormat) { DataConversion keyDataConversion, DataConversion valueDataConversion, boolean useStorageFormat) {
this.cacheName = cacheName;
this.identifier = identifier; this.identifier = identifier;
this.origin = origin; this.origin = origin;
this.filter = filter; this.filter = filter;
Expand All @@ -76,27 +72,29 @@ public ClusterListenerReplicateCallable(UUID identifier, Address origin, CacheEv
} }


@Override @Override
public void setEnvironment(Cache<K, V> cache, Set<K> inputKeys) { public Void apply(EmbeddedCacheManager cacheManager) {
cacheManager = cache.getCacheManager(); Cache<K, V> cache = cacheManager.getCache(cacheName);
accept(cacheManager, cache);

return null;
}


@Override
public void accept(EmbeddedCacheManager cacheManager, Cache<K, V> cache) {
ComponentRegistry componentRegistry = cache.getAdvancedCache().getComponentRegistry(); ComponentRegistry componentRegistry = cache.getAdvancedCache().getComponentRegistry();


cacheNotifier = componentRegistry.getComponent(CacheNotifier.class); CacheNotifier<K, V> cacheNotifier = componentRegistry.getComponent(CacheNotifier.class);
cacheManagerNotifier = cache.getCacheManager().getGlobalComponentRegistry().getComponent( CacheManagerNotifier cacheManagerNotifier = cache.getCacheManager().getGlobalComponentRegistry().getComponent(
CacheManagerNotifier.class); CacheManagerNotifier.class);
distExecutor = SecurityActions.getDefaultExecutorService(cache); Address ourAddress = cache.getCacheManager().getAddress();
ourAddress = cache.getCacheManager().getAddress(); ClusterEventManager<K, V> eventManager = componentRegistry.getComponent(ClusterEventManager.class);
eventManager = componentRegistry.getComponent(ClusterEventManager.class);
if (filter != null) { if (filter != null) {
componentRegistry.wireDependencies(filter); componentRegistry.wireDependencies(filter);
} }
if (converter != null && converter != filter) { if (converter != null && converter != filter) {
componentRegistry.wireDependencies(converter); componentRegistry.wireDependencies(converter);
} }
}


@Override
public Void call() throws Exception {
// Only register listeners if we aren't the ones that registered the cluster listener // Only register listeners if we aren't the ones that registered the cluster listener
if (!ourAddress.equals(origin)) { if (!ourAddress.equals(origin)) {
// Make sure the origin is around otherwise don't register the listener - some way with identifier (CHM maybe?) // Make sure the origin is around otherwise don't register the listener - some way with identifier (CHM maybe?)
Expand All @@ -114,7 +112,7 @@ public Void call() throws Exception {
} }
} }
if (!alreadyInstalled) { if (!alreadyInstalled) {
RemoteClusterListener listener = new RemoteClusterListener(identifier, origin, distExecutor, cacheNotifier, RemoteClusterListener listener = new RemoteClusterListener(identifier, origin, cacheNotifier,
cacheManagerNotifier, eventManager, sync); cacheManagerNotifier, eventManager, sync);
ListenerHolder listenerHolder = new ListenerHolder(listener, keyDataConversion, valueDataConversion, useStorageFormat); ListenerHolder listenerHolder = new ListenerHolder(listener, keyDataConversion, valueDataConversion, useStorageFormat);
cacheNotifier.addFilteredListener(listenerHolder, filter, converter, filterAnnotations); cacheNotifier.addFilteredListener(listenerHolder, filter, converter, filterAnnotations);
Expand Down Expand Up @@ -142,7 +140,7 @@ public Void call() throws Exception {
} else if (trace) { } else if (trace) {
log.trace("Not registering local cluster listener as we are the node who registered the cluster listener"); log.trace("Not registering local cluster listener as we are the node who registered the cluster listener");
} }
return null;
} }


public static class Externalizer extends AbstractExternalizer<ClusterListenerReplicateCallable> { public static class Externalizer extends AbstractExternalizer<ClusterListenerReplicateCallable> {
Expand All @@ -153,6 +151,7 @@ public Set<Class<? extends ClusterListenerReplicateCallable>> getTypeClasses() {


@Override @Override
public void writeObject(ObjectOutput output, ClusterListenerReplicateCallable object) throws IOException { public void writeObject(ObjectOutput output, ClusterListenerReplicateCallable object) throws IOException {
output.writeObject(object.cacheName);
output.writeObject(object.identifier); output.writeObject(object.identifier);
output.writeObject(object.origin); output.writeObject(object.origin);
output.writeObject(object.filter); output.writeObject(object.filter);
Expand All @@ -171,6 +170,7 @@ public void writeObject(ObjectOutput output, ClusterListenerReplicateCallable ob


@Override @Override
public ClusterListenerReplicateCallable readObject(ObjectInput input) throws IOException, ClassNotFoundException { public ClusterListenerReplicateCallable readObject(ObjectInput input) throws IOException, ClassNotFoundException {
String cacheName = (String) input.readObject();
UUID id = (UUID) input.readObject(); UUID id = (UUID) input.readObject();
Address address = (Address) input.readObject(); Address address = (Address) input.readObject();
CacheEventFilter filter = (CacheEventFilter) input.readObject(); CacheEventFilter filter = (CacheEventFilter) input.readObject();
Expand All @@ -186,7 +186,7 @@ public ClusterListenerReplicateCallable readObject(ObjectInput input) throws IOE
DataConversion keyDataConversion = DataConversion.readFrom(input); DataConversion keyDataConversion = DataConversion.readFrom(input);
DataConversion valueDataConversion = DataConversion.readFrom(input); DataConversion valueDataConversion = DataConversion.readFrom(input);
boolean raw = input.readBoolean(); boolean raw = input.readBoolean();
return new ClusterListenerReplicateCallable(id, address, filter, converter, sync, listenerAnnots, return new ClusterListenerReplicateCallable(cacheName, id, address, filter, converter, sync, listenerAnnots,
keyDataConversion, valueDataConversion, raw); keyDataConversion, valueDataConversion, raw);
} }


Expand All @@ -195,4 +195,14 @@ public Integer getId() {
return Ids.CLUSTER_LISTENER_REPLICATE_CALLABLE; return Ids.CLUSTER_LISTENER_REPLICATE_CALLABLE;
} }
} }

@Override
public String toString() {
return "ClusterListenerReplicateCallable{" +
"cacheName='" + cacheName + '\'' +
", identifier=" + identifier +
", origin=" + origin +
", sync=" + sync +
'}';
}
} }

0 comments on commit d2bb008

Please sign in to comment.