Skip to content

Commit

Permalink
sparse float vector to support raw data mmap
Browse files Browse the repository at this point in the history
Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
  • Loading branch information
zhengbuqian committed May 22, 2024
1 parent 303470f commit 2b4a913
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 69 deletions.
67 changes: 53 additions & 14 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
namespace milvus {

/*
* If string field's value all empty, need a string padding to avoid
* If string field's value all empty, need a string padding to avoid
mmap failing because size_ is zero, which causes an invalid argument error;
* array has the same problem
* TODO: remove it when support NULL value
Expand Down Expand Up @@ -84,6 +84,7 @@ class ColumnBase {
}

// mmap mode ctor
// User must call Seal to build the view for variable length column.
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
: type_size_(IsSparseFloatVectorDataType(field_meta.get_data_type())
? 1
Expand All @@ -106,12 +107,16 @@ class ColumnBase {
}

// mmap mode ctor
// User must call Seal to build the view for variable length column.
ColumnBase(const File& file,
size_t size,
int dim,
const DataType& data_type)
: type_size_(GetDataTypeSize(data_type, dim)),
num_rows_(size / GetDataTypeSize(data_type, dim)),
: type_size_(IsSparseFloatVectorDataType(data_type)
? 1
: GetDataTypeSize(data_type, dim)),
num_rows_(
IsSparseFloatVectorDataType(data_type) ? 1 : (size / type_size_)),
size_(size),
cap_size_(size),
is_map_anonymous_(false) {
Expand All @@ -123,6 +128,7 @@ class ColumnBase {
AssertInfo(data_ != MAP_FAILED,
"failed to create file-backed map, err: {}",
strerror(errno));
madvise(data_, mapped_size, MADV_WILLNEED);

UpdateMetricWhenMmap(mapped_size);
}
Expand Down Expand Up @@ -153,11 +159,18 @@ class ColumnBase {
column.size_ = 0;
}

// Returns the address of the column's elements. In this base class that is
// data_; the method is virtual, so a subclass may return a different buffer.
virtual const char*
Data() const {
return data_;
}

// Returns the mmapped address (data_). Unlike Data(), this accessor is not
// virtual, so it always refers to data_ even if a subclass overrides Data().
const char*
MmappedData() const {
return data_;

Check warning on line 171 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L170-L171

Added lines #L170 - L171 were not covered by tests
}

size_t
NumRows() const {
return num_rows_;
Expand Down Expand Up @@ -273,6 +286,7 @@ class ColumnBase {
// capacity in bytes
size_t cap_size_{0};
size_t padding_{0};
// type_size_ is not used for sparse float vector column.
const size_t type_size_{1};
size_t num_rows_{0};

Expand Down Expand Up @@ -344,8 +358,7 @@ class Column : public ColumnBase {
}
};

// mmap not yet supported, thus SparseFloatColumn is not using fields in super
// class such as ColumnBase::data.
// when mmap is used, size_, data_ and num_rows_ of ColumnBase are used.
class SparseFloatColumn : public ColumnBase {
public:
// memory mode ctor
Expand All @@ -356,7 +369,13 @@ class SparseFloatColumn : public ColumnBase {
size_t size,
const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta) {
AssertInfo(false, "SparseFloatColumn mmap mode not supported");
}
// mmap mode ctor taking an explicit dim and data type.
// It only forwards its arguments to the matching ColumnBase ctor; nothing in
// this body populates the per-row view, so the caller is expected to call
// Seal() afterwards.
SparseFloatColumn(const File& file,

Check warning on line 374 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L374

Added line #L374 was not covered by tests
size_t size,
int dim,
const DataType& data_type)
: ColumnBase(file, size, dim, data_type) {

Check warning on line 378 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L378

Added line #L378 was not covered by tests
}

SparseFloatColumn(SparseFloatColumn&& column) noexcept
Expand All @@ -372,14 +391,6 @@ class SparseFloatColumn : public ColumnBase {
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
}

// This is used to advise mmap prefetch. We don't currently support mmap for
// sparse float vectors, so it is not implemented for now.
size_t
ByteSize() const override {
throw std::runtime_error(
"ByteSize not supported for sparse float column");
}

size_t
Capacity() const override {
throw std::runtime_error(
Expand Down Expand Up @@ -413,6 +424,34 @@ class SparseFloatColumn : public ColumnBase {
return dim_;
}

// Builds the per-row view (vec_) on top of the mmapped buffer data_.
//
// `indices` holds the starting byte offset of each row inside data_, one
// entry per row. It must be non-empty and data_ must be non-null, which is
// why this may only be called in mmap mode. num_rows_ is overwritten with
// indices.size().
void
Seal(std::vector<uint64_t> indices) {
AssertInfo(!indices.empty(),

Check warning on line 429 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L428-L429

Added lines #L428 - L429 were not covered by tests
"indices should not be empty, Seal() of "
"SparseFloatColumn must be called only "
"at mmap mode");
AssertInfo(data_,

Check warning on line 433 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L433

Added line #L433 was not covered by tests
"data_ should not be nullptr, Seal() of "
"SparseFloatColumn must be called only "
"at mmap mode");
num_rows_ = indices.size();

Check warning on line 437 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L437

Added line #L437 was not covered by tests
// Append the total byte size (size_) as a sentinel so that
// indices[i + 1] - indices[i] is the byte size of row i for every row,
// including the last one.
indices.push_back(size_);
for (size_t i = 0; i < num_rows_; i++) {
auto vec_size = indices[i + 1] - indices[i];
// Each row must consist of a whole number of sparse elements.
AssertInfo(

Check warning on line 443 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L440-L443

Added lines #L440 - L443 were not covered by tests
vec_size % knowhere::sparse::SparseRow<float>::element_size() ==
0,
"Incorrect sparse vector size: {}",
vec_size);
// Construct the row in place from (element count, pointer into data_,
// false).
// NOTE(review): the trailing `false` presumably means the row does not
// own the memory it points at -- confirm against the
// knowhere::sparse::SparseRow constructor.
vec_.emplace_back(
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
(uint8_t*)(data_) + indices[i],
false);

Check warning on line 451 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L449-L451

Added lines #L449 - L451 were not covered by tests
}
}

private:
int64_t dim_ = 0;
std::vector<knowhere::sparse::SparseRow<float>> vec_;
Expand Down
16 changes: 12 additions & 4 deletions internal/core/src/mmap/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,18 @@ WriteFieldData(File& file,
break;
}
case DataType::VECTOR_SPARSE_FLOAT: {
// TODO(SPARSE): this is for mmap to write data to disk so that
// the file can be mmaped into memory.
throw std::runtime_error(
"WriteFieldData for VECTOR_SPARSE_FLOAT not implemented");
for (size_t i = 0; i < data->get_num_rows(); ++i) {

Check warning on line 84 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L84

Added line #L84 was not covered by tests
auto vec =
static_cast<const knowhere::sparse::SparseRow<float>*>(
data->RawValue(i));

Check warning on line 87 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L87

Added line #L87 was not covered by tests
ssize_t written =
file.Write(vec->data(), vec->data_byte_size());
if (written < vec->data_byte_size()) {
break;

Check warning on line 91 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L89-L91

Added lines #L89 - L91 were not covered by tests
}
total_written += written;

Check warning on line 93 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L93

Added line #L93 was not covered by tests
}
break;

Check warning on line 95 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L95

Added line #L95 was not covered by tests
}
default:
PanicInfo(DataTypeInvalid,
Expand Down
78 changes: 55 additions & 23 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,13 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
column = std::move(arr_column);
break;
}
// TODO(SPARSE) support mmap
case milvus::DataType::VECTOR_SPARSE_FLOAT: {

Check warning on line 588 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L588

Added line #L588 was not covered by tests
auto sparse_column = std::make_shared<SparseFloatColumn>(
file, total_written, field_meta);
sparse_column->Seal(std::move(indices));
column = std::move(sparse_column);
break;

Check warning on line 593 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L590-L593

Added lines #L590 - L593 were not covered by tests
}
default: {
PanicInfo(DataTypeInvalid,
fmt::format("unsupported data type {}", data_type));
Expand Down Expand Up @@ -850,7 +856,7 @@ SegmentSealedImpl::get_vector(FieldId field_id,
auto metric_type = vec_index->GetMetricType();
auto has_raw_data = vec_index->HasRawData();

if (has_raw_data) {
if (has_raw_data && !TEST_skip_index_for_retrieve_) {
// If index has raw data, get vector from memory.
auto ids_ds = GenIdsDataset(count, ids);
if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) {
Expand All @@ -865,9 +871,6 @@ SegmentSealedImpl::get_vector(FieldId field_id,
}
}

AssertInfo(field_meta.get_data_type() != DataType::VECTOR_SPARSE_FLOAT,
"index of sparse float vector is guaranteed to have raw data");

// If index doesn't have raw data, get vector from chunk cache.
auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache();

Expand Down Expand Up @@ -898,23 +901,50 @@ SegmentSealedImpl::get_vector(FieldId field_id,
path_to_column[data_path] = column;
}

// assign to data array
auto row_bytes = field_meta.get_sizeof();
auto buf = std::vector<char>(count * row_bytes);
for (auto i = 0; i < count; i++) {
AssertInfo(id_to_data_path.count(ids[i]) != 0, "id not found");
const auto& [data_path, offset_in_binlog] = id_to_data_path.at(ids[i]);
AssertInfo(path_to_column.count(data_path) != 0, "column not found");
const auto& column = path_to_column.at(data_path);
AssertInfo(offset_in_binlog * row_bytes < column->ByteSize(),
"column idx out of range, idx: {}, size: {}, data_path: {}",
offset_in_binlog * row_bytes,
column->ByteSize(),
data_path);
auto vector = &column->Data()[offset_in_binlog * row_bytes];
std::memcpy(buf.data() + i * row_bytes, vector, row_bytes);
if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) {
auto buf = std::vector<knowhere::sparse::SparseRow<float>>(count);
for (auto i = 0; i < count; ++i) {
const auto& [data_path, offset_in_binlog] =
id_to_data_path.at(ids[i]);
const auto& column = path_to_column.at(data_path);
AssertInfo(

Check warning on line 910 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L904-L910

Added lines #L904 - L910 were not covered by tests
offset_in_binlog < column->NumRows(),
"column idx out of range, idx: {}, size: {}, data_path: {}",
offset_in_binlog,
column->NumRows(),
data_path);
auto sparse_column =
std::dynamic_pointer_cast<SparseFloatColumn>(column);
AssertInfo(sparse_column, "incorrect column created");
buf[i] = static_cast<const knowhere::sparse::SparseRow<float>*>(

Check warning on line 919 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L917-L919

Added lines #L917 - L919 were not covered by tests
static_cast<const void*>(
sparse_column->Data()))[offset_in_binlog];

Check warning on line 921 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L921

Added line #L921 was not covered by tests
}
return segcore::CreateVectorDataArrayFrom(
buf.data(), count, field_meta);

Check warning on line 924 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L924

Added line #L924 was not covered by tests
} else {
// assign to data array
auto row_bytes = field_meta.get_sizeof();
auto buf = std::vector<char>(count * row_bytes);
for (auto i = 0; i < count; ++i) {
AssertInfo(id_to_data_path.count(ids[i]) != 0, "id not found");
const auto& [data_path, offset_in_binlog] =
id_to_data_path.at(ids[i]);
AssertInfo(path_to_column.count(data_path) != 0,

Check warning on line 933 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L927-L933

Added lines #L927 - L933 were not covered by tests
"column not found");
const auto& column = path_to_column.at(data_path);
AssertInfo(

Check warning on line 936 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L935-L936

Added lines #L935 - L936 were not covered by tests
offset_in_binlog * row_bytes < column->ByteSize(),
"column idx out of range, idx: {}, size: {}, data_path: {}",
offset_in_binlog * row_bytes,
column->ByteSize(),
data_path);
auto vector = &column->Data()[offset_in_binlog * row_bytes];
std::memcpy(buf.data() + i * row_bytes, vector, row_bytes);

Check warning on line 943 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L942-L943

Added lines #L942 - L943 were not covered by tests
}
return segcore::CreateVectorDataArrayFrom(
buf.data(), count, field_meta);

Check warning on line 946 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L946

Added line #L946 was not covered by tests
}
return segcore::CreateVectorDataArrayFrom(buf.data(), count, field_meta);
}

void
Expand Down Expand Up @@ -997,7 +1027,8 @@ SegmentSealedImpl::check_search(const query::Plan* plan) const {
SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id)
int64_t segment_id,
bool TEST_skip_index_for_retrieve)
: segcore_config_(segcore_config),
field_data_ready_bitset_(schema->size()),
index_ready_bitset_(schema->size()),
Expand All @@ -1006,7 +1037,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
insert_record_(*schema, MAX_ROW_COUNT),
schema_(schema),
id_(segment_id),
col_index_meta_(index_meta) {
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) {
}

SegmentSealedImpl::~SegmentSealedImpl() {
Expand Down
17 changes: 13 additions & 4 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class SegmentSealedImpl : public SegmentSealed {
explicit SegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id);
int64_t segment_id,
bool TEST_skip_index_for_retrieve = false);
~SegmentSealedImpl() override;
void
LoadIndex(const LoadIndexInfo& info) override;
Expand Down Expand Up @@ -312,16 +313,24 @@ class SegmentSealedImpl : public SegmentSealed {
vec_binlog_config_;

SegmentStats stats_{};

// For sparse vector unit tests only! Once a type of sparse index that
// doesn't have raw data is added, this should be removed.
bool TEST_skip_index_for_retrieve_ = false;
};

inline SegmentSealedUPtr
CreateSealedSegment(
SchemaPtr schema,
IndexMetaPtr index_meta = nullptr,
int64_t segment_id = -1,
const SegcoreConfig& segcore_config = SegcoreConfig::default_config()) {
return std::make_unique<SegmentSealedImpl>(
schema, index_meta, segcore_config, segment_id);
const SegcoreConfig& segcore_config = SegcoreConfig::default_config(),
bool TEST_skip_index_for_retrieve = false) {
return std::make_unique<SegmentSealedImpl>(schema,
index_meta,
segcore_config,
segment_id,
TEST_skip_index_for_retrieve);
}

} // namespace milvus::segcore
21 changes: 16 additions & 5 deletions internal/core/src/storage/ChunkCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ ChunkCache::Prefetch(const std::string& filepath) {
}

auto column = it->second;
auto ok =
madvise(reinterpret_cast<void*>(const_cast<char*>(column->Data())),
column->ByteSize(),
read_ahead_policy_);
auto ok = madvise(
reinterpret_cast<void*>(const_cast<char*>(column->MmappedData())),
column->ByteSize(),

Check warning on line 69 in internal/core/src/storage/ChunkCache.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/ChunkCache.cpp#L67-L69

Added lines #L67 - L69 were not covered by tests
read_ahead_policy_);
AssertInfo(ok == 0,
"failed to madvise to the data file {}, err: {}",
path,
Expand Down Expand Up @@ -100,7 +100,18 @@ ChunkCache::Mmap(const std::filesystem::path& path,

std::shared_ptr<ColumnBase> column{};

if (IsVariableDataType(data_type)) {
if (IsSparseFloatVectorDataType(data_type)) {
std::vector<uint64_t> indices{};
uint64_t offset = 0;
for (auto i = 0; i < field_data->get_num_rows(); ++i) {
indices.push_back(offset);
offset += field_data->Size(i);

Check warning on line 108 in internal/core/src/storage/ChunkCache.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/ChunkCache.cpp#L104-L108

Added lines #L104 - L108 were not covered by tests
}
auto sparse_column = std::make_shared<SparseFloatColumn>(
file, data_size, dim, data_type);
sparse_column->Seal(std::move(indices));
column = std::move(sparse_column);

Check warning on line 113 in internal/core/src/storage/ChunkCache.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/ChunkCache.cpp#L111-L113

Added lines #L111 - L113 were not covered by tests
} else if (IsVariableDataType(data_type)) {
AssertInfo(
false, "TODO: unimplemented for variable data type: {}", data_type);
} else {
Expand Down
9 changes: 6 additions & 3 deletions internal/core/src/storage/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,13 +533,16 @@ EncodeAndUploadIndexSlice2(std::shared_ptr<milvus_storage::Space> space,

std::pair<std::string, size_t>
EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
uint8_t* buf,
void* buf,
int64_t element_count,
FieldDataMeta field_data_meta,
const FieldMeta& field_meta,
std::string object_key) {
auto field_data =
CreateFieldData(field_meta.get_data_type(), field_meta.get_dim(), 0);
// dim should not be used for sparse float vector field
auto dim = IsSparseFloatVectorDataType(field_meta.get_data_type())
? -1
: field_meta.get_dim();
auto field_data = CreateFieldData(field_meta.get_data_type(), dim, 0);
field_data->FillFieldData(buf, element_count);
auto insertData = std::make_shared<InsertData>(field_data);
insertData->SetFieldDataMeta(field_data_meta);
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ EncodeAndUploadIndexSlice2(std::shared_ptr<milvus_storage::Space> space,
std::string object_key);
std::pair<std::string, size_t>
EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
uint8_t* buf,
void* buf,
int64_t element_count,
FieldDataMeta field_data_meta,
const FieldMeta& field_meta,
Expand Down
Loading

0 comments on commit 2b4a913

Please sign in to comment.