Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LLE kernel option #293

Merged
merged 21 commits into from Apr 15, 2024
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/whatsnew.rst
Expand Up @@ -10,6 +10,7 @@ A catalog of new features, improvements, and bug-fixes in each release.
v0.7.dev
--------

- Add kernel option to :class:`pyriemann.embedding.LocallyLinearEmbedding`. :pr:`293` by :user:`gabelstein`

v0.6 (April 2024)
-----------------
Expand Down
78 changes: 59 additions & 19 deletions pyriemann/embedding.py
@@ -1,14 +1,15 @@
"""Embedding SPD matrices via manifold learning techniques."""

import warnings

import numpy as np
from scipy.linalg import solve, eigh
from scipy.sparse import csr_matrix

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.manifold import spectral_embedding

from .utils.kernel import kernel
from .utils.distance import pairwise_distance
from .utils.kernel import kernel as kernel_fct


class SpectralEmbedding(BaseEstimator):
Expand Down Expand Up @@ -133,12 +134,20 @@ class LocallyLinearEmbedding(BaseEstimator, TransformerMixin):

Parameters
----------
n_components : int, default=2
n_components : int | None, default=2
Dimensionality of projected space.
n_neighbors : int, default=5
Number of neighbors for reconstruction of each point.
If None, ``n_components`` is set to ``n_matrices - 1``.
n_neighbors : int | None, default=5
Number of neighbors for reconstruction of each matrix.
If None, all available matrices are used.
If ``n_neighbors > n_matrices``, ``n_neighbors`` is set to
``n_matrices - 1``.
metric : {"euclid", "logeuclid", "riemann"}, default: "riemann"
Metric used for KNN and Kernel estimation.
kernel : callable | None, default=None
Kernel function to use for the embedding. If None, the canonical
kernel specified by the metric is used. Must be a function that
takes the arguments (X, Cref, metric).
reg : float, default=1e-3
Regularization parameter.

Expand Down Expand Up @@ -169,12 +178,17 @@ class LocallyLinearEmbedding(BaseEstimator, TransformerMixin):
Pattern Recognition
"""

def __init__(self, n_components=2, n_neighbors=5, metric="riemann",
reg=1e-3):
def __init__(self, n_components=2,
n_neighbors=5,
metric="riemann",
kernel=None,
reg=1e-3
):
self.n_components = n_components
self.n_neighbors = n_neighbors
self.metric = metric
self.reg = reg
self.kernel = kernel

def fit(self, X, y=None):
"""Fit the model from data in X.
Expand All @@ -193,7 +207,7 @@ def fit(self, X, y=None):

"""
self.data_ = X
_check_dimensions(
self.n_components, self.n_neighbors = _check_dimensions(
X,
n_components=self.n_components,
n_neighbors=self.n_neighbors,
Expand All @@ -205,6 +219,7 @@ def fit(self, X, y=None):
n_neighbors=self.n_neighbors,
metric=self.metric,
reg=self.reg,
kernel=self.kernel
)

self.embedding_, self.error_ = embd, err
Expand Down Expand Up @@ -240,6 +255,7 @@ def transform(self, X, y=None):
ind,
metric=self.metric,
reg=self.reg,
kernel=self.kernel
)

X_new = np.empty((X.shape[0], self.n_components))
Expand All @@ -266,7 +282,7 @@ def fit_transform(self, X, y=None):
return self.embedding_


def barycenter_weights(X, Y, indices, metric="riemann", reg=1e-3):
def barycenter_weights(X, Y, indices, metric="riemann", kernel=None, reg=1e-3):
"""Compute Riemannian barycenter weights of X from Y along the first axis.

Estimates the weights to assign to each point in Y[indices] to recover
Expand All @@ -282,9 +298,13 @@ def barycenter_weights(X, Y, indices, metric="riemann", reg=1e-3):
Indices of the points in Y used to compute the barycenter
metric : {"euclid", "logeuclid", "riemann"}, default="riemann"
Kernel metric.
kernel : callable | None, default=None
Kernel function to use for the embedding. If None, the canonical
kernel specified by the metric is used. Must be a function that
takes the arguments (X, Cref, metric).
reg : float, default=1e-3
Amount of regularization to add for the problem to be
well-posed in the case of n_neighbors > n_channels.
well-posed in the case of ``n_neighbors > n_channels``.

Returns
-------
Expand All @@ -299,7 +319,8 @@ def barycenter_weights(X, Y, indices, metric="riemann", reg=1e-3):
msg = f"Number of index-sets in indices (is {n_matrices}) must match " \
f"number of matrices in X (is {X.shape[0]})."
assert X.shape[0] == n_matrices, msg

if kernel is None:
kernel = kernel_fct
B = np.empty((n_matrices, n_neighbors), dtype=X.dtype)
v = np.ones(n_neighbors, dtype=X.dtype)

