Linear Regressor

In [131]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import scipy
from typing import Tuple
import warnings
# import statsmodels.api as sm
# import statsmodels.stats.diagnostic as smd
# from statsmodels.stats.stattools import durbin_watson, jarque_bera
# from statsmodels.stats.outliers_influence import variance_inflation_factor
# from statsmodels.tools.tools import add_constant

In [50]:
from sklearn import datasets

data = datasets.fetch_california_housing()
X = pd.DataFrame(data['data'], columns=data['feature_names'])
y = pd.DataFrame(data['target'], columns=['Price'])

In [51]:
X_simple_train = X[['AveBedrms']].head(16000)
X_simple_test = X[['AveBedrms']].tail(4640)

X_mult = X.copy()
X_mult_train = X_mult.head(16000)
X_mult_test = X_mult.tail(4640)

y_train = y.head(16000)
y_test = y.tail(4640)

In [52]:
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.metrics import mean_absolute_error, root_mean_squared_error, mean_squared_error

Scikit Learn pred for Simple Reg

In [53]:
LR = LinearRegression()
RR = Ridge(alpha=1000)
LassoR = Lasso(alpha=1000)

LR.fit(X_simple_train, y_train)
LR_pred = LR.predict(X_simple_test)

RR.fit(X_simple_train, y_train)
RR_pred = RR.predict(X_simple_test)

LassoR.fit(X_simple_train, y_train)
LassoR_pred = LassoR.predict(X_simple_test)

print(f'Linear Simple RMSE: {mean_absolute_error(y_test, LR_pred)}')
print(f'Ridge Simple RMSE: {mean_absolute_error(y_test, RR_pred)}')
print(f'Lasso Simple RMSE: {mean_absolute_error(y_test, LassoR_pred)}')

Linear Simple RMSE: 0.982420666235715
Ridge Simple RMSE: 0.9828045748451621
Lasso Simple RMSE: 0.9845237667499999


Scikit Learn pred for Mult Reg

In [65]:
LR.fit(X_mult_train, y_train)
LR_pred_mult = LR.predict(X_mult_test)

RR.fit(X_mult_train, y_train)
RR_pred_mult = RR.predict(X_mult_test)

LassoR.fit(X_mult_train, y_train)
LassoR_pred_mult = LassoR.predict(X_mult_test)

print(f'Linear Simple RMSE: {mean_absolute_error(y_test, LR_pred_mult)}')
print(f'Ridge Simple RMSE: {mean_absolute_error(y_test, RR_pred_mult)}')
print(f'Lasso Simple RMSE: {mean_absolute_error(y_test, LassoR_pred_mult)}')

Linear Simple RMSE: 0.5183922114609337
Ridge Simple RMSE: 0.52261892925488
Lasso Simple RMSE: 0.9845237667499999


In [92]:
X_pru_aux_R.shape

(16000, 11)

In [119]:
def RamseyReset(X, y):

    all_w = LinearRegressor._get_weights(X, y)
    w1, b1 = all_w[0:-1], all_w[-1]
    pred1 = X @ w1 + b1
    pred1_sq = pred_pru ** 2
    pred1_cu = pred_pru ** 3
    ssrr = np.sum((pred1 - y)**2, axis=0)

    X2 = np.hstack([X, pred1_sq, pred1_cu])
    all_w_R = LinearRegressor._get_weights(X2, y)
    w2, b2 = all_w_R[0:-1], all_w_R[-1]
    pred2 = X2 @ w2 + b2
    ssra = np.sum((pred2 - y)**2, axis=0)

    q = 2
    n = X.shape[0]
    k = X.shape[1]

    F = ((ssrr-ssra) / q) / (ssra / (n - k - q - 1)) 
    F = F[0]

    if F > 3:
        warnings.warn('Ramsey Test failed, your model has non-linear relations, try using polynomial or logarithmic convertions.')

RamseyReset(X_mult_train.values, y_train.values)
Fish



In [None]:
def check_DW(X, y):

    all_w = LinearRegressor._get_weights(X, y)
    w, b = all_w[:-1], all_w[-1]

    pred = X @ w + b
    res = y - pred
    d = np.sum((res[:-1] - res[1:])**2) / np.sum(res **2)

    

