From 9e124320684c0d87e0ee3c5dfecbbf85a6dcb0c1 Mon Sep 17 00:00:00 2001 From: Sylvain Chevallier Date: Mon, 20 Sep 2021 21:22:01 +0200 Subject: [PATCH] Refactor tests (#136) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * rewriting base file to get ride of numpy matrices. * last modification to rewrite into full numpy integration * rewriting mean with dotted notation and getting rid of numpy matrices * continuing to rewrite the geodesic * rewriting of the spatial filters * small style correction in channelselection * updating the use of numpy for all higher packages * last rewriting of pyriemann file * adding slack notification * finishing to update numpy ref and removing numpy matrices * covering all functions of classification: TSclassifier, kNN * PEP8 conformation for test clustering * PEP 8 normalisation * fixing last bugs for testing * update test * adding test for Potato * end of modification of test units and coverage improvment. * modifying and adapting travis file. * updating travis file and correcting error in testing wasserstein mean * adding pandas in the requierement for Travis * ajout de la generation de matrice SPD * Correcting the Karcher mean implementation if the gradient descent is initialized on one of the point of the set, generating zero-divide error. * adding ALM to mean computation * adding ref for the parametrized geodesic * update avant merge * uncomplete merge * merge with upstream * use fixture and parametrize for tests, avoid code redundancy, systematic test of dist and metric * correct tangent fit bug Co-authored-by: kjersbry Co-authored-by: sylvchev * add tsupdate check * reduce complexity of the test to make them faster * add flake8 to tests and examples, correct style in tests * switch to triangular inequality to avoid random failure * update coverage * remove redundant tests, add coverage * correct flake8 * correct F401,F811 import errors with fixtures * correct w value * remove assert_array_equal and correct variable names Co-authored-by: Quentin Barthélemy * add is_spsd and get_labels fixture * adding test for Schaefer estimator * update test for spatial filters Co-authored-by: Quentin Barthélemy * correct var name for inverse transform * use get_labels fixture * correct flake8 * add is_psd and is_semi_psd * add independence test for fit/transform * correct zeros -> empty * Schaefer-Strimmer shrinkage covariance estimator (#59) * rewriting base file to get ride of numpy matrices. * last modification to rewrite into full numpy integration * rewriting mean with dotted notation and getting rid of numpy matrices * continuing to rewrite the geodesic * rewriting of the spatial filters * small style correction in channelselection * updating the use of numpy for all higher packages * last rewriting of pyriemann file * adding slack notification * finishing to update numpy ref and removing numpy matrices * covering all functions of classification: TSclassifier, kNN * PEP8 conformation for test clustering * PEP 8 normalisation * fixing last bugs for testing * update test * adding test for Potato * end of modification of test units and coverage improvment. * modifying and adapting travis file. * updating travis file and correcting error in testing wasserstein mean * adding pandas in the requierement for Travis * ajout de la generation de matrice SPD * Correcting the Karcher mean implementation if the gradient descent is initialized on one of the point of the set, generating zero-divide error. * adding ALM to mean computation * adding ref for the parametrized geodesic * update avant merge * adding computation and test of Schaefer-Strimmer covariance estimator * replacing loops with algebra * adding an example * correcting plots * uncomplete merge * pep8 * update test for tangent space * correct docstring and bug * converting notebook into py file * correct style * rewrite plots, reduce computation time * update link and rename variables * correct style * correct plot layout * adding Schaefer-Strimmer in whatsnew * update test, docstring and docs from suggestions Co-authored-by: Quentin Barthélemy Co-authored-by: Alexandre Gramfort * improve tests for covariance * keep the same color for each estimator across figures * correct example text Co-authored-by: Quentin Barthélemy * raise error for covariance_EP and initialize RandomState in example * correct import order * Apply suggestions from code review Co-authored-by: Quentin Barthélemy * sort import, correct spd Co-authored-by: Quentin Barthélemy * switch zeros to np.empty Co-authored-by: Alexandre Gramfort Co-authored-by: Quentin Barthélemy Co-authored-by: Alexandre Gramfort * add doc * rebase on master * correct tangent fit bug Co-authored-by: kjersbry Co-authored-by: sylvchev * add tsupdate check * reduce complexity of the test to make them faster * add flake8 to tests and examples, correct style in tests * switch to triangular inequality to avoid random failure * update coverage * remove redundant tests, add coverage * correct flake8 * correct F401,F811 import errors with fixtures * correct w value * add is_spsd and get_labels fixture * remove assert_array_equal and correct variable names Co-authored-by: Quentin Barthélemy * adding test for Schaefer estimator * correct var name for inverse transform * update test for spatial filters Co-authored-by: Quentin Barthélemy * use get_labels fixture * correct flake8 * add is_psd and is_semi_psd * add independence test for fit/transform * correct zeros -> empty * define is_spd/is_spsd fixtures * clean code * updating whats new Co-authored-by: kjersbry Co-authored-by: Quentin Barthélemy Co-authored-by: Alexandre Gramfort --- .github/workflows/testing.yml | 2 +- doc/whatsnew.rst | 2 + pyriemann/clustering.py | 11 +- pyriemann/spatialfilters.py | 6 +- pyriemann/tangentspace.py | 1 - tests/conftest.py | 176 +++++++++++- tests/test_ajd.py | 87 +++--- tests/test_channelselection.py | 28 +- tests/test_classification.py | 291 ++++++++++++------- tests/test_clustering.py | 301 +++++++++++++------- tests/test_embedding.py | 37 +-- tests/test_estimation.py | 226 +++++++++++---- tests/test_preprocessing.py | 164 +++++------ tests/test_spatialfilters.py | 447 +++++++++++++++++------------- tests/test_stats.py | 40 ++- tests/test_tangentspace.py | 208 +++++++------- tests/test_utils_covariance.py | 111 ++++---- tests/test_utils_distance.py | 172 +++++------- tests/test_utils_geodesic.py | 194 ++++++------- tests/test_utils_mean.py | 52 ++-- tests/test_utils_tangent_space.py | 59 ++-- tests/test_viz.py | 6 +- 22 files changed, 1527 insertions(+), 1094 deletions(-) diff --git a/.github/workflows/testing.yml b/.github/workflows/testing.yml index c3c7e4f3..7350777f 100644 --- a/.github/workflows/testing.yml +++ b/.github/workflows/testing.yml @@ -33,7 +33,7 @@ jobs: python -m pip install .[tests] - name: Lint with flake8 run: | - flake8 examples pyriemann + flake8 examples tests pyriemann - name: Test with pytest run: | pytest diff --git a/doc/whatsnew.rst b/doc/whatsnew.rst index d8c573d0..2c8ebc0b 100644 --- a/doc/whatsnew.rst +++ b/doc/whatsnew.rst @@ -22,6 +22,8 @@ v0.2.8.dev - Add Schaefer-Strimmer covariance estimator in :func:`pyriemann.utils.covariance.covariances`, and an example to compare estimators +- Refactor tests + fix refit of :class:`pyriemann.tangentspace.TangentSpace` + v0.2.7 (June 2021) ------------------ diff --git a/pyriemann/clustering.py b/pyriemann/clustering.py index e16d5772..2e2571a1 100644 --- a/pyriemann/clustering.py +++ b/pyriemann/clustering.py @@ -80,15 +80,12 @@ class Kmeans(BaseEstimator, ClassifierMixin, ClusterMixin, TransformerMixin): The generator used to initialize the centers. If an integer is given, it fixes the seed. Defaults to the global numpy random number generator. - init : 'k-means++', 'random' or an ndarray (default 'random') + init : 'random' or an ndarray (default 'random') Method for initialization of centers. - 'k-means++' : selects initial cluster centers for k-mean - clustering in a smart way to speed up convergence. See section - Notes in k_init for more details. 'random': choose k observations (rows) at random from data for the initial centroids. - If an ndarray is passed, it should be of shape (n_clusters, n_features) - and gives the initial centers. + If an ndarray is passed, it should be of shape + (n_clusters, n_channels, n_channels) and gives the initial centers. n_init : int, (default: 10) Number of time the k-means algorithm will be run with different centroid seeds. The final results will be the best output of @@ -151,7 +148,7 @@ def fit(self, X, y=None): self : Kmeans instance The Kmean instance. """ - if (self.init != 'random') | (self.n_init == 1): + if (self.init != 'random'): # no need to iterate if init is not random labels, inertia, mdm = _fit_single(X, y, n_clusters=self.n_clusters, diff --git a/pyriemann/spatialfilters.py b/pyriemann/spatialfilters.py index e8271033..8ffed20f 100644 --- a/pyriemann/spatialfilters.py +++ b/pyriemann/spatialfilters.py @@ -185,7 +185,7 @@ def __init__(self, filters, log=False): self.log = log def fit(self, X, y): - """Train CSP spatial filters. + """Train BilinearFilter spatial filters. Parameters ---------- @@ -196,8 +196,8 @@ def fit(self, X, y): Returns ------- - self : CSP instance - The CSP instance. + self : BilinearFilter instance + The BilinearFilter instance. """ self.filters_ = self.filters return self diff --git a/pyriemann/tangentspace.py b/pyriemann/tangentspace.py index ed1d3edb..6663569a 100644 --- a/pyriemann/tangentspace.py +++ b/pyriemann/tangentspace.py @@ -164,7 +164,6 @@ def fit_transform(self, X, y=None, sample_weight=None): the tangent space projection of the matrices. """ # compute mean covariance - self._check_reference_points(X) self.reference_ = mean_covariance(X, metric=self.metric, sample_weight=sample_weight) return tangent_space(X, self.reference_) diff --git a/tests/conftest.py b/tests/conftest.py index 31194dc5..7ecb2226 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,4 +1,5 @@ import pytest +from pytest import approx import numpy as np from functools import partial @@ -22,27 +23,178 @@ def requires_module(function, name, call=None): requires_seaborn = partial(requires_module, name="seaborn") -def generate_cov(n_trials, n_channels): - """Generate a set of cavariances matrices for test purpose""" - rs = np.random.RandomState(1234) +def generate_cov(n_trials, n_channels, rs, return_params=False): + """Generate a set of covariances matrices for test purpose""" diags = 2.0 + 0.1 * rs.randn(n_trials, n_channels) A = 2 * rs.rand(n_channels, n_channels) - 1 A /= np.linalg.norm(A, axis=1)[:, np.newaxis] covmats = np.empty((n_trials, n_channels, n_channels)) for i in range(n_trials): covmats[i] = A @ np.diag(diags[i]) @ A.T - return covmats, diags, A + if return_params: + return covmats, diags, A + else: + return covmats + + +@pytest.fixture +def rndstate(): + return np.random.RandomState(1234) + + +@pytest.fixture +def get_covmats(rndstate): + def _gen_cov(n_trials, n_chan): + return generate_cov(n_trials, n_chan, rndstate, return_params=False) + + return _gen_cov + + +@pytest.fixture +def get_covmats_params(rndstate): + def _gen_cov_params(n_trials, n_chan): + return generate_cov(n_trials, n_chan, rndstate, return_params=True) + + return _gen_cov_params + + +@pytest.fixture +def get_labels(): + def _get_labels(n_trials, n_classes): + return np.arange(n_classes).repeat(n_trials // n_classes) + + return _get_labels + + +def is_positive_semi_definite(X): + """Check if all matrices are positive semi-definite. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are positive semi-definite. + """ + cs = X.shape[-1] + return np.all(np.linalg.eigvals(X.reshape((-1, cs, cs))) >= 0.0) + + +def is_positive_definite(X): + """Check if all matrices are positive definite. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are positive definite. + """ + cs = X.shape[-1] + return np.all(np.linalg.eigvals(X.reshape((-1, cs, cs))) > 0.0) + + +def is_symmetric(X): + """Check if all matrices are symmetric. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are symmetric. + """ + return X == approx(np.swapaxes(X, -2, -1)) @pytest.fixture -def covmats(): - """Generate covariance matrices for test""" - covmats, _, _ = generate_cov(6, 3) - return covmats +def is_spd(): + """Check if all matrices are symmetric positive-definite. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are symmetric positive-definite. + """ + + def _is_spd(X): + return is_symmetric(X) and is_positive_definite(X) + + return _is_spd @pytest.fixture -def many_covmats(): - """Generate covariance matrices for test""" - covmats, _, _ = generate_cov(100, 3) - return covmats +def is_spsd(): + """Check if all matrices are symmetric positive semi-definite. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are symmetric positive semi-definite. + """ + + def _is_spsd(X): + return is_symmetric(X) and is_positive_semi_definite(X) + + return _is_spsd + + +def get_distances(): + distances = [ + "riemann", + "logeuclid", + "euclid", + "logdet", + "kullback", + "kullback_right", + "kullback_sym", + ] + for dist in distances: + yield dist + + +def get_means(): + means = [ + "riemann", + "logeuclid", + "euclid", + "logdet", + "identity", + "wasserstein", + "ale", + "harmonic", + "kullback_sym", + ] + for mean in means: + yield mean + + +def get_metrics(): + metrics = [ + "riemann", + "logeuclid", + "euclid", + "logdet", + "kullback_sym", + ] + for met in metrics: + yield met diff --git a/tests/test_ajd.py b/tests/test_ajd.py index 0fe9afa7..a1c801d5 100644 --- a/tests/test_ajd.py +++ b/tests/test_ajd.py @@ -1,73 +1,90 @@ from numpy.testing import assert_array_equal import numpy as np -from numpy.testing import assert_allclose import pytest +from pytest import approx from pyriemann.utils.ajd import rjd, ajd_pham, uwedge, _get_normalized_weight -def generate_cov(Nt, Ne): - """Generate a set of cavariances matrices for test purpose""" - rs = np.random.RandomState(1234) - diags = 2.0 + 0.1 * rs.randn(Nt, Ne) - A = 2*rs.rand(Ne, Ne) - 1 - A /= np.atleast_2d(np.sqrt(np.sum(A**2, 1))).T - covmats = np.empty((Nt, Ne, Ne)) - for i in range(Nt): - covmats[i] = np.dot(np.dot(A, np.diag(diags[i])), A.T) - return covmats, diags, A - - -def test_get_normalized_weight(): +def test_get_normalized_weight(get_covmats): """Test get_normalized_weight""" - Nt = 100 - covmats, diags, A = generate_cov(Nt, 3) + n_trials, n_channels = 5, 3 + covmats = get_covmats(n_trials, n_channels) w = _get_normalized_weight(None, covmats) - assert np.isclose(np.sum(w), 1., atol=1e-10) + assert np.sum(w) == approx(1.0, abs=1e-10) + +def test_get_normalized_weight_length(get_covmats): + n_trials, n_channels = 5, 3 + covmats = get_covmats(n_trials, n_channels) + w = _get_normalized_weight(None, covmats) with pytest.raises(ValueError): # not same length - _get_normalized_weight(w[:Nt//2], covmats) + _get_normalized_weight(w[: n_trials // 2], covmats) + + +def test_get_normalized_weight_pos(get_covmats): + n_trials, n_channels = 5, 3 + covmats = get_covmats(n_trials, n_channels) + w = _get_normalized_weight(None, covmats) with pytest.raises(ValueError): # not strictly positive weight w[0] = 0 _get_normalized_weight(w, covmats) -def test_rjd(): - """Test rjd""" - covmats, _, _ = generate_cov(100, 3) +@pytest.mark.parametrize("ajd", [rjd, ajd_pham]) +def test_ajd_shape(ajd, get_covmats): + n_trials, n_channels = 5, 3 + covmats = get_covmats(n_trials, n_channels) V, D = rjd(covmats) - assert V.shape == (3, 3) - assert D.shape == (100, 3, 3) + assert V.shape == (n_channels, n_channels) + assert D.shape == (n_trials, n_channels, n_channels) -def test_pham(): +def test_pham(get_covmats): """Test pham's ajd""" - Nt = 100 - covmats, diags, A = generate_cov(Nt, 3) + n_trials, n_channels, w_val = 5, 3, 2 + covmats = get_covmats(n_trials, n_channels) V, D = ajd_pham(covmats) + assert V.shape == (n_channels, n_channels) + assert D.shape == (n_trials, n_channels, n_channels) - w = 5 * np.ones(Nt) - Vw, Dw = ajd_pham(covmats, sample_weight=w) + Vw, Dw = ajd_pham(covmats, sample_weight=w_val * np.ones(n_trials)) assert_array_equal(V, Vw) # same result as ajd_pham without weight assert_array_equal(D, Dw) + +def test_pham_pos_weight(get_covmats): # Test that weight must be strictly positive + n_trials, n_channels, w_val = 5, 3, 2 + covmats = get_covmats(n_trials, n_channels) + w = w_val * np.ones(n_trials) with pytest.raises(ValueError): # not strictly positive weight w[0] = 0 ajd_pham(covmats, sample_weight=w) + +def test_pham_zero_weight(get_covmats): # now test that setting one weight to almost zero it's almost # like not passing the matrix - V, D = ajd_pham(covmats[1:]) + n_trials, n_channels, w_val = 5, 3, 2 + covmats = get_covmats(n_trials, n_channels) + w = w_val * np.ones(n_trials) + V, D = ajd_pham(covmats[1:], sample_weight=w[1:]) w[0] = 1e-12 Vw, Dw = ajd_pham(covmats, sample_weight=w) - assert_allclose(V, Vw, rtol=1e-4, atol=1e-8) - assert_allclose(D, Dw[1:], rtol=1e-4, atol=1e-8) + assert V == approx(Vw, rel=1e-4, abs=1e-8) + assert D == approx(Dw[1:], rel=1e-4, abs=1e-8) -def test_uwedge(): +@pytest.mark.parametrize("init", [True, False]) +def test_uwedge(init, get_covmats_params): """Test uwedge.""" - covmats, diags, A = generate_cov(100, 3) - V, D = uwedge(covmats) - V, D = uwedge(covmats, init=A) + n_trials, n_channels = 5, 3 + covmats, _, A = get_covmats_params(n_trials, n_channels) + if init: + V, D = uwedge(covmats) + else: + V, D = uwedge(covmats, init=A) + assert V.shape == (n_channels, n_channels) + assert D.shape == (n_trials, n_channels, n_channels) diff --git a/tests/test_channelselection.py b/tests/test_channelselection.py index 3d1cd063..a4bdc671 100644 --- a/tests/test_channelselection.py +++ b/tests/test_channelselection.py @@ -1,31 +1,23 @@ -"""Test for channel selection.""" -import numpy as np +from numpy.testing import assert_array_equal from pyriemann.channelselection import ElectrodeSelection, FlatChannelRemover -def generate_cov(Nt, Ne): - """Generate a set of cavariances matrices for test purpose.""" - diags = 1.0+0.1*np.random.randn(Nt, Ne) - covmats = np.empty((Nt, Ne, Ne)) - for i in range(Nt): - covmats[i] = np.diag(diags[i]) - return covmats - - -def test_ElectrodeSelection_transform(): +def test_ElectrodeSelection_transform(get_covmats, get_labels): """Test transform of channelselection.""" - covset = generate_cov(10, 30) - labels = np.array([0, 1]).repeat(5) + n_trials, n_channels, n_classes = 10, 30, 2 + covset = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) se = ElectrodeSelection() se.fit(covset, labels) se.transform(covset) -def test_FlatChannelRemover(): - X = np.random.rand(100, 10, 3) +def test_FlatChannelRemover(rndstate): + n_times, n_trials, n_channels = 100, 10, 3 + X = rndstate.rand(n_times, n_trials, n_channels) X[:, 0, :] = 999 fcr = FlatChannelRemover() fcr.fit(X) - np.testing.assert_array_equal(fcr.channels_, range(1, 10)) + assert_array_equal(fcr.channels_, range(1, 10)) Xt = fcr.fit_transform(X) - np.testing.assert_array_equal(Xt, X[:, 1:, :]) + assert_array_equal(Xt, X[:, 1:, :]) diff --git a/tests/test_classification.py b/tests/test_classification.py index 08d6e68a..b5529b1f 100644 --- a/tests/test_classification.py +++ b/tests/test_classification.py @@ -1,117 +1,196 @@ +from conftest import get_distances, get_means, get_metrics import numpy as np from numpy.testing import assert_array_equal +from pyriemann.classification import MDM, FgMDM, KNearestNeighbor, TSclassifier +from pyriemann.estimation import Covariances import pytest -from pyriemann.classification import (MDM, FgMDM, KNearestNeighbor, - TSclassifier) - - -def generate_cov(Nt, Ne): - """Generate a set of cavariances matrices for test purpose.""" - rs = np.random.RandomState(1234) - diags = 2.0 + 0.1 * rs.randn(Nt, Ne) - A = 2*rs.rand(Ne, Ne) - 1 - A /= np.atleast_2d(np.sqrt(np.sum(A**2, 1))).T - covmats = np.empty((Nt, Ne, Ne)) - for i in range(Nt): - covmats[i] = np.dot(np.dot(A, np.diag(diags[i])), A.T) - return covmats - - -def test_MDM_init(): - """Test init of MDM""" - MDM(metric='riemann') - - # Should raise if metric not string or dict - with pytest.raises(TypeError): - MDM(metric=42) - - # Should raise if metric is not contain bad keys +from pytest import approx +from sklearn.dummy import DummyClassifier + + +rclf = [MDM, FgMDM, KNearestNeighbor, TSclassifier] + + +@pytest.mark.parametrize("classif", rclf) +class ClassifierTestCase: + def test_two_classes(self, classif, get_covmats, get_labels): + n_classes, n_trials, n_channels = 2, 6, 3 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + self.clf_predict(classif, covmats, labels) + self.clf_fit_independence(classif, covmats, labels) + if classif is MDM: + self.clf_fitpredict(classif, covmats, labels) + if classif in (MDM, FgMDM): + self.clf_transform(classif, covmats, labels) + if classif in (MDM, FgMDM, KNearestNeighbor): + self.clf_jobs(classif, covmats, labels) + if classif in (MDM, FgMDM, TSclassifier): + self.clf_predict_proba(classif, covmats, labels) + self.clf_populate_classes(classif, covmats, labels) + if classif is KNearestNeighbor: + self.clf_predict_proba_trials(classif, covmats, labels) + if classif is (FgMDM, TSclassifier): + self.clf_tsupdate(classif, covmats, labels) + + def test_multi_classes(self, classif, get_covmats, get_labels): + n_classes, n_trials, n_channels = 3, 9, 3 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + self.clf_fit_independence(classif, covmats, labels) + self.clf_predict(classif, covmats, labels) + if classif is MDM: + self.clf_fitpredict(classif, covmats, labels) + if classif in (MDM, FgMDM): + self.clf_transform(classif, covmats, labels) + if classif in (MDM, FgMDM, KNearestNeighbor): + self.clf_jobs(classif, covmats, labels) + if classif in (MDM, FgMDM, TSclassifier): + self.clf_predict_proba(classif, covmats, labels) + self.clf_populate_classes(classif, covmats, labels) + if classif is KNearestNeighbor: + self.clf_predict_proba_trials(classif, covmats, labels) + if classif is (FgMDM, TSclassifier): + self.clf_tsupdate(classif, covmats, labels) + + +class TestClassifier(ClassifierTestCase): + def clf_predict(self, classif, covmats, labels): + n_trials = len(labels) + clf = classif() + clf.fit(covmats, labels) + predicted = clf.predict(covmats) + assert predicted.shape == (n_trials,) + + def clf_predict_proba(self, classif, covmats, labels): + n_trials = len(labels) + n_classes = len(np.unique(labels)) + clf = classif() + clf.fit(covmats, labels) + probabilities = clf.predict_proba(covmats) + assert probabilities.shape == (n_trials, n_classes) + assert probabilities.sum(axis=1) == approx(np.ones(n_trials)) + + def clf_predict_proba_trials(self, classif, covmats, labels): + n_trials = len(labels) + # n_classes = len(np.unique(labels)) + clf = classif() + clf.fit(covmats, labels) + probabilities = clf.predict_proba(covmats) + assert probabilities.shape == (n_trials, n_trials) + assert probabilities.sum(axis=1) == approx(np.ones(n_trials)) + + def clf_fitpredict(self, classif, covmats, labels): + clf = classif() + clf.fit_predict(covmats, labels) + assert_array_equal(clf.classes_, np.unique(labels)) + + def clf_transform(self, classif, covmats, labels): + n_trials = len(labels) + n_classes = len(np.unique(labels)) + clf = classif() + clf.fit(covmats, labels) + transformed = clf.transform(covmats) + assert transformed.shape == (n_trials, n_classes) + + def clf_fit_independence(self, classif, covmats, labels): + clf = classif() + clf.fit(covmats, labels).predict(covmats) + # retraining with different size should erase previous fit + new_covmats = covmats[:, :-1, :-1] + clf.fit(new_covmats, labels).predict(new_covmats) + + def clf_jobs(self, classif, covmats, labels): + clf = classif(n_jobs=2) + clf.fit(covmats, labels) + clf.predict(covmats) + + def clf_populate_classes(self, classif, covmats, labels): + clf = classif() + clf.fit(covmats, labels) + assert_array_equal(clf.classes_, np.unique(labels)) + + def clf_classif_tsupdate(self, classif, covmats, labels): + clf = classif(tsupdate=True) + clf.fit(covmats, labels).predict(covmats) + + +@pytest.mark.parametrize("classif", [MDM, FgMDM, TSclassifier]) +@pytest.mark.parametrize("mean", ["faulty", 42]) +@pytest.mark.parametrize("dist", ["not_real", 27]) +def test_metric_dict_error(classif, mean, dist, get_covmats, get_labels): + with pytest.raises((TypeError, KeyError)): + n_trials, n_channels, n_classes = 6, 3, 2 + labels = get_labels(n_trials, n_classes) + covmats = get_covmats(n_trials, n_channels) + clf = classif(metric={"mean": mean, "distance": dist}) + clf.fit(covmats, labels).predict(covmats) + + +@pytest.mark.parametrize("classif", [MDM, FgMDM]) +@pytest.mark.parametrize("mean", get_means()) +@pytest.mark.parametrize("dist", get_distances()) +def test_metric_dist(classif, mean, dist, get_covmats, get_labels): + n_trials, n_channels, n_classes = 4, 3, 2 + labels = get_labels(n_trials, n_classes) + covmats = get_covmats(n_trials, n_channels) + clf = classif(metric={"mean": mean, "distance": dist}) + clf.fit(covmats, labels).predict(covmats) + + +@pytest.mark.parametrize("classif", rclf) +@pytest.mark.parametrize("metric", [42, "faulty", {"foo": "bar"}]) +def test_metric_wrong_keys(classif, metric, get_covmats, get_labels): + with pytest.raises((TypeError, KeyError)): + n_trials, n_channels, n_classes = 6, 3, 2 + labels = get_labels(n_trials, n_classes) + covmats = get_covmats(n_trials, n_channels) + clf = classif(metric=metric) + clf.fit(covmats, labels).predict(covmats) + + +@pytest.mark.parametrize("classif", rclf) +@pytest.mark.parametrize("metric", get_metrics()) +def test_metric_str(classif, metric, get_covmats, get_labels): + n_trials, n_channels, n_classes = 6, 3, 2 + labels = get_labels(n_trials, n_classes) + covmats = get_covmats(n_trials, n_channels) + clf = classif(metric=metric) + clf.fit(covmats, labels).predict(covmats) + + +@pytest.mark.parametrize("dist", ["not_real", 42]) +def test_knn_dict_dist(dist, get_covmats, get_labels): with pytest.raises(KeyError): - MDM(metric={'universe': 42}) - - # should works with correct dict - MDM(metric={'mean': 'riemann', 'distance': 'logeuclid'}) - - -def test_MDM_fit(): - """Test Fit of MDM""" - covset = generate_cov(100, 3) - labels = np.array([0, 1]).repeat(50) - mdm = MDM(metric='riemann') - mdm.fit(covset, labels) - - -def test_MDM_predict(): - """Test prediction of MDM""" - covset = generate_cov(100, 3) - labels = np.array([0, 1]).repeat(50) - mdm = MDM(metric='riemann') - mdm.fit(covset, labels) - mdm.predict(covset) - - # test fit_predict - mdm = MDM(metric='riemann') - mdm.fit_predict(covset, labels) - - # test transform - mdm.transform(covset) - - # predict proba - mdm.predict_proba(covset) - - # test n_jobs - mdm = MDM(metric='riemann', n_jobs=2) - mdm.fit(covset, labels) - mdm.predict(covset) - - -def test_KNN(): - """Test KNearestNeighbor""" - covset = generate_cov(30, 3) - labels = np.array([0, 1, 2]).repeat(10) - - knn = KNearestNeighbor(1, metric='riemann') - knn.fit(covset, labels) - preds = knn.predict(covset) + n_trials, n_channels, n_classes = 6, 3, 2 + labels = get_labels(n_trials, n_classes) + covmats = get_covmats(n_trials, n_channels) + clf = KNearestNeighbor(metric={"distance": dist}) + clf.fit(covmats, labels).predict(covmats) + + +def test_1NN(get_covmats, get_labels): + """Test KNearestNeighbor with K=1""" + n_trials, n_channels, n_classes = 9, 3, 3 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + + knn = KNearestNeighbor(1, metric="riemann") + knn.fit(covmats, labels) + preds = knn.predict(covmats) assert_array_equal(labels, preds) -def test_TSclassifier(): +def test_TSclassifier_classifier(get_covmats, get_labels): """Test TS Classifier""" - covset = generate_cov(40, 3) - labels = np.array([0, 1]).repeat(20) - - with pytest.raises(TypeError): - TSclassifier(clf='666') + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + clf = TSclassifier(clf=DummyClassifier()) + clf.fit(covmats, labels).predict(covmats) - clf = TSclassifier() - clf.fit(covset, labels) - assert_array_equal(clf.classes_, np.array([0, 1])) - clf.predict(covset) - clf.predict_proba(covset) - -def test_FgMDM_init(): - """Test init of FgMDM""" - FgMDM(metric='riemann') - - # Should raise if metric not string or dict +def test_TSclassifier_classifier_error(): + """Test TS if not Classifier""" with pytest.raises(TypeError): - FgMDM(metric=42) - - # Should raise if metric is not contain bad keys - with pytest.raises(KeyError): - FgMDM(metric={'universe': 42}) - - # should works with correct dict - FgMDM(metric={'mean': 'riemann', 'distance': 'logeuclid'}) - - -def test_FgMDM_predict(): - """Test prediction of FgMDM""" - covset = generate_cov(100, 3) - labels = np.array([0, 1]).repeat(50) - fgmdm = FgMDM(metric='riemann') - fgmdm.fit(covset, labels) - fgmdm.predict(covset) - fgmdm.transform(covset) + TSclassifier(clf=Covariances()) diff --git a/tests/test_clustering.py b/tests/test_clustering.py index 4b28d9dd..63f41e72 100644 --- a/tests/test_clustering.py +++ b/tests/test_clustering.py @@ -1,128 +1,223 @@ +from conftest import get_metrics import numpy as np from numpy.testing import assert_array_equal import pytest from pyriemann.clustering import Kmeans, KmeansPerClassTransform, Potato -def generate_cov(Nt, Ne): - """Generate a set of cavariances matrices for test purpose.""" - rs = np.random.RandomState(1234) - diags = 2.0 + 0.1 * rs.randn(Nt, Ne) - A = 2*rs.rand(Ne, Ne) - 1 - A /= np.atleast_2d(np.sqrt(np.sum(A**2, 1))).T - covmats = np.empty((Nt, Ne, Ne)) - for i in range(Nt): - covmats[i] = np.dot(np.dot(A, np.diag(diags[i])), A.T) - return covmats - - -def test_Kmeans_init(): - """Test Kmeans""" - covset = generate_cov(20, 3) - labels = np.array([0, 1]).repeat(10) - - # init - km = Kmeans(2) - - # fit - km.fit(covset) - - # fit with init - km = Kmeans(2, init=covset[0:2]) - km.fit(covset) - - # fit with labels - km.fit(covset, y=labels) - - # predict - km.predict(covset) - - # transform - km.transform(covset) - - # n_jobs - km = Kmeans(2, n_jobs=2) - km.fit(covset) +@pytest.mark.parametrize("clust", [Kmeans, KmeansPerClassTransform, Potato]) +class ClusteringTestCase: + def test_two_clusters(self, clust, get_covmats, get_labels): + n_clusters = 2 + n_trials, n_channels = 6, 3 + covmats = get_covmats(n_trials, n_channels) + if clust is Kmeans: + self.clf_predict(clust, covmats, n_clusters) + self.clf_transform(clust, covmats, n_clusters) + self.clf_jobs(clust, covmats, n_clusters) + self.clf_centroids(clust, covmats, n_clusters) + self.clf_fit_independence(clust, covmats) + if clust is KmeansPerClassTransform: + n_classes = 2 + labels = get_labels(n_trials, n_classes) + self.clf_transform_per_class(clust, covmats, n_clusters, labels) + self.clf_jobs(clust, covmats, n_clusters, labels) + self.clf_fit_labels_independence(clust, covmats, labels) + if clust is Potato: + self.clf_transform(clust, covmats) + self.clf_predict(clust, covmats) + self.clf_predict_proba(clust, covmats) + self.clf_partial_fit(clust, covmats) + self.clf_fit_independence(clust, covmats) + + def test_three_clusters(self, clust, get_covmats, get_labels): + n_clusters = 3 + n_trials, n_channels = 6, 3 + covmats = get_covmats(n_trials, n_channels) + if clust is Kmeans: + self.clf_predict(clust, covmats, n_clusters) + self.clf_transform(clust, covmats, n_clusters) + self.clf_jobs(clust, covmats, n_clusters) + self.clf_centroids(clust, covmats, n_clusters) + self.clf_fit_independence(clust, covmats) + if clust is KmeansPerClassTransform: + n_classes = 2 + labels = get_labels(n_trials, n_classes) + self.clf_transform_per_class(clust, covmats, n_clusters, labels) + self.clf_jobs(clust, covmats, n_clusters, labels) + self.clf_fit_labels_independence(clust, covmats, labels) + + +class TestRiemannianClustering(ClusteringTestCase): + def clf_transform(self, clust, covmats, n_clusters=None): + n_trials = covmats.shape[0] + if n_clusters is None: + clf = clust() + else: + clf = clust(n_clusters=n_clusters) + clf.fit(covmats) + transformed = clf.transform(covmats) + if n_clusters is None: + assert transformed.shape == (n_trials,) + else: + assert transformed.shape == (n_trials, n_clusters) + + def clf_jobs(self, clust, covmats, n_clusters, labels=None): + n_trials = covmats.shape[0] + clf = clust(n_clusters=n_clusters, n_jobs=2) + if labels is None: + clf.fit(covmats) + else: + clf.fit(covmats, labels) + transformed = clf.transform(covmats) + assert len(transformed) == (n_trials) + + def clf_centroids(self, clust, covmats, n_clusters): + _, n_channels, n_channels = covmats.shape + clf = clust(n_clusters=n_clusters).fit(covmats) + centroids = clf.centroids() + shape = (n_clusters, n_channels, n_channels) + assert np.array(centroids).shape == shape + + def clf_transform_per_class(self, clust, covmats, n_clusters, labels): + n_classes = len(np.unique(labels)) + n_trials = covmats.shape[0] + clf = clust(n_clusters=n_clusters) + clf.fit(covmats, labels) + transformed = clf.transform(covmats) + assert transformed.shape == (n_trials, n_classes * n_clusters) + + def clf_predict(self, clust, covmats, n_clusters=None): + n_trials = covmats.shape[0] + if n_clusters is None: + clf = clust() + else: + clf = clust(n_clusters=n_clusters) + clf.fit(covmats) + predicted = clf.predict(covmats) + assert predicted.shape == (n_trials,) + + def clf_predict_proba(self, clust, covmats): + n_trials = covmats.shape[0] + clf = clust() + clf.fit(covmats) + probabilities = clf.predict(covmats) + assert probabilities.shape == (n_trials,) + + def clf_partial_fit(self, clust, covmats): + clf = clust() + clf.fit(covmats) + clf.partial_fit(covmats) + clf.partial_fit(covmats[np.newaxis, 0]) # fit one sample at a time + + def clf_fit_independence(self, clust, covmats): + clf = clust() + clf.fit(covmats).transform(covmats) + # retraining with different size should erase previous fit + new_covmats = covmats[:, :-1, :-1] + clf.fit(new_covmats).transform(new_covmats) + + def clf_fit_labels_independence(self, clust, covmats, labels): + clf = clust() + clf.fit(covmats, labels).transform(covmats) + # retraining with different size should erase previous fit + new_covmats = covmats[:, :-1, :-1] + clf.fit(new_covmats, labels).transform(new_covmats) + + +@pytest.mark.parametrize("clust", [Kmeans, KmeansPerClassTransform]) +@pytest.mark.parametrize("init", ["random", "ndarray"]) +@pytest.mark.parametrize("n_init", [1, 5]) +@pytest.mark.parametrize("metric", get_metrics()) +def test_km_init_metric(clust, init, n_init, metric, get_covmats, get_labels): + n_clusters, n_trials, n_channels = 2, 6, 3 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_clusters) + if init == "ndarray": + clf = clust( + n_clusters=n_clusters, + metric=metric, + init=covmats[:n_clusters], + n_init=n_init, + ) + else: + clf = clust( + n_clusters=n_clusters, metric=metric, init=init, n_init=n_init + ) + clf.fit(covmats, labels) + transformed = clf.transform(covmats) + assert len(transformed) == n_trials + + +def test_Potato_equal_labels(): + with pytest.raises(ValueError): + Potato(pos_label=0) -def test_KmeansPCT_init(): - """Test Kmeans PCT""" - covset = generate_cov(20, 3) - labels = np.array([0, 1]).repeat(10) +@pytest.mark.parametrize("y_fail", [[1], [0] * 6, [0] * 7, [0, 1, 2] * 2]) +def test_Potato_fit_error(y_fail, get_covmats): + n_trials, n_channels = 6, 3 + covmats = get_covmats(n_trials, n_channels) + with pytest.raises(ValueError): + Potato().fit(covmats, y=y_fail) - # init - km = KmeansPerClassTransform(2) - # fit - km.fit(covset, labels) +def test_Potato_partial_fit_not_fitted(get_covmats): + n_trials, n_channels = 6, 3 + covmats = get_covmats(n_trials, n_channels) + with pytest.raises(ValueError): # potato not fitted + Potato().partial_fit(covmats) - # transform - km.transform(covset) +def test_Potato_partial_fit_diff_channels(get_covmats, get_labels): + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + pt = Potato().fit(covmats, labels) + with pytest.raises(ValueError): # unequal # of chans + pt.partial_fit(get_covmats(2, n_channels + 1)) -def test_Potato_init(): - """Test Potato""" - n_trials, n_channels = 20, 3 - covset = generate_cov(n_trials, n_channels) - cov = covset[0][np.newaxis, ...] # to test potato with a single trial - labels = np.array([0, 1]).repeat(n_trials // 2) - # init - with pytest.raises(ValueError): # positive and neg labels equal - Potato(pos_label=0) - pt = Potato() +def test_Potato_partial_fit_no_poslabel(get_covmats, get_labels): + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + pt = Potato().fit(covmats, labels) + with pytest.raises(ValueError): # no positive labels + pt.partial_fit(covmats, [0] * n_trials) - # fit no labels - pt.fit(covset) - # fit with labels +@pytest.mark.parametrize("alpha", [-0.1, 1.1]) +def test_Potato_partial_fit_alpha(alpha, get_covmats, get_labels): + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + pt = Potato().fit(covmats, labels) with pytest.raises(ValueError): - pt.fit(covset, y=[1]) - with pytest.raises(ValueError): - pt.fit(covset, y=[0] * 20) - with pytest.raises(ValueError): - pt.fit(covset, y=[0, 2, 3] + [1] * 17) - pt.fit(covset, labels) + pt.partial_fit(covmats, labels, alpha=alpha) - # partial_fit - with pytest.raises(ValueError): # potato not fitted - Potato().partial_fit(covset) - with pytest.raises(ValueError): # unequal # of chans - pt.partial_fit(generate_cov(2, n_channels + 1)) - with pytest.raises(ValueError): # alpha < 0 - pt.partial_fit(covset, labels, alpha=-0.1) - with pytest.raises(ValueError): # alpha > 1 - pt.partial_fit(covset, labels, alpha=1.1) - with pytest.raises(ValueError): # no positive labels - pt.partial_fit(covset, [0] * n_trials) - pt.partial_fit(covset, labels, alpha=0.6) - pt.partial_fit(cov, alpha=0.1) - - # transform - pt.transform(covset) - pt.transform(cov) - # predict - pt.predict(covset) - pt.predict(cov) - - # predict_proba - pt.predict_proba(covset) - pt.predict_proba(cov) +def test_Potato_1channel(get_covmats): + n_trials, n_channels = 6, 1 + covmats_1chan = get_covmats(n_trials, n_channels) + pt = Potato() + pt.fit_transform(covmats_1chan) + pt.predict(covmats_1chan) + pt.predict_proba(covmats_1chan) - # potato with a single channel - covset_1chan = generate_cov(n_trials, 1) - pt.fit_transform(covset_1chan) - pt.predict(covset_1chan) - pt.predict_proba(covset_1chan) - # lower threshold +def test_Potato_threshold(get_covmats): + n_trials, n_channels = 6, 3 + covmats = get_covmats(n_trials, n_channels) pt = Potato(threshold=1) - pt.fit(covset) + pt.fit(covmats) + - # test positive labels +def test_Potato_specific_labels(get_covmats): + n_trials, n_channels = 6, 3 + covmats = get_covmats(n_trials, n_channels) pt = Potato(threshold=1, pos_label=2, neg_label=7) - pt.fit(covset) - assert_array_equal(np.unique(pt.predict(covset)), [2, 7]) + pt.fit(covmats) + assert_array_equal(np.unique(pt.predict(covmats)), [2, 7]) # fit with custom positive label - pt.fit(covset, y=[2]*n_trials) + pt.fit(covmats, y=[2] * n_trials) diff --git a/tests/test_embedding.py b/tests/test_embedding.py index 592aa3ad..1a3d16f2 100644 --- a/tests/test_embedding.py +++ b/tests/test_embedding.py @@ -1,23 +1,24 @@ -import numpy as np - +from conftest import get_metrics from pyriemann.embedding import Embedding -from numpy.testing import assert_array_equal +import pytest -def generate_cov(Nt, Ne): - """Generate a set of cavariances matrices for test purpose.""" - rs = np.random.RandomState(1234) - diags = 2.0 + 0.1 * rs.randn(Nt, Ne) - A = 2*rs.rand(Ne, Ne) - 1 - A /= np.atleast_2d(np.sqrt(np.sum(A**2, 1))).T - covmats = np.empty((Nt, Ne, Ne)) - for i in range(Nt): - covmats[i] = np.dot(np.dot(A, np.diag(diags[i])), A.T) - return covmats, diags, A +@pytest.mark.parametrize("metric", get_metrics()) +@pytest.mark.parametrize("eps", [None, 0.1]) +def test_embedding(metric, eps, get_covmats): + """Test Embedding.""" + n_trials, n_channels, n_comp = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + embd = Embedding(metric=metric, n_components=n_comp, eps=eps) + covembd = embd.fit_transform(covmats) + assert covembd.shape == (n_trials, n_comp) -def test_embedding(): - """Test Embedding.""" - covmats, diags, A = generate_cov(100, 3) - embd = Embedding(metric='riemann', n_components=2).fit_transform(covmats) - assert_array_equal(embd.shape[1], 2) +def test_fit_independence(get_covmats): + n_trials, n_channels = 6, 3 + covmats = get_covmats(n_trials, n_channels) + embd = Embedding() + embd.fit_transform(covmats) + # retraining with different size should erase previous fit + new_covmats = covmats[:, :-1, :-1] + embd.fit_transform(new_covmats) diff --git a/tests/test_estimation.py b/tests/test_estimation.py index 867eecf2..2bb17c7b 100644 --- a/tests/test_estimation.py +++ b/tests/test_estimation.py @@ -1,91 +1,205 @@ -import numpy as np -from pyriemann.estimation import (Covariances, ERPCovariances, - XdawnCovariances, CospCovariances, - HankelCovariances, Coherences, Shrinkage) +from pyriemann.estimation import ( + Covariances, + ERPCovariances, + XdawnCovariances, + CospCovariances, + HankelCovariances, + Coherences, + Shrinkage, +) import pytest +estim = ["cov", "scm", "lwf", "oas", "mcd", "corr", "sch"] +coh = ["ordinary", "instantaneous", "lagged", "imaginary"] + -def test_covariances(): +@pytest.mark.parametrize("estimator", estim) +def test_covariances(estimator, rndstate, is_spd): """Test Covariances""" - x = np.random.randn(2, 3, 100) - cov = Covariances() + n_trials, n_channels, n_times = 2, 3, 100 + x = rndstate.randn(n_trials, n_channels, n_times) + cov = Covariances(estimator=estimator) cov.fit(x) - cov.fit_transform(x) - assert cov.get_params() == dict(estimator='scm') + covmats = cov.fit_transform(x) + assert cov.get_params() == dict(estimator=estimator) + assert covmats.shape == (n_trials, n_channels, n_channels) + assert is_spd(covmats) -def test_hankel_covariances(): +def test_hankel_covariances(rndstate, is_spd): """Test Hankel Covariances""" - x = np.random.randn(2, 3, 100) + n_trials, n_channels, n_times = 2, 3, 100 + x = rndstate.randn(n_trials, n_channels, n_times) cov = HankelCovariances() cov.fit(x) - cov.fit_transform(x) - assert cov.get_params() == dict(estimator='scm', delays=4) + covmats = cov.fit_transform(x) + assert cov.get_params() == dict(estimator="scm", delays=4) + assert covmats.shape == (n_trials, 4 * n_channels, 4 * n_channels) + assert is_spd(covmats) + +def test_hankel_covariances_delays(rndstate, is_spd): + n_trials, n_channels, n_times = 2, 3, 100 + x = rndstate.randn(n_trials, n_channels, n_times) cov = HankelCovariances(delays=[1, 2]) cov.fit(x) - cov.fit_transform(x) + covmats = cov.fit_transform(x) + assert covmats.shape == (n_trials, 3 * n_channels, 3 * n_channels) + assert is_spd(covmats) -def test_erp_covariances(): +@pytest.mark.parametrize("estimator", estim) +@pytest.mark.parametrize("svd", [None, 2]) +def test_erp_covariances(estimator, svd, rndstate, get_labels, is_spsd): """Test fit ERPCovariances""" - x = np.random.randn(10, 3, 100) - labels = np.array([0, 1]).repeat(5) - cov = ERPCovariances() - cov.fit_transform(x, labels) + n_classes, n_trials, n_channels, n_times = 2, 4, 3, 100 + x = rndstate.randn(n_trials, n_channels, n_times) + labels = get_labels(n_trials, n_classes) + cov = ERPCovariances(estimator=estimator, svd=svd) + covmats = cov.fit_transform(x, labels) + if svd is None: + covsize = (n_classes + 1) * n_channels + else: + covsize = n_classes * svd + n_channels + assert cov.get_params() == dict(classes=None, estimator=estimator, svd=svd) + assert covmats.shape == (n_trials, covsize, covsize) + assert is_spsd(covmats) + + +def test_erp_covariances_classes(rndstate, get_labels, is_spsd): + n_trials, n_channels, n_times, n_classes = 4, 3, 100, 2 + x = rndstate.randn(n_trials, n_channels, n_times) + labels = get_labels(n_trials, n_classes) cov = ERPCovariances(classes=[0]) - cov.fit_transform(x, labels) + covmats = cov.fit_transform(x, labels) + assert covmats.shape == (n_trials, 2 * n_channels, 2 * n_channels) + assert is_spsd(covmats) + + +def test_erp_covariances_svd_error(rndstate): # assert raise svd with pytest.raises(TypeError): - ERPCovariances(svd='42') - cov = ERPCovariances(svd=2) - assert cov.get_params() == dict(classes=None, estimator='scm', svd=2) - cov.fit_transform(x, labels) + ERPCovariances(svd="42") -def test_xdawn_covariances(): +@pytest.mark.parametrize("est", estim) +def test_xdawn_est(est, rndstate, get_labels, is_spsd): """Test fit XdawnCovariances""" - x = np.random.randn(10, 3, 100) - labels = np.array([0, 1]).repeat(5) - cov = XdawnCovariances() - cov.fit_transform(x, labels) - assert cov.get_params() == dict(nfilter=4, applyfilters=True, - classes=None, estimator='scm', - xdawn_estimator='scm', - baseline_cov=None) + n_classes, nfilter = 2, 2 + n_trials, n_channels, n_times = 4, 6, 100 + x = rndstate.randn(n_trials, n_channels, n_times) + labels = get_labels(n_trials, n_classes) + cov = XdawnCovariances(nfilter, estimator=est) + covmats = cov.fit_transform(x, labels) + assert cov.get_params() == dict( + nfilter=nfilter, + applyfilters=True, + classes=None, + estimator=est, + xdawn_estimator="scm", + baseline_cov=None, + ) + covsize = 2 * (n_classes * nfilter) + assert covmats.shape == (n_trials, covsize, covsize) + assert is_spsd(covmats) -def test_cosp_covariances(): +@pytest.mark.parametrize("xdawn_est", estim) +def test_xdawn_covariances_est(xdawn_est, rndstate, get_labels, is_spsd): + """Test fit XdawnCovariances""" + n_classes, nfilter = 2, 2 + n_trials, n_channels, n_times = 4, 6, 100 + x = rndstate.randn(n_trials, n_channels, n_times) + labels = get_labels(n_trials, n_classes) + cov = XdawnCovariances(nfilter, xdawn_estimator=xdawn_est) + covmats = cov.fit_transform(x, labels) + assert cov.get_params() == dict( + nfilter=nfilter, + applyfilters=True, + classes=None, + estimator="scm", + xdawn_estimator=xdawn_est, + baseline_cov=None, + ) + covsize = 2 * (n_classes * nfilter) + assert covmats.shape == (n_trials, covsize, covsize) + assert is_spsd(covmats) + + +@pytest.mark.parametrize("nfilter", [2, 4]) +def test_xdawn_covariances_nfilter(nfilter, rndstate, get_labels, is_spsd): + """Test fit XdawnCovariances""" + n_classes, n_trials, n_channels, n_times = 2, 4, 8, 100 + x = rndstate.randn(n_trials, n_channels, n_times) + labels = get_labels(n_trials, n_classes) + cov = XdawnCovariances(nfilter=nfilter) + covmats = cov.fit_transform(x, labels) + assert cov.get_params() == dict( + nfilter=nfilter, + applyfilters=True, + classes=None, + estimator="scm", + xdawn_estimator="scm", + baseline_cov=None, + ) + covsize = 2 * (n_classes * nfilter) + assert covmats.shape == (n_trials, covsize, covsize) + assert is_spsd(covmats) + + +def test_xdawn_covariances_applyfilters(rndstate, get_labels, is_spsd): + n_classes, nfilter = 2, 2 + n_trials, n_channels, n_times = 4, 6, 100 + x = rndstate.randn(n_trials, n_channels, n_times) + labels = get_labels(n_trials, n_classes) + cov = XdawnCovariances(nfilter=nfilter, applyfilters=False) + covmats = cov.fit_transform(x, labels) + covsize = n_classes * nfilter + n_channels + assert covmats.shape == (n_trials, covsize, covsize) + assert is_spsd(covmats) + + +def test_cosp_covariances(rndstate, is_spsd): """Test fit CospCovariances""" - x = np.random.randn(2, 3, 1000) + n_trials, n_channels, n_times = 2, 3, 1000 + x = rndstate.randn(n_trials, n_channels, n_times) cov = CospCovariances() cov.fit(x) - cov.fit_transform(x) - assert cov.get_params() == dict(window=128, overlap=0.75, fmin=None, - fmax=None, fs=None) + covmats = cov.transform(x) + assert cov.get_params() == dict( + window=128, overlap=0.75, fmin=None, fmax=None, fs=None + ) + n_freqs = 65 + assert covmats.shape == (n_trials, n_channels, n_channels, n_freqs) + assert is_spsd(covmats.transpose(0, 3, 1, 2)) -@pytest.mark.parametrize('coh', - ['ordinary', 'instantaneous', 'lagged', 'imaginary'] -) -def test_coherences(coh): + +@pytest.mark.parametrize("coh", coh) +def test_coherences(coh, rndstate, is_spsd): """Test fit Coherences""" - rs = np.random.RandomState(42) - n_trials, n_channels, n_times = 10, 3, 1000 - x = rs.randn(n_trials, n_channels, n_times) + n_trials, n_channels, n_times = 2, 5, 200 + x = rndstate.randn(n_trials, n_channels, n_times) cov = Coherences(coh=coh) cov.fit(x) - cov.fit_transform(x) - assert cov.get_params() == dict(window=128, overlap=0.75, fmin=None, - fmax=None, fs=None, coh=coh) + covmats = cov.fit_transform(x) + assert cov.get_params() == dict( + window=128, overlap=0.75, fmin=None, fmax=None, fs=None, coh=coh + ) + n_freqs = 65 + assert covmats.shape == (n_trials, n_channels, n_channels, n_freqs) + if coh in ["ordinary", "instantaneous"]: + assert is_spsd(covmats.transpose(0, 3, 1, 2)) -def test_shrinkage(): +@pytest.mark.parametrize("shrinkage", [0.1, 0.9]) +def test_shrinkage(shrinkage, rndstate, is_spd): """Test Shrinkage""" - x = np.random.randn(2, 3, 100) - cov = Covariances() - covs = cov.fit_transform(x) - sh = Shrinkage() - sh.fit(covs) - sh.transform(covs) - assert sh.get_params() == dict(shrinkage=0.1) + n_trials, n_channels, n_times = 2, 3, 100 + x = rndstate.randn(n_trials, n_channels, n_times) + covmats = Covariances().fit_transform(x) + sh = Shrinkage(shrinkage=shrinkage) + covmats = sh.fit(covmats).transform(covmats) + assert sh.get_params() == dict(shrinkage=shrinkage) + assert covmats.shape == (n_trials, n_channels, n_channels) + assert is_spd(covmats) diff --git a/tests/test_preprocessing.py b/tests/test_preprocessing.py index 9f7d30aa..4f5e29c7 100644 --- a/tests/test_preprocessing.py +++ b/tests/test_preprocessing.py @@ -1,125 +1,99 @@ -import pytest - import numpy as np -from numpy.testing import assert_array_equal, assert_array_almost_equal - +from numpy.testing import assert_array_almost_equal from pyriemann.spatialfilters import Whitening +import pytest -def generate_cov(Nt, Ne): - """Generate a set of cavariances matrices for test purpose""" - rs = np.random.RandomState(1234) - diags = 2.0 + 0.1 * rs.randn(Nt, Ne) - A = 2*rs.rand(Ne, Ne) - 1 - A /= np.atleast_2d(np.sqrt(np.sum(A**2, 1))).T - covmats = np.empty((Nt, Ne, Ne)) - for i in range(Nt): - covmats[i] = np.dot(np.dot(A, np.diag(diags[i])), A.T) - return covmats - +n_components = 3 +expl_var = 0.9 +max_cond = 10 +dim_red = [ + None, + {"n_components": n_components}, + {"expl_var": expl_var}, + {"max_cond": max_cond}, +] -def test_whitening(): - """Test Whitening""" - n_trials, n_channels, n_components = 20, 6, 3 - cov = generate_cov(n_trials, n_channels) - rs = np.random.RandomState(1234) - w = rs.rand(n_trials) - max_cond = 10 - # Test Init +def test_whitening_init(): whit = Whitening() - assert whit.metric == 'euclid' + assert whit.metric == "euclid" assert whit.dim_red is None assert not whit.verbose + +def test_whitening_error(rndstate, get_covmats): + """Test Whitening""" + n_trials, n_channels = 20, 6 + cov = get_covmats(n_trials, n_channels) # Test Fit with pytest.raises(ValueError): # len dim_red not equal to 1 - Whitening(dim_red={'n_components': 2, 'expl_var': 0.5}).fit(cov) + Whitening(dim_red={"n_components": 2, "expl_var": 0.5}).fit(cov) with pytest.raises(ValueError): # n_components not superior to 1 - Whitening(dim_red={'n_components': 0}).fit(cov) + Whitening(dim_red={"n_components": 0}).fit(cov) with pytest.raises(ValueError): # n_components not a int - Whitening(dim_red={'n_components': 2.5}).fit(cov) + Whitening(dim_red={"n_components": 2.5}).fit(cov) with pytest.raises(ValueError): # expl_var out of bound - Whitening(dim_red={'expl_var': 0}).fit(cov) + Whitening(dim_red={"expl_var": 0}).fit(cov) with pytest.raises(ValueError): # expl_var out of bound - Whitening(dim_red={'expl_var': 1.1}).fit(cov) + Whitening(dim_red={"expl_var": 1.1}).fit(cov) with pytest.raises(ValueError): # max_cond not strictly superior to 1 - Whitening(dim_red={'max_cond': 1}).fit(cov) + Whitening(dim_red={"max_cond": 1}).fit(cov) with pytest.raises(ValueError): # unknown key - Whitening(dim_red={'abc': 42}).fit(cov) + Whitening(dim_red={"abc": 42}).fit(cov) with pytest.raises(ValueError): # unknown type - Whitening(dim_red='max_cond').fit(cov) + Whitening(dim_red="max_cond").fit(cov) with pytest.raises(ValueError): # unknown type Whitening(dim_red=20).fit(cov) - whit = Whitening().fit(cov, sample_weight=w) - assert whit.n_components_ == n_channels - assert_array_equal(whit.filters_.shape, [n_channels, n_channels]) - assert_array_equal(whit.inv_filters_.shape, [n_channels, n_channels]) - - whit = Whitening( - dim_red={'n_components': n_components} - ).fit(cov, sample_weight=w) - assert whit.n_components_ == n_components - assert_array_equal(whit.filters_.shape, [n_channels, n_components]) - assert_array_equal(whit.inv_filters_.shape, [n_components, n_channels]) - - whit = Whitening(dim_red={'expl_var': 0.9}).fit(cov, sample_weight=w) - assert whit.n_components_ <= n_channels - assert_array_equal(whit.filters_.shape, [n_channels, whit.n_components_]) - assert_array_equal(whit.inv_filters_.shape, - [whit.n_components_, n_channels]) - - whit = Whitening(dim_red={'max_cond': max_cond}).fit(cov, sample_weight=w) - assert whit.n_components_ <= n_channels - assert_array_equal(whit.filters_.shape, [n_channels, whit.n_components_]) - assert_array_equal(whit.inv_filters_.shape, - [whit.n_components_, n_channels]) +@pytest.mark.parametrize("dim_red", dim_red) +def test_whitening_dimred(dim_red, rndstate, get_covmats): + """Test Whitening""" + n_trials, n_channels = 20, 6 + cov = get_covmats(n_trials, n_channels) + + w = rndstate.rand(n_trials) + whit = Whitening(dim_red=dim_red).fit(cov, sample_weight=w) + if dim_red is None: + n_comp = n_channels + else: + n_comp = whit.n_components_ + + if dim_red and list(dim_red.keys())[0] in ["expl_var", "max_cond"]: + assert whit.n_components_ <= n_channels + else: + assert whit.n_components_ == n_comp + assert whit.filters_.shape == (n_channels, n_comp) + assert whit.inv_filters_.shape == (n_comp, n_channels) + + +@pytest.mark.parametrize("dim_red", dim_red) +def test_whitening_transform(dim_red, rndstate, get_covmats): + """Test Whitening""" + n_trials, n_channels = 20, 6 + cov = get_covmats(n_trials, n_channels) # Test transform whit = Whitening().fit(cov) cov_w = whit.transform(cov) - assert_array_equal(cov_w.shape, [n_trials, n_channels, n_channels]) - # after whitening, mean = identity - assert_array_almost_equal(cov_w.mean(axis=0), np.eye(n_channels)) - - whit = Whitening(dim_red={'n_components': n_components}).fit(cov) - cov_w = whit.transform(cov) - n_components_ = whit.n_components_ - assert_array_equal(cov_w.shape, [n_trials, n_components_, n_components_]) - # after whitening, mean = identity - assert_array_almost_equal(cov_w.mean(axis=0), np.eye(n_components_)) - - whit = Whitening(dim_red={'expl_var': 0.9}).fit(cov) - cov_w = whit.transform(cov) - n_components_ = whit.n_components_ - assert_array_equal(cov_w.shape, [n_trials, n_components_, n_components_]) - # after whitening, mean = identity - assert_array_almost_equal(cov_w.mean(axis=0), np.eye(n_components_)) - - whit = Whitening(dim_red={'max_cond': max_cond}).fit(cov) - cov_w = whit.transform(cov) - n_components_ = whit.n_components_ - assert_array_equal(cov_w.shape, [n_trials, n_components_, n_components_]) + if dim_red is None: + n_comp = n_channels + else: + n_comp = whit.n_components_ + assert cov_w.shape == (n_trials, n_comp, n_comp) # after whitening, mean = identity - mean = cov_w.mean(axis=0) - assert_array_almost_equal(mean, np.eye(n_components_)) - assert np.linalg.cond(mean) <= max_cond - - # Test inverse_transform - whit = Whitening().fit(cov) - cov_iw = whit.inverse_transform(whit.transform(cov)) - assert_array_equal(cov_iw.shape, [n_trials, n_channels, n_channels]) - assert_array_almost_equal(cov, cov_iw) + assert_array_almost_equal(cov_w.mean(axis=0), np.eye(n_comp)) + if dim_red is not None and "max_cond" in dim_red.keys(): + assert np.linalg.cond(cov_w.mean(axis=0)) <= max_cond - whit = Whitening(dim_red={'n_components': n_components}).fit(cov) - cov_iw = whit.inverse_transform(whit.transform(cov)) - assert_array_equal(cov_iw.shape, [n_trials, n_channels, n_channels]) - - whit = Whitening(dim_red={'expl_var': 0.9}).fit(cov) - cov_iw = whit.inverse_transform(whit.transform(cov)) - assert_array_equal(cov_iw.shape, [n_trials, n_channels, n_channels]) - whit = Whitening(dim_red={'max_cond': max_cond}).fit(cov) +@pytest.mark.parametrize("dim_red", dim_red) +def test_whitening_inverse_transform(dim_red, rndstate, get_covmats): + """Test Whitening inverse transform""" + n_trials, n_channels = 20, 6 + cov = get_covmats(n_trials, n_channels) + whit = Whitening(dim_red=dim_red).fit(cov) cov_iw = whit.inverse_transform(whit.transform(cov)) - assert_array_equal(cov_iw.shape, [n_trials, n_channels, n_channels]) + assert cov_iw.shape == (n_trials, n_channels, n_channels) + if dim_red is None: + assert_array_almost_equal(cov, cov_iw) diff --git a/tests/test_spatialfilters.py b/tests/test_spatialfilters.py index 401b342f..d50bc9d0 100644 --- a/tests/test_spatialfilters.py +++ b/tests/test_spatialfilters.py @@ -1,231 +1,292 @@ +from conftest import get_metrics import pytest import numpy as np from numpy.testing import assert_array_equal -from pyriemann.spatialfilters import (Xdawn, CSP, SPoC, - BilinearFilter, AJDC) - - -def generate_cov(Nt, Ne): - """Generate a set of cavariances matrices for test purpose""" - rs = np.random.RandomState(1234) - diags = 2.0 + 0.1 * rs.randn(Nt, Ne) - A = 2*rs.rand(Ne, Ne) - 1 - A /= np.atleast_2d(np.sqrt(np.sum(A**2, 1))).T - covmats = np.empty((Nt, Ne, Ne)) - for i in range(Nt): - covmats[i] = np.dot(np.dot(A, np.diag(diags[i])), A.T) - return covmats - - -def test_Xdawn_init(): - """Test init of Xdawn""" - _ = Xdawn() - - -def test_Xdawn_fit(): - """Test Fit of Xdawn""" - x = np.random.randn(100, 3, 10) - labels = np.array([0, 1]).repeat(50) - xd = Xdawn() - xd.fit(x, labels) - - -def test_Xdawn_transform(): - """Test transform of Xdawn""" - x = np.random.randn(100, 3, 10) - labels = np.array([0, 1]).repeat(50) - xd = Xdawn() - xd.fit(x, labels) - xd.transform(x) - - -def test_Xdawn_baselinecov(): +from pyriemann.spatialfilters import Xdawn, CSP, SPoC, BilinearFilter, AJDC + + +@pytest.mark.parametrize("spfilt", [Xdawn, CSP, SPoC, BilinearFilter, AJDC]) +class SpatialFiltersTestCase: + def test_two_classes(self, spfilt, get_covmats, rndstate, get_labels): + n_classes = 2 + n_trials, n_channels, n_times = 8, 3, 512 + labels = get_labels(n_trials, n_classes) + if spfilt is Xdawn: + X = rndstate.randn(n_trials, n_channels, n_times) + elif spfilt in (CSP, SPoC, BilinearFilter): + X = get_covmats(n_trials, n_channels) + elif spfilt is AJDC: + n_subjects, n_conditions = 2, 2 + X = rndstate.randn(n_subjects, n_conditions, n_channels, n_times) + + self.clf_fit(spfilt, X, labels, n_channels, n_times) + self.clf_fit_independence(spfilt, X, labels, n_channels, n_times) + if spfilt is CSP: + self.clf_fit_error(spfilt, X, labels) + self.clf_transform(spfilt, X, labels, n_trials, n_channels, n_times) + if spfilt in (CSP, SPoC, BilinearFilter): + self.clf_transform_error(spfilt, X, labels, n_channels) + + def test_three_classes(self, spfilt, get_covmats, rndstate, get_labels): + n_classes = 3 + n_trials, n_channels, n_times = 6, 3, 512 + labels = get_labels(n_trials, n_classes) + if spfilt is Xdawn: + X = rndstate.randn(n_trials, n_channels, n_times) + elif spfilt in (CSP, SPoC, BilinearFilter): + X = get_covmats(n_trials, n_channels) + elif spfilt is AJDC: + n_subjects, n_conditions = 2, 2 + X = rndstate.randn(n_subjects, n_conditions, n_channels, n_times) + + self.clf_fit(spfilt, X, labels, n_channels, n_times) + self.clf_fit_independence(spfilt, X, labels, n_channels, n_times) + if spfilt is CSP: + self.clf_fit_error(spfilt, X, labels) + self.clf_transform(spfilt, X, labels, n_trials, n_channels, n_times) + if spfilt in (CSP, SPoC, BilinearFilter): + self.clf_transform_error(spfilt, X, labels, n_channels) + + +class TestSpatialFilters(SpatialFiltersTestCase): + def clf_fit(self, spfilt, X, labels, n_channels, n_times): + n_classes = len(np.unique(labels)) + if spfilt is BilinearFilter: + filters = np.eye(n_channels) + sf = spfilt(filters) + else: + sf = spfilt() + sf.fit(X, labels) + if spfilt is Xdawn: + assert len(sf.classes_) == n_classes + assert sf.filters_.shape == (n_classes * n_channels, n_channels) + for sfilt in sf.filters_: + assert sfilt.shape == (n_channels,) + elif spfilt in [CSP, SPoC]: + assert sf.filters_.shape == (n_channels, n_channels) + elif spfilt is AJDC: + assert sf.forward_filters_.shape == (sf.n_sources_, n_channels) + + def clf_fit_error(self, spfilt, X, labels): + sf = spfilt() + with pytest.raises(ValueError): + sf.fit(X, labels * 0.0) # 1 class + with pytest.raises(ValueError): + sf.fit(X, labels[:1]) # unequal # of samples + with pytest.raises(TypeError): + sf.fit(X, "foo") # y must be an array + with pytest.raises(TypeError): + sf.fit("foo", labels) # X must be an array + with pytest.raises(ValueError): + sf.fit(X[:, 0], labels) + with pytest.raises(ValueError): + sf.fit(X, X) + + def clf_transform(self, spfilt, X, labels, n_trials, n_channels, n_times): + n_classes = len(np.unique(labels)) + if spfilt is BilinearFilter: + filters = np.eye(n_channels) + sf = spfilt(filters) + else: + sf = spfilt() + if spfilt is AJDC: + sf.fit(X, labels) + X_new = np.squeeze(X[0]) + n_trials = X_new.shape[0] + Xtr = sf.transform(X_new) + else: + Xtr = sf.fit(X, labels).transform(X) + if spfilt is Xdawn: + n_comp = n_classes * n_channels + assert Xtr.shape == (n_trials, n_comp, n_times) + elif spfilt is BilinearFilter: + assert Xtr.shape == (n_trials, n_channels, n_channels) + elif spfilt is AJDC: + assert Xtr.shape == (n_trials, n_channels, n_times) + else: + assert Xtr.shape == (n_trials, n_channels) + + def clf_transform_error(self, spfilt, X, labels, n_channels): + if spfilt is BilinearFilter: + filters = np.eye(n_channels) + sf = spfilt(filters) + else: + sf = spfilt() + with pytest.raises(ValueError): + sf.fit(X, labels).transform(X[:, :-1, :-1]) + + def clf_fit_independence(self, spfilt, X, labels, n_channels, n_times): + if spfilt is BilinearFilter: + filters = np.eye(n_channels) + sf = spfilt(filters) + else: + sf = spfilt() + sf.fit(X, labels) + if spfilt is Xdawn: + X_new = X[:, :-1, :] + elif spfilt in (CSP, SPoC, BilinearFilter): + X_new = X[:, :-1, :-1] + elif spfilt is AJDC: + X_new = X[:, :, :-1, :] + # retraining with different size should erase previous fit + sf.fit(X_new, labels) + + +def test_Xdawn_baselinecov(rndstate, get_labels): """Test cov precomputation""" - x = np.random.randn(100, 3, 10) - labels = np.array([0, 1]).repeat(50) - baseline_cov = np.identity(3) + n_trials, n_channels, n_times = 6, 5, 100 + n_classes, default_nfilter = 2, 4 + x = rndstate.randn(n_trials, n_channels, n_times) + labels = get_labels(n_trials, n_classes) + baseline_cov = np.identity(n_channels) xd = Xdawn(baseline_cov=baseline_cov) - xd.fit(x, labels) - xd.transform(x) - - -def test_CSP(): - """Test CSP""" - n_trials = 90 - X = generate_cov(n_trials, 3) - labels = np.array([0, 1, 2]).repeat(n_trials // 3) - - # Test Init - csp = CSP() - assert csp.nfilter == 4 - assert csp.metric == 'euclid' - assert csp.log - csp = CSP(3, 'riemann', False) - assert csp.nfilter == 3 - assert csp.metric == 'riemann' - assert not csp.log - - with pytest.raises(TypeError): - CSP('foo') - - with pytest.raises(ValueError): - CSP(metric='foo') - - with pytest.raises(TypeError): - CSP(log='foo') - - # Test fit - csp = CSP() - csp.fit(X, labels % 2) # two classes - csp.fit(X, labels) # 3 classes - - with pytest.raises(ValueError): - csp.fit(X, labels * 0.) # 1 class - with pytest.raises(ValueError): - csp.fit(X, labels[:1]) # unequal # of samples - with pytest.raises(TypeError): - csp.fit(X, 'foo') # y must be an array - with pytest.raises(TypeError): - csp.fit('foo', labels) # X must be an array - with pytest.raises(ValueError): - csp.fit(X[:, 0], labels) - with pytest.raises(ValueError): - csp.fit(X, X) - - assert_array_equal(csp.filters_.shape, [X.shape[1], X.shape[1]]) - assert_array_equal(csp.patterns_.shape, [X.shape[1], X.shape[1]]) - - # Test transform - Xt = csp.transform(X) - assert_array_equal(Xt.shape, [len(X), X.shape[1]]) - - with pytest.raises(TypeError): - csp.transform('foo') - with pytest.raises(ValueError): - csp.transform(X[:, 1:, :]) # unequal # of chans - - csp.log = False - Xt = csp.transform(X) - - -def test_Spoc(): - """Test Spoc""" - n_trials = 90 - X = generate_cov(n_trials, 3) - labels = np.random.randn(n_trials) - - # Test Init - spoc = SPoC() - - # Test fit - spoc.fit(X, labels) - - -def test_BilinearFilter(): - """Test Bilinear filter""" - n_trials = 90 - X = generate_cov(n_trials, 3) - labels = np.array([0, 1, 2]).repeat(n_trials // 3) - filters = np.eye(3) - # Test Init - bf = BilinearFilter(filters) - assert not bf.log + xd.fit(x, labels).transform(x) + assert len(xd.filters_) == n_classes * default_nfilter + for sfilt in xd.filters_: + assert sfilt.shape == (n_channels,) + + +@pytest.mark.parametrize("nfilter", [3, 4]) +@pytest.mark.parametrize("metric", get_metrics()) +@pytest.mark.parametrize("log", [True, False]) +def test_CSP_init(nfilter, metric, log, get_covmats, get_labels): + n_classes, n_trials, n_channels = 2, 6, 3 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + csp = CSP(nfilter=nfilter, metric=metric, log=log) + csp.fit(covmats, labels) + Xtr = csp.transform(covmats) + if log: + assert Xtr.shape == (n_trials, n_channels) + else: + assert Xtr.shape == (n_trials, n_channels, n_channels) + assert csp.filters_.shape == (n_channels, n_channels) + assert csp.patterns_.shape == (n_channels, n_channels) + + +def test_BilinearFilter_filter_error(): with pytest.raises(TypeError): - BilinearFilter('foo') - - with pytest.raises(TypeError): - BilinearFilter(np.eye(3), log='foo') - - # test fit - bf = BilinearFilter(filters) - bf.fit(X, labels % 2) + BilinearFilter("foo") - # Test transform - Xt = bf.transform(X) - assert_array_equal(Xt.shape, [len(X), filters.shape[0], filters.shape[0]]) +def test_BilinearFilter_log_error(): with pytest.raises(TypeError): - bf.transform('foo') - with pytest.raises(ValueError): - bf.transform(X[:, 1:, :]) # unequal # of chans + BilinearFilter(np.eye(3), log="foo") - bf.log = True - Xt = bf.transform(X) - assert_array_equal(Xt.shape, [len(X), filters.shape[0]]) - filters = filters[0:2, :] - bf = BilinearFilter(filters) - Xt = bf.transform(X) - assert_array_equal(Xt.shape, [len(X), filters.shape[0], filters.shape[0]]) +@pytest.mark.parametrize("log", [True, False]) +def test_BilinearFilter_log(log, get_covmats, get_labels): + n_classes, n_trials, n_channels = 2, 6, 3 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + bf = BilinearFilter(np.eye(n_channels), log=log) + Xtr = bf.fit(covmats, labels).transform(covmats) + if log: + assert Xtr.shape == (n_trials, n_channels) + else: + assert Xtr.shape == (n_trials, n_channels, n_channels) -def test_AJDC(): - """Test AJDC""" - rs = np.random.RandomState(33) - n_subjects, n_conditions, n_channels, n_times = 5, 3, 8, 512 - X = rs.randn(n_subjects, n_conditions, n_channels, n_times) - - # Test Init +def test_AJDC_init(): ajdc = AJDC(fmin=1, fmax=32, fs=64) assert ajdc.window == 128 assert ajdc.overlap == 0.5 assert ajdc.dim_red is None assert ajdc.verbose - # Test fit - ajdc.fit(X) - assert ajdc.n_channels_ == n_channels - assert ajdc.n_sources_ <= n_channels - assert_array_equal(ajdc.forward_filters_.shape, - [ajdc.n_sources_, n_channels]) - assert_array_equal(ajdc.backward_filters_.shape, - [n_channels, ajdc.n_sources_]) + +def test_AJDC_fit(rndstate): + n_subjects, n_conditions, n_channels, n_times = 5, 3, 8, 512 + X = rndstate.randn(n_subjects, n_conditions, n_channels, n_times) + ajdc = AJDC().fit(X) + assert ajdc.forward_filters_.shape == (ajdc.n_sources_, n_channels) + assert ajdc.backward_filters_.shape == (n_channels, ajdc.n_sources_) + + +def test_AJDC_fit_error(rndstate): + n_conditions, n_channels, n_times = 3, 8, 512 + ajdc = AJDC() with pytest.raises(ValueError): # unequal # of conditions - ajdc.fit([rs.randn(n_conditions, n_channels, n_times), - rs.randn(n_conditions + 1, n_channels, n_times)]) + ajdc.fit( + [ + rndstate.randn(n_conditions, n_channels, n_times), + rndstate.randn(n_conditions + 1, n_channels, n_times), + ] + ) with pytest.raises(ValueError): # unequal # of channels - ajdc.fit([rs.randn(n_conditions, n_channels, n_times), - rs.randn(n_conditions, n_channels + 1, n_times)]) + ajdc.fit( + [ + rndstate.randn(n_conditions, n_channels, n_times), + rndstate.randn(n_conditions, n_channels + 1, n_times), + ] + ) + + +def test_AJDC_transform_error(rndstate): + n_subjects, n_conditions, n_channels, n_times = 2, 2, 3, 256 + X = rndstate.randn(n_subjects, n_conditions, n_channels, n_times) + ajdc = AJDC().fit(X) + n_trials = 4 + X_new = rndstate.randn(n_trials, n_channels, n_times) + with pytest.raises(ValueError): # not 3 dims + ajdc.transform(X_new[0]) + with pytest.raises(ValueError): # unequal # of chans + ajdc.transform(rndstate.randn(n_trials, n_channels + 1, 1)) + + +def test_AJDC_fit_variable_input(rndstate): + n_subjects, n_cond, n_chan, n_times = 2, 2, 3, 256 + X = rndstate.randn(n_subjects, n_cond, n_chan, n_times) + ajdc = AJDC() # 3 subjects, same # conditions and channels, different # of times - X = [rs.randn(n_conditions, n_channels, n_times), - rs.randn(n_conditions, n_channels, n_times + 200), - rs.randn(n_conditions, n_channels, n_times + 500)] + X = [ + rndstate.randn(n_cond, n_chan, n_times + rndstate.randint(500)) + for _ in range(3) + ] ajdc.fit(X) + # 2 subjects, 2 conditions, same # channels, different # of times - X = [[rs.randn(n_channels, n_times), - rs.randn(n_channels, n_times + 200)], - [rs.randn(n_channels, n_times + 500), - rs.randn(n_channels, n_times + 100)]] + X = [ + [rndstate.randn(n_chan, n_times + rndstate.randint(500)) + for _ in range(2)], + [rndstate.randn(n_chan, n_times + rndstate.randint(500)) + for _ in range(2)], + ] ajdc.fit(X) - # Test transform - n_trials = 4 - X = rs.randn(n_trials, n_channels, n_times) - Xt = ajdc.transform(X) - assert_array_equal(Xt.shape, [n_trials, ajdc.n_sources_, n_times]) - with pytest.raises(ValueError): # not 3 dims - ajdc.transform(X[0]) - with pytest.raises(ValueError): # unequal # of chans - ajdc.transform(rs.randn(n_trials, n_channels + 1, 1)) - # Test inverse_transform - Xtb = ajdc.inverse_transform(Xt) - assert_array_equal(Xtb.shape, [n_trials, n_channels, n_times]) +def test_AJDC_inverse_transform(rndstate): + n_subjects, n_conditions, n_channels, n_times = 2, 2, 3, 256 + X = rndstate.randn(n_subjects, n_conditions, n_channels, n_times) + ajdc = AJDC().fit(X) + n_trials = 4 + X_new = rndstate.randn(n_trials, n_channels, n_times) + Xt = ajdc.transform(X_new) + Xit = ajdc.inverse_transform(Xt) + assert_array_equal(Xit.shape, [n_trials, n_channels, n_times]) with pytest.raises(ValueError): # not 3 dims ajdc.inverse_transform(Xt[0]) with pytest.raises(ValueError): # unequal # of sources - ajdc.inverse_transform(rs.randn(n_trials, ajdc.n_sources_ + 1, 1)) + shape = (n_trials, ajdc.n_sources_ + 1, 1) + ajdc.inverse_transform(rndstate.randn(*shape)) - Xtb = ajdc.inverse_transform(Xt, supp=[ajdc.n_sources_ - 1]) - assert_array_equal(Xtb.shape, [n_trials, n_channels, n_times]) + Xit = ajdc.inverse_transform(Xt, supp=[ajdc.n_sources_ - 1]) + assert Xit.shape == (n_trials, n_channels, n_times) with pytest.raises(ValueError): # not a list ajdc.inverse_transform(Xt, supp=1) + +def test_AJDC_expl_var(rndstate): # Test get_src_expl_var - v = ajdc.get_src_expl_var(X) - assert_array_equal(v.shape, [n_trials, ajdc.n_sources_]) + n_subjects, n_conditions, n_channels, n_times = 2, 2, 3, 256 + X = rndstate.randn(n_subjects, n_conditions, n_channels, n_times) + ajdc = AJDC().fit(X) + n_trials = 4 + X_new = rndstate.randn(n_trials, n_channels, n_times) + v = ajdc.get_src_expl_var(X_new) + assert v.shape == (n_trials, ajdc.n_sources_) with pytest.raises(ValueError): # not 3 dims - ajdc.get_src_expl_var(X[0]) + ajdc.get_src_expl_var(X_new[0]) with pytest.raises(ValueError): # unequal # of chans - ajdc.get_src_expl_var(rs.randn(n_trials, n_channels + 1, 1)) + ajdc.get_src_expl_var(rndstate.randn(n_trials, n_channels + 1, 1)) diff --git a/tests/test_stats.py b/tests/test_stats.py index 8d945e90..7a09bfd2 100644 --- a/tests/test_stats.py +++ b/tests/test_stats.py @@ -1,27 +1,31 @@ -from conftest import covmats, requires_matplotlib # noqa: F401 +from conftest import requires_matplotlib import numpy as np from pyriemann.stats import PermutationDistance, PermutationModel from pyriemann.spatialfilters import CSP import pytest -def test_permutation_badmode(covmats): # noqa: F811 +def test_permutation_badmode(): """Test one way permutation test""" with pytest.raises(ValueError): PermutationDistance(mode="badmode") @pytest.mark.parametrize("mode", ["ttest", "ftest"]) -def test_permutation_mode(mode, covmats): # noqa: F811 +def test_permutation_mode(mode, get_covmats, get_labels): """Test one way permutation test""" - labels = np.array([0, 1]).repeat(3) + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) p = PermutationDistance(100, mode=mode) p.test(covmats, labels) -def test_permutation_pairwise(covmats): # noqa: F811 +def test_permutation_pairwise(get_covmats, get_labels): """Test one way permutation pairwise test""" - labels = np.array([0, 1]).repeat(3) + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) groups = np.array([0] * 3 + [1] * 3) # pairwise p = PermutationDistance(100, mode="pairwise") @@ -30,34 +34,42 @@ def test_permutation_pairwise(covmats): # noqa: F811 p.test(covmats, labels, groups=groups) -def test_permutation_pairwise_estimator(covmats): # noqa: F811 +def test_permutation_pairwise_estimator(get_covmats, get_labels): """Test one way permutation with estimator""" - labels = np.array([0, 1]).repeat(3) + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) # with custom estimator p = PermutationDistance(10, mode="pairwise", estimator=CSP(2, log=False)) p.test(covmats, labels) -def test_permutation_pairwise_unique(covmats): # noqa: F811 +def test_permutation_pairwise_unique(get_covmats, get_labels): """Test one way permutation with estimator""" - labels = np.array([0, 1]).repeat(3) + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) # unique perms p = PermutationDistance(1000) p.test(covmats, labels) @requires_matplotlib -def test_permutation_pairwise_plot(covmats): # noqa: F811 +def test_permutation_pairwise_plot(get_covmats, get_labels): """Test one way permutation with estimator""" - labels = np.array([0, 1]).repeat(3) + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) p = PermutationDistance(100, mode="pairwise") p.test(covmats, labels) p.plot(nbins=2) -def test_permutation_model(covmats): # noqa: F811 +def test_permutation_model(get_covmats, get_labels): """Test one way permutation test""" - labels = np.array([0, 1]).repeat(3) + n_trials, n_channels, n_classes = 6, 3, 2 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) # pairwise p = PermutationModel(10) p.test(covmats, labels) diff --git a/tests/test_tangentspace.py b/tests/test_tangentspace.py index 21c3a84e..17f5e5cb 100644 --- a/tests/test_tangentspace.py +++ b/tests/test_tangentspace.py @@ -1,108 +1,110 @@ -"""Test tangent space functions.""" +from conftest import get_metrics import numpy as np -from numpy.testing import assert_array_almost_equal -import pytest - from pyriemann.tangentspace import TangentSpace, FGDA - - -def generate_cov(Nt, Ne): - """Generate a set of cavariances matrices for test purpose.""" - rs = np.random.RandomState(1234) - diags = 2.0 + 0.1 * rs.randn(Nt, Ne) - A = 2*rs.rand(Ne, Ne) - 1 - A /= np.atleast_2d(np.sqrt(np.sum(A**2, 1))).T - covmats = np.empty((Nt, Ne, Ne)) - for i in range(Nt): - covmats[i] = np.dot(np.dot(A, np.diag(diags[i])), A.T) - return covmats - - -def test_TangentSpace_init(): - """Test init of Tangent.""" - TangentSpace(metric='riemann') - - -def test_TangentSpace_fit(): - """Test Fit of Tangent Space.""" - covset = generate_cov(10, 3) - ts = TangentSpace(metric='riemann') - ts.fit(covset) - - -def test_TangentSpace_transform(): - """Test transform of Tangent Space.""" - covset = generate_cov(10, 3) - ts = TangentSpace(metric='riemann') - ts.fit(covset) - ts.transform(covset) - - -@pytest.mark.parametrize('shape', [(10, 9), (10, 9, 8), (10), (12, 8, 8)]) -def test_TangentSpace_transform_dim(shape): - """Test transform input shape, could be TS vector or covmat""" - n_trials, n_channels = 10, 3 - covset = generate_cov(n_trials, n_channels) - ts = TangentSpace(metric='riemann') - ts.fit(covset) - - X = np.zeros(shape=shape) +import pytest +from pytest import approx + + +@pytest.mark.parametrize("tspace", [TangentSpace, FGDA]) +class TangentSpaceTestCase: + def test_tangentspace(self, tspace, get_covmats, get_labels): + n_classes, n_trials, n_channels = 2, 6, 3 + covmats = get_covmats(n_trials, n_channels) + labels = get_labels(n_trials, n_classes) + self.clf_transform(tspace, covmats, labels) + self.clf_fit_transform(tspace, covmats, labels) + self.clf_fit_transform_independence(tspace, covmats, labels) + if tspace is TangentSpace: + self.clf_transform_wo_fit(tspace, covmats) + self.clf_inversetransform(tspace, covmats) + + +class TestTangentSpace(TangentSpaceTestCase): + def clf_transform(self, tspace, covmats, labels): + n_trials, n_channels, n_channels = covmats.shape + ts = tspace().fit(covmats, labels) + Xtr = ts.transform(covmats) + if tspace is TangentSpace: + n_ts = (n_channels * (n_channels + 1)) // 2 + assert Xtr.shape == (n_trials, n_ts) + else: + assert Xtr.shape == (n_trials, n_channels, n_channels) + + def clf_fit_transform(self, tspace, covmats, labels): + n_trials, n_channels, n_channels = covmats.shape + ts = tspace() + Xtr = ts.fit_transform(covmats, labels) + if tspace is TangentSpace: + n_ts = (n_channels * (n_channels + 1)) // 2 + assert Xtr.shape == (n_trials, n_ts) + else: + assert Xtr.shape == (n_trials, n_channels, n_channels) + + def clf_fit_transform_independence(self, tspace, covmats, labels): + n_trials, n_channels, n_channels = covmats.shape + ts = tspace() + ts.fit(covmats, labels) + # retraining with different size should erase previous fit + new_covmats = covmats[:, :-1, :-1] + ts.fit(new_covmats, labels) + # fit_transform should work as well + ts.fit_transform(covmats, labels) + + def clf_transform_wo_fit(self, tspace, covmats): + n_trials, n_channels, n_channels = covmats.shape + n_ts = (n_channels * (n_channels + 1)) // 2 + ts = tspace() + Xtr = ts.transform(covmats) + assert Xtr.shape == (n_trials, n_ts) + + def clf_inversetransform(self, tspace, covmats): + n_trials, n_channels, n_channels = covmats.shape + ts = tspace().fit(covmats) + Xtr = ts.transform(covmats) + covinv = ts.inverse_transform(Xtr) + assert covinv == approx(covmats) + + +@pytest.mark.parametrize("fit", [True, False]) +@pytest.mark.parametrize("tsupdate", [True, False]) +@pytest.mark.parametrize("metric", get_metrics()) +def test_TangentSpace_init(fit, tsupdate, metric, get_covmats): + n_trials, n_channels = 4, 3 + n_ts = (n_channels * (n_channels + 1)) // 2 + covmats = get_covmats(n_trials, n_channels) + ts = TangentSpace(metric=metric, tsupdate=tsupdate) + if fit: + ts.fit(covmats) + Xtr = ts.transform(covmats) + assert Xtr.shape == (n_trials, n_ts) + + +@pytest.mark.parametrize("tsupdate", [True, False]) +@pytest.mark.parametrize("metric", get_metrics()) +def test_FGDA_init(tsupdate, metric, get_covmats, get_labels): + n_classes, n_trials, n_channels = 2, 6, 3 + labels = get_labels(n_trials, n_classes) + covmats = get_covmats(n_trials, n_channels) + ts = FGDA(metric=metric, tsupdate=tsupdate) + ts.fit(covmats, labels) + Xtr = ts.transform(covmats) + assert Xtr.shape == (n_trials, n_channels, n_channels) + + +def test_TS_vecdim_error(get_covmats, rndstate): + n_trials, n_ts = 4, 6 + ts = TangentSpace() with pytest.raises(ValueError): - ts.transform(X) + tsvectors_wrong = np.empty((n_trials, n_ts + 1)) + ts.transform(tsvectors_wrong) -def test_TangentSpace_transform_without_fit(): - """Test transform of Tangent Space without fit.""" - covset = generate_cov(10, 3) - ts = TangentSpace(metric='riemann') - ts.transform(covset) - - -def test_TangentSpace_transform_with_ts_update(): - """Test transform of Tangent Space with TSupdate.""" - covset = generate_cov(10, 3) - ts = TangentSpace(metric='riemann', tsupdate=True) - ts.fit(covset) - ts.transform(covset) - - -def test_TangentSpace_inversetransform(): - """Test inverse transform of Tangent Space.""" - covset = generate_cov(10, 3) - ts = TangentSpace(metric='riemann') - ts.fit(covset) - t = ts.transform(covset) - cov = ts.inverse_transform(t) - assert_array_almost_equal(covset, cov) - - -def test_TangentSpace_inversetransform_without_fit(): - """Test inverse transform of Tangent Space without fit.""" - covset = generate_cov(10, 3) - ts = TangentSpace(metric='identity') - tsv = ts.fit_transform(covset) - ts = TangentSpace(metric='riemann') - cov = ts.inverse_transform(tsv) - assert_array_almost_equal(covset, cov) - - -def test_FGDA_init(): - """Test init of FGDA.""" - FGDA(metric='riemann') - - -def test_FGDA_fit(): - """Test Fit of FGDA.""" - covset = generate_cov(10, 3) - labels = np.array([0, 1]).repeat(5) - ts = FGDA(metric='riemann') - ts.fit(covset, labels) - - -def test_FGDA_transform(): - """Test transform of FGDA.""" - covset = generate_cov(10, 3) - labels = np.array([0, 1]).repeat(5) - ts = FGDA(metric='riemann') - ts.fit(covset, labels) - ts.transform(covset) +def test_TS_matdim_error(get_covmats): + n_trials, n_channels = 4, 3 + ts = TangentSpace() + with pytest.raises(ValueError): + not_square_mat = np.empty((n_trials, n_channels, n_channels + 1)) + ts.transform(not_square_mat) + with pytest.raises(ValueError): + too_many_dim = np.empty((1, 2, 3, 4)) + ts.transform(too_many_dim) diff --git a/tests/test_utils_covariance.py b/tests/test_utils_covariance.py index bd61e219..30fcc938 100644 --- a/tests/test_utils_covariance.py +++ b/tests/test_utils_covariance.py @@ -1,4 +1,4 @@ -from numpy.testing import assert_array_almost_equal, assert_array_equal +from numpy.testing import assert_array_almost_equal import numpy as np from scipy.signal import welch, csd, coherence as coherence_sp import pytest @@ -12,11 +12,10 @@ 'estimator', ['oas', 'lwf', 'scm', 'corr', 'mcd', 'sch', np.cov, 'truc', None] ) -def test_covariances(estimator): +def test_covariances(estimator, rndstate): """Test covariance for multiple estimator""" - rs = np.random.RandomState(42) n_trials, n_channels, n_times = 2, 3, 100 - x = rs.randn(n_trials, n_channels, n_times) + x = rndstate.randn(n_trials, n_channels, n_times) if estimator is None: cov = covariances(x) assert cov.shape == (n_trials, n_channels, n_channels) @@ -31,43 +30,31 @@ def test_covariances(estimator): @pytest.mark.parametrize( 'estimator', ['oas', 'lwf', 'scm', 'corr', 'mcd', 'sch', None] ) -def test_covariances_EP(estimator): +def test_covariances_EP(estimator, rndstate): """Test covariance_EP for multiple estimator""" - rs = np.random.RandomState(42) - n_trials, n_channels, n_times = 2, 3, 100 - x = rs.randn(n_trials, n_channels, n_times) - p = rs.randn(n_channels, n_times) + n_trials, n_channels_x, n_channels_p, n_times = 2, 3, 3, 100 + x = rndstate.randn(n_trials, n_channels_x, n_times) + p = rndstate.randn(n_channels_p, n_times) if estimator is None: cov = covariances_EP(x, p) else: cov = covariances_EP(x, p, estimator=estimator) - assert cov.shape == (n_trials, 2 * n_channels, 2 * n_channels) - - -def test_covariances_EP_ntimes(): - """Test that prototype and signal have same time dimension""" - rs = np.random.RandomState(42) - n_trials, n_channels, n_times, n_times_P = 2, 3, 100, 101 - x = rs.randn(n_trials, n_channels, n_times) - p = rs.randn(n_channels, n_times_P) - with pytest.raises(ValueError): - covariances_EP(x, p) + n_dim_cov = n_channels_x + n_channels_p + assert cov.shape == (n_trials, n_dim_cov, n_dim_cov) -def test_covariances_eegtocov(): +def test_covariances_eegtocov(rndstate): """Test eegtocov""" - rs = np.random.RandomState(42) n_times, n_channels = 1000, 3 - x = rs.randn(n_times, n_channels) + x = rndstate.randn(n_times, n_channels) cov = eegtocov(x) assert cov.shape[1] == n_channels -def test_covariances_cross_spectrum(): - rs = np.random.RandomState(42) +def test_covariances_cross_spectrum(rndstate): n_channels, n_times = 3, 1000 - x = rs.randn(n_channels, n_times) + x = rndstate.randn(n_channels, n_times) cross_spectrum(x) cross_spectrum(x, fs=128, fmin=2, fmax=40) cross_spectrum(x, fs=129, window=37) @@ -103,7 +90,7 @@ def test_covariances_cross_spectrum(): np.zeros_like(c.imag.diagonal())) # test equivalence between pyriemann and scipy for (auto-)spectra - x = rs.randn(5, n_times) + x = rndstate.randn(5, n_times) fs, window, overlap = 128, 256, 0.75 spect_pr, freqs_pr = cross_spectrum( x, @@ -127,7 +114,7 @@ def test_covariances_cross_spectrum(): assert_array_almost_equal(spect_pr, spect_sp, 6) # test equivalence between pyriemann and scipy for cross-spectra - x = rs.randn(2, n_times) + x = rndstate.randn(2, n_times) fs, window, overlap = 64, 128, 0.5 cross_pr, freqs_pr = cross_spectrum( x, @@ -151,10 +138,9 @@ def test_covariances_cross_spectrum(): assert_array_almost_equal(cross_pr, cross_sp, 6) -def test_covariances_cospectrum(): +def test_covariances_cospectrum(rndstate): """Test cospectrum""" - rs = np.random.RandomState(42) - x = rs.randn(3, 1000) + x = rndstate.randn(3, 1000) cospectrum(x) cospectrum(x, fs=128, fmin=2, fmax=40) @@ -181,11 +167,10 @@ def test_covariances_cospectrum(): @pytest.mark.parametrize( 'coh', ['ordinary', 'instantaneous', 'lagged', 'imaginary', 'foobar'] ) -def test_covariances_coherence(coh): +def test_covariances_coherence(coh, rndstate): """Test coherence""" - rs = np.random.RandomState(42) n_channels, n_times = 3, 2048 - x = rs.randn(n_channels, n_times) + x = rndstate.randn(n_channels, n_times) if coh == 'foobar': with pytest.raises(ValueError): # unknown coh @@ -228,18 +213,19 @@ def test_covariances_coherence(coh): t = np.arange(0, n_periods, 1 / fs) n_times = t.shape[0] - x = np.zeros((4, len(t))) + x = np.empty((4, len(t))) noise = 1e-9 # reference channel: a pure sine + small noise (to avoid nan or inf) - x[0] = np.sin(2 * np.pi * ft * t) + noise * rs.randn((n_times)) + x[0] = np.sin(2 * np.pi * ft * t) + noise * rndstate.randn((n_times)) # pi/4 shifted channel = pi/4 lagged phase x[1] = np.sin(2 * np.pi * ft * t + np.pi / 4) \ - + noise * rs.randn((n_times)) + + noise * rndstate.randn((n_times)) # pi/2 shifted channel = quadrature phase x[2] = np.sin(2 * np.pi * ft * t + np.pi / 2) \ - + noise * rs.randn((n_times)) + + noise * rndstate.randn((n_times)) # pi shifted channel = opposite phase - x[3] = np.sin(2 * np.pi * ft * t + np.pi) + noise * rs.randn((n_times)) + x[3] = np.sin(2 * np.pi * ft * t + np.pi) \ + + noise * rndstate.randn((n_times)) c, freqs = coherence(x, fs=fs, window=fs, overlap=0.5, coh=coh) foi = (freqs == ft) @@ -271,26 +257,25 @@ def test_covariances_coherence(coh): assert c[0, 3, foi] == pytest.approx(0.0) -def test_normalize(): +def test_normalize(rndstate): """Test normalize""" - rs = np.random.RandomState(42) n_conds, n_trials, n_channels = 15, 20, 3 # test a 2d array, ie a single square matrix - mat = rs.randn(n_channels, n_channels) + mat = rndstate.randn(n_channels, n_channels) mat_n = normalize(mat, "trace") - assert_array_equal(mat.shape, mat_n.shape) + assert mat.shape == mat_n.shape # test a 3d array, ie a group of square matrices - mat = rs.randn(n_trials, n_channels, n_channels) + mat = rndstate.randn(n_trials, n_channels, n_channels) mat_n = normalize(mat, "determinant") - assert_array_equal(mat.shape, mat_n.shape) + assert mat.shape == mat_n.shape # test a 4d array, ie a group of groups of square matrices - mat = rs.randn(n_conds, n_trials, n_channels, n_channels) + mat = rndstate.randn(n_conds, n_trials, n_channels, n_channels) mat_n = normalize(mat, "trace") - assert_array_equal(mat.shape, mat_n.shape) + assert mat.shape == mat_n.shape # after trace-normalization => trace equal to 1 - mat = rs.randn(n_trials, n_channels, n_channels) + mat = rndstate.randn(n_trials, n_channels, n_channels) mat_tn = normalize(mat, "trace") assert_array_almost_equal(np.ones(mat_tn.shape[0]), np.trace(mat_tn, axis1=-2, axis2=-1)) @@ -300,38 +285,40 @@ def test_normalize(): np.abs(np.linalg.det(mat_dn))) with pytest.raises(ValueError): # not at least 2d - normalize(rs.randn(n_channels), "trace") + normalize(rndstate.randn(n_channels), "trace") with pytest.raises(ValueError): # not square - normalize(rs.randn(n_trials, n_channels, n_channels + 2), "trace") + shape = (n_trials, n_channels, n_channels + 2) + normalize(rndstate.randn(*shape), "trace") with pytest.raises(ValueError): # invalid normalization type - normalize(rs.randn(n_trials, n_channels, n_channels), "abc") + normalize(rndstate.randn(n_trials, n_channels, n_channels), "abc") -def test_get_nondiag_weight(): +def test_get_nondiag_weight(rndstate): """Test get_nondiag_weight""" - rs = np.random.RandomState(17) n_conds, n_trials, n_channels = 10, 20, 3 # test a 2d array, ie a single square matrix - w = get_nondiag_weight(rs.randn(n_channels, n_channels)) + w = get_nondiag_weight(rndstate.randn(n_channels, n_channels)) assert np.isscalar(w) # test a 3d array, ie a group of square matrices - w = get_nondiag_weight(rs.randn(n_trials, n_channels, n_channels)) - assert_array_equal(w.shape, [n_trials]) + w = get_nondiag_weight(rndstate.randn(n_trials, n_channels, n_channels)) + assert w.shape == (n_trials,) # test a 4d array, ie a group of groups of square matrices - w = get_nondiag_weight(rs.randn(n_conds, n_trials, n_channels, n_channels)) - assert_array_equal(w.shape, [n_conds, n_trials]) + shape = (n_conds, n_trials, n_channels, n_channels) + w = get_nondiag_weight(rndstate.randn(*shape)) + assert w.shape == (n_conds, n_trials) # 2x2 constant matrices => non-diag weights equal to 1 - mats = rs.randn(n_trials, 1, 1) * np.ones((n_trials, 2, 2)) + mats = rndstate.randn(n_trials, 1, 1) * np.ones((n_trials, 2, 2)) w = get_nondiag_weight(mats) assert_array_almost_equal(w, np.ones(n_trials)) # diagonal matrices => non-diag weights equal to 0 - mats = rs.randn(n_trials, 1, 1) * ([np.eye(n_channels)] * n_trials) + mats = rndstate.randn(n_trials, 1, 1) * ([np.eye(n_channels)] * n_trials) w = get_nondiag_weight(mats) assert_array_almost_equal(w, np.zeros(n_trials)) with pytest.raises(ValueError): # not at least 2d - get_nondiag_weight(rs.randn(n_channels)) + get_nondiag_weight(rndstate.randn(n_channels)) with pytest.raises(ValueError): # not square - get_nondiag_weight(rs.randn(n_trials, n_channels, n_channels + 2)) + shape = (n_trials, n_channels, n_channels + 2) + get_nondiag_weight(rndstate.randn(*shape)) diff --git a/tests/test_utils_distance.py b/tests/test_utils_distance.py index 02baa52d..49de1e2c 100644 --- a/tests/test_utils_distance.py +++ b/tests/test_utils_distance.py @@ -1,122 +1,84 @@ -from numpy.testing import assert_array_almost_equal -import pytest +from conftest import get_distances import numpy as np - -from pyriemann.utils.distance import (distance_riemann, - distance_euclid, - distance_logeuclid, - distance_logdet, - distance_kullback, - distance_kullback_right, - distance_kullback_sym, - distance_wasserstein, - distance, pairwise_distance, - _check_distance_method) - - -def test_check_distance_methd(): - """Test _check_distance_method""" - _check_distance_method('riemann') - _check_distance_method(distance_riemann) - with pytest.raises(ValueError): - _check_distance_method('666') - with pytest.raises(ValueError): - _check_distance_method(42) - - -def test_distance_riemann(): - """Test riemannian distance""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert_array_almost_equal(distance_riemann(A, B), 0) - - -def test_distance_kullback(): - """Test kullback divergence""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert_array_almost_equal(distance_kullback(A, B), 0) - assert_array_almost_equal(distance_kullback_right(A, B), 0) - assert_array_almost_equal(distance_kullback_sym(A, B), 0) - - -def test_distance_euclid(): - """Test euclidean distance""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance_euclid(A, B) == 0 - - -def test_distance_logeuclid(): - """Test logeuclid distance""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance_logeuclid(A, B) == 0 - - -def test_distance_wasserstein(): - """Test wasserstein distance""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance_wasserstein(A, B) == 0 +from pyriemann.utils.distance import ( + distance_riemann, + distance_euclid, + distance_logeuclid, + distance_logdet, + distance_kullback, + distance_kullback_right, + distance_kullback_sym, + distance_wasserstein, + distance, + pairwise_distance, + _check_distance_method, +) +from pyriemann.utils.geodesic import geodesic +import pytest +from pytest import approx -def test_distance_logdet(): - """Test logdet distance""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance_logdet(A, B) == 0 +def get_dist_func(): + dist_func = [ + distance_riemann, + distance_logeuclid, + distance_euclid, + distance_logdet, + distance_kullback, + distance_kullback_right, + distance_kullback_sym, + distance_wasserstein, + ] + for df in dist_func: + yield df -def test_distance_generic_riemann(): - """Test riemannian distance for generic function""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance(A, B, metric='riemann') == distance_riemann(A, B) +@pytest.mark.parametrize("dist", get_distances()) +def test_check_distance_str(dist): + _check_distance_method(dist) -def test_distance_generic_euclid(): - """Test euclidean distance for generic function""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance(A, B, metric='euclid') == distance_euclid(A, B) +@pytest.mark.parametrize("dist", get_dist_func()) +def test_check_distance_func(dist): + _check_distance_method(dist) -def test_distance_generic_logdet(): - """Test logdet distance for generic function""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance(A, B, metric='logdet') == distance_logdet(A, B) +def test_check_distance_error(): + with pytest.raises(ValueError): + _check_distance_method("universe") + with pytest.raises(ValueError): + _check_distance_method(42) -def test_distance_generic_logeuclid(): - """Test logeuclid distance for generic function""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance(A, B, metric='logeuclid') == distance_logeuclid(A, B) +@pytest.mark.parametrize("dist", get_dist_func()) +def test_distance_func_eye(dist): + n_channels = 3 + A = 2 * np.eye(n_channels) + B = 2 * np.eye(n_channels) + assert dist(A, B) == approx(0) -def test_distance_generic_kullback(): - """Test logeuclid distance for generic function""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance(A, B, metric='kullback') == distance_kullback(A, B) - assert distance(A, B, metric='kullback_right') == \ - distance_kullback_right(A, B) - assert distance(A, B, metric='kullback_sym') == \ - distance_kullback_sym(A, B) +@pytest.mark.parametrize("dist", get_dist_func()) +def test_distance_func_rand(dist, get_covmats): + n_trials, n_channels = 2, 6 + covmats = get_covmats(n_trials, n_channels) + A, C = covmats[0], covmats[1] + B = geodesic(A, C, alpha=0.5) + assert dist(A, B) < dist(A, C) -def test_distance_generic_custom(): - """Test custom distance for generic function""" - A = 2*np.eye(3) - B = 2*np.eye(3) - assert distance( - A, B, metric=distance_logeuclid) == distance_logeuclid(A, B) +@pytest.mark.parametrize("dist, dfunc", zip(get_distances(), get_dist_func())) +def test_distance_wrapper(dist, dfunc, get_covmats): + n_trials, n_channels = 2, 5 + covmats = get_covmats(n_trials, n_channels) + A, B = covmats[0], covmats[1] + assert distance(A, B, metric=dist) == dfunc(A, B) -def test_pairwise_distance_matrix(): - """Test pairwise distance""" - A = np.array([2*np.eye(3), 3*np.eye(3)]) - B = np.array([2*np.eye(3), 3*np.eye(3)]) - pairwise_distance(A, B) +def test_pairwise_distance_matrix(get_covmats): + n_trials, n_channels = 6, 5 + covmats = get_covmats(n_trials, n_channels) + n_subset = 4 + A, B = covmats[:n_subset], covmats[n_subset:] + pdist = pairwise_distance(A, B) + assert pdist.shape == (n_subset, 2) diff --git a/tests/test_utils_geodesic.py b/tests/test_utils_geodesic.py index d0ec7748..73459173 100644 --- a/tests/test_utils_geodesic.py +++ b/tests/test_utils_geodesic.py @@ -1,104 +1,94 @@ -from numpy.testing import assert_array_almost_equal import numpy as np - from pyriemann.utils.geodesic import ( - geodesic_riemann, geodesic_euclid, geodesic_logeuclid, geodesic) - -# Riemannian metric - - -def test_geodesic_riemann_0(): - """Test riemannian geodesic when alpha = 0""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - assert_array_almost_equal(geodesic_riemann(A, B, 0), A) - - -def test_geodesic_riemann_1(): - """TTest riemannian geodesic when alpha = 1""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - assert_array_almost_equal(geodesic_riemann(A, B, 1), B) - - -def test_geodesic_riemann_middle(): - """Test riemannian geodesic when alpha = 0.5""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - Ctrue = np.eye(3) - assert_array_almost_equal(geodesic_riemann(A, B, 0.5), Ctrue) - -# euclidean metric - - -def test_geodesic_euclid_0(): - """Test euclidean geodesic when alpha = 0""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - assert_array_almost_equal(geodesic_euclid(A, B, 0), A) - - -def test_geodesic_euclid_1(): - """TTest euclidean geodesic when alpha = 1""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - assert_array_almost_equal(geodesic_euclid(A, B, 1), B) - - -def test_geodesic_euclid_middle(): - """Test euclidean geodesic when alpha = 0.5""" - A = 1*np.eye(3) - B = 2*np.eye(3) - Ctrue = 1.5*np.eye(3) - assert_array_almost_equal(geodesic_euclid(A, B, 0.5), Ctrue) - -# log-euclidean metric - - -def test_geodesic_logeuclid_0(): - """Test log euclidean geodesic when alpha = 0""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - assert_array_almost_equal(geodesic_logeuclid(A, B, 0), A) - - -def test_geodesic_logeuclid_1(): - """TTest log euclidean geodesic when alpha = 1""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - assert_array_almost_equal(geodesic_logeuclid(A, B, 1), B) - - -def test_geodesic_logeuclid_middle(): - """Test log euclidean geodesic when alpha = 0.5""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - Ctrue = 1*np.eye(3) - assert_array_almost_equal(geodesic_logeuclid(A, B, 0.5), Ctrue) - -######################## -# global geodesic - - -def test_geodesic_riemann(): - """Test riemannian geodesic when alpha = 0.5 for global function""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - Ctrue = 1*np.eye(3) - assert_array_almost_equal(geodesic(A, B, 0.5, metric='riemann'), Ctrue) - - -def test_geodesic_euclid(): - """Test euclidean geodesic when alpha = 0.5 for global function""" - A = 1*np.eye(3) - B = 2*np.eye(3) - Ctrue = 1.5*np.eye(3) - assert_array_almost_equal(geodesic(A, B, 0.5, metric='euclid'), Ctrue) - - -def test_geodesic_logeuclid(): - """Test riemannian geodesic when alpha = 0.5 for global function""" - A = 0.5*np.eye(3) - B = 2*np.eye(3) - Ctrue = 1*np.eye(3) - assert_array_almost_equal(geodesic(A, B, 0.5, metric='logeuclid'), Ctrue) + geodesic_riemann, + geodesic_euclid, + geodesic_logeuclid, + geodesic, +) +from pyriemann.utils.mean import mean_riemann, mean_logeuclid, mean_euclid +import pytest +from pytest import approx + + +def get_geod_func(): + geod_func = [geodesic_riemann, geodesic_euclid, geodesic_logeuclid] + for gf in geod_func: + yield gf + + +def get_geod_name(): + geod_name = ["riemann", "euclid", "logeuclid"] + for gn in geod_name: + yield gn + + +@pytest.mark.parametrize( + "geodesic_func", [geodesic_riemann, geodesic_euclid, geodesic_logeuclid] +) +class GeodesicFuncTestCase: + def test_simple_mat(self, geodesic_func, get_covmats): + n_channels = 3 + if geodesic_func is geodesic_euclid: + A = 1.0 * np.eye(n_channels) + B = 2.0 * np.eye(n_channels) + Ctrue = 1.5 * np.eye(n_channels) + else: + A = 0.5 * np.eye(n_channels) + B = 2 * np.eye(n_channels) + Ctrue = np.eye(n_channels) + self.geodesic_0(geodesic_func, A, B) + self.geodesic_1(geodesic_func, A, B) + self.geodesic_middle(geodesic_func, A, B, Ctrue) + + def test_random_mat(self, geodesic_func, get_covmats): + n_trials, n_channels = 2, 5 + covmats = get_covmats(n_trials, n_channels) + A, B = covmats[0], covmats[1] + if geodesic_func is geodesic_euclid: + Ctrue = mean_euclid(covmats) + elif geodesic_func is geodesic_logeuclid: + Ctrue = mean_logeuclid(covmats) + elif geodesic_func is geodesic_riemann: + Ctrue = mean_riemann(covmats) + self.geodesic_0(geodesic_func, A, B) + self.geodesic_1(geodesic_func, A, B) + self.geodesic_middle(geodesic_func, A, B, Ctrue) + + +class TestGeodesicFunc(GeodesicFuncTestCase): + def geodesic_0(self, geodesic_func, A, B): + assert geodesic_func(A, B, 0) == approx(A) + + def geodesic_1(self, geodesic_func, A, B): + assert geodesic_func(A, B, 1) == approx(B) + + def geodesic_middle(self, geodesic_func, A, B, Ctrue): + assert geodesic_func(A, B, 0.5) == approx(Ctrue) + + +@pytest.mark.parametrize("metric", get_geod_name()) +def test_distance_wrapper_simple(metric): + n_channels = 3 + if metric == "euclid": + A = 1.0 * np.eye(n_channels) + B = 2.0 * np.eye(n_channels) + Ctrue = 1.5 * np.eye(n_channels) + else: + A = 0.5 * np.eye(n_channels) + B = 2 * np.eye(n_channels) + Ctrue = np.eye(n_channels) + assert geodesic(A, B, 0.5, metric=metric) == approx(Ctrue) + + +@pytest.mark.parametrize("met, gfunc", zip(get_geod_name(), get_geod_func())) +def test_distance_wrapper_random(met, gfunc, get_covmats): + n_trials, n_channels = 2, 5 + covmats = get_covmats(n_trials, n_channels) + A, B = covmats[0], covmats[1] + if gfunc is geodesic_euclid: + Ctrue = mean_euclid(covmats) + elif gfunc is geodesic_logeuclid: + Ctrue = mean_logeuclid(covmats) + elif gfunc is geodesic_riemann: + Ctrue = mean_riemann(covmats) + assert geodesic(A, B, 0.5, metric=met) == approx(Ctrue) diff --git a/tests/test_utils_mean.py b/tests/test_utils_mean.py index e61d9fc6..a47dbe48 100644 --- a/tests/test_utils_mean.py +++ b/tests/test_utils_mean.py @@ -17,18 +17,6 @@ from pyriemann.utils.geodesic import geodesic_riemann -def generate_cov(n_trials, n_channels): - """Generate a set of cavariances matrices for test purpose""" - rs = np.random.RandomState(1234) - diags = 2.0 + 0.1 * rs.randn(n_trials, n_channels) - A = 2 * rs.rand(n_channels, n_channels) - 1 - A /= np.linalg.norm(A, axis=1)[:, np.newaxis] - covmats = np.empty((n_trials, n_channels, n_channels)) - for i in range(n_trials): - covmats[i] = A @ np.diag(diags[i]) @ A.T - return covmats, diags, A - - @pytest.mark.parametrize( "mean", [ @@ -43,28 +31,28 @@ def generate_cov(n_trials, n_channels): mean_wasserstein, ], ) -def test_mean_shape(mean): +def test_mean_shape(mean, get_covmats): """Test the shape of mean""" n_trials, n_channels = 5, 3 - covmats, _, A = generate_cov(n_trials, n_channels) + covmats = get_covmats(n_trials, n_channels) C = mean(covmats) assert C.shape == (n_channels, n_channels) @pytest.mark.parametrize("mean", [mean_riemann, mean_logdet]) -def test_mean_shape_with_init(mean): +def test_mean_shape_with_init(mean, get_covmats): """Test the shape of mean with init""" n_trials, n_channels = 5, 3 - covmats, _, A = generate_cov(n_trials, n_channels) + covmats = get_covmats(n_trials, n_channels) C = mean(covmats, init=covmats[0]) assert C.shape == (n_channels, n_channels) @pytest.mark.parametrize("init", [True, False]) -def test_riemann_mean(init): +def test_riemann_mean(init, get_covmats_params): """Test the riemannian mean""" n_trials, n_channels = 100, 3 - covmats, diags, A = generate_cov(n_trials, n_channels) + covmats, diags, A = get_covmats_params(n_trials, n_channels) if init: C = mean_riemann(covmats, init=covmats[0]) else: @@ -74,44 +62,44 @@ def test_riemann_mean(init): assert C == approx(Ctrue) -def test_euclid_mean(): +def test_euclid_mean(get_covmats): """Test the euclidean mean""" n_trials, n_channels = 100, 3 - covmats, _, _ = generate_cov(n_trials, n_channels) + covmats = get_covmats(n_trials, n_channels) C = mean_euclid(covmats) assert C == approx(covmats.mean(axis=0)) -def test_identity_mean(): +def test_identity_mean(get_covmats): """Test the identity mean""" n_trials, n_channels = 100, 3 - covmats, _, _ = generate_cov(n_trials, n_channels) + covmats = get_covmats(n_trials, n_channels) C = mean_identity(covmats) assert np.all(C == np.eye(n_channels)) -def test_alm_mean(): +def test_alm_mean(get_covmats): """Test the ALM mean""" n_trials, n_channels = 3, 3 - covmats, _, _ = generate_cov(n_trials, n_channels) + covmats = get_covmats(n_trials, n_channels) C_alm = mean_alm(covmats) C_riem = mean_riemann(covmats) assert C_alm == approx(C_riem) -def test_alm_mean_maxiter(): +def test_alm_mean_maxiter(get_covmats): """Test the ALM mean with max iteration""" n_trials, n_channels = 3, 3 - covmats, _, _ = generate_cov(n_trials, n_channels) - C = mean_alm(covmats, maxiter=1, verbose=True) # maxiter reached + covmats = get_covmats(n_trials, n_channels) + C = mean_alm(covmats, maxiter=1) assert C.shape == (n_channels, n_channels) -def test_alm_mean_2trials(): +def test_alm_mean_2trials(get_covmats): """Test the ALM mean with 2 trials""" n_trials, n_channels = 2, 3 - covmats, _, _ = generate_cov(n_trials, n_channels) - C = mean_alm(covmats) # n_trials=2 + covmats = get_covmats(n_trials, n_channels) + C = mean_alm(covmats) assert np.all(C == geodesic_riemann(covmats[0], covmats[1], alpha=0.5)) @@ -130,10 +118,10 @@ def test_alm_mean_2trials(): ("kullback_sym", mean_kullback_sym), ], ) -def test_mean_covariance_metric(metric, mean): +def test_mean_covariance_metric(metric, mean, get_covmats): """Test mean_covariance for metric""" n_trials, n_channels = 3, 3 - covmats, _, _ = generate_cov(n_trials, n_channels) + covmats = get_covmats(n_trials, n_channels) C = mean_covariance(covmats, metric=metric) Ctrue = mean(covmats) assert np.all(C == Ctrue) diff --git a/tests/test_utils_tangent_space.py b/tests/test_utils_tangent_space.py index dc696929..cb3730c5 100644 --- a/tests/test_utils_tangent_space.py +++ b/tests/test_utils_tangent_space.py @@ -1,36 +1,43 @@ +from conftest import get_metrics import numpy as np -from numpy.testing import assert_array_almost_equal +from pyriemann.utils.tangentspace import ( + tangent_space, untangent_space, transport +) +import pytest +from pytest import approx -from pyriemann.utils.tangentspace import tangent_space, untangent_space - -def generate_cov(Nt, Ne): - """Generate a set of cavariances matrices for test purpose""" - rs = np.random.RandomState(1234) - diags = 2.0 + 0.1 * rs.randn(Nt, Ne) - A = 2*rs.rand(Ne, Ne) - 1 - A /= np.atleast_2d(np.sqrt(np.sum(A**2, 1))).T - covmats = np.empty((Nt, Ne, Ne)) - for i in range(Nt): - covmats[i] = np.dot(np.dot(A, np.diag(diags[i])), A.T) - return covmats - - -def test_tangent_space(): +def test_tangent_space(get_covmats): """Test tangent space projection""" - C = generate_cov(10, 3) - tangent_space(C, np.eye(3)) + n_trials, n_channels = 6, 3 + n_ts = (n_channels * (n_channels + 1)) // 2 + covmats = get_covmats(n_trials, n_channels) + Xts = tangent_space(covmats, np.eye(n_channels)) + assert Xts.shape == (n_trials, n_ts) -def test_untangent_space(): +def test_untangent_space(rndstate): """Test untangent space projection""" - T = np.random.randn(10, 6) - untangent_space(T, np.eye(3)) + n_trials, n_channels = 10, 3 + n_ts = (n_channels * (n_channels + 1)) // 2 + T = rndstate.randn(n_trials, n_ts) + covmats = untangent_space(T, np.eye(n_channels)) + assert covmats.shape == (n_trials, n_channels, n_channels) -def test_tangent_and_untangent_space(): +def test_tangent_and_untangent_space(get_covmats): """Test tangent space projection and retro-projection should be the same""" - C = generate_cov(10, 3) - T = tangent_space(C, np.eye(3)) - covmats = untangent_space(T, np.eye(3)) - assert_array_almost_equal(C, covmats) + n_trials, n_channels = 10, 3 + covmats = get_covmats(n_trials, n_channels) + Xts = tangent_space(covmats, np.eye(n_channels)) + covmats_ut = untangent_space(Xts, np.eye(n_channels)) + assert covmats_ut == approx(covmats) + + +@pytest.mark.parametrize("metric", get_metrics()) +def test_transport(metric, get_covmats): + n_trials, n_channels = 10, 3 + covmats = get_covmats(n_trials, n_channels) + ref = np.eye(n_channels) + covtr = transport(covmats, ref, metric=metric) + assert covtr.shape == (n_trials, n_channels, n_channels) diff --git a/tests/test_viz.py b/tests/test_viz.py index 5319ea55..d0f416ac 100644 --- a/tests/test_viz.py +++ b/tests/test_viz.py @@ -1,4 +1,4 @@ -from conftest import covmats, requires_matplotlib # noqa: F401 +from conftest import requires_matplotlib import numpy as np import pytest @@ -10,8 +10,10 @@ @requires_matplotlib -def test_embedding(covmats): # noqa: F811 +def test_embedding(get_covmats): """Test Embedding.""" + n_trials, n_channels = 5, 3 + covmats = get_covmats(n_trials, n_channels) plot_embedding(covmats, y=None, metric="euclid") y = np.ones(covmats.shape[0]) plot_embedding(covmats, y=y, metric="euclid")