Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Improve GetVectorById of Sparse Float Vector #33209

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 52 additions & 14 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
namespace milvus {

/*
* If string field's value all empty, need a string padding to avoid
* If string field's value all empty, need a string padding to avoid
* mmap failing because size_ is zero which causing invalid arguement
* array has the same problem
* TODO: remove it when support NULL value
Expand Down Expand Up @@ -84,6 +84,7 @@
}

// mmap mode ctor
// User must call Seal to build the view for variable length column.
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
: type_size_(IsSparseFloatVectorDataType(field_meta.get_data_type())
? 1
Expand All @@ -106,12 +107,16 @@
}

// mmap mode ctor
// User must call Seal to build the view for variable length column.
ColumnBase(const File& file,
size_t size,
int dim,
const DataType& data_type)
: type_size_(GetDataTypeSize(data_type, dim)),
num_rows_(size / GetDataTypeSize(data_type, dim)),
: type_size_(IsSparseFloatVectorDataType(data_type)
? 1
: GetDataTypeSize(data_type, dim)),
num_rows_(
IsSparseFloatVectorDataType(data_type) ? 1 : (size / type_size_)),
size_(size),
cap_size_(size),
is_map_anonymous_(false) {
Expand Down Expand Up @@ -153,11 +158,18 @@
column.size_ = 0;
}

// Data() points at an addr that contains the elements
virtual const char*
Data() const {
return data_;
}

// MmappedData() returns the mmaped address
const char*
MmappedData() const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any difference compared to the Data()?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Data() points at an array of elements, while MmappedData() points at the entire mmap-ed memory. They are the same for dense vectors but not the same for sparse vectors. SparseFloatColumn overrides Data() to return something different.

return data_;

Check warning on line 170 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L169-L170

Added lines #L169 - L170 were not covered by tests
}

size_t
NumRows() const {
return num_rows_;
Expand Down Expand Up @@ -273,6 +285,7 @@
// capacity in bytes
size_t cap_size_{0};
size_t padding_{0};
// type_size_ is not used for sparse float vector column.
const size_t type_size_{1};
size_t num_rows_{0};

Expand Down Expand Up @@ -344,8 +357,7 @@
}
};

// mmap not yet supported, thus SparseFloatColumn is not using fields in super
// class such as ColumnBase::data.
// when mmap is used, size_, data_ and num_rows_ of ColumnBase are used.
class SparseFloatColumn : public ColumnBase {
public:
// memory mode ctor
Expand All @@ -356,7 +368,13 @@
size_t size,
const FieldMeta& field_meta)
: ColumnBase(file, size, field_meta) {
AssertInfo(false, "SparseFloatColumn mmap mode not supported");
}
// mmap mode ctor
SparseFloatColumn(const File& file,

Check warning on line 373 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L373

Added line #L373 was not covered by tests
size_t size,
int dim,
const DataType& data_type)
: ColumnBase(file, size, dim, data_type) {

Check warning on line 377 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L377

Added line #L377 was not covered by tests
}

SparseFloatColumn(SparseFloatColumn&& column) noexcept
Expand All @@ -372,14 +390,6 @@
return static_cast<const char*>(static_cast<const void*>(vec_.data()));
}

// This is used to advice mmap prefetch, we don't currently support mmap for
// sparse float vector thus not implemented for now.
size_t
ByteSize() const override {
throw std::runtime_error(
"ByteSize not supported for sparse float column");
}

size_t
Capacity() const override {
throw std::runtime_error(
Expand Down Expand Up @@ -413,6 +423,34 @@
return dim_;
}

void
Seal(std::vector<uint64_t> indices) {
AssertInfo(!indices.empty(),

Check warning on line 428 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L427-L428

Added lines #L427 - L428 were not covered by tests
"indices should not be empty, Seal() of "
"SparseFloatColumn must be called only "
"at mmap mode");
AssertInfo(data_,

Check warning on line 432 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L432

Added line #L432 was not covered by tests
"data_ should not be nullptr, Seal() of "
"SparseFloatColumn must be called only "
"at mmap mode");
num_rows_ = indices.size();

Check warning on line 436 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L436

Added line #L436 was not covered by tests
// so that indices[num_rows_] - indices[num_rows_ - 1] is the size of
// the last row.
indices.push_back(size_);
for (size_t i = 0; i < num_rows_; i++) {
auto vec_size = indices[i + 1] - indices[i];
AssertInfo(

Check warning on line 442 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L439-L442

Added lines #L439 - L442 were not covered by tests
vec_size % knowhere::sparse::SparseRow<float>::element_size() ==
0,
"Incorrect sparse vector size: {}",
vec_size);
vec_.emplace_back(
vec_size / knowhere::sparse::SparseRow<float>::element_size(),
(uint8_t*)(data_) + indices[i],
false);

Check warning on line 450 in internal/core/src/mmap/Column.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Column.h#L448-L450

Added lines #L448 - L450 were not covered by tests
}
}

private:
int64_t dim_ = 0;
std::vector<knowhere::sparse::SparseRow<float>> vec_;
Expand Down
16 changes: 12 additions & 4 deletions internal/core/src/mmap/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,18 @@
break;
}
case DataType::VECTOR_SPARSE_FLOAT: {
// TODO(SPARSE): this is for mmap to write data to disk so that
// the file can be mmaped into memory.
throw std::runtime_error(
"WriteFieldData for VECTOR_SPARSE_FLOAT not implemented");
for (size_t i = 0; i < data->get_num_rows(); ++i) {

Check warning on line 84 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L84

Added line #L84 was not covered by tests
auto vec =
static_cast<const knowhere::sparse::SparseRow<float>*>(
data->RawValue(i));

Check warning on line 87 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L87

Added line #L87 was not covered by tests
ssize_t written =
file.Write(vec->data(), vec->data_byte_size());
if (written < vec->data_byte_size()) {
break;

Check warning on line 91 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L89-L91

Added lines #L89 - L91 were not covered by tests
}
total_written += written;

Check warning on line 93 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L93

Added line #L93 was not covered by tests
}
break;

Check warning on line 95 in internal/core/src/mmap/Utils.h

View check run for this annotation

Codecov / codecov/patch

internal/core/src/mmap/Utils.h#L95

Added line #L95 was not covered by tests
}
default:
PanicInfo(DataTypeInvalid,
Expand Down
78 changes: 55 additions & 23 deletions internal/core/src/segcore/SegmentSealedImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,13 @@
column = std::move(arr_column);
break;
}
// TODO(SPARSE) support mmap
case milvus::DataType::VECTOR_SPARSE_FLOAT: {

Check warning on line 588 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L588

Added line #L588 was not covered by tests
auto sparse_column = std::make_shared<SparseFloatColumn>(
file, total_written, field_meta);
sparse_column->Seal(std::move(indices));
column = std::move(sparse_column);
break;

Check warning on line 593 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L590-L593

Added lines #L590 - L593 were not covered by tests
}
default: {
PanicInfo(DataTypeInvalid,
fmt::format("unsupported data type {}", data_type));
Expand Down Expand Up @@ -850,7 +856,7 @@
auto metric_type = vec_index->GetMetricType();
auto has_raw_data = vec_index->HasRawData();

if (has_raw_data) {
if (has_raw_data && !TEST_skip_index_for_retrieve_) {
// If index has raw data, get vector from memory.
auto ids_ds = GenIdsDataset(count, ids);
if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) {
Expand All @@ -865,9 +871,6 @@
}
}

AssertInfo(field_meta.get_data_type() != DataType::VECTOR_SPARSE_FLOAT,
"index of sparse float vector is guaranteed to have raw data");

// If index doesn't have raw data, get vector from chunk cache.
auto cc = storage::ChunkCacheSingleton::GetInstance().GetChunkCache();

Expand Down Expand Up @@ -898,23 +901,50 @@
path_to_column[data_path] = column;
}

// assign to data array
auto row_bytes = field_meta.get_sizeof();
auto buf = std::vector<char>(count * row_bytes);
for (auto i = 0; i < count; i++) {
AssertInfo(id_to_data_path.count(ids[i]) != 0, "id not found");
const auto& [data_path, offset_in_binlog] = id_to_data_path.at(ids[i]);
AssertInfo(path_to_column.count(data_path) != 0, "column not found");
const auto& column = path_to_column.at(data_path);
AssertInfo(offset_in_binlog * row_bytes < column->ByteSize(),
"column idx out of range, idx: {}, size: {}, data_path: {}",
offset_in_binlog * row_bytes,
column->ByteSize(),
data_path);
auto vector = &column->Data()[offset_in_binlog * row_bytes];
std::memcpy(buf.data() + i * row_bytes, vector, row_bytes);
if (field_meta.get_data_type() == DataType::VECTOR_SPARSE_FLOAT) {
auto buf = std::vector<knowhere::sparse::SparseRow<float>>(count);
for (auto i = 0; i < count; ++i) {
const auto& [data_path, offset_in_binlog] =
id_to_data_path.at(ids[i]);
const auto& column = path_to_column.at(data_path);
AssertInfo(

Check warning on line 910 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L904-L910

Added lines #L904 - L910 were not covered by tests
offset_in_binlog < column->NumRows(),
"column idx out of range, idx: {}, size: {}, data_path: {}",
offset_in_binlog,
column->NumRows(),
data_path);
auto sparse_column =
std::dynamic_pointer_cast<SparseFloatColumn>(column);
AssertInfo(sparse_column, "incorrect column created");
buf[i] = static_cast<const knowhere::sparse::SparseRow<float>*>(

Check warning on line 919 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L917-L919

Added lines #L917 - L919 were not covered by tests
static_cast<const void*>(
sparse_column->Data()))[offset_in_binlog];

Check warning on line 921 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L921

Added line #L921 was not covered by tests
}
return segcore::CreateVectorDataArrayFrom(
buf.data(), count, field_meta);

Check warning on line 924 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L924

Added line #L924 was not covered by tests
} else {
// assign to data array
auto row_bytes = field_meta.get_sizeof();
auto buf = std::vector<char>(count * row_bytes);
for (auto i = 0; i < count; ++i) {
AssertInfo(id_to_data_path.count(ids[i]) != 0, "id not found");
const auto& [data_path, offset_in_binlog] =
id_to_data_path.at(ids[i]);
AssertInfo(path_to_column.count(data_path) != 0,

Check warning on line 933 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L927-L933

Added lines #L927 - L933 were not covered by tests
"column not found");
const auto& column = path_to_column.at(data_path);
AssertInfo(

Check warning on line 936 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L935-L936

Added lines #L935 - L936 were not covered by tests
offset_in_binlog * row_bytes < column->ByteSize(),
"column idx out of range, idx: {}, size: {}, data_path: {}",
offset_in_binlog * row_bytes,
column->ByteSize(),
data_path);
auto vector = &column->Data()[offset_in_binlog * row_bytes];
std::memcpy(buf.data() + i * row_bytes, vector, row_bytes);

Check warning on line 943 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L942-L943

Added lines #L942 - L943 were not covered by tests
}
return segcore::CreateVectorDataArrayFrom(
buf.data(), count, field_meta);

Check warning on line 946 in internal/core/src/segcore/SegmentSealedImpl.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/segcore/SegmentSealedImpl.cpp#L946

Added line #L946 was not covered by tests
}
return segcore::CreateVectorDataArrayFrom(buf.data(), count, field_meta);
}

void
Expand Down Expand Up @@ -997,7 +1027,8 @@
SegmentSealedImpl::SegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id)
int64_t segment_id,
bool TEST_skip_index_for_retrieve)
: segcore_config_(segcore_config),
field_data_ready_bitset_(schema->size()),
index_ready_bitset_(schema->size()),
Expand All @@ -1006,7 +1037,8 @@
insert_record_(*schema, MAX_ROW_COUNT),
schema_(schema),
id_(segment_id),
col_index_meta_(index_meta) {
col_index_meta_(index_meta),
TEST_skip_index_for_retrieve_(TEST_skip_index_for_retrieve) {
}

SegmentSealedImpl::~SegmentSealedImpl() {
Expand Down
17 changes: 13 additions & 4 deletions internal/core/src/segcore/SegmentSealedImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class SegmentSealedImpl : public SegmentSealed {
explicit SegmentSealedImpl(SchemaPtr schema,
IndexMetaPtr index_meta,
const SegcoreConfig& segcore_config,
int64_t segment_id);
int64_t segment_id,
bool TEST_skip_index_for_retrieve = false);
~SegmentSealedImpl() override;
void
LoadIndex(const LoadIndexInfo& info) override;
Expand Down Expand Up @@ -312,16 +313,24 @@ class SegmentSealedImpl : public SegmentSealed {
vec_binlog_config_;

SegmentStats stats_{};

// for sparse vector unit test only! Once a type of sparse index that
// doesn't has raw data is added, this should be removed.
bool TEST_skip_index_for_retrieve_ = false;
};

inline SegmentSealedUPtr
CreateSealedSegment(
SchemaPtr schema,
IndexMetaPtr index_meta = nullptr,
int64_t segment_id = -1,
const SegcoreConfig& segcore_config = SegcoreConfig::default_config()) {
return std::make_unique<SegmentSealedImpl>(
schema, index_meta, segcore_config, segment_id);
const SegcoreConfig& segcore_config = SegcoreConfig::default_config(),
bool TEST_skip_index_for_retrieve = false) {
return std::make_unique<SegmentSealedImpl>(schema,
index_meta,
segcore_config,
segment_id,
TEST_skip_index_for_retrieve);
}

} // namespace milvus::segcore
21 changes: 16 additions & 5 deletions internal/core/src/storage/ChunkCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@
}

auto column = it->second;
auto ok =
madvise(reinterpret_cast<void*>(const_cast<char*>(column->Data())),
column->ByteSize(),
read_ahead_policy_);
auto ok = madvise(
reinterpret_cast<void*>(const_cast<char*>(column->MmappedData())),
column->ByteSize(),

Check warning on line 69 in internal/core/src/storage/ChunkCache.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/ChunkCache.cpp#L67-L69

Added lines #L67 - L69 were not covered by tests
read_ahead_policy_);
AssertInfo(ok == 0,
"failed to madvise to the data file {}, err: {}",
path,
Expand Down Expand Up @@ -100,7 +100,18 @@

std::shared_ptr<ColumnBase> column{};

if (IsVariableDataType(data_type)) {
if (IsSparseFloatVectorDataType(data_type)) {
std::vector<uint64_t> indices{};
uint64_t offset = 0;
for (auto i = 0; i < field_data->get_num_rows(); ++i) {
indices.push_back(offset);
offset += field_data->Size(i);

Check warning on line 108 in internal/core/src/storage/ChunkCache.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/ChunkCache.cpp#L104-L108

Added lines #L104 - L108 were not covered by tests
}
auto sparse_column = std::make_shared<SparseFloatColumn>(
file, data_size, dim, data_type);
sparse_column->Seal(std::move(indices));
column = std::move(sparse_column);

Check warning on line 113 in internal/core/src/storage/ChunkCache.cpp

View check run for this annotation

Codecov / codecov/patch

internal/core/src/storage/ChunkCache.cpp#L111-L113

Added lines #L111 - L113 were not covered by tests
} else if (IsVariableDataType(data_type)) {
AssertInfo(
false, "TODO: unimplemented for variable data type: {}", data_type);
} else {
Expand Down
9 changes: 6 additions & 3 deletions internal/core/src/storage/Util.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -533,13 +533,16 @@ EncodeAndUploadIndexSlice2(std::shared_ptr<milvus_storage::Space> space,

std::pair<std::string, size_t>
EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
uint8_t* buf,
void* buf,
int64_t element_count,
FieldDataMeta field_data_meta,
const FieldMeta& field_meta,
std::string object_key) {
auto field_data =
CreateFieldData(field_meta.get_data_type(), field_meta.get_dim(), 0);
// dim should not be used for sparse float vector field
auto dim = IsSparseFloatVectorDataType(field_meta.get_data_type())
? -1
: field_meta.get_dim();
auto field_data = CreateFieldData(field_meta.get_data_type(), dim, 0);
field_data->FillFieldData(buf, element_count);
auto insertData = std::make_shared<InsertData>(field_data);
insertData->SetFieldDataMeta(field_data_meta);
Expand Down
2 changes: 1 addition & 1 deletion internal/core/src/storage/Util.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ EncodeAndUploadIndexSlice2(std::shared_ptr<milvus_storage::Space> space,
std::string object_key);
std::pair<std::string, size_t>
EncodeAndUploadFieldSlice(ChunkManager* chunk_manager,
uint8_t* buf,
void* buf,
int64_t element_count,
FieldDataMeta field_data_meta,
const FieldMeta& field_meta,
Expand Down
Loading
Loading