Skip to content

Commit

Permalink
Fix getRemoteClusterInfo race condition (#2971)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 committed Jun 8, 2022
1 parent 8610945 commit 9de6b45
Showing 1 changed file with 8 additions and 8 deletions.
16 changes: 8 additions & 8 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func (s *ContextImpl) updateScheduledTaskMaxReadLevel(cluster string) tasks.Key

currentTime := s.timeSource.Now()
if cluster != "" && cluster != s.GetClusterMetadata().GetCurrentClusterName() {
currentTime = s.getRemoteClusterInfoLocked(cluster).CurrentTime
currentTime = s.getOrUpdateRemoteClusterInfoLocked(cluster).CurrentTime
}

newMaxReadLevel := currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(time.Millisecond)
Expand Down Expand Up @@ -419,7 +419,7 @@ func (s *ContextImpl) UpdateRemoteClusterInfo(
s.wLock()
defer s.wUnlock()

remoteClusterInfo := s.getRemoteClusterInfoLocked(cluster)
remoteClusterInfo := s.getOrUpdateRemoteClusterInfoLocked(cluster)
remoteClusterInfo.AckedReplicationTaskID = ackTaskID
remoteClusterInfo.AckedReplicationTimestamp = ackTimestamp
}
Expand Down Expand Up @@ -1306,20 +1306,20 @@ func (s *ContextImpl) SetCurrentTime(cluster string, currentTime time.Time) {
s.wLock()
defer s.wUnlock()
if cluster != s.GetClusterMetadata().GetCurrentClusterName() {
prevTime := s.getRemoteClusterInfoLocked(cluster).CurrentTime
prevTime := s.getOrUpdateRemoteClusterInfoLocked(cluster).CurrentTime
if prevTime.Before(currentTime) {
s.getRemoteClusterInfoLocked(cluster).CurrentTime = currentTime
s.getOrUpdateRemoteClusterInfoLocked(cluster).CurrentTime = currentTime
}
} else {
panic("Cannot set current time for current cluster")
}
}

func (s *ContextImpl) GetCurrentTime(cluster string) time.Time {
s.rLock()
defer s.rUnlock()
if cluster != s.GetClusterMetadata().GetCurrentClusterName() {
return s.getRemoteClusterInfoLocked(cluster).CurrentTime
s.wLock()
defer s.wUnlock()
return s.getOrUpdateRemoteClusterInfoLocked(cluster).CurrentTime
}
return s.timeSource.Now().UTC()
}
Expand Down Expand Up @@ -1716,7 +1716,7 @@ func (s *ContextImpl) GetReplicationStatus(cluster []string) (map[string]*histor
return remoteClusters, handoverNamespaces, nil
}

func (s *ContextImpl) getRemoteClusterInfoLocked(clusterName string) *remoteClusterInfo {
func (s *ContextImpl) getOrUpdateRemoteClusterInfoLocked(clusterName string) *remoteClusterInfo {
if info, ok := s.remoteClusterInfos[clusterName]; ok {
return info
}
Expand Down

0 comments on commit 9de6b45

Please sign in to comment.