Skip to content

Commit

Permalink
feat(saga): Add support for retryable tasks within OrchestrationProce…
Browse files Browse the repository at this point in the history
…ssor (#4025)
  • Loading branch information
robzienert committed Sep 16, 2019
1 parent 5d214b3 commit f8b3319
Show file tree
Hide file tree
Showing 40 changed files with 1,120 additions and 519 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public void testTaskLookup() {
assertThat(t1.getStatus().isFailed()).isEqualTo(t2.getStatus().isFailed());
assertThat(t1.getStatus().isCompleted()).isFalse();
assertThat(t1.getStatus().isFailed()).isFalse();
assertThat(t1.getStatus().isRetryable()).isFalse();
}

@Test
Expand All @@ -73,6 +74,19 @@ public void testFailureStatus() {

assertThat(t2.getStatus().isCompleted()).isTrue();
assertThat(t2.getStatus().isFailed()).isTrue();
assertThat(t2.getStatus().isRetryable()).isFalse();
}

@Test
public void testRetryableStatus() {
Task t1 = subject.create("TEST", "Test Status");
t1.fail(true);

Task t2 = subject.get(t1.getId());

assertThat(t2.getStatus().isCompleted()).isTrue();
assertThat(t2.getStatus().isFailed()).isTrue();
assertThat(t2.getStatus().isRetryable()).isTrue();
}

@Test
Expand Down
1 change: 1 addition & 0 deletions clouddriver-core/clouddriver-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ dependencies {
implementation project(":cats:cats-core")
implementation project(":cats:cats-redis")
implementation project(":clouddriver-security")
implementation project(":clouddriver-saga")

compileOnly "org.projectlombok:lombok"
annotationProcessor "org.projectlombok:lombok"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,18 @@ import com.netflix.spinnaker.clouddriver.model.SubnetProvider
import com.netflix.spinnaker.clouddriver.names.NamerRegistry
import com.netflix.spinnaker.clouddriver.names.NamingStrategy
import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationConverter
import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationDescriptionPreProcessor
import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationsRegistry
import com.netflix.spinnaker.clouddriver.orchestration.ExceptionClassifier
import com.netflix.spinnaker.clouddriver.orchestration.OperationsService
import com.netflix.spinnaker.clouddriver.search.ApplicationSearchProvider
import com.netflix.spinnaker.clouddriver.search.NoopSearchProvider
import com.netflix.spinnaker.clouddriver.search.ProjectSearchProvider
import com.netflix.spinnaker.clouddriver.search.SearchProvider
import com.netflix.spinnaker.clouddriver.search.executor.SearchExecutorConfig
import com.netflix.spinnaker.clouddriver.security.AccountCredentialsProvider
import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository
import com.netflix.spinnaker.clouddriver.security.AllowedAccountsValidator
import com.netflix.spinnaker.clouddriver.security.DefaultAccountCredentialsProvider
import com.netflix.spinnaker.clouddriver.security.MapBackedAccountCredentialsRepository
import com.netflix.spinnaker.fiat.shared.FiatPermissionEvaluator
Expand Down Expand Up @@ -111,7 +116,7 @@ import java.time.Clock
SearchExecutorConfig
])
@PropertySource(value = "classpath:META-INF/clouddriver-core.properties", ignoreResourceNotFound = true)
@EnableConfigurationProperties(ProjectClustersCachingAgentProperties)
@EnableConfigurationProperties([ProjectClustersCachingAgentProperties, ExceptionClassifierConfigurationProperties])
class CloudDriverConfig {

@Bean
Expand Down Expand Up @@ -345,4 +350,9 @@ class CloudDriverConfig {
fiatPermissionEvaluator
)
}

