In [1]:
import torch, numpy as np, random
from sklearn.model_selection import GroupKFold
from copy import deepcopy
from burst_utils import build_data_list, GCN, run_epoch, FEAT_NAMES

SEED = 123
torch.manual_seed(SEED); np.random.seed(SEED); random.seed(SEED)

# assumes you have: build_data_list(window, gap, mask_shift) → List[Data]
# and: run_epoch, GCN, FEAT_NAMES, clean_and_scale_data already defined


  return torch._C._show_config()


In [2]:
# 1. Load a single (w,g,m) configuration
data_list = build_data_list(window=5, gap=1000, mask_shift=0)

  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()
  return torch._C._show_config()


Dropped 0 bursts (0 graphs) with NaNs
Scaled data saved to .tmp_scaled.pt


In [7]:
# 2. Shuffle-label probe
from torch_geometric.loader import DataLoader
def shuffle_label_probe(data_list):
    data_shuffled = deepcopy(data_list)
    labels = [d.y.item() for d in data_shuffled]
    random.shuffle(labels)
    for d, y in zip(data_shuffled, labels):
        d.y = torch.tensor([y])

    # simple 80/20 split (no groups needed because leakage is what we test)
    split = int(0.8*len(data_shuffled))
    tr, te = data_shuffled[:split], data_shuffled[split:]

    dl_k = dict(batch_size=32, pin_memory=True, num_workers=0, shuffle=True)
    tr_ld, te_ld = DataLoader(tr, **dl_k), DataLoader(te, **dl_k | {"shuffle":False})

    model = GCN(in_channels=len(FEAT_NAMES), hidden=64, num_classes=2)
    model = model.to(device := torch.device('cpu'))
    opt   = torch.optim.Adam(model.parameters(), lr=0.01)
    crit  = torch.nn.CrossEntropyLoss()

    for epoch in range(20):
        run_epoch(model, tr_ld, crit, device, train=True, opt=opt)

    _, acc, _, _, _ = run_epoch(model, te_ld, crit, device, compute_prf=False)
    print(f"Accuracy with shuffled labels ≈ {acc:.3f} (should be ~0.5)")

shuffle_label_probe(data_list)

Accuracy with shuffled labels ≈ 0.493 (should be ~0.5)


In [8]:

# 3. Leave-one-burst-out cross-validation
def leave_one_burst_out(data_list, k=5):
    groups = np.array([d.burst_id for d in data_list])
    gkf    = GroupKFold(n_splits=k)
    f1_scores = []

    for fold, (tr_i, te_i) in enumerate(gkf.split(np.zeros(len(data_list)), groups=groups)):
        tr = [data_list[i] for i in tr_i]
        te = [data_list[i] for i in te_i]

        dl_k = dict(batch_size=32, pin_memory=True, num_workers=0)
        tr_ld = DataLoader(tr, shuffle=True, **dl_k)
        te_ld = DataLoader(te, **dl_k)

        model = GCN(len(FEAT_NAMES), 64, 2).to('cpu')
        opt   = torch.optim.Adam(model.parameters(), lr=0.01)
        crit  = torch.nn.CrossEntropyLoss()

        for _ in range(30):
            run_epoch(model, tr_ld, crit, 'cpu', train=True, opt=opt)
        _, _, prec, rec, f1 = run_epoch(model, te_ld, crit, 'cpu', compute_prf=True)
        print(f"Fold {fold+1}: F1 = {f1:.3f}")
        f1_scores.append(f1)

    print(f"\nMean ± SD F1 across bursts: {np.mean(f1_scores):.3f} ± {np.std(f1_scores):.3f}")

leave_one_burst_out(data_list, k=5)

Fold 1: F1 = 0.996
Fold 2: F1 = 0.997
Fold 3: F1 = 1.000
Fold 4: F1 = 0.998
Fold 5: F1 = 1.000

Mean ± SD F1 across bursts: 0.998 ± 0.001


In [4]:
import copy, torch
def shuffle_node_ids(graphs):
    gs = []
    for d in graphs:
        idx = torch.randperm(d.x.size(0))
        d2  = copy.deepcopy(d)
        d2.x = d.x[idx]             # permute rows
        # remap edge_index
        mapping = {old.item(): new for new, old in enumerate(idx)}
        d2.edge_index = d.edge_index.clone()
        d2.edge_index[0] = d2.edge_index[0].apply_(lambda i: mapping[i])
        d2.edge_index[1] = d2.edge_index[1].apply_(lambda i: mapping[i])
        gs.append(d2)
    return gs

shuffled = shuffle_node_ids(data_list)


In [5]:
import numpy as np, copy
means = np.vstack([d.x.numpy() for d in data_list]).mean(axis=0)

def constant_features(graphs):
    gs = []
    for d in graphs:
        d2 = copy.deepcopy(d)
        d2.x[:] = torch.tensor(means, dtype=d2.x.dtype)
        gs.append(d2)
    return gs

const_feats = constant_features(data_list)

In [7]:

from torch_geometric.loader import DataLoader
device = torch.device('cpu')  # or 'cuda' if you have a GPU
def evaluate_variant(graphs, label):
    split = int(0.8*len(graphs))
    tr, te = graphs[:split], graphs[split:]
    dl_k = dict(batch_size=32, shuffle=True)
    tr_ld = DataLoader(tr, **dl_k)
    te_ld = DataLoader(te, batch_size=32)

    model = GCN(len(FEAT_NAMES), 64, 2).to(device)
    opt   = torch.optim.Adam(model.parameters(), lr=0.01)
    crit  = torch.nn.CrossEntropyLoss()

    for _ in range(30):
        run_epoch(model, tr_ld, crit, device, train=True, opt=opt)

    _, acc, _, _, f1 = run_epoch(model, te_ld, crit, device, compute_prf=True)
    print(f"{label:15s}: acc = {acc:.3f}  F1 = {f1:.3f}")

