Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions libs/visor_transaction/TransactionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ class TransactionManager
static_assert(std::is_base_of<Transaction, TransactionType>::value, "TransactionType must inherit from Transaction structure");
typedef robin_hood::unordered_map<XactID, TransactionType, Hash> XactMap;

unsigned int _ttl_secs;
uint32_t _ttl_secs;
XactMap _transactions;

public:
TransactionManager(unsigned int ttl_secs = 5)
TransactionManager(uint32_t ttl_secs = 5)
: _ttl_secs(ttl_secs)
{
}
Expand Down
9 changes: 7 additions & 2 deletions src/handlers/dhcp/DhcpStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ void DhcpStreamHandler::start()
_metrics->set_recorded_stream();
}

if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl));
}

if (_pcap_proxy) {
_pkt_udp_connection = _pcap_proxy->udp_signal.connect(&DhcpStreamHandler::process_udp_packet_cb, this);
_start_tstamp_connection = _pcap_proxy->start_tstamp_signal.connect(&DhcpStreamHandler::set_start_tstamp, this);
Expand Down Expand Up @@ -291,9 +296,9 @@ void DhcpMetricsManager::process_dhcp_layer(pcpp::DhcpLayer *dhcp, pcpp::Packet
hostname = option.getValueAsString();
}
auto mac_address = dhcp->getClientHardwareAddress().toString();
_request_ack_manager.start_transaction(dhcp->getDhcpHeader()->transactionID, {{stamp, {0, 0}}, hostname, mac_address});
_request_ack_manager->start_transaction(dhcp->getDhcpHeader()->transactionID, {{stamp, {0, 0}}, hostname, mac_address});
} else if (type == pcpp::DHCP_ACK) {
auto xact = _request_ack_manager.maybe_end_transaction(dhcp->getDhcpHeader()->transactionID, stamp);
auto xact = _request_ack_manager->maybe_end_transaction(dhcp->getDhcpHeader()->transactionID, stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_dhcp_transaction(_deep_sampling_now, dhcp, xact.second);
}
Expand Down
14 changes: 11 additions & 3 deletions src/handlers/dhcp/DhcpStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,18 +118,25 @@ class DhcpMetricsBucket final : public visor::AbstractMetricsBucket

class DhcpMetricsManager final : public visor::AbstractMetricsManager<DhcpMetricsBucket>
{
TransactionManager<uint32_t, DhcpTransaction, std::hash<uint32_t>> _request_ack_manager;
typedef TransactionManager<uint32_t, DhcpTransaction, std::hash<uint32_t>> DhcpTransactionManager;
std::unique_ptr<DhcpTransactionManager> _request_ack_manager;

public:
DhcpMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<DhcpMetricsBucket>(window_config)
, _request_ack_manager(std::make_unique<DhcpTransactionManager>())
{
}

void on_period_shift(timespec stamp, [[maybe_unused]] const DhcpMetricsBucket *maybe_expiring_bucket) override
{
// Dhcp transaction support
_request_ack_manager.purge_old_transactions(stamp);
_request_ack_manager->purge_old_transactions(stamp);
}

void set_xact_ttl(uint32_t ttl)
{
_request_ack_manager = std::make_unique<DhcpTransactionManager>(ttl);
}

void process_filtered(timespec stamp);
Expand All @@ -149,7 +156,8 @@ class DhcpStreamHandler final : public visor::StreamMetricsHandler<DhcpMetricsMa
sigslot::connection _heartbeat_connection;

static const inline StreamMetricsHandler::ConfigsDefType _config_defs = {
"recorded_stream"};
"recorded_stream",
"xact_ttl_secs"};

void process_udp_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, uint32_t flowkey, timespec stamp);

Expand Down
9 changes: 7 additions & 2 deletions src/handlers/dns/v1/DnsStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@ void DnsStreamHandler::start()
_metrics->set_recorded_stream();
}

if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl));
}

