# Converting tissue specific eQTL summary statistics to HDF5
Here I convert GTEx summary statistics to HDF5 format, making it easier to query and share the results. 

This procedure was initially written and used for GTEx V6 data in 2015. It is now updated to deal with V8 data, for input to `mashr`.

## Input data
Summary statistics data are in BED format, one row per gene-SNP pair. Each tissue has a separate BED file. Additionally, there are support files with gene transcription start site coordinates and SNP coordinates.

## Convert to HDF5 format

Benchmark from previous runs:

| time | original size | HDF5 size | method | data type |
|:------:|:-------:|:-------:|:-------:|:-----:|
| 118 min| 5G (179,083,485 lines) | 3.8G | TBData, bzip2 | float32|  
| 118 min| 5G (179,083,485 lines) | 5.1G | TBData, bzip2 | float64|
| 138 min| 5G (179,083,485 lines) | 4.9G | TBData, zlib | float32|
| 39 min| 5G (179,083,485 lines) | 6.9G | HPData, zlib | float64|

I will use `bzip2` compression and `float32` for the initial conversion to HDF5, then extract the data for `mashr` input and save it in RDS format.

In [None]:
[global]
# Directory of the GTEx V8 data; not referenced by the step shown below.
cwd = '~/Documents/GTEx/V8'
# List of summary statistics files fed to the conversion step as its input.
# NOTE(review): glob.glob(".") can only ever match the literal path "." --
# presumably a placeholder for the real per-tissue file pattern; TODO confirm.
sumstats_files = glob.glob(".")

## Data table classes

In [None]:
[sumstats_to_hdf5]
# Convert each gzipped summary statistics file into an HDF5 file.
# h5copy is the command line tool used at the end of the script to merge
# the temporary HDF5 files that are written when maxsize > 0.
depends: executable("h5copy")
# Passed to TBData as the description of every HDF5 group it creates.
parameter: msg = "GTEX V8 fastqtl analysis"
parameter: maxsize = 1000 # maximum number of groups per HDF5 file
# One input file at a time; the output name is the input name with ".gz" replaced by ".h5".
input: sumstats_files, group_by = 1, pattern = "{name}.gz"
output: expand_pattern('{_name}.h5')
task:
python:
import copy
import gzip
import os
import sys
import warnings

import numpy as np
import pandas as pd
import tables as tb
# Raise the PyTables setting for the number of children allowed under one group
# before it complains; every group written by this script hangs directly under
# a single parent group.
tb.parameters.MAX_GROUP_WIDTH = 51200
# Further PyTables cache parameters, currently disabled:
# tb.parameters.NODE_CACHE_SLOTS = -51200
# tb.parameters.METADATA_CACHE_SIZE = 1048576 * 100000
# tb.parameters.CHUNK_CACHE_SIZE = 2097152 * 100000
# tb.parameters.CHUNK_CACHE_NELMTS = 521

class TBData(dict):
    '''Dictionary of named arrays bound to one group of a PyTables HDF5 file.

    The dictionary keys are the array names inside the group. An instance is
    filled either from an in-memory dict or by reading an existing group, and
    can be written back to a file with `sink`.
    '''
    def __init__(self, data, name, msg = None, root = '/', complib = 'bzip2'):
        '''bzip2 may not be compatible with other hdf5 applications; but zlib is fine

        data: a dict of arrays, an HDF5 file name, or an already opened
              PyTables file object (which is closed after loading).
        name: name of the group holding the arrays.
        msg: passed as the third argument of `create_group` when `sink`
             has to create the group; defaults to the group name.
        root: parent group of `name`; leading / trailing slashes are stripped.
        complib: compression library handed to `tb.Filters`.

        Raises ValueError when the group cannot be found in the file.
        '''
        self.__root = root.strip('/')
        self.__group = name
        self.__msg = msg
        try:
            if isinstance(data, dict):
                self.update(data)
            elif isinstance(data, str):
                # is file name
                self.__load(tb.open_file(data))
            else:
                # is file stream
                self.__load(data)
        except tb.exceptions.NoSuchNodeError:
            raise ValueError('Cannot find dataset {}!'.format(name))
        self.tb_filters = tb.Filters(complevel=9, complib=complib)

    def sink(self, filename):
        '''Append this group to `filename`, merging with data already stored under the same group.'''
        with tb.open_file(filename, 'a') as f:
            if self.__root:
                try:
                    f.create_group("/", self.__root)
                except tb.exceptions.NodeError:
                    # the parent group was already created by an earlier sink
                    pass
            try:
                # there is existing data -- have to merge with current data
                # have to do this because the input file lines are not grouped by gene names!!
                # use try ... except to hopefully faster than if ... else
                # e.g., if not f.__contains__('/{}'.format(self.__group)) ... else ...
                for element in f.list_nodes('/{}/{}'.format(self.__root, self.__group)):
                    if element.name != 'colnames':
                        # stored rows first, then the rows held in memory
                        self[element.name] = np.concatenate((element[:], self[element.name]))
            except tb.exceptions.NoSuchNodeError:
                f.create_group("/" + self.__root, self.__group,
                               self.__msg if self.__msg else self.__group)
            for key in self:
                self.__store_array(key, f)
            f.flush()

    def dump(self, table, output = False):
        '''Return array `table` as a DataFrame labelled by 'colnames' and 'rownames'.

        With `output` set, the frame is instead written to stdout as CSV
        (missing values as 'NA') and None is returned.
        '''
        frame = pd.DataFrame({colname : self[table][:,i] for i, colname in enumerate(self['colnames'])},
                             index = self['rownames'])
        if output:
            frame.to_csv(sys.stdout, na_rep = 'NA')
            return None
        return frame

    def __load(self, fstream):
        '''Read every node of the group into the dictionary; `fstream` is always closed.'''
        try:
            for element in fstream.list_nodes('/{}/{}'.format(self.__root, self.__group)):
                self[element.name] = element[:]
        finally:
            fstream.close()

    def __roll_back(self, group, name):
        '''Remove node `name` from `group` if it exists, so that it can be rewritten.'''
        try:
            n = getattr(group, name)
            n._f_remove()
        except AttributeError:
            pass

    def __store_array(self, name, fstream):
        '''Write self[name] as a compressed array into this object's group of `fstream`.'''
        if self.__root:
            element = getattr(getattr(fstream.root, self.__root), self.__group)
        else:
            element = getattr(fstream.root, self.__group)
        arr = self[name]
        if isinstance(arr, list):
            arr = np.array(arr)
        # drop any previous version of the node before writing the new one
        self.__roll_back(element, name)
        # empty 1-d arrays are not written at all
        if arr.shape != (0,):
            ds = fstream.create_carray(element, name, tb.Atom.from_dtype(arr.dtype), arr.shape,
                                       filters = self.tb_filters)
            ds[:] = arr

