From 3ae4e82b78c91c4704437c9722b373e0a1ea5e30 Mon Sep 17 00:00:00 2001 From: Buqian Zheng Date: Fri, 19 Jan 2024 16:02:38 +0800 Subject: [PATCH] make sure we do not crash due to uncaught exceptions when we call folly::Future::wait but do not try to get the values; use folly::collect to simplify code Signed-off-by: Buqian Zheng --- src/common/comp/brute_force.cc | 12 ++------ src/common/thread/thread.cc | 18 ++--------- src/index/diskann/diskann.cc | 26 +++------------- src/index/flat/flat.cc | 18 ++--------- src/index/hnsw/hnsw.cc | 28 +++-------------- src/index/ivf/ivf.cc | 18 ++--------- thirdparty/DiskANN/src/aux_utils.cpp | 10 ++---- thirdparty/DiskANN/src/index.cpp | 34 ++++++--------------- thirdparty/DiskANN/src/partition_and_pq.cpp | 8 ++--- thirdparty/DiskANN/src/utils.cpp | 4 +-- 10 files changed, 33 insertions(+), 143 deletions(-) diff --git a/src/common/comp/brute_force.cc b/src/common/comp/brute_force.cc index f54d9795..45107419 100644 --- a/src/common/comp/brute_force.cc +++ b/src/common/comp/brute_force.cc @@ -128,9 +128,7 @@ BruteForce::Search(const DataSetPtr base_dataset, const DataSetPtr query_dataset return Status::success; })); } - for (auto& fut : futs) { - fut.wait(); - auto ret = fut.result().value(); + for (auto& ret : folly::collect(futs).get()) { if (ret != Status::success) { return expected::Err(ret, "failed to brute force search"); } @@ -233,9 +231,7 @@ BruteForce::SearchWithBuf(const DataSetPtr base_dataset, const DataSetPtr query_ return Status::success; })); } - for (auto& fut : futs) { - fut.wait(); - auto ret = fut.result().value(); + for (auto& ret : folly::collect(futs).get()) { RETURN_IF_ERROR(ret); } return Status::success; @@ -348,9 +344,7 @@ BruteForce::RangeSearch(const DataSetPtr base_dataset, const DataSetPtr query_da return Status::success; })); } - for (auto& fut : futs) { - fut.wait(); - auto ret = fut.result().value(); + for (auto& ret : folly::collect(futs).get()) { if (ret != Status::success) { return 
expected::Err(ret, "failed to brute force search"); } diff --git a/src/common/thread/thread.cc b/src/common/thread/thread.cc index 9d21692b..ae6c12b7 100644 --- a/src/common/thread/thread.cc +++ b/src/common/thread/thread.cc @@ -33,14 +33,7 @@ ExecOverSearchThreadPool(std::vector>& tasks) { })); } std::this_thread::yield(); - // check for exceptions. value() is {}, so either - // a call does nothing, or it throws an inner exception. - for (auto& f : futures) { - f.wait(); - } - for (auto& f : futures) { - f.result().value(); - } + folly::collect(futures).get(); } void @@ -55,14 +48,7 @@ ExecOverBuildThreadPool(std::vector>& tasks) { })); } std::this_thread::yield(); - // check for exceptions. value() is {}, so either - // a call does nothing, or it throws an inner exception. - for (auto& f : futures) { - f.wait(); - } - for (auto& f : futures) { - f.result().value(); - } + folly::collect(futures).get(); } void diff --git a/src/index/diskann/diskann.cc b/src/index/diskann/diskann.cc index 2b13a94d..471baa7e 100644 --- a/src/index/diskann/diskann.cc +++ b/src/index/diskann/diskann.cc @@ -479,8 +479,6 @@ DiskANNIndexNode::Deserialize(const BinarySet& binset, const Config& c std::vector warmup_result_ids_64(warmup_num, 0); std::vector warmup_result_dists(warmup_num, 0); - bool all_searches_are_good = true; - std::vector> futures; futures.reserve(warmup_num); for (_s64 i = 0; i < (int64_t)warmup_num; ++i) { @@ -490,16 +488,12 @@ DiskANNIndexNode::Deserialize(const BinarySet& binset, const Config& c warmup_result_dists.data() + (index * 1), 4); })); } - for (auto& future : futures) { - if (TryDiskANNCall([&]() { future.wait(); }) != Status::success) { - all_searches_are_good = false; - } - } + if (warmup != nullptr) { diskann::aligned_free(warmup); } - if (!all_searches_are_good) { + if (TryDiskANNCall([&]() { folly::collect(futures).get(); }) != Status::success) { LOG_KNOWHERE_ERROR_ << "Failed to do search on warmup file for DiskANN."; return 
Status::diskann_inner_error; } @@ -545,7 +539,6 @@ DiskANNIndexNode::Search(const DataSet& dataset, const Config& cfg, co auto p_id = new int64_t[k * nq]; auto p_dist = new DistType[k * nq]; - bool all_searches_are_good = true; std::vector> futures; futures.reserve(nq); for (int64_t row = 0; row < nq; ++row) { @@ -559,13 +552,8 @@ DiskANNIndexNode::Search(const DataSet& dataset, const Config& cfg, co #endif })); } - for (auto& future : futures) { - if (TryDiskANNCall([&]() { future.wait(); }) != Status::success) { - all_searches_are_good = false; - } - } - if (!all_searches_are_good) { + if (TryDiskANNCall([&]() { folly::collect(futures).get(); }) != Status::success) { return expected::Err(Status::diskann_inner_error, "some search failed"); } @@ -621,7 +609,6 @@ DiskANNIndexNode::RangeSearch(const DataSet& dataset, const Config& cf std::vector> futures; futures.reserve(nq); - bool all_searches_are_good = true; for (int64_t row = 0; row < nq; ++row) { futures.emplace_back(search_pool_->push([&, index = row]() { std::vector indices; @@ -639,12 +626,7 @@ DiskANNIndexNode::RangeSearch(const DataSet& dataset, const Config& cf } })); } - for (auto& future : futures) { - if (TryDiskANNCall([&]() { future.wait(); }) != Status::success) { - all_searches_are_good = false; - } - } - if (!all_searches_are_good) { + if (TryDiskANNCall([&]() { folly::collect(futures).get(); }) != Status::success) { return expected::Err(Status::diskann_inner_error, "some search failed"); } diff --git a/src/index/flat/flat.cc b/src/index/flat/flat.cc index dc086fda..bb3ccf6a 100644 --- a/src/index/flat/flat.cc +++ b/src/index/flat/flat.cc @@ -127,14 +127,7 @@ class FlatIndexNode : public IndexNode { })); } // wait for the completion - for (auto& fut : futs) { - fut.wait(); - } - // check for exceptions. value() is {}, so either - // a call does nothing, or it throws an inner exception. 
- for (auto& fut : futs) { - fut.result().value(); - } + folly::collect(futs).get(); } catch (const std::exception& e) { std::unique_ptr auto_delete_ids(ids); std::unique_ptr auto_delete_dis(distances); @@ -216,14 +209,7 @@ class FlatIndexNode : public IndexNode { })); } // wait for the completion - for (auto& fut : futs) { - fut.wait(); - } - // check for exceptions. value() is {}, so either - // a call does nothing, or it throws an inner exception. - for (auto& fut : futs) { - fut.result().value(); - } + folly::collect(futs).get(); GetRangeSearchResult(result_dist_array, result_id_array, is_ip, nq, radius, range_filter, distances, ids, lims); } catch (const std::exception& e) { diff --git a/src/index/hnsw/hnsw.cc b/src/index/hnsw/hnsw.cc index f5c53e47..c24db375 100644 --- a/src/index/hnsw/hnsw.cc +++ b/src/index/hnsw/hnsw.cc @@ -126,13 +126,7 @@ class HnswIndexNode : public IndexNode { } })); } - for (auto& future : futures) { - future.wait(); - } - // check for exceptions - for (auto& future : futures) { - future.result().value(); - } + folly::collect(futures).get(); futures.clear(); } @@ -146,13 +140,7 @@ class HnswIndexNode : public IndexNode { futures.emplace_back( build_pool->push([&, idx = i]() { index_->repairGraphConnectivity(unreached[idx]); })); } - for (auto& future : futures) { - future.wait(); - } - // check for exceptions - for (auto& future : futures) { - future.result().value(); - } + folly::collect(futures).get(); } build_time.RecordSection("graph repair"); LOG_KNOWHERE_INFO_ << "HNSW built with #points num:" << index_->max_elements_ << " #M:" << index_->M_ @@ -213,9 +201,7 @@ class HnswIndexNode : public IndexNode { } })); } - for (auto& fut : futs) { - fut.wait(); - } + folly::collect(futs).get(); auto res = GenResultDataSet(nq, k, p_id, p_dist); @@ -300,9 +286,7 @@ class HnswIndexNode : public IndexNode { })); } // wait for initial search(in top layers and search for seed_ef in base layer) to finish - for (auto& fut : futs) { - fut.wait(); - 
} + folly::collect(futs).get(); return vec; } @@ -365,9 +349,7 @@ class HnswIndexNode : public IndexNode { } })); } - for (auto& fut : futs) { - fut.wait(); - } + folly::collect(futs).get(); // filter range search result GetRangeSearchResult(result_dist_array, result_id_array, is_ip, nq, radius_for_filter, range_filter, dis, ids, diff --git a/src/index/ivf/ivf.cc b/src/index/ivf/ivf.cc index c0155080..fb262bd2 100644 --- a/src/index/ivf/ivf.cc +++ b/src/index/ivf/ivf.cc @@ -589,14 +589,7 @@ IvfIndexNode::Search(const DataSet& dataset, const Config& })); } // wait for the completion - for (auto& fut : futs) { - fut.wait(); - } - // check for exceptions. value() is {}, so either - // a call does nothing, or it throws an inner exception. - for (auto& fut : futs) { - fut.result().value(); - } + folly::collect(futs).get(); } catch (const std::exception& e) { delete[] ids; delete[] distances; @@ -718,14 +711,7 @@ IvfIndexNode::RangeSearch(const DataSet& dataset, const Con })); } // wait for the completion - for (auto& fut : futs) { - fut.wait(); - } - // check for exceptions. value() is {}, so either - // a call does nothing, or it throws an inner exception. 
- for (auto& fut : futs) { - fut.result().value(); - } + folly::collect(futs).get(); GetRangeSearchResult(result_dist_array, result_id_array, is_ip, nq, radius, range_filter, distances, ids, lims); } catch (const std::exception& e) { LOG_KNOWHERE_WARNING_ << "faiss inner error: " << e.what(); diff --git a/thirdparty/DiskANN/src/aux_utils.cpp b/thirdparty/DiskANN/src/aux_utils.cpp index 8f409925..0390fee9 100644 --- a/thirdparty/DiskANN/src/aux_utils.cpp +++ b/thirdparty/DiskANN/src/aux_utils.cpp @@ -496,7 +496,7 @@ namespace diskann { paras.Set("saturate_graph", 1); paras.Set("save_path", mem_index_path); paras.Set("accelerate_build", accelerate_build); - paras.Set("shuffle_build", shuffle_build); + paras.Set("shuffle_build", shuffle_build); std::unique_ptr> _pvamanaIndex = std::unique_ptr>(new diskann::Index( @@ -745,9 +745,7 @@ namespace diskann { })); } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); std::sort(node_count_list.begin(), node_count_list.end(), [](std::pair<_u32, _u32> &a, std::pair<_u32, _u32> &b) { @@ -800,9 +798,7 @@ namespace diskann { stats + index); })); } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); auto e = std::chrono::high_resolution_clock::now(); std::chrono::duration diff = e - s; double qps = diff --git a/thirdparty/DiskANN/src/index.cpp b/thirdparty/DiskANN/src/index.cpp index e631bbfc..a1c6bcb3 100644 --- a/thirdparty/DiskANN/src/index.cpp +++ b/thirdparty/DiskANN/src/index.cpp @@ -816,9 +816,7 @@ namespace diskann { } })); } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); // find imin unsigned min_idx = 0; float min_dist = distances[0]; @@ -1343,7 +1341,7 @@ namespace diskann { _indexingRange = parameters.Get("R"); _indexingMaxC = parameters.Get("C"); const bool accelerate_build = parameters.Get("accelerate_build"); - const bool shuffle_build = parameters.Get("shuffle_build"); + const bool shuffle_build = 
parameters.Get("shuffle_build"); const float last_round_alpha = parameters.Get("alpha"); unsigned L = _indexingQueueSize; @@ -1483,9 +1481,7 @@ namespace diskann { prune_neighbors(node, pool, pruned_list); })); } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); diff = std::chrono::high_resolution_clock::now() - s; sync_time += diff.count(); @@ -1509,9 +1505,7 @@ namespace diskann { } })); } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); s = std::chrono::high_resolution_clock::now(); futures.clear(); @@ -1532,9 +1526,7 @@ namespace diskann { } })); } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); futures.clear(); for (_s64 node_ctr = 0; node_ctr < (_s64) (visit_order.size()); @@ -1567,9 +1559,7 @@ namespace diskann { })); } } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); diff = std::chrono::high_resolution_clock::now() - s; inter_time += diff.count(); @@ -1638,9 +1628,7 @@ namespace diskann { })); } } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); if (_nd > 0) { LOG_KNOWHERE_DEBUG_ << "final cleanup done. 
Link time: " << ((double) link_timer.elapsed() / (double) 1000000) @@ -1683,9 +1671,7 @@ namespace diskann { } } } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); diskann::cout << "Prune time : " << timer.elapsed() / 1000 << "ms" << std::endl; @@ -2385,9 +2371,7 @@ namespace diskann { })); } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); if (_support_eager_delete) update_in_graph(); diff --git a/thirdparty/DiskANN/src/partition_and_pq.cpp b/thirdparty/DiskANN/src/partition_and_pq.cpp index c8078a22..9c70d374 100644 --- a/thirdparty/DiskANN/src/partition_and_pq.cpp +++ b/thirdparty/DiskANN/src/partition_and_pq.cpp @@ -355,9 +355,7 @@ int generate_pq_pivots(const float *passed_train_data, size_t num_train, } })); } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); diskann::save_bin(pq_pivots_path.c_str(), full_pivot_data.get(), (size_t) num_centers, dim); @@ -574,9 +572,7 @@ int generate_pq_data_from_pivots(const std::string data_file, } })); } - for (auto &future : futures) { - future.wait(); - } + folly::collect(futures).get(); if (num_centers > 256) { compressed_file_writer.write( diff --git a/thirdparty/DiskANN/src/utils.cpp b/thirdparty/DiskANN/src/utils.cpp index 989b8ca8..a742c5a6 100644 --- a/thirdparty/DiskANN/src/utils.cpp +++ b/thirdparty/DiskANN/src/utils.cpp @@ -23,9 +23,7 @@ namespace diskann { } })); } - for (auto& future : futures) { - future.wait(); - } + folly::collect(futures).get(); writr.write((char*) read_buf, npts * ndims * sizeof(float)); }