Skip to content

Commit

Permalink
WFLY-9428 Distributed shared web sessions can passivate prematurely
Browse files Browse the repository at this point in the history
  • Loading branch information
pferraro committed Dec 21, 2017
1 parent f1aef62 commit 0bfeb93
Show file tree
Hide file tree
Showing 12 changed files with 287 additions and 187 deletions.
@@ -0,0 +1,67 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2017, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.wildfly.clustering.web.infinispan.session;

import java.util.List;

import org.wildfly.clustering.infinispan.spi.distribution.Locality;
import org.wildfly.clustering.web.session.ImmutableSessionMetaData;

/**
* @author Paul Ferraro
*/
public class CompositeScheduler implements Scheduler {
private final List<Scheduler> schedulers;

public CompositeScheduler(List<Scheduler> schedulers) {
this.schedulers = schedulers;
}

@Override
public void schedule(String sessionId, ImmutableSessionMetaData metaData) {
for (Scheduler scheduler : this.schedulers) {
scheduler.schedule(sessionId, metaData);
}
}

@Override
public void cancel(String sessionId) {
for (Scheduler scheduler : this.schedulers) {
scheduler.cancel(sessionId);
}
}

@Override
public void cancel(Locality locality) {
for (Scheduler scheduler : this.schedulers) {
scheduler.cancel(locality);
}
}

@Override
public void close() {
for (Scheduler scheduler : this.schedulers) {
scheduler.close();
}
}
}
Expand Up @@ -21,6 +21,11 @@
*/
package org.wildfly.clustering.web.infinispan.session;

import java.util.Collection;
import java.util.concurrent.CopyOnWriteArraySet;

import org.wildfly.clustering.Registrar;
import org.wildfly.clustering.Registration;
import org.wildfly.clustering.ee.Remover;
import org.wildfly.clustering.web.infinispan.logging.InfinispanWebLogger;
import org.wildfly.clustering.web.session.ImmutableSession;
Expand All @@ -32,14 +37,13 @@
* Session remover that removes a session if and only if it is expired.
* @author Paul Ferraro
*/
public class ExpiredSessionRemover<MV, AV, L> implements Remover<String> {
public class ExpiredSessionRemover<MV, AV, L> implements Remover<String>, Registrar<SessionExpirationListener> {

private final SessionFactory<MV, AV, L> factory;
private final SessionExpirationListener listener;
private final Collection<SessionExpirationListener> listeners = new CopyOnWriteArraySet<>();

public ExpiredSessionRemover(SessionFactory<MV, AV, L> factory, SessionExpirationListener listener) {
public ExpiredSessionRemover(SessionFactory<MV, AV, L> factory) {
this.factory = factory;
this.listener = listener;
}

@Override
Expand All @@ -53,11 +57,19 @@ public boolean remove(String id) {
ImmutableSessionAttributes attributes = this.factory.getAttributesFactory().createImmutableSessionAttributes(id, attributesValue);
ImmutableSession session = this.factory.createImmutableSession(id, metaData, attributes);
InfinispanWebLogger.ROOT_LOGGER.tracef("Session %s has expired.", id);
this.listener.sessionExpired(session);
for (SessionExpirationListener listener : this.listeners) {
listener.sessionExpired(session);
}
}
return this.factory.remove(id);
}
}
return false;
}

@Override
public Registration register(SessionExpirationListener listener) {
this.listeners.add(listener);
return () -> this.listeners.remove(listener);
}
}
Expand Up @@ -21,20 +21,10 @@
*/
package org.wildfly.clustering.web.infinispan.session;

import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -46,36 +36,31 @@
import javax.servlet.http.HttpSessionEvent;

