# Iberian outage analysis

We have access to daily data for 2025

In [None]:
%pip install boto3

import os
import re
import time
import gzip
import ipaddress

import pandas as pd
import numpy as np

import boto3
from botocore.utils import fix_s3_host
from botocore.config import Config

In [None]:
# Load secret environment variables
with open(".env") as f:
    for line in f:
        if line.strip() == '' or line.strip().startswith('#'):
            continue
        key, value = line.strip().split('=', 1)
        os.environ[key] = value

# Set up global constants
DATA_DIR = "data/"

# Protocol and IP version constants
PROTOCOL = "ICMP"  # Currently only ICMP is supported
IP_VERSION = 6     # 4 or 6
ANYCAST = True     # Anycast or Unicast

In [None]:
# Create boto3 resource using environment variables
S3 = boto3.resource(
    's3',
    aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
    aws_secret_access_key=os.environ['AWS_ACCESS_KEY_SECRET'],
    endpoint_url=os.environ['AWS_ENDPOINT_URL'],
    # Change timeouts in case we are uploading large files
    config=Config(
        connect_timeout=3, 
        read_timeout=900, 
        retries={"max_attempts":0}
    )
)

# Unregister to ensure requests don’t go to AWS
S3.meta.client.meta.events.unregister('before-sign.s3', fix_s3_host) # type: ignore[attr-defined]

# Use bucket name from environment
HOME_BUCKET = S3.Bucket(os.environ['AWS_BUCKET_NAME']) # type: ignore[attr-defined]

In [None]:
def download_minio_file(
    year: int, month: int, day: int,
    anycast: bool, protocol: str, ip_version: int
) -> str:
    """
    Download a (Manycast/Unicast, IPv4/IPv6) CSV file from MinIO if not already present locally.
    Prints the result.
    """
    prefix = f"manycast/{year}/{month:02}/{day:02}/"
    proto = f"{protocol}v{ip_version}"
    base_pattern = f"MAnycast_{proto}" if anycast else f"GCD_{proto}"

    for obj in HOME_BUCKET.objects.filter(Prefix=prefix):
        filename = obj.key[len(prefix):]
        # Clean up filename for Windows compatibility
        filename = re.sub(r'[:<>"/\\|?*]', '_', filename)
        filepath = os.path.join(DATA_DIR, filename)

        # Only try files matching the pattern and proper extension
        if filename.startswith(base_pattern) and filename.endswith('.csv.gz'):
            print(f"Found file: {filename} (bucket key: {obj.key})")

            # Check if the file already exists locally
            if os.path.exists(filepath):
                print(f"File {filename} already exists locally. Skipping download.")
            else:
                print(f"Downloading {filename} from bucket...")
                os.makedirs(DATA_DIR, exist_ok=True)
                t0 = time.time()
                HOME_BUCKET.download_file(obj.key, filepath)
                t1 = time.time()
                print(f"Download complete in {t1-t0:.2f}s")

            return filepath

    # If no matching file is found, print a message
    print("No matching file found in bucket for provided criteria.")
    raise FileNotFoundError(
        f"No file found for {base_pattern} on {year}-{month:02}-{day:02} in bucket {HOME_BUCKET.name}"
    )

In [None]:
def extract_comments(filepath: str) -> list[str]:
    """
    Prints the leading comment lines (lines starting with '#') from a gzip-compressed CSV file.
    Returns the comment lines as a list.
    """
    # Raise error if file is missing
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"File not found: '{filepath}'")

    try:
        # Open the gzip file in text mode for reading
        with gzip.open(filepath, 'rt', encoding='utf-8') as f:
            # Collect all leading lines that start with '#'
            comment_lines = []
            for line in f:
                if line.startswith('#'):
                    comment_lines.append(line.rstrip())
                else:
                    # Stop at the first data (non-comment) line
                    return comment_lines

    except gzip.BadGzipFile:
        raise gzip.BadGzipFile(f"Invalid gzip file: '{filepath}'")

    except Exception as e:
        raise RuntimeError(f"Error reading file '{filepath}': {e}")
    return []

