## Графовые нейронные сети

На этой практике будем решать задачу предсказания свободной энергии связывания для двух белковых молекул на основе структуры образованного ими белкового комплекса, а точнее — полноатомной структуры интерфейса взаимодействия.


Свободная энергия связывания: $\Delta G = - RT \ln K_d$, где

- $R$ — универсальная газовая постоянная
- $T$ — температура
- $K_d = \frac{[R][L]}{[RL]}$ — константа диссоциации, равная отношению произведения концентраций белков R и L к концентрации их комплекса RL в равновесном состоянии

<style>
td, th {
   border: none!important;
}
</style>

|  |  |  |  |
| - | - | - | - |
| <img style="vertical-align:middle" src="../attachments/4kro.png" height="250" alt="A grey image showing text 60 x 60"> | $\quad \longrightarrow \quad$ | <img style="vertical-align:middle" img src="../attachments/4kro_sticks_contacts.png" height="200"> | $\longrightarrow \Delta G = - RT \ln K_d$ |

План на сегодня:
- Построение графа из структуры белкового комплекса
- `torch-geometric`: упаковка графов в батчи
- `torch-geometric`: простая архитектура для задачи регрессии на графах

Нам понадобятся пакеты из стека `torch-geometric` для работы с графами, а также `plotly` для построения интерактивных графиков

In [1]:
# MacOS
# ! pip install torch-geometric torch-cluster torch-scatter torch-sparse -f https://data.pyg.org/whl/torch-2.4.0+cpu.html

# Linux + CUDA
# ! pip install torch-geometric torch-cluster torch-scatter torch-sparse -f https://data.pyg.org/whl/torch-2.4.0+cu124.html

In [2]:
import itertools
import json
from pathlib import Path
from typing import Any, Protocol

import plotly.graph_objects as go
import torch
import torch.nn.functional as F
from torch import Tensor
from torch.optim import Adam
from torch.utils.data import Dataset
from torch_geometric.data import Batch, Data
from torch_geometric.loader import DataLoader
from torch_geometric.nn import radius_graph
from torch_geometric.utils import to_undirected

### Знакомимся с данными

Мы будем работать с небольшими датасетами: всего 266 наблюдений в обучающей выборке и 88 в тестовой:

In [3]:
train_data = json.loads(Path("../datasets/affinity_train.json").read_text())
test_data = json.loads(Path("../datasets/affinity_test.json").read_text())
print(len(train_data))
print(len(test_data))

266
88


Посмотрим на содержимое:

In [4]:
for k, v in train_data[0].items():
    if isinstance(v, dict):
        print(k, ":")
        for k1, v1 in v.items():
            print("\t", k1, v1[:3])
    else:
        print(k, v)

uid 1a22
complex_graph :
	 coords [[[73.227, 31.442, 101.41], [72.12, 30.453, 101.239], [71.047, 30.634, 102.304], [71.319, 30.52, 103.512]], [[69.795, 30.848, 101.863], [68.627, 31.051, 102.723], [68.376, 29.893, 103.687], [69.121, 28.913, 103.713]], [[67.332, 30.007, 104.493], [67.02, 28.959, 105.443], [65.535, 28.736, 105.483], [64.766, 29.658, 105.248]]]
	 residues ['F', 'P', 'T']
	 chain_ids ['A', 'A', 'A']
	 is_receptor [1, 1, 1]
interface_graph :
	 coords [[53.11, 23.405, 126.043], [51.895, 23.909, 126.674], [50.985, 22.781, 127.126]]
	 atoms ['N', 'CA', 'C']
	 residues ['H', 'H', 'H']
	 chain_ids ['A', 'A', 'A']
	 is_receptor [1, 1, 1]
affinity -12.716032590837752


Наша задача: на основе структуры белкового комплекса предсказать свободную энергию связывания `affinity`.

