In [None]:
import os
import numpy as np
import pandas as pd
import torch
import tensorflow as tf
from tqdm import tqdm
from transformers import AutoModel, AutoTokenizer
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Conv1D, MaxPooling1D, LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
import matplotlib.pyplot as plt
from functools import lru_cache
from joblib import Parallel, delayed
import heapq

In [None]:
# GPU Configuration
gpu_devices = tf.config.experimental.list_physical_devices('GPU')
if gpu_devices:
    for device in gpu_devices:
        tf.config.experimental.set_memory_growth(device, True)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Load ProtBERT Model & Tokenizer
tokenizer = AutoTokenizer.from_pretrained("Rostlab/prot_bert", do_lower_case=False)
model = AutoModel.from_pretrained("Rostlab/prot_bert").to(device)

In [None]:
# Data Loading Functions
def load_data(folder_path, label, max_sequences=100000, chunk_size=1000):
    sequences, labels = [], []
    for filename in os.listdir(folder_path):
        if filename.endswith(".csv"):
            file_path = os.path.join(folder_path, filename)
            for chunk in pd.read_csv(file_path, usecols=[0], chunksize=chunk_size):
                sequences.extend(chunk.iloc[:, 0].tolist()[:max_sequences - len(sequences)])
                labels.extend([label] * min(len(chunk), max_sequences - len(labels)))
                if len(sequences) >= max_sequences:
                    break
    return sequences, labels

In [None]:
# Feature Extraction Functions
def extract_features(sequences):
    def calc_feature(seq):
        length = len(seq)
        aa_counts = {char: seq.count(char) for char in "AILMFWVKRDEAEHKLQRIVYFWGNPSD"}
        hydrophobicity = sum(aa_counts.get(char, 0) for char in "AILMFWV") / length
        charge = (aa_counts.get('K', 0) + aa_counts.get('R', 0)) - (aa_counts.get('D', 0) + aa_counts.get('E', 0))
        molecular_weight = sum(ord(char) for char in seq) / length
        alpha_helix = sum(aa_counts.get(char, 0) for char in "AEHKLQR") / length
        beta_sheet = sum(aa_counts.get(char, 0) for char in "IVYFW") / length
        turn = sum(aa_counts.get(char, 0) for char in "GNPSD") / length
        return [length, hydrophobicity, charge, molecular_weight, alpha_helix, beta_sheet, turn]
    return np.array([calc_feature(seq) for seq in sequences], dtype=np.float32)
