Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
  • Loading branch information
shaoting-huang committed Jul 3, 2024
1 parent ec2e083 commit 519ac3e
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 34 deletions.
50 changes: 19 additions & 31 deletions internal/storage/serde_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,11 +792,11 @@ func NewDeltalogMultiFieldWriter(partitionID, segmentID UniqueID, eventWriter *M
arr := []arrow.Array{builder.Field(0).NewArray(), builder.Field(1).NewArray()}

field2Col := map[FieldID]int{
common.RowIDField: 0,
common.DeltaPkField: 0,
common.TimeStampField: 1,
}
schema := map[FieldID]schemapb.DataType{
common.RowIDField: pkType,
common.DeltaPkField: pkType,
common.TimeStampField: schemapb.DataType_Int64,
}
return newSimpleArrowRecord(array.NewRecord(arrowSchema, arr, int64(len(v))), schema, field2Col), memorySize, nil
Expand All @@ -809,41 +809,39 @@ func NewDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog],
return nil, err
}
return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
for i := 0; i < r.Len(); i++ {
if v[i] == nil {
v[i] = &DeleteLog{}
}
}
rec, ok := r.(*simpleArrowRecord)
if !ok {
return fmt.Errorf("can not cast to simple arrow record")
}
for i, fid := range rec.r.Schema().Fields() {
a := r.Column(int64(i))

fields := rec.r.Schema().Fields()
for i, fid := range fields {
switch fid.Name {
case "pk":
switch fid.Type.ID() {
case arrow.INT64:
arr := a.(*array.Int64)
for i := 0; i < a.Len(); i++ {
v[i].Pk = &Int64PrimaryKey{Value: arr.Value(i)}
v[i].PkType = int64(schemapb.DataType_Int64)
arr := r.Column(int64(i)).(*array.Int64)
for j := 0; j < r.Len(); j++ {
if v[j] == nil {
v[j] = &DeleteLog{}
}
v[j].Pk = NewInt64PrimaryKey(arr.Value(j))
}
case arrow.STRING:
arr := a.(*array.String)
for i := 0; i < a.Len(); i++ {
v[i].Pk = &VarCharPrimaryKey{Value: arr.Value(i)}
v[i].PkType = int64(schemapb.DataType_VarChar)
arr := r.Column(int64(i)).(*array.String)
for j := 0; j < r.Len(); j++ {
if v[j] == nil {
v[j] = &DeleteLog{}
}
v[j].Pk = NewVarCharPrimaryKey(arr.Value(j))
}
default:
return fmt.Errorf("unexpected delta log pkType %v", fid.Type.Name())
}

case "ts":
arr := a.(*array.Int64)
for i := 0; i < arr.Len(); i++ {
v[i].Ts = uint64(arr.Value(i))
arr := r.Column(int64(i)).(*array.Int64)
for j := 0; j < r.Len(); j++ {
v[j].Ts = uint64(arr.Value(j))
}
default:
return fmt.Errorf("unexpected delta log field: %v", fid.Name)
Expand Down Expand Up @@ -872,16 +870,6 @@ func supportMultiFieldFormat(blobs []*Blob) bool {
if err != nil {
return false
}
er, err := reader.NextEventReader()
defer er.Close()
if err != nil {
return false
}
rr, err := er.GetArrowRecordReader()
defer rr.Release()
if err != nil {
return false
}
version := reader.descriptorEventData.Extras[version]
return version != nil && version.(string) == MULTI_FIELD
}
Expand Down
6 changes: 3 additions & 3 deletions internal/storage/serde_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestDeltalogPkTsSeparateFormat(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, []*schemapb.FieldSchema{
{FieldID: common.RowIDField, Name: "pk", DataType: tc.pkType},
{FieldID: common.DeltaPkField, Name: "pk", DataType: tc.pkType},
{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
})
writer, err := NewDeltalogMultiFieldWriter(0, 0, eventWriter, 7)
Expand Down Expand Up @@ -506,7 +506,7 @@ func BenchmarkDeltalogFormat(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, []*schemapb.FieldSchema{
{FieldID: common.RowIDField, Name: "pk", DataType: schemapb.DataType_Int64},
{FieldID: common.DeltaPkField, Name: "pk", DataType: schemapb.DataType_Int64},
{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
})
writer, _ := NewDeltalogMultiFieldWriter(0, 0, eventWriter, size)
Expand All @@ -524,7 +524,7 @@ func BenchmarkDeltalogFormat(b *testing.B) {

func generateTestDeltalogNewFormatData(size int) (*Blob, error) {
eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, []*schemapb.FieldSchema{
{FieldID: common.RowIDField, Name: "pk", DataType: schemapb.DataType_Int64},
{FieldID: common.DeltaPkField, Name: "pk", DataType: schemapb.DataType_Int64},
{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
})
writer, err := NewDeltalogMultiFieldWriter(0, 0, eventWriter, size)
Expand Down
3 changes: 3 additions & 0 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ const (
// StartOfUserFieldID represents the starting ID of the user-defined field
StartOfUserFieldID = 100

// DeltaPkField is the ID of the primary key field for delta log
DeltaPkField = -1

// RowIDField is the ID of the RowID field reserved by the system
RowIDField = 0

Expand Down

0 comments on commit 519ac3e

Please sign in to comment.