Skip to content

Commit

Permalink
feat(retry): Resilience4j Spring Boot2 implementation in clouddriver-sql (#4071)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonsie authored and robzienert committed Oct 7, 2019
1 parent f01ab55 commit 2e383b6
Show file tree
Hide file tree
Showing 9 changed files with 48 additions and 96 deletions.
1 change: 0 additions & 1 deletion clouddriver-sql/clouddriver-sql.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ dependencies {
implementation project(":clouddriver-event")

implementation "com.netflix.spinnaker.kork:kork-core"
implementation "com.netflix.spinnaker.kork:kork-exceptions"
implementation "com.netflix.spinnaker.kork:kork-sql"
implementation "com.netflix.spinnaker.kork:kork-telemetry"
implementation "de.huxhorn.sulky:de.huxhorn.sulky.ulid"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import com.netflix.spinnaker.clouddriver.data.task.TaskState.COMPLETED
import com.netflix.spinnaker.clouddriver.data.task.TaskState.FAILED
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.config.SqlTaskCleanupAgentProperties
import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
import com.netflix.spinnaker.kork.sql.routing.withPool
import org.jooq.DSLContext
import org.jooq.impl.DSL.field
Expand All @@ -39,8 +38,7 @@ class SqlTaskCleanupAgent(
private val jooq: DSLContext,
private val clock: Clock,
private val registry: Registry,
private val properties: SqlTaskCleanupAgentProperties,
private val sqlRetryProperties: SqlRetryProperties
private val properties: SqlTaskCleanupAgentProperties
) : RunnableAgent, CustomScheduledAgent {

private val log = LoggerFactory.getLogger(javaClass)
Expand All @@ -50,7 +48,7 @@ class SqlTaskCleanupAgent(

override fun run() {
withPool(ConnectionPools.TASKS.value) {
val candidates = jooq.withRetry(sqlRetryProperties.reads) { j ->
val candidates = jooq.read { j ->
val candidates = j.select(field("id"), field("task_id"))
.from(taskStatesTable)
.where(
Expand Down Expand Up @@ -105,23 +103,23 @@ class SqlTaskCleanupAgent(

registry.timer(timingId).record {
candidates.resultIds.chunked(properties.batchSize) { chunk ->
jooq.withRetry(sqlRetryProperties.transactions) { ctx ->
jooq.transactional { ctx ->
ctx.deleteFrom(taskResultsTable)
.where("id IN (${chunk.joinToString(",") { "'$it'" }})")
.execute()
}
}

candidates.stateIds.chunked(properties.batchSize) { chunk ->
jooq.withRetry(sqlRetryProperties.transactions) { ctx ->
jooq.transactional { ctx ->
ctx.deleteFrom(taskStatesTable)
.where("id IN (${chunk.joinToString(",") { "'$it'" }})")
.execute()
}
}

candidates.taskIds.chunked(properties.batchSize) { chunk ->
jooq.withRetry(sqlRetryProperties.transactions) { ctx ->
jooq.transactional { ctx ->
ctx.deleteFrom(tasksTable)
.where("id IN (${chunk.joinToString(",") { "'$it'" }})")
.execute()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.netflix.spinnaker.clouddriver.data.task.TaskState
import com.netflix.spinnaker.clouddriver.data.task.TaskState.FAILED
import com.netflix.spinnaker.clouddriver.data.task.TaskState.STARTED
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
import com.netflix.spinnaker.kork.sql.routing.withPool
import de.huxhorn.sulky.ulid.ULID
import org.jooq.Condition
Expand All @@ -40,8 +39,7 @@ import java.time.Clock
class SqlTaskRepository(
private val jooq: DSLContext,
private val mapper: ObjectMapper,
private val clock: Clock,
private val sqlRetryProperties: SqlRetryProperties
private val clock: Clock
) : TaskRepository {

private val log = LoggerFactory.getLogger(javaClass)
Expand All @@ -59,7 +57,7 @@ class SqlTaskRepository(
val historyId = ulid.nextULID()

withPool(POOL_NAME) {
jooq.transactional(sqlRetryProperties.transactions) { ctx ->
jooq.transactional { ctx ->
val existingTask = getByClientRequestId(clientRequestId)
if (existingTask != null) {
task = existingTask as SqlTask
Expand All @@ -86,7 +84,7 @@ class SqlTaskRepository(

fun updateSagaIds(task: Task) {
return withPool(POOL_NAME) {
jooq.transactional(sqlRetryProperties.transactions) { ctx ->
jooq.transactional { ctx ->
ctx.update(tasksTable)
.set(field("saga_ids"), mapper.writeValueAsString(task.sagaIds))
.where(field("id").eq(task.id))
Expand All @@ -101,7 +99,7 @@ class SqlTaskRepository(

override fun getByClientRequestId(clientRequestId: String): Task? {
return withPool(POOL_NAME) {
jooq.withRetry(sqlRetryProperties.reads) {
jooq.read {
it.select(field("id"))
.from(tasksTable)
.where(field("request_id").eq(clientRequestId))
Expand All @@ -115,7 +113,7 @@ class SqlTaskRepository(

override fun list(): MutableList<Task> {
return withPool(POOL_NAME) {
jooq.withRetry(sqlRetryProperties.reads) {
jooq.read {
runningTaskIds(it, false).let { taskIds ->
retrieveInternal(field("id").`in`(*taskIds), field("task_id").`in`(*taskIds)).toMutableList()
}
Expand All @@ -125,7 +123,7 @@ class SqlTaskRepository(

override fun listByThisInstance(): MutableList<Task> {
return withPool(POOL_NAME) {
jooq.withRetry(sqlRetryProperties.reads) {
jooq.read {
runningTaskIds(it, true).let { taskIds ->
retrieveInternal(field("id").`in`(*taskIds), field("task_id").`in`(*taskIds)).toMutableList()
}
Expand All @@ -137,7 +135,7 @@ class SqlTaskRepository(
val resultIdPairs = results.map { ulid.nextULID() to it }.toMap()

withPool(POOL_NAME) {
jooq.transactional(sqlRetryProperties.transactions) { ctx ->
jooq.transactional { ctx ->
ctx.select(taskStatesFields)
.from(taskStatesTable)
.where(field("task_id").eq(task.id))
Expand All @@ -164,7 +162,7 @@ class SqlTaskRepository(
internal fun updateCurrentStatus(task: Task, phase: String, status: String) {
val historyId = ulid.nextULID()
withPool(POOL_NAME) {
jooq.transactional(sqlRetryProperties.transactions) { ctx ->
jooq.transactional { ctx ->
val state = selectLatestState(ctx, task.id)
addToHistory(ctx, historyId, task.id, state?.state ?: STARTED, phase, status)
}
Expand All @@ -184,7 +182,7 @@ class SqlTaskRepository(
internal fun updateState(task: Task, state: TaskState) {
val historyId = ulid.nextULID()
withPool(POOL_NAME) {
jooq.transactional(sqlRetryProperties.transactions) { ctx ->
jooq.transactional { ctx ->
selectLatestState(ctx, task.id)?.let {
addToHistory(ctx, historyId, task.id, state, it.phase, it.status)
}
Expand All @@ -203,7 +201,7 @@ class SqlTaskRepository(
// on every connection acquire - need to change this so running on !aurora will behave consistently.
// REPEATABLE_READ is correct here.
withPool(POOL_NAME) {
jooq.transactional(sqlRetryProperties.transactions) { ctx ->
jooq.transactional { ctx ->
/**
* (select id as task_id, owner_id, request_id, created_at, saga_ids, null as body, null as state, null as phase, null as status from tasks_copy where id = '01D2H4H50VTF7CGBMP0D6HTGTF')
* UNION ALL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@ import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository.ListAggregatesCriteria
import com.netflix.spinnaker.clouddriver.sql.transactional
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.kork.sql.config.SqlProperties
import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
import com.netflix.spinnaker.kork.sql.routing.withPool
import com.netflix.spinnaker.kork.version.ServiceVersion
import de.huxhorn.sulky.ulid.ULID
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
import io.github.resilience4j.retry.RetryRegistry
import org.jooq.Condition
import org.jooq.DSLContext
import org.jooq.impl.DSL.currentTimestamp
Expand All @@ -42,23 +37,17 @@ import org.jooq.impl.DSL.max
import org.jooq.impl.DSL.table
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
import org.springframework.validation.Validator
import java.time.Duration
import java.util.UUID

class SqlEventRepository(
private val jooq: DSLContext,
sqlProperties: SqlProperties,
private val serviceVersion: ServiceVersion,
private val objectMapper: ObjectMapper,
private val applicationEventPublisher: ApplicationEventPublisher,
private val registry: Registry,
private val validator: Validator,
private val retryRegistry: RetryRegistry
private val registry: Registry
) : EventRepository {

private val log by lazy { LoggerFactory.getLogger(javaClass) }
private val retryProperties: SqlRetryProperties = sqlProperties.retries

private val eventCountId = registry.createId("eventing.events")

Expand All @@ -74,16 +63,9 @@ class SqlEventRepository(
val aggregateCondition = field("aggregate_type").eq(aggregateType)
.and(field("aggregate_id").eq(aggregateId))

// TODO(rz): Get this from Spring?
val retry = RetryConfig.custom<Retry>()
.maxAttempts(retryProperties.transactions.maxRetries)
.waitDuration(Duration.ofMillis(retryProperties.transactions.backoffMs))
.ignoreExceptions(AggregateChangeRejectedException::class.java)
.build()

try {
withPool(POOL_NAME) {
jooq.transactional(retryRegistry.retry("eventSave", retry)) { ctx ->
jooq.transactional { ctx ->
// Get or create the aggregate and immediately assert that this save operation is being committed against the
// most recent aggregate state.
val aggregate = ctx.maybeGetAggregate(aggregateCondition) ?: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,12 @@
*/
package com.netflix.spinnaker.clouddriver.sql

import com.netflix.spinnaker.kork.core.RetrySupport
import com.netflix.spinnaker.kork.sql.config.RetryProperties
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.annotation.Retry
import org.jooq.DSLContext
import org.jooq.impl.DSL
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.table

internal val retrySupport = RetrySupport()

internal val tasksTable = table("tasks")
internal val taskStatesTable = table("task_states")
internal val taskResultsTable = table("task_results")
Expand All @@ -34,36 +30,20 @@ internal val taskStatesFields = listOf("id", "task_id", "created_at", "state", "
internal val taskResultsFields = listOf("id", "task_id", "body").map { field(it) }

/**
* Run the provided [fn] in a transaction, retrying on failures using [retryProperties].
*/
@Deprecated("use transactional(retry, fn) instead")
internal fun DSLContext.transactional(retryProperties: RetryProperties, fn: (DSLContext) -> Unit) {
retrySupport.retry({
transaction { ctx ->
fn(DSL.using(ctx))
}
}, retryProperties.maxRetries, retryProperties.backoffMs, false)
}

/**
* Run the provided [fn] in a transaction, retrying on failures using [retry].
* Run the provided [fn] in a transaction, retrying on failures using resilience4j.retry.instances.sqlTransaction
* configuration.
*/
internal fun DSLContext.transactional(
retry: Retry,
fn: (DSLContext) -> Unit
) {
retry.executeRunnable {
transaction { ctx ->
fn(DSL.using(ctx))
}
// Purpose: runs [fn] inside a jOOQ transaction opened on this DSLContext, passing fn a DSLContext
// created with DSL.using(ctx) from the value jOOQ hands to the transaction callback.
// NOTE(review): this is a top-level extension function, so it is not a method on a Spring-managed bean.
// The Resilience4j @Retry annotation is presumably applied through Spring AOP proxies, which would not
// intercept a call to this function; confirm that the "sqlTransaction" retry actually takes effect.
// If it does not, the retry has to be applied programmatically around the transaction call.
@Retry(name = "sqlTransaction")
internal fun DSLContext.transactional(fn: (DSLContext) -> Unit) {
transaction { ctx ->
// Wrap the callback's ctx so fn issues its statements through the transaction's own context.
fn(DSL.using(ctx))
}
}

/**
* Run the provided [fn] with retry support.
* Run the provided [fn], retrying on failures using resilience4j.retry.instances.sqlRead configuration.
*/
internal fun <T> DSLContext.withRetry(retryProperties: RetryProperties, fn: (DSLContext) -> T): T {
return retrySupport.retry({
fn(this)
}, retryProperties.maxRetries, retryProperties.backoffMs, false)
// Purpose: invokes [fn] with this DSLContext as its argument and returns fn's result unchanged.
// The body itself opens no transaction and contains no retry logic.
// NOTE(review): this is a top-level extension function, so it is not a method on a Spring-managed bean.
// The Resilience4j @Retry annotation is presumably applied through Spring AOP proxies, which would not
// intercept a call to this function; confirm that the "sqlRead" retry actually takes effect. If it does
// not, this function is a plain pass-through and reads are no longer retried.
@Retry(name = "sqlRead")
internal fun <T> DSLContext.read(fn: (DSLContext) -> T): T {
return fn(this)
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import com.netflix.spinnaker.kork.sql.config.DefaultSqlConfiguration
import com.netflix.spinnaker.kork.sql.config.SqlProperties
import com.netflix.spinnaker.kork.telemetry.InstrumentedProxy
import com.netflix.spinnaker.kork.version.ServiceVersion
import io.github.resilience4j.retry.RetryRegistry
import org.jooq.DSLContext
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
Expand All @@ -39,7 +38,6 @@ import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Import
import org.springframework.validation.Validator
import java.time.Clock

@Configuration
Expand All @@ -52,10 +50,9 @@ class SqlConfiguration {
@ConditionalOnProperty("sql.task-repository.enabled")
fun sqlTaskRepository(
jooq: DSLContext,
clock: Clock,
sqlProperties: SqlProperties
clock: Clock
): TaskRepository =
SqlTaskRepository(jooq, ObjectMapper(), clock, sqlProperties.retries)
SqlTaskRepository(jooq, ObjectMapper(), clock)

@Bean
@ConditionalOnProperty("sql.task-repository.enabled")
Expand All @@ -64,10 +61,9 @@ class SqlConfiguration {
jooq: DSLContext,
clock: Clock,
registry: Registry,
properties: SqlTaskCleanupAgentProperties,
sqlProperties: SqlProperties
properties: SqlTaskCleanupAgentProperties
): SqlTaskCleanupAgent =
SqlTaskCleanupAgent(jooq, clock, registry, properties, sqlProperties.retries)
SqlTaskCleanupAgent(jooq, clock, registry, properties)

@Bean
@ConditionalOnProperty("sql.task-repository.enabled")
Expand All @@ -83,21 +79,16 @@ class SqlConfiguration {
objectMapper: ObjectMapper,
applicationEventPublisher: ApplicationEventPublisher,
registry: Registry,
defaultValidator: Validator,
retryRegistry: RetryRegistry,
subtypeLocators: List<SubtypeLocator>
): EventRepository {
// TODO(rz): ObjectMapperSubtypeConfigurer should become a standard kork feature. This is pretty gross.
ObjectMapperSubtypeConfigurer(true).registerSubtypes(objectMapper, subtypeLocators)
return SqlEventRepository(
jooq,
sqlProperties,
serviceVersion,
objectMapper,
applicationEventPublisher,
registry,
defaultValidator,
retryRegistry
registry
).let {
InstrumentedProxy.proxy(registry, it, "eventRepository", mapOf("backend" to "sql"))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ protected TaskRepository createTaskRepository() {
properties.setReads(retry);
properties.setTransactions(retry);

return new SqlTaskRepository(
database.context, new ObjectMapper(), Clock.systemDefaultZone(), properties);
return new SqlTaskRepository(database.context, new ObjectMapper(), Clock.systemDefaultZone());
}

@After
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,13 @@ import com.netflix.spinnaker.clouddriver.event.AbstractSpinnakerEvent
import com.netflix.spinnaker.clouddriver.event.exceptions.AggregateChangeRejectedException
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository.ListAggregatesCriteria
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository.ListAggregatesResult
import com.netflix.spinnaker.kork.sql.config.SqlProperties
import com.netflix.spinnaker.kork.sql.test.SqlTestUtil
import com.netflix.spinnaker.kork.version.ServiceVersion
import dev.minutest.junit.JUnit5Minutests
import dev.minutest.rootContext
import io.github.resilience4j.retry.RetryRegistry
import io.mockk.every
import io.mockk.mockk
import org.springframework.context.ApplicationEventPublisher
import org.springframework.validation.Validator
import org.testcontainers.shaded.com.fasterxml.jackson.annotation.JsonTypeName
import strikt.api.expect
import strikt.api.expectThat
Expand Down Expand Up @@ -196,21 +193,16 @@ class SqlEventRepositoryTest : JUnit5Minutests {

val serviceVersion: ServiceVersion = mockk(relaxed = true)
val applicationEventPublisher: ApplicationEventPublisher = mockk(relaxed = true)
val validator: Validator = mockk(relaxed = true)
val retryRegistry: RetryRegistry = RetryRegistry.ofDefaults()

val subject = SqlEventRepository(
jooq = database.context,
sqlProperties = SqlProperties(),
serviceVersion = serviceVersion,
objectMapper = ObjectMapper().apply {
findAndRegisterModules()
registerSubtypes(MyEvent::class.java)
},
applicationEventPublisher = applicationEventPublisher,
registry = NoopRegistry(),
validator = validator,
retryRegistry = retryRegistry
registry = NoopRegistry()
)

init {
Expand Down
Loading

0 comments on commit 2e383b6

Please sign in to comment.