In [1]:
import numpy        as np
import numpy.random as nr
import numpy.linalg as nl
import scipy.stats  as ss
import scipy.sparse as sp
import pandas       as pd

In [2]:
import matplotlib.pyplot as plt
%matplotlib inline

In [3]:
%precision 3
np.set_printoptions(precision=3)

In [4]:
def equicorrelation_matrix(p, M=2):
    """Create an equicorrelation matrix
    
    Arguments:
    p -- Off-diagonal correlation (rho)
    M -- Size of correlation matrix (M x M)
    
    Returns:
    Sigma -- An M x M equicorrelation matrix"""

    x = np.full((M,M), p)
    np.fill_diagonal(x, 1)
    return(x)

In [5]:
equicorrelation_matrix(0.2, 5)

array([[ 1. ,  0.2,  0.2,  0.2,  0.2],
       [ 0.2,  1. ,  0.2,  0.2,  0.2],
       [ 0.2,  0.2,  1. ,  0.2,  0.2],
       [ 0.2,  0.2,  0.2,  1. ,  0.2],
       [ 0.2,  0.2,  0.2,  0.2,  1. ]])

In [6]:
def banded_matrix(p, K, M):
    """Create a banded correlation matrix.  
    
    Arguments:
    p -- Correlation between neighboring entries (rho)
    K -- number of off-diagonals (fills in from -K to K)
    M -- Size of correlation matrix (M x M)
    
    Returns:
    Sigma -- M x M matrix with p^k on each kth off-diagonal.
    """
    k  = np.arange(K)+1
    pk = p**k
    return(sp.diags(
        np.concatenate([[1], pk, pk]),
        np.concatenate([[0], k, -k]),
        (M,M)
    ).toarray())

In [7]:
banded_matrix(0.45, 2, 5)

array([[ 1.   ,  0.45 ,  0.203,  0.   ,  0.   ],
       [ 0.45 ,  1.   ,  0.45 ,  0.203,  0.   ],
       [ 0.203,  0.45 ,  1.   ,  0.45 ,  0.203],
       [ 0.   ,  0.203,  0.45 ,  1.   ,  0.45 ],
       [ 0.   ,  0.   ,  0.203,  0.45 ,  1.   ]])

In [8]:
def block_banded_matrix(p, K, M1, M2):
    """Create a block banded correlation matrix.
    
    A "block banded matrix" is similar to a "banded matrix" except
    the rows/columns have been rearranged so that there is a
    top-left block (corresponding to untyped SNPs) and a bottom-
    right block (corresponding to typed SNPs).
    
    Arguments:
    p  -- Correlation between neighboring entries (rho)
    K  -- number of off-diagonals (fills in from -K to K)
    M1 -- Size of top left block (M1 x M1)
    M2 -- Size of bottom right block (M2 x M2)
    
    Returns:
    Sigma -- the block banded matrix
    """
    M = M1 + M2
    
    Sigma = banded_matrix(p, K, M)
    
    top_left = np.linspace(0, M, M1, False).astype('uint')
    mask = np.zeros(M).astype('bool')
    mask[top_left] = True
    
    Sigma11 = Sigma[ mask][:, mask]
    Sigma12 = Sigma[ mask][:,~mask]
    Sigma22 = Sigma[~mask][:,~mask]
    
    return(np.bmat(
            [
                [Sigma11,   Sigma12],
                [Sigma12.T, Sigma22]
            ]))

In [9]:
Sigma = block_banded_matrix(0.45, 2, 2, 3)
Sigma

matrix([[ 1.   ,  0.203,  0.45 ,  0.   ,  0.   ],
        [ 0.203,  1.   ,  0.45 ,  0.45 ,  0.203],
        [ 0.45 ,  0.45 ,  1.   ,  0.203,  0.   ],
        [ 0.   ,  0.45 ,  0.203,  1.   ,  0.45 ],
        [ 0.   ,  0.203,  0.   ,  0.45 ,  1.   ]])

In [10]:
def multivariate_normal_sample(mean, cov, size=None, bias=True, ddof=None):
    """Draw random samples from a multivariate-normal-like distribution with specified sample mean and covariance.
    
    This is similar to numpy.random.multivariate_normal, except that the sample
    mean and covariance are guaranteed to be what you specify. It functions very
    similarly to how you generate a multivariate normal sample normally with one
    added step:
    
    1.  Perform a Cholesky decomposition on the covariance
    2.  Simulate N x size random numbers
    3.* Center and orthonormalize them
    4.  Multiply the random sample by the square root of the covariance
    
    Arguments:
    mean -- mean of the N-dimensional distribution
    cov -- covariance
    size -- how many samples to generate
    bias -- similar to numpy.cov bias
    ddof -- similar to numpy.cov ddof
    """
    N = len(mean)

    if len(cov.shape) != 2:
        raise ValueError('cov must be 2 dimensional')

    if cov.shape[0] != cov.shape[1]:
        raise ValueError('cov must be square')
    
    if N != cov.shape[0]:
        raise ValueError('mean and cov must have same length')

    if ddof is None:
        if bias:
            ddof = 0
        else:
            ddof = 1

    if size is None:
        size = N+1
    
    if ddof >= size:
        raise ValueError('ddof must be less than size')

    Sigma12 = nl.cholesky(cov)

    Z  = nr.normal(size=(N, size))
    Z -= Z.mean(axis=1)[:,None]
    Z  = nl.qr(Z.T)[0].T * np.sqrt(size - ddof)

    return((Sigma12.dot(Z) + mean[:,None]).T)

