In [1]:
import os
import gzip
import json
import ipaddress
import re
import datetime
import copy

import numpy as np
import pandas as pd

from util import *
from histogram_functions import *

In [2]:
# Helper functions shared by the various parsing/merging functions below
def is_valid_date(datestr):
    """Return True if ``datestr`` has exactly the shape ``YYYY-MM-DD``.

    Only the layout is checked (four digits, dash, two digits, dash, two
    digits); the value is not validated as a calendar date, so a string such
    as "2024-13-99" is accepted.
    """
    # Raw string: "\d" inside a normal string literal is an invalid escape
    # sequence. fullmatch() replaces "^...$" because "$" would also match
    # just before a trailing newline.
    return re.fullmatch(r"\d{4}-\d{2}-\d{2}", datestr) is not None

def is_epping_filename(filename):
    """Return True if ``filename`` looks like an ePPing JSON output file.

    Accepted shape: ``pping.<anything>.json<sep><timestamp>[.gz]`` where
    ``<timestamp>`` is ``YYYY-MM-DDTHH:MM:SS`` followed by a ``+HH:MM`` or
    ``-HH:MM`` offset, and the ``.gz`` suffix is optional.
    """
    # Raw strings: "\d", "\." etc. in a normal string literal are invalid
    # escape sequences.
    # NOTE(review): the separator after "json" is an unescaped "." and so
    # matches any single character; it is kept as-is because tightening it to
    # a literal dot could reject existing files - confirm against real names.
    pattern = (r"pping\..*\.json.\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"
               r"[\+\-]\d{2}:\d{2}(\.gz)?")
    # fullmatch() replaces "^...$" because "$" would also match just before a
    # trailing newline.
    return re.fullmatch(pattern, filename) is not None

def get_epping_files(root_folder):
    """Collect the paths of all ePPing files below ``root_folder``.

    Only the direct sub-directories of ``root_folder`` whose names pass
    ``is_valid_date`` are searched, and inside each of them only regular
    files whose names pass ``is_epping_filename`` are kept.

    Returns:
        A sorted list of file paths.
    """
    date_dirs = (candidate for candidate in os.scandir(root_folder)
                 if candidate.is_dir() and is_valid_date(candidate.name))

    found = [item.path
             for date_dir in date_dirs
             for item in os.scandir(date_dir.path)
             if item.is_file() and is_epping_filename(item.name)]

    found.sort()
    return found

def classify_epping_entry_type(entry):
    """Classify a JSON entry by the first marker key it contains.

    The markers are checked in this order:
    ``aggregation_interval_ns`` -> "specification",
    ``ip_prefix`` -> "subnet_stats",
    ``protocol_counters`` -> "global_counters".
    An entry with none of these keys is "unknown".
    """
    marker_to_type = (
        ("aggregation_interval_ns", "specification"),
        ("ip_prefix", "subnet_stats"),
        ("protocol_counters", "global_counters"),
    )

    for marker, entry_type in marker_to_type:
        if marker in entry:
            return entry_type

    return "unknown"

def verify_epping_specification(json_agg_format, nbins=250, bin_width_ms=4,
                                agg_interval_s=10, ipv4_prefix_len=24,
                                ipv6_prefix_len=48):
    """Check that a specification entry matches the expected aggregation setup.

    Args:
        json_agg_format: The specification entry (a mapping) to check.
        nbins: Expected value of ``bins``.
        bin_width_ms: Expected bin width in milliseconds; compared against
            ``bin_width_ns`` after multiplying by 1e6.
        agg_interval_s: Expected aggregation interval in seconds; compared
            against ``aggregation_interval_ns`` after multiplying by 1e9.
        ipv4_prefix_len: Expected value of ``ipv4_prefix_len``.
        ipv6_prefix_len: Expected value of ``ipv6_prefix_len``.

    Returns:
        True when every field matches.

    Raises:
        ValueError: On the first field whose value differs from the expected one.
        KeyError: If a checked field is missing from ``json_agg_format``.
    """
    expected_values = (
        ("bins", nbins),
        ("bin_width_ns", bin_width_ms * 1e6),
        ("aggregation_interval_ns", agg_interval_s * 1e9),
        ("ipv4_prefix_len", ipv4_prefix_len),
        ("ipv6_prefix_len", ipv6_prefix_len),
    )

    for name, wanted in expected_values:
        actual = json_agg_format[name]
        if actual != wanted:
            raise ValueError("{} is {}, expected {}".format(name, actual, wanted))

    return True

