Skip to content

Commit

Permalink
[grpclb] Use a separate channel arg to enable load reporting filter (g…
Browse files Browse the repository at this point in the history
…rpc#33465)

This avoids overloading the GRPC_ARG_LB_POLICY_NAME channel arg, which
is really intended for passing from the application to the client
channel.
  • Loading branch information
markdroth committed Jun 22, 2023
1 parent 6e95ceb commit 32a46a9
Showing 1 changed file with 18 additions and 13 deletions.
31 changes: 18 additions & 13 deletions src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@
#define GRPC_GRPCLB_DEFAULT_FALLBACK_TIMEOUT_MS 10000
#define GRPC_GRPCLB_DEFAULT_SUBCHANNEL_DELETION_DELAY_MS 10000

// Channel arg used to enable load reporting filter.
#define GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER \
"grpc.internal.grpclb_enable_load_reporting_filter"

namespace grpc_core {

TraceFlag grpc_lb_glb_trace(false, "glb");
Expand Down Expand Up @@ -503,7 +507,7 @@ class GrpcLb : public LoadBalancingPolicy {
void ShutdownLocked() override;

// Helper functions used in UpdateLocked().
absl::Status UpdateBalancerChannelLocked(const ChannelArgs& args);
absl::Status UpdateBalancerChannelLocked();

void CancelBalancerChannelConnectivityWatchLocked();

Expand Down Expand Up @@ -1524,6 +1528,7 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
const bool is_initial_update = lb_channel_ == nullptr;
config_ = args.config;
GPR_ASSERT(config_ != nullptr);
args_ = std::move(args.args);
// Update fallback address list.
fallback_backend_addresses_ = std::move(args.addresses);
if (fallback_backend_addresses_.ok()) {
Expand All @@ -1536,7 +1541,7 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
}
resolution_note_ = std::move(args.resolution_note);
// Update balancer channel.
absl::Status status = UpdateBalancerChannelLocked(args.args);
absl::Status status = UpdateBalancerChannelLocked();
// Update the existing child policy, if any.
if (child_policy_ != nullptr) CreateOrUpdateChildPolicyLocked();
// If this is the initial update, start the fallback-at-startup checks
Expand Down Expand Up @@ -1577,12 +1582,9 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
// helpers for UpdateLocked()
//

absl::Status GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
// Make sure that GRPC_ARG_LB_POLICY_NAME is set in channel args,
// since we use this to trigger the client_load_reporting filter.
args_ = args.Set(GRPC_ARG_LB_POLICY_NAME, "grpclb");
absl::Status GrpcLb::UpdateBalancerChannelLocked() {
// Get balancer addresses.
ServerAddressList balancer_addresses = ExtractBalancerAddresses(args);
ServerAddressList balancer_addresses = ExtractBalancerAddresses(args_);
absl::Status status;
if (balancer_addresses.empty()) {
status = absl::UnavailableError("balancer address list must be non-empty");
Expand All @@ -1591,7 +1593,7 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
auto channel_credentials = channel_control_helper()->GetChannelCredentials();
// Construct args for balancer channel.
ChannelArgs lb_channel_args =
BuildBalancerChannelArgs(response_generator_.get(), args);
BuildBalancerChannelArgs(response_generator_.get(), args_);
// Create balancer channel if needed.
if (lb_channel_ == nullptr) {
std::string uri_str = absl::StrCat("fake:///", server_name_);
Expand All @@ -1603,7 +1605,7 @@ absl::Status GrpcLb::UpdateBalancerChannelLocked(const ChannelArgs& args) {
channelz::ChannelNode* child_channelz_node =
grpc_channel_get_channelz_node(lb_channel_);
channelz::ChannelNode* parent_channelz_node =
args.GetObject<channelz::ChannelNode>();
args_.GetObject<channelz::ChannelNode>();
if (child_channelz_node != nullptr && parent_channelz_node != nullptr) {
parent_channelz_node->AddChildChannel(child_channelz_node->uuid());
parent_channelz_node_ = parent_channelz_node->Ref();
Expand Down Expand Up @@ -1728,8 +1730,10 @@ void GrpcLb::OnFallbackTimerLocked() {
ChannelArgs GrpcLb::CreateChildPolicyArgsLocked(
bool is_backend_from_grpclb_load_balancer) {
ChannelArgs r =
args_.Set(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER,
is_backend_from_grpclb_load_balancer);
args_
.Set(GRPC_ARG_ADDRESS_IS_BACKEND_FROM_GRPCLB_LOAD_BALANCER,
is_backend_from_grpclb_load_balancer)
.Set(GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER, 1);
if (is_backend_from_grpclb_load_balancer) {
r = r.Set(GRPC_ARG_INHIBIT_HEALTH_CHECKING, 1);
}
Expand Down Expand Up @@ -1881,8 +1885,9 @@ void RegisterGrpcLbPolicy(CoreConfiguration::Builder* builder) {
builder->channel_init()->RegisterStage(
GRPC_CLIENT_SUBCHANNEL, GRPC_CHANNEL_INIT_BUILTIN_PRIORITY,
[](ChannelStackBuilder* builder) {
if (builder->channel_args().GetString(GRPC_ARG_LB_POLICY_NAME) ==
"grpclb") {
auto enable = builder->channel_args().GetBool(
GRPC_ARG_GRPCLB_ENABLE_LOAD_REPORTING_FILTER);
if (enable.has_value() && *enable) {
builder->PrependFilter(&ClientLoadReportingFilter::kFilter);
}
return true;
Expand Down

0 comments on commit 32a46a9

Please sign in to comment.