Skip to content

Commit

Permalink
fix(eureka): Handles duplicate eureka records (#3735)
Browse files Browse the repository at this point in the history
This adds deterministic handling when we get multiple records for the
same instance id. Previously we could generate two cache data objects
for such an instance; however, since both had the same id, only one
would be written (most likely in a last-wins fashion).

This now prefers a record in a 'worse' state (based on ordering the
HealthState enum from worst to best) to ensure that an instance showing
as Down is not accidentally considered ready for traffic during a
deploy.

If all records agree on health state, the most recently updated record
is taken.
  • Loading branch information
cfieber committed Jun 3, 2019
1 parent ffdae2a commit c6a1c3b
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@
package com.netflix.spinnaker.clouddriver.model

enum HealthState {
Up, Down, Unknown, Starting, OutOfService, Succeeded, Failed
Failed, Down, OutOfService, Unknown, Starting, Succeeded, Up

/**
 * Resolves a state name to its enum constant, ignoring case.
 *
 * @param name the state name to look up
 * @return the first constant (in declaration order) whose name matches
 *         case-insensitively, or {@code Unknown} when no constant matches
 */
static HealthState fromString(String name) {
  for (HealthState candidate : values()) {
    if (candidate.name().equalsIgnoreCase(name)) {
      return candidate
    }
  }
  // nothing matched (equalsIgnoreCase is false for null as well)
  return Unknown
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ class EurekaInstance extends DiscoveryHealth {
healthState = HealthState.Down
}

// if this has an asgName and is not part of a titus task registration,
// prefer the app name derived from the asg name rather than the supplied
// app name. We index these records on application to associate them with
// a particular cluster; if the name is incorrect, the record will
// not be properly linked
if (metadata?.titusTaskId == null && asgName != null) {
def idx = asgName.indexOf('-')
def appFromAsg = idx == -1 ? asgName : asgName.substring(0, idx)
app = appFromAsg
}

//the preferred instanceId value comes from DataCenterInfo Metadata
// Jackson was doing some shenanigans whereby the top level registration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import com.netflix.spinnaker.clouddriver.eureka.api.EurekaApi
import com.netflix.spinnaker.clouddriver.eureka.model.EurekaApplication
import com.netflix.spinnaker.clouddriver.eureka.model.EurekaApplications
import com.netflix.spinnaker.clouddriver.eureka.model.EurekaInstance
import com.netflix.spinnaker.clouddriver.model.HealthState
import groovy.util.logging.Slf4j

import static com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.HEALTH
Expand Down Expand Up @@ -93,20 +94,15 @@ class EurekaCachingAgent implements CachingAgent, HealthProvidingCachingAgent, C
log.info("Describing items in ${agentType}")
EurekaApplications disco = eurekaApi.loadEurekaApplications()

Collection<CacheData> eurekaCacheData = new LinkedList<CacheData>()
Collection<CacheData> instanceCacheData = new LinkedList<CacheData>()
Map<String, Set<String>> instanceHealthRelationships = [:].withDefault { new HashSet<String>() }
Map<String, List<CacheData>> eurekaInstances = [:].withDefault { [] }

for (EurekaApplication application : disco.applications) {
Map<String, Map<String, Object>> convertedInstancesById = ((List<Map>) objectMapper.convertValue(
application.instances.findAll { it.instanceId },
new TypeReference<List<Map<String, Object>>>() {}
)).collectEntries {
[it.instanceId, it]
}
List<Map<String, Object>> instanceAttributes = objectMapper.convertValue(application.instances,
new TypeReference<List<Map<String, Object>>>() {})

for (EurekaInstance instance : application.instances) {
if (instance.instanceId) {
Map<String, Object> attributes = convertedInstancesById[instance.instanceId]
for (Map<String, Object> attributes : instanceAttributes) {
if (attributes.instanceId) {
attributes.eurekaAccountName = eurekaAccountName
attributes.allowMultipleEurekaPerAccount = allowMultipleEurekaPerAccount
attributes.application = application.name.toLowerCase()
Expand All @@ -116,22 +112,47 @@ class EurekaCachingAgent implements CachingAgent, HealthProvidingCachingAgent, C
String instanceKey = provider.getInstanceKey(attributes, region)
if (instanceKey) {
String instanceHealthKey = provider.getInstanceHealthKey(attributes, region, healthId)
instanceHealthRelationships[instanceKey].add(instanceHealthKey)
Map<String, Collection<String>> healthRelationship = [(INSTANCES.ns): [instanceKey]]
Map<String, Collection<String>> instanceRelationship = [(HEALTH.ns): [instanceHealthKey]]
eurekaCacheData.add(new DefaultCacheData(instanceHealthKey, attributes, healthRelationship))
instanceCacheData.add(new DefaultCacheData(instanceKey, Collections.emptyMap(), instanceRelationship))
eurekaInstances[instanceHealthKey].add(new DefaultCacheData(instanceHealthKey, attributes, healthRelationship))
}
}
}
}
}
}
Collection<CacheData> instanceCacheData = instanceHealthRelationships.collect { instanceId, healths ->
new DefaultCacheData(instanceId, Collections.emptyMap(), [(HEALTH.ns): healths])
}

Set<String> dupeDetected = []
Collection<CacheData> eurekaCacheData = eurekaInstances.values().findResults { List<CacheData> cacheDatas ->
if (cacheDatas.size() == 1) {
return cacheDatas[0]
}

cacheDatas.sort(new EurekaHealthComparator())
def data = cacheDatas.first()
dupeDetected.add(data.id)
return data
}
if (dupeDetected) {
log.warn("Duplicate eureka records found for instances: $dupeDetected")
}
log.info("Caching ${eurekaCacheData.size()} items in ${agentType}")
new DefaultCacheResult(
(INSTANCES.ns): instanceCacheData,
(HEALTH.ns): eurekaCacheData)
}

/**
 * Orders health cache records for the same instance so that the preferred record sorts first.
 *
 * Records are compared by the {@link HealthState} parsed from their {@code state} attribute,
 * using the enum's natural ordering, so the result depends on the declaration order of the
 * {@code HealthState} constants. When both records have the same state, the one with the
 * larger {@code lastUpdatedTimestamp} attribute sorts first.
 */
private static class EurekaHealthComparator implements Comparator<CacheData> {
  @Override
  int compare(CacheData a, CacheData b) {
    HealthState stateOfA = HealthState.fromString(a.attributes.state)
    HealthState stateOfB = HealthState.fromString(b.attributes.state)

    int byHealthState = stateOfA <=> stateOfB
    if (byHealthState != 0) {
      return byHealthState
    }

    // same health state: operands are swapped so the most recently updated record wins
    Long updatedA = (Long) a.attributes.lastUpdatedTimestamp
    Long updatedB = (Long) b.attributes.lastUpdatedTimestamp
    return updatedB <=> updatedA
  }
}

@Override
long getPollIntervalMillis() {
return pollIntervalMillis
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package com.netflix.spinnaker.clouddriver.eureka.provider.agent

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spinnaker.cats.provider.ProviderCache
import com.netflix.spinnaker.clouddriver.eureka.api.EurekaApi
import com.netflix.spinnaker.clouddriver.eureka.model.DataCenterInfo
import com.netflix.spinnaker.clouddriver.eureka.model.DataCenterMetadata
import com.netflix.spinnaker.clouddriver.eureka.model.EurekaApplication
import com.netflix.spinnaker.clouddriver.eureka.model.EurekaApplications
import com.netflix.spinnaker.clouddriver.eureka.model.EurekaInstance
import com.netflix.spinnaker.clouddriver.model.HealthState
import spock.lang.Specification

import static com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.HEALTH
import static com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.INSTANCES

/**
 * Specification for {@code EurekaCachingAgent.loadData}, driven by a stubbed {@link EurekaApi}.
 *
 * Covers caching of distinct instances and the de-duplication of several eureka records
 * that share one instance id.
 */
class EurekaCachingAgentSpec extends Specification {
  def providerCache = Stub(ProviderCache)
  def eurekaApi = Stub(EurekaApi)
  def eap = new TestEurekaAwareProvider()

  def agent = new EurekaCachingAgent(eurekaApi, "us-foo-2", new ObjectMapper(), "http://eureka", "true", "eureka-foo", [eap], 0, 0)

  def "it should cache instances"() {
    given:
    eurekaApi.loadEurekaApplications() >> singleApplication("foo", [
      instance("foo", "i-1", "UP"),
      instance("foo", "i-2", "UP")
    ])

    when:
    def loaded = agent.loadData(providerCache)

    then:
    // one entry per namespace (health + instances), each holding both instances
    loaded.cacheResults.size() == 2
    loaded.cacheResults[HEALTH.ns].size() == 2
    loaded.cacheResults[INSTANCES.ns].size() == 2
    loaded.cacheResults[HEALTH.ns]*.id.sort() == ["us-foo-2:i-1:Discovery", "us-foo-2:i-2:Discovery"]
    loaded.cacheResults[INSTANCES.ns]*.id.sort() == ["us-foo-2:i-1", "us-foo-2:i-2"]
  }

  def "it should dedupe multiple discovery records prefering HealthState order"() {
    given:
    eurekaApi.loadEurekaApplications() >> singleApplication("foo", [
      instance("foo", "i-1", "UP"),
      instance("foo", "i-1", "DOWN")
    ])

    when:
    def loaded = agent.loadData(providerCache)

    then:
    // both records share id i-1, so only one survives and it is the Down one
    loaded.cacheResults.size() == 2
    loaded.cacheResults[HEALTH.ns].size() == 1
    loaded.cacheResults[INSTANCES.ns].size() == 1
    loaded.cacheResults[HEALTH.ns].first().attributes.state == HealthState.Down.name()
  }

  def "it should dedupe multiple discovery records preferring newest"() {
    given:
    eurekaApi.loadEurekaApplications() >> singleApplication("foo", [
      instance("foo", "i-1", "UP", 12345),
      instance("foo", "i-1", "UP", 23451),
      instance("foo", "i-1", "UP", 12344)
    ])

    when:
    def loaded = agent.loadData(providerCache)

    then:
    // all three records are UP, so the one with the largest timestamp is kept
    loaded.cacheResults.size() == 2
    loaded.cacheResults[HEALTH.ns].size() == 1
    loaded.cacheResults[INSTANCES.ns].size() == 1
    loaded.cacheResults[HEALTH.ns].first().attributes.lastUpdatedTimestamp == 23451
  }

  /**
   * Wraps the given instances in a single {@link EurekaApplication} with the given name,
   * inside an {@link EurekaApplications} container.
   */
  private static EurekaApplications singleApplication(String name, List<EurekaInstance> instances) {
    def application = new EurekaApplication(name: name, instances: instances)
    return new EurekaApplications(applications: [application])
  }

  /**
   * Builds a test {@link EurekaInstance} through {@code EurekaInstance.buildInstance}.
   *
   * @param app application name; also used to form the {@code "$app-v000"} argument
   * @param id instance id, used for the data center metadata and for three positional arguments
   * @param status status string passed through to {@code buildInstance}
   * @param timestamp timestamp passed through to {@code buildInstance}; defaults to the current time
   */
  private static EurekaInstance instance(String app, String id, String status, Long timestamp = System.currentTimeMillis()) {
    def metadata = new DataCenterMetadata(
      accountId: "foo",
      availabilityZone: "us-foo-2a",
      amiId: "ami-foo",
      instanceId: id,
      instanceType: "m3.megabig")
    def dataCenter = new DataCenterInfo(name: "my-dc", metadata: metadata)

    return EurekaInstance.buildInstance(
      "host",
      app,
      "127.0.0.1",
      status,
      "UNKNOWN",
      dataCenter,
      "/status",
      "/healthcheck",
      id,
      id,
      timestamp,
      "$app-v000",
      null,
      id)
  }

  /**
   * Provider stub that claims every eureka record and derives keys as
   * {@code region:instanceId} and {@code region:instanceId:healthId}.
   */
  static class TestEurekaAwareProvider implements EurekaAwareProvider {
    @Override
    Boolean isProviderForEurekaRecord(Map<String, Object> attributes) {
      return true
    }

    @Override
    String getInstanceKey(Map<String, Object> attributes, String region) {
      return "${region}:${attributes.instanceId}"
    }

    @Override
    String getInstanceHealthKey(Map<String, Object> attributes, String region, String healthId) {
      return "${region}:${attributes.instanceId}:${healthId}"
    }
  }
}

0 comments on commit c6a1c3b

Please sign in to comment.