Skip to content

Commit

Permalink
Fix failover queue creation after shard reload (#2862)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored and meiliang86 committed Jun 1, 2022
1 parent 79102bf commit 360062b
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
15 changes: 10 additions & 5 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,11 +345,16 @@ func (e *historyEngineImpl) registerNamespaceFailoverCallback() {

newNotificationVersion := nextNamespaces[len(nextNamespaces)-1].NotificationVersion() + 1
shardNotificationVersion := e.shard.GetNamespaceNotificationVersion()
if newNotificationVersion <= shardNotificationVersion {
// skip if this is known version. this could happen once after shard reload because we use
// 0 as initialNotificationVersion when RegisterNamespaceChangeCallback.
return
}

// 1. We can't return when newNotificationVersion == shardNotificationVersion,
// since we don't know whether the previous failover queue processing has finished.
// 2. We could return when newNotificationVersion < shardNotificationVersion, but that check
// is essentially the same as the check in the failover predicate: because
// failoverNotificationVersion <= NotificationVersion,
// no notification version can make both
// newNotificationVersion < shardNotificationVersion and
// failoverNotificationVersion >= shardNotificationVersion true at the same time.
// In other words, whenever that check would return, no namespace would pass the failover predicate anyway.

failoverNamespaceIDs := map[string]struct{}{}
for _, nextNamespace := range nextNamespaces {
Expand Down
10 changes: 4 additions & 6 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,10 @@ func (s *ContextImpl) UpdateQueueClusterAckLevel(
s.wLock()
defer s.wUnlock()

if levels, ok := s.shardInfo.FailoverLevels[category]; ok {
for _, failoverLevel := range levels {
if ackLevel.CompareTo(failoverLevel.CurrentLevel) > 0 {
ackLevel = failoverLevel.CurrentLevel
}
}
if levels, ok := s.shardInfo.FailoverLevels[category]; ok && len(levels) != 0 {
// do not move the ack level while a failover queue exists,
// so that the failover queue can be re-created after a shard reload
return nil
}

// backward compatibility (for rollback)
Expand Down

0 comments on commit 360062b

Please sign in to comment.