In [11]:
Z = multivariate_normal_sample(np.zeros(5), Sigma)
Z

matrix([[-1.328,  0.179, -0.379,  1.753,  0.877],
        [ 0.766, -0.621, -0.919, -0.107, -1.238],
        [ 0.499,  0.123, -0.739, -0.269,  1.611],
        [-0.723, -1.809, -0.13 , -1.387, -0.54 ],
        [-0.741,  1.187,  0.057, -0.673, -0.884],
        [ 1.526,  0.942,  2.109,  0.683,  0.173]])

In [12]:
np.cov(Z.T, bias=True)

array([[  1.000e+00,   2.025e-01,   4.500e-01,  -2.961e-16,  -1.943e-16],
       [  2.025e-01,   1.000e+00,   4.500e-01,   4.500e-01,   2.025e-01],
       [  4.500e-01,   4.500e-01,   1.000e+00,   2.025e-01,  -3.701e-17],
       [ -2.961e-16,   4.500e-01,   2.025e-01,   1.000e+00,   4.500e-01],
       [ -1.943e-16,   2.025e-01,  -3.701e-17,   4.500e-01,   1.000e+00]])

In [13]:
def sim_geno(p, Sigma, N, binomial=True):
    """Simulate genotypes.
    
    Arguments:
    p        -- Vector of allele frequencies
    Sigma    -- LD matrix
    N        -- Number of samples
    binomial -- Generate binomial (True, default) or normal genotypes (False)
    
    Returns:
    G -- Genotype matrix
    """
    if type(p) is list:
        p = np.array(p)

    if type(Sigma) is list:
        Sigma = np.array(Sigma)

    if len(Sigma.shape) != 2:
        raise ValueError('Sigma must be 2D')
    
    if Sigma.shape[0] != Sigma.shape[1]:
        raise ValueError('Sigma must be square')

    if p.shape[0] != Sigma.shape[0]:
        raise ValueError('p and Sigma must have same number of rows')
    
    M = p.shape[0]
    
    X = multivariate_normal_sample(np.zeros(M), Sigma, N).T

    if binomial:
        Z = ss.norm().sf(X)
        G = np.array([ ss.binom(2, pi).isf(Zi) for pi, Zi in zip(p, Z) ])
        return(G)

    else:
        X = np.array(X)
        G = X * np.sqrt(2*p*(1-p))[:, None] + 2*p[:,None]
        return(G)

In [14]:
p = nr.uniform(0.05, 0.95, 5)

In [15]:
G1 = sim_geno(p, Sigma, 10)
G1

array([[ 1.,  1.,  1.,  2.,  0.,  2.,  0.,  2.,  0.,  1.],
       [ 0.,  0.,  1.,  1.,  1.,  2.,  1.,  1.,  1.,  2.],
       [ 1.,  1.,  1.,  2.,  0.,  2.,  0.,  1.,  0.,  2.],
       [ 0.,  0.,  1.,  0.,  1.,  2.,  0.,  0.,  0.,  1.],
       [ 2.,  1.,  2.,  1.,  1.,  2.,  2.,  2.,  2.,  2.]])

In [16]:
G2 = sim_geno(p, Sigma, 10, False)
G2

array([[ 0.104,  0.258,  2.021,  0.794,  0.254,  1.375,  1.804,  1.983,
         0.599,  1.325],
       [ 1.307,  0.338,  0.447,  0.983, -0.188,  0.404,  0.899,  1.488,
         1.456,  2.396],
       [ 0.631, -0.205,  0.563,  0.612,  1.28 ,  1.349,  0.892,  2.53 ,
         1.335,  1.652],
       [ 0.609,  0.047, -0.49 , -0.07 ,  0.192,  0.843,  1.348, -0.085,
         0.738,  1.29 ],
       [ 2.058,  2.029,  1.761,  1.239,  1.314,  2.392,  1.859,  1.721,
         2.352,  1.871]])

In [17]:
def he_regression(X, Y):
    """Perform Haseman-Elston regression.
    
    Arguments:
    X -- Independent variables (M x N)
    Y -- Dependent variable (N x 1)
    
    Returns:
    var1 -- Variance from typed SNPs
    """
    Xc = X - X.mean(axis=1)[:,None]
    Yc = Y - Y.mean()
    
    A = Xc.T.dot(Xc) / X.shape[0]
    B = Yc[:,None].dot(Yc[None,:])
    return(np.triu(A*B,1).sum() / np.triu(A*A,1).sum())

