<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Merge-larger-datasets" data-toc-modified-id="Merge-larger-datasets-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Merge larger datasets</a></span></li><li><span><a href="#scratch" data-toc-modified-id="scratch-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>scratch</a></span></li></ul></div>

This notebook should only be run if some of the RNAseq runs failed.  

In [1]:
import glob
import biom
import arviz as az
import numpy as np
import xarray as xr
from arviz.utils import Dask
from dask.distributed import Client
import dask
# Switch on ArviZ's Dask support; `dask_kwargs` is the option dict ArviZ is
# given for its Dask-backed computations.
Dask.enable_dask(dask_kwargs={"dask": "parallelized", "output_dtypes": [float]})
# Start a dask.distributed client with 60 workers x 2 threads each.
# NOTE(review): dask documents `memory_limit` as a per-worker limit -- confirm
# that "500GB" per worker (rather than in total) is what is intended here.
client = Client(threads_per_worker=2, n_workers=60, memory_limit="500GB")

Perhaps you already have a cluster running?
Hosting the HTTP server on port 33533 instead


In [32]:
def merge_inferences(inf_list, log_likelihood, posterior_predictive,
                     coords, concatenation_name='features',
                     sample_name='samples'):
    """Merge a list of inference objects into a single ``az.InferenceData``.

    For each requested group the per-object datasets are concatenated along
    ``concatenation_name``, computed through dask, labelled with the
    coordinates from ``coords`` and finally recombined with ``az.concat``.

    Parameters
    ----------
    inf_list : sequence
        Objects exposing ``posterior`` and ``sample_stats`` attributes, plus
        ``log_likelihood`` / ``posterior_predictive`` when those are requested.
    log_likelihood : object or None
        The ``log_likelihood`` group is merged unless this is ``None``.
        Only the ``None`` check is performed; the value itself is not read.
    posterior_predictive : object or None
        The ``posterior_predictive`` group is merged unless this is ``None``.
        Only the ``None`` check is performed; the value itself is not read.
    coords : mapping
        Must contain ``coords[concatenation_name]`` and ``coords[sample_name]``;
        both are assigned as coordinates on every merged group.
    concatenation_name : str
        Dimension along which the datasets are concatenated.
    sample_name : str
        Name given to the per-sample dimension of the optional groups.

    Returns
    -------
    The result of ``az.concat`` over one ``az.InferenceData`` per group.
    """
    # Group name -> auto-generated per-sample dimension to rename (None when
    # the group has no such dimension).  Keying by group name instead of by
    # list position keeps the lookup correct when only one of the optional
    # groups is requested.
    rename_by_group = {"posterior": None, "sample_stats": None}
    if log_likelihood is not None:
        rename_by_group["log_likelihood"] = 'log_lhood_dim_0'
    if posterior_predictive is not None:
        rename_by_group["posterior_predictive"] = 'y_predict_dim_0'

    # Read every requested group from every object up front so that a missing
    # group fails before any concatenation work is started.
    datasets_by_group = {
        group: [getattr(x, group) for x in inf_list]
        for group in rename_by_group
    }

    all_group_inferences = []
    for group, datasets in datasets_by_group.items():
        group_ds = dask.compute(xr.concat(datasets, concatenation_name))[0]

        # Rename only when the auto-generated dimension is actually present,
        # so inputs whose dimension was already renamed can be merged again.
        old_dim = rename_by_group[group]
        if old_dim is not None and old_dim in group_ds.dims:
            group_ds = group_ds.rename_dims({old_dim: sample_name})

        # Set concatenation and sample dim coords
        group_ds = group_ds.assign_coords(
            {concatenation_name: coords[concatenation_name],
             sample_name: coords[sample_name]}
        )
        # InferenceData takes groups as keyword arguments, hence the dict
        # unpacking with a dynamic group name.
        group_inf = az.InferenceData(**{group: group_ds})
        all_group_inferences.append(group_inf)

    return az.concat(*all_group_inferences)

In [42]:
# Collect the intermediate NetCDF inference files; each file's basename is
# later used as a feature id.
# NOTE(review): glob returns paths in arbitrary order, and the cells below
# slice this list by position -- chunk membership therefore depends on that
# order. Consider sorting consistently wherever this directory is globbed.
inference_files = glob.glob('../sfari/data/recount3/intermediate/*.nc')