if (_pcap_proxy) {
if (!_using_predicate_signals) {
_pkt_udp_connection = _pcap_proxy->udp_signal.connect(&DnsStreamHandler::process_udp_packet_cb, this);
Expand Down Expand Up @@ -1103,14 +1108,14 @@ void DnsMetricsManager::process_dns_layer(DnsLayer &payload, PacketDirection dir
if (group_enabled(group::DnsMetrics::DnsTransactions)) {
// handle dns transactions (query/response pairs)
if (payload.getDnsHeader()->queryOrResponse == QR::response) {
auto xact = _qr_pair_manager.maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp);
auto xact = _qr_pair_manager->maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_dns_transaction(_deep_sampling_now, _to90th, _from90th, payload, dir, xact.second);
} else if (xact.first == Result::TimedOut) {
live_bucket()->inc_xact_timed_out(1);
}
} else {
_qr_pair_manager.start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), {{stamp, {0, 0}}, payload.getDataLen()});
_qr_pair_manager->start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), {{stamp, {0, 0}}, payload.getDataLen()});
}
}
}
Expand Down
16 changes: 12 additions & 4 deletions src/handlers/dns/v1/DnsStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,20 +230,22 @@ class DnsMetricsBucket final : public visor::AbstractMetricsBucket
class DnsMetricsManager final : public visor::AbstractMetricsManager<DnsMetricsBucket>
{
using DnsXactID = std::pair<uint32_t, uint16_t>;
visor::lib::transaction::TransactionManager<DnsXactID, DnsTransaction> _qr_pair_manager;
typedef lib::transaction::TransactionManager<DnsXactID, DnsTransaction> DnsTransactionManager;
std::unique_ptr<DnsTransactionManager> _qr_pair_manager;
float _to90th{0.0};
float _from90th{0.0};

public:
DnsMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<DnsMetricsBucket>(window_config)
, _qr_pair_manager(std::make_unique<DnsTransactionManager>())
{
}

void on_period_shift(timespec stamp, [[maybe_unused]] const DnsMetricsBucket *maybe_expiring_bucket) override
{
// DNS transaction support
auto timed_out = _qr_pair_manager.purge_old_transactions(stamp);
auto timed_out = _qr_pair_manager->purge_old_transactions(stamp);
if (timed_out) {
live_bucket()->inc_xact_timed_out(timed_out);
}
Expand All @@ -259,7 +261,12 @@ class DnsMetricsManager final : public visor::AbstractMetricsManager<DnsMetricsB

size_t num_open_transactions() const
{
return _qr_pair_manager.open_transaction_count();
return _qr_pair_manager->open_transaction_count();
}

void set_xact_ttl(uint32_t ttl)
{
_qr_pair_manager = std::make_unique<DnsTransactionManager>(ttl);
}

void process_filtered(timespec stamp);
Expand Down Expand Up @@ -372,7 +379,8 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler<DnsMetricsMana
"asn_notfound",
"dnstap_msg_type",
"public_suffix_list",
"recorded_stream"};
"recorded_stream",
"xact_ttl_secs"};

static const inline StreamMetricsHandler::GroupDefType _group_defs = {
{"cardinality", group::DnsMetrics::Cardinality},
Expand Down
15 changes: 14 additions & 1 deletion src/handlers/dns/v1/tests/test_dns_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,20 @@ TEST_CASE("DNS invalid config", "[dns][filter][config]")
c.config_set<uint64_t>("num_periods", 1);
DnsStreamHandler dns_handler{"dns-test", stream_proxy, &c};
dns_handler.config_set<bool>("invalid_config", true);
REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_queries, only_responses, only_dnssec_response, answer_count, only_qtype, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold");
REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_queries, only_responses, only_dnssec_response, answer_count, only_qtype, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, xact_ttl_secs, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold");
}

