Skip to content

Commit

Permalink
enhance: use segment id and type to register in MmapChunkManager and …
Browse files Browse the repository at this point in the history
…opt malloc in variableChunk (milvus-io#33993)

issue: milvus-io#32984

Signed-off-by: cqy123456 <qianya.cheng@zilliz.com>
  • Loading branch information
cqy123456 authored and yellow-shine committed Jul 2, 2024
1 parent 23d9656 commit 2d98214
Show file tree
Hide file tree
Showing 17 changed files with 174 additions and 105 deletions.
1 change: 1 addition & 0 deletions internal/core/src/common/type_c.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
extern "C" {
#endif

// WARNING: do not change the enum value of Growing and Sealed
enum SegmentType {
Invalid = 0,
Growing = 1,
Expand Down
66 changes: 41 additions & 25 deletions internal/core/src/mmap/ChunkData.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ struct FixedLengthChunk {
public:
FixedLengthChunk() = delete;
explicit FixedLengthChunk(const uint64_t size,
storage::MmapChunkDescriptor descriptor)
storage::MmapChunkDescriptorPtr descriptor)
: mmap_descriptor_(descriptor), size_(size) {
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
data_ = (Type*)(mcm->Allocate(mmap_descriptor_, sizeof(Type) * size));
Expand All @@ -52,7 +52,7 @@ struct FixedLengthChunk {
private:
int64_t size_ = 0;
Type* data_ = nullptr;
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
};
/**
* @brief VariableLengthChunk
Expand All @@ -64,7 +64,7 @@ struct VariableLengthChunk {
public:
VariableLengthChunk() = delete;
explicit VariableLengthChunk(const uint64_t size,
storage::MmapChunkDescriptor descriptor)
storage::MmapChunkDescriptorPtr descriptor)
: mmap_descriptor_(descriptor), size_(size) {
data_ = FixedVector<ChunkViewType<Type>>(size);
};
Expand Down Expand Up @@ -98,7 +98,7 @@ struct VariableLengthChunk {
private:
int64_t size_ = 0;
FixedVector<ChunkViewType<Type>> data_;
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
};
template <>
inline void
Expand All @@ -113,13 +113,19 @@ VariableLengthChunk<std::string>::set(const std::string* src,
length,
begin,
size_);
size_t total_size = 0;
size_t padding_size = 1;
for (auto i = 0; i < length; i++) {
auto buf_size = src[i].size() + 1;
auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size);
AssertInfo(buf != nullptr,
"failed to allocate memory from mmap_manager, error_code");
std::strcpy(buf, src[i].c_str());
data_[i + begin] = std::string_view(buf, src[i].size());
total_size += src[i].size() + padding_size;
}
auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size);
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
for (auto i = 0, offset = 0; i < length; i++) {
auto data_size = src[i].size() + padding_size;
char* data_ptr = buf + offset;
std::strcpy(data_ptr, src[i].c_str());
data_[i + begin] = std::string_view(data_ptr, src[i].size());
offset += data_size;
}
}
template <>
Expand All @@ -141,14 +147,19 @@ VariableLengthChunk<Json>::set(const Json* src,
length,
begin,
size_);
size_t total_size = 0;
size_t padding_size = simdjson::SIMDJSON_PADDING + 1;
for (auto i = 0; i < length; i++) {
auto buf_size = src[i].size() + simdjson::SIMDJSON_PADDING + 1;
auto buf = (char*)mcm->Allocate(mmap_descriptor_, buf_size);
AssertInfo(
buf != nullptr,
"failed to allocate memory from mmap_manager, error_code:{}");
std::strcpy(buf, src[i].c_str());
data_[i + begin] = Json(buf, src[i].size());
total_size += src[i].size() + padding_size;
}
auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size);
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
for (auto i = 0, offset = 0; i < length; i++) {
auto data_size = src[i].size() + padding_size;
char* data_ptr = buf + offset;
std::strcpy(data_ptr, src[i].c_str());
data_[i + begin] = Json(data_ptr, src[i].size());
offset += data_size;
}
}
template <>
Expand All @@ -169,17 +180,22 @@ VariableLengthChunk<Array>::set(const Array* src,
length,
begin,
size_);
size_t total_size = 0;
size_t padding_size = 0;
for (auto i = 0; i < length; i++) {
auto array_data =
(char*)mcm->Allocate(mmap_descriptor_, src[i].byte_size());
AssertInfo(array_data != nullptr,
"failed to allocate memory from mmap_manager, error_code");
std::copy(
src[i].data(), src[i].data() + src[i].byte_size(), array_data);
data_[i + begin] = ArrayView(array_data,
src[i].byte_size(),
total_size += src[i].byte_size() + padding_size;
}
auto buf = (char*)mcm->Allocate(mmap_descriptor_, total_size);
AssertInfo(buf != nullptr, "failed to allocate memory from mmap_manager.");
for (auto i = 0, offset = 0; i < length; i++) {
auto data_size = src[i].byte_size() + padding_size;
char* data_ptr = buf + offset;
std::copy(src[i].data(), src[i].data() + src[i].byte_size(), data_ptr);
data_[i + begin] = ArrayView(data_ptr,
data_size,
src[i].get_element_type(),
src[i].get_offsets_in_copy());
offset += data_size;
}
}
template <>
Expand Down
7 changes: 4 additions & 3 deletions internal/core/src/mmap/ChunkVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ template <typename Type,
bool IsMmap = false>
class ThreadSafeChunkVector : public ChunkVectorBase<Type> {
public:
ThreadSafeChunkVector(storage::MmapChunkDescriptor descriptor = nullptr) {
ThreadSafeChunkVector(
storage::MmapChunkDescriptorPtr descriptor = nullptr) {
mmap_descriptor_ = descriptor;
}

Expand Down Expand Up @@ -181,13 +182,13 @@ class ThreadSafeChunkVector : public ChunkVectorBase<Type> {

private:
mutable std::shared_mutex mutex_;
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
std::deque<ChunkImpl> vec_;
};

template <typename Type>
ChunkVectorPtr<Type>
SelectChunkVectorPtr(storage::MmapChunkDescriptor& mmap_descriptor) {
SelectChunkVectorPtr(storage::MmapChunkDescriptorPtr& mmap_descriptor) {
if constexpr (!IsVariableType<Type>) {
if (mmap_descriptor != nullptr) {
return std::make_unique<
Expand Down
12 changes: 6 additions & 6 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class ColumnBase {
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
storage::MmapChunkDescriptorPtr descriptor)
: mcm_(mcm),
mmap_descriptor_(descriptor),
type_size_(GetDataTypeSize(data_type, dim)),
Expand Down Expand Up @@ -340,7 +340,7 @@ class ColumnBase {

// length in bytes
size_t size_{0};
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;

private:
void
Expand Down Expand Up @@ -401,7 +401,7 @@ class Column : public ColumnBase {
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
storage::MmapChunkDescriptorPtr descriptor)
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
}

Expand Down Expand Up @@ -440,7 +440,7 @@ class SparseFloatColumn : public ColumnBase {
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
storage::MmapChunkDescriptorPtr descriptor)
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
}

Expand Down Expand Up @@ -543,7 +543,7 @@ class VariableColumn : public ColumnBase {
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
storage::MmapChunkDescriptorPtr descriptor)
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
}

Expand Down Expand Up @@ -658,7 +658,7 @@ class ArrayColumn : public ColumnBase {
int dim,
const DataType& data_type,
storage::MmapChunkManagerPtr mcm,
storage::MmapChunkDescriptor descriptor)
storage::MmapChunkDescriptorPtr descriptor)
: ColumnBase(reserve, dim, data_type, mcm, descriptor) {
}

Expand Down
20 changes: 10 additions & 10 deletions internal/core/src/segcore/ConcurrentVector.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ class ConcurrentVectorImpl : public VectorBase {
explicit ConcurrentVectorImpl(
ssize_t elements_per_row,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: VectorBase(size_per_chunk),
elements_per_row_(is_type_entire_row ? 1 : elements_per_row) {
chunks_ptr_ = SelectChunkVectorPtr<Type>(mmap_descriptor);
Expand Down Expand Up @@ -359,7 +359,7 @@ class ConcurrentVector : public ConcurrentVectorImpl<Type, true> {
static_assert(IsScalar<Type> || std::is_same_v<Type, PkType>);
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: ConcurrentVectorImpl<Type, true>::ConcurrentVectorImpl(
1, size_per_chunk, mmap_descriptor) {
}
Expand All @@ -371,7 +371,7 @@ class ConcurrentVector<std::string>
public:
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: ConcurrentVectorImpl<std::string, true>::ConcurrentVectorImpl(
1, size_per_chunk, mmap_descriptor) {
}
Expand All @@ -389,7 +389,7 @@ class ConcurrentVector<Json> : public ConcurrentVectorImpl<Json, true> {
public:
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: ConcurrentVectorImpl<Json, true>::ConcurrentVectorImpl(
1, size_per_chunk, mmap_descriptor) {
}
Expand All @@ -408,7 +408,7 @@ class ConcurrentVector<Array> : public ConcurrentVectorImpl<Array, true> {
public:
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: ConcurrentVectorImpl<Array, true>::ConcurrentVectorImpl(
1, size_per_chunk, mmap_descriptor) {
}
Expand All @@ -427,7 +427,7 @@ class ConcurrentVector<SparseFloatVector>
public:
explicit ConcurrentVector(
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: ConcurrentVectorImpl<knowhere::sparse::SparseRow<float>,
true>::ConcurrentVectorImpl(1,
size_per_chunk,
Expand Down Expand Up @@ -465,7 +465,7 @@ class ConcurrentVector<FloatVector>
public:
ConcurrentVector(int64_t dim,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: ConcurrentVectorImpl<float, false>::ConcurrentVectorImpl(
dim, size_per_chunk, mmap_descriptor) {
}
Expand All @@ -478,7 +478,7 @@ class ConcurrentVector<BinaryVector>
explicit ConcurrentVector(
int64_t dim,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: ConcurrentVectorImpl(dim / 8, size_per_chunk, mmap_descriptor) {
AssertInfo(dim % 8 == 0,
fmt::format("dim is not a multiple of 8, dim={}", dim));
Expand All @@ -491,7 +491,7 @@ class ConcurrentVector<Float16Vector>
public:
ConcurrentVector(int64_t dim,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: ConcurrentVectorImpl<float16, false>::ConcurrentVectorImpl(
dim, size_per_chunk, mmap_descriptor) {
}
Expand All @@ -503,7 +503,7 @@ class ConcurrentVector<BFloat16Vector>
public:
ConcurrentVector(int64_t dim,
int64_t size_per_chunk,
storage::MmapChunkDescriptor mmap_descriptor = nullptr)
storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: ConcurrentVectorImpl<bfloat16, false>::ConcurrentVectorImpl(
dim, size_per_chunk, mmap_descriptor) {
}
Expand Down
9 changes: 5 additions & 4 deletions internal/core/src/segcore/InsertRecord.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,10 @@ class OffsetOrderedArray : public OffsetMap {

template <bool is_sealed = false>
struct InsertRecord {
InsertRecord(const Schema& schema,
const int64_t size_per_chunk,
const storage::MmapChunkDescriptor mmap_descriptor = nullptr)
InsertRecord(
const Schema& schema,
const int64_t size_per_chunk,
const storage::MmapChunkDescriptorPtr mmap_descriptor = nullptr)
: timestamps_(size_per_chunk), mmap_descriptor_(mmap_descriptor) {
std::optional<FieldId> pk_field_id = schema.get_primary_field_id();

Expand Down Expand Up @@ -630,7 +631,7 @@ struct InsertRecord {
private:
std::unordered_map<FieldId, std::unique_ptr<VectorBase>> fields_data_{};
mutable std::shared_mutex shared_mutex_{};
storage::MmapChunkDescriptor mmap_descriptor_;
storage::MmapChunkDescriptorPtr mmap_descriptor_;
};

} // namespace milvus::segcore
6 changes: 3 additions & 3 deletions internal/core/src/segcore/SegmentGrowingImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ class SegmentGrowingImpl : public SegmentGrowing {
: mmap_descriptor_(storage::MmapManager::GetInstance()
.GetMmapConfig()
.GetEnableGrowingMmap()
? storage::MmapChunkDescriptor(
new storage::MmapChunkDescriptorValue(
? storage::MmapChunkDescriptorPtr(
new storage::MmapChunkDescriptor(
{segment_id, SegmentType::Growing}))
: nullptr),
segcore_config_(segcore_config),
Expand Down Expand Up @@ -317,7 +317,7 @@ class SegmentGrowingImpl : public SegmentGrowing {
}

private:
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
SegcoreConfig segcore_config_;
SchemaPtr schema_;
IndexMetaPtr index_meta_;
Expand Down
7 changes: 3 additions & 4 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -821,7 +821,7 @@ SegmentSealedImpl::GetFieldDataPath(FieldId field_id, int64_t offset) const {
std::tuple<std::string, std::shared_ptr<ColumnBase>> static ReadFromChunkCache(
const storage::ChunkCachePtr& cc,
const std::string& data_path,
const storage::MmapChunkDescriptor& descriptor) {
const storage::MmapChunkDescriptorPtr& descriptor) {
auto column = cc->Read(data_path, descriptor);
cc->Prefetch(data_path);
return {data_path, column};
Expand Down Expand Up @@ -1032,9 +1032,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
id_(segment_id),
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) {
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptorValue>(
new storage::MmapChunkDescriptorValue(
{segment_id, SegmentType::Sealed}));
mmap_descriptor_ = std::shared_ptr<storage::MmapChunkDescriptor>(
new storage::MmapChunkDescriptor({segment_id, SegmentType::Sealed}));
auto mcm = storage::MmapManager::GetInstance().GetMmapChunkManager();
mcm->Register(mmap_descriptor_);
}
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ class SegmentSealedImpl : public SegmentSealed {

private:
// mmap descriptor, used in chunk cache
storage::MmapChunkDescriptor mmap_descriptor_ = nullptr;
storage::MmapChunkDescriptorPtr mmap_descriptor_ = nullptr;
// segment loading state
BitsetType field_data_ready_bitset_;
BitsetType index_ready_bitset_;
Expand Down
4 changes: 2 additions & 2 deletions internal/core/src/storage/ChunkCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
namespace milvus::storage {
std::shared_ptr<ColumnBase>
ChunkCache::Read(const std::string& filepath,
const MmapChunkDescriptor& descriptor) {
const MmapChunkDescriptorPtr& descriptor) {
{
std::shared_lock lck(mutex_);
auto it = columns_.find(filepath);
Expand Down Expand Up @@ -75,7 +75,7 @@ ChunkCache::Prefetch(const std::string& filepath) {

std::shared_ptr<ColumnBase>
ChunkCache::Mmap(const FieldDataPtr& field_data,
const MmapChunkDescriptor& descriptor) {
const MmapChunkDescriptorPtr& descriptor) {
auto dim = field_data->get_dim();
auto data_type = field_data->get_data_type();

Expand Down
Loading

0 comments on commit 2d98214

Please sign in to comment.