## UDP Traceroute Connectivity by Probe

This notebook uses _existing_ RIPE Atlas measurements to determine how many RIPE Atlas probes might have issues with UDP connectivity. The basic methodology is as follows:

- Examine measurement metadata to find UDP traceroutes from many probes, over time (currently measurements ending in 2015, or still ongoing).
- Take latency to target and target reachability information from each sample. For samples where the target is not reached but some hops did respond, take the latest hop.
- Group reachability information by probe, and classify probes by how confident we are UDP is broken on them.

This notebook searches the RIPE atlas measurement metadata archive (available from [ftp.ripe.net](ftp://ftp.ripe.net/atlas/measurements)) for UDP and TCP traceroute measurements, and uses the Atlas API to download measurements and cache them locally.

### Preamble

Set up the environment and define functions we'll use later.

In [1]:
# Some jupyter magic to set up the environment correctly
%load_ext autoreload
%autoreload 2
%matplotlib inline

# thanks for letting me know about your plans but i don't really care
import warnings
warnings.simplefilter(action = "ignore", category = FutureWarning)

# things we need, things to make us go
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import collections
import warnings
import hashlib
import requests
import os.path
import time
import calendar
import itertools
import json

# HDF5 store for stashing parsed things
store = pd.HDFStore('data_cache/udp_traceconn.h5')

##############################################################################
# Trace metadata named tuple,
# and generator for extracting these from the all-measurements-fjson.txt file.
##############################################################################

TraceMeta = collections.namedtuple("TraceMeta", 
                ("msm_id", "af", "proto", "target", 
                 "start_epoch", "stop_epoch", "interval", "probes"))

def tm_generator(first_start=0, last_start=calendar.timegm(time.gmtime()), skip_lines = 0):
    current_time = calendar.timegm(time.gmtime())
    
    with open("data_cache/all-measurements-fjson.txt") as fjf:
        for num, line in enumerate(fjf):
            if num < skip_lines:
                continue
            
            mm = json.loads(line)
                
            if (((mm['type']['id'] == 2) or (mm['type']['id'] == 4)) and
                ((mm['status']['id'] == 2) or (mm['status']['id'] == 4)) and
                (('protocol' in mm) or ('proto_tcp' in mm)) and
                (mm['start_time'] >= first_start) and
                (mm['start_time'] < last_start) ):
            
                    if mm['status']['id'] == 4:
                        stop_time = mm['stop_time']
                    else:
                        stop_time = current_time
                    
                    if 'proto_tcp' in mm and mm['proto_tcp']:
                        proto = 'TCP'
                    else:
                        proto = mm['protocol']
                        
                    yield TraceMeta(mm['msm_id'], mm['af'], proto, mm['dst_addr'], 
                                    mm['start_time'], stop_time, mm['interval'],
                                    mm['participant_count'])

##############################################################################
# MSM retrieval code
##############################################################################

def get_msm(msm_id, gen, cachedir=None, start=None, stop=None):
    """
    Given an MSM, fetch it from the cache or from the RIPE Atlas API.
    Yield each separate result according to the generation function.
    """
    url = "https://atlas.ripe.net/api/v1/measurement/%u/result/" % (msm_id,)

    params = {"format": "json"}
    if start is not None and stop is not None:
        params["start"] = str(start)
        params["stop"] = str(stop)
    
    if cachedir and os.path.isdir(cachedir):
        filepath = os.path.join(cachedir, "measurement", "%u.json" % (msm_id,))

        # download if not present
        if not os.path.isfile(filepath):
            with open(filepath, mode="wb") as file:
                print("Cache miss, retrieving "+url)
                res = requests.get(url, params=params)

                if not res.ok:
                    raise "Atlas measurement API request failed: "+repr(res.json())
                
                file.write(res.content)

        # then read from cache
        with open(filepath) as stream:
            yield from gen(json.loads(stream.read()))

    else:
        # just read from the net
        res = requests.get(url, params=params)
        yield from gen(json.loads(res.content.decode("utf-8")))

##############################################################################
# Connectivity sample named tuple,
# connectivity sample generator for use with get_msm(),
# and dataframe creation function wrapping all of this together
##############################################################################

TCSample = collections.namedtuple("TCSample",
                     ("msm_id","time","af","proto","pid","sip","dip","reached","hop","rtt"))

def gen_tcs(msm_ary):
    for a_res in msm_ary:        
        if a_res['type'] == 'traceroute':
            if ('result' in a_res):
                maxhop = 0
                maxhop_ok = False
                for hop in a_res['result']:
                    if 'result' in hop:
                        for pkt in hop['result']:
                            if 'from' in pkt:
                                maxhop = hop['hop']
                                if pkt['from'] == a_res['dst_addr']:
                                    maxhop_ok = True
                                    if 'rtt' in pkt:
                                        rtt = pkt['rtt']
                                    else:
                                        rtt = -1
                                    yield TCSample(a_res['msm_id'],
                                           int(a_res['timestamp']), a_res['af'], a_res['proto'],
                                           a_res['prb_id'], a_res['src_addr'], a_res['dst_addr'],
                                           True, maxhop, rtt)
                                    break
                if not maxhop_ok:
                    try:
                        yield TCSample(a_res['msm_id'],
                                       int(a_res['timestamp']), a_res['af'], a_res['proto'],
                                       a_res['prb_id'], a_res['src_addr'], a_res['dst_addr'],
                                       False, maxhop, -1)
                    except KeyError:
                        # ignore completely broken results
                        pass

def tcsample_dataframe(msm_ids, cachedir=None, start=None, stop=None, chunksize=1000000):
    """
    Given an iterable of MSMs, create a dataframe of trace connectivity samples.
    """
    
    # initialize accumulators
    adf = pd.DataFrame([], columns=TCSample._fields)
    data = []
    
    # get individual rows from get_msm
    for msm_id in msm_ids:
        for tcs in get_msm(msm_id, gen=gen_tcs, 
                           start=start, stop=stop, 
                           cachedir=cachedir):
            data.append(tcs)
            
            # Append dataframe to dataframe accumulator if chunking.
            if len(data) >= chunksize:
                adf = adf.append(pd.DataFrame(data, columns=TCSample._fields), ignore_index=True)
                data = []
                
    # Append final dataframe if non-empty.
    if len(data) > 0:
        adf = adf.append(pd.DataFrame(data, columns=TCSample._fields), ignore_index=True)

    # Counter column for aggregation
    adf['n'] = 1
    
    return adf

##############################################################################
# Plotting utility function
##############################################################################

def plot_ecdf(a, **kwargs):
    sa = np.sort(a)
    yv = np.arange(len(sa))/float(len(sa))
    plt.plot(sa, yv, **kwargs)

### Step 1b, 2b: store/restore preparsed raw dataframes in HDF

Do this instead of steps 1 and 2 if not reworking parsing and raw dataframs selection. It's silly fast.

In [None]:
## Restore pre-parsed dataframes from the store, and re-split them as necessary
%time tmdf = store['tmdf']
%time tcsdf = store['tcsdf']

### Step 1: find MSM ids for UDP traceroutes 

Here we limit ourselves to MSMs run (1) from many probes, to maximize the number of probes we'll see samples from, and (2) ending in 2015 or later, to focus on the recent past. Note that many of these are MSMs we specified ourselves for our TMA paper. Not many people seem to be interested in UDP traceroute.

In [2]:
# get ALL the traceroutes! (takes about five minutes on Forclaz)
tmgen = tm_generator()
%time tmdf = pd.DataFrame([m for m in tmgen], columns=TraceMeta._fields)
tmgen.close() # generator wraps a file, close it.

# estimate sample count
tmdf['samples'] = (tmdf['probes'] * (tmdf['stop_epoch'] - tmdf['start_epoch'])) / tmdf['interval']

# cast timestamps
tmdf['start'] = pd.to_datetime(tmdf['start_epoch'] * 1e9)
tmdf['stop'] = pd.to_datetime(tmdf['stop_epoch'] * 1e9)
tmdf['duration'] = tmdf['stop'] - tmdf['start']

# count msms
tmdf['n'] = 1

# index by msm
tmdf.index = pd.Index(tmdf['msm_id'])
del(tmdf['msm_id'])

# Dump dataframe into the HDF store
store['tmdf'] = tmdf

# exclude all measurements ending before 1 Jan 2015, and split by protocol
tmdf_udp = tmdf[tmdf['proto'] == 'UDP']
tmdf_udp = tmdf_udp[tmdf_udp["stop"] >= "2015-01-01"]
tmdf_tcp = tmdf[tmdf['proto'] == 'TCP']
tmdf_tcp = tmdf_tcp[tmdf_tcp["stop"] >= "2015-01-01"]
tmdf_icmp = tmdf[tmdf['proto'] == 'TCP']
tmdf_icmp = tmdf_icmp[tmdf_icmp["stop"] >= "2015-01-01"]

# find appropriate MSMs
msm_ids = tmdf_udp[tmdf_udp['probes'] >= 64].index.values

CPU times: user 5min 6s, sys: 6.03 s, total: 5min 12s
Wall time: 5min 13s


In [3]:
len(msm_ids)

8436

### Step 2: Retrieve MSMs and parse them into a dataframe

In [4]:
%time tcsdf = tcsample_dataframe(msm_ids, cachedir="data_cache", chunksize=100000)
store['tcsdf'] = tcsdf

CPU times: user 7min, sys: 16.7 s, total: 7min 17s
Wall time: 7min 31s


your performance may suffer as PyTables will pickle object types that it cannot
map directly to c-types [inferred_type->mixed,key->block2_values] [items->['proto', 'sip', 'dip', 'reached']]

  exec(code_obj, self.user_global_ns, self.user_ns)


### Step 3: classify probe IDs by UDP reachability 

First, clean the data: drop all samples from MSMs where no traceroute reached the target (target always down), then all samples from probes with less than n=9 samples.

In [59]:
msm_reached_sum = tcsdf.groupby('msm_id')['reached'].sum()
bad_target_msm_ids = msm_reached_sum[msm_reached_sum == 0].index
tcsdf_target_udp_ok = tcsdf[np.logical_not(tcsdf['msm_id'].isin(bad_target_msm_ids))]
pid_sum = tcsdf.groupby('pid')['n'].sum()
low_volume_pids = pid_sum[pid_sum < 9].index
tcsdf_clean = tcsdf_target_udp_ok[np.logical_not(tcsdf_target_udp_ok['pid'].isin(low_volume_pids))]

Headline number: how many probes might block UDP?

In [60]:
reached_by_pid = tcsdf_clean.groupby('pid')['reached'].sum()
print ("%u / %u (%5.3f%%)" % (len(reached_by_pid[reached_by_pid == 0]),
                            len(reached_by_pid),
                            100 * len(reached_by_pid[reached_by_pid == 0]) / len(reached_by_pid)))

82 / 2240 (3.661%)


# Scratch code

Sandbox, dirtpile, etc

In [35]:
len(tcsdf_target_udp_ok.groupby('pid')['reached'].sum())

3763

In [None]:
plot_ecdf(tcsdf.groupby('pid').reached.sum() / tcsdf.groupby('pid').n.sum())

In [None]:
plot_ecdf(tcsdf[np.logical_not(tcsdf['reached'])]['hop'])
plt.xlim(0,32)

In [None]:
plot_ecdf(tcsdf[tcsdf['reached']]['hop'])
plot_ecdf(tcsdf[np.logical_not(tcsdf['reached'])]['hop'])