Skip to content

Commit

Permalink
Remove IStorage.getNextFreeSegment(..) as `getNextFreeSegmentInRang…
Browse files Browse the repository at this point in the history
…e(..)` is always used now.
  • Loading branch information
michaelsembwever committed Aug 23, 2017
1 parent 956b2ae commit 514e01e
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 58 deletions.
4 changes: 2 additions & 2 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,8 @@ private void startNextSegment() throws ReaperException, InterruptedException {

// We have an empty slot, so let's start new segment runner if possible.
LOG.info("Running segment for range {}", parallelRanges.get(rangeIndex));
Optional<RepairSegment> nextRepairSegment =
context.storage.getNextFreeSegmentInRange(repairRunId, parallelRanges.get(rangeIndex));
Optional<RepairSegment> nextRepairSegment
= context.storage.getNextFreeSegmentInRange(repairRunId, Optional.of(parallelRanges.get(rangeIndex)));

if (!nextRepairSegment.isPresent()) {
LOG.debug("No repair segment available for range {}", parallelRanges.get(rangeIndex));
Expand Down
36 changes: 11 additions & 25 deletions src/main/java/com/spotify/reaper/storage/CassandraStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,8 @@ public Collection<RepairSegment> getRepairSegmentsForRunInLocalMode(UUID runId,
return segments;
}

private boolean segmentIsWithinRange(RepairSegment segment, RingRange range) {
private static boolean segmentIsWithinRange(RepairSegment segment, RingRange range) {
return range.encloses(new RingRange(segment.getStartToken(), segment.getEndToken()));

}

private static RepairSegment createRepairSegmentFromRow(Row segmentRow){
Expand All @@ -473,35 +472,18 @@ private static RepairSegment createRepairSegmentFromRow(Row segmentRow){
.build(segmentRow.getUUID("segment_id"));
}

public Optional<RepairSegment> getSegment(UUID runId, Optional<RingRange> range) {

@Override
public Optional<RepairSegment> getNextFreeSegmentInRange(UUID runId, Optional<RingRange> range) {
List<RepairSegment> segments = Lists.<RepairSegment>newArrayList(getRepairSegmentsForRun(runId));
Collections.shuffle(segments);

RepairSegment segment = null;
for(RepairSegment seg:segments){
if(seg.getState().equals(State.NOT_STARTED) // State condition
&& ((range.isPresent() &&
(segmentIsWithinRange(seg, range.get()))
) || !range.isPresent()) // Token range condition
){
if(takeLeadOnSegment(seg.getId())) {
segment = seg;
break;
}
if (seg.getState().equals(State.NOT_STARTED) && withinRange(seg, range) && takeLeadOnSegment(seg.getId())) {
return Optional.of(seg);
}
}
return Optional.fromNullable(segment);
}


@Override
public Optional<RepairSegment> getNextFreeSegment(UUID runId) {
return getSegment(runId, Optional.<RingRange>absent());
}

@Override
public Optional<RepairSegment> getNextFreeSegmentInRange(UUID runId, RingRange range) {
return getSegment(runId, Optional.fromNullable(range));
return Optional.absent();
}

@Override
Expand Down Expand Up @@ -816,4 +798,8 @@ public void saveHeartbeat() {
lastHeartBeat = now;
}
}

private static boolean withinRange(RepairSegment segment, Optional<RingRange> range) {
return !range.isPresent() || segmentIsWithinRange(segment, range.get());
}
}
4 changes: 1 addition & 3 deletions src/main/java/com/spotify/reaper/storage/IStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,16 +98,14 @@ Optional<RepairUnit> getRepairUnit(String cluster, String keyspace,

Collection<RepairSegment> getRepairSegmentsForRun(UUID runId);

Optional<RepairSegment> getNextFreeSegment(UUID runId);

/**
* @param runId the run id that the segment belongs to.
* @param range a ring range. The start of the range may be greater than or equal to the end.
* This case has to be handled. When start = end, consider that as a range
* that covers the whole ring.
* @return a segment enclosed by the range with state NOT_STARTED, or nothing.
*/
Optional<RepairSegment> getNextFreeSegmentInRange(UUID runId, RingRange range);
Optional<RepairSegment> getNextFreeSegmentInRange(UUID runId, Optional<RingRange> range);

Collection<RepairSegment> getSegmentsWithState(UUID runId, RepairSegment.State segmentState);

Expand Down
18 changes: 10 additions & 8 deletions src/main/java/com/spotify/reaper/storage/MemoryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,7 @@ public Collection<RepairSegment> getRepairSegmentsForRun(UUID runId) {
return repairSegmentsByRunId.get(runId).values();
}

@Override
public Optional<RepairSegment> getNextFreeSegment(UUID runId) {
private Optional<RepairSegment> getNextFreeSegment(UUID runId) {
for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) {
if (segment.getState() == RepairSegment.State.NOT_STARTED) {
return Optional.of(segment);
Expand All @@ -275,12 +274,15 @@ public Optional<RepairSegment> getNextFreeSegment(UUID runId) {
}

@Override
public Optional<RepairSegment> getNextFreeSegmentInRange(UUID runId, RingRange range) {
for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) {
if (segment.getState() == RepairSegment.State.NOT_STARTED &&
range.encloses(segment.getTokenRange())) {
return Optional.of(segment);
}
public Optional<RepairSegment> getNextFreeSegmentInRange(UUID runId, Optional<RingRange> range) {
if (range.isPresent()) {
for (RepairSegment segment : repairSegmentsByRunId.get(runId).values()) {
if (segment.getState() == RepairSegment.State.NOT_STARTED && range.get().encloses(segment.getTokenRange())) {
return Optional.of(segment);
}
}
} else {
return getNextFreeSegment(runId);
}
return Optional.absent();
}
Expand Down
31 changes: 17 additions & 14 deletions src/main/java/com/spotify/reaper/storage/PostgresStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,7 @@ public Collection<RepairSegment> getRepairSegmentsForRun(UUID runId) {
}
}

@Override
public Optional<RepairSegment> getNextFreeSegment(UUID runId) {
private Optional<RepairSegment> getNextFreeSegment(UUID runId) {
RepairSegment result;
try (Handle h = jdbi.open()) {
result = getPostgresStorage(h).getNextFreeRepairSegment(UuidUtil.toSequenceId(runId));
Expand All @@ -326,19 +325,23 @@ public Optional<RepairSegment> getNextFreeSegment(UUID runId) {
}

@Override
public Optional<RepairSegment> getNextFreeSegmentInRange(UUID runId, RingRange range) {
RepairSegment result;
try (Handle h = jdbi.open()) {
IStoragePostgreSQL storage = getPostgresStorage(h);
if (!range.isWrapping()) {
result = storage.getNextFreeRepairSegmentInNonWrappingRange(UuidUtil.toSequenceId(runId), range.getStart(),
range.getEnd());
} else {
result = storage.getNextFreeRepairSegmentInWrappingRange(UuidUtil.toSequenceId(runId), range.getStart(),
range.getEnd());
}
public Optional<RepairSegment> getNextFreeSegmentInRange(UUID runId, Optional<RingRange> range) {
if (range.isPresent()) {
RepairSegment result;
try (Handle h = jdbi.open()) {
IStoragePostgreSQL storage = getPostgresStorage(h);
if (!range.get().isWrapping()) {
result = storage.getNextFreeRepairSegmentInNonWrappingRange(UuidUtil.toSequenceId(runId), range.get().getStart(),
range.get().getEnd());
} else {
result = storage.getNextFreeRepairSegmentInWrappingRange(UuidUtil.toSequenceId(runId), range.get().getStart(),
range.get().getEnd());
}
}
return Optional.fromNullable(result);
} else {
return getNextFreeSegment(runId);
}
return Optional.fromNullable(result);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public void testHangingRepair() throws InterruptedException, ReaperException {
new RepairRun.Builder(CLUSTER_NAME, cf.getId(), DateTime.now(), INTENSITY, 1, RepairParallelism.PARALLEL),
Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ZERO, BigInteger.ONE), cf.getId())));
final UUID RUN_ID = run.getId();
final UUID SEGMENT_ID = storage.getNextFreeSegment(run.getId()).get().getId();
final UUID SEGMENT_ID = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId();

assertEquals(storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState(),
RepairSegment.State.NOT_STARTED);
Expand Down Expand Up @@ -209,7 +209,7 @@ public void testHangingRepairNewAPI() throws InterruptedException, ReaperExcepti
new RepairRun.Builder(CLUSTER_NAME, cf.getId(), DateTime.now(), INTENSITY, 1, RepairParallelism.PARALLEL),
Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ZERO, BigInteger.ONE), cf.getId())));
final UUID RUN_ID = run.getId();
final UUID SEGMENT_ID = storage.getNextFreeSegment(run.getId()).get().getId();
final UUID SEGMENT_ID = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId();

assertEquals(storage.getRepairSegment(RUN_ID, SEGMENT_ID).get().getState(),
RepairSegment.State.NOT_STARTED);
Expand Down Expand Up @@ -331,7 +331,7 @@ public void testResumeRepair() throws InterruptedException, ReaperException {
.repairCommandId(1337),
new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf)));
final UUID RUN_ID = run.getId();
final UUID SEGMENT_ID = storage.getNextFreeSegment(run.getId()).get().getId();
final UUID SEGMENT_ID = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId();

context.repairManager.initializeThreadPool(1, 500, TimeUnit.MILLISECONDS, 1, TimeUnit.MILLISECONDS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void timeoutTest() throws InterruptedException, ReaperException, Executio
Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf.getId())));

final UUID runId = run.getId();
final UUID segmentId = context.storage.getNextFreeSegment(run.getId()).get().getId();
final UUID segmentId = context.storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId();

final ExecutorService executor = Executors.newSingleThreadExecutor();
final MutableObject<Future<?>> future = new MutableObject<>();
Expand Down Expand Up @@ -138,7 +138,7 @@ public void successTest() throws InterruptedException, ReaperException, Executio
new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5, 1, RepairParallelism.PARALLEL),
Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf.getId())));
final UUID runId = run.getId();
final UUID segmentId = storage.getNextFreeSegment(run.getId()).get().getId();
final UUID segmentId = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId();

final ExecutorService executor = Executors.newSingleThreadExecutor();
final MutableObject<Future<?>> future = new MutableObject<>();
Expand Down Expand Up @@ -212,7 +212,7 @@ public void failureTest() throws InterruptedException, ReaperException, Executio
new RepairRun.Builder("reaper", cf.getId(), DateTime.now(), 0.5, 1, RepairParallelism.PARALLEL),
Collections.singleton(new RepairSegment.Builder(new RingRange(BigInteger.ONE, BigInteger.ZERO), cf.getId())));
final UUID runId = run.getId();
final UUID segmentId = storage.getNextFreeSegment(run.getId()).get().getId();
final UUID segmentId = storage.getNextFreeSegmentInRange(run.getId(), Optional.absent()).get().getId();

final ExecutorService executor = Executors.newSingleThreadExecutor();
final MutableObject<Future<?>> future = new MutableObject<>();
Expand Down

0 comments on commit 514e01e

Please sign in to comment.