Skip to content

Commit

Permalink
feat(sql): Support event storage in SQL backend (#4004)
Browse files Browse the repository at this point in the history
  • Loading branch information
robzienert committed Sep 6, 2019
1 parent c0cf70b commit 4830ed8
Show file tree
Hide file tree
Showing 14 changed files with 1,025 additions and 5 deletions.
18 changes: 17 additions & 1 deletion clouddriver-sql/clouddriver-sql.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,33 @@ apply from: "$rootDir/gradle/kotlin.gradle"
dependencies {
implementation project(":cats:cats-core")
implementation project(":clouddriver-core")
implementation project(":clouddriver-event")

implementation "com.netflix.spinnaker.kork:kork-core"
implementation "com.netflix.spinnaker.kork:kork-exceptions"
implementation "com.netflix.spinnaker.kork:kork-sql"
implementation "com.netflix.spinnaker.kork:kork-telemetry"
implementation "de.huxhorn.sulky:de.huxhorn.sulky.ulid"
implementation "io.github.resilience4j:resilience4j-retry"
implementation "org.jooq:jooq"
implementation "org.hibernate.validator:hibernate-validator"

testImplementation project(":clouddriver-core-tck")

testImplementation "com.netflix.spinnaker.kork:kork-sql-test"
testImplementation "org.testcontainers:mysql"
testImplementation "mysql:mysql-connector-java"

testImplementation "cglib:cglib-nodep"
testImplementation "org.objenesis:objenesis"
testImplementation "org.junit.platform:junit-platform-runner"
testImplementation "org.junit.jupiter:junit-jupiter-api"
testImplementation "org.springframework:spring-test"
testImplementation "org.springframework.boot:spring-boot-test"
testImplementation "org.assertj:assertj-core"
testImplementation "io.strikt:strikt-core"
testImplementation "dev.minutest:minutest"
testImplementation "io.mockk:mockk"

testRuntimeOnly "org.junit.platform:junit-platform-launcher"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.sql.event

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.cats.agent.RunnableAgent
import com.netflix.spinnaker.clouddriver.cache.CustomScheduledAgent
import com.netflix.spinnaker.clouddriver.core.provider.CoreProvider
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.config.SqlEventCleanupAgentConfigProperties
import com.netflix.spinnaker.kork.sql.routing.withPool
import org.jooq.DSLContext
import org.jooq.impl.DSL.currentTimestamp
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.table
import org.jooq.impl.DSL.timestampDiff
import org.jooq.types.DayToSecond
import org.slf4j.LoggerFactory
import java.sql.Timestamp
import java.time.Duration
import java.time.Instant

/**
* Cleans up [SpinnakerEvent]s (by [Aggregate]) that are older than a configured number of days.
*/
class SqlEventCleanupAgent(
private val jooq: DSLContext,
private val registry: Registry,
private val properties: SqlEventCleanupAgentConfigProperties
) : RunnableAgent, CustomScheduledAgent {

private val log by lazy { LoggerFactory.getLogger(javaClass) }

private val deletedId = registry.createId("sql.eventCleanupAgent.deleted")
private val timingId = registry.createId("sql.eventCleanupAgent.timing")

override fun run() {
val duration = Duration.ofDays(properties.maxAggregateAgeDays)
val cutoff = Instant.now().minus(duration)
log.info("Deleting aggregates last updated earlier than $cutoff ($duration)")

registry.timer(timingId).record {
withPool(ConnectionPools.EVENTS.value) {
val rs = jooq.select(field("aggregate_type"), field("aggregateId"))
.from(table("event_aggregates"))
.where(timestampDiff(field("last_change_timestamp", Timestamp::class.java), currentTimestamp())
.greaterThan(DayToSecond.valueOf(duration)))
.fetch()
.intoResultSet()

var deleted = 0L
while (rs.next()) {
deleted++
jooq.deleteFrom(table("event_aggregates"))
.where(field("aggregate_type").eq(rs.getString("aggregate_type"))
.and(field("aggregate_id").eq(rs.getString("aggregate_id"))))
.execute()
}

registry.counter(deletedId).increment(deleted)
log.info("Deleted $deleted event aggregates")
}
}
}

override fun getAgentType(): String = javaClass.simpleName
override fun getProviderName(): String = CoreProvider.PROVIDER_NAME
override fun getPollIntervalMillis() = properties.frequency.toMillis()
override fun getTimeoutMillis() = properties.timeout.toMillis()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/*
* Copyright 2019 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.clouddriver.sql.event

import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.clouddriver.event.Aggregate
import com.netflix.spinnaker.clouddriver.event.CompositeSpinnakerEvent
import com.netflix.spinnaker.clouddriver.event.EventMetadata
import com.netflix.spinnaker.clouddriver.event.SpinnakerEvent
import com.netflix.spinnaker.clouddriver.event.exceptions.AggregateChangeRejectedException
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository.ListAggregatesCriteria
import com.netflix.spinnaker.clouddriver.sql.transactional
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.kork.sql.config.SqlProperties
import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
import com.netflix.spinnaker.kork.sql.routing.withPool
import com.netflix.spinnaker.kork.version.ServiceVersion
import de.huxhorn.sulky.ulid.ULID
import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
import io.github.resilience4j.retry.RetryRegistry
import org.jooq.Condition
import org.jooq.DSLContext
import org.jooq.impl.DSL.currentTimestamp
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.table
import org.slf4j.LoggerFactory
import org.springframework.context.ApplicationEventPublisher
import org.springframework.validation.Validator
import java.time.Duration
import java.util.UUID

class SqlEventRepository(
private val jooq: DSLContext,
sqlProperties: SqlProperties,
private val serviceVersion: ServiceVersion,
private val objectMapper: ObjectMapper,
private val applicationEventPublisher: ApplicationEventPublisher,
private val registry: Registry,
private val validator: Validator,
private val retryRegistry: RetryRegistry
) : EventRepository {

private val log by lazy { LoggerFactory.getLogger(javaClass) }
private val retryProperties: SqlRetryProperties = sqlProperties.retries

private val eventCountId = registry.createId("eventing.events")

override fun save(
aggregateType: String,
aggregateId: String,
originatingVersion: Long,
newEvents: List<SpinnakerEvent>
) {
val eventNames = newEvents.joinToString { it.javaClass.simpleName }
log.debug("Saving $aggregateType/$aggregateId expecting version $originatingVersion with [$eventNames]")

val aggregateCondition = field("aggregate_type").eq(aggregateType)
.and(field("aggregate_id").eq(aggregateId))

// TODO(rz): Get this from Spring?
val retry = RetryConfig.custom<Retry>()
.maxAttempts(retryProperties.transactions.maxRetries)
.waitDuration(Duration.ofMillis(retryProperties.transactions.backoffMs))
.ignoreExceptions(AggregateChangeRejectedException::class.java)
.build()

try {
withPool(POOL_NAME) {
jooq.transactional(retryRegistry.retry("eventSave", retry)) { ctx ->
// Get or create the aggregate and immediately assert that this save operation is being committed against the
// most recent aggregate state.
val aggregate = ctx.maybeGetAggregate(aggregateCondition) ?: {
if (originatingVersion != 0L) {
// The aggregate doesn't exist and we're already expecting a non-zero version.
throw AggregateChangeRejectedException(-1, originatingVersion)
}

// The aggregate doesn't exist yet, so we'll go ahead and seed it immediately.
val initialAggregate = mapOf(
field("aggregate_type") to aggregateType,
field("aggregate_id") to aggregateId,
field("token") to ulid.nextULID(),
field("version") to 0
)

ctx.insertInto(AGGREGATES_TABLE)
.columns(initialAggregate.keys)
.values(initialAggregate.values)
.execute()

Aggregate(aggregateType, aggregateId, 0)
}()

if (aggregate.version != originatingVersion) {
throw AggregateChangeRejectedException(aggregate.version, originatingVersion)
}

// Events have their own auto-incrementing sequence within an aggregate; so we need to get the last sequence
// and generate from there.
val lastSequence = ctx.select(field("sequence")).from(EVENTS_TABLE)
.where(aggregateCondition)
.orderBy(field("timestamp").desc())
.limit(1)
.fetchOne(0, Long::class.java)

log.debug("Last event sequence number is $lastSequence")
var nextSequence = lastSequence

// Add the new events, doesn't matter what they are: At this point, they're "probably" valid, as the higher
// libs should be validating the event payload.
ctx.insertInto(EVENTS_TABLE)
.columns(
field("id"),
field("aggregate_type"),
field("aggregate_id"),
field("sequence"),
field("originating_version"),
field("timestamp"),
field("metadata"),
field("data")
)
.let { insertValuesStep ->
var step = insertValuesStep
newEvents.forEach {
nextSequence = it.initialize(aggregateType, aggregateId, originatingVersion, nextSequence)
step = step.values(it.toSqlValues(objectMapper))
}
step
}
.execute()

// Update the aggregates table with a new version
ctx.update(AGGREGATES_TABLE)
.set(field("version"), field("version", Long::class.java).add(1))
.set(field("last_change_timestamp"), currentTimestamp())
.where(aggregateCondition)
.execute()

log.debug("Event sequence number is now $nextSequence")
}
}
} catch (e: AggregateChangeRejectedException) {
registry.counter(eventCountId.withTags("aggregateType", aggregateType)).increment(newEvents.size.toLong())
throw e
} catch (e: Exception) {
// This is totally handling it...
registry.counter(eventCountId.withTags("aggregateType", aggregateType)).increment(newEvents.size.toLong())
throw SqlEventSystemException("Failed saving new events", e)
}

log.debug("Saved $aggregateType/$aggregateId: [${newEvents.joinToString { it.javaClass.simpleName}}]")
registry.counter(eventCountId.withTags("aggregateType", aggregateType)).increment(newEvents.size.toLong())

newEvents.forEach { applicationEventPublisher.publishEvent(it) }
}

/**
* Initialize the [SpinnakerEvent] lateinit properties (recursively, if necessary).
*
* This is a bit wonky: In the case of [ComposedSpinnakerEvent]s, we want to initialize the event so we can
* correctly serialize it, but we don't want to increment the sequence for these events as they aren't
* actually on the event log yet. If we're in a [ComposedSpinnakerEvent], we just provide a "-1" sequence
* number and a real, valid sequence will be assigned if/when it gets saved to the event log.
*/
private fun SpinnakerEvent.initialize(
aggregateType: String,
aggregateId: String,
originatingVersion: Long,
currentSequence: Long?
): Long? {
var nextSequence = if (currentSequence != null) {
currentSequence + 1
} else {
null
}

// timestamp is calculated on the SQL server
setMetadata(EventMetadata(
id = UUID.randomUUID().toString(),
aggregateType = aggregateType,
aggregateId = aggregateId,
sequence = nextSequence ?: -1,
originatingVersion = originatingVersion,
serviceVersion = serviceVersion.resolve()
))

if (this is CompositeSpinnakerEvent) {
this.getComposedEvents().forEach { event ->
// We initialize composed events with a null sequence, since they won't actually get added to the log at
// this point; that's up to the action to either add it or not, at which point it'll get a sequence number
event.initialize(aggregateType, aggregateId, originatingVersion, null)?.let {
nextSequence = it
}
}
}

return nextSequence
}

override fun list(aggregateType: String, aggregateId: String): List<SpinnakerEvent> {
return withPool(POOL_NAME) {
jooq.select().from(EVENTS_TABLE)
.where(field("aggregate_type").eq(aggregateType)
.and(field("aggregate_id").eq(aggregateId)))
.orderBy(field("sequence").asc())
.fetchEvents(objectMapper)
}
}

override fun listAggregates(criteria: ListAggregatesCriteria): EventRepository.ListAggregatesResult {
// TODO(rz): validate criteria

return withPool(POOL_NAME) {
val conditions = mutableListOf<Condition>()
criteria.aggregateType?.let { conditions.add(field("aggregate_type").eq(it)) }
criteria.token?.let { conditions.add(field("token").greaterThan(it)) }

val perPage = criteria.perPage.coerceAtMost(10_000)

val aggregates = jooq.select().from(AGGREGATES_TABLE)
.withConditions(conditions)
.orderBy(field("token").asc())
.limit(perPage)
.fetchAggregates()

val remaining = jooq.selectCount().from(AGGREGATES_TABLE)
.withConditions(conditions)
.fetchOne(0, Int::class.java) - perPage

EventRepository.ListAggregatesResult(
aggregates = aggregates.map { it.model },
nextPageToken = if (remaining > 0) aggregates.lastOrNull()?.token else null
)
}
}

private fun DSLContext.maybeGetAggregate(aggregateCondition: Condition): Aggregate? {
return select()
.from(AGGREGATES_TABLE)
.where(aggregateCondition)
.limit(1)
.fetchAggregates()
.firstOrNull()
?.model
}

companion object {
private val POOL_NAME = ConnectionPools.EVENTS.value
private val AGGREGATES_TABLE = table("event_aggregates")
private val EVENTS_TABLE = table("events")

private val ulid = ULID()
}
}
Loading

0 comments on commit 4830ed8

Please sign in to comment.