__Required files:__
* ##### Part 1
    `gs://phenotype_31063/hail/imputed/ukb31063.dosage.autosomes.mt`

    `gs://nbaya/split/hapmap3_variants.tsv`

    `gs://nbaya/split/hapmap3_variants_v2.tsv`

    `gs://phenotype_31063/ukb31063.gwas_covariates.both_sexes.tsv`

* ##### Part 2
    `gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_v1.mt` **(Deprecated)**
    `gs://nbaya/split/ukb31063.hm3_variants.gwas_samples.mt` (Current)
* ##### Part 3
    `gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_v2.mt` **(Deprecated)**
    `gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_repart.mt` (Current)
    `gs://nbaya/split/meta_split/ukb31063.hm3_variants.gwas_samples_+phen+desc+.tsv`
* ##### Part 4
    `gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_+phen+_grouped_v1.mt`
* ##### Part 5
    `gs://nbaya/split/meta_split/ukb31063.hm3_+phen+_gmt.mt`

<br>

__Final output:__ 
`gs://nbaya/split/meta_split/ukb31063.hm3_+phen+_meta_split_final_s2.tsv.bgz`

__Note:__
The latest version of this is now a Python script submitted as a job to the cloud. When using this notebook, make sure to check code with the latest version to ensure that everything matches (most importantly, file names).

### Import packages, set phenotype 

In [4]:
import hail as hl
import numpy as np
import pandas as pd
phen = '50'
desc = 'height'
batch = '1'

# Part 1
Create filtered matrix table annotated with locus and alleles (Done)

In [2]:
mt = hl.read_matrix_table('gs://phenotype_31063/hail/imputed/ukb31063.dosage.autosomes.mt')


Initializing Spark and Hail with default parameters...
Running on Apache Spark version 2.2.1
SparkUI available at http://10.128.0.7:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2-e60bdb1a125a
LOGGING: writing to /home/hail/hail-20181119-2216-0.2-e60bdb1a125a.log


In [3]:
hm3 = hl.import_table('gs://nbaya/split/hapmap3_variants.tsv')
hm3 = hm3.annotate(**hl.parse_variant(hm3.v))
hm3 = hm3.key_by('locus','alleles')

2018-11-19 22:16:58 Hail: INFO: Reading table with no type imputation
  Loading column 'v' as type 'str' (type not specified)



In [None]:
hm3.write('gs://nbaya/split/hapmap3_variants_v2.tsv')

Filter to HapMap 3 variants

In [4]:
hm3.count()

1089172

In [5]:
mt = mt.filter_rows(hl.is_defined(hm3[mt.locus,mt.alleles])) 

In [11]:
mt.head(100).write('gs://nbaya/split/ukb31063.dosage.autosomes.hm3_variants.head100.mt')

2018-11-19 22:37:56 Hail: INFO: wrote 100 items in 7 partitions to gs://nbaya/split/ukb31063.dosage.autosomes.hm3_variants.head100.mt


In [None]:
#mt.write('gs://nbaya/split/ukb31063.dosage.autosomes.hm3_variants.mt')

In [None]:
ht_samples = hl.import_table('gs://phenotype_31063/ukb31063.gwas_covariates.both_sexes.tsv',
                             key='s', impute=True, types={'s': hl.tstr})


Annotate with covariates

In [None]:
mt = mt.annotate_cols(**ht_samples[mt.s])
mt = mt.filter_cols(hl.is_defined(mt.PC1), keep=True) #Only keeps columns with PC1 defined

In [None]:
# mt.write('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_v1.mt', overwrite=True) #Deprecated
mt.write('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_prerepart.mt', overwrite=True)

# Part 2
Repartition (Done)
#### Note: Task requires shuffle reading and writing. Do not use preemptible workers.

In [None]:
# mt = hl.read_matrix_table('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_v1.mt') #Deprecated
mt = hl.read_matrix_table('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_prerepart.mt')


mt = mt.repartition(1000)

# mt.write('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_v2.mt', overwrite=True) #Deprecated
mt.write('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_repart.mt', overwrite=True)


# Part 3
Annotate with phenotype and label with group IDs (Done)

In [None]:
# mt0 = hl.read_matrix_table('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_v2.mt') #Deprecated
mt0 = hl.read_matrix_table('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_repart.mt')
mt0.describe()
mt0.count()


In [None]:
phen_tb = hl.import_table('gs://nbaya/split/phen_tables/ukb31063.hm3_variants.gwas_samples_'+phen+'_'+desc+'.tsv', impute=True, types={'s': hl.tstr})

