<a href="https://colab.research.google.com/github/mahmud-sayed-alamin/GNN-Water-mark/blob/main/GNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.datasets import Planetoid
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data

In [28]:
# Load the Cora dataset through PyG's Planetoid loader, using /tmp/Cora as its root directory.
dataset = Planetoid(root='/tmp/Cora', name='Cora')
# Take the first entry of the dataset; every later cell reads the graph from `data`
# (its .x, .edge_index, .y and the train/val/test masks).
data = dataset[0]


In [29]:
# Seed PyTorch's global RNG before the model is created so runs are repeatable.
torch.manual_seed(42)

<torch._C.Generator at 0x7922d73adfd0>

In [30]:
class GCN(torch.nn.Module):
    """Two-layer graph convolutional network for node classification.

    The forward pass is ``GCNConv -> ReLU -> dropout -> GCNConv -> log_softmax``.

    Args:
        in_channels: Size of each input node feature vector.
        hidden_channels: Width of the hidden representation between the two layers.
        out_channels: Number of output columns (one per class).
        dropout: Probability used by the dropout applied to the hidden
            representation while the module is in training mode. Defaults to
            0.05, the value previously hard-coded in ``forward``.
    """

    def __init__(self, in_channels, hidden_channels, out_channels, dropout=0.05):
        super().__init__()
        # Two stacked graph convolutions: input -> hidden -> class scores.
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, out_channels)
        self.dropout = dropout

    def forward(self, x, edge_index):
        """Return per-node log-probabilities (log-softmax over dim 1).

        Args:
            x: Node feature matrix passed to the first convolution.
            edge_index: Graph connectivity passed to both convolutions.
        """
        x = self.conv1(x, edge_index)
        x = F.relu(x)
        # Dropout is only active in training mode (self.training is True).
        x = F.dropout(x, p=self.dropout, training=self.training)

        x = self.conv2(x, edge_index)
        return F.log_softmax(x, dim=1)

In [31]:
# Build the classifier with its input/output sizes read from the loaded dataset
# instead of hard-coded literals, so this cell stays correct if a different
# dataset is loaded above. For Cora these are 1433 features and 7 classes.
model = GCN(
    in_channels=dataset.num_node_features,
    hidden_channels=32,
    out_channels=dataset.num_classes,
)

In [32]:
# Smoke-test one forward pass over the whole graph and print the output shape
# (rows follow data.x, columns follow the model's out_channels).
out = model(data.x, data.edge_index)
print(out.shape)

torch.Size([2708, 7])


In [33]:
import torch.nn.functional as F

def compute_loss(output, y, mask):
    """Negative log-likelihood loss restricted to the rows selected by ``mask``.

    Args:
        output: Per-node log-probabilities (as consumed by ``F.nll_loss``).
        y: Per-node target class indices.
        mask: Index/boolean mask applied to both ``output`` and ``y``.

    Returns:
        The loss tensor produced by ``F.nll_loss`` on the selected rows.
    """
    selected_log_probs = output[mask]
    selected_targets = y[mask]
    return F.nll_loss(selected_log_probs, selected_targets)
# Adam over every parameter of the global `model`; shared by the training functions below.
# NOTE(review): weight_decay=5e-2 is a strong L2 penalty and the logged training loss
# plateaus around 0.72 - confirm this value is intended (5e-4 is a common choice).
optimizer = torch.optim.Adam(
    params=model.parameters(),
    lr=0.01,
    weight_decay=5e-2,
)

In [34]:
def train(data):
    """Run a single full-graph optimisation step on the global ``model``.

    Args:
        data: Graph object providing ``x``, ``edge_index``, ``y`` and ``train_mask``.

    Returns:
        The training loss for this step as a Python float.
    """
    # Training mode enables dropout inside the model.
    model.train()
    optimizer.zero_grad()

    log_probs = model(data.x, data.edge_index)
    # Only nodes selected by train_mask contribute to the loss.
    train_loss = compute_loss(log_probs, data.y, data.train_mask)

    train_loss.backward()
    optimizer.step()
    return train_loss.item()

In [35]:
def test(data):
    model.eval()
    output = model(data.x, data.edge_index)
    pred = output.argmax(dim=1)
    accs = []
    for mask in [data.train_mask, data.val_mask, data.test_mask]:
        correct = pred[mask].eq(data.y[mask]).sum().item()
        acc = correct / mask.sum().item()
        accs.append(acc)
    return accs

In [36]:
def evaluate(data):
    """Score the global ``model`` on each data split without tracking gradients.

    Args:
        data: Graph object providing ``x``, ``edge_index``, ``y`` and the
            ``train_mask`` / ``val_mask`` / ``test_mask`` boolean masks.

    Returns:
        A dict with keys ``'Train'``, ``'Validation'`` and ``'Test'`` mapping to
        the fraction of correctly classified nodes in that split.
    """
    model.eval()

    with torch.no_grad():
        log_probs = model(data.x, data.edge_index)

    # Predicted class = column with the highest score for each node.
    predicted = log_probs.argmax(dim=1)

    def split_accuracy(mask):
        # Correct predictions among the masked nodes, divided by the mask size.
        hits = (predicted[mask] == data.y[mask]).sum().item()
        return hits / mask.sum().item()

    return {
        'Train': split_accuracy(data.train_mask),
        'Validation': split_accuracy(data.val_mask),
        'Test': split_accuracy(data.test_mask),
    }


