In [15]:
import hail as hl
import numpy as np
import scipy.stats as stats
import subprocess
import datetime as dt
import pandas as pd

In [2]:
# Root bucket folder for this analysis; GWAS inputs are read from its 'gwas/' sub-folder.
wd = 'gs://nbaya/risk_gradients/'
gwas_wd = f'{wd}gwas/'

In [3]:
# UKB phenotype code -> short name (used for log messages in get_mt).
phen_dict = dict([
    ('50', 'height'),
    ('2443', 'diabetes'),
    ('21001', 'bmi'),
])

# Parallel lists: entry i of n_ls / n_cas_ls belongs to phen_ls[i].
phen_ls, n_ls, n_cas_ls = ['21001'], ['359933'], ['0']

# Multipliers applied to the total sample count, the case count and the
# remaining (non-case) count when deriving the subset sizes.
frac_all_ls, frac_cas_ls, frac_con_ls = [0.2], [1], [1]

# Seed value embedded in the input/output file names.
seed = 1

In [4]:
def get_mt(phen, variant_set):
    """Build the filtered genotype MatrixTable for one UKB phenotype.

    Reads the genotype MatrixTable for `variant_set`, attaches the phenotype
    column for `phen` from the phenotype TSV, removes samples whose phenotype
    string is empty and samples listed in the withdrawn-sample file, and keeps
    only variants whose rsid appears in the pruned .bim file under `wd`.

    Parameters
    ----------
    phen : str
        UKB phenotype code; the column named '"<phen>"' of the phenotype
        table is used. Codes absent from `phen_dict` are accepted (they are
        only labelled 'unknown' in the log message).
    variant_set : str
        Inserted into the genotype MatrixTable path (e.g. 'hm3').

    Returns
    -------
    tuple
        (mt, n, n_cas) where `mt` has rows keyed by 'rsid' and columns keyed
        by 's' with a column field 'phen', `n` is the number of columns after
        sample filtering, and `n_cas` is the number of columns with phen == 1.
    """
    mt0 = hl.read_matrix_table(f'gs://nbaya/split/ukb31063.{variant_set}_variants.gwas_samples_repart.mt')

    # The name is only used for logging, so an unlisted code must not abort the run.
    phen_name = phen_dict.get(phen, 'unknown')
    print(f'\nReading UKB phenotype {phen_name} (code: {phen})...')

    # Column names in the TSV carry literal double quotes, hence the '"..."' keys.
    phen_tb0 = hl.import_table('gs://phenotype_31063/ukb31063.phesant_phenotypes.both_sexes.tsv.bgz',
                               missing='',impute=True,types={'"userId"': hl.tstr}).rename({ '"userId"': 's', '"'+phen+'"': 'phen'})
    phen_tb0 = phen_tb0.key_by('s')
    phen_tb = phen_tb0.select(phen_tb0['phen'])

    # Go through a string representation so stray quote characters can be stripped
    # before converting back to the imputed type.
    mt1 = mt0.annotate_cols(phen_str = hl.str(phen_tb[mt0.s]['phen']).replace('\"',''))
    mt1 = mt1.filter_cols(mt1.phen_str == '',keep=False)

    to_phen = hl.bool if phen_tb.phen.dtype == hl.dtype('bool') else hl.float64
    mt1 = mt1.annotate_cols(phen = to_phen(mt1.phen_str)).drop('phen_str')

    #Remove withdrawn samples
    withdrawn = hl.import_table('gs://nbaya/w31063_20181016.csv',missing='',no_header=True)
    # collect() fetches every ID in one pass (previously: count() followed by take()).
    withdrawn_set = set(withdrawn.f0.collect())
    mt1 = mt1.filter_cols(hl.literal(withdrawn_set).contains(mt1['s']),keep=False)
    mt1 = mt1.key_cols_by('s')

    n = mt1.count_cols()
    n_cas = mt1.filter_cols(mt1.phen == 1).count_cols()

    variants = hl.import_table(wd+'ukb_imp_v3_pruned.bim',delimiter='\t',no_header=True,impute=True)
    variants = variants.rename({'f0':'chr','f1':'rsid','f3':'pos'}).key_by('rsid')

    mt2 = mt1.key_rows_by('rsid')
    mt2 = mt2.filter_rows(hl.is_defined(variants[mt2.rsid])) #filter to variants in variants table

    return mt2, n, n_cas

In [5]:
# Build the filtered MatrixTable for the first configured phenotype using the 'hm3'
# variant set; n and n_cas are the sample and phen == 1 counts returned by get_mt.
mt0, n, n_cas = get_mt(phen_ls[0],'hm3')

