In [None]:
import importlib
from os import path

import numpy as np
import pandas
import pandas as pd
from scipy.linalg import expm

import tree

In [12]:
# Enable IPython's autoreload extension in mode 2: imported modules are reloaded
# before each cell executes, so edits to local .py files are picked up without
# restarting the kernel.
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Site-wise

In [None]:
# This is the likelihood for one site. For multiple sites, log-transform the per-site
# likelihoods and add them all up.
def get_site_parent_probs(Q, t1, t2, v1, v2):
    """Return the per-state values at a parent of two children for a single site.

    Parameters
    ----------
    Q : array-like, square
        Rate matrix. Each branch transition matrix is computed as expm(Q * t).
    t1, t2 : float
        Branch lengths from the parent to the first and second child.
    v1, v2 : array-like
        Per-state vectors of the first and second child at this site.

    Returns
    -------
    list
        One entry per state i: (expm(Q*t1) @ v1)[i] * (expm(Q*t2) @ v2)[i].
    """
    Q = np.asarray(Q)

    # Transition matrices of the two branches. `expm` is the name the notebook
    # imports (`from scipy.linalg import expm`); the bare package name `scipy` is
    # never bound, so the previous `scipy.linalg.expm(...)` raised NameError.
    P1 = expm(Q * t1)
    P2 = expm(Q * t2)

    # Carry each child's vector up its branch to the parent.
    v_evo_1 = np.matmul(P1, v1)
    v_evo_2 = np.matmul(P2, v2)

    # The two children are combined by an element-wise product per state.
    probs_p = list(np.multiply(v_evo_1, v_evo_2))

    return probs_p


# NOTE(review): these two statements read as an illustrative sketch rather than runnable
# code. `probs_p` only exists as a local inside get_site_parent_probs, and
# `list_of_site_likelihoods` is not defined anywhere in the visible notebook, so executing
# them as written raises NameError.
# Per-site log-likelihood: weight the per-state values by [0.25, 0.25, 0.25, 0.25]
# (presumably a uniform prior over the four states - confirm) and take the log.
site_likelihoods = np.log(np.matmul(probs_p, [0.25,0.25,0.25,0.25]))

# Sequence log-likelihood: sum of the per-site log-likelihoods.
seq_likelihoods = np.sum(list_of_site_likelihoods)

# For a whole sequence

In [3]:
# Sequence-level version of the parent computation: every site is processed at once.
# The result still holds one column per site; log-transform and sum over sites
# afterwards to obtain a sequence log-likelihood.
# NOTE(review): a later cell defines another function with this same name whose second
# parameter is called `dist_list`; on a top-to-bottom run that definition replaces this one.
def get_seq_parent_prob(Q: np.ndarray,
                        dis_list: list,
                        mat_list: list):
    """
    Combine the per-site state matrices of several children into the parent's matrix.

    Parameters
    ----------
    Q : array-like, (4, 4)
        Transition rate matrix (predefined). Each branch uses expm(Q * d).
    dis_list : list
        Distances between the parent and each child node.
    mat_list : list
        Per-child matrices of shape (4, N), where N is the sequence length and each
        column is one site. A matrix given as (N, 4) is transposed with a warning.
        Must correspond one-to-one with `dis_list`.

    Returns
    -------
    np.ndarray, (4, N)
        Element-wise product over children of expm(Q * d) @ matrix.

    Raises
    ------
    ValueError
        If the two lists differ in length, are empty, or the matrices are not 2-D
        with a consistent (4, N) shape.
    """
    # Kept local so the function does not depend on which cells were run before it.
    import warnings

    import numpy as np
    from scipy.linalg import expm

    # The exceptions below used to be constructed without `raise`, so invalid input
    # passed straight through (zip() silently dropped the unmatched entries).
    if len(dis_list) != len(mat_list):
        raise ValueError(
            "Branch distance list (dis_list) and probability matrix list (mat_list) "
            "must have the same length."
        )
    if len(mat_list) == 0:
        raise ValueError("mat_list must contain at least one matrix.")

    Q = np.asarray(Q)

    # Bring every child matrix into (4, N) orientation before comparing shapes, so
    # a mix of (4, N) and (N, 4) inputs is still accepted.
    seqm_list = []
    for idx, raw in enumerate(mat_list):
        mat = np.asarray(raw)
        if mat.ndim != 2:
            raise ValueError(f"mat_list[{idx}] must be 2-D, got ndim={mat.ndim}.")
        if mat.shape[0] != 4:
            if mat.shape[1] != 4:
                raise ValueError(
                    f"mat_list[{idx}] must have shape (4, N) or (N, 4), got {mat.shape}."
                )
            # Report the shape as it was passed in; stacklevel=2 points at the caller.
            warnings.warn(
                f"Matrix has shape {mat.shape}, transposed for downstream analysis",
                stacklevel=2,
            )
            mat = mat.transpose()
        seqm_list.append(mat)

    if any(m.shape != seqm_list[0].shape for m in seqm_list[1:]):
        raise ValueError("All probability matrices need to be in the same shape.")

    # One transition probability matrix per branch.
    tpm_list = [expm(Q * d) for d in dis_list]
    # Carry each child matrix up its branch to the parent.
    evom_list = [np.matmul(tpm, seqm) for tpm, seqm in zip(tpm_list, seqm_list)]

    # Children are combined by an element-wise product across the stacked matrices.
    m_probs_p = np.prod(np.stack(evom_list), axis=0)

    return m_probs_p

