In [1]:
import sys
from itertools import count
from torch import autograd
import copy

sys.path.append('../')
from models.gcn import *
from utils.datasets import *

In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
data = get_cora(device)

In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.utils import to_dense_adj
from tqdm.auto import tqdm

class GCN(nn.Module):
    def __init__(self, in_feats, h_feats, num_classes):
        super(GCN, self).__init__()
        self.conv1 = GCNConv(in_feats, h_feats)
        self.conv2 = GCNConv(h_feats, num_classes)
        self.optimizer = torch.optim.Adam(self.parameters(), lr=0.01)


    def forward(self, data):
        x, edge_index = data.x, data.edge_index
        h = self.conv1(x, edge_index)
        h = F.relu(h)
        h = self.conv2(h, edge_index)
        return h

    def forward_with_adj(self, x, adj):
        # Use A directly in matrix multiplication
        h = torch.mm(adj, self.conv1(x))  # Dense adjacency for feature propagation
        h = F.relu(h)
        h = torch.mm(adj, self.conv2(h))
        return h

    def train_model(self, data, differentiable=False):
        self.train()
        logits = self(data.cuda())
        loss = F.cross_entropy(logits[data.train_mask], data.y[data.train_mask]) 

        self.optimizer.zero_grad()
        if differentiable:
            grads = torch.autograd.grad(loss, self.parameters(), create_graph=True)
            for param, grad in zip(self.parameters(), grads):
                param.grad = grad
        else:
            loss.backward()
    
        self.optimizer.step()
    
        return loss.item()

    def test(self, data):
        self.eval()
        out = self(data.cuda())
        pred = out.argmax(dim=1)
    
        acc = (pred[data.test_mask] == data.y[data.test_mask]).sum().item() / data.test_mask.sum().item()
        return acc

    def fit(self, data, epochs=200, **kwargs):
        for epoch in tqdm(range(epochs), desc="Training Epochs"):
            loss = self.train_model(data, **kwargs)
            acc = self.test(data)

In [4]:
from torch_geometric.utils import to_dense_adj
import torch

def compute_adj_gradient(model, data):
    # Convert edge_index to a dense adjacency matrix and set requires_grad=True
    A = to_dense_adj(data.edge_index)[0].float().to(data.x.device)
    A.requires_grad_(True)
    
    model.train()
    
    # Forward pass with A used directly in the model
    output = model.forward_with_adj(data.x, A)
    
    # Compute the loss with respect to the training set
    loss = F.cross_entropy(output[data.train_mask], data.y[data.train_mask])
    
    # Use torch.autograd.grad to compute gradients of loss with respect to A, allowing unused
    adj_grad = torch.autograd.grad(loss, A, create_graph=True, retain_graph=True, allow_unused=True)[0]

    # Check if adj_grad is None (i.e., A was not used in a differentiable way)
    if adj_grad is None:
        print("Warning: A was not used in a differentiable way in the model.")
    else:
        adj_grad = adj_grad.clone().detach()  # Clone and detach to avoid further backprop issues

    return adj_grad, A


In [5]:
import numpy as np

def identify_perturbations(adj_grad, A, num_phase_1, num_phase_2):
    # Flatten the gradient matrix for easy sorting
    grad_flat = adj_grad.cpu().numpy().flatten()
    A_flat = A.detach().cpu().numpy().flatten()
    
    # Find indices of edges (1s) and non-edges (0s)
    non_edge_indices = np.where(A_flat == 0)[0]
    edge_indices = np.where(A_flat == 1)[0]
    
    # Sort non-edges by gradient (least impactful first)
    least_impactful_indices = non_edge_indices[np.argsort(grad_flat[non_edge_indices])][:num_phase_1]
    
    # Sort edges by gradient (most impactful last)
    most_impactful_indices = edge_indices[np.argsort(-grad_flat[edge_indices])][:num_phase_2]
    
    return least_impactful_indices, most_impactful_indices


In [6]:
from torch_geometric.utils import to_dense_adj

def apply_perturbations(A, perturb_indices):
    A_flat = A.view(-1)  # Flatten A to apply perturbations
    A_flat[perturb_indices] = 1 - A_flat[perturb_indices]  # Flip edges
    return A.view(A.shape)  # Reshape back to original dimensions

In [7]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Assuming `data` is already loaded and contains `x` (features) and `y` (labels)
in_feats = data.x.shape[1]
num_classes = int(data.y.max().item()) + 1

# Initialize the model
model = GCN(in_feats=in_feats, h_feats=64, num_classes=num_classes).to(device)

In [8]:
# Number of edges to perturb in each phase
num_phase_1 = 200  # Least impactful (Phase 1)
num_phase_2 = 100  # Most impactful (Phase 2)

# Step 3.1: Compute gradients
adj_grad, A_dense = compute_adj_gradient(model, data)

# Step 3.2: Identify edges to perturb
least_impactful_indices, most_impactful_indices = identify_perturbations(adj_grad, A_dense, num_phase_1, num_phase_2)

# Step 3.3: Apply Phase 1 perturbations
A_phase_1 = apply_perturbations(A_dense.clone(), least_impactful_indices)

# Convert A_phase_1 back to edge_index format if needed
data.edge_index = A_phase_1.to_sparse().indices()

# Step 3.4: Evaluate after Phase 1 perturbations
accuracy_phase_1 = model.test(data)
print(f'Accuracy after Phase 1 perturbations: {accuracy_phase_1:.4f}')

# Step 3.5: Apply Phase 2 perturbations
A_phase_2 = apply_perturbations(A_phase_1.clone(), most_impactful_indices)

# Convert A_phase_2 back to edge_index format if needed
data.edge_index = A_phase_2.to_sparse().indices()

# Step 3.6: Evaluate after Phase 2 perturbations
accuracy_phase_2 = model.test(data)
print(f'Accuracy after Phase 2 perturbations: {accuracy_phase_2:.4f}')


TypeError: forward() missing 1 required positional argument: 'edge_index'