Skip to content

Commit

Permalink
Fixed spacing and wrapping in utils.py, removed test for old walk_tog…
Browse files Browse the repository at this point in the history
…ether arg (eq function), fixed edge case in _AltRecord
  • Loading branch information
Ben Weisburd authored and James Casbon committed Feb 10, 2014
1 parent 616f310 commit 2de70ce
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 80 deletions.
4 changes: 1 addition & 3 deletions vcf/model.py
Expand Up @@ -465,9 +465,7 @@ def __str__(self):
raise NotImplementedError

def __eq__(self, other):
if not isinstance(other, self.__class__):
return False
return self.type == other.type
return self.type == getattr(other, 'type', None)


class _Substitution(_AltRecord):
Expand Down
72 changes: 37 additions & 35 deletions vcf/test/test_vcf.py
Expand Up @@ -20,7 +20,7 @@ class TestVcfSpecs(unittest.TestCase):

def test_vcf_4_0(self):
reader = vcf.Reader(fh('example-4.0.vcf'))
assert reader.metadata['fileformat'] == 'VCFv4.0'
self.assertEqual(reader.metadata['fileformat'], 'VCFv4.0')

# test we can walk the file at least
for r in reader:
Expand Down Expand Up @@ -81,21 +81,21 @@ def test_vcf_4_1_bnd(self):
for a in r.ALT:
print(a)
if r.ID == "bnd1":
assert len(r.ALT) == 1
assert r.ALT[0].type == "BND"
assert r.ALT[0].chr == "2"
assert r.ALT[0].pos == 3
assert r.ALT[0].orientation == False
assert r.ALT[0].remoteOrientation == True
assert r.ALT[0].connectingSequence == "T"
self.assertEqual(len(r.ALT), 1)
self.assertEqual(r.ALT[0].type, "BND")
self.assertEqual(r.ALT[0].chr, "2")
self.assertEqual(r.ALT[0].pos, 3)
self.assertEqual(r.ALT[0].orientation, False)
self.assertEqual(r.ALT[0].remoteOrientation, True)
self.assertEqual(r.ALT[0].connectingSequence, "T")
if r.ID == "bnd4":
assert len(r.ALT) == 1
assert r.ALT[0].type == "BND"
assert r.ALT[0].chr == "1"
assert r.ALT[0].pos == 2
assert r.ALT[0].orientation == True
assert r.ALT[0].remoteOrientation == False
assert r.ALT[0].connectingSequence == "G"
self.assertEqual(len(r.ALT), 1)
self.assertEqual(r.ALT[0].type, "BND")
self.assertEqual(r.ALT[0].chr, "1")
self.assertEqual(r.ALT[0].pos, 2)
self.assertEqual(r.ALT[0].orientation, True)
self.assertEqual(r.ALT[0].remoteOrientation, False)
self.assertEqual(r.ALT[0].connectingSequence, "G")
for c in r:
print(c)
assert c
Expand Down Expand Up @@ -165,7 +165,7 @@ def testParse(self):
n+=1
for x in r:
assert x
assert n == self.n_calls
self.assertEqual(n, self.n_calls)

class TestSamtoolsOutput(unittest.TestCase):

Expand Down Expand Up @@ -748,7 +748,7 @@ def test_info_multiple_values(self):
def test_pickle(self):
reader = vcf.Reader(fh('example-4.0.vcf'))
for var in reader:
assert cPickle.loads(cPickle.dumps(var)) == var
self.assertEqual(cPickle.loads(cPickle.dumps(var)), var)


class TestCall(unittest.TestCase):
Expand Down Expand Up @@ -836,7 +836,7 @@ def testFetchSite(self):
if not self.run:
return
site = self.reader.fetch('20', 14370)
assert site.POS == 14370
self.assertEqual(site.POS, 14370)

site = self.reader.fetch('20', 14369)
assert site is None
Expand Down Expand Up @@ -878,7 +878,7 @@ def testApplyFilter(self):
return
s, out = commands.getstatusoutput('python scripts/vcf_filter.py --site-quality 30 test/example-4.0.vcf sq')
#print(out)
assert s == 0
self.assertEqual(s, 0)
buf = StringIO()
buf.write(out)
buf.seek(0)
Expand All @@ -900,15 +900,15 @@ def testApplyFilter(self):
n += 1
else:
assert 'sq30' not in r.FILTER
assert n == 2
self.assertEqual(n, 2)


def testApplyMultipleFilters(self):
# FIXME: broken with distribute
return
s, out = commands.getstatusoutput('python scripts/vcf_filter.py --site-quality 30 '
'--genotype-quality 50 test/example-4.0.vcf sq mgq')
assert s == 0
self.assertEqual(s, 0)
#print(out)
buf = StringIO()
buf.write(out)
Expand Down Expand Up @@ -954,10 +954,11 @@ def test_walk(self):

n = 0
for x in utils.walk_together(reader1, reader2, reader3):
assert len(x) == 3
assert (x[0] == x[1]) and (x[1] == x[2])
self.assertEqual(len(x), 3)
self.assertEqual(x[0], x[1])
self.assertEqual(x[1], x[2])
n+= 1
assert n == 5
self.assertEqual(n, 5)

# artificial case 2 from the left, 2 from the right, 2 together, 1 from the right, 1 from the left

Expand Down Expand Up @@ -1014,17 +1015,18 @@ def custom_eq(rec1, rec2):

