-
Notifications
You must be signed in to change notification settings - Fork 40
/
_format.py
493 lines (358 loc) · 17.5 KB
/
_format.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2023, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import re
import pandas as pd
import skbio
import qiime2.plugin.model as model
from qiime2.plugin import ValidationError
import qiime2
from ..plugin_setup import plugin
class TaxonomyFormat(model.TextFileFormat):
    """Legacy format for any TSV file with 2+ columns, header optional.

    `TSVTaxonomyFormat` (with header) and `HeaderlessTSVTaxonomyFormat`
    (without header) have superseded this format.

    It is kept for backwards-compatibility only. Transformers are
    intentionally not hooked up to turn this format into the canonical .qza
    format (`TSVTaxonomyFormat`), which prevents users from importing data in
    this format. Transformers into in-memory Python objects (e.g.
    `pd.Series`) remain so that existing .qza files can still be loaded and
    processed.

    The only header recognized by this format is:

        Feature ID<tab>Taxon

    Optionally followed by other arbitrary columns.

    If this header isn't present, the format is assumed to be headerless.

    This format supports comment lines starting with #, and blank lines.

    """

    def sniff(self):
        """Return True when up to ten non-blank lines all have 2+ cells.

        Lines made only of spaces plus a newline are skipped and do not count
        towards the ten inspected lines. A file with no non-blank lines does
        not match. Note that this check only looks at tab-separated cell
        counts; it does not special-case lines starting with '#'.
        """
        inspected = 0
        with self.open() as fh:
            while inspected < 10:
                line = fh.readline()
                if not line:
                    # EOF
                    break
                if line.lstrip(' ') == '\n':
                    # Blank line
                    continue
                if len(line.split('\t')) < 2:
                    return False
                inspected += 1
        return inspected != 0
# Single-file directory format: one `taxonomy.tsv` file in TaxonomyFormat.
TaxonomyDirectoryFormat = model.SingleFileDirectoryFormat(
'TaxonomyDirectoryFormat', 'taxonomy.tsv', TaxonomyFormat)
class HeaderlessTSVTaxonomyFormat(TaxonomyFormat):
    """Format for a TSV file with 2+ columns and no header row.

    This format supports comment lines starting with #, and blank lines.
    All behavior is inherited unchanged from `TaxonomyFormat`.

    """
# Single-file directory format: one `taxonomy.tsv` file in
# HeaderlessTSVTaxonomyFormat.
HeaderlessTSVTaxonomyDirectoryFormat = model.SingleFileDirectoryFormat(
'HeaderlessTSVTaxonomyDirectoryFormat', 'taxonomy.tsv',
HeaderlessTSVTaxonomyFormat)
class TSVTaxonomyFormat(model.TextFileFormat):
    """Format for a 2+ column TSV file with an expected minimal header.

    The only header recognized by this format is:

        Feature ID<tab>Taxon

    Optionally followed by other arbitrary columns.

    This format supports blank lines. The expected header must be the first
    non-blank line. In addition to the header, there must be at least one line
    of data.

    """
    HEADER = ['Feature ID', 'Taxon']

    def _check_n_records(self, n=None):
        """Check the header and the column count of each data line.

        Parameters
        ----------
        n : int or None
            Maximum number of physical lines (blank lines included) to read
            from the start of the file. ``None`` reads the whole file.

        Raises
        ------
        ValidationError
            If the first non-blank line does not start with ``HEADER``, if a
            data line has a different number of cells than the header line,
            or if no data line is found within the inspected lines.
        """
        with self.open() as fh:
            data_line_count = 0
            header = None

            # Pair every line with its 1-based line number for error
            # reporting; zip() against a range caps the read at n lines.
            if n is None:
                numbered_lines = enumerate(fh, 1)
            else:
                numbered_lines = zip(range(1, n + 1), fh)

            for line_number, line in numbered_lines:
                if line.lstrip(' ') == '\n':
                    # Blank line
                    continue

                cells = line.strip('\n').split('\t')

                if header is None:
                    if cells[:2] != self.HEADER:
                        raise ValidationError(
                            '%s must be the first two header values. The '
                            'first two header values provided are: %s (on '
                            'line %s).' % (self.HEADER, cells[:2],
                                           line_number))
                    header = cells
                else:
                    if len(cells) != len(header):
                        # The expected count is the width of the header
                        # actually read, which may exceed the two required
                        # columns in HEADER.
                        raise ValidationError(
                            'Number of values on line %s are not the same as '
                            'number of header values. Found %s values '
                            '(%s), expected %s.' % (line_number, len(cells),
                                                    cells, len(header)))
                    data_line_count += 1

            if data_line_count == 0:
                raise ValidationError('No taxonomy records found, only blank '
                                      'lines and/or a header row.')

    def _validate_(self, level):
        """Validate the first 10 lines at 'min' level, all lines at 'max'."""
        self._check_n_records(n={'min': 10, 'max': None}[level])