phen_tb.describe()



In [None]:
mt1 = mt0.annotate_cols(phen = phen_tb.key_by('s')[mt0.s]['sa.'+desc])
mt1.describe()

In [None]:
mt2 = mt1.add_col_index()
n = 300 #number of subgroups
group_size = int(mt2.count_cols()/n)+1
#list of group ids to be paired to each sample (Note: length of group_ids > # of cols in mt, but it doesn't affect the result)
group_ids = np.ndarray.tolist(np.ndarray.flatten(np.asarray([range(300)]*group_size))) 
np.random.shuffle(group_ids)
mt3 = mt2.annotate_cols(group_id = hl.literal(group_ids)[hl.int32(mt2.col_idx)]) #assign group ids
mt3.cols().show()
mt3.describe()


In [None]:
# print(mt3.aggregate_cols(hl.agg.counter(mt3.group_id))) #determine the evenness of the split

In [None]:
# Takes about 20 min with 100 workers, 0 preemptibles
# Requires shuffle reading and writing
mt3.write('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_'+phen+'_grouped_v1.mt', overwrite=True)


# Part 4
Run linear regression (Done)

In [None]:
mt = hl.read_matrix_table('gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_'+phen+'_grouped_v1.mt')


In [None]:
mt = mt.rename({'dosage': 'x', 'phen': 'y'})

cov_list = [ mt['isFemale'], mt['age'], mt['age_squared'], mt['age_isFemale'],
            mt['age_squared_isFemale'] ]+ [mt['PC{:}'.format(i)] for i in range(1, 21)] 

In [None]:
gmt = (mt.group_cols_by(mt.group_id)
        .aggregate(linreg = hl.agg.linreg(y=mt.y, x = [mt.x, 1] + cov_list)))

In [None]:
# Takes about 1.7 hours (with 10 workers + 200 pre-emptible workers)
# First and only stage should have 1000 tasks
gmt.select_entries(beta=gmt.linreg.beta[0],standard_error=gmt.linreg.standard_error[0],
                  beta_int=gmt.linreg.beta[1],standard_error_int=gmt.linreg.standard_error[1]).write('gs://nbaya/split/meta_split/ukb31063.hm3_'+phen+'_gmt_batch_'+batch+'.mt')

# Part 5
Split 300 subgroups into two groups

In [6]:
gmt = hl.read_matrix_table(f'gs://nbaya/split/meta_split/ukb31063.hm3_{phen}_gmt_batch_{batch}.mt')

gmt = gmt.add_row_index()
gmt = gmt.key_rows_by('row_idx')
# # gmt = gmt.drop('locus','alleles','varid')
# # gmt = gmt.rename({'rsid': 'SNP'})

gmt = gmt.rename({'rsid': 'SNP'})
gmt = gmt.drop('locus','alleles','varid')
gmt = gmt.key_cols_by('group_id')
gmt = gmt.add_col_index()

Initializing Spark and Hail with default parameters...
Running on Apache Spark version 2.2.3
SparkUI available at http://10.128.0.148:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.12-13681278eb89
LOGGING: writing to /home/hail/hail-20190412-1524-0.2.12-13681278eb89.log


In [10]:
# mt = hl.read_matrix_table(f'gs://nbaya/split/ukb31063.hm3_variants.gwas_samples_{phen}_grouped_batch_{batch}.mt')
# mt.describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    's': str
    'isFemale': bool
    'age': int32
    'age_squared': int32
    'age_isFemale': int32
    'age_squared_isFemale': int32
    'PC1': float64
    'PC2': float64
    'PC3': float64
    'PC4': float64
    'PC5': float64
    'PC6': float64
    'PC7': float64
    'PC8': float64
    'PC9': float64
    'PC10': float64
    'PC11': float64
    'PC12': float64
    'PC13': float64
    'PC14': float64
    'PC15': float64
    'PC16': float64
    'PC17': float64
    'PC18': float64
    'PC19': float64
    'PC20': float64
    'phen_str': str
    'phen': float64
    'col_idx': int64
    'group_id': int32
----------------------------------------
Row fields:
    'locus': locus<GRCh37>
    'alleles': array<str>
    'rsid': str
    'varid': str
----------------------------------------
Entry fields:
    'dosage': float64
----------------------------------------
Column key: 

In [63]:
n = 300 #number of subgroups
groups = list(range(n))
randstate = np.random.RandomState(int(batch)) #seed with batch number
randstate.shuffle(groups)

