-
Notifications
You must be signed in to change notification settings - Fork 2
/
unused_untested.py
847 lines (693 loc) · 29.8 KB
/
unused_untested.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
from __future__ import print_function
import numpy as _np
import mdtraj as _md
from matplotlib import pyplot as _plt
from matplotlib.widgets import AxesWidget as _AxesWidget
from glob import glob
import os
import tempfile
try:
from sklearn.mixture import GaussianMixture as _GMM
except ImportError:
from sklearn.mixture import GMM as _GMM
try:
from pyemma.util.linalg import eig_corr
except ImportError:
from pyemma._ext.variational.solvers.direct import eig_corr
from pyemma.coordinates import source as _source, featurizer as _featurizer
from scipy.spatial import cKDTree as _cKDTree
def _dictionarize_list(list, input_dict, output_dict = None):
    r"""Copy the entries of :obj:`input_dict` named in :obj:`list` into a dict.

    If :obj:`output_dict` is given it is filled in place and returned,
    otherwise a fresh dict is created and returned.
    """
    # NOTE: the first parameter shadows the builtin `list`; the name is kept
    # for backward compatibility with existing callers
    target = {} if output_dict is None else output_dict
    for key in list:
        target[key] = input_dict[key]
    return target
# NOTE(review): this definition is immediately superseded — an identical
# _correlations2CA_pairs is re-defined verbatim a few lines below, so this
# first copy is dead code at import time.  Consider removing one of the two.
def _correlations2CA_pairs(icorr, geom_sample, corr_cutoff_after_max=.95, feat_type='md.contacts'):
    r"""Return the CA-atom index pairs of the features most correlated with a
    projection, together with the indices of those features.

    Features are ranked by decreasing ``|icorr|``; after the strongest one,
    only features with ``icorr >= corr_cutoff_after_max`` are kept.
    NOTE(review): the filter compares the raw (signed) correlation, not its
    absolute value — confirm anti-correlated features are meant to be dropped.
    """
    # Rank feature indices by decreasing absolute correlation
    max_corr = _np.abs(icorr).argsort()[::-1]
    max_corr = [max_corr[0]]+[mc for mc in max_corr[1:] if icorr[mc]>=corr_cutoff_after_max]
    top = geom_sample[0].top
    if feat_type == 'md.contacts':
        # residue pairs as reported by mdtraj's contact map of the first frame
        res_pairs = _md.compute_contacts(geom_sample[0][0])[1]
    else:
        raise NotImplementedError('%s feat type is not implemented (here) yet'%feat_type)
    CA_pairs = []
    for ii,jj in res_pairs[max_corr]:
        CA_pairs.append([top.residue(ii).atom('CA').index,
                         top.residue(jj).atom('CA').index])
    CA_pairs = _np.vstack(CA_pairs)
    return CA_pairs, max_corr
def _mdcontacts2CAidxs(geom):
    r"""For every residue pair reported by mdtraj's contact map of
    :obj:`geom`, return the corresponding pair of alpha-carbon atom indices,
    stacked as an (n_pairs, 2) integer array.
    """
    residue_pairs = _md.compute_contacts(geom)[1]
    topology = geom.top
    ca_of = lambda res_idx: topology.residue(res_idx).atom('CA').index
    return _np.vstack([[ca_of(ii), ca_of(jj)] for ii, jj in residue_pairs])
def _correlations2CA_pairs(icorr, geom_sample, corr_cutoff_after_max=.95, feat_type='md.contacts'):
    r"""Translate the features most correlated with a projection into
    CA-atom index pairs.

    Features are ranked by decreasing absolute correlation; after the top
    one, only features whose raw correlation reaches
    :obj:`corr_cutoff_after_max` are kept.  Returns the stacked (n, 2)
    CA index pairs and the list of kept feature indices.
    """
    ranked = _np.abs(icorr).argsort()[::-1]
    max_corr = [ranked[0]] + [mc for mc in ranked[1:] if icorr[mc] >= corr_cutoff_after_max]
    if feat_type != 'md.contacts':
        raise NotImplementedError('%s feat type is not implemented (here) yet' % feat_type)
    # residue pairs of the contact map of the first frame of the first sample
    res_pairs = _md.compute_contacts(geom_sample[0][0])[1]
    top = geom_sample[0].top
    CA_pairs = _np.vstack([[top.residue(ii).atom('CA').index,
                            top.residue(jj).atom('CA').index]
                           for ii, jj in res_pairs[max_corr]])
    return CA_pairs, max_corr
def _input2output_corr(icov, U):
    r"""Correlation between input features and output projections.

    Equivalent to the :obj:`feature_TIC_correlation` attribute of a
    pyemma-TICA object: ``icov @ U`` with each row divided by that feature's
    standard deviation (sqrt of the diagonal of :obj:`icov`).
    """
    feature_sigma = _np.sqrt(_np.diag(icov))[:, _np.newaxis]
    return icov.dot(U) / feature_sigma
