diff --git a/gradle.properties b/gradle.properties index 42e2630146..fe54fbc7d9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,7 @@ #Tue Sep 17 18:06:31 UTC 2019 fiatVersion=1.1.0 enablePublishing=false +korkVersion=6.8.0 spinnakerGradleVersion=7.0.1 korkVersion=6.8.0 keikoVersion=2.13.5 diff --git a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/KatoRestService.groovy b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/KatoRestService.groovy index 3f9dd9ed0d..40d84cc5d1 100644 --- a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/KatoRestService.groovy +++ b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/KatoRestService.groovy @@ -18,6 +18,7 @@ package com.netflix.spinnaker.orca.clouddriver import com.netflix.spinnaker.orca.clouddriver.model.Task import com.netflix.spinnaker.orca.clouddriver.model.TaskId +import io.github.resilience4j.retry.annotation.Retry import retrofit.client.Response import retrofit.http.Body import retrofit.http.DELETE @@ -69,4 +70,8 @@ interface KatoRestService { */ @GET("/task/{id}") Task lookupTask(@Path("id") String id); + + @POST("/task/{id}:resume") + @Retry(name = "katoRetrofitServiceWriter") + TaskId resumeTask(@Path("id") String id); } diff --git a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/KatoService.groovy b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/KatoService.groovy index 670a1fd2eb..b72fe744d6 100644 --- a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/KatoService.groovy +++ b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/KatoService.groovy @@ -25,6 +25,8 @@ import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Component import rx.Observable +import javax.annotation.Nonnull + @Component class KatoService { @@ -53,6 +55,11 @@ class KatoService { return 
Observable.from(cloudDriverTaskStatusService.lookupTask(id)) } + @Nonnull + TaskId resumeTask(@Nonnull String id) { + katoRestService.resumeTask(id) + } + private static String requestId(Object payload) { final ExecutionContext context = ExecutionContext.get() final byte[] payloadBytes = OrcaObjectMapper.getInstance().writeValueAsBytes(payload) diff --git a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/model/Task.groovy b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/model/Task.groovy index 9cddf34cfe..028be6c060 100644 --- a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/model/Task.groovy +++ b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/model/Task.groovy @@ -32,6 +32,7 @@ class Task { static class Status implements Serializable { boolean completed boolean failed + boolean retryable } @Immutable diff --git a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/MonitorKatoTask.groovy b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/MonitorKatoTask.groovy index b1831fc6d3..876546ffb5 100644 --- a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/MonitorKatoTask.groovy +++ b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/MonitorKatoTask.groovy @@ -24,16 +24,18 @@ import com.netflix.spinnaker.orca.clouddriver.KatoService import com.netflix.spinnaker.orca.clouddriver.model.Task import com.netflix.spinnaker.orca.clouddriver.model.TaskId import com.netflix.spinnaker.orca.pipeline.model.Stage +import com.netflix.spinnaker.orca.pipeline.model.SystemNotification import groovy.transform.CompileStatic import groovy.transform.TypeCheckingMode +import groovy.util.logging.Slf4j import org.springframework.beans.factory.annotation.Autowired import org.springframework.beans.factory.annotation.Value import org.springframework.stereotype.Component import 
retrofit.RetrofitError import java.time.Clock -import java.util.concurrent.TimeUnit +@Slf4j @Component @CompileStatic class MonitorKatoTask implements RetryableTask { @@ -48,24 +50,23 @@ class MonitorKatoTask implements RetryableTask { private final Clock clock private final Registry registry + private final KatoService kato @Autowired - public MonitorKatoTask(Registry registry) { - this(registry, Clock.systemUTC()) + MonitorKatoTask(KatoService katoService, Registry registry) { + this(katoService, registry, Clock.systemUTC()) } - MonitorKatoTask(Registry registry, Clock clock) { + MonitorKatoTask(KatoService katoService, Registry registry, Clock clock) { this.registry = registry this.clock = clock + this.kato = katoService } long getBackoffPeriod() { 5000L } long getTimeout() { 3600000L } - @Autowired - KatoService kato - @Override TaskResult execute(Stage stage) { TaskId taskId = stage.context."kato.last.task.id" as TaskId @@ -107,7 +108,6 @@ class MonitorKatoTask implements RetryableTask { } } - def katoResultExpected = (stage.context["kato.result.expected"] as Boolean) ?: false ExecutionStatus status = katoStatusToTaskStatus(katoTask, katoResultExpected) @@ -130,6 +130,15 @@ class MonitorKatoTask implements RetryableTask { if (!stage.context.containsKey("deploy.jobs") && deployed) { outputs["deploy.jobs"] = deployed } + + if (stage.context."kato.task.retriedOperation" == true) { + stage.execution.systemNotifications.add(new SystemNotification( + clock.millis(), + "katoRetryTask", + "Completed cloud provider retry", + true + )) + } } if (status == ExecutionStatus.SUCCEEDED || status == ExecutionStatus.TERMINAL || status == ExecutionStatus.RUNNING) { List> katoTasks = [] @@ -149,7 +158,23 @@ class MonitorKatoTask implements RetryableTask { } katoTasks << m outputs["kato.tasks"] = katoTasks + } + if (status == ExecutionStatus.TERMINAL && katoTask.status.retryable) { + stage.execution.systemNotifications.add(new SystemNotification( + clock.millis(), +
"katoRetryTask", + "Retrying failed downstream cloud provider operation", + false + )) + try { + kato.resumeTask(katoTask.id) + } catch (Exception e) { + // Swallow the exception; we'll let Orca retry the next time around. + log.error("Request failed attempting to resume task", e) + } + status = ExecutionStatus.RUNNING + stage.context."kato.task.retriedOperation" = true } TaskResult.builder(status).context(outputs).build() diff --git a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/job/MonitorJobTask.java b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/job/MonitorJobTask.java index 75b260f21e..35365f97fd 100644 --- a/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/job/MonitorJobTask.java +++ b/orca-clouddriver/src/main/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/job/MonitorJobTask.java @@ -19,6 +19,7 @@ import com.netflix.spectator.api.Registry; import com.netflix.spinnaker.orca.TaskResult; +import com.netflix.spinnaker.orca.clouddriver.KatoService; import com.netflix.spinnaker.orca.clouddriver.tasks.MonitorKatoTask; import com.netflix.spinnaker.orca.pipeline.model.Stage; import javax.annotation.Nonnull; @@ -31,13 +32,13 @@ public class MonitorJobTask extends MonitorKatoTask { private final JobUtils jobUtils; @Autowired - public MonitorJobTask(Registry registry, JobUtils jobUtils) { - super(registry); + public MonitorJobTask(KatoService katoService, Registry registry, JobUtils jobUtils) { + super(katoService, registry); this.jobUtils = jobUtils; } - public MonitorJobTask(Registry registry) { - super(registry); + public MonitorJobTask(KatoService katoService, Registry registry) { + super(katoService, registry); this.jobUtils = null; } diff --git a/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/MonitorKatoTaskSpec.groovy b/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/MonitorKatoTaskSpec.groovy index 
b378abb0c6..68c75b4f54 100644 --- a/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/MonitorKatoTaskSpec.groovy +++ b/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/clouddriver/tasks/MonitorKatoTaskSpec.groovy @@ -41,16 +41,16 @@ class MonitorKatoTaskSpec extends Specification { def now = Instant.now() - @Subject task = new MonitorKatoTask(new NoopRegistry(), Clock.fixed(now, ZoneId.of("UTC"))) {{ + KatoService kato = Mock(KatoService) + + @Subject task = new MonitorKatoTask(kato, new NoopRegistry(), Clock.fixed(now, ZoneId.of("UTC"))) {{ taskNotFoundTimeoutMs = TASK_NOT_FOUND_TIMEOUT }} @Unroll("result is #expectedResult if kato task is #katoStatus") def "result depends on Kato task status"() { given: - task.kato = Stub(KatoService) { - lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: completed, failed: failed), [], [])) - } + kato.lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: completed, failed: failed), [], [])) and: def stage = new Stage(Execution.newPipeline("orca"), "whatever", [ @@ -73,9 +73,7 @@ class MonitorKatoTaskSpec extends Specification { @Unroll("result is #expectedResult if katoResultExpected is #katoResultExpected and resultObject is #resultObjects") def "result depends on Kato task status and result object size for create/upsert operations"() { given: - task.kato = Stub(KatoService) { - lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: true), resultObjects, [])) - } + kato.lookupTask(taskId, false) >> Observable.from(new Task(taskId, new Task.Status(completed: true), resultObjects, [])) and: def stage = new Stage(Execution.newPipeline("orca"), "whatever", [ @@ -132,9 +130,7 @@ class MonitorKatoTaskSpec extends Specification { } } def stage = new Stage(Execution.newPipeline("orca"), "whatever", ctx) - task.kato = Stub(KatoService) { - lookupTask(taskId, false) >> { retrofit404() } - } + 
kato.lookupTask(taskId, false) >> { retrofit404() } when: def result = task.execute(stage) @@ -156,7 +152,6 @@ class MonitorKatoTaskSpec extends Specification { given: def taskId = "katoTaskId" def elapsed = task.TASK_NOT_FOUND_TIMEOUT + 1 - task.kato = Mock(KatoService) def stage = stage { type = "type" @@ -172,17 +167,38 @@ class MonitorKatoTaskSpec extends Specification { then: result.context['kato.task.skipReplica'] == true notThrown(RetrofitError) - with(task.kato) { + with(kato) { 1 * lookupTask(taskId, false) >> { retrofit404() } 0 * lookupTask(taskId, true) } } + def "should retry clouddriver task if classified as retryable"() { + given: + def katoTask = new Task("katoTaskId", new Task.Status(true, true, true), [], []) + + def stage = stage { + type = "type" + context = [ + "kato.last.task.id": new TaskId(katoTask.id) + ] + } + + when: + task.execute(stage) + + then: + notThrown(RetrofitError) + with(kato) { + 1 * lookupTask(katoTask.id, false) >> { Observable.just(katoTask) } + 1 * resumeTask(katoTask.id) >> { new TaskId(katoTask.id) } + } + } + def "should get task from master when kato.task.skipReplica=true"() { given: def taskId = "katoTaskId" def elapsed = TASK_NOT_FOUND_TIMEOUT + 1 - task.kato = Mock(KatoService) def stage = stage { context = [ "kato.last.task.id": new TaskId(taskId), @@ -196,7 +212,7 @@ class MonitorKatoTaskSpec extends Specification { then: notThrown(RetrofitError) - with(task.kato) { + with(kato) { 1 * lookupTask(taskId, true) >> Observable.from(new Task(taskId, new Task.Status(completed: true, failed: false), [], [])) 0 * lookupTask(taskId, false) } @@ -213,9 +229,7 @@ class MonitorKatoTaskSpec extends Specification { ] } - task.kato = Stub(KatoService) { - lookupTask(taskId, true) >> { retrofit404() } - } + kato.lookupTask(taskId, true) >> { retrofit404() } when: task.execute(stage) diff --git a/orca-clouddriver/src/test/java/com/netflix/spinnaker/orca/clouddriver/tasks/providers/cf/CloudFoundryMonitorKatoServicesTaskTest.java 
b/orca-clouddriver/src/test/java/com/netflix/spinnaker/orca/clouddriver/tasks/providers/cf/CloudFoundryMonitorKatoServicesTaskTest.java index 07e541d503..70452bc737 100644 --- a/orca-clouddriver/src/test/java/com/netflix/spinnaker/orca/clouddriver/tasks/providers/cf/CloudFoundryMonitorKatoServicesTaskTest.java +++ b/orca-clouddriver/src/test/java/com/netflix/spinnaker/orca/clouddriver/tasks/providers/cf/CloudFoundryMonitorKatoServicesTaskTest.java @@ -53,7 +53,7 @@ private void testKatoServiceStatus( new Task[] { new Task( taskIdString, - new Task.Status(completed, failed), + new Task.Status(completed, failed, false), resultObjects, Collections.emptyList()) })); @@ -63,7 +63,7 @@ private void testKatoServiceStatus( ImmutableMap.Builder katoTaskMapBuilder = new ImmutableMap.Builder() .put("id", taskIdString) - .put("status", new Task.Status(completed, failed)) + .put("status", new Task.Status(completed, failed, false)) .put("history", Collections.emptyList()) .put( "resultObjects", diff --git a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/SystemNotification.kt b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/SystemNotification.kt index a377f476df..b898f78cf4 100644 --- a/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/SystemNotification.kt +++ b/orca-core/src/main/java/com/netflix/spinnaker/orca/pipeline/model/SystemNotification.kt @@ -20,8 +20,8 @@ package com.netflix.spinnaker.orca.pipeline.model * * Each [SystemNotification] is treated as immutable and stored in an * append-only log. In order to finalize/cancel/dismiss a previous - * notification, the [group] propertyshould be used in combination with - * [closed], so that setting [closed] to true willhide other messages using + * notification, the [group] property should be used in combination with + * [closed], so that setting [closed] to true will hide other messages using * the same [group] value. 
In the case of a close record, the message should * include a reason why it is being closed. */ diff --git a/orca-web/config/orca.yml b/orca-web/config/orca.yml index 5b11bb77e9..5077424480 100644 --- a/orca-web/config/orca.yml +++ b/orca-web/config/orca.yml @@ -51,6 +51,18 @@ tasks: logging: config: classpath:logback-defaults.xml +resilience4j.retry: + instances: + # TODO(rz): These defaults were just copied from the resilience4j website. They still need to be tuned. + default: + maxRetryAttempts: 2 + waitDuration: 10s + enableExponentialBackoff: false + # katoRetrofitServiceWriter is any write operation to Clouddriver using Retrofit. + katoRetrofitServiceWriter: + retryExceptions: + - retrofit.RetrofitError + integrations: gremlin: enabled: false