Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cmd/pktvisord/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ static const char USAGE[] =
--admin-api Enable admin REST API giving complete control plane functionality [default: false]
When not specified, the exposed API is read-only access to summarized metrics.
When specified, write access is enabled for all modules.
--prometheus Enable native Prometheus metrics at path /metrics
-h --help Show this screen
-v Verbose log output
--no-track Don't send lightweight, anonymous usage metrics.
Expand Down Expand Up @@ -88,7 +89,11 @@ int main(int argc, char *argv[])
logger->set_level(spdlog::level::debug);
}

CoreServer svr(!args["--admin-api"].asBool(), logger);
std::string prometheus_path;
if (args["--prometheus"].asBool()) {
prometheus_path = "/metrics";
}
CoreServer svr(!args["--admin-api"].asBool(), logger, prometheus_path);
svr.set_http_logger([&logger](const auto &req, const auto &res) {
logger->info("REQUEST: {} {} {}", req.method, req.path, res.status);
if (res.status == 500) {
Expand Down
108 changes: 95 additions & 13 deletions src/AbstractMetricsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,66 @@ class PeriodException : public std::runtime_error

using namespace std::chrono;

class Metric
{
protected:
std::string _name;
std::string _desc;

public:
Metric(std::string name, std::string desc)
: _name(std::move(name))
, _desc(std::move(desc))
{
}

virtual void to_json(json &j) const = 0;
virtual void to_prometheus(std::stringstream &out, const std::string &key) const = 0;
};

/**
* A Counter metric class which knows how to render its output
* NOTE: intentionally _not_ thread safe; it should be protected by a mutex in the metric bucket
*/
class Counter final : Metric
{
uint64_t _value = 0;

public:
Counter(std::string name, std::string desc)
: Metric(std::move(name), std::move(desc))
{
}

Counter &operator++()
{
++_value;
return *this;
}

uint64_t value() const
{
return _value;
}

void operator+=(const Counter &other)
{
_value += other._value;
}

virtual void to_json(json &j) const override
{
j[_name] = _value;
}

virtual void to_prometheus(std::stringstream &out, const std::string &key) const override
{
out << "# HELP " << key << "_" << _name << ' ' << _desc << std::endl;
out << "# TYPE " << key << "_" << _name << " gauge" << std::endl;
out << key << '_' << _name << ' ' << _value << std::endl;
}
};

class Rate
{
public:
Expand Down Expand Up @@ -112,7 +172,7 @@ class Rate
return _rate;
}

auto quantile_get_rlocked() const
auto quantile_locked() const
{
std::shared_lock lock(_sketch_mutex);
struct retVals {
Expand All @@ -124,7 +184,7 @@ class Rate

void merge(const Rate &other)
{
auto [o_quantile, o_lock] = other.quantile_get_rlocked();
auto [o_quantile, o_lock] = other.quantile_locked();
std::unique_lock w_lock(_sketch_mutex);
_quantile.merge(*o_quantile);
// the live rate to simply copied if non zero
Expand All @@ -142,8 +202,8 @@ class AbstractMetricsBucket
{
private:
mutable std::shared_mutex _base_mutex;
uint64_t _num_samples = 0;
uint64_t _num_events = 0;
Counter _num_samples;
Counter _num_events;

Rate _rate_events;

Expand All @@ -162,7 +222,9 @@ class AbstractMetricsBucket

public:
AbstractMetricsBucket()
: _rate_events()
: _num_samples("deep_samples", "Total number of deep samples")
, _num_events("total", "Total number of events")
, _rate_events()
, _start_tstamp{0, 0}
, _end_tstamp{0, 0}
{
Expand Down Expand Up @@ -218,15 +280,16 @@ class AbstractMetricsBucket
on_set_read_only();
}

auto event_data() const
auto event_data_locked() const
{
std::shared_lock lock(_base_mutex);
struct eventData {
uint64_t num_events;
uint64_t num_samples;
const Counter *num_events;
const Counter *num_samples;
const Rate *event_rate;
std::shared_lock<std::shared_mutex> r_lock;
};
return eventData{_num_events, _num_samples, &_rate_events};
std::shared_lock lock(_base_mutex);
return eventData{&_num_events, &_num_samples, &_rate_events, std::move(lock)};
}

void merge(const AbstractMetricsBucket &other)
Expand Down Expand Up @@ -254,13 +317,14 @@ class AbstractMetricsBucket
// note, currently not enforcing _read_only
++_rate_events;
std::unique_lock lock(_base_mutex);
_num_events++;
++_num_events;
if (deep) {
_num_samples++;
++_num_samples;
}
}

virtual void to_json(json &j) const = 0;
virtual void to_prometheus(std::stringstream &out, const std::string &key) const = 0;
};

template <typename MetricsBucketClass>
Expand Down Expand Up @@ -338,7 +402,6 @@ class AbstractMetricsManager
static const uint MERGE_CACHE_TTL_MS = 1000;

protected:

/**
* the "base" event method that should be called on every event before specialized event functionality. sampling will be
* chosen, and the time window will be maintained
Expand Down Expand Up @@ -500,6 +563,25 @@ class AbstractMetricsManager
_metric_buckets.at(period)->to_json(j[period_str][key]);
}

void window_single_prometheus(std::stringstream &out, const std::string &key, uint64_t period = 0) const
{
std::shared_lock rl(_base_mutex);
std::shared_lock rbl(_bucket_mutex);

if (period >= _num_periods) {
std::stringstream err;
err << "invalid metrics period, specify [0, " << _num_periods - 1 << "]";
throw PeriodException(err.str());
}
if (period >= _metric_buckets.size()) {
std::stringstream err;
err << "requested metrics period has not yet accumulated, current range is [0, " << _metric_buckets.size() - 1 << "]";
throw PeriodException(err.str());
}

_metric_buckets.at(period)->to_prometheus(out, key);
}

void window_merged_json(json &j, const std::string &key, uint64_t period) const
{
std::shared_lock rl(_base_mutex);
Expand Down
27 changes: 24 additions & 3 deletions src/CoreServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
#include <spdlog/stopwatch.h>
#include <vector>

visor::CoreServer::CoreServer(bool read_only, std::shared_ptr<spdlog::logger> logger)
visor::CoreServer::CoreServer(bool read_only, std::shared_ptr<spdlog::logger> logger, const std::string &prometheus_path)
: _svr(read_only)
, _logger(logger)
, _start_time(std::chrono::system_clock::now())
Expand Down Expand Up @@ -36,7 +36,7 @@ visor::CoreServer::CoreServer(bool read_only, std::shared_ptr<spdlog::logger> lo
_handler_plugins.emplace_back(std::move(mod));
}

_setup_routes();
_setup_routes(prometheus_path);
}
void visor::CoreServer::start(const std::string &host, int port)
{
Expand Down Expand Up @@ -72,7 +72,7 @@ visor::CoreServer::~CoreServer()
{
stop();
}
void visor::CoreServer::_setup_routes()
void visor::CoreServer::_setup_routes(const std::string &prometheus_path)
{

_logger->info("Initialize server control plane");
Expand Down Expand Up @@ -158,4 +158,25 @@ void visor::CoreServer::_setup_routes()
res.set_content(e.what(), "text/plain");
}
});
if (!prometheus_path.empty()) {
_logger->info("enabling prometheus metrics on: {}", prometheus_path);
_svr.Get(prometheus_path.c_str(), [&]([[maybe_unused]] const httplib::Request &req, httplib::Response &res) {
std::stringstream output;
try {
auto [handler_modules, hm_lock] = _handler_manager->module_get_all_locked();
for (auto &[name, mod] : handler_modules) {
auto hmod = dynamic_cast<StreamHandler *>(mod.get());
if (hmod) {
spdlog::stopwatch sw;
hmod->window_prometheus(output);
_logger->debug("{} elapsed time: {}", hmod->name(), sw);
}
}
res.set_content(output.str(), "text/plain");
} catch (const std::exception &e) {
res.status = 500;
res.set_content(e.what(), "text/plain");
}
});
}
}
4 changes: 2 additions & 2 deletions src/CoreServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ class CoreServer
std::shared_ptr<spdlog::logger> _logger;
std::chrono::system_clock::time_point _start_time;

void _setup_routes();
void _setup_routes(const std::string &prometheus_path);

public:
CoreServer(bool read_only, std::shared_ptr<spdlog::logger> logger);
CoreServer(bool read_only, std::shared_ptr<spdlog::logger> logger, const std::string &prometheus_path);
~CoreServer();

void start(const std::string &host, int port);
Expand Down
9 changes: 5 additions & 4 deletions src/StreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class StreamHandler : public AbstractModule
virtual ~StreamHandler(){};

virtual void window_json(json &j, uint64_t period, bool merged) = 0;
virtual void window_prometheus(std::stringstream &out) = 0;
};

template <class MetricsManagerClass>
Expand Down Expand Up @@ -58,13 +59,13 @@ class StreamMetricsHandler : public StreamHandler
}
j["metrics"]["periods"][i]["read_only"] = _metrics->bucket(i)->read_only();
j["metrics"]["periods"][i]["length"] = _metrics->bucket(i)->period_length();
auto [num_events, num_samples, event_rate] = _metrics->bucket(i)->event_data();
j["metrics"]["periods"][i]["events"]["total"] = num_events;
j["metrics"]["periods"][i]["events"]["deep_samples"] = num_samples;
auto [num_events, num_samples, event_rate, event_lock] = _metrics->bucket(i)->event_data_locked();
num_events->to_json(j["metrics"]["periods"][i]["events"]);
num_samples->to_json(j["metrics"]["periods"][i]["events"]);
if (!_metrics->bucket(i)->read_only()) {
j["metrics"]["periods"][i]["events"]["rates"]["live"] = event_rate->rate();
}
auto [rate_quantile, rate_lock] = event_rate->quantile_get_rlocked();
auto [rate_quantile, rate_lock] = event_rate->quantile_locked();
auto quantiles = rate_quantile->get_quantiles(fractions, 4);
if (quantiles.size()) {
j["metrics"]["periods"][i]["events"]["rates"]["p50"] = quantiles[0];
Expand Down
15 changes: 10 additions & 5 deletions src/handlers/dns/DnsStreamHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,9 @@ void DnsStreamHandler::tcp_connection_end_cb(const pcpp::ConnectionData &connect
// remove the connection from the connection manager
_tcp_connections.erase(iter);
}

void DnsStreamHandler::window_prometheus(std::stringstream &out)
{
}
void DnsStreamHandler::window_json(json &j, uint64_t period, bool merged)
{
if (merged) {
Expand Down Expand Up @@ -271,12 +273,12 @@ void DnsMetricsBucket::to_json(json &j) const

const double fractions[4]{0.50, 0.90, 0.95, 0.99};

auto [num_events, num_samples, event_rate] = event_data(); // thread safe
auto [num_events, num_samples, event_rate, event_lock] = event_data_locked(); // thread safe
{
if (!read_only()) {
j["wire_packets"]["rates"]["total"]["live"] = event_rate->rate();
}
auto [rate_quantile, rate_lock] = event_rate->quantile_get_rlocked();
auto [rate_quantile, rate_lock] = event_rate->quantile_locked();
auto quantiles = rate_quantile->get_quantiles(fractions, 4);
if (quantiles.size()) {
j["wire_packets"]["rates"]["total"]["p50"] = quantiles[0];
Expand All @@ -288,8 +290,8 @@ void DnsMetricsBucket::to_json(json &j) const

std::shared_lock r_lock(_mutex);

j["wire_packets"]["total"] = num_events;
j["wire_packets"]["deep_samples"] = num_samples;
num_events->to_json(j["wire_packets"]);
num_samples->to_json(j["wire_packets"]);
j["wire_packets"]["queries"] = _counters.queries;
j["wire_packets"]["replies"] = _counters.replies;
j["wire_packets"]["tcp"] = _counters.TCP;
Expand Down Expand Up @@ -543,6 +545,9 @@ void DnsMetricsBucket::new_dns_transaction(bool deep, float to90th, float from90
}
}
}
void DnsMetricsBucket::to_prometheus(std::stringstream &out, const std::string &key) const
{
}

// the general metrics manager entry point (both UDP and TCP)
void DnsMetricsManager::process_dns_layer(DnsLayer &payload, PacketDirection dir, pcpp::ProtocolType l3, pcpp::ProtocolType l4, uint32_t flowkey, uint16_t port, timespec stamp)
Expand Down
2 changes: 2 additions & 0 deletions src/handlers/dns/DnsStreamHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class DnsMetricsBucket final : public visor::AbstractMetricsBucket
// visor::AbstractMetricsBucket
void specialized_merge(const AbstractMetricsBucket &other) override;
void to_json(json &j) const override;
void to_prometheus(std::stringstream &out, const std::string &key) const override;

void process_dns_layer(bool deep, DnsLayer &payload, pcpp::ProtocolType l3, pcpp::ProtocolType l4, uint16_t port);

Expand Down Expand Up @@ -228,6 +229,7 @@ class DnsStreamHandler final : public visor::StreamMetricsHandler<DnsMetricsMana

// visor::StreamHandler
void window_json(json &j, uint64_t period, bool merged) override;
void window_prometheus(std::stringstream &out) override;
};

}
Loading