Skip to content

Commit

Permalink
upstream: rebuild cluster when health check config is changed (envoyproxy#4075)
Browse files Browse the repository at this point in the history

This PR makes sure that changing the health check address triggers rebuilding the cluster.

Risk Level: medium
Testing: unit, manual as described by @bplotnick in envoyproxy#4070.
Docs Changes: N/A
Release Notes: N/A

Fixes envoyproxy#4070

Signed-off-by: Dhi Aurrahman <dio@tetrate.io>
  • Loading branch information
dio authored and htuch committed Aug 17, 2018
1 parent 05c0d52 commit ba40cc9
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 2 deletions.
17 changes: 15 additions & 2 deletions source/common/upstream/upstream_impl.cc
Expand Up @@ -905,9 +905,22 @@ bool BaseDynamicClusterImpl::updateDynamicHostList(
continue;
}

// To match a new host with an existing host means comparing their addresses.
auto existing_host = all_hosts_.find(host->address()->asString());

if (existing_host != all_hosts_.end()) {
const bool existing_host_found = existing_host != all_hosts_.end();

// Check if in-place host update should be skipped, i.e. when the following criteria are met
// (currently there is only one criterion, but we might add more in the future):
// - The cluster health checker is activated and a new host is matched with the existing one,
// but the health check address is different.
const bool skip_inplace_host_update =
health_checker_ != nullptr && existing_host_found &&
*existing_host->second->healthCheckAddress() != *host->healthCheckAddress();

// When there is a match and we decided to do in-place update, we potentially update the host's
// health check flag and metadata. Afterwards, the host is pushed back into the final_hosts,
// i.e. hosts that should be preserved in the current priority.
if (existing_host_found && !skip_inplace_host_update) {
existing_hosts_for_current_priority.emplace(existing_host->first);
// If we find a host matched based on address, we keep it. However we do change weight inline
// so do that here.
Expand Down
161 changes: 161 additions & 0 deletions test/common/upstream/eds_test.cc
Expand Up @@ -75,6 +75,89 @@ class EdsTest : public testing::Test {
NiceMock<LocalInfo::MockLocalInfo> local_info_;
};

// Fixture for exercising EDS config updates that change an endpoint's health
// check configuration while an active health checker is attached: the cluster
// must be rebuilt (never "update_no_rebuild") when a health check address
// changes.
class EdsWithHealthCheckUpdateTest : public EdsTest {
protected:
  EdsWithHealthCheckUpdateTest() = default;

  // Builds the initial cluster with one endpoint per entry of endpoint_ports,
  // attaches a mock health checker, verifies the cluster was rebuilt, and
  // marks every host healthy so later updates start from a clean state.
  //
  // endpoint_ports is taken by const reference (was by value) to avoid copying
  // the vector on every call; callers are unaffected.
  void initializeCluster(const std::vector<uint32_t>& endpoint_ports,
                         const bool drain_connections_on_host_removal) {
    resetCluster(drain_connections_on_host_removal);

    auto health_checker = std::make_shared<MockHealthChecker>();
    EXPECT_CALL(*health_checker, start());
    // setHealthChecker() registers two completion callbacks.
    EXPECT_CALL(*health_checker, addHostCheckCompleteCb(_)).Times(2);
    cluster_->setHealthChecker(health_checker);

    cluster_load_assignment_ = resources_.Add();
    cluster_load_assignment_->set_cluster_name("fare");

    for (const auto& port : endpoint_ports) {
      addEndpoint(port);
    }

    VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources_, ""));

    // Make sure the cluster is rebuilt.
    EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());
    {
      auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
      // Generalized over the number of requested endpoints instead of
      // hard-coding two hosts.
      EXPECT_EQ(hosts.size(), endpoint_ports.size());
      for (const auto& host : hosts) {
        // Hosts start out failed until the first active health check passes.
        EXPECT_TRUE(host->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
        // Mark the host as healthy.
        host->healthFlagClear(Host::HealthFlag::FAILED_ACTIVE_HC);
      }
    }
  }

  // Resets the cluster from a fixed EDS config, toggling only the
  // drain_connections_on_host_removal flag.
  void resetCluster(const bool drain_connections_on_host_removal) {
    const std::string config = R"EOF(
      name: name
      connect_timeout: 0.25s
      type: EDS
      lb_policy: ROUND_ROBIN
      drain_connections_on_host_removal: {}
      eds_cluster_config:
        service_name: fare
        eds_config:
          api_config_source:
            cluster_names:
            - eds
            refresh_delay: 1s
    )EOF";
    EdsTest::resetCluster(fmt::format(config, drain_connections_on_host_removal));
  }

  // Appends one lb_endpoint at 1.2.3.4:port to the load assignment.
  void addEndpoint(const uint32_t port) {
    auto* endpoints = cluster_load_assignment_->add_endpoints();
    auto* socket_address = endpoints->add_lb_endpoints()
                               ->mutable_endpoint()
                               ->mutable_address()
                               ->mutable_socket_address();
    socket_address->set_address("1.2.3.4");
    socket_address->set_port_value(port);
  }

  // Rewrites the health check port of the endpoint at `index`, pushes the
  // update, and asserts that the cluster was rebuilt.
  void updateEndpointHealthCheckPortAtIndex(const uint32_t index, const uint32_t port) {
    cluster_load_assignment_->mutable_endpoints(index)
        ->mutable_lb_endpoints(0)
        ->mutable_endpoint()
        ->mutable_health_check_config()
        ->set_port_value(port);

    VERBOSE_EXPECT_NO_THROW(cluster_->onConfigUpdate(resources_, ""));

    // Always rebuild if health check config is changed.
    EXPECT_EQ(0UL, stats_.counter("cluster.name.update_no_rebuild").value());
  }

  // Accumulated EDS resources pushed to the cluster on each onConfigUpdate().
  Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources_;
  // Non-owning pointer into resources_; set by initializeCluster().
  envoy::api::v2::ClusterLoadAssignment* cluster_load_assignment_;
};

