In [25]:
import numpy as cp
import cvxpy as cvp
import warnings
from kernel import RKHSKernel
from solver import solve

warnings.simplefilter(action="ignore", category=FutureWarning)

In [26]:
def calc_accuracy(y: cp.ndarray, phi: cp.ndarray, M_hat: cp.ndarray, T):
    y_hat = []
    for i in range(T):
        a, b, c = i * 3, i * 3 + 1, i * 3 + 2
        distance = (2 * phi[a] - phi[b] - phi[c]).T @ M_hat @ (phi[c] - phi[b])
        y_hat.append(cp.sign(distance))
    y_hat = cp.array(y_hat)
    return cp.mean(y == y_hat)


def triplets_distance(k: cp.ndarray, G: cp.ndarray, T: int, deterministic: bool):

    y_s = []
    distance_s = []

    for i in range(T):
        h, i, j = i * 3, i * 3 + 1, i * 3 + 2
        distance = (2 * k[h] - k[i] - k[j]).T @ G @ (k[j] - k[i])

        if deterministic:
            y_t = cp.sign(distance)
        else:
            p_t = 1 / (1 + cp.exp(distance))
            y_t = -1 if cp.random.rand() < p_t else 1

        y_s.append(y_t)
        distance_s.append(distance)

    return cp.array(y_s), cp.array(distance_s)


def get_triplets(x: cp.ndarray, T: int):

    triplet_s = []
    for i in range(T):
        h, i, j = i * 3, i * 3 + 1, i * 3 + 2
        triplet = [x[h], x[i], x[j]]
        triplet_s.append(triplet)

    return cp.array(triplet_s)

In [27]:
# rank of linear functional
r = 2
# number of triplets
T = 1000
# number of validation triplets
T_val = 1000
# number of validation sets
num_vals = 5
# number of items
n = 3 * T
# number of validation items
n_val = 3 * T_val
# random seed
seed = 0
cp.random.seed(seed)
# dimension of Euclidean space
d = 7

In [28]:
z = cp.random.multivariate_normal(cp.zeros(d), 1 / d * cp.eye(d), size=r)
z.shape

(2, 7)

In [29]:
kernel_1 = RKHSKernel(
    kernel_type="gaussian", sigma=1, eigen_cutoff=20, eigen_threshold=None
)
kernel_2 = RKHSKernel(
    kernel_type="gaussian", sigma=1, eigen_cutoff=20, eigen_threshold=None
)

In [30]:
x = cp.random.multivariate_normal(cp.zeros(d), 1 / d * cp.eye(d), size=n)
x_val_s = [
    cp.random.multivariate_normal(cp.zeros(d), 1 / d * cp.eye(d), size=n_val)
    for _ in range(num_vals)
]

In [31]:
k = kernel_1.gram_matrix(x, z)
k_val_s = [kernel_1.gram_matrix(x_val, z) for x_val in x_val_s]

Computing the gram matrix
Computing the gram matrix
Computing the gram matrix
Computing the gram matrix
Computing the gram matrix
Computing the gram matrix


In [32]:
G = cp.random.normal(0, 1, size=(r, r))
G = G @ G.T

In [33]:
y_s, distance_s = triplets_distance(k, G, T, deterministic=False)
y_val_s, distance_val_s = list(
    zip(*[triplets_distance(k_val, G, T_val, deterministic=True) for k_val in k_val_s])
)

In [34]:
phi, phi_val_s = kernel_2.kpca(x, x_val_s)

Computing the gram matrix
Centering the gram matrix
Computing the eigenvectors
Forming the matrix A
Projecting the data
Projecting the validation data
Computing the gram matrix
Computing the gram matrix
Computing the gram matrix
Computing the gram matrix
Computing the gram matrix


In [35]:
triplet_s = get_triplets(phi, T)
triplet_val_s = [get_triplets(phi_val, T_val) for phi_val in phi_val_s]

In [36]:
problem, M_hat = solve(
    p=phi.shape[1],
    triplets=triplet_s,
    y_true=y_s,
    gamma=float(cp.max(cp.abs(distance_s))),
    lambda_=r,
    loss_type="logistic",
    constraint_type="nuc",
    solver="MOSEK",
    verbose=False,
)
M_hat = cp.array(M_hat)

Failure:interrupted


SolverError: Solver 'SCS' failed. Try another solver, or solve with verbose=True for more information.

In [None]:
train_accuracy = calc_accuracy(y_s, phi, M_hat, T)
validation_accurices = cp.array(
    [
        calc_accuracy(y_val, phi_val, M_hat, T_val)
        for phi_val, y_val in zip(phi_val_s, y_val_s)
    ]
)
print(f"Train accuracy: {train_accuracy:.3f}")
print(
    f"Validation accuracies: {validation_accurices.mean():.3f} +/- {validation_accurices.std():.3f}"
)