Skip to content

Commit

Permalink
fix(clouddriver): Getting task from clouddriver master when timed out…
Browse files Browse the repository at this point in the history
… attempts to replicas (#2011)

- keeping track if we have tried master
- added fx to look up task from master directly (KatoRestService)
  • Loading branch information
jeyrschabu committed Feb 22, 2018
1 parent 493ed17 commit 2b6f0ca
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.spinnaker.orca.clouddriver

import com.netflix.spinnaker.orca.clouddriver.model.Task
import com.netflix.spinnaker.orca.clouddriver.model.TaskId
import retrofit.client.Response
import retrofit.http.Body
Expand Down Expand Up @@ -57,4 +58,10 @@ interface KatoRestService {
@Path("id") String id,
@Path("fileName") String fileName)

/**
* This should _only_ be called if there is a problem retrieving the Task from CloudDriverTaskStatusService (ie. a
* clouddriver replica).
*/
@GET("/task/{id}")
Task lookupTask(@Path("id") String id);
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ class KatoService {
return Observable.from(cloudDriverTaskStatusService.listTasks())
}

Observable<Task> lookupTask(String id) {
Observable<Task> lookupTask(String id, boolean skipReplica = false) {
if (skipReplica) {
return Observable.from(katoRestService.lookupTask(id))
}

return Observable.from(cloudDriverTaskStatusService.lookupTask(id))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,29 @@ class MonitorKatoTask implements RetryableTask {
}

Task katoTask
def skipReplica = stage.context."kato.task.skipReplica" ?: false
try {
katoTask = kato.lookupTask(taskId.id).toBlocking().first()
katoTask = kato.lookupTask(taskId.id, skipReplica as Boolean).toBlocking().first()
} catch (RetrofitError re) {
//handle a 404 if a task update has not successfully replicated to a read replica
if (re.kind == RetrofitError.Kind.HTTP && re.response.status == HttpURLConnection.HTTP_NOT_FOUND) {
def firstNotFoundRetry = stage.context."kato.task.firstNotFoundRetry" as Long

def now = clock.millis()
def ctx = [:]
if (firstNotFoundRetry == null || firstNotFoundRetry == -1) {
ctx['kato.task.firstNotFoundRetry'] = now
firstNotFoundRetry = now
}

if (now - firstNotFoundRetry > TASK_NOT_FOUND_TIMEOUT) {
if (skipReplica) {
// immediately fail the first time it gets a 404 directly from the master
throw re
}

registry.counter("monitorKatoTask.taskNotFound.timeout").increment()
throw re
ctx['kato.task.skipReplica'] = true
}

registry.counter("monitorKatoTask.taskNotFound.retry").increment()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import spock.lang.Specification
import spock.lang.Subject
import spock.lang.Unroll

import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.stage

class MonitorKatoTaskSpec extends Specification {

def now = Instant.now()
Expand All @@ -43,7 +45,7 @@ class MonitorKatoTaskSpec extends Specification {
def "result depends on Kato task status"() {
given:
task.kato = Stub(KatoService) {
lookupTask(taskId) >> Observable.from(new Task(taskId, new Task.Status(completed: completed, failed: failed), [], []))
lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: completed, failed: failed), [], []))
}

and:
Expand All @@ -68,7 +70,7 @@ class MonitorKatoTaskSpec extends Specification {
def "result depends on Kato task status and result object size for create/upsert operations"() {
given:
task.kato = Stub(KatoService) {
lookupTask(taskId) >> Observable.from(new Task(taskId, new Task.Status(completed: true), resultObjects, []))
lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: true), resultObjects, []))
}

and:
Expand Down Expand Up @@ -127,7 +129,7 @@ class MonitorKatoTaskSpec extends Specification {
}
def stage = new Stage(Execution.newPipeline("orca"), "whatever", ctx)
task.kato = Stub(KatoService) {
lookupTask(taskId) >> { retrofit404() }
lookupTask(taskId, false) >> { retrofit404() }
}

when:
Expand All @@ -146,15 +148,69 @@ class MonitorKatoTaskSpec extends Specification {
"about to fail" | MonitorKatoTask.TASK_NOT_FOUND_TIMEOUT | true | 'N/A'
}

def "should set kato.task.skipReplica=true when getting a task from clouddriver times out"() {
given:
def taskId = "katoTaskId"
def elapsed = MonitorKatoTask.TASK_NOT_FOUND_TIMEOUT + 1
task.kato = Mock(KatoService)

def stage = stage {
type = "type"
context = [
"kato.last.task.id": new TaskId(taskId),
"kato.task.firstNotFoundRetry": now.minusMillis(elapsed).toEpochMilli()
]
}

when:
def result = task.execute(stage)

then:
result.context['kato.task.skipReplica'] == true
notThrown(RetrofitError)
with(task.kato) {
1 * lookupTask(taskId, false) >> { retrofit404() }
0 * lookupTask(taskId, true)
}
}

def "should get task from master when kato.task.skipReplica=true"() {
given:
def taskId = "katoTaskId"
def elapsed = MonitorKatoTask.TASK_NOT_FOUND_TIMEOUT + 1
task.kato = Mock(KatoService)
def stage = stage {
context = [
"kato.last.task.id": new TaskId(taskId),
"kato.task.firstNotFoundRetry": now.minusMillis(elapsed).toEpochMilli(),
"kato.task.skipReplica": true
]
}

when:
task.execute(stage)

then:
notThrown(RetrofitError)
with(task.kato) {
1 * lookupTask(taskId, true) >> Observable.from(new Task(taskId, new Task.Status(completed: true, failed: false), [], []))
0 * lookupTask(taskId, false)
}
}

def "should timeout if task not not found after timeout period"() {
given:
def ctx = [
"kato.last.task.id": new TaskId(taskId)
]
ctx.put('kato.task.firstNotFoundRetry', now.minusMillis(elapsed).toEpochMilli())
def stage = new Stage(Execution.newPipeline("orca"), "whatever", ctx)
def taskId = "katoTaskId"
def stage = stage {
context = [
"kato.last.task.id": new TaskId(taskId),
"kato.task.firstNotFoundRetry": now.minusMillis(elapsed).toEpochMilli(),
"kato.task.skipReplica": true
]
}

task.kato = Stub(KatoService) {
lookupTask(taskId) >> { retrofit404() }
lookupTask(taskId, true) >> { retrofit404() }
}

when:
Expand All @@ -164,7 +220,6 @@ class MonitorKatoTaskSpec extends Specification {
thrown(RetrofitError)

where:
taskId = "katoTaskId"
elapsed = MonitorKatoTask.TASK_NOT_FOUND_TIMEOUT + 1
}

Expand Down

0 comments on commit 2b6f0ca

Please sign in to comment.