diff --git a/q2_types/feature_data/_format.py b/q2_types/feature_data/_format.py index c93cba55..2c280da6 100644 --- a/q2_types/feature_data/_format.py +++ b/q2_types/feature_data/_format.py @@ -7,7 +7,6 @@ # ---------------------------------------------------------------------------- import re -import skbio.io import qiime2.plugin.model as model from qiime2.plugin import ValidationError @@ -144,69 +143,15 @@ def _validate_(self, level): class DNAFASTAFormat(model.TextFileFormat): - def _validate_lines(self, max_lines): + def _validate_(self, max_lines): + level_map = {'min': 100, 'max': float('inf')} + FASTADNAValidator = re.compile(r'[ACGTURYKMSWBDHVN]+\r?\n?') ValidationSet = frozenset(('A', 'C', 'G', 'T', 'U', 'R', 'Y', 'K', 'M', 'S', 'W', 'B', 'D', 'H', 'V', 'N')) - last_line_was_ID = False - ids = {} - - with open(str(self), 'rb') as fh: - try: - first = fh.read(6) - if first[:3] == b'\xEF\xBB\xBF': - first = first[3:] - # Empty files should validate - if first.strip() == b'': - return - if first[0] != ord(b'>'): - raise ValidationError("First line of file is not a valid " - "description. Descriptions must " - "start with '>'") - fh.seek(0) - for line_number, line in enumerate(fh, 1): - if line_number >= max_lines: - return - line = line.decode('utf-8-sig') - if line.startswith('>'): - if last_line_was_ID: - raise ValidationError('Multiple consecutive ' - 'descriptions starting on ' - f'line {line_number-1!r}') - line = line.split() - if line[0] == '>': - if len(line) == 1: - raise ValidationError( - f'Description on line {line_number} is ' - 'missing an ID.') - else: - raise ValidationError( - f'ID on line {line_number} starts with a ' - 'space. IDs may not start with spaces') - if line[0] in ids: - raise ValidationError( - f'ID on line {line_number} is a duplicate of ' - f'another ID on line {ids[line[0]]}.') - ids[line[0]] = line_number - last_line_was_ID = True - elif re.fullmatch(FASTADNAValidator, line): - last_line_was_ID = False - else: - for position, character in enumerate(line): - if character not in ValidationSet: - raise ValidationError( - f"Invalid character '{character}' at " - f"position {position} on line " - f"{line_number} (does not match IUPAC " - "characters for a DNA sequence).") - except UnicodeDecodeError as e: - raise ValidationError(f'utf-8 cannot decode byte on line ' - f'{line_number}') from e - - def _validate_(self, max_lines): - level_map = {'min': 100, 'max': float('inf')} - self._validate_lines(level_map[max_lines]) + _validate_DNAFASTAFormats(self, FASTADNAValidator, ValidationSet, + level_map[max_lines]) DNASequencesDirectoryFormat = model.SingleFileDirectoryFormat( @@ -221,22 +166,16 @@ class PairedDNASequencesDirectoryFormat(model.DirectoryFormat): class AlignedDNAFASTAFormat(model.TextFileFormat): - def sniff(self): - filepath = str(self) - sniffer = skbio.io.io_registry.get_sniffer('fasta') - if sniffer(filepath)[0]: - generator = skbio.io.read(filepath, constructor=skbio.DNA, - format='fasta', verify=False) - try: - initial_length = len(next(generator)) - for seq, _ in zip(generator, range(4)): - if len(seq) != initial_length: - return False - return True - # ValueError raised by skbio if there are invalid DNA chars. - except (StopIteration, ValueError): - pass - return False + def _validate_(self, max_lines): + level_map = {'min': 100, 'max': float('inf')} + + FASTADNAValidator = re.compile(r'[ACGTURYKMSWBDHVN.-]+\r?\n?') + ValidationSet = frozenset(('A', 'C', 'G', 'T', 'U', 'R', 'Y', 'K', 'M', + 'S', 'W', 'B', 'D', 'H', 'V', 'N', '.', + '-')) + + _validate_DNAFASTAFormats(self, FASTADNAValidator, ValidationSet, + level_map[max_lines], True) AlignedDNASequencesDirectoryFormat = model.SingleFileDirectoryFormat( @@ -244,6 +183,104 @@ def sniff(self): AlignedDNAFASTAFormat) +def _validate_DNAFASTAFormats(fmt, FASTADNAValidator, ValidationSet, + max_lines, aligned=False): + last_line_was_ID = False + ids = {} + + seq_len = 0 + prev_seq_len = 0 + prev_seq_start_line = 0 + + with open(str(fmt), 'rb') as fh: + try: + first = fh.read(6) + if first[:3] == b'\xEF\xBB\xBF': + first = first[3:] + + # Empty files should validate + if first.strip() == b'': + return + + if first[0] != ord(b'>'): + raise ValidationError("First line of file is not a valid " + "description. Descriptions must " + "start with '>'") + fh.seek(0) + + for line_number, line in enumerate(fh, 1): + if line_number >= max_lines: + return + line = line.decode('utf-8-sig') + + if line.startswith('>'): + if seq_len == 0: + seq_len = prev_seq_len + + if aligned: + _validate_line_lengths(seq_len, prev_seq_len, + prev_seq_start_line) + + prev_seq_len = 0 + prev_seq_start_line = 0 + + if last_line_was_ID: + raise ValidationError('Multiple consecutive ' + 'descriptions starting on ' + f'line {line_number-1!r}') + + line = line.split() + + if line[0] == '>': + if len(line) == 1: + raise ValidationError( + f'Description on line {line_number} is ' + 'missing an ID.') + else: + raise ValidationError( + f'ID on line {line_number} starts with a ' + 'space. IDs may not start with spaces') + + if line[0] in ids: + raise ValidationError( + f'ID on line {line_number} is a duplicate of ' + f'another ID on line {ids[line[0]]}.') + + ids[line[0]] = line_number + last_line_was_ID = True + + elif re.fullmatch(FASTADNAValidator, line): + if prev_seq_start_line == 0: + prev_seq_start_line = line_number + + prev_seq_len += len(line) + last_line_was_ID = False + else: + for position, character in enumerate(line): + if character not in ValidationSet: + raise ValidationError( + f"Invalid character '{character}' at " + f"position {position} on line " + f"{line_number} (does not match IUPAC " + "characters for a DNA sequence).") + + except UnicodeDecodeError as e: + raise ValidationError(f'utf-8 cannot decode byte on line ' + f'{line_number}') from e + + if aligned: + _validate_line_lengths(seq_len, prev_seq_len, prev_seq_start_line) + + +def _validate_line_lengths(seq_len, prev_seq_len, prev_seq_start_line): + if prev_seq_len != seq_len: + raise ValidationError('The sequence starting on line ' + f'{prev_seq_start_line} was length ' + f'{prev_seq_len}. All previous sequences were ' + f'length {seq_len}. All sequences must be the ' + 'same length for AlignedDNAFASTAFormat.') + + class DifferentialFormat(model.TextFileFormat): def validate(self, *args): try: diff --git a/q2_types/feature_data/tests/test_format.py b/q2_types/feature_data/tests/test_format.py index 7282e186..9a2dff33 100644 --- a/q2_types/feature_data/tests/test_format.py +++ b/q2_types/feature_data/tests/test_format.py @@ -271,6 +271,14 @@ def test_aligned_dna_fasta_format_validate_negative(self): with self.assertRaisesRegex(ValidationError, 'AlignedDNAFASTA'): format.validate() + def test_aligned_dna_fasta_format_unaligned(self): + filepath = self.get_data_path('dna-sequences.fasta') + format = AlignedDNAFASTAFormat(filepath, mode='r') + + with self.assertRaisesRegex(ValidationError, + 'line 4.*length 90.*length 65'): + format.validate() + def test_aligned_dna_sequences_directory_format(self): filepath = self.get_data_path('aligned-dna-sequences.fasta') temp_dir = self.temp_dir.name