In [4]:
import os
import bcolz
import json

import tensorflow as tf
import numpy as np
import pandas as ps

%reload_ext autoreload
%autoreload 2

%aimport dataset_interval_reader

### Write some dummy test data

In [2]:
NUM_SEQ_CHARS = 4
SEQ_LEN_CHR = int(1e4)
NUM_CHRS = 5
NUM_DATASETS = 3

NUM_INTERVALS = 500
INTERVAL_LENGTH = 100
NUM_TASKS = 6

DATA_DIR = 'test-data'
FA_DIRS = [os.path.join(DATA_DIR, 'seq-{}'.format(i)) for i in range(NUM_DATASETS)]
BW_DIRS = [os.path.join(DATA_DIR, 'bgw-{}'.format(i)) for i in range(NUM_DATASETS)]

INTERVALS_FILE = os.path.join(DATA_DIR, 'intervals_file.json')
INPUTS_FILE = os.path.join(DATA_DIR, 'inputs_file.json')

TASK_NAMES = ['task-name-{}'.format(i) for i in range(NUM_TASKS)]
DATASET_NAMES = ['dataset-name-{}'.format(i) for i in range(NUM_DATASETS)]

BLOSC_CPARAMS = bcolz.cparams(clevel=5, shuffle=bcolz.SHUFFLE, cname='lz4')

if not os.path.isdir(DATA_DIR):
    os.mkdir(DATA_DIR)

for FA_DIR in FA_DIRS:
    if not os.path.isdir(FA_DIR):
        os.mkdir(FA_DIR)

for BW_DIR in BW_DIRS:
    if not os.path.isdir(BW_DIR):
        os.mkdir(BW_DIR)

def random_fasta_seq():
    seq_idxs = np.random.randint(0, NUM_SEQ_CHARS, SEQ_LEN_CHR)
    seq_arr = np.zeros((NUM_SEQ_CHARS, SEQ_LEN_CHR))
    seq_arr[seq_idxs, np.arange(SEQ_LEN_CHR, dtype=int)] = 1
    return seq_arr

def random_bw_data():
    # Just use low-frequency wave function for now
    bw_data = np.sin(np.arange(SEQ_LEN_CHR) / 1e-3)
    return bw_data

def random_labels():
    # Just random labels for now, as ints
    labels = np.random.randint(0, 3, size=(NUM_INTERVALS, NUM_TASKS))
    return labels

def random_intervals():
    interval_starts = np.random.randint(0, SEQ_LEN_CHR - INTERVAL_LENGTH, size=NUM_INTERVALS)
    interval_ends = interval_starts + INTERVAL_LENGTH
    interval_chrs = np.random.randint(0, NUM_CHRS, size=NUM_INTERVALS)
    interval_chrs = np.array(list(map(lambda x: 'chr{}'.format(x), interval_chrs)))
    intervals = ps.DataFrame([interval_chrs, interval_starts, interval_ends]).T
    return intervals
    

seq_arrs = {'chr{}'.format(i): random_fasta_seq() for i in range(NUM_CHRS)}
bw_arrs = {'chr{}'.format(i): random_bw_data() for i in range(NUM_CHRS)}

def dump_to_disk(chr_key, arr, base_dir):
    target_fname = os.path.join(base_dir, chr_key)
    c_arr = bcolz.carray(arr, cparams=BLOSC_CPARAMS, rootdir=target_fname, mode='w')
    c_arr.flush()

def write_metadata(base_dir):
    # Check the first file to get the shape
    arr_shape = bcolz.carray(rootdir=os.path.join(base_dir, 'chr0'), mode='r').shape
    chr_shapes = {'chr{}'.format(i): arr_shape for i in range(NUM_CHRS)}
    metadata = {'type': 'array_bcolz', 'file_shapes': chr_shapes}
    with open(os.path.join(base_dir, 'metadata.json'), 'w') as fp:
        json.dump(metadata, fp)

for FA_DIR in FA_DIRS:
    for chr_key, arr in seq_arrs.items():
        dump_to_disk(chr_key, arr, FA_DIR)
    write_metadata(FA_DIR)

