<b>Import required libraries</b>

In [None]:
# Import core libraries for data manipulation, modeling, and visualization
import numpy as np
import pandas as pd
import torch
import random
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import classification_report
import os
import warnings

warnings.filterwarnings("ignore", category=UserWarning)

<b>Define utility functions</b>

In [None]:
# Define helper functions for reproducibility and environment setup
def set_seed(seed=42):
    np.random.seed(seed)
    torch.manual_seed(seed)
    random.seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)

<b>Build graph structure from tabular data</b>

In [None]:
# Build correlation-based adjacency matrix and feature matrix from a tabular dataset
def build_sample_graph(data_df, feature_cols, label_col='label', threshold=0.7, scaler=None):
    filtered_df = data_df[feature_cols + [label_col]]
    labels = torch.tensor(filtered_df[label_col].values, dtype=torch.long)
    features_df = filtered_df.drop(columns=[label_col])

    if scaler is None:
        scaler = StandardScaler()
        features = scaler.fit_transform(features_df.values)
    else:
        features = scaler.transform(features_df.values)

    corr = np.corrcoef(features)
    adj = (np.abs(corr) > threshold).astype(int)
    np.fill_diagonal(adj, 0)
    edge_index = np.array(np.nonzero(adj))
    return torch.tensor(edge_index, dtype=torch.long), torch.tensor(features, dtype=torch.float), labels, scaler

<b>Define the GCN model</b>

In [None]:
# Define a simple multi-layer Graph Convolutional Network for node classification
class GCNModel(nn.Module):
    def __init__(self, num_features, num_classes, hidden_channels=64, dropout=0.4):
        super(GCNModel, self).__init__()
        self.conv1 = GCNConv(num_features, hidden_channels)
        self.bn1 = nn.BatchNorm1d(hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.bn2 = nn.BatchNorm1d(hidden_channels)
        self.conv3 = GCNConv(hidden_channels, num_classes)
        self.dropout = dropout
        self.apply(self.init_weights)

    def init_weights(self, m):
        if hasattr(m, 'lin') and hasattr(m.lin, 'weight'):
            torch.nn.init.xavier_uniform_(m.lin.weight)

    def forward(self, x, edge_index, edge_weight=None):
        x = self.conv1(x, edge_index, edge_weight)
        x = self.bn1(x)
        x = F.relu(x)
        x = F.dropout(x, p=self.dropout, training=self.training)
        x = self.conv2(x, edge_index, edge_weight)
        x = self.bn2(x)
        x = F.relu(x)
        x = F.dropout(x, p=self.dropout, training=self.training)
        x = self.conv3(x, edge_index, edge_weight)
        return x

<b>Compute integrated gradients for feature importance</b>

In [None]:
# Calculate integrated gradients to estimate feature importance for model predictions
def calculate_integrated_gradients(model, x, edge_index, target_label, num_steps=50):
    model.eval()
    baseline = torch.zeros_like(x)
    importance_scores = torch.zeros_like(x)
    for i in range(num_steps + 1):
        alpha = i / num_steps
        interpolated_input = baseline + alpha * (x - baseline)
        interpolated_input.requires_grad = True
        output = model(interpolated_input, edge_index)
        target_output = output[:, target_label].sum()
        model.zero_grad()
        target_output.backward()
        if interpolated_input.grad is not None:
            importance_scores += interpolated_input.grad.detach()
    integrated_gradients = (x - baseline) * importance_scores / num_steps
    return torch.abs(integrated_gradients).sum(dim=0).cpu().numpy()

<b>Recursive feature elimination using integrated gradients</b>

In [None]:
# Iteratively eliminate least important features based on integrated gradients 
def recursive_feature_elimination(data_df, device, label_col='label', hidden_channels=64, epochs=100,
                                  elimination_rate=0.1, min_features_to_keep=None, seed=42):
    set_seed(seed)
    current_features = [c for c in data_df.columns if c != label_col]
    best_features = current_features.copy()
    best_val_acc = 0.0

    if min_features_to_keep is None:
        min_features_to_keep = 1
    # User can change test_size to evaluate model performance.
    train_val_df, _ = train_test_split(data_df, test_size=0.2, random_state=seed, stratify=data_df[label_col])
    train_df, val_df = train_test_split(train_val_df, test_size=0.25, random_state=seed, stratify=train_val_df[label_col])

    while len(current_features) > min_features_to_keep:
        edge_train, x_train, y_train, scaler = build_sample_graph(train_df, current_features, label_col)
        edge_val, x_val, y_val, _ = build_sample_graph(val_df, current_features, label_col, scaler=scaler)

        num_features = x_train.shape[1]
        num_classes = len(y_train.unique())

        model = GCNModel(num_features, num_classes, hidden_channels).to(device)
        y_np = y_train.cpu().numpy()
        weights = compute_class_weight('balanced', classes=np.unique(y_np), y=y_np)
        criterion = nn.CrossEntropyLoss(weight=torch.tensor(weights, dtype=torch.float32, device=device))
        optimizer = torch.optim.AdamW(model.parameters(), lr=0.01, weight_decay=1e-3)

        # Train model
        for _ in range(epochs):
            model.train()
            optimizer.zero_grad()
            out = model(x_train.to(device), edge_train.to(device))
            loss = criterion(out, y_train.to(device))
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=2.0)
            optimizer.step()

        # Validation accuracy
        val_acc = evaluate_epoch(model, x_val.to(device), y_val.to(device), edge_val.to(device), criterion)[1]
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_features = current_features.copy()

        # Compute IG and eliminate low-importance features
        combined_scores = np.zeros(num_features)
        for i in range(num_classes):
            ig = calculate_integrated_gradients(model, x_train.to(device), edge_train.to(device), target_label=i)
            combined_scores += np.abs(ig)

        ranked = pd.DataFrame({'Feature': current_features, 'Score': combined_scores}).sort_values('Score', ascending=False)
        remove_count = max(1, int(len(current_features) * elimination_rate))
        current_features = ranked['Feature'][:-remove_count].tolist()

    return best_val_acc, best_features

