Skip to content

Commit

Permalink
[MSC-122] Add ability to get a copy of the affected controllers on co…
Browse files Browse the repository at this point in the history
…ntainer stability report
  • Loading branch information
dmlloyd committed Nov 19, 2012
1 parent 327fa7d commit d84e8d1
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 98 deletions.
Expand Up @@ -207,13 +207,13 @@ public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedExce

/** {@inheritDoc} */
@Override
public void awaitStability() throws InterruptedException {
public void awaitStability(final Set<? super ServiceController<?>> failed, final Set<? super ServiceController<?>> problem) throws InterruptedException {
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override
public boolean awaitStability(final long timeout, final TimeUnit unit) throws InterruptedException {
public boolean awaitStability(final long timeout, final TimeUnit unit, final Set<? super ServiceController<?>> failed, final Set<? super ServiceController<?>> problem) throws InterruptedException {
throw new UnsupportedOperationException();
}
}
9 changes: 7 additions & 2 deletions src/main/java/org/jboss/msc/service/ServiceContainer.java
Expand Up @@ -23,6 +23,7 @@
package org.jboss.msc.service;

import java.io.PrintStream;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -73,20 +74,24 @@ public interface ServiceContainer extends ServiceTarget, ServiceRegistry {
/**
* Causes the current thread to wait until the container is stable.
*
* @param failed a set into which failed services should be copied, or {@code null} to ignore
* @param problem a set into which problem services should be copied, or {@code null} to ignore
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
void awaitStability() throws InterruptedException;
void awaitStability(Set<? super ServiceController<?>> failed, Set<? super ServiceController<?>> problem) throws InterruptedException;

/**
* Causes the current thread to wait until the container is stable.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the {@code timeout} argument
* @param failed a set into which failed services should be copied, or {@code null} to ignore
* @param problem a set into which problem services should be copied, or {@code null} to ignore
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
boolean awaitStability(long timeout, TimeUnit unit) throws InterruptedException;
boolean awaitStability(long timeout, TimeUnit unit, Set<? super ServiceController<?>> failed, Set<? super ServiceController<?>> problem) throws InterruptedException;

/**
* Dump a complete list of services to {@code System.out}.
Expand Down
155 changes: 63 additions & 92 deletions src/main/java/org/jboss/msc/service/ServiceContainerImpl.java
Expand Up @@ -36,7 +36,6 @@
import java.security.PrivilegedAction;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -59,9 +58,6 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.LockSupport;
import org.jboss.modules.management.ObjectProperties;
import org.jboss.modules.ref.Reaper;
import org.jboss.modules.ref.Reference;
Expand Down Expand Up @@ -94,13 +90,13 @@ public String run() {
}

private final ConcurrentMap<ServiceName, ServiceRegistrationImpl> registry = new UnlockedReadHashMap<ServiceName, ServiceRegistrationImpl>(512);
private final long start = System.nanoTime();

private static final Thread[] NO_WAITERS = new Thread[0];
private final Set<ServiceController<?>> problems = new IdentityHashSet<ServiceController<?>>();
private final Set<ServiceController<?>> failed = new IdentityHashSet<ServiceController<?>>();
private final Object lock = new Object();

@SuppressWarnings("unused")
private volatile int unstableServices;
private volatile Thread[] stabilityWaiters = NO_WAITERS;
private final long start = System.nanoTime();
private int unstableServices;
private long shutdownInitiated;

private final List<TerminateListener> terminateListeners = new ArrayList<TerminateListener>(1);
Expand Down Expand Up @@ -244,9 +240,6 @@ public String dumpServiceDetails(final String serviceName) {
}
};

private static final AtomicIntegerFieldUpdater<ServiceContainerImpl> unstableServicesUpdater = AtomicIntegerFieldUpdater.newUpdater(ServiceContainerImpl.class, "unstableServices");
private static final AtomicReferenceFieldUpdater<ServiceContainerImpl, Thread[]> stabilityWaitersUpdater = AtomicReferenceFieldUpdater.newUpdater(ServiceContainerImpl.class, Thread[].class, "stabilityWaiters");

ServiceContainerImpl(String name, int coreSize, long timeOut, TimeUnit timeOutUnit, final boolean autoShutdown) {
this.autoShutdown = autoShutdown;
final int serialNo = SERIAL.getAndIncrement();
Expand Down Expand Up @@ -303,16 +296,48 @@ public void handleTermination(final Info info) {
}
}

int incrementUnstableServices() {
return unstableServicesUpdater.getAndIncrement(this);
void removeProblem(ServiceController<?> controller) {
synchronized (lock) {
problems.remove(controller);
}
}

int decrementUnstableServices() {
final int old = unstableServicesUpdater.getAndDecrement(this);
if (old == 1) {
awakenStabilityWaiters();
void removeFailed(ServiceController<?> controller) {
synchronized (lock) {
failed.remove(controller);
}
}

void incrementUnstableServices() {
synchronized (lock) {
unstableServices++;
}
}

void addProblem(ServiceController<?> controller) {
synchronized (lock) {
problems.add(controller);
if (--unstableServices == 0) {
lock.notifyAll();
}
}
}

void addFailed(ServiceController<?> controller) {
synchronized (lock) {
failed.add(controller);
if (--unstableServices == 0) {
lock.notifyAll();
}
}
}

void decrementUnstableServices() {
synchronized (lock) {
if (--unstableServices == 0) {
lock.notifyAll();
}
}
return old;
}

boolean isAutoShutdown() {
Expand Down Expand Up @@ -356,93 +381,39 @@ public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedExce
}

@Override
public void awaitStability() throws InterruptedException {
try {
public void awaitStability(Set<? super ServiceController<?>> failed, Set<? super ServiceController<?>> problem) throws InterruptedException {
synchronized (lock) {
while (unstableServices != 0) {
addStabilityWaiter();
if (unstableServices != 0) {
LockSupport.park(this);
}
lock.wait();
}
if (failed != null) {
failed.addAll(this.failed);
}
if (problem != null) {
problem.addAll(this.problems);
}
} finally {
removeStabilityWaiter();
}
}

@Override
public boolean awaitStability(final long timeout, final TimeUnit unit) throws InterruptedException {
long t = System.nanoTime();
public boolean awaitStability(final long timeout, final TimeUnit unit, Set<? super ServiceController<?>> failed, Set<? super ServiceController<?>> problem) throws InterruptedException {
long now = System.nanoTime();
long remaining = unit.toNanos(timeout);
try {
synchronized (lock) {
while (unstableServices != 0) {
if (remaining <= 0L) {
return false;
}
addStabilityWaiter();
if (unstableServices != 0) {
LockSupport.parkNanos(this, remaining);
// this subtracts the elapsed time from remaining (rhs is negative but must remain ordered)
remaining += t - (t = System.nanoTime());
}
}
return true;
} finally {
removeStabilityWaiter();
}
}

private void addStabilityWaiter() {
final Thread thread = Thread.currentThread();
Thread[] oldVal, newVal;
int oldLen;
do {
oldVal = stabilityWaiters;
for (Thread item : oldVal) {
if (item == thread) {
return;
}
lock.wait(remaining / 1000000L, (int) (remaining % 1000000L));
remaining -= (-now + (now = System.nanoTime()));
}
oldLen = oldVal.length;
newVal = Arrays.copyOf(oldVal, oldLen + 1);
newVal[oldLen] = thread;
} while (! stabilityWaitersUpdater.compareAndSet(this, oldVal, newVal));
}

private void removeStabilityWaiter() {
final Thread thread = Thread.currentThread();
Thread[] oldVal, newVal;
int oldLen;
int found;
do {
oldVal = stabilityWaiters;
oldLen = oldVal.length;
found = -1;
for (int i = 0; i < oldLen; i++) {
final Thread item = oldVal[i];
if (item == thread) {
found = i;
break;
}
}
if (found == -1) {
return;
if (failed != null) {
failed.addAll(this.failed);
}
if (oldLen == 1) {
newVal = NO_WAITERS;
} else {
newVal = Arrays.copyOf(oldVal, oldLen - 1);
if (found != oldLen - 1) {
newVal[found] = oldVal[oldLen - 1];
}
if (problem != null) {
problem.addAll(this.problems);
}
newVal[oldLen] = thread;
} while (! stabilityWaitersUpdater.compareAndSet(this, oldVal, newVal));
}

private void awakenStabilityWaiters() {
final Thread[] set = stabilityWaitersUpdater.getAndSet(this, NO_WAITERS);
for (Thread thread : set) {
LockSupport.unpark(thread);
return true;
}
}

Expand Down
Expand Up @@ -552,6 +552,7 @@ void transition(final ArrayList<Runnable> tasks) {
break;
}
case START_REQUESTED_to_PROBLEM: {
getPrimaryRegistration().getContainer().addProblem(this);
if (!immediateUnavailableDependencies.isEmpty()) {
getListenerTasks(ListenerNotification.IMMEDIATE_DEPENDENCY_UNAVAILABLE, tasks);
}
Expand Down Expand Up @@ -581,6 +582,7 @@ void transition(final ArrayList<Runnable> tasks) {
break;
}
case STARTING_to_START_FAILED: {
getPrimaryRegistration().getContainer().addFailed(this);
ChildServiceTarget childTarget = this.childTarget;
if (childTarget != null) {
childTarget.valid = false;
Expand All @@ -591,6 +593,7 @@ void transition(final ArrayList<Runnable> tasks) {
break;
}
case START_FAILED_to_STARTING: {
getPrimaryRegistration().getContainer().removeFailed(this);
getListenerTasks(transition, tasks);
tasks.add(new DependencyRetryingTask(getDependents()));
tasks.add(new DependentStartedTask());
Expand All @@ -602,6 +605,7 @@ void transition(final ArrayList<Runnable> tasks) {
break;
}
case START_FAILED_to_DOWN: {
getPrimaryRegistration().getContainer().removeFailed(this);
startException = null;
failCount--;
getListenerTasks(transition, tasks);
Expand Down Expand Up @@ -655,6 +659,7 @@ void transition(final ArrayList<Runnable> tasks) {
break;
}
case PROBLEM_to_START_REQUESTED: {
getPrimaryRegistration().getContainer().removeProblem(this);
if (!immediateUnavailableDependencies.isEmpty()) {
getListenerTasks(ListenerNotification.IMMEDIATE_DEPENDENCY_AVAILABLE, tasks);
}
Expand Down
Expand Up @@ -22,8 +22,11 @@

package org.jboss.msc.service;

import java.util.Set;
import org.junit.Test;

import static org.junit.Assert.assertTrue;

/**
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
*/
Expand All @@ -33,12 +36,16 @@ public final class ContainerStabilityTestCase extends AbstractServiceTest {
public void testSimpleInstallation() {
final ServiceBuilder<Void> builder = serviceContainer.addService(ServiceName.of("Test1"), Service.NULL);
final ServiceController<Void> controller = builder.install();
final Set<Object> problem = new IdentityHashSet<Object>();
final Set<Object> failed = new IdentityHashSet<Object>();
try {
serviceContainer.awaitStability();
serviceContainer.awaitStability(failed, problem);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertController(controller.getName(), controller);
assertTrue(problem.isEmpty());
assertTrue(failed.isEmpty());
}

@Test
Expand Down Expand Up @@ -66,12 +73,16 @@ public Object getValue() throws IllegalStateException, IllegalArgumentException
builder = serviceContainer.addService(ServiceName.of("Test2"), Service.NULL);
builder.setInitialMode(ServiceController.Mode.ON_DEMAND);
final ServiceController<?> controller2 = builder.install();
final Set<Object> problem = new IdentityHashSet<Object>();
final Set<Object> failed = new IdentityHashSet<Object>();
try {
serviceContainer.awaitStability();
serviceContainer.awaitStability(failed, problem);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertController(controller1.getName(), controller1);
assertController(controller2.getName(), controller2);
assertTrue(problem.isEmpty());
assertTrue(failed.isEmpty());
}
}

0 comments on commit d84e8d1

Please sign in to comment.