In [1]:
import numpy as np
from itertools import combinations_with_replacement

In [2]:
def r2(w_AB, w_Ab, w_aB, n):
    p_AB = w_AB / float(n)
    p_Ab = w_Ab / float(n)
    p_aB = w_aB / float(n)

    p_A = p_AB + p_Ab
    p_B = p_AB + p_aB

    D_ = p_AB - (p_A * p_B)
    denom = p_A * p_B * (1 - p_A) * (1 - p_B)

    if denom == 0 and D_ == 0:
        return np.nan

    return (D_ * D_) / denom

In [3]:
def r(w_AB, w_Ab, w_aB, n):
    p_AB = w_AB / float(n)
    p_Ab = w_Ab / float(n)
    p_aB = w_aB / float(n)

    p_A = p_AB + p_Ab
    p_B = p_AB + p_aB

    D_ = p_AB - (p_A * p_B)
    denom = p_A * p_B * (1 - p_A) * (1 - p_B)

    if denom == 0 and D_ == 0:
        return np.nan

    return D_ / np.sqrt(denom)

In [4]:
def D(w_AB, w_Ab, w_aB, n):
    p_AB = w_AB / float(n)
    p_Ab = w_Ab / float(n)
    p_aB = w_aB / float(n)

    p_A = p_AB + p_Ab
    p_B = p_AB + p_aB

    return p_AB - (p_A * p_B)

In [5]:
def compute_hap_matrix(a_alleles, b_alleles, func, polarized, norm_strategy, re_norm=False, do_print=False):
    assert len(a_alleles) == len(b_alleles), 'inputs must be same length'
    assert norm_strategy in {'total', 'hap_weighted', 'af_weighted'}, f'unknown norm strategy: {norm_strategy}'
    a_alleles = np.asarray(a_alleles)
    b_alleles = np.asarray(b_alleles)

    result = np.zeros((2, 2))
    for (l_idx, left), (r_idx, right) in combinations_with_replacement([(0, a_alleles), (1, b_alleles)], 2):
        # NB: the left, right / A, B indices are correct. We are representing
        #     A as columns and B as rows, so the indexing looks funny, but it's not
        hap_mat = np.zeros((len(np.unique(right)), len(np.unique(left))))
        for A_i, B_i in zip(left, right):
            hap_mat[B_i, A_i] += 1
        stats, weights = compute_stat(hap_mat, func, polarized, norm_strategy)
        if re_norm:
            # TODO: consider re-norming the weights. This makes certain weighting schemes work with polarization
            weights = weights / weights.sum()
        if do_print:
            print(hap_mat, stats, weights, '============', sep='\n')
        result[l_idx, r_idx] = (stats * weights).sum()

    tri_idx = np.tril_indices(len(result), k=-1)
    result[tri_idx] = result.T[tri_idx]
    return result

In [6]:
def compute_stat(hap_mat, func, polarized, norm_strategy):
    hap_mat = np.asarray(hap_mat)

    n_B, n_A = hap_mat.shape
    n = hap_mat.sum()
    a_freq = hap_mat.sum(0) / n
    b_freq = hap_mat.sum(1) / n
    hap_freq = hap_mat / n

    weights = np.zeros(hap_mat.shape)
    stats = np.zeros(hap_mat.shape)
    for A_i in range(1 if polarized else 0, n_A):
        for B_i in range(1 if polarized else 0, n_B):
            # NB: the A, B indices are correct. We are representing A as columns and B as rows
            #     so the indexing looks funny, but it's not
            w_AB = hap_mat[B_i, A_i]
            w_Ab = hap_mat[:  , A_i].sum() - w_AB
            w_aB = hap_mat[B_i,   :].sum() - w_AB
            stats[B_i, A_i] = func(w_AB, w_Ab, w_aB, n)
            if norm_strategy == 'hap_weighted':
                weights[B_i, A_i] = hap_freq[B_i, A_i]
            elif norm_strategy == 'af_weighted':
                weights[B_i, A_i] = a_freq[A_i] * b_freq[B_i]
            elif norm_strategy == 'total':
                weights[B_i, A_i] = 1 / ((n_A - (1 if polarized else 0)) * (n_B - (1 if polarized else 0)))
    return stats, weights

biallelic repulsion case

