From ef3ced81385fa77acecd79d82bd48d5f9348b3e3 Mon Sep 17 00:00:00 2001 From: smellthemoon <64083300+smellthemoon@users.noreply.github.com> Date: Mon, 1 Jul 2024 16:38:06 +0800 Subject: [PATCH] fix: descriptor event in previous version not has nullable to parse error (#34235) #34176 --------- Signed-off-by: lixinguo Co-authored-by: lixinguo --- internal/core/src/common/Consts.h | 1 + internal/core/src/storage/DataCodec.cpp | 3 ++- internal/core/src/storage/Event.cpp | 27 ++++++++++++++------- internal/core/src/storage/Event.h | 5 ++-- internal/core/src/storage/InsertData.cpp | 3 +-- internal/storage/binlog_reader.go | 7 ++++-- internal/storage/binlog_test.go | 30 ------------------------ internal/storage/binlog_writer.go | 3 ++- internal/storage/event_data.go | 26 ++++++++++++++++++-- internal/storage/event_test.go | 28 +++++++++++++--------- 10 files changed, 72 insertions(+), 61 deletions(-) diff --git a/internal/core/src/common/Consts.h b/internal/core/src/common/Consts.h index e5ee8d6765ba..5ccf8e8b4ee7 100644 --- a/internal/core/src/common/Consts.h +++ b/internal/core/src/common/Consts.h @@ -35,6 +35,7 @@ const milvus::FieldId TimestampFieldID = milvus::FieldId(1); // fill followed extra info to binlog file const char ORIGIN_SIZE_KEY[] = "original_size"; const char INDEX_BUILD_ID_KEY[] = "indexBuildID"; +const char NULLABLE[] = "nullable"; const char INDEX_ROOT_PATH[] = "index_files"; const char RAWDATA_ROOT_PATH[] = "raw_datas"; diff --git a/internal/core/src/storage/DataCodec.cpp b/internal/core/src/storage/DataCodec.cpp index 3d596afec308..3d7af86051f1 100644 --- a/internal/core/src/storage/DataCodec.cpp +++ b/internal/core/src/storage/DataCodec.cpp @@ -79,7 +79,8 @@ DeserializeRemoteFileData(BinlogReaderPtr reader) { auto& extras = descriptor_event.event_data.extras; AssertInfo(extras.find(INDEX_BUILD_ID_KEY) != extras.end(), "index build id not exist"); - index_meta.build_id = std::stol(extras[INDEX_BUILD_ID_KEY]); + index_meta.build_id = std::stol( + 
std::any_cast(extras[INDEX_BUILD_ID_KEY])); index_data->set_index_meta(index_meta); index_data->SetTimestamps(index_event_data.start_timestamp, index_event_data.end_timestamp); diff --git a/internal/core/src/storage/Event.cpp b/internal/core/src/storage/Event.cpp index 4673e50245b4..f27de8de30ee 100644 --- a/internal/core/src/storage/Event.cpp +++ b/internal/core/src/storage/Event.cpp @@ -14,6 +14,9 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include +#include #include "common/Array.h" #include "common/Consts.h" #include "common/EasyAssert.h" @@ -24,6 +27,7 @@ #include "storage/Event.h" #include "storage/PayloadReader.h" #include "storage/PayloadWriter.h" +#include "log/Log.h" namespace milvus::storage { @@ -34,7 +38,7 @@ GetFixPartSize(DescriptorEventData& data) { sizeof(data.fix_part.segment_id) + sizeof(data.fix_part.field_id) + sizeof(data.fix_part.start_timestamp) + sizeof(data.fix_part.end_timestamp) + - sizeof(data.fix_part.data_type) + sizeof(data.fix_part.nullable); + sizeof(data.fix_part.data_type); } int GetFixPartSize(BaseEventData& data) { @@ -107,8 +111,6 @@ DescriptorEventDataFixPart::DescriptorEventDataFixPart(BinlogReaderPtr reader) { assert(ast.ok()); ast = reader->Read(sizeof(field_id), &field_id); assert(ast.ok()); - ast = reader->Read(sizeof(nullable), &nullable); - assert(ast.ok()); ast = reader->Read(sizeof(start_timestamp), &start_timestamp); assert(ast.ok()); ast = reader->Read(sizeof(end_timestamp), &end_timestamp); @@ -122,7 +124,7 @@ DescriptorEventDataFixPart::Serialize() { auto fix_part_size = sizeof(collection_id) + sizeof(partition_id) + sizeof(segment_id) + sizeof(field_id) + sizeof(start_timestamp) + sizeof(end_timestamp) + - sizeof(data_type) + sizeof(nullable); + sizeof(data_type); std::vector res(fix_part_size); int offset = 0; memcpy(res.data() + offset, &collection_id, sizeof(collection_id)); @@ -133,8 +135,6 @@ DescriptorEventDataFixPart::Serialize() 
{ offset += sizeof(segment_id); memcpy(res.data() + offset, &field_id, sizeof(field_id)); offset += sizeof(field_id); - memcpy(res.data() + offset, &nullable, sizeof(nullable)); - offset += sizeof(nullable); memcpy(res.data() + offset, &start_timestamp, sizeof(start_timestamp)); offset += sizeof(start_timestamp); memcpy(res.data() + offset, &end_timestamp, sizeof(end_timestamp)); @@ -163,10 +163,15 @@ DescriptorEventData::DescriptorEventData(BinlogReaderPtr reader) { nlohmann::json json = nlohmann::json::parse(extra_bytes.begin(), extra_bytes.end()); if (json.contains(ORIGIN_SIZE_KEY)) { - extras[ORIGIN_SIZE_KEY] = json[ORIGIN_SIZE_KEY]; + extras[ORIGIN_SIZE_KEY] = + static_cast(json[ORIGIN_SIZE_KEY]); } if (json.contains(INDEX_BUILD_ID_KEY)) { - extras[INDEX_BUILD_ID_KEY] = json[INDEX_BUILD_ID_KEY]; + extras[INDEX_BUILD_ID_KEY] = + static_cast(json[INDEX_BUILD_ID_KEY]); + } + if (json.contains(NULLABLE)) { + extras[NULLABLE] = static_cast(json[NULLABLE]); } } @@ -175,7 +180,11 @@ DescriptorEventData::Serialize() { auto fix_part_data = fix_part.Serialize(); nlohmann::json extras_json; for (auto v : extras) { - extras_json.emplace(v.first, v.second); + if (v.first == NULLABLE) { + extras_json.emplace(v.first, std::any_cast(v.second)); + } else { + extras_json.emplace(v.first, std::any_cast(v.second)); + } } std::string extras_string = extras_json.dump(); extra_length = extras_string.size(); diff --git a/internal/core/src/storage/Event.h b/internal/core/src/storage/Event.h index 2e5152be4588..2922e399f00b 100644 --- a/internal/core/src/storage/Event.h +++ b/internal/core/src/storage/Event.h @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -46,8 +47,6 @@ struct DescriptorEventDataFixPart { int64_t partition_id; int64_t segment_id; int64_t field_id; - //(todo:smellthemoon) set nullable false temporarily, will change it - bool nullable = false; Timestamp start_timestamp; Timestamp end_timestamp; milvus::proto::schema::DataType data_type; @@ -63,7 
+62,7 @@ struct DescriptorEventData { DescriptorEventDataFixPart fix_part; int32_t extra_length; std::vector extra_bytes; - std::unordered_map extras; + std::unordered_map extras; std::vector post_header_lengths; DescriptorEventData() = default; diff --git a/internal/core/src/storage/InsertData.cpp b/internal/core/src/storage/InsertData.cpp index 8a74ee1220eb..d4b043c423ba 100644 --- a/internal/core/src/storage/InsertData.cpp +++ b/internal/core/src/storage/InsertData.cpp @@ -61,8 +61,6 @@ InsertData::serialize_to_remote_file() { des_fix_part.start_timestamp = time_range_.first; des_fix_part.end_timestamp = time_range_.second; des_fix_part.data_type = milvus::proto::schema::DataType(data_type); - //(todo:smellthemoon) set nullable false temporarily, will change it - des_fix_part.nullable = false; for (auto i = int8_t(EventType::DescriptorEvent); i < int8_t(EventType::EventTypeEnd); i++) { @@ -71,6 +69,7 @@ InsertData::serialize_to_remote_file() { } des_event_data.extras[ORIGIN_SIZE_KEY] = std::to_string(field_data_->Size()); + //(todo:smellthemoon) set nullable auto& des_event_header = descriptor_event.event_header; // TODO :: set timestamp diff --git a/internal/storage/binlog_reader.go b/internal/storage/binlog_reader.go index 98438c59ff8a..ad364c3d751a 100644 --- a/internal/storage/binlog_reader.go +++ b/internal/storage/binlog_reader.go @@ -49,8 +49,11 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) { if reader.eventReader != nil { reader.eventReader.Close() } - var err error - reader.eventReader, err = newEventReader(reader.descriptorEvent.PayloadDataType, reader.buffer, reader.descriptorEvent.Nullable) + nullable, err := reader.descriptorEvent.GetNullable() + if err != nil { + return nil, err + } + reader.eventReader, err = newEventReader(reader.descriptorEvent.PayloadDataType, reader.buffer, nullable) if err != nil { return nil, err } diff --git a/internal/storage/binlog_test.go b/internal/storage/binlog_test.go index 
6f93e60edeaf..b5058ab6fa56 100644 --- a/internal/storage/binlog_test.go +++ b/internal/storage/binlog_test.go @@ -123,11 +123,6 @@ func TestInsertBinlog(t *testing.T) { assert.Equal(t, fieldID, int64(40)) pos += int(unsafe.Sizeof(fieldID)) - // descriptor data fix, nullable - nullable := UnsafeReadBool(buf, pos) - assert.Equal(t, nullable, false) - pos += int(unsafe.Sizeof(nullable)) - // descriptor data fix, start time stamp startts := UnsafeReadInt64(buf, pos) assert.Equal(t, startts, int64(1000)) @@ -379,11 +374,6 @@ func TestDeleteBinlog(t *testing.T) { assert.Equal(t, fieldID, int64(-1)) pos += int(unsafe.Sizeof(fieldID)) - // descriptor data fix, nullable - nullable := UnsafeReadBool(buf, pos) - assert.Equal(t, nullable, false) - pos += int(unsafe.Sizeof(nullable)) - // descriptor data fix, start time stamp startts := UnsafeReadInt64(buf, pos) assert.Equal(t, startts, int64(1000)) @@ -635,11 +625,6 @@ func TestDDLBinlog1(t *testing.T) { assert.Equal(t, fieldID, int64(-1)) pos += int(unsafe.Sizeof(fieldID)) - // descriptor data fix, nullable - nullable := UnsafeReadBool(buf, pos) - assert.Equal(t, nullable, false) - pos += int(unsafe.Sizeof(nullable)) - // descriptor data fix, start time stamp startts := UnsafeReadInt64(buf, pos) assert.Equal(t, startts, int64(1000)) @@ -890,11 +875,6 @@ func TestDDLBinlog2(t *testing.T) { assert.Equal(t, fieldID, int64(-1)) pos += int(unsafe.Sizeof(fieldID)) - // descriptor data fix, nullable - nullable := UnsafeReadBool(buf, pos) - assert.Equal(t, nullable, false) - pos += int(unsafe.Sizeof(nullable)) - // descriptor data fix, start time stamp startts := UnsafeReadInt64(buf, pos) assert.Equal(t, startts, int64(1000)) @@ -1140,11 +1120,6 @@ func TestIndexFileBinlog(t *testing.T) { assert.Equal(t, fieldID, fID) pos += int(unsafe.Sizeof(fID)) - // descriptor data fix, nullable - nullable := UnsafeReadBool(buf, pos) - assert.Equal(t, nullable, false) - pos += int(unsafe.Sizeof(nullable)) - // descriptor data fix, start time 
stamp startts := UnsafeReadInt64(buf, pos) assert.Equal(t, startts, int64(timestamp)) @@ -1274,11 +1249,6 @@ func TestIndexFileBinlogV2(t *testing.T) { assert.Equal(t, fieldID, fID) pos += int(unsafe.Sizeof(fID)) - // descriptor data fix, nullable - nullable := UnsafeReadBool(buf, pos) - assert.Equal(t, nullable, false) - pos += int(unsafe.Sizeof(nullable)) - // descriptor data fix, start time stamp startts := UnsafeReadInt64(buf, pos) assert.Equal(t, startts, int64(timestamp)) diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go index 583798f11216..2e716f31e852 100644 --- a/internal/storage/binlog_writer.go +++ b/internal/storage/binlog_writer.go @@ -278,7 +278,8 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID descriptorEvent.PartitionID = partitionID descriptorEvent.SegmentID = segmentID descriptorEvent.FieldID = FieldID - descriptorEvent.Nullable = nullable + // store nullable in extras for compatibility + descriptorEvent.AddExtra(nullableKey, nullable) w := &InsertBinlogWriter{ baseBinlogWriter: baseBinlogWriter{ diff --git a/internal/storage/event_data.go b/internal/storage/event_data.go index fe9f05532486..0a86fd19f240 100644 --- a/internal/storage/event_data.go +++ b/internal/storage/event_data.go @@ -27,10 +27,12 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/typeutil" ) const originalSizeKey = "original_size" +const nullableKey = "nullable" type descriptorEventData struct { DescriptorEventDataFixPart @@ -46,7 +48,6 @@ type DescriptorEventDataFixPart struct { PartitionID int64 SegmentID int64 FieldID int64 - Nullable bool StartTimestamp typeutil.Timestamp EndTimestamp typeutil.Timestamp PayloadDataType schemapb.DataType @@ -63,6 +64,20 @@ func (data *descriptorEventData) GetEventDataFixPartSize() int32 { return 
int32(binary.Size(data.DescriptorEventDataFixPart)) } +func (data *descriptorEventData) GetNullable() (bool, error) { + nullableStore, ok := data.Extras[nullableKey] + // previous descriptorEventData versions did not store nullable + if !ok { + return false, nil + } + nullable, ok := nullableStore.(bool) + // will not happen: the bool format is already checked in FinishExtra + if !ok { + return false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("value of %v must in bool format", nullableKey)) + } + return nullable, nil +} + // GetMemoryUsageInBytes returns the memory size of DescriptorEventDataFixPart. func (data *descriptorEventData) GetMemoryUsageInBytes() int32 { return data.GetEventDataFixPartSize() + int32(binary.Size(data.PostHeaderLengths)) + int32(binary.Size(data.ExtraLength)) + data.ExtraLength @@ -94,6 +109,14 @@ func (data *descriptorEventData) FinishExtra() error { return fmt.Errorf("value of %v must be able to be converted into int format", originalSizeKey) } + nullableStore, existed := data.Extras[nullableKey] + if existed { + _, ok := nullableStore.(bool) + if !ok { + return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("value of %v must in bool format", nullableKey)) + } + } + data.ExtraBytes, err = json.Marshal(data.Extras) if err != nil { return err @@ -351,7 +374,6 @@ func newDescriptorEventData() *descriptorEventData { StartTimestamp: 0, EndTimestamp: 0, PayloadDataType: -1, - Nullable: false, }, PostHeaderLengths: []uint8{}, Extras: make(map[string]interface{}), diff --git a/internal/storage/event_test.go b/internal/storage/event_test.go index 74827639b8af..3f4ada4076a7 100644 --- a/internal/storage/event_test.go +++ b/internal/storage/event_test.go @@ -54,10 +54,26 @@ func TestDescriptorEvent(t *testing.T) { err = desc.Write(&buf) assert.Error(t, err) + // nullable does not exist yet + nullable, err := desc.GetNullable() + assert.NoError(t, err) + assert.False(t, nullable) + desc.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal)) + desc.AddExtra(nullableKey, "not 
bool format") err = desc.Write(&buf) + // nullable not formatted + assert.Error(t, err) + + desc.AddExtra(nullableKey, true) + + err = desc.Write(&buf) + assert.NoError(t, err) + + nullable, err = desc.GetNullable() assert.NoError(t, err) + assert.True(t, nullable) buffer := buf.Bytes() @@ -89,25 +105,17 @@ func TestDescriptorEvent(t *testing.T) { int(unsafe.Sizeof(partID))+ int(unsafe.Sizeof(segID))) assert.Equal(t, fieldID, int64(-1)) - nullable := UnsafeReadBool(buffer, binary.Size(eventHeader{})+ - int(unsafe.Sizeof(collID))+ - int(unsafe.Sizeof(partID))+ - int(unsafe.Sizeof(segID))+ - int(unsafe.Sizeof(fieldID))) - assert.Equal(t, nullable, false) startTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ int(unsafe.Sizeof(collID))+ int(unsafe.Sizeof(partID))+ int(unsafe.Sizeof(segID))+ - int(unsafe.Sizeof(fieldID))+ - int(unsafe.Sizeof(nullable))) + int(unsafe.Sizeof(fieldID))) assert.Equal(t, startTs, int64(0)) endTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+ int(unsafe.Sizeof(collID))+ int(unsafe.Sizeof(partID))+ int(unsafe.Sizeof(segID))+ int(unsafe.Sizeof(fieldID))+ - int(unsafe.Sizeof(nullable))+ int(unsafe.Sizeof(startTs))) assert.Equal(t, endTs, int64(0)) colType := UnsafeReadInt32(buffer, binary.Size(eventHeader{})+ @@ -115,7 +123,6 @@ func TestDescriptorEvent(t *testing.T) { int(unsafe.Sizeof(partID))+ int(unsafe.Sizeof(segID))+ int(unsafe.Sizeof(fieldID))+ - int(unsafe.Sizeof(nullable))+ int(unsafe.Sizeof(startTs))+ int(unsafe.Sizeof(endTs))) assert.Equal(t, colType, int32(-1)) @@ -125,7 +132,6 @@ func TestDescriptorEvent(t *testing.T) { int(unsafe.Sizeof(partID)) + int(unsafe.Sizeof(segID)) + int(unsafe.Sizeof(fieldID)) + - int(unsafe.Sizeof(nullable)) + int(unsafe.Sizeof(startTs)) + int(unsafe.Sizeof(endTs)) + int(unsafe.Sizeof(colType))