In [1]:
import math
import numpy as np
from numpy import linalg as la
from scipy.optimize import linprog
from multiprocessing import Pool
from typing import Callable, Dict, List, FrozenSet, Tuple
from concurrent.futures import ProcessPoolExecutor

In [2]:
import matplotlib.pyplot as plt
import os
import timeit
import networkx as nx
from Bio.PDB import PDBParser
import pandas as pd
import faiss
import surface
from scipy.spatial import distance_matrix
from rdkit import Chem
import py3Dmol
from rdkit.Chem import AllChem


In [None]:
class Bound (object):

    def __init__(self, normal, bias, neighbor):
        self.normal = normal
        self.bias = bias
        self.neighbor = neighbor

    def valid(self, x):
        return self.normal.dot(x) <= self.bias
    
    def neg(self):
        return Bound(-self.normal, -self.bias)


class Surface (object):

    def __init__(self, center, radius, external):
        self.center = center
        self.radius = radius
        self.bounds = []
        self.feasible = True
        self.external = external

    def add_bound(self, bound : Bound):
        self.bounds.append(bound)                            # исправила ошибку bounds

    # def resolve(self):
    #     self.feasible = self.feasible and \
    #         linprog([0,0,0],
    #                 A_ub=[x.normal for x in self.bounds],
    #                 b_ub=[x.bias for x in self.bounds]).success
    
    # изменила функцию
    def resolve(self):
        if not self.bounds:
            self.feasible = False
            return

        A_ub = np.array([x.normal for x in self.bounds])
        b_ub = np.array([x.bias for x in self.bounds])

        if A_ub.ndim == 1:  # если вдруг получилось (N,), а не (N,3)
            A_ub = A_ub.reshape(-1, 3)

        self.feasible = self.feasible and linprog(
            c=np.zeros(A_ub.shape[1]),  # размерность c теперь соответствует числу столбцов в A_ub
            A_ub=A_ub,
            b_ub=b_ub
        ).success

    def valid(self, x):
        return all (bound.valid(x) for bound in self.bounds)
        
    def filter(self, points, normals):
        d = 1 if self.external else -1
        return [(p, d * n) for p, n in zip(points, normals) if self.valid(p)]
        

class Sphere (Surface):

    def generate_points(self, point_area):
        n = 1 + int(np.sqrt(4 * np.pi * (self.radius ** 2) / point_area - 1))

        u = np.linspace(0, 1, n)  
        v = np.linspace(0, 1, n)

        u_grid, v_grid = np.meshgrid(u, v)

        u_spher = u_grid * 2 * np.pi  
        v_spher = np.arcsin(2 * v_grid - 1)

        normals = np.array([np.cos(v_spher) * np.cos(u_spher),
                            np.cos(v_spher) * np.sin(u_spher),
                            np.sin(v_spher)])
        points = self.center + self.radius * normals

        return list(zip(points.reshape(3, -1).T, normals.reshape(3, -1).T))
        # return filter(points, normals)


class Atom (Sphere):

    def __init__(self, center, radius):
        super().__init__(center, radius, True)  # исправлено
        # super(Sphere, self).__init__(center, radius, True)
        self.dist = la.norm(self.center) ** 2
        self.bias = self.dist - self.radius ** 2
        

class Probe (Sphere):

    def __init__(self, center, radius):
        super().__init__(center, radius, False)  # исправлено
        # super(Sphere, self).__init__(center, radius, False)


class Torus (Surface):

    def __init__(self, center, normal, R, r):
        super().__init__(center, R, False)  # исправлено
        # super(Sphere, self).__init__(center, R, False)
        self.normal = normal
        self.r = r

        x = - self.normal[1] / np.sqrt(self.normal[0] ** 2 + self.normal[1] ** 2)
        y = self.normal[0] / np.sqrt(self.normal[0] ** 2 + self.normal[1] ** 2)
        t = np.arccos(self.normal[2] / la.norm(self.normal))

        self.rotate_matrix = \
            (1 - np.cos(t)) * np.array([[x ** 2, x * y,  0],
                                        [x * y,  y ** 2, 0],
                                        [0,      0,      0]]) + \
            np.sin(t) * np.array([[ 0, 0,  y],
                                  [ 0, 0, -x],
                                  [-y, x,  0]]) + \
            np.cos(t) * np.identity(3)


    def generate_points(self, point_area):
        n = 1 + int(np.sqrt(2 * (np.pi ** 2) * self.radius * self.r / point_area - 1))

        u = np.linspace(0, 1, n)  
        v = np.linspace(-1, 1, n)  
        u, v = np.meshgrid(u, v)

        u_spher = u * 2 * np.pi
        v_spher = np.arcsin(v - np.sign(v)) + np.pi + np.sign(v) * np.pi / 2

        normals = self.rotate_matrix.dot([np.cos(v_spher) * np.cos(u_spher),
                                          np.cos(v_spher) * np.sin(u_spher),
                                          np.sin(v_spher)])
        points = self.center + self.r * normals + \
            self.radius * self.rotate_matrix.dot([np.cos(u_spher), np.sin(u_spher), 0])

        # return filter(points, normals)
        return list(zip(points.reshape(3, -1).T, normals.reshape(3, -1).T))

    



