diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ClusterEntry.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ClusterEntry.java index cc46d949662..ea09f43ca4a 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ClusterEntry.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/ClusterEntry.java @@ -21,6 +21,9 @@ import java.util.List; import java.util.concurrent.CompletableFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; @@ -35,8 +38,11 @@ final class ClusterEntry implements AsyncCloseable { + private static final Logger logger = LoggerFactory.getLogger(ClusterEntry.class); + private final EndpointsPool endpointsPool; - private final LoadBalancer loadBalancer = new SubsetLoadBalancer(); + @Nullable + private volatile LoadBalancer loadBalancer; private final ClusterManager clusterManager; private final EventExecutor eventExecutor; private List endpoints = ImmutableList.of(); @@ -51,6 +57,10 @@ final class ClusterEntry implements AsyncCloseable { @Nullable Endpoint selectNow(ClientRequestContext ctx) { + final LoadBalancer loadBalancer = this.loadBalancer; + if (loadBalancer == null) { + return null; + } return loadBalancer.selectNow(ctx); } @@ -64,9 +74,16 @@ void updateClusterSnapshot(ClusterSnapshot clusterSnapshot) { void accept(ClusterSnapshot clusterSnapshot, List endpoints) { assert eventExecutor.inEventLoop(); - this.endpoints = endpoints; - final PrioritySet prioritySet = new PrioritySet(endpoints, clusterSnapshot); - loadBalancer.prioritySetUpdated(prioritySet); + this.endpoints = ImmutableList.copyOf(endpoints); + final PrioritySet prioritySet = new PriorityStateManager(clusterSnapshot, endpoints).build(); + if (logger.isTraceEnabled()) { + logger.trace("XdsEndpointGroup is using a new PrioritySet({})", prioritySet); + } + if (clusterSnapshot.xdsResource().resource().hasLbSubsetConfig()) { + loadBalancer = new SubsetLoadBalancer(prioritySet); + } else { + loadBalancer = new DefaultLoadBalancer(prioritySet); + } clusterManager.notifyListeners(); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultLbStateFactory.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultLbStateFactory.java new file mode 100644 index 00000000000..a7eb723996c --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultLbStateFactory.java @@ -0,0 +1,396 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import java.util.Collections; +import java.util.Map; +import java.util.SortedSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Streams; +import com.google.common.math.IntMath; +import com.google.common.math.LongMath; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.client.endpoint.DefaultLoadBalancer.DistributeLoadState; +import com.linecorp.armeria.xds.client.endpoint.DefaultLoadBalancer.HostAvailability; +import com.linecorp.armeria.xds.client.endpoint.DefaultLoadBalancer.PriorityAndAvailability; + +import it.unimi.dsi.fastutil.ints.Int2IntMap; +import it.unimi.dsi.fastutil.ints.Int2IntMaps; +import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap; + +final class DefaultLbStateFactory { + + private static final Logger logger = LoggerFactory.getLogger(DefaultLbStateFactory.class); + + static DefaultLbState newInstance(PrioritySet prioritySet) { + PerPriorityLoad perPriorityLoad = calculatePerPriorityLoad(prioritySet); + final PerPriorityPanic perPriorityPanic = + recalculatePerPriorityPanic(prioritySet, + perPriorityLoad.normalizedTotalAvailability()); + + logger.debug("XdsEndpointGroup load balancer priorities for cluster({}) has been updated with" + + " perPriorityLoad({}), perPriorityPanic({}).", + prioritySet.cluster().getName(), perPriorityLoad, perPriorityPanic); + + if (perPriorityPanic.totalPanic()) { + perPriorityLoad = recalculateLoadInTotalPanic(prioritySet); + logger.debug("XdsEndpointGroup load balancer in panic for cluster({}) with perPriorityLoad({}).", + prioritySet.cluster().getName(), perPriorityLoad); + } + return new DefaultLbState(prioritySet, perPriorityLoad, perPriorityPanic); + } + + private static PerPriorityLoad calculatePerPriorityLoad(PrioritySet prioritySet) { + final Int2IntMap perPriorityHealth = new Int2IntOpenHashMap(prioritySet.priorities().size()); + final Int2IntMap perPriorityDegraded = new Int2IntOpenHashMap(prioritySet.priorities().size()); + for (int priority: prioritySet.priorities()) { + final HealthAndDegraded healthAndDegraded = + recalculatePerPriorityState(priority, prioritySet); + perPriorityHealth.put(priority, healthAndDegraded.healthWeight); + perPriorityDegraded.put(priority, healthAndDegraded.degradedWeight); + } + return buildLoads(prioritySet, + Int2IntMaps.unmodifiable(perPriorityHealth), + Int2IntMaps.unmodifiable(perPriorityDegraded)); + } + + private static HealthAndDegraded recalculatePerPriorityState( + int priority, PrioritySet prioritySet) { + final HostSet hostSet = prioritySet.hostSets().get(priority); + final int hostCount = hostSet.hosts().size(); + + if (hostCount <= 0) { + return HealthAndDegraded.ZERO; + } + + long healthyWeight = 0; + long degradedWeight = 0; + long totalWeight = 0; + if (hostSet.weightedPriorityHealth()) { + for (Endpoint host : hostSet.healthyHosts()) { + healthyWeight += host.weight(); + } + for (Endpoint host : hostSet.degradedHosts()) { + degradedWeight += host.weight(); + } + for (Endpoint host : hostSet.hosts()) { + totalWeight += host.weight(); + } + } else { + healthyWeight = hostSet.healthyHosts().size(); + degradedWeight = hostSet.degradedHosts().size(); + totalWeight = hostCount; + } + final int health = (int) Math.min(100L, LongMath.saturatedMultiply( + hostSet.overProvisioningFactor(), healthyWeight) / totalWeight); + final int degraded = (int) Math.min(100L, LongMath.saturatedMultiply( + hostSet.overProvisioningFactor(), degradedWeight) / totalWeight); + return new HealthAndDegraded(health, degraded); + } + + private static PerPriorityLoad buildLoads(PrioritySet prioritySet, + Map perPriorityHealth, + Map perPriorityDegraded) { + final int normalizedTotalAvailability = + normalizedTotalAvailability(perPriorityHealth, perPriorityDegraded); + if (normalizedTotalAvailability == 0) { + return PerPriorityLoad.INVALID; + } + + final Map healthyPriorityLoad = new Int2IntOpenHashMap(); + final Map degradedPriorityLoad = new Int2IntOpenHashMap(); + final DistributeLoadState firstHealthyAndRemaining = + distributeLoad(prioritySet.priorities(), healthyPriorityLoad, perPriorityHealth, + 100, normalizedTotalAvailability); + final DistributeLoadState firstDegradedAndRemaining = + distributeLoad(prioritySet.priorities(), degradedPriorityLoad, perPriorityDegraded, + firstHealthyAndRemaining.totalLoad, normalizedTotalAvailability); + final int remainingLoad = firstDegradedAndRemaining.totalLoad; + if (remainingLoad > 0) { + final int firstHealthy = firstHealthyAndRemaining.firstAvailablePriority; + final int firstDegraded = firstDegradedAndRemaining.firstAvailablePriority; + if (firstHealthy != -1) { + healthyPriorityLoad.computeIfPresent(firstHealthy, (k, v) -> v + remainingLoad); + } else { + assert firstDegraded != -1; + degradedPriorityLoad.computeIfPresent(firstDegraded, (k, v) -> v + remainingLoad); + } + } + + assert priorityLoadSum(healthyPriorityLoad, degradedPriorityLoad) == 100; + return new PerPriorityLoad(healthyPriorityLoad, degradedPriorityLoad, + normalizedTotalAvailability); + } + + private static int normalizedTotalAvailability(Map perPriorityHealth, + Map perPriorityDegraded) { + final int totalAvailability = Streams.concat(perPriorityHealth.values().stream(), + perPriorityDegraded.values().stream()) + .reduce(0, IntMath::saturatedAdd).intValue(); + return Math.min(totalAvailability, 100); + } + + private static int priorityLoadSum(Map healthyPriorityLoad, + Map degradedPriorityLoad) { + return Streams.concat(healthyPriorityLoad.values().stream(), + degradedPriorityLoad.values().stream()) + .reduce(0, IntMath::saturatedAdd).intValue(); + } + + private static DistributeLoadState distributeLoad(SortedSet priorities, + Map perPriorityLoad, + Map perPriorityAvailability, + int totalLoad, int normalizedTotalAvailability) { + int firstAvailablePriority = -1; + for (Integer priority: priorities) { + final long availability = perPriorityAvailability.getOrDefault(priority, 0); + if (firstAvailablePriority < 0 && availability > 0) { + firstAvailablePriority = priority; + } + final int load = (int) Math.min(totalLoad, availability * 100 / normalizedTotalAvailability); + perPriorityLoad.put(priority, load); + totalLoad -= load; + } + return new DistributeLoadState(totalLoad, firstAvailablePriority); + } + + private static PerPriorityPanic recalculatePerPriorityPanic(PrioritySet prioritySet, + int normalizedTotalAvailability) { + final int panicThreshold = prioritySet.panicThreshold(); + if (normalizedTotalAvailability == 0 && panicThreshold == 0) { + // there are no hosts available and panic mode is disabled. + // we should always return a null Endpoint for this case. + return PerPriorityPanic.INVALID; + } + boolean totalPanic = true; + final ImmutableMap.Builder perPriorityPanicBuilder = ImmutableMap.builder(); + for (Integer priority : prioritySet.priorities()) { + final HostSet hostSet = prioritySet.hostSets().get(priority); + final boolean isPanic = + normalizedTotalAvailability == 100 ? false : isHostSetInPanic(hostSet, panicThreshold); + perPriorityPanicBuilder.put(priority, isPanic); + totalPanic &= isPanic; + } + return new PerPriorityPanic(perPriorityPanicBuilder.build(), totalPanic); + } + + private static PerPriorityLoad recalculateLoadInTotalPanic(PrioritySet prioritySet) { + final int totalHostsCount = prioritySet.hostSets().values().stream() + .map(hostSet -> hostSet.hosts().size()) + .reduce(0, IntMath::saturatedAdd) + .intValue(); + if (totalHostsCount == 0) { + return PerPriorityLoad.INVALID; + } + int totalLoad = 100; + int firstNoEmpty = -1; + final Map healthyPriorityLoad = + new Int2IntOpenHashMap(prioritySet.priorities().size()); + final Map degradedPriorityLoad = + new Int2IntOpenHashMap(prioritySet.priorities().size()); + for (Integer priority: prioritySet.priorities()) { + final HostSet hostSet = prioritySet.hostSets().get(priority); + final int hostsSize = hostSet.hosts().size(); + if (firstNoEmpty == -1 && hostsSize > 0) { + firstNoEmpty = priority; + } + final int load = 100 * hostsSize / totalHostsCount; + healthyPriorityLoad.put(priority, load); + degradedPriorityLoad.put(priority, 0); + totalLoad -= load; + } + final int remainingLoad = totalLoad; + healthyPriorityLoad.computeIfPresent(firstNoEmpty, (k, v) -> v + remainingLoad); + final int priorityLoadSum = priorityLoadSum(healthyPriorityLoad, degradedPriorityLoad); + assert priorityLoadSum == 100 : "The priority loads not summing up to 100 (" + priorityLoadSum + + ") for cluster (" + prioritySet.cluster().getName() + ')'; + return new PerPriorityLoad(healthyPriorityLoad, degradedPriorityLoad, 100); + } + + private static boolean isHostSetInPanic(HostSet hostSet, int panicThreshold) { + final int hostCount = hostSet.hosts().size(); + final double healthyPercent = + hostCount == 0 ? 0 : 100.0 * hostSet.healthyHosts().size() / hostCount; + final double degradedPercent = + hostCount == 0 ? 0 : 100.0 * hostSet.degradedHosts().size() / hostCount; + return healthyPercent + degradedPercent < panicThreshold; + } + + static class PerPriorityLoad { + final Map healthyPriorityLoad; + final Map degradedPriorityLoad; + private final int normalizedTotalAvailability; + private final boolean forceEmptyEndpoint; + + private static final PerPriorityLoad INVALID = new PerPriorityLoad(); + + private PerPriorityLoad() { + healthyPriorityLoad = Collections.emptyMap(); + degradedPriorityLoad = Collections.emptyMap(); + normalizedTotalAvailability = 0; + forceEmptyEndpoint = true; + } + + PerPriorityLoad(Map healthyPriorityLoad, + Map degradedPriorityLoad, + int normalizedTotalAvailability) { + this.healthyPriorityLoad = ImmutableMap.copyOf(healthyPriorityLoad); + this.degradedPriorityLoad = ImmutableMap.copyOf(degradedPriorityLoad); + this.normalizedTotalAvailability = normalizedTotalAvailability; + forceEmptyEndpoint = false; + } + + int normalizedTotalAvailability() { + return normalizedTotalAvailability; + } + + int getHealthy(int priority) { + return healthyPriorityLoad.getOrDefault(priority, 0); + } + + int getDegraded(int priority) { + return degradedPriorityLoad.getOrDefault(priority, 0); + } + + boolean forceEmptyEndpoint() { + return forceEmptyEndpoint; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("healthyPriorityLoad", healthyPriorityLoad) + .add("degradedPriorityLoad", degradedPriorityLoad) + .add("normalizedTotalAvailability", normalizedTotalAvailability) + .add("forceEmptyEndpoint", forceEmptyEndpoint) + .toString(); + } + } + + static class PerPriorityPanic { + final Map perPriorityPanic; + private final boolean totalPanic; + private final boolean forceEmptyEndpoint; + + static final PerPriorityPanic INVALID = new PerPriorityPanic(); + + private PerPriorityPanic() { + perPriorityPanic = Collections.emptyMap(); + forceEmptyEndpoint = true; + totalPanic = false; + } + + PerPriorityPanic(Map perPriorityPanic, boolean totalPanic) { + this.perPriorityPanic = ImmutableMap.copyOf(perPriorityPanic); + this.totalPanic = totalPanic; + forceEmptyEndpoint = false; + } + + boolean get(int priority) { + return perPriorityPanic.getOrDefault(priority, true); + } + + boolean totalPanic() { + return totalPanic; + } + + boolean forceEmptyEndpoint() { + return forceEmptyEndpoint; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("perPriorityPanic", perPriorityPanic) + .add("totalPanic", totalPanic) + .add("forceEmptyEndpoint", forceEmptyEndpoint) + .toString(); + } + } + + static class DefaultLbState { + private final PrioritySet prioritySet; + private final PerPriorityLoad perPriorityLoad; + private final PerPriorityPanic perPriorityPanic; + + DefaultLbState(PrioritySet prioritySet, + PerPriorityLoad perPriorityLoad, PerPriorityPanic perPriorityPanic) { + this.prioritySet = prioritySet; + this.perPriorityLoad = perPriorityLoad; + this.perPriorityPanic = perPriorityPanic; + } + + PerPriorityPanic perPriorityPanic() { + return perPriorityPanic; + } + + PrioritySet prioritySet() { + return prioritySet; + } + + PerPriorityLoad perPriorityLoad() { + return perPriorityLoad; + } + + @Nullable + PriorityAndAvailability choosePriority(int hash) { + if (perPriorityLoad.forceEmptyEndpoint() || perPriorityPanic.forceEmptyEndpoint()) { + return null; + } + hash = hash % 100 + 1; + int aggregatePercentageLoad = 0; + final PerPriorityLoad perPriorityLoad = perPriorityLoad(); + for (Integer priority: prioritySet.priorities()) { + aggregatePercentageLoad += perPriorityLoad.getHealthy(priority); + if (hash <= aggregatePercentageLoad) { + return new PriorityAndAvailability(priority, HostAvailability.HEALTHY); + } + } + for (Integer priority: prioritySet.priorities()) { + aggregatePercentageLoad += perPriorityLoad.getDegraded(priority); + if (hash <= aggregatePercentageLoad) { + return new PriorityAndAvailability(priority, HostAvailability.DEGRADED); + } + } + // Shouldn't reach here + throw new IllegalStateException("Unable to select a priority for cluster(" + + prioritySet.cluster().getName() + "), hash(" + hash + ')'); + } + } + + private static class HealthAndDegraded { + + static final HealthAndDegraded ZERO = new HealthAndDegraded(0, 0); + + private final int healthWeight; + private final int degradedWeight; + + HealthAndDegraded(int healthWeight, int degradedWeight) { + this.healthWeight = healthWeight; + this.degradedWeight = degradedWeight; + } + } + + private DefaultLbStateFactory() {} +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultLoadBalancer.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultLoadBalancer.java new file mode 100644 index 00000000000..0176817bdcf --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/DefaultLoadBalancer.java @@ -0,0 +1,215 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.google.common.base.Preconditions.checkArgument; + +import java.util.Map; + +import com.google.common.base.MoreObjects; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.client.endpoint.DefaultLbStateFactory.DefaultLbState; + +import io.envoyproxy.envoy.config.core.v3.Locality; + +final class DefaultLoadBalancer implements LoadBalancer { + + private final DefaultLbStateFactory.DefaultLbState lbState; + + DefaultLoadBalancer(PrioritySet prioritySet) { + lbState = DefaultLbStateFactory.newInstance(prioritySet); + } + + @Override + @Nullable + public Endpoint selectNow(ClientRequestContext ctx) { + final PrioritySet prioritySet = lbState.prioritySet(); + if (prioritySet.priorities().isEmpty()) { + return null; + } + final int hash = EndpointUtil.hash(ctx); + final HostsSource hostsSource = hostSourceToUse(lbState, hash); + if (hostsSource == null) { + return null; + } + final HostSet hostSet = prioritySet.hostSets().get(hostsSource.priority); + if (hostSet == null) { + // shouldn't reach here + throw new IllegalStateException("Unable to select a priority for cluster(" + + prioritySet.cluster().getName() + "), hostsSource(" + + hostsSource + ')'); + } + switch (hostsSource.sourceType) { + case ALL_HOSTS: + return hostSet.hostsEndpointGroup().selectNow(ctx); + case HEALTHY_HOSTS: + return hostSet.healthyHostsEndpointGroup().selectNow(ctx); + case DEGRADED_HOSTS: + return hostSet.degradedHostsEndpointGroup().selectNow(ctx); + case LOCALITY_HEALTHY_HOSTS: + final Map healthyLocalities = + hostSet.healthyEndpointGroupPerLocality(); + final EndpointGroup healthyEndpointGroup = healthyLocalities.get(hostsSource.locality); + if (healthyEndpointGroup != null) { + return healthyEndpointGroup.selectNow(ctx); + } + break; + case LOCALITY_DEGRADED_HOSTS: + final Map degradedLocalities = + hostSet.degradedEndpointGroupPerLocality(); + final EndpointGroup degradedEndpointGroup = degradedLocalities.get(hostsSource.locality); + if (degradedEndpointGroup != null) { + return degradedEndpointGroup.selectNow(ctx); + } + break; + default: + throw new Error(); + } + return null; + } + + @Nullable + HostsSource hostSourceToUse(DefaultLbState lbState, int hash) { + final PriorityAndAvailability priorityAndAvailability = lbState.choosePriority(hash); + if (priorityAndAvailability == null) { + return null; + } + final PrioritySet prioritySet = lbState.prioritySet(); + final int priority = priorityAndAvailability.priority; + final HostSet hostSet = prioritySet.hostSets().get(priority); + final HostAvailability hostAvailability = priorityAndAvailability.hostAvailability; + if (lbState.perPriorityPanic().get(priority)) { + if (prioritySet.failTrafficOnPanic()) { + return null; + } else { + return new HostsSource(priority, SourceType.ALL_HOSTS); + } + } + + if (prioritySet.localityWeightedBalancing()) { + final Locality locality; + if (hostAvailability == HostAvailability.DEGRADED) { + locality = hostSet.chooseDegradedLocality(); + } else { + locality = hostSet.chooseHealthyLocality(); + } + if (locality != null) { + return new HostsSource(priority, localitySourceType(hostAvailability), locality); + } + } + + // don't do zone aware routing for now + return new HostsSource(priority, sourceType(hostAvailability), null); + } + + private static SourceType localitySourceType(HostAvailability hostAvailability) { + final SourceType sourceType; + switch (hostAvailability) { + case HEALTHY: + sourceType = SourceType.LOCALITY_HEALTHY_HOSTS; + break; + case DEGRADED: + sourceType = SourceType.LOCALITY_DEGRADED_HOSTS; + break; + default: + throw new Error(); + } + return sourceType; + } + + private static SourceType sourceType(HostAvailability hostAvailability) { + final SourceType sourceType; + switch (hostAvailability) { + case HEALTHY: + sourceType = SourceType.HEALTHY_HOSTS; + break; + case DEGRADED: + sourceType = SourceType.DEGRADED_HOSTS; + break; + default: + throw new Error(); + } + return sourceType; + } + + static class PriorityAndAvailability { + final int priority; + final HostAvailability hostAvailability; + + PriorityAndAvailability(int priority, HostAvailability hostAvailability) { + this.priority = priority; + this.hostAvailability = hostAvailability; + } + } + + static class HostsSource { + final int priority; + final SourceType sourceType; + @Nullable + final Locality locality; + + HostsSource(int priority, SourceType sourceType) { + this(priority, sourceType, null); + } + + HostsSource(int priority, SourceType sourceType, @Nullable Locality locality) { + if (sourceType == SourceType.LOCALITY_HEALTHY_HOSTS || + sourceType == SourceType.LOCALITY_DEGRADED_HOSTS) { + checkArgument(locality != null, "Locality must be non-null for %s", sourceType); + } + this.priority = priority; + this.sourceType = sourceType; + this.locality = locality; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("priority", priority) + .add("sourceType", sourceType) + .add("locality", locality) + .toString(); + } + } + + enum SourceType { + ALL_HOSTS, + HEALTHY_HOSTS, + DEGRADED_HOSTS, + LOCALITY_HEALTHY_HOSTS, + LOCALITY_DEGRADED_HOSTS, + } + + enum HostAvailability { + HEALTHY, + DEGRADED, + } + + static class DistributeLoadState { + final int totalLoad; + final int firstAvailablePriority; + + DistributeLoadState(int totalLoad, int firstAvailablePriority) { + this.totalLoad = totalLoad; + this.firstAvailablePriority = firstAvailablePriority; + } + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointGroupUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointGroupUtil.java new file mode 100644 index 00000000000..049b75b77a3 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointGroupUtil.java @@ -0,0 +1,78 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.function.Predicate; +import java.util.stream.Collectors; + +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; + +import io.envoyproxy.envoy.config.core.v3.Locality; + +final class EndpointGroupUtil { + + static Map> endpointsByLocality(List endpoints) { + return endpoints.stream().collect(Collectors.groupingBy(EndpointUtil::locality)); + } + + static EndpointGroup filter(List endpoints, EndpointSelectionStrategy strategy, + Predicate predicate) { + final List filteredEndpoints = + endpoints.stream().filter(predicate).collect(Collectors.toList()); + return EndpointGroup.of(strategy, filteredEndpoints); + } + + static EndpointGroup filter(EndpointGroup origEndpointGroup, Predicate predicate) { + return filter(origEndpointGroup.endpoints(), origEndpointGroup.selectionStrategy(), predicate); + } + + static Map filterByLocality(Map> endpointsMap, + EndpointSelectionStrategy strategy, + Predicate predicate) { + final ImmutableMap.Builder filteredLocality = ImmutableMap.builder(); + for (Entry> entry: endpointsMap.entrySet()) { + final EndpointGroup endpointGroup = filter(entry.getValue(), strategy, predicate); + if (endpointGroup.endpoints().isEmpty()) { + continue; + } + filteredLocality.put(entry.getKey(), endpointGroup); + } + return filteredLocality.build(); + } + + static Map filterByLocality(Map origLocality, + Predicate predicate) { + final ImmutableMap.Builder filteredLocality = ImmutableMap.builder(); + for (Entry entry: origLocality.entrySet()) { + final EndpointGroup endpointGroup = filter(entry.getValue(), predicate); + if (endpointGroup.endpoints().isEmpty()) { + continue; + } + filteredLocality.put(entry.getKey(), endpointGroup); + } + return filteredLocality.build(); + } + + private EndpointGroupUtil() {} +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointUtil.java index e2c96bd7a1e..af42ad1cc44 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointUtil.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/EndpointUtil.java @@ -16,19 +16,29 @@ package com.linecorp.armeria.xds.client.endpoint; +import java.util.concurrent.ThreadLocalRandom; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.protobuf.Duration; +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; import com.linecorp.armeria.client.endpoint.EndpointWeightTransition; import com.linecorp.armeria.client.endpoint.WeightRampingUpStrategyBuilder; import com.linecorp.armeria.common.annotation.Nullable; import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig; import io.envoyproxy.envoy.config.cluster.v3.Cluster.LbPolicy; import io.envoyproxy.envoy.config.cluster.v3.Cluster.SlowStartConfig; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; final class EndpointUtil { @@ -92,5 +102,89 @@ private static EndpointSelectionStrategy rampingUpSelectionStrategy(SlowStartCon return builder.build(); } + static Locality locality(Endpoint endpoint) { + final LocalityLbEndpoints localityLbEndpoints = localityLbEndpoints(endpoint); + return localityLbEndpoints.hasLocality() ? localityLbEndpoints.getLocality() + : Locality.getDefaultInstance(); + } + + static CoarseHealth coarseHealth(Endpoint endpoint) { + final LbEndpoint lbEndpoint = lbEndpoint(endpoint); + switch (lbEndpoint.getHealthStatus()) { + // Assume UNKNOWN means health check wasn't performed + case UNKNOWN: + case HEALTHY: + return CoarseHealth.HEALTHY; + case DEGRADED: + return CoarseHealth.DEGRADED; + default: + return CoarseHealth.UNHEALTHY; + } + } + + static int hash(ClientRequestContext ctx) { + if (ctx.hasAttr(XdsAttributeKeys.SELECTION_HASH)) { + final Integer selectionHash = ctx.attr(XdsAttributeKeys.SELECTION_HASH); + assert selectionHash != null; + return Math.max(0, selectionHash); + } + return ThreadLocalRandom.current().nextInt(0, Integer.MAX_VALUE); + } + + static int priority(Endpoint endpoint) { + return localityLbEndpoints(endpoint).getPriority(); + } + + static boolean hasLocalityLoadBalancingWeight(Endpoint endpoint) { + return localityLbEndpoints(endpoint).hasLoadBalancingWeight(); + } + + static int localityLoadBalancingWeight(Endpoint endpoint) { + return localityLbEndpoints(endpoint).getLoadBalancingWeight().getValue(); + } + + private static LbEndpoint lbEndpoint(Endpoint endpoint) { + final LbEndpoint lbEndpoint = endpoint.attr(XdsAttributeKeys.LB_ENDPOINT_KEY); + assert lbEndpoint != null; + return lbEndpoint; + } + + private static LocalityLbEndpoints localityLbEndpoints(Endpoint endpoint) { + final LocalityLbEndpoints localityLbEndpoints = endpoint.attr( + XdsAttributeKeys.LOCALITY_LB_ENDPOINTS_KEY); + assert localityLbEndpoints != null; + return localityLbEndpoints; + } + + static int overProvisionFactor(ClusterLoadAssignment clusterLoadAssignment) { + if (!clusterLoadAssignment.hasPolicy()) { + return 140; + } + final Policy policy = clusterLoadAssignment.getPolicy(); + return policy.hasOverprovisioningFactor() ? policy.getOverprovisioningFactor().getValue() : 140; + } + + static boolean weightedPriorityHealth(ClusterLoadAssignment clusterLoadAssignment) { + return clusterLoadAssignment.hasPolicy() ? + clusterLoadAssignment.getPolicy().getWeightedPriorityHealth() : false; + } + + static int panicThreshold(Cluster cluster) { + if (!cluster.hasCommonLbConfig()) { + return 50; + } + final CommonLbConfig commonLbConfig = cluster.getCommonLbConfig(); + if (!commonLbConfig.hasHealthyPanicThreshold()) { + return 50; + } + return Math.min((int) Math.round(commonLbConfig.getHealthyPanicThreshold().getValue()), 100); + } + + enum CoarseHealth { + HEALTHY, + DEGRADED, + UNHEALTHY, + } + private EndpointUtil() {} } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/HostSet.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/HostSet.java new file mode 100644 index 00000000000..a21993a59e0 --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/HostSet.java @@ -0,0 +1,190 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import java.util.List; +import java.util.Map; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Ints; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.internal.client.endpoint.WeightedRandomDistributionSelector; + +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; + +final class HostSet { + + private final boolean weightedPriorityHealth; + private final int overProvisioningFactor; + + private final WeightedRandomDistributionSelector healthyLocalitySelector; + private final WeightedRandomDistributionSelector degradedLocalitySelector; + + private final EndpointGroup hostsEndpointGroup; + private final EndpointGroup healthyHostsEndpointGroup; + private final Map healthyEndpointGroupPerLocality; + private final EndpointGroup degradedHostsEndpointGroup; + private final Map degradedEndpointGroupPerLocality; + + HostSet(UpdateHostsParam params, ClusterLoadAssignment clusterLoadAssignment) { + weightedPriorityHealth = EndpointUtil.weightedPriorityHealth(clusterLoadAssignment); + overProvisioningFactor = EndpointUtil.overProvisionFactor(clusterLoadAssignment); + + healthyLocalitySelector = rebuildLocalityScheduler( + params.healthyHostsPerLocality(), params.hostsPerLocality(), + params.localityWeightsMap(), overProvisioningFactor); + degradedLocalitySelector = rebuildLocalityScheduler( + params.degradedHostsPerLocality(), params.hostsPerLocality(), + params.localityWeightsMap(), overProvisioningFactor); + + hostsEndpointGroup = params.hosts(); + healthyHostsEndpointGroup = params.healthyHosts(); + degradedHostsEndpointGroup = params.degradedHosts(); + healthyEndpointGroupPerLocality = params.healthyHostsPerLocality(); + degradedEndpointGroupPerLocality = params.degradedHostsPerLocality(); + } + + List hosts() { + return hostsEndpointGroup.endpoints(); + } + + EndpointGroup hostsEndpointGroup() { + return hostsEndpointGroup; + } + + List healthyHosts() { + return healthyHostsEndpointGroup.endpoints(); + } + + EndpointGroup healthyHostsEndpointGroup() { + return healthyHostsEndpointGroup; + } + + Map healthyEndpointGroupPerLocality() { + return healthyEndpointGroupPerLocality; + } + + List degradedHosts() { + return degradedHostsEndpointGroup.endpoints(); + } + + EndpointGroup degradedHostsEndpointGroup() { + return degradedHostsEndpointGroup; + } + + Map degradedEndpointGroupPerLocality() { + return degradedEndpointGroupPerLocality; + } + + boolean weightedPriorityHealth() { + return weightedPriorityHealth; + } + + int overProvisioningFactor() { + return overProvisioningFactor; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("hostsEndpointGroup", hostsEndpointGroup) + .add("healthyHostsEndpointGroup", healthyHostsEndpointGroup) + .add("healthyEndpointGroupPerLocality", healthyEndpointGroupPerLocality) + .add("degradedHostsEndpointGroup", degradedHostsEndpointGroup) + .add("degradedEndpointGroupPerLocality", degradedEndpointGroupPerLocality) + .add("weightedPriorityHealth", weightedPriorityHealth) + .add("overProvisioningFactor", overProvisioningFactor) + .toString(); + } + + private static WeightedRandomDistributionSelector rebuildLocalityScheduler( + Map eligibleHostsPerLocality, + Map allHostsPerLocality, + Map localityWeightsMap, + int overProvisioningFactor) { + final ImmutableList.Builder localityWeightsBuilder = ImmutableList.builder(); + for (Locality locality : allHostsPerLocality.keySet()) { + final double effectiveWeight = + effectiveLocalityWeight(locality, eligibleHostsPerLocality, allHostsPerLocality, + localityWeightsMap, overProvisioningFactor); + if (effectiveWeight > 0) { + localityWeightsBuilder.add(new LocalityEntry(locality, effectiveWeight)); + } + } + return new WeightedRandomDistributionSelector<>(localityWeightsBuilder.build()); + } + + static double effectiveLocalityWeight(Locality locality, + Map eligibleHostsPerLocality, + Map allHostsPerLocality, + Map localityWeightsMap, + int overProvisioningFactor) { + final EndpointGroup localityEligibleHosts = + eligibleHostsPerLocality.getOrDefault(locality, EndpointGroup.of()); + final int hostCount = allHostsPerLocality.getOrDefault(locality, EndpointGroup.of()).endpoints().size(); + if (hostCount == 0) { + return 0; + } + // We compute the availability of a locality via: + // (overProvisioningFactor) * (# healthy/degraded of hosts) / (# total hosts) + // https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/locality_weight.html + final double localityAvailabilityRatio = (double) localityEligibleHosts.endpoints().size() / hostCount; + final int weight = localityWeightsMap.getOrDefault(locality, 0); + final double effectiveLocalityAvailabilityRatio = + Math.min(1.0, (overProvisioningFactor / 100.0) * localityAvailabilityRatio); + return weight * effectiveLocalityAvailabilityRatio; + } + + @Nullable + Locality chooseDegradedLocality() { + final LocalityEntry localityEntry = degradedLocalitySelector.select(); + if (localityEntry == null) { + return null; + } + return localityEntry.locality; + } + + @Nullable + Locality chooseHealthyLocality() { + final LocalityEntry localityEntry = healthyLocalitySelector.select(); + if (localityEntry == null) { + return null; + } + return localityEntry.locality; + } + + static class LocalityEntry extends WeightedRandomDistributionSelector.AbstractEntry { + + private final Locality locality; + private final int weight; + + LocalityEntry(Locality locality, double weight) { + this.locality = locality; + this.weight = Ints.saturatedCast(Math.round(weight)); + } + + @Override + public int weight() { + return weight; + } + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/LoadBalancer.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/LoadBalancer.java index 1b382a33764..8c55870c63a 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/LoadBalancer.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/LoadBalancer.java @@ -24,6 +24,4 @@ interface LoadBalancer { @Nullable Endpoint selectNow(ClientRequestContext ctx); - - void prioritySetUpdated(PrioritySet prioritySet); } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PrioritySet.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PrioritySet.java index c3944fb7bd6..a62ca703187 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PrioritySet.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PrioritySet.java @@ -17,26 +17,126 @@ package com.linecorp.armeria.xds.client.endpoint; import java.util.List; +import java.util.Map; +import java.util.SortedSet; +import java.util.TreeSet; -import com.google.common.collect.ImmutableList; +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.common.annotation.Nullable; import com.linecorp.armeria.xds.ClusterSnapshot; +import com.linecorp.armeria.xds.EndpointSnapshot; + +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; final class PrioritySet { - private final List endpoints; + private final Map hostSets; + private final SortedSet priorities; + private final List origEndpoints; private final ClusterSnapshot clusterSnapshot; + private final Cluster cluster; + private final int panicThreshold; - PrioritySet(List endpoints, ClusterSnapshot clusterSnapshot) { - this.endpoints = ImmutableList.copyOf(endpoints); + PrioritySet(ClusterSnapshot clusterSnapshot, Map hostSets, List origEndpoints) { this.clusterSnapshot = clusterSnapshot; + cluster = clusterSnapshot.xdsResource().resource(); + panicThreshold = EndpointUtil.panicThreshold(cluster); + this.hostSets = hostSets; + priorities = new TreeSet<>(hostSets.keySet()); + this.origEndpoints = origEndpoints; + } + + boolean failTrafficOnPanic() { + final CommonLbConfig commonLbConfig = commonLbConfig(); + if (commonLbConfig == null) { + return false; + } + if (!commonLbConfig.hasZoneAwareLbConfig()) { + return false; + } + return commonLbConfig.getZoneAwareLbConfig().getFailTrafficOnPanic(); + } + + @Nullable + private CommonLbConfig commonLbConfig() { + if (!cluster.hasCommonLbConfig()) { + return null; + } + return cluster.getCommonLbConfig(); + } + + boolean localityWeightedBalancing() { + final CommonLbConfig commonLbConfig = commonLbConfig(); + if (commonLbConfig == null) { + return false; + } + return commonLbConfig.hasLocalityWeightedLbConfig(); + } + + int panicThreshold() { + return panicThreshold; } + SortedSet priorities() { + return priorities; + } + + Map hostSets() { + return hostSets; + } + + /** + * Returns the original list of endpoints this priority set was created with. + * This method acts as a temporary measure to keep backwards compatibility with + * {@link SubsetLoadBalancer}. It will be removed once {@link SubsetLoadBalancer} + * is fully implemented. + */ List endpoints() { - return endpoints; + return origEndpoints; + } + + Cluster cluster() { + return cluster; } ClusterSnapshot clusterSnapshot() { return clusterSnapshot; } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("hostSets", hostSets) + .add("cluster", cluster) + .toString(); + } + + static final class PrioritySetBuilder { + + private final ImmutableMap.Builder hostSetsBuilder = ImmutableMap.builder(); + private final ClusterSnapshot clusterSnapshot; + private final List origEndpoints; + private final ClusterLoadAssignment clusterLoadAssignment; + + PrioritySetBuilder(ClusterSnapshot clusterSnapshot, List origEndpoints) { + this.clusterSnapshot = clusterSnapshot; + this.origEndpoints = origEndpoints; + final EndpointSnapshot endpointSnapshot = clusterSnapshot.endpointSnapshot(); + assert endpointSnapshot != null; + clusterLoadAssignment = endpointSnapshot.xdsResource().resource(); + } + + void createHostSet(int priority, UpdateHostsParam params) { + final HostSet hostSet = new HostSet(params, clusterLoadAssignment); + hostSetsBuilder.put(priority, hostSet); + } + + PrioritySet build() { + return new PrioritySet(clusterSnapshot, hostSetsBuilder.build(), origEndpoints); + } + } } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityState.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityState.java new file mode 100644 index 00000000000..4194d7739ff --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityState.java @@ -0,0 +1,73 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.client.endpoint.EndpointGroupUtil.endpointsByLocality; +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.locality; +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.localityLoadBalancingWeight; +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.selectionStrategy; + +import java.util.List; +import java.util.Map; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.xds.ClusterSnapshot; + +import io.envoyproxy.envoy.config.core.v3.Locality; + +final class PriorityState { + private final UpdateHostsParam param; + + PriorityState(List hosts, Map localityWeightsMap, + ClusterSnapshot clusterSnapshot) { + final Map> endpointsPerLocality = endpointsByLocality(hosts); + param = new UpdateHostsParam(hosts, endpointsPerLocality, localityWeightsMap, + selectionStrategy(clusterSnapshot.xdsResource().resource())); + } + + UpdateHostsParam param() { + return param; + } + + static final class PriorityStateBuilder { + + private final ImmutableList.Builder hostsBuilder = ImmutableList.builder(); + private final ImmutableMap.Builder localityWeightsBuilder = + ImmutableMap.builder(); + private final ClusterSnapshot clusterSnapshot; + + PriorityStateBuilder(ClusterSnapshot clusterSnapshot) { + this.clusterSnapshot = clusterSnapshot; + } + + void addEndpoint(Endpoint endpoint) { + hostsBuilder.add(endpoint); + if (locality(endpoint) != Locality.getDefaultInstance() && + EndpointUtil.hasLocalityLoadBalancingWeight(endpoint)) { + localityWeightsBuilder.put(locality(endpoint), localityLoadBalancingWeight(endpoint)); + } + } + + PriorityState build() { + return new PriorityState(hostsBuilder.build(), localityWeightsBuilder.buildKeepingLast(), + clusterSnapshot); + } + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityStateManager.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityStateManager.java new file mode 100644 index 00000000000..30a94e0123f --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/PriorityStateManager.java @@ -0,0 +1,66 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.priority; + +import java.util.List; +import java.util.Map.Entry; +import java.util.SortedMap; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.xds.ClusterSnapshot; + +import it.unimi.dsi.fastutil.ints.Int2ReferenceAVLTreeMap; + +final class PriorityStateManager { + + private final SortedMap priorityStateMap = + new Int2ReferenceAVLTreeMap<>(); + private final ClusterSnapshot clusterSnapshot; + private final List origEndpoints; + + PriorityStateManager(ClusterSnapshot clusterSnapshot, List origEndpoints) { + this.clusterSnapshot = clusterSnapshot; + this.origEndpoints = origEndpoints; + for (Endpoint endpoint : origEndpoints) { + registerEndpoint(endpoint); + } + } + + private void registerEndpoint(Endpoint endpoint) { + final int priority = priority(endpoint); + PriorityState.PriorityStateBuilder builder = priorityStateMap.get(priority); + if (builder == null) { + builder = priorityStateMap.computeIfAbsent( + priority(endpoint), + ignored -> new PriorityState.PriorityStateBuilder(clusterSnapshot)); + } + builder.addEndpoint(endpoint); + } + + PrioritySet build() { + final PrioritySet.PrioritySetBuilder prioritySetBuilder = + new PrioritySet.PrioritySetBuilder(clusterSnapshot, origEndpoints); + for (Entry entry: priorityStateMap.entrySet()) { + final Integer priority = entry.getKey(); + final PriorityState priorityState = entry.getValue().build(); + prioritySetBuilder.createHostSet(priority, priorityState.param()); + } + return prioritySetBuilder.build(); + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetLoadBalancer.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetLoadBalancer.java index 74bbfadd07e..3aa92b4534a 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetLoadBalancer.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/SubsetLoadBalancer.java @@ -43,24 +43,18 @@ final class SubsetLoadBalancer implements LoadBalancer { private static final Logger logger = LoggerFactory.getLogger(SubsetLoadBalancer.class); - @Nullable - private volatile EndpointGroup endpointGroup; + private final EndpointGroup endpointGroup; + + SubsetLoadBalancer(PrioritySet prioritySet) { + endpointGroup = createEndpointGroup(prioritySet); + } @Override @Nullable public Endpoint selectNow(ClientRequestContext ctx) { - final EndpointGroup endpointGroup = this.endpointGroup; - if (endpointGroup == null) { - return null; - } return endpointGroup.selectNow(ctx); } - @Override - public void prioritySetUpdated(PrioritySet prioritySet) { - endpointGroup = createEndpointGroup(prioritySet); - } - private static EndpointGroup createEndpointGroup(PrioritySet prioritySet) { final ClusterSnapshot clusterSnapshot = prioritySet.clusterSnapshot(); final Struct filterMetadata = filterMetadata(clusterSnapshot); diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/UpdateHostsParam.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/UpdateHostsParam.java new file mode 100644 index 00000000000..77f62ed639d --- /dev/null +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/UpdateHostsParam.java @@ -0,0 +1,91 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.client.endpoint.EndpointGroupUtil.filter; +import static com.linecorp.armeria.xds.client.endpoint.EndpointGroupUtil.filterByLocality; +import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.coarseHealth; + +import java.util.List; +import java.util.Map; + +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy; +import com.linecorp.armeria.xds.client.endpoint.EndpointUtil.CoarseHealth; + +import io.envoyproxy.envoy.config.core.v3.Locality; + +/** + * Hosts per partition. + */ +final class UpdateHostsParam { + + private final EndpointGroup hosts; + private final EndpointGroup healthyHosts; + private final EndpointGroup degradedHosts; + private final Map hostsPerLocality; + private final Map healthyHostsPerLocality; + private final Map degradedHostsPerLocality; + private final Map localityWeightsMap; + + UpdateHostsParam(List endpoints, + Map> endpointsPerLocality, + Map localityWeightsMap, + EndpointSelectionStrategy strategy) { + hosts = EndpointGroup.of(strategy, endpoints); + hostsPerLocality = filterByLocality(endpointsPerLocality, strategy, ignored -> true); + healthyHosts = filter(endpoints, strategy, + endpoint -> coarseHealth(endpoint) == CoarseHealth.HEALTHY); + healthyHostsPerLocality = filterByLocality(endpointsPerLocality, strategy, + endpoint -> coarseHealth(endpoint) == CoarseHealth.HEALTHY); + degradedHosts = filter(endpoints, strategy, + endpoint -> coarseHealth(endpoint) == CoarseHealth.DEGRADED); + degradedHostsPerLocality = filterByLocality( + endpointsPerLocality, strategy, + endpoint -> coarseHealth(endpoint) == CoarseHealth.DEGRADED); + this.localityWeightsMap = localityWeightsMap; + } + + EndpointGroup hosts() { + return hosts; + } + + Map hostsPerLocality() { + return hostsPerLocality; + } + + EndpointGroup healthyHosts() { + return healthyHosts; + } + + Map healthyHostsPerLocality() { + return healthyHostsPerLocality; + } + + EndpointGroup degradedHosts() { + return degradedHosts; + } + + Map degradedHostsPerLocality() { + return degradedHostsPerLocality; + } + + Map localityWeightsMap() { + return localityWeightsMap; + } +} diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeAssigningEndpointGroup.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeAssigningEndpointGroup.java index 8c9316e9986..5b3b27d2192 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeAssigningEndpointGroup.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeAssigningEndpointGroup.java @@ -16,8 +16,8 @@ package com.linecorp.armeria.xds.client.endpoint; -import static com.linecorp.armeria.xds.client.endpoint.XdsAttributesKeys.LB_ENDPOINT_KEY; -import static com.linecorp.armeria.xds.client.endpoint.XdsAttributesKeys.LOCALITY_LB_ENDPOINTS_KEY; +import static com.linecorp.armeria.xds.client.endpoint.XdsAttributeKeys.LB_ENDPOINT_KEY; +import static com.linecorp.armeria.xds.client.endpoint.XdsAttributeKeys.LOCALITY_LB_ENDPOINTS_KEY; import java.util.List; import java.util.function.Consumer; diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributesKeys.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeKeys.java similarity index 73% rename from xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributesKeys.java rename to xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeKeys.java index 9b2a531c861..ee4c507590c 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributesKeys.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsAttributeKeys.java @@ -20,12 +20,14 @@ import io.envoyproxy.envoy.config.endpoint.v3.LocalityLbEndpoints; import io.netty.util.AttributeKey; -final class XdsAttributesKeys { +final class XdsAttributeKeys { static final AttributeKey LB_ENDPOINT_KEY = - AttributeKey.valueOf(XdsAttributesKeys.class, "LB_ENDPOINT_KEY"); + AttributeKey.valueOf(XdsAttributeKeys.class, "LB_ENDPOINT_KEY"); static final AttributeKey LOCALITY_LB_ENDPOINTS_KEY = - AttributeKey.valueOf(XdsAttributesKeys.class, "LOCALITY_LB_ENDPOINTS_KEY"); + AttributeKey.valueOf(XdsAttributeKeys.class, "LOCALITY_LB_ENDPOINTS_KEY"); + static final AttributeKey SELECTION_HASH = + AttributeKey.valueOf(XdsAttributeKeys.class, "SELECTION_HASH"); - private XdsAttributesKeys() {} + private XdsAttributeKeys() {} } diff --git a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointUtil.java b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointUtil.java index 3b908bb1b42..34692f97cdd 100644 --- a/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointUtil.java +++ b/xds/src/main/java/com/linecorp/armeria/xds/client/endpoint/XdsEndpointUtil.java @@ -57,7 +57,7 @@ static List convertEndpoints(List endpoints, Struct filterMe checkArgument(filterMetadata.getFieldsCount() > 0, "filterMetadata.getFieldsCount(): %s (expected: > 0)", filterMetadata.getFieldsCount()); final Predicate lbEndpointPredicate = endpoint -> { - final LbEndpoint lbEndpoint = endpoint.attr(XdsAttributesKeys.LB_ENDPOINT_KEY); + final LbEndpoint lbEndpoint = endpoint.attr(XdsAttributeKeys.LB_ENDPOINT_KEY); assert lbEndpoint != null; final Struct endpointMetadata = lbEndpoint.getMetadata().getFilterMetadataOrDefault( SUBSET_LOAD_BALANCING_FILTER_NAME, Struct.getDefaultInstance()); @@ -201,13 +201,13 @@ private static Endpoint convertToEndpoint(LocalityLbEndpoints localityLbEndpoint if (!Strings.isNullOrEmpty(hostname)) { endpoint = Endpoint.of(hostname) .withIpAddr(socketAddress.getAddress()) - .withAttr(XdsAttributesKeys.LB_ENDPOINT_KEY, lbEndpoint) - .withAttr(XdsAttributesKeys.LOCALITY_LB_ENDPOINTS_KEY, localityLbEndpoints) + .withAttr(XdsAttributeKeys.LB_ENDPOINT_KEY, lbEndpoint) + .withAttr(XdsAttributeKeys.LOCALITY_LB_ENDPOINTS_KEY, localityLbEndpoints) .withWeight(weight); } else { endpoint = Endpoint.of(socketAddress.getAddress()) - .withAttr(XdsAttributesKeys.LB_ENDPOINT_KEY, lbEndpoint) - .withAttr(XdsAttributesKeys.LOCALITY_LB_ENDPOINTS_KEY, localityLbEndpoints) + .withAttr(XdsAttributeKeys.LB_ENDPOINT_KEY, lbEndpoint) + .withAttr(XdsAttributeKeys.LOCALITY_LB_ENDPOINTS_KEY, localityLbEndpoints) .withWeight(weight); } if (socketAddress.hasPortValue()) { diff --git a/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java b/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java index 8dca7588ee3..0355afbe801 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/XdsTestResources.java @@ -66,6 +66,7 @@ import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpFilter; import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.Rds; import io.envoyproxy.envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext; +import io.envoyproxy.envoy.type.v3.Percent; public final class XdsTestResources { @@ -74,7 +75,8 @@ public final class XdsTestResources { private XdsTestResources() {} public static LbEndpoint endpoint(String address, int port) { - return endpoint(address, port, Metadata.getDefaultInstance()); + return endpoint(address, port, Metadata.getDefaultInstance(), 1, + HealthStatus.HEALTHY); } public static LbEndpoint endpoint(String address, int port, int weight) { @@ -82,6 +84,15 @@ public static LbEndpoint endpoint(String address, int port, int weight) { HealthStatus.HEALTHY); } + public static LbEndpoint endpoint(String address, int port, HealthStatus healthStatus) { + return endpoint(address, port, Metadata.getDefaultInstance(), 1, healthStatus); + } + + public static LbEndpoint endpoint(String address, int port, HealthStatus healthStatus, + int weight) { + return endpoint(address, port, Metadata.getDefaultInstance(), weight, healthStatus); + } + public static LbEndpoint endpoint(String address, int port, Metadata metadata) { return endpoint(address, port, metadata, 1, HealthStatus.HEALTHY); } @@ -104,6 +115,16 @@ public static LbEndpoint endpoint(String address, int port, Metadata metadata, i .build()).build(); } + public static Locality locality(String region) { + return Locality.newBuilder() + .setRegion(region) + .build(); + } + + public static Percent percent(int percent) { + return Percent.newBuilder().setValue(percent).build(); + } + public static ClusterLoadAssignment loadAssignment(String clusterName, URI uri) { return loadAssignment(clusterName, uri.getHost(), uri.getPort()); } @@ -385,10 +406,30 @@ public static Bootstrap staticBootstrap(Listener listener, Cluster cluster) { public static LocalityLbEndpoints localityLbEndpoints(Locality locality, Collection endpoints) { - return LocalityLbEndpoints.newBuilder() - .addAllLbEndpoints(endpoints) - .setLocality(locality) - .build(); + return localityLbEndpoints(locality, endpoints, -1, 0); + } + + public static LocalityLbEndpoints localityLbEndpoints(Locality locality, + Collection endpoints, + Integer priority) { + return localityLbEndpoints(locality, endpoints, priority, 0); + } + + public static LocalityLbEndpoints localityLbEndpoints(Locality locality, + Collection endpoints, + int priority, + int loadBalancingWeight) { + final LocalityLbEndpoints.Builder builder = LocalityLbEndpoints.newBuilder() + .addAllLbEndpoints(endpoints) + .setLocality(locality); + if (priority >= 0) { + builder.setPriority(priority); + } + if (loadBalancingWeight > 0) { + builder.setLoadBalancingWeight(UInt32Value.of(loadBalancingWeight)); + } + + return builder.build(); } public static LocalityLbEndpoints localityLbEndpoints(Locality locality, LbEndpoint... endpoints) { diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/LocalityTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/LocalityTest.java new file mode 100644 index 00000000000..68e903ac739 --- /dev/null +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/LocalityTest.java @@ -0,0 +1,170 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.XdsTestResources.createStaticCluster; +import static com.linecorp.armeria.xds.XdsTestResources.endpoint; +import static com.linecorp.armeria.xds.XdsTestResources.locality; +import static com.linecorp.armeria.xds.XdsTestResources.localityLbEndpoints; +import static com.linecorp.armeria.xds.XdsTestResources.staticBootstrap; +import static com.linecorp.armeria.xds.XdsTestResources.staticResourceListener; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.xds.ListenerRoot; +import com.linecorp.armeria.xds.XdsBootstrap; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig.Builder; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig.LocalityWeightedLbConfig; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.listener.v3.Listener; + +class LocalityTest { + + private static final Builder LOCALITY_LB_CONFIG = + CommonLbConfig.newBuilder() + .setLocalityWeightedLbConfig(LocalityWeightedLbConfig.getDefaultInstance()); + + @Test + void basicCase() { + final Listener listener = staticResourceListener(); + + final List lbEndpointsA = + ImmutableList.of(endpoint("127.0.0.1", 8080, 1000)); + final List lbEndpointsB = + ImmutableList.of(endpoint("127.0.0.1", 8081, 1)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(locality("regionA"), lbEndpointsA, 0, 9)) + .addEndpoints(localityLbEndpoints(locality("regionB"), lbEndpointsB, 0, 1)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(LOCALITY_LB_CONFIG).build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + final Map countsMap = new HashMap<>(); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + // Regardless of the endpoint weight, the locality weight will be used + // to determine which endpoint group to use + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + for (int i = 0; i < 10; i++) { + final Endpoint selected = endpointGroup.selectNow(ctx); + assertThat(selected).isNotNull(); + countsMap.compute(selected, (k, v) -> v == null ? 1 : v + 1); + } + assertThat(countsMap) + .containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of(Endpoint.of("127.0.0.1", 8080).withWeight(1000), 9, + Endpoint.of("127.0.0.1", 8081).withWeight(1), 1)); + } + } + + @Test + void emptyLocality() { + final Listener listener = staticResourceListener(); + + final List lbEndpointsA = ImmutableList.of(); + final List lbEndpointsB = + ImmutableList.of(endpoint("127.0.0.1", 8081), + endpoint("127.0.0.1", 8081)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints(locality("regionA"), lbEndpointsA)) + .addEndpoints(localityLbEndpoints(locality("regionB"), lbEndpointsB)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(LOCALITY_LB_CONFIG).build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + // regionA won't be selected at all since it is empty + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + } + } + + @Test + void multiPriorityAndLocality() { + final Listener listener = staticResourceListener(); + + final List lbEndpointsA = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY)); + // the unhealthy endpoint won't be selected due to priority selection + final List lbEndpointsB = + ImmutableList.of(endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY)); + final List lbEndpointsC = + ImmutableList.of(endpoint("127.0.0.1", 8082, HealthStatus.HEALTHY)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(locality("regionA"), lbEndpointsA, 0, 9)) + .addEndpoints(localityLbEndpoints(locality("regionB"), lbEndpointsB, 0, 1000)) + .addEndpoints(localityLbEndpoints(locality("regionC"), lbEndpointsC, 0, 1)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(LOCALITY_LB_CONFIG).build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + final Map countsMap = new HashMap<>(); + for (int i = 0; i < 10; i++) { + final Endpoint selected = endpointGroup.selectNow(ctx); + assertThat(selected).isNotNull(); + countsMap.compute(selected, (k, v) -> v == null ? 1 : v + 1); + } + assertThat(countsMap) + .containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of(Endpoint.of("127.0.0.1", 8080), 9, + Endpoint.of("127.0.0.1", 8082), 1)); + } + } +} diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/PriorityTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/PriorityTest.java new file mode 100644 index 00000000000..9921e008fc7 --- /dev/null +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/PriorityTest.java @@ -0,0 +1,388 @@ +/* + * Copyright 2024 LINE Corporation + * + * LINE Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.armeria.xds.client.endpoint; + +import static com.linecorp.armeria.xds.XdsTestResources.createStaticCluster; +import static com.linecorp.armeria.xds.XdsTestResources.endpoint; +import static com.linecorp.armeria.xds.XdsTestResources.localityLbEndpoints; +import static com.linecorp.armeria.xds.XdsTestResources.percent; +import static com.linecorp.armeria.xds.XdsTestResources.staticBootstrap; +import static com.linecorp.armeria.xds.XdsTestResources.staticResourceListener; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.UInt32Value; + +import com.linecorp.armeria.client.ClientRequestContext; +import com.linecorp.armeria.client.Endpoint; +import com.linecorp.armeria.client.endpoint.EndpointGroup; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpRequest; +import com.linecorp.armeria.common.annotation.Nullable; +import com.linecorp.armeria.xds.ListenerRoot; +import com.linecorp.armeria.xds.XdsBootstrap; + +import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; +import io.envoyproxy.envoy.config.cluster.v3.Cluster; +import io.envoyproxy.envoy.config.cluster.v3.Cluster.CommonLbConfig; +import io.envoyproxy.envoy.config.core.v3.HealthStatus; +import io.envoyproxy.envoy.config.core.v3.Locality; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment; +import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment.Policy; +import io.envoyproxy.envoy.config.endpoint.v3.LbEndpoint; +import io.envoyproxy.envoy.config.listener.v3.Listener; +import io.envoyproxy.envoy.type.v3.Percent; + +class PriorityTest { + + @Test + void basicCase() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080), + endpoint("127.0.0.1", 8081), + endpoint("127.0.0.1", 8082)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + } + } + + @Test + void differentWeights() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints = + ImmutableList.of(endpoint("127.0.0.1", 8080, 1), + endpoint("127.0.0.1", 8081, 1), + endpoint("127.0.0.1", 8082, 2)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment.newBuilder() + .addEndpoints(localityLbEndpoints( + Locality.getDefaultInstance(), lbEndpoints)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + } + } + + @Test + void differentPriorities() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY), + endpoint("127.0.0.1", 8081, HealthStatus.DEGRADED)); + final List lbEndpoints1 = + ImmutableList.of(endpoint("127.0.0.1", 8082, HealthStatus.HEALTHY), + endpoint("127.0.0.1", 8083, HealthStatus.DEGRADED)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints1, 1)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + + // default overprovisioning factor (140) * 0.5 = 70 will be routed + // to healthy endpoints for priority 0 + ctx.setAttr(XdsAttributeKeys.SELECTION_HASH, 0); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + ctx.setAttr(XdsAttributeKeys.SELECTION_HASH, 68); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + + // 100 - 70 (priority 0) = 30 will be routed to healthy endpoints for priority 1 + ctx.setAttr(XdsAttributeKeys.SELECTION_HASH, 70); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + ctx.setAttr(XdsAttributeKeys.SELECTION_HASH, 99); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + } + } + + @Test + void degradedEndpoints() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY, 1), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY, 9)); + final List lbEndpoints1 = + ImmutableList.of(endpoint("127.0.0.1", 8082, HealthStatus.HEALTHY, 1), + endpoint("127.0.0.1", 8083, HealthStatus.DEGRADED, 9)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints1, 1)) + // set overprovisioning factor to 100 for simpler calculation + .setPolicy(Policy.newBuilder() + .setOverprovisioningFactor(UInt32Value.of(100)) + .setWeightedPriorityHealth(true)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder() + .setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(Percent.newBuilder() + .setValue(0))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + + // 0 ~ 9 for priority 0 HEALTHY + ctx.setAttr(XdsAttributeKeys.SELECTION_HASH, 0); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + + // 10 ~ 19 for priority 1 HEALTHY + ctx.setAttr(XdsAttributeKeys.SELECTION_HASH, 10); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + + // 20 ~ 99 for priority 1 DEGRADED + ctx.setAttr(XdsAttributeKeys.SELECTION_HASH, 20); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8083)); + } + } + + @Test + void noHosts() { + final Listener listener = staticResourceListener(); + final List lbEndpoints0 = ImmutableList.of(); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder() + .setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(Percent.newBuilder() + .setValue(50))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot, true); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + await().pollDelay(3, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(endpointGroup.selectNow(ctx)).isNull()); + } + } + + @Test + void partialPanic() { + final Listener listener = staticResourceListener(); + + // there are no healthy endpoints in priority0 + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8082, HealthStatus.UNHEALTHY)); + final List lbEndpoints1 = + ImmutableList.of(endpoint("127.0.0.1", 8083, HealthStatus.HEALTHY)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints1, 1)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(Percent.newBuilder() + .setValue(50))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + final ClientRequestContext ctx = + ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + ctx.setAttr(XdsAttributeKeys.SELECTION_HASH, 0); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8083)); + ctx.setAttr(XdsAttributeKeys.SELECTION_HASH, 99); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8083)); + } + } + + @Test + void totalPanic() { + final Listener listener = staticResourceListener(); + + // 0.33 (healthy) * 140 (overprovisioning factor) < 50 (healthyPanicThreshold) + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8082, HealthStatus.UNHEALTHY)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0, 0)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder().setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(Percent.newBuilder() + .setValue(50))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + // When in panic mode, all endpoints are selected regardless of health status + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8080)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8081)); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(Endpoint.of("127.0.0.1", 8082)); + } + } + + @Test + void onlyUnhealthyPanicDisabled() { + final Listener listener = staticResourceListener(); + + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY), + endpoint("127.0.0.1", 8082, HealthStatus.UNHEALTHY)); + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder() + .setCommonLbConfig(CommonLbConfig.newBuilder().setHealthyPanicThreshold(percent(0))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + // When in panic mode, all endpoints are selected regardless of health status + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isNull(); + assertThat(endpointGroup.selectNow(ctx)).isNull(); + assertThat(endpointGroup.selectNow(ctx)).isNull(); + } + } + + private static Stream healthyLoadZeroArgs() { + return Stream.of( + // panic mode routes traffic to all endpoints + Arguments.of(51, Endpoint.of("127.0.0.1", 8080), Endpoint.of("127.0.0.1", 8081)), + // non-panic mode doesn't route traffic + Arguments.of(49, null, null) + ); + } + + @ParameterizedTest + @MethodSource("healthyLoadZeroArgs") + void healthyLoadZero(int healthyPanicThreshold, @Nullable Endpoint endpoint1, + @Nullable Endpoint endpoint2) { + final Listener listener = staticResourceListener(); + final List lbEndpoints0 = + ImmutableList.of(endpoint("127.0.0.1", 8080, HealthStatus.HEALTHY, 1), + endpoint("127.0.0.1", 8081, HealthStatus.UNHEALTHY, 10000)); + + final ClusterLoadAssignment loadAssignment = + ClusterLoadAssignment + .newBuilder() + .addEndpoints(localityLbEndpoints(Locality.getDefaultInstance(), lbEndpoints0)) + .setPolicy(Policy.newBuilder() + .setWeightedPriorityHealth(true)) + .build(); + final Cluster cluster = createStaticCluster("cluster", loadAssignment) + .toBuilder() + .setCommonLbConfig(CommonLbConfig.newBuilder() + .setHealthyPanicThreshold(percent(healthyPanicThreshold))) + .build(); + + final Bootstrap bootstrap = staticBootstrap(listener, cluster); + try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { + final ListenerRoot listenerRoot = xdsBootstrap.listenerRoot("listener"); + final EndpointGroup endpointGroup = XdsEndpointGroup.of(listenerRoot); + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + + // When in panic mode, all endpoints are selected regardless of health status + await().untilAsserted(() -> assertThat(endpointGroup.whenReady()).isDone()); + final ClientRequestContext ctx = ClientRequestContext.of(HttpRequest.of(HttpMethod.GET, "/")); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint1); + assertThat(endpointGroup.selectNow(ctx)).isEqualTo(endpoint2); + } + } +} diff --git a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/XdsConverterUtilTest.java b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/XdsConverterUtilTest.java index cb7a4621145..b4ead223195 100644 --- a/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/XdsConverterUtilTest.java +++ b/xds/src/test/java/com/linecorp/armeria/xds/client/endpoint/XdsConverterUtilTest.java @@ -48,15 +48,15 @@ void convertEndpointsWithFilterMetadata() { final Metadata metadata1 = metadata(ImmutableMap.of("foo", "foo1")); final LbEndpoint lbEndpoint1 = endpoint("127.0.0.1", 8080, metadata1); final Endpoint endpoint1 = Endpoint.of("127.0.0.1", 8080) - .withAttr(XdsAttributesKeys.LB_ENDPOINT_KEY, lbEndpoint1); + .withAttr(XdsAttributeKeys.LB_ENDPOINT_KEY, lbEndpoint1); final Metadata metadata2 = metadata(ImmutableMap.of("foo", "foo1", "bar", "bar2")); final LbEndpoint lbEndpoint2 = endpoint("127.0.0.1", 8081, metadata2); final Endpoint endpoint2 = Endpoint.of("127.0.0.1", 8081) - .withAttr(XdsAttributesKeys.LB_ENDPOINT_KEY, lbEndpoint2); + .withAttr(XdsAttributeKeys.LB_ENDPOINT_KEY, lbEndpoint2); final Metadata metadata3 = metadata(ImmutableMap.of("foo", "foo1", "bar", "bar1", "baz", "baz1")); final LbEndpoint lbEndpoint3 = endpoint("127.0.0.1", 8082, metadata3); final Endpoint endpoint3 = Endpoint.of("127.0.0.1", 8082) - .withAttr(XdsAttributesKeys.LB_ENDPOINT_KEY, lbEndpoint3); + .withAttr(XdsAttributeKeys.LB_ENDPOINT_KEY, lbEndpoint3); final List endpoints = convertEndpoints(ImmutableList.of(endpoint1, endpoint2, endpoint3), Struct.newBuilder() .putFields("foo", stringValue("foo1"))