def get_epping_json_entries(filename, verify_specification=True, **kwargs):
    """Load all entries from one ePPing JSON file and sanity-check its layout.

    Args:
        filename: Path of the file to read. Names ending in ".gz" are read
            through gzip, anything else is read as an uncompressed file.
        verify_specification: If True, the first entry is checked with
            ``verify_epping_specification``.
        **kwargs: Forwarded to ``verify_epping_specification``.

    Returns:
        The parsed list of entries; the first one is the specification entry.

    Raises:
        ValueError: If the document is not a non-empty list, does not start
            with a specification entry, contains more than one specification
            entry, or (when ``verify_specification`` is set) the
            specification does not match the expected values.
    """
    # is_epping_filename() treats the ".gz" suffix as optional, so only go
    # through gzip when the file is actually compressed; gzip.open() on a
    # plain JSON file would fail while reading.
    opener = gzip.open if str(filename).endswith(".gz") else open
    with opener(filename, "rb") as jfile:
        entries = json.load(jfile)

    # An empty (or non-list) document has no first entry to inspect; report it
    # with the same error as any other file lacking a leading specification.
    if (not isinstance(entries, list) or len(entries) == 0
            or classify_epping_entry_type(entries[0]) != "specification"):
        raise ValueError("{} does not start with a specification entry".format(filename))

    for entry in entries[1:]:
        if classify_epping_entry_type(entry) == "specification":
            raise ValueError("{} contains multiple specification entires".format(filename))

    if verify_specification:
        verify_epping_specification(entries[0], **kwargs)

    return entries

def timestamp_to_datetime(timestamps, timezone=None):
    """Convert nanosecond timestamps to timezone-aware pandas datetimes.

    Args:
        timestamps: Value(s) passed to ``pd.to_datetime`` with ``unit="ns"``.
        timezone: Optional time zone to convert the result to; when None the
            result stays in UTC.

    Returns:
        Whatever ``pd.to_datetime`` returns for the input, in UTC or in
        ``timezone`` if one was given.
    """
    utc_times = pd.to_datetime(timestamps, unit="ns", utc=True)
    return utc_times if timezone is None else utc_times.tz_convert(timezone)

def datetime_to_timestamp(dt, origin=np.datetime64("1970-01-01T00:00:00"), unit="ns"):
    """Convert a datetime to an integer count of ``unit`` since ``origin``.

    Args:
        dt: A datetime-like value; strings are first parsed with
            ``pd.to_datetime``.
        origin: Reference point the offset is measured from.
        unit: numpy timedelta unit the result is expressed in.

    Returns:
        ``(dt - origin)`` floor-divided by one ``unit``.
    """
    parsed = pd.to_datetime(dt) if isinstance(dt, str) else dt

    # Timezone-aware values are moved to UTC and then made naive, so they can
    # be subtracted from the (naive) origin.
    if getattr(parsed, "tz", None) is not None:
        parsed = parsed.tz_convert("UTC").tz_localize(None)

    one_unit = np.timedelta64(1, unit)
    return (parsed - origin) // one_unit

def is_in_subnets(subnet, include_subnets):
    """Return True if ``subnet`` lies inside any network of ``include_subnets``.

    Both ``subnet`` and every element of ``include_subnets`` are first passed
    through ``netify`` (imported from ``util``; presumably it turns the value
    into an ``ipaddress`` network object - TODO confirm). A network only
    counts as a match when it has the same IP version as ``subnet``.
    """
    candidate = netify(subnet)
    # Convert the complete list before comparing, so every element goes
    # through netify() regardless of where a match is found.
    networks = [netify(net) for net in include_subnets]

    return any(candidate.version == network.version and candidate.subnet_of(network)
               for network in networks)

def _get_subnet_type(subnet, lan_subnets=None):
    """Label a network as "LAN", "WAN" or "suspicious".

    Args:
        subnet: An ``ipaddress`` network object.
        lan_subnets: Optional iterable of network objects regarded as LAN.
            When None, ``subnet.is_private`` decides instead. An empty
            iterable is not the same as None: nothing is then regarded as LAN.

    Returns:
        "LAN" if the network counts as LAN, otherwise "WAN" when
        ``subnet.is_global`` is true and "suspicious" when it is not.
    """
    if lan_subnets is None:
        is_lan = subnet.is_private
    else:
        is_lan = any(subnet.version == lan.version and subnet.subnet_of(lan)
                     for lan in lan_subnets)

    if is_lan:
        return "LAN"
    if subnet.is_global:
        return "WAN"
    return "suspicious"

