Skip to content

Commit

Permalink
Properly resume paused runs
Browse files: browse the repository at this point in the history
  • Loading branch information
Bj0rnen committed Jan 23, 2015
1 parent b8838ed commit 953d167
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 15 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.spotify.reaper.service.SegmentGenerator;
import com.spotify.reaper.storage.IStorage;

import org.apache.cassandra.db.Column;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -168,8 +167,8 @@ public Response modifyRunState(
if (isResuming(oldState, newState)) {
return resumeRun(repairRun, table);
}
String errMsg = String.format("Transition %s->%s not supported.", newState.toString(),
oldState.toString());
String errMsg = String.format("Transition %s->%s not supported.", oldState.toString(),
newState.toString());
LOG.error(errMsg);
return Response.status(501).entity(errMsg).build();
} catch (ReaperException e) {
Expand Down Expand Up @@ -198,7 +197,7 @@ private Response startRun(RepairRun repairRun, ColumnFamily table) {
.startTime(DateTime.now())
.build(repairRun.getId());
storage.updateRepairRun(updatedRun);
RepairRunner.startNewRepairRun(storage, repairRun.getId(), jmxFactory);
RepairRunner.startRepairRun(storage, repairRun.getId(), jmxFactory);
return Response.status(Response.Status.OK).entity(new RepairRunStatus(repairRun, table))
.build();
}
Expand Down
14 changes: 5 additions & 9 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number | Diff line number | Diff line change
Expand Up @@ -69,11 +69,11 @@ public static void resumeRunningRepairRuns(IStorage storage,
SegmentRunner.postpone(storage, runningSegment);
}
}
RepairRunner.startNewRepairRun(storage, repairRun.getId(), jmxConnectionFactory);
RepairRunner.startRepairRun(storage, repairRun.getId(), jmxConnectionFactory);
}
}

public static void startNewRepairRun(IStorage storage, long repairRunID,
public static void startRepairRun(IStorage storage, long repairRunID,
JmxConnectionFactory jmxConnectionFactory) {
// TODO: make sure that no more than one RepairRunner is created per RepairRun
assert null != executor : "you need to initialize the thread pool first";
Expand All @@ -91,8 +91,7 @@ public static void startNewRepairRun(IStorage storage, long repairRunID,
private final JmxConnectionFactory jmxConnectionFactory;
private JmxProxy jmxConnection;

@VisibleForTesting
RepairRunner(IStorage storage, long repairRunId, JmxConnectionFactory jmxConnectionFactory)
private RepairRunner(IStorage storage, long repairRunId, JmxConnectionFactory jmxConnectionFactory)
throws ReaperException {
this.storage = storage;
this.repairRunId = repairRunId;
Expand All @@ -106,9 +105,6 @@ public static void startNewRepairRun(IStorage storage, long repairRunID,
*/
@Override
public void run() {
// TODO: just like SegmentRunner, RepairRunner should probably be blocking.
// TODO: the best way to do that is probably to remove the Runnable interface and do everything
// TODO: in a while loop.
RepairRun.RunState state = storage.getRepairRun(repairRunId).getRunState();
LOG.debug("run() called for repair run #{} with run state {}", repairRunId, state);
switch (state) {
Expand All @@ -119,10 +115,10 @@ public void run() {
startNextSegment();
break;
case PAUSED:
// Do nothing
executor.schedule(this, retryDelayMillis, TimeUnit.MILLISECONDS);
break;
case DONE:
// Do nothing
// We're done. Let go of thread.
break;
}
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -82,7 +82,7 @@ public void noSegmentsTest() throws InterruptedException {
// start the repair
DateTimeUtils.setCurrentMillisFixed(TIME_START);
RepairRunner.initializeThreadPool(1, 3, TimeUnit.HOURS, 30, TimeUnit.SECONDS);
RepairRunner.startNewRepairRun(storage, RUN_ID, new JmxConnectionFactory() {
RepairRunner.startRepairRun(storage, RUN_ID, new JmxConnectionFactory() {
@Override
public JmxProxy create(Optional<RepairStatusHandler> handler, String host)
throws ReaperException {
Expand Down Expand Up @@ -128,7 +128,7 @@ public void testHangingRepair() throws ReaperException, InterruptedException {
RepairRunner.initializeThreadPool(1, 500, TimeUnit.MILLISECONDS, 1, TimeUnit.MILLISECONDS);

assertEquals(storage.getRepairSegment(SEGMENT_ID).getState(), RepairSegment.State.NOT_STARTED);
RepairRunner.startNewRepairRun(storage, RUN_ID, new JmxConnectionFactory() {
RepairRunner.startRepairRun(storage, RUN_ID, new JmxConnectionFactory() {
final AtomicInteger repairAttempts = new AtomicInteger(0);

@Override
Expand Down

0 comments on commit 953d167

Please sign in to comment.