Skip to content

Commit

Permalink
Merge 3d56cb4 into 2946fda
Browse files Browse the repository at this point in the history
  • Loading branch information
Oddant1 committed Jul 28, 2020
2 parents 2946fda + 3d56cb4 commit 2a0f0b8
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 76 deletions.
188 changes: 112 additions & 76 deletions q2_types/feature_data/_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
# ----------------------------------------------------------------------------

import re
import skbio.io

import qiime2.plugin.model as model
from qiime2.plugin import ValidationError
Expand Down Expand Up @@ -144,69 +143,13 @@ def _validate_(self, level):


class DNAFASTAFormat(model.TextFileFormat):
    """Text file format for FASTA files of unaligned IUPAC DNA sequences."""

    def _validate_(self, level):
        """Validate this file with the shared FASTA line validator.

        The actual line-by-line checks live in ``_validate_DNAFASTAFormats``
        so that this format and ``AlignedDNAFASTAFormat`` apply exactly the
        same rules and differ only in the permitted alphabet and in whether
        record lengths are compared.

        ``level`` is forwarded unchanged; the helper looks it up in its own
        ``{'min': 100, 'max': inf}`` line-limit table.
        """
        # Whole-line pattern: one or more IUPAC DNA codes followed by an
        # optional CR and/or LF.
        FASTADNAValidator = re.compile(r'[ACGTURYKMSWBDHVN]+\r?\n?')
        # Same alphabet as a set; the helper uses it to name the first
        # offending character once a line has failed the pattern.
        ValidationSet = frozenset(('A', 'C', 'G', 'T', 'U', 'R', 'Y', 'K', 'M',
                                   'S', 'W', 'B', 'D', 'H', 'V', 'N'))

        _validate_DNAFASTAFormats(self, FASTADNAValidator, ValidationSet,
                                  level)


DNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
Expand All @@ -221,29 +164,122 @@ class PairedDNASequencesDirectoryFormat(model.DirectoryFormat):


class AlignedDNAFASTAFormat(model.TextFileFormat):
    """FASTA text format for DNA records that must all share one length."""

    def sniff(self):
        """Return True when the file looks like an aligned DNA FASTA file.

        The first record fixes the expected length; up to four further
        records are compared against it.
        """
        # NOTE(review): this class also defines `_validate_`; confirm whether
        # the framework still consults `sniff` when both are present.
        path = str(self)
        fasta_sniffer = skbio.io.io_registry.get_sniffer('fasta')
        if not fasta_sniffer(path)[0]:
            return False

        records = skbio.io.read(path, constructor=skbio.DNA,
                                format='fasta', verify=False)
        try:
            expected_length = len(next(records))
            # Keep `records` as the first zip argument: zip advances it
            # before noticing that range(4) is exhausted.
            for record, _ in zip(records, range(4)):
                if len(record) != expected_length:
                    return False
        # StopIteration: no records at all. ValueError: raised by skbio if
        # there are invalid DNA chars.
        except (StopIteration, ValueError):
            return False
        return True

    def _validate_(self, level):
        """Validate the file, additionally requiring equal record lengths."""
        # IUPAC DNA codes plus the two gap characters '.' and '-'.
        gapped_alphabet = frozenset('ACGTURYKMSWBDHVN.-')
        gapped_line = re.compile(r'[ACGTURYKMSWBDHVN.-]+\r?\n?')

        # The final positional True is the helper's `aligned` flag.
        _validate_DNAFASTAFormats(self, gapped_line, gapped_alphabet, level,
                                  True)


# Directory format registered under the name
# 'AlignedDNASequencesDirectoryFormat'; it is built from the single file
# 'aligned-dna-sequences.fasta' paired with AlignedDNAFASTAFormat.
# NOTE(review): the exact on-disk contract is defined by
# model.SingleFileDirectoryFormat -- confirm against the qiime2 docs.
AlignedDNASequencesDirectoryFormat = model.SingleFileDirectoryFormat(
'AlignedDNASequencesDirectoryFormat', 'aligned-dna-sequences.fasta',
AlignedDNAFASTAFormat)


