Skip to content

Commit

Permalink
perf(peering): improve peering perf (#3552)
Browse files Browse the repository at this point in the history
A big chunk of time was spent storing all stages of running executions.
This is not necessary because most of those stages don't change (only about 8% change with a ~30s agent interval)
Only store stages that have a different `updated_at` timestamp (or those that don't exist).
This improves peering performance by about 25%
  • Loading branch information
marchello2000 committed Mar 30, 2020
1 parent 6cd5a6e commit acc85ec
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,29 +131,36 @@ open class ExecutionCopier(
// We can't have the execution update between the time we capture its stages and the time we capture the execution itself
// It's totally fine for the stages to be "newer" than the execution since that will be fixed up in the next agent run
val executionRows = srcDB.getExecutions(executionType, idsToMigrate)
val stagesToMigrate = srcDB.getStageIdsForExecutions(executionType, idsToMigrate)
val stagesInSource = srcDB.getStageIdsForExecutions(executionType, idsToMigrate)
val stagesInSourceHash = stagesInSource.map { it.id }.toHashSet()

// Step 1: Copy over stages before the executions themselves -
// if we saved executions first, the user could request an execution, but it wouldn't have any stages yet

// It is possible that the source stage list has mutated. Normally, this is only possible when an execution
// is restarted (e.g. restarting a deploy stage will delete all its synthetic stages and start over).
// We first delete all stages that are no longer in our peer, then we update/copy all other stages
val stagesPresent = destDB.getStageIdsForExecutions(executionType, idsToMigrate)
val stagesToMigrateHash = stagesToMigrate.toHashSet()
val stagesToDelete = stagesPresent.filter { !stagesToMigrateHash.contains(it) }
if (stagesToDelete.any()) {
destDB.deleteStages(executionType, stagesToDelete)
peeringMetrics.incrementNumStagesDeleted(executionType, stagesToDelete.size)
val stagesInDest = destDB.getStageIdsForExecutions(executionType, idsToMigrate)
val stagesInDestMap = stagesInDest.map { it.id to it }.toMap()

val stageIdsToDelete = stagesInDest.filter { !stagesInSourceHash.contains(it.id) }.map { it.id }
if (stageIdsToDelete.any()) {
destDB.deleteStages(executionType, stageIdsToDelete)
peeringMetrics.incrementNumStagesDeleted(executionType, stageIdsToDelete.size)
}

for (chunk in stagesToMigrate.chunked(chunkSize)) {
val stageIdsToMigrate = stagesInSource
.filter { key -> stagesInDestMap[key.id]?.updated_at ?: 0 < key.updated_at }
.map { it.id }

for (chunk in stageIdsToMigrate.chunked(chunkSize)) {
val stageRows = srcDB.getStages(executionType, chunk)
destDB.loadRecords(getStagesTable(executionType).name, stageRows)
}

// Step 2: Copy all executions
executionRows.forEach { r -> r.set(DSL.field("partition"), peeredId)
executionRows.forEach { r ->
r.set(DSL.field("partition"), peeredId)
latestUpdatedAt = max(latestUpdatedAt, r.get("updated_at", Long::class.java))
}
destDB.loadRecords(getExecutionTable(executionType).name, executionRows)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ open class MySqlRawAccess(
}
}

override fun getStageIdsForExecutions(executionType: ExecutionType, executionIds: List<String>): List<String> {
override fun getStageIdsForExecutions(executionType: ExecutionType, executionIds: List<String>): List<ExecutionDiffKey> {
return withPool(poolName) {
jooq
.select(field("id"))
.select(field("id"), field("updated_at"))
.from(getStagesTable(executionType))
.where(field("execution_id").`in`(*executionIds.toTypedArray()))
.fetch(field("id"), String::class.java)
.fetchInto(ExecutionDiffKey::class.java)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,12 +165,10 @@ class PeeringAgent(
val pipelineIdsToMigrate = completedPipelineKeys
.filter { key -> migratedPipelineKeysMap[key.id]?.updated_at ?: 0 < key.updated_at }
.map { it.id }
.toList()

val pipelineIdsToDelete = migratedPipelineKeys
.filter { key -> !completedPipelineKeysMap.containsKey(key.id) }
.map { it.id }
.toList()

fun getLatestCompletedUpdatedTime() =
(completedPipelineKeys.map { it.updated_at }.max() ?: 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ abstract class SqlRawAccess(
/**
* Returns a list of stage IDs that belong to the given executions
*/
abstract fun getStageIdsForExecutions(executionType: ExecutionType, executionIds: List<String>): List<String>
abstract fun getStageIdsForExecutions(executionType: ExecutionType, executionIds: List<String>): List<ExecutionDiffKey>

/**
* Returns (a list of) full execution DB records with given execution IDs
Expand Down

0 comments on commit acc85ec

Please sign in to comment.