Skip to content

Commit

Permalink
feat(sagas): Add resume task endpoint (#4032)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert authored Sep 17, 2019
1 parent 92b3bc9 commit ba18802
Show file tree
Hide file tree
Showing 30 changed files with 580 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationDescriptio
import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperationsRegistry
import com.netflix.spinnaker.clouddriver.orchestration.ExceptionClassifier
import com.netflix.spinnaker.clouddriver.orchestration.OperationsService
import com.netflix.spinnaker.clouddriver.saga.SagaEvent
import com.netflix.spinnaker.clouddriver.search.ApplicationSearchProvider
import com.netflix.spinnaker.clouddriver.search.NoopSearchProvider
import com.netflix.spinnaker.clouddriver.search.ProjectSearchProvider
Expand All @@ -91,6 +92,7 @@ import com.netflix.spinnaker.clouddriver.security.DefaultAccountCredentialsProvi
import com.netflix.spinnaker.clouddriver.security.MapBackedAccountCredentialsRepository
import com.netflix.spinnaker.fiat.shared.FiatPermissionEvaluator
import com.netflix.spinnaker.kork.core.RetrySupport
import com.netflix.spinnaker.kork.jackson.ObjectMapperSubtypeConfigurer
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
Expand Down Expand Up @@ -138,6 +140,13 @@ class CloudDriverConfig {
}
}

@Bean
ObjectMapperSubtypeConfigurer.SubtypeLocator clouddriverSubtypeLocator() {
return new ObjectMapperSubtypeConfigurer.ClassSubtypeLocator(SagaEvent, [
"com.netflix.spinnaker.clouddriver.orchestration.sagas"
])
}

