From c6a1c3b07902ac2896c5dfcc36e9080b8f0bca3d Mon Sep 17 00:00:00 2001 From: Cameron Fieber Date: Mon, 3 Jun 2019 09:14:02 -0700 Subject: [PATCH] fix(eureka): Handles duplicate eureka records (#3735) This adds deterministic handling when we get multiple records for the same instance id. Previously we would potentially generate two cache data objects, however since they both had the same id only one would be written (last wins style most likely). This now prefers a record in a 'worse' state (based on ordering the HealthState enum from worst to best) to ensure that an instance showing as Down is not accidentally considered ready for traffic during a deploy. If all records agree on health state then takes the most recently updated. --- .../clouddriver/model/HealthState.groovy | 6 +- .../eureka/model/EurekaInstance.groovy | 10 ++ .../provider/agent/EurekaCachingAgent.groovy | 49 +++++-- .../agent/EurekaCachingAgentSpec.groovy | 126 ++++++++++++++++++ 4 files changed, 176 insertions(+), 15 deletions(-) create mode 100644 clouddriver-eureka/src/test/groovy/com/netflix/spinnaker/clouddriver/eureka/provider/agent/EurekaCachingAgentSpec.groovy diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/model/HealthState.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/model/HealthState.groovy index 1e4c093ca6a..86ef34f5969 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/model/HealthState.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/model/HealthState.groovy @@ -17,5 +17,9 @@ package com.netflix.spinnaker.clouddriver.model enum HealthState { - Up, Down, Unknown, Starting, OutOfService, Succeeded, Failed + Failed, Down, OutOfService, Unknown, Starting, Succeeded, Up + + static HealthState fromString(String name) { + values().find { it.name().equalsIgnoreCase(name) } ?: Unknown + } } diff --git a/clouddriver-eureka/src/main/groovy/com/netflix/spinnaker/clouddriver/eureka/model/EurekaInstance.groovy b/clouddriver-eureka/src/main/groovy/com/netflix/spinnaker/clouddriver/eureka/model/EurekaInstance.groovy index 073a1947722..5592b2c2ae0 100644 --- a/clouddriver-eureka/src/main/groovy/com/netflix/spinnaker/clouddriver/eureka/model/EurekaInstance.groovy +++ b/clouddriver-eureka/src/main/groovy/com/netflix/spinnaker/clouddriver/eureka/model/EurekaInstance.groovy @@ -86,6 +86,16 @@ class EurekaInstance extends DiscoveryHealth { healthState = HealthState.Down } + // if this has an asgName and is not part of a titus task registration, + // prefer the app name derived from the asg name rather than the supplied + // app name. We index these records on application to associate them to + // a particular cluster, and with the name incorrect then the record will + // not be properly linked + if (metadata?.titusTaskId == null && asgName != null) { + def idx = asgName.indexOf('-') + def appFromAsg = idx == -1 ? asgName : asgName.substring(0, idx) + app = appFromAsg + } //the preferred instanceId value comes from DataCenterInfo Metadata // Jackson was doing some shenanigans whereby the top level registration diff --git a/clouddriver-eureka/src/main/groovy/com/netflix/spinnaker/clouddriver/eureka/provider/agent/EurekaCachingAgent.groovy b/clouddriver-eureka/src/main/groovy/com/netflix/spinnaker/clouddriver/eureka/provider/agent/EurekaCachingAgent.groovy index 5a899685e5a..d8eeba6031f 100644 --- a/clouddriver-eureka/src/main/groovy/com/netflix/spinnaker/clouddriver/eureka/provider/agent/EurekaCachingAgent.groovy +++ b/clouddriver-eureka/src/main/groovy/com/netflix/spinnaker/clouddriver/eureka/provider/agent/EurekaCachingAgent.groovy @@ -32,6 +32,7 @@ import com.netflix.spinnaker.clouddriver.eureka.api.EurekaApi import com.netflix.spinnaker.clouddriver.eureka.model.EurekaApplication import com.netflix.spinnaker.clouddriver.eureka.model.EurekaApplications import com.netflix.spinnaker.clouddriver.eureka.model.EurekaInstance +import com.netflix.spinnaker.clouddriver.model.HealthState import groovy.util.logging.Slf4j import static com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.HEALTH @@ -93,20 +94,15 @@ class EurekaCachingAgent implements CachingAgent, HealthProvidingCachingAgent, C log.info("Describing items in ${agentType}") EurekaApplications disco = eurekaApi.loadEurekaApplications() - Collection eurekaCacheData = new LinkedList() - Collection instanceCacheData = new LinkedList() + Map> instanceHealthRelationships = [:].withDefault { new HashSet() } + Map> eurekaInstances = [:].withDefault { [] } for (EurekaApplication application : disco.applications) { - Map> convertedInstancesById = ((List) objectMapper.convertValue( - application.instances.findAll { it.instanceId }, - new TypeReference>>() {} - )).collectEntries { - [it.instanceId, it] - } + List> instanceAttributes = objectMapper.convertValue(application.instances, + new TypeReference>>() {}) - for (EurekaInstance instance : application.instances) { - if (instance.instanceId) { - Map attributes = convertedInstancesById[instance.instanceId] + for (Map attributes : instanceAttributes) { + if (attributes.instanceId) { attributes.eurekaAccountName = eurekaAccountName attributes.allowMultipleEurekaPerAccount = allowMultipleEurekaPerAccount attributes.application = application.name.toLowerCase() @@ -116,22 +112,47 @@ class EurekaCachingAgent implements CachingAgent, HealthProvidingCachingAgent, C String instanceKey = provider.getInstanceKey(attributes, region) if (instanceKey) { String instanceHealthKey = provider.getInstanceHealthKey(attributes, region, healthId) + instanceHealthRelationships[instanceKey].add(instanceHealthKey) Map> healthRelationship = [(INSTANCES.ns): [instanceKey]] - Map> instanceRelationship = [(HEALTH.ns): [instanceHealthKey]] - eurekaCacheData.add(new DefaultCacheData(instanceHealthKey, attributes, healthRelationship)) - instanceCacheData.add(new DefaultCacheData(instanceKey, Collections.emptyMap(), instanceRelationship)) + eurekaInstances[instanceHealthKey].add(new DefaultCacheData(instanceHealthKey, attributes, healthRelationship)) } } } } } } + Collection instanceCacheData = instanceHealthRelationships.collect { instanceId, healths -> + new DefaultCacheData(instanceId, Collections.emptyMap(), [(HEALTH.ns): healths]) + } + + Set dupeDetected = [] + Collection eurekaCacheData = eurekaInstances.values().findResults { List cacheDatas -> + if (cacheDatas.size() == 1) { + return cacheDatas[0] + } + + cacheDatas.sort(new EurekaHealthComparator()) + def data = cacheDatas.first() + dupeDetected.add(data.id) + return data + } + if (dupeDetected) { + log.warn("Duplicate eureka records found for instances: $dupeDetected") + } log.info("Caching ${eurekaCacheData.size()} items in ${agentType}") new DefaultCacheResult( (INSTANCES.ns): instanceCacheData, (HEALTH.ns): eurekaCacheData) } + private static class EurekaHealthComparator implements Comparator { + @Override + int compare(CacheData a, CacheData b) { + return HealthState.fromString(a.attributes.state) <=> HealthState.fromString(b.attributes.state) ?: + (Long) b.attributes.lastUpdatedTimestamp <=> (Long) a.attributes.lastUpdatedTimestamp + } + } + @Override long getPollIntervalMillis() { return pollIntervalMillis diff --git a/clouddriver-eureka/src/test/groovy/com/netflix/spinnaker/clouddriver/eureka/provider/agent/EurekaCachingAgentSpec.groovy b/clouddriver-eureka/src/test/groovy/com/netflix/spinnaker/clouddriver/eureka/provider/agent/EurekaCachingAgentSpec.groovy new file mode 100644 index 00000000000..1f2457641eb --- /dev/null +++ b/clouddriver-eureka/src/test/groovy/com/netflix/spinnaker/clouddriver/eureka/provider/agent/EurekaCachingAgentSpec.groovy @@ -0,0 +1,126 @@ +package com.netflix.spinnaker.clouddriver.eureka.provider.agent + +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.spinnaker.cats.provider.ProviderCache +import com.netflix.spinnaker.clouddriver.eureka.api.EurekaApi +import com.netflix.spinnaker.clouddriver.eureka.model.DataCenterInfo +import com.netflix.spinnaker.clouddriver.eureka.model.DataCenterMetadata +import com.netflix.spinnaker.clouddriver.eureka.model.EurekaApplication +import com.netflix.spinnaker.clouddriver.eureka.model.EurekaApplications +import com.netflix.spinnaker.clouddriver.eureka.model.EurekaInstance +import com.netflix.spinnaker.clouddriver.model.HealthState +import spock.lang.Specification + +import static com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.HEALTH +import static com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.INSTANCES + +class EurekaCachingAgentSpec extends Specification { + def providerCache = Stub(ProviderCache) + def eurekaApi = Stub(EurekaApi) + def eap = new TestEurekaAwareProvider() + + def agent = new EurekaCachingAgent(eurekaApi, "us-foo-2", new ObjectMapper(), "http://eureka", "true", "eureka-foo", [eap], 0, 0) + + def "it should cache instances"() { + given: + eurekaApi.loadEurekaApplications() >> new EurekaApplications(applications: [ + new EurekaApplication(name: "foo", instances: [ + instance("foo", "i-1", "UP"), + instance("foo", "i-2", "UP") + ]) + ]) + + when: + def result = agent.loadData(providerCache) + + then: + result.cacheResults.size() == 2 + result.cacheResults[HEALTH.ns].size() == 2 + result.cacheResults[INSTANCES.ns].size() == 2 + result.cacheResults[HEALTH.ns]*.id.sort() == ["us-foo-2:i-1:Discovery", "us-foo-2:i-2:Discovery"] + result.cacheResults[INSTANCES.ns]*.id.sort() == ["us-foo-2:i-1", "us-foo-2:i-2"] + } + + def "it should dedupe multiple discovery records prefering HealthState order"() { + given: + eurekaApi.loadEurekaApplications() >> new EurekaApplications(applications: [ + new EurekaApplication(name: "foo", instances: [ + instance("foo", "i-1", "UP"), + instance("foo", "i-1", "DOWN") + ]) + ]) + + when: + def result = agent.loadData(providerCache) + + then: + result.cacheResults.size() == 2 + result.cacheResults[HEALTH.ns].size() == 1 + result.cacheResults[INSTANCES.ns].size() == 1 + result.cacheResults[HEALTH.ns].first().attributes.state == HealthState.Down.name() + + } + + def "it should dedupe multiple discovery records preferring newest"() { + given: + eurekaApi.loadEurekaApplications() >> new EurekaApplications(applications: [ + new EurekaApplication(name: "foo", instances: [ + instance("foo", "i-1", "UP", 12345), + instance("foo", "i-1", "UP", 23451), + instance("foo", "i-1", "UP", 12344) + ]) + ]) + + when: + def result = agent.loadData(providerCache) + + then: + result.cacheResults.size() == 2 + result.cacheResults[HEALTH.ns].size() == 1 + result.cacheResults[INSTANCES.ns].size() == 1 + result.cacheResults[HEALTH.ns].first().attributes.lastUpdatedTimestamp == 23451 + + } + + private static EurekaInstance instance(String app, String id, String status, Long timestamp = System.currentTimeMillis()) { + EurekaInstance.buildInstance( + "host", + app, + "127.0.0.1", + status, + "UNKNOWN", + new DataCenterInfo( + name: "my-dc", + metadata: new DataCenterMetadata( + accountId: "foo", + availabilityZone: "us-foo-2a", + amiId: "ami-foo", + instanceId: id, + instanceType: "m3.megabig")), + "/status", + "/healthcheck", + id, + id, + timestamp, + "$app-v000", + null, + id) + } + + static class TestEurekaAwareProvider implements EurekaAwareProvider { + @Override + Boolean isProviderForEurekaRecord(Map attributes) { + return true + } + + @Override + String getInstanceKey(Map attributes, String region) { + return "$region:$attributes.instanceId" + } + + @Override + String getInstanceHealthKey(Map attributes, String region, String healthId) { + return "$region:$attributes.instanceId:$healthId" + } + } +}