diff --git a/benchmark/local_infinity/CMakeLists.txt b/benchmark/local_infinity/CMakeLists.txt index 85a7577b34..f9a0adb20b 100644 --- a/benchmark/local_infinity/CMakeLists.txt +++ b/benchmark/local_infinity/CMakeLists.txt @@ -82,6 +82,25 @@ target_link_libraries( jma ) +# ######################################## +add_executable(sparse_benchmark + ./sparse/sparse_benchmark.cpp +) + +target_include_directories(sparse_benchmark PUBLIC "${CMAKE_SOURCE_DIR}/src") +target_link_libraries( + sparse_benchmark + infinity_core + benchmark_profiler + sql_parser + onnxruntime_mlas + zsv_parser + newpfor + fastpfor + lz4.a + atomic.a + jma +) if(ENABLE_JEMALLOC) target_link_libraries(infinity_benchmark jemalloc.a) diff --git a/benchmark/local_infinity/sparse/sparse_benchmark.cpp b/benchmark/local_infinity/sparse/sparse_benchmark.cpp new file mode 100644 index 0000000000..12166416e9 --- /dev/null +++ b/benchmark/local_infinity/sparse/sparse_benchmark.cpp @@ -0,0 +1,247 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+#include <algorithm>
+#include <iostream>
+#include <stdexcept>
+
+import stl;
+import file_system;
+import local_file_system;
+import file_system_type;
+import compilation_config;
+import third_party;
+import profiler;
+
+import linscan_alg;
+import sparse_iter;
+
+using namespace infinity;
+
+// const f32 error_bound = 1e-6;
+// Progress is printed every `log_interval` rows; 0 disables progress output.
+const int log_interval = 10000;
+
+// Reads a CSR matrix from `data_path`.
+//
+// File layout, in read order:
+//   i64 nrow, i64 ncol, i64 nnz,
+//   i64 indptr[nrow + 1], i32 indices[nnz], f32 data[nnz].
+//
+// Throws std::runtime_error when the file does not exist or when the header,
+// indptr or indices are inconsistent.
+// NOTE(review): the results of Read() are not checked, so a truncated file is
+// only detected if it happens to break one of the validations below.
+SparseMatrix DecodeSparseDataset(const Path &data_path) {
+    LocalFileSystem fs;
+    if (!fs.Exists(data_path)) {
+        throw std::runtime_error(fmt::format("Data path: {} does not exist.", data_path.string()));
+    }
+    auto file_handler = fs.OpenFile(data_path.string(), FileFlags::READ_FLAG, FileLockType::kNoLock);
+    i64 nrow = 0;
+    i64 ncol = 0;
+    i64 nnz = 0;
+    file_handler->Read(&nrow, sizeof(nrow));
+    file_handler->Read(&ncol, sizeof(ncol));
+    file_handler->Read(&nnz, sizeof(nnz));
+    // Negative sizes would turn into enormous allocations below.
+    if (nrow < 0 || ncol < 0 || nnz < 0) {
+        throw std::runtime_error("Invalid matrix header.");
+    }
+
+    auto indptr = MakeUnique<i64[]>(nrow + 1);
+    file_handler->Read(indptr.get(), sizeof(i64) * (nrow + 1));
+    // Rows are sliced as [indptr[i], indptr[i + 1]), so the offsets must start at 0,
+    // end at nnz and never decrease.
+    if (indptr[0] != 0 || indptr[nrow] != nnz || !std::is_sorted(indptr.get(), indptr.get() + nrow + 1)) {
+        throw std::runtime_error("Invalid indptr.");
+    }
+
+    auto indices = MakeUnique<i32[]>(nnz);
+    file_handler->Read(indices.get(), sizeof(i32) * nnz);
+    // Every column index must lie in [0, ncol).
+    {
+        bool check = std::all_of(indices.get(), indices.get() + nnz, [ncol](i32 ele) { return ele >= 0 && ele < ncol; });
+        if (!check) {
+            throw std::runtime_error("Invalid indices.");
+        }
+    }
+
+    auto data = MakeUnique<f32[]>(nnz);
+    file_handler->Read(data.get(), sizeof(f32) * nnz);
+    return {std::move(data), std::move(indices), std::move(indptr), nrow, ncol, nnz};
+}
+
+// Reads a ground-truth file holding `query_n * top_k` answers.
+//
+// File layout, in read order:
+//   u32 query count, u32 top_k,
+//   u32 indices[query_n * top_k], f32 scores[query_n * top_k].
+//
+// Returns {indices, scores}; the answers of query i start at offset i * top_k.
+// Throws std::runtime_error when the file is missing, has an unexpected size,
+// or its header disagrees with `query_n` / `top_k`.
+Pair<UniquePtr<u32[]>, UniquePtr<f32[]>> DecodeGroundtruth(const Path &groundtruth_path, u32 top_k, u32 query_n) {
+    LocalFileSystem fs;
+    if (!fs.Exists(groundtruth_path)) {
+        throw std::runtime_error(fmt::format("Groundtruth path: {} does not exist.", groundtruth_path.string()));
+    }
+    auto file_handler = fs.OpenFile(groundtruth_path.string(), FileFlags::READ_FLAG, FileLockType::kNoLock);
+    // Widen before multiplying: the product of two u32 values can wrap around.
+    const SizeT answer_n = static_cast<SizeT>(query_n) * top_k;
+    SizeT file_size = fs.GetFileSize(*file_handler);
+    if (file_size != sizeof(u32) * 2 + (sizeof(u32) + sizeof(float)) * answer_n) {
+        throw std::runtime_error("Invalid groundtruth file format");
+    }
+    {
+        u32 ans_n = 0;
+        file_handler->Read(&ans_n, sizeof(ans_n));
+        u32 top_k1 = 0;
+        file_handler->Read(&top_k1, sizeof(top_k1));
+        if (ans_n != query_n || top_k1 != top_k) {
+            throw std::runtime_error("Invalid groundtruth file format");
+        }
+    }
+    auto indices = MakeUnique<u32[]>(answer_n);
+    file_handler->Read(indices.get(), sizeof(u32) * answer_n);
+    auto scores = MakeUnique<f32[]>(answer_n);
+    file_handler->Read(scores.get(), sizeof(f32) * answer_n);
+    return {std::move(indices), std::move(scores)};
+}
+
+// Loads the CSR dataset at `data_path` and inserts every row into `index`,
+// using the row number as the document id.
+void ImportData(LinScan &index, const Path &data_path) {
+    const SparseMatrix mat = DecodeSparseDataset(data_path);
+    for (SparseMatrixIter iter(mat); iter.HasNext(); iter.Next()) {
+        const u32 doc_id = iter.row_id();
+        index.Insert(iter.val(), doc_id);
+
+        if (log_interval != 0 && doc_id % log_interval == 0) {
+            std::cout << fmt::format("Inserting doc {}\n", doc_id);
+        }
+    }
+}
+
+// Runs every row of the CSR query file at `query_path` against `index`.
+// Returns one {doc ids, scores} pair per query, in query order.
+Vector<Pair<Vector<u32>, Vector<f32>>> QueryData(const LinScan &index, u32 top_k, const Path &query_path) {
+    const SparseMatrix mat = DecodeSparseDataset(query_path);
+    Vector<Pair<Vector<u32>, Vector<f32>>> res;
+    res.reserve(mat.nrow_); // exactly one result per query row
+    for (SparseMatrixIter iter(mat); iter.HasNext(); iter.Next()) {
+        auto [indices, score] = index.Query(iter.val(), top_k);
+        res.emplace_back(std::move(indices), std::move(score));
+
+        if (log_interval != 0 && iter.row_id() % log_interval == 0) {
+            std::cout << fmt::format("Querying doc {}\n", iter.row_id());
+        }
+    }
+    return res;
+}
+
+// Debug helper: prints the returned (id, score) pairs of one query on the first
+// line and the ground-truth pairs on the second.
+void PrintQuery(u32 query_id, const u32 *gt_indices, const f32 *gt_scores, u32 gt_size, const Vector<u32> &indices, const Vector<f32> &scores) {
+    std::cout << fmt::format("Query {}\n", query_id);
+    // The result may hold fewer than gt_size entries; never read past its end.
+    const SizeT result_n = std::min<SizeT>(gt_size, std::min(indices.size(), scores.size()));
+    for (SizeT i = 0; i < result_n; ++i) {
+        std::cout << fmt::format("{} {}, ", indices[i], scores[i]);
+    }
+    std::cout << "\n";
+    for (u32 i = 0; i < gt_size; ++i) {
+        std::cout << fmt::format("{} {}, ", gt_indices[i], gt_scores[i]);
+    }
+    std::cout << "\n";
+}
+
+// Computes recall of `results` against the ground-truth file: the fraction of
+// ground-truth ids that also appear in the corresponding query result.
+// Returns 0 when there is nothing to compare (no queries or top_k == 0).
+f32 CheckGroundtruth(const Path &groundtruth_path, const Vector<Pair<Vector<u32>, Vector<f32>>> &results, u32 top_k) {
+    u32 query_n = results.size();
+    auto [gt_indices_list, gt_score_list] = DecodeGroundtruth(groundtruth_path, top_k, query_n);
+    const SizeT answer_n = static_cast<SizeT>(query_n) * top_k;
+    if (answer_n == 0) {
+        return 0.0f; // avoids 0 / 0
+    }
+    SizeT recall_n = 0;
+    for (u32 i = 0; i < query_n; ++i) {
+        const auto &[indices, scores] = results[i];
+        const u32 *gt_indices = gt_indices_list.get() + static_cast<SizeT>(i) * top_k;
+
+        // const f32 *gt_score = gt_score_list.get() + i * top_k;
+        // PrintQuery(i, gt_indices, gt_score, top_k, indices, scores);
+        HashSet<u32> indices_set(indices.begin(), indices.end());
+        for (u32 j = 0; j < top_k; ++j) {
+            if (indices_set.contains(gt_indices[j])) {
+                ++recall_n;
+            }
+        }
+    }
+    return static_cast<f32>(recall_n) / static_cast<f32>(answer_n);
+}
+
+// Benchmark driver: imports the selected SPLADE dataset into a LinScan index,
+// runs the dev queries, then prints import time, query time and recall.
+// Returns 0 on success, the CLI11 exit code on a command-line error and 1 when
+// loading or checking the data fails.
+int main(int argc, char *argv[]) {
+    CLI::App app{"sparse_benchmark"};
+
+    // TODO: a `--mode` option (import / query) was sketched here but is not implemented;
+    // the benchmark always imports and then queries.
+
+    enum class DataSetType : u8 {
+        kSmall,
+        k1M,
+        kFull,
+    };
+    Map<String, DataSetType> dataset_type_map = {
+        {"small", DataSetType::kSmall},
+        {"1M", DataSetType::k1M},
+        {"full", DataSetType::kFull},
+    };
+    DataSetType dataset_type = DataSetType::kSmall;
+    app.add_option("--dataset", dataset_type, "Dataset type")->required()->transform(CLI::CheckedTransformer(dataset_type_map, CLI::ignore_case));
+
+    try {
+        app.parse(argc, argv);
+    } catch (const CLI::ParseError &e) {
+        return app.exit(e);
+    }
+
+    // The helpers report bad input by throwing; turn that into an error message
+    // and a non-zero exit code instead of std::terminate.
+    try {
+        Path dataset_dir = Path(test_data_path()) / "benchmark" / "splade";
+        Path query_path = dataset_dir / "queries.dev.csr";
+        Path data_path = dataset_dir;
+        Path groundtruth_path = dataset_dir;
+        switch (dataset_type) {
+            case DataSetType::kSmall: {
+                data_path /= "base_small.csr";
+                groundtruth_path /= "base_small.dev.gt";
+                break;
+            }
+            case DataSetType::k1M: {
+                data_path /= "base_1M.csr";
+                groundtruth_path /= "base_1M.dev.gt";
+                break;
+            }
+            case DataSetType::kFull: {
+                data_path /= "base_full.csr";
+                groundtruth_path /= "base_full.dev.gt";
+                break;
+            }
+            default: {
+                throw std::runtime_error(fmt::format("Unsupported dataset type: {}.", static_cast<u8>(dataset_type)));
+            }
+        }
+        u32 top_k = 10;
+
+        BaseProfiler profiler;
+
+        LinScan index;
+
+        profiler.Begin();
+        ImportData(index, data_path);
+        profiler.End();
+        std::cout << fmt::format("Import data time: {}\n", profiler.ElapsedToString(1000));
+
+        profiler.Begin();
+        auto query_result = QueryData(index, top_k, query_path);
+        profiler.End();
+        std::cout << fmt::format("Query data time: {}\n", profiler.ElapsedToString(1000));
+
+        f32 recall = CheckGroundtruth(groundtruth_path, query_result, top_k);
+        std::cout << fmt::format("Recall: {}\n", recall);
+    } catch (const std::exception &e) {
+        std::cerr << "sparse_benchmark failed: " << e.what() << "\n";
+        return 1;
+    }
+    return 0;
+}
diff --git a/src/storage/knn_index/sparse/linscan_alg.cpp b/src/storage/knn_index/sparse/linscan_alg.cpp
new file mode 100644
index 0000000000..8e97c437fb
--- /dev/null
+++ b/src/storage/knn_index/sparse/linscan_alg.cpp
@@ -0,0 +1,62 @@
+// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+module;
+
+#include <algorithm>
+#include <numeric>
+
+module linscan_alg;
+
+import stl;
+
+namespace infinity {
+
+// Appends one posting (doc_id, value) to the posting list of every non-zero
+// dimension of `vec`.
+//
+// Query() indexes its score buffer by document id, so the row count must cover
+// the largest id seen so far. Counting calls instead would let an out-of-order
+// or sparse id write past the end of that buffer.
+// NOTE(review): doc_id == max(u32) would wrap `doc_id + 1` to 0; not handled.
+void LinScan::Insert(const SparseVecRef &vec, u32 doc_id) {
+    for (i32 i = 0; i < vec.nnz_; ++i) {
+        const u32 indice = vec.indices_[i];
+        inverted_idx_[indice].push_back(Posting{doc_id, vec.data_[i]});
+    }
+    row_num_ = std::max(row_num_, doc_id + 1);
+}
+
+// Exact top-k search by inner product.
+//
+// Accumulates, for every document sharing a dimension with `query`, the sum of
+// the products of the matching values, then selects the `top_k` highest-scoring
+// ids. Ids that share no dimension with the query keep a score of 0.
+//
+// Returns {doc ids, scores}, ordered by descending score, holding
+// min(top_k, row_num_) entries. The relative order of equal scores is unspecified.
+Pair<Vector<u32>, Vector<f32>> LinScan::Query(const SparseVecRef &query, u32 top_k) const {
+    // Explicitly zero-filled: the accumulation below relies on a zero start value.
+    Vector<f32> scores(row_num_, 0.0f);
+    for (i32 i = 0; i < query.nnz_; ++i) {
+        const u32 indice = query.indices_[i];
+        const f32 val = query.data_[i];
+
+        const auto it = inverted_idx_.find(indice);
+        if (it == inverted_idx_.end()) {
+            continue; // no document uses this dimension
+        }
+        for (const auto &posting : it->second) {
+            scores[posting.doc_id_] += val * posting.val_;
+        }
+    }
+
+    const u32 result_n = std::min(top_k, row_num_);
+    Vector<u32> res(row_num_);
+    std::iota(res.begin(), res.end(), 0);
+    // Only the first result_n positions need to be ordered.
+    std::partial_sort(res.begin(), res.begin() + result_n, res.end(), [&scores](u32 i1, u32 i2) { return scores[i1] > scores[i2]; });
+    res.resize(result_n);
+    Vector<f32> res_score(result_n);
+    std::transform(res.begin(), res.end(), res_score.begin(), [&scores](u32 i) { return scores[i]; });
+    return {std::move(res), std::move(res_score)};
+}
+
+} // namespace infinity
\ No newline at end of file
diff --git a/src/storage/knn_index/sparse/linscan_alg.cppm b/src/storage/knn_index/sparse/linscan_alg.cppm
new file mode 100644
index 0000000000..e567739033
--- /dev/null
+++ b/src/storage/knn_index/sparse/linscan_alg.cppm
@@ -0,0 +1,42 @@
+// Copyright(C) 
2023 InfiniFlow, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+module;
+
+export module linscan_alg;
+
+import stl;
+import sparse_iter;
+
+namespace infinity {
+
+// One entry of a posting list: the value a document holds in a given dimension.
+struct Posting {
+    u32 doc_id_;
+    f32 val_;
+};
+
+// Inverted-index based sparse vector search: each dimension maps to the list of
+// (document, value) pairs that are non-zero in that dimension.
+export class LinScan {
+public:
+    // Adds the non-zero entries of `vec` to the index under document id `doc_id`.
+    // Query() addresses its score buffer of row_num() entries by document id, so
+    // ids are expected to stay below row_num().
+    void Insert(const SparseVecRef &vec, u32 doc_id);
+
+    // Returns up to `top_k` {doc ids, scores} with the highest inner product
+    // against `query`, ordered by descending score.
+    Pair<Vector<u32>, Vector<f32>> Query(const SparseVecRef &query, u32 top_k) const;
+
+    // Current value of the row counter maintained by Insert().
+    u32 row_num() const { return row_num_; }
+
+private:
+    HashMap<u32, Vector<Posting>> inverted_idx_; // dimension -> posting list
+    u32 row_num_{};
+};
+
+} // namespace infinity
\ No newline at end of file
diff --git a/src/storage/knn_index/sparse/sparse_iter.cppm b/src/storage/knn_index/sparse/sparse_iter.cppm
new file mode 100644
index 0000000000..2dccdf6bea
--- /dev/null
+++ b/src/storage/knn_index/sparse/sparse_iter.cppm
@@ -0,0 +1,64 @@
+// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+module;
+
+export module sparse_iter;
+
+import stl;
+
+namespace infinity {
+
+// Sparse matrix in CSR layout.
+export struct SparseMatrix {
+    UniquePtr<f32[]> data_{};
+    UniquePtr<i32[]> indices_{};
+    UniquePtr<i64[]> indptr_{}; // row i's data and indice is stored in data_[indptr_[i]:indptr_[i+1]], indices_[indptr_[i]:indptr_[i+1]]
+    i64 nrow_{};
+    i64 ncol_{};
+    i64 nnz_{};
+};
+
+// Non-owning view of one sparse vector: `nnz_` (index, value) pairs.
+// The pointed-to arrays must outlive the view.
+export struct SparseVecRef {
+    const f32 *data_;
+    const u32 *indices_;
+    i32 nnz_;
+};
+
+// Walks the rows of a SparseMatrix, intended to be used as
+//   for (SparseMatrixIter iter(mat); iter.HasNext(); iter.Next()) { ... iter.val() ... }
+// The iterator keeps a reference to the matrix, which must outlive it.
+export class SparseMatrixIter {
+public:
+    explicit SparseMatrixIter(const SparseMatrix &mat) : mat_(mat) {}
+
+    // The matrix is held by reference; binding to a temporary would dangle.
+    SparseMatrixIter(const SparseMatrix &&) = delete;
+
+    // True while the iterator points at a valid row (despite the name, this
+    // refers to the current row, not the one after it).
+    bool HasNext() const { return row_i_ < mat_.nrow_; }
+
+    // Moves to the following row. Must only be called while HasNext() is true;
+    // indptr_ has nrow_ + 1 entries, so stepping onto the end position is valid.
+    void Next() {
+        ++row_i_;
+        offset_ = mat_.indptr_[row_i_];
+    }
+
+    // View of the current row. Requires HasNext().
+    SparseVecRef val() const {
+        const float *data = mat_.data_.get() + offset_;
+        // Column indices are stored signed but exposed unsigned.
+        const auto *indices = reinterpret_cast<const u32 *>(mat_.indices_.get() + offset_);
+        // The row length is an i64 difference; the narrowing to i32 is intentional.
+        const i32 nnz = static_cast<i32>(mat_.indptr_[row_i_ + 1] - offset_);
+        return SparseVecRef{data, indices, nnz};
+    }
+
+    // Index of the current row.
+    i64 row_id() const { return row_i_; }
+
+private:
+    const SparseMatrix &mat_;
+    i64 row_i_{};
+    i64 offset_{}; // start of the current row inside data_ / indices_
+};
+
+} // namespace infinity
\ No newline at end of file
diff --git a/src/unit_test/storage/knnindex/knn_sparse/test_linscan.cpp b/src/unit_test/storage/knnindex/knn_sparse/test_linscan.cpp
new file mode 100644
index 0000000000..abd3908cd3
--- /dev/null
+++ b/src/unit_test/storage/knnindex/knn_sparse/test_linscan.cpp
@@ -0,0 +1,217 @@
+// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "unit_test/base_test.h"
+
+import stl;
+import linscan_alg;
+import sparse_iter;
+import third_party;
+
+using namespace infinity;
+
+// Checks LinScan against a brute-force inner-product search on random CSR data.
+class LinScanAlgTest : public BaseTest {
+protected:
+    void SetUp() override {}
+
+    void TearDown() override {}
+
+    // Builds a random nrow x ncol CSR matrix with about nrow * ncol * sparsity
+    // non-zeros, values drawn uniformly from [data_min, data_max).
+    // Row lengths are random; column indices inside a row are sorted and may repeat.
+    // Throws std::runtime_error on invalid arguments.
+    SparseMatrix GenerateDataset(u32 nrow, u32 ncol, f32 sparsity = 0.05, f32 data_min = -10.0, f32 data_max = 10.0) {
+        if (sparsity < 0.0 || sparsity > 1.0) {
+            throw std::runtime_error("Invalid sparsity.");
+        }
+        if (data_min >= data_max) {
+            throw std::runtime_error("Invalid data range.");
+        }
+        if (nrow == 0 || ncol == 0) {
+            throw std::runtime_error("Invalid dimension.");
+        }
+        // Multiply in floating point: nrow * ncol can exceed the u32 range.
+        const u32 nnz = static_cast<u32>(static_cast<double>(nrow) * ncol * sparsity);
+        auto data = MakeUnique<f32[]>(nnz);
+        auto indices = MakeUnique<i32[]>(nnz);
+        auto indptr = MakeUnique<i64[]>(nrow + 1);
+        {
+            std::uniform_real_distribution<f32> distrib(data_min, data_max);
+            for (u32 i = 0; i < nnz; ++i) {
+                data[i] = distrib(rng_);
+            }
+        }
+        {
+            indptr[0] = 0;
+            indptr[nrow] = nnz;
+            // get nrow - 1 random number between 0 and nnz
+            for (u32 i = 1; i < nrow; ++i) {
+                indptr[i] = rng_() % (nnz + 1);
+            }
+            // indptr[nrow] == nnz is already the maximum, so sorting the first nrow entries suffices.
+            std::sort(indptr.get(), indptr.get() + nrow);
+        }
+        {
+            i64 start = indptr[0];
+            for (u32 i = 0; i < nrow; ++i) {
+                const i64 end = indptr[i + 1];
+                for (i64 j = start; j < end; ++j) {
+                    indices[j] = rng_() % ncol;
+                }
+                std::sort(indices.get() + start, indices.get() + end);
+                start = end;
+            }
+        }
+        return SparseMatrix{std::move(data), std::move(indices), std::move(indptr), nrow, ncol, nnz};
+    }
+
+    // Computes the top-k answers of every row of `query` against `mat`, either
+    // with LinScan itself or with the brute-force reference.
+    // Returns {indices, scores}; the answers of query i start at offset i * topk.
+    // When fewer than topk rows exist, only the first min(topk, rows) slots of
+    // each query are written.
+    Pair<UniquePtr<u32[]>, UniquePtr<f32[]>>
+    GenerateGroundtruth(const SparseMatrix &mat, const SparseMatrix &query, u32 topk, bool use_linscan = true) {
+        if (mat.ncol_ != query.ncol_) {
+            throw std::runtime_error("Inconsistent dimension.");
+        }
+        auto gt_indices = MakeUnique<u32[]>(query.nrow_ * topk);
+        auto gt_scores = MakeUnique<f32[]>(query.nrow_ * topk);
+        if (use_linscan) {
+            LinScan index;
+            FillIndex(index, mat);
+            for (auto iter = SparseMatrixIter(query); iter.HasNext(); iter.Next()) {
+                const SparseVecRef query_vec = iter.val();
+                auto [indices, scores] = index.Query(query_vec, topk);
+                const u32 query_id = iter.row_id();
+                std::copy(indices.begin(), indices.end(), gt_indices.get() + query_id * topk);
+                std::copy(scores.begin(), scores.end(), gt_scores.get() + query_id * topk);
+            }
+        } else { // brute force, only used in linscan test
+            for (auto iter = SparseMatrixIter(query); iter.HasNext(); iter.Next()) {
+                const SparseVecRef query_vec = iter.val();
+                auto [indices, scores] = BruteForceKnn(mat, query_vec, topk);
+                const u32 query_id = iter.row_id();
+                std::copy(indices.begin(), indices.end(), gt_indices.get() + query_id * topk);
+                std::copy(scores.begin(), scores.end(), gt_scores.get() + query_id * topk);
+            }
+        }
+        return {std::move(gt_indices), std::move(gt_scores)};
+    }
+
+    // True when the result has exactly gt_size entries and every score is within
+    // error_bound of the ground truth. Ids are deliberately not compared, because
+    // rows with equal scores may come back in any order.
+    bool CheckAccurateKnn(const u32 *gt_indices,
+                          const f32 *gt_scores,
+                          u32 gt_size,
+                          const Vector<u32> &indices,
+                          const Vector<f32> &scores,
+                          f32 error_bound) {
+        if (gt_size != indices.size() || gt_size != scores.size()) {
+            return false;
+        }
+        for (u32 i = 0; i < gt_size; ++i) {
+            if (std::abs(gt_scores[i] - scores[i]) > error_bound) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Prints the returned (id, score) pairs of one query on the first line and the
+    // ground-truth pairs on the second.
+    void PrintQuery(u32 query_id, const u32 *gt_indices, const f32 *gt_scores, u32 gt_size, const Vector<u32> &indices, const Vector<f32> &scores) {
+        std::cout << fmt::format("Query {}\n", query_id);
+        // Called on failures, including size mismatches: never read past the result.
+        const SizeT result_n = std::min<SizeT>(gt_size, std::min(indices.size(), scores.size()));
+        for (SizeT i = 0; i < result_n; ++i) {
+            std::cout << fmt::format("{} {}, ", indices[i], scores[i]);
+        }
+        std::cout << "\n";
+        for (u32 i = 0; i < gt_size; ++i) {
+            std::cout << fmt::format("{} {}, ", gt_indices[i], gt_scores[i]);
+        }
+        std::cout << "\n";
+    }
+
+    // struct ApproximateKnnChecker {
+    //     ApproximateKnnChecker(const Vector<u32> &gt_indices, const Vector<f32> &gt_scores) {}
+
+    //     f32 Check() { return 0.0; }
+    // };
+
+    // Parameters of one accuracy run.
+    struct TestOption {
+        TestOption() {}
+
+        u32 nrow_{1000};
+        u32 ncol_{1000};
+        f32 sparsity_{0.05};
+        u32 query_n_{50};
+        u32 topk_{10};
+        f32 error_bound_{1e-6};
+    };
+
+    // Generates a random dataset and query set, then expects every LinScan result
+    // to match the brute-force scores within option.error_bound_.
+    void TestAccurateScan(const TestOption &option = TestOption()) {
+        const u32 gt_size = std::min(option.nrow_, option.topk_);
+
+        const SparseMatrix dataset = GenerateDataset(option.nrow_, option.ncol_, option.sparsity_);
+        const SparseMatrix query = GenerateDataset(option.query_n_, option.ncol_, option.sparsity_);
+        const auto [gt_indices_list, gt_scores_list] = GenerateGroundtruth(dataset, query, option.topk_, false);
+
+        LinScan index;
+        FillIndex(index, dataset);
+
+        for (auto iter = SparseMatrixIter(query); iter.HasNext(); iter.Next()) {
+            const SparseVecRef query_vec = iter.val();
+            auto [indices, scores] = index.Query(query_vec, option.topk_);
+            const u32 query_id = iter.row_id();
+            const u32 *gt_indices = gt_indices_list.get() + query_id * option.topk_;
+            const f32 *gt_scores = gt_scores_list.get() + query_id * option.topk_;
+            const bool ck = CheckAccurateKnn(gt_indices, gt_scores, gt_size, indices, scores, option.error_bound_);
+            if (!ck) {
+                PrintQuery(query_id, gt_indices, gt_scores, gt_size, indices, scores);
+            }
+            EXPECT_TRUE(ck);
+        }
+    }
+
+private:
+    // Inserts every row of `mat` into `index`, using the row number as document id.
+    static void FillIndex(LinScan &index, const SparseMatrix &mat) {
+        for (auto iter = SparseMatrixIter(mat); iter.HasNext(); iter.Next()) {
+            const u32 row_id = iter.row_id();
+            index.Insert(iter.val(), row_id);
+        }
+    }
+
+    // Reference implementation: scores every row of `mat` against `query` by
+    // comparing all index pairs, then returns the min(topk, rows) best
+    // {row ids, scores} in descending score order.
+    Pair<Vector<u32>, Vector<f32>> BruteForceKnn(const SparseMatrix &mat, const SparseVecRef &query, u32 topk) {
+        Vector<f32> scores(mat.nrow_, 0.0);
+        for (auto iter = SparseMatrixIter(mat); iter.HasNext(); iter.Next()) {
+            const SparseVecRef vec = iter.val();
+            const u32 row_id = iter.row_id();
+            f32 score = 0.0;
+            for (i32 i = 0; i < query.nnz_; ++i) {
+                for (i32 j = 0; j < vec.nnz_; ++j) {
+                    if (query.indices_[i] == vec.indices_[j]) {
+                        score += query.data_[i] * vec.data_[j];
+                    }
+                }
+            }
+            scores[row_id] = score;
+        }
+        Vector<u32> indices(mat.nrow_);
+        std::iota(indices.begin(), indices.end(), 0);
+        const u32 result_size = std::min(topk, static_cast<u32>(mat.nrow_));
+        std::partial_sort(indices.begin(), indices.begin() + result_size, indices.end(), [&](u32 i, u32 j) { return scores[i] > scores[j]; });
+        indices.resize(result_size);
+        Vector<f32> top_scores(result_size);
+        std::transform(indices.begin(), indices.end(), top_scores.begin(), [&](u32 i) { return scores[i]; });
+        return {std::move(indices), std::move(top_scores)};
+    }
+
+private:
+    // Seeded from std::random_device, so each run uses different data.
+    std::mt19937 rng_{std::random_device{}()};
+};
+
+TEST_F(LinScanAlgTest, accurate_scan) { TestAccurateScan(); }
+
+// Fewer rows than topk: results must be truncated to the row count.
+TEST_F(LinScanAlgTest, bound_accurate_scan) {
+    TestOption option;
+    option.nrow_ = 10;
+    option.topk_ = 20;
+    TestAccurateScan(option);
+}