Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
810 changes: 810 additions & 0 deletions 3rd/rng/randutils.hpp

Large diffs are not rendered by default.

46 changes: 28 additions & 18 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
static const char USAGE[] =
R"(pktvisord.
Usage:
pktvisord [-b BPF] [-p PORT] [-H HOSTSPEC] [--periods P] [--summary] [--geo-city FILE] [--geo-asn FILE] TARGET
pktvisord [-b BPF] [-p PORT] [-H HOSTSPEC] [--periods P] [--summary] [--geo-city FILE] [--geo-asn FILE]
[--max-deep-sample N]
TARGET
pktvisord (-h | --help)
pktvisord --version

Expand All @@ -34,18 +36,19 @@ static const char USAGE[] =
TARGET is either a network interface, an IP address (4 or 6) or a pcap file (ending in .pcap or .cap)

Options:
-p PORT Run metrics webserver on the given localhost port [default: 10853]
-b BPF Filter packets using the given BPF string
--geo-city FILE GeoLite2 City database to use for IP to Geo mapping (if enabled)
--geo-asn FILE GeoLite2 ASN database to use for IP to ASN mapping (if enabled)
--periods P Hold this many 60 second time periods of history in memory [default: 5]
--summary Instead of a time window with P periods, summarize all packets into one bucket for entire time period.
Useful for executive summary of (and applicable only to) a pcap file. [default: false]
-H HOSTSPEC Specify subnets (comma separated) to consider HOST, in CIDR form. In live capture this /may/ be detected automatically
from capture device but /must/ be specified for pcaps. Example: "10.0.1.0/24,10.0.2.1/32,2001:db8::/64"
Specifying this for live capture will append to any automatic detection.
-h --help Show this screen
--version Show version
-p PORT Run metrics webserver on the given localhost port [default: 10853]
-b BPF Filter packets using the given BPF string
--geo-city FILE GeoLite2 City database to use for IP to Geo mapping (if enabled)
--geo-asn FILE GeoLite2 ASN database to use for IP to ASN mapping (if enabled)
--max-deep-sample N Never deep sample more than N% of packets (an int between 0 and 100) [default: 100]
--periods P Hold this many 60 second time periods of history in memory [default: 5]
--summary Instead of a time window with P periods, summarize all packets into one bucket for entire time period.
Useful for executive summary of (and applicable only to) a pcap file. [default: false]
-H HOSTSPEC Specify subnets (comma separated) to consider HOST, in CIDR form. In live capture this /may/ be detected automatically
from capture device but /must/ be specified for pcaps. Example: "10.0.1.0/24,10.0.2.1/32,2001:db8::/64"
Specifying this for live capture will append to any automatic detection.
-h --help Show this screen
--version Show version
)";

