diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp
index 901d4ada2a..47f3c170ad 100644
--- a/nano/core_test/network.cpp
+++ b/nano/core_test/network.cpp
@@ -879,6 +879,7 @@ TEST (network, replace_port)
 	nano::system system;
 	nano::node_flags node_flags;
 	node_flags.disable_udp = false;
+	node_flags.disable_ongoing_telemetry_requests = true;
 	auto node0 = system.add_node (node_flags);
 	ASSERT_EQ (0, node0->network.size ());
 	auto node1 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
diff --git a/nano/core_test/node_telemetry.cpp b/nano/core_test/node_telemetry.cpp
index c42b396cce..2996950271 100644
--- a/nano/core_test/node_telemetry.cpp
+++ b/nano/core_test/node_telemetry.cpp
@@ -263,38 +263,28 @@ TEST (node_telemetry, no_peers)
 	nano::system system (1);

 	std::atomic<bool> done{ false };
-	system.nodes[0]->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
-		ASSERT_TRUE (responses_a.telemetry_datas.empty ());
-		ASSERT_FALSE (responses_a.all_received);
-		done = true;
-	});
-
-	system.deadline_set (10s);
-	while (!done)
-	{
-		ASSERT_NO_ERROR (system.poll ());
-	}
+	auto responses = system.nodes[0]->telemetry->get_metrics ();
+	ASSERT_TRUE (responses.empty ());
 }

-namespace nano
-{
 TEST (node_telemetry, basic)
 {
 	nano::system system;
-	nano::node_flags node_flags;
-	node_flags.disable_ongoing_telemetry_requests = true;
-	auto node_client = system.add_node (node_flags);
-	auto node_server = system.add_node (node_flags);
+	auto node_client = system.add_node ();
+	auto node_server = system.add_node ();

 	wait_peer_connections (system);

 	// Request telemetry metrics
-	std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
+	nano::telemetry_data telemetry_data;
+	auto server_endpoint = node_server->network.endpoint ();
+	auto channel = node_client->network.find_channel (node_server->network.endpoint ());
 	{
 		std::atomic<bool> done{ false };
-		node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
-			ASSERT_TRUE (responses_a.all_received);
-			all_telemetry_datas = responses_a.telemetry_datas;
+		node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &server_endpoint, &telemetry_data](nano::telemetry_data_response const & response_a) {
+			ASSERT_FALSE (response_a.error);
+			ASSERT_EQ (server_endpoint, response_a.endpoint);
+			telemetry_data = response_a.telemetry_data;
 			done = true;
 		});

@@ -306,15 +296,14 @@ TEST (node_telemetry, basic)
 	}

 	// Check the metrics are correct
-	ASSERT_EQ (all_telemetry_datas.size (), 1);
-	compare_default_test_result_data (all_telemetry_datas.begin ()->second, *node_server);
+	compare_default_test_result_data (telemetry_data, *node_server);

 	// Call again straight away. It should use the cache
 	{
 		std::atomic<bool> done{ false };
-		node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
-			ASSERT_EQ (all_telemetry_datas, responses_a.telemetry_datas);
-			ASSERT_TRUE (responses_a.all_received);
+		node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
+			ASSERT_EQ (telemetry_data, response_a.telemetry_data);
+			ASSERT_FALSE (response_a.error);
 			done = true;
 		});

@@ -329,9 +318,9 @@ TEST (node_telemetry, basic)
 	std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test);

 	std::atomic<bool> done{ false };
-	node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
-		ASSERT_NE (all_telemetry_datas, responses_a.telemetry_datas);
-		ASSERT_TRUE (responses_a.all_received);
+	node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) {
+		ASSERT_NE (telemetry_data, response_a.telemetry_data);
+		ASSERT_FALSE (response_a.error);
 		done = true;
 	});

@@ -341,19 +330,20 @@ TEST (node_telemetry, basic)
 		ASSERT_NO_ERROR (system.poll ());
 	}
 }
-}

 TEST (node_telemetry, many_nodes)
 {
 	nano::system system;
 	// The telemetry responses can timeout if using a large number of nodes under sanitizers, so lower the number.
-	const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10;
+	const auto num_nodes = (is_sanitizer_build || nano::running_within_valgrind ()) ? 4 : 10;
+	nano::node_flags node_flags;
+	node_flags.disable_ongoing_telemetry_requests = true;
 	for (auto i = 0; i < num_nodes; ++i)
 	{
 		nano::node_config node_config (nano::get_available_port (), system.logging);
 		// Make a metric completely different for each node so we can check afterwards that there are no duplicates
 		node_config.bandwidth_limit = 100000 + i;
-		system.add_node (node_config);
+		system.add_node (node_config, node_flags);
 	}

 	wait_peer_connections (system);
@@ -371,25 +361,32 @@ TEST (node_telemetry, many_nodes)
 	// This is the node which will request metrics from all other nodes
 	auto node_client = system.nodes.front ();

-	std::atomic<bool> done{ false };
-	std::unordered_map<nano::endpoint, nano::telemetry_data> all_telemetry_datas;
-	node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) {
-		ASSERT_TRUE (responses_a.all_received);
-		all_telemetry_datas = responses_a.telemetry_datas;
-		done = true;
-	});
+	std::mutex mutex;
+	std::vector<nano::telemetry_data> telemetry_datas;
+	auto peers = node_client->network.list (num_nodes - 1);
+	ASSERT_EQ (peers.size (), num_nodes - 1);
+	for (auto const & peer : peers)
+	{
+		node_client->telemetry->get_metrics_single_peer_async (peer, [&telemetry_datas, &mutex](nano::telemetry_data_response const & response_a) {
+			ASSERT_FALSE (response_a.error);
+			nano::lock_guard<std::mutex> guard (mutex);
+			telemetry_datas.push_back (response_a.telemetry_data);
+		});
+	}

 	system.deadline_set (20s);
-	while (!done)
+	nano::unique_lock<std::mutex> lk (mutex);
+	while (telemetry_datas.size () != num_nodes - 1)
 	{
+		lk.unlock ();
 		ASSERT_NO_ERROR (system.poll ());
+		lk.lock ();
 	}

 	// Check the metrics
 	nano::network_params params;
-	for (auto & telemetry_data : all_telemetry_datas)
+	for (auto & data : telemetry_datas)
 	{
-		auto & data = telemetry_data.second;
 		ASSERT_EQ (data.unchecked_count, 0);
 		ASSERT_EQ (data.cemented_count, 1);
 		ASSERT_LE (data.peer_count, 9);
@@ -408,10
+405,10 @@ TEST (node_telemetry, many_nodes) } // We gave some nodes different bandwidth caps, confirm they are not all the same - auto bandwidth_cap = all_telemetry_datas.begin ()->second.bandwidth_cap; - all_telemetry_datas.erase (all_telemetry_datas.begin ()); - auto all_bandwidth_limits_same = std::all_of (all_telemetry_datas.begin (), all_telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) { - return telemetry_data.second.bandwidth_cap == bandwidth_cap; + auto bandwidth_cap = telemetry_datas.front ().bandwidth_cap; + telemetry_datas.erase (telemetry_datas.begin ()); + auto all_bandwidth_limits_same = std::all_of (telemetry_datas.begin (), telemetry_datas.end (), [bandwidth_cap](auto & telemetry_data) { + return telemetry_data.bandwidth_cap == bandwidth_cap; }); ASSERT_FALSE (all_bandwidth_limits_same); } @@ -423,7 +420,7 @@ TEST (node_telemetry, receive_from_non_listening_channel) nano::telemetry_ack message (nano::telemetry_data{}); node->network.process_message (message, node->network.udp_channels.create (node->network.endpoint ())); // We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it. - ASSERT_EQ (node->telemetry.telemetry_data_size (), 0); + ASSERT_EQ (node->telemetry->telemetry_data_size (), 0); } TEST (node_telemetry, over_udp) @@ -438,10 +435,10 @@ TEST (node_telemetry, over_udp) wait_peer_connections (system); std::atomic done{ false }; - std::unordered_map all_telemetry_datas; - node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) { - ASSERT_TRUE (responses_a.all_received); - all_telemetry_datas = responses_a.telemetry_datas; + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &node_server](nano::telemetry_data_response const & response_a) { + ASSERT_FALSE (response_a.error); + compare_default_test_result_data (response_a.telemetry_data, *node_server); done = true; }); @@ -451,9 +448,6 @@ TEST (node_telemetry, over_udp) ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (all_telemetry_datas.size (), 1); - compare_default_test_result_data (all_telemetry_datas.begin ()->second, *node_server); - // Check channels are indeed udp ASSERT_EQ (1, node_client->network.size ()); auto list1 (node_client->network.list (2)); @@ -465,77 +459,7 @@ TEST (node_telemetry, over_udp) ASSERT_EQ (nano::transport::transport_type::udp, list2[0]->get_type ()); } -namespace nano -{ -TEST (node_telemetry, single_request) -{ - nano::system system; - nano::node_flags node_flags; - node_flags.disable_ongoing_telemetry_requests = true; - - auto node_client = system.add_node (node_flags); - auto node_server = system.add_node (node_flags); - - wait_peer_connections (system); - - // Request telemetry metrics - auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - nano::telemetry_data telemetry_data; - { - std::atomic done{ false }; - - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data, &channel](nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - ASSERT_EQ (channel->get_endpoint (), response_a.endpoint); - telemetry_data = response_a.telemetry_data; - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - } - - // Check the metrics are correct - compare_default_test_result_data (telemetry_data, 
*node_server); - - // Call again straight away. It should use the cache - { - std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - ASSERT_EQ (telemetry_data, response_a.telemetry_data); - ASSERT_FALSE (response_a.error); - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - } - - // Wait the cache period and check cache is not used - std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test); - - std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { - ASSERT_NE (telemetry_data, response_a.telemetry_data); - ASSERT_FALSE (response_a.error); - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } -} -} - -TEST (node_telemetry, single_request_invalid_channel) +TEST (node_telemetry, invalid_channel) { nano::system system (2); @@ -543,7 +467,7 @@ TEST (node_telemetry, single_request_invalid_channel) auto node_server = system.nodes.back (); std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (nullptr, [&done](nano::telemetry_data_response const & response_a) { + node_client->telemetry->get_metrics_single_peer_async (nullptr, [&done](nano::telemetry_data_response const & response_a) { ASSERT_TRUE (response_a.error); done = true; }); @@ -555,7 +479,7 @@ TEST (node_telemetry, single_request_invalid_channel) } } -TEST (node_telemetry, blocking_single_and_random) +TEST (node_telemetry, blocking_request) { nano::system system (2); @@ -584,104 +508,15 @@ TEST (node_telemetry, blocking_single_and_random) system.deadline_set (10s); node_client->worker.push_task (call_system_poll); - // Blocking version of get_random_metrics_async - auto telemetry_data_responses = node_client->telemetry.get_metrics_peers (); - ASSERT_TRUE (telemetry_data_responses.all_received); - compare_default_test_result_data (telemetry_data_responses.telemetry_datas.begin ()->second, *node_server); - // Now try single request metric - auto telemetry_data_response = node_client->telemetry.get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ())); + auto telemetry_data_response = node_client->telemetry->get_metrics_single_peer (node_client->network.find_channel (node_server->network.endpoint ())); ASSERT_FALSE (telemetry_data_response.error); compare_default_test_result_data (telemetry_data_response.telemetry_data, *node_server); - ASSERT_EQ (*telemetry_data_response.telemetry_data.timestamp, *telemetry_data_responses.telemetry_datas.begin ()->second.timestamp); done = true; promise.get_future ().wait (); } -namespace nano -{ -TEST (node_telemetry, multiple_single_request_clearing) -{ - nano::system system (2); - - auto node_client = system.nodes.front (); - auto node_server = system.nodes.back (); - - nano::node_config node_config (nano::get_available_port (), system.logging); - node_config.bandwidth_limit = 100000; - auto node_server1 = system.add_node (node_config); - - wait_peer_connections (system); - - // Request telemetry metrics - auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - - std::atomic done{ false }; - std::chrono::system_clock::time_point last_updated; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &last_updated](nano::telemetry_data_response 
const & response_a) { - ASSERT_FALSE (response_a.error); - last_updated = *response_a.telemetry_data.timestamp; - done = true; - }); - - ASSERT_EQ (1, node_client->telemetry.single_requests.size ()); - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - done = false; - // Make another request to keep the time updated - system.deadline_set (10s); - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, last_updated](nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - ASSERT_EQ (last_updated, *response_a.telemetry_data.timestamp); - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - done = false; - auto channel1 = node_client->network.find_channel (node_server1->network.endpoint ()); - node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, &last_updated](nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - ASSERT_NE (last_updated, *response_a.telemetry_data.timestamp); - last_updated = *response_a.telemetry_data.timestamp; - done = true; - }); - - system.deadline_set (10s); - - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - done = false; - node_client->telemetry.get_metrics_single_peer_async (channel1, [&done, last_updated](nano::telemetry_data_response const & response_a) { - ASSERT_FALSE (response_a.error); - ASSERT_EQ (last_updated, *response_a.telemetry_data.timestamp); - done = true; - }); - - // single_requests should be removed as no more calls are being back - system.deadline_set (10s); - nano::unique_lock lk (node_client->telemetry.mutex); - while (!node_client->telemetry.single_requests.empty () || !done) - { - lk.unlock (); - ASSERT_NO_ERROR (system.poll ()); - lk.lock (); - } -} -} - TEST (node_telemetry, disconnects) { nano::system system (2); @@ -697,19 +532,7 @@ TEST (node_telemetry, disconnects) ASSERT_TRUE (channel); std::atomic done{ false }; - node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { - ASSERT_FALSE (responses_a.all_received); - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - done = false; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { + node_client->telemetry->get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { ASSERT_TRUE (response_a.error); done = true; }); @@ -721,7 +544,7 @@ TEST (node_telemetry, disconnects) } } -TEST (node_telemetry, batch_use_single_request_cache) +TEST (node_telemetry, all_peers_use_single_request_cache) { nano::system system; nano::node_flags node_flags; @@ -736,7 +559,7 @@ TEST (node_telemetry, batch_use_single_request_cache) { std::atomic done{ false }; auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { telemetry_data = response_a.telemetry_data; done = true; }); @@ -748,20 +571,8 @@ TEST (node_telemetry, batch_use_single_request_cache) } } - { - std::atomic done{ false }; - node_client->telemetry.get_metrics_peers_async ([&done, 
&telemetry_data](nano::telemetry_data_responses const & responses_a) { - ASSERT_TRUE (responses_a.all_received); - ASSERT_EQ (telemetry_data, responses_a.telemetry_datas.begin ()->second); - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - } + auto responses = node_client->telemetry->get_metrics (); + ASSERT_EQ (telemetry_data, responses.begin ()->second); // Confirm only 1 request was made ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); @@ -771,54 +582,17 @@ TEST (node_telemetry, batch_use_single_request_cache) ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); - // Wait until there is something pending - system.deadline_set (10s); - while (node_client->telemetry.finished_single_requests_size () == 0) - { - ASSERT_NO_ERROR (system.poll ()); - } - std::this_thread::sleep_for (nano::telemetry_cache_cutoffs::test); - system.deadline_set (10s); - std::atomic done{ false }; - node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) { - ASSERT_EQ (1, responses_a.telemetry_datas.size ()); - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - ASSERT_EQ (0, node_client->telemetry.finished_single_requests_size ()); - ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); - ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); - ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); - ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); - ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); - ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); -} - -TEST (node_telemetry, single_request_use_batch_cache) -{ - nano::system system (2); + // Should be empty + responses = node_client->telemetry->get_metrics (); + ASSERT_TRUE (responses.empty ()); - auto node_client = system.nodes.front (); - auto node_server = system.nodes.back (); - - wait_peer_connections (system); - - // Request batched metric first - std::unordered_map all_telemetry_datas; { std::atomic done{ false }; - node_client->telemetry.get_metrics_peers_async ([&done, &all_telemetry_datas](nano::telemetry_data_responses const & responses_a) { - ASSERT_TRUE (responses_a.all_received); - ASSERT_EQ (1, responses_a.telemetry_datas.size ()); - all_telemetry_datas = responses_a.telemetry_datas; + auto channel = node_client->network.find_channel (node_server->network.endpoint ()); + node_client->telemetry->get_metrics_single_peer_async (channel, [&done, &telemetry_data](nano::telemetry_data_response const & response_a) { + telemetry_data = response_a.telemetry_data; done = true; }); @@ -829,26 +603,14 @@ TEST (node_telemetry, single_request_use_batch_cache) } } - std::atomic done{ false }; - auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - 
node_client->telemetry.get_metrics_single_peer_async (channel, [&done, &all_telemetry_datas](nano::telemetry_data_response const & response_a) { - ASSERT_EQ (all_telemetry_datas.begin ()->second, response_a.telemetry_data); - ASSERT_FALSE (response_a.error); - done = true; - }); - - system.deadline_set (10s); - while (!done) - { - ASSERT_NO_ERROR (system.poll ()); - } + responses = node_client->telemetry->get_metrics (); + ASSERT_EQ (telemetry_data, responses.begin ()->second); - // Confirm only 1 request was made - ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); ASSERT_EQ (0, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); - ASSERT_EQ (1, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); + ASSERT_EQ (2, node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); - ASSERT_EQ (1, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); + ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); ASSERT_EQ (0, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); } @@ -947,7 +709,7 @@ TEST (node_telemetry, dos_udp) } } -TEST (node_telemetry, disable_metrics_single) +TEST (node_telemetry, disable_metrics) { nano::system system (1); auto node_client = system.nodes.front (); @@ -962,7 +724,7 @@ TEST (node_telemetry, disable_metrics_single) ASSERT_TRUE (channel); std::atomic done{ false }; - node_client->telemetry.get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { + node_client->telemetry->get_metrics_single_peer_async (channel, [&done](nano::telemetry_data_response const & response_a) { ASSERT_TRUE (response_a.error); done = true; }); @@ -976,7 +738,7 @@ TEST (node_telemetry, disable_metrics_single) // It should still be able to receive metrics though done = false; auto channel1 = node_server->network.find_channel (node_client->network.endpoint ()); - node_server->telemetry.get_metrics_single_peer_async (channel1, [&done, node_server](nano::telemetry_data_response const & response_a) { + node_server->telemetry->get_metrics_single_peer_async (channel1, [&done, node_server](nano::telemetry_data_response const & response_a) { ASSERT_FALSE (response_a.error); compare_default_test_result_data (response_a.telemetry_data, *node_server); done = true; @@ -989,47 +751,6 @@ TEST (node_telemetry, disable_metrics_single) } } -TEST (node_telemetry, disable_metrics_batch) -{ - nano::system system (1); - auto node_client = system.nodes.front (); - nano::node_flags node_flags; - node_flags.disable_providing_telemetry_metrics = true; - auto node_server = system.add_node (node_flags); - - wait_peer_connections (system); - - // Try and request metrics from a node which is turned off but a channel is not closed yet - auto channel = node_client->network.find_channel (node_server->network.endpoint ()); - ASSERT_TRUE (channel); - - std::atomic done{ false }; - 
node_client->telemetry.get_metrics_peers_async ([&done](nano::telemetry_data_responses const & responses_a) {
-		ASSERT_FALSE (responses_a.all_received);
-		done = true;
-	});
-
-	system.deadline_set (10s);
-	while (!done)
-	{
-		ASSERT_NO_ERROR (system.poll ());
-	}
-
-	// It should still be able to receive metrics though
-	done = false;
-	node_server->telemetry.get_metrics_peers_async ([&done, node_server](nano::telemetry_data_responses const & responses_a) {
-		ASSERT_TRUE (responses_a.all_received);
-		compare_default_test_result_data (responses_a.telemetry_datas.begin ()->second, *node_server);
-		done = true;
-	});
-
-	system.deadline_set (10s);
-	while (!done)
-	{
-		ASSERT_NO_ERROR (system.poll ());
-	}
-}
-
 namespace
 {
 void wait_peer_connections (nano::system & system_a)
diff --git a/nano/core_test/testutil.hpp b/nano/core_test/testutil.hpp
index 24202a4ccc..348e88cc70 100644
--- a/nano/core_test/testutil.hpp
+++ b/nano/core_test/testutil.hpp
@@ -187,9 +187,14 @@ namespace util
 			return val;
 		}

+		void increment_required_count ()
+		{
+			++required_count;
+		}
+
 	private:
 		std::atomic<unsigned> count{ 0 };
-		unsigned required_count;
+		std::atomic<unsigned> required_count;
 	};
 }
diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp
index 80a5bc5b28..a163925a77 100644
--- a/nano/node/json_handler.cpp
+++ b/nano/node/json_handler.cpp
@@ -3977,29 +3977,36 @@ void nano::json_handler::telemetry ()
 	if (!ec)
 	{
 		debug_assert (channel);
-		node.telemetry.get_metrics_single_peer_async (channel, [rpc_l](auto const & telemetry_response_a) {
-			if (!telemetry_response_a.error)
-			{
-				nano::jsonconfig config_l;
-				auto err = telemetry_response_a.telemetry_data.serialize_json (config_l);
-				auto const & ptree = config_l.get_tree ();
-
-				if (!err)
+		if (node.telemetry)
+		{
+			node.telemetry->get_metrics_single_peer_async (channel, [rpc_l](auto const & telemetry_response_a) {
+				if (!telemetry_response_a.error)
 				{
-					rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
+					nano::jsonconfig config_l;
+					auto err = telemetry_response_a.telemetry_data.serialize_json (config_l);
+					auto const & ptree = config_l.get_tree ();
+
+					if (!err)
+					{
+						rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
+					}
+					else
+					{
+						rpc_l->ec = nano::error_rpc::generic;
+					}
 				}
 				else
 				{
 					rpc_l->ec = nano::error_rpc::generic;
 				}
-			}
-			else
-			{
-				rpc_l->ec = nano::error_rpc::generic;
-			}
-			rpc_l->response_errors ();
-		});
+				rpc_l->response_errors ();
+			});
+		}
+		else
+		{
+			response_errors ();
+		}
 	}
 	else
 	{
@@ -4012,11 +4019,13 @@ void nano::json_handler::telemetry ()
 		// setting "raw" to true returns metrics from all nodes requested.
auto raw = request.get_optional ("raw"); auto output_raw = raw.value_or (false); - node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](telemetry_data_responses const & telemetry_responses_a) { + if (node.telemetry) + { + auto telemetry_responses = node.telemetry->get_metrics (); if (output_raw) { boost::property_tree::ptree metrics; - for (auto & telemetry_metrics : telemetry_responses_a.telemetry_datas) + for (auto & telemetry_metrics : telemetry_responses) { nano::jsonconfig config_l; auto err = telemetry_metrics.second.serialize_json (config_l); @@ -4028,18 +4037,18 @@ void nano::json_handler::telemetry () } else { - rpc_l->ec = nano::error_rpc::generic; + ec = nano::error_rpc::generic; } } - rpc_l->response_l.put_child ("metrics", metrics); + response_l.put_child ("metrics", metrics); } else { nano::jsonconfig config_l; std::vector telemetry_datas; - telemetry_datas.reserve (telemetry_responses_a.telemetry_datas.size ()); - std::transform (telemetry_responses_a.telemetry_datas.begin (), telemetry_responses_a.telemetry_datas.end (), std::back_inserter (telemetry_datas), [](auto const & endpoint_telemetry_data) { + telemetry_datas.reserve (telemetry_responses.size ()); + std::transform (telemetry_responses.begin (), telemetry_responses.end (), std::back_inserter (telemetry_datas), [](auto const & endpoint_telemetry_data) { return endpoint_telemetry_data.second; }); @@ -4049,16 +4058,16 @@ void nano::json_handler::telemetry () if (!err) { - rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ()); + response_l.insert (response_l.begin (), ptree.begin (), ptree.end ()); } else { - rpc_l->ec = nano::error_rpc::generic; + ec = nano::error_rpc::generic; } } + } - rpc_l->response_errors (); - }); + response_errors (); } } diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 26a4c98b77..aba02256d9 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -489,7 +489,10 @@ class network_message_visitor : public nano::message_visitor node.logger.try_log (boost::str (boost::format ("Received telemetry_ack message from %1%") % channel->to_string ())); } node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in); - node.telemetry.add (message_a.data, channel->get_endpoint (), message_a.is_empty_payload ()); + if (node.telemetry) + { + node.telemetry->set (message_a.data, channel->get_endpoint (), message_a.is_empty_payload ()); + } } nano::node & node; std::shared_ptr channel; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index c921d78fd8..f6381fd4db 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -126,7 +126,7 @@ gap_cache (*this), ledger (store, stats, flags_a.generate_cache), checker (config.signature_checker_threads), network (*this, config.peering_port), -telemetry (network, alarm, worker, flags.disable_ongoing_telemetry_requests), +telemetry (std::make_shared (network, alarm, worker, flags.disable_ongoing_telemetry_requests)), bootstrap_initiator (*this), bootstrap (config.peering_port, *this), application_path (application_path_a), @@ -153,6 +153,8 @@ startup_time (std::chrono::steady_clock::now ()) { if (!init_error ()) { + telemetry->start (); + if (config.websocket_config.enabled) { auto endpoint_l (nano::tcp_endpoint (boost::asio::ip::make_address_v6 (config.websocket_config.address), config.websocket_config.port)); @@ -586,7 +588,10 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.bootstrap_initiator, 
"bootstrap_initiator")); composite->add_component (collect_container_info (node.bootstrap, "bootstrap")); composite->add_component (collect_container_info (node.network, "network")); - composite->add_component (collect_container_info (node.telemetry, "telemetry")); + if (node.telemetry) + { + composite->add_component (collect_container_info (*node.telemetry, "telemetry")); + } composite->add_component (collect_container_info (node.observers, "observers")); composite->add_component (collect_container_info (node.wallets, "wallets")); composite->add_component (collect_container_info (node.vote_processor, "vote_processor")); @@ -699,7 +704,11 @@ void nano::node::stop () active.stop (); confirmation_height_processor.stop (); network.stop (); - telemetry.stop (); + if (telemetry) + { + telemetry->stop (); + telemetry = nullptr; + } if (websocket_server) { websocket_server->stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 7431619fef..c45c90fa1a 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -167,7 +167,7 @@ class node final : public std::enable_shared_from_this nano::ledger ledger; nano::signature_checker checker; nano::network network; - nano::telemetry telemetry; + std::shared_ptr telemetry; nano::bootstrap_initiator bootstrap_initiator; nano::bootstrap_listener bootstrap; boost::filesystem::path application_path; diff --git a/nano/node/telemetry.cpp b/nano/node/telemetry.cpp index a9e506610a..efbd39a413 100644 --- a/nano/node/telemetry.cpp +++ b/nano/node/telemetry.cpp @@ -17,172 +17,131 @@ nano::telemetry::telemetry (nano::network & network_a, nano::alarm & alarm_a, na network (network_a), alarm (alarm_a), worker (worker_a), -batch_request (std::make_shared (network, alarm, worker)) +disable_ongoing_requests (disable_ongoing_requests_a) { - // Before callbacks are called with the batch request, check if any of the single request data can be appended to give - batch_request->pre_callback_callback = [this](std::unordered_map & datas_a, std::mutex & mutex_a) { - nano::lock_guard guard (this->mutex); - for (auto & single_request : single_requests) - { - nano::lock_guard guard (single_request.second.impl->mutex); - if (!single_request.second.impl->cached_telemetry_data.empty ()) - { - nano::lock_guard batch_request_guard (mutex_a); - auto it = this->batch_request->cached_telemetry_data.find (single_request.first); - if (it != this->batch_request->cached_telemetry_data.cend () && single_request.second.last_updated > it->second.last_updated) - { - it->second = single_request.second.impl->cached_telemetry_data.begin ()->second; - } - else - { - datas_a.emplace (single_request.first, single_request.second.impl->cached_telemetry_data.begin ()->second.data); - } - } - } - - for (auto & pending : finished_single_requests) - { - nano::lock_guard batch_request_guard (mutex_a); - auto it = this->batch_request->cached_telemetry_data.find (pending.first); - if (it != this->batch_request->cached_telemetry_data.cend () && pending.second.last_updated > it->second.last_updated) - { - it->second = pending.second; - } - else - { - datas_a.emplace (pending.first, pending.second.data); - } - } - finished_single_requests.clear (); - }; +} - if (!disable_ongoing_requests_a) +void nano::telemetry::start () +{ + // Cannot be done in the constructor as a shared_from_this () call is made in ongoing_req_all_peers + if (!disable_ongoing_requests) { - ongoing_req_all_peers (); + ongoing_req_all_peers (std::chrono::milliseconds (0)); } } void nano::telemetry::stop () { - nano::lock_guard guard 
(mutex); - batch_request = nullptr; - single_requests.clear (); stopped = true; } -void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a) +void nano::telemetry::set (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a) { - nano::lock_guard guard (mutex); if (!stopped) { - batch_request->add (telemetry_data_a, endpoint_a, is_empty_a); - - for (auto & request : single_requests) + nano::lock_guard guard (mutex); + auto it = recent_or_initial_request_telemetry_data.find (endpoint_a); + if (it == recent_or_initial_request_telemetry_data.cend ()) { - request.second.impl->add (telemetry_data_a, endpoint_a, is_empty_a); + // Not requesting telemetry data from this peer so ignore it + return; } + + recent_or_initial_request_telemetry_data.modify (it, [&telemetry_data_a](nano::telemetry_info & telemetry_info_a) { + telemetry_info_a.data = telemetry_data_a; + telemetry_info_a.undergoing_request = false; + }); + + channel_processed (endpoint_a, is_empty_a); } } -void nano::telemetry::ongoing_req_all_peers () +bool nano::telemetry::within_cache_cutoff (telemetry_info const & telemetry_info) const +{ + auto is_within = (telemetry_info.last_request + nano::telemetry_cache_cutoffs::network_to_time (network_params.network)) >= std::chrono::steady_clock::now (); + return !telemetry_info.awaiting_first_response () && is_within; +} + +void nano::telemetry::ongoing_req_all_peers (std::chrono::milliseconds next_request_interval) { - alarm.add (std::chrono::steady_clock::now () + batch_request->cache_cutoff + batch_request->alarm_cutoff, [this, telemetry_impl_w = std::weak_ptr (batch_request)]() { - if (auto batch_telemetry_impl = telemetry_impl_w.lock ()) + // Check if any peers actually need requesting + alarm.add (std::chrono::steady_clock::now () + next_request_interval, [this_w = std::weak_ptr (shared_from_this ())]() { + if (auto this_l = this_w.lock ()) { - nano::lock_guard guard (this->mutex); - if (!this->stopped) + // Check if there are any peers which are in the peers list which haven't been request, or any which are below or equal to the cache cutoff time + if (!this_l->stopped) { - auto peers = this->network.list (std::numeric_limits::max (), network_params.protocol.telemetry_protocol_version_min, false); - // If exists in single_requests don't request because they will just be rejected by other peers until the next round - auto const & single_requests = this->single_requests; - peers.erase (std::remove_if (peers.begin (), peers.end (), [&single_requests](auto const & channel_a) { - return single_requests.count (channel_a->get_endpoint ()) > 0; - }), - peers.cend ()); - if (!peers.empty ()) + auto peers = this_l->network.list (std::numeric_limits::max (), this_l->network_params.protocol.telemetry_protocol_version_min, false); + { - batch_telemetry_impl->get_metrics_async (peers, [](nano::telemetry_data_responses const &) { - // Intentionally empty, just using to refresh the cache + std::unordered_set temp_peers; + std::transform (peers.begin (), peers.end (), std::inserter (temp_peers, temp_peers.end ()), [](auto const & channel_a) { + return channel_a->get_endpoint (); }); - } - this->ongoing_req_all_peers (); - } - } - }); -} - -void nano::telemetry::get_metrics_peers_async (std::function const & callback_a) -{ - auto peers = network.list (std::numeric_limits::max (), network_params.protocol.telemetry_protocol_version_min, false); - nano::lock_guard guard (mutex); - if (!stopped && 
!peers.empty ()) - { - // If exists in single_requests, don't request because they will just be rejected by other nodes, instead all it as additional values - peers.erase (std::remove_if (peers.begin (), peers.end (), [& single_requests = this->single_requests](auto const & channel_a) { - return single_requests.count (channel_a->get_endpoint ()) > 0; - }), - peers.cend ()); - - batch_request->get_metrics_async (peers, [callback_a](nano::telemetry_data_responses const & telemetry_data_responses) { - callback_a (telemetry_data_responses); - }); - } - else - { - const auto all_received = false; - callback_a (nano::telemetry_data_responses{ {}, all_received }); - } -} + // Cleanup any stale saved telemetry data for non-existent peers + nano::lock_guard guard (this_l->mutex); + for (auto it = this_l->recent_or_initial_request_telemetry_data.begin (); it != this_l->recent_or_initial_request_telemetry_data.end ();) + { + if (!it->undergoing_request && !this_l->within_cache_cutoff (*it) && temp_peers.count (it->endpoint) == 0) + { + it = this_l->recent_or_initial_request_telemetry_data.erase (it); + } + else + { + ++it; + } + } -nano::telemetry_data_responses nano::telemetry::get_metrics_peers () -{ - std::promise promise; - get_metrics_peers_async ([&promise](telemetry_data_responses const & telemetry_data_responses_a) { - promise.set_value (telemetry_data_responses_a); - }); + peers.erase (std::remove_if (peers.begin (), peers.end (), [&this_l](auto const & channel_a) { + // Remove from peers list if it exists and is within the cache cutoff + auto it = this_l->recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ()); + return it != this_l->recent_or_initial_request_telemetry_data.end () && this_l->within_cache_cutoff (*it); + }), + peers.end ()); + } - return promise.get_future ().get (); -} + // Request data from new peers, or ones which are out of date + for (auto const & peer : peers) + { + this_l->get_metrics_single_peer_async (peer, [](auto const &) { + // Intentionally empty, just using to refresh the cache + }); + } -// After a request is made to a single peer we want to remove it from the container after the peer has not been requested for a while (cache_cutoff). 
-void nano::telemetry::ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a) -{ - alarm.add (std::chrono::steady_clock::now () + single_request_data_a.impl->cache_cutoff, [this, telemetry_impl_w = std::weak_ptr (single_request_data_a.impl), &single_request_data_a, &endpoint_a]() { - if (auto telemetry_impl = telemetry_impl_w.lock ()) - { - nano::lock_guard guard (this->mutex); - nano::lock_guard guard_telemetry_impl (telemetry_impl->mutex); - if (std::chrono::steady_clock::now () - telemetry_impl->cache_cutoff > single_request_data_a.last_updated && telemetry_impl->callbacks.empty ()) - { - // This will be picked up by the batch request next round - if (!telemetry_impl->cached_telemetry_data.empty ()) + nano::lock_guard guard (this_l->mutex); + long long next_round = std::chrono::duration_cast (nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network)).count (); + if (!this_l->recent_or_initial_request_telemetry_data.empty ()) { - this->finished_single_requests[endpoint_a] = telemetry_impl->cached_telemetry_data.begin ()->second; + // Use the default request time unless a telemetry request cache expires sooner + auto const cache_cutoff = nano::telemetry_cache_cutoffs::network_to_time (this_l->network_params.network); + auto const last_request = this_l->recent_or_initial_request_telemetry_data.get ().begin ()->last_request; + if (std::chrono::steady_clock::now () > last_request + cache_cutoff) + { + next_round = std::min (next_round, std::chrono::duration_cast (std::chrono::steady_clock::now () - (last_request + cache_cutoff)).count ()); + } } - this->single_requests.erase (endpoint_a); - } - else - { - // Request is still active, so call again - this->ongoing_single_request_cleanup (endpoint_a, single_request_data_a); + + this_l->ongoing_req_all_peers (std::chrono::milliseconds (next_round)); } } }); } -void nano::telemetry::update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a) +std::unordered_map nano::telemetry::get_metrics () { - if (is_new_a) - { - // Clean this request up when it isn't being used anymore - ongoing_single_request_cleanup (endpoint_a, single_request_data_a); - } - else - { - // Ensure that refreshed flag is reset so we don't delete it before processing - single_request_data_a.last_updated = std::chrono::steady_clock::now (); - } + std::unordered_map telemetry_data; + + nano::lock_guard guard (mutex); + auto range = boost::make_iterator_range (recent_or_initial_request_telemetry_data); + + // clang-format off + nano::transform_if (range.begin (), range.end (), std::inserter (telemetry_data, telemetry_data.end ()), + [this](auto const & telemetry_info) { return this->within_cache_cutoff (telemetry_info); }, + [](auto const & telemetry_info) { return std::pair{ telemetry_info.endpoint, telemetry_info.data }; }); + // clang-format on + + return telemetry_data; } void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr const & channel_a, std::function const & callback_a) @@ -199,7 +158,6 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr guard (mutex); if (!stopped) { if (channel_a && (channel_a->get_network_version () >= network_params.protocol.telemetry_protocol_version_min)) @@ -211,42 +169,39 @@ void nano::telemetry::get_metrics_single_peer_async (std::shared_ptr guard (mutex); + auto it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint 
()); + if (it != recent_or_initial_request_telemetry_data.cend () && within_cache_cutoff (*it)) { - nano::lock_guard guard (batch_request->mutex); - auto it = batch_request->cached_telemetry_data.find (channel_a->get_endpoint ()); - if (it != batch_request->cached_telemetry_data.cend ()) - { - add_callback_async (it->second.data, it->first); - return; - } + add_callback_async (it->data, it->endpoint); } - // Next check single requests which finished and are awaiting batched requests - auto it = finished_single_requests.find (channel_a->get_endpoint ()); - if (it != finished_single_requests.cend ()) + else { - add_callback_async (it->second.data, it->first); - return; - } - - auto pair = single_requests.emplace (channel_a->get_endpoint (), single_request_data{ std::make_shared (network, alarm, worker), std::chrono::steady_clock::now () }); - auto & single_request_data_it = pair.first; - update_cleanup_data (single_request_data_it->first, single_request_data_it->second, pair.second); - - single_request_data_it->second.impl->get_metrics_async ({ channel_a }, [callback_a, channel_a](telemetry_data_responses const & telemetry_data_responses_a) { - // There should only be 1 response, so if this hasn't been received then conclude it is an error. - auto const error = !telemetry_data_responses_a.all_received; - if (!error) + if (it != recent_or_initial_request_telemetry_data.cend () && it->undergoing_request) { - debug_assert (telemetry_data_responses_a.telemetry_datas.size () == 1); - auto it = telemetry_data_responses_a.telemetry_datas.begin (); - callback_a ({ it->second, it->first, error }); + // A request is currently undergoing, add the callback + debug_assert (callbacks.count (it->endpoint) > 0); + callbacks[it->endpoint].push_back (callback_a); } else { - callback_a ({ nano::telemetry_data{}, channel_a->get_endpoint (), error }); + if (it == recent_or_initial_request_telemetry_data.cend ()) + { + recent_or_initial_request_telemetry_data.emplace (channel_a->get_endpoint (), nano::telemetry_data (), std::chrono::steady_clock::now (), true); + it = recent_or_initial_request_telemetry_data.find (channel_a->get_endpoint ()); + } + else + { + recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) { + telemetry_info_a.undergoing_request = true; + telemetry_info_a.last_request = std::chrono::steady_clock::now (); + }); + } + callbacks[it->endpoint].push_back (callback_a); + fire_request_message (channel_a); } - }); + } } else { @@ -269,252 +224,143 @@ nano::telemetry_data_response nano::telemetry::get_metrics_single_peer (std::sha return promise.get_future ().get (); } -size_t nano::telemetry::telemetry_data_size () +void nano::telemetry::fire_request_message (std::shared_ptr const & channel) { - nano::lock_guard guard (mutex); - auto total = std::accumulate (single_requests.begin (), single_requests.end (), static_cast (0), [](size_t total, auto & single_request) { - return total += single_request.second.impl->telemetry_data_size (); - }); + // Fire off a telemetry request to all passed in channels + debug_assert (channel->get_network_version () >= network_params.protocol.telemetry_protocol_version_min); - if (batch_request) + uint64_t round_l; { - total += batch_request->telemetry_data_size (); + auto it = recent_or_initial_request_telemetry_data.find (channel->get_endpoint ()); + recent_or_initial_request_telemetry_data.modify (it, [](nano::telemetry_info & telemetry_info_a) { + ++telemetry_info_a.round; + }); + round_l = it->round; } - return total; -} 
- -size_t nano::telemetry::finished_single_requests_size () -{ - nano::lock_guard guard (mutex); - return finished_single_requests.size (); -} -nano::telemetry_impl::telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a) : -alarm_cutoff (is_sanitizer_build || nano::running_within_valgrind () ? 6 : 3), -network (network_a), -alarm (alarm_a), -worker (worker_a) -{ -} + std::weak_ptr this_w (shared_from_this ()); + nano::telemetry_req message; + // clang-format off + channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) { + if (auto this_l = this_w.lock ()) + { + if (ec) + { + // Error sending the telemetry_req message + nano::lock_guard guard (this_l->mutex); + this_l->channel_processed (endpoint, true); + } + } + }, + nano::buffer_drop_policy::no_socket_drop); + // clang-format on -void nano::telemetry_impl::flush_callbacks_async () -{ - // Post to worker so that it's truly async and not on the calling thread (same problem as std::async otherwise) - worker.push_task ([this_w = std::weak_ptr (shared_from_this ())]() { + // If no response is seen after a certain period of time remove it + alarm.add (std::chrono::steady_clock::now () + response_time_cutoff, [round_l, this_w, endpoint = channel->get_endpoint ()]() { if (auto this_l = this_w.lock ()) { - nano::unique_lock lk (this_l->mutex); - // Invoke all callbacks, it's possible that during the mutex unlock other callbacks were added, - // so check again and invoke those too - this_l->invoking = true; - while (!this_l->callbacks.empty ()) + nano::lock_guard guard (this_l->mutex); + auto it = this_l->recent_or_initial_request_telemetry_data.find (endpoint); + if (it != this_l->recent_or_initial_request_telemetry_data.cend () && it->undergoing_request && round_l == it->round) { - lk.unlock (); - this_l->invoke_callbacks (); - lk.lock (); + this_l->channel_processed (endpoint, true); } - this_l->invoking = false; } }); } -void nano::telemetry_impl::get_metrics_async (std::deque> const & channels_a, std::function const & callback_a) +void nano::telemetry::channel_processed (nano::endpoint const & endpoint_a, bool error_a) { + if (recent_or_initial_request_telemetry_data.count (endpoint_a) > 0) { - nano::unique_lock lk (mutex); - callbacks.push_back (callback_a); - if (callbacks.size () > 1 || invoking) + if (error_a) { - // This means we already have at least one pending result already, so it will handle calls this callback when it completes - return; + recent_or_initial_request_telemetry_data.erase (endpoint_a); } - - // Check if we can just return cached results - if (channels_a.empty () || std::chrono::steady_clock::now () <= (last_time + cache_cutoff)) - { - flush_callbacks_async (); - return; - } - - failed.clear (); - debug_assert (required_responses.empty ()); - std::transform (channels_a.begin (), channels_a.end (), std::inserter (required_responses, required_responses.end ()), [](auto const & channel) { - return channel->get_endpoint (); - }); + flush_callbacks_async (endpoint_a, error_a); } - - fire_request_messages (channels_a); } -void nano::telemetry_impl::add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a) +void nano::telemetry::flush_callbacks_async (nano::endpoint const & endpoint_a, bool error_a) { - nano::unique_lock lk (mutex); - if (required_responses.find (endpoint_a) == required_responses.cend ()) - { - // Not requesting telemetry data from this channel so ignore it - 
return; - } - - if (!is_empty_a) - { - current_telemetry_data_responses[endpoint_a] = { telemetry_data_a, std::chrono::steady_clock::now () }; - } - channel_processed (lk, endpoint_a); + // Post to worker so that it's truly async and not on the calling thread (same problem as std::async otherwise) + worker.push_task ([endpoint_a, error_a, this_w = std::weak_ptr (shared_from_this ())]() { + if (auto this_l = this_w.lock ()) + { + nano::unique_lock lk (this_l->mutex); + while (!this_l->callbacks[endpoint_a].empty ()) + { + lk.unlock (); + this_l->invoke_callbacks (endpoint_a, error_a); + lk.lock (); + } + } + }); } -void nano::telemetry_impl::invoke_callbacks () +void nano::telemetry::invoke_callbacks (nano::endpoint const & endpoint_a, bool error_a) { - decltype (callbacks) callbacks_l; - bool all_received; - std::unordered_map cached_responses_l; + std::vector> callbacks_l; + telemetry_data_response response_data{ nano::telemetry_data (), endpoint_a, error_a }; { - // Copy callbacks so that they can be called outside of holding the lock + // Copy data so that it can be used outside of holding the lock nano::lock_guard guard (mutex); - callbacks_l = callbacks; - cached_responses_l.reserve (cached_telemetry_data.size ()); - std::transform (cached_telemetry_data.begin (), cached_telemetry_data.end (), std::inserter (cached_responses_l, cached_responses_l.end ()), [](auto const & endpoint_telemetry_data) { - return std::pair{ endpoint_telemetry_data.first, endpoint_telemetry_data.second.data }; - }); - current_telemetry_data_responses.clear (); - callbacks.clear (); - all_received = failed.empty (); + callbacks_l = callbacks[endpoint_a]; + auto it = recent_or_initial_request_telemetry_data.find (endpoint_a); + if (it != recent_or_initial_request_telemetry_data.end ()) + { + response_data.telemetry_data = it->data; + } + callbacks.erase (endpoint_a); } - if (pre_callback_callback) - { - pre_callback_callback (cached_responses_l, mutex); - } // Need to account for nodes which disable telemetry data in responses - bool all_received_l = !cached_responses_l.empty () && all_received; for (auto & callback : callbacks_l) { - callback ({ cached_responses_l, all_received_l }); - } -} - -void nano::telemetry_impl::channel_processed (nano::unique_lock & lk_a, nano::endpoint const & endpoint_a) -{ - debug_assert (lk_a.owns_lock ()); - auto num_removed = required_responses.erase (endpoint_a); - if (num_removed > 0 && required_responses.empty ()) - { - debug_assert (lk_a.owns_lock ()); - cached_telemetry_data = current_telemetry_data_responses; - - last_time = std::chrono::steady_clock::now (); - flush_callbacks_async (); + callback (response_data); } } -void nano::telemetry_impl::fire_request_messages (std::deque> const & channels) -{ - uint64_t round_l; - { - nano::lock_guard guard (mutex); - ++round; - round_l = round; - } - - // Fire off a telemetry request to all passed in channels - nano::telemetry_req message; - for (auto & channel : channels) - { - debug_assert (channel->get_network_version () >= network_params.protocol.telemetry_protocol_version_min); - - std::weak_ptr this_w (shared_from_this ()); - // clang-format off - channel->send (message, [this_w, endpoint = channel->get_endpoint ()](boost::system::error_code const & ec, size_t size_a) { - if (auto this_l = this_w.lock ()) - { - if (ec) - { - // Error sending the telemetry_req message - nano::unique_lock lk (this_l->mutex); - this_l->failed.push_back (endpoint); - this_l->channel_processed (lk, endpoint); - } - } - }, - 
nano::buffer_drop_policy::no_socket_drop); - // clang-format on - - // If no response is seen after a certain period of time, remove it from the list of expected responses. However, only if it is part of the same round. - alarm.add (std::chrono::steady_clock::now () + alarm_cutoff, [this_w, endpoint = channel->get_endpoint (), round_l]() { - if (auto this_l = this_w.lock ()) - { - nano::unique_lock lk (this_l->mutex); - if (this_l->round == round_l && this_l->required_responses.find (endpoint) != this_l->required_responses.cend ()) - { - this_l->failed.push_back (endpoint); - this_l->channel_processed (lk, endpoint); - } - } - }); - } -} - -size_t nano::telemetry_impl::telemetry_data_size () +size_t nano::telemetry::telemetry_data_size () { nano::lock_guard guard (mutex); - return current_telemetry_data_responses.size (); + return recent_or_initial_request_telemetry_data.size (); } -bool nano::telemetry_data_time_pair::operator== (telemetry_data_time_pair const & telemetry_data_time_pair_a) const +nano::telemetry_info::telemetry_info (nano::endpoint const & endpoint_a, nano::telemetry_data const & data_a, std::chrono::steady_clock::time_point last_request_a, bool undergoing_request_a) : +endpoint (endpoint_a), +data (data_a), +last_request (last_request_a), +undergoing_request (undergoing_request_a) { - return data == telemetry_data_time_pair_a.data && last_updated == telemetry_data_time_pair_a.last_updated; } -bool nano::telemetry_data_time_pair::operator!= (telemetry_data_time_pair const & telemetry_data_time_pair_a) const +bool nano::telemetry_info::awaiting_first_response () const { - return !(*this == telemetry_data_time_pair_a); + return data == nano::telemetry_data (); } std::unique_ptr nano::collect_container_info (telemetry & telemetry, const std::string & name) { - size_t single_requests_count; - { - nano::lock_guard guard (telemetry.mutex); - single_requests_count = telemetry.single_requests.size (); - } - auto composite = std::make_unique (name); - if (telemetry.batch_request) + size_t callbacks_count; { - composite->add_component (collect_container_info (*telemetry.batch_request, "batch_request")); + nano::lock_guard guard (telemetry.mutex); + std::unordered_map>> callbacks; + callbacks_count = std::accumulate (callbacks.begin (), callbacks.end (), static_cast (0), [](auto total, auto const & callback_a) { + return total += callback_a.second.size (); + }); } - composite->add_component (std::make_unique (container_info{ "single_requests", single_requests_count, sizeof (decltype (telemetry.single_requests)::value_type) })); - composite->add_component (std::make_unique (container_info{ "finished_single_requests", telemetry.finished_single_requests_size (), sizeof (decltype (telemetry.finished_single_requests)::value_type) })); - return composite; -} -std::unique_ptr nano::collect_container_info (telemetry_impl & telemetry_impl, const std::string & name) -{ - size_t callback_count; - size_t all_telemetry_data_count; - size_t cached_telemetry_data_count; - size_t required_responses_count; - { - nano::lock_guard guard (telemetry_impl.mutex); - callback_count = telemetry_impl.callbacks.size (); - all_telemetry_data_count = telemetry_impl.current_telemetry_data_responses.size (); - cached_telemetry_data_count = telemetry_impl.cached_telemetry_data.size (); - required_responses_count = telemetry_impl.required_responses.size (); - } + composite->add_component (std::make_unique (container_info{ "recent_or_initial_request_telemetry_data", telemetry.telemetry_data_size (), sizeof (decltype 
(telemetry.recent_or_initial_request_telemetry_data)::value_type) })); + composite->add_component (std::make_unique (container_info{ "callbacks", callbacks_count, sizeof (decltype (telemetry.callbacks)::value_type::second_type) })); - auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "callbacks", callback_count, sizeof (decltype (telemetry_impl.callbacks)::value_type) })); - composite->add_component (std::make_unique (container_info{ "current_telemetry_data_responses", all_telemetry_data_count, sizeof (decltype (telemetry_impl.current_telemetry_data_responses)::value_type) })); - composite->add_component (std::make_unique (container_info{ "cached_telemetry_data", cached_telemetry_data_count, sizeof (decltype (telemetry_impl.cached_telemetry_data)::value_type) })); - composite->add_component (std::make_unique (container_info{ "required_responses", required_responses_count, sizeof (decltype (telemetry_impl.required_responses)::value_type) })); return composite; } nano::telemetry_data nano::consolidate_telemetry_data (std::vector const & telemetry_datas) { - std::vector telemetry_data_time_pairs; - telemetry_data_time_pairs.reserve (telemetry_datas.size ()); - if (telemetry_datas.empty ()) { return {}; diff --git a/nano/node/telemetry.hpp b/nano/node/telemetry.hpp index 4406e7cd24..1f704af9b3 100644 --- a/nano/node/telemetry.hpp +++ b/nano/node/telemetry.hpp @@ -3,30 +3,26 @@ #include #include +#include +#include +#include +#include + #include #include -#include + +namespace mi = boost::multi_index; namespace nano { class network; class alarm; class worker; -class telemetry; namespace transport { class channel; } -class telemetry_data_time_pair -{ -public: - nano::telemetry_data data; - std::chrono::steady_clock::time_point last_updated; - bool operator== (telemetry_data_time_pair const &) const; - bool operator!= (telemetry_data_time_pair const &) const; -}; - /* * Holds a response from a telemetry request */ @@ -38,106 +34,48 @@ class telemetry_data_response bool error{ true }; }; -/* - * Holds many responses from telemetry requests - */ -class telemetry_data_responses -{ -public: - std::unordered_map telemetry_datas; - bool all_received{ false }; -}; - -/* - * This class requests node telemetry metrics and invokes any callbacks - * which have been aggregated. Further calls to get_metrics_async may return cached telemetry metrics - * if they are within cache_cutoff time from the latest request. - */ -class telemetry_impl : public std::enable_shared_from_this +class telemetry_info final { public: - telemetry_impl (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a); + telemetry_info () = default; + telemetry_info (nano::endpoint const & endpoint, nano::telemetry_data const & data, std::chrono::steady_clock::time_point last_request, bool undergoing_request); + bool awaiting_first_response () const; -private: - // Class only available to the telemetry class - void get_metrics_async (std::deque> const & channels_a, std::function const & callback_a); - void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a); - size_t telemetry_data_size (); - - nano::network_params network_params; - // Anything older than this requires requesting metrics from other nodes. 
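
For orientation: the new telemetry.hpp pulls in Boost.MultiIndex (namespace mi = boost::multi_index) for the per-peer cache declared further down, a single container of telemetry_info records that can be looked up by endpoint and walked in order of when each peer was last asked. The template arguments do not survive in this listing, so the sketch below only illustrates the likely shape of such a container using placeholder types; the ordered_non_unique index, the mi::member extractors and the tag names (tag_endpoint, tag_last_updated) are taken from the declaration, while treating the endpoint index as hashed and unique is an assumption.

#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/tag.hpp>
#include <boost/multi_index_container.hpp>

#include <chrono>
#include <string>

namespace mi = boost::multi_index;

// Placeholder record standing in for nano::telemetry_info
struct peer_info
{
	std::string endpoint; // stands in for nano::endpoint
	std::chrono::steady_clock::time_point last_request;
};

struct tag_endpoint
{
};
struct tag_last_updated
{
};

// Hashed-unique by endpoint (one entry per peer, cheap lookup) and
// ordered by last_request (cheap retrieval of the stalest entries).
using peer_info_set = boost::multi_index_container<peer_info,
mi::indexed_by<
mi::hashed_unique<mi::tag<tag_endpoint>, mi::member<peer_info, std::string, &peer_info::endpoint>>,
mi::ordered_non_unique<mi::tag<tag_last_updated>, mi::member<peer_info, std::chrono::steady_clock::time_point, &peer_info::last_request>>>>;

int main ()
{
	peer_info_set peers;
	peers.insert ({ "192.168.1.1:7075", std::chrono::steady_clock::now () });
	// The ordered index walks entries from least to most recently requested
	auto & by_age = peers.get<tag_last_updated> ();
	return by_age.empty () ? 1 : 0;
}
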
-	std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) };
-	std::chrono::seconds const alarm_cutoff;
-
-	// All data in this chunk is protected by this mutex
-	std::mutex mutex;
-	std::vector> callbacks;
-	std::chrono::steady_clock::time_point last_time = std::chrono::steady_clock::now () - cache_cutoff;
-	/* The responses received during this request round */
-	std::unordered_map current_telemetry_data_responses;
-	/* The metrics for the last request round */
-	std::unordered_map cached_telemetry_data;
-	std::unordered_set required_responses;
+	nano::endpoint endpoint;
+	nano::telemetry_data data;
+	std::chrono::steady_clock::time_point last_request;
+	bool undergoing_request{ false };
	uint64_t round{ 0 };
-	/* Currently executing callbacks */
-	bool invoking{ false };
-	std::vector failed;
-
-	nano::network & network;
-	nano::alarm & alarm;
-	nano::worker & worker;
-
-	std::function & data_a, std::mutex &)> pre_callback_callback;
-
-	void invoke_callbacks ();
-	void channel_processed (nano::unique_lock & lk_a, nano::endpoint const & endpoint_a);
-	void flush_callbacks_async ();
-	void fire_request_messages (std::deque> const & channels);
-
-	friend std::unique_ptr collect_container_info (telemetry_impl &, const std::string &);
-	friend nano::telemetry;
-	friend class node_telemetry_single_request_Test;
-	friend class node_telemetry_basic_Test;
-	friend class node_telemetry_ongoing_requests_Test;
};
-std::unique_ptr collect_container_info (telemetry_impl & telemetry_impl, const std::string & name);
-
/*
- * This class has 2 main operations:
- * Request metrics from specific single peers (single_requests)
- *  - If this peer is in the batched request, it will use the value from that, otherwise send a telemetry_req message (non-droppable)
- * Request metrics from all peers (batched_request)
- *  - This is polled every minute.
- *  - If a single request is currently underway, do not request because other peers will just reject if within a hotzone time
- *  - This will be proactively added when callbacks are called inside pre_callback_callback
+ * This class requests node telemetry metrics from peers and invokes any callbacks which have been aggregated.
+ * All calls to get_metrics return cached data; they do not issue any requests themselves, as requests are made
+ * periodically in ongoing_req_all_peers. This can be disabled with the disable_ongoing_telemetry_requests node flag.
+ * Calls to get_metrics_single_peer_async will wait for a response if the cached entry is outside the cache
+ * cutoff.
 */
-class telemetry
+class telemetry : public std::enable_shared_from_this
{
public:
-	telemetry (nano::network & network_a, nano::alarm & alarm_a, nano::worker & worker_a, bool disable_ongoing_requests_a);
-
-	/*
-	 * Add telemetry metrics received from this endpoint.
-	 * Should this be unsolicited, it will not be added.
-	 * Some peers may have disabled responding with telemetry data, need to account for this
-	 */
-	void add (nano::telemetry_data const & telemetry_data_a, nano::endpoint const & endpoint_a, bool is_empty_a);
+	telemetry (nano::network &, nano::alarm &, nano::worker &, bool);
+	void start ();
+	void stop ();

	/*
-	 * Collects metrics from all known peers and invokes the callback when complete.
+	 * Set the telemetry data associated with this peer
	 */
-	void get_metrics_peers_async (std::function const & callback_a);
+	void set (nano::telemetry_data const &, nano::endpoint const &, bool);

	/*
-	 * A blocking version of get_metrics_peers_async ().
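
To make the intended call pattern concrete, a hedged usage sketch of this interface: get_metrics_single_peer_async when a reading from one specific peer is wanted (served from the cache when fresh enough, otherwise requested), get_metrics when the cached snapshot alone is sufficient, and the blocking wrapper described just below for callers that cannot take a callback. The sketch assumes calling code that already holds a shared telemetry instance and an established channel to the peer; sample_usage is an illustrative name, not part of the node.

#include <nano/node/telemetry.hpp>

#include <memory>

void sample_usage (std::shared_ptr<nano::telemetry> const & telemetry, std::shared_ptr<nano::transport::channel> const & channel)
{
	// Waits for a response only if the cached entry for this peer is outside the cache cutoff
	telemetry->get_metrics_single_peer_async (channel, [] (nano::telemetry_data_response const & response) {
		if (!response.error)
		{
			// response.telemetry_data now holds this peer's metrics
		}
	});

	// Cached snapshot for all peers; never issues a request by itself
	auto cached = telemetry->get_metrics ();
	(void)cached;
}
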
+	 * This returns whatever is in the cache
	 */
-	telemetry_data_responses get_metrics_peers ();
+	std::unordered_map get_metrics ();

	/*
	 * This makes a telemetry request to the specific channel
	 */
-	void get_metrics_single_peer_async (std::shared_ptr const &, std::function const & callback_a);
+	void get_metrics_single_peer_async (std::shared_ptr const &, std::function const &);

	/*
	 * A blocking version of get_metrics_single_peer_async
@@ -149,51 +87,52 @@ class telemetry
	 */
	size_t telemetry_data_size ();

-	/*
-	 * Return the number of finished_single_requests elements
-	 */
-	size_t finished_single_requests_size ();
-
-	/*
-	 * Stop the telemetry processor
-	 */
-	void stop ();
-
private:
+	class tag_endpoint
+	{
+	};
+	class tag_last_updated
+	{
+	};
+
	nano::network & network;
	nano::alarm & alarm;
	nano::worker & worker;
+	std::atomic stopped{ false };
	nano::network_params network_params;
-
-	class single_request_data
-	{
-	public:
-		std::shared_ptr impl;
-		std::chrono::steady_clock::time_point last_updated{ std::chrono::steady_clock::now () };
-	};
+	bool disable_ongoing_requests;
	std::mutex mutex;
-	/* Requests telemetry data from a random selection of peers */
-	std::shared_ptr batch_request;
-	/* Any requests to specific individual peers is maintained here */
-	std::unordered_map single_requests;
-	/* This holds data from single_requests after the cache is removed */
-	std::unordered_map finished_single_requests;
-	bool stopped{ false };
-
-	void update_cleanup_data (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data & single_request_data_a, bool is_new_a);
-	void ongoing_single_request_cleanup (nano::endpoint const & endpoint_a, nano::telemetry::single_request_data const & single_request_data_a);
-	void ongoing_req_all_peers ();
-
-	friend class node_telemetry_multiple_single_request_clearing_Test;
-	friend class node_telemetry_ongoing_requests_Test;
-	friend std::unique_ptr collect_container_info (telemetry &, const std::string &);
+	// clang-format off
+	// This holds the last telemetry data received from peers or can be a placeholder awaiting the first response (check with awaiting_first_response ())
+	boost::multi_index_container,
+	mi::member>,
+	mi::ordered_non_unique,
+	mi::member>>> recent_or_initial_request_telemetry_data;
+	// clang-format on
+
+	// Anything older than this requires requesting metrics from other nodes.
+	std::chrono::seconds const cache_cutoff{ nano::telemetry_cache_cutoffs::network_to_time (network_params.network) };
+	std::chrono::seconds const response_time_cutoff{ is_sanitizer_build || nano::running_within_valgrind () ?
6 : 3 }; + + std::unordered_map>> callbacks; + + void ongoing_req_all_peers (std::chrono::milliseconds); + + void fire_request_message (std::shared_ptr const & channel); + void channel_processed (nano::endpoint const &, bool); + void flush_callbacks_async (nano::endpoint const &, bool); + void invoke_callbacks (nano::endpoint const &, bool); + + bool within_cache_cutoff (nano::telemetry_info const &) const; + friend std::unique_ptr collect_container_info (telemetry & telemetry, const std::string & name); }; std::unique_ptr collect_container_info (telemetry & telemetry, const std::string & name); nano::telemetry_data consolidate_telemetry_data (std::vector const & telemetry_data); -nano::telemetry_data_time_pair consolidate_telemetry_data_time_pairs (std::vector const & telemetry_data_time_pairs); nano::telemetry_data local_telemetry_data (nano::ledger_cache const &, nano::network &, uint64_t, nano::network_params const &, std::chrono::steady_clock::time_point); } \ No newline at end of file diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 29525bb72b..b71dda5a0d 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -78,7 +78,7 @@ namespace transport class tcp_channels final { friend class nano::transport::channel_tcp; - friend class node_telemetry_simultaneous_single_and_all_requests_Test; + friend class node_telemetry_simultaneous_requests_Test; public: tcp_channels (nano::node &); diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 9c8919733a..4431bae883 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -7990,12 +7990,25 @@ TEST (rpc, node_telemetry_all) { ASSERT_NO_ERROR (system.poll ()); - auto transaction = system.nodes.back ()->store.tx_begin_read (); - peers_stored = system.nodes.back ()->store.peer_count (transaction) != 0; + auto transaction = node1.store.tx_begin_read (); + peers_stored = node1.store.peer_count (transaction) != 0; } - boost::property_tree::ptree request; + // First need to set up the cached data + std::atomic done{ false }; auto node = system.nodes.front (); + node1.telemetry->get_metrics_single_peer_async (node1.network.find_channel (node->network.endpoint ()), [&done](nano::telemetry_data_response const & telemetry_data_response_a) { + ASSERT_FALSE (telemetry_data_response_a.error); + done = true; + }); + + system.deadline_set (10s); + while (!done) + { + ASSERT_NO_ERROR (system.poll ()); + } + + boost::property_tree::ptree request; request.put ("action", "node_telemetry"); { test_response response (request, rpc.config.port, system.io_ctx); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 08209b527f..253cdd0ea6 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -857,10 +857,8 @@ class data class shared_data { public: + nano::util::counted_completion write_completion{ 0 }; std::atomic done{ false }; - std::atomic count{ 0 }; - std::promise promise; - std::shared_future shared_future{ promise.get_future () }; }; template @@ -880,16 +878,10 @@ void callback_process (shared_data & shared_data_a, data & data, T & all_node_da data.awaiting_cache = true; data.orig_time = last_updated; } - if (--shared_data_a.count == 0 && std::all_of (all_node_data_a.begin (), all_node_data_a.end (), [](auto const & data) { return !data.keep_requesting_metrics; })) - { - shared_data_a.done = true; - shared_data_a.promise.set_value (); - } + shared_data_a.write_completion.increment (); }; } -namespace nano -{ TEST (node_telemetry, ongoing_requests) { nano::system 
system (2); @@ -899,20 +891,20 @@ TEST (node_telemetry, ongoing_requests) wait_peer_connections (system); - ASSERT_EQ (0, node_client->telemetry.telemetry_data_size ()); - ASSERT_EQ (0, node_server->telemetry.telemetry_data_size ()); + ASSERT_EQ (0, node_client->telemetry->telemetry_data_size ()); + ASSERT_EQ (0, node_server->telemetry->telemetry_data_size ()); ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_ack, nano::stat::dir::in)); ASSERT_EQ (0, node_client->stats.count (nano::stat::type::bootstrap, nano::stat::detail::telemetry_req, nano::stat::dir::out)); system.deadline_set (20s); - while (node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in) != 1 || node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out) != 1) + while (node_client->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in) != 1 || node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in) != 1) { ASSERT_NO_ERROR (system.poll ()); } // Wait till the next ongoing will be called, and add a 1s buffer for the actual processing auto time = std::chrono::steady_clock::now (); - while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + node_client->telemetry.batch_request->alarm_cutoff + 1s)) + while (std::chrono::steady_clock::now () < (time + nano::telemetry_cache_cutoffs::test + 1s)) { ASSERT_NO_ERROR (system.poll ()); } @@ -924,70 +916,12 @@ TEST (node_telemetry, ongoing_requests) ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::in)); ASSERT_EQ (2, node_server->stats.count (nano::stat::type::message, nano::stat::detail::telemetry_req, nano::stat::dir::out)); } -} - -TEST (node_telemetry, simultaneous_all_requests) -{ - const auto num_nodes = 4; - nano::system system (num_nodes); - - // Wait until peers are stored as they are done in the background - wait_peer_connections (system); - - std::vector threads; - const auto num_threads = 4; - - std::array all_data{}; - for (auto i = 0; i < num_nodes; ++i) - { - all_data[i].node = system.nodes[i]; - } - - shared_data shared_data; - - // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. - // The test waits until all telemetry_ack messages have been received. 
- for (int i = 0; i < num_threads; ++i) - { - threads.emplace_back ([&all_data, &shared_data]() { - while (std::any_of (all_data.cbegin (), all_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) - { - for (auto & data : all_data) - { - // Keep calling requesting telemetry metrics until the cache has been saved and then become outdated (after a certain period of time) for each node - if (data.keep_requesting_metrics) - { - ++shared_data.count; - data.node->telemetry.get_metrics_peers_async ([&shared_data, &data, &all_data](nano::telemetry_data_responses const & responses_a) { - callback_process (shared_data, data, all_data, *responses_a.telemetry_datas.begin ()->second.timestamp); - }); - } - std::this_thread::sleep_for (1ms); - } - } - - shared_data.shared_future.wait (); - ASSERT_EQ (shared_data.count, 0); - }); - } - - system.deadline_set (20s); - while (!shared_data.done) - { - ASSERT_NO_ERROR (system.poll ()); - } - - for (auto & thread : threads) - { - thread.join (); - } -} namespace nano { namespace transport { - TEST (node_telemetry, simultaneous_single_and_all_requests) + TEST (node_telemetry, simultaneous_requests) { const auto num_nodes = 4; nano::system system (num_nodes); @@ -997,66 +931,52 @@ namespace transport std::vector threads; const auto num_threads = 4; - std::array node_data_single{}; - std::array node_data_all{}; + std::array node_data{}; for (auto i = 0; i < num_nodes; ++i) { - node_data_single[i].node = system.nodes[i]; - node_data_all[i].node = system.nodes[i]; + node_data[i].node = system.nodes[i]; } - shared_data shared_data_single; - shared_data shared_data_all; + shared_data shared_data; // Create a few threads where each node sends out telemetry request messages to all other nodes continuously, until the cache it reached and subsequently expired. // The test waits until all telemetry_ack messages have been received. 
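
The loop that follows fans out one get_metrics_single_peer_async call per node on every pass, and the shared write_completion counter is what tells each thread that every outstanding callback has fired: increment_required_count is called before each request, callback_process finishes with increment, and await_count_for blocks until the two counts meet or the timeout expires. As a rough stand-in for that helper (nano::util::counted_completion, whose implementation is not shown here), the same pattern can be expressed with standard library primitives only:

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

// Illustrative completion counter: register expected completions, signal one per
// callback, and wait until everything registered has completed (or time out).
class counted_completion_sketch
{
public:
	void increment_required_count ()
	{
		std::lock_guard<std::mutex> guard (mutex);
		++required;
	}

	void increment ()
	{
		{
			std::lock_guard<std::mutex> guard (mutex);
			++completed;
		}
		condition.notify_all ();
	}

	// Returns true if all required completions arrived within the timeout
	template <typename Rep, typename Period>
	bool await_count_for (std::chrono::duration<Rep, Period> timeout)
	{
		std::unique_lock<std::mutex> lock (mutex);
		return condition.wait_for (lock, timeout, [this] { return completed >= required; });
	}

private:
	std::mutex mutex;
	std::condition_variable condition;
	uint64_t required{ 0 };
	uint64_t completed{ 0 };
};
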
for (int i = 0; i < num_threads; ++i) { - threads.emplace_back ([&node_data_single, &node_data_all, &shared_data_single, &shared_data_all]() { - auto func = [](auto & all_node_data_a, shared_data & shared_data_a, bool single_a) { - while (std::any_of (all_node_data_a.cbegin (), all_node_data_a.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) + threads.emplace_back ([&node_data, &shared_data]() { + while (std::any_of (node_data.cbegin (), node_data.cend (), [](auto const & data) { return data.keep_requesting_metrics.load (); })) + { + for (auto & data : node_data) { - for (auto & data : all_node_data_a) + // Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node + if (data.keep_requesting_metrics) { - // Keep calling get_metrics_async until the cache has been saved and then become outdated (after a certain period of time) for each node - if (data.keep_requesting_metrics) - { - ++shared_data_a.count; - - if (single_a) - { - // Pick first peer to be consistent - auto peer = data.node->network.tcp_channels.channels[0].channel; - data.node->telemetry.get_metrics_single_peer_async (peer, [&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_response const & telemetry_data_response_a) { - callback_process (shared_data_a, data, all_node_data_a, *telemetry_data_response_a.telemetry_data.timestamp); - }); - } - else - { - data.node->telemetry.get_metrics_peers_async ([&shared_data_a, &data, &all_node_data_a](nano::telemetry_data_responses const & telemetry_data_responses_a) { - callback_process (shared_data_a, data, all_node_data_a, *telemetry_data_responses_a.telemetry_datas.begin ()->second.timestamp); - }); - } - } - std::this_thread::sleep_for (1ms); + shared_data.write_completion.increment_required_count (); + + // Pick first peer to be consistent + auto peer = data.node->network.tcp_channels.channels[0].channel; + data.node->telemetry->get_metrics_single_peer_async (peer, [&shared_data, &data, &node_data](nano::telemetry_data_response const & telemetry_data_response_a) { + ASSERT_FALSE (telemetry_data_response_a.error); + callback_process (shared_data, data, node_data, *telemetry_data_response_a.telemetry_data.timestamp); + }); } + std::this_thread::sleep_for (1ms); } + } - shared_data_a.shared_future.wait (); - ASSERT_EQ (shared_data_a.count, 0); - }; - - func (node_data_single, shared_data_single, true); - func (node_data_all, shared_data_all, false); + shared_data.write_completion.await_count_for (20s); + shared_data.done = true; }); } system.deadline_set (30s); - while (!shared_data_all.done || !shared_data_single.done) + while (!shared_data.done) { ASSERT_NO_ERROR (system.poll ()); } + ASSERT_TRUE (std::all_of (node_data.begin (), node_data.end (), [](auto const & data) { return !data.keep_requesting_metrics; })); + for (auto & thread : threads) { thread.join ();