In [20]:
CORRELATED = (
    [0, 1, 1, 0, 2, 2, 1, 0, 1],
    [1, 2, 2, 1, 0, 0, 2, 1, 2]
)
CORRELATED_SYMMETRIC = (
    [0, 1, 1, 0, 2, 2, 1, 0, 2],
    [1, 2, 2, 1, 0, 0, 2, 1, 0]
)
CORRELATED_BIALLELIC = (
    [0, 0, 0, 0, 1, 1, 1, 1],
    [0, 0, 0, 0, 1, 1, 1, 1]
)
UNCORRELATED = (
    [0, 0, 0, 1, 1, 1, 2, 2, 2],
    [0, 1, 2, 0, 1, 2, 0, 1, 2]
)
UNCORRELATED_BIALLELIC = (
    [0, 0, 0, 0, 1, 1, 1, 1],
    [1, 1, 0, 0, 0, 0, 1, 1]
)
REPULSION_BIALLELIC = (
    [0, 0, 0, 0, 1, 1, 1, 1],
    [1, 1, 1, 1, 0, 0, 0, 0]
)
TEST_CASES = {k: v for k, v in locals().items() if 'CORRELATED' in k or 'ALLELIC' in k}

In [48]:
def run_all_test_cases(test_cases):
    for func in {r, r2, D}:
        for test_case_name, test_case in test_cases.items():
            print('======', test_case_name, '======')
            for polarized in (True, False):
                for norm_strategy in {'total', 'hap_weighted', 'af_weighted'}:
                    for re_norm in (True, False):
                        print(f'polarized={polarized}, norm_strategy=\'{norm_strategy}\', re_norm={re_norm}, func={func.__name__}')
                        print(compute_hap_matrix(*test_case, func, polarized, norm_strategy, re_norm))
        print('====================================================================')

In [49]:
run_all_test_cases(TEST_CASES)

polarized=True, norm_strategy='af_weighted', re_norm=True, func=r2
[[0.65714286 0.55918367]
 [0.55918367 0.70612245]]
polarized=True, norm_strategy='af_weighted', re_norm=False, func=r2
[[0.29206349 0.28994709]
 [0.28994709 0.42716049]]
polarized=True, norm_strategy='total', re_norm=True, func=r2
[[0.61428571 0.44285714]
 [0.44285714 0.7       ]]
polarized=True, norm_strategy='total', re_norm=False, func=r2
[[0.61428571 0.44285714]
 [0.44285714 0.7       ]]
polarized=True, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 1.]
 [1. 1.]]
polarized=True, norm_strategy='hap_weighted', re_norm=False, func=r2
[[0.66666667 0.44444444]
 [0.44444444 0.77777778]]
polarized=False, norm_strategy='af_weighted', re_norm=True, func=r2
[[0.54285714 0.54285714]
 [0.54285714 0.54285714]]
polarized=False, norm_strategy='af_weighted', re_norm=False, func=r2
[[0.54285714 0.54285714]
 [0.54285714 0.54285714]]
polarized=False, norm_strategy='total', re_norm=True, func=r2
[[0.5047619 0.5047619]
 [0.504

  weights = weights / weights.sum()


In [53]:
def run_all_with_params(test_cases, **kwargs):
    for test_case_name, test_case in test_cases.items():
        print('======', test_case_name, '======')
        for polarized in (True, False):
            norm_strategy = kwargs['norm_strategy']
            re_norm = kwargs['re_norm']
            func = kwargs['func']
            print(f'polarized={polarized}, norm_strategy=\'{norm_strategy}\', re_norm={re_norm}, func={func.__name__}')
            print(compute_hap_matrix(*test_case, polarized=polarized, **kwargs))

In [54]:
run_all_with_params(TEST_CASES, norm_strategy='hap_weighted', re_norm=True, func=r2)

polarized=True, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 1.]
 [1. 1.]]
polarized=False, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 1.]
 [1. 1.]]
polarized=True, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 1.]
 [1. 1.]]
polarized=False, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 1.]
 [1. 1.]]
polarized=True, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 1.]
 [1. 1.]]
polarized=False, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 1.]
 [1. 1.]]
polarized=True, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 0.]
 [0. 1.]]
polarized=False, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 0.]
 [0. 1.]]
polarized=True, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 0.]
 [0. 1.]]
polarized=False, norm_strategy='hap_weighted', re_norm=True, func=r2
[[1. 0.]
 [0. 1.]]
polarized=True, norm_strategy='hap_weighted', re_norm=True, func=r2
[[ 1. nan]
 [nan  1.]]
polarized=False, norm_strategy='ha

  weights = weights / weights.sum()


In [59]:
run_all_with_params(TEST_CASES, norm_strategy='af_weighted', re_norm=True, func=r2)

polarized=True, norm_strategy='af_weighted', re_norm=True, func=r2
[[0.65714286 0.55918367]
 [0.55918367 0.70612245]]
