Skip to content

Commit

Permalink
fix(discovery): Respect discovery status (#1050)
Browse files Browse the repository at this point in the history
Add discovery status check for triggering, polling pipelines configs, and pausing/resuming quartz based on discovery status
  • Loading branch information
marchello2000 committed Nov 2, 2020
1 parent e1dfa74 commit 7042076
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 21 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
plugins {
id 'io.spinnaker.project' version "$spinnakerGradleVersion" apply false
id "org.jetbrains.kotlin.jvm" version "$kotlinVersion" apply false
id 'com.adarshr.test-logger' version '2.1.0'
}

subprojects {
Expand All @@ -34,6 +35,7 @@ subprojects {
apply plugin: "java-library"
apply plugin: "groovy"
apply plugin: 'jacoco'
apply plugin: 'com.adarshr.test-logger'

test {
testLogging {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private List<Map<String, Object>> buildStageNotifications(
}

List<Map<String, Object>> notifications =
(List<Map<String, Object>>) context.get("notifications");
(List<Map<String, Object>>) context.getOrDefault("notifications", Collections.emptyList());

return notifications.stream()
.filter(it -> shouldSendRequestForNotification(it, configType, status))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.netflix.spinnaker.echo.model.Trigger;
import com.netflix.spinnaker.echo.pipelinetriggers.orca.OrcaService;
import com.netflix.spinnaker.echo.services.Front50Service;
import com.netflix.spinnaker.kork.discovery.DiscoveryStatusListener;
import com.netflix.spinnaker.security.AuthenticatedRequest;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -60,6 +61,8 @@ public class PipelineCache implements MonitoredPoller {
private volatile Boolean running;
private volatile Instant lastPollTimestamp;

private final DiscoveryStatusListener discoveryStatusListener;

@Nullable private volatile List<Pipeline> pipelines;

@Nullable private volatile Map<String, List<Trigger>> triggersByType;
Expand All @@ -69,6 +72,7 @@ public PipelineCache(
@Value("${front50.polling-interval-ms:30000}") int pollingIntervalMs,
@Value("${front50.polling-sleep-ms:100}") int pollingSleepMs,
ObjectMapper objectMapper,
@NonNull DiscoveryStatusListener discoveryStatusListener,
@NonNull Front50Service front50,
@NonNull OrcaService orca,
@NonNull Registry registry) {
Expand All @@ -77,6 +81,7 @@ public PipelineCache(
pollingIntervalMs,
pollingSleepMs,
objectMapper,
discoveryStatusListener,
front50,
orca,
registry);
Expand All @@ -88,13 +93,15 @@ public PipelineCache(
int pollingIntervalMs,
int pollingSleepMs,
ObjectMapper objectMapper,
@NonNull DiscoveryStatusListener discoveryStatusListener,
@NonNull Front50Service front50,
@NonNull OrcaService orca,
@NonNull Registry registry) {
this.objectMapper = objectMapper;
this.executorService = executorService;
this.pollingIntervalMs = pollingIntervalMs;
this.pollingSleepMs = pollingSleepMs;
this.discoveryStatusListener = discoveryStatusListener;
this.front50 = front50;
this.orca = orca;
this.registry = registry;
Expand Down Expand Up @@ -130,14 +137,14 @@ public void run() {
}

private Double getDurationSeconds() {
return lastPollTimestamp == null
return (lastPollTimestamp == null || !discoveryStatusListener.isEnabled())
? -1d
: (double) Duration.between(lastPollTimestamp, now()).getSeconds();
}

// VisibleForTesting
void pollPipelineConfigs() {
if (!isRunning()) {
if (!isRunning() || !discoveryStatusListener.isEnabled()) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netflix.spinnaker.fiat.model.resources.Account;
import com.netflix.spinnaker.fiat.shared.FiatPermissionEvaluator;
import com.netflix.spinnaker.fiat.shared.FiatStatus;
import com.netflix.spinnaker.kork.discovery.DiscoveryStatusListener;
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService;
import com.netflix.spinnaker.security.AuthenticatedRequest;
import com.netflix.spinnaker.security.User;
Expand Down Expand Up @@ -66,6 +67,7 @@ public class PipelineInitiator {
private final int retryCount;
private final long retryDelayMillis;
private final ExecutorService executorService;
private final DiscoveryStatusListener discoveryStatusListener;

@Autowired
public PipelineInitiator(
Expand All @@ -77,6 +79,7 @@ public PipelineInitiator(
ObjectMapper objectMapper,
@NonNull QuietPeriodIndicator quietPeriodIndicator,
@NonNull DynamicConfigService dynamicConfigService,
@NonNull DiscoveryStatusListener discoveryStatusListener,
@Value("${orca.pipeline-initiator-retry-count:5}") int retryCount,
@Value("${orca.pipeline-initiator-retry-delay-millis:5000}") long retryDelayMillis) {
this.registry = registry;
Expand All @@ -89,6 +92,7 @@ public PipelineInitiator(
this.retryCount = retryCount;
this.retryDelayMillis = retryDelayMillis;
this.executorService = executorService;
this.discoveryStatusListener = discoveryStatusListener;
}

@PostConstruct
Expand Down Expand Up @@ -393,6 +397,10 @@ private String getTriggerType(Pipeline pipeline) {
private boolean isEnabled(TriggerSource triggerSource) {
boolean triggerEnabled = true;

if (!discoveryStatusListener.isEnabled()) {
return false;
}

if (triggerSource == TriggerSource.COMPENSATION_SCHEDULER) {
triggerEnabled = dynamicConfigService.isEnabled("scheduler.compensation-job.triggers", true);
} else if (triggerSource == TriggerSource.CRON_SCHEDULER) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.netflix.spinnaker.echo.model.Trigger
import com.netflix.spinnaker.echo.pipelinetriggers.orca.OrcaService
import com.netflix.spinnaker.echo.services.Front50Service
import com.netflix.spinnaker.echo.test.RetrofitStubs
import com.netflix.spinnaker.kork.discovery.DiscoveryStatusListener
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Subject
Expand All @@ -37,6 +38,7 @@ class PipelineCacheSpec extends Specification implements RetrofitStubs {
def orca = Mock(OrcaService)
def registry = new NoopRegistry()
def objectMapper = EchoObjectMapper.getInstance()
def activator = Mock(DiscoveryStatusListener)

@Shared
def interval = 30
Expand All @@ -45,7 +47,7 @@ class PipelineCacheSpec extends Specification implements RetrofitStubs {
def sleepMs = 100

@Subject
def pipelineCache = new PipelineCache(Mock(ScheduledExecutorService), interval, sleepMs, objectMapper, front50, orca, registry)
def pipelineCache = new PipelineCache(Mock(ScheduledExecutorService), interval, sleepMs, objectMapper, activator, front50, orca, registry)

def "keeps polling if Front50 returns an error"() {
given:
Expand All @@ -56,6 +58,8 @@ class PipelineCacheSpec extends Specification implements RetrofitStubs {
]
def pipeline = Pipeline.builder().application('application').name('Pipeline').id('P1').build()

activator.isEnabled() >> true

def initialLoad = []
front50.getPipelines() >> initialLoad >> { throw unavailable() } >> [pipelineMap]
pipelineCache.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.netflix.spinnaker.fiat.model.UserPermission
import com.netflix.spinnaker.fiat.model.resources.Account
import com.netflix.spinnaker.fiat.shared.FiatPermissionEvaluator
import com.netflix.spinnaker.fiat.shared.FiatStatus
import com.netflix.spinnaker.kork.discovery.DiscoveryStatusListener
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.kork.web.context.AuthenticatedRequestContextProvider
import com.netflix.spinnaker.kork.web.context.RequestContext
Expand All @@ -29,6 +30,7 @@ class PipelineInitiatorSpec extends Specification {
def objectMapper = Mock(ObjectMapper)
def quietPeriodIndicator = Mock(QuietPeriodIndicator)
def contextProvider = new AuthenticatedRequestContextProvider()
def activator = Mock(DiscoveryStatusListener)

Optional<String> capturedSpinnakerUser
Optional<String> capturedSpinnakerAccounts
Expand Down Expand Up @@ -60,7 +62,7 @@ class PipelineInitiatorSpec extends Specification {
given:
def dynamicConfigService = Mock(DynamicConfigService)
def pipelineInitiator = new PipelineInitiator(
registry, orca, Optional.of(fiatPermissionEvaluator), fiatStatus, MoreExecutors.newDirectExecutorService(), objectMapper, quietPeriodIndicator, dynamicConfigService, 5, 5000
registry, orca, Optional.of(fiatPermissionEvaluator), fiatStatus, MoreExecutors.newDirectExecutorService(), objectMapper, quietPeriodIndicator, dynamicConfigService, activator, 5, 5000
)

def pipeline = Pipeline
Expand All @@ -78,10 +80,11 @@ class PipelineInitiatorSpec extends Specification {
pipelineInitiator.startPipeline(pipeline, PipelineInitiator.TriggerSource.CRON_SCHEDULER)

then:
1 * dynamicConfigService.isEnabled('scheduler.triggers', true) >> { return !suppress }
_ * dynamicConfigService.isEnabled("orca", true) >> { return enabled }
_ * fiatStatus.isEnabled() >> { return enabled }
_ * fiatStatus.isLegacyFallbackEnabled() >> { return legacyFallbackEnabled }
1 * activator.isEnabled() >> upInDiscovery
_ * dynamicConfigService.isEnabled('scheduler.triggers', true) >> !suppress
_ * dynamicConfigService.isEnabled("orca", true) >> enabled
_ * fiatStatus.isEnabled() >> enabled
_ * fiatStatus.isLegacyFallbackEnabled() >> legacyFallbackEnabled

(legacyFallbackEnabled ? 1 : 0) * fiatPermissionEvaluator.getPermission(user ?: "anonymous") >> {
return userPermissions.get(user ?: "anonymous")
Expand All @@ -96,21 +99,22 @@ class PipelineInitiatorSpec extends Specification {
capturedSpinnakerAccounts.orElse(null)?.split(",") as Set<String> == expectedSpinnakerAccounts?.split(",") as Set<String>

where:
user | enabled | suppress | legacyFallbackEnabled || expectedTriggerCalls || expectedSpinnakerUser || expectedSpinnakerAccounts
"anonymous" | false | false | false || 0 || null || null // orca not enabled
null | true | true | false || 0 || null || null // cron triggers enabled but suppressed
"anonymous" | true | false | false || 1 || "anonymous" || null // fallback disabled (no accounts)
"anonymous" | true | false | true || 1 || "anonymous" || "account2,account3" // fallback enabled (all WRITE accounts)
"not-anonymous" | true | false | true || 1 || "not-anonymous" || "account1,account2,account3" // fallback enabled (all WRITE accounts)
null | true | false | true || 1 || "anonymous" || "account2,account3" // null trigger user should default to 'anonymous'
user | upInDiscovery | enabled | suppress | legacyFallbackEnabled || expectedTriggerCalls || expectedSpinnakerUser || expectedSpinnakerAccounts
"anonymous" | false | true | false | false || 0 || null || null // down in discovery
"anonymous" | true | false | false | false || 0 || null || null // orca not enabled
null | true | true | true | false || 0 || null || null // cron triggers enabled but suppressed
"anonymous" | true | true | false | false || 1 || "anonymous" || null // fallback disabled (no accounts)
"anonymous" | true | true | false | true || 1 || "anonymous" || "account2,account3" // fallback enabled (all WRITE accounts)
"not-anonymous" | true | true | false | true || 1 || "not-anonymous" || "account1,account2,account3" // fallback enabled (all WRITE accounts)
null | true | true | false | true || 1 || "anonymous" || "account2,account3" // null trigger user should default to 'anonymous'
}

def "propages auth headers to orca calls without runAs"() {
given:
RequestContext context = contextProvider.get()
def executor = Executors.newFixedThreadPool(2)
def pipelineInitiator = new PipelineInitiator(
registry, orca, Optional.of(fiatPermissionEvaluator), fiatStatus, executor, objectMapper, quietPeriodIndicator, noopDynamicConfigService, 5, 5000
registry, orca, Optional.of(fiatPermissionEvaluator), fiatStatus, executor, objectMapper, quietPeriodIndicator, noopDynamicConfigService, activator, 5, 5000
)

Trigger trigger = (new Trigger.TriggerBuilder().type("cron").build()).atPropagateAuth(true)
Expand Down Expand Up @@ -138,8 +142,9 @@ class PipelineInitiatorSpec extends Specification {
executor.awaitTermination(2, TimeUnit.SECONDS)

then:
_ * fiatStatus.isEnabled() >> { return enabled }
_ * fiatStatus.isLegacyFallbackEnabled() >> { return false }
1 * activator.isEnabled() >> true
_ * fiatStatus.isEnabled() >> true
_ * fiatStatus.isLegacyFallbackEnabled() >> false

1 * orca.trigger(pipeline) >> {
captureAuthorizationContext()
Expand All @@ -154,7 +159,7 @@ class PipelineInitiatorSpec extends Specification {
def "calls orca #expectedPlanCalls to plan pipeline if templated"() {
given:
def pipelineInitiator = new PipelineInitiator(
registry, orca, Optional.empty(), fiatStatus, MoreExecutors.newDirectExecutorService(), objectMapper, quietPeriodIndicator, noopDynamicConfigService, 5, 5000
registry, orca, Optional.empty(), fiatStatus, MoreExecutors.newDirectExecutorService(), objectMapper, quietPeriodIndicator, noopDynamicConfigService, activator, 5, 5000
)

def pipeline = Pipeline.builder()
Expand All @@ -170,7 +175,8 @@ class PipelineInitiatorSpec extends Specification {
pipelineInitiator.startPipeline(pipeline, PipelineInitiator.TriggerSource.CRON_SCHEDULER)

then:
1 * fiatStatus.isEnabled() >> { return true }
1 * fiatStatus.isEnabled() >> true
1 * activator.isEnabled() >> true
expectedPlanCalls * orca.plan(_, true) >> pipelineMap
objectMapper.convertValue(pipelineMap, Pipeline.class) >> pipeline
1 * orca.trigger(_) >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.netflix.spinnaker.echo.config

import com.netflix.spinnaker.echo.scheduler.actions.pipeline.PipelineConfigsPollingJob
import com.netflix.spinnaker.echo.scheduler.actions.pipeline.PipelineTriggerJob
import com.netflix.spinnaker.echo.scheduler.actions.pipeline.QuartzDiscoveryActivator
import com.netflix.spinnaker.echo.scheduler.actions.pipeline.TriggerConverter
import com.netflix.spinnaker.echo.scheduler.actions.pipeline.TriggerListener
import com.netflix.spinnaker.kork.sql.config.DefaultSqlConfiguration
Expand Down Expand Up @@ -122,4 +123,9 @@ class SchedulerConfiguration {
}
}
}

@Bean
QuartzDiscoveryActivator quartzDiscoveryActivator(SchedulerFactoryBean schedulerFactory) {
return new QuartzDiscoveryActivator(schedulerFactory.getScheduler())
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.echo.scheduler.actions.pipeline;

import com.netflix.spinnaker.kork.discovery.InstanceStatus;
import com.netflix.spinnaker.kork.discovery.RemoteStatusChangedEvent;
import lombok.extern.slf4j.Slf4j;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.context.ApplicationListener;

@Slf4j
public class QuartzDiscoveryActivator implements ApplicationListener<RemoteStatusChangedEvent> {
private final Scheduler scheduler;

public QuartzDiscoveryActivator(Scheduler scheduler) {
this.scheduler = scheduler;
}

@Override
public void onApplicationEvent(RemoteStatusChangedEvent event) {
if (event.getSource().getStatus() == InstanceStatus.UP) {
log.info("Instance is ${e.status}... resuming quartz scheduler");
try {
scheduler.start();
} catch (SchedulerException e) {
log.warn("Failed to resume quartz scheduler", e);
}
} else if (event.getSource().getPreviousStatus() == InstanceStatus.UP) {
log.info("Instance is ${e.status}... placing quartz into standby");
try {
scheduler.standby();
} catch (SchedulerException e) {
log.warn("Failed to place quartz scheduler into standby", e);
}
}
}
}

0 comments on commit 7042076

Please sign in to comment.