Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LLE kernel option #293

Merged
merged 21 commits into from Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/whatsnew.rst
Expand Up @@ -10,6 +10,7 @@ A catalog of new features, improvements, and bug-fixes in each release.
v0.7.dev
--------

- Add kernel option to :class:`pyriemann.embedding.LocallyLinearEmbedding`. :pr:`293` by :user:`gabelstein`

v0.6 (April 2024)
-----------------
Expand Down
82 changes: 61 additions & 21 deletions pyriemann/embedding.py
@@ -1,14 +1,15 @@
"""Embedding SPD matrices via manifold learning techniques."""

import warnings

import numpy as np
from scipy.linalg import solve, eigh
from scipy.sparse import csr_matrix

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.manifold import spectral_embedding

from .utils.kernel import kernel
from .utils.distance import pairwise_distance
from .utils.kernel import kernel as kernel_fct


class SpectralEmbedding(BaseEstimator):
Expand Down Expand Up @@ -133,12 +134,20 @@ class LocallyLinearEmbedding(BaseEstimator, TransformerMixin):

Parameters
----------
n_components : int, default=2
n_components : int | None, default=2
Dimensionality of projected space.
n_neighbors : int, default=5
Number of neighbors for reconstruction of each point.
If None, ``n_components`` is set to ``n_matrices - 1``.
n_neighbors : int | None, default=5
Number of neighbors for reconstruction of each matrix.
If None, all available matrices are used.
If ``n_neighbors > n_matrices``, ``n_neighbors`` is set to
``n_matrices - 1``.
metric : {"euclid", "logeuclid", "riemann"}, default: "riemann"
Metric used for KNN and Kernel estimation.
kernel : callable | None, default=None
Kernel function to use for the embedding. If None, the canonical
kernel specified by the metric is used. Must be a function that
takes the arguments (X, Cref, metric).
reg : float, default=1e-3
Regularization parameter.

Expand Down Expand Up @@ -169,12 +178,17 @@ class LocallyLinearEmbedding(BaseEstimator, TransformerMixin):
Pattern Recognition
"""

def __init__(self, n_components=2, n_neighbors=5, metric="riemann",
reg=1e-3):
def __init__(self, n_components=2,
n_neighbors=5,
metric="riemann",
kernel=None,
reg=1e-3
):
self.n_components = n_components
self.n_neighbors = n_neighbors
self.metric = metric
self.reg = reg
self.kernel = kernel

def fit(self, X, y=None):
"""Fit the model from data in X.
Expand All @@ -193,7 +207,7 @@ def fit(self, X, y=None):

"""
self.data_ = X
_check_dimensions(
self.n_components, self.n_neighbors = _check_dimensions(
X,
n_components=self.n_components,
n_neighbors=self.n_neighbors,
Expand All @@ -205,6 +219,7 @@ def fit(self, X, y=None):
n_neighbors=self.n_neighbors,
metric=self.metric,
reg=self.reg,
kernel=self.kernel
)

self.embedding_, self.error_ = embd, err
Expand Down Expand Up @@ -240,6 +255,7 @@ def transform(self, X, y=None):
ind,
metric=self.metric,
reg=self.reg,
kernel=self.kernel
)

X_new = np.empty((X.shape[0], self.n_components))
Expand All @@ -266,7 +282,7 @@ def fit_transform(self, X, y=None):
return self.embedding_


def barycenter_weights(X, Y, indices, metric="riemann", reg=1e-3):
def barycenter_weights(X, Y, indices, metric="riemann", kernel=None, reg=1e-3):
"""Compute Riemannian barycenter weights of X from Y along the first axis.

Estimates the weights to assign to each point in Y[indices] to recover
Expand All @@ -282,9 +298,13 @@ def barycenter_weights(X, Y, indices, metric="riemann", reg=1e-3):
Indices of the points in Y used to compute the barycenter
metric : {"euclid", "logeuclid", "riemann"}, default="riemann"
Kernel metric.
kernel : callable | None, default=None
Kernel function to use for the embedding. If None, the canonical
kernel specified by the metric is used. Must be a function that
takes the arguments (X, Cref, metric).
reg : float, default=1e-3
Amount of regularization to add for the problem to be
well-posed in the case of n_neighbors > n_channels.
well-posed in the case of ``n_neighbors > n_channels``.