In [16]:
# Per-epoch history of the loss and the train/validation accuracy.
loss_list, train_acc_list, val_acc_list = [], [], []

for epoch in range(200):
    # One optimisation step, then score the updated model on every split.
    loss = train(data)
    accuracies = evaluate(data)

    loss_list.append(loss)
    train_acc_list.append(accuracies['Train'])
    val_acc_list.append(accuracies['Validation'])

    # Only report every tenth epoch to keep the log short.
    if epoch % 10 != 0:
        continue
    print(f"Epoch {epoch:03d}, Loss: {loss:.4f}, "
          f"Train Acc: {accuracies['Train']:.4f}, "
          f"Val Acc: {accuracies['Validation']:.4f}, "
          f"Test Acc: {accuracies['Test']:.4f}")



Epoch 000, Loss: 0.7225, Train Acc: 0.9786, Val Acc: 0.7840, Test Acc: 0.8120
Epoch 010, Loss: 0.7239, Train Acc: 0.9714, Val Acc: 0.7800, Test Acc: 0.8110
Epoch 020, Loss: 0.7175, Train Acc: 0.9786, Val Acc: 0.7840, Test Acc: 0.8090
Epoch 030, Loss: 0.7248, Train Acc: 0.9786, Val Acc: 0.7800, Test Acc: 0.8080
Epoch 040, Loss: 0.7262, Train Acc: 0.9786, Val Acc: 0.7820, Test Acc: 0.8100
Epoch 050, Loss: 0.7210, Train Acc: 0.9786, Val Acc: 0.7820, Test Acc: 0.8120
Epoch 060, Loss: 0.7233, Train Acc: 0.9786, Val Acc: 0.7840, Test Acc: 0.8120
Epoch 070, Loss: 0.7254, Train Acc: 0.9714, Val Acc: 0.7860, Test Acc: 0.8140
Epoch 080, Loss: 0.7227, Train Acc: 0.9857, Val Acc: 0.7840, Test Acc: 0.8110
Epoch 090, Loss: 0.7350, Train Acc: 0.9786, Val Acc: 0.7820, Test Acc: 0.8160
Epoch 100, Loss: 0.7212, Train Acc: 0.9714, Val Acc: 0.7840, Test Acc: 0.8120
Epoch 110, Loss: 0.7295, Train Acc: 0.9857, Val Acc: 0.7860, Test Acc: 0.8110
Epoch 120, Loss: 0.7311, Train Acc: 0.9857, Val Acc: 0.7780, Tes

In [24]:
def select_high_prob_nodes(data, model, target_label, num_trigger_nodes):
    """Pick the nodes the model is most confident belong to ``target_label``.

    Args:
        data: Graph object providing ``x`` and ``edge_index``.
        model: Module called as ``model(x, edge_index)``; its output is treated
            as per-node log-probabilities. It is switched to eval mode.
        target_label: Column index of the class of interest.
        num_trigger_nodes: Maximum number of node indices to return.

    Returns:
        A 1-D tensor of node indices ordered by descending probability of
        ``target_label``, truncated to ``num_trigger_nodes`` entries.
    """
    model.eval()
    with torch.no_grad():
        log_probs = model(data.x, data.edge_index)

    # exp() turns the target column's log-probabilities back into probabilities.
    confidence = torch.exp(log_probs[:, target_label])
    ranking = confidence.argsort(descending=True)
    return ranking[:num_trigger_nodes]

In [18]:
def generate_trigger_graph(data, model, num_trigger_nodes=10, target_label=0):
    """Append trigger nodes, cloned from nodes predicted as ``target_label``, to the graph.

    Up to ``num_trigger_nodes`` nodes that ``model`` predicts as ``target_label``
    are selected (in index order), and a copy of each one's feature row is
    appended as a new node. Each new node gets a single self-loop, i.e. the
    trigger block has an identity adjacency and forms a block-diagonal
    (disjoint) extension of the original graph.

    NOTE(review): the trigger nodes are not linked to the nodes they were
    copied from; confirm whether such links are wanted.

    Args:
        data: Graph object providing ``x`` (node features) and ``edge_index``
            (``[2, E]`` index tensor, as passed to the model).
        model: Module called as ``model(x, edge_index)``; switched to eval mode.
        num_trigger_nodes: Upper bound on the number of trigger nodes created.
            Fewer are created when fewer nodes are predicted as ``target_label``.
        target_label: Class whose predicted members are cloned.

    Returns:
        ``(new_features, new_adj)`` where ``new_features`` is ``data.x`` with the
        ``k`` trigger rows appended and ``new_adj`` is an edge index of shape
        ``[2, E + k]``: the original edges followed by the trigger self-loops.
    """
    model.eval()
    with torch.no_grad():
        output = model(data.x, data.edge_index)
        predictions = output.argmax(dim=1)

    target_nodes = torch.where(predictions == target_label)[0][:num_trigger_nodes]

    # Trigger node features are copies of the selected nodes' features.
    trigger_features = data.x[target_nodes].clone()
    new_features = torch.cat([data.x, trigger_features], dim=0)

    # Trigger nodes take the indices right after the existing nodes. Size by
    # the number of nodes actually found, which may be below num_trigger_nodes.
    num_existing = data.x.size(0)
    trigger_ids = torch.arange(
        num_existing,
        num_existing + target_nodes.numel(),
        dtype=data.edge_index.dtype,
        device=data.edge_index.device,
    )
    # edge_index is a list of (source, target) index pairs, not a dense matrix,
    # so the identity block is expressed as one (i, i) pair per trigger node.
    trigger_edges = torch.stack([trigger_ids, trigger_ids], dim=0)
    new_adj = torch.cat([data.edge_index, trigger_edges], dim=1)

    return new_features, new_adj