ndist_cust, nover_cust = 0, 0
for x in utils.walk_together(reader1, reader2, eq_func=custom_eq):
assert len(x) == 2
self.assertEqual(len(x), 2)
# avoid assert() when one record is None
if x[0] is not None and x[1] is not None:
assert (custom_eq(x[0], x[1]) and custom_eq(x[1], x[0]))
nover_cust += 1
ndist_cust += 1
assert nover_cust == 4
assert ndist_cust == 5

# final check just to be absolutely sure
assert ndist_def != ndist_cust
assert nover_def != nover_cust
ncomps += 1
# still increment counter to ensure iteration is finished for all
# records
nrecs += 1
# check number of records total
self.assertEqual(nrecs, 5)
# check how many records found in all files
self.assertEqual(ncomps, 4)

def test_trim(self):
tests = [('TAA GAA', 'T G'),
Expand All @@ -1045,8 +1047,8 @@ def test_meta(self):
# expect no exceptions raised
reader = vcf.Reader(fh('gatk_26_meta.vcf'))
assert 'GATKCommandLine' in reader.metadata
assert reader.metadata['GATKCommandLine'][0]['CommandLineOptions'] == '"analysis_type=LeftAlignAndTrimVariants"'
assert reader.metadata['GATKCommandLine'][1]['CommandLineOptions'] == '"analysis_type=VariantAnnotator annotation=[HomopolymerRun, VariantType, TandemRepeatAnnotator]"'
self.assertEqual(reader.metadata['GATKCommandLine'][0]['CommandLineOptions'], '"analysis_type=LeftAlignAndTrimVariants"')
self.assertEqual(reader.metadata['GATKCommandLine'][1]['CommandLineOptions'], '"analysis_type=VariantAnnotator annotation=[HomopolymerRun, VariantType, TandemRepeatAnnotator]"')


suite.addTests(unittest.TestLoader().loadTestsFromTestCase(TestVcfSpecs))
Expand Down
93 changes: 51 additions & 42 deletions vcf/utils.py
Expand Up @@ -2,50 +2,59 @@
Utilities for VCF files.
"""

def walk_together(*readers, **kwargs):
""" Simultaneously iteratate two or more VCF readers and return
lists of concurrent records from each
reader, with None if no record present. Caller must check the
inputs are sorted in the same way and use the same reference
otherwise behaviour is undefined.
Args:
vcf_record_sort_key: function that takes a VCF record and returns a tuple that can be used as the key for comparing and sorting VCF records across all given VCFReaders. The tuple's 1st element should be the contig name.
"""
if 'vcf_record_sort_key' in kwargs:
get_key = kwargs['vcf_record_sort_key']
else:
get_key = lambda r: (r.CHROM, r.POS)

nexts = []
for reader in readers:
try:
nexts.append(reader.next())
except StopIteration:
nexts.append(None)
def walk_together(*readers, **kwargs):
"""
Simultaneously iteratate over two or more VCF readers. For each
genomic position with a variant, return a list of size equal to the number
of VCF readers. This list contains the VCF record from readers that have
this variant, and None for readers that don't have it.
The caller must make sure that inputs are sorted in the same way and use the
same reference otherwise behaviour is undefined.
Args:
vcf_record_sort_key: function that takes a VCF record and returns a
tuple that can be used as a key for comparing and sorting VCF
records across all readers. This tuple defines what it means for two
variants to be equal (eg. whether it's only their position or also
their allele values), and implicitly determines the chromosome
ordering since the tuple's 1st element is typically the chromosome
name (or calculated from it).
"""
if 'vcf_record_sort_key' in kwargs:
get_key = kwargs['vcf_record_sort_key']
else:
get_key = lambda r: (r.CHROM, r.POS) #, r.REF, r.ALT)

nexts = []
for reader in readers:
try:
nexts.append(reader.next())
except StopIteration:
nexts.append(None)

min_k = (None,) # keep track of the previous min key's contig
while True:
next_idx_to_k = dict(
(i, get_key(r)) for i, r in enumerate(nexts) if r is not None)
keys_with_prev_contig = [
k for k in next_idx_to_k.values() if k[0] == min_k[0]]

if any(keys_with_prev_contig):
min_k = min(keys_with_prev_contig) # finish previous contig
else:
min_k = min(next_idx_to_k.values()) # move on to next contig

min_k = (None,) # keep track of the previous min key's contig
while True:
kdict = {i: get_key(x) for i,x in enumerate(nexts) if x is not None}
keys_with_prev_contig = [k for k in kdict.values() if k[0] == min_k[0]]
if any(keys_with_prev_contig):
# finish all records from previous contig
min_k = min(keys_with_prev_contig)
else:
# move on to the next contig
min_k = min(kdict.values())

min_k_idxs = set([i for i, k in kdict.items() if k == min_k])
yield [nexts[i] if i in min_k_idxs else None for i in range(len(nexts))]
min_k_idxs = set([i for i, k in next_idx_to_k.items() if k == min_k])
yield [nexts[i] if i in min_k_idxs else None for i in range(len(nexts))]

for i in min_k_idxs:
try:
nexts[i] = readers[i].next()
except StopIteration:
nexts[i] = None
if all([x is None for x in nexts]):
break
for i in min_k_idxs:
try:
nexts[i] = readers[i].next()
except StopIteration:
nexts[i] = None
if all([r is None for r in nexts]):
break


def trim_common_suffix(*sequences):
Expand Down

0 comments on commit 2de70ce

Please sign in to comment.