diff --git a/q2_types/feature_data/_format.py b/q2_types/feature_data/_format.py
index 90583f81..69171ccb 100644
--- a/q2_types/feature_data/_format.py
+++ b/q2_types/feature_data/_format.py
@@ -6,10 +6,13 @@
 # The full license is in the file LICENSE, distributed with this software.
 # ----------------------------------------------------------------------------
 
+import re
 import skbio.io
+
 import qiime2.plugin.model as model
-import qiime2
 from qiime2.plugin import ValidationError
+import qiime2
+
 
 from ..plugin_setup import plugin
 
@@ -132,23 +135,63 @@ def sniff(self):
 
 
 class DNAFASTAFormat(model.TextFileFormat):
-    def sniff(self):
-        filepath = str(self)
-        sniffer = skbio.io.io_registry.get_sniffer('fasta')
-        if sniffer(filepath)[0]:
-            generator = skbio.io.read(filepath, constructor=skbio.DNA,
-                                      format='fasta', verify=False)
-            try:
-                for seq, _ in zip(generator, range(5)):
-                    pass
-                return True
-            # ValueError raised by skbio if there are invalid DNA chars.
-            except ValueError:
-                pass
+    def _validate_lines(self, max_lines):
+        """Validate at most ``max_lines`` lines of the file as DNA FASTA.
+
+        A file whose leading bytes (after an optional UTF-8 BOM) are empty
+        or whitespace is treated as empty and accepted. Otherwise the file
+        must start with an ID line ('>'), must not contain two consecutive
+        ID lines, and every non-ID line must consist only of the uppercase
+        characters ACGTURYKMSWBDHVN.
+
+        Raises ValidationError on the first violation found.
+        """
+        FASTADNAValidator = re.compile(r'[ACGTURYKMSWBDHVN]+\r?\n?')
+        last_line_was_ID = False
 
-        # Empty files are ok also
-        empty_sniffer = skbio.io.io_registry.get_sniffer('')
-        return empty_sniffer(filepath)[0]
+        with open(str(self), 'rb') as fh:
+            try:
+                first = fh.read(6)
+                if first[:3] == b'\xEF\xBB\xBF':
+                    first = first[3:]
+                # Empty files should validate
+                if first.strip() == b'':
+                    return
+                if first[0] != ord(b'>'):
+                    raise ValidationError("First line of file is not a valid "
+                                          "FASTA ID. FASTA IDs must start "
+                                          "with '>'")
+                fh.seek(0)
+                for line_number, line in enumerate(fh, 1):
+                    # Stop only after max_lines lines have been checked.
+                    if line_number > max_lines:
+                        return
+                    line = line.decode('utf-8-sig')
+                    if line.startswith('>'):
+                        if last_line_was_ID:
+                            raise ValidationError('Multiple consecutive IDs '
+                                                  'starting on line '
+                                                  f'{line_number-1!r}')
+                        last_line_was_ID = True
+                    elif FASTADNAValidator.fullmatch(line):
+                        last_line_was_ID = False
+                    else:
+                        raise ValidationError('Invalid characters on line '
+                                              f'{line_number} (does not match '
+                                              'IUPAC characters for a DNA '
+                                              'sequence).')
+            except UnicodeDecodeError as e:
+                raise ValidationError(f'utf-8 cannot decode byte on line '
+                                      f'{line_number}') from e
+
+    def _validate_(self, max_lines):
+        """Validate the file at the requested level.
+
+        Despite its name, ``max_lines`` is the level key: 'min' checks the
+        first 100 lines and 'max' checks every line.
+        """
+        level_map = {'min': 100, 'max': float('inf')}
+        self._validate_lines(level_map[max_lines])
 
 
 DNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
