Skip to content

Commit

Permalink
Revert "Remove launch constrainer for SDK 60 (#3165) (#3176)"
Browse files Browse the repository at this point in the history
This reverts commit 6cc9bf0
  • Loading branch information
alexeygorobets committed Aug 11, 2020
1 parent 8f0ce83 commit 1d5456b
Show file tree
Hide file tree
Showing 20 changed files with 440 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverrider;
import com.mesosphere.sdk.scheduler.recovery.RecoveryStep;
import com.mesosphere.sdk.scheduler.recovery.RecoveryType;
import com.mesosphere.sdk.scheduler.recovery.constrain.UnconstrainedLaunchConstrainer;
import com.mesosphere.sdk.specification.CommandSpec;
import com.mesosphere.sdk.specification.DefaultCommandSpec;
import com.mesosphere.sdk.specification.DefaultPodSpec;
Expand Down Expand Up @@ -126,6 +127,7 @@ private Phase getNodeRecoveryPhase(Plan inputPlan, int index) {
Step replaceStep = new RecoveryStep(
inputLaunchStep.getName(),
replacePodInstanceRequirement,
new UnconstrainedLaunchConstrainer(),
stateStore);

List<Step> steps = new ArrayList<>();
Expand Down Expand Up @@ -153,6 +155,7 @@ private Phase getNodeRecoveryPhase(Plan inputPlan, int index) {
return new RecoveryStep(
step.getName(),
restartPodInstanceRequirement,
new UnconstrainedLaunchConstrainer(),
stateStore);
})
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverrider;
import com.mesosphere.sdk.scheduler.recovery.RecoveryStep;
import com.mesosphere.sdk.scheduler.recovery.RecoveryType;
import com.mesosphere.sdk.scheduler.recovery.constrain.UnconstrainedLaunchConstrainer;
import com.mesosphere.sdk.state.StateStore;

