Merge pull request #88 from spotify/zvo/parallelSegments
Zvo/parallel segments
varjoranta committed Apr 10, 2015
2 parents d1e4c83 + ebeaca4 commit eb3f51c
Showing 14 changed files with 404 additions and 243 deletions.
5 changes: 5 additions & 0 deletions src/main/java/com/spotify/reaper/cassandra/JmxProxy.java
@@ -170,6 +170,11 @@ public BigInteger apply(String s) {
});
}

public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace) {
checkNotNull(ssProxy, "Looks like the proxy is not connected");
return ssProxy.getRangeToEndpointMap(keyspace);
}

/**
* @return all hosts owning a range of tokens
*/
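The new getRangeToEndpointMap(keyspace) simply proxies the Cassandra StorageServiceMBean method of the same name. As an illustration (addresses and tokens invented here), the keys are token-range boundary pairs and the values are the replica endpoints owning each range:

    // Illustrative shape only: a 3-node cluster with RF=3, so every
    // range is replicated to all three nodes.
    Map<List<String>, List<String>> ranges = jmxProxy.getRangeToEndpointMap("my_keyspace");
    // ["-9223372036854775808", "-3074457345618258603"] -> ["10.0.0.1", "10.0.0.2", "10.0.0.3"]
    // ["-3074457345618258603", "3074457345618258602"]  -> ["10.0.0.2", "10.0.0.3", "10.0.0.1"]
    // ["3074457345618258602", "-9223372036854775808"]  -> ["10.0.0.3", "10.0.0.1", "10.0.0.2"]

RepairRunner uses this map below to compute how many repairs can run in parallel.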
21 changes: 13 additions & 8 deletions src/main/java/com/spotify/reaper/service/RepairManager.java
@@ -2,12 +2,13 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.core.RepairRun;
import com.spotify.reaper.core.RepairSegment;

import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.joda.time.DateTime;
import org.slf4j.Logger;
@@ -16,14 +17,14 @@
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class RepairManager {

private static final Logger LOG = LoggerFactory.getLogger(RepairManager.class);

private ScheduledExecutorService executor;
private ListeningScheduledExecutorService executor;
private long repairTimeoutMillis;
private long retryDelayMillis;

@@ -38,8 +39,8 @@ public long getRepairTimeoutMillis() {
public void initializeThreadPool(int threadAmount, long repairTimeout,
TimeUnit repairTimeoutTimeUnit, long retryDelay,
TimeUnit retryDelayTimeUnit) {
executor = Executors
.newScheduledThreadPool(threadAmount, new NamedThreadFactory("RepairRunner"));
executor = MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(threadAmount,
new NamedThreadFactory("RepairRunner")));
repairTimeoutMillis = repairTimeoutTimeUnit.toMillis(repairTimeout);
retryDelayMillis = retryDelayTimeUnit.toMillis(retryDelay);
}
@@ -157,8 +158,12 @@ public void scheduleRetry(RepairRunner runner) {
executor.schedule(runner, retryDelayMillis, TimeUnit.MILLISECONDS);
}

public void scheduleNextRun(RepairRunner runner, long delay) {
executor.schedule(runner, delay, TimeUnit.MILLISECONDS);
public ListenableFuture<?> submitSegment(SegmentRunner runner) {
return executor.submit(runner);
}

public void submitNextRun(RepairRunner runner) {
executor.submit(runner);
}

public void removeRunner(RepairRunner runner) {
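Replacing the plain ScheduledExecutorService with Guava's ListeningScheduledExecutorService is what lets submitSegment() return a ListenableFuture, so RepairRunner can react to segment completion with a callback instead of blocking. A minimal sketch of the pattern (pool size, task, and callback bodies are placeholders):

    // listeningDecorator wraps a JDK scheduled pool so that submit()
    // returns a ListenableFuture instead of a plain Future.
    ListeningScheduledExecutorService executor = MoreExecutors.listeningDecorator(
        Executors.newScheduledThreadPool(4, new NamedThreadFactory("RepairRunner")));
    ListenableFuture<?> result = executor.submit(someRunnable);
    Futures.addCallback(result, new FutureCallback<Object>() {
      @Override
      public void onSuccess(Object ignored) { /* segment finished */ }
      @Override
      public void onFailure(Throwable t) { /* free the slot and log */ }
    });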
181 changes: 126 additions & 55 deletions src/main/java/com/spotify/reaper/service/RepairRunner.java
@@ -14,8 +14,14 @@
package com.spotify.reaper.service;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Optional;

import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.spotify.reaper.AppContext;
import com.spotify.reaper.ReaperException;
import com.spotify.reaper.cassandra.JmxProxy;
@@ -29,7 +35,13 @@
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLongArray;

import javax.annotation.Nullable;

public class RepairRunner implements Runnable {

@@ -39,7 +51,9 @@ public class RepairRunner implements Runnable {
private final long repairRunId;
private final String clusterName;
private JmxProxy jmxConnection;
private Long currentlyRunningSegmentId;
// private Long currentlyRunningSegmentId;
private final AtomicLongArray currentlyRunningSegments;
private final List<RingRange> parallelRanges;

public RepairRunner(AppContext context, long repairRunId)
throws ReaperException {
@@ -50,16 +64,71 @@ public RepairRunner(AppContext context, long repairRunId)
Optional<Cluster> cluster = context.storage.getCluster(repairRun.get().getClusterName());
assert cluster.isPresent() : "No Cluster with name " + repairRun.get().getClusterName()
+ " found from storage";
Optional<RepairUnit> repairUnit = context.storage.getRepairUnit(repairRun.get().getRepairUnitId());
assert repairUnit.isPresent() : "No RepairUnit with id " + repairRun.get().getRepairUnitId()
+ " found in storage";
this.clusterName = cluster.get().getName();
JmxProxy jmx = this.context.jmxConnectionFactory.connectAny(cluster.get());

int parallelRepairs =
getPossibleParallelRepairsCount(
jmx.getRangeToEndpointMap(repairUnit.get().getKeyspaceName()));
currentlyRunningSegments = new AtomicLongArray(parallelRepairs);
for (int i = 0; i < parallelRepairs; i++) {
currentlyRunningSegments.set(i, -1);
}

parallelRanges = getParallelRanges(
parallelRepairs,
Lists.newArrayList(Collections2.transform(
context.storage.getRepairSegmentsForRun(repairRunId),
new Function<RepairSegment, RingRange>() {
@Override
public RingRange apply(RepairSegment input) {
return input.getTokenRange();
}
})));
}

public long getRepairRunId() {
return repairRunId;
}

@VisibleForTesting
public Long getCurrentlyRunningSegmentId() {
return currentlyRunningSegmentId;
public static int getPossibleParallelRepairsCount(Map<List<String>, List<String>> ranges)
throws ReaperException {
if (ranges.size() == 0) {
String msg = "Repairing 0-sized cluster.";
LOG.error(msg);
throw new ReaperException(msg);
}
return ranges.size() / ranges.values().iterator().next().size();
}
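// Worked example (illustrative, not part of the patch): with 6 token
// ranges and a replication factor of 3, each range lists 3 endpoints,
// so 6 / 3 = 2 repairs can run concurrently without two of them
// hitting the same replica node.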

@VisibleForTesting
public static List<RingRange> getParallelRanges(int parallelRepairs, List<RingRange> segments)
throws ReaperException {
if (parallelRepairs == 0) {
String msg = "Can't repair anything with 0 threads";
LOG.error(msg);
throw new ReaperException(msg);
}

Collections.sort(segments, RingRange.startComparator);

List<RingRange> parallelRanges = Lists.newArrayList();
for (int i = 0; i < parallelRepairs - 1; i++) {
parallelRanges.add(new RingRange(
segments.get(i * segments.size() / parallelRepairs).getStart(),
segments.get((i + 1) * segments.size() / parallelRepairs).getStart()
));
}
parallelRanges.add(new RingRange(
segments.get((parallelRepairs - 1) * segments.size() / parallelRepairs).getStart(),
segments.get(0).getStart()
));

return parallelRanges;
}
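// Worked example (illustrative, not part of the patch): with
// parallelRepairs = 2 and 8 sorted segments s0..s7, this produces
// [s0.start, s4.start) plus the wrap-around range [s4.start, s0.start),
// one per repair slot.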

/**
@@ -68,7 +137,7 @@ public Long getCurrentlyRunningSegmentId() {
@Override
public void run() {

Thread.currentThread().setName(this.clusterName);
Thread.currentThread().setName(clusterName + ":" + repairRunId);

Optional<RepairRun> repairRun = context.storage.getRepairRun(repairRunId);
try {
@@ -149,15 +218,40 @@ private void end() {
* Get the next segment and repair it. If there is none, we're done.
*/
private void startNextSegment() throws ReaperException {
// Currently not allowing parallel repairs.
assert
context.storage.getSegmentAmountForRepairRunWithState(repairRunId,
RepairSegment.State.RUNNING) == 0;
Optional<RepairSegment> nextSegment = context.storage.getNextFreeSegment(repairRunId);
if (nextSegment.isPresent()) {
repairSegment(nextSegment.get().getId(), nextSegment.get().getTokenRange());
} else {
end();
boolean noMoreSegments = true;
for (int rangeIndex = 0; rangeIndex < currentlyRunningSegments.length(); rangeIndex++) {
Optional<RepairSegment> nextRepairSegment =
context.storage.getNextFreeSegmentInRange(repairRunId, parallelRanges.get(rangeIndex));
if (!nextRepairSegment.isPresent()) {
LOG.debug("No repair segment available for range {}", parallelRanges.get(rangeIndex));
} else {
noMoreSegments = false;
long segmentId = nextRepairSegment.get().getId();
boolean wasSet = currentlyRunningSegments.compareAndSet(rangeIndex, -1, segmentId);
if (!wasSet) {
LOG.debug("Didn't set segment id `{}` to slot {} because it was busy", segmentId, rangeIndex);
} else {
LOG.debug("Did set segment id `{}` to slot {}", segmentId, rangeIndex);
repairSegment(rangeIndex, nextRepairSegment.get().getId(), nextRepairSegment.get().getTokenRange());
}
}
}
if (noMoreSegments) {
boolean allRangesDone = true;
for (int i = 0; i < currentlyRunningSegments.length(); i++) {
if (currentlyRunningSegments.get(i) != -1) {
allRangesDone = false;
}
}
if (allRangesDone) {
if (context.storage.getSegmentAmountForRepairRunWithState(repairRunId, RepairSegment.State.DONE) ==
context.storage.getSegmentAmountForRepairRun(repairRunId)) {
end();
} else {
LOG.debug("No more segments to repair, but some still running");
context.repairManager.scheduleRetry(this);
}
}
}
}
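// Note (illustrative, not part of the patch): each slot of
// currentlyRunningSegments guards one parallel range. The
// compareAndSet(rangeIndex, -1, segmentId) above atomically claims a
// free slot (-1), and the FutureCallback in repairSegment() releases it
// by writing -1 back on both success and failure.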

@@ -167,7 +261,7 @@ private void startNextSegment() throws ReaperException {
* @param segmentId id of the segment to repair.
* @param tokenRange token range of the segment to repair.
*/
private void repairSegment(long segmentId, RingRange tokenRange) throws ReaperException {
private void repairSegment(final int rangeIndex, final long segmentId, RingRange tokenRange) throws ReaperException {
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
RepairUnit repairUnit = context.storage.getRepairUnit(repairRun.getRepairUnitId()).get();
String keyspace = repairUnit.getKeyspaceName();
@@ -181,11 +275,11 @@ private void repairSegment(long segmentId, RingRange tokenRange) throws ReaperEx
} catch (ReaperException e) {
e.printStackTrace();
LOG.warn("Failed to reestablish JMX connection in runner #{}, retrying", repairRunId);

currentlyRunningSegments.set(rangeIndex, -1);
context.repairManager.scheduleRetry(this);
return;
}
LOG.info("successfully reestablished JMX proxy for repair runner on run id: {}", repairRunId);
LOG.debug("successfully reestablished JMX proxy for repair runner on run id: {}", repairRunId);
}

List<String> potentialCoordinators = jmxConnection.tokenRangeToEndpoint(keyspace, tokenRange);
@@ -202,12 +296,22 @@ private void repairSegment(long segmentId, RingRange tokenRange) throws ReaperEx
return;
}

currentlyRunningSegmentId = segmentId;
SegmentRunner.triggerRepair(context, segmentId, potentialCoordinators,
context.repairManager.getRepairTimeoutMillis());
currentlyRunningSegmentId = null;
SegmentRunner segmentRunner = new SegmentRunner(context, segmentId, potentialCoordinators,
context.repairManager.getRepairTimeoutMillis(), repairRun.getIntensity(), clusterName);

handleResult(segmentId);
ListenableFuture<?> segmentResult = context.repairManager.submitSegment(segmentRunner);
Futures.addCallback(segmentResult, new FutureCallback<Object>() {
@Override
public void onSuccess(Object ignored) {
handleResult(segmentId);
currentlyRunningSegments.set(rangeIndex, -1);
}
@Override
public void onFailure(Throwable t) {
currentlyRunningSegments.set(rangeIndex, -1);
LOG.error("Executing SegmentRunner failed: " + t.getMessage());
}
});
}

private void handleResult(long segmentId) {
@@ -222,13 +326,7 @@ private void handleResult(long segmentId) {
break;
case DONE:
// Successful repair
long delay = intensityBasedDelayMillis(segment);
context.repairManager.scheduleNextRun(this, delay);
String event = String.format("Waiting %ds because of intensity based delay", delay / 1000);
RepairRun updatedRepairRun =
context.storage.getRepairRun(repairRunId).get().with().lastEvent(event)
.build(repairRunId);
context.storage.updateRepairRun(updatedRepairRun);
context.repairManager.submitNextRun(this);
break;
default:
// Another thread has started a new repair on this segment already
@@ -239,31 +337,4 @@ private void handleResult(long segmentId) {
throw new RuntimeException(msg);
}
}

/**
* Calculate the delay that should be used before starting the next repair segment.
*
* @param repairSegment the last finished repair segment.
* @return the delay in milliseconds.
*/
long intensityBasedDelayMillis(RepairSegment repairSegment) {
if (repairSegment.getEndTime() == null && repairSegment.getStartTime() == null) {
return 0;
}
else if (repairSegment.getEndTime() != null && repairSegment.getStartTime() != null) {
long repairEnd = repairSegment.getEndTime().getMillis();
long repairStart = repairSegment.getStartTime().getMillis();
long repairDuration = repairEnd - repairStart;
RepairRun repairRun = context.storage.getRepairRun(repairRunId).get();
long delay = (long) (repairDuration / repairRun.getIntensity() - repairDuration);
LOG.debug("Scheduling next runner run() with delay {} ms", delay);
return delay;
} else
{
LOG.error("Segment {} returned with startTime {} and endTime {}. This should not happen."
+ "Intensity cannot apply, so next run will start immediately.",
repairSegment.getId(), repairSegment.getStartTime(), repairSegment.getEndTime());
return 0;
}
}
}
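Taken together, the two static helpers decide the degree of parallelism for a run. A hedged end-to-end sketch (the jmx handle, keyspace name, and segment list are placeholders):

    // Sketch only: a 3-node cluster with RF=3 and vnodes yielding 6 token ranges.
    Map<List<String>, List<String>> map = jmx.getRangeToEndpointMap("ks");
    int parallel = RepairRunner.getPossibleParallelRepairsCount(map); // 6 / 3 = 2
    List<RingRange> slots = RepairRunner.getParallelRanges(parallel, segments);
    // startNextSegment() then keeps at most one SegmentRunner busy per slot;
    // both helpers throw ReaperException when handed empty input.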
