In [None]:
import pandas as pd
from pathlib import Path
import os
from sys import stdout
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.cross_decomposition import PLSRegression
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import mean_squared_error, r2_score
from sklearn.preprocessing import StandardScaler

In [None]:
# Algoritmo PLSR
def pretreat(X, method):
    if method == 'autoscaling':
        scaler = StandardScaler()
        Xs = scaler.fit_transform(X)
        return Xs, scaler.mean_, scaler.scale_
    elif method == 'center':
        mean = np.mean(X, axis=0)
        Xs = X - mean
        return Xs, mean
    else:
        return X, 0, 1
def optimise_pls_cv(X, y, n_comp, max_iter=100, method='center'):
 
    '''Run PLS including a variable number of components, up to n_comp,
       and calculate MSE '''
 
    mse = []
    component = np.arange(1, n_comp+1)
    
    X, mean_X = pretreat(X, method)  # Centralizando os dados
    y, mean_y = pretreat(y, method)
 
    for i in component:
        pls = PLSRegression(n_components=i)
 
        # Cross-validation
        y_cv = cross_val_predict(pls, X, y, cv=10)
 
        mse.append(mean_squared_error(y, y_cv))
 
        comp = 100*(i+1)/n_comp
        # Trick to update status on the same line
        stdout.write("\r%d%% completed" % comp)
        stdout.flush()
    stdout.write("\n")
 
    # Calculate and print the position of minimum in MSE
    msemin = np.argmin(mse)
    print("Suggested number of components: ", msemin+1)
    stdout.write("\n")
    
 
    # Define PLS object with optimal number of components
    pls_opt = PLSRegression(n_components=msemin+1)
 
    # Fir to the entire dataset
    pls_opt.fit(X, y)
    y_c = pls_opt.predict(X)
 
    # Cross-validation
    y_cv = cross_val_predict(pls_opt, X, y, cv=10)
 
    # Calculate scores for calibration and cross-validation
    score_c = r2_score(y, y_c)
    score_cv = r2_score(y, y_cv)
 
    # Calculate mean squared error for calibration and cross validation
    mse_c = mean_squared_error(y, y_c)
    mse_cv = mean_squared_error(y, y_cv)
    
    rmse_c = np.sqrt(mean_squared_error(y, y_c))
    rmse_cv = np.sqrt(mean_squared_error(y, y_cv))
    
    rpd_value = y.std() / np.sqrt(rmse_cv)
 
    print('R2 calib: %5.3f'  % score_c)
    print('R2 CV: %5.3f'  % score_cv)
    print('RMSE calib: %5.3f' % rmse_c)
    print('RMSE CV: %5.3f' % rmse_cv)
    print('RPD: %5.3f' % rpd_value)
 

    # Fit a line to the CV vs response
    z = np.polyfit(y, y_c, 1)
    with plt.style.context(('ggplot')):
        fig, ax = plt.subplots(figsize=(9, 5))
        ax.scatter(y_c, y, c='red', edgecolors='k')
        #Plot the best fit line
        ax.plot(np.polyval(z,y), y, c='blue', linewidth=1)
        #Plot the ideal 1:1 line
        ax.plot(y, y, color='green', linewidth=1)
        plt.title('$R^{2}$ (CV): '+str(score_cv))
        plt.xlabel('Predicted')
        plt.ylabel('Measured')
 
        plt.show()
 
    return

# PLSR

In [None]:
# Supondo que você já tenha suas matrizes X e Y
# Calculando as médias
X_mean = X.mean(axis=0)
Y_mean = Y['SST'].mean(axis=0)

# Centralizando as matrizes
Xc = X - np.ones((X.shape[0], 1)) * X_mean
Yc = Y - np.ones((Y.shape[0], 1)) * Y_mean

H = 10  # número de iterações
w = np.zeros((Xc.shape[1], H))
t = np.zeros((Xc.shape[0], H))
p = np.zeros((Xc.shape[1], H))
q = np.zeros((H, 1))

# Iteração sobre H
for h in range(H):
    v = np.dot(Xc.T, Yc)
    w[:, h] = v / np.sqrt(np.dot(v.T, v))
    norm_w_h = np.linalg.norm(w[:, h])
    print("norma=" + norm_w_h)
    t[:, h] = np.dot(Xc, w[:, h])
    tt = np.dot(t[:, h].T, t[:, h])
    p[:, h] = np.dot(Xc.T, t[:, h]) / tt
    R = Xc - np.outer(t[:, h], p[:, h].T)
    q[h] = np.dot(t[:, h].T, Yc) / tt

# Calculando b
b = np.cumsum(np.dot(w, np.dot(np.linalg.inv(np.dot(p.T, w)), np.diag(q.flatten()))), axis=1)

# Se necessário, você pode converter os resultados finais para DataFrame do pandas
b_df = pd.DataFrame(b)

In [None]:

def pretreat(X, method):
    if method == 'autoscaling':
        scaler = StandardScaler()
        Xs = scaler.fit_transform(X)
        return Xs, scaler.mean_, scaler.scale_
    elif method == 'center':
        mean = np.mean(X, axis=0)
        Xs = X - mean
        return Xs, mean, 1
    else:
        return X, 0, 1