@Bean
ExceptionClassifier exceptionClassifier(ExceptionClassifierConfigurationProperties properties) {
return new ExceptionClassifier(properties)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,27 @@

package com.netflix.spinnaker.clouddriver.config

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.clouddriver.data.task.InMemoryTaskRepository
import com.netflix.spinnaker.clouddriver.data.task.TaskRepository
import com.netflix.spinnaker.clouddriver.deploy.DefaultDeployHandlerRegistry
import com.netflix.spinnaker.clouddriver.deploy.DeployHandler
import com.netflix.spinnaker.clouddriver.deploy.DeployHandlerRegistry
import com.netflix.spinnaker.clouddriver.deploy.DescriptionAuthorizer
import com.netflix.spinnaker.clouddriver.deploy.NullOpDeployHandler
import com.netflix.spinnaker.clouddriver.orchestration.AnnotationsBasedAtomicOperationsRegistry
import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationDescriptionPreProcessor
import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationsRegistry
import com.netflix.spinnaker.clouddriver.orchestration.DefaultOrchestrationProcessor
import com.netflix.spinnaker.clouddriver.orchestration.ExceptionClassifier
import com.netflix.spinnaker.clouddriver.orchestration.OperationsService
import com.netflix.spinnaker.clouddriver.orchestration.OrchestrationProcessor
import com.netflix.spinnaker.clouddriver.orchestration.events.OperationEventHandler
import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository
import com.netflix.spinnaker.clouddriver.security.AllowedAccountsValidator
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

Expand All @@ -46,8 +56,22 @@ class DeployConfiguration {

@Bean
@ConditionalOnMissingBean(OrchestrationProcessor)
OrchestrationProcessor orchestrationProcessor() {
new DefaultOrchestrationProcessor()
OrchestrationProcessor orchestrationProcessor(
TaskRepository taskRepository,
ApplicationContext applicationContext,
Registry registry,
Optional<Collection<OperationEventHandler>> operationEventHandlers,
ObjectMapper objectMapper,
ExceptionClassifier exceptionClassifier
) {
new DefaultOrchestrationProcessor(
taskRepository,
applicationContext,
registry,
operationEventHandlers,
objectMapper,
exceptionClassifier
)
}

@Bean
Expand All @@ -60,4 +84,25 @@ class DeployConfiguration {
AtomicOperationsRegistry atomicOperationsRegistry() {
new AnnotationsBasedAtomicOperationsRegistry()
}

@Bean
OperationsService operationsService(
AtomicOperationsRegistry atomicOperationsRegistry,
DescriptionAuthorizer descriptionAuthorizer,
Optional<Collection<AllowedAccountsValidator>> allowedAccountsValidators,
Optional<List<AtomicOperationDescriptionPreProcessor>> atomicOperationDescriptionPreProcessors,
AccountCredentialsRepository accountCredentialsRepository,
Registry registry,
ObjectMapper objectMapper
) {
return new OperationsService(
atomicOperationsRegistry,
descriptionAuthorizer,
allowedAccountsValidators,
atomicOperationDescriptionPreProcessors,
accountCredentialsRepository,
registry,
objectMapper
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.config;

import java.util.ArrayList;
import java.util.List;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("exception-classifier")
@Data
public class ExceptionClassifierConfigurationProperties {

/**
* A list of fully-qualified Exception class names that are not retryable within the scope of
* Saga-backed orchestrations.
*/
private List<String> nonRetryableClasses = new ArrayList<>();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import com.netflix.spinnaker.clouddriver.core.ClouddriverHostname
import groovy.transform.CompileStatic
import groovy.transform.Immutable

import javax.annotation.Nonnull
import java.util.concurrent.ConcurrentLinkedDeque
import java.util.logging.Logger

Expand All @@ -34,6 +35,7 @@ public class DefaultTask implements Task {
final String requestId = null
private final Deque<Status> statusHistory = new ConcurrentLinkedDeque<Status>()
private final Deque<Object> resultObjects = new ConcurrentLinkedDeque<Object>()
private final Deque<SagaId> sagaIdentifiers = new ConcurrentLinkedDeque<>()
final long startTimeMs = System.currentTimeMillis()

public String getOwnerId() {
Expand Down Expand Up @@ -67,6 +69,11 @@ public class DefaultTask implements Task {
statusHistory.addLast(currentStatus().update(TaskState.FAILED))
}

@Override
void fail(boolean retryable) {
statusHistory.addLast(currentStatus().update(retryable ? TaskState.FAILED_RETRYABLE : TaskState.FAILED))
}

public Status getStatus() {
currentStatus()
}
Expand All @@ -90,6 +97,21 @@ public class DefaultTask implements Task {
private DefaultTaskStatus currentStatus() {
statusHistory.getLast() as DefaultTaskStatus
}

@Override
void addSagaId(@Nonnull SagaId sagaId) {
sagaIdentifiers.addLast(sagaId)
}

@Override
List<SagaId> getSagaIds() {
return sagaIdentifiers.toList()
}

@Override
boolean hasSagaIds() {
return !sagaIdentifiers.isEmpty()
}
}

@Immutable(knownImmutableClasses = [Status])
Expand Down Expand Up @@ -117,6 +139,12 @@ class TaskDisplayStatus implements Status {

@JsonIgnore
Boolean isFailed() { taskStatus.isFailed() }

@JsonIgnore
@Override
Boolean isRetryable() {
return taskStatus.isRetryable()
}
}

@Immutable
Expand All @@ -142,6 +170,11 @@ class DefaultTaskStatus implements Status {
@JsonProperty
public Boolean isFailed() { state.isFailed() }

@JsonProperty
public Boolean isRetryable() {
return state.isRetryable()
}

DefaultTaskStatus update(String phase, String status) {
ensureUpdateable()
new DefaultTaskStatus(phase, status, state)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
/*
* Copyright 2015 Netflix, Inc.
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.data.task;

package com.netflix.spinnaker.clouddriver.orchestration
import javax.annotation.Nonnull;
import lombok.Value;

import groovy.transform.InheritConstructors
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.ResponseStatus

@ResponseStatus(value = HttpStatus.BAD_REQUEST)
@InheritConstructors
class AtomicOperationConverterNotFoundException extends RuntimeException {}
@Value
public class SagaId {
@Nonnull String name;
@Nonnull String id;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,9 @@ public interface Status {
* Informs whether the task has failed or not. A "failed" state is always indicitive of a "completed" state.
*/
Boolean isFailed()

/**
* Informs whether a failed task is retryable or not.
*/
Boolean isRetryable()
}

This file was deleted.

Loading

0 comments on commit f8b3319

Please sign in to comment.