polarized=False, norm_strategy='af_weighted', re_norm=True, func=r2
[[0.54285714 0.54285714]
 [0.54285714 0.54285714]]
polarized=True, norm_strategy='af_weighted', re_norm=True, func=r2
[[0.625  0.4375]
 [0.4375 0.625 ]]
polarized=False, norm_strategy='af_weighted', re_norm=True, func=r2
[[0.5 0.5]
 [0.5 0.5]]
polarized=True, norm_strategy='af_weighted', re_norm=True, func=r2
[[1. 1.]
 [1. 1.]]
polarized=False, norm_strategy='af_weighted', re_norm=True, func=r2
[[1. 1.]
 [1. 1.]]
polarized=True, norm_strategy='af_weighted', re_norm=True, func=r2
[[0.625 0.   ]
 [0.    0.625]]
polarized=False, norm_strategy='af_weighted', re_norm=True, func=r2
[[0.5 0. ]
 [0.  0.5]]
polarized=True, norm_strategy='af_weighted', re_norm=True, func=r2
[[1. 0.]
 [0. 1.]]
polarized=False, norm_strategy='af_weighted', re_norm=True, func=r2
[[1. 0.]
 [0. 1.]]
polarized=True, norm_strategy='af_w

In [56]:
compute_hap_matrix(*REPULSION_BIALLELIC, polarized=True, norm_strategy='hap_weighted', re_norm=True, func=r2, do_print=True)

[[4. 0.]
 [0. 4.]]
[[0. 0.]
 [0. 1.]]
[[0. 0.]
 [0. 1.]]
[[0. 4.]
 [4. 0.]]
[[0. 0.]
 [0. 1.]]
[[nan nan]
 [nan nan]]
[[4. 0.]
 [0. 4.]]
[[0. 0.]
 [0. 1.]]
[[0. 0.]
 [0. 1.]]


  weights = weights / weights.sum()


array([[ 1., nan],
       [nan,  1.]])

In [76]:
compute_hap_matrix([0, 1, 1, 0, 0, 1, 2, 1], [0, 2, 1, 1, 1, 1, 0, 0],
                   polarized=True, norm_strategy='total', re_norm=False, func=r, do_print=True)

[[3. 0. 0.]
 [0. 4. 0.]
 [0. 0. 1.]]
[[ 0.          0.          0.        ]
 [ 0.          1.         -0.37796447]
 [ 0.         -0.37796447  1.        ]]
[[0.   0.   0.  ]
 [0.   0.25 0.25]
 [0.   0.25 0.25]]
[[1. 1. 1.]
 [2. 2. 0.]
 [0. 1. 0.]]
[[ 0.          0.          0.        ]
 [ 0.          0.         -0.37796447]
 [ 0.          0.37796447 -0.14285714]]
[[0.   0.   0.  ]
 [0.   0.25 0.25]
 [0.   0.25 0.25]]
[[3. 0. 0.]
 [0. 4. 0.]
 [0. 0. 1.]]
[[ 0.          0.          0.        ]
 [ 0.          1.         -0.37796447]
 [ 0.         -0.37796447  1.        ]]
[[0.   0.   0.  ]
 [0.   0.25 0.25]
 [0.   0.25 0.25]]


array([[ 0.31101776, -0.03571429],
       [-0.03571429,  0.31101776]])

In [58]:
compute_hap_matrix(*REPULSION_BIALLELIC, polarized=True, norm_strategy='a

[[4. 0.]
 [0. 4.]]
[[0. 0.]
 [0. 1.]]
[[0. 0.]
 [0. 1.]]
[[0. 4.]
 [4. 0.]]
[[0. 0.]
 [0. 1.]]
[[0. 0.]
 [0. 1.]]
[[4. 0.]
 [0. 4.]]
[[0. 0.]
 [0. 1.]]
[[0. 0.]
 [0. 1.]]


array([[1., 1.],
       [1., 1.]])

In [57]:
compute_hap_matrix(*REPULSION_BIALLELIC, polarized=True, norm_strategy='hap_weighted', re_norm=False, func=r2, do_print=True)

[[4. 0.]
 [0. 4.]]
[[0. 0.]
 [0. 1.]]
[[0.  0. ]
 [0.  0.5]]
[[0. 4.]
 [4. 0.]]
[[0. 0.]
 [0. 1.]]
[[0. 0.]
 [0. 0.]]
[[4. 0.]
 [0. 4.]]
[[0. 0.]
 [0. 1.]]
[[0.  0. ]
 [0.  0.5]]


array([[0.5, 0. ],
       [0. , 0.5]])