In [1]:
import numpy as np
from moabb.datasets import BNCI2014001
from moabb.paradigms import MotorImagery
from pyriemann.estimation import Covariances
from pyriemann.utils.mean import mean_riemann
from pyriemann.utils.distance import distance_riemann
from pyriemann.utils.base import logm, expm
from sklearn.manifold import SpectralEmbedding
import matplotlib.pyplot as plt

# Initialize dataset and paradigm
dataset = BNCI2014001()
paradigm = MotorImagery(n_classes=4)

# Get data for two subjects (source and target)
source_subject = 1
target_subject = 2
X_source, y_source, _ = paradigm.get_data(dataset=dataset, subjects=[source_subject])
X_target, y_target, _ = paradigm.get_data(dataset=dataset, subjects=[target_subject])

# Compute covariance matrices
cov = Covariances(estimator='scm')
C_source = cov.fit_transform(X_source)
C_target = cov.fit_transform(X_target)


BNCI2014001 has been renamed to BNCI2014_001. BNCI2014001 will be removed in version 1.1.
The dataset class name 'BNCI2014001' must be an abbreviation of its code 'BNCI2014-001'. See moabb.datasets.base.is_abbrev for more information.
Choosing from all possible events
 'left_hand': 12
 'right_hand': 12
 'feet': 12
 'tongue': 12>
  warn(f"warnEpochs {epochs}")
 'left_hand': 12
 'right_hand': 12
 'feet': 12
 'tongue': 12>
  warn(f"warnEpochs {epochs}")
 'left_hand': 12
 'right_hand': 12
 'feet': 12
 'tongue': 12>
  warn(f"warnEpochs {epochs}")
 'left_hand': 12
 'right_hand': 12
 'feet': 12
 'tongue': 12>
  warn(f"warnEpochs {epochs}")
 'left_hand': 12
 'right_hand': 12
 'feet': 12
 'tongue': 12>
  warn(f"warnEpochs {epochs}")
 'left_hand': 12
 'right_hand': 12
 'feet': 12
 'tongue': 12>
  warn(f"warnEpochs {epochs}")
 'left_hand': 12
 'right_hand': 12
 'feet': 12
 'tongue': 12>
  warn(f"warnEpochs {epochs}")
 'left_hand': 12
 'right_hand': 12
 'feet': 12
 'tongue': 12>
  warn(f"warnEpoch

In [2]:
def center_matrices(matrices, reference=None):
    """Re-center matrices to identity"""
    if reference is None:
        reference = mean_riemann(matrices)
    
    reference_sqrt = expm(0.5 * logm(reference))
    reference_sqrt_inv = np.linalg.inv(reference_sqrt)
    
    return [reference_sqrt_inv @ matrix @ reference_sqrt_inv for matrix in matrices]

def stretch_matrices(matrices, source_dispersion, target_dispersion):
    """Stretch matrices to match dispersion"""
    s = np.sqrt(source_dispersion / target_dispersion)
    return [expm(s * logm(matrix)) for matrix in matrices]

def compute_rotation(G_source, G_target):
    """Compute rotation matrix using eigendecomposition"""
    evals_source, evecs_source = np.linalg.eigh(G_source)
    evals_target, evecs_target = np.linalg.eigh(G_target)
    return evecs_target @ evecs_source.T

def rpa_transform(C_source, C_target, y_source, y_target):
    """Apply full RPA transformation"""
    # Step 1: Re-center
    M_source = mean_riemann(C_source)
    M_target = mean_riemann(C_target)
    
    C_source_centered = center_matrices(C_source, M_source)
    C_target_centered = center_matrices(C_target, M_target)
    
    # Step 2: Stretch
    d_source = np.mean([distance_riemann(M_source, c)**2 for c in C_source])
    d_target = np.mean([distance_riemann(M_target, c)**2 for c in C_target])
    
    C_target_stretched = stretch_matrices(C_target_centered, d_source, d_target)
    
    # Step 3: Rotate
    unique_labels = np.unique(y_source)
    U = np.eye(C_source[0].shape[0])
    
    for label in unique_labels:
        idx_source = y_source == label
        idx_target = y_target == label
        
        if np.sum(idx_target) > 0:
            G_source = mean_riemann([c for c, flag in zip(C_source_centered, idx_source) if flag])
            G_target = mean_riemann([c for c, flag in zip(C_target_stretched, idx_target) if flag])
            U = compute_rotation(G_source, G_target)
    
    C_target_transformed = [U.T @ c @ U for c in C_target_stretched]
    
    return C_source_centered, C_target_transformed


In [3]:
def visualize_embeddings(C_source, C_target, y_source, y_target, title):
    """Create 2D embedding visualization"""
    # Combine matrices and labels
    C_combined = np.array(C_source + C_target)
    y_combined = np.concatenate([y_source, y_target])
    domain_labels = np.concatenate([np.zeros_like(y_source), np.ones_like(y_target)])
    
    # Compute embedding
    embedding = SpectralEmbedding(n_components=2)
    C_2d = embedding.fit_transform(C_combined.reshape(len(C_combined), -1))
    
    # Plot
    plt.figure(figsize=(10, 8))
    for i, label in enumerate(np.unique(y_combined)):
        # Plot source points
        mask = (y_combined == label) & (domain_labels == 0)
        plt.scatter(C_2d[mask, 0], C_2d[mask, 1], marker='o', 
                   label=f'Source - Class {label}', alpha=0.7)
        
        # Plot target points
        mask = (y_combined == label) & (domain_labels == 1)
        plt.scatter(C_2d[mask, 0], C_2d[mask, 1], marker='^',
                   label=f'Target - Class {label}', alpha=0.7)
    
    plt.title(title)
    plt.legend()
    plt.grid(True)
    return plt

# Main execution


In [4]:
# Apply RPA transformation
C_source_transformed, C_target_transformed = rpa_transform(C_source, C_target, y_source, y_target)

# Visualize before RPA
plt.figure(figsize=(15, 5))
plt.subplot(121)
visualize_embeddings(C_source, C_target, y_source, y_target, "Before RPA")

# Visualize after RPA
plt.subplot(122)
visualize_embeddings(C_source_transformed, C_target_transformed, y_source, y_target, "After RPA")
plt.tight_layout()
plt.show()


AttributeError: 'list' object has no attribute 'shape'