Skip to content

Commit

Permalink
fix(logging): Wrap all of RunTaskHandler in a withAuth for context (#…
Browse files Browse the repository at this point in the history
…3825)

This hoists up the `.withAuth` and `.withLoggingContext` calls such that all logging (specifically those for thrown exceptions)
contains the execution/stage/task ids
  • Loading branch information
marchello2000 committed Jul 18, 2020
1 parent d96c736 commit c1005e1
Showing 1 changed file with 53 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,44 +89,44 @@ class RunTaskHandler(
message.withTask { origStage, taskModel, task ->
var stage = origStage

val thisInvocationStartTimeMs = clock.millis()
val execution = stage.execution
var taskResult: TaskResult? = null
stage.withAuth {
stage.withLoggingContext(taskModel) {
val thisInvocationStartTimeMs = clock.millis()
val execution = stage.execution
var taskResult: TaskResult? = null

try {
taskExecutionInterceptors.forEach { t -> stage = t.beforeTaskExecution(task, stage) }

if (execution.isCanceled) {
task.onCancel(stage)
queue.push(CompleteTask(message, CANCELED))
} else if (execution.status.isComplete) {
queue.push(CompleteTask(message, CANCELED))
} else if (execution.status == PAUSED) {
queue.push(PauseTask(message))
} else if (stage.isManuallySkipped()) {
queue.push(CompleteTask(message, SKIPPED))
} else {
try {
task.checkForTimeout(stage, taskModel, message)
} catch (e: TimeoutException) {
registry
.timeoutCounter(stage.execution.type, stage.execution.application, stage.type, taskModel.name)
.increment()
taskResult = task.onTimeout(stage)

if (taskResult == null) {
// This means this task doesn't care to alter the timeout flow, just throw
throw e
}
taskExecutionInterceptors.forEach { t -> stage = t.beforeTaskExecution(task, stage) }

if (execution.isCanceled) {
task.onCancel(stage)
queue.push(CompleteTask(message, CANCELED))
} else if (execution.status.isComplete) {
queue.push(CompleteTask(message, CANCELED))
} else if (execution.status == PAUSED) {
queue.push(PauseTask(message))
} else if (stage.isManuallySkipped()) {
queue.push(CompleteTask(message, SKIPPED))
} else {
try {
task.checkForTimeout(stage, taskModel, message)
} catch (e: TimeoutException) {
registry
.timeoutCounter(stage.execution.type, stage.execution.application, stage.type, taskModel.name)
.increment()
taskResult = task.onTimeout(stage)

if (taskResult == null) {
// This means this task doesn't care to alter the timeout flow, just throw
throw e
}

if (!setOf(TERMINAL, FAILED_CONTINUE).contains(taskResult.status)) {
log.error("Task ${task.javaClass.name} returned invalid status (${taskResult.status}) for onTimeout")
throw e
}
}
if (!setOf(TERMINAL, FAILED_CONTINUE).contains(taskResult.status)) {
log.error("Task ${task.javaClass.name} returned invalid status (${taskResult.status}) for onTimeout")
throw e
}
}

stage.withAuth {
stage.withLoggingContext(taskModel) {
if (taskResult == null) {
taskResult = task.execute(stage.withMergedContext())
taskExecutionInterceptors.forEach { t -> taskResult = t.afterTaskExecution(task, stage, taskResult) }
Expand Down Expand Up @@ -160,26 +160,26 @@ class RunTaskHandler(
}
}
}
}
}
} catch (e: Exception) {
val exceptionDetails = exceptionHandlers.shouldRetry(e, taskModel.name)
if (exceptionDetails?.shouldRetry == true) {
log.warn("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]")
queue.push(message, task.backoffPeriod(taskModel, stage))
trackResult(stage, thisInvocationStartTimeMs, taskModel, RUNNING)
} else if (e is TimeoutException && stage.context["markSuccessfulOnTimeout"] == true) {
queue.push(CompleteTask(message, SUCCEEDED))
} else {
if (e !is TimeoutException) {
log.error("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]", e)
}
} catch (e: Exception) {
val exceptionDetails = exceptionHandlers.shouldRetry(e, taskModel.name)
if (exceptionDetails?.shouldRetry == true) {
log.warn("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]")
queue.push(message, task.backoffPeriod(taskModel, stage))
trackResult(stage, thisInvocationStartTimeMs, taskModel, RUNNING)
} else if (e is TimeoutException && stage.context["markSuccessfulOnTimeout"] == true) {
queue.push(CompleteTask(message, SUCCEEDED))
} else {
if (e !is TimeoutException) {
log.error("Error running ${message.taskType.simpleName} for ${message.executionType}[${message.executionId}]", e)
}

val status = stage.failureStatus(default = TERMINAL)
stage.context["exception"] = exceptionDetails
repository.storeStage(stage)
queue.push(CompleteTask(message, status, TERMINAL))
trackResult(stage, thisInvocationStartTimeMs, taskModel, status)
val status = stage.failureStatus(default = TERMINAL)
stage.context["exception"] = exceptionDetails
repository.storeStage(stage)
queue.push(CompleteTask(message, status, TERMINAL))
trackResult(stage, thisInvocationStartTimeMs, taskModel, status)
}
}
}
}
}
Expand Down

0 comments on commit c1005e1

Please sign in to comment.