def _sequential_rmsd_fit(geomin, start_frame=0):
    r"""Provided an md.Trajectory object and a starting frame, sequentially (forward and
    backwards) orient the frames to maximize the overlap between neighboring
    structures in geomin, and return the re-oriented trajectory.
    """
    # Forward pass: superpose each frame onto the last already-fitted frame
    # (Trajectory.superpose works in place and returns the trajectory)
    fwd_fit = geomin[start_frame]
    for geom in geomin[start_frame+1:]:
        fwd_fit = fwd_fit.join(geom.superpose(fwd_fit[-1]))
    # Backward pass: same idea, walking from start_frame towards frame 0
    bwd_fit = geomin[start_frame]
    for geom in geomin[:start_frame][::-1]:
        bwd_fit = bwd_fit.join(geom.superpose(bwd_fit[-1]))
    # Stitch both halves, dropping the start_frame duplicated in the backward half
    visual_fit = bwd_fit[::-1][:-1].join(fwd_fit)
    assert visual_fit.n_frames == geomin.n_frames
    return visual_fit
def _opentica_npz(ticanpzfile):
    r"""Open a simon-type of ticafile.npz and return some variables.

    Returns the lag string parsed from the filename, the projected data, the
    feature/projection correlations, the TICA mean and the eigenvalues and
    eigenvectors of the (cov, cov_tau) generalized eigenproblem.
    """
    # The lag is encoded in the filename as tica_<lag>.npz
    lag_str = os.path.basename(ticanpzfile).replace('tica_', '').replace('.npz', '')
    archive = _np.load(ticanpzfile, encoding='latin1')
    icov = archive['tica_cov']
    icovtau = archive['tica_cov_tau']
    l, U = eig_corr(icov, icovtau)
    tica_mean = archive['tica_mean']
    data = archive['projdat']
    corr = _input2output_corr(icov, U)
    return lag_str, data, corr, tica_mean, l, U
def _find_centers_bimodal(distro, space, barrier=0):
    r"""Locate the most populated point on each side of :obj:`barrier`.

    Parameters
    ----------
    distro : 1-D iterable
        population values, assumed bimodal over :obj:`space`
    space : 1-D iterable, same length as :obj:`distro`
        coordinate at which each :obj:`distro` value lives
    barrier : float, default 0
        dividing value; points exactly at the barrier belong to neither side

    Returns
    -------
    left, right : int
        indices of the maximally populated point strictly below resp.
        strictly above the barrier
    """
    assert _np.ndim(distro) == _np.ndim(space) == 1
    assert len(space) == len(distro)
    indices = range(len(space))
    below = [ii for ii in indices if space[ii] < barrier]
    above = [ii for ii in indices if space[ii] > barrier]
    left = below[_np.argmax([distro[ii] for ii in below])]
    right = above[_np.argmax([distro[ii] for ii in above])]
    return left, right
"""
rg_pos = function(geoms[start_idx_pos]).mean()
rg_neg = function(geoms[start_idx_neg]).mean()
start_idx = [start_idx_neg, start_idx_pos][np.argmin([rg_neg, rg_pos])]
path_smpl, start_idx_2 = visual_path(cat_smpl, geom_smpl, path_type='min_rmsd', start_pos=start_idx, history_aware=True)
Y_path_smpl = np.vstack([idata[ii][jj] for ii,jj in path_smpl])
trajs_path_smpl = pyemma.coordinates.save_traj(src.filenames, path_smpl, None, stride=traj_stride, top=struct)
assert start_idx_2 == start_idx
"""
def _elements_of_list1_in_list1(list1, list2):
    r"""For every item of :obj:`list1`, keep the first item of :obj:`list2`
    that contains it (``el1 in el2``).

    Items of :obj:`list1` without a match contribute nothing; the same
    :obj:`list2` item may appear more than once if several items match it.
    """
    matches = []
    for needle in list1:
        hit = next((candidate for candidate in list2 if needle in candidate), None)
        if hit is not None:
            matches.append(hit)
    return matches
def _targets_in_candidates(candidates, targets, verbose=True ):
    r"""Select from :obj:`candidates` the entries matching :obj:`targets`.

    Parameters
    ----------
    candidates : list of str
        available featurization names
    targets : str or list of str
        'all' keeps every candidate; a single string or a list of strings
        keeps any candidate containing one of them as a substring
    verbose : bool, default is True
        print the available and the kept candidates

    Returns
    -------
    out_list : list of str

    Raises
    ------
    ValueError
        if no candidate matches any target
    """
    if verbose:
        print('Available:\n', '\n'.join(candidates))
    if isinstance(targets, str):
        if targets == 'all':
            if verbose:
                print('All will be kept')
            out_list = candidates
        else:
            # FIX: wrap the single target string in a list; the previous
            # list(targets) split the string into individual characters,
            # making almost every candidate match
            targets = [targets]
    if isinstance(targets, list):
        out_list = _elements_of_list1_in_list1(targets, candidates)
        if out_list == []:
            raise ValueError('Your target features do not match any of the available tica_featurizations:\n'
                             '%s'%('\n'.join(targets)))
        else:
            if verbose:
                print('Kept:\n', '\n'.join(out_list))
    return out_list
