In [1]:
import sys
sys.path.append('../..')
from MPRA_predict.utils import *
from MPRA_predict.datasets import *

In [2]:
# Load the Enformer region list (headerless BED-like table) and cache it as a CSV with named columns.
bed_df = (
    pd.read_csv('/home/shared/enformer_data/human/sequences.bed', sep='\t', header=None)
    .set_axis(['chr', 'start', 'end', 'split'], axis=1)
)
bed_df.to_csv('data/enformer_data.csv', index=False)

In [3]:
# Reload the cached region table (columns: chr, start, end, split).
bed_df = pd.read_csv('data/enformer_data.csv')
# process_row splits each region into 1024 bins of 128 bp, so every region must be exactly 131072 bp wide.
assert (bed_df['end'] - bed_df['start']).eq(1024 * 128).all(), 'expected every region to span 131072 bp'
bed_df

Unnamed: 0,chr,start,end,split
0,chr18,928386,1059458,train
1,chr4,113630947,113762019,train
2,chr11,18427720,18558792,train
3,chr16,85805681,85936753,train
4,chr3,158386188,158517260,train
...,...,...,...,...
38166,chr19,33204702,33335774,test
38167,chr14,41861379,41992451,test
38168,chr19,30681544,30812616,test
38169,chr14,61473198,61604270,test


In [4]:
# Load the Enformer target (track) table and keep only the K562 DNase-seq tracks.
targets = pd.read_csv('/home/shared/enformer_data/human/targets.txt', sep='\t')
targets_K562 = targets.loc[targets['description'].eq('DNASE:K562')]
targets_K562

Unnamed: 0,index,genome,identifier,file,clip,scale,sum_stat,description
121,121,0,ENCFF413AHU,/home/drk/tillage/datasets/human/dnase/encode/...,32,2,mean,DNASE:K562
122,122,0,ENCFF868NHV,/home/drk/tillage/datasets/human/dnase/encode/...,32,2,mean,DNASE:K562
123,123,0,ENCFF565YDB,/home/drk/tillage/datasets/human/dnase/encode/...,32,2,mean,DNASE:K562
625,625,0,ENCFF971AHO,/home/drk/tillage/datasets/human/dnase/encode/...,32,2,mean,DNASE:K562


In [5]:
import pyBigWig
import numpy as np
from multiprocessing import Pool
from tqdm import tqdm

def init_worker(bw_path):
    """Pool initializer: open the bigWig at ``bw_path`` in the current worker process.

    The handle is published as the module-global ``bw`` so that process_row
    can use it for every region this worker handles, instead of reopening
    the file per region.
    """
    global bw
    handle = pyBigWig.open(bw_path)
    bw = handle

def process_row(row, bigwig=None, n_bins=1024, crop_bins=64, fill_value=0.0):
    """Compute per-bin mean coverage of one region from a bigWig file.

    The region is split into ``n_bins`` equal-width bins; with the defaults and
    131072-bp regions that is 1024 bins of 128 bp. Then ``crop_bins`` bins are
    dropped from each end, leaving 1024 - 2*64 = 896 bins by default.

    Args:
        row: ``(chrom, start, end)`` tuple describing the region.
        bigwig: open pyBigWig handle to query. Defaults to the worker-global
            ``bw`` opened by ``init_worker``.
        n_bins: number of equal-width bins to split the region into.
        crop_bins: number of bins removed from each end of the region.
        fill_value: value substituted for bins that pyBigWig reports as
            ``None`` (no data), so stacking results yields a numeric array
            instead of an object array.

    Returns:
        list of ``n_bins - 2 * crop_bins`` numbers.
    """
    handle = bw if bigwig is None else bigwig
    chrom, start, end = row
    # Without exact=True, pyBigWig may answer from zoom levels, so values can be approximate.
    mean_values = handle.stats(chrom, start, end, nBins=n_bins, type='mean')
    # Use an explicit stop index: a negative slice like [0:-0] would drop everything when crop_bins == 0.
    kept = mean_values[crop_bins:n_bins - crop_bins]
    return [fill_value if value is None else value for value in kept]