In [33]:
# for subset_i in range(1):  #restricted range (< n) to start with
#     subset_i += 1
#     subset = hl.literal(set(groups[0:subset_i])) #set of group ids to use in sample
#     gmt_subset = gmt.filter_cols(subset.contains(gmt['group_id']))
#     gmt_subset['group_id'].show()
#     mt = gmt_subset.aggregate_cols(
#         hl.struct(unnorm_meta_beta=hl.agg.sum(gmt_subset.beta / gmt_subset.standard_error ** 2),
#                               inv_se2 = hl.agg.sum(1 / gmt_subset.standard_error ** 2)))
#     mt.describe()

+----------+
| group_id |
+----------+
|    int32 |
+----------+
|      189 |
+----------+



2018-09-28 13:26:06 Hail: ERROR: scope violation: 'MatrixTable.aggregate_cols' supports aggregation over indices ['column']
    Found indices ['row', 'column'], with unexpected indices ['row']. Invalid fields:
        'standard_error' (indices ['row', 'column'])
2018-09-28 13:26:06 Hail: ERROR: scope violation: 'MatrixTable.aggregate_cols' supports aggregation over indices ['column']
    Found indices ['row', 'column'], with unexpected indices ['row']. Invalid fields:
        'beta' (indices ['row', 'column'])
        'standard_error' (indices ['row', 'column'])


ExpressionException: scope violation: 'MatrixTable.aggregate_cols' supports aggregation over indices ['column']
    Found indices ['row', 'column'], with unexpected indices ['row']. Invalid fields:
        'standard_error' (indices ['row', 'column'])

In [68]:
# for subset_i in range(1,2):  #restricted range (< n) to start with
#     subset_i += 1
subset_i = 1
subset = hl.literal(set(groups[0:subset_i])) #set of group ids to use in sample
gmt_subset = gmt.filter_cols(subset.contains(gmt['group_id']))
mt = gmt_lab.aggregate_entries(
    hl.struct(
    unnorm_meta_beta=hl.agg.sum(gmt_lab.beta / gmt_lab.standard_error ** 2),
    inv_se2 = hl.agg.sum(1 / gmt_lab.standard_error ** 2),
    group_id = hl.agg.sum(gmt_lab.group_id)))
# mt = mt.annotate()
# ht = ht.annotate(A_Z = ht['A.unnorm_meta_beta'] / hl.sqrt(ht['A.inv_se2']))
# ht = ht.drop('A.unnorm_meta_beta','A.inv_se2').key_by('SNP')

# variants = hl.import_table('gs://nbaya/rg_sex/'+phen+'_snps_alleles_N.tsv.gz',types={'N': hl.tint64})
# variants = variants.key_by('SNP')

# metaA = variants.annotate(Z = ht[variants.SNP].A_Z)

# metaA_path = 'gs://nbaya/rg_sex/'+phen+'_dnsample_A_batch_'+batch+'_set'+str(i)+'.tsv.bgz'
# metaA.export(metaA_path)

                              
"""
for subset_i in range(1,2):  #restricted range (< n) to start with
    subset_i += 1
    subset = hl.literal(set(groups[0:subset_i])) #set of group ids to use in sample
    gmt_subset = gmt.filter_cols(subset.contains(gmt['group_id']))
#     pi[groups[0:subset_i]] = ['A']*(subset_i) #label the groups that belong to the subset
    pi = np.array(['A']*n).tolist()
    gmt_lab = gmt_subset.annotate_cols(label = hl.literal(pi)[hl.int32(gmt_subset.col_idx)])
    ht = gmt_lab.group_cols_by(gmt_lab.label).aggregate(
            unnorm_meta_beta=hl.agg.sum(gmt_lab.beta / gmt_lab.standard_error ** 2),
            inv_se2 = hl.agg.sum(1 / gmt_lab.standard_error ** 2),
        group_id = hl.agg.sum(gmt_lab.group_id)).make_table()
    ht = ht.annotate(A_Z = ht['A.unnorm_meta_beta'] / hl.sqrt(ht['A.inv_se2']))
    ht = ht.drop('A.unnorm_meta_beta','A.inv_se2').key_by('SNP')
    
    variants = hl.import_table('gs://nbaya/rg_sex/'+phen+'_snps_alleles_N.tsv.gz',types={'N': hl.tint64})
    variants = variants.key_by('SNP')
    
    metaA = variants.annotate(Z = ht[variants.SNP].A_Z
                              
    metaA_path = 'gs://nbaya/rg_sex/'+phen+'_dnsample_A_batch_'+batch+'_set'+str(i)+'.tsv.bgz'
    metaA.export(metaA_path)
"""