def _new_subnet_entry(entry, subnet_enum_state, lan_subnets, histogram_bins):
    """Build a fresh accumulator entry from one subnet report.

    Args:
        entry: A "subnet_stats" JSON entry. ``ip_prefix``, ``timestamp``,
            ``rx_stats`` and ``tx_stats`` are required; ``min_rtt``,
            ``max_rtt`` and ``histogram`` are optional.
        subnet_enum_state: Dict holding the running counter ``next_num``; its
            current value is stored as ``subnet_num`` and the counter is then
            incremented.
        lan_subnets: Forwarded to ``_get_subnet_type``.
        histogram_bins: Length the RTT histogram is zero-padded to.

    Returns:
        A new dict that shares no mutable state with ``entry``.
    """
    subnet = ipaddress.ip_network(entry["ip_prefix"])
    new_entry = {"ip_version": subnet.version,
                 "prefix_length": subnet.prefixlen,
                 "subnet_num": subnet_enum_state["next_num"],
                 "type": _get_subnet_type(subnet, lan_subnets),
                 "reports": 1,
                 "first_instance": entry["timestamp"],
                 "last_instance": entry["timestamp"]}

    subnet_enum_state["next_num"] += 1

    new_entry["rx_stats"] = copy.deepcopy(entry["rx_stats"])
    new_entry["tx_stats"] = copy.deepcopy(entry["tx_stats"])

    new_entry["rtt_min"] = entry.get("min_rtt")
    new_entry["rtt_max"] = entry.get("max_rtt")

    # Copy the histogram before padding it: the accumulator must not alias
    # (and the padding must not modify) the list owned by the input entry,
    # just like the rx/tx stats above are deep-copied.
    rtt_hist = list(entry.get("histogram", []))
    rtt_hist += [0] * (histogram_bins - len(rtt_hist))
    new_entry["rtt_hist"] = rtt_hist

    return new_entry

def _update_subnet_entry(tot, new_stats):
    """Fold one more subnet report into an existing accumulator entry.

    ``tot`` is modified in place: the report count is incremented, the
    first/last timestamps are widened, the rx/tx counters are summed, the RTT
    bounds are widened (only when the report carries both ``min_rtt`` and
    ``max_rtt``) and, if present, the report's histogram is handed to
    ``add_to_histogram`` together with ``tot["rtt_hist"]``.
    """
    tot["reports"] += 1

    timestamp = new_stats["timestamp"]
    if timestamp < tot["first_instance"]:
        tot["first_instance"] = timestamp
    elif timestamp > tot["last_instance"]:
        tot["last_instance"] = timestamp

    for direction in ("rx_stats", "tx_stats"):
        _update_rxtx_ttype_counters(tot[direction], new_stats[direction])

    has_rtt_bounds = "min_rtt" in new_stats and "max_rtt" in new_stats
    if has_rtt_bounds:
        lowest = new_stats["min_rtt"]
        highest = new_stats["max_rtt"]
        # A None bound means no earlier report carried RTT values.
        if tot["rtt_min"] is None or lowest < tot["rtt_min"]:
            tot["rtt_min"] = lowest
        if tot["rtt_max"] is None or highest > tot["rtt_max"]:
            tot["rtt_max"] = highest

    if "histogram" in new_stats:
        # add_to_histogram comes from histogram_functions; its return value is
        # not used here, so it presumably updates its first argument in place
        # - TODO confirm.
        add_to_histogram(tot["rtt_hist"], new_stats["histogram"])

def _update_rxtx_ttype_counters(to_stats, from_stats):
    """Add the per-traffic-type counters of ``from_stats`` onto ``to_stats``.

    Args:
        to_stats: Accumulator mapping traffic type -> {counter name: value};
            modified in place.
        from_stats: Mapping of the same shape holding the values to add.

    Returns:
        ``to_stats`` (the same object that was passed in).
    """
    for ttype, new_counters in from_stats.items():
        if ttype not in to_stats:
            # First time this traffic type shows up: take a private copy so
            # later additions never touch the input.
            to_stats[ttype] = copy.deepcopy(new_counters)
            continue

        totals = to_stats[ttype]
        for counter, value in new_counters.items():
            # A counter that first appears in a later report starts from zero
            # instead of raising KeyError.
            totals[counter] = totals.get(counter, 0) + value

    return to_stats

