Skip to content

Commit

Permalink
Merge f64ed9c into 03a15c2
Browse files Browse the repository at this point in the history
  • Loading branch information
Oddant1 committed Jun 6, 2019
2 parents 03a15c2 + f64ed9c commit e7e2e27
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 16 deletions.
96 changes: 80 additions & 16 deletions q2_types/feature_data/_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,13 @@
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------

import re
import io
import math

import skbio.io
import qiime2.plugin.model as model
from qiime2.plugin import ValidationError

from ..plugin_setup import plugin

Expand Down Expand Up @@ -131,23 +136,82 @@ def sniff(self):


class DNAFASTAFormat(model.TextFileFormat):
def sniff(self):
filepath = str(self)
sniffer = skbio.io.io_registry.get_sniffer('fasta')
if sniffer(filepath)[0]:
generator = skbio.io.read(filepath, constructor=skbio.DNA,
format='fasta', verify=False)
try:
for seq, _ in zip(generator, range(5)):
pass
return True
# ValueError raised by skbio if there are invalid DNA chars.
except ValueError:
pass
def _check(self, level):
FASTADNAValidator = re.compile(r'[ACGTURYKMSWBDHVN\-\.]+\n?')
last_line_was_ID = False

# Empty files are ok also
empty_sniffer = skbio.io.io_registry.get_sniffer('<emptyfile>')
return empty_sniffer(filepath)[0]
with open(str(self)) as fh:
try:
line_number = 1
# If we read 2 and only get one \n we have only one \n in the
# file
first = fh.read(2)
# Empty files should pass
if first == '\n' or first == '':
return
if first[0] != '>':
raise ValidationError(
"First line of file is not a valid FASTA ID. FASTA "
"IDs must start with '>'")
fh.seek(0)
line = fh.readline()
while line != '' and line_number < level:
if line.startswith('>'):
if last_line_was_ID:
raise ValidationError(
'Multiple consecutive IDs starting on line '
f'{line_number-1!r}')
last_line_was_ID = True
elif re.fullmatch(FASTADNAValidator, line):
last_line_was_ID = False
else:
raise ValidationError(
f'Invalid sequence on line {line_number}')
line_number += 1
line = fh.readline()
except UnicodeDecodeError as e:
# We tell() on the buffer because we want the actual buffer the
# error occured in, tell() on the file handle will report being
# at the end of the last buffer when the error occured well
# into the next buffer
buffer = fh.buffer
pos = buffer.tell()
# We want to start our read from the beginning of the buffer we
# encountered the error on because we have yet to count any
# lines in that buffer
pos = (math.ceil(pos / io.DEFAULT_BUFFER_SIZE) - 1) * \
io.DEFAULT_BUFFER_SIZE
buffer.seek(pos)
# e.start reports the position of the bad byte in the current
# buffer not the file, this is the cause of all the pain
# determining which buffer in the file the error occured in so
# we can find the bad byte's position in the file not just the
# buffer. If the bad byte is the final byte in a buffer it will
# report it as being the first byte in the next buffer. This is
# likely due to the fact that e.start reports the buffer
# position of the bad byte, and e.end reports the buffer
# position of the next good byte. If the last byte in a buffer
# is bad then e.start should be 8192 and e.end should be byte 0
# of the next buffer, but end being less than start doesn't
# really make sense, so this is handled by bumping e.start to 0
# and e.end to 1
fh_error = io.TextIOWrapper(io.BytesIO(buffer.read(e.start)),
errors='ignore')
# We need to count the lines in the buffer the error occured in
# only counting newline terminated lines to ensure we do not
# double count the line the error occured on, the line count is
# initialized to one which accounts for the error line, and the
# error line will always terminate with the bad byte not a
# newline
for line in fh_error:
if line[-1] == '\n':
line_number += 1
raise ValidationError('Unicode cannot decode byte on line '
f'{line_number}') from e

def _validate_(self, level):
level_map = {'min': 100, 'max': float('inf')}
self._check(level_map[level])


DNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
>This is an ID
>This is another ID
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
>This data is corrupt
��
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is not an id
21 changes: 21 additions & 0 deletions q2_types/feature_data/tests/test_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,27 @@ def test_dna_fasta_format_empty_file(self):

format.validate()

def test_dna_fasta_format_consecutive_IDs(self):
filepath = self.get_data_path('dna-sequences-consecutive-ids.fasta')
format = DNAFASTAFormat(filepath, mode='r')

with self.assertRaisesRegex(ValidationError, 'consecutive IDs.*1'):
format.validate()

def test_dna_fasta_format_missing_initial_ID(self):
filepath = self.get_data_path('dna-sequences-first-line-not-id.fasta')
format = DNAFASTAFormat(filepath, mode='r')

with self.assertRaisesRegex(ValidationError, 'First line'):
format.validate()

def test_dna_fasta_format_corrupt_characters(self):
filepath = self.get_data_path('dna-sequences-corrupt-characters.fasta')
format = DNAFASTAFormat(filepath, mode='r')

with self.assertRaisesRegex(ValidationError, 'Unicode.*2'):
format.validate()

def test_dna_sequences_directory_format(self):
filepath = self.get_data_path('dna-sequences.fasta')
shutil.copy(filepath,
Expand Down

0 comments on commit e7e2e27

Please sign in to comment.