In [None]:
def extract_hostname_map(comment_lines: list[str]) -> dict[int, str]:
    """
    Create a mapping of Client ID to hostname from comment lines.
    """
    pattern = r"ID:\s*(\d+)\s*,\s*hostname:\s*([\w-]+)"
    hostname_map = {}

    for line in comment_lines:
        # Look for a line containing 'ID' and 'hostname'
        if match := re.search(pattern, line):
            client_id = int(match.group(1))      # Extract and convert ID
            hostname = match.group(2)            # Extract hostname
            hostname_map[client_id] = hostname   # Fill mapping

    # Return the mapping sorted by worker id
    return dict(sorted(hostname_map.items()))

In [None]:
def process_chunk(
    chunk: pd.DataFrame,
    hostname_map: dict[int, str],
    tx_worker_id: int | None = None
) -> pd.DataFrame:
    """
    Filter by tx_worker_id and process a DataFrame chunk to add hostname.
    """
    # Avoid SettingWithCopyWarning
    chunk = chunk.copy()

    # Filter rows on tx_worker_id if supplied
    if tx_worker_id is not None:
        chunk = chunk[chunk['tx_worker_id'] == tx_worker_id]

    # Map hostnames for sender and receiver
    chunk['receiver'] = chunk['rx_worker_id'].map(hostname_map)
    chunk['sender'] = chunk['tx_worker_id'].map(hostname_map)

    # Convert IP-number to ip network /24
    chunk['target'] = chunk['reply_src_addr'].apply(
        lambda x: ipaddress.ip_network(f"{ipaddress.ip_address(int(x))}/24", strict=False) # type: ignore
    )

    # Calculate RTT in seconds
    chunk['rtt'] = (chunk['rx_time'] - chunk['tx_time']) / 1e6

    # Return only the needed columns
    return chunk[['receiver', 'sender', 'target', 'rtt', 'ttl']]

def csv_to_df(
    filepath: str,
    hostname_map: dict[int, str],
    chunksize: int = 1_000_000,
    tx_worker: str | None = None
) -> pd.DataFrame:
    """
    Load a large, gzipped CSV file in chunks and filter rows by a given tx_worker_id.
    Prints progress and a summary upon completion.

    Returns:
        (DataFrame, read_time_sec, process_time_sec)
    """
    filtered_chunks = []
    chunk_num: int = 0

    # Get tx_worker_id if tx_worker name is supplied
    tx_worker_id = None
    if tx_worker is not None:
        try:
            tx_worker_id = next(k for k, v in hostname_map.items() if v.startswith(tx_worker))
        except StopIteration:
            raise ValueError(f"No tx_worker_id found for tx_worker name starting with: {tx_worker}")

    # Read the CSV file in chunks
    print(f"Reading file: {filepath} in chunks of {chunksize:,} rows...")
    t0 = time.time()
    chunks = pd.read_csv(
        filepath,
        compression='gzip',
        comment='#',
        usecols=['rx_worker_id', 'tx_worker_id', 'reply_src_addr', 'rx_time', 'tx_time', 'ttl'],
        dtype={
            'rx_worker_id': 'uint8',
            'tx_worker_id': 'uint8',
            'reply_src_addr': 'uint32' if IP_VERSION == 4 else 'str', # For IPv4 use unit32, IPv6 uses str
            'rx_time': 'float64',
            'tx_time': 'float64',
            'ttl': 'uint8'
        },
        chunksize=chunksize
    )

    # Process each chunk
    t1 = time.time()
    for chunk in chunks:
        filtered_chunk = process_chunk(chunk, hostname_map, tx_worker_id)
        filtered_chunks.append(filtered_chunk)
        chunk_num += 1

        # Print progress
        print(f"Read {chunk_num} chunks", end='\r')

    # Processing complete
    t2 = time.time()
    print(f"\nProcessing complete! Time taken: {t1 - t0:.2f}s (reading) + {t2 - t1:.2f}s (processing)")
    print(f"Processed {sum(len(c) for c in filtered_chunks):,} entries!")

    return pd.concat(filtered_chunks, ignore_index=True)

