Skip to content

Commit

Permalink
[Sparse Float Vector] segcore to support sparse vector search and get…
Browse files Browse the repository at this point in the history
… raw vector by id

Added lots of unit tests and converted many segcore tests into parameterized
tests that work for both dense and sparse float vectors.

Signed-off-by: Buqian Zheng <zhengbuqian@gmail.com>
  • Loading branch information
zhengbuqian committed Feb 29, 2024
1 parent b64864d commit 12f289a
Show file tree
Hide file tree
Showing 59 changed files with 1,232 additions and 832 deletions.
2 changes: 1 addition & 1 deletion internal/core/src/common/FieldDataInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ class FieldDataSparseVectorImpl
}

private:
int64_t vec_dim_;
int64_t vec_dim_ = 0;
};

class FieldDataArrayImpl : public FieldDataImpl<Array, true> {
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/common/Types.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,12 @@ using IndexVersion = knowhere::IndexVersion;
// TODO :: type define milvus index type(vector index type and scalar index type)
using IndexType = knowhere::IndexType;

// Reports whether `index_type` equals one of the two sparse index kinds
// checked here: the sparse inverted index or sparse WAND.
inline bool
IndexIsSparse(const IndexType& index_type) {
    if (index_type == knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX) {
        return true;
    }
    return index_type == knowhere::IndexEnum::INDEX_SPARSE_WAND;
}

// Plus 1 because we can't use greater(>) symbol
constexpr size_t REF_SIZE_THRESHOLD = 16 + 1;

Expand Down
20 changes: 11 additions & 9 deletions internal/core/src/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -241,23 +241,25 @@ SparseBytesToRows(const Iterable& rows) {
return res;
}

// SparseRowsToProto converts a vector of knowhere::sparse::SparseRow<float> to
// SparseRowsToProto converts a list of knowhere::sparse::SparseRow<float> to
// a milvus::proto::schema::SparseFloatArray. The resulting proto is a deep copy
// of the source data.
// of the source data. source(i) returns the i-th row to be copied.
inline void
SparseRowsToProto(const knowhere::sparse::SparseRow<float>* source,
int64_t rows,
milvus::proto::schema::SparseFloatArray* proto) {
SparseRowsToProto(
const std::function<const knowhere::sparse::SparseRow<float>*(size_t)>&
source,
int64_t rows,
milvus::proto::schema::SparseFloatArray* proto) {
int64_t max_dim = 0;
for (size_t i = 0; i < rows; ++i) {
if (source + i == nullptr) {
const auto* row = source(i);
if (row == nullptr) {
// empty row
proto->add_contents();
continue;
}
auto& row = source[i];
max_dim = std::max(max_dim, row.dim());
proto->add_contents(row.data(), row.data_byte_size());
max_dim = std::max(max_dim, row->dim());
proto->add_contents(row->data(), row->data_byte_size());
}
proto->set_dim(max_dim);
}
Expand Down
27 changes: 0 additions & 27 deletions internal/core/src/common/VectorTrait.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,33 +68,6 @@ template <typename T>
constexpr bool IsSparse = std::is_same_v<T, SparseFloatVector> ||
std::is_same_v<T, knowhere::sparse::SparseRow<float>>;

template <typename T, typename Enabled = void>
struct EmbeddedTypeImpl;

template <typename T>
struct EmbeddedTypeImpl<T, std::enable_if_t<IsScalar<T>>> {
using type = T;
};

template <typename T>
struct EmbeddedTypeImpl<T, std::enable_if_t<IsVector<T>>> {
using type = std::conditional_t<
std::is_same_v<T, FloatVector>,
float,
std::conditional_t<
std::is_same_v<T, Float16Vector>,
float16,
std::conditional_t<
std::is_same_v<T, BFloat16Vector>,
bfloat16,
std::conditional_t<std::is_same_v<T, SparseFloatVector>,
void,
uint8_t>>>>;
};

template <typename T>
using EmbeddedType = typename EmbeddedTypeImpl<T>::type;

struct FundamentalTag {};
struct StringTag {};

Expand Down
6 changes: 5 additions & 1 deletion internal/core/src/index/VectorDiskIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,14 +416,18 @@ VectorDiskAnnIndex<T>::HasRawData() const {
template <typename T>
std::vector<uint8_t>
VectorDiskAnnIndex<T>::GetVector(const DatasetPtr dataset) const {
auto index_type = GetIndexType();
if (IndexIsSparse(index_type)) {
PanicInfo(ErrorCode::UnexpectedError,
"failed to get vector, index is sparse");
}
auto res = index_.GetVectorByIds(*dataset);
if (!res.has_value()) {
PanicInfo(ErrorCode::UnexpectedError,
fmt::format("failed to get vector: {}: {}",
KnowhereStatusString(res.error()),
res.what()));
}
auto index_type = GetIndexType();
auto tensor = res.value()->GetTensor();
auto row_num = res.value()->GetRows();
auto dim = res.value()->GetDim();
Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/index/VectorDiskIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ class VectorDiskAnnIndex : public VectorIndex {
std::vector<uint8_t>
GetVector(const DatasetPtr dataset) const override;

// Disk-index override of GetSparseVector: unconditionally reports
// ErrorCode::Unsupported through PanicInfo.
// NOTE(review): there is no return statement, so this relies on PanicInfo not
// returning normally (presumably it throws) — confirm against its definition.
std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>
GetSparseVector(const DatasetPtr dataset) const override {
PanicInfo(ErrorCode::Unsupported,
"get sparse vector not supported for disk index");
}

void
CleanLocalData() override;

Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/index/VectorIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ class VectorIndex : public IndexBase {
virtual std::vector<uint8_t>
GetVector(const DatasetPtr dataset) const = 0;

// Pure virtual hook that returns vectors for `dataset` as an array of
// knowhere::sparse::SparseRow<float>; the returned unique_ptr owns the array.
// Every concrete index type has to override it.
// NOTE(review): `dataset` presumably carries the ids to fetch — confirm
// against the implementations.
virtual std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>
GetSparseVector(const DatasetPtr dataset) const = 0;

IndexType
GetIndexType() const {
return index_type_;
Expand Down
25 changes: 23 additions & 2 deletions internal/core/src/index/VectorMemIndex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,7 @@ VectorMemIndex<T>::Build(const Config& config) {
build_config.update(config);
build_config.erase("insert_files");
build_config.erase(VEC_OPT_FIELDS);
if (GetIndexType().find("SPARSE") == std::string::npos) {
if (!IndexIsSparse(GetIndexType())) {
int64_t total_size = 0;
int64_t total_num_rows = 0;
int64_t dim = 0;
Expand Down Expand Up @@ -534,6 +534,7 @@ VectorMemIndex<T>::Build(const Config& config) {
AssertInfo(ptr, "failed to cast field data to sparse rows");
for (size_t i = 0; i < field_data->Length(); ++i) {
// this does a deep copy of field_data's data.
AssertInfo(dim >= ptr[i].dim(), "bad dim");
vec[offset + i] = ptr[i];
}
offset += field_data->Length();
Expand Down Expand Up @@ -636,12 +637,17 @@ VectorMemIndex<T>::HasRawData() const {
template <typename T>
std::vector<uint8_t>
VectorMemIndex<T>::GetVector(const DatasetPtr dataset) const {
auto index_type = GetIndexType();
if (IndexIsSparse(index_type)) {
PanicInfo(ErrorCode::UnexpectedError,
"failed to get vector, index is sparse");
}

auto res = index_.GetVectorByIds(*dataset);
if (!res.has_value()) {
PanicInfo(ErrorCode::UnexpectedError,
"failed to get vector, " + KnowhereStatusString(res.error()));
}
auto index_type = GetIndexType();
auto tensor = res.value()->GetTensor();
auto row_num = res.value()->GetRows();
auto dim = res.value()->GetDim();
Expand All @@ -657,6 +663,21 @@ VectorMemIndex<T>::GetVector(const DatasetPtr dataset) const {
return raw_data;
}

// Fetches the vectors described by `dataset` from the underlying index via
// GetVectorByIds and returns them as an array of sparse rows owned by the
// returned unique_ptr. Raises ErrorCode::UnexpectedError through PanicInfo
// when the lookup yields no value.
template <typename T>
std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>
VectorMemIndex<T>::GetSparseVector(const DatasetPtr dataset) const {
    auto lookup = index_.GetVectorByIds(*dataset);
    if (!lookup.has_value()) {
        PanicInfo(ErrorCode::UnexpectedError,
                  "failed to get vector, " + KnowhereStatusString(lookup.error()));
    }
    auto&& fetched = lookup.value();
    // Clear the dataset's owner flag so that ownership of the tensor passes
    // to the unique_ptr returned below.
    fetched->SetIsOwner(false);
    const auto* rows = static_cast<const knowhere::sparse::SparseRow<float>*>(
        fetched->GetTensor());
    return std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>(rows);
}

template <typename T>
void
VectorMemIndex<T>::LoadFromFile(const Config& config) {
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/index/VectorMemIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ class VectorMemIndex : public VectorIndex {
std::vector<uint8_t>
GetVector(const DatasetPtr dataset) const override;

// Override of VectorIndex::GetSparseVector for the in-memory index; the
// returned unique_ptr owns an array of knowhere::sparse::SparseRow<float>.
// Declared here only; the definition is out of line.
std::unique_ptr<const knowhere::sparse::SparseRow<float>[]>
GetSparseVector(const DatasetPtr dataset) const override;

BinarySet
Upload(const Config& config = {}) override;

Expand Down
12 changes: 8 additions & 4 deletions internal/core/src/mmap/Column.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ class ColumnBase {
public:
// memory mode ctor
ColumnBase(size_t reserve, const FieldMeta& field_meta)
: type_size_(field_meta.get_sizeof()) {
: type_size_(datatype_is_sparse_vector(field_meta.get_data_type())
? 1
: field_meta.get_sizeof()) {
// simdjson requires a padding following the json data
padding_ = field_meta.get_data_type() == DataType::JSON
? simdjson::SIMDJSON_PADDING
Expand All @@ -55,7 +57,7 @@ class ColumnBase {
return;
}

cap_size_ = field_meta.get_sizeof() * reserve;
cap_size_ = type_size_ * reserve;

// use anon mapping so we are able to free these memory with munmap only
data_ = static_cast<char*>(mmap(nullptr,
Expand All @@ -72,8 +74,10 @@ class ColumnBase {

// mmap mode ctor
ColumnBase(const File& file, size_t size, const FieldMeta& field_meta)
: type_size_(field_meta.get_sizeof()),
num_rows_(size / field_meta.get_sizeof()) {
: type_size_(datatype_is_sparse_vector(field_meta.get_data_type())
? 1
: field_meta.get_sizeof()),
num_rows_(size / type_size_) {
padding_ = field_meta.get_data_type() == DataType::JSON
? simdjson::SIMDJSON_PADDING
: 0;
Expand Down
40 changes: 22 additions & 18 deletions internal/core/src/query/Plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// limitations under the License.

#include "Plan.h"
#include "common/Utils.h"
#include "PlanProto.h"
#include "generated/ShowPlanNodeVisitor.h"

Expand All @@ -34,9 +35,8 @@ std::unique_ptr<PlaceholderGroup>
ParsePlaceholderGroup(const Plan* plan,
const uint8_t* blob,
const int64_t blob_len) {
namespace set = milvus::proto::common;
auto result = std::make_unique<PlaceholderGroup>();
set::PlaceholderGroup ph_group;
milvus::proto::common::PlaceholderGroup ph_group;
auto ok = ph_group.ParseFromArray(blob, blob_len);
Assert(ok);
for (auto& info : ph_group.placeholders()) {
Expand All @@ -46,22 +46,26 @@ ParsePlaceholderGroup(const Plan* plan,
auto field_id = plan->tag2field_.at(element.tag_);
auto& field_meta = plan->schema_[field_id];
element.num_of_queries_ = info.values_size();
AssertInfo(element.num_of_queries_, "must have queries");
Assert(element.num_of_queries_ > 0);
element.line_sizeof_ = info.values().Get(0).size();
if (field_meta.get_sizeof() != element.line_sizeof_) {
throw SegcoreError(
DimNotMatch,
fmt::format("vector dimension mismatch, expected vector "
"size(byte) {}, actual {}.",
field_meta.get_sizeof(),
element.line_sizeof_));
}
auto& target = element.blob_;
target.reserve(element.line_sizeof_ * element.num_of_queries_);
for (auto& line : info.values()) {
Assert(element.line_sizeof_ == line.size());
target.insert(target.end(), line.begin(), line.end());
AssertInfo(element.num_of_queries_ > 0, "must have queries");
if (info.type() ==
milvus::proto::common::PlaceholderType::SparseFloatVector) {
element.sparse_matrix_ = SparseBytesToRows(info.values());
} else {
auto line_size = info.values().Get(0).size();
if (field_meta.get_sizeof() != line_size) {
throw SegcoreError(
DimNotMatch,
fmt::format("vector dimension mismatch, expected vector "
"size(byte) {}, actual {}.",
field_meta.get_sizeof(),
line_size));
}
auto& target = element.blob_;
target.reserve(line_size * element.num_of_queries_);
for (auto& line : info.values()) {
Assert(line_size == line.size());
target.insert(target.end(), line.begin(), line.end());
}
}
result->emplace_back(std::move(element));
}
Expand Down
25 changes: 18 additions & 7 deletions internal/core/src/query/PlanImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,30 @@ struct Plan {
struct Placeholder {
std::string tag_;
int64_t num_of_queries_;
int64_t line_sizeof_;
// TODO(SPARSE): add a dim_ field here, use the dim passed in search request
// instead of the dim in schema, since the dim of sparse float column is
// dynamic. This change will likely affect lots of code, thus I'll do it in
// a separate PR, and use dim=0 for sparse float vector searches for now.

// only one of blob_ and sparse_matrix_ should be set. blob_ is used for
// dense vector search and sparse_matrix_ is for sparse vector search.
aligned_vector<char> blob_;
std::unique_ptr<knowhere::sparse::SparseRow<float>[]> sparse_matrix_;

template <typename T>
const T*
const void*
get_blob() const {
return reinterpret_cast<const T*>(blob_.data());
if (blob_.empty()) {
return sparse_matrix_.get();
}
return blob_.data();
}

template <typename T>
T*
void*
get_blob() {
return reinterpret_cast<T*>(blob_.data());
if (blob_.empty()) {
return sparse_matrix_.get();
}
return blob_.data();
}
};

Expand Down
6 changes: 6 additions & 0 deletions internal/core/src/query/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ struct BFloat16VectorANNS : VectorPlanNode {
accept(PlanNodeVisitor&) override;
};

// Vector ANNS plan node for sparse float vectors. ProtoParser::PlanNodeFromProto
// creates it when the proto's vector_type is VectorType::SparseFloatVector.
struct SparseFloatVectorANNS : VectorPlanNode {
public:
// Visitor entry point; declared here only, the definition is not in this view.
void
accept(PlanNodeVisitor&) override;
};

struct RetrievePlanNode : PlanNode {
public:
void
Expand Down
3 changes: 3 additions & 0 deletions internal/core/src/query/PlanProto.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ ProtoParser::PlanNodeFromProto(const planpb::PlanNode& plan_node_proto) {
} else if (anns_proto.vector_type() ==
milvus::proto::plan::VectorType::BFloat16Vector) {
return std::make_unique<BFloat16VectorANNS>();
} else if (anns_proto.vector_type() ==
milvus::proto::plan::VectorType::SparseFloatVector) {
return std::make_unique<SparseFloatVectorANNS>();
} else {
return std::make_unique<FloatVectorANNS>();
}
Expand Down
Loading

0 comments on commit 12f289a

Please sign in to comment.