Skip to content

Commit

Permalink
WFLY-5108 SingletonService can start multiple master nodes if electio…
Browse files Browse the repository at this point in the history
…n policies are not identical
  • Loading branch information
pferraro committed Aug 14, 2015
1 parent 2455c01 commit 62dece4
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 113 deletions.
Expand Up @@ -60,7 +60,7 @@ interface Listener {
Set<Node> getProviders();

/**
* Indicates that this local can no longer provide this service.
* Indicates that this node can no longer provide this service.
* Once closed, this object is no longer functional.
*/
@Override
Expand Down
Expand Up @@ -105,26 +105,25 @@ public ServiceProviderRegistration<T> register(final T service, Listener listene
return new AbstractServiceProviderRegistration<T>(service, this) {
@Override
public void close() {
if (CacheServiceProviderRegistry.this.listeners.remove(service) != null) {
final Node node = CacheServiceProviderRegistry.this.getGroup().getLocalNode();
try (Batch batch = CacheServiceProviderRegistry.this.batcher.createBatch()) {
Set<Node> nodes = CacheServiceProviderRegistry.this.cache.get(service);
if ((nodes != null) && nodes.remove(node)) {
Cache<T, Set<Node>> cache = CacheServiceProviderRegistry.this.cache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES);
if (nodes.isEmpty()) {
cache.remove(service);
} else {
cache.replace(service, nodes);
}
Node node = CacheServiceProviderRegistry.this.getGroup().getLocalNode();
try (Batch batch = CacheServiceProviderRegistry.this.batcher.createBatch()) {
Set<Node> nodes = CacheServiceProviderRegistry.this.cache.get(service);
if ((nodes != null) && nodes.remove(node)) {
Cache<T, Set<Node>> cache = CacheServiceProviderRegistry.this.cache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES);
if (nodes.isEmpty()) {
cache.remove(service);
} else {
cache.replace(service, nodes);
}
}
}
CacheServiceProviderRegistry.this.listeners.remove(service);
}
};
}

@Override
public Set<Node> getProviders(final Object service) {
public Set<Node> getProviders(final T service) {
Set<Node> nodes = this.cache.get(service);
return (nodes != null) ? Collections.unmodifiableSet(nodes) : Collections.<Node>emptySet();
}
Expand Down
Expand Up @@ -70,24 +70,23 @@
@SuppressWarnings("deprecation")
public class CacheSingletonServiceBuilder<T> implements SingletonServiceBuilder<T>, Service<T>, ServiceProviderRegistration.Listener, SingletonContext<T>, Singleton {

private final InjectedValue<Group> group = new InjectedValue<>();
@SuppressWarnings("rawtypes")
private final InjectedValue<ServiceProviderRegistry> registry = new InjectedValue<>();
private final InjectedValue<CommandDispatcherFactory> dispatcherFactory = new InjectedValue<>();
private final Service<T> service;
final ServiceName targetServiceName;
final ServiceName singletonServiceName;
private final ServiceName singletonServiceName;
private final AtomicBoolean master = new AtomicBoolean(false);
private final SingletonContext<T> singletonDispatcher = new SingletonDispatcher();
private final String containerName;
private final String cacheName;

volatile ServiceProviderRegistration<ServiceName> registration;
volatile CommandDispatcher<SingletonContext<T>> dispatcher;
volatile boolean started = false;
private volatile Group group;
private volatile ServiceProviderRegistration<ServiceName> registration;
private volatile CommandDispatcher<SingletonContext<T>> dispatcher;
private volatile boolean started = false;
private volatile SingletonElectionPolicy electionPolicy = new SimpleSingletonElectionPolicy();
private volatile ServiceRegistry serviceRegistry;
volatile int quorum = 1;
private volatile int quorum = 1;

public CacheSingletonServiceBuilder(ServiceName serviceName, Service<T> service, String containerName, String cacheName) {
this.singletonServiceName = serviceName;
Expand Down Expand Up @@ -118,7 +117,6 @@ public void serviceRemoveRequested(ServiceController<? extends T> controller) {
};
final ServiceBuilder<T> singletonBuilder = new AsynchronousServiceBuilder<>(this.singletonServiceName, this).build(target)
.addAliases(this.singletonServiceName.append("singleton"))
.addDependency(CacheGroupServiceName.GROUP.getServiceName(this.containerName, this.cacheName), Group.class, this.group)
.addDependency(CacheGroupServiceName.SERVICE_PROVIDER_REGISTRY.getServiceName(this.containerName, this.cacheName), ServiceProviderRegistry.class, this.registry)
.addDependency(GroupServiceName.COMMAND_DISPATCHER.getServiceName(this.containerName), CommandDispatcherFactory.class, this.dispatcherFactory)
.addListener(listener)
Expand Down Expand Up @@ -192,6 +190,7 @@ public void start(StartContext context) {
this.serviceRegistry = context.getController().getServiceContainer();
this.dispatcher = this.dispatcherFactory.getValue().<SingletonContext<T>>createCommandDispatcher(this.singletonServiceName, this);
ServiceProviderRegistry<ServiceName> registry = this.registry.getValue();
this.group = registry.getGroup();
this.registration = registry.register(this.singletonServiceName, this);
this.started = true;
}
Expand All @@ -210,57 +209,74 @@ public boolean isMaster() {

@Override
public void providersChanged(Set<Node> nodes) {
if (this.elected(nodes)) {
if (!this.master.get()) {
ClusteringServerLogger.ROOT_LOGGER.electedMaster(this.singletonServiceName.getCanonicalName());
this.singletonDispatcher.stopOldMaster();
this.startNewMaster();
List<Node> candidates = this.group.getNodes();
candidates.retainAll(nodes);

// Only run election on a single node
if (candidates.isEmpty() || candidates.get(0).equals(this.group.getLocalNode())) {
Node elected = null;

// First validate that quorum was met
int size = candidates.size();
if (size >= this.quorum) {
if ((this.quorum > 1) && (size == this.quorum)) {
ClusteringServerLogger.ROOT_LOGGER.quorumJustReached(this.singletonServiceName.getCanonicalName(), this.quorum);
}

if (!candidates.isEmpty()) {
elected = this.electionPolicy.elect(candidates);

ClusteringServerLogger.ROOT_LOGGER.elected(elected.getName(), this.singletonServiceName.getCanonicalName());
}
} else if (this.quorum > 1) {
ClusteringServerLogger.ROOT_LOGGER.quorumNotReached(this.singletonServiceName.getCanonicalName(), this.quorum);
}
} else if (this.master.get()) {
ClusteringServerLogger.ROOT_LOGGER.electedSlave(this.singletonServiceName.getCanonicalName());
this.stopOldMaster();
}
}

private boolean elected(Set<Node> candidates) {
int size = candidates.size();
if (size < this.quorum) {
ClusteringServerLogger.ROOT_LOGGER.quorumNotReached(this.singletonServiceName.getCanonicalName(), this.quorum);
return false;
} else if (size == this.quorum) {
ClusteringServerLogger.ROOT_LOGGER.quorumJustReached(this.singletonServiceName.getCanonicalName(), this.quorum);
}
Node elected = this.election(candidates);
if (elected != null) {
ClusteringServerLogger.ROOT_LOGGER.elected(elected.getName(), this.singletonServiceName.getCanonicalName());
try {
if (elected != null) {
// Stop service on every node except elected node
CacheSingletonServiceBuilder.this.dispatcher.executeOnCluster(new StopCommand<>(), elected);
// Start service on elected node
CacheSingletonServiceBuilder.this.dispatcher.executeOnNode(new StartCommand<>(), elected);
} else {
// Stop service on every node
CacheSingletonServiceBuilder.this.dispatcher.executeOnCluster(new StopCommand<>());
}
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
return (elected != null) ? elected.equals(this.group.getValue().getLocalNode()) : false;
}

private Node election(Set<Node> candidates) {
SingletonElectionPolicy policy = this.electionPolicy;
List<Node> nodes = this.group.getValue().getNodes();
nodes.retainAll(candidates);
return !nodes.isEmpty() ? policy.elect(nodes) : null;
@Override
public void start() {
// If we were not already master
if (this.master.compareAndSet(false, true)) {
ServiceController<?> service = this.serviceRegistry.getRequiredService(this.targetServiceName);
try {
ServiceContainerHelper.start(service);
} catch (StartException e) {
ClusteringServerLogger.ROOT_LOGGER.serviceStartFailed(e, this.targetServiceName.getCanonicalName());
ServiceContainerHelper.stop(service);
}
}
}

private void startNewMaster() {
this.master.set(true);
ServiceController<?> service = this.serviceRegistry.getRequiredService(this.targetServiceName);
try {
ServiceContainerHelper.start(service);
} catch (StartException e) {
ClusteringServerLogger.ROOT_LOGGER.serviceStartFailed(e, this.targetServiceName.getCanonicalName());
ServiceContainerHelper.stop(service);
@Override
public void stop() {
// If we were the previous master
if (this.master.compareAndSet(true, false)) {
ServiceContainerHelper.stop(this.serviceRegistry.getRequiredService(this.targetServiceName));
}
}

@Override
public T getValue() {
if (!this.started) throw new IllegalStateException();
// Save ourselves a remote call if we can
AtomicReference<T> ref = this.getValueRef();
if (ref == null) {
ref = this.singletonDispatcher.getValueRef();
ref = this.getRemoteValueRef();
}
return ref.get();
}
Expand All @@ -270,60 +286,43 @@ public AtomicReference<T> getValueRef() {
return this.master.get() ? new AtomicReference<>(this.service.getValue()) : null;
}

@Override
public void stopOldMaster() {
if (this.master.compareAndSet(true, false)) {
ServiceContainerHelper.stop(this.serviceRegistry.getRequiredService(this.targetServiceName));
}
}

class SingletonDispatcher implements SingletonContext<T> {

@Override
public void stopOldMaster() {
try {
CacheSingletonServiceBuilder.this.dispatcher.executeOnCluster(new StopSingletonCommand<T>());
} catch (Exception e) {
throw new IllegalStateException(e);
}
}

@Override
public AtomicReference<T> getValueRef() {
try {
Map<Node, CommandResponse<AtomicReference<T>>> results = Collections.emptyMap();
while (results.isEmpty()) {
if (!CacheSingletonServiceBuilder.this.started) {
throw new IllegalStateException(ClusteringServerLogger.ROOT_LOGGER.notStarted(CacheSingletonServiceBuilder.this.singletonServiceName.getCanonicalName()));
}
results = CacheSingletonServiceBuilder.this.dispatcher.executeOnCluster(new SingletonValueCommand<T>());
Iterator<CommandResponse<AtomicReference<T>>> responses = results.values().iterator();
while (responses.hasNext()) {
if (responses.next().get() == null) {
// Prune non-master results
responses.remove();
}
private AtomicReference<T> getRemoteValueRef() {
try {
Map<Node, CommandResponse<AtomicReference<T>>> results = Collections.emptyMap();
while (results.isEmpty()) {
if (!CacheSingletonServiceBuilder.this.started) {
throw new IllegalStateException(ClusteringServerLogger.ROOT_LOGGER.notStarted(CacheSingletonServiceBuilder.this.singletonServiceName.getCanonicalName()));
}
results = CacheSingletonServiceBuilder.this.dispatcher.executeOnCluster(new SingletonValueCommand<T>());
Iterator<CommandResponse<AtomicReference<T>>> responses = results.values().iterator();
while (responses.hasNext()) {
if (responses.next().get() == null) {
// Prune non-master results
responses.remove();
}
// We expect only 1 result
int count = results.size();
if (count > 1) {
// This would mean there are multiple masters!
throw ClusteringServerLogger.ROOT_LOGGER.unexpectedResponseCount(CacheSingletonServiceBuilder.this.singletonServiceName.getCanonicalName(), count);
}
// We expect only 1 result
int count = results.size();
if (count > 1) {
// This would mean there are multiple masters!
throw ClusteringServerLogger.ROOT_LOGGER.unexpectedResponseCount(CacheSingletonServiceBuilder.this.singletonServiceName.getCanonicalName(), count);
}
if (count == 0) {
ClusteringServerLogger.ROOT_LOGGER.noResponseFromMaster(CacheSingletonServiceBuilder.this.singletonServiceName.getCanonicalName());
// Verify whether there is no master because a quorum was not reached during the last election
if (CacheSingletonServiceBuilder.this.registration.getProviders().size() < CacheSingletonServiceBuilder.this.quorum) {
return new AtomicReference<>();
}
if (count == 0) {
ClusteringServerLogger.ROOT_LOGGER.noResponseFromMaster(CacheSingletonServiceBuilder.this.singletonServiceName.getCanonicalName());
// Verify whether there is no master because a quorum was not reached during the last election
if (CacheSingletonServiceBuilder.this.registration.getProviders().size() < CacheSingletonServiceBuilder.this.quorum) {
return new AtomicReference<>();
}
// Otherwise, we're in the midst of a new master election, so just try again
Thread.yield();
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException();
}
// Otherwise, we're in the midst of a new master election, so just try again
Thread.yield();
}
return results.values().iterator().next().get();
} catch (Exception e) {
throw new IllegalStateException(e);
}
return results.values().iterator().next().get();
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
}
Expand Up @@ -34,6 +34,6 @@ public class SingletonClassTableContributor implements ClassTableContributor {

@Override
public Collection<Class<?>> getKnownClasses() {
return Arrays.<Class<?>>asList(SingletonValueCommand.class, StopSingletonCommand.class);
return Arrays.<Class<?>>asList(SingletonValueCommand.class, StartCommand.class, StopCommand.class);
}
}
Expand Up @@ -24,7 +24,10 @@
import java.util.concurrent.atomic.AtomicReference;

public interface SingletonContext<T> {
void stopOldMaster();

void start();

void stop();

AtomicReference<T> getValueRef();
}
@@ -0,0 +1,39 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2015, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.wildfly.clustering.server.singleton;

import org.wildfly.clustering.dispatcher.Command;

/**
* Command to start a singleton service.
* @author Paul Ferraro
*/
public class StartCommand<T> implements Command<Void, SingletonContext<T>> {
private static final long serialVersionUID = 3194143912789013071L;

@Override
public Void execute(SingletonContext<T> context) throws Exception {
context.start();
return null;
}
}
@@ -1,6 +1,6 @@
/*
* JBoss, Home of Professional Open Source.
* Copyright 2013, Red Hat, Inc., and individual contributors
* Copyright 2015, Red Hat, Inc., and individual contributors
* as indicated by the @author tags. See the copyright.txt file in the
* distribution for a full listing of individual contributors.
*
Expand All @@ -19,14 +19,21 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.wildfly.clustering.server.singleton;

public class StopSingletonCommand<T> implements SingletonCommand<Void, T> {
private static final long serialVersionUID = 3116447976723987749L;
import org.wildfly.clustering.dispatcher.Command;

/**
* Command to stop a singleton service.
* @author Paul Ferraro
*/
public class StopCommand<T> implements Command<Void, SingletonContext<T>> {
private static final long serialVersionUID = 3194143912789013071L;

@Override
public Void execute(SingletonContext<T> context) {
context.stopOldMaster();
public Void execute(SingletonContext<T> context) throws Exception {
context.stop();
return null;
}
}
Expand Up @@ -52,6 +52,7 @@ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws Se
if (service == null) {
throw new ServletException(String.format("No %s specified", SERVICE));
}
this.log(String.format("Received request for %s", service));
Environment env = (Environment) CurrentServiceContainer.getServiceContainer().getService(ServiceName.parse(service)).getValue();
if (env != null) {
resp.setHeader("node", env.getNodeName());
Expand Down

0 comments on commit 62dece4

Please sign in to comment.