def load(
    year: int, month: int, day: int,
    anycast: bool, protocol: str, ip_version: int
) -> pd.DataFrame:
    # Download the file from MinIO if it exists
    filepath = download_minio_file(
        year, month, day,
        anycast, protocol, ip_version
    )
    
    # Get all comment lines found
    comment_lines = extract_comments(filepath)
    
    # Extract the hostname map from the comment lines
    hostname_map = extract_hostname_map(comment_lines)
    
    # Load the CSV file into a DataFrame
    df = csv_to_df(filepath, hostname_map, tx_worker='de-fra')

    # Drop the duplicates based on receiver, sender, and target
    df.drop_duplicates(subset=['receiver', 'sender', 'target'], keep='first', inplace=True)

    # Return the loaded dataframe
    return df


**Some ideas:**
- what is the impact on average hop count (TTL)?
- what is the impact on average RTT?
- which prefixes became unreachable?
- which prefixes shifted catchment?
- where did the prefixes that switched catchment go?

Also consider looking at GCD and filtering on sender == madrid -> how many prefixes are reachable from there?

**Tasks:**
- the ’normal’ situation before the outage, showing which hosts (in what parts of the world) routed to Madrid
- the number, and locations, of hosts that went unresponsive during the outage
- hosts that shifted routing from Madrid to a different anycast site (using e.g., a Sankey diagram), e.g., how many hosts shifted to Paris? How many hosts shifted to Frankfurt? etc..
- the situation after the outage, did routing return to normal? or are changes still visible?
- overall reachability of the Madrid site (using the unicast data) towards the entire IPv4 Internet

In [None]:
%%script false --no-raise-error

# The date of the data we are using
YEAR = 2025
MONTH = 4
DAY = 28

# Load the data
df = load(YEAR, MONTH, DAY, ANYCAST, PROTOCOL, IP_VERSION)

# Print the first 10 rows
df.head(10)

# Sankey diagram of the distribution (Task C)

This code generates a Sankey diagram visualizing the distribution of host routing changes. 
Specifically, it shows how many hosts previously routing through Madrid have shifted their routing to other anycast sites. 
The diagram illustrates the number of hosts that migrated to each destination anycast location (e.g., Paris, Frankfurt, etc.) after the routing change.

In [None]:
import plotly.graph_objects as go
import plotly.colors as plc
from collections import defaultdict
from datetime import date, timedelta

# Function to load from an date range
def load_range(from_date, to_date, *load_args, step=1):
    out = []
    current = from_date
    while current <= to_date:
        df = load(current.year, current.month, current.day, *load_args)
        label = current.strftime('%d-%m')
        out.append((df, label))
        current += timedelta(days=step)
    return out

In [None]:
# Load the data
FROM = date(2025, 4, 28)
TO = date(2025, 5, 2)
dfs_and_labels = load_range(FROM, TO, ANYCAST, PROTOCOL, IP_VERSION, step=1)

In [None]:
# PREPROCESSING

# 0. Unpack DataFrames and labels
print("Unpacking DataFrames and labels...")
dfs, labels = zip(*dfs_and_labels)


# 1. Collect all targets that ever targeted the specified target
target = 'es-mad-manycast'
targets_series_list = []
print(f"Aggregating targets that send to '{target}' from all DataFrames...")
for i, df in enumerate(dfs):
    print(f"  Scanning DataFrame {i+1}/{len(dfs)} for targets targeting '{target}'...")
    if {'target', 'receiver'}.issubset(df.columns):
        targets_series_list.append(df.loc[df['receiver'] == target, 'target'])

print("  Concatenating all target series and extracting unique targets...")
targets = set(pd.concat(targets_series_list, ignore_index=False).unique())
print(f"  Found {len(targets)} unique '{target}'")


# 2. Prepare receiver column names
print("Generating receiver column names for each label...")
receiver_cols = [f"receiver_{label}" for label in labels]


