Skip to content

Commit

Permalink
Don't block Infinispan's topology change thread.
Browse files Browse the repository at this point in the history
  • Loading branch information
pferraro committed Nov 1, 2016
1 parent e3cf127 commit 02fac42
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 38 deletions.
Expand Up @@ -81,6 +81,7 @@ public void cancel(I id) {
@Override @Override
public void cancel(Locality locality) { public void cancel(Locality locality) {
for (I id: this.expirationFutures.keySet()) { for (I id: this.expirationFutures.keySet()) {
if (Thread.currentThread().isInterrupted()) break;
if (!locality.isLocal(id)) { if (!locality.isLocal(id)) {
this.cancel(id); this.cancel(id);
} }
Expand Down
Expand Up @@ -79,6 +79,7 @@ public void cancel(Locality locality) {
synchronized (this.evictionQueue) { synchronized (this.evictionQueue) {
Iterator<I> groups = this.evictionQueue.iterator(); Iterator<I> groups = this.evictionQueue.iterator();
while (groups.hasNext()) { while (groups.hasNext()) {
if (Thread.currentThread().isInterrupted()) break;
I id = groups.next(); I id = groups.next();
if (!locality.isLocal(id)) { if (!locality.isLocal(id)) {
groups.remove(); groups.remove();
Expand Down
Expand Up @@ -21,8 +21,18 @@
*/ */
package org.wildfly.clustering.ejb.infinispan; package org.wildfly.clustering.ejb.infinispan;


import java.security.PrivilegedAction;
import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream; import java.util.stream.Stream;


import org.infinispan.Cache; import org.infinispan.Cache;
Expand All @@ -42,6 +52,7 @@
import org.jboss.ejb.client.Affinity; import org.jboss.ejb.client.Affinity;
import org.jboss.ejb.client.ClusterAffinity; import org.jboss.ejb.client.ClusterAffinity;
import org.jboss.ejb.client.NodeAffinity; import org.jboss.ejb.client.NodeAffinity;
import org.jboss.threads.JBossThreadFactory;
import org.wildfly.clustering.dispatcher.Command; import org.wildfly.clustering.dispatcher.Command;
import org.wildfly.clustering.dispatcher.CommandDispatcher; import org.wildfly.clustering.dispatcher.CommandDispatcher;
import org.wildfly.clustering.dispatcher.CommandDispatcherFactory; import org.wildfly.clustering.dispatcher.CommandDispatcherFactory;
Expand All @@ -65,8 +76,7 @@
import org.wildfly.clustering.infinispan.spi.distribution.Locality; import org.wildfly.clustering.infinispan.spi.distribution.Locality;
import org.wildfly.clustering.infinispan.spi.distribution.SimpleLocality; import org.wildfly.clustering.infinispan.spi.distribution.SimpleLocality;
import org.wildfly.clustering.registry.Registry; import org.wildfly.clustering.registry.Registry;
import org.wildfly.clustering.service.concurrent.ServiceExecutor; import org.wildfly.security.manager.WildFlySecurityManager;
import org.wildfly.clustering.service.concurrent.StampedLockServiceExecutor;


/** /**
* A {@link BeanManager} implementation backed by an infinispan cache. * A {@link BeanManager} implementation backed by an infinispan cache.
Expand All @@ -80,6 +90,11 @@
@Listener(primaryOnly = true) @Listener(primaryOnly = true)
public class InfinispanBeanManager<I, T> implements BeanManager<I, T, TransactionBatch> { public class InfinispanBeanManager<I, T> implements BeanManager<I, T, TransactionBatch> {


private static ThreadFactory createThreadFactory() {
PrivilegedAction<ThreadFactory> action = () -> new JBossThreadFactory(new ThreadGroup(InfinispanBeanManager.class.getSimpleName()), Boolean.FALSE, null, "%G - %t", null, null);
return WildFlySecurityManager.doUnchecked(action);
}

private final String beanName; private final String beanName;
private final Cache<BeanKey<I>, BeanEntry<I>> cache; private final Cache<BeanKey<I>, BeanEntry<I>> cache;
private final CacheProperties properties; private final CacheProperties properties;
Expand All @@ -96,11 +111,11 @@ public class InfinispanBeanManager<I, T> implements BeanManager<I, T, Transactio
private final Batcher<TransactionBatch> batcher; private final Batcher<TransactionBatch> batcher;
private final Invoker invoker = new RetryingInvoker(0, 10, 100); private final Invoker invoker = new RetryingInvoker(0, 10, 100);
private final BeanFilter<I> filter; private final BeanFilter<I> filter;
private final AtomicReference<Future<?>> rehashFuture = new AtomicReference<>();


private volatile SchedulerContext<I> schedulerContext; private volatile SchedulerContext<I> schedulerContext;

private volatile ExecutorService executor;
private volatile CommandDispatcher<SchedulerContext<I>> dispatcher; private volatile CommandDispatcher<SchedulerContext<I>> dispatcher;
private volatile ServiceExecutor executor;


public InfinispanBeanManager(InfinispanBeanManagerConfiguration<T> configuration, IdentifierFactory<I> identifierFactory, Configuration<BeanKey<I>, BeanEntry<I>, BeanFactory<I, T>> beanConfiguration, Configuration<BeanGroupKey<I>, BeanGroupEntry<I, T>, BeanGroupFactory<I, T>> groupConfiguration) { public InfinispanBeanManager(InfinispanBeanManagerConfiguration<T> configuration, IdentifierFactory<I> identifierFactory, Configuration<BeanKey<I>, BeanEntry<I>, BeanFactory<I, T>> beanConfiguration, Configuration<BeanGroupKey<I>, BeanGroupEntry<I, T>, BeanGroupFactory<I, T>> groupConfiguration) {
this.beanName = configuration.getBeanName(); this.beanName = configuration.getBeanName();
Expand All @@ -124,7 +139,7 @@ public InfinispanBeanManager(InfinispanBeanManagerConfiguration<T> configuration


@Override @Override
public void start() { public void start() {
this.executor = new StampedLockServiceExecutor(); this.executor = Executors.newSingleThreadExecutor(createThreadFactory());
this.affinity.start(); this.affinity.start();
Time timeout = this.expiration.getTimeout(); Time timeout = this.expiration.getTimeout();
Scheduler<I> noopScheduler = new Scheduler<I>() { Scheduler<I> noopScheduler = new Scheduler<I>() {
Expand Down Expand Up @@ -170,12 +185,18 @@ public Scheduler<I> getBeanGroupScheduler() {


@Override @Override
public void stop() { public void stop() {
this.executor.close(() -> { this.cache.removeListener(this);
this.cache.removeListener(this); PrivilegedAction<List<Runnable>> action = () -> this.executor.shutdownNow();
WildFlySecurityManager.doUnchecked(action);
try {
this.executor.awaitTermination(this.cache.getCacheConfiguration().transaction().cacheStopTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
this.dispatcher.close(); this.dispatcher.close();
this.schedulerContext.close(); this.schedulerContext.close();
this.affinity.stop(); this.affinity.stop();
}); }
} }


@Override @Override
Expand Down Expand Up @@ -285,13 +306,11 @@ public void passivated(CacheEntryPassivatedEvent<BeanKey<I>, BeanEntry<I>> event
if (event.isPre()) { if (event.isPre()) {
this.passiveCount.incrementAndGet(); this.passiveCount.incrementAndGet();
if (!this.properties.isPersistent()) { if (!this.properties.isPersistent()) {
this.executor.execute(() -> { I groupId = event.getValue().getGroupId();
I groupId = event.getValue().getGroupId(); BeanGroupEntry<I, T> entry = this.groupFactory.findValue(groupId);
BeanGroupEntry<I, T> entry = this.groupFactory.findValue(groupId); if (entry != null) {
if (entry != null) { this.groupFactory.createGroup(groupId, entry).prePassivate(event.getKey().getId(), this.passivation.getPassivationListener());
this.groupFactory.createGroup(groupId, entry).prePassivate(event.getKey().getId(), this.passivation.getPassivationListener()); }
}
});
} }
} }
} }
Expand All @@ -301,40 +320,56 @@ public void activated(CacheEntryActivatedEvent<BeanKey<I>, BeanEntry<I>> event)
if (!event.isPre()) { if (!event.isPre()) {
this.passiveCount.decrementAndGet(); this.passiveCount.decrementAndGet();
if (!this.properties.isPersistent()) { if (!this.properties.isPersistent()) {
this.executor.execute(() -> { I groupId = event.getValue().getGroupId();
I groupId = event.getValue().getGroupId(); BeanGroupEntry<I, T> entry = this.groupFactory.findValue(groupId);
BeanGroupEntry<I, T> entry = this.groupFactory.findValue(groupId); if (entry != null) {
if (entry != null) { this.groupFactory.createGroup(groupId, entry).postActivate(event.getKey().getId(), this.passivation.getPassivationListener());
this.groupFactory.createGroup(groupId, entry).postActivate(event.getKey().getId(), this.passivation.getPassivationListener()); }
}
});
} }
} }
} }


@DataRehashed @DataRehashed
public void dataRehashed(DataRehashedEvent<BeanKey<I>, BeanEntry<I>> event) { public void dataRehashed(DataRehashedEvent<BeanKey<I>, BeanEntry<I>> event) {
this.executor.execute(() -> { Address localAddress = this.cache.getCacheManager().getAddress();
Address localAddress = this.cache.getCacheManager().getAddress(); Locality newLocality = new ConsistentHashLocality(localAddress, event.getConsistentHashAtEnd());
if (event.isPre()) {
Future<?> future = this.rehashFuture.getAndSet(null);
if (future != null) {
future.cancel(true);
}
try {
this.executor.submit(() -> {
this.schedulerContext.getBeanScheduler().cancel(newLocality);
this.schedulerContext.getBeanGroupScheduler().cancel(newLocality);
});
} catch (RejectedExecutionException e) {
// Executor was shutdown
}
} else {
Locality oldLocality = new ConsistentHashLocality(localAddress, event.getConsistentHashAtStart()); Locality oldLocality = new ConsistentHashLocality(localAddress, event.getConsistentHashAtStart());
Locality newLocality = new ConsistentHashLocality(localAddress, event.getConsistentHashAtEnd()); try {
if (event.isPre()) { this.rehashFuture.set(this.executor.submit(() -> this.schedule(oldLocality, newLocality)));
this.schedulerContext.getBeanScheduler().cancel(newLocality); } catch (RejectedExecutionException e) {
this.schedulerContext.getBeanGroupScheduler().cancel(newLocality); // Executor was shutdown
} else {
this.schedule(oldLocality, newLocality);
} }
}); }
} }


private void schedule(Locality oldLocality, Locality newLocality) { private void schedule(Locality oldLocality, Locality newLocality) {
// Iterate over sessions in memory // Iterate over beans in memory
try (Stream<Map.Entry<BeanKey<I>, BeanEntry<I>>> entries = this.cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_LOAD).entrySet().stream()) { try (Stream<Map.Entry<BeanKey<I>, BeanEntry<I>>> stream = this.cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_LOAD).entrySet().stream().filter(this.filter)) {
// If we are the new primary owner of this session then schedule expiration of this session locally Iterator<Map.Entry<BeanKey<I>, BeanEntry<I>>> entries = stream.iterator();
entries.filter(this.filter).filter(entry -> !oldLocality.isLocal(entry.getKey()) && newLocality.isLocal(entry.getKey())).forEach(entry -> { while (entries.hasNext()) {
this.schedulerContext.getBeanScheduler().schedule(entry.getKey().getId()); if (Thread.currentThread().isInterrupted()) break;
this.schedulerContext.getBeanGroupScheduler().schedule(entry.getValue().getGroupId()); Map.Entry<BeanKey<I>, BeanEntry<I>> entry = entries.next();
}); BeanKey<I> key = entry.getKey();
// If we are the new primary owner of this bean then schedule expiration of this bean locally
if (this.filter.test(this.filter) && !oldLocality.isLocal(key) && newLocality.isLocal(key)) {
this.schedulerContext.getBeanScheduler().schedule(key.getId());
this.schedulerContext.getBeanGroupScheduler().schedule(entry.getValue().getGroupId());
}
}
} }
} }


Expand Down

0 comments on commit 02fac42

Please sign in to comment.