diff --git a/pysam/Pileup.py b/pysam/Pileup.py index 998127b0..1fe05ecc 100644 --- a/pysam/Pileup.py +++ b/pysam/Pileup.py @@ -2,36 +2,37 @@ import collections import pysam -PileupSubstitution = collections.namedtuple( "PileupSubstitution", - " ".join( (\ - "chromosome", - "pos", - "reference_base", - "genotype", - "consensus_quality", - "snp_quality", - "mapping_quality", - "coverage", - "read_bases", - "base_qualities" ) ) ) - -PileupIndel = collections.namedtuple( "PileupIndel", - " ".join( (\ - "chromosome", - "pos", - "reference_base", - "genotype", - "consensus_quality", - "snp_quality", - "mapping_quality", - "coverage", - "first_allele", - "second_allele", - "reads_first", - "reads_second", - "reads_diff" ) ) ) - -def iterate( infile ): +PileupSubstitution = collections.namedtuple("PileupSubstitution", + " ".join(( + "chromosome", + "pos", + "reference_base", + "genotype", + "consensus_quality", + "snp_quality", + "mapping_quality", + "coverage", + "read_bases", + "base_qualities"))) + +PileupIndel = collections.namedtuple("PileupIndel", + " ".join(( + "chromosome", + "pos", + "reference_base", + "genotype", + "consensus_quality", + "snp_quality", + "mapping_quality", + "coverage", + "first_allele", + "second_allele", + "reads_first", + "reads_second", + "reads_diff"))) + + +def iterate(infile): '''iterate over ``samtools pileup -c`` formatted file. *infile* can be any iterator over a lines. @@ -39,25 +40,29 @@ def iterate( infile ): The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution` or :class:`pysam.Pileup.PileupIndel`. - .. note:: + .. 
note:: + The parser converts to 0-based coordinates ''' - - conv_subst = (str,lambda x: int(x)-1,str,str,int,int,int,int,str,str) - conv_indel = (str,lambda x: int(x)-1,str,str,int,int,int,int,str,str,int,int,int) + + conv_subst = (str, lambda x: int(x) - 1, str, + str, int, int, int, int, str, str) + conv_indel = (str, lambda x: int(x) - 1, str, str, int, + int, int, int, str, str, int, int, int) for line in infile: d = line[:-1].split() if d[2] == "*": try: - yield PileupIndel( *[x(y) for x,y in zip(conv_indel,d) ] ) + yield PileupIndel(*[x(y) for x, y in zip(conv_indel, d)]) except TypeError: - raise pysam.SamtoolsError( "parsing error in line: `%s`" % line) + raise pysam.SamtoolsError("parsing error in line: `%s`" % line) else: try: - yield PileupSubstitution( *[x(y) for x,y in zip(conv_subst,d) ] ) + yield PileupSubstitution(*[x(y) for x, y in zip(conv_subst, d)]) except TypeError: - raise pysam.SamtoolsError( "parsing error in line: `%s`" % line) + raise pysam.SamtoolsError("parsing error in line: `%s`" % line) + ENCODE_GENOTYPE = { 'A': 'A', 'C': 'C', 'G': 'G', 'T': 'T', @@ -68,7 +73,7 @@ def iterate( infile ): 'GT': 'k', 'TG': 'K', 'CG': 's', 'GC': 'S', 'AT': 'w', 'TA': 'W', - } +} DECODE_GENOTYPE = { 'A': 'AA', @@ -81,59 +86,67 @@ def iterate( infile ): 'k': 'GT', 'K': 'GT', 's': 'CG', 'S': 'CG', 'w': 'AT', 'W': 'AT', - } +} + +# ------------------------------------------------------------ + -##------------------------------------------------------------ -def encodeGenotype( code ): +def encodeGenotype(code): '''encode genotypes like GG, GA into a one-letter code. The returned code is lower case if code[0] < code[1], otherwise it is uppercase. ''' - return ENCODE_GENOTYPE[ code.upper() ] + return ENCODE_GENOTYPE[code.upper()] -def decodeGenotype( code ): + +def decodeGenotype(code): '''decode single letter genotypes like m, M into two letters. This is the reverse operation to :meth:`encodeGenotype`. 
''' - return DECODE_GENOTYPE[ code ] + return DECODE_GENOTYPE[code] + -def translateIndelGenotypeFromVCF( vcf_genotypes, ref ): +def translateIndelGenotypeFromVCF(vcf_genotypes, ref): '''translate indel from vcf to pileup format.''' # indels - def getPrefix( s1, s2 ): + def getPrefix(s1, s2): '''get common prefix of strings s1 and s2.''' - n = min( len( s1), len( s2 ) ) - for x in range( n ): - if s1[x] != s2[x]: return s1[:x] + n = min(len(s1), len(s2)) + for x in range(n): + if s1[x] != s2[x]: + return s1[:x] return s1[:n] - def getSuffix( s1, s2 ): + def getSuffix(s1, s2): '''get common sufix of strings s1 and s2.''' - n = min( len( s1), len( s2 ) ) - if s1[-1] != s2[-1]: return "" - for x in range( -2, -n - 1, -1 ): - if s1[x] != s2[x]: return s1[x+1:] + n = min(len(s1), len(s2)) + if s1[-1] != s2[-1]: + return "" + for x in range(-2, -n - 1, -1): + if s1[x] != s2[x]: + return s1[x + 1:] return s1[-n:] - def getGenotype( variant, ref ): + def getGenotype(variant, ref): + + if variant == ref: + return "*", 0 - if variant == ref: return "*", 0 - if len(ref) > len(variant): # is a deletion if ref.startswith(variant): return "-%s" % ref[len(variant):], len(variant) - 1 - elif ref.endswith( variant ): + elif ref.endswith(variant): return "-%s" % ref[:-len(variant)], -1 else: - prefix = getPrefix( ref, variant ) - suffix = getSuffix( ref, variant ) - shared = len(prefix) + len(suffix) - len(variant) + prefix = getPrefix(ref, variant) + suffix = getSuffix(ref, variant) + shared = len(prefix) + len(suffix) - len(variant) # print "-", prefix, suffix, ref, variant, shared, len(prefix), len(suffix), len(ref) if shared < 0: raise ValueError() - return "-%s" % ref[len(prefix):-(len(suffix)-shared)], len(prefix) - 1 + return "-%s" % ref[len(prefix):-(len(suffix) - shared)], len(prefix) - 1 elif len(ref) < len(variant): # is an insertion @@ -142,47 +155,49 @@ def getGenotype( variant, ref ): elif variant.endswith(ref): return "+%s" % variant[:len(ref)], 0 else: - prefix = 
getPrefix( ref, variant ) - suffix = getSuffix( ref, variant ) - shared = len(prefix) + len(suffix) - len(ref) + prefix = getPrefix(ref, variant) + suffix = getSuffix(ref, variant) + shared = len(prefix) + len(suffix) - len(ref) if shared < 0: raise ValueError() - return "+%s" % variant[len(prefix):-(len(suffix)-shared)], len(prefix) + return "+%s" % variant[len(prefix):-(len(suffix) - shared)], len(prefix) else: assert 0, "snp?" # in pileup, the position refers to the base # after the coordinate, hence subtract 1 - #pos -= 1 + # pos -= 1 genotypes, offsets = [], [] is_error = True for variant in vcf_genotypes: try: - g, offset = getGenotype( variant, ref ) + g, offset = getGenotype(variant, ref) except ValueError: break - genotypes.append( g ) - if g != "*": offsets.append( offset ) - - else: + genotypes.append(g) + if g != "*": + offsets.append(offset) + + else: is_error = False - if is_error: + if is_error: raise ValueError() - assert len(set(offsets )) == 1, "multiple offsets for indel" + assert len(set(offsets)) == 1, "multiple offsets for indel" offset = offsets[0] - genotypes = "/".join( genotypes ) + genotypes = "/".join(genotypes) return genotypes, offset -def vcf2pileup( vcf, sample ): + +def vcf2pileup(vcf, sample): '''convert vcf record to pileup record.''' - + chromosome = vcf.contig pos = vcf.pos reference = vcf.ref @@ -193,79 +208,75 @@ def vcf2pileup( vcf, sample ): # get genotype genotypes = data["GT"] if len(genotypes) > 1: - raise ValueError( "only single genotype per position, %s" % (str(vcf))) + raise ValueError("only single genotype per position, %s" % (str(vcf))) genotypes = genotypes[0] # not a variant - if genotypes[0] == ".": return None + if genotypes[0] == ".": + return None - genotypes = [ allelles[int(x)] for x in genotypes if x != "/" ] + genotypes = [allelles[int(x)] for x in genotypes if x != "/"] # snp_quality is "genotype quality" - snp_quality = consensus_quality = data.get( "GQ", [0])[0] - mapping_quality = vcf.info.get( "MQ", 
[0])[0] - coverage = data.get( "DP", 0) + snp_quality = consensus_quality = data.get("GQ", [0])[0] + mapping_quality = vcf.info.get("MQ", [0])[0] + coverage = data.get("DP", 0) - if len(reference) > 1 or max([len(x) for x in vcf.alt] ) > 1: + if len(reference) > 1 or max([len(x) for x in vcf.alt]) > 1: # indel - genotype, offset = translateIndelGenotypeFromVCF( genotypes, reference ) - - return PileupIndel( chromosome, - pos + offset, - "*", - genotype, - consensus_quality, - snp_quality, - mapping_quality, - coverage, - genotype, - "<" * len(genotype), - 0, - 0, - 0 ) - - else: - - genotype = encodeGenotype( "".join(genotypes) ) + genotype, offset = translateIndelGenotypeFromVCF(genotypes, reference) + + return PileupIndel(chromosome, + pos + offset, + "*", + genotype, + consensus_quality, + snp_quality, + mapping_quality, + coverage, + genotype, + "<" * len(genotype), + 0, + 0, + 0) - + else: + genotype = encodeGenotype("".join(genotypes)) read_bases = "" base_qualities = "" - return PileupSubstitution( chromosome, pos, reference, - genotype, - consensus_quality, - snp_quality, - mapping_quality, - coverage, read_bases, base_qualities ) + return PileupSubstitution(chromosome, pos, reference, + genotype, consensus_quality, + snp_quality, mapping_quality, + coverage, read_bases, + base_qualities) -def iterate_from_vcf( infile, sample ): +def iterate_from_vcf(infile, sample): '''iterate over a vcf-formatted file. *infile* can be any iterator over a lines. - The function yields named tuples of the type :class:`pysam.Pileup.PileupSubstitution` - or :class:`pysam.Pileup.PileupIndel`. + The function yields named tuples of the type + :class:`pysam.Pileup.PileupSubstitution` or + :class:`pysam.Pileup.PileupIndel`. - Positions without a snp will be skipped. + Positions without a snp will be skipped. - This method is wasteful and written to support same - legacy code that expects samtools pileup output. 
+ This method is wasteful and written to support same legacy code + that expects samtools pileup output. Better use the vcf parser directly. ''' - - vcf = pysam.VCF() - vcf.connect( infile ) + vcf.connect(infile) if sample not in vcf.getsamples(): - raise KeyErorr( "sample %s not vcf file" ) + raise KeyError("sample %s not in vcf file" % sample) for row in vcf.fetch(): - result = vcf2pileup( row, sample ) - if result: yield result - + result = vcf2pileup(row, sample) + if result: + yield result diff --git a/pysam/__init__.py b/pysam/__init__.py index c142c6c4..ed9d7cf3 100644 --- a/pysam/__init__.py +++ b/pysam/__init__.py @@ -74,8 +74,9 @@ def get_include(): def get_defines(): '''return a list of defined compilation parameters.''' - return [] #('_FILE_OFFSET_BITS', '64'), + # ('_FILE_OFFSET_BITS', '64'), # ('_USE_KNETFILE', '')] + return [] def get_libraries(): diff --git a/pysam/namedtuple.py b/pysam/namedtuple.py deleted file mode 100644 index a60fb1af..00000000 --- a/pysam/namedtuple.py +++ /dev/null @@ -1,117 +0,0 @@ -from operator import itemgetter as _itemgetter -from keyword import iskeyword as _iskeyword -import sys as _sys - -def namedtuple(typename, field_names, verbose=False, rename=False): - """Returns a new subclass of tuple with named fields. - - >>> Point = namedtuple('Point', 'x y') - >>> Point.__doc__ # docstring for the new class - 'Point(x, y)' - >>> p = Point(11, y=22) # instantiate with positional args or keywords - >>> p[0] + p[1] # indexable like a plain tuple - 33 - >>> x, y = p # unpack like a regular tuple - >>> x, y - (11, 22) - >>> p.x + p.y # fields also accessable by name - 33 - >>> d = p._asdict() # convert to a dictionary - >>> d['x'] - 11 - >>> Point(**d) # convert from a dictionary - Point(x=11, y=22) - >>> p._replace(x=100) # _replace() is like str.replace() but targets named fields - Point(x=100, y=22) - - """ - - # Parse and validate the field names. 
Validation serves two purposes, - # generating informative error messages and preventing template injection attacks. - if isinstance(field_names, basestring): - field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas - field_names = tuple(map(str, field_names)) - if rename: - names = list(field_names) - seen = set() - for i, name in enumerate(names): - if (not min(c.isalnum() or c=='_' for c in name) or _iskeyword(name) - or not name or name[0].isdigit() or name.startswith('_') - or name in seen): - names[i] = '_%d' % i - seen.add(name) - field_names = tuple(names) - for name in (typename,) + field_names: - if not min(c.isalnum() or c=='_' for c in name): - raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name) - if _iskeyword(name): - raise ValueError('Type names and field names cannot be a keyword: %r' % name) - if name[0].isdigit(): - raise ValueError('Type names and field names cannot start with a number: %r' % name) - seen_names = set() - for name in field_names: - if name.startswith('_') and not rename: - raise ValueError('Field names cannot start with an underscore: %r' % name) - if name in seen_names: - raise ValueError('Encountered duplicate field name: %r' % name) - seen_names.add(name) - - # Create and fill-in the class template - numfields = len(field_names) - argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes - reprtxt = ', '.join('%s=%%r' % name for name in field_names) - template = '''class %(typename)s(tuple): - '%(typename)s(%(argtxt)s)' \n - __slots__ = () \n - _fields = %(field_names)r \n - def __new__(_cls, %(argtxt)s): - return _tuple.__new__(_cls, (%(argtxt)s)) \n - @classmethod - def _make(cls, iterable, new=tuple.__new__, len=len): - 'Make a new %(typename)s object from a sequence or iterable' - result = new(cls, iterable) - if len(result) != %(numfields)d: - raise TypeError('Expected %(numfields)d 
arguments, got %%d' %% len(result)) - return result \n - def __repr__(self): - return '%(typename)s(%(reprtxt)s)' %% self \n - def _asdict(self): - 'Return a new dict which maps field names to their values' - return dict(zip(self._fields, self)) \n - def _replace(_self, **kwds): - 'Return a new %(typename)s object replacing specified fields with new values' - result = _self._make(map(kwds.pop, %(field_names)r, _self)) - if kwds: - raise ValueError('Got unexpected field names: %%r' %% kwds.keys()) - return result \n - def __getnewargs__(self): - return tuple(self) \n\n''' % locals() - for i, name in enumerate(field_names): - template += ' %s = _property(_itemgetter(%d))\n' % (name, i) - if verbose: - print template - - # Execute the template string in a temporary namespace - namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename, - _property=property, _tuple=tuple) - try: - exec template in namespace - except SyntaxError, e: - raise SyntaxError(e.message + ':\n' + template) - result = namespace[typename] - - # For pickling to work, the __module__ variable needs to be set to the frame - # where the named tuple is created. Bypass this step in enviroments where - # sys._getframe is not defined (Jython for example) or sys._getframe is not - # defined for arguments greater than 0 (IronPython). - try: - result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__') - except (AttributeError, ValueError): - pass - - return result - - - - - diff --git a/pysam/utils.py b/pysam/utils.py index 239f5db4..528c411c 100644 --- a/pysam/utils.py +++ b/pysam/utils.py @@ -41,7 +41,7 @@ def __init__(self, collection, dispatch, parsers): self.dispatch = dispatch self.parsers = parsers self.stderr = [] - + def __call__(self, *args, **kwargs): '''execute a samtools command. 
@@ -70,7 +70,7 @@ def __call__(self, *args, **kwargs): "%s returned with error %i: " "stdout=%s, stderr=%s" % (self.collection, - retval, + retval, stdout, stderr)) @@ -102,4 +102,3 @@ def usage(self): return stderr else: return stdout - diff --git a/setup.cfg b/setup.cfg index 5cb6c3fb..1f061e55 100644 --- a/setup.cfg +++ b/setup.cfg @@ -6,3 +6,5 @@ universal = 0 # -v: verbose output addopts = -s -v testpaths = pysam tests +pep8maxlinelength = 120 +pep8ignore = E402 diff --git a/tests/AlignedSegment_test.py b/tests/AlignedSegment_test.py index 8d9ec4a0..c1b11741 100644 --- a/tests/AlignedSegment_test.py +++ b/tests/AlignedSegment_test.py @@ -8,7 +8,6 @@ from TestUtils import checkFieldEqual, BAM_DATADIR, WORKDIR - class ReadTest(unittest.TestCase): def build_read(self): @@ -39,7 +38,8 @@ def testEmpty(self): a = pysam.AlignedSegment() self.assertEqual(a.query_name, None) self.assertEqual(a.query_sequence, None) - self.assertEqual(pysam.qualities_to_qualitystring(a.query_qualities), None) + self.assertEqual(pysam.qualities_to_qualitystring( + a.query_qualities), None) self.assertEqual(a.flag, 0) self.assertEqual(a.reference_id, -1) self.assertEqual(a.mapping_quality, 0) @@ -148,14 +148,16 @@ def testUpdate2(self): ''' a = self.build_read() a.query_sequence = a.query_sequence[5:10] - self.assertEqual(pysam.qualities_to_qualitystring(a.query_qualities), None) + self.assertEqual(pysam.qualities_to_qualitystring( + a.query_qualities), None) a = self.build_read() s = pysam.qualities_to_qualitystring(a.query_qualities) a.query_sequence = a.query_sequence[5:10] a.query_qualities = pysam.qualitystring_to_array(s[5:10]) - self.assertEqual(pysam.qualities_to_qualitystring(a.query_qualities), s[5:10]) + self.assertEqual(pysam.qualities_to_qualitystring( + a.query_qualities), s[5:10]) def testLargeRead(self): '''build an example read.''' @@ -362,7 +364,7 @@ def test_get_aligned_pairs(self): (3, 23, 'A'), (4, 24, 'c'), (None, 25, 'T'), (None, 26, 'T'), (5, 27, 'A'), (6, 28, 
'A'), (7, 29, 'A'), (8, 30, 'A')] - ) + ) a.cigarstring = "5M2D2I2M" a.set_tag("MD", "4C^TT2") @@ -373,7 +375,7 @@ def test_get_aligned_pairs(self): (None, 25, 'T'), (None, 26, 'T'), (5, None, None), (6, None, None), (7, 27, 'A'), (8, 28, 'A')] - ) + ) def test_get_aligned_pairs_with_malformed_MD_tag(self): @@ -438,7 +440,7 @@ def testNoSequence(self): self.assertEqual(a.query_alignment_length, 20) def test_query_length_is_limited(self): - + a = self.build_read() a.query_name = "A" * 1 a.query_name = "A" * 251 @@ -451,7 +453,7 @@ def test_query_length_is_limited(self): class TestCigar(ReadTest): - + def testCigarString(self): r = self.build_read() self.assertEqual(r.cigarstring, "10M1D9M1I20M") @@ -463,16 +465,17 @@ def testCigarString(self): def testCigar(self): r = self.build_read() - self.assertEqual(r.cigartuples, [(0, 10), (2, 1), (0, 9), (1, 1), (0, 20)]) + self.assertEqual( + r.cigartuples, [(0, 10), (2, 1), (0, 9), (1, 1), (0, 20)]) # unsetting cigar string r.cigartuples = None self.assertEqual(r.cigartuples, None) class TestCigarStats(ReadTest): - + def testStats(self): - + a = self.build_read() a.cigarstring = None @@ -576,7 +579,7 @@ def testArrayTags(self): read.set_tag, key, array.array(dtype, range(10))) - + def testAddTagsType(self): a = self.build_read() a.tags = None @@ -847,7 +850,7 @@ def testLongTags(self): tags = [('XC', 85), ('XT', 'M'), ('NM', 5), ('SM', 29), ('AM', 29), ('XM', 1), ('XO', 1), ('XG', 4), ('MD', '37^ACCC29T18'), - ('XA', '5,+11707,36M1I48M,2;21,-48119779,46M1I38M,2;hs37d5,-10060835,40M1D45M,3;5,+11508,36M1I48M,3;hs37d5,+6743812,36M1I48M,3;19,-59118894,46M1I38M,3;4,-191044002,6M1I78M,3;')] + ('XA', '5,+11707,36M1I48M,2;21,-48119779,46M1I38M,2;hs37d5,-10060835,40M1D45M,3;5,+11508,36M1I48M,3;hs37d5,+6743812,36M1I48M,3;19,-59118894,46M1I38M,3;4,-191044002,6M1I78M,3;')] # noqa r.tags = tags r.tags += [("RG", rg)] * 100 @@ -870,7 +873,7 @@ def testNegativeIntegersWrittenToFile(self): "tests/test.bam", "wb", referencenames=("chr1",), 
- referencelengths = (1000,)) as outf: + referencelengths=(1000,)) as outf: outf.write(r) with pysam.AlignmentFile("tests/test.bam") as inf: r = next(inf) @@ -918,7 +921,7 @@ def check_tag(self, tag, value, value_type, alt_value_type=None): self.assertEqual(t, alt_value_type) else: self.assertEqual(t, value_type) - + def test_set_tag_with_A(self): self.check_tag('TT', "x", value_type="A") @@ -953,10 +956,12 @@ def test_set_tag_with_H(self): self.check_tag('TT', "AE12", value_type="H") def test_set_tag_with_automated_type_detection(self): - self.check_tag('TT', -(1 << 7), value_type=None, alt_value_type="c") - self.check_tag('TT', -(1 << 7) - 1, value_type=None, alt_value_type="s") - self.check_tag('TT', -(1 << 15), value_type=None, alt_value_type="s") - self.check_tag('TT', -(1 << 15) - 1, value_type=None, alt_value_type="i") + self.check_tag('TT', -(1 << 7), value_type=None, alt_value_type="c") + self.check_tag('TT', -(1 << 7) - 1, + value_type=None, alt_value_type="s") + self.check_tag('TT', -(1 << 15), value_type=None, alt_value_type="s") + self.check_tag('TT', -(1 << 15) - 1, + value_type=None, alt_value_type="i") self.check_tag('TT', -(1 << 31), value_type=None, alt_value_type="i") self.assertRaises( ValueError, @@ -965,12 +970,14 @@ def test_set_tag_with_automated_type_detection(self): -(1 << 31) - 1, value_type=None, alt_value_type="i") - + self.check_tag('TT', (1 << 8) - 1, value_type=None, alt_value_type="C") self.check_tag('TT', (1 << 8), value_type=None, alt_value_type="S") - self.check_tag('TT', (1 << 16) - 1, value_type=None, alt_value_type="S") + self.check_tag('TT', (1 << 16) - 1, + value_type=None, alt_value_type="S") self.check_tag('TT', (1 << 16), value_type=None, alt_value_type="I") - self.check_tag('TT', (1 << 32) - 1, value_type=None, alt_value_type="I") + self.check_tag('TT', (1 << 32) - 1, + value_type=None, alt_value_type="I") self.assertRaises( ValueError, self.check_tag, @@ -991,7 +998,7 @@ def check_tag(self, tag, value, value_type, 
alt_value_type=None): else: self.assertEqual(t, value_type) self.assertEqual(v, value) - + class TestAsString(unittest.TestCase): @@ -1000,13 +1007,13 @@ def testAsString(self): reference = [x[:-1] for x in samf if not x.startswith("@")] with pysam.AlignmentFile( - os.path.join(BAM_DATADIR, "ex2.bam"), "r") as pysamf: + os.path.join(BAM_DATADIR, "ex2.bam"), "r") as pysamf: for s, p in zip(reference, pysamf): self.assertEqual(s, p.tostring(pysamf)) class TestEnums(unittest.TestCase): - + def test_cigar_enums_are_defined(self): self.assertEqual(pysam.CMATCH, 0) self.assertEqual(pysam.CINS, 1) diff --git a/tests/AlignmentFilePileup_bench.py b/tests/AlignmentFilePileup_bench.py index 331c4d2d..8b02e458 100644 --- a/tests/AlignmentFilePileup_bench.py +++ b/tests/AlignmentFilePileup_bench.py @@ -47,7 +47,7 @@ def test_build_depth_with_filter_from_bam_with_pysam(benchmark): result = benchmark(build_depth_with_filter_with_pysam, os.path.join(BAM_DATADIR, "ex2.bam")) assert sum(result) == 107248 - + def test_build_query_bases_from_bam_with_samtools(benchmark): result = benchmark(build_query_bases_with_samtools, @@ -114,4 +114,3 @@ def test_build_query_positions_from_bam_with_pysam(benchmark): result = benchmark(build_query_positions_with_pysam, os.path.join(BAM_DATADIR, "ex2.bam")) assert sum([sum(x) for x in result]) == 1841736 - diff --git a/tests/AlignmentFilePileup_test.py b/tests/AlignmentFilePileup_test.py index 7d1288a1..0a8fd3bc 100644 --- a/tests/AlignmentFilePileup_test.py +++ b/tests/AlignmentFilePileup_test.py @@ -34,17 +34,18 @@ def test_pileup_mapping_qualities_are_equal(self): # convert to chars pysam_result = [ [chr(min(126, x + 33)) for x in l] for l in pysam_result] - + self.assertEqual("".join(flatten_nested_list(pysam_result)), "".join(flatten_nested_list(samtools_result))) - + def test_pileup_query_qualities_from_pileups_are_equal(self): samtools_result = build_query_qualities_with_samtoolspipe(self.fn) pysam_result = 
build_query_qualities_with_pysam_pileups(self.fn) pysam_result = [ "".join([chr(min(126, x + 33)) for x in l]) for l in pysam_result] - + self.assertEqual(pysam_result, samtools_result) + if __name__ == "__main__": unittest.main() diff --git a/tests/AlignmentFile_test.py b/tests/AlignmentFile_test.py index b0865b15..92bb29da 100644 --- a/tests/AlignmentFile_test.py +++ b/tests/AlignmentFile_test.py @@ -53,99 +53,46 @@ def tearDown(self): self.samfile.close() def testARqname(self): - self.assertEqual( - self.reads[0].query_name, - "read_28833_29006_6945", - "read name mismatch in read 1: %s != %s" % ( - self.reads[0].query_name, "read_28833_29006_6945")) - self.assertEqual( - self.reads[1].query_name, - "read_28701_28881_323b", - "read name mismatch in read 2: %s != %s" % ( - self.reads[1].query_name, "read_28701_28881_323b")) + self.assertEqual(self.reads[0].query_name, + "read_28833_29006_6945") + self.assertEqual(self.reads[1].query_name, + "read_28701_28881_323b") def testARflag(self): - self.assertEqual( - self.reads[0].flag, 99, - "flag mismatch in read 1: %s != %s" % ( - self.reads[0].flag, 99)) - self.assertEqual( - self.reads[1].flag, 147, - "flag mismatch in read 2: %s != %s" % ( - self.reads[1].flag, 147)) + self.assertEqual(self.reads[0].flag, 99) + self.assertEqual(self.reads[1].flag, 147) def testARrname(self): - self.assertEqual( - self.reads[0].reference_id, 0, - "chromosome/target id mismatch in read 1: %s != %s" % - (self.reads[0].reference_id, 0)) - self.assertEqual( - self.reads[1].reference_id, 1, - "chromosome/target id mismatch in read 2: %s != %s" % - (self.reads[1].reference_id, 1)) + self.assertEqual(self.reads[0].reference_id, 0) + self.assertEqual(self.reads[1].reference_id, 1) def testARpos(self): - self.assertEqual( - self.reads[0].reference_start, 33 - 1, - "mapping position mismatch in read 1: %s != %s" % - (self.reads[0].reference_start, 33 - 1)) - self.assertEqual( - self.reads[1].reference_start, 88 - 1, - "mapping position 
mismatch in read 2: %s != %s" % - (self.reads[1].reference_start, 88 - 1)) + self.assertEqual(self.reads[0].reference_start, 33 - 1) + self.assertEqual(self.reads[1].reference_start, 88 - 1) def testARmapq(self): - self.assertEqual( - self.reads[0].mapping_quality, 20, - "mapping quality mismatch in read 1: %s != %s" % - (self.reads[0].mapping_quality, 20)) - self.assertEqual( - self.reads[1].mapping_quality, 30, - "mapping quality mismatch in read 2: %s != %s" % ( - self.reads[1].mapping_quality, 30)) + self.assertEqual(self.reads[0].mapping_quality, 20) + self.assertEqual(self.reads[1].mapping_quality, 30) def testARcigar(self): - self.assertEqual( - self.reads[0].cigartuples, - [(0, 10), (2, 1), (0, 25)], - "read name length mismatch in read 1: %s != %s" % - (self.reads[0].cigartuples, [(0, 10), (2, 1), (0, 25)])) - self.assertEqual( - self.reads[1].cigartuples, [(0, 35)], - "read name length mismatch in read 2: %s != %s" % - (self.reads[1].cigartuples, [(0, 35)])) + self.assertEqual(self.reads[0].cigartuples, [(0, 10), (2, 1), (0, 25)]) + self.assertEqual(self.reads[1].cigartuples, [(0, 35)]) def testARcigarstring(self): self.assertEqual(self.reads[0].cigarstring, '10M1D25M') self.assertEqual(self.reads[1].cigarstring, '35M') def testARmrnm(self): - self.assertEqual( - self.reads[0].next_reference_id, 0, - "mate reference sequence name mismatch in read 1: %s != %s" % - (self.reads[0].next_reference_id, 0)) - self.assertEqual( - self.reads[1].next_reference_id, 1, - "mate reference sequence name mismatch in read 2: %s != %s" % - (self.reads[1].next_reference_id, 1)) - self.assertEqual( - self.reads[0].next_reference_id, 0, - "mate reference sequence name mismatch in read 1: %s != %s" % - (self.reads[0].next_reference_id, 0)) - self.assertEqual( - self.reads[1].next_reference_id, 1, - "mate reference sequence name mismatch in read 2: %s != %s" % - (self.reads[1].next_reference_id, 1)) + self.assertEqual(self.reads[0].next_reference_id, 0) + 
self.assertEqual(self.reads[1].next_reference_id, 1) + self.assertEqual(self.reads[0].next_reference_id, 0) + self.assertEqual(self.reads[1].next_reference_id, 1) def testARmpos(self): - self.assertEqual(self.reads[ - 0].next_reference_start, 200 - 1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].next_reference_start, 200 - 1)) - self.assertEqual(self.reads[ - 1].next_reference_start, 500 - 1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].next_reference_start, 500 - 1)) - self.assertEqual(self.reads[ - 0].next_reference_start, 200 - 1, "mate mapping position mismatch in read 1: %s != %s" % (self.reads[0].next_reference_start, 200 - 1)) - self.assertEqual(self.reads[ - 1].next_reference_start, 500 - 1, "mate mapping position mismatch in read 2: %s != %s" % (self.reads[1].next_reference_start, 500 - 1)) + self.assertEqual(self.reads[0].next_reference_start, 200 - 1) + self.assertEqual(self.reads[1].next_reference_start, 500 - 1) + self.assertEqual(self.reads[0].next_reference_start, 200 - 1) + self.assertEqual(self.reads[1].next_reference_start, 500 - 1) def testARQueryLength(self): self.assertEqual( @@ -166,12 +113,15 @@ def testARQueryLength(self): (self.reads[1].query_length, 35)) def testARseq(self): - self.assertEqual(self.reads[0].query_sequence, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 1: %s != %s" % ( - self.reads[0].query_sequence, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG")) - self.assertEqual(self.reads[1].query_sequence, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA", "sequence size mismatch in read 2: %s != %s" % ( - self.reads[1].query_sequence, "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA")) - self.assertEqual(self.reads[3].query_sequence, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG", "sequence mismatch in read 4: %s != %s" % ( - self.reads[3].query_sequence, "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG")) + self.assertEqual( + self.reads[0].query_sequence, + "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") + self.assertEqual( 
+ self.reads[1].query_sequence, + "ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA") + self.assertEqual( + self.reads[3].query_sequence, + "AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG") def testARqual(self): self.assertEqual( @@ -206,19 +156,22 @@ def testARquery(self): def testARqqual(self): self.assertEqual( - pysam.qualities_to_qualitystring(self.reads[0].query_alignment_qualities), + pysam.qualities_to_qualitystring( + self.reads[0].query_alignment_qualities), "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<", "qquality string mismatch in read 1: %s != %s" % (pysam.qualities_to_qualitystring(self.reads[0].query_alignment_qualities), "<<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<<")) self.assertEqual( - pysam.qualities_to_qualitystring(self.reads[1].query_alignment_qualities), + pysam.qualities_to_qualitystring( + self.reads[1].query_alignment_qualities), "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<", "qquality string mismatch in read 2: %s != %s" % (pysam.qualities_to_qualitystring(self.reads[1].query_alignment_qualities), "<<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<<")) self.assertEqual( - pysam.qualities_to_qualitystring(self.reads[3].query_alignment_qualities), + pysam.qualities_to_qualitystring( + self.reads[3].query_alignment_qualities), "<<<<<<<<<<<<<<<<<:<9/,&,22", "qquality string mismatch in read 3: %s != %s" % (pysam.qualities_to_qualitystring(self.reads[3].query_alignment_qualities), @@ -404,7 +357,7 @@ class BasicTestSAMFromStringIO(BasicTestBAMFromFetch): def testRaises(self): statement = "samtools view -h {}".format( - os.path.join(BAM_DATADIR, "ex3.bam")) + os.path.join(BAM_DATADIR, "ex3.bam")) stdout = subprocess.check_output(statement.split(" ")) bam = StringIO() if sys.version_info.major >= 3: @@ -613,9 +566,9 @@ def testReadSamWithoutTargetNames(self): check_header=True) with pysam.AlignmentFile( - input_filename, - check_header=False, - check_sq=False) as infile: + input_filename, + check_header=False, + check_sq=False) as infile: result = list(infile.fetch(until_eof=True)) self.assertEqual(2, 
len(result)) @@ -638,7 +591,7 @@ def testReadBamWithoutTargetNames(self): check_header=True) with pysam.AlignmentFile( - input_filename, check_sq=False) as infile: + input_filename, check_sq=False) as infile: result = list(infile.fetch(until_eof=True)) def test_fail_read_sam_without_header(self): @@ -739,7 +692,6 @@ def testClosedFile(self): # write on closed file self.assertEqual(0, samfile.write(None)) - # TOOD # def testReadingFromSamFileWithoutHeader(self): # '''read from samfile without header. @@ -855,7 +807,7 @@ def test_fetch_by_tid(self): IndexError, samfile.fetch, tid=-1) - self.assertEqual(len(list(samfile.fetch('chr1',start=1000, end=2000))), + self.assertEqual(len(list(samfile.fetch('chr1', start=1000, end=2000))), len(list(samfile.fetch(tid=0, start=1000, end=2000)))) @@ -1163,13 +1115,14 @@ def testLargeFileBug(self): causes an error: NotImplementedError: tags field too large ''' - samfile = pysam.AlignmentFile(os.path.join(BAM_DATADIR, "issue100.bam")) + samfile = pysam.AlignmentFile( + os.path.join(BAM_DATADIR, "issue100.bam")) read = next(samfile.fetch(until_eof=True)) new_read = pysam.AlignedSegment() new_read.tags = read.tags self.assertEqual(new_read.tags, read.tags) - + class TestClipping(unittest.TestCase): def testClipping(self): @@ -1186,7 +1139,8 @@ def testClipping(self): self.assertEqual(pysam.qualities_to_qualitystring(read.query_qualities), None) self.assertEqual( - pysam.qualities_to_qualitystring(read.query_alignment_qualities), + pysam.qualities_to_qualitystring( + read.query_alignment_qualities), None) elif read.query_name == "r002": @@ -1197,7 +1151,8 @@ def testClipping(self): pysam.qualities_to_qualitystring(read.query_qualities), '01234567890') self.assertEqual( - pysam.qualities_to_qualitystring(read.query_alignment_qualities), + pysam.qualities_to_qualitystring( + read.query_alignment_qualities), '567890') elif read.query_name == "r003": @@ -1208,7 +1163,8 @@ def testClipping(self): 
pysam.qualities_to_qualitystring(read.query_qualities), '01234567890') self.assertEqual( - pysam.qualities_to_qualitystring(read.query_alignment_qualities), + pysam.qualities_to_qualitystring( + read.query_alignment_qualities), '012345') elif read.query_name == "r004": @@ -1219,7 +1175,8 @@ def testClipping(self): pysam.qualities_to_qualitystring(read.query_qualities), '01234') self.assertEqual( - pysam.qualities_to_qualitystring(read.query_alignment_qualities), + pysam.qualities_to_qualitystring( + read.query_alignment_qualities), '01234') @@ -1321,8 +1278,7 @@ class TestHeaderFromRefs(unittest.TestCase): class TestHeader1000Genomes(unittest.TestCase): '''see issue 110''' - # bamfile = "http://ftp.1000genomes.ebi.ac.uk/vol1/ftp/technical/phase2b_alignment/data/NA07048/exome_alignment/NA07048.unmapped.ILLUMINA.bwa.CEU.exome.20120522_p2b.bam" - bamfile = "http://ftp.1000genomes.ebi.ac.uk/vol1/ftp/technical/phase3_EX_or_LC_only_alignment/data/HG00104/alignment/HG00104.chrom11.ILLUMINA.bwa.GBR.low_coverage.20130415.bam" + bamfile = "http://ftp.1000genomes.ebi.ac.uk/vol1/ftp/technical/phase3_EX_or_LC_only_alignment/data/HG00104/alignment/HG00104.chrom11.ILLUMINA.bwa.GBR.low_coverage.20130415.bam" # noqa def testRead(self): @@ -1346,7 +1302,7 @@ class TestHeaderWriteRead(unittest.TestCase): 'PP': 'P1'}], 'HD': {'VN': '1.0'}, 'CO': ['this is a comment', 'this is another comment'], - } + } def compare_headers(self, a, b): '''compare two headers a and b. @@ -1421,7 +1377,7 @@ def test_fetch_with_asterisk_only_returns_unmapped_reads_by_contig(self): with pysam.AlignmentFile(os.path.join(BAM_DATADIR, "test_mapped_unmapped.bam"), "rb") as samfile: self.assertEqual(len(list(samfile.fetch(contig="*"))), 4) - + class TestPileupObjects(unittest.TestCase): @@ -1579,7 +1535,7 @@ def testOpenSamAsBam(self): def testOpenBamAsSam(self): # test fails, needs to be implemented. 
# sam.fetch() fails on reading, not on opening - #self.assertRaises(ValueError, pysam.AlignmentFile, + # self.assertRaises(ValueError, pysam.AlignmentFile, # os.path.join(BAM_DATADIR, 'ex1.bam'), # 'r') pass @@ -1605,7 +1561,7 @@ class TestDeNovoConstruction(unittest.TestCase): read_28833_29006_6945 99 chr1 33 20 10M1D25M = 200 167 AGCTTAGCTAGCTACCTATATCTTGGTCTTGGCCG <<<<<<<<<<<<<<<<<<<<<:<9/,&,22;;<<< NM:i:1 RG:Z:L1 read_28701_28881_323b 147 chr2 88 30 35M = 500 412 ACCTATATCTTGGCCTTGGCCGATGCGGCCTTGCA <<<<<;<<<<7;:<<<6;<<<<<<<<<<<<7<<<< MF:i:18 RG:Z:L2 - ''' + ''' # noqa header = {'HD': {'VN': '1.0'}, 'SQ': [{'LN': 1575, 'SN': 'chr1'}, @@ -1674,7 +1630,8 @@ def test_pass_if_reads_binary_equal(self): references = list(infile) for denovo, reference in zip(references, self.reads): checkFieldEqual(self, reference, denovo) - print("reference", str(reference), reference.get_tags(with_value_type=True)) + print("reference", str(reference), + reference.get_tags(with_value_type=True)) print("denovo", str(denovo), denovo.get_tags(with_value_type=True)) self.assertEqual(reference.compare(denovo), 0) @@ -1762,7 +1719,9 @@ def testTruncatedBam2(self): def testTruncatedBam2(self): s = pysam.AlignmentFile(os.path.join(BAM_DATADIR, 'ex2_truncated.bam'), ignore_truncation=True) - iterall = lambda x: len([a for a in x]) + + def iterall(x): + return len([a for a in x]) self.assertRaises(IOError, iterall, s) @@ -1790,6 +1749,7 @@ def testTruncatedBam2(self): 295, 0, 200, 16, 172, 3, 16, 182, 3, 11, 0, 0, 223, 111, 103, 0, 5, 225, 0, 95] + class TestBTagSam(unittest.TestCase): '''see issue 81.''' @@ -2100,7 +2060,7 @@ def setUp(self): template=self.samfile) for ii, read in enumerate(self.samfile.fetch()): # if ii % 2 == 0: # setting BFUNMAP makes no sense... 
- #read.flag = read.flag | 0x4 + # read.flag = read.flag | 0x4 if ii % 3 == 0: read.flag = read.flag | 0x100 if ii % 5 == 0: @@ -2221,7 +2181,7 @@ def filter(read): fast_counts = samfile.count_coverage( chrom, start, stop, read_callback='all', - #read_callback = lambda read: ~(read.flag & (0x4 | 0x100 | 0x200 | 0x400)), + # read_callback = lambda read: ~(read.flag & (0x4 | 0x100 | 0x200 | 0x400)), quality_threshold=0) manual_counts = samfile.count_coverage( chrom, start, stop, @@ -2239,7 +2199,7 @@ def test_count_coverage_nofilter(self): "test_count_coverage_nofilter.bam", 'wb', template=self.samfile) for ii, read in enumerate(self.samfile.fetch()): # if ii % 2 == 0: # setting BFUNMAP makes no sense... - #read.flag = read.flag | 0x4 + # read.flag = read.flag | 0x4 if ii % 3 == 0: read.flag = read.flag | 0x100 if ii % 5 == 0: @@ -2303,16 +2263,18 @@ def tearDown(self): def test_total(self): all_read_counts = self.samfile.count() splice_sites = self.samfile.find_introns(self.samfile.fetch()) - self.assertEqual(sum(splice_sites.values()), all_read_counts -1) # there is a single unspliced read in there + # there is a single unspliced read in there + self.assertEqual(sum(splice_sites.values()), all_read_counts - 1) def test_first(self): reads = list(self.samfile.fetch())[:10] splice_sites = self.samfile.find_introns(reads) - starts = [14792+38 - 1] - stops = [14792+38 + 140 - 1] + starts = [14792 + 38 - 1] + stops = [14792 + 38 + 140 - 1] self.assertEqual(len(splice_sites), 1) self.assertTrue((starts[0], stops[0]) in splice_sites) - self.assertEqual(splice_sites[(starts[0], stops[0])], 9) # first one is the unspliced read + # first one is the unspliced read + self.assertEqual(splice_sites[(starts[0], stops[0])], 9) def test_all(self): reads = list(self.samfile.fetch()) @@ -2327,8 +2289,8 @@ def test_all(self): (17055, 17605): 3, (17055, 17914): 1, (17368, 17605): 7, - }) - self.assertEqual(should, splice_sites) + }) + self.assertEqual(should, splice_sites) class 
TestLogging(unittest.TestCase): @@ -2508,7 +2470,7 @@ def test_counts_of_mapped_and_unmapped_are_correct_per_chromosome(self): counts_contigs = [x.contig for x in counts] self.assertEqual(sorted(counts_contigs), sorted(inf.references)) - + for contig in inf.references: unmapped_flag = 0 unmapped_nopos = 0 @@ -2524,6 +2486,7 @@ def test_counts_of_mapped_and_unmapped_are_correct_per_chromosome(self): self.assertEqual(cc.unmapped, unmapped_flag) self.assertEqual(cc.total, mapped_flag + unmapped_flag) + class TestSamtoolsProxy(unittest.TestCase): '''tests for sanity checking access to samtools functions.''' @@ -2634,8 +2597,8 @@ def test_empty_read_gives_value_error(self): if __name__ == "__main__": # build data files - print ("building data files") + print("building data files") subprocess.call("make -C %s" % BAM_DATADIR, shell=True) - print ("starting tests") + print("starting tests") unittest.main() - print ("completed tests") + print("completed tests") diff --git a/tests/PileupTestUtils.py b/tests/PileupTestUtils.py index 26a31088..7684d7f4 100644 --- a/tests/PileupTestUtils.py +++ b/tests/PileupTestUtils.py @@ -7,8 +7,10 @@ from TestUtils import BAM_DATADIR, force_str #################################################### -#################################################### +#################################################### # Simply building a pileup and counting number of piled-up columns + + def build_pileup_with_samtools(fn): os.system("samtools mpileup -x {} 2> /dev/null | wc -l > /dev/null".format(fn)) return 2998 @@ -29,7 +31,8 @@ def build_pileup_with_pysam(*args, **kwargs): def build_depth_with_samtools(fn): - os.system("samtools mpileup -x {} 2> /dev/null | awk '{{a += $4}} END {{print a}}' > /dev/null".format(fn)) + os.system( + "samtools mpileup -x {} 2> /dev/null | awk '{{a += $4}} END {{print a}}' > /dev/null".format(fn)) return 107248 @@ -51,7 +54,7 @@ def build_depth_with_filter_with_pysam(*args, **kwargs): def build_depth_with_pysam(*args, 
**kwargs): with pysam.AlignmentFile(*args, **kwargs) as inf: return [x.nsegments for x in inf.pileup(stepper="samtools")] - + def build_query_bases_with_samtools(fn): os.system("samtools mpileup -x {} 2> /dev/null | awk '{{a = a $5}} END {{print a}}' | wc -c > /dev/null".format(fn)) @@ -72,7 +75,8 @@ def build_query_bases_with_pysam_pileups(*args, **kwargs): total_pileup = [] with pysam.AlignmentFile(*args, **kwargs) as inf: total_pileup = [ - [r.alignment.query_sequence[r.query_position] for r in column.pileups if r.query_position is not None] + [r.alignment.query_sequence[r.query_position] + for r in column.pileups if r.query_position is not None] for column in inf.pileup(stepper="samtools")] return total_pileup @@ -81,7 +85,8 @@ def build_query_qualities_with_pysam_pileups(*args, **kwargs): total_pileup = [] with pysam.AlignmentFile(*args, **kwargs) as inf: total_pileup = [ - [r.alignment.query_qualities[r.query_position_or_next] for r in column.pileups if r.query_position_or_next is not None] + [r.alignment.query_qualities[r.query_position_or_next] + for r in column.pileups if r.query_position_or_next is not None] for column in inf.pileup(stepper="samtools")] return total_pileup @@ -90,7 +95,7 @@ def build_query_bases_with_pysam(*args, **kwargs): total_pileup = [] with pysam.AlignmentFile(*args, **kwargs) as inf: total_pileup = [column.get_query_sequences() for column in - inf.pileup(stepper="samtools")] + inf.pileup(stepper="samtools")] return total_pileup @@ -128,7 +133,7 @@ def build_mapping_qualities_with_samtoolspipe(fn): stderr=FNULL) as proc: data = [force_str(x).split()[6] for x in proc.stdout.readlines()] return data - + def build_mapping_qualities_with_pysam(*args, **kwargs): total_pileup = [] @@ -144,9 +149,10 @@ def build_query_positions_with_samtoolspipe(fn): stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=FNULL) as proc: - data = [list(map(int, force_str(x).split()[6].split(","))) for x in proc.stdout.readlines()] + data = [list(map(int, 
force_str(x).split()[6].split(","))) + for x in proc.stdout.readlines()] return data - + def build_query_positions_with_pysam(*args, **kwargs): total_pileup = [] @@ -154,5 +160,3 @@ def build_query_positions_with_pysam(*args, **kwargs): total_pileup = [column.get_query_positions() for column in inf.pileup(stepper="samtools")] return total_pileup - - diff --git a/tests/StreamFiledescriptors_test.py b/tests/StreamFiledescriptors_test.py index f6c5ced6..f09ef372 100644 --- a/tests/StreamFiledescriptors_test.py +++ b/tests/StreamFiledescriptors_test.py @@ -4,13 +4,11 @@ import threading import errno import unittest - from pysam import AlignmentFile +from TestUtils import BAM_DATADIR IS_PYTHON2 = sys.version_info[0] == 2 -from TestUtils import BAM_DATADIR - def alignmentfile_writer_thread(infile, outfile): def _writer_thread(infile, outfile): @@ -51,7 +49,8 @@ def test_text_processing(self): shell=True) as proc: in_stream = AlignmentFile(os.path.join(BAM_DATADIR, 'ex1.bam')) - out_stream = AlignmentFile(proc.stdin, 'wh', header=in_stream.header) + out_stream = AlignmentFile( + proc.stdin, 'wh', header=in_stream.header) writer = alignmentfile_writer_thread(in_stream, out_stream) @@ -63,7 +62,7 @@ def test_text_processing(self): @unittest.skip("test contains bug") def test_samtools_processing(self): - + # The following test causes the suite to hang # as the stream_processor raises: # ValueError: file has no sequences defined (mode='r') - is it SAM/BAM format? 
@@ -72,9 +71,10 @@ def test_samtools_processing(self): stdin=subprocess.PIPE, stdout=subprocess.PIPE, shell=True) as proc: - + in_stream = AlignmentFile(os.path.join(BAM_DATADIR, 'ex1.bam')) - out_stream = AlignmentFile(proc.stdin, 'wb', header=in_stream.header) + out_stream = AlignmentFile( + proc.stdin, 'wb', header=in_stream.header) writer = alignmentfile_writer_thread(in_stream, out_stream) @@ -83,7 +83,7 @@ def test_samtools_processing(self): out_stream, writer) self.assertEqual(read, 35) - + if __name__ == "__main__": unittest.main() diff --git a/tests/TestUtils.py b/tests/TestUtils.py index 97ce8a72..fa0768b2 100644 --- a/tests/TestUtils.py +++ b/tests/TestUtils.py @@ -10,15 +10,16 @@ "pysam_test_work")) BAM_DATADIR = os.path.abspath(os.path.join(os.path.dirname(__file__), - "pysam_data")) + "pysam_data")) TABIX_DATADIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "tabix_data")) CBCF_DATADIR = os.path.abspath(os.path.join(os.path.dirname(__file__), - "cbcf_data")) + "cbcf_data")) -LINKDIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "linker_tests")) +LINKDIR = os.path.abspath(os.path.join( + os.path.dirname(__file__), "..", "linker_tests")) TESTS_TEMPDIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "tmp")) @@ -41,6 +42,7 @@ def force_str(s): return s.decode('ascii') except AttributeError: return s + def force_bytes(s): try: return s.encode('ascii') @@ -49,6 +51,7 @@ def force_bytes(s): else: def force_str(s): return s + def force_bytes(s): return s @@ -117,9 +120,9 @@ def check_samtools_view_equal( l1 = sorted(l1[:-1].split("\t")) l2 = sorted(l2[:-1].split("\t")) if l1 != l2: - print ("mismatch in line %i" % n) - print (l1) - print (l2) + print("mismatch in line %i" % n) + print(l1) + print(l2) return False else: return False diff --git a/tests/VariantFile_test.py b/tests/VariantFile_test.py index dd8df5bb..1c53dbf3 100644 --- a/tests/VariantFile_test.py +++ b/tests/VariantFile_test.py @@ -44,12 +44,12 @@ def 
check(self, filename): self.assertEqual(True, os.path.exists(fn)) v = pysam.VariantFile(fn) for site in v: - for ss,rec in site.samples.items(): + for ss, rec in site.samples.items(): a, b = ss, rec v = pysam.VariantFile(fn) for x, site in enumerate(v): - for ss,rec in site.samples.items(): + for ss, rec in site.samples.items(): a, b = ss, rec.alleles a, b = ss, rec.allele_indices @@ -79,8 +79,7 @@ def testEmptyFileVCF(self): os.unlink("tests/tmp_testEmptyFile.vcf") - - if Path and sys.version_info >= (3,6): + if Path and sys.version_info >= (3, 6): def testEmptyFileVCFFromPath(self): with open("tests/tmp_testEmptyFile.vcf", "w"): pass @@ -132,7 +131,7 @@ def testEmptyFileVCFGZOnlyHeader(self): def testDetectVCF(self): with pysam.VariantFile(os.path.join(CBCF_DATADIR, - "example_vcf40.vcf")) as inf: + "example_vcf40.vcf")) as inf: self.assertEqual(inf.category, 'VARIANTS') self.assertEqual(inf.format, 'VCF') self.assertEqual(inf.compression, 'NONE') @@ -142,7 +141,7 @@ def testDetectVCF(self): def testDetectVCFGZ(self): with pysam.VariantFile(os.path.join(CBCF_DATADIR, - "example_vcf40.vcf.gz")) as inf: + "example_vcf40.vcf.gz")) as inf: self.assertEqual(inf.category, 'VARIANTS') self.assertEqual(inf.format, 'VCF') self.assertEqual(inf.compression, 'BGZF') @@ -210,7 +209,7 @@ def testChrom(self): chrom = [rec.chrom for rec in v] self.assertEqual(chrom, ['M', '17', '20', '20', '20']) - if Path and sys.version_info >= (3,6): + if Path and sys.version_info >= (3, 6): def testChromFromPath(self): fn = os.path.join(CBCF_DATADIR, self.filename) v = pysam.VariantFile(Path(fn)) @@ -239,7 +238,8 @@ def testId(self): fn = os.path.join(CBCF_DATADIR, self.filename) v = pysam.VariantFile(fn) ids = [rec.id for rec in v] - self.assertEqual(ids, [None, 'rs6054257', None, 'rs6040355', 'microsat1']) + self.assertEqual( + ids, [None, 'rs6054257', None, 'rs6040355', 'microsat1']) def testRef(self): fn = os.path.join(CBCF_DATADIR, self.filename) @@ -251,13 +251,15 @@ def testAlt(self): 
fn = os.path.join(CBCF_DATADIR, self.filename) v = pysam.VariantFile(fn) alts = [rec.alts for rec in v] - self.assertEqual(alts, [None, ('A',), ('A',), ('G', 'T'), ('G', 'GTACT')]) + self.assertEqual(alts, [None, ('A',), ('A',), + ('G', 'T'), ('G', 'GTACT')]) def testAlleles(self): fn = os.path.join(CBCF_DATADIR, self.filename) v = pysam.VariantFile(fn) alleles = [rec.alleles for rec in v] - self.assertEqual(alleles, [('T',), ('G', 'A'), ('T', 'A'), ('A', 'G', 'T'), ('GTCT', 'G', 'GTACT')]) + self.assertEqual(alleles, [ + ('T',), ('G', 'A'), ('T', 'A'), ('A', 'G', 'T'), ('GTCT', 'G', 'GTACT')]) def testQual(self): fn = os.path.join(CBCF_DATADIR, self.filename) @@ -269,17 +271,20 @@ def testFilter(self): fn = os.path.join(CBCF_DATADIR, self.filename) v = pysam.VariantFile(fn) filter = [rec.filter.keys() for rec in v] - self.assertEqual(filter, [['PASS'], ['PASS'], ['q10'], ['PASS'], ['PASS']]) + self.assertEqual(filter, [['PASS'], ['PASS'], + ['q10'], ['PASS'], ['PASS']]) def testInfo(self): fn = os.path.join(CBCF_DATADIR, self.filename) v = pysam.VariantFile(fn) info = [rec.info.items() for rec in v] self.assertEqual(info, [[('NS', 3), ('DP', 13), ('AA', 'T')], - [('NS', 3), ('DP', 14), ('AF', (0.5,)), ('DB', True), ('H2', True)], - [('NS', 3), ('DP', 11), ('AF', (0.017000000923871994,))], + [('NS', 3), ('DP', 14), ('AF', (0.5,)), + ('DB', True), ('H2', True)], + [('NS', 3), ('DP', 11), + ('AF', (0.017000000923871994,))], [('NS', 2), ('DP', 10), ('AF', (0.3330000042915344, 0.6669999957084656)), - ('AA', 'T'), ('DB', True)], + ('AA', 'T'), ('DB', True)], [('NS', 3), ('DP', 9), ('AA', 'G')]]) def testFormat(self): @@ -308,17 +313,28 @@ def testSampleFormats(self): v = pysam.VariantFile(fn) format = [s.items() for rec in v for s in rec.samples.values()] self.assertEqual(format, [[('GT', (0, 0)), ('GQ', 54), ('DP', 7), ('HQ', (56, 60))], - [('GT', (0, 0)), ('GQ', 48), ('DP', 4), ('HQ', (51, 51))], - [('GT', (0, 0)), ('GQ', 61), ('DP', 2), ('HQ', (None,))], - [('GT', 
(0, 0)), ('GQ', 48), ('DP', 1), ('HQ', (51, 51))], - [('GT', (1, 0)), ('GQ', 48), ('DP', 8), ('HQ', (51, 51))], - [('GT', (1, 1)), ('GQ', 43), ('DP', 5), ('HQ', (None, None))], - [('GT', (0, 0)), ('GQ', 49), ('DP', 3), ('HQ', (58, 50))], - [('GT', (0, 1)), ('GQ', 3), ('DP', 5), ('HQ', (65, 3))], - [('GT', (0, 0)), ('GQ', 41), ('DP', 3), ('HQ', (None,))], - [('GT', (1, 2)), ('GQ', 21), ('DP', 6), ('HQ', (23, 27))], - [('GT', (2, 1)), ('GQ', 2), ('DP', 0), ('HQ', (18, 2))], - [('GT', (2, 2)), ('GQ', 35), ('DP', 4), ('HQ', (None,))], + [('GT', (0, 0)), ('GQ', 48), + ('DP', 4), ('HQ', (51, 51))], + [('GT', (0, 0)), ('GQ', 61), + ('DP', 2), ('HQ', (None,))], + [('GT', (0, 0)), ('GQ', 48), + ('DP', 1), ('HQ', (51, 51))], + [('GT', (1, 0)), ('GQ', 48), + ('DP', 8), ('HQ', (51, 51))], + [('GT', (1, 1)), ('GQ', 43), + ('DP', 5), ('HQ', (None, None))], + [('GT', (0, 0)), ('GQ', 49), + ('DP', 3), ('HQ', (58, 50))], + [('GT', (0, 1)), ('GQ', 3), + ('DP', 5), ('HQ', (65, 3))], + [('GT', (0, 0)), ('GQ', 41), + ('DP', 3), ('HQ', (None,))], + [('GT', (1, 2)), ('GQ', 21), + ('DP', 6), ('HQ', (23, 27))], + [('GT', (2, 1)), ('GQ', 2), + ('DP', 0), ('HQ', (18, 2))], + [('GT', (2, 2)), ('GQ', 35), + ('DP', 4), ('HQ', (None,))], [('GT', (0, 1)), ('GQ', 35), ('DP', 4)], [('GT', (0, 2)), ('GQ', 17), ('DP', 2)], [('GT', (1, 1)), ('GQ', 40), ('DP', 3)]]) @@ -336,7 +352,7 @@ class TestIndexFilename(unittest.TestCase): filenames = [('example_vcf40.vcf.gz', 'example_vcf40.vcf.gz.tbi'), ('example_vcf40.vcf.gz', 'example_vcf40.vcf.gz.csi'), - ('example_vcf40.bcf', 'example_vcf40.bcf.csi')] + ('example_vcf40.bcf', 'example_vcf40.bcf.csi')] def testOpen(self): for fn, idx_fn in self.filenames: @@ -435,7 +451,7 @@ def testConstructionWithLines(self): self.complete_check(fn_in, fn_out) -#class TestConstructionVCFWithoutContigs(TestConstructionVCFWithContigs): +# class TestConstructionVCFWithoutContigs(TestConstructionVCFWithContigs): # """construct VariantFile from scratch.""" # filename = 
"example_vcf40.vcf" @@ -466,7 +482,8 @@ def testBase(self): self.assertEqual(inf.format, 'VCF') self.assertEqual(inf.version, (4, 0)) self.assertEqual(inf.compression, 'NONE') - self.assertEqual(inf.description, 'VCF version 4.0 variant calling text') + self.assertEqual( + inf.description, 'VCF version 4.0 variant calling text') self.assertTrue(inf.is_open) self.assertEqual(inf.is_read, True) self.assertEqual(inf.is_write, False) @@ -485,14 +502,15 @@ def testGenotype(self): with pysam.VariantFile(os.path.join(CBCF_DATADIR, self.filename)) as inf: record = next(inf) sample = record.samples["NA00001"] - print (sample["GT"]) + print(sample["GT"]) self.assertEqual(sample["GT"], (0, 0)) sample["GT"] = sample["GT"] + class TestSubsetting(unittest.TestCase): - + filename = "example_vcf42.vcf.gz" - + def testSubsetting(self): with pysam.VariantFile(os.path.join(CBCF_DATADIR, self.filename)) as inf: @@ -501,8 +519,8 @@ def testSubsetting(self): if __name__ == "__main__": # build data files - print ("building data files") + print("building data files") subprocess.call("make -C %s" % CBCF_DATADIR, shell=True) - print ("starting tests") + print("starting tests") unittest.main() - print ("completed tests") + print("completed tests") diff --git a/tests/compile_test.py b/tests/compile_test.py index f91e180d..f56adb77 100644 --- a/tests/compile_test.py +++ b/tests/compile_test.py @@ -8,21 +8,20 @@ # clean up previous compilation import os +import unittest +import pysam +from TestUtils import BAM_DATADIR, TABIX_DATADIR + try: os.unlink('tests/_compile_test.c') os.unlink('tests/_compile_test.pyxbldc') except OSError: pass - import pyximport pyximport.install(build_in_temp=False) import _compile_test -import unittest -import pysam -from TestUtils import BAM_DATADIR, TABIX_DATADIR - class BAMTest(unittest.TestCase): @@ -43,7 +42,7 @@ def testCount(self): nread = _compile_test.testCountGTF( pysam.Tabixfile(self.input_filename)) self.assertEqual(nread, 237) - + if __name__ == 
"__main__": unittest.main() diff --git a/tests/faidx_bench.py b/tests/faidx_bench.py index a6d1a156..2c58fcef 100644 --- a/tests/faidx_bench.py +++ b/tests/faidx_bench.py @@ -16,10 +16,11 @@ def iterate_over_fastx(fn, persist=True): def iterate_over_fastx_as_file(fn): with open(fn) as inf: return len(inf.read()) - + def test_fasta_iteration_short_sequences(benchmark): - result = benchmark(iterate_over_fastx, os.path.join(BAM_DATADIR, "faidx_ex1.fa")) + result = benchmark(iterate_over_fastx, os.path.join( + BAM_DATADIR, "faidx_ex1.fa")) assert result == 3270 @@ -29,35 +30,42 @@ def test_fasta_iteration_long_sequences(benchmark): def test_fasta_iteration_short_sequences_without_persistence(benchmark): - result = benchmark(iterate_over_fastx, os.path.join(BAM_DATADIR, "faidx_ex1.fa"), persist=False) + result = benchmark(iterate_over_fastx, os.path.join( + BAM_DATADIR, "faidx_ex1.fa"), persist=False) assert result == 3270 def test_fasta_iteration_long_sequences_without_persistence(benchmark): - result = benchmark(iterate_over_fastx, os.path.join(BAM_DATADIR, "ex1.fa"), persist=False) + result = benchmark(iterate_over_fastx, os.path.join( + BAM_DATADIR, "ex1.fa"), persist=False) assert result == 2 def test_fasta_iteration_short_sequences_as_file(benchmark): - result = benchmark(iterate_over_fastx_as_file, os.path.join(BAM_DATADIR, "faidx_ex1.fa")) + result = benchmark(iterate_over_fastx_as_file, + os.path.join(BAM_DATADIR, "faidx_ex1.fa")) assert result == 195399 def test_fasta_iteration_long_sequences_as_file(benchmark): - result = benchmark(iterate_over_fastx_as_file, os.path.join(BAM_DATADIR, "ex1.fa")) + result = benchmark(iterate_over_fastx_as_file, + os.path.join(BAM_DATADIR, "ex1.fa")) assert result == 3225 def test_fastq_iteration_short_sequences(benchmark): - result = benchmark(iterate_over_fastx, os.path.join(BAM_DATADIR, "faidx_ex1.fq")) + result = benchmark(iterate_over_fastx, os.path.join( + BAM_DATADIR, "faidx_ex1.fq")) assert result == 3270 def 
test_fastq_iteration_short_sequences_without_persistence(benchmark): - result = benchmark(iterate_over_fastx, os.path.join(BAM_DATADIR, "faidx_ex1.fq"), persist=False) + result = benchmark(iterate_over_fastx, os.path.join( + BAM_DATADIR, "faidx_ex1.fq"), persist=False) assert result == 3270 - + def test_fastq_iteration_short_sequences_as_file(benchmark): - result = benchmark(iterate_over_fastx_as_file, os.path.join(BAM_DATADIR, "faidx_ex1.fq")) + result = benchmark(iterate_over_fastx_as_file, + os.path.join(BAM_DATADIR, "faidx_ex1.fq")) assert result == 320458 diff --git a/tests/faidx_test.py b/tests/faidx_test.py index 9df34b65..e6da74a6 100644 --- a/tests/faidx_test.py +++ b/tests/faidx_test.py @@ -12,9 +12,9 @@ class TestFastaFile(unittest.TestCase): sequences = { 'chr1': - "CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGC
CGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA", + "CACTAGTGGCTCATTGTAAATGTGTGGTTTAACTCGTCCATGGCCCAGCATTAGGGAGCTGTGGACCCTGCAGCCTGGCTGTGGGGGCCGCAGTGGCTGAGGGGTGCAGAGCCGAGTCACGGGGTTGCCAGCACAGGGGCTTAACCTCTGGTGACTGCCAGAGCTGCTGGCAAGCTAGAGTCCCATTTGGAGCCCCTCTAAGCCGTTCTATTTGTAATGAAAACTATATTTATGCTATTCAGTTCTAAATATAGAAATTGAAACAGCTGTGTTTAGTGCCTTTGTTCAACCCCCTTGCAACAACCTTGAGAACCCCAGGGAATTTGTCAATGTCAGGGAAGGAGCATTTTGTCAGTTACCAAATGTGTTTATTACCAGAGGGATGGAGGGAAGAGGGACGCTGAAGAACTTTGATGCCCTCTTCTTCCAAAGATGAAACGCGTAACTGCGCTCTCATTCACTCCAGCTCCCTGTCACCCAATGGACCTGTGATATCTGGATTCTGGGAAATTCTTCATCCTGGACCCTGAGAGATTCTGCAGCCCAGCTCCAGATTGCTTGTGGTCTGACAGGCTGCAACTGTGAGCCATCACAATGAACAACAGGAAGAAAAGGTCTTTCAAAAGGTGATGTGTGTTCTCATCAACCTCATACACACACATGGTTTAGGGGTATAATACCTCTACATGGCTGATTATGAAAACAATGTTCCCCAGATACCATCCCTGTCTTACTTCCAGCTCCCCAGAGGGAAAGCTTTCAACGCTTCTAGCCATTTCTTTTGGCATTTGCCTTCAGACCCTACACGAATGCGTCTCTACCACAGGGGGCTGCGCGGTTTCCCATCATGAAGCACTGAACTTCCACGTCTCATCTAGGGGAACAGGGAGGTGCACTAATGCGCTCCACGCCCAAGCCCTTCTCACAGTTTCTGCCCCCAGCATGGTTGTACTGGGCAATACATGAGATTATTAGGAAATGCTTTACTGTCATAACTATGAAGAGACTATTGCCAGATGAACCACACATTAATACTATGTTTCTTATCTGCACATTACTACCCTGCAATTAATATAATTGTGTCCATGTACACACGCTGTCCTATGTACTTATCATGACTCTATCCCAAATTCCCAATTACGTCCTATCTTCTTCTTAGGGAAGAACAGCTTAGGTATCAATTTGGTGTTCTGTGTAAAGTCTCAGGGAGCCGTCCGTGTCCTCCCATCTGGCCTCGTCCACACTGGTTCTCTTGAAAGCTTGGGCTGTAATGATGCCCCTTGGCCATCACCCAGTCCCTGCCCCATCTCTTGTAATCTCTCTCCTTTTTGCTGCATCCCTGTCTTCCTCTGTCTTGATTTACTTGTTGTTGGTTTTCTGTTTCTTTGTTTGATTTGGTGGAAGACATAATCCCACGCTTCCTATGGAAAGGTTGTTGGGAGATTTTTAATGATTCCTCAATGTTAAAATGTCTATTTTTGTCTTGACACCCAACTAATATTTGTCTGAGCAAAACAGTCTAGATGAGAGAGAACTTCCCTGGAGGTCTGATGGCGTTTCTCCCTCGTCTTCTTA", # noqa 'chr2': - 
"TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT", + 
"TTCAAATGAACTTCTGTAATTGAAAAATTCATTTAAGAAATTACAAAATATAGTTGAAAGCTCTAACAATAGACTAAACCAAGCAGAAGAAAGAGGTTCAGAACTTGAAGACAAGTCTCTTATGAATTAACCCAGTCAGACAAAAATAAAGAAAAAAATTTTAAAAATGAACAGAGCTTTCAAGAAGTATGAGATTATGTAAAGTAACTGAACCTATGAGTCACAGGTATTCCTGAGGAAAAAGAAAAAGTGAGAAGTTTGGAAAAACTATTTGAGGAAGTAATTGGGGAAAACCTCTTTAGTCTTGCTAGAGATTTAGACATCTAAATGAAAGAGGCTCAAAGAATGCCAGGAAGATACATTGCAAGACAGACTTCATCAAGATATGTAGTCATCAGACTATCTAAAGTCAACATGAAGGAAAAAAATTCTAAAATCAGCAAGAGAAAAGCATACAGTCATCTATAAAGGAAATCCCATCAGAATAACAATGGGCTTCTCAGCAGAAACCTTACAAGCCAGAAGAGATTGGATCTAATTTTTGGACTTCTTAAAGAAAAAAAAACCTGTCAAACACGAATGTTATGCCCTGCTAAACTAAGCATCATAAATGAAGGGGAAATAAAGTCAAGTCTTTCCTGACAAGCAAATGCTAAGATAATTCATCATCACTAAACCAGTCCTATAAGAAATGCTCAAAAGAATTGTAAAAGTCAAAATTAAAGTTCAATACTCACCATCATAAATACACACAAAAGTACAAAACTCACAGGTTTTATAAAACAATTGAGACTACAGAGCAACTAGGTAAAAAATTAACATTACAACAGGAACAAAACCTCATATATCAATATTAACTTTGAATAAAAAGGGATTAAATTCCCCCACTTAAGAGATATAGATTGGCAGAACAGATTTAAAAACATGAACTAACTATATGCTGTTTACAAGAAACTCATTAATAAAGACATGAGTTCAGGTAAAGGGGTGGAAAAAGATGTTCTACGCAAACAGAAACCAAATGAGAGAAGGAGTAGCTATACTTATATCAGATAAAGCACACTTTAAATCAACAACAGTAAAATAAAACAAAGGAGGTCATCATACAATGATAAAAAGATCAATTCAGCAAGAAGATATAACCATCCTACTAAATACATATGCACCTAACACAAGACTACCCAGATTCATAAAACAAATACTACTAGACCTAAGAGGGATGAGAAATTACCTAATTGGTACAATGTACAATATTCTGATGATGGTTACACTAAAAGCCCATACTTTACTGCTACTCAATATATCCATGTAACAAATCTGCGCTTGTACTTCTAAATCTATAAAAAAATTAAAATTTAACAAAAGTAAATAAAACACATAGCTAAAACTAAAAAAGCAAAAACAAAAACTATGCTAAGTATTGGTAAAGATGTGGGGAAAAAAGTAAACTCTCAAATATTGCTAGTGGGAGTATAAATTGTTTTCCACTTTGGAAAACAATTTGGTAATTTCGTTTTTTTTTTTTTCTTTTCTCTTTTTTTTTTTTTTTTTTTTGCATGCCAGAAAAAAATATTTACAGTAACT", # noqa } def setUp(self): @@ -98,8 +98,9 @@ def testOpenWithOtherIndex(self): self.assertFalse(os.path.exists(tmpfilename + ".fai")) os.unlink(tmpfilename) + class TestFastaFilePathIndexCompressed(TestFastaFilePathIndex): - + filename = os.path.join(BAM_DATADIR, "ex1.fa.gz") @@ -219,7 +220,8 @@ class TestRemoteFileFTP(unittest.TestCase): '''test remote access. 
''' - url = "ftp://ftp-trace.ncbi.nih.gov/1000genomes/ftp/technical/reference/GRCh38_reference_genome/GRCh38_full_analysis_set_plus_decoy_hla.fa" + url = ("ftp://ftp-trace.ncbi.nih.gov/1000genomes/ftp/technical/reference/" + "GRCh38_reference_genome/GRCh38_full_analysis_set_plus_decoy_hla.fa") def testFTPView(self): if not checkURL(self.url): @@ -242,22 +244,24 @@ def test_sequence_lengths_are_available(self): self.assertEqual(f.get_reference_length("chr1"), 248956422) + class TestFastqRecord(unittest.TestCase): filetype = pysam.FastxFile filename = "faidx_ex1.fq" - + def setUp(self): with self.filetype(os.path.join(BAM_DATADIR, self.filename), persist=True) as inf: self.record = next(inf) - + def test_fastx_record_sequence_can_be_modified(self): old_sequence = self.record.sequence new_record = copy.copy(self.record) new_sequence = "AAAC" new_record.set_sequence(new_sequence) - self.assertEqual(str(new_record), ">{}\n{}".format(self.record.name, new_sequence)) + self.assertEqual(str(new_record), ">{}\n{}".format( + self.record.name, new_sequence)) self.assertEqual(self.record.sequence, old_sequence) self.assertEqual(new_record.sequence, new_sequence) @@ -273,7 +277,7 @@ def test_fastx_record_fail_if_name_is_None(self): self.assertRaises(ValueError, self.record.set_name, None) - + def test_fastx_record_comment_can_be_modified(self): old_comment = self.record.comment new_comment = "this is a new comment" @@ -289,7 +293,7 @@ def test_fastx_record_comment_can_be_None(self): new_record.set_comment(new_comment) self.assertEqual(new_record.comment, new_comment) self.assertEqual(self.record.comment, old_comment) - + def test_fastx_record_quality_can_be_modified(self): old_quality = self.record.quality new_quality = "A" * len(old_quality) @@ -314,7 +318,7 @@ def test_fastx_record_can_be_created_from_scratch(self): fastx_record) fastx_record.set_sequence("sequence") self.assertEqual(str(fastx_record), ">name\nsequence") - - + + if __name__ == "__main__": unittest.main() diff 
--git a/tests/linking_test.py b/tests/linking_test.py index 25b9b04c..15fd91a8 100644 --- a/tests/linking_test.py +++ b/tests/linking_test.py @@ -15,7 +15,8 @@ def check_import(statement): statement, stderr=subprocess.STDOUT, shell=True) except subprocess.CalledProcessError as exc: if b"ImportError" in exc.output: - raise ImportError("module could not be imported: {}".format(str(exc.output))) + raise ImportError( + "module could not be imported: {}".format(str(exc.output))) else: raise @@ -40,11 +41,12 @@ class TestLinking(unittest.TestCase): def setUp(self): self.workdir = os.path.join(LINKDIR, self.package_name) - + def test_package_can_be_installed(self): subprocess.check_output( - "cd {} && rm -rf build && python setup.py install".format(self.workdir), - shell=True) + "cd {} && rm -rf build && python setup.py install".format( + self.workdir), + shell=True) @unittest.skipUnless( @@ -53,7 +55,7 @@ def test_package_can_be_installed(self): class TestLinkWithRpath(TestLinking): package_name = "link_with_rpath" - + def test_package_tests_pass(self): self.assertTrue(check_pass( "cd {} && python test_module.py".format(os.path.join(self.workdir, "tests")))) @@ -76,14 +78,15 @@ def test_package_tests_fail_on_import(self): def test_package_tests_pass_if_ld_library_path_set(self): pysam_libraries = pysam.get_libraries() - pysam_libdirs, pysam_libs = zip(*[os.path.split(x) for x in pysam_libraries]) + pysam_libdirs, pysam_libs = zip( + *[os.path.split(x) for x in pysam_libraries]) pysam_libdir = pysam_libdirs[0] self.assertTrue(check_pass( "export LD_LIBRARY_PATH={}:$PATH && cd {} && python test_module.py".format( pysam_libdir, os.path.join(self.workdir, "tests")))) - + if __name__ == "__main__": unittest.main() diff --git a/tests/samtools_test.py b/tests/samtools_test.py index a926f5c6..e12270f5 100644 --- a/tests/samtools_test.py +++ b/tests/samtools_test.py @@ -123,6 +123,7 @@ class SamtoolsTest(unittest.TestCase): def check_version(self): samtools_version = 
get_version(self.executable) + def _r(s): # patch - remove any of the alpha/beta suffixes, i.e., 0.1.12a -> # 0.1.12 @@ -184,7 +185,7 @@ def check_statement(self, statement): pysam_targets = [x % r_pysam for x in targets] pysam_method = getattr(self.module, command) - + # run samtools full_statement = re.sub("%\(out\)s", self.executable, statement) run_command(" ".join((self.executable, full_statement))) @@ -220,7 +221,8 @@ def check_statement(self, statement): for s, p in zip(samtools_files, pysam_files): binary_equal = checkBinaryEqual(s, p) - error_msg = "%s failed: files %s and %s are not the same" % (command, s, p) + error_msg = "%s failed: files %s and %s are not the same" % ( + command, s, p) if binary_equal: continue elif s.endswith(".bam"): @@ -232,7 +234,7 @@ def check_statement(self, statement): check_lines_equal( self, s, p, filter_f=lambda x: x.startswith("#"), - msg=error_msg) + msg=error_msg) def testStatements(self): for statement in self.statements: @@ -241,9 +243,9 @@ def testStatements(self): # bioconda samtools will be available. if command in ("bedcov", "stats", "dict", "bam2fq"): continue - - if (command == "calmd" and - list(sys.version_info[:2]) == [3, 3]): + + if (command == "calmd" and + list(sys.version_info[:2]) == [3, 3]): # skip calmd test, fails only on python 3.3.5 # in linux (empty output). 
Works in OsX and passes # for 3.4 and 3.5, see issue #293 @@ -256,7 +258,7 @@ def testUsage(self): if self.executable == "bcftools": # bcftools usage messages end with exit(1) return - + for statement in self.statements: command = self.get_command(statement, map_to_internal=False) # ignore commands that exit or cause other failures @@ -300,7 +302,8 @@ def testReturnValueString(self): self.assertTrue(isinstance(retval, basestring)) def testReturnValueData(self): - args = "-O BAM {}".format(os.path.join(BAM_DATADIR, "ex1.bam")).split(" ") + args = "-O BAM {}".format(os.path.join(BAM_DATADIR, + "ex1.bam")).split(" ") retval = pysam.view(*args) if IS_PYTHON3: @@ -310,7 +313,6 @@ def testReturnValueData(self): self.assertTrue(isinstance(retval, bytes)) self.assertTrue(isinstance(retval, basestring)) - class StdoutTest(unittest.TestCase): '''test if stdout can be redirected.''' @@ -344,9 +346,9 @@ def testSaveStdout(self): self.assertTrue(len(r) > 0) class PysamTest(SamtoolsTest): - """check access to samtools command in the pysam + """check access to samtools command in the pysam main package. - + This is for backwards capability. 
""" diff --git a/tests/tabix_bench.py b/tests/tabix_bench.py index 43733c99..ce7077de 100644 --- a/tests/tabix_bench.py +++ b/tests/tabix_bench.py @@ -62,12 +62,14 @@ def iterate_file_uncompressed(fn): def test_read_python_compressed(benchmark): - result = benchmark(read_python_compressed, os.path.join(TABIX_DATADIR, FN_COMPRESSED)) + result = benchmark(read_python_compressed, + os.path.join(TABIX_DATADIR, FN_COMPRESSED)) assert result == 164 def test_read_python_uncompressed(benchmark): - result = benchmark(read_python_uncompressed, os.path.join(TABIX_DATADIR, FN_UNCOMPRESSED)) + result = benchmark(read_python_uncompressed, + os.path.join(TABIX_DATADIR, FN_UNCOMPRESSED)) assert result == 164 @@ -77,85 +79,102 @@ def test_fetch_plain(benchmark): def test_fetch_parsed(benchmark): - result = benchmark(fetch_parsed, os.path.join(TABIX_DATADIR, FN_COMPRESSED)) + result = benchmark(fetch_parsed, os.path.join( + TABIX_DATADIR, FN_COMPRESSED)) assert result == 164 def test_iterate_generic_compressed(benchmark): - result = benchmark(iterate_generic_compressed, os.path.join(TABIX_DATADIR, FN_COMPRESSED)) + result = benchmark(iterate_generic_compressed, + os.path.join(TABIX_DATADIR, FN_COMPRESSED)) assert result == 164 def test_iterate_generic_uncompressed(benchmark): - result = benchmark(iterate_generic_uncompressed, os.path.join(TABIX_DATADIR, FN_UNCOMPRESSED)) + result = benchmark(iterate_generic_uncompressed, + os.path.join(TABIX_DATADIR, FN_UNCOMPRESSED)) assert result == 164 def test_iterate_parsed_compressed(benchmark): - result = benchmark(iterate_parsed_compressed, os.path.join(TABIX_DATADIR, FN_COMPRESSED)) + result = benchmark(iterate_parsed_compressed, + os.path.join(TABIX_DATADIR, FN_COMPRESSED)) assert result == 164 def test_iterate_parsed_uncompressed(benchmark): - result = benchmark(iterate_parsed_uncompressed, os.path.join(TABIX_DATADIR, FN_UNCOMPRESSED)) + result = benchmark(iterate_parsed_uncompressed, + os.path.join(TABIX_DATADIR, FN_UNCOMPRESSED)) assert 
result == 164 def test_iterate_file_compressed(benchmark): - result = benchmark(iterate_file_compressed, os.path.join(TABIX_DATADIR, FN_COMPRESSED)) + result = benchmark(iterate_file_compressed, + os.path.join(TABIX_DATADIR, FN_COMPRESSED)) assert result == 164 def test_iterate_file_uncompressed(benchmark): - result = benchmark(iterate_file_uncompressed, os.path.join(TABIX_DATADIR, FN_UNCOMPRESSED)) + result = benchmark(iterate_file_uncompressed, + os.path.join(TABIX_DATADIR, FN_UNCOMPRESSED)) assert result == 164 def test_read_python_large_compressed(benchmark): - result = benchmark(read_python_compressed, os.path.join(TABIX_DATADIR, FN_LARGE_COMPRESSED)) + result = benchmark(read_python_compressed, os.path.join( + TABIX_DATADIR, FN_LARGE_COMPRESSED)) assert result == 100000 def test_read_python_large_uncompressed(benchmark): - result = benchmark(read_python_uncompressed, os.path.join(TABIX_DATADIR, FN_LARGE_UNCOMPRESSED)) + result = benchmark(read_python_uncompressed, os.path.join( + TABIX_DATADIR, FN_LARGE_UNCOMPRESSED)) assert result == 100000 def test_fetch_plain(benchmark): - result = benchmark(fetch_plain, os.path.join(TABIX_DATADIR, FN_LARGE_COMPRESSED)) + result = benchmark(fetch_plain, os.path.join( + TABIX_DATADIR, FN_LARGE_COMPRESSED)) assert result == 100000 def test_fetch_parsed(benchmark): - result = benchmark(fetch_parsed, os.path.join(TABIX_DATADIR, FN_LARGE_COMPRESSED)) + result = benchmark(fetch_parsed, os.path.join( + TABIX_DATADIR, FN_LARGE_COMPRESSED)) assert result == 100000 def test_iterate_generic_large_compressed(benchmark): - result = benchmark(iterate_generic_compressed, os.path.join(TABIX_DATADIR, FN_LARGE_COMPRESSED)) + result = benchmark(iterate_generic_compressed, os.path.join( + TABIX_DATADIR, FN_LARGE_COMPRESSED)) assert result == 100000 def test_iterate_generic_large_uncompressed(benchmark): - result = benchmark(iterate_generic_uncompressed, os.path.join(TABIX_DATADIR, FN_LARGE_UNCOMPRESSED)) + result = 
benchmark(iterate_generic_uncompressed, os.path.join( + TABIX_DATADIR, FN_LARGE_UNCOMPRESSED)) assert result == 100000 def test_iterate_parsed_large_compressed(benchmark): - result = benchmark(iterate_parsed_compressed, os.path.join(TABIX_DATADIR, FN_LARGE_COMPRESSED)) + result = benchmark(iterate_parsed_compressed, os.path.join( + TABIX_DATADIR, FN_LARGE_COMPRESSED)) assert result == 100000 def test_iterate_parsed_large_uncompressed(benchmark): - result = benchmark(iterate_parsed_uncompressed, os.path.join(TABIX_DATADIR, FN_LARGE_UNCOMPRESSED)) + result = benchmark(iterate_parsed_uncompressed, os.path.join( + TABIX_DATADIR, FN_LARGE_UNCOMPRESSED)) assert result == 100000 def test_iterate_file_large_compressed(benchmark): - result = benchmark(iterate_file_compressed, os.path.join(TABIX_DATADIR, FN_LARGE_COMPRESSED)) + result = benchmark(iterate_file_compressed, os.path.join( + TABIX_DATADIR, FN_LARGE_COMPRESSED)) assert result == 100000 def test_iterate_file_large_uncompressed(benchmark): - result = benchmark(iterate_file_uncompressed, os.path.join(TABIX_DATADIR, FN_LARGE_UNCOMPRESSED)) + result = benchmark(iterate_file_uncompressed, os.path.join( + TABIX_DATADIR, FN_LARGE_UNCOMPRESSED)) assert result == 100000 diff --git a/tests/tabix_test.py b/tests/tabix_test.py index 7341f1e2..9a02b134 100644 --- a/tests/tabix_test.py +++ b/tests/tabix_test.py @@ -53,7 +53,7 @@ def checkBinaryEqual(filename1, filename2): if len(d1) != len(d2): return False - + found = False for c1, c2 in zip(d1, d2): if c1 != c2: @@ -77,17 +77,18 @@ def test_indexing_with_preset_works(self): '''test indexing via preset.''' pysam.tabix_index(self.tmpfilename, preset="gff") - self.assertTrue(checkBinaryEqual(self.tmpfilename + ".tbi", self.filename_idx)) + self.assertTrue(checkBinaryEqual( + self.tmpfilename + ".tbi", self.filename_idx)) def test_indexing_to_custom_location_works(self): '''test indexing a file with a non-default location.''' index_path = get_temp_filename(suffix='custom.tbi') - 
pysam.tabix_index(self.tmpfilename, preset="gff", index=index_path, force=True) + pysam.tabix_index(self.tmpfilename, preset="gff", + index=index_path, force=True) self.assertTrue(checkBinaryEqual(index_path, self.filename_idx)) os.unlink(index_path) - def test_indexing_with_explict_columns_works(self): '''test indexing via preset.''' @@ -97,7 +98,8 @@ def test_indexing_with_explict_columns_works(self): end_col=4, line_skip=0, zerobased=False) - self.assertTrue(checkBinaryEqual(self.tmpfilename + ".tbi", self.filename_idx)) + self.assertTrue(checkBinaryEqual( + self.tmpfilename + ".tbi", self.filename_idx)) def test_indexing_with_lineskipping_works(self): '''test indexing via preset and lineskip.''' @@ -107,8 +109,9 @@ def test_indexing_with_lineskipping_works(self): end_col=4, line_skip=1, zerobased=False) - self.assertFalse(checkBinaryEqual(self.tmpfilename + ".tbi", self.filename_idx)) - + self.assertFalse(checkBinaryEqual( + self.tmpfilename + ".tbi", self.filename_idx)) + def tearDown(self): os.unlink(self.tmpfilename) if os.path.exists(self.tmpfilename + ".tbi"): @@ -123,7 +126,7 @@ class TestCompression(unittest.TestCase): def setUp(self): self.tmpfilename = get_temp_filename(suffix="gtf") with gzip.open(self.filename, "rb") as infile, \ - open(self.tmpfilename, "wb") as outfile: + open(self.tmpfilename, "wb") as outfile: outfile.write(infile.read()) def testCompression(self): @@ -207,19 +210,19 @@ def getSubset(self, contig=None, start=None, end=None): if start is not None and end is None: # until end of contig subset = [x[3] - for x in self.compare if x[0] == contig - and x[2] > start] + for x in self.compare if x[0] == contig and + x[2] > start] elif start is None and end is not None: # from start of contig subset = [x[3] - for x in self.compare if x[0] == contig - and x[1] <= end] + for x in self.compare if x[0] == contig and + x[1] <= end] elif start is None and end is None: subset = [x[3] for x in self.compare if x[0] == contig] else: # all within 
interval - subset = [x[3] for x in self.compare if x[0] == contig - and min(x[2], end) - max(x[1], start) > 0] + subset = [x[3] for x in self.compare if x[0] == contig and + min(x[2], end) - max(x[1], start) > 0] if self.with_comments: subset.extend(self.comments) @@ -402,7 +405,7 @@ class TestIterationWithComments(TestIterationWithoutComments): def setUp(self): TestIterationWithoutComments.setUp(self) - + class TestIterators(unittest.TestCase): filename = os.path.join(TABIX_DATADIR, "example.gtf.gz") @@ -416,7 +419,7 @@ def setUp(self): self.compare = load_and_convert(self.filename) self.tmpfilename_uncompressed = 'tmp_TestIterators' with gzip.open(self.filename, "rb") as infile, \ - open(self.tmpfilename_uncompressed, "wb") as outfile: + open(self.tmpfilename_uncompressed, "wb") as outfile: outfile.write(infile.read()) def tearDown(self): @@ -592,7 +595,8 @@ def testFromTabix(self): with pysam.TabixFile( self.tmpfilename + ".gz", encoding="ascii") as t: results = list(t.fetch(parser=pysam.asVCF())) - self.assertRaises(UnicodeDecodeError, getattr, results[1], "id") + self.assertRaises(UnicodeDecodeError, + getattr, results[1], "id") with pysam.TabixFile( self.tmpfilename + ".gz", encoding="utf-8") as t: @@ -626,7 +630,7 @@ def setUp(self): def tearDown(self): self.tabix.close() TestVCF.tearDown(self) - + def testRead(self): ncolumns = len(self.columns) @@ -749,7 +753,7 @@ def get_failure_stage(self): for x, msg in self.fail_on_parsing: if "{}.vcf".format(x) == fn: return "parsing" - + for x, msg in self.fail_on_samples: if "{}.vcf".format(x) == fn: return "samples" @@ -996,7 +1000,8 @@ def convert_field(f): v = smp.values() if 'GT' in smp: - alleles = [str(a) if a is not None else '.' for a in smp.allele_indices] + alleles = [ + str(a) if a is not None else '.' 
for a in smp.allele_indices] v[0] = '/|'[smp.phased].join(alleles) comp = ":".join(map(convert_field, v)) @@ -1045,7 +1050,7 @@ def setUp(self): self.remote_file = None else: self.remote_file = pysam.TabixFile(self.url, "r") - + self.local_file = pysam.TabixFile(self.local, "r") def tearDown(self): @@ -1092,7 +1097,7 @@ def testHeader(self): self.assertEqual(list(self.local_file.header), ["# comment at start"]) self.assertEqual(list(self.local_file.header), self.remote_file.header) - + class TestIndexArgument(unittest.TestCase): @@ -1189,8 +1194,10 @@ def testJoinedIterators(self): def testDisjointIterators(self): # two iterators working on the same file with pysam.TabixFile(self.filename) as tabix: - a = tabix.fetch(parser=pysam.asGTF(), multiple_iterators=True).next() - b = tabix.fetch(parser=pysam.asGTF(), multiple_iterators=True).next() + a = tabix.fetch(parser=pysam.asGTF(), + multiple_iterators=True).next() + b = tabix.fetch(parser=pysam.asGTF(), + multiple_iterators=True).next() # both iterators are at top of file self.assertEqual(str(a), str(b)) diff --git a/tests/tabixproxies_test.py b/tests/tabixproxies_test.py index 35ad8fc8..9c1a81ef 100644 --- a/tests/tabixproxies_test.py +++ b/tests/tabixproxies_test.py @@ -89,7 +89,7 @@ def testIteratorUncompressed(self): '''test iteration from uncompressed file.''' tmpfilename = 'tmp_testIteratorUncompressed' with gzip.open(self.filename, "rb") as infile, \ - open(tmpfilename, "wb") as outfile: + open(tmpfilename, "wb") as outfile: outfile.write(infile.read()) with open(tmpfilename) as infile: @@ -130,7 +130,7 @@ def build_attribute_string(self, d): # remove quotes around numeric values s = re.sub("\"(\d+)\"", r"\1", s) return s - + def testRead(self): for x, r in enumerate(self.tabix.fetch(parser=self.parser())): @@ -149,7 +149,7 @@ def test_setting_fields(self): r = self.tabix.fetch(parser=self.parser()).next() - r.contig = r.contig + "_test_contig" + r.contig = r.contig + "_test_contig" r.source = r.source + 
"_test_source" r.feature = r.feature + "_test_feature" r.start += 10 @@ -173,7 +173,7 @@ def test_setAttribute_makes_changes(self): sr = str(r) self.assertEqual(r.transcript_id, "abcd") self.assertTrue("transcript_id \"abcd\"" in sr) - + def test_added_attribute_is_output(self): r = self.tabix.fetch(parser=self.parser()).next() @@ -187,14 +187,14 @@ def test_added_attribute_is_output(self): self.assertTrue("new_text_attribute \"abc\"" in str(r).split("\t")[8]) def test_setting_start_is_one_based(self): - + r = self.tabix.fetch(parser=self.parser()).next() r.start = 1800 self.assertEqual(r.start, 1800) self.assertEqual(str(r).split("\t")[3], "1801") def test_setting_end_is_one_based(self): - + r = self.tabix.fetch(parser=self.parser()).next() r.end = 2100 self.assertEqual(r.end, 2100) @@ -322,7 +322,7 @@ def testRead(self): def test_setting_fields(self): for r in self.tabix.fetch(parser=self.parser()): - r.contig = r.contig + "_test_contig" + r.contig = r.contig + "_test_contig" r.source = "test_source" r.feature = "test_feature" r.start += 10 @@ -330,7 +330,7 @@ def test_setting_fields(self): r.score = 20 r.strand = "+" r.frame = 0 - r.ID="test" + r.ID = "test" sr = str(r) self.assertTrue("test_contig" in sr) self.assertTrue("test_source" in sr) @@ -357,6 +357,6 @@ def test_added_attribute_is_output(self): r.new_text_attribute = "abc" self.assertTrue("new_text_attribute=abc" in str(r).split("\t")[8]) - + if __name__ == "__main__": unittest.main() diff --git a/tests/test_samtools_python.py b/tests/test_samtools_python.py index f7a351ba..6088ce71 100644 --- a/tests/test_samtools_python.py +++ b/tests/test_samtools_python.py @@ -6,7 +6,7 @@ def test_idxstats_parse_split_lines(): bam_filename = os.path.join(BAM_DATADIR, "ex2.bam") # Test pysam 0.8.X style output, which returns a list of lines - lines = pysam.idxstats(bam_filename, split_lines=True) + lines = pysam.idxstats(bam_filename, split_lines=True) for line in lines: _seqname, _seqlen, nmapped, _nunmapped = 
line.split() @@ -37,7 +37,7 @@ def test_bedcov(): bam_filename = os.path.join(BAM_DATADIR, "ex1.bam") bed_filename = os.path.join(BAM_DATADIR, "ex1.bed") # Test pysam 0.9.X style output, which returns a string that needs to be split by \n - bedcov_string = pysam.bedcov(bed_filename, bam_filename, split_lines=False) + bedcov_string = pysam.bedcov(bed_filename, bam_filename, split_lines=False) lines = bedcov_string.splitlines() for line in lines: fields = line.split('\t')