Initializing Spark and Hail with default parameters...
Running on Apache Spark version 2.2.3
SparkUI available at http://10.128.0.35:4041
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.12-13681278eb89
LOGGING: writing to /home/hail/hail-20190612-1655-0.2.12-13681278eb89.log



Reading UKB phenotype bmi (code: 21001)...


2019-06-12 16:57:38 Hail: INFO: Reading table to impute column types
2019-06-12 16:58:16 Hail: INFO: Finished type imputation
  Loading column '"userId"' as type 'str' (user-specified)
  Loading column '"46"' as type 'float64' (imputed)
  Loading column '"47"' as type 'float64' (imputed)
  Loading column '"48"' as type 'float64' (imputed)
  Loading column '"49"' as type 'float64' (imputed)
  Loading column '"50"' as type 'float64' (imputed)
  Loading column '"51"' as type 'float64' (imputed)
  Loading column '"78"' as type 'float64' (imputed)
  Loading column '"84"' as type 'float64' (imputed)
  Loading column '"93"' as type 'float64' (imputed)
  Loading column '"94"' as type 'float64' (imputed)
  Loading column '"102"' as type 'float64' (imputed)
  Loading column '"129"' as type 'float64' (imputed)
  Loading column '"130"' as type 'float64' (imputed)
  Loading column '"134"' as type 'str' (imputed)
  Loading column '"135"' as type 'str' (imputed)
  Loading column '"136"' as type 'str'

2019-06-12 16:58:17 Hail: INFO: Reading table with no type imputation
  Loading column 'f0' as type 'str' (type not specified)

2019-06-12 17:01:15 Hail: INFO: Reading table to impute column types
2019-06-12 17:01:15 Hail: INFO: Finished type imputation
  Loading column 'f0' as type 'int32' (imputed)
  Loading column 'f1' as type 'str' (imputed)
  Loading column 'f2' as type 'int32' (imputed)
  Loading column 'f3' as type 'int32' (imputed)
  Loading column 'f4' as type 'str' (imputed)
  Loading column 'f5' as type 'str' (imputed)


In [8]:
# Work with the first entry of every configuration list.
phen, frac_all, frac_cas, frac_con = phen_ls[0], frac_all_ls[0], frac_cas_ls[0], frac_con_ls[0]

# Subset sizes: overall target, cases kept, and final total after scaling the non-case part.
n_new0 = int(frac_all*n)
n_cas_new = int(frac_cas*n_cas)
n_new = int(frac_con*(n_new0-n_cas_new)+n_cas_new)

# File-name suffix; the case counts are only spelled out when either fraction is not 1.
suffix = (
    f'.{phen}.n_{n_new}of{n}.seed_{seed}'
    if frac_con == 1 and frac_cas ==1
    else f'.{phen}.n_{n_new}of{n}.n_cas_{n_cas_new}of{n_cas}.seed_{seed}'
)

In [10]:
# Load the summary statistics for this run, keyed by SNP id.
print('importing table...')
gwas = hl.import_table(
    f'{gwas_wd}ss{suffix}.tsv.bgz',
    force_bgz=True,
    impute=True,
).key_by('SNP')

# Attach each variant's effect size ('eff' column) as the row field `beta`, joining on rsid.
print('annotating with betas...')
mt2 = mt0.annotate_rows(beta = gwas[mt0.rsid].eff)

importing table...


2019-06-12 17:10:35 Hail: INFO: Reading table to impute column types


annotating with betas...


2019-06-12 17:10:36 Hail: INFO: Finished type imputation
  Loading column 'SNP' as type 'str' (imputed)
  Loading column 'A1' as type 'str' (imputed)
  Loading column 'A2' as type 'str' (imputed)
  Loading column 'N' as type 'int32' (imputed)
  Loading column 'ch' as type 'int32' (imputed)
  Loading column 'pos' as type 'int32' (imputed)
  Loading column 'pval' as type 'float64' (imputed)
  Loading column 'maf' as type 'float64' (imputed)
  Loading column 'eff' as type 'float64' (imputed)