AttributeError: Struct instance has no field, method, or property 'result'

In [67]:
mt.describe()

AttributeError: Struct instance has no field, method, or property 'describe'

In [49]:
gmt_lab.group_cols_by(gmt_lab.label).aggregate(
            unnorm_meta_beta=hl.agg.sum(gmt_lab.beta / gmt_lab.standard_error ** 2),
            inv_se2 = hl.agg.sum(1 / gmt_lab.standard_error ** 2),
        group_id = hl.agg.sum(gmt_lab.group_id)).describe()

----------------------------------------
Global fields:
    None
----------------------------------------
Column fields:
    'label': str 
----------------------------------------
Row fields:
    'SNP': str 
    'row_idx': int64 
----------------------------------------
Entry fields:
    'unnorm_meta_beta': float64 
    'inv_se2': float64 
    'group_id': int64 
----------------------------------------
Column key: ['label']
Row key: ['row_idx']
----------------------------------------


In [4]:
n = 300 #number of subgroups
pi = ['A']*int(n/2) + ['B']*int(n/2)

In [5]:
np.random.shuffle(pi)
gmt = gmt.add_col_index()
gmt = gmt.annotate_cols(label = hl.literal(pi)[hl.int32(gmt.col_idx)])


In [6]:
# print(gmt.aggregate_cols(hl.agg.counter(gmt.label))) #determine the evenness of the split
mt = gmt.group_cols_by(gmt.label).aggregate(unnorm_meta_beta=hl.agg.sum(gmt.beta / gmt.standard_error ** 2), inv_se2 = hl.agg.sum(1 / gmt.standard_error ** 2))


In [None]:
#Make table of just the rsid column (already complete)
# rsid_mt = mt.drop( 'unnorm_meta_beta', 'inv_se2')
# rsid_tb = rsids_mt.make_table()
# rsid_tb.show()
# rsid_tb.export('gs://nbaya/split/meta_split/meta_split_rsid.tsv.bgz')


In [7]:
ht = mt.make_table()
# ht = ht.annotate(A_meta_beta = ht['A.unnorm_meta_beta'] / ht['A.inv_se2'],
#                    A_meta_se = hl.sqrt(1 / ht['A.inv_se2']), 
#                    B_meta_beta = ht['B.unnorm_meta_beta'] / ht['B.inv_se2'], 
#                    B_meta_se = hl.sqrt(1 / ht['B.inv_se2']))
ht = ht.annotate(A_Z = ht['A.unnorm_meta_beta'] / hl.sqrt(ht['A.inv_se2']),
                     B_Z = ht['B.unnorm_meta_beta'] / hl.sqrt(ht['B.inv_se2']))
ht = ht.drop('A.unnorm_meta_beta','B.unnorm_meta_beta','A.inv_se2','B.inv_se2').key_by('SNP')
ht.show()

2018-09-19 20:25:35 Hail: INFO: Ordering unsorted dataset with network shuffle


+------------+---------+--------------+--------------+
| SNP        | row_idx |          A_Z |          B_Z |
+------------+---------+--------------+--------------+
| str        |   int64 |      float64 |      float64 |
+------------+---------+--------------+--------------+
| rs1000000  |  798306 | -3.02286e-01 | -2.54503e-01 |
| rs10000010 |  267854 |  2.17979e-01 |  2.65505e+00 |
| rs1000002  |  252333 |  4.16166e-01 |  1.09169e+00 |
| rs10000023 |  292853 | -1.02628e+00 |  8.07921e-01 |
| rs1000003  |  220949 | -1.42453e+00 | -1.24809e+00 |
| rs10000033 |  307480 |  8.85953e-01 | -1.52904e+00 |
| rs10000037 |  274436 |  3.07317e+00 |  2.70448e+00 |
| rs10000041 |  316329 |  7.90511e-02 | -5.05603e-02 |
| rs1000007  |  179994 |  8.67723e-01 | -3.86379e-01 |
| rs10000075 |  321240 |  1.24276e+00 |  2.04676e+00 |
+------------+---------+--------------+--------------+
showing top 10 rows



In [8]:
variants = hl.import_table('gs://nbaya/rg_sex/'+phen+'_snps_alleles_N.tsv.gz',types={'N': hl.tint64})
variants = variants.key_by('SNP')

2018-09-19 20:25:41 Hail: INFO: Reading table with no type imputation
  Loading column 'SNP' as type 'str' (type not specified)
  Loading column 'A1' as type 'str' (type not specified)
  Loading column 'A2' as type 'str' (type not specified)
  Loading column 'N' as type 'int64' (user-specified)