# Single-file directory format: one `taxonomy.tsv` file in TSVTaxonomyFormat.
TSVTaxonomyDirectoryFormat = model.SingleFileDirectoryFormat(
'TSVTaxonomyDirectoryFormat', 'taxonomy.tsv', TSVTaxonomyFormat)
class FASTAFormat(model.TextFileFormat):
    """Base text format for FASTA files.

    Subclasses set ``self.alphabet`` to restrict the characters allowed on
    sequence lines; while it is ``None`` only the record structure
    (descriptions, IDs, duplicate IDs) is checked. ``self.aligned`` is turned
    on by ``AlignedFASTAFormatMixin`` to additionally require that every
    sequence has the same length.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.aligned = False
        self.alphabet = None

    def _validate_(self, level):
        """Validate the file at the given level ('min' or 'max')."""
        FASTAValidator, ValidationSet = _construct_validator_from_alphabet(
            self.alphabet)
        self._validate_FASTA(level, FASTAValidator, ValidationSet)

    def _validate_FASTA(self, level, FASTAValidator=None, ValidationSet=None):
        """Check record structure and, optionally, sequence characters.

        Parameters
        ----------
        level : str
            'min' stops once line 100 is reached; 'max' reads the whole file.
        FASTAValidator : compiled regex or None
            Pattern a whole sequence line must match.
        ValidationSet : frozenset or None
            Allowed characters, used to pinpoint the offending character when
            a sequence line fails ``FASTAValidator``.

        Raises
        ------
        ValidationError
            On a first line that is not a description, consecutive
            descriptions, a missing or space-prefixed ID, a duplicate ID, an
            invalid sequence character, undecodable bytes, or (for aligned
            formats) sequences of differing lengths.
        """
        last_line_was_ID = False
        ids = {}

        # seq_len is the reference length for aligned formats; it stays 0
        # until a description line follows the first completed sequence.
        seq_len = 0
        prev_seq_len = 0
        prev_seq_start_line = 0

        level_map = {'min': 100, 'max': float('inf')}
        max_lines = level_map[level]

        with self.path.open('rb') as fh:
            try:
                first = fh.read(6)
                # Skip a UTF-8 byte order mark, if present.
                if first[:3] == b'\xEF\xBB\xBF':
                    first = first[3:]

                # Empty files should validate
                if first.strip() == b'':
                    return

                if first[0] != ord(b'>'):
                    raise ValidationError("First line of file is not a valid "
                                          "description. Descriptions must "
                                          "start with '>'")
                fh.seek(0)

                for line_number, line in enumerate(fh, 1):
                    line = line.strip()
                    # Reaching the line cap ends validation immediately; the
                    # end-of-file length check below is then skipped too.
                    if line_number >= max_lines:
                        return
                    line = line.decode('utf-8-sig')

                    if line.startswith('>'):
                        if FASTAValidator and ValidationSet:
                            if seq_len == 0:
                                seq_len = prev_seq_len

                            if self.aligned:
                                self._validate_line_lengths(
                                    seq_len, prev_seq_len,
                                    prev_seq_start_line)

                            prev_seq_len = 0
                            prev_seq_start_line = 0

                        if last_line_was_ID:
                            raise ValidationError('Multiple consecutive '
                                                  'descriptions starting on '
                                                  f'line {line_number-1!r}')

                        line = line.split()

                        # A bare '>' token means whitespace (or nothing)
                        # followed the marker instead of an ID.
                        if line[0] == '>':
                            if len(line) == 1:
                                raise ValidationError(
                                    f'Description on line {line_number} is '
                                    'missing an ID.')
                            else:
                                raise ValidationError(
                                    f'ID on line {line_number} starts with a '
                                    'space. IDs may not start with spaces')

                        if line[0] in ids:
                            raise ValidationError(
                                f'ID on line {line_number} is a duplicate of '
                                f'another ID on line {ids[line[0]]}.')

                        ids[line[0]] = line_number
                        last_line_was_ID = True

                    elif FASTAValidator and ValidationSet:
                        if re.fullmatch(FASTAValidator, line):
                            if prev_seq_start_line == 0:
                                prev_seq_start_line = line_number

                            prev_seq_len += len(line)
                            last_line_was_ID = False

                        else:
                            for position, character in enumerate(line):
                                if character not in ValidationSet:
                                    raise ValidationError(
                                        f"Invalid character '{character}' at "
                                        f"position {position} on line "
                                        f"{line_number} (does not match IUPAC "
                                        "characters for this sequence type). "
                                        "Allowed characters are "
                                        f"{self.alphabet}.")

                    else:
                        last_line_was_ID = False

            except UnicodeDecodeError as e:
                raise ValidationError(f'utf-8 cannot decode byte on line '
                                      f'{line_number}') from e

        if self.aligned:
            # With a single sequence no later description line has set the
            # reference length yet, so the last sequence defines it; this
            # keeps a one-sequence alignment from being compared against 0.
            if seq_len == 0:
                seq_len = prev_seq_len

            self._validate_line_lengths(
                seq_len, prev_seq_len, prev_seq_start_line)
class AlignedFASTAFormatMixin:
    """Mixin that adds alignment behavior to a FASTA format class.

    The host class is expected to provide a string ``alphabet`` attribute
    before ``_turn_into_alignment`` is called.
    """

    def _turn_into_alignment(self):
        """Flag the format as aligned and allow '.' and '-' characters."""
        self.aligned = True
        self.alphabet += ".-"

    def _validate_line_lengths(
            self, seq_len, prev_seq_len, prev_seq_start_line):
        """Raise ValidationError unless ``prev_seq_len`` equals ``seq_len``.

        ``prev_seq_start_line`` is only used in the error message.
        """
        if prev_seq_len == seq_len:
            return

        raise ValidationError('The sequence starting on line '
                              f'{prev_seq_start_line} was length '
                              f'{prev_seq_len}. All previous sequences '
                              f'were length {seq_len}. All sequences must '
                              'be the same length for AlignedFASTAFormat.')
class DNAFASTAFormat(FASTAFormat):
    """FASTA format whose sequence lines use the upper-case DNA alphabet."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Overrides the base class default of None so that sequence
        # characters are validated.
        self.alphabet = "ACGTRYKMSWBDHVN"
