Skip to content

Commit

Permalink
First working repair run
Browse files Browse the repository at this point in the history
  • Loading branch information
Bj0rnen committed Dec 5, 2014
1 parent a4a6c25 commit 9c149e4
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 28 deletions.
1 change: 1 addition & 0 deletions src/main/java/com/spotify/reaper/core/RepairRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public static class Builder {

public Builder(Object repairRunLock, RunState runState, DateTime creationTime,
double intensity) {
this.repairRunLock = repairRunLock;
this.runState = runState;
this.creationTime = creationTime;
this.intensity = intensity;
Expand Down
18 changes: 8 additions & 10 deletions src/main/java/com/spotify/reaper/core/RepairSegment.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ public class RepairSegment {
private final Integer repairCommandId; // received when triggering repair in Cassandra
private final ColumnFamily columnFamily;
private final long runID;
private final Range<BigInteger> tokenRange;
private final BigInteger startToken; // open
private final BigInteger endToken; // closed
private final State state;
private final DateTime startTime;
private final DateTime endTime;
Expand All @@ -33,16 +34,12 @@ public long getRunID() {
return runID;
}

public Range<BigInteger> getTokenRange() {
return tokenRange;
}

public BigInteger getStartToken() {
return tokenRange.lowerEndpoint();
return startToken;
}

public BigInteger getEndToken() {
return tokenRange.upperEndpoint();
return endToken;
}

public State getState() {
Expand Down Expand Up @@ -80,7 +77,8 @@ private RepairSegment(Builder builder, long id) {
this.repairCommandId = builder.repairCommandId;
this.columnFamily = builder.columnFamily;
this.runID = builder.runID;
this.tokenRange = Range.openClosed(builder.startToken, builder.endToken);
this.startToken = builder.startToken;
this.endToken = builder.endToken;
this.state = builder.state;
this.startTime = builder.startTime;
this.endTime = builder.endTime;
Expand Down Expand Up @@ -133,7 +131,7 @@ public String toString() {

public String toString() {
return String.format("(%s,%s]",
tokenRange.lowerEndpoint().toString(),
tokenRange.upperEndpoint().toString());
startToken.toString(),
endToken.toString());
}
}
10 changes: 6 additions & 4 deletions src/main/java/com/spotify/reaper/resources/TableResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,7 @@ public Response addTable(@Context UriInfo uriInfo,
if (existingCluster == null) {
LOG.info("creating new cluster based on given seed host: {}", seedHost);
storage.addCluster(targetCluster);
}
else if (!existingCluster.equals(targetCluster)) {
} else if (!existingCluster.equals(targetCluster)) {
LOG.info("cluster information has changed for cluster: {}", targetCluster.getName());
storage.updateCluster(targetCluster);
}
Expand Down Expand Up @@ -127,7 +126,9 @@ else if (!existingCluster.equals(targetCluster)) {
}

// TODO: verify that the table exists in the cluster.
ColumnFamily existingTable = storage.getColumnFamily(targetCluster.getName(), keyspace.get(), table.get());
ColumnFamily
existingTable =
storage.getColumnFamily(targetCluster.getName(), keyspace.get(), table.get());
if (existingTable == null) {
LOG.info("storing new table");

Expand Down Expand Up @@ -182,7 +183,8 @@ else if (!existingCluster.equals(targetCluster)) {

if (segments == null || seedHosts.isEmpty()) {
return Response.status(404)
.entity("couldn't connect to any of the seed hosts in cluster \"" + clusterName + "\"").build();
.entity("couldn't connect to any of the seed hosts in cluster \"" + existingTable
.getCluster().getName() + "\"").build();
}
} catch (ReaperException e) {
String errMsg = "failed generating segments for new table: " + existingTable;
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/spotify/reaper/service/RepairRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ private RepairRunner(IStorage storage, RepairRun repairRun, String clusterSeedHo
public static void startNewRepairRun(IStorage storage, RepairRun repairRun,
String clusterSeedHost) {
assert null != executor : "you need to initialize the thread pool first";
LOG.info("scheduling repair for repair run #" + repairRun.getId());
executor.schedule(new RepairRunner(storage, repairRun, clusterSeedHost), 0, TimeUnit.SECONDS);
}

Expand Down Expand Up @@ -209,7 +210,7 @@ else if (currentSegment.getState() == RepairSegment.State.DONE) {
}

// TODO: should sleep time be relative to past performance?
startNextSegmentEarliest = DateTime.now().plusSeconds(5);
startNextSegmentEarliest = DateTime.now().plusSeconds(1);
}

private void changeCurrentRepairRunState(RepairRun.RunState newRunState) {
Expand Down
23 changes: 16 additions & 7 deletions src/main/java/com/spotify/reaper/service/SegmentGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,13 @@ public List<RepairSegment.Builder> generateSegments(int totalSegmentCount,
throw new ReaperException(String.format("Tokens (%s,%s) not in range of %s",
start, stop, partitioner));
}
if (start.equals(stop)) {
// TODO: fix corner case where there is only one token!
if (start.equals(stop) && tokenRangeCount != 1) {
throw new ReaperException(String.format("Tokens (%s,%s): two nodes have the same token",
start, stop));
}

BigInteger rangeSize = stop.subtract(start);
if (lowerThan(rangeSize, BigInteger.ZERO)) {
if (lowerThanOrEqual(rangeSize, BigInteger.ZERO)) {
// wrap around case
rangeSize = rangeSize.add(RANGE_SIZE);
}
Expand Down Expand Up @@ -131,24 +130,34 @@ protected boolean inRange(BigInteger token) {
}

@VisibleForTesting
protected static BigInteger max(BigInteger a, BigInteger b) {
public static BigInteger max(BigInteger a, BigInteger b) {
return greaterThan(a, b) ? a : b;
}

@VisibleForTesting
protected static BigInteger min(BigInteger a, BigInteger b) {
public static BigInteger min(BigInteger a, BigInteger b) {
return lowerThan(a, b) ? a : b;
}

@VisibleForTesting
protected static boolean lowerThan(BigInteger a, BigInteger b) {
public static boolean lowerThan(BigInteger a, BigInteger b) {
return a.compareTo(b) < 0;
}

@VisibleForTesting
protected static boolean greaterThan(BigInteger a, BigInteger b) {
public static boolean lowerThanOrEqual(BigInteger a, BigInteger b) {
return a.compareTo(b) <= 0;
}

@VisibleForTesting
public static boolean greaterThan(BigInteger a, BigInteger b) {
return a.compareTo(b) > 0;
}

@VisibleForTesting
public static boolean greaterThanOrEqual(BigInteger a, BigInteger b) {
return a.compareTo(b) >= 0;
}


}
2 changes: 1 addition & 1 deletion src/main/java/com/spotify/reaper/storage/IStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,5 @@ public interface IStorage {

RepairSegment getRepairSegment(long id);
RepairSegment getNextFreeSegment(long runId);
RepairSegment getNextFreeSegmentInRange(long runId, Range<BigInteger> range);
RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end);
}
26 changes: 22 additions & 4 deletions src/main/java/com/spotify/reaper/storage/MemoryStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.spotify.reaper.core.ColumnFamily;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;
import com.spotify.reaper.service.SegmentGenerator;

import java.math.BigInteger;
import java.util.Collection;
Expand All @@ -27,8 +28,7 @@ public class MemoryStorage implements IStorage {
private ConcurrentMap<Long, ColumnFamily> columnFamilies = Maps.newConcurrentMap();
private ConcurrentMap<TableName, ColumnFamily> columnFamiliesByName = Maps.newConcurrentMap();
private ConcurrentMap<Long, RepairSegment> repairSegments = Maps.newConcurrentMap();
private ConcurrentMap<Long, Collection<RepairSegment>>
repairSegmentsByRunId =
private ConcurrentMap<Long, Collection<RepairSegment>> repairSegmentsByRunId =
Maps.newConcurrentMap();

public static class TableName {
Expand Down Expand Up @@ -137,6 +137,8 @@ public Collection<RepairSegment> addRepairSegments(Collection<RepairSegment.Buil
repairSegments.put(newRepairSegment.getId(), newRepairSegment);
newSegments.add(newRepairSegment);
}
// TODO: (bj0rn) this is very ugly, the function should probably take runId.
repairSegmentsByRunId.put(newSegments.iterator().next().getRunID(), newSegments);
return newSegments;
}

Expand Down Expand Up @@ -165,11 +167,27 @@ public RepairSegment getNextFreeSegment(long runId) {
return null;
}


public static boolean encloses(BigInteger rangeStart, BigInteger rangeEnd,
BigInteger segmentStart, BigInteger segmentEnd) {
// TODO: unit test for this
if (SegmentGenerator.lowerThanOrEqual(rangeStart, rangeEnd)) {
return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) &&
SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd);
} else if (SegmentGenerator.lowerThanOrEqual(segmentStart, segmentEnd)) {
return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) ||
SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd);
} else {
return SegmentGenerator.greaterThanOrEqual(segmentStart, rangeStart) &&
SegmentGenerator.lowerThanOrEqual(segmentEnd, rangeEnd);
}
}

@Override
public RepairSegment getNextFreeSegmentInRange(long runId, Range<BigInteger> range) {
public RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end) {
for (RepairSegment segment : repairSegmentsByRunId.get(runId)) {
if (segment.getState() == RepairSegment.State.NOT_STARTED &&
range.encloses(segment.getTokenRange())) {
encloses(start, end, segment.getStartToken(), segment.getEndToken())) {
return segment;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public RepairSegment getNextFreeSegment(long runId) {
}

@Override
public RepairSegment getNextFreeSegmentInRange(long runId, Range<BigInteger> range) {
public RepairSegment getNextFreeSegmentInRange(long runId, BigInteger start, BigInteger end) {
// TODO: implementation
return null;
}
Expand Down

0 comments on commit 9c149e4

Please sign in to comment.