Skip to content

Commit

Permalink
feat(queue): sql backed pending execution service (#3035)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher authored Jul 15, 2019
1 parent df0485c commit ffd9e97
Show file tree
Hide file tree
Showing 13 changed files with 624 additions and 168 deletions.
1 change: 1 addition & 0 deletions orca-queue-redis/orca-queue-redis.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ dependencies {
implementation("com.netflix.spinnaker.keiko:keiko-redis-spring:$keikoVersion")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

testImplementation(project(":orca-queue"))
testImplementation(project(":orca-queue-tck"))
testImplementation(project(":orca-test-kotlin"))
testImplementation(project(":orca-test-redis"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.orca.TaskResolver
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType
import com.netflix.spinnaker.orca.q.pending.PendingExecutionService
import com.netflix.spinnaker.orca.q.redis.migration.ExecutionTypeDeserializer
import com.netflix.spinnaker.orca.q.redis.migration.OrcaToKeikoSerializationMigrator
import com.netflix.spinnaker.orca.q.redis.migration.TaskTypeDeserializer
Expand All @@ -31,6 +32,7 @@ import com.netflix.spinnaker.q.redis.RedisDeadMessageHandler
import com.netflix.spinnaker.q.redis.RedisQueue
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
Expand Down Expand Up @@ -80,6 +82,7 @@ class RedisOrcaQueueConfiguration : RedisQueueConfiguration() {
}

@Bean
@ConditionalOnMissingBean(PendingExecutionService::class)
fun pendingExecutionService(
@Qualifier("queueRedisPool") jedisPool: Pool<Jedis>,
mapper: ObjectMapper
Expand Down
Original file line number Diff line number Diff line change
@@ -1,187 +1,27 @@
/*
* Copyright 2018 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License")
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.q.redis.pending

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.kork.jedis.EmbeddedRedis
import com.netflix.spinnaker.orca.fixture.pipeline
import com.netflix.spinnaker.orca.fixture.stage
import com.netflix.spinnaker.orca.q.PendingExecutionServiceTest
import com.netflix.spinnaker.orca.q.RestartStage
import com.netflix.spinnaker.orca.q.StartExecution
import com.netflix.spinnaker.q.Message
import com.nhaarman.mockito_kotlin.mock
import com.nhaarman.mockito_kotlin.verify
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import java.util.UUID
import org.jetbrains.spek.subject.SubjectSpek
import org.jetbrains.spek.subject.itBehavesLike

internal object RedisPendingExecutionServiceTest : SubjectSpek<RedisPendingExecutionService> ({

internal object RedisPendingExecutionServiceTest : Spek({
itBehavesLike(PendingExecutionServiceTest)

lateinit var redis: EmbeddedRedis
val redis = EmbeddedRedis.embed()
val mapper = ObjectMapper().apply {
registerModule(KotlinModule())
registerSubtypes(StartExecution::class.java, RestartStage::class.java)
}
lateinit var subject: RedisPendingExecutionService

beforeGroup {
redis = EmbeddedRedis.embed()
subject = RedisPendingExecutionService(redis.pool, mapper)
}
subject { RedisPendingExecutionService(redis.pool, mapper) }

afterGroup {
redis.destroy()
}

fun flushAll() {
redis.pool.resource.use { it.flushAll() }
}

val id = UUID.randomUUID().toString()
val pipeline = pipeline {
pipelineConfigId = id
stage {
refId = "1"
}
stage {
refId = "2"
requisiteStageRefIds = setOf("1")
}
}
val startMessage = StartExecution(pipeline)
val restartMessage = RestartStage(pipeline.stageByRef("2"), "fzlem@netflix.com")

sequenceOf<Message>(startMessage, restartMessage).forEach { message ->
describe("enqueueing a ${message.javaClass.simpleName} message") {
given("the queue is empty") {
beforeGroup {
assertThat(subject.depth(id)).isZero()
}

on("enqueueing the message") {
subject.enqueue(id, message)

it("makes the depth 1") {
assertThat(subject.depth(id)).isOne()
}
}

afterGroup(::flushAll)
}
}
}

describe("popping a message") {
given("the queue is empty") {
beforeGroup {
assertThat(subject.depth(id)).isZero()
}

on("popping a message") {
val popped = subject.popOldest(id)

it("returns null") {
assertThat(popped).isNull()
}
}
}

given("a message was enqueued") {
beforeGroup {
subject.enqueue(id, startMessage)
}

on("popping a message") {
val popped = subject.popOldest(id)

it("returns the message") {
assertThat(popped).isEqualTo(startMessage)
}

it("removes the message from the queue") {
assertThat(subject.depth(id)).isZero()
}
}

afterGroup(::flushAll)
}

given("multiple messages were enqueued") {
beforeEachTest {
subject.enqueue(id, startMessage)
subject.enqueue(id, restartMessage)
}

on("popping the oldest message") {
val popped = subject.popOldest(id)

it("returns the oldest message") {
assertThat(popped).isEqualTo(startMessage)
}

it("removes the message from the queue") {
assertThat(subject.depth(id)).isOne()
}
}

on("popping the newest message") {
val popped = subject.popNewest(id)

it("returns the newest message") {
assertThat(popped).isEqualTo(restartMessage)
}

it("removes the message from the queue") {
assertThat(subject.depth(id)).isOne()
}
}

afterEachTest(::flushAll)
}
}

describe("purging the queue") {
val callback = mock<(Message) -> Unit>()

given("there are some messages on the queue") {
beforeGroup {
subject.enqueue(id, startMessage)
subject.enqueue(id, restartMessage)
}

on("purging the queue") {
subject.purge(id, callback)

it("makes the queue empty") {
assertThat(subject.depth(id)).isZero()
}

it("invokes the callback passing each message") {
verify(callback).invoke(startMessage)
verify(callback).invoke(restartMessage)
}
}

afterGroup(::flushAll)
}
}
})
21 changes: 21 additions & 0 deletions orca-queue-sql/orca-queue-sql.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
apply from: "$rootDir/gradle/kotlin.gradle"
apply from: "$rootDir/gradle/spock.gradle"
apply from: "$rootDir/gradle/spek.gradle"


dependencies {
implementation(project(":orca-core"))
implementation(project(":orca-queue"))
implementation(project(":orca-sql"))
implementation("com.netflix.spinnaker.keiko:keiko-redis-spring:$keikoVersion")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

implementation("org.jooq:jooq")

testImplementation("com.netflix.spinnaker.kork:kork-sql-test")
testImplementation(project(":orca-queue-tck"))
testImplementation(project(":orca-sql"))
testImplementation("org.testcontainers:mysql")

testRuntimeOnly("mysql:mysql-connector-java")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.netflix.spinnaker.config

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.sql.pending.SqlPendingExecutionService
import com.netflix.spinnaker.q.Queue
import org.jooq.DSLContext
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.time.Clock

@Configuration
@EnableConfigurationProperties(SqlProperties::class)
class SqlPendingExecutionConfiguration {

@Bean
@ConditionalOnExpression(
"\${execution-repository.sql.enabled:false} && \${queue.sql.pending-execution-service.enabled:false}"
)
fun pendingExecutionService(
jooq: DSLContext,
queue: Queue,
repository: ExecutionRepository,
@Qualifier("redisQueueObjectMapper") mapper: ObjectMapper,
clock: Clock,
registry: Registry,
properties: SqlProperties,
@Value("\${queue.pending.max-depth:50}") maxDepth: Int
) =
SqlPendingExecutionService(
properties.partitionName,
jooq,
queue,
repository,
mapper,
clock,
registry,
properties.transactionRetry,
maxDepth
)
}
Loading

0 comments on commit ffd9e97

Please sign in to comment.