In [1]:
from pathlib import Path
from time import time
import argparse
import shutil
import random

import matplotlib.pyplot as plt
import seaborn as sns

from tqdm import tqdm
import numpy as np

from Bio.PDB import PDBParser
from Bio.PDB.Polypeptide import three_to_one, is_aa
from rdkit import Chem
from scipy.ndimage import gaussian_filter

import torch

from analysis.molecule_builder import build_molecule
from analysis.metrics import rdmol_to_smiles
import constants
from constants import covalent_radii, dataset_params


def process_ligand_and_pocket(pdbfile, sdffile,
                              atom_dict, dist_cutoff, ca_only):
    pdb_struct = PDBParser(QUIET=True).get_structure('', pdbfile)

    try:
        ligand = Chem.SDMolSupplier(str(sdffile))[0]
    except:
        raise Exception(f'cannot read sdf mol ({sdffile})')

    # remove H atoms if not in atom_dict, other atom types that aren't allowed
    # should stay so that the entire ligand can be removed from the dataset
    lig_atoms = [a.GetSymbol() for a in ligand.GetAtoms()
                 if (a.GetSymbol().capitalize() in atom_dict or a.element != 'H')]
    lig_coords = np.array([list(ligand.GetConformer(0).GetAtomPosition(idx))
                           for idx in range(ligand.GetNumAtoms())])

    try:
        lig_one_hot = np.stack([
            np.eye(1, len(atom_dict), atom_dict[a.capitalize()]).squeeze()
            for a in lig_atoms
        ])
    except KeyError as e:
        raise KeyError(
            f'{e} not in atom dict ({sdffile})')

    # Find interacting pocket residues based on distance cutoff
    pocket_residues = []
    for residue in pdb_struct[0].get_residues():
        res_coords = np.array([a.get_coord() for a in residue.get_atoms()])
        if is_aa(residue.get_resname(), standard=True) and \
                (((res_coords[:, None, :] - lig_coords[None, :, :]) ** 2).sum(
                    -1) ** 0.5).min() < dist_cutoff:
            pocket_residues.append(residue)

    pocket_ids = [f'{res.parent.id}:{res.id[1]}' for res in pocket_residues]
    ligand_data = {
        'lig_coords': lig_coords,
        'lig_one_hot': lig_one_hot,
    }
    if ca_only:
        try:
            pocket_one_hot = []
            full_coords = []
            for res in pocket_residues:
                for atom in res.get_atoms():
                    if atom.name == 'CA':
                        pocket_one_hot.append(np.eye(1, len(amino_acid_dict),
                                                     amino_acid_dict[three_to_one(res.get_resname())]).squeeze())
                        full_coords.append(atom.coord)
            pocket_one_hot = np.stack(pocket_one_hot)
            full_coords = np.stack(full_coords)
        except KeyError as e:
            raise KeyError(
                f'{e} not in amino acid dict ({pdbfile}, {sdffile})')
        pocket_data = {
            'pocket_coords': full_coords,
            'pocket_one_hot': pocket_one_hot,
            'pocket_ids': pocket_ids
        }
    else:
        full_atoms = np.concatenate(
            [np.array([atom.element for atom in res.get_atoms()])
             for res in pocket_residues], axis=0)
        full_coords = np.concatenate(
            [np.array([atom.coord for atom in res.get_atoms()])
             for res in pocket_residues], axis=0)
        try:
            pocket_one_hot = []
            for a in full_atoms:
                if a in amino_acid_dict:
                    atom = np.eye(1, len(amino_acid_dict),
                                  amino_acid_dict[a.capitalize()]).squeeze()
                elif a != 'H':
                    atom = np.eye(1, len(amino_acid_dict),
                                  len(amino_acid_dict)).squeeze()
                pocket_one_hot.append(atom)
            pocket_one_hot = np.stack(pocket_one_hot)
        except KeyError as e:
            raise KeyError(
                f'{e} not in atom dict ({pdbfile})')
        pocket_data = {
            'pocket_coords': full_coords,
            'pocket_one_hot': pocket_one_hot,
            'pocket_ids': pocket_ids
        }
    return ligand_data, pocket_data


