# Extract

Load input data

In [None]:
import os
import glob
import pandas as pd

# Name of the host whose measurements are processed; it selects the
# `../../input_data/<host>/` subdirectory the CSV files are read from.
SOURCE_HOST = "tau"


def read_csvs(directory: str) -> pd.DataFrame:
    """Load every ``*.csv`` file found directly in *directory* into one frame.

    Files are visited in sorted path order. Each one is read with its first
    column as the index, which pandas is asked to parse as dates. The
    per-file frames are concatenated and the result is sorted by index.
    """
    pattern = os.path.join(directory, "*.csv")
    frames = []
    for path in sorted(glob.glob(pattern)):
        frames.append(pd.read_csv(path, index_col=0, parse_dates=True))
    combined = pd.concat(frames)
    return combined.sort_index()


# Empirical measurements: counters recorded via systemd IP accounting.
df_emp = read_csvs(f"../../input_data/{SOURCE_HOST}/systemd/ip_accounting/")
# Tracepoint measurements: one record per observed P2P message.
df_tp = read_csvs(f"../../input_data/{SOURCE_HOST}/tracepoints/net/")

# Transform

## Step 1: Discard useless data

Strip unnecessary data:

- Tracepoint data includes `peer_id`, `peer_conn_type`, `peer_addr`, `flow`
(traffic direction, i.e. in- or outbound), `msg_type` and `size`. Of these, only
`flow`, `msg_type` and `size` are retained in their original form. A new `ipv6`
column is introduced to indicate whether a message was sent via IPv4 or IPv6,
since the version affects IP header sizes used for the traffic estimate.

- Systemd IP accounting data includes rows for `IPIngressPackets`,
`IPEgressPackets`, `IPIngressBytes`, and `IPEgressBytes`. Packet data is
discarded and byte data is converted from absolute to relative (i.e. from bytes
since measurement was started to bytes since previous row/sample).

In [None]:
from pandarallel import pandarallel
import numpy as np

# Initialise pandarallel; the row-wise `parallel_apply` that estimates the
# network traffic relies on it.
pandarallel.initialize(progress_bar=True)


# Tracepoint data: drop incomplete rows and derive the `ipv6` flag from the
# peer address. A plain substring test is vectorised by pandas, so no
# per-element Python callback is needed here.
# NOTE(review): presumably IPv6 peers are recorded in bracketed form
# ("[addr]:port") and IPv4 peers are not -- confirm against the collector.
df_tp = df_tp.dropna()
df_tp = df_tp.assign(ipv6=df_tp["peer_addr"].str.contains("[", regex=False))
df_tp = df_tp[["ipv6", "flow", "msg_type", "size"]]

# Systemd IP accounting data: keep only the byte counters and convert them
# from absolute readings to per-sample deltas. The first row has no
# predecessor (its diff is NaN) and is dropped.
df_emp = df_emp.dropna()
df_emp = df_emp[["IPIngressBytes", "IPEgressBytes"]]
# A counter reset makes a reading smaller than its predecessor, which yields
# a negative delta. Clamp those to zero right here, per sample: if they were
# left in, they would be summed into the hourly/daily buckets and silently
# reduce the traffic of every bucket that contains a reset.
df_emp = df_emp.diff().iloc[1:].clip(lower=0)

## Step 2: Estimate TCP/IP traffic from message sizes

TCP/IP traffic is estimated using the following assumptions:
- MTU size is 1500 bytes (common default)
- Bitcoin protocol overhead is 24 bytes (4-byte magic, 12-byte command, 4-byte
  each for payload length and checksum)
- TCP header size of 32 bytes, comprising 20-byte minimum TCP header size plus 10-byte timestamps option (used by default by the Linux kernel to make real-time round-trip measurements) and two padding bytes to align options to 32-bit boundaries
- IPv4 and v6 header sizes of 20 and 40 bytes (default)

The estimate uses the following approach. First, the application-level message
size is computed by adding the Bitcoin P2P message overhead to the message size.
Next, the number of TCP segments is computed by dividing the application-level
size obtained during the previous step by the maximum segment size (which
corresponds to the MTU minus TCP and IP headers) and rounding up. Then, the
total TCP/IP overhead is computed (number of segments times
TCP and IP header overhead). Moreover, the overhead of ACKs is estimated to be
half of the number of segments times the sum of IP and TCP header sizes, since
generally ACKs are sent for every two packets.  Finally, TCP/IP traffic is
estimated by combining the application-level message size with the total TCP/IP
and ACK overhead.

Next, empirical TCP/IP measurements obtained via systemd accounting are combined
with the estimate so the latter can be validated.

In [None]:
import math
from pandarallel import pandarallel

# Initialise pandarallel (with a progress bar) so that `parallel_apply` is
# available for the row-wise traffic estimate computed in this cell.
pandarallel.initialize(progress_bar=True)


def estimate_network_traffic(row):
    """Estimate the TCP/IP bytes caused by a single Bitcoin P2P message.

    ``row`` must support ``row["size"]`` (the message size in bytes) and
    ``row["ipv6"]`` (truthy when the message travelled over IPv6).

    The estimate is the sum of
      * the message size plus the 24-byte Bitcoin P2P header,
      * one IP + TCP header per TCP segment needed to carry it, and
      * ACK traffic, modelled as one IP + TCP header per two segments.

    Returns the estimate as a float (the ACK term may be fractional).
    """
    mtu = 1500
    p2p_header = 24
    tcp_header = 32
    if row["ipv6"]:
        ip_header = 40
    else:
        ip_header = 20
    segments_per_ack = 2

    headers_per_packet = ip_header + tcp_header
    # Payload bytes that fit into one segment once both headers are deducted.
    max_segment_size = mtu - ip_header - tcp_header

    payload = row["size"] + p2p_header
    segment_count = math.ceil(payload / max_segment_size)

    header_bytes = segment_count * headers_per_packet
    ack_bytes = (segment_count / segments_per_ack) * headers_per_packet
    return payload + header_bytes + ack_bytes


