# PLS

Pocinjemo s importanjem paketa koje cemo koristiti i definiranjem _random seed_-a kako bi kod bio reproducibilan.

In [1]:
from enum import Enum
from time import sleep

import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
from sklearn.cross_decomposition import PLSRegression
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm

random_state = 42
rng = np.random.RandomState(random_state)

Sljedeca funkcija sluzi generiranju uzorka jedinicnih normalnih vektora takvih da je kovarijanca svaka dva vektora jednaka.

In [2]:
def get_random_sample_with_fixed_covariance(sample_size: int, N: int, ro: float, return_type: str = 'pandas'):

    """Generates random sample of normal vectors such that covariance 
    between any pair of vectors equals ro.

    Args:
        N (int): Dimension of the vector space.
        ro (float): Covariance between different vectors.
        return_type (str, optional): Defines the type of output. Supported values are 'numpy' and 'pandas'. Defaults to 'pandas'.
    Returns:
        X: Generated sample.
    """

    assert sample_size >= N

    X_0 = rng.normal(loc=0, scale=1, size=sample_size)

    X = []

    for i in range(N):
        new_independent_variable = rng.normal(loc=0, scale=1, size=sample_size)
        new_variable = np.sqrt(ro) * X_0 + np.sqrt(1-ro**2) * new_independent_variable
        X.append(new_variable)
    
    X = np.array(X).T

    if return_type == 'pandas':
        return pd.DataFrame(X, columns=[f'X_{i+1}' for i in range(N)])
    
    if return_type == 'numpy':
        return np.array(X)

Primjerice, za $ \rho = 0.9$ ocekujemo da ce korelacijska matrica imati $\approx 0.81$ na izvandijagonalnim elementima.

In [3]:
N = 3
sample_size = 10000
ro = 0.9
X = get_random_sample_with_fixed_covariance(N=N, sample_size=sample_size, ro=ro)
print(f'>>> X correlation matrix: \n {X.corr()}')

>>> X correlation matrix: 
           X_1       X_2       X_3
X_1  1.000000  0.833566  0.826982
X_2  0.833566  1.000000  0.828807
X_3  0.826982  0.828807  1.000000


Sljedeca funkcija sluzi generiranju varijable $y$ tako da su $x$ i $y$ u srednjem linearno povezane, tj. $$y = \beta x + \varepsilon, \ \ \varepsilon \sim N(0, \sigma^2).$$

In [4]:
def get_y_from_x_given_beta(X: np.array, beta: np.array, error_var: float = 1):

    """Generate y-sample such that y = X*beta + eps, where eps is normally distributed error with mean 0.

    Args:
        X (np.array): independent variable sample.
        beta (np.array): linear transformation coefficient.
        error_var (float, optional): Variance of error. Defaults to 1.

    Returns:
        y: Generated sample of target variable.
    """
    
    error = rng.normal(loc=0, scale=np.sqrt(error_var), size=X.shape[0])
    y = np.matmul(X, beta) + error
    return y

beta = np.ones(X.shape[1])

y = get_y_from_x_given_beta(X=X, beta=beta)
print(f'>>> Mean of y: {y.mean()}')

>>> Mean of y: -0.002380801084672142


Sad kombinirajuci gornje dvije funkcije mozemo napisati punkciju koja generira kompletan uzorak.

In [5]:
def get_sample(beta: np.array, ro: float, sample_size: int, error_var: float = 1):

    """Generates sample of X and y such that y = X*beta + eps, eps ~ N(0, sqrt(error_var))

    Args:
        beta (np.array): Linear transformation vector.
        ro (float): Covariance between predictors.

    Returns:
        sample: generated sample.
    """

    N = beta.shape[0]
    X = get_random_sample_with_fixed_covariance(sample_size=sample_size, N=N, ro=ro)
    y = get_y_from_x_given_beta(X=X, beta=beta, error_var=error_var)
    return X, y