check_DW(X_mult_train.values, y_train.values)

0.8705042151218245


In [138]:
scipy.stats.f.ppf(1-0.05, 8, 16000-8-1)

1.9389904335366368

In [146]:
def check_Heterocedaskicity(X, y):

    all_w = LinearRegressor._get_weights(X, y)
    w, b = all_w[:-1], all_w[-1]
    pred = X @ w + b
    res = y - pred
    res_sq = res**2
    n = X.shape[0]
    k = X.shape[1]
    ssr = np.sum(res_sq)

    var_res = ssr / (n-k-1)
    g =  res_sq / var_res

    all_w2 = LinearRegressor._get_weights(X, g)
    w2, b2 = all_w2[:-1], all_w2[-1]
    pred2 = X @ w2 + b2

    R2 = Metrics.get_R2(pred2, g)
    LM = n * R2
    chi_square_value = scipy.stats.chi2.ppf(1-0.05, k)

    if LM > chi_square_value:
        warnings.warn('The BP test shows that your model has heterocedaskicity problems, try transforming the dependent and independent variables.')
        
check_Heterocedaskicity(X_mult_train.values, y_train.values)



In [158]:
import numpy as np
import warnings
from scipy.stats import chi2 # Necesaria para el valor crítico

def test_JB_Corregido(X, y):

    # 1. Regresión y Residuos
    all_w = LinearRegressor._get_weights(X, y)
    w, b = all_w[:-1], all_w[-1]
    pred = X @ w + b
    res = y - pred
    n = X.shape[0]

    # La media de residuos en MCO es ~0, pero se incluye por robustez
    res_mean = np.mean(res) 
    
    # --- 2. Momentos Centrales y Normalización ---
    
    # 2.1. Varianza (Momento central de orden 2)
    M2 = np.sum((res - res_mean)**2) / n # Varianza muestral
    
    # Desviación estándar al cuadrado y al cubo para la normalización
    std_dev = np.sqrt(M2)
    
    # 2.2. Asimetría (S)
    M3 = np.sum((res - res_mean)**3) / n # Momento central de orden 3
    S = M3 / (std_dev**3) # M3 normalizado por la desviación estándar al cubo
    
    # 2.3. Curtosis (K)
    M4 = np.sum((res - res_mean)**4) / n # Momento central de orden 4
    K = M4 / (std_dev**4) # M4 normalizado por la desviación estándar a la cuarta
    
    # --- 3. Estadístico JB ---
    
    # Fórmula correcta: utiliza la curtosis excesiva (K - 3)
    JB = n / 6 * ((S**2) + ((K - 3)**2 / 4)) # <--- CORRECCIÓN CLAVE
    
    # --- 4. Evaluación ---
    
    # El valor crítico Chi-cuadrado para alpha=0.05 y GL=2 es ~5.991
    JB_critic = chi2.ppf(1 - 0.05, 2)
    
    print(f"Estadístico Jarque-Bera: {JB:.4f}")
    
    if JB > JB_critic:
        warnings.warn('Jarque Bera test failed. The residuals are not normally distributed (JB > Chi^2_crit).', UserWarning)
    else:
        print("Test de Jarque-Bera Aprobado. Los residuos son consistentes con una distribución normal.")
        
    print(JB)

test_JB_Corregido(X_mult_train.values, y_train.values)


Estadístico Jarque-Bera: 13584.7122
13584.71215673286




In [None]:
def test_JB(X, y):

    all_w = LinearRegressor._get_weights(X, y)
    w, b = all_w[:-1], all_w[-1]

    pred = X @ w + b
    res = y - pred
    n = X.shape[0]
    res_mean = np.mean(res)

    M2 = 1/n * np.sum((res-res_mean)**2)
    M3 = 1/n * np.sum((res-res_mean)**3)
    M4 = 1/n * np.sum((res-res_mean)**4)
    std_dev = np.sqrt(M2)
    
    S = M3 / std_dev**3
    K = M4 / std_dev**4

    JB = n/6 * ((S**2) + ((K - 3)**2 / 4))

    JB_critic = scipy.stats.chi2.ppf(1-0.05, 2)

    if JB > JB_critic:
        warnings.warn('Jarque Bera test failed. The residuals are not normally distributed.')