def _plot_histo_reaction_coord(data, reaction_path, reaction_coord_idx, start_idx=None,
                               log=False, extra_paths=[], smooth_path = True, type_of_coord='TIC',
                               contour_alpha = 1.0, n_sigma=1,
                               **plot_path_kwargs):
    r"""2D histogram of :obj:`data` with the chosen reaction coordinate on the
    x-axis and, on the y-axis, the coordinate (other than the reaction one)
    along which :obj:`reaction_path` varies the most, with the path(s)
    plotted on top.

    Returns
    -------
    compact_paths : list of index-arrays (one per path) selecting the frames
        within :obj:`n_sigma` std-devs of each path's mean y-value, or None
        when :obj:`smooth_path` is False
    plot_dict : dict with the plot-specific variables (h, x, y, v_crd_1,
        v_crd_2, log)
    """
    # "visual" coordinates (coords to visualize the 2D plots in)
    v_crd_1 = reaction_coord_idx # easy
    # Apart from the reaction coordinate, which other coordinate varies the most?
    ivar = reaction_path.var(0)
    ivar[reaction_coord_idx] = 0
    v_crd_2 = _np.argmax(ivar)
    h, (x, y) = _np.histogramdd(_np.vstack(data)[:,[v_crd_1, v_crd_2]], bins=100)
    if log:
        h = -_np.log(h)
    _plt.figure()
    _plt.contourf(h.T, extent=[x[0], x[-1], y[0], y[-1]], alpha=contour_alpha)
    if start_idx is not None:
        refs = [reaction_path[start_idx, [v_crd_1, v_crd_2]]]
    else:
        refs = []
    path_list = [reaction_path[:,[v_crd_1, v_crd_2]]]
    if extra_paths != []:
        path_list += [ipath[:,[v_crd_1, v_crd_2]] for ipath in extra_paths]
    plot_path_kwargs['xlabel'] = '%s_%u'%(type_of_coord,v_crd_1)
    plot_path_kwargs['ylabel'] = '%s %u'%(type_of_coord,v_crd_2)
    # FIX: was plot_paths (NameError); the helper is named _plot_paths
    _plot_paths(path_list, refs=refs, ax=_plt.gca(), **plot_path_kwargs)
    # FIX: initialize so the return below does not raise NameError when
    # smooth_path is False
    compact_paths = None
    if smooth_path:
        # "Some" heuristic to arrive at a nice looking trajectory: keep only
        # frames whose y-coordinate lies within n_sigma std-devs of the
        # path's mean y-value
        compact_paths = [_np.argwhere(_np.abs(path[:,1]-path[:, 1].mean())<n_sigma*path[:,1].std()).squeeze() for path in path_list]
        path_list = [path[cp] for cp, path in zip(compact_paths, path_list)]
        # NOTE(review): assumes 'path_labels' was passed in plot_path_kwargs — confirm callers
        path_labels = [lbl+' (%u sigma)'%n_sigma for lbl in plot_path_kwargs['path_labels']]
        iax = _plt.gca()
        iax.set_prop_cycle(None)
        for pp, ll in zip(path_list, path_labels):
            iax.plot(pp[:,0], pp[:,1], ' o', markeredgecolor='none', label=ll)
        iax.legend(loc='best')
    # FIX: was dictionarize_list (NameError); the helper is named _dictionarize_list
    return compact_paths, _dictionarize_list(['h', 'x', 'y', 'v_crd_1', 'v_crd_2', 'log'], locals())
def _plot_paths(path_list, path_labels=[],
                ax=None, refs=[], backdrop=True, legend=True, markers=None, linestyles=None, alpha=1,
                xlabel='x', ylabel='y'):
    r"""Overlay a collection of 2-D paths (each an (n_i, 2) array) in one axis.

    Reference points in :obj:`refs` are drawn as black dots; with
    :obj:`backdrop`, dashed lines mark the x- and y-axis.  A single-character
    marker or a single linestyle string is broadcast to all paths.
    """
    if ax is None:
        _plt.figure()
    else:
        _plt.sca(ax)
    n_paths = len(path_list)
    if path_labels == []:
        path_labels = ['path_%u' % u for u in range(n_paths)]
    if markers is None:
        markers = ' '
    if isinstance(markers, str) and len(markers) == 1:
        markers = [markers] * n_paths
    if linestyles is None:
        linestyles = ' '
    if isinstance(linestyles, str):
        linestyles = [linestyles] * n_paths
    for xy, label, marker, style in zip(path_list, path_labels, markers, linestyles):
        _plt.plot(xy[:, 0], xy[:, 1], marker=marker, linestyle=style, label=label, alpha=alpha)
    for ref in refs:
        _plt.plot(ref[0], ref[1], 'ok', zorder=10)
    if backdrop:
        _plt.hlines(0, *_plt.gca().get_xlim(), color='k', linestyle='--')
        _plt.vlines(0, *_plt.gca().get_ylim(), color='k', linestyle='--')
    if legend:
        _plt.legend(loc='best')
    _plt.xlabel(xlabel)
    _plt.ylabel(ylabel)
