feat(clouddriver): Basic support for retryable kato tasks (#3162)
robzienert committed Sep 17, 2019
1 parent 8a409a6 commit caa471c
Showing 10 changed files with 99 additions and 33 deletions.
1 change: 1 addition & 0 deletions gradle.properties
@@ -1,6 +1,7 @@
#Tue Sep 17 18:06:31 UTC 2019
fiatVersion=1.1.0
enablePublishing=false
korkVersion=6.8.0
spinnakerGradleVersion=7.0.1
korkVersion=6.8.0
keikoVersion=2.13.5
@@ -18,6 +18,7 @@ package com.netflix.spinnaker.orca.clouddriver

import com.netflix.spinnaker.orca.clouddriver.model.Task
import com.netflix.spinnaker.orca.clouddriver.model.TaskId
import io.github.resilience4j.retry.annotation.Retry
import retrofit.client.Response
import retrofit.http.Body
import retrofit.http.DELETE
@@ -69,4 +70,8 @@ interface KatoRestService {
*/
@GET("/task/{id}")
Task lookupTask(@Path("id") String id);

@POST("/task/{id}:resume")
@Retry(name = "katoRetrofitServiceWriter")
TaskId resumeTask(@Path("id") String id);
}
@@ -25,6 +25,8 @@ import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
import rx.Observable

import javax.annotation.Nonnull

@Component
class KatoService {

@@ -53,6 +55,11 @@ class KatoService {
return Observable.from(cloudDriverTaskStatusService.lookupTask(id))
}

@Nonnull
TaskId resumeTask(@Nonnull String id) {
katoRestService.resumeTask(id)
}

private static String requestId(Object payload) {
final ExecutionContext context = ExecutionContext.get()
final byte[] payloadBytes = OrcaObjectMapper.getInstance().writeValueAsBytes(payload)
@@ -32,6 +32,7 @@ class Task {
static class Status implements Serializable {
boolean completed
boolean failed
boolean retryable
}

@Immutable
@@ -24,16 +24,18 @@ import com.netflix.spinnaker.orca.clouddriver.KatoService
import com.netflix.spinnaker.orca.clouddriver.model.Task
import com.netflix.spinnaker.orca.clouddriver.model.TaskId
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.model.SystemNotification
import groovy.transform.CompileStatic
import groovy.transform.TypeCheckingMode
import groovy.util.logging.Slf4j
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import retrofit.RetrofitError

import java.time.Clock
import java.util.concurrent.TimeUnit

@Slf4j
@Component
@CompileStatic
class MonitorKatoTask implements RetryableTask {
@@ -48,24 +50,23 @@

private final Clock clock
private final Registry registry
private final KatoService kato

@Autowired
public MonitorKatoTask(Registry registry) {
this(registry, Clock.systemUTC())
MonitorKatoTask(KatoService katoService, Registry registry) {
this(katoService, registry, Clock.systemUTC())
}

MonitorKatoTask(Registry registry, Clock clock) {
MonitorKatoTask(KatoService katoService, Registry registry, Clock clock) {
this.registry = registry
this.clock = clock
this.kato = katoService
}

long getBackoffPeriod() { 5000L }

long getTimeout() { 3600000L }

@Autowired
KatoService kato

@Override
TaskResult execute(Stage stage) {
TaskId taskId = stage.context."kato.last.task.id" as TaskId
@@ -107,7 +108,6 @@
}
}


def katoResultExpected = (stage.context["kato.result.expected"] as Boolean) ?: false
ExecutionStatus status = katoStatusToTaskStatus(katoTask, katoResultExpected)

@@ -130,6 +130,15 @@
if (!stage.context.containsKey("deploy.jobs") && deployed) {
outputs["deploy.jobs"] = deployed
}

if (stage.context."kato.task.retriedOperation" == true) {
stage.execution.systemNotifications.add(new SystemNotification(
clock.millis(),
"katoRetryTask",
"Completed cloud provider retry",
true
))
}
}
if (status == ExecutionStatus.SUCCEEDED || status == ExecutionStatus.TERMINAL || status == ExecutionStatus.RUNNING) {
List<Map<String, Object>> katoTasks = []
@@ -149,7 +158,23 @@
}
katoTasks << m
outputs["kato.tasks"] = katoTasks
}

if (status == ExecutionStatus.TERMINAL && katoTask.status.retryable) {
stage.execution.systemNotifications.add(new SystemNotification(
clock.millis(),
"katoRetryTask",
"Retrying failed downstream cloud provider operation",
false
))
try {
kato.resumeTask(katoTask.id)
} catch (Exception e) {
// Swallow the exception; we'll let Orca retry the next time around.
log.error("Request failed attempting to resume task", e)
}
status = ExecutionStatus.RUNNING
stage.context."kato.task.retriedOperation" = true
}

TaskResult.builder(status).context(outputs).build()
@@ -19,6 +19,7 @@

import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.orca.TaskResult;
import com.netflix.spinnaker.orca.clouddriver.KatoService;
import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask;
import com.netflix.spinnaker.orca.pipeline.model.Stage;
import javax.annotation.Nonnull;
@@ -31,13 +32,13 @@ public class MonitorJobTask extends MonitorKatoTask {
private final JobUtils jobUtils;

@Autowired
public MonitorJobTask(Registry registry, JobUtils jobUtils) {
super(registry);
public MonitorJobTask(KatoService katoService, Registry registry, JobUtils jobUtils) {
super(katoService, registry);
this.jobUtils = jobUtils;
}

public MonitorJobTask(Registry registry) {
super(registry);
public MonitorJobTask(KatoService katoService, Registry registry) {
super(katoService, registry);
this.jobUtils = null;
}

@@ -41,16 +41,16 @@ class MonitorKatoTaskSpec extends Specification {

def now = Instant.now()

@Subject task = new MonitorKatoTask(new NoopRegistry(), Clock.fixed(now, ZoneId.of("UTC"))) {{
KatoService kato = Mock(KatoService)

@Subject task = new MonitorKatoTask(kato, new NoopRegistry(), Clock.fixed(now, ZoneId.of("UTC"))) {{
taskNotFoundTimeoutMs = TASK_NOT_FOUND_TIMEOUT
}}

@Unroll("result is #expectedResult if kato task is #katoStatus")
def "result depends on Kato task status"() {
given:
task.kato = Stub(KatoService) {
lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: completed, failed: failed), [], []))
}
kato.lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: completed, failed: failed), [], []))

and:
def stage = new Stage(Execution.newPipeline("orca"), "whatever", [
@@ -73,9 +73,7 @@
@Unroll("result is #expectedResult if katoResultExpected is #katoResultExpected and resultObject is #resultObjects")
def "result depends on Kato task status and result object size for create/upsert operations"() {
given:
task.kato = Stub(KatoService) {
lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: true), resultObjects, []))
}
kato.lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: true), resultObjects, []))

and:
def stage = new Stage(Execution.newPipeline("orca"), "whatever", [
@@ -132,9 +130,7 @@
}
}
def stage = new Stage(Execution.newPipeline("orca"), "whatever", ctx)
task.kato = Stub(KatoService) {
lookupTask(taskId, false) >> { retrofit404() }
}
kato.lookupTask(taskId, false) >> { retrofit404() }

when:
def result = task.execute(stage)
@@ -156,7 +152,6 @@
given:
def taskId = "katoTaskId"
def elapsed = task.TASK_NOT_FOUND_TIMEOUT + 1
task.kato = Mock(KatoService)

def stage = stage {
type = "type"
@@ -172,17 +167,38 @@
then:
result.context['kato.task.skipReplica'] == true
notThrown(RetrofitError)
with(task.kato) {
with(kato) {
1 * lookupTask(taskId, false) >> { retrofit404() }
0 * lookupTask(taskId, true)
}
}

def "should retry clouddriver task if classified as retryable"() {
given:
def katoTask = new Task("katoTaskId", new Task.Status(true, true, true), [], [])

def stage = stage {
type = "type"
context = [
"kato.last.task.id": new TaskId(katoTask.id)
]
}

when:
task.execute(stage)

then:
notThrown(RetrofitError)
with(kato) {
1 * lookupTask(katoTask.id, false) >> { Observable.just(katoTask) }
1 * resumeTask(katoTask.id) >> { new TaskId(katoTask.id) }
}
}

def "should get task from master when kato.task.skipReplica=true"() {
given:
def taskId = "katoTaskId"
def elapsed = TASK_NOT_FOUND_TIMEOUT + 1
task.kato = Mock(KatoService)
def stage = stage {
context = [
"kato.last.task.id": new TaskId(taskId),
@@ -196,7 +212,7 @@

then:
notThrown(RetrofitError)
with(task.kato) {
with(kato) {
1 * lookupTask(taskId, true) >> Observable.from(new Task(taskId, new Task.Status(completed: true, failed: false), [], []))
0 * lookupTask(taskId, false)
}
@@ -213,9 +229,7 @@
]
}

task.kato = Stub(KatoService) {
lookupTask(taskId, true) >> { retrofit404() }
}
kato.lookupTask(taskId, true) >> { retrofit404() }

when:
task.execute(stage)
@@ -53,7 +53,7 @@ private void testKatoServiceStatus(
new Task[] {
new Task(
taskIdString,
new Task.Status(completed, failed),
new Task.Status(completed, failed, false),
resultObjects,
Collections.emptyList())
}));
@@ -63,7 +63,7 @@
ImmutableMap.Builder<String, Object> katoTaskMapBuilder =
new ImmutableMap.Builder<String, Object>()
.put("id", taskIdString)
.put("status", new Task.Status(completed, failed))
.put("status", new Task.Status(completed, failed, false))
.put("history", Collections.emptyList())
.put(
"resultObjects",
@@ -20,8 +20,8 @@ package com.netflix.spinnaker.orca.pipeline.model
*
* Each [SystemNotification] is treated as immutable and stored in an
* append-only log. In order to finalize/cancel/dismiss a previous
* notification, the [group] propertyshould be used in combination with
* [closed], so that setting [closed] to true willhide other messages using
* notification, the [group] property should be used in combination with
* [closed], so that setting [closed] to true will hide other messages using
* the same [group] value. In the case of a close record, the message should
* include a reason why it is being closed.
*/
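
For illustration only (not part of this commit's diff), the group/closed convention described above plays out roughly as follows, mirroring the MonitorKatoTask usage added in this change; stage and clock are assumed to be in scope, as they are in that task.

// Open a notification under the "katoRetryTask" group; closed = false keeps it visible.
stage.execution.systemNotifications.add(new SystemNotification(
  clock.millis(),
  "katoRetryTask",
  "Retrying failed downstream cloud provider operation",
  false
))

// Later, append a record for the same group with closed = true; per the contract above,
// this hides the earlier message in that group rather than mutating it.
stage.execution.systemNotifications.add(new SystemNotification(
  clock.millis(),
  "katoRetryTask",
  "Completed cloud provider retry",
  true
))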
12 changes: 12 additions & 0 deletions orca-web/config/orca.yml
@@ -51,6 +51,18 @@ tasks:
logging:
config: classpath:logback-defaults.xml

resilience4j.retry:
instances:
# TODO(rz): These defaults were just copied from the resilience4j website. They still need to be tuned.
default:
maxRetryAttempts: 2
waitDuration: 10s
enableExponentialBackoff: false
# katoRetrofitServiceWriter is any write operation to Clouddriver using Retrofit.
katoRetrofitServiceWriter:
retryExceptions:
- retrofit.RetrofitError

integrations:
gremlin:
enabled: false
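
As a rough illustration only (not part of this commit), the katoRetrofitServiceWriter instance above corresponds approximately to the following programmatic resilience4j configuration. The katoRestService and id variables are assumed to be in scope, the numeric values are borrowed from the neighboring default instance for concreteness, and no claim is made that Spring Boot wires the @Retry annotation in exactly this way.

import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
import retrofit.RetrofitError
import java.time.Duration
import java.util.function.Supplier

// Approximate programmatic equivalent of the YAML keys above.
RetryConfig config = RetryConfig.custom()
    .maxAttempts(2)                        // maxRetryAttempts: 2 (value from the default instance)
    .waitDuration(Duration.ofSeconds(10))  // waitDuration: 10s (value from the default instance)
    .retryExceptions(RetrofitError)        // retryExceptions: retrofit.RetrofitError
    .build()
Retry retry = Retry.of("katoRetrofitServiceWriter", config)

// Decorating a Clouddriver write call so it is retried when Retrofit throws:
TaskId taskId = Retry.decorateSupplier(retry, { katoRestService.resumeTask(id) } as Supplier).get()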