evaluate_variant(shuffled,   "ID-shuffled")
evaluate_variant(const_feats, "Const-feature")


ID-shuffled    : acc = 0.813  F1 = 0.843
Const-feature  : acc = 0.500  F1 = 0.667


In [None]:
def randomise_edges(graphs):
    import copy, torch
    gs = []
    for d in graphs:
        num_nodes = d.x.size(0)
        num_edges = d.edge_index.size(1)
        rand_src  = torch.randint(0, num_nodes, (num_edges,))
        rand_dst  = torch.randint(0, num_nodes, (num_edges,))
        d2 = copy.deepcopy(d)
        d2.edge_index = torch.stack([rand_src, rand_dst], dim=0)
        gs.append(d2)
    return gs

rand_edges = randomise_edges(data_list)
for i in range(len(rand_edges)):
    rand_edges[i].edge_index = torch.empty((2,0),dtype=torch.long)
evaluate_variant(rand_edges, "Rand-edges")


Rand-edges     : acc = 0.733  F1 = 0.788


In [12]:
# Rand edges with empty edge_index
def randomise_edges(graphs):
    import copy, torch
    gs = []
    for d in graphs:
        num_nodes = d.x.size(0)
        num_edges = d.edge_index.size(1)
        rand_src  = torch.randint(0, num_nodes, (num_edges,))
        rand_dst  = torch.randint(0, num_nodes, (num_edges,))
        d2 = copy.deepcopy(d)
        d2.edge_index = torch.stack([rand_src, rand_dst], dim=0)
        gs.append(d2)
    return gs

rand_edges = randomise_edges(data_list)
for i in range(len(rand_edges)):
    rand_edges[i].edge_index = torch.empty((2,0),dtype=torch.long)
evaluate_variant(rand_edges, "Rand-edges (empty edges)")

Rand-edges (empty edges): acc = 0.537  F1 = 0.682


In [None]:
# ------------------------------------------------------------
# 1. Degree-preserving edge shuffle
# ------------------------------------------------------------
def degree_preserving_shuffle(d):
    G = nx.Graph()
    G.add_nodes_from(range(d.x.size(0)))
    G.add_edges_from(d.edge_index.t().tolist())

    # Maslov–Sneppen edge swap — skip if graph too small
    try:
        nx.double_edge_swap(
            G,
            nswap=max(1, 2 * G.number_of_edges()),
            max_tries=100 * G.number_of_edges(),
            seed=random.randint(0, 1_000_000),
        )
    except nx.NetworkXError:
        pass  # leave tiny graphs unchanged

    d2 = copy.deepcopy(d)
    edges = list(G.edges())
    # make directed / undirected match original
    if not d.is_directed():
        edges = edges + [(v, u) for u, v in edges]
    d2.edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous()
    return d2


rand_deg = [degree_preserving_shuffle(g) for g in data_list]
evaluate_variant(rand_deg, "Deg-preserved")     # uses the earlier GNN evaluate_variant



In [11]:
from sklearn.metrics import precision_recall_fscore_support
import copy, torch, networkx as nx, random
from torch_geometric.loader import DataLoader
from torch_geometric.utils import to_undirected
from torch_geometric.data import Batch
from torch.utils.data import TensorDataset, DataLoader as DL

# ------------------------------------------------------------
# 2. Pure-feature MLP baseline
#    → need a *different* loader & run loop
# ------------------------------------------------------------
class BaselineMLP(torch.nn.Module):
    def __init__(self, in_dim):
        super().__init__()
        self.net = torch.nn.Sequential(
            torch.nn.Linear(in_dim, 64), torch.nn.ReLU(),
            torch.nn.Linear(64, 32),     torch.nn.ReLU(),
            torch.nn.Linear(32, 2)
        )
    def forward(self, x):
        return self.net(x)


def pooled_loader(graphs, shuffle):
    """Return a DataLoader whose batches are flat [num_feat] tensors."""
    X = torch.stack([g.x.mean(dim=0) for g in graphs])
    y = torch.cat([g.y for g in graphs])
    ds = TensorDataset(X, y)
    return DL(ds, batch_size=32, shuffle=shuffle)


def evaluate_variant_mlp(graphs, label):
    split = int(0.8 * len(graphs))
    tr, te = graphs[:split], graphs[split:]

    tr_ld = pooled_loader(tr, shuffle=True)
    te_ld = pooled_loader(te, shuffle=False)

    model = BaselineMLP(len(FEAT_NAMES)).to(device)
    opt   = torch.optim.Adam(model.parameters(), lr=0.01)
    crit  = torch.nn.CrossEntropyLoss()

    # simple epoch loop
    for _ in range(30):
        model.train()
        for X, y in tr_ld:
            X, y = X.to(device), y.to(device)
            opt.zero_grad()
            out = model(X)
            loss = crit(out, y)
            loss.backward(); opt.step()

    # evaluation
    model.eval(); corr, all_y, all_p = 0, [], []
    with torch.no_grad():
        for X, y in te_ld:
            X, y = X.to(device), y.to(device)
            out  = model(X)
            pred = out.argmax(1)
            corr += (pred == y).sum().item()
            all_y.extend(y.cpu().numpy())
            all_p.extend(pred.cpu().numpy())

    acc = corr / len(te_ld.dataset)
    _, _, f1, _ = precision_recall_fscore_support(
        all_y, all_p, average='binary', zero_division=0)
    print(f"{label:15s}: acc = {acc:.3f}  F1 = {f1:.3f}")


evaluate_variant_mlp(data_list, "Pure-feature MLP")


Pure-feature MLP: acc = 0.607  F1 = 0.718