In [18]:
def he_regression_bivar(X1, X2, Y1, Y2):
    """Perform bivariate Haseman-Elston regression.
    
    Arguments:
    X1 -- First set of independent variables (M x N1)
    X2 -- Second set of independent variables (M x N2)
    Y1 -- First set of dependent variables (N1 x 1)
    Y2 -- Second set of dependent variables (N2 x 1)
    
    Returns:
    var1 -- Genetic variance for first set of variables
    var2 -- Genetic variance for second set of variables
    cov12 -- Cross-variable correlation
    """
    if (X1.shape[0] != X2.shape[0]):
        raise ValueError(
            'M1 ({}) != M2 ({})'.format(X1.shape[0], X2.shape[0])
        )

    var1 = he_regression(X1, Y1)
    var2 = he_regression(X2, Y2)

    X1c = X1 - X1.mean(axis=1)[:,None]
    X2c = X2 - X2.mean(axis=1)[:,None]
    
    Y1c = Y1 - Y1.mean()
    Y2c = Y2 - Y2.mean()

    A = X1c.T.dot(X2c) / X1.shape[0]
    B = Y1c[:,None].dot(Y2c[None,:])
    
    cov12 = (A*B).sum() / (A*A).sum()
    
    return(var1, var2, cov12 / np.sqrt(var1*var2))

In [19]:
b = multivariate_normal_sample(np.zeros(2), equicorrelation_matrix(0.8), 5).T

Y1 = G1.T.dot(b[0])
Y2 = G2.T.dot(b[1])

he_regression_bivar(G1, G2, Y1, Y2)

(0.858, 0.796, 0.258)

In [20]:
def fromfile_tril(*args, **kwargs):
    """Read a symmetric matrix stored as just the lower triangular part.

    For an MxM matrix, the lower triangular part takes up just N=M*(M+1)/2 bytes
    instead of M^2 bytes. Reversing this formula using the quadratic equation,
    M=(sqrt(1+8N)-1)/2. This is simply a wrapper around numpy.fromfile.
    """
    X = np.fromfile(*args, **kwargs)

    N = len(X)
    M = (np.sqrt(1+8*N)-1)/2
    if int(M) != M:
        raise ValueError('Length of matrix not of form M*(M+1)/2')
    M = int(M)

    Y = np.empty((M,M), X.dtype)
    indices = np.tril_indices(M)

    Y[indices] = Y[indices[::-1]] = X

    return(Y)

In [21]:
def tau(Ss, S2s=None):
    """Tau function which is the core of this work
    
    Arguments:
    Ss  -- ld matrix
    S2s -- squared ld matrix (optional)
    
    Returns:
    Tau function of given LD matrix
    """
    
    if S2s is None:
        S2s = Ss**2

    return(
        Ss.prod(axis=0).sum() /
        np.sqrt(S2s.sum(axis=(1,2)).prod())
    )

In [22]:
def tau_ratio(Ss, typed, S2s=None):
    """Tau ratio function which wraps around tau
    
    Arguments:
    Ss    -- ld matrix
    typed -- indicates which SNPs are typed
    S2s   -- squared ld matrix (optional)
    
    Returns:
    Tau ratio function
    """
    if S2s is None:
        S2s = Ss**2
    
    return(
        tau(Ss[:,typed], S2s[:,typed]) /
        tau(Ss[:,typed[:,None],typed], S2s[:,typed[:,None],typed])
    )

In [23]:
def tau_distance(Ss, dist, S2s=None):
    """Tau function which is the core of this work
    
    Arguments:
    Ss   -- ld matrix
    dist -- distance matrix
    S2s  -- squared ld matrix (optional)
    
    Returns:
    Tau function of given LD matrix
    """

    Ss_flat   = Ss.reshape((2,-1))
    
    if S2s is None:
        sq = Ss_flat**2
    else:
        sq        = S2s.reshape((2,-1))

    dist_flat = dist.flatten()

    cr = Ss_flat.prod(axis=0)

    cr_sum = cr.sum()
    sq_sum = sq.sum(axis=1)

    i_sort = np.argsort(dist_flat)
    
    d = -1
    taus = list()

    for i in i_sort[::-1]:
        if (d != dist_flat[i]):
            d = dist_flat[i]
            taus.append((d, cr_sum / np.sqrt(sq_sum.prod())))

        if (d == 0):
            break

        cr_sum -= cr[i]
        sq_sum -= sq[:,i]

    return(np.array(taus))

In [24]:
def tau_ratio_distance(Ss, typed, dist, S2s=None):
    if S2s is None:
        S2s = Ss**2

    tau_STA = tau_distance(Ss[:,typed], dist[typed], S2s[:,typed])
    tau_STT = tau_distance(Ss[:,typed[:,None],typed], dist[typed[:,None],typed], S2s[:,typed[:,None],typed])

    tau_STA_df = pd.DataFrame(tau_STA[::-1], columns=['dist', 'tau_STA'])
    tau_STT_df = pd.DataFrame(tau_STT[::-1], columns=['dist', 'tau_STT'])

    tau_df = pd.merge(tau_STA_df, tau_STT_df)

    tau_df['ratio'] = tau_df.tau_STA / tau_df.tau_STT
    
    return(tau_df)