Skip to content

Commit

Permalink
Fix data race between read APIs and finshiMe:wq keyspace group manager
Browse files Browse the repository at this point in the history
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Jun 30, 2023
1 parent fa721e7 commit c6b8e82
Showing 1 changed file with 53 additions and 31 deletions.
84 changes: 53 additions & 31 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,39 @@ func (s *state) getKeyspaceGroupMeta(
return s.ams[groupID], s.kgs[groupID]
}

func (s *state) checkTSOSplit(
targetGroupID uint32,
) (splitTargetAM, splitSourceAM *AllocatorManager, err error) {
s.RLock()
defer s.RUnlock()
splitTargetAM, splitTargetGroup := s.ams[targetGroupID], s.kgs[targetGroupID]
// Only the split target keyspace group needs to check the TSO split.
if !splitTargetGroup.IsSplitTarget() {
return nil, nil, nil
}
sourceGroupID := splitTargetGroup.SplitSource()
splitSourceAM, splitSourceGroup := s.ams[sourceGroupID], s.kgs[sourceGroupID]
if splitSourceAM == nil || splitSourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("source", sourceGroupID))
return nil, nil, errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(sourceGroupID)
}
return splitTargetAM, splitSourceAM, nil
}

// Reject any request if the keyspace group is in merging state,
// we need to wait for the merging checker to finish the TSO merging.
func (s *state) checkTSOMerge(
groupID uint32,
) error {
s.RLock()
defer s.RUnlock()
if s.kgs[groupID] == nil || !s.kgs[groupID].IsMerging() {
return nil
}
return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(groupID)
}

// getKeyspaceGroupMetaWithCheck returns the keyspace group meta of the given keyspace.
// It also checks if the keyspace is served by the given keyspace group. If not, it returns the meta
// of the keyspace group to which the keyspace currently belongs and returns NotServed (by the given
Expand Down Expand Up @@ -957,7 +990,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
err = kgm.checkTSOMerge(curKeyspaceGroupID)
err = kgm.state.checkTSOMerge(curKeyspaceGroupID)
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
Expand Down Expand Up @@ -1032,19 +1065,11 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
keyspaceGroupID uint32,
dcLocation string,
) error {
splitAM, splitGroup := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
// Only the split target keyspace group needs to check the TSO split.
if !splitGroup.IsSplitTarget() {
return nil
}
splitSource := splitGroup.SplitSource()
splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource)
if splitSourceAM == nil || splitSourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("source", splitSource))
return errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(splitSource)
splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID)
if err != nil {
return err
}
splitAllocator, err := splitAM.GetAllocator(dcLocation)
splitAllocator, err := splitTargetAM.GetAllocator(dcLocation)
if err != nil {
return err
}
Expand Down Expand Up @@ -1116,9 +1141,13 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
splitGroup.SplitState = nil
kgm.kgs[id] = splitGroup
// Pre-update the split keyspace group's split state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
// For now, we only have scenarios to update split state/merge state, and the other fields are always
// loaded from etcd without any modification, so we can simply copy the group and replace the state.
newSplitGroup := *splitGroup
newSplitGroup.SplitState = nil
kgm.kgs[id] = &newSplitGroup
return nil
}

Expand Down Expand Up @@ -1146,9 +1175,14 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
mergeTarget.MergeState = nil
kgm.kgs[id] = mergeTarget

// Pre-update the merge target keyspace group's merge state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
// For now, we only have scenarios to update split state/merge state, and the other fields are always
// loaded from etcd without any modification, so we can simply copy the group and replace the state.
newTargetGroup := *mergeTarget
newTargetGroup.MergeState = nil
kgm.kgs[id] = &newTargetGroup
return nil
}

Expand Down Expand Up @@ -1286,15 +1320,3 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
return
}
}

// Reject any request if the keyspace group is in merging state,
// we need to wait for the merging checker to finish the TSO merging.
func (kgm *KeyspaceGroupManager) checkTSOMerge(
keyspaceGroupID uint32,
) error {
_, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
if !group.IsMerging() {
return nil
}
return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID)
}

0 comments on commit c6b8e82

Please sign in to comment.