### Merge all traffic per subnet (ignore time)

In [4]:
def parse_total_subnet_stats(root_folder, specification_kwargs={}, lan_subnets=[], report_freq=1440):
    """Merge every subnet report under ``root_folder`` into one entry per prefix.

    Args:
        root_folder: Folder searched with ``get_epping_files``.
        specification_kwargs: Keyword arguments forwarded (through
            ``get_epping_json_entries``) to the specification check. Only read
            here, never modified.
        lan_subnets: Network strings converted with ``ipaddress.ip_network``
            and used to label subnets as LAN. Only read here, never modified.
        report_freq: Print a progress line every ``report_freq`` files and
            after the last file; values <= 0 disable the progress output.

    Returns:
        Dict mapping each ``ip_prefix`` string to its merged entry.
    """
    lan_networks = [ipaddress.ip_network(lan) for lan in lan_subnets]
    enum_state = {"next_num": 0}
    merged = {}
    entry_count = 0

    epping_files = get_epping_files(root_folder)
    total_files = len(epping_files)

    for file_count, path in enumerate(epping_files, start=1):
        entries = get_epping_json_entries(path, verify_specification=True,
                                          **specification_kwargs)

        # The first entry is the specification; it states the histogram size.
        histogram_bins = entries[0]["bins"]

        for entry in entries:
            if classify_epping_entry_type(entry) == "subnet_stats":
                prefix = entry["ip_prefix"]
                if prefix in merged:
                    _update_subnet_entry(merged[prefix], entry)
                else:
                    merged[prefix] = _new_subnet_entry(entry, enum_state,
                                                       lan_networks, histogram_bins)
                entry_count += 1

        if report_freq > 0 and (file_count % report_freq == 0
                                or file_count == total_files):
            print("{}: Parsed {}/{} files, containing {} entries".format(
                datetime.datetime.now(), file_count, total_files, entry_count))

    return merged

In [5]:
%%time

# Prefixes handed to parse_total_subnet_stats() as the networks to label "LAN".
lan_subnets = (
    "100.64.0.0/24",
    "2602:fdca:800::/48",
)

subnet_stats = parse_total_subnet_stats(
    "data/raw",
    lan_subnets=lan_subnets,
)

2024-08-22 10:11:32.025234: Parsed 1440/46522 files, containing 10467910 entries
2024-08-22 10:14:33.898976: Parsed 2880/46522 files, containing 20668102 entries
2024-08-22 10:17:55.285506: Parsed 4320/46522 files, containing 31254943 entries
2024-08-22 10:21:11.958352: Parsed 5760/46522 files, containing 41560292 entries
2024-08-22 10:24:39.517049: Parsed 7200/46522 files, containing 52155260 entries
2024-08-22 10:28:19.232153: Parsed 8640/46522 files, containing 63159910 entries
2024-08-22 10:32:06.270317: Parsed 10080/46522 files, containing 74321577 entries
2024-08-22 10:35:46.670234: Parsed 11520/46522 files, containing 85301424 entries
2024-08-22 10:39:22.196905: Parsed 12960/46522 files, containing 95902501 entries
2024-08-22 10:42:29.206985: Parsed 14400/46522 files, containing 104634489 entries
2024-08-22 10:46:16.228337: Parsed 15840/46522 files, containing 115385861 entries
2024-08-22 10:50:12.228419: Parsed 17280/46522 files, containing 126557607 entries
2024-08-22 10:54:04

In [6]:
%%time

# Write the merged per-subnet statistics to disk as JSON.
with open("data/processed/subnet_stats_merged.json", mode="wt") as ofile:
    json.dump(subnet_stats, fp=ofile)

CPU times: user 28.3 s, sys: 172 ms, total: 28.5 s
Wall time: 28.5 s


### Merge specific subnets with specific time resolution (ignore separate subnets)

