Skip to content

Commit

Permalink
enhance: growing segment support mmap (#32633)
Browse files Browse the repository at this point in the history
issue: #32984

Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
  • Loading branch information
cqy123456 committed Jun 18, 2024
1 parent ec64499 commit 32f685f
Show file tree
Hide file tree
Showing 50 changed files with 2,096 additions and 374 deletions.
3 changes: 3 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,9 @@ queryNode:
warmup: disable
mmap:
mmapEnabled: false # Enable mmap for loading data
growingMmapEnabled: false # Enable mmap for growing segment
fixedFileSizeForMmapAlloc: 4 #MB, fixed file size for mmap chunk manager to store chunk data
maxDiskUsagePercentageForMmapAlloc: 20 # max percentage of disk usage in memory mapping
lazyload:
enabled: false # Enable lazyload for loading data
waitTimeout: 30000 # max wait timeout duration in milliseconds before start to do lazyload search and retrieve
Expand Down
10 changes: 10 additions & 0 deletions internal/core/src/common/Array.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ class Array {
return offsets_;
}

std::vector<uint64_t>
get_offsets_in_copy() const {
return offsets_;
}

ScalarArray
output_data() const {
ScalarArray data_array;
Expand Down Expand Up @@ -573,6 +578,11 @@ class ArrayView {
data() const {
return data_;
}
// copy to result
std::vector<uint64_t>
get_offsets_in_copy() const {
return offsets_;
}

bool
is_same_array(const proto::plan::Array& arr2) const {
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/common/EasyAssert.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ enum ErrorCode {
MetricTypeNotMatch = 2031,
DimNotMatch = 2032,
ClusterSkip = 2033,
MemAllocateFailed = 2034,
MemAllocateSizeNotMatch = 2035,
MmapError = 2036,
KnowhereError = 2100,

// timeout or cancel related.
Expand Down
18 changes: 18 additions & 0 deletions internal/core/src/common/VectorTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ template <typename T>
constexpr bool IsSparse = std::is_same_v<T, SparseFloatVector> ||
std::is_same_v<T, knowhere::sparse::SparseRow<float>>;

template <typename T>
constexpr bool IsVariableType =
std::is_same_v<T, std::string> || std::is_same_v<T, std::string_view> ||
std::is_same_v<T, Array> || std::is_same_v<T, ArrayView> ||
std::is_same_v<T, proto::plan::Array> || std::is_same_v<T, Json> ||
IsSparse<T>;

template <typename T>
constexpr bool IsVariableTypeSupportInChunk =
std::is_same_v<T, std::string> || std::is_same_v<T, Array> ||
std::is_same_v<T, Json>;

template <typename T>
using ChunkViewType = std::conditional_t<
std::is_same_v<T, std::string>,
std::string_view,
std::conditional_t<std::is_same_v<T, Array>, ArrayView, T>>;

struct FundamentalTag {};
struct StringTag {};

Expand Down
8 changes: 8 additions & 0 deletions internal/core/src/common/type_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ typedef struct CStorageConfig {
int64_t requestTimeoutMs;
} CStorageConfig;

typedef struct CMmapConfig {
const char* cache_read_ahead_policy;
const char* mmap_path;
uint64_t disk_limit;
uint64_t fix_file_size;
bool growing_enable_mmap;
} CMmapConfig;

typedef struct CTraceConfig {
const char* exporter;
float sampleFraction;
Expand Down
5 changes: 4 additions & 1 deletion internal/core/src/exec/expression/BinaryRangeExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ PhyBinaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
break;
}
case DataType::VARCHAR: {
if (segment_->type() == SegmentType::Growing) {
if (segment_->type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()
.GetMmapConfig()
.growing_enable_mmap) {
result = ExecRangeVisitorImpl<std::string>();
} else {
result = ExecRangeVisitorImpl<std::string_view>();
Expand Down
5 changes: 4 additions & 1 deletion internal/core/src/exec/expression/CompareExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ PhyCompareFilterExpr::GetChunkData<std::string>(FieldId field_id,
};
}
}
if (segment_->type() == SegmentType::Growing) {
if (segment_->type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()
.GetMmapConfig()
.growing_enable_mmap) {
auto chunk_data =
segment_->chunk_data<std::string>(field_id, chunk_id).data();
return [chunk_data](int i) -> const number { return chunk_data[i]; };
Expand Down
1 change: 0 additions & 1 deletion internal/core/src/exec/expression/Expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ class SegmentExpr : public Expr {
TargetBitmapView res,
ValTypes... values) {
int64_t processed_size = 0;

for (size_t i = current_data_chunk_; i < num_data_chunk_; i++) {
auto data_pos =
(i == current_data_chunk_) ? current_data_chunk_pos_ : 0;
Expand Down
5 changes: 4 additions & 1 deletion internal/core/src/exec/expression/TermExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ PhyTermFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
break;
}
case DataType::VARCHAR: {
if (segment_->type() == SegmentType::Growing) {
if (segment_->type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()
.GetMmapConfig()
.growing_enable_mmap) {
result = ExecVisitorImpl<std::string>();
} else {
result = ExecVisitorImpl<std::string_view>();
Expand Down
7 changes: 5 additions & 2 deletions internal/core/src/exec/expression/UnaryExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,10 @@ PhyUnaryRangeFilterExpr::Eval(EvalCtx& context, VectorPtr& result) {
break;
}
case DataType::VARCHAR: {
if (segment_->type() == SegmentType::Growing) {
if (segment_->type() == SegmentType::Growing &&
!storage::MmapManager::GetInstance()
.GetMmapConfig()
.growing_enable_mmap) {
result = ExecRangeVisitorImpl<std::string>();
} else {
result = ExecRangeVisitorImpl<std::string_view>();
Expand Down Expand Up @@ -294,7 +297,7 @@ PhyUnaryRangeFilterExpr::ExecArrayEqualForIndex(bool reverse) {

// filtering by index, get candidates.
auto size_per_chunk = segment_->size_per_chunk();
auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto {
auto retrieve = [ size_per_chunk, this ](int64_t offset) -> auto{
auto chunk_idx = offset / size_per_chunk;
auto chunk_offset = offset % size_per_chunk;
const auto& chunk =
Expand Down
195 changes: 195 additions & 0 deletions internal/core/src/mmap/ChunkData.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "common/Array.h"
#include "storage/MmapManager.h"
namespace milvus {
/**
* @brief FixedLengthChunk
*/
template <typename Type>
struct FixedLengthChunk {
public:
FixedLengthChunk() = delete;
explicit FixedLengthChunk(const uint64_t size,
storage::MmapChunkDescriptor descriptor)
: mmap_descriptor_(descriptor), size_(size) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
data_ = (Type*)(mcm->Allocate(mmap_descriptor_, sizeof(Type) * size));
AssertInfo(data_ != nullptr,
"failed to create a mmapchunk: {}, map_size");
};
void*
data() {
return data_;
};
size_t
size() {
return size_;
};
Type
get(const int i) const {
return data_[i];
}
const Type&
view(const int i) const {
return data_[i];
}

private:
int64_t size_ = 0;
Type* data_ = nullptr;
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
};
/**
* @brief VariableLengthChunk
*/
template <typename Type>
struct VariableLengthChunk {
static_assert(IsVariableTypeSupportInChunk<Type>);

public:
VariableLengthChunk() = delete;
explicit VariableLengthChunk(const uint64_t size,
storage::MmapChunkDescriptor descriptor)
: mmap_descriptor_(descriptor), size_(size) {
data_ = FixedVector<ChunkViewType<Type>>(size);
};
inline void
set(const Type* src, uint32_t begin, uint32_t length) {
throw std::runtime_error(
"set should be a template specialization function");
}
inline Type
get(const int i) const {
throw std::runtime_error(
"get should be a template specialization function");
}
const ChunkViewType<Type>&
view(const int i) const {
return data_[i];
}
const ChunkViewType<Type>&
operator[](const int i) const {
return view(i);
}
void*
data() {
return data_.data();
};
size_t
size() {
return size_;
};

private:
int64_t size_ = 0;
FixedVector<ChunkViewType<Type>> data_;
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
};
template <>
inline void
VariableLengthChunk<std::string>::set(const std::string* src,
uint32_t begin,
uint32_t length) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
milvus::ErrorCode err_code;
AssertInfo(
begin + length <= size_,
"failed to set a chunk with length: {} from beign {}, map_size={}",
length,
begin,
size_);
for (auto i = 0; i < length; i++) {
auto buf_size = src[i].size() + 1;
auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size);
AssertInfo(buf != nullptr,
"failed to allocate memory from mmap_manager, error_code");
std::strcpy(buf, src[i].c_str());
data_[i + begin] = std::string_view(buf, src[i].size());
}
}
template <>
inline std::string
VariableLengthChunk<std::string>::get(const int i) const {
// copy to a string
return std::string(data_[i]);
}
template <>
inline void
VariableLengthChunk<Json>::set(const Json* src,
uint32_t begin,
uint32_t length) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
milvus::ErrorCode err_code;
AssertInfo(
begin + length <= size_,
"failed to set a chunk with length: {} from beign {}, map_size={}",
length,
begin,
size_);
for (auto i = 0; i < length; i++) {
auto buf_size = src[i].size() + simdjson::SIMDJSON_PADDING + 1;
auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size);
AssertInfo(
buf != nullptr,
"failed to allocate memory from mmap_manager, error_code:{}");
std::strcpy(buf, src[i].c_str());
data_[i + begin] = Json(buf, src[i].size());
}
}
template <>
inline Json
VariableLengthChunk<Json>::get(const int i) const {
return std::move(Json(simdjson::padded_string(data_[i].data())));
}
template <>
inline void
VariableLengthChunk<Array>::set(const Array* src,
uint32_t begin,
uint32_t length) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
milvus::ErrorCode err_code;
AssertInfo(
begin + length <= size_,
"failed to set a chunk with length: {} from beign {}, map_size={}",
length,
begin,
size_);
for (auto i = 0; i < length; i++) {
auto array_data =
(char*)mcm->Allocate(mmap_descriptor_, src[i].byte_size());
AssertInfo(array_data != nullptr,
"failed to allocate memory from mmap_manager, error_code");
std::copy(
src[i].data(), src[i].data() + src[i].byte_size(), array_data);
data_[i + begin] = ArrayView(array_data,
src[i].byte_size(),
src[i].get_element_type(),
src[i].get_offsets_in_copy());
}
}
template <>
inline Array
VariableLengthChunk<Array>::get(const int i) const {
auto array_view_i = data_[i];
char* data = static_cast<char*>(const_cast<void*>(array_view_i.data()));
return Array(data,
array_view_i.byte_size(),
array_view_i.get_element_type(),
array_view_i.get_offsets_in_copy());
}
} // namespace milvus
Loading

0 comments on commit 32f685f

Please sign in to comment.