Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions libs/visor_transaction/TransactionManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,19 @@ class TransactionManager
static_assert(std::is_base_of<Transaction, TransactionType>::value, "TransactionType must inherit from Transaction structure");
typedef robin_hood::unordered_map<XactID, TransactionType, Hash> XactMap;

uint32_t _ttl_secs;
uint32_t _ttl_secs{0};
uint32_t _ttl_ms{0};
XactMap _transactions;

public:
TransactionManager(uint32_t ttl_secs = 5)
: _ttl_secs(ttl_secs)
TransactionManager(uint32_t ttl_ms = 5000)
{
if (ttl_ms > 1000) {
_ttl_secs = ttl_ms / 1000;
_ttl_ms = ttl_ms - _ttl_secs * 1000;
} else {
_ttl_ms = ttl_ms;
}
}

void start_transaction(XactID id, TransactionType type)
Expand All @@ -73,7 +79,9 @@ class TransactionManager
auto result = _transactions[id];
timespec_diff(&endTS, &result.startTS, &result.totalTS);
_transactions.erase(id);
if (result.totalTS.tv_sec >= _ttl_secs) {
if (result.totalTS.tv_sec > _ttl_secs) {
return std::pair<Result, TransactionType>(Result::TimedOut, result);
} else if (result.totalTS.tv_sec == _ttl_secs && (result.totalTS.tv_nsec / 1.0e6) >= _ttl_ms) {
return std::pair<Result, TransactionType>(Result::TimedOut, result);
} else {
return std::pair<Result, TransactionType>(Result::Valid, result);
Expand All @@ -97,6 +105,11 @@ class TransactionManager
return timed_out.size();
}

void clear()
{
_transactions.clear();
}

typename XactMap::size_type open_transaction_count() const
{
return _transactions.size();
Expand Down
11 changes: 7 additions & 4 deletions src/handlers/dhcp/DhcpStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,12 @@ void DhcpStreamHandler::start()
_metrics->set_recorded_stream();
}

if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
if (config_exists("xact_ttl_ms")) {
auto ttl = config_get<uint64_t>("xact_ttl_ms");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl));
} else if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl) * 1000);
}

if (_pcap_proxy) {
Expand Down Expand Up @@ -151,9 +154,9 @@ void DhcpMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, Metri
{
auto start_ts = start_tstamp();
auto end_ts = end_tstamp();

_rate_total.to_opentelemetry(scope, start_ts, end_ts, add_labels);

{
auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe

Expand Down
3 changes: 2 additions & 1 deletion src/handlers/dhcp/DhcpStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ class DhcpStreamHandler final : public visor::StreamMetricsHandler<DhcpMetricsMa

static const inline StreamMetricsHandler::ConfigsDefType _config_defs = {
"recorded_stream",
"xact_ttl_secs"};
"xact_ttl_secs",
"xact_ttl_ms"};

void process_udp_packet_cb(pcpp::Packet &payload, PacketDirection dir, pcpp::ProtocolType l3, uint32_t flowkey, timespec stamp);

Expand Down
10 changes: 6 additions & 4 deletions src/handlers/dns/v1/DnsStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,12 @@ void DnsStreamHandler::start()
_metrics->set_recorded_stream();
}

if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
if (config_exists("xact_ttl_ms")) {
auto ttl = config_get<uint64_t>("xact_ttl_ms");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl));
} else if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl) * 1000);
}

if (_pcap_proxy) {
Expand Down Expand Up @@ -1160,14 +1163,13 @@ void DnsMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelMap ad
});
}


void DnsMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, Metric::LabelMap add_labels) const
{
auto start_ts = start_tstamp();
auto end_ts = end_tstamp();

_rate_total.to_opentelemetry(scope, start_ts, end_ts, add_labels);

{
auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe

Expand Down
3 changes: 2 additions & 1 deletion src/handlers/dns/v1/DnsStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,8 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler<DnsMetricsMana
"dnstap_msg_type",
"public_suffix_list",
"recorded_stream",
"xact_ttl_secs"};
"xact_ttl_secs",
"xact_ttl_ms"};

