From ac8f4ebdae2127f0649adbcc01bce6bb31690be8 Mon Sep 17 00:00:00 2001 From: Mark Vulfson Date: Mon, 24 Jun 2019 08:44:16 -0700 Subject: [PATCH] fix(webhooks): addresses issue 3450 - introduce a delay before polling wehook (#2984) Add additional parameters to the monitored webhook to allow: 1. waiting some number of seconds before polling starts 2. retrying on specific HTTP status codes Still needs deck counterpart see [3450](https://github.com/spinnaker/spinnaker/issues/3450) --- .../job/PreconfiguredJobStageSpec.groovy | 2 +- .../orca/webhook/pipeline/WebhookStage.groovy | 21 ++++-- .../webhook/tasks/MonitorWebhookTask.groovy | 74 ++++++++++++------- .../webhook/pipeline/WebhookStageSpec.groovy | 71 ++++++++++++++++++ .../tasks/MonitorWebhookTaskSpec.groovy | 48 ++++++++++-- 5 files changed, 175 insertions(+), 41 deletions(-) create mode 100644 orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStageSpec.groovy diff --git a/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/clouddriver/pipeline/job/PreconfiguredJobStageSpec.groovy b/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/clouddriver/pipeline/job/PreconfiguredJobStageSpec.groovy index c01afcc803..5f8ff15db3 100644 --- a/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/clouddriver/pipeline/job/PreconfiguredJobStageSpec.groovy +++ b/orca-clouddriver/src/test/groovy/com/netflix/spinnaker/orca/clouddriver/pipeline/job/PreconfiguredJobStageSpec.groovy @@ -27,7 +27,7 @@ import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.stage class PreconfiguredJobStageSpec extends Specification { - def "should should replace properties in context"() { + def "should replace properties in context"() { given: def jobService = Mock(JobService) { 1 * getPreconfiguredStages() >> { diff --git a/orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStage.groovy b/orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStage.groovy index 04fec7bc00..2dcdc75d74 100644 --- a/orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStage.groovy +++ b/orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStage.groovy @@ -21,6 +21,7 @@ import com.netflix.spinnaker.orca.CancellableStage import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder import com.netflix.spinnaker.orca.pipeline.TaskNode import com.netflix.spinnaker.orca.pipeline.model.Stage +import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask import com.netflix.spinnaker.orca.webhook.tasks.CreateWebhookTask import com.netflix.spinnaker.orca.webhook.tasks.MonitorWebhookTask import com.netflix.spinnaker.orca.pipeline.tasks.artifacts.BindProducedArtifactsTask @@ -33,16 +34,19 @@ class WebhookStage implements StageDefinitionBuilder, CancellableStage { @Override void taskGraph(Stage stage, TaskNode.Builder builder) { - String waitForCompletion = stage.context.waitForCompletion + StageData stageData = stage.mapTo(StageData) builder.withTask("createWebhook", CreateWebhookTask) - if (waitForCompletion?.toBoolean()) { - builder - .withTask("monitorWebhook", MonitorWebhookTask) + if (stageData.waitForCompletion) { + if (stageData.waitBeforeMonitor > 0) { + stage.context.putIfAbsent("waitTime", stageData.waitBeforeMonitor) + builder.withTask("waitBeforeMonitorWebhook", WaitTask) + } + + builder.withTask("monitorWebhook", MonitorWebhookTask) } if (stage.context.containsKey("expectedArtifacts")) { - builder - .withTask(BindProducedArtifactsTask.TASK_NAME, BindProducedArtifactsTask.class); + builder.withTask(BindProducedArtifactsTask.TASK_NAME, BindProducedArtifactsTask.class) } } @@ -51,4 +55,9 @@ class WebhookStage implements StageDefinitionBuilder, CancellableStage { log.info("Cancelling stage (stageId: ${stage.id}, executionId: ${stage.execution.id}, context: ${stage.context as Map})") return new CancellableStage.Result(stage, [:]) } + + static class StageData { + boolean waitForCompletion + int waitBeforeMonitor + } } diff --git a/orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTask.groovy b/orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTask.groovy index 7f4bcf1ab6..2d7baab57b 100644 --- a/orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTask.groovy +++ b/orca-webhook/src/main/groovy/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTask.groovy @@ -17,6 +17,7 @@ package com.netflix.spinnaker.orca.webhook.tasks +import com.google.common.base.Strings import com.jayway.jsonpath.JsonPath import com.jayway.jsonpath.PathNotFoundException import com.netflix.spinnaker.orca.ExecutionStatus @@ -53,48 +54,54 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask { @Autowired WebhookService webhookService - static requiredParameters = ["statusEndpoint", "statusJsonPath"] - @Override TaskResult execute(Stage stage) { - def missing = requiredParameters.findAll { !stage.context.get(it) } - if (!missing.empty) { - throw new IllegalStateException("Missing required parameter${missing.size() > 1 ? 's' : ''} '${missing.join('\', \'')}'") - } + StageData stageData = stage.mapTo(StageData) - String statusEndpoint = stage.context.statusEndpoint - String statusJsonPath = stage.context.statusJsonPath - String progressJsonPath = stage.context.progressJsonPath - String successStatuses = stage.context.successStatuses - String canceledStatuses = stage.context.canceledStatuses - String terminalStatuses = stage.context.terminalStatuses - def customHeaders = stage.context.customHeaders + if (Strings.isNullOrEmpty(stageData.statusEndpoint) || Strings.isNullOrEmpty(stageData.statusJsonPath)) { + throw new IllegalStateException( + "Missing required parameter(s): statusEndpoint = ${stageData.statusEndpoint}, statusJsonPath = ${stageData.statusJsonPath}") + } def response try { - response = webhookService.getStatus(statusEndpoint, customHeaders) + response = webhookService.getStatus(stageData.statusEndpoint, stageData.customHeaders) log.debug( "Received status code {} from status endpoint {} in execution {} in stage {}", response.statusCode, - statusEndpoint, + stageData.statusEndpoint, stage.execution.id, stage.id ) } catch (IllegalArgumentException e) { if (e.cause instanceof UnknownHostException) { - log.warn("name resolution failure in webhook for pipeline ${stage.execution.id} to ${statusEndpoint}, will retry.", e) + log.warn("name resolution failure in webhook for pipeline ${stage.execution.id} to ${stageData.statusEndpoint}, will retry.", e) return TaskResult.ofStatus(ExecutionStatus.RUNNING) } - throw e + String errorMessage = "an exception occurred in webhook monitor to ${stageData.statusEndpoint}: ${e}" + log.error(errorMessage, e) + Map outputs = [webhook: [monitor: [:]]] + outputs.webhook.monitor << [error: errorMessage] + return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build() } catch (HttpStatusCodeException e) { def statusCode = e.getStatusCode() - if (statusCode.is5xxServerError() || statusCode.value() == 429) { - log.warn("error getting webhook status from ${statusEndpoint}, will retry", e) + def statusValue = statusCode.value() + + boolean shouldRetry = statusCode.is5xxServerError() || + (statusValue == 429) || + ((stageData.retryStatusCodes != null) && (stageData.retryStatusCodes.contains(statusValue))) + + if (shouldRetry) { + log.warn("Failed to get webhook status from ${stageData.statusEndpoint} with statusCode=${statusCode.value()}, will retry", e) return TaskResult.ofStatus(ExecutionStatus.RUNNING) } - throw e + String errorMessage = "an exception occurred in webhook monitor to ${stageData.statusEndpoint}: ${e}" + log.error(errorMessage, e) + Map outputs = [webhook: [monitor: [:]]] + outputs.webhook.monitor << [error: errorMessage] + return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build() } def result @@ -111,26 +118,26 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask { "and the keys 'statusCode', 'buildInfo', 'statusEndpoint' and 'error' will be removed. Please migrate today." ] try { - result = JsonPath.read(response.body, statusJsonPath) + result = JsonPath.read(response.body, stageData.statusJsonPath) } catch (PathNotFoundException e) { - responsePayload.webhook.monitor << [error: String.format(JSON_PATH_NOT_FOUND_ERR_FMT, "status", statusJsonPath)] + responsePayload.webhook.monitor << [error: String.format(JSON_PATH_NOT_FOUND_ERR_FMT, "status", stageData.statusJsonPath)] return TaskResult.builder(ExecutionStatus.TERMINAL).context(responsePayload).build() } if (!(result instanceof String || result instanceof Number || result instanceof Boolean)) { - responsePayload.webhook.monitor << [error: "The json path '${statusJsonPath}' did not resolve to a single value", resolvedValue: result] + responsePayload.webhook.monitor << [error: "The json path '${stageData.statusJsonPath}' did not resolve to a single value", resolvedValue: result] return TaskResult.builder(ExecutionStatus.TERMINAL).context(responsePayload).build() } - if (progressJsonPath) { + if (stageData.progressJsonPath) { def progress try { - progress = JsonPath.read(response.body, progressJsonPath) + progress = JsonPath.read(response.body, stageData.progressJsonPath) } catch (PathNotFoundException e) { - responsePayload.webhook.monitor << [error: String.format(JSON_PATH_NOT_FOUND_ERR_FMT, "progress", statusJsonPath)] + responsePayload.webhook.monitor << [error: String.format(JSON_PATH_NOT_FOUND_ERR_FMT, "progress", stageData.statusJsonPath)] return TaskResult.builder(ExecutionStatus.TERMINAL).context(responsePayload).build() } if (!(progress instanceof String)) { - responsePayload.webhook.monitor << [error: "The json path '${progressJsonPath}' did not resolve to a String value", resolvedValue: progress] + responsePayload.webhook.monitor << [error: "The json path '${stageData.progressJsonPath}' did not resolve to a String value", resolvedValue: progress] return TaskResult.builder(ExecutionStatus.TERMINAL).context(responsePayload).build() } if (progress) { @@ -139,7 +146,7 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask { } } - def statusMap = createStatusMap(successStatuses, canceledStatuses, terminalStatuses) + def statusMap = createStatusMap(stageData.successStatuses, stageData.canceledStatuses, stageData.terminalStatuses) if (result instanceof Number) { def status = result == 100 ? ExecutionStatus.SUCCEEDED : ExecutionStatus.RUNNING @@ -168,4 +175,15 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask { private static Map mapStatuses(String statuses, ExecutionStatus status) { statuses.split(",").collectEntries { [(it.trim().toUpperCase()): status] } } + + private static class StageData { + public String statusEndpoint + public String statusJsonPath + public String progressJsonPath + public String successStatuses + public String canceledStatuses + public String terminalStatuses + public Object customHeaders + public List retryStatusCodes + } } diff --git a/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStageSpec.groovy b/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStageSpec.groovy new file mode 100644 index 0000000000..40f1c0f8e9 --- /dev/null +++ b/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/pipeline/WebhookStageSpec.groovy @@ -0,0 +1,71 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netflix.spinnaker.orca.webhook.pipeline + +import com.netflix.spinnaker.orca.pipeline.TaskNode +import com.netflix.spinnaker.orca.pipeline.model.Execution +import com.netflix.spinnaker.orca.pipeline.model.Stage +import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask +import com.netflix.spinnaker.orca.webhook.tasks.CreateWebhookTask +import com.netflix.spinnaker.orca.webhook.tasks.MonitorWebhookTask +import spock.lang.Specification +import spock.lang.Subject +import spock.lang.Unroll + +class WebhookStageSpec extends Specification { + + def builder = Mock(TaskNode.Builder) + + @Subject + webhookStage = new WebhookStage() + + @Unroll + def "Should create correct tasks"() { + given: + def stage = new Stage( + Execution.newPipeline("orca"), + "webhook", + [ + waitForCompletion: waitForCompletion, + waitBeforeMonitor: waitTime + ]) + + when: + webhookStage.taskGraph(stage, builder) + + then: + 1 * builder.withTask("createWebhook", CreateWebhookTask) + + then: + expectedWaitTaskCount * builder.withTask("waitBeforeMonitorWebhook", WaitTask) + + then: + expectedMonitorTaskCount * builder.withTask("monitorWebhook", MonitorWebhookTask) + + stage.context.waitTime == expectedWaitTimeInContext + + where: + waitForCompletion | waitTime || expectedWaitTimeInContext | expectedWaitTaskCount | expectedMonitorTaskCount + true | 10 || 10 | 1 | 1 + true | "2" || 2 | 1 | 1 + "true" | 0 || null | 0 | 1 + true | -1 || null | 0 | 1 + false | 10 || null | 0 | 0 + false | 0 || null | 0 | 0 + } + +} diff --git a/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTaskSpec.groovy b/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTaskSpec.groovy index feb98d78c6..08cde8be54 100644 --- a/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTaskSpec.groovy +++ b/orca-webhook/src/test/groovy/com/netflix/spinnaker/orca/webhook/tasks/MonitorWebhookTaskSpec.groovy @@ -18,11 +18,15 @@ package com.netflix.spinnaker.orca.webhook.tasks import com.netflix.spinnaker.orca.ExecutionStatus +import com.netflix.spinnaker.orca.events.ExecutionStarted import com.netflix.spinnaker.orca.pipeline.model.Execution import com.netflix.spinnaker.orca.pipeline.model.Stage import com.netflix.spinnaker.orca.webhook.service.WebhookService +import org.apache.tools.ant.taskdefs.condition.Http import org.springframework.http.HttpStatus import org.springframework.http.ResponseEntity +import org.springframework.web.client.HttpServerErrorException +import org.springframework.web.client.HttpStatusCodeException import spock.lang.Specification import spock.lang.Subject import spock.lang.Unroll @@ -52,10 +56,10 @@ class MonitorWebhookTaskSpec extends Specification { then: def ex = thrown IllegalStateException - ex.message == "Missing required parameter '${parameter}'" as String + ex.message.startsWith("Missing required parameter") where: - parameter << MonitorWebhookTask.requiredParameters + parameter << ["statusEndpoint", "statusJsonPath"] } def "should fail if no parameters are supplied"() { @@ -67,7 +71,7 @@ class MonitorWebhookTaskSpec extends Specification { then: def ex = thrown IllegalStateException - ex.message == "Missing required parameters 'statusEndpoint', 'statusJsonPath'" as String + ex.message == "Missing required parameter(s): statusEndpoint = null, statusJsonPath = null" as String } def "should fail in case of URL validation error"() { @@ -83,11 +87,11 @@ class MonitorWebhookTaskSpec extends Specification { } when: - monitorWebhookTask.execute stage + def result = monitorWebhookTask.execute(stage) then: - def ex = thrown IllegalArgumentException - ex.message == "Invalid URL" + result.status == ExecutionStatus.TERMINAL + result.context.webhook.monitor.error == "an exception occurred in webhook monitor to https://my-service.io/api/status/123: java.lang.IllegalArgumentException: Invalid URL" } def "should retry in case of name resolution error"() { @@ -109,6 +113,38 @@ class MonitorWebhookTaskSpec extends Specification { result.status == ExecutionStatus.RUNNING } + @Unroll + def "should be #expectedTaskStatus in case of #statusCode"() { + setup: + def stage = new Stage(pipeline, "webhook", [ + statusEndpoint: 'https://my-service.io/api/status/123', + statusJsonPath: '$.status', + successStatuses: 'SUCCESS', + canceledStatuses: 'CANCELED', + terminalStatuses: 'TERMINAL', + retryStatusCodes: [404, 405] + ]) + + monitorWebhookTask.webhookService = Mock(WebhookService) { + 1 * getStatus("https://my-service.io/api/status/123", _) >> { + throw new HttpServerErrorException(statusCode, statusCode.name()) + } + } + + when: + def result = monitorWebhookTask.execute stage + + then: + result.status == expectedTaskStatus + + where: + statusCode | expectedTaskStatus + HttpStatus.TOO_MANY_REQUESTS | ExecutionStatus.RUNNING + HttpStatus.NOT_FOUND | ExecutionStatus.RUNNING + HttpStatus.METHOD_NOT_ALLOWED | ExecutionStatus.RUNNING + HttpStatus.NOT_ACCEPTABLE | ExecutionStatus.TERMINAL + } + def "should do a get request to the defined statusEndpoint"() { setup: monitorWebhookTask.webhookService = Mock(WebhookService) {