diff --git a/src/main/java/org/jboss/msc/service/DelegatingServiceContainer.java b/src/main/java/org/jboss/msc/service/DelegatingServiceContainer.java index 95be6dfd9..625b249f3 100644 --- a/src/main/java/org/jboss/msc/service/DelegatingServiceContainer.java +++ b/src/main/java/org/jboss/msc/service/DelegatingServiceContainer.java @@ -207,13 +207,13 @@ public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedExce /** {@inheritDoc} */ @Override - public void awaitStability() throws InterruptedException { + public void awaitStability(final Set> failed, final Set> problem) throws InterruptedException { throw new UnsupportedOperationException(); } /** {@inheritDoc} */ @Override - public boolean awaitStability(final long timeout, final TimeUnit unit) throws InterruptedException { + public boolean awaitStability(final long timeout, final TimeUnit unit, final Set> failed, final Set> problem) throws InterruptedException { throw new UnsupportedOperationException(); } } diff --git a/src/main/java/org/jboss/msc/service/ServiceContainer.java b/src/main/java/org/jboss/msc/service/ServiceContainer.java index 501052b42..cb030c653 100644 --- a/src/main/java/org/jboss/msc/service/ServiceContainer.java +++ b/src/main/java/org/jboss/msc/service/ServiceContainer.java @@ -23,6 +23,7 @@ package org.jboss.msc.service; import java.io.PrintStream; +import java.util.Set; import java.util.concurrent.TimeUnit; /** @@ -73,20 +74,24 @@ public interface ServiceContainer extends ServiceTarget, ServiceRegistry { /** * Causes the current thread to wait until the container is stable. * + * @param failed a set into which failed services should be copied, or {@code null} to ignore + * @param problem a set into which problem services should be copied, or {@code null} to ignore * @throws InterruptedException if the current thread is interrupted * while waiting */ - void awaitStability() throws InterruptedException; + void awaitStability(Set> failed, Set> problem) throws InterruptedException; /** * Causes the current thread to wait until the container is stable. * * @param timeout the maximum time to wait * @param unit the time unit of the {@code timeout} argument + * @param failed a set into which failed services should be copied, or {@code null} to ignore + * @param problem a set into which problem services should be copied, or {@code null} to ignore * @throws InterruptedException if the current thread is interrupted * while waiting */ - boolean awaitStability(long timeout, TimeUnit unit) throws InterruptedException; + boolean awaitStability(long timeout, TimeUnit unit, Set> failed, Set> problem) throws InterruptedException; /** * Dump a complete list of services to {@code System.out}. diff --git a/src/main/java/org/jboss/msc/service/ServiceContainerImpl.java b/src/main/java/org/jboss/msc/service/ServiceContainerImpl.java index 350e8274a..e59f13fd6 100644 --- a/src/main/java/org/jboss/msc/service/ServiceContainerImpl.java +++ b/src/main/java/org/jboss/msc/service/ServiceContainerImpl.java @@ -36,7 +36,6 @@ import java.security.PrivilegedAction; import java.util.ArrayDeque; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -59,9 +58,6 @@ import javax.management.MBeanServer; import javax.management.ObjectName; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; -import java.util.concurrent.locks.LockSupport; import org.jboss.modules.management.ObjectProperties; import org.jboss.modules.ref.Reaper; import org.jboss.modules.ref.Reference; @@ -94,13 +90,13 @@ public String run() { } private final ConcurrentMap registry = new UnlockedReadHashMap(512); + private final long start = System.nanoTime(); - private static final Thread[] NO_WAITERS = new Thread[0]; + private final Set> problems = new IdentityHashSet>(); + private final Set> failed = new IdentityHashSet>(); + private final Object lock = new Object(); - @SuppressWarnings("unused") - private volatile int unstableServices; - private volatile Thread[] stabilityWaiters = NO_WAITERS; - private final long start = System.nanoTime(); + private int unstableServices; private long shutdownInitiated; private final List terminateListeners = new ArrayList(1); @@ -244,9 +240,6 @@ public String dumpServiceDetails(final String serviceName) { } }; - private static final AtomicIntegerFieldUpdater unstableServicesUpdater = AtomicIntegerFieldUpdater.newUpdater(ServiceContainerImpl.class, "unstableServices"); - private static final AtomicReferenceFieldUpdater stabilityWaitersUpdater = AtomicReferenceFieldUpdater.newUpdater(ServiceContainerImpl.class, Thread[].class, "stabilityWaiters"); - ServiceContainerImpl(String name, int coreSize, long timeOut, TimeUnit timeOutUnit, final boolean autoShutdown) { this.autoShutdown = autoShutdown; final int serialNo = SERIAL.getAndIncrement(); @@ -303,16 +296,48 @@ public void handleTermination(final Info info) { } } - int incrementUnstableServices() { - return unstableServicesUpdater.getAndIncrement(this); + void removeProblem(ServiceController controller) { + synchronized (lock) { + problems.remove(controller); + } } - int decrementUnstableServices() { - final int old = unstableServicesUpdater.getAndDecrement(this); - if (old == 1) { - awakenStabilityWaiters(); + void removeFailed(ServiceController controller) { + synchronized (lock) { + failed.remove(controller); + } + } + + void incrementUnstableServices() { + synchronized (lock) { + unstableServices++; + } + } + + void addProblem(ServiceController controller) { + synchronized (lock) { + problems.add(controller); + if (--unstableServices == 0) { + lock.notifyAll(); + } + } + } + + void addFailed(ServiceController controller) { + synchronized (lock) { + failed.add(controller); + if (--unstableServices == 0) { + lock.notifyAll(); + } + } + } + + void decrementUnstableServices() { + synchronized (lock) { + if (--unstableServices == 0) { + lock.notifyAll(); + } } - return old; } boolean isAutoShutdown() { @@ -356,93 +381,39 @@ public void awaitTermination(long timeout, TimeUnit unit) throws InterruptedExce } @Override - public void awaitStability() throws InterruptedException { - try { + public void awaitStability(Set> failed, Set> problem) throws InterruptedException { + synchronized (lock) { while (unstableServices != 0) { - addStabilityWaiter(); - if (unstableServices != 0) { - LockSupport.park(this); - } + lock.wait(); + } + if (failed != null) { + failed.addAll(this.failed); + } + if (problem != null) { + problem.addAll(this.problems); } - } finally { - removeStabilityWaiter(); } } @Override - public boolean awaitStability(final long timeout, final TimeUnit unit) throws InterruptedException { - long t = System.nanoTime(); + public boolean awaitStability(final long timeout, final TimeUnit unit, Set> failed, Set> problem) throws InterruptedException { + long now = System.nanoTime(); long remaining = unit.toNanos(timeout); - try { + synchronized (lock) { while (unstableServices != 0) { if (remaining <= 0L) { return false; } - addStabilityWaiter(); - if (unstableServices != 0) { - LockSupport.parkNanos(this, remaining); - // this subtracts the elapsed time from remaining (rhs is negative but must remain ordered) - remaining += t - (t = System.nanoTime()); - } - } - return true; - } finally { - removeStabilityWaiter(); - } - } - - private void addStabilityWaiter() { - final Thread thread = Thread.currentThread(); - Thread[] oldVal, newVal; - int oldLen; - do { - oldVal = stabilityWaiters; - for (Thread item : oldVal) { - if (item == thread) { - return; - } + lock.wait(remaining / 1000000L, (int) (remaining % 1000000L)); + remaining -= (-now + (now = System.nanoTime())); } - oldLen = oldVal.length; - newVal = Arrays.copyOf(oldVal, oldLen + 1); - newVal[oldLen] = thread; - } while (! stabilityWaitersUpdater.compareAndSet(this, oldVal, newVal)); - } - - private void removeStabilityWaiter() { - final Thread thread = Thread.currentThread(); - Thread[] oldVal, newVal; - int oldLen; - int found; - do { - oldVal = stabilityWaiters; - oldLen = oldVal.length; - found = -1; - for (int i = 0; i < oldLen; i++) { - final Thread item = oldVal[i]; - if (item == thread) { - found = i; - break; - } - } - if (found == -1) { - return; + if (failed != null) { + failed.addAll(this.failed); } - if (oldLen == 1) { - newVal = NO_WAITERS; - } else { - newVal = Arrays.copyOf(oldVal, oldLen - 1); - if (found != oldLen - 1) { - newVal[found] = oldVal[oldLen - 1]; - } + if (problem != null) { + problem.addAll(this.problems); } - newVal[oldLen] = thread; - } while (! stabilityWaitersUpdater.compareAndSet(this, oldVal, newVal)); - } - - private void awakenStabilityWaiters() { - final Thread[] set = stabilityWaitersUpdater.getAndSet(this, NO_WAITERS); - for (Thread thread : set) { - LockSupport.unpark(thread); + return true; } } diff --git a/src/main/java/org/jboss/msc/service/ServiceControllerImpl.java b/src/main/java/org/jboss/msc/service/ServiceControllerImpl.java index fa08ad9c9..996e5efa0 100644 --- a/src/main/java/org/jboss/msc/service/ServiceControllerImpl.java +++ b/src/main/java/org/jboss/msc/service/ServiceControllerImpl.java @@ -552,6 +552,7 @@ void transition(final ArrayList tasks) { break; } case START_REQUESTED_to_PROBLEM: { + getPrimaryRegistration().getContainer().addProblem(this); if (!immediateUnavailableDependencies.isEmpty()) { getListenerTasks(ListenerNotification.IMMEDIATE_DEPENDENCY_UNAVAILABLE, tasks); } @@ -581,6 +582,7 @@ void transition(final ArrayList tasks) { break; } case STARTING_to_START_FAILED: { + getPrimaryRegistration().getContainer().addFailed(this); ChildServiceTarget childTarget = this.childTarget; if (childTarget != null) { childTarget.valid = false; @@ -591,6 +593,7 @@ void transition(final ArrayList tasks) { break; } case START_FAILED_to_STARTING: { + getPrimaryRegistration().getContainer().removeFailed(this); getListenerTasks(transition, tasks); tasks.add(new DependencyRetryingTask(getDependents())); tasks.add(new DependentStartedTask()); @@ -602,6 +605,7 @@ void transition(final ArrayList tasks) { break; } case START_FAILED_to_DOWN: { + getPrimaryRegistration().getContainer().removeFailed(this); startException = null; failCount--; getListenerTasks(transition, tasks); @@ -655,6 +659,7 @@ void transition(final ArrayList tasks) { break; } case PROBLEM_to_START_REQUESTED: { + getPrimaryRegistration().getContainer().removeProblem(this); if (!immediateUnavailableDependencies.isEmpty()) { getListenerTasks(ListenerNotification.IMMEDIATE_DEPENDENCY_AVAILABLE, tasks); } diff --git a/src/test/java/org/jboss/msc/service/ContainerStabilityTestCase.java b/src/test/java/org/jboss/msc/service/ContainerStabilityTestCase.java index ee99b5c00..46f573ebf 100644 --- a/src/test/java/org/jboss/msc/service/ContainerStabilityTestCase.java +++ b/src/test/java/org/jboss/msc/service/ContainerStabilityTestCase.java @@ -22,8 +22,11 @@ package org.jboss.msc.service; +import java.util.Set; import org.junit.Test; +import static org.junit.Assert.assertTrue; + /** * @author David M. Lloyd */ @@ -33,12 +36,16 @@ public final class ContainerStabilityTestCase extends AbstractServiceTest { public void testSimpleInstallation() { final ServiceBuilder builder = serviceContainer.addService(ServiceName.of("Test1"), Service.NULL); final ServiceController controller = builder.install(); + final Set problem = new IdentityHashSet(); + final Set failed = new IdentityHashSet(); try { - serviceContainer.awaitStability(); + serviceContainer.awaitStability(failed, problem); } catch (InterruptedException e) { throw new RuntimeException(e); } assertController(controller.getName(), controller); + assertTrue(problem.isEmpty()); + assertTrue(failed.isEmpty()); } @Test @@ -66,12 +73,16 @@ public Object getValue() throws IllegalStateException, IllegalArgumentException builder = serviceContainer.addService(ServiceName.of("Test2"), Service.NULL); builder.setInitialMode(ServiceController.Mode.ON_DEMAND); final ServiceController controller2 = builder.install(); + final Set problem = new IdentityHashSet(); + final Set failed = new IdentityHashSet(); try { - serviceContainer.awaitStability(); + serviceContainer.awaitStability(failed, problem); } catch (InterruptedException e) { throw new RuntimeException(e); } assertController(controller1.getName(), controller1); assertController(controller2.getName(), controller2); + assertTrue(problem.isEmpty()); + assertTrue(failed.isEmpty()); } }