-
Notifications
You must be signed in to change notification settings - Fork 1
/
cal_ifg_atom.py
168 lines (151 loc) · 5.27 KB
/
cal_ifg_atom.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#
# Original authors: Richard Hall and Guillaume Godin
# This file is part of the RDKit.
# The contents are covered by the terms of the BSD license
# which is included in the file license.txt, found at the root
# of the RDKit source tree.
#
#
# Richard hall 2017
# IFG main code
# Guillaume Godin 2017
# refine output function
# astex_ifg: identify functional groups a la Ertl, J. Cheminform (2017) 9:36
from rdkit import Chem
from collections import namedtuple
import rdkit
import numpy as np
import pandas as pd
from functools import partial
from multiprocessing import Pool
import pandas as pd
from tqdm.auto import tqdm
import pathlib
import argparse
def get_parser():
parser = argparse.ArgumentParser()
parser.add_argument(
'--model', '-m',
type=str, default='aae',
help='model name : aae vae latentgan reinvent organ char_rnn'
)
parser.add_argument(
'--epoch', '-e',
type=int, default=100,
help='epoch that SMILES was sampled'
)
parser.add_argument(
'--n_jobs', '-n', type=int, default=40,
help='number of processes to use'
)
parser.add_argument(
'--input_file','-i', type=str,
default='./',
help='path to sampled files'
)
parser.add_argument(
'--output', '-o', type=str,
default='./SampledDatasetCano.csv',
help='path to save files'
)
parser.add_argument(
'--sample_id', '-si', type=int,
default='0',
help='Index of sampling'
)
return parser
def merge(mol, marked, aset):
bset = set()
for idx in aset:
atom = mol.GetAtomWithIdx(idx)
for nbr in atom.GetNeighbors():
jdx = nbr.GetIdx()
if jdx in marked:
marked.remove(jdx)
bset.add(jdx)
if not bset:
return
merge(mol, marked, bset)
aset.update(bset)
# atoms connected by non-aromatic double or triple bond to any heteroatom
# c=O should not match (see fig1, box 15). I think using A instead of * should sort that out?
PATT_DOUBLE_TRIPLE = Chem.MolFromSmarts('A=,#[!#6]')
# atoms in non aromatic carbon-carbon double or triple bonds
PATT_CC_DOUBLE_TRIPLE = Chem.MolFromSmarts('C=,#C')
# acetal carbons, i.e. sp3 carbons connected to tow or more oxygens, nitrogens or sulfurs; these O, N or S atoms must have only single bonds
PATT_ACETAL = Chem.MolFromSmarts('[CX4](-[O,N,S])-[O,N,S]')
# all atoms in oxirane, aziridine and thiirane rings
PATT_OXIRANE_ETC = Chem.MolFromSmarts('[O,N,S]1CC1')
PATT_TUPLE = (PATT_DOUBLE_TRIPLE, PATT_CC_DOUBLE_TRIPLE, PATT_ACETAL, PATT_OXIRANE_ETC)
def identify_functional_groups(smi):
try:
mol = Chem.MolFromSmiles(smi)
marked = set()
#mark all heteroatoms in a molecule, including halogens
for atom in mol.GetAtoms():
if atom.GetAtomicNum() not in (6,1): # would we ever have hydrogen?
marked.add(atom.GetIdx())
#mark the four specific types of carbon atom
for patt in PATT_TUPLE:
for path in mol.GetSubstructMatches(patt):
for atomindex in path:
marked.add(atomindex)
#merge all connected marked atoms to a single FG
groups = []
while marked:
grp = set([marked.pop()])
merge(mol, marked, grp)
groups.append(grp)
#extract also connected unmarked carbon atoms
ifg = namedtuple('IFG', ['atomIds', 'atoms', 'type'])
ifgs = []
for g in groups:
uca = set()
for atomidx in g:
for n in mol.GetAtomWithIdx(atomidx).GetNeighbors():
if n.GetAtomicNum() == 6:
uca.add(n.GetIdx())
#ifgs.append(ifg(atomIds=tuple(list(g)), atoms=Chem.MolFragmentToSmiles(mol, g, canonical=True), type=Chem.MolFragmentToSmiles(mol, g.union(uca),canonical=True)))
ifgs.append(Chem.MolFragmentToSmiles(mol, g, canonical=True))
return ifgs
except:
return None
def CollectFG(smis, n_jobs):
ifgs = []
with Pool(n_jobs) as pool:
identify_functional_groups_p = partial(identify_functional_groups)
ifgs = [x for x in tqdm(
pool.imap_unordered(identify_functional_groups_p, smis),
total=len(smis),
miniters=1000
)
if x is not None
]
ifg_1col = []
for i in ifgs:
for j in i:
ifg_1col.append(j)
return ifg_1col
def main(config):
#model = config.model
# epoch = config.epoch
# sample_id = config.sample_id
n_jobs = config.n_jobs
#sample_id = config.sample_id
# output = config.output
# file = '%s_%d_combine.csv' % (model, sample_id)
# file = 'gdb13_purged.csv'
file = config.input_file+'.csv'
mols = pd.read_csv(file, names=['SMILES','model'])
print('%s was loaded!\n'%(file))
smis = list(mols['SMILES'])
print('smis was retrieved')
ifgs = CollectFG(smis, n_jobs)
ifgs = pd.DataFrame(ifgs, columns=['ifgs'])
ifgs.drop_duplicates('ifgs', inplace=True)
# pd_ifg.to_csv('%s_%d_ifg.csv' % (model, sample_id), index=None)
ifgs.to_csv(config.input_file+'_ifgs_atom.csv', index=None)
if __name__ == "__main__":
parser = get_parser()
config, unknown = parser.parse_known_args()
main(config)