def get_tb_grps(filenames, group_name = None):
    '''Return the sorted, de-duplicated names of the nodes found in one or more HDF5 files.

    filenames: a single file name or an iterable of file names.
    group_name: when given, list the children of that attribute of the file
                root instead of the children of the root itself.
    '''
    paths = [filenames] if isinstance(filenames, str) else filenames
    collected = set()
    for path in paths:
        with tb.open_file(path) as h5:
            if group_name is None:
                parent = h5.root
            else:
                parent = getattr(h5.root, '{}'.format(group_name))
            for node in parent:
                collected.add(node._v_name)
    return sorted(collected)

class SSData:
    '''Incremental parser that groups summary statistics lines by their second column.

    Lines are fed one at a time to `parse`. Rows are buffered while the group
    name stays the same; when it changes, or when the empty end-of-input line
    is seen, the buffered rows are converted to arrays and made available
    through `dump`. The caller detects a finished group by comparing
    `current_name` with `previous_name`, and is responsible for updating
    `previous_name` afterwards.
    '''
    def __init__(self, header = False):
        '''header: when true, the first line passed to `parse` is skipped.'''
        self.data = {'buffer':{'data':[], 'rownames':[]}, 'output':{}}
        self.header = header
        self.previous_name = self.current_name = None
        self.count = -1

    def parse(self, line):
        '''Consume one input line.

        Returns 1 when `line` is empty (end of input), in which case the last
        buffered group is flushed and `current_name` is set to None; returns 0
        otherwise. Whitespace-only lines are ignored.
        '''
        # input line is snp, gene, beta, t, pval
        if not line:
            self.__reset()
            self.current_name = None
            return 1
        fields = line.strip().split()
        self.count += 1
        if self.header and self.count == 0:
            return 0
        if not fields:
            # blank line: nothing to record
            return 0
        #
        if self.previous_name is None:
            self.previous_name = fields[1]
        self.current_name = fields[1]
        if self.current_name != self.previous_name:
            # a new group starts: flush what was buffered for the previous one
            self.__reset()
        self.data['buffer']['data'].append([fields[2], fields[3], fields[4]])
        self.data['buffer']['rownames'].append(fields[0])
        return 0

    def __reset(self):
        '''Move the buffered rows to the output slot as arrays and start an empty buffer.'''
        buffered = self.data['buffer']
        # np.array builds new arrays and the buffer is replaced right after,
        # so the output can own them directly without an extra copy
        self.data['output'] = {
            'data': np.array(buffered['data'], dtype = np.float32),
            'rownames': np.array(buffered['rownames']),
            'colnames': np.array(['beta','t-stat','p-value']),
        }
        self.data['buffer'] = {'data':[], 'rownames':[]}

    def dump(self):
        '''Return the most recently completed group as a dict of arrays.'''
        return self.data['output']

ssp = SSData(header = True)
group_counts = 0
bname = os.path.basename(item)
with gzip.open(${_input!r}) as f:
    while True:
        line = f.readline()
        quit = ssp.parse(line)
        if ssp.current_name != ssp.previous_name:
            group_counts += 1
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category = tb.NaturalNameWarning)
                # warnings.filterwarnings("ignore", category = tb.PerformanceWarning)
                data = TBData(ssp.dump(), ssp.previous_name, ${msg!r})
                data.sink("${_output!n}_%i.tmp" % (np.ceil(group_counts / ${maxsize})) if ${maxsize} > 0 else ${_output!r})
            ssp.previous_name = ssp.current_name

if ${maxsize} > 0:
    from glob import glob
    tmpfiles = list(glob("${_output!n}_*.tmp"))
    for item in sorted(tmpfiles):
        for name in get_tb_grps(item):
            os.system('h5copy -i {0} -o ${_output} -s "/{1}" -d "/{1}"'.format(item, name))