Skip to content

Commit

Permalink
Don't exit reaper if endpointToRange is empty and segment count 0
Browse files Browse the repository at this point in the history
We have seen reaper exiting dueing this condition and this turns
it into a reaper exception instead of hitting the assertion in
computeGlobalSegmentCount.
  • Loading branch information
German Eichberger committed Apr 20, 2022
1 parent bee5dad commit 7618270
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,10 @@ List<Segment> generateSegments(
String cassandraVersion = clusterFacade.getCassandraVersion(targetCluster);

int globalSegmentCount = segmentCount;
if (globalSegmentCount == 0 && endpointToRange.isEmpty()) {
LOG.info("Couldn't get endpoints for tokens");
throw new ReaperException("Couldn't get endpoints for tokens");
}
if (globalSegmentCount == 0) {
globalSegmentCount = computeGlobalSegmentCount(segmentCountPerNode, endpointToRange);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,4 +296,87 @@ protected JmxProxy connectImpl(Node host) throws ReaperException {
assertEquals(3, segments.get(0).getReplicas().keySet().size());
assertEquals("dc1", segments.get(0).getReplicas().get("127.0.0.1"));
}

@Test(expected = ReaperException.class)
public void generateSegmentsTestEmpty() throws ReaperException, UnknownHostException {
Cluster cluster = Cluster.builder()
.withName("test_" + RandomStringUtils.randomAlphabetic(12))
.withSeedHosts(ImmutableSet.of("127.0.0.1", "127.0.0.2", "127.0.0.3"))
.withState(Cluster.State.ACTIVE)
.withPartitioner("Murmur3Partitioner")
.build();
final String KS_NAME = "reaper";
final Set<String> CF_NAMES = Sets.newHashSet("reaper");
final boolean INCREMENTAL_REPAIR = false;
final Set<String> NODES = Sets.newHashSet("127.0.0.1", "127.0.0.2", "127.0.0.3");
final Set<String> DATACENTERS = Collections.emptySet();
final Set<String> BLACKLISTED_TABLES = Collections.emptySet();
final long TIME_RUN = 41L;
final double INTENSITY = 0.5f;
final int REPAIR_THREAD_COUNT = 1;
final List<BigInteger> TOKENS = Lists.newArrayList(
BigInteger.valueOf(0L),
BigInteger.valueOf(100L),
BigInteger.valueOf(200L));
final IStorage storage = new MemoryStorage();

storage.addCluster(cluster);

RepairUnit cf = storage.addRepairUnit(
RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName(KS_NAME)
.columnFamilies(CF_NAMES)
.incrementalRepair(INCREMENTAL_REPAIR)
.nodes(NODES)
.datacenters(DATACENTERS)
.blacklistedTables(BLACKLISTED_TABLES)
.repairThreadCount(REPAIR_THREAD_COUNT));
DateTimeUtils.setCurrentMillisFixed(TIME_RUN);

AppContext context = new AppContext();
context.storage = storage;
context.config = new ReaperApplicationConfiguration();
final Semaphore mutex = new Semaphore(0);
final JmxProxy jmx = JmxProxyTest.mockJmxProxyImpl();
when(jmx.getClusterName()).thenReturn(cluster.getName());
when(jmx.isConnectionAlive()).thenReturn(true);
when(jmx.getRangeToEndpointMap(anyString())).thenReturn(RepairRunnerTest.sixNodeCluster());
EndpointSnitchInfoMBean endpointSnitchInfoMBean = mock(EndpointSnitchInfoMBean.class);
when(endpointSnitchInfoMBean.getDatacenter()).thenReturn("dc1");
try {
when(endpointSnitchInfoMBean.getDatacenter(anyString())).thenReturn("dc1");
} catch (UnknownHostException ex) {
throw new AssertionError(ex);
}
JmxProxyTest.mockGetEndpointSnitchInfoMBean(jmx, endpointSnitchInfoMBean);

ClusterFacade clusterFacade = mock(ClusterFacade.class);
when(clusterFacade.connect(any(Cluster.class))).thenReturn(jmx);
when(clusterFacade.nodeIsAccessibleThroughJmx(any(), any())).thenReturn(true);
when(clusterFacade.tokenRangeToEndpoint(any(), anyString(), any())).thenReturn(Lists.newArrayList(NODES));
when(clusterFacade.getRangeToEndpointMap(any(), anyString()))
.thenReturn((Map)ImmutableMap.of(Lists.newArrayList("0", "100"), Collections.emptyList()));
when(clusterFacade.getCassandraVersion(any())).thenReturn("3.11.6");
when(clusterFacade.getTokens(any())).thenReturn(TOKENS);


context.jmxConnectionFactory = new JmxConnectionFactory(context, new NoopCrypotograph()) {
@Override
protected JmxProxy connectImpl(Node host) throws ReaperException {
return jmx;
}
};

RepairRunService repairRunService = RepairRunService.create(context, () -> clusterFacade);

RepairUnit unit = RepairUnit.builder()
.clusterName(cluster.getName())
.keyspaceName("test")
.blacklistedTables(Sets.newHashSet("table1"))
.incrementalRepair(false)
.repairThreadCount(4)
.build(UUIDs.timeBased());
List<Segment> segments = repairRunService.generateSegments(cluster, 0, 0, unit);
}
}

0 comments on commit 7618270

Please sign in to comment.