Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WFLY-12588 BeanManager.findBean(...) can return an expired bean #12842

Merged
merged 6 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions clustering/ee/cache/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@
</description>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>wildfly-clustering-common</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>wildfly-clustering-ee-spi</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,20 +26,19 @@
import java.time.Instant;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;

import org.jboss.threads.JBossThreadFactory;
import org.jboss.as.clustering.context.DefaultExecutorService;
import org.jboss.as.clustering.context.DefaultThreadFactory;
import org.wildfly.clustering.ee.Scheduler;
import org.wildfly.clustering.service.concurrent.ClassLoaderThreadFactory;
import org.wildfly.security.ParametricPrivilegedAction;
import org.wildfly.security.manager.WildFlySecurityManager;

/**
Expand All @@ -48,34 +47,14 @@
*/
public class LocalScheduler<T> implements Scheduler<T, Instant>, Iterable<T>, Runnable {

private enum ThreadFactoryAction implements ParametricPrivilegedAction<ThreadFactory, Class<?>> {
INSTANCE;

@Override
public ThreadFactory run(Class<?> targetClass) {
ThreadFactory factory = new JBossThreadFactory(new ThreadGroup(targetClass.getSimpleName()), Boolean.FALSE, null, "%G - %t", null, null);
return new ClassLoaderThreadFactory(factory, targetClass.getClassLoader());
}
}

private enum ExecutorServiceAction implements ParametricPrivilegedAction<Void, ExecutorService> {
SHUTDOWN;

@Override
public Void run(ExecutorService executor) {
executor.shutdown();
return null;
}
}

private final ScheduledExecutorService executor;
private final ScheduledEntries<T, Instant> entries;
private final Predicate<T> task;

private volatile Future<?> future = null;

public LocalScheduler(ScheduledEntries<T, Instant> entries, Predicate<T> task) {
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, WildFlySecurityManager.doPrivilegedWithParameter(this.getClass(), ThreadFactoryAction.INSTANCE));
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new DefaultThreadFactory(this.getClass()));
executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
executor.setRemoveOnCancelPolicy(entries.isSorted());
this.executor = executor;
Expand Down Expand Up @@ -137,12 +116,16 @@ public void accept(Map.Entry<T, Instant> entry) {

@Override
public synchronized void close() {
WildFlySecurityManager.doPrivilegedWithParameter(this.executor, DefaultExecutorService.SHUTDOWN_ACTION);
if (this.future != null) {
if (!this.future.isDone()) {
this.future.cancel(true);
try {
this.future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (CancellationException | ExecutionException e) {
// Ignore
}
}
WildFlySecurityManager.doPrivilegedWithParameter(this.executor, ExecutorServiceAction.SHUTDOWN);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,9 @@ public BeanExpirationScheduler(Group group, Batcher<TransactionBatch> batcher, B

@Override
public void schedule(I id) {
try (Batch batch = this.batcher.createBatch()) {
BeanEntry<I> entry = this.factory.findValue(id);
if (entry != null) {
this.schedule(id, entry);
}
BeanEntry<I> entry = this.factory.findValue(id);
if (entry != null) {
this.schedule(id, entry);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
*/
package org.wildfly.clustering.ejb.infinispan;

import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
Expand All @@ -47,6 +46,7 @@
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.remoting.transport.Address;
import org.jboss.as.clustering.context.DefaultExecutorService;
import org.jboss.as.clustering.context.DefaultThreadFactory;
import org.jboss.ejb.client.Affinity;
import org.jboss.ejb.client.ClusterAffinity;
Expand Down Expand Up @@ -162,8 +162,7 @@ public void start() {
public void stop() {
this.groupFactory.close();
this.cache.removeListener(this);
PrivilegedAction<List<Runnable>> action = () -> this.executor.shutdownNow();
WildFlySecurityManager.doUnchecked(action);
WildFlySecurityManager.doUnchecked(this.executor, DefaultExecutorService.SHUTDOWN_NOW_ACTION);
try {
this.executor.awaitTermination(this.cache.getCacheConfiguration().transaction().cacheStopTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Expand Down Expand Up @@ -222,6 +221,7 @@ public Bean<I, T> createBean(I id, I groupId, T bean) {
return new SchedulableBean<>(this.beanFactory.createBean(id, entry), entry, this.primaryOwnerScheduler);
}

@SuppressWarnings("resource")
@Override
public Bean<I, T> findBean(I id) {
InfinispanEjbLogger.ROOT_LOGGER.tracef("Locating bean %s", id);
Expand All @@ -231,6 +231,11 @@ public Bean<I, T> findBean(I id) {
InfinispanEjbLogger.ROOT_LOGGER.debugf("Could not find bean %s", id);
return null;
}
if (bean.isExpired()) {
InfinispanEjbLogger.ROOT_LOGGER.tracef("Bean %s was found, but has expired", id);
this.beanFactory.remove(id, this.expiration.getRemoveListener());
return null;
}
if (this.primaryOwnerScheduler != null) {
this.primaryOwnerScheduler.cancel(id);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public I getGroupId() {

@Override
public boolean isExpired() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pferraro Am I correct in that before this PR this method was never used outside a test case?

I ask because I don't understand the implications of L78 but TBH if Bean.isExpired was never used you can make the semantics what you like and it doesn't matter if I understand. :) (The L78 change doesn't seem wrong, I just don't pretend to deep understanding of the relationship to group.isCloseable vs bean.isExpired.)

The rest of this PR all seems fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

References to Bean.isExpired() were replaced by BeanEntry.isExpired(...) at some point, which meant only having to look up 1 cache entry to determine if a bean is expired. However, a bean should not be considered expired if it is still referenced (i.e. Bean.isCloseable() returns false). A transaction spanning invocations is a prime example of a bean that is not yet closeable.

return this.entry.isExpired(this.timeout);
return this.entry.isExpired(this.timeout) && this.group.isCloseable();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public void release() {
public void isExpired() {
when(this.entry.getLastAccessedTime()).thenReturn(Instant.now().minus(Duration.ofMinutes(2)));
when(this.entry.isExpired(this.timeout)).thenCallRealMethod();
when(this.group.isCloseable()).thenReturn(true, false, true);
Assert.assertTrue(this.bean.isExpired());
Assert.assertFalse(this.bean.isExpired());

when(this.entry.getLastAccessedTime()).thenReturn(Instant.now());
Assert.assertFalse(this.bean.isExpired());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.infinispan.affinity.impl.KeyAffinityServiceImpl;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.remoting.transport.Address;
import org.jboss.as.clustering.context.DefaultExecutorService;
import org.jboss.as.clustering.context.DefaultThreadFactory;
import org.jboss.as.clustering.controller.CapabilityServiceNameProvider;
import org.jboss.as.controller.PathAddress;
Expand All @@ -48,13 +49,14 @@
import org.wildfly.clustering.service.AsyncServiceConfigurator;
import org.wildfly.clustering.service.FunctionalService;
import org.wildfly.clustering.service.ServiceConfigurator;
import org.wildfly.security.manager.WildFlySecurityManager;

/**
* Key affinity service factory that will only generates keys for use by the local node.
* Returns a trivial implementation if the specified cache is not distributed.
* @author Paul Ferraro
*/
public class KeyAffinityServiceFactoryServiceConfigurator extends CapabilityServiceNameProvider implements ServiceConfigurator, Function<ExecutorService, KeyAffinityServiceFactory>, Supplier<ExecutorService> {
public class KeyAffinityServiceFactoryServiceConfigurator extends CapabilityServiceNameProvider implements ServiceConfigurator, Function<ExecutorService, KeyAffinityServiceFactory>, Supplier<ExecutorService>, Consumer<ExecutorService> {

private volatile int bufferSize = 100;

Expand Down Expand Up @@ -84,11 +86,16 @@ public <K> KeyAffinityService<K> createService(Cache<K, ?> cache, KeyGenerator<K
};
}

@Override
public void accept(ExecutorService executor) {
WildFlySecurityManager.doUnchecked(executor, DefaultExecutorService.SHUTDOWN_NOW_ACTION);
}

@Override
public ServiceBuilder<?> build(ServiceTarget target) {
ServiceBuilder<?> builder = new AsyncServiceConfigurator(this.getServiceName()).startSynchronously().build(target);
Consumer<KeyAffinityServiceFactory> affinityFactory = builder.provides(this.getServiceName());
Service service = new FunctionalService<>(affinityFactory, this, this, ExecutorService::shutdown);
Service service = new FunctionalService<>(affinityFactory, this, this, this);
return builder.setInstance(service).setInitialMode(ServiceController.Mode.ON_DEMAND);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
*/
package org.wildfly.clustering.web.infinispan.session;

import java.security.PrivilegedAction;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand All @@ -39,6 +37,7 @@
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.jboss.as.clustering.context.DefaultExecutorService;
import org.jboss.as.clustering.context.DefaultThreadFactory;
import org.wildfly.clustering.Registrar;
import org.wildfly.clustering.dispatcher.CommandDispatcherFactory;
Expand Down Expand Up @@ -184,8 +183,7 @@ private SessionAttributesFactory<?> createSessionAttributesFactory(InfinispanSes
@Override
public void close() {
this.cache.removeListener(this);
PrivilegedAction<List<Runnable>> action = () -> this.executor.shutdownNow();
WildFlySecurityManager.doUnchecked(action);
WildFlySecurityManager.doUnchecked(this.executor, DefaultExecutorService.SHUTDOWN_NOW_ACTION);
try {
this.executor.awaitTermination(this.cache.getCacheConfiguration().transaction().cacheStopTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,10 @@ public SessionExpirationScheduler(Batcher<TransactionBatch> batcher, ImmutableSe

@Override
public void schedule(String sessionId) {
try (Batch batch = this.batcher.createBatch()) {
MV value = this.metaDataFactory.findValue(sessionId);
if (value != null) {
ImmutableSessionMetaData metaData = this.metaDataFactory.createImmutableSessionMetaData(sessionId, value);
this.schedule(sessionId, metaData);
}
MV value = this.metaDataFactory.findValue(sessionId);
if (value != null) {
ImmutableSessionMetaData metaData = this.metaDataFactory.createImmutableSessionMetaData(sessionId, value);
this.schedule(sessionId, metaData);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@

<dependencies>
<module name="javax.transaction.api"/>
<module name="org.jboss.as.clustering.common"/>
<module name="org.jboss.logging"/>
<module name="org.jboss.threads"/>
<module name="org.wildfly.clustering.marshalling.api"/>
<module name="org.wildfly.clustering.ee.spi"/>
<module name="org.wildfly.clustering.service"/>
Expand Down