def resolve(arr, surf, dead = None):
    if dead is None:
        for s in arr:
            s.resolve()
            if s.feasible:
                surf.append(s)
    else:
        for i, s in enumerate(arr):
            s.resolve()
            if s.feasible:
                surf.append(s)
            else:
                dead.append(i)


def find_toroidal_fragments(
        atoms : List[Atom], 
        probe_radius : float) \
            -> Dict[int, Dict[int, Torus]]:
    
    torus_map = {}
    for i, a in enumerate(atoms):
        if i not in torus_map: torus_map[i] = {}
        torus_a = torus_map[i]

        for j, b in enumerate(atoms[i+1:]):
            normal = a.center - b.center          # было normal = a - b
            normal_size = la.norm(normal)       # было t.normal ?
            if normal_size >= a.radius + b.radius + 2 * probe_radius:
                continue

            alpha = (1 - (a.radius - b.radius) \
                         * (a.radius + b.radius + 2 * probe_radius) \
                         / normal_size ** 2) / 2
            center = b.center - alpha * normal   # было center = b - alpha * normal
            radius = (b.radius + probe_radius) ** 2 - (alpha * normal_size) ** 2

            # исправила
            a_point = np.array([[probe_radius], [a.radius]]) / (probe_radius + a.radius)

            print("normal:", normal.shape)
            print("a.center - center:", (a.center - center).shape)
            print("a_point:", a_point.shape)


            up_bias = np.linalg.multi_dot([normal, a.center - center, a_point])
            # up_bias = np.linalg.multi_dot([normal, a, center, a_point])
            b_point = np.array([[probe_radius], [b.radius]]) / (probe_radius + b.radius)
            down_bias = np.linalg.multi_dot([normal, b.center - center, b_point])
            # down_bias = np.linalg.multi_dot([normal, b, center, b_point])






            # было
            # with [probe_radius, a.radius] / (probe_radius + a.radius) as a_point:
            #     up_bias = np.multi_dot(normal, [a, center], a_point)
            # with [probe_radius, b.radius] / (probe_radius + b.radius) as b_point:
            #     down_bias = np.multi_dot(normal, np.array([b, center]), b_point)

            t = Torus(center, radius, normal)

            with Bound(normal, up_bias) as bound:
                t.add_bound(bound)
                a.add_bound(bound.neg())

            with Bound(normal, down_bias) as bound:
                t.add_bound(bound.neg())
                b.add_bound(bound)
                
            if up_bias <= down_bias:
                continue

            torus_a[j + i + 1] = t
    
    return torus_map


def add_atom_torus_bound(a : Atom, t : Torus, p : Probe):
    with np.cross(t.normal, p.center - t.center) as normal:
        bound = Bound(normal, normal.dot(p.center))
        if bound.valid(a):
            p.add_bound(bound)
            t.add_bound(bound.neg())
        else:
            p.add_bound(bound.neg())
            t.add_bound(bound)