В наших данных структура представлена в двух видах:
- `complex_graph`: положение атомов основной цепи (N, CA, C, O) для всего белкового комплекса
  - `coords` (N x 4 x 3) — координаты атомов (N, CA, C, O) для всех N аминокислот белкового комплекса
  - `residues` (N) — однобуквенные коды для всех аминокислот
  - `chain_ids` (N) — идентификаторы аминокислотных цепей, входящих в состав комплекса
  - `is_receptor` (N) — индикатор принадлежности к белку-рецептору (0 — аминокислота входит в белок-лиганд)
- `interface_graph`: положение всех атомов (кроме водорода) в интерфейсе взаимодействия
  - `coords` (K x 3) — координаты атомов
  - `atoms` (K) — названия атомов
  - `residues` (K) — однобуквенные коды аминокислот для всех атомов
  - `chain_ids` (K) — идентификаторы аминокислотных цепей, входящих в состав комплекса
  - `is_receptor` (N) — индикатор принадлежности к белку-рецептору (0 — атом входит в белок-лиганд)

Как из таких сложных объектов формировать батчи?

Будем формировать наш батч как огромный граф, где из матриц смежности исходных графов собрана блочно-диагональная матрица, а векторы-признаки просто конкатенируются вдоль размерности батча.

NB: конечно, в памяти мы будем хранить не оргомную разреженную матрицу, а просто списки рёбер и ассоциированные с ними признаки

$$
\begin{split}\mathbf{A} = \begin{bmatrix} \mathbf{A}_1 & & \\ & \ddots & \\ & & \mathbf{A}_n \end{bmatrix}, \qquad \mathbf{X} = \begin{bmatrix} \mathbf{X}_1 \\ \vdots \\ \mathbf{X}_n \end{bmatrix}, \qquad \mathbf{Y} = \begin{bmatrix} \mathbf{Y}_1 \\ \vdots \\ \mathbf{Y}_n \end{bmatrix}.\end{split}
$$

### Строим граф по атомной структуре

Для начала определим интерфейс структуры данных, которая будет хранить граф.

Это делаем просто для удобства, чтобы не привязывать функции для работы с графами к конкретной реализации.

Реализовывать интерфейс нашего `AtomicInterfaceGraph` будет класс `torch_geometric.data.Data`: с его помощью мы сможем автоматизировать упаковку отдельных наблюдений в минибатчи

In [5]:
class AtomicInterfaceGraph(Protocol):
    atoms: Tensor  # (N): идентификаторы типов атомов
    residues: Tensor  # (N): идентификаторы аминокислот
    is_receptor: Tensor  # (N): 1 для атомов рецептора, 0 для атомов лиганда
    coordinates: Tensor  # (N x 3): координаты атомов
    edge_index: Tensor  # (2 x E) список рёбер между атомами
    distances: Tensor  # (E): расстояния между атомами, соединёнными ребром
    affinity: Tensor | None  # (n): свободная энергия связывания
    batch: Tensor | None  # (N): идентификаторы подграфов в батче, [0, n-1]

Также нам понадобятся некоторые константы для преобразования входных данных в числовой формат:

In [6]:
# fmt: off
ATOM_NAMES = [
    "C", "CA", "CB", "CD", "CD1", "CD2", "CE", "CE1", "CE2", "CE3", "CG", "CG1", "CG2", "CH2", "CZ", "CZ2", "CZ3",
    "H", "H2", "H3", "HA", "HA2", "HA3", "HB", "HB1", "HB2", "HB3", "HD1", "HD11", "HD12", "HD13",
    "HD2", "HD21", "HD22", "HD23", "HD3", "HE", "HE1", "HE2", "HE21", "HE22", "HE3",
    "HG", "HG1", "HG11", "HG12", "HG13", "HG2", "HG21", "HG22", "HG23", "HG3", "HH", "HH11", "HH12",
    "HH2", "HH21", "HH22", "HZ", "HZ1", "HZ2", "HZ3",
    "N", "ND1", "ND2", "NE", "NE1", "NE2", "NH1", "NH2", "NZ",
    "O", "OD1", "OD2", "OE1", "OE2", "OG", "OG1", "OH", "OXT", "SD", "SG",
]
# fmt: on
ATOMS_INDICES = {x: i for i, x in enumerate(ATOM_NAMES)}
RESIDUES = "ACDEFGHIKLMNPQRSTVWY"
RESIDUE_INDICES = {c: i for i, c in enumerate("RESIDUES")}
ATOM_COLORS = {
    "C": "gray",
    "N": "blue",
    "O": "red",
    "H": "black",
    "S": "yellow",
}