import org.slf4j.Logger;
Expand Down Expand Up @@ -106,6 +107,7 @@ private Phase getRecoveryPhase(Plan inputPlan, int index, String phaseName) {
new RecoveryStep(
inputBootstrapStep.getName(),
bootstrapPodInstanceRequirement,
new UnconstrainedLaunchConstrainer(),
stateStore);

// JournalNode or NameNode
Expand All @@ -120,6 +122,7 @@ private Phase getRecoveryPhase(Plan inputPlan, int index, String phaseName) {
new RecoveryStep(
inputNodeStep.getName(),
nameNodePodInstanceRequirement,
new UnconstrainedLaunchConstrainer(),
stateStore);

return new DefaultPhase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverriderFactory;
import com.mesosphere.sdk.scheduler.recovery.RecoveryStep;
import com.mesosphere.sdk.scheduler.recovery.RecoveryType;
import com.mesosphere.sdk.scheduler.recovery.constrain.UnconstrainedLaunchConstrainer;
import com.mesosphere.sdk.state.StateStore;
import com.mesosphere.sdk.state.StateStoreUtils;
import com.mesosphere.sdk.storage.Persister;
Expand Down Expand Up @@ -476,6 +477,7 @@ public Optional<Phase> override(PodInstanceRequirement podInstanceRequirement) {
new RecoveryStep(
podInstanceRequirement.getPodInstance().getName(),
podInstanceRequirement,
new UnconstrainedLaunchConstrainer(),
stateStore)),
new SerialStrategy<>(),
Collections.emptyList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
*/

@SuppressWarnings({
"checkstyle:LineLength",
"checkstyle:MethodCount",
"checkstyle:ExecutableStatementCount",
"checkstyle:ReturnCount",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* This class encapsulates shared offer evaluation logic for evaluation stages.
*/
@SuppressWarnings({
"checkstyle:LineLength",
"checkstyle:InnerTypeLast",
"checkstyle:HiddenField",
"checkstyle:ThrowsCount",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
* in reference to {@link PodInstanceRequirement}s.
*/
@SuppressWarnings({
"checkstyle:LineLength",
"checkstyle:MultipleStringLiterals",
"checkstyle:HiddenField",
"checkstyle:ThrowsCount",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
* into {@link OfferRecommendation}s. The ones which are not launched do not get used.
*/
@SuppressWarnings({
"checkstyle:LineLength",
"checkstyle:LocalVariableName",
"checkstyle:VariableDeclarationUsageDistance",
"checkstyle:CyclomaticComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
@SuppressWarnings({
"checkstyle:MultipleStringLiterals",
"checkstyle:IllegalCatch",
"checkstyle:LineLength",
"checkstyle:ParameterNumber",
"checkstyle:DeclarationOrder"
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
import com.mesosphere.sdk.scheduler.recovery.DefaultRecoveryPlanManager;
import com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverrider;
import com.mesosphere.sdk.scheduler.recovery.RecoveryPlanOverriderFactory;
import com.mesosphere.sdk.scheduler.recovery.constrain.LaunchConstrainer;
import com.mesosphere.sdk.scheduler.recovery.constrain.TimedLaunchConstrainer;
import com.mesosphere.sdk.scheduler.recovery.constrain.UnconstrainedLaunchConstrainer;
import com.mesosphere.sdk.scheduler.recovery.monitor.FailureMonitor;
import com.mesosphere.sdk.scheduler.recovery.monitor.NeverFailureMonitor;
import com.mesosphere.sdk.scheduler.recovery.monitor.TimedFailureMonitor;
Expand Down Expand Up @@ -565,20 +568,25 @@ private PlanManager getRecoveryPlanManager(
logger.info("Adding overriding recovery plan manager.");
overrideRecoveryPlanManagers.add(recoveryOverriderFactory.get().create(stateStore, plans));
}
final LaunchConstrainer launchConstrainer;
final FailureMonitor failureMonitor;
if (serviceSpec.getReplacementFailurePolicy().isPresent()) {
ReplacementFailurePolicy failurePolicy = serviceSpec.getReplacementFailurePolicy().get();
launchConstrainer = new TimedLaunchConstrainer(
Duration.ofMinutes(failurePolicy.getMinReplaceDelayMins()));
failureMonitor = new TimedFailureMonitor(
Duration.ofMinutes(failurePolicy.getPermanentFailureTimeoutMins()),
stateStore,
configStore);
} else {
launchConstrainer = new UnconstrainedLaunchConstrainer();
failureMonitor = new NeverFailureMonitor();
}
return new DefaultRecoveryPlanManager(
stateStore,
configStore,
PlanUtils.getLaunchableTasks(plans),
launchConstrainer,
failureMonitor,
namespace,
overrideRecoveryPlanManagers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public static boolean isEligible(
*/
@SuppressWarnings({
"checkstyle:CyclomaticComplexity",
"checkstyle:LineLength",
"checkstyle:MultipleStringLiterals"
})
public static Status getAggregateStatus(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.mesosphere.sdk.scheduler.plan.Step;
import com.mesosphere.sdk.scheduler.plan.backoff.Backoff;
import com.mesosphere.sdk.scheduler.plan.strategy.ParallelStrategy;
import com.mesosphere.sdk.scheduler.recovery.constrain.LaunchConstrainer;
import com.mesosphere.sdk.scheduler.recovery.monitor.FailureMonitor;
import com.mesosphere.sdk.specification.PodInstance;
import com.mesosphere.sdk.specification.ServiceSpec;
Expand Down Expand Up @@ -47,8 +48,9 @@
* generates a new {@link RecoveryStep} for them and adds them to the recovery Plan, if not already added.
*/
@SuppressWarnings({
"checkstyle:LineLength",
"checkstyle:DeclarationOrder",
"checkstyle:HiddenField"
"checkstyle:HiddenField",
})
public class DefaultRecoveryPlanManager implements PlanManager {
public static final String DEFAULT_RECOVERY_PHASE_NAME = "default";
Expand All @@ -69,19 +71,23 @@ public class DefaultRecoveryPlanManager implements PlanManager {

protected final FailureMonitor failureMonitor;

protected final LaunchConstrainer launchConstrainer;

protected final Object planLock = new Object();

public DefaultRecoveryPlanManager(
StateStore stateStore,
ConfigStore<ServiceSpec> configStore,
Set<String> recoverableTaskNames,
LaunchConstrainer launchConstrainer,
FailureMonitor failureMonitor,
Optional<String> namespace)
{
this(
stateStore,
configStore,
recoverableTaskNames,
launchConstrainer,
failureMonitor,
namespace,
Collections.emptyList());
Expand All @@ -91,6 +97,7 @@ public DefaultRecoveryPlanManager(
StateStore stateStore,
ConfigStore<ServiceSpec> configStore,
Set<String> recoverableTaskNames,
LaunchConstrainer launchConstrainer,
FailureMonitor failureMonitor,
Optional<String> namespace,
List<RecoveryPlanOverrider> recoveryPlanOverriders)
Expand All @@ -100,6 +107,7 @@ public DefaultRecoveryPlanManager(
this.configStore = configStore;
this.recoverableTaskNames = recoverableTaskNames;
this.failureMonitor = failureMonitor;
this.launchConstrainer = launchConstrainer;
this.namespace = namespace;
this.recoveryPlanOverriders = recoveryPlanOverriders;
plan = new DefaultPlan(Constants.RECOVERY_PLAN_NAME, Collections.emptyList());
Expand Down Expand Up @@ -140,7 +148,10 @@ protected void setPlanInternal(Plan plan) {
public Collection<? extends Step> getCandidates(Collection<PodInstanceRequirement> dirtyAssets) {
synchronized (planLock) {
updatePlan(dirtyAssets);
return getPlan().getCandidates(dirtyAssets);
return getPlan().getCandidates(dirtyAssets).stream()
.filter(step ->
launchConstrainer.canLaunch(((RecoveryStep) step).getRecoveryType()))
.collect(Collectors.toList());
}
}

Expand Down Expand Up @@ -429,6 +440,7 @@ private Step createStep(PodInstanceRequirement podInstanceRequirement) {
return new RecoveryStep(
podInstanceRequirement.getName(),
podInstanceRequirement,
launchConstrainer,
stateStore,
namespace);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.mesosphere.sdk.scheduler.recovery;

import com.mesosphere.sdk.offer.LaunchOfferRecommendation;
import com.mesosphere.sdk.offer.OfferRecommendation;
import com.mesosphere.sdk.scheduler.plan.DeploymentStep;
import com.mesosphere.sdk.scheduler.plan.PodInstanceRequirement;
import com.mesosphere.sdk.scheduler.recovery.constrain.LaunchConstrainer;
import com.mesosphere.sdk.state.StateStore;

import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;

import java.util.Collection;
import java.util.Objects;
import java.util.Optional;

Expand All @@ -15,21 +19,26 @@
*/
public class RecoveryStep extends DeploymentStep {

private final LaunchConstrainer launchConstrainer;

public RecoveryStep(
String name,
PodInstanceRequirement podInstanceRequirement,
LaunchConstrainer launchConstrainer,
StateStore stateStore)
{
this(name, podInstanceRequirement, stateStore, Optional.empty());
this(name, podInstanceRequirement, launchConstrainer, stateStore, Optional.empty());
}

/**
 * Creates a recovery step for the given pod instance requirement.
 *
 * @param name forwarded unchanged to the {@link DeploymentStep} constructor
 * @param podInstanceRequirement forwarded unchanged to the {@link DeploymentStep} constructor
 * @param launchConstrainer kept by this step so that it can be told about launches
 * @param stateStore forwarded unchanged to the {@link DeploymentStep} constructor
 * @param namespace forwarded unchanged to the {@link DeploymentStep} constructor
 */
public RecoveryStep(
String name,
PodInstanceRequirement podInstanceRequirement,
LaunchConstrainer launchConstrainer,
StateStore stateStore,
Optional<String> namespace)
{
super(name, podInstanceRequirement, stateStore, namespace);
this.launchConstrainer = launchConstrainer;
}

@Override
Expand All @@ -39,6 +48,17 @@ public void start() {
}
}

/**
 * Hands the recommendations to the superclass first, then reports every launch recommendation
 * among them to the launch constrainer together with this step's recovery type.
 *
 * @param recommendations the recommendations produced for this step
 */
@Override
public void updateOfferStatus(Collection<OfferRecommendation> recommendations) {
  super.updateOfferStatus(recommendations);
  recommendations.stream()
      .filter(LaunchOfferRecommendation.class::isInstance)
      .map(LaunchOfferRecommendation.class::cast)
      .forEach(launch -> launchConstrainer.launchHappened(launch, getRecoveryType()));
}

/**
 * Returns the {@link RecoveryType} reported by this step's pod instance requirement.
 *
 * @return the result of {@code podInstanceRequirement.getRecoveryType()}
 */
public RecoveryType getRecoveryType() {
return podInstanceRequirement.getRecoveryType();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.mesosphere.sdk.scheduler.recovery.constrain;

import com.mesosphere.sdk.offer.LaunchOfferRecommendation;
import com.mesosphere.sdk.scheduler.recovery.RecoveryType;

import java.util.Arrays;
import java.util.List;

/**
 * A {@link LaunchConstrainer} combinator that permits a launch only when every one of the given
 * constrainers permits it. Useful to create policies that need to limit launches to a certain
 * rate, and when it's an off-peak time.
 * <p>
 * N.B. When determining whether a launch can happen, this object will short-circuit as soon as any
 * of its {@link LaunchConstrainer}s rejects the launch; the remaining ones are not consulted.
 */
public class AllLaunchConstrainer implements LaunchConstrainer {
  // Fixed at construction time; never reassigned.
  private final List<LaunchConstrainer> constrainers;

  /**
   * Creates a combinator over the given constrainers.
   *
   * @param constrainers the constrainers which must all agree before a launch is allowed; when
   *                     none are given, {@link #canLaunch(RecoveryType)} always returns true
   */
  public AllLaunchConstrainer(LaunchConstrainer... constrainers) {
    // Arrays.asList() returns a view backed by the array it is handed. Wrap a private copy so
    // that a caller who later writes to its own array cannot change the set of constrainers.
    this.constrainers = Arrays.asList(constrainers.clone());
  }

  /**
   * Forwards the launch notification to every wrapped constrainer, in the order they were given.
   *
   * @param recommendation the recommendation describing the launch which occurred
   * @param recoveryType the type of the recovery which has been executed
   */
  @Override
  public void launchHappened(LaunchOfferRecommendation recommendation, RecoveryType recoveryType) {
    for (LaunchConstrainer constrainer : constrainers) {
      constrainer.launchHappened(recommendation, recoveryType);
    }
  }

  /**
   * Asks the wrapped constrainers in order and stops at the first rejection.
   *
   * @param recoveryType the {@link RecoveryType} to be examined
   * @return {@code false} as soon as one constrainer rejects, {@code true} if none does
   */
  @Override
  public boolean canLaunch(RecoveryType recoveryType) {
    for (LaunchConstrainer constrainer : constrainers) {
      if (!constrainer.canLaunch(recoveryType)) {
        return false;
      }
    }
    return true;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.mesosphere.sdk.scheduler.recovery.constrain;

import com.mesosphere.sdk.offer.LaunchOfferRecommendation;
import com.mesosphere.sdk.scheduler.recovery.RecoveryType;

import org.apache.mesos.Protos.Offer.Operation;

/**
 * This interface provides methods which govern and react to Launch Operations. It is sometimes desirable to limit the
 * rate of Launch Operations. In particular, it may be desirable to limit the rate of Operations which may be
 * destructive in reaction to permanent failures. Notifications regarding launches which have happened, together with
 * the {@link RecoveryType} of the recovery involved, allow LaunchConstrainers to throttle Launch Operations.
 */
public interface LaunchConstrainer {
/**
 * Invoked every time a task launch has happened.
 * <p>
 * The launch {@link Operation} is passed along inside the recommendation so that implementations can use its
 * metadata in order to smooth the launch rate.
 *
 * @param recommendation The {@link LaunchOfferRecommendation} containing the Launch Operation which occurred
 * @param recoveryType The type of the recovery which has been executed
 */
void launchHappened(LaunchOfferRecommendation recommendation, RecoveryType recoveryType);

/**
 * Determines whether a recovery of the given {@link RecoveryType}
 * can be launched right now.
 *
 * @param recoveryType The {@link RecoveryType} to be examined
 * @return {@code true} if it is safe to launch immediately, {@code false} if the launch should wait
 */
boolean canLaunch(RecoveryType recoveryType);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.mesosphere.sdk.scheduler.recovery.constrain;

import com.mesosphere.sdk.offer.LaunchOfferRecommendation;
import com.mesosphere.sdk.scheduler.recovery.RecoveryType;

/**
 * {@link LaunchConstrainer} that makes it easy to enable/disable launches for testing.
 * <p>
 * Defaults to allowing all launches; call {@link #setCanLaunch(boolean)} with {@code false} to
 * block them.
 */
public class TestingLaunchConstrainer implements LaunchConstrainer {
  // The answer handed back by canLaunch(), regardless of the recovery type asked about.
  private boolean canLaunch;

  /**
   * Creates a constrainer which allows launches until told otherwise.
   */
  public TestingLaunchConstrainer() {
    // Start out permissive to honour the documented default. This previously started as false,
    // which blocked every launch until setCanLaunch(true) was called.
    this.canLaunch = true;
  }

  /**
   * Switches launches on or off.
   *
   * @param canLaunch the value subsequent {@link #canLaunch(RecoveryType)} calls will return
   */
  public void setCanLaunch(boolean canLaunch) {
    this.canLaunch = canLaunch;
  }

  @Override
  public void launchHappened(LaunchOfferRecommendation recommendation, RecoveryType recoveryType) {
    // Does nothing when the launch happens
  }

  /**
   * Returns the currently configured answer; the recovery type is ignored.
   *
   * @param recoveryType not consulted
   * @return the value last set through {@link #setCanLaunch(boolean)}, or {@code true} if it
   *         was never called
   */
  @Override
  public boolean canLaunch(RecoveryType recoveryType) {
    return this.canLaunch;
  }
}
Loading

0 comments on commit 1d5456b

Please sign in to comment.