Topology change resilience for incremental repair (#1235)
Allows incremental repair to survive nodes changing IP addresses during the repair. The host ID is now stored in each segment, and the node's current IP address is recomputed from it when the segment runs (sketched below).
Miles-Garnsey authored and adejanovski committed Nov 16, 2022
1 parent 8d30814 commit 1db72a5
Showing 7 changed files with 298 additions and 44 deletions.
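
Before the per-file diffs, a minimal self-contained sketch of the mechanism this commit introduces may help: when a segment runs, the segment's stored host ID is matched against the cluster's current endpoint-to-host-ID map to recover the node's present address. The class and method names below are illustrative stand-ins, not Reaper's API; in the actual change the lookup happens inline in startNextSegment, using the map returned by ClusterFacade.getEndpointToHostId.

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

public final class HostIdResolutionSketch {

  // Given the current endpoint -> host ID mapping (the shape returned by
  // ClusterFacade.getEndpointToHostId), find the endpoint that currently
  // carries the segment's stored host ID.
  static Optional<String> resolveEndpoint(Map<String, String> endpointToHostId, UUID segmentHostId) {
    return endpointToHostId.entrySet().stream()
        .filter(entry -> entry.getValue().equals(segmentHostId.toString()))
        .map(Map.Entry::getKey)
        .findFirst();
  }

  public static void main(String[] args) {
    UUID hostId = UUID.randomUUID();

    // At segment creation time the node answered at 10.0.0.15 ...
    Map<String, String> atCreation = new HashMap<>();
    atCreation.put("10.0.0.15", hostId.toString());

    // ... but by the time the segment runs, the same node has a new address.
    Map<String, String> atRunTime = new HashMap<>();
    atRunTime.put("10.0.0.99", hostId.toString());

    System.out.println(resolveEndpoint(atCreation, hostId)); // Optional[10.0.0.15]
    System.out.println(resolveEndpoint(atRunTime, hostId));  // Optional[10.0.0.99]
  }
}

Because a node's host ID is stable while its IP address is not, keying segments on host ID rather than coordinator address is what makes the repair resilient to topology changes.
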
@@ -44,6 +44,9 @@ public final class RepairSegment {
private final DateTime startTime;
private final DateTime endTime;
private final Map<String, String> replicas;
// hostID field is only ever populated for incremental repairs. For full repairs it is always null.
private final UUID hostID;

private RepairSegment(Builder builder, @Nullable UUID id) {
this.id = id;
@@ -58,6 +61,7 @@ private RepairSegment(Builder builder, @Nullable UUID id) {
this.replicas = builder.replicas != null
? ImmutableMap.copyOf(builder.replicas)
: null;
this.hostID = builder.hostID;
}

public static Builder builder(Segment tokenRange, UUID repairUnitId) {
@@ -129,6 +133,10 @@ public boolean hasEndTime() {
return null != endTime;
}

public UUID getHostID() {
return hostID;
}

/** Reset to NOT_STARTED state, with nulled startTime and endTime. */
public Builder reset() {
Builder builder = new Builder(this);
@@ -159,6 +167,7 @@ public static final class Builder {
private DateTime startTime;
private DateTime endTime;
private Map<String, String> replicas;
private UUID hostID;

private Builder() {}

@@ -170,6 +179,7 @@ private Builder(Segment tokenRange, UUID repairUnitId) {
this.failCount = 0;
this.state = State.NOT_STARTED;
this.replicas = tokenRange.getReplicas();
this.hostID = null;
}

private Builder(RepairSegment original) {
@@ -183,6 +193,7 @@ private Builder(RepairSegment original) {
startTime = original.startTime;
endTime = original.endTime;
replicas = original.replicas;
hostID = original.hostID;
}

public Builder withRunId(UUID runId) {
@@ -242,6 +253,12 @@ public Builder withReplicas(Map<String, String> replicas) {
return this;
}

public Builder withHostID(UUID hostID) {
this.hostID = hostID;
return this;
}

public RepairSegment build() {
// a null segmentId is a special case where the storage uses a sequence for it
Preconditions.checkNotNull(runId);

@@ -30,14 +30,17 @@
import io.cassandrareaper.jmx.JmxProxy;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@@ -124,7 +127,7 @@ public RepairRun registerRepairRun(

// the last preparation step is to generate actual repair segments
List<RepairSegment.Builder> segmentBuilders = repairUnit.getIncrementalRepair()
? createRepairSegmentsForIncrementalRepair(nodes, repairUnit)
? createRepairSegmentsForIncrementalRepair(nodes, repairUnit, cluster, clusterFacade)
: createRepairSegments(tokenSegments, repairUnit);

RepairRun repairRun = context.storage.addRepairRun(runBuilder, segmentBuilders);
@@ -333,23 +336,27 @@ private static List<RepairSegment.Builder> createRepairSegments(
@VisibleForTesting
static List<RepairSegment.Builder> createRepairSegmentsForIncrementalRepair(
Map<String, RingRange> nodes,
RepairUnit repairUnit) {
RepairUnit repairUnit,
Cluster cluster,
ClusterFacade clusterFacade) throws ReaperException {

Map<String, String> endpointHostIdMap = clusterFacade.getEndpointToHostId(cluster);

List<RepairSegment.Builder> repairSegmentBuilders = Lists.newArrayList();

nodes
.entrySet()
.forEach(
range ->
repairSegmentBuilders.add(
RepairSegment.builder(
Segment.builder()
.withTokenRanges(Arrays.asList(range.getValue()))
.build(),
repairUnit.getId())
.withReplicas(Collections.emptyMap())
.withCoordinatorHost(range.getKey())));

.forEach(range -> {
RepairSegment.Builder segment = RepairSegment.builder(
Segment.builder()
.withTokenRanges(Arrays.asList(range.getValue()))
.build(),
repairUnit.getId())
.withReplicas(Collections.emptyMap())
.withCoordinatorHost(range.getKey())
.withHostID(UUID.fromString(endpointHostIdMap.get(range.getKey())));
repairSegmentBuilders.add(segment);
});
return repairSegmentBuilders;
}

@@ -35,7 +35,6 @@
import io.cassandrareaper.storage.IDistributedStorage;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -462,13 +461,24 @@ private void startNextSegment() throws ReaperException, InterruptedException {
repairRunId);

Optional<RepairSegment> nextRepairSegment = Optional.empty();
Collection<String> potentialReplicas = new HashSet<>();
final Collection<String> potentialReplicas = new HashSet<>();
for (RepairSegment segment : nextRepairSegments) {
Map<String, String> potentialReplicaMap = this.repairRunService.getDCsByNodeForRepairSegment(
cluster, segment.getTokenRange(), repairUnit.getKeyspaceName(), repairUnit);
potentialReplicas = repairUnit.getIncrementalRepair()
? Collections.singletonList(segment.getCoordinatorHost())
: potentialReplicaMap.keySet();
if (repairUnit.getIncrementalRepair()) {
Map<String, String> endpointHostIdMap = clusterFacade.getEndpointToHostId(cluster);
if (segment.getHostID() == null) {
throw new ReaperException(
String.format("No host ID for repair segment %s", segment.getId().toString())
);
}
endpointHostIdMap.entrySet().stream()
.filter(entry -> entry.getValue().equals(segment.getHostID().toString()))
.forEach(entry -> potentialReplicas.add(entry.getKey()));
} else {
potentialReplicas.addAll(potentialReplicaMap.keySet());
}
LOG.debug("Potential replicas for segment {}: {}", segment.getId(), potentialReplicas);
JmxProxy coordinator = clusterFacade.connect(cluster, potentialReplicas);
if (nodesReadyForNewRepair(coordinator, segment, potentialReplicaMap, repairRunId)) {
nextRepairSegment = Optional.of(segment);
@@ -533,6 +543,7 @@ private boolean nodesReadyForNewRepair(
UUID segmentId) {

Collection<String> nodes = getNodesInvolvedInSegment(dcByNode);
LOG.debug("Nodes involved in segment {}: {}", segmentId, nodes);
String dc = EndpointSnitchInfoProxy.create(coordinator).getDataCenter();
boolean requireAllHostMetrics = DatacenterAvailability.LOCAL != context.config.getDatacenterAvailability();
boolean allLocalDcHostsChecked = true;
@@ -577,6 +588,7 @@ private boolean nodesReadyForNewRepair(
LOG.debug("Ok to repair segment '{}' on repair run with id '{}'", segment.getId(), segment.getRunId());
return true;
} else {
LOG.debug("Couldn't get metrics for hosts {}, will retry later", unreachableNodes);
String msg = String.format(
"Postponed repair segment %s on repair run with id %s because we couldn't get %shosts metrics on %s",
segment.getId(),
@@ -635,7 +647,7 @@ private boolean repairSegment(final UUID segmentId, Segment segment, Collection<
String keyspace = repairUnit.getKeyspaceName();
LOG.debug("preparing to repair segment {} on run with id {}", segmentId, repairRunId);

List<String> potentialCoordinators;
List<String> potentialCoordinators = Lists.newArrayList();
if (!repairUnit.getIncrementalRepair()) {
// full repair
try {
@@ -675,7 +687,7 @@ private boolean repairSegment(final UUID segmentId, Segment segment, Collection<
Thread.sleep(ThreadLocalRandom.current().nextInt(10, 100) * 100);
Optional<RepairSegment> rs = context.storage.getRepairSegment(repairRunId, segmentId);
if (rs.isPresent()) {
potentialCoordinators = Arrays.asList(rs.get().getCoordinatorHost());
potentialCoordinators.addAll(segmentReplicas);
} else {
// the segment has been removed. should only happen in tests on backends that delete repair segments.
return false;

@@ -444,34 +444,34 @@ private void prepareStatements() {
.prepare(
"INSERT INTO repair_run"
+ "(id,segment_id,repair_unit_id,start_token,end_token,"
+ " segment_state,fail_count, token_ranges, replicas)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)")
+ " segment_state,fail_count, token_ranges, replicas,host_id)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
insertRepairSegmentIncrementalPrepStmt = session
.prepare(
"INSERT INTO repair_run"
+ "(id,segment_id,repair_unit_id,start_token,end_token,"
+ "segment_state,coordinator_host,fail_count,replicas)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)")
+ "segment_state,coordinator_host,fail_count,replicas,host_id)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
updateRepairSegmentPrepStmt = session
.prepare(
"INSERT INTO repair_run"
+ "(id,segment_id,segment_state,coordinator_host,segment_start_time,fail_count)"
+ " VALUES(?, ?, ?, ?, ?, ?)")
+ "(id,segment_id,segment_state,coordinator_host,segment_start_time,fail_count,host_id)"
+ " VALUES(?, ?, ?, ?, ?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
insertRepairSegmentEndTimePrepStmt = session
.prepare("INSERT INTO repair_run(id, segment_id, segment_end_time) VALUES(?, ?, ?)")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
getRepairSegmentPrepStmt = session
.prepare(
"SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,"
+ "segment_start_time,segment_end_time,fail_count, token_ranges, replicas"
+ "segment_start_time,segment_end_time,fail_count, token_ranges, replicas, host_id"
+ " FROM repair_run WHERE id = ? and segment_id = ?")
.setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM);
getRepairSegmentsByRunIdPrepStmt = session.prepare(
"SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,segment_start_time,"
+ "segment_end_time,fail_count, token_ranges, replicas FROM repair_run WHERE id = ?");
+ "segment_end_time,fail_count, token_ranges, replicas, host_id FROM repair_run WHERE id = ?");
getRepairSegmentCountByRunIdPrepStmt = session.prepare("SELECT count(*) FROM repair_run WHERE id = ?");
prepareScheduleStatements();
prepareLeaderElectionStatements(timeUdf);
@@ -504,7 +504,7 @@ private void prepareStatements() {
try {
getRepairSegmentsByRunIdAndStatePrepStmt = session.prepare(
"SELECT id,repair_unit_id,segment_id,start_token,end_token,segment_state,coordinator_host,"
+ "segment_start_time,segment_end_time,fail_count, token_ranges, replicas FROM repair_run "
+ "segment_start_time,segment_end_time,fail_count, token_ranges, replicas, host_id FROM repair_run "
+ "WHERE id = ? AND segment_state = ? ALLOW FILTERING");
getRepairSegmentCountByRunIdAndStatePrepStmt = session.prepare(
"SELECT count(segment_id) FROM repair_run WHERE id = ? AND segment_state = ? ALLOW FILTERING");
@@ -817,7 +817,10 @@ public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builde
segment.getState().ordinal(),
segment.getCoordinatorHost(),
segment.getFailCount(),
segment.getReplicas()));
segment.getReplicas(),
segment.getHostID()
)
);
} else {
try {
repairRunBatch.add(
@@ -830,7 +833,10 @@ public RepairRun addRepairRun(Builder repairRun, Collection<RepairSegment.Builde
segment.getState().ordinal(),
segment.getFailCount(),
objectMapper.writeValueAsString(segment.getTokenRange().getTokenRanges()),
segment.getReplicas()));
segment.getReplicas(),
segment.getHostID()
)
);
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
@@ -1134,7 +1140,10 @@ public boolean updateRepairSegmentUnsafe(RepairSegment segment) {
segment.getState().ordinal(),
segment.getCoordinatorHost(),
segment.hasStartTime() ? segment.getStartTime().toDate() : null,
segment.getFailCount()));
segment.getFailCount(),
segment.getHostID()
)
);

if (null != segment.getEndTime() || State.NOT_STARTED == segment.getState()) {

@@ -1228,6 +1237,10 @@ private static RepairSegment createRepairSegmentFromRow(Row segmentRow) {
builder = builder.withReplicas(segmentRow.getMap("replicas", String.class, String.class));
}

if (null != segmentRow.getUUID("host_id")) {
builder = builder.withHostID(segmentRow.getUUID("host_id"));
}

return builder.withId(segmentRow.getUUID("segment_id")).build();
}

18 changes: 18 additions & 0 deletions src/server/src/main/resources/db/cassandra/031_add_hostID.cql
@@ -0,0 +1,18 @@
--
-- Copyright 2021-2021 Datastax inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
-- Add a host_id column to repair_run so incremental repair segments survive coordinator IP address changes

ALTER TABLE repair_run ADD host_id uuid;

@@ -38,11 +38,13 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Semaphore;

import com.datastax.driver.core.utils.UUIDs;
@@ -493,6 +495,12 @@ public void failIncrRepairRunCreationTest() throws ReaperException, UnknownHostE
.thenReturn((Map)ImmutableMap.of(Lists.newArrayList("0", "100"), Lists.newArrayList(NODES)));
when(clusterFacade.getCassandraVersion(any())).thenReturn("3.11.6");
when(clusterFacade.getTokens(any())).thenReturn(TOKENS);
Map<String, String> endpointToHostIDMap = new HashMap<String, String>();
endpointToHostIDMap.put("127.0.0.1", UUID.randomUUID().toString());
endpointToHostIDMap.put("127.0.0.2", UUID.randomUUID().toString());
endpointToHostIDMap.put("127.0.0.3", UUID.randomUUID().toString());
when(clusterFacade.getEndpointToHostId(any(Cluster.class))).thenReturn(endpointToHostIDMap);

RepairRunService repairRunService = RepairRunService.create(context, () -> clusterFacade);

@@ -634,7 +642,7 @@ public JmxProxy connectImpl(Node host) throws ReaperException {
}

@Test
public void createRepairSegmentsForIncrementalRepairTest() {
public void createRepairSegmentsForIncrementalRepairTest() throws ReaperException {
final String KS_NAME = "reaper";
final Set<String> CF_NAMES = Sets.newHashSet("reaper");
final boolean INCREMENTAL_REPAIR = false;
@@ -643,7 +651,7 @@ public void createRepairSegmentsForIncrementalRepairTest() {
final Set<String> BLACKLISTED_TABLES = Collections.emptySet();
final int REPAIR_THREAD_COUNT = 1;
final int segmentTimeout = 30;
Cluster cluster = Cluster.builder()
final Cluster cluster = Cluster.builder()
.withName("test_" + RandomStringUtils.randomAlphabetic(12))
.withSeedHosts(ImmutableSet.of("127.0.0.1", "127.0.0.2", "127.0.0.3"))
.withState(Cluster.State.ACTIVE)
@@ -654,6 +662,12 @@
nodes.put("127.0.0.1", new RingRange("1", "2"));
nodes.put("127.0.0.2", new RingRange("3", "4"));

Map<String, String> endpointToHostIDMap = new HashMap<String, String>();
endpointToHostIDMap.put("127.0.0.1", UUID.randomUUID().toString());
endpointToHostIDMap.put("127.0.0.2", UUID.randomUUID().toString());
endpointToHostIDMap.put("127.0.0.3", UUID.randomUUID().toString());
ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.getEndpointToHostId(any(Cluster.class))).thenReturn(endpointToHostIDMap);
RepairUnit repairUnit = RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName(KS_NAME)
@@ -665,9 +679,8 @@
.repairThreadCount(REPAIR_THREAD_COUNT)
.incrementalRepair(true)
.timeout(segmentTimeout).build(UUIDs.timeBased());

List<RepairSegment.Builder> segmentBuilders
= RepairRunService.createRepairSegmentsForIncrementalRepair(nodes, repairUnit);
= RepairRunService.createRepairSegmentsForIncrementalRepair(nodes, repairUnit, cluster, clusterFacade);
assertEquals("Not enough segment builders were created", 2, segmentBuilders.size());
}
