Skip to content

Commit

Permalink
Merge 572a6d6 into 2946fda
Browse files Browse the repository at this point in the history
  • Loading branch information
Oddant1 committed Jul 9, 2020
2 parents 2946fda + 572a6d6 commit fac3b75
Showing 1 changed file with 95 additions and 16 deletions.
111 changes: 95 additions & 16 deletions q2_types/feature_data/_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# ----------------------------------------------------------------------------

import re
import skbio.io

import qiime2.plugin.model as model
from qiime2.plugin import ValidationError
Expand Down Expand Up @@ -221,22 +220,102 @@ class PairedDNASequencesDirectoryFormat(model.DirectoryFormat):


class AlignedDNAFASTAFormat(model.TextFileFormat):
def sniff(self):
filepath = str(self)
sniffer = skbio.io.io_registry.get_sniffer('fasta')
if sniffer(filepath)[0]:
generator = skbio.io.read(filepath, constructor=skbio.DNA,
format='fasta', verify=False)
def _validate_lines(self, max_lines):
FASTADNAValidator = re.compile(r'[ACGTURYKMSWBDHVN.-]+\r?\n?')
ValidationSet = frozenset(('A', 'C', 'G', 'T', 'U', 'R', 'Y', 'K', 'M',
'S', 'W', 'B', 'D', 'H', 'V', 'N', '.',
'-'))

last_line_was_ID = False
ids = {}

seq_len = 0
prev_seq_len = 0
prev_seq_start_line = 0

with open(str(self), 'rb') as fh:
try:
initial_length = len(next(generator))
for seq, _ in zip(generator, range(4)):
if len(seq) != initial_length:
return False
return True
# ValueError raised by skbio if there are invalid DNA chars.
except (StopIteration, ValueError):
pass
return False
first = fh.read(6)
if first[:3] == b'\xEF\xBB\xBF':
first = first[3:]

# Empty files should validate
if first.strip() == b'':
return

if first[0] != ord(b'>'):
raise ValidationError("First line of file is not a valid "
"description. Descriptions must "
"start with '>'")
fh.seek(0)

for line_number, line in enumerate(fh, 1):
if line_number >= max_lines:
return
line = line.decode('utf-8-sig')

if line.startswith('>'):
if seq_len == 0:
seq_len = prev_seq_len
elif prev_seq_len != seq_len:
raise ValidationError(
'The sequence starting on line '
f'{prev_seq_start_line} was length '
f'{prev_seq_len}. All previous sequences '
f'were length {seq_len}. All sequences must '
'be the same length for '
'AlignedDNAFASTAFormat.')

prev_seq_len = 0
prev_seq_start_line = 0

if last_line_was_ID:
raise ValidationError('Multiple consecutive '
'descriptions starting on '
f'line {line_number-1!r}')

line = line.split()

if line[0] == '>':
if len(line) == 1:
raise ValidationError(
f'Description on line {line_number} is '
'missing an ID.')
else:
raise ValidationError(
f'ID on line {line_number} starts with a '
'space. IDs may not start with spaces')

if line[0] in ids:
raise ValidationError(
f'ID on line {line_number} is a duplicate of '
f'another ID on line {ids[line[0]]}.')

ids[line[0]] = line_number
last_line_was_ID = True

elif re.fullmatch(FASTADNAValidator, line):
if prev_seq_start_line == 0:
prev_seq_start_line = line

prev_seq_len += len(line)
last_line_was_ID = False
else:
for position, character in enumerate(line):
if character not in ValidationSet:
raise ValidationError(
f"Invalid character '{character}' at "
f"position {position} on line "
f"{line_number} (does not match IUPAC "
"characters for a DNA sequence).")

except UnicodeDecodeError as e:
raise ValidationError(f'utf-8 cannot decode byte on line '
f'{line_number}') from e

def _validate_(self, max_lines):
level_map = {'min': 100, 'max': float('inf')}
self._validate_lines(level_map[max_lines])


AlignedDNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
Expand Down

0 comments on commit fac3b75

Please sign in to comment.