Skip to content

Commit

Permalink
enhance: Speed up target recovery after query coord restart (#31240) (#…
Browse files Browse the repository at this point in the history
…31449)

issue: #28491
pr: #31240

After querycoord restarts, it pulls a new target, which includes the
channel and segment lists. Once the segments loaded on the querynodes have
reached the target, the collection can serve search/query. But if the segment
list changes over time, then after querycoord pulls a new target it takes a
few minutes to catch up with the target's segment distribution, and until
then query/search will fail due to missing segments.

This PR saves the currently loaded target to the meta store during
querycoord's stop process, and recovers it when querycoord starts, to speed
up target recovery.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Mar 22, 2024
1 parent f8496db commit 0bf595a
Show file tree
Hide file tree
Showing 9 changed files with 481 additions and 6 deletions.
4 changes: 4 additions & 0 deletions internal/metastore/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,8 @@ type QueryCoordCatalog interface {
SaveResourceGroup(rgs ...*querypb.ResourceGroup) error
RemoveResourceGroup(rgName string) error
GetResourceGroups() ([]*querypb.ResourceGroup, error)

SaveCollectionTarget(target *querypb.CollectionTarget) error
RemoveCollectionTarget(collectionID int64) error
GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error)
}
55 changes: 54 additions & 1 deletion internal/metastore/kv/querycoord/kv_catalog.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
package querycoord

import (
"bytes"
"fmt"
"io"

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/klauspost/compress/zstd"
"github.com/pingcap/log"
"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/kv"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/pkg/util/compressor"
)

// ErrInvalidKey is the sentinel error reported for an invalid load info key.
var ErrInvalidKey = errors.New("invalid load info key")
Expand All @@ -22,7 +28,8 @@ const (
ReplicaMetaPrefixV1 = "queryCoord-ReplicaMeta"
ResourceGroupPrefix = "queryCoord-ResourceGroup"

MetaOpsBatchSize = 128
MetaOpsBatchSize = 128
CollectionTargetPrefix = "queryCoord-Collection-Target"
)

