Skip to content

Commit

Permalink
feat(queue): add support for keiko-sql (#3134)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher authored Sep 9, 2019
1 parent 6dadf14 commit 0d0a1ba
Show file tree
Hide file tree
Showing 16 changed files with 351 additions and 32 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ fiatVersion=1.1.0
enablePublishing=false
spinnakerGradleVersion=7.0.1
korkVersion=6.6.3
keikoVersion=2.12.0
keikoVersion=2.13.0
org.gradle.parallel=true
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.orca.TaskResolver
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType
import com.netflix.spinnaker.orca.q.redis.migration.ExecutionTypeDeserializer
import com.netflix.spinnaker.orca.q.redis.migration.OrcaToKeikoSerializationMigrator
import com.netflix.spinnaker.orca.q.redis.migration.TaskTypeDeserializer
import com.netflix.spinnaker.orca.q.migration.ExecutionTypeDeserializer
import com.netflix.spinnaker.orca.q.migration.OrcaToKeikoSerializationMigrator
import com.netflix.spinnaker.orca.q.migration.TaskTypeDeserializer
import com.netflix.spinnaker.orca.q.redis.pending.RedisPendingExecutionService
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.migration.SerializationMigrator
Expand All @@ -46,6 +46,10 @@ import java.util.Optional

@Configuration
@EnableConfigurationProperties(ObjectMapperSubtypeProperties::class)
@ConditionalOnProperty(
value = ["keiko.queue.redis.enabled"],
havingValue = "true",
matchIfMissing = true)
class RedisOrcaQueueConfiguration : RedisQueueConfiguration() {

@Autowired
Expand All @@ -70,7 +74,8 @@ class RedisOrcaQueueConfiguration : RedisQueueConfiguration() {
}
}

@Bean fun orcaToKeikoSerializationMigrator(objectMapper: ObjectMapper) = OrcaToKeikoSerializationMigrator(objectMapper)
@Bean
fun orcaToKeikoSerializationMigrator(objectMapper: ObjectMapper) = OrcaToKeikoSerializationMigrator(objectMapper)

@Bean
@ConditionalOnProperty(value = ["redis.cluster-enabled"], havingValue = "false", matchIfMissing = true)
Expand Down
6 changes: 6 additions & 0 deletions orca-queue-sql/orca-queue-sql.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,22 @@ apply from: "$rootDir/gradle/spek.gradle"
dependencies {
implementation(project(":orca-core"))
implementation(project(":orca-queue"))
api("com.netflix.spinnaker.kork:kork-sql")
api("com.netflix.spinnaker.keiko:keiko-sql:$keikoVersion")
implementation(project(":orca-sql"))
implementation("com.netflix.spinnaker.keiko:keiko-redis-spring:$keikoVersion")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")

implementation("org.jooq:jooq")

testImplementation("com.netflix.spinnaker.kork:kork-sql-test")
testImplementation(project(":orca-redis"))
testImplementation(project(":orca-test-redis"))
testImplementation(project(":orca-queue-tck"))
testImplementation(project(":orca-sql"))
testImplementation("org.testcontainers:mysql")
testImplementation("org.springframework:spring-test")
testImplementation("org.springframework.boot:spring-boot-test")

testRuntimeOnly("mysql:mysql-connector-java")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.config

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spinnaker.orca.TaskResolver
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.q.migration.ExecutionTypeDeserializer
import com.netflix.spinnaker.orca.q.migration.TaskTypeDeserializer
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.migration.SerializationMigrator
import com.netflix.spinnaker.q.sql.SqlDeadMessageHandler
import com.netflix.spinnaker.q.sql.SqlQueue
import org.jooq.DSLContext
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.time.Clock
import java.util.Optional

@Configuration
@EnableConfigurationProperties(ObjectMapperSubtypeProperties::class)
@ConditionalOnProperty(
value = ["keiko.queue.sql.enabled"],
havingValue = "true",
matchIfMissing = false)
class SqlOrcaQueueConfiguration : SqlQueueConfiguration() {

@Autowired
fun sqlQueueObjectMapper(
mapper: ObjectMapper,
objectMapperSubtypeProperties: ObjectMapperSubtypeProperties,
taskResolver: TaskResolver
) {
mapper.apply {
registerModule(KotlinModule())
registerModule(
SimpleModule()
.addDeserializer(Execution.ExecutionType::class.java, ExecutionTypeDeserializer())
.addDeserializer(Class::class.java, TaskTypeDeserializer(taskResolver))
)
disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)

SpringObjectMapperConfigurer(objectMapperSubtypeProperties.apply {
messagePackages = messagePackages + listOf("com.netflix.spinnaker.orca.q")
attributePackages = attributePackages + listOf("com.netflix.spinnaker.orca.q")
}).registerSubtypes(this)
}
}

@Bean
override fun queue(
jooq: DSLContext,
clock: Clock,
mapper: ObjectMapper,
deadMessageHandler: SqlDeadMessageHandler,
publisher: EventPublisher,
serializationMigrator: Optional<SerializationMigrator>,
properties: SqlQueueProperties
) =
SqlQueue(
queueName = properties.queueName,
schemaVersion = 1,
jooq = jooq,
clock = clock,
lockTtlSeconds = properties.lockTtlSeconds,
mapper = mapper,
serializationMigrator = serializationMigrator,
ackTimeout = properties.ackTimeout,
deadMessageHandlers = listOf(deadMessageHandler),
publisher = publisher,
sqlRetryProperties = properties.retries
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.sql.pending.SqlPendingExecutionService
import com.netflix.spinnaker.q.Queue
import org.jooq.DSLContext
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean
Expand All @@ -23,7 +22,7 @@ class SqlPendingExecutionConfiguration {
jooq: DSLContext,
queue: Queue,
repository: ExecutionRepository,
@Qualifier("redisQueueObjectMapper") mapper: ObjectMapper,
mapper: ObjectMapper,
clock: Clock,
registry: Registry,
sqlProperties: SqlProperties,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.q.sql

import com.fasterxml.jackson.databind.DeserializationFeature
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.config.ObjectMapperSubtypeProperties
import com.netflix.spinnaker.config.SpringObjectMapperConfigurer
import com.netflix.spinnaker.config.SqlProperties
import com.netflix.spinnaker.config.TransactionRetryProperties
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientSelector
import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
import com.netflix.spinnaker.kork.sql.test.SqlTestUtil
import com.netflix.spinnaker.orca.TaskResolver
import com.netflix.spinnaker.orca.config.JedisConfiguration
import com.netflix.spinnaker.orca.config.RedisConfiguration
import com.netflix.spinnaker.orca.pipeline.model.Execution
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.QueueIntegrationTest
import com.netflix.spinnaker.orca.q.TestConfig
import com.netflix.spinnaker.orca.q.migration.ExecutionTypeDeserializer
import com.netflix.spinnaker.orca.q.migration.TaskTypeDeserializer
import com.netflix.spinnaker.orca.q.sql.pending.SqlPendingExecutionService
import com.netflix.spinnaker.orca.sql.pipeline.persistence.SqlExecutionRepository
import com.netflix.spinnaker.orca.test.redis.EmbeddedRedisConfiguration
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.MonitorableQueue
import com.netflix.spinnaker.q.sql.SqlQueue
import de.huxhorn.sulky.ulid.ULID
import org.jooq.DSLContext
import org.junit.runner.RunWith
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.test.context.junit4.SpringRunner
import java.time.Clock
import java.time.Duration
import java.util.Optional

@Configuration
class SqlTestConfig {
@Bean
fun jooq(): DSLContext {
val testDatabase = SqlTestUtil.initTcMysqlDatabase()
return testDatabase.context
}

@Autowired
fun sqlQueueObjectMapper(
mapper: ObjectMapper,
objectMapperSubtypeProperties: ObjectMapperSubtypeProperties,
taskResolver: TaskResolver
) {
mapper.apply {
registerModule(KotlinModule())
registerModule(
SimpleModule()
.addDeserializer(Execution.ExecutionType::class.java, ExecutionTypeDeserializer())
.addDeserializer(Class::class.java, TaskTypeDeserializer(taskResolver))
)
disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)

SpringObjectMapperConfigurer(objectMapperSubtypeProperties.apply {
messagePackages = messagePackages + listOf("com.netflix.spinnaker.orca.q")
attributePackages = attributePackages + listOf("com.netflix.spinnaker.orca.q")
}).registerSubtypes(this)
}
}

@Bean
fun queue(
jooq: DSLContext,
clock: Clock,
mapper: ObjectMapper,
publisher: EventPublisher
): MonitorableQueue =
SqlQueue(
"test",
1,
jooq,
clock,
1,
mapper,
Optional.empty(),
Duration.ofSeconds(1),
emptyList(),
true,
publisher,
SqlRetryProperties(),
ULID()
)

@Bean
fun sqlExecutionRepository(
dsl: DSLContext,
mapper: ObjectMapper,
registry: Registry,
properties: SqlProperties
) = SqlExecutionRepository(
properties.partitionName,
dsl,
mapper,
properties.transactionRetry,
properties.batchReadSize,
properties.stageReadSize
)

@Bean
fun pendingExecutionService(
jooq: DSLContext,
queue: Queue,
repository: ExecutionRepository,
mapper: ObjectMapper,
clock: Clock,
registry: Registry
) =
SqlPendingExecutionService("test",
jooq,
queue,
repository,
mapper,
clock,
registry,
TransactionRetryProperties(),
5
)

// TODO: remove this once Redis is no longer needed for distributed locking
@Bean
fun redisClientSelector(redisClientDelegates: List<RedisClientDelegate>) =
RedisClientSelector(redisClientDelegates)
}

@RunWith(SpringRunner::class)
@SpringBootTest(
classes = [
SqlTestConfig::class,
SqlProperties::class,
TestConfig::class,
DynamicConfigService.NoopDynamicConfig::class,
EmbeddedRedisConfiguration::class,
JedisConfiguration::class,
RedisConfiguration::class
],
properties = [
"queue.retry.delay.ms=10",
"logging.level.root=ERROR",
"logging.level.org.springframework.test=ERROR",
"logging.level.com.netflix.spinnaker=FATAL",
"execution-repository.sql.enabled=true",
"execution-repository.redis.enabled=false",
"keiko.queue.redis.enabled=false",
"keiko.queue.sql.enabled=true",
"sql.enabled=true"
])

class SqlQueueIntegrationTest : QueueIntegrationTest()
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ExecutionLatch(private val predicate: Predicate<ExecutionComplete>) :
}
}

fun await() = latch.await(2, TimeUnit.SECONDS)
fun await() = latch.await(5, TimeUnit.SECONDS)
}

fun ConfigurableApplicationContext.runToCompletion(execution: Execution, launcher: (Execution) -> Unit, repository: ExecutionRepository) {
Expand Down
Loading

0 comments on commit 0d0a1ba

Please sign in to comment.