Skip to content

Commit

Permalink
enhance: Refine IndexNode code and ensure compatibility (#33458)
Browse files Browse the repository at this point in the history
issue: #33432 , #33183

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 committed Jun 5, 2024
1 parent 8ad2609 commit 412ccfb
Show file tree
Hide file tree
Showing 6 changed files with 687 additions and 916 deletions.
163 changes: 151 additions & 12 deletions internal/indexnode/index_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package indexnode

import "math/rand"
import (
"fmt"
"math/rand"

const (
dim = 8
nb = 10000
nprobe = 8
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func generateFloatVectors() []float32 {
func generateFloatVectors(nb, dim int) []float32 {
vectors := make([]float32, 0)
for i := 0; i < nb; i++ {
for j := 0; j < dim; j++ {
Expand All @@ -18,12 +22,147 @@ func generateFloatVectors() []float32 {
return vectors
}

func generateBinaryVectors() []byte {
vectors := make([]byte, 0)
for i := 0; i < nb; i++ {
for j := 0; j < dim/8; j++ {
vectors = append(vectors, byte(rand.Intn(8)))
// generateTestSchema builds a collection schema covering every scalar and
// vector field type exercised by the index-node tests: the reserved system
// fields (rowid/timestamp), all scalar types, and float/binary/float16/
// bfloat16/sparse vector fields. All dense vectors use dim 8; the sparse
// vector advertises dim 28433 to match generateTestData's row generator.
func generateTestSchema() *schemapb.CollectionSchema {
	schema := &schemapb.CollectionSchema{Fields: []*schemapb.FieldSchema{
		{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
		{FieldID: common.RowIDField, Name: "rowid", DataType: schemapb.DataType_Int64},
		{FieldID: 10, Name: "bool", DataType: schemapb.DataType_Bool},
		{FieldID: 11, Name: "int8", DataType: schemapb.DataType_Int8},
		{FieldID: 12, Name: "int16", DataType: schemapb.DataType_Int16},
		{FieldID: 13, Name: "int64", DataType: schemapb.DataType_Int64},
		{FieldID: 14, Name: "float", DataType: schemapb.DataType_Float},
		{FieldID: 15, Name: "double", DataType: schemapb.DataType_Double},
		{FieldID: 16, Name: "varchar", DataType: schemapb.DataType_VarChar},
		{FieldID: 17, Name: "string", DataType: schemapb.DataType_String},
		{FieldID: 18, Name: "array", DataType: schemapb.DataType_Array},
		// Fix: this JSON field was named "string", duplicating field 17's
		// name. Field names must be unique within a collection schema.
		{FieldID: 19, Name: "json", DataType: schemapb.DataType_JSON},
		{FieldID: 101, Name: "int32", DataType: schemapb.DataType_Int32},
		{FieldID: 102, Name: "floatVector", DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{
			{Key: common.DimKey, Value: "8"},
		}},
		{FieldID: 103, Name: "binaryVector", DataType: schemapb.DataType_BinaryVector, TypeParams: []*commonpb.KeyValuePair{
			{Key: common.DimKey, Value: "8"},
		}},
		{FieldID: 104, Name: "float16Vector", DataType: schemapb.DataType_Float16Vector, TypeParams: []*commonpb.KeyValuePair{
			{Key: common.DimKey, Value: "8"},
		}},
		{FieldID: 105, Name: "bf16Vector", DataType: schemapb.DataType_BFloat16Vector, TypeParams: []*commonpb.KeyValuePair{
			{Key: common.DimKey, Value: "8"},
		}},
		{FieldID: 106, Name: "sparseFloatVector", DataType: schemapb.DataType_SparseFloatVector, TypeParams: []*commonpb.KeyValuePair{
			{Key: common.DimKey, Value: "28433"},
		}},
	}}

	return schema
}

// generateTestData serializes num synthetic rows for every field declared by
// generateTestSchema into storage blobs via the insert codec, suitable for
// feeding index-build tests.
//
// Row i (1-based) carries the value i in every scalar field, an i-filled
// 8-dim float vector, an all-ones byte for the binary vector, i-filled
// 16-byte float16/bfloat16 vectors, and a 3-entry sparse row whose indices
// scale with i (max index 284*num must stay below the schema's sparse dim,
// 28433 — so num must be < 101 to be safe. TODO confirm against callers).
//
// Returns the serialized blobs, or the serialization error.
//
// Note: the stray `return vectors` line left over from the removed
// generateBinaryVectors helper has been dropped — it referenced an undefined
// variable and made the function unreachable past the loop.
func generateTestData(collID, partID, segID int64, num int) ([]*Blob, error) {
	insertCodec := storage.NewInsertCodecWithSchema(&etcdpb.CollectionMeta{ID: collID, Schema: generateTestSchema()})

	var (
		field0 []int64 // rowid
		field1 []int64 // timestamp

		field10 []bool
		field11 []int8
		field12 []int16
		field13 []int64
		field14 []float32
		field15 []float64
		field16 []string
		field17 []string
		field18 []*schemapb.ScalarField
		field19 [][]byte

		field101 []int32
		field102 []float32 // float vectors, flattened
		field103 []byte    // binary vectors, flattened

		field104 []byte   // float16 vectors, flattened
		field105 []byte   // bfloat16 vectors, flattened
		field106 [][]byte // one sparse row per element
	)

	for i := 1; i <= num; i++ {
		field0 = append(field0, int64(i))
		field1 = append(field1, int64(i))
		field10 = append(field10, true)
		field11 = append(field11, int8(i))
		field12 = append(field12, int16(i))
		field13 = append(field13, int64(i))
		field14 = append(field14, float32(i))
		field15 = append(field15, float64(i))
		field16 = append(field16, fmt.Sprint(i))
		field17 = append(field17, fmt.Sprint(i))

		arr := &schemapb.ScalarField{
			Data: &schemapb.ScalarField_IntData{
				IntData: &schemapb.IntArray{Data: []int32{int32(i), int32(i), int32(i)}},
			},
		}
		field18 = append(field18, arr)

		field19 = append(field19, []byte{byte(i)})
		field101 = append(field101, int32(i))

		// 8-dim float vector filled with the row index.
		f102 := make([]float32, 8)
		for j := range f102 {
			f102[j] = float32(i)
		}

		field102 = append(field102, f102...)
		// 8-bit binary vector: all bits set.
		field103 = append(field103, 0xff)

		// 8 elements * 2 bytes each for float16/bfloat16.
		f104 := make([]byte, 16)
		for j := range f104 {
			f104[j] = byte(i)
		}
		field104 = append(field104, f104...)
		field105 = append(field105, f104...)

		field106 = append(field106, typeutil.CreateSparseFloatRow([]uint32{0, uint32(18 * i), uint32(284 * i)}, []float32{1.1, 0.3, 2.4}))
	}

	data := &storage.InsertData{Data: map[int64]storage.FieldData{
		common.RowIDField:     &storage.Int64FieldData{Data: field0},
		common.TimeStampField: &storage.Int64FieldData{Data: field1},

		10: &storage.BoolFieldData{Data: field10},
		11: &storage.Int8FieldData{Data: field11},
		12: &storage.Int16FieldData{Data: field12},
		13: &storage.Int64FieldData{Data: field13},
		14: &storage.FloatFieldData{Data: field14},
		15: &storage.DoubleFieldData{Data: field15},
		16: &storage.StringFieldData{Data: field16},
		17: &storage.StringFieldData{Data: field17},
		18: &storage.ArrayFieldData{Data: field18},
		19: &storage.JSONFieldData{Data: field19},
		101: &storage.Int32FieldData{Data: field101},
		102: &storage.FloatVectorFieldData{
			Data: field102,
			Dim:  8,
		},
		103: &storage.BinaryVectorFieldData{
			Data: field103,
			Dim:  8,
		},
		104: &storage.Float16VectorFieldData{
			Data: field104,
			Dim:  8,
		},
		105: &storage.BFloat16VectorFieldData{
			Data: field105,
			Dim:  8,
		},
		106: &storage.SparseFloatVectorFieldData{
			SparseFloatArray: schemapb.SparseFloatArray{
				Dim:      28433,
				Contents: field106,
			},
		},
	}}

	blobs, err := insertCodec.Serialize(partID, segID, data)
	return blobs, err
}
40 changes: 3 additions & 37 deletions internal/indexnode/indexnode_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"strconv"

"github.com/golang/protobuf/proto"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand All @@ -36,7 +35,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -100,35 +98,9 @@ func (i *IndexNode) CreateJob(ctx context.Context, req *indexpb.CreateJobRequest
}
var task task
if Params.CommonCfg.EnableStorageV2.GetAsBool() {
task = &indexBuildTaskV2{
indexBuildTask: &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
ctx: taskCtx,
cancel: taskCancel,
BuildID: req.GetBuildID(),
ClusterID: req.GetClusterID(),
node: i,
req: req,
cm: cm,
nodeID: i.GetNodeID(),
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
serializedSize: 0,
},
}
task = newIndexBuildTaskV2(taskCtx, taskCancel, req, i)
} else {
task = &indexBuildTask{
ident: fmt.Sprintf("%s/%d", req.ClusterID, req.BuildID),
ctx: taskCtx,
cancel: taskCancel,
BuildID: req.GetBuildID(),
ClusterID: req.GetClusterID(),
node: i,
req: req,
cm: cm,
nodeID: i.GetNodeID(),
tr: timerecord.NewTimeRecorder(fmt.Sprintf("IndexBuildID: %d, ClusterID: %s", req.BuildID, req.ClusterID)),
serializedSize: 0,
}
task = newIndexBuildTask(taskCtx, taskCancel, req, cm, i)
}
ret := merr.Success()
if err := i.sched.IndexBuildQueue.Enqueue(task); err != nil {
Expand Down Expand Up @@ -222,6 +194,7 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest)
return merr.Success(), nil
}

// GetJobStats should be GetSlots
func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsRequest) (*indexpb.GetJobStatsResponse, error) {
if err := i.lifetime.Add(merr.IsHealthyOrStopping); err != nil {
log.Ctx(ctx).Warn("index node not ready", zap.Error(err))
Expand All @@ -231,12 +204,6 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
}
defer i.lifetime.Done()
unissued, active := i.sched.IndexBuildQueue.GetTaskNum()
jobInfos := make([]*indexpb.JobInfo, 0)
i.foreachTaskInfo(func(ClusterID string, buildID UniqueID, info *taskInfo) {
if info.statistic != nil {
jobInfos = append(jobInfos, proto.Clone(info.statistic).(*indexpb.JobInfo))
}
})
slots := 0
if i.sched.buildParallel > unissued+active {
slots = i.sched.buildParallel - unissued - active
Expand All @@ -252,7 +219,6 @@ func (i *IndexNode) GetJobStats(ctx context.Context, req *indexpb.GetJobStatsReq
InProgressJobNum: int64(active),
EnqueueJobNum: int64(unissued),
TaskSlots: int64(slots),
JobInfos: jobInfos,
EnableDisk: Params.IndexNodeCfg.EnableDisk.GetAsBool(),
}, nil
}
Expand Down
Loading

0 comments on commit 412ccfb

Please sign in to comment.