for BW_DIR in BW_DIRS:
    for chr_key, arr in bw_arrs.items():
        dump_to_disk(chr_key, arr, BW_DIR)
    write_metadata(BW_DIR)
    
intervals_file_dict = {'task_names': TASK_NAMES}
inputs_file_dict = {}

for dataset_idx, dataset_name in enumerate(DATASET_NAMES):
    labels_file = os.path.join(DATA_DIR, 'labels{}.npy'.format(dataset_idx))
    intervals_file = os.path.join(DATA_DIR, 'intervals{}.bed'.format(dataset_idx))
    
    labels = random_labels()
    np.save(labels_file, labels)
    
    intervals = random_intervals()
    intervals.to_csv(intervals_file, sep='\t', header=False, index=False)
    
    intervals_file_dict[dataset_name] = {'regions': intervals_file, 'labels': labels_file}
    inputs_file_dict[dataset_name] = {'dnase_data_dir': BW_DIRS[dataset_idx], 'genome_data_dir': FA_DIRS[dataset_idx]}

with open(INTERVALS_FILE, 'w') as fp:
    json.dump(intervals_file_dict, fp)

with open(INPUTS_FILE, 'w') as fp:
    json.dump(inputs_file_dict, fp)


### Set up the readers

The data we want to read is in `test-data/`.

We just need to use `dataset_interval_reader.get_readers` to create readers for all the datasets

In [5]:
readers = dataset_interval_reader.get_readers(INPUTS_FILE, INTERVALS_FILE)

In [6]:
s = tf.InteractiveSession()

In [7]:
s.run(tf.global_variables_initializer())
s.run(tf.local_variables_initializer())

# Note that you *must* start queue runners before fetching any of the dequeues.
queue_runner_threads = tf.train.start_queue_runners(s)

In [8]:
# Note that readers is a dictionary of `examples_queue`s
readers

{u'dataset-name-0': <tensorflow.python.ops.data_flow_ops.FIFOQueue at 0x1151092d0>,
 u'dataset-name-1': <tensorflow.python.ops.data_flow_ops.FIFOQueue at 0x1148e9490>,
 u'dataset-name-2': <tensorflow.python.ops.data_flow_ops.FIFOQueue at 0x115109cd0>}

In [9]:
# We can fetch from any of the readers like this
s.run(readers['dataset-name-0'].dequeue())

{'data/dnase_data_dir': array([-0.55269241,  0.76406795,  1.39545929,  0.78886145, -0.52480567,
        -1.39576578, -1.06171799,  0.18496498,  1.25313413,  1.207883  ,
         0.0888174 , -1.12460971, -1.37035608, -0.43333429,  0.86633509,
         1.39112699,  0.68172157, -0.64097989, -1.41929376, -0.97200698,
         0.30939606,  1.30337799,  1.1399641 , -0.03781878, -1.19912589,
        -1.32753265, -0.31065205,  0.96149951,  1.37548161,  0.56895995,
        -0.75216413, -1.43158734, -0.8746503 ,  0.43119261,  1.34301281,
         1.06274736, -0.16430394, -1.26417422, -1.27421117, -0.18562989,
         1.04879773,  1.34864879,  0.45148134, -0.85746628, -1.4325484 ,
        -0.77042884,  0.54937738,  1.37172079,  0.976852  , -0.28962335,
        -1.31923294, -1.21081948, -0.05927089,  1.12752926,  1.31084394,
         0.33022827, -0.95604169, -1.42216873, -0.66017896,  0.66300231,
         1.3892715 ,  0.88296735, -0.41277155, -1.36386025, -1.13786614,
         0.06741124,  1.1970

In [10]:
# Or fetch a batch like this
_ = s.run(readers['dataset-name-0'].dequeue_many(128))

In [11]:
# We can also check how fast it is,
# but note that this is slower because the full result is returned to python

def fetch():
    _ = s.run(readers['dataset-name-1'].dequeue_many(128))

%timeit fetch()

100 loops, best of 3: 27.4 ms per loop


### Try parallel output queue

In [None]:
shared_queue = tf.FIFOQueue()