diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp b/internal/core/src/storage/DiskFileManagerImpl.cpp index 5eb6c8a311ce..844495ceb0bb 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -569,7 +570,7 @@ using DataTypeToOffsetMap = std::unordered_map, int64_t>; template -void +bool WriteOptFieldIvfDataImpl( const int64_t field_id, const std::shared_ptr& local_chunk_manager, @@ -587,6 +588,12 @@ WriteOptFieldIvfDataImpl( mp[val].push_back(offset++); } } + + // Do not write to disk if there is only one value + if (mp.size() == 1) { + return false; + } + local_chunk_manager->Write(local_data_path, write_offset, const_cast(&field_id), @@ -612,6 +619,7 @@ WriteOptFieldIvfDataImpl( data_size); write_offset += data_size; } + return true; } #define GENERATE_OPT_FIELD_IVF_IMPL(DT) \ @@ -630,32 +638,23 @@ WriteOptFieldIvfData( uint64_t& write_offset) { switch (dt) { case DataType::BOOL: - GENERATE_OPT_FIELD_IVF_IMPL(DataType::BOOL); - break; + return GENERATE_OPT_FIELD_IVF_IMPL(DataType::BOOL); case DataType::INT8: - GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT8); - break; + return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT8); case DataType::INT16: - GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT16); - break; + return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT16); case DataType::INT32: - GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT32); - break; + return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT32); case DataType::INT64: - GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT64); - break; + return GENERATE_OPT_FIELD_IVF_IMPL(DataType::INT64); case DataType::FLOAT: - GENERATE_OPT_FIELD_IVF_IMPL(DataType::FLOAT); - break; + return GENERATE_OPT_FIELD_IVF_IMPL(DataType::FLOAT); case DataType::DOUBLE: - GENERATE_OPT_FIELD_IVF_IMPL(DataType::DOUBLE); - break; + return GENERATE_OPT_FIELD_IVF_IMPL(DataType::DOUBLE); case DataType::STRING: - GENERATE_OPT_FIELD_IVF_IMPL(DataType::STRING); - break; + return GENERATE_OPT_FIELD_IVF_IMPL(DataType::STRING); case DataType::VARCHAR: - GENERATE_OPT_FIELD_IVF_IMPL(DataType::VARCHAR); - break; + return GENERATE_OPT_FIELD_IVF_IMPL(DataType::VARCHAR); default: LOG_WARN("Unsupported data type in optional scalar field: ", dt); return false; @@ -693,7 +692,7 @@ WriteOptFieldsIvfMeta( std::string DiskFileManagerImpl::CacheOptFieldToDisk( std::shared_ptr space, OptFieldT& fields_map) { - uint32_t num_of_fields = fields_map.size(); + const uint32_t num_of_fields = fields_map.size(); if (0 == num_of_fields) { return ""; } else if (num_of_fields > 1) { @@ -719,6 +718,7 @@ DiskFileManagerImpl::CacheOptFieldToDisk( WriteOptFieldsIvfMeta( local_chunk_manager, local_data_path, num_of_fields, write_offset); + std::unordered_set actual_field_ids; auto reader = space->ScanData(); for (auto& [field_id, tup] : fields_map) { const auto& field_name = std::get<0>(tup); @@ -745,12 +745,23 @@ DiskFileManagerImpl::CacheOptFieldToDisk( field_data->FillFieldData(col_data); field_datas.emplace_back(field_data); } - if (!WriteOptFieldIvfData(field_type, - field_id, - local_chunk_manager, - local_data_path, - field_datas, - write_offset)) { + if (WriteOptFieldIvfData(field_type, + field_id, + local_chunk_manager, + local_data_path, + field_datas, + write_offset)) { + actual_field_ids.insert(field_id); + } + } + + if (actual_field_ids.size() != num_of_fields) { + write_offset = 0; + WriteOptFieldsIvfMeta(local_chunk_manager, + local_data_path, + actual_field_ids.size(), + write_offset); + if (actual_field_ids.empty()) { return ""; } } @@ -759,7 +770,7 @@ DiskFileManagerImpl::CacheOptFieldToDisk( std::string DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) { - uint32_t num_of_fields = fields_map.size(); + const uint32_t num_of_fields = fields_map.size(); if (0 == num_of_fields) { return ""; } else if (num_of_fields > 1) { @@ -793,6 +804,7 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) { auto parallel_degree = uint64_t(DEFAULT_FIELD_MAX_MEMORY_LIMIT / FILE_SLICE_SIZE); + std::unordered_set actual_field_ids; for (auto& [field_id, tup] : fields_map) { const auto& field_type = std::get<1>(tup); auto& field_paths = std::get<2>(tup); @@ -814,15 +826,27 @@ DiskFileManagerImpl::CacheOptFieldToDisk(OptFieldT& fields_map) { if (batch_files.size() > 0) { FetchRawData(); } - if (!WriteOptFieldIvfData(field_type, - field_id, - local_chunk_manager, - local_data_path, - field_datas, - write_offset)) { + if (WriteOptFieldIvfData(field_type, + field_id, + local_chunk_manager, + local_data_path, + field_datas, + write_offset)) { + actual_field_ids.insert(field_id); + } + } + + if (actual_field_ids.size() != num_of_fields) { + write_offset = 0; + WriteOptFieldsIvfMeta(local_chunk_manager, + local_data_path, + actual_field_ids.size(), + write_offset); + if (actual_field_ids.empty()) { return ""; } } + return local_data_path; } diff --git a/internal/core/unittest/test_disk_file_manager_test.cpp b/internal/core/unittest/test_disk_file_manager_test.cpp index 90fa4a7887ba..4c5b75001106 100644 --- a/internal/core/unittest/test_disk_file_manager_test.cpp +++ b/internal/core/unittest/test_disk_file_manager_test.cpp @@ -18,7 +18,9 @@ #include #include #include +#include #include +#include #include #include #include @@ -202,14 +204,12 @@ TEST_F(DiskAnnFileManagerTest, TestThreadPoolException) { } } +namespace { const int64_t kOptFieldId = 123456; const std::string kOptFieldName = "opt_field_name"; -const DataType kOptFiledType = DataType::INT64; -const int64_t kOptFieldDataRange = 10000; -const std::string kOptFieldPath = "/tmp/diskann/opt_field/123123"; -// const std::string kOptFieldPath = "/tmp/diskann/index_files/1000/index"; -const size_t kEntityCnt = 1000 * 1000; -const DataType kOptFieldDataType = DataType::INT64; +const int64_t kOptFieldDataRange = 1000; +const std::string kOptFieldPath = "/tmp/diskann/opt_field/"; +const size_t kEntityCnt = 1000 * 10; const FieldDataMeta kOptVecFieldDataMeta = {1, 2, 3, 100}; using OffsetT = uint32_t; @@ -225,25 +225,50 @@ CreateFileManager(const ChunkManagerPtr& cm) storage::FileManagerContext(kOptVecFieldDataMeta, index_meta, cm)); } +template auto -PrepareRawFieldData() -> std::vector { - std::vector data(kEntityCnt); - int64_t field_val = 0; +PrepareRawFieldData(const int64_t opt_field_data_range) -> std::vector { + if (opt_field_data_range > std::numeric_limits::max()) { + throw std::runtime_error("field data range is too large: " + + std::to_string(opt_field_data_range)); + } + std::vector data(kEntityCnt); + T field_val = 0; for (size_t i = 0; i < kEntityCnt; ++i) { data[i] = field_val++; - if (field_val >= kOptFieldDataRange) { + if (field_val >= opt_field_data_range) { field_val = 0; } } return data; } +template <> auto -PrepareInsertData() -> std::string { - size_t sz = sizeof(int64_t) * kEntityCnt; - std::vector data = PrepareRawFieldData(); - auto field_data = - storage::CreateFieldData(kOptFieldDataType, 1, kEntityCnt); +PrepareRawFieldData(const int64_t opt_field_data_range) + -> std::vector { + if (opt_field_data_range > std::numeric_limits::max()) { + throw std::runtime_error("field data range is too large: " + + std::to_string(opt_field_data_range)); + } + std::vector data(kEntityCnt); + char field_val = 0; + for (size_t i = 0; i < kEntityCnt; ++i) { + data[i] = std::to_string(field_val); + field_val++; + if (field_val >= opt_field_data_range) { + field_val = 0; + } + } + return data; +} + +template +auto +PrepareInsertData(const int64_t opt_field_data_range) -> std::string { + std::vector data = + PrepareRawFieldData(opt_field_data_range); + auto field_data = storage::CreateFieldData(DT, 1, kEntityCnt); field_data->FillFieldData(data.data(), kEntityCnt); storage::InsertData insert_data(field_data); insert_data.SetFieldDataMeta(kOptVecFieldDataMeta); @@ -253,16 +278,16 @@ PrepareInsertData() -> std::string { auto chunk_manager = storage::CreateChunkManager(get_default_local_storage_config()); - std::string path = kOptFieldPath + "0"; + std::string path = kOptFieldPath + std::to_string(kOptFieldId); boost::filesystem::remove_all(path); chunk_manager->Write(path, serialized_data.data(), serialized_data.size()); return path; } auto -PrepareInsertDataSpace() +PrepareInsertDataSpace(const int64_t opt_field_data_range) -> std::pair> { - std::string path = kOptFieldPath + "1"; + std::string path = kOptFieldPath + "space/" + std::to_string(kOptFieldId); arrow::FieldVector arrow_fields{ arrow::field("pk", arrow::int64()), arrow::field("ts", arrow::int64()), @@ -281,7 +306,7 @@ PrepareInsertDataSpace() milvus_storage::Options{schema}); EXPECT_TRUE(opt_space.has_value()); auto space = std::move(opt_space.value()); - const auto data = PrepareRawFieldData(); + const auto data = PrepareRawFieldData(opt_field_data_range); arrow::Int64Builder pk_builder; arrow::Int64Builder ts_builder; arrow::NumericBuilder scalar_builder; @@ -315,18 +340,21 @@ PrepareInsertDataSpace() return {path, std::move(space)}; } +template auto PrepareOptionalField(const std::shared_ptr& file_manager, const std::string& insert_file_path) -> OptFieldT { OptFieldT opt_field; std::vector insert_files; insert_files.emplace_back(insert_file_path); - opt_field[kOptFieldId] = {kOptFieldName, kOptFiledType, insert_files}; + opt_field[kOptFieldId] = {kOptFieldName, DT, insert_files}; return opt_field; } void -CheckOptFieldCorrectness(const std::string& local_file_path) { +CheckOptFieldCorrectness( + const std::string& local_file_path, + const int64_t opt_field_data_range = kOptFieldDataRange) { std::ifstream ifs(local_file_path); if (!ifs.is_open()) { FAIL() << "open file failed: " << local_file_path << std::endl; @@ -344,10 +372,10 @@ CheckOptFieldCorrectness(const std::string& local_file_path) { EXPECT_EQ(field_id, kOptFieldId); ifs.read(reinterpret_cast(&num_of_unique_field_data), sizeof(num_of_unique_field_data)); - EXPECT_EQ(num_of_unique_field_data, kOptFieldDataRange); + EXPECT_EQ(num_of_unique_field_data, opt_field_data_range); uint32_t expected_single_category_offset_cnt = - kEntityCnt / kOptFieldDataRange; + kEntityCnt / opt_field_data_range; uint32_t read_single_category_offset_cnt; std::vector single_category_offsets( expected_single_category_offset_cnt); @@ -364,54 +392,96 @@ CheckOptFieldCorrectness(const std::string& local_file_path) { first_offset = single_category_offsets[0]; } for (size_t j = 1; j < read_single_category_offset_cnt; ++j) { - ASSERT_EQ(single_category_offsets[j] % kOptFieldDataRange, - first_offset % kOptFieldDataRange); + ASSERT_EQ(single_category_offsets[j] % opt_field_data_range, + first_offset % opt_field_data_range); } } } +} // namespace TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskFieldEmpty) { auto file_manager = CreateFileManager(cm_); - const auto& [insert_file_space_path, space] = PrepareInsertDataSpace(); - OptFieldT opt_fields; - EXPECT_TRUE(file_manager->CacheOptFieldToDisk(opt_fields).empty()); - EXPECT_TRUE(file_manager->CacheOptFieldToDisk(space, opt_fields).empty()); -} + { + const auto& [insert_file_space_path, space] = + PrepareInsertDataSpace(kOptFieldDataRange); + OptFieldT opt_fields; + EXPECT_TRUE(file_manager->CacheOptFieldToDisk(opt_fields).empty()); + EXPECT_TRUE( + file_manager->CacheOptFieldToDisk(space, opt_fields).empty()); + } -TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceEmpty) { - auto file_manager = CreateFileManager(cm_); - auto opt_fileds = PrepareOptionalField(file_manager, ""); - auto res = file_manager->CacheOptFieldToDisk(nullptr, opt_fileds); - EXPECT_TRUE(res.empty()); + { + auto opt_fileds = + PrepareOptionalField(file_manager, ""); + auto res = file_manager->CacheOptFieldToDisk(nullptr, opt_fileds); + EXPECT_TRUE(res.empty()); + } } TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskOptFieldMoreThanOne) { auto file_manager = CreateFileManager(cm_); - const auto insert_file_path = PrepareInsertData(); - const auto& [insert_file_space_path, space] = PrepareInsertDataSpace(); - - OptFieldT opt_fields = PrepareOptionalField(file_manager, insert_file_path); + const auto insert_file_path = + PrepareInsertData(kOptFieldDataRange); + const auto& [insert_file_space_path, space] = + PrepareInsertDataSpace(kOptFieldDataRange); + OptFieldT opt_fields = + PrepareOptionalField(file_manager, insert_file_path); opt_fields[kOptFieldId + 1] = { - kOptFieldName + "second", kOptFiledType, {insert_file_space_path}}; + kOptFieldName + "second", DataType::INT64, {insert_file_space_path}}; EXPECT_THROW(file_manager->CacheOptFieldToDisk(opt_fields), SegcoreError); EXPECT_THROW(file_manager->CacheOptFieldToDisk(space, opt_fields), SegcoreError); } -TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskCorrect) { - auto file_manager = CreateFileManager(cm_); - const auto insert_file_path = PrepareInsertData(); - auto opt_fileds = PrepareOptionalField(file_manager, insert_file_path); - auto res = file_manager->CacheOptFieldToDisk(opt_fileds); - ASSERT_FALSE(res.empty()); - CheckOptFieldCorrectness(res); -} - TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskSpaceCorrect) { auto file_manager = CreateFileManager(cm_); - const auto& [insert_file_path, space] = PrepareInsertDataSpace(); - auto opt_fileds = PrepareOptionalField(file_manager, insert_file_path); + const auto& [insert_file_path, space] = + PrepareInsertDataSpace(kOptFieldDataRange); + auto opt_fileds = + PrepareOptionalField(file_manager, insert_file_path); auto res = file_manager->CacheOptFieldToDisk(space, opt_fileds); ASSERT_FALSE(res.empty()); CheckOptFieldCorrectness(res); } + +#define TEST_TYPE(NAME, TYPE, NATIVE_TYPE, RANGE) \ + TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskCorrect##NAME) { \ + auto file_manager = CreateFileManager(cm_); \ + auto insert_file_path = PrepareInsertData(RANGE); \ + auto opt_fields = \ + PrepareOptionalField(file_manager, insert_file_path); \ + auto res = file_manager->CacheOptFieldToDisk(opt_fields); \ + ASSERT_FALSE(res.empty()); \ + CheckOptFieldCorrectness(res, RANGE); \ + }; + +TEST_TYPE(INT8, DataType::INT8, int8_t, 100); +TEST_TYPE(INT16, DataType::INT16, int16_t, kOptFieldDataRange); +TEST_TYPE(INT32, DataType::INT32, int32_t, kOptFieldDataRange); +TEST_TYPE(INT64, DataType::INT64, int64_t, kOptFieldDataRange); +TEST_TYPE(FLOAT, DataType::FLOAT, float, kOptFieldDataRange); +TEST_TYPE(DOUBLE, DataType::DOUBLE, double, kOptFieldDataRange); +TEST_TYPE(STRING, DataType::STRING, std::string, 100); +TEST_TYPE(VARCHAR, DataType::VARCHAR, std::string, 100); + +#undef TEST_TYPE + +TEST_F(DiskAnnFileManagerTest, CacheOptFieldToDiskOnlyOneCategory) { + auto file_manager = CreateFileManager(cm_); + { + const auto insert_file_path = + PrepareInsertData(1); + auto opt_fileds = PrepareOptionalField( + file_manager, insert_file_path); + auto res = file_manager->CacheOptFieldToDisk(opt_fileds); + ASSERT_TRUE(res.empty()); + } + + { + const auto& [insert_file_path, space] = PrepareInsertDataSpace(1); + auto opt_fileds = PrepareOptionalField( + file_manager, insert_file_path); + auto res = file_manager->CacheOptFieldToDisk(space, opt_fileds); + ASSERT_TRUE(res.empty()); + } +} \ No newline at end of file diff --git a/internal/datacoord/task_index.go b/internal/datacoord/task_index.go index a702934bc4eb..9a7277c558c5 100644 --- a/internal/datacoord/task_index.go +++ b/internal/datacoord/task_index.go @@ -117,12 +117,14 @@ func (it *indexBuildTask) AssignTask(ctx context.Context, client types.IndexNode if partitionKeyField == nil || err != nil { log.Ctx(ctx).Warn("index builder get partition key field failed", zap.Int64("buildID", it.buildID), zap.Error(err)) } else { - optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{ - FieldID: partitionKeyField.FieldID, - FieldName: partitionKeyField.Name, - FieldType: int32(partitionKeyField.DataType), - DataIds: getBinLogIDs(segment, partitionKeyField.FieldID), - }) + if typeutil.IsFieldDataTypeSupportMaterializedView(partitionKeyField) { + optionalFields = append(optionalFields, &indexpb.OptionalFieldInfo{ + FieldID: partitionKeyField.FieldID, + FieldName: partitionKeyField.Name, + FieldType: int32(partitionKeyField.DataType), + DataIds: getBinLogIDs(segment, partitionKeyField.FieldID), + }) + } } } diff --git a/internal/datacoord/task_scheduler_test.go b/internal/datacoord/task_scheduler_test.go index dd072e1a363f..fef5fb8fe8cf 100644 --- a/internal/datacoord/task_scheduler_test.go +++ b/internal/datacoord/task_scheduler_test.go @@ -37,19 +37,21 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" ) var ( - collID = UniqueID(100) - partID = UniqueID(200) - indexID = UniqueID(300) - fieldID = UniqueID(400) - indexName = "_default_idx" - segID = UniqueID(500) - buildID = UniqueID(600) - nodeID = UniqueID(700) + collID = UniqueID(100) + partID = UniqueID(200) + indexID = UniqueID(300) + fieldID = UniqueID(400) + indexName = "_default_idx" + segID = UniqueID(500) + buildID = UniqueID(600) + nodeID = UniqueID(700) + partitionKeyID = UniqueID(800) ) func createIndexMeta(catalog metastore.DataCoordCatalog) *indexMeta { @@ -905,14 +907,16 @@ func (s *taskSchedulerSuite) Test_scheduler() { }, nil) s.Run("test scheduler with indexBuilderV1", func() { - paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("True") - defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("False") + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") s.scheduler(handler) }) s.Run("test scheduler with indexBuilderV2", func() { paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("true") defer paramtable.Get().CommonCfg.EnableStorageV2.SwapTempValue("false") + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") s.scheduler(handler) }) @@ -1421,3 +1425,345 @@ func (s *taskSchedulerSuite) Test_indexTaskFailCase() { func Test_taskSchedulerSuite(t *testing.T) { suite.Run(t, new(taskSchedulerSuite)) } + +func (s *taskSchedulerSuite) Test_indexTaskWithMvOptionalScalarField() { + ctx := context.Background() + catalog := catalogmocks.NewDataCoordCatalog(s.T()) + catalog.EXPECT().AlterSegmentIndexes(mock.Anything, mock.Anything).Return(nil) + in := mocks.NewMockIndexNodeClient(s.T()) + + workerManager := NewMockWorkerManager(s.T()) + workerManager.EXPECT().PickClient().Return(s.nodeID, in) + workerManager.EXPECT().GetClientByID(mock.Anything).Return(in, true) + + minNumberOfRowsToBuild := paramtable.Get().DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64() + 1 + fieldsSchema := []*schemapb.FieldSchema{ + { + FieldID: fieldID, + Name: "vec", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.MetricTypeKey, + Value: "L2", + }, + { + Key: common.IndexTypeKey, + Value: indexparamcheck.IndexHNSW, + }, + }, + }, + { + FieldID: partitionKeyID, + Name: "scalar", + DataType: schemapb.DataType_VarChar, + IsPartitionKey: true, + }, + } + mt := meta{ + catalog: catalog, + collections: map[int64]*collectionInfo{ + collID: { + ID: collID, + Schema: &schemapb.CollectionSchema{ + Fields: fieldsSchema, + }, + CreatedAt: 0, + }, + }, + + analyzeMeta: &analyzeMeta{ + ctx: context.Background(), + catalog: catalog, + }, + + indexMeta: &indexMeta{ + catalog: catalog, + indexes: map[UniqueID]map[UniqueID]*model.Index{ + collID: { + indexID: { + TenantID: "", + CollectionID: collID, + FieldID: fieldID, + IndexID: indexID, + IndexName: indexName, + IsDeleted: false, + CreateTime: 1, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.MetricTypeKey, + Value: "L2", + }, + { + Key: common.IndexTypeKey, + Value: indexparamcheck.IndexHNSW, + }, + }, + }, + }, + }, + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ + segID: { + indexID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: minNumberOfRowsToBuild, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }, + }, + }, + buildID2SegmentIndex: map[UniqueID]*model.SegmentIndex{ + buildID: { + SegmentID: segID, + CollectionID: collID, + PartitionID: partID, + NumRows: minNumberOfRowsToBuild, + IndexID: indexID, + BuildID: buildID, + NodeID: 0, + IndexVersion: 0, + IndexState: commonpb.IndexState_Unissued, + FailReason: "", + IsDeleted: false, + CreateTime: 0, + IndexFileKeys: nil, + IndexSize: 0, + }, + }, + }, + segments: &SegmentsInfo{ + segments: map[UniqueID]*SegmentInfo{ + segID: { + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: minNumberOfRowsToBuild, + State: commonpb.SegmentState_Flushed, + MaxRowNum: 65536, + LastExpireTime: 10, + }, + }, + }, + }, + } + + cm := mocks.NewChunkManager(s.T()) + cm.EXPECT().RootPath().Return("ut-index") + + handler := NewNMockHandler(s.T()) + handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{ + ID: collID, + Schema: &schemapb.CollectionSchema{ + Name: "coll", + Fields: fieldsSchema, + EnableDynamicField: false, + Properties: nil, + }, + }, nil) + + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + defer paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + scheduler := newTaskScheduler(ctx, &mt, workerManager, cm, newIndexEngineVersionManager(), handler) + + waitTaskDoneFunc := func(sche *taskScheduler) { + for { + fmt.Println("wait for read lock") + sche.RLock() + fmt.Println("after read lock") + taskNum := len(sche.tasks) + fmt.Println("taskNum: ", taskNum) + sche.RUnlock() + + if taskNum == 0 { + break + } + time.Sleep(time.Second) + } + } + + resetMetaFunc := func() { + mt.indexMeta.buildID2SegmentIndex[buildID].IndexState = commonpb.IndexState_Unissued + mt.indexMeta.segmentIndexes[segID][indexID].IndexState = commonpb.IndexState_Unissued + mt.indexMeta.indexes[collID][indexID].IndexParams[1].Value = indexparamcheck.IndexHNSW + mt.collections[collID].Schema.Fields[1].IsPartitionKey = true + mt.collections[collID].Schema.Fields[1].DataType = schemapb.DataType_VarChar + } + + in.EXPECT().QueryJobsV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, request *indexpb.QueryJobsV2Request, option ...grpc.CallOption) (*indexpb.QueryJobsV2Response, error) { + switch request.GetJobType() { + case indexpb.JobType_JobTypeIndexJob: + results := make([]*indexpb.IndexTaskInfo, 0) + for _, buildID := range request.GetTaskIDs() { + results = append(results, &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Finished, + IndexFileKeys: []string{"file1", "file2"}, + SerializedSize: 1024, + FailReason: "", + CurrentIndexVersion: 0, + IndexStoreVersion: 0, + }) + } + return &indexpb.QueryJobsV2Response{ + Status: merr.Success(), + ClusterID: request.GetClusterID(), + Result: &indexpb.QueryJobsV2Response_IndexJobResults{ + IndexJobResults: &indexpb.IndexJobResults{ + Results: results, + }, + }, + }, nil + default: + return &indexpb.QueryJobsV2Response{ + Status: merr.Status(errors.New("unknown job type")), + }, nil + } + }) + in.EXPECT().DropJobsV2(mock.Anything, mock.Anything).Return(merr.Success(), nil) + + s.Run("success to get opt field on startup", func() { + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + s.Equal(1, len(scheduler.tasks)) + s.Equal(indexpb.JobState_JobStateInit, scheduler.tasks[buildID].GetState()) + + scheduler.Start() + waitTaskDoneFunc(scheduler) + resetMetaFunc() + }) + + s.Run("enqueue valid", func() { + for _, dataType := range []schemapb.DataType{ + schemapb.DataType_Int8, + schemapb.DataType_Int16, + schemapb.DataType_Int32, + schemapb.DataType_Int64, + schemapb.DataType_VarChar, + schemapb.DataType_String, + } { + mt.collections[collID].Schema.Fields[1].DataType = dataType + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.NotZero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + t := &indexBuildTask{ + buildID: buildID, + nodeID: nodeID, + taskInfo: &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Unissued, + FailReason: "", + // CurrentIndexVersion: 0, + // IndexStoreVersion: 0, + }, + } + scheduler.enqueue(t) + waitTaskDoneFunc(scheduler) + resetMetaFunc() + } + }) + + // should still be able to build vec index when opt field is not set + s.Run("enqueue returns empty optional field when cfg disable", func() { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("false") + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + t := &indexBuildTask{ + buildID: buildID, + nodeID: nodeID, + taskInfo: &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Unissued, + FailReason: "", + }, + } + scheduler.enqueue(t) + waitTaskDoneFunc(scheduler) + resetMetaFunc() + }) + + s.Run("enqueue returns empty optional field when the data type is not STRING or VARCHAR or Integer", func() { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + for _, dataType := range []schemapb.DataType{ + schemapb.DataType_Bool, + schemapb.DataType_Float, + schemapb.DataType_Double, + schemapb.DataType_Array, + schemapb.DataType_JSON, + } { + mt.collections[collID].Schema.Fields[1].DataType = dataType + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + t := &indexBuildTask{ + buildID: buildID, + nodeID: nodeID, + taskInfo: &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Unissued, + FailReason: "", + }, + } + scheduler.enqueue(t) + waitTaskDoneFunc(scheduler) + resetMetaFunc() + } + }) + + s.Run("enqueue returns empty optional field when no partition key", func() { + paramtable.Get().CommonCfg.EnableMaterializedView.SwapTempValue("true") + mt.collections[collID].Schema.Fields[1].IsPartitionKey = false + in.EXPECT().CreateJobV2(mock.Anything, mock.Anything).RunAndReturn( + func(ctx context.Context, in *indexpb.CreateJobV2Request, opts ...grpc.CallOption) (*commonpb.Status, error) { + s.Zero(len(in.GetIndexRequest().OptionalScalarFields), "optional scalar field should be set") + return merr.Success(), nil + }).Once() + t := &indexBuildTask{ + buildID: buildID, + nodeID: nodeID, + taskInfo: &indexpb.IndexTaskInfo{ + BuildID: buildID, + State: commonpb.IndexState_Unissued, + FailReason: "", + }, + } + scheduler.enqueue(t) + waitTaskDoneFunc(scheduler) + resetMetaFunc() + }) + scheduler.Stop() +} diff --git a/internal/proxy/task_search_test.go b/internal/proxy/task_search_test.go index 286873df4af4..ffb95290f0f2 100644 --- a/internal/proxy/task_search_test.go +++ b/internal/proxy/task_search_test.go @@ -2692,7 +2692,7 @@ func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnInt64() { err := task.PreExecute(s.ctx) s.NoError(err) s.NotZero(len(task.queryInfos)) - s.Equal(false, task.queryInfos[0].MaterializedViewInvolved) + s.Equal(true, task.queryInfos[0].MaterializedViewInvolved) } func (s *MaterializedViewTestSuite) TestMvEnabledPartitionKeyOnVarChar() { diff --git a/pkg/util/typeutil/schema.go b/pkg/util/typeutil/schema.go index e55b643379e0..9864972a2c9f 100644 --- a/pkg/util/typeutil/schema.go +++ b/pkg/util/typeutil/schema.go @@ -1097,7 +1097,7 @@ func HasPartitionKey(schema *schemapb.CollectionSchema) bool { } func IsFieldDataTypeSupportMaterializedView(fieldSchema *schemapb.FieldSchema) bool { - return fieldSchema.DataType == schemapb.DataType_VarChar || fieldSchema.DataType == schemapb.DataType_String + return IsIntegerType(fieldSchema.DataType) || IsStringType(fieldSchema.DataType) } // HasClusterKey check if a collection schema has ClusterKey field