diff --git a/cats/cats-sql/cats-sql.gradle b/cats/cats-sql/cats-sql.gradle index b1ec306c212..feb7c7039b8 100644 --- a/cats/cats-sql/cats-sql.gradle +++ b/cats/cats-sql/cats-sql.gradle @@ -15,9 +15,9 @@ */ apply from: "$rootDir/gradle/kotlin.gradle" +apply from: "$rootDir/gradle/kotlin-test.gradle" apply plugin: "groovy" - tasks.compileGroovy.enabled = false dependencies { @@ -27,10 +27,6 @@ dependencies { implementation project(":clouddriver-security") implementation project(":clouddriver-sql") - compileOnly "org.projectlombok:lombok" - annotationProcessor "org.projectlombok:lombok" - testAnnotationProcessor "org.projectlombok:lombok" - implementation "com.fasterxml.jackson.core:jackson-databind" implementation "com.netflix.spectator:spectator-api" implementation "com.netflix.spinnaker.fiat:fiat-api:$fiatVersion" diff --git a/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCache.kt b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCache.kt index dd9ecab7df9..534d2afd4e3 100644 --- a/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCache.kt +++ b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCache.kt @@ -1,7 +1,6 @@ package com.netflix.spinnaker.cats.sql.cache import com.fasterxml.jackson.databind.ObjectMapper -import com.google.common.hash.Hashing import com.netflix.spinnaker.cats.cache.CacheData import com.netflix.spinnaker.cats.cache.CacheFilter import com.netflix.spinnaker.cats.cache.DefaultCacheData @@ -52,7 +51,7 @@ class SqlCache( private val coroutineContext: CoroutineContext?, private val clock: Clock, private val sqlRetryProperties: SqlRetryProperties, - private val tableNamespace: String?, + tableNamespace: String?, private val cacheMetrics: SqlCacheMetrics, private val dynamicConfigService: DynamicConfigService, private val sqlConstraints: SqlConstraints @@ -64,11 +63,12 @@ class SqlCache( private val schemaVersion = SqlSchemaVersion.current() private val useRegexp = """.*[\?\[].*""".toRegex() private val cleanRegexp = """\.+\*""".toRegex() - private val typeSanitization = """[^A-Za-z0-9_]""".toRegex() private val log = LoggerFactory.getLogger(SqlCache::class.java) } + private val sqlNames = SqlNames(tableNamespace, sqlConstraints) + private var createdTables = ConcurrentSkipListSet() init { @@ -90,7 +90,7 @@ class SqlCache( try { ids.chunked(dynamicConfigService.getConfig(Int::class.java, "sql.cache.read-batch-size", 500)) { chunk -> withRetry(RetryCategory.WRITE) { - jooq.deleteFrom(table(resourceTableName(type))) + jooq.deleteFrom(table(sqlNames.resourceTableName(type))) .where("id in (${chunk.joinToString(",") { "'$it'" }})") .execute() } @@ -338,7 +338,7 @@ class SqlCache( val ids = try { withRetry(RetryCategory.READ) { jooq.select(field("id")) - .from(table(resourceTableName(type))) + .from(table(sqlNames.resourceTableName(type))) .fetch() .intoSet(field("id"), String::class.java) } @@ -423,9 +423,9 @@ class SqlCache( val sql = if (glob.matches(useRegexp)) { val filter = glob.replace("?", ".", true).replace("*", ".*").replace(cleanRegexp, ".*") - "SELECT id FROM ${resourceTableName(type)} WHERE id REGEXP '^$filter\$'" + "SELECT id FROM ${sqlNames.resourceTableName(type)} WHERE id REGEXP '^$filter\$'" } else { - "SELECT id FROM ${resourceTableName(type)} WHERE id LIKE '${glob.replace('*', '%')}'" + "SELECT id FROM ${sqlNames.resourceTableName(type)} WHERE id LIKE '${glob.replace('*', '%')}'" } val ids = try { @@ -474,7 +474,7 @@ class SqlCache( fun cleanOnDemand(maxAgeMs: Long): Int { val toClean = withRetry(RetryCategory.READ) { jooq.select(field("id")) - .from(table(resourceTableName(onDemandType))) + .from(table(sqlNames.resourceTableName(onDemandType))) .where(field("last_updated").lt(clock.millis() - maxAgeMs)) .fetch() .into(String::class.java) @@ -559,7 +559,7 @@ class SqlCache( toStore.chunked(dynamicConfigService.getConfig(Int::class.java, "sql.cache.write-batch-size", 100)) { chunk -> try { val insert = jooq.insertInto( - table(resourceTableName(type)), + table(sqlNames.resourceTableName(type)), field("id"), field("agent"), field("application"), @@ -592,7 +592,7 @@ class SqlCache( val exists = withRetry(RetryCategory.READ) { jooq.fetchExists( jooq.select() - .from(resourceTableName(type)) + .from(sqlNames.resourceTableName(type)) .where(field("id").eq(it), field("agent").eq(agent)) .forUpdate() ) @@ -600,7 +600,7 @@ class SqlCache( result.selectQueries.incrementAndGet() if (exists) { withRetry(RetryCategory.WRITE) { - jooq.update(table(resourceTableName(type))) + jooq.update(table(sqlNames.resourceTableName(type))) .set(field("application"), apps[it]) .set(field("body_hash"), hashes[it]) .set(field("body"), bodies[it]) @@ -613,7 +613,7 @@ class SqlCache( } else { withRetry(RetryCategory.WRITE) { jooq.insertInto( - table(resourceTableName(type)), + table(sqlNames.resourceTableName(type)), field("id"), field("agent"), field("application"), @@ -745,7 +745,7 @@ class SqlCache( pointers.chunked(dynamicConfigService.getConfig(Int::class.java, "sql.cache.write-batch-size", 100)) { chunk -> try { val insert = jooq.insertInto( - table(relTableName(type)), + table(sqlNames.relTableName(type)), field("uuid"), field("id"), field("rel_id"), @@ -775,7 +775,7 @@ class SqlCache( .chunked(dynamicConfigService.getConfig(Int::class.java, "sql.cache.write-batch-size", 100)) { chunk -> try { val insert = jooq.insertInto( - table(relTableName(relType)), + table(sqlNames.relTableName(relType)), field("uuid"), field("id"), field("rel_id"), @@ -813,7 +813,7 @@ class SqlCache( try { fwdToDelete.forEach { withRetry(RetryCategory.WRITE) { - jooq.deleteFrom(table(relTableName(type))) + jooq.deleteFrom(table(sqlNames.relTableName(type))) .where(field("uuid").eq(it.value)) .execute() } @@ -822,7 +822,7 @@ class SqlCache( revToDelete.forEach { if (oldRevIdsToType.getOrDefault(it.key, "").isNotBlank()) { withRetry(RetryCategory.WRITE) { - jooq.deleteFrom(table(relTableName(oldRevIdsToType[it.key]!!))) + jooq.deleteFrom(table(sqlNames.relTableName(oldRevIdsToType[it.key]!!))) .where(field("uuid").eq(it.value)) .execute() } @@ -843,9 +843,9 @@ class SqlCache( if (!createdTables.contains(type)) { try { withRetry(RetryCategory.WRITE) { - jooq.execute("CREATE TABLE IF NOT EXISTS ${resourceTableName(type)} " + + jooq.execute("CREATE TABLE IF NOT EXISTS ${sqlNames.resourceTableName(type)} " + "LIKE cats_v${schemaVersion}_resource_template") - jooq.execute("CREATE TABLE IF NOT EXISTS ${relTableName(type)} " + + jooq.execute("CREATE TABLE IF NOT EXISTS ${sqlNames.relTableName(type)} " + "LIKE cats_v${schemaVersion}_rel_template") } @@ -858,9 +858,9 @@ class SqlCache( // TODO not sure if best schema for onDemand try { withRetry(RetryCategory.WRITE) { - jooq.execute("CREATE TABLE IF NOT EXISTS ${resourceTableName(onDemandType)} " + + jooq.execute("CREATE TABLE IF NOT EXISTS ${sqlNames.resourceTableName(onDemandType)} " + "LIKE cats_v${schemaVersion}_resource_template") - jooq.execute("CREATE TABLE IF NOT EXISTS ${relTableName(onDemandType)} " + + jooq.execute("CREATE TABLE IF NOT EXISTS ${sqlNames.relTableName(onDemandType)} " + "LIKE cats_v${schemaVersion}_rel_template") } @@ -884,47 +884,6 @@ class SqlCache( } } - private fun resourceTableName(type: String): String = - checkTableName("cats_v${schemaVersion}_", sanitizeType(type), "") - - private fun relTableName(type: String): String = - checkTableName("cats_v${schemaVersion}_", sanitizeType(type), "_rel") - - private fun sanitizeType(type: String): String { - return type.replace(typeSanitization, "_") - } - - /** - * Computes the actual name of the table less than MAX_TABLE_NAME_LENGTH characters long. - * It always keeps prefix with tablenamespace but can shorten name and suffix in that order. - * @return computed table name - */ - private fun checkTableName(prefix: String, name: String, suffix: String): String { - var base = prefix - if (tableNamespace != null) { - base = "${prefix + tableNamespace}_" - } - - // Optimistic and most frequent case - val tableName = base + name + suffix - if (tableName.length < sqlConstraints.maxTableNameLength) { - return tableName - } - - // Hash the name and keep the suffix - val hash = Hashing.murmur3_128().hashBytes((name + suffix).toByteArray()).toString().substring(0..15) - val available = sqlConstraints.maxTableNameLength - base.length - suffix.length - hash.length - 1 - if (available >= 0) { - return base + name.substring(0..available) + hash + suffix - } - - // Remove suffix - if (available + suffix.length >= 0) { - return base + name.substring(0..(available + suffix.length)) + hash - } - throw IllegalArgumentException("property sql.table-namespace $tableNamespace is too long") - } - private fun getHash(body: String?): String? { if (body.isNullOrBlank()) { return null @@ -945,7 +904,7 @@ class SqlCache( return withRetry(RetryCategory.READ) { jooq .select(field("body_hash"), field("id")) - .from(table(resourceTableName(type))) + .from(table(sqlNames.resourceTableName(type))) .where( field("agent").eq(agent) ) @@ -958,7 +917,7 @@ class SqlCache( return withRetry(RetryCategory.READ) { jooq .select(field("uuid"), field("id"), field("rel_id"), field("rel_agent")) - .from(table(relTableName(type))) + .from(table(sqlNames.relTableName(type))) .where(field("rel_agent").eq(sourceAgent)) .fetch() .into(RelId::class.java) @@ -969,7 +928,7 @@ class SqlCache( return withRetry(RetryCategory.READ) { jooq .select(field("uuid"), field("id"), field("rel_id"), field("rel_agent")) - .from(table(relTableName(type))) + .from(table(sqlNames.relTableName(type))) .where( field("rel_agent").eq(sourceAgent), field("rel_type").eq(origType) @@ -998,7 +957,7 @@ class SqlCache( withRetry(RetryCategory.READ) { cacheData.addAll( jooq.select(field("body")) - .from(table(resourceTableName(type))) + .from(table(sqlNames.resourceTableName(type))) .fetch() .getValues(0) .asSequence() @@ -1060,7 +1019,7 @@ class SqlCache( withRetry(RetryCategory.READ) { cacheData.addAll( jooq.select(field("body")) - .from(table(resourceTableName(type))) + .from(table(sqlNames.resourceTableName(type))) .where(field("application").eq(application)) .fetch() .getValues(0) @@ -1119,7 +1078,7 @@ class SqlCache( field(sql("null")).`as`("rel_id"), field(sql("null")).`as`("rel_type") ) - .from(table(resourceTableName(type))) + .from(table(sqlNames.resourceTableName(type))) .where(field("application").eq(application)) .unionAll( jooq.select( @@ -1128,8 +1087,8 @@ class SqlCache( field("rel.rel_id").`as`("rel_id"), field("rel.rel_type").`as`("rel_type") ) - .from(table(resourceTableName(type)).`as`("r")) - .innerJoin(table(relTableName(type)).`as`("rel")) + .from(table(sqlNames.resourceTableName(type)).`as`("r")) + .innerJoin(table(sqlNames.relTableName(type)).`as`("rel")) .on(sql("rel.id=r.id")) .where(relWhere) .groupBy( @@ -1204,7 +1163,7 @@ class SqlCache( field(sql("null")).`as`("rel_id"), field(sql("null")).`as`("rel_type") ) - .from(table(resourceTableName(type))) + .from(table(sqlNames.resourceTableName(type))) .unionAll( jooq.select( field(sql("null")).`as`("body"), @@ -1212,7 +1171,7 @@ class SqlCache( field("rel_id").`as`("rel_id"), field("rel_type").`as`("rel_type") ) - .from(table(relTableName(type))) + .from(table(sqlNames.relTableName(type))) .where(relWhere) ) .fetch() @@ -1275,7 +1234,7 @@ class SqlCache( private fun selectBodies(type: String, ids: List): Collection { return withRetry(RetryCategory.READ) { jooq.select(field("body")) - .from(table(resourceTableName(type))) + .from(table(sqlNames.resourceTableName(type))) .where("ID in (${ids.joinToString(",") { "'$it'" }})") .fetch() .getValues(0) @@ -1301,7 +1260,7 @@ class SqlCache( field(sql("null")).`as`("rel_id"), field(sql("null")).`as`("rel_type") ) - .from(table(resourceTableName(type))) + .from(table(sqlNames.resourceTableName(type))) .where(where) .unionAll( jooq.select( @@ -1310,7 +1269,7 @@ class SqlCache( field("rel_id").`as`("rel_id"), field("rel_type").`as`("rel_type") ) - .from(table(relTableName(type))) + .from(table(sqlNames.relTableName(type))) .where(relWhere) ) .fetch() @@ -1321,7 +1280,7 @@ class SqlCache( private fun selectIdentifiers(type: String, ids: List): MutableCollection { return withRetry(RetryCategory.READ) { jooq.select(field("id")) - .from(table(resourceTableName(type))) + .from(table(sqlNames.resourceTableName(type))) .where("id in (${ids.joinToString(",") { "'$it'" }})") .fetch() .intoSet(field("id"), String::class.java) diff --git a/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCacheCleanupAgent.kt b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCacheCleanupAgent.kt new file mode 100644 index 00000000000..5a80395a5dd --- /dev/null +++ b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCacheCleanupAgent.kt @@ -0,0 +1,187 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.cats.sql.cache + +import com.netflix.spectator.api.Registry +import com.netflix.spinnaker.cats.agent.AgentDataType.Authority.AUTHORITATIVE +import com.netflix.spinnaker.cats.agent.CachingAgent +import com.netflix.spinnaker.cats.agent.RunnableAgent +import com.netflix.spinnaker.cats.provider.ProviderRegistry +import com.netflix.spinnaker.clouddriver.cache.CustomScheduledAgent +import com.netflix.spinnaker.clouddriver.core.provider.CoreProvider +import com.netflix.spinnaker.config.ConnectionPools +import com.netflix.spinnaker.kork.sql.routing.withPool +import org.jooq.DSLContext +import org.jooq.Field +import org.jooq.impl.DSL.field +import org.jooq.impl.DSL.table +import org.slf4j.LoggerFactory +import java.sql.SQLException +import java.util.concurrent.TimeUnit + +/** + * Intermittently scans the entire database looking for records created by caching agents that + * are no longer configured. + */ +class SqlCacheCleanupAgent( + private val providerRegistry: ProviderRegistry, + private val jooq: DSLContext, + private val registry: Registry, + private val sqlNames: SqlNames +) : RunnableAgent, CustomScheduledAgent { + + private val log by lazy { LoggerFactory.getLogger(javaClass) } + + private val deletedId = registry.createId("sql.cacheCleanupAgent.dataTypeRecordsDeleted") + private val timingId = registry.createId("sql.cacheCleanupAgent.dataTypeCleanupDuration") + + override fun run() { + log.info("Scanning for cache records to cleanup") + + val (agentTypes, agentDataTypes) = findAgentDataTypes() + val runState = RunState(agentTypes) + + val numDataTypes = agentDataTypes.size + log.info("Found {} cache data types generated from {} agent types", numDataTypes, agentTypes.size) + + var failures = 0 + withPool(ConnectionPools.CACHE_WRITER.value) { + agentDataTypes.forEachIndexed { i, dataType -> + log.info("Scanning '$dataType' (${i + 1}/$numDataTypes) cache records to cleanup") + try { + registry.timer(timingId.withTag("dataType", dataType)).record { + cleanTable(CacheTable.RELATIONSHIP, dataType, runState) + cleanTable(CacheTable.RESOURCE, dataType, runState) + } + } catch (e: SQLException) { + log.error("Failed to cleanup '$dataType'", e) + failures++ + } + } + } + + log.info("Finished cleanup ($failures failures)") + } + + /** + * If the table for [dataType] has not been touched yet, scan through each record it contains, + * deleting all records that do not correlate to a currently configured agent. + */ + private fun cleanTable(cacheTable: CacheTable, dataType: String, state: RunState) { + val tableName = cacheTable.getName(sqlNames, dataType) + + if (state.touchedTables.contains(tableName)) { + // Nothing to do here, we've already processed this table. + return + } + log.debug("Cleaning table '$tableName' for '$dataType'") + + val rs = jooq.select(*cacheTable.fields) + .from(table(tableName)) + .fetch() + .intoResultSet() + + val cleanedAgentTypes = mutableSetOf() + val idsToClean = mutableListOf() + while (rs.next()) { + val agentType = rs.getString(2) + if (!state.agentTypes.contains(agentType)) { + idsToClean.add(rs.getString(1)) + cleanedAgentTypes.add(agentType) + } + } + + if (idsToClean.isNotEmpty()) { + log.info( + "Found ${idsToClean.size} records to cleanup from '$tableName' for data type '$dataType'. " + + "Reason: Data generated by unknown caching agents ($cleanedAgentTypes})" + ) + idsToClean.chunked(100) { chunk -> + jooq.deleteFrom(table(tableName)) + .where("${cacheTable.idColumn()} in (${chunk.joinToString(",") { "'$it'" }})") + .execute() + } + } + + state.touchedTables.add(tableName) + + registry + .counter(deletedId.withTags("dataType", dataType, "table", cacheTable.name)) + .increment(idsToClean.size.toLong()) + } + + /** + * Returns a set of all known caching agent names and another set of all known authoritative + * data types from those caching agents. + * + * Agent names will be used to identify what records in the database are no longer attached + * to existing caching agents, whereas the data types themselves are needed to create the + * SQL table names, as the tables are derived from the data types, not the agents. + */ + private fun findAgentDataTypes(): Pair, Set> { + val agents = providerRegistry.providers + .flatMap { it.agents } + .filterIsInstance() + + val dataTypes = agents + .flatMap { it.providedDataTypes } + .filter { it.authority == AUTHORITATIVE } + .map { it.typeName } + .toSet() + + return Pair(agents.map { it.agentType }.toSet(), dataTypes) + } + + /** + * Contains per-run state of this cleanup agent. + */ + private data class RunState( + val agentTypes: Set, + val touchedTables: MutableList = mutableListOf() + ) + + /** + * Abstracts the logical differences--as far as this agent is concerned--between the two + * varieties of cache tables: The table names and the associated fields we need to read + * from the database. + */ + private enum class CacheTable(val fields: Array>) { + RESOURCE(arrayOf(field("id"), field("agent"))), + RELATIONSHIP(arrayOf(field("uuid"), field("rel_agent"))); + + fun idColumn(): String = + when (this) { + RESOURCE -> "id" + RELATIONSHIP -> "uuid" + } + + fun getName(sqlNames: SqlNames, dataType: String): String = + when (this) { + RESOURCE -> sqlNames.resourceTableName(dataType) + RELATIONSHIP -> sqlNames.relTableName(dataType) + } + } + + override fun getProviderName(): String = CoreProvider.PROVIDER_NAME + override fun getPollIntervalMillis(): Long = DEFAULT_POLL_INTERVAL + override fun getTimeoutMillis(): Long = DEFAULT_TIMEOUT + override fun getAgentType(): String = javaClass.simpleName + + companion object { + private val DEFAULT_POLL_INTERVAL = TimeUnit.MINUTES.toMillis(2) + private val DEFAULT_TIMEOUT = TimeUnit.MINUTES.toMillis(1) + } +} diff --git a/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlNamedCacheFactory.kt b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlNamedCacheFactory.kt index 161a6a70337..d9496f388f3 100644 --- a/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlNamedCacheFactory.kt +++ b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlNamedCacheFactory.kt @@ -21,7 +21,6 @@ class SqlNamedCacheFactory( private val cacheMetrics: SqlCacheMetrics, private val dynamicConfigService: DynamicConfigService, private val sqlConstraints: SqlConstraints - ) : NamedCacheFactory { @ExperimentalContracts diff --git a/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlNames.kt b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlNames.kt new file mode 100644 index 00000000000..4de0c1ebba2 --- /dev/null +++ b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlNames.kt @@ -0,0 +1,80 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.cats.sql.cache + +import com.google.common.hash.Hashing +import com.netflix.spinnaker.config.SqlConstraints + +/** + * Provides utility methods for clouddriver's SQL naming conventions. + */ +class SqlNames( + private val tableNamespace: String? = null, + private val sqlConstraints: SqlConstraints = SqlConstraints() +) { + + /** + * Get the resource table name for a given agent type. + */ + fun resourceTableName(type: String): String = + checkTableName("cats_v${schemaVersion}_", sanitizeType(type), "") + + /** + * Get the relationship table name for a given agent type. + */ + fun relTableName(type: String): String = + checkTableName("cats_v${schemaVersion}_", sanitizeType(type), "_rel") + + private fun sanitizeType(type: String): String { + return type.replace(typeSanitization, "_") + } + + /** + * Computes the actual name of the table less than MAX_TABLE_NAME_LENGTH characters long. + * It always keeps prefix with tableNamespace but can shorten name and suffix in that order. + * @return computed table name + */ + private fun checkTableName(prefix: String, name: String, suffix: String): String { + var base = prefix + if (tableNamespace != null) { + base = "${prefix + tableNamespace}_" + } + + // Optimistic and most frequent case + val tableName = base + name + suffix + if (tableName.length < sqlConstraints.maxTableNameLength) { + return tableName + } + + // Hash the name and keep the suffix + val hash = Hashing.murmur3_128().hashBytes((name + suffix).toByteArray()).toString().substring(0..15) + val available = sqlConstraints.maxTableNameLength - base.length - suffix.length - hash.length - 1 + if (available >= 0) { + return base + name.substring(0..available) + hash + suffix + } + + // Remove suffix + if (available + suffix.length >= 0) { + return base + name.substring(0..(available + suffix.length)) + hash + } + throw IllegalArgumentException("property sql.table-namespace $tableNamespace is too long") + } + + companion object { + private val schemaVersion = SqlSchemaVersion.current() + private val typeSanitization = """[^A-Za-z0-9_]""".toRegex() + } +} diff --git a/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlCacheConfiguration.kt b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlCacheConfiguration.kt index b31dc5d0de3..594a0f1fa12 100644 --- a/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlCacheConfiguration.kt +++ b/cats/cats-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlCacheConfiguration.kt @@ -11,11 +11,14 @@ import com.netflix.spinnaker.cats.cluster.DefaultNodeStatusProvider import com.netflix.spinnaker.cats.cluster.NodeStatusProvider import com.netflix.spinnaker.cats.module.CatsModule import com.netflix.spinnaker.cats.provider.Provider +import com.netflix.spinnaker.cats.provider.ProviderRegistry import com.netflix.spinnaker.cats.sql.SqlProviderRegistry import com.netflix.spinnaker.cats.sql.cache.SpectatorSqlCacheMetrics +import com.netflix.spinnaker.cats.sql.cache.SqlCacheCleanupAgent import com.netflix.spinnaker.cats.sql.cache.SqlCacheMetrics import com.netflix.spinnaker.cats.sql.cache.SqlCleanupStaleOnDemandCachesAgent import com.netflix.spinnaker.cats.sql.cache.SqlNamedCacheFactory +import com.netflix.spinnaker.cats.sql.cache.SqlNames import com.netflix.spinnaker.cats.sql.cache.SqlTableMetricsAgent import com.netflix.spinnaker.clouddriver.cache.CustomSchedulableAgentIntervalProvider import com.netflix.spinnaker.clouddriver.cache.EurekaStatusNodeStatusProvider @@ -37,7 +40,6 @@ import org.springframework.context.annotation.Bean import org.springframework.context.annotation.ComponentScan import org.springframework.context.annotation.Configuration import org.springframework.context.annotation.Import -import java.lang.IllegalArgumentException import java.time.Clock import java.time.Duration import java.util.Optional @@ -161,6 +163,17 @@ class SqlCacheConfiguration { ): SqlCleanupStaleOnDemandCachesAgent = SqlCleanupStaleOnDemandCachesAgent(applicationContext, registry, clock) + @Bean + @ConditionalOnExpression("\${sql.read-only:false} == false and \${sql.cache-cleanup-agent.enabled:false}") + fun sqlCacheCleanupAgent( + providerRegistry: ProviderRegistry, + jooq: DSLContext, + registry: Registry, + sqlConstraints: SqlConstraints, + @Value("\${sql.table-namespace:#{null}}") tableNamespace: String? + ): SqlCacheCleanupAgent = + SqlCacheCleanupAgent(providerRegistry, jooq, registry, SqlNames(tableNamespace, sqlConstraints)) + @Bean @ConditionalOnExpression("\${sql.read-only:false} == false") fun sqlAgentProvider( diff --git a/cats/cats-sql/src/test/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCacheCleanupAgentTest.kt b/cats/cats-sql/src/test/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCacheCleanupAgentTest.kt new file mode 100644 index 00000000000..ecd33730ddb --- /dev/null +++ b/cats/cats-sql/src/test/kotlin/com/netflix/spinnaker/cats/sql/cache/SqlCacheCleanupAgentTest.kt @@ -0,0 +1,243 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.netflix.spinnaker.cats.sql.cache + +import com.netflix.spectator.api.NoopRegistry +import com.netflix.spinnaker.cats.agent.CachingAgent +import com.netflix.spinnaker.cats.cache.DefaultCacheData +import com.netflix.spinnaker.cats.mem.InMemoryNamedCacheFactory +import com.netflix.spinnaker.cats.provider.DefaultProviderRegistry +import com.netflix.spinnaker.cats.provider.ProviderRegistry +import com.netflix.spinnaker.cats.test.TestAgent +import com.netflix.spinnaker.cats.test.TestProvider +import com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.INSTANCES +import com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.SERVER_GROUPS +import com.netflix.spinnaker.kork.sql.test.SqlTestUtil +import de.huxhorn.sulky.ulid.ULID +import dev.minutest.junit.JUnit5Minutests +import dev.minutest.rootContext +import org.jooq.impl.DSL.field +import org.jooq.impl.DSL.table +import strikt.api.expect +import strikt.api.expectThat +import strikt.assertions.get +import strikt.assertions.hasSize +import strikt.assertions.isEqualTo + +class SqlCacheCleanupAgentTest : JUnit5Minutests { + + fun tests() = rootContext { + fixture { Fixture() } + + after { + SqlTestUtil.cleanupDb(dslContext) + dslContext.close() + } + + context("test and prod accounts exist") { + deriveFixture { + fixture.providerAgents.addAll(listOf( + testCachingAgent(), + prodCachingAgent() + )) + seedDatabase(includeTestAccount = true, includeProdAccount = true) + fixture + } + + test("nothing happens") { + expect { + that(selectAllResources()).describedAs("initial resources").hasSize(2) + that(selectAllRels()).describedAs("initial relationships").hasSize(2) + } + + subject.run() + + expect { + that(selectAllResources()).describedAs("modified resources").hasSize(2) + that(selectAllRels()).describedAs("modified relationships").hasSize(2) + } + } + + context("test account is removed") { + modifyFixture { + fixture.providerAgents.removeIf { it.scope == "test" } + } + + before { subject.run() } + + test("relationships referencing old data are deleted") { + expectThat(selectAllResources()) + .hasSize(1)[0].isEqualTo("aws:instances:prod:us-east-1:i-abcd1234") + } + + test("resources referencing old data are deleted") { + expectThat(selectAllRels()) + .hasSize(1)[0].isEqualTo("aws:serverGroups:myapp-prod:prod:us-east-1:myapp-prod-v000") + } + } + } + } + + private inner class Fixture { + val testDatabase = SqlTestUtil.initTcMysqlDatabase() + val dslContext = testDatabase.context + + val providerAgents: MutableList = mutableListOf() + val providerRegistry: ProviderRegistry = DefaultProviderRegistry( + listOf(TestProvider(providerAgents as Collection)), + InMemoryNamedCacheFactory() + ) + val registry = NoopRegistry() + + val subject = SqlCacheCleanupAgent(providerRegistry, dslContext, registry, SqlNames()) + + fun seedDatabase(includeTestAccount: Boolean, includeProdAccount: Boolean) { + SqlNames().run { + val resource = resourceTableName("instances") + val rel = relTableName("instances") + dslContext.execute("CREATE TABLE IF NOT EXISTS $resource LIKE cats_v1_resource_template") + dslContext.execute("CREATE TABLE IF NOT EXISTS $rel LIKE cats_v1_rel_template") + } + + dslContext.insertInto(table("cats_v1_instances")) + .columns( + field("id"), field("agent"), field("application"), field("body_hash"), field("body"), field("last_updated") + ) + .let { + if (includeProdAccount) { + it + .values( + "aws:instances:prod:us-east-1:i-abcd1234", + "prod/TestAgent", + "myapp", + "", + "", + System.currentTimeMillis() + ) + } else { + it + } + } + .let { + if (includeTestAccount) { + it + .values( + "aws:instances:test:us-east-1:i-abcd1234", + "test/TestAgent", + "myapp", + "", + "", + System.currentTimeMillis() + ) + } else { + it + } + } + .execute() + + dslContext.insertInto(table("cats_v1_instances_rel")) + .columns( + field("uuid"), + field("id"), + field("rel_id"), + field("rel_agent"), + field("rel_type"), + field("last_updated") + ) + .let { + if (includeProdAccount) { + it + .values( + ULID().nextULID(), + "aws:instances:prod:us-east-1:i-abcd1234", + "aws:serverGroups:myapp-prod:prod:us-east-1:myapp-prod-v000", + "prod/TestAgent", + "serverGroups", + System.currentTimeMillis() + ) + } else { + it + } + } + .let { + if (includeTestAccount) { + it + .values( + ULID().nextULID(), + "aws:instances:test:us-east-1:i-abcd1234", + "aws:serverGroups:myapp-test:test:us-east-1:myapp-test-v000", + "test/TestAgent", + "serverGroups", + System.currentTimeMillis() + ) + } else { + it + } + } + .execute() + } + + fun testCachingAgent(): TestAgent = + TestAgent().also { + it.scope = "test" + it.types = setOf(INSTANCES.ns, SERVER_GROUPS.ns) + it.authoritative = setOf(INSTANCES.ns) + it.results = mapOf( + INSTANCES.ns to listOf( + DefaultCacheData( + "aws:instances:test:us-east-1:i-abcd1234", + mapOf(), + mapOf( + SERVER_GROUPS.ns to listOf( + "aws:serverGroups:myapp-test:test:us-east-1:myapp-test-v000" + ) + ) + ) + ) + ) + } + + fun prodCachingAgent(): TestAgent = + TestAgent().also { + it.scope = "prod" + it.types = setOf(INSTANCES.ns, SERVER_GROUPS.ns) + it.authoritative = setOf(INSTANCES.ns) + it.results = mapOf( + INSTANCES.ns to listOf( + DefaultCacheData( + "aws:instances:prod:us-east-1:i-abcd1234", + mapOf(), + mapOf( + SERVER_GROUPS.ns to listOf( + "aws:serverGroups:myapp-prod:prod:us-east-1:myapp-prod-v000" + ) + ) + ) + ) + ) + } + + fun selectAllResources(): List = + dslContext.select(field("id")) + .from(table(SqlNames().resourceTableName("instances"))) + .fetch(0, String::class.java) + + fun selectAllRels(): List = + dslContext.select(field("rel_id")) + .from(table(SqlNames().relTableName("instances"))) + .fetch(0, String::class.java) + } +} diff --git a/gradle/kotlin-test.gradle b/gradle/kotlin-test.gradle new file mode 100644 index 00000000000..10b8fe89e80 --- /dev/null +++ b/gradle/kotlin-test.gradle @@ -0,0 +1,44 @@ +/* + * Copyright 2019 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: "kotlin" + +dependencies { + testImplementation "org.jetbrains.kotlin:kotlin-stdlib-jdk8:1.3.41" + + testImplementation "org.junit.jupiter:junit-jupiter-api" + testImplementation "org.junit.platform:junit-platform-runner" + testImplementation "org.assertj:assertj-core" + testImplementation "io.strikt:strikt-core" + testImplementation "dev.minutest:minutest" + testImplementation "io.mockk:mockk" + + testRuntimeOnly "org.junit.platform:junit-platform-launcher" + testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine" +} + +test { + useJUnitPlatform { + includeEngines "junit-jupiter" + } +} + +compileTestKotlin { + kotlinOptions { + languageVersion = "1.3" + jvmTarget = "1.8" + } +}