In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import py_stringmatching as sm

# Solution Proposed

## 1 - Load Data

In [None]:
# Load the dataset
Xtr0 = pd.read_csv('data/Xtr0.csv', index_col=0)
Xtr1 = pd.read_csv('data/Xtr1.csv', index_col=0)
Xtr2 = pd.read_csv('data/Xtr2.csv',  index_col=0)
# Load the matrix representation of the sequences
Xtr0_mat100 = pd.read_csv('data/Xtr0_mat100.csv', header=None, sep=' ').values
Xtr1_mat100 = pd.read_csv('data/Xtr1_mat100.csv', header=None, sep=' ').values
Xtr2_mat100 = pd.read_csv('data/Xtr2_mat100.csv', header=None, sep=' ').values
# Load the labels
Ytr0 = pd.read_csv('data/Ytr0.csv', index_col=0)
Ytr1 = pd.read_csv('data/Ytr1.csv', index_col=0)
Ytr2 = pd.read_csv('data/Ytr2.csv', index_col=0)
# Convert the labels to -1, 1
Ytr0 = 2*Ytr0['Bound'].values - 1
Ytr1 = 2*Ytr1['Bound'].values - 1
Ytr2 = 2*Ytr2['Bound'].values - 1

## 2 - Compute Kernel Matrix

### 2.1 - Gaussian Kernel

In [None]:
# Define the kernel
def lin_kernel(x, y):
    return np.dot(x, y)

def dist_kernel(x, y):
    return np.linalg.norm(x-y)

def exp_kernel(K_dist, sigma=1):
    return np.exp(-K_dist**2/(2*sigma**2))

In [None]:
def compute_distance_matrix(X_left, X_right):
    # Initialize the distance matrix
    K_dist = np.zeros((X_left.shape[0], X_right.shape[0]))

    # Compute the distance matrix
    for i in tqdm(range(X_left.shape[0]), desc="Computing distance matrix"):
        for j in range(X_right.shape[0]):
            K_dist[i, j] = dist_kernel(X_left[i], X_right[j])
    return K_dist

# Example usage:
K_dist0 = compute_distance_matrix(Xtr0_mat100, Xtr0_mat100)
K_dist1 = compute_distance_matrix(Xtr1_mat100, Xtr1_mat100)
K_dist2 = compute_distance_matrix(Xtr2_mat100, Xtr2_mat100)

print(K_dist0.mean(), K_dist1.mean(), K_dist2.mean())
print(K_dist0.std(), K_dist1.std(), K_dist2.std())

In [None]:
K_exp_tr0 = exp_kernel(K_dist0, sigma=0.12)
K_exp_tr1 = exp_kernel(K_dist1, sigma=0.13)
K_exp_tr2 = exp_kernel(K_dist2, sigma=0.15)

print(K_exp_tr0.mean(), K_exp_tr1.mean(), K_exp_tr2.mean())
print(K_exp_tr0.std(), K_exp_tr1.std(), K_exp_tr2.std())

### 2.2 - Smith-Waterman Local Alignment Score

Smith-Waterman Local Alignment Score  $SW$ becomes a Kernel if we take  $\widetilde{SW} = SW -\lambda_{min}(SW)I$

In [None]:
def compute_sw_matrix(X_left, X_right, sw):
    SW_matrix = np.zeros((X_left.shape[0], X_right.shape[0]))
    #Symmetric case (X_left = X_right) : only compute the upper triangular part
    if X_left is X_right:
        for i in tqdm(range(X_left.shape[0]), desc=f"Computing SW matrix for {X_left.shape[0]} sequences"):
            for j in range(i, X_right.shape[0]):
                SW_matrix[i, j] = sw.get_raw_score(X_left['seq'].iloc[i], X_right['seq'].iloc[j])
                SW_matrix[j, i] = SW_matrix[i, j]
        return SW_matrix
    #Non-symmetric case : compute the whole matrix
    for i in tqdm(range(X_left.shape[0]), desc=f"Computing SW matrix for {X_left.shape[0]} sequences"):
        for j in range(X_right.shape[0]):
            SW_matrix[i, j] = sw.get_raw_score(X_left['seq'].iloc[i], X_right['seq'].iloc[j])
    return SW_matrix

# Compute SW matrices for the three datasets
SW_tr0 = compute_sw_matrix(Xtr0, Xtr0, sm.SmithWaterman())
SW_tr1 = compute_sw_matrix(Xtr1, Xtr1, sm.SmithWaterman())
SW_tr2 = compute_sw_matrix(Xtr2, Xtr2, sm.SmithWaterman())