In [7]:
def parse_subnets_over_time(root_folder, include_subnets=None, exclude_subnets=None,
                            aggfreq_s=None, tz=None, specification_kwargs={}, report_freq=1440):
    """Merge the reports of the selected subnets into one entry per point in time.

    Args:
        root_folder: Folder searched with ``get_epping_files``.
        include_subnets: If given, only reports whose prefix lies inside one
            of these networks are kept.
        exclude_subnets: If given, reports whose prefix lies inside one of
            these networks are dropped.
        aggfreq_s: If given, each timestamp is passed through
            ``int_ceil(t, aggfreq_s * 1000000000)`` before being used as key
            (``int_ceil`` comes from ``util``; presumably it rounds up to a
            multiple of the second argument - TODO confirm).
        tz: Accepted but not used by this function.
        specification_kwargs: Keyword arguments forwarded (through
            ``get_epping_json_entries``) to the specification check. Only read
            here, never modified.
        report_freq: Print a progress line every ``report_freq`` files and
            after the last file; values <= 0 disable the progress output.

    Returns:
        Dict mapping each (possibly rounded) timestamp to its merged entry.
    """
    included = None if include_subnets is None else [netify(net) for net in include_subnets]
    excluded = None if exclude_subnets is None else [netify(net) for net in exclude_subnets]

    def _is_selected(subnet):
        # The include filter is evaluated first; the exclude filter only for
        # subnets that passed it.
        if included is not None and not is_in_subnets(subnet, included):
            return False
        return excluded is None or not is_in_subnets(subnet, excluded)

    enum_state = {"next_num": 0}
    merged = {}
    entry_count = 0

    epping_files = get_epping_files(root_folder)
    total_files = len(epping_files)

    for file_count, path in enumerate(epping_files, start=1):
        entries = get_epping_json_entries(path, verify_specification=True,
                                          **specification_kwargs)

        # The first entry is the specification; it states the histogram size.
        histogram_bins = entries[0]["bins"]

        for entry in entries:
            if classify_epping_entry_type(entry) != "subnet_stats":
                continue
            if not _is_selected(netify(entry["ip_prefix"])):
                continue

            time_key = entry["timestamp"]
            if aggfreq_s is not None:
                time_key = int_ceil(time_key, aggfreq_s * 1000000000)

            if time_key in merged:
                _update_subnet_entry(merged[time_key], entry)
            else:
                # No LAN list is passed along for the per-time entries.
                merged[time_key] = _new_subnet_entry(entry, enum_state,
                                                     None, histogram_bins)

            entry_count += 1

        if report_freq > 0 and (file_count % report_freq == 0
                                or file_count == total_files):
            print("{}: Parsed {}/{} files, containing {} entries".format(
                datetime.datetime.now(), file_count, total_files, entry_count))

    return merged

In [8]:
%%time
# Keep only the reports that fall inside these prefixes. With aggfreq_s=None
# the report timestamps are used as keys without any rounding.
target_subnets = (
    "100.64.0.0/24",
    "2602:fdca:800::/48",
)

lan_per10s_stats = parse_subnets_over_time(
    "data/raw/",
    include_subnets=target_subnets,
    aggfreq_s=None,
)

2024-08-22 12:16:54.768261: Parsed 1440/46522 files, containing 17282 entries
2024-08-22 12:21:13.436362: Parsed 2880/46522 files, containing 34568 entries
2024-08-22 12:25:42.524904: Parsed 4320/46522 files, containing 51854 entries
2024-08-22 12:30:09.234223: Parsed 5760/46522 files, containing 69140 entries
2024-08-22 12:34:36.515740: Parsed 7200/46522 files, containing 86426 entries
2024-08-22 12:39:20.269778: Parsed 8640/46522 files, containing 103712 entries
2024-08-22 12:44:08.696613: Parsed 10080/46522 files, containing 120998 entries
2024-08-22 12:48:51.233737: Parsed 11520/46522 files, containing 138284 entries
2024-08-22 12:53:15.168423: Parsed 12960/46522 files, containing 155097 entries
2024-08-22 12:56:55.584750: Parsed 14400/46522 files, containing 168985 entries
2024-08-22 13:01:22.258349: Parsed 15840/46522 files, containing 186271 entries
2024-08-22 13:05:58.685243: Parsed 17280/46522 files, containing 203555 entries
2024-08-22 13:10:30.360365: Parsed 18720/46522 file

In [9]:
# Write the merged statistics of the included subnets to disk as JSON.
with open("data/processed/lan_per10s_stats_merged.json", mode="wt") as ofile:
    json.dump(lan_per10s_stats, fp=ofile)

In [10]:
%%time
# Drop the reports that fall inside these prefixes and merge everything else.
# With aggfreq_s=None the report timestamps are used as keys without rounding.
exclude_subnets = (
    "100.64.0.0/24",
    "2602:fdca:800::/48",
)