// Negative test for protoc-gen-validate constraints.
TEST_F(EdsTest, ValidateFail) {
Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
Expand Down Expand Up @@ -1152,6 +1235,84 @@ TEST_F(EdsTest, PriorityAndLocalityWeighted) {
EXPECT_EQ(1UL, stats_.counter("cluster.name.update_no_rebuild").value());
}

// Verifies that changing an endpoint's health check port forces a cluster
// rebuild: the updated endpoint appears as a brand new (initially failed)
// host, while the superseded hosts linger — still actively health checked —
// because connections are not drained on removal.
TEST_F(EdsWithHealthCheckUpdateTest, EndpointUpdateHealthCheckConfig) {
  const std::vector<uint32_t> ports = {80, 81};
  const uint32_t updated_hc_port = 8000;

  // Small helper to keep the port assertions readable.
  const auto hc_port = [](const auto& host) { return host->healthCheckAddress()->ip()->port(); };

  // Two endpoints, no connection draining on host removal.
  initializeCluster(ports, false);

  updateEndpointHealthCheckPortAtIndex(0, updated_hc_port);
  {
    const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 3);

    // The rebuilt first endpoint carries the new health check port and starts
    // out failed until it passes an active check.
    EXPECT_EQ(updated_hc_port, hc_port(hosts[0]));
    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

    // The other hosts keep their original health check ports...
    EXPECT_NE(updated_hc_port, hc_port(hosts[1]));
    EXPECT_NE(updated_hc_port, hc_port(hosts[2]));
    EXPECT_EQ(ports[1], hc_port(hosts[1]));
    EXPECT_EQ(ports[0], hc_port(hosts[2]));

    // ...and remain active: the health checker keeps checking the old hosts
    // until they are removed.
    EXPECT_FALSE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
  }

  updateEndpointHealthCheckPortAtIndex(1, updated_hc_port);
  {
    const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 4);

    // Both rebuilt endpoints now use the new port and start out failed.
    EXPECT_EQ(updated_hc_port, hc_port(hosts[0]));
    EXPECT_EQ(updated_hc_port, hc_port(hosts[1]));
    EXPECT_TRUE(hosts[0]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_TRUE(hosts[1]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));

    // The two superseded hosts are still present and still active.
    EXPECT_EQ(ports[1], hc_port(hosts[2]));
    EXPECT_EQ(ports[0], hc_port(hosts[3]));
    EXPECT_FALSE(hosts[2]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
    EXPECT_FALSE(hosts[3]->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
  }
}

// Same scenario as above, but with drain_connections_on_host_removal enabled:
// superseded hosts are removed immediately, so the host count never grows.
TEST_F(EdsWithHealthCheckUpdateTest, EndpointUpdateHealthCheckConfigWithDrainConnectionsOnRemoval) {
  const std::vector<uint32_t> ports = {80, 81};
  const uint32_t updated_hc_port = 8000;

  // Small helper to keep the port assertions readable.
  const auto hc_port = [](const auto& host) { return host->healthCheckAddress()->ip()->port(); };

  // Two endpoints, draining connections on host removal.
  initializeCluster(ports, true);

  updateEndpointHealthCheckPortAtIndex(0, updated_hc_port);
  {
    const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    // Draining is enabled, so the replaced host is dropped right away and the
    // host count stays at two.
    EXPECT_EQ(hosts.size(), 2);
    // The first endpoint now advertises the updated health check port.
    EXPECT_EQ(updated_hc_port, hc_port(hosts[0]));
    EXPECT_NE(updated_hc_port, hc_port(hosts[1]));
  }

  updateEndpointHealthCheckPortAtIndex(1, updated_hc_port);
  {
    const auto& hosts = cluster_->prioritySet().hostSetsPerPriority()[0]->hosts();
    EXPECT_EQ(hosts.size(), 2);
    // Both endpoints now advertise the updated health check port.
    EXPECT_EQ(updated_hc_port, hc_port(hosts[0]));
    EXPECT_EQ(updated_hc_port, hc_port(hosts[1]));
  }
}

// Throw on adding a new resource with an invalid endpoint (since the given address is invalid).
TEST_F(EdsTest, MalformedIP) {
Protobuf::RepeatedPtrField<envoy::api::v2::ClusterLoadAssignment> resources;
Expand Down

0 comments on commit ba40cc9

Please sign in to comment.