def _extract_visual_fnamez(fnamez, path_type, keys=['x','y','h',
                                                    'v_crd_1', 'v_crd_2'
                                                    ]
                           ):
    r"""
    Once the project has been visualized, extract key visual parameters

    Parameters
    ----------
    fnamez : str
        path to an .npz archive produced by the visualization step
    path_type : str
        either 'min_rmsd' (sampled path) or 'min_disp'
    keys : list of str
        extra archive entries to return after the path data and selection

    Returns
    -------
    [data, selection] + one array per entry in :obj:`keys`

    Raises
    ------
    ValueError
        for an unknown :obj:`path_type`
    """
    # Load stuff to the namespace
    a = _np.load(fnamez)
    if path_type == 'min_rmsd':
        data = a['Y_path_smpl']
        selection = a['compact_path_sample']
    elif path_type == 'min_disp':
        data = a['Y_path']
        selection = a['compact_path']
    else:
        # FIX: the format string was "%" (literal percent, path_type never
        # interpolated); use %s so the offending value appears in the message
        raise ValueError("What type of path is %s"%path_type)
    return [data, selection]+[a[key] for key in keys]
def _customvmd(structure,
               vmdout='custom.vmd',
               rep = 'lines',
               frames = None,
               smooth = None,
               distcolor = 'red',
               molcolor = 'name',
               atoms = None,
               atomrep = 'VDW 1. 12.',
               atompairs=None,
               residues = None,
               rescolor = 'name',
               trajfile=None,
               strfilextra = None,
               trajfilextra = None,
               atompairsextra = None,
               white_stage = True,
               freetext = None,
               **mdtraj_save_kwargs):
    r"""Build a VMD script loading :obj:`structure` with the requested
    representations, labels and (optional) extra structure/trajectory.

    Returns the list of script lines when :obj:`vmdout` is None, otherwise
    writes them to the file :obj:`vmdout` and returns None.
    """
    lines = []
    lappend = lines.append
    if white_stage:
        # FIX: was the un-prefixed name vmd_stage (NameError) driven through a
        # list comprehension used only for its side effect
        for l in _vmd_stage().split('\n'):
            lappend(l+'\n')
    if isinstance(structure, _md.Trajectory):
        raise NotImplementedError
        """
        strfile = 'temp'+_path.splitext(vmdout)[0]+'.%u.pdb'%_np.random.randint(1e8)
        lappend('set outfile [open "%s" w]\n'%strfile)
        for line in str.splitlines(structure.save_pdb(_StringIO(), **mdtraj_save_kwargs)):
            lappend('puts $outfile "%s"\n'%line)
        lappend("close $outfile\n")
        lappend('mol new %s waitfor all\n'%strfile)
        lappend('rm %s\n'%strfile)
        """
    elif isinstance(structure, str):
        strfile = structure
        lappend('mol new %s waitfor all\n'%strfile)
    else:
        raise Exception
    lappend('mol delrep 0 top\n')
    lappend('mol representation %s \n'%rep)
    lappend('mol color %s\n'%molcolor)
    lappend('mol addrep top\n')
    if smooth is not None:
        lappend('mol smoothrep top 0 %s\n'%smooth)
    if frames is not None:
        frames = '{%s}'%', '.join(['%u'%ii for ii in frames])
        lappend('mol drawframes top 0 %s\n'%frames)
    lappend('label textsize 0.000001\n')
    lappend('color Labels {Bonds} %s\n'%distcolor)
    # Cycle through the atoms
    if atoms is not None:
        atoms = _np.unique(atoms)
        if _np.size(atoms)==1:
            atoms = [atoms]
        atomsel='%u '*len(atoms)%(tuple(atoms))
        myline='mol representation %s\n'%atomrep
        lappend(myline)
        myline='mol selection {index %s}\n'%atomsel
        lappend(myline)
        myline='mol addrep top\n'
        lappend(myline)
    # Cycle through residues
    if residues is not None:
        # NOTE(review): _is_iterable_of_int is not defined in the visible part
        # of this file — presumably a module-level helper; confirm
        assert _is_iterable_of_int(residues)
        residsel=' '.join([str(rr) for rr in residues])
        myline='mol representation Licorice .3 10. 10.\n'
        lappend(myline)
        myline='mol color %s\n'%rescolor
        lappend(myline)
        myline='mol selection {residue %s}\n'%residsel
        lappend(myline)
        myline='mol addrep top\n'
        lappend(myline)
    # Cycle through the pairs
    if atompairs is not None:
        atompairs = atompairs.flatten().reshape(-1,2)
        # NOTE(review): _unique_redundant_pairlist is also not defined here — confirm
        atompairs = _unique_redundant_pairlist(atompairs)
        for jj in atompairs:
            myline='label add Bonds 0/%u 0/%u \n'%(jj[0],jj[1])
            lappend(myline)
    # Add a trajectory
    if trajfile is not None:
        myline='mol addfile %s waitfor all\n'%trajfile
        lappend(myline)
    # Add second structure
    if strfilextra is not None:
        myline='mol new %s waitfor all\n'%strfilextra
        lappend(myline)
        lappend('mol delrep 0 top\n')
        lappend('mol representation %s \n'%rep)
        lappend('mol color %s\n'%molcolor)
        lappend('mol addrep top\n')
    # Add second trajectory
    if trajfilextra is not None:
        myline='mol addfile %s waitfor all\n'%trajfilextra
        lappend(myline)
    # Add second labels
    if atompairsextra is not None:
        for jj in atompairsextra:
            myline='label add Bonds 1/%u 1/%u \n'%(jj[0],jj[1])
            lappend(myline)
    # Add free text at the end if wanted
    if freetext is not None:
        lappend(freetext)
    if vmdout is None:
        return lines
    else:
        # FIX: use a context manager so the file is closed even on errors
        with open(vmdout,'w') as f:
            for line in lines:
                f.write(line)