def get_protbert_embeddings(sequences, batch_size=16):
    model.eval()
    embeddings = []
    with torch.no_grad():
        for i in tqdm(range(0, len(sequences), batch_size), desc="Extracting Embeddings", unit="batch"):
            batch = [" ".join(list(seq)) for seq in sequences[i:i + batch_size]]  
            inputs = tokenizer(batch, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
            outputs = model(**inputs)
            embeddings.append(outputs.last_hidden_state.mean(dim=1).cpu().numpy())
    return np.vstack(embeddings).astype(np.float32)

In [None]:
# String Algorithm Functions
def build_suffix_array(seq):
    """Builds suffix array using sorted suffixes"""
    suffixes = sorted((seq[i:], i) for i in range(len(seq)))
    return [suffix[1] for suffix in suffixes]

def build_lcp(seq, suffix_array):
    """Builds LCP (Longest Common Prefix) array"""
    n = len(seq)
    rank = [0] * n
    lcp = [0] * n
    for i, suffix in enumerate(suffix_array):
        rank[suffix] = i
    h = 0
    for i in range(n):
        if rank[i] > 0:
            j = suffix_array[rank[i] - 1]
            while (i + h < n) and (j + h < n) and (seq[i + h] == seq[j + h]):
                h += 1
            lcp[rank[i]] = h
            if h > 0:
                h -= 1
    return lcp

def compute_kmp_lps(pattern):
    """Computes Longest Prefix Suffix (LPS) array for KMP algorithm"""
    lps = [0] * len(pattern)
    j = 0  # length of previous longest prefix suffix
    for i in range(1, len(pattern)):
        while j > 0 and pattern[i] != pattern[j]:
            j = lps[j - 1]
        if pattern[i] == pattern[j]:
            j += 1
            lps[i] = j
    return lps

def kmp_search(sequence, pattern):
    """Finds occurrences of a pattern in a sequence using KMP algorithm"""
    lps = compute_kmp_lps(pattern)
    matches = []
    j = 0  # index for pattern
    for i in range(len(sequence)):
        while j > 0 and sequence[i] != pattern[j]:
            j = lps[j - 1]
        if sequence[i] == pattern[j]:
            j += 1
        if j == len(pattern):
            matches.append(i - j + 1)
            j = lps[j - 1]
    return matches

# Aho-Corasick Algorithm
class TrieNode:
    def _init_(self):
        self.children = {}
        self.fail = None
        self.output = []

class AhoCorasickTrie:
    def _init_(self, patterns):
        self.root = TrieNode()
        self.build_trie(patterns)
        self.build_failure_links()

    def build_trie(self, patterns):
        """Builds Trie from patterns"""
        for pattern in patterns:
            node = self.root
            for char in pattern:
                if char not in node.children:
                    node.children[char] = TrieNode()
                node = node.children[char]
            node.output.append(pattern)

    def build_failure_links(self):
        """Builds failure links for Aho-Corasick"""
        queue = []
        for node in self.root.children.values():
            node.fail = self.root
            queue.append(node)
        while queue:
            current = queue.pop(0)
            for char, child in current.children.items():
                queue.append(child)
                fail_node = current.fail
                while fail_node and char not in fail_node.children:
                    fail_node = fail_node.fail
                child.fail = fail_node.children[char] if fail_node else self.root
                child.output += child.fail.output if child.fail else []

    def search(self, text):
        """Searches for all patterns in text"""
        node = self.root
        results = []
        for i, char in enumerate(text):
            while node and char not in node.children:
                node = node.fail
            if not node:
                node = self.root
                continue
            node = node.children[char]
            if node.output:
                results.append((i, node.output))
        return results  
# FFT Algorithm
def compute_fft(sequence):
    """Computes FFT (Fast Fourier Transform) on peptide sequence"""
    numeric_seq = [ord(char) for char in sequence] 
    fft_result = np.fft.fft(numeric_seq)
    return np.abs(fft_result)[:len(fft_result)//2]
# MST Algorithm
def prim_mst(graph):
    """Computes MST using Prim's Algorithm"""
    if not graph:
        return []
    
    n = len(graph)
    mst = []
    visited = set()
    min_heap = [(0, 0, -1)] 

    while len(visited) < n:
        cost, node, prev = heapq.heappop(min_heap)
        if node in visited:
            continue
        visited.add(node)
        if prev != -1:
            mst.append((prev, node, cost))
        for neighbor, weight in graph[node]:
            if neighbor not in visited:
                heapq.heappush(min_heap, (weight, neighbor, node))
    return mst
# Dynamic Programming Functions
@lru_cache(None)
def lcs(seq1, seq2):
    """Longest Common Subsequence using Dynamic Programming with memoization"""
    if not seq1 or not seq2:
        return 0
    elif seq1[-1] == seq2[-1]:
        return 1 + lcs(seq1[:-1], seq2[:-1])
    else:
        return max(lcs(seq1[:-1], seq2), lcs(seq1, seq2[:-1]))

def compute_lcs_features(sequences):
    """Compute LCS features for sequences"""
    reference_seq = sequences[0]  # Choose a reference sequence
    return np.array([lcs(seq, reference_seq) / len(seq) for seq in sequences]).reshape(-1, 1)
# Backtracking Function
def generate_subsequences(seq, max_length=4):
    """Generate all subsequences up to max_length using backtracking"""
    subsequences = []
    
    def backtrack(current, index):
        if 1 < len(current) <= max_length:
            subsequences.append(current)
        if len(current) == max_length:
            return
        for i in range(index, len(seq)):
            backtrack(current + seq[i], i + 1)
    
    backtrack("", 0)
    return subsequences

def subseq_features(sequences):
    """Convert subsequences to features"""
    return np.array([len(generate_subsequences(seq)) for seq in sequences]).reshape(-1, 1)
# QuickSort for Feature Scaling
def quicksort(arr):
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quicksort(left) + middle + quicksort(right)

def apply_quicksort_scaling(features):
    return np.array([quicksort(feature_row.tolist()) for feature_row in features])

In [None]:
# CNN-LSTM Model
def create_model(input_shape):
    model = Sequential([
        Conv1D(filters=128, kernel_size=5, activation='relu', input_shape=(input_shape, 1)),
        BatchNormalization(),
        MaxPooling1D(pool_size=2),
        LSTM(256, return_sequences=True, activation='tanh'),
        Dropout(0.5),  
        LSTM(200, return_sequences=False, activation='tanh'),
        Dropout(0.5),
        Dense(128, activation='relu'),
        Dense(2, activation='softmax')
    ])
    model.compile(optimizer=Adam(learning_rate=0.0001),  
                  loss='sparse_categorical_crossentropy', 
                  metrics=['accuracy'])
    return model
# Training Function
def train_model(X_train, y_train, X_val, y_val):
    model = create_model(X_train.shape[1])
    early_stopping = EarlyStopping(monitor='val_accuracy', patience=20, restore_best_weights=True, mode='max')
    model_checkpoint = ModelCheckpoint('Therapeutic vs Non-Therapeutic Classifier.h5', monitor='val_accuracy', save_best_only=True, mode='max')
    history = model.fit(X_train, y_train, epochs=100, batch_size=128, validation_data=(X_val, y_val), 
                        callbacks=[early_stopping, model_checkpoint])
    return model, history

In [None]:
# Prediction Function
def predict_sequence(sequence, model_path='Therapeutic vs Non-Therapeutic Classifier.h5'):
    model = load_model(model_path)
    sequence_embedding = get_protbert_embeddings([sequence])
    sequence_features = extract_features([sequence])
    input_data = np.hstack((sequence_embedding, sequence_features)).reshape(1, -1, 1)
    prediction = model.predict(input_data)[0]  
    prob_therapeutic = prediction[1]
    return "Therapeutic" if prob_therapeutic >= 0.5 else "Non-Therapeutic", prob_therapeutic
# Load data paths
folder_therapeutic = r"data\Therapeutic vs Non-Therapeutic Classification data\Therapeutic data"
folder_nontherapeutic = r"data\Therapeutic vs Non-Therapeutic Classification data\Non-Therapeutic data"
# Load dataset
seqs_thera, labels_thera = load_data(folder_therapeutic, 1)
seqs_nonthera, labels_nonthera = load_data(folder_nontherapeutic, 0)

sequences = seqs_thera + seqs_nonthera
labels = np.array(labels_thera + labels_nonthera, dtype=np.int32)
# Feature extraction
if not os.path.exists("X_embeddings.npy"):
    X_embeddings = get_protbert_embeddings(sequences)
    np.save("X_embeddings.npy", X_embeddings)
else:
    X_embeddings = np.load("X_embeddings.npy")

X_features = extract_features(sequences)
X_combined = np.hstack((X_embeddings, X_features))
# Reshape for CNN-LSTM
X_combined = X_combined.reshape(X_combined.shape[0], X_combined.shape[1], 1)
# Split data
X_train, X_val, y_train, y_val = train_test_split(X_combined, labels, test_size=0.2, random_state=42)
# Train model
best_model, history = train_model(X_train, y_train, X_val, y_val)
# Plot training results
plt.figure(figsize=(12, 5))
plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Training Accuracy')
plt.plot(history.history['val_accuracy'], label='Validation Accuracy')
plt.title('Model Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Training Loss')
plt.plot(history.history['val_loss'], label='Validation Loss')
plt.title('Model Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()
# Print final metrics
final_train_acc = history.history['accuracy'][-1]
final_val_acc = history.history['val_accuracy'][-1]
print(f"Final Training Accuracy: {final_train_acc:.4f}")
print(f"Final Validation Accuracy: {final_val_acc:.4f}")
# Evaluate on validation set
val_loss, val_acc = best_model.evaluate(X_val, y_val)
print(f"Validation Accuracy: {val_acc:.4f}")