Skip to content

Commit

Permalink
feat(interlink): handle executions in foreign partitions (#3459)
Browse files Browse the repository at this point in the history
* feat(interlink): handle executions in foreign partitions

We can optionally inject an Interlink object in SqlExecutionRepository. When present, it can be used to notify a pubsub topic of a request that the execution repository could not handle (instead of throwing a `ForeignExecutionException`). Other orca clusters can then subscribe to that topic, unpack the message about the failed action and potentially replay it on their local repository if they handle that partition.

To avoid pathologic cases where 2 orca clusters end up endlessly bouncing the same message back and forth across the interlink, we add `MessageFlagger` to act as a circuit breaker. Based on a fixed size evicting queue, it inspects the fingerprint of the last ${maxSize} events published by this instance, and if it detects that we are over some threshold, it will not let the message get published on the topic. One example where this may happen is the following scenario with an orphaned execution that everybody considers as belonging to someone else:
1. execution repo 1 has an execution with id I and partition `apple`
2. execution repo 2 has an execution with the same id I and partition `banana`
3. when a user makes a request to orca 1 with partition `banana` to delete I, it is considered a foreign execution and a message is sent on the interlink
4. orca 2 receives the interlink message and tries to delete it from execution repo 2, but it is also considered foreign (because it has partition `apple`)
5. therefore, orca 2 sends a message on the interlink, and so on

Caveats:
- this change only handles pause, resume, cancel and delete execution events
- to cover every use case, we will also need to handle restart stage, skip wait, manual judgement...
  • Loading branch information
dreynaud committed Feb 25, 2020
1 parent fbb8967 commit cc7ed9d
Show file tree
Hide file tree
Showing 29 changed files with 1,056 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,12 @@
"com.netflix.spinnaker.orca.pipeline.util",
"com.netflix.spinnaker.orca.preprocessors",
"com.netflix.spinnaker.orca.telemetry",
"com.netflix.spinnaker.orca.notifications.scheduling"
"com.netflix.spinnaker.orca.notifications.scheduling",
})
@Import({
PreprocessorConfiguration.class,
PluginsAutoConfiguration.class,
})
@Import({PreprocessorConfiguration.class, PluginsAutoConfiguration.class})
@EnableConfigurationProperties
public class OrcaConfiguration {
@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,24 @@ List<String> retrieveAllApplicationNames(

List<String> retrieveAllExecutionIds(@Nonnull ExecutionType type);

/**
* Returns the name of the partition that this execution repository owns/handles. {@code null}
* means that it handles any partition (in other words, this execution repository is not partition
* aware)
*/
@Nullable
default String getPartition() {
return null;
}

default boolean handlesPartition(@Nullable String partitionOfExecution) {
return partitionOfExecution
== null // executions with no partition are implied to be owned by current orca
|| getPartition()
== null // this repository is not restricted to a partition, can handle any execution
|| partitionOfExecution.equals(getPartition()); // both are set and must match
}

final class ExecutionCriteria {
private int pageSize = 3500;
private Collection<ExecutionStatus> statuses = new ArrayList<>();
Expand Down
32 changes: 32 additions & 0 deletions orca-interlink/orca-interlink.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


apply from: "$rootDir/gradle/kotlin.gradle"
apply from: "$rootDir/gradle/spock.gradle"
apply plugin: "java"

dependencies {
api("com.netflix.spinnaker.kork:kork-pubsub")
api("com.netflix.spinnaker.kork:kork-pubsub-aws")

implementation(project(":orca-core"))
implementation("com.amazonaws:aws-java-sdk-sqs")

compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
testImplementation(project(":orca-test"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.pubsub.PubsubPublishers;
import com.netflix.spinnaker.kork.pubsub.aws.SNSPublisherProvider;
import com.netflix.spinnaker.kork.pubsub.aws.api.AmazonPubsubMessageHandler;
import com.netflix.spinnaker.kork.pubsub.aws.api.AmazonPubsubMessageHandlerFactory;
import com.netflix.spinnaker.kork.pubsub.aws.config.AmazonPubsubConfig;
import com.netflix.spinnaker.kork.pubsub.aws.config.AmazonPubsubProperties;
import com.netflix.spinnaker.kork.pubsub.config.PubsubConfig;
import com.netflix.spinnaker.orca.interlink.Interlink;
import com.netflix.spinnaker.orca.interlink.MessageFlagger;
import com.netflix.spinnaker.orca.interlink.aws.InterlinkAmazonMessageHandler;
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository;
import java.time.Clock;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import({PubsubConfig.class, AmazonPubsubConfig.class})
@ConditionalOnProperty("interlink.enabled")
@EnableConfigurationProperties(InterlinkConfigurationProperties.class)
@Slf4j
public class InterlinkConfiguration {
@Bean
@ConditionalOnProperty({"pubsub.enabled", "pubsub.amazon.enabled"})
public AmazonPubsubMessageHandlerFactory amazonPubsubMessageHandlerFactory(
ObjectMapper objectMapper, ExecutionRepository repository) {
return new AmazonPubsubMessageHandlerFactory() {
@Override
public AmazonPubsubMessageHandler create(
AmazonPubsubProperties.AmazonPubsubSubscription subscription) {
if (!Interlink.SUBSCRIPTION_NAME.equals(subscription.getName())) {
log.debug(
"Skipping non-interlink pubsub subscription named '{}'", subscription.getName());
return null;
}

return new InterlinkAmazonMessageHandler(objectMapper, repository);
}
};
}

@Bean
@ConditionalOnProperty({"pubsub.enabled", "pubsub.amazon.enabled"})
public Interlink amazonInterlink(
PubsubPublishers publishers,
ObjectMapper objectMapper,
InterlinkConfigurationProperties properties,
Registry registry,
Clock clock,

// injected here to make sure the provider ran before Interlink,
// otherwise the publisher may not have been initialized
SNSPublisherProvider snsProvider) {
return new Interlink(
publishers, objectMapper, new MessageFlagger(clock, properties.flagger), registry);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

@Data
@ConfigurationProperties(prefix = "interlink")
public class InterlinkConfigurationProperties {
FlaggerProperties flagger;

/** see {@link com.netflix.spinnaker.orca.interlink.MessageFlagger} */
@Data
public static class FlaggerProperties {
public boolean enabled = true;
public int maxSize = 32;
public int threshold = 8;
public long lookbackSeconds = 60;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.interlink;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.netflix.spectator.api.Counter;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.kork.exceptions.ConfigurationException;
import com.netflix.spinnaker.kork.pubsub.PubsubPublishers;
import com.netflix.spinnaker.kork.pubsub.model.PubsubPublisher;
import com.netflix.spinnaker.orca.interlink.events.InterlinkEvent;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class Interlink {
public static final String SUBSCRIPTION_NAME = "interlink";
private final PubsubPublisher publisher;
private final ObjectMapper objectMapper;
private final MessageFlagger flagger;
private Counter flaggedCounter;

public Interlink(
PubsubPublishers publishers,
ObjectMapper objectMapper,
MessageFlagger flagger,
Registry registry) {
this.objectMapper = objectMapper;
this.flagger = flagger;

publisher =
publishers.getAll().stream()
.filter(pubsubPublisher -> SUBSCRIPTION_NAME.equals(pubsubPublisher.getTopicName()))
.findFirst()
.orElse(null);

if (publisher == null) {
throw new ConfigurationException(
"could not find interlink publisher in ["
+ publishers.getAll().stream()
.map(PubsubPublisher::getTopicName)
.collect(Collectors.joining(", "))
+ "]");
}

this.flaggedCounter =
registry.counter(
"pubsub." + publisher.getPubsubSystem() + ".flagged",
"subscription",
publisher.getName());
}

public void publish(InterlinkEvent event) {
try {
flagger.process(event);
} catch (MessageFlaggedException e) {
log.warn("Will not publish event {} to interlink", event, e);
flaggedCounter.increment();
return;
}

try {
publisher.publish(objectMapper.writeValueAsString(event));
} catch (JsonProcessingException e) {
log.error("Failed to serialize event {}", event, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.interlink;

import com.netflix.spinnaker.kork.exceptions.SpinnakerException;

public class InterlinkMessageHandlingException extends SpinnakerException {
public InterlinkMessageHandlingException(String message) {
super(message);
}

public InterlinkMessageHandlingException(String message, Throwable cause) {
super(message, cause);
}

public InterlinkMessageHandlingException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.interlink

import com.netflix.spinnaker.kork.exceptions.SystemException

class MessageFlaggedException(message: String?) : SystemException(message)
Loading

0 comments on commit cc7ed9d

Please sign in to comment.