Skip to content

Commit

Permalink
feat(sql): batch fetch stages (#2941)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher committed May 29, 2019
1 parent c47fd19 commit c5b5547
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class SqlConfiguration {
dsl,
mapper,
properties.transactionRetry,
properties.batchReadSize
properties.batchReadSize,
properties.stageReadSize
),
registry
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ data class SqlProperties(
var connectionPool: ConnectionPoolProperties = ConnectionPoolProperties(),
var transactionRetry: TransactionRetryProperties = TransactionRetryProperties(),
var partitionName: String? = null,
var batchReadSize: Int = 10
var batchReadSize: Int = 10,
var stageReadSize: Int = 200
)

data class MigrationProperties(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,39 +30,68 @@ import java.sql.ResultSet
* in this mapper as well.
*/
class ExecutionMapper(
private val mapper: ObjectMapper
private val mapper: ObjectMapper,
private val stageBatchSize: Int
) {

private val log = LoggerFactory.getLogger(javaClass)

fun map(rs: ResultSet, context: DSLContext): Collection<Execution> {
val results = mutableListOf<Execution>()
val executionMap = mutableMapOf<String, Execution>()
val legacyMap = mutableMapOf<String, String>()

while (rs.next()) {
mapper.readValue<Execution>(rs.getString("body"))
.also { execution ->
context.selectExecutionStages(execution.type, rs.getString("id")).let { stageResultSet ->
while (stageResultSet.next()) {
mapStage(stageResultSet, execution)
}
.also {
execution -> results.add(execution)

if (rs.getString("id") != execution.id) {
// Map legacyId executions to their current ULID
legacyMap[execution.id] = rs.getString("id")
executionMap[rs.getString("id")] = execution
} else {
executionMap[execution.id] = execution
}
execution.stages.sortBy { it.refId }
}
.also {
if (!results.any { r -> r.id == it.id }) {
results.add(it)
}

if (results.isNotEmpty()) {
val type = results[0].type

results.chunked(stageBatchSize) { executions ->
val executionIds: List<String> = executions.map {
if (legacyMap.containsKey(it.id)) {
legacyMap[it.id]!!
} else {
log.warn("Duplicate execution for ${it.id} found in sql result")
it.id
}
}

context.selectExecutionStages(type, executionIds).let { stageResultSet ->
while (stageResultSet.next()) {
mapStage(stageResultSet, executionMap)
}
}

executions.forEach { execution ->
execution.stages.sortBy { it.refId }
}
}
}

return results
}

private fun mapStage(rs: ResultSet, execution: Execution) {
execution.stages.add(mapper.readValue<Stage>(rs.getString("body")).apply {
setExecution(execution)
})
private fun mapStage(rs: ResultSet, executions: Map<String, Execution>) {
val executionId = rs.getString("execution_id")
executions.getValue(executionId)
.stages
.add(
mapper.readValue<Stage>(rs.getString("body"))
.apply {
execution = executions.getValue(executionId)
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class SqlExecutionRepository(
private val jooq: DSLContext,
private val mapper: ObjectMapper,
private val transactionRetryProperties: TransactionRetryProperties,
private val batchReadSize: Int = 10
private val batchReadSize: Int = 10,
private val stageReadSize: Int = 200
) : ExecutionRepository, ExecutionStatisticsRepository {
companion object {
val ulid = SpinULID(SecureRandom())
Expand Down Expand Up @@ -798,7 +799,7 @@ class SqlExecutionRepository(
listOf(field("id"), field("body"))

private fun SelectForUpdateStep<out Record>.fetchExecutions() =
ExecutionMapper(mapper).map(fetch().intoResultSet(), jooq)
ExecutionMapper(mapper, stageReadSize).map(fetch().intoResultSet(), jooq)

private fun SelectForUpdateStep<out Record>.fetchExecution() =
fetchExecutions().firstOrNull()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ internal val ExecutionType.stagesTableName: Table<Record>
}

/**
* Selects all stages for an [executionType] and [executionId].
* Selects all stages for an [executionType] and List [executionIds].
*/
internal fun DSLContext.selectExecutionStages(executionType: ExecutionType, executionId: String) =
select(field("body"))
internal fun DSLContext.selectExecutionStages(executionType: ExecutionType, executionIds: Collection<String>) =
select(field("execution_id"), field("body"))
.from(executionType.stagesTableName)
.where(field("execution_id").eq(executionId))
.where(field("execution_id").`in`(*executionIds.toTypedArray()))
.fetch()
.intoResultSet()
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class OldPipelineCleanupPollingNotificationAgentSpec extends Specification {

def setupSpec() {
currentDatabase = initDatabase()
executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new TransactionRetryProperties(), 10)
executionRepository = new SqlExecutionRepository("test", currentDatabase.context, mapper, new TransactionRetryProperties(), 10, 100)
}

@Ignore("Broken by H2's inversion of order by desc when using a limit")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ class SqlExecutionRepositorySpec extends ExecutionRepositoryTck<SqlExecutionRepo

@Override
SqlExecutionRepository createExecutionRepository() {
new SqlExecutionRepository("test", currentDatabase.context, mapper, new TransactionRetryProperties(), 10)
new SqlExecutionRepository("test", currentDatabase.context, mapper, new TransactionRetryProperties(), 10, 100)
}

@Override
SqlExecutionRepository createExecutionRepositoryPrevious() {
new SqlExecutionRepository("test", previousDatabase.context, mapper, new TransactionRetryProperties(), 10)
new SqlExecutionRepository("test", previousDatabase.context, mapper, new TransactionRetryProperties(), 10, 100)
}

def "can store a new pipeline"() {
Expand Down

0 comments on commit c5b5547

Please sign in to comment.