Fix scheduled queue max read level update for single processor (#3474)
yycptt committed Oct 12, 2022
1 parent 32ea0bf commit 16184da
Showing 16 changed files with 81 additions and 31 deletions.
1 change: 1 addition & 0 deletions service/history/queues/queue_base.go
@@ -296,6 +296,7 @@ func (p *queueBase) processNewRange() {
newMaxKey := p.shard.GetQueueExclusiveHighReadWatermark(
p.category,
p.shard.GetClusterMetadata().GetCurrentClusterName(),
true,
)

if !p.nonReadableScope.CanSplitByRange(newMaxKey) {
4 changes: 2 additions & 2 deletions service/history/replication/ack_manager.go
@@ -154,7 +154,7 @@ func (p *ackMgrImpl) GetMaxTaskInfo() (int64, time.Time) {
// use ImmediateTaskMaxReadLevel which is the max task id of any immediate task queues.
// ImmediateTaskMaxReadLevel will be the lower bound of the new range_id if the shard reloads. Remote cluster will quickly (in
// a few seconds) ack to the latest ImmediateTaskMaxReadLevel if there are no replication tasks at all.
taskID := p.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, p.currentClusterName).Prev().TaskID
taskID := p.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, p.currentClusterName, false).Prev().TaskID
maxTaskID = &taskID
}
maxVisibilityTimestamp := p.maxTaskVisibilityTimestamp
@@ -319,7 +319,7 @@ func (p *ackMgrImpl) taskIDsRange(
lastReadMessageID int64,
) (minTaskID int64, maxTaskID int64) {
minTaskID = lastReadMessageID
maxTaskID = p.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, p.currentClusterName).Prev().TaskID
maxTaskID = p.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, p.currentClusterName, false).Prev().TaskID

p.Lock()
defer p.Unlock()
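A side note on the .Prev().TaskID pattern used above (and at several other call sites in this commit): the shard hands back an exclusive upper bound, so a caller that wants the highest task ID that is safe to read or ack steps back by one key. A minimal, self-contained sketch with stand-in types (not the repo's real tasks.Key):

package main

import "fmt"

// immediateKey is a stand-in for tasks.Key as used for immediate tasks.
type immediateKey struct{ TaskID int64 }

// Prev steps back to the previous key, mirroring how the callers above turn
// an exclusive watermark into an inclusive maximum task ID.
func (k immediateKey) Prev() immediateKey { return immediateKey{TaskID: k.TaskID - 1} }

func main() {
	// Assume the watermark says "everything below task ID 100 is readable".
	exclusiveWatermark := immediateKey{TaskID: 100}
	fmt.Println(exclusiveWatermark.Prev().TaskID) // 99: highest readable task ID
}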
16 changes: 8 additions & 8 deletions service/history/replication/ack_manager_test.go
@@ -150,7 +150,7 @@ func (s *ackManagerSuite) TestNotifyNewTasks_Initialized() {

func (s *ackManagerSuite) TestTaskIDRange_NotInitialized() {
s.replicationAckManager.sanityCheckTime = time.Time{}
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName, false).Prev().TaskID
expectMinTaskID := expectMaxTaskID - 100
s.replicationAckManager.maxTaskID = convert.Int64Ptr(expectMinTaskID - 100)

@@ -165,8 +165,8 @@ func (s *ackManagerSuite) TestTaskIDRange_Initialized_UseHighestReplicationTaskI
now := time.Now().UTC()
sanityCheckTime := now.Add(2 * time.Minute)
s.replicationAckManager.sanityCheckTime = sanityCheckTime
expectMinTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 50
expectMinTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName, false).TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName, false).TaskID - 50
s.replicationAckManager.maxTaskID = convert.Int64Ptr(expectMaxTaskID)

minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(expectMinTaskID)
@@ -180,8 +180,8 @@ func (s *ackManagerSuite) TestTaskIDRange_Initialized_NoHighestReplicationTaskID
now := time.Now().UTC()
sanityCheckTime := now.Add(2 * time.Minute)
s.replicationAckManager.sanityCheckTime = sanityCheckTime
expectMinTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID
expectMinTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName, false).Prev().TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName, false).Prev().TaskID
s.replicationAckManager.maxTaskID = nil

minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(expectMinTaskID)
@@ -195,9 +195,9 @@ func (s *ackManagerSuite) TestTaskIDRange_Initialized_UseHighestTransferTaskID()
now := time.Now().UTC()
sanityCheckTime := now.Add(-2 * time.Minute)
s.replicationAckManager.sanityCheckTime = sanityCheckTime
expectMinTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).Prev().TaskID
s.replicationAckManager.maxTaskID = convert.Int64Ptr(s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName).TaskID - 50)
expectMinTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName, false).Prev().TaskID - 100
expectMaxTaskID := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName, false).Prev().TaskID
s.replicationAckManager.maxTaskID = convert.Int64Ptr(s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, s.replicationAckManager.currentClusterName, false).TaskID - 50)

minTaskID, maxTaskID := s.replicationAckManager.taskIDsRange(expectMinTaskID)
s.Equal(expectMinTaskID, minTaskID)
4 changes: 2 additions & 2 deletions service/history/replication/task_processor_manager.go
@@ -236,7 +236,7 @@ func (r *taskProcessorManagerImpl) cleanupReplicationTasks() error {

var ackLevel int64
if clusterName == currentCluster {
ackLevel = r.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, clusterName).TaskID
ackLevel = r.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, clusterName, false).TaskID
} else {
ackLevel = r.shard.GetQueueClusterAckLevel(tasks.CategoryReplication, clusterName).TaskID
}
@@ -255,7 +255,7 @@ func (r *taskProcessorManagerImpl) cleanupReplicationTasks() error {
metrics.ReplicationTaskFetcherScope,
).RecordDistribution(
metrics.ReplicationTasksLag,
int(r.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, currentCluster).Prev().TaskID-*minAckedTaskID),
int(r.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, currentCluster, false).Prev().TaskID-*minAckedTaskID),
)

ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
4 changes: 2 additions & 2 deletions service/history/replication/task_processor_manager_test.go
@@ -135,7 +135,7 @@ func (s *taskProcessorManagerSuite) TearDownTest() {

func (s *taskProcessorManagerSuite) TestCleanupReplicationTask_Noop() {
ackedTaskID := int64(12345)
s.mockShard.EXPECT().GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, cluster.TestCurrentClusterName).Return(tasks.NewImmediateKey(ackedTaskID))
s.mockShard.EXPECT().GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, cluster.TestCurrentClusterName, false).Return(tasks.NewImmediateKey(ackedTaskID))
s.mockShard.EXPECT().GetQueueClusterAckLevel(tasks.CategoryReplication, cluster.TestAlternativeClusterName).Return(tasks.NewImmediateKey(ackedTaskID))

s.taskProcessorManager.minTxAckedTaskID = ackedTaskID
@@ -145,7 +145,7 @@ func (s *taskProcessorManagerSuite) TestCleanupReplicationTask_Noop() {

func (s *taskProcessorManagerSuite) TestCleanupReplicationTask_Cleanup() {
ackedTaskID := int64(12345)
s.mockShard.EXPECT().GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, cluster.TestCurrentClusterName).Return(tasks.NewImmediateKey(ackedTaskID)).Times(2)
s.mockShard.EXPECT().GetQueueExclusiveHighReadWatermark(tasks.CategoryReplication, cluster.TestCurrentClusterName, false).Return(tasks.NewImmediateKey(ackedTaskID)).Times(2)
s.mockShard.EXPECT().GetQueueClusterAckLevel(tasks.CategoryReplication, cluster.TestAlternativeClusterName).Return(tasks.NewImmediateKey(ackedTaskID))
s.taskProcessorManager.minTxAckedTaskID = ackedTaskID - 1
s.mockExecutionManager.EXPECT().RangeCompleteHistoryTasks(
4 changes: 3 additions & 1 deletion service/history/shard/context.go
@@ -75,7 +75,9 @@ type (
GenerateTaskID() (int64, error)
GenerateTaskIDs(number int) ([]int64, error)

GetQueueExclusiveHighReadWatermark(category tasks.Category, cluster string) tasks.Key
// TODO: remove the cluster and singleProcessorMode parameters after deprecating the old task processing logic.
// In the multi-cursor world, there's only one maxReadLevel for the scheduled queue across all clusters.
GetQueueExclusiveHighReadWatermark(category tasks.Category, cluster string, singleProcessorMode bool) tasks.Key
GetQueueAckLevel(category tasks.Category) tasks.Key
UpdateQueueAckLevel(category tasks.Category, ackLevel tasks.Key) error
GetQueueClusterAckLevel(category tasks.Category, cluster string) tasks.Key
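For orientation, the sketch below (not part of the commit) shows how callers are expected to use the new singleProcessorMode argument, based on the call sites changed elsewhere in this commit. The wrapper function names are made up, and the snippet assumes the surrounding service/history/shard package, where the interface extended above is assumed to be named Context:

// A single-processor scheduled queue (e.g. the multi-cursor queueBase): one
// processor covers all clusters, so it passes true and reads using the
// current cluster's name.
func maxReadLevelForSingleProcessorQueue(shardCtx Context) tasks.Key {
	return shardCtx.GetQueueExclusiveHighReadWatermark(
		tasks.CategoryTimer,
		shardCtx.GetClusterMetadata().GetCurrentClusterName(),
		true, // singleProcessorMode
	)
}

// A per-cluster processor (e.g. a standby timer or transfer processor, or the
// replication ack manager): each cluster runs its own processor, so it passes
// false and only that cluster's scheduled max read level is advanced.
func maxReadLevelForPerClusterProcessor(shardCtx Context, clusterName string) tasks.Key {
	return shardCtx.GetQueueExclusiveHighReadWatermark(
		tasks.CategoryTimer,
		clusterName,
		false, // singleProcessorMode
	)
}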
23 changes: 19 additions & 4 deletions service/history/shard/context_impl.go
@@ -265,12 +265,13 @@ func (s *ContextImpl) GenerateTaskIDs(number int) ([]int64, error) {
func (s *ContextImpl) GetQueueExclusiveHighReadWatermark(
category tasks.Category,
cluster string,
singleProcessorMode bool,
) tasks.Key {
switch categoryType := category.Type(); categoryType {
case tasks.CategoryTypeImmediate:
return s.getImmediateTaskExclusiveMaxReadLevel()
case tasks.CategoryTypeScheduled:
return s.updateScheduledTaskMaxReadLevel(cluster)
return s.updateScheduledTaskMaxReadLevel(cluster, singleProcessorMode)
default:
panic(fmt.Sprintf("invalid task category type: %v", categoryType))
}
@@ -293,7 +294,10 @@ func (s *ContextImpl) getScheduledTaskMaxReadLevel(cluster string) tasks.Key {
return tasks.NewKey(s.scheduledTaskMaxReadLevelMap[cluster], 0)
}

func (s *ContextImpl) updateScheduledTaskMaxReadLevel(cluster string) tasks.Key {
func (s *ContextImpl) updateScheduledTaskMaxReadLevel(
cluster string,
singleProcessorMode bool,
) tasks.Key {
s.wLock()
defer s.wUnlock()

@@ -311,7 +315,18 @@ func (s *ContextImpl) updateScheduledTaskMaxReadLevel(cluster string) tasks.Key
}

newMaxReadLevel := currentTime.Add(s.config.TimerProcessorMaxTimeShift()).Truncate(time.Millisecond)
s.scheduledTaskMaxReadLevelMap[cluster] = util.MaxTime(s.scheduledTaskMaxReadLevelMap[cluster], newMaxReadLevel)
if singleProcessorMode {
// When generating scheduled tasks, the task's timestamp will be compared to the namespace's active cluster's
// maxReadLevel to avoid generating a task before maxReadLevel.
// But when there's a single processor, the queue only uses the current cluster's maxReadLevel.
// So update the maxReadLevel map for all clusters to ensure scheduled tasks won't be lost.
for key := range s.scheduledTaskMaxReadLevelMap {
s.scheduledTaskMaxReadLevelMap[key] = util.MaxTime(s.scheduledTaskMaxReadLevelMap[key], newMaxReadLevel)
}
} else {
s.scheduledTaskMaxReadLevelMap[cluster] = util.MaxTime(s.scheduledTaskMaxReadLevelMap[cluster], newMaxReadLevel)
}

return tasks.NewKey(s.scheduledTaskMaxReadLevelMap[cluster], 0)
}

@@ -1327,7 +1342,7 @@ func (s *ContextImpl) allocateTaskIDsLocked(
// if scheduled task, check if fire time is in the past
if category.Type() == tasks.CategoryTypeScheduled {
ts := task.GetVisibilityTime()
if task.GetVersion() != common.EmptyVersion {
if task.GetVersion() != common.EmptyVersion && category.ID() == tasks.CategoryIDTimer {
// cannot use version to determine the corresponding cluster for timer task
// this is because during failover, timer task should be created as active
// or otherwise, failover + active processing logic may not pick up the task.
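To make the single-processor branch added above concrete, here is a minimal, self-contained model of the update rule (not part of the commit; the map, function, and cluster names are stand-ins for scheduledTaskMaxReadLevelMap and util.MaxTime):

package main

import (
	"fmt"
	"time"
)

// maxTime is a stand-in for util.MaxTime.
func maxTime(a, b time.Time) time.Time {
	if a.After(b) {
		return a
	}
	return b
}

// updateScheduledMaxReadLevel models the rule in updateScheduledTaskMaxReadLevel:
// in single-processor mode every cluster's entry advances, because the lone
// queue processor only consults the current cluster's level when reading;
// otherwise only the requested cluster's entry moves.
func updateScheduledMaxReadLevel(
	levels map[string]time.Time,
	cluster string,
	newMaxReadLevel time.Time,
	singleProcessorMode bool,
) time.Time {
	if singleProcessorMode {
		for key := range levels {
			levels[key] = maxTime(levels[key], newMaxReadLevel)
		}
	} else {
		levels[cluster] = maxTime(levels[cluster], newMaxReadLevel)
	}
	return levels[cluster]
}

func main() {
	base := time.Now().Truncate(time.Millisecond)
	levels := map[string]time.Time{"active": base, "standby": base}

	// Per-cluster mode: only "active" moves; "standby" can fall behind.
	updateScheduledMaxReadLevel(levels, "active", base.Add(time.Second), false)
	fmt.Println(levels["standby"].Equal(base)) // true

	// Single-processor mode: both entries advance together, so a scheduled
	// task stamped against the standby cluster's level can never land behind
	// what the single processor has already read past.
	updateScheduledMaxReadLevel(levels, "active", base.Add(2*time.Second), true)
	fmt.Println(levels["standby"].Equal(levels["active"])) // true
}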
8 changes: 4 additions & 4 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default.

27 changes: 23 additions & 4 deletions service/history/shard/context_test.go
@@ -183,16 +183,35 @@ func (s *contextSuite) TestTimerMaxReadLevelInitialization() {
}
}

func (s *contextSuite) TestTimerMaxReadLevelUpdate() {
func (s *contextSuite) TestTimerMaxReadLevelUpdate_MultiProcessor() {
now := time.Now()
s.timeSource.Update(now)
maxReadLevel := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName)
maxReadLevel := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName, false)

s.timeSource.Update(now.Add(-time.Minute))
newMaxReadLevel := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName)
newMaxReadLevel := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName, false)
s.Equal(maxReadLevel, newMaxReadLevel)

s.timeSource.Update(now.Add(time.Minute))
newMaxReadLevel = s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName)
newMaxReadLevel = s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName, false)
s.True(newMaxReadLevel.FireTime.After(maxReadLevel.FireTime))
}

func (s *contextSuite) TestTimerMaxReadLevelUpdate_SingleProcessor() {
now := time.Now()
s.timeSource.Update(now)

// make sure the scheduledTaskMaxReadLevelMap has value for both current cluster and alternative cluster
s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName, false)
s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestAlternativeClusterName, false)

now = time.Now().Add(time.Minute)
s.timeSource.Update(now)

// update in single processor mode
s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, cluster.TestCurrentClusterName, true)
scheduledTaskMaxReadLevelMap := s.mockShard.(*ContextTest).scheduledTaskMaxReadLevelMap
s.Len(scheduledTaskMaxReadLevelMap, 2)
s.True(scheduledTaskMaxReadLevelMap[cluster.TestCurrentClusterName].After(now))
s.True(scheduledTaskMaxReadLevelMap[cluster.TestAlternativeClusterName].After(now))
}
6 changes: 5 additions & 1 deletion service/history/timerQueueAckMgr.go
@@ -60,6 +60,7 @@ type (
timeNow timeNow
updateTimerAckLevel updateTimerAckLevel
timerQueueShutdown timerQueueShutdown
singleProcessorMode bool
// isReadFinished indicate timer queue ack manager
// have no more task to send out
isReadFinished bool
@@ -103,6 +104,7 @@ func newTimerQueueAckMgr(
logger log.Logger,
clusterName string,
executableInitializer taskExecutableInitializer,
singleProcessorMode bool,
) *timerQueueAckMgrImpl {
ackLevel := tasks.NewKey(minLevel, 0)

@@ -117,6 +119,7 @@ func newTimerQueueAckMgr(
timeNow: timeNow,
updateTimerAckLevel: updateTimerAckLevel,
timerQueueShutdown: func() error { return nil },
singleProcessorMode: singleProcessorMode,
outstandingExecutables: make(map[tasks.Key]queues.Executable),
ackLevel: ackLevel,
readLevel: ackLevel,
@@ -156,6 +159,7 @@ func newTimerQueueFailoverAckMgr(
timeNow: timeNow,
updateTimerAckLevel: updateTimerAckLevel,
timerQueueShutdown: timerQueueShutdown,
singleProcessorMode: false,
outstandingExecutables: make(map[tasks.Key]queues.Executable),
ackLevel: ackLevel,
readLevel: ackLevel,
@@ -176,7 +180,7 @@ func (t *timerQueueAckMgrImpl) getFinishedChan() <-chan struct{} {

func (t *timerQueueAckMgrImpl) readTimerTasks() ([]queues.Executable, *time.Time, bool, error) {
if t.maxQueryLevel == t.minQueryLevel {
t.maxQueryLevel = t.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, t.clusterName).FireTime
t.maxQueryLevel = t.shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, t.clusterName, t.singleProcessorMode).FireTime
t.maxQueryLevel = util.MaxTime(t.minQueryLevel, t.maxQueryLevel)
}
minQueryLevel := t.minQueryLevel
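For reference, the refresh at the top of readTimerTasks above boils down to the following rule (a sketch with stand-in names, assuming "time" is imported): when the query window is empty, the new exclusive upper bound is taken from the shard watermark, obtained with the cached singleProcessorMode flag, and clamped so it never falls below the lower bound.

func refreshMaxQueryLevel(minQueryLevel, watermarkFireTime time.Time) time.Time {
	// Start from the shard-provided exclusive high read watermark.
	maxQueryLevel := watermarkFireTime
	// Never let the upper bound move behind the lower bound.
	if minQueryLevel.After(maxQueryLevel) {
		maxQueryLevel = minQueryLevel
	}
	return maxQueryLevel
}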
3 changes: 2 additions & 1 deletion service/history/timerQueueAckMgr_test.go
@@ -175,6 +175,7 @@ func (s *timerQueueAckMgrSuite) SetupTest() {
nil,
)
},
false,
)
}

@@ -563,7 +564,7 @@ func (s *timerQueueAckMgrSuite) TestReadCompleteUpdateTimerTasks() {

func (s *timerQueueAckMgrSuite) TestReadLookAheadTask() {
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(s.clusterName).AnyTimes()
level := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, s.clusterName).FireTime
level := s.mockShard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTimer, s.clusterName, false).FireTime

s.timerQueueAckMgr.minQueryLevel = level
s.timerQueueAckMgr.maxQueryLevel = s.timerQueueAckMgr.minQueryLevel
1 change: 1 addition & 0 deletions service/history/timerQueueActiveProcessor.go
@@ -183,6 +183,7 @@ func newTimerQueueActiveProcessor(
config.NamespaceCacheRefreshInterval,
)
},
singleProcessor,
)

processor.timerQueueProcessorBase = newTimerQueueProcessorBase(
3 changes: 3 additions & 0 deletions service/history/timerQueueStandbyProcessor.go
@@ -161,6 +161,9 @@ func newTimerQueueStandbyProcessor(
config.NamespaceCacheRefreshInterval,
)
},
// we are creating a standby processor,
// so we know we are not in single processor mode
false,
)

processor.timerQueueProcessorBase = newTimerQueueProcessorBase(
2 changes: 1 addition & 1 deletion service/history/transferQueueActiveProcessor.go
@@ -89,7 +89,7 @@ func newTransferQueueActiveProcessor(
logger = log.With(logger, tag.ClusterName(currentClusterName))

maxReadLevel := func() int64 {
return shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTransfer, currentClusterName).TaskID
return shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTransfer, currentClusterName, singleProcessor).TaskID
}
updateTransferAckLevel := func(ackLevel int64) error {
// in single cursor mode, continue to update cluster ack level
3 changes: 2 additions & 1 deletion service/history/transferQueueStandbyProcessor.go
@@ -99,7 +99,8 @@ func newTransferQueueStandbyProcessor(
}
}
maxReadLevel := func() int64 {
return shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTransfer, clusterName).TaskID
// we are creating a standby processor, so we know we are not in single processor mode
return shard.GetQueueExclusiveHighReadWatermark(tasks.CategoryTransfer, clusterName, false).TaskID
}
updateClusterAckLevel := func(ackLevel int64) error {
return shard.UpdateQueueClusterAckLevel(tasks.CategoryTransfer, clusterName, tasks.NewImmediateKey(ackLevel))
3 changes: 3 additions & 0 deletions service/history/visibilityQueueProcessor.go
@@ -103,6 +103,9 @@ func newVisibilityQueueProcessor(
return shard.GetQueueExclusiveHighReadWatermark(
tasks.CategoryVisibility,
shard.GetClusterMetadata().GetCurrentClusterName(),
// the value isn't actually used for immediate queues,
// but logically the visibility queue only has one processor
true,
).TaskID
}
updateVisibilityAckLevel := func(ackLevel int64) error {
