Merge pull request #14422 from pferraro/WFLY-14971
WFLY-14971 Singleton deployment tests still failing intermittently
pferraro committed Jul 14, 2021
2 parents bc69be0 + 7b9c20c · commit dd4656e
Showing 4 changed files with 80 additions and 120 deletions.
@@ -22,10 +22,9 @@
package org.wildfly.clustering.server.provider;

import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,34 +33,34 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.function.BiConsumer;

import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CloseableIterator;
+import org.infinispan.context.Flag;
+import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
+import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
+import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.concurrent.CompletableFutures;
import org.jboss.as.clustering.context.DefaultExecutorService;
import org.jboss.as.clustering.context.DefaultThreadFactory;
import org.jboss.as.clustering.context.ExecutorServiceFactory;
-import org.wildfly.clustering.Registration;
-import org.wildfly.clustering.dispatcher.Command;
-import org.wildfly.clustering.dispatcher.CommandDispatcher;
-import org.wildfly.clustering.dispatcher.CommandDispatcherException;
import org.wildfly.clustering.ee.Batch;
import org.wildfly.clustering.ee.Batcher;
import org.wildfly.clustering.ee.Invoker;
import org.wildfly.clustering.ee.cache.CacheProperties;
import org.wildfly.clustering.ee.infinispan.InfinispanCacheProperties;
import org.wildfly.clustering.ee.infinispan.retry.RetryingInvoker;
-import org.wildfly.clustering.group.GroupListener;
-import org.wildfly.clustering.group.Membership;
import org.wildfly.clustering.group.Node;
+import org.wildfly.clustering.infinispan.spi.distribution.ConsistentHashLocality;
+import org.wildfly.clustering.infinispan.spi.distribution.Locality;
import org.wildfly.clustering.provider.ServiceProviderRegistration;
import org.wildfly.clustering.provider.ServiceProviderRegistration.Listener;
import org.wildfly.clustering.provider.ServiceProviderRegistry;
@@ -78,33 +77,29 @@
* @param <T> the service identifier type
*/
@org.infinispan.notifications.Listener
-public class CacheServiceProviderRegistry<T> implements ServiceProviderRegistry<T>, GroupListener, AutoCloseable {
+public class CacheServiceProviderRegistry<T> implements ServiceProviderRegistry<T>, AutoCloseable {

-    final Batcher<? extends Batch> batcher;
+    private final ExecutorService topologyChangeExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory(this.getClass()));
+    private final Batcher<? extends Batch> batcher;
private final ConcurrentMap<T, Map.Entry<Listener, ExecutorService>> listeners = new ConcurrentHashMap<>();
private final Cache<T, Set<Address>> cache;
private final Group<Address> group;
-    private final Registration groupRegistration;
-    private final CommandDispatcher<Set<T>> dispatcher;
private final Invoker invoker;
private final CacheProperties properties;

public CacheServiceProviderRegistry(CacheServiceProviderRegistryConfiguration<T> config) {
this.group = config.getGroup();
this.cache = config.getCache();
this.batcher = config.getBatcher();
-        this.dispatcher = config.getCommandDispatcherFactory().createCommandDispatcher(config.getId(), this.listeners.keySet(), WildFlySecurityManager.getClassLoaderPrivileged(this.getClass()));
this.cache.addListener(this);
-        this.groupRegistration = this.group.register(this);
this.invoker = new RetryingInvoker(this.cache);
this.properties = new InfinispanCacheProperties(this.cache.getCacheConfiguration());
}

@Override
public void close() {
-        this.groupRegistration.close();
this.cache.removeListener(this);
-        this.dispatcher.close();
+        this.shutdown(this.topologyChangeExecutor);
// Cleanup any unclosed registrations
for (Map.Entry<Listener, ExecutorService> entry : this.listeners.values()) {
ExecutorService executor = entry.getValue();
@@ -116,7 +111,7 @@ public void close() {
}

private void shutdown(ExecutorService executor) {
-        WildFlySecurityManager.doUnchecked(executor, DefaultExecutorService.SHUTDOWN_NOW_ACTION);
+        WildFlySecurityManager.doUnchecked(executor, DefaultExecutorService.SHUTDOWN_ACTION);
try {
executor.awaitTermination(this.cache.getCacheConfiguration().transaction().cacheStopTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
@@ -147,13 +142,7 @@ public ServiceProviderRegistration<T> register(T service, Listener listener) {
if (entry != newEntry) {
throw new IllegalArgumentException(service.toString());
}
-        ExceptionRunnable<CacheException> registerAction = new ExceptionRunnable<CacheException>() {
-            @Override
-            public void run() throws CacheException {
-                CacheServiceProviderRegistry.this.registerLocal(service);
-            }
-        };
-        this.invoker.invoke(registerAction);
+        this.invoker.invoke(new RegisterLocalServiceTask(service));
return new SimpleServiceProviderRegistration<>(service, this, () -> {
Address localAddress = this.group.getAddress(this.group.getLocalMember());
try (Batch batch = this.batcher.createBatch()) {
@@ -196,58 +185,65 @@ public Set<T> getServices() {
return this.cache.keySet();
}

-    @Override
-    public void membershipChanged(Membership previousMembership, Membership membership, final boolean merged) {
-        if (membership.isCoordinator()) {
-            Set<Node> previousMembers = new HashSet<>(previousMembership.getMembers());
-            Set<Node> members = new HashSet<>(membership.getMembers());
-            List<Address> leftMembers = new ArrayList<>(previousMembers.size());
-            for (Node previousMember : previousMembers) {
-                if (!members.contains(previousMember)) {
-                    leftMembers.add(this.group.getAddress(previousMember));
-                }
-            }
-            List<Address> joinedMembers = new ArrayList<>(members.size());
-            for (Node member : members) {
-                if (!previousMembers.contains(member)) {
-                    joinedMembers.add(this.group.getAddress(member));
-                }
-            }
-            if (!leftMembers.isEmpty()) {
-                try (Batch batch = this.batcher.createBatch()) {
-                    try (CloseableIterator<Map.Entry<T, Set<Address>>> entries = this.cache.entrySet().iterator()) {
-                        while (entries.hasNext()) {
-                            Map.Entry<T, Set<Address>> entry = entries.next();
-                            Set<Address> addresses = entry.getValue();
-                            if (addresses.removeAll(leftMembers)) {
-                                entry.setValue(addresses);
-                            }
+    @TopologyChanged
+    public void topologyChanged(TopologyChangedEvent<T, Set<Address>> event) {
+        if (!event.isPre()) {
+            ConsistentHash previousHash = event.getWriteConsistentHashAtStart();
+            List<Address> previousMembers = previousHash.getMembers();
+            ConsistentHash hash = event.getWriteConsistentHashAtEnd();
+            List<Address> members = hash.getMembers();
+
+            if (!members.equals(previousMembers)) {
+                Cache<T, Set<Address>> cache = event.getCache().getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS);
+                Address localAddress = cache.getCacheManager().getAddress();
+
+                // Determine which nodes have left the cache view
+                Set<Address> leftMembers = new HashSet<>(previousMembers);
+                leftMembers.removeAll(members);
+
+                if (!leftMembers.isEmpty()) {
+                    Locality locality = new ConsistentHashLocality(cache, hash);
+                    // We're only interested in the entries for which we are the primary owner
+                    Iterator<Address> addresses = leftMembers.iterator();
+                    while (addresses.hasNext()) {
+                        if (!locality.isLocal(addresses.next())) {
+                            addresses.remove();
                        }
                    }
                }
-            }
-            if (merged) {
-                // Re-assert services for new members following merge since these may have been lost following split
-                Command<Collection<T>, Set<T>> command = new GetLocalServicesCommand<>();
-                for (Address joinedMember : joinedMembers) {
-                    BiConsumer<Collection<T>, Throwable> completionHandler = new BiConsumer<Collection<T>, Throwable>() {
-                        @Override
-                        public void accept(Collection<T> services, Throwable exception) {
-                            if (services != null) {
-                                try (Batch batch = CacheServiceProviderRegistry.this.batcher.createBatch()) {
-                                    for (T service : services) {
-                                        CacheServiceProviderRegistry.this.register(joinedMember, service);

+                // If this is a merge after cluster split: Re-assert services for local member
+                Set<T> localServices = !previousMembers.contains(localAddress) ? this.listeners.keySet() : Collections.emptySet();
+
+                if (!leftMembers.isEmpty() || !localServices.isEmpty()) {
+                    Batcher<? extends Batch> batcher = this.batcher;
+                    Invoker invoker = this.invoker;
+                    try {
+                        this.topologyChangeExecutor.submit(new Runnable() {
+                            @Override
+                            public void run() {
+                                if (!leftMembers.isEmpty()) {
+                                    try (Batch batch = batcher.createBatch()) {
+                                        try (CloseableIterator<Map.Entry<T, Set<Address>>> entries = cache.getAdvancedCache().withFlags(Flag.FORCE_WRITE_LOCK).entrySet().iterator()) {
+                                            while (entries.hasNext()) {
+                                                Map.Entry<T, Set<Address>> entry = entries.next();
+                                                Set<Address> addresses = entry.getValue();
+                                                if (addresses.removeAll(leftMembers)) {
+                                                    entry.setValue(addresses);
+                                                }
+                                            }
+                                        }
+                                    }
+                                }
+                                if (!localServices.isEmpty()) {
+                                    for (T localService : localServices) {
+                                        invoker.invoke(new RegisterLocalServiceTask(localService));
+                                    }
+                                }
-                            } else if (exception != null) {
-                                ClusteringServerLogger.ROOT_LOGGER.warn(exception.getLocalizedMessage(), exception);
                            }
-                        }
-                    };
-                    try {
-                        this.dispatcher.executeOnMember(command, this.group.createNode(joinedMember)).whenComplete(completionHandler);
-                    } catch (CommandDispatcherException e) {
-                        ClusteringServerLogger.ROOT_LOGGER.warn(e.getLocalizedMessage(), e);
+                        });
+                    } catch (RejectedExecutionException e) {
+                        // Executor is shutdown
                    }
                }
            }
@@ -283,4 +279,17 @@ public CompletionStage<Void> modified(CacheEntryEvent<T, Set<Address>> event) {
}
return CompletableFutures.completedNull();
}

+    private class RegisterLocalServiceTask implements ExceptionRunnable<CacheException> {
+        private final T localService;
+
+        RegisterLocalServiceTask(T localService) {
+            this.localService = localService;
+        }
+
+        @Override
+        public void run() {
+            CacheServiceProviderRegistry.this.registerLocal(this.localService);
+        }
+    }
}
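For reference: the heart of this change swaps the coordinator-driven GroupListener bookkeeping (plus a GetLocalServicesCommand dispatched to joined members after a merge) for a local Infinispan @TopologyChanged listener that defers its cleanup and re-registration work to a dedicated single-thread executor. Below is a minimal standalone sketch of that Infinispan listener pattern — the class name and log output are illustrative, not part of this commit:

import java.util.List;

import org.infinispan.Cache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.TopologyChanged;
import org.infinispan.notifications.cachelistener.event.TopologyChangedEvent;
import org.infinispan.remoting.transport.Address;

@Listener
public class TopologyChangeLogger {

    @TopologyChanged
    public void onTopologyChanged(TopologyChangedEvent<?, ?> event) {
        // The callback fires before and after each rebalance; act on the
        // post-change event only, mirroring the !event.isPre() guard above.
        if (!event.isPre()) {
            List<Address> before = event.getWriteConsistentHashAtStart().getMembers();
            List<Address> after = event.getWriteConsistentHashAtEnd().getMembers();
            System.out.printf("cache members changed: %s -> %s%n", before, after);
        }
    }

    // Attach to any clustered cache; Infinispan discovers the annotated method reflectively.
    public static void install(Cache<?, ?> cache) {
        cache.addListener(new TopologyChangeLogger());
    }
}

Comparing the write consistent hashes from before and after the change is what lets the patched registry detect both departed members and a post-split merge locally, without any cross-node command dispatch.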

This file was deleted. (Judging from the marshaller and schema removals below, the deleted class is GetLocalServicesCommand.)

@@ -28,7 +28,6 @@
import org.wildfly.clustering.marshalling.protostream.FunctionalMarshaller;
import org.wildfly.clustering.marshalling.protostream.ProtoStreamMarshaller;
import org.wildfly.clustering.marshalling.protostream.SimpleFieldSetMarshaller;
-import org.wildfly.clustering.marshalling.protostream.ValueMarshaller;
import org.wildfly.clustering.server.group.InfinispanAddressMarshaller;

/**
@@ -39,7 +38,6 @@ public class ServiceProviderRegistrySerializationContextInitializer extends Abst

@Override
public void registerMarshallers(SerializationContext context) {
-        context.registerMarshaller(new ValueMarshaller<>(GetLocalServicesCommand::new));
ProtoStreamMarshaller<Address> addressMarshaller = new SimpleFieldSetMarshaller<>(Address.class, InfinispanAddressMarshaller.INSTANCE);
context.registerMarshaller(new FunctionalMarshaller<>(ConcurrentAddressSetAddFunction.class, addressMarshaller, ConcurrentAddressSetAddFunction::getOperand, ConcurrentAddressSetAddFunction::new));
context.registerMarshaller(new FunctionalMarshaller<>(ConcurrentAddressSetRemoveFunction.class, addressMarshaller, ConcurrentAddressSetRemoveFunction::getOperand, ConcurrentAddressSetRemoveFunction::new));
@@ -5,12 +5,6 @@ import "org.jgroups.util.proto";

// IDs: 140 -144

-/**
- * @TypeId(140)
- */
-message GetLocalServicesCommand {
-}
-
/**
* @TypeId(141)
*/
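As a usage note, here is a hedged sketch of client code against the ServiceProviderRegistry SPI whose implementation this commit reworks — the service name is illustrative, and Listener#providersChanged(Set<Node>) is assumed to be the callback signature of org.wildfly.clustering.provider.ServiceProviderRegistration.Listener:

import java.util.Set;

import org.wildfly.clustering.group.Node;
import org.wildfly.clustering.provider.ServiceProviderRegistration;
import org.wildfly.clustering.provider.ServiceProviderRegistry;

public class DeploymentTracker implements ServiceProviderRegistration.Listener {

    // Advertises the local node as a provider of the named service and subscribes
    // to provider-set changes; close() the returned registration to withdraw.
    public ServiceProviderRegistration<String> track(ServiceProviderRegistry<String> registry) {
        return registry.register("my-singleton-deployment", this);
    }

    @Override
    public void providersChanged(Set<Node> providers) {
        // Called whenever the set of nodes providing the service changes,
        // e.g. after the topology-change cleanup exercised by this commit.
        System.out.println("providers = " + providers);
    }
}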
