Skip to content

Commit

Permalink
[Sparse Float Vector] segcore to support sparse vector search and get…
Browse files Browse the repository at this point in the history
… raw vector by id

Added many unit tests, and converted many segcore tests into parameterized
tests that work for both dense and sparse float vectors.

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
  • Loading branch information
zhengbuqian committed Feb 20, 2024
1 parent c5101b4 commit 421483b
Show file tree
Hide file tree
Showing 55 changed files with 1,217 additions and 809 deletions.
26 changes: 26 additions & 0 deletions internal/core/src/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,32 @@ GetCommonPrefix(const std::string& str1, const std::string& str2) {
return str1.substr(0, i);
}

template <typename Iterable>
void
ValidateSparseProtoRows(const Iterable& rows) {
AssertInfo(rows.size() > 0, "at least 1 sparse row should be provided");
for (size_t i = 0; i < rows.size(); ++i) {
const auto& row = rows[i];
AssertInfo(row.indices().data_size() == row.values().data_size(),
fmt::format(
"row {} has different number of indices and values", i));
AssertInfo(row.indices().data_size() > 0,
fmt::format("row {} has no indices", i));
AssertInfo(
row.indices().data(0) >= 0,
fmt::format("row {} has negative index at position 0", i));
for (int j = 1; j < row.indices().data_size(); ++j) {
AssertInfo(
row.indices().data(j) > row.indices().data(j - 1),
fmt::format(
"row {} has non-increasing index at position {} and {}",
i,
j - 1,
j));
}
}
}

