Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix data race between read APIs and finishSplit/finishMerge in keyspace group manager #6723

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 60 additions & 38 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,39 @@
return s.ams[groupID], s.kgs[groupID]
}

func (s *state) checkTSOSplit(
targetGroupID uint32,
) (splitTargetAM, splitSourceAM *AllocatorManager, err error) {
s.RLock()
defer s.RUnlock()
splitTargetAM, splitTargetGroup := s.ams[targetGroupID], s.kgs[targetGroupID]
// Only the split target keyspace group needs to check the TSO split.
if !splitTargetGroup.IsSplitTarget() {
return nil, nil, nil // it isn't in the split state
}
sourceGroupID := splitTargetGroup.SplitSource()
splitSourceAM, splitSourceGroup := s.ams[sourceGroupID], s.kgs[sourceGroupID]
if splitSourceAM == nil || splitSourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("source", sourceGroupID))
return nil, nil, errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(sourceGroupID)

Check warning on line 130 in pkg/tso/keyspace_group_manager.go

View check run for this annotation

Codecov / codecov/patch

pkg/tso/keyspace_group_manager.go#L128-L130

Added lines #L128 - L130 were not covered by tests
}
return splitTargetAM, splitSourceAM, nil
}

// checkTSOMerge returns ErrKeyspaceGroupIsMerging when the keyspace group
// groupID exists and is in the merging state; such requests must be rejected
// until the merging checker has finished the TSO merging. It returns nil when
// the group is unknown or is not merging. The lookup is done under the read lock.
func (s *state) checkTSOMerge(
	groupID uint32,
) error {
	s.RLock()
	defer s.RUnlock()
	if group := s.kgs[groupID]; group != nil && group.IsMerging() {
		return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(groupID)
	}
	return nil
}

// getKeyspaceGroupMetaWithCheck returns the keyspace group meta of the given keyspace.
// It also checks if the keyspace is served by the given keyspace group. If not, it returns the meta
// of the keyspace group to which the keyspace currently belongs and returns NotServed (by the given
Expand Down Expand Up @@ -957,7 +990,7 @@
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
err = kgm.checkTSOMerge(curKeyspaceGroupID)
err = kgm.state.checkTSOMerge(curKeyspaceGroupID)
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
Expand Down Expand Up @@ -1032,27 +1065,19 @@
keyspaceGroupID uint32,
dcLocation string,
) error {
splitAM, splitGroup := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
// Only the split target keyspace group needs to check the TSO split.
if !splitGroup.IsSplitTarget() {
return nil
}
splitSource := splitGroup.SplitSource()
splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource)
if splitSourceAM == nil || splitSourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("source", splitSource))
return errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(splitSource)
splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID)
binshi-bing marked this conversation as resolved.
Show resolved Hide resolved
if err != nil || splitTargetAM == nil {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If splitTargetAM == nil, the group is not a split target and the err returned here is nil, so returning err is correct in both cases.

return err
}
splitAllocator, err := splitAM.GetAllocator(dcLocation)
splitTargetAllocator, err := splitTargetAM.GetAllocator(dcLocation)
if err != nil {
return err
}
splitSourceAllocator, err := splitSourceAM.GetAllocator(dcLocation)
if err != nil {
return err
}
splitTSO, err := splitAllocator.GenerateTSO(1)
splitTargetTSO, err := splitTargetAllocator.GenerateTSO(1)
if err != nil {
return err
}
Expand All @@ -1061,19 +1086,19 @@
return err
}
// If the split source TSO is not greater than the newly split TSO, we don't need to do anything.
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 {
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTargetTSO) <= 0 {
log.Info("the split source tso is less than the newly split tso",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical))
zap.Int64("split-tso-physical", splitTargetTSO.Physical),
zap.Int64("split-tso-logical", splitTargetTSO.Logical))
// Finish the split state directly.
return kgm.finishSplitKeyspaceGroup(keyspaceGroupID)
}
// If the split source TSO is greater than the newly split TSO, we need to update the split
// TSO to make sure the following TSO will be greater than the split keyspaces ever had
// in the past.
err = splitAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{
err = splitTargetAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{
Physical: splitSourceTSO.Physical + 1,
Logical: splitSourceTSO.Logical,
}), true, true)
Expand All @@ -1083,8 +1108,8 @@
log.Info("the split source tso is greater than the newly split tso",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical))
zap.Int64("split-tso-physical", splitTargetTSO.Physical),
zap.Int64("split-tso-logical", splitTargetTSO.Logical))
// Finish the split state.
return kgm.finishSplitKeyspaceGroup(keyspaceGroupID)
}
Expand Down Expand Up @@ -1116,9 +1141,13 @@
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
splitGroup.SplitState = nil
kgm.kgs[id] = splitGroup
// Pre-update the split keyspace group's split state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
// For now, we only have scenarios to update split state/merge state, and the other fields are always
// loaded from etcd without any modification, so we can simply copy the group and replace the state.
newSplitGroup := *splitGroup
newSplitGroup.SplitState = nil
kgm.kgs[id] = &newSplitGroup
return nil
}

Expand Down Expand Up @@ -1146,9 +1175,14 @@
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
mergeTarget.MergeState = nil
kgm.kgs[id] = mergeTarget

// Pre-update the merge target keyspace group's merge state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
// For now, we only have scenarios to update split state/merge state, and the other fields are always
// loaded from etcd without any modification, so we can simply copy the group and replace the state.
newTargetGroup := *mergeTarget
newTargetGroup.MergeState = nil
kgm.kgs[id] = &newTargetGroup
return nil
}

Expand Down Expand Up @@ -1286,15 +1320,3 @@
return
}
}

// checkTSOMerge returns ErrKeyspaceGroupIsMerging when the keyspace group
// keyspaceGroupID is in the merging state, so that requests are rejected until
// the merging checker has finished the TSO merging; otherwise it returns nil.
func (kgm *KeyspaceGroupManager) checkTSOMerge(
	keyspaceGroupID uint32,
) error {
	if _, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID); group.IsMerging() {
		return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID)
	}
	return nil
}