# Apply the estimate to every tracepoint row (axis=1) via pandarallel and
# store the result in a new `net_size` column.
df_tp["net_size"] = df_tp.parallel_apply(estimate_network_traffic, axis=1)

## Step 3: Aggregate data (to hourly and daily granularity)

First, the dataframe containing empirical data from systemd's IP accounting is
pivoted so it can be aggregated.

Next, the pivoted df and the tracepoint df are aggregated to produce hourly and
daily data.

In [None]:
# Reshape the empirical data from wide format (one column per direction) to
# long format: one row per (timestamp, flow) pair with the byte count stored
# in `net_size`, so it can be aggregated on `flow` like the tracepoint data.
df_emp = (
    df_emp.rename(columns={"IPIngressBytes": "in", "IPEgressBytes": "out"})[
        ["in", "out"]
    ]
    # Move the column labels ("in"/"out") into an inner index level.
    .stack()
    .rename("net_size")
    .reset_index()
    # The unnamed inner level created by stack() surfaces as "level_1".
    .rename(columns={"level_1": "flow"})
    # Assumes the index loaded from the CSV files is named "timestamp".
    .set_index("timestamp")
)


def agg_sum(df, cols, freq, data="net_size"):
    """Sum column *data* per time bucket of width *freq* and per *cols* value.

    ``df`` must have a datetime index named ``timestamp``. Every index value
    is floored to *freq* (e.g. ``"1h"``), rows are grouped by that bucket
    together with the columns listed in *cols*, and *data* is summed within
    each group. ``df`` itself is left untouched.

    Returns a new dataframe indexed by the bucket start (``timestamp``) with
    the *cols* columns and the summed *data* column.
    """
    # Group on the floored index directly; its name ("timestamp") carries
    # over to the resulting group level.
    bucket = df.index.floor(freq)
    totals = df.groupby([bucket] + cols)[data].sum()
    return totals.reset_index().set_index("timestamp")


# Aggregate both data sets to hourly and daily buckets. The estimate (`est_*`)
# keeps `msg_type` as an additional grouping key; the empirical data (`emp_*`)
# is only split by flow direction.
dfs = {
    "est_hourly": agg_sum(df_tp, ["flow", "msg_type"], freq="1h"),
    "est_daily": agg_sum(df_tp, ["flow", "msg_type"], freq="1d"),
    "emp_hourly": agg_sum(df_emp, ["flow"], freq="1h"),
    "emp_daily": agg_sum(df_emp, ["flow"], freq="1d"),
}

## Step 4: Format aggregated data

Pivot `flow` column of dataframes to get `in` and `out` columns.

In [None]:
def pivot(df, index, columns="flow", values="net_size"):
    """Pivot *df* from long to wide format.

    The distinct entries of the *columns* column become new columns (``in``
    and ``out`` for the default ``flow``), filled with the matching *values*
    entries; *index* lists the columns that identify a row and must include
    ``"timestamp"``.

    Combinations that do not occur in *df* are filled with zero and all
    pivoted value columns are converted to 64-bit integers (fractional
    values are truncated). The cast covers whatever columns the pivot
    produced, so it neither fails when one of ``in``/``out`` is absent nor
    when a column other than ``flow`` is pivoted.

    Returns a dataframe indexed by ``timestamp``; any further *index*
    columns (e.g. ``msg_type``) are kept as ordinary columns.
    """
    wide = (
        df.reset_index()
        .pivot(
            index=index,
            columns=columns,
            values=values,
        )
        .rename_axis(None, axis=1)
        # At this point the frame holds only the pivoted value columns, so
        # filling and casting cannot touch the identifying columns.
        .fillna(0)
        # Explicit width: plain "int" can be 32-bit on some platforms, which
        # is too narrow for byte counts.
        .astype("int64")
    )
    return wide.reset_index().set_index("timestamp")


# Pivot every aggregated frame. The estimate frames (`est_*`) are identified
# by timestamp and message type, the empirical ones by timestamp alone.
dfs = {
    name: pivot(
        frame,
        ["timestamp", "msg_type"] if name.startswith("est_") else ["timestamp"],
    )
    for name, frame in dfs.items()
}

## Step 5: Sanitize data

Whenever the `nix-bitcoin-monitor` systemd service (which performs the data
collection) is restarted, the IP accounting counters are reset to zero. As a
result, `diff()`ing consecutive readings is going to break (think large value
in previous row followed by small value in next row, leading to negative
values). This is addressed by setting values smaller than zero to zero.

In [None]:
# Clamp negative byte counts in the empirical frames to zero; the estimate
# frames are left as they are. The frames are modified in place.
# Note that this works on already aggregated buckets: a negative per-sample
# delta that reaches the aggregation and is outweighed by positive samples in
# the same bucket cannot be detected at this stage.
for name, df in dfs.items():
    if name.startswith("emp_"):
        df[["in", "out"]] = df[["in", "out"]].clip(lower=0)

# Load

Store transformation results

In [None]:
from pathlib import Path

# Output directory for the transformation results (relative to the working
# directory). `exist_ok=True` makes the creation idempotent without a
# separate existence check.
data_dir = Path("data")
data_dir.mkdir(exist_ok=True)

# Write one bz2-compressed CSV per result frame: data/data_<name>.csv.bz2
for name, df in dfs.items():
    df.to_csv(data_dir / f"data_{name}.csv.bz2", compression="bz2")