In [None]:
# Import necessary libraries
import numpy as np
import pandas as pd
import os
from Bio import SeqIO
from tqdm.notebook import tqdm

from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.linear_model import SGDClassifier
from sklearn.multioutput import MultiOutputClassifier

print("✓ All libraries imported successfully!")

In [None]:
# Verify data directory
DATA_DIR = '../data'
if os.path.exists(DATA_DIR):
    print(f"✓ Data directory found: {DATA_DIR}")
else:
    print(f"✗ Data directory not found: {DATA_DIR}")

# SVM Baseline Model for Protein Function Prediction

This notebook trains a Linear SVM model using CTD features (Composition, Transition, Distribution) for protein function prediction.

In [None]:
# Calculating Naive Frequency given a sequence
def naive_freq(sequence):
    """
    Input: an amino acid sequence of variable length
    Output: a list of shape (1, 20), each element standing for an amino acid and its frequency
    """
    # Counting the appearance of each amino acid within the sequence
    AMINO_ACIDS = 'ARNDCQEGHILKMFPSTWYV'

    AA_TO_INDEX = {aa: i for i, aa in enumerate(AMINO_ACIDS)}

    frequency_vector = [0] * 20

    for amino_acid in sequence:
        try:
            index = AA_TO_INDEX[amino_acid]
            frequency_vector[index] += 1
        except KeyError:
            print(f"Warning: Skipping unknown amino acid {amino_acid}")
            pass

    # Calculating frequencies for each amino acid
    total_length = len(sequence)

    normalized_vector = frequency_vector
    if total_length > 0:
        normalized_vector = [count / total_length for count in frequency_vector]

    return normalized_vector

## Feature Extraction Functions

In [None]:
CTD_GROUPS_BY_LIST = {
    "Hydrophobicity": {
        1: {"A", "V", "L", "I", "M", "F", "W", "C"},               # Hydrophobic
        2: {"G", "H", "Y", "P", "T", "S"},          # Neutral
        3: {"R", "K", "Q", "E", "D", "N"}           # Polar
    },

    "Charge": {
        1: {"D", "E"},      # Negative
        2: {"A", "G", "I", "L", "M", "F", "P", "Q", "S", "T", "W", "Y", "V", "N", "C"}, # Neutral
        3: {"K", "R", "H"}     # Positive
    },

    "VanDerWaals": {
        1: {"A", "G", "S", "C"},  # Small
        2: {"T", "D", "P", "N", "V"},   # Medium
        3: {"E", "Q", "L", "I", "F", "Y", "M", "H", "K", "R", "W"}   # Large
    },

    "Polarity": {
        1: {"L", "A", "W", "F", "C", "M", "V", "I", "Y"},   # Small
        2: {"P", "T", "S", "G", "H"},    # Medium
        3: {"Q", "N", "E", "D", "K", "R"}   # High
    },

    "Polarizability": {
        1: {"G", "A", "S", "D", "C"},   # Small
        2: {"T", "P", "N", "H", "E", "Q", "K"},   # Medium
        3: {"M", "I", "L", "V", "F", "Y", "W", "R"}
    },

    "SecondStructure": {
        1: {"E", "A", "L", "M", "Q", "K", "R", "H"},  # Helix
        2: {"V", "I", "Y", "C", "W", "F", "T"},  # Strand
        3: {"G", "N", "P", "S", "D"}  # Coil
    },

    "Solvent": {
        1: {"A", "L", "F", "C", "G", "I", "V", "W"},  # Buried
        2: {"R", "K", "Q", "E", "D", "N"},  # Intermediate
        3: {"M", "S", "P", "T", "H", "Y"}  # Exposed
    }
}

def get_group(aa, property_map):
    for g, aa_set in property_map.items():
        if aa in aa_set:
            return g
    return None

In [None]:
import math