Jos jedan _sanity check_: ako je $\rho = 0,$ tada je $$\mathrm{Var}(y) = \sum_{k=0}^N \beta_k^2 \mathrm{Var}(X_k) + \mathrm{Var}(\varepsilon),$$ sto bi u slucaju $N = 3, \beta = (1, \dots, 1)$ moralo biti $4$. Zaista, za dovoljno velik uzorak imamo da je uzoracka varijanca $\approx 4$.

In [6]:
N = 3
beta = np.ones(3)
ro = 0
sample_size = 10000

X, y = get_sample(beta=beta, ro=ro, sample_size=sample_size)
print(f'>>> Var(y) = {y.var(ddof=1)}')

>>> Var(y) = 3.9730582932869334


Nastavljamo definiranjem funkcija za treniranje linearnog, PCR i PLS modela, redom, pri cemu potonja dva kao parametar primaju i broj glavnih komponenti koje koriste.

In [13]:
class Model(str, Enum):
    linreg = "linreg"
    pcr = "pcr"
    pls = "pls"

    @staticmethod
    def train(model_name: str, X: np.array, y: np.array, n_components: int | None = None):

        if model_name not in list(Model):
            raise ValueError(f'No such model. Available models are {list(Model)}')
        
        if model_name == Model.linreg:
            model = LinearRegression()
        
        elif model_name == Model.pcr:
            model = make_pipeline(StandardScaler(), PCA(n_components=n_components), LinearRegression())
        
        elif model_name == Model.pls:
            model = PLSRegression(n_components=n_components)
        
        model.fit(X, y)

        return model

Jedan razuman nacin validacije nasih modela bio bi da koristeci distribucije iz kojih smo generirali podatke izracunamo populacijski $\beta$ pa za gresku modela uzmemo
koliko se njegov koeficijent razlikuje od populacijskog, tj. ako je nas model dan s $y = \hat{\beta}x$, njegovu gresku mozemo racunati kao
\begin{align*}
\mathrm{Err}(\mathrm{Model}) = \|\beta-\hat{\beta}\|.
\end{align*}
Medutim, kako je prilikom visoke korelacije kovarijata taj $\beta$ "nestabilan", mi cemo umjesto toga testirati nase modele na velikom testnom uzorku. Drugim rijecima, prvo cemo izgenerirati jako velik uzorak, zatim trenirati model na njegovom malom dijelu, a na ostatku izracunati srednju kvaratnu gresku i $R2,$ sto ce nam biti primarna metrika za validaciju kvalitete modela. Takav pristup ima nekoliko prednosti:
1. Za male uzorke se moze dogoditi da dani podaci ne opisuju dobro svoju distribuciju pa bi i model s populacijskim $\beta$ lose predvidao na testnom skupu. Samim time testiramo i koliko su metode otporne na male uzorke.
2. Metrike poput kvaratne greske i $R2$ su interpretabilnije od udaljenosti do stvarnog $\beta$.
3. Tako se stvari rade u praksi (jer ne znamo stvarne distribucije pa ni vrijednost populacijskog koeficijenta); istrenira se model na uzorku koji nam je dan, a zatim validira na testnom skupu pa ide u produkciju.



In [14]:
SAMPLE_SIZE = 300_000

def train_and_evaluate_all_models(train_sample_size: int, ro: float, beta: np.array, n_components: int, error_var: float = 1):

    """Generates sample in which y = beta*x + eps, eps ~ N(0, error_var), and covariance between differnet
        components of x equals ro. Then trains LS, PCR and PCA models on its subsample and evaluates them on the rest.

    Args:
        n_components (int): Nubmer of components to use in fitting PCR and PLS.

    Returns:
        score_dict (dict): A dictionary whose keys are model names and values their respecitve R2-score on test set.
    """

    X, y = get_sample(beta=beta, ro=ro, sample_size=SAMPLE_SIZE)
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=random_state, train_size=sample_size)

    score_dict = {}

    for model_name in list(Model):
        model = Model.train(model_name=model_name, X=X_train, y=y_train, n_components=n_components)
        score_dict[model_name.value] = model.score(X_test, y_test)

    return score_dict

