In [None]:
import os
import pandas as pd
from sklearn.neighbors import KernelDensity
import torch
from torch import nn
import numpy as np
import torch.utils.data as Data
from torch.utils.data import DataLoader
from sklearn.metrics import (accuracy_score, roc_auc_score,
                            matthews_corrcoef, average_precision_score,
                            confusion_matrix, f1_score, precision_score,
                            recall_score, #specificity_score,
                            classification_report)
from tqdm import tqdm
import time

In [54]:
#!rm -rf /content/EDEN
!git clone https://github.com/zabihis/EDEN/


fatal: destination path 'EDEN' already exists and is not an empty directory.


In [55]:

def evaluate_model(model, data_loader, criterion, device):
    """Evaluate model on given data loader and return loss and metrics"""
    model.eval()
    running_loss = 0.0
    all_targets = []
    all_probs = []
    all_preds = []

    with torch.no_grad():
        for data, target in data_loader:
            data, target = data.to(device), target.to(device)
            outputs = model(data)

            running_loss += criterion(outputs, target.float().unsqueeze(1)).item()

            probs = outputs.cpu().numpy().flatten()
            preds = (probs > 0.5).astype(int)

            all_targets.extend(target.cpu().numpy())
            all_probs.extend(probs)
            all_preds.extend(preds)

    loss = running_loss / len(data_loader)
    metrics = calculate_metrics(np.array(all_targets),
                              np.array(all_preds),
                              np.array(all_probs))
    return loss, metrics


def print_metrics(metrics, prefix=''):
    """Print formatted metrics"""
    print(f"{prefix}Accuracy: {metrics['accuracy']:.4f} | "
          f"AUC: {metrics['auc']:.4f} | MCC: {metrics['mcc']:.4f} | "
          f"AP: {metrics['ap']:.4f}")
    print(f"{prefix}Sensitivity: {metrics['sensitivity']:.4f} | "
          f"Specificity: {metrics['specificity']:.4f} | "
          f"Precision: {metrics['precision']:.4f} | "
          f"F1: {metrics['f1']:.4f}")


def calculate_metrics(y_true, y_pred, y_prob):
    """Calculate all required performance metrics"""
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    metrics = {
        'accuracy': accuracy_score(y_true, y_pred),
        'auc': roc_auc_score(y_true, y_prob),
        'mcc': matthews_corrcoef(y_true, y_pred),
        'ap': average_precision_score(y_true, y_prob),
        'sensitivity': recall_score(y_true, y_pred),  # recall
        'specificity': tn / (tn + fp), #specificity_score(y_true, y_pred),
        'precision': precision_score(y_true, y_pred),
        'f1': f1_score(y_true, y_pred)
    }
    return metrics

In [67]:
def EDNencoderMultiScale(seqs, kernel, bandwidthList, seqLenLimit):
    Nseq = len(seqs)
    Nscale=len(bandwidthList)
    seq_len = len(seqs[0])
    xspace = np.linspace(0, seqLenLimit-1, seqLenLimit)[:, np.newaxis]
    seqsEncoded = np.zeros((Nseq, 4, seqLenLimit, Nscale), dtype='float32') #shape: [n_samples, seq_length, 4]

    if abs(seqLenLimit-seq_len)>1:
      print('* ATTENTION: the sequence length is ', seq_len, ' but you set the seqLenLimit=', seqLenLimit, '. This may affect the model performance.')
      print()

    for iseq in range(Nseq):  #seq loop
        one_seq = seqs[iseq]
        if len(one_seq)>seqLenLimit:
            one_seq=one_seq[:seqLenLimit]
        one_seq=one_seq.upper()
        one_seq.replace("U", "T")

        # find Occurance
        idxA=np.array(list(findstr(one_seq, 'A')))[:, np.newaxis]
        idxT=np.array(list(findstr(one_seq, 'T')))[:, np.newaxis]
        idxG=np.array(list(findstr(one_seq, 'G')))[:, np.newaxis]
        idxC=np.array(list(findstr(one_seq, 'C')))[:, np.newaxis]

        for iscale in range(Nscale):    #scale loop
            bandwidth = bandwidthList[iscale]

            sigA = EDNcalc(idxA, bandwidth, kernel, xspace)
            sigT = EDNcalc(idxT, bandwidth, kernel, xspace)
            sigG = EDNcalc(idxG, bandwidth, kernel, xspace)
            sigC = EDNcalc(idxC, bandwidth, kernel, xspace)

            sig = np.array([sigA, sigT, sigG, sigC])

            seqsEncoded[iseq, :, :, iscale]=sig
    print('EDN encoding done!', Nseq, "x", seqLenLimit)
    return seqsEncoded


def EDNcalc(idxPoints, bandwidth, kernel, xspace):
    if len(idxPoints)>0:
        kde_model = KernelDensity(kernel=kernel, bandwidth=bandwidth).fit(idxPoints)
        EDN_sig = np.exp(kde_model.score_samples(xspace))*len(idxPoints)
    else:
        EDN_sig = np.zeros((xspace.shape[0]))
    return EDN_sig


def loadFile_gue(file_path):
    # Read CSV into pandas DataFrame
    df = pd.read_csv(file_path)
    # Convert to NumPy array
    seqs=df['sequence'].to_numpy()
    labels=df['label'].to_numpy()
    print("file loaded! data size: ", seqs.shape)
    return seqs, labels


