Skip to content

Commit

Permalink
Allow repair schedules to be adaptive
Browse files Browse the repository at this point in the history
Adaptive schedules will be updated at the end of each repair if the run metrics require it. The schedule can get a higher/lower number of segments or a higher segment timeout depending on the latest run.
  • Loading branch information
adejanovski committed Sep 21, 2021
1 parent 14e70de commit 699b855
Show file tree
Hide file tree
Showing 34 changed files with 773 additions and 155 deletions.
Expand Up @@ -561,6 +561,9 @@ public static final class AutoSchedulingConfiguration {
@JsonProperty
private List<String> excludedClusters = Collections.emptyList();

@JsonProperty
private Boolean adaptive;

public Boolean isEnabled() {
return enabled;
}
Expand Down Expand Up @@ -621,6 +624,14 @@ public List<String> getExcludedClusters() {
return excludedClusters;
}

public Boolean isAdaptive() {
return adaptive == null ? false : adaptive;
}

public void setAdaptive(Boolean adaptive) {
this.adaptive = adaptive;
}

@Override
public String toString() {
return "AutoSchedulingConfiguration{"
Expand All @@ -634,6 +645,8 @@ public String toString() {
+ timeBeforeFirstSchedule
+ ", scheduleSpreadPeriod="
+ scheduleSpreadPeriod
+ ", adaptive="
+ adaptive
+ '}';
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/server/src/main/java/io/cassandrareaper/core/RepairRun.java
Expand Up @@ -48,6 +48,7 @@ public final class RepairRun implements Comparable<RepairRun> {
private final int segmentCount;
private final RepairParallelism repairParallelism;
private final Set<String> tables;
private final boolean adaptiveSchedule;

private RepairRun(Builder builder, UUID id) {
this.id = id;
Expand All @@ -65,6 +66,7 @@ private RepairRun(Builder builder, UUID id) {
this.segmentCount = builder.segmentCount;
this.repairParallelism = builder.repairParallelism;
this.tables = builder.tables;
this.adaptiveSchedule = builder.adaptiveSchedule;
}

public static Builder builder(String clusterName, UUID repairUnitId) {
Expand Down Expand Up @@ -131,6 +133,10 @@ public Set<String> getTables() {
return tables;
}

public Boolean getAdaptiveSchedule() {
return adaptiveSchedule;
}

public Builder with() {
return new Builder(this);
}
Expand Down Expand Up @@ -208,6 +214,8 @@ public static final class Builder {
private Integer segmentCount;
private RepairParallelism repairParallelism;
private Set<String> tables;
private boolean adaptiveSchedule;


private Builder(String clusterName, UUID repairUnitId) {
this.clusterName = clusterName;
Expand All @@ -229,6 +237,7 @@ private Builder(RepairRun original) {
segmentCount = original.segmentCount;
repairParallelism = original.repairParallelism;
tables = original.tables;
adaptiveSchedule = original.adaptiveSchedule;
}

public Builder runState(RunState runState) {
Expand Down Expand Up @@ -294,6 +303,11 @@ public Builder tables(Set<String> tables) {
return this;
}

public Builder adaptiveSchedule(boolean adaptive) {
this.adaptiveSchedule = adaptive;
return this;
}

public RepairRun build(UUID id) {
Preconditions.checkState(null != repairParallelism, "repairParallelism(..) must be called before build(..)");
Preconditions.checkState(null != intensity, "intensity(..) must be called before build(..)");
Expand Down
Expand Up @@ -36,9 +36,12 @@ public final class RepairSchedule extends EditableRepairSchedule {
private final State state;
private final DateTime nextActivation;
private final ImmutableList<UUID> runHistory;
@Deprecated private final int segmentCount;
private final RepairParallelism repairParallelism;
private final double intensity;
private final DateTime creationTime;
private final DateTime pauseTime;
private final int segmentCountPerNode;
private final boolean adaptive;

private RepairSchedule(Builder builder, UUID id) {
this.id = id;
Expand All @@ -47,13 +50,13 @@ private RepairSchedule(Builder builder, UUID id) {
this.daysBetween = builder.daysBetween;
this.nextActivation = builder.nextActivation;
this.runHistory = builder.runHistory;
this.segmentCount = builder.segmentCount;
this.repairParallelism = builder.repairParallelism;
this.intensity = builder.intensity;
this.creationTime = builder.creationTime;
this.owner = builder.owner;
this.pauseTime = builder.pauseTime;
this.segmentCountPerNode = builder.segmentCountPerNode;
this.adaptive = builder.adaptive;
}

public static Builder builder(UUID repairUnitId) {
Expand Down Expand Up @@ -92,8 +95,16 @@ public LongCollectionSqlType getRunHistorySql() {
return new LongCollectionSqlType(list);
}

public int getSegmentCount() {
return segmentCount;
public Integer getSegmentCountPerNode() {
return segmentCountPerNode;
}

public RepairParallelism getRepairParallelism() {
return repairParallelism;
}

public Double getIntensity() {
return intensity;
}

public DateTime getCreationTime() {
Expand All @@ -108,6 +119,10 @@ public DateTime getPauseTime() {
return pauseTime;
}

public boolean getAdaptive() {
return adaptive;
}

public Builder with() {
return new Builder(this);
}
Expand All @@ -130,14 +145,13 @@ public static final class Builder {
private Integer daysBetween;
private DateTime nextActivation;
private ImmutableList<UUID> runHistory = ImmutableList.<UUID>of();
@Deprecated private int segmentCount = 0;
private RepairParallelism repairParallelism;
private Double intensity;
private DateTime creationTime = DateTime.now();
private String owner = "";
private DateTime pauseTime;
private Integer segmentCountPerNode;
private boolean majorCompaction = false;
private boolean adaptive = false;

private Builder(UUID repairUnitId) {
this.repairUnitId = repairUnitId;
Expand All @@ -149,14 +163,14 @@ private Builder(RepairSchedule original) {
daysBetween = original.daysBetween;
nextActivation = original.nextActivation;
runHistory = original.runHistory;
segmentCount = original.segmentCount;
repairParallelism = original.repairParallelism;
intensity = original.intensity;
creationTime = original.creationTime;
owner = original.owner;
pauseTime = original.pauseTime;
intensity = original.intensity;
segmentCountPerNode = original.segmentCountPerNode;
adaptive = original.adaptive;
}

public Builder state(State state) {
Expand All @@ -179,11 +193,6 @@ public Builder runHistory(ImmutableList<UUID> runHistory) {
return this;
}

public Builder segmentCount(int segmentCount) {
this.segmentCount = segmentCount;
return this;
}

public Builder repairParallelism(RepairParallelism repairParallelism) {
this.repairParallelism = repairParallelism;
return this;
Expand Down Expand Up @@ -214,6 +223,11 @@ public Builder segmentCountPerNode(int segmentCountPerNode) {
return this;
}

public Builder adaptive(boolean adaptive) {
this.adaptive = adaptive;
return this;
}

public RepairSchedule build(UUID id) {
Preconditions.checkState(null != daysBetween, "daysBetween(..) must be called before build(..)");
Preconditions.checkState(null != nextActivation, "nextActivation(..) must be called before build(..)");
Expand Down
Expand Up @@ -199,7 +199,6 @@ public Response addRepairRun(
LOG.error(ex.getMessage(), ex);
return Response.status(Response.Status.NOT_FOUND).entity(ex.getMessage()).build();
}

int timeout = timeoutParam.orElse(context.config.getHangingRepairTimeoutMins());
boolean force = (forceParam.isPresent() ? Boolean.parseBoolean(forceParam.get()) : false);

Expand Down Expand Up @@ -245,10 +244,10 @@ public Response addRepairRun(
theRepairUnit,
cause,
owner.get(),
0,
segments,
parallelism,
intensity);
intensity,
false);

return Response.created(buildRepairRunUri(uriInfo, newRepairRun))
.entity(new RepairRunStatus(newRepairRun, theRepairUnit, 0))
Expand Down
Expand Up @@ -166,7 +166,8 @@ public Response addRepairSchedule(
@QueryParam("blacklistedTables") Optional<String> blacklistedTableNamesParam,
@QueryParam("repairThreadCount") Optional<Integer> repairThreadCountParam,
@QueryParam("force") Optional<String> forceParam,
@QueryParam("timeout") Optional<Integer> timeoutParam) {
@QueryParam("timeout") Optional<Integer> timeoutParam,
@QueryParam("adaptive") Optional<String> adaptiveParam) {

try {
Response possibleFailResponse = RepairRunResource.checkRequestForAddRepair(
Expand Down Expand Up @@ -267,6 +268,7 @@ public Response addRepairSchedule(
boolean force = (forceParam.isPresent() ? Boolean.parseBoolean(forceParam.get()) : false);

int timeout = timeoutParam.orElse(context.config.getHangingRepairTimeoutMins());
boolean adaptive = (adaptiveParam.isPresent() ? Boolean.parseBoolean(adaptiveParam.get()) : false);

RepairUnit.Builder unitBuilder = RepairUnit.builder()
.clusterName(cluster.getName())
Expand All @@ -290,7 +292,8 @@ public Response addRepairSchedule(
nextActivation,
getSegmentCount(segmentCountPerNode),
getIntensity(intensityStr),
force);
force,
adaptive);

} catch (ReaperException e) {
LOG.error(e.getMessage(), e);
Expand All @@ -309,7 +312,8 @@ private Response addRepairSchedule(
DateTime next,
int segments,
Double intensity,
boolean force) {
boolean force,
boolean adaptive) {

Optional<RepairSchedule> conflictingRepairSchedule
= repairScheduleService.identicalRepairUnit(cluster, unitBuilder);
Expand Down Expand Up @@ -351,7 +355,7 @@ private Response addRepairSchedule(
.checkState(unit.getIncrementalRepair() == incremental, "%s!=%s", unit.getIncrementalRepair(), incremental);

RepairSchedule newRepairSchedule = repairScheduleService
.storeNewRepairSchedule(cluster, unit, days, next, owner, segments, parallel, intensity, force);
.storeNewRepairSchedule(cluster, unit, days, next, owner, segments, parallel, intensity, force, adaptive);

return Response.created(buildRepairScheduleUri(uriInfo, newRepairSchedule)).build();
}
Expand Down
Expand Up @@ -115,6 +115,9 @@ public final class RepairRunStatus {
@JsonProperty("segment_timeout")
private int segmentTimeout;

@JsonProperty("adaptive_schedule")
private boolean adaptiveSchedule;


/**
* Default public constructor Required for Jackson JSON parsing.
Expand Down Expand Up @@ -145,7 +148,8 @@ public RepairRunStatus(
Collection<String> blacklistedTables,
int repairThreadCount,
UUID repairUnitId,
int segmentTimeout) {
int segmentTimeout,
boolean adaptiveSchedule) {

this.id = runId;
this.cause = cause;
Expand All @@ -171,6 +175,7 @@ public RepairRunStatus(
this.blacklistedTables = blacklistedTables;
this.repairThreadCount = repairThreadCount;
this.segmentTimeout = segmentTimeout;
this.adaptiveSchedule = adaptiveSchedule;

if (startTime == null) {
duration = null;
Expand Down Expand Up @@ -232,7 +237,8 @@ public RepairRunStatus(RepairRun repairRun, RepairUnit repairUnit, int segmentsR
repairUnit.getBlacklistedTables(),
repairUnit.getRepairThreadCount(),
repairRun.getRepairUnitId(),
repairUnit.getTimeout());
repairUnit.getTimeout(),
repairRun.getAdaptiveSchedule());
}

@JsonProperty("creation_time")
Expand Down Expand Up @@ -510,4 +516,11 @@ public void setRepairUnitId(UUID repairUnitId) {
this.repairUnitId = repairUnitId;
}

public boolean getAdaptiveSchedule() {
return adaptiveSchedule;
}

public void setAdaptiveSchedule(boolean adaptiveSchedule) {
this.adaptiveSchedule = adaptiveSchedule;
}
}

0 comments on commit 699b855

Please sign in to comment.