From 49c0fe5454ae9cfc1d64b7aeee1a346c5960229a Mon Sep 17 00:00:00 2001 From: Ben Jeffery Date: Thu, 8 Feb 2024 13:26:39 +0000 Subject: [PATCH] Error when the VCF will have a zero position site --- python/CHANGELOG.rst | 9 ++- python/tests/test_cli.py | 42 +++++++++-- python/tests/test_vcf.py | 150 ++++++++++++++++++++++++++++++--------- python/tskit/cli.py | 16 ++++- python/tskit/trees.py | 8 +++ python/tskit/vcf.py | 17 ++++- 6 files changed, 199 insertions(+), 43 deletions(-) diff --git a/python/CHANGELOG.rst b/python/CHANGELOG.rst index 31151fcb35..167041725f 100644 --- a/python/CHANGELOG.rst +++ b/python/CHANGELOG.rst @@ -2,6 +2,13 @@ [0.5.7] - 2023-XX-XX -------------------- +**Breaking Changes** + +- The VCF writing methods (`ts.write_vcf`, `ts.as_vcf`) now error if a site with + position zero is encountered. The VCF spec does not allow zero position sites. + Suppress this error with the `allow_position_zero` argument. + (:user:`benjeffery`, :pr:`2901`, :issue:`2838`) + **Features** - Add ``TreeSequence.extend_edges`` method that extends ancestral haplotypes @@ -17,7 +24,7 @@ `TreeSequence.allele_frequency_spectrum(mode="branch", polarised=False)`, which was half as big as it should have been. (:user:`petrelharp`, :user:`nspope`, :pr:`2933`) - + -------------------- [0.5.6] - 2023-10-10 -------------------- diff --git a/python/tests/test_cli.py b/python/tests/test_cli.py index 4a000caca1..67b3890c2e 100644 --- a/python/tests/test_cli.py +++ b/python/tests/test_cli.py @@ -1,6 +1,6 @@ # MIT License # -# Copyright (c) 2018-2022 Tskit Developers +# Copyright (c) 2018-2024 Tskit Developers # Copyright (c) 2017 University of Oxford # # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -295,6 +295,22 @@ def test_vcf_contig_id(self, flags, expected): assert args.tree_sequence == tree_sequence assert args.contig_id == expected + @pytest.mark.parametrize( + "flags,expected", + ( + [[], False], + [["-0"], True], + [["--allow-position-zero"], True], + ), + ) + def test_vcf_allow_position_zero(self, flags, expected): + parser = cli.get_tskit_parser() + cmd = "vcf" + tree_sequence = "test.trees" + args = parser.parse_args([cmd, tree_sequence, *flags]) + assert args.tree_sequence == tree_sequence + assert args.allow_position_zero == expected + def test_upgrade_default_values(self): parser = cli.get_tskit_parser() cmd = "upgrade" @@ -372,7 +388,7 @@ def test_trees_long_args(self): class TestTskitConversionOutput(unittest.TestCase): """ - Tests the output of msp to ensure it's correct. + Tests the output of tskit to ensure it's correct. """ @classmethod @@ -544,14 +560,16 @@ def test_fasta(self): def verify_vcf(self, output_vcf): with tempfile.TemporaryFile("w+") as f: - self._tree_sequence.write_vcf(f) + self._tree_sequence.write_vcf(f, allow_position_zero=True) f.seek(0) vcf = f.read() assert output_vcf == vcf def test_vcf(self): cmd = "vcf" - stdout, stderr = capture_output(cli.tskit_main, [cmd, self._tree_sequence_file]) + stdout, stderr = capture_output( + cli.tskit_main, [cmd, "-0", self._tree_sequence_file] + ) assert len(stderr) == 0 self.verify_vcf(stdout) @@ -582,6 +600,22 @@ def test_trees_draw(self): assert len(stdout.splitlines()) > 3 * ts.num_trees +class TestVCFZeroPosition: + """ + Tests that we can write VCF files with position 0. + """ + + def test_zero_position(self, tmp_path): + ts = msprime.simulate(10, mutation_rate=1, random_seed=1) + ts.dump(tmp_path / "test.trees") + with pytest.raises(ValueError): + capture_output(cli.tskit_main, ["vcf", str(tmp_path / "test.trees")]) + stdout, stderr = capture_output( + cli.tskit_main, ["vcf", "-0", str(tmp_path / "test.trees")] + ) + assert len(stderr) == 0 + + class TestBadFile: """ Tests that we deal with IO errors appropriately. diff --git a/python/tests/test_vcf.py b/python/tests/test_vcf.py index 147173cc34..a1ce300702 100644 --- a/python/tests/test_vcf.py +++ b/python/tests/test_vcf.py @@ -1,6 +1,6 @@ # MIT License # -# Copyright (c) 2018-2022 Tskit Developers +# Copyright (c) 2018-2024 Tskit Developers # Copyright (c) 2016 University of Oxford # # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -57,7 +57,7 @@ def ts_to_pysam(ts, *args, **kwargs): with tempfile.TemporaryDirectory() as temp_dir: vcf_path = os.path.join(temp_dir, "file.vcf") with open(vcf_path, "w") as f: - ts.write_vcf(f, *args, **kwargs) + ts.write_vcf(f, *args, **kwargs, allow_position_zero=True) yield pysam.VariantFile(vcf_path) @@ -349,7 +349,9 @@ def test_individuals_no_nodes_default_args(self): tables = ts1.dump_tables() tables.individuals.add_row() ts2 = tables.tree_sequence() - assert ts1.as_vcf() == ts2.as_vcf() + assert ts1.as_vcf(allow_position_zero=True) == ts2.as_vcf( + allow_position_zero=True + ) def test_individuals_no_nodes_as_argument(self): ts1 = msprime.simulate(10, mutation_rate=0.1, random_seed=2) @@ -383,7 +385,7 @@ def test_duplicate_individuals(self): ts = msprime.sim_ancestry(3, random_seed=2) ts = tsutil.insert_branch_sites(ts) with pytest.raises(tskit.LibraryError, match="TSK_ERR_DUPLICATE_SAMPLE"): - ts.as_vcf(individuals=[0, 0]) + ts.as_vcf(individuals=[0, 0], allow_position_zero=True) def test_mixed_sample_non_sample_individuals(self): ts = msprime.sim_ancestry(3, random_seed=2) @@ -400,7 +402,7 @@ def test_mixed_sample_non_sample_individuals(self): ): ts.as_vcf() # but it's OK if we run without the affected individual - assert len(ts.as_vcf(individuals=[1, 2])) > 0 + assert len(ts.as_vcf(individuals=[1, 2], allow_position_zero=True)) > 0 def test_samples_with_and_without_individuals(self): ts = tskit.Tree.generate_balanced(3).tree_sequence @@ -417,7 +419,7 @@ def test_samples_with_and_without_individuals(self): ): ts.as_vcf() # But it's OK if explicitly specify that sample - assert len(ts.as_vcf(individuals=[0])) > 0 + assert len(ts.as_vcf(individuals=[0], allow_position_zero=True)) > 0 def test_bad_individuals(self): ts = msprime.simulate(10, mutation_rate=0.1, random_seed=2) @@ -429,7 +431,9 @@ def test_bad_individuals(self): def test_ploidy_positional(self): ts = msprime.simulate(2, mutation_rate=2, random_seed=1) - assert ts.as_vcf(2) == ts.as_vcf(ploidy=2) + assert ts.as_vcf(2, allow_position_zero=True) == ts.as_vcf( + ploidy=2, allow_position_zero=True + ) def test_only_ploidy_positional(self): ts = msprime.simulate(2, mutation_rate=2, random_seed=1) @@ -450,14 +454,14 @@ def test_many_alleles(self): for j in range(8): tables.mutations.add_row(0, node=j, derived_state=str(j + 1)) ts = tables.tree_sequence() - ts.write_vcf(io.StringIO()) + ts.write_vcf(io.StringIO(), allow_position_zero=True) for j in range(9, 15): tables.mutations.add_row(0, node=j, derived_state=str(j)) ts = tables.tree_sequence() with pytest.raises( ValueError, match="More than 9 alleles not currently supported" ): - ts.write_vcf(io.StringIO()) + ts.write_vcf(io.StringIO(), allow_position_zero=True) class TestPositionTransformErrors: @@ -483,6 +487,35 @@ def test_bad_func(self): ts.write_vcf(io.StringIO(), position_transform=bad_func) +class TestZeroPositionErrors: + """ + Tests for handling zero position sites + """ + + def test_zero_position_error(self): + ts = msprime.sim_ancestry(3, random_seed=2, sequence_length=10) + ts = msprime.sim_mutations(ts, rate=1, random_seed=2) + assert ts.sites_position[0] == 0 + + with pytest.raises(ValueError, match="A variant position of 0"): + ts.write_vcf(io.StringIO()) + + # Should succeed if we allow it, or the site is masked or transformed + ts.write_vcf(io.StringIO(), allow_position_zero=True) + ts.write_vcf(io.StringIO(), position_transform=lambda pos: [x + 1 for x in pos]) + mask = np.zeros(ts.num_sites, dtype=bool) + mask[0] = True + ts.write_vcf(io.StringIO(), site_mask=mask) + + def test_no_position_zero_ok(self): + ts = msprime.sim_ancestry(3, random_seed=2, sequence_length=10) + ts = msprime.sim_mutations(ts, rate=0.25, random_seed=4) + assert ts.num_sites > 0 + assert ts.sites_position[0] != 0 + ts.write_vcf(io.StringIO(), allow_position_zero=True) + ts.write_vcf(io.StringIO()) + + class TestIndividualNames: """ Tests for the individual names argument. @@ -548,11 +581,15 @@ def test_bad_type(self): with pytest.raises( TypeError, match="sequence item 0: expected str instance," " NoneType found" ): - ts.write_vcf(io.StringIO(), individual_names=[None, "b"]) + ts.write_vcf( + io.StringIO(), individual_names=[None, "b"], allow_position_zero=True + ) with pytest.raises( TypeError, match="sequence item 0: expected str instance," " bytes found" ): - ts.write_vcf(io.StringIO(), individual_names=[b"a", "b"]) + ts.write_vcf( + io.StringIO(), individual_names=[b"a", "b"], allow_position_zero=True + ) def drop_header(s): @@ -581,7 +618,7 @@ def test_site_mask_bad_type(self, mask): def test_sample_mask_bad_type(self, mask): # converting to a bool array is pretty lax in what's allows. with pytest.raises(ValueError, match="Sample mask must be"): - self.ts().as_vcf(sample_mask=mask) + self.ts().as_vcf(sample_mask=mask, allow_position_zero=True) def test_no_masks(self): s = """\ @@ -591,7 +628,7 @@ def test_no_masks(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1""" expected = textwrap.dedent(s) - assert drop_header(self.ts().as_vcf()) == expected + assert drop_header(self.ts().as_vcf(allow_position_zero=True)) == expected def test_no_masks_triploid(self): s = """\ @@ -601,7 +638,10 @@ def test_no_masks_triploid(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0|1|0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0|0|1""" expected = textwrap.dedent(s) - assert drop_header(self.ts().as_vcf(ploidy=3)) == expected + assert ( + drop_header(self.ts().as_vcf(ploidy=3, allow_position_zero=True)) + == expected + ) def test_site_0_masked(self): s = """\ @@ -610,7 +650,9 @@ def test_site_0_masked(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1""" expected = textwrap.dedent(s) - actual = self.ts().as_vcf(site_mask=[True, False, False, False]) + actual = self.ts().as_vcf( + site_mask=[True, False, False, False], allow_position_zero=True + ) assert drop_header(actual) == expected def test_site_0_masked_triploid(self): @@ -620,7 +662,9 @@ def test_site_0_masked_triploid(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0|1|0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0|0|1""" expected = textwrap.dedent(s) - actual = self.ts().as_vcf(ploidy=3, site_mask=[True, False, False, False]) + actual = self.ts().as_vcf( + ploidy=3, site_mask=[True, False, False, False], allow_position_zero=True + ) assert drop_header(actual) == expected def test_site_1_masked(self): @@ -630,14 +674,18 @@ def test_site_1_masked(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1""" expected = textwrap.dedent(s) - actual = self.ts().as_vcf(site_mask=[False, True, False, False]) + actual = self.ts().as_vcf( + site_mask=[False, True, False, False], allow_position_zero=True + ) assert drop_header(actual) == expected def test_all_sites_masked(self): s = """\ #CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\ttsk_0\ttsk_1\ttsk_2""" expected = textwrap.dedent(s) - actual = self.ts().as_vcf(site_mask=[True, True, True, True]) + actual = self.ts().as_vcf( + site_mask=[True, True, True, True], allow_position_zero=True + ) assert drop_header(actual) == expected def test_all_sites_not_masked(self): @@ -648,7 +696,9 @@ def test_all_sites_not_masked(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1""" expected = textwrap.dedent(s) - actual = self.ts().as_vcf(site_mask=[False, False, False, False]) + actual = self.ts().as_vcf( + site_mask=[False, False, False, False], allow_position_zero=True + ) assert drop_header(actual) == expected @pytest.mark.parametrize( @@ -663,7 +713,7 @@ def test_all_samples_not_masked(self, mask): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1""" expected = textwrap.dedent(s) - actual = self.ts().as_vcf(sample_mask=mask) + actual = self.ts().as_vcf(sample_mask=mask, allow_position_zero=True) assert drop_header(actual) == expected @pytest.mark.parametrize( @@ -677,7 +727,7 @@ def test_sample_0_masked(self, mask): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t.\t1\t0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t.\t0\t1""" expected = textwrap.dedent(s) - actual = self.ts().as_vcf(sample_mask=mask) + actual = self.ts().as_vcf(sample_mask=mask, allow_position_zero=True) assert drop_header(actual) == expected @pytest.mark.parametrize( @@ -691,7 +741,7 @@ def test_sample_1_masked(self, mask): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t.\t0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t.\t1""" expected = textwrap.dedent(s) - actual = self.ts().as_vcf(sample_mask=mask) + actual = self.ts().as_vcf(sample_mask=mask, allow_position_zero=True) assert drop_header(actual) == expected @pytest.mark.parametrize( @@ -705,7 +755,7 @@ def test_all_samples_masked(self, mask): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t.\t.\t. 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t.\t.\t.""" expected = textwrap.dedent(s) - actual = self.ts().as_vcf(sample_mask=mask) + actual = self.ts().as_vcf(sample_mask=mask, allow_position_zero=True) assert drop_header(actual) == expected def test_all_functional_sample_mask(self): @@ -722,7 +772,7 @@ def mask(variant): return a expected = textwrap.dedent(s) - actual = self.ts().as_vcf(sample_mask=mask) + actual = self.ts().as_vcf(sample_mask=mask, allow_position_zero=True) assert drop_header(actual) == expected @pytest.mark.skipif(not _pysam_imported, reason="pysam not available") @@ -758,7 +808,7 @@ def test_defaults(self): 1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t1\t0\t. 1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0\t1\t.""" expected = textwrap.dedent(s) - assert drop_header(self.ts().as_vcf()) == expected + assert drop_header(self.ts().as_vcf(allow_position_zero=True)) == expected def test_isolated_as_missing_true(self): s = """\ @@ -766,7 +816,12 @@ def test_isolated_as_missing_true(self): 1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t1\t0\t. 1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0\t1\t.""" expected = textwrap.dedent(s) - assert drop_header(self.ts().as_vcf(isolated_as_missing=True)) == expected + assert ( + drop_header( + self.ts().as_vcf(isolated_as_missing=True, allow_position_zero=True) + ) + == expected + ) def test_isolated_as_missing_false(self): s = """\ @@ -774,7 +829,12 @@ def test_isolated_as_missing_false(self): 1\t0\t0\t0\t1\t.\tPASS\t.\tGT\t1\t0\t0 1\t2\t1\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0""" expected = textwrap.dedent(s) - assert drop_header(self.ts().as_vcf(isolated_as_missing=False)) == expected + assert ( + drop_header( + self.ts().as_vcf(isolated_as_missing=False, allow_position_zero=True) + ) + == expected + ) @pytest.mark.skipif(not _pysam_imported, reason="pysam not available") def test_ok_with_pysam(self): @@ -823,7 +883,7 @@ def test_no_individuals_defaults(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0\t1\t0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0\t1""" expected = textwrap.dedent(s) - assert drop_header(ts.as_vcf()) == expected + assert drop_header(ts.as_vcf(allow_position_zero=True)) == expected def test_no_individuals_ploidy_3(self): ts = drop_individuals(self.ts()) @@ -834,7 +894,7 @@ def test_no_individuals_ploidy_3(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0|1|0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0|0|1""" expected = textwrap.dedent(s) - assert drop_header(ts.as_vcf(ploidy=3)) == expected + assert drop_header(ts.as_vcf(ploidy=3, allow_position_zero=True)) == expected def test_no_individuals_ploidy_3_names(self): ts = drop_individuals(self.ts()) @@ -845,7 +905,12 @@ def test_no_individuals_ploidy_3_names(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0|1|0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0|0|1""" expected = textwrap.dedent(s) - assert drop_header(ts.as_vcf(ploidy=3, individual_names="A")) == expected + assert ( + drop_header( + ts.as_vcf(ploidy=3, individual_names="A", allow_position_zero=True) + ) + == expected + ) def test_defaults(self): ts = self.ts() @@ -856,7 +921,7 @@ def test_defaults(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0|0\t1 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0|1\t0""" expected = textwrap.dedent(s) - assert drop_header(ts.as_vcf()) == expected + assert drop_header(ts.as_vcf(allow_position_zero=True)) == expected def test_individual_0(self): ts = self.ts() @@ -867,7 +932,10 @@ def test_individual_0(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t0|0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0|1""" expected = textwrap.dedent(s) - assert drop_header(ts.as_vcf(individuals=[0])) == expected + assert ( + drop_header(ts.as_vcf(individuals=[0], allow_position_zero=True)) + == expected + ) def test_individual_1(self): ts = self.ts() @@ -878,7 +946,10 @@ def test_individual_1(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t1 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0""" expected = textwrap.dedent(s) - assert drop_header(ts.as_vcf(individuals=[1])) == expected + assert ( + drop_header(ts.as_vcf(individuals=[1], allow_position_zero=True)) + == expected + ) def test_reversed(self): ts = self.ts() @@ -889,7 +960,10 @@ def test_reversed(self): 1\t4\t2\t0\t1\t.\tPASS\t.\tGT\t1\t0|0 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0|1""" expected = textwrap.dedent(s) - assert drop_header(ts.as_vcf(individuals=[1, 0])) == expected + assert ( + drop_header(ts.as_vcf(individuals=[1, 0], allow_position_zero=True)) + == expected + ) def test_reversed_names(self): ts = self.ts() @@ -901,6 +975,12 @@ def test_reversed_names(self): 1\t6\t3\t0\t1\t.\tPASS\t.\tGT\t0\t0|1""" expected = textwrap.dedent(s) assert ( - drop_header(ts.as_vcf(individuals=[1, 0], individual_names=["A", "B"])) + drop_header( + ts.as_vcf( + individuals=[1, 0], + individual_names=["A", "B"], + allow_position_zero=True, + ), + ) == expected ) diff --git a/python/tskit/cli.py b/python/tskit/cli.py index b8a660070b..b20bd260d9 100644 --- a/python/tskit/cli.py +++ b/python/tskit/cli.py @@ -1,7 +1,7 @@ # # MIT License # -# Copyright (c) 2018-2022 Tskit Developers +# Copyright (c) 2018-2024 Tskit Developers # Copyright (c) 2015-2018 University of Oxford # # Permission is hereby granted, free of charge, to any person obtaining a copy @@ -137,7 +137,12 @@ def run_fasta(args): def run_vcf(args): tree_sequence = load_tree_sequence(args.tree_sequence) - tree_sequence.write_vcf(sys.stdout, ploidy=args.ploidy, contig_id=args.contig_id) + tree_sequence.write_vcf( + sys.stdout, + ploidy=args.ploidy, + contig_id=args.contig_id, + allow_position_zero=args.allow_position_zero, + ) def add_tree_sequence_argument(parser): @@ -223,6 +228,13 @@ def get_tskit_parser(): parser.add_argument( "--contig-id", "-c", type=str, default="1", help="Specify the contig id" ) + parser.add_argument( + "--allow-position-zero", + "-0", + action="store_true", + default=False, + help="Allow position 0 sites", + ) parser.set_defaults(runner=run_vcf) parser = subparsers.add_parser( diff --git a/python/tskit/trees.py b/python/tskit/trees.py index d8829d8ab2..b452ebf844 100644 --- a/python/tskit/trees.py +++ b/python/tskit/trees.py @@ -6176,6 +6176,7 @@ def write_vcf( site_mask=None, sample_mask=None, isolated_as_missing=None, + allow_position_zero=None, ): """ Convert the genetic variation data in this tree sequence to Variant @@ -6302,7 +6303,13 @@ def write_vcf( missing samples (i.e., isolated samples without mutations) is "." If False, missing samples will be assigned the ancestral allele. See :meth:`.variants` for more information. Default: True. + :param bool allow_position_zero: If True allow sites with position zero to be + output to the VCF, otherwise if one is present an error will be raised. + The VCF spec does not allow for sites at position 0. However, in practise + many tools will be fine with this. Default: False. """ + if allow_position_zero is None: + allow_position_zero = False writer = vcf.VcfWriter( self, ploidy=ploidy, @@ -6313,6 +6320,7 @@ def write_vcf( site_mask=site_mask, sample_mask=sample_mask, isolated_as_missing=isolated_as_missing, + allow_position_zero=allow_position_zero, ) writer.write(output) diff --git a/python/tskit/vcf.py b/python/tskit/vcf.py index 2ee74b761c..dcbea01cb6 100644 --- a/python/tskit/vcf.py +++ b/python/tskit/vcf.py @@ -1,7 +1,7 @@ # # MIT License # -# Copyright (c) 2019-2022 Tskit Developers +# Copyright (c) 2019-2024 Tskit Developers # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -63,6 +63,7 @@ def __init__( site_mask, sample_mask, isolated_as_missing, + allow_position_zero, ): self.tree_sequence = tree_sequence self.contig_id = contig_id @@ -111,6 +112,20 @@ def __init__( sample_mask = np.array(sample_mask, dtype=bool) self.sample_mask = lambda _: sample_mask + # The VCF spec does not allow for positions to be 0, so we error if one of the + # transformed positions is 0 and allow_position_zero is False. + if not allow_position_zero and np.any( + self.transformed_positions[~site_mask] == 0 + ): + raise ValueError( + "A variant position of 0 was found in the VCF output, this is not " + "fully compliant with the VCF spec. If you still wish to write the VCF " + 'please use the "allow_position_zero" argument to write_vcf. ' + "Alternatively, you can increment all the positions by one using " + '"position_transform = lambda x: 1 + x" or coerce the zero to one with ' + '"position_transform = lambda x: np.fmax(1, x)"' + ) + def __make_sample_mapping(self, ploidy, individuals): """ Compute the sample IDs for each VCF individual and the template for