In [1]:
import numpy as np
import matplotlib.pyplot as plt
import random
import pandas as pd

In [2]:
class Node:
    def __init__(self, name=None, parent=None, birth_time=0.0):
        self.name = name
        self.parent = parent
        self.left = None
        self.right = None
        self.birth_time = birth_time
        self.branch_length = 0.0

    def is_leaf(self):
        return self.left is None and self.right is None


def yule_tree(n_leaves, birth_rate=1.0, seed=None):
    """
    Simulate an ultrametric phylogenetic tree under the Yule (pure birth) model.
    """

    if seed is not None:
        random.seed(seed)
        np.random.seed(seed)

    # Start at time 0 with one lineage
    root = Node(birth_time=0.0)
    active_lineages = [root]

    current_time = 0.0

    # Forward simulation
    while len(active_lineages) < n_leaves:
        k = len(active_lineages)

        # waiting time ~ Exp(k * lambda)
        wait_time = np.random.exponential(1.0 / (k * birth_rate))
        current_time += wait_time

        # choose lineage to split
        lineage = random.choice(active_lineages)

        # create children at current_time
        left = Node(parent=lineage, birth_time=current_time)
        right = Node(parent=lineage, birth_time=current_time)

        lineage.left = left
        lineage.right = right

        # branch length from parent = split_time - birth_time
        lineage.branch_length = current_time - lineage.birth_time

        # update active set
        active_lineages.remove(lineage)
        active_lineages.extend([left, right])

    # Final tree height
    total_height = current_time

    # Extend all remaining active lineages to total_height
    for leaf in active_lineages:
        leaf.branch_length = total_height - leaf.birth_time

    # Label leaves
    for i, leaf in enumerate(active_lineages):
        leaf.name = f"t{i+1}"

    return root


def to_newick(node):
    if node.is_leaf():
        return f"{node.name}:{node.branch_length:.6f}"
    else:
        left = to_newick(node.left)
        right = to_newick(node.right)
        return f"({left},{right}):{node.branch_length:.6f}"


def generate_yule_newick(n_leaves, birth_rate=1.0, seed=None):
    root = yule_tree(n_leaves, birth_rate, seed)
    return to_newick(root) + ";"


In [5]:
# Evolve Cognate by CTMC
def evolve_cognate(root, root_state, gain_rate, loss_rate, seed=None):
    """
    Simulate evolution of a single binary cognate.
    Returns states keyed by leaf name.
    """

    if seed is not None:
        np.random.seed(seed)

    states = {root: root_state}
    total_rate = gain_rate + loss_rate

    def evolve_branch(parent, child):
        parent_state = states[parent]
        t = child.branch_length

        if total_rate == 0:
            states[child] = parent_state
            return

        exp_term = np.exp(-total_rate * t)

        if parent_state == 0:
            p01 = (gain_rate / total_rate) * (1 - exp_term)
            states[child] = int(np.random.rand() < p01)
        else:
            p10 = (loss_rate / total_rate) * (1 - exp_term)
            states[child] = int(not (np.random.rand() < p10))

    def traverse(node):
        if node.left is not None:
            evolve_branch(node, node.left)
            traverse(node.left)
        if node.right is not None:
            evolve_branch(node, node.right)
            traverse(node.right)

    traverse(root)

    # Return only leaf states keyed by leaf name
    leaf_states = {}
    for node, state in states.items():
        if node.is_leaf():
            leaf_states[node.name] = state

    return leaf_states


In [7]:
def generate_independent_dataset(tree, languages, n_sites,
                                 gain_rate=0.4, loss_rate=0.6,
                                 root_freq=0.3):
    """
    Generates a cognate matrix under full site independence.
    """
    rows = []

    for i in range(n_sites):
        root_state = int(np.random.rand() < root_freq)
        states = evolve_cognate(tree, root_state, gain_rate, loss_rate)
        rows.append([states[lang] for lang in languages])

    df = pd.DataFrame(rows, columns=languages)
    df.index = [f"cog_{i}" for i in range(len(df))]
    return df