Напишем функцию для получения графа из одного наблюдения в нашем датасете.

In [7]:
def create_interface_graph(
    interface_structure: dict[str, Any],
    graph_radius: float = 4.0,
    n_neighbors: int = 10,
) -> AtomicInterfaceGraph:
    # преобразуем названия атомов в индексы
    encoded_atoms = torch.tensor(
        [ATOMS_INDICES.get(atom, len(ATOMS_INDICES)) for atom in interface_structure["atoms"]]
    )
    # то же для аминокислот
    encoded_residues = torch.tensor(
        [RESIDUE_INDICES.get(res, len(RESIDUE_INDICES)) for res in interface_structure["residues"]]
    )

    is_receptor = torch.tensor(interface_structure["is_receptor"])

    # тензор с координатами атомов
    coordinates = torch.tensor(interface_structure["coords"]).float()

    # используем координаты для построения радиус-графа:
    # NB: модели torch geometric обычно интерпретируют рёбра как направленные,
    # так что мы добавляем обратные рёбра с помощью функции `to_undirected`,
    # если хотим работать с неориентированными графами
    edge_index = to_undirected(
        radius_graph(coordinates, r=graph_radius, max_num_neighbors=n_neighbors)
    )
    # посчитаем расстояния
    src, tgt = edge_index
    distances = torch.linalg.norm(coordinates[src] - coordinates[tgt], dim=1)

    return Data(
        atoms=encoded_atoms,
        residues=encoded_residues,
        is_receptor=is_receptor,
        coordinates=coordinates,
        edge_index=edge_index,
        distances=distances,
        num_nodes=len(encoded_atoms),
    )

In [8]:
interface_graph = create_interface_graph(
    train_data[0]["interface_graph"], graph_radius=4.0, n_neighbors=5
)
interface_graph

Data(edge_index=[2, 2850], atoms=[461], residues=[461], is_receptor=[461], coordinates=[461, 3], distances=[2850], num_nodes=461)

#### Визуализация графа

Глазами смотреть на такие данные совсем неудобно, давайте их визуализируем 

