diff --git a/include/libp2p/protocol/kademlia/impl/get_value_executor.hpp b/include/libp2p/protocol/kademlia/impl/get_value_executor.hpp index 437beb450..0f1a3fc63 100644 --- a/include/libp2p/protocol/kademlia/impl/get_value_executor.hpp +++ b/include/libp2p/protocol/kademlia/impl/get_value_executor.hpp @@ -67,6 +67,8 @@ namespace libp2p::protocol::kademlia { /// Handles result of connection void onConnected(StreamAndProtocolOrError stream_res); + void finish(); + static std::atomic_size_t instance_number; // Primary diff --git a/src/protocol/kademlia/impl/get_value_executor.cpp b/src/protocol/kademlia/impl/get_value_executor.cpp index 55dfa6813..d1c4ad192 100644 --- a/src/protocol/kademlia/impl/get_value_executor.cpp +++ b/src/protocol/kademlia/impl/get_value_executor.cpp @@ -167,11 +167,16 @@ namespace libp2p::protocol::kademlia { return; } - if (requests_in_progress_ == 0) { + if (requests_in_progress_ != 0) { + return; + } + if (received_records_->empty()) { done_ = true; log_.debug("done"); handler_(Error::VALUE_NOT_FOUND); + return; } + finish(); } void GetValueExecutor::onConnected(StreamAndProtocolOrError stream_res) { @@ -310,41 +315,45 @@ namespace libp2p::protocol::kademlia { received_records_->insert({remote_peer_id, value}); if (received_records_->size() >= config_.valueLookupsQuorum) { - std::vector values; - std::transform(received_records_->begin(), - received_records_->end(), - std::back_inserter(values), - [](auto &record) { return record.value; }); - - auto index_res = validator_->select(key_, values); - if (not index_res.has_value()) { - log_.debug("Can't select best value of {} provided", values.size()); - return; - } - auto &best = values[index_res.value()]; - - // Return result to upstear - done_ = true; - log_.debug("done"); - handler_(best); - - // Inform peer of new value - std::vector addressees; - auto &idx_by_value = received_records_->get(); - for (auto &[peer, value] : idx_by_value) { - if (value != best) { - addressees.emplace_back(peer); - } else { - content_routing_table_->addProvider(key_, peer); - } - } + finish(); + } + } + } - if (not addressees.empty()) { - auto put_value_executor = executor_factory_->createPutValueExecutor( - key_, std::move(value), std::move(addressees)); - [[maybe_unused]] auto res = put_value_executor->start(); - } + void GetValueExecutor::finish() { + std::vector values; + std::transform(received_records_->begin(), + received_records_->end(), + std::back_inserter(values), + [](auto &record) { return record.value; }); + + auto index_res = validator_->select(key_, values); + if (not index_res.has_value()) { + log_.debug("Can't select best value of {} provided", values.size()); + return; + } + auto &best = values[index_res.value()]; + + // Return result to upstear + done_ = true; + log_.debug("done"); + handler_(best); + + // Inform peer of new value + std::vector addressees; + auto &idx_by_value = received_records_->get(); + for (auto &[peer, value] : idx_by_value) { + if (value != best) { + addressees.emplace_back(peer); + } else { + content_routing_table_->addProvider(key_, peer); } } + + if (not addressees.empty()) { + auto put_value_executor = executor_factory_->createPutValueExecutor( + key_, best, std::move(addressees)); + [[maybe_unused]] auto res = put_value_executor->start(); + } } } // namespace libp2p::protocol::kademlia