Skip to content

Commit

Permalink
Stabilize singleton deployment integration tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
pferraro committed Sep 30, 2015
1 parent e874bf1 commit d6c9d7b
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 41 deletions.
Expand Up @@ -56,6 +56,8 @@
import org.wildfly.clustering.marshalling.jboss.IndexExternalizer; import org.wildfly.clustering.marshalling.jboss.IndexExternalizer;
import org.wildfly.clustering.marshalling.jboss.MarshallingContext; import org.wildfly.clustering.marshalling.jboss.MarshallingContext;
import org.wildfly.clustering.server.group.JGroupsNodeFactory; import org.wildfly.clustering.server.group.JGroupsNodeFactory;
import org.wildfly.clustering.service.concurrent.ServiceExecutor;
import org.wildfly.clustering.service.concurrent.StampedLockServiceExecutor;


/** /**
* {@link MessageDispatcher} based {@link CommandDispatcherFactory}. * {@link MessageDispatcher} based {@link CommandDispatcherFactory}.
Expand All @@ -68,6 +70,7 @@ public class ChannelCommandDispatcherFactory implements CommandDispatcherFactory
final Map<Object, AtomicReference<Object>> contexts = new ConcurrentHashMap<>(); final Map<Object, AtomicReference<Object>> contexts = new ConcurrentHashMap<>();
final MarshallingContext marshallingContext; final MarshallingContext marshallingContext;


private final ServiceExecutor executor = new StampedLockServiceExecutor();
private final List<Listener> listeners = new CopyOnWriteArrayList<>(); private final List<Listener> listeners = new CopyOnWriteArrayList<>();
private final AtomicReference<View> view = new AtomicReference<>(); private final AtomicReference<View> view = new AtomicReference<>();
private final MessageDispatcher dispatcher; private final MessageDispatcher dispatcher;
Expand Down Expand Up @@ -97,8 +100,10 @@ protected RequestCorrelator createRequestCorrelator(Protocol transport, RequestH


@Override @Override
public void close() { public void close() {
this.dispatcher.stop(); this.executor.close(() -> {
this.dispatcher.getChannel().setUpHandler(null); this.dispatcher.stop();
this.dispatcher.getChannel().setUpHandler(null);
});
} }


@Override @Override
Expand Down Expand Up @@ -216,9 +221,11 @@ public void viewAccepted(View view) {
this.nodeFactory.invalidate(leftMembers); this.nodeFactory.invalidate(leftMembers);
} }


for (Listener listener: this.listeners) { this.executor.execute(() -> {
listener.membershipChanged(oldNodes, newNodes, view instanceof MergeView); for (Listener listener: this.listeners) {
} listener.membershipChanged(oldNodes, newNodes, view instanceof MergeView);
}
});
} }
} }


Expand Down
Expand Up @@ -54,11 +54,11 @@ public interface ClusteringServerLogger {


@LogMessage(level = INFO) @LogMessage(level = INFO)
@Message(id = 1, value = "This node will now operate as the singleton provider of the %s service") @Message(id = 1, value = "This node will now operate as the singleton provider of the %s service")
void electedMaster(String service); void startSingleton(String service);


@LogMessage(level = INFO) @LogMessage(level = INFO)
@Message(id = 2, value = "This node will no longer operate as the singleton provider of the %s service") @Message(id = 2, value = "This node will no longer operate as the singleton provider of the %s service")
void electedSlave(String service); void stopSingleton(String service);


@LogMessage(level = INFO) @LogMessage(level = INFO)
@Message(id = 3, value = "%s elected as the singleton provider of the %s service") @Message(id = 3, value = "%s elected as the singleton provider of the %s service")
Expand Down
Expand Up @@ -123,12 +123,9 @@ public void close() {
} }


void register(Node node, T service) { void register(Node node, T service) {
Set<Node> nodes = new CopyOnWriteArraySet<>(Collections.singleton(node)); Set<Node> nodes = this.cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS).computeIfAbsent(service, key -> new CopyOnWriteArraySet<>(Collections.singleton(node)));
Set<Node> existing = this.cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS).putIfAbsent(service, nodes); if (nodes.add(node)) {
if (existing != null) { this.cache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES).replace(service, nodes);
if (existing.add(node)) {
this.cache.getAdvancedCache().withFlags(Flag.IGNORE_RETURN_VALUES).replace(service, existing);
}
} }
} }


Expand All @@ -145,8 +142,8 @@ public Set<T> getServices() {


@Override @Override
public void membershipChanged(List<Node> previousMembers, List<Node> members, final boolean merged) { public void membershipChanged(List<Node> previousMembers, List<Node> members, final boolean merged) {
if (this.getGroup().isCoordinator()) { this.executor.execute(() -> {
this.executor.execute(() -> { if (this.getGroup().isCoordinator()) {
Set<Node> deadNodes = new HashSet<>(previousMembers); Set<Node> deadNodes = new HashSet<>(previousMembers);
deadNodes.removeAll(members); deadNodes.removeAll(members);
Set<Node> newNodes = new HashSet<>(members); Set<Node> newNodes = new HashSet<>(members);
Expand All @@ -172,18 +169,20 @@ public void membershipChanged(List<Node> previousMembers, List<Node> members, fi
} }
} }
} }
}); }
} });
} }


@CacheEntryCreated @CacheEntryCreated
@CacheEntryModified @CacheEntryModified
public void modified(CacheEntryEvent<ServiceName, Set<Node>> event) { public void modified(CacheEntryEvent<ServiceName, Set<Node>> event) {
if (event.isPre()) return; if (event.isPre()) return;
Listener listener = this.listeners.get(event.getKey()); this.executor.execute(() -> {
if (listener != null) { Listener listener = this.listeners.get(event.getKey());
listener.providersChanged(event.getValue()); if (listener != null) {
} listener.providersChanged(event.getValue());
}
});
} }


List<T> getServices(Node node) { List<T> getServices(Node node) {
Expand Down
Expand Up @@ -252,6 +252,7 @@ public void providersChanged(Set<Node> nodes) {
public void start() { public void start() {
// If we were not already master // If we were not already master
if (this.master.compareAndSet(false, true)) { if (this.master.compareAndSet(false, true)) {
ClusteringServerLogger.ROOT_LOGGER.startSingleton(this.singletonServiceName.getCanonicalName());
ServiceController<?> service = this.serviceRegistry.getRequiredService(this.targetServiceName); ServiceController<?> service = this.serviceRegistry.getRequiredService(this.targetServiceName);
try { try {
ServiceContainerHelper.start(service); ServiceContainerHelper.start(service);
Expand All @@ -266,6 +267,7 @@ public void start() {
public void stop() { public void stop() {
// If we were the previous master // If we were the previous master
if (this.master.compareAndSet(true, false)) { if (this.master.compareAndSet(true, false)) {
ClusteringServerLogger.ROOT_LOGGER.stopSingleton(this.singletonServiceName.getCanonicalName());
ServiceContainerHelper.stop(this.serviceRegistry.getRequiredService(this.targetServiceName)); ServiceContainerHelper.stop(this.serviceRegistry.getRequiredService(this.targetServiceName));
} }
} }
Expand All @@ -291,7 +293,7 @@ private AtomicReference<T> getRemoteValueRef() {
Map<Node, CommandResponse<AtomicReference<T>>> results = Collections.emptyMap(); Map<Node, CommandResponse<AtomicReference<T>>> results = Collections.emptyMap();
while (results.isEmpty()) { while (results.isEmpty()) {
if (!CacheSingletonServiceBuilder.this.started) { if (!CacheSingletonServiceBuilder.this.started) {
throw new IllegalStateException(ClusteringServerLogger.ROOT_LOGGER.notStarted(CacheSingletonServiceBuilder.this.singletonServiceName.getCanonicalName())); throw ClusteringServerLogger.ROOT_LOGGER.notStarted(CacheSingletonServiceBuilder.this.singletonServiceName.getCanonicalName());
} }
results = CacheSingletonServiceBuilder.this.dispatcher.executeOnCluster(new SingletonValueCommand<T>()); results = CacheSingletonServiceBuilder.this.dispatcher.executeOnCluster(new SingletonValueCommand<T>());
Iterator<CommandResponse<AtomicReference<T>>> responses = results.values().iterator(); Iterator<CommandResponse<AtomicReference<T>>> responses = results.values().iterator();
Expand Down Expand Up @@ -321,6 +323,8 @@ private AtomicReference<T> getRemoteValueRef() {
} }
} }
return results.values().iterator().next().get(); return results.values().iterator().next().get();
} catch (RuntimeException e) {
throw e;
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);
} }
Expand Down
Expand Up @@ -34,26 +34,26 @@
*/ */
public class SingletonDeploymentDescriptorTestCase extends SingletonDeploymentTestCase { public class SingletonDeploymentDescriptorTestCase extends SingletonDeploymentTestCase {


private static final String DEPLOYMENT_NAME = "singleton-deployment-descriptor.war"; private static final String DEPLOYMENT_NAME = "singleton-deployment-descriptor";


public SingletonDeploymentDescriptorTestCase() { public SingletonDeploymentDescriptorTestCase() {
super(DEPLOYMENT_NAME); super(DEPLOYMENT_NAME);
} }


@Deployment(name = DEPLOYMENT_1, managed = false, testable = false) @Deployment(name = SINGLETON_DEPLOYMENT_1, managed = false, testable = false)
@TargetsContainer(CONTAINER_1) @TargetsContainer(CONTAINER_1)
public static Archive<?> deployment0() { public static Archive<?> deployment0() {
return createDeployment(); return createDeployment();
} }


@Deployment(name = DEPLOYMENT_2, managed = false, testable = false) @Deployment(name = SINGLETON_DEPLOYMENT_2, managed = false, testable = false)
@TargetsContainer(CONTAINER_2) @TargetsContainer(CONTAINER_2)
public static Archive<?> deployment1() { public static Archive<?> deployment1() {
return createDeployment(); return createDeployment();
} }


private static Archive<?> createDeployment() { private static Archive<?> createDeployment() {
WebArchive war = ShrinkWrap.create(WebArchive.class, DEPLOYMENT_NAME); WebArchive war = ShrinkWrap.create(WebArchive.class, DEPLOYMENT_NAME + ".war");
war.addPackage(TraceServlet.class.getPackage()); war.addPackage(TraceServlet.class.getPackage());
war.addAsManifestResource(SingletonDeploymentDescriptorTestCase.class.getPackage(), "singleton-deployment.xml", "singleton-deployment.xml"); war.addAsManifestResource(SingletonDeploymentDescriptorTestCase.class.getPackage(), "singleton-deployment.xml", "singleton-deployment.xml");
return war; return war;
Expand Down
Expand Up @@ -34,26 +34,26 @@
*/ */
public class SingletonDeploymentJBossAllTestCase extends SingletonDeploymentTestCase { public class SingletonDeploymentJBossAllTestCase extends SingletonDeploymentTestCase {


private static final String DEPLOYMENT_NAME = "singleton-deployment-jboss-all.war"; private static final String DEPLOYMENT_NAME = "singleton-deployment-jboss-all";


public SingletonDeploymentJBossAllTestCase() { public SingletonDeploymentJBossAllTestCase() {
super(DEPLOYMENT_NAME); super(DEPLOYMENT_NAME);
} }


@Deployment(name = DEPLOYMENT_1, managed = false, testable = false) @Deployment(name = SINGLETON_DEPLOYMENT_1, managed = false, testable = false)
@TargetsContainer(CONTAINER_1) @TargetsContainer(CONTAINER_1)
public static Archive<?> deployment0() { public static Archive<?> deployment0() {
return createDeployment(); return createDeployment();
} }


@Deployment(name = DEPLOYMENT_2, managed = false, testable = false) @Deployment(name = SINGLETON_DEPLOYMENT_2, managed = false, testable = false)
@TargetsContainer(CONTAINER_2) @TargetsContainer(CONTAINER_2)
public static Archive<?> deployment1() { public static Archive<?> deployment1() {
return createDeployment(); return createDeployment();
} }


private static Archive<?> createDeployment() { private static Archive<?> createDeployment() {
WebArchive war = ShrinkWrap.create(WebArchive.class, DEPLOYMENT_NAME); WebArchive war = ShrinkWrap.create(WebArchive.class, DEPLOYMENT_NAME + ".war");
war.addPackage(TraceServlet.class.getPackage()); war.addPackage(TraceServlet.class.getPackage());
war.addAsManifestResource(SingletonDeploymentJBossAllTestCase.class.getPackage(), "jboss-all.xml", "jboss-all.xml"); war.addAsManifestResource(SingletonDeploymentJBossAllTestCase.class.getPackage(), "jboss-all.xml", "jboss-all.xml");
return war; return war;
Expand Down
Expand Up @@ -33,14 +33,19 @@
import org.apache.http.client.methods.HttpTrace; import org.apache.http.client.methods.HttpTrace;
import org.apache.http.client.utils.HttpClientUtils; import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.CloseableHttpClient;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.arquillian.container.test.api.OperateOnDeployment; import org.jboss.arquillian.container.test.api.OperateOnDeployment;
import org.jboss.arquillian.container.test.api.RunAsClient; import org.jboss.arquillian.container.test.api.RunAsClient;
import org.jboss.arquillian.container.test.api.TargetsContainer;
import org.jboss.arquillian.junit.Arquillian; import org.jboss.arquillian.junit.Arquillian;
import org.jboss.arquillian.test.api.ArquillianResource; import org.jboss.arquillian.test.api.ArquillianResource;
import org.jboss.as.test.clustering.cluster.ClusterAbstractTestCase; import org.jboss.as.test.clustering.cluster.ClusterAbstractTestCase;
import org.jboss.as.test.clustering.cluster.singleton.servlet.TraceServlet; import org.jboss.as.test.clustering.cluster.singleton.servlet.TraceServlet;
import org.jboss.as.test.http.util.TestHttpClientUtils; import org.jboss.as.test.http.util.TestHttpClientUtils;
import org.jboss.as.test.shared.TimeoutUtil; import org.jboss.as.test.shared.TimeoutUtil;
import org.jboss.shrinkwrap.api.Archive;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.spec.WebArchive;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
Expand All @@ -52,24 +57,49 @@
@RunAsClient @RunAsClient
public abstract class SingletonDeploymentTestCase extends ClusterAbstractTestCase { public abstract class SingletonDeploymentTestCase extends ClusterAbstractTestCase {


@SuppressWarnings("unused") private static final String DEPLOYMENT_NAME = "singleton-deployment-helper.war";
private final String deploymentName; static final String SINGLETON_DEPLOYMENT_1 = "singleton-deployment-0";
static final String SINGLETON_DEPLOYMENT_2 = "singleton-deployment-1";

@Deployment(name = DEPLOYMENT_1, managed = false, testable = false)
@TargetsContainer(CONTAINER_1)
public static Archive<?> deploymentHelper0() {
return createDeployment();
}

@Deployment(name = DEPLOYMENT_2, managed = false, testable = false)
@TargetsContainer(CONTAINER_2)
public static Archive<?> deploymentHelper1() {
return createDeployment();
}

private static Archive<?> createDeployment() {
WebArchive war = ShrinkWrap.create(WebArchive.class, DEPLOYMENT_NAME);
war.addPackage(TraceServlet.class.getPackage());
return war;
}


public static final int DELAY = TimeoutUtil.adjust(5000); public static final int DELAY = TimeoutUtil.adjust(5000);


protected SingletonDeploymentTestCase(String deploymentName) { private final String deploymentName;

SingletonDeploymentTestCase(String deploymentName) {
this.deploymentName = deploymentName; this.deploymentName = deploymentName;
} }


@Test @Test
public void test( public void test(
@ArquillianResource(TraceServlet.class) @OperateOnDeployment(DEPLOYMENT_1) URL baseURL1, @ArquillianResource(TraceServlet.class) @OperateOnDeployment(DEPLOYMENT_1) URL baseURL1,
@ArquillianResource() @OperateOnDeployment(DEPLOYMENT_2) URL baseURL2) @ArquillianResource(TraceServlet.class) @OperateOnDeployment(DEPLOYMENT_2) URL baseURL2)
throws IOException, URISyntaxException, InterruptedException { throws IOException, URISyntaxException, InterruptedException {


URI uri1 = TraceServlet.createURI(baseURL1); this.deploy(SINGLETON_DEPLOYMENT_1);
// baseURL2 will not have been resolved, since the deployment would not have started Thread.sleep(DELAY);
URI uri2 = TraceServlet.createURI(new URL(baseURL2.getProtocol(), baseURL2.getHost(), baseURL2.getPort(), baseURL1.getFile())); this.deploy(SINGLETON_DEPLOYMENT_2);
Thread.sleep(DELAY);

URI uri1 = TraceServlet.createURI(new URL(baseURL1.getProtocol(), baseURL1.getHost(), baseURL1.getPort(), "/" + this.deploymentName + "/"));
URI uri2 = TraceServlet.createURI(new URL(baseURL2.getProtocol(), baseURL2.getHost(), baseURL2.getPort(), "/" + this.deploymentName + "/"));


try (CloseableHttpClient client = TestHttpClientUtils.promiscuousCookieHttpClient()) { try (CloseableHttpClient client = TestHttpClientUtils.promiscuousCookieHttpClient()) {
HttpResponse response = client.execute(new HttpTrace(uri1)); HttpResponse response = client.execute(new HttpTrace(uri1));
Expand All @@ -86,7 +116,7 @@ public void test(
HttpClientUtils.closeQuietly(response); HttpClientUtils.closeQuietly(response);
} }


this.undeploy(DEPLOYMENT_1); this.undeploy(SINGLETON_DEPLOYMENT_1);


Thread.sleep(DELAY); Thread.sleep(DELAY);


Expand All @@ -104,7 +134,7 @@ public void test(
HttpClientUtils.closeQuietly(response); HttpClientUtils.closeQuietly(response);
} }


this.deploy(DEPLOYMENT_1); this.deploy(SINGLETON_DEPLOYMENT_1);


Thread.sleep(DELAY); Thread.sleep(DELAY);


Expand All @@ -122,7 +152,7 @@ public void test(
HttpClientUtils.closeQuietly(response); HttpClientUtils.closeQuietly(response);
} }


this.undeploy(DEPLOYMENT_2); this.undeploy(SINGLETON_DEPLOYMENT_2);


Thread.sleep(DELAY); Thread.sleep(DELAY);


Expand All @@ -140,7 +170,7 @@ public void test(
HttpClientUtils.closeQuietly(response); HttpClientUtils.closeQuietly(response);
} }


this.deploy(DEPLOYMENT_2); this.deploy(SINGLETON_DEPLOYMENT_2);


Thread.sleep(DELAY); Thread.sleep(DELAY);


Expand All @@ -157,6 +187,8 @@ public void test(
} finally { } finally {
HttpClientUtils.closeQuietly(response); HttpClientUtils.closeQuietly(response);
} }
} finally {
this.undeploy(SINGLETON_DEPLOYMENT_1, SINGLETON_DEPLOYMENT_2);
} }
} }
} }

0 comments on commit d6c9d7b

Please sign in to comment.