def _validate_DNAFASTAFormats(fmt, FASTADNAValidator, ValidationSet, level,
                              aligned=False):
    """Validate a FASTA file of DNA records line by line.

    Parameters
    ----------
    fmt
        Format object; only ``fmt.path.open('rb')`` is used.
    FASTADNAValidator
        Pattern that a complete sequence line, including its optional
        trailing CR/LF, must match.
    ValidationSet
        Container of the characters allowed in a sequence line. It is only
        consulted after a line fails the pattern, to name the offender.
    level
        ``'min'`` stops as soon as line 100 is reached; ``'max'`` reads the
        whole file. Any other value raises ``KeyError``.
    aligned
        When true, every record must contain the same number of sequence
        characters.

    Raises
    ------
    ValidationError
        If the file does not start with a description line, a description
        has no ID or an ID starting with a space, an ID is repeated, two
        descriptions are consecutive, a sequence line holds a character
        outside ``ValidationSet``, a line is not valid UTF-8, or (with
        ``aligned``) record lengths differ.
    """
    level_map = {'min': 100, 'max': float('inf')}
    max_lines = level_map[level]

    last_line_was_ID = False
    # Maps each ID seen so far to the line number of its description.
    ids = {}

    # Length shared by the records closed so far; stays 0 until the first
    # record that has sequence data is closed.
    seq_len = 0
    # Running length and first sequence line of the record being read.
    prev_seq_len = 0
    prev_seq_start_line = 0

    with fmt.path.open('rb') as fh:
        try:
            first = fh.read(6)
            # Skip a UTF-8 byte order mark before inspecting the first byte.
            if first[:3] == b'\xEF\xBB\xBF':
                first = first[3:]

            # Empty files should validate
            if first.strip() == b'':
                return

            if first[0] != ord(b'>'):
                raise ValidationError("First line of file is not a valid "
                                      "description. Descriptions must "
                                      "start with '>'")
            fh.seek(0)

            for line_number, line in enumerate(fh, 1):
                if line_number >= max_lines:
                    # Truncated validation: the record in progress is
                    # deliberately not length-checked.
                    return
                line = line.decode('utf-8-sig')

                if line.startswith('>'):
                    # A new description closes the previous record. The
                    # first closed record defines the reference length.
                    if seq_len == 0:
                        seq_len = prev_seq_len

                    if aligned:
                        _validate_line_lengths(seq_len, prev_seq_len,
                                               prev_seq_start_line)

                    prev_seq_len = 0
                    prev_seq_start_line = 0

                    if last_line_was_ID:
                        raise ValidationError('Multiple consecutive '
                                              'descriptions starting on '
                                              f'line {line_number-1!r}')

                    line = line.split()

                    # A lone '>' token means the ID is absent or was
                    # separated from the marker by whitespace.
                    if line[0] == '>':
                        if len(line) == 1:
                            raise ValidationError(
                                f'Description on line {line_number} is '
                                'missing an ID.')
                        else:
                            raise ValidationError(
                                f'ID on line {line_number} starts with a '
                                'space. IDs may not start with spaces')

                    if line[0] in ids:
                        raise ValidationError(
                            f'ID on line {line_number} is a duplicate of '
                            f'another ID on line {ids[line[0]]}.')

                    ids[line[0]] = line_number
                    last_line_was_ID = True

                elif re.fullmatch(FASTADNAValidator, line):
                    if prev_seq_start_line == 0:
                        prev_seq_start_line = line_number

                    # Count sequence characters only. Including the line
                    # terminator would make the total depend on how the
                    # record is wrapped and on whether the file ends with
                    # a newline.
                    prev_seq_len += len(line.rstrip('\r\n'))
                    last_line_was_ID = False
                else:
                    for position, character in enumerate(line):
                        if character not in ValidationSet:
                            raise ValidationError(
                                f"Invalid character '{character}' at "
                                f"position {position} on line "
                                f"{line_number} (does not match IUPAC "
                                "characters for a DNA sequence).")

        except UnicodeDecodeError as e:
            raise ValidationError(f'utf-8 cannot decode byte on line '
                                  f'{line_number}') from e

    if aligned:
        # The last record is closed by end of file rather than by a
        # description line. If it is also the first record there is nothing
        # to compare it with, so it defines the reference length itself.
        if seq_len == 0:
            seq_len = prev_seq_len
        _validate_line_lengths(seq_len, prev_seq_len, prev_seq_start_line)


def _validate_line_lengths(seq_len, prev_seq_len, prev_seq_start_line):
    """Raise ValidationError when a record's length breaks the alignment.

    ``seq_len`` is the length shared by earlier records, ``prev_seq_len``
    the length of the record just closed and ``prev_seq_start_line`` the
    line on which that record's sequence began (used in the message only).
    """
    if prev_seq_len != seq_len:
        raise ValidationError('The sequence starting on line '
                              f'{prev_seq_start_line} was length '
                              f'{prev_seq_len}. All previous sequences were '
                              f'length {seq_len}. All sequences must be the '
                              'same length for AlignedDNAFASTAFormat.')


class DifferentialFormat(model.TextFileFormat):
def validate(self, *args):
try:
Expand Down
8 changes: 8 additions & 0 deletions q2_types/feature_data/tests/test_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,14 @@ def test_aligned_dna_fasta_format_validate_negative(self):
with self.assertRaisesRegex(ValidationError, 'AlignedDNAFASTA'):
format.validate()

def test_aligned_dna_fasta_format_unaligned(self):
    """Records of differing length must fail aligned validation."""
    import re

    filepath = self.get_data_path('dna-sequences.fasta')
    # Named to avoid shadowing the builtin `format`.
    aligned_format = AlignedDNAFASTAFormat(filepath, mode='r')

    # Pin the offending line and the shape of the message rather than the
    # exact lengths: the literal numbers depend on whether line terminators
    # are counted, which is not what this test is about.
    with self.assertRaisesRegex(
            ValidationError,
            r'line 4.*length \d+.*length \d+') as caught:
        aligned_format.validate()

    reported = re.search(r'was length (\d+)\..*were length (\d+)\.',
                         str(caught.exception))
    self.assertIsNotNone(reported)
    # The error is only meaningful if the two lengths really differ.
    self.assertNotEqual(int(reported.group(1)), int(reported.group(2)))

def test_aligned_dna_sequences_directory_format(self):
filepath = self.get_data_path('aligned-dna-sequences.fasta')
temp_dir = self.temp_dir.name
Expand Down

0 comments on commit 2a0f0b8

Please sign in to comment.