Skip to content

Commit

Permalink
enhance: support upsert autoid==true (#30342)
Browse files Browse the repository at this point in the history
related with: #29258

---------

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
  • Loading branch information
smellthemoon and lixinguo committed Jul 11, 2024
1 parent e411548 commit 07b94b4
Show file tree
Hide file tree
Showing 8 changed files with 620 additions and 279 deletions.
55 changes: 41 additions & 14 deletions internal/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func TestProxy(t *testing.T) {
}
}

constructCollectionUpsertRequest := func() *milvuspb.UpsertRequest {
constructCollectionUpsertRequestNoPK := func() *milvuspb.UpsertRequest {
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
hashKeys := testutils.GenerateHashKeys(rowNum)
Expand All @@ -642,6 +642,22 @@ func TestProxy(t *testing.T) {
}
}

constructCollectionUpsertRequestWithPK := func() *milvuspb.UpsertRequest {
pkFieldData := newScalarFieldData(schema.Fields[0], int64Field, rowNum)
fVecColumn := newFloatVectorFieldData(floatVecField, rowNum, dim)
bVecColumn := newBinaryVectorFieldData(binaryVecField, rowNum, dim)
hashKeys := testutils.GenerateHashKeys(rowNum)
return &milvuspb.UpsertRequest{
Base: nil,
DbName: dbName,
CollectionName: collectionName,
PartitionName: partitionName,
FieldsData: []*schemapb.FieldData{pkFieldData, fVecColumn, bVecColumn},
HashKeys: hashKeys,
NumRows: uint32(rowNum),
}
}

constructCreateIndexRequest := func(dataType schemapb.DataType) *milvuspb.CreateIndexRequest {
req := &milvuspb.CreateIndexRequest{
Base: nil,
Expand Down Expand Up @@ -2236,6 +2252,30 @@ func TestProxy(t *testing.T) {
assert.NotEqual(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})

wg.Add(1)
t.Run("upsert when autoID == true", func(t *testing.T) {
defer wg.Done()
// autoID==true but not pass pk in upsert, failed
req := constructCollectionUpsertRequestNoPK()

resp, err := proxy.Upsert(ctx, req)
assert.NoError(t, err)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)
assert.Equal(t, 0, len(resp.SuccIndex))
assert.Equal(t, rowNum, len(resp.ErrIndex))
assert.Equal(t, int64(0), resp.UpsertCnt)

// autoID==true and pass pk in upsert, succeed
req = constructCollectionUpsertRequestWithPK()

resp, err = proxy.Upsert(ctx, req)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.Equal(t, rowNum, len(resp.SuccIndex))
assert.Equal(t, 0, len(resp.ErrIndex))
assert.Equal(t, int64(rowNum), resp.UpsertCnt)
})

wg.Add(1)
t.Run("release partition", func(t *testing.T) {
defer wg.Done()
Expand Down Expand Up @@ -2391,19 +2431,6 @@ func TestProxy(t *testing.T) {
assert.NoError(t, err)
})

wg.Add(1)
t.Run("upsert when autoID == true", func(t *testing.T) {
defer wg.Done()
req := constructCollectionUpsertRequest()

resp, err := proxy.Upsert(ctx, req)
assert.NoError(t, err)
assert.ErrorIs(t, merr.Error(resp.GetStatus()), merr.ErrParameterInvalid)
assert.Equal(t, 0, len(resp.SuccIndex))
assert.Equal(t, rowNum, len(resp.ErrIndex))
assert.Equal(t, int64(0), resp.UpsertCnt)
})

wg.Add(1)
t.Run("drop collection", func(t *testing.T) {
defer wg.Done()
Expand Down
2 changes: 1 addition & 1 deletion internal/proxy/task_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (it *insertTask) PreExecute(ctx context.Context) error {
// check primaryFieldData whether autoID is true or not
// set rowIDs as primary data if autoID == true
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
it.result.IDs, err = checkPrimaryFieldData(it.schema, it.result, it.insertMsg, true)
it.result.IDs, err = checkPrimaryFieldData(it.schema, it.insertMsg, true)
log := log.Ctx(ctx).With(zap.String("collectionName", collectionName))
if err != nil {
log.Warn("check primary field data and hash primary key failed",
Expand Down
6 changes: 3 additions & 3 deletions internal/proxy/task_upsert.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,10 @@ func (it *upsertTask) insertPreExecute(ctx context.Context) error {
}
}

// check primaryFieldData whether autoID is true or not
// only allow support autoID == false
// use the passed pk as new pk when autoID == false
// automatic generate pk as new pk wehen autoID == true
var err error
it.result.IDs, err = checkPrimaryFieldData(it.schema.CollectionSchema, it.result, it.upsertMsg.InsertMsg, false)
it.result.IDs, err = checkPrimaryFieldData(it.schema.CollectionSchema, it.upsertMsg.InsertMsg, false)
log := log.Ctx(ctx).With(zap.String("collectionName", it.upsertMsg.InsertMsg.CollectionName))
if err != nil {
log.Warn("check primary field data and hash primary key failed when upsert",
Expand Down
65 changes: 40 additions & 25 deletions internal/proxy/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ func isPartitionLoaded(ctx context.Context, qc types.QueryCoordClient, collID in
return false, nil
}

func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg) error {
func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg, inInsert bool) error {
log := log.With(zap.String("collection", schema.GetName()))
primaryKeyNum := 0
autoGenFieldNum := 0
Expand All @@ -1142,16 +1142,20 @@ func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgst
log.Warn("not primary key field, but set autoID true", zap.String("field", fieldSchema.GetName()))
return merr.WrapErrParameterInvalidMsg("only primary key could be with AutoID enabled")
}

if fieldSchema.IsPrimaryKey {
primaryKeyNum++
}
if fieldSchema.GetDefaultValue() != nil && fieldSchema.IsPrimaryKey {
return merr.WrapErrParameterInvalidMsg("primary key can't be with default value")
}
if fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && inInsert {
// when inInsert, no need to pass when pk is autoid and SkipAutoIDCheck is false
autoGenFieldNum++
}
if _, ok := dataNameSet[fieldSchema.GetName()]; !ok {
if fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() {
// no need to pass when pk is autoid and SkipAutoIDCheck is false
autoGenFieldNum++
if fieldSchema.IsPrimaryKey && fieldSchema.AutoID && !Params.ProxyCfg.SkipAutoIDCheck.GetAsBool() && inInsert {
// autoGenField
continue
}
if fieldSchema.GetDefaultValue() == nil && !fieldSchema.GetNullable() {
Expand All @@ -1175,7 +1179,8 @@ func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgst
}

expectedNum := len(schema.Fields)
actualNum := autoGenFieldNum + len(insertMsg.FieldsData)
actualNum := len(insertMsg.FieldsData) + autoGenFieldNum

if expectedNum != actualNum {
log.Warn("the number of fields is not the same as needed", zap.Int("expected", expectedNum), zap.Int("actual", actualNum))
return merr.WrapErrParameterInvalid(expectedNum, actualNum, "more fieldData has pass in")
Expand All @@ -1184,20 +1189,21 @@ func checkFieldsDataBySchema(schema *schemapb.CollectionSchema, insertMsg *msgst
return nil
}

func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.MutationResult, insertMsg *msgstream.InsertMsg, inInsert bool) (*schemapb.IDs, error) {
func checkPrimaryFieldData(schema *schemapb.CollectionSchema, insertMsg *msgstream.InsertMsg, inInsert bool) (*schemapb.IDs, error) {
log := log.With(zap.String("collectionName", insertMsg.CollectionName))
rowNums := uint32(insertMsg.NRows())
// TODO(dragondriver): in fact, NumRows is not trustable, we should check all input fields
if insertMsg.NRows() <= 0 {
return nil, merr.WrapErrParameterInvalid("invalid num_rows", fmt.Sprint(rowNums), "num_rows should be greater than 0")
}

if err := checkFieldsDataBySchema(schema, insertMsg); err != nil {
if err := checkFieldsDataBySchema(schema, insertMsg, inInsert); err != nil {
return nil, err
}

primaryFieldSchema, err := typeutil.GetPrimaryFieldSchema(schema)
if err != nil {
log.Error("get primary field schema failed", zap.String("collectionName", insertMsg.CollectionName), zap.Any("schema", schema), zap.Error(err))
log.Error("get primary field schema failed", zap.Any("schema", schema), zap.Error(err))
return nil, err
}
if primaryFieldSchema.GetNullable() {
Expand All @@ -1215,45 +1221,54 @@ func checkPrimaryFieldData(schema *schemapb.CollectionSchema, result *milvuspb.M
if !primaryFieldSchema.AutoID || skipAutoIDCheck {
primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
if err != nil {
log.Info("get primary field data failed", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
log.Info("get primary field data failed", zap.Error(err))
return nil, err
}
} else {
// check primary key data not exist
if typeutil.IsPrimaryFieldDataExist(insertMsg.GetFieldsData(), primaryFieldSchema) {
return nil, fmt.Errorf("can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name)
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("can not assign primary field data when auto id enabled %v", primaryFieldSchema.Name))
}
// if autoID == true, currently support autoID for int64 and varchar PrimaryField
primaryFieldData, err = autoGenPrimaryFieldData(primaryFieldSchema, insertMsg.GetRowIDs())
if err != nil {
log.Info("generate primary field data failed when autoID == true", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
log.Info("generate primary field data failed when autoID == true", zap.Error(err))
return nil, err
}
// if autoID == true, set the primary field data
// insertMsg.fieldsData need append primaryFieldData
insertMsg.FieldsData = append(insertMsg.FieldsData, primaryFieldData)
}
} else {
// when checkPrimaryFieldData in upsert
if primaryFieldSchema.AutoID {
// upsert has not supported when autoID == true
log.Info("can not upsert when auto id enabled",
zap.String("primaryFieldSchemaName", primaryFieldSchema.Name))
err := merr.WrapErrParameterInvalidMsg(fmt.Sprintf("upsert can not assign primary field data when auto id enabled %v", primaryFieldSchema.GetName()))
result.Status = merr.Status(err)
return nil, err
}
primaryFieldData, err = typeutil.GetPrimaryFieldData(insertMsg.GetFieldsData(), primaryFieldSchema)
if err != nil {
log.Error("get primary field data failed when upsert", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
return nil, err
primaryFieldID := primaryFieldSchema.FieldID
primaryFieldName := primaryFieldSchema.Name
for i, field := range insertMsg.GetFieldsData() {
if field.FieldId == primaryFieldID || field.FieldName == primaryFieldName {
primaryFieldData = field
if primaryFieldSchema.AutoID {
// use the passed pk as new pk when autoID == false
// automatic generate pk as new pk wehen autoID == true
newPrimaryFieldData, err := autoGenPrimaryFieldData(primaryFieldSchema, insertMsg.GetRowIDs())
if err != nil {
log.Info("generate new primary field data failed when upsert", zap.Error(err))
return nil, err
}
insertMsg.FieldsData = append(insertMsg.GetFieldsData()[:i], insertMsg.GetFieldsData()[i+1:]...)
insertMsg.FieldsData = append(insertMsg.FieldsData, newPrimaryFieldData)
}
break
}
}
// must assign primary field data when upsert
if primaryFieldData == nil {
return nil, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("must assign pk when upsert, primary field: %v", primaryFieldName))
}
}

// parse primaryFieldData to result.IDs, and as returned primary keys
ids, err := parsePrimaryFieldData2IDs(primaryFieldData)
if err != nil {
log.Warn("parse primary field data to IDs failed", zap.String("collectionName", insertMsg.CollectionName), zap.Error(err))
log.Warn("parse primary field data to IDs failed", zap.Error(err))
return nil, err
}

Expand Down
Loading

0 comments on commit 07b94b4

Please sign in to comment.