wan_per10s_stats = parse_subnets_over_time(
    "data/raw/",
    exclude_subnets=exclude_subnets,
    aggfreq_s=None,
)

2024-08-22 14:48:40.620533: Parsed 1440/46522 files, containing 10450628 entries
2024-08-22 14:53:54.472250: Parsed 2880/46522 files, containing 20633534 entries
2024-08-22 14:59:20.898017: Parsed 4320/46522 files, containing 31203089 entries
2024-08-22 15:04:32.941195: Parsed 5760/46522 files, containing 41491152 entries
2024-08-22 15:09:55.742156: Parsed 7200/46522 files, containing 52068834 entries
2024-08-22 15:15:25.222686: Parsed 8640/46522 files, containing 63056198 entries
2024-08-22 15:21:06.889845: Parsed 10080/46522 files, containing 74200579 entries
2024-08-22 15:26:42.186120: Parsed 11520/46522 files, containing 85163140 entries
2024-08-22 15:32:03.225903: Parsed 12960/46522 files, containing 95747404 entries
2024-08-22 15:36:33.235220: Parsed 14400/46522 files, containing 104465504 entries
2024-08-22 15:42:01.671108: Parsed 15840/46522 files, containing 115199590 entries
2024-08-22 15:47:40.985922: Parsed 17280/46522 files, containing 126354052 entries
2024-08-22 15:53:24

In [11]:
# Write the merged statistics of the non-excluded subnets to disk as JSON.
with open("data/processed/wan_per10s_stats_merged.json", mode="wt") as ofile:
    json.dump(wan_per10s_stats, fp=ofile)

### Parse all global counters (protocol, ECN and errors)

In [12]:
def parse_global_counters(root_folder, specification_kwargs={}, report_freq=1440, tz=None):
    """Collect the global protocol, ECN and error counters of every file.

    Args:
        root_folder: Folder searched with ``get_epping_files``.
        specification_kwargs: Keyword arguments forwarded (through
            ``get_epping_json_entries``) to the specification check. Only read
            here, never modified.
        report_freq: Print a progress line every ``report_freq`` files and
            after the last file; values <= 0 disable the progress output.
        tz: Time zone handed to ``_records_to_dataframe`` for the timestamp
            column.

    Returns:
        Tuple ``(proto_stats, ecn_stats, error_stats)`` of DataFrames, one row
        per global-counters entry that had a non-empty section of that kind.
    """
    proto_records, ecn_records, error_records = [], [], []

    # (entry field, function flattening it into a record, list collecting the records)
    sections = (
        ("protocol_counters", parse_protocol_counters, proto_records),
        ("ecn_counters", parse_ecn_counters, ecn_records),
        ("errors", parse_error_counters, error_records),
    )

    entry_count = 0
    epping_files = get_epping_files(root_folder)
    total_files = len(epping_files)

    for file_count, path in enumerate(epping_files, start=1):
        entries = get_epping_json_entries(path, verify_specification=True,
                                          **specification_kwargs)

        for entry in entries:
            if classify_epping_entry_type(entry) != "global_counters":
                continue

            timestamp = entry["timestamp"]
            for field, to_record, records in sections:
                # Empty sections produce no row at all.
                if len(entry[field]) > 0:
                    records.append(to_record(timestamp, entry[field]))

            entry_count += 1

        if report_freq > 0 and (file_count % report_freq == 0
                                or file_count == total_files):
            print("{}: Parsed {}/{} files, containing {} entries".format(
                datetime.datetime.now(), file_count, total_files, entry_count))

    return (_records_to_dataframe(proto_records, tz),
            _records_to_dataframe(ecn_records, tz),
            _records_to_dataframe(error_records, tz))

def _records_to_dataframe(stats_records, tz=None):
    """Turn a list of counter records into a DataFrame.

    The ``timestamp`` column is converted with ``timestamp_to_datetime``
    (using ``tz``), missing values are replaced by 0 and every float column
    is cast to int64. An empty input gives an empty DataFrame without any
    further processing.
    """
    df = pd.DataFrame.from_records(stats_records)
    if not len(df):
        return df

    df["timestamp"] = timestamp_to_datetime(df["timestamp"].values, timezone=tz)
    df = df.fillna(value=0)

    # Counters missing from some records leave NaN behind, which turns the
    # column into floats; after filling with 0 it is cast back to integers.
    float_columns = [col for col in df.columns if "float" in str(df[col].dtype)]
    for col in float_columns:
        df[col] = df[col].astype("int64")

    return df