def findstr(str, ch):
    for i, ltr in enumerate(str):
        if ltr == ch:
            yield i

In [57]:
class Hybrid_CNN(nn.Module):
    def __init__(self, seq_len=200, scales=4, channels=4):
        super(Hybrid_CNN, self).__init__()
        self.channels = channels  # Number of channels
        self.seq_len = seq_len  # DNA sequence length
        self.scales = scales  # Number of scales

        # High-resolution branch (single scale)
        self.high_res = nn.Sequential(
            nn.Conv1d(in_channels=channels, out_channels=64,\
                       kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Dropout(0.2),

            nn.Conv1d(in_channels=64, out_channels=128,\
                       kernel_size=5, stride=1, padding=2, ),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Dropout(0.2),

            nn.Conv1d(in_channels=128, out_channels=128,\
                       kernel_size=5, stride=1, padding=2, ),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Dropout(0.2),
            nn.AdaptiveMaxPool1d(10)
        )

        # multi-resolution branch (multi scale)
        # First conv layer processes
        self.multiscale_conv1 = nn.Sequential(
            nn.Conv2d(channels, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.2),
        )
        # sec conv layer processes
        self.multiscale_conv2 = nn.Sequential(
            nn.Conv2d(64, 128, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.2),
        )
        # sec conv layer processes
        self.multiscale_conv3 = nn.Sequential(
            nn.Conv1d(128, 128, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(2),
            nn.Dropout(0.2),
        )
        self.AdapMaxPool1d10=nn.AdaptiveMaxPool1d(10)
        self.fc = nn.Sequential(nn.Linear(2*128*10, 256),
                                nn.ReLU(),
                                nn.Dropout(0.2)
                                )
        self.classifier = nn.Sequential(nn.Linear(256, 1),
                                )
    def forward(self,x):
        # High-res branch (first scale only)
        x_high = self.high_res(x[:,:,:,0]).squeeze(-1)
        # first conv layer
        x_multi = self.multiscale_conv1(x)
        # sec conv layer
        x_multi = self.multiscale_conv2(x_multi).squeeze(-1)
        # 3rd conv layer
        x_multi = self.multiscale_conv3(x_multi)
        x_multi=self.AdapMaxPool1d10(x_multi)
        # concat features x_high, x_multi
        x = torch.cat([x_high, x_multi], dim=1)
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        out = self.classifier(x)
        return out



In [68]:
curPath = os.getcwd()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


seqLenLimit=100
dataset_name = 'mouse_tf1'   # human_tfX / mouse_tfX / human_prom_core_XXX / human_prom_300_XXX

model_name='Hybrid_CNN'
pretrainedWeights_path=curPath +'/EDEN/models/Hybrid_CNN_'+dataset_name+'.pth'
data_path = curPath + "/EDEN/datasets/"
testdata_filename=dataset_name+"/test"
batch_size = 64
kernel="cosine"; bandwidthList=[0.5, 1.5, 3, 4.5]
encoding_name='MSEDNSeq'


#-disp info----------
print('* kernel: ', kernel)
print('* Dataset:  '+ data_path + dataset_name)
print('* Model:  '+ model_name)
print('* encoding_name:  '+ encoding_name)
print('* pretrainedWeights_path:  '+ pretrainedWeights_path)
print(device)

#========load, prepare/encode data========
print('loading data file ...')
[seq_test, Y_test] = loadFile_gue(data_path + testdata_filename+'.csv') #load test file

#--ENCODE DATA---
print('encodeing seqs ...')
X_test  = EDNencoderMultiScale(seq_test, kernel, bandwidthList, seqLenLimit)
print('encoding done!')

#-disp data info-------------
print("Shape of X_test:", X_test.shape)

#--Prepare DataLoaders--
test_dataset = Data.TensorDataset((torch.from_numpy(X_test)).to(torch.float32), torch.LongTensor(Y_test))
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

#=======Initialize Model=========
global model
exec('model = '+model_name+'(seq_len=seqLenLimit, scales=4, channels=4)')
model.to(device) # Move model to the device
criterion = nn.BCEWithLogitsLoss()

#-load model weights-------------
print('Load the weights...')
model.load_state_dict(torch.load(pretrainedWeights_path))

#============TEST==================
print('\ntesting...')
_, test_metrics = evaluate_model(model, test_loader, criterion, device)

print('\nFinal Test Performance:')
print_metrics(test_metrics, prefix='Test ')

* kernel:  cosine
* Dataset:  mouse_tf1
* Model:  Hybrid_CNN
* encoding_name:  MSEDNSeq
* pretrainedWeights_path:  /content/EDEN/models/Hybrid_CNN_mouse_tf1.pth
cuda
loading data file ...
file loaded! data size:  (6745,)
encodeing seqs ...
EDN encoding done! 6745 x 100
encoding done!
Shape of X_test: (6745, 4, 100, 4)
Load the weights...

testing...

Final Test Performance:
Test Accuracy: 0.9007 | AUC: 0.9703 | MCC: 0.8028 | AP: 0.9735
Test Sensitivity: 0.8701 | Specificity: 0.9312 | Precision: 0.9267 | F1: 0.8975
