diff --git a/frameworks/cassandra/src/main/java/com/mesosphere/sdk/cassandra/scheduler/CassandraRecoveryPlanOverrider.java b/frameworks/cassandra/src/main/java/com/mesosphere/sdk/cassandra/scheduler/CassandraRecoveryPlanOverrider.java index 0e75b55524c..54605143c6d 100644 --- a/frameworks/cassandra/src/main/java/com/mesosphere/sdk/cassandra/scheduler/CassandraRecoveryPlanOverrider.java +++ b/frameworks/cassandra/src/main/java/com/mesosphere/sdk/cassandra/scheduler/CassandraRecoveryPlanOverrider.java @@ -11,6 +11,7 @@ import com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverrider; import com.mesosphere.sdk.scheduler.recovery.RecoveryStep; import com.mesosphere.sdk.scheduler.recovery.RecoveryType; +import com.mesosphere.sdk.scheduler.recovery.constrain.UnconstrainedLaunchConstrainer; import com.mesosphere.sdk.specification.CommandSpec; import com.mesosphere.sdk.specification.DefaultCommandSpec; import com.mesosphere.sdk.specification.DefaultPodSpec; @@ -126,6 +127,7 @@ private Phase getNodeRecoveryPhase(Plan inputPlan, int index) { Step replaceStep = new RecoveryStep( inputLaunchStep.getName(), replacePodInstanceRequirement, + new UnconstrainedLaunchConstrainer(), stateStore); List steps = new ArrayList<>(); @@ -153,6 +155,7 @@ private Phase getNodeRecoveryPhase(Plan inputPlan, int index) { return new RecoveryStep( step.getName(), restartPodInstanceRequirement, + new UnconstrainedLaunchConstrainer(), stateStore); }) .collect(Collectors.toList()); diff --git a/frameworks/hdfs/src/main/java/com/mesosphere/sdk/hdfs/scheduler/HdfsRecoveryPlanOverrider.java b/frameworks/hdfs/src/main/java/com/mesosphere/sdk/hdfs/scheduler/HdfsRecoveryPlanOverrider.java index 3a7393c24c4..5e6945a9b75 100644 --- a/frameworks/hdfs/src/main/java/com/mesosphere/sdk/hdfs/scheduler/HdfsRecoveryPlanOverrider.java +++ b/frameworks/hdfs/src/main/java/com/mesosphere/sdk/hdfs/scheduler/HdfsRecoveryPlanOverrider.java @@ -9,6 +9,7 @@ import 
com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverrider; import com.mesosphere.sdk.scheduler.recovery.RecoveryStep; import com.mesosphere.sdk.scheduler.recovery.RecoveryType; +import com.mesosphere.sdk.scheduler.recovery.constrain.UnconstrainedLaunchConstrainer; import com.mesosphere.sdk.state.StateStore; import org.slf4j.Logger; @@ -106,6 +107,7 @@ private Phase getRecoveryPhase(Plan inputPlan, int index, String phaseName) { new RecoveryStep( inputBootstrapStep.getName(), bootstrapPodInstanceRequirement, + new UnconstrainedLaunchConstrainer(), stateStore); // JournalNode or NameNode @@ -120,6 +122,7 @@ private Phase getRecoveryPhase(Plan inputPlan, int index, String phaseName) { new RecoveryStep( inputNodeStep.getName(), nameNodePodInstanceRequirement, + new UnconstrainedLaunchConstrainer(), stateStore); return new DefaultPhase( diff --git a/frameworks/helloworld/src/test/java/com/mesosphere/sdk/helloworld/scheduler/ServiceTest.java b/frameworks/helloworld/src/test/java/com/mesosphere/sdk/helloworld/scheduler/ServiceTest.java index 414c8e684c5..979840b098f 100644 --- a/frameworks/helloworld/src/test/java/com/mesosphere/sdk/helloworld/scheduler/ServiceTest.java +++ b/frameworks/helloworld/src/test/java/com/mesosphere/sdk/helloworld/scheduler/ServiceTest.java @@ -8,6 +8,7 @@ import com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverriderFactory; import com.mesosphere.sdk.scheduler.recovery.RecoveryStep; import com.mesosphere.sdk.scheduler.recovery.RecoveryType; +import com.mesosphere.sdk.scheduler.recovery.constrain.UnconstrainedLaunchConstrainer; import com.mesosphere.sdk.state.StateStore; import com.mesosphere.sdk.state.StateStoreUtils; import com.mesosphere.sdk.storage.Persister; @@ -476,6 +477,7 @@ public Optional override(PodInstanceRequirement podInstanceRequirement) { new RecoveryStep( podInstanceRequirement.getPodInstance().getName(), podInstanceRequirement, + new UnconstrainedLaunchConstrainer(), stateStore)), new SerialStrategy<>(), 
Collections.emptyList()); diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/TaskUtils.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/TaskUtils.java index c7b2f060246..771f3744b44 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/TaskUtils.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/TaskUtils.java @@ -48,6 +48,7 @@ */ @SuppressWarnings({ + "checkstyle:LineLength", "checkstyle:MethodCount", "checkstyle:ExecutableStatementCount", "checkstyle:ReturnCount", diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/OfferEvaluationUtils.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/OfferEvaluationUtils.java index dff7388de0d..d706c9a9ecc 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/OfferEvaluationUtils.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/OfferEvaluationUtils.java @@ -27,6 +27,7 @@ * This class encapsulates shared offer evaluation logic for evaluation stages. */ @SuppressWarnings({ + "checkstyle:LineLength", "checkstyle:InnerTypeLast", "checkstyle:HiddenField", "checkstyle:ThrowsCount", diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/OfferEvaluator.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/OfferEvaluator.java index c6f190b6026..8c4b911878d 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/OfferEvaluator.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/OfferEvaluator.java @@ -55,6 +55,7 @@ * in reference to {@link PodInstanceRequirement}s. 
*/ @SuppressWarnings({ + "checkstyle:LineLength", "checkstyle:MultipleStringLiterals", "checkstyle:HiddenField", "checkstyle:ThrowsCount", diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/PodInfoBuilder.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/PodInfoBuilder.java index 3534d67b671..459bd81b92d 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/PodInfoBuilder.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/offer/evaluate/PodInfoBuilder.java @@ -67,6 +67,7 @@ * into {@link OfferRecommendation}s. The ones which are not launched do not get used. */ @SuppressWarnings({ + "checkstyle:LineLength", "checkstyle:LocalVariableName", "checkstyle:VariableDeclarationUsageDistance", "checkstyle:CyclomaticComplexity" diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/DefaultScheduler.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/DefaultScheduler.java index 11363757fb7..27dda7216ed 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/DefaultScheduler.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/DefaultScheduler.java @@ -74,6 +74,7 @@ @SuppressWarnings({ "checkstyle:MultipleStringLiterals", "checkstyle:IllegalCatch", + "checkstyle:LineLength", "checkstyle:ParameterNumber", "checkstyle:DeclarationOrder" }) diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/SchedulerBuilder.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/SchedulerBuilder.java index 6758a501cac..74a04085c74 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/SchedulerBuilder.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/SchedulerBuilder.java @@ -37,6 +37,9 @@ import com.mesosphere.sdk.scheduler.recovery.DefaultRecoveryPlanManager; import com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverrider; import com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverriderFactory; +import 
com.mesosphere.sdk.scheduler.recovery.constrain.LaunchConstrainer; +import com.mesosphere.sdk.scheduler.recovery.constrain.TimedLaunchConstrainer; +import com.mesosphere.sdk.scheduler.recovery.constrain.UnconstrainedLaunchConstrainer; import com.mesosphere.sdk.scheduler.recovery.monitor.FailureMonitor; import com.mesosphere.sdk.scheduler.recovery.monitor.NeverFailureMonitor; import com.mesosphere.sdk.scheduler.recovery.monitor.TimedFailureMonitor; @@ -565,20 +568,25 @@ private PlanManager getRecoveryPlanManager( logger.info("Adding overriding recovery plan manager."); overrideRecoveryPlanManagers.add(recoveryOverriderFactory.get().create(stateStore, plans)); } + final LaunchConstrainer launchConstrainer; final FailureMonitor failureMonitor; if (serviceSpec.getReplacementFailurePolicy().isPresent()) { ReplacementFailurePolicy failurePolicy = serviceSpec.getReplacementFailurePolicy().get(); + launchConstrainer = new TimedLaunchConstrainer( + Duration.ofMinutes(failurePolicy.getMinReplaceDelayMins())); failureMonitor = new TimedFailureMonitor( Duration.ofMinutes(failurePolicy.getPermanentFailureTimeoutMins()), stateStore, configStore); } else { + launchConstrainer = new UnconstrainedLaunchConstrainer(); failureMonitor = new NeverFailureMonitor(); } return new DefaultRecoveryPlanManager( stateStore, configStore, PlanUtils.getLaunchableTasks(plans), + launchConstrainer, failureMonitor, namespace, overrideRecoveryPlanManagers); diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/plan/PlanUtils.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/plan/PlanUtils.java index 96ec9152b80..cf313e71ab5 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/plan/PlanUtils.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/plan/PlanUtils.java @@ -100,6 +100,7 @@ public static boolean isEligible( */ @SuppressWarnings({ "checkstyle:CyclomaticComplexity", + "checkstyle:LineLength", "checkstyle:MultipleStringLiterals" }) 
public static Status getAggregateStatus( diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/DefaultRecoveryPlanManager.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/DefaultRecoveryPlanManager.java index cf95ac21786..97b2579193e 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/DefaultRecoveryPlanManager.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/DefaultRecoveryPlanManager.java @@ -19,6 +19,7 @@ import com.mesosphere.sdk.scheduler.plan.Step; import com.mesosphere.sdk.scheduler.plan.backoff.Backoff; import com.mesosphere.sdk.scheduler.plan.strategy.ParallelStrategy; +import com.mesosphere.sdk.scheduler.recovery.constrain.LaunchConstrainer; import com.mesosphere.sdk.scheduler.recovery.monitor.FailureMonitor; import com.mesosphere.sdk.specification.PodInstance; import com.mesosphere.sdk.specification.ServiceSpec; @@ -47,8 +48,9 @@ * generates a new {@link RecoveryStep} for them and adds them to the recovery Plan, if not already added. 
*/ @SuppressWarnings({ + "checkstyle:LineLength", "checkstyle:DeclarationOrder", - "checkstyle:HiddenField" + "checkstyle:HiddenField", }) public class DefaultRecoveryPlanManager implements PlanManager { public static final String DEFAULT_RECOVERY_PHASE_NAME = "default"; @@ -69,12 +71,15 @@ public class DefaultRecoveryPlanManager implements PlanManager { protected final FailureMonitor failureMonitor; + protected final LaunchConstrainer launchConstrainer; + protected final Object planLock = new Object(); public DefaultRecoveryPlanManager( StateStore stateStore, ConfigStore configStore, Set recoverableTaskNames, + LaunchConstrainer launchConstrainer, FailureMonitor failureMonitor, Optional namespace) { @@ -82,6 +87,7 @@ public DefaultRecoveryPlanManager( stateStore, configStore, recoverableTaskNames, + launchConstrainer, failureMonitor, namespace, Collections.emptyList()); @@ -91,6 +97,7 @@ public DefaultRecoveryPlanManager( StateStore stateStore, ConfigStore configStore, Set recoverableTaskNames, + LaunchConstrainer launchConstrainer, FailureMonitor failureMonitor, Optional namespace, List recoveryPlanOverriders) @@ -100,6 +107,7 @@ public DefaultRecoveryPlanManager( this.configStore = configStore; this.recoverableTaskNames = recoverableTaskNames; this.failureMonitor = failureMonitor; + this.launchConstrainer = launchConstrainer; this.namespace = namespace; this.recoveryPlanOverriders = recoveryPlanOverriders; plan = new DefaultPlan(Constants.RECOVERY_PLAN_NAME, Collections.emptyList()); @@ -140,7 +148,10 @@ protected void setPlanInternal(Plan plan) { public Collection getCandidates(Collection dirtyAssets) { synchronized (planLock) { updatePlan(dirtyAssets); - return getPlan().getCandidates(dirtyAssets); + return getPlan().getCandidates(dirtyAssets).stream() + .filter(step -> !(step instanceof RecoveryStep) // steps of any other type are not throttled + || launchConstrainer.canLaunch(((RecoveryStep) step).getRecoveryType())) + .collect(Collectors.toList()); } } @@ -429,6 +440,7 @@ private Step createStep(PodInstanceRequirement
podInstanceRequirement) { return new RecoveryStep( podInstanceRequirement.getName(), podInstanceRequirement, + launchConstrainer, stateStore, namespace); } diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/RecoveryStep.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/RecoveryStep.java index fa9d750d091..8584b51105d 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/RecoveryStep.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/RecoveryStep.java @@ -1,12 +1,16 @@ package com.mesosphere.sdk.scheduler.recovery; +import com.mesosphere.sdk.offer.LaunchOfferRecommendation; +import com.mesosphere.sdk.offer.OfferRecommendation; import com.mesosphere.sdk.scheduler.plan.DeploymentStep; import com.mesosphere.sdk.scheduler.plan.PodInstanceRequirement; +import com.mesosphere.sdk.scheduler.recovery.constrain.LaunchConstrainer; import com.mesosphere.sdk.state.StateStore; import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import java.util.Collection; import java.util.Objects; import java.util.Optional; @@ -15,21 +19,26 @@ */ public class RecoveryStep extends DeploymentStep { + private final LaunchConstrainer launchConstrainer; + public RecoveryStep( String name, PodInstanceRequirement podInstanceRequirement, + LaunchConstrainer launchConstrainer, StateStore stateStore) { - this(name, podInstanceRequirement, stateStore, Optional.empty()); + this(name, podInstanceRequirement, launchConstrainer, stateStore, Optional.empty()); } public RecoveryStep( String name, PodInstanceRequirement podInstanceRequirement, + LaunchConstrainer launchConstrainer, StateStore stateStore, Optional namespace) { super(name, podInstanceRequirement, stateStore, namespace); + this.launchConstrainer = launchConstrainer; } @Override @@ -39,6 +48,17 @@ public void start() { } } + @Override + public void updateOfferStatus(Collection 
recommendations) { + super.updateOfferStatus(recommendations); + for (OfferRecommendation recommendation : recommendations) { + if (recommendation instanceof LaunchOfferRecommendation) { + launchConstrainer + .launchHappened((LaunchOfferRecommendation) recommendation, getRecoveryType()); + } + } + } + public RecoveryType getRecoveryType() { return podInstanceRequirement.getRecoveryType(); } diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/AllLaunchConstrainer.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/AllLaunchConstrainer.java new file mode 100644 index 00000000000..4066240cb0c --- /dev/null +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/AllLaunchConstrainer.java @@ -0,0 +1,39 @@ +package com.mesosphere.sdk.scheduler.recovery.constrain; + +import com.mesosphere.sdk.offer.LaunchOfferRecommendation; +import com.mesosphere.sdk.scheduler.recovery.RecoveryType; + +import java.util.Arrays; +import java.util.List; + +/** + * A {@link LaunchConstrainer} combinator that ensures that all the given constrainers are satisfied before launching a + * task. Useful to create policies that need to limit launches to a certain rate, and when it's an off-peak time. + *

+ * N.B. When determining whether a launch can happen, this object will short-circuit if any of its {@link + * LaunchConstrainer}s reject the task. + */ +public class AllLaunchConstrainer implements LaunchConstrainer { + private final List<LaunchConstrainer> constrainers; + + public AllLaunchConstrainer(LaunchConstrainer... constrainers) { + this.constrainers = Arrays.asList(constrainers); + } + + @Override + public void launchHappened(LaunchOfferRecommendation recommendation, RecoveryType recoveryType) { + for (LaunchConstrainer constrainer : constrainers) { + constrainer.launchHappened(recommendation, recoveryType); + } + } + + @Override + public boolean canLaunch(RecoveryType recoveryType) { + for (LaunchConstrainer constrainer : constrainers) { + if (!constrainer.canLaunch(recoveryType)) { + return false; + } + } + return true; + } +} diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/LaunchConstrainer.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/LaunchConstrainer.java new file mode 100644 index 00000000000..ac044959703 --- /dev/null +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/LaunchConstrainer.java @@ -0,0 +1,34 @@ +package com.mesosphere.sdk.scheduler.recovery.constrain; + +import com.mesosphere.sdk.offer.LaunchOfferRecommendation; +import com.mesosphere.sdk.scheduler.recovery.RecoveryType; + +import org.apache.mesos.Protos.Offer.Operation; + +/** + * This interface provides methods which govern and react to Launch Operations. It is sometimes desirable to limit the + * rate of Launch Operations. In particular may be desirable to limit the rate of Operations which may be destructive + * in reaction to permanent failures. Notifications regarding launch Operations and desired RecoveryRequirements allow + * LaunchConstrainers to throttle Launch Operations. + */ +public interface LaunchConstrainer { + /** + * Invoked every time a task is launched. + *

+ * The recommendation carries the launch {@link Operation}, so implementations can use its metadata to smooth the + * launch rate. + * + * @param recommendation The OfferRecommendation containing the Launch Operation which occurred + * @param recoveryType The type of the recovery which has been executed + */ + void launchHappened(LaunchOfferRecommendation recommendation, RecoveryType recoveryType); + + /** + * Determines whether the given {@link RecoveryType} + * can be launched right now. + * + * @param recoveryType The {@link RecoveryType} to be examined + * @return {@code true} if it is safe to launch immediately, {@code false} if it should wait + */ + boolean canLaunch(RecoveryType recoveryType); +} diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/TestingLaunchConstrainer.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/TestingLaunchConstrainer.java new file mode 100644 index 00000000000..b5bd8e7ae45 --- /dev/null +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/TestingLaunchConstrainer.java @@ -0,0 +1,31 @@ +package com.mesosphere.sdk.scheduler.recovery.constrain; + +import com.mesosphere.sdk.offer.LaunchOfferRecommendation; +import com.mesosphere.sdk.scheduler.recovery.RecoveryType; + +/** + * {@link LaunchConstrainer} that makes it easy to enable/disable launches for testing. + *

+ * Defaults to rejecting all launches; call {@link #setCanLaunch(boolean)} to allow them. + */ +public class TestingLaunchConstrainer implements LaunchConstrainer { + private boolean canLaunch; + + public TestingLaunchConstrainer() { + this.canLaunch = false; + } + + public void setCanLaunch(boolean canLaunch) { + this.canLaunch = canLaunch; + } + + @Override + public void launchHappened(LaunchOfferRecommendation recommendation, RecoveryType recoveryType) { + // Does nothing when the launch happens + } + + @Override + public boolean canLaunch(RecoveryType recoveryType) { + return this.canLaunch; + } +} diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/TimedLaunchConstrainer.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/TimedLaunchConstrainer.java new file mode 100644 index 00000000000..437faef3788 --- /dev/null +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/TimedLaunchConstrainer.java @@ -0,0 +1,66 @@ +package com.mesosphere.sdk.scheduler.recovery.constrain; + +import com.mesosphere.sdk.offer.LaunchOfferRecommendation; +import com.mesosphere.sdk.offer.LoggingUtils; +import com.mesosphere.sdk.scheduler.recovery.RecoveryType; + +import org.slf4j.Logger; + +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Implements a {@link LaunchConstrainer} that requires a minimum delay to elapse between permanent recovery + * launches, for rate-limiting purposes. Other recovery types are never delayed. + */ +public class TimedLaunchConstrainer implements LaunchConstrainer { + private final Logger logger = LoggingUtils.getLogger(getClass()); + + private AtomicLong lastPermanentRecoveryLaunchMs; + + private Duration minDelay; + + /** + * Create a new constrainer with the given required minimum delay between permanent (destructive) recovery + * operations.
+ * + * @param minimumDelay Minimum delay between each destructive launch + */ + public TimedLaunchConstrainer(Duration minimumDelay) { + this.minDelay = minimumDelay; + this.lastPermanentRecoveryLaunchMs = new AtomicLong(0); + } + + @Override + public void launchHappened(LaunchOfferRecommendation recommendation, RecoveryType recoveryType) { + if (recoveryType.equals(RecoveryType.PERMANENT)) { + // Record unconditionally: compareAndSet(get(), now) could silently drop the update in a race. + final long nowMs = System.currentTimeMillis(); + lastPermanentRecoveryLaunchMs.set(nowMs); + } + } + + @Override + public boolean canLaunch(RecoveryType recoveryType) { + if (recoveryType.equals(RecoveryType.PERMANENT)) { + long timeLeft = + lastPermanentRecoveryLaunchMs.get() + minDelay.toMillis() - getCurrentTimeMs(); + if (timeLeft < 0) { + return true; + } else { + logger.info( + "Refusing to launch task for another {}s", + TimeUnit.SECONDS.convert(timeLeft, TimeUnit.MILLISECONDS) + ); + return false; + } + } else { + return true; + } + } + + protected long getCurrentTimeMs() { + return System.currentTimeMillis(); + } +} diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/UnconstrainedLaunchConstrainer.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/UnconstrainedLaunchConstrainer.java new file mode 100644 index 00000000000..1817175f9a3 --- /dev/null +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/scheduler/recovery/constrain/UnconstrainedLaunchConstrainer.java @@ -0,0 +1,21 @@ +package com.mesosphere.sdk.scheduler.recovery.constrain; + +import com.mesosphere.sdk.offer.LaunchOfferRecommendation; +import com.mesosphere.sdk.scheduler.recovery.RecoveryType; + +/** + * Implementation of {@link LaunchConstrainer} that always allows launches. + *

+ * This is equivalent to disabling the launch constraining feature. + */ +public class UnconstrainedLaunchConstrainer implements LaunchConstrainer { + @Override + public void launchHappened(LaunchOfferRecommendation recommendation, RecoveryType recoveryType) { + //do nothing + } + + @Override + public boolean canLaunch(RecoveryType recoveryType) { + return true; + } +} diff --git a/sdk/scheduler/src/main/java/com/mesosphere/sdk/specification/DefaultServiceSpec.java b/sdk/scheduler/src/main/java/com/mesosphere/sdk/specification/DefaultServiceSpec.java index b36ddbffd0d..30e5f0c3c65 100644 --- a/sdk/scheduler/src/main/java/com/mesosphere/sdk/specification/DefaultServiceSpec.java +++ b/sdk/scheduler/src/main/java/com/mesosphere/sdk/specification/DefaultServiceSpec.java @@ -71,6 +71,7 @@ * Default implementation of {@link ServiceSpec}. */ @SuppressWarnings({ + "checkstyle:LineLength", "checkstyle:EqualsAvoidNull", "checkstyle:MultipleStringLiterals", "checkstyle:InnerTypeLast", diff --git a/sdk/scheduler/src/test/java/com/mesosphere/sdk/scheduler/recovery/DefaultRecoveryPlanManagerTest.java b/sdk/scheduler/src/test/java/com/mesosphere/sdk/scheduler/recovery/DefaultRecoveryPlanManagerTest.java index 159c9959f1d..a168f14d0cc 100644 --- a/sdk/scheduler/src/test/java/com/mesosphere/sdk/scheduler/recovery/DefaultRecoveryPlanManagerTest.java +++ b/sdk/scheduler/src/test/java/com/mesosphere/sdk/scheduler/recovery/DefaultRecoveryPlanManagerTest.java @@ -6,6 +6,7 @@ import com.mesosphere.sdk.offer.taskdata.TaskLabelWriter; import com.mesosphere.sdk.scheduler.SchedulerConfig; import com.mesosphere.sdk.scheduler.plan.*; +import com.mesosphere.sdk.scheduler.recovery.constrain.TestingLaunchConstrainer; import com.mesosphere.sdk.scheduler.recovery.monitor.TestingFailureMonitor; import com.mesosphere.sdk.specification.DefaultServiceSpec; import com.mesosphere.sdk.specification.ServiceSpec; @@ -60,6 +61,7 @@ public class DefaultRecoveryPlanManagerTest extends 
DefaultCapabilitiesTestSuite private StateStore stateStore; private ConfigStore configStore; private TestingFailureMonitor failureMonitor; + private TestingLaunchConstrainer launchConstrainer; private PlanCoordinator planCoordinator; private PlanScheduler planScheduler; private PlanManager mockDeployManager; @@ -81,6 +83,7 @@ public void beforeEach() throws Exception { MockitoAnnotations.initMocks(this); failureMonitor = spy(new TestingFailureMonitor()); + launchConstrainer = spy(new TestingLaunchConstrainer()); Persister persister = MemPersister.newBuilder().build(); frameworkStore = new FrameworkStore(persister); stateStore = new StateStore(persister); @@ -105,6 +108,7 @@ public void beforeEach() throws Exception { stateStore, configStore, new HashSet<>(Arrays.asList(taskInfo.getName())), + launchConstrainer, failureMonitor, Optional.empty())); mockDeployManager = mock(PlanManager.class); @@ -126,6 +130,39 @@ public void beforeEach() throws Exception { new DefaultPlanCoordinator(Optional.empty(), Arrays.asList(mockDeployManager, recoveryManager)); } + @Test + @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") + public void ifStoppedTryConstrainedlaunch() throws Exception { + final Protos.TaskStatus status = TaskTestUtils.generateStatus( + taskInfo.getTaskId(), + Protos.TaskState.TASK_FAILED); + + launchConstrainer.setCanLaunch(false); + stateStore.storeTasks(taskInfos); + stateStore.storeStatus(taskInfo.getName(), status); + recoveryManager.update(status); + Collection recommendations = + planScheduler.resourceOffers(getOffers(), planCoordinator.getCandidates()); + + assertEquals(0, recommendations.size()); + // Verify launchConstrainer was used + verify(launchConstrainer, times(1)).canLaunch(any()); + + // Verify that the UI remains stable + for (int i = 0; i < 10; i++) { + planScheduler.resourceOffers(getOffers(), planCoordinator.getCandidates()); + //verify the UI + assertNotNull(recoveryManager.getPlan()); + 
assertNotNull(recoveryManager.getPlan().getChildren()); + assertNotNull(recoveryManager.getPlan().getChildren().get(0).getChildren()); + assertTrue(recoveryManager.getPlan().getChildren().get(0).getChildren().size() == 1); + assertEquals("test-task-type-0:[test-task-name]", + recoveryManager.getPlan().getChildren().get(0).getChildren().get(0).getName()); + } + + reset(mockDeployManager); + } + @Test @SuppressFBWarnings("RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT") public void ifStoppedDoRelaunch() throws Exception { @@ -136,6 +173,8 @@ public void ifStoppedDoRelaunch() throws Exception { stateStore.storeTasks(taskInfos); stateStore.storeStatus(taskInfo.getName(), status); frameworkStore.storeFrameworkId(TestConstants.FRAMEWORK_ID); + launchConstrainer.setCanLaunch(true); + recoveryManager.update(status); // no dirty @@ -143,6 +182,9 @@ public void ifStoppedDoRelaunch() throws Exception { planScheduler.resourceOffers(getOffers(), planCoordinator.getCandidates()); assertEquals(1, distinctOffers(recommendations).size()); + // Verify launchConstrainer was checked before launch + verify(launchConstrainer, times(1)).canLaunch(any()); + reset(mockDeployManager); } @@ -152,6 +194,7 @@ public void stepWithDifferentNameLaunches() throws Exception { final Protos.TaskStatus status = TaskTestUtils.generateStatus(taskInfo.getTaskId(), Protos.TaskState.TASK_FAILED); final Step step = mock(Step.class); + launchConstrainer.setCanLaunch(true); stateStore.storeTasks(taskInfos); stateStore.storeStatus(taskInfo.getName(), status); frameworkStore.storeFrameworkId(TestConstants.FRAMEWORK_ID); @@ -166,6 +209,39 @@ public void stepWithDifferentNameLaunches() throws Exception { reset(mockDeployManager); } + @Test + public void stoppedTaskTransitionsToFailed() throws Exception { + final List infos = Collections.singletonList(TaskTestUtils.withFailedFlag(taskInfo)); + final Protos.TaskStatus status = TaskTestUtils.generateStatus(taskInfo.getTaskId(), Protos.TaskState.TASK_FAILED); + + 
failureMonitor.setFailedList(infos.get(0)); + launchConstrainer.setCanLaunch(false); + stateStore.storeTasks(infos); + stateStore.storeStatus(taskInfo.getName(), status); + when(mockDeployManager.getCandidates(Collections.emptyList())).thenReturn(Collections.emptyList()); + + recoveryManager.update(status); + planScheduler.resourceOffers( + getOffers(), + planCoordinator.getCandidates()); + + // Verify that the UI remains stable + for (int i = 0; i < 10; i++) { + planScheduler.resourceOffers( + getOffers(), + planCoordinator.getCandidates()); + + // verify the transition to stopped + assertNotNull(recoveryManager.getPlan()); + assertNotNull(recoveryManager.getPlan().getChildren()); + assertNotNull(recoveryManager.getPlan().getChildren().get(0).getChildren()); + assertTrue(recoveryManager.getPlan().getChildren().get(0).getChildren().size() == 1); + assertEquals("test-task-type-0:[test-task-name]", + recoveryManager.getPlan().getChildren().get(0).getChildren().get(0).getName()); + } + reset(mockDeployManager); + } + @Test public void failedTaskCanBeRestarted() throws Exception { final Protos.TaskStatus status = TaskTestUtils.generateStatus( @@ -173,6 +249,7 @@ public void failedTaskCanBeRestarted() throws Exception { Protos.TaskState.TASK_FAILED); failureMonitor.setFailedList(taskInfo); + launchConstrainer.setCanLaunch(true); stateStore.storeTasks(taskInfos); stateStore.storeStatus(taskInfo.getName(), status); frameworkStore.storeFrameworkId(TestConstants.FRAMEWORK_ID); @@ -208,6 +285,7 @@ public void failedTasksAreNotLaunchedWithInsufficientResources() throws Exceptio Protos.TaskState.TASK_FAILED); failureMonitor.setFailedList(taskInfo); + launchConstrainer.setCanLaunch(true); stateStore.storeTasks(taskInfos); stateStore.storeStatus(taskInfo.getName(), status); frameworkStore.storeFrameworkId(TestConstants.FRAMEWORK_ID); @@ -240,6 +318,7 @@ public void permanentlyFailedTasksAreRescheduled() throws Exception { Protos.TaskState.TASK_FAILED); 
failureMonitor.setFailedList(failedTaskInfo); + launchConstrainer.setCanLaunch(true); stateStore.storeTasks(infos); stateStore.storeStatus(taskInfo.getName(), status); frameworkStore.storeFrameworkId(TestConstants.FRAMEWORK_ID); @@ -269,6 +348,8 @@ public void testUpdateTaskFailsTwice() throws Exception { taskInfo.getTaskId(), Protos.TaskState.TASK_FAILED); + launchConstrainer.setCanLaunch(true); + // TASK_RUNNING stateStore.storeTasks(taskInfos); stateStore.storeStatus(taskInfo.getName(), runningStatus); @@ -296,6 +377,8 @@ public void testMultipleFailuresSingleTask() throws Exception { taskInfo.getTaskId(), Protos.TaskState.TASK_FAILED); + launchConstrainer.setCanLaunch(true); + // TASK_RUNNING stateStore.storeTasks(taskInfos); stateStore.storeStatus(taskInfo.getName(), runningStatus); diff --git a/sdk/scheduler/src/test/java/com/mesosphere/sdk/scheduler/recovery/TimedLaunchConstrainerTest.java b/sdk/scheduler/src/test/java/com/mesosphere/sdk/scheduler/recovery/TimedLaunchConstrainerTest.java new file mode 100644 index 00000000000..d1946c3e044 --- /dev/null +++ b/sdk/scheduler/src/test/java/com/mesosphere/sdk/scheduler/recovery/TimedLaunchConstrainerTest.java @@ -0,0 +1,108 @@ +package com.mesosphere.sdk.scheduler.recovery; + +import com.mesosphere.sdk.scheduler.recovery.constrain.TimedLaunchConstrainer; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; + +/** + * This class tests the TimedLaunchConstrainer class. 
+ */ +public class TimedLaunchConstrainerTest { + private static final Duration MIN_DELAY = Duration.ofMillis(3000); + private TimedLaunchConstrainer timedLaunchConstrainer; + + private static class TestTimedLaunchConstrainer extends TimedLaunchConstrainer { + private long currentTime; + + public TestTimedLaunchConstrainer(Duration minDelay) { + super(minDelay); + } + + public void setCurrentTime(long currentTime) { + this.currentTime = currentTime; + } + + @Override + protected long getCurrentTimeMs() { + return currentTime; + } + + } + + @Before + public void beforeEach() { + timedLaunchConstrainer = new TimedLaunchConstrainer(MIN_DELAY); + } + + @Test + public void testConstruction() { + Assert.assertNotNull(timedLaunchConstrainer); + } + + @Test + public void testCanLaunchNoneAfterNoRecovery() { + timedLaunchConstrainer.launchHappened(null, RecoveryType.NONE); + Assert.assertTrue(timedLaunchConstrainer.canLaunch(RecoveryType.NONE)); + } + + @Test + public void testCanLaunchTransientAfterNoRecovery() { + timedLaunchConstrainer.launchHappened(null, RecoveryType.NONE); + Assert.assertTrue(timedLaunchConstrainer.canLaunch(RecoveryType.TRANSIENT)); + } + + @Test + public void testCanLaunchPermanentAfterNoRecovery() { + timedLaunchConstrainer.launchHappened(null, RecoveryType.NONE); + Assert.assertTrue(timedLaunchConstrainer.canLaunch(RecoveryType.PERMANENT)); + } + + @Test + public void testCanLaunchNoneAfterTransientRecovery() { + timedLaunchConstrainer.launchHappened(null, RecoveryType.TRANSIENT); + Assert.assertTrue(timedLaunchConstrainer.canLaunch(RecoveryType.NONE)); + } + + @Test + public void testCanLaunchPermanentAfterTransientRecovery() { + timedLaunchConstrainer.launchHappened(null, RecoveryType.TRANSIENT); + Assert.assertTrue(timedLaunchConstrainer.canLaunch(RecoveryType.PERMANENT)); + } + + @Test + public void testCanLaunchTransientAfterTransientRecovery() { + timedLaunchConstrainer.launchHappened(null, RecoveryType.TRANSIENT); + 
Assert.assertTrue(timedLaunchConstrainer.canLaunch(RecoveryType.TRANSIENT)); + } + + @Test + public void testCanLaunchNoneAfterPermanentRecovery() { + timedLaunchConstrainer.launchHappened(null, RecoveryType.PERMANENT); + Assert.assertTrue(timedLaunchConstrainer.canLaunch(RecoveryType.NONE)); + } + + @Test + public void testCanLaunchTransientAfterPermanentRecovery() { + timedLaunchConstrainer.launchHappened(null, RecoveryType.PERMANENT); + Assert.assertTrue(timedLaunchConstrainer.canLaunch(RecoveryType.TRANSIENT)); + } + + @Test + public void testCannotLaunchPermanentAfterPermanentRecovery() { + timedLaunchConstrainer.launchHappened(null, RecoveryType.PERMANENT); + Assert.assertFalse(timedLaunchConstrainer.canLaunch(RecoveryType.PERMANENT)); + } + + @Test + public void testCanLaunchAfterPermanentRecoveryAndDelay() throws InterruptedException { + TestTimedLaunchConstrainer testTimedLaunchConstrainer = new TestTimedLaunchConstrainer(MIN_DELAY); + testTimedLaunchConstrainer.launchHappened(null, RecoveryType.PERMANENT); + testTimedLaunchConstrainer.setCurrentTime(System.currentTimeMillis()); + Assert.assertFalse(testTimedLaunchConstrainer.canLaunch(RecoveryType.PERMANENT)); + testTimedLaunchConstrainer.setCurrentTime(testTimedLaunchConstrainer.getCurrentTimeMs() + MIN_DELAY.toMillis() * 1000 + 1); + Assert.assertTrue(testTimedLaunchConstrainer.canLaunch(RecoveryType.PERMANENT)); + } +}