Skip to content

Commit

Permalink
Clean up history branch ancestor operation (#3046)
Browse files Browse the repository at this point in the history
* Clean up history branch ancestor operation
  • Loading branch information
yux0 authored and alexshtin committed Jul 8, 2022
1 parent fb208d5 commit 3278111
Showing 1 changed file with 16 additions and 17 deletions.
33 changes: 16 additions & 17 deletions service/history/nDCHistoryReplicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,18 +866,17 @@ func (r *nDCHistoryReplicatorImpl) backfillHistory(
lastEventID,
lastEventVersion),
)
var lastHistoryBatch *commonpb.DataBlob
prevTxnID := common.EmptyVersion
historyBranch, err := serialization.HistoryBranchFromBlob(branchToken, enumspb.ENCODING_TYPE_PROTO3.String())
if err != nil {
return nil, err
}
latestBranchID := historyBranch.GetBranchId()
var prevBranchID string

sortedAncestors := copyAndSortAncestors(historyBranch.GetAncestors())
prevTxnID := common.EmptyVersion
var lastHistoryBatch *commonpb.DataBlob
var prevBranchID string
sortedAncestors := sortAncestors(historyBranch.GetAncestors())
sortedAncestorsIdx := 0
historyBranch.Ancestors = nil
var ancestors []*persistencespb.HistoryBranchRange

BackfillLoop:
for remoteHistoryIterator.HasNext() {
Expand All @@ -891,17 +890,18 @@ BackfillLoop:
continue BackfillLoop
}

branchID := historyBranch.GetBranchId()
if sortedAncestorsIdx < len(sortedAncestors) {
currentAncestor := sortedAncestors[sortedAncestorsIdx]
if historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
// update ancestor
historyBranch.Ancestors = append(historyBranch.Ancestors, currentAncestor)
ancestors = append(ancestors, currentAncestor)
sortedAncestorsIdx++
}
if sortedAncestorsIdx < len(sortedAncestors) {
// use ancestor branch id
currentAncestor = sortedAncestors[sortedAncestorsIdx]
historyBranch.BranchId = currentAncestor.GetBranchId()
branchID = currentAncestor.GetBranchId()
if historyBlob.nodeID < currentAncestor.GetBeginNodeId() || historyBlob.nodeID >= currentAncestor.GetEndNodeId() {
return nil, serviceerror.NewInternal(
fmt.Sprintf("The backfill history blob node id %d is not in acestoer range [%d, %d]",
Expand All @@ -910,13 +910,14 @@ BackfillLoop:
currentAncestor.GetEndNodeId()),
)
}
} else {
// no more ancestor, use the latest branch ID
historyBranch.BranchId = latestBranchID
}
}

filteredHistoryBranch, err := serialization.HistoryBranchToBlob(historyBranch)
filteredHistoryBranch, err := serialization.HistoryBranchToBlob(&persistencespb.HistoryBranch{
TreeId: historyBranch.GetTreeId(),
BranchId: branchID,
Ancestors: ancestors,
})
if err != nil {
return nil, err
}
Expand All @@ -926,7 +927,7 @@ BackfillLoop:
}
_, err = r.shard.GetExecutionManager().AppendRawHistoryNodes(ctx, &persistence.AppendRawHistoryNodesRequest{
ShardID: r.shard.GetShardID(),
IsNewBranch: prevBranchID != historyBranch.BranchId,
IsNewBranch: prevBranchID != branchID,
BranchToken: filteredHistoryBranch.GetData(),
History: historyBlob.rawHistory,
PrevTransactionID: prevTxnID,
Expand All @@ -942,7 +943,7 @@ BackfillLoop:
return nil, err
}
prevTxnID = txnID
prevBranchID = historyBranch.BranchId
prevBranchID = branchID
lastHistoryBatch = historyBlob.rawHistory
}

Expand All @@ -954,9 +955,7 @@ BackfillLoop:
return lastEventTime, nil
}

func copyAndSortAncestors(input []*persistencespb.HistoryBranchRange) []*persistencespb.HistoryBranchRange {
ans := make([]*persistencespb.HistoryBranchRange, len(input))
copy(ans, input)
func sortAncestors(ans []*persistencespb.HistoryBranchRange) []*persistencespb.HistoryBranchRange {
if len(ans) > 0 {
// sort ans based onf EndNodeID so that we can set BeginNodeID
sort.Slice(ans, func(i, j int) bool { return ans[i].GetEndNodeId() < ans[j].GetEndNodeId() })
Expand Down

0 comments on commit 3278111

Please sign in to comment.