In [1]:
import pickle

import dask
import numpy as np
from dask import delayed
from dask.distributed import Client, LocalCluster

from wepredict.plink_reader import Genetic_data_read, get_genotypes
from wepredict.pytorch_regression import pytorch_linear
from wepredict.helpers import *

In [2]:
# Input paths handed to Genetic_data_read (genotypes, LD-block BED, phenotypes).
ld_block_file = '../data/Berisa.EUR.hg19.bed'
pheno_file = '../data/sim_1000G_chr10.txt'
plink_file = '../data/sim_1000G_chr10'

data = Genetic_data_read(plink_file, ld_block_file, pheno_file)

# The same fraction is passed for both split-size arguments.
# NOTE(review): no random seed is set anywhere in view; if
# generate_valid_test_data samples randomly, the split (and every result
# derived from it) will differ between runs -- confirm.
holdout_fraction = 0.1
split_indices = generate_valid_test_data(data.n, holdout_fraction,
                                         holdout_fraction)
train_index, valid_index, test_index = split_indices

In [47]:
# Save the rows of data.fam belonging to each split as '<split>.fam'
# (tab-separated, without the DataFrame index).
# NOTE(review): to_csv writes a header row by default -- confirm that
# whatever reads these .fam files expects one.
names = ('train', 'valid', 'test')
indexes = (train_index, valid_index, test_index)
for split_name, split_rows in zip(names, indexes):
    fam_subset = data.fam.iloc[split_rows, :]
    fam_subset.to_csv(split_name + '.fam', index=False, sep='\t')

In [29]:
# Start a local dask cluster with default settings and connect a client to it.
# Both names are reused by later cells (scaling, shutdown).
cluster = LocalCluster()
client = Client(cluster)

In [30]:
# Display the client summary (scheduler address, dashboard URL, worker totals).
client

0,1
Client  Scheduler: tcp://127.0.0.1:39821  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 8  Memory: 16.61 GB


In [31]:
# Request 6 workers from the cluster (the client summary reported 8 at startup).
cluster.scale(6)

In [32]:
# Build one lazy chain per LD block: read genotypes -> split into
# training/validation samples -> construct the model -> run it.
# Nothing is executed here; `out` only collects Delayed objects that are
# evaluated later by dask.compute().
#
# NOTE(review): get_genotypes receives the literal 22 although the blocks are
# taken from data.groups[10] and the PLINK file is named chr10 -- confirm
# what the first argument means and whether it should be 10.
phenotype_values = data.pheno['V1'].values  # loop-invariant, looked up once
out = []
for ld_block in data.groups[10]:
    mat = delayed(get_genotypes)(22, ld_block, plink_file, data.sub_in)
    sample = delayed(get_samples)(mat, phenotype_values,
                                  train_index, valid_index)
    # Indexing a Delayed is itself lazy, so these keys are resolved on the
    # worker once get_samples has actually run.
    model = delayed(pytorch_linear)(sample['training_x'],
                                    sample['training_y'],
                                    sample['valid_x'],
                                    sample['valid_y'],
                                    mini_batch_size=100, type='c')
    # `model` is already a Delayed, so calling its attribute creates the lazy
    # method call directly; wrapping it in delayed() again adds nothing.
    results = model.run('l1', lamb=0.01, epochs=300, logging_frq=10)
    out.append(results)

In [33]:
# Execute every delayed chain collected in `out`.
# dask.compute returns a tuple with one entry per positional argument, so the
# list of per-block results is res[0], not res itself.
res = dask.compute(out)

In [48]:
# Release the distributed resources. The client is closed first so it is not
# left attached to a scheduler that has already been shut down.
client.close()
cluster.close()

In [35]:
# Observed phenotype values (column 'V1') restricted to the validation rows.
pheno = data.pheno['V1'].values[valid_index]

In [36]:
# Add up the validation-set predictions of all per-block models.
sum_pred = sum(block_result.prediction_valid for block_result in res[0])

In [37]:
# Correlation matrix between the observed validation phenotype and the summed
# block predictions; the off-diagonal entry is the correlation of interest.
np.corrcoef(pheno, sum_pred)

array([[1.        , 0.06774068],
       [0.06774068, 1.        ]])

In [38]:
# Persist the computed results for later analysis.
# `res` is the tuple returned by dask.compute(out), so the pickle contains the
# whole tuple (the per-block result list is its first element).
# `pickle` is imported in the notebook's top import cell.
with open('l1_300epo_1kg_V1.pickle', 'wb') as handle:
    pickle.dump(res, handle)