diff --git a/.gitignore b/.gitignore index bca17c4d..9e1143eb 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,6 @@ /doc/auto_examples /dist *-e +*.egg-info/ +.vscode/ +sandbox/ diff --git a/doc/api.rst b/doc/api.rst index 925e84c2..e2fdf86e 100644 --- a/doc/api.rst +++ b/doc/api.rst @@ -60,7 +60,6 @@ Clustering Potato PotatoField - Tangent Space ------------------ .. _tangentspace_api: @@ -109,7 +108,7 @@ Channel selection :template: class.rst ElectrodeSelection - FlatChannelRemover + FlatChannelRemover Stats ------------------ @@ -123,6 +122,19 @@ Stats PermutationDistance PermutationModel +Datasets +------------------ +.. _datasets_api: +.. currentmodule:: pyriemann.datasets + +.. autosummary:: + :toctree: generated/ + + make_gaussian_blobs + make_outliers + make_covariances + sample_gaussian_spd + generate_random_spd_matrix Utils function -------------- @@ -233,3 +245,4 @@ Aproximate Joint Diagonalization rjd ajd_pham uwedge + diff --git a/doc/whatsnew.rst b/doc/whatsnew.rst index ca73fb0c..c3632283 100644 --- a/doc/whatsnew.rst +++ b/doc/whatsnew.rst @@ -24,7 +24,9 @@ v0.2.8.dev - Refactor tests + fix refit of :class:`pyriemann.tangentspace.TangentSpace` -- Add :class:`pyriemann.clustering.PotatoField`, and an example on artifact detection +- Add sampling SPD matrices from a Riemannian Gaussian distribution in :func:`pyriemann.datasets.sample_gaussian_spd` + +- Add new function :func:`pyriemann.datasets.make_gaussian_blobs` for generating random datasets with SPD matrices v0.2.7 (June 2021) ------------------ diff --git a/examples/simulated/README.txt b/examples/simulated/README.txt new file mode 100644 index 00000000..e359e365 --- /dev/null +++ b/examples/simulated/README.txt @@ -0,0 +1,4 @@ +Simulated data +--------------- + +Examples using datasets sampled from known probability distributions diff --git a/examples/simulated/plot_riemannian_gaussian.py b/examples/simulated/plot_riemannian_gaussian.py new file mode 100644 index 00000000..ac0333c2 --- /dev/null +++ b/examples/simulated/plot_riemannian_gaussian.py @@ -0,0 +1,77 @@ +""" +===================================================================== +Sample from the Riemannian Gaussian distribution in the SPD manifold +===================================================================== + +Spectral embedding of samples from the Riemannian Gaussian distribution +with different centerings and dispersions. + +""" +# Authors: Pedro Rodrigues +# +# License: BSD (3-clause) + +import numpy as np +import matplotlib.pyplot as plt + +from pyriemann.embedding import Embedding +from pyriemann.datasets import sample_gaussian_spd, generate_random_spd_matrix + + +print(__doc__) + +############################################################################### +# Set parameters for sampling from the Riemannian Gaussian distribution +n_matrices = 100 # how many SPD matrices to generate +n_dim = 4 # number of dimensions of the SPD matrices +sigma = 1.0 # dispersion of the Gaussian distribution +epsilon = 4.0 # parameter for controlling the distance between centers +random_state = 42 # ensure reproducibility + +# Generate the samples on three different conditions +mean = generate_random_spd_matrix(n_dim) # random reference point + +samples_1 = sample_gaussian_spd(n_matrices=n_matrices, + mean=mean, + sigma=sigma, + random_state=random_state) +samples_2 = sample_gaussian_spd(n_matrices=n_matrices, + mean=mean, + sigma=sigma/2, + random_state=random_state) +samples_3 = sample_gaussian_spd(n_matrices=n_matrices, + mean=epsilon*mean, + sigma=sigma, + random_state=random_state) + +# Stack all of the samples into one data array for the embedding +samples = np.concatenate([samples_1, samples_2, samples_3]) +labels = np.array(n_matrices*[1] + n_matrices*[2] + n_matrices*[3]) + +############################################################################### +# Apply the spectral embedding over the SPD matrices +lapl = Embedding(metric='riemann', n_components=2) +embd = lapl.fit_transform(X=samples) + +############################################################################### +# Plot the results + +fig, ax = plt.subplots(figsize=(8, 6)) + +colors = {1: 'C0', 2: 'C1', 3: 'C2'} +for i in range(len(samples)): + ax.scatter(embd[i, 0], embd[i, 1], c=colors[labels[i]], s=50) +ax.scatter([], [], c='C0', s=50, label=r'$\varepsilon = 1.00, \sigma = 1.00$') +ax.scatter([], [], c='C1', s=50, label=r'$\varepsilon = 1.00, \sigma = 0.50$') +ax.scatter([], [], c='C2', s=50, label=r'$\varepsilon = 4.00, \sigma = 1.00$') +ax.set_xticks([-1, -0.5, 0, 0.5, 1.0]) +ax.set_xticklabels([-1, -0.5, 0, 0.5, 1.0], fontsize=12) +ax.set_yticks([-1, -0.5, 0, 0.5, 1.0]) +ax.set_yticklabels([-1, -0.5, 0, 0.5, 1.0], fontsize=12) +ax.set_title(r'Spectral embedding of data points (fixed $n_{dim} = 4$)', + fontsize=14) +ax.set_xlabel(r'$\phi_1$', fontsize=14) +ax.set_ylabel(r'$\phi_2$', fontsize=14) +ax.legend() + +plt.show() diff --git a/examples/simulated/plot_toy_classification.py b/examples/simulated/plot_toy_classification.py new file mode 100644 index 00000000..53e35d2a --- /dev/null +++ b/examples/simulated/plot_toy_classification.py @@ -0,0 +1,74 @@ +""" +===================================================================== +Illustrate classification accuracy versus class separability +===================================================================== + +Generate several datasets containing data points from two-classes. Each class +is generated with a Riemannian Gaussian distribution centered at the class mean +and with the same dispersion sigma. The distance between the class means is +parametrized by Delta, which we make vary between zero and 5*sigma. We +illustrate how the accuracy of the MDM classifier varies when Delta increases. + +""" +# Authors: Pedro Rodrigues +# +# License: BSD (3-clause) + +import numpy as np +import matplotlib.pyplot as plt +from sklearn.model_selection import cross_val_score + +from pyriemann.classification import MDM +from pyriemann.datasets import make_gaussian_blobs + + +print(__doc__) + + +############################################################################### +# Set general parameters for the illustrations + + +n_matrices = 100 # how many matrices to sample on each class +n_dim = 4 # dimensionality of the data points +sigma = 1.0 # dispersion of the Gaussian distributions +random_state = 42 # ensure reproducibility + +############################################################################### +# Loop over different levels of separability between the classes +scores_array = [] +deltas_array = np.linspace(0, 5*sigma, 5) + +for delta in deltas_array: + # generate data points for a classification problem + X, y = make_gaussian_blobs(n_matrices=n_matrices, + n_dim=n_dim, + class_sep=delta, + class_disp=sigma, + random_state=random_state) + + # which classifier to consider + clf = MDM() + + # get the classification score for this setup + scores_array.append( + cross_val_score(clf, X, y, cv=5, scoring='roc_auc').mean()) + +scores_array = np.array(scores_array) + +############################################################################### +# Plot the results +fig, ax = plt.subplots(figsize=(7.5, 5.9)) +ax.plot(deltas_array, scores_array, lw=3.0, label=sigma) +ax.set_xticks([0, 1, 2, 3, 4, 5]) +ax.set_xticklabels([0, 1, 2, 3, 4, 5], fontsize=12) +ax.set_yticks([0.5, 0.6, 0.7, 0.8, 0.9, 1.0]) +ax.set_yticklabels([0.5, 0.6, 0.7, 0.8, 0.9, 1.0], fontsize=12) +ax.set_xlabel(r'$\Delta/\sigma$', fontsize=14) +ax.set_ylabel(r'score', fontsize=12) +ax.set_title(r'Classification score Vs class separability ($n_{dim} = 4$)', + fontsize=12) +ax.grid(True) +ax.legend(loc='lower right', title=r'$\sigma$') + +plt.show() diff --git a/pyriemann/datasets/__init__.py b/pyriemann/datasets/__init__.py new file mode 100644 index 00000000..a54ef3c0 --- /dev/null +++ b/pyriemann/datasets/__init__.py @@ -0,0 +1,9 @@ +from .sampling import sample_gaussian_spd, generate_random_spd_matrix +from .simulated import make_covariances, make_gaussian_blobs, make_outliers + + +__all__ = ["sample_gaussian_spd", + "generate_random_spd_matrix", + "make_covariances", + "make_gaussian_blobs", + "make_outliers"] diff --git a/pyriemann/datasets/sampling.py b/pyriemann/datasets/sampling.py new file mode 100644 index 00000000..d5fe800e --- /dev/null +++ b/pyriemann/datasets/sampling.py @@ -0,0 +1,355 @@ +from functools import partial +import warnings +import numpy as np +from sklearn.utils import check_random_state + +from ..utils.base import sqrtm, expm +from ..utils.test import is_sym_pos_semi_def as is_spsd + + +def _pdf_r(r, sigma): + """Pdf for the log of eigenvalues of a SPD matrix + + Probability density function for the logarithm of the eigenvalues of a SPD + matrix samples from the Riemannian Gaussian distribution. See Said et al. + "Riemannian Gaussian distributions on the space of symmetric positive + definite matrices" (2017) for the mathematical details. + + Parameters + ---------- + r : ndarray, shape (n_dim,) + Vector with the logarithm of the eigenvalues of a SPD matrix. + sigma : float + Dispersion of the Riemannian Gaussian distribution. + + Returns + ------- + p : float + Probability density function applied to data point r + """ + + if (sigma <= 0): + raise ValueError(f'sigma must be a positive number (Got {sigma})') + + n_dim = len(r) + partial_1 = -np.sum(r**2) / sigma**2 + partial_2 = 0 + for i in range(n_dim): + for j in range(i + 1, n_dim): + partial_2 = partial_2 + np.log(np.sinh(np.abs(r[i] - r[j]) / 2)) + + return np.exp(partial_1 + partial_2) + + +def _slice_sampling(ptarget, n_samples, x0, n_burnin=20, thin=10, + random_state=None): + """Slice sampling procedure + + Implementation of a slice sampling algorithm for sampling from any target + pdf or a multiple of it. The implementation follows the description given + in page 375 of David McKay's book "Information Theory, Inference, and + Learning Algorithms" (2003). + + Parameters + ---------- + ptarget : function with one input + The target pdf to sample from or a multiple of it. + n_samples : int + How many samples to get from the ptarget distribution. + x0 : array + Initial state for the MCMC procedure. Note that the shape of this array + defines the dimensionality n_dim of the data points to be sampled. + n_burnin : int + How many samples to discard from the beginning of the chain generated + by the slice sampling procedure. Usually the first samples are prone to + non-stationary behavior and do not follow very well the target pdf. + thin : int + Thinning factor for the slice sampling procedure. MCMC samples are + often correlated between them, so taking one sample every `thin` + samples can help reducing this correlation. Note that this makes the + algorithm actually sample `thin x n_samples` samples from the pdf, so + expect the whole sampling procedure to take longer + random_state : int, RandomState instance or None (default: None) + Pass an int for reproducible output across multiple function calls. + + Returns + ------- + samples : ndarray, shape (n_samples, n_dim) + Samples from the target pdf. + """ + + if (n_samples <= 0) or (not isinstance(n_samples, int)): + raise ValueError( + f'n_samples must be a positive integer (Got {n_samples})') + if (n_burnin <= 0) or (not isinstance(n_burnin, int)): + raise ValueError( + f'n_samples must be a positive integer (Got {n_burnin})') + if (thin <= 0) or (not isinstance(thin, int)): + raise ValueError(f'thin must be a positive integer (Got {thin})') + + rs = check_random_state(random_state) + w = 1.0 # initial bracket width + xt = np.copy(x0) + + n_dim = len(x0) + samples = [] + n_samples_total = (n_samples + n_burnin) * thin + + for _ in range(n_samples_total): + + for i in range(n_dim): + + ei = np.zeros(n_dim) + ei[i] = 1 + + # step 1 : evaluate ptarget(xt) + Px = ptarget(xt) + + # step 2 : draw vertical coordinate uprime ~ U(0, ptarget(xt)) + uprime_i = Px * rs.rand() + + # step 3 : create a horizontal interval (xl_i, xr_i) enclosing xt_i + r = rs.rand() + xl_i = xt[i] - r * w + xr_i = xt[i] + (1-r) * w + while ptarget(xt + (xl_i - xt[i]) * ei) > uprime_i: + xl_i = xl_i - w + while ptarget(xt + (xr_i - xt[i]) * ei) > uprime_i: + xr_i = xr_i + w + + # step 4 : loop + while True: + xprime_i = xl_i + (xr_i - xl_i) * rs.rand() + Px = ptarget(xt + (xprime_i - xt[i]) * ei) + if Px > uprime_i: + break + else: + if xprime_i > xt[i]: + xr_i = xprime_i + else: + xl_i = xprime_i + + # store coordinate i of new sample + xt = np.copy(xt) + xt[i] = xprime_i + + samples.append(xt) + + samples = np.array(samples)[(n_burnin * thin):][::thin] + + return samples + + +def _sample_parameter_r(n_samples, n_dim, sigma, random_state=None): + """Sample the r parameters of a Riemannian Gaussian distribution + + Sample the logarithm of the eigenvalues of a SPD matrix following a + Riemannian Gaussian distribution. + + See https://arxiv.org/pdf/1507.01760.pdf for the mathematical details + + Parameters + ---------- + n_samples : int + How many samples to generate. + n_dim : int + Dimensionality of the SPD matrices to be sampled. + sigma : float + Dispersion of the Riemannian Gaussian distribution. + random_state : int, RandomState instance or None (default: None) + Pass an int for reproducible output across multiple function calls. + + Returns + ------- + r_samples : ndarray, shape (n_samples, n_dim) + Samples of the r parameters of the Riemannian Gaussian distribution. + """ + + rs = check_random_state(random_state) + x0 = rs.randn(n_dim) + ptarget = partial(_pdf_r, sigma=sigma) + r_samples = _slice_sampling( + ptarget, n_samples=n_samples, x0=x0, random_state=random_state) + + return r_samples + + +def _sample_parameter_U(n_samples, n_dim, random_state=None): + """Sample the U parameters of a Riemannian Gaussian distribution + + Sample the eigenvectors of a SPD matrix following a Riemannian Gaussian + distribution. + + See https://arxiv.org/pdf/1507.01760.pdf for the mathematical details + + Parameters + ---------- + n_samples : int + How many samples to generate. + n_dim : int + Dimensionality of the SPD matrices to be sampled. + random_state : int, RandomState instance or None (default: None) + Pass an int for reproducible output across multiple function calls. + + Returns + ------- + u_samples : ndarray, shape (n_samples, n_dim) + Samples of the U parameters of the Riemannian Gaussian distribution + """ + + rs = check_random_state(random_state) + u_samples = np.zeros((n_samples, n_dim, n_dim)) + for i in range(n_samples): + A = rs.randn(n_dim, n_dim) + Q, _ = np.linalg.qr(A) + u_samples[i] = Q + + return u_samples + + +def _sample_gaussian_spd_centered(n_matrices, n_dim, sigma, random_state=None): + """Sample a Riemannian Gaussian distribution centered at the Identity + + Sample SPD matrices from a Riemannian Gaussian distribution centered at the + Identity, which has the role of the origin in the SPD manifold, and + dispersion parametrized by sigma. See [1]_ for the mathematical details. + + Parameters + ---------- + n_matrices : int + How many matrices to generate. + n_dim : int + Dimensionality of the SPD matrices to be sampled. + sigma : float + Dispersion of the Riemannian Gaussian distribution. + random_state : int, RandomState instance or None (default: None) + Pass an int for reproducible output across multiple function calls. + + Returns + ------- + samples : ndarray, shape (n_matrices, n_dim, n_dim) + Samples of the Riemannian Gaussian distribution + + Notes + ----- + .. versionadded:: 0.2.8 + + References + ---------- + .. [1] S. Said, L. Bombrun, Y. Berthoumieu, and J. Manton, “Riemannian + Gaussian distributions on the space of symmetric positive definite + matrices”, IEEE Trans Inf Theory, vol. 63, pp. 2153–2170, 2017. + https://arxiv.org/pdf/1507.01760.pdf + """ + + samples_r = _sample_parameter_r(n_samples=n_matrices, + n_dim=n_dim, + sigma=sigma, + random_state=random_state) + samples_U = _sample_parameter_U(n_samples=n_matrices, + n_dim=n_dim, + random_state=random_state) + + samples = np.zeros((n_matrices, n_dim, n_dim)) + for i in range(n_matrices): + Ui = samples_U[i] + ri = samples_r[i] + samples[i] = Ui.T @ np.diag(np.exp(ri)) @ Ui + samples[i] = 0.5 * (samples[i] + samples[i].T) + + return samples + + +def sample_gaussian_spd(n_matrices, mean, sigma, random_state=None): + """Sample a Riemannian Gaussian distribution + + Sample SPD matrices from a Riemannian Gaussian distribution centered at + mean and with dispersion parametrized by sigma. This distribution has been + defined in [1]_ and generalizes the notion of a Gaussian distribution to + the space of SPD matrices. The sampling is based on a spectral + factorization of SPD matrices in terms of their eigenvectors (U-parameters) + and the log of the eigenvalues (r-parameters). + + Parameters + ---------- + n_matrices : int + How many matrices to generate. + mean : ndarray, shape (n_dim, n_dim) + Center of the Riemannian Gaussian distribution. + sigma : float + Dispersion of the Riemannian Gaussian distribution. + random_state : int, RandomState instance or None (default: None) + Pass an int for reproducible output across multiple function calls. + + Returns + ------- + samples : ndarray, shape (n_matrices, n_dim, n_dim) + Samples of the Riemannian Gaussian distribution + + Notes + ----- + .. versionadded:: 0.2.8 + + References + ---------- + .. [1] S. Said, L. Bombrun, Y. Berthoumieu, and J. Manton, “Riemannian + Gaussian distributions on the space of symmetric positive definite + matrices”, IEEE Trans Inf Theory, vol. 63, pp. 2153–2170, 2017. + https://arxiv.org/pdf/1507.01760.pdf + """ + + n_dim = mean.shape[0] + samples_centered = _sample_gaussian_spd_centered(n_matrices=n_matrices, + n_dim=n_dim, + sigma=sigma, + random_state=random_state) + + # apply the parallel transport to mean on each of the samples + mean_sqrt = sqrtm(mean) + samples = mean_sqrt @ samples_centered @ mean_sqrt + + if not is_spsd(samples): + msg = "Some of the sampled matrices are very badly conditioned and \ + may not behave numerically as a SPD matrix. Try sampling again \ + or reducing the dimensionality of the matrix." + warnings.warn(msg) + + return samples + + +def generate_random_spd_matrix(n_dim, random_state=None): + """Generate a random SPD matrix + + Parameters + ---------- + n_dim : int + Dimensionality of the matrix to sample. + random_state : int, RandomState instance or None (default: None) + Pass an int for reproducible output across multiple function calls. + + Returns + ------- + C : ndarray, shape (n_dim, n_dim) + Random SPD matrix + + Notes + ----- + .. versionadded:: 0.2.8 + """ + + if (n_dim <= 0) or (not isinstance(n_dim, int)): + raise ValueError( + f'n_samples must be a positive integer (Got {n_dim})') + + rs = check_random_state(random_state) + A = rs.randn(n_dim, n_dim) + A = 0.5 * (A + A.T) + C = expm(A) + + if not is_spsd(C): + msg = "The sampled matrix is very badly conditioned and may not \ + behave numerically as a SPD matrix. Try sampling again or \ + reducing the dimensionality of the matrix." + warnings.warn(msg) + + return C diff --git a/pyriemann/datasets/simulated.py b/pyriemann/datasets/simulated.py new file mode 100644 index 00000000..87049472 --- /dev/null +++ b/pyriemann/datasets/simulated.py @@ -0,0 +1,163 @@ +import numpy as np +from sklearn.utils.validation import check_random_state + +from ..utils.distance import distance_riemann +from ..utils.base import powm, sqrtm +from .sampling import generate_random_spd_matrix, sample_gaussian_spd + + +def make_covariances(n_matrices, n_channels, rs, return_params=False): + """Generate a set of covariances matrices, with the same eigenvectors. + + Parameters + ---------- + n_matrices : int + Number of matrices to generate. + n_channels : int + Number of channels in covariance matrices. + rs : RandomState instance + Random state for reproducible output across multiple function calls. + return_params : bool (default False) + If True, then return parameters. + + Returns + ------- + covmats : ndarray, shape (n_matrices, n_channels, n_channels) + Covariances matrices + evals : ndarray, shape (n_matrices, n_channels) + Eigen values used for each covariance matrix. + Only returned if ``return_params=True``. + evecs : ndarray, shape (n_channels, n_channels) + Eigen vectors used for all covariance matrices. + Only returned if ``return_params=True``. + """ + evals = 2.0 + 0.1 * rs.randn(n_matrices, n_channels) + evecs = 2 * rs.rand(n_channels, n_channels) - 1 + evecs /= np.linalg.norm(evecs, axis=1)[:, np.newaxis] + + covmats = np.empty((n_matrices, n_channels, n_channels)) + for i in range(n_matrices): + covmats[i] = evecs @ np.diag(evals[i]) @ evecs.T + + if return_params: + return covmats, evals, evecs + else: + return covmats + + +def make_gaussian_blobs(n_matrices=100, n_dim=2, class_sep=1.0, class_disp=1.0, + return_centers=False, random_state=None): + """Generate SPD dataset with two classes sampled from Riemannian Gaussian + + Generate a dataset with SPD matrices drawn from two Riemannian Gaussian + distributions. The distributions have the same class dispersions and the + distance between their centers of mass is an input parameter. Useful for + testing classification or clustering methods. + + Parameters + ---------- + n_matrices : int (default: 100) + How many matrices to generate for each class. + n_dim : int (default: 2) + Dimensionality of the SPD matrices generated by the distributions. + class_sep : float (default: 1.0) + Parameter controlling the separability of the classes. + class_disp : float (default: 1.0) + Intra dispersion of the points sampled from each class. + return_centers : bool (default: False) + If True, then return the centers of each cluster + random_state : int, RandomState instance or None (default: None) + Pass an int for reproducible output across multiple function calls. + + Returns + ------- + X : ndarray, shape (2*n_matrices, n_dim, n_dim) + ndarray of SPD matrices. + y : ndarray, shape (2*n_matrices,) + labels corresponding to each matrix. + centers : ndarray, shape (2, n_dim, n_dim) + The centers of each class. Only returned if ``return_centers=True``. + + Notes + ----- + .. versionadded:: 0.2.8 + + """ + if not isinstance(class_sep, float): + raise ValueError(f'class_sep must be a float (Got {class_sep})') + + rs = check_random_state(random_state) + + # generate dataset for class 0 + C0 = generate_random_spd_matrix(n_dim) + X0 = sample_gaussian_spd(n_matrices=n_matrices, + mean=C0, + sigma=class_disp, + random_state=random_state) + y0 = np.zeros(n_matrices) + + # generate dataset for class 1 + epsilon = np.exp(class_sep / np.sqrt(n_dim)) + C1 = epsilon * C0 + X1 = sample_gaussian_spd(n_matrices=n_matrices, + mean=C1, + sigma=class_disp, + random_state=random_state) + y1 = np.ones(n_matrices) + + X = np.concatenate([X0, X1]) + y = np.concatenate([y0, y1]) + idx = rs.permutation(len(X)) + X = X[idx] + y = y[idx] + + if return_centers: + centers = np.stack([C0, C1]) + return X, y, centers + else: + return X, y + + +def make_outliers(n_matrices, mean, sigma, outlier_coeff=10, + random_state=None): + """Generate a set of outlier points + + Simulate data points that are outliers for a given Riemannian Gaussian + distribution with fixed mean and dispersion. + + Parameters + ---------- + n_matrices : int + How many matrices to generate. + mean : ndarray, shape (n_dim, n_dim) + Center of the Riemannian Gaussian distribution. + sigma : float + Dispersion of the Riemannian Gaussian distribution. + outlier_coeff: float + Coefficient determining how to define an outlier data point, i.e. how + many times the sigma parameter its distance to the mean should be. + random_state : int, RandomState instance or None (default: None) + Pass an int for reproducible output across multiple function calls. + + Returns + ------- + outliers : ndarray, shape (n_matrices, n_dim, n_dim) + Array of simulated outlier matrices + + Notes + ----- + .. versionadded:: 0.2.8 + """ + + n_dim = mean.shape[1] + mean_sqrt = sqrtm(mean) + + outliers = np.zeros((n_matrices, n_dim, n_dim)) + for i in range(n_matrices): + Oi = generate_random_spd_matrix(n_dim=n_dim, random_state=random_state) + epsilon_num = outlier_coeff * sigma * n_dim + epsilon_den = distance_riemann(Oi, np.eye(n_dim))**2 + epsilon = np.sqrt(epsilon_num / epsilon_den) + outliers[i] = mean_sqrt @ powm(Oi, epsilon) @ mean_sqrt + + return outliers diff --git a/pyriemann/utils/covariance.py b/pyriemann/utils/covariance.py index 32d96365..6d8d1b35 100644 --- a/pyriemann/utils/covariance.py +++ b/pyriemann/utils/covariance.py @@ -3,6 +3,7 @@ import numpy as np from sklearn.covariance import oas, ledoit_wolf, fast_mcd, empirical_covariance +from pyriemann.utils.test import is_square # Mapping different estimator on the sklearn toolbox @@ -396,9 +397,7 @@ def normalize(X, norm): Xn : ndarray, shape (..., n, n) The set of normalized matrices, same dimensions as X. """ - if X.ndim < 2: - raise ValueError('Input must have at least 2 dimensions') - if X.shape[-2] != X.shape[-1]: + if not is_square(X): raise ValueError('Matrices must be square') if norm == "trace": @@ -434,9 +433,7 @@ def get_nondiag_weight(X): separation of human electroencephalogram by approximate joint diagonalization of second order statistics", Clin Neurophysiol, 2008 """ - if X.ndim < 2: - raise ValueError('Input must have at least 2 dimensions') - if X.shape[-2] != X.shape[-1]: + if not is_square(X): raise ValueError('Matrices must be square') X2 = X**2 diff --git a/pyriemann/utils/test.py b/pyriemann/utils/test.py new file mode 100644 index 00000000..c8fdbb58 --- /dev/null +++ b/pyriemann/utils/test.py @@ -0,0 +1,104 @@ + +import numpy as np + + +def _get_eigenvals(X): + """ Private function to compute eigen values. """ + n = X.shape[-1] + return np.linalg.eigvals(X.reshape((-1, n, n))) + + +def is_square(X): + """ Check if matrices are square. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if matrices are square. + """ + return X.ndim >= 2 and X.shape[-2] == X.shape[-1] + + +def is_sym(X): + """ Check if all matrices are symmetric. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are symmetric. + """ + return is_square(X) and np.allclose(X, np.swapaxes(X, -2, -1)) + + +def is_pos_def(X): + """ Check if all matrices are positive definite. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are positive definite. + """ + return is_square(X) and np.all(_get_eigenvals(X) > 0.0) + + +def is_pos_semi_def(X): + """ Check if all matrices are positive semi-definite. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are positive semi-definite. + """ + return is_square(X) and np.all(_get_eigenvals(X) >= 0.0) + + +def is_sym_pos_def(X): + """ Check if all matrices are symmetric positive-definite. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are symmetric positive-definite. + """ + return is_sym(X) and is_pos_def(X) + + +def is_sym_pos_semi_def(X): + """ Check if all matrices are symmetric positive semi-definite. + + Parameters + ---------- + X : ndarray, shape (..., n, n) + The set of square matrices, at least 2D ndarray. + + Returns + ------- + ret : boolean + True if all matrices are symmetric positive semi-definite. + """ + return is_sym(X) and is_pos_semi_def(X) diff --git a/tests/conftest.py b/tests/conftest.py index 7ecb2226..22bf701e 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,9 @@ import pytest -from pytest import approx import numpy as np from functools import partial +from pyriemann.datasets import make_covariances + def requires_module(function, name, call=None): """Skip a test if package is not available (decorator).""" @@ -23,20 +24,6 @@ def requires_module(function, name, call=None): requires_seaborn = partial(requires_module, name="seaborn") -def generate_cov(n_trials, n_channels, rs, return_params=False): - """Generate a set of covariances matrices for test purpose""" - diags = 2.0 + 0.1 * rs.randn(n_trials, n_channels) - A = 2 * rs.rand(n_channels, n_channels) - 1 - A /= np.linalg.norm(A, axis=1)[:, np.newaxis] - covmats = np.empty((n_trials, n_channels, n_channels)) - for i in range(n_trials): - covmats[i] = A @ np.diag(diags[i]) @ A.T - if return_params: - return covmats, diags, A - else: - return covmats - - @pytest.fixture def rndstate(): return np.random.RandomState(1234) @@ -45,7 +32,8 @@ def rndstate(): @pytest.fixture def get_covmats(rndstate): def _gen_cov(n_trials, n_chan): - return generate_cov(n_trials, n_chan, rndstate, return_params=False) + return make_covariances(n_trials, n_chan, rndstate, + return_params=False) return _gen_cov @@ -53,7 +41,7 @@ def _gen_cov(n_trials, n_chan): @pytest.fixture def get_covmats_params(rndstate): def _gen_cov_params(n_trials, n_chan): - return generate_cov(n_trials, n_chan, rndstate, return_params=True) + return make_covariances(n_trials, n_chan, rndstate, return_params=True) return _gen_cov_params @@ -66,98 +54,6 @@ def _get_labels(n_trials, n_classes): return _get_labels -def is_positive_semi_definite(X): - """Check if all matrices are positive semi-definite. - - Parameters - ---------- - X : ndarray, shape (..., n, n) - The set of square matrices, at least 2D ndarray. - - Returns - ------- - ret : boolean - True if all matrices are positive semi-definite. - """ - cs = X.shape[-1] - return np.all(np.linalg.eigvals(X.reshape((-1, cs, cs))) >= 0.0) - - -def is_positive_definite(X): - """Check if all matrices are positive definite. - - Parameters - ---------- - X : ndarray, shape (..., n, n) - The set of square matrices, at least 2D ndarray. - - Returns - ------- - ret : boolean - True if all matrices are positive definite. - """ - cs = X.shape[-1] - return np.all(np.linalg.eigvals(X.reshape((-1, cs, cs))) > 0.0) - - -def is_symmetric(X): - """Check if all matrices are symmetric. - - Parameters - ---------- - X : ndarray, shape (..., n, n) - The set of square matrices, at least 2D ndarray. - - Returns - ------- - ret : boolean - True if all matrices are symmetric. - """ - return X == approx(np.swapaxes(X, -2, -1)) - - -@pytest.fixture -def is_spd(): - """Check if all matrices are symmetric positive-definite. - - Parameters - ---------- - X : ndarray, shape (..., n, n) - The set of square matrices, at least 2D ndarray. - - Returns - ------- - ret : boolean - True if all matrices are symmetric positive-definite. - """ - - def _is_spd(X): - return is_symmetric(X) and is_positive_definite(X) - - return _is_spd - - -@pytest.fixture -def is_spsd(): - """Check if all matrices are symmetric positive semi-definite. - - Parameters - ---------- - X : ndarray, shape (..., n, n) - The set of square matrices, at least 2D ndarray. - - Returns - ------- - ret : boolean - True if all matrices are symmetric positive semi-definite. - """ - - def _is_spsd(X): - return is_symmetric(X) and is_positive_semi_definite(X) - - return _is_spsd - - def get_distances(): distances = [ "riemann", diff --git a/tests/test_estimation.py b/tests/test_estimation.py index 2bb17c7b..c6f51767 100644 --- a/tests/test_estimation.py +++ b/tests/test_estimation.py @@ -9,12 +9,15 @@ ) import pytest +from pyriemann.utils.test import (is_sym_pos_def as is_spd, + is_sym_pos_semi_def as is_spsd) + estim = ["cov", "scm", "lwf", "oas", "mcd", "corr", "sch"] coh = ["ordinary", "instantaneous", "lagged", "imaginary"] @pytest.mark.parametrize("estimator", estim) -def test_covariances(estimator, rndstate, is_spd): +def test_covariances(estimator, rndstate): """Test Covariances""" n_trials, n_channels, n_times = 2, 3, 100 x = rndstate.randn(n_trials, n_channels, n_times) @@ -26,7 +29,7 @@ def test_covariances(estimator, rndstate, is_spd): assert is_spd(covmats) -def test_hankel_covariances(rndstate, is_spd): +def test_hankel_covariances(rndstate): """Test Hankel Covariances""" n_trials, n_channels, n_times = 2, 3, 100 x = rndstate.randn(n_trials, n_channels, n_times) @@ -38,7 +41,7 @@ def test_hankel_covariances(rndstate, is_spd): assert is_spd(covmats) -def test_hankel_covariances_delays(rndstate, is_spd): +def test_hankel_covariances_delays(rndstate): n_trials, n_channels, n_times = 2, 3, 100 x = rndstate.randn(n_trials, n_channels, n_times) cov = HankelCovariances(delays=[1, 2]) @@ -50,7 +53,7 @@ def test_hankel_covariances_delays(rndstate, is_spd): @pytest.mark.parametrize("estimator", estim) @pytest.mark.parametrize("svd", [None, 2]) -def test_erp_covariances(estimator, svd, rndstate, get_labels, is_spsd): +def test_erp_covariances(estimator, svd, rndstate, get_labels): """Test fit ERPCovariances""" n_classes, n_trials, n_channels, n_times = 2, 4, 3, 100 x = rndstate.randn(n_trials, n_channels, n_times) @@ -66,7 +69,7 @@ def test_erp_covariances(estimator, svd, rndstate, get_labels, is_spsd): assert is_spsd(covmats) -def test_erp_covariances_classes(rndstate, get_labels, is_spsd): +def test_erp_covariances_classes(rndstate, get_labels): n_trials, n_channels, n_times, n_classes = 4, 3, 100, 2 x = rndstate.randn(n_trials, n_channels, n_times) labels = get_labels(n_trials, n_classes) @@ -83,7 +86,7 @@ def test_erp_covariances_svd_error(rndstate): @pytest.mark.parametrize("est", estim) -def test_xdawn_est(est, rndstate, get_labels, is_spsd): +def test_xdawn_est(est, rndstate, get_labels): """Test fit XdawnCovariances""" n_classes, nfilter = 2, 2 n_trials, n_channels, n_times = 4, 6, 100 @@ -105,7 +108,7 @@ def test_xdawn_est(est, rndstate, get_labels, is_spsd): @pytest.mark.parametrize("xdawn_est", estim) -def test_xdawn_covariances_est(xdawn_est, rndstate, get_labels, is_spsd): +def test_xdawn_covariances_est(xdawn_est, rndstate, get_labels): """Test fit XdawnCovariances""" n_classes, nfilter = 2, 2 n_trials, n_channels, n_times = 4, 6, 100 @@ -127,7 +130,7 @@ def test_xdawn_covariances_est(xdawn_est, rndstate, get_labels, is_spsd): @pytest.mark.parametrize("nfilter", [2, 4]) -def test_xdawn_covariances_nfilter(nfilter, rndstate, get_labels, is_spsd): +def test_xdawn_covariances_nfilter(nfilter, rndstate, get_labels): """Test fit XdawnCovariances""" n_classes, n_trials, n_channels, n_times = 2, 4, 8, 100 x = rndstate.randn(n_trials, n_channels, n_times) @@ -147,7 +150,7 @@ def test_xdawn_covariances_nfilter(nfilter, rndstate, get_labels, is_spsd): assert is_spsd(covmats) -def test_xdawn_covariances_applyfilters(rndstate, get_labels, is_spsd): +def test_xdawn_covariances_applyfilters(rndstate, get_labels): n_classes, nfilter = 2, 2 n_trials, n_channels, n_times = 4, 6, 100 x = rndstate.randn(n_trials, n_channels, n_times) @@ -159,7 +162,7 @@ def test_xdawn_covariances_applyfilters(rndstate, get_labels, is_spsd): assert is_spsd(covmats) -def test_cosp_covariances(rndstate, is_spsd): +def test_cosp_covariances(rndstate): """Test fit CospCovariances""" n_trials, n_channels, n_times = 2, 3, 1000 x = rndstate.randn(n_trials, n_channels, n_times) @@ -175,7 +178,7 @@ def test_cosp_covariances(rndstate, is_spsd): @pytest.mark.parametrize("coh", coh) -def test_coherences(coh, rndstate, is_spsd): +def test_coherences(coh, rndstate): """Test fit Coherences""" n_trials, n_channels, n_times = 2, 5, 200 x = rndstate.randn(n_trials, n_channels, n_times) @@ -193,7 +196,7 @@ def test_coherences(coh, rndstate, is_spsd): @pytest.mark.parametrize("shrinkage", [0.1, 0.9]) -def test_shrinkage(shrinkage, rndstate, is_spd): +def test_shrinkage(shrinkage, rndstate): """Test Shrinkage""" n_trials, n_channels, n_times = 2, 3, 100 x = rndstate.randn(n_trials, n_channels, n_times) diff --git a/tests/test_sampling.py b/tests/test_sampling.py new file mode 100644 index 00000000..f057eb3d --- /dev/null +++ b/tests/test_sampling.py @@ -0,0 +1,52 @@ +import pytest +import numpy as np + +from pyriemann.datasets.sampling import (sample_gaussian_spd, + generate_random_spd_matrix) +from pyriemann.utils.distance import distance_riemann +from pyriemann.utils.test import is_sym_pos_def as is_spd + + +def test_sample_gaussian_spd(): + """Test Riemannian Gaussian sampling.""" + n_matrices, n_dim, sigma = 50, 16, 2. + mean = np.eye(n_dim) + X = sample_gaussian_spd(n_matrices, mean, sigma, random_state=None) + assert X.shape == (n_matrices, n_dim, n_dim) # X shape mismatch + assert is_spd(X) # X is an array of SPD matrices + + +def test_generate_random_spd_matrix(): + """Test generating random SPD matrix""" + n_dim = 16 + X = generate_random_spd_matrix(n_dim, random_state=None) + assert X.shape == (n_dim, n_dim) # X shape mismatch + assert is_spd(X) # X is a SPD matrix + + +def test_sigma_gaussian_spd(): + """Test sigma parameter from Riemannian Gaussian sampling.""" + n_matrices, n_dim, sig_1, sig_2 = 50, 8, 1., 2. + mean = np.eye(n_dim) + X1 = sample_gaussian_spd(n_matrices, mean, sig_1, random_state=None) + X2 = sample_gaussian_spd(n_matrices, mean, sig_2, random_state=None) + avg_d1 = np.mean([distance_riemann(X1_i, mean) for X1_i in X1]) + avg_d2 = np.mean([distance_riemann(X2_i, mean) for X2_i in X2]) + assert avg_d1 < avg_d2 + + +def test_functions_error(): + n_matrices, n_dim = 10, 16 + mean, sigma = np.eye(n_dim), 2. + with pytest.raises(ValueError): # mean is not a matrix + sample_gaussian_spd(n_matrices, np.ones(n_dim), sigma) + with pytest.raises(ValueError): # sigma is not a scalar + sample_gaussian_spd(n_matrices, mean, np.ones(n_dim)) + with pytest.raises(ValueError): # n_matrices is negative + sample_gaussian_spd(-n_matrices, mean, sigma) + with pytest.raises(ValueError): # n_matrices is not an integer + sample_gaussian_spd(4.2, mean, sigma) + with pytest.raises(ValueError): # n_dim is not an integer + generate_random_spd_matrix(1.7) + with pytest.raises(ValueError): # n_dim is negative + generate_random_spd_matrix(-n_dim) diff --git a/tests/test_simulated.py b/tests/test_simulated.py new file mode 100644 index 00000000..87f43c58 --- /dev/null +++ b/tests/test_simulated.py @@ -0,0 +1,78 @@ +import pytest +import numpy as np + +from pyriemann.datasets.sampling import generate_random_spd_matrix +from pyriemann.datasets.simulated import (make_gaussian_blobs, + make_outliers) +from pyriemann.utils.test import is_sym_pos_def as is_spd + + +def test_gaussian_blobs(): + """Test function for sampling Gaussian blobs.""" + n_matrices, n_dim = 50, 8 + X, y = make_gaussian_blobs(n_matrices=n_matrices, + n_dim=n_dim, + class_sep=2.0, + class_disp=1.0, + return_centers=False, + random_state=None) + assert X.shape == (2*n_matrices, n_dim, n_dim) # X shape mismatch + assert is_spd(X) # X is an array of SPD matrices + assert y.shape == (2*n_matrices,) # y shape mismatch + assert np.unique(y).shape == (2,) # Unexpected number of classes + assert sum(y == 0) == n_matrices # Unexpected number of samples in class 0 + assert sum(y == 1) == n_matrices # Unexpected number of samples in class 1 + _, _, centers = make_gaussian_blobs(n_matrices=1, + n_dim=n_dim, + class_sep=2.0, + class_disp=1.0, + return_centers=True, + random_state=None) + assert centers.shape == (2, n_dim, n_dim) # centers shape mismatch + + +def test_generate_random_spd_matrix(): + """Test function for sampling outliers""" + n_matrices, n_dim, sigma = 100, 8, 1. + mean = generate_random_spd_matrix(n_dim) + X = make_outliers(n_matrices=n_matrices, + mean=mean, + sigma=sigma, + outlier_coeff=10, + random_state=None) + assert X.shape == (n_matrices, n_dim, n_dim) # X shape mismatch + assert is_spd(X) # X is an array of SPD matrices + + +def test_functions_error(): + n_matrices, n_dim, class_sep, class_disp = 10, 16, 2., 1. + with pytest.raises(ValueError): # n_matrices is not an integer + make_gaussian_blobs(n_matrices=float(n_matrices), + n_dim=n_dim, + class_sep=class_sep, + class_disp=class_disp) + with pytest.raises(ValueError): # n_matrices is negative + make_gaussian_blobs(n_matrices=-n_matrices, + n_dim=n_dim, + class_sep=class_sep, + class_disp=class_disp) + with pytest.raises(ValueError): # n_dim is not an integer + make_gaussian_blobs(n_matrices=n_matrices, + n_dim=float(n_dim), + class_sep=class_sep, + class_disp=class_disp) + with pytest.raises(ValueError): # n_dim is negative + make_gaussian_blobs(n_matrices=n_matrices, + n_dim=-n_dim, + class_sep=class_sep, + class_disp=class_disp) + with pytest.raises(ValueError): # class_sep is not a scalar + make_gaussian_blobs(n_matrices=n_matrices, + n_dim=n_dim, + class_sep=class_sep * np.ones(n_dim), + class_disp=class_disp) + with pytest.raises(ValueError): # class_disp is not a scalar + make_gaussian_blobs(n_matrices=n_matrices, + n_dim=n_dim, + class_sep=class_sep, + class_disp=class_disp * np.ones(n_dim))