diff --git a/python/tests/test_ms.py b/python/tests/test_ms.py new file mode 100644 index 0000000000..91fb88a2c4 --- /dev/null +++ b/python/tests/test_ms.py @@ -0,0 +1,344 @@ +# MIT License +# +# Copyright (c) 2018-2020 Tskit Developers +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +""" +Test cases for ms output in tskit. All of these tests have separate versions +for the cases of single replicate and multiple replicates. This is because +msprime.simulate generates a tree_sequence object if the num_replicates argument +is not used but an iterator over tree_sequences if the num_replicates argument +is used. +""" +import collections +import itertools +import os +import tempfile +import unittest + +import msprime + +import tskit as ts + +length = 1e2 +mutation_rate = 1e-2 +num_replicates = 3 + + +def get_ms_file_quantity(ms_file, quantity): + quantities = {} + num_replicates = 0 + num_sites = [] + num_positions = [] + num_haplotypes = [] + genotypes = [] + positions = [] + gens = [] + for line in ms_file: + if len(line.split()) > 0: + if line.split()[0] == "//": + num_replicates = num_replicates + 1 + num_haplotypes.append(0) + if len(gens) > 0: + genotypes.append(gens) + gens = [] + if line.split()[0] == "segsites:": + num_sites.append(int(line.split()[1])) + if line.split()[0] == "positions:": + num_positions.append(len(line.split()) - 1) + positions.append(line[11:].rstrip()) + if ( + line[0:2] == "00" + or line[0:2] == "01" + or line[0:2] == "10" + or line[0:2] == "11" + ): + num_haplotypes[-1] = num_haplotypes[-1] + 1 + gens.append(line.rstrip()) + genotypes.append(gens) + quantities["num_replicates"] = num_replicates + quantities["num_sites"] = num_sites + quantities["num_positions"] = num_positions + quantities["num_haplotypes"] = num_haplotypes + quantities["genotypes"] = genotypes + quantities["positions"] = positions + + return quantities[quantity] + + +class TestNumReplicates(unittest.TestCase): + """ + Tests that the number of replicates written out is the same as + the number of replicates simulated + """ + + def verify_num_replicates(self, tree_seq, num_replicates): + if isinstance(tree_seq, collections.Iterable): + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms( + tree_seq, + f, + num_replicates=num_replicates, + ) + with open(ms_file_path) as handle: + num_replicates_file = get_ms_file_quantity(handle, "num_replicates") + else: + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms(tree_seq, f) + with open(ms_file_path) as handle: + num_replicates_file = get_ms_file_quantity(handle, "num_replicates") + self.assertEqual(num_replicates, num_replicates_file) + + def test_num_replicates(self): + tree_seq = msprime.simulate( + 25, length=length, mutation_rate=mutation_rate, random_seed=123 + ) + self.verify_num_replicates(tree_seq, 1) + + def test_num_replicates_multiple(self): + tree_seq = msprime.simulate( + 25, + length=length, + mutation_rate=mutation_rate, + random_seed=123, + num_replicates=num_replicates, + ) + self.verify_num_replicates(tree_seq, num_replicates) + + +class TestNumHaplotypes(unittest.TestCase): + """ + Tests that the number of haplotypes output are the same as the + number of individuals simulated. + """ + + def verify_num_haplotypes(self, tree_seq, tree_seq2, num_replicates): + if isinstance(tree_seq, collections.Iterable): + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms( + tree_seq, + f, + num_replicates=num_replicates, + ) + with open(ms_file_path) as handle: + num_haplotypes = get_ms_file_quantity(handle, "num_haplotypes") + j = 0 + for ts_indv in tree_seq2: + self.assertEqual(ts_indv.num_samples, num_haplotypes[j]) + j = j + 1 + else: + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms(tree_seq, f) + with open(ms_file_path) as handle: + num_haplotypes = get_ms_file_quantity(handle, "num_haplotypes") + self.assertEqual(tree_seq.num_samples, num_haplotypes[0]) + + def test_num_haplotypes(self): + tree_seq = msprime.simulate( + 25, length=length, mutation_rate=mutation_rate, random_seed=123 + ) + self.verify_num_haplotypes(tree_seq, tree_seq, 1) + + def test_num_haplotypes_replicates(self): + tree_seq = msprime.simulate( + 25, + length=length, + mutation_rate=mutation_rate, + random_seed=123, + num_replicates=num_replicates, + ) + tree_seq, tree_seq2 = itertools.tee(tree_seq) + self.verify_num_haplotypes(tree_seq, tree_seq2, num_replicates) + + +class TestNumSites(unittest.TestCase): + """ + Tests that the number of sites written out as well as the length + of the positions list match the number of variants in the tree sequence + """ + + def verify_num_sites(self, tree_seq, tree_seq2, num_replicates): + if isinstance(tree_seq, collections.Iterable): + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms( + tree_seq, + f, + num_replicates=num_replicates, + ) + with open(ms_file_path) as handle: + num_sites = get_ms_file_quantity(handle, "num_sites") + with open(ms_file_path) as handle: + num_positions = get_ms_file_quantity(handle, "num_positions") + j = 0 + for ts_indv in tree_seq2: + self.assertEqual(ts_indv.num_sites, num_sites[j]) + self.assertEqual(ts_indv.num_sites, num_positions[j]) + j = j + 1 + else: + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms(tree_seq, f) + with open(ms_file_path) as handle: + num_sites = get_ms_file_quantity(handle, "num_sites") + with open(ms_file_path) as handle: + num_positions = get_ms_file_quantity(handle, "num_positions") + self.assertEqual(tree_seq.num_sites, num_sites[0]) + self.assertEqual(tree_seq.num_sites, num_positions[0]) + + def test_num_sites(self): + tree_seq = msprime.simulate( + 25, length=length, mutation_rate=mutation_rate, random_seed=123 + ) + self.verify_num_sites(tree_seq, tree_seq, 1) + + def test_num_sites_replicates(self): + tree_seq = msprime.simulate( + 25, + length=length, + mutation_rate=mutation_rate, + random_seed=123, + num_replicates=num_replicates, + ) + tree_seq, tree_seq2 = itertools.tee(tree_seq) + self.verify_num_sites(tree_seq, tree_seq2, num_replicates) + + +class TestGenotypes(unittest.TestCase): + """ + Tests that the haplotypes written out are the same as the haplotypes generated. + """ + + def get_genotypes(self, tree_seq): + genotypes = tree_seq.genotype_matrix() + gens_array = [] + for k in range(tree_seq.num_samples): + tmp_str = "".join(map(str, genotypes[:, k])) + gens_array.append(tmp_str) + return gens_array + + def verify_genotypes(self, tree_seq, tree_seq2, num_replicates): + if isinstance(tree_seq, collections.Iterable): + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms( + tree_seq, + f, + num_replicates=num_replicates, + ) + with open(ms_file_path) as handle: + genotypes = get_ms_file_quantity(handle, "genotypes") + j = 0 + for ts_indv in tree_seq2: + self.assertEqual(self.get_genotypes(ts_indv), genotypes[j]) + j = j + 1 + else: + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms(tree_seq, f) + with open(ms_file_path) as handle: + genotypes = get_ms_file_quantity(handle, "genotypes") + self.assertEqual(self.get_genotypes(tree_seq), genotypes[0]) + + def test_genotypes(self): + tree_seq = msprime.simulate( + 25, length=length, mutation_rate=mutation_rate, random_seed=123 + ) + self.verify_genotypes(tree_seq, tree_seq, 1) + + def test_genotypes_replicates(self): + tree_seq = msprime.simulate( + 25, + length=length, + mutation_rate=mutation_rate, + random_seed=123, + num_replicates=num_replicates, + ) + tree_seq, tree_seq2 = itertools.tee(tree_seq) + self.verify_genotypes(tree_seq, tree_seq2, num_replicates) + + +class TestPositions(unittest.TestCase): + """ + Tests that the positions for the mutations written out are the same as the + positions generated. + """ + + def get_positions(self, tree_seq): + positions = [] + for i in range(tree_seq.num_sites): + positions.append( + "{:.4f}".format(tree_seq.site(i).position / tree_seq.sequence_length) + ) + positions = " ".join(positions) + return positions + + def verify_positions(self, tree_seq, tree_seq2, num_replicates): + if isinstance(tree_seq, collections.Iterable): + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms( + tree_seq, + f, + num_replicates=num_replicates, + ) + with open(ms_file_path) as handle: + positions = get_ms_file_quantity(handle, "positions") + j = 0 + for ts_indv in tree_seq2: + self.assertEqual(self.get_positions(ts_indv), positions[j]) + j = j + 1 + else: + with tempfile.TemporaryDirectory() as temp_dir: + ms_file_path = os.path.join(temp_dir, "testing_ms_file.txt") + with open(ms_file_path, "w") as f: + ts.write_ms(tree_seq, f) + with open(ms_file_path) as handle: + positions = get_ms_file_quantity(handle, "positions") + self.assertEqual(self.get_positions(tree_seq), positions[0]) + + def test_positions(self): + tree_seq = msprime.simulate( + 25, length=length, mutation_rate=mutation_rate, random_seed=123 + ) + self.verify_positions(tree_seq, tree_seq, 1) + + def test_positions_replicates(self): + tree_seq = msprime.simulate( + 25, + length=length, + mutation_rate=mutation_rate, + random_seed=123, + num_replicates=num_replicates, + ) + tree_seq, tree_seq2 = itertools.tee(tree_seq) + self.verify_positions(tree_seq, tree_seq2, num_replicates) diff --git a/python/tskit/trees.py b/python/tskit/trees.py index 2273ccaf40..7fd2abcf0d 100644 --- a/python/tskit/trees.py +++ b/python/tskit/trees.py @@ -6403,3 +6403,108 @@ def newick_trees(self, precision=3, breakpoints=None, Ne=1): "This method is no longer supported. Please use the Tree.newick" " method instead" ) + + +def write_ms( + tree_sequence, + output, + print_trees=False, + precision=4, + num_replicates=1, + write_header=True, +): + """ + Write ``ms`` formatted output from the genotypes of a tree sequence + or an iterator over tree sequences. Usage: + + .. code-block:: python + + import tskit as ts + + tree_sequence = msprime.simulate( + sample_size=sample_size, + Ne=Ne, + length=length, + mutation_rate=mutation_rate, + recombination_rate=recombination_rate, + random_seed=random_seed, + num_replicates=num_replicates, + ) + with open("output.ms", "w") as ms_file: + ts.write_ms(tree_sequence, ms_file) + + :param ts tree_sequence: The tree sequence (or iterator over tree sequences) to + write to ms file + :param io.IOBase output: The file-like object to write the ms-style output + :param bool print_trees: Boolean parameter to write out newick format trees + to output [optional] + :param int precision: Numerical precision with which to write the ms + output [optional] + :param bool write_header: Boolean parameter to write out the header. [optional] + :param int num_replicates: Number of replicates simulated [required if + num_replicates used in simulation] + + The first line of this ms-style output file written has two arguments which + are sample size and number of replicates. The second line has a 0 as a substitute + for the random seed. + """ + if not isinstance(tree_sequence, collections.Iterable): + tree_sequence = [tree_sequence] + + i = 0 + for tree_seq in tree_sequence: + if i > 0: + write_header = False + i = i + 1 + + if write_header is True: + print( + f"ms {tree_seq.sample_size} {num_replicates}", + file=output, + ) + print("0", file=output) + + print(file=output) + print("//", file=output) + if print_trees is True: + """ + Print out the trees in ms-format from the specified tree sequence. + """ + if len(tree_seq.trees()) == 1: + tree = next(tree_seq.trees()) + newick = tree.newick(precision=precision) + print(newick, file=output) + else: + for tree in tree_seq.trees(): + newick = tree.newick(precision=precision) + print(f"[{tree.span:.{precision}f}]", newick, file=output) + + else: + s = tree_seq.get_num_sites() + print("segsites:", s, file=output) + if s != 0: + print("positions: ", end="", file=output) + positions = [ + variant.position / (tree_seq.sequence_length) + for variant in tree_seq.variants() + ] + for position in positions: + print( + f"{position:.{precision}f}", + end=" ", + file=output, + ) + print(file=output) + + genotypes = tree_seq.genotype_matrix() + for k in range(tree_seq.num_samples): + tmp_str = "".join(map(str, genotypes[:, k])) + if set(tmp_str).issubset({"0", "1", "-"}): + print(tmp_str, file=output) + else: + raise ValueError( + "This tree sequence contains non-biallelic" + "SNPs and is incompatible with the ms format!" + ) + else: + print(file=output)