Skip to content

Commit

Permalink
Added tests for walk_together with more complex inputs
Browse files Browse the repository at this point in the history
  • Loading branch information
datagram authored and James Casbon committed Feb 10, 2014
1 parent d51db23 commit 28dfe37
Showing 1 changed file with 14 additions and 51 deletions.
65 changes: 14 additions & 51 deletions vcf/test/test_vcf.py
Expand Up @@ -961,13 +961,11 @@ def test_walk(self):
self.assertEqual(n, 5)

# artificial case: 2 from the left, 2 from the right, 2 together, 1 from the right, 1 from the left

expected = 'llrrttrl'
reader1 = vcf.Reader(fh('walk_left.vcf'))
reader2 = vcf.Reader(fh('example-4.0.vcf'))

for ex, recs in zip(expected, utils.walk_together(reader1, reader2)):

if ex == 'l':
assert recs[0] is not None
assert recs[1] is None
Expand All @@ -978,55 +976,20 @@ def test_walk(self):
assert recs[0] is not None
assert recs[1] is not None

# case with working custom equality function

# without custom function, most records in these files
# are different since the default equality checks
# for ALT values

reader1 = vcf.Reader(fh('example-4.0.vcf'))
reader2 = vcf.Reader(fh('walk_refcall.vcf'))

# counters for distinct records and overlapping records
ndist_def, nover_def = 0, 0
for x in utils.walk_together(reader1, reader2):
assert len(x) == 2
if x[0] is not None and x[1] is not None:
assert (x[0] == x[1] and x[1] == x[0])
nover_def += 1
ndist_def += 1
# check how many overlapping records
assert nover_def == 1
# check how many distinct records
assert ndist_def == 8

# with custom function that does not check ALT,
# we see more overlaps and less distinct records

def custom_eq(rec1, rec2):
    """Return True when both records agree on CHROM, POS and REF.

    ALT is deliberately not compared. If either record is ``None``
    the pair is never considered equal.
    """
    if rec1 is None or rec2 is None:
        return False
    return (
        rec1.CHROM == rec2.CHROM
        and rec1.POS == rec2.POS
        and rec1.REF == rec2.REF
    )

reader1 = vcf.Reader(fh('example-4.0.vcf'))
reader2 = vcf.Reader(fh('walk_refcall.vcf'))

ndist_cust, nover_cust = 0, 0
for x in utils.walk_together(reader1, reader2, eq_func=custom_eq):
self.assertEqual(len(x), 2)
# avoid assert() when one record is None
if x[0] is not None and x[1] is not None:
assert (custom_eq(x[0], x[1]) and custom_eq(x[1], x[0]))
ncomps += 1
# still increment counter to ensure iteration is finished for all
# records
nrecs += 1
# check number of records total
self.assertEqual(nrecs, 5)
# check how many records found in all files
self.assertEqual(ncomps, 4)
# Test files with many chromosomes: 'vcf_record_sort_key' is passed to
# walk_together to define the chromosome order.
# Build a real list here: on Python 3 ``map`` returns an iterator, which
# cannot be concatenated with a list using ``+``.
chr_order = [str(n) for n in range(1, 30)] + ['X', 'Y', 'M']

def get_key(r):
    """Sort key for a record: (rank of its chromosome in chr_order, POS)."""
    # Every 'chr' substring is removed from CHROM before the lookup, so
    # 'chr1' and '1' map to the same rank.
    return (chr_order.index(r.CHROM.replace('chr', '')), r.POS)

reader1 = vcf.Reader(fh('issue-140-file1.vcf'))
reader2 = vcf.Reader(fh('issue-140-file2.vcf'))
reader3 = vcf.Reader(fh('issue-140-file3.vcf'))
# Each character is an integer bit flag (like file permissions):
# 0x4 -> reader1, 0x2 -> reader2, 0x1 -> reader3 is expected to
# contribute a record at that step.
expected = "66642577752767662466"
walker = utils.walk_together(reader1, reader2, reader3,
                             vcf_record_sort_key=get_key)
for ex, recs in zip(expected, walker):
    flags = int(ex)
    for i, flag in enumerate([0x4, 0x2, 0x1]):
        # Identity checks, consistent with the ``is None`` asserts used
        # for the two-reader case earlier in this test.
        if flags & flag:
            self.assertIsNotNone(recs[i])
        else:
            self.assertIsNone(recs[i])

def test_trim(self):
tests = [('TAA GAA', 'T G'),
Expand Down

0 comments on commit 28dfe37

Please sign in to comment.