# Single-file directory format: one `dna-sequences.fasta` file in
# DNAFASTAFormat.
DNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'DNASequencesDirectoryFormat', 'dna-sequences.fasta', DNAFASTAFormat)
class MixedCaseDNAFASTAFormat(DNAFASTAFormat):
    """DNA FASTA format that also accepts lower-case characters."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Extend the inherited alphabet with its lower-case counterpart.
        self.alphabet += self.alphabet.lower()
# Single-file directory format: one `dna-sequences.fasta` file in
# MixedCaseDNAFASTAFormat.
MixedCaseDNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'MixedCaseDNASequencesDirectoryFormat', 'dna-sequences.fasta',
MixedCaseDNAFASTAFormat)
class RNAFASTAFormat(FASTAFormat):
    """FASTA format whose sequence lines use the upper-case RNA alphabet."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Overrides the base class default of None so that sequence
        # characters are validated.
        self.alphabet = "ACGURYKMSWBDHVN"
# Single-file directory format: one `rna-sequences.fasta` file in
# RNAFASTAFormat.
RNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'RNASequencesDirectoryFormat', 'rna-sequences.fasta', RNAFASTAFormat)
class MixedCaseRNAFASTAFormat(RNAFASTAFormat):
    """RNA FASTA format that also accepts lower-case characters."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Extend the inherited alphabet with its lower-case counterpart.
        self.alphabet += self.alphabet.lower()
# Single-file directory format: one `rna-sequences.fasta` file in
# MixedCaseRNAFASTAFormat.
MixedCaseRNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'MixedCaseRNASequencesDirectoryFormat', 'rna-sequences.fasta',
MixedCaseRNAFASTAFormat)
class PairedDNASequencesDirectoryFormat(model.DirectoryFormat):
    """Directory format with a left and a right DNA FASTA file."""

    # Both members are declared with DNAFASTAFormat.
    left_dna_sequences = model.File(
        'left-dna-sequences.fasta', format=DNAFASTAFormat)
    right_dna_sequences = model.File(
        'right-dna-sequences.fasta', format=DNAFASTAFormat)
class PairedRNASequencesDirectoryFormat(model.DirectoryFormat):
    """Directory format with a left and a right RNA FASTA file."""

    # Both members are declared with RNAFASTAFormat.
    left_rna_sequences = model.File(
        'left-rna-sequences.fasta', format=RNAFASTAFormat)
    right_rna_sequences = model.File(
        'right-rna-sequences.fasta', format=RNAFASTAFormat)
class AlignedDNAFASTAFormat(AlignedFASTAFormatMixin, DNAFASTAFormat):
    """DNA FASTA format for alignments (equal-length sequences)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The alphabet must be set (by the call above) before it is extended
        # with the '.' and '-' alignment characters.
        super()._turn_into_alignment()
