In [5]:
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import scipy as sc

In [3]:
def scaling(X, Y):
    """ Center X, Y and scale

    Returns
    -------
        X, Y, x_mean, y_mean, x_std, y_std
    """
    # center
    x_mean = X.mean(axis=0)
    X -= x_mean
    y_mean = Y.mean(axis=0)
    Y -= y_mean
    # scale
    x_std = X.std(axis=0, ddof=1)
    x_std[x_std == 0.0] = 1.0
    X /= x_std
    y_std = Y.std(axis=0, ddof=1)
    y_std[y_std == 0.0] = 1.0
    Y /= y_std
    return X, Y, x_mean, y_mean, x_std, y_std

In [6]:
def nipals(X, Y, max_iter=500, tol=1e-06):
    """Inner loop of the iterative NIPALS algorithm.
    Provides an alternative to the svd(X'Y); returns the first left and right
    singular vectors of X'Y.  See PLS for the meaning of the parameters.  It is
    similar to the Power method for determining the eigenvectors and
    eigenvalues of a X'Y.
    
    y_score - U
    y_weights - C
    x_score - T
    x_weights - W
    
    Return: vectors w, c
    """
    # 1: initialization of u
    u = Y[:, [0]]
# TEST IT!!!!
    t_old = 0
    ite = 1
    eps = np.finfo(X.dtype).eps
    # Inner loop of the Wold algo.
    while True:
        # w = X^T u / (u^T u)
        w = np.dot(X.T, u) / np.dot(u.T, u)
        
        # If u only has zeros w will only have zeros. In
        # this case add an epsilon to converge to a more acceptable
        # solution
        if np.dot(w.T, w) < eps:
            w += eps
        # w = w / |w|
        w /= (np.sqrt(np.dot(w.T, w)) + eps)
        
        # t = Xw
        t = np.dot(X, w)
        
        # c = Y^T t / (t^T t)
        c = np.dot(Y.T, t) / np.dot(t.T, t)
        
        # c = c / |c|
        c /= np.sqrt(np.dot(c.T, c)) + eps

        # u = Yc
# ????? (TEST IT!!!!)
        u = np.dot(Y, c) / (np.dot(c.T, c) + eps)
        
        # 10: convergence of t
        t_diff = t - t_old
        if np.dot(t_diff.T, t_diff) < tol or Y.shape[1] == 1:
            break 
        if ite == max_iter:
            warnings.warn('Maximum number of iterations reached')
            break
        t_old = t
        ite += 1
    return w, c, ite

In [None]:
from sklearn.utils.extmath import svd_flip

from sklearn.base import BaseEstimator, RegressorMixin, TransformerMixin
from sklearn.utils import check_array, check_consistent_length
from sklearn.externals import six

import warnings
from abc import ABCMeta, abstractmethod
from scipy import linalg
from sklearn.utils.validation import check_is_fitted, FLOAT_DTYPES

pinv2_args = {'check_finite': False}


class _PLS(six.with_metaclass(ABCMeta), BaseEstimator, TransformerMixin,
           RegressorMixin):
    """
    Partial Least Squares (PLS)
    
    """

    @abstractmethod
    def __init__(self, n_components=2, algorithm="nipals", max_iter=500, tol=1e-06):
        self.n_components = n_components
        self.algorithm = algorithm
        self.max_iter = max_iter
        self.tol = tol
        

    def fit(self, X, Y):
        """
        Fit model to data.
        
        """

        # copy since this will contains the residuals (deflated) matrices
        check_consistent_length(X, Y)
        X = check_array(X, dtype=np.float64, copy=True)
        Y = check_array(Y, dtype=np.float64, copy=True, ensure_2d=False)
        if Y.ndim == 1:
            Y = Y.reshape(-1, 1)

        n, p = X.shape
        q = Y.shape[1]

        if self.n_components < 1 or self.n_components > p:
            raise ValueError('Invalid number of components: %d' %
                             self.n_components)
        if self.algorithm not in ("nipals", "nipals_nonlinear"):
            raise ValueError("""Got algorithm %s when only 
                            'nipals' are known""" % self.algorithm)
        # Scale (in place)
        X, Y, self.x_mean_, self.y_mean_, self.x_std_, self.y_std_ = (
            scaling(X, Y))
        # Residuals (deflated) matrices
        Xk = X
        Yk = Y
        # Results matrices
        self.T = np.zeros((n, self.n_components))
        self.U = np.zeros((n, self.n_components))
        self.W = np.zeros((p, self.n_components))
        self.C = np.zeros((q, self.n_components))
        self.P = np.zeros((p, self.n_components))
        self.Q = np.zeros((q, self.n_components))
        self.n_iter_ = []

        
        # NIPALS algo: outer loop, over components
        for k in range(self.n_components):
            if np.all(np.dot(Yk.T, Yk) < np.finfo(np.double).eps):
                # Yk constant
                warnings.warn('Y residual constant at iteration %s' % k)
                break
            # 1) weights estimation (inner loop)
            # -----------------------------------
            w, c, n_iter_ = nipals(X=Xk, Y=Yk, max_iter=self.max_iter,\
                        tol=self.tol)
            self.n_iter_.append(n_iter_)
            
            # Forces sign stability of x_weights and y_weights
            # Sign undeterminacy issue from svd if algorithm == "svd"
            # and from platform dependent computation if algorithm == 'nipals'
            w, c = svd_flip(w, c.T)
            c = c.T
            # compute scores
            t = np.dot(Xk, w)
            y_ss = np.dot(c.T, c) + np.finfo(np.double).eps
            u = np.dot(Yk, c) / y_ss
            
            # test for null variance
            if np.dot(t.T, t) < np.finfo(np.double).eps:
                warnings.warn('X scores are null at iteration %s' % k)
                break
            # 2) Deflation (in place)
            # - regress Xk's on x_score
            p = np.dot(Xk.T, t) / (np.dot(t.T, t) + np.finfo(np.double).eps)
            # - subtract rank-one approximations to obtain remainder matrix
            Xk -= np.dot(t, p.T)