Returns
-------
Expand All @@ -299,7 +319,8 @@ def barycenter_weights(X, Y, indices, metric="riemann", reg=1e-3):
msg = f"Number of index-sets in indices (is {n_matrices}) must match " \
f"number of matrices in X (is {X.shape[0]})."
assert X.shape[0] == n_matrices, msg

if kernel is None:
kernel = kernel_fct
B = np.empty((n_matrices, n_neighbors), dtype=X.dtype)
v = np.ones(n_neighbors, dtype=X.dtype)

Expand All @@ -322,6 +343,7 @@ def locally_linear_embedding(X,
n_components=2,
n_neighbors=5,
metric="riemann",
kernel=None,
reg=1e-3):
"""Perform a Locally Linear Embedding (LLE) of SPD matrices.

Expand All @@ -341,10 +363,18 @@ def locally_linear_embedding(X,
Set of SPD matrices.
n_components : int, default=2
Dimensionality of projected space.
If None, ``n_components`` is set to ``n_matrices - 1``.
n_neighbors : int, default=5
Number of neighbors for reconstruction of each point.
If None, all available matrices are used.
If ``n_neighbors > n_matrices``, ``n_neighbors`` is set to
``n_matrices - 1``.
metric : {"euclid", "logeuclid", "riemann"}, default: "riemann"
Metric used for KNN and Kernel estimation.
kernel : callable | None, default=None
Kernel function to use for the embedding. If None, the canonical
kernel specified by the metric is used. Must be a function that
takes the arguments (X, Cref, metric).
reg : float, default=1e-3
Regularization parameter.

Expand Down Expand Up @@ -372,7 +402,10 @@ def locally_linear_embedding(X,
neighbors = np.array([np.argsort(dist)[1:n_neighbors + 1]
for dist in pairwise_distances])

B = barycenter_weights(X, X, neighbors, metric=metric, reg=reg)
B = barycenter_weights(X, X, neighbors,
metric=metric,
reg=reg,
kernel=kernel)

indptr = np.arange(0, n_matrices * n_neighbors + 1, n_neighbors)
W = csr_matrix(
Expand All @@ -396,18 +429,25 @@ def locally_linear_embedding(X,
def _check_dimensions(X, Y=None, n_components=None, n_neighbors=None):
n_matrices, n_channels, n_channels = X.shape

if not isinstance(Y, type(None)):
if Y is not None and Y.shape[1:] != (n_channels, n_channels):
msg = f"Dimension of matrices in data to be transformed must match " \
f"dimension of data used for fitting. Expected " \
f"{(n_channels, n_channels)}, got {Y.shape[1:]}."
assert Y.shape[1:] == (n_channels, n_channels), msg

if not isinstance(n_neighbors, type(None)):
msg = f"n_neighbors (is {n_neighbors}) must be smaller than " \
f"n_matrices (is {n_matrices})."
assert n_matrices > n_neighbors, msg
raise ValueError(msg)

if not isinstance(n_components, type(None)):
if n_components is None:
n_components = n_matrices - 1
elif n_components >= n_matrices:
msg = f"n_components (is {n_components}) must be smaller than " \
f"n_matrices (is {n_matrices})."
assert n_components < n_matrices, msg
raise ValueError(msg)

if n_neighbors is None:
n_neighbors = n_matrices - 1
elif n_matrices <= n_neighbors:
n_neighbors = n_matrices - 1
warnings.warn(f"n_neighbors (is {n_neighbors}) must be smaller than "
f"n_matrices (is {n_matrices}). Setting n_neighbors to "
f"{n_matrices - 1}.")

return n_components, n_neighbors
77 changes: 47 additions & 30 deletions tests/test_embedding.py
Expand Up @@ -10,7 +10,7 @@
barycenter_weights,
locally_linear_embedding,
)

from pyriemann.utils.kernel import kernel, kernel_functions
rembd = [SpectralEmbedding, LocallyLinearEmbedding]


Expand Down Expand Up @@ -53,7 +53,7 @@ def embd_transform(self, embedding, covmats, n_components):
def embd_transform_error(self, embedding, covmats, n_components):
embd = embedding(n_components=n_components)
embd = embd.fit(covmats)
with pytest.raises(AssertionError):
with pytest.raises(ValueError):
embd.transform(covmats[:-1, :-1, :-1])

def embd_fit_independence(self, embedding, covmats, n_components):
Expand Down Expand Up @@ -82,19 +82,6 @@ def embd_result(self, embedding):
assert score == 1.


@pytest.mark.parametrize("n_components", [2, 4, 100])
@pytest.mark.parametrize("embd", rembd)
def embd_n_comp(n_components, embd, get_mats):
n_matrices, n_channels = 8, 3
covmats = get_mats(n_matrices, n_channels, "spd")
embd = embd(n_components=n_components)
if n_matrices <= n_components:
with pytest.raises(AssertionError):
embd.fit(covmats)
else:
embd.fit(covmats)


@pytest.mark.parametrize("metric", get_metrics())
@pytest.mark.parametrize("eps", [None, 0.1])
def test_spectral_embedding_parameters(metric, eps, get_mats):
Expand All @@ -106,26 +93,34 @@ def test_spectral_embedding_parameters(metric, eps, get_mats):
assert covembd.shape == (n_matrices, n_comp)


@pytest.mark.parametrize("n_components", [2, 4, 100])
@pytest.mark.parametrize("embd", rembd)
def test_embd_n_comp(n_components, embd, get_mats):
n_matrices, n_channels = 8, 3
covmats = get_mats(n_matrices, n_channels, "spd")
embd = embd(n_components=n_components)
if n_matrices <= n_components:
with pytest.raises(ValueError):
embd.fit(covmats)
else:
embd.fit(covmats)


@pytest.mark.parametrize("metric", ['riemann', 'euclid', 'logeuclid'])
@pytest.mark.parametrize("n_neighbors", [2, 4, 8, 16])
@pytest.mark.parametrize("reg", [1e-3, 0])
def test_locally_linear_parameters(metric, n_neighbors, reg, get_mats):
@pytest.mark.parametrize("kernel_fct", [kernel, None])
def test_locally_linear_parameters(metric, n_neighbors, reg, kernel_fct,
get_mats):
"""Test LocallyLinearEmbedding."""
n_matrices, n_channels, n_comp = 6, 3, 2
n_matrices, n_channels, n_components = 6, 3, 2
covmats = get_mats(n_matrices, n_channels, "spd")

if n_matrices <= n_neighbors:
with pytest.raises(AssertionError):
embd = LocallyLinearEmbedding(metric=metric,
n_components=n_comp,
n_neighbors=n_neighbors)
embd.fit(covmats)
else:
embd = LocallyLinearEmbedding(metric=metric,
n_components=n_comp,
n_neighbors=n_neighbors)
covembd = embd.fit_transform(covmats)
assert covembd.shape == (n_matrices, n_comp)
embd = LocallyLinearEmbedding(metric=metric,
n_components=n_components,
n_neighbors=n_neighbors,
kernel=kernel_fct)
covembd = embd.fit_transform(covmats)
assert covembd.shape == (n_matrices, n_components)


def test_barycenter_weights(get_mats):
Expand All @@ -146,3 +141,25 @@ def test_locally_linear_embedding(get_mats):
n_components=n_comps)
assert embedding.shape == (n_matrices, n_comps)
assert isinstance(error, float)


@pytest.mark.parametrize("metric", ['riemann', 'euclid', 'logeuclid'])
def test_locally_linear_none_kernel(metric, get_mats):
"""Test LocallyLinearEmbedding."""
n_matrices, n_channels, n_components = 6, 3, 2
covmats = get_mats(n_matrices, n_channels, "spd")

def kfun(X, Y=None, Cref=None, metric=None):
return kernel_functions[metric](X, Y, Cref=Cref)

embd = LocallyLinearEmbedding(metric=metric,
n_components=n_components,
kernel=kfun)
covembd = embd.fit_transform(covmats)

embd2 = LocallyLinearEmbedding(metric=metric,
n_components=n_components,
kernel=None)
covembd2 = embd2.fit_transform(covmats)

assert np.array_equal(covembd, covembd2)