In [13]:
# Compute the polygenic score (PGS) per sample and export it alongside the phenotype.
print('calculating PGS...')
print('Time: {:%H:%M:%S (%Y-%b-%d)}'.format(dt.datetime.now()))
start_pgs = dt.datetime.now()
# Column-wise aggregation: for each sample, sum dosage * beta over all variants.
mt3 = mt2.annotate_cols(pgs = hl.agg.sum(mt2.dosage*mt2.beta))
mt3.cols().select('phen','pgs').export(f'{wd}pgs{suffix}.tsv.bgz')
elapsed_pgs = dt.datetime.now()-start_pgs
print('\nFinished calculating PGS')
# total_seconds() covers the whole duration; .seconds would drop any full days.
print(f'Time for calculating PGS: {round(elapsed_pgs.total_seconds()/60, 2)}minutes')

calculating PGS...
Time: 17:11:16 (2019-Jun-12)


2019-06-12 17:11:16 Hail: WARN: cols(): Resulting column table is sorted by 'col_key'.
    To preserve matrix table column order, first unkey columns with 'key_cols_by()'
2019-06-12 17:16:56 Hail: INFO: Ordering unsorted dataset with network shuffle
2019-06-12 17:16:58 Hail: INFO: Ordering unsorted dataset with network shuffle
2019-06-12 17:17:02 Hail: INFO: Coerced sorted dataset
2019-06-12 17:27:25 Hail: INFO: Coerced sorted dataset



Finished calculating PGS
Time for calculating PGS: 16.23minutes


2019-06-12 17:27:30 Hail: INFO: while writing:
    gs://nbaya/risk_gradients/pgs.21001.n_71986of359933.seed_1.tsv.bgz
  merge time: 1.157s


In [16]:
# Correlate PGS with phenotype, over every exported sample and over the samples
# listed in the iid file, then upload the results.
print('calculating R^2 between PGS and phenotype...')
print('Time: {:%H:%M:%S (%Y-%b-%d)}'.format(dt.datetime.now()))
start_r2 = dt.datetime.now()
local_dir = '/home/nbaya/'
# check_call raises CalledProcessError if gsutil fails, so a stale or missing
# local copy is never read silently.
subprocess.check_call(['gsutil','cp',f'{wd}pgs{suffix}.tsv.bgz',local_dir])
subprocess.check_call(['gsutil','cp',f'{gwas_wd}iid{suffix}.tsv.bgz',local_dir])
df = pd.read_csv(f'{local_dir}pgs{suffix}.tsv.bgz',delimiter='\t',compression='gzip')
# stats.pearsonr returns Pearson's r (not r squared) and its p-value.
r_all, pval_all = stats.pearsonr(df.pgs, df.phen)
iid = pd.read_csv(f'{local_dir}iid{suffix}.tsv.bgz',delimiter='\t',compression='gzip')
# Restrict to samples whose ID appears in the iid file.
df1 = df[df.s.isin(iid.iid.tolist())]
r_sub, pval_sub = stats.pearsonr(df1.pgs,df1.phen)
print('\n****************************')
print(f'PGS x phenotype correlation for all {n} individuals')
print(f'r = {r_all}, pval = {pval_all}')
print(f'PGS x phenotype correlation for all {n_new} individuals')
print(f'r = {r_sub}, pval = {pval_sub}')
print('****************************')
# Row 0: all samples; row 1: iid subset. Columns: r, p-value.
array = [[r_all, pval_all],[r_sub, pval_sub]]
np.savetxt(f'{local_dir}corr{suffix}.txt',array,delimiter='\t')
subprocess.check_call(['gsutil','cp',f'{local_dir}corr{suffix}.txt',wd])
elapsed_r2 = dt.datetime.now()-start_r2
print('\nFinished calculating R^2')
# total_seconds() covers the whole duration; .seconds would drop any full days.
print(f'Time for calculating R^2: {round(elapsed_r2.total_seconds()/60, 2)}minutes')

calculating R^2 between PGS and phenotype...
Time: 17:29:47 (2019-Jun-12)

****************************
PGS x phenotype correlation for all 359933 individuals
r = 0.3049085094344871, pval = 0.0
PGS x phenotype correlation for all 71986 individuals
r = 0.717609084761418, pval = 0.0
****************************

Finished calculating R^2
Time for calculating R^2: 0.05minutes


In [7]:
# `variants` is otherwise only created inside get_mt, so load it here to keep this
# cell runnable on a fresh kernel.
variants = hl.import_table(wd+'ukb_imp_v3_pruned.bim',delimiter='\t',no_header=True,impute=True)
variants = variants.rename({'f0':'chr','f1':'rsid','f3':'pos'}).key_by('rsid')

# Key rows by rsid and keep only variants present in the pruned variant table.
mt1 = mt0.key_rows_by('rsid')
mt1 = mt1.filter_rows(hl.is_defined(variants[mt1.rsid])) #filter to variants in variants table

