In [221]:
import numpy as np
import scipy.sparse
from sklearn.exceptions import NotFittedError
from sklearn.metrics import pairwise_kernels
from sklearn.decomposition import KernelPCA
import torch

In [218]:
def centered_kernel(X, X_index, Y, Y_index, metric="linear", filter_params=True, n_jobs=None, **kwds):
    """Compute the group-mean-centered kernel between arrays X and Y.
    
    This method takes either a vector array and returns a kernel matrix. 
    For the mean centering of the kernel within each group, the index to which 
    each sample belongs to must be privided as ``X_index`` and ``Y_index``. 
    When there is a sample that does not belong to any group, its index 
    must be set ``nan``, and centering is not applied to the sample.
    
    Parameters
    ----------
    X : array [n_samples_a, n_features]
        A feature array which is sorted accoding to the group index. 
        Note that if the index of an sample is 'np.nan', the sample must come at the last of the array.  
    X_index: integer array [n_samples]
        A sorted array of indices to which samples in X belongs to. When a sample does 
        not belong to any group, its index must be ``np.nan``. 
    Y: array [n_samples_b, 1 + n_features]
        A second feature array which is sorted accoding to the group index. 
        Note that if the index of an sample is 'np.nan', the sample must come at the last of the array.  
    Y_index: integer array [n_samples]
        A sorted array of indices to which samples in Y belongs to. When a sample does 
        not belong to any group, its index must be ``np.nan``. 
    metric : string
        The metric to use when calculating kernel between instances in a
        feature array. Valid values for metric are:
        ['additive_chi2', 'chi2', 'linear', 'poly', 'polynomial', 'rbf',
        'laplacian', 'sigmoid', 'cosine']
        the metric must be one of the metrics in sklearn.pairwise.PAIRWISE_KERNEL_FUNCTIONS. 
    filter_params : boolean
        Whether to filter invalid parameters or not.
    n_jobs : int or None, optional (default=None)
        The number of jobs to use for the computation. This works by breaking
        down the pairwise matrix into n_jobs even slices and computing them in
        parallel.
        ``None`` means 1 unless in a :obj:`joblib.parallel_backend` context.
        ``-1`` means using all processors. See :term:`Glossary <n_jobs>`
        for more details.
    **kwds : optional keyword parameters
        Any further parameters are passed directly to the kernel function.
        
    Returns
    -------
    K : array [n_samples_a, n_samples_b]
        A mean-centered kernel matrix K such that K_{i, j} is the kernel between the
        ith and jth vectors of the given matrix X, if Y is None.
        If Y is not None, then K_{i, j} is the kernel between the ith array
        from X and the jth array from Y.
        
    Examples
    --------
    >>> from src.kernel_pca
    """
    KXY = pairwise_kernels(X, Y, metric=metric, filter_params=filter_params, n_jobs=n_jobs, **kwds)
    DX = _get_design_matrix(X_index)
    DY = _get_design_matrix(Y_index)
    KXY_centered = DX @ KXY @ DY
    return KXY_centered
    
def _get_design_matrix(index):
    """Calculate the design matrix for group-mean-centering
    
    Example:
    Given ``index = [1, 1, 2, 2, np.nan]``, the design matrix becomes
                          [[1/2, 1/2,    0,   0, 0],
                           [1/2, 1/2,    0,   0, 0],
    np.eye(5) - np.array(  [  0,   0,  1/2, 1/2, 0],  )
                           [  0,   0,  1/2, 1/2, 0],
                           [  0,   0,    0,   0, 0]]
    """
    N = index.shape[0]
    mask_nan = np.isnan(index)
    N_nan = sum(mask_nan)
    _, group_counts = np.unique(index[~mask_nan], return_counts=True)
    assert np.all(group_counts > 1)  # There must be more than 2 elements in each group.
    blocks = [np.full([c, c], 1./c) for c in group_counts] + [0.0] * N_nan
    design_matrix = scipy.sparse.eye(N) - scipy.sparse.block_diag(blocks)
    return design_matrix

In [219]:
def _test_centered_kernel():
    
    D = 500
    N = 1000
    N_grouped = 900
    N_nan = N - N_grouped
    num_groups = 100
    
    # X contains groups of various size and non-centered samples
    X = np.random.randn(N * D).reshape(N, D)
    X_index = np.concatenate([
        np.repeat(np.arange(num_groups), 2),  
        np.random.randint(0, num_groups, N_grouped - 2*num_groups),
        np.full([N_nan], np.nan)
    ])
    X_index.sort()
    
    
    # Y only contains non-centered samples
    Y = np.random.randn(N * D).reshape(N, D)
    Y_index = np.array([np.nan] * N)
    
    Kxx = centered_kernel(X, X_index, X, X_index, metric="rbf")
    assert(np.allclose(Kxx, Kxx.T))
    eigvals = np.linalg.eigvalsh(Kxx)
    assert(np.all(-1e-8 < eigvals))  # Kernel matrix must be positive definite.
    assert(np.sum(1e-8 < eigvals) == N - num_groups)  # The effective rank of the matrix should be (N - num_groups).
    
    Kxy = centered_kernel(X, X_index, Y, Y_index, metric="rbf")
    Kyx = centered_kernel(Y, Y_index, X, X_index, metric="rbf")
    assert(np.allclose(Kxy, Kyx.T))  # Kernel matrix must be symmetric. 