In [43]:
# Chunk start offsets into `inference_files`: 1000, 2000, ... (step 1000).
# NOTE(review): the offsets begin at 1000, so the chunking loop never reads
# inference_files[0:1000]; presumably that first chunk is produced by the
# scratch section (k = 1000) -- confirm.
intvs = np.arange(1000, len(inference_files), 1000)

In [45]:
# Merge the inference files chunk by chunk (chunk starts come from `intvs`)
# and write one merged NetCDF file per chunk into the `subsets` directory.
# NOTE(review): `table` is only defined in the scratch section
# (biom.load_table(...)); it must already exist when this cell runs.
for inv in range(len(intvs)):
    start = intvs[inv]
    if inv == len(intvs) - 1:
        # The last chunk runs to the end of the list.  Using -1 as the slice
        # end would silently drop the final file (and put "-1" in the name).
        end = len(inference_files)
    else:
        end = intvs[inv + 1]
    inf_files = inference_files[start:end]
    group_kwargs = {'posterior': {'chunks': {'features': 100}}}
    inf_list = dask.compute(*[az.from_netcdf(x, group_kwargs=group_kwargs) for x in inf_files])
    # Feature id = file name without directory and without the '.nc' suffix.
    feature_ids = list(map(lambda x: x.split('/')[-1].split('.nc')[0], inf_files))
    coords = {'features': feature_ids,
              'samples': table.ids(),
              'monte_carlo_samples': np.arange(100)}
    # The two string arguments look swapped relative to the parameter names
    # (log_likelihood, posterior_predictive); the notebook's merge_inferences
    # only tests them against None, so both groups are merged either way.
    samples = merge_inferences(inf_list, 'y_predict', 'log_lhood', coords)
    samples.to_netcdf(f'../sfari/data/recount3/subsets/rna_differentials_{start}_{end}.nc')

# Merge larger datasets

In [46]:
# Load every per-chunk subset file written to the `subsets` directory.
sub_files = glob.glob('../sfari/data/recount3/subsets/*.nc')
# Open the posterior group in chunks of 100 along `features`.
group_kwargs={'posterior':{'chunks': {'features': 100}}}
# One az.from_netcdf result per subset file, passed through dask.compute.
sub_list = dask.compute(*[az.from_netcdf(x, group_kwargs=group_kwargs) for x in sub_files])

'../sfari/data/recount3/rna_differentials_backup-v2.nc'

In [57]:
# Inspect the posterior_predictive group of the first loaded subset.
sub_list[0].posterior_predictive

In [None]:
# Step-by-step version of the merge, applied to the already-merged subsets.
inf_list = sub_list
# These two values are only compared against None below; the strings look
# swapped relative to the variable names but are never otherwise read here.
log_likelihood = 'y_predict'
posterior_predictive = 'log_lhood'
concatenation_name = 'features'
sample_name = 'samples'

# Position 0: posterior, 1: sample_stats, then the optional groups in order.
group_list = [
    [x.posterior for x in inf_list],
    [x.sample_stats for x in inf_list],
]
if log_likelihood is not None:
    group_list.append([x.log_likelihood for x in inf_list])
if posterior_predictive is not None:
    group_list.append([x.posterior_predictive for x in inf_list])

# dask.compute returns a tuple; a single argument yields a 1-tuple.
po_ds, = dask.compute(xr.concat(group_list[0], concatenation_name))
ss_ds, = dask.compute(xr.concat(group_list[1], concatenation_name))
group_dict = {"posterior": po_ds, "sample_stats": ss_ds}

In [None]:
# Concatenate the optional groups.  Unlike merge_inferences, no dimension
# renaming is done here (the rename calls were disabled in this cell).
if log_likelihood is not None:
    ll_ds, = dask.compute(xr.concat(group_list[2], concatenation_name))
    group_dict["log_likelihood"] = ll_ds
if posterior_predictive is not None:
    pp_ds, = dask.compute(xr.concat(group_list[3], concatenation_name))
    group_dict["posterior_predictive"] = pp_ds

In [80]:
#all_files = glob.glob('../sfari/data/recount3/intermediate/*.nc')

