In [1]:
import pickle
import torch
# import CNN_LSTM_n
# import TF_data_loader
from torch.utils import data as D
import numpy as np
from sklearn.metrics import PrecisionRecallDisplay, RocCurveDisplay
from sklearn.metrics import precision_recall_curve, average_precision_score, roc_curve, auc, roc_auc_score
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm

First, we build the panset: every sequence from every partition merged into a single, non-normalized dictionary.

In [2]:
# Read the pickled sequence features (a mapping that is iterated per
# partition further down).
# NOTE(review): pickle.load can execute arbitrary code -- only load trusted files.
with open('../../../data/Chipseq_data/seq_breathing_feat.pkl', 'rb') as pkl_file:
    seq_feat = pickle.load(pkl_file)

In [3]:
# FeatMap.pt is stored as key -> value; invert it on load so that lookups go
# value -> key.
feat_map = {
    value: key
    for key, value in torch.load('../../../data/Chipseq_data/FeatMap.pt').items()
}
# Mapping of TF name -> label indices belonging to that TF (consumed by the
# sorter cells below).
group_index = torch.load('../../../data/Chipseq_data/group_index.pt')

In [4]:
# Flat seq_id -> features mapping; filled from every partition in the next cell.
panset = dict()

In [5]:
# Merge the seq_id -> features entries of every partition into the flat panset.
# If a seq_id occurs in more than one partition, the later partition wins.
for partition_samples in seq_feat.values():
    panset.update(partition_samples)

This has given us the panset, and now we make the positive/negative sorter for each TF

Recall that the panset is structured as:

seq_id : feature : value

We will need to analyze each sequence's label (which is one of its features) to determine whether the sequence is positive or negative for each TF.

--

First, what is a TF, and how can we express it in terms of the values in the label?
A: a TF is a group of label indices (group_index[TF_name]); each of them is read as panset[seq_id]['label'][cell-line index]

--

