diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt index 844e983ff2..33378debea 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/RunTaskHandler.kt @@ -89,44 +89,44 @@ class RunTaskHandler( message.withTask { origStage, taskModel, task -> var stage = origStage - val thisInvocationStartTimeMs = clock.millis() - val execution = stage.execution - var taskResult: TaskResult? = null + stage.withAuth { + stage.withLoggingContext(taskModel) { + val thisInvocationStartTimeMs = clock.millis() + val execution = stage.execution + var taskResult: TaskResult? = null - try { - taskExecutionInterceptors.forEach { t -> stage = t.beforeTaskExecution(task, stage) } - - if (execution.isCanceled) { - task.onCancel(stage) - queue.push(CompleteTask(message, CANCELED)) - } else if (execution.status.isComplete) { - queue.push(CompleteTask(message, CANCELED)) - } else if (execution.status == PAUSED) { - queue.push(PauseTask(message)) - } else if (stage.isManuallySkipped()) { - queue.push(CompleteTask(message, SKIPPED)) - } else { try { - task.checkForTimeout(stage, taskModel, message) - } catch (e: TimeoutException) { - registry - .timeoutCounter(stage.execution.type, stage.execution.application, stage.type, taskModel.name) - .increment() - taskResult = task.onTimeout(stage) - - if (taskResult == null) { - // This means this task doesn't care to alter the timeout flow, just throw - throw e - } + taskExecutionInterceptors.forEach { t -> stage = t.beforeTaskExecution(task, stage) } + + if (execution.isCanceled) { + task.onCancel(stage) + queue.push(CompleteTask(message, CANCELED)) + } else if (execution.status.isComplete) { + queue.push(CompleteTask(message, CANCELED)) + } else if (execution.status == PAUSED) { + queue.push(PauseTask(message)) + } else if (stage.isManuallySkipped()) { + queue.push(CompleteTask(message, SKIPPED)) + } else { + try { + task.checkForTimeout(stage, taskModel, message) + } catch (e: TimeoutException) { + registry + .timeoutCounter(stage.execution.type, stage.execution.application, stage.type, taskModel.name) + .increment() + taskResult = task.onTimeout(stage) + + if (taskResult == null) { + // This means this task doesn't care to alter the timeout flow, just throw + throw e + } - if (!setOf(TERMINAL, FAILED_CONTINUE).contains(taskResult.status)) { - log.error("Task ${task.javaClass.name} returned invalid status (${taskResult.status}) for onTimeout") - throw e - } - } + if (!setOf(TERMINAL, FAILED_CONTINUE).contains(taskResult.status)) { + log.error("Task ${task.javaClass.name} returned invalid status (${taskResult.status}) for onTimeout") + throw e + } + } - stage.withAuth { - stage.withLoggingContext(taskModel) { if (taskResult == null) { taskResult = task.execute(stage.withMergedContext()) taskExecutionInterceptors.forEach { t -> taskResult = t.afterTaskExecution(task, stage, taskResult) } @@ -160,26 +160,26 @@ class RunTaskHandler( } } } - } - } - } catch (e: Exception) { - val exceptionDetails = exceptionHandlers.shouldRetry(e, taskModel.name) - if (exceptionDetails?.shouldRetry == true) { - log.warn("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]") - queue.push(message, task.backoffPeriod(taskModel, stage)) - trackResult(stage, thisInvocationStartTimeMs, taskModel, RUNNING) - } else if (e is TimeoutException && stage.context["markSuccessfulOnTimeout"] == true) { - queue.push(CompleteTask(message, SUCCEEDED)) - } else { - if (e !is TimeoutException) { - log.error("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]", e) - } + } catch (e: Exception) { + val exceptionDetails = exceptionHandlers.shouldRetry(e, taskModel.name) + if (exceptionDetails?.shouldRetry == true) { + log.warn("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]") + queue.push(message, task.backoffPeriod(taskModel, stage)) + trackResult(stage, thisInvocationStartTimeMs, taskModel, RUNNING) + } else if (e is TimeoutException && stage.context["markSuccessfulOnTimeout"] == true) { + queue.push(CompleteTask(message, SUCCEEDED)) + } else { + if (e !is TimeoutException) { + log.error("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]", e) + } - val status = stage.failureStatus(default = TERMINAL) - stage.context["exception"] = exceptionDetails - repository.storeStage(stage) - queue.push(CompleteTask(message, status, TERMINAL)) - trackResult(stage, thisInvocationStartTimeMs, taskModel, status) + val status = stage.failureStatus(default = TERMINAL) + stage.context["exception"] = exceptionDetails + repository.storeStage(stage) + queue.push(CompleteTask(message, status, TERMINAL)) + trackResult(stage, thisInvocationStartTimeMs, taskModel, status) + } + } } } }