Skip to content

Commit

Permalink
Move cluster ack level on ack level update (#2927)
Browse files Browse the repository at this point in the history
  • Loading branch information
meiliang86 committed Jun 1, 2022
1 parent 021d501 commit 05a90f8
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 17 deletions.
18 changes: 15 additions & 3 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,19 @@ func (s *ContextImpl) UpdateQueueAckLevel(
ClusterAckLevel: make(map[string]int64),
}
}
s.shardInfo.QueueAckLevels[category.ID()].AckLevel = convertTaskKeyToAckLevel(category.Type(), ackLevel)
persistenceAckLevel := convertTaskKeyToPersistenceAckLevel(category.Type(), ackLevel)
s.shardInfo.QueueAckLevels[category.ID()].AckLevel = persistenceAckLevel

// if cluster ack level is less than the overall ack level, update cluster ack level
// as well to prevent loading too many tombstones if the cluster ack level is used later
// this may happen when adding back a removed cluster or rolling back the change for using
// single queue in timer/transfer queue processor
clusterAckLevel := s.shardInfo.QueueAckLevels[category.ID()].ClusterAckLevel
for clusterName, persistenceClusterAckLevel := range clusterAckLevel {
if persistenceClusterAckLevel < persistenceAckLevel {
clusterAckLevel[clusterName] = persistenceAckLevel
}
}

s.shardInfo.StolenSinceRenew = 0
return s.updateShardInfoLocked()
Expand Down Expand Up @@ -407,7 +419,7 @@ func (s *ContextImpl) UpdateQueueClusterAckLevel(
ClusterAckLevel: make(map[string]int64),
}
}
s.shardInfo.QueueAckLevels[category.ID()].ClusterAckLevel[cluster] = convertTaskKeyToAckLevel(category.Type(), ackLevel)
s.shardInfo.QueueAckLevels[category.ID()].ClusterAckLevel[cluster] = convertTaskKeyToPersistenceAckLevel(category.Type(), ackLevel)

s.shardInfo.StolenSinceRenew = 0
return s.updateShardInfoLocked()
Expand Down Expand Up @@ -1979,7 +1991,7 @@ func convertAckLevelToTaskKey(
return tasks.Key{FireTime: timestamp.UnixOrZeroTime(ackLevel)}
}

func convertTaskKeyToAckLevel(
func convertTaskKeyToPersistenceAckLevel(
categoryType tasks.CategoryType,
taskKey tasks.Key,
) int64 {
Expand Down
14 changes: 0 additions & 14 deletions service/history/workflow/mutable_state_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 05a90f8

Please sign in to comment.