-
Notifications
You must be signed in to change notification settings - Fork 84
/
demography.py
4431 lines (3975 loc) · 182 KB
/
demography.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Copyright (C) 2015-2021 University of Oxford
#
# This file is part of msprime.
#
# msprime is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# msprime is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with msprime. If not, see <http://www.gnu.org/licenses/>.
#
"""
Module responsible for defining and debugging demographic models.
"""
from __future__ import annotations
import collections
import copy
import dataclasses
import enum
import inspect
import itertools
import json
import logging
import math
import numbers
import operator
import re
import sys
import textwrap
import warnings
from typing import Any
from typing import ClassVar
from typing import MutableMapping
import demes
import numpy as np
import tskit
from . import ancestry
from . import core
from . import species_trees
logger = logging.getLogger(__name__)
class IncompletePopulationMetadataWarning(UserWarning):
"""
Warning raised when we don't have sufficient information to fill
out population metadata.
"""
class LruCache(collections.OrderedDict):
# LRU example from the OrderedDict documentation
def __init__(self, maxsize=128, *args, **kwds):
self.maxsize = maxsize
super().__init__(*args, **kwds)
def __getitem__(self, key):
value = super().__getitem__(key)
self.move_to_end(key)
return value
def __setitem__(self, key, value):
super().__setitem__(key, value)
if len(self) > self.maxsize:
oldest = next(iter(self))
del self[oldest]
_population_table_cache = LruCache(16)
def _build_population_table(populations):
"""
Return a tskit PopulationTable instance encoding the metadata for the
specified populations. Because encoding metadata is quite expensive
we maintain an LRU cache.
"""
population_metadata = []
for population in populations:
metadata = {
"name": population.name,
"description": population.description,
}
if population.extra_metadata is not None:
intersection = set(population.extra_metadata.keys()) & set(metadata.keys())
if len(intersection) > 0:
printed_list = list(sorted(intersection))
raise ValueError(
f"Cannot set standard metadata key(s) {printed_list} "
"using extra_metadata. Please set using the corresponding "
"property of the Population class."
)
metadata.update(population.extra_metadata)
population_metadata.append(metadata)
# The only thing we store in the Population table is the metadata, so
# we cache based on this.
key = json.dumps(population_metadata, sort_keys=True)
if key not in _population_table_cache:
table = tskit.PopulationTable()
table.metadata_schema = tskit.MetadataSchema(
{
"codec": "json",
"type": "object",
"properties": {
"name": {"type": "string"},
"description": {"type": ["string", "null"]},
},
# The name and description fields are always filled out by
# msprime, so we tell downstream tools this by making them
# "required" by the schema.
"required": ["name", "description"],
"additionalProperties": True,
}
)
for metadata in population_metadata:
table.add_row(metadata=metadata)
_population_table_cache[key] = table
return _population_table_cache[key]
def check_num_populations(num_populations):
"""
Check if an input number of populations is valid.
"""
if num_populations < 1:
raise ValueError("Must have at least one population")
def check_migration_rate(migration_rate):
"""
Check if an input migration rate makes sense.
"""
if migration_rate < 0:
raise ValueError("Migration rates must be non-negative")
@dataclasses.dataclass
class Population:
"""
A single population in a :class:`.Demography`. See the
:ref:`sec_demography_populations` section for more information on
what populations represent, and how they can be used.
.. warning:: This class should not be instantiated directly. Please use
:meth:`.Demography.add_population` method instead.
"""
initial_size: float = 0.0
"""
The absolute size of the population at time zero.
See the :ref:`sec_demography_populations_initial_size` section
for more details and examples.
"""
growth_rate: float = 0.0
"""
The exponential growth rate of the population per generation (forwards in time).
Growth rates can be negative. This is zero for a constant population size,
and positive for a population that has been growing.
See the :ref:`sec_demography_populations_growth_rate` section for more
details and examples.
"""
name: str | None = None
"""
The name of the population. If specified this must be a uniquely
identifying string and must be a valid Python identifier (i.e., could be
used as a variable name in Python code).
See :ref:`sec_demography_populations_identifiers` for more details
and recommendations on best practise.
"""
description: str = ""
"""
A concise description of the population. Defaults to the empty string if not
specified.
"""
extra_metadata: dict = dataclasses.field(default_factory=dict)
"""
A JSON-encodable dictionary of metadata items to be stored in the
associated tskit population object. This dictionary must not contain keys
for any of the pre-defined metadata items.
See the :ref:`sec_demography_populations_metadata` section for more
details and examples.
"""
default_sampling_time: float | None = None
"""
The default time at which samples are drawn from this population. See the
:ref:`sec_demography_populations_default_sampling_time` section for more
details.
"""
initially_active: bool | None = None
"""
If True, this population will always be initially active, regardless
of whether it participates in a :ref:`sec_demography_events_population_split`.
If not set, or None, the initial state of the population will be
set automatically depending on the events declared in the demography.
See the :ref:`sec_demography_populations_life_cycle` section for
more details.
"""
id: int | None = dataclasses.field(default=None) # noqa: A003
"""
The integer ID of this population within the parent :class:`.Demography`.
This attribute is assigned by the Demography class and should not be set
or changed by user code.
"""
def asdict(self):
return dataclasses.asdict(self)
def validate(self):
if self.initial_size < 0:
raise ValueError("Negative population size")
if self.name is None:
raise ValueError("A population name must be set.")
if not self.name.isidentifier():
raise ValueError("A population name must be a valid Python identifier")
@dataclasses.dataclass
class Demography(collections.abc.Mapping):
"""
The definition of a demographic model for an msprime simulation,
consisting of a set of populations, a migration matrix, and a list
of demographic events. See the :ref:`sec_demography` section for
detailed documentation on how to define, debug and simulate with
demography in msprime.
Please see the :ref:`sec_demography_demography_objects` section
for details of how to access and update population information
within a model.
Demography objects implement the Python
:class:`python:collections.abc.Mapping` protocol, in which the
keys are **either** the population ``name`` or
integer ``id`` values (see the :ref:`sec_demography_populations_identifiers`
section for more information) and the values are :class:`.Population`
objects.
In general, population references in methods such as
:meth:`.Demography.add_population_split` can either be string names
or integer IDs, and the two forms can be used interchangeably.
"""
populations: list[Population] = dataclasses.field(default_factory=list)
events: list = dataclasses.field(default_factory=list)
# Until we can use numpy type hints properly, it's not worth adding them
# here. We still have to add in ignores below for indexed assignment errors.
migration_matrix: Any | None = None
def __post_init__(self):
if self.migration_matrix is None:
N = self.num_populations
self.migration_matrix = np.zeros((N, N))
# People might get cryptic errors from passing in copies of the same
# population, so check for it.
if len({id(pop) for pop in self.populations}) != len(self.populations):
raise ValueError("Population objects must be distinct")
# Assign the IDs and default names, if needed.
for j, population in enumerate(self.populations):
if population.id is not None:
raise ValueError(
"Population ID should not be set before using to create "
"a Demography"
)
population.id = j
if population.name is None:
population.name = f"pop_{j}"
self._validate_populations()
def add_population(
self,
*,
initial_size: float,
growth_rate: float | None = None,
name: str | None = None,
description: str | None = None,
extra_metadata: dict | None = None,
default_sampling_time: float | None = None,
initially_active: bool | None = None,
) -> Population:
"""
Adds a new :class:`.Population` to this :class:`.Demography` with the
specified parameters. The new population will have ID equal to the
the number of populations immediately before ``add_population``
is called, such that the first population added has ID 0, the next
ID 1 and so on. If the ``name`` is not specified, this defaults
to ``"pop_{id}"``. An :ref:`sec_demography_populations_initial_size`
value must be specified (but may be zero).
:param float initial_size: The number of individuals of the population
at time zero. See the :ref:`sec_demography_populations_initial_size`
section for more details and examples.
:param float growth_rate: The exponential growth rate of the
population. See the :ref:`sec_demography_populations_growth_rate`
section for more details and examples.
:param str name: The human-readable identifier for this population.
If not specified, defaults to the string ``"pop_{id}"`` where
``id`` is the population's integer ID. See
:ref:`sec_demography_populations_identifiers` for more details
and recommendations on best practise.
:param str description: A concise but informative description of
what this population represents within the wider model. Defaults
to the empty strings.
:param dict extra_metadata: Extra metadata to associate with
this population that will be stored tree sequences output
by :func:`.sim_ancestry`. See the
:ref:`sec_demography_populations_metadata` section for more
details and examples.
:param float default_sampling_time: The time at which samples
will be taken from this population, if a time in not otherwise
specified. By default this is determined by the details
of the model, and whether populations are ancestral in
:ref:`sec_demography_events_population_split` events. See the
:ref:`sec_demography_populations_default_sampling_time` section
for more details.
:param bool initially_active: Whether this population is initially
:ref:`active<sec_demography_populations_life_cycle>`.
By default this is determined by the details
of the model, and whether populations are ancestral in
:ref:`sec_demography_events_population_split` events. See the
:ref:`sec_demography_populations_life_cycle` section
for more details.
:returns: The new :class:`.Population` instance.
:rtype: Population
"""
N = self.num_populations
population = Population(
id=N,
initial_size=initial_size,
growth_rate=0 if growth_rate is None else growth_rate,
name=f"pop_{N}" if name is None else name,
description="" if description is None else description,
extra_metadata={} if extra_metadata is None else extra_metadata,
default_sampling_time=default_sampling_time,
initially_active=initially_active,
)
self.populations.append(population)
# TODO this is inefficient - we should probably store the migration
# matrix in a sparse dictionary form internally.
M = self.migration_matrix
self.migration_matrix = np.zeros((N + 1, N + 1))
self.migration_matrix[:N, :N] = M
self._validate_populations()
return population
def _add_population_from_old_style(
self, pop_config: PopulationConfiguration, name: str | None = None
) -> Population:
population = self.add_population(
name=name,
initial_size=pop_config.initial_size,
growth_rate=pop_config.growth_rate,
)
metadata = pop_config.metadata
if metadata is not None and isinstance(metadata, collections.abc.Mapping):
metadata = metadata.copy()
if "name" in metadata:
population.name = metadata.pop("name")
if name is not None and name != population.name:
# Maybe this should be a warning, or just ignored entirely?
raise ValueError(
"Population name already set in old-style metadata "
f"({name}) and doesn't match supplied name "
f"({population.name})"
)
if "description" in metadata:
population.description = metadata.pop("description")
population.extra_metadata = metadata
return population
def add_event(self, event: DemographicEvent) -> DemographicEvent:
if not isinstance(event, DemographicEvent):
raise TypeError("Events must be instances of DemographicEvent")
event.demography = self
self.events.append(event)
return event
def set_migration_rate(
self, source: str | int, dest: str | int, rate: float
) -> None:
"""
Sets the backwards-time rate of migration from the specified ``source``
population to ``dest`` to the specified value. This has the effect of
setting ``demography.migration_matrix[source, dest] = rate``. It is
the rate at which a lineage currently in ``source`` moves to ``dest``
as one follows the lineage back through time.
.. important:: Note this is the
:ref:`backwards in time<sec_demography_direction_of_time>`;
migration rate and that
``source`` and ``dest`` are from the perspective of lineages in the
coalescent process. See :ref:`sec_demography_migration` for more
details and clarification on this vital point.
The ``source`` and ``dest`` populations can be referred to either by
their integer ``id`` or string ``name`` values.
:param str,int source: The source population from which lineages originate
in the backwards-time process.
:param str,int dest: The destination population where lineages are move
to in the backwards-time process.
:param float rate: The per-generation migration rate.
"""
source = self[source].id
dest = self[dest].id
if source == dest:
raise ValueError("The source and dest populations must be different")
self.migration_matrix[source, dest] = rate # type: ignore
def set_symmetric_migration_rate(
self,
populations: list[str | int],
rate: float,
) -> None:
"""
Sets the symmetric migration rate between all pairs of populations in
the specified list to the specified value. For a given pair of population
IDs ``j`` and ``k``, this sets ``demography.migration_matrix[j, k] = rate``
and ``demography.migration_matrix[k, j] = rate``.
Populations may be specified either by their integer IDs or by
their string names.
:param list populations: An iterable of population identifiers (integer
IDs or string names).
:param float rate: The value to set the migration matrix entries to.
"""
# There's an argument for not checking this so that corner cases on
# single population models can be handled. However, it's nearly always
# going to be a user error where someone forgets the second population
# so it seems better to raise an error to prevent hard-to-detect mistakes.
if len(populations) < 2:
raise ValueError("Must specify at least two populations")
pop_ids = [self[identifier].id for identifier in populations]
for pop_j, pop_k in itertools.combinations(pop_ids, 2):
self.migration_matrix[pop_j, pop_k] = rate # type: ignore
self.migration_matrix[pop_k, pop_j] = rate # type: ignore
# Demographic events.
def _check_population_references(self, populations: list[str | int]):
for pop_ref in populations:
# Slightly unsure whether this call can get optimised out, but
# there doesn't seem to be a better way to do it without duplicating
# the KeyError message
self[pop_ref]
def _add_activate_population_event(
self, time: float, *, population: str | int
) -> ActivatePopulationEvent:
"""
Activates a population at the specified time. The population is expected to be
initially inactive.
:param float time: The time at which this event occurs in generations.
:param str, int population: The population to activate.
"""
self._check_population_references([population])
return self.add_event(ActivatePopulationEvent(time=time, population=population))
def add_population_split(
self, time: float, *, derived: list[str | int], ancestral: str | int
) -> PopulationSplit:
"""
Adds a population split event at the specified time. In a population
split event all lineages from the (more recent) derived populations
move to the (more ancient) ancestral population. Forwards in time,
this corresponds to the ancestral population splitting into the
derived populations.
See the :ref:`sec_demography_events_population_split` section
for more details and examples.
In addition to moving lineages from the derived population(s) into the
ancestral population, a population split has the following additional
effects:
- All derived populations are set to
:ref:`inactive<sec_demography_populations_life_cycle>`.
- All migration rates to and from the derived populations are set to 0.
- Population sizes and growth rates for the derived populations are set
to 0.
- The ``default_sampling_time`` of the ``ancestral`` :class:`.Population`
is set to the time of this event, **if** the ``default_sampling_time``
for the ancestral population has not already been set.
:param float time: The time at which this event occurs in generations.
:param list(str, int) derived: The derived populations.
:param str, int ancestral: The ancestral population.
"""
self._check_population_references(list(derived) + [ancestral])
pop = self[ancestral]
if pop.initially_active is None:
pop.initially_active = False
if pop.default_sampling_time is None:
pop.default_sampling_time = time
return self.add_event(
PopulationSplit(time=time, derived=derived, ancestral=ancestral)
)
def add_admixture(
self,
time: float,
*,
derived: str | int,
ancestral: list[str | int],
proportions: list[float],
) -> Admixture:
"""
Adds an admixture event at the specified time. In an admixture
event all lineages from a (more recent) ``derived`` population
move to a list of (more ancient) ``ancestral`` populations according
to a list of ``proportions``, such that a given lineage has a
probability ``proportions[j]`` of being moved to the population
``ancestral[j]``. This movement of lineages backwards in time
corresponds to the initial state of the admixed derived population
the specified ``time`` being composed of individuals from the
specified ``ancestral`` populations in the specified ``proportions``.
See the :ref:`sec_demography_events_admixture` section
for more details and examples.
In addition to moving lineages from the derived population into the
ancestral population(s), an admixture has the following additional
effects:
- The derived population is set to
:ref:`inactive<sec_demography_populations_life_cycle>`.
- The ancestral populations are set to
:ref:`active<sec_demography_populations_life_cycle>`, if they are
not already active.
- All migration rates to and from the derived population are set to 0.
- Population sizes and growth rates for the derived population are set
to 0, and the population is marked as inactive.
:param float time: The time at which this event occurs in generations.
:param str, int derived: The derived population.
:param list(str, int) ancestral: The ancestral populations.
:param list(float) proportions: The proportion of the derived population
from each of the ancestral populations at the time of the event.
"""
self._check_population_references(list(ancestral) + [derived])
# Useful feature here might be to support taking n - 1 proportion values
# and computing 1 - sum for the last value. Could be tedious for users to
# do this manually.
if not math.isclose(sum(proportions), 1.0):
raise ValueError("Sum of the admixture proportions must be approximately 1")
return self.add_event(
Admixture(
time=time, derived=derived, ancestral=ancestral, proportions=proportions
)
)
def add_mass_migration(
self,
time: float,
*,
source: str | int,
dest: str | int,
proportion: float,
) -> MassMigration:
"""
Adds a mass migration (or "pulse migration") event at the specified
time. In a mass migration event, lineages in the ``source`` population
are moved to the ``dest`` population with probability ``proportion``.
Forwards-in-time, this corresponds to individuals migrating
**from** population ``dest`` **to** population ``source``.
Please see the :ref:`sec_demography_events_mass_migration` section
for more details and examples.
.. warning:: Mass migrations are an advanced feature and should
only be used if the required population dynamics cannot be
modelled by :ref:`sec_demography_events_population_split`
or :ref:`sec_demography_events_admixture` events.
.. important::
Note that ``source`` and ``dest`` are from the perspective of the
coalescent process, i.e.
:ref:`backwards in time<sec_demography_direction_of_time>`;
please see the
:ref:`sec_demography_migration` section for more details.
:param float time: The time at which this event occurs in generations.
:param str, int source: The population **from** which lineages are moved.
:param str, int dest: The population **to** which lineages are moved.
:param float proportion: For each lineage in the ``source`` population,
this is the probability that it moves to the ``dest`` population.
"""
self._check_population_references([source, dest])
return self.add_event(MassMigration(time, source, dest, proportion))
def add_migration_rate_change(
self,
time: float,
*,
rate: float,
source: int | str | None = None,
dest: int | str | None = None,
) -> MigrationRateChange:
"""
Changes the rate of migration from one deme to another to a new value at a
specific time. Migration rates are specified in terms of the rate at which
lineages move from population ``source`` to ``dest`` during the progress of
the simulation.
.. important::
Note that ``source`` and ``dest`` are from the perspective of the
coalescent process, i.e.
:ref:`backwards in time<sec_demography_direction_of_time>`;
please see the
:ref:`sec_demography_migration` section for more details.
By default, ``source=None`` and ``dest=None``, which results in all
non-diagonal elements of the migration matrix being changed to the new
rate. If ``source`` and ``dest`` are specified, they must refer to valid
populations (either integer IDs or string names).
:param float time: The time at which this event occurs in generations.
:param float rate: The new per-generation migration rate.
:param str, int source: The ID of the source population.
:param str, int dest: The ID of the destination population.
:param int source: The source population ID.
"""
if source is None:
source = -1
else:
self._check_population_references([source])
if dest is None:
dest = -1
else:
self._check_population_references([dest])
return self.add_event(
MigrationRateChange(time=time, source=source, dest=dest, rate=rate)
)
def add_symmetric_migration_rate_change(
self, time: float, populations: list[str | int], rate: float
) -> SymmetricMigrationRateChange:
"""
Sets the symmetric migration rate between all pairs of populations in
the specified list to the specified value. For a given pair of population
IDs ``j`` and ``k``, this sets ``migration_matrix[j, k] = rate``
and ``migration_matrix[k, j] = rate``.
Please see the :ref:`sec_demography_migration` section for more details.
Populations may be specified either by their integer IDs or by
their string names.
:param float time: The time at which this event occurs in generations.
:param list populations: An sequence of population identifiers (integer
IDs or string names).
:param float rate: The new migration rate.
"""
self._check_population_references(populations)
return self.add_event(
SymmetricMigrationRateChange(time=time, populations=populations, rate=rate)
)
def add_population_parameters_change(
self,
time: float,
*,
initial_size: float | None = None,
growth_rate: float | None = None,
population: int | None = None,
) -> PopulationParametersChange:
"""
Changes the size parameters of a population (or all populations)
at a given time.
Please see the :ref:`sec_demography_populations` section for more details.
:param float time: The length of time ago at which this event
occurred.
:param float initial_size: The number of individuals in the population
at the beginning of the time slice starting at ``time``. If None,
the initial_size of the population is computed according to
the initial population size and growth rate over the preceding
time slice.
:param float growth_rate: The new per-generation growth rate. If None,
the growth rate is not changed. Defaults to None.
:param str, int population: The ID of the population affected. If
``population`` is None, the changes affect all populations
simultaneously.
"""
if population is not None:
self._check_population_references([population])
event = PopulationParametersChange(
time,
initial_size=initial_size,
growth_rate=growth_rate,
population=population,
)
return self.add_event(event)
def add_simple_bottleneck(
self,
time: float,
population: int | str,
proportion: float | None = None,
) -> SimpleBottleneck:
"""
Adds a population bottleneck at the specified time in which each lineage
has probability equal to ``proportion`` of coalescing into a single
ancestor.
Please see the :ref:`sec_demography_events_simple_bottleneck` section
for more details.
:param float time: The length of time ago at which this event
occurred.
:param str, int population: The ID of the population affected.
:param float proportion: The probability of each lineage coalescing
into a single ancestor. Defaults to 1.0.
"""
proportion = 1.0 if proportion is None else proportion
self._check_population_references([population])
return self.add_event(
SimpleBottleneck(time=time, population=population, proportion=proportion)
)
def add_instantaneous_bottleneck(
self, time: float, *, population: str | int, strength: float
) -> InstantaneousBottleneck:
"""
Adds a bottleneck at the specified time in the specified population
that is equivalent to the coalescent process running for ``strength``
generations.
Please see the :ref:`sec_demography_events_instantaneous_bottleneck`
section for more details.
.. note:: The :ref:`ploidy<sec_ancestry_ploidy_coalescent_time_scales>`
is also use to scale the time scale of the coalescent process
during the bottleneck.
:param float time: The length of time ago at which this event
occurred.
:param str, int population: The ID of the population affected.
:param float strength: The equivalent amount of time in the standard
coalescent.
"""
self._check_population_references([population])
return self.add_event(
InstantaneousBottleneck(time=time, population=population, strength=strength)
)
def add_census(self, time: float) -> CensusEvent:
"""
Adds a "census" event at the specified time. In a census we add a node
to each branch of every tree, thus recording the population that each
lineage is in at the specified time.
This may be used to record all ancestral haplotypes present at that
time, and to extract other information related to these haplotypes: for
instance to trace the local ancestry of a sample back to a set of
contemporaneous ancestors, or to assess whether a subset of samples has
coalesced more recently than the census time. (However, these added
nodes will only represent the portions of ancestral genomes inherited
by the samples, rather than complete ancestral genomes.)
See :ref:`sec_ancestry_census_events` for more details.
.. warning:: When used in the conjunction with the DTWF model
non-integer census times should be used to guarantee that
the census nodes don't coincide with coalescences (and
therefore zero branch length errors).
See :ref:`sec_ancestry_census_events_dtwf` for more details.
:param float time: The time at which the census should occur.
"""
return self.add_event(CensusEvent(time))
def _populations_table(self):
cols = [
("id", ""),
("name", ""),
("description", ""),
("initial_size", ".1f"),
("growth_rate", ".2g"),
("default_sampling_time", ".2g"),
("extra_metadata", ""),
]
data = [
[f"{getattr(pop, attr):{fmt}}" for attr, fmt in cols]
for pop in self.populations
]
return [title for title, _ in cols], data
def _populations_text(self):
col_titles, data = self._populations_table()
alignments = ["^", "<", "<", "<", "^", ">", "<"]
data = [
[
[item.as_text() if isinstance(item, core.TableEntry) else item]
for item in row
]
for row in data
]
return core.text_table(
"Populations", [[title] for title in col_titles], alignments, data
)
def _populations_html(self):
col_titles, data = self._populations_table()
return core.html_table(
f"Populations ({len(self.populations)})", col_titles, data
)
def _migration_rate_info(self, source, dest, rate):
extra = None
if source != dest:
source_name = self.populations[source].name
dest_name = self.populations[dest].name
extra = (
"Backwards in time migration rate from population "
f"{source_name} to {dest_name} = {rate} per generation. "
"Forwards in time, this is the expected number of migrants "
f"moving from {dest_name} to {source_name} "
f"per generation, divided by the size of {source_name}."
)
return core.TableEntry(f"{rate:.4g}", extra)
def _migration_matrix_table(self):
col_titles = [""] + [pop.name for pop in self.populations]
data = []
for j in range(self.num_populations):
row = [self.populations[j].name] + [
self._migration_rate_info(j, k, self.migration_matrix[j, k])
for k in range(self.num_populations)
]
data.append(row)
return col_titles, data
def _migration_matrix_text(self):
col_titles, data = self._migration_matrix_table()
alignments = ">" + "^" * self.num_populations
data = [
[
[item.as_text() if isinstance(item, core.TableEntry) else item]
for item in row
]
for row in data
]
return core.text_table(
"Migration Matrix", [[title] for title in col_titles], alignments, data
)
def _migration_matrix_html(self):
if np.all(self.migration_matrix == 0):
return core.html_table("Migration matrix (all zero)", [], [])
else:
col_titles, data = self._migration_matrix_table()
return core.html_table("Migration matrix", col_titles, data)
def _events_text(self, events, title="Events"):
col_titles = [["time"], ["type"], ["parameters"], ["effect"]]
alignments = "><<<"
data = []
for event in events:
type_text = textwrap.wrap(event._type_str, 15)
description = textwrap.wrap(event._parameters(), 22)
effect = textwrap.wrap(event._effect(), 38)
row = [[f"{event.time:.4g}"], type_text, description, effect]
data.append(row)
return core.text_table(
title, col_titles, alignments, data, internal_hlines=True
)
def _events_html(self, events, title=None):
if title is None:
title = f"Events ({len(events)})"
if len(self.events) == 0:
return core.html_table(title, [], [])
col_titles = ["time", "type", "parameters", "effect"]
data = []
for event in events:
class_name = event.__class__.__name__
camel_case = re.sub(r"(?<!^)(?=[A-Z])", "_", class_name).lower()
add_method = f"msprime.Demography.add_{camel_case}"
# TODO change this to stable when 1.0 is released.
type_html = (
"<a href='https://tskit.dev/msprime/docs/latest/api.html#"
f"{add_method}'>{event._type_str}</a>"
)
row = [f"{event.time:.4g}", type_html, event._parameters(), event._effect()]
data.append(row)
return core.html_table(title, col_titles, data, no_escape=[1])
def _repr_html_(self):
resolved = self.validate()
return (
'<div style="margin-left:20px">'
+ resolved._populations_html()
+ resolved._migration_matrix_html()
+ resolved._events_html(self.events)
+ "</div>"
)
def __str__(self):
resolved = self.validate()
populations = resolved._populations_text()
migration_matrix = resolved._migration_matrix_text()
events = resolved._events_text(self.events)
def indent(table):
lines = table.splitlines()
s = "╟ " + lines[0] + "\n"
for line in lines[1:]:
s += "║ " + line + "\n"
return s
s = (
"Demography\n"
+ indent(populations)
+ indent(migration_matrix)
+ indent(events)
)
return s
def __len__(self):
return len(self.populations)
def __iter__(self):
for pop in self.populations:
yield pop.name
def __getitem__(self, identifier):
"""
Returns the population with the specified ID or name.
"""
if isinstance(identifier, str):
for population in self.populations:
if population.name == identifier:
return population
else:
raise KeyError(f"Population with name '{identifier}' not found")
elif isinstance(identifier, numbers.Integral):
# We don't support negative indexing here because -1 is used as
# way to refer to *all* populations in demographic events, and
# it would be too easy to introduce bugs in old code if we changed
# the meaning of this.
if identifier < 0 or identifier >= self.num_populations:
raise KeyError(f"Population id {identifier} out of bounds")
return self.populations[identifier]
raise KeyError(
"Keys must be either string population names or integer IDs:"
f"identifier '{identifier}' is of type {type(identifier)}"
)
@property
def num_populations(self):
return len(self.populations)
@property
def num_events(self):
return len(self.events)
def _validate_populations(self):
names = set()
for j, population in enumerate(self.populations):
if population.id != j:
raise ValueError(
"Incorrect population ID. ID values should not be updated "
"by users. Please use Demography.add_population to add extra "
"populations after initialisation."
)
population.validate()
if population.name in names:
raise ValueError(f"Duplicate population name: '{population.name}'")
names.add(population.name)
def validate(self):
"""
Checks the demography looks sensible and raises errors/warnings
appropriately, and return a copy in which all default values have
been appropriately resolved.