From e135fb215c452aa9d2c4e6914c14c0bb6bbeb1bf Mon Sep 17 00:00:00 2001 From: Adam Jordens Date: Fri, 21 Jun 2019 13:56:31 -0700 Subject: [PATCH] feat(sql): Cleanup insert/update usage Also add theoretical support for databases that do not support `onDuplicateKeyUpdate()`. --- .../front50/model/SqlStorageService.kt | 103 ++++++-- .../front50/model/sql/TableDefinitions.kt | 224 +++++++----------- 2 files changed, 157 insertions(+), 170 deletions(-) diff --git a/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/SqlStorageService.kt b/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/SqlStorageService.kt index f89e182a6..d98466773 100644 --- a/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/SqlStorageService.kt +++ b/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/SqlStorageService.kt @@ -23,13 +23,13 @@ import com.netflix.spinnaker.security.AuthenticatedRequest import org.jooq.DSLContext import org.jooq.impl.DSL import org.jooq.impl.DSL.* -import org.jooq.util.mysql.MySQLDSL import org.slf4j.LoggerFactory import java.time.Clock import com.netflix.spinnaker.front50.model.ObjectType.* import com.netflix.spinnaker.front50.model.sql.* import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties +import org.jooq.exception.SQLDialectNotSupportedException import kotlin.system.measureTimeMillis class SqlStorageService( @@ -141,37 +141,88 @@ class SqlStorageService( try { jooq.transactional(sqlRetryProperties.transactions) { ctx -> - val insert = ctx.insertInto( - table(definitionsByType[objectType]!!.tableName), - definitionsByType[objectType]!!.getFields() - ) - - insert.apply { - values(definitionsByType[objectType]!!.getValues(objectMapper, objectKey, item)) + val insertPairs = definitionsByType[objectType]!!.getInsertPairs(objectMapper, objectKey, item) + val updatePairs = definitionsByType[objectType]!!.getUpdatePairs(insertPairs) + + try { + ctx + .insertInto( + table(definitionsByType[objectType]!!.tableName), + *insertPairs.keys.map { DSL.field(it) }.toTypedArray() + ) + .values(insertPairs.values) + .onDuplicateKeyUpdate() + .set(updatePairs.mapKeys { DSL.field(it.key) }) + .execute() + } catch (e: SQLDialectNotSupportedException) { + val exists = jooq.withRetry(sqlRetryProperties.reads) { + jooq.fetchExists( + jooq.select() + .from(definitionsByType[objectType]!!.tableName) + .where(field("id").eq(objectKey).and(field("is_deleted").eq(false))) + .forUpdate() + ) + } - onDuplicateKeyUpdate() - .set(field("body"), MySQLDSL.values(field("body")) as Any) - .set(field("last_modified_at"), MySQLDSL.values(field("last_modified_at")) as Any) - .set(field("last_modified_by"), MySQLDSL.values(field("last_modified_by")) as Any) - .set(field("is_deleted"), MySQLDSL.values(field("is_deleted")) as Any) - .apply(definitionsByType[objectType]!!.onDuplicateKeyUpdate()) + if (exists) { + jooq.withRetry(sqlRetryProperties.transactions) { + jooq + .update(table(definitionsByType[objectType]!!.tableName)).apply { + updatePairs.forEach { k, v -> + set(field(k), v) + } + } + .set(field("id"), objectKey) // satisfy jooq fluent interface + .where(field("id").eq(objectKey)) + .execute() + } + } else { + jooq.withRetry(sqlRetryProperties.transactions) { + jooq + .insertInto( + table(definitionsByType[objectType]!!.tableName), + *insertPairs.keys.map { DSL.field(it) }.toTypedArray() + ) + .values(insertPairs.values) + .execute() + } + } } - insert.execute() - if (definitionsByType[objectType]!!.supportsHistory) { - val historicalInsert = ctx.insertInto( - table(definitionsByType[objectType]!!.historyTableName), - definitionsByType[objectType]!!.getHistoryFields() - ) - - historicalInsert.apply { - values(definitionsByType[objectType]!!.getHistoryValues(objectMapper, clock, objectKey, item)) + val historyPairs = definitionsByType[objectType]!!.getHistoryPairs(objectMapper, clock, objectKey, item) - onDuplicateKeyIgnore() + try { + ctx + .insertInto( + table(definitionsByType[objectType]!!.historyTableName), + *historyPairs.keys.map { DSL.field(it) }.toTypedArray() + ) + .values(historyPairs.values) + .onDuplicateKeyIgnore() + .execute() + } catch (e: SQLDialectNotSupportedException) { + val exists = jooq.withRetry(sqlRetryProperties.reads) { + jooq.fetchExists( + jooq.select() + .from(definitionsByType[objectType]!!.historyTableName) + .where(field("id").eq(objectKey).and(field("body_sig").eq(historyPairs.getValue("body_sig")))) + .forUpdate() + ) + } + + if (!exists) { + jooq.withRetry(sqlRetryProperties.transactions) { + jooq + .insertInto( + table(definitionsByType[objectType]!!.historyTableName), + *historyPairs.keys.map { DSL.field(it) }.toTypedArray() + ) + .values(historyPairs.values) + .execute() + } + } } - - historicalInsert.execute() } } } catch (e: Exception) { diff --git a/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/sql/TableDefinitions.kt b/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/sql/TableDefinitions.kt index fa25ad16e..41d5612d0 100644 --- a/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/sql/TableDefinitions.kt +++ b/front50-sql/src/main/kotlin/com/netflix/spinnaker/front50/model/sql/TableDefinitions.kt @@ -23,11 +23,6 @@ import com.netflix.spinnaker.front50.model.Timestamped import com.netflix.spinnaker.front50.model.delivery.Delivery import com.netflix.spinnaker.front50.model.pipeline.Pipeline import com.netflix.spinnaker.front50.model.project.Project -import org.jooq.Field -import org.jooq.InsertOnDuplicateSetMoreStep -import org.jooq.Record -import org.jooq.impl.DSL -import org.jooq.util.mysql.MySQLDSL import java.nio.charset.StandardCharsets.UTF_8 import java.time.Clock @@ -36,189 +31,130 @@ open class DefaultTableDefinition( val tableName: String, val supportsHistory: Boolean ) { - val historyTableName:String + val historyTableName: String get() = "${tableName}_history" - open fun getFields(): List> { - return listOf( - DSL.field("id"), - DSL.field("body"), - DSL.field("created_at"), - DSL.field("last_modified_at"), - DSL.field("last_modified_by"), - DSL.field("is_deleted") - ) - } - - open fun getValues(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): List { + open fun getInsertPairs(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): Map { val objectAsString = objectMapper.writeValueAsString(item) - return listOf( - objectKey, objectAsString, item.lastModified, item.lastModified, item.lastModifiedBy, false - ) - } - - fun getHistoryFields(): List> { - return listOf( - DSL.field("id"), - DSL.field("body"), - DSL.field("body_sig"), - DSL.field("last_modified_at"), - DSL.field("recorded_at") + return mapOf( + "id" to objectKey, + "body" to objectAsString, + "created_at" to item.lastModified, + "last_modified_at" to item.lastModified, + "last_modified_by" to item.lastModifiedBy, + "is_deleted" to false ) } - fun getHistoryValues(objectMapper: ObjectMapper, - clock: Clock, - objectKey: String, - item: Timestamped): List { + fun getHistoryPairs(objectMapper: ObjectMapper, + clock: Clock, + objectKey: String, + item: Timestamped): Map { val objectAsString = objectMapper.writeValueAsString(item) val signature = Hashing.murmur3_128().newHasher().putString(objectAsString, UTF_8).hash().toString() - return listOf( - objectKey, objectAsString, signature, item.lastModified, clock.millis() + return mapOf( + "id" to objectKey, + "body" to objectAsString, + "body_sig" to signature, + "last_modified_at" to item.lastModified, + "recorded_at" to clock.millis() ) } - open fun onDuplicateKeyUpdate(): InsertOnDuplicateSetMoreStep.() -> Unit { - return {} + open fun getUpdatePairs(insertPairs: Map): Map { + return mapOf( + "body" to insertPairs.getValue("body"), + "last_modified_at" to insertPairs.getValue("last_modified_at"), + "last_modified_by" to insertPairs.getValue("last_modified_by"), + "is_deleted" to insertPairs.getValue("is_deleted") + ) } } class DeliveryTableDefinition : DefaultTableDefinition(ObjectType.DELIVERY, "deliveries", true) { - override fun getFields(): List> { - return listOf( - DSL.field("id"), - DSL.field("application"), - DSL.field("body"), - DSL.field("created_at"), - DSL.field("last_modified_at"), - DSL.field("last_modified_by"), - DSL.field("is_deleted") - ) - } - - override fun getValues(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): List { + override fun getInsertPairs(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): Map { val objectAsString = objectMapper.writeValueAsString(item) - return listOf( - objectKey, - (item as Delivery).application, - objectAsString, - item.lastModified, - item.lastModified, - item.lastModifiedBy, - false + return mapOf( + "id" to objectKey, + "application" to (item as Delivery).application, + "body" to objectAsString, + "created_at" to item.lastModified, + "last_modified_at" to item.lastModified, + "last_modified_by" to item.lastModifiedBy, + "is_deleted" to false ) } - override fun onDuplicateKeyUpdate(): InsertOnDuplicateSetMoreStep.() -> Unit { - return { - set(DSL.field("application"), MySQLDSL.values(DSL.field("application")) as Any) - } + override fun getUpdatePairs(insertPairs: Map): Map { + return super.getUpdatePairs(insertPairs) + mapOf("application" to insertPairs.getValue("application")) } } class PipelineTableDefinition : DefaultTableDefinition(ObjectType.PIPELINE, "pipelines", true) { - override fun getFields(): List> { - return listOf( - DSL.field("id"), - DSL.field("name"), - DSL.field("application"), - DSL.field("body"), - DSL.field("created_at"), - DSL.field("last_modified_at"), - DSL.field("last_modified_by"), - DSL.field("is_deleted") - ) - } - - override fun getValues(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): List { + override fun getInsertPairs(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): Map { val objectAsString = objectMapper.writeValueAsString(item) - return listOf( - objectKey, - (item as Pipeline).name, - (item as Pipeline).application, - objectAsString, - item.lastModified, - item.lastModified, - item.lastModifiedBy, - false + return mapOf( + "id" to objectKey, + "name" to (item as Pipeline).name, + "application" to (item as Pipeline).application, + "body" to objectAsString, + "created_at" to item.lastModified, + "last_modified_at" to item.lastModified, + "last_modified_by" to item.lastModifiedBy, + "is_deleted" to false ) } - override fun onDuplicateKeyUpdate(): InsertOnDuplicateSetMoreStep.() -> Unit { - return { - set(DSL.field("name"), MySQLDSL.values(DSL.field("name")) as Any) - set(DSL.field("application"), MySQLDSL.values(DSL.field("application")) as Any) - } + override fun getUpdatePairs(insertPairs: Map): Map { + return super.getUpdatePairs(insertPairs) + mapOf( + "name" to insertPairs.getValue("name"), + "application" to insertPairs.getValue("application") + ) } } class PipelineStrategyTableDefinition : DefaultTableDefinition(ObjectType.STRATEGY, "pipeline_strategies", true) { - override fun getFields(): List> { - return listOf( - DSL.field("id"), - DSL.field("name"), - DSL.field("application"), - DSL.field("body"), - DSL.field("created_at"), - DSL.field("last_modified_at"), - DSL.field("last_modified_by"), - DSL.field("is_deleted") - ) - } - - override fun getValues(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): List { + override fun getInsertPairs(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): Map { val objectAsString = objectMapper.writeValueAsString(item) - return listOf( - objectKey, - (item as Pipeline).name, - (item as Pipeline).application, - objectAsString, - item.lastModified, - item.lastModified, - item.lastModifiedBy, - false + return mapOf( + "id" to objectKey, + "name" to (item as Pipeline).name, + "application" to (item as Pipeline).application, + "body" to objectAsString, + "created_at" to item.lastModified, + "last_modified_at" to item.lastModified, + "last_modified_by" to item.lastModifiedBy, + "is_deleted" to false ) } - override fun onDuplicateKeyUpdate(): InsertOnDuplicateSetMoreStep.() -> Unit { - return { - set(DSL.field("name"), MySQLDSL.values(DSL.field("name")) as Any) - set(DSL.field("application"), MySQLDSL.values(DSL.field("application")) as Any) - } + override fun getUpdatePairs(insertPairs: Map): Map { + return super.getUpdatePairs(insertPairs) + mapOf( + "name" to insertPairs.getValue("name"), + "application" to insertPairs.getValue("application") + ) } } class ProjectTableDefinition : DefaultTableDefinition(ObjectType.PROJECT, "projects", true) { - override fun getFields(): List> { - return listOf( - DSL.field("id"), - DSL.field("name"), - DSL.field("body"), - DSL.field("created_at"), - DSL.field("last_modified_at"), - DSL.field("last_modified_by"), - DSL.field("is_deleted") - ) - } - - override fun getValues(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): List { + override fun getInsertPairs(objectMapper: ObjectMapper, objectKey: String, item: Timestamped): Map { val objectAsString = objectMapper.writeValueAsString(item) - return listOf( - objectKey, - (item as Project).name, - objectAsString, - item.lastModified, - item.lastModified, - item.lastModifiedBy, - false + return mapOf( + "id" to objectKey, + "name" to (item as Project).name, + "body" to objectAsString, + "created_at" to item.lastModified, + "last_modified_at" to item.lastModified, + "last_modified_by" to item.lastModifiedBy, + "is_deleted" to false ) } - override fun onDuplicateKeyUpdate(): InsertOnDuplicateSetMoreStep.() -> Unit { - return { - set(DSL.field("name"), MySQLDSL.values(DSL.field("name")) as Any) - } + override fun getUpdatePairs(insertPairs: Map): Map { + return super.getUpdatePairs(insertPairs) + mapOf( + "name" to insertPairs.getValue("name") + ) } }