Expand All @@ -322,6 +343,7 @@ def locally_linear_embedding(X,
n_components=2,
n_neighbors=5,
metric="riemann",
kernel=None,
reg=1e-3):
"""Perform a Locally Linear Embedding (LLE) of SPD matrices.

Expand All @@ -341,10 +363,18 @@ def locally_linear_embedding(X,
Set of SPD matrices.
n_components : int, default=2
Dimensionality of projected space.
If None, ``n_components`` is set to ``n_matrices - 1``.
n_neighbors : int, default=5
Number of neighbors for reconstruction of each point.
If None, all available matrices are used.
If ``n_neighbors > n_matrices``, ``n_neighbors`` is set to
``n_matrices - 1``.
metric : {"euclid", "logeuclid", "riemann"}, default: "riemann"
Metric used for KNN and Kernel estimation.
kernel : callable | None, default=None
Kernel function to use for the embedding. If None, the canonical
kernel specified by the metric is used. Must be a function that
takes the arguments (X, Cref, metric).
reg : float, default=1e-3
Regularization parameter.

Expand Down Expand Up @@ -372,7 +402,10 @@ def locally_linear_embedding(X,
neighbors = np.array([np.argsort(dist)[1:n_neighbors + 1]
for dist in pairwise_distances])

B = barycenter_weights(X, X, neighbors, metric=metric, reg=reg)
B = barycenter_weights(X, X, neighbors,
metric=metric,
reg=reg,
kernel=kernel)

indptr = np.arange(0, n_matrices * n_neighbors + 1, n_neighbors)
W = csr_matrix(
Expand All @@ -396,18 +429,25 @@ def locally_linear_embedding(X,
def _check_dimensions(X, Y=None, n_components=None, n_neighbors=None):
n_matrices, n_channels, n_channels = X.shape

if not isinstance(Y, type(None)):
if Y is not None:
msg = f"Dimension of matrices in data to be transformed must match " \
f"dimension of data used for fitting. Expected " \
f"{(n_channels, n_channels)}, got {Y.shape[1:]}."
assert Y.shape[1:] == (n_channels, n_channels), msg
qbarthelemy marked this conversation as resolved.
Show resolved Hide resolved

if not isinstance(n_neighbors, type(None)):
msg = f"n_neighbors (is {n_neighbors}) must be smaller than " \
f"n_matrices (is {n_matrices})."
assert n_matrices > n_neighbors, msg

if not isinstance(n_components, type(None)):
if n_components is None:
n_components = n_matrices - 1
else:
msg = f"n_components (is {n_components}) must be smaller than " \
f"n_matrices (is {n_matrices})."
assert n_components < n_matrices, msg
qbarthelemy marked this conversation as resolved.
Show resolved Hide resolved

if n_neighbors is None:
n_neighbors = n_matrices - 1
elif n_matrices <= n_neighbors:
n_neighbors = n_matrices - 1
warnings.warn(f"n_neighbors (is {n_neighbors}) must be smaller than "
f"n_matrices (is {n_matrices}). Setting n_neighbors to "
f"{n_matrices - 1}.")

return n_components, n_neighbors
50 changes: 35 additions & 15 deletions tests/test_embedding.py
Expand Up @@ -11,6 +11,9 @@
locally_linear_embedding,
)

from pyriemann.utils.kernel import (kernel,
kernel_functions)

gabelstein marked this conversation as resolved.
Show resolved Hide resolved
rembd = [SpectralEmbedding, LocallyLinearEmbedding]


Expand Down Expand Up @@ -109,23 +112,18 @@ def test_spectral_embedding_parameters(metric, eps, get_mats):
@pytest.mark.parametrize("metric", ['riemann', 'euclid', 'logeuclid'])
@pytest.mark.parametrize("n_neighbors", [2, 4, 8, 16])
@pytest.mark.parametrize("reg", [1e-3, 0])
def test_locally_linear_parameters(metric, n_neighbors, reg, get_mats):
@pytest.mark.parametrize("kernel_fct", [kernel, None])
def test_locally_linear_parameters(metric, n_neighbors, reg, kernel_fct,
get_mats):
"""Test LocallyLinearEmbedding."""
n_matrices, n_channels, n_comp = 6, 3, 2
n_matrices, n_channels, n_components = 6, 3, 2
covmats = get_mats(n_matrices, n_channels, "spd")

if n_matrices <= n_neighbors:
with pytest.raises(AssertionError):
embd = LocallyLinearEmbedding(metric=metric,
n_components=n_comp,
n_neighbors=n_neighbors)
embd.fit(covmats)
else:
embd = LocallyLinearEmbedding(metric=metric,
n_components=n_comp,
n_neighbors=n_neighbors)
covembd = embd.fit_transform(covmats)
assert covembd.shape == (n_matrices, n_comp)
embd = LocallyLinearEmbedding(metric=metric,
n_components=n_components,
n_neighbors=n_neighbors,
kernel=kernel_fct)
covembd = embd.fit_transform(covmats)
assert covembd.shape == (n_matrices, n_components)


def test_barycenter_weights(get_mats):
Expand All @@ -146,3 +144,25 @@ def test_locally_linear_embedding(get_mats):
n_components=n_comps)
assert embedding.shape == (n_matrices, n_comps)
assert isinstance(error, float)


@pytest.mark.parametrize("metric", ['riemann', 'euclid', 'logeuclid'])
def test_locally_linear_none_kernel(metric, get_mats):
"""Test LocallyLinearEmbedding."""
n_matrices, n_channels, n_components = 6, 3, 2
covmats = get_mats(n_matrices, n_channels, "spd")

def kfun(X, Y=None, Cref=None, metric=None):
return kernel_functions[metric](X, Y, Cref=Cref)

embd = LocallyLinearEmbedding(metric=metric,
n_components=n_components,
kernel=kfun)
covembd = embd.fit_transform(covmats)

embd2 = LocallyLinearEmbedding(metric=metric,
n_components=n_components,
kernel=None)
covembd2 = embd2.fit_transform(covmats)

assert np.array_equal(covembd, covembd2)