# 3. Filter and rename the columns
print("Filtering and renaming DataFrames...")
dfs_indexed = []
for i, (df, receiver_col) in enumerate(zip(dfs, receiver_cols)):
    print(f"  Processing DataFrame {i+1}/{len(dfs)}: filtering rows and renaming column...")
    # Filter to madrid_targets and select ['target', 'receiver']
    filtered = df.loc[df['target'].isin(targets), ['target', 'receiver']]
    # Rename 'receiver' to unique column for this stage
    renamed = filtered.rename(columns={'receiver': receiver_col})
    # Drop duplicate targets, keep first occurrence only
    deduped = renamed.drop_duplicates('target', keep='first')
    # Set index for fast merging
    dfs_indexed.append(deduped.set_index('target'))
print("  All DataFrames filtered and columns renamed")


# 4. Merge all at once via concat/outer join
print("Merging all DataFrames on 'target' (outer join)...")
df_seq = pd.concat(dfs_indexed, axis=1, sort=False).reset_index()
df_seq = df_seq[['target'] + receiver_cols]
print("Merge completed. Final DataFrame shape:", df_seq.shape)

In [None]:
min_count = 400
n_stages = len(receiver_cols)

# 1. Fast: Count receiver occurrences per stage (incl. unresponsive)
print("1. Counting receiver occurrences per stage (incl. unresponsive)...")
stage_receiver_counts = []
for stage_idx, receiver_col in enumerate(receiver_cols):
    counts = df_seq[receiver_col].fillna('unresponsive').astype(str).value_counts().to_dict()
    stage_receiver_counts.append(defaultdict(int, counts))
    print(f"    Stage {stage_idx+1}/{n_stages}: {sum(counts.values())} non-null entries.")

# 2. Identify insignificant receivers (below threshold, not 'unresponsive')
print("2. Identifying insignificant receivers per stage...")
insignificant_receivers_by_stage = [
    {rcv for rcv, cnt in receiver_counts.items() if cnt < min_count and rcv != 'unresponsive'}
    for receiver_counts in stage_receiver_counts
]
for stage_idx, insign_set in enumerate(insignificant_receivers_by_stage):
    print(f"    Stage {stage_idx+1}: {len(insign_set)} insignificant receivers.")

def get_label(val, insignificant_set):
    """Map raw value to 'unresponsive', 'others', or stringified value."""
    if pd.isna(val):
        return 'unresponsive'
    if (val_str := str(val)) in insignificant_set:
        return 'others'
    return '-'.join(val_str.split('-')[:-1])

# 3. Build node sets per stage, mapping 'others' and 'unresponsive'
print("3. Building node sets per stage (with collapsed labels)...")
stage_nodes = []
for stage_idx, receiver_col in enumerate(receiver_cols):
    print(f"    Stage {stage_idx+1}/{n_stages}...")
    vals = df_seq[receiver_col].values
    node_set = set(get_label(v, insignificant_receivers_by_stage[stage_idx]) for v in vals)
    stage_nodes.append(node_set)

