In [None]:
import numpy as np
import pandas as pd 
from tqdm import tqdm
from collections import Counter
from itertools import product
import os
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.metrics import classification_report, accuracy_score
import gc
import joblib
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt
from sklearn.linear_model import LogisticRegression
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.metrics import roc_curve, auc

from utils import *


In [None]:
# Load the training frame; read_data_from_file and the path constants are
# provided by the star-import of utils.
df_train=read_data_from_file(file_name_ncbi_datas)

# NOTE(review): the validation frame is read from gasaid_test_file_name, the
# very same constant used for df_test_gasaid below, so "validation" and part of
# the test set come from one file -- confirm this is intended.
df_val=read_data_from_file(gasaid_test_file_name)

# Two test sources, loaded separately.
df_test_gasaid=read_data_from_file(gasaid_test_file_name)
df_test_ncbi=read_data_from_file(ncbi_test_file_name)

# Combined test frame: both sources stacked, with the row index renumbered.
df_test = pd.concat([df_test_gasaid, df_test_ncbi], ignore_index=True)

In [None]:
# Additional evaluation frames, one per file constant exported by utils
# (presumably one per outbreak / host group, judging by the names -- confirm).
df_Asian_flu=read_data_from_file(Asian_flu_test)
df_hong_kong_flu=read_data_from_file(hong_kong_flu_test)
df_pdmh1n1_flu=read_data_from_file(pdmh1n1_flu_test)
df_covid= read_data_from_file(covid_test)
df_cows=read_data_from_file(cows_test)

In [None]:
# Row counts of the training and validation frames as loaded (before any filtering).
print(len(df_train))
print(len(df_val))

In [None]:
# De-duplicate each frame independently with the utils helper
# (exact duplicate criterion is defined in utils -- not visible here).
df_train = remove_duplicates(df_train)
df_val = remove_duplicates(df_val)

# Row counts after de-duplication, for comparison with the previous cell.
print(len(df_train))
print(len(df_val))

In [None]:
# Filter the training frame against every validation / test / held-out frame so
# that no evaluation sequence is also seen during training.
# remove_common_sequences comes from utils; it is expected to return the first
# frame without the sequences it shares with the second (confirm in utils).
#
# df_pdmh1n1_flu was loaded alongside the other held-out sets but was the only
# one not filtered out of the training data; it is included here.
for held_out in (
    df_test,
    df_val,
    df_Asian_flu,
    df_hong_kong_flu,
    df_pdmh1n1_flu,
    df_covid,
    df_cows,
):
    df_train = remove_common_sequences(df_train, held_out)

# Training size after filtering; validation size is printed for reference only
# (df_val is not modified in this cell).
print(len(df_train))
print(len(df_val))

In [None]:
def get_all_combinations(alphabet, k):
    """Return every length-k string over `alphabet`.

    Strings are produced in itertools.product order, i.e. the last position
    varies fastest. With k == 0 the result is [''].
    """
    letter_tuples = product(alphabet, repeat=k)
    return list(map(''.join, letter_tuples))

# Feature vocabulary: every possible m-mer (length m, not k) over the alphabet.
combos = get_all_combinations(alphabet, m)
# m-mer -> position in `combos`, so feature columns can be found by dict lookup.
combo_index = dict(zip(combos, range(len(combos))))

def get_kmers(seq, k):
    """Return all overlapping length-k windows of `seq`, left to right.

    A sequence shorter than k yields an empty list.
    """
    last_start = len(seq) - k
    windows = []
    for start in range(last_start + 1):
        windows.append(seq[start:start + k])
    return windows

def get_minimizer(kmer, m):
    """
    Return the lexicographically smallest m-mer of `kmer`.

    Candidates are taken from the k-mer as given and from the k-mer read
    backwards (a plain string reversal). If the k-mer is shorter than m there
    are no candidates and min() raises ValueError.
    """
    n_windows = len(kmer) - m + 1
    candidates = []
    for strand in (kmer, kmer[::-1]):
        candidates.extend(strand[start:start + m] for start in range(n_windows))
    return min(candidates)

