In [1]:
import numpy as np
import pandas as pd
import random
import shutil
import sys
sys.path.append('..')
import os
import utils

from pathlib import Path

In [2]:
# Paths
fragments_dir = Path('../data/fragments/')


def _load_filtered_fragments(length):
    """Load `fragments_<length>_filtered.pickle` from `fragments_dir` via utils.load."""
    return utils.load(fragments_dir/f'fragments_{length}_filtered.pickle')


# Load fragment data: one module-level name per fragment length, because the
# CSV generation cell further down refers to these names individually.
fragments_8 = _load_filtered_fragments(8)
fragments_10 = _load_filtered_fragments(10)
fragments_12 = _load_filtered_fragments(12)
fragments_14 = _load_filtered_fragments(14)
fragments_16 = _load_filtered_fragments(16)
fragments_18 = _load_filtered_fragments(18)
fragments_20 = _load_filtered_fragments(20)
fragments_22 = _load_filtered_fragments(22)
fragments_24 = _load_filtered_fragments(24)

In [3]:
def write_csv(path, data): # Data entries should be formatted as (sequence, label) tuple
    """Write `data` to `path` as a two-column CSV with a "sequence,label" header.

    Args:
        path: Destination file; opened in text write mode, so an existing
            file is overwritten.
        data: Iterable of entries; element 0 is written as the sequence and
            element 1 as the label. Values are written verbatim, without any
            CSV quoting or escaping.
    """
    with open(path, "w") as handle:
        handle.write("sequence,label\n")
        # One "<sequence>,<label>" line per entry, streamed in input order.
        handle.writelines(f"{row[0]},{row[1]}\n" for row in data)

In [4]:
def split_data(data, train_ratio = 0.8):
    """Slice `data` into consecutive train / dev / test parts.

    The first `train_ratio` share of `data` becomes the training part; the
    remainder is divided evenly between dev and test. Cut positions are
    truncated to whole indices with int(). `data` is not shuffled here.

    Args:
        data: Any sliceable sequence that supports len().
        train_ratio: Share of `data` assigned to the training part.

    Returns:
        Tuple (train_data, dev_data, test_data) of slices of `data`.
    """
    n_items = len(data)
    held_out_share = (1-train_ratio)/2  # dev and test each take half of what is left

    first_cut = int(n_items * train_ratio)
    second_cut = int(n_items * (1-held_out_share))

    return data[:first_cut], data[first_cut:second_cut], data[second_cut:]

In [6]:
# DNABERT2 finetuning CSV generation
#
# For every (task, fragment length, train ratio, nucleotide alphabet) combination
# this writes train.csv / dev.csv / test.csv into the directory
# "<task>_<length>_<ratio in percent>_<alphabet>/", relative to the current
# working directory. An existing directory of that name is deleted first.

# Fixed seed so that re-running this cell reproduces the same shuffles/splits.
SHUFFLE_SEED = 0

# Fragment sets keyed by fragment length (built once; it does not change per iteration).
FRAGMENTS_BY_LENGTH = {
    8: fragments_8,
    10: fragments_10,
    12: fragments_12,
    14: fragments_14,
    16: fragments_16,
    18: fragments_18,
    20: fragments_20,
    22: fragments_22,
    24: fragments_24
}


def _label_fragments(task, fragments):
    """Return a list of (res_seq, label) tuples for `fragments` under `task`.

    Args:
        task: One of "gnravall", "gnra", "clusters", "tloop".
        fragments: Iterable of objects exposing `res_seq` and `clust_id`.

    Raises:
        ValueError: If `task` is not one of the names above. Without this, an
            unrecognised task would silently reuse the data of the previous
            loop iteration (or fail with a NameError on the first one).
    """
    if task == "gnravall":
        # clust_id 1 -> 1, every other fragment (clust_id 0 included) -> 0.
        # TODO fix: the original code also built a list without clust_id 0 here
        # but never used it; confirm that keeping clust_id 0 is what is wanted.
        return [(i.res_seq, 1 if i.clust_id == 1 else 0) for i in fragments]
    if task == "gnra":
        # Same labelling as above, but fragments with clust_id 0 are left out.
        return [(i.res_seq, 1 if i.clust_id == 1 else 0) for i in fragments if i.clust_id != 0]
    if task == "clusters":
        # Multi-class: the cluster id itself is the label.
        return [(i.res_seq, i.clust_id) for i in fragments]
    if task == "tloop":
        # Binary: clust_id 0 -> 0, any other cluster -> 1.
        return [(i.res_seq, 0 if i.clust_id == 0 else 1) for i in fragments]
    raise ValueError(f"Unknown task: {task!r}")


