diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index bda4ca16a9edd..fc99ea9d5dc61 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -43,7 +43,7 @@ namespace milvus { /* -* If string field's value all empty, need a string padding to avoid +* If string field's value all empty, need a string padding to avoid * mmap failing because size_ is zero which causing invalid arguement * array has the same problem * TODO: remove it when support NULL value @@ -84,6 +84,7 @@ class ColumnBase { } // mmap mode ctor + // User must call Seal to build the view for variable length column. ColumnBase(const File& file, size_t size, const FieldMeta& field_meta) : type_size_(IsSparseFloatVectorDataType(field_meta.get_data_type()) ? 1 @@ -106,12 +107,16 @@ class ColumnBase { } // mmap mode ctor + // User must call Seal to build the view for variable length column. ColumnBase(const File& file, size_t size, int dim, const DataType& data_type) - : type_size_(GetDataTypeSize(data_type, dim)), - num_rows_(size / GetDataTypeSize(data_type, dim)), + : type_size_(IsSparseFloatVectorDataType(data_type) + ? 1 + : GetDataTypeSize(data_type, dim)), + num_rows_( + IsSparseFloatVectorDataType(data_type) ? 1 : (size / type_size_)), size_(size), cap_size_(size), is_map_anonymous_(false) { @@ -123,6 +128,7 @@ class ColumnBase { AssertInfo(data_ != MAP_FAILED, "failed to create file-backed map, err: {}", strerror(errno)); + madvise(data_, mapped_size, MADV_WILLNEED); UpdateMetricWhenMmap(mapped_size); } @@ -153,11 +159,18 @@ class ColumnBase { column.size_ = 0; } + // Data() points at an addr that contains the elements virtual const char* Data() const { return data_; } + // MmappedData() returns the mmaped address + const char* + MmappedData() const { + return data_; + } + size_t NumRows() const { return num_rows_; @@ -273,6 +286,7 @@ class ColumnBase { // capacity in bytes size_t cap_size_{0}; size_t padding_{0}; + // type_size_ is not used for sparse float vector column. const size_t type_size_{1}; size_t num_rows_{0}; @@ -344,8 +358,7 @@ class Column : public ColumnBase { } }; -// mmap not yet supported, thus SparseFloatColumn is not using fields in super -// class such as ColumnBase::data. +// when mmap is used, size_, data_ and num_rows_ of ColumnBase are used. class SparseFloatColumn : public ColumnBase { public: // memory mode ctor @@ -356,7 +369,13 @@ class SparseFloatColumn : public ColumnBase { size_t size, const FieldMeta& field_meta) : ColumnBase(file, size, field_meta) { - AssertInfo(false, "SparseFloatColumn mmap mode not supported"); + } + // mmap mode ctor + SparseFloatColumn(const File& file, + size_t size, + int dim, + const DataType& data_type) + : ColumnBase(file, size, dim, data_type) { } SparseFloatColumn(SparseFloatColumn&& column) noexcept @@ -372,14 +391,6 @@ class SparseFloatColumn : public ColumnBase { return static_cast(static_cast(vec_.data())); } - // This is used to advice mmap prefetch, we don't currently support mmap for - // sparse float vector thus not implemented for now. - size_t - ByteSize() const override { - throw std::runtime_error( - "ByteSize not supported for sparse float column"); - } - size_t Capacity() const override { throw std::runtime_error( @@ -413,6 +424,34 @@ class SparseFloatColumn : public ColumnBase { return dim_; } + void + Seal(std::vector indices) { + AssertInfo(!indices.empty(), + "indices should not be empty, Seal() of " + "SparseFloatColumn must be called only " + "at mmap mode"); + AssertInfo(data_, + "data_ should not be nullptr, Seal() of " + "SparseFloatColumn must be called only " + "at mmap mode"); + num_rows_ = indices.size(); + // so that indices[num_rows_] - indices[num_rows_ - 1] is the size of + // the last row. + indices.push_back(size_); + for (size_t i = 0; i < num_rows_; i++) { + auto vec_size = indices[i + 1] - indices[i]; + AssertInfo( + vec_size % knowhere::sparse::SparseRow::element_size() == + 0, + "Incorrect sparse vector size: {}", + vec_size); + vec_.emplace_back( + vec_size / knowhere::sparse::SparseRow::element_size(), + (uint8_t*)(data_) + indices[i], + false); + } + } + private: int64_t dim_ = 0; std::vector> vec_; diff --git a/internal/core/src/mmap/Utils.h b/internal/core/src/mmap/Utils.h index db06c1f0d0cf1..d56d252c81c69 100644 --- a/internal/core/src/mmap/Utils.h +++ b/internal/core/src/mmap/Utils.h @@ -81,10 +81,18 @@ WriteFieldData(File& file, break; } case DataType::VECTOR_SPARSE_FLOAT: { - // TODO(SPARSE): this is for mmap to write data to disk so that - // the file can be mmaped into memory. - throw std::runtime_error( - "WriteFieldData for VECTOR_SPARSE_FLOAT not implemented"); + for (size_t i = 0; i < data->get_num_rows(); ++i) { + auto vec = + static_cast*>( + data->RawValue(i)); + ssize_t written = + file.Write(vec->data(), vec->data_byte_size()); + if (written < vec->data_byte_size()) { + break; + } + total_written += written; + } + break; } default: PanicInfo(DataTypeInvalid, diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 36e7a6aebb184..0f27bc9d86fe7 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -585,7 +585,13 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) { column = std::move(arr_column); break; } - // TODO(SPARSE) support mmap + case milvus::DataType::VECTOR_SPARSE_FLOAT: { + auto sparse_column = std::make_shared( + file, total_written, field_meta); + sparse_column->Seal(std::move(indices)); + column = std::move(sparse_column); + break; + } default: { PanicInfo(DataTypeInvalid, fmt::format("unsupported data type {}", data_type)); @@ -850,7 +856,7 @@ SegmentSealedImpl::get_vector(FieldId field_id, auto metric_type = vec_index->GetMetricType(); auto has_raw_data = vec_index->HasRawData(); - if (has_raw_data) { + if (has_raw_data && !TEST_skip_index_for_retrieve_) { // If index has raw data, get vector from memory. auto ids_ds = GenIdsDataset(count, ids); if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) { @@ -865,9 +871,6 @@ SegmentSealedImpl::get_vector(FieldId field_id, } } - AssertInfo(field_meta.get_data_type() != DataType::VECTOR_SPARSE_FLOAT, - "index of sparse float vector is guaranteed to have raw data"); - // If index doesn't have raw data, get vector from chunk cache. auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache(); @@ -898,23 +901,50 @@ SegmentSealedImpl::get_vector(FieldId field_id, path_to_column[data_path] = column; } - // assign to data array - auto row_bytes = field_meta.get_sizeof(); - auto buf = std::vector(count * row_bytes); - for (auto i = 0; i < count; i++) { - AssertInfo(id_to_data_path.count(ids[i]) != 0, "id not found"); - const auto& [data_path, offset_in_binlog] = id_to_data_path.at(ids[i]); - AssertInfo(path_to_column.count(data_path) != 0, "column not found"); - const auto& column = path_to_column.at(data_path); - AssertInfo(offset_in_binlog * row_bytes < column->ByteSize(), - "column idx out of range, idx: {}, size: {}, data_path: {}", - offset_in_binlog * row_bytes, - column->ByteSize(), - data_path); - auto vector = &column->Data()[offset_in_binlog * row_bytes]; - std::memcpy(buf.data() + i * row_bytes, vector, row_bytes); + if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) { + auto buf = std::vector>(count); + for (auto i = 0; i < count; ++i) { + const auto& [data_path, offset_in_binlog] = + id_to_data_path.at(ids[i]); + const auto& column = path_to_column.at(data_path); + AssertInfo( + offset_in_binlog < column->NumRows(), + "column idx out of range, idx: {}, size: {}, data_path: {}", + offset_in_binlog, + column->NumRows(), + data_path); + auto sparse_column = + std::dynamic_pointer_cast(column); + AssertInfo(sparse_column, "incorrect column created"); + buf[i] = static_cast*>( + static_cast( + sparse_column->Data()))[offset_in_binlog]; + } + return segcore::CreateVectorDataArrayFrom( + buf.data(), count, field_meta); + } else { + // assign to data array + auto row_bytes = field_meta.get_sizeof(); + auto buf = std::vector(count * row_bytes); + for (auto i = 0; i < count; ++i) { + AssertInfo(id_to_data_path.count(ids[i]) != 0, "id not found"); + const auto& [data_path, offset_in_binlog] = + id_to_data_path.at(ids[i]); + AssertInfo(path_to_column.count(data_path) != 0, + "column not found"); + const auto& column = path_to_column.at(data_path); + AssertInfo( + offset_in_binlog * row_bytes < column->ByteSize(), + "column idx out of range, idx: {}, size: {}, data_path: {}", + offset_in_binlog * row_bytes, + column->ByteSize(), + data_path); + auto vector = &column->Data()[offset_in_binlog * row_bytes]; + std::memcpy(buf.data() + i * row_bytes, vector, row_bytes); + } + return segcore::CreateVectorDataArrayFrom( + buf.data(), count, field_meta); } - return segcore::CreateVectorDataArrayFrom(buf.data(), count, field_meta); } void @@ -997,7 +1027,8 @@ SegmentSealedImpl::check_search(const query::Plan* plan) const { SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, IndexMetaPtr index_meta, const SegcoreConfig& segcore_config, - int64_t segment_id) + int64_t segment_id, + bool TEST_skip_index_for_retrieve) : segcore_config_(segcore_config), field_data_ready_bitset_(schema->size()), index_ready_bitset_(schema->size()), @@ -1006,7 +1037,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, insert_record_(*schema, MAX_ROW_COUNT), schema_(schema), id_(segment_id), - col_index_meta_(index_meta) { + col_index_meta_(index_meta), + TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) { } SegmentSealedImpl::~SegmentSealedImpl() { diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 21306616e810e..006e457dda934 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -42,7 +42,8 @@ class SegmentSealedImpl : public SegmentSealed { explicit SegmentSealedImpl(SchemaPtr schema, IndexMetaPtr index_meta, const SegcoreConfig& segcore_config, - int64_t segment_id); + int64_t segment_id, + bool TEST_skip_index_for_retrieve = false); ~SegmentSealedImpl() override; void LoadIndex(const LoadIndexInfo& info) override; @@ -312,6 +313,10 @@ class SegmentSealedImpl : public SegmentSealed { vec_binlog_config_; SegmentStats stats_{}; + + // for sparse vector unit test only! Once a type of sparse index that + // doesn't has raw data is added, this should be removed. + bool TEST_skip_index_for_retrieve_ = false; }; inline SegmentSealedUPtr @@ -319,9 +324,13 @@ CreateSealedSegment( SchemaPtr schema, IndexMetaPtr index_meta = nullptr, int64_t segment_id = -1, - const SegcoreConfig& segcore_config = SegcoreConfig::default_config()) { - return std::make_unique( - schema, index_meta, segcore_config, segment_id); + const SegcoreConfig& segcore_config = SegcoreConfig::default_config(), + bool TEST_skip_index_for_retrieve = false) { + return std::make_unique(schema, + index_meta, + segcore_config, + segment_id, + TEST_skip_index_for_retrieve); } } // namespace milvus::segcore diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index 4f9c80418f007..4fe8592a7233a 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ b/internal/core/src/storage/ChunkCache.cpp @@ -64,10 +64,10 @@ ChunkCache::Prefetch(const std::string& filepath) { } auto column = it->second; - auto ok = - madvise(reinterpret_cast(const_cast(column->Data())), - column->ByteSize(), - read_ahead_policy_); + auto ok = madvise( + reinterpret_cast(const_cast(column->MmappedData())), + column->ByteSize(), + read_ahead_policy_); AssertInfo(ok == 0, "failed to madvise to the data file {}, err: {}", path, @@ -100,7 +100,18 @@ ChunkCache::Mmap(const std::filesystem::path& path, std::shared_ptr column{}; - if (IsVariableDataType(data_type)) { + if (IsSparseFloatVectorDataType(data_type)) { + std::vector indices{}; + uint64_t offset = 0; + for (auto i = 0; i < field_data->get_num_rows(); ++i) { + indices.push_back(offset); + offset += field_data->Size(i); + } + auto sparse_column = std::make_shared( + file, data_size, dim, data_type); + sparse_column->Seal(std::move(indices)); + column = std::move(sparse_column); + } else if (IsVariableDataType(data_type)) { AssertInfo( false, "TODO: unimplemented for variable data type: {}", data_type); } else { diff --git a/internal/core/src/storage/Util.cpp b/internal/core/src/storage/Util.cpp index 0e714f0a97362..2a3c4625f936b 100644 --- a/internal/core/src/storage/Util.cpp +++ b/internal/core/src/storage/Util.cpp @@ -533,13 +533,16 @@ EncodeAndUploadIndexSlice2(std::shared_ptr space, std::pair EncodeAndUploadFieldSlice(ChunkManager* chunk_manager, - uint8_t* buf, + void* buf, int64_t element_count, FieldDataMeta field_data_meta, const FieldMeta& field_meta, std::string object_key) { - auto field_data = - CreateFieldData(field_meta.get_data_type(), field_meta.get_dim(), 0); + // dim should not be used for sparse float vector field + auto dim = IsSparseFloatVectorDataType(field_meta.get_data_type()) + ? -1 + : field_meta.get_dim(); + auto field_data = CreateFieldData(field_meta.get_data_type(), dim, 0); field_data->FillFieldData(buf, element_count); auto insertData = std::make_shared(field_data); insertData->SetFieldDataMeta(field_data_meta); diff --git a/internal/core/src/storage/Util.h b/internal/core/src/storage/Util.h index acb6d233c0e00..b13d03fa42aad 100644 --- a/internal/core/src/storage/Util.h +++ b/internal/core/src/storage/Util.h @@ -110,7 +110,7 @@ EncodeAndUploadIndexSlice2(std::shared_ptr space, std::string object_key); std::pair EncodeAndUploadFieldSlice(ChunkManager* chunk_manager, - uint8_t* buf, + void* buf, int64_t element_count, FieldDataMeta field_data_meta, const FieldMeta& field_meta, diff --git a/internal/core/unittest/test_chunk_cache.cpp b/internal/core/unittest/test_chunk_cache.cpp index a255fffda3d4b..aa1e65132b4e9 100644 --- a/internal/core/unittest/test_chunk_cache.cpp +++ b/internal/core/unittest/test_chunk_cache.cpp @@ -59,7 +59,7 @@ TEST(ChunkCacheTest, Read) { auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance() .GetChunkManager(); auto data = dataset.get_col(fake_id); - auto data_slices = std::vector{(uint8_t*)data.data()}; + auto data_slices = std::vector{data.data()}; auto slice_sizes = std::vector{static_cast(N)}; auto slice_names = std::vector{file_name}; PutFieldData(lcm.get(), @@ -121,7 +121,7 @@ TEST(ChunkCacheTest, TestMultithreads) { auto lcm = milvus::storage::LocalChunkManagerSingleton::GetInstance() .GetChunkManager(); auto data = dataset.get_col(fake_id); - auto data_slices = std::vector{(uint8_t*)data.data()}; + auto data_slices = std::vector{data.data()}; auto slice_sizes = std::vector{static_cast(N)}; auto slice_names = std::vector{file_name}; PutFieldData(lcm.get(), diff --git a/internal/core/unittest/test_growing.cpp b/internal/core/unittest/test_growing.cpp index f5421384e02fb..e830c935aae19 100644 --- a/internal/core/unittest/test_growing.cpp +++ b/internal/core/unittest/test_growing.cpp @@ -98,13 +98,15 @@ TEST(Growing, RealCount) { } class GrowingTest - : public ::testing::TestWithParam< - std::tuple> { + : public ::testing::TestWithParam> { public: void SetUp() override { - auto index_type = std::get<0>(GetParam()); - auto metric_type = std::get<1>(GetParam()); + index_type = std::get<0>(GetParam()); + metric_type = std::get<1>(GetParam()); + use_interim_index = std::get<2>(GetParam()); if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC) { data_type = DataType::VECTOR_FLOAT; @@ -119,6 +121,7 @@ class GrowingTest knowhere::MetricType metric_type; std::string index_type; DataType data_type; + bool use_interim_index; }; INSTANTIATE_TEST_SUITE_P( @@ -129,7 +132,8 @@ INSTANTIATE_TEST_SUITE_P( knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC), ::testing::Values(knowhere::metric::L2, knowhere::metric::IP, - knowhere::metric::COSINE))); + knowhere::metric::COSINE), + ::testing::Values(true, false))); INSTANTIATE_TEST_SUITE_P( SparseFloatGrowingTest, @@ -137,7 +141,8 @@ INSTANTIATE_TEST_SUITE_P( ::testing::Combine( ::testing::Values(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, knowhere::IndexEnum::INDEX_SPARSE_WAND), - ::testing::Values(knowhere::metric::IP))); + ::testing::Values(knowhere::metric::IP), + ::testing::Values(true, false))); TEST_P(GrowingTest, FillData) { auto schema = std::make_shared(); @@ -174,18 +179,34 @@ TEST_P(GrowingTest, FillData) { vec, std::move(index_params), std::move(type_params)); auto config = SegcoreConfig::default_config(); config.set_chunk_rows(1024); - config.set_enable_interim_segment_index(true); - std::map filedMap = {{vec, fieldIndexMeta}}; + config.set_enable_interim_segment_index(use_interim_index); + std::map fieldMap = {{vec, fieldIndexMeta}}; IndexMetaPtr metaPtr = - std::make_shared(100000, std::move(filedMap)); + std::make_shared(100000, std::move(fieldMap)); auto segment_growing = CreateGrowingSegment(schema, metaPtr, 1, config); auto segment = dynamic_cast(segment_growing.get()); int64_t per_batch = 1000; int64_t n_batch = 3; int64_t dim = 128; - for (int64_t i = 0; i < n_batch; i++) { + + std::vector generated_datas(n_batch); + auto add_batch = [&](int64_t i) { auto dataset = DataGen(schema, per_batch); + generated_datas[i] = std::move(dataset); + }; + auto sparse_get_ith_row = [&](int x) { + auto ds_idx = x / per_batch; + auto ds_offset = x % per_batch; + auto& dataset = generated_datas[ds_idx]; + auto row = + dataset.get_col>(vec)[ds_offset]; + return row; + }; + + for (int64_t i = 0; i < n_batch; i++) { + add_batch(i); + auto& dataset = generated_datas[i]; auto offset = segment->PreInsert(per_batch); segment->Insert(offset, @@ -248,6 +269,28 @@ TEST_P(GrowingTest, FillData) { EXPECT_EQ( vec_result->vectors().sparse_float_vector().contents_size(), num_inserted); + // The following verification is disabled because the running time + // is too long. + // + // auto original = + // dataset.get_col>(vec); + // for (size_t j = 0; j < num_inserted; ++j) { + // auto actual_row = + // vec_result->vectors().sparse_float_vector().contents()[j]; + // auto id = ids_ds->GetIds()[j]; + // auto expected_row = sparse_get_ith_row(id); + // // actual_row.size() is number of bytes, while + // // expected_row.size() is number of elements + // EXPECT_EQ( + // actual_row.size() / + // knowhere::sparse::SparseRow::element_size(), + // expected_row.size()); + // // check bytes of actual_row and expected_row.data() equal + // EXPECT_EQ(memcmp(actual_row.data(), + // expected_row.data(), + // actual_row.size()), + // 0); + // } } else { ASSERT_TRUE(false); } diff --git a/internal/core/unittest/test_growing_index.cpp b/internal/core/unittest/test_growing_index.cpp index 7d619182b650d..aa95d57b41e6d 100644 --- a/internal/core/unittest/test_growing_index.cpp +++ b/internal/core/unittest/test_growing_index.cpp @@ -24,7 +24,9 @@ using namespace milvus; using namespace milvus::segcore; namespace pb = milvus::proto; -using Param = std::tuple; +using Param = std::tuple; class GrowingIndexTest : public ::testing::TestWithParam { void @@ -32,6 +34,7 @@ class GrowingIndexTest : public ::testing::TestWithParam { auto param = GetParam(); index_type = std::get<0>(param); metric_type = std::get<1>(param); + use_interim_index = std::get<2>(param); if (index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT || index_type == knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC) { data_type = DataType::VECTOR_FLOAT; @@ -49,9 +52,12 @@ class GrowingIndexTest : public ::testing::TestWithParam { std::string index_type; knowhere::MetricType metric_type; DataType data_type; + bool use_interim_index; bool is_sparse = false; }; +// Always testing using interim index. This test suite is too slow if +// use_interim_index is false. INSTANTIATE_TEST_SUITE_P( FloatIndexTypeParameters, GrowingIndexTest, @@ -60,7 +66,8 @@ INSTANTIATE_TEST_SUITE_P( knowhere::IndexEnum::INDEX_FAISS_IVFFLAT_CC), ::testing::Values(knowhere::metric::L2, knowhere::metric::COSINE, - knowhere::metric::IP))); + knowhere::metric::IP), + ::testing::Values(true))); INSTANTIATE_TEST_SUITE_P( SparseIndexTypeParameters, @@ -68,7 +75,8 @@ INSTANTIATE_TEST_SUITE_P( ::testing::Combine( ::testing::Values(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, knowhere::IndexEnum::INDEX_SPARSE_WAND), - ::testing::Values(knowhere::metric::IP))); + ::testing::Values(knowhere::metric::IP), + ::testing::Values(true))); TEST_P(GrowingIndexTest, Correctness) { auto schema = std::make_shared(); @@ -86,7 +94,7 @@ TEST_P(GrowingIndexTest, Correctness) { vec, std::move(index_params), std::move(type_params)); auto& config = SegcoreConfig::default_config(); config.set_chunk_rows(1024); - config.set_enable_interim_segment_index(true); + config.set_enable_interim_segment_index(use_interim_index); std::map filedMap = {{vec, fieldIndexMeta}}; IndexMetaPtr metaPtr = std::make_shared(226985, std::move(filedMap)); @@ -159,10 +167,11 @@ TEST_P(GrowingIndexTest, Correctness) { auto inserted = (i + 1) * per_batch; // once index built, chunk data will be removed. // growing index will only be built when num rows reached - // get_build_threshold(). This value for sparse is 0, thus sparse index + // get_build_threshold() and use_interim_index is true. + // This value for sparse is 0, thus sparse index // will be built since the first chunk. Dense segment buffers the first // 2 chunks before building an index in this test case. - if (!is_sparse && i < 2) { + if ((!is_sparse && i < 2) || !use_interim_index) { EXPECT_EQ(field_data->num_chunk(), upper_div(inserted, field_data->get_size_per_chunk())); } else { @@ -218,7 +227,7 @@ TEST_P(GrowingIndexTest, MissIndexMeta) { auto& config = SegcoreConfig::default_config(); config.set_chunk_rows(1024); - config.set_enable_interim_segment_index(true); + config.set_enable_interim_segment_index(use_interim_index); auto segment = CreateGrowingSegment(schema, nullptr); } @@ -238,7 +247,7 @@ TEST_P(GrowingIndexTest, GetVector) { vec, std::move(index_params), std::move(type_params)); auto& config = SegcoreConfig::default_config(); config.set_chunk_rows(1024); - config.set_enable_interim_segment_index(true); + config.set_enable_interim_segment_index(use_interim_index); std::map filedMap = {{vec, fieldIndexMeta}}; IndexMetaPtr metaPtr = std::make_shared(100000, std::move(filedMap)); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 1fed5034ce518..0edcb4c9cd6cd 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -1413,7 +1413,7 @@ TEST(Sealed, GetVectorFromChunkCache) { auto rcm = milvus::storage::RemoteChunkManagerSingleton::GetInstance() .GetRemoteChunkManager(); auto data = dataset.get_col(fakevec_id); - auto data_slices = std::vector{(uint8_t*)data.data()}; + auto data_slices = std::vector{data.data()}; auto slice_sizes = std::vector{static_cast(N)}; auto slice_names = std::vector{file_name}; PutFieldData(rcm.get(), @@ -1423,9 +1423,8 @@ TEST(Sealed, GetVectorFromChunkCache) { field_data_meta, field_meta); - auto fakevec = dataset.get_col(fakevec_id); auto conf = generate_build_conf(index_type, metric_type); - auto ds = knowhere::GenDataSet(N, dim, fakevec.data()); + auto ds = knowhere::GenDataSet(N, dim, data.data()); auto indexing = std::make_unique>( index_type, metric_type, @@ -1460,11 +1459,11 @@ TEST(Sealed, GetVectorFromChunkCache) { segment->get_vector(fakevec_id, ids_ds->GetIds(), ids_ds->GetRows()); auto vector = result.get()->mutable_vectors()->float_vector().data(); - EXPECT_TRUE(vector.size() == fakevec.size()); + EXPECT_TRUE(vector.size() == data.size()); for (size_t i = 0; i < N; ++i) { auto id = ids_ds->GetIds()[i]; for (size_t j = 0; j < dim; ++j) { - auto expect = fakevec[id * dim + j]; + auto expect = data[id * dim + j]; auto actual = vector[i * dim + j]; AssertInfo(expect == actual, fmt::format("expect {}, actual {}", expect, actual)); @@ -1479,6 +1478,120 @@ TEST(Sealed, GetVectorFromChunkCache) { Assert(!exist); } +TEST(Sealed, GetSparseVectorFromChunkCache) { + // skip test due to mem leak from AWS::InitSDK + return; + + auto dim = 16; + auto topK = 5; + auto N = ROW_COUNT; + auto metric_type = knowhere::metric::IP; + // TODO: remove SegmentSealedImpl::TEST_skip_index_for_retrieve_ after + // we have a type of sparse index that doesn't include raw data. + auto index_type = knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX; + + auto mmap_dir = "/tmp/mmap"; + auto file_name = std::string( + "sealed_test_get_vector_from_chunk_cache/insert_log/1/101/1000000"); + + auto sc = milvus::storage::StorageConfig{}; + milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc); + auto mcm = std::make_unique(sc); + milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir, + "willneed"); + + auto schema = std::make_shared(); + auto fakevec_id = schema->AddDebugField( + "fakevec", DataType::VECTOR_SPARSE_FLOAT, dim, metric_type); + auto counter_id = schema->AddDebugField("counter", DataType::INT64); + auto double_id = schema->AddDebugField("double", DataType::DOUBLE); + auto nothing_id = schema->AddDebugField("nothing", DataType::INT32); + auto str_id = schema->AddDebugField("str", DataType::VARCHAR); + schema->AddDebugField("int8", DataType::INT8); + schema->AddDebugField("int16", DataType::INT16); + schema->AddDebugField("float", DataType::FLOAT); + schema->set_primary_field_id(counter_id); + + auto dataset = DataGen(schema, N); + auto field_data_meta = + milvus::storage::FieldDataMeta{1, 2, 3, fakevec_id.get()}; + auto field_meta = milvus::FieldMeta(milvus::FieldName("facevec"), + fakevec_id, + milvus::DataType::VECTOR_SPARSE_FLOAT, + dim, + metric_type); + + auto rcm = milvus::storage::RemoteChunkManagerSingleton::GetInstance() + .GetRemoteChunkManager(); + auto data = dataset.get_col>(fakevec_id); + auto data_slices = std::vector{data.data()}; + auto slice_sizes = std::vector{static_cast(N)}; + auto slice_names = std::vector{file_name}; + PutFieldData(rcm.get(), + data_slices, + slice_sizes, + slice_names, + field_data_meta, + field_meta); + + auto conf = generate_build_conf(index_type, metric_type); + auto ds = knowhere::GenDataSet(N, dim, data.data()); + auto indexing = std::make_unique>( + index_type, + metric_type, + knowhere::Version::GetCurrentVersion().VersionNumber()); + indexing->BuildWithDataset(ds, conf); + auto segment_sealed = CreateSealedSegment( + schema, nullptr, -1, SegcoreConfig::default_config(), true); + + LoadIndexInfo vec_info; + vec_info.field_id = fakevec_id.get(); + vec_info.index = std::move(indexing); + vec_info.index_params["metric_type"] = metric_type; + segment_sealed->LoadIndex(vec_info); + + auto field_binlog_info = + FieldBinlogInfo{fakevec_id.get(), + N, + std::vector{N}, + false, + std::vector{file_name}}; + segment_sealed->AddFieldDataInfoForSealed(LoadFieldDataInfo{ + std::map{ + {fakevec_id.get(), field_binlog_info}}, + mmap_dir, + }); + + auto segment = dynamic_cast(segment_sealed.get()); + + auto ids_ds = GenRandomIds(N); + auto result = + segment->get_vector(fakevec_id, ids_ds->GetIds(), ids_ds->GetRows()); + + auto vector = + result.get()->mutable_vectors()->sparse_float_vector().contents(); + // number of rows + EXPECT_TRUE(vector.size() == data.size()); + auto sparse_rows = SparseBytesToRows(vector, true); + for (size_t i = 0; i < N; ++i) { + auto expect = data[ids_ds->GetIds()[i]]; + auto& actual = sparse_rows[i]; + AssertInfo( + expect.size() == actual.size(), + fmt::format("expect {}, actual {}", expect.size(), actual.size())); + AssertInfo( + memcmp(expect.data(), actual.data(), expect.data_byte_size()) == 0, + "sparse float vector doesn't match"); + } + + rcm->Remove(file_name); + std::filesystem::remove_all(mmap_dir); + auto exist = rcm->Exist(file_name); + Assert(!exist); + exist = std::filesystem::exists(mmap_dir); + Assert(!exist); +} + TEST(Sealed, WarmupChunkCache) { // skip test due to mem leak from AWS::InitSDK return; @@ -1524,7 +1637,7 @@ TEST(Sealed, WarmupChunkCache) { auto rcm = milvus::storage::RemoteChunkManagerSingleton::GetInstance() .GetRemoteChunkManager(); auto data = dataset.get_col(fakevec_id); - auto data_slices = std::vector{(uint8_t*)data.data()}; + auto data_slices = std::vector{data.data()}; auto slice_sizes = std::vector{static_cast(N)}; auto slice_names = std::vector{file_name}; PutFieldData(rcm.get(), @@ -1534,9 +1647,8 @@ TEST(Sealed, WarmupChunkCache) { field_data_meta, field_meta); - auto fakevec = dataset.get_col(fakevec_id); auto conf = generate_build_conf(index_type, metric_type); - auto ds = knowhere::GenDataSet(N, dim, fakevec.data()); + auto ds = knowhere::GenDataSet(N, dim, data.data()); auto indexing = std::make_unique>( index_type, metric_type, @@ -1573,11 +1685,11 @@ TEST(Sealed, WarmupChunkCache) { segment->get_vector(fakevec_id, ids_ds->GetIds(), ids_ds->GetRows()); auto vector = result.get()->mutable_vectors()->float_vector().data(); - EXPECT_TRUE(vector.size() == fakevec.size()); + EXPECT_TRUE(vector.size() == data.size()); for (size_t i = 0; i < N; ++i) { auto id = ids_ds->GetIds()[i]; for (size_t j = 0; j < dim; ++j) { - auto expect = fakevec[id * dim + j]; + auto expect = data[id * dim + j]; auto actual = vector[i * dim + j]; AssertInfo(expect == actual, fmt::format("expect {}, actual {}", expect, actual)); diff --git a/internal/core/unittest/test_utils/DataGen.h b/internal/core/unittest/test_utils/DataGen.h index 7566c63757eda..c3beb5b9b8476 100644 --- a/internal/core/unittest/test_utils/DataGen.h +++ b/internal/core/unittest/test_utils/DataGen.h @@ -226,8 +226,9 @@ struct GeneratedData { PanicInfo(FieldIDInvalid, "field id not find"); } - private: GeneratedData() = default; + + private: friend GeneratedData DataGen(SchemaPtr schema, int64_t N, @@ -259,6 +260,15 @@ GenerateRandomSparseFloatVector(size_t rows, std::vector> data(rows); + // Ensure each row has at least one non-zero value + for (size_t i = 0; i < rows; ++i) { + auto col = col_distrib(rng); + float val = real_distrib(rng); + data[i][col] = val; + } + + num_elements -= rows; + for (int32_t i = 0; i < num_elements; ++i) { auto row = row_distrib(rng); while (data[row].size() == (size_t)cols) { diff --git a/internal/core/unittest/test_utils/storage_test_utils.h b/internal/core/unittest/test_utils/storage_test_utils.h index 7eca359f3043d..bafbf27872faf 100644 --- a/internal/core/unittest/test_utils/storage_test_utils.h +++ b/internal/core/unittest/test_utils/storage_test_utils.h @@ -103,7 +103,7 @@ PrepareInsertBinlog(int64_t collection_id, std::map PutFieldData(milvus::storage::ChunkManager* remote_chunk_manager, - const std::vector& buffers, + const std::vector& buffers, const std::vector& element_counts, const std::vector& object_keys, FieldDataMeta& field_data_meta, @@ -120,7 +120,7 @@ PutFieldData(milvus::storage::ChunkManager* remote_chunk_manager, futures.push_back( pool.Submit(milvus::storage::EncodeAndUploadFieldSlice, remote_chunk_manager, - const_cast(buffers[i]), + buffers[i], element_counts[i], field_data_meta, field_meta,