Skip to content

Commit

Permalink
feat(sql): Cleanup insert/update usage
Browse files Browse the repository at this point in the history
Also add theoretical support for databases that do not support
`onDuplicateKeyUpdate()`.
  • Loading branch information
ajordens committed Jul 8, 2019
1 parent afdc6f7 commit e135fb2
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import com.netflix.spinnaker.security.AuthenticatedRequest
import org.jooq.DSLContext
import org.jooq.impl.DSL
import org.jooq.impl.DSL.*
import org.jooq.util.mysql.MySQLDSL
import org.slf4j.LoggerFactory
import java.time.Clock

import com.netflix.spinnaker.front50.model.ObjectType.*
import com.netflix.spinnaker.front50.model.sql.*
import com.netflix.spinnaker.kork.sql.config.SqlRetryProperties
import org.jooq.exception.SQLDialectNotSupportedException
import kotlin.system.measureTimeMillis

class SqlStorageService(
Expand Down Expand Up @@ -141,37 +141,88 @@ class SqlStorageService(

try {
jooq.transactional(sqlRetryProperties.transactions) { ctx ->
val insert = ctx.insertInto(
table(definitionsByType[objectType]!!.tableName),
definitionsByType[objectType]!!.getFields()
)

insert.apply {
values(definitionsByType[objectType]!!.getValues(objectMapper, objectKey, item))
val insertPairs = definitionsByType[objectType]!!.getInsertPairs(objectMapper, objectKey, item)
val updatePairs = definitionsByType[objectType]!!.getUpdatePairs(insertPairs)

try {
ctx
.insertInto(
table(definitionsByType[objectType]!!.tableName),
*insertPairs.keys.map { DSL.field(it) }.toTypedArray()
)
.values(insertPairs.values)
.onDuplicateKeyUpdate()
.set(updatePairs.mapKeys { DSL.field(it.key) })
.execute()
} catch (e: SQLDialectNotSupportedException) {
val exists = jooq.withRetry(sqlRetryProperties.reads) {
jooq.fetchExists(
jooq.select()
.from(definitionsByType[objectType]!!.tableName)
.where(field("id").eq(objectKey).and(field("is_deleted").eq(false)))
.forUpdate()
)
}

onDuplicateKeyUpdate()
.set(field("body"), MySQLDSL.values(field("body")) as Any)
.set(field("last_modified_at"), MySQLDSL.values(field("last_modified_at")) as Any)
.set(field("last_modified_by"), MySQLDSL.values(field("last_modified_by")) as Any)
.set(field("is_deleted"), MySQLDSL.values(field("is_deleted")) as Any)
.apply(definitionsByType[objectType]!!.onDuplicateKeyUpdate())
if (exists) {
jooq.withRetry(sqlRetryProperties.transactions) {
jooq
.update(table(definitionsByType[objectType]!!.tableName)).apply {
updatePairs.forEach { k, v ->
set(field(k), v)
}
}
.set(field("id"), objectKey) // satisfy jooq fluent interface
.where(field("id").eq(objectKey))
.execute()
}
} else {
jooq.withRetry(sqlRetryProperties.transactions) {
jooq
.insertInto(
table(definitionsByType[objectType]!!.tableName),
*insertPairs.keys.map { DSL.field(it) }.toTypedArray()
)
.values(insertPairs.values)
.execute()
}
}
}

insert.execute()

if (definitionsByType[objectType]!!.supportsHistory) {
val historicalInsert = ctx.insertInto(
table(definitionsByType[objectType]!!.historyTableName),
definitionsByType[objectType]!!.getHistoryFields()
)

historicalInsert.apply {
values(definitionsByType[objectType]!!.getHistoryValues(objectMapper, clock, objectKey, item))
val historyPairs = definitionsByType[objectType]!!.getHistoryPairs(objectMapper, clock, objectKey, item)

onDuplicateKeyIgnore()
try {
ctx
.insertInto(
table(definitionsByType[objectType]!!.historyTableName),
*historyPairs.keys.map { DSL.field(it) }.toTypedArray()
)
.values(historyPairs.values)
.onDuplicateKeyIgnore()
.execute()
} catch (e: SQLDialectNotSupportedException) {
val exists = jooq.withRetry(sqlRetryProperties.reads) {
jooq.fetchExists(
jooq.select()
.from(definitionsByType[objectType]!!.historyTableName)
.where(field("id").eq(objectKey).and(field("body_sig").eq(historyPairs.getValue("body_sig"))))
.forUpdate()
)
}

if (!exists) {
jooq.withRetry(sqlRetryProperties.transactions) {
jooq
.insertInto(
table(definitionsByType[objectType]!!.historyTableName),
*historyPairs.keys.map { DSL.field(it) }.toTypedArray()
)
.values(historyPairs.values)
.execute()
}
}
}

historicalInsert.execute()
}
}
} catch (e: Exception) {
Expand Down
Loading

0 comments on commit e135fb2

Please sign in to comment.