def aa_ctd(sequence, physicochem):
    """
    For this composition, we compute the statistics on groups divided by physicochemical properties:
        e.g: By hydrophobicity, we have three groups:
            Class 1: {A, G, V, L, I, M, F, W, P}
            Class 2: {R, K, H}
            Class 3: {S, T, Y, C, N, Q, D, E}

    Amino Acid composition is made up of three subcompositions for each physicochemical property:
    - Composition (C): The frequency of a group by the number of AAs
    - Transition (T) :
    Input: an amino acid sequence of variable length

    """
    property_map = CTD_GROUPS_BY_LIST[physicochem]
    L = len(sequence)

    groups = []
    for aa in sequence:
        g = get_group(aa, property_map)
        if g is not None:
            groups.append(g)

    L = len(groups)
    if L == 0:
        return [0.0] * 21  # safe fallback

    # Count members of each group
    N = {1: 0, 2: 0, 3: 0}
    for g in groups:
        N[g] += 1

    composition = [N[1]/L, N[2]/L, N[3]/L]

    # Transitions
    T12 = T13 = T23 = 0
    for i in range(L - 1):
        g1, g2 = groups[i], groups[i + 1]
        if g1 == g2:
            continue
        gmin, gmax = min(g1, g2), max(g1, g2)
        if gmin == 1 and gmax == 2:
            T12 += 1
        elif gmin == 1 and gmax == 3:
            T13 += 1
        elif gmin == 2 and gmax == 3:
            T23 += 1

    denom = L - 1
    transition = [T12/denom, T13/denom, T23/denom]

    # Distribution
    positions = {1: [], 2: [], 3: []}
    for i, g in enumerate(groups):
        positions[g].append(i + 1)

    P_k = [0.0, 0.25, 0.5, 0.75, 1.0]
    distribution = []

    for g in [1, 2, 3]:
        pos_list = positions[g]
        Ng = len(pos_list)
        if Ng == 0:
            distribution.extend([0.0]*5)
            continue
        for pk in P_k:
            if pk == 0:
                idx = 0
            else:
                idx = math.ceil(Ng * pk) - 1
            distribution.append(pos_list[idx] / L)

    return composition + transition + distribution

sequence = "AEAAAEAEEAAAAAEAEEEAAEEAEEEAAE"
ctd = aa_ctd(sequence, "Hydrophobicity")
print(len(ctd))

In [None]:
def dipeptide_composition(seq):
    """Compute dipeptide composition."""
    AA = "ACDEFGHIKLMNPQRSTVWY"
    dipeptides = [a+b for a in AA for b in AA]
    seq = seq.upper()
    length = len(seq)-1 if len(seq)>1 else 1
    return [sum(1 for i in range(len(seq)-1) if seq[i]+seq[i+1]==dp)/length for dp in dipeptides]

In [None]:
def extract_accession(header):
    """Extract accession from a FASTA header like >sp|A0JP26|POTB3_HUMAN"""
    parts = header.lstrip('>').split('|')
    if len(parts) >= 2:
        return parts[1]
    else:
        return header.lstrip('>')

In [None]:
# Feature extraction from FASTA files
bio_properties = ["Hydrophobicity", "Charge", "VanDerWaals", "Polarity",
                  "Polarizability", "SecondStructure", "Solvent"]

def load_fasta_features(fasta_path):
    """
    Load sequences from a FASTA file, extract features, and return
    a feature matrix X and a list of protein IDs.
    """
    if not os.path.exists(fasta_path):
        print(f"Error: {fasta_path} not found!")
        return [], np.array([])
    
    ids = []
    feats = []

    for record in SeqIO.parse(fasta_path, "fasta"):
        protein_id = extract_accession(record.id)
        seq = str(record.seq).upper()

        # Feature extraction
        x = []
        x.extend(naive_freq(seq))
        for prop in bio_properties:
            x.extend(aa_ctd(seq, prop))
        x.extend(dipeptide_composition(seq))

        ids.append(protein_id)
        feats.append(x)

    X = np.vstack(feats)
    print(f"Loaded {len(ids)} sequences with {X.shape[1]} features each")
    return ids, X

