Skip to content

Commit

Permalink
Simplify telemetry data processing
Browse files Browse the repository at this point in the history
  • Loading branch information
wezrule committed Feb 25, 2020
1 parent 73b12c3 commit ee67632
Show file tree
Hide file tree
Showing 10 changed files with 421 additions and 959 deletions.
421 changes: 71 additions & 350 deletions nano/core_test/node_telemetry.cpp

Large diffs are not rendered by default.

61 changes: 35 additions & 26 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3978,29 +3978,36 @@ void nano::json_handler::telemetry ()
if (!ec)
{
debug_assert (channel);
node.telemetry.get_metrics_single_peer_async (channel, [rpc_l](auto const & telemetry_response_a) {
if (!telemetry_response_a.error)
{
nano::jsonconfig config_l;
auto err = telemetry_response_a.telemetry_data.serialize_json (config_l);
auto const & ptree = config_l.get_tree ();

if (!err)
if (node.telemetry)
{
node.telemetry->get_metrics_single_peer_async (channel, [rpc_l](auto const & telemetry_response_a) {
if (!telemetry_response_a.error)
{
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
nano::jsonconfig config_l;
auto err = telemetry_response_a.telemetry_data.serialize_json (config_l);
auto const & ptree = config_l.get_tree ();

if (!err)
{
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
}
else
{
rpc_l->ec = nano::error_rpc::generic;
}
}
else
{
rpc_l->ec = nano::error_rpc::generic;
}
}
else
{
rpc_l->ec = nano::error_rpc::generic;
}

rpc_l->response_errors ();
});
rpc_l->response_errors ();
});
}
else
{
response_errors ();
}
}
else
{
Expand All @@ -4013,11 +4020,13 @@ void nano::json_handler::telemetry ()
// setting "raw" to true returns metrics from all nodes requested.
auto raw = request.get_optional<bool> ("raw");
auto output_raw = raw.value_or (false);
node.telemetry.get_metrics_peers_async ([rpc_l, output_raw](telemetry_data_responses const & telemetry_responses_a) {
if (node.telemetry)
{
auto telemetry_responses = node.telemetry->get_metrics ();
if (output_raw)
{
boost::property_tree::ptree metrics;
for (auto & telemetry_metrics : telemetry_responses_a.telemetry_datas)
for (auto & telemetry_metrics : telemetry_responses)
{
nano::jsonconfig config_l;
auto err = telemetry_metrics.second.serialize_json (config_l);
Expand All @@ -4029,18 +4038,18 @@ void nano::json_handler::telemetry ()
}
else
{
rpc_l->ec = nano::error_rpc::generic;
ec = nano::error_rpc::generic;
}
}

rpc_l->response_l.put_child ("metrics", metrics);
response_l.put_child ("metrics", metrics);
}
else
{
nano::jsonconfig config_l;
std::vector<nano::telemetry_data> telemetry_datas;
telemetry_datas.reserve (telemetry_responses_a.telemetry_datas.size ());
std::transform (telemetry_responses_a.telemetry_datas.begin (), telemetry_responses_a.telemetry_datas.end (), std::back_inserter (telemetry_datas), [](auto const & endpoint_telemetry_data) {
telemetry_datas.reserve (telemetry_responses.size ());
std::transform (telemetry_responses.begin (), telemetry_responses.end (), std::back_inserter (telemetry_datas), [](auto const & endpoint_telemetry_data) {
return endpoint_telemetry_data.second;
});

Expand All @@ -4050,16 +4059,16 @@ void nano::json_handler::telemetry ()

if (!err)
{
rpc_l->response_l.insert (rpc_l->response_l.begin (), ptree.begin (), ptree.end ());
response_l.insert (response_l.begin (), ptree.begin (), ptree.end ());
}
else
{
rpc_l->ec = nano::error_rpc::generic;
ec = nano::error_rpc::generic;
}
}
}

rpc_l->response_errors ();
});
response_errors ();
}
}

Expand Down
5 changes: 4 additions & 1 deletion nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,10 @@ class network_message_visitor : public nano::message_visitor
node.logger.try_log (boost::str (boost::format ("Received telemetry_ack message from %1%") % channel->to_string ()));
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::telemetry_ack, nano::stat::dir::in);
node.telemetry.add (message_a.data, channel->get_endpoint (), message_a.is_empty_payload ());
if (node.telemetry)
{
node.telemetry->set (message_a.data, channel->get_endpoint (), message_a.is_empty_payload ());
}
}
nano::node & node;
std::shared_ptr<nano::transport::channel> channel;
Expand Down
15 changes: 12 additions & 3 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ gap_cache (*this),
ledger (store, stats, flags_a.generate_cache),
checker (config.signature_checker_threads),
network (*this, config.peering_port),
telemetry (network, alarm, worker, flags.disable_ongoing_telemetry_requests),
telemetry (std::make_shared<nano::telemetry> (network, alarm, worker, flags.disable_ongoing_telemetry_requests)),
bootstrap_initiator (*this),
bootstrap (config.peering_port, *this),
application_path (application_path_a),
Expand All @@ -153,6 +153,8 @@ startup_time (std::chrono::steady_clock::now ())
{
if (!init_error ())
{
telemetry->start ();

if (config.websocket_config.enabled)
{
auto endpoint_l (nano::tcp_endpoint (boost::asio::ip::make_address_v6 (config.websocket_config.address), config.websocket_config.port));
Expand Down Expand Up @@ -586,7 +588,10 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.bootstrap_initiator, "bootstrap_initiator"));
composite->add_component (collect_container_info (node.bootstrap, "bootstrap"));
composite->add_component (collect_container_info (node.network, "network"));
composite->add_component (collect_container_info (node.telemetry, "telemetry"));
if (node.telemetry)
{
composite->add_component (collect_container_info (*node.telemetry, "telemetry"));
}
composite->add_component (collect_container_info (node.observers, "observers"));
composite->add_component (collect_container_info (node.wallets, "wallets"));
composite->add_component (collect_container_info (node.vote_processor, "vote_processor"));
Expand Down Expand Up @@ -699,7 +704,11 @@ void nano::node::stop ()
active.stop ();
confirmation_height_processor.stop ();
network.stop ();
telemetry.stop ();
if (telemetry)
{
telemetry->stop ();
telemetry = nullptr;
}
if (websocket_server)
{
websocket_server->stop ();
Expand Down
2 changes: 1 addition & 1 deletion nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ class node final : public std::enable_shared_from_this<nano::node>
nano::ledger ledger;
nano::signature_checker checker;
nano::network network;
nano::telemetry telemetry;
std::shared_ptr<nano::telemetry> telemetry;
nano::bootstrap_initiator bootstrap_initiator;
nano::bootstrap_listener bootstrap;
boost::filesystem::path application_path;
Expand Down
Loading

0 comments on commit ee67632

Please sign in to comment.