TEST_CASE("DNS config ttl", "[dns][config]")
{
PcapInputStream stream{"pcap-test"};
stream.config_set("pcap_file", "tests/fixtures/dns_udp_tcp_random.pcap");

visor::Config c;
auto stream_proxy = stream.add_event_proxy(c);
c.config_set<uint64_t>("num_periods", 1);
c.config_set<uint64_t>("xact_ttl_secs", 2);
DnsStreamHandler dns_handler{"dns-test", stream_proxy, &c};
REQUIRE_NOTHROW(dns_handler.start());
}

TEST_CASE("DNS Filters: only_rcode with predicate", "[pcap][dns][filter]")
Expand Down
17 changes: 11 additions & 6 deletions src/handlers/dns/v2/DnsStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ void DnsStreamHandler::start()
_metrics->set_recorded_stream();
}

if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl));
}

if (_pcap_proxy) {
_pkt_udp_connection = _pcap_proxy->udp_signal.connect(&DnsStreamHandler::process_udp_packet_cb, this);
_start_tstamp_connection = _pcap_proxy->start_tstamp_signal.connect([this](timespec stamp) {
Expand Down Expand Up @@ -917,7 +922,7 @@ void DnsMetricsManager::process_dns_layer(DnsLayer &payload, PacketDirection dir
} else if (dir == PacketDirection::fromHost) {
xact_dir = TransactionDirection::in;
}
auto xact = _pair_manager[xact_dir].xact_map.maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp);
auto xact = _pair_manager[xact_dir].xact_map->maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp);
live_bucket()->dir_setup(xact_dir);
if (xact.first == Result::Valid && !xact.second.filtered) {
live_bucket()->new_dns_transaction(_deep_sampling_now, _pair_manager[xact_dir].per_90th, payload, xact_dir, xact.second, l3, static_cast<Protocol>(l4), port, suffix_size);
Expand Down Expand Up @@ -946,7 +951,7 @@ void DnsMetricsManager::process_dns_layer(DnsLayer &payload, PacketDirection dir
subnet = ecs->client_subnet;
}
}
_pair_manager[xact_dir].xact_map.start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID),
_pair_manager[xact_dir].xact_map->start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID),
{{stamp, {0, 0}}, false, payload.getDataLen(), static_cast<bool>(payload.getDnsHeader()->checkingDisabled), subnet});
}
}
Expand All @@ -964,7 +969,7 @@ void DnsMetricsManager::process_filtered(timespec stamp, DnsLayer &payload, Pack
} else if (dir == PacketDirection::fromHost) {
xact_dir = TransactionDirection::in;
}
auto xact = _pair_manager[xact_dir].xact_map.maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp);
auto xact = _pair_manager[xact_dir].xact_map->maybe_end_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), stamp);
live_bucket()->dir_setup(xact_dir);
if (xact.first == Result::Valid && !xact.second.filtered) {
live_bucket()->process_filtered();
Expand All @@ -975,7 +980,7 @@ void DnsMetricsManager::process_filtered(timespec stamp, DnsLayer &payload, Pack
} else if (dir == PacketDirection::fromHost) {
xact_dir = TransactionDirection::out;
}
_pair_manager[xact_dir].xact_map.start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), {{stamp, {0, 0}}, true, 0, false, std::string()});
_pair_manager[xact_dir].xact_map->start_transaction(DnsXactID(flowkey, payload.getDnsHeader()->transactionID), {{stamp, {0, 0}}, true, 0, false, std::string()});
}
live_bucket()->process_filtered();
}
Expand Down Expand Up @@ -1066,7 +1071,7 @@ void DnsMetricsManager::process_dnstap(const dnstap::Dnstap &payload, bool filte
uint8_t *buf = new uint8_t[query.size()];
std::memcpy(buf, query.c_str(), query.size());
DnsLayer dpayload(buf, query.size(), nullptr, nullptr);
auto xact = _pair_manager[xact_dir].xact_map.maybe_end_transaction(DnsXactID(dpayload.getDnsHeader()->transactionID, 2), stamp);
auto xact = _pair_manager[xact_dir].xact_map->maybe_end_transaction(DnsXactID(dpayload.getDnsHeader()->transactionID, 2), stamp);
live_bucket()->dir_setup(xact_dir);
if (xact.first == Result::Valid) {
// process in the "live" bucket. this will parse the resources if we are deep sampling
Expand All @@ -1081,7 +1086,7 @@ void DnsMetricsManager::process_dnstap(const dnstap::Dnstap &payload, bool filte
uint8_t *buf = new uint8_t[query.size()];
std::memcpy(buf, query.c_str(), query.size());
DnsLayer dpayload(buf, query.size(), nullptr, nullptr);
_pair_manager[xact_dir].xact_map.start_transaction(DnsXactID(dpayload.getDnsHeader()->transactionID, 2), {{stamp, {0, 0}}, false, payload.message().query_message().size(), false, std::string()});
_pair_manager[xact_dir].xact_map->start_transaction(DnsXactID(dpayload.getDnsHeader()->transactionID, 2), {{stamp, {0, 0}}, false, payload.message().query_message().size(), false, std::string()});
}
}
}
30 changes: 24 additions & 6 deletions src/handlers/dns/v2/DnsStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,25 +355,36 @@ class DnsMetricsBucket final : public visor::AbstractMetricsBucket
class DnsMetricsManager final : public visor::AbstractMetricsManager<DnsMetricsBucket>
{
using DnsXactID = std::pair<uint32_t, uint16_t>;
typedef TransactionManager<DnsXactID, DnsTransaction> DnsTransactionManager;
struct DirTransaction {
TransactionManager<DnsXactID, DnsTransaction> xact_map;
std::unique_ptr<DnsTransactionManager> xact_map;
float per_90th{0.0};

DirTransaction()
: xact_map(std::make_unique<DnsTransactionManager>())
{
}
DirTransaction(uint32_t ttl)
: xact_map(std::make_unique<DnsTransactionManager>(ttl))
{
}
};
std::map<TransactionDirection, DirTransaction> _pair_manager = {{TransactionDirection::in, DirTransaction()},
{TransactionDirection::out, DirTransaction()},
{TransactionDirection::unknown, DirTransaction()}};
std::map<TransactionDirection, DirTransaction> _pair_manager;

public:
DnsMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<DnsMetricsBucket>(window_config)
{
_pair_manager[TransactionDirection::in] = DirTransaction();
_pair_manager[TransactionDirection::out] = DirTransaction();
_pair_manager[TransactionDirection::unknown] = DirTransaction();
}

void on_period_shift(timespec stamp, [[maybe_unused]] const DnsMetricsBucket *maybe_expiring_bucket) override
{
// DNS transaction support
for (auto &manager : _pair_manager) {
if (auto timed_out = manager.second.xact_map.purge_old_transactions(stamp); timed_out && live_bucket()->has_dir(manager.first)) {
if (auto timed_out = manager.second.xact_map->purge_old_transactions(stamp); timed_out && live_bucket()->has_dir(manager.first)) {
live_bucket()->inc_xact_timed_out(timed_out, manager.first);
}
if (bucket(1)->has_dir(manager.first)) {
Expand All @@ -388,11 +399,18 @@ class DnsMetricsManager final : public visor::AbstractMetricsManager<DnsMetricsB
{
size_t count{0};
for (const auto &manager : _pair_manager) {
count += manager.second.xact_map.open_transaction_count();
count += manager.second.xact_map->open_transaction_count();
}
return count;
}

void set_xact_ttl(uint32_t ttl)
{
_pair_manager[TransactionDirection::in] = DirTransaction(ttl);
_pair_manager[TransactionDirection::out] = DirTransaction(ttl);
_pair_manager[TransactionDirection::unknown] = DirTransaction(ttl);
}

void process_filtered(timespec stamp)
{
new_event(stamp, false);
Expand Down
13 changes: 9 additions & 4 deletions src/handlers/netprobe/NetProbeStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ void NetProbeStreamHandler::start()
_metrics->set_recorded_stream();
}

if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl));
}

if (_netprobe_proxy) {
_probe_send_connection = _netprobe_proxy->probe_send_signal.connect(&NetProbeStreamHandler::probe_signal_send, this);
_probe_recv_connection = _netprobe_proxy->probe_recv_signal.connect(&NetProbeStreamHandler::probe_signal_recv, this);
Expand Down Expand Up @@ -304,12 +309,12 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const

if (layer->getMessageType() == pcpp::ICMP_ECHO_REQUEST) {
if (auto request = layer->getEchoRequestData(); request != nullptr) {
_request_reply_manager.start_transaction((static_cast<uint32_t>(request->header->id) << 16) | request->header->sequence, {{stamp, {0, 0}}, target});
_request_reply_manager->start_transaction((static_cast<uint32_t>(request->header->id) << 16) | request->header->sequence, {{stamp, {0, 0}}, target});
}
live_bucket()->process_attempts(_deep_sampling_now, target);
} else if (layer->getMessageType() == pcpp::ICMP_ECHO_REPLY) {
if (auto reply = layer->getEchoReplyData(); reply != nullptr) {
auto xact = _request_reply_manager.maybe_end_transaction((static_cast<uint32_t>(reply->header->id) << 16) | reply->header->sequence, stamp);
auto xact = _request_reply_manager->maybe_end_transaction((static_cast<uint32_t>(reply->header->id) << 16) | reply->header->sequence, stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_transaction(_deep_sampling_now, xact.second);
}
Expand All @@ -323,10 +328,10 @@ void NetProbeMetricsManager::process_netprobe_tcp(uint32_t port, bool send, cons
new_event(stamp);

if (send) {
_request_reply_manager.start_transaction(port, {{stamp, {0, 0}}, target});
_request_reply_manager->start_transaction(port, {{stamp, {0, 0}}, target});
live_bucket()->process_attempts(_deep_sampling_now, target);
} else {
auto xact = _request_reply_manager.maybe_end_transaction(port, stamp);
auto xact = _request_reply_manager->maybe_end_transaction(port, stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_transaction(_deep_sampling_now, xact.second);
}
Expand Down
14 changes: 11 additions & 3 deletions src/handlers/netprobe/NetProbeStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,18 +95,25 @@ class NetProbeMetricsBucket final : public visor::AbstractMetricsBucket

class NetProbeMetricsManager final : public visor::AbstractMetricsManager<NetProbeMetricsBucket>
{
TransactionManager<uint32_t, NetProbeTransaction, std::hash<uint32_t>> _request_reply_manager;
typedef TransactionManager<uint32_t, NetProbeTransaction, std::hash<uint32_t>> NetProbeTransactionManager;
std::unique_ptr<NetProbeTransactionManager> _request_reply_manager;

public:
NetProbeMetricsManager(const Configurable *window_config)
: visor::AbstractMetricsManager<NetProbeMetricsBucket>(window_config)
, _request_reply_manager(std::make_unique<NetProbeTransactionManager>())
{
}

void on_period_shift(timespec stamp, [[maybe_unused]] const NetProbeMetricsBucket *maybe_expiring_bucket) override
{
// NetProbe transaction support
_request_reply_manager.purge_old_transactions(stamp);
_request_reply_manager->purge_old_transactions(stamp);
}

void set_xact_ttl(uint32_t ttl)
{
_request_reply_manager = std::make_unique<NetProbeTransactionManager>(ttl);
}

void process_filtered(timespec stamp);
Expand All @@ -126,7 +133,8 @@ class NetProbeStreamHandler final : public visor::StreamMetricsHandler<NetProbeM
sigslot::connection _heartbeat_connection;

static const inline StreamMetricsHandler::ConfigsDefType _config_defs = {
"recorded_stream"};
"recorded_stream",
"xact_ttl_secs"};

static const inline NetProbeStreamHandler::GroupDefType _group_defs = {
{"counters", group::NetProbeMetrics::Counters},
Expand Down