Skip to content

Commit

Permalink
fix(queue): don't time out stages that take a long time to run before…
Browse files Browse the repository at this point in the history
… stages
  • Loading branch information
robfletcher committed May 2, 2017
1 parent 5e0ea45 commit bc62c31
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,17 @@ open class RunTaskHandler
private fun Task.isTimedOut(stage: Stage<*>): Boolean =
when (this) {
is RetryableTask -> {
val startTime = Instant.ofEpochMilli(stage.getStartTime())
val startTime = Instant.ofEpochMilli(stage.firstTask()?.startTime ?: stage.getStartTime())
val pausedDuration = stage.getExecution().pausedDuration()
Duration
if (Duration
.between(startTime, clock.instant())
.minus(pausedDuration)
.toMillis() > timeout
.toMillis() > timeout) {
log.warn("${javaClass.simpleName} of stage ${stage.getName()} timed out after ${Duration.between(startTime, clock.instant())}")
true
} else {
false
}
}
else -> false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,84 @@ class RunTaskHandlerSpec : Spek({
verify(task, never()).execute(any())
}
}

context("the execution spent a long time running before stages") {
val timeout = Duration.ofMinutes(5)
val pipeline = pipeline {
stage {
type = "whatever"
startTime = clock.instant().minusMillis(timeout.toMillis() + 1).toEpochMilli()
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
status = SUCCEEDED
startTime = clock.instant().minusMillis(timeout.toMillis() - 1).toEpochMilli()
}
task {
id = "2"
implementingClass = DummyTask::class.qualifiedName
status = RUNNING
}
}
}
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "2", DummyTask::class.java)

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
whenever(task.timeout) doReturn timeout.toMillis()
}

afterGroup(::resetMocks)

action("the handler receives a message") {
handler.handle(message)
}

it("executes the task") {
verify(task).execute(any())
}
}

context("the execution spent a long time running before stages but is timed out anyway") {
val timeout = Duration.ofMinutes(5)
val pipeline = pipeline {
stage {
type = "whatever"
startTime = clock.instant().minusMillis(timeout.toMillis() + 2).toEpochMilli()
task {
id = "1"
implementingClass = DummyTask::class.qualifiedName
status = SUCCEEDED
startTime = clock.instant().minusMillis(timeout.toMillis() + 1).toEpochMilli()
}
task {
id = "2"
implementingClass = DummyTask::class.qualifiedName
status = RUNNING
}
}
}
val message = RunTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1", DummyTask::class.java)

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
whenever(task.timeout) doReturn timeout.toMillis()
}

afterGroup(::resetMocks)

action("the handler receives a message") {
handler.handle(message)
}

it("fails the task") {
verify(queue).push(CompleteTask(message, TERMINAL))
}

it("does not execute the task") {
verify(task, never()).execute(any())
}
}
}

describe("the context passed to the task") {
Expand Down

0 comments on commit bc62c31

Please sign in to comment.