In [220]:
_test_centered_kernel()

In [181]:
np.concatenate([
    np.arange(10).reshape(10, 1), 
    np.arange(20).reshape(10, 2)
], axis=1)

array([[ 0,  0,  1],
       [ 1,  2,  3],
       [ 2,  4,  5],
       [ 3,  6,  7],
       [ 4,  8,  9],
       [ 5, 10, 11],
       [ 6, 12, 13],
       [ 7, 14, 15],
       [ 8, 16, 17],
       [ 9, 18, 19]])

In [None]:
class CenteredKernelPCA:
    """Kernel PCA with mean-centering in feature space within each group.
    
    Remark
    ------
    The design of this class is based on the sklearn.decomposition.KernelPCA.
    """
    def __init__(self, n_components=None, *, kernel="linear",
                 gamma=None, degree=3, coef0=1, kernel_params=None,
                 alpha=1.0, eigen_solver='auto',
                 tol=0, max_iter=None, remove_zero_eig=False,
                 random_state=None, copy_X=True, n_jobs=None):
        self.n_components = n_components
        self.kernel = kernel
        self.kernel_params = kernel_params
        self.gamma = gamma
        self.degree = degree
        self.coef0 = coef0
        self.alpha = alpha
        self.eigen_solver = eigen_solver
        self.remove_zero_eig = remove_zero_eig
        self.tol = tol
        self.max_iter = max_iter
        self.random_state = random_state
        self.n_jobs = n_jobs
        self.copy_X = copy_X
        self.dual_coef_ = None
        assert callable(self.kernel)

    def _get_kernel(self, X, X_index, Y, Y_index):
        params = self.kernel_params or {}
        return pairwise_kernels(X, X_index, Y, Y_index, metric=self.kernel,
                                filter_params=True, n_jobs=self.n_jobs,
                                **params)

    def _fit_transform(self, K):
        """ Fit's using kernel K"""
        # center kernel
        K = self._centerer.fit_transform(K)

        if self.n_components is None:
            n_components = K.shape[0]
        else:
            n_components = min(K.shape[0], self.n_components)

        # compute eigenvectors
        if self.eigen_solver == 'auto':
            if K.shape[0] > 200 and n_components < 10:
                eigen_solver = 'arpack'
            else:
                eigen_solver = 'dense'
        else:
            eigen_solver = self.eigen_solver

        if eigen_solver == 'dense':
            self.lambdas_, self.alphas_ = linalg.eigh(
                K, eigvals=(K.shape[0] - n_components, K.shape[0] - 1))
        elif eigen_solver == 'arpack':
            random_state = check_random_state(self.random_state)
            # initialize with [-1,1] as in ARPACK
            v0 = random_state.uniform(-1, 1, K.shape[0])
            self.lambdas_, self.alphas_ = eigsh(K, n_components,
                                                which="LA",
                                                tol=self.tol,
                                                maxiter=self.max_iter,
                                                v0=v0)

        # make sure that the eigenvalues are ok and fix numerical issues
        self.lambdas_ = _check_psd_eigenvalues(self.lambdas_,
                                               enable_warnings=False)

        # flip eigenvectors' sign to enforce deterministic output
        self.alphas_, _ = svd_flip(self.alphas_,
                                   np.zeros_like(self.alphas_).T)

        # sort eigenvectors in descending order
        indices = self.lambdas_.argsort()[::-1]
        self.lambdas_ = self.lambdas_[indices]
        self.alphas_ = self.alphas_[:, indices]

        # remove eigenvectors with a zero eigenvalue (null space) if required
        if self.remove_zero_eig or self.n_components is None:
            self.alphas_ = self.alphas_[:, self.lambdas_ > 0]
            self.lambdas_ = self.lambdas_[self.lambdas_ > 0]

        # Maintenance note on Eigenvectors normalization
        # ----------------------------------------------
        # there is a link between
        # the eigenvectors of K=Phi(X)'Phi(X) and the ones of Phi(X)Phi(X)'
        # if v is an eigenvector of K
        #     then Phi(X)v  is an eigenvector of Phi(X)Phi(X)'
        # if u is an eigenvector of Phi(X)Phi(X)'
        #     then Phi(X)'u is an eigenvector of Phi(X)'Phi(X)
        #
        # At this stage our self.alphas_ (the v) have norm 1, we need to scale
        # them so that eigenvectors in kernel feature space (the u) have norm=1
        # instead
        #
        # We COULD scale them here:
        #       self.alphas_ = self.alphas_ / np.sqrt(self.lambdas_)
        #
        # But choose to perform that LATER when needed, in `fit()` and in
        # `transform()`.

        return K

    def fit_inverse_transform(self, X):
        if hasattr(X, "tocsr"):
            raise NotImplementedError("Inverse transform not implemented for "
                                      "sparse matrices!")

        n_samples = X_transformed.shape[0]
        K = self._get_kernel(X_transformed)
        K.flat[::n_samples + 1] += self.alpha
        self.dual_coef_ = linalg.solve(K, X, sym_pos=True, overwrite_a=True)
        self.X_transformed_fit_ = X_transformed

    def fit(self, X, y=None):
        """Fit the model from data in X.
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training vector, where n_samples in the number of samples
            and n_features is the number of features.
        Returns
        -------
        self : object
            Returns the instance itself.
        """
        X = self._validate_data(X, accept_sparse='csr', copy=self.copy_X)
        self._centerer = KernelCenterer()
        K = self._get_kernel(X)
        self._fit_transform(K)

        if self.fit_inverse_transform:
            # no need to use the kernel to transform X, use shortcut expression
            X_transformed = self.alphas_ * np.sqrt(self.lambdas_)

            self._fit_inverse_transform(X_transformed, X)

        self.X_fit_ = X
        return self

    def fit_transform(self, X, y=None, **params):
        """Fit the model from data in X and transform X.
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
            Training vector, where n_samples in the number of samples
            and n_features is the number of features.
        Returns
        -------
        X_new : array-like, shape (n_samples, n_components)
        """
        self.fit(X, **params)

        # no need to use the kernel to transform X, use shortcut expression
        X_transformed = self.alphas_ * np.sqrt(self.lambdas_)

        if self.fit_inverse_transform:
            self._fit_inverse_transform(X_transformed, X)

        return X_transformed

    def transform(self, X):
        """Transform X.
        Parameters
        ----------
        X : array-like, shape (n_samples, n_features)
        Returns
        -------
        X_new : array-like, shape (n_samples, n_components)
        """
        check_is_fitted(self)

        # Compute centered gram matrix between X and training data X_fit_
        K = self._centerer.transform(self._get_kernel(X, self.X_fit_))

        # scale eigenvectors (properly account for null-space for dot product)
        non_zeros = np.flatnonzero(self.lambdas_)
        scaled_alphas = np.zeros_like(self.alphas_)
        scaled_alphas[:, non_zeros] = (self.alphas_[:, non_zeros]
                                       / np.sqrt(self.lambdas_[non_zeros]))

        # Project with a scalar product between K and the scaled eigenvectors
        return np.dot(K, scaled_alphas)

    def inverse_transform(self, X):
        """Transform X back to original space.
        Parameters
        ----------
        X : array-like, shape (n_samples, n_components)
        Returns
        -------
        X_new : array-like, shape (n_samples, n_features)
        References
        ----------
        "Learning to Find Pre-Images", G BakIr et al, 2004.
        """
        if not self.dual_coef_:
            raise NotFittedError("The fit_inverse_transform was not called."
                                 " set to True when instantiating and hence "
                                 "the inverse transform is not available.")

        K = self._get_kernel(X, self.X_transformed_fit_)
        n_samples = self.X_transformed_fit_.shape[0]
        K.flat[::n_samples + 1] += self.alpha
        return np.dot(K, self.dual_coef_)

