Skip to content

Commit

Permalink
perf(sql): Support for bulk sql imports (#828)
Browse files Browse the repository at this point in the history
This PR also adjusts the bulk import behavior for entity
tags to avoid a one-by-one fetch after the import succeeds.

Given our use cases, I believe the one-by-one re-fetch unnecessarily adds to the
import time.
  • Loading branch information
ajordens committed May 20, 2020
1 parent 92676fd commit f48a7d0
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.front50.model;

import java.util.Collection;

/**
 * Optional capability interface for {@code StorageService} implementations that can
 * persist many objects in a single operation, rather than one write per object.
 */
public interface BulkStorageService {
  /**
   * Stores all {@code items} of the given {@code objectType} in bulk.
   *
   * @param objectType the type of object being stored; determines where the items are persisted
   * @param items the objects to store
   * @param <T> the concrete {@link Timestamped} type of the items
   */
  <T extends Timestamped> void storeObjects(ObjectType objectType, Collection<T> items);
}
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,20 @@ public void bulkImport(Collection<T> items) {
User authenticatedUser = new User();
authenticatedUser.setUsername(AuthenticatedRequest.getSpinnakerUser().orElse("anonymous"));

if (service instanceof BulkStorageService) {
String lastModifiedBy = AuthenticatedRequest.getSpinnakerUser().orElse("anonymous");
Long lastModified = System.currentTimeMillis();

items.forEach(
item -> {
item.setLastModifiedBy(lastModifiedBy);
item.setLastModified(lastModified);
});

((BulkStorageService) service).storeObjects(objectType, items);
return;
}

Observable.from(items)
.buffer(10)
.flatMap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class SqlStorageService(
private val sqlRetryProperties: SqlRetryProperties,
private val chunkSize: Int,
private val poolName: String
) : StorageService, AdminOperations {
) : StorageService, BulkStorageService, AdminOperations {

companion object {
private val log = LoggerFactory.getLogger(SqlStorageService::class.java)
Expand Down Expand Up @@ -165,6 +165,65 @@ class SqlStorageService(
}
}

/**
 * Bulk-upserts [allItems] of [objectType] in chunks, each chunk in its own transaction.
 *
 * Falls back to per-item writes when the SQL dialect does not support
 * `ON DUPLICATE KEY` semantics. When the object type supports history, a history
 * row is also batch-inserted per item (duplicates ignored).
 *
 * Any failure is logged with the offending chunk's object keys and rethrown;
 * previously committed chunks are NOT rolled back.
 */
override fun <T : Timestamped> storeObjects(objectType: ObjectType, allItems: Collection<T>) {
  // Hoisted: one lookup instead of one per use. `!!` preserves the original
  // fail-fast NPE when an unsupported object type is passed.
  val definition = definitionsByType[objectType]!!

  // using a lower `chunkSize` to avoid exceeding default packet size limits.
  allItems.chunked(100).forEach { items ->
    try {
      jooq.transactional(sqlRetryProperties.transactions) { ctx ->
        try {
          // One batched multi-row upsert for the whole chunk.
          ctx.batch(
            items.map { item ->
              val insertPairs = definition.getInsertPairs(
                objectMapper, item.id.toLowerCase(), item
              )
              val updatePairs = definition.getUpdatePairs(insertPairs)

              ctx.insertInto(
                table(definition.tableName),
                *insertPairs.keys.map { DSL.field(it) }.toTypedArray()
              )
                .values(insertPairs.values)
                .onDuplicateKeyUpdate()
                .set(updatePairs.mapKeys { DSL.field(it.key) })
            }
          ).execute()
        } catch (e: SQLDialectNotSupportedException) {
          // Dialect lacks upsert support — write each item individually.
          for (item in items) {
            storeSingleObject(objectType, item.id.toLowerCase(), item)
          }
        }

        if (definition.supportsHistory) {
          try {
            // Batched history inserts; duplicates (same body signature) are ignored.
            ctx.batch(
              items.map { item ->
                val historyPairs = definition.getHistoryPairs(
                  objectMapper, clock, item.id.toLowerCase(), item
                )

                ctx
                  .insertInto(
                    table(definition.historyTableName),
                    *historyPairs.keys.map { DSL.field(it) }.toTypedArray()
                  )
                  .values(historyPairs.values)
                  .onDuplicateKeyIgnore()
              }
            ).execute()
          } catch (e: SQLDialectNotSupportedException) {
            for (item in items) {
              storeSingleObjectHistory(objectType, item.id.toLowerCase(), item)
            }
          }
        }
      }
    } catch (e: Exception) {
      // Log which keys were in the failed chunk, then propagate.
      log.error("Unable to store objects (objectType: {}, objectKeys: {})", objectType, items.map { it.id })
      throw e
    }
  }
}

override fun <T : Timestamped> storeObject(objectType: ObjectType, objectKey: String, item: T) {
item.lastModifiedBy = AuthenticatedRequest.getSpinnakerUser().orElse("anonymous")

Expand All @@ -186,38 +245,7 @@ class SqlStorageService(
.set(updatePairs.mapKeys { DSL.field(it.key) })
.execute()
} catch (e: SQLDialectNotSupportedException) {
val exists = jooq.withRetry(sqlRetryProperties.reads) {
jooq.fetchExists(
jooq.select()
.from(definitionsByType[objectType]!!.tableName)
.where(field("id").eq(objectKey).and(field("is_deleted").eq(false)))
.forUpdate()
)
}

if (exists) {
jooq.withRetry(sqlRetryProperties.transactions) {
jooq
.update(table(definitionsByType[objectType]!!.tableName)).apply {
updatePairs.forEach { k, v ->
set(field(k), v)
}
}
.set(field("id"), objectKey) // satisfy jooq fluent interface
.where(field("id").eq(objectKey))
.execute()
}
} else {
jooq.withRetry(sqlRetryProperties.transactions) {
jooq
.insertInto(
table(definitionsByType[objectType]!!.tableName),
*insertPairs.keys.map { DSL.field(it) }.toTypedArray()
)
.values(insertPairs.values)
.execute()
}
}
storeSingleObject(objectType, objectKey, item)
}

if (definitionsByType[objectType]!!.supportsHistory) {
Expand All @@ -233,26 +261,7 @@ class SqlStorageService(
.onDuplicateKeyIgnore()
.execute()
} catch (e: SQLDialectNotSupportedException) {
val exists = jooq.withRetry(sqlRetryProperties.reads) {
jooq.fetchExists(
jooq.select()
.from(definitionsByType[objectType]!!.historyTableName)
.where(field("id").eq(objectKey).and(field("body_sig").eq(historyPairs.getValue("body_sig"))))
.forUpdate()
)
}

if (!exists) {
jooq.withRetry(sqlRetryProperties.transactions) {
jooq
.insertInto(
table(definitionsByType[objectType]!!.historyTableName),
*historyPairs.keys.map { DSL.field(it) }.toTypedArray()
)
.values(historyPairs.values)
.execute()
}
}
storeSingleObjectHistory(objectType, objectKey, item)
}
}
}
Expand Down Expand Up @@ -370,4 +379,67 @@ class SqlStorageService(
}
log.info("Object ${operation.objectType}:${operation.objectId} was recovered")
}

/**
 * Single-object upsert fallback for dialects without `ON DUPLICATE KEY` support:
 * checks for an existing (non-deleted) row, then issues either an UPDATE or an INSERT.
 *
 * NOTE(review): the existence check and the subsequent write run as separate
 * retried statements, so this is not atomic across the check/write boundary —
 * presumably acceptable for the dialects that hit this path; confirm.
 */
private fun storeSingleObject(objectType: ObjectType, objectKey: String, item: Timestamped) {
  // Hoisted: one lookup instead of one per use; `!!` keeps the original
  // fail-fast behavior for unknown object types.
  val definition = definitionsByType[objectType]!!

  val insertPairs = definition.getInsertPairs(objectMapper, objectKey, item)
  val updatePairs = definition.getUpdatePairs(insertPairs)

  val exists = jooq.withRetry(sqlRetryProperties.reads) {
    jooq.fetchExists(
      jooq.select()
        .from(definition.tableName)
        .where(field("id").eq(objectKey).and(field("is_deleted").eq(false)))
        .forUpdate()
    )
  }

  if (exists) {
    jooq.withRetry(sqlRetryProperties.transactions) {
      jooq
        .update(table(definition.tableName)).apply {
          updatePairs.forEach { (k, v) ->
            set(field(k), v)
          }
        }
        .set(field("id"), objectKey) // satisfy jooq fluent interface
        .where(field("id").eq(objectKey))
        .execute()
    }
  } else {
    jooq.withRetry(sqlRetryProperties.transactions) {
      jooq
        .insertInto(
          table(definition.tableName),
          *insertPairs.keys.map { DSL.field(it) }.toTypedArray()
        )
        .values(insertPairs.values)
        .execute()
    }
  }
}

/**
 * Single-object history-insert fallback for dialects without
 * `ON DUPLICATE KEY IGNORE` support: inserts a history row only when no row with
 * the same id and body signature (`body_sig`) already exists.
 */
private fun storeSingleObjectHistory(objectType: ObjectType, objectKey: String, item: Timestamped) {
  // Hoisted: one lookup instead of one per use; `!!` keeps the original
  // fail-fast behavior for unknown object types.
  val definition = definitionsByType[objectType]!!

  val historyPairs = definition.getHistoryPairs(objectMapper, clock, objectKey, item)

  // An identical body (matching `body_sig`) means no new history entry is needed.
  val exists = jooq.withRetry(sqlRetryProperties.reads) {
    jooq.fetchExists(
      jooq.select()
        .from(definition.historyTableName)
        .where(field("id").eq(objectKey).and(field("body_sig").eq(historyPairs.getValue("body_sig"))))
        .forUpdate()
    )
  }

  if (!exists) {
    jooq.withRetry(sqlRetryProperties.transactions) {
      jooq
        .insertInto(
          table(definition.historyTableName),
          *historyPairs.keys.map { DSL.field(it) }.toTypedArray()
        )
        .values(historyPairs.values)
        .execute()
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class EntityTagsController {
}

taggedEntityDAO.bulkImport(tags)
return findAllByIds(tags.findResults { it.id })
return tags;
}

@RequestMapping(value = "/batchDelete", method = RequestMethod.POST)
Expand All @@ -107,12 +107,4 @@ class EntityTagsController {
taggedEntityDAO.delete(tagId)
response.setStatus(HttpStatus.NO_CONTENT.value())
}

// Re-fetches each entity tag by id, returning only those that could be loaded.
// NOTE(review): the bare `catch (ignored) {}` swallows every exception from
// findById, and `findResults` drops the resulting nulls — so missing or
// unreadable tags are silently omitted. Presumably intentional best-effort
// behavior after a bulk import; confirm before relying on the result size.
private Set<EntityTags> findAllByIds(Collection<String> ids) {
  return ids.findResults {
    try {
      taggedEntityDAO.findById(it)
    } catch (ignored) {}
  }
}
}

0 comments on commit f48a7d0

Please sign in to comment.