In [8]:
# Number of variants (rows) remaining after restricting to the pruned variant table.
mt1.count_rows()

2019-06-11 18:42:48 Hail: INFO: Ordering unsorted dataset with network shuffle
2019-06-11 18:42:49 Hail: INFO: Ordering unsorted dataset with network shuffle


101651

In [9]:
# Preview the rsid row key of the filtered MatrixTable.
mt1.rsid.show()

2019-06-11 18:45:32 Hail: INFO: Ordering unsorted dataset with network shuffle
2019-06-11 18:45:33 Hail: INFO: Ordering unsorted dataset with network shuffle


+--------------+
| rsid         |
+--------------+
| str          |
+--------------+
| "rs10000300" |
| "rs10000404" |
| "rs10000488" |
| "rs1000053"  |
| "rs1000058"  |
| "rs10000609" |
| "rs10000747" |
| "rs10000959" |
| "rs10001148" |
| "rs1000122"  |
+--------------+
showing top 10 rows



In [10]:
# For every phenotype / fraction combination: load the matching GWAS summary
# statistics, score each sample, and export the PGS column.
for i, phen in enumerate(phen_ls):
    n, n_cas = int(n_ls[i]), int(n_cas_ls[i])
    for frac_all in frac_all_ls:
        for frac_cas in frac_cas_ls:
            for frac_con in frac_con_ls:
                n_new = int(frac_all*n)
                n_cas_new = int(frac_cas*n_cas)
                # Case counts appear in the file name only when a fraction differs from 1.
                gwas_path = (
                    f'{gwas_wd}ss.{phen}.n_{n_new}of{n}.seed_{seed}.tsv.bgz'
                    if frac_con == 1 and frac_cas ==1
                    else f'{gwas_wd}ss.{phen}.n_{n_new}of{n}.n_cas_{n_cas_new}of{n_cas}.seed_{seed}.tsv.bgz'
                )
                print('importing table...')
                gwas = hl.import_table(gwas_path,force_bgz=True,impute=True).key_by('SNP')

                print('annotating with betas...')
                mt2 = mt1.annotate_rows(beta = gwas[mt1.rsid].eff)
                print('calculating dot product of genotypes with betas...')
                # Per-sample sum over variants of dosage * beta.
                mt3 = mt2.annotate_cols(pgs = hl.agg.sum(mt2.dosage*mt2.beta))
                pgs_path = (
                    f'{wd}pgs.{phen}.n_{n_new}of{n}.seed_{seed}.tsv.bgz'
                    if frac_con == 1 and frac_cas ==1
                    else f'{wd}pgs.{phen}.n_{n_new}of{n}.n_cas_{n_cas_new}of{n_cas}.seed_{seed}.tsv.bgz'
                )
                mt3.cols().select('pgs').export(pgs_path)

importing table...


2019-06-11 18:48:16 Hail: INFO: Reading table to impute column types
2019-06-11 18:48:18 Hail: INFO: Finished type imputation
  Loading column 'SNP' as type 'str' (imputed)
  Loading column 'A1' as type 'str' (imputed)
  Loading column 'A2' as type 'str' (imputed)
  Loading column 'N' as type 'int32' (imputed)
  Loading column 'ch' as type 'int32' (imputed)
  Loading column 'pos' as type 'int32' (imputed)
  Loading column 'pval' as type 'float64' (imputed)
  Loading column 'maf' as type 'float64' (imputed)
  Loading column 'eff' as type 'float64' (imputed)
2019-06-11 18:48:18 Hail: WARN: cols(): Resulting column table is sorted by 'col_key'.
    To preserve matrix table column order, first unkey columns with 'key_cols_by()'


annotating with betas...
calculating dot product of genotypes with betas...


2019-06-11 18:50:50 Hail: INFO: Ordering unsorted dataset with network shuffle
2019-06-11 18:50:51 Hail: INFO: Ordering unsorted dataset with network shuffle
2019-06-11 18:50:53 Hail: INFO: Coerced sorted dataset
2019-06-11 19:09:06 Hail: INFO: Coerced sorted dataset
2019-06-11 19:09:11 Hail: INFO: while writing:
    gs://nbaya/risk_gradients/pgs.50.n_72067of360338.seed_1.tsv.bgz
  merge time: 794.552ms


In [22]:
# Re-import the exported PGS table; the sample ID column is forced to string
# and used as the key.
pgs = hl.import_table(
    'gs://nbaya/risk_gradients/pgs.50.n_72067of360338.seed_1.tsv.bgz',
    force_bgz=True,
    impute=True,
    types={'s':hl.tstr},
).key_by('s')