static std::unique_ptr<pktvisor::MetricsMgr> metricsManager;
Expand Down Expand Up @@ -81,7 +84,7 @@ static void onGotTcpDnsMessage(pcpp::DnsLayer *dnsLayer, pktvisor::Direction dir
*/
static void onApplicationInterrupted(void *cookie)
{
std::cout << "stopping..." << std::endl;
std::cerr << "stopping..." << std::endl;
devCookie *dC = (devCookie *)cookie;
dC->second = true;
}
Expand Down Expand Up @@ -209,7 +212,7 @@ void openIface(pcpp::PcapLiveDevice *dev, pktvisor::TcpDnsReassembly &tcpReassem
if (bpfFilter != "") {
if (!dev->setFilter(bpfFilter))
throw std::runtime_error("Cannot set BPF filter to interface");
std::cout << "BPF: " << bpfFilter << std::endl;
std::cerr << "BPF: " << bpfFilter << std::endl;
}

printf("Starting packet capture on '%s'...\n", dev->getName());
Expand Down Expand Up @@ -388,11 +391,18 @@ int main(int argc, char *argv[])

pktvisor::TcpDnsReassembly tcpDnsReassembly(onGotTcpDnsMessage);
int result = 0;
// Upper bound, in percent, on how many packets may be deep sampled.
// The option key must match the name declared in USAGE ("--max-deep-sample");
// looking up any other key yields an empty value and the setting is ignored.
int sampleRate = 100;
if (args["--max-deep-sample"]) {
    sampleRate = static_cast<int>(args["--max-deep-sample"].asLong());
    // USAGE documents this as an int between 0 and 100; reject anything else.
    if (sampleRate < 0 || sampleRate > 100) {
        std::cerr << "--max-deep-sample must be an integer between 0 and 100" << std::endl;
        return -1;
    }
    if (sampleRate != 100) {
        std::cerr << "Using maximum deep sample rate: " << sampleRate << "%" << std::endl;
    }
}

if ((args["TARGET"].asString().rfind(".pcap") != std::string::npos) || (args["TARGET"].asString().rfind(".cap") != std::string::npos)) {
showHosts();
try {
metricsManager = std::make_unique<pktvisor::MetricsMgr>(args["--summary"].asBool());
metricsManager = std::make_unique<pktvisor::MetricsMgr>(args["--summary"].asBool(), 5, sampleRate);
handleGeo(args["--geo-city"], args["--geo-asn"]);
openPcap(args["TARGET"].asString(), tcpDnsReassembly, bpf);
if (args["--summary"].asBool()) {
Expand All @@ -408,7 +418,7 @@ int main(int argc, char *argv[])
return -1;
}
} else {
metricsManager = std::make_unique<pktvisor::MetricsMgr>(false, periods);
metricsManager = std::make_unique<pktvisor::MetricsMgr>(false, periods, sampleRate);
handleGeo(args["--geo-city"], args["--geo-asn"]);
pcpp::PcapLiveDevice *dev(nullptr);
// extract pcap live device by interface name or IP address
Expand Down Expand Up @@ -438,7 +448,7 @@ int main(int argc, char *argv[])
svr.listen("localhost", port);
});
try {
std::cout << "Interface " << dev->getName() << std::endl;
std::cerr << "Interface " << dev->getName() << std::endl;
getHostsFromIface(dev);
showHosts();
openIface(dev, tcpDnsReassembly, bpf);
Expand Down
99 changes: 70 additions & 29 deletions src/metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

namespace pktvisor {

Metrics::Metrics()
Metrics::Metrics(MetricsMgr& mmgr) : _mmgr(mmgr)
{
gettimeofday(&_bucketTS, nullptr);

Expand All @@ -28,6 +28,8 @@ Metrics::Metrics()
void Metrics::merge(Metrics &other)
{

_numSamples += other._numSamples;

_numPackets += other._numPackets;
_numPackets_UDP += other._numPackets_UDP;
_numPackets_TCP += other._numPackets_TCP;
Expand Down Expand Up @@ -104,13 +106,9 @@ void Metrics::newDNSPacket(pcpp::DnsLayer *dns, Direction dir, pcpp::ProtocolTyp
_DNS_TCP++;
}

// lock for write
std::unique_lock lock(_sketchMutex);

// only count response codes on responses (not queries)
if (dns->getDnsHeader()->queryOrResponse == response) {
_DNS_replies++;
_sketches->_dns_topRCode.update(dns->getDnsHeader()->responseCode);
switch (dns->getDnsHeader()->responseCode) {
case 0:
_DNS_NOERROR++;
Expand All @@ -129,6 +127,18 @@ void Metrics::newDNSPacket(pcpp::DnsLayer *dns, Direction dir, pcpp::ProtocolTyp
_DNS_queries++;
}

// sampler
if (!_mmgr.shouldDeepSample()) {
return;
}

// lock for write
std::unique_lock lock(_sketchMutex);

if (dns->getDnsHeader()->queryOrResponse == response) {
_sketches->_dns_topRCode.update(dns->getDnsHeader()->responseCode);
}

auto query = dns->getFirstQuery();
if (query) {

Expand Down Expand Up @@ -161,42 +171,57 @@ void Metrics::newDNSPacket(pcpp::DnsLayer *dns, Direction dir, pcpp::ProtocolTyp

/**
 * Record a finished DNS transaction in this metrics bucket.
 *
 * The transaction counters are updated for every call. The timing sketches
 * and the slow-transaction sketches are only touched when the manager has
 * selected the current packet for deep sampling.
 *
 * @param dns  DNS layer of the packet; its first query supplies the name
 * @param dir  direction of the last packet of the transaction (the reply)
 * @param xact transaction record; totalTS is converted to microseconds
 */
void Metrics::newDNSXact(pcpp::DnsLayer *dns, Direction dir, DnsTransaction xact)
{
    // counters are maintained whether or not this packet is deep sampled
    _DNS_xacts_total++;
    if (dir == toHost) {
        _DNS_xacts_out++;
    } else if (dir == fromHost) {
        _DNS_xacts_in++;
    }

    // sampler
    if (!_mmgr.shouldDeepSample()) {
        return;
    }

    // lock for write: declared at function scope so it stays held for every
    // sketch access below (a lock declared inside a nested block would be
    // released as soon as that block closes)
    std::unique_lock lock(_sketchMutex);

    uint64_t xactTime = (xact.totalTS.tv_sec * 1000000) + xact.totalTS.tv_usec; // microseconds
    // dir is the direction of the last packet, meaning the reply so from a transaction perspective
    // we look at it from the direction of the query, so the opposite side than we have here
    float to90th = 0.0;
    float from90th = 0.0;
    const uint64_t sample_threshold = 10;

    if (dir == toHost) {
        _sketches->_dnsXactFromTimeUs.update(xactTime);
        // wait for N samples
        if (_sketches->_dnsXactFromTimeUs.get_n() > sample_threshold) {
            from90th = _sketches->_dnsXactFromTimeUs.get_quantile(0.90);
        }
    } else if (dir == fromHost) {
        _sketches->_dnsXactToTimeUs.update(xactTime);
        // wait for N samples
        if (_sketches->_dnsXactToTimeUs.get_n() > sample_threshold) {
            to90th = _sketches->_dnsXactToTimeUs.get_quantile(0.90);
        }
    }

    auto query = dns->getFirstQuery();
    if (query) {
        auto name = query->getName();
        // see comment in MetricsMgr::newDNSxact on direction and why "toHost" is used with "from"
        if (dir == toHost && from90th > 0 && xactTime >= from90th) {
            _sketches->_dns_slowXactOut.update(name);
        } else if (dir == fromHost && to90th > 0 && xactTime >= to90th) {
            _sketches->_dns_slowXactIn.update(name);
        }
    }
}

void MetricsMgr::newDNSXact(pcpp::DnsLayer *dns, Direction dir, DnsTransaction xact)
Expand All @@ -209,10 +234,13 @@ void MetricsMgr::newDNSPacket(pcpp::DnsLayer *dns, Direction dir, pcpp::Protocol
_metrics.back()->newDNSPacket(dns, dir, l3, l4);
}

void Metrics::newPacket(MetricsMgr &mmgr, const pcpp::Packet &packet, pcpp::ProtocolType l3, pcpp::ProtocolType l4, Direction dir)
void Metrics::newPacket(const pcpp::Packet &packet, pcpp::ProtocolType l3, pcpp::ProtocolType l4, Direction dir)
{

_numPackets++;
if (_mmgr.shouldDeepSample()) {
_numSamples++;
}

switch (dir) {
case fromHost:
Expand Down Expand Up @@ -244,12 +272,17 @@ void Metrics::newPacket(MetricsMgr &mmgr, const pcpp::Packet &packet, pcpp::Prot
break;
}

// sampler
if (!_mmgr.shouldDeepSample()) {
return;
}

// lock for write
std::unique_lock lock(_sketchMutex);

#ifdef MMDB_ENABLE
const GeoDB* geoCityDB = mmgr.getGeoCityDB();
const GeoDB* geoASNDB = mmgr.getGeoASNDB();
const GeoDB* geoCityDB = _mmgr.getGeoCityDB();
const GeoDB* geoASNDB = _mmgr.getGeoASNDB();
#endif

auto IP4layer = packet.getLayerOfType<pcpp::IPv4Layer>();
Expand Down Expand Up @@ -334,7 +367,7 @@ void MetricsMgr::_periodShift()
_instantRates->resetQuantiles();

// add new bucket
_metrics.emplace_back(std::make_unique<Metrics>());
_metrics.emplace_back(std::make_unique<Metrics>(*this));
if (_metrics.size() > _numPeriods) {
// if we're at our period history length, pop the oldest
_metrics.pop_front();
Expand All @@ -352,6 +385,11 @@ void MetricsMgr::setInitialShiftTS(const pcpp::Packet &packet) {

void MetricsMgr::newPacket(const pcpp::Packet &packet, QueryResponsePairMgr &pairMgr, pcpp::ProtocolType l4, Direction dir, pcpp::ProtocolType l3)
{
// at each new packet, we determine if we are sampling, to limit collection of more detailed (expensive) statistics
_shouldDeepSample = true;
if (_deepSampleRate != 100) {
    // Draw from 1..100 inclusive (100 equally likely values) so that a rate of
    // N selects N out of 100 draws and a rate of 0 never samples. A 0..100
    // draw has 101 outcomes and would still sample at rate 0.
    _shouldDeepSample = (_rng.uniform(1, 100) <= _deepSampleRate);
}
if (!_singleSummaryMode) {
// use packet timestamps to track when PERIOD_SEC passes so we don't have to hit system clock
auto pkt_ts = packet.getRawPacketReadOnly()->getPacketTimeStamp();
Expand All @@ -372,7 +410,7 @@ void MetricsMgr::newPacket(const pcpp::Packet &packet, QueryResponsePairMgr &pai
break;
}
}
_metrics.back()->newPacket(*this, packet, l3, l4, dir);
_metrics.back()->newPacket(packet, l3, l4, dir);
}

void Metrics::toJSON(nlohmann::json &j, const std::string &key)
Expand All @@ -382,6 +420,8 @@ void Metrics::toJSON(nlohmann::json &j, const std::string &key)
std::shared_lock lock_sketch(_sketchMutex);
std::shared_lock lock_rate(_rateSketchMutex);

j[key]["packets"]["deep_samples"] = _numSamples.load();

j[key]["packets"]["total"] = _numPackets.load();
j[key]["packets"]["udp"] = _numPackets_UDP.load();
j[key]["packets"]["tcp"] = _numPackets_TCP.load();
Expand Down Expand Up @@ -588,6 +628,7 @@ std::string MetricsMgr::getAppMetrics()
{
nlohmann::json j;
j["app"]["version"] = PKTVISOR_VERSION_NUM;
j["app"]["deep_sample_rate_pct"] = _deepSampleRate;
j["app"]["periods"] = _numPeriods;
j["app"]["single_summary"] = _singleSummaryMode;
j["app"]["up_time_min"] = float(std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now() - _startTime).count()) / 60;
Expand Down Expand Up @@ -660,7 +701,7 @@ std::string MetricsMgr::getMetricsMerged(uint64_t period)
}

auto period_length = 0;
Metrics merged;
Metrics merged(*this);

auto p = period;
for (auto &m : _metrics) {
Expand Down
Loading