from re import split
def _sort_nicely( l ):
    """ Sort the given list in the way that humans expect.
    https://blog.codinghorror.com/sorting-for-humans-natural-sort-order/

    Sorts :obj:`l` in place (numeric chunks compared as integers) and also
    returns it.
    """
    def _chunk(text):
        # numeric runs become ints so that 'f2' sorts before 'f10'
        return int(text) if text.isdigit() else text

    def _natural_key(key):
        return [_chunk(c) for c in split('([0-9]+)', key)]

    l.sort(key=_natural_key)
    return l
def _fnamez2dict(fnamez, add_geometries=True):
    r"""Load every array stored in the .npz archive :obj:`fnamez` into a dict,
    optionally adding the accompanying min_rmsd/min_disp path geometries
    (expected as sibling .pdb files with the same basename).
    """
    # FIX: dropped the redundant function-local "import mdtraj as _md";
    # the module already imports mdtraj as _md at the top of the file
    project_dict = {}
    for key, value in _np.load(fnamez).items():
        project_dict[key] = value
    if add_geometries:
        # Load geometry
        for path_type in ['min_rmsd', 'min_disp']:
            project_dict['geom_'+path_type]= _md.load(fnamez.replace('.npz','.%s.pdb'%path_type))
    return project_dict
def _vmd_stage(background='white',
               axes=False,
               ):
    r"""Return VMD commands (one string, newline separated, ending in a blank
    line) that set the stage: background color and, unless :obj:`axes` is
    True, hide the axes widget."""
    commands = ['color Display Background %s' % background]
    if not axes:
        commands.append('axes location off')
    return '\n'.join(commands) + '\n\n'
def _src_in_this_proj(proj, mdtraj_dir,
                      dirstartswith='DESRES-Trajectory_',
                      strfile_fmt = '%s-%u-protein',
                      ext='dcd',
                      starting_idx = 0, ## to deal with some dir-structure, unclean solution by now,
                      struct = None,
                      ):
    r"""Collect the trajectory files of project :obj:`proj` found under
    :obj:`mdtraj_dir` and wrap them in a pyemma source.

    Directories are expected as <mdtraj_dir>/<dirstartswith><proj>*, each
    holding a subdir <proj>-<idx>-protein with *.<ext> trajectory files.
    Returns the pyemma source object and the nested list of trajectory files.
    NOTE(review): struct may be None (auto-pick '<proj>-<idx>-protein.pdb'
    from the first subdir), a filename to be globbed inside the first subdir,
    or presumably an already-valid topology — confirm the intended types.
    """
    xtcs = []
    ii = starting_idx
    tocheck = os.path.join(mdtraj_dir, dirstartswith+proj+'*')
    assert len(glob(tocheck)) != 0,("globbing for %s yields an empty list"%tocheck)
    tocheck = sorted(glob(tocheck))
    if isinstance(tocheck, str):
        tocheck = [tocheck]
    # struct_already guards against re-resolving the topology on later iterations
    struct_already = False
    for __, idir in enumerate(tocheck):
        if not idir.endswith('tar.gz'):
            subdir = os.path.join(idir,strfile_fmt%(proj, ii))
            these_trajs = os.path.join(subdir,'*'+ext)
            assert len(glob(these_trajs)) != 0,("globbing for %s yields an empty list"%these_trajs)
            these_trajs = sorted(glob(these_trajs))
            xtcs.append(these_trajs)
            if struct is None:
                # Auto-derive the topology filename from the first usable subdir
                struct = '%s-%u-protein.pdb'%(proj,ii)
                struct_already = True
                struct = os.path.join(subdir, struct)
                struct = sorted(glob(struct))
            elif isinstance(struct, str) and not struct_already:
                struct = os.path.join(subdir,struct)
                struct = sorted(glob(struct))
            if isinstance(struct,list):
                # glob returned a list; keep the first match only
                struct=struct[0]
            ii += 1
    src = _source(xtcs, top=struct)
    return src, xtcs
def _myflush(pipe, istr='#', size=1e4):
    r"""Push :obj:`size` padding lines of :obj:`istr` (each followed by a
    blank line) through :obj:`pipe` to force it to flush."""
    pipe.write((istr + '\n\n') * int(size))