In [4]:
import warnings
import numpy as np
from scipy.linalg import expm

def get_seq_parent_prob(
    Q: np.ndarray,
    dist_list: list[float],
    mat_list: list[np.ndarray],
) -> np.ndarray:
    """
    Combine child site-wise state matrices into the parent's matrix under a 4-state model.

    Parameters
    ----------
    Q : (4,4) array-like
        Rate matrix. Each branch transition matrix is expm(Q * t).
    dist_list : list[float]
        Branch lengths (time/distances) for each child.
    mat_list : list of array-like
        List of child matrices, each of shape (4, N) or (N, 4).
        After orientation, columns correspond to sites and rows to states.

    Returns
    -------
    m_probs_p : (4, N) np.ndarray
        Per-site parent state values: the element-wise product over children of
        expm(Q * t) @ child_matrix. The result is not normalized.

    Raises
    ------
    ValueError
        If Q is not (4,4), the lists differ in length or are empty, a matrix is not
        2-D, a matrix is neither (4,N) nor (N,4), or the matrices disagree on N.

    Notes
    -----
    - If a child matrix is (N, 4), it is transposed to (4, N) with a RuntimeWarning.
    - A (4, 4) matrix is taken as already being (4, N) with N = 4; it is never transposed.
    - A negative branch length only triggers a RuntimeWarning; the computation proceeds.
    - Plain nested lists are accepted for Q and for the child matrices.
    """

    # Accept array-likes: `.shape` / `.astype` on a plain list raised AttributeError.
    Q = np.asarray(Q, dtype=float)

    # Basic checks
    if Q.shape != (4, 4):
        raise ValueError(f"Q must be (4,4), got {Q.shape}.")
    if len(dist_list) != len(mat_list):
        raise ValueError("dist_list and mat_list must be the same length.")
    if len(mat_list) == 0:
        raise ValueError("mat_list cannot be empty.")
    if any(d < 0 for d in dist_list):
        warnings.warn("Negative branch length found in dist_list.", RuntimeWarning)

    # Bring every child matrix into (4, N) orientation. This is done in the function
    # body (not in a nested helper) so that stacklevel=2 attributes the warning to
    # the code that called get_seq_parent_prob.
    seqm_list = []
    for idx, raw in enumerate(mat_list):
        mat = np.asarray(raw, dtype=float)
        if mat.ndim != 2:
            raise ValueError(f"mat_list[{idx}] must be 2D, got ndim={mat.ndim}.")
        rows, cols = mat.shape
        if rows != 4:
            if cols != 4:
                raise ValueError(
                    f"mat_list[{idx}] must be (4,N) or (N,4); got {mat.shape}."
                )
            warnings.warn(
                f"mat_list[{idx}] is (N,4); transposing to (4,N).",
                RuntimeWarning,
                stacklevel=2,
            )
            mat = mat.T
        seqm_list.append(mat)

    # Check all N equal
    N = seqm_list[0].shape[1]
    if not all(m.shape == (4, N) for m in seqm_list[1:]):
        shapes = [m.shape for m in seqm_list]
        raise ValueError(f"All matrices must share the same (4,N) shape; got {shapes}.")

    # Transition matrices per branch, each (4,4)
    tpm_list = [expm(Q * float(t)) for t in dist_list]

    # Evolve each child up to the parent (transition @ child), shape (4,N)
    evom_list = [tpm @ seqm for tpm, seqm in zip(tpm_list, seqm_list)]

    # Combine the children by element-wise product across the stacked matrices;
    # result shape (4, N)
    m_probs_p = np.prod(np.stack(evom_list, axis=0), axis=0)

    return m_probs_p


In [None]:
# Example inputs: two species with two sites each, one one-hot row per site
# (rows picked from a 4x4 integer identity matrix).
species1 = np.eye(4, dtype=int)[[0, 1]]
species2 = np.eye(4, dtype=int)[[1, 1]]