type Catalog struct {
Expand Down Expand Up @@ -234,6 +241,48 @@ func (s Catalog) ReleaseReplica(collection, replica int64) error {
return s.cli.Remove(key)
}

// SaveCollectionTarget persists the given collection target to the meta store.
//
// The target is protobuf-marshaled, zstd-compressed, and saved under the key
// derived from its collection ID. An error is returned if marshaling,
// compression, or the store write fails; nothing is written unless both
// marshaling and compression succeed.
func (s Catalog) SaveCollectionTarget(target *querypb.CollectionTarget) error {
	k := encodeCollectionTargetKey(target.GetCollectionID())
	v, err := proto.Marshal(target)
	if err != nil {
		return err
	}
	// to reduce the target size, we do compress before write to etcd
	var compressed bytes.Buffer
	// A failed compression must not be persisted: the buffer may be empty or
	// truncated, and would later be read back as a corrupt target.
	if err := compressor.ZstdCompress(bytes.NewReader(v), io.Writer(&compressed), zstd.WithEncoderLevel(zstd.SpeedBetterCompression)); err != nil {
		return err
	}
	return s.cli.Save(k, compressed.String())
}

// RemoveCollectionTarget removes the stored target of the given collection
// from the meta store and returns the store's result.
func (s Catalog) RemoveCollectionTarget(collectionID int64) error {
	return s.cli.Remove(encodeCollectionTargetKey(collectionID))
}

// GetCollectionTargets loads every collection target stored under
// CollectionTargetPrefix and returns them keyed by collection ID.
//
// An error is returned only when the meta store read itself fails. Entries
// that cannot be decompressed or unmarshaled are logged and skipped, because
// recovering targets from meta is an optimization rather than a requirement.
func (s Catalog) GetCollectionTargets() (map[int64]*querypb.CollectionTarget, error) {
	keys, values, err := s.cli.LoadWithPrefix(CollectionTargetPrefix)
	if err != nil {
		return nil, err
	}
	ret := make(map[int64]*querypb.CollectionTarget, len(values))
	for i, v := range values {
		var decompressed bytes.Buffer
		// Skip entries that fail to decompress. Otherwise the empty buffer would
		// unmarshal successfully into a zero-valued target and be registered
		// under collection ID 0.
		if err := compressor.ZstdDecompress(bytes.NewReader([]byte(v)), io.Writer(&decompressed)); err != nil {
			log.Warn("failed to decompress collection target", zap.String("key", keys[i]), zap.Error(err))
			continue
		}
		target := &querypb.CollectionTarget{}
		if err := proto.Unmarshal(decompressed.Bytes(), target); err != nil {
			// recover target from meta is a optimize policy, skip when failure happens
			log.Warn("failed to unmarshal collection target", zap.String("key", keys[i]), zap.Error(err))
			continue
		}
		ret[target.GetCollectionID()] = target
	}

	return ret, nil
}

// EncodeCollectionLoadInfoKey builds the meta store key for a collection's
// load info: CollectionLoadInfoPrefix, a slash, then the collection ID.
func EncodeCollectionLoadInfoKey(collection int64) string {
	key := fmt.Sprintf("%s/%d", CollectionLoadInfoPrefix, collection)
	return key
}
Expand All @@ -253,3 +302,7 @@ func encodeCollectionReplicaKey(collection int64) string {
// encodeResourceGroupKey builds the meta store key for a resource group:
// ResourceGroupPrefix, a slash, then the resource group name.
func encodeResourceGroupKey(rgName string) string {
	key := fmt.Sprintf("%s/%s", ResourceGroupPrefix, rgName)
	return key
}

// encodeCollectionTargetKey builds the meta store key for a collection's
// target: CollectionTargetPrefix, a slash, then the collection ID.
func encodeCollectionTargetKey(collection int64) string {
	key := fmt.Sprintf("%s/%d", CollectionTargetPrefix, collection)
	return key
}
46 changes: 46 additions & 0 deletions internal/metastore/kv/querycoord/kv_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"sort"
"testing"

"github.com/cockroachdb/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/kv/mocks"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/util/etcd"
Expand Down Expand Up @@ -199,6 +202,49 @@ func (suite *CatalogTestSuite) TestResourceGroup() {
suite.Equal([]int64{4, 5}, groups[1].GetNodes())
}

// TestCollectionTarget covers saving, overwriting, removing and loading
// collection targets through the catalog, then checks that meta store
// failures and a nil target are reported as errors.
func (suite *CatalogTestSuite) TestCollectionTarget() {
	// Collection 1 is saved twice, so the later version (4) must win.
	targets := []*querypb.CollectionTarget{
		{CollectionID: 1, Version: 1},
		{CollectionID: 2, Version: 2},
		{CollectionID: 3, Version: 3},
		{CollectionID: 1, Version: 4},
	}
	for _, target := range targets {
		// Fail fast on setup errors so the assertions below are meaningful.
		suite.Require().NoError(suite.catalog.SaveCollectionTarget(target))
	}
	suite.Require().NoError(suite.catalog.RemoveCollectionTarget(2))

	loaded, err := suite.catalog.GetCollectionTargets()
	suite.Require().NoError(err)
	// Require the expected size before indexing, to avoid dereferencing a
	// nil entry when a target is missing.
	suite.Require().Len(loaded, 2)
	suite.Require().Contains(loaded, int64(1))
	suite.Require().Contains(loaded, int64(3))
	suite.Equal(int64(4), loaded[1].Version)
	suite.Equal(int64(3), loaded[3].Version)

	// test access meta store failed
	mockStore := mocks.NewMetaKv(suite.T())
	mockErr := errors.New("failed to access etcd")
	mockStore.EXPECT().Save(mock.Anything, mock.Anything).Return(mockErr)
	mockStore.EXPECT().LoadWithPrefix(mock.Anything).Return(nil, nil, mockErr)

	suite.catalog.cli = mockStore
	err = suite.catalog.SaveCollectionTarget(&querypb.CollectionTarget{})
	suite.ErrorIs(err, mockErr)

	_, err = suite.catalog.GetCollectionTargets()
	suite.ErrorIs(err, mockErr)

	// test invalid message
	err = suite.catalog.SaveCollectionTarget(nil)
	suite.Error(err)
}

// TestLoadRelease is a placeholder; it currently contains no assertions.
func (suite *CatalogTestSuite) TestLoadRelease() {
// TODO(sunby): add ut
}
Expand Down
137 changes: 137 additions & 0 deletions internal/metastore/mocks/mock_querycoord_catalog.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -636,3 +636,27 @@ message DeleteRequest {
repeated uint64 timestamps = 7;
DataScope scope = 8;
}

// SegmentTarget identifies a single segment within a target by its ID and
// records the segment's level.
message SegmentTarget {
int64 ID = 1;
data.SegmentLevel level = 2;
}

// PartitionTarget groups the segment targets that belong to one partition.
message PartitionTarget {
int64 partitionID = 1;
repeated SegmentTarget segments = 2;
}

// ChannelTarget is the target of one channel: its name, the IDs of its
// dropped and growing segments, the per-partition segment targets, and the
// position to seek to on the channel.
message ChannelTarget {
string channelName = 1;
repeated int64 dropped_segmentIDs = 2;
repeated int64 growing_segmentIDs = 3;
repeated PartitionTarget partition_targets = 4;
msg.MsgPosition seek_position = 5;
}

// CollectionTarget is the target of one collection as persisted by query
// coord: the collection ID, the targets of its channels, and a version.
message CollectionTarget {
int64 collectionID = 1;
repeated ChannelTarget Channel_targets = 2;
int64 version = 3;
}
Loading

0 comments on commit 0bf595a

Please sign in to comment.