def _link_ax_w_pos_2_vmd(ax, pos, geoms, **customVMD_kwargs):
    r"""
    Link click events on :obj:`ax` (showing the projected positions
    :obj:`pos`) to a live VMD session displaying :obj:`geoms`: a click snaps
    a marker to the nearest sample and makes VMD jump to that frame.

    Initial idea and key VMD-interface for this function comes from @fabian-paul
    #TODO: CLEAN THE TEMPFILE
    """
    # Prepare tempdir
    tmpdir = tempfile.mkdtemp('vmd_interface')
    print("please remember to: rm -r %s"%tmpdir)
    # Prepare files
    topfile = os.path.join(tmpdir,'top.pdb')
    trjfile = os.path.join(tmpdir,'trj.xtc')
    geoms[0].save(topfile)
    geoms[1:].superpose(geoms[0]).save(trjfile)
    # Create pipe
    pipefile = os.path.join(tmpdir,'vmd_cmds.tmp.vmd')
    os.mkfifo(pipefile)
    os.system("vmd < %s & "%pipefile)
    vmdpipe = open(pipefile,'w')
    # FIX: was customvmd (NameError); the helper is named _customvmd
    for iline in _customvmd(topfile, trajfile=trjfile, vmdout=None,
                            **customVMD_kwargs):
        vmdpipe.write(iline)
    # FIX: was myflush (NameError); the helper is named _myflush
    _myflush(vmdpipe)
    kdtree = _cKDTree(pos)
    x, y = pos.T
    lineh = ax.axhline(ax.get_ybound()[0], c="black", ls='--')
    linev = ax.axvline(ax.get_xbound()[0], c="black", ls='--')
    dot, = ax.plot(pos[0,0],pos[0,1], 'o', c='red', ms=7)
    def _onclick(event):
        # Move the crosshair to the click, snap the dot to the nearest
        # sample and tell VMD to jump to that frame
        linev.set_xdata((event.xdata, event.xdata))
        lineh.set_ydata((event.ydata, event.ydata))
        data = [event.xdata, event.ydata]
        _, index = kdtree.query(x=data, k=1)
        dot.set_xdata((x[index]))
        dot.set_ydata((y[index]))
        vmdpipe.write(" animate goto %u;\nlist;\n\n"%index)
    # Connect axes to widget
    axes_widget = _AxesWidget(ax)
    # FIX: was onclick (NameError); the callback defined above is _onclick
    axes_widget.connect_event('button_release_event', _onclick)
    return vmdpipe
# Do I really need this? Don't think so
def _fnamez(fname,
            path_type='min_rmsd',
            proj_type='TIC',
            only_selection=True,
            selection_label='within 1 sigma',
            figsize=(10,10)
            ):
    r"""Re-create the projection plot stored in the .npz file :obj:`fname`
    (contour + path + selected frames) and link it to a sample widget.

    Returns the widget and a dict with the file's stored arrays.
    NOTE(review): relies on `_basename` and `sample`, which are not defined
    in the visible part of this file — presumably imported/defined elsewhere
    in the module; confirm.
    """
    proj_name = _basename(fname)[:_basename(fname).find(proj_type)].strip('.')
    data, selection, x, y, h, v_crd_1, v_crd_2 = _extract_visual_fnamez(fname, path_type)
    data = data[:,[v_crd_1,v_crd_2]]
    # Load geometry
    geom = _md.load(fname.replace('.npz','.%s.pdb'%path_type))
    # Create contourplot
    _plt.figure(figsize=figsize)
    _plt.contourf(x[:-1],y[:-1], h.T, alpha=.50)
    #_plt.contourf(project_dict["h"].T, alpha=.50)
    # This can be take care of in "visualize sample"
    # NOTE(review): data was already reduced to the two visual columns above,
    # so re-indexing with v_crd_1/v_crd_2 here only works if they are 0 and 1
    # — confirm (compare _project_dict, which uses columns 0 and 1)
    _plt.plot(data[:, v_crd_1],
              data[:, v_crd_2],
              alpha=.25, label=path_type)
    if only_selection:
        # Restrict both geometry and data to the "compact" selection
        geom = geom[selection]
        data = data[selection]
        _plt.plot(data[:,0],data[:,1],'o', label=path_type+' '+selection_label)
    _plt.legend(loc='best')
    _plt.xlabel('%s %u'%(proj_type, v_crd_1))
    _plt.ylabel('%s %u'%(proj_type, v_crd_2))
    _plt.xlim(x[[0,-1]])
    _plt.ylim(y[[0,-1]])
    _plt.title('%s\n-np.log(counts)'%proj_name)
    iwd = sample(data, geom, _plt.gca(), plot_path=False, clear_lines=False)
    project_dict = {}
    for key, value in _np.load(fname).items():
        project_dict[key] = value
    return iwd, project_dict
# Do I really need this? Don't think so
def _project_dict(project_dict,
                  path_type='min_rmsd',
                  proj_type='TIC',
                  only_compact_path=True,
                  selection_label='within 1 sigma',
                  figsize=(10,10),
                  project_name = None
                  ):
    r"""Like :obj:`_fnamez` but starting from an already-loaded project dict
    (see :obj:`_fnamez2dict`): re-create the contour + path plot and return
    the linked sample widget.

    NOTE(review): `sample` is not defined in the visible part of this file —
    presumably imported elsewhere in the module; confirm.

    Raises
    ------
    ValueError
        for an unknown :obj:`path_type`
    """
    # From dict to variables
    if path_type == 'min_rmsd':
        path = project_dict["Y_path_smpl"]
        compact_path = project_dict["compact_path_sample"]
    elif path_type == 'min_disp':
        path = project_dict["Y_path"]
        compact_path = project_dict["compact_path"]
    else:
        # FIX: fail early with a clear message instead of a NameError on
        # `path` a few lines below
        raise ValueError("What type of path is %s"%path_type)
    v_crd_1 = project_dict["v_crd_1"]
    v_crd_2 = project_dict["v_crd_2"]
    path = path[:, [v_crd_1, v_crd_2]]
    geom = project_dict["geom_"+path_type]
    x = project_dict["x"]
    y = project_dict["y"]
    h = project_dict["h"]
    # Create contourplot
    _plt.figure(figsize=figsize)
    _plt.contourf(x[:-1],y[:-1], h.T, alpha=.50) #ATTN h is already log(PDF)
    # This can be taken care of in "visualize sample", but i'm doing it here
    _plt.plot(path[:, 0],
              path[:, 1],
              alpha=.25, label=path_type)
    if only_compact_path:
        geom = geom[compact_path]
        path = path[compact_path]
        _plt.plot(path[:,0],path[:,1],'o', label=path_type+' '+selection_label)
    _plt.legend(loc='best')
    _plt.xlabel('%s %u'%(proj_type, v_crd_1))
    _plt.ylabel('%s %u'%(proj_type, v_crd_2))
    _plt.xlim(x[[0,-1]])
    _plt.ylim(y[[0,-1]])
    if project_name is not None:
        _plt.title('-np.log(counts)\n%s'%project_name)
    iwd = sample(path, geom, _plt.gca(), plot_path=False, clear_lines=False)
    return iwd