// Iterable is a list of milvus::proto::schema::SparseFloatRow. This helper
// function converts sparse matrix in proto to a vector of
// knowhere::sparse::SparseRow<float>.
Expand Down
2 changes: 2 additions & 0 deletions internal/core/src/index/VectorDiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,8 @@ VectorDiskAnnIndex<T>::HasRawData() const {
template <typename T>
std::vector<uint8_t>
VectorDiskAnnIndex<T>::GetVector(const DatasetPtr dataset) const {
// TODO(SPARSE): error is sparse. caller should use GetSparseVector for
// sparse.
auto res = index_.GetVectorByIds(*dataset);
if (!res.has_value()) {
PanicInfo(ErrorCode::UnexpectedError,
Expand Down
5 changes: 5 additions & 0 deletions internal/core/src/index/VectorDiskIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class VectorDiskAnnIndex : public VectorIndex {
std::vector<uint8_t>
GetVector(const DatasetPtr dataset) const override;

// Raw sparse-vector retrieval is not implemented for the disk index; this
// override always panics with ErrorCode::Unsupported.
std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>
GetSparseVector(const DatasetPtr dataset) const override {
    PanicInfo(ErrorCode::Unsupported, "get sparse vector not supported for disk index");
}

void
CleanLocalData() override;

Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ class VectorIndex : public IndexBase {
virtual std::vector<uint8_t>
GetVector(const DatasetPtr dataset) const = 0;

// Returns the raw sparse rows for the ids in `dataset`. Implementations
// that do not hold sparse float vectors may panic instead of returning
// (e.g. the disk index reports Unsupported).
virtual std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>
GetSparseVector(const DatasetPtr dataset) const = 0;

IndexType
GetIndexType() const {
return index_type_;
Expand Down
16 changes: 16 additions & 0 deletions internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -672,6 +672,8 @@ VectorMemIndex<T>::HasRawData() const {
template <typename T>
std::vector<uint8_t>
VectorMemIndex<T>::GetVector(const DatasetPtr dataset) const {
// TODO(SPARSE): error is sparse. caller should use GetSparseVector for
// sparse.
auto res = index_.GetVectorByIds(*dataset);
if (!res.has_value()) {
PanicInfo(ErrorCode::UnexpectedError,
Expand All @@ -693,6 +695,20 @@ VectorMemIndex<T>::GetVector(const DatasetPtr dataset) const {
return raw_data;
}

// Fetches the raw sparse rows for the ids in `dataset` from the underlying
// knowhere index and hands ownership of the row array to the caller.
template <typename T>
std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>
VectorMemIndex<T>::GetSparseVector(const DatasetPtr dataset) const {
    auto res = index_.GetVectorByIds(*dataset);
    if (!res.has_value()) {
        PanicInfo(ErrorCode::UnexpectedError,
                  "failed to get vector, " + KnowhereStatusString(res.error()));
    }
    // Release the result dataset's ownership of the tensor BEFORE wrapping
    // it, so the returned unique_ptr becomes the sole owner and the buffer
    // is not freed twice. NOTE(review): assumes the tensor was allocated
    // with new[] so unique_ptr<T[]>'s delete[] matches — confirm against
    // knowhere's GetVectorByIds contract.
    res.value()->SetIsOwner(false);
    return std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>(
        static_cast<const knowhere::sparse::SparseRow<float>*>(
            res.value()->GetTensor()));
}

template <typename T>
void
VectorMemIndex<T>::LoadFromFile(const Config& config) {
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/index/VectorMemIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ class VectorMemIndex : public VectorIndex {
std::vector<uint8_t>
GetVector(const DatasetPtr dataset) const override;

std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>
GetSparseVector(const DatasetPtr dataset) const override;

BinarySet
Upload(const Config& config = {}) override;

Expand Down
54 changes: 35 additions & 19 deletions internal/core/src/query/Plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

#include "Plan.h"
#include "common/Utils.h"
#include "PlanProto.h"
#include "generated/ShowPlanNodeVisitor.h"

Expand All @@ -34,9 +35,8 @@ std::unique_ptr<PlaceholderGroup>
ParsePlaceholderGroup(const Plan* plan,
const uint8_t* blob,
const int64_t blob_len) {
namespace set = milvus::proto::common;
auto result = std::make_unique<PlaceholderGroup>();
set::PlaceholderGroup ph_group;
milvus::proto::common::PlaceholderGroup ph_group;
auto ok = ph_group.ParseFromArray(blob, blob_len);
Assert(ok);
for (auto& info : ph_group.placeholders()) {
Expand All @@ -45,23 +45,39 @@ ParsePlaceholderGroup(const Plan* plan,
Assert(plan->tag2field_.count(element.tag_));
auto field_id = plan->tag2field_.at(element.tag_);
auto& field_meta = plan->schema_[field_id];
element.num_of_queries_ = info.values_size();
AssertInfo(element.num_of_queries_, "must have queries");
Assert(element.num_of_queries_ > 0);
element.line_sizeof_ = info.values().Get(0).size();
if (field_meta.get_sizeof() != element.line_sizeof_) {
throw SegcoreError(
DimNotMatch,
fmt::format("vector dimension mismatch, expected vector "
"size(byte) {}, actual {}.",
field_meta.get_sizeof(),
element.line_sizeof_));
}
auto& target = element.blob_;
target.reserve(element.line_sizeof_ * element.num_of_queries_);
for (auto& line : info.values()) {
Assert(element.line_sizeof_ == line.size());
target.insert(target.end(), line.begin(), line.end());
if (info.type() ==
milvus::proto::common::PlaceholderType::SparseFloatVector) {
AssertInfo(info.values_size() == 1,
"multiple sparse float queries should be packed into "
"one placeholder value");
auto line = info.values().Get(0);
milvus::proto::schema::SparseFloatArray sparse;
if (!sparse.ParseFromArray(line.data(), line.size())) {
throw std::runtime_error("parse float array failed");
}
ValidateSparseProtoRows(sparse.contents());
element.num_of_queries_ = sparse.contents_size();
AssertInfo(element.num_of_queries_ > 0, "must have queries");
auto sp = SparseProtoToRows(sparse.contents());
element.sparse_matrix_ = std::move(sp);
} else {
element.num_of_queries_ = info.values_size();
AssertInfo(element.num_of_queries_ > 0, "must have queries");
auto line_size = info.values().Get(0).size();
if (field_meta.get_sizeof() != line_size) {
throw SegcoreError(
DimNotMatch,
fmt::format("vector dimension mismatch, expected vector "
"size(byte) {}, actual {}.",
field_meta.get_sizeof(),
line_size));
}
auto& target = element.blob_;
target.reserve(line_size * element.num_of_queries_);
for (auto& line : info.values()) {
Assert(line_size == line.size());
target.insert(target.end(), line.begin(), line.end());
}
}
result->emplace_back(std::move(element));
}
Expand Down
25 changes: 18 additions & 7 deletions internal/core/src/query/PlanImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,30 @@ struct Plan {
struct Placeholder {
std::string tag_;
int64_t num_of_queries_;
int64_t line_sizeof_;
// TODO(SPARSE): add a dim_ field here, use the dim passed in search request
// instead of the dim in schema, since the dim of sparse float column is
// dynamic. This change will likely affect lots of code, thus I'll do it in
// a separate PR, and use dim=0 for sparse float vector searches for now.

// only one of blob_ and sparse_matrix_ should be set. blob_ is used for
// dense vector search and sparse_matrix_ is for sparse vector search.
aligned_vector<char> blob_;
std::unique_ptr<knowhere::sparse::SparseRow<float>[]> sparse_matrix_;

template <typename T>
const T*
const void*
get_blob() const {
return reinterpret_cast<const T*>(blob_.data());
if (blob_.empty()) {
return sparse_matrix_.get();
}
return blob_.data();
}

template <typename T>
T*
void*
get_blob() {
return reinterpret_cast<T*>(blob_.data());
if (blob_.empty()) {
return sparse_matrix_.get();
}
return blob_.data();
}
};

Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/query/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ struct BFloat16VectorANNS : VectorPlanNode {
accept(PlanNodeVisitor&) override;
};

// ANN-search plan node for sparse float vector fields; dispatched via the
// visitor like the other *VectorANNS nodes.
struct SparseFloatVectorANNS : VectorPlanNode {
 public:
    void
    accept(PlanNodeVisitor&) override;
};

struct RetrievePlanNode : PlanNode {
public:
void
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/query/PlanProto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ ProtoParser::PlanNodeFromProto(const planpb::PlanNode& plan_node_proto) {
} else if (anns_proto.vector_type() ==
milvus::proto::plan::VectorType::BFloat16Vector) {
return std::make_unique<BFloat16VectorANNS>();
} else if (anns_proto.vector_type() ==
milvus::proto::plan::VectorType::SparseFloatVector) {
return std::make_unique<SparseFloatVectorANNS>();
} else {
return std::make_unique<FloatVectorANNS>();
}
Expand Down
22 changes: 20 additions & 2 deletions internal/core/src/query/SearchBruteForce.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ CheckBruteForceSearchParam(const FieldMeta& field,
"[BruteForceSearch] Data type isn't vector type");
bool is_float_data_type = (data_type == DataType::VECTOR_FLOAT ||
data_type == DataType::VECTOR_FLOAT16 ||
data_type == DataType::VECTOR_BFLOAT16);
data_type == DataType::VECTOR_BFLOAT16 ||
data_type == DataType::VECTOR_SPARSE_FLOAT);
bool is_float_metric_type = IsFloatMetricType(metric_type);
AssertInfo(is_float_data_type == is_float_metric_type,
"[BruteForceSearch] Data type and metric type miss-match");
Expand Down Expand Up @@ -65,7 +66,24 @@ BruteForceSearch(const dataset::SearchDataset& dataset,
sub_result.mutable_seg_offsets().resize(nq * topk);
sub_result.mutable_distances().resize(nq * topk);

if (conf.contains(RADIUS)) {
if (data_type == DataType::VECTOR_SPARSE_FLOAT) {
// TODO(SPARSE): support sparse brute force range search
AssertInfo(!conf.contains(RADIUS) && !conf.contains(RANGE_FILTER),
"sparse vector not support range search");
base_dataset->SetIsSparse(true);
query_dataset->SetIsSparse(true);
auto stat = knowhere::BruteForce::SearchSparseWithBuf(
base_dataset,
query_dataset,
sub_result.mutable_seg_offsets().data(),
sub_result.mutable_distances().data(),
config,
bitset);
milvus::tracer::AddEvent("knowhere_finish_BruteForce_SearchWithBuf");
if (stat != knowhere::Status::success) {
throw SegcoreError(KnowhereError, KnowhereStatusString(stat));
}
} else if (conf.contains(RADIUS)) {
config[RADIUS] = conf[RADIUS].get<float>();
if (conf.contains(RANGE_FILTER)) {
config[RANGE_FILTER] = conf[RANGE_FILTER].get<float>();
Expand Down
29 changes: 18 additions & 11 deletions internal/core/src/query/SearchOnGrowing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ FloatSegmentIndexSearch(const segcore::SegmentGrowingImpl& segment,

auto vecfield_id = info.field_id_;
auto& field = schema[vecfield_id];
auto is_sparse = field.get_data_type() == DataType::VECTOR_SPARSE_FLOAT;
// TODO(SPARSE): see todo in PlanImpl.h::PlaceHolder.
auto dim = is_sparse ? 0 : field.get_dim();

AssertInfo(field.get_data_type() == DataType::VECTOR_FLOAT,
"[FloatSearch]Field data type isn't VECTOR_FLOAT");
AssertInfo(field.get_data_type() == DataType::VECTOR_FLOAT ||
field.get_data_type() == DataType::VECTOR_SPARSE_FLOAT,
"[FloatSearch]Field data type isn't VECTOR_FLOAT or VECTOR_SPARSE_FLOAT");
dataset::SearchDataset search_dataset{info.metric_type_,
num_queries,
info.topk_,
info.round_decimal_,
field.get_dim(),
dim,
query_data};
if (indexing_record.is_in(vecfield_id)) {
const auto& field_indexing =
Expand All @@ -49,8 +53,8 @@ FloatSegmentIndexSearch(const segcore::SegmentGrowingImpl& segment,
auto indexing = field_indexing.get_segment_indexing();
SearchInfo search_conf = field_indexing.get_search_params(info);
auto vec_index = dynamic_cast<index::VectorIndex*>(indexing);
auto result =
SearchOnIndex(search_dataset, *vec_index, search_conf, bitset);
auto result = SearchOnIndex(
search_dataset, *vec_index, search_conf, bitset, is_sparse);
results.merge(result);
}
}
Expand Down Expand Up @@ -78,15 +82,12 @@ SearchOnGrowing(const segcore::SegmentGrowingImpl& segment,
AssertInfo(datatype_is_vector(data_type),
"[SearchOnGrowing]Data type isn't vector type");

auto dim = field.get_dim();
auto topk = info.topk_;
auto metric_type = info.metric_type_;
auto round_decimal = info.round_decimal_;

// step 2: small indexing search
SubSearchResult final_qr(num_queries, topk, metric_type, round_decimal);
dataset::SearchDataset search_dataset{
metric_type, num_queries, topk, round_decimal, dim, query_data};

if (segment.get_indexing_record().SyncDataWithIndex(field.get_id())) {
FloatSegmentIndexSearch(segment,
Expand All @@ -101,6 +102,12 @@ SearchOnGrowing(const segcore::SegmentGrowingImpl& segment,
results.unity_topK_ = topk;
results.total_nq_ = num_queries;
} else {
// TODO(SPARSE): see todo in PlanImpl.h::PlaceHolder.
auto dim = field.get_data_type() == DataType::VECTOR_SPARSE_FLOAT
? 0
: field.get_dim();
dataset::SearchDataset search_dataset{
metric_type, num_queries, topk, round_decimal, dim, query_data};
std::shared_lock<std::shared_mutex> read_chunk_mutex(
segment.get_chunk_mutex());
int32_t current_chunk_id = 0;
Expand All @@ -116,12 +123,12 @@ SearchOnGrowing(const segcore::SegmentGrowingImpl& segment,
auto element_begin = chunk_id * vec_size_per_chunk;
auto element_end =
std::min(active_count, (chunk_id + 1) * vec_size_per_chunk);
auto size_per_chunk = element_end - element_begin;
auto size_of_chunk = element_end - element_begin;

auto sub_view = bitset.subview(element_begin, size_per_chunk);
auto sub_view = bitset.subview(element_begin, size_of_chunk);
auto sub_qr = BruteForceSearch(search_dataset,
chunk_data,
size_per_chunk,
size_of_chunk,
info.search_params_,
sub_view,
data_type);
Expand Down
4 changes: 3 additions & 1 deletion internal/core/src/query/SearchOnIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ SubSearchResult
SearchOnIndex(const dataset::SearchDataset& search_dataset,
const index::VectorIndex& indexing,
const SearchInfo& search_conf,
const BitsetView& bitset) {
const BitsetView& bitset,
bool is_sparse) {
auto num_queries = search_dataset.num_queries;
auto topK = search_dataset.topk;
auto dim = search_dataset.dim;
auto metric_type = search_dataset.metric_type;
auto round_decimal = search_dataset.round_decimal;
auto dataset =
knowhere::GenDataSet(num_queries, dim, search_dataset.query_data);
dataset->SetIsSparse(is_sparse);

// NOTE: VecIndex Query API forget to add const qualifier
// NOTE: use const_cast as a workaround
Expand Down
3 changes: 2 additions & 1 deletion internal/core/src/query/SearchOnIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ SubSearchResult
// Runs the queries in `search_dataset` against `indexing`, honoring
// `search_conf` and skipping rows masked out by `bitset`. Pass
// `is_sparse = true` for sparse float vector indexes so the query dataset
// is tagged as sparse before the knowhere call.
SearchOnIndex(const dataset::SearchDataset& search_dataset,
              const index::VectorIndex& indexing,
              const SearchInfo& search_conf,
              const BitsetView& bitset,
              bool is_sparse = false);

} // namespace milvus::query
Loading

0 comments on commit 421483b

Please sign in to comment.