# Take the feature ids from the concatenated log-likelihood dataset.
# `ll_ds` only exists if the log_likelihood branch of the previous cell ran.
feature_ids = ll_ds.features.values
# NOTE(review): `table` is defined in the scratch section
# (biom.load_table(...)); it must already exist when this cell runs.
coords = {'features': feature_ids, 
          'samples': table.ids(),  
          'monte_carlo_samples': np.arange(100)}

In [81]:
# Wrap each merged dataset in its own InferenceData, then combine them.
all_group_inferences = []
for group, group_ds in group_dict.items():
    # Label the concatenation and sample dimensions with their coordinates.
    group_ds = group_ds.assign_coords({
        concatenation_name: coords[concatenation_name],
        sample_name: coords[sample_name],
    })
    # The group name is dynamic, so it is passed via dict unpacking.
    group_inf = az.InferenceData(**{group: group_ds})
    all_group_inferences.append(group_inf)

samples = az.concat(*all_group_inferences)

In [83]:
# Persist the fully merged inference data.
samples.to_netcdf('../sfari/data/recount3/rna_differentials_backup-v2.nc')

'../sfari/data/recount3/rna_differentials_backup-v2.nc'

In [None]:
# Display the merged inference data.
samples

# scratch

In [24]:
# Number of feature ids kept by the `feature_ids` cell below ([:k]).
k = 1000
# NOTE(review): glob returns paths in arbitrary order; positions in this list
# are used as indices in the cells below.
inference_files = glob.glob('../sfari/data/recount3/intermediate/*.nc')

In [25]:
# Open the posterior group in chunks of 100 along `features`.
group_kwargs={'posterior':{'chunks': {'features': 100}}}
# Load every intermediate file (all of them, not just the first k).
inf_list = dask.compute(*[az.from_netcdf(x, group_kwargs=group_kwargs) for x in inference_files])

In [26]:
# BIOM table; `table.ids()` supplies the 'samples' coordinate in the coords
# dicts built elsewhere in this notebook (including cells above this one).
table = biom.load_table('../sfari/data/recount3/table.biom')

In [27]:
# Feature id = file name without its directory and '.nc' suffix; keep the
# first k ids only.
feature_ids = [
    path.split('/')[-1].split('.nc')[0] for path in inference_files
][:k]

In [28]:
# Find the first inference that has no `sample_stats` group, report it, and
# drop it together with its feature id.
# NOTE(review): `inf_list_` is created in a later cell
# (`inf_list_ = list(inf_list)`); it must already exist when this cell runs.
for i, inf in enumerate(inf_list):
    if not hasattr(inf, 'sample_stats'):
        print(i, inf)
        # Delete only when a broken entry was actually found.  Deleting after
        # the loop unconditionally would remove the last (valid) entry
        # whenever every inference has `sample_stats`.
        del inf_list_[i]
        del feature_ids[i]
        break

In [29]:
# Show the feature id at index `i`.
feature_ids[i]

'ENSG00000258748.1'

In [30]:
# Hard-coded index. NOTE(review): presumably the index printed by the search
# cell above -- confirm, since it depends on the glob order.
i = 27080
# List copy of `inf_list`, used by the cell that deletes entries by index.
# NOTE(review): re-running this cell after that deletion restores the deleted
# entry, leaving `inf_list_` and `feature_ids` with different lengths.
inf_list_ = list(inf_list)

In [31]:
# Coordinates handed to merge_inferences, which reads the 'features' and
# 'samples' entries (its default concatenation_name / sample_name).
coords = {'features': feature_ids, 
          'samples': table.ids(),  
          'monte_carlo_samples': np.arange(100)}

In [33]:
# Uses the merge_inferences defined in this notebook; the q2_matchmaker
# import below is left disabled.
#from q2_matchmaker._stan import merge_inferences
# The two string arguments look swapped relative to the parameter names
# (log_likelihood, posterior_predictive); the notebook's merge_inferences only
# tests them against None.
samples = merge_inferences(inf_list_, 'y_predict', 'log_lhood', coords)

In [None]:
# Write the scratch merge into the `subsets` directory, where the
# "Merge larger datasets" section globs '*.nc'.
samples.to_netcdf('../sfari/data/recount3/subsets/rna_differentials_.nc')