In [9]:
class PlotlyVis:
    @classmethod
    def create_figure(
        cls,
        graph: AtomicInterfaceGraph,
        receptor_color: str = "teal",
        ligand_color: str = "coral",
        figsize: tuple[int, int] = (900, 600),
    ) -> go.Figure:
        traces = cls.plot_graph(graph, receptor_color, ligand_color)
        width, height = figsize
        figure = go.Figure(
            data=traces,
            layout=dict(
                scene=dict(
                    xaxis=dict(visible=False),
                    yaxis=dict(visible=False),
                    zaxis=dict(visible=False),
                ),
                showlegend=False,
                width=width,
                height=height,
                # plot_bgcolor="rgba(0, 0, 0, 1)",
                # paper_bgcolor="rgba(0, 0, 0, 1)",
            ),
        )
        return figure

    @classmethod
    def plot_graph(
        cls,
        graph: AtomicInterfaceGraph,
        receptor_color: str,
        ligand_color: str,
    ) -> go.Figure:
        assert graph.coordinates is not None
        assert graph.edge_index is not None
        # получим ковалентные связи, чтобы нарисовать их по-другому
        ligand_cov, receptor_cov = cls.get_covalent_edges_masks(
            graph, distance_threshold=2.0
        )
        # нарисуем
        data = [
            # вершины рецептора
            cls.draw_nodes(graph, graph.is_receptor == 1),
            # вершины лиганда
            cls.draw_nodes(graph, graph.is_receptor == 0),
            # ковалентные связи лиганда
            cls.draw_edges(
                graph,
                edges_mask=ligand_cov,
                add_annotation=False,
                color=ligand_color,
                dash="solid",
                width=5,
            ),
            # ковалентные связи рецептора
            cls.draw_edges(
                graph,
                edges_mask=receptor_cov,
                add_annotation=False,
                color=receptor_color,
                dash="solid",
                width=5,
            ),
            # все связи в графе
            cls.draw_edges(
                graph,
                edges_mask=None,
                add_annotation=True,
                color="lightgray",
                dash="dot",
                width=1,
            ),
        ]
        return data

    @staticmethod
    def get_covalent_edges_masks(
        graph: AtomicInterfaceGraph, distance_threshold: float = 2.2
    ) -> list[Tensor]:
        src, tgt = graph.edge_index
        covalent_masks = []
        for chain_id in graph.is_receptor.unique():  # type: ignore[no-untyped-call]
            chain_atoms = graph.is_receptor == chain_id
            chain_edges = (
                chain_atoms[src]
                * chain_atoms[tgt]
                * (graph.distances <= distance_threshold)
            )
            covalent_masks.append(chain_edges)
        return covalent_masks

    @staticmethod
    def draw_nodes(
        graph: AtomicInterfaceGraph, nodes_mask: Tensor | None = None
    ) -> go.Scatter3d:
        x, y, z = graph.coordinates[nodes_mask].T
        atom_types = [ATOM_NAMES[x.item()][0] for x in graph.atoms[nodes_mask]]
        atom_colors = [ATOM_COLORS[x] for x in atom_types]
        nodes = go.Scatter3d(
            x=x,
            y=y,
            z=z,
            mode="markers",
            hoverinfo="text",
            text=[ATOM_NAMES[x.item()] for x in graph.atoms[nodes_mask]],
            marker=dict(
                size=4,
                color=atom_colors,
                cmin=0,
                cmax=1,
                opacity=0.8,
            ),
        )
        return nodes

    @staticmethod
    def draw_edges(
        graph: AtomicInterfaceGraph,
        edges_mask: Tensor | None = None,
        add_annotation: bool = False,
        color: str = "lightgray",
        dash: str = "dot",
        width: int = 1,
    ) -> go.Scatter3d:
        selected_edges, distances = graph.edge_index.T, graph.distances
        if edges_mask is not None:
            selected_edges = graph.edge_index.T[edges_mask]
            distances = graph.distances[edges_mask]

        edges_plot = go.Scatter3d(
            x=list(
                itertools.chain(
                    *(
                        (graph.coordinates[i, 0], graph.coordinates[j, 0], None)
                        for i, j in selected_edges
                    )
                )
            ),
            y=list(
                itertools.chain(
                    *(
                        (graph.coordinates[i, 1], graph.coordinates[j, 1], None)
                        for i, j in selected_edges
                    )
                )
            ),
            z=list(
                itertools.chain(
                    *(
                        (graph.coordinates[i, 2], graph.coordinates[j, 2], None)
                        for i, j in selected_edges
                    )
                )
            ),
            mode="lines",
            line=dict(
                color=color,
                width=width,
                dash=dash,
            ),
            text=(
                list(
                    itertools.chain(
                        *((f"{d:.3f}Å", f"{d:.3f}Å", None) for d in distances.tolist())
                    )
                )
                if add_annotation
                else None
            ),
            hoverinfo="text",
        )
        return edges_plot

In [10]:
PlotlyVis.create_figure(interface_graph)

### Датасет и загрузчик данных

In [11]:
class AtomicGraphDataset(Dataset):
    def __init__(
        self, data_json: Path, graph_radius: float = 4.0, n_neighbors: int = 10
    ) -> None:
        self.data: list[AtomicInterfaceGraph] = []
        for x in json.loads(data_json.read_text()):
            item = create_interface_graph(
                x["interface_graph"], graph_radius, n_neighbors
            )
            item.affinity = x["affinity"]
            self.data.append(item)

    def __getitem__(self, index: int) -> Data:
        return self.data[index]

    def __len__(self) -> int:
        return len(self.data)

In [12]:
train_dataset = AtomicGraphDataset(Path("../datasets/affinity_train.json"))
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True)

In [13]:
train_batch = next(iter(train_loader))
train_batch

