Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement locality/priority based load balancing #5610

Merged
merged 13 commits into from
Jun 10, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;

Expand All @@ -35,8 +38,11 @@

final class ClusterEntry implements AsyncCloseable {

private static final Logger logger = LoggerFactory.getLogger(ClusterEntry.class);

private final EndpointsPool endpointsPool;
private final LoadBalancer loadBalancer = new SubsetLoadBalancer();
@Nullable
private volatile LoadBalancer loadBalancer;
private final ClusterManager clusterManager;
private final EventExecutor eventExecutor;
private List<Endpoint> endpoints = ImmutableList.of();
Expand All @@ -51,6 +57,10 @@ final class ClusterEntry implements AsyncCloseable {

@Nullable
Endpoint selectNow(ClientRequestContext ctx) {
final LoadBalancer loadBalancer = this.loadBalancer;
if (loadBalancer == null) {
return null;
}
return loadBalancer.selectNow(ctx);
}

Expand All @@ -64,9 +74,16 @@ void updateClusterSnapshot(ClusterSnapshot clusterSnapshot) {

void accept(ClusterSnapshot clusterSnapshot, List<Endpoint> endpoints) {
assert eventExecutor.inEventLoop();
this.endpoints = endpoints;
final PrioritySet prioritySet = new PrioritySet(endpoints, clusterSnapshot);
loadBalancer.prioritySetUpdated(prioritySet);
this.endpoints = ImmutableList.copyOf(endpoints);
final PrioritySet prioritySet = new PriorityStateManager(clusterSnapshot, endpoints).build();
if (logger.isTraceEnabled()) {
logger.trace("XdsEndpointGroup is using a new PrioritySet({})", prioritySet);
}
if (clusterSnapshot.xdsResource().resource().hasLbSubsetConfig()) {
loadBalancer = new SubsetLoadBalancer(prioritySet);
} else {
loadBalancer = new DefaultLoadBalancer(prioritySet);
}
clusterManager.notifyListeners();
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds.client.endpoint;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.Map;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.client.endpoint.DefaultLbStateFactory.DefaultLbState;

import io.envoyproxy.envoy.config.core.v3.Locality;

final class DefaultLoadBalancer implements LoadBalancer {

private final DefaultLbStateFactory.DefaultLbState lbState;

DefaultLoadBalancer(PrioritySet prioritySet) {
lbState = DefaultLbStateFactory.newInstance(prioritySet);
}

@Override
@Nullable
public Endpoint selectNow(ClientRequestContext ctx) {
final PrioritySet prioritySet = lbState.prioritySet();
if (prioritySet.priorities().isEmpty()) {
return null;
}
final int hash = EndpointUtil.hash(ctx);
final HostsSource hostsSource = hostSourceToUse(lbState, hash);
if (hostsSource == null) {
return null;
}
final HostSet hostSet = prioritySet.hostSets().get(hostsSource.priority);
if (hostSet == null) {
// shouldn't reach here
throw new IllegalStateException("Unable to select a priority for cluster(" +
prioritySet.cluster().getName() + "), hostsSource(" +
hostsSource + ')');
}
switch (hostsSource.sourceType) {
case ALL_HOSTS:
return hostSet.hostsEndpointGroup().selectNow(ctx);
case HEALTHY_HOSTS:
return hostSet.healthyHostsEndpointGroup().selectNow(ctx);
case DEGRADED_HOSTS:
return hostSet.degradedHostsEndpointGroup().selectNow(ctx);
case LOCALITY_HEALTHY_HOSTS:
final Map<Locality, EndpointGroup> healthyLocalities =
hostSet.healthyEndpointGroupPerLocality();
final EndpointGroup healthyEndpointGroup = healthyLocalities.get(hostsSource.locality);
if (healthyEndpointGroup != null) {
return healthyEndpointGroup.selectNow(ctx);
}
break;
case LOCALITY_DEGRADED_HOSTS:
final Map<Locality, EndpointGroup> degradedLocalities =
hostSet.degradedEndpointGroupPerLocality();
final EndpointGroup degradedEndpointGroup = degradedLocalities.get(hostsSource.locality);
if (degradedEndpointGroup != null) {
return degradedEndpointGroup.selectNow(ctx);
}
break;
default:
throw new Error();
}
return null;
}

@Nullable
HostsSource hostSourceToUse(DefaultLbState lbState, int hash) {
final PriorityAndAvailability priorityAndAvailability = lbState.choosePriority(hash);
if (priorityAndAvailability == null) {
return null;
}
final PrioritySet prioritySet = lbState.prioritySet();
final int priority = priorityAndAvailability.priority;
final HostSet hostSet = prioritySet.hostSets().get(priority);
final HostAvailability hostAvailability = priorityAndAvailability.hostAvailability;
if (lbState.perPriorityPanic().get(priority)) {
if (prioritySet.failTrafficOnPanic()) {
return null;
} else {
return new HostsSource(priority, SourceType.ALL_HOSTS);
}
}

if (prioritySet.localityWeightedBalancing()) {
final Locality locality;
if (hostAvailability == HostAvailability.DEGRADED) {
locality = hostSet.chooseDegradedLocality();
} else {
locality = hostSet.chooseHealthyLocality();
}
if (locality != null) {
return new HostsSource(priority, localitySourceType(hostAvailability), locality);
}
}

// don't do zone aware routing for now
return new HostsSource(priority, sourceType(hostAvailability), null);
}

private static SourceType localitySourceType(HostAvailability hostAvailability) {
final SourceType sourceType;
switch (hostAvailability) {
case HEALTHY:
sourceType = SourceType.LOCALITY_HEALTHY_HOSTS;
break;
case DEGRADED:
sourceType = SourceType.LOCALITY_DEGRADED_HOSTS;
break;
default:
throw new Error();
}
return sourceType;
}

private static SourceType sourceType(HostAvailability hostAvailability) {
final SourceType sourceType;
switch (hostAvailability) {
case HEALTHY:
sourceType = SourceType.HEALTHY_HOSTS;
break;
case DEGRADED:
sourceType = SourceType.DEGRADED_HOSTS;
break;
default:
throw new Error();
}
return sourceType;
}

static class PriorityAndAvailability {
final int priority;
final HostAvailability hostAvailability;

PriorityAndAvailability(int priority, HostAvailability hostAvailability) {
this.priority = priority;
this.hostAvailability = hostAvailability;
}
}

static class HostsSource {
final int priority;
final SourceType sourceType;
@Nullable
final Locality locality;

HostsSource(int priority, SourceType sourceType) {
this(priority, sourceType, null);
}

HostsSource(int priority, SourceType sourceType, @Nullable Locality locality) {
if (sourceType == SourceType.LOCALITY_HEALTHY_HOSTS ||
sourceType == SourceType.LOCALITY_DEGRADED_HOSTS) {
checkArgument(locality != null, "Locality must be non-null for %s", sourceType);
}
this.priority = priority;
this.sourceType = sourceType;
this.locality = locality;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("priority", priority)
.add("sourceType", sourceType)
.add("locality", locality)
.toString();
}
}

enum SourceType {
ALL_HOSTS,
HEALTHY_HOSTS,
DEGRADED_HOSTS,
LOCALITY_HEALTHY_HOSTS,
LOCALITY_DEGRADED_HOSTS,
}

enum HostAvailability {
HEALTHY,
DEGRADED,
}

static class DistributeLoadState {
final int totalLoad;
final int firstAvailablePriority;

DistributeLoadState(int totalLoad, int firstAvailablePriority) {
this.totalLoad = totalLoad;
this.firstAvailablePriority = firstAvailablePriority;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds.client.endpoint;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;

import io.envoyproxy.envoy.config.core.v3.Locality;

final class EndpointGroupUtil {

static Map<Locality, List<Endpoint>> endpointsByLocality(List<Endpoint> endpoints) {
return endpoints.stream().collect(Collectors.groupingBy(EndpointUtil::locality));
}

static EndpointGroup filter(List<Endpoint> endpoints, EndpointSelectionStrategy strategy,
Predicate<Endpoint> predicate) {
final List<Endpoint> filteredEndpoints =
endpoints.stream().filter(predicate).collect(Collectors.toList());
return EndpointGroup.of(strategy, filteredEndpoints);
}

static EndpointGroup filter(EndpointGroup origEndpointGroup, Predicate<Endpoint> predicate) {
return filter(origEndpointGroup.endpoints(), origEndpointGroup.selectionStrategy(), predicate);
}

static Map<Locality, EndpointGroup> filterByLocality(Map<Locality, List<Endpoint>> endpointsMap,
EndpointSelectionStrategy strategy,
Predicate<Endpoint> predicate) {
final ImmutableMap.Builder<Locality, EndpointGroup> filteredLocality = ImmutableMap.builder();
for (Entry<Locality, List<Endpoint>> entry: endpointsMap.entrySet()) {
final EndpointGroup endpointGroup = filter(entry.getValue(), strategy, predicate);
if (endpointGroup.endpoints().isEmpty()) {
continue;
}
filteredLocality.put(entry.getKey(), endpointGroup);
}
return filteredLocality.build();
}

static Map<Locality, EndpointGroup> filterByLocality(Map<Locality, EndpointGroup> origLocality,
Predicate<Endpoint> predicate) {
final ImmutableMap.Builder<Locality, EndpointGroup> filteredLocality = ImmutableMap.builder();
for (Entry<Locality, EndpointGroup> entry: origLocality.entrySet()) {
final EndpointGroup endpointGroup = filter(entry.getValue(), predicate);
if (endpointGroup.endpoints().isEmpty()) {
continue;
}
filteredLocality.put(entry.getKey(), endpointGroup);
}
return filteredLocality.build();
}

private EndpointGroupUtil() {}
}