From 2d982147dd868ca3f94e18b7b289bd1d8ef4105a Mon Sep 17 00:00:00 2001 From: cqy123456 <39671710+cqy123456@users.noreply.github.com> Date: Thu, 20 Jun 2024 04:42:02 -0500 Subject: [PATCH] enhance: use segment id and type to register in MmapChunkManager and opt malloc in variableChunk (#33993) issue: https://github.com/milvus-io/milvus/issues/32984 Signed-off-by: cqy123456 --- internal/core/src/common/type_c.h | 1 + internal/core/src/mmap/ChunkData.h | 66 ++++++++++++------- internal/core/src/mmap/ChunkVector.h | 7 +- internal/core/src/mmap/Column.h | 12 ++-- internal/core/src/segcore/ConcurrentVector.h | 20 +++--- internal/core/src/segcore/InsertRecord.h | 9 +-- .../core/src/segcore/SegmentGrowingImpl.h | 6 +- .../core/src/segcore/SegmentSealedImpl.cpp | 7 +- internal/core/src/segcore/SegmentSealedImpl.h | 2 +- internal/core/src/storage/ChunkCache.cpp | 4 +- internal/core/src/storage/ChunkCache.h | 5 +- .../core/src/storage/MmapChunkManager.cpp | 42 +++++++----- internal/core/src/storage/MmapChunkManager.h | 32 +++++---- internal/core/unittest/CMakeLists.txt | 1 + internal/core/unittest/test_chunk_cache.cpp | 18 ++--- .../core/unittest/test_mmap_chunk_manager.cpp | 45 +++++++++++++ internal/datanode/writebuffer/manager_test.go | 2 - 17 files changed, 174 insertions(+), 105 deletions(-) create mode 100644 internal/core/unittest/test_mmap_chunk_manager.cpp diff --git a/internal/core/src/common/type_c.h b/internal/core/src/common/type_c.h index 523a2bcb6a31..6b974c5e4179 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -22,6 +22,7 @@ extern "C" { #endif +// WARNING: do not change the enum value of Growing and Sealed enum SegmentType { Invalid = 0, Growing = 1, diff --git a/internal/core/src/mmap/ChunkData.h b/internal/core/src/mmap/ChunkData.h index b80c399a3721..da2cefe91534 100644 --- a/internal/core/src/mmap/ChunkData.h +++ b/internal/core/src/mmap/ChunkData.h @@ -25,7 +25,7 @@ struct FixedLengthChunk { public: FixedLengthChunk() = delete; explicit FixedLengthChunk(const uint64_t size, - storage::MmapChunkDescriptor descriptor) + storage::MmapChunkDescriptorPtr descriptor) : mmap_descriptor_(descriptor), size_(size) { auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); data_ = (Type*)(mcm->Allocate(mmap_descriptor_, sizeof(Type) * size)); @@ -52,7 +52,7 @@ struct FixedLengthChunk { private: int64_t size_ = 0; Type* data_ = nullptr; - storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; + storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr; }; /** * @brief VariableLengthChunk @@ -64,7 +64,7 @@ struct VariableLengthChunk { public: VariableLengthChunk() = delete; explicit VariableLengthChunk(const uint64_t size, - storage::MmapChunkDescriptor descriptor) + storage::MmapChunkDescriptorPtr descriptor) : mmap_descriptor_(descriptor), size_(size) { data_ = FixedVector>(size); }; @@ -98,7 +98,7 @@ struct VariableLengthChunk { private: int64_t size_ = 0; FixedVector> data_; - storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; + storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr; }; template <> inline void @@ -113,13 +113,19 @@ VariableLengthChunk::set(const std::string* src, length, begin, size_); + size_t total_size = 0; + size_t padding_size = 1; for (auto i = 0; i < length; i++) { - auto buf_size = src[i].size() + 1; - auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size); - AssertInfo(buf != nullptr, - "failed to allocate memory from mmap_manager, error_code"); - std::strcpy(buf, src[i].c_str()); - data_[i + begin] = std::string_view(buf, src[i].size()); + total_size += src[i].size() + padding_size; + } + auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size); + AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager."); + for (auto i = 0, offset = 0; i < length; i++) { + auto data_size = src[i].size() + padding_size; + char* data_ptr = buf + offset; + std::strcpy(data_ptr, src[i].c_str()); + data_[i + begin] = std::string_view(data_ptr, src[i].size()); + offset += data_size; } } template <> @@ -141,14 +147,19 @@ VariableLengthChunk::set(const Json* src, length, begin, size_); + size_t total_size = 0; + size_t padding_size = simdjson::SIMDJSON_PADDING + 1; for (auto i = 0; i < length; i++) { - auto buf_size = src[i].size() + simdjson::SIMDJSON_PADDING + 1; - auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size); - AssertInfo( - buf != nullptr, - "failed to allocate memory from mmap_manager, error_code:{}"); - std::strcpy(buf, src[i].c_str()); - data_[i + begin] = Json(buf, src[i].size()); + total_size += src[i].size() + padding_size; + } + auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size); + AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager."); + for (auto i = 0, offset = 0; i < length; i++) { + auto data_size = src[i].size() + padding_size; + char* data_ptr = buf + offset; + std::strcpy(data_ptr, src[i].c_str()); + data_[i + begin] = Json(data_ptr, src[i].size()); + offset += data_size; } } template <> @@ -169,17 +180,22 @@ VariableLengthChunk::set(const Array* src, length, begin, size_); + size_t total_size = 0; + size_t padding_size = 0; for (auto i = 0; i < length; i++) { - auto array_data = - (char*)mcm->Allocate(mmap_descriptor_, src[i].byte_size()); - AssertInfo(array_data != nullptr, - "failed to allocate memory from mmap_manager, error_code"); - std::copy( - src[i].data(), src[i].data() + src[i].byte_size(), array_data); - data_[i + begin] = ArrayView(array_data, - src[i].byte_size(), + total_size += src[i].byte_size() + padding_size; + } + auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size); + AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager."); + for (auto i = 0, offset = 0; i < length; i++) { + auto data_size = src[i].byte_size() + padding_size; + char* data_ptr = buf + offset; + std::copy(src[i].data(), src[i].data() + src[i].byte_size(), data_ptr); + data_[i + begin] = ArrayView(data_ptr, + data_size, src[i].get_element_type(), src[i].get_offsets_in_copy()); + offset += data_size; } } template <> diff --git a/internal/core/src/mmap/ChunkVector.h b/internal/core/src/mmap/ChunkVector.h index ba750adefa3d..49377217ecc8 100644 --- a/internal/core/src/mmap/ChunkVector.h +++ b/internal/core/src/mmap/ChunkVector.h @@ -56,7 +56,8 @@ template class ThreadSafeChunkVector : public ChunkVectorBase { public: - ThreadSafeChunkVector(storage::MmapChunkDescriptor descriptor = nullptr) { + ThreadSafeChunkVector( + storage::MmapChunkDescriptorPtr descriptor = nullptr) { mmap_descriptor_ = descriptor; } @@ -181,13 +182,13 @@ class ThreadSafeChunkVector : public ChunkVectorBase { private: mutable std::shared_mutex mutex_; - storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; + storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr; std::deque vec_; }; template ChunkVectorPtr -SelectChunkVectorPtr(storage::MmapChunkDescriptor& mmap_descriptor) { +SelectChunkVectorPtr(storage::MmapChunkDescriptorPtr& mmap_descriptor) { if constexpr (!IsVariableType) { if (mmap_descriptor != nullptr) { return std::make_unique< diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index 627473256fff..10afe468946c 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -94,7 +94,7 @@ class ColumnBase { int dim, const DataType& data_type, storage::MmapChunkManagerPtr mcm, - storage::MmapChunkDescriptor descriptor) + storage::MmapChunkDescriptorPtr descriptor) : mcm_(mcm), mmap_descriptor_(descriptor), type_size_(GetDataTypeSize(data_type, dim)), @@ -340,7 +340,7 @@ class ColumnBase { // length in bytes size_t size_{0}; - storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; + storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr; private: void @@ -401,7 +401,7 @@ class Column : public ColumnBase { int dim, const DataType& data_type, storage::MmapChunkManagerPtr mcm, - storage::MmapChunkDescriptor descriptor) + storage::MmapChunkDescriptorPtr descriptor) : ColumnBase(reserve, dim, data_type, mcm, descriptor) { } @@ -440,7 +440,7 @@ class SparseFloatColumn : public ColumnBase { int dim, const DataType& data_type, storage::MmapChunkManagerPtr mcm, - storage::MmapChunkDescriptor descriptor) + storage::MmapChunkDescriptorPtr descriptor) : ColumnBase(reserve, dim, data_type, mcm, descriptor) { } @@ -543,7 +543,7 @@ class VariableColumn : public ColumnBase { int dim, const DataType& data_type, storage::MmapChunkManagerPtr mcm, - storage::MmapChunkDescriptor descriptor) + storage::MmapChunkDescriptorPtr descriptor) : ColumnBase(reserve, dim, data_type, mcm, descriptor) { } @@ -658,7 +658,7 @@ class ArrayColumn : public ColumnBase { int dim, const DataType& data_type, storage::MmapChunkManagerPtr mcm, - storage::MmapChunkDescriptor descriptor) + storage::MmapChunkDescriptorPtr descriptor) : ColumnBase(reserve, dim, data_type, mcm, descriptor) { } diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index 81feb5786966..a4cb72d986fc 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -169,7 +169,7 @@ class ConcurrentVectorImpl : public VectorBase { explicit ConcurrentVectorImpl( ssize_t elements_per_row, int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : VectorBase(size_per_chunk), elements_per_row_(is_type_entire_row ? 1 : elements_per_row) { chunks_ptr_ = SelectChunkVectorPtr(mmap_descriptor); @@ -359,7 +359,7 @@ class ConcurrentVector : public ConcurrentVectorImpl { static_assert(IsScalar || std::is_same_v); explicit ConcurrentVector( int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( 1, size_per_chunk, mmap_descriptor) { } @@ -371,7 +371,7 @@ class ConcurrentVector public: explicit ConcurrentVector( int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( 1, size_per_chunk, mmap_descriptor) { } @@ -389,7 +389,7 @@ class ConcurrentVector : public ConcurrentVectorImpl { public: explicit ConcurrentVector( int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( 1, size_per_chunk, mmap_descriptor) { } @@ -408,7 +408,7 @@ class ConcurrentVector : public ConcurrentVectorImpl { public: explicit ConcurrentVector( int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( 1, size_per_chunk, mmap_descriptor) { } @@ -427,7 +427,7 @@ class ConcurrentVector public: explicit ConcurrentVector( int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : ConcurrentVectorImpl, true>::ConcurrentVectorImpl(1, size_per_chunk, @@ -465,7 +465,7 @@ class ConcurrentVector public: ConcurrentVector(int64_t dim, int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( dim, size_per_chunk, mmap_descriptor) { } @@ -478,7 +478,7 @@ class ConcurrentVector explicit ConcurrentVector( int64_t dim, int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : ConcurrentVectorImpl(dim / 8, size_per_chunk, mmap_descriptor) { AssertInfo(dim % 8 == 0, fmt::format("dim is not a multiple of 8, dim={}", dim)); @@ -491,7 +491,7 @@ class ConcurrentVector public: ConcurrentVector(int64_t dim, int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( dim, size_per_chunk, mmap_descriptor) { } @@ -503,7 +503,7 @@ class ConcurrentVector public: ConcurrentVector(int64_t dim, int64_t size_per_chunk, - storage::MmapChunkDescriptor mmap_descriptor = nullptr) + storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( dim, size_per_chunk, mmap_descriptor) { } diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index d200c085cc52..39ccc4791794 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -296,9 +296,10 @@ class OffsetOrderedArray : public OffsetMap { template struct InsertRecord { - InsertRecord(const Schema& schema, - const int64_t size_per_chunk, - const storage::MmapChunkDescriptor mmap_descriptor = nullptr) + InsertRecord( + const Schema& schema, + const int64_t size_per_chunk, + const storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr) : timestamps_(size_per_chunk), mmap_descriptor_(mmap_descriptor) { std::optional pk_field_id = schema.get_primary_field_id(); @@ -630,7 +631,7 @@ struct InsertRecord { private: std::unordered_map> fields_data_{}; mutable std::shared_mutex shared_mutex_{}; - storage::MmapChunkDescriptor mmap_descriptor_; + storage::MmapChunkDescriptorPtr mmap_descriptor_; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index ce62566b08a7..0bb20285a469 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -212,8 +212,8 @@ class SegmentGrowingImpl : public SegmentGrowing { : mmap_descriptor_(storage::MmapManager::GetInstance() .GetMmapConfig() .GetEnableGrowingMmap() - ? storage::MmapChunkDescriptor( - new storage::MmapChunkDescriptorValue( + ? storage::MmapChunkDescriptorPtr( + new storage::MmapChunkDescriptor( {segment_id, SegmentType::Growing})) : nullptr), segcore_config_(segcore_config), @@ -317,7 +317,7 @@ class SegmentGrowingImpl : public SegmentGrowing { } private: - storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; + storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr; SegcoreConfig segcore_config_; SchemaPtr schema_; IndexMetaPtr index_meta_; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index 6137d6f56758..08abcbc16e4e 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -821,7 +821,7 @@ SegmentSealedImpl::GetFieldDataPath(FieldId field_id, int64_t offset) const { std::tuple> static ReadFromChunkCache( const storage::ChunkCachePtr& cc, const std::string& data_path, - const storage::MmapChunkDescriptor& descriptor) { + const storage::MmapChunkDescriptorPtr& descriptor) { auto column = cc->Read(data_path, descriptor); cc->Prefetch(data_path); return {data_path, column}; @@ -1032,9 +1032,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, id_(segment_id), col_index_meta_(index_meta), TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) { - mmap_descriptor_ = std::shared_ptr( - new storage::MmapChunkDescriptorValue( - {segment_id, SegmentType::Sealed})); + mmap_descriptor_ = std::shared_ptr( + new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed})); auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); mcm->Register(mmap_descriptor_); } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index 2f7964533762..36f674152c37 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -280,7 +280,7 @@ class SegmentSealedImpl : public SegmentSealed { private: // mmap descriptor, used in chunk cache - storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; + storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr; // segment loading state BitsetType field_data_ready_bitset_; BitsetType index_ready_bitset_; diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index d579c3ce228c..4509b59fffb9 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ b/internal/core/src/storage/ChunkCache.cpp @@ -21,7 +21,7 @@ namespace milvus::storage { std::shared_ptr ChunkCache::Read(const std::string& filepath, - const MmapChunkDescriptor& descriptor) { + const MmapChunkDescriptorPtr& descriptor) { { std::shared_lock lck(mutex_); auto it = columns_.find(filepath); @@ -75,7 +75,7 @@ ChunkCache::Prefetch(const std::string& filepath) { std::shared_ptr ChunkCache::Mmap(const FieldDataPtr& field_data, - const MmapChunkDescriptor& descriptor) { + const MmapChunkDescriptorPtr& descriptor) { auto dim = field_data->get_dim(); auto data_type = field_data->get_data_type(); diff --git a/internal/core/src/storage/ChunkCache.h b/internal/core/src/storage/ChunkCache.h index d29fddbb92c9..d0c742bfaf61 100644 --- a/internal/core/src/storage/ChunkCache.h +++ b/internal/core/src/storage/ChunkCache.h @@ -43,7 +43,7 @@ class ChunkCache { public: std::shared_ptr - Read(const std::string& filepath, const MmapChunkDescriptor& descriptor); + Read(const std::string& filepath, const MmapChunkDescriptorPtr& descriptor); void Remove(const std::string& filepath); @@ -53,7 +53,8 @@ class ChunkCache { private: std::shared_ptr - Mmap(const FieldDataPtr& field_data, const MmapChunkDescriptor& descriptor); + Mmap(const FieldDataPtr& field_data, + const MmapChunkDescriptorPtr& descriptor); std::string CachePath(const std::string& filepath); diff --git a/internal/core/src/storage/MmapChunkManager.cpp b/internal/core/src/storage/MmapChunkManager.cpp index f7d71665916d..ba5ac2b11236 100644 --- a/internal/core/src/storage/MmapChunkManager.cpp +++ b/internal/core/src/storage/MmapChunkManager.cpp @@ -217,43 +217,53 @@ MmapChunkManager::~MmapChunkManager() { } void -MmapChunkManager::Register(const MmapChunkDescriptor key) { - if (HasKey(key)) { - LOG_WARN("key has exist in growing mmap manager"); +MmapChunkManager::Register(const MmapChunkDescriptorPtr descriptor) { + if (HasRegister(descriptor)) { + LOG_WARN("descriptor has exist in MmapChunkManager"); return; } + AssertInfo( + descriptor->segment_type == SegmentType::Growing || + descriptor->segment_type == SegmentType::Sealed, + "only register for growing or sealed segment in MmapChunkManager"); std::unique_lock lck(mtx_); - blocks_table_.emplace(key, std::vector()); + blocks_table_.emplace(*descriptor.get(), std::vector()); return; } void -MmapChunkManager::UnRegister(const MmapChunkDescriptor key) { +MmapChunkManager::UnRegister(const MmapChunkDescriptorPtr descriptor) { std::unique_lock lck(mtx_); - if (blocks_table_.find(key) != blocks_table_.end()) { - auto& blocks = blocks_table_[key]; + MmapChunkDescriptor blocks_table_key = *descriptor.get(); + if (blocks_table_.find(blocks_table_key) != blocks_table_.end()) { + auto& blocks = blocks_table_[blocks_table_key]; for (auto i = 0; i < blocks.size(); i++) { blocks_handler_->Deallocate(std::move(blocks[i])); } - blocks_table_.erase(key); + blocks_table_.erase(blocks_table_key); } } bool -MmapChunkManager::HasKey(const MmapChunkDescriptor key) { +MmapChunkManager::HasRegister(const MmapChunkDescriptorPtr descriptor) { std::shared_lock lck(mtx_); - return (blocks_table_.find(key) != blocks_table_.end()); + return (blocks_table_.find(*descriptor.get()) != blocks_table_.end()); } void* -MmapChunkManager::Allocate(const MmapChunkDescriptor key, const uint64_t size) { - AssertInfo(HasKey(key), "key {} has not been register.", key->segment_id); +MmapChunkManager::Allocate(const MmapChunkDescriptorPtr descriptor, + const uint64_t size) { + AssertInfo(HasRegister(descriptor), + "descriptor {} has not been register.", + descriptor->segment_id); std::unique_lock lck(mtx_); + auto blocks_table_key = *descriptor.get(); if (size < blocks_handler_->GetFixFileSize()) { // find a place to fit in - for (auto block_id = 0; block_id < blocks_table_[key].size(); + for (auto block_id = 0; + block_id < blocks_table_[blocks_table_key].size(); block_id++) { - auto addr = blocks_table_[key][block_id]->Get(size); + auto addr = blocks_table_[blocks_table_key][block_id]->Get(size); if (addr != nullptr) { return addr; } @@ -263,14 +273,14 @@ MmapChunkManager::Allocate(const MmapChunkDescriptor key, const uint64_t size) { AssertInfo(new_block != nullptr, "new mmap_block can't be nullptr"); auto addr = new_block->Get(size); AssertInfo(addr != nullptr, "fail to allocate from mmap block."); - blocks_table_[key].emplace_back(std::move(new_block)); + blocks_table_[blocks_table_key].emplace_back(std::move(new_block)); return addr; } else { auto new_block = blocks_handler_->AllocateLargeBlock(size); AssertInfo(new_block != nullptr, "new mmap_block can't be nullptr"); auto addr = new_block->Get(size); AssertInfo(addr != nullptr, "fail to allocate from mmap block."); - blocks_table_[key].emplace_back(std::move(new_block)); + blocks_table_[blocks_table_key].emplace_back(std::move(new_block)); return addr; } } diff --git a/internal/core/src/storage/MmapChunkManager.h b/internal/core/src/storage/MmapChunkManager.h index 40bea6fd905c..f8e3c25baa02 100644 --- a/internal/core/src/storage/MmapChunkManager.h +++ b/internal/core/src/storage/MmapChunkManager.h @@ -30,18 +30,24 @@ #include "storage/LocalChunkManagerSingleton.h" namespace milvus::storage { // use segment id and segment type to descripe a segment in mmap chunk manager, segment only in two type (growing or sealed) in mmap chunk manager -struct MmapChunkDescriptorValue { +struct MmapChunkDescriptor { + struct DescriptorHash { + size_t + operator()(const MmapChunkDescriptor& x) const { + //SegmentType::Growing = 0x01,SegmentType::Sealed = 0x10 + size_t sign = ((size_t)x.segment_type) << (sizeof(size_t) * 8 - 1); + return ((size_t)x.segment_id) | sign; + } + }; + bool + operator==(const MmapChunkDescriptor& x) const { + return segment_id == x.segment_id && segment_type == x.segment_type; + } int64_t segment_id; SegmentType segment_type; }; -using MmapChunkDescriptor = std::shared_ptr; +using MmapChunkDescriptorPtr = std::shared_ptr; -struct DescriptorHash { - size_t - operator()(const MmapChunkDescriptor& x) const { - return x->segment_id * 10 + (size_t)x->segment_type; - } -}; /** * @brief MmapBlock is a basic unit of MmapChunkManager. It handle all memory mmaping in one tmp file. * static function(TotalBlocksSize) is used to get total files size of chunk mmap. @@ -175,13 +181,13 @@ class MmapChunkManager { const uint64_t file_size); ~MmapChunkManager(); void - Register(const MmapChunkDescriptor key); + Register(const MmapChunkDescriptorPtr descriptor); void - UnRegister(const MmapChunkDescriptor key); + UnRegister(const MmapChunkDescriptorPtr descriptor); bool - HasKey(const MmapChunkDescriptor key); + HasRegister(const MmapChunkDescriptorPtr descriptor); void* - Allocate(const MmapChunkDescriptor key, const uint64_t size); + Allocate(const MmapChunkDescriptorPtr descriptor, const uint64_t size); uint64_t GetDiskAllocSize() { std::shared_lock lck(mtx_); @@ -205,7 +211,7 @@ class MmapChunkManager { mutable std::shared_mutex mtx_; std::unordered_map, - DescriptorHash> + MmapChunkDescriptor::DescriptorHash> blocks_table_; std::unique_ptr blocks_handler_ = nullptr; std::string mmap_file_prefix_; diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 44f85a366971..9d8e6a4f5529 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -70,6 +70,7 @@ set(MILVUS_TEST_FILES test_futures.cpp test_array_inverted_index.cpp test_chunk_vector.cpp + test_mmap_chunk_manager.cpp ) if ( INDEX_ENGINE STREQUAL "cardinal" ) diff --git a/internal/core/unittest/test_chunk_cache.cpp b/internal/core/unittest/test_chunk_cache.cpp index 9404a56f4226..ee161cfa79f8 100644 --- a/internal/core/unittest/test_chunk_cache.cpp +++ b/internal/core/unittest/test_chunk_cache.cpp @@ -38,14 +38,13 @@ class ChunkCacheTest : public testing::Test { TearDown() override { mcm->UnRegister(descriptor); } - const char* local_storage_path = "/tmp/test_chunk_cache/local"; const char* file_name = "chunk_cache_test/insert_log/2/101/1000000"; milvus::storage::MmapChunkManagerPtr mcm; milvus::segcore::SegcoreConfig config; - milvus::storage::MmapChunkDescriptor descriptor = - std::shared_ptr( - new milvus::storage::MmapChunkDescriptorValue( - {111, SegmentType::Sealed})); + milvus::storage::MmapChunkDescriptorPtr descriptor = + std::shared_ptr( + new milvus::storage::MmapChunkDescriptor( + {101, SegmentType::Sealed})); }; TEST_F(ChunkCacheTest, Read) { @@ -53,9 +52,6 @@ TEST_F(ChunkCacheTest, Read) { auto dim = 128; auto metric_type = knowhere::metric::L2; - milvus::storage::LocalChunkManagerSingleton::GetInstance().Init( - local_storage_path); - auto schema = std::make_shared(); auto fake_id = schema->AddDebugField( "fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type); @@ -87,8 +83,6 @@ TEST_F(ChunkCacheTest, Read) { auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache(); const auto& column = cc->Read(file_name, descriptor); - std::cout << "column->ByteSize() :" << column->ByteSize() << " " - << dim * N * 4 << std::endl; Assert(column->ByteSize() == dim * N * 4); auto actual = (float*)column->Data(); @@ -106,9 +100,6 @@ TEST_F(ChunkCacheTest, TestMultithreads) { auto dim = 128; auto metric_type = knowhere::metric::L2; - milvus::storage::LocalChunkManagerSingleton::GetInstance().Init( - local_storage_path); - auto schema = std::make_shared(); auto fake_id = schema->AddDebugField( "fakevec", milvus::DataType::VECTOR_FLOAT, dim, metric_type); @@ -143,7 +134,6 @@ TEST_F(ChunkCacheTest, TestMultithreads) { constexpr int threads = 16; std::vector total_counts(threads); auto executor = [&](int thread_id) { - std::cout << "thread id" << thread_id << " read data" << std::endl; const auto& column = cc->Read(file_name, descriptor); Assert(column->ByteSize() == dim * N * 4); diff --git a/internal/core/unittest/test_mmap_chunk_manager.cpp b/internal/core/unittest/test_mmap_chunk_manager.cpp new file mode 100644 index 000000000000..bcf5a86516e7 --- /dev/null +++ b/internal/core/unittest/test_mmap_chunk_manager.cpp @@ -0,0 +1,45 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License + +#include +#include "storage/MmapManager.h" +/* +checking register function of mmap chunk manager +*/ +TEST(MmapChunkManager, Register) { + auto mcm = + milvus::storage::MmapManager::GetInstance().GetMmapChunkManager(); + auto get_descriptor = + [](int64_t seg_id, + SegmentType seg_type) -> milvus::storage::MmapChunkDescriptorPtr { + return std::shared_ptr( + new milvus::storage::MmapChunkDescriptor({seg_id, seg_type})); + }; + int64_t segment_id = 0x0000456789ABCDEF; + int64_t flow_segment_id = 0x8000456789ABCDEF; + mcm->Register(get_descriptor(segment_id, SegmentType::Growing)); + ASSERT_TRUE( + mcm->HasRegister(get_descriptor(segment_id, SegmentType::Growing))); + ASSERT_FALSE( + mcm->HasRegister(get_descriptor(segment_id, SegmentType::Sealed))); + mcm->Register(get_descriptor(segment_id, SegmentType::Sealed)); + ASSERT_FALSE(mcm->HasRegister( + get_descriptor(flow_segment_id, SegmentType::Growing))); + ASSERT_FALSE( + mcm->HasRegister(get_descriptor(flow_segment_id, SegmentType::Sealed))); + + mcm->UnRegister(get_descriptor(segment_id, SegmentType::Sealed)); + ASSERT_TRUE( + mcm->HasRegister(get_descriptor(segment_id, SegmentType::Growing))); + ASSERT_FALSE( + mcm->HasRegister(get_descriptor(segment_id, SegmentType::Sealed))); + mcm->UnRegister(get_descriptor(segment_id, SegmentType::Growing)); +} \ No newline at end of file diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index b9b6acff31d9..61b3b693a2e8 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -15,7 +15,6 @@ import ( "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/metacache" "github.com/milvus-io/milvus/internal/datanode/syncmgr" - "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/merr" @@ -54,7 +53,6 @@ func (s *ManagerSuite) SetupSuite() { }, }, } - initcore.InitMmapManager(paramtable.Get()) s.channelName = "by-dev-rootcoord-dml_0_100_v0" }