# Single-file directory format: one `aligned-dna-sequences.fasta` file in
# AlignedDNAFASTAFormat.
AlignedDNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'AlignedDNASequencesDirectoryFormat', 'aligned-dna-sequences.fasta',
AlignedDNAFASTAFormat)
class MixedCaseAlignedDNAFASTAFormat(
        AlignedFASTAFormatMixin, MixedCaseDNAFASTAFormat):
    """Mixed-case DNA FASTA format for alignments."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The alphabet must be set (by the call above) before it is extended
        # with the '.' and '-' alignment characters.
        super()._turn_into_alignment()
# Single-file directory format: one `aligned-dna-sequences.fasta` file in
# MixedCaseAlignedDNAFASTAFormat.
MixedCaseAlignedDNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'MixedCaseAlignedDNASequencesDirectoryFormat',
'aligned-dna-sequences.fasta', MixedCaseAlignedDNAFASTAFormat)
class AlignedRNAFASTAFormat(AlignedFASTAFormatMixin, RNAFASTAFormat):
    """RNA FASTA format for alignments (equal-length sequences)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The alphabet must be set (by the call above) before it is extended
        # with the '.' and '-' alignment characters.
        super()._turn_into_alignment()
# Single-file directory format: one `aligned-rna-sequences.fasta` file in
# AlignedRNAFASTAFormat.
AlignedRNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'AlignedRNASequencesDirectoryFormat', 'aligned-rna-sequences.fasta',
AlignedRNAFASTAFormat)
class MixedCaseAlignedRNAFASTAFormat(
        AlignedFASTAFormatMixin, MixedCaseRNAFASTAFormat):
    """Mixed-case RNA FASTA format for alignments."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The alphabet must be set (by the call above) before it is extended
        # with the '.' and '-' alignment characters.
        super()._turn_into_alignment()
# Single-file directory format: one `aligned-rna-sequences.fasta` file in
# MixedCaseAlignedRNAFASTAFormat.
MixedCaseAlignedRNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'MixedCaseAlignedRNASequencesDirectoryFormat',
'aligned-rna-sequences.fasta', MixedCaseAlignedRNAFASTAFormat)
def _construct_validator_from_alphabet(alphabet_str):
    """Build the sequence-line validators for an alphabet.

    Parameters
    ----------
    alphabet_str : str or None
        Every character allowed on a sequence line. A falsy value (``None``
        or ``''``) disables character validation.

    Returns
    -------
    tuple
        ``(pattern, allowed)`` where ``pattern`` is a compiled regex matching
        one or more allowed characters followed by an optional line ending,
        and ``allowed`` is a frozenset of the allowed characters; or
        ``(None, None)`` when ``alphabet_str`` is falsy.
    """
    if not alphabet_str:
        return None, None

    # Escape the alphabet so that characters with a special meaning inside a
    # character class ('-', '^', ']', '\') are always matched literally,
    # wherever they appear in the alphabet.
    Validator = re.compile(fr'[{re.escape(alphabet_str)}]+\r?\n?')
    ValidationSet = frozenset(alphabet_str)
    return Validator, ValidationSet
class DifferentialFormat(model.TextFileFormat):
    """Text format loadable as QIIME 2 metadata with only numeric columns."""

    def validate(self, *args):
        """Raise ValidationError unless the file is all-numeric metadata.

        The file must load through ``qiime2.Metadata.load``, contain at least
        one column, and every column must be of numeric type.
        """
        try:
            metadata = qiime2.Metadata.load(str(self))
        except qiime2.metadata.MetadataFileError as load_error:
            raise ValidationError(load_error) from load_error

        total_columns = metadata.column_count
        if total_columns == 0:
            raise ValidationError('Format must contain at least 1 column')

        # Any column dropped by the numeric filter is a non-numeric column.
        numeric_only = metadata.filter_columns(column_type='numeric')
        if numeric_only.column_count != total_columns:
            raise ValidationError('Must only contain numeric values.')
# Single-file directory format: one `differentials.tsv` file in
# DifferentialFormat.
DifferentialDirectoryFormat = model.SingleFileDirectoryFormat(
'DifferentialDirectoryFormat', 'differentials.tsv', DifferentialFormat)
class ProteinFASTAFormat(FASTAFormat):
    """FASTA format whose sequence lines use upper-case letters and '*'."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Overrides the base class default of None so that sequence
        # characters are validated.
        self.alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZ*"