Donji primjer pokazuje kako kod "skoro" nezavisnih kovarijata PLS bolje predvida nego PCR. To je i ocekivano, pogotovo ako je broj komponenti puno manji od $N$ jer tada PCA nuzno gubi bitne informacije za predvidanje. S druge strane, PLS, rastavljajuci zavisnu varijablu skupa s nezavisnima, ne izgubi gotovo nista te predvida jednako dobro kao i linearna regresija, ali u puno manjoj dimenziji pa je stoga interpretabilniji od nje.

In [17]:
N = 300
train_sample_size = 1000
n_components = 10
ro = 0.001

beta = np.ones(N)

score_dict = train_and_evaluate_all_models(train_sample_size=train_sample_size, ro=ro, beta=beta, n_components=n_components)
score_dict

{'linreg': 0.997357603684893,
 'pcr': 0.46705390839010574,
 'pls': 0.9973576036808041}

In [18]:
index_columns = ['train_sample_size', 'N', 'n_components', 'ro', 'beta']
score_df = pd.DataFrame(None, columns=index_columns+[x.value for x in Model])

loader = tqdm([1000, 500, 100, 50, 10, 5])

N_train_sample_size_ratios = [10, 5, 3, 2, 1]
N_n_components_ratios = [1, 2, 3, 5, 10, 100, 200]

for N in loader:
    for train_sample_size in [N*x for x in N_train_sample_size_ratios]:

        if train_sample_size <= N:
            continue

        for n_components in [N//x for x in N_n_components_ratios]:

            if n_components == 0:
                continue

            for ro in [0.01, 0.1, 0.2, 0.5, 0.7, 0.9, 0.99]:
                beta = rng.normal(size=N)*5
                score_dict = train_and_evaluate_all_models(train_sample_size=train_sample_size, beta=beta, ro=ro, n_components=n_components)
                hparams_dict = dict(train_sample_size=train_sample_size, N=N, ro=ro, n_components=n_components, beta=beta)
                new_row = hparams_dict | score_dict

                score_df.loc[len(score_df), :] = new_row

                new_row.pop('beta')
                loader.set_postfix(**new_row)
    
            score_df.to_csv('scores.csv', index=False)

score_df.to_csv('scores.csv', index=False)

  0%|          | 0/6 [32:35<?, ?it/s, N=1000, linreg=1, n_components=100, pcr=0.117, pls=1, ro=0.01, train_sample_size=1e+4]         

In [None]:
score_df.to_csv('scores.csv', index=False)

array([-5.90978613, -4.07209144])

In [None]:
score_df

Unnamed: 0,train_sample_size,N,n_components,ro,beta,linreg,pcr,pls
0,10000,1000,1000,0.01,"[8.838679669557582, 0.59767674512725, 4.826527...",0.999954,0.999954,0.999951


In [None]:
index_columns = ['N', 'train_sample_size', 'ro']
score_df = pd.DataFrame(None, columns=index_columns+[x.value for x in Model])

In [None]:
score_df

train_sample_size = 5
n = 5
ro = 5
d = dict(train_sample_size=train_sample_size, N=N, ro=ro)

score_df.loc[len(score_df), :] = d | score_dict

In [None]:
score_df

Unnamed: 0,N,train_sample_size,ro,linreg,pcr,pls
0,,5.0,5.0,0.998022,0.833751,0.998021
1,5.0,5.0,5.0,0.998022,0.833751,0.998021


In [None]:
loader = tqdm([10, 30, 50, 100, 500, 1000])

for N in loader:
    sleep(2)
    loader.set_postfix(konj='k')

100%|██████████| 6/6 [00:12<00:00,  2.01s/it, konj=k]