# 4. Fast: Count transitions using itertuples (by index)
print("4. Counting transitions between stages (collapsed labels, fast tuple index)...")
transition_counts = [defaultdict(int) for _ in range(n_stages - 1)]
col_idx = [df_seq.columns.get_loc(col) for col in receiver_cols]
n_rows = df_seq.shape[0]
log_every = max(1, n_rows // 10)
for i, row in enumerate(df_seq.itertuples(index=False, name=None)):
    if (i % log_every == 0) or (i == n_rows - 1):
        print(f"    Processed {i+1}/{n_rows} rows...", end="\r" if i != n_rows-1 else "\n", flush=True)
    for stage in range(n_stages - 1):
        src_val = row[col_idx[stage]]
        dst_val = row[col_idx[stage + 1]]
        src_label = get_label(src_val, insignificant_receivers_by_stage[stage])
        dst_label = get_label(dst_val, insignificant_receivers_by_stage[stage+1])
        transition_counts[stage][(src_label, dst_label)] += 1

# 5. Assign node indices and build node list (unchanged except for logging)
print("5. Assigning node indices and building node list...")
nodes = []
node_to_index = {}
for stage_idx, node_labels in enumerate(stage_nodes):
    print(f"    Stage {stage_idx+1}/{n_stages}: {len(node_labels)} nodes.")
    for label in sorted(node_labels):
        node_to_index[(label, stage_idx)] = len(nodes)
        nodes.append((label, stage_idx))

# 6. Build Sankey links
print("6. Building Sankey links...")
links = []
for stage, stage_counts in enumerate(transition_counts):
    print(f"    Stage {stage+1}: {len(stage_counts)} transitions.")
    for (src, dst), count in stage_counts.items():
        src_idx = node_to_index[(src, stage)]
        dst_idx = node_to_index[(dst, stage+1)]
        links.append((src_idx, dst_idx, count))

# 7. Make node labels and values for Sankey
print("7. Assigning node labels and values for Sankey diagram...")
node_labels = [lbl for lbl, _ in nodes]
flow_in = defaultdict(int)
flow_out = defaultdict(int)
for src_idx, dst_idx, val in links:
    flow_out[src_idx] += val
    flow_in[dst_idx] += val
node_values = [flow_out[i] if stage == 0 else flow_in[i] for i, (_, stage) in enumerate(nodes)]
node_labels_with_values = [f"{label}\n({node_values[idx]})" for idx, label in enumerate(node_labels)]

# 8. Prepare Sankey lists
print("8. Preparing Sankey diagram lists (sources, targets, values)...")
source_indices = [src for src, _, _ in links]
target_indices = [dst for _, dst, _ in links]
flow_values =    [val for _, _, val in links]

In [None]:
# PLOT AND SAVE

# Assign each target / label one fixed color over all stages
receiver_color_map = {
    'es-mad':       '#FFA500',
    'unresponsive': '#D62728',
    'others':       '#CCCCCC'
}
palette = plc.qualitative.Plotly

# Step 2: Ensure all receivers have a color
all_receiver_labels = set(label for (label, stage) in nodes)
for rcv in all_receiver_labels:
    if rcv not in receiver_color_map:
        receiver_color_map[rcv] = palette[hash(rcv) % len(palette)]

# Step 3: Create list of colors for nodes and links
node_colors = [receiver_color_map[label] for (label, stage) in nodes]
link_colors = [receiver_color_map[nodes[src][0]] for src, dst, val in links]

# Create and configure the Sankey diagram
fig = go.Figure(data=[go.Sankey(
    node=dict(
        pad=15,
        thickness=20,
        line=dict(color="black", width=0.5),
        label=node_labels_with_values,
        color=node_colors
    ),
    link=dict(
        source=source_indices,
        target=target_indices,
        value=flow_values,
        #color=link_colors
    )
)])

# Annotate each stage with its corresponding date
fig.update_layout(
    title={
        "text": f"Receiver transitions from {FROM.strftime('%d. %B')} to {TO.strftime('%d. %B %Y')} (≥{min_count})",
        "x": 0.5,
        "y": 0.95,
        "xanchor": "center",
        "yanchor": "top",
        "font": dict(size=22)
    },
    font_size=12,
    annotations=[
        dict(
            x=i / (len(labels) - 1),
            y=1.04,
            text=f"<b>{label}</b>",
            showarrow=False,
            font=dict(size=16)
        )
        for i, label in enumerate(labels)
    ]
)

# Export the interactive diagram to an HTML file
fig.write_html(f"sankey-{labels[0]}-{labels[-1]}-h{min_count}-v{IP_VERSION}.html")
fig.show()

In [None]:
%%script false --no-raise-error
import json

per_day = defaultdict(list)
for i in range(len(source_indices)):
    # Get the indices
    from_idx = source_indices[i]
    to_idx = target_indices[i]
    # Get the data
    from_receiver, from_stage = nodes[from_idx]
    to_receiver, to_stage = nodes[to_idx]
    
    # Build the per day entry
    per_day[from_stage].append({
        "from": from_receiver,
        "to": to_receiver,
        "value": flow_values[i]
    })

# Write to file
with open(f"{labels[0]}-{labels[-1]}-h{min_count}-v{IP_VERSION}.json", "w") as f:
    json.dump(dict(per_day), f, indent=2)