# pc_relate_2

In [1]:
# %env HAIL_QUERY_BACKEND=local

In [2]:
import hail as hl
# hl.utils.get_1kg('tmp/')
mt = hl.read_matrix_table('tmp/1kg.mt')
mt.count()

Initializing Hail with default parameters...
Running on Apache Spark version 3.1.1
SparkUI available at http://wmbfd-7b8.home:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.64-4088c260a348
LOGGING: writing to /Users/pcumming/PycharmProjects/hail/hail/python/hail/methods/relatedness/hail-20210404-1540-0.2.64-4088c260a348.log


(10879, 284)

Compute results from current implementation located at `hail/hail/python/hail/methods/relatedness/pc_relate.py`:

In [3]:
pc_rel = hl.pc_relate(mt.GT, min_individual_maf=0.01, k=10)

2021-04-04 15:40:23 Hail: INFO: hwe_normalized_pca: running PCA using 10370 variants.
2021-04-04 15:40:25 Hail: INFO: pca: running PCA with 10 components...
2021-04-04 15:41:02 Hail: INFO: Wrote all 3 blocks of 10879 x 284 matrix with block size 4096.


Steps from current implementation that we can just reuse:

In [4]:
# Get PC scores
from hail.methods.pca import hwe_normalized_pca
_, scores, _ = hwe_normalized_pca(mt.GT, k=10, compute_loadings=False)
scores_expr = scores[mt.col_key].scores
scores_table = mt.select_cols(__scores=scores_expr)\
    .key_cols_by().select_cols('__scores').cols()

2021-04-04 15:41:04 Hail: INFO: hwe_normalized_pca: running PCA using 10370 variants.
2021-04-04 15:41:05 Hail: INFO: pca: running PCA with 10 components...


In [5]:
# Check for missing scores, create entries for g matrix
import hail.expr.aggregators as agg
n_missing = scores_table.aggregate(agg.count_where(hl.is_missing(scores_table.__scores)))
if n_missing > 0:
    raise ValueError(f'Found {n_missing} columns with missing scores array.')
mt = mt.select_entries(__gt=mt.GT.n_alt_alleles()).unfilter_entries()
mt = mt.annotate_rows(__mean_gt=agg.mean(mt.__gt))
mean_imputed_gt = hl.or_else(hl.float64(mt.__gt), mt.__mean_gt)

In [6]:
# Get PCs and g matrix
from hail.linalg import BlockMatrix
block_size = BlockMatrix.default_block_size()
g_bm = BlockMatrix.from_entry_expr(mean_imputed_gt, block_size=block_size)
pcs = scores_table.collect(_localize=False).map(lambda x: x.__scores)

2021-04-04 15:41:38 Hail: INFO: Wrote all 3 blocks of 10879 x 284 matrix with block size 4096.


At this point, the current implementation calls:

```
ht = Table(ir.BlockMatrixToTableApply(g._bmir, pcs._ir, {
    'name': 'PCRelate',
    'maf': min_individual_maf,
    'blockSize': block_size,
    'minKinship': min_kinship,
    'statistics': {'kin': 0, 'kin2': 1, 'kin20': 2, 'all': 3}[statistics]
}))
```

So we want to replace the Scala code at `hail/hail/src/main/scala/is/hail/methods/PCRelate.scala` with Python that can run on the query backend.

Below we'll work out a new implementation, will just refer to it as `pc_relate_2`.

In [7]:
# Concat array of ones (intercept) with PCs, do QR
pcs_nd = hl.nd.array(pcs)
v_nd = hl.nd.concatenate([hl.nd.ones((pcs_nd.shape[0], 1)), pcs_nd], axis=1)
q_nd, r_nd = hl.nd.qr(v_nd, mode='reduced')
rinv_qt_nd = hl.nd.inv(r_nd) @ q_nd.T

In [8]:
# Check dims
nd_shapes = {
    'v': hl.eval(v_nd.shape),
    'q': hl.eval(q_nd.shape),
    'r': hl.eval(r_nd.shape),
    'rinv @ qT': hl.eval(rinv_qt_nd.shape)
}
print(nd_shapes)

{'v': (284, 11), 'q': (284, 11), 'r': (11, 11), 'rinv @ qT': (11, 284)}


In [9]:
# Convert inv(r) @ q.T to bm for computing beta
rinv_qt_bm = BlockMatrix.from_numpy(hl.eval(rinv_qt_nd))
beta_bm = rinv_qt_bm @ g_bm.T

# Convert v to bm for computing mu
v_bm = BlockMatrix.from_numpy(hl.eval(v_nd))
mu_bm = 0.5 * (v_bm @ beta_bm).T