# Load training data
train_fasta_path = "../data/Train/train_sequences.fasta"
train_ids, X_train = load_fasta_features(train_fasta_path)

## Load Training Data

In [None]:
# Load training labels from train_terms.tsv
train_terms_path = "../data/Train/train_terms.tsv"

if os.path.exists(train_terms_path):
    df = pd.read_csv(train_terms_path, sep="\t", names=["EntryID", "GO", "Ont"])
    
    df["EntryID"] = df["EntryID"].str.strip()
    df["GO"] = df["GO"].str.strip()
    df["Ont"] = df["Ont"].str.strip()
    
    # Split labels by ontology
    labels_MF = {}  # Molecular Function
    labels_BP = {}  # Biological Process
    labels_CC = {}  # Cellular Component
    
    for entry, go, ont in zip(df["EntryID"], df["GO"], df["Ont"]):
        if ont == "F":
            labels_MF.setdefault(entry, []).append(go)
        elif ont == "P":
            labels_BP.setdefault(entry, []).append(go)
        elif ont == "C":
            labels_CC.setdefault(entry, []).append(go)
    
    print(f"Loaded labels for:")
    print(f"  - Molecular Function (MF): {len(labels_MF)} proteins")
    print(f"  - Biological Process (BP): {len(labels_BP)} proteins")
    print(f"  - Cellular Component (CC): {len(labels_CC)} proteins")
else:
    print(f"Error: {train_terms_path} not found!")

In [None]:
# Build label matrices for each ontology
def build_Y(train_ids, label_dict):
    """Build binary label matrix from label dictionary"""
    # Collect all GO terms for this ontology
    all_terms = sorted({go for gos in label_dict.values() for go in gos})
    term_to_index = {go: i for i, go in enumerate(all_terms)}

    Y = np.zeros((len(train_ids), len(all_terms)), dtype=np.uint8)

    for i, pid in enumerate(train_ids):
        if pid in label_dict:
            for go in label_dict[pid]:
                j = term_to_index[go]
                Y[i, j] = 1

    return Y, term_to_index, all_terms

# Build label matrices
Y_MF_UNFILTERED, mf_term_to_idx, mf_terms = build_Y(train_ids, labels_MF)
Y_BP_UNFILTERED, bp_term_to_idx, bp_terms = build_Y(train_ids, labels_BP)
Y_CC_UNFILTERED, cc_term_to_idx, cc_terms = build_Y(train_ids, labels_CC)

print(f"Label matrix shapes (before filtering):")
print(f"  MF: {Y_MF_UNFILTERED.shape}")
print(f"  BP: {Y_BP_UNFILTERED.shape}")
print(f"  CC: {Y_CC_UNFILTERED.shape}")

In [None]:
# Train SVM models for each ontology
print("Training SVM models...")

# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_reduced)

# SGDClassifier (linear SVM equivalent)
base = SGDClassifier(loss='hinge', max_iter=1000, tol=1e-3, class_weight='balanced', random_state=42)

# Train models for each ontology
print("Training MF model...")
svm_mf = MultiOutputClassifier(base, n_jobs=-1)
svm_mf.fit(X_train_scaled, Y_MF)

print("Training BP model...")
svm_bp = MultiOutputClassifier(base, n_jobs=-1)
svm_bp.fit(X_train_scaled, Y_BP)

print("Training CC model...")
svm_cc = MultiOutputClassifier(base, n_jobs=-1)
svm_cc.fit(X_train_scaled, Y_CC)

print("All models trained successfully!")

## Train SVM Models

In [None]:
# Filter rare labels (keep only labels with >= min_pos annotations)
min_pos = 50

keep_MF = (Y_MF_UNFILTERED.sum(axis=0) >= min_pos)
keep_BP = (Y_BP_UNFILTERED.sum(axis=0) >= min_pos)
keep_CC = (Y_CC_UNFILTERED.sum(axis=0) >= min_pos)

