Skip to content

Commit

Permalink
feat(sql): Add cache cleanup agent for removed accounts
Browse files Browse the repository at this point in the history
Adds a background agent that regularly scans the database for cache records that are
related to caching agents that do not exist anymore, and deletes them. This addresses
spinnaker/spinnaker#4803
  • Loading branch information
robzienert committed Jan 8, 2020
1 parent 91e85ad commit 73ee311
Show file tree
Hide file tree
Showing 8 changed files with 603 additions and 82 deletions.
6 changes: 1 addition & 5 deletions cats/cats-sql/cats-sql.gradle
Expand Up @@ -15,9 +15,9 @@
*/

apply from: "$rootDir/gradle/kotlin.gradle"
apply from: "$rootDir/gradle/kotlin-test.gradle"
apply plugin: "groovy"


tasks.compileGroovy.enabled = false

dependencies {
Expand All @@ -27,10 +27,6 @@ dependencies {
implementation project(":clouddriver-security")
implementation project(":clouddriver-sql")

compileOnly "org.projectlombok:lombok"
annotationProcessor "org.projectlombok:lombok"
testAnnotationProcessor "org.projectlombok:lombok"

implementation "com.fasterxml.jackson.core:jackson-databind"
implementation "com.netflix.spectator:spectator-api"
implementation "com.netflix.spinnaker.fiat:fiat-api:$fiatVersion"
Expand Down
@@ -1,7 +1,6 @@
package com.netflix.spinnaker.cats.sql.cache

import com.fasterxml.jackson.databind.ObjectMapper
import com.google.common.hash.Hashing
import com.netflix.spinnaker.cats.cache.CacheData
import com.netflix.spinnaker.cats.cache.CacheFilter
import com.netflix.spinnaker.cats.cache.DefaultCacheData
Expand Down Expand Up @@ -52,7 +51,7 @@ class SqlCache(
private val coroutineContext: CoroutineContext?,
private val clock: Clock,
private val sqlRetryProperties: SqlRetryProperties,
private val tableNamespace: String?,
tableNamespace: String?,
private val cacheMetrics: SqlCacheMetrics,
private val dynamicConfigService: DynamicConfigService,
private val sqlConstraints: SqlConstraints
Expand All @@ -64,11 +63,12 @@ class SqlCache(
private val schemaVersion = SqlSchemaVersion.current()
private val useRegexp = """.*[\?\[].*""".toRegex()
private val cleanRegexp = """\.+\*""".toRegex()
private val typeSanitization = """[^A-Za-z0-9_]""".toRegex()

private val log = LoggerFactory.getLogger(SqlCache::class.java)
}

private val sqlNames = SqlNames(tableNamespace, sqlConstraints)

private var createdTables = ConcurrentSkipListSet<String>()

init {
Expand All @@ -90,7 +90,7 @@ class SqlCache(
try {
ids.chunked(dynamicConfigService.getConfig(Int::class.java, "sql.cache.read-batch-size", 500)) { chunk ->
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(table(resourceTableName(type)))
jooq.deleteFrom(table(sqlNames.resourceTableName(type)))
.where("id in (${chunk.joinToString(",") { "'$it'" }})")
.execute()
}
Expand Down Expand Up @@ -338,7 +338,7 @@ class SqlCache(
val ids = try {
withRetry(RetryCategory.READ) {
jooq.select(field("id"))
.from(table(resourceTableName(type)))
.from(table(sqlNames.resourceTableName(type)))
.fetch()
.intoSet(field("id"), String::class.java)
}
Expand Down Expand Up @@ -423,9 +423,9 @@ class SqlCache(

val sql = if (glob.matches(useRegexp)) {
val filter = glob.replace("?", ".", true).replace("*", ".*").replace(cleanRegexp, ".*")
"SELECT id FROM ${resourceTableName(type)} WHERE id REGEXP '^$filter\$'"
"SELECT id FROM ${sqlNames.resourceTableName(type)} WHERE id REGEXP '^$filter\$'"
} else {
"SELECT id FROM ${resourceTableName(type)} WHERE id LIKE '${glob.replace('*', '%')}'"
"SELECT id FROM ${sqlNames.resourceTableName(type)} WHERE id LIKE '${glob.replace('*', '%')}'"
}

val ids = try {
Expand Down Expand Up @@ -474,7 +474,7 @@ class SqlCache(
fun cleanOnDemand(maxAgeMs: Long): Int {
val toClean = withRetry(RetryCategory.READ) {
jooq.select(field("id"))
.from(table(resourceTableName(onDemandType)))
.from(table(sqlNames.resourceTableName(onDemandType)))
.where(field("last_updated").lt(clock.millis() - maxAgeMs))
.fetch()
.into(String::class.java)
Expand Down Expand Up @@ -559,7 +559,7 @@ class SqlCache(
toStore.chunked(dynamicConfigService.getConfig(Int::class.java, "sql.cache.write-batch-size", 100)) { chunk ->
try {
val insert = jooq.insertInto(
table(resourceTableName(type)),
table(sqlNames.resourceTableName(type)),
field("id"),
field("agent"),
field("application"),
Expand Down Expand Up @@ -592,15 +592,15 @@ class SqlCache(
val exists = withRetry(RetryCategory.READ) {
jooq.fetchExists(
jooq.select()
.from(resourceTableName(type))
.from(sqlNames.resourceTableName(type))
.where(field("id").eq(it), field("agent").eq(agent))
.forUpdate()
)
}
result.selectQueries.incrementAndGet()
if (exists) {
withRetry(RetryCategory.WRITE) {
jooq.update(table(resourceTableName(type)))
jooq.update(table(sqlNames.resourceTableName(type)))
.set(field("application"), apps[it])
.set(field("body_hash"), hashes[it])
.set(field("body"), bodies[it])
Expand All @@ -613,7 +613,7 @@ class SqlCache(
} else {
withRetry(RetryCategory.WRITE) {
jooq.insertInto(
table(resourceTableName(type)),
table(sqlNames.resourceTableName(type)),
field("id"),
field("agent"),
field("application"),
Expand Down Expand Up @@ -745,7 +745,7 @@ class SqlCache(
pointers.chunked(dynamicConfigService.getConfig(Int::class.java, "sql.cache.write-batch-size", 100)) { chunk ->
try {
val insert = jooq.insertInto(
table(relTableName(type)),
table(sqlNames.relTableName(type)),
field("uuid"),
field("id"),
field("rel_id"),
Expand Down Expand Up @@ -775,7 +775,7 @@ class SqlCache(
.chunked(dynamicConfigService.getConfig(Int::class.java, "sql.cache.write-batch-size", 100)) { chunk ->
try {
val insert = jooq.insertInto(
table(relTableName(relType)),
table(sqlNames.relTableName(relType)),
field("uuid"),
field("id"),
field("rel_id"),
Expand Down Expand Up @@ -813,7 +813,7 @@ class SqlCache(
try {
fwdToDelete.forEach {
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(table(relTableName(type)))
jooq.deleteFrom(table(sqlNames.relTableName(type)))
.where(field("uuid").eq(it.value))
.execute()
}
Expand All @@ -822,7 +822,7 @@ class SqlCache(
revToDelete.forEach {
if (oldRevIdsToType.getOrDefault(it.key, "").isNotBlank()) {
withRetry(RetryCategory.WRITE) {
jooq.deleteFrom(table(relTableName(oldRevIdsToType[it.key]!!)))
jooq.deleteFrom(table(sqlNames.relTableName(oldRevIdsToType[it.key]!!)))
.where(field("uuid").eq(it.value))
.execute()
}
Expand All @@ -843,9 +843,9 @@ class SqlCache(
if (!createdTables.contains(type)) {
try {
withRetry(RetryCategory.WRITE) {
jooq.execute("CREATE TABLE IF NOT EXISTS ${resourceTableName(type)} " +
jooq.execute("CREATE TABLE IF NOT EXISTS ${sqlNames.resourceTableName(type)} " +
"LIKE cats_v${schemaVersion}_resource_template")
jooq.execute("CREATE TABLE IF NOT EXISTS ${relTableName(type)} " +
jooq.execute("CREATE TABLE IF NOT EXISTS ${sqlNames.relTableName(type)} " +
"LIKE cats_v${schemaVersion}_rel_template")
}

Expand All @@ -858,9 +858,9 @@ class SqlCache(
// TODO not sure if best schema for onDemand
try {
withRetry(RetryCategory.WRITE) {
jooq.execute("CREATE TABLE IF NOT EXISTS ${resourceTableName(onDemandType)} " +
jooq.execute("CREATE TABLE IF NOT EXISTS ${sqlNames.resourceTableName(onDemandType)} " +
"LIKE cats_v${schemaVersion}_resource_template")
jooq.execute("CREATE TABLE IF NOT EXISTS ${relTableName(onDemandType)} " +
jooq.execute("CREATE TABLE IF NOT EXISTS ${sqlNames.relTableName(onDemandType)} " +
"LIKE cats_v${schemaVersion}_rel_template")
}

Expand All @@ -884,47 +884,6 @@ class SqlCache(
}
}

private fun resourceTableName(type: String): String =
checkTableName("cats_v${schemaVersion}_", sanitizeType(type), "")

private fun relTableName(type: String): String =
checkTableName("cats_v${schemaVersion}_", sanitizeType(type), "_rel")

private fun sanitizeType(type: String): String {
return type.replace(typeSanitization, "_")
}

/**
* Computes the actual name of the table less than MAX_TABLE_NAME_LENGTH characters long.
* It always keeps prefix with tablenamespace but can shorten name and suffix in that order.
* @return computed table name
*/
private fun checkTableName(prefix: String, name: String, suffix: String): String {
var base = prefix
if (tableNamespace != null) {
base = "${prefix + tableNamespace}_"
}

// Optimistic and most frequent case
val tableName = base + name + suffix
if (tableName.length < sqlConstraints.maxTableNameLength) {
return tableName
}

// Hash the name and keep the suffix
val hash = Hashing.murmur3_128().hashBytes((name + suffix).toByteArray()).toString().substring(0..15)
val available = sqlConstraints.maxTableNameLength - base.length - suffix.length - hash.length - 1
if (available >= 0) {
return base + name.substring(0..available) + hash + suffix
}

// Remove suffix
if (available + suffix.length >= 0) {
return base + name.substring(0..(available + suffix.length)) + hash
}
throw IllegalArgumentException("property sql.table-namespace $tableNamespace is too long")
}

private fun getHash(body: String?): String? {
if (body.isNullOrBlank()) {
return null
Expand All @@ -945,7 +904,7 @@ class SqlCache(
return withRetry(RetryCategory.READ) {
jooq
.select(field("body_hash"), field("id"))
.from(table(resourceTableName(type)))
.from(table(sqlNames.resourceTableName(type)))
.where(
field("agent").eq(agent)
)
Expand All @@ -958,7 +917,7 @@ class SqlCache(
return withRetry(RetryCategory.READ) {
jooq
.select(field("uuid"), field("id"), field("rel_id"), field("rel_agent"))
.from(table(relTableName(type)))
.from(table(sqlNames.relTableName(type)))
.where(field("rel_agent").eq(sourceAgent))
.fetch()
.into(RelId::class.java)
Expand All @@ -969,7 +928,7 @@ class SqlCache(
return withRetry(RetryCategory.READ) {
jooq
.select(field("uuid"), field("id"), field("rel_id"), field("rel_agent"))
.from(table(relTableName(type)))
.from(table(sqlNames.relTableName(type)))
.where(
field("rel_agent").eq(sourceAgent),
field("rel_type").eq(origType)
Expand Down Expand Up @@ -998,7 +957,7 @@ class SqlCache(
withRetry(RetryCategory.READ) {
cacheData.addAll(
jooq.select(field("body"))
.from(table(resourceTableName(type)))
.from(table(sqlNames.resourceTableName(type)))
.fetch()
.getValues(0)
.asSequence()
Expand Down Expand Up @@ -1060,7 +1019,7 @@ class SqlCache(
withRetry(RetryCategory.READ) {
cacheData.addAll(
jooq.select(field("body"))
.from(table(resourceTableName(type)))
.from(table(sqlNames.resourceTableName(type)))
.where(field("application").eq(application))
.fetch()
.getValues(0)
Expand Down Expand Up @@ -1119,7 +1078,7 @@ class SqlCache(
field(sql("null")).`as`("rel_id"),
field(sql("null")).`as`("rel_type")
)
.from(table(resourceTableName(type)))
.from(table(sqlNames.resourceTableName(type)))
.where(field("application").eq(application))
.unionAll(
jooq.select(
Expand All @@ -1128,8 +1087,8 @@ class SqlCache(
field("rel.rel_id").`as`("rel_id"),
field("rel.rel_type").`as`("rel_type")
)
.from(table(resourceTableName(type)).`as`("r"))
.innerJoin(table(relTableName(type)).`as`("rel"))
.from(table(sqlNames.resourceTableName(type)).`as`("r"))
.innerJoin(table(sqlNames.relTableName(type)).`as`("rel"))
.on(sql("rel.id=r.id"))
.where(relWhere)
.groupBy(
Expand Down Expand Up @@ -1204,15 +1163,15 @@ class SqlCache(
field(sql("null")).`as`("rel_id"),
field(sql("null")).`as`("rel_type")
)
.from(table(resourceTableName(type)))
.from(table(sqlNames.resourceTableName(type)))
.unionAll(
jooq.select(
field(sql("null")).`as`("body"),
field("id").`as`("id"),
field("rel_id").`as`("rel_id"),
field("rel_type").`as`("rel_type")
)
.from(table(relTableName(type)))
.from(table(sqlNames.relTableName(type)))
.where(relWhere)
)
.fetch()
Expand Down Expand Up @@ -1275,7 +1234,7 @@ class SqlCache(
private fun selectBodies(type: String, ids: List<String>): Collection<CacheData> {
return withRetry(RetryCategory.READ) {
jooq.select(field("body"))
.from(table(resourceTableName(type)))
.from(table(sqlNames.resourceTableName(type)))
.where("ID in (${ids.joinToString(",") { "'$it'" }})")
.fetch()
.getValues(0)
Expand All @@ -1301,7 +1260,7 @@ class SqlCache(
field(sql("null")).`as`("rel_id"),
field(sql("null")).`as`("rel_type")
)
.from(table(resourceTableName(type)))
.from(table(sqlNames.resourceTableName(type)))
.where(where)
.unionAll(
jooq.select(
Expand All @@ -1310,7 +1269,7 @@ class SqlCache(
field("rel_id").`as`("rel_id"),
field("rel_type").`as`("rel_type")
)
.from(table(relTableName(type)))
.from(table(sqlNames.relTableName(type)))
.where(relWhere)
)
.fetch()
Expand All @@ -1321,7 +1280,7 @@ class SqlCache(
private fun selectIdentifiers(type: String, ids: List<String>): MutableCollection<String> {
return withRetry(RetryCategory.READ) {
jooq.select(field("id"))
.from(table(resourceTableName(type)))
.from(table(sqlNames.resourceTableName(type)))
.where("id in (${ids.joinToString(",") { "'$it'" }})")
.fetch()
.intoSet(field("id"), String::class.java)
Expand Down

0 comments on commit 73ee311

Please sign in to comment.