Skip to content

Commit

Permalink
feat(sql): add support for PostgreSQL (#5097)
Browse files Browse the repository at this point in the history
* feat(sql): support for Postgres

* feat(sql): Update field types

* feat(sql): default limits per dialect

* feat(sql): add postgres test

* feat(sql): fix Sql scheduler, fix migration checksum

* feat(sql): simplify test case

* feat(sql): fix hardcoded mysql

* feat(sql): fix Unit and formatting
  • Loading branch information
ncknt committed Nov 19, 2020
1 parent 4e0e57c commit 9adb9c7
Show file tree
Hide file tree
Showing 22 changed files with 404 additions and 95 deletions.
2 changes: 2 additions & 0 deletions cats/cats-sql/cats-sql.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ dependencies {
testImplementation "org.spockframework:spock-spring"
testImplementation "org.springframework:spring-test"
testImplementation "org.testcontainers:mysql"
testImplementation "org.testcontainers:postgresql"
testImplementation "mysql:mysql-connector-java"
testImplementation "org.postgresql:postgresql"
}

test {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2020 Armory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.cats.sql

import org.jooq.DSLContext
import org.jooq.Field
import org.jooq.SQLDialect
import org.jooq.impl.DSL
import java.sql.ResultSet

object SqlUtil {

fun createTableLike(jooq: DSLContext, baseName: String, template: String) {
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
jooq.execute("CREATE TABLE IF NOT EXISTS $baseName (LIKE $template INCLUDING ALL)")
else ->
jooq.execute(
"CREATE TABLE IF NOT EXISTS $baseName LIKE $template"
)
}
}

fun getTablesLike(jooq: DSLContext, baseName: String): ResultSet {
return when (jooq.dialect()) {
SQLDialect.POSTGRES ->
jooq.select(DSL.field("tablename"))
.from(DSL.table("pg_catalog.pg_tables"))
.where(DSL.field("tablename").like("$baseName%"))
.fetch()
.intoResultSet()
else ->
jooq.fetch("show tables like '$baseName%'").intoResultSet()
}
}

fun <T> excluded(values: Field<T>): Field<T> {
return DSL.field("excluded.{0}", values.dataType, values)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.netflix.spinnaker.cats.cache.CacheFilter
import com.netflix.spinnaker.cats.cache.DefaultJsonCacheData
import com.netflix.spinnaker.cats.cache.RelationshipCacheFilter
import com.netflix.spinnaker.cats.cache.WriteableCache
import com.netflix.spinnaker.cats.sql.SqlUtil
import com.netflix.spinnaker.clouddriver.core.provider.agent.Namespace.ON_DEMAND
import com.netflix.spinnaker.config.SqlConstraints
import com.netflix.spinnaker.config.coroutineThreadPrefix
Expand Down Expand Up @@ -35,6 +36,7 @@ import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking
import org.jooq.Condition
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.exception.DataAccessException
import org.jooq.exception.SQLDialectNotSupportedException
import org.jooq.impl.DSL.field
Expand Down Expand Up @@ -582,13 +584,24 @@ class SqlCache(
insert.apply {
chunk.forEach {
values(it, sqlNames.checkAgentName(agent), apps[it], hashes[it], bodies[it], now)
when (jooq.dialect()) {
SQLDialect.POSTGRES ->
onConflict(field("id"), field("agent"))
.doUpdate()
.set(field("application"), SqlUtil.excluded(field("application")) as Any)
.set(field("body_hash"), SqlUtil.excluded(field("body_hash")) as Any)
.set(field("body"), SqlUtil.excluded(field("body")) as Any)
.set(field("last_updated"), SqlUtil.excluded(field("last_updated")) as Any)
.execute()
else ->
onDuplicateKeyUpdate()
.set(field("application"), MySQLDSL.values(field("application")) as Any)
.set(field("body_hash"), MySQLDSL.values(field("body_hash")) as Any)
.set(field("body"), MySQLDSL.values(field("body")) as Any)
.set(field("last_updated"), MySQLDSL.values(field("last_updated")) as Any)
.execute()
}
}

onDuplicateKeyUpdate()
.set(field("application"), MySQLDSL.values(field("application")) as Any)
.set(field("body_hash"), MySQLDSL.values(field("body_hash")) as Any)
.set(field("body"), MySQLDSL.values(field("body")) as Any)
.set(field("last_updated"), MySQLDSL.values(field("last_updated")) as Any)
}

withRetry(RetryCategory.WRITE) {
Expand Down Expand Up @@ -854,14 +867,8 @@ class SqlCache(
if (!createdTables.contains(type)) {
try {
withRetry(RetryCategory.WRITE) {
jooq.execute(
"CREATE TABLE IF NOT EXISTS ${sqlNames.resourceTableName(type)} " +
"LIKE cats_v${schemaVersion}_resource_template"
)
jooq.execute(
"CREATE TABLE IF NOT EXISTS ${sqlNames.relTableName(type)} " +
"LIKE cats_v${schemaVersion}_rel_template"
)
SqlUtil.createTableLike(jooq, sqlNames.resourceTableName(type), "cats_v${schemaVersion}_resource_template")
SqlUtil.createTableLike(jooq, sqlNames.relTableName(type), "cats_v${schemaVersion}_rel_template")
}

createdTables.add(type)
Expand All @@ -873,14 +880,8 @@ class SqlCache(
// TODO not sure if best schema for onDemand
try {
withRetry(RetryCategory.WRITE) {
jooq.execute(
"CREATE TABLE IF NOT EXISTS ${sqlNames.resourceTableName(onDemandType)} " +
"LIKE cats_v${schemaVersion}_resource_template"
)
jooq.execute(
"CREATE TABLE IF NOT EXISTS ${sqlNames.relTableName(onDemandType)} " +
"LIKE cats_v${schemaVersion}_rel_template"
)
SqlUtil.createTableLike(jooq, sqlNames.resourceTableName(onDemandType), "cats_v${schemaVersion}_resource_template")
SqlUtil.createTableLike(jooq, sqlNames.relTableName(onDemandType), "cats_v${schemaVersion}_rel_template")
}

createdTables.add(onDemandType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import com.netflix.spinnaker.kork.annotations.VisibleForTesting
*/
class SqlNames(
private val tableNamespace: String? = null,
private val sqlConstraints: SqlConstraints = SqlConstraints()
private val sqlConstraints: SqlConstraints
) {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package com.netflix.spinnaker.cats.sql.cache

import com.netflix.spectator.api.Registry
import com.netflix.spinnaker.cats.agent.RunnableAgent
import com.netflix.spinnaker.cats.sql.SqlUtil
import com.netflix.spinnaker.clouddriver.cache.CustomScheduledAgent
import com.netflix.spinnaker.clouddriver.core.provider.CoreProvider
import com.netflix.spinnaker.clouddriver.sql.SqlAgent
import java.time.Clock
import java.util.concurrent.TimeUnit
import org.jooq.DSLContext
import org.jooq.SQLDialect
import org.jooq.impl.DSL.field
import org.jooq.impl.DSL.table
import org.slf4j.LoggerFactory

Expand Down Expand Up @@ -41,8 +44,7 @@ class SqlTableMetricsAgent(
"cats_v${SqlSchemaVersion.current()}_${namespace}_"
}

val rs = jooq.fetch("show tables like '$baseName%'").intoResultSet()

val rs = SqlUtil.getTablesLike(jooq, baseName)
while (rs.next()) {
val tableName = rs.getString(1)
val type = tableName.replace(baseName, "")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.netflix.spinnaker.cats.agent.AgentDataType.Authority.AUTHORITATIVE
import com.netflix.spinnaker.cats.agent.CachingAgent
import com.netflix.spinnaker.cats.agent.RunnableAgent
import com.netflix.spinnaker.cats.provider.ProviderRegistry
import com.netflix.spinnaker.cats.sql.SqlUtil
import com.netflix.spinnaker.clouddriver.cache.CustomScheduledAgent
import com.netflix.spinnaker.clouddriver.core.provider.CoreProvider
import com.netflix.spinnaker.clouddriver.sql.SqlAgent
Expand Down Expand Up @@ -91,7 +92,8 @@ class SqlUnknownAgentCleanupAgent(
}
log.debug("Checking table '$tableName' for '$dataType' data cleanup")

val tableExists = jooq.fetch("show tables like '$tableName'").intoResultSet()
val tableExists = SqlUtil.getTablesLike(jooq, tableName)

if (!tableExists.next()) {
log.debug("Table '$tableName' not found")
state.touchedTables.add(tableName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.netflix.spinnaker.cats.cluster.AgentIntervalProvider
import com.netflix.spinnaker.cats.cluster.NodeIdentity
import com.netflix.spinnaker.cats.cluster.NodeStatusProvider
import com.netflix.spinnaker.cats.module.CatsModuleAware
import com.netflix.spinnaker.cats.sql.SqlUtil
import com.netflix.spinnaker.config.ConnectionPools
import com.netflix.spinnaker.kork.dynamicconfig.DynamicConfigService
import com.netflix.spinnaker.kork.sql.routing.withPool
Expand Down Expand Up @@ -84,7 +85,7 @@ class SqlClusteredAgentScheduler(
init {
if (!tableNamespace.isNullOrBlank()) {
withPool(POOL_NAME) {
jooq.execute("CREATE TABLE IF NOT EXISTS $lockTable LIKE $referenceTable")
SqlUtil.createTableLike(jooq, lockTable, referenceTable)
}
}

Expand Down Expand Up @@ -182,18 +183,19 @@ class SqlClusteredAgentScheduler(

val now = System.currentTimeMillis()
while (existingLocks.next()) {
if (now > existingLocks.getLong("lock_expiry")) {
val lockExpiry = existingLocks.getLong("lock_expiry")
if (now > lockExpiry) {
try {
jooq.deleteFrom(table(lockTable))
.where(
field("agent_name").eq(existingLocks.getString("agent_name"))
.and(field("lock_expiry").eq(existingLocks.getString("lock_expiry")))
.and(field("lock_expiry").eq(lockExpiry))
)
.execute()
} catch (e: SQLException) {
log.error(
"Failed deleting agent lock ${existingLocks.getString("agent_name")} with expiry " +
existingLocks.getString("lock_expiry"),
lockExpiry,
e
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.netflix.spinnaker.cats.sql.controllers

import com.netflix.spinnaker.cats.sql.SqlUtil
import com.netflix.spinnaker.cats.sql.cache.SqlSchemaVersion
import com.netflix.spinnaker.fiat.shared.FiatPermissionEvaluator
import com.netflix.spinnaker.kork.sql.config.SqlProperties
import com.netflix.spinnaker.security.AuthenticatedRequest
import java.sql.DriverManager
import org.jooq.SQLDialect
import org.jooq.impl.DSL
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
Expand Down Expand Up @@ -48,11 +48,10 @@ class CatsSqlAdminController(
)

val tablesTruncated = mutableListOf<String>()
val sql = "show tables like 'cats_v${SqlSchemaVersion.current()}_${truncateNamespace}_%'"

conn.use { c ->
val jooq = DSL.using(c, SQLDialect.MYSQL)
val rs = jooq.fetch(sql).intoResultSet()
val jooq = DSL.using(c, properties.getDefaultConnectionPoolProperties().dialect)
val rs = SqlUtil.getTablesLike(jooq, "cats_v${SqlSchemaVersion.current()}_${truncateNamespace}_")

while (rs.next()) {
val table = rs.getString(1)
Expand Down Expand Up @@ -83,11 +82,10 @@ class CatsSqlAdminController(
)

val tablesDropped = mutableListOf<String>()
val sql = "show tables like 'cats_v${SqlSchemaVersion.current()}_${dropNamespace}_%'"

conn.use { c ->
val jooq = DSL.using(c, SQLDialect.MYSQL)
val rs = jooq.fetch(sql).intoResultSet()
val jooq = DSL.using(c, properties.getDefaultConnectionPoolProperties().dialect)
val rs = SqlUtil.getTablesLike(jooq, "cats_v${SqlSchemaVersion.current()}_${dropNamespace}_")

while (rs.next()) {
val table = rs.getString(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const val coroutineThreadPrefix = "catsSql"
@Configuration
@ConditionalOnProperty("sql.cache.enabled")
@Import(DefaultSqlConfiguration::class)
@EnableConfigurationProperties(SqlAgentProperties::class, SqlConstraints::class)
@EnableConfigurationProperties(SqlAgentProperties::class, SqlConstraintsProperties::class)
@ComponentScan("com.netflix.spinnaker.cats.sql.controllers")
class SqlCacheConfiguration {

Expand Down Expand Up @@ -79,6 +79,10 @@ class SqlCacheConfiguration {
.build(providers)
}

@Bean
fun sqlConstraints(jooq: DSLContext, sqlConstraintsProperties: SqlConstraintsProperties): SqlConstraints =
SqlConstraints(SqlConstraintsInitializer.getDefaultSqlConstraints(jooq.dialect()), sqlConstraintsProperties)

/**
* sql.cache.async.poolSize: If set to a positive integer, a fixed thread pool of this size is created
* as part of a coroutineContext. If sql.cache.maxQueryConcurrency is also >1 (default value: 4),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,38 @@

package com.netflix.spinnaker.config

import org.jooq.SQLDialect
import org.springframework.boot.context.properties.ConfigurationProperties

class SqlConstraints(
val maxTableNameLength: Int,
val maxIdLength: Int,
val maxAgentLength: Int,
) {

constructor(defaultConstraints: SqlConstraints, constraintsProperties: SqlConstraintsProperties) : this(
constraintsProperties.maxTableNameLength ?: defaultConstraints.maxTableNameLength,
constraintsProperties.maxIdLength ?: defaultConstraints.maxIdLength,
constraintsProperties.maxAgentLength ?: defaultConstraints.maxAgentLength
)
}

@ConfigurationProperties("sql.constraints")
class SqlConstraints {
var maxTableNameLength: Int = 64
// 352 * 2 + 64 (max rel_type length) == 768; 768 * 4 (utf8mb4) == 3072 == Aurora's max index length
var maxIdLength: Int = 352
var maxAgentLength: Int = 127
class SqlConstraintsProperties {
var maxTableNameLength: Int? = null
var maxIdLength: Int? = null
var maxAgentLength: Int? = null
}

object SqlConstraintsInitializer {

fun getDefaultSqlConstraints(dialect: SQLDialect): SqlConstraints =
when(dialect) {
SQLDialect.POSTGRES ->
// https://www.postgresql.org/docs/current/limits.html
SqlConstraints(63, Int.MAX_VALUE, Int.MAX_VALUE)
else ->
// 352 * 2 + 64 (max rel_type length) == 768; 768 * 4 (utf8mb4) == 3072 == Aurora's max index length
SqlConstraints(64, 352, 127)
}
}
Loading

0 comments on commit 9adb9c7

Please sign in to comment.