def find_probe_fragments(
        atoms : List[Atom], 
        torus_map : Dict[int, Dict[int, Torus]]) \
            -> Dict[int, Dict[int, Torus]]:
    
    probe_map = {}
    for i in torus_map:
        a = atoms[i]
        for j in torus_map[i]:
            b = atoms[j]
            t_ab = torus_map[i][j]
            for k in torus_map[j]:
                c = atoms[k]
                t_ac = torus_map[i][k]
                t_bc = torus_map[j][k]

                # M = np.array(t_ab.normal, t_ac.normal, t_bc.normal)
                M = np.array([t_ab.normal, t_ac.normal, t_bc.normal])

                assert la.matrix_rank(M) == 3

                with la.solve(M, np.array([t_ab.bias, t_ac.bias, t_bc.bias])) as x:
                    if x in probe_map:
                        p = probe_map[x]
                    else:
                        p = Probe(x)
                        probe_map[x] = p

                add_atom_torus_bound(a, t_bc, p)
                add_atom_torus_bound(b, t_ac, p)
                add_atom_torus_bound(c, t_ab, p)

    return probe_map

# добавила новый класс, т.к. lambda нельзя сериализовать 
class Resolver:
    def __init__(self, res, dead_atoms):
        self.res = res
        self.dead_atoms = dead_atoms

    def resolve_task(self, args):
        i, x = args
        resolve(x, self.res[i], self.dead_atoms)


def find_ses_fragments(
        atoms : List[Atom], 
        probe_radius : float,
        jobs_num : int) -> List[Surface]:
    
    res = np.array([ np.array([]) for _ in range(jobs_num) ])

    torus_map = find_toroidal_fragments(atoms, probe_radius)

    
    dead_atoms = []
    resolver = Resolver(res, dead_atoms)

    with Pool(jobs_num) as p:
        p.map(resolver.resolve_task, enumerate(np.array_split(atoms, jobs_num)), chunksize=1)

    # with Pool(jobs_num) as p:
    #     p.map(lambda i,x: resolve(x, res[i], dead_atoms), 
    #           enumerate(np.array_split(atoms, jobs_num)), chunksize=1) # было np.split(atoms, jobs_num)
        
    tori = []
    for i in dead_atoms: del torus_map[i]
    for i in torus_map:
        for j in dead_atoms: torus_map[i].pop(j, None)
        tori.extend(torus_map[i].values())

    probe_map = find_probe_fragments(atoms, torus_map)

    resolver = Resolver(res, None)  # Второй вызов без dead_atoms
    with Pool(jobs_num) as p:
        p.map(resolver.resolve_task, enumerate(np.array_split(tori, jobs_num)), chunksize=1)

    with Pool(jobs_num) as p:
        p.map(resolver.resolve_task, enumerate(np.array_split(list(probe_map.values()), jobs_num)), chunksize=1)
    
    # with Pool(jobs_num) as p:
    #     p.map(lambda i,x: resolve(x, res[i], None), 
    #           enumerate(np.split(tori, jobs_num)), chunksize=1)
    
    # with Pool(jobs_num) as p:
    #     p.map(lambda i,x: resolve(x, res[i], None), 
    #           enumerate(np.split(probe_map.values(), jobs_num)), chunksize=1)

    return np.concatenate(res)

def generate_ses_points(coords, radii, probe_radius, point_area, jobs_num):
    atoms = [ Atom(c,r) for c,r in zip(coords, radii) ]
    fragments = find_ses_fragments(atoms, probe_radius, jobs_num)

    res = np.array([ np.array([]) for _ in range(jobs_num) ])
    with Pool(jobs_num) as p:
        p.map(lambda i,x: [ res[i].extend(f.generate_points(point_area)) for f in x ], 
              enumerate(np.split(fragments, jobs_num)), chunksize=1)
        
    return np.concatenate(res)


In [22]:
molecules_npydir = "/auto/datasets/npi/raw/01-benchmark_surfaces_npy"

# Получаем список всех файлов
files = os.listdir(molecules_npydir)

# Оставляем только файлы .npy
npy_files = [f for f in files if f.endswith(".npy")]

print("Найденные файлы .npy:", npy_files)