def plsnipals(X, Y, A):
    varX = np.sum(np.sum(X**2))
    varY = np.sum(np.sum(Y**2))
    
    n, p = X.shape
    _, m = Y.shape
    
    W = np.zeros((p, A))
    T = np.zeros((n, A))
    P = np.zeros((p, A))
    Q = np.zeros((m, A))
    
    for i in range(A):
        error = 1
        u = Y[:, 0]
        niter = 0
        while error > 1e-8 and niter < 1000:
            w = X.T @ u / (u.T @ u)
            w = w / np.linalg.norm(w)
            t = X @ w
            q = Y.T @ t / (t.T @ t)
            u1 = Y @ q / (q.T @ q)
            error = np.linalg.norm(u1 - u) / np.linalg.norm(u)
            u = u1
            niter += 1
        
        p = X.T @ t / (t.T @ t)
        X = X - np.outer(t, p.T)
        Y = Y - np.outer(t, q.T)
        
        W[:, i] = w
        T[:, i] = t
        P[:, i] = p
        Q[:, i] = q
    
    R2X = np.diag(T.T @ T @ P.T @ P) / varX
    R2Y = np.diag(T.T @ T @ Q.T @ Q) / varY
    
    Wstar = W @ np.linalg.inv(P.T @ W)
    B = Wstar @ Q.T
    
    return B, Wstar, T, P, Q, W, R2X, R2Y

def plscv(X, y, A=3, K=10, method='center', PROCESS=1, order=0):
    if order == 0:
        indexyy = np.argsort(y)
    elif order == 1:
        indexyy = np.random.permutation(len(y))
    elif order == 2:
        indexyy = np.arange(len(y))
    
    X = X[indexyy, :]
    y = y[indexyy]
    
    Mx, Nx = X.shape
    A = min([X.shape[1], A])
    yytest = np.full(Mx, np.nan)
    YR = np.full((Mx, A), np.nan)
    
    kf = KFold(n_splits=K)
    
    for group, (train_index, test_index) in enumerate(kf.split(X)):
        Xcal, Xtest = X[train_index], X[test_index]
        ycal, ytest = y[train_index], y[test_index]
        
        Xs, xpara1, xpara2 = pretreat(Xcal, method)
        ys, ypara1, ypara2 = pretreat(ycal, 'center')
        
        B, Wstar, T, P, Q, W, R2X, R2Y = plsnipals(Xs, ys[:, np.newaxis], A)
        
        yp = np.zeros((Xtest.shape[0], A))
        for j in range(A):
            Bj = Wstar[:, :j+1] @ Q[:j+1]
            C = ypara2 * Bj / xpara2
            coef = np.vstack([C, ypara1 - np.sum(xpara1 * C)])
            Xteste = np.hstack([Xtest, np.ones((Xtest.shape[0], 1))])
            ypred = Xteste @ coef
            yp[:, j] = ypred.flatten()
        
        YR[test_index, :] = yp
        yytest[test_index] = ytest
        if PROCESS == 1:
            print(f'The {group+1}th group finished.')
    
    YR[indexyy, :] = YR
    y = y[indexyy]
    
    error = YR - y[:, np.newaxis]
    error2 = error**2
    error2_MEAN = np.mean(error2, axis=0)
    error2_SD = np.std(error2, axis=0, ddof=1)
    
    cv = np.sqrt(error2_MEAN)
    RMSEP = np.min(cv)
    index = np.argmin(cv)
    SST = np.sum((yytest - np.mean(y))**2)
    
    Q2 = [1 - (np.sum((YR[:, i] - y)**2) / SST) for i in range(A)]
    
    indexSD = np.where(error2_MEAN <= np.min(error2_MEAN) + error2_SD[index])[0][0]
    
    results_df = pd.DataFrame({
        'Method': method,
        'Ypred': [list(row) for row in YR],
        'PredError': [list(row) for row in error],
        'RMSECV': cv,
        'Q2': Q2,
        'RMSECV_min': RMSEP,
        'Q2_max': Q2[index],
        'OptLV': index + 1,
        'Note': '*** The following is based on global min MSE + 1SD',
        'RMSECV_min_1SD': cv[indexSD],
        'Q2_max_1SD': Q2[indexSD],
        'OptLV_1SD': indexSD + 1
    })

    return results_df

# Exemplo de uso:
# X = ... # Sua matriz de entrada (m x n)
# y = ... # Seu vetor de resposta (m x 1)
# A = ... # Número máximo de variáveis latentes
# K = ... # Número de folds para validação cruzada
# method = ... # Método de pré-processamento ('autoscaling', 'pareto', 'minmax', 'center', 'none')
# PROCESS = ... # Imprimir progresso (1 para sim, 0 para não)
# order = ... # Ordem dos dados (0 para ordenado, 1 para aleatório, 2 para original)
X_array = X.values
Y_array = Y['SST'].values
plscv(X_array, Y_array, 1, 10, 'center', 1, 2)


The 1th group finished.
The 2th group finished.
The 3th group finished.
The 4th group finished.
The 5th group finished.
The 6th group finished.
The 7th group finished.
The 8th group finished.
The 9th group finished.
The 10th group finished.


ValueError: All arrays must be of the same length