diff --git a/include/knowhere/comp/brute_force.h b/include/knowhere/comp/brute_force.h index 240aa0f4b..6ffe6e076 100644 --- a/include/knowhere/comp/brute_force.h +++ b/include/knowhere/comp/brute_force.h @@ -29,6 +29,16 @@ class BruteForce { static expected RangeSearch(const DataSetPtr base_dataset, const DataSetPtr query_dataset, const Json& config, const BitsetView& bitset); + + // Perform row oriented sparse vector brute force search. + // For unit test purpose only, assumes that the tensor is csr matrix with types: + // shape: int64 + // indptr: int64 + // indices: int32 + // data: float + static expected + SearchSparse(const DataSetPtr base_dataset, const DataSetPtr query_dataset, const Json& config, + const BitsetView& bitset); }; } // namespace knowhere diff --git a/include/knowhere/comp/index_param.h b/include/knowhere/comp/index_param.h index 675ccc35c..8f0c65e4c 100644 --- a/include/knowhere/comp/index_param.h +++ b/include/knowhere/comp/index_param.h @@ -44,6 +44,8 @@ constexpr const char* INDEX_RAFT_CAGRA = "GPU_RAFT_CAGRA"; constexpr const char* INDEX_HNSW = "HNSW"; constexpr const char* INDEX_DISKANN = "DISKANN"; +constexpr const char* INDEX_SPARSE_INVERTED_INDEX = "SPARSE_INVERTED_INDEX"; +constexpr const char* INDEX_SPARSE_WAND = "SPARSE_WAND"; } // namespace IndexEnum namespace meta { @@ -88,6 +90,10 @@ constexpr const char* HNSW_M = "M"; constexpr const char* EF = "ef"; constexpr const char* SEED_EF = "seed_ef"; constexpr const char* OVERVIEW_LEVELS = "overview_levels"; + +// Sparse Params +constexpr const char* DROP_RATIO_BUILD = "drop_ratio_build"; +constexpr const char* DROP_RATIO_SEARCH = "drop_ratio_search"; } // namespace indexparam using MetricType = std::string; diff --git a/include/knowhere/dataset.h b/include/knowhere/dataset.h index c77decaf3..120087c66 100644 --- a/include/knowhere/dataset.h +++ b/include/knowhere/dataset.h @@ -78,6 +78,11 @@ class DataSet { this->data_[meta::IDS] = Var(std::in_place_index<2>, ids); } + /** + * For dense float vector, tensor is a rows * dim float array + * For sparse float vector, tensor is a CSR matrix. See namespace sparse in utils.h for details. + * rows and dim should be set for both dense/sparse dataset. + */ void SetTensor(const void* tensor) { std::unique_lock lock(mutex_); diff --git a/include/knowhere/utils.h b/include/knowhere/utils.h index a6ff6bed4..25062b941 100644 --- a/include/knowhere/utils.h +++ b/include/knowhere/utils.h @@ -14,6 +14,8 @@ #include #include +#include +#include #include #include "knowhere/binaryset.h" @@ -92,4 +94,157 @@ readBinaryPOD(R& in, T& podRef) { in.read((char*)&podRef, sizeof(T)); } +// utilities for sparse index +namespace sparse { + +// type used to represent the id of a vector in the index interface. +// this is same as other index types. +using label_t = int64_t; +// type used to represent the id, indices and indptr of a vector inside the index. 
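// Note (observation, not part of the patch): being 32-bit, table_t keeps the ids
// stored in the in-memory posting lists at half the width of the int64_t label_t
// above, at the cost of capping a single index at 2^32 rows/dimensions.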
+using table_t = uint32_t; + +/** +CSR(Compressed Sparse Row) Matrix Format: + + +------------+-----------+------------+------------+--------------+-------------+-------------+ + | | rows | cols | nnz | indptr | indices | data | + +------------+-----------+------------+------------+--------------+-------------+-------------+ + | Type | ShapeT | ShapeT | ShapeT | IntPtrT | IndicesT | ValueT | + +------------+-----------+------------+------------+--------------+-------------+-------------+ + | elem count | 1 | 1 | 1 | rows + 1 | nnz | nnz | + +------------+-----------+------------+------------+--------------+-------------+-------------+ + +*/ + +// indptr, indices and data references the original data, so they should not be freed by the caller. +// csr_matrix must outlive them. +template +void +parse_csr_matrix(const void* csr_matrix, size_t& rows, size_t& cols, size_t& nnz, const IndPtrT*& indptr, + const IndicesT*& indices, const ValueT*& data) { + const ShapeT* header = static_cast(csr_matrix); + rows = header[0]; + cols = header[1]; + nnz = header[2]; + + std::size_t offset = 3 * sizeof(ShapeT); + + indptr = reinterpret_cast(static_cast(csr_matrix) + offset); + offset += (rows + 1) * sizeof(IndPtrT); + + indices = reinterpret_cast(static_cast(csr_matrix) + offset); + offset += nnz * sizeof(IndicesT); + + data = reinterpret_cast(static_cast(csr_matrix) + offset); +} + +// indices and data references the original data, so they should not be freed by the caller. +// csr_matrix must outlive them. +template +void +get_row(const void* csr_matrix, size_t idx, size_t& len, const IndicesT*& indices, const ValueT*& data) { + const ShapeT* header = reinterpret_cast(csr_matrix); + size_t n_rows = header[0]; + if (idx >= n_rows) { + len = 0; + indices = nullptr; + data = nullptr; + return; + } + const IndPtrT* indptr = reinterpret_cast(header + 3); + const IndicesT* csr_indices = reinterpret_cast(indptr + n_rows + 1); + const ValueT* csr_data = reinterpret_cast(csr_indices + header[2]); + + len = static_cast(indptr[idx + 1] - indptr[idx]); + indices = const_cast(&csr_indices[indptr[idx]]); + data = const_cast(&csr_data[indptr[idx]]); +} + +template +struct Neighbor { + table_t id; + dist_t distance; + + Neighbor() = default; + Neighbor(table_t id, dist_t distance) : id(id), distance(distance) { + } + + inline friend bool + operator<(const Neighbor& lhs, const Neighbor& rhs) { + return lhs.distance < rhs.distance || (lhs.distance == rhs.distance && lhs.id < rhs.id); + } + inline friend bool + operator>(const Neighbor& lhs, const Neighbor& rhs) { + return !(lhs < rhs); + } +}; + +// When pushing new elements into a MaxMinHeap, only the `capacity` largest elements are kept. +// pop()/top() returns the smallest element out of those `capacity` largest elements. 
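// Worked example of the semantics above (illustrative, not part of the patch):
// with capacity = 2, pushing distances 5, 9, 3, 7 first fills the heap with
// {5, 9}, drops 3 (not larger than the current minimum 5), then replaces 5 with 7.
// top() now returns the neighbor with distance 7, and successive pop() calls yield
// ids in ascending distance order (7, then 9), which callers such as
// collect_result() reverse to produce results in descending score order.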
+template +class MaxMinHeap { + public: + explicit MaxMinHeap(int capacity) : capacity_(capacity), pool_(capacity) { + } + void + push(table_t id, T dist) { + if (size_ < capacity_) { + pool_[size_] = {id, dist}; + size_ += 1; + std::push_heap(pool_.begin(), pool_.begin() + size_, std::greater>()); + } else if (dist > pool_[0].distance) { + sift_down(id, dist); + } + } + table_t + pop() { + std::pop_heap(pool_.begin(), pool_.begin() + size_, std::greater>()); + size_ -= 1; + return pool_[size_].id; + } + [[nodiscard]] size_t + size() const { + return size_; + } + [[nodiscard]] bool + empty() const { + return size() == 0; + } + Neighbor + top() const { + return pool_[0]; + } + [[nodiscard]] bool + full() const { + return size_ == capacity_; + } + + private: + void + sift_down(table_t id, T dist) { + size_t i = 0; + for (; 2 * i + 1 < size_;) { + size_t j = i; + size_t l = 2 * i + 1, r = 2 * i + 2; + if (pool_[l].distance < dist) { + j = l; + } + if (r < size_ && pool_[r].distance < std::min(pool_[l].distance, dist)) { + j = r; + } + if (i == j) { + break; + } + pool_[i] = pool_[j]; + i = j; + } + pool_[i] = {id, dist}; + } + + size_t size_ = 0, capacity_; + std::vector> pool_; +}; // class MaxMinHeap + +} // namespace sparse + } // namespace knowhere diff --git a/src/common/comp/brute_force.cc b/src/common/comp/brute_force.cc index 042581c8f..4cf6c6bce 100644 --- a/src/common/comp/brute_force.cc +++ b/src/common/comp/brute_force.cc @@ -343,4 +343,95 @@ BruteForce::RangeSearch(const DataSetPtr base_dataset, const DataSetPtr query_da GetRangeSearchResult(result_dist_array, result_id_array, is_ip, nq, radius, range_filter, distances, ids, lims); return GenResultDataSet(nq, ids, distances, lims); } + +expected +BruteForce::SearchSparse(const DataSetPtr base_dataset, const DataSetPtr query_dataset, const Json& config, + const BitsetView& bitset) { + auto base_csr = base_dataset->GetTensor(); + size_t rows, cols, nnz; + const int64_t* indptr; + const int32_t* indices; + const float* data; + sparse::parse_csr_matrix(base_csr, rows, cols, nnz, indptr, indices, data); + + auto xq = query_dataset->GetTensor(); + auto nq = query_dataset->GetRows(); + + BruteForceConfig cfg; + std::string msg; + auto status = Config::Load(cfg, config, knowhere::SEARCH, &msg); + if (status != Status::success) { + return expected::Err(status, std::move(msg)); + } + + std::string metric_str = cfg.metric_type.value(); + auto result = Str2FaissMetricType(metric_str); + if (result.error() != Status::success) { + return expected::Err(result.error(), result.what()); + } + if (!IsMetricType(metric_str, metric::IP)) { + return expected::Err(Status::invalid_metric_type, + "Only IP metric type is supported for sparse vector"); + } + + int topk = cfg.k.value(); + auto labels = new sparse::label_t[nq * topk]; + auto distances = new float[nq * topk]; + + auto pool = ThreadPool::GetGlobalSearchThreadPool(); + std::vector> futs; + futs.reserve(nq); + for (int i = 0; i < nq; ++i) { + futs.emplace_back(pool->push([&, index = i] { + auto cur_labels = labels + topk * index; + auto cur_distances = distances + topk * index; + std::fill(cur_labels, cur_labels + topk, -1); + std::fill(cur_distances, cur_distances + topk, std::numeric_limits::quiet_NaN()); + + size_t len; + const int32_t* cur_indices; + const float* cur_data; + sparse::get_row(xq, index, len, cur_indices, cur_data); + if (len == 0) { + return Status::success; + } + std::unordered_map query; + for (size_t j = 0; j < len; ++j) { + query[cur_indices[j]] = cur_data[j]; + } + 
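            // Explanatory note on the loop below (not part of the patch): each base
            // row j is scored by walking its stored (index, value) pairs and looking
            // every index up in the query hash map, i.e. dist = IP(query, row_j).
            // Only rows with a positive score are pushed, so vectors that share no
            // dimension with the query never appear in the result, and the heap keeps
            // just the topk best scores per query.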
sparse::MaxMinHeap heap(topk); + for (size_t j = 0; j < rows; ++j) { + if (!bitset.empty() && bitset.test(j)) { + continue; + } + float dist = 0.0f; + for (int64_t k = indptr[j]; k < indptr[j + 1]; ++k) { + auto it = query.find(indices[k]); + if (it != query.end()) { + dist += it->second * data[k]; + } + } + if (dist > 0) { + heap.push(j, dist); + } + } + int result_size = heap.size(); + for (int64_t j = result_size - 1; j >= 0; --j) { + cur_labels[j] = heap.top().id; + cur_distances[j] = heap.top().distance; + heap.pop(); + } + return Status::success; + })); + } + for (auto& fut : futs) { + fut.wait(); + auto ret = fut.result().value(); + if (ret != Status::success) { + return expected::Err(ret, "failed to brute force search"); + } + } + return GenResultDataSet(nq, cfg.k.value(), labels, distances); +} + } // namespace knowhere diff --git a/src/index/sparse/sparse_index_node.cc b/src/index/sparse/sparse_index_node.cc new file mode 100644 index 000000000..2199f97c2 --- /dev/null +++ b/src/index/sparse/sparse_index_node.cc @@ -0,0 +1,200 @@ +// Copyright (C) 2019-2023 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "index/hnsw/hnsw_config.h" +#include "index/sparse/sparse_inverted_index.h" +#include "index/sparse/sparse_inverted_index_config.h" +#include "io/memory_io.h" +#include "knowhere/comp/thread_pool.h" +#include "knowhere/config.h" +#include "knowhere/dataset.h" +#include "knowhere/expected.h" +#include "knowhere/factory.h" +#include "knowhere/index_node.h" +#include "knowhere/log.h" +#include "knowhere/utils.h" + +namespace knowhere { + +using label_t = sparse::label_t; + +class SparseInvertedIndexNode : public IndexNode { + public: + explicit SparseInvertedIndexNode(const int32_t& /*version*/, const Object& /*object*/, bool use_wand) + : index_(nullptr), search_pool_(ThreadPool::GetGlobalSearchThreadPool()), use_wand_(use_wand) { + } + + ~SparseInvertedIndexNode() override { + if (index_ != nullptr) { + delete index_; + index_ = nullptr; + } + } + + Status + Train(const DataSet& dataset, const Config& config) override { + auto cfg = static_cast(config); + if (!IsMetricType(cfg.metric_type.value(), metric::IP)) { + LOG_KNOWHERE_ERROR_ << Type() << " only support metric_type: IP"; + return Status::invalid_metric_type; + } + auto drop_ratio_build = cfg.drop_ratio_build.value_or(0.0f); + auto index = new sparse::InvertedIndex(); + index->SetUseWand(use_wand_); + index->SetDropRatioBuild(drop_ratio_build); + if (index_ != nullptr) { + LOG_KNOWHERE_WARNING_ << Type() << " deleting old index during train"; + delete index_; + } + index_ = index; + return Status::success; + } + + Status + Add(const DataSet& dataset, const Config& config) override { + if (!index_) { + LOG_KNOWHERE_ERROR_ << "Could not add data to empty " << Type(); + return Status::empty_index; + } + return index_->Add(dataset.GetTensor()); + } + + [[nodiscard]] expected + Search(const DataSet& dataset, const Config& config, const BitsetView& bitset) const override { + if (!index_) { + 
LOG_KNOWHERE_ERROR_ << "Could not search empty " << Type(); + return expected::Err(Status::empty_index, "index not loaded"); + } + auto cfg = static_cast(config); + auto nq = dataset.GetRows(); + auto queries = dataset.GetTensor(); + auto k = cfg.k.value(); + auto refine_factor = cfg.refine_factor.value_or(10); + auto drop_ratio_search = cfg.drop_ratio_search.value_or(0.0f); + + auto p_id = new label_t[nq * k]; + auto p_dist = new float[nq * k]; + + std::vector> futs; + futs.reserve(nq); + for (int64_t idx = 0; idx < nq; ++idx) { + futs.emplace_back(search_pool_->push([&, idx = idx]() { + index_->Search(queries, idx, k, drop_ratio_search, p_dist + idx * k, p_id + idx * k, refine_factor, + bitset); + })); + } + for (auto& fut : futs) { + fut.wait(); + } + return GenResultDataSet(nq, k, p_id, p_dist); + } + + [[nodiscard]] expected>> + AnnIterator(const DataSet& dataset, const Config& cfg, const BitsetView& bitset) const override { + throw std::runtime_error("annIterator not supported for current index type"); + } + + [[nodiscard]] expected + RangeSearch(const DataSet& dataset, const Config& config, const BitsetView& bitset) const override { + throw std::runtime_error("RangeSearch not supported for current index type"); + } + + [[nodiscard]] expected + GetVectorByIds(const DataSet& dataset) const override { + throw std::runtime_error("GetVectorByIds not supported for current index type"); + } + + [[nodiscard]] bool + HasRawData(const std::string& metric_type) const override { + return true; + } + + [[nodiscard]] expected + GetIndexMeta(const Config& cfg) const override { + throw std::runtime_error("GetIndexMeta not supported for current index type"); + } + + Status + Serialize(BinarySet& binset) const override { + if (!index_) { + LOG_KNOWHERE_ERROR_ << "Could not serialize empty " << Type(); + return Status::empty_index; + } + MemoryIOWriter writer; + RETURN_IF_ERROR(index_->Save(writer)); + std::shared_ptr data(writer.data()); + binset.Append(Type(), data, writer.tellg()); + return Status::success; + } + + Status + Deserialize(const BinarySet& binset, const Config& config) override { + if (index_) { + LOG_KNOWHERE_WARNING_ << Type() << " has already been created, deleting old"; + delete index_; + index_ = nullptr; + } + auto binary = binset.GetByName(Type()); + if (binary == nullptr) { + LOG_KNOWHERE_ERROR_ << "Invalid BinarySet."; + return Status::invalid_binary_set; + } + MemoryIOReader reader(binary->data.get(), binary->size); + index_ = new sparse::InvertedIndex(); + return index_->Load(reader); + } + + Status + DeserializeFromFile(const std::string& filename, const Config& config) override { + throw std::runtime_error("DeserializeFromFile not supported for current index type"); + } + + [[nodiscard]] std::unique_ptr + CreateConfig() const override { + return std::make_unique(); + } + + // note that the Dim of a sparse vector index may change as new vectors are added + [[nodiscard]] int64_t + Dim() const override { + return index_ ? index_->n_cols() : 0; + } + + [[nodiscard]] int64_t + Size() const override { + return index_ ? index_->size() : 0; + } + + [[nodiscard]] int64_t + Count() const override { + return index_ ? index_->n_rows() : 0; + } + + [[nodiscard]] std::string + Type() const override { + return use_wand_ ? 
knowhere::IndexEnum::INDEX_SPARSE_WAND : knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX; + } + + private: + sparse::InvertedIndex* index_; + std::shared_ptr search_pool_; + const bool use_wand_; +}; // class SparseInvertedIndexNode + +KNOWHERE_REGISTER_GLOBAL(SPARSE_INVERTED_INDEX, [](const int32_t& version, const Object& object) { + return Index::Create(version, object, /* use_wand = */ false); +}); + +KNOWHERE_REGISTER_GLOBAL(SPARSE_WAND, [](const int32_t& version, const Object& object) { + return Index::Create(version, object, /* use_wand = */ true); +}); + +} // namespace knowhere diff --git a/src/index/sparse/sparse_inverted_index.h b/src/index/sparse/sparse_inverted_index.h new file mode 100644 index 000000000..bb1b852ab --- /dev/null +++ b/src/index/sparse/sparse_inverted_index.h @@ -0,0 +1,444 @@ +// Copyright (C) 2019-2023 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#ifndef SPARSE_INVERTED_INDEX_H +#define SPARSE_INVERTED_INDEX_H + +#include +#include +#include + +#include "io/memory_io.h" +#include "knowhere/bitsetview.h" +#include "knowhere/expected.h" +#include "knowhere/utils.h" + +namespace knowhere::sparse { + +// Not thread safe, concurrent access must be protected. Concurrent read operations are allowed. +// TODO: make class thread safe so we can perform concurrent add/search. 
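// Layout summary (descriptive note, not part of the patch): data_ and indptr_,
// declared at the bottom of this class, keep the inserted rows in CSR-like form
// with each row's (dim, value) pairs sorted by dim; inverted_lut_[dim] is the
// posting list of (row id, value) entries for that dimension. When use_wand_ is
// set, max_in_dim_[dim] additionally records the largest value seen in each
// dimension; search_wand() multiplies it by the matching query weight to obtain a
// per-term upper bound and skips rows that cannot beat the current top-k threshold.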
+template +class InvertedIndex { + public: + explicit InvertedIndex() { + indptr_.push_back(0); + } + + void + SetUseWand(bool use_wand) { + use_wand_ = use_wand; + } + + void + SetDropRatioBuild(float drop_ratio_build) { + drop_ratio_build_ = drop_ratio_build; + } + + Status + Save(MemoryIOWriter& writer) { + writeBinaryPOD(writer, use_wand_); + writeBinaryPOD(writer, drop_ratio_build_); + writeBinaryPOD(writer, n_rows_); + writeBinaryPOD(writer, n_cols_); + writeBinaryPOD(writer, nnz_); + for (size_t i = 0; i <= n_rows_; ++i) { + writeBinaryPOD(writer, indptr_[i]); + } + for (size_t i = 0; i < nnz_; ++i) { + writeBinaryPOD(writer, data_[i].first); + writeBinaryPOD(writer, data_[i].second); + } + for (size_t i = 0; i < n_cols_; ++i) { + auto lut = inverted_lut_[i]; + writeBinaryPOD(writer, lut.size()); + for (auto [idx, val] : lut) { + writeBinaryPOD(writer, idx); + writeBinaryPOD(writer, val); + } + } + if (use_wand_) { + for (size_t i = 0; i < n_cols_; ++i) { + writeBinaryPOD(writer, max_in_dim_[i]); + } + } + return Status::success; + } + + Status + Load(MemoryIOReader& reader) { + readBinaryPOD(reader, use_wand_); + readBinaryPOD(reader, drop_ratio_build_); + readBinaryPOD(reader, n_rows_); + readBinaryPOD(reader, n_cols_); + readBinaryPOD(reader, nnz_); + indptr_.resize(n_rows_ + 1); + for (size_t i = 0; i <= n_rows_; ++i) { + readBinaryPOD(reader, indptr_[i]); + } + data_.resize(nnz_); + for (size_t i = 0; i < nnz_; ++i) { + readBinaryPOD(reader, data_[i].first); + readBinaryPOD(reader, data_[i].second); + } + inverted_lut_.resize(n_cols_); + for (size_t i = 0; i < n_cols_; ++i) { + size_t lut_size; + readBinaryPOD(reader, lut_size); + inverted_lut_[i].resize(lut_size); + for (size_t j = 0; j < lut_size; ++j) { + readBinaryPOD(reader, inverted_lut_[i][j].id); + readBinaryPOD(reader, inverted_lut_[i][j].distance); + } + } + if (use_wand_) { + max_in_dim_.resize(n_cols_); + for (size_t i = 0; i < n_cols_; ++i) { + readBinaryPOD(reader, max_in_dim_[i]); + } + } + return Status::success; + } + + template + Status + Add(const void* csr_matrix) { + size_t rows, cols, nnz; + const IndPtrT* indptr; + const IndicesT* indices; + const T* data; + parse_csr_matrix(csr_matrix, rows, cols, nnz, indptr, indices, data); + + // TODO: benchmark performance: for growing segments with lots of small + // csr_matrix to add, it may be better to rely on the vector's internal + // memory management to avoid frequent reallocations caused by reserve. 
+ data_.reserve(nnz_ + nnz); + for (size_t i = 0; i < nnz; ++i) { + data_.emplace_back(indices[i], data[i]); + } + + for (size_t i = 1; i < rows + 1; ++i) { + indptr_.push_back(nnz_ + indptr[i]); + auto start = *(indptr_.rbegin() + 1); + auto end = *(indptr_.rbegin()); + // make sure each row in data_ is sorted by index + std::sort(data_.begin() + start, data_.begin() + end); + } + + if (n_cols_ < cols) { + n_cols_ = cols; + inverted_lut_.resize(n_cols_); + if (use_wand_) { + max_in_dim_.resize(n_cols_); + } + } + + for (size_t i = n_rows_; i < n_rows_ + rows; ++i) { + for (IndPtrT j = indptr_[i]; j < indptr_[i + 1]; ++j) { + auto [idx, val] = data_[j]; + inverted_lut_[idx].emplace_back(i, val); + if (use_wand_) { + max_in_dim_[idx] = std::max(max_in_dim_[idx], val); + } + } + } + + n_rows_ += rows; + nnz_ += nnz; + return Status::success; + } + + template + void + Search(const void* query_csr_matrix, int64_t q_id, size_t k, float drop_ratio_search, float* distances, + label_t* labels, size_t refine_factor, const BitsetView& bitset) const { + size_t len; + const IndicesT* indices; + const T* data; + get_row(query_csr_matrix, q_id, len, indices, data); + + // initially set result distances to NaN and labels to -1 + std::fill(distances, distances + k, std::numeric_limits::quiet_NaN()); + std::fill(labels, labels + k, -1); + if (len == 0) { + return; + } + + // if no data was dropped during both build and search, no refinement is needed. + if (drop_ratio_build_ == 0 && drop_ratio_search == 0) { + refine_factor = 1; + } + + std::vector> q_vec(len); + for (size_t i = 0; i < len; ++i) { + q_vec[i] = std::make_pair(indices[i], data[i]); + } + std::sort(q_vec.begin(), q_vec.end(), + [](const auto& lhs, const auto& rhs) { return std::abs(lhs.second) > std::abs(rhs.second); }); + + MaxMinHeap heap(k * refine_factor); + if (!use_wand_) { + search_brute_force(q_vec, drop_ratio_search, heap, bitset); + } else { + search_wand(q_vec, drop_ratio_search, heap, bitset); + } + + // no refinement needed + if (refine_factor == 1) { + collect_result(heap, distances, labels); + } else { + std::sort(q_vec.begin(), q_vec.end(), + [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; }); + refine_and_collect(q_vec, heap, k, distances, labels); + } + } + + [[nodiscard]] size_t + size() const { + size_t res = 0; + res += sizeof(*this); + res += sizeof(std::pair) * data_.capacity(); + res += sizeof(table_t) * indptr_.capacity(); + res += sizeof(std::vector>) * inverted_lut_.capacity(); + for (auto& lut : inverted_lut_) { + res += sizeof(Neighbor) * lut.capacity(); + } + if (use_wand_) { + res += sizeof(T) * max_in_dim_.capacity(); + } + return res; + } + + [[nodiscard]] size_t + n_rows() const { + return n_rows_; + } + + [[nodiscard]] size_t + n_cols() const { + return n_cols_; + } + + private: + [[nodiscard]] float + dot_product(const std::vector>& q_vec, table_t u) const { + float res = 0.0f; + table_t pu = indptr_[u]; + table_t pq = 0; + while (pu < indptr_[u + 1] && pq < q_vec.size()) { + auto [idx, val] = data_[pu]; + auto [q_idx, q_val] = q_vec[pq]; + if (idx == q_idx) { + res += float(val) * float(q_val); + pu++; + pq++; + } else if (idx < q_idx) { + pu++; + } else { + pq++; + } + } + return res; + } + + // find the top-k candidates using brute force search, k as specified by the capacity of the heap. 
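    // Note on drop_ratio (refers to the early break below; not part of the patch):
    // Search() sorts q_vec by absolute value in descending order before calling this,
    // so breaking once q_vec[0].second * drop_ratio exceeds the current term's value
    // effectively drops the low-weight tail of the query. When refine_factor > 1,
    // refine_and_collect() then re-scores the surviving candidates against the full
    // query to recover accuracy.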
+ void + search_brute_force(const std::vector>& q_vec, float drop_ratio, MaxMinHeap& heap, + const BitsetView& bitset) const { + std::vector scores(n_rows_, 0.0f); + for (auto [i, v] : q_vec) { + for (size_t j = 0; j < inverted_lut_[i].size(); j++) { + auto [idx, val] = inverted_lut_[i][j]; + scores[idx] += v * float(val); + } + if (q_vec[0].second * drop_ratio > v) { + break; + } + } + for (size_t i = 0; i < n_rows_; ++i) { + if ((bitset.empty() || !bitset.test(i)) && scores[i] != 0) { + heap.push(i, scores[i]); + } + } + } + + class Cursor { + public: + Cursor(const std::vector>& lut, size_t num_vec, float max_score, float q_value, + const BitsetView bitset) + : lut_(lut), num_vec_(num_vec), max_score_(max_score), q_value_(q_value), bitset_(bitset) { + while (loc_ < lut_.size() && !bitset_.empty() && bitset_.test(cur_vec_id())) { + loc_++; + } + } + Cursor(const Cursor& rhs) = delete; + + void + next() { + loc_++; + while (loc_ < lut_.size() && !bitset_.empty() && bitset_.test(cur_vec_id())) { + loc_++; + } + } + // advance loc until cur_vec_id() >= vec_id + void + seek(table_t vec_id) { + while (loc_ < lut_.size() && cur_vec_id() < vec_id) { + next(); + } + } + [[nodiscard]] table_t + cur_vec_id() const { + if (is_end()) { + return num_vec_; + } + return lut_[loc_].id; + } + T + cur_distance() const { + return lut_[loc_].distance; + } + [[nodiscard]] bool + is_end() const { + return loc_ >= size(); + } + [[nodiscard]] float + q_value() const { + return q_value_; + } + [[nodiscard]] size_t + size() const { + return lut_.size(); + } + [[nodiscard]] float + max_score() const { + return max_score_; + } + + private: + const std::vector>& lut_; + size_t loc_ = 0; + size_t num_vec_ = 0; + float max_score_ = 0.0f; + float q_value_ = 0.0f; + const BitsetView bitset_; + }; // class Cursor + + void + search_wand(std::vector>& q_vec, float drop_ratio, MaxMinHeap& heap, + const BitsetView& bitset) const { + auto q_dim = q_vec.size(); + std::vector> cursors(q_dim); + for (size_t i = 0; i < q_dim; ++i) { + auto [idx, val] = q_vec[i]; + if (q_vec[0].second * drop_ratio > val) { + cursors.resize(i); + break; + } + cursors[i] = std::make_shared(inverted_lut_[idx], n_rows_, max_in_dim_[idx] * val, val, bitset); + } + auto sort_cursors = [&cursors] { + std::sort(cursors.begin(), cursors.end(), + [](auto& x, auto& y) { return x->cur_vec_id() < y->cur_vec_id(); }); + }; + sort_cursors(); + auto score_above_threshold = [&heap](float x) { return !heap.full() || x > heap.top().distance; }; + while (true) { + float upper_bound = 0; + size_t pivot; + bool found_pivot = false; + for (pivot = 0; pivot < cursors.size(); ++pivot) { + if (cursors[pivot]->is_end()) { + break; + } + upper_bound += cursors[pivot]->max_score(); + if (score_above_threshold(upper_bound)) { + found_pivot = true; + break; + } + } + if (!found_pivot) { + break; + } + table_t pivot_id = cursors[pivot]->cur_vec_id(); + if (pivot_id == cursors[0]->cur_vec_id()) { + float score = 0; + for (auto& cursor : cursors) { + if (cursor->cur_vec_id() != pivot_id) { + break; + } + score += cursor->cur_distance() * cursor->q_value(); + cursor->next(); + } + heap.push(pivot_id, score); + sort_cursors(); + } else { + uint64_t next_list = pivot; + for (; cursors[next_list]->cur_vec_id() == pivot_id; --next_list) { + } + cursors[next_list]->seek(pivot_id); + for (size_t i = next_list + 1; i < cursors.size(); ++i) { + if (cursors[i]->cur_vec_id() >= cursors[i - 1]->cur_vec_id()) { + break; + } + std::swap(cursors[i], cursors[i - 1]); + } + } + } + } + + void + 
refine_and_collect(const std::vector>& q_vec, MaxMinHeap& inaccurate, size_t k, + float* distances, label_t* labels) const { + std::priority_queue, std::vector>, std::greater>> heap; + + while (!inaccurate.empty()) { + auto [u, d] = inaccurate.top(); + inaccurate.pop(); + + auto dist_acc = dot_product(q_vec, u); + if (heap.size() < k) { + heap.emplace(u, dist_acc); + } else if (heap.top().distance < dist_acc) { + heap.pop(); + heap.emplace(u, dist_acc); + } + } + collect_result(heap, distances, labels); + } + + template + void + collect_result(HeapType& heap, float* distances, label_t* labels) const { + int cnt = heap.size(); + for (auto i = cnt - 1; i >= 0; --i) { + labels[i] = heap.top().id; + distances[i] = heap.top().distance; + heap.pop(); + } + } + + size_t n_rows_ = 0; + size_t n_cols_ = 0; + size_t nnz_ = 0; + std::vector>> inverted_lut_; + + std::vector> data_; + std::vector indptr_; + + bool use_wand_ = false; + float drop_ratio_build_ = 0; + std::vector max_in_dim_; + +}; // class InvertedIndex + +} // namespace knowhere::sparse + +#endif // SPARSE_INVERTED_INDEX_H diff --git a/src/index/sparse/sparse_inverted_index_config.h b/src/index/sparse/sparse_inverted_index_config.h new file mode 100644 index 000000000..d142a9498 --- /dev/null +++ b/src/index/sparse/sparse_inverted_index_config.h @@ -0,0 +1,47 @@ +// Copyright (C) 2019-2023 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#ifndef SPARSE_INVERTED_INDEX_CONFIG_H +#define SPARSE_INVERTED_INDEX_CONFIG_H + +#include "knowhere/comp/index_param.h" +#include "knowhere/config.h" + +namespace knowhere { + +class SparseInvertedIndexConfig : public BaseConfig { + public: + CFG_FLOAT drop_ratio_build; + CFG_FLOAT drop_ratio_search; + CFG_INT refine_factor; + KNOHWERE_DECLARE_CONFIG(SparseInvertedIndexConfig) { + KNOWHERE_CONFIG_DECLARE_FIELD(drop_ratio_build) + .description("drop ratio for build") + .set_default(0.0f) + .set_range(0.0f, 1.0f) + .for_train(); + KNOWHERE_CONFIG_DECLARE_FIELD(drop_ratio_search) + .description("drop ratio for search") + .set_default(0.0f) + .set_range(0.0f, 1.0f) + .for_search() + .for_range_search(); + KNOWHERE_CONFIG_DECLARE_FIELD(refine_factor) + .description("refine factor") + .set_default(10) + .for_search() + .for_range_search(); + } +}; // class SparseInvertedIndexConfig + +} // namespace knowhere + +#endif // SPARSE_INVERTED_INDEX_CONFIG_H diff --git a/tests/ut/test_sparse.cc b/tests/ut/test_sparse.cc new file mode 100644 index 000000000..439ee1380 --- /dev/null +++ b/tests/ut/test_sparse.cc @@ -0,0 +1,168 @@ +// Copyright (C) 2019-2023 Zilliz. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations under the License. + +#include "catch2/catch_test_macros.hpp" +#include "catch2/generators/catch_generators.hpp" +#include "knowhere/bitsetview.h" +#include "knowhere/comp/brute_force.h" +#include "knowhere/comp/index_param.h" +#include "knowhere/comp/knowhere_config.h" +#include "knowhere/factory.h" +#include "utils.h" + +TEST_CASE("Test Mem Sparse Index With Float Vector", "[float metrics]") { + auto [nb, dim, doc_sparsity, query_sparsity] = GENERATE(table({ + // 300 dim, avg doc nnz 12, avg query nnz 9 + {2000, 300, 0.95, 0.97}, + // 300 dim, avg doc nnz 9, avg query nnz 3 + {2000, 300, 0.97, 0.99}, + // 3000 dim, avg doc nnz 90, avg query nnz 30 + {20000, 3000, 0.97, 0.99}, + })); + auto topk = 5; + int64_t nq = GENERATE(10, 100); + + auto [drop_ratio_build, drop_ratio_search] = GENERATE(table({ + {0.0, 0.0}, + {0.0, 0.32}, + {0.32, 0.6}, + })); + + auto metric = knowhere::metric::IP; + auto version = GenTestVersionList(); + + auto base_gen = [=]() { + knowhere::Json json; + json[knowhere::meta::DIM] = dim; + json[knowhere::meta::METRIC_TYPE] = metric; + json[knowhere::meta::TOPK] = topk; + return json; + }; + + auto sparse_inverted_index_gen = [base_gen, drop_ratio_build, drop_ratio_search]() { + knowhere::Json json = base_gen(); + json[knowhere::indexparam::DROP_RATIO_BUILD] = drop_ratio_build; + json[knowhere::indexparam::DROP_RATIO_SEARCH] = drop_ratio_search; + return json; + }; + + const auto train_ds = GenSparseDataSet(nb, dim, doc_sparsity); + const auto query_ds = GenSparseDataSet(nq, dim, query_sparsity); + + const knowhere::Json conf = { + {knowhere::meta::METRIC_TYPE, metric}, + {knowhere::meta::TOPK, topk}, + }; + + auto check_distance_decreasing = [](const knowhere::DataSet& ds) { + auto nq = ds.GetRows(); + auto k = ds.GetDim(); + auto* distances = ds.GetDistance(); + auto* ids = ds.GetIds(); + for (auto i = 0; i < nq; ++i) { + for (auto j = 0; j < k - 1; ++j) { + if (ids[i * k + j] == -1 || ids[i * k + j + 1] == -1) { + break; + } + REQUIRE(distances[i * k + j] >= distances[i * k + j + 1]); + } + } + }; + + auto check_result_match_filter = [](const knowhere::DataSet& ds, const knowhere::BitsetView& bitset) { + auto nq = ds.GetRows(); + auto k = ds.GetDim(); + auto* ids = ds.GetIds(); + for (auto i = 0; i < nq; ++i) { + for (auto j = 0; j < k; ++j) { + if (ids[i * k + j] == -1) { + break; + } + REQUIRE(!bitset.test(ids[i * k + j])); + } + } + }; + + auto gt = knowhere::BruteForce::SearchSparse(train_ds, query_ds, conf, nullptr); + check_distance_decreasing(*gt.value()); + + SECTION("Test Search") { + using std::make_tuple; + auto [name, gen] = GENERATE_REF(table>({ + make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, sparse_inverted_index_gen), + make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, sparse_inverted_index_gen), + })); + auto idx = knowhere::IndexFactory::Instance().Create(name, version); + auto cfg_json = gen().dump(); + CAPTURE(name, cfg_json); + knowhere::Json json = knowhere::Json::parse(cfg_json); + REQUIRE(idx.Type() == name); + REQUIRE(idx.Build(*train_ds, json) == knowhere::Status::success); + REQUIRE(idx.Size() > 0); + REQUIRE(idx.Count() == nb); + + 
knowhere::BinarySet bs; + REQUIRE(idx.Serialize(bs) == knowhere::Status::success); + REQUIRE(idx.Deserialize(bs, json) == knowhere::Status::success); + + auto results = idx.Search(*query_ds, json, nullptr); + REQUIRE(results.has_value()); + float recall = GetKNNRecall(*gt.value(), *results.value()); + check_distance_decreasing(*results.value()); + auto drop_ratio_build = json[knowhere::indexparam::DROP_RATIO_BUILD].get(); + auto drop_ratio_search = json[knowhere::indexparam::DROP_RATIO_SEARCH].get(); + if (drop_ratio_build == 0 && drop_ratio_search == 0) { + REQUIRE(recall == 1); + } else { + // most test cases are above 0.95, only a few between 0.9 and 0.95 + REQUIRE(recall >= 0.85); + } + } + + SECTION("Test Search with Bitset") { + using std::make_tuple; + auto [name, gen] = GENERATE_REF(table>({ + make_tuple(knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, sparse_inverted_index_gen), + make_tuple(knowhere::IndexEnum::INDEX_SPARSE_WAND, sparse_inverted_index_gen), + })); + auto idx = knowhere::IndexFactory::Instance().Create(name, version); + auto cfg_json = gen().dump(); + CAPTURE(name, cfg_json); + knowhere::Json json = knowhere::Json::parse(cfg_json); + REQUIRE(idx.Type() == name); + REQUIRE(idx.Build(*train_ds, json) == knowhere::Status::success); + REQUIRE(idx.Size() > 0); + REQUIRE(idx.Count() == nb); + + auto gen_bitset_fn = GENERATE(GenerateBitsetWithFirstTbitsSet, GenerateBitsetWithRandomTbitsSet); + auto bitset_percentages = GENERATE(0.4f, 0.9f); + + auto bitset_data = gen_bitset_fn(nb, bitset_percentages * nb); + knowhere::BitsetView bitset(bitset_data.data(), nb); + auto filter_gt = knowhere::BruteForce::SearchSparse(train_ds, query_ds, conf, bitset); + check_result_match_filter(*filter_gt.value(), bitset); + + auto results = idx.Search(*query_ds, json, bitset); + check_result_match_filter(*results.value(), bitset); + + REQUIRE(results.has_value()); + float recall = GetKNNRecall(*filter_gt.value(), *results.value()); + check_distance_decreasing(*results.value()); + + auto drop_ratio_build = json[knowhere::indexparam::DROP_RATIO_BUILD].get(); + auto drop_ratio_search = json[knowhere::indexparam::DROP_RATIO_SEARCH].get(); + if (drop_ratio_build == 0 && drop_ratio_search == 0) { + REQUIRE(recall == 1); + } else { + REQUIRE(recall >= 0.8); + } + } +} diff --git a/tests/ut/utils.h b/tests/ut/utils.h index 61e6fdaac..ef75e3cd2 100644 --- a/tests/ut/utils.h +++ b/tests/ut/utils.h @@ -232,3 +232,71 @@ inline auto GenTestVersionList() { return GENERATE(as{}, knowhere::Version::GetCurrentVersion().VersionNumber()); } + +// Generate a sparse dataset with given sparsity. The indices of each row may be not ordered. 
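// For reference (not part of the patch): nnz is computed below as
// rows * cols * (1 - sparsity), so e.g. 300 dims at sparsity 0.97 gives an average
// of ~9 non-zeros per row (individual rows vary since positions are drawn at
// random), which matches the per-case comments in test_sparse.cc.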
+template +inline knowhere::DataSetPtr +GenSparseDataSet(ShapeT rows, ShapeT cols, float sparsity, int seed = 42) { + ShapeT nnz = static_cast(rows * cols * (1.0f - sparsity)); + + std::mt19937 rng(seed); + auto real_distrib = std::uniform_real_distribution(0, 1); + auto int_distrib = std::uniform_int_distribution(0, 100); + + auto row_distrib = std::uniform_int_distribution(0, rows - 1); + auto col_distrib = std::uniform_int_distribution(0, cols - 1); + auto val_gen = [&rng, &real_distrib, &int_distrib]() -> ValueT { + if constexpr (std::is_floating_point::value) { + return real_distrib(rng); + } else { + static_assert(std::is_integral::value); + return static_cast(int_distrib(rng)); + } + }; + + std::vector> data(rows); + + for (ShapeT i = 0; i < nnz; ++i) { + auto row = row_distrib(rng); + while (data[row].size() == cols) { + row = row_distrib(rng); + } + auto col = col_distrib(rng); + while (data[row].find(col) != data[row].end()) { + col = col_distrib(rng); + } + auto val = val_gen(); + data[row][col] = val; + } + + int size = 3 * sizeof(ShapeT) + nnz * (sizeof(ValueT) + sizeof(IndicesT)) + (rows + 1) * sizeof(IndPtrT); + void* ts = new char[size]; + memset(ts, 0, size); + + char* p = static_cast(ts); + std::memcpy(p, &rows, sizeof(ShapeT)); + p += sizeof(ShapeT); + std::memcpy(p, &cols, sizeof(ShapeT)); + p += sizeof(ShapeT); + std::memcpy(p, &nnz, sizeof(ShapeT)); + p += sizeof(ShapeT); + + IndPtrT* indptr = reinterpret_cast(p); + p += (rows + 1) * sizeof(IndPtrT); + IndicesT* indices = reinterpret_cast(p); + p += nnz * sizeof(IndicesT); + ValueT* data_p = reinterpret_cast(p); + + for (ShapeT i = 0; i < rows; ++i) { + indptr[i + 1] = indptr[i] + static_cast(data[i].size()); + size_t cnt = 0; + for (auto [idx, val] : data[i]) { + indices[indptr[i] + cnt] = idx; + data_p[indptr[i] + cnt] = val; + cnt++; + } + } + auto ds = knowhere::GenDataSet(rows, cols, ts); + ds->SetIsOwner(true); + return ds; +}
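
For reviewers, a minimal usage sketch of the new index types through the existing factory API, following the flow exercised in tests/ut/test_sparse.cc. This is an illustration only: the dataset sizes, drop ratios, and the test-only helper GenSparseDataSet (added to tests/ut/utils.h above) are placeholders taken from the tests, not a prescribed configuration.

#include "knowhere/comp/index_param.h"
#include "knowhere/factory.h"
#include "utils.h"  // test-only helper GenSparseDataSet

void SparseIndexUsageSketch() {
    const int64_t nb = 2000, nq = 10, dim = 300, topk = 5;

    // Build/search parameters for the sparse index; only metric::IP is accepted.
    knowhere::Json json;
    json[knowhere::meta::DIM] = dim;
    json[knowhere::meta::METRIC_TYPE] = knowhere::metric::IP;
    json[knowhere::meta::TOPK] = topk;
    json[knowhere::indexparam::DROP_RATIO_BUILD] = 0.32;
    json[knowhere::indexparam::DROP_RATIO_SEARCH] = 0.6;

    // CSR datasets produced by the test helper; rows/dim live in the CSR header.
    auto train_ds = GenSparseDataSet(nb, dim, /*sparsity=*/0.95);
    auto query_ds = GenSparseDataSet(nq, dim, /*sparsity=*/0.97);

    auto version = knowhere::Version::GetCurrentVersion().VersionNumber();
    auto idx = knowhere::IndexFactory::Instance().Create(
        knowhere::IndexEnum::INDEX_SPARSE_INVERTED_INDEX, version);  // or INDEX_SPARSE_WAND

    if (idx.Build(*train_ds, json) != knowhere::Status::success) {
        return;
    }

    // nullptr bitset means no filtering; ids are -1 padded when fewer than topk hits.
    auto res = idx.Search(*query_ds, json, nullptr);
    if (res.has_value()) {
        auto* ids = res.value()->GetIds();
        auto* distances = res.value()->GetDistance();
        (void)ids;
        (void)distances;
    }
}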