def process_track(track_row, bed_df, num_workers=4,
                  bigwig_dir='../../data/Enformer_tracks/downloads',
                  out_dir='data'):
    """Extract binned coverage of one track for every region and save it as .npy.

    Args:
        track_row: row of the targets table; its ``'index'`` and
            ``'identifier'`` fields are read.
        bed_df: region table with ``chr``, ``start`` and ``end`` columns.
        num_workers: number of worker processes in the pool.
        bigwig_dir: directory containing ``{identifier}.bigWig``.
        out_dir: directory where ``labels_track_index_{index}.npy`` is written.

    Returns:
        The stacked labels (one row per region, in ``bed_df`` order). The same
        array is also saved to ``out_dir``.
    """
    track_index = track_row['index']
    identifier = track_row['identifier']
    bigwig_file = f'{bigwig_dir}/{identifier}.bigWig'

    # Plain (chrom, start, end) tuples are what gets sent to the workers through pool.imap.
    rows = [(r.chr, r.start, r.end) for r in bed_df.itertuples()]

    # init_worker runs once in each worker process and opens its own bigWig handle,
    # so the file is not reopened for every region.
    with Pool(processes=num_workers, initializer=init_worker, initargs=(bigwig_file,)) as pool:
        # imap yields results in input order, so labels line up with bed_df rows; tqdm shows progress.
        labels = list(tqdm(pool.imap(process_row, rows), total=len(rows)))

    labels = np.array(labels)
    np.save(f'{out_dir}/labels_track_index_{track_index}.npy', labels)
    return labels

# Extract every K562 DNase track selected in targets_K562 over all regions in bed_df
# (one labels_track_index_{index}.npy file per track). Iterating over the rows instead of a
# hard-coded range(4) avoids an IndexError when fewer tracks match and silently skipping extras.
for _, track_row in targets_K562.iterrows():
    process_track(track_row, bed_df, num_workers=28)

100%|██████████| 38171/38171 [00:45<00:00, 840.79it/s]
100%|██████████| 38171/38171 [00:45<00:00, 846.70it/s]
100%|██████████| 38171/38171 [00:44<00:00, 856.32it/s]
100%|██████████| 38171/38171 [00:46<00:00, 829.59it/s]


In [None]:
# @title `get_dataset(organism, subset, num_threads=8)`
import functools
import glob
import json
import os

import pandas as pd
import tensorflow as tf
import tensorflow_hub as hub

# @title `get_targets(organism)`
def get_targets(organism):
  """Load the tab-separated targets table (one row per output track) for ``organism``.

  Reads ``targets.txt`` from the directory returned by ``organism_path``, so the
  data root is defined in a single place. The commented-out original source was
  https://raw.githubusercontent.com/calico/basenji/master/manuscripts/cross2020/targets_{organism}.txt
  """
  targets_txt = os.path.join(organism_path(organism), 'targets.txt')
  return pd.read_csv(targets_txt, sep='\t')


def organism_path(organism, root='/home/shared/enformer_data'):
  """Return the data directory for ``organism`` under ``root``.

  ``root`` defaults to the local directory used in this notebook. The
  commented-out original pointed at ``'gs://basenji_barnyard/data'``, which can
  be passed as ``root`` instead.
  """
  return os.path.join(root, organism)


def get_dataset(organism, subset, num_threads=8):
  """Build a tf.data pipeline of decoded examples for one organism and split.

  Reads the ZLIB-compressed TFRecord shards listed by ``tfrecord_files`` and
  maps ``deserialize`` over them, so each element is a dict with ``'sequence'``
  and ``'target'`` tensors. ``num_threads`` is used both for parallel file
  reads and for parallel decoding.
  """
  stats = get_metadata(organism)
  records = tf.data.TFRecordDataset(
      tfrecord_files(organism, subset),
      compression_type='ZLIB',
      num_parallel_reads=num_threads,
  )
  parse = functools.partial(deserialize, metadata=stats)
  return records.map(parse, num_parallel_calls=num_threads)