def comp_minimizers(seq, k, m):
    """Return the minimizer of each k-mer of `seq`, in k-mer order (one entry per k-mer)."""
    minimizers = []
    for window in get_kmers(seq, k):
        minimizers.append(get_minimizer(window, m))
    return minimizers

def get_alphabet_count(col, alphabet):
    """Count how often each alphabet symbol occurs in `col`.

    Returns a NumPy array with one count per symbol, in `alphabet` order;
    symbols absent from `col` count as 0 and items of `col` that are not in
    `alphabet` are ignored.
    """
    tallies = Counter(col)
    # Counter indexing yields 0 for unseen symbols without inserting them.
    per_symbol = [tallies[symbol] for symbol in alphabet]
    return np.array(per_symbol)

def get_p_c():
    """Return the background probability p(c) for each symbol of the global `alphabet`.

    Each amino acid is weighted by the number of codons that map to it,
    divided by 61 (the table below sums to 61). Symbols that are not in the
    table fall back to a weight of 1/61.
    """
    # Amino acid -> number of codons mapping to it, grouped by that number.
    codons_per_residue = {
        'M': 1, 'W': 1,
        'C': 2, 'D': 2, 'E': 2, 'F': 2, 'H': 2, 'K': 2, 'N': 2, 'Q': 2, 'Y': 2,
        'I': 3,
        'A': 4, 'G': 4, 'P': 4, 'T': 4, 'V': 4,
        'L': 6, 'R': 6, 'S': 6,
    }
    n_codons_total = 61
    background = []
    for symbol in alphabet:
        background.append(codons_per_residue.get(symbol, 1) / n_codons_total)
    # One entry per alphabet symbol, in alphabet order.
    return np.array(background)

def comp_pwm(pfm, p_c):
    """Turn a position frequency matrix into a log-odds position weight matrix.

    pfm: 2-D count array indexed [symbol, position].
    p_c: 1-D background probability per symbol (same symbol order as pfm rows).
    Returns log2(ppm / p_c), where ppm is the pseudo-counted, column-normalised pfm.
    """
    # Smooth the raw counts with the module-level pseudo_count.
    smoothed = pfm + pseudo_count
    # Per-position totals; the tiny epsilon guards against an all-zero column.
    column_totals = np.sum(smoothed, axis=0) + 1e-9
    ppm = smoothed / column_totals
    # Broadcast the per-symbol background across all positions.
    background = p_c[:, np.newaxis]
    return np.log2(ppm / background)

def comp_mmers_score(mmer, pwm):
    """Score an m-mer as the sum of its per-position PWM weights.

    Example: W("AC") = pwm[row of 'A'][0] + pwm[row of 'C'][1], where rows are
    looked up through the module-level char_to_idx mapping.
    """
    score = 0
    for position, symbol in enumerate(mmer):
        score += pwm[char_to_idx[symbol], position]
    return score

# Module-level scratch buffers reused by Virus2Vec for every sequence
# (each is reset with .fill(0) before use, so they hold only the last
# sequence's values after a call):
#   pfm -- position frequency matrix, one row per alphabet symbol, one column per m-mer position
#   v   -- per-sequence feature vector, one entry per possible m-mer in `combos`
pfm = np.zeros((alphabet_size, m))
v = np.zeros(len(combos))