test_JB(X_mult_train.values, y_train.values)

13584.71215673286




Now my implementation.

Cosas a corregir del modelo.

1. Divide y conquista: Tienes todo en una sola clase, eso lo hace dificil de mantener, trata de separar cada etapa.
2. Trata de que todo este escrito con Numpy, en vez de recibir dataframes de pandas, trata de que solo se pueda recibir arrays de numpy y trabajar con ellos, o por lo menos separar o convertir al inicio y luego hacer todo el modelo con puro numpy.
3. Separa las metricas y puedes crear una clase solo para ellas, de modo que sea mas legible y facil de mantener
4. Que los tests del modelo sean netamente producto del modelo y no de otras librerias, sino pierde sentido porque no es "desde cero".
5. Tu regresor tiene que ser a prueba de balas, preparalo para NaNs, tipos de datos coherentes, dimensiones de arrays correctas, variables categoricas.
6. Usa Warnings en vez de prints, los print no sirven para produccion

In [None]:
# class Validator:

#     def validate(self, X, y):

class Preprocessor:

    @staticmethod
    def _convert_data(self, X: pd.DataFrame | pd.Series, y: pd.Series, train: bool=True) -> Tuple[np.ndarray, np.ndarray]:

        if isinstance(X, pd.Series):
            X = X.values.reshape(-1, 1)
        elif isinstance(X, pd.DataFrame):
            X = X.values

        if train:
            if isinstance(y, pd.Series):
                y = y.values.reshape(-1, 1)
            elif isinstance(y, pd.DataFrame):
                y = y.values

            return X, y
        
        return X
    

class CorrChecker:

    def __init__(self, X: np.ndarray, y: np.ndarray, feature_names:list):
        self.X = X
        self.y = y
        self.feature_names = feature_names
    
    @classmethod
    def _check_corr(self, X: np.ndarray, y: np.ndarray) -> None:

        X = pd.DataFrame(X, columns=self.feature_names)
        y = pd.DataFrame(y, columns=['Target'])

        df = pd.concat([X, y], axis=1)
        corrs = df.corr().iloc[:, -1]

        for col, corr in zip(corrs.index, corrs):
            if abs(corr) < 0.3:
                print(f'WARNING! {col} has a low correlation with the target variable ({np.round(corr, 2)}).')


class LinearRegressor:

    def __init__(self, w=None, b=None, TimeSeries=False):
        self.w = w
        self.b = b
        self.TimeSeries = TimeSeries
    
    @staticmethod
    def _get_weights(X: np.ndarray, y: np.ndarray) -> np.ndarray:

        b = np.ones(shape=(X.shape[0], 1))
        input_b = np.hstack((X, b))

        weights = (np.linalg.inv(input_b.T @ input_b) @ input_b.T) @ y

        return weights
    
    @staticmethod
    def __get_residuals(pred: np.ndarray, y: np.ndarray) -> np.ndarray:

        ''' This is a private method '''

        residuals = pred - y
        return residuals

    def fit(self, X_train: pd.DataFrame | pd.Series, y_train: pd.Series) -> None:

        '''
            This method acts like training the model, it will also tell you if your
            model has heterocedaskicity, autocorrelation, multicolinearity, is it
            bad specified and if the data is normally distributed.
        '''

        CorrChecker._check_corr(X_train, y_train)
        X_train_r, y_train_r = Preprocessor._convert_data(X_train, y_train)
        w_b = self._get_weights(X_train_r, y_train_r)
        self.w = w_b[:-1]
        self.b = w_b[-1]
        print()
        
        AssumpChecker._check_assumptions(X_train_r, y_train_r, self.TimeSeries)

        if len(X_train.shape) > 1:
            AssumpChecker._check_multicol(X_train)

    def predict(self, X: pd.DataFrame | pd.Series) -> np.ndarray:

        '''
            This method predicts the test set using the weights and the Bias.
        '''

        X = Preprocessor._convert_data(X, np.array([1]), train=False)

        pred = X @ self.w + self.b
        return pred
    
    