In [10]:
# Check dims again
bm_shapes = {
    'rinv @ qT': hl.eval(rinv_qt_bm.shape),
    'beta': hl.eval(beta_bm.shape),
    'v': hl.eval(v_bm.shape),
    'mu': hl.eval(mu_bm.shape),
    'g': hl.eval(g_bm.shape)
}
print(bm_shapes)

{'rinv @ qT': (11, 284), 'beta': (11, 10879), 'v': (284, 11), 'mu': (10879, 284), 'g': (10879, 284)}


Define a few methods to use below to check the entries in mu and g matrices, as well as compute Gram matrix used to compute phi.

In [11]:
min_individual_maf = 0.01

def bad_mu(mu, maf):
    return (mu <= maf) | (mu >= (1.0 - maf)) | (mu <= 0.0) | (mu >= 1.0)

def bad_gt(gt):
    return (gt != hl.float64(0)) & (gt != hl.float64(1)) & (gt != hl.float64(2))

def gram(M):
    return M.T @ M

In [12]:
(hl.eval(bad_gt(-1)), hl.eval(bad_gt(0)), hl.eval(bad_gt(1)), hl.eval(bad_gt(2)), hl.eval(bad_gt(3)))

(True, False, False, False, True)

Workaround without using Block Matrix `map` function.

Convert Block Matrices for g, mu to matrix tables to check bad values and get centered AFs. 

Create variance matrix table after checking for bad values. 

Convert matrix tables back to Block Matrices and compute estimate for phi.

In [13]:
g_mt = g_bm.to_matrix_table_row_major()
g_mt.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    'col_idx': int64
----------------------------------------
Row fields:
    'row_idx': int64
----------------------------------------
Entry fields:
    'element': float64
----------------------------------------
Column key: ['col_idx']
Row key: ['row_idx']
----------------------------------------


2021-04-04 15:41:45 Hail: INFO: wrote matrix with 10879 rows and 284 columns as 3 blocks of size 4096 to /tmp/UJyfRvEUOfioWydrrU3Yes


In [14]:
pre_mu_mt = mu_bm.to_matrix_table_row_major()
pre_mu_mt.describe()

2021-04-04 15:41:45 Hail: INFO: BlockMatrix multiply: writing right input with 11 rows and 10879 cols (3 blocks of size 4096) to temporary file /tmp/blockmatrix-dot-right-gOeVLrqAxpyxsGj7skJJsV.bm
2021-04-04 15:41:46 Hail: INFO: wrote matrix with 11 rows and 10879 columns as 3 blocks of size 4096 to /tmp/blockmatrix-dot-right-gOeVLrqAxpyxsGj7skJJsV.bm


----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    'col_idx': int64
----------------------------------------
Row fields:
    'row_idx': int64
----------------------------------------
Entry fields:
    'element': float64
----------------------------------------
Column key: ['col_idx']
Row key: ['row_idx']
----------------------------------------


2021-04-04 15:41:47 Hail: INFO: wrote matrix with 10879 rows and 284 columns as 3 blocks of size 4096 to /tmp/A6bqMQvy6uMx1DiAQdg2J0


In [15]:
# Define NaN to use instead of missing values, otherwise cannot go back to block matrix
nan = hl.literal(0) / 0

In [16]:
# Replace bad entries in g matrix with NaNs
g_mt = g_mt.annotate_entries(g = hl.if_else(bad_gt(g_mt.element), nan, g_mt.element))
g_mt = g_mt.select_entries(g_mt.g)
g_mt.show()

Unnamed: 0_level_0,0,1,2,3,4,5
row_idx,g,g,g,g,g,g
int64,float64,float64,float64,float64,float64,float64
0,0.0,0.0,0.0,0.0,0.0,0.0
1,0.0,0.0,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0,,0.0
3,,0.0,0.0,0.0,0.0,0.0
4,1.0,1.0,1.0,0.0,0.0,1.0
5,0.0,,0.0,0.0,0.0,0.0
6,2.0,1.0,1.0,1.0,0.0,2.0
7,1.0,1.0,2.0,0.0,1.0,0.0
8,0.0,1.0,1.0,1.0,0.0,0.0
9,0.0,0.0,0.0,0.0,0.0,0.0


In [17]:
# Replace bad entries in mu matrix with NaNs, call pre_mu for now
pre_mu_mt = pre_mu_mt.annotate_entries(pre_mu = hl.if_else(bad_mu(pre_mu_mt.element, min_individual_maf), 
                                                           nan, 
                                                           pre_mu_mt.element))
