From 412ccfbb2063d6e13da72e4c6003f32c9af0307b Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Wed, 5 Jun 2024 19:35:52 +0800
Subject: [PATCH] enhance: Refine IndexNode code and ensure compatibility
 (#33458)

issue: #33432, #33183

Task construction moves into newIndexBuildTask/newIndexBuildTaskV2, the
separate LoadData stage and the per-task JobInfo statistics are removed,
and legacy request fields (Field, Dim, collection/partition/segment IDs)
are backfilled so requests from pre-v2.4.0 and pre-v2.5.0 coordinators
keep working.

Signed-off-by: Cai Zhang
---
 internal/indexnode/index_test.go        | 163 ++++-
 internal/indexnode/indexnode_service.go |  40 +-
 internal/indexnode/indexnode_test.go    | 792 +++++++++++------------
 internal/indexnode/task.go              | 400 +++++-------
 internal/indexnode/task_test.go         | 202 +------
 internal/indexnode/taskinfo_ops.go      |   6 -
 6 files changed, 687 insertions(+), 916 deletions(-)

diff --git a/internal/indexnode/index_test.go b/internal/indexnode/index_test.go
index 235d00a908a3..11a6a2bd8be3 100644
--- a/internal/indexnode/index_test.go
+++ b/internal/indexnode/index_test.go
@@ -1,14 +1,18 @@
 package indexnode
 
-import "math/rand"
+import (
+	"fmt"
+	"math/rand"
 
-const (
-	dim    = 8
-	nb     = 10000
-	nprobe = 8
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/internal/proto/etcdpb"
+	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
-func generateFloatVectors() []float32 {
+func generateFloatVectors(nb, dim int) []float32 {
 	vectors := make([]float32, 0)
 	for i := 0; i < nb; i++ {
 		for j := 0; j < dim; j++ {
@@ -18,12 +22,147 @@ func generateFloatVectors() []float32 {
 	return vectors
 }
 
-func generateBinaryVectors() []byte {
-	vectors := make([]byte, 0)
-	for i := 0; i < nb; i++ {
-		for j := 0; j < dim/8; j++ {
-			vectors = append(vectors, byte(rand.Intn(8)))
+func generateTestSchema() *schemapb.CollectionSchema {
+	schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
+		{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
+		{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
+		{FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool},
+		{FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8},
+		{FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16},
+		{FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64},
+		{FieldID: 14, Name: "float", DataType: schemapb.DataType_Float},
+		{FieldID: 15, Name: "double", DataType: schemapb.DataType_Double},
+		{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar},
+		{FieldID: 17, Name: "string", DataType: schemapb.DataType_String},
+		{FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
+		{FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON},
+		{FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32},
+		{FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "8"},
+		}},
+		{FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "8"},
+		}},
+		{FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "8"},
+		}},
+		{FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "8"},
+		}},
+		{FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector, TypeParams: []*commonpb.KeyValuePair{
+			{Key: common.DimKey, Value: "28433"},
+		}},
+	}}
+
+	return 
schema +} + +func generateTestData(collID, partID, segID int64, num int) ([]*Blob, error) { + insertCodec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: collID, Schema: generateTestSchema()}) + + var ( + field0 []int64 + field1 []int64 + + field10 []bool + field11 []int8 + field12 []int16 + field13 []int64 + field14 []float32 + field15 []float64 + field16 []string + field17 []string + field18 []*schemapb.ScalarField + field19 [][]byte + + field101 []int32 + field102 []float32 + field103 []byte + + field104 []byte + field105 []byte + field106 [][]byte + ) + + for i := 1; i <= num; i++ { + field0 = append(field0, int64(i)) + field1 = append(field1, int64(i)) + field10 = append(field10, true) + field11 = append(field11, int8(i)) + field12 = append(field12, int16(i)) + field13 = append(field13, int64(i)) + field14 = append(field14, float32(i)) + field15 = append(field15, float64(i)) + field16 = append(field16, fmt.Sprint(i)) + field17 = append(field17, fmt.Sprint(i)) + + arr := &schemapb.ScalarField{ + Data: &schemapb.ScalarField_IntData{ + IntData: &schemapb.IntArray{Data: []int32{int32(i), int32(i), int32(i)}}, + }, } + field18 = append(field18, arr) + + field19 = append(field19, []byte{byte(i)}) + field101 = append(field101, int32(i)) + + f102 := make([]float32, 8) + for j := range f102 { + f102[j] = float32(i) + } + + field102 = append(field102, f102...) + field103 = append(field103, 0xff) + + f104 := make([]byte, 16) + for j := range f104 { + f104[j] = byte(i) + } + field104 = append(field104, f104...) + field105 = append(field105, f104...) + + field106 = append(field106, typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4})) } - return vectors + + data := &storage.InsertData{Data: map[int64]storage.FieldData{ + common.RowIDField: &storage.Int64FieldData{Data: field0}, + common.TimeStampField: &storage.Int64FieldData{Data: field1}, + + 10: &storage.BoolFieldData{Data: field10}, + 11: &storage.Int8FieldData{Data: field11}, + 12: &storage.Int16FieldData{Data: field12}, + 13: &storage.Int64FieldData{Data: field13}, + 14: &storage.FloatFieldData{Data: field14}, + 15: &storage.DoubleFieldData{Data: field15}, + 16: &storage.StringFieldData{Data: field16}, + 17: &storage.StringFieldData{Data: field17}, + 18: &storage.ArrayFieldData{Data: field18}, + 19: &storage.JSONFieldData{Data: field19}, + 101: &storage.Int32FieldData{Data: field101}, + 102: &storage.FloatVectorFieldData{ + Data: field102, + Dim: 8, + }, + 103: &storage.BinaryVectorFieldData{ + Data: field103, + Dim: 8, + }, + 104: &storage.Float16VectorFieldData{ + Data: field104, + Dim: 8, + }, + 105: &storage.BFloat16VectorFieldData{ + Data: field105, + Dim: 8, + }, + 106: &storage.SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 28433, + Contents: field106, + }, + }, + }} + + blobs, err := insertCodec.Serialize(partID, segID, data) + return blobs, err } diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go index fb9d5a0cc19a..ff54fb02b950 100644 --- a/internal/indexnode/indexnode_service.go +++ b/internal/indexnode/indexnode_service.go @@ -21,7 +21,6 @@ import ( "fmt" "strconv" - "github.com/golang/protobuf/proto" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" @@ -36,7 +35,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" - 
"github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -100,35 +98,9 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest } var task task if Params.CommonCfg.EnableStorageV2.GetAsBool() { - task = &indexBuildTaskV2{ - indexBuildTask: &indexBuildTask{ - ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID), - ctx: taskCtx, - cancel: taskCancel, - BuildID: req.GetBuildID(), - ClusterID: req.GetClusterID(), - node: i, - req: req, - cm: cm, - nodeID: i.GetNodeID(), - tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)), - serializedSize: 0, - }, - } + task = newIndexBuildTaskV2(taskCtx, taskCancel, req, i) } else { - task = &indexBuildTask{ - ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID), - ctx: taskCtx, - cancel: taskCancel, - BuildID: req.GetBuildID(), - ClusterID: req.GetClusterID(), - node: i, - req: req, - cm: cm, - nodeID: i.GetNodeID(), - tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)), - serializedSize: 0, - } + task = newIndexBuildTask(taskCtx, taskCancel, req, cm, i) } ret := merr.Success() if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil { @@ -222,6 +194,7 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest) return merr.Success(), nil } +// GetJobStats should be GetSlots func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) { if err := i.lifetime.Add(merr.IsHealthyOrStopping); err != nil { log.Ctx(ctx).Warn("index node not ready", zap.Error(err)) @@ -231,12 +204,6 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq } defer i.lifetime.Done() unissued, active := i.sched.IndexBuildQueue.GetTaskNum() - jobInfos := make([]*indexpb.JobInfo, 0) - i.foreachTaskInfo(func(ClusterID string, buildID UniqueID, info *taskInfo) { - if info.statistic != nil { - jobInfos = append(jobInfos, proto.Clone(info.statistic).(*indexpb.JobInfo)) - } - }) slots := 0 if i.sched.buildParallel > unissued+active { slots = i.sched.buildParallel - unissued - active @@ -252,7 +219,6 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq InProgressJobNum: int64(active), EnqueueJobNum: int64(unissued), TaskSlots: int64(slots), - JobInfos: jobInfos, EnableDisk: Params.IndexNodeCfg.EnableDisk.GetAsBool(), }, nil } diff --git a/internal/indexnode/indexnode_test.go b/internal/indexnode/indexnode_test.go index 5c2ff6ccebac..20aadd172b6e 100644 --- a/internal/indexnode/indexnode_test.go +++ b/internal/indexnode/indexnode_test.go @@ -19,442 +19,23 @@ package indexnode import ( "context" "os" + "strconv" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/metastore/kv/binlog" + "github.com/milvus-io/milvus/internal/proto/indexpb" + "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) -//func TestRegister(t *testing.T) { -// var ( -// factory = &mockFactory{} -// ctx = context.TODO() -// ) -// Params.Init() -// in, err := NewIndexNode(ctx, factory) -// assert.NoError(t, err) -// in.SetEtcdClient(getEtcdClient()) -// assert.Nil(t, 
in.initSession()) -// assert.Nil(t, in.Register()) -// key := in.session.ServerName -// if !in.session.Exclusive { -// key = fmt.Sprintf("%s-%d", key, in.session.ServerID) -// } -// resp, err := getEtcdClient().Get(ctx, path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, key)) -// assert.NoError(t, err) -// assert.Equal(t, int64(1), resp.Count) -// sess := &sessionutil.Session{} -// assert.Nil(t, json.Unmarshal(resp.Kvs[0].Value, sess)) -// assert.Equal(t, sess.ServerID, in.session.ServerID) -// assert.Equal(t, sess.Address, in.session.Address) -// assert.Equal(t, sess.ServerName, in.session.ServerName) -// -// // revoke lease -// in.session.Revoke(time.Second) -// -// in.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/lib/milvus")) -// t.Run("CreateIndex FloatVector", func(t *testing.T) { -// var insertCodec storage.InsertCodec -// -// insertCodec.Schema = &etcdpb.CollectionMeta{ -// ID: collectionID, -// Schema: &schemapb.CollectionSchema{ -// Fields: []*schemapb.FieldSchema{ -// { -// FieldID: floatVectorFieldID, -// Name: floatVectorFieldName, -// IsPrimaryKey: false, -// DataType: schemapb.DataType_FloatVector, -// }, -// }, -// }, -// } -// data := make(map[UniqueID]storage.FieldData) -// tsData := make([]int64, nb) -// for i := 0; i < nb; i++ { -// tsData[i] = int64(i + 100) -// } -// data[tsFieldID] = &storage.Int64FieldData{ -// NumRows: []int64{nb}, -// Data: tsData, -// } -// data[floatVectorFieldID] = &storage.FloatVectorFieldData{ -// NumRows: []int64{nb}, -// Data: generateFloatVectors(), -// Dim: dim, -// } -// insertData := storage.InsertData{ -// Data: data, -// Infos: []storage.BlobInfo{ -// { -// Length: 10, -// }, -// }, -// } -// binLogs, _, err := insertCodec.Serialize(999, 888, &insertData) -// assert.NoError(t, err) -// kvs := make(map[string][]byte, len(binLogs)) -// paths := make([]string, 0, len(binLogs)) -// for i, blob := range binLogs { -// key := path.Join(floatVectorBinlogPath, strconv.Itoa(i)) -// paths = append(paths, key) -// kvs[key] = blob.Value[:] -// } -// err = in.chunkManager.MultiWrite(kvs) -// assert.NoError(t, err) -// -// indexMeta := &indexpb.IndexMeta{ -// IndexBuildID: indexBuildID1, -// State: commonpb.IndexState_InProgress, -// IndexVersion: 1, -// } -// -// value, err := proto.Marshal(indexMeta) -// assert.NoError(t, err) -// err = in.etcdKV.Save(metaPath1, string(value)) -// assert.NoError(t, err) -// req := &indexpb.CreateIndexRequest{ -// IndexBuildID: indexBuildID1, -// IndexName: "FloatVector", -// IndexID: indexID, -// Version: 1, -// MetaPath: metaPath1, -// DataPaths: paths, -// TypeParams: []*commonpb.KeyValuePair{ -// { -// Key: common.DimKey, -// Value: "8", -// }, -// }, -// IndexParams: []*commonpb.KeyValuePair{ -// { -// Key: common.IndexTypeKey, -// Value: "IVF_SQ8", -// }, -// { -// Key: common.IndexParamsKey, -// Value: "{\"nlist\": 128}", -// }, -// { -// Key: common.MetricTypeKey, -// Value: "L2", -// }, -// }, -// } -// -// status, err2 := in.CreateIndex(ctx, req) -// assert.Nil(t, err2) -// assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) -// -// strValue, err3 := in.etcdKV.Load(metaPath1) -// assert.Nil(t, err3) -// indexMetaTmp := indexpb.IndexMeta{} -// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp) -// assert.NoError(t, err) -// for indexMetaTmp.State != commonpb.IndexState_Finished { -// time.Sleep(100 * time.Millisecond) -// strValue, err := in.etcdKV.Load(metaPath1) -// assert.NoError(t, err) -// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp) -// 
assert.NoError(t, err) -// } -// defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFileKeys) -// defer func() { -// for k := range kvs { -// err = in.chunkManager.Remove(k) -// assert.NoError(t, err) -// } -// }() -// -// defer in.etcdKV.RemoveWithPrefix(metaPath1) -// }) -// t.Run("CreateIndex BinaryVector", func(t *testing.T) { -// var insertCodec storage.InsertCodec -// -// insertCodec.Schema = &etcdpb.CollectionMeta{ -// ID: collectionID, -// Schema: &schemapb.CollectionSchema{ -// Fields: []*schemapb.FieldSchema{ -// { -// FieldID: binaryVectorFieldID, -// Name: binaryVectorFieldName, -// IsPrimaryKey: false, -// DataType: schemapb.DataType_BinaryVector, -// }, -// }, -// }, -// } -// data := make(map[UniqueID]storage.FieldData) -// tsData := make([]int64, nb) -// for i := 0; i < nb; i++ { -// tsData[i] = int64(i + 100) -// } -// data[tsFieldID] = &storage.Int64FieldData{ -// NumRows: []int64{nb}, -// Data: tsData, -// } -// data[binaryVectorFieldID] = &storage.BinaryVectorFieldData{ -// NumRows: []int64{nb}, -// Data: generateBinaryVectors(), -// Dim: dim, -// } -// insertData := storage.InsertData{ -// Data: data, -// Infos: []storage.BlobInfo{ -// { -// Length: 10, -// }, -// }, -// } -// binLogs, _, err := insertCodec.Serialize(999, 888, &insertData) -// assert.NoError(t, err) -// kvs := make(map[string][]byte, len(binLogs)) -// paths := make([]string, 0, len(binLogs)) -// for i, blob := range binLogs { -// key := path.Join(binaryVectorBinlogPath, strconv.Itoa(i)) -// paths = append(paths, key) -// kvs[key] = blob.Value[:] -// } -// err = in.chunkManager.MultiWrite(kvs) -// assert.NoError(t, err) -// -// indexMeta := &indexpb.IndexMeta{ -// IndexBuildID: indexBuildID2, -// State: commonpb.IndexState_InProgress, -// IndexVersion: 1, -// } -// -// value, err := proto.Marshal(indexMeta) -// assert.NoError(t, err) -// err = in.etcdKV.Save(metaPath2, string(value)) -// assert.NoError(t, err) -// req := &indexpb.CreateIndexRequest{ -// IndexBuildID: indexBuildID2, -// IndexName: "BinaryVector", -// IndexID: indexID, -// Version: 1, -// MetaPath: metaPath2, -// DataPaths: paths, -// TypeParams: []*commonpb.KeyValuePair{ -// { -// Key: common.DimKey, -// Value: "8", -// }, -// }, -// IndexParams: []*commonpb.KeyValuePair{ -// { -// Key: common.IndexTypeKey, -// Value: "BIN_FLAT", -// }, -// { -// Key: common.MetricTypeKey, -// Value: "JACCARD", -// }, -// }, -// } -// -// status, err2 := in.CreateIndex(ctx, req) -// assert.Nil(t, err2) -// assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) -// -// strValue, err3 := in.etcdKV.Load(metaPath2) -// assert.Nil(t, err3) -// indexMetaTmp := indexpb.IndexMeta{} -// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp) -// assert.NoError(t, err) -// for indexMetaTmp.State != commonpb.IndexState_Finished { -// time.Sleep(100 * time.Millisecond) -// strValue, err = in.etcdKV.Load(metaPath2) -// assert.NoError(t, err) -// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp) -// assert.NoError(t, err) -// } -// defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFileKeys) -// defer func() { -// for k := range kvs { -// err = in.chunkManager.Remove(k) -// assert.NoError(t, err) -// } -// }() -// -// defer in.etcdKV.RemoveWithPrefix(metaPath2) -// }) -// -// t.Run("Create DeletedIndex", func(t *testing.T) { -// var insertCodec storage.InsertCodec -// -// insertCodec.Schema = &etcdpb.CollectionMeta{ -// ID: collectionID, -// Schema: &schemapb.CollectionSchema{ -// Fields: []*schemapb.FieldSchema{ -// { -// FieldID: floatVectorFieldID, -// 
Name: floatVectorFieldName, -// IsPrimaryKey: false, -// DataType: schemapb.DataType_FloatVector, -// }, -// }, -// }, -// } -// data := make(map[UniqueID]storage.FieldData) -// tsData := make([]int64, nb) -// for i := 0; i < nb; i++ { -// tsData[i] = int64(i + 100) -// } -// data[tsFieldID] = &storage.Int64FieldData{ -// NumRows: []int64{nb}, -// Data: tsData, -// } -// data[floatVectorFieldID] = &storage.FloatVectorFieldData{ -// NumRows: []int64{nb}, -// Data: generateFloatVectors(), -// Dim: dim, -// } -// insertData := storage.InsertData{ -// Data: data, -// Infos: []storage.BlobInfo{ -// { -// Length: 10, -// }, -// }, -// } -// binLogs, _, err := insertCodec.Serialize(999, 888, &insertData) -// assert.NoError(t, err) -// kvs := make(map[string][]byte, len(binLogs)) -// paths := make([]string, 0, len(binLogs)) -// for i, blob := range binLogs { -// key := path.Join(floatVectorBinlogPath, strconv.Itoa(i)) -// paths = append(paths, key) -// kvs[key] = blob.Value[:] -// } -// err = in.chunkManager.MultiWrite(kvs) -// assert.NoError(t, err) -// -// indexMeta := &indexpb.IndexMeta{ -// IndexBuildID: indexBuildID1, -// State: commonpb.IndexState_InProgress, -// IndexVersion: 1, -// MarkDeleted: true, -// } -// -// value, err := proto.Marshal(indexMeta) -// assert.NoError(t, err) -// err = in.etcdKV.Save(metaPath3, string(value)) -// assert.NoError(t, err) -// req := &indexpb.CreateIndexRequest{ -// IndexBuildID: indexBuildID1, -// IndexName: "FloatVector", -// IndexID: indexID, -// Version: 1, -// MetaPath: metaPath3, -// DataPaths: paths, -// TypeParams: []*commonpb.KeyValuePair{ -// { -// Key: common.DimKey, -// Value: "8", -// }, -// }, -// IndexParams: []*commonpb.KeyValuePair{ -// { -// Key: common.IndexTypeKey, -// Value: "IVF_SQ8", -// }, -// { -// Key: common.IndexParamsKey, -// Value: "{\"nlist\": 128}", -// }, -// { -// Key: common.MetricTypeKey, -// Value: "L2", -// }, -// }, -// } -// -// status, err2 := in.CreateIndex(ctx, req) -// assert.Nil(t, err2) -// assert.Equal(t, commonpb.ErrorCode_Success, status.ErrorCode) -// time.Sleep(100 * time.Millisecond) -// strValue, err3 := in.etcdKV.Load(metaPath3) -// assert.Nil(t, err3) -// indexMetaTmp := indexpb.IndexMeta{} -// err = proto.Unmarshal([]byte(strValue), &indexMetaTmp) -// assert.NoError(t, err) -// assert.Equal(t, true, indexMetaTmp.MarkDeleted) -// assert.Equal(t, int64(1), indexMetaTmp.IndexVersion) -// //for indexMetaTmp.State != commonpb.IndexState_Finished { -// // time.Sleep(100 * time.Millisecond) -// // strValue, err := in.etcdKV.Load(metaPath3) -// // assert.NoError(t, err) -// // err = proto.Unmarshal([]byte(strValue), &indexMetaTmp) -// // assert.NoError(t, err) -// //} -// defer in.chunkManager.MultiRemove(indexMetaTmp.IndexFileKeys) -// defer func() { -// for k := range kvs { -// err = in.chunkManager.Remove(k) -// assert.NoError(t, err) -// } -// }() -// -// defer in.etcdKV.RemoveWithPrefix(metaPath3) -// }) -// -// t.Run("GetComponentStates", func(t *testing.T) { -// resp, err := in.GetComponentStates(ctx) -// assert.NoError(t, err) -// assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) -// assert.Equal(t, commonpb.StateCode_Healthy, resp.State.StateCode) -// }) -// -// t.Run("GetTimeTickChannel", func(t *testing.T) { -// resp, err := in.GetTimeTickChannel(ctx) -// assert.NoError(t, err) -// assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) -// }) -// -// t.Run("GetStatisticsChannel", func(t *testing.T) { -// resp, err := in.GetStatisticsChannel(ctx) -// 
assert.NoError(t, err) -// assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) -// }) -// -// t.Run("ShowConfigurations", func(t *testing.T) { -// pattern := "Port" -// req := &internalpb.ShowConfigurationsRequest{ -// Base: &commonpb.MsgBase{ -// MsgType: commonpb.MsgType_WatchQueryChannels, -// MsgID: rand.Int63(), -// }, -// Pattern: pattern, -// } -// -// resp, err := in.ShowConfigurations(ctx, req) -// assert.NoError(t, err) -// assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) -// assert.Equal(t, 1, len(resp.Configuations)) -// assert.Equal(t, "indexnode.port", resp.Configuations[0].Key) -// }) -// -// t.Run("GetMetrics_system_info", func(t *testing.T) { -// req, err := metricsinfo.ConstructRequestByMetricType(metricsinfo.SystemInfoMetrics) -// assert.NoError(t, err) -// resp, err := in.GetMetrics(ctx, req) -// assert.NoError(t, err) -// log.Info("GetMetrics_system_info", -// zap.String("resp", resp.Response), -// zap.String("name", resp.ComponentName)) -// }) -// err = in.etcdKV.RemoveWithPrefix("session/IndexNode") -// assert.NoError(t, err) -// -// resp, err = getEtcdClient().Get(ctx, path.Join(Params.EtcdCfg.MetaRootPath, sessionutil.DefaultServiceRoot, in.session.ServerName)) -// assert.NoError(t, err) -// assert.Equal(t, resp.Count, int64(0)) -//} - func TestComponentState(t *testing.T) { var ( factory = &mockFactory{ @@ -591,3 +172,360 @@ func TestMain(m *testing.M) { teardown() os.Exit(code) } + +type IndexNodeSuite struct { + suite.Suite + + collID int64 + partID int64 + segID int64 + fieldID int64 + logID int64 + data []*Blob + in *IndexNode + storageConfig *indexpb.StorageConfig + cm storage.ChunkManager +} + +func Test_IndexNodeSuite(t *testing.T) { + suite.Run(t, new(IndexNodeSuite)) +} + +func (s *IndexNodeSuite) SetupTest() { + s.collID = 1 + s.partID = 2 + s.segID = 3 + s.fieldID = 102 + s.logID = 10000 + paramtable.Init() + Params.MinioCfg.RootPath.SwapTempValue("indexnode-ut") + + var err error + s.data, err = generateTestData(s.collID, s.partID, s.segID, 1025) + s.NoError(err) + + s.storageConfig = &indexpb.StorageConfig{ + Address: Params.MinioCfg.Address.GetValue(), + AccessKeyID: Params.MinioCfg.AccessKeyID.GetValue(), + SecretAccessKey: Params.MinioCfg.SecretAccessKey.GetValue(), + UseSSL: Params.MinioCfg.UseSSL.GetAsBool(), + SslCACert: Params.MinioCfg.SslCACert.GetValue(), + BucketName: Params.MinioCfg.BucketName.GetValue(), + RootPath: Params.MinioCfg.RootPath.GetValue(), + UseIAM: Params.MinioCfg.UseIAM.GetAsBool(), + IAMEndpoint: Params.MinioCfg.IAMEndpoint.GetValue(), + StorageType: Params.CommonCfg.StorageType.GetValue(), + Region: Params.MinioCfg.Region.GetValue(), + UseVirtualHost: Params.MinioCfg.UseVirtualHost.GetAsBool(), + CloudProvider: Params.MinioCfg.CloudProvider.GetValue(), + RequestTimeoutMs: Params.MinioCfg.RequestTimeoutMs.GetAsInt64(), + } + + var ( + factory = &mockFactory{ + chunkMgr: &mockChunkmgr{}, + } + ctx = context.TODO() + ) + s.in = NewIndexNode(ctx, factory) + + err = s.in.Init() + s.NoError(err) + + err = s.in.Start() + s.NoError(err) + + s.cm, err = s.in.storageFactory.NewChunkManager(context.Background(), s.storageConfig) + s.NoError(err) + logID := int64(10000) + for i, blob := range s.data { + fID, _ := strconv.ParseInt(blob.GetKey(), 10, 64) + filePath, err := binlog.BuildLogPath(storage.InsertBinlog, s.collID, s.partID, s.segID, fID, logID+int64(i)) + s.NoError(err) + err = s.cm.Write(context.Background(), filePath, blob.GetValue()) + s.NoError(err) + } +} + +func (s 
*IndexNodeSuite) TearDownSuite() { + err := s.cm.RemoveWithPrefix(context.Background(), "indexnode-ut") + s.NoError(err) + Params.MinioCfg.RootPath.SwapTempValue("files") + + err = s.in.Stop() + s.NoError(err) +} + +func (s *IndexNodeSuite) Test_CreateIndexJob_Compatibility() { + s.Run("create vec index", func() { + ctx := context.Background() + + s.Run("v2.3.x", func() { + buildID := int64(1) + dataPath, err := binlog.BuildLogPath(storage.InsertBinlog, s.collID, s.partID, s.segID, s.fieldID, s.logID+13) + s.NoError(err) + req := &indexpb.CreateJobRequest{ + ClusterID: "cluster1", + IndexFilePrefix: "indexnode-ut/index_files", + BuildID: buildID, + DataPaths: []string{dataPath}, + IndexVersion: 1, + StorageConfig: s.storageConfig, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "index_type", Value: "HNSW", + }, + { + Key: "metric_type", Value: "L2", + }, + { + Key: "M", Value: "4", + }, + { + Key: "efConstruction", Value: "16", + }, + }, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", Value: "8", + }, + }, + NumRows: 1025, + } + + status, err := s.in.CreateJob(ctx, req) + s.NoError(err) + err = merr.Error(status) + s.NoError(err) + + for { + resp, err := s.in.QueryJobs(ctx, &indexpb.QueryJobsRequest{ + ClusterID: "cluster1", + BuildIDs: []int64{buildID}, + }) + s.NoError(err) + err = merr.Error(resp.GetStatus()) + s.NoError(err) + s.Equal(1, len(resp.GetIndexInfos())) + if resp.GetIndexInfos()[0].GetState() == commonpb.IndexState_Finished { + break + } + require.Equal(s.T(), resp.GetIndexInfos()[0].GetState(), commonpb.IndexState_InProgress) + time.Sleep(time.Second) + } + + status, err = s.in.DropJobs(ctx, &indexpb.DropJobsRequest{ + ClusterID: "cluster1", + BuildIDs: []int64{buildID}, + }) + s.NoError(err) + err = merr.Error(status) + s.NoError(err) + }) + + s.Run("v2.4.x", func() { + buildID := int64(2) + req := &indexpb.CreateJobRequest{ + ClusterID: "cluster1", + IndexFilePrefix: "indexnode-ut/index_files", + BuildID: buildID, + DataPaths: nil, + IndexVersion: 1, + StorageConfig: s.storageConfig, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "index_type", Value: "HNSW", + }, + { + Key: "metric_type", Value: "L2", + }, + { + Key: "M", Value: "4", + }, + { + Key: "efConstruction", Value: "16", + }, + }, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", Value: "8", + }, + }, + NumRows: 1025, + CurrentIndexVersion: 0, + CollectionID: s.collID, + PartitionID: s.partID, + SegmentID: s.segID, + FieldID: s.fieldID, + FieldName: "floatVector", + FieldType: schemapb.DataType_FloatVector, + Dim: 8, + DataIds: []int64{s.logID + 13}, + } + + status, err := s.in.CreateJob(ctx, req) + s.NoError(err) + err = merr.Error(status) + s.NoError(err) + + for { + resp, err := s.in.QueryJobs(ctx, &indexpb.QueryJobsRequest{ + ClusterID: "cluster1", + BuildIDs: []int64{buildID}, + }) + s.NoError(err) + err = merr.Error(resp.GetStatus()) + s.NoError(err) + s.Equal(1, len(resp.GetIndexInfos())) + if resp.GetIndexInfos()[0].GetState() == commonpb.IndexState_Finished { + break + } + require.Equal(s.T(), resp.GetIndexInfos()[0].GetState(), commonpb.IndexState_InProgress) + time.Sleep(time.Second) + } + + status, err = s.in.DropJobs(ctx, &indexpb.DropJobsRequest{ + ClusterID: "cluster1", + BuildIDs: []int64{buildID}, + }) + s.NoError(err) + err = merr.Error(status) + s.NoError(err) + }) + + s.Run("v2.5.x", func() { + buildID := int64(3) + req := &indexpb.CreateJobRequest{ + ClusterID: "cluster1", + IndexFilePrefix: "indexnode-ut/index_files", + BuildID: buildID, + IndexVersion: 1, + 
StorageConfig: s.storageConfig, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "index_type", Value: "HNSW", + }, + { + Key: "metric_type", Value: "L2", + }, + { + Key: "M", Value: "4", + }, + { + Key: "efConstruction", Value: "16", + }, + }, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: "dim", Value: "8", + }, + }, + NumRows: 1025, + CurrentIndexVersion: 0, + CollectionID: s.collID, + PartitionID: s.partID, + SegmentID: s.segID, + FieldID: s.fieldID, + FieldName: "floatVector", + FieldType: schemapb.DataType_FloatVector, + Dim: 8, + DataIds: []int64{s.logID + 13}, + Field: &schemapb.FieldSchema{ + FieldID: s.fieldID, + Name: "floatVector", + DataType: schemapb.DataType_FloatVector, + }, + } + + status, err := s.in.CreateJob(ctx, req) + s.NoError(err) + err = merr.Error(status) + s.NoError(err) + + for { + resp, err := s.in.QueryJobs(ctx, &indexpb.QueryJobsRequest{ + ClusterID: "cluster1", + BuildIDs: []int64{buildID}, + }) + s.NoError(err) + err = merr.Error(resp.GetStatus()) + s.NoError(err) + s.Equal(1, len(resp.GetIndexInfos())) + if resp.GetIndexInfos()[0].GetState() == commonpb.IndexState_Finished { + break + } + require.Equal(s.T(), resp.GetIndexInfos()[0].GetState(), commonpb.IndexState_InProgress) + time.Sleep(time.Second) + } + + status, err = s.in.DropJobs(ctx, &indexpb.DropJobsRequest{ + ClusterID: "cluster1", + BuildIDs: []int64{buildID}, + }) + s.NoError(err) + err = merr.Error(status) + s.NoError(err) + }) + }) +} + +func (s *IndexNodeSuite) Test_CreateIndexJob_ScalarIndex() { + ctx := context.Background() + + s.Run("int64 inverted", func() { + buildID := int64(10) + fieldID := int64(13) + dataPath, err := binlog.BuildLogPath(storage.InsertBinlog, s.collID, s.partID, s.segID, s.fieldID, s.logID+13) + s.NoError(err) + req := &indexpb.CreateJobRequest{ + ClusterID: "cluster1", + IndexFilePrefix: "indexnode-ut/index_files", + BuildID: buildID, + DataPaths: []string{dataPath}, + IndexVersion: 1, + StorageConfig: s.storageConfig, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: "index_type", Value: "INVERTED", + }, + }, + TypeParams: nil, + NumRows: 1025, + DataIds: []int64{s.logID + 13}, + Field: &schemapb.FieldSchema{ + FieldID: fieldID, + Name: "int64", + DataType: schemapb.DataType_Int64, + }, + } + + status, err := s.in.CreateJob(ctx, req) + s.NoError(err) + err = merr.Error(status) + s.NoError(err) + + for { + resp, err := s.in.QueryJobs(ctx, &indexpb.QueryJobsRequest{ + ClusterID: "cluster1", + BuildIDs: []int64{buildID}, + }) + s.NoError(err) + err = merr.Error(resp.GetStatus()) + s.NoError(err) + s.Equal(1, len(resp.GetIndexInfos())) + if resp.GetIndexInfos()[0].GetState() == commonpb.IndexState_Finished { + break + } + require.Equal(s.T(), resp.GetIndexInfos()[0].GetState(), commonpb.IndexState_InProgress) + time.Sleep(time.Second) + } + + status, err = s.in.DropJobs(ctx, &indexpb.DropJobsRequest{ + ClusterID: "cluster1", + BuildIDs: []int64{buildID}, + }) + s.NoError(err) + err = merr.Error(status) + s.NoError(err) + }) +} diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go index 54c8b3fe45a6..177f1282c538 100644 --- a/internal/indexnode/task.go +++ b/internal/indexnode/task.go @@ -19,7 +19,6 @@ package indexnode import ( "context" "fmt" - "runtime/debug" "strconv" "strings" "time" @@ -36,8 +35,6 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/util/funcutil" - "github.com/milvus-io/milvus/pkg/util/hardware" 
"github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/indexparams" "github.com/milvus-io/milvus/pkg/util/merr" @@ -61,16 +58,12 @@ type taskInfo struct { failReason string currentIndexVersion int32 indexStoreVersion int64 - - // task statistics - statistic *indexpb.JobInfo } type task interface { Ctx() context.Context Name() string Prepare(context.Context) error - LoadData(context.Context) error BuildIndex(context.Context) error SaveIndexFiles(context.Context) error OnEnqueue(context.Context) error @@ -83,37 +76,47 @@ type indexBuildTaskV2 struct { *indexBuildTask } -func (it *indexBuildTaskV2) parseParams(ctx context.Context) error { - it.collectionID = it.req.GetCollectionID() - it.partitionID = it.req.GetPartitionID() - it.segmentID = it.req.GetSegmentID() - it.fieldType = it.req.GetFieldType() - if it.fieldType == schemapb.DataType_None { - it.fieldType = it.req.GetField().GetDataType() - } - it.fieldID = it.req.GetFieldID() - if it.fieldID == 0 { - it.fieldID = it.req.GetField().GetFieldID() - } - it.fieldName = it.req.GetFieldName() - if it.fieldName == "" { - it.fieldName = it.req.GetField().GetName() +func newIndexBuildTaskV2(ctx context.Context, + cancel context.CancelFunc, + req *indexpb.CreateJobRequest, + node *IndexNode, +) *indexBuildTaskV2 { + t := &indexBuildTaskV2{ + indexBuildTask: &indexBuildTask{ + ident: fmt.Sprintf("%s/%d", req.GetClusterID(), req.GetBuildID()), + cancel: cancel, + ctx: ctx, + req: req, + tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.GetBuildID(), req.GetClusterID())), + node: node, + }, + } + + t.parseParams() + return t +} + +func (it *indexBuildTaskV2) parseParams() { + // fill field for requests before v2.5.0 + if it.req.GetField() == nil || it.req.GetField().GetDataType() == schemapb.DataType_None { + it.req.Field = &schemapb.FieldSchema{ + FieldID: it.req.GetFieldID(), + Name: it.req.GetFieldName(), + DataType: it.req.GetFieldType(), + } } - return nil } func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error { - err := it.parseParams(ctx) - if err != nil { - log.Ctx(ctx).Warn("parse field meta from binlog failed", zap.Error(err)) - return err - } + log := log.Ctx(ctx).With(zap.String("clusterID", it.req.GetClusterID()), zap.Int64("buildID", it.req.GetBuildID()), + zap.Int64("collection", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID()), + zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion())) indexType := it.newIndexParams[common.IndexTypeKey] if indexType == indexparamcheck.IndexDISKANN { // check index node support disk index if !Params.IndexNodeCfg.EnableDisk.GetAsBool() { - log.Ctx(ctx).Warn("IndexNode don't support build disk index", + log.Warn("IndexNode don't support build disk index", zap.String("index type", it.newIndexParams[common.IndexTypeKey]), zap.Bool("enable disk", Params.IndexNodeCfg.EnableDisk.GetAsBool())) return merr.WrapErrIndexNotSupported("disk index") @@ -122,19 +125,19 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error { // check load size and size of field data localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue()) if err != nil { - log.Ctx(ctx).Warn("IndexNode get local used size failed") + log.Warn("IndexNode get local used size failed") return err } - fieldDataSize, err := estimateFieldDataSize(it.statistic.Dim, it.req.GetNumRows(), it.fieldType) + fieldDataSize, err := estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), 
it.req.GetField().GetDataType()) if err != nil { - log.Ctx(ctx).Warn("IndexNode get local used size failed") + log.Warn("IndexNode get local used size failed") return err } usedLocalSizeWhenBuild := int64(float64(fieldDataSize)*diskUsageRatio) + localUsedSize maxUsedLocalSize := int64(Params.IndexNodeCfg.DiskCapacityLimit.GetAsFloat() * Params.IndexNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) if usedLocalSizeWhenBuild > maxUsedLocalSize { - log.Ctx(ctx).Warn("IndexNode don't has enough disk size to build disk ann index", + log.Warn("IndexNode don't has enough disk size to build disk ann index", zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild), zap.Int64("maxUsedLocalSize", maxUsedLocalSize)) return merr.WrapErrServiceDiskLimitExceeded(float32(usedLocalSizeWhenBuild), float32(maxUsedLocalSize)) @@ -142,7 +145,7 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error { err = indexparams.SetDiskIndexBuildParams(it.newIndexParams, int64(fieldDataSize)) if err != nil { - log.Ctx(ctx).Warn("failed to fill disk index params", zap.Error(err)) + log.Warn("failed to fill disk index params", zap.Error(err)) return err } } @@ -174,29 +177,19 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error { }) } - it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion()) - field := it.req.GetField() - if field == nil || field.GetDataType() == schemapb.DataType_None { - field = &schemapb.FieldSchema{ - FieldID: it.fieldID, - Name: it.fieldName, - DataType: it.fieldType, - } - } - buildIndexParams := &indexcgopb.BuildIndexInfo{ - ClusterID: it.ClusterID, - BuildID: it.BuildID, - CollectionID: it.collectionID, - PartitionID: it.partitionID, - SegmentID: it.segmentID, + ClusterID: it.req.GetClusterID(), + BuildID: it.req.GetBuildID(), + CollectionID: it.req.GetCollectionID(), + PartitionID: it.req.GetPartitionID(), + SegmentID: it.req.GetSegmentID(), IndexVersion: it.req.GetIndexVersion(), - CurrentIndexVersion: it.currentIndexVersion, + CurrentIndexVersion: it.req.GetCurrentIndexVersion(), NumRows: it.req.GetNumRows(), Dim: it.req.GetDim(), IndexFilePrefix: it.req.GetIndexFilePrefix(), InsertFiles: it.req.GetDataPaths(), - FieldSchema: field, + FieldSchema: it.req.GetField(), StorageConfig: storageConfig, IndexParams: mapToKVPairs(it.newIndexParams), TypeParams: mapToKVPairs(it.newTypeParams), @@ -206,33 +199,36 @@ func (it *indexBuildTaskV2) BuildIndex(ctx context.Context) error { OptFields: optFields, } + var err error it.index, err = indexcgowrapper.CreateIndexV2(ctx, buildIndexParams) if err != nil { if it.index != nil && it.index.CleanLocalData() != nil { - log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed", - zap.Int64("buildID", it.BuildID), - zap.Int64("index version", it.req.GetIndexVersion())) + log.Warn("failed to clean cached data on disk after build index failed") } - log.Ctx(ctx).Error("failed to build index", zap.Error(err)) + log.Warn("failed to build index", zap.Error(err)) return err } buildIndexLatency := it.tr.RecordSpan() metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(buildIndexLatency.Milliseconds())) - log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID)) + log.Info("Successfully build index") return nil } func (it *indexBuildTaskV2) SaveIndexFiles(ctx context.Context) error { + log := 
log.Ctx(ctx).With(zap.String("clusterID", it.req.GetClusterID()), zap.Int64("buildID", it.req.GetBuildID()), + zap.Int64("collection", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID()), + zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion())) + gcIndex := func() { if err := it.index.Delete(); err != nil { - log.Ctx(ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err)) + log.Warn("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err)) } } version, err := it.index.UpLoadV2() if err != nil { - log.Ctx(ctx).Error("failed to upload index", zap.Error(err)) + log.Warn("failed to upload index", zap.Error(err)) gcIndex() return err } @@ -244,17 +240,15 @@ func (it *indexBuildTaskV2) SaveIndexFiles(ctx context.Context) error { gcIndex() // use serialized size before encoding - it.serializedSize = 0 + var serializedSize uint64 saveFileKeys := make([]string, 0) - it.statistic.EndTime = time.Now().UnixMicro() - it.node.storeIndexFilesAndStatisticV2(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic, it.currentIndexVersion, version) - log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys)) + it.node.storeIndexFilesAndStatisticV2(it.req.GetClusterID(), it.req.GetBuildID(), saveFileKeys, serializedSize, it.req.GetCurrentIndexVersion(), version) + log.Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys)) saveIndexFileDur := it.tr.RecordSpan() metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(saveIndexFileDur.Seconds()) it.tr.Elapse("index building all done") - log.Ctx(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), - zap.Int64("partition", it.partitionID), zap.Int64("SegmentId", it.segmentID)) + log.Info("Successfully save index files") return nil } @@ -264,29 +258,45 @@ type indexBuildTask struct { cancel context.CancelFunc ctx context.Context - cm storage.ChunkManager - index indexcgowrapper.CodecIndex - savePaths []string - req *indexpb.CreateJobRequest - currentIndexVersion int32 - BuildID UniqueID - nodeID UniqueID - ClusterID string - collectionID UniqueID - partitionID UniqueID - segmentID UniqueID - fieldID UniqueID - fieldName string - fieldType schemapb.DataType - fieldData storage.FieldData - indexBlobs []*storage.Blob - newTypeParams map[string]string - newIndexParams map[string]string - serializedSize uint64 - tr *timerecord.TimeRecorder - queueDur time.Duration - statistic indexpb.JobInfo - node *IndexNode + cm storage.ChunkManager + index indexcgowrapper.CodecIndex + req *indexpb.CreateJobRequest + newTypeParams map[string]string + newIndexParams map[string]string + tr *timerecord.TimeRecorder + queueDur time.Duration + node *IndexNode +} + +func newIndexBuildTask(ctx context.Context, + cancel context.CancelFunc, + req *indexpb.CreateJobRequest, + cm storage.ChunkManager, + node *IndexNode, +) *indexBuildTask { + t := &indexBuildTask{ + ident: fmt.Sprintf("%s/%d", req.GetClusterID(), req.GetBuildID()), + cancel: cancel, + ctx: ctx, + cm: cm, + req: req, + tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.GetBuildID(), req.GetClusterID())), + node: node, + } + + t.parseParams() + return t +} + +func (it *indexBuildTask) parseParams() { + // fill field for requests before v2.5.0 + if it.req.GetField() == nil || it.req.GetField().GetDataType() == schemapb.DataType_None { + it.req.Field = 
&schemapb.FieldSchema{
+			FieldID:  it.req.GetFieldID(),
+			Name:     it.req.GetFieldName(),
+			DataType: it.req.GetFieldType(),
+		}
+	}
 }
 
 func (it *indexBuildTask) Reset() {
@@ -295,10 +305,7 @@ func (it *indexBuildTask) Reset() {
 	it.ctx = nil
 	it.cm = nil
 	it.index = nil
-	it.savePaths = nil
 	it.req = nil
-	it.fieldData = nil
-	it.indexBlobs = nil
 	it.newTypeParams = nil
 	it.newIndexParams = nil
 	it.tr = nil
@@ -316,27 +323,27 @@ func (it *indexBuildTask) Name() string {
 }
 
 func (it *indexBuildTask) SetState(state commonpb.IndexState, failReason string) {
-	it.node.storeTaskState(it.ClusterID, it.BuildID, state, failReason)
+	it.node.storeTaskState(it.req.GetClusterID(), it.req.GetBuildID(), state, failReason)
 }
 
 func (it *indexBuildTask) GetState() commonpb.IndexState {
-	return it.node.loadTaskState(it.ClusterID, it.BuildID)
+	return it.node.loadTaskState(it.req.GetClusterID(), it.req.GetBuildID())
 }
 
 // OnEnqueue enqueues indexing tasks.
 func (it *indexBuildTask) OnEnqueue(ctx context.Context) error {
 	it.queueDur = 0
 	it.tr.RecordSpan()
-	it.statistic.StartTime = time.Now().UnixMicro()
-	it.statistic.PodID = it.node.GetNodeID()
-	log.Ctx(ctx).Info("IndexNode IndexBuilderTask Enqueue", zap.Int64("buildID", it.BuildID), zap.Int64("segmentID", it.segmentID))
+	log.Ctx(ctx).Info("IndexNode IndexBuilderTask Enqueue", zap.Int64("buildID", it.req.GetBuildID()),
+		zap.Int64("segmentID", it.req.GetSegmentID()))
 	return nil
 }
 
 func (it *indexBuildTask) Prepare(ctx context.Context) error {
 	it.queueDur = it.tr.RecordSpan()
-	log.Ctx(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
-		zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
+	log.Ctx(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.req.GetBuildID()),
+		zap.Int64("Collection", it.req.GetCollectionID()), zap.Int64("SegmentID", it.req.GetSegmentID()))
+
 	typeParams := make(map[string]string)
 	indexParams := make(map[string]string)
 
@@ -377,86 +384,41 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
 	it.newTypeParams = typeParams
 	it.newIndexParams = indexParams
-	it.statistic.IndexParams = it.req.GetIndexParams()
-	it.statistic.Dim = it.req.GetDim()
-
-	log.Ctx(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
-		zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
-	return nil
-}
-
-func (it *indexBuildTask) LoadData(ctx context.Context) error {
-	getValueByPath := func(path string) ([]byte, error) {
-		data, err := it.cm.Read(ctx, path)
-		if err != nil {
-			if errors.Is(err, merr.ErrIoKeyNotFound) {
-				return nil, err
+	if it.req.GetDim() == 0 {
+		// fill dim for requests before v2.4.0
+		if dimStr, ok := typeParams[common.DimKey]; ok {
+			var err error
+			it.req.Dim, err = strconv.ParseInt(dimStr, 10, 64)
+			if err != nil {
+				log.Ctx(ctx).Error("parse dimension failed", zap.Error(err))
+				// ignore error
 			}
-			return nil, err
 		}
-		return data, nil
 	}
-	getBlobByPath := func(path string) (*Blob, error) {
-		value, err := getValueByPath(path)
-		if err != nil {
-			return nil, err
-		}
-		return &Blob{
-			Key:   path,
-			Value: value,
-		}, nil
-	}
-
-	toLoadDataPaths := it.req.GetDataPaths()
-	keys := make([]string, len(toLoadDataPaths))
-	blobs := make([]*Blob, len(toLoadDataPaths))
-	loadKey := func(idx int) error {
-		keys[idx] = toLoadDataPaths[idx]
-		blob, err := getBlobByPath(toLoadDataPaths[idx])
+	if it.req.GetCollectionID() == 0 {
+		err := it.parseFieldMetaFromBinlog(ctx)
 		if err != nil {
+			log.Ctx(ctx).Warn("parse field meta 
from binlog failed", zap.Error(err)) return err } - blobs[idx] = blob - return nil - } - // Use hardware.GetCPUNum() instead of hardware.GetCPUNum() - // to respect CPU quota of container/pod - // gomaxproc will be set by `automaxproc`, passing 0 will just retrieve the value - err := funcutil.ProcessFuncParallel(len(toLoadDataPaths), hardware.GetCPUNum(), loadKey, "loadKey") - if err != nil { - log.Ctx(ctx).Warn("loadKey failed", zap.Error(err)) - return err } - loadFieldDataLatency := it.tr.CtxRecord(ctx, "load field data done") - metrics.IndexNodeLoadFieldLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(loadFieldDataLatency.Seconds()) - - err = it.decodeBlobs(ctx, blobs) - if err != nil { - log.Ctx(ctx).Info("failed to decode blobs", zap.Int64("buildID", it.BuildID), - zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID), zap.Error(err)) - } else { - log.Ctx(ctx).Info("Successfully load data", zap.Int64("buildID", it.BuildID), - zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID)) - } - blobs = nil - debug.FreeOSMemory() - return err + log.Ctx(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.req.GetBuildID()), + zap.Int64("collectionID", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID())) + return nil } func (it *indexBuildTask) BuildIndex(ctx context.Context) error { - err := it.parseFieldMetaFromBinlog(ctx) - if err != nil { - log.Ctx(ctx).Warn("parse field meta from binlog failed", zap.Error(err)) - return err - } + log := log.Ctx(ctx).With(zap.String("clusterID", it.req.GetClusterID()), zap.Int64("buildID", it.req.GetBuildID()), + zap.Int64("collection", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID()), + zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion())) indexType := it.newIndexParams[common.IndexTypeKey] if indexType == indexparamcheck.IndexDISKANN { // check index node support disk index if !Params.IndexNodeCfg.EnableDisk.GetAsBool() { - log.Ctx(ctx).Warn("IndexNode don't support build disk index", + log.Warn("IndexNode don't support build disk index", zap.String("index type", it.newIndexParams[common.IndexTypeKey]), zap.Bool("enable disk", Params.IndexNodeCfg.EnableDisk.GetAsBool())) return errors.New("index node don't support build disk index") @@ -465,19 +427,19 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { // check load size and size of field data localUsedSize, err := indexcgowrapper.GetLocalUsedSize(paramtable.Get().LocalStorageCfg.Path.GetValue()) if err != nil { - log.Ctx(ctx).Warn("IndexNode get local used size failed") + log.Warn("IndexNode get local used size failed") return err } - fieldDataSize, err := estimateFieldDataSize(it.statistic.Dim, it.req.GetNumRows(), it.fieldType) + fieldDataSize, err := estimateFieldDataSize(it.req.GetDim(), it.req.GetNumRows(), it.req.GetField().GetDataType()) if err != nil { - log.Ctx(ctx).Warn("IndexNode get local used size failed") + log.Warn("IndexNode get local used size failed") return err } usedLocalSizeWhenBuild := int64(float64(fieldDataSize)*diskUsageRatio) + localUsedSize maxUsedLocalSize := int64(Params.IndexNodeCfg.DiskCapacityLimit.GetAsFloat() * Params.IndexNodeCfg.MaxDiskUsagePercentage.GetAsFloat()) if usedLocalSizeWhenBuild > maxUsedLocalSize { - log.Ctx(ctx).Warn("IndexNode don't has enough disk size to build disk ann index", + log.Warn("IndexNode don't has enough disk size to build disk ann index", 
zap.Int64("usedLocalSizeWhenBuild", usedLocalSizeWhenBuild), zap.Int64("maxUsedLocalSize", maxUsedLocalSize)) return errors.New("index node don't has enough disk size to build disk ann index") @@ -485,7 +447,7 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { err = indexparams.SetDiskIndexBuildParams(it.newIndexParams, int64(fieldDataSize)) if err != nil { - log.Ctx(ctx).Warn("failed to fill disk index params", zap.Error(err)) + log.Warn("failed to fill disk index params", zap.Error(err)) return err } } @@ -517,28 +479,19 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { }) } - it.currentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion()) - field := it.req.GetField() - if field == nil || field.GetDataType() == schemapb.DataType_None { - field = &schemapb.FieldSchema{ - FieldID: it.fieldID, - Name: it.fieldName, - DataType: it.fieldType, - } - } buildIndexParams := &indexcgopb.BuildIndexInfo{ - ClusterID: it.ClusterID, - BuildID: it.BuildID, - CollectionID: it.collectionID, - PartitionID: it.partitionID, - SegmentID: it.segmentID, + ClusterID: it.req.GetClusterID(), + BuildID: it.req.GetBuildID(), + CollectionID: it.req.GetCollectionID(), + PartitionID: it.req.GetPartitionID(), + SegmentID: it.req.GetSegmentID(), IndexVersion: it.req.GetIndexVersion(), - CurrentIndexVersion: it.currentIndexVersion, + CurrentIndexVersion: it.req.GetCurrentIndexVersion(), NumRows: it.req.GetNumRows(), Dim: it.req.GetDim(), IndexFilePrefix: it.req.GetIndexFilePrefix(), InsertFiles: it.req.GetDataPaths(), - FieldSchema: field, + FieldSchema: it.req.GetField(), StorageConfig: storageConfig, IndexParams: mapToKVPairs(it.newIndexParams), TypeParams: mapToKVPairs(it.newTypeParams), @@ -548,33 +501,37 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error { OptFields: optFields, } + log.Info("debug create index", zap.Any("buildIndexParams", buildIndexParams)) + var err error it.index, err = indexcgowrapper.CreateIndex(ctx, buildIndexParams) if err != nil { if it.index != nil && it.index.CleanLocalData() != nil { - log.Ctx(ctx).Error("failed to clean cached data on disk after build index failed", - zap.Int64("buildID", it.BuildID), - zap.Int64("index version", it.req.GetIndexVersion())) + log.Warn("failed to clean cached data on disk after build index failed") } - log.Ctx(ctx).Error("failed to build index", zap.Error(err)) + log.Warn("failed to build index", zap.Error(err)) return err } buildIndexLatency := it.tr.RecordSpan() metrics.IndexNodeKnowhereBuildIndexLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(buildIndexLatency.Seconds()) - log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID), zap.Int32("currentIndexVersion", it.currentIndexVersion)) + log.Info("Successfully build index") return nil } func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error { + log := log.Ctx(ctx).With(zap.String("clusterID", it.req.GetClusterID()), zap.Int64("buildID", it.req.GetBuildID()), + zap.Int64("collection", it.req.GetCollectionID()), zap.Int64("segmentID", it.req.GetSegmentID()), + zap.Int32("currentIndexVersion", it.req.GetCurrentIndexVersion())) + gcIndex := func() { if err := it.index.Delete(); err != nil { - log.Ctx(ctx).Error("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err)) + log.Warn("IndexNode indexBuildTask Execute CIndexDelete failed", zap.Error(err)) } } indexFilePath2Size, err := 
it.index.UpLoad() if err != nil { - log.Ctx(ctx).Error("failed to upload index", zap.Error(err)) + log.Warn("failed to upload index", zap.Error(err)) gcIndex() return err } @@ -585,27 +542,26 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error { gcIndex() // use serialized size before encoding - it.serializedSize = 0 + var serializedSize uint64 saveFileKeys := make([]string, 0) for filePath, fileSize := range indexFilePath2Size { - it.serializedSize += uint64(fileSize) + serializedSize += uint64(fileSize) parts := strings.Split(filePath, "/") fileKey := parts[len(parts)-1] saveFileKeys = append(saveFileKeys, fileKey) } - it.statistic.EndTime = time.Now().UnixMicro() - it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic, it.currentIndexVersion) - log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys)) + it.node.storeIndexFilesAndStatistic(it.req.GetClusterID(), it.req.GetBuildID(), saveFileKeys, serializedSize, it.req.GetCurrentIndexVersion()) + log.Debug("save index files done", zap.Strings("IndexFiles", saveFileKeys)) saveIndexFileDur := it.tr.RecordSpan() metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(saveIndexFileDur.Seconds()) it.tr.Elapse("index building all done") - log.Ctx(ctx).Info("Successfully save index files", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), - zap.Int64("partition", it.partitionID), zap.Int64("SegmentId", it.segmentID)) + log.Info("Successfully save index files") return nil } func (it *indexBuildTask) parseFieldMetaFromBinlog(ctx context.Context) error { + // fill collectionID, partitionID... for requests before v2.4.0 toLoadDataPaths := it.req.GetDataPaths() if len(toLoadDataPaths) == 0 { return merr.WrapErrParameterInvalidMsg("data insert path must be not empty") @@ -627,51 +583,17 @@ func (it *indexBuildTask) parseFieldMetaFromBinlog(ctx context.Context) error { return merr.WrapErrParameterInvalidMsg("we expect only one field in deserialized insert data") } - it.collectionID = collectionID - it.partitionID = partitionID - it.segmentID = segmentID - for fID, value := range insertData.Data { - it.fieldType = value.GetDataType() - it.fieldID = fID - break - } - - return nil -} - -func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob) error { - var insertCodec storage.InsertCodec - collectionID, partitionID, segmentID, insertData, err2 := insertCodec.DeserializeAll(blobs) - if err2 != nil { - return err2 + it.req.CollectionID = collectionID + it.req.PartitionID = partitionID + it.req.SegmentID = segmentID + if it.req.GetField().GetFieldID() == 0 { + for fID, value := range insertData.Data { + it.req.Field.DataType = value.GetDataType() + it.req.Field.FieldID = fID + break + } } - metrics.IndexNodeDecodeFieldLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(it.tr.RecordSpan().Seconds()) + it.req.CurrentIndexVersion = getCurrentIndexVersion(it.req.GetCurrentIndexVersion()) - if len(insertData.Data) != 1 { - return merr.WrapErrParameterInvalidMsg("we expect only one field in deserialized insert data") - } - it.collectionID = collectionID - it.partitionID = partitionID - it.segmentID = segmentID - - deserializeDur := it.tr.RecordSpan() - - log.Ctx(ctx).Info("IndexNode deserialize data success", - zap.Int64("collectionID", it.collectionID), - zap.Int64("partitionID", it.partitionID), - zap.Int64("segmentID", 
it.segmentID), - zap.Duration("deserialize duration", deserializeDur)) - - // we can ensure that there blobs are in one Field - var data storage.FieldData - var fieldID storage.FieldID - for fID, value := range insertData.Data { - data = value - fieldID = fID - break - } - it.statistic.NumRows = int64(data.RowNum()) - it.fieldID = fieldID - it.fieldData = data return nil } diff --git a/internal/indexnode/task_test.go b/internal/indexnode/task_test.go index 6450c3e504a7..530dcdadac09 100644 --- a/internal/indexnode/task_test.go +++ b/internal/indexnode/task_test.go @@ -36,184 +36,8 @@ import ( "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/timerecord" ) -// import ( -// "context" -// "github.com/cockroachdb/errors" -// "math/rand" -// "path" -// "strconv" -// "testing" - -// "github.com/milvus-io/milvus/internal/kv" - -// "github.com/golang/protobuf/proto" -// etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" -// "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" -// "github.com/milvus-io/milvus/internal/proto/indexpb" -// "github.com/milvus-io/milvus/internal/storage" -// "github.com/milvus-io/milvus/pkg/util/etcd" -// "github.com/milvus-io/milvus/pkg/util/timerecord" -// "github.com/stretchr/testify/assert" -// ) - -// func TestIndexBuildTask_saveIndexMeta(t *testing.T) { -// Params.Init() -// etcdCli, err := etcd.GetEtcdClient(&Params.EtcdCfg) -// assert.NoError(t, err) -// assert.NotNil(t, etcdCli) -// etcdKV := etcdkv.NewEtcdKV(etcdCli, Params.EtcdCfg.MetaRootPath) -// assert.NotNil(t, etcdKV) -// indexBuildID := rand.Int63() -// indexMeta := &indexpb.IndexMeta{ -// IndexBuildID: indexBuildID, -// State: commonpb.IndexState_InProgress, -// NodeID: 1, -// IndexVersion: 1, -// } -// metaPath := path.Join("indexes", strconv.FormatInt(indexMeta.IndexBuildID, 10)) -// metaValue, err := proto.Marshal(indexMeta) -// assert.NoError(t, err) -// err = etcdKV.Save(metaPath, string(metaValue)) -// assert.NoError(t, err) -// indexBuildTask := &IndexBuildTask{ -// BaseTask: BaseTask{ -// internalErr: errors.New("internal err"), -// }, -// etcdKV: etcdKV, -// req: &indexpb.CreateIndexRequest{ -// IndexBuildID: indexBuildID, -// Version: 1, -// MetaPath: metaPath, -// }, -// tr: &timerecord.TimeRecorder{}, -// } -// err = indexBuildTask.saveIndexMeta(context.Background()) -// assert.NoError(t, err) - -// indexMeta2, _, err := indexBuildTask.loadIndexMeta(context.Background()) -// assert.NoError(t, err) -// assert.NotNil(t, indexMeta2) -// assert.Equal(t, commonpb.IndexState_Unissued, indexMeta2.State) - -// err = etcdKV.Remove(metaPath) -// assert.NoError(t, err) -// } - -// type mockChunkManager struct { -// storage.ChunkManager - -// read func(key string) ([]byte, error) -// } - -// func (mcm *mockChunkManager) Read(key string) ([]byte, error) { -// return mcm.read(key) -// } - -// func TestIndexBuildTask_Execute(t *testing.T) { -// t.Run("task retry", func(t *testing.T) { -// indexTask := &IndexBuildTask{ -// cm: &mockChunkManager{ -// read: func(key string) ([]byte, error) { -// return nil, errors.New("error occurred") -// }, -// }, -// req: &indexpb.CreateIndexRequest{ -// IndexBuildID: 1, -// DataPaths: []string{"path1", "path2"}, -// }, -// } - -// err := indexTask.Execute(context.Background()) -// assert.Error(t, err) -// assert.Equal(t, TaskStateRetry, indexTask.state) -// }) - -// t.Run("task failed", func(t *testing.T) { -// indexTask := &IndexBuildTask{ 
-// 			cm: &mockChunkManager{
-// 				read: func(key string) ([]byte, error) {
-// 					return nil, ErrNoSuchKey
-// 				},
-// 			},
-// 			req: &indexpb.CreateIndexRequest{
-// 				IndexBuildID: 1,
-// 				DataPaths:    []string{"path1", "path2"},
-// 			},
-// 		}

-// 		err := indexTask.Execute(context.Background())
-// 		assert.ErrorIs(t, err, ErrNoSuchKey)
-// 		assert.Equal(t, TaskStateFailed, indexTask.state)

-// 	})
-// }

-// type mockETCDKV struct {
-// 	kv.MetaKv

-// 	loadWithPrefix2 func(key string) ([]string, []string, []int64, error)
-// }

-// func TestIndexBuildTask_loadIndexMeta(t *testing.T) {
-// 	t.Run("load empty meta", func(t *testing.T) {
-// 		indexTask := &IndexBuildTask{
-// 			etcdKV: &mockETCDKV{
-// 				loadWithPrefix2: func(key string) ([]string, []string, []int64, error) {
-// 					return []string{}, []string{}, []int64{}, nil
-// 				},
-// 			},
-// 			req: &indexpb.CreateIndexRequest{
-// 				IndexBuildID: 1,
-// 				DataPaths:    []string{"path1", "path2"},
-// 			},
-// 		}

-// 		indexMeta, revision, err := indexTask.loadIndexMeta(context.Background())
-// 		assert.NoError(t, err)
-// 		assert.Equal(t, int64(0), revision)
-// 		assert.Equal(t, TaskStateAbandon, indexTask.GetState())

-// 		indexTask.updateTaskState(indexMeta, nil)
-// 		assert.Equal(t, TaskStateAbandon, indexTask.GetState())
-// 	})
-// }

-// func TestIndexBuildTask_saveIndex(t *testing.T) {
-// 	t.Run("save index failed", func(t *testing.T) {
-// 		indexTask := &IndexBuildTask{
-// 			etcdKV: &mockETCDKV{
-// 				loadWithPrefix2: func(key string) ([]string, []string, []int64, error) {
-// 					return []string{}, []string{}, []int64{}, errors.New("error")
-// 				},
-// 			},
-// 			partitionID: 1,
-// 			segmentID:   1,
-// 			req: &indexpb.CreateIndexRequest{
-// 				IndexBuildID: 1,
-// 				DataPaths:    []string{"path1", "path2"},
-// 				Version:      1,
-// 			},
-// 		}

-// 		blobs := []*storage.Blob{
-// 			{
-// 				Key:   "key1",
-// 				Value: []byte("value1"),
-// 			},
-// 			{
-// 				Key:   "key2",
-// 				Value: []byte("value2"),
-// 			},
-// 		}

-// 		err := indexTask.saveIndex(context.Background(), blobs)
-// 		assert.Error(t, err)
-// 	})
-// }

 type IndexBuildTaskV2Suite struct {
 	suite.Suite
 	schema    *schemapb.CollectionSchema
@@ -283,14 +107,12 @@ func (suite *IndexBuildTaskV2Suite) TestBuildIndex() {
 			RootPath:    "/tmp/milvus/data",
 			StorageType: "local",
 		},
-		CollectionID: 1,
-		PartitionID:  1,
-		SegmentID:    1,
-		Field: &schemapb.FieldSchema{
-			FieldID:  3,
-			Name:     "vec",
-			DataType: schemapb.DataType_FloatVector,
-		},
+		CollectionID:   1,
+		PartitionID:    1,
+		SegmentID:      1,
+		FieldID:        3,
+		FieldName:      "vec",
+		FieldType:      schemapb.DataType_FloatVector,
 		StorePath:      "file://" + suite.space.Path(),
 		StoreVersion:   suite.space.GetCurrentVersion(),
 		IndexStorePath: "file://" + suite.space.Path(),
@@ -300,17 +122,7 @@ func (suite *IndexBuildTaskV2Suite) TestBuildIndex() {
 		},
 	}
 
-	task := &indexBuildTaskV2{
-		indexBuildTask: &indexBuildTask{
-			ident:     "test",
-			ctx:       context.Background(),
-			BuildID:   req.GetBuildID(),
-			ClusterID: req.GetClusterID(),
-			req:       req,
-			tr:        timerecord.NewTimeRecorder("test"),
-			node:      NewIndexNode(context.Background(), dependency.NewDefaultFactory(true)),
-		},
-	}
+	task := newIndexBuildTaskV2(context.Background(), nil, req, NewIndexNode(context.Background(), dependency.NewDefaultFactory(true)))
 
 	var err error
 	err = task.Prepare(context.Background())
diff --git a/internal/indexnode/taskinfo_ops.go b/internal/indexnode/taskinfo_ops.go
index 166c73436aa0..957fab23d76c 100644
--- a/internal/indexnode/taskinfo_ops.go
+++ b/internal/indexnode/taskinfo_ops.go
@@ -4,11 +4,9 @@ import (
 	"context"
 	"time"
 
-	"github.com/golang/protobuf/proto"
"go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" ) @@ -61,7 +59,6 @@ func (i *IndexNode) storeIndexFilesAndStatistic( buildID UniqueID, fileKeys []string, serializedSize uint64, - statistic *indexpb.JobInfo, currentIndexVersion int32, ) { key := taskKey{ClusterID: ClusterID, BuildID: buildID} @@ -70,7 +67,6 @@ func (i *IndexNode) storeIndexFilesAndStatistic( if info, ok := i.tasks[key]; ok { info.fileKeys = common.CloneStringList(fileKeys) info.serializedSize = serializedSize - info.statistic = proto.Clone(statistic).(*indexpb.JobInfo) info.currentIndexVersion = currentIndexVersion return } @@ -81,7 +77,6 @@ func (i *IndexNode) storeIndexFilesAndStatisticV2( buildID UniqueID, fileKeys []string, serializedSize uint64, - statistic *indexpb.JobInfo, currentIndexVersion int32, indexStoreVersion int64, ) { @@ -91,7 +86,6 @@ func (i *IndexNode) storeIndexFilesAndStatisticV2( if info, ok := i.tasks[key]; ok { info.fileKeys = common.CloneStringList(fileKeys) info.serializedSize = serializedSize - info.statistic = proto.Clone(statistic).(*indexpb.JobInfo) info.currentIndexVersion = currentIndexVersion info.indexStoreVersion = indexStoreVersion return