diff --git a/q2_types/feature_data/tests/data/dna-sequences-consecutive-ids.fasta b/q2_types/feature_data/tests/data/dna-sequences-consecutive-ids.fasta
new file mode 100644
index 00000000..bead64a8
--- /dev/null
+++ b/q2_types/feature_data/tests/data/dna-sequences-consecutive-ids.fasta
@@ -0,0 +1,2 @@
+>This is an ID
+>This is another ID
diff --git a/q2_types/feature_data/tests/data/dna-sequences-corrupt-characters.fasta b/q2_types/feature_data/tests/data/dna-sequences-corrupt-characters.fasta
new file mode 100644
index 00000000..8edc937e
--- /dev/null
+++ b/q2_types/feature_data/tests/data/dna-sequences-corrupt-characters.fasta
@@ -0,0 +1,2 @@
+>This data is corrupt
+مممممممممم
\ No newline at end of file
diff --git a/q2_types/feature_data/tests/data/dna-sequences-first-line-not-id.fasta b/q2_types/feature_data/tests/data/dna-sequences-first-line-not-id.fasta
new file mode 100644
index 00000000..8c7e4d62
--- /dev/null
+++ b/q2_types/feature_data/tests/data/dna-sequences-first-line-not-id.fasta
@@ -0,0 +1 @@
+This is not an id
diff --git a/q2_types/feature_data/tests/data/dna-with-bom-fails.fasta b/q2_types/feature_data/tests/data/dna-with-bom-fails.fasta
new file mode 100644
index 00000000..69ab9f94
--- /dev/null
+++ b/q2_types/feature_data/tests/data/dna-with-bom-fails.fasta
@@ -0,0 +1 @@
+ُ؛؟Not a valid id
diff --git a/q2_types/feature_data/tests/data/dna-with-bom-passes.fasta b/q2_types/feature_data/tests/data/dna-with-bom-passes.fasta
new file mode 100644
index 00000000..9ef0c808
--- /dev/null
+++ b/q2_types/feature_data/tests/data/dna-with-bom-passes.fasta
@@ -0,0 +1,2 @@
+ُ؛؟>Some kinda DNA
+ACGTACGTACGT
diff --git a/q2_types/feature_data/tests/test_format.py b/q2_types/feature_data/tests/test_format.py
index 705878d3..9f735247 100644
--- a/q2_types/feature_data/tests/test_format.py
+++ b/q2_types/feature_data/tests/test_format.py
@@ -144,12 +144,11 @@ def test_dna_fasta_format_validate_positive(self):
 
         format.validate()
 
-    def test_dna_fasta_format_validate_negative(self):
-        filepath = self.get_data_path('not-dna-sequences')
+    def test_dna_fasta_format_bom_passes(self):
+        filepath = self.get_data_path('dna-with-bom-passes.fasta')
         format = DNAFASTAFormat(filepath, mode='r')
 
-        with self.assertRaisesRegex(ValidationError, 'DNAFASTA'):
-            format.validate()
+        format.validate()
 
     def test_dna_fasta_format_empty_file(self):
         filepath = os.path.join(self.temp_dir.name, 'empty')
@@ -159,6 +158,41 @@ def test_dna_fasta_format_empty_file(self):
 
         format.validate()
 
+    def test_dna_fasta_format_validate_negative(self):
+        filepath = self.get_data_path('not-dna-sequences')
+        format = DNAFASTAFormat(filepath, mode='r')
+
+        with self.assertRaisesRegex(ValidationError, 'DNAFASTA'):
+            format.validate()
+
+    def test_dna_fasta_format_consecutive_IDs(self):
+        filepath = self.get_data_path('dna-sequences-consecutive-ids.fasta')
+        format = DNAFASTAFormat(filepath, mode='r')
+
+        with self.assertRaisesRegex(ValidationError, 'consecutive IDs.*1'):
+            format.validate()
+
+    def test_dna_fasta_format_missing_initial_ID(self):
+        filepath = self.get_data_path('dna-sequences-first-line-not-id.fasta')
+        format = DNAFASTAFormat(filepath, mode='r')
+
+        with self.assertRaisesRegex(ValidationError, 'First line'):
+            format.validate()
+
+    def test_dna_fasta_format_corrupt_characters(self):
+        filepath = self.get_data_path('dna-sequences-corrupt-characters.fasta')
+        format = DNAFASTAFormat(filepath, mode='r')
+
+        with self.assertRaisesRegex(ValidationError, 'utf-8.*2'):
+            format.validate()
+
+    def test_dna_fasta_format_bom_fails(self):
+        filepath = self.get_data_path('dna-with-bom-fails.fasta')
+        format = DNAFASTAFormat(filepath, mode='r')
+
+        with self.assertRaisesRegex(ValidationError, 'First line'):
+            format.validate()
+
     def test_dna_sequences_directory_format(self):
         filepath = self.get_data_path('dna-sequences.fasta')
         shutil.copy(filepath,