DataBatch(edge_index=[2, 9030], atoms=[1008], residues=[1008], is_receptor=[1008], coordinates=[1008, 3], distances=[9030], num_nodes=1008, affinity=[4], batch=[1008], ptr=[5])

В полученном батче лежат 4 исходных графа, теперь мы можем обрабатывать их параллельно, а когда нам потребуются операции для вершин внутри исходных графов (например, агрегация эмбеддингов вершин в эмбеддинг графа) — мы можем получить индексы исходных графов для всех вершин:

In [14]:
train_batch.batch

tensor([0, 0, 0,  ..., 3, 3, 3])

In [15]:
train_batch.ptr

tensor([   0,  283,  542,  774, 1008])

Объект класса `DataBatch` поддерживает тот же интерфейс, что и объекты, из которых мы его построили — значит, батч тоже можно визуализировать:

In [16]:
PlotlyVis.create_figure(train_batch)

### Строим графовую сеть

Архитектура для регрессии / классификации графов сводится к трём составляющим:
- получение эмбеддингов вершин через несколько раундов обмена сообщениями (обработка графовыми слоями)
  
  $h_i^{(t+1)} = h_i^{(t)} + \bigoplus_{j \in \mathcal{N}_i} f_{\theta}(h_i^{(t)}, h_j^{(t)})$
- агрегация эмбеддингов вершин в эмбеддинг графа (readout layer)
  
  $h_\mathcal{V}^{(t)} = \bigoplus_{j \in \mathcal{V}} h_j^{(t)}$
- обучение регрессора / классификатора на графовом эмбеддинге
  
  $\hat{y} = \text{MLP}(h_\mathcal{V}^{(t)})$

Опишем архитектуру так, чтобы можно было использовать разные графовые слои:

In [17]:
from typing import Type

import torch.nn.functional as F
from torch import nn
from torch_geometric.nn.conv import GCNConv, GraphConv, MessagePassing
from torch_geometric.nn.pool import global_add_pool, global_max_pool, global_mean_pool


class GraphNet(nn.Module):
    def __init__(
        self,
        hidden_dim: int,
        graph_layer: Type[MessagePassing],
        n_layers: int,
        dropout: float = 0.5,
    ):
        super().__init__()
        # эмбеддинг для типов атомов
        self.embed = nn.Embedding(len(ATOMS_INDICES) + 1, hidden_dim)
        # список графовых слоёв
        self.conv = nn.ModuleList(
            [graph_layer(hidden_dim, hidden_dim) for _ in range(n_layers)]
        )
        # линейный слой для регрессии
        self.fc = nn.Linear(hidden_dim, 1)
        self.dropout = nn.Dropout(dropout, inplace=True)

    def forward(self, batch: AtomicInterfaceGraph):
        # 1. Эмбеддинги вершин
        x = self.embed(batch.atoms)
        for conv in self.conv:
            x = (x + conv(x, batch.edge_index)).relu()

        # 2. Эмбеддинг графа: усреднение по вершинам отдельных графов
        x = global_mean_pool(x, batch.batch)  # [batch_size, hidden_channels]

        # 3. Финальный регрессор поверх эмбеддинга графа
        x = self.dropout(x)
        x = self.fc(x)
        return x

Для первой модели возьмём самый простой графовый слой `GCNConv`:

$h_i^{(t+1)} = \mathbf{W}^{\top} \sum_{j \in
        \mathcal{N}(i) \cup \{ i \}} \frac{e_{j,i}}{\sqrt{\hat{d}_j
        \hat{d}_i}} h_j^{(t)}$

- $e_{j,i}$ — вес ребра между вершинами $i$ и $j$
- $\hat{d}_i$ — степень вершины $i$

In [18]:
model = GraphNet(hidden_dim=64, graph_layer=GCNConv, n_layers=3, dropout=0.1)
model.forward(train_batch)

tensor([[-0.5249],
        [-0.4592],
        [-0.5499],
        [-0.5723]], grad_fn=<AddmmBackward0>)

Попробуем обучить, считая корреляции Пирсона и Спирмена в качестве метрик:

In [19]:
from scipy.stats import pearsonr, spearmanr

