# Partial Least Squares

### Introduction

Partial Least Squares (PLS) differs from Principal Components Regression (PCR) mainly in how the orthogonal directions are chosen. PCR chooses them in an unsupervised manner, guided only by the variance of the features, whereas PLS **also** takes into account the output labels $\textbf{y}$.
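
As a rough illustration of this difference, the sketch below builds a small toy dataset (the data, seed, and variable names are assumptions made for this example, not part of the notebook's experiment). Two features are strongly correlated with each other, while a third, independent feature drives the response. The first PCR direction is taken as the top eigenvector of $X^TX$, and the first PLS direction as the normalized vector $X^T\textbf{y}$.

```python
import numpy as np

rng = np.random.default_rng(0)  # arbitrary seed, chosen for illustration
N = 500

# Two strongly correlated features (they dominate the variance structure)
# and one independent feature that actually drives the response.
base = rng.normal(size=N)
x1 = base + 0.2 * rng.normal(size=N)
x2 = base + 0.2 * rng.normal(size=N)
x3 = rng.normal(size=N)
y = x3 + 0.1 * rng.normal(size=N)

# Standardize features and response.
X = np.column_stack([x1, x2, x3])
X = (X - X.mean(axis=0)) / X.std(axis=0)
y = (y - y.mean()) / y.std()

# First PCR direction: top eigenvector of X^T X (does not look at y).
# eigh returns eigenvalues in ascending order, so the last column is the top one.
eigvals, eigvecs = np.linalg.eigh(X.T @ X)
pcr_direction = eigvecs[:, -1]

# First PLS direction: weights proportional to X^T y (uses y).
pls_direction = X.T @ y
pls_direction = pls_direction / np.linalg.norm(pls_direction)

print("first PCR direction:", np.round(pcr_direction, 3))
print("first PLS direction:", np.round(pls_direction, 3))
```

In this toy setting the PCR direction should load mostly on the correlated pair, while the PLS direction should load mostly on the third feature, which is the one that matters for $\textbf{y}$.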

In this notebook, we implement Partial Least Squares from scratch and apply it to a synthetically generated dataset.

In [1]:
import numpy as np
import numpy.typing as npt
import sklearn.datasets
import sklearn.linear_model

# bad practice in general, but useful to declutter output
import warnings
warnings.filterwarnings("ignore")

### Algorithm

The PLS algorithm goes as follows:

**Step 1**: Standardize each feature ($\textbf{x}_j$) to have mean $0$ and variance $1$. Also standardize the output labels $\textbf{y}$ the same way.

**Step 2**: Initialize the algorithm with $\hat{\textbf{y}}^{(0)} = \bar{y}\,\textbf{1}$ (the mean of $\textbf{y}$, which is $0$ after standardization) and $\textbf{x}_j^{(0)} = \textbf{x}_j$ for all $p$ features ($j = 1, ..., p$).

**Step 3**: Loop over the following steps $p$ times ($m = 1, ..., p$):

**Step 3a**: Define $\phi_{m, j} = \textbf{x}_{j}^{(m-1)}\cdot \textbf{y}$ for all $j$. This is the inner product of $\textbf{y}$ with the (orthogonalized) feature $\textbf{x}^{(m-1)}_{j}$, which measures how strongly that feature is aligned with the labels. See step 3e for how the features are orthogonalized.

**Step 3b**: Define $\textbf{z}_{m} = \sum_{j=1}^{p} \phi_{m, j}\textbf{x}_{j}^{(m-1)}$. This is the $m^{\text{th}}$ "derived direction".

**Step 3c**: Now that we have the derived direction, perform univariate regression to find out its corresponding coefficient: $$ \hat{\theta}_{m} = \frac{\textbf{z}_{m}\cdot\textbf{y}}{|| \textbf{z}_{m} ||^2}$$
We can do this sort of direct projection because the derived directions are orthogonal to each other.

**Step 3d**: Next, update the prediction: $\hat{\textbf{y}}^{(m)} = \hat{\textbf{y}}^{(m-1)} + \hat{\theta}_{m}\textbf{z}_{m}$

**Step 3e**: Finally, before moving on to the next step, remove the contribution of the $m^{\text{th}}$ derived direction by orthogonalizing the input features. This naturally ensures that all of the derived directions are orthogonal to each other: $$ \textbf{x}_{j}^{(m)} = \textbf{x}_{j}^{(m-1)} - \left(\frac{\textbf{z}_{m} \cdot \textbf{x}_{j}^{(m-1)} }{|| \textbf{z}_{m} ||^2}\right)\textbf{z}_{m}$$
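
The steps above can be sketched compactly with matrix operations. This is a minimal sketch, not the implementation used later in this notebook: the function name `pls_sketch`, the toy data, and the seed are assumptions made for illustration, and $\textbf{y}$ is treated as a 1-D array. The final lines check numerically that the derived directions come out mutually orthogonal.

```python
import numpy as np

def pls_sketch(X, y, n_components):
    """Return (Z, theta, y_hat) computed on standardized copies of X (N x p) and y (N,)."""
    X = (X - X.mean(axis=0)) / X.std(axis=0)    # Step 1: standardize features
    y = (y - y.mean()) / y.std()                # Step 1: standardize labels
    Xm = X.copy()                               # Step 2: x_j^(0) = x_j
    y_hat = np.zeros_like(y)                    # Step 2: mean of standardized y is 0
    Z, theta = [], []
    for _ in range(n_components):               # Step 3
        phi = Xm.T @ y                          # 3a: phi_{m,j} = x_j^(m-1) . y
        z = Xm @ phi                            # 3b: z_m = sum_j phi_{m,j} x_j^(m-1)
        zz = z @ z
        t = (z @ y) / zz                        # 3c: univariate regression coefficient
        y_hat = y_hat + t * z                   # 3d: update the prediction
        Xm = Xm - np.outer(z, z @ Xm) / zz      # 3e: orthogonalize every feature w.r.t. z_m
        Z.append(z)
        theta.append(t)
    return np.column_stack(Z), np.array(theta), y_hat

# Hypothetical toy data for the check below.
rng = np.random.default_rng(1)
X = rng.normal(size=(60, 5))
y = X @ np.array([1.0, -2.0, 0.5, 0.0, 3.0]) + 0.1 * rng.normal(size=60)

Z, theta, y_hat = pls_sketch(X, y, n_components=3)

# Gram matrix of the unit-normalized directions; it should be close to the identity.
Z_unit = Z / np.linalg.norm(Z, axis=0)
gram = Z_unit.T @ Z_unit
print(np.round(gram, 6))
```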


Finally, note that each derived direction $\textbf{z}_{m}$ is a linear combination of the original features $\textbf{x}_{j}$. Once the whole procedure is done, we can use this linear relationship to express the coefficients of the model in terms of the original features $\textbf{x}_{j}$. We illustrate this in the "Implementation" section.
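
One way to make this linear relationship explicit, sketched here under assumptions (toy data and the names `R`, `A`, and `row` are introduced only for this example), is to track a matrix $R$ such that the orthogonalized features always satisfy $X^{(m)} = X R$. Each derived direction is then $\textbf{z}_m = X\textbf{a}_m$ with $\textbf{a}_m = R\,\phi_m$, and the coefficients on the original (standardized) features are $\beta = \sum_m \hat{\theta}_m \textbf{a}_m$. This is an alternative to recovering the mapping with a pseudoinverse after the fact.

```python
import numpy as np

rng = np.random.default_rng(2)  # arbitrary seed for the toy data
N, p, M = 40, 4, 3
X = rng.normal(size=(N, p))
y = X @ np.array([2.0, -1.0, 0.0, 0.5]) + 0.1 * rng.normal(size=N)
X = (X - X.mean(axis=0)) / X.std(axis=0)
y = (y - y.mean()) / y.std()

Xm = X.copy()
R = np.eye(p)            # invariant maintained below: Xm == X @ R
A = np.zeros((p, M))     # column m maps the original features to z_m, so Z = X @ A
theta = np.zeros(M)
y_hat = np.zeros(N)

for m in range(M):
    phi = Xm.T @ y
    z = Xm @ phi
    zz = z @ z
    a = R @ phi                      # z == X @ a, because Xm == X @ R
    theta[m] = (z @ y) / zz
    y_hat += theta[m] * z
    row = (z @ Xm) / zz              # projection coefficient of each column of Xm on z
    Xm = Xm - np.outer(z, row)       # orthogonalize the features
    R = R - np.outer(a, row)         # apply the same update in coefficient space
    A[:, m] = a

beta = A @ theta                     # coefficients on the standardized features
print("max |X @ beta - y_hat| =", np.max(np.abs(X @ beta - y_hat)))
```

The printed difference should be at the level of floating-point round-off, since `X @ beta` and `y_hat` are two expressions for the same fitted values.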

### Implementation

In [57]:
class PLS():

    def __init__(self):


        # X @ beta.T = yhat
        self._beta = None 

        # parameters for standardization, learnt from the training set during the `fit` call.
        # used during the `predict` call as well.

        self._mu_X  = None
        self._mu_y  = None
        self._std_X  = None
        self._std_y  = None

        # number of derived directions skipped because their norm was negligible
        self._num_ignored_features = None

        return None
    
    def _compute_statistics(self, X, y):
        
        self._mu_X  = np.mean(X, axis=0)
        self._mu_y  = np.mean(y)
        self._std_X  = np.std(X, axis=0)
        self._std_y  = np.std(y, axis=0)

        return None

    
    def _standardize(self, X, y):
        
        X_standardized = (X - self._mu_X)/self._std_X
        y_standardized = (y - self._mu_y)/self._std_y
        
        return X_standardized, y_standardized
    
    def fit(self, X, y):

        self._compute_statistics(X, y)
        X0, y0 = self._standardize(X, y)

        N, p = X0.shape
        
        # formally the mean of y, but note that after standardization that mean is 0
        yhat = np.zeros((N, 1))
        
        Z = np.zeros((N, p))
        theta = np.zeros((1, p))
        
        # "forward" pass - computing the derived directions and associated parameters, theta.

        num_ignored_features : int = 0 # keep track of ignored features.

        for m in range(p):
            
            z_m = np.zeros((N, 1))

            for j in range(p):
                x_j = X0[:, j].reshape((N, 1))
                phi_m_j = np.dot(x_j.T, y0)
                z_m = z_m + phi_m_j*x_j

            if (np.linalg.norm(z_m) >= 1e-10): # drop any components that are "too small"

                z_m = z_m/np.linalg.norm(z_m) # unit norm (good practice)
                theta_m = np.dot(z_m.T, y0)/np.dot(z_m.T, z_m)

                theta[0, m] = theta_m

                yhat = yhat + (theta_m * z_m)

                for j in range(p):
                    x_j = X0[:, j].reshape((N, 1))
                    x_j = x_j - ((np.dot(z_m.T, x_j)/np.dot(z_m.T, z_m))*z_m)
                    X0[:, j] = x_j.reshape(-1)
                
                Z[:, m] = z_m.reshape(-1)
            else:
                num_ignored_features += 1
                
        self._num_ignored_features = num_ignored_features

        # "backward" pass - recovering parameters of X (beta) from Z (and theta)
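        # X_std @ A = Z, yhat = Z @ theta.T => yhat = X_std @ (A @ theta.T) = X_std @ beta.T
        # X0 was orthogonalized in place above, so re-standardize the original X here:
        # Z was built from the standardized features, and `predict` applies beta to standardized inputs.
        # use pinv() (pseudoinverse) to deal with the case when X is not invertible or has high condition number.

        X_std, _ = self._standardize(X, y)
        A = np.matmul(np.linalg.pinv(X_std), Z)
        beta = np.matmul(A, theta.T).T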
        
        self._beta = beta

        return None
    
    def predict(self, X):
        
        X0 = (X - self._mu_X)/self._std_X # standardize the features.
        yhat0 = np.matmul(X0,  self._beta.T) # "standardized" prediction.
        yhat = (yhat0 * self._std_y) + self._mu_y # rescaling to scale of training data.

        return yhat
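
The two inner loops over `j` in `fit` can each be written as a single matrix expression. The sketch below, on hypothetical random data, checks that the loop forms and the vectorized forms agree; it is meant as a possible simplification rather than a change to the class above.

```python
import numpy as np

rng = np.random.default_rng(3)  # arbitrary seed for the toy data
N, p = 30, 6
X0 = rng.normal(size=(N, p))
y0 = rng.normal(size=(N, 1))

# Loop form, as in `fit`: z_m = sum_j (x_j . y) x_j
z_loop = np.zeros((N, 1))
for j in range(p):
    x_j = X0[:, j].reshape((N, 1))
    z_loop = z_loop + np.dot(x_j.T, y0) * x_j

# Vectorized form of the same sum.
z_vec = X0 @ (X0.T @ y0)

# Loop form of the orthogonalization step.
X_loop = X0.copy()
for j in range(p):
    x_j = X_loop[:, j].reshape((N, 1))
    x_j = x_j - (np.dot(z_vec.T, x_j) / np.dot(z_vec.T, z_vec)) * z_vec
    X_loop[:, j] = x_j.reshape(-1)

# Vectorized form: subtract from every column its projection on z.
X_vec = X0 - z_vec @ (z_vec.T @ X0) / (z_vec.T @ z_vec)

print(np.allclose(z_loop, z_vec), np.allclose(X_loop, X_vec))
```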

In [72]:
# Utility functions - mainly used for generating helpful information and performance summaries
# Also used for working with Ordinary least squares solution by Normal equations method.

def generate_data(n_samples : int, n_features : int, collinear : bool = False, n_collinear : int = 0, corr_strength : float = 0.9, noise : float = 1.0):
    '''
    Wrapper to generate data for regression. Same as `LinearRegression.ipynb`
    '''
    X, y, coef = sklearn.datasets.make_regression(n_samples = n_samples, n_features=n_features,
                                 n_informative=n_features - (collinear*n_collinear), n_targets=1, 
                                 bias=2.0, effective_rank=n_features - (collinear*n_collinear),
                                 noise=noise, shuffle=True, random_state=42, coef=True)
    y : npt.NDArray[np.float64] = y.reshape(-1, 1)
    
    if (collinear==True):
        for i in range(n_features - n_collinear, n_features):
            base_feature = np.random.randint(0, n_features - n_collinear)
            X[:, i] = corr_strength * X[:, base_feature] + (1 - corr_strength) * np.random.randn(n_samples) * noise
            
    return X, y, coef

def MaxVif(X : npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
    ''' Returns the Maximum VIF amongst all the features'''
    N, K = X.shape
    vif_i : list[float] = []
    for i in range(K):
        x_i : npt.NDArray[np.float64] = X[:, i].reshape(-1)
        x_rest : npt.NDArray[np.float64] = np.delete(X, i, axis=1)
        x_i_pred : npt.NDArray[np.float64] = sklearn.linear_model.LinearRegression().fit(x_rest, x_i).predict(x_rest)
        R_i_sq : float = 1 - np.sum(np.power(x_i - x_i_pred, 2.0))/(np.sum(np.power(x_i - np.mean(x_i), 2)))
        vif_i.append(1.0/(1.0 - R_i_sq))
    vif_i = np.array(vif_i)

    return np.array([np.max(vif_i), np.argmax(vif_i)])

def PerformanceSummary(y : npt.NDArray[np.float64], y_pred : npt.NDArray[np.float64]) -> dict[str, float]:
    y_bar : float = np.mean(y)
    mse_f : float = np.sum(np.power(y - y_pred, 2.0))/len(y)
    mae_f : float = np.sum(np.absolute(y - y_pred))/len(y)
    rsq : float = 1 - (np.sum(np.power((y - y_pred), 2.0)))/(np.sum(np.power((y - y_bar), 2.0)))
    perf : dict[str, float] = {"MSE":mse_f, "MAE": mae_f, "R^2": rsq}

    return perf

def generate_datasets(n_samples : int , n_train : int, n_features : int = 10, collinear : bool = True, n_collinear : int = 2, corr_strength : float = 0.6, noise : float = 2.0) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]]:

    def standardize(X : npt.NDArray[np.float64], y : npt.NDArray[np.float64], train_set : bool = False, params : tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]] = None) -> tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]]]:
        '''Standardize Dataset - a function brought over from ordinary linear regression. Defined within `generate_datasets` to avoid confusion and conflict with PLS class.'''
        if (train_set == True) and (params is None):
            mu_X : npt.NDArray[np.float64] = np.mean(X, axis=0)
            mu_y : npt.NDArray[np.float64] = np.mean(y)
            std_X : npt.NDArray[np.float64] = np.std(X, axis=0)
            std_y : npt.NDArray[np.float64] = np.std(y, axis=0)

            X : npt.NDArray[np.float64] = (X - mu_X)/std_X
            y : npt.NDArray[np.float64] = (y - mu_y)/std_y
            params : tuple[npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64], npt.NDArray[np.float64]] = (mu_X, mu_y, std_X, std_y)
            
        elif (train_set == False) and (params is not None):
            mu_X, mu_y, std_X, std_y = params
            X : npt.NDArray[np.float64] = (X - mu_X)/std_X
            y : npt.NDArray[np.float64] = (y - mu_y)/std_y

        else:
            raise ValueError("Invalid set of inputs! Please ensure `params` is not None for train_set == False")
        
        return X, y, params

    X, y, _ = generate_data(n_samples, n_features = n_features, collinear=collinear, n_collinear=n_collinear, corr_strength=corr_strength, noise = noise)

    X_train : npt.NDArray[np.float64] = X[:n_train]
    y_train : npt.NDArray[np.float64] = y[:n_train]
    X_train, y_train, params = standardize(X_train, y_train, train_set = True, params=None)

    X_test : npt.NDArray[np.float64] = X[n_train:]
    y_test : npt.NDArray[np.float64] = y[n_train:]
    X_test, y_test, params = standardize(X_test, y_test, train_set = False, params=params)

    max_vif, max_vif_idx = MaxVif(X_train)

    if (max_vif < 5):
        print(f"Max VIF: {max_vif.round(2)} at column: {int(max_vif_idx)}")
        print("Maximum VIF in training set < 5, no need to deal with multicollinearity")
    else:
        print("WARNING!")
        print(f"Max VIF: {max_vif.round(2)} at column: {int(max_vif_idx)}")
    return X_train, y_train, X_test, y_test

def NormalEquationSolution(X_train: npt.NDArray[np.float64], y_train : npt.NDArray[np.float64], X_test : npt.NDArray[np.float64], y_test : npt.NDArray[np.float64]):

    def f(x: npt.NDArray[np.float64], w: npt.NDArray[np.float64], b: float) -> float:
        ''' Linear Regression equation - local function '''
        f_wb: float = np.dot(w, x) + b
        return f_wb

    def normal_solution(X: npt.NDArray[np.float64], y: npt.NDArray[np.float64]) -> npt.NDArray[np.float64]:
        '''Find solution of regression by normal equations - local function'''
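        # NOTE: X.T @ X is singular (or very ill-conditioned) when features outnumber samples
        # or are collinear, so the explicit inverse is numerically unreliable in those cases.
        beta : npt.NDArray[np.float64] = np.matmul(np.matmul(np.linalg.inv(np.matmul(X.T, X)), X.T), y)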
        return beta
    
    w_fit : npt.NDArray[np.float64] = normal_solution(X_train, y_train).reshape(-1)
    b_fit : float = 0.0
    # bias is zero after standardization, no need to fit it.
    
    # performance metrics
    y_pred : npt.NDArray[np.float64] = np.zeros(y_test.shape)
    for i in range(len(y_pred)):
        y_pred[i] = f(X_test[i], w_fit, b_fit)

    perf : dict[str, float] = PerformanceSummary(y_test, y_pred)
    
    print("--------------------------------------------------------")
    print("Solution based on Normal Equations (Ordinary Least Squares)")
    print("--------------------------------------------------------")
    print(f"MSE after training (test set): {perf['MSE'].round(3)}")
    print(f"MAE after training (test set): {perf['MAE'].round(3)}")
    print(f"R^2 after training (test set): {perf['R^2'].round(3)}")

    return None

def PartialLeastSquaresSolution(X_train: npt.NDArray[np.float64], y_train : npt.NDArray[np.float64], X_test : npt.NDArray[np.float64], y_test : npt.NDArray[np.float64]):

    PLSRegressor = PLS()
    PLSRegressor.fit(X_train, y_train)

    y_pred = PLSRegressor.predict(X_test)
    perf : dict[str, float] = PerformanceSummary(y_test, y_pred)

    print("--------------------------------------------------------")
    print("Solution based on Partial Least Squares")
    print("--------------------------------------------------------")
    print(f"PLS dropped {PLSRegressor._num_ignored_features} features of {X_train.shape[-1]} features.")
    print(f"MSE after training (test set): {perf['MSE'].round(3)}")
    print(f"MAE after training (test set): {perf['MAE'].round(3)}")
    print(f"R^2 after training (test set): {perf['R^2'].round(3)}")

    return None
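
As a cross-check on the regression-based VIF computed in `MaxVif`, the VIF of feature $i$ also equals the $i^{\text{th}}$ diagonal entry of the inverse of the feature correlation matrix, provided that matrix is invertible. The sketch below compares the two on hypothetical, mildly collinear data; the function names are assumptions for this example. This shortcut does not apply when the correlation matrix is singular, for instance when there are more features than samples.

```python
import numpy as np

def vif_by_regression(X):
    """VIF of each column: regress it on the other columns (with intercept)."""
    N, K = X.shape
    out = np.empty(K)
    for i in range(K):
        target = X[:, i]
        others = np.column_stack([np.ones(N), np.delete(X, i, axis=1)])
        coef, *_ = np.linalg.lstsq(others, target, rcond=None)
        resid = target - others @ coef
        r_sq = 1.0 - (resid @ resid) / np.sum((target - target.mean()) ** 2)
        out[i] = 1.0 / (1.0 - r_sq)
    return out

def vif_by_correlation(X):
    """VIF of each column from the diagonal of the inverse correlation matrix."""
    corr = np.corrcoef(X, rowvar=False)
    return np.diag(np.linalg.inv(corr))

rng = np.random.default_rng(4)  # arbitrary seed for the toy data
X = rng.normal(size=(200, 4))
X[:, 3] = 0.9 * X[:, 0] + 0.3 * rng.normal(size=200)  # make column 3 collinear with column 0

vif_reg = vif_by_regression(X)
vif_corr = vif_by_correlation(X)
print(np.round(vif_reg, 3))
print(np.round(vif_corr, 3))
```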

    

In [73]:
n_samples = 100
n_train = int(0.8 * n_samples)
n_features = 200
n_collinear = 5


X_train, y_train, X_test, y_test = generate_datasets(n_samples, n_train, n_features = n_features,
                                                      collinear = True, n_collinear = n_collinear, corr_strength = 0.4, 
                                                      noise = 0.1)

NormalEquationSolution(X_train, y_train, X_test, y_test)
PartialLeastSquaresSolution(X_train, y_train, X_test, y_test)

Max VIF: inf at column: 0
--------------------------------------------------------
Solution based on Normal Equations (Ordinary Least Squares)
--------------------------------------------------------
MSE after training (test set): 91409.647
MAE after training (test set): 266.938
R^2 after training (test set): -76426.371
--------------------------------------------------------
Solution based on Partial Least Squares
--------------------------------------------------------
PLS dropped 180 features of 200 features.
MSE after training (test set): 1.114
MAE after training (test set): 0.861
R^2 after training (test set): 0.069
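
A likely reason for the poor normal-equations result is rank deficiency: with 80 training samples and 200 features, $X^TX$ is a $200 \times 200$ matrix whose rank cannot exceed the number of samples, so its inverse is not well defined. The sketch below illustrates this on hypothetical random data of the same shape (it does not reproduce the numbers above). It also shows that the minimum-norm least-squares solution from `np.linalg.lstsq` interpolates the training data exactly, which is a sign of overfitting rather than of a good model.

```python
import numpy as np

rng = np.random.default_rng(5)  # arbitrary seed for the toy data
n, p = 80, 200                  # same shape as the training set in this notebook
X = rng.normal(size=(n, p))
y = rng.normal(size=n)
X = X - X.mean(axis=0)          # centering removes one more degree of freedom
y = y - y.mean()

rank = np.linalg.matrix_rank(X)
gram_cond = np.linalg.cond(X.T @ X)

# Minimum-norm least-squares solution (stable alternative to inverting X^T X).
beta_min_norm, *_ = np.linalg.lstsq(X, y, rcond=None)
train_resid = np.linalg.norm(X @ beta_min_norm - y)

print("rank of X:", rank, "out of", p, "features")
print("condition number of X^T X:", gram_cond)
print("training residual of the minimum-norm solution:", train_resid)
```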


### Conclusion and Closing Remarks

- In this notebook, we worked with Partial Least Squares (PLS) and saw how it uses information about the labels to inform the choice of "derived directions". This can sometimes lead to better results than PCR, since explained variance alone is not always a strong indicator of how useful a direction is for predicting the labels.
- The illustration showed PLS working in one of the primary settings where it shines: wide data (more features than samples), where the normal-equations solution broke down. PLS is also useful as a dimensionality reduction technique: one can reduce the dimensions by keeping only the first few $\textbf{z}_m$. It is also very useful with highly collinear data.
- It should be noted that Partial Least Squares, much like PCR, can become computationally expensive on large datasets, even after all the loops in this version are vectorized.
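
To illustrate the dimensionality-reduction remark above, the sketch below keeps only the first $M$ derived directions and reports the training residual sum of squares for each $M$. The function name `pls_fitted_values`, the toy data, and the seed are assumptions for this example. On well-conditioned data with more samples than features, the training error should not increase with $M$, and using all $p$ directions should match the ordinary least squares fit.

```python
import numpy as np

def pls_fitted_values(X, y, n_components):
    """Fitted values (standardized scale) using only the first n_components directions."""
    Xm = X.copy()
    y_hat = np.zeros_like(y)
    for _ in range(n_components):
        z = Xm @ (Xm.T @ y)                  # derived direction
        zz = z @ z
        if zz < 1e-20:                       # nothing left to extract
            break
        y_hat = y_hat + ((z @ y) / zz) * z   # add this direction's contribution
        Xm = Xm - np.outer(z, z @ Xm) / zz   # orthogonalize the features
    return y_hat

rng = np.random.default_rng(7)  # arbitrary seed for the toy data
N, p = 50, 4
X = rng.normal(size=(N, p))
y = X @ np.array([1.0, -2.0, 0.5, 3.0]) + 0.5 * rng.normal(size=N)
X = (X - X.mean(axis=0)) / X.std(axis=0)
y = (y - y.mean()) / y.std()

# Training residual sum of squares for M = 1, ..., p components.
rss = [float(np.sum((y - pls_fitted_values(X, y, M)) ** 2)) for M in range(1, p + 1)]

# Ordinary least squares fit for comparison.
beta_ols, *_ = np.linalg.lstsq(X, y, rcond=None)
rss_ols = float(np.sum((y - X @ beta_ols) ** 2))

for M, value in enumerate(rss, start=1):
    print(f"M = {M}: training RSS = {value:.6f}")
print(f"OLS    : training RSS = {rss_ols:.6f}")
```

In practice the number of components $M$ would be chosen on held-out data rather than on the training error shown here.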