Skip to content

Commit

Permalink
enhance: [2.4] Improve GetVectorById of Sparse Float Vector (#33652)
Browse files Browse the repository at this point in the history
issue: #29419
pr: #33209 

codecov will fail due to newly added ut in test_sealed.cpp skipped due
to #33210

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
  • Loading branch information
zhengbuqian committed Jun 6, 2024
1 parent 084140b commit 0ecdd5a
Show file tree
Hide file tree
Showing 11 changed files with 283 additions and 69 deletions.
66 changes: 52 additions & 14 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
namespace milvus {

/*
* If string field's value all empty, need a string padding to avoid
* If string field's value all empty, need a string padding to avoid
* mmap failing because size_ is zero which causing invalid arguement
* array has the same problem
* TODO: remove it when support NULL value
Expand Down Expand Up @@ -84,6 +84,7 @@ class ColumnBase {
}

// mmap mode ctor
// User must call Seal to build the view for variable length column.
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
: type_size_(IsSparseFloatVectorDataType(field_meta.get_data_type())
? 1
Expand All @@ -106,12 +107,16 @@ class ColumnBase {
}

// mmap mode ctor
// User must call Seal to build the view for variable length column.
ColumnBase(const File& file,
size_t size,
int dim,
const DataType& data_type)
: type_size_(GetDataTypeSize(data_type, dim)),
num_rows_(size / GetDataTypeSize(data_type, dim)),
: type_size_(IsSparseFloatVectorDataType(data_type)
? 1
: GetDataTypeSize(data_type, dim)),
num_rows_(
IsSparseFloatVectorDataType(data_type) ? 1 : (size / type_size_)),
size_(size),
cap_size_(size),
is_map_anonymous_(false) {
Expand Down Expand Up @@ -153,11 +158,18 @@ class ColumnBase {
column.size_ = 0;
}

// Data() points at an addr that contains the elements
virtual const char*
Data() const {
return data_;
}

// MmappedData() returns the mmaped address
const char*
MmappedData() const {
return data_;
}

size_t
NumRows() const {
return num_rows_;
Expand Down Expand Up @@ -273,6 +285,7 @@ class ColumnBase {
// capacity in bytes
size_t cap_size_{0};
size_t padding_{0};
// type_size_ is not used for sparse float vector column.
const size_t type_size_{1};
size_t num_rows_{0};

Expand Down Expand Up @@ -344,8 +357,7 @@ class Column : public ColumnBase {
}
};

// mmap not yet supported, thus SparseFloatColumn is not using fields in super
// class such as ColumnBase::data.
// when mmap is used, size_, data_ and num_rows_ of ColumnBase are used.
class SparseFloatColumn : public ColumnBase {
public:
// memory mode ctor
Expand All @@ -356,7 +368,13 @@ class SparseFloatColumn : public ColumnBase {
size_t size,
const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta) {
AssertInfo(false, "SparseFloatColumn mmap mode not supported");
}
// mmap mode ctor
SparseFloatColumn(const File& file,
size_t size,
int dim,
const DataType& data_type)
: ColumnBase(file, size, dim, data_type) {
}

SparseFloatColumn(SparseFloatColumn&& column) noexcept
Expand All @@ -372,14 +390,6 @@ class SparseFloatColumn : public ColumnBase {
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
}

// This is used to advice mmap prefetch, we don't currently support mmap for
// sparse float vector thus not implemented for now.
size_t
ByteSize() const override {
throw std::runtime_error(
"ByteSize not supported for sparse float column");
}

size_t
Capacity() const override {
throw std::runtime_error(
Expand Down Expand Up @@ -413,6 +423,34 @@ class SparseFloatColumn : public ColumnBase {
return dim_;
}

void
Seal(std::vector<uint64_t> indices) {
AssertInfo(!indices.empty(),
"indices should not be empty, Seal() of "
"SparseFloatColumn must be called only "
"at mmap mode");
AssertInfo(data_,
"data_ should not be nullptr, Seal() of "
"SparseFloatColumn must be called only "
"at mmap mode");
num_rows_ = indices.size();
// so that indices[num_rows_] - indices[num_rows_ - 1] is the size of
// the last row.
indices.push_back(size_);
for (size_t i = 0; i < num_rows_; i++) {
auto vec_size = indices[i + 1] - indices[i];
AssertInfo(
vec_size % knowhere::sparse::SparseRow<float>::element_size() ==
0,
"Incorrect sparse vector size: {}",
vec_size);
vec_.emplace_back(
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
(uint8_t*)(data_) + indices[i],
false);
}
}

private:
int64_t dim_ = 0;
std::vector<knowhere::sparse::SparseRow<float>> vec_;
Expand Down
16 changes: 12 additions & 4 deletions internal/core/src/mmap/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,18 @@ WriteFieldData(File& file,
break;
}
case DataType::VECTOR_SPARSE_FLOAT: {
// TODO(SPARSE): this is for mmap to write data to disk so that
// the file can be mmaped into memory.
throw std::runtime_error(
"WriteFieldData for VECTOR_SPARSE_FLOAT not implemented");
for (size_t i = 0; i < data->get_num_rows(); ++i) {
auto vec =
static_cast<const knowhere::sparse::SparseRow<float>*>(
data->RawValue(i));
ssize_t written =
file.Write(vec->data(), vec->data_byte_size());
if (written < vec->data_byte_size()) {
break;
}
total_written += written;
}
break;
}
default:
PanicInfo(DataTypeInvalid,
Expand Down
78 changes: 55 additions & 23 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,13 @@ SegmentSealedImpl::MapFieldData(const FieldId field_id, FieldDataInfo& data) {
column = std::move(arr_column);
break;
}
// TODO(SPARSE) support mmap
case milvus::DataType::VECTOR_SPARSE_FLOAT: {
auto sparse_column = std::make_shared<SparseFloatColumn>(
file, total_written, field_meta);
sparse_column->Seal(std::move(indices));
column = std::move(sparse_column);
break;
}
default: {
PanicInfo(DataTypeInvalid,
fmt::format("unsupported data type {}", data_type));
Expand Down Expand Up @@ -833,7 +839,7 @@ SegmentSealedImpl::get_vector(FieldId field_id,
auto metric_type = vec_index->GetMetricType();
auto has_raw_data = vec_index->HasRawData();

if (has_raw_data) {
if (has_raw_data && !TEST_skip_index_for_retrieve_) {
// If index has raw data, get vector from memory.
auto ids_ds = GenIdsDataset(count, ids);
if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) {
Expand All @@ -848,9 +854,6 @@ SegmentSealedImpl::get_vector(FieldId field_id,
}
}

AssertInfo(field_meta.get_data_type() != DataType::VECTOR_SPARSE_FLOAT,
"index of sparse float vector is guaranteed to have raw data");

// If index doesn't have raw data, get vector from chunk cache.
auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache();

Expand Down Expand Up @@ -881,23 +884,50 @@ SegmentSealedImpl::get_vector(FieldId field_id,
path_to_column[data_path] = column;
}

// assign to data array
auto row_bytes = field_meta.get_sizeof();
auto buf = std::vector<char>(count * row_bytes);
for (auto i = 0; i < count; i++) {
AssertInfo(id_to_data_path.count(ids[i]) != 0, "id not found");
const auto& [data_path, offset_in_binlog] = id_to_data_path.at(ids[i]);
AssertInfo(path_to_column.count(data_path) != 0, "column not found");
const auto& column = path_to_column.at(data_path);
AssertInfo(offset_in_binlog * row_bytes < column->ByteSize(),
"column idx out of range, idx: {}, size: {}, data_path: {}",
offset_in_binlog * row_bytes,
column->ByteSize(),
data_path);
auto vector = &column->Data()[offset_in_binlog * row_bytes];
std::memcpy(buf.data() + i * row_bytes, vector, row_bytes);
if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) {
auto buf = std::vector<knowhere::sparse::SparseRow<float>>(count);
for (auto i = 0; i < count; ++i) {
const auto& [data_path, offset_in_binlog] =
id_to_data_path.at(ids[i]);
const auto& column = path_to_column.at(data_path);
AssertInfo(
offset_in_binlog < column->NumRows(),
"column idx out of range, idx: {}, size: {}, data_path: {}",
offset_in_binlog,
column->NumRows(),
data_path);
auto sparse_column =
std::dynamic_pointer_cast<SparseFloatColumn>(column);
AssertInfo(sparse_column, "incorrect column created");
buf[i] = static_cast<const knowhere::sparse::SparseRow<float>*>(
static_cast<const void*>(
sparse_column->Data()))[offset_in_binlog];
}
return segcore::CreateVectorDataArrayFrom(
buf.data(), count, field_meta);
} else {
// assign to data array
auto row_bytes = field_meta.get_sizeof();
auto buf = std::vector<char>(count * row_bytes);
for (auto i = 0; i < count; ++i) {
AssertInfo(id_to_data_path.count(ids[i]) != 0, "id not found");
const auto& [data_path, offset_in_binlog] =
id_to_data_path.at(ids[i]);
AssertInfo(path_to_column.count(data_path) != 0,
"column not found");
const auto& column = path_to_column.at(data_path);
AssertInfo(
offset_in_binlog * row_bytes < column->ByteSize(),
"column idx out of range, idx: {}, size: {}, data_path: {}",
offset_in_binlog * row_bytes,
column->ByteSize(),
data_path);
auto vector = &column->Data()[offset_in_binlog * row_bytes];
std::memcpy(buf.data() + i * row_bytes, vector, row_bytes);
}
return segcore::CreateVectorDataArrayFrom(
buf.data(), count, field_meta);
}
return segcore::CreateVectorDataArrayFrom(buf.data(), count, field_meta);
}

void
Expand Down Expand Up @@ -978,7 +1008,8 @@ SegmentSealedImpl::check_search(const query::Plan* plan) const {
SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id)
int64_t segment_id,
bool TEST_skip_index_for_retrieve)
: segcore_config_(segcore_config),
field_data_ready_bitset_(schema->size()),
index_ready_bitset_(schema->size()),
Expand All @@ -987,7 +1018,8 @@ SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
insert_record_(*schema, MAX_ROW_COUNT),
schema_(schema),
id_(segment_id),
col_index_meta_(index_meta) {
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) {
}

SegmentSealedImpl::~SegmentSealedImpl() {
Expand Down
17 changes: 13 additions & 4 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class SegmentSealedImpl : public SegmentSealed {
explicit SegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id);
int64_t segment_id,
bool TEST_skip_index_for_retrieve = false);
~SegmentSealedImpl() override;
void
LoadIndex(const LoadIndexInfo& info) override;
Expand Down Expand Up @@ -312,16 +313,24 @@ class SegmentSealedImpl : public SegmentSealed {
vec_binlog_config_;

SegmentStats stats_{};

// for sparse vector unit test only! Once a type of sparse index that
// doesn't has raw data is added, this should be removed.
bool TEST_skip_index_for_retrieve_ = false;
};

inline SegmentSealedUPtr
CreateSealedSegment(
SchemaPtr schema,
IndexMetaPtr index_meta = nullptr,
int64_t segment_id = -1,
const SegcoreConfig& segcore_config = SegcoreConfig::default_config()) {
return std::make_unique<SegmentSealedImpl>(
schema, index_meta, segcore_config, segment_id);
const SegcoreConfig& segcore_config = SegcoreConfig::default_config(),
bool TEST_skip_index_for_retrieve = false) {
return std::make_unique<SegmentSealedImpl>(schema,
index_meta,
segcore_config,
segment_id,
TEST_skip_index_for_retrieve);
}

} // namespace milvus::segcore
21 changes: 16 additions & 5 deletions internal/core/src/storage/ChunkCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ ChunkCache::Prefetch(const std::string& filepath) {
}

auto column = it->second;
auto ok =
madvise(reinterpret_cast<void*>(const_cast<char*>(column->Data())),
column->ByteSize(),
read_ahead_policy_);
auto ok = madvise(
reinterpret_cast<void*>(const_cast<char*>(column->MmappedData())),
column->ByteSize(),
read_ahead_policy_);
AssertInfo(ok == 0,
"failed to madvise to the data file {}, err: {}",
path,
Expand Down Expand Up @@ -100,7 +100,18 @@ ChunkCache::Mmap(const std::filesystem::path& path,

std::shared_ptr<ColumnBase> column{};

if (IsVariableDataType(data_type)) {
if (IsSparseFloatVectorDataType(data_type)) {
std::vector<uint64_t> indices{};
uint64_t offset = 0;
for (auto i = 0; i < field_data->get_num_rows(); ++i) {
indices.push_back(offset);
offset += field_data->Size(i);
}
auto sparse_column = std::make_shared<SparseFloatColumn>(
file, data_size, dim, data_type);
sparse_column->Seal(std::move(indices));
column = std::move(sparse_column);
} else if (IsVariableDataType(data_type)) {
AssertInfo(
false, "TODO: unimplemented for variable data type: {}", data_type);
} else {
Expand Down
9 changes: 6 additions & 3 deletions internal/core/src/storage/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,13 +533,16 @@ EncodeAndUploadIndexSlice2(std::shared_ptr<milvus_storage::Space> space,

std::pair<std::string, size_t>
EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
uint8_t* buf,
void* buf,
int64_t element_count,
FieldDataMeta field_data_meta,
const FieldMeta& field_meta,
std::string object_key) {
auto field_data =
CreateFieldData(field_meta.get_data_type(), field_meta.get_dim(), 0);
// dim should not be used for sparse float vector field
auto dim = IsSparseFloatVectorDataType(field_meta.get_data_type())
? -1
: field_meta.get_dim();
auto field_data = CreateFieldData(field_meta.get_data_type(), dim, 0);
field_data->FillFieldData(buf, element_count);
auto insertData = std::make_shared<InsertData>(field_data);
insertData->SetFieldDataMeta(field_data_meta);
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ EncodeAndUploadIndexSlice2(std::shared_ptr<milvus_storage::Space> space,
std::string object_key);
std::pair<std::string, size_t>
EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
uint8_t* buf,
void* buf,
int64_t element_count,
FieldDataMeta field_data_meta,
const FieldMeta& field_meta,
Expand Down
Loading

0 comments on commit 0ecdd5a

Please sign in to comment.