Найденные файлы .npy: ['4kze_R_atomxyz.npy', '2a8v_DE_atomtypes.npy', '1mw8_X_resnames.npy', '1le5_CD_atomtypes.npy', '1dux_CF_atomtypes.npy', '5dfi_A_resnames.npy', '3rmc_ABCD_resnames.npy', '5eev_W_atomxyz.npy', '4b5g_UV_atomxyz.npy', '2anr_B_resnames.npy', '6xeo_A_resnames.npy', '7jl2_ABCDEF_resnames.npy', '6ywo_ABCD_atomxyz.npy', '5f8h_BC_resnames.npy', '2wtf_OPST_atomtypes.npy', '1nyb_A_resnames.npy', '3kde_C_resnames.npy', '1vbz_A_atomtypes.npy', '6s3n_A_atomtypes.npy', '6kdb_AD_resnames.npy', '3q8r_TP_resnames.npy', '1x9m_CD_atomtypes.npy', '6ml5_EF_atomxyz.npy', '5o63_AB_resnames.npy', '6y93_CD_atomxyz.npy', '2qfj_AB_resnames.npy', '1j9n_AB_atomtypes.npy', '4ed6_PT_atomtypes.npy', '3s57_BC_resnames.npy', '2o5c_CD_atomtypes.npy', '4i2c_A_atomxyz.npy', '5oik_P_atomtypes.npy', '6ler_ABCDEFGH_atomtypes.npy', '3oor_A_resnames.npy', '6bte_A_resnames.npy', '5w1i_AC_atomtypes.npy', '7c4p_CDEF_resnames.npy', '4nca_CDG_atomtypes.npy', '1xpo_ABCDEF_resnames.npy', '2ibk_DE_atomtypes.npy', 

In [23]:
RADIUS = {}
with open("atomtype.txt") as f:
    for line in f.readlines()[1:]:
        atom = line.split()[0]
        RADIUS[atom] = line.split()[-2]

In [24]:
name_x = "1xvr_DE"
molecules_npydir = "/auto/datasets/npi/raw/01-benchmark_surfaces_npy"
coords = np.load(f"{molecules_npydir}/{name_x}_atomxyz.npy")
types = np.load(f"{molecules_npydir}/{name_x}_atomtypes.npy")
radii = np.array(list(map(lambda x: RADIUS[x], types)), dtype = float)
start_time = timeit.default_timer()
points, atoms_ids = surface.points_with_atomsid(coords, radii, additional_rad = 10)
time = timeit.default_timer() - start_time
print("Total time =", time)
print("Atoms on surface =", len(points))
print(points)
print(atoms_ids)
print("Радиусы атомов молекулы:", radii)
print("Типы атомов в молекуле: ", types.shape)
print("Количество координат молекулы: ", coords.shape)

search_neighbours_time:  0.005007591098546982 4.328121319602483 %
search_points_time:  0.11069136299192905 95.67187868039751 %
Total time = 0.11644427105784416
Atoms on surface = 62
[[ 4.78858528e-01  5.70761432e+01  1.73938580e+01]
 [-1.00790649e-01  4.65278742e+01  2.98957902e+01]
 [ 4.78858528e-01  5.70761432e+01  1.73938580e+01]
 [-2.68986652e-01  5.51874235e+01  1.66460128e+01]
 [ 1.48288340e+01  5.75488324e+01  2.57505286e+01]
 [ 1.48288340e+01  5.75488324e+01  2.57505286e+01]
 [ 1.78170353e+01  5.15125115e+01  2.10552850e+01]
 [ 6.99848229e+00  5.68473258e+01  1.46786752e+01]
 [ 1.49684896e+01  4.83582615e+01  1.47105098e+01]
 [ 6.37319300e+00  5.63924868e+01  1.43215120e+01]
 [ 6.37319300e+00  5.63924868e+01  1.43215120e+01]
 [ 1.41657828e+01  4.37172175e+01  3.03647832e+01]
 [ 1.54479303e+01  5.69389305e+01  2.57117898e+01]
 [ 1.41657828e+01  4.37172175e+01  3.03647832e+01]
 [ 1.02456731e+00  4.76730437e+01  3.15664332e+01]
 [ 1.53784392e+01  4.25045612e+01  2.24610780e+01]
 [

In [25]:
print(type(coords))
print(type(radii))

<class 'numpy.ndarray'>
<class 'numpy.ndarray'>


In [28]:
probe_radius = 1.4  # радиус зонда
point_area = 1  # площадь на одну точку (управляет плотностью точек)
jobs_num = 4  # количество потоков для параллельных вычислений

# Запускаем генерацию точек молекулярной поверхности
ses_points = generate_ses_points(coords, radii, probe_radius, point_area, jobs_num)

# Проверяем результат
print(ses_points.shape)  # Выведет количество точек и их координаты
print(ses_points[:5])  # Вывод первых 5 точек

normal: (3,)
a.center - center: (3,)
a_point: (2, 1)


ValueError: shapes (1,1) and (2,1) not aligned: 1 (dim 1) != 2 (dim 0)