class AssumpChecker:

    @staticmethod
    def __check_Ramsey(X: np.ndarray, y: np.ndarray) -> None:

        ''' 
            This method does all the Ramsey process from start to finish, it
            can be hard to understand, so I suggest supporting the explanations
            with AI.
        '''

        all_w = LinearRegressor._get_weights(X, y)
        w1, b1 = all_w[0:-1], all_w[-1]
        pred1 = X @ w1 + b1
        pred1_sq = pred_pru ** 2
        pred1_cu = pred_pru ** 3
        ssrr = np.sum((pred1 - y)**2, axis=0)

        X2 = np.hstack([X, pred1_sq, pred1_cu])
        all_w_R = LinearRegressor._get_weights(X2, y)
        w2, b2 = all_w_R[0:-1], all_w_R[-1]
        pred2 = X2 @ w2 + b2
        ssra = np.sum((pred2 - y)**2, axis=0)

        q = 2
        n = X.shape[0]
        k = X.shape[1]

        F = ((ssrr-ssra) / q) / (ssra / (n - k - q - 1)) 
        F = F[0]
        F_critic = scipy.stats.F.ppf(1-0.05, k, n-k-q-1)

        if F > F_critic:
            warnings.warn('Ramsey Test failed, your model has non-linear relations, try using polynomial or logarithmic convertions.')

    @staticmethod
    def __check_dw(X: np.ndarray, y: np.ndarray) -> None:

        ''' This method just applies for Time Series data only, it does the Durbin Watson test. '''

        all_w = LinearRegressor._get_weights(X, y)
        w, b = all_w[:-1], all_w[-1]

        pred = X @ w + b
        res = y - pred
        d = np.sum((res[:-1] - res[1:])**2) / np.sum(res **2)

        if d < 1.8:
            warnings.warn('Durbin Watson test shows that the model has positive correlation problems, try adding lags of one of the dependent variables as another dependent variable.')
        elif d > 2.2:
            warnings.warn('Durbin Watson test shows that the model has negative correlation problems, try adding lags of one of the dependent variables as another dependent variable.')

    @staticmethod
    def __check_ht(X,: np.ndarray, y: np.ndarray) -> None:

        ''' This test checks for Heterocedaskicity on your model.'''

        all_w = LinearRegressor._get_weights(X, y)
        w, b = all_w[:-1], all_w[-1]
        pred = X @ w + b
        res = y - pred
        res_sq = res**2
        n = X.shape[0]
        k = X.shape[1]
        ssr = np.sum(res_sq)

        var_res = ssr / (n-k-1)
        g =  res_sq / var_res

        all_w2 = LinearRegressor._get_weights(X, g)
        w2, b2 = all_w2[:-1], all_w2[-1]
        pred2 = X @ w2 + b2

        R2 = Metrics.get_R2(pred2, g)
        LM = n * R2
        chi_square_value = scipy.stats.chi2.ppf(1-0.05, k)

        if LM > chi_square_value:
            warnings.warn('The BP test shows that your model has heterocedaskicity problems, try transforming the dependent and independent variables.')

    @staticmethod
    def __check_jb(X: np.ndarray, y: np.ndarray) -> None:

        ''' This is a private method '''
        
        all_w = LinearRegressor._get_weights(X, y)
        w, b = all_w[:-1], all_w[-1]

        pred = X @ w + b
        res = y - pred
        n = X.shape[0]
        res_mean = np.mean(res)

        M2 = 1/n * np.sum((res-res_mean)**2)
        M3 = 1/n * np.sum((res-res_mean)**3)
        M4 = 1/n * np.sum((res-res_mean)**4)
        std_dev = np.sqrt(M2)
        
        S = M3 / std_dev**3
        K = M4 / std_dev**4

        JB = n/6 * ((S**2) + ((K - 3)**2 / 4))

        JB_critic = scipy.stats.chi2.ppf(1-0.05, 2)

        if JB > JB_critic:
            warnings.warn('Jarque Bera test failed. The residuals are not normally distributed.')

    @staticmethod
    def __check_multicol(X: pd.DataFrame) -> None:

        ''' This is a private method '''

        vif_data = pd.DataFrame()
        vif_data['feature'] = X.columns
        vif_data['VIF'] = [variance_inflation_factor(X.values, i) \
                               for i in range(X.shape[1])]

        for idx, row in vif_data.iterrows():
            if row['VIF'] >= 5:
                print(f'WARNING! Check the correlation between {row['feature']} and the other independent variables. VIF: {np.round(row['VIF'], 2)}')


    @staticmethod
    def _check_assumptions(X: np.ndarray, y: np.ndarray, TimeSeries: bool = False) -> None:

        ''' This is a private method '''

        jb = jarque_bera(residuals)
        AssumpChecker.__check_Ramsey(X, y)
        AssumpChecker.__check_jb(X, y)
        AssumpChecker.__check_ht(X, y)

        if TimeSeries:
            AssumpChecker.__check_dw(X, y)


