Skip to content
This repository was archived by the owner on Mar 30, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/connection_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ extern "C" {
namespace mlab {

// `data` is arbitrary (possibly binary) data to be cached under key.
// It will typically by nlmsg data.
// It will typically be nlmsg data.
bool ConnectionTracker::UpdateRecord(size_t key, int protocol,
std::string* data) {
const auto& it = connections_.find(key);
Expand Down
8 changes: 8 additions & 0 deletions src/connection_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ namespace mlab {
* connections. It is intended to be used to cache tcpinfo and related data,
* allow it to be updated (swapping the new for old data for a connection),
* and iterate with a visitor over any stale connections.
*
* Thread compatible.
****************************************************************************/
class ConnectionTracker {
public:
Expand Down Expand Up @@ -68,6 +70,12 @@ class ConnectionTracker {
std::function<void(int protocol, const std::string& old_msg,
const std::string& new_msg)> visitor);

int Clear() {
int count = size();
connections_.clear();
return count;
}

// Increment the round number, which is used to determine records that have
// not been updated in the last round.
void increment_round() { ++round_; } // Don't care about wrapping.
Expand Down
33 changes: 17 additions & 16 deletions src/tcpinfo_lib.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ namespace netlink {

namespace {
// Extract a single 16 bit ipv6 field from byte array.
// <ip> byte array
// <n> offset
// `ip` byte array
// `n` offset
unsigned int extract(const std::string& ip, int n) {
return (((unsigned char)(ip[n])) << 8) + (unsigned char)ip[n + 1];
}
Expand All @@ -48,8 +48,7 @@ TCPState GetState(struct inet_diag_msg* r) {
}

TCPState GetState(const struct nlmsghdr* nlh) {
// TODO(gfr) Use C++ style casts.
return GetState((struct inet_diag_msg*)NLMSG_DATA(nlh));
return GetState(reinterpret_cast<struct inet_diag_msg*>(NLMSG_DATA(nlh)));
}

InetDiagMsgProto::AddressFamily GetFamily(struct inet_diag_msg* r) {
Expand All @@ -63,8 +62,8 @@ InetDiagMsgProto::AddressFamily GetFamily(struct inet_diag_msg* r) {
}

// Parse primary inet_diag_msg into protobuf.
// <r> Non-null pointer to message.
// <proto> Non-null pointer to protobuf.
// `r` Non-null pointer to message.
// `proto` Non-null pointer to protobuf.
void ParseInetDiagMsg(struct inet_diag_msg* r, InetDiagMsgProto* proto) {
proto->set_family(GetFamily(r));
proto->set_state(GetState(r));
Expand Down Expand Up @@ -106,10 +105,10 @@ void ParseInetDiagMsg(struct inet_diag_msg* r, InetDiagMsgProto* proto) {
}

// Parse rtattr message containing BBR info.
// <rta> Non-null pointer to message.
// <proto> Non-null pointer to protobuf.
// `rta` Non-null pointer to message.
// `proto` Non-null pointer to protobuf.
void ParseBBRInfo(const struct rtattr* rta, BBRInfoProto* proto) {
const auto* bbr = (const struct tcp_bbr_info*)rta;
const auto* bbr = (const struct tcp_bbr_info*)RTA_DATA(rta);
auto bw = (((unsigned long long)bbr->bbr_bw_hi) << 32) + bbr->bbr_bw_lo;
if (bw > 0) proto->set_bw(bw);
if (bbr->bbr_min_rtt > 0) proto->set_min_rtt(bbr->bbr_min_rtt);
Expand All @@ -120,8 +119,8 @@ void ParseBBRInfo(const struct rtattr* rta, BBRInfoProto* proto) {
#define TCPI_HAS_OPT(info, opt) !!(info->tcpi_options & (opt))

// Parse rtattr message containing TCP info.
// <rta> Non-null pointer to message.
// <proto> Non-null pointer to protobuf.
// `rta` Non-null pointer to message.
// `proto` Non-null pointer to protobuf.
void ParseTCPInfo(const struct rtattr* rta, TCPInfoProto* proto) {
const struct tcp_info* info;
unsigned len = RTA_PAYLOAD(rta);
Expand Down Expand Up @@ -203,8 +202,8 @@ void ParseTCPInfo(const struct rtattr* rta, TCPInfoProto* proto) {
}

// Parse rtattr message containing mem info.
// <rta> Non-null pointer to message.
// <proto> Non-null pointer to protobuf.
// `rta` Non-null pointer to message.
// `proto` Non-null pointer to protobuf.
void ParseMemInfo(const struct rtattr* rta, SocketMemInfoProto* proto) {
const auto* info = (const struct inet_diag_meminfo*)RTA_DATA(rta);
proto->set_rmem_alloc(info->idiag_rmem);
Expand All @@ -214,8 +213,8 @@ void ParseMemInfo(const struct rtattr* rta, SocketMemInfoProto* proto) {
}

// Parse rtattr message containing SK mem info.
// <rta> Non-null pointer to message.
// <proto> Non-null pointer to protobuf.
// `rta` Non-null pointer to message.
// `proto` Non-null pointer to protobuf.
void ParseSKMemInfo(const struct rtattr* rta, SocketMemInfoProto* proto) {
const __u32* info = (__u32*)RTA_DATA(rta);
#define SET_MEM_FIELD_IF_NONZERO(tag, field) \
Expand Down Expand Up @@ -352,7 +351,9 @@ void TCPInfoPoller::PollOnce() {
void TCPInfoPoller::PollContinuously(uint polling_interval_msec) {}

void TCPInfoPoller::Break() {}
void TCPInfoPoller::ClearCache() {}
int TCPInfoPoller::ClearCache() {
return tracker_.Clear();
}
void TCPInfoPoller::ClearFilters() {}
bool TCPInfoPoller::ClearFilter(ConnectionFilter::Token token) { return false; }

Expand Down
22 changes: 15 additions & 7 deletions src/tcpinfo_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,21 @@ TCPState GetStateFromStr(const std::string& data);
*****************************************************************************/
class TCPInfoParser {
public:
// <msg> ownership NOT transfered.
// <protocol> fallback if it is not specified in msg.
// `msg` ownership NOT transfered.
// `protocol` fallback if it is not specified in msg.
TCPDiagnosticsProto ParseNLMsg(const struct nlmsghdr* msg,
Protocol protocol) const;

// <msg> ownership NOT transfered.
// <protocol> fallback if it is not specified in msg.
// <proto> output protobuf into which msg will be parsed.
// `protocol` fallback if it is not specified in msg.
TCPDiagnosticsProto ParseNLMsg(const std::string& msg,
Protocol protocol) const {
auto* hdr = (const struct nlmsghdr*)msg.c_str();
return ParseNLMsg(hdr, mlab::netlink::Protocol(protocol));
}

// `msg` ownership NOT transfered.
// `protocol` fallback if it is not specified in msg.
// `proto` output protobuf into which msg will be parsed.
void NLMsgToProto(const struct nlmsghdr* msg, Protocol protocol,
TCPDiagnosticsProto* proto) const;
};
Expand Down Expand Up @@ -116,7 +123,8 @@ class TCPInfoPoller {
void Break();

// Clear the connection cache.
void ClearCache();
// Returns the number of records in the cache.
int ClearCache();

// Remove all connection filters.
void ClearFilters();
Expand Down Expand Up @@ -146,7 +154,7 @@ class TCPInfoPoller {

// Specify handlers that will be run on any events that do not match any of
// the tuple filters.
// <states> is a list of states that should be reported. If empty, all
// `states` is a list of states that should be reported. If empty, all
// states will be reported.
void OnClose(Handler handler, const std::vector<TCPState>& states = {});
void OnNewState(Handler handler, const std::vector<TCPState>& states = {});
Expand Down
68 changes: 40 additions & 28 deletions src/tcpinfo_proto_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

namespace mlab {
namespace netlink {

// These samples (raw{1,2,3,4}) are collected from stripped down ss.c
// that also dumps the raw data. Would be good to have more variability.

Expand All @@ -30,24 +29,31 @@ namespace netlink {
// sack, cubic, wscale:7/7, rto:208, rtt:8.5/11, ato:40, mss:1398, cwnd:19,
// ssthresh:18, send 25.0M,bps, lastsnd:239268, lastrcv:239268, lastack:13972,
// rcv_rtt:36, rcv_space:28800
// PLUS fake rtattr containing bbrinfo.
std::string raw1(
"\x10\x01\x00\x00\x14\x00\x02\x00\x40\xE2\x01\x00\xA8\x4B\x00\x00\x0A\x01"
"\x02\x00\x96\xE1\x01\xBB\x26\x20\x00\x00\x10\x03\x04\x13\xAC\x8F\x79\x71"
"\x39\x73\xB4\x8E\x26\x07\xF8\xB0\x40\x06\x08\x0D\x00\x00\x00\x00\x00\x00"
"\x20\x0E\x00\x00\x00\x00\x00\x1F\xF8\x35\x00\x88\xFF\xFF\x6C\x79\x00\x00"
"\x00\x00\x00\x00\x00\x00\x00\x00\x38\x42\x02\x00\xBA\x9F\x58\x05\x05\x00"
"\x08\x00\x00\x00\x00\x00\x14\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
"\x00\x00\x00\x00\x00\x00\x00\x00\x24\x00\x07\x00\x00\x00\x00\x00\x80\xA2"
"\x05\x00\x00\x00\x00\x00\x00\x56\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
"\x00\x00\x00\x00\x00\x00\x00\x00\x6C\x00\x02\x00\x01\x00\x00\x00\x00\x07"
"\x77\x00\x80\x2C\x03\x00\x40\x9C\x00\x00\x76\x05\x00\x00\x4A\x02\x00\x00"
"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
"\x00\x00\xA4\xA6\x03\x00\x00\x00\x00\x00\xA4\xA6\x03\x00\x94\x36\x00\x00"
"\xDC\x05\x00\x00\xD8\x7D\x00\x00\x34\x21\x00\x00\xF8\x2A\x00\x00\x12\x00"
"\x00\x00\x13\x00\x00\x00\x94\x05\x00\x00\x03\x00\x00\x00\xA0\x8C\x00\x00"
"\x80\x70\x00\x00\x00\x00\x00\x00\x0A\x00\x04\x00\x63\x75\x62\x69\x63\x00"
"\x00\x00",
0x110);
"\x28\x01\x00\x00\x14\x00\x02\x00\x40\xE2\x01\x00\xA8\x4B\x00\x00"
"\x0A\x01\x02\x00\x96\xE1\x01\xBB\x26\x20\x00\x00\x10\x03\x04\x13"
"\xAC\x8F\x79\x71\x39\x73\xB4\x8E\x26\x07\xF8\xB0\x40\x06\x08\x0D"
"\x00\x00\x00\x00\x00\x00\x20\x0E\x00\x00\x00\x00\x00\x1F\xF8\x35"
"\x00\x88\xFF\xFF\x6C\x79\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
"\x38\x42\x02\x00\xBA\x9F\x58\x05\x05\x00\x08\x00\x00\x00\x00\x00"
"\x14\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
"\x00\x00\x00\x00\x24\x00\x07\x00\x00\x00\x00\x00\x80\xA2\x05\x00"
"\x00\x00\x00\x00\x00\x56\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00"
"\x00\x00\x00\x00\x00\x00\x00\x00\x6C\x00\x02\x00\x01\x00\x00\x00"
"\x00\x07\x77\x00\x80\x2C\x03\x00\x40\x9C\x00\x00\x76\x05\x00\x00"
"\x4A\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
"\x00\x00\x00\x00\x00\x00\x00\x00\xA4\xA6\x03\x00\x00\x00\x00\x00"
"\xA4\xA6\x03\x00\x94\x36\x00\x00\xDC\x05\x00\x00\xD8\x7D\x00\x00"
"\x34\x21\x00\x00\xF8\x2A\x00\x00\x12\x00\x00\x00\x13\x00\x00\x00"
"\x94\x05\x00\x00\x03\x00\x00\x00\xA0\x8C\x00\x00\x80\x70\x00\x00"
"\x00\x00\x00\x00\x0A\x00\x04\x00\x63\x75\x62\x69\x63\x00\x00\x00"
"\x18\x00\x10\x00" // len 24, type 16
"\x12\x23\x34\x45\x01\x00\x00\x00" // bw high and low
"\x9a\xab\x04\x00" // min_rtt uSec
"\xde\xef\x01\x00" // pacing gain << 8
"\x23\x34\x02\x00", // cwnd gain << 8
0x128);

// 0 0 2620:0:1003:413:ac8f:7971:3973:b48e:54550
// 2607:f8b0:400d:c03::bd:https timer:(keepalive 30sec 0), uid:148024,
Expand Down Expand Up @@ -416,7 +422,7 @@ TEST(Parser, IPToString) {

TEST(Parser, LotsOfFields) {
TCPInfoParser parser;
auto* hdr = (const struct nlmsghdr*)raw1.c_str();
auto* hdr = (const struct nlmsghdr*)raw1.data();
auto proto = parser.ParseNLMsg(hdr, IPPROTO_TCP);
ASSERT_EQ(proto.diag_protocol(), IPPROTO_TCP);
EXPECT_TRUE(proto.has_inet_diag_msg());
Expand Down Expand Up @@ -449,39 +455,45 @@ TEST(Parser, LotsOfFields) {
EXPECT_EQ(proto.socket_mem().rcvbuf(), 369280);
EXPECT_EQ(proto.socket_mem().sndbuf(), 87552);

ASSERT_TRUE(proto.has_bbr_info());
EXPECT_EQ(proto.bbr_info().bw(), 5456012050ll);
EXPECT_EQ(proto.bbr_info().min_rtt(), 306074);
EXPECT_EQ(proto.bbr_info().pacing_gain(), 126942);
EXPECT_EQ(proto.bbr_info().cwnd_gain(), 144419);

// Check proper parsing of endpoints.
EXPECT_EQ(ToString(proto.inet_diag_msg().sock_id().source()),
"[2620:0:1003:413:ac8f:7971:3973:b48e]:38625");
// Don't really care if this changes, but useful to detect
// when it does.
EXPECT_EQ(proto.ByteSize(), 185);
EXPECT_EQ(proto.ByteSize(), 205);
fprintf(stderr, "%s\n", proto.DebugString().c_str());
fprintf(stderr, "%d\n", proto.ByteSize());
}

TEST(Parser, MoreSamples) {
TCPInfoParser parser;
auto* hdr = (const struct nlmsghdr*)raw2.c_str();
auto* hdr = (const struct nlmsghdr*)raw2.data();
auto proto = parser.ParseNLMsg(hdr, IPPROTO_TCP);
ASSERT_TRUE(proto.has_tcp_info());
ASSERT_TRUE(proto.has_socket_mem());

for (auto& msg : {raw3, raw4, raw5, raw6, raw7, raw8, raw9}) {
hdr = (const struct nlmsghdr*)msg.c_str();
hdr = (const struct nlmsghdr*)msg.data();
proto = parser.ParseNLMsg(hdr, IPPROTO_TCP);
ASSERT_TRUE(proto.has_tcp_info());
ASSERT_TRUE(proto.has_socket_mem());
}

for (auto& msg : {raw10, raw11, raw12, raw13}) {
hdr = (const struct nlmsghdr*)msg.c_str();
hdr = (const struct nlmsghdr*)msg.data();
proto = parser.ParseNLMsg(hdr, IPPROTO_TCP);
ASSERT_TRUE(proto.has_tcp_info());
ASSERT_TRUE(proto.has_socket_mem());
}

for (auto& msg : {raw13, raw14, raw15, raw16, raw17, raw18, raw19}) {
hdr = (const struct nlmsghdr*)msg.c_str();
hdr = (const struct nlmsghdr*)msg.data();
proto = parser.ParseNLMsg(hdr, IPPROTO_TCP);
ASSERT_TRUE(proto.has_tcp_info());
ASSERT_TRUE(proto.has_socket_mem());
Expand Down Expand Up @@ -525,19 +537,19 @@ TEST(Poller, StashAndOnClose) {
p.OnNewState(on_new_state); // always call for new states.

{
auto* nlh = (const struct nlmsghdr*)raw2.c_str();
auto* nlh = (const struct nlmsghdr*)raw2.data();
auto* msg = reinterpret_cast<struct inet_diag_msg*>(NLMSG_DATA(nlh));
p.Stash(msg->idiag_family, IPPROTO_TCP, msg->id, nlh);
Print(nlh);
}
{
auto* nlh = (const struct nlmsghdr*)raw8.c_str();
auto* nlh = (const struct nlmsghdr*)raw8.data();
auto* msg = reinterpret_cast<struct inet_diag_msg*>(NLMSG_DATA(nlh));
p.Stash(msg->idiag_family, IPPROTO_TCP, msg->id, nlh);
Print(nlh);
}
{
auto* nlh = (const struct nlmsghdr*)raw9.c_str();
auto* nlh = (const struct nlmsghdr*)raw9.data();
auto* msg = reinterpret_cast<struct inet_diag_msg*>(NLMSG_DATA(nlh));
p.Stash(msg->idiag_family, IPPROTO_TCP, msg->id, nlh);
Print(nlh);
Expand All @@ -551,7 +563,7 @@ TEST(Poller, StashAndOnClose) {

// Try another round. Same message should NOT trigger on_new_state.
{
auto* nlh = (const struct nlmsghdr*)raw9.c_str();
auto* nlh = (const struct nlmsghdr*)raw9.data();
auto* msg = reinterpret_cast<struct inet_diag_msg*>(NLMSG_DATA(nlh));
p.Stash(msg->idiag_family, IPPROTO_TCP, msg->id, nlh);
Print(nlh);
Expand Down