From 3d5a52c28858799c8d6961a72e4892862f45aa12 Mon Sep 17 00:00:00 2001 From: Buqian Zheng Date: Tue, 6 Feb 2024 15:31:14 +0800 Subject: [PATCH] [Sparse Float Vector] add sparse float vector support to different milvus components, including proxy, data node to receive and write sparse float vectors to binlog, query node to handle search requests, index node to build index for sparse float column, etc. Signed-off-by: Buqian Zheng --- internal/datacoord/compaction_trigger.go | 2 +- internal/datanode/compactor.go | 9 + internal/datanode/compactor_test.go | 8 + internal/indexnode/util.go | 5 + .../parser/planparserv2/plan_parser_v2.go | 2 + .../planparserv2/plan_parser_v2_test.go | 11 + internal/proxy/task.go | 2 +- internal/proxy/task_index.go | 34 +- internal/proxy/task_index_test.go | 70 ++++ internal/proxy/util.go | 35 +- internal/proxy/validate_util.go | 24 ++ internal/querynodev2/segments/utils.go | 8 + internal/storage/binlog_writer.go | 2 +- internal/storage/data_codec.go | 25 +- internal/storage/data_codec_test.go | 90 +++-- internal/storage/data_sorter.go | 3 + internal/storage/data_sorter_test.go | 29 +- internal/storage/event_writer.go | 2 +- internal/storage/insert_data.go | 97 +++++- internal/storage/insert_data_test.go | 63 ++-- internal/storage/payload.go | 2 + internal/storage/payload_reader.go | 33 ++ internal/storage/payload_test.go | 250 ++++++++++++++ internal/storage/payload_writer.go | 28 +- internal/storage/print_binlog.go | 12 + internal/storage/stats.go | 2 +- internal/storage/utils.go | 29 ++ internal/storage/utils_test.go | 54 +++ internal/util/indexcgowrapper/dataset.go | 12 + internal/util/indexcgowrapper/index.go | 9 + pkg/util/funcutil/func.go | 4 +- pkg/util/funcutil/placeholdergroup.go | 17 + pkg/util/gc/gc_tuner.go | 2 +- pkg/util/indexparamcheck/conf_adapter_mgr.go | 4 + pkg/util/indexparamcheck/constraints.go | 9 +- pkg/util/indexparamcheck/hnsw_checker.go | 1 + pkg/util/indexparamcheck/index_type.go | 2 + 
.../sparse_float_vector_base_checker.go | 47 +++ .../sparse_inverted_index_checker.go | 9 + pkg/util/testutils/sparse_test_utils.go | 84 +++++ pkg/util/typeutil/gen_empty_field_data.go | 22 ++ pkg/util/typeutil/get_dim.go | 3 + pkg/util/typeutil/schema.go | 130 +++++++- pkg/util/typeutil/schema_test.go | 315 ++++++++++++++++-- 44 files changed, 1453 insertions(+), 148 deletions(-) create mode 100644 pkg/util/indexparamcheck/sparse_float_vector_base_checker.go create mode 100644 pkg/util/indexparamcheck/sparse_inverted_index_checker.go create mode 100644 pkg/util/testutils/sparse_test_utils.go diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 1a26f3d1cea37..383df36d3e096 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -547,7 +547,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { segments := t.getCandidateSegments(channel, partitionID) if len(segments) == 0 { - log.Info("the length of segments is 0, skip to handle compaction") + log.Info("the number of candidate segments is 0, skip to handle compaction") return } diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index ab8dae57af398..787870d71cf9a 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -796,6 +796,15 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{} data.Dim = len(data.Data) * 8 / int(numRows) rst = data + case schemapb.DataType_SparseFloatVector: + data := &storage.SparseFloatVectorFieldData{} + for _, c := range content { + if err := data.AppendRow(c); err != nil { + return nil, fmt.Errorf("failed to append row: %v, %w", err, errTransferType) + } + } + rst = data + default: return nil, errUnknownDataType } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index cd67f4af96592..8d3b4305d7c59 100644 --- a/internal/datanode/compactor_test.go +++ 
b/internal/datanode/compactor_test.go @@ -43,6 +43,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -105,6 +106,13 @@ func TestCompactionTaskInnerMethods(t *testing.T) { {false, schemapb.DataType_BinaryVector, []interface{}{nil, nil}, "invalid binaryvector"}, {false, schemapb.DataType_Float16Vector, []interface{}{nil, nil}, "invalid float16vector"}, {false, schemapb.DataType_BFloat16Vector, []interface{}{nil, nil}, "invalid bfloat16vector"}, + + {false, schemapb.DataType_SparseFloatVector, []interface{}{nil, nil}, "invalid sparsefloatvector"}, + {false, schemapb.DataType_SparseFloatVector, []interface{}{[]byte{255}, []byte{15}}, "invalid sparsefloatvector"}, + {true, schemapb.DataType_SparseFloatVector, []interface{}{ + testutils.CreateSparseFloatRow([]uint32{1, 2}, []float32{1.0, 2.0}), + testutils.CreateSparseFloatRow([]uint32{3, 4}, []float32{1.0, 2.0}), + }, "valid sparsefloatvector"}, } // make sure all new data types missed to handle would throw unexpected error diff --git a/internal/indexnode/util.go b/internal/indexnode/util.go index 9387e6fb9d46b..47aef494e3870 100644 --- a/internal/indexnode/util.go +++ b/internal/indexnode/util.go @@ -19,6 +19,8 @@ package indexnode import ( "unsafe" + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" ) @@ -37,5 +39,8 @@ func estimateFieldDataSize(dim int64, numRows int64, dataType schemapb.DataType) if dataType == schemapb.DataType_BFloat16Vector { return uint64(dim) * uint64(numRows) * 2, nil } + if dataType == schemapb.DataType_SparseFloatVector { + return 0, errors.New("could not estimate field data size of SparseFloatVector") + } return 0, nil } diff --git a/internal/parser/planparserv2/plan_parser_v2.go b/internal/parser/planparserv2/plan_parser_v2.go index 
29ed3d0d1fd8a..c1bc6915f4d75 100644 --- a/internal/parser/planparserv2/plan_parser_v2.go +++ b/internal/parser/planparserv2/plan_parser_v2.go @@ -137,6 +137,8 @@ func CreateSearchPlan(schema *typeutil.SchemaHelper, exprStr string, vectorField vectorType = planpb.VectorType_Float16Vector } else if dataType == schemapb.DataType_BFloat16Vector { vectorType = planpb.VectorType_BFloat16Vector + } else if dataType == schemapb.DataType_SparseFloatVector { + vectorType = planpb.VectorType_SparseFloatVector } planNode := &planpb.PlanNode{ Node: &planpb.PlanNode_VectorAnns{ diff --git a/internal/parser/planparserv2/plan_parser_v2_test.go b/internal/parser/planparserv2/plan_parser_v2_test.go index 20fde6d05badc..a12320f37ad53 100644 --- a/internal/parser/planparserv2/plan_parser_v2_test.go +++ b/internal/parser/planparserv2/plan_parser_v2_test.go @@ -428,6 +428,17 @@ func TestCreateBFloat16earchPlan(t *testing.T) { assert.NoError(t, err) } +func TestCreateSparseFloatVectorSearchPlan(t *testing.T) { + schema := newTestSchemaHelper(t) + _, err := CreateSearchPlan(schema, `$meta["A"] != 10`, "SparseFloatVectorField", &planpb.QueryInfo{ + Topk: 0, + MetricType: "", + SearchParams: "", + RoundDecimal: 0, + }) + assert.NoError(t, err) +} + func TestExpr_Invalid(t *testing.T) { schema := newTestSchema() helper, err := typeutil.CreateSchemaHelper(schema) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index ee6feec8e51e2..84c735dc42c8a 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -322,7 +322,7 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error { if err := validateFieldName(field.Name); err != nil { return err } - // validate vector field type parameters + // validate dense vector field type parameters if isVectorType(field.DataType) { err = validateDimension(field) if err != nil { diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index 3356979188612..3eb1f756c6b61 100644 --- a/internal/proxy/task_index.go 
+++ b/internal/proxy/task_index.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/indexparams" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -174,9 +175,7 @@ func (cit *createIndexTask) parseIndexParams() error { fmt.Sprintf("create index on %s field", cit.fieldSchema.DataType.String()), fmt.Sprintf("create index on %s field is not supported", cit.fieldSchema.DataType.String())) } - } - - if isVecIndex { + } else { specifyIndexType, exist := indexParamsMap[common.IndexTypeKey] if Params.AutoIndexConfig.Enable.GetAsBool() { // `enable` only for cloud instance. log.Info("create index trigger AutoIndex", @@ -258,6 +257,12 @@ func (cit *createIndexTask) parseIndexParams() error { return err } } + if indexType == indexparamcheck.IndexSparseInverted || indexType == indexparamcheck.IndexSparseWand { + metricType, metricTypeExist := indexParamsMap[common.MetricTypeKey] + if !metricTypeExist || metricType != metric.IP { + return fmt.Errorf("only IP is the supported metric type for sparse index") + } + } err := checkTrain(cit.fieldSchema, indexParamsMap) if err != nil { @@ -309,13 +314,7 @@ func (cit *createIndexTask) getIndexedField(ctx context.Context) (*schemapb.Fiel } func fillDimension(field *schemapb.FieldSchema, indexParams map[string]string) error { - vecDataTypes := []schemapb.DataType{ - schemapb.DataType_FloatVector, - schemapb.DataType_BinaryVector, - schemapb.DataType_Float16Vector, - schemapb.DataType_BFloat16Vector, - } - if !funcutil.SliceContain(vecDataTypes, field.GetDataType()) { + if !isVectorType(field.GetDataType()) { return nil } params := make([]*commonpb.KeyValuePair, 0, len(field.GetTypeParams())+len(field.GetIndexParams())) @@ -338,14 +337,7 @@ func fillDimension(field *schemapb.FieldSchema, indexParams map[string]string) e func 
checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) error { indexType := indexParams[common.IndexTypeKey] - // skip params check of non-vector field. - vecDataTypes := []schemapb.DataType{ - schemapb.DataType_FloatVector, - schemapb.DataType_BinaryVector, - schemapb.DataType_Float16Vector, - schemapb.DataType_BFloat16Vector, - } - if !funcutil.SliceContain(vecDataTypes, field.GetDataType()) { + if !isVectorType(field.GetDataType()) { return indexparamcheck.CheckIndexValid(field.GetDataType(), indexType, indexParams) } @@ -355,8 +347,10 @@ func checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) erro return fmt.Errorf("invalid index type: %s", indexType) } - if err := fillDimension(field, indexParams); err != nil { - return err + if !isSparseVectorType(field.DataType) { + if err := fillDimension(field, indexParams); err != nil { + return err + } } if err := checker.CheckValidDataType(field.GetDataType()); err != nil { diff --git a/internal/proxy/task_index_test.go b/internal/proxy/task_index_test.go index d7989fe274b11..e365ca88d43fd 100644 --- a/internal/proxy/task_index_test.go +++ b/internal/proxy/task_index_test.go @@ -272,6 +272,76 @@ func TestCreateIndexTask_PreExecute(t *testing.T) { }) } +func Test_sparse_parseIndexParams(t *testing.T) { + cit := &createIndexTask{ + Condition: nil, + req: &milvuspb.CreateIndexRequest{ + Base: nil, + DbName: "", + CollectionName: "", + FieldName: "", + ExtraParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "SPARSE_INVERTED_INDEX", + }, + { + Key: MetricTypeKey, + Value: "IP", + }, + { + Key: common.IndexParamsKey, + Value: "{\"drop_ratio_build\": 0.3}", + }, + }, + IndexName: "", + }, + ctx: nil, + rootCoord: nil, + result: nil, + isAutoIndex: false, + newIndexParams: nil, + newTypeParams: nil, + collectionID: 0, + fieldSchema: &schemapb.FieldSchema{ + FieldID: 101, + Name: "FieldID", + IsPrimaryKey: false, + Description: "field no.1", + DataType: 
schemapb.DataType_SparseFloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: MetricTypeKey, + Value: "IP", + }, + }, + }, + } + + t.Run("parse index params", func(t *testing.T) { + err := cit.parseIndexParams() + assert.NoError(t, err) + + assert.ElementsMatch(t, + []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "SPARSE_INVERTED_INDEX", + }, + { + Key: MetricTypeKey, + Value: "IP", + }, + { + Key: "drop_ratio_build", + Value: "0.3", + }, + }, cit.newIndexParams) + assert.ElementsMatch(t, + []*commonpb.KeyValuePair{}, cit.newTypeParams) + }) +} + func Test_parseIndexParams(t *testing.T) { cit := &createIndexTask{ Condition: nil, diff --git a/internal/proxy/util.go b/internal/proxy/util.go index b9c3ea8cc4892..1aaac133ea839 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -98,7 +98,12 @@ func isVectorType(dataType schemapb.DataType) bool { return dataType == schemapb.DataType_FloatVector || dataType == schemapb.DataType_BinaryVector || dataType == schemapb.DataType_Float16Vector || - dataType == schemapb.DataType_BFloat16Vector + dataType == schemapb.DataType_BFloat16Vector || + dataType == schemapb.DataType_SparseFloatVector +} + +func isSparseVectorType(dataType schemapb.DataType) bool { + return dataType == schemapb.DataType_SparseFloatVector } func validateMaxQueryResultWindow(offset int64, limit int64) error { @@ -307,6 +312,12 @@ func validateDimension(field *schemapb.FieldSchema) error { break } } + if isSparseVectorType(field.DataType) { + if exist { + return fmt.Errorf("dim should not be specified for sparse vector field %s(%d)", field.Name, field.FieldID) + } + return nil + } if !exist { return errors.New("dimension is not defined in field type params, check type param `dim` for vector field") } @@ -509,7 +520,7 @@ func isVector(dataType schemapb.DataType) (bool, error) { schemapb.DataType_Float, schemapb.DataType_Double: return false, nil - case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, 
schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: return true, nil } @@ -520,7 +531,7 @@ func validateMetricType(dataType schemapb.DataType, metricTypeStrRaw string) err metricTypeStr := strings.ToUpper(metricTypeStrRaw) switch metricTypeStr { case metric.L2, metric.IP, metric.COSINE: - if dataType == schemapb.DataType_FloatVector || dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector { + if dataType == schemapb.DataType_FloatVector || dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector || dataType == schemapb.DataType_SparseFloatVector { return nil } case metric.JACCARD, metric.HAMMING, metric.SUBSTRUCTURE, metric.SUPERSTRUCTURE: @@ -581,13 +592,15 @@ func validateSchema(coll *schemapb.CollectionSchema) error { if err2 != nil { return err2 } - dimStr, ok := typeKv[common.DimKey] - if !ok { - return fmt.Errorf("dim not found in type_params for vector field %s(%d)", field.Name, field.FieldID) - } - dim, err := strconv.Atoi(dimStr) - if err != nil || dim < 0 { - return fmt.Errorf("invalid dim; %s", dimStr) + if !isSparseVectorType(field.DataType) { + dimStr, ok := typeKv[common.DimKey] + if !ok { + return fmt.Errorf("dim not found in type_params for vector field %s(%d)", field.Name, field.FieldID) + } + dim, err := strconv.Atoi(dimStr) + if err != nil || dim < 0 { + return fmt.Errorf("invalid dim; %s", dimStr) + } } metricTypeStr, ok := indexKv[common.MetricTypeKey] @@ -624,7 +637,7 @@ func validateMultipleVectorFields(schema *schemapb.CollectionSchema) error { for i := range schema.Fields { name := schema.Fields[i].Name dType := schema.Fields[i].DataType - isVec := dType == schemapb.DataType_BinaryVector || dType == schemapb.DataType_FloatVector || dType == schemapb.DataType_Float16Vector || dType == 
schemapb.DataType_BFloat16Vector + isVec := dType == schemapb.DataType_BinaryVector || dType == schemapb.DataType_FloatVector || dType == schemapb.DataType_Float16Vector || dType == schemapb.DataType_BFloat16Vector || dType == schemapb.DataType_SparseFloatVector if isVec && vecExist && !enableMultipleVectorFields { return fmt.Errorf( "multiple vector fields is not supported, fields name: %s, %s", diff --git a/internal/proxy/validate_util.go b/internal/proxy/validate_util.go index db8bededaee4a..e75c79cde756c 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxy/validate_util.go @@ -85,6 +85,10 @@ func (v *validateUtil) Validate(data []*schemapb.FieldData, schema *schemapb.Col if err := v.checkBinaryVectorFieldData(field, fieldSchema); err != nil { return err } + case schemapb.DataType_SparseFloatVector: + if err := v.checkSparseFloatFieldData(field, fieldSchema); err != nil { + return err + } case schemapb.DataType_VarChar: if err := v.checkVarCharFieldData(field, fieldSchema); err != nil { return err @@ -205,6 +209,13 @@ func (v *validateUtil) checkAligned(data []*schemapb.FieldData, schema *typeutil if n != numRows { return errNumRowsMismatch(field.GetFieldName(), n) } + + case schemapb.DataType_SparseFloatVector: + n := uint64(len(field.GetVectors().GetSparseFloatVector().GetContents())) // getter is nil-safe; direct .Contents access panics when no sparse vector is set + if n != numRows { + return errNumRowsMismatch(field.GetFieldName(), n) + } + default: // error won't happen here. 
n, err := funcutil.GetNumRowOfFieldData(field) @@ -326,6 +337,19 @@ func (v *validateUtil) checkBinaryVectorFieldData(field *schemapb.FieldData, fie return nil } +func (v *validateUtil) checkSparseFloatFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error { + if field.GetVectors() == nil || field.GetVectors().GetSparseFloatVector() == nil { + msg := fmt.Sprintf("sparse float field '%v' is illegal, nil SparseFloatVector", field.GetFieldName()) + return merr.WrapErrParameterInvalid("need sparse float array", "got nil", msg) + } + sparseRows := field.GetVectors().GetSparseFloatVector().GetContents() + if sparseRows == nil { + msg := fmt.Sprintf("sparse float field '%v' is illegal, array type mismatch", field.GetFieldName()) + return merr.WrapErrParameterInvalid("need sparse float array", "got nil", msg) + } + return typeutil.ValidateSparseFloatRows(sparseRows...) +} + func (v *validateUtil) checkVarCharFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error { strArr := field.GetScalars().GetStringData().GetData() if strArr == nil && fieldSchema.GetDefaultValue() == nil { diff --git a/internal/querynodev2/segments/utils.go b/internal/querynodev2/segments/utils.go index 1a7b1dc54f853..640d1a4fbcfed 100644 --- a/internal/querynodev2/segments/utils.go +++ b/internal/querynodev2/segments/utils.go @@ -93,6 +93,8 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll break } } + case schemapb.DataType_SparseFloatVector: + return nil, fmt.Errorf("SparseFloatVector not support in row based message") } } @@ -166,6 +168,10 @@ func fillFloatVecFieldData(ctx context.Context, vcm storage.ChunkManager, dataPa return nil } +func fillSparseFloatVecFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error { + return fmt.Errorf("fillSparseFloatVecFieldData not implemented") +} + func fillBoolFieldData(ctx 
context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error { // read whole file. // TODO: optimize here. @@ -274,6 +280,8 @@ func fillFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath strin return fillBinVecFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian) case schemapb.DataType_FloatVector: return fillFloatVecFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian) + case schemapb.DataType_SparseFloatVector: + return fillSparseFloatVecFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian) case schemapb.DataType_Bool: return fillBoolFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian) case schemapb.DataType_String, schemapb.DataType_VarChar: diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 0a38c68854564..78d237f366709 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -157,7 +157,7 @@ func (writer *InsertBinlogWriter) NextInsertEventWriter(dim ...int) (*insertEven var event *insertEventWriter var err error - if typeutil.IsVectorType(writer.PayloadDataType) { + if typeutil.IsVectorType(writer.PayloadDataType) && !typeutil.IsSparseVectorType(writer.PayloadDataType) { if len(dim) != 1 { return nil, fmt.Errorf("incorrect input numbers") } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 63d4a5cb87b55..b44e320c1da4f 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -203,7 +203,7 @@ func (insertCodec *InsertCodec) SerializePkStatsByData(data *InsertData) (*Blob, return nil, fmt.Errorf("there is no pk field") } -// Serialize transfer insert data to blob. It will sort insert data by timestamp. +// Serialize transforms insert data to blob. It will sort insert data by timestamp. // From schema, it gets all fields. // For each field, it will create a binlog writer, and write an event to the binlog. 
// It returns binlog buffer in the end. @@ -259,6 +259,8 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique eventWriter, err = writer.NextInsertEventWriter(singleData.(*Float16VectorFieldData).Dim) case schemapb.DataType_BFloat16Vector: eventWriter, err = writer.NextInsertEventWriter(singleData.(*BFloat16VectorFieldData).Dim) + case schemapb.DataType_SparseFloatVector: + eventWriter, err = writer.NextInsertEventWriter() default: return nil, fmt.Errorf("undefined data type %d", field.DataType) } @@ -384,12 +386,15 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Float16VectorFieldData).GetMemorySize())) case schemapb.DataType_BFloat16Vector: err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim) + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BFloat16VectorFieldData).GetMemorySize())) + case schemapb.DataType_SparseFloatVector: + err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData)) if err != nil { eventWriter.Close() writer.Close() return nil, err } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BFloat16VectorFieldData).GetMemorySize())) + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*SparseFloatVectorFieldData).GetMemorySize())) default: return nil, fmt.Errorf("undefined data type %d", field.DataType) } @@ -776,6 +781,22 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int floatVectorFieldData.Dim = dim insertData.Data[fieldID] = floatVectorFieldData + case schemapb.DataType_SparseFloatVector: + sparseData, _, err := eventReader.GetSparseFloatVectorFromPayload() + if err != nil { + eventReader.Close() + binlogReader.Close() + return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err + } + if insertData.Data[fieldID] == nil { + 
insertData.Data[fieldID] = &SparseFloatVectorFieldData{} + } + vec := insertData.Data[fieldID].(*SparseFloatVectorFieldData) + vec.AppendAllRows(sparseData) + + totalLength += sparseData.RowNum() + insertData.Data[fieldID] = vec + default: eventReader.Close() binlogReader.Close() diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 8c6468f994627..21b5df30732bc 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -30,28 +30,30 @@ import ( "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/testutils" ) const ( - CollectionID = 1 - PartitionID = 1 - SegmentID = 1 - RowIDField = 0 - TimestampField = 1 - BoolField = 100 - Int8Field = 101 - Int16Field = 102 - Int32Field = 103 - Int64Field = 104 - FloatField = 105 - DoubleField = 106 - StringField = 107 - BinaryVectorField = 108 - FloatVectorField = 109 - ArrayField = 110 - JSONField = 111 - Float16VectorField = 112 - BFloat16VectorField = 113 + CollectionID = 1 + PartitionID = 1 + SegmentID = 1 + RowIDField = 0 + TimestampField = 1 + BoolField = 100 + Int8Field = 101 + Int16Field = 102 + Int32Field = 103 + Int64Field = 104 + FloatField = 105 + DoubleField = 106 + StringField = 107 + BinaryVectorField = 108 + FloatVectorField = 109 + ArrayField = 110 + JSONField = 111 + Float16VectorField = 112 + BFloat16VectorField = 113 + SparseFloatVectorField = 114 ) func genTestCollectionMeta() *etcdpb.CollectionMeta { @@ -187,6 +189,13 @@ func genTestCollectionMeta() *etcdpb.CollectionMeta { }, }, }, + { + FieldID: SparseFloatVectorField, + Name: "field_sparse_float_vector", + Description: "sparse_float_vector", + DataType: schemapb.DataType_SparseFloatVector, + TypeParams: []*commonpb.KeyValuePair{}, + }, }, }, } @@ -266,6 +275,16 @@ func TestInsertCodec(t *testing.T) { Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 
255, 0, 255}, Dim: 4, }, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, + }, + }, }, } @@ -319,6 +338,16 @@ func TestInsertCodec(t *testing.T) { Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255}, Dim: 4, }, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 300, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{5, 6, 7}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{15, 26, 37}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{105, 207, 299}, []float32{3.1, 3.2, 3.3}), + }, + }, + }, ArrayField: &ArrayFieldData{ ElementType: schemapb.DataType_Int32, Data: []*schemapb.ScalarField{ @@ -359,8 +388,14 @@ func TestInsertCodec(t *testing.T) { FloatVectorField: &FloatVectorFieldData{[]float32{}, 4}, Float16VectorField: &Float16VectorFieldData{[]byte{}, 4}, BFloat16VectorField: &BFloat16VectorFieldData{[]byte{}, 4}, - ArrayField: &ArrayFieldData{schemapb.DataType_Int32, []*schemapb.ScalarField{}}, - JSONField: &JSONFieldData{[][]byte{}}, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 0, + Contents: [][]byte{}, + }, + }, + ArrayField: &ArrayFieldData{schemapb.DataType_Int32, []*schemapb.ScalarField{}}, + JSONField: &JSONFieldData{[][]byte{}}, }, } b, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty) @@ -414,6 +449,19 @@ func TestInsertCodec(t *testing.T) { 0, 255, 0, 255, 0, 255, 0, 255, }, resultData.Data[BFloat16VectorField].(*BFloat16VectorFieldData).Data) + assert.Equal(t, schemapb.SparseFloatArray{ + // merged dim 
should be max of all dims + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{5, 6, 7}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{15, 26, 37}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{105, 207, 299}, []float32{3.1, 3.2, 3.3}), + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, + }, resultData.Data[SparseFloatVectorField].(*SparseFloatVectorFieldData).SparseFloatArray) + int32ArrayList := [][]int32{{1, 2, 3}, {4, 5, 6}, {3, 2, 1}, {6, 5, 4}} resultArrayList := [][]int32{} for _, v := range resultData.Data[ArrayField].(*ArrayFieldData).Data { diff --git a/internal/storage/data_sorter.go b/internal/storage/data_sorter.go index 691825b8eaa82..c7e1d3dd884a9 100644 --- a/internal/storage/data_sorter.go +++ b/internal/storage/data_sorter.go @@ -114,6 +114,9 @@ func (ds *DataSorter) Swap(i, j int) { case schemapb.DataType_JSON: data := singleData.(*JSONFieldData).Data data[i], data[j] = data[j], data[i] + case schemapb.DataType_SparseFloatVector: + fieldData := singleData.(*SparseFloatVectorFieldData) + fieldData.Contents[i], fieldData.Contents[j] = fieldData.Contents[j], fieldData.Contents[i] default: errMsg := "undefined data type " + string(field.DataType) panic(errMsg) diff --git a/internal/storage/data_sorter_test.go b/internal/storage/data_sorter_test.go index 5413dd094cb33..324b3a5170508 100644 --- a/internal/storage/data_sorter_test.go +++ b/internal/storage/data_sorter_test.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestDataSorter(t *testing.T) { @@ -132,9 +133,16 @@ func TestDataSorter(t *testing.T) { FieldID: 111, Name: 
"field_bfloat16_vector", IsPrimaryKey: false, - Description: "description_12", + Description: "description_13", DataType: schemapb.DataType_BFloat16Vector, }, + { + FieldID: 112, + Name: "field_sparse_float_vector", + IsPrimaryKey: false, + Description: "description_14", + DataType: schemapb.DataType_SparseFloatVector, + }, }, }, } @@ -188,6 +196,16 @@ func TestDataSorter(t *testing.T) { Data: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}, Dim: 4, }, + 112: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, + }, + }, }, } @@ -237,6 +255,7 @@ func TestDataSorter(t *testing.T) { // } // } + // last row should be moved to the first row assert.Equal(t, []int64{2, 3, 4}, dataSorter.InsertData.Data[0].(*Int64FieldData).Data) assert.Equal(t, []int64{5, 3, 4}, dataSorter.InsertData.Data[1].(*Int64FieldData).Data) assert.Equal(t, []bool{true, true, false}, dataSorter.InsertData.Data[100].(*BoolFieldData).Data) @@ -251,6 +270,14 @@ func TestDataSorter(t *testing.T) { assert.Equal(t, []float32{16, 17, 18, 19, 20, 21, 22, 23, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, dataSorter.InsertData.Data[109].(*FloatVectorFieldData).Data) assert.Equal(t, []byte{16, 17, 18, 19, 20, 21, 22, 23, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, dataSorter.InsertData.Data[110].(*Float16VectorFieldData).Data) assert.Equal(t, []byte{16, 17, 18, 19, 20, 21, 22, 23, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, dataSorter.InsertData.Data[111].(*BFloat16VectorFieldData).Data) + assert.Equal(t, schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, 
[]float32{3.1, 3.2, 3.3}), + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + }, + }, dataSorter.InsertData.Data[112].(*SparseFloatVectorFieldData).SparseFloatArray) } func TestDataSorter_Len(t *testing.T) { diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go index 5d58361f423c3..040ecec58db92 100644 --- a/internal/storage/event_writer.go +++ b/internal/storage/event_writer.go @@ -215,7 +215,7 @@ func newDescriptorEvent() *descriptorEvent { func newInsertEventWriter(dataType schemapb.DataType, dim ...int) (*insertEventWriter, error) { var payloadWriter PayloadWriterInterface var err error - if typeutil.IsVectorType(dataType) { + if typeutil.IsVectorType(dataType) && !typeutil.IsSparseVectorType(dataType) { if len(dim) != 1 { return nil, fmt.Errorf("incorrect input numbers") } diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go index 1ebda6ef8ca8d..945162915b0d4 100644 --- a/internal/storage/insert_data.go +++ b/internal/storage/insert_data.go @@ -20,9 +20,12 @@ import ( "encoding/binary" "fmt" + "github.com/gogo/protobuf/proto" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // TODO: fill it @@ -181,7 +184,8 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema) Data: make([]byte, 0), Dim: dim, }, nil - + case schemapb.DataType_SparseFloatVector: + return &SparseFloatVectorFieldData{}, nil case schemapb.DataType_Bool: return &BoolFieldData{ Data: make([]bool, 0), @@ -283,6 +287,20 @@ type BFloat16VectorFieldData struct { Dim int } +type SparseFloatVectorFieldData struct { + schemapb.SparseFloatArray +} + +func (dst *SparseFloatVectorFieldData) AppendAllRows(src *SparseFloatVectorFieldData) { + if len(src.Contents) == 0 { + 
return + } + if dst.Dim < src.Dim { + dst.Dim = src.Dim + } + dst.Contents = append(dst.Contents, src.Contents...) +} + // RowNum implements FieldData.RowNum func (data *BoolFieldData) RowNum() int { return len(data.Data) } func (data *Int8FieldData) RowNum() int { return len(data.Data) } @@ -300,6 +318,7 @@ func (data *Float16VectorFieldData) RowNum() int { return len(data.Data) / 2 / d func (data *BFloat16VectorFieldData) RowNum() int { return len(data.Data) / 2 / data.Dim } +func (data *SparseFloatVectorFieldData) RowNum() int { return len(data.Contents) } // GetRow implements FieldData.GetRow func (data *BoolFieldData) GetRow(i int) any { return data.Data[i] } @@ -312,10 +331,14 @@ func (data *DoubleFieldData) GetRow(i int) any { return data.Data[i] } func (data *StringFieldData) GetRow(i int) any { return data.Data[i] } func (data *ArrayFieldData) GetRow(i int) any { return data.Data[i] } func (data *JSONFieldData) GetRow(i int) any { return data.Data[i] } -func (data *BinaryVectorFieldData) GetRow(i int) interface{} { +func (data *BinaryVectorFieldData) GetRow(i int) any { return data.Data[i*data.Dim/8 : (i+1)*data.Dim/8] } +func (data *SparseFloatVectorFieldData) GetRow(i int) interface{} { + return data.Contents[i] +} + func (data *FloatVectorFieldData) GetRow(i int) interface{} { return data.Data[i*data.Dim : (i+1)*data.Dim] } @@ -328,20 +351,21 @@ func (data *BFloat16VectorFieldData) GetRow(i int) interface{} { return data.Data[i*data.Dim*2 : (i+1)*data.Dim*2] } -func (data *BoolFieldData) GetRows() any { return data.Data } -func (data *Int8FieldData) GetRows() any { return data.Data } -func (data *Int16FieldData) GetRows() any { return data.Data } -func (data *Int32FieldData) GetRows() any { return data.Data } -func (data *Int64FieldData) GetRows() any { return data.Data } -func (data *FloatFieldData) GetRows() any { return data.Data } -func (data *DoubleFieldData) GetRows() any { return data.Data } -func (data *StringFieldData) GetRows() any { return 
data.Data } -func (data *ArrayFieldData) GetRows() any { return data.Data } -func (data *JSONFieldData) GetRows() any { return data.Data } -func (data *BinaryVectorFieldData) GetRows() any { return data.Data } -func (data *FloatVectorFieldData) GetRows() any { return data.Data } -func (data *Float16VectorFieldData) GetRows() any { return data.Data } -func (data *BFloat16VectorFieldData) GetRows() any { return data.Data } +func (data *BoolFieldData) GetRows() any { return data.Data } +func (data *Int8FieldData) GetRows() any { return data.Data } +func (data *Int16FieldData) GetRows() any { return data.Data } +func (data *Int32FieldData) GetRows() any { return data.Data } +func (data *Int64FieldData) GetRows() any { return data.Data } +func (data *FloatFieldData) GetRows() any { return data.Data } +func (data *DoubleFieldData) GetRows() any { return data.Data } +func (data *StringFieldData) GetRows() any { return data.Data } +func (data *ArrayFieldData) GetRows() any { return data.Data } +func (data *JSONFieldData) GetRows() any { return data.Data } +func (data *BinaryVectorFieldData) GetRows() any { return data.Data } +func (data *FloatVectorFieldData) GetRows() any { return data.Data } +func (data *Float16VectorFieldData) GetRows() any { return data.Data } +func (data *BFloat16VectorFieldData) GetRows() any { return data.Data } +func (data *SparseFloatVectorFieldData) GetRows() any { return data.Contents } // AppendRow implements FieldData.AppendRow func (data *BoolFieldData) AppendRow(row interface{}) error { @@ -470,6 +494,22 @@ func (data *BFloat16VectorFieldData) AppendRow(row interface{}) error { return nil } +func (data *SparseFloatVectorFieldData) AppendRow(row interface{}) error { + v, ok := row.([]byte) + if !ok { + return merr.WrapErrParameterInvalid("SparseFloatVectorRowData", row, "Wrong row type") + } + if err := typeutil.ValidateSparseFloatRows(v); err != nil { + return err + } + rowDim := typeutil.SparseFloatRowDim(v) + if data.Dim < rowDim { + 
data.Dim = rowDim + } + data.Contents = append(data.Contents, v) + return nil +} + func (data *BoolFieldData) AppendRows(rows interface{}) error { v, ok := rows.([]bool) if !ok { @@ -612,6 +652,18 @@ func (data *BFloat16VectorFieldData) AppendRows(rows interface{}) error { return nil } +func (data *SparseFloatVectorFieldData) AppendRows(rows interface{}) error { + v, ok := rows.(SparseFloatVectorFieldData) + if !ok { + return merr.WrapErrParameterInvalid("SparseFloatVectorFieldData", rows, "Wrong rows type") + } + data.Contents = append(data.SparseFloatArray.Contents, v.Contents...) + if data.Dim < v.Dim { + data.Dim = v.Dim + } + return nil +} + // GetMemorySize implements FieldData.GetMemorySize func (data *BoolFieldData) GetMemorySize() int { return binary.Size(data.Data) } func (data *Int8FieldData) GetMemorySize() int { return binary.Size(data.Data) } @@ -627,6 +679,11 @@ func (data *BFloat16VectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 } +func (data *SparseFloatVectorFieldData) GetMemorySize() int { + // TODO(SPARSE): should this be the memory size of serialized size? + return proto.Size(&data.SparseFloatArray) +} + // GetDataType implements FieldData.GetDataType func (data *BoolFieldData) GetDataType() schemapb.DataType { return schemapb.DataType_Bool } func (data *Int8FieldData) GetDataType() schemapb.DataType { return schemapb.DataType_Int8 } @@ -654,6 +711,10 @@ func (data *BFloat16VectorFieldData) GetDataType() schemapb.DataType { return schemapb.DataType_BFloat16Vector } +func (data *SparseFloatVectorFieldData) GetDataType() schemapb.DataType { + return schemapb.DataType_SparseFloatVector +} + // why not binary.Size(data) directly? binary.Size(data) return -1 // binary.Size returns how many bytes Write would generate to encode the value v, which // must be a fixed-size value or a slice of fixed-size values, or a pointer to such data. 
@@ -733,3 +794,7 @@ func (data *ArrayFieldData) GetRowSize(i int) int { } return 0 } + +func (data *SparseFloatVectorFieldData) GetRowSize(i int) int { + return len(data.Contents[i]) +} diff --git a/internal/storage/insert_data_test.go b/internal/storage/insert_data_test.go index 5d12da7b3d090..e63a9c515e554 100644 --- a/internal/storage/insert_data_test.go +++ b/internal/storage/insert_data_test.go @@ -9,6 +9,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestInsertDataSuite(t *testing.T) { @@ -84,11 +85,11 @@ func (s *InsertDataSuite) TestInsertData() { s.False(s.iDataOneRow.IsEmpty()) s.Equal(1, s.iDataOneRow.GetRowNum()) - s.Equal(151, s.iDataOneRow.GetMemorySize()) + s.Equal(179, s.iDataOneRow.GetMemorySize()) s.False(s.iDataTwoRows.IsEmpty()) s.Equal(2, s.iDataTwoRows.GetRowNum()) - s.Equal(286, s.iDataTwoRows.GetMemorySize()) + s.Equal(340, s.iDataTwoRows.GetMemorySize()) for _, field := range s.iDataTwoRows.Data { s.Equal(2, field.RowNum()) @@ -187,20 +188,21 @@ func (s *InsertDataSuite) SetupTest() { s.Equal(16, s.iDataEmpty.GetMemorySize()) row1 := map[FieldID]interface{}{ - RowIDField: int64(3), - TimestampField: int64(3), - BoolField: true, - Int8Field: int8(3), - Int16Field: int16(3), - Int32Field: int32(3), - Int64Field: int64(3), - FloatField: float32(3), - DoubleField: float64(3), - StringField: "str", - BinaryVectorField: []byte{0}, - FloatVectorField: []float32{4, 5, 6, 7}, - Float16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255}, - BFloat16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255}, + RowIDField: int64(3), + TimestampField: int64(3), + BoolField: true, + Int8Field: int8(3), + Int16Field: int16(3), + Int32Field: int32(3), + Int64Field: int64(3), + FloatField: float32(3), + DoubleField: float64(3), + StringField: "str", + BinaryVectorField: []byte{0}, + FloatVectorField: 
[]float32{4, 5, 6, 7}, + Float16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255}, + BFloat16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255}, + SparseFloatVectorField: testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{4, 5, 6}), ArrayField: &schemapb.ScalarField{ Data: &schemapb.ScalarField_IntData{ IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}}, @@ -219,20 +221,21 @@ func (s *InsertDataSuite) SetupTest() { } row2 := map[FieldID]interface{}{ - RowIDField: int64(1), - TimestampField: int64(1), - BoolField: false, - Int8Field: int8(1), - Int16Field: int16(1), - Int32Field: int32(1), - Int64Field: int64(1), - FloatField: float32(1), - DoubleField: float64(1), - StringField: string("str"), - BinaryVectorField: []byte{0}, - FloatVectorField: []float32{4, 5, 6, 7}, - Float16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8}, - BFloat16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + RowIDField: int64(1), + TimestampField: int64(1), + BoolField: false, + Int8Field: int8(1), + Int16Field: int16(1), + Int32Field: int32(1), + Int64Field: int64(1), + FloatField: float32(1), + DoubleField: float64(1), + StringField: string("str"), + BinaryVectorField: []byte{0}, + FloatVectorField: []float32{4, 5, 6, 7}, + Float16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + BFloat16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + SparseFloatVectorField: testutils.CreateSparseFloatRow([]uint32{2, 3, 4}, []float32{4, 5, 6}), ArrayField: &schemapb.ScalarField{ Data: &schemapb.ScalarField_IntData{ IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}}, diff --git a/internal/storage/payload.go b/internal/storage/payload.go index c810c67659287..99fa0f52094e3 100644 --- a/internal/storage/payload.go +++ b/internal/storage/payload.go @@ -42,6 +42,7 @@ type PayloadWriterInterface interface { AddFloatVectorToPayload(binVec []float32, dim int) error AddFloat16VectorToPayload(binVec []byte, dim int) error AddBFloat16VectorToPayload(binVec []byte, dim int) error + 
AddSparseFloatVectorToPayload(data *SparseFloatVectorFieldData) error FinishPayloadWriter() error GetPayloadBufferFromWriter() ([]byte, error) GetPayloadLengthFromWriter() (int, error) @@ -67,6 +68,7 @@ type PayloadReaderInterface interface { GetFloat16VectorFromPayload() ([]byte, int, error) GetBFloat16VectorFromPayload() ([]byte, int, error) GetFloatVectorFromPayload() ([]float32, int, error) + GetSparseFloatVectorFromPayload() (*SparseFloatVectorFieldData, int, error) GetPayloadLengthFromReader() (int, error) GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error) diff --git a/internal/storage/payload_reader.go b/internal/storage/payload_reader.go index 1a5b462209828..4a53ebf900231 100644 --- a/internal/storage/payload_reader.go +++ b/internal/storage/payload_reader.go @@ -14,6 +14,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // PayloadReader reads data from payload @@ -73,6 +74,8 @@ func (r *PayloadReader) GetDataFromPayload() (interface{}, int, error) { return r.GetFloat16VectorFromPayload() case schemapb.DataType_BFloat16Vector: return r.GetBFloat16VectorFromPayload() + case schemapb.DataType_SparseFloatVector: + return r.GetSparseFloatVectorFromPayload() case schemapb.DataType_String, schemapb.DataType_VarChar: val, err := r.GetStringFromPayload() return val, 0, err @@ -429,6 +432,36 @@ func (r *PayloadReader) GetFloatVectorFromPayload() ([]float32, int, error) { return ret, dim, nil } +func (r *PayloadReader) GetSparseFloatVectorFromPayload() (*SparseFloatVectorFieldData, int, error) { + if !typeutil.IsSparseVectorType(r.colType) { + return nil, -1, fmt.Errorf("failed to get sparse float vector from datatype %v", r.colType.String()) + } + values := make([]parquet.ByteArray, r.numRows) + valuesRead, err := ReadDataFromAllRowGroups[parquet.ByteArray, *file.ByteArrayColumnChunkReader](r.reader, values, 0, 
r.numRows) + if err != nil { + return nil, -1, err + } + if valuesRead != r.numRows { + return nil, -1, fmt.Errorf("expect %d binary, but got = %d", r.numRows, valuesRead) + } + + fieldData := &SparseFloatVectorFieldData{} + + for _, value := range values { + if len(value)%8 != 0 { + return nil, -1, fmt.Errorf("invalid bytesData length") + } + + fieldData.Contents = append(fieldData.Contents, value) + rowDim := typeutil.SparseFloatRowDim(value) + if rowDim > fieldData.Dim { + fieldData.Dim = rowDim + } + } + + return fieldData, int(fieldData.Dim), nil +} + func (r *PayloadReader) GetPayloadLengthFromReader() (int, error) { return int(r.numRows), nil } diff --git a/internal/storage/payload_test.go b/internal/storage/payload_test.go index fe0db83732130..b50aad7b34b90 100644 --- a/internal/storage/payload_test.go +++ b/internal/storage/payload_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestPayload_ReaderAndWriter(t *testing.T) { @@ -619,6 +620,170 @@ func TestPayload_ReaderAndWriter(t *testing.T) { defer r.ReleasePayloadReader() }) + t.Run("TestSparseFloatVector", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_SparseFloatVector) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, + }, + }) + assert.NoError(t, err) + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, 
[]float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{170, 300, 579}, []float32{3.1, 3.2, 3.3}), + }, + }, + }) + assert.NoError(t, err) + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + length, err := w.GetPayloadLengthFromWriter() + assert.NoError(t, err) + assert.Equal(t, 6, length) + defer w.ReleasePayloadWriter() + + buffer, err := w.GetPayloadBufferFromWriter() + assert.NoError(t, err) + + r, err := NewPayloadReader(schemapb.DataType_SparseFloatVector, buffer) + require.Nil(t, err) + length, err = r.GetPayloadLengthFromReader() + assert.NoError(t, err) + assert.Equal(t, length, 6) + + floatVecs, dim, err := r.GetSparseFloatVectorFromPayload() + assert.NoError(t, err) + assert.Equal(t, 600, dim) + assert.Equal(t, 6, len(floatVecs.Contents)) + assert.Equal(t, schemapb.SparseFloatArray{ + // merged dim should be max of all dims + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{170, 300, 579}, []float32{3.1, 3.2, 3.3}), + }, + }, floatVecs.SparseFloatArray) + + ifloatVecs, dim, err := r.GetDataFromPayload() + assert.NoError(t, err) + assert.Equal(t, floatVecs, ifloatVecs.(*SparseFloatVectorFieldData)) + assert.Equal(t, 600, dim) + defer r.ReleasePayloadReader() + }) + + testSparseOneBatch := func(t *testing.T, rows [][]byte, actualDim int) { + w, err := NewPayloadWriter(schemapb.DataType_SparseFloatVector) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + 
SparseFloatArray: schemapb.SparseFloatArray{ + Dim: int64(actualDim), + Contents: rows, + }, + }) + assert.NoError(t, err) + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + length, err := w.GetPayloadLengthFromWriter() + assert.NoError(t, err) + assert.Equal(t, 3, length) + defer w.ReleasePayloadWriter() + + buffer, err := w.GetPayloadBufferFromWriter() + assert.NoError(t, err) + + r, err := NewPayloadReader(schemapb.DataType_SparseFloatVector, buffer) + require.Nil(t, err) + length, err = r.GetPayloadLengthFromReader() + assert.NoError(t, err) + assert.Equal(t, length, 3) + + floatVecs, dim, err := r.GetSparseFloatVectorFromPayload() + assert.NoError(t, err) + assert.Equal(t, actualDim, dim) + assert.Equal(t, 3, len(floatVecs.Contents)) + assert.Equal(t, schemapb.SparseFloatArray{ + Dim: int64(dim), + Contents: rows, + }, floatVecs.SparseFloatArray) + + ifloatVecs, dim, err := r.GetDataFromPayload() + assert.NoError(t, err) + assert.Equal(t, floatVecs, ifloatVecs.(*SparseFloatVectorFieldData)) + assert.Equal(t, actualDim, dim) + defer r.ReleasePayloadReader() + } + + t.Run("TestSparseFloatVector_emptyRow", func(t *testing.T) { + testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, 600) + testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + }, 0) + }) + + t.Run("TestSparseFloatVector_largeRow", func(t *testing.T) { + nnz := 100000 + // generate an int slice with nnz random sorted elements + indices := make([]uint32, nnz) + values := make([]float32, nnz) + for i := 0; i < nnz; i++ { + indices[i] = uint32(i * 6) + values[i] = float32(i) + } + dim := int(indices[nnz-1]) + 1 + 
testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow(indices, values), + }, dim) + }) + + t.Run("TestSparseFloatVector_negativeValues", func(t *testing.T) { + testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{-2.1, 2.2, -2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, -3.2, 3.3}), + }, 600) + }) + + // even though SPARSE_INVERTED_INDEX and SPARSE_WAND index do not support + // arbitrarily large dimensions, HNSW does, so we still need to test it. + // Dimension range we support is 0 to positive int32 max - 1(to leave room + // for dim). + t.Run("TestSparseFloatVector_largeIndex", func(t *testing.T) { + int32Max := uint32(math.MaxInt32) + testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{-2.1, 2.2, -2.3}), + testutils.CreateSparseFloatRow([]uint32{100, int32Max / 2, int32Max - 1}, []float32{3.1, -3.2, 3.3}), + }, int(int32Max)) + }) + // t.Run("TestAddDataToPayload", func(t *testing.T) { // w, err := NewPayloadWriter(schemapb.DataType_Bool) // w.colType = 999 @@ -863,6 +1028,37 @@ func TestPayload_ReaderAndWriter(t *testing.T) { err = w.AddBFloat16VectorToPayload([]byte{1, 0, 0, 0, 0, 0, 0, 0}, 8) assert.Error(t, err) }) + t.Run("TestAddSparseFloatVectorAfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_SparseFloatVector) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 53, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, 
[]float32{1.1, 1.2, 1.3}), + }, + }, + }) + assert.Error(t, err) + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + }, + }, + }) + assert.Error(t, err) + + err = w.FinishPayloadWriter() + assert.Error(t, err) + }) t.Run("TestNewReadError", func(t *testing.T) { buffer := []byte{0} r, err := NewPayloadReader(999, buffer) @@ -1388,6 +1584,60 @@ func TestPayload_ReaderAndWriter(t *testing.T) { assert.Error(t, err) }) + t.Run("TestGetSparseFloatVectorError", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.NoError(t, err) + + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.NoError(t, err) + + r, err := NewPayloadReader(schemapb.DataType_SparseFloatVector, buffer) + assert.NoError(t, err) + + _, _, err = r.GetSparseFloatVectorFromPayload() + assert.Error(t, err) + + r.colType = 999 + _, _, err = r.GetSparseFloatVectorFromPayload() + assert.Error(t, err) + }) + + t.Run("TestGetSparseFloatVectorError2", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_SparseFloatVector) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 53, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + }, + }, + }) + assert.NoError(t, err) + + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.NoError(t, err) + + r, err := NewPayloadReader(schemapb.DataType_SparseFloatVector, buffer) + assert.NoError(t, err) + + r.numRows = 99 + _, _, err = 
r.GetSparseFloatVectorFromPayload() + assert.Error(t, err) + }) + t.Run("TestWriteLargeSizeData", func(t *testing.T) { t.Skip("Large data skip for online ut") size := 1 << 29 // 512M diff --git a/internal/storage/payload_writer.go b/internal/storage/payload_writer.go index e2956a5fd6225..e1efac35b29c0 100644 --- a/internal/storage/payload_writer.go +++ b/internal/storage/payload_writer.go @@ -50,7 +50,8 @@ type NativePayloadWriter struct { func NewPayloadWriter(colType schemapb.DataType, dim ...int) (PayloadWriterInterface, error) { var arrowType arrow.DataType - if typeutil.IsVectorType(colType) { + // writer for sparse float vector doesn't require dim + if typeutil.IsVectorType(colType) && !typeutil.IsSparseVectorType(colType) { if len(dim) != 1 { return nil, fmt.Errorf("incorrect input numbers") } @@ -164,6 +165,12 @@ func (w *NativePayloadWriter) AddDataToPayload(data interface{}, dim ...int) err return errors.New("incorrect data type") } return w.AddBFloat16VectorToPayload(val, dim[0]) + case schemapb.DataType_SparseFloatVector: + val, ok := data.(*SparseFloatVectorFieldData) + if !ok { + return errors.New("incorrect data type") + } + return w.AddSparseFloatVectorToPayload(val) default: return errors.New("incorrect datatype") } @@ -475,6 +482,23 @@ func (w *NativePayloadWriter) AddBFloat16VectorToPayload(data []byte, dim int) e return nil } +func (w *NativePayloadWriter) AddSparseFloatVectorToPayload(data *SparseFloatVectorFieldData) error { + if w.finished { + return errors.New("can't append data to finished writer") + } + builder, ok := w.builder.(*array.BinaryBuilder) + if !ok { + return errors.New("failed to cast BinaryBuilder") + } + length := len(data.SparseFloatArray.Contents) + builder.Reserve(length) + for i := 0; i < length; i++ { + builder.Append(data.SparseFloatArray.Contents[i]) + } + + return nil +} + func (w *NativePayloadWriter) FinishPayloadWriter() error { if w.finished { return errors.New("can't reuse a finished writer") @@ -574,6 +598,8 @@ 
func milvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataTy return &arrow.FixedSizeBinaryType{ ByteWidth: dim * 2, } + case schemapb.DataType_SparseFloatVector: + return &arrow.BinaryType{} default: panic("unsupported data type") } diff --git a/internal/storage/print_binlog.go b/internal/storage/print_binlog.go index da3eafc968b18..c27cc4715a213 100644 --- a/internal/storage/print_binlog.go +++ b/internal/storage/print_binlog.go @@ -334,6 +334,18 @@ func printPayloadValues(colType schemapb.DataType, reader PayloadReaderInterface for i := 0; i < rows; i++ { fmt.Printf("\t\t%d : %s\n", i, val[i]) } + case schemapb.DataType_SparseFloatVector: + sparseData, _, err := reader.GetSparseFloatVectorFromPayload() + if err != nil { + return err + } + fmt.Println("======= SparseFloatVectorFieldData =======") + fmt.Println("row num:", len(sparseData.Contents)) + fmt.Println("dim:", sparseData.Dim) + for _, v := range sparseData.Contents { + fmt.Println(v) + } + fmt.Println("===== SparseFloatVectorFieldData end =====") default: return errors.New("undefined data type") } diff --git a/internal/storage/stats.go b/internal/storage/stats.go index b5ce61740e876..7914e04b80ef5 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -187,7 +187,7 @@ func (stats *PrimaryKeyStats) UpdateMinMax(pk PrimaryKey) { func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) { if rowNum <= 0 { - return nil, merr.WrapErrParameterInvalidMsg("non zero & non negative row num", rowNum) + return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num", rowNum) } return &PrimaryKeyStats{ FieldID: fieldID, diff --git a/internal/storage/utils.go b/internal/storage/utils.go index 0000642ff5d46..9c91b9cbcc538 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -422,6 +422,8 @@ func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemap Data: vecs, Dim: dim, } + case 
schemapb.DataType_SparseFloatVector: + return nil, fmt.Errorf("Sparse Float Vector is not supported in row based data") case schemapb.DataType_Bool: idata.Data[field.FieldID] = &BoolFieldData{ @@ -556,6 +558,11 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche Dim: dim, } + case schemapb.DataType_SparseFloatVector: + fieldData = &SparseFloatVectorFieldData{ + SparseFloatArray: *srcFields[field.FieldID].GetVectors().GetSparseFloatVector(), + } + case schemapb.DataType_Bool: srcData := srcField.GetScalars().GetBoolData().GetData() @@ -823,6 +830,14 @@ func mergeBFloat16VectorField(data *InsertData, fid FieldID, field *BFloat16Vect fieldData.Data = append(fieldData.Data, field.Data...) } +func mergeSparseFloatVectorField(data *InsertData, fid FieldID, field *SparseFloatVectorFieldData) { + if _, ok := data.Data[fid]; !ok { + data.Data[fid] = &SparseFloatVectorFieldData{} + } + fieldData := data.Data[fid].(*SparseFloatVectorFieldData) + fieldData.AppendAllRows(field) +} + // MergeFieldData merge field into data. 
func MergeFieldData(data *InsertData, fid FieldID, field FieldData) { if field == nil { @@ -857,6 +872,8 @@ func MergeFieldData(data *InsertData, fid FieldID, field FieldData) { mergeFloat16VectorField(data, fid, field) case *BFloat16VectorFieldData: mergeBFloat16VectorField(data, fid, field) + case *SparseFloatVectorFieldData: + mergeSparseFloatVectorField(data, fid, field) } } @@ -1182,6 +1199,18 @@ func TransferInsertDataToInsertRecord(insertData *InsertData) (*segcorepb.Insert }, }, } + case *SparseFloatVectorFieldData: + fieldData = &schemapb.FieldData{ + Type: schemapb.DataType_SparseFloatVector, + FieldId: fieldID, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &rawData.SparseFloatArray, + }, + }, + }, + } default: return insertRecord, fmt.Errorf("unsupported data type when transter storage.InsertData to internalpb.InsertRecord") } diff --git a/internal/storage/utils_test.go b/internal/storage/utils_test.go index 8717662c61b96..f768b24ac56d8 100644 --- a/internal/storage/utils_test.go +++ b/internal/storage/utils_test.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestCheckTsField(t *testing.T) { @@ -394,6 +395,9 @@ func genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim int) (schema *sc }, }, }, + { + DataType: schemapb.DataType_SparseFloatVector, + }, { DataType: schemapb.DataType_Array, }, @@ -900,6 +904,26 @@ func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim for nrows := 0; nrows < numRows; nrows++ { columns[idx] = append(columns[idx], data[nrows*bf16VecDim*2:(nrows+1)*bf16VecDim*2]) } + case schemapb.DataType_SparseFloatVector: + fmt.Println("sparseFloatVector") + data := testutils.GenerateSparseFloatVectors(numRows) + f := 
&schemapb.FieldData{ + Type: schemapb.DataType_SparseFloatVector, + FieldName: field.Name, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: data.Dim, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: data, + }, + }, + }, + FieldId: field.FieldID, + } + msg.FieldsData = append(msg.FieldsData, f) + for nrows := 0; nrows < numRows; nrows++ { + columns[idx] = append(columns[idx], data.Contents[nrows]) + } case schemapb.DataType_Array: data := generateInt32ArrayList(numRows) @@ -1246,6 +1270,15 @@ func TestMergeInsertData(t *testing.T) { Data: []byte{0, 1}, Dim: 1, }, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + }, + }, + }, ArrayField: &ArrayFieldData{ Data: []*schemapb.ScalarField{ { @@ -1311,6 +1344,14 @@ func TestMergeInsertData(t *testing.T) { Data: []byte{2, 3}, Dim: 1, }, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{170, 300, 579}, []float32{3.1, 3.2, 3.3}), + }, + }, + }, ArrayField: &ArrayFieldData{ Data: []*schemapb.ScalarField{ { @@ -1387,6 +1428,19 @@ func TestMergeInsertData(t *testing.T) { assert.True(t, ok) assert.Equal(t, []byte{0, 1, 2, 3}, f.(*BFloat16VectorFieldData).Data) + f, ok = d1.Data[SparseFloatVectorField] + assert.True(t, ok) + assert.Equal(t, &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{170, 300, 579}, []float32{3.1, 3.2, 3.3}), + }, + }, + 
}, f.(*SparseFloatVectorFieldData)) + f, ok = d1.Data[ArrayField] assert.True(t, ok) assert.Equal(t, []int32{1, 2, 3}, f.(*ArrayFieldData).Data[0].GetIntData().GetData()) diff --git a/internal/util/indexcgowrapper/dataset.go b/internal/util/indexcgowrapper/dataset.go index 8a29996dc25f2..a4269f85d56b4 100644 --- a/internal/util/indexcgowrapper/dataset.go +++ b/internal/util/indexcgowrapper/dataset.go @@ -41,6 +41,16 @@ func GenBFloat16VecDataset(vectors []byte) *Dataset { } } +func GenSparseFloatVecDataset(data *storage.SparseFloatVectorFieldData) *Dataset { + // TODO(SPARSE): in search for the usage of this method, only the DType + // of the returned Dataset is used. + // If this is designed to generate a Dataset that will be sent to knowhere, + // we'll need to expose knowhere::sparse::SparseRow to Go. + return &Dataset{ + DType: schemapb.DataType_SparseFloatVector, + } +} + func GenBinaryVecDataset(vectors []byte) *Dataset { return &Dataset{ DType: schemapb.DataType_BinaryVector, @@ -116,6 +126,8 @@ func GenDataset(data storage.FieldData) *Dataset { return GenFloat16VecDataset(f.Data) case *storage.BFloat16VectorFieldData: return GenBFloat16VecDataset(f.Data) + case *storage.SparseFloatVectorFieldData: + return GenSparseFloatVecDataset(f) default: return &Dataset{ DType: schemapb.DataType_None, diff --git a/internal/util/indexcgowrapper/index.go b/internal/util/indexcgowrapper/index.go index ed0a964a2be47..9ed3ae73f2d6a 100644 --- a/internal/util/indexcgowrapper/index.go +++ b/internal/util/indexcgowrapper/index.go @@ -49,6 +49,7 @@ type CgoIndex struct { close bool } +// used only in test // TODO: use proto.Marshal instead of proto.MarshalTextString for better compatibility. 
func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]string) (CodecIndex, error) { protoTypeParams := &indexcgopb.TypeParams{ @@ -123,6 +124,8 @@ func CreateIndexV2(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecIn return index, nil } +// TODO: this seems to be used only for test. We should mark the method +// name with ForTest, or maybe move to test file. func (index *CgoIndex) Build(dataset *Dataset) error { switch dataset.DType { case schemapb.DataType_None: @@ -176,6 +179,12 @@ func (index *CgoIndex) buildBFloat16VecIndex(dataset *Dataset) error { return HandleCStatus(&status, "failed to build bfloat16 vector index") } +func (index *CgoIndex) buildSparseFloatVecIndex(dataset *Dataset) error { + vectors := dataset.Data[keyRawArr].([]byte) + status := C.BuildSparseFloatVecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (C.int64_t)(0), (*C.uint8_t)(&vectors[0])) + return HandleCStatus(&status, "failed to build sparse float vector index") +} + func (index *CgoIndex) buildBinaryVecIndex(dataset *Dataset) error { vectors := dataset.Data[keyRawArr].([]byte) status := C.BuildBinaryVecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (*C.uint8_t)(&vectors[0])) diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 091f6482f2475..4c7e1dad5a139 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -146,7 +146,7 @@ func CheckCtxValid(ctx context.Context) bool { func GetVecFieldIDs(schema *schemapb.CollectionSchema) []int64 { var vecFieldIDs []int64 for _, field := range schema.Fields { - if field.DataType == schemapb.DataType_BinaryVector || field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_Float16Vector { + if field.DataType == schemapb.DataType_BinaryVector || field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_Float16Vector || field.DataType == schemapb.DataType_SparseFloatVector { vecFieldIDs = append(vecFieldIDs, 
field.FieldID) } } @@ -335,6 +335,8 @@ func GetNumRowOfFieldData(fieldData *schemapb.FieldData) (uint64, error) { if err != nil { return 0, err } + case *schemapb.VectorField_SparseFloatVector: + fieldNumRows = uint64(len(vectorField.GetSparseFloatVector().GetContents())) default: return 0, fmt.Errorf("%s is not supported now", vectorFieldType) } diff --git a/pkg/util/funcutil/placeholdergroup.go b/pkg/util/funcutil/placeholdergroup.go index 5dd13cd1d445d..538a70fcbefcc 100644 --- a/pkg/util/funcutil/placeholdergroup.go +++ b/pkg/util/funcutil/placeholdergroup.go @@ -2,6 +2,7 @@ package funcutil import ( "encoding/binary" + "fmt" "math" "github.com/cockroachdb/errors" @@ -76,6 +77,22 @@ func fieldDataToPlaceholderValue(fieldData *schemapb.FieldData) (*commonpb.Place Values: flattenedFloat16VectorsToByteVectors(x.Bfloat16Vector, int(vectors.Dim)), } return placeholderValue, nil + case schemapb.DataType_SparseFloatVector: + vectors, ok := fieldData.GetVectors().GetData().(*schemapb.VectorField_SparseFloatVector) + if !ok { + return nil, errors.New("vector data is not schemapb.VectorField_SparseFloatVector") + } + vec := vectors.SparseFloatVector + bytes, err := proto.Marshal(vec) + if err != nil { + return nil, fmt.Errorf("failed to marshal schemapb.SparseFloatArray to bytes: %w", err) + } + placeholderValue := &commonpb.PlaceholderValue{ + Tag: "$0", + Type: commonpb.PlaceholderType_SparseFloatVector, + Values: [][]byte{bytes}, + } + return placeholderValue, nil default: return nil, errors.New("field is not a vector field") } diff --git a/pkg/util/gc/gc_tuner.go b/pkg/util/gc/gc_tuner.go index 04a6c33aa729c..26e1f74c335b7 100644 --- a/pkg/util/gc/gc_tuner.go +++ b/pkg/util/gc/gc_tuner.go @@ -87,7 +87,7 @@ func optimizeGOGC() { // currently we assume 20 ms as long gc pause if (m.PauseNs[(m.NumGC+255)%256] / uint64(time.Millisecond)) < 20 { - log.Info("GC Tune done", zap.Uint32("previous GOGC", previousGOGC), + log.Debug("GC Tune done", zap.Uint32("previous GOGC", 
previousGOGC), zap.Uint64("heapuse ", toMB(heapuse)), zap.Uint64("total memory", toMB(totaluse)), zap.Uint64("next GC", toMB(m.NextGC)), diff --git a/pkg/util/indexparamcheck/conf_adapter_mgr.go b/pkg/util/indexparamcheck/conf_adapter_mgr.go index ca2d53a3f39b9..f62fbe1bf859a 100644 --- a/pkg/util/indexparamcheck/conf_adapter_mgr.go +++ b/pkg/util/indexparamcheck/conf_adapter_mgr.go @@ -56,6 +56,10 @@ func (mgr *indexCheckerMgrImpl) registerIndexChecker() { mgr.checkers[IndexFaissBinIvfFlat] = newBinIVFFlatChecker() mgr.checkers[IndexHNSW] = newHnswChecker() mgr.checkers[IndexDISKANN] = newDiskannChecker() + mgr.checkers[IndexSparseInverted] = newSparseInvertedIndexChecker() + // WAND doesn't have more index params than sparse inverted index, thus + // using the same checker. + mgr.checkers[IndexSparseWand] = newSparseInvertedIndexChecker() } func newIndexCheckerMgr() *indexCheckerMgrImpl { diff --git a/pkg/util/indexparamcheck/constraints.go b/pkg/util/indexparamcheck/constraints.go index dd4ce2a352212..e3ea7548a2c56 100644 --- a/pkg/util/indexparamcheck/constraints.go +++ b/pkg/util/indexparamcheck/constraints.go @@ -41,6 +41,9 @@ const ( CargaBuildAlgoIVFPQ = "IVF_PQ" CargaBuildAlgoNNDESCENT = "NN_DESCENT" + + // Sparse Index Param + SparseDropRatioBuild = "drop_ratio_build" ) // METRICS is a set of all metrics types supported for float vector. 
@@ -55,9 +58,11 @@ var ( CagraBuildAlgoTypes = []string{CargaBuildAlgoIVFPQ, CargaBuildAlgoNNDESCENT} supportDimPerSubQuantizer = []int{32, 28, 24, 20, 16, 12, 10, 8, 6, 4, 3, 2, 1} // const supportSubQuantizer = []int{96, 64, 56, 48, 40, 32, 28, 24, 20, 16, 12, 8, 4, 3, 2, 1} // const + SparseMetrics = []string{metric.IP} // const ) const ( - FloatVectorDefaultMetricType = metric.IP - BinaryVectorDefaultMetricType = metric.JACCARD + FloatVectorDefaultMetricType = metric.IP + SparseFloatVectorDefaultMetricType = metric.IP + BinaryVectorDefaultMetricType = metric.JACCARD ) diff --git a/pkg/util/indexparamcheck/hnsw_checker.go b/pkg/util/indexparamcheck/hnsw_checker.go index a4dd5b1e7163f..522b31fe16bc3 100644 --- a/pkg/util/indexparamcheck/hnsw_checker.go +++ b/pkg/util/indexparamcheck/hnsw_checker.go @@ -31,6 +31,7 @@ func (c hnswChecker) CheckTrain(params map[string]string) error { } func (c hnswChecker) CheckValidDataType(dType schemapb.DataType) error { + // TODO(SPARSE) we'll add sparse vector support in HNSW later in cardinal if dType != schemapb.DataType_FloatVector && dType != schemapb.DataType_BinaryVector && dType != schemapb.DataType_Float16Vector && dType != schemapb.DataType_BFloat16Vector { return fmt.Errorf("only support float vector or binary vector") } diff --git a/pkg/util/indexparamcheck/index_type.go b/pkg/util/indexparamcheck/index_type.go index 7d361cc117e00..f559e0cabc9ba 100644 --- a/pkg/util/indexparamcheck/index_type.go +++ b/pkg/util/indexparamcheck/index_type.go @@ -30,6 +30,8 @@ const ( IndexFaissBinIvfFlat IndexType = "BIN_IVF_FLAT" IndexHNSW IndexType = "HNSW" IndexDISKANN IndexType = "DISKANN" + IndexSparseInverted IndexType = "SPARSE_INVERTED_INDEX" + IndexSparseWand IndexType = "SPARSE_WAND" ) func IsGpuIndex(indexType IndexType) bool { diff --git a/pkg/util/indexparamcheck/sparse_float_vector_base_checker.go b/pkg/util/indexparamcheck/sparse_float_vector_base_checker.go new file mode 100644 index 0000000000000..d84bbb32c4eaa --- 
/dev/null +++ b/pkg/util/indexparamcheck/sparse_float_vector_base_checker.go @@ -0,0 +1,47 @@ +package indexparamcheck + +import ( + "fmt" + "strconv" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common" +) + +// sparse vectors don't check for dim, but baseChecker does, thus baseChecker is not included +type sparseFloatVectorBaseChecker struct{} + +func (c sparseFloatVectorBaseChecker) StaticCheck(params map[string]string) error { + if !CheckStrByValues(params, Metric, SparseMetrics) { + return fmt.Errorf("metric type not found or not supported, supported: %v", SparseMetrics) + } + + return nil +} + +func (c sparseFloatVectorBaseChecker) CheckTrain(params map[string]string) error { + dropRatioBuildStr, exist := params[SparseDropRatioBuild] + if exist { + dropRatioBuild, err := strconv.ParseFloat(dropRatioBuildStr, 64) + if err != nil || dropRatioBuild < 0 || dropRatioBuild >= 1 { + return fmt.Errorf("invalid drop_ratio_build: %s, must be in range [0, 1)", dropRatioBuildStr) + } + } + + return nil +} + +func (c sparseFloatVectorBaseChecker) CheckValidDataType(dType schemapb.DataType) error { + if dType != schemapb.DataType_SparseFloatVector { + return fmt.Errorf("only sparse float vector is supported for the specified index tpye") + } + return nil +} + +func (c sparseFloatVectorBaseChecker) SetDefaultMetricTypeIfNotExist(params map[string]string) { + setDefaultIfNotExist(params, common.MetricTypeKey, SparseFloatVectorDefaultMetricType) +} + +func newSparseFloatVectorBaseChecker() IndexChecker { + return &sparseFloatVectorBaseChecker{} +} diff --git a/pkg/util/indexparamcheck/sparse_inverted_index_checker.go b/pkg/util/indexparamcheck/sparse_inverted_index_checker.go new file mode 100644 index 0000000000000..c6d62ed01585b --- /dev/null +++ b/pkg/util/indexparamcheck/sparse_inverted_index_checker.go @@ -0,0 +1,9 @@ +package indexparamcheck + +type sparseInvertedIndexChecker struct { + sparseFloatVectorBaseChecker +} +
+func newSparseInvertedIndexChecker() *sparseInvertedIndexChecker { + return &sparseInvertedIndexChecker{} +} diff --git a/pkg/util/testutils/sparse_test_utils.go b/pkg/util/testutils/sparse_test_utils.go new file mode 100644 index 0000000000000..6b4a7d117fcc8 --- /dev/null +++ b/pkg/util/testutils/sparse_test_utils.go @@ -0,0 +1,84 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package testutils + +import ( + "encoding/binary" + "math" + "math/rand" + "sort" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" +) + +func SparseFloatRowSetAt(row []byte, pos int, idx uint32, value float32) { + binary.LittleEndian.PutUint32(row[pos*8:], idx) + binary.LittleEndian.PutUint32(row[pos*8+4:], math.Float32bits(value)) +} + +func CreateSparseFloatRow(indices []uint32, values []float32) []byte { + row := make([]byte, len(indices)*8) + for i := 0; i < len(indices); i++ { + SparseFloatRowSetAt(row, i, indices[i], values[i]) + } + return row +} + +func GenerateSparseFloatVectors(numRows int) *schemapb.SparseFloatArray { + dim := 700 + avgNnz := 20 + var contents [][]byte + maxDim := 0 + + uniqueAndSort := func(indices []uint32) []uint32 { + seen := make(map[uint32]bool) + var result []uint32 + for _, value := range indices { + if _, ok := seen[value]; !ok { + seen[value] = true + result = append(result, value) + } + } + sort.Slice(result, func(i, j int) bool { + return result[i] < result[j] + }) + return result + } + + for i := 0; i < numRows; i++ { + nnz := rand.Intn(avgNnz*2) + 1 + indices := make([]uint32, 0, nnz) + for j := 0; j < nnz; j++ { + indices = append(indices, uint32(rand.Intn(dim))) + } + indices = uniqueAndSort(indices) + values := make([]float32, 0, len(indices)) + for j := 0; j < len(indices); j++ { + values = append(values, rand.Float32()) + } + if len(indices) > 0 && int(indices[len(indices)-1])+1 > maxDim { + maxDim = int(indices[len(indices)-1]) + 1 + } + rowBytes := CreateSparseFloatRow(indices, values) + + contents = append(contents, rowBytes) + } + return &schemapb.SparseFloatArray{ + Dim: int64(maxDim), + Contents: contents, + } +} diff --git a/pkg/util/typeutil/gen_empty_field_data.go b/pkg/util/typeutil/gen_empty_field_data.go index 3eb5abe3004e2..5d39df275caba 100644 --- a/pkg/util/typeutil/gen_empty_field_data.go +++ b/pkg/util/typeutil/gen_empty_field_data.go @@ -207,6 +207,26 @@ func 
genEmptyBFloat16VectorFieldData(field *schemapb.FieldSchema) (*schemapb.Fie }, nil } +func genEmptySparseFloatVectorFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error) { + return &schemapb.FieldData{ + Type: field.GetDataType(), + FieldName: field.GetName(), + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 0, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 0, + Contents: make([][]byte, 0), + }, + }, + }, + }, + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), + }, nil +} + func GenEmptyFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error) { dataType := field.GetDataType() switch dataType { @@ -234,6 +254,8 @@ func GenEmptyFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error) return genEmptyFloat16VectorFieldData(field) case schemapb.DataType_BFloat16Vector: return genEmptyBFloat16VectorFieldData(field) + case schemapb.DataType_SparseFloatVector: + return genEmptySparseFloatVectorFieldData(field) default: return nil, fmt.Errorf("unsupported data type: %s", dataType.String()) } diff --git a/pkg/util/typeutil/get_dim.go b/pkg/util/typeutil/get_dim.go index db102398c1175..8d0b8086bdca2 100644 --- a/pkg/util/typeutil/get_dim.go +++ b/pkg/util/typeutil/get_dim.go @@ -13,6 +13,9 @@ func GetDim(field *schemapb.FieldSchema) (int64, error) { if !IsVectorType(field.GetDataType()) { return 0, fmt.Errorf("%s is not of vector type", field.GetDataType()) } + if IsSparseVectorType(field.GetDataType()) { + return 0, fmt.Errorf("typeutil.GetDim should not invoke on sparse vector type") + } h := NewKvPairs(append(field.GetIndexParams(), field.GetTypeParams()...)) dimStr, err := h.Get(common.DimKey) if err != nil { diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 18f74080af644..a0937342cd286 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -159,6 +159,12 @@ func estimateSizeBy(schema 
*schemapb.CollectionSchema, policy getVariableFieldLe break } } + case schemapb.DataType_SparseFloatVector: + // TODO(SPARSE, zhengbuqian): size of sparse float vector + // varies depending on the number of non-zeros. Using sparse vector + // generated by SPLADE as reference and returning size of a sparse + // vector with 150 non-zeros. + res += 1200 } } return res, nil @@ -235,6 +241,11 @@ func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, e res += int(fs.GetVectors().GetDim()) case schemapb.DataType_FloatVector: res += int(fs.GetVectors().GetDim() * 4) + case schemapb.DataType_SparseFloatVector: + vec := fs.GetVectors().GetSparseFloatVector() + // counting only the size of the vector data, ignoring other + // bytes used in proto. + res += len(vec.Contents[rowOffset]) } } return res, nil @@ -359,13 +370,17 @@ func (helper *SchemaHelper) GetVectorDimFromID(fieldID int64) (int, error) { // IsVectorType returns true if input is a vector type, otherwise false func IsVectorType(dataType schemapb.DataType) bool { switch dataType { - case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: return true default: return false } } +func IsSparseVectorType(dataType schemapb.DataType) bool { + return dataType == schemapb.DataType_SparseFloatVector +} + // IsIntegerType returns true if input is an integer type, otherwise false func IsIntegerType(dataType schemapb.DataType) bool { switch dataType { @@ -516,6 +531,15 @@ func PrepareResultFieldData(sample []*schemapb.FieldData, topK int64) []*schemap vectors.Vectors.Data = &schemapb.VectorField_BinaryVector{ BinaryVector: make([]byte, 0, topK*dim/8), } + case *schemapb.VectorField_SparseFloatVector: + vectors.Vectors.Data =
&schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + // dim to be updated when appending data. + Dim: 0, + Contents: make([][]byte, 0, topK), + }, + } + vectors.Vectors.Dim = 0 } fd.Field = vectors } @@ -525,7 +549,7 @@ func PrepareResultFieldData(sample []*schemapb.FieldData, topK int64) []*schemap } // AppendFieldData appends fields data of specified index from src to dst -func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx int64) (appendSize int64) { +func AppendFieldData(dst, src []*schemapb.FieldData, idx int64) (appendSize int64) { for i, fieldData := range src { switch fieldType := fieldData.Field.(type) { case *schemapb.FieldData_Scalars: @@ -711,6 +735,18 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i } /* #nosec G103 */ appendSize += int64(unsafe.Sizeof(srcVector.Bfloat16Vector[idx*(dim*2) : (idx+1)*(dim*2)])) + case *schemapb.VectorField_SparseFloatVector: + if dstVector.GetSparseFloatVector() == nil { + dstVector.Data = &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 0, + Contents: make([][]byte, 0), + }, + } + dstVector.Dim = srcVector.SparseFloatVector.Dim + } + vec := dstVector.Data.(*schemapb.VectorField_SparseFloatVector).SparseFloatVector + appendSize += appendSparseFloatArraySingleRow(vec, srcVector.SparseFloatVector, idx) default: log.Error("Not supported field type", zap.String("field type", fieldData.Type.String())) } @@ -767,6 +803,8 @@ func DeleteFieldData(dst []*schemapb.FieldData) { case *schemapb.VectorField_Bfloat16Vector: dstBfloat16Vector := dstVector.Data.(*schemapb.VectorField_Bfloat16Vector) dstBfloat16Vector.Bfloat16Vector = dstBfloat16Vector.Bfloat16Vector[:len(dstBfloat16Vector.Bfloat16Vector)-int(dim*2)] + case *schemapb.VectorField_SparseFloatVector: + trimSparseFloatArray(dstVector.GetSparseFloatVector()) default: log.Error("wrong field type added", zap.String("field type", 
fieldData.Type.String())) } @@ -929,6 +967,14 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error } else { dstVector.GetFloatVector().Data = append(dstVector.GetFloatVector().Data, srcVector.FloatVector.Data...) } + case *schemapb.VectorField_SparseFloatVector: + if dstVector.GetSparseFloatVector() == nil { + dstVector.Data = &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: srcVector.SparseFloatVector, + } + } else { + appendSparseFloatArray(dstVector.GetSparseFloatVector(), srcVector.SparseFloatVector) + } default: log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String())) return errors.New("unsupported data type: " + srcFieldData.Type.String()) @@ -1166,6 +1212,8 @@ func GetData(field *schemapb.FieldData, idx int) interface{} { dim := int(field.GetVectors().GetDim()) dataBytes := dim * 2 return field.GetVectors().GetBfloat16Vector()[idx*dataBytes : (idx+1)*dataBytes] + case schemapb.DataType_SparseFloatVector: + return field.GetVectors().GetSparseFloatVector().Contents[idx] } return nil } @@ -1325,3 +1373,81 @@ func AppendGroupByValue(dstResData *schemapb.SearchResultData, } return nil } + +func appendSparseFloatArray(dst, src *schemapb.SparseFloatArray) { + if len(src.Contents) == 0 { + return + } + if dst.Dim < src.Dim { + dst.Dim = src.Dim + } + dst.Contents = append(dst.Contents, src.Contents...) 
+} + +// return the size of indices and values of the appended row +func appendSparseFloatArraySingleRow(dst, src *schemapb.SparseFloatArray, idx int64) int64 { + row := src.Contents[idx] + dst.Contents = append(dst.Contents, row) + rowDim := SparseFloatRowDim(row) + if rowDim == 0 { + return 0 + } + if dst.Dim < rowDim { + dst.Dim = rowDim + } + return int64(len(row)) +} + +func trimSparseFloatArray(vec *schemapb.SparseFloatArray) { + if len(vec.Contents) == 0 { + return + } + // not decreasing dim of the entire SparseFloatArray, as we don't want to + // iterate through the entire array to find the new max dim. Correctness + // will not be affected. + vec.Contents = vec.Contents[:len(vec.Contents)-1] +} + +func ValidateSparseFloatRows(rows ...[]byte) error { + for _, row := range rows { + if len(row) == 0 { + return errors.New("empty sparse float vector row") + } + if len(row)%8 != 0 { + return fmt.Errorf("invalid data length in sparse float vector: %d", len(row)) + } + for i := 0; i < SparseFloatRowElementCount(row); i++ { + if i > 0 && SparseFloatRowIndexAt(row, i) < SparseFloatRowIndexAt(row, i-1) { + return errors.New("unsorted indices in sparse float vector") + } + VerifyFloat(float64(SparseFloatRowValueAt(row, i))) + } + } + return nil +} + +// SparseFloatRowUtils +func SparseFloatRowElementCount(row []byte) int { + if row == nil { + return 0 + } + return len(row) / 8 +} + +// does not check for out-of-range access +func SparseFloatRowIndexAt(row []byte, idx int) uint32 { + return common.Endian.Uint32(row[idx*8:]) +} + +// does not check for out-of-range access +func SparseFloatRowValueAt(row []byte, idx int) float32 { + return math.Float32frombits(common.Endian.Uint32(row[idx*8+4:])) +} + +// dim of a sparse float vector is the maximum/last index + 1 +func SparseFloatRowDim(row []byte) int64 { + if len(row) == 0 { + return 0 + } + return int64(SparseFloatRowIndexAt(row, SparseFloatRowElementCount(row)-1)) + 1 +} diff --git a/pkg/util/typeutil/schema_test.go 
b/pkg/util/typeutil/schema_test.go index ade19afbfc8d9..3835fa9317480 100644 --- a/pkg/util/typeutil/schema_test.go +++ b/pkg/util/typeutil/schema_test.go @@ -21,6 +21,7 @@ import ( "reflect" "testing" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -30,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestSchema(t *testing.T) { @@ -162,6 +164,7 @@ func TestSchema(t *testing.T) { }, }, }, + // Do not test on sparse float vector field. }, } @@ -221,6 +224,7 @@ func TestSchema(t *testing.T) { assert.True(t, IsVectorType(schemapb.DataType_FloatVector)) assert.True(t, IsVectorType(schemapb.DataType_Float16Vector)) assert.True(t, IsVectorType(schemapb.DataType_BFloat16Vector)) + assert.True(t, IsVectorType(schemapb.DataType_SparseFloatVector)) assert.False(t, IsIntegerType(schemapb.DataType_Bool)) assert.True(t, IsIntegerType(schemapb.DataType_Int8)) @@ -234,6 +238,7 @@ func TestSchema(t *testing.T) { assert.False(t, IsIntegerType(schemapb.DataType_FloatVector)) assert.False(t, IsIntegerType(schemapb.DataType_Float16Vector)) assert.False(t, IsIntegerType(schemapb.DataType_BFloat16Vector)) + assert.False(t, IsIntegerType(schemapb.DataType_SparseFloatVector)) assert.False(t, IsFloatingType(schemapb.DataType_Bool)) assert.False(t, IsFloatingType(schemapb.DataType_Int8)) @@ -247,6 +252,21 @@ func TestSchema(t *testing.T) { assert.False(t, IsFloatingType(schemapb.DataType_FloatVector)) assert.False(t, IsFloatingType(schemapb.DataType_Float16Vector)) assert.False(t, IsFloatingType(schemapb.DataType_BFloat16Vector)) + assert.False(t, IsFloatingType(schemapb.DataType_SparseFloatVector)) + + assert.False(t, IsSparseVectorType(schemapb.DataType_Bool)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Int8)) + 
assert.False(t, IsSparseVectorType(schemapb.DataType_Int16)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Int32)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Int64)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Float)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Double)) + assert.False(t, IsSparseVectorType(schemapb.DataType_String)) + assert.False(t, IsSparseVectorType(schemapb.DataType_BinaryVector)) + assert.False(t, IsSparseVectorType(schemapb.DataType_FloatVector)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Float16Vector)) + assert.False(t, IsSparseVectorType(schemapb.DataType_BFloat16Vector)) + assert.True(t, IsSparseVectorType(schemapb.DataType_SparseFloatVector)) }) } @@ -285,6 +305,35 @@ func TestSchema_GetVectorFieldSchema(t *testing.T) { assert.Equal(t, "field_float_vector", fieldSchema[0].Name) }) + schemaSparse := &schemapb.CollectionSchema{ + Name: "testColl", + Description: "", + AutoID: false, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "field_int64", + IsPrimaryKey: true, + Description: "", + DataType: 5, + }, + { + FieldID: 107, + Name: "field_sparse_float_vector", + IsPrimaryKey: false, + Description: "", + DataType: 104, + TypeParams: []*commonpb.KeyValuePair{}, + }, + }, + } + + t.Run("GetSparseFloatVectorFieldSchema", func(t *testing.T) { + fieldSchema := GetVectorFieldSchemas(schemaSparse) + assert.Equal(t, 1, len(fieldSchema)) + assert.Equal(t, "field_sparse_float_vector", fieldSchema[0].Name) + }) + schemaInvalid := &schemapb.CollectionSchema{ Name: "testColl", Description: "", @@ -655,6 +704,23 @@ func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType, }, FieldId: fieldID, } + case schemapb.DataType_SparseFloatVector: + fieldData = &schemapb.FieldData{ + Type: schemapb.DataType_SparseFloatVector, + FieldName: fieldName, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: dim, + Data: 
&schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: dim, + Contents: [][]byte{fieldValue.([]byte)}, + }, + }, + }, + }, + FieldId: fieldID, + } case schemapb.DataType_Array: fieldData = &schemapb.FieldData{ Type: schemapb.DataType_Array, @@ -696,27 +762,29 @@ func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType, func TestAppendFieldData(t *testing.T) { const ( - Dim = 8 - BoolFieldName = "BoolField" - Int32FieldName = "Int32Field" - Int64FieldName = "Int64Field" - FloatFieldName = "FloatField" - DoubleFieldName = "DoubleField" - BinaryVectorFieldName = "BinaryVectorField" - FloatVectorFieldName = "FloatVectorField" - Float16VectorFieldName = "Float16VectorField" - BFloat16VectorFieldName = "BFloat16VectorField" - ArrayFieldName = "ArrayField" - BoolFieldID = common.StartOfUserFieldID + 1 - Int32FieldID = common.StartOfUserFieldID + 2 - Int64FieldID = common.StartOfUserFieldID + 3 - FloatFieldID = common.StartOfUserFieldID + 4 - DoubleFieldID = common.StartOfUserFieldID + 5 - BinaryVectorFieldID = common.StartOfUserFieldID + 6 - FloatVectorFieldID = common.StartOfUserFieldID + 7 - Float16VectorFieldID = common.StartOfUserFieldID + 8 - BFloat16VectorFieldID = common.StartOfUserFieldID + 9 - ArrayFieldID = common.StartOfUserFieldID + 10 + Dim = 8 + BoolFieldName = "BoolField" + Int32FieldName = "Int32Field" + Int64FieldName = "Int64Field" + FloatFieldName = "FloatField" + DoubleFieldName = "DoubleField" + BinaryVectorFieldName = "BinaryVectorField" + FloatVectorFieldName = "FloatVectorField" + Float16VectorFieldName = "Float16VectorField" + BFloat16VectorFieldName = "BFloat16VectorField" + ArrayFieldName = "ArrayField" + SparseFloatVectorFieldName = "SparseFloatVectorField" + BoolFieldID = common.StartOfUserFieldID + 1 + Int32FieldID = common.StartOfUserFieldID + 2 + Int64FieldID = common.StartOfUserFieldID + 3 + FloatFieldID = common.StartOfUserFieldID + 4 + DoubleFieldID = 
common.StartOfUserFieldID + 5 + BinaryVectorFieldID = common.StartOfUserFieldID + 6 + FloatVectorFieldID = common.StartOfUserFieldID + 7 + Float16VectorFieldID = common.StartOfUserFieldID + 8 + BFloat16VectorFieldID = common.StartOfUserFieldID + 9 + ArrayFieldID = common.StartOfUserFieldID + 10 + SparseFloatVectorFieldID = common.StartOfUserFieldID + 11 ) BoolArray := []bool{true, false} Int32Array := []int32{1, 2} @@ -749,8 +817,15 @@ func TestAppendFieldData(t *testing.T) { }, }, } + SparseFloatVector := &schemapb.SparseFloatArray{ + Dim: 231, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + }, + } - result := make([]*schemapb.FieldData, 10) + result := make([]*schemapb.FieldData, 11) var fieldDataArray1 []*schemapb.FieldData fieldDataArray1 = append(fieldDataArray1, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[0:1], 1)) fieldDataArray1 = append(fieldDataArray1, genFieldData(Int32FieldName, Int32FieldID, schemapb.DataType_Int32, Int32Array[0:1], 1)) @@ -762,6 +837,7 @@ func TestAppendFieldData(t *testing.T) { fieldDataArray1 = append(fieldDataArray1, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[0:Dim*2], Dim)) fieldDataArray1 = append(fieldDataArray1, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[0:Dim*2], Dim)) fieldDataArray1 = append(fieldDataArray1, genFieldData(ArrayFieldName, ArrayFieldID, schemapb.DataType_Array, ArrayArray[0:1], 1)) + fieldDataArray1 = append(fieldDataArray1, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[0], SparseFloatVector.Dim)) var fieldDataArray2 []*schemapb.FieldData fieldDataArray2 = append(fieldDataArray2, genFieldData(BoolFieldName, BoolFieldID, 
schemapb.DataType_Bool, BoolArray[1:2], 1)) @@ -774,6 +850,7 @@ func TestAppendFieldData(t *testing.T) { fieldDataArray2 = append(fieldDataArray2, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[2*Dim:4*Dim], Dim)) fieldDataArray2 = append(fieldDataArray2, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[2*Dim:4*Dim], Dim)) fieldDataArray2 = append(fieldDataArray2, genFieldData(ArrayFieldName, ArrayFieldID, schemapb.DataType_Array, ArrayArray[1:2], 1)) + fieldDataArray2 = append(fieldDataArray2, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[1], SparseFloatVector.Dim)) AppendFieldData(result, fieldDataArray1, 0) AppendFieldData(result, fieldDataArray2, 0) @@ -788,21 +865,23 @@ func TestAppendFieldData(t *testing.T) { assert.Equal(t, Float16Vector, result[7].GetVectors().Data.(*schemapb.VectorField_Float16Vector).Float16Vector) assert.Equal(t, BFloat16Vector, result[8].GetVectors().Data.(*schemapb.VectorField_Bfloat16Vector).Bfloat16Vector) assert.Equal(t, ArrayArray, result[9].GetScalars().GetArrayData().Data) + assert.Equal(t, SparseFloatVector, result[10].GetVectors().GetSparseFloatVector()) } func TestDeleteFieldData(t *testing.T) { const ( - Dim = 8 - BoolFieldName = "BoolField" - Int32FieldName = "Int32Field" - Int64FieldName = "Int64Field" - FloatFieldName = "FloatField" - DoubleFieldName = "DoubleField" - JSONFieldName = "JSONField" - BinaryVectorFieldName = "BinaryVectorField" - FloatVectorFieldName = "FloatVectorField" - Float16VectorFieldName = "Float16VectorField" - BFloat16VectorFieldName = "BFloat16VectorField" + Dim = 8 + BoolFieldName = "BoolField" + Int32FieldName = "Int32Field" + Int64FieldName = "Int64Field" + FloatFieldName = "FloatField" + DoubleFieldName = "DoubleField" + JSONFieldName = "JSONField" + BinaryVectorFieldName = "BinaryVectorField" 
+ FloatVectorFieldName = "FloatVectorField" + Float16VectorFieldName = "Float16VectorField" + BFloat16VectorFieldName = "BFloat16VectorField" + SparseFloatVectorFieldName = "SparseFloatVectorField" ) const ( @@ -816,6 +895,7 @@ func TestDeleteFieldData(t *testing.T) { FloatVectorFieldID Float16VectorFieldID BFloat16VectorFieldID + SparseFloatVectorFieldID ) BoolArray := []bool{true, false} Int32Array := []int32{1, 2} @@ -833,9 +913,16 @@ func TestDeleteFieldData(t *testing.T) { 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, } + SparseFloatVector := &schemapb.SparseFloatArray{ + Dim: 231, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + }, + } - result1 := make([]*schemapb.FieldData, 10) - result2 := make([]*schemapb.FieldData, 10) + result1 := make([]*schemapb.FieldData, 11) + result2 := make([]*schemapb.FieldData, 11) var fieldDataArray1 []*schemapb.FieldData fieldDataArray1 = append(fieldDataArray1, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[0:1], 1)) fieldDataArray1 = append(fieldDataArray1, genFieldData(Int32FieldName, Int32FieldID, schemapb.DataType_Int32, Int32Array[0:1], 1)) @@ -847,6 +934,7 @@ func TestDeleteFieldData(t *testing.T) { fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:Dim], Dim)) fieldDataArray1 = append(fieldDataArray1, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[0:2*Dim], Dim)) fieldDataArray1 = append(fieldDataArray1, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[0:2*Dim], Dim)) + fieldDataArray1 = 
append(fieldDataArray1, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[0], SparseFloatVector.Dim)) var fieldDataArray2 []*schemapb.FieldData fieldDataArray2 = append(fieldDataArray2, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[1:2], 1)) @@ -859,6 +947,7 @@ func TestDeleteFieldData(t *testing.T) { fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[Dim:2*Dim], Dim)) fieldDataArray2 = append(fieldDataArray2, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[2*Dim:4*Dim], Dim)) fieldDataArray2 = append(fieldDataArray2, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[2*Dim:4*Dim], Dim)) + fieldDataArray2 = append(fieldDataArray2, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[1], SparseFloatVector.Dim)) AppendFieldData(result1, fieldDataArray1, 0) AppendFieldData(result1, fieldDataArray2, 0) @@ -873,6 +962,9 @@ func TestDeleteFieldData(t *testing.T) { assert.Equal(t, FloatVector[0:Dim], result1[FloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetFloatVector().Data) assert.Equal(t, Float16Vector[0:2*Dim], result1[Float16VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Float16Vector).Float16Vector) assert.Equal(t, BFloat16Vector[0:2*Dim], result1[BFloat16VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Bfloat16Vector).Bfloat16Vector) + tmpSparseFloatVector := proto.Clone(SparseFloatVector).(*schemapb.SparseFloatArray) + tmpSparseFloatVector.Contents = [][]byte{SparseFloatVector.Contents[0]} + assert.Equal(t, tmpSparseFloatVector, 
result1[SparseFloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetSparseFloatVector()) AppendFieldData(result2, fieldDataArray2, 0) AppendFieldData(result2, fieldDataArray1, 0) @@ -887,6 +979,9 @@ func TestDeleteFieldData(t *testing.T) { assert.Equal(t, FloatVector[Dim:2*Dim], result2[FloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetFloatVector().Data) assert.Equal(t, Float16Vector[2*Dim:4*Dim], result2[Float16VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Float16Vector).Float16Vector) assert.Equal(t, BFloat16Vector[2*Dim:4*Dim], result2[BFloat16VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Bfloat16Vector).Bfloat16Vector) + tmpSparseFloatVector = proto.Clone(SparseFloatVector).(*schemapb.SparseFloatArray) + tmpSparseFloatVector.Contents = [][]byte{SparseFloatVector.Contents[1]} + assert.Equal(t, tmpSparseFloatVector, result2[SparseFloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetSparseFloatVector()) } func TestGetPrimaryFieldSchema(t *testing.T) { @@ -1234,6 +1329,13 @@ func TestGetDataAndGetDataSize(t *testing.T) { 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, } + SparseFloatVector := &schemapb.SparseFloatArray{ + Dim: 231, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + }, + } boolData := genFieldData(fieldName, fieldID, schemapb.DataType_Bool, BoolArray, 1) int8Data := genFieldData(fieldName, fieldID, schemapb.DataType_Int8, Int8Array, 1) @@ -1247,6 +1349,7 @@ func TestGetDataAndGetDataSize(t *testing.T) { floatVecData := genFieldData(fieldName, fieldID, schemapb.DataType_FloatVector, FloatVector, Dim) float16VecData := genFieldData(fieldName, fieldID, 
schemapb.DataType_Float16Vector, Float16Vector, Dim) bfloat16VecData := genFieldData(fieldName, fieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector, Dim) + sparseFloatData := genFieldData(fieldName, fieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[0], SparseFloatVector.Dim) invalidData := &schemapb.FieldData{ Type: schemapb.DataType_None, } @@ -1272,6 +1375,7 @@ func TestGetDataAndGetDataSize(t *testing.T) { floatVecDataRes := GetData(floatVecData, 0) float16VecDataRes := GetData(float16VecData, 0) bfloat16VecDataRes := GetData(bfloat16VecData, 0) + sparseFloatDataRes := GetData(sparseFloatData, 0) invalidDataRes := GetData(invalidData, 0) assert.Equal(t, BoolArray[0], boolDataRes) @@ -1286,11 +1390,23 @@ func TestGetDataAndGetDataSize(t *testing.T) { assert.ElementsMatch(t, FloatVector[:Dim], floatVecDataRes) assert.ElementsMatch(t, Float16Vector[:2*Dim], float16VecDataRes) assert.ElementsMatch(t, BFloat16Vector[:2*Dim], bfloat16VecDataRes) + assert.Equal(t, SparseFloatVector.Contents[0], sparseFloatDataRes) assert.Nil(t, invalidDataRes) }) } func TestMergeFieldData(t *testing.T) { + sparseFloatRows := [][]byte{ + // 3 rows for dst + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{300, 410, 520}, []float32{1.1, 1.2, 1.3}), + // 3 rows for src + testutils.CreateSparseFloatRow([]uint32{600, 800, 2300}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{90, 141, 352}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{160, 280, 340}, []float32{2.1, 2.2, 2.3}), + } + t.Run("merge data", func(t *testing.T) { dstFields := []*schemapb.FieldData{ genFieldData("int64", 100, schemapb.DataType_Int64, []int64{1, 2, 3}, 1), @@ -1329,6 +1445,22 @@ func TestMergeFieldData(t *testing.T) { }, FieldId: 105, }, + { + Type: schemapb.DataType_SparseFloatVector, + 
FieldName: "sparseFloat", + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 521, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 521, + Contents: sparseFloatRows[:3], + }, + }, + }, + }, + FieldId: 106, + }, } srcFields := []*schemapb.FieldData{ @@ -1372,6 +1504,22 @@ func TestMergeFieldData(t *testing.T) { }, FieldId: 105, }, + { + Type: schemapb.DataType_SparseFloatVector, + FieldName: "sparseFloat", + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 2301, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 2301, + Contents: sparseFloatRows[3:], + }, + }, + }, + }, + FieldId: 106, + }, } err := MergeFieldData(dstFields, srcFields) @@ -1400,6 +1548,10 @@ func TestMergeFieldData(t *testing.T) { dstFields[3].GetScalars().GetArrayData().Data) assert.Equal(t, [][]byte{[]byte("hoo"), []byte("foo")}, dstFields[4].GetScalars().GetBytesData().Data) assert.Equal(t, [][]byte{[]byte("hello"), []byte("world"), []byte("hoo")}, dstFields[5].GetScalars().GetBytesData().Data) + assert.Equal(t, &schemapb.SparseFloatArray{ + Dim: 2301, + Contents: sparseFloatRows, + }, dstFields[6].GetVectors().GetSparseFloatVector()) }) t.Run("merge with nil", func(t *testing.T) { @@ -1416,6 +1568,22 @@ func TestMergeFieldData(t *testing.T) { }, }, }, 1), + { + Type: schemapb.DataType_SparseFloatVector, + FieldName: "sparseFloat", + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 521, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 521, + Contents: sparseFloatRows[:3], + }, + }, + }, + }, + FieldId: 104, + }, } dstFields := []*schemapb.FieldData{ @@ -1423,6 +1591,7 @@ func TestMergeFieldData(t *testing.T) { {Type: schemapb.DataType_FloatVector, FieldName: "vector", Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Data: 
&schemapb.VectorField_FloatVector{}}}, FieldId: 101}, {Type: schemapb.DataType_JSON, FieldName: "json", Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_JsonData{}}}, FieldId: 102}, {Type: schemapb.DataType_Array, FieldName: "array", Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_ArrayData{}}}, FieldId: 103}, + {Type: schemapb.DataType_SparseFloatVector, FieldName: "sparseFloat", Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Data: &schemapb.VectorField_SparseFloatVector{}}}, FieldId: 104}, } err := MergeFieldData(dstFields, srcFields) @@ -1442,6 +1611,10 @@ func TestMergeFieldData(t *testing.T) { }, }, dstFields[3].GetScalars().GetArrayData().Data) + assert.Equal(t, &schemapb.SparseFloatArray{ + Dim: 521, + Contents: sparseFloatRows[:3], + }, dstFields[4].GetVectors().GetSparseFloatVector()) }) t.Run("error case", func(t *testing.T) { @@ -1755,8 +1928,80 @@ func (s *FieldDataSuite) TestPrepareFieldData() { s.EqualValues(128, field.GetVectors().GetDim()) s.EqualValues(topK*128/8, cap(field.GetVectors().GetBinaryVector())) }) + + s.Run("sparse_float_vector", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_SparseFloatVector, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 128, + Data: &schemapb.VectorField_SparseFloatVector{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_SparseFloatVector, field.GetType()) + + s.EqualValues(0, field.GetVectors().GetDim()) + s.EqualValues(topK, cap(field.GetVectors().GetSparseFloatVector().GetContents())) + }) } func TestFieldData(t *testing.T) { suite.Run(t, new(FieldDataSuite)) } + +func TestValidateSparseFloatRows(t *testing.T) { + 
t.Run("valid rows", func(t *testing.T) { + rows := [][]byte{ + testutils.CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), + testutils.CreateSparseFloatRow([]uint32{2, 4, 6}, []float32{4.0, 5.0, 6.0}), + testutils.CreateSparseFloatRow([]uint32{0, 7, 8}, []float32{7.0, 8.0, 9.0}), + } + err := ValidateSparseFloatRows(rows...) + assert.NoError(t, err) + }) + + t.Run("nil row", func(t *testing.T) { + err := ValidateSparseFloatRows(nil) + assert.Error(t, err) + }) + + t.Run("incorrect lengths", func(t *testing.T) { + rows := [][]byte{ + make([]byte, 10), + } + err := ValidateSparseFloatRows(rows...) + assert.Error(t, err) + }) + + t.Run("unordered index", func(t *testing.T) { + rows := [][]byte{ + testutils.CreateSparseFloatRow([]uint32{100, 2000, 500}, []float32{1.0, 2.0, 3.0}), + } + err := ValidateSparseFloatRows(rows...) + assert.Error(t, err) + }) + + t.Run("empty indices or values", func(t *testing.T) { + rows := [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + } + err := ValidateSparseFloatRows(rows...) + assert.Error(t, err) + }) + + t.Run("no rows", func(t *testing.T) { + err := ValidateSparseFloatRows() + assert.NoError(t, err) + }) +}