@Bean
String clouddriverUserAgentApplicationName(Environment environment) {
return "Spinnaker/${environment.getProperty("Implementation-Version", "Unknown")}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ import com.netflix.spinnaker.clouddriver.orchestration.ExceptionClassifier
import com.netflix.spinnaker.clouddriver.orchestration.OperationsService
import com.netflix.spinnaker.clouddriver.orchestration.OrchestrationProcessor
import com.netflix.spinnaker.clouddriver.orchestration.events.OperationEventHandler
import com.netflix.spinnaker.clouddriver.saga.persistence.SagaRepository
import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository
import com.netflix.spinnaker.clouddriver.security.AllowedAccountsValidator
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.ApplicationContext
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration

@Configuration
@ComponentScan("com.netflix.spinnaker.clouddriver.orchestration.sagas")
class DeployConfiguration {
@Bean
@ConditionalOnMissingBean(TaskRepository)
Expand Down Expand Up @@ -92,6 +95,7 @@ class DeployConfiguration {
Optional<Collection<AllowedAccountsValidator>> allowedAccountsValidators,
Optional<List<AtomicOperationDescriptionPreProcessor>> atomicOperationDescriptionPreProcessors,
AccountCredentialsRepository accountCredentialsRepository,
Optional<SagaRepository> sagaRepository,
Registry registry,
ObjectMapper objectMapper
) {
Expand All @@ -101,6 +105,7 @@ class DeployConfiguration {
allowedAccountsValidators,
atomicOperationDescriptionPreProcessors,
accountCredentialsRepository,
sagaRepository,
registry,
objectMapper
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public class DefaultTask implements Task {
boolean hasSagaIds() {
return !sagaIdentifiers.isEmpty()
}

@Override
void retry() {
statusHistory.addLast(currentStatus().update(TaskState.STARTED))
}
}

@Immutable(knownImmutableClasses = [Status])
Expand Down Expand Up @@ -190,5 +195,4 @@ class DefaultTaskStatus implements Status {
throw new IllegalStateException("Task is already completed! No further updates allowed!")
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,7 @@ default boolean isRetryable() {
}
return getStatus().isFailed() && getStatus().isRetryable();
}

/** Updates the status of a failed Task to running in response to a retry operation. */
void retry();
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,16 @@ class JedisTask implements Task {
return !sagaIds.isEmpty()
}

@Override
void retry() {
checkMutable()
repository.addToHistory(
repository.currentState(this).update(TaskState.STARTED),
this
)

}

private void checkMutable() {
if (previousRedis) {
throw new IllegalStateException("Read-only task")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,18 @@ package com.netflix.spinnaker.clouddriver.deploy
import com.netflix.spinnaker.clouddriver.data.task.Task
import com.netflix.spinnaker.clouddriver.data.task.TaskRepository
import com.netflix.spinnaker.clouddriver.orchestration.AtomicOperation
import com.netflix.spinnaker.clouddriver.orchestration.SagaContextAware
import com.netflix.spinnaker.clouddriver.orchestration.events.OperationEvent
import org.springframework.beans.factory.annotation.Autowired

class DeployAtomicOperation implements AtomicOperation<DeploymentResult> {
class DeployAtomicOperation implements AtomicOperation<DeploymentResult>, SagaContextAware {
private static final String TASK_PHASE = "DEPLOY"

@Autowired
DeployHandlerRegistry deploymentHandlerRegistry

private final DeployDescription description
SagaContext sagaContext

DeployAtomicOperation(DeployDescription description) {
this.description = description
Expand All @@ -52,6 +54,10 @@ class DeployAtomicOperation implements AtomicOperation<DeploymentResult> {
throw new DeployHandlerNotFoundException("Could not find handler for ${description.getClass().simpleName}!")
}

if (deployHandler instanceof SagaContextAware) {
deployHandler.sagaContext = sagaContext
}

task.updateStatus TASK_PHASE, "Found handler: ${deployHandler.getClass().simpleName}"

task.updateStatus TASK_PHASE, "Invoking Handler."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ class DefaultOrchestrationProcessor implements OrchestrationProcessor {
return new GetTaskResult(existingTask, false)
}
existingTask.updateStatus(TASK_PHASE, "Re-initializing Orchestration Task")
existingTask.retry()
return new GetTaskResult(existingTask, true)
}
return new GetTaskResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
import com.google.common.base.Splitter;
import com.netflix.spectator.api.Id;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.clouddriver.data.task.SagaId;
import com.netflix.spinnaker.clouddriver.deploy.DescriptionAuthorizer;
import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidationErrors;
import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidationException;
import com.netflix.spinnaker.clouddriver.deploy.DescriptionValidator;
import com.netflix.spinnaker.clouddriver.orchestration.sagas.SnapshotAtomicOperationInput.SnapshotAtomicOperationInputCommand;
import com.netflix.spinnaker.clouddriver.saga.persistence.SagaRepository;
import com.netflix.spinnaker.clouddriver.security.AccountCredentials;
import com.netflix.spinnaker.clouddriver.security.AccountCredentialsRepository;
import com.netflix.spinnaker.clouddriver.security.AllowedAccountsValidator;
Expand All @@ -35,6 +38,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
Expand All @@ -56,6 +60,7 @@ public class OperationsService {
private final List<AtomicOperationDescriptionPreProcessor>
atomicOperationDescriptionPreProcessors;
private final AccountCredentialsRepository accountCredentialsRepository;
private final Optional<SagaRepository> sagaRepository;
private final Registry registry;
private final ObjectMapper objectMapper;

Expand All @@ -68,6 +73,7 @@ public OperationsService(
Optional<List<AtomicOperationDescriptionPreProcessor>>
atomicOperationDescriptionPreProcessors,
AccountCredentialsRepository accountCredentialsRepository,
Optional<SagaRepository> sagaRepository,
Registry registry,
ObjectMapper objectMapper) {
this.atomicOperationsRegistry = atomicOperationsRegistry;
Expand All @@ -76,6 +82,7 @@ public OperationsService(
this.atomicOperationDescriptionPreProcessors =
atomicOperationDescriptionPreProcessors.orElse(Collections.emptyList());
this.accountCredentialsRepository = accountCredentialsRepository;
this.sagaRepository = sagaRepository;
this.registry = registry;
this.objectMapper = objectMapper;

Expand Down Expand Up @@ -131,6 +138,8 @@ private List<AtomicOperationBindingResult> convert(
atomicOperationsRegistry.getAtomicOperationConverter(
descriptionName, provider, providerVersion);

// TODO(rz): What if a preprocessor fails due to a downstream error? How
// does this affect retrying?
Map processedInput =
processDescriptionInput(
atomicOperationDescriptionPreProcessors,
Expand Down Expand Up @@ -178,6 +187,13 @@ private List<AtomicOperationBindingResult> convert(
throw new AtomicOperationNotFoundException(descriptionName);
}

if (atomicOperation instanceof SagaContextAware) {
((SagaContextAware) atomicOperation)
.setSagaContext(
new SagaContextAware.SagaContext(
cloudProvider, descriptionName, descriptionInput));
}

if (errors.hasErrors()) {
registry
.counter(
Expand All @@ -191,6 +207,38 @@ private List<AtomicOperationBindingResult> convert(
.collect(Collectors.toList());
}

public List<AtomicOperation> collectAtomicOperationsFromSagas(List<SagaId> sagaIds) {
if (!sagaRepository.isPresent()) {
return Collections.emptyList();
}
// Resuming a saga-backed AtomicOperation is kind of a pain. This is because AtomicOperations
// and their descriptions are totally decoupled from their input & description name, so we
// have to store additional state in the Saga and then use that to reconstruct
// AtomicOperations. It'd make sense to refactor all of this someday.
return sagaIds.stream()
.map(id -> sagaRepository.get().get(id.getName(), id.getId()))
.filter(Objects::nonNull)
.filter(it -> !it.isComplete())
.map(saga -> saga.getEvent(SnapshotAtomicOperationInputCommand.class))
.map(
it ->
convert(
it.getCloudProvider(),
Collections.singletonList(
Collections.singletonMap(
it.getDescriptionName(), it.getDescriptionInput()))))
.flatMap(Collection::stream)
.map(this::atomicOperationOrError)
.collect(Collectors.toList());
}

private AtomicOperation atomicOperationOrError(AtomicOperationBindingResult bindingResult) {
if (bindingResult.errors.hasErrors()) {
throw new DescriptionValidationException(bindingResult.errors);
}
return bindingResult.atomicOperation;
}

private ProviderVersion getOperationVersion(OperationInput operation) {
final String accountName = operation.computeAccountName();
if (accountName == null) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright 2019 Netflix, Inc.
*
* <p>Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file
* except in compliance with the License. You may obtain a copy of the License at
*
* <p>http://www.apache.org/licenses/LICENSE-2.0
*
* <p>Unless required by applicable law or agreed to in writing, software distributed under the
* License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.orchestration;

import com.netflix.spinnaker.clouddriver.data.task.Task;
import java.util.List;

/**
* Implementations of this interface should perform orchestration of operations in a workflow. Often
* will be used in conjunction with {@link AtomicOperation} instances.
*/
public interface OrchestrationProcessor {
/**
* This is the invocation point of orchestration.
*
* @param key a unique key, used to de-dupe orchestration requests
* @return a list of results
*/
Task process(List<AtomicOperation> atomicOperations, String key);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.orchestration;

import java.util.Map;
import javax.annotation.Nonnull;
import lombok.AllArgsConstructor;
import lombok.Data;

/**
* Used to bridge AtomicOperations with Sagas.
*
* <p>Unfortunately, AtomicOperations and their descriptions are pretty well decoupled from their
* original input. This makes it difficult to retry operations without re-sending the entire
* operation ayload.
*/
public interface SagaContextAware {
void setSagaContext(@Nonnull SagaContext sagaContext);

@Data
@AllArgsConstructor
class SagaContext {
private String cloudProvider;
private String descriptionName;
private Map originalInput;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.titus.deploy.actions;
package com.netflix.spinnaker.clouddriver.orchestration.sagas;

import static java.lang.String.format;

Expand Down Expand Up @@ -135,7 +135,7 @@ public Result apply(@NotNull LoadFront50AppCommand command, @NotNull Saga saga)
}

/** Marks a SagaCommand as being aware of the result of the LoadFront50App SagaAction. */
interface Front50AppAware {
public interface Front50AppAware {
void setFront50App(Front50App app);
}

Expand Down
Loading

0 comments on commit ba18802

Please sign in to comment.