Skip to content

Commit

Permalink
Remove fan-out in repair runner scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
varjoranta committed Aug 11, 2015
1 parent af6faf1 commit 572c25f
Showing 1 changed file with 10 additions and 20 deletions.
30 changes: 10 additions & 20 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private void startNextSegment() {
// Just checking that no currently running segment runner is stuck.
RepairSegment supposedlyRunningSegment =
context.storage.getRepairSegment(currentlyRunningSegments.get(rangeIndex)).get();
if (supposedlyRunningSegment.getState() != RepairSegment.State.RUNNING) {
if (supposedlyRunningSegment.getState() == RepairSegment.State.DONE) {
LOG.warn("Segment #{} supposedly running in slot {} has state: {}",
supposedlyRunningSegment.getId(), rangeIndex,
supposedlyRunningSegment.getState().toString());
Expand Down Expand Up @@ -253,22 +253,16 @@ private void startNextSegment() {
}
}

if (!repairStarted) {
if (!anythingRunningStill) {
int amountDone = context.storage
.getSegmentAmountForRepairRunWithState(repairRunId, RepairSegment.State.DONE);
if (amountDone == context.storage.getSegmentAmountForRepairRun(repairRunId)) {
endRepairRun();
} else {
LOG.debug("No more segments to repair, but some still running");
context.repairManager.scheduleRetry(this);
}
} else {
// There are segments still running, but as we didn't start a new repair,
// we need to schedule next run here.
context.repairManager.scheduleRetry(this);
if (!repairStarted && !anythingRunningStill) {
int amountDone = context.storage
.getSegmentAmountForRepairRunWithState(repairRunId, RepairSegment.State.DONE);
if (amountDone == context.storage.getSegmentAmountForRepairRun(repairRunId)) {
endRepairRun();
// RepairRun ended, so no more scheduling required.
return;
}
}
context.repairManager.scheduleRetry(this);
}

/**
Expand Down Expand Up @@ -342,9 +336,6 @@ private void repairSegment(final int rangeIndex, final long segmentId, RingRange
@Override
public void onSuccess(Object ignored) {
currentlyRunningSegments.set(rangeIndex, -1);
// NOTE: notice that this will branch out the amount of activations
// for this runner by the amount of slots we have,
// as the handleResult will schedule new runs for this runner (one per slot).
handleResult(segmentId);
}

Expand All @@ -362,15 +353,14 @@ private void handleResult(long segmentId) {
LOG.debug("In repair run #{}, triggerRepair on segment {} ended with state {}",
repairRunId, segmentId, segmentState);

// Don't do rescheduling here, not to spawn uncontrolled amount of threads
switch (segmentState) {
case NOT_STARTED:
// Unsuccessful repair
context.repairManager.scheduleRetry(this);
break;

case DONE:
// Successful repair
context.repairManager.submitNextRun(this);
break;

default:
Expand Down

0 comments on commit 572c25f

Please sign in to comment.