Skip to content

Commit

Permalink
fix: descriptor event in previous version not has nullable to parse e…
Browse files Browse the repository at this point in the history
…rror (milvus-io#34235)

milvus-io#34176

---------

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
  • Loading branch information
smellthemoon and lixinguo committed Jul 1, 2024
1 parent ff51c7e commit ef3ced8
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 61 deletions.
1 change: 1 addition & 0 deletions internal/core/src/common/Consts.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const milvus::FieldId TimestampFieldID = milvus::FieldId(1);
// fill followed extra info to binlog file
const char ORIGIN_SIZE_KEY[] = "original_size";
const char INDEX_BUILD_ID_KEY[] = "indexBuildID";
const char NULLABLE[] = "nullable";

const char INDEX_ROOT_PATH[] = "index_files";
const char RAWDATA_ROOT_PATH[] = "raw_datas";
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/storage/DataCodec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ DeserializeRemoteFileData(BinlogReaderPtr reader) {
auto& extras = descriptor_event.event_data.extras;
AssertInfo(extras.find(INDEX_BUILD_ID_KEY) != extras.end(),
"index build id not exist");
index_meta.build_id = std::stol(extras[INDEX_BUILD_ID_KEY]);
index_meta.build_id = std::stol(
std::any_cast<std::string>(extras[INDEX_BUILD_ID_KEY]));
index_data->set_index_meta(index_meta);
index_data->SetTimestamps(index_event_data.start_timestamp,
index_event_data.end_timestamp);
Expand Down
27 changes: 18 additions & 9 deletions internal/core/src/storage/Event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <glog/logging.h>
#include <any>
#include <string>
#include "common/Array.h"
#include "common/Consts.h"
#include "common/EasyAssert.h"
Expand All @@ -24,6 +27,7 @@
#include "storage/Event.h"
#include "storage/PayloadReader.h"
#include "storage/PayloadWriter.h"
#include "log/Log.h"

namespace milvus::storage {

Expand All @@ -34,7 +38,7 @@ GetFixPartSize(DescriptorEventData& data) {
sizeof(data.fix_part.segment_id) + sizeof(data.fix_part.field_id) +
sizeof(data.fix_part.start_timestamp) +
sizeof(data.fix_part.end_timestamp) +
sizeof(data.fix_part.data_type) + sizeof(data.fix_part.nullable);
sizeof(data.fix_part.data_type);
}
int
GetFixPartSize(BaseEventData& data) {
Expand Down Expand Up @@ -107,8 +111,6 @@ DescriptorEventDataFixPart::DescriptorEventDataFixPart(BinlogReaderPtr reader) {
assert(ast.ok());
ast = reader->Read(sizeof(field_id), &field_id);
assert(ast.ok());
ast = reader->Read(sizeof(nullable), &nullable);
assert(ast.ok());
ast = reader->Read(sizeof(start_timestamp), &start_timestamp);
assert(ast.ok());
ast = reader->Read(sizeof(end_timestamp), &end_timestamp);
Expand All @@ -122,7 +124,7 @@ DescriptorEventDataFixPart::Serialize() {
auto fix_part_size = sizeof(collection_id) + sizeof(partition_id) +
sizeof(segment_id) + sizeof(field_id) +
sizeof(start_timestamp) + sizeof(end_timestamp) +
sizeof(data_type) + sizeof(nullable);
sizeof(data_type);
std::vector<uint8_t> res(fix_part_size);
int offset = 0;
memcpy(res.data() + offset, &collection_id, sizeof(collection_id));
Expand All @@ -133,8 +135,6 @@ DescriptorEventDataFixPart::Serialize() {
offset += sizeof(segment_id);
memcpy(res.data() + offset, &field_id, sizeof(field_id));
offset += sizeof(field_id);
memcpy(res.data() + offset, &nullable, sizeof(nullable));
offset += sizeof(nullable);
memcpy(res.data() + offset, &start_timestamp, sizeof(start_timestamp));
offset += sizeof(start_timestamp);
memcpy(res.data() + offset, &end_timestamp, sizeof(end_timestamp));
Expand Down Expand Up @@ -163,10 +163,15 @@ DescriptorEventData::DescriptorEventData(BinlogReaderPtr reader) {
nlohmann::json json =
nlohmann::json::parse(extra_bytes.begin(), extra_bytes.end());
if (json.contains(ORIGIN_SIZE_KEY)) {
extras[ORIGIN_SIZE_KEY] = json[ORIGIN_SIZE_KEY];
extras[ORIGIN_SIZE_KEY] =
static_cast<std::string>(json[ORIGIN_SIZE_KEY]);
}
if (json.contains(INDEX_BUILD_ID_KEY)) {
extras[INDEX_BUILD_ID_KEY] = json[INDEX_BUILD_ID_KEY];
extras[INDEX_BUILD_ID_KEY] =
static_cast<std::string>(json[INDEX_BUILD_ID_KEY]);
}
if (json.contains(NULLABLE)) {
extras[NULLABLE] = static_cast<bool>(json[NULLABLE]);
}
}

Expand All @@ -175,7 +180,11 @@ DescriptorEventData::Serialize() {
auto fix_part_data = fix_part.Serialize();
nlohmann::json extras_json;
for (auto v : extras) {
extras_json.emplace(v.first, v.second);
if (v.first == NULLABLE) {
extras_json.emplace(v.first, std::any_cast<bool>(v.second));
} else {
extras_json.emplace(v.first, std::any_cast<std::string>(v.second));
}
}
std::string extras_string = extras_json.dump();
extra_length = extras_string.size();
Expand Down
5 changes: 2 additions & 3 deletions internal/core/src/storage/Event.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <any>
#include <string>
#include <memory>
#include <vector>
Expand Down Expand Up @@ -46,8 +47,6 @@ struct DescriptorEventDataFixPart {
int64_t partition_id;
int64_t segment_id;
int64_t field_id;
//(todo:smellthemoon) set nullable false temporarily, will change it
bool nullable = false;
Timestamp start_timestamp;
Timestamp end_timestamp;
milvus::proto::schema::DataType data_type;
Expand All @@ -63,7 +62,7 @@ struct DescriptorEventData {
DescriptorEventDataFixPart fix_part;
int32_t extra_length;
std::vector<uint8_t> extra_bytes;
std::unordered_map<std::string, std::string> extras;
std::unordered_map<std::string, std::any> extras;
std::vector<uint8_t> post_header_lengths;

DescriptorEventData() = default;
Expand Down
3 changes: 1 addition & 2 deletions internal/core/src/storage/InsertData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ InsertData::serialize_to_remote_file() {
des_fix_part.start_timestamp = time_range_.first;
des_fix_part.end_timestamp = time_range_.second;
des_fix_part.data_type = milvus::proto::schema::DataType(data_type);
//(todo:smellthemoon) set nullable false temporarily, will change it
des_fix_part.nullable = false;
for (auto i = int8_t(EventType::DescriptorEvent);
i < int8_t(EventType::EventTypeEnd);
i++) {
Expand All @@ -71,6 +69,7 @@ InsertData::serialize_to_remote_file() {
}
des_event_data.extras[ORIGIN_SIZE_KEY] =
std::to_string(field_data_->Size());
//(todo:smellthemoon) set nullable

auto& des_event_header = descriptor_event.event_header;
// TODO :: set timestamp
Expand Down
7 changes: 5 additions & 2 deletions internal/storage/binlog_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ func (reader *BinlogReader) NextEventReader() (*EventReader, error) {
if reader.eventReader != nil {
reader.eventReader.Close()
}
var err error
reader.eventReader, err = newEventReader(reader.descriptorEvent.PayloadDataType, reader.buffer, reader.descriptorEvent.Nullable)
nullable, err := reader.descriptorEvent.GetNullable()
if err != nil {
return nil, err
}
reader.eventReader, err = newEventReader(reader.descriptorEvent.PayloadDataType, reader.buffer, nullable)
if err != nil {
return nil, err
}
Expand Down
30 changes: 0 additions & 30 deletions internal/storage/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ func TestInsertBinlog(t *testing.T) {
assert.Equal(t, fieldID, int64(40))
pos += int(unsafe.Sizeof(fieldID))

// descriptor data fix, nullable
nullable := UnsafeReadBool(buf, pos)
assert.Equal(t, nullable, false)
pos += int(unsafe.Sizeof(nullable))

// descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(1000))
Expand Down Expand Up @@ -379,11 +374,6 @@ func TestDeleteBinlog(t *testing.T) {
assert.Equal(t, fieldID, int64(-1))
pos += int(unsafe.Sizeof(fieldID))

// descriptor data fix, nullable
nullable := UnsafeReadBool(buf, pos)
assert.Equal(t, nullable, false)
pos += int(unsafe.Sizeof(nullable))

// descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(1000))
Expand Down Expand Up @@ -635,11 +625,6 @@ func TestDDLBinlog1(t *testing.T) {
assert.Equal(t, fieldID, int64(-1))
pos += int(unsafe.Sizeof(fieldID))

// descriptor data fix, nullable
nullable := UnsafeReadBool(buf, pos)
assert.Equal(t, nullable, false)
pos += int(unsafe.Sizeof(nullable))

// descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(1000))
Expand Down Expand Up @@ -890,11 +875,6 @@ func TestDDLBinlog2(t *testing.T) {
assert.Equal(t, fieldID, int64(-1))
pos += int(unsafe.Sizeof(fieldID))

// descriptor data fix, nullable
nullable := UnsafeReadBool(buf, pos)
assert.Equal(t, nullable, false)
pos += int(unsafe.Sizeof(nullable))

// descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(1000))
Expand Down Expand Up @@ -1140,11 +1120,6 @@ func TestIndexFileBinlog(t *testing.T) {
assert.Equal(t, fieldID, fID)
pos += int(unsafe.Sizeof(fID))

// descriptor data fix, nullable
nullable := UnsafeReadBool(buf, pos)
assert.Equal(t, nullable, false)
pos += int(unsafe.Sizeof(nullable))

// descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(timestamp))
Expand Down Expand Up @@ -1274,11 +1249,6 @@ func TestIndexFileBinlogV2(t *testing.T) {
assert.Equal(t, fieldID, fID)
pos += int(unsafe.Sizeof(fID))

// descriptor data fix, nullable
nullable := UnsafeReadBool(buf, pos)
assert.Equal(t, nullable, false)
pos += int(unsafe.Sizeof(nullable))

// descriptor data fix, start time stamp
startts := UnsafeReadInt64(buf, pos)
assert.Equal(t, startts, int64(timestamp))
Expand Down
3 changes: 2 additions & 1 deletion internal/storage/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
descriptorEvent.PartitionID = partitionID
descriptorEvent.SegmentID = segmentID
descriptorEvent.FieldID = FieldID
descriptorEvent.Nullable = nullable
// store nullable in extra for compatible
descriptorEvent.AddExtra(nullableKey, nullable)

w := &InsertBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
Expand Down
26 changes: 24 additions & 2 deletions internal/storage/event_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import (

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

const originalSizeKey = "original_size"
const nullableKey = "nullable"

type descriptorEventData struct {
DescriptorEventDataFixPart
Expand All @@ -46,7 +48,6 @@ type DescriptorEventDataFixPart struct {
PartitionID int64
SegmentID int64
FieldID int64
Nullable bool
StartTimestamp typeutil.Timestamp
EndTimestamp typeutil.Timestamp
PayloadDataType schemapb.DataType
Expand All @@ -63,6 +64,20 @@ func (data *descriptorEventData) GetEventDataFixPartSize() int32 {
return int32(binary.Size(data.DescriptorEventDataFixPart))
}

func (data *descriptorEventData) GetNullable() (bool, error) {
nullableStore, ok := data.Extras[nullableKey]
// previous descriptorEventData not store nullable
if !ok {
return false, nil
}
nullable, ok := nullableStore.(bool)
// will not happend, has checked bool format when FinishExtra
if !ok {
return false, merr.WrapErrParameterInvalidMsg(fmt.Sprintf("value of %v must in bool format", nullableKey))
}
return nullable, nil
}

// GetMemoryUsageInBytes returns the memory size of DescriptorEventDataFixPart.
func (data *descriptorEventData) GetMemoryUsageInBytes() int32 {
return data.GetEventDataFixPartSize() + int32(binary.Size(data.PostHeaderLengths)) + int32(binary.Size(data.ExtraLength)) + data.ExtraLength
Expand Down Expand Up @@ -94,6 +109,14 @@ func (data *descriptorEventData) FinishExtra() error {
return fmt.Errorf("value of %v must be able to be converted into int format", originalSizeKey)
}

nullableStore, existed := data.Extras[nullableKey]
if existed {
_, ok := nullableStore.(bool)
if !ok {
return merr.WrapErrParameterInvalidMsg(fmt.Sprintf("value of %v must in bool format", nullableKey))
}
}

data.ExtraBytes, err = json.Marshal(data.Extras)
if err != nil {
return err
Expand Down Expand Up @@ -351,7 +374,6 @@ func newDescriptorEventData() *descriptorEventData {
StartTimestamp: 0,
EndTimestamp: 0,
PayloadDataType: -1,
Nullable: false,
},
PostHeaderLengths: []uint8{},
Extras: make(map[string]interface{}),
Expand Down
28 changes: 17 additions & 11 deletions internal/storage/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,26 @@ func TestDescriptorEvent(t *testing.T) {
err = desc.Write(&buf)
assert.Error(t, err)

// nullable not existed
nullable, err := desc.GetNullable()
assert.NoError(t, err)
assert.False(t, nullable)

desc.AddExtra(originalSizeKey, fmt.Sprintf("%v", sizeTotal))
desc.AddExtra(nullableKey, "not bool format")

err = desc.Write(&buf)
// nullable not formatted
assert.Error(t, err)

desc.AddExtra(nullableKey, true)

err = desc.Write(&buf)
assert.NoError(t, err)

nullable, err = desc.GetNullable()
assert.NoError(t, err)
assert.True(t, nullable)

buffer := buf.Bytes()

Expand Down Expand Up @@ -89,33 +105,24 @@ func TestDescriptorEvent(t *testing.T) {
int(unsafe.Sizeof(partID))+
int(unsafe.Sizeof(segID)))
assert.Equal(t, fieldID, int64(-1))
nullable := UnsafeReadBool(buffer, binary.Size(eventHeader{})+
int(unsafe.Sizeof(collID))+
int(unsafe.Sizeof(partID))+
int(unsafe.Sizeof(segID))+
int(unsafe.Sizeof(fieldID)))
assert.Equal(t, nullable, false)
startTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
int(unsafe.Sizeof(collID))+
int(unsafe.Sizeof(partID))+
int(unsafe.Sizeof(segID))+
int(unsafe.Sizeof(fieldID))+
int(unsafe.Sizeof(nullable)))
int(unsafe.Sizeof(fieldID)))
assert.Equal(t, startTs, int64(0))
endTs := UnsafeReadInt64(buffer, binary.Size(eventHeader{})+
int(unsafe.Sizeof(collID))+
int(unsafe.Sizeof(partID))+
int(unsafe.Sizeof(segID))+
int(unsafe.Sizeof(fieldID))+
int(unsafe.Sizeof(nullable))+
int(unsafe.Sizeof(startTs)))
assert.Equal(t, endTs, int64(0))
colType := UnsafeReadInt32(buffer, binary.Size(eventHeader{})+
int(unsafe.Sizeof(collID))+
int(unsafe.Sizeof(partID))+
int(unsafe.Sizeof(segID))+
int(unsafe.Sizeof(fieldID))+
int(unsafe.Sizeof(nullable))+
int(unsafe.Sizeof(startTs))+
int(unsafe.Sizeof(endTs)))
assert.Equal(t, colType, int32(-1))
Expand All @@ -125,7 +132,6 @@ func TestDescriptorEvent(t *testing.T) {
int(unsafe.Sizeof(partID)) +
int(unsafe.Sizeof(segID)) +
int(unsafe.Sizeof(fieldID)) +
int(unsafe.Sizeof(nullable)) +
int(unsafe.Sizeof(startTs)) +
int(unsafe.Sizeof(endTs)) +
int(unsafe.Sizeof(colType))
Expand Down

0 comments on commit ef3ced8

Please sign in to comment.