In [1]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import MessagePassing
from torch_geometric.data import Data, DataLoader
from torch_geometric.utils import add_self_loops

In [2]:
origin_df = pd.read_csv('features.csv')
print(origin_df.head)

<bound method NDFrame.head of       bead1 bead2 bead3 bead4 bead5  charge1  charge2  charge3  charge4  \
0        C1  SQ3p    N2   N4a    N6        0        1      0.0      0.0   
1        C1  SQ3p    N2    N6    N6        0        1      0.0      0.0   
2        C1  SQ3p   N1a    N6    N6        0        1      0.0      0.0   
3        C1  SQ3p   N1a   N4a    N6        0        1      0.0      0.0   
4        C1  SQ3p   N3r   N3r    N6        0        1      0.0      0.0   
...     ...   ...   ...   ...   ...      ...      ...      ...      ...   
12119    C1    N6  SQ2p   NaN   NaN        0        0      1.0      NaN   
12120    C1    N6    Q2   NaN   NaN        0        0      1.0      NaN   
12121    C1    N6    Q5   NaN   NaN        0        0     -1.0      NaN   
12122    C1  SQ3p   NaN   NaN   NaN        0        1      NaN      NaN   
12123    C1  SQ2p   NaN   NaN   NaN        0        1      NaN      NaN   

       charge5  ...  label_NM5 label_HB5 label_PN5 label_SI5 label_SZ

In [3]:
max_beads = 5
origin_df.replace('none', np.nan, inplace=True)
all_types_NM = ['N1', 'N2', 'N3', 'N4', 'N5', 'N6',
                'P1', 'P2', 'P3', 'P4', 'P5', 'P6', 
                'Q1', 'Q2', 'Q3', 'Q4', 'Q5', 'D',
                'C1']
all_types_HB = ['a', 'd']
all_types_PN = ['p', 'n']
all_types_SI = ['r', 'h']
all_types_SZ = ['S', 'R']


label_NM = [f'label_NM{i}' for i in range(1, max_beads+1)]
label_HB = [f'label_HB{i}' for i in range(1, max_beads+1)]
label_PN = [f'label_PN{i}' for i in range(1, max_beads+1)]
label_SI = [f'label_SI{i}' for i in range(1, max_beads+1)]
label_SZ = [f'label_SZ{i}' for i in range(1, max_beads+1)]

def one_hot_encode_column(column, all_categories):
    dummies = pd.get_dummies(column, prefix=column.name)
    # Reindex to ensure all categories are included, filling with 0s
    for category in all_categories:
        col_name = f"{column.name}_{category}"
        if col_name not in dummies.columns:
            dummies[col_name] = 0
    return dummies

# Create a copy of the dataframe to modify
encoded_df = origin_df.copy()

# one-hot encoding
for col in label_NM:
    encoded_dummies = one_hot_encode_column(encoded_df[col], all_types_NM)
    encoded_df = encoded_df.drop(columns=[col]).join(encoded_dummies)

for col in label_HB:
    encoded_dummies = one_hot_encode_column(encoded_df[col], all_types_HB)
    encoded_df = encoded_df.drop(columns=[col]).join(encoded_dummies)

for col in label_PN:
    encoded_dummies = one_hot_encode_column(encoded_df[col], all_types_PN)
    encoded_df = encoded_df.drop(columns=[col]).join(encoded_dummies)

for col in label_SI:
    encoded_dummies = one_hot_encode_column(encoded_df[col], all_types_SI)
    encoded_df = encoded_df.drop(columns=[col]).join(encoded_dummies)

for col in label_SZ:
    encoded_dummies = one_hot_encode_column(encoded_df[col], all_types_SZ)
    encoded_df = encoded_df.drop(columns=[col]).join(encoded_dummies)


print(encoded_df.columns)

Index(['bead1', 'bead2', 'bead3', 'bead4', 'bead5', 'charge1', 'charge2',
       'charge3', 'charge4', 'charge5',
       ...
       'label_SZ1_R', 'label_SZ1_S', 'label_SZ2_R', 'label_SZ2_S',
       'label_SZ3_R', 'label_SZ3_S', 'label_SZ4_R', 'label_SZ4_S',
       'label_SZ5_R', 'label_SZ5_S'],
      dtype='object', length=151)