static const inline StreamMetricsHandler::GroupDefType _group_defs = {
{"cardinality", group::DnsMetrics::Cardinality},
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/dns/v1/tests/test_dns_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@ TEST_CASE("DNS invalid config", "[dns][filter][config]")
c.config_set<uint64_t>("num_periods", 1);
DnsStreamHandler dns_handler{"dns-test", stream_proxy, &c};
dns_handler.config_set<bool>("invalid_config", true);
REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_queries, only_responses, only_dnssec_response, answer_count, only_qtype, only_qname, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, xact_ttl_secs, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold");
REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_queries, only_responses, only_dnssec_response, answer_count, only_qtype, only_qname, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, xact_ttl_secs, xact_ttl_ms, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold");
}

TEST_CASE("DNS config ttl", "[dns][config]")
Expand Down
7 changes: 5 additions & 2 deletions src/handlers/dns/v2/DnsStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,12 @@ void DnsStreamHandler::start()
_metrics->set_recorded_stream();
}

if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
if (config_exists("xact_ttl_ms")) {
auto ttl = config_get<uint64_t>("xact_ttl_ms");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl));
} else if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl) * 1000);
}

if (_pcap_proxy) {
Expand Down
4 changes: 3 additions & 1 deletion src/handlers/dns/v2/DnsStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,9 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler<DnsMetricsMana
"asn_notfound",
"dnstap_msg_type",
"public_suffix_list",
"recorded_stream"};
"recorded_stream",
"xact_ttl_secs",
"xact_ttl_ms"};

static const inline StreamMetricsHandler::GroupDefType _group_defs = {
{"cardinality", group::DnsMetrics::Cardinality},
Expand Down
2 changes: 1 addition & 1 deletion src/handlers/dns/v2/tests/test_dns_layer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -936,5 +936,5 @@ TEST_CASE("DNS invalid config", "[dns][filter][config]")
c.config_set<uint64_t>("num_periods", 1);
DnsStreamHandler dns_handler{"dns-test", stream_proxy, &c};
dns_handler.config_set<bool>("invalid_config", true);
REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_dnssec_response, answer_count, only_qtype, only_qname, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold");
REQUIRE_THROWS_WITH(dns_handler.start(), "invalid_config is an invalid/unsupported config or filter. The valid configs/filters are: exclude_noerror, only_rcode, only_dnssec_response, answer_count, only_qtype, only_qname, only_qname_suffix, geoloc_notfound, asn_notfound, dnstap_msg_type, public_suffix_list, recorded_stream, xact_ttl_secs, xact_ttl_ms, deep_sample_rate, num_periods, topn_count, topn_percentile_threshold");
}
21 changes: 18 additions & 3 deletions src/handlers/netprobe/NetProbeStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,14 @@ void NetProbeStreamHandler::start()
_metrics->set_recorded_stream();
}

if (config_exists("xact_ttl_secs")) {
if (config_exists("xact_ttl_ms")) {
auto ttl = config_get<uint64_t>("xact_ttl_ms");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl));
} else if (config_exists("xact_ttl_secs")) {
auto ttl = config_get<uint64_t>("xact_ttl_secs");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl) * 1000);
} else if (_netprobe_proxy->config_exists("xact_ttl_ms")) {
auto ttl = _netprobe_proxy->config_get<uint64_t>("xact_ttl_ms");
_metrics->set_xact_ttl(static_cast<uint32_t>(ttl));
}