def get_metadata(organism):
  """Load ``statistics.json`` for ``organism`` and return it as a dict.

  Keys: num_targets, train_seqs, valid_seqs, test_seqs, seq_length,
  pool_width, crop_bp, target_length. ``deserialize`` uses seq_length,
  target_length and num_targets.
  """
  stats_file = os.path.join(organism_path(organism), 'statistics.json')
  with tf.io.gfile.GFile(stats_file, 'r') as fh:
    stats = json.load(fh)
  return stats


def tfrecord_files(organism, subset):
  """Return the ``{subset}-N.tfr`` shards for ``organism``, sorted by shard number N.

  A lexicographic sort would put ``test-10.tfr`` before ``test-2.tfr``, so N is
  parsed and sorted numerically.

  Raises:
    FileNotFoundError: if no shard matches. Otherwise an empty list would turn
      into an empty dataset and only fail much later, e.g. in ``np.stack``.
  """
  pattern = os.path.join(organism_path(organism), 'tfrecords', f'{subset}-*.tfr')
  files = tf.io.gfile.glob(pattern)
  if not files:
    raise FileNotFoundError(f'no TFRecord files match {pattern}')
  return sorted(files, key=_tfrecord_shard_index)


def _tfrecord_shard_index(path):
  """Parse the shard number N from a '.../{subset}-N.tfr' path, looking only at the basename."""
  stem = os.path.splitext(os.path.basename(path))[0]
  return int(stem.rsplit('-', 1)[-1])


def deserialize(serialized_example, metadata):
  """Decode one serialized TFRecord example into float32 tensors.

  Args:
    serialized_example: serialized bytes of one record.
    metadata: dict from ``get_metadata``; ``seq_length``, ``target_length`` and
      ``num_targets`` set the output shapes.

  Returns:
    dict with ``'sequence'`` of shape (seq_length, 4), decoded from raw bool
    bytes, and ``'target'`` of shape (target_length, num_targets), decoded from
    raw float16 bytes. Both are cast to float32.
  """
  parsed = tf.io.parse_example(
      serialized_example,
      {name: tf.io.FixedLenFeature([], tf.string) for name in ('sequence', 'target')})

  seq_shape = (metadata['seq_length'], 4)
  one_hot = tf.cast(
      tf.reshape(tf.io.decode_raw(parsed['sequence'], tf.bool), seq_shape),
      tf.float32)

  target_shape = (metadata['target_length'], metadata['num_targets'])
  coverage = tf.cast(
      tf.reshape(tf.io.decode_raw(parsed['target'], tf.float16), target_shape),
      tf.float32)

  return {'sequence': one_hot, 'target': coverage}

In [None]:
# Decoded test split of the human Enformer data as a tf.data pipeline.
human_dataset = get_dataset(organism='human', subset='test')

In [None]:
# Materialize every decoded example of the split.
# NOTE(review): this keeps all examples (sequence + full target matrix) in memory at once;
# the next cells only need the stacked arrays, so streaming could lower peak memory.
l = [example for example in human_dataset]

In [None]:
# Stack the one-hot sequence of every example into a single (n_examples, seq_length, 4) array.
seqs = np.stack([item['sequence'].numpy() for item in l], axis=0)
print(seqs.shape)

In [None]:
np.save(file='data/enformer_seqs.npy', arr=seqs)

In [None]:
# Stack the target matrix of every example into one array, with a progress bar.
labels = np.stack([item['target'].numpy() for item in tqdm(l)], axis=0)
print(labels.shape)

In [None]:
np.save(file='data/enformer_targets.npy', arr=labels)

In [None]:
# seqs = np.load('data/enformer_seqs.npy')
# seqs = onehots2strs(seqs)
# save_txt('data/enformer_seqs.txt', seqs)