Implement locality/priority based load balancing (#5610)
Motivation:

This pull request implements an equivalent of [EdfLoadBalancerBase](https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/source/common/upstream/load_balancer_impl.h#L508), which is the default load balancer used in Envoy. Most load balancer implementations in Envoy derive from this base class.

The basic load balancer supports [locality weighted load balancing](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/locality_weight) and [priority levels](https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/load_balancing/priority).

Note that, for simplicity, not all functionality has been implemented (e.g. zone-aware load balancing).
Additionally, [SubsetLoadBalancer](https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/source/extensions/load_balancing_policies/subset/subset_lb.h#L135) has not been implemented in this PR to make reviewing easier. The previous `SubsetLoadBalancer` will be replaced in the next (and final) pull request.

This pull request mostly focuses on updating the `LoadBalancer` state when a `ClusterSnapshot` is updated.
The flow is as follows:
1. `ClusterEntry#accept` is called, which indicates that a `ClusterSnapshot` has been updated.
2. A [PriorityStateManager](https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/source/common/upstream/upstream_impl.h#L1347) computes endpoints per priority and locality.
  - https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/source/common/upstream/upstream_impl.cc#L2075
3. A [PrioritySet](https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/envoy/upstream/upstream.h#L512) is created. This `PrioritySet` contains a map of `Integer -> HostSet`, where a [HostSet](https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/envoy/upstream/upstream.h#L382) contains host information grouped by health status and locality.
  - https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/source/common/upstream/upstream_impl.cc#L2099
  - https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/source/common/upstream/upstream_impl.cc#L610
4. `DefaultLbStateFactory` creates an `LbState` for the `DefaultLoadBalancer`. An `LbState` is a convenience object used to avoid potential race conditions. The `LbState` is replaced atomically from the perspective of `DefaultLoadBalancer`.
  - In the Envoy source, each load balancer registers a callback that listens for `HostSet` updates. Once a `HostSet` is updated, the load balancer state is updated.
  - https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/source/common/upstream/load_balancer_impl.cc#L149-L154
  - Essentially, the range 1-100 is partitioned among the priorities, with healthy endpoints taking precedence.
5. Lastly, the load balancer chooses a `HostsSource`, and subsequently a host from the selected `HostSet`.
  - https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/source/common/upstream/load_balancer_impl.cc#L1173
  - https://github.com/envoyproxy/envoy/blob/bdff856f461c2261305b8303285e83d6f2cec627/source/common/upstream/load_balancer_impl.cc#L891-L892
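
The range assignment in step 4 can be illustrated with a small sketch. It is a simplified illustration and not the code in this PR: `PriorityLoadSketch`, `distributeLoad` and `choosePriority` are hypothetical names, the per-priority health percentages are passed in directly, and Envoy's overprovisioning factor, degraded hosts and panic handling are left out. Each priority, in order, claims as much of the remaining 100% as its health allows, and a hash mapped onto 1..100 then selects the priority whose cumulative range contains it.

```java
final class PriorityLoadSketch {

    // Hypothetical helper: healthPercent[i] is the share (0..100) of healthy hosts
    // in priority i. Lower indices are served first; whatever they cannot absorb
    // spills over to the next priority.
    static int[] distributeLoad(int[] healthPercent) {
        final int[] load = new int[healthPercent.length];
        int remaining = 100;
        for (int i = 0; i < healthPercent.length && remaining > 0; i++) {
            load[i] = Math.min(remaining, healthPercent[i]);
            remaining -= load[i];
        }
        return load;
    }

    // Maps the hash onto 1..100 and returns the first priority whose cumulative
    // load covers that value, or -1 if no priority can take traffic.
    static int choosePriority(int[] load, int hash) {
        final int bucket = Math.floorMod(hash, 100) + 1;
        int cumulative = 0;
        for (int i = 0; i < load.length; i++) {
            cumulative += load[i];
            if (bucket <= cumulative) {
                return i;
            }
        }
        return -1;
    }

    private PriorityLoadSketch() {}
}
```

With health percentages of 70 and 100, priority 0 owns the values 1 to 70 and priority 1 owns 71 to 100 under these assumptions.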

ref: #5450

Modifications:

- `ClusterEntry` now creates a `PrioritySet` for each `ClusterSnapshot` update
  - In order to create a `PrioritySet`, the utility classes `PriorityStateManager`, `PriorityState`, `HostSet`, and `UpdateHostsParam` have been created.
  - The utility classes `EndpointUtil` and `EndpointGroupUtil` have been created.
- `DefaultLoadBalancer` has been introduced. The functionality of this `LoadBalancer` is equivalent to `EdfLoadBalancer`.
- `DefaultLbStateFactory` has been introduced to facilitate creating a state for `DefaultLoadBalancer`. The `LbState` created by `DefaultLbStateFactory` is intended to be updated atomically from the perspective of `DefaultLoadBalancer`.
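
The atomic replacement mentioned above can be sketched as follows. This is a minimal illustration of the general pattern (an immutable state object published through a single `volatile` write and read once per selection), not the actual `DefaultLbStateFactory` code; `SnapshotHolderSketch`, `State` and the `String` hosts are hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class SnapshotHolderSketch {

    // Immutable state: once constructed, it never changes.
    static final class State {
        final List<String> hosts;

        State(List<String> hosts) {
            this.hosts = Collections.unmodifiableList(new ArrayList<>(hosts));
        }
    }

    // Null until the first update arrives.
    private volatile State state;

    // Builds a complete new state and publishes it with a single volatile write.
    void update(List<String> hosts) {
        state = new State(hosts);
    }

    // Reads the field once so that the whole selection sees one consistent state,
    // even if update() runs concurrently.
    String select(int hash) {
        final State state = this.state;
        if (state == null || state.hosts.isEmpty()) {
            return null;
        }
        return state.hosts.get(Math.floorMod(hash, state.hosts.size()));
    }
}
```

Because readers never observe a half-built state, no locking is needed on the selection path in this sketch.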

Result:

- Priority- and locality-based load balancing is now available for `XdsEndpointGroup`.

jrhee17 committed Jun 10, 2024
1 parent b3da82e commit b47d6c9
Showing 19 changed files with 1,954 additions and 41 deletions.
@@ -21,6 +21,9 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 
@@ -35,8 +38,11 @@

 final class ClusterEntry implements AsyncCloseable {
 
+    private static final Logger logger = LoggerFactory.getLogger(ClusterEntry.class);
+
     private final EndpointsPool endpointsPool;
-    private final LoadBalancer loadBalancer = new SubsetLoadBalancer();
+    @Nullable
+    private volatile LoadBalancer loadBalancer;
     private final ClusterManager clusterManager;
     private final EventExecutor eventExecutor;
     private List<Endpoint> endpoints = ImmutableList.of();
@@ -51,6 +57,10 @@ final class ClusterEntry implements AsyncCloseable {

     @Nullable
     Endpoint selectNow(ClientRequestContext ctx) {
+        final LoadBalancer loadBalancer = this.loadBalancer;
+        if (loadBalancer == null) {
+            return null;
+        }
         return loadBalancer.selectNow(ctx);
     }

@@ -64,9 +74,16 @@ void updateClusterSnapshot(ClusterSnapshot clusterSnapshot) {

     void accept(ClusterSnapshot clusterSnapshot, List<Endpoint> endpoints) {
         assert eventExecutor.inEventLoop();
-        this.endpoints = endpoints;
-        final PrioritySet prioritySet = new PrioritySet(endpoints, clusterSnapshot);
-        loadBalancer.prioritySetUpdated(prioritySet);
+        this.endpoints = ImmutableList.copyOf(endpoints);
+        final PrioritySet prioritySet = new PriorityStateManager(clusterSnapshot, endpoints).build();
+        if (logger.isTraceEnabled()) {
+            logger.trace("XdsEndpointGroup is using a new PrioritySet({})", prioritySet);
+        }
+        if (clusterSnapshot.xdsResource().resource().hasLbSubsetConfig()) {
+            loadBalancer = new SubsetLoadBalancer(prioritySet);
+        } else {
+            loadBalancer = new DefaultLoadBalancer(prioritySet);
+        }
         clusterManager.notifyListeners();
     }


Large diffs are not rendered by default.

@@ -0,0 +1,215 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds.client.endpoint;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.Map;

import com.google.common.base.MoreObjects;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.client.endpoint.DefaultLbStateFactory.DefaultLbState;

import io.envoyproxy.envoy.config.core.v3.Locality;

final class DefaultLoadBalancer implements LoadBalancer {

    private final DefaultLbStateFactory.DefaultLbState lbState;

    DefaultLoadBalancer(PrioritySet prioritySet) {
        lbState = DefaultLbStateFactory.newInstance(prioritySet);
    }

    @Override
    @Nullable
    public Endpoint selectNow(ClientRequestContext ctx) {
        final PrioritySet prioritySet = lbState.prioritySet();
        if (prioritySet.priorities().isEmpty()) {
            return null;
        }
        final int hash = EndpointUtil.hash(ctx);
        final HostsSource hostsSource = hostSourceToUse(lbState, hash);
        if (hostsSource == null) {
            return null;
        }
        final HostSet hostSet = prioritySet.hostSets().get(hostsSource.priority);
        if (hostSet == null) {
            // shouldn't reach here
            throw new IllegalStateException("Unable to select a priority for cluster(" +
                                            prioritySet.cluster().getName() + "), hostsSource(" +
                                            hostsSource + ')');
        }
        switch (hostsSource.sourceType) {
            case ALL_HOSTS:
                return hostSet.hostsEndpointGroup().selectNow(ctx);
            case HEALTHY_HOSTS:
                return hostSet.healthyHostsEndpointGroup().selectNow(ctx);
            case DEGRADED_HOSTS:
                return hostSet.degradedHostsEndpointGroup().selectNow(ctx);
            case LOCALITY_HEALTHY_HOSTS:
                final Map<Locality, EndpointGroup> healthyLocalities =
                        hostSet.healthyEndpointGroupPerLocality();
                final EndpointGroup healthyEndpointGroup = healthyLocalities.get(hostsSource.locality);
                if (healthyEndpointGroup != null) {
                    return healthyEndpointGroup.selectNow(ctx);
                }
                break;
            case LOCALITY_DEGRADED_HOSTS:
                final Map<Locality, EndpointGroup> degradedLocalities =
                        hostSet.degradedEndpointGroupPerLocality();
                final EndpointGroup degradedEndpointGroup = degradedLocalities.get(hostsSource.locality);
                if (degradedEndpointGroup != null) {
                    return degradedEndpointGroup.selectNow(ctx);
                }
                break;
            default:
                throw new Error();
        }
        return null;
    }

    @Nullable
    HostsSource hostSourceToUse(DefaultLbState lbState, int hash) {
        final PriorityAndAvailability priorityAndAvailability = lbState.choosePriority(hash);
        if (priorityAndAvailability == null) {
            return null;
        }
        final PrioritySet prioritySet = lbState.prioritySet();
        final int priority = priorityAndAvailability.priority;
        final HostSet hostSet = prioritySet.hostSets().get(priority);
        final HostAvailability hostAvailability = priorityAndAvailability.hostAvailability;
        if (lbState.perPriorityPanic().get(priority)) {
            if (prioritySet.failTrafficOnPanic()) {
                return null;
            } else {
                return new HostsSource(priority, SourceType.ALL_HOSTS);
            }
        }

        if (prioritySet.localityWeightedBalancing()) {
            final Locality locality;
            if (hostAvailability == HostAvailability.DEGRADED) {
                locality = hostSet.chooseDegradedLocality();
            } else {
                locality = hostSet.chooseHealthyLocality();
            }
            if (locality != null) {
                return new HostsSource(priority, localitySourceType(hostAvailability), locality);
            }
        }

        // don't do zone aware routing for now
        return new HostsSource(priority, sourceType(hostAvailability), null);
    }

    private static SourceType localitySourceType(HostAvailability hostAvailability) {
        final SourceType sourceType;
        switch (hostAvailability) {
            case HEALTHY:
                sourceType = SourceType.LOCALITY_HEALTHY_HOSTS;
                break;
            case DEGRADED:
                sourceType = SourceType.LOCALITY_DEGRADED_HOSTS;
                break;
            default:
                throw new Error();
        }
        return sourceType;
    }

    private static SourceType sourceType(HostAvailability hostAvailability) {
        final SourceType sourceType;
        switch (hostAvailability) {
            case HEALTHY:
                sourceType = SourceType.HEALTHY_HOSTS;
                break;
            case DEGRADED:
                sourceType = SourceType.DEGRADED_HOSTS;
                break;
            default:
                throw new Error();
        }
        return sourceType;
    }

    static class PriorityAndAvailability {
        final int priority;
        final HostAvailability hostAvailability;

        PriorityAndAvailability(int priority, HostAvailability hostAvailability) {
            this.priority = priority;
            this.hostAvailability = hostAvailability;
        }
    }

    static class HostsSource {
        final int priority;
        final SourceType sourceType;
        @Nullable
        final Locality locality;

        HostsSource(int priority, SourceType sourceType) {
            this(priority, sourceType, null);
        }

        HostsSource(int priority, SourceType sourceType, @Nullable Locality locality) {
            if (sourceType == SourceType.LOCALITY_HEALTHY_HOSTS ||
                sourceType == SourceType.LOCALITY_DEGRADED_HOSTS) {
                checkArgument(locality != null, "Locality must be non-null for %s", sourceType);
            }
            this.priority = priority;
            this.sourceType = sourceType;
            this.locality = locality;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                              .add("priority", priority)
                              .add("sourceType", sourceType)
                              .add("locality", locality)
                              .toString();
        }
    }

    enum SourceType {
        ALL_HOSTS,
        HEALTHY_HOSTS,
        DEGRADED_HOSTS,
        LOCALITY_HEALTHY_HOSTS,
        LOCALITY_DEGRADED_HOSTS,
    }

    enum HostAvailability {
        HEALTHY,
        DEGRADED,
    }

    static class DistributeLoadState {
        final int totalLoad;
        final int firstAvailablePriority;

        DistributeLoadState(int totalLoad, int firstAvailablePriority) {
            this.totalLoad = totalLoad;
            this.firstAvailablePriority = firstAvailablePriority;
        }
    }
}
@@ -0,0 +1,78 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds.client.endpoint;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;

import io.envoyproxy.envoy.config.core.v3.Locality;

final class EndpointGroupUtil {

    static Map<Locality, List<Endpoint>> endpointsByLocality(List<Endpoint> endpoints) {
        return endpoints.stream().collect(Collectors.groupingBy(EndpointUtil::locality));
    }

    static EndpointGroup filter(List<Endpoint> endpoints, EndpointSelectionStrategy strategy,
                                Predicate<Endpoint> predicate) {
        final List<Endpoint> filteredEndpoints =
                endpoints.stream().filter(predicate).collect(Collectors.toList());
        return EndpointGroup.of(strategy, filteredEndpoints);
    }

    static EndpointGroup filter(EndpointGroup origEndpointGroup, Predicate<Endpoint> predicate) {
        return filter(origEndpointGroup.endpoints(), origEndpointGroup.selectionStrategy(), predicate);
    }

    static Map<Locality, EndpointGroup> filterByLocality(Map<Locality, List<Endpoint>> endpointsMap,
                                                         EndpointSelectionStrategy strategy,
                                                         Predicate<Endpoint> predicate) {
        final ImmutableMap.Builder<Locality, EndpointGroup> filteredLocality = ImmutableMap.builder();
        for (Entry<Locality, List<Endpoint>> entry: endpointsMap.entrySet()) {
            final EndpointGroup endpointGroup = filter(entry.getValue(), strategy, predicate);
            if (endpointGroup.endpoints().isEmpty()) {
                continue;
            }
            filteredLocality.put(entry.getKey(), endpointGroup);
        }
        return filteredLocality.build();
    }

    static Map<Locality, EndpointGroup> filterByLocality(Map<Locality, EndpointGroup> origLocality,
                                                         Predicate<Endpoint> predicate) {
        final ImmutableMap.Builder<Locality, EndpointGroup> filteredLocality = ImmutableMap.builder();
        for (Entry<Locality, EndpointGroup> entry: origLocality.entrySet()) {
            final EndpointGroup endpointGroup = filter(entry.getValue(), predicate);
            if (endpointGroup.endpoints().isEmpty()) {
                continue;
            }
            filteredLocality.put(entry.getKey(), endpointGroup);
        }
        return filteredLocality.build();
    }

    private EndpointGroupUtil() {}
}
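
The group-then-filter pattern used by `EndpointGroupUtil` can be tried out with plain JDK types. The sketch below is an illustration under stated assumptions and does not use the Armeria or Envoy classes: `LocalityGroupingSketch`, `Host`, `byZone` and `filterByZone` are hypothetical names, with a `String` zone standing in for `Locality` and a `List<Host>` standing in for `EndpointGroup`. As in the code above, localities left empty by the filter are dropped from the result.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

final class LocalityGroupingSketch {

    // Hypothetical stand-in for an endpoint carrying locality and health information.
    static final class Host {
        final String address;
        final String zone;
        final boolean healthy;

        Host(String address, String zone, boolean healthy) {
            this.address = address;
            this.zone = zone;
            this.healthy = healthy;
        }
    }

    // Groups hosts by zone, analogous to grouping endpoints by locality.
    static Map<String, List<Host>> byZone(List<Host> hosts) {
        return hosts.stream().collect(Collectors.groupingBy(host -> host.zone));
    }

    // Applies the predicate within each zone and omits zones with no remaining hosts.
    static Map<String, List<Host>> filterByZone(Map<String, List<Host>> hostsByZone,
                                                Predicate<Host> predicate) {
        final Map<String, List<Host>> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, List<Host>> entry : hostsByZone.entrySet()) {
            final List<Host> kept =
                    entry.getValue().stream().filter(predicate).collect(Collectors.toList());
            if (kept.isEmpty()) {
                continue;
            }
            filtered.put(entry.getKey(), kept);
        }
        return filtered;
    }

    private LocalityGroupingSketch() {}
}
```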