From 4830ed8f999eae9ffa01f48556451e56aba11b2f Mon Sep 17 00:00:00 2001 From: Rob Zienert Date: Fri, 6 Sep 2019 10:22:45 -0700 Subject: [PATCH] feat(sql): Support event storage in SQL backend (#4004) --- clouddriver-sql/clouddriver-sql.gradle | 18 +- .../sql/event/SqlEventCleanupAgent.kt | 83 ++++++ .../sql/event/SqlEventRepository.kt | 270 ++++++++++++++++++ .../sql/event/SqlEventSystemException.kt | 27 ++ .../spinnaker/clouddriver/sql/event/dsl.kt | 119 ++++++++ .../sql/exceptions/SqlException.kt | 21 ++ .../netflix/spinnaker/clouddriver/sql/sql.kt | 18 +- .../spinnaker/config/ConnectionPools.kt | 3 +- .../spinnaker/config/SqlConfiguration.kt | 49 +++- .../SqlEventCleanupAgentConfigProperties.kt | 42 +++ .../config/SqlTaskCleanupAgentProperties.kt | 2 +- .../main/resources/db/changelog-master.yml | 3 + .../20190822-initial-event-schema.yml | 159 +++++++++++ .../sql/event/SqlEventRepositoryTest.kt | 216 ++++++++++++++ 14 files changed, 1025 insertions(+), 5 deletions(-) create mode 100644 clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventCleanupAgent.kt create mode 100644 clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventRepository.kt create mode 100644 clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventSystemException.kt create mode 100644 clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/dsl.kt create mode 100644 clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/exceptions/SqlException.kt create mode 100644 clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlEventCleanupAgentConfigProperties.kt create mode 100644 clouddriver-sql/src/main/resources/db/changelog/20190822-initial-event-schema.yml create mode 100644 clouddriver-sql/src/test/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventRepositoryTest.kt diff --git a/clouddriver-sql/clouddriver-sql.gradle b/clouddriver-sql/clouddriver-sql.gradle 
index e89bb582c82..eef6869440e 100644 --- a/clouddriver-sql/clouddriver-sql.gradle +++ b/clouddriver-sql/clouddriver-sql.gradle @@ -19,17 +19,33 @@ apply from: "$rootDir/gradle/kotlin.gradle" dependencies { implementation project(":cats:cats-core") implementation project(":clouddriver-core") + implementation project(":clouddriver-event") implementation "com.netflix.spinnaker.kork:kork-core" implementation "com.netflix.spinnaker.kork:kork-exceptions" implementation "com.netflix.spinnaker.kork:kork-sql" + implementation "com.netflix.spinnaker.kork:kork-telemetry" implementation "de.huxhorn.sulky:de.huxhorn.sulky.ulid" - implementation "io.github.resilience4j:resilience4j-retry" implementation "org.jooq:jooq" + implementation "org.hibernate.validator:hibernate-validator" testImplementation project(":clouddriver-core-tck") testImplementation "com.netflix.spinnaker.kork:kork-sql-test" testImplementation "org.testcontainers:mysql" testImplementation "mysql:mysql-connector-java" + + testImplementation "cglib:cglib-nodep" + testImplementation "org.objenesis:objenesis" + testImplementation "org.junit.platform:junit-platform-runner" + testImplementation "org.junit.jupiter:junit-jupiter-api" + testImplementation "org.springframework:spring-test" + testImplementation "org.springframework.boot:spring-boot-test" + testImplementation "org.assertj:assertj-core" + testImplementation "io.strikt:strikt-core" + testImplementation "dev.minutest:minutest" + testImplementation "io.mockk:mockk" + + testRuntimeOnly "org.junit.platform:junit-platform-launcher" + testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine" } diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventCleanupAgent.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventCleanupAgent.kt new file mode 100644 index 00000000000..22079ff44d5 --- /dev/null +++ 
b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventCleanupAgent.kt @@ -0,0 +1,83 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.sql.event + +import com.netflix.spectator.api.Registry +import com.netflix.spinnaker.cats.agent.RunnableAgent +import com.netflix.spinnaker.clouddriver.cache.CustomScheduledAgent +import com.netflix.spinnaker.clouddriver.core.provider.CoreProvider +import com.netflix.spinnaker.config.ConnectionPools +import com.netflix.spinnaker.config.SqlEventCleanupAgentConfigProperties +import com.netflix.spinnaker.kork.sql.routing.withPool +import org.jooq.DSLContext +import org.jooq.impl.DSL.currentTimestamp +import org.jooq.impl.DSL.field +import org.jooq.impl.DSL.table +import org.jooq.impl.DSL.timestampDiff +import org.jooq.types.DayToSecond +import org.slf4j.LoggerFactory +import java.sql.Timestamp +import java.time.Duration +import java.time.Instant + +/** + * Cleans up [SpinnakerEvent]s (by [Aggregate]) that are older than a configured number of days. 
+ */ +class SqlEventCleanupAgent( + private val jooq: DSLContext, + private val registry: Registry, + private val properties: SqlEventCleanupAgentConfigProperties +) : RunnableAgent, CustomScheduledAgent { + + private val log by lazy { LoggerFactory.getLogger(javaClass) } + + private val deletedId = registry.createId("sql.eventCleanupAgent.deleted") + private val timingId = registry.createId("sql.eventCleanupAgent.timing") + + override fun run() { + val duration = Duration.ofDays(properties.maxAggregateAgeDays) + val cutoff = Instant.now().minus(duration) + log.info("Deleting aggregates last updated earlier than $cutoff ($duration)") + + registry.timer(timingId).record { + withPool(ConnectionPools.EVENTS.value) { + val rs = jooq.select(field("aggregate_type"), field("aggregateId")) + .from(table("event_aggregates")) + .where(timestampDiff(field("last_change_timestamp", Timestamp::class.java), currentTimestamp()) + .greaterThan(DayToSecond.valueOf(duration))) + .fetch() + .intoResultSet() + + var deleted = 0L + while (rs.next()) { + deleted++ + jooq.deleteFrom(table("event_aggregates")) + .where(field("aggregate_type").eq(rs.getString("aggregate_type")) + .and(field("aggregate_id").eq(rs.getString("aggregate_id")))) + .execute() + } + + registry.counter(deletedId).increment(deleted) + log.info("Deleted $deleted event aggregates") + } + } + } + + override fun getAgentType(): String = javaClass.simpleName + override fun getProviderName(): String = CoreProvider.PROVIDER_NAME + override fun getPollIntervalMillis() = properties.frequency.toMillis() + override fun getTimeoutMillis() = properties.timeout.toMillis() +} diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventRepository.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventRepository.kt new file mode 100644 index 00000000000..eec785f466d --- /dev/null +++ 
b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventRepository.kt @@ -0,0 +1,270 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.clouddriver.sql.event + +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.spectator.api.Registry +import com.netflix.spinnaker.clouddriver.event.Aggregate +import com.netflix.spinnaker.clouddriver.event.CompositeSpinnakerEvent +import com.netflix.spinnaker.clouddriver.event.EventMetadata +import com.netflix.spinnaker.clouddriver.event.SpinnakerEvent +import com.netflix.spinnaker.clouddriver.event.exceptions.AggregateChangeRejectedException +import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository +import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository.ListAggregatesCriteria +import com.netflix.spinnaker.clouddriver.sql.transactional +import com.netflix.spinnaker.config.ConnectionPools +import com.netflix.spinnaker.kork.sql.config.SqlProperties +import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties +import com.netflix.spinnaker.kork.sql.routing.withPool +import com.netflix.spinnaker.kork.version.ServiceVersion +import de.huxhorn.sulky.ulid.ULID +import io.github.resilience4j.retry.Retry +import io.github.resilience4j.retry.RetryConfig +import io.github.resilience4j.retry.RetryRegistry +import org.jooq.Condition +import org.jooq.DSLContext +import 
org.jooq.impl.DSL.currentTimestamp +import org.jooq.impl.DSL.field +import org.jooq.impl.DSL.table +import org.slf4j.LoggerFactory +import org.springframework.context.ApplicationEventPublisher +import org.springframework.validation.Validator +import java.time.Duration +import java.util.UUID + +class SqlEventRepository( + private val jooq: DSLContext, + sqlProperties: SqlProperties, + private val serviceVersion: ServiceVersion, + private val objectMapper: ObjectMapper, + private val applicationEventPublisher: ApplicationEventPublisher, + private val registry: Registry, + private val validator: Validator, + private val retryRegistry: RetryRegistry +) : EventRepository { + + private val log by lazy { LoggerFactory.getLogger(javaClass) } + private val retryProperties: SqlRetryProperties = sqlProperties.retries + + private val eventCountId = registry.createId("eventing.events") + + override fun save( + aggregateType: String, + aggregateId: String, + originatingVersion: Long, + newEvents: List + ) { + val eventNames = newEvents.joinToString { it.javaClass.simpleName } + log.debug("Saving $aggregateType/$aggregateId expecting version $originatingVersion with [$eventNames]") + + val aggregateCondition = field("aggregate_type").eq(aggregateType) + .and(field("aggregate_id").eq(aggregateId)) + + // TODO(rz): Get this from Spring? + val retry = RetryConfig.custom() + .maxAttempts(retryProperties.transactions.maxRetries) + .waitDuration(Duration.ofMillis(retryProperties.transactions.backoffMs)) + .ignoreExceptions(AggregateChangeRejectedException::class.java) + .build() + + try { + withPool(POOL_NAME) { + jooq.transactional(retryRegistry.retry("eventSave", retry)) { ctx -> + // Get or create the aggregate and immediately assert that this save operation is being committed against the + // most recent aggregate state. 
+ val aggregate = ctx.maybeGetAggregate(aggregateCondition) ?: { + if (originatingVersion != 0L) { + // The aggregate doesn't exist and we're already expecting a non-zero version. + throw AggregateChangeRejectedException(-1, originatingVersion) + } + + // The aggregate doesn't exist yet, so we'll go ahead and seed it immediately. + val initialAggregate = mapOf( + field("aggregate_type") to aggregateType, + field("aggregate_id") to aggregateId, + field("token") to ulid.nextULID(), + field("version") to 0 + ) + + ctx.insertInto(AGGREGATES_TABLE) + .columns(initialAggregate.keys) + .values(initialAggregate.values) + .execute() + + Aggregate(aggregateType, aggregateId, 0) + }() + + if (aggregate.version != originatingVersion) { + throw AggregateChangeRejectedException(aggregate.version, originatingVersion) + } + + // Events have their own auto-incrementing sequence within an aggregate; so we need to get the last sequence + // and generate from there. + val lastSequence = ctx.select(field("sequence")).from(EVENTS_TABLE) + .where(aggregateCondition) + .orderBy(field("timestamp").desc()) + .limit(1) + .fetchOne(0, Long::class.java) + + log.debug("Last event sequence number is $lastSequence") + var nextSequence = lastSequence + + // Add the new events, doesn't matter what they are: At this point, they're "probably" valid, as the higher + // libs should be validating the event payload. 
+ ctx.insertInto(EVENTS_TABLE) + .columns( + field("id"), + field("aggregate_type"), + field("aggregate_id"), + field("sequence"), + field("originating_version"), + field("timestamp"), + field("metadata"), + field("data") + ) + .let { insertValuesStep -> + var step = insertValuesStep + newEvents.forEach { + nextSequence = it.initialize(aggregateType, aggregateId, originatingVersion, nextSequence) + step = step.values(it.toSqlValues(objectMapper)) + } + step + } + .execute() + + // Update the aggregates table with a new version + ctx.update(AGGREGATES_TABLE) + .set(field("version"), field("version", Long::class.java).add(1)) + .set(field("last_change_timestamp"), currentTimestamp()) + .where(aggregateCondition) + .execute() + + log.debug("Event sequence number is now $nextSequence") + } + } + } catch (e: AggregateChangeRejectedException) { + registry.counter(eventCountId.withTags("aggregateType", aggregateType)).increment(newEvents.size.toLong()) + throw e + } catch (e: Exception) { + // This is totally handling it... + registry.counter(eventCountId.withTags("aggregateType", aggregateType)).increment(newEvents.size.toLong()) + throw SqlEventSystemException("Failed saving new events", e) + } + + log.debug("Saved $aggregateType/$aggregateId: [${newEvents.joinToString { it.javaClass.simpleName}}]") + registry.counter(eventCountId.withTags("aggregateType", aggregateType)).increment(newEvents.size.toLong()) + + newEvents.forEach { applicationEventPublisher.publishEvent(it) } + } + + /** + * Initialize the [SpinnakerEvent] lateinit properties (recursively, if necessary). + * + * This is a bit wonky: In the case of [ComposedSpinnakerEvent]s, we want to initialize the event so we can + * correctly serialize it, but we don't want to increment the sequence for these events as they aren't + * actually on the event log yet. 
If we're in a [ComposedSpinnakerEvent], we just provide a "-1" sequence + * number and a real, valid sequence will be assigned if/when it gets saved to the event log. + */ + private fun SpinnakerEvent.initialize( + aggregateType: String, + aggregateId: String, + originatingVersion: Long, + currentSequence: Long? + ): Long? { + var nextSequence = if (currentSequence != null) { + currentSequence + 1 + } else { + null + } + + // timestamp is calculated on the SQL server + setMetadata(EventMetadata( + id = UUID.randomUUID().toString(), + aggregateType = aggregateType, + aggregateId = aggregateId, + sequence = nextSequence ?: -1, + originatingVersion = originatingVersion, + serviceVersion = serviceVersion.resolve() + )) + + if (this is CompositeSpinnakerEvent) { + this.getComposedEvents().forEach { event -> + // We initialize composed events with a null sequence, since they won't actually get added to the log at + // this point; that's up to the action to either add it or not, at which point it'll get a sequence number + event.initialize(aggregateType, aggregateId, originatingVersion, null)?.let { + nextSequence = it + } + } + } + + return nextSequence + } + + override fun list(aggregateType: String, aggregateId: String): List { + return withPool(POOL_NAME) { + jooq.select().from(EVENTS_TABLE) + .where(field("aggregate_type").eq(aggregateType) + .and(field("aggregate_id").eq(aggregateId))) + .orderBy(field("sequence").asc()) + .fetchEvents(objectMapper) + } + } + + override fun listAggregates(criteria: ListAggregatesCriteria): EventRepository.ListAggregatesResult { + // TODO(rz): validate criteria + + return withPool(POOL_NAME) { + val conditions = mutableListOf() + criteria.aggregateType?.let { conditions.add(field("aggregate_type").eq(it)) } + criteria.token?.let { conditions.add(field("token").greaterThan(it)) } + + val perPage = criteria.perPage.coerceAtMost(10_000) + + val aggregates = jooq.select().from(AGGREGATES_TABLE) + .withConditions(conditions) + 
.orderBy(field("token").asc()) + .limit(perPage) + .fetchAggregates() + + val remaining = jooq.selectCount().from(AGGREGATES_TABLE) + .withConditions(conditions) + .fetchOne(0, Int::class.java) - perPage + + EventRepository.ListAggregatesResult( + aggregates = aggregates.map { it.model }, + nextPageToken = if (remaining > 0) aggregates.lastOrNull()?.token else null + ) + } + } + + private fun DSLContext.maybeGetAggregate(aggregateCondition: Condition): Aggregate? { + return select() + .from(AGGREGATES_TABLE) + .where(aggregateCondition) + .limit(1) + .fetchAggregates() + .firstOrNull() + ?.model + } + + companion object { + private val POOL_NAME = ConnectionPools.EVENTS.value + private val AGGREGATES_TABLE = table("event_aggregates") + private val EVENTS_TABLE = table("events") + + private val ulid = ULID() + } +} diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventSystemException.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventSystemException.kt new file mode 100644 index 00000000000..ce2fe035f09 --- /dev/null +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/SqlEventSystemException.kt @@ -0,0 +1,27 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.spinnaker.clouddriver.sql.event + +import com.netflix.spinnaker.clouddriver.event.exceptions.EventingException +import com.netflix.spinnaker.clouddriver.sql.exceptions.SqlException +import com.netflix.spinnaker.kork.exceptions.SystemException + +class SqlEventSystemException(message: String, cause: Throwable?) : SystemException( + message, + cause +), EventingException, SqlException { + constructor(message: String) : this(message, null) +} diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/dsl.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/dsl.kt new file mode 100644 index 00000000000..0c18d6fd182 --- /dev/null +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/event/dsl.kt @@ -0,0 +1,119 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.spinnaker.clouddriver.sql.event + +import com.fasterxml.jackson.core.JsonProcessingException +import com.fasterxml.jackson.databind.ObjectMapper +import com.netflix.spinnaker.clouddriver.event.Aggregate +import com.netflix.spinnaker.clouddriver.event.EventMetadata +import com.netflix.spinnaker.clouddriver.event.SpinnakerEvent +import com.netflix.spinnaker.clouddriver.event.exceptions.InvalidEventTypeException +import org.jooq.Condition +import org.jooq.Record +import org.jooq.Select +import org.jooq.SelectConditionStep +import org.jooq.SelectWhereStep +import org.jooq.impl.DSL.currentTimestamp +import java.sql.ResultSet + +/** + * Adds an arbitrary number of [conditions] to a query joined by `AND` operator. + */ +internal fun SelectWhereStep.withConditions(conditions: List): SelectConditionStep { + return if (conditions.isNotEmpty()) this.where( + conditions.reduce { acc, condition -> acc.and(condition) } + ) else { + where("1=1") + } +} + +/** + * Internal model of [Aggregate]. + */ +internal class SqlAggregate( + val model: Aggregate, + val token: String +) + +/** + * Runs [this] as a "select one" query and returns a single [SqlAggregate]. + * It is assumed the underlying [Aggregate] exists. + */ +internal fun Select.fetchAggregates(): List = + fetch().intoResultSet().let { rs -> + mutableListOf().apply { + while (rs.next()) { + add(SqlAggregate( + model = Aggregate( + type = rs.getString("aggregate_type"), + id = rs.getString("aggregate_id"), + version = rs.getLong("version") + ), + token = rs.getString("token") + )) + } + } + } + +/** + * Maps a jOOQ result set to an [Aggregate] collection. + */ +internal class AggregateMapper { + fun map(rs: ResultSet): Collection { + val results = mutableListOf() + while (rs.next()) { + results.add(Aggregate( + rs.getString("aggregate_type"), + rs.getString("aggregate_id"), + rs.getLong("version") + )) + } + return results + } +} + +/** + * Converts a [SpinnakerEvent] to a SQL event row. 
The values are ordered the same as the schema's columns. + */ +internal fun SpinnakerEvent.toSqlValues(objectMapper: ObjectMapper): Collection = listOf( + getMetadata().id, + getMetadata().aggregateType, + getMetadata().aggregateId, + getMetadata().sequence, + getMetadata().originatingVersion, + currentTimestamp(), + objectMapper.writeValueAsString(getMetadata()), + // TODO(rz): optimize + objectMapper.writeValueAsString(this) +) + +/** + * Executes a SQL select query and converts the ResultSet into a list of [SpinnakerEvent]. + */ +internal fun Select.fetchEvents(objectMapper: ObjectMapper): List = + fetch().intoResultSet().let { rs -> + mutableListOf().apply { + while (rs.next()) { + try { + add(objectMapper.readValue(rs.getString("data"), SpinnakerEvent::class.java).apply { + setMetadata(objectMapper.readValue(rs.getString("metadata"), EventMetadata::class.java)) + }) + } catch (e: JsonProcessingException) { + throw InvalidEventTypeException(e) + } + } + } + } diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/exceptions/SqlException.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/exceptions/SqlException.kt new file mode 100644 index 00000000000..140b0e5e7ec --- /dev/null +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/exceptions/SqlException.kt @@ -0,0 +1,21 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.spinnaker.clouddriver.sql.exceptions + +/** + * Marker + */ +interface SqlException diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/sql.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/sql.kt index 7439e5deff1..3e8b120eaa4 100644 --- a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/sql.kt +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/clouddriver/sql/sql.kt @@ -17,6 +17,7 @@ package com.netflix.spinnaker.clouddriver.sql import com.netflix.spinnaker.kork.core.RetrySupport import com.netflix.spinnaker.kork.sql.config.RetryProperties +import io.github.resilience4j.retry.Retry import org.jooq.DSLContext import org.jooq.impl.DSL import org.jooq.impl.DSL.field @@ -33,8 +34,9 @@ internal val taskStatesFields = listOf("id", "task_id", "created_at", "state", " internal val taskResultsFields = listOf("id", "task_id", "body").map { field(it) } /** - * Run the provided [fn] in a transaction. + * Run the provided [fn] in a transaction, retrying on failures using [retryProperties]. */ +@Deprecated("use transactional(retry, fn) instead") internal fun DSLContext.transactional(retryProperties: RetryProperties, fn: (DSLContext) -> Unit) { retrySupport.retry({ transaction { ctx -> @@ -43,6 +45,20 @@ internal fun DSLContext.transactional(retryProperties: RetryProperties, fn: (DSL }, retryProperties.maxRetries, retryProperties.backoffMs, false) } +/** + * Run the provided [fn] in a transaction, retrying on failures using [retry]. + */ +internal fun DSLContext.transactional( + retry: Retry, + fn: (DSLContext) -> Unit +) { + retry.executeRunnable { + transaction { ctx -> + fn(DSL.using(ctx)) + } + } +} + /** * Run the provided [fn] with retry support. 
*/ diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/ConnectionPools.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/ConnectionPools.kt index d273ec30f0b..906c3de2225 100644 --- a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/ConnectionPools.kt +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/ConnectionPools.kt @@ -20,5 +20,6 @@ enum class ConnectionPools( ) { TASKS("tasks"), CACHE_WRITER("cacheWriter"), - CACHE_READER("cacheReader") + CACHE_READER("cacheReader"), + EVENTS("events") } diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt index f07964a780e..65dace3bf33 100644 --- a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt @@ -18,24 +18,34 @@ package com.netflix.spinnaker.config import com.fasterxml.jackson.databind.ObjectMapper import com.netflix.spectator.api.Registry import com.netflix.spinnaker.clouddriver.data.task.TaskRepository +import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository import com.netflix.spinnaker.clouddriver.sql.SqlProvider import com.netflix.spinnaker.clouddriver.sql.SqlTaskCleanupAgent import com.netflix.spinnaker.clouddriver.sql.SqlTaskRepository +import com.netflix.spinnaker.clouddriver.sql.event.SqlEventCleanupAgent +import com.netflix.spinnaker.clouddriver.sql.event.SqlEventRepository +import com.netflix.spinnaker.kork.jackson.ObjectMapperSubtypeConfigurer +import com.netflix.spinnaker.kork.jackson.ObjectMapperSubtypeConfigurer.SubtypeLocator import com.netflix.spinnaker.kork.sql.config.DefaultSqlConfiguration import com.netflix.spinnaker.kork.sql.config.SqlProperties +import com.netflix.spinnaker.kork.telemetry.InstrumentedProxy +import com.netflix.spinnaker.kork.version.ServiceVersion +import 
io.github.resilience4j.retry.RetryRegistry import org.jooq.DSLContext import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.context.ApplicationEventPublisher import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import +import org.springframework.validation.Validator import java.time.Clock @Configuration @ConditionalOnProperty("sql.enabled") @Import(DefaultSqlConfiguration::class) -@EnableConfigurationProperties(SqlTaskCleanupAgentProperties::class) +@EnableConfigurationProperties(SqlTaskCleanupAgentProperties::class, SqlEventCleanupAgentConfigProperties::class) class SqlConfiguration { @Bean @@ -64,4 +74,41 @@ class SqlConfiguration { @ConditionalOnExpression("\${sql.read-only:false} == false") fun sqlProvider(sqlTaskCleanupAgent: SqlTaskCleanupAgent): SqlProvider = SqlProvider(mutableListOf(sqlTaskCleanupAgent)) + + @Bean + fun sqlEventRepository( + jooq: DSLContext, + sqlProperties: SqlProperties, + serviceVersion: ServiceVersion, + objectMapper: ObjectMapper, + applicationEventPublisher: ApplicationEventPublisher, + registry: Registry, + defaultValidator: Validator, + retryRegistry: RetryRegistry, + subtypeLocators: List + ): EventRepository { + // TODO(rz): ObjectMapperSubtypeConfigurer should become a standard kork feature. This is pretty gross. 
+ ObjectMapperSubtypeConfigurer(true).registerSubtypes(objectMapper, subtypeLocators) + return SqlEventRepository( + jooq, + sqlProperties, + serviceVersion, + objectMapper, + applicationEventPublisher, + registry, + defaultValidator, + retryRegistry + ).let { + InstrumentedProxy.proxy(registry, it, "eventRepository", mapOf("backend" to "sql")) + } + } + + @Bean + fun sqlEventCleanupAgent( + jooq: DSLContext, + registry: Registry, + properties: SqlEventCleanupAgentConfigProperties + ): SqlEventCleanupAgent { + return SqlEventCleanupAgent(jooq, registry, properties) + } } diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlEventCleanupAgentConfigProperties.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlEventCleanupAgentConfigProperties.kt new file mode 100644 index 00000000000..75fde254781 --- /dev/null +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlEventCleanupAgentConfigProperties.kt @@ -0,0 +1,42 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.spinnaker.config + +import org.springframework.boot.context.properties.ConfigurationProperties +import org.springframework.validation.annotation.Validated +import java.time.Duration +import javax.validation.constraints.Positive + +@Validated +@ConfigurationProperties("spinnaker.clouddriver.eventing.cleanup-agent") +class SqlEventCleanupAgentConfigProperties { + /** + * The frequency, in milliseconds, of how often the cleanup agent will run. Defaults to 30 minutes. + */ + var frequency: Duration = Duration.ofMinutes(30) + + /** + * The ceiling execution time that the agent should be allowed to run before it will be timed out and available for + * reschedule onto a different Clouddriver instance. Defaults to 1 hour. + */ + var timeout: Duration = Duration.ofHours(1) + + /** + * The max age of an [Aggregate]. Defaults to 7 days. + */ + @Positive + var maxAggregateAgeDays: Long = 7 +} diff --git a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlTaskCleanupAgentProperties.kt b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlTaskCleanupAgentProperties.kt index 08c32820a49..254cbaa252c 100644 --- a/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlTaskCleanupAgentProperties.kt +++ b/clouddriver-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlTaskCleanupAgentProperties.kt @@ -19,7 +19,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties import java.util.concurrent.TimeUnit @ConfigurationProperties("sql.agent.task-cleanup") -data class SqlTaskCleanupAgentProperties( +class SqlTaskCleanupAgentProperties( var completedTtlMs: Long = TimeUnit.MINUTES.toMillis(60), var batchSize: Int = 100 ) diff --git a/clouddriver-sql/src/main/resources/db/changelog-master.yml b/clouddriver-sql/src/main/resources/db/changelog-master.yml index bca0f82f5b8..e70c31c83b4 100644 --- a/clouddriver-sql/src/main/resources/db/changelog-master.yml +++ 
# Initial schema for the SQL-backed event store: one row per aggregate
# (event_aggregates) plus an append-only event log (events) keyed to it.
databaseChangeLog:
  - changeSet:
      id: create-event-aggregates-table
      author: robzienert
      changes:
        - createTable:
            tableName: event_aggregates
            columns:
              - column:
                  name: aggregate_type
                  type: varchar(255)
                  constraints:
                    nullable: false
                    primaryKey: true
              - column:
                  name: aggregate_id
                  type: char(64)
                  constraints:
                    nullable: false
                    primaryKey: true
              # ULID token used as a stable, sortable pagination cursor.
              - column:
                  name: token
                  type: char(26)
                  constraints:
                    nullable: false
                    unique: true
              # Optimistic-locking version; incremented on every event append.
              - column:
                  name: version
                  type: bigint(20)
                  constraints:
                    nullable: false
              - column:
                  name: last_change_timestamp
                  type: timestamp
        - modifySql:
            dbms: mysql
            append:
              value: " engine innodb"
      rollback:
        - dropTable:
            tableName: event_aggregates

  - changeSet:
      id: create-event-aggregates-table-indices
      author: robzienert
      changes:
        - createIndex:
            indexName: aggregate_type_token_idx
            tableName: event_aggregates
            columns:
              - column:
                  name: aggregate_type
              - column:
                  name: token
        - createIndex:
            indexName: aggregate_token_idx
            tableName: event_aggregates
            columns:
              - column:
                  name: token
        - createIndex:
            indexName: aggregate_last_change_timestamp_idx
            tableName: event_aggregates
            columns:
              - column:
                  name: last_change_timestamp
      # Rollback must drop every index this changeset created, including
      # aggregate_last_change_timestamp_idx.
      rollback:
        - dropIndex:
            indexName: aggregate_type_token_idx
            tableName: event_aggregates
        - dropIndex:
            indexName: aggregate_token_idx
            tableName: event_aggregates
        - dropIndex:
            indexName: aggregate_last_change_timestamp_idx
            tableName: event_aggregates

  - changeSet:
      id: create-events-table
      author: robzienert
      changes:
        - createTable:
            tableName: events
            columns:
              - column:
                  name: id
                  type: char(36)
                  constraints:
                    primaryKey: true
                    nullable: false
              - column:
                  name: aggregate_type
                  type: varchar(255)
                  constraints:
                    nullable: false
              - column:
                  name: aggregate_id
                  type: char(64)
                  constraints:
                    nullable: false
              # Monotonic ordering of events within a single aggregate.
              - column:
                  name: sequence
                  type: bigint(20)
                  constraints:
                    nullable: false
              # Aggregate version the event was appended against.
              - column:
                  name: originating_version
                  type: bigint(20)
                  constraints:
                    nullable: false
              - column:
                  name: timestamp
                  type: timestamp(6)
                  constraints:
                    nullable: false
              - column:
                  name: metadata
                  type: text
                  constraints:
                    nullable: false
              - column:
                  name: data
                  type: longtext
                  constraints:
                    nullable: false
        - modifySql:
            dbms: mysql
            append:
              value: " engine innodb"
      rollback:
        - dropTable:
            tableName: events

  - changeSet:
      id: create-events-table-indices
      author: robzienert
      changes:
        - createIndex:
            indexName: event_aggregate_type_id_sequence_idx
            tableName: events
            columns:
              - column:
                  name: aggregate_type
              - column:
                  name: aggregate_id
              - column:
                  name: sequence
        - addForeignKeyConstraint:
            baseColumnNames: aggregate_type,aggregate_id
            baseTableName: events
            constraintName: event_aggregate_fk
            onDelete: CASCADE
            onUpdate: RESTRICT
            referencedColumnNames: aggregate_type,aggregate_id
            referencedTableName: event_aggregates
      # Rollback in reverse order of creation: the foreign key must be dropped
      # before the index, since MySQL refuses to drop an index that is still
      # backing a foreign key constraint.
      rollback:
        - dropForeignKeyConstraint:
            constraintName: event_aggregate_fk
            baseTableName: events
        - dropIndex:
            indexName: event_aggregate_type_id_sequence_idx
            tableName: events
/*
 * Copyright 2019 Netflix, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.netflix.spinnaker.clouddriver.sql.event

// NOTE: JsonTypeName must come from the real Jackson package, not the
// Testcontainers-shaded copy (org.testcontainers.shaded...): the repository's
// ObjectMapper only recognizes the un-shaded annotation, so subtype
// registration of MyEvent would silently break otherwise.
import com.fasterxml.jackson.annotation.JsonTypeName
import com.fasterxml.jackson.databind.ObjectMapper
import com.netflix.spectator.api.NoopRegistry
import com.netflix.spinnaker.clouddriver.event.AbstractSpinnakerEvent
import com.netflix.spinnaker.clouddriver.event.exceptions.AggregateChangeRejectedException
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository.ListAggregatesCriteria
import com.netflix.spinnaker.clouddriver.event.persistence.EventRepository.ListAggregatesResult
import com.netflix.spinnaker.kork.sql.config.SqlProperties
import com.netflix.spinnaker.kork.sql.test.SqlTestUtil
import com.netflix.spinnaker.kork.version.ServiceVersion
import dev.minutest.junit.JUnit5Minutests
import dev.minutest.rootContext
import io.github.resilience4j.retry.RetryRegistry
import io.mockk.every
import io.mockk.mockk
import org.springframework.context.ApplicationEventPublisher
import org.springframework.validation.Validator
import strikt.api.expect
import strikt.api.expectThat
import strikt.api.expectThrows
import strikt.assertions.containsExactly
import strikt.assertions.hasSize
import strikt.assertions.isA
import strikt.assertions.isEqualTo
import strikt.assertions.isNotEmpty
import strikt.assertions.isNotNull
import strikt.assertions.isNull

/**
 * Integration tests for [SqlEventRepository] against a Testcontainers-managed
 * MySQL instance: event append/optimistic-locking semantics and aggregate
 * listing/pagination.
 */
class SqlEventRepositoryTest : JUnit5Minutests {

  fun tests() = rootContext<Fixture> {
    fixture {
      Fixture()
    }

    context("event lifecycle") {
      test("events can be saved") {
        subject.save("agg", "1", 0, listOf(MyEvent("one")))

        expectThat(subject.listAggregates(ListAggregatesCriteria()))
          .isA<ListAggregatesResult>()
          .get { aggregates }.isNotEmpty()
          .get { first() }
          .and {
            get { type }.isEqualTo("agg")
            get { id }.isEqualTo("1")
            get { version }.isEqualTo(1)
          }

        subject.save("agg", "1", 1, listOf(MyEvent("two"), MyEvent("three")))

        expectThat(subject.list("agg", "1"))
          .isA<List<MyEvent>>()
          .isNotEmpty()
          .hasSize(3)
          .and {
            get { map { it.value } }
              .isA<List<String>>()
              .containsExactly("one", "two", "three")
          }
      }

      test("events saved against old version are rejected") {
        // The aggregate does not exist yet, so any non-zero originating
        // version must be rejected.
        expectThrows<AggregateChangeRejectedException> {
          subject.save("agg", "1", 10, listOf(MyEvent("two")))
        }

        subject.save("agg", "1", 0, listOf(MyEvent("one")))

        // Saving against a stale version (0, after the aggregate advanced to 1)
        // must also be rejected.
        expectThrows<AggregateChangeRejectedException> {
          subject.save("agg", "1", 0, listOf(MyEvent("two")))
        }
      }

      context("listing aggregates") {
        // Six aggregates total: one "foo" and five "bar".
        fun Fixture.setupAggregates() {
          subject.save("foo", "1", 0, listOf(MyEvent("hi foo")))
          subject.save("bar", "1", 0, listOf(MyEvent("hi bar 1")))
          subject.save("bar", "2", 0, listOf(MyEvent("hi bar 2")))
          subject.save("bar", "3", 0, listOf(MyEvent("hi bar 3")))
          subject.save("bar", "4", 0, listOf(MyEvent("hi bar 4")))
          subject.save("bar", "5", 0, listOf(MyEvent("hi bar 5")))
        }

        test("default criteria") {
          setupAggregates()

          expectThat(subject.listAggregates(ListAggregatesCriteria()))
            .isA<ListAggregatesResult>()
            .and {
              get { aggregates }.hasSize(6)
              get { nextPageToken }.isNull()
            }
        }

        test("filtering by type") {
          setupAggregates()

          expectThat(subject.listAggregates(ListAggregatesCriteria(aggregateType = "foo")))
            .isA<ListAggregatesResult>()
            .and {
              get { aggregates }.hasSize(1)
                .get { first() }
                .and {
                  get { type }.isEqualTo("foo")
                  get { id }.isEqualTo("1")
                }
              get { nextPageToken }.isNull()
            }
        }

        test("pagination") {
          setupAggregates()

          expect {
            var response = subject.listAggregates(ListAggregatesCriteria(perPage = 2))
            that(response)
              .describedAs("first page")
              .isA<ListAggregatesResult>()
              .and {
                get { aggregates }.hasSize(2)
                  .and {
                    get { first().type }.isEqualTo("foo")
                    get { last() }
                      .and {
                        get { type }.isEqualTo("bar")
                        get { id }.isEqualTo("1")
                      }
                  }
                get { nextPageToken }.isNotNull()
              }

            response = subject.listAggregates(ListAggregatesCriteria(perPage = 2, token = response.nextPageToken))
            that(response)
              .describedAs("second page")
              .isA<ListAggregatesResult>()
              .and {
                get { aggregates }.hasSize(2)
                  .and {
                    get { first().type }.isEqualTo("bar")
                    get { first().id }.isEqualTo("2")
                    get { last().type }.isEqualTo("bar")
                    get { last().id }.isEqualTo("3")
                  }
                get { nextPageToken }.isNotNull()
              }

            that(subject.listAggregates(ListAggregatesCriteria(perPage = 2, token = response.nextPageToken)))
              .describedAs("last page")
              .isA<ListAggregatesResult>()
              .and {
                get { aggregates }.hasSize(2)
                  .and {
                    get { first().type }.isEqualTo("bar")
                    get { first().id }.isEqualTo("4")
                    get { last().type }.isEqualTo("bar")
                    get { last().id }.isEqualTo("5")
                  }
                // Exactly six aggregates, so the third page of two is the last.
                get { nextPageToken }.isNull()
              }
          }
        }
      }
    }
  }

  private inner class Fixture {
    val database = SqlTestUtil.initTcMysqlDatabase()!!

    val serviceVersion: ServiceVersion = mockk(relaxed = true)
    val applicationEventPublisher: ApplicationEventPublisher = mockk(relaxed = true)
    val validator: Validator = mockk(relaxed = true)
    val retryRegistry: RetryRegistry = RetryRegistry.ofDefaults()

    val subject = SqlEventRepository(
      jooq = database.context,
      sqlProperties = SqlProperties(),
      serviceVersion = serviceVersion,
      objectMapper = ObjectMapper().apply {
        findAndRegisterModules()
        registerSubtypes(MyEvent::class.java)
      },
      applicationEventPublisher = applicationEventPublisher,
      registry = NoopRegistry(),
      validator = validator,
      retryRegistry = retryRegistry
    )

    init {
      every { serviceVersion.resolve() } returns "v1.2.3"
      // Start each fixture from a clean schema so tests don't leak state.
      SqlTestUtil.cleanupDb(database.context)
    }
  }

  @JsonTypeName("myEvent")
  private class MyEvent(
    val value: String
  ) : AbstractSpinnakerEvent()
}