Skip to content

Commit

Permalink
Merge pull request #41 from stephen-soltesz/add-byte-counts
Browse files Browse the repository at this point in the history
 Track transmit and received bytes per experiment index
  • Loading branch information
stephen-soltesz committed Jul 20, 2018
2 parents 83cafd5 + 7ce632c commit 6ee1b5c
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 42 deletions.
73 changes: 51 additions & 22 deletions exitstats.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@
PROMETHEUS_SERVER_PORT = 9090
connection_count = prom.Counter('sidestream_connection_count',
'Count of connections logged',
['type', 'lsb'])
['type', 'index'])
transmit_bytes = prom.Counter('sidestream_transmit_bytes_total',
'Count of bytes transmitted per experiment index',
['type', 'index'])
receive_bytes = prom.Counter('sidestream_receive_bytes_total',
'Count of bytes received per experiment index',
['type', 'index'])
exception_count = prom.Counter('sidestream_exception_count',
'Count of exceptions.',
['type'])
Expand Down Expand Up @@ -69,6 +75,33 @@ def run(self):
t.start()


def octetToIndex(octet):
"""Converts the given octet to an M-Lab experiment index.
Because experiment indexes are zero-based, the host context will be 'host'.
Invalid octets or values that convert to invalid indexes will return
'invalid-octet'.
Args:
octet: int, the last byte of the local IP address.
Returns:
index as string
"""
# M-Lab host addresses start at 9, and there are only 4 x 13-addr blocks.
octet -= 9
if octet < 0 or octet > 244:
exception_count.labels('invalid octet').inc()
return 'invalid-octet'
# M-Lab uses IPv4/26 address blocks of 64 addresses. Within that block, we
# allocate four machines of 13 addresses each.
index = (octet % 64) % 13
if index == 0:
return 'host'
# M-Lab experiments are zero-based.
return (index-1)


class Web100StatsWriter:
''' TODO - add documentation.
'''
Expand Down Expand Up @@ -215,26 +248,17 @@ def getLogFile(self, local_time, local_ip=None):
self.logs[local_ip] = self.LogInfo(logdir+logname, logf)
return logf


def ipLastSixBits(self, local):
''' Parse the last six bits of an IP address into a decimal string.
'''
if ':' in local:
ipv6 = re.match(':{0,2}[0-9A-Fa-f].*:([0-9A-Fa-f]{1,4})$', local)
if ipv6 == None:
print 'ipv6 address failed to match pattern: ' + local
exception_count.labels('ipv6 parse error').inc()
return 'unparsed'
else:
return '{0}'.format(int(ipv6.group(1),16) % 64)
def ipToIndex(self, local):
"""Convert the last octet of a local IP to an experiment index str."""
# NOTE: due to https://github.com/m-lab/operator/issues/243 the last
# octet of IPv4 and IPv6 addresses should be parsable as base10 values.
last_octet = re.match('.*[:.]([0-9]+)$', local)
if last_octet == None:
print 'address failed to match pattern: ' + local
exception_count.labels('ip address parse error').inc()
return 'parse-error'
else:
ipv4 = re.match('[0-9].*\.([0-9]{1,3})$', local)
if ipv4 == None:
print 'ipv4 address failed to match pattern: ' + local
exception_count.labels('ipv4 parse error').inc()
return 'unparsed'
else:
return '{0}'.format(int(ipv4.group(1),10) % 64)
return '{0}'.format(octetToIndex(int(last_octet.group(1),10)))

def connectionType(self, remote):
if remote == '127.0.0.1':
Expand All @@ -259,9 +283,14 @@ def logConnection(self, c):

# Update connection count. Use the least significant bits
# of the local address to distinguish slices.
lsb = self.ipLastSixBits(snap["LocalAddress"])
index = self.ipToIndex(snap["LocalAddress"])
conn_type = self.connectionType(snap["RemAddress"])
connection_count.labels(conn_type, lsb).inc()
connection_count.labels(conn_type, index).inc()

