Skip to content

Commit

Permalink
Apply DC filters when building the list of segments
Browse files Browse the repository at this point in the history
  • Loading branch information
adejanovski committed Oct 19, 2021
1 parent f3f80a2 commit 93c0410
Showing 1 changed file with 11 additions and 3 deletions.
Expand Up @@ -202,7 +202,8 @@ List<Segment> generateSegments(
Segment.builder()
.withBaseRange(segment.getBaseRange())
.withTokenRanges(segment.getTokenRanges())
.withReplicas(getDCsByNodeForRepairSegment(targetCluster, segment, repairUnit.getKeyspaceName()))
.withReplicas(getDCsByNodeForRepairSegment(
targetCluster, segment, repairUnit.getKeyspaceName(), repairUnit))
.build());
}

Expand All @@ -212,7 +213,8 @@ List<Segment> generateSegments(
private Map<String, String> getDCsByNodeForRepairSegment(
Cluster cluster,
Segment segment,
String keyspace) throws ReaperException {
String keyspace,
RepairUnit repairUnit) throws ReaperException {

final int maxAttempts = 2;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
Expand All @@ -222,7 +224,13 @@ private Map<String, String> getDCsByNodeForRepairSegment(
Collection<String> nodes = clusterFacade.tokenRangeToEndpoint(cluster, keyspace, segment);
Map<String, String> dcByNode = Maps.newHashMap();
nodes.forEach(node -> dcByNode.put(node, EndpointSnitchInfoProxy.create(jmxConnection).getDataCenter(node)));
return dcByNode;
if (repairUnit.getDatacenters().isEmpty()) {
return dcByNode;
} else {
return dcByNode.entrySet().stream()
.filter(entry -> repairUnit.getDatacenters().contains(entry.getValue()))
.collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue()));
}
} catch (RuntimeException e) {
if (attempt < maxAttempts - 1) {
LOG.warn("Failed getting replicas for token range {}. Attempt {} of {}",
Expand Down

0 comments on commit 93c0410

Please sign in to comment.