Skip to content

Commit

Permalink
fix(webhooks): addresses issue 3450 - introduce a delay before pollin…
Browse files Browse the repository at this point in the history
…g wehook (#2984)

Add additional parameters to the monitored webhook to allow:
1. waiting some number of seconds before polling starts
2. retrying on specific HTTP status codes

Still needs deck counterpart

see [3450](spinnaker/spinnaker#3450)
  • Loading branch information
marchello2000 committed Jun 24, 2019
1 parent 062e60f commit ac8f4eb
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import static com.netflix.spinnaker.orca.test.model.ExecutionBuilder.stage

class PreconfiguredJobStageSpec extends Specification {

def "should should replace properties in context"() {
def "should replace properties in context"() {
given:
def jobService = Mock(JobService) {
1 * getPreconfiguredStages() >> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.netflix.spinnaker.orca.CancellableStage
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder
import com.netflix.spinnaker.orca.pipeline.TaskNode
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask
import com.netflix.spinnaker.orca.webhook.tasks.CreateWebhookTask
import com.netflix.spinnaker.orca.webhook.tasks.MonitorWebhookTask
import com.netflix.spinnaker.orca.pipeline.tasks.artifacts.BindProducedArtifactsTask
Expand All @@ -33,16 +34,19 @@ class WebhookStage implements StageDefinitionBuilder, CancellableStage {

@Override
void taskGraph(Stage stage, TaskNode.Builder builder) {
String waitForCompletion = stage.context.waitForCompletion
StageData stageData = stage.mapTo(StageData)

builder.withTask("createWebhook", CreateWebhookTask)
if (waitForCompletion?.toBoolean()) {
builder
.withTask("monitorWebhook", MonitorWebhookTask)
if (stageData.waitForCompletion) {
if (stageData.waitBeforeMonitor > 0) {
stage.context.putIfAbsent("waitTime", stageData.waitBeforeMonitor)
builder.withTask("waitBeforeMonitorWebhook", WaitTask)
}

builder.withTask("monitorWebhook", MonitorWebhookTask)
}
if (stage.context.containsKey("expectedArtifacts")) {
builder
.withTask(BindProducedArtifactsTask.TASK_NAME, BindProducedArtifactsTask.class);
builder.withTask(BindProducedArtifactsTask.TASK_NAME, BindProducedArtifactsTask.class)
}
}

Expand All @@ -51,4 +55,9 @@ class WebhookStage implements StageDefinitionBuilder, CancellableStage {
log.info("Cancelling stage (stageId: ${stage.id}, executionId: ${stage.execution.id}, context: ${stage.context as Map})")
return new CancellableStage.Result(stage, [:])
}

static class StageData {
boolean waitForCompletion
int waitBeforeMonitor
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.netflix.spinnaker.orca.webhook.tasks

import com.google.common.base.Strings
import com.jayway.jsonpath.JsonPath
import com.jayway.jsonpath.PathNotFoundException
import com.netflix.spinnaker.orca.ExecutionStatus
Expand Down Expand Up @@ -53,48 +54,54 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {
@Autowired
WebhookService webhookService

static requiredParameters = ["statusEndpoint", "statusJsonPath"]

@Override
TaskResult execute(Stage stage) {
def missing = requiredParameters.findAll { !stage.context.get(it) }
if (!missing.empty) {
throw new IllegalStateException("Missing required parameter${missing.size() > 1 ? 's' : ''} '${missing.join('\', \'')}'")
}
StageData stageData = stage.mapTo(StageData)

String statusEndpoint = stage.context.statusEndpoint
String statusJsonPath = stage.context.statusJsonPath
String progressJsonPath = stage.context.progressJsonPath
String successStatuses = stage.context.successStatuses
String canceledStatuses = stage.context.canceledStatuses
String terminalStatuses = stage.context.terminalStatuses
def customHeaders = stage.context.customHeaders
if (Strings.isNullOrEmpty(stageData.statusEndpoint) || Strings.isNullOrEmpty(stageData.statusJsonPath)) {
throw new IllegalStateException(
"Missing required parameter(s): statusEndpoint = ${stageData.statusEndpoint}, statusJsonPath = ${stageData.statusJsonPath}")
}

def response
try {
response = webhookService.getStatus(statusEndpoint, customHeaders)
response = webhookService.getStatus(stageData.statusEndpoint, stageData.customHeaders)
log.debug(
"Received status code {} from status endpoint {} in execution {} in stage {}",
response.statusCode,
statusEndpoint,
stageData.statusEndpoint,
stage.execution.id,
stage.id
)
} catch (IllegalArgumentException e) {
if (e.cause instanceof UnknownHostException) {
log.warn("name resolution failure in webhook for pipeline ${stage.execution.id} to ${statusEndpoint}, will retry.", e)
log.warn("name resolution failure in webhook for pipeline ${stage.execution.id} to ${stageData.statusEndpoint}, will retry.", e)
return TaskResult.ofStatus(ExecutionStatus.RUNNING)
}

throw e
String errorMessage = "an exception occurred in webhook monitor to ${stageData.statusEndpoint}: ${e}"
log.error(errorMessage, e)
Map<String, ?> outputs = [webhook: [monitor: [:]]]
outputs.webhook.monitor << [error: errorMessage]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
} catch (HttpStatusCodeException e) {
def statusCode = e.getStatusCode()
if (statusCode.is5xxServerError() || statusCode.value() == 429) {
log.warn("error getting webhook status from ${statusEndpoint}, will retry", e)
def statusValue = statusCode.value()

boolean shouldRetry = statusCode.is5xxServerError() ||
(statusValue == 429) ||
((stageData.retryStatusCodes != null) && (stageData.retryStatusCodes.contains(statusValue)))

if (shouldRetry) {
log.warn("Failed to get webhook status from ${stageData.statusEndpoint} with statusCode=${statusCode.value()}, will retry", e)
return TaskResult.ofStatus(ExecutionStatus.RUNNING)
}

throw e
String errorMessage = "an exception occurred in webhook monitor to ${stageData.statusEndpoint}: ${e}"
log.error(errorMessage, e)
Map<String, ?> outputs = [webhook: [monitor: [:]]]
outputs.webhook.monitor << [error: errorMessage]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(outputs).build()
}

def result
Expand All @@ -111,26 +118,26 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {
"and the keys 'statusCode', 'buildInfo', 'statusEndpoint' and 'error' will be removed. Please migrate today."
]
try {
result = JsonPath.read(response.body, statusJsonPath)
result = JsonPath.read(response.body, stageData.statusJsonPath)
} catch (PathNotFoundException e) {
responsePayload.webhook.monitor << [error: String.format(JSON_PATH_NOT_FOUND_ERR_FMT, "status", statusJsonPath)]
responsePayload.webhook.monitor << [error: String.format(JSON_PATH_NOT_FOUND_ERR_FMT, "status", stageData.statusJsonPath)]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(responsePayload).build()
}
if (!(result instanceof String || result instanceof Number || result instanceof Boolean)) {
responsePayload.webhook.monitor << [error: "The json path '${statusJsonPath}' did not resolve to a single value", resolvedValue: result]
responsePayload.webhook.monitor << [error: "The json path '${stageData.statusJsonPath}' did not resolve to a single value", resolvedValue: result]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(responsePayload).build()
}

if (progressJsonPath) {
if (stageData.progressJsonPath) {
def progress
try {
progress = JsonPath.read(response.body, progressJsonPath)
progress = JsonPath.read(response.body, stageData.progressJsonPath)
} catch (PathNotFoundException e) {
responsePayload.webhook.monitor << [error: String.format(JSON_PATH_NOT_FOUND_ERR_FMT, "progress", statusJsonPath)]
responsePayload.webhook.monitor << [error: String.format(JSON_PATH_NOT_FOUND_ERR_FMT, "progress", stageData.statusJsonPath)]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(responsePayload).build()
}
if (!(progress instanceof String)) {
responsePayload.webhook.monitor << [error: "The json path '${progressJsonPath}' did not resolve to a String value", resolvedValue: progress]
responsePayload.webhook.monitor << [error: "The json path '${stageData.progressJsonPath}' did not resolve to a String value", resolvedValue: progress]
return TaskResult.builder(ExecutionStatus.TERMINAL).context(responsePayload).build()
}
if (progress) {
Expand All @@ -139,7 +146,7 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {
}
}

def statusMap = createStatusMap(successStatuses, canceledStatuses, terminalStatuses)
def statusMap = createStatusMap(stageData.successStatuses, stageData.canceledStatuses, stageData.terminalStatuses)

if (result instanceof Number) {
def status = result == 100 ? ExecutionStatus.SUCCEEDED : ExecutionStatus.RUNNING
Expand Down Expand Up @@ -168,4 +175,15 @@ class MonitorWebhookTask implements OverridableTimeoutRetryableTask {
private static Map<String, ExecutionStatus> mapStatuses(String statuses, ExecutionStatus status) {
statuses.split(",").collectEntries { [(it.trim().toUpperCase()): status] }
}

private static class StageData {
public String statusEndpoint
public String statusJsonPath
public String progressJsonPath
public String successStatuses
public String canceledStatuses
public String terminalStatuses
public Object customHeaders
public List<Integer> retryStatusCodes
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.webhook.pipeline

import com.netflix.spinnaker.orca.pipeline.TaskNode
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.tasks.WaitTask
import com.netflix.spinnaker.orca.webhook.tasks.CreateWebhookTask
import com.netflix.spinnaker.orca.webhook.tasks.MonitorWebhookTask
import spock.lang.Specification
import spock.lang.Subject
import spock.lang.Unroll

class WebhookStageSpec extends Specification {

def builder = Mock(TaskNode.Builder)

@Subject
webhookStage = new WebhookStage()

@Unroll
def "Should create correct tasks"() {
given:
def stage = new Stage(
Execution.newPipeline("orca"),
"webhook",
[
waitForCompletion: waitForCompletion,
waitBeforeMonitor: waitTime
])

when:
webhookStage.taskGraph(stage, builder)

then:
1 * builder.withTask("createWebhook", CreateWebhookTask)

then:
expectedWaitTaskCount * builder.withTask("waitBeforeMonitorWebhook", WaitTask)

then:
expectedMonitorTaskCount * builder.withTask("monitorWebhook", MonitorWebhookTask)

stage.context.waitTime == expectedWaitTimeInContext

where:
waitForCompletion | waitTime || expectedWaitTimeInContext | expectedWaitTaskCount | expectedMonitorTaskCount
true | 10 || 10 | 1 | 1
true | "2" || 2 | 1 | 1
"true" | 0 || null | 0 | 1
true | -1 || null | 0 | 1
false | 10 || null | 0 | 0
false | 0 || null | 0 | 0
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
package com.netflix.spinnaker.orca.webhook.tasks

import com.netflix.spinnaker.orca.ExecutionStatus
import com.netflix.spinnaker.orca.events.ExecutionStarted
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.webhook.service.WebhookService
import org.apache.tools.ant.taskdefs.condition.Http
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.client.HttpServerErrorException
import org.springframework.web.client.HttpStatusCodeException
import spock.lang.Specification
import spock.lang.Subject
import spock.lang.Unroll
Expand Down Expand Up @@ -52,10 +56,10 @@ class MonitorWebhookTaskSpec extends Specification {

then:
def ex = thrown IllegalStateException
ex.message == "Missing required parameter '${parameter}'" as String
ex.message.startsWith("Missing required parameter")

where:
parameter << MonitorWebhookTask.requiredParameters
parameter << ["statusEndpoint", "statusJsonPath"]
}

def "should fail if no parameters are supplied"() {
Expand All @@ -67,7 +71,7 @@ class MonitorWebhookTaskSpec extends Specification {

then:
def ex = thrown IllegalStateException
ex.message == "Missing required parameters 'statusEndpoint', 'statusJsonPath'" as String
ex.message == "Missing required parameter(s): statusEndpoint = null, statusJsonPath = null" as String
}

def "should fail in case of URL validation error"() {
Expand All @@ -83,11 +87,11 @@ class MonitorWebhookTaskSpec extends Specification {
}

when:
monitorWebhookTask.execute stage
def result = monitorWebhookTask.execute(stage)

then:
def ex = thrown IllegalArgumentException
ex.message == "Invalid URL"
result.status == ExecutionStatus.TERMINAL
result.context.webhook.monitor.error == "an exception occurred in webhook monitor to https://my-service.io/api/status/123: java.lang.IllegalArgumentException: Invalid URL"
}

def "should retry in case of name resolution error"() {
Expand All @@ -109,6 +113,38 @@ class MonitorWebhookTaskSpec extends Specification {
result.status == ExecutionStatus.RUNNING
}

@Unroll
def "should be #expectedTaskStatus in case of #statusCode"() {
setup:
def stage = new Stage(pipeline, "webhook", [
statusEndpoint: 'https://my-service.io/api/status/123',
statusJsonPath: '$.status',
successStatuses: 'SUCCESS',
canceledStatuses: 'CANCELED',
terminalStatuses: 'TERMINAL',
retryStatusCodes: [404, 405]
])

monitorWebhookTask.webhookService = Mock(WebhookService) {
1 * getStatus("https://my-service.io/api/status/123", _) >> {
throw new HttpServerErrorException(statusCode, statusCode.name())
}
}

when:
def result = monitorWebhookTask.execute stage

then:
result.status == expectedTaskStatus

where:
statusCode | expectedTaskStatus
HttpStatus.TOO_MANY_REQUESTS | ExecutionStatus.RUNNING
HttpStatus.NOT_FOUND | ExecutionStatus.RUNNING
HttpStatus.METHOD_NOT_ALLOWED | ExecutionStatus.RUNNING
HttpStatus.NOT_ACCEPTABLE | ExecutionStatus.TERMINAL
}

def "should do a get request to the defined statusEndpoint"() {
setup:
monitorWebhookTask.webhookService = Mock(WebhookService) {
Expand Down

0 comments on commit ac8f4eb

Please sign in to comment.