test_dataset = AtomicGraphDataset(Path("../datasets/affinity_test.json"))

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)


@torch.no_grad()
def validate(loader: DataLoader, model: nn.Module) -> tuple[list[float], list[float]]:
    model.eval()
    ys = []
    yhats = []
    loss = 0.0
    for batch in loader:
        yhat = model.forward(batch)
        yhats.extend(yhat.flatten().tolist())
        ys.extend(batch.affinity.tolist())
        loss += F.huber_loss(yhat.flatten(), batch.affinity).item()

    print(f"Loss: {loss / len(ys):.4f}, ", end="")
    print(f"Pearson R: {pearsonr(ys, yhats).statistic:.4f}, ", end="")
    print(f"Spearman R: {spearmanr(ys, yhats).statistic:.4f}")
    model.train()
    return yhats, ys

In [20]:
torch.manual_seed(42)
model = GraphNet(hidden_dim=32, graph_layer=GCNConv, n_layers=3, dropout=0.1)
optim = Adam(model.parameters(), lr=0.001, weight_decay=1.0)

In [21]:
for i in range(50):
    model.train()
    for batch in train_loader:
        yhat = model.forward(batch)
        # loss = F.mse_loss(yhat.flatten(), batch.affinity)
        loss = F.huber_loss(yhat.flatten(), batch.affinity)
        loss.backward()
        optim.step()
        optim.zero_grad()

    if (i + 1) % 10 == 0:
        # validate(train_loader, model)
        validate(test_loader, model)

Loss: 0.1492, Pearson R: -0.1062, Spearman R: -0.0666
Loss: 0.0519, Pearson R: -0.1117, Spearman R: -0.0934
Loss: 0.0529, Pearson R: -0.1163, Spearman R: -0.0990
Loss: 0.0495, Pearson R: -0.1176, Spearman R: -0.1023
Loss: 0.0491, Pearson R: -0.1171, Spearman R: -0.1051


In [22]:
_ = validate(train_loader, model)

Loss: 0.0386, Pearson R: 0.0303, Spearman R: 0.0844


In [23]:
# import plotly.express as px

# yhat, ys = validate(test_loader, model)
# px.scatter(x=yhat, y=ys)

Результат не впечатляет: на тестовой выборке корреляция предсказаний с верными значениями отрицательная (да и, скорее всего, статистически незначимая), да и на обучающей выборке результат не очень.

Попробуем другой графовый слой `GraphConv`, более гибко агрегирующий информацию от соседей:

$h_i^{(t+1)} = \mathbf{W}_1 h_i^{(t)} + \mathbf{W}_2
        \sum_{j \in \mathcal{N}(i)} e_{j,i} \cdot h_j^{(t)}$

In [24]:
torch.manual_seed(42)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)
model = GraphNet(hidden_dim=32, graph_layer=GraphConv, n_layers=3, dropout=0.1)
optim = Adam(model.parameters(), lr=0.001, weight_decay=1.0)

In [25]:
for i in range(50):
    for batch in train_loader:
        yhat = model.forward(batch)
        loss = F.mse_loss(yhat.flatten(), batch.affinity)
        # loss = F.huber_loss(yhat.flatten(), batch.affinity)
        loss.backward()
        optim.step()
        optim.zero_grad()

    if (i + 1) % 10 == 0:
        # validate(train_loader, model)
        validate(test_loader, model)

Loss: 0.0370, Pearson R: 0.1719, Spearman R: 0.1496
Loss: 0.0414, Pearson R: 0.1573, Spearman R: 0.1221
Loss: 0.0359, Pearson R: 0.1055, Spearman R: 0.0687
Loss: 0.0444, Pearson R: 0.0768, Spearman R: 0.0445
Loss: 0.0331, Pearson R: 0.0815, Spearman R: 0.0577


In [26]:
import plotly.express as px

yhat, ys = validate(test_loader, model)
px.scatter(x=yhat, y=ys)

Loss: 0.0331, Pearson R: 0.0815, Spearman R: 0.0577


### Изменения в графе

Посмотрим снова на наши данные.