In [9]:
metaA = variants.annotate(Z = ht[variants.SNP].A_Z)
metaB = variants.annotate(Z = ht[variants.SNP].B_Z)

In [10]:
metaB.show()

2018-09-19 20:25:52 Hail: INFO: Coerced sorted dataset
2018-09-19 20:25:54 Hail: INFO: Ordering unsorted dataset with network shuffle


+------------+-----+-----+--------+--------------+
| SNP        | A1  | A2  |      N |            Z |
+------------+-----+-----+--------+--------------+
| str        | str | str |  int64 |      float64 |
+------------+-----+-----+--------+--------------+
| rs1000000  | A   | G   | 168049 | -2.54503e-01 |
| rs10000010 | C   | T   | 168049 |  2.65505e+00 |
| rs1000002  | T   | C   | 168049 |  1.09169e+00 |
| rs10000023 | T   | G   | 168049 |  8.07921e-01 |
| rs1000003  | G   | A   | 168049 | -1.24809e+00 |
| rs10000033 | C   | T   | 168049 | -1.52904e+00 |
| rs10000037 | A   | G   | 168049 |  2.70448e+00 |
| rs10000041 | T   | G   | 168049 | -5.05603e-02 |
| rs1000007  | C   | T   | 168049 | -3.86379e-01 |
| rs10000075 | T   | C   | 168049 |  2.04676e+00 |
+------------+-----+-----+--------+--------------+
showing top 10 rows



In [11]:
metaA.aggregate(hl.agg.mean(metaA.Z))

2018-09-19 20:26:02 Hail: INFO: Coerced sorted dataset
2018-09-19 20:26:04 Hail: INFO: Ordering unsorted dataset with network shuffle


0.002286701004582796

In [12]:
metaB.aggregate(hl.agg.mean(metaB.Z))

2018-09-19 20:27:04 Hail: INFO: Coerced sorted dataset
2018-09-19 20:27:09 Hail: INFO: Ordering unsorted dataset with network shuffle


0.0067514271940972755

In [None]:
metaA.export('gs://nbaya/rg_sex/'+phen+'_meta_A_s1.tsv.bgz')
metaB.export('gs://nbaya/rg_sex/'+phen+'_meta_B_s1.tsv.bgz')

In [None]:
ht.export('gs://nbaya/split/meta_split/ukb31063.hm3_'+phen+'_metasplit_final_s3.tsv.bgz')

NameError: name 'phen' is not defined

In [22]:
phen_tb_all = hl.import_table('gs://phenotype_31063/ukb31063.phesant_phenotypes.both_sexes.tsv.bgz', missing = "")


2018-10-02 14:09:52 Hail: INFO: Reading table with no type imputation
  Loading column '"userId"' as type 'str' (type not specified)
  Loading column '"46"' as type 'str' (type not specified)
  Loading column '"47"' as type 'str' (type not specified)
  Loading column '"48"' as type 'str' (type not specified)
  Loading column '"49"' as type 'str' (type not specified)
  Loading column '"50"' as type 'str' (type not specified)
  Loading column '"51"' as type 'str' (type not specified)
  Loading column '"78"' as type 'str' (type not specified)
  Loading column '"84"' as type 'str' (type not specified)
  Loading column '"93"' as type 'str' (type not specified)
  Loading column '"94"' as type 'str' (type not specified)
  Loading column '"102"' as type 'str' (type not specified)
  Loading column '"129"' as type 'str' (type not specified)
  Loading column '"130"' as type 'str' (type not specified)
  Loading column '"134"' as type 'str' (type not specified)
  Loading column '"135"' as type 'str

In [34]:
phen = '20160'
desc = 'smoking'
phen_tb = phen_tb_all.select(phen_tb_all['"userId"'], phen_tb_all['"'+phen+'"'])
phen_tb = phen_tb.rename({ '"userId"': 's', '"'+phen+'"': desc})
phen_tb = phen_tb.key_by('s')
phen_tb.describe()
phen_tb.count()

----------------------------------------
Global fields:
    None
----------------------------------------
Row fields:
    's': str 
    'smoking': str 
----------------------------------------
Key: ['s']
----------------------------------------


361194

In [36]:
phen_tb = phen_tb.filter(phen_tb[desc] == "",keep=False)
phen_tb.count()

2018-10-02 14:15:52 Hail: INFO: Coerced sorted dataset


359751