import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.context.Flag;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryActivated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryPassivated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.annotation.DataRehashed;
import org.infinispan.notifications.cachelistener.event.CacheEntryActivatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryPassivatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
import org.infinispan.notifications.cachelistener.event.DataRehashedEvent;
import org.infinispan.remoting.transport.Address;
import org.jboss.threads.JBossThreadFactory;
import org.wildfly.clustering.Registrar;
import org.wildfly.clustering.Registration;
import org.wildfly.clustering.dispatcher.Command;
import org.wildfly.clustering.dispatcher.CommandDispatcher;
import org.wildfly.clustering.dispatcher.CommandDispatcherFactory;
import org.wildfly.clustering.ee.Batch;
import org.wildfly.clustering.ee.Batcher;
import org.wildfly.clustering.ee.Invoker;
import org.wildfly.clustering.ee.Recordable;
import org.wildfly.clustering.ee.infinispan.CacheProperties;
import org.wildfly.clustering.ee.infinispan.RetryingInvoker;
import org.wildfly.clustering.ee.infinispan.TransactionBatch;
import org.wildfly.clustering.group.Group;
import org.wildfly.clustering.group.Node;
import org.wildfly.clustering.infinispan.spi.distribution.CacheLocality;
import org.wildfly.clustering.infinispan.spi.distribution.ConsistentHashLocality;
import org.wildfly.clustering.infinispan.spi.distribution.Key;
import org.wildfly.clustering.infinispan.spi.distribution.Locality;
import org.wildfly.clustering.infinispan.spi.distribution.SimpleLocality;
import org.wildfly.clustering.spi.NodeFactory;
import org.wildfly.clustering.web.IdentifierFactory;
import org.wildfly.clustering.web.infinispan.logging.InfinispanWebLogger;
Expand All @@ -88,7 +73,6 @@
import org.wildfly.clustering.web.session.SessionExpirationListener;
import org.wildfly.clustering.web.session.SessionManager;
import org.wildfly.clustering.web.session.SessionMetaData;
import org.wildfly.security.manager.WildFlySecurityManager;