print(SW_tr0.mean(), SW_tr1.mean(), SW_tr2.mean())
print(SW_tr0.std(), SW_tr1.std(), SW_tr2.std())

In [None]:
np.save('SW_tr0', SW_tr0)
np.save('SW_tr1', SW_tr1)
np.save('SW_tr2', SW_tr2)

### 2.3 - Local Alignment Kernel  : <span style="color:green">TODO / Time Complexity too high + value too high</span>

The Local Alignment Kernel defined as:
$$K_{LA}^{(\beta)}(x,y) = \sum_{\pi\in\Pi(x,y)} s_{S,g}(\pi)$$

is symmetric positive definite.

We assume an affine gap penalty:
$$\left\{\begin{aligned}
&g(0) = 0 \\
&g(n) = d + e(n-1) \quad \text{for } n>0
\end{aligned}\right.$$

where $l(\pi)$ is the length of the alignment $\pi$.

We use the formula for the Local Alignment Kernel:
$$K_{LA}^{(\beta)}(x,y) = 1 + X_2(|x|,|y|)+ Y_2(|x|,|y|) + M(|x|,|y|)$$ 
where $X_2$, $Y_2$ and $M$ are defined recursively.

In [None]:
# Define the similarity function S (example: identity function)
def S(a, b):
    return 1 if a == b else 0

def LA_Kernel(x, y, beta, d, e):
    # Lengths of the sequences
    len_x = len(x)
    len_y = len(y)
    # Initialize matrices
    M = np.zeros((len_x + 1, len_y + 1))
    X = np.zeros((len_x + 1, len_y + 1))
    Y = np.zeros((len_x + 1, len_y + 1))
    X2 = np.zeros((len_x + 1, len_y + 1))
    Y2 = np.zeros((len_x + 1, len_y + 1))
    # Fill the matrices using the recursive equations
    for i in range(1, len_x + 1):
        for j in range(1, len_y + 1):
            # Compute M(i,j)
            M[i][j] = np.exp(beta * S(x[i-1], y[j-1])) * (1 + X[i-1][j-1] + Y[i-1][j-1] + M[i-1][j-1])
            # Compute X(i,j)
            X[i][j] = np.exp(beta * d) * M[i-1][j] + np.exp(beta * e) * X[i-1][j]            
            # Compute Y(i,j)
            Y[i][j] = np.exp(beta * d) * (M[i][j-1] + X[i][j-1]) + np.exp(beta * e) * Y[i][j-1]
            # Compute X2(i,j)
            X2[i][j] = M[i-1][j] + X2[i-1][j]            
            # Compute Y2(i,j)
            Y2[i][j] = M[i][j-1] + X2[i][j-1] + Y2[i][j-1]
    # Compute the LA kernel
    K_LA = 1 + X2[len_x][len_y] + Y2[len_x][len_y] + M[len_x][len_y]
    return K_LA

In [None]:
beta = .05
d = 11.0
e = 1.0

In [None]:
LA_tr0 = np.zeros((Xtr0_mat100.shape[0], Xtr0_mat100.shape[0]))

for i in tqdm(range(Xtr0_mat100.shape[0]), desc="Computing LA_tr0"):
    for j in range(i, Xtr0_mat100.shape[0]):
        LA_tr0[i, j] = np.log(LA_Kernel(Xtr0['seq'][i], Xtr0['seq'][j], beta, d, e)) / beta
        if i != j:
            LA_tr0[j, i] = LA_tr0[i, j]

## 3 - Optimizer Function (Only Euler Method)

In [None]:
# Define the loss function using the precomputed kernel matrix K
def loss(Y, m_t, alpha, lambd):
    n = Y.shape[0]
    loss = np.sum(np.log(1 + np.exp(-Y * m_t)))
    return lambd * np.dot(alpha, m_t) / 2 + loss / n

# Define the sigmoid function
def sigmoid(u):
    return 1/(1 + np.exp(-u))

def W_func(Y, m_t):
    n = Y.shape[0]
    W = np.zeros(n)
    W = sigmoid(m_t) * sigmoid(-m_t)
    return np.diag(W)

def P_func(Y, m_t):
    n = Y.shape[0]
    P = -sigmoid(-Y * m_t)
    return np.diag(P)

def z_func(Y, m_t):
    return m_t + Y / sigmoid(Y * m_t)

def quadratic_loss(K, Y, alpha_t, lambd, alpha):
    n = K.shape[0]
    m_t = np.dot(K, alpha_t)
    W = W_func(Y, m_t)
    P = P_func(Y, m_t)
    z = z_func(Y, m_t)
    return lambd * np.dot(alpha, np.dot(K, alpha)) + (K @ alpha - z).T @ W @ (K @ alpha - z) / n

In [None]:
# Solve KLR by IRLS on the quadratic loss
def IRLS(K, Y, alpha_0, lambd, tol=1e-6, max_iter=100, Y_test=None, K_test=None):
    n = K.shape[0]
    alpha_t = alpha_0.copy()
    criterion = np.inf
    for i in tqdm(range(max_iter), desc="IRLS Progress"):
        m_t = np.dot(K, alpha_t)
        accuracy = np.mean(np.sign(m_t) == Y)
        if Y_test is not None and K_test is not None:
            test_accuracy = np.mean(np.sign(np.dot(K_test, alpha_t)) == Y_test)
            print(f"Iteration {i}: Loss - {loss(Y, m_t, alpha_t, lambd):.3f}, Convergence Criterion - : {criterion}, \n Train Accuracy - {accuracy:.2f}, Test Accuracy - {test_accuracy}")
        else:
            print(f"Iteration {i}: Loss - {loss(Y, m_t, alpha_t, lambd):.3f}, Convergence Criterion - : {criterion}, Train Accuracy - {accuracy}")
        W_t = W_func(Y, m_t)
        z_t = z_func(Y, m_t)
        W_t_sqrt = np.sqrt(W_t)
        A = (W_t_sqrt @ K @ W_t_sqrt + lambd * n * np.eye(n)) @ np.diag(1 / np.diag(W_t_sqrt))
        B = W_t_sqrt @ z_t
        alpha_new = np.linalg.solve(A, B)
        criterion = np.linalg.norm(alpha_new - alpha_t)
        if criterion < tol:
            accuracy = np.mean(np.sign(np.dot(K, alpha_new)) == Y)
            if Y_test is not None and K_test is not None:
                test_accuracy = np.mean(np.sign(np.dot(K_test, alpha_new)) == Y_test)
                print(f"END : Iteration {i}: Loss - {loss(Y, m_t, alpha_t, lambd):.3f}, Convergence Criterion - : {criterion}, \n Train Accuracy - {accuracy:.2f}, Test Accuracy - {test_accuracy}")
            else:
                print(f"END : Iteration {i}: Loss - {loss(Y, m_t, alpha_t, lambd):.3f}, Convergence Criterion - : {criterion}, Train Accuracy - {accuracy}")
            break
        alpha_t = alpha_new
        print(f'Alpha norm : {np.linalg.norm(alpha_t):.2f}')
    return alpha_t

## 4 - Run Kernel Method on Train Data

In [None]:
# Load the precomputed Smith-Waterman matrices
SW_tr0 = np.load('SW_tr0.npy')
SW_tr1 = np.load('SW_tr1.npy')
SW_tr2 = np.load('SW_tr2.npy')

# Function to shift the matrix to make it positive definite if the smallest eigenvalue is negative
def make_positive_definite(SW_matrix):
    eigenvalues, _ = np.linalg.eigh(SW_matrix)
    min_eigenvalue = np.min(eigenvalues)
    if min_eigenvalue < 0:
        print(f"Matrix is not positive definite, shifting by {-min_eigenvalue + 1e-6}")
        SW_matrix += np.eye(SW_matrix.shape[0]) * (-min_eigenvalue + 1e-6)
    return SW_matrix

# Apply the function to each dataset
SW_tr0 = make_positive_definite(SW_tr0)
SW_tr1 = make_positive_definite(SW_tr1)
SW_tr2 = make_positive_definite(SW_tr2)

In [None]:
n_train = 1999
s = 0.5 # Scaling factor for the Kernel matrices SW

def split_kernel_matrix_random(K_exp, n_train):
    indices = np.random.choice(K_exp.shape[0], n_train, replace=False)
    K_train = K_exp[indices][:, indices]
    K_test = np.delete(K_exp, indices, axis=0)[:, indices]
    return K_train, K_test, indices

# Split the datasets into training and testing sets
K_tr0_train, K_tr0_test, indices0 = split_kernel_matrix_random(SW_tr0*s, n_train)
K_tr1_train, K_tr1_test, indices1 = split_kernel_matrix_random(SW_tr1*s, n_train)
K_tr2_train, K_tr2_test, indices2 = split_kernel_matrix_random(K_exp_tr2, n_train)

Ytr0_train, Ytr0_test = Ytr0[indices0], np.delete(Ytr0, indices0)
Ytr1_train, Ytr1_test = Ytr1[indices1], np.delete(Ytr1, indices1)
Ytr2_train, Ytr2_test = Ytr2[indices2], np.delete(Ytr2, indices2)

In [None]:
lambd_0 = 0.2
alpha_0_0 = np.zeros(K_tr0_train.shape[0])
# Run the IRLS algorithm with the new initialization for the first dataset
alpha_0_0 = IRLS(K_tr0_train, Ytr0_train, alpha_0_0, lambd_0, tol=1e-5, max_iter=10, 
                 Y_test=Ytr0_test, K_test=K_tr0_test)

In [None]:
lambd_1 = 0.1
alpha_0_1 = np.zeros(K_tr1_train.shape[0])
# Run the IRLS algorithm with the new initialization for the second dataset
alpha_0_1 = IRLS(K_tr1_train, Ytr1_train, alpha_0_1, lambd_1, tol=1e-5, max_iter=10, 
                 Y_test=Ytr1_test, K_test=K_tr1_test)

In [None]:
lambd_2 = 0.0005
alpha_0_2 = np.zeros(K_tr2_train.shape[0])
# Run the IRLS algorithm with the new initialization for the third dataset
alpha_0_2 = IRLS(K_tr2_train, Ytr2_train, alpha_0_2, lambd_2, tol=1e-5, max_iter=10, 
                 Y_test=Ytr2_test, K_test=K_tr2_test)

In [None]:
def calculate_accuracy(K_train, K_test, Y_train, Y_test, alpha):
    train_predictions = np.sign(K_train @ alpha)
    train_accuracy = np.mean(train_predictions == Y_train) * 100
    print(f"Train Accuracy: {train_accuracy:.2f}%")

    test_predictions = np.sign(K_test @ alpha)
    test_accuracy = np.mean(test_predictions == Y_test) * 100
    print(f"Test Accuracy: {test_accuracy:.2f}%")

    print(f"Alpha's norm: {np.linalg.norm(alpha):.2f}")
    return train_accuracy, test_accuracy

# Calculate accuracy for the first dataset
print("-----First dataset-----")
_, a0 = calculate_accuracy(K_tr0_train, K_tr0_test, Ytr0_train, Ytr0_test, alpha_0_0)
# Calculate accuracy for the second dataset
print("-----Second dataset-----")
_, a1 = calculate_accuracy(K_tr1_train, K_tr1_test, Ytr1_train, Ytr1_test, alpha_0_1)
# Calculate accuracy for the third dataset
print("-----Third dataset-----")
_, a2 = calculate_accuracy(K_tr2_train, K_tr2_test, Ytr2_train, Ytr2_test, alpha_0_2)

# Mean accuracy
mean_accuracy = (a0 + a1 + a2) / 3
print(f"Mean accuracy: {mean_accuracy:.2f}%")

## 5 - Apply Kernel Predictor on Test Data

In [None]:
# Load the matrix representation of the sequences
Xte0_mat100 = pd.read_csv('data/Xte0_mat100.csv', header=None, sep=' ').values
Xte1_mat100 = pd.read_csv('data/Xte1_mat100.csv', header=None, sep=' ').values
Xte2_mat100 = pd.read_csv('data/Xte2_mat100.csv', header=None, sep=' ').values
Xte0 = pd.read_csv('data/Xte0.csv', index_col=0)
Xte1 = pd.read_csv('data/Xte1.csv', index_col=0)
Xte2 = pd.read_csv('data/Xte2.csv', index_col=0)

In [None]:
# Compute the Kernels for the test set
SW_te0 = compute_sw_matrix(Xte0, Xtr0.iloc[:n_train], sm.SmithWaterman())
SW_te1 = compute_sw_matrix(Xte1, Xtr1.iloc[:n_train], sm.SmithWaterman())
K_dist_te2 = compute_distance_matrix(Xte2_mat100, Xtr2_mat100[:n_train,:])
K_exp_te2 = exp_kernel(K_dist_te2, sigma=0.15)

K_te0 = SW_te0[:,:n_train] * s
K_te1 = SW_te1[:,:n_train] * s
K_te2 = K_exp_te2[:,:n_train]

In [None]:
# Predictions
Yte0 = np.sign(K_te0 @ alpha_0_0)
Yte1 = np.sign(K_te1 @ alpha_0_1)
Yte2 = np.sign(K_te2 @ alpha_0_2)

In [None]:
# Concatenate and add Id column
Yte = np.concatenate([Yte0, Yte1, Yte2])
Yte = (Yte + 1) // 2
Yte = pd.DataFrame(data=Yte, columns=['Bound'], dtype='int64')
Yte.insert(0, 'Id', Yte.index)
# Save the predictions
Yte.to_csv('Yte_3.csv', index=False)