Skip to content

Commit

Permalink
enhance: Speed up target recovery after query coord restart
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Mar 13, 2024
1 parent b5c6794 commit b8e3242
Show file tree
Hide file tree
Showing 13 changed files with 702 additions and 11 deletions.
4 changes: 4 additions & 0 deletions internal/distributed/querycoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,10 @@ func (s *Server) start() error {
return s.queryCoord.Start()
}

// GetQueryCoord returns the QueryCoord component held by this server.
func (s *Server) GetQueryCoord() types.QueryCoordComponent {
return s.queryCoord
}

// Stop stops QueryCoord's grpc service.
func (s *Server) Stop() (err error) {
Params := &paramtable.Get().QueryCoordGrpcServerCfg
Expand Down
4 changes: 4 additions & 0 deletions internal/metastore/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,8 @@ type QueryCoordCatalog interface {
SaveResourceGroup(rgs ...*querypb.ResourceGroup) error
RemoveResourceGroup(rgName string) error
GetResourceGroups() ([]*querypb.ResourceGroup, error)

// SaveCollectionTarget persists the target of a single collection.
SaveCollectionTarget(target *querypb.CollectionTarget) error
// RemoveCollectionTarget removes the persisted target of the given collection.
RemoveCollectionTarget(collectionID int64) error
// GetCollectionTargets returns the persisted collection targets, keyed by
// collection ID.
GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error)
}
51 changes: 50 additions & 1 deletion internal/metastore/kv/querycoord/kv_catalog.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package querycoord

import (
"bytes"
"fmt"
"io"

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/klauspost/compress/zstd"
"github.com/samber/lo"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/compressor"
)

// ErrInvalidKey is the sentinel error for an invalid load info key.
var ErrInvalidKey = errors.New("invalid load info key")
Expand All @@ -22,7 +26,8 @@ const (
ReplicaMetaPrefixV1 = "queryCoord-ReplicaMeta"
ResourceGroupPrefix = "queryCoord-ResourceGroup"

MetaOpsBatchSize = 128
MetaOpsBatchSize = 128
CollectionTargetPrefix = "queryCoord-Collection-Target"
)

type Catalog struct {
Expand Down Expand Up @@ -234,6 +239,46 @@ func (s Catalog) ReleaseReplica(collection, replica int64) error {
return s.cli.Remove(key)
}

// SaveCollectionTarget marshals the given collection target, compresses the
// bytes with zstd and writes the result to the meta store under the key derived
// from the target's collection ID.
//
// It returns an error when the target cannot be marshaled, when compression
// fails, or when the meta store rejects the write.
func (s Catalog) SaveCollectionTarget(target *querypb.CollectionTarget) error {
	k := encodeCollectionTargetKey(target.GetCollectionID())
	v, err := proto.Marshal(target)
	if err != nil {
		return err
	}
	// to reduce the target size, we do compress before write to etcd
	var compressed bytes.Buffer
	if err := compressor.ZstdCompress(bytes.NewReader(v), &compressed, zstd.WithEncoderLevel(zstd.SpeedBetterCompression)); err != nil {
		// Never persist a partially written buffer: it could not be
		// decompressed again when the targets are loaded back.
		return fmt.Errorf("compress collection target %d: %w", target.GetCollectionID(), err)
	}
	return s.cli.Save(k, compressed.String())
}

// RemoveCollectionTarget deletes the stored target of the given collection
// from the meta store.
func (s Catalog) RemoveCollectionTarget(collectionID int64) error {
	return s.cli.Remove(encodeCollectionTargetKey(collectionID))
}

// GetCollectionTargets loads every value stored under CollectionTargetPrefix,
// zstd-decompresses and unmarshals each of them, and returns the resulting
// targets keyed by collection ID.
//
// It returns an error when the meta store cannot be read, or when any stored
// value fails to decompress or unmarshal; no partial result is returned.
func (s Catalog) GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error) {
	_, values, err := s.cli.LoadWithPrefix(CollectionTargetPrefix)
	if err != nil {
		return nil, err
	}
	ret := make(map[int64]*querypb.CollectionTarget, len(values))
	for _, v := range values {
		var decompressed bytes.Buffer
		if err := compressor.ZstdDecompress(bytes.NewReader([]byte(v)), &decompressed); err != nil {
			// Report corrupt data as such instead of feeding a truncated
			// buffer to the protobuf decoder.
			return nil, fmt.Errorf("decompress collection target: %w", err)
		}
		target := &querypb.CollectionTarget{}
		if err := proto.Unmarshal(decompressed.Bytes(), target); err != nil {
			return nil, err
		}
		ret[target.GetCollectionID()] = target
	}

	return ret, nil
}

