fix(peering): Capture the source execution BEFORE the source stage during peering (#3467)

It's important that we capture the executions BEFORE we capture their stages because we use the execution's `updated_at` for our diff calculation.
Before this change, it was possible to see a peered execution that was `SUCCEEDED` while a stage within that execution was still showing as `RUNNING`.
This happened because we would grab the stages from the source DB, persist them to the dest DB (which can take some time), and only then grab the executions and persist those. In that flow, our peer could have updated a stage between the time we captured the stages and the time we captured the execution, and we would miss that update forever (if the execution had completed by then, its final `updated_at` would already be recorded, so nothing would trigger another copy).

It's totally fine for the stages to be "newer" than their execution, since that will be fixed up in the next agent run.
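To make the ordering argument concrete, here is a minimal, self-contained sketch of the race described above and of the watermark logic it protects; `ExecutionRow`, the literal timestamps, and the watermark variables are hypothetical illustrations, not Orca's actual types.

```kotlin
// Hypothetical sketch of why capturing the execution row first is safe.
// ExecutionRow and the timestamps below are illustrative only.
data class ExecutionRow(val id: String, val status: String, val updatedAt: Long)

fun main() {
  // Watermark carried over from the previous agent run.
  val previousWatermark = 100L

  // t=110: we snapshot the execution first (still RUNNING at this point).
  val capturedExecution = ExecutionRow("ex-1", "RUNNING", updatedAt = 110L)

  // t=120: while we are still copying its stages, the peer completes the execution.
  val peerExecution = ExecutionRow("ex-1", "SUCCEEDED", updatedAt = 120L)

  // The new watermark is derived from the snapshot we captured, not from the peer's latest state.
  val newWatermark = maxOf(previousWatermark, capturedExecution.updatedAt) // 110

  // Because the diff keys off updated_at, the completed execution (120 > 110) is
  // still picked up on the next run, so the stale stage copy gets fixed up.
  println("re-copied on next run: ${peerExecution.updatedAt > newWatermark}") // true
}
```

Under the old ordering, the execution snapshot would have been taken after the slow stage copy, so the watermark would already be 120 and the stale stage copy would never be revisited.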

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
marchello2000 and mergify[bot] committed Feb 27, 2020
1 parent 2dc550c commit 7cd879c
Showing 1 changed file with 13 additions and 6 deletions.
@@ -125,9 +125,17 @@ open class ExecutionCopier(
   private fun copyExecutionChunk(executionType: Execution.ExecutionType, idsToMigrate: List<String>, state: ExecutionState): MigrationChunkResult {
     var latestUpdatedAt = 0L
     try {
-      // Step 1: Copy all stages
+      // Step 0: Capture the source data for executions and their stages
+      // NOTE: it's important that we capture the executions BEFORE we capture their stages
+      // The reason is that we key our diff off the updated_at timestamp of the execution.
+      // We can't have the execution update in the time we capture its stages and the time we capture the execution it self
+      // It's totally fine for the stages to be "newer" than the execution since that will be fixed up in the next agent run
+      val executionRows = srcDB.getExecutions(executionType, idsToMigrate)
       val stagesToMigrate = srcDB.getStageIdsForExecutions(executionType, idsToMigrate)
 
+      // Step 1: Copy over stages before the executions themselves -
+      // if we saved executions first the user could request an execution but it wouldn't have any stages yet
+
       // It is possible that the source stage list has mutated. Normally, this is only possible when an execution
       // is restarted (e.g. restarting a deploy stage will delete all its synthetic stages and start over).
       // We delete all stages that are no longer in our peer first, then we update/copy all other stages
@@ -140,16 +148,15 @@
       }
 
       for (chunk in stagesToMigrate.chunked(chunkSize)) {
-        val rows = srcDB.getStages(executionType, chunk)
-        destDB.loadRecords(getStagesTable(executionType).name, rows)
+        val stageRows = srcDB.getStages(executionType, chunk)
+        destDB.loadRecords(getStagesTable(executionType).name, stageRows)
       }
 
       // Step 2: Copy all executions
-      val rows = srcDB.getExecutions(executionType, idsToMigrate)
-      rows.forEach { r -> r.set(DSL.field("partition"), peeredId)
+      executionRows.forEach { r -> r.set(DSL.field("partition"), peeredId)
         latestUpdatedAt = max(latestUpdatedAt, r.get("updated_at", Long::class.java))
       }
-      destDB.loadRecords(getExecutionTable(executionType).name, rows)
+      destDB.loadRecords(getExecutionTable(executionType).name, executionRows)
       peeringMetrics.incrementNumPeered(executionType, state, idsToMigrate.size)
 
       return MigrationChunkResult(latestUpdatedAt, idsToMigrate.size, hadErrors = false)
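The diff's comment about mutated stage lists ("We delete all stages that are no longer in our peer first, then we update/copy all other stages") only appears as context above, so here is a minimal, hedged sketch of that delete-then-copy idea; `reconcileStages` and all of its parameters are illustrative names, not the actual ExecutionCopier API.

```kotlin
// Illustrative sketch only: reconcile destination stages with the peer's (source) stages.
// None of these names come from the real ExecutionCopier; they just show the idea of
// "delete stages the peer no longer has, then copy/update everything else" in chunks.
fun reconcileStages(
  srcStageIds: Set<String>,
  destStageIds: Set<String>,
  deleteFromDest: (Collection<String>) -> Unit,
  copyChunk: (List<String>) -> Unit,
  chunkSize: Int = 100
) {
  // Stages present in the destination but gone from the peer must be removed first.
  val orphaned = destStageIds - srcStageIds
  if (orphaned.isNotEmpty()) {
    deleteFromDest(orphaned)
  }

  // Then copy/update every stage the peer currently has, in manageable chunks.
  srcStageIds.chunked(chunkSize).forEach { chunk ->
    copyChunk(chunk)
  }
}
```

A restart is the main case where `orphaned` is non-empty: the peer deletes an execution's synthetic stages and recreates them, so the destination has to drop the old rows before copying the peer's current set.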
