Skip to content

Commit

Permalink
exapdned unit tests and cleaned up some style issues in fasta iterators
Browse files Browse the repository at this point in the history
  • Loading branch information
pjuren committed Nov 15, 2015
1 parent 111bc7c commit 34b61b2
Showing 1 changed file with 88 additions and 38 deletions.
126 changes: 88 additions & 38 deletions src/pyokit/io/fastaIterators.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@
import sys
import os
import unittest
import StringIO

# pyokit imports
from pyokit.datastruct.sequence import Sequence
from pyokit.util.progressIndicator import ProgressIndicator
from pyokit.util.progressIndicator import ProgressIndicatorError
from pyokit.common.pyokitError import PyokitError


###############################################################################
# EXCEPTION CLASSES #
###############################################################################

class FastaFileFormatError(Exception):
class FastaFileFormatError(PyokitError):
def __init__(self, msg):
self.value = msg

Expand All @@ -49,7 +52,7 @@ def __str__(self):
# HELPER FUNCTIONS #
###############################################################################

def numSequences(fileh):
def num_sequences(fileh):
"""
Determine how many sequences there are in fasta file/stream.
Expand All @@ -63,7 +66,7 @@ def numSequences(fileh):
else:
fh = fileh
count = 0
for seq in fastaIterator(fh) :
for _ in fastaIterator(fh):
count += 1
return count

Expand Down Expand Up @@ -92,6 +95,51 @@ def _isSequenceHead(line):
# ITERATOR FUNCTIONS #
###############################################################################

def __read_seq_header(fh, prev_line):
"""
Given a file handle/stream, read the next fasta header from that stream.
:param fh: filehandle to read from
:param prev_line: the previous line read from this filestream
"""
seq_header = ""
if prev_line is not None and not _isSequenceHead(prev_line):
raise FastaFileFormatError("terminated on non-read header: " + prev_line)
if prev_line is None:
while seq_header.strip() == "":
seq_header = fh.readline()
else:
seq_header = prev_line
return seq_header


def __read_seq_data(fh):
"""
:return: a tuple of the sequence data and the last line that was read from
the file handle/stream
"""
line = None
seq_data = ""
while line is None or not _isSequenceHead(line):
line = fh.readline()
if line == "":
break # file is finished...
if not _isSequenceHead(line):
seq_data += line.strip()
return seq_data, line


def __build_progress_indicator(fh):
try:
total = os.path.getsize(fh.name)
except AttributeError:
raise ProgressIndicatorError("Failed to get max size for stream")
pind = ProgressIndicator(totalToDo=total,
messagePrefix="completed",
messageSuffix="of processing " + fh.name)
return pind


def fastaIterator(fn, useMutableString=False, verbose=False):
"""
A generator function which yields fastaSequence objects from a fasta-format
Expand All @@ -106,55 +154,30 @@ def fastaIterator(fn, useMutableString=False, verbose=False):
:param verbose: if True, output additional status messages to stderr about
progress
"""
prevLine = None
fh = fn
if type(fh).__name__ == "str":
fh = open(fh)

if verbose:
try:
total = os.path.getsize(fh.name)
pind = ProgressIndicator(totalToDo=total,
messagePrefix="completed",
messageSuffix="of processing "
+ fh.name)
except AttributeError:
sys.stderr.write("Warning: unable to show progress for stream")
pind = __build_progress_indicator(fh)
except ProgressIndicatorError as e:
sys.stderr.write("Warning: unable to show progress for stream. " +
"Reason: " + str(e))
verbose = False

prev_line = None
while True:
# either we have a sequence header left over from the prev call, or we need
# to read a new one from the file... try to do that now
seqHeader = ""
if prevLine is not None and not _isSequenceHead(prevLine):
raise FastaFileFormatError("terminated on non-read header: " + prevLine)
if prevLine == None:
while seqHeader.strip() == "":
seqHeader = fh.readline()
else:
seqHeader = prevLine
seqHeader = __read_seq_header(fh, prev_line)
name = seqHeader[1:].strip()

# now we need to read lines until we hit another sequence header, or we
# run out of lines.. this is our sequence data
line = None
seqdata = ""
while line == None or not _isSequenceHead(line):
line = fh.readline()
if line == "":
break # file is finished...
if not _isSequenceHead(line):
seqdata += line.strip()

# package it all up..
seq_data, prev_line = __read_seq_data(fh)
if verbose:
pind.done = fh.tell()
pind.showProgress()
yield Sequence(name, seqdata, useMutableString)
yield Sequence(name, seq_data, useMutableString)

# remember where we stopped for next call, or finish
prevLine = line
if prevLine == "":
if prev_line == "":
break


Expand All @@ -163,7 +186,34 @@ def fastaIterator(fn, useMutableString=False, verbose=False):
###############################################################################

class TestFastaIterators(unittest.TestCase):
pass

""" Unit tests for fasta iterators. """

def setUp(self):
self.file1 = ">s1\n" +\
"ACTGATCGATGCGCGATGCTAGTGC\n" +\
">s1\n" +\
"ACTGATCGATGCGCGATGCTAGTGC\n" +\
">s1\n" +\
"ACTGATCGATGCGCGATGCTAGTGC\n" +\
">s1\n" +\
"ACTGATCGATGCGCGATGCTAGTGC\n"

def test_num_sequences(self):
fh = StringIO.StringIO(self.file1)
self.assertEqual(num_sequences(fh), 4)

def test_fasta_iterator(self):
""" round-trip for a fasta stream """
fh = StringIO.StringIO(self.file1)
res = "\n".join([s.to_fasta_str(include_coords=False)
for s in fastaIterator(fh)])
self.assertEqual(res.strip(), self.file1.strip())


###############################################################################
# MAIN ENTRY POINT WHEN RUN AS STAND-ALONE MODULE #
###############################################################################

if __name__ == '__main__':
unittest.main()

0 comments on commit 34b61b2

Please sign in to comment.