Skip to content

Commit

Permalink
Lazily compute list of nodes with ongoing repairs
Browse files Browse the repository at this point in the history
Only force when a node refuses to repair specifically because there's already a repair happening there.
  • Loading branch information
Bj0rnen committed Nov 10, 2015
1 parent af24e8a commit 512e2b8
Showing 1 changed file with 22 additions and 10 deletions.
32 changes: 22 additions & 10 deletions src/main/java/com/spotify/reaper/service/SegmentRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.apache.cassandra.repair.RepairParallelism;
import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.utils.SimpleCondition;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.LazyInitializer;
import org.joda.time.DateTime;
import org.joda.time.Seconds;
import org.slf4j.Logger;
Expand Down Expand Up @@ -152,14 +154,20 @@ private void runRepair() {
segmentRunners.put(segmentId, this);

RepairUnit repairUnit = context.storage.getRepairUnit(segment.getRepairUnitId()).get();
String keyspace = repairUnit.getKeyspaceName();

Collection<RepairParameters> ongoingRepairs =
context.storage.getOngoingRepairsInCluster(clusterName);
Set<String> busyHosts = Sets.newHashSet();
for (RepairParameters ongoingRepair : ongoingRepairs) {
busyHosts.addAll(coordinator.tokenRangeToEndpoint(keyspace, ongoingRepair.tokenRange));
}
final String keyspace = repairUnit.getKeyspaceName();

LazyInitializer<Set<String>> busyHosts = new LazyInitializer<Set<String>>() {
@Override
protected Set<String> initialize() {
Collection<RepairParameters> ongoingRepairs =
context.storage.getOngoingRepairsInCluster(clusterName);
Set<String> busyHosts = Sets.newHashSet();
for (RepairParameters ongoingRepair : ongoingRepairs) {
busyHosts.addAll(coordinator.tokenRangeToEndpoint(keyspace, ongoingRepair.tokenRange));
}
return busyHosts;
}
};
if (!canRepair(segment, keyspace, coordinator, busyHosts)) {
postponeCurrentSegment();
return;
Expand Down Expand Up @@ -226,7 +234,7 @@ private void runRepair() {
}

boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
Set<String> busyHosts) {
LazyInitializer<Set<String>> busyHosts) {
Collection<String> allHosts;
try {
// when hosts are coming up or going down, this method can throw an
Expand Down Expand Up @@ -258,7 +266,7 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
+ "already involved in a repair", segmentId, hostProxy.getHost());
String msg = "Postponed due to affected hosts already doing repairs";
repairRunner.updateLastEvent(msg);
if (!busyHosts.contains(hostName)) {
if (!busyHosts.get().contains(hostName)) {
LOG.warn("A host ({}) reported that it is involved in a repair, but there is no record "
+ "of any ongoing repair involving the host. Sending command to abort all repairs "
+ "on the host.", hostProxy.getHost());
Expand All @@ -280,6 +288,10 @@ boolean canRepair(RepairSegment segment, String keyspace, JmxProxy coordinator,
repairRunner.updateLastEvent(msg);
LOG.warn("Open files amount for process: " + getOpenFilesAmount());
return false;
} catch (ConcurrentException e) {
LOG.warn("Exception thrown while listing all nodes in cluster \"{}\" with ongoing repairs: "
+ "{}", clusterName, e);
return false;
}
}
LOG.info("It is ok to repair segment '{}' on repair run with id '{}'",
Expand Down

0 comments on commit 512e2b8

Please sign in to comment.