def MEP_naive(euc_points, V, start_idx, end_idx, step_size=10, allow_jumps=True):
    r"""
    Return the indices of a path connecting the start and end points while
    minimizing the energy along the way.

    Parameters
    ----------
    euc_points : np.ndarray of shape (n, m)
        n points of dimension m in euclidean space that the path can consist of
    V : iterable of floats of len(n)
        energy value for each of the points in :py:obj:`euc_points`
    start_idx : int
        where the path is supposed to start
    end_idx : int
        where the path is supposed to end
    step_size : int, default is 10
        parameter of the method, something like the radius of the hypersphere
        around the current path point for next-step search
    allow_jumps : bool, default is True
        when no neighbor within :obj:`step_size` advances towards the end
        point, jump to the closest unvisited point that does
    """
    from scipy.spatial.distance import pdist, squareform
    assert _np.ndim(euc_points) == 2
    assert len(V) == euc_points.shape[0], (len(V), euc_points.shape)
    # Pairwise distances, with +inf on the diagonal so that a point is never
    # its own nearest neighbor
    n_points = euc_points.shape[0]
    D = squareform(pdist(euc_points)) + _np.diag(_np.ones(n_points) + _np.inf)
    path = [start_idx]
    max_iter = 1000
    iteration = 0
    while iteration < max_iter:
        iteration += 1
        here = path[-1]
        # Current distance to the final point
        d2end = D[here, end_idx]
        # Closest candidates around the current point
        neighborhood = D[here].argsort()[:step_size]
        if end_idx in neighborhood:
            path.append(end_idx)
            break
        # Keep unvisited neighbors that get us closer to the end point
        advancing = [cc for cc in neighborhood
                     if D[end_idx][cc] <= d2end and cc not in path]
        if advancing:
            # Among those, step onto the one with the minimum energy
            path.append(advancing[_np.argmin([V[cc] for cc in advancing])])
        elif allow_jumps:
            # Ignore step_size: closest unvisited point that moves forward
            # (closer to the end, farther from the start)
            jump_cands = [cc for cc in D[here].argsort()
                          if cc not in path
                          and D[cc, end_idx] < d2end
                          and D[cc, start_idx] > D[here, start_idx]]
            path.append(jump_cands[0])
        else:
            print("Path interrupted because of need to jump.\n", \
                  "Please inspect this result and decide for larger step_size or simply allowing for jumps")
            break
        if end_idx == path[-1]:
            break
    return path
def MEP_auto(Y, MD_trajectories, V=None,
             resolution=1000, step_size=100, endpoints=None, top=None
             ):
    # TODO document
    # NOTE(review): relies on `_bmutils` and `_defdict`, which are not defined
    # in the visible part of this file — presumably imported elsewhere in the
    # module; confirm before use.
    # Cluster the projected data down to ~resolution centers
    cl = _bmutils.regspace_cluster_to_target(Y, resolution, n_try_max=10,
                                             #verbose=True
                                             )
    # Energy: default to -log of the cluster populations
    if V is None:
        V = -_np.log([len(ii) for ii in cl.index_clusters])
    # Generate endpoints if needed (means of a GMM over the pooled data)
    if endpoints is None:
        endpoints = _bmutils.auto_GMM_model(_np.vstack(Y)).means_
    elif _bmutils._is_int(endpoints):
        endpoints = _bmutils.auto_GMM_model(_np.vstack(Y), ncs=[endpoints]).means_
    elif isinstance(endpoints, list):
        endpoints = _np.vstack(endpoints)
    # Iterate over every unordered pair of endpoints
    paths_dict = _defdict(dict)
    for ii, jj in _np.vstack(_np.triu_indices(len(endpoints), k=1)).T:
        # Cluster centers closest to start and end
        start_idx = _np.sum((cl.clustercenters - endpoints[ii]) ** 2, 1).argmin()
        end_idx = _np.sum((cl.clustercenters - endpoints[jj]) ** 2, 1).argmin()
        # Naive MEP
        path_idxs = _bmutils.MEP_naive(cl.clustercenters, V, start_idx, end_idx, step_size=step_size)
        # (file, frame) samples corresponding to this path
        cat_smpl = cl.sample_indexes_by_cluster(path_idxs, 100)
        # Translated to geometries
        geom_smpl = _bmutils.save_traj_wrapper(MD_trajectories, _np.vstack(cat_smpl), None, top=top)
        geom_smpl = _bmutils.re_warp(geom_smpl, [100] * len(path_idxs))
        path_smpl, __ = _bmutils.visual_path(cat_smpl, geom_smpl,
                                             start_pos=0,
                                             # start_frame=istart_frame,
                                             path_type='min_rmsd',
                                             history_aware=True,
                                             # selection=geom_smpl[0].top.select(minRMSD_selection)
                                             )
        igeom = _bmutils.save_traj_wrapper(MD_trajectories, path_smpl, None, top=top)
        # Store the forward path (ii -> jj); the dict-of-dicts is filled lazily
        try:
            paths_dict[ii][jj]["geom"] = igeom
            paths_dict[ii][jj]["proj"] = cl.clustercenters[path_idxs]
        except KeyError:
            paths_dict[ii][jj] = {}
            paths_dict[ii][jj]["geom"] = igeom
            paths_dict[ii][jj]["proj"] = cl.clustercenters[path_idxs]
        # Now the inverse path (jj -> ii) is the same path reversed
        try:
            paths_dict[jj][ii]["geom"] = igeom[::-1]
            paths_dict[jj][ii]["proj"] = cl.clustercenters[path_idxs][::-1]
        except KeyError:
            paths_dict[jj][ii] = {}
            paths_dict[jj][ii]["geom"] = igeom[::-1]
            paths_dict[jj][ii]["proj"] = cl.clustercenters[path_idxs][::-1]
    return paths_dict