In [4]:
#replace all nan with 0 (charge & delta_G)
encoded_df.replace(np.nan, 0, inplace=True)

df = encoded_df.copy()
df = df.drop(columns=['bead1','bead2','bead3','bead4', 'bead5'])
num_compounds = len(df)    #k
num_features = int(len(df.drop(columns=['net_charge']).columns)/max_beads+1)   #i
print(num_compounds, num_features)

12124 30


In [5]:
#N
columns = ['charge', 'delta_G', 'label_NM', 'label_HB', 'label_PN', 'label_SI', 'label_SZ']
arrays = {}
for i in range(1, max_beads+1):
    charge = [f'charge{i}']
    delta_G = [f'delta_G{i}']
    label_NM = [f'label_NM{i}_{j}' for j in all_types_NM]
    label_HB = [f'label_HB{i}_{j}' for j in all_types_HB]
    label_PN = [f'label_PN{i}_{j}' for j in all_types_PN]
    label_SI = [f'label_SI{i}_{j}' for j in all_types_SI]
    label_SZ = [f'label_SZ{i}_{j}' for j in all_types_SZ]
    net_charge = [f'net_charge']
    arrays[f'array_{i}'] = df[charge + delta_G + label_NM + label_HB + label_PN + label_SI + label_SZ + net_charge].values

array_1, array_2, array_3, array_4, array_5= arrays['array_1'], arrays['array_2'], arrays['array_3'], arrays['array_4'], arrays['array_5']

# Check the shapes of the arrays
print([array.shape for array in [array_1, array_2, array_3, array_4, array_5]])

N = np.stack([array_1, array_2, array_3, array_4, array_5], axis = -1)
print(N.shape)

[(12124, 30), (12124, 30), (12124, 30), (12124, 30), (12124, 30)]
(12124, 30, 5)


In [6]:
#A
def create_adjacency_matrix(beads, max_beads=5):
    matrix = np.zeros((max_beads, max_beads), dtype=int)

    if num_beads > 0:
        np.fill_diagonal(matrix[:num_beads, :num_beads], 1)

        for i in range(num_beads - 1):
            matrix[i, i + 1] = 1
            matrix[i + 1, i] = 1

    return matrix

A = []

for _, row in origin_df.iterrows():
    beads = [row[f'bead{i}'] for i in range(1, 6)]
    num_beads = sum(pd.notna(beads))
    adjacency_matrix = create_adjacency_matrix(beads)
    A.append(adjacency_matrix)
    
A=np.array(A)
print(A.shape)
print(A[12123])

