Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cherry-Pick] Fix current target may be updated to an invalid target (#21742) #21762

Merged
merged 1 commit into from Jan 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 5 additions & 7 deletions internal/querycoordv2/observers/collection_observer.go
Expand Up @@ -37,7 +37,7 @@ type CollectionObserver struct {
dist *meta.DistributionManager
meta *meta.Meta
targetMgr *meta.TargetManager
broker meta.Broker
targetObserver *TargetObserver
collectionLoadedCount map[int64]int
partitionLoadedCount map[int64]int

Expand All @@ -50,14 +50,14 @@ func NewCollectionObserver(
dist *meta.DistributionManager,
meta *meta.Meta,
targetMgr *meta.TargetManager,
broker meta.Broker,
targetObserver *TargetObserver,
) *CollectionObserver {
return &CollectionObserver{
stopCh: make(chan struct{}),
dist: dist,
meta: meta,
targetMgr: targetMgr,
broker: broker,
targetObserver: targetObserver,
collectionLoadedCount: make(map[int64]int),
partitionLoadedCount: make(map[int64]int),

Expand Down Expand Up @@ -208,9 +208,8 @@ func (ob *CollectionObserver) observeCollectionLoadStatus(collection *meta.Colle
return
}
ob.collectionLoadedCount[collection.GetCollectionID()] = loadedCount
if updated.LoadPercentage == 100 {
if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) {
delete(ob.collectionLoadedCount, collection.GetCollectionID())
ob.targetMgr.UpdateCollectionCurrentTarget(updated.CollectionID)
updated.Status = querypb.LoadStatus_Loaded
ob.meta.CollectionManager.UpdateCollection(updated)

Expand Down Expand Up @@ -272,9 +271,8 @@ func (ob *CollectionObserver) observePartitionLoadStatus(partition *meta.Partiti
return
}
ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
if updated.LoadPercentage == 100 {
if updated.LoadPercentage == 100 && ob.targetObserver.Check(updated.GetCollectionID()) {
delete(ob.partitionLoadedCount, partition.GetPartitionID())
ob.targetMgr.UpdateCollectionCurrentTarget(partition.GetCollectionID(), partition.GetPartitionID())
updated.Status = querypb.LoadStatus_Loaded
ob.meta.CollectionManager.PutPartition(updated)

Expand Down
30 changes: 22 additions & 8 deletions internal/querycoordv2/observers/collection_observer_test.go
Expand Up @@ -55,10 +55,11 @@ type CollectionObserverSuite struct {
store meta.Store

// Dependencies
dist *meta.DistributionManager
meta *meta.Meta
targetMgr *meta.TargetManager
broker *meta.MockBroker
dist *meta.DistributionManager
meta *meta.Meta
broker *meta.MockBroker
targetMgr *meta.TargetManager
targetObserver *TargetObserver

// Test object
ob *CollectionObserver
Expand Down Expand Up @@ -175,21 +176,30 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.meta = meta.NewMeta(suite.idAllocator, suite.store)
suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.targetObserver = NewTargetObserver(suite.meta,
suite.targetMgr,
suite.dist,
suite.broker,
)

// Test object
suite.ob = NewCollectionObserver(
suite.dist,
suite.meta,
suite.targetMgr,
suite.broker,
suite.targetObserver,
)

Params.QueryCoordCfg.LoadTimeoutSeconds = 600 * time.Second
for _, collection := range suite.collections {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
}
suite.targetObserver.Start(context.Background())

suite.loadAll()
}

func (suite *CollectionObserverSuite) TearDownTest() {
suite.targetObserver.Stop()
suite.ob.Stop()
suite.kv.Close()
}
Expand Down Expand Up @@ -356,8 +366,12 @@ func (suite *CollectionObserverSuite) load(collection int64) {
})

}
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, collection, int64(1)).Return(dmChannels, allSegments, nil)
suite.targetMgr.UpdateCollectionNextTargetWithPartitions(collection, int64(1))

partitions := suite.partitions[collection]
for _, partition := range partitions {
suite.broker.EXPECT().GetRecoveryInfo(mock.Anything, collection, partition).Return(dmChannels, allSegments, nil)
}
suite.targetMgr.UpdateCollectionNextTargetWithPartitions(collection, partitions...)
}

func TestCollectionObserver(t *testing.T) {
Expand Down
59 changes: 40 additions & 19 deletions internal/querycoordv2/observers/target_observer.go
Expand Up @@ -30,6 +30,11 @@ import (
"github.com/milvus-io/milvus/internal/util/typeutil"
)

type checkRequest struct {
CollectionID int64
Notifier chan bool
}

type targetUpdateRequest struct {
CollectionID int64
Notifier chan error
Expand All @@ -44,6 +49,7 @@ type TargetObserver struct {
distMgr *meta.DistributionManager
broker meta.Broker

manualCheck chan checkRequest
nextTargetLastUpdate map[int64]time.Time
updateChan chan targetUpdateRequest
mut sync.Mutex // Guard readyNotifiers
Expand All @@ -59,6 +65,7 @@ func NewTargetObserver(meta *meta.Meta, targetMgr *meta.TargetManager, distMgr *
targetMgr: targetMgr,
distMgr: distMgr,
broker: broker,
manualCheck: make(chan checkRequest, 10),
nextTargetLastUpdate: make(map[int64]time.Time),
updateChan: make(chan targetUpdateRequest),
readyNotifiers: make(map[int64][]chan struct{}),
Expand Down Expand Up @@ -95,21 +102,48 @@ func (ob *TargetObserver) schedule(ctx context.Context) {
ob.clean()
ob.tryUpdateTarget()

case request := <-ob.updateChan:
err := ob.updateNextTarget(request.CollectionID)
case req := <-ob.manualCheck:
ob.check(req.CollectionID)
req.Notifier <- ob.targetMgr.IsCurrentTargetExist(req.CollectionID)

case req := <-ob.updateChan:
err := ob.updateNextTarget(req.CollectionID)
if err != nil {
close(request.ReadyNotifier)
close(req.ReadyNotifier)
} else {
ob.mut.Lock()
ob.readyNotifiers[request.CollectionID] = append(ob.readyNotifiers[request.CollectionID], request.ReadyNotifier)
ob.readyNotifiers[req.CollectionID] = append(ob.readyNotifiers[req.CollectionID], req.ReadyNotifier)
ob.mut.Unlock()
}

request.Notifier <- err
req.Notifier <- err
}
}
}

// Check checks whether the next target is ready,
// and updates the current target if it is,
// returns true if current target is not nil
func (ob *TargetObserver) Check(collectionID int64) bool {
notifier := make(chan bool)
ob.manualCheck <- checkRequest{
CollectionID: collectionID,
Notifier: notifier,
}
return <-notifier
}

func (ob *TargetObserver) check(collectionID int64) {
if ob.shouldUpdateCurrentTarget(collectionID) {
ob.updateCurrentTarget(collectionID)
}

if ob.shouldUpdateNextTarget(collectionID) {
// update next target in collection level
ob.updateNextTarget(collectionID)
}
}

// UpdateNextTarget updates the next target,
// returns a channel which will be closed when the next target is ready,
// or returns error if failed to pull target
Expand Down Expand Up @@ -138,14 +172,7 @@ func (ob *TargetObserver) ReleaseCollection(collectionID int64) {
func (ob *TargetObserver) tryUpdateTarget() {
collections := ob.meta.GetAll()
for _, collectionID := range collections {
if ob.shouldUpdateCurrentTarget(collectionID) {
ob.updateCurrentTarget(collectionID)
}

if ob.shouldUpdateNextTarget(collectionID) {
// update next target in collection level
ob.updateNextTarget(collectionID)
}
ob.check(collectionID)
}

collectionSet := typeutil.NewUniqueSet(collections...)
Expand Down Expand Up @@ -199,12 +226,6 @@ func (ob *TargetObserver) updateNextTargetTimestamp(collectionID int64) {
}

func (ob *TargetObserver) shouldUpdateCurrentTarget(collectionID int64) bool {
// Collection observer will update the current target as loading done,
// avoid double updating, which will cause update current target to a unfinished next target
if !ob.targetMgr.IsCurrentTargetExist(collectionID) {
return false
}

replicaNum := ob.meta.CollectionManager.GetReplicaNumber(collectionID)

// check channel first
Expand Down
12 changes: 6 additions & 6 deletions internal/querycoordv2/server.go
Expand Up @@ -306,12 +306,6 @@ func (s *Server) initMeta() error {

func (s *Server) initObserver() {
log.Info("init observers")
s.collectionObserver = observers.NewCollectionObserver(
s.dist,
s.meta,
s.targetMgr,
s.broker,
)
s.leaderObserver = observers.NewLeaderObserver(
s.dist,
s.meta,
Expand All @@ -324,6 +318,12 @@ func (s *Server) initObserver() {
s.dist,
s.broker,
)
s.collectionObserver = observers.NewCollectionObserver(
s.dist,
s.meta,
s.targetMgr,
s.targetObserver,
)
}

func (s *Server) afterStart() {
Expand Down
13 changes: 13 additions & 0 deletions internal/querycoordv2/server_test.go
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/dist"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/mocks"
"github.com/milvus-io/milvus/internal/querycoordv2/observers"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/util/dependency"
Expand Down Expand Up @@ -448,6 +449,18 @@ func (suite *ServerSuite) hackServer() {
suite.server.balancer,
suite.server.taskScheduler,
)
suite.server.targetObserver = observers.NewTargetObserver(
suite.server.meta,
suite.server.targetMgr,
suite.server.dist,
suite.broker,
)
suite.server.collectionObserver = observers.NewCollectionObserver(
suite.server.dist,
suite.server.meta,
suite.server.targetMgr,
suite.server.targetObserver,
)

suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(&schemapb.CollectionSchema{}, nil).Maybe()
for _, collection := range suite.collections {
Expand Down