# Count the 'Data*' fields to include retransmit data. TCP/IP headers
# are not included.
transmit_bytes.labels(conn_type, index).inc(snap["DataBytesOut"])
receive_bytes.labels(conn_type, index).inc(snap["DataBytesIn"])

# If it isn't loopback or plc, then log it.
if conn_type.startswith('ipv'):
Expand Down
58 changes: 38 additions & 20 deletions exitstats_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,27 @@ def testConnectionType(self):
self.assertEquals(w.connectionType('1000::0'), 'ipv6')
self.assertEquals(w.connectionType('1.2.3.4'), 'ipv4')

def testIPLastSixBits(self):
def test_ipToIndex(self):
w = exitstats.Web100StatsWriter("server/")
self.assertEquals(w.ipLastSixBits('1:2:3::5'), '5') # 0x05
self.assertEquals(w.ipLastSixBits('1:2:3::b5'), '53') # 0x35
self.assertEquals(w.ipLastSixBits('1.2.3.4'), '4')
self.assertEquals(w.ipLastSixBits('1.2.3.15'), '15')
self.assertEquals(w.ipLastSixBits('1.2.3.157'), '29')
self.assertEquals(w.ipLastSixBits('bad-address'), 'unparsed')
self.assertEquals(w.ipLastSixBits('bad:address'), 'unparsed')
self.assertEquals(w.ipToIndex('1:2:3::567890'), 'invalid-octet') # Invalid octet.
self.assertEquals(w.ipToIndex('1:2:3::5'), 'invalid-octet') # Invalid IP index.
self.assertEquals(w.ipToIndex('1:2:3::9'), 'host') # host IP
self.assertEquals(w.ipToIndex('1:2:3::10'), '0') # index 0 on mlab1
self.assertEquals(w.ipToIndex('1:2:3::23'), '0') # index 0 on mlab2
self.assertEquals(w.ipToIndex('1:2:3::36'), '0') # index 0 on mlab3
self.assertEquals(w.ipToIndex('1:2:3::49'), '0') # index 0 on mlab4
self.assertEquals(w.ipToIndex('1:2:3::48'), 'host') # index 0 on mlab4

self.assertEquals(w.ipToIndex('1.2.3.567890'), 'invalid-octet') # Invalid octet.
self.assertEquals(w.ipToIndex('1.2.3.5'), 'invalid-octet') # Invalid IP index.
self.assertEquals(w.ipToIndex('1.2.3.9'), 'host') # host IP
self.assertEquals(w.ipToIndex('1.2.3.19'), '9') # index 9 on mlab1
self.assertEquals(w.ipToIndex('1.2.3.32'), '9') # index 9 on mlab2
self.assertEquals(w.ipToIndex('1.2.3.45'), '9') # index 9 on mlab3
self.assertEquals(w.ipToIndex('1.2.3.58'), '9') # index 9 on mlab4
self.assertEquals(w.ipToIndex('1.2.3.48'), 'host') # index 9 on mlab4
self.assertEquals(w.ipToIndex('bad-address'), 'parse-error')
self.assertEquals(w.ipToIndex('bad:address'), 'parse-error')


def remove_file(logdir, logname):
Expand Down Expand Up @@ -95,7 +107,8 @@ def testConnectionCount(self):
# Use IP address LSB that tests masking of last 6 bits, and is distinct
# from bits in other tests, so we can verify the count. 121 % 64 = 33
c1.setall({"RemAddress": "5.4.3.2", "LocalAddress": "1.2.3.121",
"LocalPort":432, "RemPort":234})
"LocalPort":432, "RemPort":234,
"DataBytesOut": 0, "DataBytesIn": 0})
stats_writer.setkey(c1.values)

# This triggers a log file creation and a counter increment.
Expand All @@ -115,7 +128,7 @@ def testConnectionCount(self):

