Skip to content

Commit

Permalink
Fix the refresh may be notified finished early (#24438) (#24466)
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <yah2er0ne@outlook.com>
  • Loading branch information
yah01 committed May 29, 2023
1 parent 2bd8be4 commit c314d54
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
8 changes: 6 additions & 2 deletions internal/querycoordv2/meta/target_manager.go
Expand Up @@ -59,7 +59,10 @@ func NewTargetManager(broker Broker, meta *Meta) *TargetManager {
}
}

func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64, partitionIDs ...int64) {
// UpdateCollectionCurrentTarget updates the current target to next target,
// WARN: DO NOT call this method for an existing collection as target observer running, or it will lead to a double-update,
// which may make the current target not available
func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64, partitionIDs ...int64) bool {
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
log := log.With(zap.Int64("collectionID", collectionID),
Expand All @@ -70,14 +73,15 @@ func (mgr *TargetManager) UpdateCollectionCurrentTarget(collectionID int64, part
newTarget := mgr.next.getCollectionTarget(collectionID)
if newTarget == nil || newTarget.IsEmpty() {
log.Info("next target does not exist, skip it")
return
return false
}
mgr.current.updateCollectionTarget(collectionID, newTarget)
mgr.next.removeCollectionTarget(collectionID)

log.Debug("finish to update current target for collection",
zap.Int64s("segments", newTarget.GetAllSegmentIDs()),
zap.Strings("channels", newTarget.GetAllDmChannelNames()))
return true
}

// UpdateCollectionNextTargetWithPartitions for collection_loading request, which offer partitionIDs outside
Expand Down
25 changes: 12 additions & 13 deletions internal/querycoordv2/observers/target_observer.go
Expand Up @@ -259,18 +259,17 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool {
}

func (ob *TargetObserver) updateCurrentTarget(collectionID int64) {
log.Warn("observer trigger update current target",
zap.Int64("collectionID", collectionID))
ob.targetMgr.UpdateCollectionCurrentTarget(collectionID)

ob.mut.Lock()
defer ob.mut.Unlock()
notifiers := ob.readyNotifiers[collectionID]
for _, notifier := range notifiers {
close(notifier)
}
// Reuse the capacity of notifiers slice
if notifiers != nil {
ob.readyNotifiers[collectionID] = notifiers[:0]
log.Info("observer trigger update current target", zap.Int64("collectionID", collectionID))
if ob.targetMgr.UpdateCollectionCurrentTarget(collectionID) {
ob.mut.Lock()
defer ob.mut.Unlock()
notifiers := ob.readyNotifiers[collectionID]
for _, notifier := range notifiers {
close(notifier)
}
// Reuse the capacity of notifiers slice
if notifiers != nil {
ob.readyNotifiers[collectionID] = notifiers[:0]
}
}
}

0 comments on commit c314d54

Please sign in to comment.