<b>Evaluation and validation</b>

In [None]:
# Evaluate model performance using accuracy and classification report
def evaluate_epoch(model, x, y, edge_index, criterion):
    model.eval()
    with torch.no_grad():
        out = model(x, edge_index)
        loss = criterion(out, y).item()
        preds = out.argmax(dim=1)
        acc = (preds == y).float().mean().item()
    return loss, acc

<b>Main pipeline for dataset(s)</b>

In [None]:
# Run the full pipeline on one or more datasets (The User can modify the parameters if they would like)
def run_pipeline(file_paths, label_col='label', hidden_dim=64, epochs=100, elimination_rate=0.1, min_keep_ratio=0.05):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    seeds = [42, 100, 200, 300, 400]
    val_seeds = [1, 2, 3, 4, 5]

    for file_path in file_paths:
        print(f"\nProcessing dataset: {os.path.basename(file_path)}")
        df = pd.read_csv(file_path, index_col=0)
        min_features = max(1, int(df.drop(columns=[label_col]).shape[1] * min_keep_ratio))

        all_acc, all_feats = [], []
        for seed in seeds:
            acc, feats = recursive_feature_elimination(df, device, label_col, hidden_dim, epochs, elimination_rate, min_features, seed)
            all_acc.append(acc)
            all_feats.append(feats)

        print(f"Best accuracy: {np.max(all_acc):.4f}, Avg accuracy: {np.mean(all_acc):.4f}")
        print(f"Selected feature count: {len(all_feats[np.argmax(all_acc)])}")

<b>Example usage</b>

In [None]:
# Example usage â€” replace with your own datasets (Make sure to place in correct file path)
dataset_paths = [
    "data/sample_dataset1.csv",
    "data/sample_dataset2.csv"
]

run_pipeline(dataset_paths, label_col='label')