# Private generator: reproducible shuffles without touching the global `random` state.
shuffle_rng = random.Random(SHUFFLE_SEED)

for task in ["clusters", "tloop"]: # "gnravall", "gnra", "clusters", "tloop"
    for fragment_length in [16, 18, 20, 22, 24]: # 8, 10, 12, 14, 16, 18, 20, 22, 24
        fragments = FRAGMENTS_BY_LENGTH[fragment_length]
        for train_ratio in [0.8]:
            for nucleotides in ["T", "U"]: #"T", "U"
                # Label first, so an invalid task fails before any directory is deleted.
                data = _label_fragments(task, fragments)
                shuffle_rng.shuffle(data)

                # nucleotides: "T" rewrites every U as T; any other value keeps the sequences as loaded.
                if nucleotides == "T":
                    data = [(seq.replace("U","T"), label) for seq, label in data]

                # round() instead of int(): products such as 0.58*100 land just
                # below the whole number and would otherwise be truncated to 57.
                data_dir = Path(f"{task}_{fragment_length}_{round(train_ratio*100)}_{nucleotides}/")
                if data_dir.exists():
                    shutil.rmtree(data_dir)
                data_dir.mkdir(parents=True, exist_ok=True)

                train_data, dev_data, test_data = split_data(data, train_ratio)

                write_csv(data_dir/"train.csv", train_data)
                write_csv(data_dir/"dev.csv", dev_data)
                write_csv(data_dir/"test.csv", test_data)

In [6]:
# ANN NPZ/NPY data generation

def encode_sequence(sequence, residue_map = {'A':0,'U':1,'T':1,'C':2,'G':3,'I':4}): # TODO should I be included?
    """One-hot encode a residue sequence.

    Args:
        sequence: Iterable of residue characters.
        residue_map: Maps a residue character to its column index (0-4).
            It is only read here, so the shared default dict is never mutated.

    Returns:
        Integer array of shape (n_kept, 5), where n_kept is the number of
        characters of `sequence` that appear in `residue_map`. Each row holds
        a single 1 in the column of its residue. Characters missing from
        `residue_map` are skipped, so n_kept can be smaller than
        len(sequence); an empty or fully unmapped sequence yields shape (0, 5).
    """
    # dtype=int matters for the empty case: np.array([]) defaults to float64,
    # and NumPy refuses float arrays as indices even when they are empty.
    seq_array = np.array([residue_map[i] for i in sequence if i in residue_map], dtype=int)
    encoded_array = np.zeros((seq_array.size, 5), dtype=int)
    # Row k gets its 1 in column seq_array[k].
    encoded_array[np.arange(seq_array.size), seq_array] = 1
    return encoded_array

def make_npzs(root = "./"):
    """Encode every CSV found in the direct sub-directories of `root`.

    For each file `<name>.csv` in each sub-directory, the "sequence" column is
    one-hot encoded with encode_sequence and the "label" column is cast to int.
    Two files are written next to the CSV:

    * `<name>_matrices.npz` - the stacked encodings, saved positionally with
      np.savez (so stored under NumPy's default key "arr_0").
    * `<name>_labels.npy`   - the labels.

    Args:
        root: Directory whose sub-directories are scanned. Defaults to the
            current working directory.
    """
    csv_suffix = ".csv"
    for dir_name in os.listdir(root):
        dir_path = os.path.join(root, dir_name)
        if not os.path.isdir(dir_path):
            continue
        for filename in os.listdir(dir_path):
            if not filename.endswith(csv_suffix):
                continue
            # Strip only the trailing ".csv": splitting on the first "." would
            # turn "a.b.csv" into "a" and then read the wrong file.
            stem = filename[:-len(csv_suffix)]
            data = pd.read_csv(os.path.join(dir_path, filename), sep=",")
            sequences = data["sequence"].tolist()
            labels = data["label"].astype(int).tolist()
            encoded_matrices = [encode_sequence(i) for i in sequences]
            np.savez(os.path.join(dir_path, f"{stem}_matrices.npz"), np.array(encoded_matrices))
            np.save(os.path.join(dir_path, f"{stem}_labels.npy"), labels)

# Encode the CSVs of every sub-directory of the current working directory.
make_npzs()

In [3]:
# all = len(entries)
# decoys = len([i for i in entries if i.clust_id == 0])
# tloops = len([i for i in entries if i.clust_id != 0])
# print(f"All: {all}\tTetraloops: {tloops} ({tloops/all*100:.4})\tDecoys: {decoys} ({decoys/all*100:.4})")

All: 4991	Tetraloops: 4991 (100.0)	Decoys: 0 (0.0)
