diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 37775973f8221..e6903cfed885a 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -343,6 +343,9 @@ queryNode: warmup: disable mmap: mmapEnabled: false # Enable mmap for loading data + growingMmapEnabled: false # Enable mmap for growing segment + fixedFileSizeForMmapAlloc: 4 #MB, fixed file size for mmap chunk manager to store chunk data + maxDiskUsagePercentageForMmapAlloc: 20 # max percentage of disk usage in memory mapping lazyload: enabled: false # Enable lazyload for loading data waitTimeout: 30000 # max wait timeout duration in milliseconds before start to do lazyload search and retrieve diff --git a/internal/core/src/common/Array.h b/internal/core/src/common/Array.h index a8b066e0d31d3..ce2d6255db973 100644 --- a/internal/core/src/common/Array.h +++ b/internal/core/src/common/Array.h @@ -275,6 +275,11 @@ class Array { return offsets_; } + std::vector + get_offsets_in_copy() const { + return offsets_; + } + ScalarArray output_data() const { ScalarArray data_array; @@ -573,6 +578,11 @@ class ArrayView { data() const { return data_; } + // copy to result + std::vector + get_offsets_in_copy() const { + return offsets_; + } bool is_same_array(const proto::plan::Array& arr2) const { diff --git a/internal/core/src/common/EasyAssert.h b/internal/core/src/common/EasyAssert.h index 164333c5cf074..3b247600b376d 100644 --- a/internal/core/src/common/EasyAssert.h +++ b/internal/core/src/common/EasyAssert.h @@ -61,6 +61,9 @@ enum ErrorCode { MetricTypeNotMatch = 2031, DimNotMatch = 2032, ClusterSkip = 2033, + MemAllocateFailed = 2034, + MemAllocateSizeNotMatch = 2035, + MmapError = 2036, KnowhereError = 2100, // timeout or cancel related. 
diff --git a/internal/core/src/common/VectorTrait.h b/internal/core/src/common/VectorTrait.h index 67e92deb46e37..d987acb41a140 100644 --- a/internal/core/src/common/VectorTrait.h +++ b/internal/core/src/common/VectorTrait.h @@ -68,6 +68,24 @@ template constexpr bool IsSparse = std::is_same_v || std::is_same_v>; +template +constexpr bool IsVariableType = + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || + IsSparse; + +template +constexpr bool IsVariableTypeSupportInChunk = + std::is_same_v || std::is_same_v || + std::is_same_v; + +template +using ChunkViewType = std::conditional_t< + std::is_same_v, + std::string_view, + std::conditional_t, ArrayView, T>>; + struct FundamentalTag {}; struct StringTag {}; diff --git a/internal/core/src/common/type_c.h b/internal/core/src/common/type_c.h index 70abcebfad6a2..523a2bcb6a316 100644 --- a/internal/core/src/common/type_c.h +++ b/internal/core/src/common/type_c.h @@ -93,6 +93,14 @@ typedef struct CStorageConfig { int64_t requestTimeoutMs; } CStorageConfig; +typedef struct CMmapConfig { + const char* cache_read_ahead_policy; + const char* mmap_path; + uint64_t disk_limit; + uint64_t fix_file_size; + bool growing_enable_mmap; +} CMmapConfig; + typedef struct CTraceConfig { const char* exporter; float sampleFraction; diff --git a/internal/core/src/exec/expression/BinaryRangeExpr.cpp b/internal/core/src/exec/expression/BinaryRangeExpr.cpp index d7d916aade213..ea44f30b8cd44 100644 --- a/internal/core/src/exec/expression/BinaryRangeExpr.cpp +++ b/internal/core/src/exec/expression/BinaryRangeExpr.cpp @@ -53,7 +53,10 @@ PhyBinaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { break; } case DataType::VARCHAR: { - if (segment_->type() == SegmentType::Growing) { + if (segment_->type() == SegmentType::Growing && + !storage::MmapManager::GetInstance() + .GetMmapConfig() + .growing_enable_mmap) { result = ExecRangeVisitorImpl(); } else { result = 
ExecRangeVisitorImpl(); diff --git a/internal/core/src/exec/expression/CompareExpr.cpp b/internal/core/src/exec/expression/CompareExpr.cpp index f9199d65938a2..6ec731040b1f1 100644 --- a/internal/core/src/exec/expression/CompareExpr.cpp +++ b/internal/core/src/exec/expression/CompareExpr.cpp @@ -68,7 +68,10 @@ PhyCompareFilterExpr::GetChunkData(FieldId field_id, }; } } - if (segment_->type() == SegmentType::Growing) { + if (segment_->type() == SegmentType::Growing && + !storage::MmapManager::GetInstance() + .GetMmapConfig() + .growing_enable_mmap) { auto chunk_data = segment_->chunk_data(field_id, chunk_id).data(); return [chunk_data](int i) -> const number { return chunk_data[i]; }; diff --git a/internal/core/src/exec/expression/Expr.h b/internal/core/src/exec/expression/Expr.h index a300515560b2d..684217acca683 100644 --- a/internal/core/src/exec/expression/Expr.h +++ b/internal/core/src/exec/expression/Expr.h @@ -191,7 +191,6 @@ class SegmentExpr : public Expr { TargetBitmapView res, ValTypes... values) { int64_t processed_size = 0; - for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) { auto data_pos = (i == current_data_chunk_) ? 
current_data_chunk_pos_ : 0; diff --git a/internal/core/src/exec/expression/TermExpr.cpp b/internal/core/src/exec/expression/TermExpr.cpp index 1cf18619e67a2..95828c36ec981 100644 --- a/internal/core/src/exec/expression/TermExpr.cpp +++ b/internal/core/src/exec/expression/TermExpr.cpp @@ -55,7 +55,10 @@ PhyTermFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { break; } case DataType::VARCHAR: { - if (segment_->type() == SegmentType::Growing) { + if (segment_->type() == SegmentType::Growing && + !storage::MmapManager::GetInstance() + .GetMmapConfig() + .growing_enable_mmap) { result = ExecVisitorImpl(); } else { result = ExecVisitorImpl(); diff --git a/internal/core/src/exec/expression/UnaryExpr.cpp b/internal/core/src/exec/expression/UnaryExpr.cpp index b9567133de801..8585f72e53deb 100644 --- a/internal/core/src/exec/expression/UnaryExpr.cpp +++ b/internal/core/src/exec/expression/UnaryExpr.cpp @@ -112,7 +112,10 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) { break; } case DataType::VARCHAR: { - if (segment_->type() == SegmentType::Growing) { + if (segment_->type() == SegmentType::Growing && + !storage::MmapManager::GetInstance() + .GetMmapConfig() + .growing_enable_mmap) { result = ExecRangeVisitorImpl(); } else { result = ExecRangeVisitorImpl(); @@ -294,7 +297,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) { // filtering by index, get candidates. 
auto size_per_chunk = segment_->size_per_chunk(); - auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto { + auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto{ auto chunk_idx = offset / size_per_chunk; auto chunk_offset = offset % size_per_chunk; const auto& chunk = diff --git a/internal/core/src/mmap/ChunkData.h b/internal/core/src/mmap/ChunkData.h new file mode 100644 index 0000000000000..b80c399a3721d --- /dev/null +++ b/internal/core/src/mmap/ChunkData.h @@ -0,0 +1,195 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+#pragma once +#include "common/Array.h" +#include "storage/MmapManager.h" +namespace milvus { +/** + * @brief FixedLengthChunk + */ +template +struct FixedLengthChunk { + public: + FixedLengthChunk() = delete; + explicit FixedLengthChunk(const uint64_t size, + storage::MmapChunkDescriptor descriptor) + : mmap_descriptor_(descriptor), size_(size) { + auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); + data_ = (Type*)(mcm->Allocate(mmap_descriptor_, sizeof(Type) * size)); + AssertInfo(data_ != nullptr, + "failed to create a mmapchunk: {}, map_size"); + }; + void* + data() { + return data_; + }; + size_t + size() { + return size_; + }; + Type + get(const int i) const { + return data_[i]; + } + const Type& + view(const int i) const { + return data_[i]; + } + + private: + int64_t size_ = 0; + Type* data_ = nullptr; + storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; +}; +/** + * @brief VariableLengthChunk + */ +template +struct VariableLengthChunk { + static_assert(IsVariableTypeSupportInChunk); + + public: + VariableLengthChunk() = delete; + explicit VariableLengthChunk(const uint64_t size, + storage::MmapChunkDescriptor descriptor) + : mmap_descriptor_(descriptor), size_(size) { + data_ = FixedVector>(size); + }; + inline void + set(const Type* src, uint32_t begin, uint32_t length) { + throw std::runtime_error( + "set should be a template specialization function"); + } + inline Type + get(const int i) const { + throw std::runtime_error( + "get should be a template specialization function"); + } + const ChunkViewType& + view(const int i) const { + return data_[i]; + } + const ChunkViewType& + operator[](const int i) const { + return view(i); + } + void* + data() { + return data_.data(); + }; + size_t + size() { + return size_; + }; + + private: + int64_t size_ = 0; + FixedVector> data_; + storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; +}; +template <> +inline void +VariableLengthChunk::set(const std::string* src, + uint32_t begin, 
+ uint32_t length) { + auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); + milvus::ErrorCode err_code; + AssertInfo( + begin + length <= size_, + "failed to set a chunk with length: {} from beign {}, map_size={}", + length, + begin, + size_); + for (auto i = 0; i < length; i++) { + auto buf_size = src[i].size() + 1; + auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size); + AssertInfo(buf != nullptr, + "failed to allocate memory from mmap_manager, error_code"); + std::strcpy(buf, src[i].c_str()); + data_[i + begin] = std::string_view(buf, src[i].size()); + } +} +template <> +inline std::string +VariableLengthChunk::get(const int i) const { + // copy to a string + return std::string(data_[i]); +} +template <> +inline void +VariableLengthChunk::set(const Json* src, + uint32_t begin, + uint32_t length) { + auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); + milvus::ErrorCode err_code; + AssertInfo( + begin + length <= size_, + "failed to set a chunk with length: {} from beign {}, map_size={}", + length, + begin, + size_); + for (auto i = 0; i < length; i++) { + auto buf_size = src[i].size() + simdjson::SIMDJSON_PADDING + 1; + auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size); + AssertInfo( + buf != nullptr, + "failed to allocate memory from mmap_manager, error_code:{}"); + std::strcpy(buf, src[i].c_str()); + data_[i + begin] = Json(buf, src[i].size()); + } +} +template <> +inline Json +VariableLengthChunk::get(const int i) const { + return std::move(Json(simdjson::padded_string(data_[i].data()))); +} +template <> +inline void +VariableLengthChunk::set(const Array* src, + uint32_t begin, + uint32_t length) { + auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); + milvus::ErrorCode err_code; + AssertInfo( + begin + length <= size_, + "failed to set a chunk with length: {} from beign {}, map_size={}", + length, + begin, + size_); + for (auto i = 0; i < length; i++) { + auto array_data = + 
(char*)mcm->Allocate(mmap_descriptor_, src[i].byte_size()); + AssertInfo(array_data != nullptr, + "failed to allocate memory from mmap_manager, error_code"); + std::copy( + src[i].data(), src[i].data() + src[i].byte_size(), array_data); + data_[i + begin] = ArrayView(array_data, + src[i].byte_size(), + src[i].get_element_type(), + src[i].get_offsets_in_copy()); + } +} +template <> +inline Array +VariableLengthChunk::get(const int i) const { + auto array_view_i = data_[i]; + char* data = static_cast(const_cast(array_view_i.data())); + return Array(data, + array_view_i.byte_size(), + array_view_i.get_element_type(), + array_view_i.get_offsets_in_copy()); +} +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/mmap/ChunkVector.h b/internal/core/src/mmap/ChunkVector.h new file mode 100644 index 0000000000000..ba750adefa3d0 --- /dev/null +++ b/internal/core/src/mmap/ChunkVector.h @@ -0,0 +1,212 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+#pragma once +#include "mmap/ChunkData.h" +#include "storage/MmapManager.h" +namespace milvus { +template +class ChunkVectorBase { + public: + virtual ~ChunkVectorBase() = default; + virtual void + emplace_to_at_least(int64_t chunk_num, int64_t chunk_size) = 0; + virtual void + copy_to_chunk(int64_t chunk_id, + int64_t offest, + const Type* data, + int64_t length) = 0; + virtual void* + get_chunk_data(int64_t index) = 0; + virtual int64_t + get_chunk_size(int64_t index) = 0; + virtual Type + get_element(int64_t chunk_id, int64_t chunk_offset) = 0; + virtual ChunkViewType + view_element(int64_t chunk_id, int64_t chunk_offset) = 0; + int64_t + size() const { + return counter_; + } + virtual void + clear() = 0; + virtual SpanBase + get_span(int64_t chunk_id) = 0; + + protected: + std::atomic counter_ = 0; +}; +template +using ChunkVectorPtr = std::unique_ptr>; + +template , + bool IsMmap = false> +class ThreadSafeChunkVector : public ChunkVectorBase { + public: + ThreadSafeChunkVector(storage::MmapChunkDescriptor descriptor = nullptr) { + mmap_descriptor_ = descriptor; + } + + void + emplace_to_at_least(int64_t chunk_num, int64_t chunk_size) override { + std::unique_lock lck(this->mutex_); + if (chunk_num <= this->counter_) { + return; + } + while (vec_.size() < chunk_num) { + if constexpr (IsMmap) { + vec_.emplace_back(chunk_size, mmap_descriptor_); + } else { + vec_.emplace_back(chunk_size); + } + ++this->counter_; + } + } + + void + copy_to_chunk(int64_t chunk_id, + int64_t offset, + const Type* data, + int64_t length) override { + std::unique_lock lck(mutex_); + AssertInfo(chunk_id < this->counter_, + fmt::format("index out of range, index={}, counter_={}", + chunk_id, + this->counter_)); + if constexpr (!IsMmap || !IsVariableType) { + auto ptr = (Type*)vec_[chunk_id].data(); + AssertInfo( + offset + length <= vec_[chunk_id].size(), + fmt::format( + "index out of chunk range, offset={}, length={}, size={}", + offset, + length, + vec_[chunk_id].size())); + 
std::copy_n(data, length, ptr + offset); + } else { + vec_[chunk_id].set(data, offset, length); + } + } + + Type + get_element(int64_t chunk_id, int64_t chunk_offset) override { + std::shared_lock lck(mutex_); + auto chunk = vec_[chunk_id]; + AssertInfo( + chunk_id < this->counter_ && chunk_offset < chunk.size(), + fmt::format("index out of range, index={}, chunk_offset={}, cap={}", + chunk_id, + chunk_offset, + chunk.size())); + if constexpr (IsMmap) { + return chunk.get(chunk_offset); + } else { + return chunk[chunk_offset]; + } + } + + ChunkViewType + view_element(int64_t chunk_id, int64_t chunk_offset) override { + std::shared_lock lck(mutex_); + auto chunk = vec_[chunk_id]; + if constexpr (IsMmap) { + return chunk.view(chunk_offset); + } else if constexpr (std::is_same_v) { + return std::string_view(chunk[chunk_offset].data(), + chunk[chunk_offset].size()); + } else if constexpr (std::is_same_v) { + auto& src = chunk[chunk_offset]; + return ArrayView(const_cast(src.data()), + src.byte_size(), + src.get_element_type(), + src.get_offsets_in_copy()); + } else { + return chunk[chunk_offset]; + } + } + + void* + get_chunk_data(int64_t index) override { + std::shared_lock lck(mutex_); + AssertInfo(index < this->counter_, + fmt::format("index out of range, index={}, counter_={}", + index, + this->counter_)); + return vec_[index].data(); + } + + int64_t + get_chunk_size(int64_t index) override { + std::shared_lock lck(mutex_); + AssertInfo(index < this->counter_, + fmt::format("index out of range, index={}, counter_={}", + index, + this->counter_)); + return vec_[index].size(); + } + + void + clear() override { + std::unique_lock lck(mutex_); + this->counter_ = 0; + vec_.clear(); + } + + SpanBase + get_span(int64_t chunk_id) override { + std::shared_lock lck(mutex_); + if constexpr (IsMmap && std::is_same_v) { + return SpanBase(get_chunk_data(chunk_id), + get_chunk_size(chunk_id), + sizeof(ChunkViewType)); + } else { + return SpanBase(get_chunk_data(chunk_id), + 
get_chunk_size(chunk_id), + sizeof(Type)); + } + } + + private: + mutable std::shared_mutex mutex_; + storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; + std::deque vec_; +}; + +template +ChunkVectorPtr +SelectChunkVectorPtr(storage::MmapChunkDescriptor& mmap_descriptor) { + if constexpr (!IsVariableType) { + if (mmap_descriptor != nullptr) { + return std::make_unique< + ThreadSafeChunkVector, true>>( + mmap_descriptor); + } else { + return std::make_unique>(); + } + } else if constexpr (IsVariableTypeSupportInChunk) { + if (mmap_descriptor != nullptr) { + return std::make_unique< + ThreadSafeChunkVector, true>>( + mmap_descriptor); + } else { + return std::make_unique>(); + } + } else { + // todo: sparse float vector support mmap + return std::make_unique>(); + } +} +} // namespace milvus \ No newline at end of file diff --git a/internal/core/src/mmap/Column.h b/internal/core/src/mmap/Column.h index 40c5c24099652..627473256fff9 100644 --- a/internal/core/src/mmap/Column.h +++ b/internal/core/src/mmap/Column.h @@ -39,6 +39,7 @@ #include "common/Array.h" #include "knowhere/dataset.h" #include "storage/prometheus_client.h" +#include "storage/MmapChunkManager.h" namespace milvus { @@ -53,12 +54,17 @@ constexpr size_t ARRAY_PADDING = 1; class ColumnBase { public: + enum MappingType { + MAP_WITH_ANONYMOUS = 0, + MAP_WITH_FILE = 1, + MAP_WITH_MANAGER = 2, + }; // memory mode ctor ColumnBase(size_t reserve, const FieldMeta& field_meta) : type_size_(IsSparseFloatVectorDataType(field_meta.get_data_type()) ? 
1 : field_meta.get_sizeof()), - is_map_anonymous_(true) { + mapping_type_(MappingType::MAP_WITH_ANONYMOUS) { SetPaddingSize(field_meta.get_data_type()); if (IsVariableDataType(field_meta.get_data_type())) { @@ -83,13 +89,38 @@ class ColumnBase { UpdateMetricWhenMmap(mapped_size); } + // use mmap manager ctor, used in growing segment fixed data type + ColumnBase(size_t reserve, + int dim, + const DataType& data_type, + storage::MmapChunkManagerPtr mcm, + storage::MmapChunkDescriptor descriptor) + : mcm_(mcm), + mmap_descriptor_(descriptor), + type_size_(GetDataTypeSize(data_type, dim)), + num_rows_(0), + size_(0), + cap_size_(reserve), + mapping_type_(MAP_WITH_MANAGER) { + AssertInfo((mcm != nullptr) && descriptor != nullptr, + "use wrong mmap chunk manager and mmap chunk descriptor to " + "create column."); + + SetPaddingSize(data_type); + size_t mapped_size = cap_size_ + padding_; + data_ = (char*)mcm_->Allocate(mmap_descriptor_, (uint64_t)mapped_size); + AssertInfo(data_ != nullptr, + "fail to create with mmap manager: map_size = {}", + mapped_size); + } + // mmap mode ctor // User must call Seal to build the view for variable length column. ColumnBase(const File& file, size_t size, const FieldMeta& field_meta) : type_size_(IsSparseFloatVectorDataType(field_meta.get_data_type()) ? 1 : field_meta.get_sizeof()), - is_map_anonymous_(false), + mapping_type_(MappingType::MAP_WITH_FILE), num_rows_(size / type_size_) { SetPaddingSize(field_meta.get_data_type()); @@ -119,7 +150,7 @@ class ColumnBase { IsSparseFloatVectorDataType(data_type) ? 
1 : (size / type_size_)), size_(size), cap_size_(size), - is_map_anonymous_(false) { + mapping_type_(MappingType::MAP_WITH_FILE) { SetPaddingSize(data_type); size_t mapped_size = cap_size_ + padding_; @@ -134,13 +165,15 @@ class ColumnBase { virtual ~ColumnBase() { if (data_ != nullptr) { - size_t mapped_size = cap_size_ + padding_; - if (munmap(data_, mapped_size)) { - AssertInfo(true, - "failed to unmap variable field, err={}", - strerror(errno)); + if (mapping_type_ != MappingType::MAP_WITH_MANAGER) { + size_t mapped_size = cap_size_ + padding_; + if (munmap(data_, mapped_size)) { + AssertInfo(true, + "failed to unmap variable field, err={}", + strerror(errno)); + } } - UpdateMetricWhenMunmap(mapped_size); + UpdateMetricWhenMunmap(cap_size_ + padding_); } } @@ -238,47 +271,63 @@ class ColumnBase { } protected: - // only for memory mode, not mmap + // only for memory mode and mmap manager mode, not mmap void Expand(size_t new_size) { if (new_size == 0) { return; } + AssertInfo( + mapping_type_ == MappingType::MAP_WITH_ANONYMOUS || + mapping_type_ == MappingType::MAP_WITH_MANAGER, + "expand function only use in anonymous or with mmap manager"); + if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) { + size_t new_mapped_size = new_size + padding_; + auto data = static_cast(mmap(nullptr, + new_mapped_size, + PROT_READ | PROT_WRITE, + MAP_PRIVATE | MAP_ANON, + -1, + 0)); + UpdateMetricWhenMmap(true, new_mapped_size); + + AssertInfo(data != MAP_FAILED, + "failed to expand map: {}, new_map_size={}", + strerror(errno), + new_size + padding_); + + if (data_ != nullptr) { + std::memcpy(data, data_, size_); + if (munmap(data_, cap_size_ + padding_)) { + auto err = errno; + size_t mapped_size = new_size + padding_; + munmap(data, mapped_size); + UpdateMetricWhenMunmap(mapped_size); + + AssertInfo( + false, + "failed to unmap while expanding: {}, old_map_size={}", + strerror(err), + cap_size_ + padding_); + } + UpdateMetricWhenMunmap(cap_size_ + padding_); + } - size_t 
new_mapped_size = new_size + padding_; - auto data = static_cast(mmap(nullptr, - new_mapped_size, - PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANON, - -1, - 0)); - UpdateMetricWhenMmap(true, new_mapped_size); - - AssertInfo(data != MAP_FAILED, - "failed to expand map: {}, new_map_size={}", - strerror(errno), - new_size + padding_); - - if (data_ != nullptr) { + data_ = data; + cap_size_ = new_size; + mapping_type_ = MappingType::MAP_WITH_ANONYMOUS; + } else if (mapping_type_ == MappingType::MAP_WITH_MANAGER) { + size_t new_mapped_size = new_size + padding_; + auto data = mcm_->Allocate(mmap_descriptor_, new_mapped_size); + AssertInfo(data != nullptr, + "fail to create with mmap manager: map_size = {}", + new_mapped_size); std::memcpy(data, data_, size_); - if (munmap(data_, cap_size_ + padding_)) { - auto err = errno; - size_t mapped_size = new_size + padding_; - munmap(data, mapped_size); - UpdateMetricWhenMunmap(mapped_size); - - AssertInfo( - false, - "failed to unmap while expanding: {}, old_map_size={}", - strerror(err), - cap_size_ + padding_); - } - UpdateMetricWhenMunmap(cap_size_ + padding_); + // allocate space only append in one growing segment, so no need to munmap() + data_ = (char*)data; + cap_size_ = new_size; + mapping_type_ = MappingType::MAP_WITH_MANAGER; } - - data_ = data; - cap_size_ = new_size; - is_map_anonymous_ = true; } char* data_{nullptr}; @@ -291,16 +340,17 @@ class ColumnBase { // length in bytes size_t size_{0}; + storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; private: void UpdateMetricWhenMmap(size_t mmaped_size) { - UpdateMetricWhenMmap(is_map_anonymous_, mmaped_size); + UpdateMetricWhenMmap(mapping_type_, mmaped_size); } void UpdateMetricWhenMmap(bool is_map_anonymous, size_t mapped_size) { - if (is_map_anonymous) { + if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) { milvus::storage::internal_mmap_allocated_space_bytes_anon.Observe( mapped_size); milvus::storage::internal_mmap_in_used_space_bytes_anon.Increment( @@ 
-315,7 +365,7 @@ class ColumnBase { void UpdateMetricWhenMunmap(size_t mapped_size) { - if (is_map_anonymous_) { + if (mapping_type_ == MappingType::MAP_WITH_ANONYMOUS) { milvus::storage::internal_mmap_in_used_space_bytes_anon.Decrement( mapped_size); } else { @@ -325,8 +375,9 @@ class ColumnBase { } private: - // is MAP_ANONYMOUS - bool is_map_anonymous_; + // mapping_type_ + MappingType mapping_type_; + storage::MmapChunkManagerPtr mcm_ = nullptr; }; class Column : public ColumnBase { @@ -346,6 +397,14 @@ class Column : public ColumnBase { : ColumnBase(file, size, dim, data_type) { } + Column(size_t reserve, + int dim, + const DataType& data_type, + storage::MmapChunkManagerPtr mcm, + storage::MmapChunkDescriptor descriptor) + : ColumnBase(reserve, dim, data_type, mcm, descriptor) { + } + Column(Column&& column) noexcept : ColumnBase(std::move(column)) { } @@ -376,6 +435,14 @@ class SparseFloatColumn : public ColumnBase { const DataType& data_type) : ColumnBase(file, size, dim, data_type) { } + // mmap with mmap manager + SparseFloatColumn(size_t reserve, + int dim, + const DataType& data_type, + storage::MmapChunkManagerPtr mcm, + storage::MmapChunkDescriptor descriptor) + : ColumnBase(reserve, dim, data_type, mcm, descriptor) { + } SparseFloatColumn(SparseFloatColumn&& column) noexcept : ColumnBase(std::move(column)), @@ -471,6 +538,14 @@ class VariableColumn : public ColumnBase { VariableColumn(const File& file, size_t size, const FieldMeta& field_meta) : ColumnBase(file, size, field_meta) { } + // mmap with mmap manager + VariableColumn(size_t reserve, + int dim, + const DataType& data_type, + storage::MmapChunkManagerPtr mcm, + storage::MmapChunkDescriptor descriptor) + : ColumnBase(reserve, dim, data_type, mcm, descriptor) { + } VariableColumn(VariableColumn&& column) noexcept : ColumnBase(std::move(column)), @@ -579,6 +654,14 @@ class ArrayColumn : public ColumnBase { element_type_(field_meta.get_element_type()) { } + ArrayColumn(size_t reserve, + int dim, 
+ const DataType& data_type, + storage::MmapChunkManagerPtr mcm, + storage::MmapChunkDescriptor descriptor) + : ColumnBase(reserve, dim, data_type, mcm, descriptor) { + } + ArrayColumn(ArrayColumn&& column) noexcept : ColumnBase(std::move(column)), indices_(std::move(column.indices_)), diff --git a/internal/core/src/query/visitors/ExecExprVisitor.cpp b/internal/core/src/query/visitors/ExecExprVisitor.cpp index d877bbdd0ecbb..d0b59873ee79a 100644 --- a/internal/core/src/query/visitors/ExecExprVisitor.cpp +++ b/internal/core/src/query/visitors/ExecExprVisitor.cpp @@ -2378,7 +2378,10 @@ ExecExprVisitor::ExecCompareExprDispatcher(CompareExpr& expr, Op op) } case DataType::VARCHAR: { if (chunk_id < data_barrier) { - if (segment_.type() == SegmentType::Growing) { + if (segment_.type() == SegmentType::Growing && + !storage::MmapManager::GetInstance() + .GetMmapConfig() + .growing_enable_mmap) { auto chunk_data = segment_ .chunk_data(field_id, chunk_id) diff --git a/internal/core/src/segcore/ConcurrentVector.h b/internal/core/src/segcore/ConcurrentVector.h index f72c44d623d67..81feb57869664 100644 --- a/internal/core/src/segcore/ConcurrentVector.h +++ b/internal/core/src/segcore/ConcurrentVector.h @@ -32,6 +32,7 @@ #include "common/Span.h" #include "common/Types.h" #include "common/Utils.h" +#include "mmap/ChunkVector.h" namespace milvus::segcore { @@ -124,6 +125,9 @@ class VectorBase { virtual const void* get_chunk_data(ssize_t chunk_index) const = 0; + virtual int64_t + get_chunk_size(ssize_t chunk_index) const = 0; + virtual ssize_t num_chunk() const = 0; @@ -140,8 +144,6 @@ class VectorBase { template class ConcurrentVectorImpl : public VectorBase { public: - // constants - using Chunk = FixedVector; ConcurrentVectorImpl(ConcurrentVectorImpl&&) = delete; ConcurrentVectorImpl(const ConcurrentVectorImpl&) = delete; @@ -164,43 +166,42 @@ class ConcurrentVectorImpl : public VectorBase { BinaryVector>>>>; public: - explicit ConcurrentVectorImpl(ssize_t elements_per_row, - 
int64_t size_per_chunk) + explicit ConcurrentVectorImpl( + ssize_t elements_per_row, + int64_t size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) : VectorBase(size_per_chunk), elements_per_row_(is_type_entire_row ? 1 : elements_per_row) { + chunks_ptr_ = SelectChunkVectorPtr(mmap_descriptor); } - Span - get_span(int64_t chunk_id) const { - auto& chunk = get_chunk(chunk_id); + SpanBase + get_span_base(int64_t chunk_id) const override { if constexpr (is_type_entire_row) { - return Span(chunk.data(), chunk.size()); + return chunks_ptr_->get_span(chunk_id); } else if constexpr (std::is_same_v || // NOLINT std::is_same_v) { // only for testing PanicInfo(NotImplemented, "unimplemented"); } else { + auto chunk_data = chunks_ptr_->get_chunk_data(chunk_id); + auto chunk_size = chunks_ptr_->get_chunk_size(chunk_id); static_assert( std::is_same_v); return Span( - chunk.data(), chunk.size(), elements_per_row_); + static_cast(chunk_data), chunk_size, elements_per_row_); } } - SpanBase - get_span_base(int64_t chunk_id) const override { - return get_span(chunk_id); - } - void fill_chunk_data(const std::vector& datas) override { - AssertInfo(chunks_.size() == 0, "non empty concurrent vector"); + AssertInfo(chunks_ptr_->size() == 0, "non empty concurrent vector"); int64_t element_count = 0; for (auto& field_data : datas) { element_count += field_data->get_num_rows(); } - chunks_.emplace_to_at_least(1, elements_per_row_ * element_count); + chunks_ptr_->emplace_to_at_least(1, elements_per_row_ * element_count); int64_t offset = 0; for (auto& field_data : datas) { auto num_rows = field_data->get_num_rows(); @@ -227,26 +228,21 @@ class ConcurrentVectorImpl : public VectorBase { if (element_count == 0) { return; } - chunks_.emplace_to_at_least( + chunks_ptr_->emplace_to_at_least( upper_div(element_offset + element_count, size_per_chunk_), elements_per_row_ * size_per_chunk_); set_data( element_offset, static_cast(source), element_count); } - const Chunk& - 
get_chunk(ssize_t chunk_index) const { - return chunks_[chunk_index]; - } - - Chunk& - get_chunk(ssize_t index) { - return chunks_[index]; - } - const void* get_chunk_data(ssize_t chunk_index) const override { - return chunks_[chunk_index].data(); + return (const void*)chunks_ptr_->get_chunk_data(chunk_index); + } + + int64_t + get_chunk_size(ssize_t chunk_index) const override { + return chunks_ptr_->get_chunk_size(chunk_index); } // just for fun, don't use it directly @@ -254,7 +250,9 @@ class ConcurrentVectorImpl : public VectorBase { get_element(ssize_t element_index) const { auto chunk_id = element_index / size_per_chunk_; auto chunk_offset = element_index % size_per_chunk_; - return get_chunk(chunk_id).data() + chunk_offset * elements_per_row_; + auto data = + static_cast(chunks_ptr_->get_chunk_data(chunk_id)); + return data + chunk_offset * elements_per_row_; } const Type& @@ -266,18 +264,20 @@ class ConcurrentVectorImpl : public VectorBase { elements_per_row_)); auto chunk_id = element_index / size_per_chunk_; auto chunk_offset = element_index % size_per_chunk_; - return get_chunk(chunk_id)[chunk_offset]; + auto data = + static_cast(chunks_ptr_->get_chunk_data(chunk_id)); + return data[chunk_offset]; } ssize_t num_chunk() const override { - return chunks_.size(); + return chunks_ptr_->size(); } bool empty() override { - for (size_t i = 0; i < chunks_.size(); i++) { - if (get_chunk(i).size() > 0) { + for (size_t i = 0; i < chunks_ptr_->size(); i++) { + if (chunks_ptr_->get_chunk_size(i) > 0) { return false; } } @@ -287,7 +287,7 @@ class ConcurrentVectorImpl : public VectorBase { void clear() override { - chunks_.clear(); + chunks_ptr_->clear(); } private: @@ -336,33 +336,88 @@ class ConcurrentVectorImpl : public VectorBase { if (element_count <= 0) { return; } - auto chunk_num = chunks_.size(); + auto chunk_num = chunks_ptr_->size(); AssertInfo( chunk_id < chunk_num, fmt::format("chunk_id out of chunk num, chunk_id={}, chunk_num={}", chunk_id, chunk_num)); - 
Chunk& chunk = chunks_[chunk_id]; - auto ptr = chunk.data(); - - std::copy_n(source + source_offset * elements_per_row_, - element_count * elements_per_row_, - ptr + chunk_offset * elements_per_row_); + chunks_ptr_->copy_to_chunk(chunk_id, + chunk_offset * elements_per_row_, + source + source_offset * elements_per_row_, + element_count * elements_per_row_); } + protected: const ssize_t elements_per_row_; - - private: - ThreadSafeVector chunks_; + ChunkVectorPtr chunks_ptr_ = nullptr; }; template class ConcurrentVector : public ConcurrentVectorImpl { public: static_assert(IsScalar || std::is_same_v); - explicit ConcurrentVector(int64_t size_per_chunk) + explicit ConcurrentVector( + int64_t size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( - 1, size_per_chunk) { + 1, size_per_chunk, mmap_descriptor) { + } +}; + +template <> +class ConcurrentVector + : public ConcurrentVectorImpl { + public: + explicit ConcurrentVector( + int64_t size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) + : ConcurrentVectorImpl::ConcurrentVectorImpl( + 1, size_per_chunk, mmap_descriptor) { + } + + std::string_view + view_element(ssize_t element_index) const { + auto chunk_id = element_index / size_per_chunk_; + auto chunk_offset = element_index % size_per_chunk_; + return chunks_ptr_->view_element(chunk_id, chunk_offset); + } +}; + +template <> +class ConcurrentVector : public ConcurrentVectorImpl { + public: + explicit ConcurrentVector( + int64_t size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) + : ConcurrentVectorImpl::ConcurrentVectorImpl( + 1, size_per_chunk, mmap_descriptor) { + } + + std::string_view + view_element(ssize_t element_index) const { + auto chunk_id = element_index / size_per_chunk_; + auto chunk_offset = element_index % size_per_chunk_; + return std::string_view( + chunks_ptr_->view_element(chunk_id, chunk_offset).data()); + } +}; + +template <> +class 
ConcurrentVector : public ConcurrentVectorImpl { + public: + explicit ConcurrentVector( + int64_t size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) + : ConcurrentVectorImpl::ConcurrentVectorImpl( + 1, size_per_chunk, mmap_descriptor) { + } + + ArrayView + view_element(ssize_t element_index) const { + auto chunk_id = element_index / size_per_chunk_; + auto chunk_offset = element_index % size_per_chunk_; + return chunks_ptr_->view_element(chunk_id, chunk_offset); } }; @@ -370,9 +425,13 @@ template <> class ConcurrentVector : public ConcurrentVectorImpl, true> { public: - explicit ConcurrentVector(int64_t size_per_chunk) + explicit ConcurrentVector( + int64_t size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) : ConcurrentVectorImpl, - true>::ConcurrentVectorImpl(1, size_per_chunk), + true>::ConcurrentVectorImpl(1, + size_per_chunk, + mmap_descriptor), dim_(0) { } @@ -404,9 +463,11 @@ template <> class ConcurrentVector : public ConcurrentVectorImpl { public: - ConcurrentVector(int64_t dim, int64_t size_per_chunk) + ConcurrentVector(int64_t dim, + int64_t size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( - dim, size_per_chunk) { + dim, size_per_chunk, mmap_descriptor) { } }; @@ -414,8 +475,11 @@ template <> class ConcurrentVector : public ConcurrentVectorImpl { public: - explicit ConcurrentVector(int64_t dim, int64_t size_per_chunk) - : ConcurrentVectorImpl(dim / 8, size_per_chunk) { + explicit ConcurrentVector( + int64_t dim, + int64_t size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) + : ConcurrentVectorImpl(dim / 8, size_per_chunk, mmap_descriptor) { AssertInfo(dim % 8 == 0, fmt::format("dim is not a multiple of 8, dim={}", dim)); } @@ -425,9 +489,11 @@ template <> class ConcurrentVector : public ConcurrentVectorImpl { public: - ConcurrentVector(int64_t dim, int64_t size_per_chunk) + ConcurrentVector(int64_t dim, + int64_t 
size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( - dim, size_per_chunk) { + dim, size_per_chunk, mmap_descriptor) { } }; @@ -435,9 +501,11 @@ template <> class ConcurrentVector : public ConcurrentVectorImpl { public: - ConcurrentVector(int64_t dim, int64_t size_per_chunk) + ConcurrentVector(int64_t dim, + int64_t size_per_chunk, + storage::MmapChunkDescriptor mmap_descriptor = nullptr) : ConcurrentVectorImpl::ConcurrentVectorImpl( - dim, size_per_chunk) { + dim, size_per_chunk, mmap_descriptor) { } }; diff --git a/internal/core/src/segcore/FieldIndexing.cpp b/internal/core/src/segcore/FieldIndexing.cpp index 478d958c48737..eb81947fcdba4 100644 --- a/internal/core/src/segcore/FieldIndexing.cpp +++ b/internal/core/src/segcore/FieldIndexing.cpp @@ -65,13 +65,13 @@ VectorFieldIndexing::BuildIndexRange(int64_t ack_beg, auto conf = get_build_params(); data_.grow_to_at_least(ack_end); for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) { - const auto& chunk = source->get_chunk(chunk_id); + const auto& chunk_data = source->get_chunk_data(chunk_id); auto indexing = std::make_unique>( knowhere::IndexEnum::INDEX_FAISS_IVFFLAT, knowhere::metric::L2, knowhere::Version::GetCurrentVersion().VersionNumber()); - auto dataset = knowhere::GenDataSet( - source->get_size_per_chunk(), dim, chunk.data()); + auto dataset = + knowhere::GenDataSet(source->get_size_per_chunk(), dim, chunk_data); indexing->BuildWithDataset(dataset, conf); data_[chunk_id] = std::move(indexing); } @@ -297,16 +297,18 @@ ScalarFieldIndexing::BuildIndexRange(int64_t ack_beg, AssertInfo(ack_end <= num_chunk, "Ack_end is bigger than num_chunk"); data_.grow_to_at_least(ack_end); for (int chunk_id = ack_beg; chunk_id < ack_end; chunk_id++) { - const auto& chunk = source->get_chunk(chunk_id); + auto chunk_data = source->get_chunk_data(chunk_id); // build index for chunk // TODO if constexpr (std::is_same_v) { auto indexing = 
index::CreateStringIndexSort(); - indexing->Build(vec_base->get_size_per_chunk(), chunk.data()); + indexing->Build(vec_base->get_size_per_chunk(), + static_cast(chunk_data)); data_[chunk_id] = std::move(indexing); } else { auto indexing = index::CreateScalarIndexSort(); - indexing->Build(vec_base->get_size_per_chunk(), chunk.data()); + indexing->Build(vec_base->get_size_per_chunk(), + static_cast(chunk_data)); data_[chunk_id] = std::move(indexing); } } diff --git a/internal/core/src/segcore/InsertRecord.h b/internal/core/src/segcore/InsertRecord.h index 7d62e303eeda8..ceeaacd4faaa5 100644 --- a/internal/core/src/segcore/InsertRecord.h +++ b/internal/core/src/segcore/InsertRecord.h @@ -30,6 +30,7 @@ #include "segcore/AckResponder.h" #include "segcore/ConcurrentVector.h" #include "segcore/Record.h" +#include "storage/MmapManager.h" namespace milvus::segcore { @@ -292,8 +293,10 @@ class OffsetOrderedArray : public OffsetMap { template struct InsertRecord { - InsertRecord(const Schema& schema, int64_t size_per_chunk) - : timestamps_(size_per_chunk) { + InsertRecord(const Schema& schema, + const int64_t size_per_chunk, + const storage::MmapChunkDescriptor mmap_descriptor = nullptr) + : timestamps_(size_per_chunk), mmap_descriptor_(mmap_descriptor) { std::optional pk_field_id = schema.get_primary_field_id(); for (auto& field : schema) { @@ -303,7 +306,7 @@ struct InsertRecord { pk_field_id.value() == field_id) { switch (field_meta.get_data_type()) { case DataType::INT64: { - if (is_sealed) { + if constexpr (is_sealed) { pk2offset_ = std::make_unique>(); } else { @@ -313,7 +316,7 @@ struct InsertRecord { break; } case DataType::VARCHAR: { - if (is_sealed) { + if constexpr (is_sealed) { pk2offset_ = std::make_unique< OffsetOrderedArray>(); } else { @@ -532,6 +535,9 @@ struct InsertRecord { AssertInfo(fields_data_.find(field_id) != fields_data_.end(), "Cannot find field_data with field_id: " + std::to_string(field_id.get())); + AssertInfo( + fields_data_.at(field_id) != 
nullptr, + "fields_data_ at i is null" + std::to_string(field_id.get())); return fields_data_.at(field_id).get(); } @@ -560,8 +566,9 @@ struct InsertRecord { void append_field_data(FieldId field_id, int64_t size_per_chunk) { static_assert(IsScalar || IsSparse); - fields_data_.emplace( - field_id, std::make_unique>(size_per_chunk)); + fields_data_.emplace(field_id, + std::make_unique>( + size_per_chunk, mmap_descriptor_)); } // append a column of vector type @@ -571,7 +578,7 @@ struct InsertRecord { static_assert(std::is_base_of_v); fields_data_.emplace(field_id, std::make_unique>( - dim, size_per_chunk)); + dim, size_per_chunk, mmap_descriptor_)); } void @@ -620,6 +627,7 @@ struct InsertRecord { private: std::unordered_map> fields_data_{}; mutable std::shared_mutex shared_mutex_{}; + storage::MmapChunkDescriptor mmap_descriptor_; }; } // namespace milvus::segcore diff --git a/internal/core/src/segcore/SegmentGrowingImpl.cpp b/internal/core/src/segcore/SegmentGrowingImpl.cpp index d8cd057f28be7..2e3ae50fe6514 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.cpp +++ b/internal/core/src/segcore/SegmentGrowingImpl.cpp @@ -663,7 +663,11 @@ SegmentGrowingImpl::bulk_subscript_ptr_impl( auto& src = *vec; for (int64_t i = 0; i < count; ++i) { auto offset = seg_offsets[i]; - dst->at(i) = std::move(T(src[offset])); + if (IsVariableTypeSupportInChunk && mmap_descriptor_ != nullptr) { + dst->at(i) = std::move(T(src.view_element(offset))); + } else { + dst->at(i) = std::move(T(src[offset])); + } } } diff --git a/internal/core/src/segcore/SegmentGrowingImpl.h b/internal/core/src/segcore/SegmentGrowingImpl.h index 06f9048d5ae22..ce62566b08a73 100644 --- a/internal/core/src/segcore/SegmentGrowingImpl.h +++ b/internal/core/src/segcore/SegmentGrowingImpl.h @@ -209,12 +209,35 @@ class SegmentGrowingImpl : public SegmentGrowing { IndexMetaPtr indexMeta, const SegcoreConfig& segcore_config, int64_t segment_id) - : segcore_config_(segcore_config), + : 
mmap_descriptor_(storage::MmapManager::GetInstance() + .GetMmapConfig() + .GetEnableGrowingMmap() + ? storage::MmapChunkDescriptor( + new storage::MmapChunkDescriptorValue( + {segment_id, SegmentType::Growing})) + : nullptr), + segcore_config_(segcore_config), schema_(std::move(schema)), index_meta_(indexMeta), - insert_record_(*schema_, segcore_config.get_chunk_rows()), + insert_record_( + *schema_, segcore_config.get_chunk_rows(), mmap_descriptor_), indexing_record_(*schema_, index_meta_, segcore_config_), id_(segment_id) { + if (mmap_descriptor_ != nullptr) { + LOG_INFO("growing segment {} use mmap to hold raw data", + this->get_segment_id()); + auto mcm = + storage::MmapManager::GetInstance().GetMmapChunkManager(); + mcm->Register(mmap_descriptor_); + } + } + + ~SegmentGrowingImpl() { + if (mmap_descriptor_ != nullptr) { + auto mcm = + storage::MmapManager::GetInstance().GetMmapChunkManager(); + mcm->UnRegister(mmap_descriptor_); + } } void @@ -294,6 +317,7 @@ class SegmentGrowingImpl : public SegmentGrowing { } private: + storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; SegcoreConfig segcore_config_; SchemaPtr schema_; IndexMetaPtr index_meta_; diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp index b6cac4f9371e5..1422fde332ed1 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.cpp +++ b/internal/core/src/segcore/SegmentSealedImpl.cpp @@ -48,7 +48,7 @@ #include "query/SearchOnSealed.h" #include "storage/Util.h" #include "storage/ThreadPools.h" -#include "storage/ChunkCacheSingleton.h" +#include "storage/MmapManager.h" namespace milvus::segcore { @@ -151,9 +151,9 @@ SegmentSealedImpl::WarmupChunkCache(const FieldId field_id) { id_); auto field_info = it->second; - auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache(); + auto cc = storage::MmapManager::GetInstance().GetChunkCache(); for (const auto& data_path : field_info.insert_files) { - auto column = 
cc->Read(data_path); + auto column = cc->Read(data_path, mmap_descriptor_); } } @@ -819,8 +819,10 @@ SegmentSealedImpl::GetFieldDataPath(FieldId field_id, int64_t offset) const { } std::tuple> static ReadFromChunkCache( - const storage::ChunkCachePtr& cc, const std::string& data_path) { - auto column = cc->Read(data_path); + const storage::ChunkCachePtr& cc, + const std::string& data_path, + const storage::MmapChunkDescriptor& descriptor) { + auto column = cc->Read(data_path, descriptor); cc->Prefetch(data_path); return {data_path, column}; } @@ -864,7 +866,7 @@ SegmentSealedImpl::get_vector(FieldId field_id, } // If index doesn't have raw data, get vector from chunk cache. - auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache(); + auto cc = storage::MmapManager::GetInstance().GetChunkCache(); // group by data_path auto id_to_data_path = @@ -885,7 +887,8 @@ SegmentSealedImpl::get_vector(FieldId field_id, futures.reserve(path_to_column.size()); for (const auto& iter : path_to_column) { const auto& data_path = iter.first; - futures.emplace_back(pool.Submit(ReadFromChunkCache, cc, data_path)); + futures.emplace_back( + pool.Submit(ReadFromChunkCache, cc, data_path, mmap_descriptor_)); } for (int i = 0; i < futures.size(); ++i) { @@ -1029,10 +1032,15 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema, id_(segment_id), col_index_meta_(index_meta), TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) { + mmap_descriptor_ = std::shared_ptr( + new storage::MmapChunkDescriptorValue( + {segment_id, SegmentType::Sealed})); + auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager(); + mcm->Register(mmap_descriptor_); } SegmentSealedImpl::~SegmentSealedImpl() { - auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache(); + auto cc = storage::MmapManager::GetInstance().GetChunkCache(); if (cc == nullptr) { return; } @@ -1042,6 +1050,10 @@ SegmentSealedImpl::~SegmentSealedImpl() { cc->Remove(binlog); } } + if (mmap_descriptor_ 
!= nullptr) { + auto mm = storage::MmapManager::GetInstance().GetMmapChunkManager(); + mm->UnRegister(mmap_descriptor_); + } } void @@ -1161,7 +1173,7 @@ SegmentSealedImpl::ClearData() { variable_fields_avg_size_.clear(); stats_.mem_size = 0; } - auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache(); + auto cc = storage::MmapManager::GetInstance().GetChunkCache(); if (cc == nullptr) { return; } @@ -1674,7 +1686,7 @@ SegmentSealedImpl::generate_interim_index(const FieldId field_id) { } void SegmentSealedImpl::RemoveFieldFile(const FieldId field_id) { - auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache(); + auto cc = storage::MmapManager::GetInstance().GetChunkCache(); if (cc == nullptr) { return; } diff --git a/internal/core/src/segcore/SegmentSealedImpl.h b/internal/core/src/segcore/SegmentSealedImpl.h index dcae46e55b670..2f79645337623 100644 --- a/internal/core/src/segcore/SegmentSealedImpl.h +++ b/internal/core/src/segcore/SegmentSealedImpl.h @@ -279,6 +279,8 @@ class SegmentSealedImpl : public SegmentSealed { generate_interim_index(const FieldId field_id); private: + // mmap descriptor, used in chunk cache + storage::MmapChunkDescriptor mmap_descriptor_ = nullptr; // segment loading state BitsetType field_data_ready_bitset_; BitsetType index_ready_bitset_; diff --git a/internal/core/src/storage/CMakeLists.txt b/internal/core/src/storage/CMakeLists.txt index ac600d945b8af..ab4292cecd835 100644 --- a/internal/core/src/storage/CMakeLists.txt +++ b/internal/core/src/storage/CMakeLists.txt @@ -56,7 +56,8 @@ set(STORAGE_FILES ThreadPools.cpp ChunkCache.cpp TencentCloudCredentialsProvider.cpp - TencentCloudSTSClient.cpp) + TencentCloudSTSClient.cpp + MmapChunkManager.cpp) if(USE_OPENDAL) list(APPEND STORAGE_FILES OpenDALChunkManager.cpp) diff --git a/internal/core/src/storage/ChunkCache.cpp b/internal/core/src/storage/ChunkCache.cpp index 4fe8592a7233a..d579c3ce228c4 100644 --- a/internal/core/src/storage/ChunkCache.cpp +++ 
b/internal/core/src/storage/ChunkCache.cpp @@ -19,14 +19,12 @@ #include "mmap/Utils.h" namespace milvus::storage { - std::shared_ptr -ChunkCache::Read(const std::string& filepath) { - auto path = CachePath(filepath); - +ChunkCache::Read(const std::string& filepath, + const MmapChunkDescriptor& descriptor) { { std::shared_lock lck(mutex_); - auto it = columns_.find(path); + auto it = columns_.find(filepath); if (it != columns_.end()) { AssertInfo(it->second, "unexpected null column, file={}", filepath); return it->second; @@ -36,29 +34,26 @@ ChunkCache::Read(const std::string& filepath) { auto field_data = DownloadAndDecodeRemoteFile(cm_.get(), filepath); std::unique_lock lck(mutex_); - auto it = columns_.find(path); + auto it = columns_.find(filepath); if (it != columns_.end()) { return it->second; } - auto column = Mmap(path, field_data->GetFieldData()); + auto column = Mmap(field_data->GetFieldData(), descriptor); AssertInfo(column, "unexpected null column, file={}", filepath); - columns_.emplace(path, column); + columns_.emplace(filepath, column); return column; } void ChunkCache::Remove(const std::string& filepath) { - auto path = CachePath(filepath); std::unique_lock lck(mutex_); - columns_.erase(path); + columns_.erase(filepath); } void ChunkCache::Prefetch(const std::string& filepath) { - auto path = CachePath(filepath); - std::shared_lock lck(mutex_); - auto it = columns_.find(path); + auto it = columns_.find(filepath); if (it == columns_.end()) { return; } @@ -68,35 +63,23 @@ ChunkCache::Prefetch(const std::string& filepath) { reinterpret_cast(const_cast(column->MmappedData())), column->ByteSize(), read_ahead_policy_); - AssertInfo(ok == 0, - "failed to madvise to the data file {}, err: {}", - path, - strerror(errno)); + if (ok != 0) { + LOG_WARN( + "failed to madvise to the data file {}, addr {}, size {}, err: {}", + filepath, + column->MmappedData(), + column->ByteSize(), + strerror(errno)); + } } std::shared_ptr -ChunkCache::Mmap(const 
std::filesystem::path& path, - const FieldDataPtr& field_data) { - auto dir = path.parent_path(); - std::filesystem::create_directories(dir); - +ChunkCache::Mmap(const FieldDataPtr& field_data, + const MmapChunkDescriptor& descriptor) { auto dim = field_data->get_dim(); auto data_type = field_data->get_data_type(); - auto file = File::Open(path.string(), O_CREAT | O_TRUNC | O_RDWR); - - // write the field data to disk auto data_size = field_data->Size(); - // unused - std::vector> element_indices{}; - auto written = WriteFieldData(file, data_type, field_data, element_indices); - AssertInfo(written == data_size, - "failed to write data file {}, written " - "{} but total {}, err: {}", - path.c_str(), - written, - data_size, - strerror(errno)); std::shared_ptr column{}; @@ -108,40 +91,17 @@ ChunkCache::Mmap(const std::filesystem::path& path, offset += field_data->Size(i); } auto sparse_column = std::make_shared( - file, data_size, dim, data_type); + data_size, dim, data_type, mcm_, descriptor); sparse_column->Seal(std::move(indices)); column = std::move(sparse_column); } else if (IsVariableDataType(data_type)) { AssertInfo( false, "TODO: unimplemented for variable data type: {}", data_type); } else { - column = std::make_shared(file, data_size, dim, data_type); + column = std::make_shared( + data_size, dim, data_type, mcm_, descriptor); } - - // unlink - auto ok = unlink(path.c_str()); - AssertInfo(ok == 0, - "failed to unlink mmap data file {}, err: {}", - path.c_str(), - strerror(errno)); - + column->AppendBatch(field_data); return column; } - -std::string -ChunkCache::CachePath(const std::string& filepath) { - auto path = std::filesystem::path(filepath); - auto prefix = std::filesystem::path(path_prefix_); - - // Cache path shall not use absolute filepath direct, it shall always under path_prefix_ - if (path.is_absolute()) { - return (prefix / - filepath.substr(path.root_directory().string().length(), - filepath.length())) - .string(); - } - - return (prefix / 
filepath).string(); -} - } // namespace milvus::storage diff --git a/internal/core/src/storage/ChunkCache.h b/internal/core/src/storage/ChunkCache.h index 2dd24496b9a7a..d29fddbb92c9e 100644 --- a/internal/core/src/storage/ChunkCache.h +++ b/internal/core/src/storage/ChunkCache.h @@ -15,7 +15,7 @@ // limitations under the License. #pragma once - +#include "storage/MmapChunkManager.h" #include "mmap/Column.h" namespace milvus::storage { @@ -24,10 +24,10 @@ extern std::map ReadAheadPolicy_Map; class ChunkCache { public: - explicit ChunkCache(std::string path, - const std::string& read_ahead_policy, - ChunkManagerPtr cm) - : path_prefix_(std::move(path)), cm_(cm) { + explicit ChunkCache(const std::string& read_ahead_policy, + ChunkManagerPtr cm, + MmapChunkManagerPtr mcm) + : cm_(cm), mcm_(mcm) { auto iter = ReadAheadPolicy_Map.find(read_ahead_policy); AssertInfo(iter != ReadAheadPolicy_Map.end(), "unrecognized read ahead policy: {}, " @@ -35,8 +35,7 @@ class ChunkCache { "willneed, dontneed`", read_ahead_policy); read_ahead_policy_ = iter->second; - LOG_INFO("Init ChunkCache with prefix: {}, read_ahead_policy: {}", - path_prefix_, + LOG_INFO("Init ChunkCache with read_ahead_policy: {}", read_ahead_policy); } @@ -44,7 +43,7 @@ class ChunkCache { public: std::shared_ptr - Read(const std::string& filepath); + Read(const std::string& filepath, const MmapChunkDescriptor& descriptor); void Remove(const std::string& filepath); @@ -54,7 +53,7 @@ class ChunkCache { private: std::shared_ptr - Mmap(const std::filesystem::path& path, const FieldDataPtr& field_data); + Mmap(const FieldDataPtr& field_data, const MmapChunkDescriptor& descriptor); std::string CachePath(const std::string& filepath); @@ -66,8 +65,8 @@ class ChunkCache { private: mutable std::shared_mutex mutex_; int read_ahead_policy_; - const std::string path_prefix_; ChunkManagerPtr cm_; + MmapChunkManagerPtr mcm_; ColumnTable columns_; }; diff --git a/internal/core/src/storage/ChunkCacheSingleton.h 
b/internal/core/src/storage/ChunkCacheSingleton.h deleted file mode 100644 index c1abfb7379621..0000000000000 --- a/internal/core/src/storage/ChunkCacheSingleton.h +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include "ChunkCache.h" -#include "RemoteChunkManagerSingleton.h" - -namespace milvus::storage { - -class ChunkCacheSingleton { - private: - ChunkCacheSingleton() { - } - - public: - ChunkCacheSingleton(const ChunkCacheSingleton&) = delete; - ChunkCacheSingleton& - operator=(const ChunkCacheSingleton&) = delete; - - static ChunkCacheSingleton& - GetInstance() { - static ChunkCacheSingleton instance; - return instance; - } - - void - Init(std::string root_path, std::string read_ahead_policy) { - if (cc_ == nullptr) { - auto rcm = RemoteChunkManagerSingleton::GetInstance() - .GetRemoteChunkManager(); - cc_ = std::make_shared( - std::move(root_path), std::move(read_ahead_policy), rcm); - } - } - - ChunkCachePtr - GetChunkCache() { - return cc_; - } - - private: - ChunkCachePtr cc_ = nullptr; -}; - -} // namespace milvus::storage \ No newline at end of file diff --git a/internal/core/src/storage/DiskFileManagerImpl.cpp 
b/internal/core/src/storage/DiskFileManagerImpl.cpp index e4eef55100e61..1d780a5225d56 100644 --- a/internal/core/src/storage/DiskFileManagerImpl.cpp +++ b/internal/core/src/storage/DiskFileManagerImpl.cpp @@ -490,10 +490,9 @@ DiskFileManagerImpl::CacheRawDataToDisk(std::vector remote_files) { if (data_type == milvus::DataType::VECTOR_SPARSE_FLOAT) { dim = std::max( dim, - (uint32_t)( - std::dynamic_pointer_cast>( - field_data) - ->Dim())); + (uint32_t)(std::dynamic_pointer_cast< + FieldData>(field_data) + ->Dim())); auto sparse_rows = static_cast*>( field_data->Data()); diff --git a/internal/core/src/storage/MmapChunkManager.cpp b/internal/core/src/storage/MmapChunkManager.cpp new file mode 100644 index 0000000000000..f7d71665916d8 --- /dev/null +++ b/internal/core/src/storage/MmapChunkManager.cpp @@ -0,0 +1,299 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "storage/MmapChunkManager.h" +#include "storage/LocalChunkManagerSingleton.h" +#include +#include +#include +#include "stdio.h" +#include +#include "log/Log.h" +#include "storage/prometheus_client.h" + +namespace milvus::storage { +namespace { +static constexpr int kMmapDefaultProt = PROT_WRITE | PROT_READ; +static constexpr int kMmapDefaultFlags = MAP_SHARED; +}; // namespace + +// todo(cqy): After confirming the append parallelism of multiple fields, adjust the lock granularity. + +MmapBlock::MmapBlock(const std::string& file_name, + const uint64_t file_size, + BlockType type) + : file_name_(file_name), + file_size_(file_size), + block_type_(type), + is_valid_(false) { +} + +void +MmapBlock::Init() { + std::lock_guard lock(file_mutex_); + if (is_valid_ == true) { + LOG_WARN("This mmap block has been init."); + return; + } + // create tmp file + int fd = open(file_name_.c_str(), O_RDWR | O_CREAT, S_IRUSR | S_IWUSR); + if (fd == -1) { + PanicInfo(ErrorCode::FileCreateFailed, "Failed to open mmap tmp file"); + } + // append file size to 'file_size' + if (lseek(fd, file_size_ - 1, SEEK_SET) == -1) { + PanicInfo(ErrorCode::FileReadFailed, "Failed to seek mmap tmp file"); + } + if (write(fd, "", 1) == -1) { + PanicInfo(ErrorCode::FileWriteFailed, "Failed to write mmap tmp file"); + } + // memory mmaping + addr_ = static_cast( + mmap(nullptr, file_size_, kMmapDefaultProt, kMmapDefaultFlags, fd, 0)); + if (addr_ == MAP_FAILED) { + PanicInfo(ErrorCode::MmapError, "Failed to mmap in mmap_block"); + } + offset_.store(0); + close(fd); + + milvus::storage::internal_mmap_allocated_space_bytes_file.Observe( + file_size_); + milvus::storage::internal_mmap_in_used_space_bytes_file.Increment( + file_size_); + is_valid_ = true; + allocated_size_.fetch_add(file_size_); +} + +void +MmapBlock::Close() { + std::lock_guard lock(file_mutex_); + if (is_valid_ == false) { + LOG_WARN("This mmap block has been closed."); + return; + } + if (addr_ != nullptr) { + if (munmap(addr_, 
file_size_) != 0) { + PanicInfo(ErrorCode::MmapError, + "Failed to munmap in mmap_block"); + } + } + if (access(file_name_.c_str(), F_OK) == 0) { + if (remove(file_name_.c_str()) != 0) { + PanicInfo(ErrorCode::MmapError, "Failed to remove mmap tmp file in mmap_block"); + } + } + allocated_size_.fetch_sub(file_size_); + milvus::storage::internal_mmap_in_used_space_bytes_file.Decrement( + file_size_); + is_valid_ = false; +} + +MmapBlock::~MmapBlock() { + if (is_valid_ == true) { + try { + Close(); + } catch (const std::exception& e) { + LOG_ERROR(e.what()); + } + } +} + +void* +MmapBlock::Get(const uint64_t size) { + AssertInfo(is_valid_, "Fail to get memory from invalid MmapBlock."); + if (file_size_ - offset_.load() < size) { + return nullptr; + } else { + return (void*)(addr_ + offset_.fetch_add(size)); + } +} + +MmapBlockPtr +MmapBlocksHandler::AllocateFixSizeBlock() { + if (fix_size_blocks_cache_.size() != 0) { + // return a mmap_block in fix_size_blocks_cache_ + auto block = std::move(fix_size_blocks_cache_.front()); + fix_size_blocks_cache_.pop(); + return std::move(block); + } else { + // not enough disk space left to create a new block, fail fast + if (GetFixFileSize() + Size() > max_disk_limit_) { + PanicInfo( + ErrorCode::MemAllocateSizeNotMatch, + "Failed to create a new mmap_block, not enough disk to " + "create a new mmap block. Allocated size: {}, Max size: {}", + Size(), + max_disk_limit_); + } + auto new_block = std::make_unique( + GetMmapFilePath(), GetFixFileSize(), MmapBlock::BlockType::Fixed); + new_block->Init(); + return std::move(new_block); + } +} + +MmapBlockPtr +MmapBlocksHandler::AllocateLargeBlock(const uint64_t size) { + if (size + Capacity() > max_disk_limit_) { + ClearCache(); + } + if (size + Size() > max_disk_limit_) { + PanicInfo(ErrorCode::MemAllocateSizeNotMatch, + "Failed to create a new mmap_block, not enough disk to " + "create a new mmap block. 
Allocated size: {}, Max size: {}", + Size(), + max_disk_limit_); + } + auto new_block = std::make_unique( + GetMmapFilePath(), size, MmapBlock::BlockType::Variable); + new_block->Init(); + return std::move(new_block); +} + +void +MmapBlocksHandler::Deallocate(MmapBlockPtr&& block) { + if (block->GetType() == MmapBlock::BlockType::Fixed) { + // store the mmap block in cache + block->Reset(); + fix_size_blocks_cache_.push(std::move(block)); + uint64_t max_cache_size = + uint64_t(cache_threshold * (float)max_disk_limit_); + if (fix_size_blocks_cache_.size() * fix_mmap_file_size_ > + max_cache_size) { + FitCache(max_cache_size); + } + } else { + // release the mmap block + block->Close(); + block = nullptr; + } +} + +void +MmapBlocksHandler::ClearCache() { + while (!fix_size_blocks_cache_.empty()) { + auto block = std::move(fix_size_blocks_cache_.front()); + block->Close(); + fix_size_blocks_cache_.pop(); + } +} + +void +MmapBlocksHandler::FitCache(const uint64_t size) { + while (fix_size_blocks_cache_.size() * fix_mmap_file_size_ > size) { + auto block = std::move(fix_size_blocks_cache_.front()); + block->Close(); + fix_size_blocks_cache_.pop(); + } +} + +MmapChunkManager::~MmapChunkManager() { + // munmap all mmap_blocks before remove dir + for (auto it = blocks_table_.begin(); it != blocks_table_.end();) { + it = blocks_table_.erase(it); + } + if (blocks_handler_ != nullptr) { + blocks_handler_ = nullptr; + } + // clean the mmap dir + auto cm = + storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + if (cm->Exist(mmap_file_prefix_)) { + cm->RemoveDir(mmap_file_prefix_); + } +} + +void +MmapChunkManager::Register(const MmapChunkDescriptor key) { + if (HasKey(key)) { + LOG_WARN("key has exist in growing mmap manager"); + return; + } + std::unique_lock lck(mtx_); + blocks_table_.emplace(key, std::vector()); + return; +} + +void +MmapChunkManager::UnRegister(const MmapChunkDescriptor key) { + std::unique_lock lck(mtx_); + if (blocks_table_.find(key) != 
blocks_table_.end()) { + auto& blocks = blocks_table_[key]; + for (auto i = 0; i < blocks.size(); i++) { + blocks_handler_->Deallocate(std::move(blocks[i])); + } + blocks_table_.erase(key); + } +} + +bool +MmapChunkManager::HasKey(const MmapChunkDescriptor key) { + std::shared_lock lck(mtx_); + return (blocks_table_.find(key) != blocks_table_.end()); +} + +void* +MmapChunkManager::Allocate(const MmapChunkDescriptor key, const uint64_t size) { + AssertInfo(HasKey(key), "key {} has not been register.", key->segment_id); + std::unique_lock lck(mtx_); + if (size < blocks_handler_->GetFixFileSize()) { + // find a place to fit in + for (auto block_id = 0; block_id < blocks_table_[key].size(); + block_id++) { + auto addr = blocks_table_[key][block_id]->Get(size); + if (addr != nullptr) { + return addr; + } + } + // create a new block + auto new_block = blocks_handler_->AllocateFixSizeBlock(); + AssertInfo(new_block != nullptr, "new mmap_block can't be nullptr"); + auto addr = new_block->Get(size); + AssertInfo(addr != nullptr, "fail to allocate from mmap block."); + blocks_table_[key].emplace_back(std::move(new_block)); + return addr; + } else { + auto new_block = blocks_handler_->AllocateLargeBlock(size); + AssertInfo(new_block != nullptr, "new mmap_block can't be nullptr"); + auto addr = new_block->Get(size); + AssertInfo(addr != nullptr, "fail to allocate from mmap block."); + blocks_table_[key].emplace_back(std::move(new_block)); + return addr; + } +} + +MmapChunkManager::MmapChunkManager(std::string root_path, + const uint64_t disk_limit, + const uint64_t file_size) { + blocks_handler_ = + std::make_unique(disk_limit, file_size, root_path); + mmap_file_prefix_ = root_path; + auto cm = + storage::LocalChunkManagerSingleton::GetInstance().GetChunkManager(); + AssertInfo(cm != nullptr, + "Fail to get LocalChunkManager, LocalChunkManagerPtr is null"); + if (cm->Exist(root_path)) { + cm->RemoveDir(root_path); + } + cm->CreateDir(root_path); + LOG_INFO( + "Init 
MappChunkManager with: Path {}, MaxDiskSize {} MB, " + "FixedFileSize {} MB.", + root_path, + disk_limit / (1024 * 1024), + file_size / (1024 * 1024)); +} +} // namespace milvus::storage \ No newline at end of file diff --git a/internal/core/src/storage/MmapChunkManager.h b/internal/core/src/storage/MmapChunkManager.h new file mode 100644 index 0000000000000..40bea6fd905c4 --- /dev/null +++ b/internal/core/src/storage/MmapChunkManager.h @@ -0,0 +1,214 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "common/EasyAssert.h" +#include "log/Log.h" +#include +#include "common/type_c.h" +#include "storage/LocalChunkManagerSingleton.h" +namespace milvus::storage { +// use segment id and segment type to describe a segment in mmap chunk manager; a segment has only one of two types (growing or sealed) in mmap chunk manager +struct MmapChunkDescriptorValue { + int64_t segment_id; + SegmentType segment_type; +}; +using MmapChunkDescriptor = std::shared_ptr; + +struct DescriptorHash { + size_t + operator()(const MmapChunkDescriptor& x) const { + return x->segment_id * 10 + (size_t)x->segment_type; + } +}; +/** + * @brief MmapBlock is a basic unit of MmapChunkManager. It handles all memory mmapping in one tmp file. + * static function(TotalBlocksSize) is used to get total files size of chunk mmap. + */ +struct MmapBlock { + public: + enum class BlockType { + Fixed = 0, + Variable = 1, + }; + MmapBlock(const std::string& file_name, + const uint64_t file_size, + BlockType type = BlockType::Fixed); + ~MmapBlock(); + void + Init(); + void + Close(); + void* + Get(const uint64_t size); + void + Reset() { + offset_.store(0); + } + BlockType + GetType() { + return block_type_; + } + uint64_t + GetCapacity() { + return file_size_; + } + static void + ClearAllocSize() { + allocated_size_.store(0); + } + static uint64_t + TotalBlocksSize() { + return allocated_size_.load(); + } + + private: + const std::string file_name_; + const uint64_t file_size_; + char* addr_ = nullptr; + std::atomic offset_ = 0; + const BlockType block_type_; + std::atomic is_valid_ = false; + static inline std::atomic allocated_size_ = + 0; // keeps the total size used by all mmap blocks + mutable std::mutex file_mutex_; +}; +using MmapBlockPtr = std::unique_ptr; + +/** + * @brief MmapBlocksHandler is used to handle the creation and destruction of mmap blocks. + * MmapBlocksHandler is not thread safe. + */ +class 
MmapBlocksHandler { + public: + MmapBlocksHandler(const uint64_t disk_limit, + const uint64_t fix_file_size, + const std::string file_prefix) + : max_disk_limit_(disk_limit), + mmap_file_prefix_(file_prefix), + fix_mmap_file_size_(fix_file_size) { + mmmap_file_counter_.store(0); + MmapBlock::ClearAllocSize(); + } + ~MmapBlocksHandler() { + ClearCache(); + } + uint64_t + GetDiskLimit() { + return max_disk_limit_; + } + uint64_t + GetFixFileSize() { + return fix_mmap_file_size_; + } + uint64_t + Capacity() { + return MmapBlock::TotalBlocksSize(); + } + uint64_t + Size() { + return Capacity() - fix_size_blocks_cache_.size() * fix_mmap_file_size_; + } + MmapBlockPtr + AllocateFixSizeBlock(); + MmapBlockPtr + AllocateLargeBlock(const uint64_t size); + void + Deallocate(MmapBlockPtr&& block); + + private: + std::string + GetFilePrefix() { + return mmap_file_prefix_; + } + std::string + GetMmapFilePath() { + auto file_id = mmmap_file_counter_.fetch_add(1); + return mmap_file_prefix_ + "/" + std::to_string(file_id); + } + void + ClearCache(); + void + FitCache(const uint64_t size); + + private: + uint64_t max_disk_limit_; + std::string mmap_file_prefix_; + std::atomic mmmap_file_counter_; + uint64_t fix_mmap_file_size_; + std::queue fix_size_blocks_cache_; + const float cache_threshold = 0.25; +}; + +/** + * @brief MmapChunkManager + * MmapChunkManager manages the memory-mapping space in mmap manager; + * MmapChunkManager uses blocks_table_ to record the relationship of segments and the mapp space it uses. + * The basic space unit of MmapChunkManager is MmapBlock, and is managed by MmapBlocksHandler. + * todo(cqy): blocks_handler_ and blocks_table_ is not thread safe, we need use fine-grained locks for better performance. 
+ */ +class MmapChunkManager { + public: + explicit MmapChunkManager(std::string root_path, + const uint64_t disk_limit, + const uint64_t file_size); + ~MmapChunkManager(); + void + Register(const MmapChunkDescriptor key); + void + UnRegister(const MmapChunkDescriptor key); + bool + HasKey(const MmapChunkDescriptor key); + void* + Allocate(const MmapChunkDescriptor key, const uint64_t size); + uint64_t + GetDiskAllocSize() { + std::shared_lock lck(mtx_); + if (blocks_handler_ == nullptr) { + return 0; + } else { + return blocks_handler_->Capacity(); + } + } + uint64_t + GetDiskUsage() { + std::shared_lock lck(mtx_); + if (blocks_handler_ == nullptr) { + return 0; + } else { + return blocks_handler_->Size(); + } + } + + private: + mutable std::shared_mutex mtx_; + std::unordered_map, + DescriptorHash> + blocks_table_; + std::unique_ptr blocks_handler_ = nullptr; + std::string mmap_file_prefix_; +}; +using MmapChunkManagerPtr = std::shared_ptr; +} // namespace milvus::storage \ No newline at end of file diff --git a/internal/core/src/storage/MmapManager.h b/internal/core/src/storage/MmapManager.h new file mode 100644 index 0000000000000..f2e32d56c6f0e --- /dev/null +++ b/internal/core/src/storage/MmapManager.h @@ -0,0 +1,123 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include "ChunkCache.h" +#include "RemoteChunkManagerSingleton.h" + +namespace milvus::storage { +/** + * @brief MmapManager(singleton) + * MmapManager holds all mmap components; + * all mmap components use mmapchunkmanager to allocate mmap space; + * no thread safe, only one thread init in segcore. + */ +class MmapManager { + private: + MmapManager() = default; + + public: + MmapManager(const MmapManager&) = delete; + MmapManager& + operator=(const MmapManager&) = delete; + + static MmapManager& + GetInstance() { + static MmapManager instance; + return instance; + } + ~MmapManager() { + if (cc_ != nullptr) { + cc_ = nullptr; + } + // delete mmap chunk manager at last + if (mcm_ != nullptr) { + mcm_ = nullptr; + } + } + void + Init(const MmapConfig& config) { + if (init_flag_ == false) { + std::lock_guard lock( + init_mutex_); // in case many threads call init + mmap_config_ = config; + if (mcm_ == nullptr) { + mcm_ = std::make_shared( + mmap_config_.mmap_path, + mmap_config_.disk_limit, + mmap_config_.fix_file_size); + } + if (cc_ == nullptr) { + auto rcm = RemoteChunkManagerSingleton::GetInstance() + .GetRemoteChunkManager(); + cc_ = std::make_shared( + std::move(mmap_config_.cache_read_ahead_policy), rcm, mcm_); + } + LOG_INFO("Init MmapConfig with MmapConfig: {}", + mmap_config_.ToString()); + init_flag_ = true; + } else { + LOG_WARN("mmap manager has been inited."); + } + } + + ChunkCachePtr + GetChunkCache() { + AssertInfo(init_flag_ == true, "Mmap manager has not been init."); + return cc_; + } + + MmapChunkManagerPtr + GetMmapChunkManager() { + AssertInfo(init_flag_ == true, "Mmap manager has not been init."); + return mcm_; + } + + MmapConfig& + GetMmapConfig() { + AssertInfo(init_flag_ == true, "Mmap manager has not been init."); + return mmap_config_; + } + + size_t + GetAllocSize() { + if (mcm_ != nullptr) { + return 
mcm_->GetDiskAllocSize(); + } else { + return 0; + } + } + + size_t + GetDiskUsage() { + if (mcm_ != nullptr) { + return mcm_->GetDiskUsage(); + } else { + return 0; + } + } + + private: + mutable std::mutex init_mutex_; + MmapConfig mmap_config_; + MmapChunkManagerPtr mcm_ = nullptr; + ChunkCachePtr cc_ = nullptr; + std::atomic init_flag_ = false; +}; + +} // namespace milvus::storage \ No newline at end of file diff --git a/internal/core/src/storage/Types.h b/internal/core/src/storage/Types.h index 928386d19078a..949c08846ac52 100644 --- a/internal/core/src/storage/Types.h +++ b/internal/core/src/storage/Types.h @@ -119,6 +119,33 @@ struct StorageConfig { } }; +struct MmapConfig { + std::string cache_read_ahead_policy; + std::string mmap_path; + uint64_t disk_limit; + uint64_t fix_file_size; + bool growing_enable_mmap; + bool + GetEnableGrowingMmap() const { + return growing_enable_mmap; + } + void + SetEnableGrowingMmap(bool flag) { + this->growing_enable_mmap = flag; + } + std::string + ToString() const { + std::stringstream ss; + ss << "[cache_read_ahead_policy=" << cache_read_ahead_policy + << ", mmap_path=" << mmap_path + << ", disk_limit=" << disk_limit / (1024 * 1024) << "MB" + << ", fix_file_size=" << fix_file_size / (1024 * 1024) << "MB" + << ", growing_enable_mmap=" << std::boolalpha << growing_enable_mmap + << "]"; + return ss.str(); + } +}; + } // namespace milvus::storage template <> diff --git a/internal/core/src/storage/storage_c.cpp b/internal/core/src/storage/storage_c.cpp index 5ff9b300f0b02..456311ce8c9d4 100644 --- a/internal/core/src/storage/storage_c.cpp +++ b/internal/core/src/storage/storage_c.cpp @@ -18,7 +18,7 @@ #include "storage/prometheus_client.h" #include "storage/RemoteChunkManagerSingleton.h" #include "storage/LocalChunkManagerSingleton.h" -#include "storage/ChunkCacheSingleton.h" +#include "storage/MmapManager.h" CStatus GetLocalUsedSize(const char* c_dir, int64_t* size) { @@ -86,10 +86,16 @@ 
InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config) { } CStatus -InitChunkCacheSingleton(const char* c_dir_path, const char* read_ahead_policy) { +InitMmapManager(CMmapConfig c_mmap_config) { try { - milvus::storage::ChunkCacheSingleton::GetInstance().Init( - c_dir_path, read_ahead_policy); + milvus::storage::MmapConfig mmap_config; + mmap_config.cache_read_ahead_policy = + std::string(c_mmap_config.cache_read_ahead_policy); + mmap_config.mmap_path = std::string(c_mmap_config.mmap_path); + mmap_config.disk_limit = c_mmap_config.disk_limit; + mmap_config.fix_file_size = c_mmap_config.fix_file_size; + mmap_config.growing_enable_mmap = c_mmap_config.growing_enable_mmap; + milvus::storage::MmapManager::GetInstance().Init(mmap_config); return milvus::SuccessCStatus(); } catch (std::exception& e) { return milvus::FailureCStatus(&e); diff --git a/internal/core/src/storage/storage_c.h b/internal/core/src/storage/storage_c.h index de3b5f4828895..3aa3666364953 100644 --- a/internal/core/src/storage/storage_c.h +++ b/internal/core/src/storage/storage_c.h @@ -31,7 +31,7 @@ CStatus InitRemoteChunkManagerSingleton(CStorageConfig c_storage_config); CStatus -InitChunkCacheSingleton(const char* c_dir_path, const char* read_ahead_policy); +InitMmapManager(CMmapConfig c_mmap_config); void CleanRemoteChunkManagerSingleton(); diff --git a/internal/core/unittest/CMakeLists.txt b/internal/core/unittest/CMakeLists.txt index 43da214d5c965..44f85a366971e 100644 --- a/internal/core/unittest/CMakeLists.txt +++ b/internal/core/unittest/CMakeLists.txt @@ -69,6 +69,7 @@ set(MILVUS_TEST_FILES test_regex_query.cpp test_futures.cpp test_array_inverted_index.cpp + test_chunk_vector.cpp ) if ( INDEX_ENGINE STREQUAL "cardinal" ) diff --git a/internal/core/unittest/init_gtest.cpp b/internal/core/unittest/init_gtest.cpp index 6b23b68228038..3633a86f824d4 100644 --- a/internal/core/unittest/init_gtest.cpp +++ b/internal/core/unittest/init_gtest.cpp @@ -23,6 +23,7 @@ main(int argc, char** 
argv) { TestLocalPath); milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init( get_default_local_storage_config()); + milvus::storage::MmapManager::GetInstance().Init(get_default_mmap_config()); return RUN_ALL_TESTS(); } diff --git a/internal/core/unittest/test_c_api.cpp b/internal/core/unittest/test_c_api.cpp index 379bf7792e622..d8f7fc29b34e4 100644 --- a/internal/core/unittest/test_c_api.cpp +++ b/internal/core/unittest/test_c_api.cpp @@ -970,7 +970,7 @@ TEST(CApiTest, DeleteRepeatedPksFromSealedSegment) { TEST(CApiTest, InsertSamePkAfterDeleteOnGrowingSegment) { auto collection = NewCollection(get_default_schema_config()); CSegmentInterface segment; - auto status = NewSegment(collection, Growing, -1, &segment); + auto status = NewSegment(collection, Growing, 111, &segment); ASSERT_EQ(status.error_code, Success); auto col = (milvus::segcore::Collection*)collection; diff --git a/internal/core/unittest/test_chunk_cache.cpp b/internal/core/unittest/test_chunk_cache.cpp index aa1e65132b4e9..9404a56f4226d 100644 --- a/internal/core/unittest/test_chunk_cache.cpp +++ b/internal/core/unittest/test_chunk_cache.cpp @@ -27,16 +27,32 @@ #include "storage/LocalChunkManagerSingleton.h" #define DEFAULT_READ_AHEAD_POLICY "willneed" - -TEST(ChunkCacheTest, Read) { +class ChunkCacheTest : public testing::Test { + public: + void + SetUp() override { + mcm = milvus::storage::MmapManager::GetInstance().GetMmapChunkManager(); + mcm->Register(descriptor); + } + void + TearDown() override { + mcm->UnRegister(descriptor); + } + const char* local_storage_path = "/tmp/test_chunk_cache/local"; + const char* file_name = "chunk_cache_test/insert_log/2/101/1000000"; + milvus::storage::MmapChunkManagerPtr mcm; + milvus::segcore::SegcoreConfig config; + milvus::storage::MmapChunkDescriptor descriptor = + std::shared_ptr( + new milvus::storage::MmapChunkDescriptorValue( + {111, SegmentType::Sealed})); +}; + +TEST_F(ChunkCacheTest, Read) { auto N = 10000; auto dim = 128; auto 
metric_type = knowhere::metric::L2; - auto mmap_dir = "/tmp/test_chunk_cache/mmap"; - auto local_storage_path = "/tmp/test_chunk_cache/local"; - auto file_name = std::string("chunk_cache_test/insert_log/1/101/1000000"); - milvus::storage::LocalChunkManagerSingleton::GetInstance().Init( local_storage_path); @@ -69,9 +85,10 @@ TEST(ChunkCacheTest, Read) { field_data_meta, field_meta); - auto cc = std::make_shared( - mmap_dir, DEFAULT_READ_AHEAD_POLICY, lcm); - const auto& column = cc->Read(file_name); + auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache(); + const auto& column = cc->Read(file_name, descriptor); + std::cout << "column->ByteSize() :" << column->ByteSize() << " " + << dim * N * 4 << std::endl; Assert(column->ByteSize() == dim * N * 4); auto actual = (float*)column->Data(); @@ -82,23 +99,13 @@ TEST(ChunkCacheTest, Read) { cc->Remove(file_name); lcm->Remove(file_name); - std::filesystem::remove_all(mmap_dir); - - auto exist = lcm->Exist(file_name); - Assert(!exist); - exist = std::filesystem::exists(mmap_dir); - Assert(!exist); } -TEST(ChunkCacheTest, TestMultithreads) { +TEST_F(ChunkCacheTest, TestMultithreads) { auto N = 1000; auto dim = 128; auto metric_type = knowhere::metric::L2; - auto mmap_dir = "/tmp/test_chunk_cache/mmap"; - auto local_storage_path = "/tmp/test_chunk_cache/local"; - auto file_name = std::string("chunk_cache_test/insert_log/2/101/1000000"); - milvus::storage::LocalChunkManagerSingleton::GetInstance().Init( local_storage_path); @@ -131,13 +138,13 @@ TEST(ChunkCacheTest, TestMultithreads) { field_data_meta, field_meta); - auto cc = std::make_shared( - mmap_dir, DEFAULT_READ_AHEAD_POLICY, lcm); + auto cc = milvus::storage::MmapManager::GetInstance().GetChunkCache(); constexpr int threads = 16; std::vector total_counts(threads); auto executor = [&](int thread_id) { - const auto& column = cc->Read(file_name); + std::cout << "thread id" << thread_id << " read data" << std::endl; + const auto& column = 
cc->Read(file_name, descriptor); Assert(column->ByteSize() == dim * N * 4); auto actual = (float*)column->Data(); @@ -156,10 +163,4 @@ TEST(ChunkCacheTest, TestMultithreads) { cc->Remove(file_name); lcm->Remove(file_name); - std::filesystem::remove_all(mmap_dir); - - auto exist = lcm->Exist(file_name); - Assert(!exist); - exist = std::filesystem::exists(mmap_dir); - Assert(!exist); } diff --git a/internal/core/unittest/test_chunk_vector.cpp b/internal/core/unittest/test_chunk_vector.cpp new file mode 100644 index 0000000000000..b0d67663e4df1 --- /dev/null +++ b/internal/core/unittest/test_chunk_vector.cpp @@ -0,0 +1,438 @@ +// Copyright (C) 2019-2020 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. 
See the License for the specific language governing permissions and limitations under the License + +#include + +#include "common/Types.h" +#include "knowhere/comp/index_param.h" +#include "segcore/SegmentGrowing.h" +#include "segcore/SegmentGrowingImpl.h" +#include "pb/schema.pb.h" +#include "test_utils/DataGen.h" +#include "query/Plan.h" +#include "query/generated/ExecExprVisitor.h" + +using namespace milvus::segcore; +using namespace milvus; +namespace pb = milvus::proto; + +class ChunkVectorTest : public testing::Test { + public: + void + SetUp() override { + auto& mmap_config = + milvus::storage::MmapManager::GetInstance().GetMmapConfig(); + mmap_config.SetEnableGrowingMmap(true); + } + void + TearDown() override { + auto& mmap_config = + milvus::storage::MmapManager::GetInstance().GetMmapConfig(); + mmap_config.SetEnableGrowingMmap(false); + } + knowhere::MetricType metric_type = "IP"; + milvus::segcore::SegcoreConfig config; +}; + +TEST_F(ChunkVectorTest, FillDataWithMmap) { + auto schema = std::make_shared(); + auto bool_field = schema->AddDebugField("bool", DataType::BOOL); + auto int8_field = schema->AddDebugField("int8", DataType::INT8); + auto int16_field = schema->AddDebugField("int16", DataType::INT16); + auto int32_field = schema->AddDebugField("int32", DataType::INT32); + auto int64_field = schema->AddDebugField("int64", DataType::INT64); + auto float_field = schema->AddDebugField("float", DataType::FLOAT); + auto double_field = schema->AddDebugField("double", DataType::DOUBLE); + auto varchar_field = schema->AddDebugField("varchar", DataType::VARCHAR); + auto json_field = schema->AddDebugField("json", DataType::JSON); + auto int_array_field = + schema->AddDebugField("int_array", DataType::ARRAY, DataType::INT8); + auto long_array_field = + schema->AddDebugField("long_array", DataType::ARRAY, DataType::INT64); + auto bool_array_field = + schema->AddDebugField("bool_array", DataType::ARRAY, DataType::BOOL); + auto string_array_field = 
schema->AddDebugField( + "string_array", DataType::ARRAY, DataType::VARCHAR); + auto double_array_field = schema->AddDebugField( + "double_array", DataType::ARRAY, DataType::DOUBLE); + auto float_array_field = + schema->AddDebugField("float_array", DataType::ARRAY, DataType::FLOAT); + auto fp32_vec = schema->AddDebugField( + "fp32_vec", DataType::VECTOR_FLOAT, 128, metric_type); + auto fp16_vec = schema->AddDebugField( + "fp16_vec", DataType::VECTOR_FLOAT16, 128, metric_type); + auto bf16_vec = schema->AddDebugField( + "bf16_vec", DataType::VECTOR_BFLOAT16, 128, metric_type); + auto sparse_vec = schema->AddDebugField( + "sparse_vec", DataType::VECTOR_SPARSE_FLOAT, 128, metric_type); + schema->set_primary_field_id(int64_field); + + std::map index_params = { + {"index_type", "HNSW"}, {"metric_type", metric_type}, {"nlist", "128"}}; + std::map type_params = {{"dim", "128"}}; + FieldIndexMeta fieldIndexMeta( + fp32_vec, std::move(index_params), std::move(type_params)); + + std::map filedMap = {{fp32_vec, fieldIndexMeta}}; + IndexMetaPtr metaPtr = + std::make_shared(100000, std::move(filedMap)); + auto segment_growing = CreateGrowingSegment(schema, metaPtr, 1, config); + auto segment = dynamic_cast(segment_growing.get()); + int64_t per_batch = 1000; + int64_t n_batch = 3; + int64_t dim = 128; + for (int64_t i = 0; i < n_batch; i++) { + auto dataset = DataGen(schema, per_batch); + + auto offset = segment->PreInsert(per_batch); + segment->Insert(offset, + per_batch, + dataset.row_ids_.data(), + dataset.timestamps_.data(), + dataset.raw_); + auto num_inserted = (i + 1) * per_batch; + auto ids_ds = GenRandomIds(num_inserted); + auto bool_result = + segment->bulk_subscript(bool_field, ids_ds->GetIds(), num_inserted); + auto int8_result = + segment->bulk_subscript(int8_field, ids_ds->GetIds(), num_inserted); + auto int16_result = segment->bulk_subscript( + int16_field, ids_ds->GetIds(), num_inserted); + auto int32_result = segment->bulk_subscript( + int32_field, 
ids_ds->GetIds(), num_inserted); + auto int64_result = segment->bulk_subscript( + int64_field, ids_ds->GetIds(), num_inserted); + auto float_result = segment->bulk_subscript( + float_field, ids_ds->GetIds(), num_inserted); + auto double_result = segment->bulk_subscript( + double_field, ids_ds->GetIds(), num_inserted); + auto varchar_result = segment->bulk_subscript( + varchar_field, ids_ds->GetIds(), num_inserted); + auto json_result = + segment->bulk_subscript(json_field, ids_ds->GetIds(), num_inserted); + auto int_array_result = segment->bulk_subscript( + int_array_field, ids_ds->GetIds(), num_inserted); + auto long_array_result = segment->bulk_subscript( + long_array_field, ids_ds->GetIds(), num_inserted); + auto bool_array_result = segment->bulk_subscript( + bool_array_field, ids_ds->GetIds(), num_inserted); + auto string_array_result = segment->bulk_subscript( + string_array_field, ids_ds->GetIds(), num_inserted); + auto double_array_result = segment->bulk_subscript( + double_array_field, ids_ds->GetIds(), num_inserted); + auto float_array_result = segment->bulk_subscript( + float_array_field, ids_ds->GetIds(), num_inserted); + auto fp32_vec_result = + segment->bulk_subscript(fp32_vec, ids_ds->GetIds(), num_inserted); + auto fp16_vec_result = + segment->bulk_subscript(fp16_vec, ids_ds->GetIds(), num_inserted); + auto bf16_vec_result = + segment->bulk_subscript(bf16_vec, ids_ds->GetIds(), num_inserted); + auto sparse_vec_result = + segment->bulk_subscript(sparse_vec, ids_ds->GetIds(), num_inserted); + + EXPECT_EQ(bool_result->scalars().bool_data().data_size(), num_inserted); + EXPECT_EQ(int8_result->scalars().int_data().data_size(), num_inserted); + EXPECT_EQ(int16_result->scalars().int_data().data_size(), num_inserted); + EXPECT_EQ(int32_result->scalars().int_data().data_size(), num_inserted); + EXPECT_EQ(int64_result->scalars().long_data().data_size(), + num_inserted); + EXPECT_EQ(float_result->scalars().float_data().data_size(), + num_inserted); + 
EXPECT_EQ(double_result->scalars().double_data().data_size(), + num_inserted); + EXPECT_EQ(varchar_result->scalars().string_data().data_size(), + num_inserted); + EXPECT_EQ(json_result->scalars().json_data().data_size(), num_inserted); + EXPECT_EQ(fp32_vec_result->vectors().float_vector().data_size(), + num_inserted * dim); + EXPECT_EQ(fp16_vec_result->vectors().float16_vector().size(), + num_inserted * dim * 2); + EXPECT_EQ(bf16_vec_result->vectors().bfloat16_vector().size(), + num_inserted * dim * 2); + EXPECT_EQ( + sparse_vec_result->vectors().sparse_float_vector().contents_size(), + num_inserted); + EXPECT_EQ(int_array_result->scalars().array_data().data_size(), + num_inserted); + EXPECT_EQ(long_array_result->scalars().array_data().data_size(), + num_inserted); + EXPECT_EQ(bool_array_result->scalars().array_data().data_size(), + num_inserted); + EXPECT_EQ(string_array_result->scalars().array_data().data_size(), + num_inserted); + EXPECT_EQ(double_array_result->scalars().array_data().data_size(), + num_inserted); + EXPECT_EQ(float_array_result->scalars().array_data().data_size(), + num_inserted); + } +} + +TEST_F(ChunkVectorTest, QueryWithMmap) { + auto schema = std::make_shared(); + schema->AddDebugField( + "fakevec", DataType::VECTOR_FLOAT, 16, knowhere::metric::L2); + schema->AddDebugField("age", DataType::FLOAT); + auto i64_fid = schema->AddDebugField("counter", DataType::INT64); + schema->set_primary_field_id(i64_fid); + const char* raw_plan = R"(vector_anns: < + field_id: 100 + predicates: < + term_expr: < + column_info: < + field_id: 102 + data_type: Int64 + > + values: < + int64_val: 1 + > + values: < + int64_val: 2 + > + > + > + query_info: < + topk: 5 + round_decimal: 3 + metric_type: "L2" + search_params: "{\"nprobe\": 10}" + > + placeholder_tag: "$0" + >)"; + int64_t N = 4000; + auto dataset = DataGen(schema, N); + auto segment = CreateGrowingSegment(schema, empty_index_meta, 11, config); + segment->PreInsert(N); + segment->Insert(0, + N, + 
dataset.row_ids_.data(), + dataset.timestamps_.data(), + dataset.raw_); + + auto plan_str = translate_text_plan_to_binary_plan(raw_plan); + auto plan = milvus::query::CreateSearchPlanByExpr( + *schema, plan_str.data(), plan_str.size()); + auto num_queries = 3; + auto ph_group_raw = + milvus::segcore::CreatePlaceholderGroup(num_queries, 16, 1024); + auto ph_group = milvus::query::ParsePlaceholderGroup( + plan.get(), ph_group_raw.SerializeAsString()); + Timestamp timestamp = 1000000; + + auto sr = segment->Search(plan.get(), ph_group.get(), timestamp); + int topk = 5; + auto json = SearchResultToJson(*sr); + ASSERT_EQ(sr->total_nq_, num_queries); + ASSERT_EQ(sr->unity_topK_, topk); +} + +// TEST_F(ChunkVectorTest, ArrayExprWithMmap) { +// auto schema = std::make_shared(); +// auto i64_fid = schema->AddDebugField("id", DataType::INT64); +// auto long_array_fid = +// schema->AddDebugField("long_array", DataType::ARRAY, DataType::INT64); +// auto bool_array_fid = +// schema->AddDebugField("bool_array", DataType::ARRAY, DataType::BOOL); +// auto float_array_fid = +// schema->AddDebugField("float_array", DataType::ARRAY, DataType::FLOAT); +// auto string_array_fid = schema->AddDebugField( +// "string_array", DataType::ARRAY, DataType::VARCHAR); +// schema->set_primary_field_id(i64_fid); + +// auto seg = CreateGrowingSegment(schema, empty_index_meta, 22, config); +// int N = 1000; +// std::map> array_cols; +// int num_iters = 1; +// for (int iter = 0; iter < num_iters; ++iter) { +// auto raw_data = DataGen(schema, N, iter); +// auto new_long_array_col = raw_data.get_col(long_array_fid); +// auto new_bool_array_col = raw_data.get_col(bool_array_fid); +// auto new_float_array_col = +// raw_data.get_col(float_array_fid); +// auto new_string_array_col = +// raw_data.get_col(string_array_fid); +// array_cols["long"].insert(array_cols["long"].end(), +// new_long_array_col.begin(), +// new_long_array_col.end()); +// array_cols["bool"].insert(array_cols["bool"].end(), +// 
new_bool_array_col.begin(), +// new_bool_array_col.end()); +// array_cols["float"].insert(array_cols["float"].end(), +// new_float_array_col.begin(), +// new_float_array_col.end()); +// array_cols["string"].insert(array_cols["string"].end(), +// new_string_array_col.begin(), +// new_string_array_col.end()); +// seg->PreInsert(N); +// seg->Insert(iter * N, +// N, +// raw_data.row_ids_.data(), +// raw_data.timestamps_.data(), +// raw_data.raw_); +// } + +// auto seg_promote = dynamic_cast(seg.get()); +// query::ExecPlanNodeVisitor visitor(*seg_promote, MAX_TIMESTAMP); + +// std::vector>> +// testcases = { +// {R"(term_expr: < +// column_info: < +// field_id: 101 +// data_type: Array +// nested_path:"0" +// element_type:Int64 +// > +// values: values: values: +// >)", +// "long", +// [](milvus::Array& array) { +// auto val = array.get_data(0); +// return val == 1 || val == 2 || val == 3; +// }}, +// {R"(term_expr: < +// column_info: < +// field_id: 101 +// data_type: Array +// nested_path:"0" +// element_type:Int64 +// > +// >)", +// "long", +// [](milvus::Array& array) { return false; }}, +// {R"(term_expr: < +// column_info: < +// field_id: 102 +// data_type: Array +// nested_path:"0" +// element_type:Bool +// > +// values: values: +// >)", +// "bool", +// [](milvus::Array& array) { +// auto val = array.get_data(0); +// return !val; +// }}, +// {R"(term_expr: < +// column_info: < +// field_id: 102 +// data_type: Array +// nested_path:"0" +// element_type:Bool +// > +// >)", +// "bool", +// [](milvus::Array& array) { return false; }}, +// {R"(term_expr: < +// column_info: < +// field_id: 103 +// data_type: Array +// nested_path:"0" +// element_type:Float +// > +// values: values: +// >)", +// "float", +// [](milvus::Array& array) { +// auto val = array.get_data(0); +// return val == 1.23 || val == 124.31; +// }}, +// {R"(term_expr: < +// column_info: < +// field_id: 103 +// data_type: Array +// nested_path:"0" +// element_type:Float +// > +// >)", +// "float", +// 
[](milvus::Array& array) { return false; }}, +// {R"(term_expr: < +// column_info: < +// field_id: 104 +// data_type: Array +// nested_path:"0" +// element_type:VarChar +// > +// values: values: +// >)", +// "string", +// [](milvus::Array& array) { +// auto val = array.get_data(0); +// return val == "abc" || val == "idhgf1s"; +// }}, +// {R"(term_expr: < +// column_info: < +// field_id: 104 +// data_type: Array +// nested_path:"0" +// element_type:VarChar +// > +// >)", +// "string", +// [](milvus::Array& array) { return false; }}, +// {R"(term_expr: < +// column_info: < +// field_id: 104 +// data_type: Array +// nested_path:"1024" +// element_type:VarChar +// > +// values: values: +// >)", +// "string", +// [](milvus::Array& array) { +// if (array.length() <= 1024) { +// return false; +// } +// auto val = array.get_data(1024); +// return val == "abc" || val == "idhgf1s"; +// }}, +// }; + +// std::string raw_plan_tmp = R"(vector_anns: < +// field_id: 100 +// predicates: < +// @@@@ +// > +// query_info: < +// topk: 10 +// round_decimal: 3 +// metric_type: "L2" +// search_params: "{\"nprobe\": 10}" +// > +// placeholder_tag: "$0" +// >)"; + +// for (auto [clause, array_type, ref_func] : testcases) { +// auto loc = raw_plan_tmp.find("@@@@"); +// auto raw_plan = raw_plan_tmp; +// raw_plan.replace(loc, 4, clause); +// auto plan_str = translate_text_plan_to_binary_plan(raw_plan.c_str()); +// auto plan = +// milvus::query::CreateSearchPlanByExpr(*schema, plan_str.data(), plan_str.size()); +// BitsetType final; +// visitor.ExecuteExprNode(plan->plan_node_->filter_plannode_.value(), +// seg_promote, +// N * num_iters, +// final); +// EXPECT_EQ(final.size(), N * num_iters); + +// for (int i = 0; i < N * num_iters; ++i) { +// auto ans = final[i]; +// auto array = milvus::Array(array_cols[array_type][i]); +// ASSERT_EQ(ans, ref_func(array)); +// } +// } +// } diff --git a/internal/core/unittest/test_growing.cpp b/internal/core/unittest/test_growing.cpp index 
f5421384e02fb..20a42a143ff4e 100644 --- a/internal/core/unittest/test_growing.cpp +++ b/internal/core/unittest/test_growing.cpp @@ -227,7 +227,7 @@ TEST_P(GrowingTest, FillData) { float_array_field, ids_ds->GetIds(), num_inserted); auto vec_result = segment->bulk_subscript(vec, ids_ds->GetIds(), num_inserted); - + // checking result data EXPECT_EQ(bool_result->scalars().bool_data().data_size(), num_inserted); EXPECT_EQ(int8_result->scalars().int_data().data_size(), num_inserted); EXPECT_EQ(int16_result->scalars().int_data().data_size(), num_inserted); diff --git a/internal/core/unittest/test_sealed.cpp b/internal/core/unittest/test_sealed.cpp index 0edcb4c9cd6cd..8032d6e2cb5d2 100644 --- a/internal/core/unittest/test_sealed.cpp +++ b/internal/core/unittest/test_sealed.cpp @@ -17,7 +17,7 @@ #include "index/IndexFactory.h" #include "knowhere/version.h" #include "segcore/SegmentSealedImpl.h" -#include "storage/ChunkCacheSingleton.h" +#include "storage/MmapManager.h" #include "storage/MinioChunkManager.h" #include "storage/RemoteChunkManagerSingleton.h" #include "storage/Util.h" @@ -1378,7 +1378,6 @@ TEST(Sealed, GetVectorFromChunkCache) { auto metric_type = knowhere::metric::L2; auto index_type = knowhere::IndexEnum::INDEX_FAISS_IVFPQ; - auto mmap_dir = "/tmp/mmap"; auto file_name = std::string( "sealed_test_get_vector_from_chunk_cache/insert_log/1/101/1000000"); @@ -1386,8 +1385,6 @@ TEST(Sealed, GetVectorFromChunkCache) { milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc); auto mcm = std::make_unique(sc); // mcm->CreateBucket(sc.bucket_name); - milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir, - "willneed"); auto schema = std::make_shared(); auto fakevec_id = schema->AddDebugField( @@ -1444,11 +1441,9 @@ TEST(Sealed, GetVectorFromChunkCache) { std::vector{N}, false, std::vector{file_name}}; - segment_sealed->AddFieldDataInfoForSealed(LoadFieldDataInfo{ - std::map{ - {fakevec_id.get(), field_binlog_info}}, - mmap_dir, - }); + 
segment_sealed->AddFieldDataInfoForSealed( + LoadFieldDataInfo{std::map{ + {fakevec_id.get(), field_binlog_info}}}); auto segment = dynamic_cast(segment_sealed.get()); auto has = segment->HasRawData(vec_info.field_id); @@ -1471,11 +1466,8 @@ TEST(Sealed, GetVectorFromChunkCache) { } rcm->Remove(file_name); - std::filesystem::remove_all(mmap_dir); auto exist = rcm->Exist(file_name); Assert(!exist); - exist = std::filesystem::exists(mmap_dir); - Assert(!exist); } TEST(Sealed, GetSparseVectorFromChunkCache) { @@ -1490,15 +1482,12 @@ TEST(Sealed, GetSparseVectorFromChunkCache) { // we have a type of sparse index that doesn't include raw data. auto index_type = knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX; - auto mmap_dir = "/tmp/mmap"; auto file_name = std::string( "sealed_test_get_vector_from_chunk_cache/insert_log/1/101/1000000"); auto sc = milvus::storage::StorageConfig{}; milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc); auto mcm = std::make_unique(sc); - milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir, - "willneed"); auto schema = std::make_shared(); auto fakevec_id = schema->AddDebugField( @@ -1556,11 +1545,9 @@ TEST(Sealed, GetSparseVectorFromChunkCache) { std::vector{N}, false, std::vector{file_name}}; - segment_sealed->AddFieldDataInfoForSealed(LoadFieldDataInfo{ - std::map{ - {fakevec_id.get(), field_binlog_info}}, - mmap_dir, - }); + segment_sealed->AddFieldDataInfoForSealed( + LoadFieldDataInfo{std::map{ + {fakevec_id.get(), field_binlog_info}}}); auto segment = dynamic_cast(segment_sealed.get()); @@ -1585,11 +1572,8 @@ TEST(Sealed, GetSparseVectorFromChunkCache) { } rcm->Remove(file_name); - std::filesystem::remove_all(mmap_dir); auto exist = rcm->Exist(file_name); Assert(!exist); - exist = std::filesystem::exists(mmap_dir); - Assert(!exist); } TEST(Sealed, WarmupChunkCache) { @@ -1609,9 +1593,6 @@ TEST(Sealed, WarmupChunkCache) { auto sc = milvus::storage::StorageConfig{}; 
milvus::storage::RemoteChunkManagerSingleton::GetInstance().Init(sc); auto mcm = std::make_unique(sc); - // mcm->CreateBucket(sc.bucket_name); - milvus::storage::ChunkCacheSingleton::GetInstance().Init(mmap_dir, - "willneed"); auto schema = std::make_shared(); auto fakevec_id = schema->AddDebugField( @@ -1668,11 +1649,9 @@ TEST(Sealed, WarmupChunkCache) { std::vector{N}, false, std::vector{file_name}}; - segment_sealed->AddFieldDataInfoForSealed(LoadFieldDataInfo{ - std::map{ - {fakevec_id.get(), field_binlog_info}}, - mmap_dir, - }); + segment_sealed->AddFieldDataInfoForSealed( + LoadFieldDataInfo{std::map{ + {fakevec_id.get(), field_binlog_info}}}); auto segment = dynamic_cast(segment_sealed.get()); auto has = segment->HasRawData(vec_info.field_id); diff --git a/internal/core/unittest/test_segcore.cpp b/internal/core/unittest/test_segcore.cpp index 2f905864d69e7..59d2d36f6a4e6 100644 --- a/internal/core/unittest/test_segcore.cpp +++ b/internal/core/unittest/test_segcore.cpp @@ -20,6 +20,7 @@ using namespace milvus; namespace { +static constexpr int64_t seg_id = 101; auto generate_data(int N) { std::vector raw_data; diff --git a/internal/core/unittest/test_storage.cpp b/internal/core/unittest/test_storage.cpp index 624237633c4e2..ec2d0a10c3b5d 100644 --- a/internal/core/unittest/test_storage.cpp +++ b/internal/core/unittest/test_storage.cpp @@ -155,14 +155,4 @@ TEST_F(StorageTest, GetStorageMetrics) { EXPECT_EQ( 0, strncmp(currentLine, familyName.c_str(), familyName.length())); } -} - -TEST_F(StorageTest, CachePath) { - auto rcm = - RemoteChunkManagerSingleton::GetInstance().GetRemoteChunkManager(); - auto cc_ = ChunkCache("tmp/mmap/chunk_cache", "willneed", rcm); - auto relative_result = cc_.CachePath("abc"); - EXPECT_EQ("tmp/mmap/chunk_cache/abc", relative_result); - auto absolute_result = cc_.CachePath("/var/lib/milvus/abc"); - EXPECT_EQ("tmp/mmap/chunk_cache/var/lib/milvus/abc", absolute_result); } \ No newline at end of file diff --git 
a/internal/core/unittest/test_utils.cpp b/internal/core/unittest/test_utils.cpp index ed247d38678f8..2c8e30877bbd6 100644 --- a/internal/core/unittest/test_utils.cpp +++ b/internal/core/unittest/test_utils.cpp @@ -62,7 +62,7 @@ TEST(Util, GetDeleteBitmap) { auto i64_fid = schema->AddDebugField("age", DataType::INT64); schema->set_primary_field_id(i64_fid); auto N = 10; - + uint64_t seg_id = 101; InsertRecord insert_record(*schema, N); DeletedRecord delete_record; diff --git a/internal/core/unittest/test_utils/storage_test_utils.h b/internal/core/unittest/test_utils/storage_test_utils.h index 52c4b9f83a74e..05f6e864ec66e 100644 --- a/internal/core/unittest/test_utils/storage_test_utils.h +++ b/internal/core/unittest/test_utils/storage_test_utils.h @@ -32,6 +32,7 @@ using milvus::segcore::GeneratedData; using milvus::storage::ChunkManagerPtr; using milvus::storage::FieldDataMeta; using milvus::storage::InsertData; +using milvus::storage::MmapConfig; using milvus::storage::StorageConfig; namespace { @@ -45,6 +46,18 @@ get_default_local_storage_config() { return storage_config; } +inline MmapConfig +get_default_mmap_config() { + MmapConfig mmap_config = { + .cache_read_ahead_policy = "willneed", + .mmap_path = "/tmp/test_mmap_manager/", + .disk_limit = + uint64_t(2) * uint64_t(1024) * uint64_t(1024) * uint64_t(1024), + .fix_file_size = uint64_t(4) * uint64_t(1024) * uint64_t(1024), + .growing_enable_mmap = false}; + return mmap_config; +} + inline LoadFieldDataInfo PrepareInsertBinlog(int64_t collection_id, int64_t partition_id, diff --git a/internal/datanode/writebuffer/manager_test.go b/internal/datanode/writebuffer/manager_test.go index 61b3b693a2e82..b9b6acff31d97 100644 --- a/internal/datanode/writebuffer/manager_test.go +++ b/internal/datanode/writebuffer/manager_test.go @@ -15,6 +15,7 @@ import ( "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/metacache" 
"github.com/milvus-io/milvus/internal/datanode/syncmgr" + "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/util/hardware" "github.com/milvus-io/milvus/pkg/util/merr" @@ -53,6 +54,7 @@ func (s *ManagerSuite) SetupSuite() { }, }, } + initcore.InitMmapManager(paramtable.Get()) s.channelName = "by-dev-rootcoord-dml_0_100_v0" } diff --git a/internal/querynodev2/delegator/delegator_data_test.go b/internal/querynodev2/delegator/delegator_data_test.go index 3e7933af71cef..70b8547b7a470 100644 --- a/internal/querynodev2/delegator/delegator_data_test.go +++ b/internal/querynodev2/delegator/delegator_data_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "path" + "path/filepath" "strconv" "testing" "time" @@ -41,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/tsafe" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/bloomfilter" + "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -48,6 +50,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/metric" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type DelegatorDataSuite struct { @@ -74,6 +77,9 @@ func (s *DelegatorDataSuite) SetupSuite() { paramtable.Init() paramtable.SetNodeID(1) paramtable.Get().Save(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.Key, "1") + localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + initcore.InitLocalChunkManager(localDataRootPath) + initcore.InitMmapManager(paramtable.Get()) s.collectionID = 1000 s.replicaID = 65535 diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index 
0be5c0af1bf7e..a5f4bd668a304 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -2,6 +2,7 @@ package segments import ( "context" + "path/filepath" "testing" "github.com/samber/lo" @@ -11,7 +12,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type ManagerSuite struct { @@ -37,6 +40,9 @@ func (s *ManagerSuite) SetupSuite() { s.channels = []string{"by-dev-rootcoord-dml_0_100v0", "by-dev-rootcoord-dml_1_200v0", "by-dev-rootcoord-dml_2_300v0", "by-dev-rootcoord-dml_3_400v0"} s.types = []SegmentType{SegmentTypeSealed, SegmentTypeGrowing, SegmentTypeSealed, SegmentTypeSealed} s.levels = []datapb.SegmentLevel{datapb.SegmentLevel_Legacy, datapb.SegmentLevel_Legacy, datapb.SegmentLevel_L1, datapb.SegmentLevel_L0} + localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + initcore.InitLocalChunkManager(localDataRootPath) + initcore.InitMmapManager(paramtable.Get()) } func (s *ManagerSuite) SetupTest() { diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index 464df07e7ab74..c7e877d6fe7dc 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -3,6 +3,7 @@ package segments import ( "context" "fmt" + "path/filepath" "testing" "github.com/stretchr/testify/suite" @@ -13,6 +14,7 @@ import ( storage "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/initcore" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) type SegmentSuite struct { @@ -43,6 +45,9 @@ func (suite *SegmentSuite) 
SetupTest() { chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath) suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx) initcore.InitRemoteChunkManager(paramtable.Get()) + localDataRootPath := filepath.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), typeutil.QueryNodeRole) + initcore.InitLocalChunkManager(localDataRootPath) + initcore.InitMmapManager(paramtable.Get()) suite.collectionID = 100 suite.partitionID = 10 diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index c9a3d5cf42355..5e917e3c9fdcb 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -249,21 +249,10 @@ func (node *QueryNode) InitSegcore() error { return err } - mmapDirPath := paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() - if len(mmapDirPath) == 0 { - paramtable.Get().Save( - paramtable.Get().QueryNodeCfg.MmapDirPath.Key, - path.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), "mmap"), - ) - mmapDirPath = paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() - } - chunkCachePath := path.Join(mmapDirPath, "chunk_cache") - policy := paramtable.Get().QueryNodeCfg.ReadAheadPolicy.GetValue() - err = initcore.InitChunkCache(chunkCachePath, policy) + err = initcore.InitMmapManager(paramtable.Get()) if err != nil { return err } - log.Info("InitChunkCache done", zap.String("dir", chunkCachePath), zap.String("policy", policy)) initcore.InitTraceConfig(paramtable.Get()) return nil @@ -406,6 +395,7 @@ func (node *QueryNode) Start() error { paramtable.SetCreateTime(time.Now()) paramtable.SetUpdateTime(time.Now()) mmapEnabled := paramtable.Get().QueryNodeCfg.MmapEnabled.GetAsBool() + growingmmapEnable := paramtable.Get().QueryNodeCfg.GrowingMmapEnabled.GetAsBool() node.UpdateStateCode(commonpb.StateCode_Healthy) registry.GetInMemoryResolver().RegisterQueryNode(node.GetNodeID(), node) @@ -413,6 +403,7 @@ func (node *QueryNode) Start() error { zap.Int64("queryNodeID", 
node.GetNodeID()), zap.String("Address", node.address), zap.Bool("mmapEnabled", mmapEnabled), + zap.Bool("growingmmapEnable", growingmmapEnable), ) }) diff --git a/internal/util/initcore/init_core.go b/internal/util/initcore/init_core.go index bf6a2e903aa35..356a81d4bf79a 100644 --- a/internal/util/initcore/init_core.go +++ b/internal/util/initcore/init_core.go @@ -29,6 +29,7 @@ import "C" import ( "fmt" + "path" "time" "unsafe" @@ -160,13 +161,31 @@ func InitRemoteChunkManager(params *paramtable.ComponentParam) error { return HandleCStatus(&status, "InitRemoteChunkManagerSingleton failed") } -func InitChunkCache(mmapDirPath string, readAheadPolicy string) error { - cMmapDirPath := C.CString(mmapDirPath) - defer C.free(unsafe.Pointer(cMmapDirPath)) - cReadAheadPolicy := C.CString(readAheadPolicy) - defer C.free(unsafe.Pointer(cReadAheadPolicy)) - status := C.InitChunkCacheSingleton(cMmapDirPath, cReadAheadPolicy) - return HandleCStatus(&status, "InitChunkCacheSingleton failed") +func InitMmapManager(params *paramtable.ComponentParam) error { + mmapDirPath := params.QueryNodeCfg.MmapDirPath.GetValue() + if len(mmapDirPath) == 0 { + paramtable.Get().Save( + paramtable.Get().QueryNodeCfg.MmapDirPath.Key, + path.Join(paramtable.Get().LocalStorageCfg.Path.GetValue(), "mmap"), + ) + mmapDirPath = paramtable.Get().QueryNodeCfg.MmapDirPath.GetValue() + } + cMmapChunkManagerDir := C.CString(path.Join(mmapDirPath, "/mmap_chunk_manager/")) + cCacheReadAheadPolicy := C.CString(params.QueryNodeCfg.ReadAheadPolicy.GetValue()) + defer C.free(unsafe.Pointer(cMmapChunkManagerDir)) + defer C.free(unsafe.Pointer(cCacheReadAheadPolicy)) + diskCapacity := params.QueryNodeCfg.DiskCapacityLimit.GetAsUint64() + diskLimit := uint64(float64(params.QueryNodeCfg.MaxMmapDiskPercentageForMmapManager.GetAsUint64()*diskCapacity) * 0.01) + mmapFileSize := params.QueryNodeCfg.FixedFileSizeForMmapManager.GetAsUint64() * 1024 * 1024 + mmapConfig := C.CMmapConfig{ + cache_read_ahead_policy: 
cCacheReadAheadPolicy, + mmap_path: cMmapChunkManagerDir, + disk_limit: C.uint64_t(diskLimit), + fix_file_size: C.uint64_t(mmapFileSize), + growing_enable_mmap: C.bool(params.QueryNodeCfg.GrowingMmapEnabled.GetAsBool()), + } + status := C.InitMmapManager(mmapConfig) + return HandleCStatus(&status, "InitMmapManager failed") } func CleanRemoteChunkManager() { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 07dedb129d625..db4a758dc47b2 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2118,10 +2118,13 @@ type queryNodeConfig struct { DiskCacheCapacityLimit ParamItem `refreshable:"true"` // cache limit - CacheEnabled ParamItem `refreshable:"false"` - CacheMemoryLimit ParamItem `refreshable:"false"` - MmapDirPath ParamItem `refreshable:"false"` - MmapEnabled ParamItem `refreshable:"false"` + CacheEnabled ParamItem `refreshable:"false"` + CacheMemoryLimit ParamItem `refreshable:"false"` + MmapDirPath ParamItem `refreshable:"false"` + MmapEnabled ParamItem `refreshable:"false"` + GrowingMmapEnabled ParamItem `refreshable:"false"` + FixedFileSizeForMmapManager ParamItem `refreshable:"false"` + MaxMmapDiskPercentageForMmapManager ParamItem `refreshable:"false"` LazyLoadEnabled ParamItem `refreshable:"false"` LazyLoadWaitTimeout ParamItem `refreshable:"true"` @@ -2376,6 +2379,38 @@ func (p *queryNodeConfig) init(base *BaseTable) { } p.MmapEnabled.Init(base.mgr) + p.GrowingMmapEnabled = ParamItem{ + Key: "queryNode.mmap.growingMmapEnabled", + Version: "2.4.4", + DefaultValue: "false", + FallbackKeys: []string{"queryNode.growingMmapEnabled"}, + Doc: "Enable mmap for using in growing raw data", + Export: true, + Formatter: func(v string) string { + mmapEnabled := p.MmapEnabled.GetAsBool() + return strconv.FormatBool(mmapEnabled && getAsBool(v)) + }, + } + p.GrowingMmapEnabled.Init(base.mgr) + + p.FixedFileSizeForMmapManager = ParamItem{ + Key: 
"queryNode.mmap.fixedFileSizeForMmapAlloc", + Version: "2.4.0", + DefaultValue: "64", + Doc: "tmp file size for mmap chunk manager", + Export: true, + } + p.FixedFileSizeForMmapManager.Init(base.mgr) + + p.MaxMmapDiskPercentageForMmapManager = ParamItem{ + Key: "querynode.mmap.maxDiskUsagePercentageForMmapAlloc", + Version: "2.4.0", + DefaultValue: "20", + Doc: "disk percentage used in mmap chunk manager", + Export: true, + } + p.MaxMmapDiskPercentageForMmapManager.Init(base.mgr) + p.LazyLoadEnabled = ParamItem{ Key: "queryNode.lazyload.enabled", Version: "2.4.2",