def Virus2Vec(S, alphabet, k, m):
    """Embed each sequence as a PWM-weighted minimizer spectrum.

    For every sequence: collect the minimizer of each k-mer, build a position
    frequency matrix over those minimizers, convert it to a log-odds PWM, score
    each minimizer against the PWM, and add each score to the feature column
    of that minimizer.

    Parameters
    ----------
    S : sized iterable of str
        Sequences to embed (getXY passes a pandas Series).
    alphabet : sequence of str
        Symbols, in PFM row order.
    k : int
        k-mer length.
    m : int
        Minimizer (m-mer) length.

    Returns
    -------
    numpy.ndarray
        float32 array of shape (len(S), len(combos)); row j is the embedding of
        the j-th sequence.

    Notes
    -----
    This function relies on module-level state: `combos` / `combo_index` (built
    from the module-level alphabet and m), the scratch buffers `pfm` and `v`,
    and the helpers' own globals. The `alphabet` and `m` arguments must
    therefore match the module-level values. A minimizer containing a symbol
    outside the alphabet raises KeyError.
    """
    V = np.zeros((len(S), len(combos)), dtype=np.float32)

    # The background distribution does not depend on the sequence, so compute
    # it once instead of once per iteration.
    p_c = get_p_c()

    for j, seq in enumerate(tqdm(S, desc="Processing sequences")):
        A = comp_minimizers(seq, k, m)  # one minimizer (m-mer) per k-mer

        # PFM[c][i] = number of minimizers having character c at position i.
        pfm.fill(0)
        for i in range(m):
            col = [a[i] for a in A]
            pfm[:, i] = get_alphabet_count(col, alphabet)

        pwm = comp_pwm(pfm, p_c)

        # Accumulate each minimizer's PWM score into its own feature slot;
        # a minimizer occurring several times contributes once per occurrence.
        v.fill(0)  # feature vector of size |alphabet|^m
        for mmer in A:
            v[combo_index[mmer]] += comp_mmers_score(mmer, pwm)

        V[j, :] = v  # stored as float32
    return V

In [None]:
def getXY(df):
    """Build the feature matrix and binary labels for one data frame.

    X is the Virus2Vec embedding of df['Sequence'] (using the module-level
    alphabet, k and m). Y is 0 where df['Class'] equals "human"
    (case-insensitive) and 1 otherwise.

    Returns (X, Y) and prints both sizes.
    """
    X = Virus2Vec(df['Sequence'], alphabet, k, m)
    Y = np.array((df["Class"].str.lower() != "human").astype(int))
    print(f"X.shape is {X.shape}")
    # Label the output generically: this helper is also used for validation
    # data, so the former "y_train" wording was misleading.
    print(f"Y.size is {Y.size}")
    return X, Y

In [None]:
# Embed the filtered training frame and derive its labels (0 = human, 1 = other).
X_train,y_train = getXY(df_train)

In [None]:
# Embed the validation frame with the same Virus2Vec settings as the training data.
X_val,y_val = getXY(df_val)

In [None]:
# Visualise the validation embeddings with the utils plotting helper
# (presumably a 2-D t-SNE projection coloured by label -- confirm in utils).
plot_tsne(X_val,y_val)

In [None]:
# Random forest baseline: 100 trees, fixed seed for reproducibility.
rf = RandomForestClassifier(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)

In [None]:
# Evaluate the fitted forest on the validation split via the utils helpers
# (RandomForest presumably reports metrics and returns predictions; con_matrix
# presumably draws the confusion matrix -- confirm in utils).
preds = RandomForest(X_val,y_val, rf)
con_matrix(y_val,preds)

In [None]:
# Persist the fitted random forest to the current working directory.
joblib.dump(rf, 'RF_Virus2vec.pkl')

In [None]:
# Logistic-regression baseline; max_iter is raised to 1000 (above the
# library default) and the seed is fixed.
lr = LogisticRegression(max_iter=1000, random_state=42)
lr.fit(X_train, y_train)

In [None]:
# Evaluate the logistic-regression model on the validation split via the utils
# helpers (their exact outputs are defined in utils -- not visible here).
preds = LogisticReg(X_val,y_val, lr)
con_matrix(y_val,preds)

In [None]:
# Persist the fitted logistic-regression model to the current working directory.
# NOTE(review): this comment originally read "Save Error" -- confirm whether
# saving this model failed at some point or the wording was a typo for "Save".
joblib.dump(lr, 'LR_Virus2vec.pkl')

In [None]:
# Support-vector classifier with an RBF kernel, fixed seed.
svm = SVC(kernel='rbf',random_state=42)
svm.fit(X_train, y_train)

In [None]:
# Evaluate the SVM on the validation split via the utils helpers
# (their exact outputs are defined in utils -- not visible here).
preds = SVM(X_val,y_val, svm)
con_matrix(y_val,preds)

In [None]:
# Persist the fitted SVM to the current working directory.
joblib.dump(svm, 'SVM_Virus2vec.pkl')