pre_mu_mt = pre_mu_mt.select_entries(pre_mu_mt.pre_mu)
pre_mu_mt.show()

Unnamed: 0_level_0,0,1,2,3,4,5
row_idx,pre_mu,pre_mu,pre_mu,pre_mu,pre_mu,pre_mu
int64,float64,float64,float64,float64,float64,float64
0,,0.0122,,,0.0281,
1,,,,,0.0146,
2,,,,,,
3,,0.0247,0.0186,0.0115,0.0114,0.0126
4,0.239,0.193,0.224,0.208,0.213,0.234
5,,,,,,
6,0.351,0.338,0.364,0.346,0.317,0.372
7,0.623,0.656,0.629,0.637,0.673,0.64
8,0.326,0.292,0.308,0.314,0.312,0.322
9,,,,,,


In [18]:
# Combine entries from g and pre_mu matrices into single MT
g_pre_mu_mt = g_mt.annotate_entries(pre_mu = pre_mu_mt[g_mt.row_idx, g_mt.col_idx].pre_mu)
g_pre_mu_mt.show()

2021-04-04 15:41:49 Hail: WARN: cols(): Resulting column table is sorted by 'col_key'.
    To preserve matrix table column order, first unkey columns with 'key_cols_by()'


Unnamed: 0_level_0,0,0,1,1,2,2
row_idx,g,pre_mu,g,pre_mu,g,pre_mu
int64,float64,float64,float64,float64,float64,float64
0,0.0,,0.0,0.0122,0.0,
1,0.0,,0.0,,0.0,
2,0.0,,0.0,,0.0,
3,,,0.0,0.0247,0.0,0.0186
4,1.0,0.239,1.0,0.193,1.0,0.224
5,0.0,,,,0.0,
6,2.0,0.351,1.0,0.338,1.0,0.364
7,1.0,0.623,1.0,0.656,2.0,0.629
8,0.0,0.326,1.0,0.292,1.0,0.308
9,0.0,,0.0,,0.0,


In [19]:
# Create final mu matrix
# If either pre_mu or g entry in combined MT is NaN, insert NaN, otherwise use pre_mu entry
# To make sure we can compute centered AF, (g/2) - mu
mu_mt = g_pre_mu_mt.annotate_entries(mu = hl.if_else(hl.is_nan(g_pre_mu_mt.g) | hl.is_nan(g_pre_mu_mt.pre_mu),
                                                     nan, 
                                                     g_pre_mu_mt.pre_mu))
mu_mt = mu_mt.select_entries(mu_mt.mu)
mu_mt.show()

Unnamed: 0_level_0,0,1,2,3,4,5
row_idx,mu,mu,mu,mu,mu,mu
int64,float64,float64,float64,float64,float64,float64
0,,0.0122,,,0.0281,
1,,,,,0.0146,
2,,,,,,
3,,0.0247,0.0186,0.0115,0.0114,0.0126
4,0.239,0.193,0.224,0.208,0.213,0.234
5,,,,,,
6,0.351,0.338,0.364,0.346,0.317,0.372
7,0.623,0.656,0.629,0.637,0.673,0.64
8,0.326,0.292,0.308,0.314,0.312,0.322
9,,,,,,


In [20]:
# Create variance matrix based on mu matrix, replacing NaNs with zeros
variance_mt = mu_mt.annotate_entries(variance = hl.if_else(hl.is_nan(mu_mt.mu), 
                                                           hl.float(0.0), 
                                                           (mu_mt.mu * (1.0 - mu_mt.mu))))
variance_mt = variance_mt.select_entries(variance_mt.variance)
variance_mt.show()

Unnamed: 0_level_0,0,1,2,3,4
row_idx,variance,variance,variance,variance,variance
int64,float64,float64,float64,float64,float64
0,0.0,0.012,0.0,0.0,0.0273
1,0.0,0.0,0.0,0.0,0.0144
2,0.0,0.0,0.0,0.0,0.0
3,0.0,0.0241,0.0182,0.0114,0.0113
4,0.182,0.156,0.174,0.165,0.168
5,0.0,0.0,0.0,0.0,0.0
6,0.228,0.224,0.232,0.226,0.216
7,0.235,0.226,0.233,0.231,0.22
8,0.22,0.207,0.213,0.215,0.215
9,0.0,0.0,0.0,0.0,0.0


In [21]:
# Combine entries from g and mu matrices into single MT, for computing centered AF
g_mu_mt = g_mt.annotate_entries(mu = mu_mt[g_mt.row_idx, g_mt.col_idx].mu)
g_mu_mt.show()