def auto_GMM_model(Y, ncs=_np.arange(2, 7)):
    r"""Fit one Gaussian-mixture model per component count in :obj:`ncs` and
    return the model whose component count minimizes the BIC (re-fitting
    with the winning count when more than one was tried)."""
    def _fit(n_components):
        model = _GMM(n_components=n_components)
        model.fit(Y)
        return model

    bics = []
    for nc in ncs:
        igmm = _fit(nc)
        bics.append(igmm.bic(Y))
    nc = ncs[_np.argmin(bics)]
    if len(ncs) > 1:
        igmm = _fit(nc)
    return igmm
def example_notebook(extra_flags_as_one_string=None, nb_file='Projection_Explorer.ipynb', just_show_available_nbs=False):
    r"""
    Open the example notebook in the default browser. The ipython terminal stays active while the notebook is still active.
    Ctr+C in the ipython terminal will close the notebook.
    Note: The displayed notebook is a working copy of the original notebook. Feel free to mess around with it

    Parameters
    ----------
    extra_flags_as_one_string : str
        Any flags you would parse along to the "jupyter notebook" command, like --no-browser etc
    nb_file : str, default is "Projection_Explorer.ipynb"
        The notebook file that will be opened. If you want to choose from other notebooks, set
        :py:obj:`just_show_available_nbs` to True
    just_show_available_nbs : bool, default is False
        Show a list of available notebooks and exit.
    """
    # FIX: these names were used below but never imported anywhere visible in
    # this module (the module imports `os`, not `_os`)
    import warnings as _warnings
    import shutil
    import os as _os
    from time import sleep
    from tempfile import TemporaryDirectory
    # NOTE(review): _molpxdir is not defined in the visible part of this file;
    # presumably importable from the molpx package — confirm.
    # TODO create deprecation annotator?
    _warnings.warn('molpx.example_notebooks will be deprecated in future releases. Use molpx.example_notebooks() instead.')
    sleep(5)
    if just_show_available_nbs:
        print("List of available notebooks found in molpx's notebook directory %s"%_molpxdir(join='notebooks/'))
        print("You can use any of them as 'nb_file' to open them in a safe environment")
        for ff in glob(_molpxdir(join='notebooks/*.ipynb')):
            print('* %s'%ff.replace(_molpxdir(join='notebooks/'),''))
        return
    from IPython.terminal.interactiveshell import TerminalInteractiveShell
    origfile = _molpxdir('notebooks/%s'%nb_file)
    with TemporaryDirectory(suffix='_test_molpx_notebook') as tmpdir:
        tmpfile = _os.path.abspath(_os.path.join(tmpdir, nb_file))
        shutil.copy(origfile, tmpfile)
        # FIX: context managers so the notebook file handles are always closed
        with open(tmpfile) as f:
            nbstring = f.read()
        # Prepend a red disclaimer to the first markdown header of the copy
        with open(tmpfile,'w') as f:
            f.write(nbstring.replace("# ", "<font color='red', size=1>"
                                     "This is a temporary copy of the original notebook found in `%s`. "
                                     "This temporary copy is located in `%s`. "
                                     "Feel free to play around, modify or even break this notebook. "
                                     "It wil be deleted on exit it and a new one created next time you issue "
                                     "`molpx.example_notebooks()`</font>\\n\\n"
                                     "# "
                                     %(origfile, tmpfile),1))
        cmd = 'jupyter notebook %s'%tmpfile
        if isinstance(extra_flags_as_one_string,str):
            cmd ='%s %s'%(cmd,extra_flags_as_one_string)
        eshell = TerminalInteractiveShell()
        eshell.system_raw(cmd)