# Single-file directory format: one `protein-sequences.fasta` file in
# ProteinFASTAFormat.
ProteinSequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'ProteinSequencesDirectoryFormat',
'protein-sequences.fasta',
ProteinFASTAFormat)
class MixedCaseProteinFASTAFormat(ProteinFASTAFormat):
    """Protein FASTA format that also accepts lower-case letters."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Only the letters are added; the inherited alphabet already holds
        # the '*' character.
        self.alphabet += "abcdefghijklmnopqrstuvwxyz"
# Single-file directory format: one `protein-sequences.fasta` file in
# MixedCaseProteinFASTAFormat.
MixedCaseProteinSequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'MixedCaseProteinSequencesDirectoryFormat',
'protein-sequences.fasta',
MixedCaseProteinFASTAFormat)
class AlignedProteinFASTAFormat(AlignedFASTAFormatMixin, ProteinFASTAFormat):
    """Protein FASTA format for alignments (equal-length sequences)."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The alphabet must be set (by the call above) before it is extended
        # with the '.' and '-' alignment characters.
        super()._turn_into_alignment()
# Single-file directory format: one `aligned-protein-sequences.fasta` file in
# AlignedProteinFASTAFormat.
AlignedProteinSequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'AlignedProteinSequencesDirectoryFormat',
'aligned-protein-sequences.fasta',
AlignedProteinFASTAFormat)
class MixedCaseAlignedProteinFASTAFormat(
        AlignedFASTAFormatMixin, MixedCaseProteinFASTAFormat):
    """Mixed-case protein FASTA format for alignments."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # The alphabet must be set (by the call above) before it is extended
        # with the '.' and '-' alignment characters.
        super()._turn_into_alignment()
# Single-file directory format: one `aligned-protein-sequences.fasta` file in
# MixedCaseAlignedProteinFASTAFormat.
MixedCaseAlignedProteinSequencesDirectoryFormat = (
model.SingleFileDirectoryFormat(
'MixedCaseAlignedProteinSequencesDirectoryFormat',
'aligned-protein-sequences.fasta',
MixedCaseAlignedProteinFASTAFormat
)
)
class BLAST6Format(model.TextFileFormat):
    """Text format for BLAST+ tabular (format 6) output."""

    def validate(self, *args):
        """Raise ValidationError unless scikit-bio can parse the file.

        The file is read with scikit-bio's 'blast+6' reader using the default
        column set; the parsed table itself is discarded.

        Raises
        ------
        ValidationError
            If the file is empty or cannot be parsed as BLAST6. The original
            parser error is attached as the cause.
        """
        try:
            skbio.read(str(self), format='blast+6', into=pd.DataFrame,
                       default_columns=True)
        # EmptyDataError subclasses ValueError, so it has to be handled
        # before the generic ValueError clause.
        except pd.errors.EmptyDataError as err:
            raise ValidationError('BLAST6 file is empty.') from err
        except ValueError as err:
            raise ValidationError('Invalid BLAST6 format.') from err
# Single-file directory format: one `blast6.tsv` file in BLAST6Format.
BLAST6DirectoryFormat = model.SingleFileDirectoryFormat(
'BLAST6DirectoryFormat', 'blast6.tsv', BLAST6Format)
# Register the concrete file and directory formats defined above with the
# plugin. The FASTAFormat base class and AlignedFASTAFormatMixin are not
# registered.
plugin.register_formats(
TSVTaxonomyFormat, TSVTaxonomyDirectoryFormat,
HeaderlessTSVTaxonomyFormat, HeaderlessTSVTaxonomyDirectoryFormat,
TaxonomyFormat, TaxonomyDirectoryFormat, DNAFASTAFormat,
DNASequencesDirectoryFormat, PairedDNASequencesDirectoryFormat,
AlignedDNAFASTAFormat, AlignedDNASequencesDirectoryFormat,
DifferentialFormat, DifferentialDirectoryFormat, ProteinFASTAFormat,
AlignedProteinFASTAFormat, MixedCaseProteinFASTAFormat,
MixedCaseAlignedProteinFASTAFormat, ProteinSequencesDirectoryFormat,
AlignedProteinSequencesDirectoryFormat,
MixedCaseProteinSequencesDirectoryFormat,
MixedCaseAlignedProteinSequencesDirectoryFormat, RNAFASTAFormat,
RNASequencesDirectoryFormat, AlignedRNAFASTAFormat,
AlignedRNASequencesDirectoryFormat, PairedRNASequencesDirectoryFormat,
BLAST6Format, BLAST6DirectoryFormat, MixedCaseDNAFASTAFormat,
MixedCaseDNASequencesDirectoryFormat, MixedCaseRNAFASTAFormat,
MixedCaseRNASequencesDirectoryFormat, MixedCaseAlignedDNAFASTAFormat,
MixedCaseAlignedDNASequencesDirectoryFormat,
MixedCaseAlignedRNAFASTAFormat,
MixedCaseAlignedRNASequencesDirectoryFormat
)