From 9f76b94625305b1a7fc60c9ee6c7b781246da730 Mon Sep 17 00:00:00 2001 From: shaoting-huang Date: Tue, 25 Jun 2024 12:03:57 +0800 Subject: [PATCH] clean up Signed-off-by: shaoting-huang --- internal/storage/serde_events.go | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go index 5de7e2ad5b95..0dffeba9ff83 100644 --- a/internal/storage/serde_events.go +++ b/internal/storage/serde_events.go @@ -430,25 +430,24 @@ func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) { if dsw.rw != nil { return dsw.rw, nil } - var dataType arrow.DataType switch dsw.fieldSchema.DataType { - case schemapb.DataType_Int64: - dataType = &arrow.Int64Type{} - case schemapb.DataType_String: - dataType = arrow.BinaryTypes.String + case schemapb.DataType_Int64, schemapb.DataType_VarChar: + dim, _ := typeutil.GetDim(dsw.fieldSchema) + rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{ + Name: dsw.fieldSchema.Name, + Type: serdeMap[dsw.fieldSchema.DataType].arrowType(int(dim)), + Nullable: false, // No nullable check here. + }, &dsw.buf) + if err != nil { + return nil, err + } + dsw.rw = rw + return rw, nil default: - return nil, fmt.Errorf("does not support data type") + return nil, merr.WrapErrServiceInternal(fmt.Sprintf( + "does not support delta log primary key data type %s", + dsw.fieldSchema.DataType.String())) } - rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{ - Name: dsw.fieldSchema.Name, - Type: dataType, - Nullable: false, - }, &dsw.buf) - if err != nil { - return nil, err - } - dsw.rw = rw - return rw, nil } func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {