Skip to content

Commit

Permalink
Ensure queue processor cluster ack level is below failover ack level (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed May 5, 2022
1 parent f62095d commit 91b099b
Showing 1 changed file with 11 additions and 0 deletions.
11 changes: 11 additions & 0 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ func (s *ContextImpl) UpdateQueueAckLevel(
s.wLock()
defer s.wUnlock()

// the ack level is already the min ack level across all types of
// queue processors: active, passive, failover

// backward compatibility (for rollback)
switch category {
case tasks.CategoryTransfer:
Expand Down Expand Up @@ -379,6 +382,14 @@ func (s *ContextImpl) UpdateQueueClusterAckLevel(
s.wLock()
defer s.wUnlock()

if levels, ok := s.shardInfo.FailoverLevels[category]; ok {
for _, failoverLevel := range levels {
if ackLevel.CompareTo(failoverLevel.CurrentLevel) > 0 {
ackLevel = failoverLevel.CurrentLevel
}
}
}

// backward compatibility (for rollback)
switch category {
case tasks.CategoryTransfer:
Expand Down

0 comments on commit 91b099b

Please sign in to comment.