Expand Down Expand Up @@ -114,6 +120,7 @@ void NetProbeMetricsBucket::specialized_merge(const AbstractMetricsBucket &o, Me
_targets_metrics[targetId]->attempts += target.second->attempts;
_targets_metrics[targetId]->successes += target.second->successes;
_targets_metrics[targetId]->dns_failures += target.second->dns_failures;
_targets_metrics[targetId]->timed_out += target.second->timed_out;
}
if (group_enabled(group::NetProbeMetrics::Histograms)) {
_targets_metrics[targetId]->h_time_us.merge(target.second->h_time_us);
Expand All @@ -137,6 +144,7 @@ void NetProbeMetricsBucket::to_prometheus(std::stringstream &out, Metric::LabelM
target.second->attempts.to_prometheus(out, target_labels);
target.second->successes.to_prometheus(out, target_labels);
target.second->dns_failures.to_prometheus(out, target_labels);
target.second->timed_out.to_prometheus(out, target_labels);
}

bool h_max_min{true};
Expand Down Expand Up @@ -182,9 +190,9 @@ void NetProbeMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, M
{
auto start_ts = start_tstamp();
auto end_ts = end_tstamp();

std::shared_lock r_lock(_mutex);

for (const auto &target : _targets_metrics) {
auto target_labels = add_labels;
auto targetId = target.first;
Expand All @@ -194,6 +202,7 @@ void NetProbeMetricsBucket::to_opentelemetry(metrics::v1::ScopeMetrics &scope, M
target.second->attempts.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->successes.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->dns_failures.to_opentelemetry(scope, start_ts, end_ts, target_labels);
target.second->timed_out.to_opentelemetry(scope, start_ts, end_ts, target_labels);
}

bool h_max_min{true};
Expand Down Expand Up @@ -247,6 +256,7 @@ void NetProbeMetricsBucket::to_json(json &j) const
target.second->attempts.to_json(j["targets"][targetId]);
target.second->successes.to_json(j["targets"][targetId]);
target.second->dns_failures.to_json(j["targets"][targetId]);
target.second->timed_out.to_json(j["targets"][targetId]);
}

bool h_max_min{true};
Expand Down Expand Up @@ -303,6 +313,7 @@ void NetProbeMetricsBucket::process_failure(ErrorType error, const std::string &
++_targets_metrics[target]->dns_failures;
break;
case ErrorType::Timeout:
++_targets_metrics[target]->timed_out;
case ErrorType::SocketError:
case ErrorType::InvalidIp:
case ErrorType::ConnectionFailure:
Expand Down Expand Up @@ -374,6 +385,8 @@ void NetProbeMetricsManager::process_netprobe_icmp(pcpp::IcmpLayer *layer, const
auto xact = _request_reply_manager->maybe_end_transaction((static_cast<uint32_t>(reply->header->id) << 16) | reply->header->sequence, stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_transaction(_deep_sampling_now, xact.second);
} else if (xact.first == Result::TimedOut) {
live_bucket()->process_failure(ErrorType::Timeout, xact.second.target);
}
}
}
Expand All @@ -391,6 +404,8 @@ void NetProbeMetricsManager::process_netprobe_tcp(uint32_t port, bool send, cons
auto xact = _request_reply_manager->maybe_end_transaction(port, stamp);
if (xact.first == Result::Valid) {
live_bucket()->new_transaction(_deep_sampling_now, xact.second);
} else if (xact.first == Result::TimedOut) {
live_bucket()->process_failure(ErrorType::Timeout, xact.second.target);
}
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/handlers/netprobe/NetProbeStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ struct Target {
Counter minimum;
Counter maximum;
Counter dns_failures;
Counter timed_out;

Target()
: q_time_us(NET_PROBE_SCHEMA, {"response_quantiles_us"}, "Net Probe quantile in microseconds")
Expand All @@ -59,6 +60,7 @@ struct Target {
, minimum(NET_PROBE_SCHEMA, {"response_min_us"}, "Minimum response time measured in the reporting interval")
, maximum(NET_PROBE_SCHEMA, {"response_max_us"}, "Maximum response time measured in the reporting interval")
, dns_failures(NET_PROBE_SCHEMA, {"dns_lookup_failures"}, "Total Net Probe failures when performed DNS lookup")
, timed_out(NET_PROBE_SCHEMA, {"packets_timeout"}, "Total Net Probe timeout transactions")
{
}
};
Expand Down Expand Up @@ -106,10 +108,10 @@ class NetProbeMetricsManager final : public visor::AbstractMetricsManager<NetPro
{
}

void on_period_shift(timespec stamp, [[maybe_unused]] const NetProbeMetricsBucket *maybe_expiring_bucket) override
void on_period_shift([[maybe_unused]] timespec stamp, [[maybe_unused]] const NetProbeMetricsBucket *maybe_expiring_bucket) override
{
// NetProbe transaction support
_request_reply_manager->purge_old_transactions(stamp);
// Clear all old transactions
_request_reply_manager->clear();
}

void set_xact_ttl(uint32_t ttl)
Expand All @@ -135,7 +137,8 @@ class NetProbeStreamHandler final : public visor::StreamMetricsHandler<NetProbeM

static const inline StreamMetricsHandler::ConfigsDefType _config_defs = {
"recorded_stream",
"xact_ttl_secs"};
"xact_ttl_secs",
"xact_ttl_ms"};

static const inline NetProbeStreamHandler::GroupDefType _group_defs = {
{"counters", group::NetProbeMetrics::Counters},
Expand Down
4 changes: 3 additions & 1 deletion src/inputs/netprobe/NetProbeInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ void NetProbeInputStream::info_json(json &j) const

std::unique_ptr<InputEventProxy> NetProbeInputStream::create_event_proxy(const Configurable &filter)
{
return std::make_unique<NetProbeInputEventProxy>(_name, filter);
auto custom_filter = filter;
custom_filter.config_set("xact_ttl_ms", _timeout_msec);
return std::make_unique<NetProbeInputEventProxy>(_name, custom_filter);
}
}
17 changes: 0 additions & 17 deletions src/inputs/netprobe/PingProbe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,20 +152,6 @@ bool PingProbe::start(std::shared_ptr<uvw::Loop> io_loop)
_interval_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
_internal_sequence = 0;

_timeout_timer = _io_loop->resource<uvw::TimerHandle>();
if (!_timeout_timer) {
throw NetProbeException("PingProbe - unable to initialize timeout TimerHandle");
}

_timeout_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
_internal_sequence = _config.packets_per_test;
_fail(ErrorType::Timeout, TestType::Ping, _name);
if (_internal_timer) {
_internal_timer->stop();
}
_interval_timer->again();
});

if (auto error = _create_socket(); error.has_value()) {
_fail(error.value(), TestType::Ping, _name);
return;
Expand All @@ -180,16 +166,13 @@ bool PingProbe::start(std::shared_ptr<uvw::Loop> io_loop)
_internal_timer->on<uvw::TimerEvent>([this](const auto &, auto &) {
if (_internal_sequence < static_cast<uint8_t>(_config.packets_per_test)) {
_internal_sequence++;
_timeout_timer->stop();
_timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0});
_send_icmp_v4(_internal_sequence);
}
});

(_sequence == UCHAR_MAX) ? _sequence = 0 : _sequence++;
_send_icmp_v4(_internal_sequence);
_internal_sequence++;
_timeout_timer->start(uvw::TimerHandle::Time{_config.timeout_msec}, uvw::TimerHandle::Time{0});
_internal_timer->start(uvw::TimerHandle::Time{_config.packets_interval_msec}, uvw::TimerHandle::Time{_config.packets_interval_msec});
});

Expand Down
1 change: 0 additions & 1 deletion src/inputs/netprobe/PingProbe.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ class PingProbe final : public NetProbe
uint8_t _internal_sequence{0};
std::shared_ptr<uvw::TimerHandle> _interval_timer;
std::shared_ptr<uvw::TimerHandle> _internal_timer;
std::shared_ptr<uvw::TimerHandle> _timeout_timer;
std::shared_ptr<uvw::AsyncHandle> _recv_handler;
SOCKETLEN _sin_length{0};
std::vector<uint8_t> _payload_array;
Expand Down