Skip to content

Commit

Permalink
Add LLE kernel option (#293)
Browse files Browse the repository at this point in the history
* Add kernel option and tests

* Update whatsnew.rst

* Update whatsnew.rst

* Apply suggestions from code review

Co-authored-by: Quentin Barthélemy <q.barthelemy@gmail.com>

* lint

* improve whatsnew

* Apply suggestions from code review

Co-authored-by: Quentin Barthélemy <q.barthelemy@gmail.com>

* linting

* add none kernel test

* might needed for flake8

* Update embedding.py

* Delete .idea/workspace.xml

* Update test_embedding.py

* Update test_embedding.py

* Apply suggestions from code review

Co-authored-by: Quentin Barthélemy <q.barthelemy@gmail.com>

* change error

* linting

---------

Co-authored-by: Quentin Barthélemy <q.barthelemy@gmail.com>
  • Loading branch information
gabelstein and qbarthelemy committed Apr 15, 2024
1 parent c1675fa commit 6e07fae
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 51 deletions.
1 change: 1 addition & 0 deletions doc/whatsnew.rst
Expand Up @@ -10,6 +10,7 @@ A catalog of new features, improvements, and bug-fixes in each release.
v0.7.dev
--------

- Add kernel option to :class:`pyriemann.embedding.LocallyLinearEmbedding`. :pr:`293` by :user:`gabelstein`

v0.6 (April 2024)
-----------------
Expand Down
82 changes: 61 additions & 21 deletions pyriemann/embedding.py
@@ -1,14 +1,15 @@
"""Embedding SPD matrices via manifold learning techniques."""

import warnings

import numpy as np
from scipy.linalg import solve, eigh
from scipy.sparse import csr_matrix

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.manifold import spectral_embedding

from .utils.kernel import kernel
from .utils.distance import pairwise_distance
from .utils.kernel import kernel as kernel_fct


class SpectralEmbedding(BaseEstimator):
Expand Down Expand Up @@ -133,12 +134,20 @@ class LocallyLinearEmbedding(BaseEstimator, TransformerMixin):
Parameters
----------
n_components : int, default=2
n_components : int | None, default=2
Dimensionality of projected space.
n_neighbors : int, default=5
Number of neighbors for reconstruction of each point.
If None, ``n_components`` is set to ``n_matrices - 1``.
n_neighbors : int | None, default=5
Number of neighbors for reconstruction of each matrix.
If None, all available matrices are used.
If ``n_neighbors > n_matrices``, ``n_neighbors`` is set to
``n_matrices - 1``.
metric : {"euclid", "logeuclid", "riemann"}, default: "riemann"
Metric used for KNN and Kernel estimation.
kernel : callable | None, default=None
Kernel function to use for the embedding. If None, the canonical
kernel specified by the metric is used. Must be a function that
takes the arguments (X, Cref, metric).
reg : float, default=1e-3
Regularization parameter.
Expand Down Expand Up @@ -169,12 +178,17 @@ class LocallyLinearEmbedding(BaseEstimator, TransformerMixin):
Pattern Recognition
"""

def __init__(self, n_components=2, n_neighbors=5, metric="riemann",
reg=1e-3):
def __init__(self, n_components=2,
n_neighbors=5,
metric="riemann",
kernel=None,
reg=1e-3
):
self.n_components = n_components
self.n_neighbors = n_neighbors
self.metric = metric
self.reg = reg
self.kernel = kernel

def fit(self, X, y=None):
"""Fit the model from data in X.
Expand All @@ -193,7 +207,7 @@ def fit(self, X, y=None):
"""
self.data_ = X
_check_dimensions(
self.n_components, self.n_neighbors = _check_dimensions(
X,
n_components=self.n_components,
n_neighbors=self.n_neighbors,
Expand All @@ -205,6 +219,7 @@ def fit(self, X, y=None):
n_neighbors=self.n_neighbors,
metric=self.metric,
reg=self.reg,
kernel=self.kernel
)

self.embedding_, self.error_ = embd, err
Expand Down Expand Up @@ -240,6 +255,7 @@ def transform(self, X, y=None):
ind,
metric=self.metric,
reg=self.reg,
kernel=self.kernel
)

X_new = np.empty((X.shape[0], self.n_components))
Expand All @@ -266,7 +282,7 @@ def fit_transform(self, X, y=None):
return self.embedding_


def barycenter_weights(X, Y, indices, metric="riemann", reg=1e-3):
def barycenter_weights(X, Y, indices, metric="riemann", kernel=None, reg=1e-3):
"""Compute Riemannian barycenter weights of X from Y along the first axis.
Estimates the weights to assign to each point in Y[indices] to recover
Expand All @@ -282,9 +298,13 @@ def barycenter_weights(X, Y, indices, metric="riemann", reg=1e-3):
Indices of the points in Y used to compute the barycenter
metric : {"euclid", "logeuclid", "riemann"}, default="riemann"
Kernel metric.
kernel : callable | None, default=None
Kernel function to use for the embedding. If None, the canonical
kernel specified by the metric is used. Must be a function that
takes the arguments (X, Cref, metric).
reg : float, default=1e-3
Amount of regularization to add for the problem to be
well-posed in the case of n_neighbors > n_channels.
well-posed in the case of ``n_neighbors > n_channels``.
Returns
-------
Expand All @@ -299,7 +319,8 @@ def barycenter_weights(X, Y, indices, metric="riemann", reg=1e-3):
msg = f"Number of index-sets in indices (is {n_matrices}) must match " \
f"number of matrices in X (is {X.shape[0]})."
assert X.shape[0] == n_matrices, msg

if kernel is None:
kernel = kernel_fct
B = np.empty((n_matrices, n_neighbors), dtype=X.dtype)
v = np.ones(n_neighbors, dtype=X.dtype)

Expand All @@ -322,6 +343,7 @@ def locally_linear_embedding(X,
n_components=2,
n_neighbors=5,
metric="riemann",
kernel=None,
reg=1e-3):
"""Perform a Locally Linear Embedding (LLE) of SPD matrices.
Expand All @@ -341,10 +363,18 @@ def locally_linear_embedding(X,
Set of SPD matrices.
n_components : int, default=2
Dimensionality of projected space.
If None, ``n_components`` is set to ``n_matrices - 1``.
n_neighbors : int, default=5
Number of neighbors for reconstruction of each point.
If None, all available matrices are used.
If ``n_neighbors > n_matrices``, ``n_neighbors`` is set to
``n_matrices - 1``.
metric : {"euclid", "logeuclid", "riemann"}, default: "riemann"
Metric used for KNN and Kernel estimation.
kernel : callable | None, default=None
Kernel function to use for the embedding. If None, the canonical
kernel specified by the metric is used. Must be a function that
takes the arguments (X, Cref, metric).
reg : float, default=1e-3
Regularization parameter.
Expand Down Expand Up @@ -372,7 +402,10 @@ def locally_linear_embedding(X,
neighbors = np.array([np.argsort(dist)[1:n_neighbors + 1]
for dist in pairwise_distances])

B = barycenter_weights(X, X, neighbors, metric=metric, reg=reg)
B = barycenter_weights(X, X, neighbors,
metric=metric,
reg=reg,
kernel=kernel)

indptr = np.arange(0, n_matrices * n_neighbors + 1, n_neighbors)
W = csr_matrix(
Expand All @@ -396,18 +429,25 @@ def locally_linear_embedding(X,
def _check_dimensions(X, Y=None, n_components=None, n_neighbors=None):
n_matrices, n_channels, n_channels = X.shape

if not isinstance(Y, type(None)):
if Y is not None and Y.shape[1:] != (n_channels, n_channels):
msg = f"Dimension of matrices in data to be transformed must match " \
f"dimension of data used for fitting. Expected " \
f"{(n_channels, n_channels)}, got {Y.shape[1:]}."
assert Y.shape[1:] == (n_channels, n_channels), msg

if not isinstance(n_neighbors, type(None)):
msg = f"n_neighbors (is {n_neighbors}) must be smaller than " \
f"n_matrices (is {n_matrices})."
assert n_matrices > n_neighbors, msg
raise ValueError(msg)

if not isinstance(n_components, type(None)):
if n_components is None:
n_components = n_matrices - 1
elif n_components >= n_matrices:
msg = f"n_components (is {n_components}) must be smaller than " \
f"n_matrices (is {n_matrices})."
assert n_components < n_matrices, msg
raise ValueError(msg)

if n_neighbors is None:
n_neighbors = n_matrices - 1
elif n_matrices <= n_neighbors:
n_neighbors = n_matrices - 1
warnings.warn(f"n_neighbors (is {n_neighbors}) must be smaller than "
f"n_matrices (is {n_matrices}). Setting n_neighbors to "
f"{n_matrices - 1}.")

return n_components, n_neighbors
77 changes: 47 additions & 30 deletions tests/test_embedding.py
Expand Up @@ -10,7 +10,7 @@
barycenter_weights,
locally_linear_embedding,
)

from pyriemann.utils.kernel import kernel, kernel_functions
rembd = [SpectralEmbedding, LocallyLinearEmbedding]


Expand Down Expand Up @@ -53,7 +53,7 @@ def embd_transform(self, embedding, covmats, n_components):
def embd_transform_error(self, embedding, covmats, n_components):
embd = embedding(n_components=n_components)
embd = embd.fit(covmats)
with pytest.raises(AssertionError):
with pytest.raises(ValueError):
embd.transform(covmats[:-1, :-1, :-1])

def embd_fit_independence(self, embedding, covmats, n_components):
Expand Down Expand Up @@ -82,19 +82,6 @@ def embd_result(self, embedding):
assert score == 1.


@pytest.mark.parametrize("n_components", [2, 4, 100])
@pytest.mark.parametrize("embd", rembd)
def embd_n_comp(n_components, embd, get_mats):
n_matrices, n_channels = 8, 3
covmats = get_mats(n_matrices, n_channels, "spd")
embd = embd(n_components=n_components)
if n_matrices <= n_components:
with pytest.raises(AssertionError):
embd.fit(covmats)
else:
embd.fit(covmats)


@pytest.mark.parametrize("metric", get_metrics())
@pytest.mark.parametrize("eps", [None, 0.1])
def test_spectral_embedding_parameters(metric, eps, get_mats):
Expand All @@ -106,26 +93,34 @@ def test_spectral_embedding_parameters(metric, eps, get_mats):
assert covembd.shape == (n_matrices, n_comp)


@pytest.mark.parametrize("n_components", [2, 4, 100])
@pytest.mark.parametrize("embd", rembd)
def test_embd_n_comp(n_components, embd, get_mats):
n_matrices, n_channels = 8, 3
covmats = get_mats(n_matrices, n_channels, "spd")
embd = embd(n_components=n_components)
if n_matrices <= n_components:
with pytest.raises(ValueError):
embd.fit(covmats)
else:
embd.fit(covmats)


@pytest.mark.parametrize("metric", ['riemann', 'euclid', 'logeuclid'])
@pytest.mark.parametrize("n_neighbors", [2, 4, 8, 16])
@pytest.mark.parametrize("reg", [1e-3, 0])
def test_locally_linear_parameters(metric, n_neighbors, reg, get_mats):
@pytest.mark.parametrize("kernel_fct", [kernel, None])
def test_locally_linear_parameters(metric, n_neighbors, reg, kernel_fct,
get_mats):
"""Test LocallyLinearEmbedding."""
n_matrices, n_channels, n_comp = 6, 3, 2
n_matrices, n_channels, n_components = 6, 3, 2
covmats = get_mats(n_matrices, n_channels, "spd")

if n_matrices <= n_neighbors:
with pytest.raises(AssertionError):
embd = LocallyLinearEmbedding(metric=metric,
n_components=n_comp,
n_neighbors=n_neighbors)
embd.fit(covmats)
else:
embd = LocallyLinearEmbedding(metric=metric,
n_components=n_comp,
n_neighbors=n_neighbors)
covembd = embd.fit_transform(covmats)
assert covembd.shape == (n_matrices, n_comp)
embd = LocallyLinearEmbedding(metric=metric,
n_components=n_components,
n_neighbors=n_neighbors,
kernel=kernel_fct)
covembd = embd.fit_transform(covmats)
assert covembd.shape == (n_matrices, n_components)


def test_barycenter_weights(get_mats):
Expand All @@ -146,3 +141,25 @@ def test_locally_linear_embedding(get_mats):
n_components=n_comps)
assert embedding.shape == (n_matrices, n_comps)
assert isinstance(error, float)


@pytest.mark.parametrize("metric", ['riemann', 'euclid', 'logeuclid'])
def test_locally_linear_none_kernel(metric, get_mats):
"""Test LocallyLinearEmbedding."""
n_matrices, n_channels, n_components = 6, 3, 2
covmats = get_mats(n_matrices, n_channels, "spd")

def kfun(X, Y=None, Cref=None, metric=None):
return kernel_functions[metric](X, Y, Cref=Cref)

embd = LocallyLinearEmbedding(metric=metric,
n_components=n_components,
kernel=kfun)
covembd = embd.fit_transform(covmats)

embd2 = LocallyLinearEmbedding(metric=metric,
n_components=n_components,
kernel=None)
covembd2 = embd2.fit_transform(covmats)

assert np.array_equal(covembd, covembd2)

0 comments on commit 6e07fae

Please sign in to comment.