class Metrics:

    @staticmethod
    def get_MSE(pred: np.ndarray, y: np.ndarray) -> float:

        ''' This method calculates the MSE '''

        m = pred.shape[0]

        MSE = (1/m) * np.sum((pred - y) ** 2)
        return MSE

    @staticmethod
    def get_RMSE(pred: np.ndarray, y: np.ndarray) -> float:

        ''' This method calculates the RMSE '''

        m = pred.shape[0]

        RMSE = ((1/m) * np.sum((pred - y) ** 2)) ** 0.5
        return RMSE

    @staticmethod
    def get_MAE(pred: np.ndarray, y: np.ndarray) -> float:

        ''' This method calculates the MAE '''

        m = pred.shape[0]

        MAE = 1/m * (np.sum(abs(pred - y)))
        return MAE

    @staticmethod
    def get_R2(pred: np.ndarray, y: np.ndarray) -> float:

        ''' This method calculates the R2 '''

        num = np.sum((pred - y.mean()) ** 2)
        den = np.sum((y - y.mean()) ** 2)
        R2 = 1 - (num / den)

        return R2
    
    

In [None]:



class LinearRegressor:

    def __init__(self, w=None, b=None):
        self.__w = w
        self.__b = b


    
    @staticmethod
    def __get_regplot(X: np.ndarray, pred: np.ndarray, y: np.ndarray) -> None:

        ''' This is a private method '''

        plt.scatter(x=y, y=X, alpha=0.3)
        plt.plot(pred, X, c='r')
        plt.title('Regresion plot')
        plt.xlabel('Input')
        plt.ylabel('Actual Values')
        plt.show()

    @staticmethod
    def __get_residual_plot(pred: np.ndarray, residuals: np.ndarray) -> None:

        ''' This is a private method '''

        plt.scatter(pred, residuals)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.xlabel('Predictions')
        plt.ylabel('Residuals')
        plt.title('Residuals Plot')
        plt.show()



    def get_metrics_report(self,
                           X: pd.DataFrame | pd.Series,
                           pred: np.ndarray,
                           y: np.ndarray,
                           charts=True) -> None:

        '''
            This method will give you a quick report of you regression, showing you metrics
            like RMSE, MSE, MAE, R2, a regression plot and a residual plot.
        '''

        X_res, y_res = LinearRegressor.__convert_training_data(X, y)

        RMSE = self.get_RMSE(pred, y_res)
        MSE = self.get_MSE(pred, y_res)
        MAE = self.get_MAE(pred, y_res)
        R2 = self.get_R2(pred, y_res)
        residuals = LinearRegressor.__get_residuals(pred, y_res)

        print()
        print('------------------METRICS REPORT-----------------\n')
        print(f'MSE: {MSE}\nRMSE: {RMSE}\nMAE: {MAE}\nR2: {R2}\n')

        if charts:

            if X_res.shape[1] == 1:
                LinearRegressor.__get_regplot(X_res, pred, y_res)
                print()

            LinearRegressor.__get_residual_plot(pred, residuals)


class RidgeRegressor(LinearRegressor):

    def __init__(self, alpha=1):
        super().__init__()
        self.alpha = alpha

    def _get_weights(self, input: np.ndarray, y: np.ndarray) -> np.ndarray:

        b = np.ones(shape=(input.shape[0], 1))
        input_b = np.hstack((input, b))

        weights = (np.linalg.inv((input_b.T @ input_b) + self.alpha * np.eye(input_b.shape[1])) @ input_b.T) @ y

        return weights