# Should see lsb = 57, (121 % 64)
rex = re.compile(
r'^sidestream_connection_count[{]lsb="57",type="ipv4"[}] (.*)$', re.M )
r'^sidestream_connection_count[{]index="8",type="ipv4"[}] (.*)$', re.M )
count_line = rex.search(response)
self.assertIsNotNone(count_line, response)
self.assertEqual(count_line.group(1), '1.0')
Expand Down Expand Up @@ -265,7 +278,8 @@ def testHourRolloverWithLocalIP(self):
c1 = FakeConnection()
c1.cid = 1234
c1.setall({"RemAddress": "5.4.3.2", "LocalAddress": "1.2.3.4",
"LocalPort":432, "RemPort":234})
"LocalPort":432, "RemPort":234,
"DataBytesOut": 0, "DataBytesIn": 0})

logdir = '2014/02/23/server/'
logname10 = '20140223T10:00:00Z_1.2.3.4_0.web100'
Expand All @@ -288,12 +302,12 @@ def testHourRolloverWithLocalIP(self):
stats_writer.logConnection(c1)

self.assertExists(logdir, logname11)
self.assertEqual(os.stat(logdir + logname11).st_size, 111L)
self.assertEqual(os.stat(logdir + logname11).st_size, 140L)
# These should cause additional writes to the same log,
# using the logname cache.
stats_writer.logConnection(c1)
stats_writer.logConnection(c1)
self.assertEqual(os.stat(logdir + logname11).st_size, 217L)
self.assertEqual(os.stat(logdir + logname11).st_size, 254L)

# Clean up
remove_file(logdir, logname10)
Expand All @@ -307,19 +321,23 @@ def testIgnorePLCandLoopback(self):
c1 = FakeConnection()
c1.cid = 1234
c1.setall({"RemAddress": "5.4.3.2", "LocalAddress": "1.2.3.4",
"LocalPort":432, "RemPort":234})
"LocalPort":432, "RemPort":234,
"DataBytesOut": 0, "DataBytesIn": 0})
plc = FakeConnection()
plc.cid = 123
plc.setall({"RemAddress": "128.112.139.23", "LocalAddress": "1.2.3.4",
"LocalPort":432, "RemPort":234})
"LocalPort":432, "RemPort":234,
"DataBytesOut": 0, "DataBytesIn": 0})
loopback4 = FakeConnection()
loopback4.cid = 123
loopback4.setall({"RemAddress": "127.0.0.1", "LocalAddress": "1.2.3.4",
"LocalPort":432, "RemPort":234})
"LocalPort":432, "RemPort":234,
"DataBytesOut": 0, "DataBytesIn": 0})
loopback6 = FakeConnection()
loopback6.cid = 123
loopback6.setall({"RemAddress": "::ffff:7f00:1", "LocalAddress": "1.2.3.4",
"LocalPort":432, "RemPort":234})
"LocalPort":432, "RemPort":234,
"DataBytesOut": 0, "DataBytesIn": 0})

logdir = '2014/02/23/server/'
logname10 = '20140223T10:00:00Z_1.2.3.4_0.web100'
Expand All @@ -342,17 +360,17 @@ def testIgnorePLCandLoopback(self):
stats_writer.logConnection(c1)

self.assertExists(logdir, logname11)
self.assertEqual(os.stat(logdir + logname11).st_size, 111L)
self.assertEqual(os.stat(logdir + logname11).st_size, 140L)
# These should cause additional writes to the same log,
# using the logname cache.
stats_writer.logConnection(c1)
stats_writer.logConnection(c1)
self.assertEqual(os.stat(logdir + logname11).st_size, 217L)
self.assertEqual(os.stat(logdir + logname11).st_size, 254L)
# These should cause all be counted, but not logged.
stats_writer.logConnection(plc)
stats_writer.logConnection(loopback4)
stats_writer.logConnection(loopback6)
self.assertEqual(os.stat(logdir + logname11).st_size, 217L)
self.assertEqual(os.stat(logdir + logname11).st_size, 254L)

# Clean up
remove_file(logdir, logname10)
Expand Down

0 comments on commit 6ee1b5c

Please sign in to comment.