Наши графы довольно большие, и в финальном слое, когда мы получаем эмбеддинг графа, мы делаем усреднение по большому числу вершин.

Можно ли уменьшить наши графы, сохранив только важные вершины для агрегации?

In [27]:
interface_graph

Data(edge_index=[2, 2850], atoms=[461], residues=[461], is_receptor=[461], coordinates=[461, 3], distances=[2850], num_nodes=461)

In [28]:
PlotlyVis.create_figure(interface_graph)

Идея следующая: что если оставить только рёбра между атомами, которые принадлежат разным белкам, и после этого удалить из графа изолированные вершины?

In [29]:
from torch_geometric.transforms.remove_isolated_nodes import RemoveIsolatedNodes

interface_bigraph = interface_graph.clone()
src, tgt = interface_bigraph.edge_index
intermolecular_edges = (
    (interface_graph.is_receptor[src] - interface_graph.is_receptor[tgt]).abs().bool()
)
interface_bigraph.edge_index = interface_bigraph.edge_index[:, intermolecular_edges]
interface_bigraph.distances = interface_bigraph.distances[intermolecular_edges]

interface_bigraph = RemoveIsolatedNodes().forward(interface_bigraph)
PlotlyVis.create_figure(interface_bigraph)

In [30]:
class AtomicGraphDataset(Dataset):
    def __init__(
        self, data_json: Path, graph_radius: float = 4.0, n_neighbors: int = 10
    ) -> None:
        self.data: list[AtomicInterfaceGraph] = []
        for x in json.loads(data_json.read_text()):
            item = create_interface_graph(
                x["interface_graph"], graph_radius, n_neighbors
            )
            item.affinity = x["affinity"]
            self.data.append(self.remove_intermolecular_edges(item))

    def __getitem__(self, index: int) -> Data:
        return self.data[index]

    def __len__(self) -> int:
        return len(self.data)

    @staticmethod
    def remove_intermolecular_edges(
        interface_graph: AtomicInterfaceGraph,
    ) -> AtomicInterfaceGraph:
        interface_bigraph = interface_graph.clone()
        src, tgt = interface_bigraph.edge_index
        intermolecular_edges = (
            (interface_graph.is_receptor[src] - interface_graph.is_receptor[tgt])
            .abs()
            .bool()
        )
        interface_bigraph.edge_index = interface_bigraph.edge_index[
            :, intermolecular_edges
        ]
        interface_bigraph.distances = interface_bigraph.distances[intermolecular_edges]

        interface_bigraph = RemoveIsolatedNodes().forward(interface_bigraph)
        return interface_bigraph


train_dataset = AtomicGraphDataset(
    Path("../datasets/affinity_train.json"), graph_radius=6.0, n_neighbors=12
)
test_dataset = AtomicGraphDataset(
    Path("../datasets/affinity_test.json"), graph_radius=6.0, n_neighbors=12
)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [31]:
torch.manual_seed(42)
model = GraphNet(hidden_dim=32, graph_layer=GraphConv, n_layers=3, dropout=0.1)
optim = Adam(model.parameters(), lr=0.001, weight_decay=1.0)

In [32]:
for i in range(50):
    for batch in train_loader:
        yhat = model.forward(batch)
        loss = F.mse_loss(yhat.flatten(), batch.affinity)
        # loss = F.huber_loss(yhat.flatten(), batch.affinity)
        loss.backward()
        optim.step()
        optim.zero_grad()

    if (i + 1) % 10 == 0:
        validate(test_loader, model)

Loss: 0.0397, Pearson R: 0.3572, Spearman R: 0.3324
Loss: 0.0395, Pearson R: 0.4279, Spearman R: 0.3918
Loss: 0.0312, Pearson R: 0.4467, Spearman R: 0.3852
Loss: 0.0346, Pearson R: 0.4468, Spearman R: 0.3804
Loss: 0.0343, Pearson R: 0.4340, Spearman R: 0.3627


In [33]:
import plotly.express as px

yhat, ys = validate(test_loader, model)
px.scatter(x=yhat, y=ys)

Loss: 0.0343, Pearson R: 0.4340, Spearman R: 0.3627
