enhance: [2.4] Save collection targets by batches (#31616) (#31632)
Cherry-pick from master
pr: #31616
See also #28491 #31240

When the number of collections is large, querycoord saves collection targets one
by one, which is slow and may block querycoord from exiting.

In a local run, a 500-collection scenario can take about 40 seconds to save the
collection targets.

This PR changes the `SaveCollectionTarget` interface into a batched
`SaveCollectionTargets` and organizes the collections into batches of 16 per
bundle to speed up this procedure.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
congqixia committed Mar 27, 2024
1 parent beae831 commit 34f2179
Showing 6 changed files with 102 additions and 54 deletions.
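
To illustrate the change before the diffs, here is a minimal Go sketch, not the PR's actual code: it assumes the new variadic SaveCollectionTargets introduced below and chunks targets with samber/lo; the helper name saveTargetsBatched is made up for the example.

import (
	"github.com/samber/lo"
	"go.uber.org/zap"

	"github.com/milvus-io/milvus/internal/metastore"
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/pkg/log"
)

// saveTargetsBatched sketches the batched write path: instead of one etcd
// write per collection, targets are chunked (16 per batch, matching the PR)
// and each chunk is persisted with a single SaveCollectionTargets call.
func saveTargetsBatched(catalog metastore.QueryCoordCatalog, targets []*querypb.CollectionTarget) {
	for _, batch := range lo.Chunk(targets, 16) {
		if err := catalog.SaveCollectionTargets(batch...); err != nil {
			log.Warn("failed to save collection target batch", zap.Error(err))
		}
	}
}

With 500 collections this issues roughly 32 metastore writes instead of 500, which is where the speedup comes from.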
2 changes: 1 addition & 1 deletion internal/metastore/catalog.go
@@ -164,7 +164,7 @@ type QueryCoordCatalog interface {
RemoveResourceGroup(rgName string) error
GetResourceGroups() ([]*querypb.ResourceGroup, error)

SaveCollectionTarget(target *querypb.CollectionTarget) error
SaveCollectionTargets(target ...*querypb.CollectionTarget) error
RemoveCollectionTarget(collectionID int64) error
GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error)
}
21 changes: 13 additions & 8 deletions internal/metastore/kv/querycoord/kv_catalog.go
@@ -241,16 +241,21 @@ func (s Catalog) ReleaseReplica(collection, replica int64) error {
return s.cli.Remove(key)
}

func (s Catalog) SaveCollectionTarget(target *querypb.CollectionTarget) error {
k := encodeCollectionTargetKey(target.GetCollectionID())
v, err := proto.Marshal(target)
if err != nil {
return err
func (s Catalog) SaveCollectionTargets(targets ...*querypb.CollectionTarget) error {
kvs := make(map[string]string)
for _, target := range targets {
k := encodeCollectionTargetKey(target.GetCollectionID())
v, err := proto.Marshal(target)
if err != nil {
return err
}
var compressed bytes.Buffer
compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
kvs[k] = compressed.String()
}

// to reduce the target size, we do compress before write to etcd
var compressed bytes.Buffer
compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
err = s.cli.Save(k, compressed.String())
err := s.cli.MultiSave(kvs)
if err != nil {
return err
}
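As a side note on the compression step above, a standalone round-trip sketch; it assumes the compressor.ZstdCompress helper wraps github.com/klauspost/compress/zstd, which is where the zstd.WithEncoderLevel option used here comes from.

package main

import (
	"bytes"
	"fmt"
	"io"

	"github.com/klauspost/compress/zstd"
)

// Round-trip of the compress-before-write idea: the marshaled target bytes
// are zstd-compressed before being stored and decompressed when read back.
func main() {
	payload := []byte("serialized CollectionTarget bytes")

	var compressed bytes.Buffer
	enc, err := zstd.NewWriter(&compressed, zstd.WithEncoderLevel(zstd.SpeedBetterCompression))
	if err != nil {
		panic(err)
	}
	if _, err := io.Copy(enc, bytes.NewReader(payload)); err != nil {
		panic(err)
	}
	enc.Close()

	dec, err := zstd.NewReader(bytes.NewReader(compressed.Bytes()))
	if err != nil {
		panic(err)
	}
	defer dec.Close()

	restored, err := io.ReadAll(dec)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(restored)) // prints the original payload
}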
34 changes: 17 additions & 17 deletions internal/metastore/kv/querycoord/kv_catalog_test.go
@@ -203,22 +203,22 @@ func (suite *CatalogTestSuite) TestResourceGroup() {
}

func (suite *CatalogTestSuite) TestCollectionTarget() {
suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{
suite.catalog.SaveCollectionTargets(&querypb.CollectionTarget{
CollectionID: 1,
Version: 1,
})
suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{
CollectionID: 2,
Version: 2,
})
suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{
CollectionID: 3,
Version: 3,
})
suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{
CollectionID: 1,
Version: 4,
})
},
&querypb.CollectionTarget{
CollectionID: 2,
Version: 2,
},
&querypb.CollectionTarget{
CollectionID: 3,
Version: 3,
},
&querypb.CollectionTarget{
CollectionID: 1,
Version: 4,
})
suite.catalog.RemoveCollectionTarget(2)

targets, err := suite.catalog.GetCollectionTargets()
@@ -230,18 +230,18 @@ func (suite *CatalogTestSuite) TestCollectionTarget() {
// test access meta store failed
mockStore := mocks.NewMetaKv(suite.T())
mockErr := errors.New("failed to access etcd")
mockStore.EXPECT().Save(mock.Anything, mock.Anything).Return(mockErr)
mockStore.EXPECT().MultiSave(mock.Anything).Return(mockErr)
mockStore.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr)

suite.catalog.cli = mockStore
err = suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{})
err = suite.catalog.SaveCollectionTargets(&querypb.CollectionTarget{})
suite.ErrorIs(err, mockErr)

_, err = suite.catalog.GetCollectionTargets()
suite.ErrorIs(err, mockErr)

// test invalid message
err = suite.catalog.SaveCollectionTarget(nil)
err = suite.catalog.SaveCollectionTargets(nil)
suite.Error(err)
}

43 changes: 28 additions & 15 deletions internal/metastore/mocks/mock_querycoord_catalog.go

Some generated files are not rendered by default.

36 changes: 32 additions & 4 deletions internal/querycoordv2/meta/target_manager.go
@@ -19,6 +19,7 @@ package meta
import (
"context"
"fmt"
"runtime"
"sync"

"github.com/cockroachdb/errors"
@@ -28,9 +29,11 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
@@ -594,13 +597,38 @@ func (mgr *TargetManager) SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
mgr.rwMutex.Lock()
defer mgr.rwMutex.Unlock()
if mgr.current != nil {
// use pool here to control maximal writer used by save target
pool := conc.NewPool[any](runtime.GOMAXPROCS(0) * 2)
// use batch write in case of the number of collections is large
batchSize := 16
var wg sync.WaitGroup
submit := func(tasks []typeutil.Pair[int64, *querypb.CollectionTarget]) {
wg.Add(1)
pool.Submit(func() (any, error) {
defer wg.Done()
ids := lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) int64 { return p.A })
if err := catalog.SaveCollectionTargets(lo.Map(tasks, func(p typeutil.Pair[int64, *querypb.CollectionTarget], _ int) *querypb.CollectionTarget {
return p.B
})...); err != nil {
log.Warn("failed to save current target for collection", zap.Int64s("collectionIDs", ids), zap.Error(err))
} else {
log.Info("succeed to save current target for collection", zap.Int64s("collectionIDs", ids))
}
return nil, nil
})
}
tasks := make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
for id, target := range mgr.current.collectionTargetMap {
if err := catalog.SaveCollectionTarget(target.toPbMsg()); err != nil {
log.Warn("failed to save current target for collection", zap.Int64("collectionID", id), zap.Error(err))
} else {
log.Warn("succeed to save current target for collection", zap.Int64("collectionID", id))
tasks = append(tasks, typeutil.NewPair(id, target.toPbMsg()))
if len(tasks) >= batchSize {
submit(tasks)
tasks = make([]typeutil.Pair[int64, *querypb.CollectionTarget], 0, batchSize)
}
}
if len(tasks) > 0 {
submit(tasks)
}
wg.Wait()
}
}

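A self-contained sketch of the batching-plus-worker-pool pattern used in SaveCurrentTarget above; it uses a plain semaphore channel as a stand-in for Milvus's conc.Pool and a dummy saveBatch in place of catalog.SaveCollectionTargets.

package main

import (
	"fmt"
	"runtime"
	"sync"
)

// saveBatch stands in for catalog.SaveCollectionTargets; here it only reports the batch.
func saveBatch(ids []int64) error {
	fmt.Println("saving batch", ids)
	return nil
}

func main() {
	// Pretend these are the collection IDs held in the current target map.
	ids := make([]int64, 0, 500)
	for i := int64(1); i <= 500; i++ {
		ids = append(ids, i)
	}

	const batchSize = 16
	// Bound concurrent writers, mirroring the GOMAXPROCS*2 pool size in the PR.
	sem := make(chan struct{}, runtime.GOMAXPROCS(0)*2)
	var wg sync.WaitGroup

	for start := 0; start < len(ids); start += batchSize {
		end := start + batchSize
		if end > len(ids) {
			end = len(ids)
		}
		batch := ids[start:end]

		wg.Add(1)
		sem <- struct{}{}
		go func(batch []int64) {
			defer wg.Done()
			defer func() { <-sem }()
			if err := saveBatch(batch); err != nil {
				fmt.Println("failed to save batch:", err)
			}
		}(batch)
	}
	wg.Wait()
}

As in the PR, the writer bound limits how many batches hit the metastore at once, while the batch size of 16 keeps each write reasonably small.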
20 changes: 11 additions & 9 deletions tests/integration/coordrecovery/coord_recovery_test.go
@@ -26,6 +26,7 @@ import (

"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
@@ -146,8 +147,8 @@ func (s *CoordSwitchSuite) checkCollections() bool {
TimeStamp: 0, // means now
}
resp, err := s.Cluster.Proxy.ShowCollections(context.TODO(), req)
s.NoError(err)
s.Equal(len(resp.CollectionIds), numCollections)
s.Require().NoError(merr.CheckRPCCall(resp, err))
s.Require().Equal(len(resp.CollectionIds), numCollections)
notLoaded := 0
loaded := 0
for _, name := range resp.CollectionNames {
Expand Down Expand Up @@ -181,7 +182,7 @@ func (s *CoordSwitchSuite) search(collectionName string, dim int) {
GuaranteeTimestamp: 0,
}
queryResult, err := c.Proxy.Query(context.TODO(), queryReq)
s.NoError(err)
s.Require().NoError(merr.CheckRPCCall(queryResult, err))
s.Equal(len(queryResult.FieldsData), 1)
numEntities := queryResult.FieldsData[0].GetScalars().GetLongData().Data[0]
s.Equal(numEntities, int64(rowsPerCollection))
@@ -198,10 +199,9 @@ func (s *CoordSwitchSuite) search(collectionName string, dim int) {
searchReq := integration.ConstructSearchRequest("", collectionName, expr,
integration.FloatVecField, schemapb.DataType_FloatVector, nil, metric.IP, params, nq, dim, topk, roundDecimal)

searchResult, _ := c.Proxy.Search(context.TODO(), searchReq)
searchResult, err := c.Proxy.Search(context.TODO(), searchReq)

err = merr.Error(searchResult.GetStatus())
s.NoError(err)
s.NoError(merr.CheckRPCCall(searchResult, err))
}

func (s *CoordSwitchSuite) insertBatchCollections(prefix string, collectionBatchSize, idxStart, dim int, wg *sync.WaitGroup) {
@@ -229,7 +229,7 @@ func (s *CoordSwitchSuite) setupData() {
}
wg.Wait()
log.Info("=========================Data injection finished=========================")
s.checkCollections()
s.Require().True(s.checkCollections())
log.Info(fmt.Sprintf("=========================start to search %s=========================", searchName))
s.search(searchName, Dim)
log.Info("=========================Search finished=========================")
@@ -238,11 +238,13 @@ func (s *CoordSwitchSuite) switchCoord() float64 {
func (s *CoordSwitchSuite) switchCoord() float64 {
var err error
c := s.Cluster
start := time.Now()
log.Info("=========================Stopping Coordinators========================")
c.RootCoord.Stop()
c.DataCoord.Stop()
c.QueryCoord.Stop()
log.Info("=========================Coordinators stopped=========================")
start := time.Now()
log.Info("=========================Coordinators stopped=========================", zap.Duration("elapsed", time.Since(start)))
start = time.Now()

c.RootCoord, err = grpcrootcoord.NewServer(context.TODO(), c.GetFactory())
s.NoError(err)