Y_MF = Y_MF_UNFILTERED[:, keep_MF]
Y_BP = Y_BP_UNFILTERED[:, keep_BP]
Y_CC = Y_CC_UNFILTERED[:, keep_CC]

# Filter term lists to match filtered label matrices
mf_terms_filtered = np.array(mf_terms)[keep_MF]
bp_terms_filtered = np.array(bp_terms)[keep_BP]
cc_terms_filtered = np.array(cc_terms)[keep_CC]

print(f"After filtering (min_pos={min_pos}):")
print(f"  Remaining labels MF: {Y_MF.shape[1]}")
print(f"  Remaining labels BP: {Y_BP.shape[1]}")
print(f"  Remaining labels CC: {Y_CC.shape[1]}")

In [None]:
# Dimensionality reduction using PCA
print("Applying PCA for dimensionality reduction...")

pca = PCA(n_components=0.95)  # Keep 95% variance
X_train_reduced = pca.fit_transform(X_train)

print(f"Original dimensions: {X_train.shape[1]}")
print(f"Reduced dimensions (95% variance): {X_train_reduced.shape[1]}")
print(f"Explained variance ratio: {pca.explained_variance_ratio_.sum():.4f}")

In [None]:
# Load test data
test_fasta_path = "../data/Test/testsuperset.fasta"

if os.path.exists(test_fasta_path):
    test_ids, X_test = load_fasta_features(test_fasta_path)
    
    # Apply same transformations as training data
    X_test_reduced = pca.transform(X_test)
    X_test_scaled = scaler.transform(X_test_reduced)
    
    print(f"Test data loaded: {X_test.shape[0]} samples")
    print(f"Test data reduced shape: {X_test_reduced.shape}")
else:
    print(f"Warning: {test_fasta_path} not found!")

## Generate Predictions

In [None]:
# GO term loading and Ensemble prediction class
def load_go_terms(obo_path):
    """Parses GO.obo to map GO IDs to their root ontology (MFO, BPO, CCO)."""
    go_terms = {}
    current_term = None
    
    if not os.path.exists(obo_path):
        print(f"Warning: {obo_path} not found!")
        return go_terms
    
    with open(obo_path, 'r') as f:
        for line in f:
            line = line.strip()
            if line.startswith('[Term]'):
                current_term = {}
            elif current_term is not None:
                if line.startswith('id:'):
                    current_term['id'] = line.split(': ')[1]
                elif line.startswith('namespace:'):
                    namespace = line.split(': ')[1]
                    if 'biological_process' in namespace:
                        current_term['root'] = 'BPO'
                    elif 'molecular_function' in namespace:
                        current_term['root'] = 'MFO'
                    elif 'cellular_component' in namespace:
                        current_term['root'] = 'CCO'
                elif line == '':
                    if current_term.get('id') and current_term.get('root'):
                        go_terms[current_term['id']] = current_term['root']
                    current_term = None
    return go_terms

class ProteinPredictions:
    """Stores and merges predictions from multiple sources."""
    def __init__(self):
        self.predictions = {}

    def add_prediction(self, protein, go_term, score, branch, bonus=0):
        if protein not in self.predictions:
            self.predictions[protein] = {'CCO': {}, 'MFO': {}, 'BPO': {}}
        
        score = float(score)

        if go_term in self.predictions[protein][branch]:
            current_score = self.predictions[protein][branch][go_term]
            new_score = max(current_score, score) + bonus
            self.predictions[protein][branch][go_term] = new_score
        else:
            self.predictions[protein][branch][go_term] = score

        if self.predictions[protein][branch][go_term] > 1:
            self.predictions[protein][branch][go_term] = 1

    def get_predictions(self, output_file='submission_ensemble.tsv', top=35):
        """Exports the merged predictions to a CAFA-formatted file."""
        with open(output_file, 'w') as f:
            for protein, branches in tqdm(self.predictions.items(), desc="Writing Final Submission"):
                for branch, go_terms in branches.items():
                    top_go_terms = sorted(go_terms.items(), key=lambda x: x[1], reverse=True)[:top]
                    
                    for go_term, score in top_go_terms:
                        f.write(f"{protein}\t{go_term}\t{score:.6f}\n")

        print(f"\nEnsemble CAFA submission saved to: {output_file}")