# TO THINK HOW TO CHANGE TILDE Y
            # - regress Yk's on x_score, then subtract rank-one approx.
            q = (np.dot(Yk.T, t) / np.dot(t.T, t))
            Yk -= np.dot(t, q.T)
            # 3) Store weights, scores and loadings # Notation:
            self.T[:, k] = t.ravel()  # T
            self.U[:, k] = u.ravel()  # U
            self.W[:, k] = w.ravel()  # W
            self.C[:, k] = c.ravel()  # C
            self.P[:, k] = p.ravel()  # P
            self.Q[:, k] = q.ravel()  # Q
        # Such that: X = TP' + Err and Y = UQ' + Err

        # 4) rotations from input space to transformed space (scores)
        # T = X W(P'W)^-1 = XW* (W* : p x k matrix)
        # U = Y C(Q'C)^-1 = YC* (W* : q x k matrix)
        self.x_rotations_ = np.dot(
            self.W, linalg.pinv2(np.dot(self.P.T, self.W), **pinv2_args))
        if Y.shape[1] > 1:
            self.y_rotations_ = np.dot(
                self.C, linalg.pinv2(np.dot(self.Q.T, self.C), **pinv2_args))
        else:
            self.Q = np.ones(1)

        # Estimate regression coefficient
        # Regress Y on T
        # Y = TQ' + Err,
        # Then express in function of X
        # Y = X W(P'W)^-1Q' + Err = XB + Err
        # => B = W*Q' (p x q)
        self.coef_ = np.dot(self.P, self.Q.T)
        self.coef_ = (1. / self.x_std_.reshape((p, 1)) * self.coef_ *
                      self.y_std_)
        return self

    def transform(self, X, Y=None):
        """Apply the dimension reduction learned on the train data.

        x_scores if Y is not given, (x_scores, y_scores) otherwise.
        """
        check_is_fitted(self, 'x_mean_')
        X = check_array(X, copy=True, dtype=FLOAT_DTYPES)
        # Normalize
        X -= self.x_mean_
        X /= self.x_std_
        # Apply rotation
        T = np.dot(X, self.P)
        if Y is not None:
            Y = check_array(Y, ensure_2d=False, copy=True, dtype=FLOAT_DTYPES)
            if Y.ndim == 1:
                Y = Y.reshape(-1, 1)
# G?
            Y -= self.y_mean_
            Y /= self.y_std_
# MB TILDE Y
            y_scores = np.dot(Y, self.Q)
            return T, U

        return T

    def predict(self, X):
        """Apply the dimension reduction learned on the train data.
        
        This call requires the estimation of a p x q matrix, which may
        be an issue in high dimensional space.
        """
        check_is_fitted(self, 'x_mean_')
        X = check_array(X, copy=True, dtype=FLOAT_DTYPES)
        # Normalize
        X -= self.x_mean_
        X /= self.x_std_
        Ypred = np.dot(X, self.coef_)
        return Ypred + self.y_mean_

    def fit_transform(self, X, y=None, **fit_params):
        """
        Learn and apply the dimension reduction on the train data.

        x_scores if Y is not given, (x_scores, y_scores) otherwise.
        """
        return self.fit(X, y, **fit_params).transform(X, y)


class PLSRegression(_PLS):
    """
    PLS regression
    """

    def __init__(self, n_components=2,max_iter=500, tol=1e-06):
        super(PLSRegression, self).__init__(
            n_components=n_components, max_iter=max_iter, tol=tol)