Skip to content

Commit

Permalink
Fix UpdateHandoverNamespaces
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Jan 25, 2023
1 parent 5175ab6 commit 3184cfe
Showing 1 changed file with 16 additions and 18 deletions.
34 changes: 16 additions & 18 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,9 +580,15 @@ func (s *ContextImpl) UpdateNamespaceNotificationVersion(namespaceNotificationVe

func (s *ContextImpl) UpdateHandoverNamespaces(ns *namespace.Namespace, deletedFromDb bool) {
nsName := ns.Name()
// NOTE: replication state field won't be replicated and currently we only update a namespace
// to handover state from active cluster, so the second condition will always be true. Adding
// it here to be more safe in case above assumption no longer holds in the future.
isHandoverNamespace := ns.IsGlobalNamespace() &&
ns.ActiveInCluster(s.GetClusterMetadata().GetCurrentClusterName()) &&
ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER

s.wLock()
if deletedFromDb {
if deletedFromDb || !isHandoverNamespace {
delete(s.handoverNamespaces, ns.Name())
s.wUnlock()
return
Expand All @@ -595,23 +601,15 @@ func (s *ContextImpl) UpdateHandoverNamespaces(ns *namespace.Namespace, deletedF
maxReplicationTaskID = pendingMaxReplicationTaskID
}

// NOTE: replication state field won't be replicated and currently we only update a namespace
// to handover state from active cluster, so the second condition will always be true. Adding
// it here to be more safe in case above assumption no longer holds in the future.
if ns.IsGlobalNamespace() &&
ns.ActiveInCluster(s.GetClusterMetadata().GetCurrentClusterName()) &&
ns.ReplicationState() == enums.REPLICATION_STATE_HANDOVER {

if handover, ok := s.handoverNamespaces[nsName]; ok {
if handover.NotificationVersion < ns.NotificationVersion() {
handover.NotificationVersion = ns.NotificationVersion()
handover.MaxReplicationTaskID = maxReplicationTaskID
}
} else {
s.handoverNamespaces[nsName] = &namespaceHandOverInfo{
NotificationVersion: ns.NotificationVersion(),
MaxReplicationTaskID: maxReplicationTaskID,
}
if handover, ok := s.handoverNamespaces[nsName]; ok {
if handover.NotificationVersion < ns.NotificationVersion() {
handover.NotificationVersion = ns.NotificationVersion()
handover.MaxReplicationTaskID = maxReplicationTaskID
}
} else {
s.handoverNamespaces[nsName] = &namespaceHandOverInfo{
NotificationVersion: ns.NotificationVersion(),
MaxReplicationTaskID: maxReplicationTaskID,
}
}

Expand Down

0 comments on commit 3184cfe

Please sign in to comment.