# Rate matrix built from a single rate u: u everywhere off the diagonal and -3*u on
# the diagonal, so every row sums to zero.
u = 0.3
Q = np.full((4, 4), u)
np.fill_diagonal(Q, -3 * u)

# Branch lengths used for species1 and species2 respectively.
t1, t2 = 0.1, 0.15

In [40]:
# Parent matrix for the two example species.
# The arguments are passed positionally (rate matrix, branch lengths, child matrices):
# the two get_seq_parent_prob definitions in this notebook name the second parameter
# differently (`dis_list` vs `dist_list`), and the former `dis_list=` keyword raises
# TypeError against the later definition, which is the one in effect after a
# top-to-bottom run.
m_parent = get_seq_parent_prob(Q, [t1, t2], [species1, species2])

# this only needs to be done in the last step:
# weight the parent states by [0.25, 0.25, 0.25, 0.25] for every site, then sum the
# per-site log-likelihoods into one sequence log-likelihood.
seq_likelihood = np.matmul(m_parent.transpose(), [0.25, 0.25, 0.25, 0.25])
seq_ll = np.sum(np.log(seq_likelihood))
seq_ll

np.float64(-5.725259670749095)

In [65]:
# Parent/child table of the example gene tree. Column labels are supplied through
# `names=`, so the first line of the file is read as data rather than as a header.
# `pd` and `path` are the names bound in the notebook's import cell
# (`import pandas as pd`, `from os import path`).
tree_df = pd.read_csv(
    path.join("dataset", "ENSG00000013016_EHD3_NT.table.dat"),
    names=["parent", "child"],
)

In [None]:
# Explicitly re-import the local `tree` module so the current contents of tree.py are used.
importlib.reload(tree)

<module 'tree' from '/Users/jiesun/Desktop/Tree/Trees/tree.py'>

In [23]:
def parse_test_data(data_dir="dataset", gene_prefix="ENSG00000013016_EHD3_NT"):
    """Build a tree.Tree from the three data files belonging to one gene.

    Parameters
    ----------
    data_dir : str
        Directory that holds the files. Defaults to "dataset".
    gene_prefix : str
        Common file-name stem of the three files. Defaults to the example gene
        used throughout this notebook.

    Returns
    -------
    tree.Tree
        The result of tree.Tree(tree_f, seq_f, branch_length_f), where the three
        arguments are the paths of "<gene_prefix>.table.dat", "<gene_prefix>.msa.dat"
        and "<gene_prefix>.branchlength.dat" inside `data_dir`, in that order.
    """
    tree_f = path.join(data_dir, f"{gene_prefix}.table.dat")
    seq_f = path.join(data_dir, f"{gene_prefix}.msa.dat")
    branch_length_f = path.join(data_dir, f"{gene_prefix}.branchlength.dat")
    return tree.Tree(tree_f, seq_f, branch_length_f)

# Build the example tree object once and keep it in `T` for the cells that follow.
T = parse_test_data()

In [43]:
# Display the object's repr to confirm that the tree was constructed.
T

<tree.Tree at 0x14f8c8980>

In [63]:
# Preview of the branch-length file of the example gene. It is read with header=None
# so that its first line is treated as data, then transposed for display.
# `pd` and `path` are the names bound in the notebook's import cell
# (`import pandas as pd`, `from os import path`).
pd.read_csv(
    path.join("dataset", "ENSG00000013016_EHD3_NT.branchlength.dat"),
    header=None,
).transpose()

Unnamed: 0,0
0,0.067
1,0.050
2,0.027
3,0.011
4,0.057
...,...
225,0.003
226,0.014
227,0.082
228,0.017


# Epilog

In [1]:
# Print the running kernel's connection details (ports, transport and signing key).
# NOTE(review): the saved output contains the kernel's signing key - consider clearing
# this cell's output before sharing the notebook.
%connect_info

{"key":"f1fbc6ad-cf78-4cb9-8b2a-879a2b3c1dc7","signature_scheme":"hmac-sha256","transport":"tcp","ip":"127.0.0.1","hb_port":9000,"control_port":9001,"shell_port":9002,"stdin_port":9003,"iopub_port":9004,"kernel_name":"python3130jvsc74a57bd06cef1f122e7374cdb60f7d1da675f3f9e949f90b13f736e08d402034b700a9ed"}

Paste the above JSON into a file, and connect with:
    $> jupyter <app> --existing <file>
or, if you are local, you can connect with just:
    $> jupyter <app> --existing kernel-v3f6cfcdb3845771759625bb8e741afaab9c14b494.json
or even just:
    $> jupyter <app> --existing
if this is the most recent Jupyter kernel you have started.
