From 8d967c8de2d8241cfed2d50ce0f32f8ad2b902be Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Tue, 10 Jan 2023 13:21:07 -0400 Subject: [PATCH] Allow xact_ttl in milliseconds to proper timeout netprobe data --- libs/visor_transaction/TransactionManager.h | 21 +++++++++++++++---- src/handlers/dhcp/DhcpStreamHandler.cpp | 11 ++++++---- src/handlers/dhcp/DhcpStreamHandler.h | 3 ++- src/handlers/dns/v1/DnsStreamHandler.cpp | 10 +++++---- src/handlers/dns/v1/DnsStreamHandler.h | 3 ++- src/handlers/dns/v1/tests/test_dns_layer.cpp | 2 +- src/handlers/dns/v2/DnsStreamHandler.cpp | 7 +++++-- src/handlers/dns/v2/DnsStreamHandler.h | 4 +++- src/handlers/dns/v2/tests/test_dns_layer.cpp | 2 +- .../netprobe/NetProbeStreamHandler.cpp | 21 ++++++++++++++++--- src/handlers/netprobe/NetProbeStreamHandler.h | 11 ++++++---- src/inputs/netprobe/NetProbeInputStream.cpp | 4 +++- src/inputs/netprobe/PingProbe.cpp | 17 --------------- src/inputs/netprobe/PingProbe.h | 1 - 14 files changed, 72 insertions(+), 45 deletions(-) diff --git a/libs/visor_transaction/TransactionManager.h b/libs/visor_transaction/TransactionManager.h index 40bd61a49..56eb9250a 100644 --- a/libs/visor_transaction/TransactionManager.h +++ b/libs/visor_transaction/TransactionManager.h @@ -53,13 +53,19 @@ class TransactionManager static_assert(std::is_base_of::value, "TransactionType must inherit from Transaction structure"); typedef robin_hood::unordered_map XactMap; - uint32_t _ttl_secs; + uint32_t _ttl_secs{0}; + uint32_t _ttl_ms{0}; XactMap _transactions; public: - TransactionManager(uint32_t ttl_secs = 5) - : _ttl_secs(ttl_secs) + TransactionManager(uint32_t ttl_ms = 5000) { + if (ttl_ms > 1000) { + _ttl_secs = ttl_ms / 1000; + _ttl_ms = ttl_ms - _ttl_secs * 1000; + } else { + _ttl_ms = ttl_ms; + } } void start_transaction(XactID id, TransactionType type) @@ -73,7 +79,9 @@ class TransactionManager auto result = _transactions[id]; timespec_diff(&endTS, &result.startTS, &result.totalTS); _transactions.erase(id); - if (result.totalTS.tv_sec >= _ttl_secs) { + if (result.totalTS.tv_sec > _ttl_secs) { + return std::pair(Result::TimedOut, result); + } else if (result.totalTS.tv_sec == _ttl_secs && (result.totalTS.tv_nsec / 1.0e6) >= _ttl_ms) { return std::pair(Result::TimedOut, result); } else { return std::pair(Result::Valid, result); @@ -97,6 +105,11 @@ class TransactionManager return timed_out.size(); } + void clear() + { + _transactions.clear(); + } + typename XactMap::size_type open_transaction_count() const { return _transactions.size(); diff --git a/src/handlers/dhcp/DhcpStreamHandler.cpp b/src/handlers/dhcp/DhcpStreamHandler.cpp index 740089f13..dda9b7b32 100644 --- a/src/handlers/dhcp/DhcpStreamHandler.cpp +++ b/src/handlers/dhcp/DhcpStreamHandler.cpp @@ -50,9 +50,12 @@ void DhcpStreamHandler::start() _metrics->set_recorded_stream(); } - if (config_exists("xact_ttl_secs")) { - auto ttl = config_get("xact_ttl_secs"); + if (config_exists("xact_ttl_ms")) { + auto ttl = config_get("xact_ttl_ms"); _metrics->set_xact_ttl(static_cast(ttl)); + } else if (config_exists("xact_ttl_secs")) { + auto ttl = config_get("xact_ttl_secs"); + _metrics->set_xact_ttl(static_cast(ttl) * 1000); } if (_pcap_proxy) { @@ -151,9 +154,9 @@ void DhcpMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, Metri { auto start_ts = start_tstamp(); auto end_ts = end_tstamp(); - + _rate_total.to_opentelemetry(scope, start_ts, end_ts, add_labels); - + { auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe diff --git a/src/handlers/dhcp/DhcpStreamHandler.h b/src/handlers/dhcp/DhcpStreamHandler.h index d6b263c90..1a94f3c09 100644 --- a/src/handlers/dhcp/DhcpStreamHandler.h +++ b/src/handlers/dhcp/DhcpStreamHandler.h @@ -158,7 +158,8 @@ class DhcpStreamHandler final : public visor::StreamMetricsHandlerset_recorded_stream(); } - if (config_exists("xact_ttl_secs")) { - auto ttl = config_get("xact_ttl_secs"); + if (config_exists("xact_ttl_ms")) { + auto ttl = config_get("xact_ttl_ms"); _metrics->set_xact_ttl(static_cast(ttl)); + } else if (config_exists("xact_ttl_secs")) { + auto ttl = config_get("xact_ttl_secs"); + _metrics->set_xact_ttl(static_cast(ttl) * 1000); } if (_pcap_proxy) { @@ -1160,14 +1163,13 @@ void DnsMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap ad }); } - void DnsMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, Metric::LabelMap add_labels) const { auto start_ts = start_tstamp(); auto end_ts = end_tstamp(); _rate_total.to_opentelemetry(scope, start_ts, end_ts, add_labels); - + { auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe diff --git a/src/handlers/dns/v1/DnsStreamHandler.h b/src/handlers/dns/v1/DnsStreamHandler.h index eb9e2dcfa..ae6086fdb 100644 --- a/src/handlers/dns/v1/DnsStreamHandler.h +++ b/src/handlers/dns/v1/DnsStreamHandler.h @@ -385,7 +385,8 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler("num_periods", 1); DnsStreamHandler dns_handler{"dns-test", stream_proxy, &c}; dns_handler.config_set("invalid_config", true); - REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_queries, only_responses, only_dnssec_response, answer_count, only_qtype, only_qname, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, xact_ttl_secs, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold"); + REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_queries, only_responses, only_dnssec_response, answer_count, only_qtype, only_qname, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, xact_ttl_secs, xact_ttl_ms, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold"); } TEST_CASE("DNS config ttl", "[dns][config]") diff --git a/src/handlers/dns/v2/DnsStreamHandler.cpp b/src/handlers/dns/v2/DnsStreamHandler.cpp index cac84b574..044d6afba 100644 --- a/src/handlers/dns/v2/DnsStreamHandler.cpp +++ b/src/handlers/dns/v2/DnsStreamHandler.cpp @@ -197,9 +197,12 @@ void DnsStreamHandler::start() _metrics->set_recorded_stream(); } - if (config_exists("xact_ttl_secs")) { - auto ttl = config_get("xact_ttl_secs"); + if (config_exists("xact_ttl_ms")) { + auto ttl = config_get("xact_ttl_ms"); _metrics->set_xact_ttl(static_cast(ttl)); + } else if (config_exists("xact_ttl_secs")) { + auto ttl = config_get("xact_ttl_secs"); + _metrics->set_xact_ttl(static_cast(ttl) * 1000); } if (_pcap_proxy) { diff --git a/src/handlers/dns/v2/DnsStreamHandler.h b/src/handlers/dns/v2/DnsStreamHandler.h index 3ffb17137..a1e7ffb88 100644 --- a/src/handlers/dns/v2/DnsStreamHandler.h +++ b/src/handlers/dns/v2/DnsStreamHandler.h @@ -553,7 +553,9 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler("num_periods", 1); DnsStreamHandler dns_handler{"dns-test", stream_proxy, &c}; dns_handler.config_set("invalid_config", true); - REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_dnssec_response, answer_count, only_qtype, only_qname, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold"); + REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_dnssec_response, answer_count, only_qtype, only_qname, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, xact_ttl_secs, xact_ttl_ms, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold"); } diff --git a/src/handlers/netprobe/NetProbeStreamHandler.cpp b/src/handlers/netprobe/NetProbeStreamHandler.cpp index 354677dcf..04c3d55e6 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.cpp +++ b/src/handlers/netprobe/NetProbeStreamHandler.cpp @@ -34,8 +34,14 @@ void NetProbeStreamHandler::start() _metrics->set_recorded_stream(); } - if (config_exists("xact_ttl_secs")) { + if (config_exists("xact_ttl_ms")) { + auto ttl = config_get("xact_ttl_ms"); + _metrics->set_xact_ttl(static_cast(ttl)); + } else if (config_exists("xact_ttl_secs")) { auto ttl = config_get("xact_ttl_secs"); + _metrics->set_xact_ttl(static_cast(ttl) * 1000); + } else if (_netprobe_proxy->config_exists("xact_ttl_ms")) { + auto ttl = _netprobe_proxy->config_get("xact_ttl_ms"); _metrics->set_xact_ttl(static_cast(ttl)); } @@ -114,6 +120,7 @@ void NetProbeMetricsBucket::specialized_merge(const AbstractMetricsBucket &o, Me _targets_metrics[targetId]->attempts += target.second->attempts; _targets_metrics[targetId]->successes += target.second->successes; _targets_metrics[targetId]->dns_failures += target.second->dns_failures; + _targets_metrics[targetId]->timed_out += target.second->timed_out; } if (group_enabled(group::NetProbeMetrics::Histograms)) { _targets_metrics[targetId]->h_time_us.merge(target.second->h_time_us); @@ -137,6 +144,7 @@ void NetProbeMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelM target.second->attempts.to_prometheus(out, target_labels); target.second->successes.to_prometheus(out, target_labels); target.second->dns_failures.to_prometheus(out, target_labels); + target.second->timed_out.to_prometheus(out, target_labels); } bool h_max_min{true}; @@ -182,9 +190,9 @@ void NetProbeMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, M { auto start_ts = start_tstamp(); auto end_ts = end_tstamp(); - + std::shared_lock r_lock(_mutex); - + for (const auto &target : _targets_metrics) { auto target_labels = add_labels; auto targetId = target.first; @@ -194,6 +202,7 @@ void NetProbeMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, M target.second->attempts.to_opentelemetry(scope, start_ts, end_ts, target_labels); target.second->successes.to_opentelemetry(scope, start_ts, end_ts, target_labels); target.second->dns_failures.to_opentelemetry(scope, start_ts, end_ts, target_labels); + target.second->timed_out.to_opentelemetry(scope, start_ts, end_ts, target_labels); } bool h_max_min{true}; @@ -247,6 +256,7 @@ void NetProbeMetricsBucket::to_json(json &j) const target.second->attempts.to_json(j["targets"][targetId]); target.second->successes.to_json(j["targets"][targetId]); target.second->dns_failures.to_json(j["targets"][targetId]); + target.second->timed_out.to_json(j["targets"][targetId]); } bool h_max_min{true}; @@ -303,6 +313,7 @@ void NetProbeMetricsBucket::process_failure(ErrorType error, const std::string & ++_targets_metrics[target]->dns_failures; break; case ErrorType::Timeout: + ++_targets_metrics[target]->timed_out; case ErrorType::SocketError: case ErrorType::InvalidIp: case ErrorType::ConnectionFailure: @@ -374,6 +385,8 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const auto xact = _request_reply_manager->maybe_end_transaction((static_cast(reply->header->id) << 16) | reply->header->sequence, stamp); if (xact.first == Result::Valid) { live_bucket()->new_transaction(_deep_sampling_now, xact.second); + } else if (xact.first == Result::TimedOut) { + live_bucket()->process_failure(ErrorType::Timeout, xact.second.target); } } } @@ -391,6 +404,8 @@ void NetProbeMetricsManager::process_netprobe_tcp(uint32_t port, bool send, cons auto xact = _request_reply_manager->maybe_end_transaction(port, stamp); if (xact.first == Result::Valid) { live_bucket()->new_transaction(_deep_sampling_now, xact.second); + } else if (xact.first == Result::TimedOut) { + live_bucket()->process_failure(ErrorType::Timeout, xact.second.target); } } } diff --git a/src/handlers/netprobe/NetProbeStreamHandler.h b/src/handlers/netprobe/NetProbeStreamHandler.h index 5951640b6..42fbf33b0 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.h +++ b/src/handlers/netprobe/NetProbeStreamHandler.h @@ -50,6 +50,7 @@ struct Target { Counter minimum; Counter maximum; Counter dns_failures; + Counter timed_out; Target() : q_time_us(NET_PROBE_SCHEMA, {"response_quantiles_us"}, "Net Probe quantile in microseconds") @@ -59,6 +60,7 @@ struct Target { , minimum(NET_PROBE_SCHEMA, {"response_min_us"}, "Minimum response time measured in the reporting interval") , maximum(NET_PROBE_SCHEMA, {"response_max_us"}, "Maximum response time measured in the reporting interval") , dns_failures(NET_PROBE_SCHEMA, {"dns_lookup_failures"}, "Total Net Probe failures when performed DNS lookup") + , timed_out(NET_PROBE_SCHEMA, {"packets_timeout"}, "Total Net Probe timeout transactions") { } }; @@ -106,10 +108,10 @@ class NetProbeMetricsManager final : public visor::AbstractMetricsManagerpurge_old_transactions(stamp); + // Clear all old transactions + _request_reply_manager->clear(); } void set_xact_ttl(uint32_t ttl) @@ -135,7 +137,8 @@ class NetProbeStreamHandler final : public visor::StreamMetricsHandler NetProbeInputStream::create_event_proxy(const Configurable &filter) { - return std::make_unique(_name, filter); + auto custom_filter = filter; + custom_filter.config_set("xact_ttl_ms", _timeout_msec); + return std::make_unique(_name, custom_filter); } } diff --git a/src/inputs/netprobe/PingProbe.cpp b/src/inputs/netprobe/PingProbe.cpp index d36fc206e..ba1d75a20 100644 --- a/src/inputs/netprobe/PingProbe.cpp +++ b/src/inputs/netprobe/PingProbe.cpp @@ -152,20 +152,6 @@ bool PingProbe::start(std::shared_ptr io_loop) _interval_timer->on([this](const auto &, auto &) { _internal_sequence = 0; - _timeout_timer = _io_loop->resource(); - if (!_timeout_timer) { - throw NetProbeException("PingProbe - unable to initialize timeout TimerHandle"); - } - - _timeout_timer->on([this](const auto &, auto &) { - _internal_sequence = _config.packets_per_test; - _fail(ErrorType::Timeout, TestType::Ping, _name); - if (_internal_timer) { - _internal_timer->stop(); - } - _interval_timer->again(); - }); - if (auto error = _create_socket(); error.has_value()) { _fail(error.value(), TestType::Ping, _name); return; @@ -180,8 +166,6 @@ bool PingProbe::start(std::shared_ptr io_loop) _internal_timer->on([this](const auto &, auto &) { if (_internal_sequence < static_cast(_config.packets_per_test)) { _internal_sequence++; - _timeout_timer->stop(); - _timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0}); _send_icmp_v4(_internal_sequence); } }); @@ -189,7 +173,6 @@ bool PingProbe::start(std::shared_ptr io_loop) (_sequence == UCHAR_MAX) ? _sequence = 0 : _sequence++; _send_icmp_v4(_internal_sequence); _internal_sequence++; - _timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0}); _internal_timer->start(uvw::TimerHandle::Time{_config.packets_interval_msec}, uvw::TimerHandle::Time{_config.packets_interval_msec}); }); diff --git a/src/inputs/netprobe/PingProbe.h b/src/inputs/netprobe/PingProbe.h index 5842309e7..01288ad46 100644 --- a/src/inputs/netprobe/PingProbe.h +++ b/src/inputs/netprobe/PingProbe.h @@ -99,7 +99,6 @@ class PingProbe final : public NetProbe uint8_t _internal_sequence{0}; std::shared_ptr _interval_timer; std::shared_ptr _internal_timer; - std::shared_ptr _timeout_timer; std::shared_ptr _recv_handler; SOCKETLEN _sin_length{0}; std::vector _payload_array;