/**
* Generic session manager implementation - independent of cache mapping strategy.
Expand All @@ -97,97 +81,54 @@
@Listener(primaryOnly = true)
public class InfinispanSessionManager<MV, AV, L> implements SessionManager<L, TransactionBatch> {

private static ThreadFactory createThreadFactory() {
PrivilegedAction<ThreadFactory> action = () -> new JBossThreadFactory(new ThreadGroup(InfinispanSessionManager.class.getSimpleName()), Boolean.FALSE, null, "%G - %t", null, null);
return WildFlySecurityManager.doUnchecked(action);
}

private final Registrar<SessionExpirationListener> expirationRegistrar;
private final SessionExpirationListener expirationListener;
private final Batcher<TransactionBatch> batcher;
private final Cache<Key<String>, ?> cache;
private final CacheProperties properties;
private final SessionFactory<MV, AV, L> factory;
private final IdentifierFactory<String> identifierFactory;
private final CommandDispatcherFactory dispatcherFactory;
private final CommandDispatcher<Scheduler> dispatcher;
private final Group group;
private final NodeFactory<Address> memberFactory;
private final int maxActiveSessions;
private volatile Duration defaultMaxInactiveInterval = Duration.ofMinutes(30L);
private final Invoker invoker = new RetryingInvoker(0, 10, 100);
private final SessionCreationMetaDataKeyFilter filter = new SessionCreationMetaDataKeyFilter();
private final Recordable<ImmutableSession> recorder;
private final ServletContext context;
private final AtomicReference<Future<?>> rehashFuture = new AtomicReference<>();

private volatile CommandDispatcher<Scheduler> dispatcher;
private volatile Scheduler scheduler;
private volatile ExecutorService executor;
private volatile Duration defaultMaxInactiveInterval = Duration.ofMinutes(30L);
private volatile Registration expirationRegistration;

public InfinispanSessionManager(SessionFactory<MV, AV, L> factory, InfinispanSessionManagerConfiguration configuration) {
this.factory = factory;
this.cache = configuration.getCache();
this.properties = configuration.getProperties();
this.expirationRegistrar = configuration.getExpirationRegistar();
this.expirationListener = configuration.getExpirationListener();
this.identifierFactory = configuration.getIdentifierFactory();
this.batcher = configuration.getBatcher();
this.dispatcherFactory = configuration.getCommandDispatcherFactory();
this.dispatcher = configuration.getCommandDispatcher();
this.group = configuration.getGroup();
this.memberFactory = configuration.getMemberFactory();
this.maxActiveSessions = configuration.getMaxActiveSessions();
this.recorder = configuration.getInactiveSessionRecorder();
this.context = configuration.getServletContext();
}

@Override
public void start() {
this.executor = Executors.newSingleThreadExecutor(createThreadFactory());
if (this.recorder != null) {
this.recorder.reset();
}
this.identifierFactory.start();
final List<Scheduler> schedulers = new ArrayList<>(2);
schedulers.add(new SessionExpirationScheduler(this.batcher, new ExpiredSessionRemover<>(this.factory, this.expirationListener)));
if (this.maxActiveSessions >= 0) {
schedulers.add(new SessionEvictionScheduler(this.cache.getName() + ".eviction", this.factory, this.batcher, this.dispatcherFactory, this.maxActiveSessions));
}
this.scheduler = new Scheduler() {
@Override
public void schedule(String sessionId, ImmutableSessionMetaData metaData) {
schedulers.forEach(scheduler -> scheduler.schedule(sessionId, metaData));
}

@Override
public void cancel(String sessionId) {
schedulers.forEach(scheduler -> scheduler.cancel(sessionId));
}

@Override
public void cancel(Locality locality) {
schedulers.forEach(scheduler -> scheduler.cancel(locality));
}

@Override
public void close() {
schedulers.forEach(scheduler -> scheduler.close());
}
};
this.dispatcher = this.dispatcherFactory.createCommandDispatcher(this.cache.getName() + ".schedulers", this.scheduler);
this.cache.addListener(this, this.filter);
this.schedule(new SimpleLocality(false), new CacheLocality(this.cache));
this.expirationRegistration = this.expirationRegistrar.register(this.expirationListener);
}

@Override
public void stop() {
this.cache.removeListener(this);
PrivilegedAction<List<Runnable>> action = () -> this.executor.shutdownNow();
WildFlySecurityManager.doUnchecked(action);
try {
this.executor.awaitTermination(this.cache.getCacheConfiguration().transaction().cacheStopTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
this.dispatcher.close();
this.scheduler.close();
this.identifierFactory.stop();
}
this.expirationRegistration.close();
this.identifierFactory.stop();
}

boolean isPersistent() {
Expand Down Expand Up @@ -221,7 +162,7 @@ private void executeOnPrimaryOwner(final String sessionId, final Command<Void, S
private Node locatePrimaryOwner(String sessionId) {
DistributionManager dist = this.cache.getAdvancedCache().getDistributionManager();
Address address = (dist != null) ? dist.getPrimaryLocation(new Key<>(sessionId)) : null;
return (address != null) ? this.memberFactory.createNode(address) : this.dispatcherFactory.getGroup().getLocalMember();
return (address != null) ? this.memberFactory.createNode(address) : this.group.getLocalMember();
}

@Override
Expand Down Expand Up @@ -302,11 +243,6 @@ private Set<String> getSessions(Flag... flags) {
}
}

@Override
public int getMaxActiveSessions() {
return this.maxActiveSessions;
}

@Override
public long getActiveSessionCount() {
return this.getActiveSessions().size();
Expand Down Expand Up @@ -365,59 +301,6 @@ public void removed(CacheEntryRemovedEvent<SessionCreationMetaDataKey, ?> event)
}
}

@DataRehashed
public void dataRehashed(DataRehashedEvent<SessionCreationMetaDataKey, ?> event) {
Cache<SessionCreationMetaDataKey, ?> cache = event.getCache();
Address localAddress = cache.getCacheManager().getAddress();
Locality newLocality = new ConsistentHashLocality(localAddress, event.getConsistentHashAtEnd());
if (event.isPre()) {
Future<?> future = this.rehashFuture.getAndSet(null);
if (future != null) {
future.cancel(true);
}
try {
this.executor.submit(() -> this.scheduler.cancel(newLocality));
} catch (RejectedExecutionException e) {
// Executor was shutdown
}
} else {
Locality oldLocality = new ConsistentHashLocality(localAddress, event.getConsistentHashAtStart());
try {
this.rehashFuture.set(this.executor.submit(() -> this.schedule(oldLocality, newLocality)));
} catch (RejectedExecutionException e) {
// Executor was shutdown
}
}
}

private void schedule(Locality oldLocality, Locality newLocality) {
SessionMetaDataFactory<MV, L> metaDataFactory = this.factory.getMetaDataFactory();
// Iterate over sessions in memory
try (Stream<Key<String>> stream = this.cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL, Flag.SKIP_CACHE_LOAD).keySet().stream().filter(this.filter)) {
Iterator<Key<String>> keys = stream.iterator();
while (keys.hasNext()) {
if (Thread.currentThread().isInterrupted()) break;
Key<String> key = keys.next();
// If we are the new primary owner of this session then schedule expiration of this session locally
if (this.filter.test(key) && !oldLocality.isLocal(key) && newLocality.isLocal(key)) {
String id = key.getValue();
try (Batch batch = this.batcher.createBatch()) {
try {
// We need to lookup the session to obtain its meta data
MV value = metaDataFactory.tryValue(id);
if (value != null) {
this.scheduler.schedule(id, metaDataFactory.createImmutableSessionMetaData(id, value));
}
return;
} catch (CacheException e) {
batch.discard();
}
}
}
}
}
}

void triggerPrePassivationEvents(ImmutableSession session) {
List<HttpSessionActivationListener> listeners = findListeners(session);
if (!listeners.isEmpty()) {
Expand Down

0 comments on commit 0bfeb93

Please sign in to comment.