From c5b55476e094b91258024c1e1c8bc96210801764 Mon Sep 17 00:00:00 2001 From: Asher Feldman Date: Wed, 29 May 2019 10:53:57 -0700 Subject: [PATCH] feat(sql): batch fetch stages (#2941) --- .../spinnaker/config/SqlConfiguration.kt | 3 +- .../netflix/spinnaker/config/SqlProperties.kt | 3 +- .../pipeline/persistence/ExecutionMapper.kt | 59 ++++++++++++++----- .../persistence/SqlExecutionRepository.kt | 5 +- .../orca/sql/pipeline/persistence/jooq.kt | 8 +-- ...CleanupPollingNotificationAgentSpec.groovy | 2 +- .../SqlExecutionRepositorySpec.groovy | 4 +- 7 files changed, 58 insertions(+), 26 deletions(-) diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt index 83b5dfb59e..2288cd4fac 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlConfiguration.kt @@ -112,7 +112,8 @@ class SqlConfiguration { dsl, mapper, properties.transactionRetry, - properties.batchReadSize + properties.batchReadSize, + properties.stageReadSize ), registry ) diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlProperties.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlProperties.kt index b98e097751..255281b124 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlProperties.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/config/SqlProperties.kt @@ -33,7 +33,8 @@ data class SqlProperties( var connectionPool: ConnectionPoolProperties = ConnectionPoolProperties(), var transactionRetry: TransactionRetryProperties = TransactionRetryProperties(), var partitionName: String? = null, - var batchReadSize: Int = 10 + var batchReadSize: Int = 10, + var stageReadSize: Int = 200 ) data class MigrationProperties( diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt index 18e343cae4..8cbdf9a970 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/ExecutionMapper.kt @@ -30,39 +30,68 @@ import java.sql.ResultSet * in this mapper as well. */ class ExecutionMapper( - private val mapper: ObjectMapper + private val mapper: ObjectMapper, + private val stageBatchSize: Int ) { private val log = LoggerFactory.getLogger(javaClass) fun map(rs: ResultSet, context: DSLContext): Collection { val results = mutableListOf() + val executionMap = mutableMapOf() + val legacyMap = mutableMapOf() while (rs.next()) { mapper.readValue(rs.getString("body")) - .also { execution -> - context.selectExecutionStages(execution.type, rs.getString("id")).let { stageResultSet -> - while (stageResultSet.next()) { - mapStage(stageResultSet, execution) - } + .also { + execution -> results.add(execution) + + if (rs.getString("id") != execution.id) { + // Map legacyId executions to their current ULID + legacyMap[execution.id] = rs.getString("id") + executionMap[rs.getString("id")] = execution + } else { + executionMap[execution.id] = execution } - execution.stages.sortBy { it.refId } } - .also { - if (!results.any { r -> r.id == it.id }) { - results.add(it) + } + + if (results.isNotEmpty()) { + val type = results[0].type + + results.chunked(stageBatchSize) { executions -> + val executionIds: List = executions.map { + if (legacyMap.containsKey(it.id)) { + legacyMap[it.id]!! } else { - log.warn("Duplicate execution for ${it.id} found in sql result") + it.id } } + + context.selectExecutionStages(type, executionIds).let { stageResultSet -> + while (stageResultSet.next()) { + mapStage(stageResultSet, executionMap) + } + } + + executions.forEach { execution -> + execution.stages.sortBy { it.refId } + } + } } return results } - private fun mapStage(rs: ResultSet, execution: Execution) { - execution.stages.add(mapper.readValue(rs.getString("body")).apply { - setExecution(execution) - }) + private fun mapStage(rs: ResultSet, executions: Map) { + val executionId = rs.getString("execution_id") + executions.getValue(executionId) + .stages + .add( + mapper.readValue(rs.getString("body")) + .apply { + execution = executions.getValue(executionId) + } + ) } } diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt index c68be6256d..44f29684ee 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepository.kt @@ -71,7 +71,8 @@ class SqlExecutionRepository( private val jooq: DSLContext, private val mapper: ObjectMapper, private val transactionRetryProperties: TransactionRetryProperties, - private val batchReadSize: Int = 10 + private val batchReadSize: Int = 10, + private val stageReadSize: Int = 200 ) : ExecutionRepository, ExecutionStatisticsRepository { companion object { val ulid = SpinULID(SecureRandom()) @@ -798,7 +799,7 @@ class SqlExecutionRepository( listOf(field("id"), field("body")) private fun SelectForUpdateStep.fetchExecutions() = - ExecutionMapper(mapper).map(fetch().intoResultSet(), jooq) + ExecutionMapper(mapper, stageReadSize).map(fetch().intoResultSet(), jooq) private fun SelectForUpdateStep.fetchExecution() = fetchExecutions().firstOrNull() diff --git a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/jooq.kt b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/jooq.kt index 311556978c..b8daaf2f9b 100644 --- a/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/jooq.kt +++ b/orca-sql/src/main/kotlin/com/netflix/spinnaker/orca/sql/pipeline/persistence/jooq.kt @@ -82,11 +82,11 @@ internal val ExecutionType.stagesTableName: Table } /** - * Selects all stages for an [executionType] and [executionId]. + * Selects all stages for an [executionType] and List [executionIds]. */ -internal fun DSLContext.selectExecutionStages(executionType: ExecutionType, executionId: String) = - select(field("body")) +internal fun DSLContext.selectExecutionStages(executionType: ExecutionType, executionIds: Collection) = + select(field("execution_id"), field("body")) .from(executionType.stagesTableName) - .where(field("execution_id").eq(executionId)) + .where(field("execution_id").`in`(*executionIds.toTypedArray())) .fetch() .intoResultSet() diff --git a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy index dad56a6597..4e0efb687c 100644 --- a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy +++ b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/cleanup/OldPipelineCleanupPollingNotificationAgentSpec.groovy @@ -67,7 +67,7 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification { def setupSpec() { currentDatabase = initDatabase() - executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new TransactionRetryProperties(), 10) + executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new TransactionRetryProperties(), 10, 100) } @Ignore("Broken by H2's inversion of order by desc when using a limit") diff --git a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositorySpec.groovy b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositorySpec.groovy index 607505ca33..264e24d879 100644 --- a/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositorySpec.groovy +++ b/orca-sql/src/test/groovy/com/netflix/spinnaker/orca/sql/pipeline/persistence/SqlExecutionRepositorySpec.groovy @@ -67,12 +67,12 @@ class SqlExecutionRepositorySpec extends ExecutionRepositoryTck