// EncodeCollectionLoadInfoKey builds the meta store key of a collection's load
// info: CollectionLoadInfoPrefix and the collection ID joined by a slash.
func EncodeCollectionLoadInfoKey(collection int64) string {
	const keyFormat = "%s/%d"
	return fmt.Sprintf(keyFormat, CollectionLoadInfoPrefix, collection)
}
Expand All @@ -253,3 +298,7 @@ func encodeCollectionReplicaKey(collection int64) string {
func encodeResourceGroupKey(rgName string) string {
return fmt.Sprintf("%s/%s", ResourceGroupPrefix, rgName)
}

// encodeCollectionTargetKey builds the meta store key of a collection target:
// CollectionTargetPrefix and the collection ID joined by a slash.
func encodeCollectionTargetKey(collection int64) string {
	return CollectionTargetPrefix + "/" + fmt.Sprint(collection)
}
46 changes: 46 additions & 0 deletions internal/metastore/kv/querycoord/kv_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"sort"
"testing"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/util/etcd"
Expand Down Expand Up @@ -199,6 +202,49 @@ func (suite *CatalogTestSuite) TestResourceGroup() {
suite.Equal([]int64{4, 5}, groups[1].GetNodes())
}

// TestCollectionTarget covers the collection target persistence of the
// catalog: saving, overwriting, removing and loading targets, plus the error
// paths when the meta store is unavailable or the message is invalid.
func (suite *CatalogTestSuite) TestCollectionTarget() {
	// Collection 1 is saved twice so the later version must overwrite the
	// earlier one; collection 2 is removed again below.
	for _, target := range []*querypb.CollectionTarget{
		{CollectionID: 1, Version: 1},
		{CollectionID: 2, Version: 2},
		{CollectionID: 3, Version: 3},
		{CollectionID: 1, Version: 4},
	} {
		suite.NoError(suite.catalog.SaveCollectionTarget(target))
	}
	suite.NoError(suite.catalog.RemoveCollectionTarget(2))

	targets, err := suite.catalog.GetCollectionTargets()
	suite.NoError(err)
	suite.Len(targets, 2)
	// The generated getters are nil-safe, so a missing key fails the
	// assertion instead of panicking the whole suite.
	suite.Equal(int64(4), targets[1].GetVersion())
	suite.Equal(int64(3), targets[3].GetVersion())

	// test access meta store failed
	mockStore := mocks.NewMetaKv(suite.T())
	mockErr := errors.New("failed to access etcd")
	mockStore.EXPECT().Save(mock.Anything, mock.Anything).Return(mockErr)
	mockStore.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr)

	// Put the real store back afterwards so the mock does not leak into
	// whatever uses the suite's catalog next.
	originStore := suite.catalog.cli
	defer func() { suite.catalog.cli = originStore }()
	suite.catalog.cli = mockStore

	err = suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{})
	suite.ErrorIs(err, mockErr)

	_, err = suite.catalog.GetCollectionTargets()
	suite.ErrorIs(err, mockErr)

	// test invalid message
	err = suite.catalog.SaveCollectionTarget(nil)
	suite.Error(err)
}

func (suite *CatalogTestSuite) TestLoadRelease() {
// TODO(sunby): add ut
}
Expand Down
137 changes: 137 additions & 0 deletions internal/metastore/mocks/mock_querycoord_catalog.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -767,3 +767,27 @@ message CheckerInfo {
bool activated = 3;
bool found = 4;
}

// SegmentTarget describes one segment inside a persisted target.
message SegmentTarget {
// ID of the segment.
int64 ID = 1;
// Level of the segment, as defined by data.SegmentLevel.
data.SegmentLevel level = 2;
}

// PartitionTarget groups the segment targets that belong to one partition.
message PartitionTarget {
// ID of the partition.
int64 partitionID = 1;
// Segment targets listed for this partition.
repeated SegmentTarget segments = 2;
}

// ChannelTarget describes the persisted target of one channel.
message ChannelTarget {
// Name of the channel.
string channelName = 1;
// IDs of dropped segments recorded for this channel.
repeated int64 dropped_segmentIDs = 2;
// IDs of growing segments recorded for this channel.
repeated int64 growing_segmentIDs = 3;
// Per-partition segment targets recorded for this channel.
repeated PartitionTarget partition_targets = 4;
// Message position recorded for this channel.
// NOTE(review): presumably where consumption resumes from - confirm against the reader.
msg.MsgPosition seek_position = 5;
}

// CollectionTarget is the persisted target of one collection: its channel
// targets together with a version number.
message CollectionTarget {
// ID of the collection this target belongs to.
int64 collectionID = 1;
// Channel targets of the collection.
// NOTE(review): the field name starts with an upper-case letter, unlike the
// other fields in this hunk; renaming would change the JSON name, so it is left as is.
repeated ChannelTarget Channel_targets = 2;
// Version of this target.
int64 version = 3;
}
Loading

0 comments on commit b8e3242

Please sign in to comment.