def parse_protocol_counters(timestamp, protocol_counters):
    """Flatten nested per-protocol counters into a single record.

    Args:
        timestamp: Stored under the key "timestamp".
        protocol_counters: Mapping protocol -> {counter name: value}.

    Returns:
        Dict with "timestamp" plus one ``<protocol>_<counter>`` key per counter.
    """
    record = {"timestamp": timestamp}
    record.update(
        ("{}_{}".format(protocol, name), value)
        for protocol, per_protocol in protocol_counters.items()
        for name, value in per_protocol.items()
    )
    return record

def parse_ecn_counters(timestamp, ecn_counters):
    """Return a new record holding ``timestamp`` plus all ECN counters.

    ``ecn_counters`` itself is left untouched.
    """
    record = dict(timestamp=timestamp)
    record.update(ecn_counters)
    return record

def parse_error_counters(timestamp, error_counters):
    """Return a new record holding ``timestamp`` plus all error counters.

    ``error_counters`` itself is left untouched.
    """
    record = dict(timestamp=timestamp)
    record.update(error_counters)
    return record

In [13]:
%%time
# Parse the global counters; the timestamp columns are converted to US/Mountain.
proto, ecn, error = parse_global_counters(
    "data/raw/",
    tz="US/Mountain",
)

2024-08-22 17:45:45.033902: Parsed 1440/46522 files, containing 8641 entries
2024-08-22 17:48:44.688689: Parsed 2880/46522 files, containing 17284 entries
2024-08-22 17:51:49.276783: Parsed 4320/46522 files, containing 25927 entries
2024-08-22 17:54:49.044725: Parsed 5760/46522 files, containing 34570 entries
2024-08-22 17:57:52.097774: Parsed 7200/46522 files, containing 43213 entries
2024-08-22 18:01:02.458347: Parsed 8640/46522 files, containing 51856 entries
2024-08-22 18:04:20.532765: Parsed 10080/46522 files, containing 60499 entries
2024-08-22 18:07:30.200843: Parsed 11520/46522 files, containing 69142 entries
2024-08-22 18:10:35.574635: Parsed 12960/46522 files, containing 77784 entries
2024-08-22 18:13:10.416174: Parsed 14400/46522 files, containing 86416 entries
2024-08-22 18:16:17.474311: Parsed 15840/46522 files, containing 95059 entries
2024-08-22 18:19:36.532610: Parsed 17280/46522 files, containing 103701 entries
2024-08-22 18:22:47.463154: Parsed 18720/46522 files, cont

In [14]:
# Store each counter table in its own HDF5 file under the key "data", using
# the highest compression level.
# `key` is passed by keyword: giving it positionally is deprecated in
# pandas 2.x, and it is keyword-only from pandas 3.
proto.to_hdf("data/processed/protocol_counters.hdf5", key="data",
             complevel=9)
ecn.to_hdf("data/processed/ecn_counters.hdf5", key="data",
           complevel=9)
error.to_hdf("data/processed/error_counters.hdf5", key="data",
             complevel=9)

### Merge per ASN with specific time resolution