In [17]:
# Rewrite this based on if t1 exists only then t2 has chance of being there etc
def generate_dependent_dataset(tree, languages,
                               n_blocks=50,
                               block_size=5,
                               base_gain=0.4,
                               base_loss=0.6,
                               root_freq=0.3,
                               rate_multipliers=(0.5, 2.0)):
    """
    Generates a cognate matrix with weak site dependence via
    shared latent rate regimes.
    """
    rows = []

    for block in range(n_blocks):
        rate_multiplier = np.random.choice(rate_multipliers)

        for _ in range(block_size):
            root_state = int(np.random.rand() < root_freq)
            states = evolve_cognate(
                tree,
                root_state,
                gain_rate=base_gain * rate_multiplier,
                loss_rate=base_loss * rate_multiplier
            )
            rows.append([states[lang] for lang in languages])

    df = pd.DataFrame(rows, columns=languages)
    df.index = [f"cog_{i}" for i in range(len(df))]
    return df


In [9]:
def ascertain(df):
    """
    Applies ascertainment correction by removing all-zero sites.
    """
    return df.loc[df.sum(axis=1) > 0].copy()

def get_leaf_names(root):
    leaves = []

    def collect(node):
        if node.is_leaf():
            leaves.append(node.name)
        else:
            collect(node.left)
            collect(node.right)

    collect(root)
    return leaves


In [18]:
np.random.seed(42)  # reproducibility

n_leaves = 4
birth_rate = 1.0

tree = yule_tree(n_leaves, birth_rate, seed=42)
languages = get_leaf_names(tree)

# Dataset A: independent sites
independent = generate_independent_dataset(
    tree, languages, n_sites=300
)
independent = ascertain(independent)

# Dataset B: weakly dependent sites
dependent = generate_dependent_dataset(
    tree, languages,
    n_blocks=60,
    block_size=5
)
dependent = ascertain(dependent)

print("Independent dataset shape:", independent.shape)
print("Dependent dataset shape:", dependent.shape)

print("\nIndependent (head):")
print(independent.head())

print("\nDependent (head):")
print(dependent.head())

Independent dataset shape: (207, 4)
Dependent dataset shape: (206, 4)

Independent (head):
        t2  t3  t4  t1
cog_1    1   0   0   0
cog_5    1   0   0   1
cog_7    0   0   0   1
cog_9    0   1   1   0
cog_10   0   0   0   1

Dependent (head):
       t2  t3  t4  t1
cog_0   1   1   1   0
cog_1   1   1   1   0
cog_2   1   0   0   0
cog_3   0   1   1   0
cog_4   0   1   1   0


### Create nex File

In [19]:
def df_to_sequences(df):
    """
    Rows = cognates, columns = languages
    Output: {language: '010101...'}
    """
    sequences = {}
    for lang in df.columns:
        sequences[lang] = "".join(df[lang].astype(str).tolist())
    return sequences

def write_nexus(df, outfile):
    sequences = df_to_sequences(df)

    taxa = list(sequences.keys())
    ntax = len(taxa)
    nchar = len(next(iter(sequences.values())))

    with open(outfile, "w") as f:
        f.write("#NEXUS\n\n")

        f.write("BEGIN DATA;\n")
        f.write(f"\tDIMENSIONS NTAX={ntax} NCHAR={nchar};\n")
        f.write("\tFORMAT MISSING=? GAP=- DATATYPE=STANDARD SYMBOLS=\"01\";\n")
        f.write("\tMATRIX\n")

        for taxon in taxa:
            f.write(f"\t{taxon:<10} {sequences[taxon]}\n")

        f.write("\t;\n")
        f.write("END;\n")

In [20]:
write_nexus(independent, "independent.nex")
write_nexus(dependent, "dependent.nex")