The sorter fills TF_map as follows:
1. take a TF name and its label indices;
2. take a sequence from the panset and look at its label;
3. if the label equals 1 at any of the TF's indices, add the sequence to that TF's positive partition;
4. otherwise (the label is not 1 at any of the TF's indices), add the sequence to that TF's negative partition.

TF_map's structure:

TF:partition(positive, negative):seq_id

In [6]:
# One record per TF name found in group_index, each holding an empty
# 'positive' and an empty 'negative' list of sequence ids.
TF_map = {
    TF_name: {partition: [] for partition in ('positive', 'negative')}
    for TF_name in group_index
}

In [None]:
# Start from empty partitions so that re-running this cell does not append
# the same sequence ids a second time.
for TF_partitions in TF_map.values():
    TF_partitions['positive'].clear()
    TF_partitions['negative'].clear()

# Place every sequence in exactly one partition per TF:
#   positive -> the label equals 1 at ANY of the TF's label indices
#   negative -> the label is not 1 at ALL of the TF's label indices
# The decision is made once per (sequence, TF) pair. The previous version
# decided per label position and its negative test never looked at the TF's
# indices, so every sequence was appended to every TF's negative list many
# times over.
for seq_id in tqdm(panset, desc='Processing sequences', unit='sequence'):
    seq_label = panset[seq_id]['label']
    for TF_name, label_indices in group_index.items():
        is_positive = any(
            seq_label[int(label_index)] == 1 for label_index in label_indices
        )
        TF_map[TF_name]['positive' if is_positive else 'negative'].append(seq_id)


Processing sequences:   0%|          | 0/886625 [00:00<?, ?sequence/s]

In [None]:
# Report how many sequence ids ended up in each partition of every TF.
for TF_name in TF_map:
    print(f'TF: {TF_name}')
    partitions = TF_map[TF_name]
    for partition in partitions:
        seq_ids = partitions[partition]
        print(f'Partition: {partition}, Count: {len(seq_ids)}')
    print()

In [None]:
# Persist the TF -> {positive, negative} -> seq_id lists next to the notebook.
torch.save(obj=TF_map, f='TF_map.pt')

In [None]:
import numpy as np
import torch
from torch.utils import data as D
import pickle

# Preprocessing step: one-hot encode a nucleotide sequence
def seq2onehot(seq, window_size=500):
    """One-hot encode a nucleotide sequence.

    Args:
        seq: iterable of single-character nucleotides (typically a str).
        window_size: number of columns of the output matrix. Defaults to 500,
            the value that used to be hard-coded.

    Returns:
        A ``(4, window_size)`` ``np.uint8`` array. Rows are ordered
        A, G, C, T (note: G before C). Column ``i`` has a single 1 in the row
        of ``seq[i]``; any other character (e.g. ``"N"`` or lowercase letters)
        leaves its column all-zero, as do positions past the end of ``seq``.

    Raises:
        IndexError: if an A/G/C/T occurs at a position >= ``window_size``.
    """
    row_of = {"A": 0, "G": 1, "C": 2, "T": 3}
    matrix = np.zeros(shape=(4, window_size), dtype=np.uint8)
    for i, nt in enumerate(seq):
        row = row_of.get(nt)
        if row is not None:
            matrix[row, i] = 1
    return matrix

def _labelled_samples(panset, seq_ids, label_value):
    """Return {seq_id: shallow copy of panset[seq_id] with 'label' replaced}.

    Copying keeps the per-TF binary label out of the shared panset entries, so
    the same sequence can carry a different label for different TFs.
    """
    samples = {}
    for seq_id in seq_ids:
        sample = dict(panset[seq_id])
        sample['label'] = np.array([label_value])
        samples[seq_id] = sample
    return samples


def TF_sorter(panset, TF_map, neg_ratio=10, train_frac=0.8, seed=108):
    """Build per-TF train/test sets with binary labels.

    For every TF, all positive sequences are kept and up to
    ``neg_ratio * len(positives)`` negatives are drawn at random without
    replacement (capped at the number of negatives available). Positives and
    the drawn negatives are each split in list order: the first ``train_frac``
    go to 'train', the remainder to 'test'.

    Args:
        panset: mapping seq_id -> feature dict. It is not modified.
        TF_map: mapping TF name -> {'positive': [seq_id, ...],
            'negative': [seq_id, ...]}.
        neg_ratio: negatives requested per positive (default 10).
        train_frac: fraction of each class placed in 'train' (default 0.8).
        seed: value passed to ``np.random.seed`` (default 108). Note this
            re-seeds NumPy's global random state on every call.

    Returns:
        dict: TF name -> {'train': {seq_id: sample}, 'test': {seq_id: sample}},
        where each sample is a shallow copy of ``panset[seq_id]`` whose
        'label' is ``np.array([1])`` for positives and ``np.array([0])`` for
        negatives.

    Raises:
        KeyError: if a TF entry lacks 'positive'/'negative' or a seq_id is
            missing from ``panset``.
    """
    np.random.seed(seed)
    data_dict = {}

    for TF_name, partitions in TF_map.items():
        pos_seqs = partitions['positive']
        neg_seqs = partitions['negative']

        # Never ask for more negatives than exist: replace=False would raise.
        n_neg = min(len(neg_seqs), len(pos_seqs) * neg_ratio)
        if n_neg > 0:
            neg_indices = np.random.choice(len(neg_seqs), n_neg, replace=False)
            neg_seqs_selected = [neg_seqs[i] for i in neg_indices]
        else:
            neg_seqs_selected = []

        pos_train_count = int(len(pos_seqs) * train_frac)
        neg_train_count = int(len(neg_seqs_selected) * train_frac)

        train = _labelled_samples(panset, pos_seqs[:pos_train_count], 1)
        train.update(_labelled_samples(panset, neg_seqs_selected[:neg_train_count], 0))

        test = _labelled_samples(panset, pos_seqs[pos_train_count:], 1)
        test.update(_labelled_samples(panset, neg_seqs_selected[neg_train_count:], 0))

        data_dict[TF_name] = {'train': train, 'test': test}

    return data_dict



    

# The Dataset proper (meant to be wrapped in a torch DataLoader)
class TF_data(D.Dataset):
    """Dataset of the samples of one TF and one partition ('train' or 'test').

    Samples come from ``TF_sorter``; each item is a
    ``(seq, bio_feat, label)`` tuple of float tensors.
    """

    def __init__(self, panset_path, TF_map_path, TF, partition='train'):
        """Load the data and keep the requested TF/partition.

        Args:
            panset_path: path to the pickled sequence features, or an already
                loaded panset mapping (seq_id -> feature dict).
            TF_map_path: path to the TF map saved with ``torch.save``, or an
                already loaded TF map.
            TF: name of the TF to serve; must be a key of the TF map.
            partition: 'train' (default) or 'test'.

        Raises:
            ValueError: if ``partition`` is neither 'train' nor 'test'.
            KeyError: if ``TF`` is not in the TF map.
        """
        if partition not in ('train', 'test'):
            raise ValueError(
                f"partition must be 'train' or 'test', got {partition!r}"
            )
        panset = self._load_panset(panset_path)
        TF_map = self._load_TF_map(TF_map_path)
        # Sort only the requested TF instead of building every TF's data set;
        # then descend to the partition level so that data_dict is
        # seq_id -> sample (not 'train'/'test' -> ...).
        self.data_dict = TF_sorter(panset, {TF: TF_map[TF]})[TF][partition]
        self.id_list = list(self.data_dict)
        self.len = len(self.id_list)

    @staticmethod
    def _load_panset(source):
        """Return a flat seq_id -> features mapping from a path or a mapping."""
        import os

        if not isinstance(source, (str, os.PathLike)):
            return source
        # NOTE(review): pickle.load can execute arbitrary code -- trusted files only.
        with open(source, 'rb') as handle:
            seq_feat = pickle.load(handle)
        # Assumes the pickle is nested partition -> seq_id -> features, as
        # seq_breathing_feat.pkl is treated elsewhere in this notebook
        # -- TODO confirm for other inputs.
        panset = {}
        for partition_samples in seq_feat.values():
            panset.update(partition_samples)
        return panset

    @staticmethod
    def _load_TF_map(source):
        """Return the TF map from a ``torch.save``-d file or pass a mapping through."""
        import os

        if isinstance(source, (str, os.PathLike)):
            return torch.load(source)
        return source

    def __getitem__(self, index):
        """Return ``(seq, bio_feat, label)`` for the ``index``-th sequence id.

        ``seq`` is the one-hot matrix from ``seq2onehot`` as a float tensor,
        ``bio_feat`` stacks the 'coord', 'coord_sq' and 'flip' features along
        a new leading dimension (they must therefore share one shape), and
        ``label`` is the sample's label as a float tensor.
        """
        seq_id = self.id_list[index]
        sample = self.data_dict[seq_id]
        seq = torch.from_numpy(seq2onehot(sample['seq'])).float()
        label = torch.from_numpy(sample['label']).float()
        coord_feat = torch.from_numpy(sample['coord'])
        coordsq_feat = torch.from_numpy(sample['coord_sq'])
        flip_feat = torch.from_numpy(sample['flip'])
        bio_feat = torch.stack([coord_feat, coordsq_feat, flip_feat], dim = 0).float()
        #bio_feat = torch.cat([seq, bio_feat], dim = 0).float()
        return seq, bio_feat, label

    def __len__(self):
        """Number of samples in the selected partition."""
        return self.len

if __name__ == '__main__':
    # Smoke test: build the data set of one TF and print the tensor sizes of
    # every sample. TF_data takes (panset_path, TF_map_path, TF); the earlier
    # call used a non-existent `data_path` keyword and omitted the TF map and
    # the TF name.
    # NOTE(review): absolute, machine-specific path -- consider making it configurable.
    panset_path = '/home/blai/Breathing/data/Chipseq_data/seq_breathing_feat.pkl'
    TF_map_path = 'TF_map.pt'
    # No particular TF is named here, so use the first one stored in the map.
    example_TF = next(iter(torch.load(TF_map_path)))
    Dset = TF_data(panset_path, TF_map_path, example_TF)
    for feat in Dset:
        print(feat[0].size())
        print(feat[1].size())
        print(feat[2].size())

         
            