# Load GO terms
GO_OBO_PATH = '../data/Train/go-basic.obo'
go_terms_dict = load_go_terms(GO_OBO_PATH)
print(f"Loaded {len(go_terms_dict)} GO terms from ontology")

In [None]:
# Function to make CAFA submission with SVM
def make_cafa_submission_batch_svm(
    test_ids,
    X_test_scaled,
    svm_mf, mf_terms,
    svm_bp, bp_terms,
    svm_cc, cc_terms,
    output_path="submission_svm.tsv",
    threshold=0.0,
    batch_size=1000
):
    """Generate CAFA-format submission file from SVM predictions"""
    N = len(test_ids)
    
    def get_scores(svm_model, X_batch):
        """Return decision function scores for each label in MultiOutputClassifier"""
        score_list = [est.decision_function(X_batch) for est in svm_model.estimators_]
        scores = np.column_stack(score_list)
        return scores

    with open(output_path, "w") as f:
        for start in tqdm(range(0, N, batch_size), desc="Processing proteins"):
            end = min(start + batch_size, N)

            X_batch = X_test_scaled[start:end]
            ids_batch = test_ids[start:end]

            # Get decision scores
            scores_mf = get_scores(svm_mf, X_batch)
            scores_bp = get_scores(svm_bp, X_batch)
            scores_cc = get_scores(svm_cc, X_batch)
            
            # Write predictions
            for i, pid in enumerate(ids_batch):
                ontologies = [
                    (scores_mf[i], mf_terms),
                    (scores_bp[i], bp_terms),
                    (scores_cc[i], cc_terms)
                ]
                
                for scores, terms in ontologies:
                    mask = scores >= threshold
                    for go, score in zip(np.array(terms)[mask], scores[mask]):
                        f.write(f"{pid}\t{go}\t{score:.6f}\n")

    print(f"\nCAFA submission saved to: {output_path}")

In [None]:
# Generate predictions and create ensemble submission
CTD_SUBMISSION_FILE = "../data/checkpoint/submissions/submission_svm.tsv"

# Generate SVM predictions
print("Generating SVM predictions...")
make_cafa_submission_batch_svm(
    test_ids,
    X_test_scaled,
    svm_mf, mf_terms_filtered,
    svm_bp, bp_terms_filtered, 
    svm_cc, cc_terms_filtered, 
    output_path=CTD_SUBMISSION_FILE, 
    batch_size=1000
)

# Initialize ensemble
predictor = ProteinPredictions()

# Load SVM predictions
print(f"\nLoading SVM predictions from: {CTD_SUBMISSION_FILE}")
if os.path.exists(CTD_SUBMISSION_FILE):
    with open(CTD_SUBMISSION_FILE, 'r') as f:
        for item in tqdm(f, desc="Processing SVM scores"):
            try:
                protein_id, go_term, score_str = item.strip().split('\t')
                score = float(score_str)
                if go_term in go_terms_dict:
                    root = go_terms_dict[go_term]
                    predictor.add_prediction(protein_id, go_term, score, root, bonus=0.0)
            except ValueError:
                continue

    # Export final ensemble submission
    FINAL_SUBMISSION_FILE = "../data/checkpoint/submissions/submission_svm_final.tsv"
    predictor.get_predictions(FINAL_SUBMISSION_FILE, top=35)
    print(f"\n✓ Submission file created: {FINAL_SUBMISSION_FILE}")
else:
    print(f"Error: {CTD_SUBMISSION_FILE} not found!")