diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 1a26f3d1cea37..383df36d3e096 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -547,7 +547,7 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { segments := t.getCandidateSegments(channel, partitionID) if len(segments) == 0 { - log.Info("the length of segments is 0, skip to handle compaction") + log.Info("the number of candidate segments is 0, skip to handle compaction") return } diff --git a/internal/datanode/compactor.go b/internal/datanode/compactor.go index ab8dae57af398..787870d71cf9a 100644 --- a/internal/datanode/compactor.go +++ b/internal/datanode/compactor.go @@ -796,6 +796,15 @@ func interface2FieldData(schemaDataType schemapb.DataType, content []interface{} data.Dim = len(data.Data) * 8 / int(numRows) rst = data + case schemapb.DataType_SparseFloatVector: + data := &storage.SparseFloatVectorFieldData{} + for _, c := range content { + if err := data.AppendRow(c); err != nil { + return nil, fmt.Errorf("failed to append row: %v, %w", err, errTransferType) + } + } + rst = data + default: return nil, errUnknownDataType } diff --git a/internal/datanode/compactor_test.go b/internal/datanode/compactor_test.go index cd67f4af96592..8d3b4305d7c59 100644 --- a/internal/datanode/compactor_test.go +++ b/internal/datanode/compactor_test.go @@ -43,6 +43,7 @@ import ( "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/testutils" "github.com/milvus-io/milvus/pkg/util/timerecord" ) @@ -105,6 +106,13 @@ func TestCompactionTaskInnerMethods(t *testing.T) { {false, schemapb.DataType_BinaryVector, []interface{}{nil, nil}, "invalid binaryvector"}, {false, schemapb.DataType_Float16Vector, []interface{}{nil, nil}, "invalid float16vector"}, {false, schemapb.DataType_BFloat16Vector, 
[]interface{}{nil, nil}, "invalid bfloat16vector"}, + + {false, schemapb.DataType_SparseFloatVector, []interface{}{nil, nil}, "invalid sparsefloatvector"}, + {false, schemapb.DataType_SparseFloatVector, []interface{}{[]byte{255}, []byte{15}}, "invalid sparsefloatvector"}, + {true, schemapb.DataType_SparseFloatVector, []interface{}{ + testutils.CreateSparseFloatRow([]uint32{1, 2}, []float32{1.0, 2.0}), + testutils.CreateSparseFloatRow([]uint32{3, 4}, []float32{1.0, 2.0}), + }, "valid sparsefloatvector"}, } // make sure all new data types missed to handle would throw unexpected error diff --git a/internal/indexnode/util.go b/internal/indexnode/util.go index 9387e6fb9d46b..47aef494e3870 100644 --- a/internal/indexnode/util.go +++ b/internal/indexnode/util.go @@ -19,6 +19,8 @@ package indexnode import ( "unsafe" + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" ) @@ -37,5 +39,8 @@ func estimateFieldDataSize(dim int64, numRows int64, dataType schemapb.DataType) if dataType == schemapb.DataType_BFloat16Vector { return uint64(dim) * uint64(numRows) * 2, nil } + if dataType == schemapb.DataType_SparseFloatVector { + return 0, errors.New("could not estimate field data size of SparseFloatVector") + } return 0, nil } diff --git a/internal/parser/planparserv2/plan_parser_v2.go b/internal/parser/planparserv2/plan_parser_v2.go index 29ed3d0d1fd8a..c1bc6915f4d75 100644 --- a/internal/parser/planparserv2/plan_parser_v2.go +++ b/internal/parser/planparserv2/plan_parser_v2.go @@ -137,6 +137,8 @@ func CreateSearchPlan(schema *typeutil.SchemaHelper, exprStr string, vectorField vectorType = planpb.VectorType_Float16Vector } else if dataType == schemapb.DataType_BFloat16Vector { vectorType = planpb.VectorType_BFloat16Vector + } else if dataType == schemapb.DataType_SparseFloatVector { + vectorType = planpb.VectorType_SparseFloatVector } planNode := &planpb.PlanNode{ Node: &planpb.PlanNode_VectorAnns{ diff --git 
a/internal/parser/planparserv2/plan_parser_v2_test.go b/internal/parser/planparserv2/plan_parser_v2_test.go index 20fde6d05badc..a12320f37ad53 100644 --- a/internal/parser/planparserv2/plan_parser_v2_test.go +++ b/internal/parser/planparserv2/plan_parser_v2_test.go @@ -428,6 +428,17 @@ func TestCreateBFloat16earchPlan(t *testing.T) { assert.NoError(t, err) } +func TestCreateSparseFloatVectorSearchPlan(t *testing.T) { + schema := newTestSchemaHelper(t) + _, err := CreateSearchPlan(schema, `$meta["A"] != 10`, "SparseFloatVectorField", &planpb.QueryInfo{ + Topk: 0, + MetricType: "", + SearchParams: "", + RoundDecimal: 0, + }) + assert.NoError(t, err) +} + func TestExpr_Invalid(t *testing.T) { schema := newTestSchema() helper, err := typeutil.CreateSchemaHelper(schema) diff --git a/internal/proxy/task.go b/internal/proxy/task.go index ee6feec8e51e2..84c735dc42c8a 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -322,7 +322,7 @@ func (t *createCollectionTask) PreExecute(ctx context.Context) error { if err := validateFieldName(field.Name); err != nil { return err } - // validate vector field type parameters + // validate dense vector field type parameters if isVectorType(field.DataType) { err = validateDimension(field) if err != nil { diff --git a/internal/proxy/task_index.go b/internal/proxy/task_index.go index 3356979188612..3eb1f756c6b61 100644 --- a/internal/proxy/task_index.go +++ b/internal/proxy/task_index.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/indexparams" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -174,9 +175,7 @@ func (cit *createIndexTask) parseIndexParams() error { fmt.Sprintf("create index on %s field", cit.fieldSchema.DataType.String()), fmt.Sprintf("create index on %s field is not supported", 
cit.fieldSchema.DataType.String())) } - } - - if isVecIndex { + } else { specifyIndexType, exist := indexParamsMap[common.IndexTypeKey] if Params.AutoIndexConfig.Enable.GetAsBool() { // `enable` only for cloud instance. log.Info("create index trigger AutoIndex", @@ -258,6 +257,12 @@ func (cit *createIndexTask) parseIndexParams() error { return err } } + if indexType == indexparamcheck.IndexSparseInverted || indexType == indexparamcheck.IndexSparseWand { + metricType, metricTypeExist := indexParamsMap[common.MetricTypeKey] + if !metricTypeExist || metricType != metric.IP { + return fmt.Errorf("only IP is the supported metric type for sparse index") + } + } err := checkTrain(cit.fieldSchema, indexParamsMap) if err != nil { @@ -309,13 +314,7 @@ func (cit *createIndexTask) getIndexedField(ctx context.Context) (*schemapb.Fiel } func fillDimension(field *schemapb.FieldSchema, indexParams map[string]string) error { - vecDataTypes := []schemapb.DataType{ - schemapb.DataType_FloatVector, - schemapb.DataType_BinaryVector, - schemapb.DataType_Float16Vector, - schemapb.DataType_BFloat16Vector, - } - if !funcutil.SliceContain(vecDataTypes, field.GetDataType()) { + if !isVectorType(field.GetDataType()) { return nil } params := make([]*commonpb.KeyValuePair, 0, len(field.GetTypeParams())+len(field.GetIndexParams())) @@ -338,14 +337,7 @@ func fillDimension(field *schemapb.FieldSchema, indexParams map[string]string) e func checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) error { indexType := indexParams[common.IndexTypeKey] - // skip params check of non-vector field. 
- vecDataTypes := []schemapb.DataType{ - schemapb.DataType_FloatVector, - schemapb.DataType_BinaryVector, - schemapb.DataType_Float16Vector, - schemapb.DataType_BFloat16Vector, - } - if !funcutil.SliceContain(vecDataTypes, field.GetDataType()) { + if !isVectorType(field.GetDataType()) { return indexparamcheck.CheckIndexValid(field.GetDataType(), indexType, indexParams) } @@ -355,8 +347,10 @@ func checkTrain(field *schemapb.FieldSchema, indexParams map[string]string) erro return fmt.Errorf("invalid index type: %s", indexType) } - if err := fillDimension(field, indexParams); err != nil { - return err + if !isSparseVectorType(field.DataType) { + if err := fillDimension(field, indexParams); err != nil { + return err + } } if err := checker.CheckValidDataType(field.GetDataType()); err != nil { diff --git a/internal/proxy/task_index_test.go b/internal/proxy/task_index_test.go index d7989fe274b11..e365ca88d43fd 100644 --- a/internal/proxy/task_index_test.go +++ b/internal/proxy/task_index_test.go @@ -272,6 +272,76 @@ func TestCreateIndexTask_PreExecute(t *testing.T) { }) } +func Test_sparse_parseIndexParams(t *testing.T) { + cit := &createIndexTask{ + Condition: nil, + req: &milvuspb.CreateIndexRequest{ + Base: nil, + DbName: "", + CollectionName: "", + FieldName: "", + ExtraParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "SPARSE_INVERTED_INDEX", + }, + { + Key: MetricTypeKey, + Value: "IP", + }, + { + Key: common.IndexParamsKey, + Value: "{\"drop_ratio_build\": 0.3}", + }, + }, + IndexName: "", + }, + ctx: nil, + rootCoord: nil, + result: nil, + isAutoIndex: false, + newIndexParams: nil, + newTypeParams: nil, + collectionID: 0, + fieldSchema: &schemapb.FieldSchema{ + FieldID: 101, + Name: "FieldID", + IsPrimaryKey: false, + Description: "field no.1", + DataType: schemapb.DataType_SparseFloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: MetricTypeKey, + Value: "IP", + }, + }, + }, + } + + t.Run("parse index params", func(t 
*testing.T) { + err := cit.parseIndexParams() + assert.NoError(t, err) + + assert.ElementsMatch(t, + []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "SPARSE_INVERTED_INDEX", + }, + { + Key: MetricTypeKey, + Value: "IP", + }, + { + Key: "drop_ratio_build", + Value: "0.3", + }, + }, cit.newIndexParams) + assert.ElementsMatch(t, + []*commonpb.KeyValuePair{}, cit.newTypeParams) + }) +} + func Test_parseIndexParams(t *testing.T) { cit := &createIndexTask{ Condition: nil, diff --git a/internal/proxy/util.go b/internal/proxy/util.go index b9c3ea8cc4892..1aaac133ea839 100644 --- a/internal/proxy/util.go +++ b/internal/proxy/util.go @@ -98,7 +98,12 @@ func isVectorType(dataType schemapb.DataType) bool { return dataType == schemapb.DataType_FloatVector || dataType == schemapb.DataType_BinaryVector || dataType == schemapb.DataType_Float16Vector || - dataType == schemapb.DataType_BFloat16Vector + dataType == schemapb.DataType_BFloat16Vector || + dataType == schemapb.DataType_SparseFloatVector +} + +func isSparseVectorType(dataType schemapb.DataType) bool { + return dataType == schemapb.DataType_SparseFloatVector } func validateMaxQueryResultWindow(offset int64, limit int64) error { @@ -307,6 +312,12 @@ func validateDimension(field *schemapb.FieldSchema) error { break } } + if isSparseVectorType(field.DataType) { + if exist { + return fmt.Errorf("dim should not be specified for sparse vector field %s(%d)", field.Name, field.FieldID) + } + return nil + } if !exist { return errors.New("dimension is not defined in field type params, check type param `dim` for vector field") } @@ -509,7 +520,7 @@ func isVector(dataType schemapb.DataType) (bool, error) { schemapb.DataType_Float, schemapb.DataType_Double: return false, nil - case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, 
schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: return true, nil } @@ -520,7 +531,7 @@ func validateMetricType(dataType schemapb.DataType, metricTypeStrRaw string) err metricTypeStr := strings.ToUpper(metricTypeStrRaw) switch metricTypeStr { case metric.L2, metric.IP, metric.COSINE: - if dataType == schemapb.DataType_FloatVector || dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector { + if dataType == schemapb.DataType_FloatVector || dataType == schemapb.DataType_Float16Vector || dataType == schemapb.DataType_BFloat16Vector || dataType == schemapb.DataType_SparseFloatVector { return nil } case metric.JACCARD, metric.HAMMING, metric.SUBSTRUCTURE, metric.SUPERSTRUCTURE: @@ -581,13 +592,15 @@ func validateSchema(coll *schemapb.CollectionSchema) error { if err2 != nil { return err2 } - dimStr, ok := typeKv[common.DimKey] - if !ok { - return fmt.Errorf("dim not found in type_params for vector field %s(%d)", field.Name, field.FieldID) - } - dim, err := strconv.Atoi(dimStr) - if err != nil || dim < 0 { - return fmt.Errorf("invalid dim; %s", dimStr) + if !isSparseVectorType(field.DataType) { + dimStr, ok := typeKv[common.DimKey] + if !ok { + return fmt.Errorf("dim not found in type_params for vector field %s(%d)", field.Name, field.FieldID) + } + dim, err := strconv.Atoi(dimStr) + if err != nil || dim < 0 { + return fmt.Errorf("invalid dim; %s", dimStr) + } } metricTypeStr, ok := indexKv[common.MetricTypeKey] @@ -624,7 +637,7 @@ func validateMultipleVectorFields(schema *schemapb.CollectionSchema) error { for i := range schema.Fields { name := schema.Fields[i].Name dType := schema.Fields[i].DataType - isVec := dType == schemapb.DataType_BinaryVector || dType == schemapb.DataType_FloatVector || dType == schemapb.DataType_Float16Vector || dType == schemapb.DataType_BFloat16Vector + isVec := dType == schemapb.DataType_BinaryVector || dType == schemapb.DataType_FloatVector || dType == schemapb.DataType_Float16Vector 
|| dType == schemapb.DataType_BFloat16Vector || dType == schemapb.DataType_SparseFloatVector if isVec && vecExist && !enableMultipleVectorFields { return fmt.Errorf( "multiple vector fields is not supported, fields name: %s, %s", diff --git a/internal/proxy/validate_util.go b/internal/proxy/validate_util.go index db8bededaee4a..e75c79cde756c 100644 --- a/internal/proxy/validate_util.go +++ b/internal/proxy/validate_util.go @@ -85,6 +85,10 @@ func (v *validateUtil) Validate(data []*schemapb.FieldData, schema *schemapb.Col if err := v.checkBinaryVectorFieldData(field, fieldSchema); err != nil { return err } + case schemapb.DataType_SparseFloatVector: + if err := v.checkSparseFloatFieldData(field, fieldSchema); err != nil { + return err + } case schemapb.DataType_VarChar: if err := v.checkVarCharFieldData(field, fieldSchema); err != nil { return err @@ -205,6 +209,13 @@ func (v *validateUtil) checkAligned(data []*schemapb.FieldData, schema *typeutil if n != numRows { return errNumRowsMismatch(field.GetFieldName(), n) } + + case schemapb.DataType_SparseFloatVector: + n := uint64(len(field.GetVectors().GetSparseFloatVector().Contents)) + if n != numRows { + return errNumRowsMismatch(field.GetFieldName(), n) + } + default: // error won't happen here. 
n, err := funcutil.GetNumRowOfFieldData(field) @@ -326,6 +337,19 @@ func (v *validateUtil) checkBinaryVectorFieldData(field *schemapb.FieldData, fie return nil } +func (v *validateUtil) checkSparseFloatFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error { + if field.GetVectors() == nil || field.GetVectors().GetSparseFloatVector() == nil { + msg := fmt.Sprintf("sparse float field '%v' is illegal, nil SparseFloatVector", field.GetFieldName()) + return merr.WrapErrParameterInvalid("need sparse float array", "got nil", msg) + } + sparseRows := field.GetVectors().GetSparseFloatVector().GetContents() + if sparseRows == nil { + msg := fmt.Sprintf("sparse float field '%v' is illegal, array type mismatch", field.GetFieldName()) + return merr.WrapErrParameterInvalid("need sparse float array", "got nil", msg) + } + return typeutil.ValidateSparseFloatRows(sparseRows...) +} + func (v *validateUtil) checkVarCharFieldData(field *schemapb.FieldData, fieldSchema *schemapb.FieldSchema) error { strArr := field.GetScalars().GetStringData().GetData() if strArr == nil && fieldSchema.GetDefaultValue() == nil { diff --git a/internal/querynodev2/segments/utils.go b/internal/querynodev2/segments/utils.go index 1a7b1dc54f853..640d1a4fbcfed 100644 --- a/internal/querynodev2/segments/utils.go +++ b/internal/querynodev2/segments/utils.go @@ -93,6 +93,8 @@ func getPKsFromRowBasedInsertMsg(msg *msgstream.InsertMsg, schema *schemapb.Coll break } } + case schemapb.DataType_SparseFloatVector: + return nil, fmt.Errorf("SparseFloatVector not support in row based message") } } @@ -166,6 +168,10 @@ func fillFloatVecFieldData(ctx context.Context, vcm storage.ChunkManager, dataPa return nil } +func fillSparseFloatVecFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error { + return fmt.Errorf("fillSparseFloatVecFieldData not implemented") +} + func fillBoolFieldData(ctx 
context.Context, vcm storage.ChunkManager, dataPath string, fieldData *schemapb.FieldData, i int, offset int64, endian binary.ByteOrder) error { // read whole file. // TODO: optimize here. @@ -274,6 +280,8 @@ func fillFieldData(ctx context.Context, vcm storage.ChunkManager, dataPath strin return fillBinVecFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian) case schemapb.DataType_FloatVector: return fillFloatVecFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian) + case schemapb.DataType_SparseFloatVector: + return fillSparseFloatVecFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian) case schemapb.DataType_Bool: return fillBoolFieldData(ctx, vcm, dataPath, fieldData, i, offset, endian) case schemapb.DataType_String, schemapb.DataType_VarChar: diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 0a38c68854564..78d237f366709 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -157,7 +157,7 @@ func (writer *InsertBinlogWriter) NextInsertEventWriter(dim ...int) (*insertEven var event *insertEventWriter var err error - if typeutil.IsVectorType(writer.PayloadDataType) { + if typeutil.IsVectorType(writer.PayloadDataType) && !typeutil.IsSparseVectorType(writer.PayloadDataType) { if len(dim) != 1 { return nil, fmt.Errorf("incorrect input numbers") } diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go index 63d4a5cb87b55..b44e320c1da4f 100644 --- a/internal/storage/data_codec.go +++ b/internal/storage/data_codec.go @@ -203,7 +203,7 @@ func (insertCodec *InsertCodec) SerializePkStatsByData(data *InsertData) (*Blob, return nil, fmt.Errorf("there is no pk field") } -// Serialize transfer insert data to blob. It will sort insert data by timestamp. +// Serialize transforms insert data to blob. It will sort insert data by timestamp. // From schema, it gets all fields. // For each field, it will create a binlog writer, and write an event to the binlog. 
// It returns binlog buffer in the end. @@ -259,6 +259,8 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique eventWriter, err = writer.NextInsertEventWriter(singleData.(*Float16VectorFieldData).Dim) case schemapb.DataType_BFloat16Vector: eventWriter, err = writer.NextInsertEventWriter(singleData.(*BFloat16VectorFieldData).Dim) + case schemapb.DataType_SparseFloatVector: + eventWriter, err = writer.NextInsertEventWriter() default: return nil, fmt.Errorf("undefined data type %d", field.DataType) } @@ -384,12 +386,15 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Float16VectorFieldData).GetMemorySize())) case schemapb.DataType_BFloat16Vector: err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim) + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BFloat16VectorFieldData).GetMemorySize())) + case schemapb.DataType_SparseFloatVector: + err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData)) if err != nil { eventWriter.Close() writer.Close() return nil, err } - writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BFloat16VectorFieldData).GetMemorySize())) + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*SparseFloatVectorFieldData).GetMemorySize())) default: return nil, fmt.Errorf("undefined data type %d", field.DataType) } @@ -776,6 +781,22 @@ func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int floatVectorFieldData.Dim = dim insertData.Data[fieldID] = floatVectorFieldData + case schemapb.DataType_SparseFloatVector: + sparseData, _, err := eventReader.GetSparseFloatVectorFromPayload() + if err != nil { + eventReader.Close() + binlogReader.Close() + return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err + } + if insertData.Data[fieldID] == nil { + 
insertData.Data[fieldID] = &SparseFloatVectorFieldData{} + } + vec := insertData.Data[fieldID].(*SparseFloatVectorFieldData) + vec.AppendAllRows(sparseData) + + totalLength += sparseData.RowNum() + insertData.Data[fieldID] = vec + default: eventReader.Close() binlogReader.Close() diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go index 8c6468f994627..21b5df30732bc 100644 --- a/internal/storage/data_codec_test.go +++ b/internal/storage/data_codec_test.go @@ -30,28 +30,30 @@ import ( "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/testutils" ) const ( - CollectionID = 1 - PartitionID = 1 - SegmentID = 1 - RowIDField = 0 - TimestampField = 1 - BoolField = 100 - Int8Field = 101 - Int16Field = 102 - Int32Field = 103 - Int64Field = 104 - FloatField = 105 - DoubleField = 106 - StringField = 107 - BinaryVectorField = 108 - FloatVectorField = 109 - ArrayField = 110 - JSONField = 111 - Float16VectorField = 112 - BFloat16VectorField = 113 + CollectionID = 1 + PartitionID = 1 + SegmentID = 1 + RowIDField = 0 + TimestampField = 1 + BoolField = 100 + Int8Field = 101 + Int16Field = 102 + Int32Field = 103 + Int64Field = 104 + FloatField = 105 + DoubleField = 106 + StringField = 107 + BinaryVectorField = 108 + FloatVectorField = 109 + ArrayField = 110 + JSONField = 111 + Float16VectorField = 112 + BFloat16VectorField = 113 + SparseFloatVectorField = 114 ) func genTestCollectionMeta() *etcdpb.CollectionMeta { @@ -187,6 +189,13 @@ func genTestCollectionMeta() *etcdpb.CollectionMeta { }, }, }, + { + FieldID: SparseFloatVectorField, + Name: "field_sparse_float_vector", + Description: "sparse_float_vector", + DataType: schemapb.DataType_SparseFloatVector, + TypeParams: []*commonpb.KeyValuePair{}, + }, }, }, } @@ -266,6 +275,16 @@ func TestInsertCodec(t *testing.T) { Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 
255, 0, 255}, Dim: 4, }, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, + }, + }, }, } @@ -319,6 +338,16 @@ func TestInsertCodec(t *testing.T) { Data: []byte{0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255, 0, 255}, Dim: 4, }, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 300, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{5, 6, 7}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{15, 26, 37}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{105, 207, 299}, []float32{3.1, 3.2, 3.3}), + }, + }, + }, ArrayField: &ArrayFieldData{ ElementType: schemapb.DataType_Int32, Data: []*schemapb.ScalarField{ @@ -359,8 +388,14 @@ func TestInsertCodec(t *testing.T) { FloatVectorField: &FloatVectorFieldData{[]float32{}, 4}, Float16VectorField: &Float16VectorFieldData{[]byte{}, 4}, BFloat16VectorField: &BFloat16VectorFieldData{[]byte{}, 4}, - ArrayField: &ArrayFieldData{schemapb.DataType_Int32, []*schemapb.ScalarField{}}, - JSONField: &JSONFieldData{[][]byte{}}, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 0, + Contents: [][]byte{}, + }, + }, + ArrayField: &ArrayFieldData{schemapb.DataType_Int32, []*schemapb.ScalarField{}}, + JSONField: &JSONFieldData{[][]byte{}}, }, } b, err := insertCodec.Serialize(PartitionID, SegmentID, insertDataEmpty) @@ -414,6 +449,19 @@ func TestInsertCodec(t *testing.T) { 0, 255, 0, 255, 0, 255, 0, 255, }, resultData.Data[BFloat16VectorField].(*BFloat16VectorFieldData).Data) + assert.Equal(t, schemapb.SparseFloatArray{ + // merged dim 
should be max of all dims + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{5, 6, 7}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{15, 26, 37}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{105, 207, 299}, []float32{3.1, 3.2, 3.3}), + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, + }, resultData.Data[SparseFloatVectorField].(*SparseFloatVectorFieldData).SparseFloatArray) + int32ArrayList := [][]int32{{1, 2, 3}, {4, 5, 6}, {3, 2, 1}, {6, 5, 4}} resultArrayList := [][]int32{} for _, v := range resultData.Data[ArrayField].(*ArrayFieldData).Data { diff --git a/internal/storage/data_sorter.go b/internal/storage/data_sorter.go index 691825b8eaa82..c7e1d3dd884a9 100644 --- a/internal/storage/data_sorter.go +++ b/internal/storage/data_sorter.go @@ -114,6 +114,9 @@ func (ds *DataSorter) Swap(i, j int) { case schemapb.DataType_JSON: data := singleData.(*JSONFieldData).Data data[i], data[j] = data[j], data[i] + case schemapb.DataType_SparseFloatVector: + fieldData := singleData.(*SparseFloatVectorFieldData) + fieldData.Contents[i], fieldData.Contents[j] = fieldData.Contents[j], fieldData.Contents[i] default: errMsg := "undefined data type " + string(field.DataType) panic(errMsg) diff --git a/internal/storage/data_sorter_test.go b/internal/storage/data_sorter_test.go index 5413dd094cb33..324b3a5170508 100644 --- a/internal/storage/data_sorter_test.go +++ b/internal/storage/data_sorter_test.go @@ -24,6 +24,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestDataSorter(t *testing.T) { @@ -132,9 +133,16 @@ func TestDataSorter(t *testing.T) { FieldID: 111, Name: 
"field_bfloat16_vector", IsPrimaryKey: false, - Description: "description_12", + Description: "description_13", DataType: schemapb.DataType_BFloat16Vector, }, + { + FieldID: 112, + Name: "field_sparse_float_vector", + IsPrimaryKey: false, + Description: "description_14", + DataType: schemapb.DataType_SparseFloatVector, + }, }, }, } @@ -188,6 +196,16 @@ func TestDataSorter(t *testing.T) { Data: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23}, Dim: 4, }, + 112: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, + }, + }, }, } @@ -237,6 +255,7 @@ func TestDataSorter(t *testing.T) { // } // } + // last row should be moved to the first row assert.Equal(t, []int64{2, 3, 4}, dataSorter.InsertData.Data[0].(*Int64FieldData).Data) assert.Equal(t, []int64{5, 3, 4}, dataSorter.InsertData.Data[1].(*Int64FieldData).Data) assert.Equal(t, []bool{true, true, false}, dataSorter.InsertData.Data[100].(*BoolFieldData).Data) @@ -251,6 +270,14 @@ func TestDataSorter(t *testing.T) { assert.Equal(t, []float32{16, 17, 18, 19, 20, 21, 22, 23, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, dataSorter.InsertData.Data[109].(*FloatVectorFieldData).Data) assert.Equal(t, []byte{16, 17, 18, 19, 20, 21, 22, 23, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, dataSorter.InsertData.Data[110].(*Float16VectorFieldData).Data) assert.Equal(t, []byte{16, 17, 18, 19, 20, 21, 22, 23, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}, dataSorter.InsertData.Data[111].(*BFloat16VectorFieldData).Data) + assert.Equal(t, schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, 
[]float32{3.1, 3.2, 3.3}), + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + }, + }, dataSorter.InsertData.Data[112].(*SparseFloatVectorFieldData).SparseFloatArray) } func TestDataSorter_Len(t *testing.T) { diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go index 5d58361f423c3..040ecec58db92 100644 --- a/internal/storage/event_writer.go +++ b/internal/storage/event_writer.go @@ -215,7 +215,7 @@ func newDescriptorEvent() *descriptorEvent { func newInsertEventWriter(dataType schemapb.DataType, dim ...int) (*insertEventWriter, error) { var payloadWriter PayloadWriterInterface var err error - if typeutil.IsVectorType(dataType) { + if typeutil.IsVectorType(dataType) && !typeutil.IsSparseVectorType(dataType) { if len(dim) != 1 { return nil, fmt.Errorf("incorrect input numbers") } diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go index 1ebda6ef8ca8d..945162915b0d4 100644 --- a/internal/storage/insert_data.go +++ b/internal/storage/insert_data.go @@ -20,9 +20,12 @@ import ( "encoding/binary" "fmt" + "github.com/gogo/protobuf/proto" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // TODO: fill it @@ -181,7 +184,8 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema) Data: make([]byte, 0), Dim: dim, }, nil - + case schemapb.DataType_SparseFloatVector: + return &SparseFloatVectorFieldData{}, nil case schemapb.DataType_Bool: return &BoolFieldData{ Data: make([]bool, 0), @@ -283,6 +287,20 @@ type BFloat16VectorFieldData struct { Dim int } +type SparseFloatVectorFieldData struct { + schemapb.SparseFloatArray +} + +func (dst *SparseFloatVectorFieldData) AppendAllRows(src *SparseFloatVectorFieldData) { + if len(src.Contents) == 0 { + 
return + } + if dst.Dim < src.Dim { + dst.Dim = src.Dim + } + dst.Contents = append(dst.Contents, src.Contents...) +} + // RowNum implements FieldData.RowNum func (data *BoolFieldData) RowNum() int { return len(data.Data) } func (data *Int8FieldData) RowNum() int { return len(data.Data) } @@ -300,6 +318,7 @@ func (data *Float16VectorFieldData) RowNum() int { return len(data.Data) / 2 / d func (data *BFloat16VectorFieldData) RowNum() int { return len(data.Data) / 2 / data.Dim } +func (data *SparseFloatVectorFieldData) RowNum() int { return len(data.Contents) } // GetRow implements FieldData.GetRow func (data *BoolFieldData) GetRow(i int) any { return data.Data[i] } @@ -312,10 +331,14 @@ func (data *DoubleFieldData) GetRow(i int) any { return data.Data[i] } func (data *StringFieldData) GetRow(i int) any { return data.Data[i] } func (data *ArrayFieldData) GetRow(i int) any { return data.Data[i] } func (data *JSONFieldData) GetRow(i int) any { return data.Data[i] } -func (data *BinaryVectorFieldData) GetRow(i int) interface{} { +func (data *BinaryVectorFieldData) GetRow(i int) any { return data.Data[i*data.Dim/8 : (i+1)*data.Dim/8] } +func (data *SparseFloatVectorFieldData) GetRow(i int) interface{} { + return data.Contents[i] +} + func (data *FloatVectorFieldData) GetRow(i int) interface{} { return data.Data[i*data.Dim : (i+1)*data.Dim] } @@ -328,20 +351,21 @@ func (data *BFloat16VectorFieldData) GetRow(i int) interface{} { return data.Data[i*data.Dim*2 : (i+1)*data.Dim*2] } -func (data *BoolFieldData) GetRows() any { return data.Data } -func (data *Int8FieldData) GetRows() any { return data.Data } -func (data *Int16FieldData) GetRows() any { return data.Data } -func (data *Int32FieldData) GetRows() any { return data.Data } -func (data *Int64FieldData) GetRows() any { return data.Data } -func (data *FloatFieldData) GetRows() any { return data.Data } -func (data *DoubleFieldData) GetRows() any { return data.Data } -func (data *StringFieldData) GetRows() any { return 
data.Data } -func (data *ArrayFieldData) GetRows() any { return data.Data } -func (data *JSONFieldData) GetRows() any { return data.Data } -func (data *BinaryVectorFieldData) GetRows() any { return data.Data } -func (data *FloatVectorFieldData) GetRows() any { return data.Data } -func (data *Float16VectorFieldData) GetRows() any { return data.Data } -func (data *BFloat16VectorFieldData) GetRows() any { return data.Data } +func (data *BoolFieldData) GetRows() any { return data.Data } +func (data *Int8FieldData) GetRows() any { return data.Data } +func (data *Int16FieldData) GetRows() any { return data.Data } +func (data *Int32FieldData) GetRows() any { return data.Data } +func (data *Int64FieldData) GetRows() any { return data.Data } +func (data *FloatFieldData) GetRows() any { return data.Data } +func (data *DoubleFieldData) GetRows() any { return data.Data } +func (data *StringFieldData) GetRows() any { return data.Data } +func (data *ArrayFieldData) GetRows() any { return data.Data } +func (data *JSONFieldData) GetRows() any { return data.Data } +func (data *BinaryVectorFieldData) GetRows() any { return data.Data } +func (data *FloatVectorFieldData) GetRows() any { return data.Data } +func (data *Float16VectorFieldData) GetRows() any { return data.Data } +func (data *BFloat16VectorFieldData) GetRows() any { return data.Data } +func (data *SparseFloatVectorFieldData) GetRows() any { return data.Contents } // AppendRow implements FieldData.AppendRow func (data *BoolFieldData) AppendRow(row interface{}) error { @@ -470,6 +494,22 @@ func (data *BFloat16VectorFieldData) AppendRow(row interface{}) error { return nil } +func (data *SparseFloatVectorFieldData) AppendRow(row interface{}) error { + v, ok := row.([]byte) + if !ok { + return merr.WrapErrParameterInvalid("SparseFloatVectorRowData", row, "Wrong row type") + } + if err := typeutil.ValidateSparseFloatRows(v); err != nil { + return err + } + rowDim := typeutil.SparseFloatRowDim(v) + if data.Dim < rowDim { + 
data.Dim = rowDim + } + data.Contents = append(data.Contents, v) + return nil +} + func (data *BoolFieldData) AppendRows(rows interface{}) error { v, ok := rows.([]bool) if !ok { @@ -612,6 +652,18 @@ func (data *BFloat16VectorFieldData) AppendRows(rows interface{}) error { return nil } +func (data *SparseFloatVectorFieldData) AppendRows(rows interface{}) error { + v, ok := rows.(SparseFloatVectorFieldData) + if !ok { + return merr.WrapErrParameterInvalid("SparseFloatVectorFieldData", rows, "Wrong rows type") + } + data.Contents = append(data.SparseFloatArray.Contents, v.Contents...) + if data.Dim < v.Dim { + data.Dim = v.Dim + } + return nil +} + // GetMemorySize implements FieldData.GetMemorySize func (data *BoolFieldData) GetMemorySize() int { return binary.Size(data.Data) } func (data *Int8FieldData) GetMemorySize() int { return binary.Size(data.Data) } @@ -627,6 +679,11 @@ func (data *BFloat16VectorFieldData) GetMemorySize() int { return binary.Size(data.Data) + 4 } +func (data *SparseFloatVectorFieldData) GetMemorySize() int { + // TODO(SPARSE): should this be the memory size of serialzied size? + return proto.Size(&data.SparseFloatArray) +} + // GetDataType implements FieldData.GetDataType func (data *BoolFieldData) GetDataType() schemapb.DataType { return schemapb.DataType_Bool } func (data *Int8FieldData) GetDataType() schemapb.DataType { return schemapb.DataType_Int8 } @@ -654,6 +711,10 @@ func (data *BFloat16VectorFieldData) GetDataType() schemapb.DataType { return schemapb.DataType_BFloat16Vector } +func (data *SparseFloatVectorFieldData) GetDataType() schemapb.DataType { + return schemapb.DataType_SparseFloatVector +} + // why not binary.Size(data) directly? binary.Size(data) return -1 // binary.Size returns how many bytes Write would generate to encode the value v, which // must be a fixed-size value or a slice of fixed-size values, or a pointer to such data. 
@@ -733,3 +794,7 @@ func (data *ArrayFieldData) GetRowSize(i int) int { } return 0 } + +func (data *SparseFloatVectorFieldData) GetRowSize(i int) int { + return len(data.Contents[i]) +} diff --git a/internal/storage/insert_data_test.go b/internal/storage/insert_data_test.go index 5d12da7b3d090..e63a9c515e554 100644 --- a/internal/storage/insert_data_test.go +++ b/internal/storage/insert_data_test.go @@ -9,6 +9,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestInsertDataSuite(t *testing.T) { @@ -84,11 +85,11 @@ func (s *InsertDataSuite) TestInsertData() { s.False(s.iDataOneRow.IsEmpty()) s.Equal(1, s.iDataOneRow.GetRowNum()) - s.Equal(151, s.iDataOneRow.GetMemorySize()) + s.Equal(179, s.iDataOneRow.GetMemorySize()) s.False(s.iDataTwoRows.IsEmpty()) s.Equal(2, s.iDataTwoRows.GetRowNum()) - s.Equal(286, s.iDataTwoRows.GetMemorySize()) + s.Equal(340, s.iDataTwoRows.GetMemorySize()) for _, field := range s.iDataTwoRows.Data { s.Equal(2, field.RowNum()) @@ -187,20 +188,21 @@ func (s *InsertDataSuite) SetupTest() { s.Equal(16, s.iDataEmpty.GetMemorySize()) row1 := map[FieldID]interface{}{ - RowIDField: int64(3), - TimestampField: int64(3), - BoolField: true, - Int8Field: int8(3), - Int16Field: int16(3), - Int32Field: int32(3), - Int64Field: int64(3), - FloatField: float32(3), - DoubleField: float64(3), - StringField: "str", - BinaryVectorField: []byte{0}, - FloatVectorField: []float32{4, 5, 6, 7}, - Float16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255}, - BFloat16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255}, + RowIDField: int64(3), + TimestampField: int64(3), + BoolField: true, + Int8Field: int8(3), + Int16Field: int16(3), + Int32Field: int32(3), + Int64Field: int64(3), + FloatField: float32(3), + DoubleField: float64(3), + StringField: "str", + BinaryVectorField: []byte{0}, + FloatVectorField: 
[]float32{4, 5, 6, 7}, + Float16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255}, + BFloat16VectorField: []byte{0, 0, 0, 0, 255, 255, 255, 255}, + SparseFloatVectorField: testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{4, 5, 6}), ArrayField: &schemapb.ScalarField{ Data: &schemapb.ScalarField_IntData{ IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}}, @@ -219,20 +221,21 @@ func (s *InsertDataSuite) SetupTest() { } row2 := map[FieldID]interface{}{ - RowIDField: int64(1), - TimestampField: int64(1), - BoolField: false, - Int8Field: int8(1), - Int16Field: int16(1), - Int32Field: int32(1), - Int64Field: int64(1), - FloatField: float32(1), - DoubleField: float64(1), - StringField: string("str"), - BinaryVectorField: []byte{0}, - FloatVectorField: []float32{4, 5, 6, 7}, - Float16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8}, - BFloat16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + RowIDField: int64(1), + TimestampField: int64(1), + BoolField: false, + Int8Field: int8(1), + Int16Field: int16(1), + Int32Field: int32(1), + Int64Field: int64(1), + FloatField: float32(1), + DoubleField: float64(1), + StringField: string("str"), + BinaryVectorField: []byte{0}, + FloatVectorField: []float32{4, 5, 6, 7}, + Float16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + BFloat16VectorField: []byte{1, 2, 3, 4, 5, 6, 7, 8}, + SparseFloatVectorField: testutils.CreateSparseFloatRow([]uint32{2, 3, 4}, []float32{4, 5, 6}), ArrayField: &schemapb.ScalarField{ Data: &schemapb.ScalarField_IntData{ IntData: &schemapb.IntArray{Data: []int32{1, 2, 3}}, diff --git a/internal/storage/payload.go b/internal/storage/payload.go index c810c67659287..99fa0f52094e3 100644 --- a/internal/storage/payload.go +++ b/internal/storage/payload.go @@ -42,6 +42,7 @@ type PayloadWriterInterface interface { AddFloatVectorToPayload(binVec []float32, dim int) error AddFloat16VectorToPayload(binVec []byte, dim int) error AddBFloat16VectorToPayload(binVec []byte, dim int) error + 
AddSparseFloatVectorToPayload(data *SparseFloatVectorFieldData) error FinishPayloadWriter() error GetPayloadBufferFromWriter() ([]byte, error) GetPayloadLengthFromWriter() (int, error) @@ -67,6 +68,7 @@ type PayloadReaderInterface interface { GetFloat16VectorFromPayload() ([]byte, int, error) GetBFloat16VectorFromPayload() ([]byte, int, error) GetFloatVectorFromPayload() ([]float32, int, error) + GetSparseFloatVectorFromPayload() (*SparseFloatVectorFieldData, int, error) GetPayloadLengthFromReader() (int, error) GetByteArrayDataSet() (*DataSet[parquet.ByteArray, *file.ByteArrayColumnChunkReader], error) diff --git a/internal/storage/payload_reader.go b/internal/storage/payload_reader.go index 1a5b462209828..4a53ebf900231 100644 --- a/internal/storage/payload_reader.go +++ b/internal/storage/payload_reader.go @@ -14,6 +14,7 @@ import ( "github.com/golang/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) // PayloadReader reads data from payload @@ -73,6 +74,8 @@ func (r *PayloadReader) GetDataFromPayload() (interface{}, int, error) { return r.GetFloat16VectorFromPayload() case schemapb.DataType_BFloat16Vector: return r.GetBFloat16VectorFromPayload() + case schemapb.DataType_SparseFloatVector: + return r.GetSparseFloatVectorFromPayload() case schemapb.DataType_String, schemapb.DataType_VarChar: val, err := r.GetStringFromPayload() return val, 0, err @@ -429,6 +432,36 @@ func (r *PayloadReader) GetFloatVectorFromPayload() ([]float32, int, error) { return ret, dim, nil } +func (r *PayloadReader) GetSparseFloatVectorFromPayload() (*SparseFloatVectorFieldData, int, error) { + if !typeutil.IsSparseVectorType(r.colType) { + return nil, -1, fmt.Errorf("failed to get sparse float vector from datatype %v", r.colType.String()) + } + values := make([]parquet.ByteArray, r.numRows) + valuesRead, err := ReadDataFromAllRowGroups[parquet.ByteArray, *file.ByteArrayColumnChunkReader](r.reader, values, 0, 
r.numRows) + if err != nil { + return nil, -1, err + } + if valuesRead != r.numRows { + return nil, -1, fmt.Errorf("expect %d binary, but got = %d", r.numRows, valuesRead) + } + + fieldData := &SparseFloatVectorFieldData{} + + for _, value := range values { + if len(value)%8 != 0 { + return nil, -1, fmt.Errorf("invalid bytesData length") + } + + fieldData.Contents = append(fieldData.Contents, value) + rowDim := typeutil.SparseFloatRowDim(value) + if rowDim > fieldData.Dim { + fieldData.Dim = rowDim + } + } + + return fieldData, int(fieldData.Dim), nil +} + func (r *PayloadReader) GetPayloadLengthFromReader() (int, error) { return int(r.numRows), nil } diff --git a/internal/storage/payload_test.go b/internal/storage/payload_test.go index fe0db83732130..b50aad7b34b90 100644 --- a/internal/storage/payload_test.go +++ b/internal/storage/payload_test.go @@ -26,6 +26,7 @@ import ( "github.com/stretchr/testify/require" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestPayload_ReaderAndWriter(t *testing.T) { @@ -619,6 +620,170 @@ func TestPayload_ReaderAndWriter(t *testing.T) { defer r.ReleasePayloadReader() }) + t.Run("TestSparseFloatVector", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_SparseFloatVector) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, + }, + }) + assert.NoError(t, err) + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, 
[]float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{170, 300, 579}, []float32{3.1, 3.2, 3.3}), + }, + }, + }) + assert.NoError(t, err) + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + length, err := w.GetPayloadLengthFromWriter() + assert.NoError(t, err) + assert.Equal(t, 6, length) + defer w.ReleasePayloadWriter() + + buffer, err := w.GetPayloadBufferFromWriter() + assert.NoError(t, err) + + r, err := NewPayloadReader(schemapb.DataType_SparseFloatVector, buffer) + require.Nil(t, err) + length, err = r.GetPayloadLengthFromReader() + assert.NoError(t, err) + assert.Equal(t, length, 6) + + floatVecs, dim, err := r.GetSparseFloatVectorFromPayload() + assert.NoError(t, err) + assert.Equal(t, 600, dim) + assert.Equal(t, 6, len(floatVecs.Contents)) + assert.Equal(t, schemapb.SparseFloatArray{ + // merged dim should be max of all dims + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{0, 1, 2}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{170, 300, 579}, []float32{3.1, 3.2, 3.3}), + }, + }, floatVecs.SparseFloatArray) + + ifloatVecs, dim, err := r.GetDataFromPayload() + assert.NoError(t, err) + assert.Equal(t, floatVecs, ifloatVecs.(*SparseFloatVectorFieldData)) + assert.Equal(t, 600, dim) + defer r.ReleasePayloadReader() + }) + + testSparseOneBatch := func(t *testing.T, rows [][]byte, actualDim int) { + w, err := NewPayloadWriter(schemapb.DataType_SparseFloatVector) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + 
SparseFloatArray: schemapb.SparseFloatArray{ + Dim: int64(actualDim), + Contents: rows, + }, + }) + assert.NoError(t, err) + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + length, err := w.GetPayloadLengthFromWriter() + assert.NoError(t, err) + assert.Equal(t, 3, length) + defer w.ReleasePayloadWriter() + + buffer, err := w.GetPayloadBufferFromWriter() + assert.NoError(t, err) + + r, err := NewPayloadReader(schemapb.DataType_SparseFloatVector, buffer) + require.Nil(t, err) + length, err = r.GetPayloadLengthFromReader() + assert.NoError(t, err) + assert.Equal(t, length, 3) + + floatVecs, dim, err := r.GetSparseFloatVectorFromPayload() + assert.NoError(t, err) + assert.Equal(t, actualDim, dim) + assert.Equal(t, 3, len(floatVecs.Contents)) + assert.Equal(t, schemapb.SparseFloatArray{ + Dim: int64(dim), + Contents: rows, + }, floatVecs.SparseFloatArray) + + ifloatVecs, dim, err := r.GetDataFromPayload() + assert.NoError(t, err) + assert.Equal(t, floatVecs, ifloatVecs.(*SparseFloatVectorFieldData)) + assert.Equal(t, actualDim, dim) + defer r.ReleasePayloadReader() + } + + t.Run("TestSparseFloatVector_emptyRow", func(t *testing.T) { + testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, 3.2, 3.3}), + }, 600) + testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + }, 0) + }) + + t.Run("TestSparseFloatVector_largeRow", func(t *testing.T) { + nnz := 100000 + // generate an int slice with nnz random sorted elements + indices := make([]uint32, nnz) + values := make([]float32, nnz) + for i := 0; i < nnz; i++ { + indices[i] = uint32(i * 6) + values[i] = float32(i) + } + dim := int(indices[nnz-1]) + 1 + 
testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow(indices, values), + }, dim) + }) + + t.Run("TestSparseFloatVector_negativeValues", func(t *testing.T) { + testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{-2.1, 2.2, -2.3}), + testutils.CreateSparseFloatRow([]uint32{100, 200, 599}, []float32{3.1, -3.2, 3.3}), + }, 600) + }) + + // even though SPARSE_INVERTED_INDEX and SPARSE_WAND index do not support + // arbitrarily large dimensions, HNSW does, so we still need to test it. + // Dimension range we support is 0 to positive int32 max - 1(to leave room + // for dim). + t.Run("TestSparseFloatVector_largeIndex", func(t *testing.T) { + int32Max := uint32(math.MaxInt32) + testSparseOneBatch(t, [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + testutils.CreateSparseFloatRow([]uint32{10, 20, 30}, []float32{-2.1, 2.2, -2.3}), + testutils.CreateSparseFloatRow([]uint32{100, int32Max / 2, int32Max - 1}, []float32{3.1, -3.2, 3.3}), + }, int(int32Max)) + }) + // t.Run("TestAddDataToPayload", func(t *testing.T) { // w, err := NewPayloadWriter(schemapb.DataType_Bool) // w.colType = 999 @@ -863,6 +1028,37 @@ func TestPayload_ReaderAndWriter(t *testing.T) { err = w.AddBFloat16VectorToPayload([]byte{1, 0, 0, 0, 0, 0, 0, 0}, 8) assert.Error(t, err) }) + t.Run("TestAddSparseFloatVectorAfterFinish", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_SparseFloatVector) + require.Nil(t, err) + require.NotNil(t, w) + defer w.Close() + + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 53, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, 
[]float32{1.1, 1.2, 1.3}), + }, + }, + }) + assert.Error(t, err) + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + }, + }, + }) + assert.Error(t, err) + + err = w.FinishPayloadWriter() + assert.Error(t, err) + }) t.Run("TestNewReadError", func(t *testing.T) { buffer := []byte{0} r, err := NewPayloadReader(999, buffer) @@ -1388,6 +1584,60 @@ func TestPayload_ReaderAndWriter(t *testing.T) { assert.Error(t, err) }) + t.Run("TestGetSparseFloatVectorError", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_Bool) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddBoolToPayload([]bool{false, true, true}) + assert.NoError(t, err) + + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.NoError(t, err) + + r, err := NewPayloadReader(schemapb.DataType_SparseFloatVector, buffer) + assert.NoError(t, err) + + _, _, err = r.GetSparseFloatVectorFromPayload() + assert.Error(t, err) + + r.colType = 999 + _, _, err = r.GetSparseFloatVectorFromPayload() + assert.Error(t, err) + }) + + t.Run("TestGetSparseFloatVectorError2", func(t *testing.T) { + w, err := NewPayloadWriter(schemapb.DataType_SparseFloatVector) + require.Nil(t, err) + require.NotNil(t, w) + + err = w.AddSparseFloatVectorToPayload(&SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 53, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + }, + }, + }) + assert.NoError(t, err) + + err = w.FinishPayloadWriter() + assert.NoError(t, err) + + buffer, err := w.GetPayloadBufferFromWriter() + assert.NoError(t, err) + + r, err := NewPayloadReader(schemapb.DataType_SparseFloatVector, buffer) + assert.NoError(t, err) + + r.numRows = 99 + _, _, err = 
r.GetSparseFloatVectorFromPayload() + assert.Error(t, err) + }) + t.Run("TestWriteLargeSizeData", func(t *testing.T) { t.Skip("Large data skip for online ut") size := 1 << 29 // 512M diff --git a/internal/storage/payload_writer.go b/internal/storage/payload_writer.go index e2956a5fd6225..e1efac35b29c0 100644 --- a/internal/storage/payload_writer.go +++ b/internal/storage/payload_writer.go @@ -50,7 +50,8 @@ type NativePayloadWriter struct { func NewPayloadWriter(colType schemapb.DataType, dim ...int) (PayloadWriterInterface, error) { var arrowType arrow.DataType - if typeutil.IsVectorType(colType) { + // writer for sparse float vector doesn't require dim + if typeutil.IsVectorType(colType) && !typeutil.IsSparseVectorType(colType) { if len(dim) != 1 { return nil, fmt.Errorf("incorrect input numbers") } @@ -164,6 +165,12 @@ func (w *NativePayloadWriter) AddDataToPayload(data interface{}, dim ...int) err return errors.New("incorrect data type") } return w.AddBFloat16VectorToPayload(val, dim[0]) + case schemapb.DataType_SparseFloatVector: + val, ok := data.(*SparseFloatVectorFieldData) + if !ok { + return errors.New("incorrect data type") + } + return w.AddSparseFloatVectorToPayload(val) default: return errors.New("incorrect datatype") } @@ -475,6 +482,23 @@ func (w *NativePayloadWriter) AddBFloat16VectorToPayload(data []byte, dim int) e return nil } +func (w *NativePayloadWriter) AddSparseFloatVectorToPayload(data *SparseFloatVectorFieldData) error { + if w.finished { + return errors.New("can't append data to finished writer") + } + builder, ok := w.builder.(*array.BinaryBuilder) + if !ok { + return errors.New("failed to cast BinaryBuilder") + } + length := len(data.SparseFloatArray.Contents) + builder.Reserve(length) + for i := 0; i < length; i++ { + builder.Append(data.SparseFloatArray.Contents[i]) + } + + return nil +} + func (w *NativePayloadWriter) FinishPayloadWriter() error { if w.finished { return errors.New("can't reuse a finished writer") @@ -574,6 +598,8 @@ 
func milvusDataTypeToArrowType(dataType schemapb.DataType, dim int) arrow.DataTy return &arrow.FixedSizeBinaryType{ ByteWidth: dim * 2, } + case schemapb.DataType_SparseFloatVector: + return &arrow.BinaryType{} default: panic("unsupported data type") } diff --git a/internal/storage/print_binlog.go b/internal/storage/print_binlog.go index da3eafc968b18..c27cc4715a213 100644 --- a/internal/storage/print_binlog.go +++ b/internal/storage/print_binlog.go @@ -334,6 +334,18 @@ func printPayloadValues(colType schemapb.DataType, reader PayloadReaderInterface for i := 0; i < rows; i++ { fmt.Printf("\t\t%d : %s\n", i, val[i]) } + case schemapb.DataType_SparseFloatVector: + sparseData, _, err := reader.GetSparseFloatVectorFromPayload() + if err != nil { + return err + } + fmt.Println("======= SparseFloatVectorFieldData =======") + fmt.Println("row num:", len(sparseData.Contents)) + fmt.Println("dim:", sparseData.Dim) + for _, v := range sparseData.Contents { + fmt.Println(v) + } + fmt.Println("===== SparseFloatVectorFieldData end =====") default: return errors.New("undefined data type") } diff --git a/internal/storage/stats.go b/internal/storage/stats.go index b5ce61740e876..7914e04b80ef5 100644 --- a/internal/storage/stats.go +++ b/internal/storage/stats.go @@ -187,7 +187,7 @@ func (stats *PrimaryKeyStats) UpdateMinMax(pk PrimaryKey) { func NewPrimaryKeyStats(fieldID, pkType, rowNum int64) (*PrimaryKeyStats, error) { if rowNum <= 0 { - return nil, merr.WrapErrParameterInvalidMsg("non zero & non negative row num", rowNum) + return nil, merr.WrapErrParameterInvalidMsg("zero or negative row num", rowNum) } return &PrimaryKeyStats{ FieldID: fieldID, diff --git a/internal/storage/utils.go b/internal/storage/utils.go index 0000642ff5d46..9c91b9cbcc538 100644 --- a/internal/storage/utils.go +++ b/internal/storage/utils.go @@ -422,6 +422,8 @@ func RowBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *schemap Data: vecs, Dim: dim, } + case 
schemapb.DataType_SparseFloatVector: + return nil, fmt.Errorf("Sparse Float Vector is not supported in row based data") case schemapb.DataType_Bool: idata.Data[field.FieldID] = &BoolFieldData{ @@ -556,6 +558,11 @@ func ColumnBasedInsertMsgToInsertData(msg *msgstream.InsertMsg, collSchema *sche Dim: dim, } + case schemapb.DataType_SparseFloatVector: + fieldData = &SparseFloatVectorFieldData{ + SparseFloatArray: *srcFields[field.FieldID].GetVectors().GetSparseFloatVector(), + } + case schemapb.DataType_Bool: srcData := srcField.GetScalars().GetBoolData().GetData() @@ -823,6 +830,14 @@ func mergeBFloat16VectorField(data *InsertData, fid FieldID, field *BFloat16Vect fieldData.Data = append(fieldData.Data, field.Data...) } +func mergeSparseFloatVectorField(data *InsertData, fid FieldID, field *SparseFloatVectorFieldData) { + if _, ok := data.Data[fid]; !ok { + data.Data[fid] = &SparseFloatVectorFieldData{} + } + fieldData := data.Data[fid].(*SparseFloatVectorFieldData) + fieldData.AppendAllRows(field) +} + // MergeFieldData merge field into data. 
func MergeFieldData(data *InsertData, fid FieldID, field FieldData) { if field == nil { @@ -857,6 +872,8 @@ func MergeFieldData(data *InsertData, fid FieldID, field FieldData) { mergeFloat16VectorField(data, fid, field) case *BFloat16VectorFieldData: mergeBFloat16VectorField(data, fid, field) + case *SparseFloatVectorFieldData: + mergeSparseFloatVectorField(data, fid, field) } } @@ -1182,6 +1199,18 @@ func TransferInsertDataToInsertRecord(insertData *InsertData) (*segcorepb.Insert }, }, } + case *SparseFloatVectorFieldData: + fieldData = &schemapb.FieldData{ + Type: schemapb.DataType_SparseFloatVector, + FieldId: fieldID, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &rawData.SparseFloatArray, + }, + }, + }, + } default: return insertRecord, fmt.Errorf("unsupported data type when transter storage.InsertData to internalpb.InsertRecord") } diff --git a/internal/storage/utils_test.go b/internal/storage/utils_test.go index 8717662c61b96..f768b24ac56d8 100644 --- a/internal/storage/utils_test.go +++ b/internal/storage/utils_test.go @@ -35,6 +35,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestCheckTsField(t *testing.T) { @@ -394,6 +395,9 @@ func genAllFieldsSchema(fVecDim, bVecDim, f16VecDim, bf16VecDim int) (schema *sc }, }, }, + { + DataType: schemapb.DataType_SparseFloatVector, + }, { DataType: schemapb.DataType_Array, }, @@ -900,6 +904,26 @@ func genColumnBasedInsertMsg(schema *schemapb.CollectionSchema, numRows, fVecDim for nrows := 0; nrows < numRows; nrows++ { columns[idx] = append(columns[idx], data[nrows*bf16VecDim*2:(nrows+1)*bf16VecDim*2]) } + case schemapb.DataType_SparseFloatVector: + fmt.Println("sparseFloatVector") + data := testutils.GenerateSparseFloatVectors(numRows) + f := 
&schemapb.FieldData{ + Type: schemapb.DataType_SparseFloatVector, + FieldName: field.Name, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: data.Dim, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: data, + }, + }, + }, + FieldId: field.FieldID, + } + msg.FieldsData = append(msg.FieldsData, f) + for nrows := 0; nrows < numRows; nrows++ { + columns[idx] = append(columns[idx], data.Contents[nrows]) + } case schemapb.DataType_Array: data := generateInt32ArrayList(numRows) @@ -1246,6 +1270,15 @@ func TestMergeInsertData(t *testing.T) { Data: []byte{0, 1}, Dim: 1, }, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + }, + }, + }, ArrayField: &ArrayFieldData{ Data: []*schemapb.ScalarField{ { @@ -1311,6 +1344,14 @@ func TestMergeInsertData(t *testing.T) { Data: []byte{2, 3}, Dim: 1, }, + SparseFloatVectorField: &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{170, 300, 579}, []float32{3.1, 3.2, 3.3}), + }, + }, + }, ArrayField: &ArrayFieldData{ Data: []*schemapb.ScalarField{ { @@ -1387,6 +1428,19 @@ func TestMergeInsertData(t *testing.T) { assert.True(t, ok) assert.Equal(t, []byte{0, 1, 2, 3}, f.(*BFloat16VectorFieldData).Data) + f, ok = d1.Data[SparseFloatVectorField] + assert.True(t, ok) + assert.Equal(t, &SparseFloatVectorFieldData{ + SparseFloatArray: schemapb.SparseFloatArray{ + Dim: 600, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{170, 300, 579}, []float32{3.1, 3.2, 3.3}), + }, + }, + 
}, f.(*SparseFloatVectorFieldData)) + f, ok = d1.Data[ArrayField] assert.True(t, ok) assert.Equal(t, []int32{1, 2, 3}, f.(*ArrayFieldData).Data[0].GetIntData().GetData()) diff --git a/internal/util/indexcgowrapper/dataset.go b/internal/util/indexcgowrapper/dataset.go index 8a29996dc25f2..a4269f85d56b4 100644 --- a/internal/util/indexcgowrapper/dataset.go +++ b/internal/util/indexcgowrapper/dataset.go @@ -41,6 +41,16 @@ func GenBFloat16VecDataset(vectors []byte) *Dataset { } } +func GenSparseFloatVecDataset(data *storage.SparseFloatVectorFieldData) *Dataset { + // TODO(SPARSE): in search for the usage of this method, only the DType + // of the returned Dataset is used. + // If this is designed to generate a Dataset that will be sent to knowhere, + // we'll need to expose knowhere::sparse::SparseRow to Go. + return &Dataset{ + DType: schemapb.DataType_SparseFloatVector, + } +} + func GenBinaryVecDataset(vectors []byte) *Dataset { return &Dataset{ DType: schemapb.DataType_BinaryVector, @@ -116,6 +126,8 @@ func GenDataset(data storage.FieldData) *Dataset { return GenFloat16VecDataset(f.Data) case *storage.BFloat16VectorFieldData: return GenBFloat16VecDataset(f.Data) + case *storage.SparseFloatVectorFieldData: + return GenSparseFloatVecDataset(f) default: return &Dataset{ DType: schemapb.DataType_None, diff --git a/internal/util/indexcgowrapper/index.go b/internal/util/indexcgowrapper/index.go index ed0a964a2be47..9ed3ae73f2d6a 100644 --- a/internal/util/indexcgowrapper/index.go +++ b/internal/util/indexcgowrapper/index.go @@ -49,6 +49,7 @@ type CgoIndex struct { close bool } +// used only in test // TODO: use proto.Marshal instead of proto.MarshalTextString for better compatibility. 
func NewCgoIndex(dtype schemapb.DataType, typeParams, indexParams map[string]string) (CodecIndex, error) { protoTypeParams := &indexcgopb.TypeParams{ @@ -123,6 +124,8 @@ func CreateIndexV2(ctx context.Context, buildIndexInfo *BuildIndexInfo) (CodecIn return index, nil } +// TODO: this seems to be used only for test. We should mark the method +// name with ForTest, or maybe move to test file. func (index *CgoIndex) Build(dataset *Dataset) error { switch dataset.DType { case schemapb.DataType_None: @@ -176,6 +179,12 @@ func (index *CgoIndex) buildBFloat16VecIndex(dataset *Dataset) error { return HandleCStatus(&status, "failed to build bfloat16 vector index") } +func (index *CgoIndex) buildSparseFloatVecIndex(dataset *Dataset) error { + vectors := dataset.Data[keyRawArr].([]byte) + status := C.BuildSparseFloatVecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (C.int64_t)(0), (*C.uint8_t)(&vectors[0])) + return HandleCStatus(&status, "failed to build sparse float vector index") +} + func (index *CgoIndex) buildBinaryVecIndex(dataset *Dataset) error { vectors := dataset.Data[keyRawArr].([]byte) status := C.BuildBinaryVecIndex(index.indexPtr, (C.int64_t)(len(vectors)), (*C.uint8_t)(&vectors[0])) diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 091f6482f2475..4c7e1dad5a139 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -146,7 +146,7 @@ func CheckCtxValid(ctx context.Context) bool { func GetVecFieldIDs(schema *schemapb.CollectionSchema) []int64 { var vecFieldIDs []int64 for _, field := range schema.Fields { - if field.DataType == schemapb.DataType_BinaryVector || field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_Float16Vector { + if field.DataType == schemapb.DataType_BinaryVector || field.DataType == schemapb.DataType_FloatVector || field.DataType == schemapb.DataType_Float16Vector || field.DataType == schemapb.DataType_SparseFloatVector { vecFieldIDs = append(vecFieldIDs, 
field.FieldID) } } @@ -335,6 +335,8 @@ func GetNumRowOfFieldData(fieldData *schemapb.FieldData) (uint64, error) { if err != nil { return 0, err } + case *schemapb.VectorField_SparseFloatVector: + fieldNumRows = uint64(len(vectorField.GetSparseFloatVector().GetContents())) default: return 0, fmt.Errorf("%s is not supported now", vectorFieldType) } diff --git a/pkg/util/funcutil/placeholdergroup.go b/pkg/util/funcutil/placeholdergroup.go index 5dd13cd1d445d..538a70fcbefcc 100644 --- a/pkg/util/funcutil/placeholdergroup.go +++ b/pkg/util/funcutil/placeholdergroup.go @@ -2,6 +2,7 @@ package funcutil import ( "encoding/binary" + "fmt" "math" "github.com/cockroachdb/errors" @@ -76,6 +77,22 @@ func fieldDataToPlaceholderValue(fieldData *schemapb.FieldData) (*commonpb.Place Values: flattenedFloat16VectorsToByteVectors(x.Bfloat16Vector, int(vectors.Dim)), } return placeholderValue, nil + case schemapb.DataType_SparseFloatVector: + vectors, ok := fieldData.GetVectors().GetData().(*schemapb.VectorField_SparseFloatVector) + if !ok { + return nil, errors.New("vector data is not schemapb.VectorField_SparseFloatVector") + } + vec := vectors.SparseFloatVector + bytes, err := proto.Marshal(vec) + if err != nil { + return nil, fmt.Errorf("failed to marshal schemapb.SparseFloatArray to bytes: %w", err) + } + placeholderValue := &commonpb.PlaceholderValue{ + Tag: "$0", + Type: commonpb.PlaceholderType_SparseFloatVector, + Values: [][]byte{bytes}, + } + return placeholderValue, nil default: return nil, errors.New("field is not a vector field") } diff --git a/pkg/util/gc/gc_tuner.go b/pkg/util/gc/gc_tuner.go index 04a6c33aa729c..26e1f74c335b7 100644 --- a/pkg/util/gc/gc_tuner.go +++ b/pkg/util/gc/gc_tuner.go @@ -87,7 +87,7 @@ func optimizeGOGC() { // currently we assume 20 ms as long gc pause if (m.PauseNs[(m.NumGC+255)%256] / uint64(time.Millisecond)) < 20 { - log.Info("GC Tune done", zap.Uint32("previous GOGC", previousGOGC), + log.Debug("GC Tune done", zap.Uint32("previous GOGC", 
previousGOGC), zap.Uint64("heapuse ", toMB(heapuse)), zap.Uint64("total memory", toMB(totaluse)), zap.Uint64("next GC", toMB(m.NextGC)), diff --git a/pkg/util/indexparamcheck/conf_adapter_mgr.go b/pkg/util/indexparamcheck/conf_adapter_mgr.go index ca2d53a3f39b9..f62fbe1bf859a 100644 --- a/pkg/util/indexparamcheck/conf_adapter_mgr.go +++ b/pkg/util/indexparamcheck/conf_adapter_mgr.go @@ -56,6 +56,10 @@ func (mgr *indexCheckerMgrImpl) registerIndexChecker() { mgr.checkers[IndexFaissBinIvfFlat] = newBinIVFFlatChecker() mgr.checkers[IndexHNSW] = newHnswChecker() mgr.checkers[IndexDISKANN] = newDiskannChecker() + mgr.checkers[IndexSparseInverted] = newSparseInvertedIndexChecker() + // WAND doesn't have more index params than sparse inverted index, thus + // using the same checker. + mgr.checkers[IndexSparseWand] = newSparseInvertedIndexChecker() } func newIndexCheckerMgr() *indexCheckerMgrImpl { diff --git a/pkg/util/indexparamcheck/constraints.go b/pkg/util/indexparamcheck/constraints.go index dd4ce2a352212..e3ea7548a2c56 100644 --- a/pkg/util/indexparamcheck/constraints.go +++ b/pkg/util/indexparamcheck/constraints.go @@ -41,6 +41,9 @@ const ( CargaBuildAlgoIVFPQ = "IVF_PQ" CargaBuildAlgoNNDESCENT = "NN_DESCENT" + + // Sparse Index Param + SparseDropRatioBuild = "drop_ratio_build" ) // METRICS is a set of all metrics types supported for float vector. 
@@ -55,9 +58,11 @@ var ( CagraBuildAlgoTypes = []string{CargaBuildAlgoIVFPQ, CargaBuildAlgoNNDESCENT} supportDimPerSubQuantizer = []int{32, 28, 24, 20, 16, 12, 10, 8, 6, 4, 3, 2, 1} // const supportSubQuantizer = []int{96, 64, 56, 48, 40, 32, 28, 24, 20, 16, 12, 8, 4, 3, 2, 1} // const + SparseMetrics = []string{metric.IP} // const ) const ( - FloatVectorDefaultMetricType = metric.IP - BinaryVectorDefaultMetricType = metric.JACCARD + FloatVectorDefaultMetricType = metric.IP + SparseFloatVectorDefaultMetricType = metric.IP + BinaryVectorDefaultMetricType = metric.JACCARD ) diff --git a/pkg/util/indexparamcheck/hnsw_checker.go b/pkg/util/indexparamcheck/hnsw_checker.go index a4dd5b1e7163f..522b31fe16bc3 100644 --- a/pkg/util/indexparamcheck/hnsw_checker.go +++ b/pkg/util/indexparamcheck/hnsw_checker.go @@ -31,6 +31,7 @@ func (c hnswChecker) CheckTrain(params map[string]string) error { } func (c hnswChecker) CheckValidDataType(dType schemapb.DataType) error { + // TODO(SPARSE) we'll add sparse vector support in HNSW later in cardinal if dType != schemapb.DataType_FloatVector && dType != schemapb.DataType_BinaryVector && dType != schemapb.DataType_Float16Vector && dType != schemapb.DataType_BFloat16Vector { return fmt.Errorf("only support float vector or binary vector") } diff --git a/pkg/util/indexparamcheck/index_type.go b/pkg/util/indexparamcheck/index_type.go index 7d361cc117e00..f559e0cabc9ba 100644 --- a/pkg/util/indexparamcheck/index_type.go +++ b/pkg/util/indexparamcheck/index_type.go @@ -30,6 +30,8 @@ const ( IndexFaissBinIvfFlat IndexType = "BIN_IVF_FLAT" IndexHNSW IndexType = "HNSW" IndexDISKANN IndexType = "DISKANN" + IndexSparseInverted IndexType = "SPARSE_INVERTED_INDEX" + IndexSparseWand IndexType = "SPARSE_WAND" ) func IsGpuIndex(indexType IndexType) bool { diff --git a/pkg/util/indexparamcheck/sparse_float_vector_base_checker.go b/pkg/util/indexparamcheck/sparse_float_vector_base_checker.go new file mode 100644 index 0000000000000..d84bbb32c4eaa --- 
/dev/null +++ b/pkg/util/indexparamcheck/sparse_float_vector_base_checker.go @@ -0,0 +1,47 @@ +package indexparamcheck + +import ( + "fmt" + "strconv" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common" +) + +// sparse vectors don't check for dim, but baseChecker does, thus not including baseChecker +type sparseFloatVectorBaseChecker struct{} + +func (c sparseFloatVectorBaseChecker) StaticCheck(params map[string]string) error { + if !CheckStrByValues(params, Metric, SparseMetrics) { + return fmt.Errorf("metric type not found or not supported, supported: %v", SparseMetrics) + } + + return nil +} + +func (c sparseFloatVectorBaseChecker) CheckTrain(params map[string]string) error { + dropRatioBuildStr, exist := params[SparseDropRatioBuild] + if exist { + dropRatioBuild, err := strconv.ParseFloat(dropRatioBuildStr, 64) + if err != nil || dropRatioBuild < 0 || dropRatioBuild >= 1 { + return fmt.Errorf("invalid drop_ratio_build: %s, must be in range [0, 1)", dropRatioBuildStr) + } + } + + return nil +} + +func (c sparseFloatVectorBaseChecker) CheckValidDataType(dType schemapb.DataType) error { + if dType != schemapb.DataType_SparseFloatVector { + return fmt.Errorf("only sparse float vector is supported for the specified index type") + } + return nil +} + +func (c sparseFloatVectorBaseChecker) SetDefaultMetricTypeIfNotExist(params map[string]string) { + setDefaultIfNotExist(params, common.MetricTypeKey, SparseFloatVectorDefaultMetricType) +} + +func newSparseFloatVectorBaseChecker() IndexChecker { + return &sparseFloatVectorBaseChecker{} +} diff --git a/pkg/util/indexparamcheck/sparse_inverted_index_checker.go b/pkg/util/indexparamcheck/sparse_inverted_index_checker.go new file mode 100644 index 0000000000000..c6d62ed01585b --- /dev/null +++ b/pkg/util/indexparamcheck/sparse_inverted_index_checker.go @@ -0,0 +1,9 @@ +package indexparamcheck + +type sparseInvertedIndexChecker struct { + sparseFloatVectorBaseChecker +} + 
+func newSparseInvertedIndexChecker() *sparseInvertedIndexChecker { + return &sparseInvertedIndexChecker{} +} diff --git a/pkg/util/testutils/sparse_test_utils.go b/pkg/util/testutils/sparse_test_utils.go new file mode 100644 index 0000000000000..6b4a7d117fcc8 --- /dev/null +++ b/pkg/util/testutils/sparse_test_utils.go @@ -0,0 +1,84 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package testutils + +import ( + "encoding/binary" + "math" + "math/rand" + "sort" + + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" +) + +func SparseFloatRowSetAt(row []byte, pos int, idx uint32, value float32) { + binary.LittleEndian.PutUint32(row[pos*8:], idx) + binary.LittleEndian.PutUint32(row[pos*8+4:], math.Float32bits(value)) +} + +func CreateSparseFloatRow(indices []uint32, values []float32) []byte { + row := make([]byte, len(indices)*8) + for i := 0; i < len(indices); i++ { + SparseFloatRowSetAt(row, i, indices[i], values[i]) + } + return row +} + +func GenerateSparseFloatVectors(numRows int) *schemapb.SparseFloatArray { + dim := 700 + avgNnz := 20 + var contents [][]byte + maxDim := 0 + + uniqueAndSort := func(indices []uint32) []uint32 { + seen := make(map[uint32]bool) + var result []uint32 + for _, value := range indices { + if _, ok := seen[value]; !ok { + seen[value] = true + result = append(result, value) + } + } + sort.Slice(result, func(i, j int) bool { + return result[i] < result[j] + }) + return result + } + + for i := 0; i < numRows; i++ { + nnz := rand.Intn(avgNnz*2) + 1 + indices := make([]uint32, 0, nnz) + for j := 0; j < nnz; j++ { + indices = append(indices, uint32(rand.Intn(dim))) + } + indices = uniqueAndSort(indices) + values := make([]float32, 0, len(indices)) + for j := 0; j < len(indices); j++ { + values = append(values, rand.Float32()) + } + if len(indices) > 0 && int(indices[len(indices)-1])+1 > maxDim { + maxDim = int(indices[len(indices)-1]) + 1 + } + rowBytes := CreateSparseFloatRow(indices, values) + + contents = append(contents, rowBytes) + } + return &schemapb.SparseFloatArray{ + Dim: int64(maxDim), + Contents: contents, + } +} diff --git a/pkg/util/typeutil/gen_empty_field_data.go b/pkg/util/typeutil/gen_empty_field_data.go index 3eb5abe3004e2..5d39df275caba 100644 --- a/pkg/util/typeutil/gen_empty_field_data.go +++ b/pkg/util/typeutil/gen_empty_field_data.go @@ -207,6 +207,26 @@ func 
genEmptyBFloat16VectorFieldData(field *schemapb.FieldSchema) (*schemapb.Fie }, nil } +func genEmptySparseFloatVectorFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error) { + return &schemapb.FieldData{ + Type: field.GetDataType(), + FieldName: field.GetName(), + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 0, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 0, + Contents: make([][]byte, 0), + }, + }, + }, + }, + FieldId: field.GetFieldID(), + IsDynamic: field.GetIsDynamic(), + }, nil +} + func GenEmptyFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error) { dataType := field.GetDataType() switch dataType { @@ -234,6 +254,8 @@ func GenEmptyFieldData(field *schemapb.FieldSchema) (*schemapb.FieldData, error) return genEmptyFloat16VectorFieldData(field) case schemapb.DataType_BFloat16Vector: return genEmptyBFloat16VectorFieldData(field) + case schemapb.DataType_SparseFloatVector: + return genEmptySparseFloatVectorFieldData(field) default: return nil, fmt.Errorf("unsupported data type: %s", dataType.String()) } diff --git a/pkg/util/typeutil/get_dim.go b/pkg/util/typeutil/get_dim.go index db102398c1175..8d0b8086bdca2 100644 --- a/pkg/util/typeutil/get_dim.go +++ b/pkg/util/typeutil/get_dim.go @@ -13,6 +13,9 @@ func GetDim(field *schemapb.FieldSchema) (int64, error) { if !IsVectorType(field.GetDataType()) { return 0, fmt.Errorf("%s is not of vector type", field.GetDataType()) } + if IsSparseVectorType(field.GetDataType()) { + return 0, fmt.Errorf("typeutil.GetDim should not invoke on sparse vector type") + } h := NewKvPairs(append(field.GetIndexParams(), field.GetTypeParams()...)) dimStr, err := h.Get(common.DimKey) if err != nil { diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index 18f74080af644..a0937342cd286 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -159,6 +159,12 @@ func estimateSizeBy(schema 
*schemapb.CollectionSchema, policy getVariableFieldLe break } } + case schemapb.DataType_SparseFloatVector: + // TODO(SPARSE, zhengbuqian): size of sparse float vector + // varies depending on the number of non-zeros. Using sparse vector + // generated by SPLADE as reference and returning size of a sparse + // vector with 150 non-zeros. + res += 1200 } } return res, nil @@ -235,6 +241,11 @@ func EstimateEntitySize(fieldsData []*schemapb.FieldData, rowOffset int) (int, e res += int(fs.GetVectors().GetDim()) case schemapb.DataType_FloatVector: res += int(fs.GetVectors().GetDim() * 4) + case schemapb.DataType_SparseFloatVector: + vec := fs.GetVectors().GetSparseFloatVector() + // counting only the size of the vector data, ignoring other + // bytes used in proto. + res += len(vec.Contents[rowOffset]) } } return res, nil @@ -359,13 +370,17 @@ func (helper *SchemaHelper) GetVectorDimFromID(fieldID int64) (int, error) { // IsVectorType returns true if input is a vector type, otherwise false func IsVectorType(dataType schemapb.DataType) bool { switch dataType { - case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector: + case schemapb.DataType_FloatVector, schemapb.DataType_BinaryVector, schemapb.DataType_Float16Vector, schemapb.DataType_BFloat16Vector, schemapb.DataType_SparseFloatVector: return true default: return false } } +func IsSparseVectorType(dataType schemapb.DataType) bool { + return dataType == schemapb.DataType_SparseFloatVector +} + // IsIntegerType returns true if input is an integer type, otherwise false func IsIntegerType(dataType schemapb.DataType) bool { switch dataType { @@ -516,6 +531,15 @@ func PrepareResultFieldData(sample []*schemapb.FieldData, topK int64) []*schemap vectors.Vectors.Data = 
&schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + // dim to be updated when appending data. + Dim: 0, + Contents: make([][]byte, 0, topK), + }, + } + vectors.Vectors.Dim = 0 } fd.Field = vectors } @@ -525,7 +549,7 @@ func PrepareResultFieldData(sample []*schemapb.FieldData, topK int64) []*schemap } // AppendFieldData appends fields data of specified index from src to dst -func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx int64) (appendSize int64) { +func AppendFieldData(dst, src []*schemapb.FieldData, idx int64) (appendSize int64) { for i, fieldData := range src { switch fieldType := fieldData.Field.(type) { case *schemapb.FieldData_Scalars: @@ -711,6 +735,18 @@ func AppendFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData, idx i } /* #nosec G103 */ appendSize += int64(unsafe.Sizeof(srcVector.Bfloat16Vector[idx*(dim*2) : (idx+1)*(dim*2)])) + case *schemapb.VectorField_SparseFloatVector: + if dstVector.GetSparseFloatVector() == nil { + dstVector.Data = &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 0, + Contents: make([][]byte, 0), + }, + } + dstVector.Dim = srcVector.SparseFloatVector.Dim + } + vec := dstVector.Data.(*schemapb.VectorField_SparseFloatVector).SparseFloatVector + appendSize += appendSparseFloatArraySingleRow(vec, srcVector.SparseFloatVector, idx) default: log.Error("Not supported field type", zap.String("field type", fieldData.Type.String())) } @@ -767,6 +803,8 @@ func DeleteFieldData(dst []*schemapb.FieldData) { case *schemapb.VectorField_Bfloat16Vector: dstBfloat16Vector := dstVector.Data.(*schemapb.VectorField_Bfloat16Vector) dstBfloat16Vector.Bfloat16Vector = dstBfloat16Vector.Bfloat16Vector[:len(dstBfloat16Vector.Bfloat16Vector)-int(dim*2)] + case *schemapb.VectorField_SparseFloatVector: + trimSparseFloatArray(dstVector.GetSparseFloatVector()) default: log.Error("wrong field type added", zap.String("field type", 
fieldData.Type.String())) } @@ -929,6 +967,14 @@ func MergeFieldData(dst []*schemapb.FieldData, src []*schemapb.FieldData) error } else { dstVector.GetFloatVector().Data = append(dstVector.GetFloatVector().Data, srcVector.FloatVector.Data...) } + case *schemapb.VectorField_SparseFloatVector: + if dstVector.GetSparseFloatVector() == nil { + dstVector.Data = &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: srcVector.SparseFloatVector, + } + } else { + appendSparseFloatArray(dstVector.GetSparseFloatVector(), srcVector.SparseFloatVector) + } default: log.Error("Not supported data type", zap.String("data type", srcFieldData.Type.String())) return errors.New("unsupported data type: " + srcFieldData.Type.String()) @@ -1166,6 +1212,8 @@ func GetData(field *schemapb.FieldData, idx int) interface{} { dim := int(field.GetVectors().GetDim()) dataBytes := dim * 2 return field.GetVectors().GetBfloat16Vector()[idx*dataBytes : (idx+1)*dataBytes] + case schemapb.DataType_SparseFloatVector: + return field.GetVectors().GetSparseFloatVector().Contents[idx] } return nil } @@ -1325,3 +1373,81 @@ func AppendGroupByValue(dstResData *schemapb.SearchResultData, } return nil } + +func appendSparseFloatArray(dst, src *schemapb.SparseFloatArray) { + if len(src.Contents) == 0 { + return + } + if dst.Dim < src.Dim { + dst.Dim = src.Dim + } + dst.Contents = append(dst.Contents, src.Contents...) 
+} + +// return the size of indices and values of the appended row +func appendSparseFloatArraySingleRow(dst, src *schemapb.SparseFloatArray, idx int64) int64 { + row := src.Contents[idx] + dst.Contents = append(dst.Contents, row) + rowDim := SparseFloatRowDim(row) + if rowDim == 0 { + return 0 + } + if dst.Dim < rowDim { + dst.Dim = rowDim + } + return int64(len(row)) +} + +func trimSparseFloatArray(vec *schemapb.SparseFloatArray) { + if len(vec.Contents) == 0 { + return + } + // not decreasing dim of the entire SparseFloatArray, as we don't want to + // iterate through the entire array to find the new max dim. Correctness + // will not be affected. + vec.Contents = vec.Contents[:len(vec.Contents)-1] +} + +func ValidateSparseFloatRows(rows ...[]byte) error { + for _, row := range rows { + if len(row) == 0 { + return errors.New("empty sparse float vector row") + } + if len(row)%8 != 0 { + return fmt.Errorf("invalid data length in sparse float vector: %d", len(row)) + } + for i := 0; i < SparseFloatRowElementCount(row); i++ { + if i > 0 && SparseFloatRowIndexAt(row, i) < SparseFloatRowIndexAt(row, i-1) { + return errors.New("unsorted indices in sparse float vector") + } + VerifyFloat(float64(SparseFloatRowValueAt(row, i))) + } + } + return nil +} + +// SparseFloatRowUtils +func SparseFloatRowElementCount(row []byte) int { + if row == nil { + return 0 + } + return len(row) / 8 +} + +// does not check for out-of-range access +func SparseFloatRowIndexAt(row []byte, idx int) uint32 { + return common.Endian.Uint32(row[idx*8:]) +} + +// does not check for out-of-range access +func SparseFloatRowValueAt(row []byte, idx int) float32 { + return math.Float32frombits(common.Endian.Uint32(row[idx*8+4:])) +} + +// dim of a sparse float vector is the maximum/last index + 1 +func SparseFloatRowDim(row []byte) int64 { + if len(row) == 0 { + return 0 + } + return int64(SparseFloatRowIndexAt(row, SparseFloatRowElementCount(row)-1)) + 1 +} diff --git a/pkg/util/typeutil/schema_test.go 
b/pkg/util/typeutil/schema_test.go index ade19afbfc8d9..3835fa9317480 100644 --- a/pkg/util/typeutil/schema_test.go +++ b/pkg/util/typeutil/schema_test.go @@ -21,6 +21,7 @@ import ( "reflect" "testing" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -30,6 +31,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/testutils" ) func TestSchema(t *testing.T) { @@ -162,6 +164,7 @@ func TestSchema(t *testing.T) { }, }, }, + // Do not test on sparse float vector field. }, } @@ -221,6 +224,7 @@ func TestSchema(t *testing.T) { assert.True(t, IsVectorType(schemapb.DataType_FloatVector)) assert.True(t, IsVectorType(schemapb.DataType_Float16Vector)) assert.True(t, IsVectorType(schemapb.DataType_BFloat16Vector)) + assert.True(t, IsVectorType(schemapb.DataType_SparseFloatVector)) assert.False(t, IsIntegerType(schemapb.DataType_Bool)) assert.True(t, IsIntegerType(schemapb.DataType_Int8)) @@ -234,6 +238,7 @@ func TestSchema(t *testing.T) { assert.False(t, IsIntegerType(schemapb.DataType_FloatVector)) assert.False(t, IsIntegerType(schemapb.DataType_Float16Vector)) assert.False(t, IsIntegerType(schemapb.DataType_BFloat16Vector)) + assert.False(t, IsIntegerType(schemapb.DataType_SparseFloatVector)) assert.False(t, IsFloatingType(schemapb.DataType_Bool)) assert.False(t, IsFloatingType(schemapb.DataType_Int8)) @@ -247,6 +252,21 @@ func TestSchema(t *testing.T) { assert.False(t, IsFloatingType(schemapb.DataType_FloatVector)) assert.False(t, IsFloatingType(schemapb.DataType_Float16Vector)) assert.False(t, IsFloatingType(schemapb.DataType_BFloat16Vector)) + assert.False(t, IsFloatingType(schemapb.DataType_SparseFloatVector)) + + assert.False(t, IsSparseVectorType(schemapb.DataType_Bool)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Int8)) + 
assert.False(t, IsSparseVectorType(schemapb.DataType_Int16)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Int32)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Int64)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Float)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Double)) + assert.False(t, IsSparseVectorType(schemapb.DataType_String)) + assert.False(t, IsSparseVectorType(schemapb.DataType_BinaryVector)) + assert.False(t, IsSparseVectorType(schemapb.DataType_FloatVector)) + assert.False(t, IsSparseVectorType(schemapb.DataType_Float16Vector)) + assert.False(t, IsSparseVectorType(schemapb.DataType_BFloat16Vector)) + assert.True(t, IsSparseVectorType(schemapb.DataType_SparseFloatVector)) }) } @@ -285,6 +305,35 @@ func TestSchema_GetVectorFieldSchema(t *testing.T) { assert.Equal(t, "field_float_vector", fieldSchema[0].Name) }) + schemaSparse := &schemapb.CollectionSchema{ + Name: "testColl", + Description: "", + AutoID: false, + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "field_int64", + IsPrimaryKey: true, + Description: "", + DataType: 5, + }, + { + FieldID: 107, + Name: "field_sparse_float_vector", + IsPrimaryKey: false, + Description: "", + DataType: 104, + TypeParams: []*commonpb.KeyValuePair{}, + }, + }, + } + + t.Run("GetSparseFloatVectorFieldSchema", func(t *testing.T) { + fieldSchema := GetVectorFieldSchemas(schemaSparse) + assert.Equal(t, 1, len(fieldSchema)) + assert.Equal(t, "field_sparse_float_vector", fieldSchema[0].Name) + }) + schemaInvalid := &schemapb.CollectionSchema{ Name: "testColl", Description: "", @@ -655,6 +704,23 @@ func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType, }, FieldId: fieldID, } + case schemapb.DataType_SparseFloatVector: + fieldData = &schemapb.FieldData{ + Type: schemapb.DataType_SparseFloatVector, + FieldName: fieldName, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: dim, + Data: 
&schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: dim, + Contents: [][]byte{fieldValue.([]byte)}, + }, + }, + }, + }, + FieldId: fieldID, + } case schemapb.DataType_Array: fieldData = &schemapb.FieldData{ Type: schemapb.DataType_Array, @@ -696,27 +762,29 @@ func genFieldData(fieldName string, fieldID int64, fieldType schemapb.DataType, func TestAppendFieldData(t *testing.T) { const ( - Dim = 8 - BoolFieldName = "BoolField" - Int32FieldName = "Int32Field" - Int64FieldName = "Int64Field" - FloatFieldName = "FloatField" - DoubleFieldName = "DoubleField" - BinaryVectorFieldName = "BinaryVectorField" - FloatVectorFieldName = "FloatVectorField" - Float16VectorFieldName = "Float16VectorField" - BFloat16VectorFieldName = "BFloat16VectorField" - ArrayFieldName = "ArrayField" - BoolFieldID = common.StartOfUserFieldID + 1 - Int32FieldID = common.StartOfUserFieldID + 2 - Int64FieldID = common.StartOfUserFieldID + 3 - FloatFieldID = common.StartOfUserFieldID + 4 - DoubleFieldID = common.StartOfUserFieldID + 5 - BinaryVectorFieldID = common.StartOfUserFieldID + 6 - FloatVectorFieldID = common.StartOfUserFieldID + 7 - Float16VectorFieldID = common.StartOfUserFieldID + 8 - BFloat16VectorFieldID = common.StartOfUserFieldID + 9 - ArrayFieldID = common.StartOfUserFieldID + 10 + Dim = 8 + BoolFieldName = "BoolField" + Int32FieldName = "Int32Field" + Int64FieldName = "Int64Field" + FloatFieldName = "FloatField" + DoubleFieldName = "DoubleField" + BinaryVectorFieldName = "BinaryVectorField" + FloatVectorFieldName = "FloatVectorField" + Float16VectorFieldName = "Float16VectorField" + BFloat16VectorFieldName = "BFloat16VectorField" + ArrayFieldName = "ArrayField" + SparseFloatVectorFieldName = "SparseFloatVectorField" + BoolFieldID = common.StartOfUserFieldID + 1 + Int32FieldID = common.StartOfUserFieldID + 2 + Int64FieldID = common.StartOfUserFieldID + 3 + FloatFieldID = common.StartOfUserFieldID + 4 + DoubleFieldID = 
common.StartOfUserFieldID + 5 + BinaryVectorFieldID = common.StartOfUserFieldID + 6 + FloatVectorFieldID = common.StartOfUserFieldID + 7 + Float16VectorFieldID = common.StartOfUserFieldID + 8 + BFloat16VectorFieldID = common.StartOfUserFieldID + 9 + ArrayFieldID = common.StartOfUserFieldID + 10 + SparseFloatVectorFieldID = common.StartOfUserFieldID + 11 ) BoolArray := []bool{true, false} Int32Array := []int32{1, 2} @@ -749,8 +817,15 @@ func TestAppendFieldData(t *testing.T) { }, }, } + SparseFloatVector := &schemapb.SparseFloatArray{ + Dim: 231, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + }, + } - result := make([]*schemapb.FieldData, 10) + result := make([]*schemapb.FieldData, 11) var fieldDataArray1 []*schemapb.FieldData fieldDataArray1 = append(fieldDataArray1, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[0:1], 1)) fieldDataArray1 = append(fieldDataArray1, genFieldData(Int32FieldName, Int32FieldID, schemapb.DataType_Int32, Int32Array[0:1], 1)) @@ -762,6 +837,7 @@ func TestAppendFieldData(t *testing.T) { fieldDataArray1 = append(fieldDataArray1, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[0:Dim*2], Dim)) fieldDataArray1 = append(fieldDataArray1, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[0:Dim*2], Dim)) fieldDataArray1 = append(fieldDataArray1, genFieldData(ArrayFieldName, ArrayFieldID, schemapb.DataType_Array, ArrayArray[0:1], 1)) + fieldDataArray1 = append(fieldDataArray1, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[0], SparseFloatVector.Dim)) var fieldDataArray2 []*schemapb.FieldData fieldDataArray2 = append(fieldDataArray2, genFieldData(BoolFieldName, BoolFieldID, 
schemapb.DataType_Bool, BoolArray[1:2], 1)) @@ -774,6 +850,7 @@ func TestAppendFieldData(t *testing.T) { fieldDataArray2 = append(fieldDataArray2, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[2*Dim:4*Dim], Dim)) fieldDataArray2 = append(fieldDataArray2, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[2*Dim:4*Dim], Dim)) fieldDataArray2 = append(fieldDataArray2, genFieldData(ArrayFieldName, ArrayFieldID, schemapb.DataType_Array, ArrayArray[1:2], 1)) + fieldDataArray2 = append(fieldDataArray2, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[1], SparseFloatVector.Dim)) AppendFieldData(result, fieldDataArray1, 0) AppendFieldData(result, fieldDataArray2, 0) @@ -788,21 +865,23 @@ func TestAppendFieldData(t *testing.T) { assert.Equal(t, Float16Vector, result[7].GetVectors().Data.(*schemapb.VectorField_Float16Vector).Float16Vector) assert.Equal(t, BFloat16Vector, result[8].GetVectors().Data.(*schemapb.VectorField_Bfloat16Vector).Bfloat16Vector) assert.Equal(t, ArrayArray, result[9].GetScalars().GetArrayData().Data) + assert.Equal(t, SparseFloatVector, result[10].GetVectors().GetSparseFloatVector()) } func TestDeleteFieldData(t *testing.T) { const ( - Dim = 8 - BoolFieldName = "BoolField" - Int32FieldName = "Int32Field" - Int64FieldName = "Int64Field" - FloatFieldName = "FloatField" - DoubleFieldName = "DoubleField" - JSONFieldName = "JSONField" - BinaryVectorFieldName = "BinaryVectorField" - FloatVectorFieldName = "FloatVectorField" - Float16VectorFieldName = "Float16VectorField" - BFloat16VectorFieldName = "BFloat16VectorField" + Dim = 8 + BoolFieldName = "BoolField" + Int32FieldName = "Int32Field" + Int64FieldName = "Int64Field" + FloatFieldName = "FloatField" + DoubleFieldName = "DoubleField" + JSONFieldName = "JSONField" + BinaryVectorFieldName = "BinaryVectorField" 
+ FloatVectorFieldName = "FloatVectorField" + Float16VectorFieldName = "Float16VectorField" + BFloat16VectorFieldName = "BFloat16VectorField" + SparseFloatVectorFieldName = "SparseFloatVectorField" ) const ( @@ -816,6 +895,7 @@ func TestDeleteFieldData(t *testing.T) { FloatVectorFieldID Float16VectorFieldID BFloat16VectorFieldID + SparseFloatVectorFieldID ) BoolArray := []bool{true, false} Int32Array := []int32{1, 2} @@ -833,9 +913,16 @@ func TestDeleteFieldData(t *testing.T) { 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, } + SparseFloatVector := &schemapb.SparseFloatArray{ + Dim: 231, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + }, + } - result1 := make([]*schemapb.FieldData, 10) - result2 := make([]*schemapb.FieldData, 10) + result1 := make([]*schemapb.FieldData, 11) + result2 := make([]*schemapb.FieldData, 11) var fieldDataArray1 []*schemapb.FieldData fieldDataArray1 = append(fieldDataArray1, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[0:1], 1)) fieldDataArray1 = append(fieldDataArray1, genFieldData(Int32FieldName, Int32FieldID, schemapb.DataType_Int32, Int32Array[0:1], 1)) @@ -847,6 +934,7 @@ func TestDeleteFieldData(t *testing.T) { fieldDataArray1 = append(fieldDataArray1, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[0:Dim], Dim)) fieldDataArray1 = append(fieldDataArray1, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[0:2*Dim], Dim)) fieldDataArray1 = append(fieldDataArray1, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[0:2*Dim], Dim)) + fieldDataArray1 = 
append(fieldDataArray1, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[0], SparseFloatVector.Dim)) var fieldDataArray2 []*schemapb.FieldData fieldDataArray2 = append(fieldDataArray2, genFieldData(BoolFieldName, BoolFieldID, schemapb.DataType_Bool, BoolArray[1:2], 1)) @@ -859,6 +947,7 @@ func TestDeleteFieldData(t *testing.T) { fieldDataArray2 = append(fieldDataArray2, genFieldData(FloatVectorFieldName, FloatVectorFieldID, schemapb.DataType_FloatVector, FloatVector[Dim:2*Dim], Dim)) fieldDataArray2 = append(fieldDataArray2, genFieldData(Float16VectorFieldName, Float16VectorFieldID, schemapb.DataType_Float16Vector, Float16Vector[2*Dim:4*Dim], Dim)) fieldDataArray2 = append(fieldDataArray2, genFieldData(BFloat16VectorFieldName, BFloat16VectorFieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector[2*Dim:4*Dim], Dim)) + fieldDataArray2 = append(fieldDataArray2, genFieldData(SparseFloatVectorFieldName, SparseFloatVectorFieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[1], SparseFloatVector.Dim)) AppendFieldData(result1, fieldDataArray1, 0) AppendFieldData(result1, fieldDataArray2, 0) @@ -873,6 +962,9 @@ func TestDeleteFieldData(t *testing.T) { assert.Equal(t, FloatVector[0:Dim], result1[FloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetFloatVector().Data) assert.Equal(t, Float16Vector[0:2*Dim], result1[Float16VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Float16Vector).Float16Vector) assert.Equal(t, BFloat16Vector[0:2*Dim], result1[BFloat16VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Bfloat16Vector).Bfloat16Vector) + tmpSparseFloatVector := proto.Clone(SparseFloatVector).(*schemapb.SparseFloatArray) + tmpSparseFloatVector.Contents = [][]byte{SparseFloatVector.Contents[0]} + assert.Equal(t, tmpSparseFloatVector, 
result1[SparseFloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetSparseFloatVector()) AppendFieldData(result2, fieldDataArray2, 0) AppendFieldData(result2, fieldDataArray1, 0) @@ -887,6 +979,9 @@ func TestDeleteFieldData(t *testing.T) { assert.Equal(t, FloatVector[Dim:2*Dim], result2[FloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetFloatVector().Data) assert.Equal(t, Float16Vector[2*Dim:4*Dim], result2[Float16VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Float16Vector).Float16Vector) assert.Equal(t, BFloat16Vector[2*Dim:4*Dim], result2[BFloat16VectorFieldID-common.StartOfUserFieldID].GetVectors().Data.(*schemapb.VectorField_Bfloat16Vector).Bfloat16Vector) + tmpSparseFloatVector = proto.Clone(SparseFloatVector).(*schemapb.SparseFloatArray) + tmpSparseFloatVector.Contents = [][]byte{SparseFloatVector.Contents[1]} + assert.Equal(t, tmpSparseFloatVector, result2[SparseFloatVectorFieldID-common.StartOfUserFieldID].GetVectors().GetSparseFloatVector()) } func TestGetPrimaryFieldSchema(t *testing.T) { @@ -1234,6 +1329,13 @@ func TestGetDataAndGetDataSize(t *testing.T) { 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x00, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, 0x77, } + SparseFloatVector := &schemapb.SparseFloatArray{ + Dim: 231, + Contents: [][]byte{ + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + }, + } boolData := genFieldData(fieldName, fieldID, schemapb.DataType_Bool, BoolArray, 1) int8Data := genFieldData(fieldName, fieldID, schemapb.DataType_Int8, Int8Array, 1) @@ -1247,6 +1349,7 @@ func TestGetDataAndGetDataSize(t *testing.T) { floatVecData := genFieldData(fieldName, fieldID, schemapb.DataType_FloatVector, FloatVector, Dim) float16VecData := genFieldData(fieldName, fieldID, 
schemapb.DataType_Float16Vector, Float16Vector, Dim) bfloat16VecData := genFieldData(fieldName, fieldID, schemapb.DataType_BFloat16Vector, BFloat16Vector, Dim) + sparseFloatData := genFieldData(fieldName, fieldID, schemapb.DataType_SparseFloatVector, SparseFloatVector.Contents[0], SparseFloatVector.Dim) invalidData := &schemapb.FieldData{ Type: schemapb.DataType_None, } @@ -1272,6 +1375,7 @@ func TestGetDataAndGetDataSize(t *testing.T) { floatVecDataRes := GetData(floatVecData, 0) float16VecDataRes := GetData(float16VecData, 0) bfloat16VecDataRes := GetData(bfloat16VecData, 0) + sparseFloatDataRes := GetData(sparseFloatData, 0) invalidDataRes := GetData(invalidData, 0) assert.Equal(t, BoolArray[0], boolDataRes) @@ -1286,11 +1390,23 @@ func TestGetDataAndGetDataSize(t *testing.T) { assert.ElementsMatch(t, FloatVector[:Dim], floatVecDataRes) assert.ElementsMatch(t, Float16Vector[:2*Dim], float16VecDataRes) assert.ElementsMatch(t, BFloat16Vector[:2*Dim], bfloat16VecDataRes) + assert.Equal(t, SparseFloatVector.Contents[0], sparseFloatDataRes) assert.Nil(t, invalidDataRes) }) } func TestMergeFieldData(t *testing.T) { + sparseFloatRows := [][]byte{ + // 3 rows for dst + testutils.CreateSparseFloatRow([]uint32{30, 41, 52}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{60, 80, 230}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{300, 410, 520}, []float32{1.1, 1.2, 1.3}), + // 3 rows for src + testutils.CreateSparseFloatRow([]uint32{600, 800, 2300}, []float32{2.1, 2.2, 2.3}), + testutils.CreateSparseFloatRow([]uint32{90, 141, 352}, []float32{1.1, 1.2, 1.3}), + testutils.CreateSparseFloatRow([]uint32{160, 280, 340}, []float32{2.1, 2.2, 2.3}), + } + t.Run("merge data", func(t *testing.T) { dstFields := []*schemapb.FieldData{ genFieldData("int64", 100, schemapb.DataType_Int64, []int64{1, 2, 3}, 1), @@ -1329,6 +1445,22 @@ func TestMergeFieldData(t *testing.T) { }, FieldId: 105, }, + { + Type: schemapb.DataType_SparseFloatVector, + 
FieldName: "sparseFloat", + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 521, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 521, + Contents: sparseFloatRows[:3], + }, + }, + }, + }, + FieldId: 106, + }, } srcFields := []*schemapb.FieldData{ @@ -1372,6 +1504,22 @@ func TestMergeFieldData(t *testing.T) { }, FieldId: 105, }, + { + Type: schemapb.DataType_SparseFloatVector, + FieldName: "sparseFloat", + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 2301, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 2301, + Contents: sparseFloatRows[3:], + }, + }, + }, + }, + FieldId: 106, + }, } err := MergeFieldData(dstFields, srcFields) @@ -1400,6 +1548,10 @@ func TestMergeFieldData(t *testing.T) { dstFields[3].GetScalars().GetArrayData().Data) assert.Equal(t, [][]byte{[]byte("hoo"), []byte("foo")}, dstFields[4].GetScalars().GetBytesData().Data) assert.Equal(t, [][]byte{[]byte("hello"), []byte("world"), []byte("hoo")}, dstFields[5].GetScalars().GetBytesData().Data) + assert.Equal(t, &schemapb.SparseFloatArray{ + Dim: 2301, + Contents: sparseFloatRows, + }, dstFields[6].GetVectors().GetSparseFloatVector()) }) t.Run("merge with nil", func(t *testing.T) { @@ -1416,6 +1568,22 @@ func TestMergeFieldData(t *testing.T) { }, }, }, 1), + { + Type: schemapb.DataType_SparseFloatVector, + FieldName: "sparseFloat", + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 521, + Data: &schemapb.VectorField_SparseFloatVector{ + SparseFloatVector: &schemapb.SparseFloatArray{ + Dim: 521, + Contents: sparseFloatRows[:3], + }, + }, + }, + }, + FieldId: 104, + }, } dstFields := []*schemapb.FieldData{ @@ -1423,6 +1591,7 @@ func TestMergeFieldData(t *testing.T) { {Type: schemapb.DataType_FloatVector, FieldName: "vector", Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Data: 
&schemapb.VectorField_FloatVector{}}}, FieldId: 101}, {Type: schemapb.DataType_JSON, FieldName: "json", Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_JsonData{}}}, FieldId: 102}, {Type: schemapb.DataType_Array, FieldName: "array", Field: &schemapb.FieldData_Scalars{Scalars: &schemapb.ScalarField{Data: &schemapb.ScalarField_ArrayData{}}}, FieldId: 103}, + {Type: schemapb.DataType_SparseFloatVector, FieldName: "sparseFloat", Field: &schemapb.FieldData_Vectors{Vectors: &schemapb.VectorField{Data: &schemapb.VectorField_SparseFloatVector{}}}, FieldId: 104}, } err := MergeFieldData(dstFields, srcFields) @@ -1442,6 +1611,10 @@ func TestMergeFieldData(t *testing.T) { }, }, dstFields[3].GetScalars().GetArrayData().Data) + assert.Equal(t, &schemapb.SparseFloatArray{ + Dim: 521, + Contents: sparseFloatRows[:3], + }, dstFields[4].GetVectors().GetSparseFloatVector()) }) t.Run("error case", func(t *testing.T) { @@ -1755,8 +1928,80 @@ func (s *FieldDataSuite) TestPrepareFieldData() { s.EqualValues(128, field.GetVectors().GetDim()) s.EqualValues(topK*128/8, cap(field.GetVectors().GetBinaryVector())) }) + + s.Run("sparse_float_vector", func() { + samples := []*schemapb.FieldData{ + { + FieldId: fieldID, + FieldName: fieldName, + Type: schemapb.DataType_SparseFloatVector, + Field: &schemapb.FieldData_Vectors{ + Vectors: &schemapb.VectorField{ + Dim: 128, + Data: &schemapb.VectorField_SparseFloatVector{}, + }, + }, + }, + } + + fields := PrepareResultFieldData(samples, topK) + s.Require().Len(fields, 1) + field := fields[0] + s.Equal(fieldID, field.GetFieldId()) + s.Equal(fieldName, field.GetFieldName()) + s.Equal(schemapb.DataType_SparseFloatVector, field.GetType()) + + s.EqualValues(0, field.GetVectors().GetDim()) + s.EqualValues(topK, cap(field.GetVectors().GetSparseFloatVector().GetContents())) + }) } func TestFieldData(t *testing.T) { suite.Run(t, new(FieldDataSuite)) } + +func TestValidateSparseFloatRows(t *testing.T) { + 
t.Run("valid rows", func(t *testing.T) { + rows := [][]byte{ + testutils.CreateSparseFloatRow([]uint32{1, 3, 5}, []float32{1.0, 2.0, 3.0}), + testutils.CreateSparseFloatRow([]uint32{2, 4, 6}, []float32{4.0, 5.0, 6.0}), + testutils.CreateSparseFloatRow([]uint32{0, 7, 8}, []float32{7.0, 8.0, 9.0}), + } + err := ValidateSparseFloatRows(rows...) + assert.NoError(t, err) + }) + + t.Run("nil row", func(t *testing.T) { + err := ValidateSparseFloatRows(nil) + assert.Error(t, err) + }) + + t.Run("incorrect lengths", func(t *testing.T) { + rows := [][]byte{ + make([]byte, 10), + } + err := ValidateSparseFloatRows(rows...) + assert.Error(t, err) + }) + + t.Run("unordered index", func(t *testing.T) { + rows := [][]byte{ + testutils.CreateSparseFloatRow([]uint32{100, 2000, 500}, []float32{1.0, 2.0, 3.0}), + } + err := ValidateSparseFloatRows(rows...) + assert.Error(t, err) + }) + + t.Run("empty indices or values", func(t *testing.T) { + rows := [][]byte{ + testutils.CreateSparseFloatRow([]uint32{}, []float32{}), + } + err := ValidateSparseFloatRows(rows...) + assert.Error(t, err) + }) + + t.Run("no rows", func(t *testing.T) { + err := ValidateSparseFloatRows() + assert.NoError(t, err) + }) +}