2019-06-11 19:17:34 Hail: INFO: Reading table to impute column types
2019-06-11 19:17:34 Hail: INFO: Finished type imputation
  Loading column 's' as type 'str' (user-specified)
  Loading column 'pgs' as type 'float64' (imputed)


In [23]:
# Join the imported PGS onto the samples by ID, then pull PGS and phenotype
# (plus the column key) into a pandas DataFrame.
mt4 = mt0.annotate_cols(pgs = pgs[mt0.s].pgs)
df = (
    mt4.cols()
       .select('pgs','phen')
       .to_pandas()
)

2019-06-11 19:19:14 Hail: INFO: Coerced sorted dataset


In [60]:
# Pearson correlation (r, p-value) between PGS and phenotype over every row of df.
r_all, pval_all = stats.pearsonr(df.pgs,df.phen)
print(f'{r_all}, {pval_all}')

0.2886767548030616, 0.0


In [63]:
# Name of the correlation file; case counts appear only when a fraction differs from 1.
r_path = (
    f'corr.{phen}.n_{n_new}of{n}.seed_{seed}.txt'
    if frac_con == 1 and frac_cas ==1
    else f'corr.{phen}.n_{n_new}of{n}.n_cas_{n_cas_new}of{n_cas}.seed_{seed}.txt'
)
# Row 0: all samples; row 1: subset. Columns: r, p-value.
array = [[r_all, pval_all],[r_sub, pval_sub]]
# Write locally first, then copy into the bucket with gsutil.
np.savetxt(f'/home/nbaya/{r_path}',array,delimiter='\t')
subprocess.call(['gsutil','cp',f'/home/nbaya/{r_path}',wd])

0

In [50]:
# Build the iid file name with the same suffix rule as the other run files:
# case counts are included only when either fraction differs from 1.
if frac_con == 1 and frac_cas ==1:
    iid_path = gwas_wd+f'iid.{phen}.n_{n_new}of{n}.seed_{seed}.tsv.bgz'
else:
    iid_path = gwas_wd+f'iid.{phen}.n_{n_new}of{n}.n_cas_{n_cas_new}of{n_cas}.seed_{seed}.tsv.bgz'
# Sample IDs for this run, renamed to 's' and keyed so they can be joined on the column key.
iid = hl.import_table(iid_path,force_bgz=True)
iid = iid.rename({'iid':'s'}).key_by('s')

2019-06-11 19:37:07 Hail: INFO: Reading table with no type imputation
  Loading column 'fid' as type 'str' (type not specified)
  Loading column 'iid' as type 'str' (type not specified)



In [51]:
# Keep only the samples whose ID is present in the iid table.
mt5 = mt4.filter_cols(hl.is_defined(iid[mt4.s]))

In [52]:
# Number of samples retained after restricting to the iid table.
mt5.count_cols()

72067

In [19]:
# Number of variants (rows) in the beta-annotated MatrixTable.
mt2.count_rows()

2019-06-11 17:44:50 Hail: INFO: Ordering unsorted dataset with network shuffle
2019-06-11 17:44:50 Hail: INFO: Ordering unsorted dataset with network shuffle


101651

In [53]:
# Collect every sample ID locally; collect() has no row cap, unlike take(int(1e6)),
# which would silently truncate a longer table.
ids = iid.s.collect()

2019-06-11 19:40:27 Hail: INFO: Coerced sorted dataset


In [57]:
# Subset the pandas frame to rows whose sample ID is in `ids`.
df1 = df[df.s.isin(ids)]

In [61]:
# Pearson correlation (r, p-value) between PGS and phenotype within the df1 subset.
r_sub, pval_sub = stats.pearsonr(df1.pgs,df1.phen)


In [59]:
# Display the subset correlation. The bare name `r` is not defined anywhere in this
# notebook and raises NameError on a fresh kernel.
# NOTE(review): assumes `r` was the subset correlation now stored in r_sub — confirm.
r_sub

0.527970009287676

In [20]:
# Per-sample PGS: aggregate dosage * beta over all variants for each column.
mt3 = mt2.annotate_cols(pgs = hl.agg.sum(mt2.dosage*mt2.beta))

In [21]:
# Preview the per-sample PGS values (the recorded run of this cell was interrupted).
mt3.pgs.show()

KeyboardInterrupt: 

In [None]:
# Histogram of the per-sample PGS values.
hl.plot.histogram(mt3.pgs)