In [None]:
import numpy as np
from sklearn.decomposition import KernelPCA
from src.kernel_pca import centered_kernel
N = 100
D = 10
X = np.random.randn(N * D).reshape(N, D)
X_index = np.repeat(np.arange(N//2), 2)
X_combined = np.concatenate([X_index.reshape(N, 1), X], axis=1)


Y = np.random.randn(N * D).reshape(N, D)
Y_index = np.repeat(np.arange(N//2), 2)
Y_combined = np.concatenate([X_index.reshape(N, 1), X], axis=1)

In [None]:
kernel = lambda x, y: pairwise_kernels(x, y, metric="rbf")
D = 500
N = 1000
N_grouped = 900
N_nan = N - N_grouped
num_groups = 100

# X contains groups of various size and non-centered samples
X = np.random.randn(N * D).reshape(N, D)
X_index = np.concatenate([
    np.repeat(np.arange(num_groups), 2),  
    np.random.randint(0, num_groups, N_grouped - 2*num_groups),
    np.full([N_nan], np.nan)
])
X_index.sort()


# Y only contains non-centered samples
Y = np.random.randn(N * D).reshape(N, D)
Y_index = np.array([np.nan] * N)

Kxx = centered_kernel(X, X_index, X, X_index, kernel)
assert(np.allclose(Kxx, Kxx.T))  # Kernel matrix must be symmetric. 
eigvals = np.linalg.eigvalsh(Kxx)
assert(np.all(-1e-8 < eigvals))  # Kernel matrix must be positive definite.
assert(np.sum(1e-8 < eigvals) == N - num_groups)  # The effective rank of the matrix should be (N - num_groups).

Kxy = centered_kernel(X, X_index, Y, Y_index, kernel)
Kyx = centered_kernel(Y, Y_index, X, X_index, kernel)
assert(np.allclose(Kxy, Kyx.T))

In [194]:
x = torch.tensor([1.0], requires_grad=True)

In [195]:
y = x**2

In [196]:
y.backward()

In [198]:
x.grad

tensor([2.])

In [202]:
A = np.eye(10)
A.flat[::10+1]

array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])