From f8b33196204de232f0429816ec453eccdded432a Mon Sep 17 00:00:00 2001 From: Rob Zienert Date: Mon, 16 Sep 2019 16:22:37 -0700 Subject: [PATCH] feat(saga): Add support for retryable tasks within OrchestrationProcessor (#4025) --- .../core/test/TaskRepositoryTck.java | 14 + clouddriver-core/clouddriver-core.gradle | 1 + .../config/CloudDriverConfig.groovy | 12 +- .../config/DeployConfiguration.groovy | 49 +++- ...tionClassifierConfigurationProperties.java | 32 +++ .../clouddriver/data/task/DefaultTask.groovy | 33 +++ .../task/SagaId.java} | 20 +- .../clouddriver/data/task/Status.groovy | 5 + .../clouddriver/data/task/Task.groovy | 85 ------ .../spinnaker/clouddriver/data/task/Task.java | 95 +++++++ .../clouddriver/data/task/TaskState.groovy | 9 +- .../data/task/jedis/JedisTask.groovy | 34 ++- .../data/task/jedis/RedisTaskRepository.java | 30 +- .../clouddriver/documentation/Nullable.groovy | 3 +- .../orchestration/AtomicOperation.java | 2 +- .../AtomicOperationConverter.groovy | 4 +- ...icOperationConverterNotFoundException.java | 33 +++ .../AtomicOperationException.groovy | 36 --- .../AtomicOperationException.java | 52 ++++ .../AtomicOperationNotFoundException.groovy | 25 -- .../AtomicOperationNotFoundException.java | 35 +++ .../AtomicOperationsRegistry.groovy | 4 +- .../DefaultOrchestrationProcessor.groovy | 92 ++++-- .../orchestration/ExceptionClassifier.java | 44 +++ .../orchestration/OperationsService.java | 258 +++++++++++++++++ .../data/task/jedis/JedisTaskSpec.groovy | 3 +- .../DefaultOrchestrationProcessorSpec.groovy | 42 ++- .../OperationsServiceSpec.groovy | 155 +++++------ .../spinnaker/clouddriver/saga/SagaService.kt | 2 + .../clouddriver/saga/flow/SagaAction.kt | 11 + .../spinnaker/clouddriver/sql/SqlTask.kt | 21 ++ .../clouddriver/sql/SqlTaskRepository.kt | 14 +- .../spinnaker/clouddriver/sql/TaskMapper.kt | 12 + .../main/resources/db/changelog-master.yml | 3 + .../db/changelog/20190913-task-sagaids.yml | 23 ++ .../sql/SqlTaskRepositoryTest.java | 4 +- .../deploy/handlers/TitusDeployHandler.java | 6 + .../controllers/OperationsController.groovy | 262 ++++++++---------- .../controllers/TaskController.groovy | 72 ----- .../controllers/FeaturesControllerSpec.groovy | 2 +- 40 files changed, 1120 insertions(+), 519 deletions(-) create mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/ExceptionClassifierConfigurationProperties.java rename clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/{orchestration/AtomicOperationConverterNotFoundException.groovy => data/task/SagaId.java} (54%) delete mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Task.groovy create mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Task.java create mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverterNotFoundException.java delete mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationException.groovy create mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationException.java delete mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationNotFoundException.groovy create mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationNotFoundException.java create mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/ExceptionClassifier.java create mode 100644 clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/OperationsService.java rename clouddriver-web/src/test/groovy/com/netflix/spinnaker/clouddriver/controllers/OperationsControllerSpec.groovy => clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/orchestration/OperationsServiceSpec.groovy (59%) create mode 100644 clouddriver-sql/src/main/resources/db/changelog/20190913-task-sagaids.yml delete mode 100644 clouddriver-web/src/main/groovy/com/netflix/spinnaker/clouddriver/controllers/TaskController.groovy diff --git a/clouddriver-core-tck/src/main/java/com/netflix/spinnaker/clouddriver/core/test/TaskRepositoryTck.java b/clouddriver-core-tck/src/main/java/com/netflix/spinnaker/clouddriver/core/test/TaskRepositoryTck.java index 1b7caa61618..64d2eb4a065 100644 --- a/clouddriver-core-tck/src/main/java/com/netflix/spinnaker/clouddriver/core/test/TaskRepositoryTck.java +++ b/clouddriver-core-tck/src/main/java/com/netflix/spinnaker/clouddriver/core/test/TaskRepositoryTck.java @@ -62,6 +62,7 @@ public void testTaskLookup() { assertThat(t1.getStatus().isFailed()).isEqualTo(t2.getStatus().isFailed()); assertThat(t1.getStatus().isCompleted()).isFalse(); assertThat(t1.getStatus().isFailed()).isFalse(); + assertThat(t1.getStatus().isRetryable()).isFalse(); } @Test @@ -73,6 +74,19 @@ public void testFailureStatus() { assertThat(t2.getStatus().isCompleted()).isTrue(); assertThat(t2.getStatus().isFailed()).isTrue(); + assertThat(t2.getStatus().isRetryable()).isFalse(); + } + + @Test + public void testRetryableStatus() { + Task t1 = subject.create("TEST", "Test Status"); + t1.fail(true); + + Task t2 = subject.get(t1.getId()); + + assertThat(t2.getStatus().isCompleted()).isTrue(); + assertThat(t2.getStatus().isFailed()).isTrue(); + assertThat(t2.getStatus().isRetryable()).isTrue(); } @Test diff --git a/clouddriver-core/clouddriver-core.gradle b/clouddriver-core/clouddriver-core.gradle index 2e75a920058..f34f7e5dae3 100644 --- a/clouddriver-core/clouddriver-core.gradle +++ b/clouddriver-core/clouddriver-core.gradle @@ -2,6 +2,7 @@ dependencies { implementation project(":cats:cats-core") implementation project(":cats:cats-redis") implementation project(":clouddriver-security") + implementation project(":clouddriver-saga") compileOnly "org.projectlombok:lombok" annotationProcessor "org.projectlombok:lombok" diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/CloudDriverConfig.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/CloudDriverConfig.groovy index 9264887dc26..71f810a72be 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/CloudDriverConfig.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/CloudDriverConfig.groovy @@ -75,6 +75,10 @@ import com.netflix.spinnaker.clouddriver.model.SubnetProvider import com.netflix.spinnaker.clouddriver.names.NamerRegistry import com.netflix.spinnaker.clouddriver.names.NamingStrategy import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationConverter +import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationDescriptionPreProcessor +import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationsRegistry +import com.netflix.spinnaker.clouddriver.orchestration.ExceptionClassifier +import com.netflix.spinnaker.clouddriver.orchestration.OperationsService import com.netflix.spinnaker.clouddriver.search.ApplicationSearchProvider import com.netflix.spinnaker.clouddriver.search.NoopSearchProvider import com.netflix.spinnaker.clouddriver.search.ProjectSearchProvider @@ -82,6 +86,7 @@ import com.netflix.spinnaker.clouddriver.search.SearchProvider import com.netflix.spinnaker.clouddriver.search.executor.SearchExecutorConfig import com.netflix.spinnaker.clouddriver.security.AccountCredentialsProvider import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository +import com.netflix.spinnaker.clouddriver.security.AllowedAccountsValidator import com.netflix.spinnaker.clouddriver.security.DefaultAccountCredentialsProvider import com.netflix.spinnaker.clouddriver.security.MapBackedAccountCredentialsRepository import com.netflix.spinnaker.fiat.shared.FiatPermissionEvaluator @@ -111,7 +116,7 @@ import java.time.Clock SearchExecutorConfig ]) @PropertySource(value = "classpath:META-INF/clouddriver-core.properties", ignoreResourceNotFound = true) -@EnableConfigurationProperties(ProjectClustersCachingAgentProperties) +@EnableConfigurationProperties([ProjectClustersCachingAgentProperties, ExceptionClassifierConfigurationProperties]) class CloudDriverConfig { @Bean @@ -345,4 +350,9 @@ class CloudDriverConfig { fiatPermissionEvaluator ) } + + @Bean + ExceptionClassifier exceptionClassifier(ExceptionClassifierConfigurationProperties properties) { + return new ExceptionClassifier(properties) + } } diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/DeployConfiguration.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/DeployConfiguration.groovy index 57d6ff8137a..840bd55b24c 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/DeployConfiguration.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/DeployConfiguration.groovy @@ -16,17 +16,27 @@ package com.netflix.spinnaker.clouddriver.config +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.spectator.api.Registry import com.netflix.spinnaker.clouddriver.data.task.InMemoryTaskRepository import com.netflix.spinnaker.clouddriver.data.task.TaskRepository import com.netflix.spinnaker.clouddriver.deploy.DefaultDeployHandlerRegistry import com.netflix.spinnaker.clouddriver.deploy.DeployHandler import com.netflix.spinnaker.clouddriver.deploy.DeployHandlerRegistry +import com.netflix.spinnaker.clouddriver.deploy.DescriptionAuthorizer import com.netflix.spinnaker.clouddriver.deploy.NullOpDeployHandler import com.netflix.spinnaker.clouddriver.orchestration.AnnotationsBasedAtomicOperationsRegistry +import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationDescriptionPreProcessor import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationsRegistry import com.netflix.spinnaker.clouddriver.orchestration.DefaultOrchestrationProcessor +import com.netflix.spinnaker.clouddriver.orchestration.ExceptionClassifier +import com.netflix.spinnaker.clouddriver.orchestration.OperationsService import com.netflix.spinnaker.clouddriver.orchestration.OrchestrationProcessor +import com.netflix.spinnaker.clouddriver.orchestration.events.OperationEventHandler +import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository +import com.netflix.spinnaker.clouddriver.security.AllowedAccountsValidator import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean +import org.springframework.context.ApplicationContext import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration @@ -46,8 +56,22 @@ class DeployConfiguration { @Bean @ConditionalOnMissingBean(OrchestrationProcessor) - OrchestrationProcessor orchestrationProcessor() { - new DefaultOrchestrationProcessor() + OrchestrationProcessor orchestrationProcessor( + TaskRepository taskRepository, + ApplicationContext applicationContext, + Registry registry, + Optional> operationEventHandlers, + ObjectMapper objectMapper, + ExceptionClassifier exceptionClassifier + ) { + new DefaultOrchestrationProcessor( + taskRepository, + applicationContext, + registry, + operationEventHandlers, + objectMapper, + exceptionClassifier + ) } @Bean @@ -60,4 +84,25 @@ class DeployConfiguration { AtomicOperationsRegistry atomicOperationsRegistry() { new AnnotationsBasedAtomicOperationsRegistry() } + + @Bean + OperationsService operationsService( + AtomicOperationsRegistry atomicOperationsRegistry, + DescriptionAuthorizer descriptionAuthorizer, + Optional> allowedAccountsValidators, + Optional> atomicOperationDescriptionPreProcessors, + AccountCredentialsRepository accountCredentialsRepository, + Registry registry, + ObjectMapper objectMapper + ) { + return new OperationsService( + atomicOperationsRegistry, + descriptionAuthorizer, + allowedAccountsValidators, + atomicOperationDescriptionPreProcessors, + accountCredentialsRepository, + registry, + objectMapper + ) + } } diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/ExceptionClassifierConfigurationProperties.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/ExceptionClassifierConfigurationProperties.java new file mode 100644 index 00000000000..5ccbe5576d1 --- /dev/null +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/config/ExceptionClassifierConfigurationProperties.java @@ -0,0 +1,32 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.config; + +import java.util.ArrayList; +import java.util.List; +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("exception-classifier") +@Data +public class ExceptionClassifierConfigurationProperties { + + /** + * A list of fully-qualified Exception class names that are not retryable within the scope of + * Saga-backed orchestrations. + */ + private List nonRetryableClasses = new ArrayList<>(); +} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/DefaultTask.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/DefaultTask.groovy index 03e739354f2..f431d91e9d6 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/DefaultTask.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/DefaultTask.groovy @@ -22,6 +22,7 @@ import com.netflix.spinnaker.clouddriver.core.ClouddriverHostname import groovy.transform.CompileStatic import groovy.transform.Immutable +import javax.annotation.Nonnull import java.util.concurrent.ConcurrentLinkedDeque import java.util.logging.Logger @@ -34,6 +35,7 @@ public class DefaultTask implements Task { final String requestId = null private final Deque statusHistory = new ConcurrentLinkedDeque() private final Deque resultObjects = new ConcurrentLinkedDeque() + private final Deque sagaIdentifiers = new ConcurrentLinkedDeque<>() final long startTimeMs = System.currentTimeMillis() public String getOwnerId() { @@ -67,6 +69,11 @@ public class DefaultTask implements Task { statusHistory.addLast(currentStatus().update(TaskState.FAILED)) } + @Override + void fail(boolean retryable) { + statusHistory.addLast(currentStatus().update(retryable ? TaskState.FAILED_RETRYABLE : TaskState.FAILED)) + } + public Status getStatus() { currentStatus() } @@ -90,6 +97,21 @@ public class DefaultTask implements Task { private DefaultTaskStatus currentStatus() { statusHistory.getLast() as DefaultTaskStatus } + + @Override + void addSagaId(@Nonnull SagaId sagaId) { + sagaIdentifiers.addLast(sagaId) + } + + @Override + List getSagaIds() { + return sagaIdentifiers.toList() + } + + @Override + boolean hasSagaIds() { + return !sagaIdentifiers.isEmpty() + } } @Immutable(knownImmutableClasses = [Status]) @@ -117,6 +139,12 @@ class TaskDisplayStatus implements Status { @JsonIgnore Boolean isFailed() { taskStatus.isFailed() } + + @JsonIgnore + @Override + Boolean isRetryable() { + return taskStatus.isRetryable() + } } @Immutable @@ -142,6 +170,11 @@ class DefaultTaskStatus implements Status { @JsonProperty public Boolean isFailed() { state.isFailed() } + @JsonProperty + public Boolean isRetryable() { + return state.isRetryable() + } + DefaultTaskStatus update(String phase, String status) { ensureUpdateable() new DefaultTaskStatus(phase, status, state) diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverterNotFoundException.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/SagaId.java similarity index 54% rename from clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverterNotFoundException.groovy rename to clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/SagaId.java index e010c2eef8b..d080c1badae 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverterNotFoundException.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/SagaId.java @@ -1,11 +1,11 @@ /* - * Copyright 2015 Netflix, Inc. + * Copyright 2019 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -13,13 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package com.netflix.spinnaker.clouddriver.data.task; -package com.netflix.spinnaker.clouddriver.orchestration +import javax.annotation.Nonnull; +import lombok.Value; -import groovy.transform.InheritConstructors -import org.springframework.http.HttpStatus -import org.springframework.web.bind.annotation.ResponseStatus - -@ResponseStatus(value = HttpStatus.BAD_REQUEST) -@InheritConstructors -class AtomicOperationConverterNotFoundException extends RuntimeException {} +@Value +public class SagaId { + @Nonnull String name; + @Nonnull String id; +} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Status.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Status.groovy index 733ef993964..a61ee47efd5 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Status.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Status.groovy @@ -46,4 +46,9 @@ public interface Status { * Informs whether the task has failed or not. A "failed" state is always indicitive of a "completed" state. */ Boolean isFailed() + + /** + * Informs whether a failed task is retryable or not. + */ + Boolean isRetryable() } diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Task.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Task.groovy deleted file mode 100644 index 385aed197ee..00000000000 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Task.groovy +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.spinnaker.clouddriver.data.task - -/** - * This interface represents the state of a given execution. Implementations must allow for updating and completing/failing - * status, as well as providing the start time of the task. - * - * - */ -public interface Task { - /** - * A unique identifier for the task, which can be used to retrieve it at a later time. - */ - String getId() - - /** - * A client-provided ID used for de-duplication. - */ - String getRequestId() - - /** - * A list of result objects that are serialized back to the caller - */ - List getResultObjects() - - /** - * This method is used to add results objects to the Task - * @param results - */ - void addResultObjects(Listresults) - - /** - * A comprehensive history of this task's execution. - */ - List getHistory() - - /** - * The id of the clouddriver instance that submitted this task - */ - String getOwnerId() - - /** - * This method is used to update the status of the Task with given phase and status strings. - * @param phase - * @param status - */ - void updateStatus(String phase, String status) - - /** - * This method will complete the task and will represent completed = true from the Task's {@link #getStatus()} method. - */ - void complete() - - /** - * This method will fail the task and will represent completed = true and failed = true from the Task's - * {@link #getStatus()} method. - */ - void fail() - - /** - * This method will return the current status of the task. - * @see Status - */ - Status getStatus() - - /** - * This returns the start time of the Task's execution in milliseconds since epoch form. - */ - long getStartTimeMs() -} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Task.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Task.java new file mode 100644 index 00000000000..2a9a1d2689d --- /dev/null +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/Task.java @@ -0,0 +1,95 @@ +package com.netflix.spinnaker.clouddriver.data.task; + +import java.util.List; +import javax.annotation.Nonnull; + +/** + * This interface represents the state of a given execution. Implementations must allow for updating + * and completing/failing status, as well as providing the start time of the task. + */ +public interface Task { + /** A unique identifier for the task, which can be used to retrieve it at a later time. */ + String getId(); + + /** A client-provided ID used for de-duplication. */ + String getRequestId(); + + /** A list of result objects that are serialized back to the caller */ + List getResultObjects(); + + /** + * This method is used to add results objects to the Task + * + * @param results + */ + void addResultObjects(List results); + + /** A comprehensive history of this task's execution. */ + List getHistory(); + + /** The id of the clouddriver instance that submitted this task */ + String getOwnerId(); + + /** + * This method is used to update the status of the Task with given phase and status strings. + * + * @param phase + * @param status + */ + void updateStatus(String phase, String status); + + /** + * This method will complete the task and will represent completed = true from the Task's {@link + * #getStatus()} method. + */ + void complete(); + + /** + * This method will fail the task and will represent completed = true and failed = true from the + * Task's {@link #getStatus()} method. + * + * @deprecated Use `fail(boolean)` instead + */ + @Deprecated + void fail(); + + /** + * This method will fail the task and will represent completed = true and failed = true from the + * Task's {@link #getStatus()} method. + * + * @param retryable If true, the failed state will be marked as retryable (sagas only) + */ + void fail(boolean retryable); + + /** + * This method will return the current status of the task. + * + * @see Status + */ + Status getStatus(); + + /** This returns the start time of the Task's execution in milliseconds since epoch form. */ + long getStartTimeMs(); + + /** + * Add a Saga to this Task. More than one Saga can be associated with a Task. + * + * @param sagaId The Saga name/id pair + */ + void addSagaId(@Nonnull SagaId sagaId); + + /** Returns true if any Sagas have been associated with this Task. */ + boolean hasSagaIds(); + + /** A list of Sagas associated with this Task, if any. */ + @Nonnull + List getSagaIds(); + + /** Returns true if the Task is retryable (in the case of a failure) */ + default boolean isRetryable() { + if (!hasSagaIds()) { + return false; + } + return getStatus().isFailed() && getStatus().isRetryable(); + } +} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/TaskState.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/TaskState.groovy index 68864e233fd..bde80e17008 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/TaskState.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/TaskState.groovy @@ -19,13 +19,18 @@ package com.netflix.spinnaker.clouddriver.data.task enum TaskState { STARTED, COMPLETED, - FAILED + FAILED, + FAILED_RETRYABLE boolean isCompleted() { this != STARTED } boolean isFailed() { - this == FAILED + this == FAILED || this == FAILED_RETRYABLE + } + + boolean isRetryable() { + this == FAILED_RETRYABLE } } diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/JedisTask.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/JedisTask.groovy index e65e37994fb..db38205266f 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/JedisTask.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/JedisTask.groovy @@ -17,11 +17,14 @@ package com.netflix.spinnaker.clouddriver.data.task.jedis import com.fasterxml.jackson.annotation.JsonIgnore +import com.netflix.spinnaker.clouddriver.data.task.SagaId import com.netflix.spinnaker.clouddriver.data.task.Status import com.netflix.spinnaker.clouddriver.data.task.Task import com.netflix.spinnaker.clouddriver.data.task.TaskState import groovy.util.logging.Slf4j +import javax.annotation.Nonnull + @Slf4j class JedisTask implements Task { @@ -32,15 +35,24 @@ class JedisTask implements Task { final long startTimeMs final String ownerId final String requestId + final List sagaIds @JsonIgnore final boolean previousRedis - JedisTask(String id, long startTimeMs, RedisTaskRepository repository, String ownerId, boolean previousRedis) { + JedisTask( + String id, + long startTimeMs, + RedisTaskRepository repository, + String ownerId, + List sagaIds, + boolean previousRedis + ) { this.id = id this.startTimeMs = startTimeMs this.repository = repository this.ownerId = ownerId + this.sagaIds = sagaIds this.previousRedis = previousRedis } @@ -57,12 +69,22 @@ class JedisTask implements Task { repository.addToHistory(repository.currentState(this).update(TaskState.COMPLETED), this) } + @Deprecated @Override void fail() { checkMutable() repository.addToHistory(repository.currentState(this).update(TaskState.FAILED), this) } + @Override + void fail(boolean retryable) { + checkMutable() + repository.addToHistory( + repository.currentState(this).update(retryable ? TaskState.FAILED_RETRYABLE : TaskState.FAILED), + this + ) + } + @Override public void addResultObjects(List results) { checkMutable() @@ -95,6 +117,16 @@ class JedisTask implements Task { repository.currentState(this) } + @Override + void addSagaId(@Nonnull SagaId sagaId) { + this.sagaIds.add(sagaId) + } + + @Override + boolean hasSagaIds() { + return !sagaIds.isEmpty() + } + private void checkMutable() { if (previousRedis) { throw new IllegalStateException("Read-only task") diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/RedisTaskRepository.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/RedisTaskRepository.java index 51f958bf72f..81b6ba8d10e 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/RedisTaskRepository.java +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/RedisTaskRepository.java @@ -22,13 +22,16 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.netflix.spinnaker.clouddriver.core.ClouddriverHostname; import com.netflix.spinnaker.clouddriver.data.task.DefaultTaskStatus; +import com.netflix.spinnaker.clouddriver.data.task.SagaId; import com.netflix.spinnaker.clouddriver.data.task.Status; import com.netflix.spinnaker.clouddriver.data.task.Task; import com.netflix.spinnaker.clouddriver.data.task.TaskDisplayStatus; import com.netflix.spinnaker.clouddriver.data.task.TaskRepository; import com.netflix.spinnaker.clouddriver.data.task.TaskState; +import com.netflix.spinnaker.kork.exceptions.SystemException; import com.netflix.spinnaker.kork.jedis.RedisClientDelegate; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -52,6 +55,8 @@ public class RedisTaskRepository implements TaskRepository { private static final String TASK_KEY_MAP = "kato:taskmap"; private static final TypeReference> HISTORY_TYPE = new TypeReference>() {}; + private static final TypeReference> SAGA_IDS_TYPE = + new TypeReference>() {}; private static final int TASK_TTL = (int) TimeUnit.HOURS.toSeconds(12); @@ -84,7 +89,13 @@ public Task create(String phase, String status, String clientRequestId) { String taskId = UUID.randomUUID().toString(); JedisTask task = - new JedisTask(taskId, System.currentTimeMillis(), this, ClouddriverHostname.ID, false); + new JedisTask( + taskId, + System.currentTimeMillis(), + this, + ClouddriverHostname.ID, + new ArrayList<>(), + false); addToHistory(DefaultTaskStatus.create(phase, status, TaskState.STARTED), task); set(taskId, task); Long newTask = @@ -133,11 +144,23 @@ public Task get(String id) { } } if (taskMap.containsKey("id") && taskMap.containsKey("startTimeMs")) { + List sagaIds; + if (taskMap.containsKey("sagaIds")) { + try { + sagaIds = mapper.readValue(taskMap.get("sagaIds"), SAGA_IDS_TYPE); + } catch (IOException e) { + throw new SystemException("Could not deserialize sagaIds key", e); + } + } else { + sagaIds = new ArrayList<>(); + } + return new JedisTask( taskMap.get("id"), Long.parseLong(taskMap.get("startTimeMs")), this, taskMap.get("ownerId"), + sagaIds, oldTask); } return null; @@ -202,6 +225,11 @@ public void set(String id, JedisTask task) { data.put("id", task.getId()); data.put("startTimeMs", Long.toString(task.getStartTimeMs())); data.put("ownerId", task.getOwnerId()); + try { + data.put("sagaIds", mapper.writeValueAsString(task.getSagaIds())); + } catch (JsonProcessingException e) { + throw new SystemException("Failed to serialize saga ids into Task", e); + } retry( () -> redisClientDelegate.withCommandsClient( diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/documentation/Nullable.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/documentation/Nullable.groovy index e367bccf535..ccce2620bb2 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/documentation/Nullable.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/documentation/Nullable.groovy @@ -24,8 +24,9 @@ import java.lang.annotation.Target /** * Marker annotation for documentation purposes. Methods annotated with @Nullable indicate that the method may return a null value. * - * + * @deprecated Because why use this and not {@code javax.annotation.Nullable}? */ +@Deprecated @Target(ElementType.METHOD) @Retention(RetentionPolicy.SOURCE) @interface Nullable { diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperation.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperation.java index 1a236a0711b..9fde5d80994 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperation.java +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperation.java @@ -29,7 +29,7 @@ public interface AtomicOperation { /** * This method will initiate the operation's work. In this, operation's can get a handle on prior - * output results from the requiremed method argument. + * output results from the required method argument. * * @param priorOutputs * @return parameterized type diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverter.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverter.groovy index cf69a307495..a1a6e34dd31 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverter.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverter.groovy @@ -16,7 +16,8 @@ package com.netflix.spinnaker.clouddriver.orchestration -import com.netflix.spinnaker.clouddriver.security.ProviderVersion + +import javax.annotation.Nullable /** * Implementations of this trait will provide an object capable of converting a Map of input parameters to an @@ -29,6 +30,7 @@ trait AtomicOperationConverter implements VersionedCloudProviderOperation { * @param input * @return atomic operation */ + @Nullable abstract AtomicOperation convertOperation(Map input) /** diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverterNotFoundException.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverterNotFoundException.java new file mode 100644 index 00000000000..c26eb860f31 --- /dev/null +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationConverterNotFoundException.java @@ -0,0 +1,33 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.orchestration; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(value = HttpStatus.BAD_REQUEST) +public class AtomicOperationConverterNotFoundException extends RuntimeException { + public AtomicOperationConverterNotFoundException() {} + + public AtomicOperationConverterNotFoundException(String message) {} + + public AtomicOperationConverterNotFoundException(String message, Throwable cause) {} + + public AtomicOperationConverterNotFoundException(Throwable cause) {} + + protected AtomicOperationConverterNotFoundException( + String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {} +} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationException.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationException.groovy deleted file mode 100644 index 7f175917495..00000000000 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationException.groovy +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Copyright 2016 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License") - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.spinnaker.clouddriver.orchestration - -import com.netflix.spinnaker.kork.exceptions.HasAdditionalAttributes -import org.springframework.http.HttpStatus -import org.springframework.web.bind.annotation.ResponseStatus - -@ResponseStatus(HttpStatus.BAD_REQUEST) -class AtomicOperationException extends RuntimeException implements HasAdditionalAttributes { - List errors - - AtomicOperationException(String message, List errors) { - super(message) - this.errors = errors - } - - @Override - Map getAdditionalAttributes() { - return errors ? ["errors": errors] : [:] - } -} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationException.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationException.java new file mode 100644 index 00000000000..87bbda475c4 --- /dev/null +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationException.java @@ -0,0 +1,52 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.orchestration; + +import com.netflix.spinnaker.kork.exceptions.SpinnakerException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus(HttpStatus.BAD_REQUEST) +public class AtomicOperationException extends SpinnakerException { + public AtomicOperationException(String message, List errors) { + super(message); + this.errors = errors; + } + + @Override + public Map getAdditionalAttributes() { + if (errors == null || errors.isEmpty()) { + return Collections.emptyMap(); + } + Map map = new HashMap<>(); + map.put("errors", errors); + return map; + } + + public List getErrors() { + return errors; + } + + public void setErrors(List errors) { + this.errors = errors; + } + + private List errors; +} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationNotFoundException.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationNotFoundException.groovy deleted file mode 100644 index 1f0eec2b1ac..00000000000 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationNotFoundException.groovy +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.spinnaker.clouddriver.orchestration - -import groovy.transform.InheritConstructors -import org.springframework.http.HttpStatus -import org.springframework.web.bind.annotation.ResponseStatus - -@ResponseStatus(value = HttpStatus.BAD_REQUEST, reason = "Could not find a suitable converter for supplied type.") -@InheritConstructors -class AtomicOperationNotFoundException extends RuntimeException {} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationNotFoundException.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationNotFoundException.java new file mode 100644 index 00000000000..94dd2d26c07 --- /dev/null +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationNotFoundException.java @@ -0,0 +1,35 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.orchestration; + +import org.springframework.http.HttpStatus; +import org.springframework.web.bind.annotation.ResponseStatus; + +@ResponseStatus( + value = HttpStatus.BAD_REQUEST, + reason = "Could not find a suitable converter for supplied type.") +public class AtomicOperationNotFoundException extends RuntimeException { + public AtomicOperationNotFoundException() {} + + public AtomicOperationNotFoundException(String message) {} + + public AtomicOperationNotFoundException(String message, Throwable cause) {} + + public AtomicOperationNotFoundException(Throwable cause) {} + + protected AtomicOperationNotFoundException( + String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {} +} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationsRegistry.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationsRegistry.groovy index 7369975c5d6..577e9c91df7 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationsRegistry.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/AtomicOperationsRegistry.groovy @@ -19,6 +19,8 @@ package com.netflix.spinnaker.clouddriver.orchestration import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidator import com.netflix.spinnaker.clouddriver.security.ProviderVersion +import javax.annotation.Nullable + /** * A registry which does a lookup of AtomicOperationConverters and DescriptionValidators based on their names and * cloud providers @@ -42,5 +44,5 @@ interface AtomicOperationsRegistry { * @param providerVersion * @return */ - DescriptionValidator getAtomicOperationDescriptionValidator(String validator, String cloudProvider, ProviderVersion version) + @Nullable DescriptionValidator getAtomicOperationDescriptionValidator(String validator, String cloudProvider, ProviderVersion version) } diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/DefaultOrchestrationProcessor.groovy b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/DefaultOrchestrationProcessor.groovy index dfec85a0e36..d9bbbb88c22 100644 --- a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/DefaultOrchestrationProcessor.groovy +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/DefaultOrchestrationProcessor.groovy @@ -25,11 +25,12 @@ import com.netflix.spinnaker.clouddriver.orchestration.events.OperationEvent import com.netflix.spinnaker.clouddriver.orchestration.events.OperationEventHandler import com.netflix.spinnaker.kork.exceptions.ExceptionSummary import com.netflix.spinnaker.security.AuthenticatedRequest +import groovy.transform.Canonical import groovy.util.logging.Slf4j import org.slf4j.MDC -import org.springframework.beans.factory.annotation.Autowired import org.springframework.context.ApplicationContext +import javax.annotation.Nonnull import java.util.concurrent.ExecutorService import java.util.concurrent.SynchronousQueue import java.util.concurrent.ThreadPoolExecutor @@ -52,20 +53,28 @@ class DefaultOrchestrationProcessor implements OrchestrationProcessor { } } - @Autowired - TaskRepository taskRepository - - @Autowired - ApplicationContext applicationContext - - @Autowired - Registry registry - - @Autowired(required = false) - Collection operationEventHandlers = [] - - @Autowired - ObjectMapper objectMapper + private final TaskRepository taskRepository + private final ApplicationContext applicationContext + private final Registry registry + private final Collection operationEventHandlers + private final ObjectMapper objectMapper + private final ExceptionClassifier exceptionClassifier + + DefaultOrchestrationProcessor( + TaskRepository taskRepository, + ApplicationContext applicationContext, + Registry registry, + Optional> operationEventHandlers, + ObjectMapper objectMapper, + ExceptionClassifier exceptionClassifier + ) { + this.taskRepository = taskRepository + this.applicationContext = applicationContext + this.registry = registry + this.operationEventHandlers = operationEventHandlers.orElse([]) + this.objectMapper = objectMapper + this.exceptionClassifier = exceptionClassifier + } @Override Task process(List atomicOperations, String clientRequestId) { @@ -73,13 +82,15 @@ class DefaultOrchestrationProcessor implements OrchestrationProcessor { def orchestrationsId = registry.createId('orchestrations') def atomicOperationId = registry.createId('operations') def tasksId = registry.createId('tasks') - def existingTask = taskRepository.getByClientRequestId(clientRequestId) - if (existingTask) { - // TODO(rz): This branch will need some love for Sagas: If the task exists but the Saga has not been completed, - // we'll want to resume. This work will be handled in a separate PR when Orca is wired up to understand Sagas. - return existingTask + + // Get the task (either an existing one, or a new one). If the task already exists, `shouldExecute` will be false + // if the task is in a non-failed state, or is not retryable. + def result = getTask(clientRequestId) + def task = result.task + if (!result.shouldExecute) { + return task } - def task = taskRepository.create(TASK_PHASE, "Initializing Orchestration Task...", clientRequestId) + def operationClosure = { try { // Autowire the atomic operations @@ -110,7 +121,7 @@ class DefaultOrchestrationProcessor implements OrchestrationProcessor { } catch (AtomicOperationException e) { task.updateStatus TASK_PHASE, "Orchestration failed: ${atomicOperation.class.simpleName} | ${e.class.simpleName}: [${e.errors.join(', ')}]" task.addResultObjects([extractExceptionSummary(e, e.errors.join(", "), [operation: atomicOperation.class.simpleName])]) - task.fail() + failTask(task, e) } catch (e) { def message = e.message def stringWriter = new StringWriter() @@ -124,7 +135,7 @@ class DefaultOrchestrationProcessor implements OrchestrationProcessor { task.addResultObjects([extractExceptionSummary(e, message, [operation: atomicOperation.class.simpleName])]) log.error(stackTrace) - task.fail() + failTask(task, e) } } task.addResultObjects(results.findResults { it }) @@ -137,14 +148,14 @@ class DefaultOrchestrationProcessor implements OrchestrationProcessor { if (e instanceof TimeoutException) { task.updateStatus "INIT", "Orchestration timed out." task.addResultObjects([extractExceptionSummary(e, "Orchestration timed out.")]) - task.fail() + failTask(task, e) } else { def stringWriter = new StringWriter() def printWriter = new PrintWriter(stringWriter) e.printStackTrace(printWriter) task.updateStatus("INIT", "Unknown failure -- ${stringWriter.toString()}") task.addResultObjects([extractExceptionSummary(e, "Failed for unknown reason.")]) - task.fail() + failTask(task, e) } } finally { if (!task.status?.isCompleted()) { @@ -201,4 +212,35 @@ class DefaultOrchestrationProcessor implements OrchestrationProcessor { summary.putAll(additionalFields) return summary } + + @Nonnull + private GetTaskResult getTask(String clientRequestId) { + def existingTask = taskRepository.getByClientRequestId(clientRequestId) + if (existingTask) { + if (!existingTask.isRetryable()) { + return new GetTaskResult(existingTask, false) + } + existingTask.updateStatus(TASK_PHASE, "Re-initializing Orchestration Task") + return new GetTaskResult(existingTask, true) + } + return new GetTaskResult( + taskRepository.create(TASK_PHASE, "Initializing Orchestration Task", clientRequestId), + true + ) + } + + private void failTask(@Nonnull Task task, @Nonnull Exception e) { + if (task.hasSagaIds()) { + task.fail(exceptionClassifier.isRetryable(e)) + } else { + // Tasks that are not Saga-backed are automatically assumed to not be retryable. + task.fail(false) + } + } + + @Canonical + private static class GetTaskResult { + Task task + boolean shouldExecute + } } diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/ExceptionClassifier.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/ExceptionClassifier.java new file mode 100644 index 00000000000..b49061df9da --- /dev/null +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/ExceptionClassifier.java @@ -0,0 +1,44 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.orchestration; + +import com.netflix.spinnaker.clouddriver.config.ExceptionClassifierConfigurationProperties; +import com.netflix.spinnaker.kork.exceptions.SpinnakerException; +import java.util.Optional; +import javax.annotation.Nonnull; +import org.springframework.stereotype.Component; + +/** + * Utility class to allow classifying non-SpinnakerException classes according to different + * pre-determined characteristics. + */ +@Component +public class ExceptionClassifier { + + private final ExceptionClassifierConfigurationProperties properties; + + public ExceptionClassifier(ExceptionClassifierConfigurationProperties properties) { + this.properties = properties; + } + + /** Returns whether or not a given Exception is retryable or not. */ + public boolean isRetryable(@Nonnull Exception e) { + if (e instanceof SpinnakerException) { + return Optional.ofNullable(((SpinnakerException) e).getRetryable()).orElse(false); + } + return !properties.getNonRetryableClasses().contains(e.getClass().getName()); + } +} diff --git a/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/OperationsService.java b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/OperationsService.java new file mode 100644 index 00000000000..63ee9d9dc05 --- /dev/null +++ b/clouddriver-core/src/main/groovy/com/netflix/spinnaker/clouddriver/orchestration/OperationsService.java @@ -0,0 +1,258 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.orchestration; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Splitter; +import com.netflix.spectator.api.Id; +import com.netflix.spectator.api.Registry; +import com.netflix.spinnaker.clouddriver.deploy.DescriptionAuthorizer; +import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidationErrors; +import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidationException; +import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidator; +import com.netflix.spinnaker.clouddriver.security.AccountCredentials; +import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository; +import com.netflix.spinnaker.clouddriver.security.AllowedAccountsValidator; +import com.netflix.spinnaker.clouddriver.security.ProviderVersion; +import com.netflix.spinnaker.kork.exceptions.SystemException; +import com.netflix.spinnaker.security.AuthenticatedRequest; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.Data; +import lombok.Value; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ResolvableType; +import org.springframework.validation.Errors; + +@Slf4j +public class OperationsService { + + private final Splitter COMMA_SPLITTER = Splitter.on(","); + + private final AtomicOperationsRegistry atomicOperationsRegistry; + private final DescriptionAuthorizer descriptionAuthorizer; + private final Collection allowedAccountValidators; + private final List + atomicOperationDescriptionPreProcessors; + private final AccountCredentialsRepository accountCredentialsRepository; + private final Registry registry; + private final ObjectMapper objectMapper; + + private final Id validationErrorsCounterId; + + public OperationsService( + AtomicOperationsRegistry atomicOperationsRegistry, + DescriptionAuthorizer descriptionAuthorizer, + Optional> allowedAccountValidators, + Optional> + atomicOperationDescriptionPreProcessors, + AccountCredentialsRepository accountCredentialsRepository, + Registry registry, + ObjectMapper objectMapper) { + this.atomicOperationsRegistry = atomicOperationsRegistry; + this.descriptionAuthorizer = descriptionAuthorizer; + this.allowedAccountValidators = allowedAccountValidators.orElse(Collections.emptyList()); + this.atomicOperationDescriptionPreProcessors = + atomicOperationDescriptionPreProcessors.orElse(Collections.emptyList()); + this.accountCredentialsRepository = accountCredentialsRepository; + this.registry = registry; + this.objectMapper = objectMapper; + + validationErrorsCounterId = registry.createId("validationErrors"); + } + + @Nonnull + public List collectAtomicOperations(@Nonnull List> inputs) { + return collectAtomicOperations(null, inputs); + } + + @Nonnull + public List collectAtomicOperations( + @Nullable String cloudProvider, @Nonnull List> inputs) { + List results = convert(cloudProvider, inputs); + + List atomicOperations = new ArrayList<>(); + results.forEach( + bindingResult -> { + if (bindingResult.errors.hasErrors()) { + throw new DescriptionValidationException(bindingResult.errors); + } + atomicOperations.add(bindingResult.atomicOperation); + }); + return atomicOperations; + } + + private List convert( + @Nullable String cloudProvider, @Nonnull List> inputs) { + + String username = AuthenticatedRequest.getSpinnakerUser().orElse("unknown"); + List allowedAccounts = + COMMA_SPLITTER.splitToList(AuthenticatedRequest.getSpinnakerAccounts().orElse("")); + + List descriptions = new ArrayList<>(); + return inputs.stream() + .flatMap( + input -> + input.entrySet().stream() + .map( + e -> { + final String descriptionName = e.getKey(); + final Map descriptionInput = e.getValue(); + final OperationInput operationInput = + objectMapper.convertValue(descriptionInput, OperationInput.class); + final String provider = + Optional.ofNullable(cloudProvider) + .orElse(operationInput.cloudProvider); + + ProviderVersion providerVersion = getOperationVersion(operationInput); + + AtomicOperationConverter converter = + atomicOperationsRegistry.getAtomicOperationConverter( + descriptionName, provider, providerVersion); + + Map processedInput = + processDescriptionInput( + atomicOperationDescriptionPreProcessors, + converter, + descriptionInput); + + Object description = converter.convertDescription(processedInput); + + descriptions.add(description); + + DescriptionValidationErrors errors = + new DescriptionValidationErrors(description); + + DescriptionValidator validator = + atomicOperationsRegistry.getAtomicOperationDescriptionValidator( + DescriptionValidator.getValidatorName(descriptionName), + provider, + providerVersion); + + if (validator == null) { + String operationName = + Optional.ofNullable(description) + .map(it -> it.getClass().getSimpleName()) + .orElse("UNKNOWN"); + log.warn( + "No validator found for operation {} and cloud provider {}", + operationName, + provider); + } else { + // TODO(rz): Assert description is T + validator.validate(descriptions, description, errors); + } + + allowedAccountValidators.forEach( + it -> { + it.validate(username, allowedAccounts, description, errors); + }); + + // TODO(rz): Assert `description` is T + descriptionAuthorizer.authorize(description, errors); + + AtomicOperation atomicOperation = + converter.convertOperation(descriptionInput); + if (atomicOperation == null) { + throw new AtomicOperationNotFoundException(descriptionName); + } + + if (errors.hasErrors()) { + registry + .counter( + validationErrorsCounterId.withTag( + "operation", atomicOperation.getClass().getSimpleName())) + .increment(); + } + + return new AtomicOperationBindingResult(atomicOperation, errors); + })) + .collect(Collectors.toList()); + } + + private ProviderVersion getOperationVersion(OperationInput operation) { + final String accountName = operation.computeAccountName(); + if (accountName == null) { + log.warn("Unable to get account name from operation: {}", operation); + } else { + try { + AccountCredentials credentials = accountCredentialsRepository.getOne(accountName); + return credentials.getProviderVersion(); + } catch (Exception e) { + log.warn("Unable to determine provider version for account {}", accountName, e); + } + } + return ProviderVersion.v1; + } + + /** + * Runs the provided descriptionInput through preprocessors. + * + *

Which preprocessors are used is determined by doing some reflection on the + * AtomicOperationConverter's return type. + */ + private static Map processDescriptionInput( + Collection descriptionPreProcessors, + AtomicOperationConverter converter, + Map descriptionInput) { + + Method convertDescriptionMethod; + try { + convertDescriptionMethod = converter.getClass().getMethod("convertDescription", Map.class); + } catch (NoSuchMethodException e) { + throw new SystemException("Could not find convertDescription method on converter", e); + } + + Class convertDescriptionReturnType = + ResolvableType.forMethodReturnType(convertDescriptionMethod).getRawClass(); + + for (AtomicOperationDescriptionPreProcessor preProcessor : descriptionPreProcessors) { + if (preProcessor.supports(convertDescriptionReturnType)) { + descriptionInput = preProcessor.process(descriptionInput); + } + } + + return descriptionInput; + } + + @Value + public static class AtomicOperationBindingResult { + private AtomicOperation atomicOperation; + private Errors errors; + } + + @Data + private static class OperationInput { + @Nullable private String credentials; + @Nullable private String accountName; + @Nullable private String account; + @Nullable private String cloudProvider; + + @Nullable + public String computeAccountName() { + return Optional.ofNullable(credentials) + .orElse(Optional.ofNullable(accountName).orElse(account)); + } + } +} diff --git a/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/JedisTaskSpec.groovy b/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/JedisTaskSpec.groovy index b0ed15d2ad1..00358c4e6fe 100644 --- a/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/JedisTaskSpec.groovy +++ b/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/data/task/jedis/JedisTaskSpec.groovy @@ -18,7 +18,6 @@ package com.netflix.spinnaker.clouddriver.data.task.jedis import com.netflix.spinnaker.clouddriver.data.task.DefaultTaskStatus import com.netflix.spinnaker.clouddriver.data.task.Status -import com.netflix.spinnaker.clouddriver.data.task.Task import com.netflix.spinnaker.clouddriver.data.task.TaskState import spock.lang.Shared import spock.lang.Specification @@ -37,7 +36,7 @@ class JedisTaskSpec extends Specification { void setup() { repository = Mock(RedisTaskRepository) - task = new JedisTask('666', System.currentTimeMillis(), repository, "owner", false) + task = new JedisTask('666', System.currentTimeMillis(), repository, "owner", [], false) } void 'updating task status adds a history entry'() { diff --git a/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/orchestration/DefaultOrchestrationProcessorSpec.groovy b/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/orchestration/DefaultOrchestrationProcessorSpec.groovy index c5f63752c3b..acb7ca04892 100644 --- a/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/orchestration/DefaultOrchestrationProcessorSpec.groovy +++ b/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/orchestration/DefaultOrchestrationProcessorSpec.groovy @@ -17,8 +17,10 @@ package com.netflix.spinnaker.clouddriver.orchestration import com.fasterxml.jackson.databind.ObjectMapper -import com.netflix.spectator.api.Spectator +import com.netflix.spectator.api.NoopRegistry +import com.netflix.spinnaker.clouddriver.config.ExceptionClassifierConfigurationProperties import com.netflix.spinnaker.clouddriver.data.task.DefaultTask +import com.netflix.spinnaker.clouddriver.data.task.SagaId import com.netflix.spinnaker.clouddriver.data.task.TaskRepository import com.netflix.spinnaker.security.AuthenticatedRequest import org.slf4j.MDC @@ -27,6 +29,7 @@ import org.springframework.context.ApplicationContext import spock.lang.Shared import spock.lang.Specification import spock.lang.Subject +import spock.lang.Unroll import java.util.concurrent.TimeUnit @@ -44,14 +47,21 @@ class DefaultOrchestrationProcessorSpec extends Specification { def setup() { taskKey = UUID.randomUUID().toString() - processor = new DefaultOrchestrationProcessor() + + taskRepository = Mock(TaskRepository) applicationContext = Mock(ApplicationContext) applicationContext.getAutowireCapableBeanFactory() >> Mock(AutowireCapableBeanFactory) - taskRepository = Mock(TaskRepository) - processor.applicationContext = applicationContext - processor.taskRepository = taskRepository - processor.registry = Spectator.globalRegistry() - processor.objectMapper = new ObjectMapper( ) + + processor = new DefaultOrchestrationProcessor( + taskRepository, + applicationContext, + new NoopRegistry(), + Optional.empty(), + new ObjectMapper(), + new ExceptionClassifier(new ExceptionClassifierConfigurationProperties( + nonRetryableClasses: [NonRetryableException.class.getName()] + )) + ) } void "complete the task when everything goes as planned"() { @@ -68,9 +78,13 @@ class DefaultOrchestrationProcessorSpec extends Specification { !task.status.isFailed() } - void "fail the task when exception is thrown"() { + @Unroll + void "fail the task when exception is thrown (#exception.class.simpleName, #sagaId)"() { setup: def task = new DefaultTask("1") + if (sagaId) { + task.sagaIdentifiers.add(sagaId) + } def atomicOperation = Mock(AtomicOperation) when: @@ -78,8 +92,16 @@ class DefaultOrchestrationProcessorSpec extends Specification { then: 1 * taskRepository.create(_, _, taskKey) >> task - 1 * atomicOperation.operate(_) >> { throw new RuntimeException() } + 1 * atomicOperation.operate(_) >> { throw exception } task.status.isFailed() + task.status.retryable == retryable + + where: + exception | sagaId || retryable + new RuntimeException() | null || false + new NonRetryableException() | null || false + new RuntimeException() | new SagaId("a", "a") || true + new NonRetryableException() | new SagaId("a", "a") || false } void "failure should be logged in the result objects"() { @@ -134,4 +156,6 @@ class DefaultOrchestrationProcessorSpec extends Specification { processor.executorService.shutdown() processor.executorService.awaitTermination(5, TimeUnit.SECONDS) } + + private static class NonRetryableException extends RuntimeException {} } diff --git a/clouddriver-web/src/test/groovy/com/netflix/spinnaker/clouddriver/controllers/OperationsControllerSpec.groovy b/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/orchestration/OperationsServiceSpec.groovy similarity index 59% rename from clouddriver-web/src/test/groovy/com/netflix/spinnaker/clouddriver/controllers/OperationsControllerSpec.groovy rename to clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/orchestration/OperationsServiceSpec.groovy index 4f802952fd2..929ffa77600 100644 --- a/clouddriver-web/src/test/groovy/com/netflix/spinnaker/clouddriver/controllers/OperationsControllerSpec.groovy +++ b/clouddriver-core/src/test/groovy/com/netflix/spinnaker/clouddriver/orchestration/OperationsServiceSpec.groovy @@ -1,11 +1,11 @@ /* - * Copyright 2014 Netflix, Inc. + * Copyright 2019 Netflix, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -13,71 +13,64 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package com.netflix.spinnaker.clouddriver.controllers +package com.netflix.spinnaker.clouddriver.orchestration import com.fasterxml.jackson.databind.ObjectMapper import com.netflix.spectator.api.NoopRegistry -import com.netflix.spinnaker.clouddriver.data.task.Task import com.netflix.spinnaker.clouddriver.deploy.DeployDescription import com.netflix.spinnaker.clouddriver.deploy.DescriptionAuthorizer -import com.netflix.spinnaker.clouddriver.orchestration.AnnotationsBasedAtomicOperationsRegistry -import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperation -import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationConverter -import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationDescriptionPreProcessor -import com.netflix.spinnaker.clouddriver.orchestration.OrchestrationProcessor -import com.netflix.spinnaker.clouddriver.security.config.SecurityConfig +import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository import org.springframework.context.annotation.AnnotationConfigApplicationContext import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration -import org.springframework.http.MediaType -import org.springframework.test.web.servlet.request.MockMvcRequestBuilders -import org.springframework.test.web.servlet.setup.MockMvcBuilders import spock.lang.Shared import spock.lang.Specification +import spock.lang.Subject import spock.lang.Unroll -class OperationsControllerSpec extends Specification { - def descriptionAuthorizer = Mock(DescriptionAuthorizer) - - void "controller takes many operation descriptions, resolves them from the spring context, and executes them in order"() { - setup: - """ - AtomicOperationConverter beans must be registered in the application context, with the bean name that corresponds to the key - that is describing them in the request. For example, a description that looks like this: - { "desc1": {} } - will go to the Spring context for a bean named "desc1", and will call the "convertOperation" method on it, with the description as input. - """ - OrchestrationProcessor orchestrationProcessor = Mock(OrchestrationProcessor) - def mvc = MockMvcBuilders.standaloneSetup( - new OperationsController( - orchestrationProcessor: orchestrationProcessor, - descriptionAuthorizer: descriptionAuthorizer, - atomicOperationsRegistry: new AnnotationsBasedAtomicOperationsRegistry( - applicationContext: new AnnotationConfigApplicationContext(TestConfig), - cloudProviders: [] - ), - opsSecurityConfigProps: new SecurityConfig.OperationsSecurityConfigurationProperties() - )).build() - +class OperationsServiceSpec extends Specification { + + DescriptionAuthorizer descriptionAuthorizer = Mock(DescriptionAuthorizer) + + @Subject + OperationsService operationsService = new OperationsService( + new AnnotationsBasedAtomicOperationsRegistry( + applicationContext: new AnnotationConfigApplicationContext(TestConfig), + cloudProviders: [] + ), + descriptionAuthorizer, + Optional.empty(), + Optional.empty(), + Mock(AccountCredentialsRepository), + new NoopRegistry(), + new ObjectMapper() + ) + + void "many operation descriptions are resolved and returned in order"() { when: - mvc.perform(MockMvcRequestBuilders.post("/ops").contentType(MediaType.APPLICATION_JSON).content('[ { "desc1": {}, "desc2": {} } ]')).andReturn() + def atomicOperations = operationsService.collectAtomicOperations([[desc1: [:], desc2: [:]]]) then: - "Operations were supplied IN ORDER to the orchestration processor." - 1 * orchestrationProcessor.process(*_) >> { - // The need for this flatten is weird -- seems like a bug in spock. - assert it?.flatten()*.getClass() == [Op1, Op2, String] - Mock(Task) - } - - 2 * descriptionAuthorizer.authorize(_, _) + atomicOperations.flatten()*.getClass() == [Op1, Op2] } - private static class Provider1DeployDescription implements DeployDescription { - } + @Unroll + void "should only pre-process inputs of supported description classes"() { + when: + def output = operationsService.processDescriptionInput( + descriptionPreProcessors as Collection, + converter, + descriptionInput + ) - private static class Provider2DeployDescription implements DeployDescription { + then: + output == expectedOutput + + where: + descriptionPreProcessors | converter | descriptionInput || expectedOutput + [] | new Provider2DeployAtomicOperationConverter() | ["a": "b"] || ["a": "b"] + [provider1PreProcessor] | new Provider2DeployAtomicOperationConverter() | ["a": "b"] || ["a": "b"] + [provider1PreProcessor, provider2PreProcessor] | new Provider2DeployAtomicOperationConverter() | ["provider2": "false"] || ["additionalKey": "additionalVal", "provider2": "true"] } @Shared @@ -104,41 +97,11 @@ class OperationsControllerSpec extends Specification { Map process(Map description) { return new HashMap(description) + [ "additionalKey": "additionalVal", - "provider2" : "true" + "provider2" : "true" ] } } - private static class Provider2DeployAtomicOperationConverter implements AtomicOperationConverter { - @Override - AtomicOperation convertOperation(Map input) { - throw new UnsupportedOperationException() - } - - Provider2DeployDescription convertDescription(Map input) { - return new ObjectMapper().convertValue(input, Provider2DeployDescription) - } - } - - @Unroll - void "should only pre-process inputs of supported description classes"() { - when: - def output = OperationsController.processDescriptionInput( - descriptionPreProcessors as Collection, - converter, - descriptionInput - ) - - then: - output == expectedOutput - - where: - descriptionPreProcessors | converter | descriptionInput || expectedOutput - [] | new Provider2DeployAtomicOperationConverter() | ["a": "b"] || ["a": "b"] - [provider1PreProcessor] | new Provider2DeployAtomicOperationConverter() | ["a": "b"] || ["a": "b"] - [provider1PreProcessor, provider2PreProcessor] | new Provider2DeployAtomicOperationConverter() | ["provider2": "false"] || ["additionalKey": "additionalVal", "provider2": "true"] - } - @Configuration static class TestConfig { @Bean @@ -152,18 +115,24 @@ class OperationsControllerSpec extends Specification { } } - static class Op1 implements AtomicOperation { - Object operate(List priorOutputs) { - return null - } + private static class Provider1DeployDescription implements DeployDescription { } - static class Op2 implements AtomicOperation { - Object operate(List priorOutputs) { - return null + private static class Provider2DeployDescription implements DeployDescription { + } + + private static class Provider2DeployAtomicOperationConverter implements AtomicOperationConverter { + @Override + AtomicOperation convertOperation(Map input) { + throw new UnsupportedOperationException() + } + + Provider2DeployDescription convertDescription(Map input) { + return new ObjectMapper().convertValue(input, Provider2DeployDescription) } } + static class Converter1 implements AtomicOperationConverter { AtomicOperation convertOperation(Map input) { new Op1() @@ -183,4 +152,16 @@ class OperationsControllerSpec extends Specification { return null } } + + static class Op1 implements AtomicOperation { + Object operate(List priorOutputs) { + return null + } + } + + static class Op2 implements AtomicOperation { + Object operate(List priorOutputs) { + return null + } + } } diff --git a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaService.kt b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaService.kt index 69cf87f0ea1..3f7660a9c1a 100644 --- a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaService.kt +++ b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/SagaService.kt @@ -98,6 +98,8 @@ class SagaService( .increment() } } catch (e: Exception) { + // TODO(rz): Add SagaAction.recover() + log.error( "Encountered error while applying action '${action.javaClass.simpleName}' on ${saga.name}/${saga.id}", e) saga.addEvent(SagaActionErrorOccurred( diff --git a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaAction.kt b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaAction.kt index 47950b3810b..75288251b84 100644 --- a/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaAction.kt +++ b/clouddriver-saga/src/main/kotlin/com/netflix/spinnaker/clouddriver/saga/flow/SagaAction.kt @@ -39,6 +39,17 @@ interface SagaAction { */ fun apply(command: T, saga: Saga): Result + /** + * In the event of an exception being raised from [apply], a [SagaAction] can implement custom error handling logic. + * + * By default, nothing happens. + * + * @param command The input [SagaCommand] that was acted on + * @param saga The [Saga] state used to apply the [command] + * @param exception The resulting exception + */ +// fun recover(command: T, saga: Saga, exception: Exception): Result = Result() + /** * @property nextCommand The next [SagaCommand] to run, if any. [ManyCommands] can be used to emit more * than one command if necessary diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/SqlTask.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/SqlTask.kt index ad2c247deb5..7d7fa7eac46 100644 --- a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/SqlTask.kt +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/SqlTask.kt @@ -16,6 +16,7 @@ package com.netflix.spinnaker.clouddriver.sql import com.fasterxml.jackson.annotation.JsonIgnore +import com.netflix.spinnaker.clouddriver.data.task.SagaId import com.netflix.spinnaker.clouddriver.data.task.Status import com.netflix.spinnaker.clouddriver.data.task.Task import com.netflix.spinnaker.clouddriver.data.task.TaskDisplayStatus @@ -31,6 +32,7 @@ class SqlTask( @JsonIgnore internal val ownerId: String, @JsonIgnore internal val requestId: String, @JsonIgnore internal val startTimeMs: Long, + private val sagaIds: MutableList, private val repository: SqlTaskRepository ) : Task { @@ -91,6 +93,25 @@ class SqlTask( repository.updateState(this, TaskState.FAILED) } + override fun fail(retryable: Boolean) { + this.dirty.set(true) + repository.updateState(this, if (retryable) TaskState.FAILED_RETRYABLE else TaskState.FAILED) + } + + override fun addSagaId(sagaId: SagaId) { + this.dirty.set(true) + sagaIds.add(sagaId) + throw UnsupportedOperationException("not implemented") + } + + override fun getSagaIds(): MutableList { + return sagaIds + } + + override fun hasSagaIds(): Boolean { + return sagaIds.isNotEmpty() + } + internal fun hydrateResultObjects(resultObjects: MutableList) { this.dirty.set(false) this.resultObjects = resultObjects diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/SqlTaskRepository.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/SqlTaskRepository.kt index 909aba5c5b7..ba96ede4b98 100644 --- a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/SqlTaskRepository.kt +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/SqlTaskRepository.kt @@ -55,7 +55,7 @@ class SqlTaskRepository( } override fun create(phase: String, status: String, clientRequestId: String): Task { - var task = SqlTask(ulid.nextULID(), ClouddriverHostname.ID, clientRequestId, clock.millis(), this) + var task = SqlTask(ulid.nextULID(), ClouddriverHostname.ID, clientRequestId, clock.millis(), mutableListOf(), this) val historyId = ulid.nextULID() withPool(POOL_NAME) { @@ -69,7 +69,8 @@ class SqlTaskRepository( field("id") to task.id, field("owner_id") to task.ownerId, field("request_id") to task.requestId, - field("created_at") to task.startTimeMs + field("created_at") to task.startTimeMs, + field("saga_ids") to mapper.writeValueAsString(task.sagaIds) ) ctx.insertInto(tasksTable, *pairs.keys.toTypedArray()).values(*pairs.values.toTypedArray()).execute() @@ -193,11 +194,11 @@ class SqlTaskRepository( withPool(POOL_NAME) { jooq.transactional(sqlRetryProperties.transactions) { ctx -> /** - * (select id as task_id, owner_id, request_id, created_at, null as body, null as state, null as phase, null as status from tasks_copy where id = '01D2H4H50VTF7CGBMP0D6HTGTF') + * (select id as task_id, owner_id, request_id, created_at, saga_ids, null as body, null as state, null as phase, null as status from tasks_copy where id = '01D2H4H50VTF7CGBMP0D6HTGTF') * UNION ALL - * (select task_id, null as owner_id, null as request_id, null as created_at, null as body, state, phase, status from task_states_copy where task_id = '01D2H4H50VTF7CGBMP0D6HTGTF') + * (select task_id, null as owner_id, null as request_id, null as created_at, null as saga_ids, null as body, state, phase, status from task_states_copy where task_id = '01D2H4H50VTF7CGBMP0D6HTGTF') * UNION ALL - * (select task_id, null as owner_id, null as request_id, null as created_at, body, null as state, null as phase, null as status from task_results_copy where task_id = '01D2H4H50VTF7CGBMP0D6HTGTF') + * (select task_id, null as owner_id, null as request_id, null as created_at, null as saga_ids, body, null as state, null as phase, null as status from task_results_copy where task_id = '01D2H4H50VTF7CGBMP0D6HTGTF') */ tasks.addAll( ctx @@ -206,6 +207,7 @@ class SqlTaskRepository( field("owner_id"), field("request_id"), field("created_at"), + field("saga_ids"), field(sql("null")).`as`("body"), field(sql("null")).`as`("state"), field(sql("null")).`as`("phase"), @@ -220,6 +222,7 @@ class SqlTaskRepository( field(sql("null")).`as`("owner_id"), field(sql("null")).`as`("request_id"), field(sql("null")).`as`("created_at"), + field(sql("null")).`as`("saga_ids"), field(sql("null")).`as`("body"), field("state"), field("phase"), @@ -235,6 +238,7 @@ class SqlTaskRepository( field(sql("null")).`as`("owner_id"), field(sql("null")).`as`("request_id"), field(sql("null")).`as`("created_at"), + field(sql("null")).`as`("saga_ids"), field("body"), field(sql("null")).`as`("state"), field(sql("null")).`as`("phase"), diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/TaskMapper.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/TaskMapper.kt index 20db6b968c0..576d326e901 100644 --- a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/TaskMapper.kt +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/TaskMapper.kt @@ -15,8 +15,10 @@ */ package com.netflix.spinnaker.clouddriver.sql +import com.fasterxml.jackson.core.type.TypeReference import com.fasterxml.jackson.databind.ObjectMapper import com.netflix.spinnaker.clouddriver.data.task.DefaultTaskStatus +import com.netflix.spinnaker.clouddriver.data.task.SagaId import com.netflix.spinnaker.clouddriver.data.task.Status import com.netflix.spinnaker.clouddriver.data.task.Task import com.netflix.spinnaker.clouddriver.data.task.TaskState @@ -32,6 +34,8 @@ class TaskMapper( companion object { private val log = LoggerFactory.getLogger(TaskMapper::class.java) + + private val SAGA_IDS_TYPE = object : TypeReference>() {} } fun map(rs: ResultSet): Collection { @@ -46,6 +50,7 @@ class TaskMapper( rs.getString("owner_id"), rs.getString("request_id"), rs.getLong("created_at"), + sagaIds(rs.getString("saga_ids")), sqlTaskRepository ).let { tasks[it.id] = it @@ -84,4 +89,11 @@ class TaskMapper( task } } + + private fun sagaIds(sagaIdsValue: String?): MutableList { + if (sagaIdsValue == null) { + return mutableListOf() + } + return mapper.readValue(sagaIdsValue, SAGA_IDS_TYPE) + } } diff --git a/clouddriver-sql/src/main/resources/db/changelog-master.yml b/clouddriver-sql/src/main/resources/db/changelog-master.yml index e70c31c83b4..9e91c9eac71 100644 --- a/clouddriver-sql/src/main/resources/db/changelog-master.yml +++ b/clouddriver-sql/src/main/resources/db/changelog-master.yml @@ -11,3 +11,6 @@ databaseChangeLog: - include: file: changelog/20190822-initial-event-schema.yml relativeToChangelogFile: true +- include: + file: changelog/20190913-task-sagaids.yml + relativeToChangelogFile: true diff --git a/clouddriver-sql/src/main/resources/db/changelog/20190913-task-sagaids.yml b/clouddriver-sql/src/main/resources/db/changelog/20190913-task-sagaids.yml new file mode 100644 index 00000000000..a792c5b3019 --- /dev/null +++ b/clouddriver-sql/src/main/resources/db/changelog/20190913-task-sagaids.yml @@ -0,0 +1,23 @@ +databaseChangeLog: + - changeSet: + id: add-task-sagaids-column + author: robzienert + changes: + - addColumn: + tableName: tasks + columns: + - name: saga_ids + type: text + afterColumn: created_at + rollback: + - dropColumn: + tableName: tasks + columnName: saga_ids + + - changeSet: + id: mysql-update-state-enum-values + author: robzienert + changes: + - sql: + dbms: mysql + sql: ALTER TABLE `task_states` MODIFY COLUMN `state` ENUM("STARTED", "COMPLETED", "FAILED", "FAILED_RETRYABLE") NOT NULL DEFAULT "STARTED" diff --git a/clouddriver-sql/src/test/java/com/netflix/spinnaker/clouddriver/sql/SqlTaskRepositoryTest.java b/clouddriver-sql/src/test/java/com/netflix/spinnaker/clouddriver/sql/SqlTaskRepositoryTest.java index 1b0be066c09..eab212d7664 100644 --- a/clouddriver-sql/src/test/java/com/netflix/spinnaker/clouddriver/sql/SqlTaskRepositoryTest.java +++ b/clouddriver-sql/src/test/java/com/netflix/spinnaker/clouddriver/sql/SqlTaskRepositoryTest.java @@ -44,6 +44,8 @@ protected TaskRepository createTaskRepository() { @After public void cleanup() { - SqlTestUtil.cleanupDb(database.context); + if (database != null) { + SqlTestUtil.cleanupDb(database.context); + } } } diff --git a/clouddriver-titus/src/main/groovy/com/netflix/spinnaker/clouddriver/titus/deploy/handlers/TitusDeployHandler.java b/clouddriver-titus/src/main/groovy/com/netflix/spinnaker/clouddriver/titus/deploy/handlers/TitusDeployHandler.java index 1a3ff919e56..1f1a4739e6f 100644 --- a/clouddriver-titus/src/main/groovy/com/netflix/spinnaker/clouddriver/titus/deploy/handlers/TitusDeployHandler.java +++ b/clouddriver-titus/src/main/groovy/com/netflix/spinnaker/clouddriver/titus/deploy/handlers/TitusDeployHandler.java @@ -1,5 +1,6 @@ package com.netflix.spinnaker.clouddriver.titus.deploy.handlers; +import com.netflix.spinnaker.clouddriver.data.task.SagaId; import com.netflix.spinnaker.clouddriver.data.task.Task; import com.netflix.spinnaker.clouddriver.data.task.TaskRepository; import com.netflix.spinnaker.clouddriver.deploy.DeployDescription; @@ -59,6 +60,11 @@ public TitusDeploymentResult handle( final String sagaName = TitusDeployHandler.class.getSimpleName(); final String sagaId = Optional.ofNullable(getTask().getRequestId()).orElse(getTask().getId()); + // TODO(rz): This is pretty inelegant. Would be better to inject the saga; so the + // AtomicOperation doesn't need the sagaService at all. Just pass the SagaFlow + // and initial commands. + getTask().addSagaId(new SagaId(sagaName, sagaId)); + final TitusDeploymentResult result = sagaService.applyBlocking( sagaName, diff --git a/clouddriver-web/src/main/groovy/com/netflix/spinnaker/clouddriver/controllers/OperationsController.groovy b/clouddriver-web/src/main/groovy/com/netflix/spinnaker/clouddriver/controllers/OperationsController.groovy index db94f395f77..8fd050c3059 100644 --- a/clouddriver-web/src/main/groovy/com/netflix/spinnaker/clouddriver/controllers/OperationsController.groovy +++ b/clouddriver-web/src/main/groovy/com/netflix/spinnaker/clouddriver/controllers/OperationsController.groovy @@ -16,198 +16,160 @@ package com.netflix.spinnaker.clouddriver.controllers -import com.netflix.spectator.api.Registry +import com.fasterxml.jackson.annotation.JsonProperty import com.netflix.spinnaker.clouddriver.data.task.Task -import com.netflix.spinnaker.clouddriver.deploy.DescriptionAuthorizer -import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidationErrors -import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidationException -import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidator +import com.netflix.spinnaker.clouddriver.data.task.TaskRepository import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperation -import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationConverter -import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationDescriptionPreProcessor -import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationNotFoundException -import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationsRegistry +import com.netflix.spinnaker.clouddriver.orchestration.OperationsService import com.netflix.spinnaker.clouddriver.orchestration.OrchestrationProcessor -import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository -import com.netflix.spinnaker.clouddriver.security.AllowedAccountsValidator -import com.netflix.spinnaker.clouddriver.security.ProviderVersion -import com.netflix.spinnaker.clouddriver.security.config.SecurityConfig -import com.netflix.spinnaker.security.AuthenticatedRequest -import groovy.transform.Canonical +import com.netflix.spinnaker.kork.web.exceptions.NotFoundException import groovy.util.logging.Slf4j -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.context.MessageSource -import org.springframework.validation.Errors +import org.springframework.beans.factory.annotation.Value +import org.springframework.web.bind.annotation.GetMapping import org.springframework.web.bind.annotation.PathVariable +import org.springframework.web.bind.annotation.PostMapping import org.springframework.web.bind.annotation.RequestBody -import org.springframework.web.bind.annotation.RequestMapping -import org.springframework.web.bind.annotation.RequestMethod import org.springframework.web.bind.annotation.RequestParam import org.springframework.web.bind.annotation.RestController +import javax.annotation.Nonnull +import javax.annotation.Nullable +import javax.annotation.PreDestroy +import java.util.concurrent.TimeUnit + +import static java.lang.String.format + @Slf4j @RestController class OperationsController { - @Autowired MessageSource messageSource - @Autowired OrchestrationProcessor orchestrationProcessor - @Autowired Registry registry - @Autowired (required = false) Collection allowedAccountValidators = [] - @Autowired (required = false) List atomicOperationDescriptionPreProcessors = [] - @Autowired AtomicOperationsRegistry atomicOperationsRegistry - @Autowired SecurityConfig.OperationsSecurityConfigurationProperties opsSecurityConfigProps - @Autowired AccountCredentialsRepository accountCredentialsRepository - - @Autowired DescriptionAuthorizer descriptionAuthorizer - - /* - * APIs - * ---------------------------------------------------------------------------------------------------------------------------- - */ - - /** + private final OperationsService operationsService + private final OrchestrationProcessor orchestrationProcessor + private final TaskRepository taskRepository + private final long shutdownWaitSeconds + + OperationsController( + OperationsService operationsService, + OrchestrationProcessor orchestrationProcessor, + TaskRepository taskRepository, + @Value('${admin.tasks.shutdown-wait-seconds:-1}') long shutdownWaitSeconds) { + this.operationsService = operationsService + this.orchestrationProcessor = orchestrationProcessor + this.taskRepository = taskRepository + this.shutdownWaitSeconds = shutdownWaitSeconds + } +/** * @deprecated Use /{cloudProvider}/ops instead */ @Deprecated - @RequestMapping(value = "/ops", method = RequestMethod.POST) - Map operations(@RequestParam(value = "clientRequestId", required = false) String clientRequestId, - @RequestBody List> requestBody) { - List atomicOperations = collectAtomicOperations(requestBody) - start(atomicOperations, clientRequestId) + @PostMapping("/ops") + StartOperationResult operations( + @RequestParam(value = "clientRequestId", required = false) String clientRequestId, + @RequestBody List> requestBody) { + List atomicOperations = operationsService.collectAtomicOperations(requestBody) + return start(atomicOperations, clientRequestId) } /** * @deprecated Use /{cloudProvider}/ops/{name} instead */ @Deprecated - @RequestMapping(value = "/ops/{name}", method = RequestMethod.POST) - Map operation(@PathVariable("name") String name, - @RequestParam(value = "clientRequestId", required = false) String clientRequestId, - @RequestBody Map requestBody) { - List atomicOperations = collectAtomicOperations([[(name): requestBody]]) - start(atomicOperations, clientRequestId) + @PostMapping("/ops/{name}") + StartOperationResult operation( + @PathVariable("name") String name, + @RequestParam(value = "clientRequestId", required = false) String clientRequestId, + @RequestBody Map requestBody) { + List atomicOperations = operationsService.collectAtomicOperations([[(name): requestBody]]) + return start(atomicOperations, clientRequestId) } - @RequestMapping(value = "/{cloudProvider}/ops", method = RequestMethod.POST) - Map cloudProviderOperations(@PathVariable("cloudProvider") String cloudProvider, - @RequestParam(value = "clientRequestId", required = false) String clientRequestId, - @RequestBody List> requestBody) { - List atomicOperations = collectAtomicOperations(cloudProvider, requestBody) - start(atomicOperations, clientRequestId) + @PostMapping("/{cloudProvider}/ops") + StartOperationResult cloudProviderOperations( + @PathVariable("cloudProvider") String cloudProvider, + @RequestParam(value = "clientRequestId", required = false) String clientRequestId, + @RequestBody List> requestBody) { + List atomicOperations = operationsService.collectAtomicOperations(cloudProvider, requestBody) + return start(atomicOperations, clientRequestId) } - @RequestMapping(value = "/{cloudProvider}/ops/{name}", method = RequestMethod.POST) - Map cloudProviderOperation(@PathVariable("cloudProvider") String cloudProvider, - @PathVariable("name") String name, - @RequestParam(value = "clientRequestId", required = false) String clientRequestId, - @RequestBody Map requestBody) { - List atomicOperations = collectAtomicOperations(cloudProvider, [[(name): requestBody]]) - start(atomicOperations, clientRequestId) + @PostMapping("/{cloudProvider}/ops/{name}") + StartOperationResult cloudProviderOperation( + @PathVariable("cloudProvider") String cloudProvider, + @PathVariable("name") String name, + @RequestParam(value = "clientRequestId", required = false) String clientRequestId, + @RequestBody Map requestBody) { + List atomicOperations = operationsService.collectAtomicOperations(cloudProvider, [[(name): requestBody]]) + return start(atomicOperations, clientRequestId) } - /* - * ---------------------------------------------------------------------------------------------------------------------------- - */ - - private List collectAtomicOperations(List> inputs) { - collectAtomicOperations(null, inputs) + @GetMapping("/task/{id}") + Task get(@PathVariable("id") String id) { + Task t = taskRepository.get(id) + if (!t) { + throw new NotFoundException("Task not found (id: ${id})") + } + return t } - private List collectAtomicOperations(String cloudProvider, List> inputs) { - def results = convert(cloudProvider, inputs) - def atomicOperations = [] - for (bindingResult in results) { - if (bindingResult.errors.hasErrors()) { - throw new DescriptionValidationException(bindingResult.errors) - } else { - atomicOperations.addAll(bindingResult.atomicOperations) - } - } - atomicOperations + @GetMapping("/task") + List list() { + taskRepository.list() } - private ProviderVersion getOperationVersion(Map operation) { - def providerVersion = ProviderVersion.v1 - try { - String accountName = operation.credentials ?: operation.accountName ?: operation.account - if (accountName) { - def credentials = accountCredentialsRepository.getOne(accountName) - providerVersion = credentials.getProviderVersion() - } else { - log.warn "Unable to get account name from operation: $operation" - } - } catch (Exception e) { - log.warn "Unable to determine account version", e + /** + * Endpoint to allow Orca to resume Tasks, if they're backed by Sagas. + * + * @param id + */ + @PostMapping("/task/{id}:resume") + void resumeTask(@PathVariable("id") String id) { + Task t = taskRepository.get(id); + if (t == null) { + throw new NotFoundException("Task not found (id: $id)") } - return providerVersion + // TODO(rz): Check if task is failed: Only allow resuming failed tasks + // TODO(rz): Lookup saga + + throw new UnsupportedOperationException("omg, pickles") } - private List convert(String cloudProvider, List> inputs) { - def username = AuthenticatedRequest.getSpinnakerUser().orElse("unknown") - def allowedAccounts = AuthenticatedRequest.getSpinnakerAccounts().orElse("").split(",") as List - - def descriptions = [] - inputs.collectMany { Map input -> - input.collect { String k, Map v -> - def providerVersion = getOperationVersion(v) - def converter = atomicOperationsRegistry.getAtomicOperationConverter(k, cloudProvider ?: v.cloudProvider, providerVersion) - - v = processDescriptionInput(atomicOperationDescriptionPreProcessors, converter, v) - def description = converter.convertDescription(v) - - descriptions << description - def errors = new DescriptionValidationErrors(description) - - def validator = atomicOperationsRegistry.getAtomicOperationDescriptionValidator( - DescriptionValidator.getValidatorName(k), cloudProvider ?: v.cloudProvider, providerVersion - ) - if (validator) { - validator.validate(descriptions, description, errors) - } else { - log.warn( - "No validator found for operation `${description?.class?.simpleName}` and cloud provider $cloudProvider" - ); - } - - allowedAccountValidators.each { - it.validate(username, allowedAccounts, description, errors) - } - - descriptionAuthorizer.authorize(description, errors) - - AtomicOperation atomicOperation = converter.convertOperation(v) - if (!atomicOperation) { - throw new AtomicOperationNotFoundException(k) - } - if (errors.hasErrors()) { - registry.counter("validationErrors", "operation", atomicOperation.class.simpleName).increment() - } - new AtomicOperationBindingResult(atomicOperation, errors) - } + /** + * TODO(rz): Seems like a weird place to put this logic...? + */ + @PreDestroy + void destroy() { + long start = System.currentTimeMillis() + def tasks = taskRepository.listByThisInstance() + while (tasks && !tasks.isEmpty() && + (System.currentTimeMillis() - start) / TimeUnit.SECONDS.toMillis(1) < shutdownWaitSeconds) { + log.info("There are {} task(s) still running... sleeping before shutting down", tasks.size()) + sleep(1000) + tasks = taskRepository.listByThisInstance() + } + + if (tasks && !tasks.isEmpty()) { + log.error("Shutting down while tasks '{}' are still in progress!", tasks) } } - private Map start(List atomicOperations, String key) { - key = key ?: UUID.randomUUID().toString() - Task task = orchestrationProcessor.process(atomicOperations, key) - [id: task.id, resourceUri: "/task/${task.id}".toString()] + private StartOperationResult start(@Nonnull List atomicOperations, @Nullable String id) { + Task task = + orchestrationProcessor.process( + atomicOperations, Optional.ofNullable(id).orElse(UUID.randomUUID().toString())); + return new StartOperationResult(task.getId()); } - static Map processDescriptionInput(Collection descriptionPreProcessors, - AtomicOperationConverter converter, - Map descriptionInput) { - def descriptionClass = converter.metaClass.methods.find { it.name == "convertDescription" }.returnType - descriptionPreProcessors.findAll { it.supports(descriptionClass) }.each { - descriptionInput = it.process(descriptionInput) - } + static class StartOperationResult { + @JsonProperty private final String id - return descriptionInput - } + StartOperationResult(String id) { + this.id = id + } - @Canonical - static class AtomicOperationBindingResult { - AtomicOperation atomicOperations - Errors errors + @JsonProperty + String getResourceUri() { + return format("/task/%s", id) + } } } diff --git a/clouddriver-web/src/main/groovy/com/netflix/spinnaker/clouddriver/controllers/TaskController.groovy b/clouddriver-web/src/main/groovy/com/netflix/spinnaker/clouddriver/controllers/TaskController.groovy deleted file mode 100644 index 8d8cf919e66..00000000000 --- a/clouddriver-web/src/main/groovy/com/netflix/spinnaker/clouddriver/controllers/TaskController.groovy +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.netflix.spinnaker.clouddriver.controllers - -import com.netflix.spinnaker.clouddriver.data.task.Task -import com.netflix.spinnaker.clouddriver.data.task.TaskRepository -import com.netflix.spinnaker.kork.web.exceptions.NotFoundException -import groovy.util.logging.Slf4j -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.beans.factory.annotation.Value -import org.springframework.web.bind.annotation.PathVariable -import org.springframework.web.bind.annotation.RequestMapping -import org.springframework.web.bind.annotation.RequestMethod -import org.springframework.web.bind.annotation.RestController - -import javax.annotation.PreDestroy -import java.util.concurrent.TimeUnit - -@RequestMapping("/task") -@RestController -@Slf4j -class TaskController { - @Autowired - TaskRepository taskRepository - - @Value('${admin.tasks.shutdown-wait-seconds:-1}') - Long shutdownWaitSeconds - - @RequestMapping(value = "/{id}", method = RequestMethod.GET) - Task get(@PathVariable("id") String id) { - Task t = taskRepository.get(id) - if (!t) { - throw new NotFoundException("Task not found (id: ${id})") - } - return t - } - - @RequestMapping(method = RequestMethod.GET) - List list() { - taskRepository.list() - } - - @PreDestroy - public void destroy() { - long start = System.currentTimeMillis() - def tasks = taskRepository.listByThisInstance() - while (tasks && !tasks.isEmpty() && - (System.currentTimeMillis() - start) / TimeUnit.SECONDS.toMillis(1) < shutdownWaitSeconds) { - log.info("There are {} task(s) still running... sleeping before shutting down", tasks.size()) - sleep(1000) - tasks = taskRepository.listByThisInstance() - } - - if (tasks && !tasks.isEmpty()) { - log.error("Shutting down while tasks '{}' are still in progress!", tasks) - } - } -} diff --git a/clouddriver-web/src/test/groovy/com/netflix/spinnaker/clouddriver/controllers/FeaturesControllerSpec.groovy b/clouddriver-web/src/test/groovy/com/netflix/spinnaker/clouddriver/controllers/FeaturesControllerSpec.groovy index 9de25cf3291..9b0421c7e57 100644 --- a/clouddriver-web/src/test/groovy/com/netflix/spinnaker/clouddriver/controllers/FeaturesControllerSpec.groovy +++ b/clouddriver-web/src/test/groovy/com/netflix/spinnaker/clouddriver/controllers/FeaturesControllerSpec.groovy @@ -70,4 +70,4 @@ class FeaturesControllerSpec extends Specification { @Retention(RetentionPolicy.RUNTIME) @interface MyCloudOperation { String value() -} \ No newline at end of file +}