In [3]:
def parse_asn_stats_per_period(root_folder, asn_map, include_asns=None, exclude_asns=None,
                               time_range=None, aggfreq_s=None, tz=None, lan_subnets=None,
                               specification_kwargs={}, report_freq=1440):
    """Merge subnet reports into one entry per (ASN, time period).

    Args:
        root_folder: Folder searched with ``get_epping_files``.
        asn_map: Mapping ``ip_prefix`` string -> ASN. Prefixes missing from
            the map are assigned the ASN -3.
        include_asns: If given, only reports of these ASNs are kept.
        exclude_asns: If given, reports of these ASNs are dropped.
        time_range: Optional ``(start, end)`` pair; both bounds are converted
            with ``datetime_to_timestamp`` and reports outside the inclusive
            range are dropped.
        aggfreq_s: If given, each timestamp is passed through
            ``int_floor(t, aggfreq_s * 1000000000)`` before being used in the
            key (``int_floor`` comes from ``util``; presumably it rounds down
            to a multiple of the second argument - TODO confirm).
        tz: Accepted but not used by this function.
        lan_subnets: Optional networks (converted with ``netify``) forwarded
            to ``_new_subnet_entry`` for the LAN/WAN labelling.
        specification_kwargs: Keyword arguments forwarded (through
            ``get_epping_json_entries``) to the specification check. Only read
            here, never modified.
        report_freq: Print a progress line every ``report_freq`` files and
            after the last file; values <= 0 disable the progress output.

    Returns:
        Dict mapping ``"<asn>-<timestamp>"`` strings to merged entries.
    """
    allowed_asns = None if include_asns is None else set(include_asns)
    blocked_asns = None if exclude_asns is None else set(exclude_asns)
    if time_range is not None:
        time_range = (datetime_to_timestamp(time_range[0], unit="ns"),
                      datetime_to_timestamp(time_range[1], unit="ns"))
    lan_networks = None if lan_subnets is None else [netify(net) for net in lan_subnets]

    enum_state = {"next_num": 0}
    merged = {}
    entry_count = 0

    epping_files = get_epping_files(root_folder)
    total_files = len(epping_files)

    for file_count, path in enumerate(epping_files, start=1):
        entries = get_epping_json_entries(path, verify_specification=True,
                                          **specification_kwargs)

        # The first entry is the specification; it states the histogram size.
        histogram_bins = entries[0]["bins"]

        for entry in entries:
            if classify_epping_entry_type(entry) != "subnet_stats":
                continue

            asn = asn_map.get(entry["ip_prefix"], -3)
            if allowed_asns is not None and asn not in allowed_asns:
                continue
            if blocked_asns is not None and asn in blocked_asns:
                continue

            period = entry["timestamp"]
            if time_range is not None and (period < time_range[0]
                                           or period > time_range[1]):
                continue

            if aggfreq_s is not None:
                period = int_floor(period, aggfreq_s * 1000000000)

            key = str(asn) + "-" + str(period)
            if key in merged:
                _update_subnet_entry(merged[key], entry)
            else:
                merged[key] = _new_subnet_entry(entry, enum_state,
                                                lan_networks, histogram_bins)

            entry_count += 1

        if report_freq > 0 and (file_count % report_freq == 0
                                or file_count == total_files):
            print("{}: Parsed {}/{} files, containing {} entries".format(
                datetime.datetime.now(), file_count, total_files, entry_count))

    return merged

In [4]:
# Pre-computed list of the 100 ASNs with the most downloaded bytes in our dataset
with open("data/processed/top100asn_list.json", mode="rt") as infile:
    top100asn = json.load(infile)

# Pre-processed map of every /24 subnet in our dataset -> ASN. It is used
# instead of pyasn to speed up the ASN lookups.
with open("data/processed/subnet_to_asn_map.json", mode="rt") as infile:
    asn_map = json.load(infile)

In [5]:
%%time
# Merge the reports of the top-100 ASNs; aggfreq_s=600 means 600 s (10 min) periods.
asn_stats = parse_asn_stats_per_period(
    "data/raw/",
    asn_map,
    include_asns=top100asn,
    aggfreq_s=600,
)

2024-08-27 12:43:54.721041: Parsed 1440/46522 files, containing 9563444 entries
2024-08-27 12:46:14.799966: Parsed 2880/46522 files, containing 18824260 entries
2024-08-27 12:48:50.961759: Parsed 4320/46522 files, containing 28407439 entries
2024-08-27 12:51:28.327399: Parsed 5760/46522 files, containing 37791179 entries
2024-08-27 12:54:14.723432: Parsed 7200/46522 files, containing 47403567 entries
2024-08-27 12:57:11.124007: Parsed 8640/46522 files, containing 57398903 entries
2024-08-27 13:00:12.615331: Parsed 10080/46522 files, containing 67548860 entries
2024-08-27 13:03:12.494616: Parsed 11520/46522 files, containing 77498873 entries
2024-08-27 13:06:08.288552: Parsed 12960/46522 files, containing 87070573 entries
2024-08-27 13:08:35.544540: Parsed 14400/46522 files, containing 95022299 entries
2024-08-27 13:11:40.597842: Parsed 15840/46522 files, containing 104779737 entries
2024-08-27 13:14:52.048099: Parsed 17280/46522 files, containing 114917587 entries
2024-08-27 13:18:00.5

In [6]:
%%time
# Write the merged per-ASN, per-period statistics to disk as JSON.
with open("data/processed/top100_asn_stats_10min.json", mode="wt") as ofile:
    json.dump(asn_stats, fp=ofile)

CPU times: user 44.9 s, sys: 228 ms, total: 45.1 s
Wall time: 45 s
