/
ImmunoRender.py
251 lines (220 loc) · 10.6 KB
/
ImmunoRender.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
from __future__ import print_function
import os
import sys
import re
import glob
import logging
from time import sleep, time
import collections
from collections import OrderedDict
import chimera
from chimera import openModels
from chimera import Molecule
from chimera import runCommand as rc
from Bio import pairwise2
from Bio.SubsMat.MatrixInfo import blosum62
logging.basicConfig(format="[%(asctime)s] %(levelname)-8s-> %(message)s",
level=logging.NOTSET, datefmt='%d/%m/%Y %H:%M:%S %p')
logger = logging.getLogger(__name__)
#### Definitions of helper functions
def worms(residue, attribute):
"""Alter chain thicknesses based on an attribute"""
residue.ribbonDrawMode = chimera.Residue.Ribbon_Round
residue.ribbonDisplay = True
if getattr(r, attribute, None) is None:
rad = 0.05
else:
rad = r.attribute/2.0 + 1.0
residue.ribbonStyle = chimera.RibbonStyleWorm([rad])
def alignment2cigar(ref, qry):
"""Reconstruct a CIGAR string from a pair of pairwise-aligned sequence strings
Function modified from @lh3 on StackOverflow
"""
if len(ref) != len(qry):
raise Exception('Unequal length alignment found.')
cigar = []
# Iterate both strings by character
for i in range(len(ref)):
r, q = ref[i], qry[i]
# Set the correct character
op = '-' if (r == "-" and q == "-") else '=' if r == q else \
'I' if r == '-' else 'D' if q == '-' else 'X'
# Enumerate the CIGAR integers
if len(cigar) > 0 and cigar[-1][1] is op:
cigar[-1][0] += 1
else: cigar.append([1, op])
return "".join(map(lambda x: str(x[0]) + x[1], cigar))
def expand_cigar(cigar):
"""Given a CIGAR string, expand it to it's letter only representation.
This code uses a modified definition of CIGAR strings that permits dual
gap columns as (-).
M/= -> MATCH
I -> Insertion to the reference
D -> Deletion to the reference
N -> Skipped region from the reference (currently unused)
S -> Soft clipping (currently unused)
H -> Hard clipping (currently unused)
P -> Padding (silent deletion from padded reference) (currently unused)
= -> Sequence match
X -> Sequence mismatch
- -> Dual gap column (missing data)
e.g. 4=1M2D3= -> ====MDD===
e.g. '1-2D13X3D1X1=11X1=7X1=5X1=2X1=1X5I6-' ->
'-DDXXXXXXXXXXXXXDDDX=XXXXXXXXXXX=XXXXXXX=XXXXX=XX=XIIIII------'
"""
regex = re.compile(r'(\d+)([-=IDMX])')
return "".join([int(num) * op for num, op in
[submatch for submatch in re.findall(regex, cigar)]])
def cigar_guided_score_alignment(cigar, scores_list):
"""Taking a CIGAR representation of a pairwise alignment, expand and
recapitulate the 'moveset', and apply corresponding list manipulations
Behaviour map:
D -> Sequence deletion: drop the score value
M/= -> Sequence match: propagate the score value
I -> Sequence insertion: insert dummy data (0) (???)
(It should be unlikely the structure has more sequence)
X -> Sequence mismatch: insert dummy data (0)
"""
new_scores = []
for i, (j, k) in enumerate(zip(expand_cigar(cigar), scores_list)):
if j == "=":
new_scores.append(scores_list[i])
elif j == "D" or j == "-":
new_scores.append("-")
elif j == "I" or j == "X":
new_scores.append(0)
# Find out how chimera treats missing data for attributes. May need to set "" instead of 0
return new_scores
class AssociationContainer(object):
"""A class to hold associations between alignments, structures, and sequences.
"""
def __init__(self, model):
self.alignments = []
self.model = model
# Read in the values for the attributes
# Read data from file of OrderedDicts as literals
start = time()
data = []
try:
with open(sys.argv[1]) as fh:
for line in fh:
# do some basic checking that the file is of the correct format
if not isinstance(eval(line), collections.OrderedDict):
continue
data.append(eval(line))
except IOError as err:
logger.info("No file detected, assuming directory instead.")
try:
for filename in os.listdir(sys.argv[1]):
with open(os.path.join(sys.argv[1], filename), 'r') as fh:
for line in fh:
# do some basic checking that the file is of the correct format
if not isinstance(eval(line), collections.OrderedDict):
continue
data.append(eval(line))
except OSError as err:
logger.error("{}\nNo file or directory of files provided/detected.".format(err))
# Split the model
if len(openModels.list(modelTypes=Molecule)) == 0:
logger.error("No open models.")
sys.exit(1) # maybe 'break' instead incase this kills the chimera session
logger.info("Splitting model chains...")
rc("split")
all_associations = []
attributes = set()
# For each model
# Add the ability to specify specific models to compare against
for model in openModels.list(modelTypes=Molecule):
logger.info("Operating on model: {}.{} {}".format(model.id, model.subid, model.name))
# For each sequence in each model (though this should just be one)
for modelseq in model.sequences():
# For each entry in the inputfile
logger.info("Got model sequence: {}".format(modelseq))
for entry in data:
attributes.add(entry["AttributeName"])
association = AssociationContainer(model)
logger.info("Processing entry: {} {}...{}".format(entry["AttributeName"],
entry["Sequence"][0:10],
entry["Sequence"][-10:]))
association.entry = entry
# First test for exact sequence matches for speed
if str(entry["Sequence"]) == str(modelseq):
# and add an entry to the associations object
logger.info("Input sequence is an exact match for model sequence...")
all_associations.append(association)
else:
# If model sequence is some contiguous subsequence of the query seq
# Figure out where the subsequence starts (reciprocally)
# and assign values from that point until the end of the vals
# try:
# offest = str(modelseq).index(entry["Sequence"])
# except ValueError:
# try:
# offset = str(entry["Sequence"].index(modelseq))
# except ValueError():
# continue
# if offset:
# for i, val in enumerate(entry["Score"]):
# r = seq.residues[i+offset]
# setattr(r, entry["AttributeName"], val)
# Might be able to do away with the block above if the alignment strategy is solid
logger.info("Exact full length or substring matches not detected. Switching to alignment based approach...")
# Else, align the sequence of the MODEL to the input seq
# Treat the entry seq as reference (since it will more often than
# not be the longer of the 2), model seq as query
alignment = pairwise2.align.globalds(str(entry["Sequence"]), str(modelseq), blosum62, -10, -0.5)
logger.info("Alignment:\n {}".format(pairwise2.format_alignment(*alignment[0])))
# Gather all top alignments to then filter out the best for application
association.alignments.append(alignment[0])
all_associations.append(association)
# Now associations are assigned, modify scores and apply accordingly
# Now, take the best of all of the alignments as the correct one for that protein
# Sort the alignments in descending order to take the top one based on the alignment score
logger.info("Associations completed...")
for container in set(all_associations): # remove any identical associations?
if len(container.alignments) > 0:
logger.info("Alignments detected, manipulating scores to match aligned positions...")
container.alignments.sort(key=lambda tup: float(tup[2]))
# Next, construct a CIGAR string for the alignment to apply the same set of 'moves'
# to the scores arrays.
logger.info("Creating CIGARs...")
container.cigar = alignment2cigar(container.alignments[0][0], container.alignments[0][1]) # Uses the best alignment
temp_scores = cigar_guided_score_alignment(container.cigar, container.entry["Score"])
# Finally, drop all the deleted regions
container.entry["StructureScore"] = list(filter(lambda a: a != "-", temp_scores))
# For debugging:
# print("\n".join([alignment[0][0],expand_cigar(cigar),
# alignment[0][1], "".join([str(x) for x in new_scores])]))
# Doesn't play nice with floats
else:
container.entry["StructureScore"] = container.entry["Score"]
for score, res in zip(container.entry["StructureScore"], container.model.residues):
setattr(res, container.entry["AttributeName"], score)
logger.info("Final assignments:")
for association in set(all_associations): # set() needed because associations are duplicated for some reason...
print("Associated entry: {} {}...{}".format(association.entry["AttributeName"],
association.entry["Sequence"][0:10],
association.entry["Sequence"][-10:]))
print("With model: {}.{} {}".format(association.model.id,
association.model.subid,
association.model.name))
end = time()
logger.info("Association time taken: {}".format(end - start))
logger.info("Attributes set. Cycling render views...")
for attribute in attributes:
rc("rangecol {} min white mid yellow max red novalue #097b097b097b".format(attribute))
#[worms(res, attribute) for res in model.residues for model in [model for model in openModels.list(modelTypes=Molecule)]
sleep(1)
# Write a chimera session to avoid needing reassociation
try:
sys.argv[2]
rc("save {}_session.py".format(sys.argv[2]))
except IndexError:
pass
# To save an image
#import Midas
#Midas.copy(file='/home/goddard/hoohoo.png', format='PNG')
# format can be 'PNG', 'JPEG', 'TIFF', 'PS', 'EPS'
#chimera.closeSession()
#rc("stop now")
#sys.exit(0)