Skip to content

Commit

Permalink
fix(eureka): Skip instances that are already UP in discovery, or stil…
Browse files Browse the repository at this point in the history
…l starting (#4733)

* fix(eureka): Skip instances that are already UP in discovery

* fix(pr): Fix tests

* fix(pr): Rename DiscoveryStatus.DOWN to OUT_OF_SERVICE

* fix(pr): Avoid double-negative when deciding to skip resetting discovery status

Co-authored-by: Daniel Reynaud <dreynaud@netflix.com>

Co-authored-by: Daniel Reynaud <dreynaud@netflix.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Jul 10, 2020
1 parent ae25bca commit 752bebc
Show file tree
Hide file tree
Showing 11 changed files with 116 additions and 56 deletions.
Expand Up @@ -200,7 +200,7 @@ abstract class AbstractEnableDisableAtomicOperation implements AtomicOperation<V

// eureka registration
if (credentials.discoveryEnabled && instanceIds) {
def status = disable ? AbstractEurekaSupport.DiscoveryStatus.Disable : AbstractEurekaSupport.DiscoveryStatus.Enable
def status = disable ? AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE : AbstractEurekaSupport.DiscoveryStatus.UP
task.updateStatus phaseName, "Marking ASG $serverGroupName as $status with Discovery"

def enableDisableInstanceDiscoveryDescription = new EnableDisableInstanceDiscoveryDescription(
Expand Down
Expand Up @@ -68,7 +68,7 @@ abstract class AbstractEnableDisableInstanceDiscoveryAtomicOperation implements
return
}

def status = isEnable() ? AbstractEurekaSupport.DiscoveryStatus.Enable : AbstractEurekaSupport.DiscoveryStatus.Disable
def status = isEnable() ? AbstractEurekaSupport.DiscoveryStatus.UP : AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE
discoverySupport.updateDiscoveryStatusForInstances(
description, task, phaseName, status, instancesInAsg*.instanceId, true
)
Expand Down
Expand Up @@ -215,7 +215,7 @@ private boolean updateInstanceStatus(Eureka eureka, String app, String instanceI
while (retry < properties.getEurekaUpdateStatusRetryMax()) {
retry++;
try {
eureka.updateInstanceStatus(app, instanceId, DiscoveryStatus.Disable.getValue());
eureka.updateInstanceStatus(app, instanceId, DiscoveryStatus.OUT_OF_SERVICE.getValue());
return true;
} catch (RetrofitError e) {
final String recoverableMessage =
Expand Down
Expand Up @@ -83,10 +83,11 @@ class EnableAsgAtomicOperationUnitSpec extends EnableDisableAtomicOperationUnitS
1 * eureka.getInstanceInfo('i1') >>
[
instance: [
app: "asg1"
app: "asg1",
status: "OUT_OF_SERVICE"
]
]
1 * eureka.resetInstanceStatus('asg1', 'i1', AbstractEurekaSupport.DiscoveryStatus.Disable.value)
1 * eureka.resetInstanceStatus('asg1', 'i1', AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value)
}

def 'should skip discovery if not enabled for account'() {
Expand Down
Expand Up @@ -100,7 +100,7 @@ class DiscoverySupportUnitSpec extends Specification {

when:
discoverySupport.updateDiscoveryStatusForInstances(
description, task, "phase", AbstractEurekaSupport.DiscoveryStatus.Disable, instances
description, task, "phase", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE, instances
)

then:
Expand Down Expand Up @@ -150,7 +150,7 @@ class DiscoverySupportUnitSpec extends Specification {

when:
discoverySupport.updateDiscoveryStatusForInstances(
description, task, "phase", AbstractEurekaSupport.DiscoveryStatus.Disable, instances
description, task, "phase", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE, instances
)

then:
Expand All @@ -177,19 +177,20 @@ class DiscoverySupportUnitSpec extends Specification {
1 * eureka.getInstanceInfo(_) >>
[
instance: [
app: appName
app: appName,
status: "OUT_OF_SERVICE"
]
]

0 * task.fail()
instanceIds.each {
1 * eureka.resetInstanceStatus(appName, it, AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> response(200)
1 * eureka.resetInstanceStatus(appName, it, AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> response(200)
}

where:
discoveryUrl = "http://us-west-1.discovery.netflix.net"
region = "us-west-1"
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.Enable
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.UP
appName = "kato"
instanceIds = ["i-123", "i-456"]
}
Expand All @@ -208,10 +209,10 @@ class DiscoverySupportUnitSpec extends Specification {
then:
task.getStatus() >> new DefaultTaskStatus(state: TaskState.STARTED)
1 * task.fail()
1 * eureka.getInstanceInfo(_) >> [ instance: [ app: appName ] ]
1 * eureka.resetInstanceStatus(appName, "bad", AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> httpError(500)
1 * eureka.resetInstanceStatus(appName, "good", AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> response(200)
1 * eureka.resetInstanceStatus(appName, "also-bad", AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> httpError(500)
1 * eureka.getInstanceInfo(_) >> [ instance: [ app: appName, status: "OUT_OF_SERVICE" ] ]
1 * eureka.resetInstanceStatus(appName, "bad", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> httpError(500)
1 * eureka.resetInstanceStatus(appName, "good", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> response(200)
1 * eureka.resetInstanceStatus(appName, "also-bad", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> httpError(500)
1 * task.updateStatus("PHASE", { it.startsWith("Looking up discovery") })
3 * task.updateStatus("PHASE", { it.startsWith("Attempting to mark") })
1 * task.updateStatus("PHASE", { it.startsWith("Failed marking instances 'UP'")})
Expand All @@ -220,7 +221,7 @@ class DiscoverySupportUnitSpec extends Specification {
where:
discoveryUrl = "http://us-west-1.discovery.netflix.net"
region = "us-west-1"
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.Enable
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.UP
appName = "kato"
instanceIds = ["good", "bad", "also-bad"]

Expand All @@ -240,10 +241,10 @@ class DiscoverySupportUnitSpec extends Specification {

then:
task.getStatus() >> new DefaultTaskStatus(state: TaskState.STARTED)
1 * eureka.getInstanceInfo(_) >> [ instance: [ app: appName ] ]
1 * eureka.resetInstanceStatus(appName, "bad", AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> httpError(500)
1 * eureka.resetInstanceStatus(appName, "good", AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> response(200)
1 * eureka.resetInstanceStatus(appName, "also-bad", AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> httpError(500)
1 * eureka.getInstanceInfo(_) >> [ instance: [ app: appName, status: "OUT_OF_SERVICE" ] ]
1 * eureka.resetInstanceStatus(appName, "bad", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> httpError(500)
1 * eureka.resetInstanceStatus(appName, "good", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> response(200)
1 * eureka.resetInstanceStatus(appName, "also-bad", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> httpError(500)
1 * task.updateStatus("PHASE", { it.startsWith("Looking up discovery") })
3 * task.updateStatus("PHASE", { it.startsWith("Attempting to mark") })
0 * task.updateStatus("PHASE", { it.startsWith("Failed marking instances 'UP'")})
Expand All @@ -253,7 +254,7 @@ class DiscoverySupportUnitSpec extends Specification {
where:
discoveryUrl = "http://us-west-1.discovery.netflix.net"
region = "us-west-1"
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.Enable
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.UP
appName = "kato"
instanceIds = ["good", "bad", "also-bad"]

Expand All @@ -273,10 +274,10 @@ class DiscoverySupportUnitSpec extends Specification {

then:
task.getStatus() >> new DefaultTaskStatus(state: TaskState.STARTED)
1 * eureka.getInstanceInfo(_) >> [ instance: [ app: appName ] ]
1 * eureka.resetInstanceStatus(appName, "bad", AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> httpError(500)
1 * eureka.resetInstanceStatus(appName, "good", AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> response(200)
1 * eureka.resetInstanceStatus(appName, "also-bad", AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> httpError(500)
1 * eureka.getInstanceInfo(_) >> [ instance: [ app: appName, status: "OUT_OF_SERVICE" ] ]
1 * eureka.resetInstanceStatus(appName, "bad", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> httpError(500)
1 * eureka.resetInstanceStatus(appName, "good", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> response(200)
1 * eureka.resetInstanceStatus(appName, "also-bad", AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> httpError(500)
1 * task.updateStatus("PHASE", { it.startsWith("Looking up discovery") })
3 * task.updateStatus("PHASE", { it.startsWith("Attempting to mark") })
1 * task.updateStatus("PHASE", { it.startsWith("Failed marking instances 'UP'")})
Expand All @@ -286,7 +287,7 @@ class DiscoverySupportUnitSpec extends Specification {
where:
discoveryUrl = "http://us-west-1.discovery.netflix.net"
region = "us-west-1"
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.Enable
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.UP
appName = "kato"
instanceIds = ["good", "bad", "also-bad"]

Expand All @@ -312,19 +313,17 @@ class DiscoverySupportUnitSpec extends Specification {
} >>
[
instance: [
app: appName
app: appName,
status: "UNKNOWN"
]
]
instanceIds.each {
1 * eureka.resetInstanceStatus(appName, it, AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> response(200)
}

where:
failure << [httpError(404), httpError(406), httpError(503), amazonError(503)]

discoveryUrl = "http://us-west-1.discovery.netflix.net"
region = "us-west-1"
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.Enable
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.UP
appName = "kato"
instanceIds = ["i-123"]

Expand Down Expand Up @@ -352,7 +351,7 @@ class DiscoverySupportUnitSpec extends Specification {
where:
discoveryUrl = "http://us-west-1.discovery.netflix.net"
region = "us-west-1"
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.Enable
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.UP
appName = "kato"
instanceIds = ["i-123"]
}
Expand All @@ -372,24 +371,25 @@ class DiscoverySupportUnitSpec extends Specification {
1 * eureka.getInstanceInfo('i-123') >>
[
instance: [
app: appName
app: appName,
status: "OUT_OF_SERVICE"
]
]
3 * eureka.resetInstanceStatus(appName, 'i-123', AbstractEurekaSupport.DiscoveryStatus.Disable.value) >>> [response(302), response(201), response(200)]
3 * eureka.resetInstanceStatus(appName, 'i-123', AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >>> [response(302), response(201), response(200)]
4 * task.getStatus() >> new DefaultTaskStatus(state: TaskState.STARTED)
0 * task.fail()

where:
discoveryUrl = "http://us-west-1.discovery.netflix.net"
region = "us-west-1"
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.Enable
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.UP
appName = "kato"
instanceIds = ["i-123"]
}

void "should NOT fail disable operation if instance is not found"() {
given:
def status = AbstractEurekaSupport.DiscoveryStatus.Disable
def status = AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE
def task = Mock(Task)
def description = new EnableDisableInstanceDiscoveryDescription(
region: 'us-east-1',
Expand Down Expand Up @@ -430,12 +430,13 @@ class DiscoverySupportUnitSpec extends Specification {
1 * eureka.getInstanceInfo(_) >>
[
instance: [
app: appName
app: appName,
status: "OUT_OF_SERVICE"
]
]
1 * task.fail()
instanceIds.eachWithIndex { it, idx ->
1 * eureka.resetInstanceStatus(appName, it, AbstractEurekaSupport.DiscoveryStatus.Disable.value) >> {
1 * eureka.resetInstanceStatus(appName, it, AbstractEurekaSupport.DiscoveryStatus.OUT_OF_SERVICE.value) >> {
if (!result[idx]) {
throw new RuntimeException("blammo")
}
Expand All @@ -446,7 +447,7 @@ class DiscoverySupportUnitSpec extends Specification {
where:
discoveryUrl = "http://us-west-1.discovery.netflix.net"
region = "us-west-1"
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.Enable
discoveryStatus = AbstractEurekaSupport.DiscoveryStatus.UP
appName = "kato"
instanceIds = ["i-123", "i-345", "i-456"]
result = [true, false, true]
Expand Down
Expand Up @@ -70,8 +70,8 @@ class EnableDisableInstancesInDiscoveryAtomicOperationUnitSpec extends Specifica

where:
operation || expectedDiscoveryStatus
new EnableInstancesInDiscoveryAtomicOperation(description) || DiscoveryStatus.Enable
new DisableInstancesInDiscoveryAtomicOperation(description) || DiscoveryStatus.Disable
new EnableInstancesInDiscoveryAtomicOperation(description) || DiscoveryStatus.UP
new DisableInstancesInDiscoveryAtomicOperation(description) || DiscoveryStatus.OUT_OF_SERVICE
}

@Unroll
Expand Down
Expand Up @@ -136,7 +136,7 @@ class InstanceTerminationLifecycleWorkerSpec extends Specification {
1 * accountCredentialsProvider.getAll() >> [mgmtCredentials, testCredentials]
1 * awsEurekaSupportProvider.get() >> awsEurekaSupport
1 * awsEurekaSupport.getEureka(_, 'us-west-2') >> eureka
1 * eureka.updateInstanceStatus('clouddriver', 'i-1234', DiscoveryStatus.Disable.value)
1 * eureka.updateInstanceStatus('clouddriver', 'i-1234', DiscoveryStatus.OUT_OF_SERVICE.value)
}

def 'should process both sns and sqs messages'() {
Expand Down
Expand Up @@ -73,27 +73,30 @@ abstract class AbstractEurekaSupport {

def eureka = getEureka(description.credentials, description.region)
def random = new Random()
def instanceDetails = null
def applicationName = null
def targetHealthyDeployPercentage = description.targetHealthyDeployPercentage != null ? description.targetHealthyDeployPercentage : 100

if (targetHealthyDeployPercentage < 0 || targetHealthyDeployPercentage > 100) {
throw new NumberFormatException("targetHealthyDeployPercentage must be an integer between 0 and 100")
} else if (targetHealthyDeployPercentage < 100) {
AbstractEurekaSupport.log.info("Marking ${description.asgName} instances ${discoveryStatus.value} with targetHealthyDeployPercentage ${targetHealthyDeployPercentage}")
}

try {
applicationName = retry(task, phaseName, findApplicationNameRetryMax) { retryCount ->
(applicationName, instanceDetails) = retry(task, phaseName, findApplicationNameRetryMax) { retryCount ->
def instanceId = instanceIds[random.nextInt(instanceIds.size())]
task.updateStatus phaseName, "Looking up discovery application name for instance $instanceId (attempt: $retryCount)"

def instanceDetails = eureka.getInstanceInfo(instanceId)
def appName = instanceDetails?.instance?.app
def details = eureka.getInstanceInfo(instanceId)
def appName = details?.instance?.app
if (!appName) {
throw new RetryableException("Looking up instance application name in Discovery failed for instance ${instanceId} (attempt: $retryCount)")
}
return appName
return [appName, details]
}
} catch (e) {
if (discoveryStatus == DiscoveryStatus.Enable || verifyInstanceAndAsgExist(description.credentials, description.region, null, description.asgName)) {
if (discoveryStatus == DiscoveryStatus.UP || verifyInstanceAndAsgExist(description.credentials, description.region, null, description.asgName)) {
throw e
}
}
Expand All @@ -113,7 +116,7 @@ abstract class AbstractEurekaSupport {
sleep eurekaSupportConfigurationProperties.throttleMillis
}

if (discoveryStatus == DiscoveryStatus.Disable) {
if (discoveryStatus == DiscoveryStatus.OUT_OF_SERVICE) {
if (index % eurekaSupportConfigurationProperties.attemptShortCircuitEveryNInstances == 0) {
try {
def hasUpInstances = doesCachedClusterContainDiscoveryStatus(
Expand All @@ -139,18 +142,26 @@ abstract class AbstractEurekaSupport {

Response resp

if (discoveryStatus == DiscoveryStatus.Disable) {
if (discoveryStatus == DiscoveryStatus.OUT_OF_SERVICE) {
resp = eureka.updateInstanceStatus(applicationName, instanceId, discoveryStatus.value)
} else {
resp = eureka.resetInstanceStatus(applicationName, instanceId, DiscoveryStatus.Disable.value)
// If we're trying to set the status to UP, and the instance is already UP (or hasn't registered yet),
// skip it.
if (instanceDetails.instance.status == DiscoveryStatus.OUT_OF_SERVICE.value) {
resp = eureka.resetInstanceStatus(applicationName, instanceId, DiscoveryStatus.OUT_OF_SERVICE.value)
} else {
log.debug("Instance {} is {} in discovery, no override to remove", instanceId, instanceDetails.instance.status)
skipped.add(instanceId)
return
}
}

if (resp.status != 200) {
throw new RetryableException("Non HTTP 200 response from discovery for instance ${instanceId}, will retry (attempt: $retryCount}).")
}
}
} catch (RetrofitError retrofitError) {
if (discoveryStatus == DiscoveryStatus.Disable) {
if (discoveryStatus == DiscoveryStatus.OUT_OF_SERVICE) {
def alwaysSkippable = retrofitError.response?.status == 404
def willSkip = alwaysSkippable || !strict
def skippingOrNot = willSkip ? "skipping" : "not skipping"
Expand All @@ -172,15 +183,18 @@ abstract class AbstractEurekaSupport {
} catch (ex) {
errors[instanceId] = ex
}

if (errors[instanceId]) {
if (verifyInstanceAndAsgExist(description.credentials, description.region, instanceId, description.asgName)) {
fatals.add(instanceId)
} else {
task.updateStatus phaseName, "Instance '${instanceId}' does not exist and will not be marked as '${discoveryStatus.value}'"
}
}

index++
}

if (fatals) {
Integer requiredInstances = Math.ceil(instanceIds.size() * targetHealthyDeployPercentage / 100D) as Integer
if (instanceIds.size() - fatals.size() >= requiredInstances) {
Expand All @@ -192,6 +206,7 @@ abstract class AbstractEurekaSupport {
AbstractEurekaSupport.log.info("[$phaseName] - Failed marking discovery $discoveryStatus.value for instances ${errors}")
}
}

if (!skipped.isEmpty()) {
task.addResultObjects([
["discoverySkippedInstanceIds": instanceIds]
Expand Down Expand Up @@ -336,8 +351,8 @@ abstract class AbstractEurekaSupport {
}

enum DiscoveryStatus {
Enable('UP'),
Disable('OUT_OF_SERVICE')
UP('UP'),
OUT_OF_SERVICE('OUT_OF_SERVICE')

String value

Expand Down

0 comments on commit 752bebc

Please sign in to comment.