Skip to content

Commit

Permalink
chore(queue): removed unused throttle time attribute
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Apr 4, 2018
1 parent fe9c0c4 commit 1b48de9
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType
import com.netflix.spinnaker.orca.q.redis.migration.ExecutionTypeDeserializer
import com.netflix.spinnaker.orca.q.redis.migration.OrcaToKeikoSerializationMigrator
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.redis.RedisDeadMessageHandler
import com.netflix.spinnaker.q.redis.RedisQueue
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.context.properties.EnableConfigurationProperties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class RedisQueueShovelConfiguration {
return RedisConfiguration.createPool(redisPoolConfig, previousConnection, timeout, registry, "previousQueueJedisPool")
}

@Bean(name = ["previous"])
@Bean(name = ["previousQueue"])
@ConditionalOnBean(name = ["previousQueueJedisPool"]) fun previousRedisQueue(
@Qualifier("previousQueueJedisPool") redisPool: Pool<Jedis>,
redisQueueProperties: RedisQueueProperties,
Expand All @@ -83,7 +83,7 @@ class RedisQueueShovelConfiguration {
@Bean
@ConditionalOnBean(name = arrayOf("previousQueueJedisPool")) fun redisQueueShovel(
queueImpl: RedisQueue,
@Qualifier("previous") previousQueueImpl: RedisQueue,
@Qualifier("previousQueue") previousQueueImpl: RedisQueue,
registry: Registry,
activator: Activator
) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.pipeline.util.ContextParameterProcessor
import com.netflix.spinnaker.orca.pipeline.util.StageNavigator
import com.netflix.spinnaker.orca.q.*
import com.netflix.spinnaker.orca.q.CompleteTask
import com.netflix.spinnaker.orca.q.InvalidTaskType
import com.netflix.spinnaker.orca.q.PauseTask
import com.netflix.spinnaker.orca.q.RunTask
import com.netflix.spinnaker.orca.time.toDuration
import com.netflix.spinnaker.orca.time.toInstant
import com.netflix.spinnaker.q.Message
Expand All @@ -42,6 +45,7 @@ import java.time.Duration.ZERO
import java.time.Instant
import java.time.temporal.TemporalAmount
import java.util.concurrent.TimeUnit
import kotlin.collections.set

@Component
class RunTaskHandler(
Expand Down Expand Up @@ -71,20 +75,20 @@ class RunTaskHandler(
// TODO: rather send this data with CompleteTask message
stage.processTaskOutput(result)
when (result.status) {
RUNNING -> {
RUNNING -> {
queue.push(message, task.backoffPeriod(taskModel, stage))
trackResult(stage, taskModel, result.status)
}
SUCCEEDED, REDIRECT, FAILED_CONTINUE -> {
queue.push(CompleteTask(message, result.status))
trackResult(stage, taskModel, result.status)
}
TERMINAL, CANCELED -> {
TERMINAL, CANCELED -> {
val status = stage.failureStatus(default = result.status)
queue.push(CompleteTask(message, status))
trackResult(stage, taskModel, status)
}
else ->
else ->
TODO("Unhandled task status ${result.status}")
}
}
Expand Down Expand Up @@ -160,9 +164,10 @@ class RunTaskHandler(
private fun Task.backoffPeriod(taskModel: com.netflix.spinnaker.orca.pipeline.model.Task, stage: Stage): TemporalAmount =
when (this) {
is RetryableTask -> Duration.ofMillis(
getDynamicBackoffPeriod(stage, Duration.ofMillis(System.currentTimeMillis() - (taskModel.startTime ?: 0)))
getDynamicBackoffPeriod(stage, Duration.ofMillis(System.currentTimeMillis() - (taskModel.startTime
?: 0)))
)
else -> Duration.ofSeconds(1)
else -> Duration.ofSeconds(1)
}

private fun formatTimeout(timeout: Long): String {
Expand All @@ -180,18 +185,16 @@ class RunTaskHandler(
if (startTime != null) {
val pausedDuration = stage.execution.pausedDurationRelativeTo(startTime)
val elapsedTime = Duration.between(startTime, clock.instant())
val throttleTime = message.getAttribute<TotalThrottleTimeAttribute>()?.totalThrottleTimeMs ?: 0
val actualTimeout = (
if (this is OverridableTimeoutRetryableTask && stage.parentWithTimeout.isPresent)
stage.parentWithTimeout.get().timeout.get().toDuration()
else
timeout.toDuration()
)
if (elapsedTime.minus(pausedDuration).minusMillis(throttleTime) > actualTimeout) {
if (elapsedTime.minus(pausedDuration) > actualTimeout) {
val durationString = formatTimeout(elapsedTime.toMillis())
val msg = StringBuilder("${javaClass.simpleName} of stage ${stage.name} timed out after $durationString. ")
msg.append("pausedDuration: ${formatTimeout(pausedDuration.toMillis())}, ")
msg.append("throttleTime: ${formatTimeout(throttleTime)}, ")
msg.append("elapsedTime: ${formatTimeout(elapsedTime.toMillis())},")
msg.append("timeoutValue: ${formatTimeout(actualTimeout.toMillis())}")

Expand All @@ -216,7 +219,6 @@ class RunTaskHandler(
}
}


private fun Execution.pausedDurationRelativeTo(instant: Instant?): Duration {
val pausedDetails = paused
return if (pausedDetails != null) {
Expand Down Expand Up @@ -251,6 +253,6 @@ class RunTaskHandler(
private fun Stage.failureStatus(default: ExecutionStatus = TERMINAL) =
when {
shouldContinueOnFailure() -> FAILED_CONTINUE
shouldFailPipeline() -> default
else -> STOPPED
shouldFailPipeline() -> default
else -> STOPPED
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,8 @@ import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType
import com.netflix.spinnaker.orca.pipeline.model.Stage
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.q.Attribute
import com.netflix.spinnaker.q.Message

@JsonTypeName("totalThrottleTime")
data class TotalThrottleTimeAttribute(var totalThrottleTimeMs: Long = 0) : Attribute {
fun add(throttleTimeMs: Long) {
this.totalThrottleTimeMs += throttleTimeMs
}
}

/**
* Messages used internally by the queueing system.
*/
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1182,38 +1182,6 @@ object RunTaskHandlerTest : SubjectSpek<RunTaskHandler>({
}
}

describe("deduct the time spent throttled from the elapsed time") {

val timeout = Duration.ofMinutes(5)
val pipeline = pipeline {
stage {
type = "somethingFun"
task {
id = "1"
startTime = clock.instant().minusMillis(timeout.toMillis() + 1).toEpochMilli()
status = RUNNING
}
}
}
val message = RunTask(pipeline.type, pipeline.id, "foo", pipeline.stages.first().id, "1", DummyTask::class.java)
message.setAttribute(TotalThrottleTimeAttribute(5000L))
val taskResult = TaskResult(RUNNING)

beforeGroup {
whenever(task.execute(any())) doReturn taskResult
whenever(repository.retrieve(PIPELINE, message.executionId)) doReturn pipeline
whenever(task.timeout) doReturn timeout.toMillis()
}

action("the handler receives a message") {
subject.handle(message)
}

it("should not timeout and push the message back on the queue") {
verify(queue).push(message, Duration.ofMillis(0))
}
}

describe("should bucket task durations") {
mapOf(
0 to "lt5m",
Expand Down

0 comments on commit 1b48de9

Please sign in to comment.