diff --git a/python/tests/ibd.py b/python/tests/ibd.py new file mode 100644 index 0000000000..d23a4fefec --- /dev/null +++ b/python/tests/ibd.py @@ -0,0 +1,401 @@ +# MIT License +# +# Copyright (c) 2020 Tskit Developers +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +""" +Python implementation of the IBD-finding algorithms. +""" +import argparse + +import numpy as np + +import tskit + + +class Segment: + """ + A class representing a single segment. Each segment has a left and right, + denoting the loci over which it spans, a node and a next, giving the next + in the chain. + + The node it records is the *output* node ID. + """ + + def __init__(self, left=None, right=None, node=None, next_seg=None): + self.left = left + self.right = right + self.node = node + self.next = next_seg + + def __str__(self): + s = "({}-{}->{}:next={})".format( + self.left, self.right, self.node, repr(self.next) + ) + return s + + def __repr__(self): + return repr((self.left, self.right, self.node)) + + def __eq__(self, other): + # NOTE: to simplify tests, we DON'T check for equality of 'next'. + return ( + self.left == other.left + and self.right == other.right + and self.node == other.node + ) + + def __lt__(self, other): + return (self.node, self.left, self.right) < ( + other.node, + other.left, + other.right, + ) + + +class SegmentList: + """ + A class representing a list of segments that are descended from a given ancestral + node via a particular child of the ancestor. + Each SegmentList keeps track of the first and last segment in the list, head and + tail. The next attribute points to another SegmentList, allowing SegmentList + objects to be 'chained' to one another. + """ + + def __init__(self, head=None, tail=None, next_list=None): + self.head = head + self.tail = tail + self.next = next_list + + def __str__(self): + s = "head={},tail={},next={}".format(self.head, self.tail, repr(self.next)) + return s + + def __repr__(self): + if self.head is None: + s = "[{}]".format(repr(None)) + elif self.head == self.tail: + s = "[{}]".format(repr(self.head)) + elif self.head.next == self.tail: + s = "[{}, {}]".format(repr(self.head), repr(self.tail)) + else: + s = "[{}, ..., {}]".format(repr(self.head), repr(self.tail)) + return s + + def add(self, other): + """ + Use to append another SegmentList, or a single segment. + SegmentList1.add(SegmentList2) will modify SegmentList1 so that + SegmentList1.tail.next = SegmentList2.head + SegmentList1.add(Segment1) will add Segment1 to the tail of SegmentList1 + """ + assert isinstance(other, SegmentList) or isinstance(other, Segment) + + if isinstance(other, SegmentList): + if self.head is None: + self.head = other.head + self.tail = other.tail + else: + self.tail.next = other.head + self.tail = other.tail + elif isinstance(other, Segment): + if self.head is None: + self.head = other + self.tail = other + else: + self.tail.next = other + self.tail = other + + +class IbdFinder: + """ + Finds all IBD relationships between specified samples in a tree sequence. + """ + + def __init__(self, ts, samples=None, min_length=0, max_time=None): + + self.ts = ts + # Note: samples *must* be in order of ascending node ID + if samples is None: + self.samples = ts.samples() + else: + self.samples = samples + if len(self.samples) == 0: + raise ValueError("The tree sequence contains no samples.") + + self.sample_id_map = np.zeros(ts.num_nodes, dtype=int) - 1 + for index, u in enumerate(self.samples): + self.sample_id_map[u] = index + self.min_length = min_length + if max_time is None: + self.max_time = 2 * ts.max_root_time + else: + self.max_time = max_time + self.A = [None for _ in range(ts.num_nodes)] # Descendant segments + self.tables = self.ts.tables + + self.oldest_parent = self.get_oldest_parents() + + # Objects below are needed for the IBD segment-holding object. + self.num_samples = len(self.samples) + self.sample_pairs = self.get_sample_pairs() + + # Note: in the C code the object below should be a struct array. + # Each item will be accessed using its index, which corresponds to a particular + # sample pair. The mapping between index and sample pair is defined in the + # find_sample_pair_index method further down. + + self.ibd_segments = {} + for key in self.sample_pairs: + self.ibd_segments[key] = None + + def get_oldest_parents(self): + oldest_parents = [-1 for _ in range(self.ts.num_nodes)] + node_times = self.ts.tables.nodes.time + for e in self.ts.tables.edges: + c = e.child + if ( + oldest_parents[c] == -1 + or node_times[oldest_parents[c]] < node_times[e.parent] + ): + oldest_parents[c] = e.parent + return oldest_parents + + def add_ibd_segments(self, sample0, sample1, seg): + index = self.find_sample_pair_index(sample0, sample1) + + # Note: the code below is specific to the Python implementation, where the + # output is a dictionary indexed by sample pairs. + # In the C implementation, it'll be more like + # self.ibd_segments[index].add(seg) + + if self.ibd_segments[self.sample_pairs[index]] is None: + self.ibd_segments[self.sample_pairs[index]] = SegmentList( + head=seg, tail=seg + ) + else: + self.ibd_segments[self.sample_pairs[index]].add(seg) + + def get_sample_pairs(self): + """ + Returns a list of all pairs of samples. Replaces itertools.combinations. + Note: they must be sorted + """ + sample_pairs = [] + for ind, i in enumerate(self.samples): + for j in self.samples[(ind + 1) :]: + sample_pairs.append((i, j)) + + return sample_pairs + + def find_sample_pair_index(self, sample0, sample1): + """ + Note: this method isn't strictly necessary for the Python implementation + but is needed for the C implemention, where the output ibd_segments is a + struct array. + This calculates the position of the object corresponding to the inputted + sample pair in the struct array. + """ + + # Ensure samples are in order. + if sample0 == sample1: + raise ValueError("Samples in pair must have different node IDs.") + elif sample0 > sample1: + sample0, sample1 = sample1, sample0 + + i0 = self.sample_id_map[sample0] + i1 = self.sample_id_map[sample1] + + # Calculate the position of the sample pair in the vector. + index = ( + (self.num_samples) * (self.num_samples - 1) / 2 + - (self.num_samples - i0) * (self.num_samples - i0 - 1) / 2 + + i1 + - i0 + - 1 + ) + + return int(index) + + def find_ibd_segments(self): + """ + The wrapper for the procedure that calculates IBD segments. + """ + + # Set up an iterator over the edges in the tree sequence. + edges_iter = iter(self.ts.edges()) + e = next(edges_iter) + parent_should_be_added = True + node_times = self.tables.nodes.time + + # Iterate over the edges. + while e is not None: + + current_parent = e.parent + current_time = node_times[current_parent] + if current_time > self.max_time: + # Stop looking for IBD segments once the + # processed nodes are older than the max time. + break + + seg = Segment(e.left, e.right, e.child) + + # Create a SegmentList() holding all segments that descend from seg. + list_to_add = SegmentList() + u = seg.node + if self.sample_id_map[u] != tskit.NULL: + list_to_add.add(seg) + else: + if self.A[u] is not None: + s = self.A[u].head + while s is not None: + intvl = ( + max(seg.left, s.left), + min(seg.right, s.right), + ) + if intvl[1] - intvl[0] > 0: + list_to_add.add(Segment(intvl[0], intvl[1], s.node)) + s = s.next + + if list_to_add.head is not None: + self.calculate_ibd_segs(current_parent, list_to_add) + + # For parents that are also samples + if ( + self.sample_id_map[current_parent] != tskit.NULL + ) and parent_should_be_added: + singleton_seg = SegmentList() + singleton_seg.add(Segment(0, self.ts.sequence_length, current_parent)) + self.calculate_ibd_segs(current_parent, singleton_seg) + parent_should_be_added = False + + # Move to next edge. + e = next(edges_iter, None) + + # Remove any processed nodes that are no longer needed. + if e is not None and e.parent != current_parent: + for i, n in enumerate(self.oldest_parent): + if current_parent == n: + self.A[i] = None + + return self.ibd_segments + + def calculate_ibd_segs(self, current_parent, list_to_add): + """ + Write later. + """ + + if list_to_add.head is None: + return [] + + if self.A[current_parent] is None: + self.A[current_parent] = list_to_add + + else: + seg0 = self.A[current_parent].head + while seg0 is not None: + seg1 = list_to_add.head + while seg1 is not None: + left = max(seg0.left, seg1.left) + right = min(seg0.right, seg1.right) + if left >= right: + seg1 = seg1.next + continue + nodes = [seg0.node, seg1.node] + nodes.sort() + + # If there are any overlapping segments, record as a new + # IBD relationship. + if right - left > self.min_length: + self.add_ibd_segments( + nodes[0], nodes[1], Segment(left, right, current_parent), + ) + seg1 = seg1.next + seg0 = seg0.next + + # Add list_to_add to A[u]. + self.A[current_parent].add(list_to_add) + + +if __name__ == "__main__": + """ + A simple CLI for running IBDFinder on a command line from the `python` + subdirectory. Basic usage: + > python3 ./tests/ibd.py --infile test.trees + """ + + parser = argparse.ArgumentParser( + description="Command line interface for the IBDFinder." + ) + + parser.add_argument( + "--infile", + type=str, + dest="infile", + nargs=1, + metavar="IN_FILE", + help="The tree sequence to be analysed.", + ) + + parser.add_argument( + "--min-length", + type=float, + dest="min_length", + nargs=1, + metavar="MIN_LENGTH", + help="Only segments longer than this cutoff will be returned.", + ) + + parser.add_argument( + "--max-time", + type=float, + dest="max_time", + nargs=1, + metavar="MAX_TIME", + help="Only segments younger this time will be returned.", + ) + + parser.add_argument( + "--samples", + type=int, + dest="samples", + nargs=2, + metavar="SAMPLES", + help="If provided, only this pair's IBD info is returned.", + ) + + args = parser.parse_args() + ts = tskit.load(args.infile[0]) + if args.min_length is None: + min_length = 0 + else: + min_length = args.min_length[0] + if args.max_time is None: + max_time = None + else: + max_time = args.max_time[0] + + s = IbdFinder(ts, samples=ts.samples(), min_length=min_length, max_time=max_time) + all_segs = s.find_ibd_segments() + + if args.samples is None: + print(all_segs) + else: + samples = args.samples + print(all_segs[(samples[0], samples[1])]) diff --git a/python/tests/test_ibd.py b/python/tests/test_ibd.py new file mode 100644 index 0000000000..2093107b43 --- /dev/null +++ b/python/tests/test_ibd.py @@ -0,0 +1,763 @@ +""" +Tests of IBD finding algorithms. +""" +import io +import itertools +import random +import unittest + +import msprime + +import tests.ibd as ibd +import tests.test_wright_fisher as wf +import tskit + +# Functions for computing IBD 'naively'. + + +def get_ibd( + sample0, + sample1, + treeSequence, + min_length=0, + max_time=None, + path_ibd=True, + mrca_ibd=True, +): + """ + Returns all IBD segments for a given pair of nodes in a tree + using a naive algorithm. + Note: This function probably looks more complicated than it needs to be -- + This is because it also calculates other 'versions' of IBD (mrca_ibd=False, + path_ibd=False) that we have't implemented properly yet. + """ + + ibd_list = [] + ts, node_map = treeSequence.simplify( + samples=[sample0, sample1], keep_unary=True, map_nodes=True + ) + node_map = node_map.tolist() + + for n in ts.nodes(): + + if max_time is not None and n.time > max_time: + break + + node_id = n.id + interval_list = [] + if n.flags == 1: + continue + + prev_dict = None + for t in ts.trees(): + + if len(list(t.nodes(n.id))) == 1 or t.num_samples(n.id) < 2: + continue + if mrca_ibd and n.id != t.mrca(0, 1): + continue + + current_int = t.get_interval() + if len(interval_list) == 0: + interval_list.append(current_int) + else: + prev_int = interval_list[-1] + if not path_ibd and prev_int[1] == current_int[0]: + interval_list[-1] = (prev_int[0], current_int[1]) + elif prev_dict is not None and subtrees_are_equal( + t, prev_dict, node_id + ): + interval_list[-1] = (prev_int[0], current_int[1]) + else: + interval_list.append(current_int) + + prev_dict = t.get_parent_dict() + + for interval in interval_list: + if min_length == 0 or interval[1] - interval[0] > min_length: + orig_id = node_map.index(node_id) + ibd_list.append(ibd.Segment(interval[0], interval[1], orig_id)) + + return ibd_list + + +def get_ibd_all_pairs( + treeSequence, + samples=None, + min_length=0, + max_time=None, + path_ibd=True, + mrca_ibd=False, +): + + """ + Returns all IBD segments for all pairs of nodes in a tree sequence + using the naive algorithm above. + """ + + ibd_dict = {} + + if samples is None: + samples = treeSequence.samples().tolist() + + pairs = itertools.combinations(samples, 2) + for pair in pairs: + ibd_list = get_ibd( + pair[0], + pair[1], + treeSequence, + min_length=min_length, + max_time=max_time, + path_ibd=path_ibd, + mrca_ibd=mrca_ibd, + ) + if len(ibd_list) > 0: + ibd_dict[pair] = ibd_list + + return ibd_dict + + +def subtrees_are_equal(tree1, pdict0, root): + """ + Checks for equality of two subtrees beneath a given root node. + """ + pdict1 = tree1.get_parent_dict() + if root not in pdict0.values() or root not in pdict1.values(): + return False + leaves1 = set(tree1.leaves(root)) + for l in leaves1: + node = l + while node != root: + p1 = pdict1[node] + if p1 not in pdict0.values(): + return False + p0 = pdict0[node] + if p0 != p1: + return False + node = p1 + + return True + + +def verify_equal_ibd(treeSequence): + """ + Calculates IBD segments using both the 'naive' and sophisticated algorithms, + verifies that the same output is produced. + NB: May be good to expand this in the future so that many different combos + of IBD options are tested simultaneously (all the MRCA and path-IBD combos), + for example. + """ + ts = treeSequence + ibd0 = ibd.IbdFinder(ts, samples=ts.samples()) + ibd0 = ibd0.find_ibd_segments() + ibd1 = get_ibd_all_pairs(ts, path_ibd=True, mrca_ibd=True) + + # Convert each SegmentList object into a list of Segment objects. + ibd0_tolist = {} + for key, val in ibd0.items(): + if val is not None: + ibd0_tolist[key] = convert_segmentlist_to_list(val) + + # Check for equality. + for key0, val0 in ibd0_tolist.items(): + + assert key0 in ibd1.keys() + val1 = ibd1[key0] + val0.sort() + val1.sort() + + +def convert_segmentlist_to_list(seglist): + """ + Turns a SegmentList object into a list of Segment objects. + (This makes them easier to compare for testing purposes) + """ + outlist = [] + if seglist is None: + return outlist + else: + seg = seglist.head + outlist = [seg] + seg = seg.next + while seg is not None: + outlist.append(seg) + seg = seg.next + + return outlist + + +def convert_dict_of_segmentlists(dict0): + """ + Turns a dictionary of SegmentList objects into a dictionary of lists of + Segment objects. (makes them easier to compare in tests). + """ + dict_out = {} + for key, val in dict0.items(): + dict_out[key] = convert_segmentlist_to_list(val) + + return dict_out + + +def ibd_is_equal(dict1, dict2): + """ + Verifies that two dictionaries have the same keys, and that + the set of items corresponding to each key is identical. + Used to check identical IBD output. + NOTE: is there a better/neater way to do this??? + """ + if len(dict1) != len(dict2): + return False + for key1, val1 in dict1.items(): + if key1 not in dict2.keys(): + return False + val2 = dict2[key1] + if not segment_lists_are_equal(val1, val2): + return False + + return True + + +def segment_lists_are_equal(val1, val2): + """ + Returns True if the two lists hold the same set of segments, otherwise + returns False. + """ + + if len(val1) != len(val2): + return False + + val1.sort() + val2.sort() + + if val1 is None: # get rid of this later -- we don't any empty dict values! + if val2 is not None: + return False + elif val2 is None: + if val1 is not None: + return False + for i in range(len(val1)): + if val1[i] != val2[i]: + return False + + return True + + +class TestIbdSingleBinaryTree(unittest.TestCase): + + # + # 2 4 + # / \ + # 1 3 \ + # / \ \ + # 0 0 1 2 + nodes = io.StringIO( + """\ + id is_sample time + 0 1 0 + 1 1 0 + 2 1 0 + 3 0 1 + 4 0 2 + """ + ) + edges = io.StringIO( + """\ + left right parent child + 0 1 3 0,1 + 0 1 4 2,3 + """ + ) + ts = tskit.load_text(nodes=nodes, edges=edges, strict=False) + + # Basic test + def test_defaults(self): + ibd_f = ibd.IbdFinder(self.ts) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = { + (0, 1): [ibd.Segment(0.0, 1.0, 3)], + (0, 2): [ibd.Segment(0.0, 1.0, 4)], + (1, 2): [ibd.Segment(0.0, 1.0, 4)], + } + assert ibd_is_equal(ibd_segs, true_segs) + + # Max time = 1.5 + def test_time(self): + ibd_f = ibd.IbdFinder(self.ts, max_time=1.5) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = {(0, 1): [ibd.Segment(0.0, 1.0, 3)], (0, 2): [], (1, 2): []} + assert ibd_is_equal(ibd_segs, true_segs) + + # Min length = 2 + def test_length(self): + ibd_f = ibd.IbdFinder(self.ts, min_length=2) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = {(0, 1): [], (0, 2): [], (1, 2): []} + assert ibd_is_equal(ibd_segs, true_segs) + + +class TestIbdTwoSamplesTwoTrees(unittest.TestCase): + + # 2 + # | 3 + # 1 2 | / \ + # / \ | / \ + # 0 0 1 | 0 1 + # |------------|----------| + # 0.0 0.4 1.0 + nodes = io.StringIO( + """\ + id is_sample time + 0 1 0 + 1 1 0 + 2 0 1 + 3 0 1.5 + """ + ) + edges = io.StringIO( + """\ + left right parent child + 0 0.4 2 0,1 + 0.4 1.0 3 0,1 + """ + ) + ts = tskit.load_text(nodes=nodes, edges=edges, strict=False) + + # Basic test + def test_basic(self): + ibd_f = ibd.IbdFinder(self.ts) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = {(0, 1): [ibd.Segment(0.0, 0.4, 2), ibd.Segment(0.4, 1.0, 3)]} + assert ibd_is_equal(ibd_segs, true_segs) + + # Max time = 1.2 + def test_time(self): + ibd_f = ibd.IbdFinder(self.ts, max_time=1.2) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = {(0, 1): [ibd.Segment(0.0, 0.4, 2)]} + assert ibd_is_equal(ibd_segs, true_segs) + + # Min length = 0.5 + def test_length(self): + ibd_f = ibd.IbdFinder(self.ts, min_length=0.5) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = {(0, 1): [ibd.Segment(0.4, 1.0, 3)]} + assert ibd_is_equal(ibd_segs, true_segs) + + +class TestIbdUnrelatedSamples(unittest.TestCase): + + # + # 2 3 + # | | + # 0 1 + + nodes = io.StringIO( + """\ + id is_sample time + 0 1 0 + 1 1 0 + 2 0 1 + 3 0 1 + """ + ) + edges = io.StringIO( + """\ + left right parent child + 0 1 2 0 + 0 1 3 1 + """ + ) + ts = tskit.load_text(nodes=nodes, edges=edges, strict=False) + + def test_basic(self): + ibd_f = ibd.IbdFinder(self.ts) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = {(0, 1): []} + assert ibd_is_equal(ibd_segs, true_segs) + + def test_time(self): + ibd_f = ibd.IbdFinder(self.ts, max_time=1.2) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = {(0, 1): []} + assert ibd_is_equal(ibd_segs, true_segs) + + def test_length(self): + ibd_f = ibd.IbdFinder(self.ts, min_length=0.2) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = {(0, 1): []} + assert ibd_is_equal(ibd_segs, true_segs) + + +class TestIbdNoSamples(unittest.TestCase): + def test_no_samples(self): + # + # 2 + # / \ + # / \ + # / \ + # (0) (1) + nodes = io.StringIO( + """\ + id is_sample time + 0 0 0 + 1 0 0 + 2 0 1 + 3 0 1 + """ + ) + edges = io.StringIO( + """\ + left right parent child + 0 1 2 0 + 0 1 3 1 + """ + ) + ts = tskit.load_text(nodes=nodes, edges=edges, strict=False) + with self.assertRaises(ValueError): + ibd.IbdFinder(ts) + + +class TestIbdSamplesAreDescendants(unittest.TestCase): + # + # 2 + # | + # 1 + # | + # 0 + nodes = io.StringIO( + """\ + id is_sample time + 0 1 0 + 1 1 1 + 2 0 2 + """ + ) + edges = io.StringIO( + """\ + left right parent child + 0 1 1 0 + 0 1 2 1 + """ + ) + ts = tskit.load_text(nodes=nodes, edges=edges, strict=False) + + def test_basic(self): + ts = self.ts + ibd_f = ibd.IbdFinder(ts) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = {(0, 1): [ibd.Segment(0.0, 1.0, 1)]} + + assert ibd_is_equal(ibd_segs, true_segs) + + +class TestIbdDifferentPaths(unittest.TestCase): + # + # 4 | 4 | 4 + # / \ | / \ | / \ + # / \ | / 3 | / \ + # / \ | 2 \ | / \ + # / \ | / \ | / \ + # 0 1 | 0 1 | 0 1 + # | | + # 0.2 0.7 + + nodes = io.StringIO( + """\ + id is_sample time + 0 1 0 + 1 1 0 + 2 0 1 + 3 0 1.5 + 4 0 2.5 + """ + ) + edges = io.StringIO( + """\ + left right parent child + 0.2 0.7 2 0 + 0.2 0.7 3 1 + 0.0 0.2 4 0 + 0.0 0.2 4 1 + 0.7 1.0 4 0 + 0.7 1.0 4 1 + 0.2 0.7 4 2 + 0.2 0.7 4 3 + """ + ) + ts = tskit.load_text(nodes=nodes, edges=edges, strict=False) + + def test_defaults(self): + ts = self.ts + ibd_f = ibd.IbdFinder(ts) + ibd_segs = ibd_f.find_ibd_segments() + true_segs = { + (0, 1): [ + ibd.Segment(0.0, 0.2, 4), + ibd.Segment(0.7, 1.0, 4), + ibd.Segment(0.2, 0.7, 4), + ] + } + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + assert ibd_is_equal(ibd_segs, true_segs) + + def test_time(self): + ts = self.ts + ibd_f = ibd.IbdFinder(ts, max_time=1.8) + ibd_segs = ibd_f.find_ibd_segments() + true_segs = {(0, 1): []} + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + assert ibd_is_equal(ibd_segs, true_segs) + ibd_f = ibd.IbdFinder(ts, max_time=2.8) + ibd_segs = ibd_f.find_ibd_segments() + true_segs = { + (0, 1): [ + ibd.Segment(0.0, 0.2, 4), + ibd.Segment(0.7, 1.0, 4), + ibd.Segment(0.2, 0.7, 4), + ] + } + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + assert ibd_is_equal(ibd_segs, true_segs) + + def test_length(self): + ts = self.ts + ibd_f = ibd.IbdFinder(ts, min_length=0.4) + ibd_segs = ibd_f.find_ibd_segments() + true_segs = {(0, 1): [ibd.Segment(0.2, 0.7, 4)]} + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + assert ibd_is_equal(ibd_segs, true_segs) + + +class TestIbdPolytomies(unittest.TestCase): + # + # 5 | 5 + # / \ | / \ + # 4 \ | 4 \ + # /|\ \ | /|\ \ + # / | \ \ | / | \ \ + # / | \ \ | / | \ \ + # / | \ \ | / | \ \ + # 0 1 2 3 | 0 1 3 2 + # | + # 0.3 + + nodes = io.StringIO( + """\ + id is_sample time + 0 1 0 + 1 1 0 + 2 1 0 + 3 1 0 + 4 0 2.5 + 5 0 3.5 + """ + ) + edges = io.StringIO( + """\ + left right parent child + 0.0 1.0 4 0 + 0.0 1.0 4 1 + 0.0 0.3 4 2 + 0.3 1.0 4 3 + 0.3 1.0 5 2 + 0.0 0.3 5 3 + 0.0 1.0 5 4 + """ + ) + ts = tskit.load_text(nodes=nodes, edges=edges, strict=False) + + def test_defaults(self): + ts = self.ts + ibd_f = ibd.IbdFinder(ts) + ibd_segs = ibd_f.find_ibd_segments() + # print(ibd_segs[(0,1)]) + true_segs = { + (0, 1): [ibd.Segment(0, 1, 4)], + (0, 2): [ibd.Segment(0, 0.3, 4), ibd.Segment(0.3, 1, 5)], + (0, 3): [ibd.Segment(0, 0.3, 5), ibd.Segment(0.3, 1, 4)], + (1, 2): [ibd.Segment(0, 0.3, 4), ibd.Segment(0.3, 1, 5)], + (1, 3): [ibd.Segment(0, 0.3, 5), ibd.Segment(0.3, 1, 4)], + (2, 3): [ibd.Segment(0.3, 1, 5), ibd.Segment(0, 0.3, 5)], + } + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + # print(ibd_segs) + assert ibd_is_equal(ibd_segs, true_segs) + + def test_time(self): + ts = self.ts + ibd_f = ibd.IbdFinder(ts, max_time=3) + ibd_segs = ibd_f.find_ibd_segments() + true_segs = { + (0, 1): [ibd.Segment(0, 1, 4)], + (0, 2): [ibd.Segment(0, 0.3, 4)], + (0, 3): [ibd.Segment(0.3, 1, 4)], + (1, 2): [ibd.Segment(0, 0.3, 4)], + (1, 3): [ibd.Segment(0.3, 1, 4)], + (2, 3): [], + } + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + assert ibd_is_equal(ibd_segs, true_segs) + + def test_length(self): + ts = self.ts + ibd_f = ibd.IbdFinder(ts, min_length=0.5) + ibd_segs = ibd_f.find_ibd_segments() + true_segs = { + (0, 1): [ibd.Segment(0, 1, 4)], + (0, 2): [ibd.Segment(0.3, 1, 5)], + (0, 3): [ibd.Segment(0.3, 1, 4)], + (1, 2): [ibd.Segment(0.3, 1, 5)], + (1, 3): [ibd.Segment(0.3, 1, 4)], + (2, 3): [ibd.Segment(0.3, 1, 5)], + } + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + assert ibd_is_equal(ibd_segs, true_segs) + + +class TestIbdInternalSamples(unittest.TestCase): + # + # + # 3 + # / \ + # / 2 + # / \ + # 0 (1) + nodes = io.StringIO( + """\ + id is_sample time + 0 1 0 + 1 0 0 + 2 1 1 + 3 0 2 + """ + ) + edges = io.StringIO( + """\ + left right parent child + 0.0 1.0 2 1 + 0.0 1.0 3 0 + 0.0 1.0 3 2 + """ + ) + ts = tskit.load_text(nodes=nodes, edges=edges, strict=False) + + def test_defaults(self): + ts = self.ts + ibd_f = ibd.IbdFinder(ts) + ibd_segs = ibd_f.find_ibd_segments() + ibd_segs = convert_dict_of_segmentlists(ibd_segs) + true_segs = { + (0, 2): [ibd.Segment(0, 1, 3)], + } + assert ibd_is_equal(ibd_segs, true_segs) + + +class TestIbdRandomExamples(unittest.TestCase): + """ + Randomly generated test cases. + """ + + # Infinite sites, Hudson model. + def test_random_example1(self): + ts = msprime.simulate(sample_size=10, recombination_rate=0.5, random_seed=2) + verify_equal_ibd(ts) + + def test_random_example2(self): + ts = msprime.simulate(sample_size=10, recombination_rate=0.5, random_seed=23) + verify_equal_ibd(ts) + + def test_random_example3(self): + ts = msprime.simulate(sample_size=10, recombination_rate=0.5, random_seed=232) + verify_equal_ibd(ts) + + def test_random_example4(self): + ts = msprime.simulate(sample_size=10, recombination_rate=0.3, random_seed=726) + verify_equal_ibd(ts) + + # Finite sites + def sim_finite_sites(self, random_seed, dtwf=False): + seq_length = int(1e5) + positions = random.sample(range(1, seq_length), 98) + [0, seq_length] + positions.sort() + rates = [random.uniform(1e-9, 1e-7) for _ in range(100)] + r_map = msprime.RecombinationMap( + positions=positions, rates=rates, num_loci=seq_length + ) + if dtwf: + model = "dtwf" + else: + model = "hudson" + ts = msprime.simulate( + sample_size=10, + recombination_map=r_map, + Ne=10, + random_seed=random_seed, + model=model, + ) + return ts + + def test_finite_sites1(self): + ts = self.sim_finite_sites(9257) + verify_equal_ibd(ts) + + def test_finite_sites2(self): + ts = self.sim_finite_sites(835) + verify_equal_ibd(ts) + + def test_finite_sites3(self): + ts = self.sim_finite_sites(27278) + verify_equal_ibd(ts) + + def test_finite_sites4(self): + ts = self.sim_finite_sites(22446688) + verify_equal_ibd(ts) + + # DTWF + def test_dtwf1(self): + ts = self.sim_finite_sites(84, dtwf=True) + verify_equal_ibd(ts) + + def test_dtwf2(self): + ts = self.sim_finite_sites(17482, dtwf=True) + verify_equal_ibd(ts) + + def test_dtwf3(self): + ts = self.sim_finite_sites(846, dtwf=True) + verify_equal_ibd(ts) + + def test_dtwf4(self): + ts = self.sim_finite_sites(273, dtwf=True) + verify_equal_ibd(ts) + + def test_sim_wright_fisher_generations(self): + # Uses the bespoke DTWF forward-time simulator. + number_of_gens = 3 + tables = wf.wf_sim(4, number_of_gens, deep_history=False, seed=83) + tables.sort() + ts = tables.tree_sequence() + verify_equal_ibd(ts) + + def test_sim_wright_fisher_generations2(self): + # Uses the bespoke DTWF forward-time simulator. + number_of_gens = 10 + tables = wf.wf_sim(10, number_of_gens, deep_history=False, seed=837) + tables.sort() + ts = tables.tree_sequence() + verify_equal_ibd(ts) + + def test_sim_wright_fisher_generations3(self): + # Uses the bespoke DTWF forward-time simulator. + number_of_gens = 10 + tables = wf.wf_sim(10, number_of_gens, deep_history=False, seed=37) + tables.sort() + ts = tables.tree_sequence() + verify_equal_ibd(ts)