-
Notifications
You must be signed in to change notification settings - Fork 0
/
io_stockholm.py
executable file
·95 lines (76 loc) · 2.68 KB
/
io_stockholm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
"""
I/O for fasta sequence format variant generated by RDP aligner.
http://pyro.cme.msu.edu/pyro/aligner.jsp
"""
__version__ = "$Id$"
import re, os, sys
import warnings
import logging
from collections import defaultdict
log = logging
from Seq import Seq
from sequtil import wrap, removeWhitespace, removeAllButAlpha
def read(input, degap=False, case=None, keep_struct=True, keep_ref=True, check_duplicates=True):
"""
* input - filename or string containing stockholm format sequence alignment
* degap (bool) - if True, Non-alphanumeric characters are removed
* case - specify "upper" or "lower" to force sequences into either
* keep_struct - keep structural model (#=GC SS_cons element)
* keep_ref - keep reference sequence (#=GC RF element)
Raise a ValueError if sequence names are not unique.
return a list of Seq objects
"""
if len(input) < 50 and os.access(input, os.F_OK):
lines = open(input)
else:
lines = input.splitlines()
seqdata = {}
names = []
linecounts = defaultdict(int)
for line in lines:
name, seqstr = None, None
if not line.strip():
continue
elif line.startswith('#=GC'):
_, name, seqstr = line.split()
elif line.startswith("#") or line.startswith('//'):
continue
else:
name, seqstr = line.split()
if name:
linecounts[name] += 1
if name not in seqdata:
names.append(name)
seqdata[name] = seqdata.get(name, '') + seqstr.strip()
lcset = set(linecounts.values())
# all names should have the same number of lines
if len(lcset) > 1:
log.error('The following sequence names are not unique:')
expected = min(lcset)
not_unique = []
for name, count in linecounts.items():
if count != expected:
log.error('%s appears %s times' % (name, count/expected))
not_unique.append(name)
msg = 'The following sequence names are not unique: %s' % \
','.join(not_unique)
raise ValueError(msg)
seqlist = []
for name in names:
seq = seqdata[name]
if name == 'SS_cons':
if not keep_struct:
continue
elif name == 'RF' and not keep_ref:
continue
else:
seq = re.sub(r'[^a-zA-Z-]','-',seq)
if degap:
seq = removeAllButAlpha(seq)
if case == 'upper':
seq = seq.upper()
elif case == 'lower':
seq = seq.lower()
seqlist.append(Seq(name, seq))
log.info('writing %s of %s sequences' % (len(seqlist), len(names)))
return seqlist