Unnamed: 0_level_0,0,0,1,1,2,2
row_idx,g,mu,g,mu,g,mu
int64,float64,float64,float64,float64,float64,float64
0,0.0,,0.0,0.0122,0.0,
1,0.0,,0.0,,0.0,
2,0.0,,0.0,,0.0,
3,,,0.0,0.0247,0.0,0.0186
4,1.0,0.239,1.0,0.193,1.0,0.224
5,0.0,,,,0.0,
6,2.0,0.351,1.0,0.338,1.0,0.364
7,1.0,0.623,1.0,0.656,2.0,0.629
8,0.0,0.326,1.0,0.292,1.0,0.308
9,0.0,,0.0,,0.0,


In [22]:
# Create matrix of centered AFs as entries
centered_af_mt = g_mu_mt.annotate_entries(centered_af = hl.if_else(hl.is_nan(g_mu_mt.mu), 
                                                                   hl.float64(0.0), 
                                                                   (g_mu_mt.g / 2) - g_mu_mt.mu))
centered_af_mt = centered_af_mt.select_entries(centered_af_mt.centered_af)
centered_af_mt.show()

Unnamed: 0_level_0,0,1,2,3
row_idx,centered_af,centered_af,centered_af,centered_af
int64,float64,float64,float64,float64
0,0.0,-0.0122,0.0,0.0
1,0.0,0.0,0.0,0.0
2,0.0,0.0,0.0,0.0
3,0.0,-0.0247,-0.0186,-0.0115
4,0.261,0.307,0.276,-0.208
5,0.0,0.0,0.0,0.0
6,0.649,0.162,0.136,0.154
7,-0.123,-0.156,0.371,-0.637
8,-0.326,0.208,0.192,0.186
9,0.0,0.0,0.0,0.0


Alternatively, to accomplish the same thing with fewer lines of code, just store all entries in a single matrix table:

In [23]:
g_mt = g_bm.to_matrix_table_row_major()
g_mt = g_mt.annotate_entries(g = hl.if_else(bad_gt(g_mt.element), nan, g_mt.element))

pre_mu_mt = mu_bm.to_matrix_table_row_major()
pre_mu_mt = pre_mu_mt.annotate_entries(pre_mu = hl.if_else(bad_mu(pre_mu_mt.element, min_individual_maf), 
                                                           nan, 
                                                           pre_mu_mt.element))

# Use bm_mt to store entries for g, pre_mu, mu, var, and centered_af
bm_mt = g_mt.annotate_entries(pre_mu = pre_mu_mt[g_mt.row_idx, g_mt.col_idx].pre_mu)
bm_mt = bm_mt.annotate_entries(mu=hl.if_else(hl.is_nan(bm_mt.g) | hl.is_nan(bm_mt.pre_mu),
                                             nan,
                                             bm_mt.pre_mu))

bm_mt = bm_mt.annotate_entries(variance=hl.if_else(hl.is_nan(bm_mt.mu),
                                                   hl.float64(0.0),
                                                   (bm_mt.mu * (1.0 - bm_mt.mu))), 
                               centered_af=hl.if_else(hl.is_nan(bm_mt.mu),
                                                      hl.float64(0.0),
                                                      (bm_mt.g / 2) - bm_mt.mu))
bm_mt.show()

2021-04-04 15:41:57 Hail: INFO: wrote matrix with 10879 rows and 284 columns as 3 blocks of size 4096 to /tmp/h3d6QCck0HhS7x3cWymX24
2021-04-04 15:41:57 Hail: INFO: BlockMatrix multiply: writing right input with 11 rows and 10879 cols (3 blocks of size 4096) to temporary file /tmp/blockmatrix-dot-right-dA8P4I9SLLP6yV7jk8P3Bu.bm
2021-04-04 15:41:57 Hail: INFO: wrote matrix with 11 rows and 10879 columns as 3 blocks of size 4096 to /tmp/blockmatrix-dot-right-dA8P4I9SLLP6yV7jk8P3Bu.bm
2021-04-04 15:41:58 Hail: INFO: wrote matrix with 10879 rows and 284 columns as 3 blocks of size 4096 to /tmp/il19MYcuk20d47s17pz9BP


Unnamed: 0_level_0,0,0,0,0,0,0
row_idx,element,g,pre_mu,mu,variance,centered_af
int64,float64,float64,float64,float64,float64,float64
0,0.0,0.0,,,0.0,0.0
1,0.0,0.0,,,0.0,0.0
2,0.0,0.0,,,0.0,0.0
3,0.0369,,,,0.0,0.0
4,1.0,1.0,0.239,0.239,0.182,0.261
5,0.0,0.0,,,0.0,0.0
6,2.0,2.0,0.351,0.351,0.228,0.649
7,1.0,1.0,0.623,0.623,0.235,-0.123
8,0.0,0.0,0.326,0.326,0.22,-0.326
9,0.0,0.0,,,0.0,0.0