In [2]:
if __name__ == "__main__":
    pdb_path = r"data\1B57_HUMAN_25_300_0\3upr_C_rec_3upr_1kx_lig_tt_docked_3_pocket10.pdb"   
    sdf_path = r"data\1B57_HUMAN_25_300_0\3upr_C_rec_3upr_1kx_lig_tt_docked_3.sdf"     

    dataset_info = dataset_params['crossdock']
    amino_acid_dict = dataset_info['aa_encoder']
    atom_dict = dataset_info['atom_encoder']
    atom_decoder = dataset_info['atom_decoder']

    lig_data, poc_data = process_ligand_and_pocket(
        pdb_path, sdf_path, atom_dict, dist_cutoff=6.0, ca_only=True
    )

In [None]:
lig_data

<function dict.get(key, default=None, /)>

In [3]:
poc_data

{'pocket_coords': array([[ -5.186, -21.961, -40.769],
        [-13.509, -25.861, -41.57 ],
        [-13.373, -20.89 , -45.005],
        [-15.864, -17.012, -46.011],
        [ -7.434, -15.861, -45.299],
        [ -6.26 , -17.397, -38.901],
        [ -4.254, -18.012, -35.74 ],
        [ -5.856, -19.227, -32.533],
        [ -7.957, -15.633, -32.081],
        [ -6.804, -13.195, -34.757],
        [ -6.852, -13.177, -38.534],
        [ -5.538, -11.622, -41.712],
        [ -8.599,  -8.379, -40.289],
        [-10.153,  -9.125, -36.888],
        [ -8.666, -10.079, -33.526],
        [-10.155, -11.378, -30.292],
        [-13.094,  -7.565, -30.992],
        [-17.94 ,  -9.18 , -39.311],
        [-19.94 , -11.863, -34.229],
        [-19.094, -15.751, -30.874],
        [-14.2  , -18.88 , -28.346],
        [ -5.049, -16.467, -42.392],
        [-15.012, -11.449, -48.05 ],
        [ -6.977, -11.001, -45.152]], dtype=float32),
 'pocket_one_hot': array([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,

In [6]:
from egnn.egnn_sh import EGNN_Spherical
import torch
import torch.nn as nn
import numpy as np
import requests
from pathlib import Path
from rdkit import Chem
import py3Dmol

# e3nn and other dependencies for the model
from e3nn import o3

In [None]:
x = torch.from_numpy(lig_data['lig_coords']).float()
h = torch.from_numpy(lig_data['lig_one_hot']).float()

print(f" - Kích thước tensor tọa độ (x): {x.shape}")
print(f" - Kích thước tensor feature (h): {h.shape}")



in_node_nf = h.shape[1]
hidden_nf = 64

model = EGNN_Spherical(
    in_node_nf=in_node_nf,
    hidden_nf=hidden_nf
)

 - Kích thước tensor tọa độ (x): torch.Size([21, 3])
 - Kích thước tensor feature (h): torch.Size([21, 10])


In [13]:
def build_edge_index(coords, max_radius):
    dist_matrix = torch.cdist(coords, coords, p=2)
    # Kết nối các nút trong bán kính, loại bỏ tự kết nối (i,i)
    adj_matrix = (dist_matrix < max_radius) & (dist_matrix > 0)
    edge_index = torch.stack(torch.where(adj_matrix)).long()
    return edge_index

MAX_RADIUS = 5.0 # Bán kính để coi các nguyên tử là "hàng xóm"
edge_index = build_edge_index(x, MAX_RADIUS)

print(f"Đã tạo 'edge_index' với hình dạng: {edge_index.shape}")
print("Ví dụ 5 cạnh đầu tiên (cặp chỉ số nguyên tử):")
print(edge_index[:, :5])

Đã tạo 'edge_index' với hình dạng: torch.Size([2, 248])
Ví dụ 5 cạnh đầu tiên (cặp chỉ số nguyên tử):
tensor([[0, 0, 0, 0, 0],
        [1, 2, 3, 4, 5]])


In [16]:
in_node_nf = h.shape[1]
hidden_nf = 64
out_node_nf = h.shape[1] 
in_edge_nf = 0
n_layers = 3    
lmax = 2       

# Khởi tạo mô hình
model = EGNN_Spherical(
    in_node_nf=in_node_nf,
    in_edge_nf=in_edge_nf,
    hidden_nf=hidden_nf,
    out_node_nf=out_node_nf,
    n_layers=n_layers,
    lmax=lmax,
    max_radius=MAX_RADIUS
)

print("Mô hình EGNN_Spherical đã được khởi tạo:")
print(model)



Mô hình EGNN_Spherical đã được khởi tạo:
EGNN_Spherical(
  (radial_basis): ExpNormalSmearing(
    (cutoff_fn): CosineCutoff()
  )
  (embedding): Sequential(
    (0): Linear(in_features=10, out_features=64, bias=True)
    (1): SiLU()
    (2): Linear(in_features=64, out_features=32, bias=True)
  )
  (scalar_to_irreps): Linear(32x0e -> 32x0e+16x1o+8x2e | 1024 weights)
  (conv_layers): ModuleList(
    (0-2): 3 x SphericalHarmonicsBlock(
      (conv): SeparableSphericalConvolution(
        (tp): TensorProduct(32x0e+16x1o+8x2e x 1x0e+1x1o+1x2e -> 32x0e+16x1o+8x2e | 3456 paths | 3456 weights)
        (edge_mlp): Sequential(
          (0): Linear(in_features=16, out_features=64, bias=True)
          (1): SiLU()
          (2): Linear(in_features=64, out_features=3456, bias=True)
        )
        (self_interaction): Linear(32x0e+16x1o+8x2e -> 32x0e+16x1o+8x2e | 1344 weights)
        (norm): BatchNorm (32x0e+16x1o+8x2e, eps=1e-05, momentum=0.1)
      )
    )
  )
  (output_head): ScalarOutputHead

In [17]:
edge_sh, edge_length_embedded = model.compute_edge_features(x, edge_index)

print(f"Hình dạng Spherical Harmonics (edge_sh): {edge_sh.shape}")
print("-> Mỗi cạnh giờ đây có một vector đặc trưng hướng E(3).")
print(f"Hình dạng Nhúng Khoảng cách (edge_length_embedded): {edge_length_embedded.shape}")
print("-> Mỗi cạnh có một vector đặc trưng mã hóa khoảng cách.")

Hình dạng Spherical Harmonics (edge_sh): torch.Size([248, 9])
-> Mỗi cạnh giờ đây có một vector đặc trưng hướng E(3).
Hình dạng Nhúng Khoảng cách (edge_length_embedded): torch.Size([248, 16])
-> Mỗi cạnh có một vector đặc trưng mã hóa khoảng cách.


In [19]:
print("\n--- Bước 5.2: Nhúng Đặc trưng Nút (Input Embedding) ---")

# Lớp embedding ban đầu (Linear)
h_embedded = model.embedding(h)
print(f"Hình dạng sau lớp Linear embedding (h_embedded): {h_embedded.shape}")
print("-> Chuyển one-hot vector thành vector đặc trưng dày đặc.")

# Nâng lên không gian irreps
h_irrpes_initial = model.scalar_to_irreps(h_embedded)
print(f"Hình dạng sau khi nâng lên Irreps (h_irrpes_initial): {h_irrpes_initial.shape}")
print("-> Đặc trưng giờ nằm trong không gian E(3), sẵn sàng cho tích chập.")


--- Bước 5.2: Nhúng Đặc trưng Nút (Input Embedding) ---
Hình dạng sau lớp Linear embedding (h_embedded): torch.Size([21, 32])
-> Chuyển one-hot vector thành vector đặc trưng dày đặc.
Hình dạng sau khi nâng lên Irreps (h_irrpes_initial): torch.Size([21, 120])
-> Đặc trưng giờ nằm trong không gian E(3), sẵn sàng cho tích chập.


In [20]:
print("\n--- Bước 5.3: Cập nhật Nút qua các Lớp Tích chập ---")

h_irrpes_current = h_irrpes_initial.clone()

for i, conv_layer in enumerate(model.conv_layers):
    print(f"\n[Lớp {i+1}/{model.n_layers}]")
    print(f"  Input h_irrpes shape: {h_irrpes_current.shape}")
    
    # Áp dụng lớp tích chập
    h_irrpes_current = conv_layer(
        h=h_irrpes_current,
        edge_index=edge_index,
        edge_sh=edge_sh,
        edge_features=edge_length_embedded
    )
    
    print(f"  Output h_irrpes shape: {h_irrpes_current.shape}")
    print("  -> Đặc trưng nút được cập nhật bằng thông tin từ hàng xóm.")

h_irrpes_final = h_irrpes_current


--- Bước 5.3: Cập nhật Nút qua các Lớp Tích chập ---

[Lớp 1/3]
  Input h_irrpes shape: torch.Size([21, 120])
  Output h_irrpes shape: torch.Size([21, 120])
  -> Đặc trưng nút được cập nhật bằng thông tin từ hàng xóm.

[Lớp 2/3]
  Input h_irrpes shape: torch.Size([21, 120])
  Output h_irrpes shape: torch.Size([21, 120])
  -> Đặc trưng nút được cập nhật bằng thông tin từ hàng xóm.

[Lớp 3/3]
  Input h_irrpes shape: torch.Size([21, 120])
  Output h_irrpes shape: torch.Size([21, 120])
  -> Đặc trưng nút được cập nhật bằng thông tin từ hàng xóm.


In [21]:
print("\n--- Bước 5.4: Tính toán Đầu ra từ Đặc trưng Cuối cùng ---")

print(f"Đặc trưng cuối cùng h_irrpes_final có hình dạng: {h_irrpes_final.shape}")

# Đầu ra vô hướng (Scalar Output)
h_out = model.output_head(h_irrpes_final)
print(f"\nĐặc trưng vô hướng đầu ra (h_out): {h_out.shape}")
print("-> Đây là các đặc trưng mới, bất biến với phép quay cho mỗi nguyên tử.")

# Đầu ra vector (Vector Output)
coord_update = model.coord_head(h_irrpes_final)
print(f"\nVector cập nhật tọa độ (coord_update): {coord_update.shape}")
print("-> Đây là vector 'dịch chuyển' dự đoán cho mỗi nguyên tử.")
print("Ví dụ 5 vector dịch chuyển đầu tiên:\n", coord_update[:5])


--- Bước 5.4: Tính toán Đầu ra từ Đặc trưng Cuối cùng ---
Đặc trưng cuối cùng h_irrpes_final có hình dạng: torch.Size([21, 120])

Đặc trưng vô hướng đầu ra (h_out): torch.Size([21, 10])
-> Đây là các đặc trưng mới, bất biến với phép quay cho mỗi nguyên tử.

Vector cập nhật tọa độ (coord_update): torch.Size([21, 3])
-> Đây là vector 'dịch chuyển' dự đoán cho mỗi nguyên tử.
Ví dụ 5 vector dịch chuyển đầu tiên:
 tensor([[ 3.6425e-04, -3.2480e-05,  7.2418e-04],
        [ 4.7803e-04, -7.9595e-05,  8.2186e-04],
        [ 1.7049e-04, -1.7320e-04, -2.1392e-04],
        [ 4.7902e-05, -5.5986e-05, -6.3559e-05],
        [ 1.7706e-04,  9.1116e-05,  7.1044e-04]], grad_fn=<SliceBackward0>)


In [22]:
print("\n--- Bước 5.5: Cập nhật Tọa độ Cuối cùng ---")

# Cập nhật tọa độ
x_out = x + coord_update

print(f"Tọa độ ban đầu (x) ví dụ:\n{x[:3]}")
print(f"\nTọa độ cuối cùng (x_out) ví dụ:\n{x_out[:3]}")
print("\n-> Tọa độ mới được tính bằng cách cộng vector dịch chuyển vào tọa độ cũ.")


--- Bước 5.5: Cập nhật Tọa độ Cuối cùng ---
Tọa độ ban đầu (x) ví dụ:
tensor([[-11.9680, -17.0760, -37.0360],
        [-11.2470, -16.0990, -36.3130],
        [-12.2480, -16.8710, -38.3750]])

Tọa độ cuối cùng (x_out) ví dụ:
tensor([[-11.9676, -17.0760, -37.0353],
        [-11.2465, -16.0991, -36.3122],
        [-12.2478, -16.8712, -38.3752]], grad_fn=<SliceBackward0>)

-> Tọa độ mới được tính bằng cách cộng vector dịch chuyển vào tọa độ cũ.


In [24]:
import py3Dmol
from rdkit import Chem
from rdkit.Chem import Draw
from rdkit.Chem import AllChem

# Đường dẫn đến file SDF của bạn
sdf_path = r"data/1B57_HUMAN_25_300_0/3upr_C_rec_3upr_1kx_lig_tt_docked_3.sdf"

# Sử dụng RDKit để tải phân tử trực tiếp từ file SDF
# removeHs=False để giữ lại nguyên tử Hydro nếu cần, True để loại bỏ
# Đây là cách chính xác nhất để đảm bảo có đủ thông tin (nguyên tử, tọa độ, liên kết)
supplier = Chem.SDMolSupplier(sdf_path, removeHs=False)
rdkit_mol = supplier[0] # Lấy phân tử đầu tiên trong file

def mol_with_new_coords_to_pdb_block(mol, coords):
    mol_copy = Chem.Mol(mol)
    mol_copy.RemoveAllConformers()
    conformer = Chem.Conformer(mol.GetNumAtoms())
    for i in range(mol.GetNumAtoms()):
        x_coord, y_coord, z_coord = coords[i]
        conformer.SetAtomPosition(i, (float(x_coord), float(y_coord), float(z_coord)))
    mol_copy.AddConformer(conformer, assignId=True)
    return AllChem.MolToPDBBlock(mol_copy)

pdb_initial = mol_with_new_coords_to_pdb_block(rdkit_mol, x)
pdb_final = mol_with_new_coords_to_pdb_block(rdkit_mol, x_out)

view = py3Dmol.view(width=700, height=500)
view.addModel(pdb_initial, 'pdb')
view.setStyle({'model': 0}, {'stick': {'colorscheme': 'default', 'radius': 0.15}})
view.addLabel('Initial (Stick)', {'position': {'x': -20, 'y': 15, 'z': 0}, 'backgroundColor': 'white', 'fontColor': 'black'})
view.addModel(pdb_final, 'pdb')
view.setStyle({'model': 1}, {'line': {'colorscheme': 'lightgrey', 'linewidth': 3.0}})
view.addLabel('Final (Line)', {'position': {'x': -20, 'y': -15, 'z': 0}, 'backgroundColor': '#D3D3D3', 'fontColor': 'black'})

view.zoomTo()
view.show()

In [25]:
import py3Dmol


from rdkit import Chem
from rdkit.Chem import AllChem

def mol_with_new_coords_to_pdb_block(mol, coords):
    """Tạo chuỗi PDB từ RDKit mol và tensor tọa độ mới."""
    mol_copy = Chem.Mol(mol)
    mol_copy.RemoveAllConformers()
    
    conformer = Chem.Conformer(mol.GetNumAtoms())
    for i in range(mol.GetNumAtoms()):
        # Đảm bảo chuyển đổi sang kiểu float của Python
        x_coord, y_coord, z_coord = map(float, coords[i].detach().cpu().numpy())
        conformer.SetAtomPosition(i, (x_coord, y_coord, z_coord))
    
    mol_copy.AddConformer(conformer, assignId=True)
    return AllChem.MolToPDBBlock(mol_copy)

# --- Tạo PDB cho cả hai trạng thái ---

# 1. Cấu trúc TRƯỚC khi qua EGNN (sử dụng tensor 'x')
pdb_initial = mol_with_new_coords_to_pdb_block(rdkit_mol, x)

# 2. Cấu trúc SAU khi qua EGNN (sử dụng tensor 'x_out')
pdb_final = mol_with_new_coords_to_pdb_block(rdkit_mol, x_out)


# --- Thiết lập trình hiển thị 3D ---
view = py3Dmol.view(width=800, height=500)

# Hiển thị cấu trúc BAN ĐẦU (dạng que)
view.addModel(pdb_initial, 'pdb')
view.setStyle({'model': 0}, {'stick': {'colorscheme': 'default', 'radius': 0.15}})
view.addLabel('Initial Structure (Before EGNN)', 
              {'position': {'x': -20, 'y': 15, 'z': 0}, 
               'backgroundColor': 'white', 'fontColor': 'black', 'fontSize': 12})

# Hiển thị cấu trúc CUỐI CÙNG (dạng đường kẻ)
view.addModel(pdb_final, 'pdb')
view.setStyle({'model': 1}, {'line': {'colorscheme': 'lightgrey', 'linewidth': 3.5}})
view.addLabel('Final Structure (After EGNN)', 
              {'position': {'x': -20, 'y': -15, 'z': 0}, 
               'backgroundColor': '#D3D3D3', 'fontColor': 'black', 'fontSize': 12})

view.zoomTo()
view.show()