diff --git a/BUILD b/BUILD
index 348014d7db9e4..78c404c6dd816 100644
--- a/BUILD
+++ b/BUILD
@@ -2917,12 +2917,12 @@ grpc_cc_library(
)
grpc_cc_library(
- name = "server_address",
+ name = "endpoint_addresses",
srcs = [
- "//src/core:lib/resolver/server_address.cc",
+ "//src/core:lib/resolver/endpoint_addresses.cc",
],
hdrs = [
- "//src/core:lib/resolver/server_address.h",
+ "//src/core:lib/resolver/endpoint_addresses.h",
],
external_deps = [
"absl/status",
@@ -2932,6 +2932,7 @@ grpc_cc_library(
language = "c++",
visibility = ["@grpc:client_channel"],
deps = [
+ "gpr",
"gpr_platform",
"sockaddr_utils",
"//src/core:channel_args",
@@ -2940,6 +2941,19 @@ grpc_cc_library(
],
)
+grpc_cc_library(
+ name = "server_address",
+ hdrs = [
+ "//src/core:lib/resolver/server_address.h",
+ ],
+ language = "c++",
+ visibility = ["@grpc:client_channel"],
+ deps = [
+ "endpoint_addresses",
+ "gpr_public_hdrs",
+ ],
+)
+
grpc_cc_library(
name = "grpc_resolver",
srcs = [
@@ -2960,11 +2974,11 @@ grpc_cc_library(
language = "c++",
visibility = ["@grpc:client_channel"],
deps = [
+ "endpoint_addresses",
"gpr",
"grpc_trace",
"orphanable",
"ref_counted_ptr",
- "server_address",
"uri_parser",
"//src/core:channel_args",
"//src/core:grpc_service_config",
@@ -3049,6 +3063,7 @@ grpc_cc_library(
"config",
"config_vars",
"debug_location",
+ "endpoint_addresses",
"exec_ctx",
"gpr",
"grpc_base",
@@ -3065,7 +3080,6 @@ grpc_cc_library(
"promise",
"protobuf_duration_upb",
"ref_counted_ptr",
- "server_address",
"sockaddr_utils",
"stats",
"uri_parser",
@@ -3163,6 +3177,7 @@ grpc_cc_library(
"config",
"config_vars",
"debug_location",
+ "endpoint_addresses",
"exec_ctx",
"gpr",
"grpc_base",
@@ -3174,7 +3189,6 @@ grpc_cc_library(
"orphanable",
"parse_address",
"ref_counted_ptr",
- "server_address",
"sockaddr_utils",
"uri_parser",
"//src/core:channel_args",
@@ -3617,9 +3631,9 @@ grpc_cc_library(
language = "c++",
visibility = ["@grpc:grpclb"],
deps = [
+ "endpoint_addresses",
"gpr_platform",
"grpc_public_hdrs",
- "server_address",
"//src/core:channel_args",
"//src/core:useful",
],
@@ -3737,12 +3751,12 @@ grpc_cc_library(
deps = [
"config",
"debug_location",
+ "endpoint_addresses",
"gpr",
"grpc_public_hdrs",
"grpc_resolver",
"orphanable",
"ref_counted_ptr",
- "server_address",
"uri_parser",
"work_serializer",
"//src/core:channel_args",
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 32b41e65245c4..842d20fc58a32 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -2351,9 +2351,9 @@ add_library(grpc
src/core/lib/promise/party.cc
src/core/lib/promise/sleep.cc
src/core/lib/promise/trace.cc
+ src/core/lib/resolver/endpoint_addresses.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
- src/core/lib/resolver/server_address.cc
src/core/lib/resource_quota/api.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
@@ -3062,9 +3062,9 @@ add_library(grpc_unsecure
src/core/lib/promise/party.cc
src/core/lib/promise/sleep.cc
src/core/lib/promise/trace.cc
+ src/core/lib/resolver/endpoint_addresses.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
- src/core/lib/resolver/server_address.cc
src/core/lib/resource_quota/api.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
@@ -5000,9 +5000,9 @@ add_library(grpc_authorization_provider
src/core/lib/promise/activity.cc
src/core/lib/promise/party.cc
src/core/lib/promise/trace.cc
+ src/core/lib/resolver/endpoint_addresses.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
- src/core/lib/resolver/server_address.cc
src/core/lib/resource_quota/api.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
@@ -24016,9 +24016,9 @@ add_executable(test_core_transport_chaotic_good_frame_test
src/core/lib/promise/activity.cc
src/core/lib/promise/party.cc
src/core/lib/promise/trace.cc
+ src/core/lib/resolver/endpoint_addresses.cc
src/core/lib/resolver/resolver.cc
src/core/lib/resolver/resolver_registry.cc
- src/core/lib/resolver/server_address.cc
src/core/lib/resource_quota/api.cc
src/core/lib/resource_quota/arena.cc
src/core/lib/resource_quota/memory_quota.cc
diff --git a/Makefile b/Makefile
index bea7a4683f459..0ab9ea9c5ead4 100644
--- a/Makefile
+++ b/Makefile
@@ -1579,9 +1579,9 @@ LIBGRPC_SRC = \
src/core/lib/promise/party.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/trace.cc \
+ src/core/lib/resolver/endpoint_addresses.cc \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver_registry.cc \
- src/core/lib/resolver/server_address.cc \
src/core/lib/resource_quota/api.cc \
src/core/lib/resource_quota/arena.cc \
src/core/lib/resource_quota/memory_quota.cc \
@@ -2144,9 +2144,9 @@ LIBGRPC_UNSECURE_SRC = \
src/core/lib/promise/party.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/trace.cc \
+ src/core/lib/resolver/endpoint_addresses.cc \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver_registry.cc \
- src/core/lib/resolver/server_address.cc \
src/core/lib/resource_quota/api.cc \
src/core/lib/resource_quota/arena.cc \
src/core/lib/resource_quota/memory_quota.cc \
diff --git a/Package.swift b/Package.swift
index 70d7105c703b3..8dfd76a7fc2d5 100644
--- a/Package.swift
+++ b/Package.swift
@@ -1474,12 +1474,13 @@ let package = Package(
"src/core/lib/promise/trace.cc",
"src/core/lib/promise/trace.h",
"src/core/lib/promise/try_seq.h",
+ "src/core/lib/resolver/endpoint_addresses.cc",
+ "src/core/lib/resolver/endpoint_addresses.h",
"src/core/lib/resolver/resolver.cc",
"src/core/lib/resolver/resolver.h",
"src/core/lib/resolver/resolver_factory.h",
"src/core/lib/resolver/resolver_registry.cc",
"src/core/lib/resolver/resolver_registry.h",
- "src/core/lib/resolver/server_address.cc",
"src/core/lib/resolver/server_address.h",
"src/core/lib/resource_quota/api.cc",
"src/core/lib/resource_quota/api.h",
diff --git a/build_autogenerated.yaml b/build_autogenerated.yaml
index a709fed35d65c..8a2e61860023e 100644
--- a/build_autogenerated.yaml
+++ b/build_autogenerated.yaml
@@ -879,6 +879,7 @@ libs:
- src/core/lib/promise/sleep.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_seq.h
+ - src/core/lib/resolver/endpoint_addresses.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
- src/core/lib/resolver/resolver_registry.h
@@ -1655,9 +1656,9 @@ libs:
- src/core/lib/promise/party.cc
- src/core/lib/promise/sleep.cc
- src/core/lib/promise/trace.cc
+ - src/core/lib/resolver/endpoint_addresses.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
- - src/core/lib/resolver/server_address.cc
- src/core/lib/resource_quota/api.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
@@ -2288,6 +2289,7 @@ libs:
- src/core/lib/promise/sleep.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_seq.h
+ - src/core/lib/resolver/endpoint_addresses.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
- src/core/lib/resolver/resolver_registry.h
@@ -2680,9 +2682,9 @@ libs:
- src/core/lib/promise/party.cc
- src/core/lib/promise/sleep.cc
- src/core/lib/promise/trace.cc
+ - src/core/lib/resolver/endpoint_addresses.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
- - src/core/lib/resolver/server_address.cc
- src/core/lib/resource_quota/api.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
@@ -4293,10 +4295,10 @@ libs:
- src/core/lib/promise/seq.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_seq.h
+ - src/core/lib/resolver/endpoint_addresses.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
- src/core/lib/resolver/resolver_registry.h
- - src/core/lib/resolver/server_address.h
- src/core/lib/resource_quota/api.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
@@ -4568,9 +4570,9 @@ libs:
- src/core/lib/promise/activity.cc
- src/core/lib/promise/party.cc
- src/core/lib/promise/trace.cc
+ - src/core/lib/resolver/endpoint_addresses.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
- - src/core/lib/resolver/server_address.cc
- src/core/lib/resource_quota/api.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
@@ -15815,10 +15817,10 @@ targets:
- src/core/lib/promise/seq.h
- src/core/lib/promise/trace.h
- src/core/lib/promise/try_seq.h
+ - src/core/lib/resolver/endpoint_addresses.h
- src/core/lib/resolver/resolver.h
- src/core/lib/resolver/resolver_factory.h
- src/core/lib/resolver/resolver_registry.h
- - src/core/lib/resolver/server_address.h
- src/core/lib/resource_quota/api.h
- src/core/lib/resource_quota/arena.h
- src/core/lib/resource_quota/memory_quota.h
@@ -16072,9 +16074,9 @@ targets:
- src/core/lib/promise/activity.cc
- src/core/lib/promise/party.cc
- src/core/lib/promise/trace.cc
+ - src/core/lib/resolver/endpoint_addresses.cc
- src/core/lib/resolver/resolver.cc
- src/core/lib/resolver/resolver_registry.cc
- - src/core/lib/resolver/server_address.cc
- src/core/lib/resource_quota/api.cc
- src/core/lib/resource_quota/arena.cc
- src/core/lib/resource_quota/memory_quota.cc
diff --git a/config.m4 b/config.m4
index 069ee44289015..cb82dcb5645e8 100644
--- a/config.m4
+++ b/config.m4
@@ -712,9 +712,9 @@ if test "$PHP_GRPC" != "no"; then
src/core/lib/promise/party.cc \
src/core/lib/promise/sleep.cc \
src/core/lib/promise/trace.cc \
+ src/core/lib/resolver/endpoint_addresses.cc \
src/core/lib/resolver/resolver.cc \
src/core/lib/resolver/resolver_registry.cc \
- src/core/lib/resolver/server_address.cc \
src/core/lib/resource_quota/api.cc \
src/core/lib/resource_quota/arena.cc \
src/core/lib/resource_quota/memory_quota.cc \
diff --git a/config.w32 b/config.w32
index 25e49a76910af..34136004e7f62 100644
--- a/config.w32
+++ b/config.w32
@@ -677,9 +677,9 @@ if (PHP_GRPC != "no") {
"src\\core\\lib\\promise\\party.cc " +
"src\\core\\lib\\promise\\sleep.cc " +
"src\\core\\lib\\promise\\trace.cc " +
+ "src\\core\\lib\\resolver\\endpoint_addresses.cc " +
"src\\core\\lib\\resolver\\resolver.cc " +
"src\\core\\lib\\resolver\\resolver_registry.cc " +
- "src\\core\\lib\\resolver\\server_address.cc " +
"src\\core\\lib\\resource_quota\\api.cc " +
"src\\core\\lib\\resource_quota\\arena.cc " +
"src\\core\\lib\\resource_quota\\memory_quota.cc " +
diff --git a/gRPC-C++.podspec b/gRPC-C++.podspec
index 316a1e5d74eef..b77567e339d16 100644
--- a/gRPC-C++.podspec
+++ b/gRPC-C++.podspec
@@ -974,6 +974,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/trace.h',
'src/core/lib/promise/try_seq.h',
+ 'src/core/lib/resolver/endpoint_addresses.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
'src/core/lib/resolver/resolver_registry.h',
@@ -2043,6 +2044,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/trace.h',
'src/core/lib/promise/try_seq.h',
+ 'src/core/lib/resolver/endpoint_addresses.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
'src/core/lib/resolver/resolver_registry.h',
diff --git a/gRPC-Core.podspec b/gRPC-Core.podspec
index 167607b950a10..1f38f8413059c 100644
--- a/gRPC-Core.podspec
+++ b/gRPC-Core.podspec
@@ -1575,12 +1575,13 @@ Pod::Spec.new do |s|
'src/core/lib/promise/trace.cc',
'src/core/lib/promise/trace.h',
'src/core/lib/promise/try_seq.h',
+ 'src/core/lib/resolver/endpoint_addresses.cc',
+ 'src/core/lib/resolver/endpoint_addresses.h',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
'src/core/lib/resolver/resolver_registry.cc',
'src/core/lib/resolver/resolver_registry.h',
- 'src/core/lib/resolver/server_address.cc',
'src/core/lib/resolver/server_address.h',
'src/core/lib/resource_quota/api.cc',
'src/core/lib/resource_quota/api.h',
@@ -2794,6 +2795,7 @@ Pod::Spec.new do |s|
'src/core/lib/promise/sleep.h',
'src/core/lib/promise/trace.h',
'src/core/lib/promise/try_seq.h',
+ 'src/core/lib/resolver/endpoint_addresses.h',
'src/core/lib/resolver/resolver.h',
'src/core/lib/resolver/resolver_factory.h',
'src/core/lib/resolver/resolver_registry.h',
diff --git a/grpc.gemspec b/grpc.gemspec
index 643d80bd00633..a30935f085c85 100644
--- a/grpc.gemspec
+++ b/grpc.gemspec
@@ -1480,12 +1480,13 @@ Gem::Specification.new do |s|
s.files += %w( src/core/lib/promise/trace.cc )
s.files += %w( src/core/lib/promise/trace.h )
s.files += %w( src/core/lib/promise/try_seq.h )
+ s.files += %w( src/core/lib/resolver/endpoint_addresses.cc )
+ s.files += %w( src/core/lib/resolver/endpoint_addresses.h )
s.files += %w( src/core/lib/resolver/resolver.cc )
s.files += %w( src/core/lib/resolver/resolver.h )
s.files += %w( src/core/lib/resolver/resolver_factory.h )
s.files += %w( src/core/lib/resolver/resolver_registry.cc )
s.files += %w( src/core/lib/resolver/resolver_registry.h )
- s.files += %w( src/core/lib/resolver/server_address.cc )
s.files += %w( src/core/lib/resolver/server_address.h )
s.files += %w( src/core/lib/resource_quota/api.cc )
s.files += %w( src/core/lib/resource_quota/api.h )
diff --git a/grpc.gyp b/grpc.gyp
index 48849be61d9dd..f97bc1539db07 100644
--- a/grpc.gyp
+++ b/grpc.gyp
@@ -895,9 +895,9 @@
'src/core/lib/promise/party.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/promise/trace.cc',
+ 'src/core/lib/resolver/endpoint_addresses.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',
- 'src/core/lib/resolver/server_address.cc',
'src/core/lib/resource_quota/api.cc',
'src/core/lib/resource_quota/arena.cc',
'src/core/lib/resource_quota/memory_quota.cc',
@@ -1400,9 +1400,9 @@
'src/core/lib/promise/party.cc',
'src/core/lib/promise/sleep.cc',
'src/core/lib/promise/trace.cc',
+ 'src/core/lib/resolver/endpoint_addresses.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',
- 'src/core/lib/resolver/server_address.cc',
'src/core/lib/resource_quota/api.cc',
'src/core/lib/resource_quota/arena.cc',
'src/core/lib/resource_quota/memory_quota.cc',
@@ -2135,9 +2135,9 @@
'src/core/lib/promise/activity.cc',
'src/core/lib/promise/party.cc',
'src/core/lib/promise/trace.cc',
+ 'src/core/lib/resolver/endpoint_addresses.cc',
'src/core/lib/resolver/resolver.cc',
'src/core/lib/resolver/resolver_registry.cc',
- 'src/core/lib/resolver/server_address.cc',
'src/core/lib/resource_quota/api.cc',
'src/core/lib/resource_quota/arena.cc',
'src/core/lib/resource_quota/memory_quota.cc',
diff --git a/include/grpc/impl/channel_arg_names.h b/include/grpc/impl/channel_arg_names.h
index 669529baa8738..565339a872865 100644
--- a/include/grpc/impl/channel_arg_names.h
+++ b/include/grpc/impl/channel_arg_names.h
@@ -370,6 +370,10 @@
/** Configure the Differentiated Services Code Point used on outgoing packets.
* Integer value ranging from 0 to 63. */
#define GRPC_ARG_DSCP "grpc.dscp"
+/** Connection Attempt Delay for use in Happy Eyeballs, in milliseconds.
+ * Defaults to 250ms. */
+#define GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS \
+ "grpc.happy_eyeballs_connection_attempt_delay_ms"
/** \} */
#endif /* GRPC_IMPL_CHANNEL_ARG_NAMES_H */
diff --git a/package.xml b/package.xml
index 2bb4386c27af7..7ac8ceb0a3ae4 100644
--- a/package.xml
+++ b/package.xml
@@ -1462,12 +1462,13 @@
+
+
-
diff --git a/src/core/BUILD b/src/core/BUILD
index 4e59c1dc97c1b..bb810ed5c6dd3 100644
--- a/src/core/BUILD
+++ b/src/core/BUILD
@@ -2760,8 +2760,10 @@ grpc_cc_library(
"iomgr_fwd",
"pollset_set",
"ref_counted",
+ "resolved_address",
"subchannel_interface",
"//:debug_location",
+ "//:endpoint_addresses",
"//:event_engine_base_hdrs",
"//:exec_ctx",
"//:gpr",
@@ -2769,7 +2771,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:work_serializer",
],
)
@@ -2833,13 +2834,13 @@ grpc_cc_library(
deps = [
"channel_args",
"lb_policy",
+ "resolved_address",
"subchannel_interface",
"//:debug_location",
"//:event_engine_base_hdrs",
"//:gpr_platform",
"//:grpc_security_base",
"//:ref_counted_ptr",
- "//:server_address",
],
)
@@ -4059,6 +4060,7 @@ grpc_cc_library(
"//:channel_stack_builder",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
@@ -4074,7 +4076,6 @@ grpc_cc_library(
"//:protobuf_duration_upb",
"//:protobuf_timestamp_upb",
"//:ref_counted_ptr",
- "//:server_address",
"//:sockaddr_utils",
"//:work_serializer",
],
@@ -4150,6 +4151,7 @@ grpc_cc_library(
"//:channel_arg_names",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
@@ -4162,7 +4164,6 @@ grpc_cc_library(
"//:orphanable",
"//:ref_counted_ptr",
"//:rls_upb",
- "//:server_address",
"//:work_serializer",
],
)
@@ -4356,6 +4357,7 @@ grpc_cc_library(
"//:channel_arg_names",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
@@ -4368,7 +4370,6 @@ grpc_cc_library(
"//:orphanable",
"//:parse_address",
"//:ref_counted_ptr",
- "//:server_address",
"//:sockaddr_utils",
"//:tsi_ssl_credentials",
"//:uri_parser",
@@ -4522,8 +4523,8 @@ grpc_cc_library(
],
language = "c++",
deps = [
+ "//:endpoint_addresses",
"//:gpr_platform",
- "//:server_address",
],
)
@@ -4559,6 +4560,7 @@ grpc_cc_library(
"//:channel_arg_names",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:grpc_client_channel",
@@ -4567,7 +4569,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:work_serializer",
"//:xds_client",
],
@@ -4601,17 +4602,18 @@ grpc_cc_library(
"lb_policy_registry",
"pollset_set",
"ref_counted",
+ "resolved_address",
"subchannel_interface",
"validation_errors",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:grpc_client_channel",
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:xds_client",
],
)
@@ -4643,6 +4645,7 @@ grpc_cc_library(
"validation_errors",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:gpr_platform",
@@ -4651,7 +4654,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:work_serializer",
],
)
@@ -4683,12 +4685,12 @@ grpc_cc_library(
"validation_errors",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:xds_client",
],
)
@@ -4710,9 +4712,9 @@ grpc_cc_library(
"channel_args",
"ref_counted",
"ref_counted_string",
+ "//:endpoint_addresses",
"//:gpr_platform",
"//:ref_counted_ptr",
- "//:server_address",
],
)
@@ -4809,14 +4811,15 @@ grpc_cc_library(
"lb_policy",
"lb_policy_registry",
"pollset_set",
+ "resolved_address",
"subchannel_interface",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:work_serializer",
],
)
@@ -4848,14 +4851,19 @@ grpc_cc_library(
"lb_policy",
"lb_policy_factory",
"subchannel_interface",
+ "time",
+ "useful",
+ "//:channel_arg_names",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
+ "//:exec_ctx",
"//:gpr",
"//:grpc_base",
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
+ "//:work_serializer",
],
)
@@ -4897,6 +4905,7 @@ grpc_cc_library(
"//:channel_arg_names",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
@@ -4904,7 +4913,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:sockaddr_utils",
"//:work_serializer",
],
@@ -4934,6 +4942,7 @@ grpc_cc_library(
"subchannel_interface",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:grpc_trace",
@@ -4996,6 +5005,7 @@ grpc_cc_library(
"validation_errors",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
@@ -5053,11 +5063,14 @@ grpc_cc_library(
"lb_policy_registry",
"pollset_set",
"ref_counted",
+ "resolved_address",
"subchannel_interface",
"unique_type_name",
+ "useful",
"validation_errors",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
@@ -5065,7 +5078,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:sockaddr_utils",
"//:work_serializer",
],
@@ -5099,6 +5111,7 @@ grpc_cc_library(
"//:channel_arg_names",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
@@ -5106,7 +5119,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:work_serializer",
],
)
@@ -5140,6 +5152,7 @@ grpc_cc_library(
"validation_errors",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
@@ -5147,7 +5160,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:work_serializer",
],
)
@@ -5186,10 +5198,12 @@ grpc_cc_library(
"lb_policy_registry",
"match",
"pollset_set",
+ "resolved_address",
"subchannel_interface",
"validation_errors",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:grpc_base",
@@ -5197,7 +5211,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:sockaddr_utils",
"//:work_serializer",
],
@@ -5365,6 +5378,7 @@ grpc_cc_library(
"//:backoff",
"//:channel_arg_names",
"//:debug_location",
+ "//:endpoint_addresses",
"//:exec_ctx",
"//:gpr",
"//:gpr_platform",
@@ -5375,7 +5389,6 @@ grpc_cc_library(
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:uri_parser",
],
)
@@ -5427,13 +5440,13 @@ grpc_cc_library(
"//:channel_arg_names",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:grpc_resolver",
"//:grpc_trace",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:uri_parser",
],
)
@@ -5453,11 +5466,11 @@ grpc_cc_library(
"iomgr_port",
"resolved_address",
"//:config",
+ "//:endpoint_addresses",
"//:gpr",
"//:grpc_resolver",
"//:orphanable",
"//:parse_address",
- "//:server_address",
"//:uri_parser",
],
)
@@ -5480,10 +5493,10 @@ grpc_cc_library(
"resolved_address",
"status_helper",
"//:config",
+ "//:endpoint_addresses",
"//:gpr",
"//:grpc_resolver",
"//:orphanable",
- "//:server_address",
"//:uri_parser",
],
)
@@ -5540,6 +5553,7 @@ grpc_cc_library(
"//:channel_arg_names",
"//:config",
"//:debug_location",
+ "//:endpoint_addresses",
"//:gpr",
"//:grpc_base",
"//:grpc_client_channel",
@@ -5550,7 +5564,6 @@ grpc_cc_library(
"//:legacy_context",
"//:orphanable",
"//:ref_counted_ptr",
- "//:server_address",
"//:uri_parser",
"//:work_serializer",
"//:xds_client",
diff --git a/src/core/ext/filters/client_channel/client_channel.cc b/src/core/ext/filters/client_channel/client_channel.cc
index 5cab65373f54e..06a5ea881f858 100644
--- a/src/core/ext/filters/client_channel/client_channel.cc
+++ b/src/core/ext/filters/client_channel/client_channel.cc
@@ -82,6 +82,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/polling_entity.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
@@ -93,8 +94,8 @@
#include "src/core/lib/promise/poll.h"
#include "src/core/lib/promise/promise.h"
#include "src/core/lib/promise/try_seq.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/resolver_registry.h"
-#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/service_config/service_config_call_data.h"
#include "src/core/lib/service_config/service_config_impl.h"
@@ -1084,15 +1085,16 @@ class ClientChannel::ClientChannelControlHelper
}
RefCountedPtr CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(*chand_->work_serializer_) {
if (chand_->resolver_ == nullptr) return nullptr; // Shutting down.
ChannelArgs subchannel_args = ClientChannel::MakeSubchannelArgs(
- args, address.args(), chand_->subchannel_pool_,
+ args, per_address_args, chand_->subchannel_pool_,
chand_->default_authority_);
// Create subchannel.
RefCountedPtr subchannel =
- chand_->client_channel_factory_->CreateSubchannel(address.address(),
+ chand_->client_channel_factory_->CreateSubchannel(address,
subchannel_args);
if (subchannel == nullptr) return nullptr;
// Make sure the subchannel has updated keepalive time.
diff --git a/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc b/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
index 8dcc5f8d5c411..b6e8396b95e23 100644
--- a/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/address_filtering.cc
@@ -44,18 +44,19 @@ int HierarchicalPathArg::ChannelArgsCompare(const HierarchicalPathArg* a,
}
absl::StatusOr MakeHierarchicalAddressMap(
- const absl::StatusOr& addresses) {
+ const absl::StatusOr& addresses) {
if (!addresses.ok()) return addresses.status();
HierarchicalAddressMap result;
RefCountedPtr remaining_path_attr;
- for (const ServerAddress& address : *addresses) {
- const auto* path_arg = address.args().GetObject();
+ for (const EndpointAddresses& endpoint_addresses : *addresses) {
+ const auto* path_arg =
+ endpoint_addresses.args().GetObject();
if (path_arg == nullptr) continue;
const std::vector& path = path_arg->path();
auto it = path.begin();
if (it == path.end()) continue;
- ServerAddressList& target_list = result[*it];
- ChannelArgs args = address.args();
+ EndpointAddressesList& target_list = result[*it];
+ ChannelArgs args = endpoint_addresses.args();
++it;
if (it != path.end()) {
std::vector remaining_path(it, path.end());
@@ -66,7 +67,7 @@ absl::StatusOr MakeHierarchicalAddressMap(
}
args = args.SetObject(remaining_path_attr);
}
- target_list.emplace_back(address.address(), args);
+ target_list.emplace_back(endpoint_addresses.addresses(), args);
}
return result;
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/address_filtering.h b/src/core/ext/filters/client_channel/lb_policy/address_filtering.h
index b17c13bbb8c14..d0e2faae29451 100644
--- a/src/core/ext/filters/client_channel/lb_policy/address_filtering.h
+++ b/src/core/ext/filters/client_channel/lb_policy/address_filtering.h
@@ -28,7 +28,7 @@
#include "src/core/lib/gprpp/ref_counted.h"
#include "src/core/lib/gprpp/ref_counted_string.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
// The resolver returns a flat list of addresses. When a hierarchy of
// LB policies is in use, each leaf of the hierarchy will need a
@@ -102,15 +102,15 @@ class HierarchicalPathArg : public RefCounted {
std::vector path_;
};
-// A map from the next path element to the addresses that fall under
-// that path element.
+// A map from the next path element to the endpoint addresses that fall
+// under that path element.
using HierarchicalAddressMap =
- std::map;
// Splits up the addresses into a separate list for each child.
absl::StatusOr MakeHierarchicalAddressMap(
- const absl::StatusOr& addresses);
+ const absl::StatusOr& addresses);
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
index 9478f7c2fa985..f82ac6f53d16b 100644
--- a/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/child_policy_handler.cc
@@ -32,10 +32,10 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/load_balancing/delegating_helper.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
-#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
@@ -52,11 +52,12 @@ class ChildPolicyHandler::Helper
: ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}
RefCountedPtr CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override {
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) override {
if (parent()->shutting_down_) return nullptr;
if (!CalledByCurrentChild() && !CalledByPendingChild()) return nullptr;
return parent()->channel_control_helper()->CreateSubchannel(
- std::move(address), args);
+ address, per_address_args, args);
}
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
diff --git a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
index 9269359d74848..bc0e0bb6ca243 100644
--- a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.cc
@@ -44,7 +44,7 @@
#include "src/core/lib/load_balancing/delegating_helper.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
namespace grpc_core {
@@ -61,8 +61,9 @@ class EndpointList::Endpoint::Helper
~Helper() override { endpoint_.reset(DEBUG_LOCATION, "Helper"); }
RefCountedPtr CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override {
- return endpoint_->CreateSubchannel(std::move(address), args);
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) override {
+ return endpoint_->CreateSubchannel(address, per_address_args, args);
}
void UpdateState(
@@ -86,7 +87,7 @@ class EndpointList::Endpoint::Helper
//
void EndpointList::Endpoint::Init(
- const ServerAddress& address, const ChannelArgs& args,
+ const EndpointAddresses& addresses, const ChannelArgs& args,
std::shared_ptr work_serializer) {
ChannelArgs child_args =
args.Set(GRPC_ARG_INTERNAL_PICK_FIRST_ENABLE_HEALTH_CHECKING, true)
@@ -118,7 +119,7 @@ void EndpointList::Endpoint::Init(
GPR_ASSERT(config.ok());
// Update child policy.
LoadBalancingPolicy::UpdateArgs update_args;
- update_args.addresses.emplace().emplace_back(address);
+ update_args.addresses.emplace().emplace_back(addresses);
update_args.args = child_args;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
@@ -152,9 +153,10 @@ size_t EndpointList::Endpoint::Index() const {
}
RefCountedPtr EndpointList::Endpoint::CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) {
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) {
return endpoint_list_->channel_control_helper()->CreateSubchannel(
- std::move(address), args);
+ address, per_address_args, args);
}
//
@@ -162,13 +164,14 @@ RefCountedPtr EndpointList::Endpoint::CreateSubchannel(
//
void EndpointList::Init(
- const ServerAddressList& addresses, const ChannelArgs& args,
- absl::AnyInvocable(
- RefCountedPtr, const ServerAddress&, const ChannelArgs&)>
+ const EndpointAddressesList& endpoints, const ChannelArgs& args,
+ absl::AnyInvocable(RefCountedPtr,
+ const EndpointAddresses&,
+ const ChannelArgs&)>
create_endpoint) {
- for (const ServerAddress& address : addresses) {
+ for (const EndpointAddresses& addresses : endpoints) {
endpoints_.push_back(
- create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), address, args));
+ create_endpoint(Ref(DEBUG_LOCATION, "Endpoint"), addresses, args));
}
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h
index 66fce2871e4bf..df31bc39c0e83 100644
--- a/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/endpoint_list.h
@@ -36,9 +36,10 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
#include "src/core/lib/gprpp/work_serializer.h"
+#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
namespace grpc_core {
@@ -52,16 +53,17 @@ namespace grpc_core {
class MyEndpointList : public EndpointList {
public:
MyEndpointList(RefCountedPtr lb_policy,
- const ServerAddressList& addresses, const ChannelArgs& args)
+ const EndpointAddressesList& endpoints,
+ const ChannelArgs& args)
: EndpointList(std::move(lb_policy),
GRPC_TRACE_FLAG_ENABLED(grpc_my_tracer)
? "MyEndpointList"
: nullptr) {
- Init(addresses, args,
+ Init(endpoints, args,
[&](RefCountedPtr endpoint_list,
- const ServerAddress& address, const ChannelArgs& args) {
+ const EndpointAddresses& addresses, const ChannelArgs& args) {
return MakeOrphanable(
- std::move(endpoint_list), address, args,
+ std::move(endpoint_list), addresses, args,
policy()->work_serializer());
});
}
@@ -70,10 +72,10 @@ class MyEndpointList : public EndpointList {
class MyEndpoint : public Endpoint {
public:
MyEndpoint(RefCountedPtr endpoint_list,
- const ServerAddress& address, const ChannelArgs& args,
+ const EndpointAddresses& address, const ChannelArgs& args,
std::shared_ptr work_serializer)
: Endpoint(std::move(endpoint_list)) {
- Init(address, args, std::move(work_serializer));
+ Init(addresses, args, std::move(work_serializer));
}
private:
@@ -119,7 +121,7 @@ class EndpointList : public InternallyRefCounted {
explicit Endpoint(RefCountedPtr endpoint_list)
: endpoint_list_(std::move(endpoint_list)) {}
- void Init(const ServerAddress& address, const ChannelArgs& args,
+ void Init(const EndpointAddresses& addresses, const ChannelArgs& args,
std::shared_ptr work_serializer);
// Templated for convenience, to provide a short-hand for
@@ -150,7 +152,8 @@ class EndpointList : public InternallyRefCounted {
// Called to create a subchannel. Subclasses may override.
virtual RefCountedPtr CreateSubchannel(
- ServerAddress address, const ChannelArgs& args);
+ const grpc_resolved_address& address,
+ const ChannelArgs& per_address_args, const ChannelArgs& args);
RefCountedPtr endpoint_list_;
@@ -181,9 +184,9 @@ class EndpointList : public InternallyRefCounted {
EndpointList(RefCountedPtr policy, const char* tracer)
: policy_(std::move(policy)), tracer_(tracer) {}
- void Init(const ServerAddressList& addresses, const ChannelArgs& args,
+ void Init(const EndpointAddressesList& endpoints, const ChannelArgs& args,
absl::AnyInvocable(
- RefCountedPtr, const ServerAddress&,
+ RefCountedPtr, const EndpointAddresses&,
const ChannelArgs&)>
create_endpoint);
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
index 0655b2dfc45d6..0341ba277a887 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb.cc
@@ -134,8 +134,8 @@
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/resolver.h"
-#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/slice/slice.h"
#include "src/core/lib/slice/slice_string_helpers.h"
@@ -387,8 +387,8 @@ class GrpcLb : public LoadBalancingPolicy {
// Returns a text representation suitable for logging.
std::string AsText() const;
- // Extracts all non-drop entries into a ServerAddressList.
- ServerAddressList GetServerAddressList(
+ // Extracts all non-drop entries into an EndpointAddressesList.
+ EndpointAddressesList GetServerAddressList(
GrpcLbClientStats* client_stats) const;
// Returns true if the serverlist contains at least one drop entry and
@@ -467,7 +467,8 @@ class GrpcLb : public LoadBalancingPolicy {
: ParentOwningDelegatingChannelControlHelper(std::move(parent)) {}
RefCountedPtr CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override;
+ const grpc_resolved_address& address,
+ const ChannelArgs& per_address_args, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr picker) override;
void RequestReresolution() override;
@@ -571,7 +572,7 @@ class GrpcLb : public LoadBalancingPolicy {
// Whether we're in fallback mode.
bool fallback_mode_ = false;
// The backend addresses from the resolver.
- absl::StatusOr fallback_backend_addresses_;
+ absl::StatusOr fallback_backend_addresses_;
// The last resolution note from our parent.
// To be passed to child policy when fallback_backend_addresses_ is empty.
std::string resolution_note_;
@@ -668,11 +669,11 @@ bool IsServerValid(const GrpcLbServer& server, size_t idx, bool log) {
}
// Returns addresses extracted from the serverlist.
-ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
+EndpointAddressesList GrpcLb::Serverlist::GetServerAddressList(
GrpcLbClientStats* client_stats) const {
RefCountedPtr stats;
if (client_stats != nullptr) stats = client_stats->Ref();
- ServerAddressList addresses;
+ EndpointAddressesList endpoints;
for (size_t i = 0; i < serverlist_.size(); ++i) {
const GrpcLbServer& server = serverlist_[i];
if (!IsServerValid(server, i, false)) continue;
@@ -692,11 +693,11 @@ ServerAddressList GrpcLb::Serverlist::GetServerAddressList(
: addr_uri.status().ToString().c_str());
}
// Add address with a channel arg containing LB token and stats object.
- addresses.emplace_back(
+ endpoints.emplace_back(
addr, ChannelArgs().SetObject(MakeRefCounted(
std::move(lb_token), stats)));
}
- return addresses;
+ return endpoints;
}
bool GrpcLb::Serverlist::ContainsAllDropEntries() const {
@@ -779,19 +780,21 @@ GrpcLb::PickResult GrpcLb::Picker::Pick(PickArgs args) {
//
RefCountedPtr GrpcLb::Helper::CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) {
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) {
if (parent()->shutting_down_) return nullptr;
- const auto* arg = address.args().GetObject();
+ const auto* arg = per_address_args.GetObject();
if (arg == nullptr) {
+ auto addr_str = grpc_sockaddr_to_string(&address, false);
Crash(
- absl::StrFormat("[grpclb %p] no TokenAndClientStatsArg for address %p",
- parent(), address.ToString().c_str()));
+ absl::StrFormat("[grpclb %p] no TokenAndClientStatsArg for address %s",
+ parent(), addr_str.value_or("N/A").c_str()));
}
std::string lb_token = arg->lb_token();
RefCountedPtr client_stats = arg->client_stats();
return MakeRefCounted(
- parent()->channel_control_helper()->CreateSubchannel(std::move(address),
- args),
+ parent()->channel_control_helper()->CreateSubchannel(
+ address, per_address_args, args),
parent()->Ref(DEBUG_LOCATION, "SubchannelWrapper"), std::move(lb_token),
std::move(client_stats));
}
@@ -1347,11 +1350,11 @@ void GrpcLb::BalancerCallState::OnBalancerStatusReceivedLocked(
// helper code for creating balancer channel
//
-ServerAddressList ExtractBalancerAddresses(const ChannelArgs& args) {
- const ServerAddressList* addresses =
+EndpointAddressesList ExtractBalancerAddresses(const ChannelArgs& args) {
+ const EndpointAddressesList* endpoints =
FindGrpclbBalancerAddressesInChannelArgs(args);
- if (addresses != nullptr) return *addresses;
- return ServerAddressList();
+ if (endpoints != nullptr) return *endpoints;
+ return EndpointAddressesList();
}
// Returns the channel args for the LB channel, used to create a bidirectional
@@ -1514,10 +1517,10 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
fallback_backend_addresses_ = std::move(args.addresses);
if (fallback_backend_addresses_.ok()) {
// Add null LB token attributes.
- for (ServerAddress& address : *fallback_backend_addresses_) {
- address = ServerAddress(
- address.address(),
- address.args().SetObject(
+ for (EndpointAddresses& addresses : *fallback_backend_addresses_) {
+ addresses = EndpointAddresses(
+ addresses.addresses(),
+ addresses.args().SetObject(
MakeRefCounted("", nullptr)));
}
}
@@ -1566,7 +1569,7 @@ absl::Status GrpcLb::UpdateLocked(UpdateArgs args) {
absl::Status GrpcLb::UpdateBalancerChannelLocked() {
// Get balancer addresses.
- ServerAddressList balancer_addresses = ExtractBalancerAddresses(args_);
+ EndpointAddressesList balancer_addresses = ExtractBalancerAddresses(args_);
absl::Status status;
if (balancer_addresses.empty()) {
status = absl::UnavailableError("balancer address list must be non-empty");
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc
index c45be05a6a3ae..2624c3a5c1976 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.cc
@@ -34,25 +34,25 @@ namespace grpc_core {
namespace {
void* BalancerAddressesArgCopy(void* p) {
- ServerAddressList* address_list = static_cast(p);
- return new ServerAddressList(*address_list);
+ EndpointAddressesList* endpoint_list = static_cast(p);
+ return new EndpointAddressesList(*endpoint_list);
}
void BalancerAddressesArgDestroy(void* p) {
- ServerAddressList* address_list = static_cast(p);
- delete address_list;
+ EndpointAddressesList* endpoint_list = static_cast(p);
+ delete endpoint_list;
}
int BalancerAddressesArgCmp(void* p, void* q) {
- ServerAddressList* address_list1 = static_cast(p);
- ServerAddressList* address_list2 = static_cast(q);
- if (address_list1 == nullptr || address_list2 == nullptr) {
- return QsortCompare(address_list1, address_list2);
+ auto* endpoint_list1 = static_cast(p);
+ auto* endpoint_list2 = static_cast(q);
+ if (endpoint_list1 == nullptr || endpoint_list2 == nullptr) {
+ return QsortCompare(endpoint_list1, endpoint_list2);
}
- if (address_list1->size() > address_list2->size()) return 1;
- if (address_list1->size() < address_list2->size()) return -1;
- for (size_t i = 0; i < address_list1->size(); ++i) {
- int retval = (*address_list1)[i].Cmp((*address_list2)[i]);
+ if (endpoint_list1->size() > endpoint_list2->size()) return 1;
+ if (endpoint_list1->size() < endpoint_list2->size()) return -1;
+ for (size_t i = 0; i < endpoint_list1->size(); ++i) {
+ int retval = (*endpoint_list1)[i].Cmp((*endpoint_list2)[i]);
if (retval != 0) return retval;
}
return 0;
@@ -65,24 +65,24 @@ const grpc_arg_pointer_vtable kBalancerAddressesArgVtable = {
} // namespace
grpc_arg CreateGrpclbBalancerAddressesArg(
- const ServerAddressList* address_list) {
+ const EndpointAddressesList* endpoint_list) {
return grpc_channel_arg_pointer_create(
const_cast(GRPC_ARG_GRPCLB_BALANCER_ADDRESSES),
- const_cast(address_list),
+ const_cast(endpoint_list),
&kBalancerAddressesArgVtable);
}
-const ServerAddressList* FindGrpclbBalancerAddressesInChannelArgs(
+const EndpointAddressesList* FindGrpclbBalancerAddressesInChannelArgs(
const ChannelArgs& args) {
- return args.GetPointer(
+ return args.GetPointer(
GRPC_ARG_GRPCLB_BALANCER_ADDRESSES);
}
ChannelArgs SetGrpcLbBalancerAddresses(const ChannelArgs& args,
- ServerAddressList address_list) {
+ EndpointAddressesList endpoint_list) {
return args.Set(
GRPC_ARG_GRPCLB_BALANCER_ADDRESSES,
- ChannelArgs::Pointer(new ServerAddressList(std::move(address_list)),
+ ChannelArgs::Pointer(new EndpointAddressesList(std::move(endpoint_list)),
&kBalancerAddressesArgVtable));
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h
index 7f80aca80e1bc..adc689c6c011f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h
+++ b/src/core/ext/filters/client_channel/lb_policy/grpclb/grpclb_balancer_addresses.h
@@ -22,16 +22,16 @@
#include
#include "src/core/lib/channel/channel_args.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
namespace grpc_core {
grpc_arg CreateGrpclbBalancerAddressesArg(
- const ServerAddressList* address_list);
+ const EndpointAddressesList* endpoint_list);
GRPC_MUST_USE_RESULT
ChannelArgs SetGrpcLbBalancerAddresses(const ChannelArgs& args,
- ServerAddressList address_list);
-const ServerAddressList* FindGrpclbBalancerAddressesInChannelArgs(
+ EndpointAddressesList endpoint_list);
+const EndpointAddressesList* FindGrpclbBalancerAddressesInChannelArgs(
const ChannelArgs& args);
} // namespace grpc_core
diff --git a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
index aede6d5067d99..c4434218c2684 100644
--- a/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/outlier_detection/outlier_detection.cc
@@ -50,6 +50,7 @@
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
#include "src/core/lib/experiments/experiments.h"
+#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted.h"
@@ -60,13 +61,14 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/load_balancing/delegating_helper.h"
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
@@ -161,6 +163,8 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
void AddDataWatcher(std::unique_ptr watcher) override;
+ void CancelDataWatcher(DataWatcherInterface* watcher) override;
+
RefCountedPtr subchannel_state() const {
return subchannel_state_;
}
@@ -338,6 +342,25 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
bool counting_enabled_;
};
+ class EndpointAddressesArg : public RefCounted {
+ public:
+ explicit EndpointAddressesArg(EndpointAddressSet addresses)
+ : addresses_(std::move(addresses)) {}
+
+ const EndpointAddressSet& addresses() const { return addresses_; }
+
+ static absl::string_view ChannelArgName() {
+ return GRPC_ARG_NO_SUBCHANNEL_PREFIX "endpoint_addresses";
+ }
+ static int ChannelArgsCompare(const EndpointAddressesArg* a,
+ const EndpointAddressesArg* b) {
+ return QsortCompare(a->addresses_, b->addresses_);
+ }
+
+ private:
+ EndpointAddressSet addresses_;
+ };
+
class Helper
: public ParentOwningDelegatingChannelControlHelper {
public:
@@ -346,7 +369,8 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
std::move(outlier_detection_policy)) {}
RefCountedPtr CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override;
+ const grpc_resolved_address& address,
+ const ChannelArgs& per_address_args, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr picker) override;
};
@@ -371,10 +395,6 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
~OutlierDetectionLb() override;
- // Returns the address map key for an address, or the empty string if
- // the address should be ignored.
- static std::string MakeKeyForAddress(const ServerAddress& address);
-
void ShutdownLocked() override;
OrphanablePtr CreateChildPolicyLocked(
@@ -394,7 +414,8 @@ class OutlierDetectionLb : public LoadBalancingPolicy {
grpc_connectivity_state state_ = GRPC_CHANNEL_IDLE;
absl::Status status_;
RefCountedPtr picker_;
- std::map> subchannel_state_map_;
+ std::map>
+ subchannel_state_map_;
OrphanablePtr ejection_timer_;
};
@@ -425,6 +446,13 @@ void OutlierDetectionLb::SubchannelWrapper::AddDataWatcher(
DelegatingSubchannel::AddDataWatcher(std::move(watcher));
}
+void OutlierDetectionLb::SubchannelWrapper::CancelDataWatcher(
+ DataWatcherInterface* watcher) {
+ auto* w = static_cast(watcher);
+ if (w->type() == HealthProducer::Type()) watcher_wrapper_ = nullptr;
+ DelegatingSubchannel::CancelDataWatcher(watcher);
+}
+
//
// OutlierDetectionLb::Picker::SubchannelCallTracker
//
@@ -536,15 +564,6 @@ OutlierDetectionLb::~OutlierDetectionLb() {
}
}
-std::string OutlierDetectionLb::MakeKeyForAddress(
- const ServerAddress& address) {
- // Use only the address, not the attributes.
- auto addr_str = grpc_sockaddr_to_string(&address.address(), false);
- // If address couldn't be stringified, ignore it.
- if (!addr_str.ok()) return "";
- return std::move(*addr_str);
-}
-
void OutlierDetectionLb::ShutdownLocked() {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO, "[outlier_detection_lb %p] shutting down", this);
@@ -612,17 +631,16 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
}
// Update subchannel state map.
if (args.addresses.ok()) {
- std::set current_addresses;
- for (const ServerAddress& address : *args.addresses) {
- std::string address_key = MakeKeyForAddress(address);
- if (address_key.empty()) continue;
- auto& subchannel_state = subchannel_state_map_[address_key];
+ std::set current_addresses;
+ for (EndpointAddresses& endpoint : *args.addresses) {
+ EndpointAddressSet key(endpoint.addresses());
+ auto& subchannel_state = subchannel_state_map_[key];
if (subchannel_state == nullptr) {
subchannel_state = MakeRefCounted();
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] adding map entry for %s (%p)",
- this, address_key.c_str(), subchannel_state.get());
+ this, key.ToString().c_str(), subchannel_state.get());
}
} else if (!config_->CountingEnabled()) {
// If counting is not enabled, reset state.
@@ -630,11 +648,15 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] counting disabled; disabling "
"ejection for %s (%p)",
- this, address_key.c_str(), subchannel_state.get());
+ this, key.ToString().c_str(), subchannel_state.get());
}
subchannel_state->DisableEjection();
}
- current_addresses.emplace(address_key);
+ current_addresses.emplace(key);
+ // Add channel arg containing the key, for use in CreateSubchannel().
+ endpoint = EndpointAddresses(
+ endpoint.addresses(),
+ endpoint.args().SetObject(MakeRefCounted(key)));
}
for (auto it = subchannel_state_map_.begin();
it != subchannel_state_map_.end();) {
@@ -644,7 +666,7 @@ absl::Status OutlierDetectionLb::UpdateLocked(UpdateArgs args) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
gpr_log(GPR_INFO,
"[outlier_detection_lb %p] removing map entry for %s (%p)",
- this, it->first.c_str(), it->second.get());
+ this, it->first.ToString().c_str(), it->second.get());
}
it = subchannel_state_map_.erase(it);
} else {
@@ -715,16 +737,20 @@ OrphanablePtr OutlierDetectionLb::CreateChildPolicyLocked(
//
RefCountedPtr OutlierDetectionLb::Helper::CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) {
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) {
if (parent()->shutting_down_) return nullptr;
RefCountedPtr subchannel_state;
- std::string key = MakeKeyForAddress(address);
- if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
- gpr_log(GPR_INFO,
- "[outlier_detection_lb %p] using key %s for subchannel address %s",
- parent(), key.c_str(), address.ToString().c_str());
- }
- if (!key.empty()) {
+ auto* key_attr = per_address_args.GetObject();
+ if (key_attr != nullptr) {
+ const EndpointAddressSet& key = key_attr->addresses();
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
+ std::string address_str =
+ grpc_sockaddr_to_string(&address, false).value_or("");
+ gpr_log(GPR_INFO,
+ "[outlier_detection_lb %p] creating subchannel for %s, key %s",
+ parent(), address_str.c_str(), key.ToString().c_str());
+ }
auto it = parent()->subchannel_state_map_.find(key);
if (it != parent()->subchannel_state_map_.end()) {
subchannel_state = it->second->Ref();
@@ -732,8 +758,8 @@ RefCountedPtr OutlierDetectionLb::Helper::CreateSubchannel(
}
auto subchannel = MakeRefCounted(
parent()->work_serializer(), subchannel_state,
- parent()->channel_control_helper()->CreateSubchannel(std::move(address),
- args));
+ parent()->channel_control_helper()->CreateSubchannel(
+ address, per_address_args, args));
if (subchannel_state != nullptr) {
subchannel_state->AddSubchannel(subchannel.get());
}
@@ -964,8 +990,8 @@ void OutlierDetectionLb::EjectionTimer::OnTimerLocked() {
const bool unejected = subchannel_state->MaybeUneject(
config.base_ejection_time.millis(), config.max_ejection_time.millis());
if (unejected && GRPC_TRACE_FLAG_ENABLED(grpc_outlier_detection_lb_trace)) {
- gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected address %s (%p)",
- parent_.get(), state.first.c_str(), subchannel_state);
+ gpr_log(GPR_INFO, "[outlier_detection_lb %p] unejected endpoint %s (%p)",
+ parent_.get(), state.first.ToString().c_str(), subchannel_state);
}
}
parent_->ejection_timer_ =
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
index 86955752a851c..36d297d0bd01c 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.cc
@@ -24,6 +24,7 @@
#include
#include
#include
+#include
#include
#include
@@ -35,6 +36,8 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
+#include
+#include
#include
#include
@@ -42,10 +45,14 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/core_configuration.h"
#include "src/core/lib/debug/trace.h"
+#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/crash.h"
#include "src/core/lib/gprpp/debug_location.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/gprpp/ref_counted_ptr.h"
+#include "src/core/lib/gprpp/time.h"
+#include "src/core/lib/gprpp/work_serializer.h"
+#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
@@ -53,7 +60,7 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
@@ -110,6 +117,9 @@ class PickFirst : public LoadBalancingPolicy {
absl::optional connectivity_state() const {
return connectivity_state_;
}
+ const absl::Status& connectivity_status() const {
+ return connectivity_status_;
+ }
// Returns the index into the subchannel list of this object.
size_t Index() const {
@@ -122,6 +132,13 @@ class PickFirst : public LoadBalancingPolicy {
if (subchannel_ != nullptr) subchannel_->ResetBackoff();
}
+ void RequestConnection() { subchannel_->RequestConnection(); }
+
+ // Requests a connection attempt to start on this subchannel,
+ // with appropriate Connection Attempt Delay.
+ // Used only during the Happy Eyeballs pass.
+ void RequestConnectionWithTimer();
+
// Cancels any pending connectivity watch and unrefs the subchannel.
void ShutdownLocked();
@@ -164,9 +181,6 @@ class PickFirst : public LoadBalancingPolicy {
// subchannel.
void ProcessUnselectedReadyLocked();
- // Reacts to the current connectivity state while trying to connect.
- void ReactToConnectivityStateLocked();
-
// Backpointer to owning subchannel list. Not owned.
SubchannelList* subchannel_list_;
// The subchannel.
@@ -179,8 +193,8 @@ class PickFirst : public LoadBalancingPolicy {
absl::Status connectivity_status_;
};
- SubchannelList(RefCountedPtr policy, ServerAddressList addresses,
- const ChannelArgs& args);
+ SubchannelList(RefCountedPtr policy,
+ EndpointAddressesList addresses, const ChannelArgs& args);
~SubchannelList() override;
@@ -197,6 +211,14 @@ class PickFirst : public LoadBalancingPolicy {
// connectivity state notifications.
bool AllSubchannelsSeenInitialState();
+ // Looks through subchannels_ starting from attempting_index_ to
+ // find the first one not currently in TRANSIENT_FAILURE, then
+ // triggers a connection attempt for that subchannel. If there are
+ // no more subchannels not in TRANSIENT_FAILURE (i.e., the Happy
+ // Eyeballs pass is complete), transitions to a mode where we
+ // try to connect to all subchannels in parallel.
+ void StartConnectingNextSubchannel();
+
// Backpointer to owning policy.
RefCountedPtr policy_;
@@ -210,8 +232,17 @@ class PickFirst : public LoadBalancingPolicy {
// finished processing.
bool shutting_down_ = false;
- bool in_transient_failure_ = false;
+ // The index into subchannels_ to which we are currently attempting
+ // to connect during the initial Happy Eyeballs pass. Once the
+ // initial pass is over, this will be equal to size().
size_t attempting_index_ = 0;
+ // Happy Eyeballs timer handle.
+ absl::optional
+ timer_handle_;
+
+ // After the initial Happy Eyeballs pass, the number of failures
+ // we've seen. Every size() failures, we trigger re-resolution.
+ size_t num_failures_ = 0;
};
class HealthWatcher
@@ -261,6 +292,8 @@ class PickFirst : public LoadBalancingPolicy {
const bool enable_health_watch_;
// Whether we should omit our status message prefix.
const bool omit_status_message_prefix_;
+ // Connection Attempt Delay for Happy Eyeballs.
+ const Duration connection_attempt_delay_;
// Lateset update args.
UpdateArgs latest_update_args_;
@@ -291,7 +324,12 @@ PickFirst::PickFirst(Args args)
omit_status_message_prefix_(
channel_args()
.GetBool(GRPC_ARG_INTERNAL_PICK_FIRST_OMIT_STATUS_MESSAGE_PREFIX)
- .value_or(false)) {
+ .value_or(false)),
+ connection_attempt_delay_(Duration::Milliseconds(
+ Clamp(channel_args()
+ .GetInt(GRPC_ARG_HAPPY_EYEBALLS_CONNECTION_ATTEMPT_DELAY_MS)
+ .value_or(250),
+ 100, 2000))) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
gpr_log(GPR_INFO, "Pick First %p created.", this);
}
@@ -337,7 +375,7 @@ void PickFirst::ResetBackoffLocked() {
void PickFirst::AttemptToConnectUsingLatestUpdateArgsLocked() {
// Create a subchannel list from latest_update_args_.
- ServerAddressList addresses;
+ EndpointAddressesList addresses;
if (latest_update_args_.addresses.ok()) {
addresses = *latest_update_args_.addresses;
}
@@ -393,10 +431,19 @@ absl::Status PickFirst::UpdateLocked(UpdateArgs args) {
} else if (args.addresses->empty()) {
status = absl::UnavailableError("address list must not be empty");
} else {
+ // Shuffle the list if needed.
auto config = static_cast(args.config.get());
if (config->shuffle_addresses()) {
absl::c_shuffle(*args.addresses, bit_gen_);
}
+ // Flatten the list so that we have one address per endpoint.
+ EndpointAddressesList endpoints;
+ for (const auto& endpoint : *args.addresses) {
+ for (const auto& address : endpoint.addresses()) {
+ endpoints.emplace_back(address, endpoint.args());
+ }
+ }
+ args.addresses = std::move(endpoints);
}
// If the update contains a resolver error and we have a previous update
// that was not a resolver error, keep using the previous addresses.
@@ -562,7 +609,8 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
p->UnsetSelectedSubchannel();
p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
// Set our state to that of the pending subchannel list.
- if (p->subchannel_list_->in_transient_failure_) {
+ if (p->subchannel_list_->attempting_index_ ==
+ p->subchannel_list_->size()) {
absl::Status status = absl::UnavailableError(absl::StrCat(
"selected subchannel failed; switching to pending update; "
"last failure: ",
@@ -595,7 +643,6 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
// select in place of the current one.
// If the subchannel is READY, use it.
if (new_state == GRPC_CHANNEL_READY) {
- subchannel_list_->in_transient_failure_ = false;
ProcessUnselectedReadyLocked();
return;
}
@@ -607,94 +654,55 @@ void PickFirst::SubchannelList::SubchannelData::OnConnectivityStateChange(
// see its initial notification. Start trying to connect, starting
// with the first subchannel.
if (!old_state.has_value()) {
- subchannel_list_->subchannels_.front().ReactToConnectivityStateLocked();
+ subchannel_list_->StartConnectingNextSubchannel();
return;
}
- // Ignore any other updates for subchannels we're not currently trying to
- // connect to.
- if (Index() != subchannel_list_->attempting_index_) return;
- // React to the connectivity state.
- ReactToConnectivityStateLocked();
-}
-
-void PickFirst::SubchannelList::SubchannelData::
- ReactToConnectivityStateLocked() {
- PickFirst* p = subchannel_list_->policy_.get();
- // Otherwise, process connectivity state.
- switch (connectivity_state_.value()) {
- case GRPC_CHANNEL_READY:
- // Already handled this case above, so this should not happen.
- GPR_UNREACHABLE_CODE(break);
+ // Otherwise, process connectivity state change.
+ switch (*connectivity_state_) {
case GRPC_CHANNEL_TRANSIENT_FAILURE: {
- // Find the next subchannel not in state TRANSIENT_FAILURE.
- // We skip subchannels in state TRANSIENT_FAILURE to avoid a
- // large recursion that could overflow the stack.
- SubchannelData* found_subchannel = nullptr;
- for (size_t next_index = Index() + 1;
- next_index < subchannel_list_->size(); ++next_index) {
- SubchannelData* sc = &subchannel_list_->subchannels_[next_index];
- GPR_ASSERT(sc->connectivity_state_.has_value());
- if (sc->connectivity_state_ != GRPC_CHANNEL_TRANSIENT_FAILURE) {
- subchannel_list_->attempting_index_ = next_index;
- found_subchannel = sc;
- break;
+ // If a connection attempt fails before the timer fires, then
+ // cancel the timer and start connecting on the next subchannel.
+ if (Index() == subchannel_list_->attempting_index_) {
+ if (subchannel_list_->timer_handle_.has_value()) {
+ p->channel_control_helper()->GetEventEngine()->Cancel(
+ *subchannel_list_->timer_handle_);
}
- }
- // If we found another subchannel in the list not in state
- // TRANSIENT_FAILURE, trigger the right behavior for that subchannel.
- if (found_subchannel != nullptr) {
- found_subchannel->ReactToConnectivityStateLocked();
- break;
- }
- // We didn't find another subchannel not in state TRANSIENT_FAILURE,
- // so report TRANSIENT_FAILURE and wait for the first subchannel
- // in the list to report IDLE before continuing.
- if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO,
- "Pick First %p subchannel list %p failed to connect to "
- "all subchannels",
- p, subchannel_list_);
- }
- subchannel_list_->attempting_index_ = 0;
- subchannel_list_->in_transient_failure_ = true;
- // In case 2, swap to the new subchannel list. This means reporting
- // TRANSIENT_FAILURE and dropping the existing (working) connection,
- // but we can't ignore what the control plane has told us.
- if (subchannel_list_ == p->latest_pending_subchannel_list_.get()) {
- if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
- gpr_log(GPR_INFO,
- "Pick First %p promoting pending subchannel list %p to "
- "replace %p",
- p, p->latest_pending_subchannel_list_.get(),
- p->subchannel_list_.get());
+ ++subchannel_list_->attempting_index_;
+ subchannel_list_->StartConnectingNextSubchannel();
+ } else if (subchannel_list_->attempting_index_ ==
+ subchannel_list_->size()) {
+ // We're done with the initial Happy Eyeballs pass and in a mode
+ // where we're attempting to connect to every subchannel in
+ // parallel. We count the number of failed connection attempts,
+ // and when that is equal to the number of subchannels, request
+ // re-resolution and report TRANSIENT_FAILURE again, so that the
+ // caller has the most recent status message. Note that this
+ // isn't necessarily the same as saying that we've seen one
+ // failure for each subchannel in the list, because the backoff
+ // state may be different in each subchannel, so we may have seen
+ // one subchannel fail more than once and another subchannel not
+ // fail at all. But it's a good enough heuristic.
+ ++subchannel_list_->num_failures_;
+ if (subchannel_list_->num_failures_ % subchannel_list_->size() == 0) {
+ p->channel_control_helper()->RequestReresolution();
+ absl::Status status = absl::UnavailableError(absl::StrCat(
+ (p->omit_status_message_prefix_
+ ? ""
+ : "failed to connect to all addresses; last error: "),
+ connectivity_status_.ToString()));
+ p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
+ MakeRefCounted(status));
}
- p->UnsetSelectedSubchannel();
- p->subchannel_list_ = std::move(p->latest_pending_subchannel_list_);
- }
- // If this is the current subchannel list (either because we were
- // in case 1 or because we were in case 2 and just promoted it to
- // be the current list), re-resolve and report new state.
- if (subchannel_list_ == p->subchannel_list_.get()) {
- p->channel_control_helper()->RequestReresolution();
- absl::Status status = absl::UnavailableError(absl::StrCat(
- (p->omit_status_message_prefix_
- ? ""
- : "failed to connect to all addresses; last error: "),
- connectivity_status_.ToString()));
- p->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
- MakeRefCounted(status));
- }
- // If the first subchannel is already IDLE, trigger the next connection
- // attempt immediately. Otherwise, we'll wait for it to report
- // its own connectivity state change.
- auto& subchannel0 = subchannel_list_->subchannels_.front();
- if (subchannel0.connectivity_state_ == GRPC_CHANNEL_IDLE) {
- subchannel0.subchannel_->RequestConnection();
}
break;
}
case GRPC_CHANNEL_IDLE:
- subchannel_->RequestConnection();
+ // If we've finished the first Happy Eyeballs pass, then we go
+ // into a mode where we immediately try to connect to every
+ // subchannel in parallel.
+ if (subchannel_list_->attempting_index_ == subchannel_list_->size()) {
+ subchannel_->RequestConnection();
+ }
break;
case GRPC_CHANNEL_CONNECTING:
// Only update connectivity state in case 1, and only if we're not
@@ -705,13 +713,66 @@ void PickFirst::SubchannelList::SubchannelData::
MakeRefCounted(nullptr));
}
break;
- case GRPC_CHANNEL_SHUTDOWN:
+ default:
+ // We handled READY above, and we should never see SHUTDOWN.
GPR_UNREACHABLE_CODE(break);
}
}
+void PickFirst::SubchannelList::SubchannelData::RequestConnectionWithTimer() {
+ GPR_ASSERT(connectivity_state_.has_value());
+ if (connectivity_state_ == GRPC_CHANNEL_IDLE) {
+ subchannel_->RequestConnection();
+ } else {
+ GPR_ASSERT(connectivity_state_ == GRPC_CHANNEL_CONNECTING);
+ }
+ // If this is not the last subchannel in the list, start the timer.
+ if (Index() != subchannel_list_->size() - 1) {
+ PickFirst* p = subchannel_list_->policy_.get();
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Pick First %p subchannel list %p: starting Connection "
+ "Attempt Delay timer for %" PRIdPTR "ms for index %" PRIuPTR,
+ p, subchannel_list_, p->connection_attempt_delay_.millis(),
+ Index());
+ }
+ subchannel_list_->timer_handle_ =
+ p->channel_control_helper()->GetEventEngine()->RunAfter(
+ p->connection_attempt_delay_,
+ [subchannel_list =
+ subchannel_list_->Ref(DEBUG_LOCATION, "timer")]() mutable {
+ ApplicationCallbackExecCtx application_exec_ctx;
+ ExecCtx exec_ctx;
+ auto* sl = subchannel_list.get();
+ sl->policy_->work_serializer()->Run(
+ [subchannel_list = std::move(subchannel_list)]() {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Pick First %p subchannel list %p: Connection "
+ "Attempt Delay timer fired (shutting_down=%d, "
+ "selected=%p)",
+ subchannel_list->policy_.get(),
+ subchannel_list.get(),
+ subchannel_list->shutting_down_,
+ subchannel_list->policy_->selected_);
+ }
+ if (subchannel_list->shutting_down_) return;
+ if (subchannel_list->policy_->selected_ != nullptr) return;
+ ++subchannel_list->attempting_index_;
+ subchannel_list->StartConnectingNextSubchannel();
+ },
+ DEBUG_LOCATION);
+ });
+ }
+}
+
void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
PickFirst* p = subchannel_list_->policy_.get();
+ // Cancel Happy Eyeballs timer, if any.
+ if (subchannel_list_->timer_handle_.has_value()) {
+ p->channel_control_helper()->GetEventEngine()->Cancel(
+ *subchannel_list_->timer_handle_);
+ }
// If we get here, there are two possible cases:
// 1. We do not currently have a selected subchannel, and the update is
// for a subchannel in p->subchannel_list_ that we're trying to
@@ -772,7 +833,7 @@ void PickFirst::SubchannelList::SubchannelData::ProcessUnselectedReadyLocked() {
//
PickFirst::SubchannelList::SubchannelList(RefCountedPtr policy,
- ServerAddressList addresses,
+ EndpointAddressesList addresses,
const ChannelArgs& args)
: InternallyRefCounted(
GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace) ? "SubchannelList"
@@ -789,9 +850,11 @@ PickFirst::SubchannelList::SubchannelList(RefCountedPtr policy,
}
subchannels_.reserve(addresses.size());
// Create a subchannel for each address.
- for (const ServerAddress& address : addresses) {
+ for (const EndpointAddresses& address : addresses) {
+ GPR_ASSERT(address.addresses().size() == 1);
RefCountedPtr subchannel =
- policy_->channel_control_helper()->CreateSubchannel(address, args_);
+ policy_->channel_control_helper()->CreateSubchannel(
+ address.address(), address.args(), args_);
if (subchannel == nullptr) {
// Subchannel could not be created.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
@@ -829,6 +892,9 @@ void PickFirst::SubchannelList::Orphan() {
for (auto& sd : subchannels_) {
sd.ShutdownLocked();
}
+ if (timer_handle_.has_value()) {
+ policy_->channel_control_helper()->GetEventEngine()->Cancel(*timer_handle_);
+ }
Unref();
}
@@ -845,6 +911,68 @@ bool PickFirst::SubchannelList::AllSubchannelsSeenInitialState() {
return true;
}
+void PickFirst::SubchannelList::StartConnectingNextSubchannel() {
+ // Find the next subchannel not in state TRANSIENT_FAILURE.
+ // We skip subchannels in state TRANSIENT_FAILURE to avoid a
+ // large recursion that could overflow the stack.
+ for (; attempting_index_ < size(); ++attempting_index_) {
+ SubchannelData* sc = &subchannels_[attempting_index_];
+ GPR_ASSERT(sc->connectivity_state().has_value());
+ if (sc->connectivity_state() != GRPC_CHANNEL_TRANSIENT_FAILURE) {
+ // Found a subchannel not in TRANSIENT_FAILURE, so trigger a
+ // connection attempt.
+ sc->RequestConnectionWithTimer();
+ return;
+ }
+ }
+ // We didn't find another subchannel not in state TRANSIENT_FAILURE,
+ // so report TRANSIENT_FAILURE and switch to a mode in which we try to
+ // connect to all addresses in parallel.
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Pick First %p subchannel list %p failed to connect to "
+ "all subchannels",
+ policy_.get(), this);
+ }
+ // In case 2, swap to the new subchannel list. This means reporting
+ // TRANSIENT_FAILURE and dropping the existing (working) connection,
+ // but we can't ignore what the control plane has told us.
+ if (policy_->latest_pending_subchannel_list_.get() == this) {
+ if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_pick_first_trace)) {
+ gpr_log(GPR_INFO,
+ "Pick First %p promoting pending subchannel list %p to "
+ "replace %p",
+ policy_.get(), policy_->latest_pending_subchannel_list_.get(),
+ this);
+ }
+ policy_->UnsetSelectedSubchannel();
+ policy_->subchannel_list_ =
+ std::move(policy_->latest_pending_subchannel_list_);
+ }
+ // If this is the current subchannel list (either because we were
+ // in case 1 or because we were in case 2 and just promoted it to
+ // be the current list), re-resolve and report new state.
+ if (policy_->subchannel_list_.get() == this) {
+ policy_->channel_control_helper()->RequestReresolution();
+ absl::Status status = absl::UnavailableError(
+ absl::StrCat((policy_->omit_status_message_prefix_
+ ? ""
+ : "failed to connect to all addresses; last error: "),
+ subchannels_.back().connectivity_status().ToString()));
+ policy_->UpdateState(GRPC_CHANNEL_TRANSIENT_FAILURE, status,
+ MakeRefCounted(status));
+ }
+ // We now transition into a mode where we try to connect to all
+ // subchannels in parallel. For any subchannel currently in IDLE,
+ // trigger a connection attempt. For any subchannel not currently in
+ // IDLE, we will trigger a connection attempt when it does report IDLE.
+ for (SubchannelData& sd : subchannels_) {
+ if (sd.connectivity_state() == GRPC_CHANNEL_IDLE) {
+ sd.RequestConnection();
+ }
+ }
+}
+
//
// factory
//
diff --git a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h
index ff5e0e6f2a408..4796742526d55 100644
--- a/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h
+++ b/src/core/ext/filters/client_channel/lb_policy/pick_first/pick_first.h
@@ -19,7 +19,7 @@
#include
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
// Internal channel arg to enable health checking in pick_first.
// Intended to be used by petiole policies (e.g., round_robin) that
diff --git a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
index 716b5f41e88a1..09bb347f53737 100644
--- a/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/priority/priority.cc
@@ -59,7 +59,7 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
diff --git a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
index 288f46d80455b..9d440ca75d65e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/ring_hash/ring_hash.cc
@@ -68,7 +68,7 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
@@ -150,7 +150,7 @@ class RingHash : public LoadBalancingPolicy {
public:
struct RingEntry {
uint64_t hash;
- size_t endpoint_index; // Index into RingHash::addresses_.
+ size_t endpoint_index; // Index into RingHash::endpoints_.
};
Ring(RingHash* ring_hash, RingHashLbConfig* config);
@@ -164,7 +164,7 @@ class RingHash : public LoadBalancingPolicy {
// State for a particular endpoint. Delegates to a pick_first child policy.
class RingHashEndpoint : public InternallyRefCounted {
public:
- // index is the index into RingHash::addresses_ of this endpoint.
+ // index is the index into RingHash::endpoints_ of this endpoint.
RingHashEndpoint(RefCountedPtr ring_hash, size_t index)
: ring_hash_(std::move(ring_hash)), index_(index) {}
@@ -208,7 +208,7 @@ class RingHash : public LoadBalancingPolicy {
// Ref to our parent.
RefCountedPtr ring_hash_;
- size_t index_; // Index into RingHash::addresses_ of this endpoint.
+ size_t index_; // Index into RingHash::endpoints_ of this endpoint.
// The pick_first child policy.
OrphanablePtr child_policy_;
@@ -223,7 +223,7 @@ class RingHash : public LoadBalancingPolicy {
explicit Picker(RefCountedPtr ring_hash)
: ring_hash_(std::move(ring_hash)),
ring_(ring_hash_->ring_),
- endpoints_(ring_hash_->addresses_.size()) {
+ endpoints_(ring_hash_->endpoints_.size()) {
for (const auto& p : ring_hash_->endpoint_map_) {
endpoints_[p.second->index()] = p.second->GetInfoForPicker();
}
@@ -281,12 +281,12 @@ class RingHash : public LoadBalancingPolicy {
void UpdateAggregatedConnectivityStateLocked(bool entered_transient_failure,
absl::Status status);
- // Current address list, channel args, and ring.
- ServerAddressList addresses_;
+ // Current endpoint list, channel args, and ring.
+ EndpointAddressesList endpoints_;
ChannelArgs args_;
RefCountedPtr ring_;
- std::map> endpoint_map_;
+ std::map> endpoint_map_;
// TODO(roth): If we ever change the helper UpdateState() API to not
// need the status reported for TRANSIENT_FAILURE state (because
@@ -373,39 +373,40 @@ RingHash::PickResult RingHash::Picker::Pick(PickArgs args) {
RingHash::Ring::Ring(RingHash* ring_hash, RingHashLbConfig* config) {
// Store the weights while finding the sum.
- struct AddressWeight {
- std::string address;
+ struct EndpointWeight {
+ std::string address; // Key by endpoint's first address.
// Default weight is 1 for the cases where a weight is not provided,
// each occurrence of the address will be counted a weight value of 1.
uint32_t weight = 1;
double normalized_weight;
};
- std::vector address_weights;
+ std::vector endpoint_weights;
size_t sum = 0;
- const ServerAddressList& addresses = ring_hash->addresses_;
- address_weights.reserve(addresses.size());
- for (const auto& address : addresses) {
- AddressWeight address_weight;
- address_weight.address =
- grpc_sockaddr_to_string(&address.address(), false).value();
+ const EndpointAddressesList& endpoints = ring_hash->endpoints_;
+ endpoint_weights.reserve(endpoints.size());
+ for (const auto& endpoint : endpoints) {
+ EndpointWeight endpoint_weight;
+ endpoint_weight.address =
+ grpc_sockaddr_to_string(&endpoint.addresses().front(), false).value();
// Weight should never be zero, but ignore it just in case, since
// that value would screw up the ring-building algorithm.
- auto weight_arg = address.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT);
+ auto weight_arg = endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT);
if (weight_arg.value_or(0) > 0) {
- address_weight.weight = *weight_arg;
+ endpoint_weight.weight = *weight_arg;
}
- sum += address_weight.weight;
- address_weights.push_back(std::move(address_weight));
+ sum += endpoint_weight.weight;
+ endpoint_weights.push_back(std::move(endpoint_weight));
}
// Calculating normalized weights and find min and max.
double min_normalized_weight = 1.0;
double max_normalized_weight = 0.0;
- for (auto& address : address_weights) {
- address.normalized_weight = static_cast(address.weight) / sum;
+ for (auto& endpoint_weight : endpoint_weights) {
+ endpoint_weight.normalized_weight =
+ static_cast(endpoint_weight.weight) / sum;
min_normalized_weight =
- std::min(address.normalized_weight, min_normalized_weight);
+ std::min(endpoint_weight.normalized_weight, min_normalized_weight);
max_normalized_weight =
- std::max(address.normalized_weight, max_normalized_weight);
+ std::max(endpoint_weight.normalized_weight, max_normalized_weight);
}
// Scale up the number of hashes per host such that the least-weighted host
// gets a whole number of hashes on the ring. Other hosts might not end up
@@ -435,12 +436,12 @@ RingHash::Ring::Ring(RingHash* ring_hash, RingHashLbConfig* config) {
double target_hashes = 0.0;
uint64_t min_hashes_per_host = ring_size;
uint64_t max_hashes_per_host = 0;
- for (size_t i = 0; i < addresses.size(); ++i) {
- const std::string& address_string = address_weights[i].address;
+ for (size_t i = 0; i < endpoints.size(); ++i) {
+ const std::string& address_string = endpoint_weights[i].address;
hash_key_buffer.assign(address_string.begin(), address_string.end());
hash_key_buffer.emplace_back('_');
auto offset_start = hash_key_buffer.end();
- target_hashes += scale * address_weights[i].normalized_weight;
+ target_hashes += scale * endpoint_weights[i].normalized_weight;
size_t count = 0;
while (current_hashes < target_hashes) {
const std::string count_str = absl::StrCat(count);
@@ -536,12 +537,12 @@ void RingHash::RingHashEndpoint::CreateChildPolicy() {
CoreConfiguration::Get().lb_policy_registry().CreateLoadBalancingPolicy(
"pick_first", std::move(lb_policy_args));
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
- const ServerAddress& address = ring_hash_->addresses_[index_];
+ const EndpointAddresses& endpoint = ring_hash_->endpoints_[index_];
gpr_log(GPR_INFO,
"[RH %p] endpoint %p (index %" PRIuPTR " of %" PRIuPTR
", %s): created child policy %p",
- ring_hash_.get(), this, index_, ring_hash_->addresses_.size(),
- address.ToString().c_str(), child_policy_.get());
+ ring_hash_.get(), this, index_, ring_hash_->endpoints_.size(),
+ endpoint.ToString().c_str(), child_policy_.get());
}
// Add our interested_parties pollset_set to that of the newly created
// child policy. This will make the child policy progress upon activity on
@@ -560,7 +561,7 @@ void RingHash::RingHashEndpoint::UpdateChildPolicyLocked() {
GPR_ASSERT(config.ok());
// Update child policy.
LoadBalancingPolicy::UpdateArgs update_args;
- update_args.addresses.emplace().emplace_back(ring_hash_->addresses_[index_]);
+ update_args.addresses.emplace().emplace_back(ring_hash_->endpoints_[index_]);
update_args.args = ring_hash_->args_;
update_args.config = std::move(*config);
// TODO(roth): If the child reports a non-OK status with the update,
@@ -577,7 +578,7 @@ void RingHash::RingHashEndpoint::OnStateUpdate(
"[RH %p] connectivity changed for endpoint %p (%s, child_policy=%p): "
"prev_state=%s new_state=%s (%s)",
ring_hash_.get(), this,
- ring_hash_->addresses_[index_].ToString().c_str(), child_policy_.get(),
+ ring_hash_->endpoints_[index_].ToString().c_str(), child_policy_.get(),
ConnectivityStateName(connectivity_state_),
ConnectivityStateName(new_state), status.ToString().c_str());
}
@@ -631,7 +632,7 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
gpr_log(GPR_INFO, "[RH %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
}
- addresses_ = *std::move(args.addresses);
+ endpoints_ = *std::move(args.addresses);
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO, "[RH %p] received update with addresses error: %s",
@@ -639,7 +640,7 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
}
// If we already have an endpoint list, then keep using the existing
// list, but still report back that the update was not accepted.
- if (!addresses_.empty()) return args.addresses.status();
+ if (!endpoints_.empty()) return args.addresses.status();
}
// Save channel args.
args_ = std::move(args.args);
@@ -647,24 +648,23 @@ absl::Status RingHash::UpdateLocked(UpdateArgs args) {
ring_ = MakeRefCounted(
this, static_cast(args.config.get()));
// Update endpoint map.
- std::map> endpoint_map;
- for (size_t i = 0; i < addresses_.size(); ++i) {
- const ServerAddress& address = addresses_[i];
+ std::map> endpoint_map;
+ for (size_t i = 0; i < endpoints_.size(); ++i) {
+ const EndpointAddresses& addresses = endpoints_[i];
+ const EndpointAddressSet address_set(addresses.addresses());
// If present in old map, retain it; otherwise, create a new one.
- auto it = endpoint_map_.find(address);
+ auto it = endpoint_map_.find(address_set);
if (it != endpoint_map_.end()) {
it->second->UpdateLocked(i);
- endpoint_map.emplace(address, std::move(it->second));
+ endpoint_map.emplace(address_set, std::move(it->second));
} else {
- endpoint_map.emplace(address, MakeOrphanable(Ref(), i));
+ endpoint_map.emplace(address_set,
+ MakeOrphanable(Ref(), i));
}
}
endpoint_map_ = std::move(endpoint_map);
// If the address list is empty, report TRANSIENT_FAILURE.
- // TODO(roth): As part of adding dualstack backend support, we need to
- // also handle the case where the list of addresses for a given
- // endpoint is empty.
- if (addresses_.empty()) {
+ if (endpoints_.empty()) {
absl::Status status =
args.addresses.ok() ? absl::UnavailableError(absl::StrCat(
"empty address list: ", args.resolution_note))
@@ -726,7 +726,7 @@ void RingHash::UpdateAggregatedConnectivityStateLocked(
start_connection_attempt = true;
} else if (num_connecting > 0) {
state = GRPC_CHANNEL_CONNECTING;
- } else if (num_transient_failure == 1 && addresses_.size() > 1) {
+ } else if (num_transient_failure == 1 && endpoints_.size() > 1) {
state = GRPC_CHANNEL_CONNECTING;
start_connection_attempt = true;
} else if (num_idle > 0) {
@@ -742,7 +742,7 @@ void RingHash::UpdateAggregatedConnectivityStateLocked(
", num_transient_failure=%" PRIuPTR ", size=%" PRIuPTR
") -- start_connection_attempt=%d",
this, ConnectivityStateName(state), num_idle, num_connecting,
- num_ready, num_transient_failure, addresses_.size(),
+ num_ready, num_transient_failure, endpoints_.size(),
start_connection_attempt);
}
// In TRANSIENT_FAILURE, report the last reported failure.
@@ -794,29 +794,31 @@ void RingHash::UpdateAggregatedConnectivityStateLocked(
// CONNECTING, just to ensure that we don't remain in CONNECTING state
// indefinitely if there are no new picks coming in.
if (start_connection_attempt && entered_transient_failure) {
- size_t first_idle_index = addresses_.size();
- for (size_t i = 0; i < addresses_.size(); ++i) {
- auto it = endpoint_map_.find(addresses_[i]);
+ size_t first_idle_index = endpoints_.size();
+ for (size_t i = 0; i < endpoints_.size(); ++i) {
+ auto it =
+ endpoint_map_.find(EndpointAddressSet(endpoints_[i].addresses()));
GPR_ASSERT(it != endpoint_map_.end());
if (it->second->connectivity_state() == GRPC_CHANNEL_CONNECTING) {
- first_idle_index = addresses_.size();
+ first_idle_index = endpoints_.size();
break;
}
- if (first_idle_index == addresses_.size() &&
+ if (first_idle_index == endpoints_.size() &&
it->second->connectivity_state() == GRPC_CHANNEL_IDLE) {
first_idle_index = i;
}
}
- if (first_idle_index != addresses_.size()) {
- auto it = endpoint_map_.find(addresses_[first_idle_index]);
+ if (first_idle_index != endpoints_.size()) {
+ auto it = endpoint_map_.find(
+ EndpointAddressSet(endpoints_[first_idle_index].addresses()));
GPR_ASSERT(it != endpoint_map_.end());
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_ring_hash_trace)) {
gpr_log(GPR_INFO,
"[RH %p] triggering internal connection attempt for endpoint "
"%p (%s) (index %" PRIuPTR " of %" PRIuPTR ")",
this, it->second.get(),
- addresses_[first_idle_index].ToString().c_str(),
- first_idle_index, addresses_.size());
+ endpoints_[first_idle_index].ToString().c_str(),
+ first_idle_index, endpoints_.size());
}
it->second->RequestConnectionLocked();
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
index 7a23aa8308440..dc71d4e4409ee 100644
--- a/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/rls/rls.cc
@@ -92,8 +92,8 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/resolver_registry.h"
-#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/security/credentials/credentials.h"
#include "src/core/lib/security/credentials/fake/fake_credentials.h"
#include "src/core/lib/service_config/service_config_impl.h"
@@ -709,7 +709,7 @@ class RlsLb : public LoadBalancingPolicy {
OrphanablePtr rls_channel_ ABSL_GUARDED_BY(mu_);
// Accessed only from within WorkSerializer.
- absl::StatusOr addresses_;
+ absl::StatusOr addresses_;
ChannelArgs channel_args_;
RefCountedPtr config_;
RefCountedPtr default_child_policy_;
@@ -1877,7 +1877,7 @@ absl::Status RlsLb::UpdateLocked(UpdateArgs args) {
// Swap out addresses.
// If the new address list is an error and we have an existing address list,
// stick with the existing addresses.
- absl::StatusOr old_addresses;
+ absl::StatusOr old_addresses;
if (args.addresses.ok()) {
old_addresses = std::move(addresses_);
addresses_ = std::move(args.addresses);
diff --git a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
index 2a556d6448903..87d291ca8580f 100644
--- a/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/round_robin/round_robin.cc
@@ -51,6 +51,7 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -523,17 +524,17 @@ class RoundRobin : public LoadBalancingPolicy {
class RoundRobinEndpointList : public EndpointList {
public:
RoundRobinEndpointList(RefCountedPtr round_robin,
- const ServerAddressList& addresses,
+ const EndpointAddressesList& endpoints,
const ChannelArgs& args)
: EndpointList(std::move(round_robin),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)
? "RoundRobinEndpointList"
: nullptr) {
- Init(addresses, args,
+ Init(endpoints, args,
[&](RefCountedPtr endpoint_list,
- const ServerAddress& address, const ChannelArgs& args) {
+ const EndpointAddresses& addresses, const ChannelArgs& args) {
return MakeOrphanable(
- std::move(endpoint_list), address, args,
+ std::move(endpoint_list), addresses, args,
policy()->work_serializer());
});
}
@@ -542,10 +543,11 @@ class RoundRobin : public LoadBalancingPolicy {
class RoundRobinEndpoint : public Endpoint {
public:
RoundRobinEndpoint(RefCountedPtr endpoint_list,
- const ServerAddress& address, const ChannelArgs& args,
+ const EndpointAddresses& addresses,
+ const ChannelArgs& args,
std::shared_ptr work_serializer)
: Endpoint(std::move(endpoint_list)) {
- Init(address, args, std::move(work_serializer));
+ Init(addresses, args, std::move(work_serializer));
}
private:
@@ -685,10 +687,10 @@ void RoundRobin::ResetBackoffLocked() {
}
absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
- ServerAddressList addresses;
+ EndpointAddressesList addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace)) {
- gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " addresses",
+ gpr_log(GPR_INFO, "[RR %p] received update with %" PRIuPTR " endpoints",
this, args.addresses->size());
}
addresses = std::move(*args.addresses);
@@ -712,9 +714,6 @@ absl::Status RoundRobin::UpdateLocked(UpdateArgs args) {
args.args);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
- // TODO(roth): As part of adding dualstack backend support, we need to
- // also handle the case where the list of addresses for a given
- // endpoint is empty.
if (latest_pending_endpoint_list_->size() == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_round_robin_trace) &&
endpoint_list_ != nullptr) {
@@ -838,7 +837,6 @@ void RoundRobin::RoundRobinEndpointList::
}
// Only set connectivity state if this is the current child list.
if (round_robin->endpoint_list_.get() != this) return;
- // FIXME: scan children each time instead of keeping counters?
// First matching rule wins:
// 1) ANY child is READY => policy is READY.
// 2) ANY child is CONNECTING => policy is CONNECTING.
diff --git a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
index 7e9b4df8648d2..4738479486975 100644
--- a/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
+++ b/src/core/ext/filters/client_channel/lb_policy/subchannel_list.h
@@ -380,7 +380,7 @@ SubchannelList::SubchannelList(
// Create a subchannel for each address.
for (ServerAddress address : addresses) {
RefCountedPtr subchannel =
- helper->CreateSubchannel(address, args);
+ helper->CreateSubchannel(address.address(), address.args(), args);
if (subchannel == nullptr) {
// Subchannel could not be created.
if (GPR_UNLIKELY(tracer_ != nullptr)) {
diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
index 353eb720f4935..3d35716177a8e 100644
--- a/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/weighted_round_robin/weighted_round_robin.cc
@@ -73,6 +73,7 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/transport/connectivity_state.h"
@@ -1009,7 +1010,8 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
// Represents the weight for a given address.
class EndpointWeight : public RefCounted {
public:
- EndpointWeight(RefCountedPtr wrr, std::string key)
+ EndpointWeight(RefCountedPtr wrr,
+ EndpointAddressSet key)
: wrr_(std::move(wrr)), key_(std::move(key)) {}
~EndpointWeight() override;
@@ -1023,7 +1025,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
private:
RefCountedPtr wrr_;
- const std::string key_;
+ const EndpointAddressSet key_;
Mutex mu_;
float weight_ ABSL_GUARDED_BY(&mu_) = 0;
@@ -1036,12 +1038,12 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
class WrrEndpoint : public Endpoint {
public:
WrrEndpoint(RefCountedPtr endpoint_list,
- const ServerAddress& address, const ChannelArgs& args,
+ const EndpointAddresses& addresses, const ChannelArgs& args,
std::shared_ptr work_serializer)
: Endpoint(std::move(endpoint_list)),
weight_(policy()->GetOrCreateWeight(
- address.address())) {
- Init(address, args, std::move(work_serializer));
+ addresses.addresses())) {
+ Init(addresses, args, std::move(work_serializer));
}
RefCountedPtr weight() const { return weight_; }
@@ -1063,7 +1065,9 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
};
RefCountedPtr CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override;
+ const grpc_resolved_address& address,
+ const ChannelArgs& per_address_args,
+ const ChannelArgs& args) override;
// Called when the child policy reports a connectivity state update.
void OnStateUpdate(absl::optional old_state,
@@ -1074,16 +1078,17 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
};
WrrEndpointList(RefCountedPtr wrr,
- const ServerAddressList& addresses, const ChannelArgs& args)
+ const EndpointAddressesList& endpoints,
+ const ChannelArgs& args)
: EndpointList(std::move(wrr),
GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)
? "WrrEndpointList"
: nullptr) {
- Init(addresses, args,
+ Init(endpoints, args,
[&](RefCountedPtr endpoint_list,
- const ServerAddress& address, const ChannelArgs& args) {
+ const EndpointAddresses& addresses, const ChannelArgs& args) {
return MakeOrphanable(
- std::move(endpoint_list), address, args,
+ std::move(endpoint_list), addresses, args,
policy()->work_serializer());
});
}
@@ -1192,7 +1197,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
void ShutdownLocked() override;
RefCountedPtr GetOrCreateWeight(
- const grpc_resolved_address& address);
+ const std::vector& addresses);
RefCountedPtr config_;
@@ -1205,7 +1210,7 @@ class WeightedRoundRobin : public LoadBalancingPolicy {
OrphanablePtr latest_pending_endpoint_list_;
Mutex endpoint_weight_map_mu_;
- std::map> endpoint_weight_map_
+ std::map endpoint_weight_map_
ABSL_GUARDED_BY(&endpoint_weight_map_mu_);
bool shutdown_ = false;
@@ -1245,7 +1250,7 @@ void WeightedRoundRobin::EndpointWeight::MaybeUpdateWeight(
gpr_log(GPR_INFO,
"[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f: "
"error_util_penalty=%f, weight=%f (not updating)",
- wrr_.get(), key_.c_str(), qps, eps, utilization,
+ wrr_.get(), key_.ToString().c_str(), qps, eps, utilization,
error_utilization_penalty, weight);
}
return;
@@ -1258,7 +1263,7 @@ void WeightedRoundRobin::EndpointWeight::MaybeUpdateWeight(
"[WRR %p] subchannel %s: qps=%f, eps=%f, utilization=%f "
"error_util_penalty=%f : setting weight=%f weight_=%f now=%s "
"last_update_time_=%s non_empty_since_=%s",
- wrr_.get(), key_.c_str(), qps, eps, utilization,
+ wrr_.get(), key_.ToString().c_str(), qps, eps, utilization,
error_utilization_penalty, weight, weight_, now.ToString().c_str(),
last_update_time_.ToString().c_str(),
non_empty_since_.ToString().c_str());
@@ -1277,7 +1282,7 @@ float WeightedRoundRobin::EndpointWeight::GetWeight(
"[WRR %p] subchannel %s: getting weight: now=%s "
"weight_expiration_period=%s blackout_period=%s "
"last_update_time_=%s non_empty_since_=%s weight_=%f",
- wrr_.get(), key_.c_str(), now.ToString().c_str(),
+ wrr_.get(), key_.ToString().c_str(), now.ToString().c_str(),
weight_expiration_period.ToString().c_str(),
blackout_period.ToString().c_str(),
last_update_time_.ToString().c_str(),
@@ -1510,59 +1515,56 @@ void WeightedRoundRobin::ResetBackoffLocked() {
absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
global_stats().IncrementWrrUpdates();
config_ = std::move(args.config);
- ServerAddressList addresses;
+ EndpointAddressesList addresses;
if (args.addresses.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with %" PRIuPTR " addresses",
this, args.addresses->size());
}
- // Weed out duplicate addresses. Also sort the addresses so that if
- // the set of the addresses don't change, their indexes in the
- // subchannel list don't change, since this avoids unnecessary churn
- // in the picker. Note that this does not ensure that if a given
- // address remains present that it will have the same index; if,
- // for example, an address at the end of the list is replaced with one
- // that sorts much earlier in the list, then all of the addresses in
- // between those two positions will have changed indexes.
- struct AddressLessThan {
- bool operator()(const ServerAddress& address1,
- const ServerAddress& address2) const {
- const grpc_resolved_address& addr1 = address1.address();
- const grpc_resolved_address& addr2 = address2.address();
- if (addr1.len != addr2.len) return addr1.len < addr2.len;
- return memcmp(addr1.addr, addr2.addr, addr1.len) < 0;
+ // Weed out duplicate endpoints. Also sort the endpoints so that if
+ // the set of endpoints doesn't change, their indexes in the endpoint
+ // list don't change, since this avoids unnecessary churn in the
+ // picker. Note that this does not ensure that if a given endpoint
+ // remains present that it will have the same index; if, for example,
+ // an endpoint at the end of the list is replaced with one that sorts
+ // much earlier in the list, then all of the endpoints in between those
+ // two positions will have changed indexes.
+ struct EndpointAddressesLessThan {
+ bool operator()(const EndpointAddresses& endpoint1,
+ const EndpointAddresses& endpoint2) const {
+ // Compare unordered addresses only, not channel args.
+ EndpointAddressSet e1(endpoint1.addresses());
+ EndpointAddressSet e2(endpoint2.addresses());
+ return e1 < e2;
}
};
- std::set ordered_addresses(
+ std::set ordered_addresses(
args.addresses->begin(), args.addresses->end());
- addresses =
- ServerAddressList(ordered_addresses.begin(), ordered_addresses.end());
+ addresses = EndpointAddressesList(ordered_addresses.begin(),
+ ordered_addresses.end());
} else {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace)) {
gpr_log(GPR_INFO, "[WRR %p] received update with address error: %s", this,
args.addresses.status().ToString().c_str());
}
- // If we already have a subchannel list, then keep using the existing
+ // If we already have an endpoint list, then keep using the existing
// list, but still report back that the update was not accepted.
if (endpoint_list_ != nullptr) return args.addresses.status();
}
- // Create new subchannel list, replacing the previous pending list, if any.
+ // Create new endpoint list, replacing the previous pending list, if any.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) &&
latest_pending_endpoint_list_ != nullptr) {
- gpr_log(GPR_INFO, "[WRR %p] replacing previous pending subchannel list %p",
+ gpr_log(GPR_INFO, "[WRR %p] replacing previous pending endpoint list %p",
this, latest_pending_endpoint_list_.get());
}
latest_pending_endpoint_list_ =
MakeOrphanable(Ref(), std::move(addresses), args.args);
// If the new list is empty, immediately promote it to
// endpoint_list_ and report TRANSIENT_FAILURE.
- // TODO(roth): As part of adding dualstack backend support, we need to
- // also handle the case where the list of addresses for a given
- // endpoint is empty.
if (latest_pending_endpoint_list_->size() == 0) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_wrr_trace) &&
endpoint_list_ != nullptr) {
- gpr_log(GPR_INFO, "[WRR %p] replacing previous subchannel list %p", this,
+ gpr_log(GPR_INFO, "[WRR %p] replacing previous endpoint list %p", this,
endpoint_list_.get());
}
endpoint_list_ = std::move(latest_pending_endpoint_list_);
@@ -1584,18 +1586,18 @@ absl::Status WeightedRoundRobin::UpdateLocked(UpdateArgs args) {
}
RefCountedPtr
-WeightedRoundRobin::GetOrCreateWeight(const grpc_resolved_address& address) {
- auto key = grpc_sockaddr_to_uri(&address);
- if (!key.ok()) return nullptr;
+WeightedRoundRobin::GetOrCreateWeight(
+ const std::vector& addresses) {
+ EndpointAddressSet key(addresses);
MutexLock lock(&endpoint_weight_map_mu_);
- auto it = endpoint_weight_map_.find(*key);
+ auto it = endpoint_weight_map_.find(key);
if (it != endpoint_weight_map_.end()) {
auto weight = it->second->RefIfNonZero();
if (weight != nullptr) return weight;
}
auto weight = MakeRefCounted(
- Ref(DEBUG_LOCATION, "EndpointWeight"), *key);
- endpoint_weight_map_.emplace(*key, weight.get());
+ Ref(DEBUG_LOCATION, "EndpointWeight"), key);
+ endpoint_weight_map_.emplace(key, weight.get());
return weight;
}
@@ -1619,10 +1621,11 @@ void WeightedRoundRobin::WrrEndpointList::WrrEndpoint::OobWatcher::
RefCountedPtr
WeightedRoundRobin::WrrEndpointList::WrrEndpoint::CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) {
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) {
auto* wrr = policy();
- auto subchannel =
- wrr->channel_control_helper()->CreateSubchannel(std::move(address), args);
+ auto subchannel = wrr->channel_control_helper()->CreateSubchannel(
+ address, per_address_args, args);
// Start OOB watch if configured.
if (wrr->config_->enable_oob_load_report()) {
subchannel->AddDataWatcher(MakeOobBackendMetricWatcher(
@@ -1657,7 +1660,7 @@ void WeightedRoundRobin::WrrEndpointList::WrrEndpoint::OnStateUpdate(
} else if (new_state == GRPC_CHANNEL_READY) {
// If we transition back to READY state, restart the blackout period.
// Skip this if this is the initial notification for this
- // subchannel (which happens whenever we get updated addresses and
+ // endpoint (which happens whenever we get updated addresses and
// create a new endpoint list). Also skip it if the previous state
// was READY (which should never happen in practice, but we've seen
// at least one bug that caused this in the outlier_detection
diff --git a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
index b3ccbbf9e8f9f..fad7aa96857e4 100644
--- a/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/weighted_target/weighted_target.cc
@@ -60,7 +60,7 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/transport/connectivity_state.h"
// IWYU pragma: no_include
@@ -157,7 +157,7 @@ class WeightedTargetLb : public LoadBalancingPolicy {
void Orphan() override;
absl::Status UpdateLocked(const WeightedTargetLbConfig::ChildConfig& config,
- absl::StatusOr addresses,
+ absl::StatusOr addresses,
const std::string& resolution_note,
const ChannelArgs& args);
void ResetBackoffLocked();
@@ -337,7 +337,7 @@ absl::Status WeightedTargetLb::UpdateLocked(UpdateArgs args) {
target = MakeOrphanable(
Ref(DEBUG_LOCATION, "WeightedChild"), name);
}
- absl::StatusOr addresses;
+ absl::StatusOr addresses;
if (address_map.ok()) {
auto it = address_map->find(name);
if (it == address_map->end()) {
@@ -588,7 +588,7 @@ WeightedTargetLb::WeightedChild::CreateChildPolicyLocked(
absl::Status WeightedTargetLb::WeightedChild::UpdateLocked(
const WeightedTargetLbConfig::ChildConfig& config,
- absl::StatusOr addresses,
+ absl::StatusOr addresses,
const std::string& resolution_note, const ChannelArgs& args) {
if (weighted_target_policy_->shutting_down_) return absl::OkStatus();
// Update child weight.
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h
index c7bbf197da088..1df82d2fa1823 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_channel_args.h
@@ -19,7 +19,7 @@
#include
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
// Channel arg indicating the xDS cluster name.
// Set by xds_cluster_impl LB policy and used by GoogleDefaultCredentials.
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
index 158428996f76f..50740151c9d93 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_impl.cc
@@ -56,6 +56,7 @@
#include "src/core/lib/gprpp/sync.h"
#include "src/core/lib/gprpp/validation_errors.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@@ -64,7 +65,7 @@
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
@@ -236,7 +237,8 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
std::move(xds_cluster_impl_policy)) {}
RefCountedPtr CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override;
+ const grpc_resolved_address& address,
+ const ChannelArgs& per_address_args, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr picker) override;
};
@@ -248,8 +250,8 @@ class XdsClusterImplLb : public LoadBalancingPolicy {
OrphanablePtr CreateChildPolicyLocked(
const ChannelArgs& args);
absl::Status UpdateChildPolicyLocked(
- absl::StatusOr addresses, std::string resolution_note,
- const ChannelArgs& args);
+ absl::StatusOr addresses,
+ std::string resolution_note, const ChannelArgs& args);
void MaybeUpdatePickerLocked();
@@ -567,8 +569,8 @@ OrphanablePtr XdsClusterImplLb::CreateChildPolicyLocked(
}
absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
- absl::StatusOr addresses, std::string resolution_note,
- const ChannelArgs& args) {
+ absl::StatusOr addresses,
+ std::string resolution_note, const ChannelArgs& args) {
// Create policy if needed.
if (child_policy_ == nullptr) {
child_policy_ = CreateChildPolicyLocked(args);
@@ -594,12 +596,13 @@ absl::Status XdsClusterImplLb::UpdateChildPolicyLocked(
//
RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) {
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) {
if (parent()->shutting_down_) return nullptr;
// If load reporting is enabled, wrap the subchannel such that it
// includes the locality stats object, which will be used by the Picker.
if (parent()->config_->lrs_load_reporting_server().has_value()) {
- auto locality_name = address.args().GetObjectRef();
+ auto locality_name = per_address_args.GetObjectRef();
RefCountedPtr locality_stats =
parent()->xds_client_->AddClusterLocalityStats(
parent()->config_->lrs_load_reporting_server().value(),
@@ -608,7 +611,7 @@ RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel(
if (locality_stats != nullptr) {
return MakeRefCounted(
parent()->channel_control_helper()->CreateSubchannel(
- std::move(address), args),
+ address, per_address_args, args),
std::move(locality_stats));
}
gpr_log(
@@ -623,7 +626,7 @@ RefCountedPtr XdsClusterImplLb::Helper::CreateSubchannel(
}
// Load reporting not enabled, so don't wrap the subchannel.
return parent()->channel_control_helper()->CreateSubchannel(
- std::move(address), args);
+ address, per_address_args, args);
}
void XdsClusterImplLb::Helper::UpdateState(
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
index c95f9a35f8e7e..4f6e8611b5a08 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_manager.cc
@@ -59,7 +59,7 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
@@ -149,7 +149,7 @@ class XdsClusterManagerLb : public LoadBalancingPolicy {
absl::Status UpdateLocked(
RefCountedPtr config,
- const absl::StatusOr& addresses,
+ const absl::StatusOr& addresses,
const ChannelArgs& args);
void ExitIdleLocked();
void ResetBackoffLocked();
@@ -482,7 +482,7 @@ XdsClusterManagerLb::ClusterChild::CreateChildPolicyLocked(
absl::Status XdsClusterManagerLb::ClusterChild::UpdateLocked(
RefCountedPtr config,
- const absl::StatusOr& addresses,
+ const absl::StatusOr& addresses,
const ChannelArgs& args) {
if (xds_cluster_manager_policy_->shutting_down_) return absl::OkStatus();
// Update child weight.
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
index 5732cc02b308d..42f8673e724a2 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_cluster_resolver.cc
@@ -69,9 +69,9 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/resolver.h"
#include "src/core/lib/resolver/resolver_registry.h"
-#include "src/core/lib/resolver/server_address.h"
#define GRPC_EDS_DEFAULT_FALLBACK_TIMEOUT 10000
@@ -390,7 +390,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
absl::Status UpdateChildPolicyLocked();
OrphanablePtr CreateChildPolicyLocked(
const ChannelArgs& args);
- ServerAddressList CreateChildPolicyAddressesLocked();
+ EndpointAddressesList CreateChildPolicyAddressesLocked();
std::string CreateChildPolicyResolutionNoteLocked();
RefCountedPtr CreateChildPolicyConfigLocked();
ChannelArgs CreateChildPolicyArgsLocked(const ChannelArgs& args_in);
@@ -768,8 +768,8 @@ void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index,
// child policy-related methods
//
-ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
- ServerAddressList addresses;
+EndpointAddressesList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
+ EndpointAddressesList addresses;
for (const auto& discovery_entry : discovery_mechanisms_) {
const auto& priority_list =
GetUpdatePriorityList(*discovery_entry.latest_update);
@@ -790,7 +790,7 @@ ServerAddressList XdsClusterResolverLb::CreateChildPolicyAddressesLocked() {
locality.lb_weight *
endpoint.args().GetInt(GRPC_ARG_ADDRESS_WEIGHT).value_or(1);
addresses.emplace_back(
- endpoint.address(),
+ endpoint.addresses(),
endpoint.args()
.SetObject(hierarchical_path_attr)
.Set(GRPC_ARG_ADDRESS_WEIGHT, endpoint_weight)
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
index 7c210284aadbe..10f946a04f8e1 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_override_host.cc
@@ -65,6 +65,7 @@
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_fwd.h"
#include "src/core/lib/iomgr/pollset_set.h"
+#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/json/json.h"
#include "src/core/lib/json/json_args.h"
#include "src/core/lib/json/json_object_loader.h"
@@ -73,7 +74,7 @@
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
#include "src/core/lib/load_balancing/subchannel_interface.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/transport/connectivity_state.h"
namespace grpc_core {
@@ -97,9 +98,9 @@ struct PtrLessThan {
}
};
-XdsHealthStatus GetAddressHealthStatus(const ServerAddress& address) {
+XdsHealthStatus GetEndpointHealthStatus(const EndpointAddresses& endpoint) {
return XdsHealthStatus(static_cast(
- address.args()
+ endpoint.args()
.GetInt(GRPC_ARG_XDS_HEALTH_STATUS)
.value_or(XdsHealthStatus::HealthStatus::kUnknown)));
}
@@ -224,7 +225,8 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
std::move(xds_override_host_policy)) {}
RefCountedPtr CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) override;
+ const grpc_resolved_address& address,
+ const ChannelArgs& per_address_args, const ChannelArgs& args) override;
void UpdateState(grpc_connectivity_state state, const absl::Status& status,
RefCountedPtr picker) override;
};
@@ -285,11 +287,12 @@ class XdsOverrideHostLb : public LoadBalancingPolicy {
void MaybeUpdatePickerLocked();
- absl::StatusOr UpdateAddressMap(
- absl::StatusOr addresses);
+ absl::StatusOr UpdateAddressMap(
+ absl::StatusOr endpoints);
RefCountedPtr AdoptSubchannel(
- ServerAddress address, RefCountedPtr subchannel);
+ const grpc_resolved_address& address,
+ RefCountedPtr subchannel);
void UnsetSubchannel(absl::string_view key, SubchannelWrapper* subchannel);
@@ -501,43 +504,45 @@ OrphanablePtr XdsOverrideHostLb::CreateChildPolicyLocked(
return lb_policy;
}
-absl::StatusOr XdsOverrideHostLb::UpdateAddressMap(
- absl::StatusOr addresses) {
- if (!addresses.ok()) {
+absl::StatusOr XdsOverrideHostLb::UpdateAddressMap(
+ absl::StatusOr endpoints) {
+ if (!endpoints.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO, "[xds_override_host_lb %p] address error: %s", this,
- addresses.status().ToString().c_str());
+ endpoints.status().ToString().c_str());
}
- return addresses;
+ return endpoints;
}
- ServerAddressList return_value;
+ // TODO(roth): As we clarify this part of the dualstack design, add
+ // support for multiple addresses per endpoint.
+ EndpointAddressesList return_value;
std::map addresses_for_map;
- for (const auto& address : *addresses) {
- XdsHealthStatus status = GetAddressHealthStatus(address);
+ for (const auto& endpoint : *endpoints) {
+ XdsHealthStatus status = GetEndpointHealthStatus(endpoint);
if (status.status() != XdsHealthStatus::kDraining) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
- "[xds_override_host_lb %p] address %s: not draining, "
+ "[xds_override_host_lb %p] endpoint %s: not draining, "
"passing to child",
- this, address.ToString().c_str());
+ this, endpoint.ToString().c_str());
}
- return_value.push_back(address);
+ return_value.push_back(endpoint);
} else if (!config_->override_host_status_set().Contains(status)) {
// Skip draining hosts if not in the override status set.
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
- "[xds_override_host_lb %p] address %s: draining but not in "
+ "[xds_override_host_lb %p] endpoint %s: draining but not in "
"override_host_status set -- ignoring",
- this, address.ToString().c_str());
+ this, endpoint.ToString().c_str());
}
continue;
}
- auto key = grpc_sockaddr_to_uri(&address.address());
+ auto key = grpc_sockaddr_to_uri(&endpoint.address());
if (key.ok()) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_override_host_trace)) {
gpr_log(GPR_INFO,
- "[xds_override_host_lb %p] address %s: adding map key %s", this,
- address.ToString().c_str(), key->c_str());
+ "[xds_override_host_lb %p] endpoint %s: adding map key %s",
+ this, endpoint.ToString().c_str(), key->c_str());
}
addresses_for_map.emplace(std::move(*key), status);
}
@@ -581,8 +586,9 @@ absl::StatusOr XdsOverrideHostLb::UpdateAddressMap(
RefCountedPtr
XdsOverrideHostLb::AdoptSubchannel(
- ServerAddress address, RefCountedPtr subchannel) {
- auto key = grpc_sockaddr_to_uri(&address.address());
+ const grpc_resolved_address& address,
+ RefCountedPtr subchannel) {
+ auto key = grpc_sockaddr_to_uri(&address);
if (!key.ok()) {
return subchannel;
}
@@ -646,9 +652,10 @@ void XdsOverrideHostLb::OnSubchannelConnectivityStateChange(
//
RefCountedPtr XdsOverrideHostLb::Helper::CreateSubchannel(
- ServerAddress address, const ChannelArgs& args) {
- auto subchannel =
- parent()->channel_control_helper()->CreateSubchannel(address, args);
+ const grpc_resolved_address& address, const ChannelArgs& per_address_args,
+ const ChannelArgs& args) {
+ auto subchannel = parent()->channel_control_helper()->CreateSubchannel(
+ address, per_address_args, args);
return parent()->AdoptSubchannel(address, subchannel);
}
diff --git a/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc
index fa4918633c96d..26f0ec9084a28 100644
--- a/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc
+++ b/src/core/ext/filters/client_channel/lb_policy/xds/xds_wrr_locality.cc
@@ -51,7 +51,7 @@
#include "src/core/lib/load_balancing/lb_policy.h"
#include "src/core/lib/load_balancing/lb_policy_factory.h"
#include "src/core/lib/load_balancing/lb_policy_registry.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
namespace grpc_core {
diff --git a/src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc b/src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc
index 1f782b5e254df..f0fac9231557b 100644
--- a/src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc
+++ b/src/core/ext/filters/client_channel/resolver/binder/binder_resolver.cc
@@ -43,9 +43,9 @@
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/error.h"
#include "src/core/lib/iomgr/resolved_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/resolver/resolver.h"
#include "src/core/lib/resolver/resolver_factory.h"
-#include "src/core/lib/resolver/server_address.h"
#include "src/core/lib/uri/uri_parser.h"
namespace grpc_core {
@@ -53,7 +53,7 @@ namespace {
class BinderResolver : public Resolver {
public:
- BinderResolver(ServerAddressList addresses, ResolverArgs args)
+ BinderResolver(EndpointAddressesList addresses, ResolverArgs args)
: result_handler_(std::move(args.result_handler)),
addresses_(std::move(addresses)),
channel_args_(std::move(args.args)) {}
@@ -70,7 +70,7 @@ class BinderResolver : public Resolver {
private:
std::unique_ptr result_handler_;
- ServerAddressList addresses_;
+ EndpointAddressesList addresses_;
ChannelArgs channel_args_;
};
@@ -83,7 +83,7 @@ class BinderResolverFactory : public ResolverFactory {
}
OrphanablePtr CreateResolver(ResolverArgs args) const override {
- ServerAddressList addresses;
+ EndpointAddressesList addresses;
if (!ParseUri(args.uri, &addresses)) return nullptr;
return MakeOrphanable(std::move(addresses),
std::move(args));
@@ -116,7 +116,7 @@ class BinderResolverFactory : public ResolverFactory {
return absl::OkStatus();
}
- static bool ParseUri(const URI& uri, ServerAddressList* addresses) {
+ static bool ParseUri(const URI& uri, EndpointAddressesList* addresses) {
grpc_resolved_address addr;
{
if (!uri.authority().empty()) {
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
index 47706e2cbb84f..c85c850a87d7c 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/dns_resolver_ares.cc
@@ -70,7 +70,7 @@
#include "src/core/lib/channel/channel_args.h"
#include "src/core/lib/config/config_vars.h"
#include "src/core/lib/iomgr/resolve_address.h"
-#include "src/core/lib/resolver/server_address.h"
+#include "src/core/lib/resolver/endpoint_addresses.h"
#include "src/core/lib/service_config/service_config_impl.h"
#include "src/core/lib/transport/error_utils.h"
@@ -178,9 +178,9 @@ class AresClientChannelDNSResolver : public PollingResolver {
std::unique_ptr txt_request_
ABSL_GUARDED_BY(on_resolved_mu_);
// Output fields from ares request.
- std::unique_ptr addresses_
+ std::unique_ptr addresses_
ABSL_GUARDED_BY(on_resolved_mu_);
- std::unique_ptr balancer_addresses_
+ std::unique_ptr balancer_addresses_
ABSL_GUARDED_BY(on_resolved_mu_);
char* service_config_json_ ABSL_GUARDED_BY(on_resolved_mu_) = nullptr;
};
@@ -299,7 +299,7 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
if (addresses_ != nullptr) {
result.addresses = std::move(*addresses_);
} else {
- result.addresses = ServerAddressList();
+ result.addresses.emplace();
}
if (service_config_json_ != nullptr) {
auto service_config_string = ChooseServiceConfig(service_config_json_);
@@ -320,8 +320,8 @@ AresClientChannelDNSResolver::AresRequestWrapper::OnResolvedLocked(
}
}
if (balancer_addresses_ != nullptr) {
- result.args = SetGrpcLbBalancerAddresses(
- result.args, ServerAddressList(*balancer_addresses_));
+ result.args =
+ SetGrpcLbBalancerAddresses(result.args, *balancer_addresses_);
}
} else {
GRPC_CARES_TRACE_LOG("resolver:%p dns resolution failed: %s", this,
@@ -535,7 +535,7 @@ class AresDNSResolver : public DNSResolver {
absl::StatusOr>)>
on_resolve_address_done_;
// currently resolving addresses
- std::unique_ptr addresses_;
+ std::unique_ptr addresses_;
};
class AresSRVRequest : public AresRequest {
@@ -583,7 +583,7 @@ class AresDNSResolver : public DNSResolver {
absl::StatusOr>)>
on_resolve_address_done_;
// currently resolving addresses
- std::unique_ptr balancer_addresses_;
+ std::unique_ptr balancer_addresses_;
};
class AresTXTRequest : public AresRequest {
diff --git a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
index de913f360d2e8..c4ffb3975a1b3 100644
--- a/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
+++ b/src/core/ext/filters/client_channel/resolver/dns/c_ares/grpc_ares_wrapper.cc
@@ -73,8 +73,8 @@
#include "src/core/lib/iomgr/resolved_address.h"
#include "src/core/lib/iomgr/timer.h"
-using grpc_core::ServerAddress;
-using grpc_core::ServerAddressList;
+using grpc_core::EndpointAddresses;
+using grpc_core::EndpointAddressesList;
grpc_core::TraceFlag grpc_trace_cares_address_sorting(false,
"cares_address_sorting");
@@ -557,7 +557,7 @@ grpc_error_handle grpc_ares_ev_driver_create_locked(
}
static void log_address_sorting_list(const grpc_ares_request* r,
- const ServerAddressList& addresses,
+ const EndpointAddressesList& addresses,
const char* input_output_str) {
for (size_t i = 0; i < addresses.size(); i++) {
auto addr_str = grpc_sockaddr_to_string(&addresses[i].address(), true);
@@ -571,7 +571,7 @@ static void log_address_sorting_list(const grpc_ares_request* r,
}
void grpc_cares_wrapper_address_sorting_sort(const grpc_ares_request* r,
- ServerAddressList* addresses) {
+ EndpointAddressesList* addresses) {
if (GRPC_TRACE_FLAG_ENABLED(grpc_trace_cares_address_sorting)) {
log_address_sorting_list(r, *addresses, "input");
}
@@ -584,10 +584,11 @@ void grpc_cares_wrapper_address_sorting_sort(const grpc_ares_request* r,
sortables[i].dest_addr.len = (*addresses)[i].address().len;
}
address_sorting_rfc_6724_sort(sortables, addresses->size());
- ServerAddressList sorted;
+ EndpointAddressesList sorted;
sorted.reserve(addresses->size());
for (size_t i = 0; i < addresses->size(); ++i) {
- sorted.emplace_back(*static_cast(sortables[i].user_data));
+ sorted.emplace_back(
+ *static_cast(sortables[i].user_data));
}
gpr_free(sortables);
*addresses = std::move(sorted);
@@ -620,7 +621,8 @@ void grpc_ares_complete_request_locked(grpc_ares_request* r)
// with no addresses along side it
}
if (r->balancer_addresses_out != nullptr) {
- ServerAddressList* balancer_addresses = r->balancer_addresses_out->get();
+ EndpointAddressesList* balancer_addresses =
+ r->balancer_addresses_out->get();
if (balancer_addresses != nullptr) {
grpc_cares_wrapper_address_sorting_sort(r, balancer_addresses);
}
@@ -667,12 +669,12 @@ static void on_hostbyname_done_locked(void* arg, int status, int /*timeouts*/,
GRPC_CARES_TRACE_LOG(
"request:%p on_hostbyname_done_locked qtype=%s host=%s ARES_SUCCESS", r,
hr->qtype, hr->host);
- std::unique_ptr* address_list_ptr =
+ std::unique_ptr* address_list_ptr =
hr->is_balancer ? r->balancer_addresses_out : r->addresses_out;
if (*address_list_ptr == nullptr) {
- *address_list_ptr = std::make_unique();
+ *address_list_ptr = std::make_unique();
}
- ServerAddressList& addresses = **address_list_ptr;
+ EndpointAddressesList& addresses = **address_list_ptr;
for (size_t i = 0; hostent->h_addr_list[i] != nullptr; ++i) {
grpc_core::ChannelArgs args;
if (hr->is_balancer) {
@@ -904,7 +906,7 @@ grpc_error_handle grpc_dns_lookup_ares_continued(
static bool inner_resolve_as_ip_literal_locked(
const char* name, const char* default_port,
- std::unique_ptr* addrs, std::string* host,
+ std::unique_ptr* addrs, std::string* host,
std::string* port, std::string* hostport) {
if (!grpc_core::SplitHostPort(name, host, port)) {
gpr_log(GPR_ERROR,
@@ -930,7 +932,7 @@ static bool inner_resolve_as_ip_literal_locked(
grpc_parse_ipv6_hostport(hostport->c_str(), &addr,
false /* log errors */)) {
GPR_ASSERT(*addrs == nullptr);
- *addrs = std::make_unique();
+ *addrs = std::make_unique