In [19]:
def train_with_watermark(data, trigger_graph, target_label):
    """Run one optimisation step on the clean graph plus the trigger graph.

    The trigger features/edges are appended to the clean graph, the global
    ``model`` is run on the combined graph, and the summed loss (clean training
    nodes + trigger nodes pushed towards ``target_label``) is minimised with
    the global ``optimizer``.

    Args:
        data: Graph object providing ``x``, ``edge_index``, ``y`` and ``train_mask``.
        trigger_graph: Mapping with keys ``'features'`` (rows appended to
            ``data.x``), ``'edges'`` (columns appended to ``data.edge_index``)
            and ``'nodes'`` (index into the combined output selecting the
            trigger rows).
            NOTE(review): ``'edges'`` and ``'nodes'`` are assumed to already use
            combined-graph indices (offset by the number of original nodes) -
            confirm against the code that builds ``trigger_graph``.
        target_label: Class the trigger nodes should be classified as. Either a
            scalar (int or 0-dim tensor), applied to every trigger node, or a
            tensor with one label per trigger node.

    Returns:
        The combined loss for this step as a Python float.
    """
    model.train()
    optimizer.zero_grad()

    x_combined = torch.cat([data.x, trigger_graph['features']], dim=0)
    edge_index_combined = torch.cat([data.edge_index, trigger_graph['edges']], dim=1)

    output = model(x_combined, edge_index_combined)

    # train_mask has one entry per ORIGINAL node, while output also has rows for
    # the appended trigger nodes; slice to the original rows before masking.
    num_original = data.x.size(0)
    loss = F.nll_loss(output[:num_original][data.train_mask], data.y[data.train_mask])

    # nll_loss needs a target tensor with one class index per selected row,
    # so a scalar label is broadcast to the number of trigger rows.
    trigger_output = output[trigger_graph['nodes']]
    trigger_target = torch.as_tensor(target_label, dtype=torch.long, device=output.device)
    if trigger_target.dim() == 0:
        trigger_target = trigger_target.expand(trigger_output.size(0)).contiguous()
    trigger_loss = F.nll_loss(trigger_output, trigger_target)

    total_loss = loss + trigger_loss

    # Backward pass
    total_loss.backward()
    optimizer.step()

    return total_loss.item()



In [25]:
def evaluate_fidelity(data):
    """Measure accuracy of the global ``model`` on the clean graph, per split.

    Args:
        data: Graph object providing ``x``, ``edge_index``, ``y`` and the
            ``train_mask`` / ``val_mask`` / ``test_mask`` boolean masks.

    Returns:
        A dict with keys ``'Train'``, ``'Validation'`` and ``'Test'`` mapping to
        the fraction of correctly classified nodes in that split.
    """
    # Evaluation mode switches dropout off.
    model.eval()

    # Clean-graph forward pass; no gradients are needed for scoring.
    with torch.no_grad():
        log_probs = model(data.x, data.edge_index)

    splits = {
        'Train': data.train_mask,
        'Validation': data.val_mask,
        'Test': data.test_mask,
    }

    accuracies = {}
    for split_name, split_mask in splits.items():
        predicted = log_probs[split_mask].argmax(dim=1)
        n_correct = (predicted == data.y[split_mask]).sum().item()
        accuracies[split_name] = n_correct / split_mask.sum().item()

    return accuracies

# Score the current model on the clean graph and print one line per split.
fidelity_results = evaluate_fidelity(data)
print(
    "Fidelity Results:",
    f"Train Accuracy: {fidelity_results['Train']:.4f}",
    f"Validation Accuracy: {fidelity_results['Validation']:.4f}",
    f"Test Accuracy: {fidelity_results['Test']:.4f}",
    sep="\n",
)

Fidelity Results:
Train Accuracy: 0.9857
Validation Accuracy: 0.7820
Test Accuracy: 0.8140