Now back to BlockMatrix:

In [24]:
g_bm = BlockMatrix.from_entry_expr(bm_mt.g)
mu_bm = BlockMatrix.from_entry_expr(bm_mt.mu)
variance_bm = BlockMatrix.from_entry_expr(bm_mt.variance)
std_dev_bm = variance_bm.sqrt()
centered_af_bm = BlockMatrix.from_entry_expr(bm_mt.centered_af)

2021-04-04 15:42:00 Hail: INFO: Wrote all 3 blocks of 10879 x 284 matrix with block size 4096.
2021-04-04 15:42:02 Hail: INFO: Wrote all 3 blocks of 10879 x 284 matrix with block size 4096.
2021-04-04 15:42:04 Hail: INFO: Wrote all 3 blocks of 10879 x 284 matrix with block size 4096.
2021-04-04 15:42:05 Hail: INFO: Wrote all 3 blocks of 10879 x 284 matrix with block size 4096.


Now we can compute our estimate of phi, and compare to the existing pc_relate implementation in Hail:

In [25]:
phi_bm = gram(centered_af_bm) / gram(std_dev_bm)

In [26]:
pc_rel.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'i': struct {
        s: str
    } 
    'j': struct {
        s: str
    } 
    'kin': float64 
    'ibd0': float64 
    'ibd1': float64 
    'ibd2': float64 
----------------------------------------
Key: ['i', 'j']
----------------------------------------


In [27]:
pc_rel_2 = phi_bm.entries()
pc_rel_2.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'i': int64 
    'j': int64 
    'entry': float64 
----------------------------------------
Key: ['i', 'j']
----------------------------------------


In [28]:
%%capture output
pc_rel_count = pc_rel.count()

In [29]:
# Without self_kinship count should match pc_relate count
pc_rel_2 = pc_rel_2.filter(pc_rel_2.i == pc_rel_2.j, keep=False)
print(pc_rel_2.count() / 2)
print(pc_rel_count)

40186.0
40186


Re-key `pc_rel_2` results table with sample column keys from call expression, so we can compare to output of current `pc_relate()`:

In [30]:
col_keys = hl.literal(mt.select_cols().key_cols_by().cols().collect(), 
                      dtype=hl.tarray(mt.col_key.dtype))

pc_rel_2 = pc_rel_2.key_by(i=col_keys[hl.int32(pc_rel_2.i)], 
                           j=col_keys[hl.int32(pc_rel_2.j)])
pc_rel_2 = pc_rel_2.rename({'entry' : 'pc_relate_2'})
pc_rel_2.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'i': struct {
        s: str
    } 
    'j': struct {
        s: str
    } 
    'pc_relate_2': float64 
----------------------------------------
Key: ['i', 'j']
----------------------------------------


In [31]:
compare_ht = pc_rel.join(pc_rel_2).select("kin", "pc_relate_2")
compare_ht.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    'i': struct {
        s: str
    } 
    'j': struct {
        s: str
    } 
    'kin': float64 
    'pc_relate_2': float64 
----------------------------------------
Key: ['i', 'j']
----------------------------------------


2021-04-04 15:42:09 Hail: INFO: Table.join: renamed the following fields on the right to avoid name conflicts:
    'i' -> 'i_1'
    'j' -> 'j_1'


And it looks like the estimate from phi agrees with the existing method in Hail:

In [32]:
%%capture output
diffs = (compare_ht.kin - compare_ht.pc_relate_2).collect()

In [33]:
diffs[1:10]

[-7.331080187356065e-13,
 -8.934259582149551e-14,
 5.210864292143347e-13,
 2.1144891393376497e-13,
 1.479523101255431e-12,
 1.635245758246917e-12,
 1.91326121612434e-13,
 -1.50414403154997e-13,
 -5.540932296321799e-13]

In [34]:
abs_diffs = [abs(diff) for diff in diffs]
print(min(abs_diffs))
print(max(abs_diffs))

6.938893903907228e-18
2.967976558965191e-11


In [35]:
n_diff = len(diffs)
mean_sq_diffs = [(diff**2) / n_diff for diff in diffs]
print(min(mean_sq_diffs))
print(max(mean_sq_diffs))

1.198134888012763e-39
2.19202828212981e-26


#### TODO:

Still have to compute the IBD statistics, write the `hl.pc_relate_2()` function, test on other datasets.