Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement locality/priority based load balancing #5610

Merged
merged 13 commits into from
Jun 10, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,27 @@
import com.linecorp.armeria.xds.ClusterSnapshot;
import com.linecorp.armeria.xds.EndpointSnapshot;

import io.envoyproxy.envoy.config.cluster.v3.Cluster;
import io.envoyproxy.envoy.config.endpoint.v3.ClusterLoadAssignment;

final class ClusterEntry implements Consumer<List<Endpoint>>, AsyncCloseable {

private final EndpointGroup endpointGroup;
private final Cluster cluster;
private final ClusterLoadAssignment clusterLoadAssignment;
private final LoadBalancer loadBalancer;
private List<Endpoint> endpoints = ImmutableList.of();

ClusterEntry(ClusterSnapshot clusterSnapshot, ClusterManager clusterManager) {
final EndpointSnapshot endpointSnapshot = clusterSnapshot.endpointSnapshot();
assert endpointSnapshot != null;
loadBalancer = new SubsetLoadBalancer(clusterSnapshot);
cluster = clusterSnapshot.xdsResource().resource();
clusterLoadAssignment = endpointSnapshot.xdsResource().resource();
if (cluster.hasLbSubsetConfig()) {
loadBalancer = new SubsetLoadBalancer(clusterSnapshot);
} else {
loadBalancer = new DefaultLoadBalancer();
}

// The order of adding listeners is important
endpointGroup = XdsEndpointUtil.convertEndpointGroup(clusterSnapshot);
Expand All @@ -58,7 +69,12 @@ Endpoint selectNow(ClientRequestContext ctx) {
@Override
public void accept(List<Endpoint> endpoints) {
this.endpoints = ImmutableList.copyOf(endpoints);
final PrioritySet prioritySet = new PrioritySet(endpoints);
final PriorityStateManager priorityStateManager =
new PriorityStateManager(cluster, clusterLoadAssignment, endpoints);
for (Endpoint endpoint: endpoints) {
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
priorityStateManager.registerEndpoint(endpoint);
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}
final PrioritySet prioritySet = priorityStateManager.build();
loadBalancer.prioritySetUpdated(prioritySet);
}

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds.client.endpoint;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.Map;

import com.linecorp.armeria.client.ClientRequestContext;
import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.common.annotation.Nullable;
import com.linecorp.armeria.xds.client.endpoint.DefaultLbStateFactory.DefaultLbState;

import io.envoyproxy.envoy.config.core.v3.Locality;

final class DefaultLoadBalancer implements LoadBalancer {

@Nullable
private DefaultLbStateFactory.DefaultLbState lbState;

@Override
@Nullable
public Endpoint selectNow(ClientRequestContext ctx) {
final DefaultLbState lbState = this.lbState;
if (lbState == null) {
return null;
}
final PrioritySet prioritySet = lbState.prioritySet();
if (prioritySet.priorities().isEmpty()) {
return null;
}
final int hash = EndpointUtil.hash(ctx);
final HostsSource hostsSource = hostSourceToUse(lbState, hash);
if (hostsSource == null) {
return null;
}
if (!prioritySet.hostSets().containsKey(hostsSource.priority)) {
return null;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question) With the current implementation, does hostsSource.priority always exist so we can't reach here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, let me just throw an exception here to make this clearer 👍

}
final HostSet hostSet = prioritySet.hostSets().get(hostsSource.priority);
switch (hostsSource.sourceType) {
case ALL_HOSTS:
return hostSet.hostsEndpointGroup().selectNow(ctx);
case HEALTHY_HOSTS:
return hostSet.healthyHostsEndpointGroup().selectNow(ctx);
case DEGRADED_HOSTS:
return hostSet.degradedHostsEndpointGroup().selectNow(ctx);
case LOCALITY_HEALTHY_HOSTS:
final Map<Locality, EndpointGroup> healthyLocalities =
hostSet.healthyEndpointGroupPerLocality();
if (healthyLocalities.containsKey(hostsSource.locality)) {
return healthyLocalities.get(hostsSource.locality).selectNow(ctx);
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}
break;
case LOCALITY_DEGRADED_HOSTS:
final Map<Locality, EndpointGroup> degradedLocalities =
hostSet.degradedEndpointGroupPerLocality();
if (degradedLocalities.containsKey(hostsSource.locality)) {
return degradedLocalities.get(hostsSource.locality).selectNow(ctx);
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
}
break;
default:
throw new Error();
}
return null;
}

@Nullable
HostsSource hostSourceToUse(DefaultLbState lbState, int hash) {
final PriorityAndAvailability priorityAndAvailability = lbState.choosePriority(hash);
if (priorityAndAvailability == null) {
return null;
}
final PrioritySet prioritySet = lbState.prioritySet();
final int priority = priorityAndAvailability.priority;
final HostSet hostSet = prioritySet.hostSets().get(priority);
final HostAvailability hostAvailability = priorityAndAvailability.hostAvailability;
if (lbState.perPriorityPanic().get(priority)) {
if (prioritySet.failTrafficOnPanic()) {
return null;
} else {
return new HostsSource(priority, SourceType.ALL_HOSTS);
}
}

if (prioritySet.localityWeightedBalancing()) {
final Locality locality;
if (hostAvailability == HostAvailability.DEGRADED) {
locality = hostSet.chooseDegradedLocality();
} else {
locality = hostSet.chooseHealthyLocality();
}
if (locality != null) {
return new HostsSource(priority, localitySourceType(hostAvailability), locality);
}
}

// don't do zone aware routing for now
return new HostsSource(priority, sourceType(hostAvailability), null);
}

private static SourceType localitySourceType(HostAvailability hostAvailability) {
final SourceType sourceType;
switch (hostAvailability) {
case HEALTHY:
sourceType = SourceType.LOCALITY_HEALTHY_HOSTS;
break;
case DEGRADED:
sourceType = SourceType.LOCALITY_DEGRADED_HOSTS;
break;
default:
throw new Error();
}
return sourceType;
}

private static SourceType sourceType(HostAvailability hostAvailability) {
final SourceType sourceType;
switch (hostAvailability) {
case HEALTHY:
sourceType = SourceType.HEALTHY_HOSTS;
break;
case DEGRADED:
sourceType = SourceType.DEGRADED_HOSTS;
break;
default:
throw new Error();
}
return sourceType;
}

@Override
public void prioritySetUpdated(PrioritySet prioritySet) {
lbState = DefaultLbStateFactory.newInstance(prioritySet);
}

static class PriorityAndAvailability {
final int priority;
final HostAvailability hostAvailability;

PriorityAndAvailability(int priority, HostAvailability hostAvailability) {
this.priority = priority;
this.hostAvailability = hostAvailability;
}
}

static class HostsSource {
final int priority;
final SourceType sourceType;
@Nullable
final Locality locality;

HostsSource(int priority, SourceType sourceType) {
this(priority, sourceType, null);
}

HostsSource(int priority, SourceType sourceType, @Nullable Locality locality) {
if (sourceType == SourceType.LOCALITY_HEALTHY_HOSTS ||
sourceType == SourceType.LOCALITY_DEGRADED_HOSTS) {
checkArgument(locality != null, "Locality must be non-null for %s", sourceType);
}
this.priority = priority;
this.sourceType = sourceType;
this.locality = locality;
}
}

enum SourceType {
ALL_HOSTS,
HEALTHY_HOSTS,
DEGRADED_HOSTS,
LOCALITY_HEALTHY_HOSTS,
LOCALITY_DEGRADED_HOSTS,
}

enum HostAvailability {
HEALTHY,
DEGRADED,
}

static class DistributeLoadState {
final int totalLoad;
final int firstAvailablePriority;

DistributeLoadState(int totalLoad, int firstAvailablePriority) {
this.totalLoad = totalLoad;
this.firstAvailablePriority = firstAvailablePriority;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2024 LINE Corporation
*
* LINE Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.armeria.xds.client.endpoint;

import static com.linecorp.armeria.xds.client.endpoint.EndpointUtil.locality;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableMap;

import com.linecorp.armeria.client.Endpoint;
import com.linecorp.armeria.client.endpoint.EndpointGroup;
import com.linecorp.armeria.client.endpoint.EndpointSelectionStrategy;

import io.envoyproxy.envoy.config.core.v3.Locality;

final class EndpointGroupUtil {

static Map<Locality, List<Endpoint>> endpointsByLocality(List<Endpoint> endpoints) {
final Map<Locality, List<Endpoint>> endpointsPerLocality = new HashMap<>();
for (Endpoint endpoint : endpoints) {
endpointsPerLocality.computeIfAbsent(locality(endpoint), ignored -> new ArrayList<>())
.add(endpoint);
}
jrhee17 marked this conversation as resolved.
Show resolved Hide resolved
return ImmutableMap.copyOf(endpointsPerLocality);
}

static EndpointGroup filter(List<Endpoint> endpoints, EndpointSelectionStrategy strategy,
Predicate<Endpoint> predicate) {
final List<Endpoint> filteredEndpoints =
endpoints.stream().filter(predicate).collect(Collectors.toList());
return EndpointGroup.of(strategy, filteredEndpoints);
}

static EndpointGroup filter(EndpointGroup origEndpointGroup, Predicate<Endpoint> predicate) {
return filter(origEndpointGroup.endpoints(), origEndpointGroup.selectionStrategy(), predicate);
}

static Map<Locality, EndpointGroup> filterByLocality(Map<Locality, List<Endpoint>> endpointsMap,
EndpointSelectionStrategy strategy,
Predicate<Endpoint> predicate) {
final ImmutableMap.Builder<Locality, EndpointGroup> filteredLocality = ImmutableMap.builder();
for (Entry<Locality, List<Endpoint>> entry: endpointsMap.entrySet()) {
final EndpointGroup endpointGroup = filter(entry.getValue(), strategy, predicate);
if (endpointGroup.endpoints().isEmpty()) {
continue;
}
filteredLocality.put(entry.getKey(), endpointGroup);
}
return filteredLocality.build();
}

static Map<Locality, EndpointGroup> filterByLocality(Map<Locality, EndpointGroup> origLocality,
Predicate<Endpoint> predicate) {
final ImmutableMap.Builder<Locality, EndpointGroup> filteredLocality = ImmutableMap.builder();
for (Entry<Locality, EndpointGroup> entry: origLocality.entrySet()) {
final EndpointGroup endpointGroup = filter(entry.getValue(), predicate);
if (endpointGroup.endpoints().isEmpty()) {
continue;
}
filteredLocality.put(entry.getKey(), endpointGroup);
}
return filteredLocality.build();
}

private EndpointGroupUtil() {}
}
Loading
Loading