(12124, 5, 5)
[[1 1 0 0 0]
 [1 1 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]


In [7]:
#Edge
itp_file_path = 'martini_v3.0.0 1.itp'
itp_data = pd.read_csv(itp_file_path, delim_whitespace=True, header=None)
sigma_dict = {}
epsilon_dict = {}

for _, row in itp_data.iterrows():
    bead1, bead2, _, sigma, epsilon = row
    pair = (bead1, bead2)
    sigma_dict[pair] = sigma
    epsilon_dict[pair] = epsilon

def generate_edge(row, sigma_dict, epsilon_dict):
    beads = [row['bead1'], row['bead2'], row['bead3'], row['bead4'], row['bead5']]
    sigma_tensor = np.zeros((max_beads, max_beads))
    epsilon_tensor = np.zeros((max_beads, max_beads))
    
    for i in range(max_beads):
        for j in range(max_beads):
            if pd.notna(beads[i]) and pd.notna(beads[j]):
                pair = (beads[i], beads[j])
                # Use get() with reversed pair as well to handle both (bead1, bead2) and (bead2, bead1)
                sigma_tensor[i, j] = sigma_dict.get(pair, sigma_dict.get((beads[j], beads[i]), 0))
                epsilon_tensor[i, j] = epsilon_dict.get(pair, epsilon_dict.get((beads[j], beads[i]), 0))
    
    return sigma_tensor, epsilon_tensor

# Generate tensors for each row 
sigma_tensors = []
epsilon_tensors = []

for _, row in origin_df.iterrows():
    sigma_tensor, epsilon_tensor = generate_edge(row, sigma_dict, epsilon_dict)
    sigma_tensors.append(sigma_tensor)
    epsilon_tensors.append(epsilon_tensor)

sigma_tensors = np.array(sigma_tensors)
epsilon_tensors = np.array(epsilon_tensors)

print(sigma_tensors.shape, epsilon_tensors.shape)
sigma_tensors[-1], epsilon_tensors[-1]


(12124, 5, 5) (12124, 5, 5)


(array([[0.47 , 0.443, 0.   , 0.   , 0.   ],
        [0.443, 0.41 , 0.   , 0.   , 0.   ],
        [0.   , 0.   , 0.   , 0.   , 0.   ],
        [0.   , 0.   , 0.   , 0.   , 0.   ],
        [0.   , 0.   , 0.   , 0.   , 0.   ]]),
 array([[3.39 , 2.464, 0.   , 0.   , 0.   ],
        [2.464, 2.52 , 0.   , 0.   , 0.   ],
        [0.   , 0.   , 0.   , 0.   , 0.   ],
        [0.   , 0.   , 0.   , 0.   , 0.   ],
        [0.   , 0.   , 0.   , 0.   , 0.   ]]))

In [8]:
np.save('N.npy', N)
np.save('A.npy', A)
np.save('sigma.npy', sigma_tensors)
np.save('epsilon.npy', epsilon_tensors)

In [9]:
N = torch.tensor(np.load('N.npy'), dtype=torch.float)
A = torch.tensor(np.load('A.npy'), dtype=torch.float)
sigma = torch.tensor(np.load('sigma.npy'), dtype=torch.float)
epsilon = torch.tensor(np.load('epsilon.npy'), dtype=torch.float)

In [10]:
#MPNN works well for the multi-dimensional edge features,while GNN does not
#Transformer is also a good choice for the sequence, where the edges can be treated as node features
class MPNNLayer(MessagePassing):
    def __init__(self, node_in_features, node_out_features, edge_features):
        super(MPNNLayer, self).__init__(aggr='add')
        self.node_lin = torch.nn.Linear(node_in_features, node_out_features)
        self.edge_lin = torch.nn.Linear(edge_features, node_out_features)
    
    def forward(self, x, edge_index, edge_attr):
        # x has shape [N, node_in_features]
        # edge_index has shape [2, E]
        # edge_attr has shape [E, edge_features]
        edge_out = self.edge_lin(edge_attr)
        return self.propagate(edge_index, x=self.node_lin(x), edge_attr=edge_out)
    
    def message(self, x_j, edge_attr):
        return x_j + edge_attr

    def update(self, aggr_out):
        return aggr_out

# Define the MPNN Encoder
class MPNNEncoder(torch.nn.Module):
    def __init__(self, node_in_features, node_out_features, edge_features, latent_dim):
        super(MPNNEncoder, self).__init__()
        self.mpnn1 = MPNNLayer(node_in_features, node_out_features, edge_features)
        self.mpnn2 = MPNNLayer(node_out_features, latent_dim, edge_features)
    
    def forward(self, data):
        x, edge_index, edge_attr = data.x, data.edge_index, data.edge_attr
        x = self.mpnn1(x, edge_index, edge_attr)
        x = F.relu(x)
        x = self.mpnn2(x, edge_index, edge_attr)
        return x

# Prepare the dataset
def create_data_objects(N, A, sigma, epsilon):
    data_list = []
    for i in range(N.shape[0]):
        edge_index = torch.nonzero(A[i]).t().contiguous()
        edge_attr = torch.stack((sigma[i][A[i] == 1], epsilon[i][A[i] == 1]), dim=-1)
        data = Data(x=N[i], edge_index=edge_index, edge_attr=edge_attr)
        data_list.append(data)
    return data_list

In [11]:
data = create_data_objects(N, A, sigma, epsilon)
encoder = MPNNEncoder(30, 128, 2, 16)
encoder(data)

AttributeError: 'list' object has no attribute 'x'