From 65d51200aff3deaefe9c3df7af995d2df458f98e Mon Sep 17 00:00:00 2001 From: Leonardo Parente Date: Wed, 7 Dec 2022 10:08:49 -0400 Subject: [PATCH 1/2] Make xact timeout configurable --- libs/visor_transaction/TransactionManager.h | 4 +-- src/handlers/dhcp/DhcpStreamHandler.cpp | 4 +-- src/handlers/dhcp/DhcpStreamHandler.h | 13 ++++++-- src/handlers/dns/v1/DnsStreamHandler.cpp | 4 +-- src/handlers/dns/v1/DnsStreamHandler.h | 15 +++++++--- src/handlers/dns/v1/tests/test_dns_layer.cpp | 15 +++++++++- src/handlers/dns/v2/DnsStreamHandler.cpp | 12 ++++---- src/handlers/dns/v2/DnsStreamHandler.h | 30 +++++++++++++++---- .../netprobe/NetProbeStreamHandler.cpp | 8 ++--- src/handlers/netprobe/NetProbeStreamHandler.h | 13 ++++++-- 10 files changed, 85 insertions(+), 33 deletions(-) diff --git a/libs/visor_transaction/TransactionManager.h b/libs/visor_transaction/TransactionManager.h index 26a29ce2b..40bd61a49 100644 --- a/libs/visor_transaction/TransactionManager.h +++ b/libs/visor_transaction/TransactionManager.h @@ -53,11 +53,11 @@ class TransactionManager static_assert(std::is_base_of::value, "TransactionType must inherit from Transaction structure"); typedef robin_hood::unordered_map XactMap; - unsigned int _ttl_secs; + uint32_t _ttl_secs; XactMap _transactions; public: - TransactionManager(unsigned int ttl_secs = 5) + TransactionManager(uint32_t ttl_secs = 5) : _ttl_secs(ttl_secs) { } diff --git a/src/handlers/dhcp/DhcpStreamHandler.cpp b/src/handlers/dhcp/DhcpStreamHandler.cpp index 09fa5c37e..3de6af6bb 100644 --- a/src/handlers/dhcp/DhcpStreamHandler.cpp +++ b/src/handlers/dhcp/DhcpStreamHandler.cpp @@ -291,9 +291,9 @@ void DhcpMetricsManager::process_dhcp_layer(pcpp::DhcpLayer *dhcp, pcpp::Packet hostname = option.getValueAsString(); } auto mac_address = dhcp->getClientHardwareAddress().toString(); - _request_ack_manager.start_transaction(dhcp->getDhcpHeader()->transactionID, {{stamp, {0, 0}}, hostname, mac_address}); + _request_ack_manager->start_transaction(dhcp->getDhcpHeader()->transactionID, {{stamp, {0, 0}}, hostname, mac_address}); } else if (type == pcpp::DHCP_ACK) { - auto xact = _request_ack_manager.maybe_end_transaction(dhcp->getDhcpHeader()->transactionID, stamp); + auto xact = _request_ack_manager->maybe_end_transaction(dhcp->getDhcpHeader()->transactionID, stamp); if (xact.first == Result::Valid) { live_bucket()->new_dhcp_transaction(_deep_sampling_now, dhcp, xact.second); } diff --git a/src/handlers/dhcp/DhcpStreamHandler.h b/src/handlers/dhcp/DhcpStreamHandler.h index 1701259a8..5b5b67624 100644 --- a/src/handlers/dhcp/DhcpStreamHandler.h +++ b/src/handlers/dhcp/DhcpStreamHandler.h @@ -118,18 +118,24 @@ class DhcpMetricsBucket final : public visor::AbstractMetricsBucket class DhcpMetricsManager final : public visor::AbstractMetricsManager { - TransactionManager> _request_ack_manager; + typedef TransactionManager> DhcpTransactionManager; + std::unique_ptr _request_ack_manager; public: DhcpMetricsManager(const Configurable *window_config) : visor::AbstractMetricsManager(window_config) { + if (window_config->config_exists("xact_ttl_secs")) { + _request_ack_manager = std::make_unique(static_cast(window_config->config_get("xact_ttl_secs"))); + } else { + _request_ack_manager = std::make_unique(); + } } void on_period_shift(timespec stamp, [[maybe_unused]] const DhcpMetricsBucket *maybe_expiring_bucket) override { // Dhcp transaction support - _request_ack_manager.purge_old_transactions(stamp); + _request_ack_manager->purge_old_transactions(stamp); } void process_filtered(timespec stamp); @@ -149,7 +155,8 @@ class DhcpStreamHandler final : public visor::StreamMetricsHandlerqueryOrResponse == QR::response) { - auto xact = _qr_pair_manager.maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp); + auto xact = _qr_pair_manager->maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp); if (xact.first == Result::Valid) { live_bucket()->new_dns_transaction(_deep_sampling_now, _to90th, _from90th, payload, dir, xact.second); } else if (xact.first == Result::TimedOut) { live_bucket()->inc_xact_timed_out(1); } } else { - _qr_pair_manager.start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), {{stamp, {0, 0}}, payload.getDataLen()}); + _qr_pair_manager->start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), {{stamp, {0, 0}}, payload.getDataLen()}); } } } diff --git a/src/handlers/dns/v1/DnsStreamHandler.h b/src/handlers/dns/v1/DnsStreamHandler.h index 113c7db31..f103614ce 100644 --- a/src/handlers/dns/v1/DnsStreamHandler.h +++ b/src/handlers/dns/v1/DnsStreamHandler.h @@ -230,7 +230,8 @@ class DnsMetricsBucket final : public visor::AbstractMetricsBucket class DnsMetricsManager final : public visor::AbstractMetricsManager { using DnsXactID = std::pair; - visor::lib::transaction::TransactionManager _qr_pair_manager; + typedef lib::transaction::TransactionManager DnsTransactionManager; + std::unique_ptr _qr_pair_manager; float _to90th{0.0}; float _from90th{0.0}; @@ -238,12 +239,17 @@ class DnsMetricsManager final : public visor::AbstractMetricsManager(window_config) { + if (window_config->config_exists("xact_ttl_secs")) { + _qr_pair_manager = std::make_unique(static_cast(window_config->config_get("xact_ttl_secs"))); + } else { + _qr_pair_manager = std::make_unique(); + } } void on_period_shift(timespec stamp, [[maybe_unused]] const DnsMetricsBucket *maybe_expiring_bucket) override { // DNS transaction support - auto timed_out = _qr_pair_manager.purge_old_transactions(stamp); + auto timed_out = _qr_pair_manager->purge_old_transactions(stamp); if (timed_out) { live_bucket()->inc_xact_timed_out(timed_out); } @@ -259,7 +265,7 @@ class DnsMetricsManager final : public visor::AbstractMetricsManageropen_transaction_count(); } void process_filtered(timespec stamp); @@ -372,7 +378,8 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler("num_periods", 1); DnsStreamHandler dns_handler{"dns-test", stream_proxy, &c}; dns_handler.config_set("invalid_config", true); - REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_queries, only_responses, only_dnssec_response, answer_count, only_qtype, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold"); + REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_queries, only_responses, only_dnssec_response, answer_count, only_qtype, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, xact_ttl_secs, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold"); +} + +TEST_CASE("DNS config ttl", "[dns][config]") +{ + PcapInputStream stream{"pcap-test"}; + stream.config_set("pcap_file", "tests/fixtures/dns_udp_tcp_random.pcap"); + + visor::Config c; + auto stream_proxy = stream.add_event_proxy(c); + c.config_set("num_periods", 1); + c.config_set("xact_ttl_secs", 2); + DnsStreamHandler dns_handler{"dns-test", stream_proxy, &c}; + REQUIRE_NOTHROW(dns_handler.start()); } TEST_CASE("DNS Filters: only_rcode with predicate", "[pcap][dns][filter]") diff --git a/src/handlers/dns/v2/DnsStreamHandler.cpp b/src/handlers/dns/v2/DnsStreamHandler.cpp index 11ea34bf9..cd2127804 100644 --- a/src/handlers/dns/v2/DnsStreamHandler.cpp +++ b/src/handlers/dns/v2/DnsStreamHandler.cpp @@ -917,7 +917,7 @@ void DnsMetricsManager::process_dns_layer(DnsLayer &payload, PacketDirection dir } else if (dir == PacketDirection::fromHost) { xact_dir = TransactionDirection::in; } - auto xact = _pair_manager[xact_dir].xact_map.maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp); + auto xact = _pair_manager[xact_dir].xact_map->maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp); live_bucket()->dir_setup(xact_dir); if (xact.first == Result::Valid && !xact.second.filtered) { live_bucket()->new_dns_transaction(_deep_sampling_now, _pair_manager[xact_dir].per_90th, payload, xact_dir, xact.second, l3, static_cast(l4), port, suffix_size); @@ -946,7 +946,7 @@ void DnsMetricsManager::process_dns_layer(DnsLayer &payload, PacketDirection dir subnet = ecs->client_subnet; } } - _pair_manager[xact_dir].xact_map.start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), + _pair_manager[xact_dir].xact_map->start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), {{stamp, {0, 0}}, false, payload.getDataLen(), static_cast(payload.getDnsHeader()->checkingDisabled), subnet}); } } @@ -964,7 +964,7 @@ void DnsMetricsManager::process_filtered(timespec stamp, DnsLayer &payload, Pack } else if (dir == PacketDirection::fromHost) { xact_dir = TransactionDirection::in; } - auto xact = _pair_manager[xact_dir].xact_map.maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp); + auto xact = _pair_manager[xact_dir].xact_map->maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp); live_bucket()->dir_setup(xact_dir); if (xact.first == Result::Valid && !xact.second.filtered) { live_bucket()->process_filtered(); @@ -975,7 +975,7 @@ void DnsMetricsManager::process_filtered(timespec stamp, DnsLayer &payload, Pack } else if (dir == PacketDirection::fromHost) { xact_dir = TransactionDirection::out; } - _pair_manager[xact_dir].xact_map.start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), {{stamp, {0, 0}}, true, 0, false, std::string()}); + _pair_manager[xact_dir].xact_map->start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), {{stamp, {0, 0}}, true, 0, false, std::string()}); } live_bucket()->process_filtered(); } @@ -1066,7 +1066,7 @@ void DnsMetricsManager::process_dnstap(const dnstap::Dnstap &payload, bool filte uint8_t *buf = new uint8_t[query.size()]; std::memcpy(buf, query.c_str(), query.size()); DnsLayer dpayload(buf, query.size(), nullptr, nullptr); - auto xact = _pair_manager[xact_dir].xact_map.maybe_end_transaction(DnsXactID(dpayload.getDnsHeader()->transactionID, 2), stamp); + auto xact = _pair_manager[xact_dir].xact_map->maybe_end_transaction(DnsXactID(dpayload.getDnsHeader()->transactionID, 2), stamp); live_bucket()->dir_setup(xact_dir); if (xact.first == Result::Valid) { // process in the "live" bucket. this will parse the resources if we are deep sampling @@ -1081,7 +1081,7 @@ void DnsMetricsManager::process_dnstap(const dnstap::Dnstap &payload, bool filte uint8_t *buf = new uint8_t[query.size()]; std::memcpy(buf, query.c_str(), query.size()); DnsLayer dpayload(buf, query.size(), nullptr, nullptr); - _pair_manager[xact_dir].xact_map.start_transaction(DnsXactID(dpayload.getDnsHeader()->transactionID, 2), {{stamp, {0, 0}}, false, payload.message().query_message().size(), false, std::string()}); + _pair_manager[xact_dir].xact_map->start_transaction(DnsXactID(dpayload.getDnsHeader()->transactionID, 2), {{stamp, {0, 0}}, false, payload.message().query_message().size(), false, std::string()}); } } } diff --git a/src/handlers/dns/v2/DnsStreamHandler.h b/src/handlers/dns/v2/DnsStreamHandler.h index a5c067aed..f09d86eac 100644 --- a/src/handlers/dns/v2/DnsStreamHandler.h +++ b/src/handlers/dns/v2/DnsStreamHandler.h @@ -355,25 +355,43 @@ class DnsMetricsBucket final : public visor::AbstractMetricsBucket class DnsMetricsManager final : public visor::AbstractMetricsManager { using DnsXactID = std::pair; + typedef TransactionManager DnsTransactionManager; struct DirTransaction { - TransactionManager xact_map; + std::unique_ptr xact_map; float per_90th{0.0}; + + DirTransaction() + : xact_map(std::make_unique()) + { + } + DirTransaction(uint32_t ttl) + : xact_map(std::make_unique(ttl)) + { + } }; - std::map _pair_manager = {{TransactionDirection::in, DirTransaction()}, - {TransactionDirection::out, DirTransaction()}, - {TransactionDirection::unknown, DirTransaction()}}; + std::map _pair_manager; public: DnsMetricsManager(const Configurable *window_config) : visor::AbstractMetricsManager(window_config) { + if (window_config->config_exists("xact_ttl_secs")) { + auto ttl = static_cast(window_config->config_get("xact_ttl_secs")); + _pair_manager[TransactionDirection::in] = DirTransaction(ttl); + _pair_manager[TransactionDirection::out] = DirTransaction(ttl); + _pair_manager[TransactionDirection::unknown] = DirTransaction(ttl); + } else { + _pair_manager[TransactionDirection::in] = DirTransaction(); + _pair_manager[TransactionDirection::out] = DirTransaction(); + _pair_manager[TransactionDirection::unknown] = DirTransaction(); + } } void on_period_shift(timespec stamp, [[maybe_unused]] const DnsMetricsBucket *maybe_expiring_bucket) override { // DNS transaction support for (auto &manager : _pair_manager) { - if (auto timed_out = manager.second.xact_map.purge_old_transactions(stamp); timed_out && live_bucket()->has_dir(manager.first)) { + if (auto timed_out = manager.second.xact_map->purge_old_transactions(stamp); timed_out && live_bucket()->has_dir(manager.first)) { live_bucket()->inc_xact_timed_out(timed_out, manager.first); } if (bucket(1)->has_dir(manager.first)) { @@ -388,7 +406,7 @@ class DnsMetricsManager final : public visor::AbstractMetricsManageropen_transaction_count(); } return count; } diff --git a/src/handlers/netprobe/NetProbeStreamHandler.cpp b/src/handlers/netprobe/NetProbeStreamHandler.cpp index 6e338573d..e0a15b45a 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.cpp +++ b/src/handlers/netprobe/NetProbeStreamHandler.cpp @@ -304,12 +304,12 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const if (layer->getMessageType() == pcpp::ICMP_ECHO_REQUEST) { if (auto request = layer->getEchoRequestData(); request != nullptr) { - _request_reply_manager.start_transaction((static_cast(request->header->id) << 16) | request->header->sequence, {{stamp, {0, 0}}, target}); + _request_reply_manager->start_transaction((static_cast(request->header->id) << 16) | request->header->sequence, {{stamp, {0, 0}}, target}); } live_bucket()->process_attempts(_deep_sampling_now, target); } else if (layer->getMessageType() == pcpp::ICMP_ECHO_REPLY) { if (auto reply = layer->getEchoReplyData(); reply != nullptr) { - auto xact = _request_reply_manager.maybe_end_transaction((static_cast(reply->header->id) << 16) | reply->header->sequence, stamp); + auto xact = _request_reply_manager->maybe_end_transaction((static_cast(reply->header->id) << 16) | reply->header->sequence, stamp); if (xact.first == Result::Valid) { live_bucket()->new_transaction(_deep_sampling_now, xact.second); } @@ -323,10 +323,10 @@ void NetProbeMetricsManager::process_netprobe_tcp(uint32_t port, bool send, cons new_event(stamp); if (send) { - _request_reply_manager.start_transaction(port, {{stamp, {0, 0}}, target}); + _request_reply_manager->start_transaction(port, {{stamp, {0, 0}}, target}); live_bucket()->process_attempts(_deep_sampling_now, target); } else { - auto xact = _request_reply_manager.maybe_end_transaction(port, stamp); + auto xact = _request_reply_manager->maybe_end_transaction(port, stamp); if (xact.first == Result::Valid) { live_bucket()->new_transaction(_deep_sampling_now, xact.second); } diff --git a/src/handlers/netprobe/NetProbeStreamHandler.h b/src/handlers/netprobe/NetProbeStreamHandler.h index 78d1aabc6..99b717a13 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.h +++ b/src/handlers/netprobe/NetProbeStreamHandler.h @@ -95,18 +95,24 @@ class NetProbeMetricsBucket final : public visor::AbstractMetricsBucket class NetProbeMetricsManager final : public visor::AbstractMetricsManager { - TransactionManager> _request_reply_manager; + typedef TransactionManager> NetProbeTransactionManager; + std::unique_ptr _request_reply_manager; public: NetProbeMetricsManager(const Configurable *window_config) : visor::AbstractMetricsManager(window_config) { + if (window_config->config_exists("xact_ttl_secs")) { + _request_reply_manager = std::make_unique(static_cast(window_config->config_get("xact_ttl_secs"))); + } else { + _request_reply_manager = std::make_unique(); + } } void on_period_shift(timespec stamp, [[maybe_unused]] const NetProbeMetricsBucket *maybe_expiring_bucket) override { // NetProbe transaction support - _request_reply_manager.purge_old_transactions(stamp); + _request_reply_manager->purge_old_transactions(stamp); } void process_filtered(timespec stamp); @@ -126,7 +132,8 @@ class NetProbeStreamHandler final : public visor::StreamMetricsHandler Date: Thu, 15 Dec 2022 14:10:50 -0400 Subject: [PATCH 2/2] Move ttl logic to StreamHandler start() --- src/handlers/dhcp/DhcpStreamHandler.cpp | 5 +++++ src/handlers/dhcp/DhcpStreamHandler.h | 11 +++++----- src/handlers/dns/v1/DnsStreamHandler.cpp | 5 +++++ src/handlers/dns/v1/DnsStreamHandler.h | 11 +++++----- src/handlers/dns/v2/DnsStreamHandler.cpp | 5 +++++ src/handlers/dns/v2/DnsStreamHandler.h | 20 +++++++++---------- .../netprobe/NetProbeStreamHandler.cpp | 5 +++++ src/handlers/netprobe/NetProbeStreamHandler.h | 11 +++++----- 8 files changed, 48 insertions(+), 25 deletions(-) diff --git a/src/handlers/dhcp/DhcpStreamHandler.cpp b/src/handlers/dhcp/DhcpStreamHandler.cpp index 3de6af6bb..0e513a25c 100644 --- a/src/handlers/dhcp/DhcpStreamHandler.cpp +++ b/src/handlers/dhcp/DhcpStreamHandler.cpp @@ -50,6 +50,11 @@ void DhcpStreamHandler::start() _metrics->set_recorded_stream(); } + if (config_exists("xact_ttl_secs")) { + auto ttl = config_get("xact_ttl_secs"); + _metrics->set_xact_ttl(static_cast(ttl)); + } + if (_pcap_proxy) { _pkt_udp_connection = _pcap_proxy->udp_signal.connect(&DhcpStreamHandler::process_udp_packet_cb, this); _start_tstamp_connection = _pcap_proxy->start_tstamp_signal.connect(&DhcpStreamHandler::set_start_tstamp, this); diff --git a/src/handlers/dhcp/DhcpStreamHandler.h b/src/handlers/dhcp/DhcpStreamHandler.h index 5b5b67624..19f420221 100644 --- a/src/handlers/dhcp/DhcpStreamHandler.h +++ b/src/handlers/dhcp/DhcpStreamHandler.h @@ -124,12 +124,8 @@ class DhcpMetricsManager final : public visor::AbstractMetricsManager(window_config) + , _request_ack_manager(std::make_unique()) { - if (window_config->config_exists("xact_ttl_secs")) { - _request_ack_manager = std::make_unique(static_cast(window_config->config_get("xact_ttl_secs"))); - } else { - _request_ack_manager = std::make_unique(); - } } void on_period_shift(timespec stamp, [[maybe_unused]] const DhcpMetricsBucket *maybe_expiring_bucket) override @@ -138,6 +134,11 @@ class DhcpMetricsManager final : public visor::AbstractMetricsManagerpurge_old_transactions(stamp); } + void set_xact_ttl(uint32_t ttl) + { + _request_ack_manager = std::make_unique(ttl); + } + void process_filtered(timespec stamp); void process_dhcp_layer(pcpp::DhcpLayer *dhcp, pcpp::Packet *payload, timespec stamp); void process_dhcp_v6_layer(pcpp::DhcpV6Layer *dhcp, pcpp::Packet *payload, timespec stamp); diff --git a/src/handlers/dns/v1/DnsStreamHandler.cpp b/src/handlers/dns/v1/DnsStreamHandler.cpp index bf37d2338..beddfb78f 100644 --- a/src/handlers/dns/v1/DnsStreamHandler.cpp +++ b/src/handlers/dns/v1/DnsStreamHandler.cpp @@ -160,6 +160,11 @@ void DnsStreamHandler::start() _metrics->set_recorded_stream(); } + if (config_exists("xact_ttl_secs")) { + auto ttl = config_get("xact_ttl_secs"); + _metrics->set_xact_ttl(static_cast(ttl)); + } + if (_pcap_proxy) { if (!_using_predicate_signals) { _pkt_udp_connection = _pcap_proxy->udp_signal.connect(&DnsStreamHandler::process_udp_packet_cb, this); diff --git a/src/handlers/dns/v1/DnsStreamHandler.h b/src/handlers/dns/v1/DnsStreamHandler.h index f103614ce..f537b94a6 100644 --- a/src/handlers/dns/v1/DnsStreamHandler.h +++ b/src/handlers/dns/v1/DnsStreamHandler.h @@ -238,12 +238,8 @@ class DnsMetricsManager final : public visor::AbstractMetricsManager(window_config) + , _qr_pair_manager(std::make_unique()) { - if (window_config->config_exists("xact_ttl_secs")) { - _qr_pair_manager = std::make_unique(static_cast(window_config->config_get("xact_ttl_secs"))); - } else { - _qr_pair_manager = std::make_unique(); - } } void on_period_shift(timespec stamp, [[maybe_unused]] const DnsMetricsBucket *maybe_expiring_bucket) override @@ -268,6 +264,11 @@ class DnsMetricsManager final : public visor::AbstractMetricsManageropen_transaction_count(); } + void set_xact_ttl(uint32_t ttl) + { + _qr_pair_manager = std::make_unique(ttl); + } + void process_filtered(timespec stamp); void process_dns_layer(DnsLayer &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, uint32_t flowkey, uint16_t port, size_t suffix_size, timespec stamp); void process_dnstap(const dnstap::Dnstap &payload, bool filtered); diff --git a/src/handlers/dns/v2/DnsStreamHandler.cpp b/src/handlers/dns/v2/DnsStreamHandler.cpp index cd2127804..a4dd2e0d1 100644 --- a/src/handlers/dns/v2/DnsStreamHandler.cpp +++ b/src/handlers/dns/v2/DnsStreamHandler.cpp @@ -170,6 +170,11 @@ void DnsStreamHandler::start() _metrics->set_recorded_stream(); } + if (config_exists("xact_ttl_secs")) { + auto ttl = config_get("xact_ttl_secs"); + _metrics->set_xact_ttl(static_cast(ttl)); + } + if (_pcap_proxy) { _pkt_udp_connection = _pcap_proxy->udp_signal.connect(&DnsStreamHandler::process_udp_packet_cb, this); _start_tstamp_connection = _pcap_proxy->start_tstamp_signal.connect([this](timespec stamp) { diff --git a/src/handlers/dns/v2/DnsStreamHandler.h b/src/handlers/dns/v2/DnsStreamHandler.h index f09d86eac..3f49a733e 100644 --- a/src/handlers/dns/v2/DnsStreamHandler.h +++ b/src/handlers/dns/v2/DnsStreamHandler.h @@ -375,16 +375,9 @@ class DnsMetricsManager final : public visor::AbstractMetricsManager(window_config) { - if (window_config->config_exists("xact_ttl_secs")) { - auto ttl = static_cast(window_config->config_get("xact_ttl_secs")); - _pair_manager[TransactionDirection::in] = DirTransaction(ttl); - _pair_manager[TransactionDirection::out] = DirTransaction(ttl); - _pair_manager[TransactionDirection::unknown] = DirTransaction(ttl); - } else { - _pair_manager[TransactionDirection::in] = DirTransaction(); - _pair_manager[TransactionDirection::out] = DirTransaction(); - _pair_manager[TransactionDirection::unknown] = DirTransaction(); - } + _pair_manager[TransactionDirection::in] = DirTransaction(); + _pair_manager[TransactionDirection::out] = DirTransaction(); + _pair_manager[TransactionDirection::unknown] = DirTransaction(); } void on_period_shift(timespec stamp, [[maybe_unused]] const DnsMetricsBucket *maybe_expiring_bucket) override @@ -411,6 +404,13 @@ class DnsMetricsManager final : public visor::AbstractMetricsManagerset_recorded_stream(); } + if (config_exists("xact_ttl_secs")) { + auto ttl = config_get("xact_ttl_secs"); + _metrics->set_xact_ttl(static_cast(ttl)); + } + if (_netprobe_proxy) { _probe_send_connection = _netprobe_proxy->probe_send_signal.connect(&NetProbeStreamHandler::probe_signal_send, this); _probe_recv_connection = _netprobe_proxy->probe_recv_signal.connect(&NetProbeStreamHandler::probe_signal_recv, this); diff --git a/src/handlers/netprobe/NetProbeStreamHandler.h b/src/handlers/netprobe/NetProbeStreamHandler.h index 99b717a13..4e5ce9efd 100644 --- a/src/handlers/netprobe/NetProbeStreamHandler.h +++ b/src/handlers/netprobe/NetProbeStreamHandler.h @@ -101,12 +101,8 @@ class NetProbeMetricsManager final : public visor::AbstractMetricsManager(window_config) + , _request_reply_manager(std::make_unique()) { - if (window_config->config_exists("xact_ttl_secs")) { - _request_reply_manager = std::make_unique(static_cast(window_config->config_get("xact_ttl_secs"))); - } else { - _request_reply_manager = std::make_unique(); - } } void on_period_shift(timespec stamp, [[maybe_unused]] const NetProbeMetricsBucket *maybe_expiring_bucket) override @@ -115,6 +111,11 @@ class NetProbeMetricsManager final : public visor::AbstractMetricsManagerpurge_old_transactions(stamp); } + void set_xact_ttl(uint32_t ttl) + { + _request_reply_manager = std::make_unique(ttl); + } + void process_filtered(timespec stamp); void process_failure(ErrorType error, const std::string &target); void process_netprobe_icmp(pcpp::IcmpLayer *layer, const std::string &target, timespec stamp);