Skip to content

Commit

Permalink
Implement HttpCassandraManagementProxy.getEndpointToHostId() and Http…
Browse files Browse the repository at this point in the history
…CassandraManagementProxy.getLocalEndpoint()
  • Loading branch information
Miles-Garnsey committed Sep 6, 2023
1 parent 660420b commit bccdc9c
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import java.io.IOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
Expand Down Expand Up @@ -110,18 +111,18 @@ public String getHost() {
public List<BigInteger> getTokens() {
EndpointStates endpointStates;
try {
endpointStates = apiClient.getEndpointStates();
endpointStates = apiClient.getEndpointStates();
return Arrays.stream(endpointStates.getEntity().stream().filter(i ->
i.getOrDefault("IS_LOCAL", "false")
.equals("true")
)
.findFirst()
.orElseThrow(() -> new RuntimeException("Failed to find local endpoint"))
.get("TOKENS")
.split(","))
i.getOrDefault("IS_LOCAL", "false")
.equals("true")
)
.findFirst()
.orElseThrow(() -> new RuntimeException("Failed to find local endpoint"))
.get("TOKENS")
.split(","))
.map(strToken -> new BigInteger(strToken)).collect(Collectors.toList());

} catch (Exception e) {
} catch (ApiException e) {
LOG.error("Failed to retrieve endpoint states", e);
return Collections.emptyList();
}
Expand All @@ -135,13 +136,33 @@ public Map<List<String>, List<String>> getRangeToEndpointMap(String keyspace) th
@NotNull
@Override
public String getLocalEndpoint() throws ReaperException {
return null; // TODO: implement me.
// TODO: validate that this works in all situations. I suspect that if any address translation is
// happening we'll see failures here, but address translation is not in scope in this phase.
// The logic is that host is either a DNS address, or an IP address. If it's a DNS address, we do a
// reverse lookup to get the IP.
try {
return InetAddress.getByName(host).toString().split("/")[1];
}
catch (UnknownHostException e) {
throw new ReaperException(e);
}
}

@NotNull
@Override
public Map<String, String> getEndpointToHostId() {
return null; // TODO: implement me.
try {
return apiClient.getEndpointStates().getEntity().stream()
.collect(
Collectors.toMap(
i -> i.get("ENDPOINT_IP"),
i -> i.get("HOST_ID")
)
);
} catch (ApiException ae) {
LOG.error("Failed to retrieve endpoint states - does the HTTP proxy have connectivity?", ae);
return Collections.emptyMap();
}
}

@Override
Expand Down Expand Up @@ -262,7 +283,7 @@ public int triggerRepair(
(new com.datastax.mgmtapi.client.model.RingRange())
.start(i.getStart().longValue())
.end(i.getEnd().longValue())
).collect(Collectors.toList())
).collect(Collectors.toList())
)
);
jobId = resp.getRepairId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import io.cassandrareaper.core.Snapshot;
import io.cassandrareaper.core.Table;
import io.cassandrareaper.management.ICassandraManagementProxy;
import io.cassandrareaper.management.RepairStatusHandler;
import io.cassandrareaper.management.http.models.JobStatusTracker;

Expand Down Expand Up @@ -361,5 +362,50 @@ public void testGetTokens() throws Exception {
new BigInteger("4"));
}

@Test
public void getEndpointToHostId() throws Exception {
DefaultApi mockClient = Mockito.mock(DefaultApi.class);
List<Map<String, String>> mockEntity = new ArrayList<>();
mockEntity.add(ImmutableMap.of(
"ENDPOINT_IP", "127.0.0.1",
"HOST_ID", "fakehostID1"
)
);
mockEntity.add(ImmutableMap.of(
"ENDPOINT_IP", "127.0.0.2",
"HOST_ID", "fakehostID2"
)
);
EndpointStates mockEndpointStates = new EndpointStates().entity(mockEntity);
when(mockClient.getEndpointStates()).thenReturn(mockEndpointStates);
mockProxy(mockClient);
assertThat(mockProxy(mockClient).getEndpointToHostId()).containsAllEntriesOf(
ImmutableMap.of(
"127.0.0.1", "fakehostID1",
"127.0.0.1", "fakehostID2"
)
);
}

@Test
public void testGetLocalEndpoint() throws Exception {
DefaultApi mockClient = Mockito.mock(DefaultApi.class);
ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
when(executorService.submit(any(Callable.class))).thenAnswer(i -> {
Callable<Object> callable = i.getArgument(0);
callable.call();
return ConcurrentUtils.constantFuture(null);
});

ICassandraManagementProxy mockProxyIp = new HttpCassandraManagementProxy(
null, "/", InetSocketAddress.createUnresolved("192.168.1.1", 8080), executorService, mockClient);
assertThat(mockProxyIp.getLocalEndpoint()).isEqualTo("192.168.1.1");

ICassandraManagementProxy mockProxyDns = new HttpCassandraManagementProxy(
null, "/", InetSocketAddress.createUnresolved("localhost", 8080), executorService, mockClient);
assertThat(mockProxyDns.getLocalEndpoint()).isEqualTo("127.0.0.1");


}

}

0 comments on commit bccdc9c

Please sign in to comment.