Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ jobs:
distribution: 'temurin'
- name: Setup Minikube
uses: medyagh/setup-minikube@latest
with:
extra-config: 'controller-manager.max-endpoints-per-slice=2'
- name: Setup Gradle
uses: gradle/actions/setup-gradle@v4
- name: Run Integration Tests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ void teardown() throws IOException {
@DisplayName("should continuously resolve all addresses of deployment behind a service")
@ParameterizedTest(name = "replicas changes = {0}")
@MethodSource(value = "testCases")
void continuouslyResolveAllAddressesTest(List<Integer> replicasHistory) throws IOException, InterruptedException {
void continuouslyResolveAllAddressesTest(List<Integer> replicasHistory) {
for (Integer replicas : replicasHistory) {
log.info("Scaling server to {} replicas", replicas);
final var serverIPs = manager.scaleServer(replicas);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,10 @@ public List<Endpoint> awaitScaledReadyDeployment(String deploymentName, int repl
.list()
.getItems()
.stream()
.findAny()
.map(EndpointSlice::getEndpoints)
.stream()
.filter(Objects::nonNull)
.flatMap(Collection::stream)
.filter(e -> e.getConditions().getReady())
.filter(e -> e != null && e.getConditions() != null && e.getConditions().getReady())
.toList(),
readyEndpoints -> (long) readyEndpoints.size() == replicasCount
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import io.github.lothar1998.kuberesolver.kubernetes.EndpointSliceWatcher;
import io.github.lothar1998.kuberesolver.kubernetes.InClusterEndpointSliceWatcher;
Expand Down Expand Up @@ -116,7 +120,7 @@ public KubernetesNameResolver(Executor executor, ResolverTarget params) throws I
@Override
public void start(Listener listener) {
this.listener = listener;
resolve();
refresh();
}

/**
Expand All @@ -143,6 +147,8 @@ private void resolve() {
*/
private void watch() {
watcher.watch(params.service(), new EndpointSliceWatcher.Subscriber() {
private final Map<String, List<Set<SocketAddress>>> endpoints = new ConcurrentHashMap<>();

@Override
public void onEvent(Event event) {
// watch event occurred
Expand All @@ -152,21 +158,48 @@ public void onEvent(Event event) {
return;
}

if (event.type().equals(EventType.DELETED)) {
LOGGER.log(Level.FINE, "EndpointSlice {0} was deleted",
new Object[]{event.endpointSlice().metadata().name()});
if (event.endpointSlice() == null) {
LOGGER.log(Level.FINE, "No EndpointSlice found in watch event");
return;
}

if (event.endpointSlice() == null) {
LOGGER.log(Level.FINE, "No EndpointSlice found in watch event");
if (event.endpointSlice().metadata() == null || event.endpointSlice().metadata().name() == null) {
LOGGER.log(Level.FINE, "No EndpointSlice name found in watch event metadata");
return;
}

if (event.type().equals(EventType.DELETED)) {
LOGGER.log(Level.FINE, "EndpointSlice {0} was deleted",
new Object[]{event.endpointSlice().metadata().name()});
endpoints.remove(event.endpointSlice().metadata().name());
return;
}

LOGGER.log(Level.FINER, "Resolving addresses for service {0}", new Object[]{params.service()});
buildAddresses(event.endpointSlice()).ifPresentOrElse(a -> listener.onAddresses(a, Attributes.EMPTY),
() -> LOGGER.log(Level.FINE, "No usable addresses found for Kubernetes service {0}",
new Object[]{params.service()}));
var endpointSliceAddresses = buildAddresses(event.endpointSlice());
if (endpointSliceAddresses.isEmpty()) {
LOGGER.log(Level.FINE, "No usable addresses found for service {0} in EndpointSlice {1}",
new Object[]{params.service(), event.endpointSlice().metadata().name()});
} else {
LOGGER.log(Level.FINEST,
() -> String.format(
"Resolved addresses for service %s from EndpointSlice %s: %s",
params.service(),
event.endpointSlice().metadata().name(),
addressGroupsToString(endpointSliceAddresses.get())
));
endpoints.put(event.endpointSlice().metadata().name(), endpointSliceAddresses.get());

var allAddresses = endpoints.values().stream()
.flatMap(List::stream)
.distinct()
.toList();

LOGGER.log(Level.FINEST, () -> String.format(
"All resolved addresses for service %s: %s",
params.service(), addressGroupsToString(allAddresses)));
listener.onAddresses(toEquivalentAddressGroups(allAddresses), Attributes.EMPTY);
}
}

@Override
Expand Down Expand Up @@ -208,17 +241,28 @@ public String getServiceAuthority() {
}

/**
* Builds a list of gRPC {@link EquivalentAddressGroup} from the given
* {@link EndpointSlice}.
* Extracts and processes network addresses from a Kubernetes {@link EndpointSlice}.
* <p>
* This method performs several key steps in the address resolution process:
* <ol>
* <li>Finds the appropriate port to use from the EndpointSlice</li>
* <li>Filters for endpoints that are in the "ready" condition</li>
* <li>Maps each endpoint's IP addresses to socket addresses using the resolved port</li>
* </ol>
* <p>
* If no suitable port can be found or if the EndpointSlice contains no ready endpoints,
* an empty Optional will be returned.
*
* @param endpointSlice the EndpointSlice to process
* @return an optional list of resolved addresses
* @param endpointSlice the Kubernetes EndpointSlice containing endpoint information
* @return an Optional containing a list of socket address sets for ready endpoints,
* or an empty Optional if no addresses could be resolved
*/
private Optional<List<EquivalentAddressGroup>> buildAddresses(EndpointSlice endpointSlice) {
private Optional<List<Set<SocketAddress>>> buildAddresses(EndpointSlice endpointSlice) {
return findPort(endpointSlice.ports())
.map(port -> endpointSlice.endpoints().stream()
.filter(endpoint -> endpoint.conditions().isReady())
.map(endpoint -> buildAddressGroup(endpoint.addresses(), port))
.filter(group -> !group.isEmpty())
.toList());
}

Expand Down Expand Up @@ -246,17 +290,77 @@ private Optional<Integer> findPort(List<EndpointPort> ports) {
}

/**
* Builds a gRPC {@link EquivalentAddressGroup} from the given addresses and
* port.
* Builds a set of socket addresses from a list of IP addresses and a port number.
* This method converts each IP address into an {@link InetSocketAddress} using the given port,
* which represents one endpoint in a Kubernetes EndpointSlice.
* <p>
* The resulting set of addresses is used in the name resolution process to provide
* gRPC clients with possible connection endpoints for the target service.
*
* @param addresses the list of addresses
* @param port the port number
* @return an {@link EquivalentAddressGroup} containing the resolved addresses
* @param addresses the list of IP addresses from a Kubernetes endpoint
* @param port the port number to use for all addresses
* @return a set of {@link SocketAddress} objects representing the endpoint addresses
*/
private EquivalentAddressGroup buildAddressGroup(List<String> addresses, int port) {
var socketAddresses = addresses.stream()
private Set<SocketAddress> buildAddressGroup(List<String> addresses, int port) {
return addresses.stream()
.map(address -> (SocketAddress) new InetSocketAddress(address, port))
.collect(Collectors.toSet());
}

/**
* Converts a list of socket address sets into a list of {@link EquivalentAddressGroup} objects.
* Each set of socket addresses is transformed into a single {@link EquivalentAddressGroup},
* which gRPC uses to represent a group of equivalent addresses for load balancing.
*
* @param addressGroups the list of socket address sets to convert
* @return a list of {@link EquivalentAddressGroup} objects, each representing one set of addresses
*/
private List<EquivalentAddressGroup> toEquivalentAddressGroups(List<Set<SocketAddress>> addressGroups) {
return addressGroups.stream()
.map(group -> new EquivalentAddressGroup(new ArrayList<>(group)))
.toList();
return new EquivalentAddressGroup(socketAddresses, Attributes.EMPTY);
}

/**
* Converts a list of socket address sets into a human-readable string representation.
* The format is a nested structure like: [(addr1, addr2), (addr3), (addr4, addr5)]
* where each set of addresses is represented as a group in parentheses.
*
* @param addressGroups the list of socket address sets to convert to string
* @return a string representation of the address groups
*/
private String addressGroupsToString(List<Set<SocketAddress>> addressGroups) {
if (addressGroups == null || addressGroups.isEmpty()) {
return "[]";
}

var result = new StringBuilder("[");

result.append("(");
boolean firstAddr = true;
for (SocketAddress address : addressGroups.get(0)) {
if (!firstAddr) {
result.append(", ");
}
result.append(address);
firstAddr = false;
}
result.append(")");

for (int i = 1; i < addressGroups.size(); i++) {
result.append(", (");
firstAddr = true;
for (SocketAddress address : addressGroups.get(i)) {
if (!firstAddr) {
result.append(", ");
}
result.append(address);
firstAddr = false;
}
result.append(")");
}

result.append("]");
return result.toString();
}
}
Loading