In [4]:
import numpy as np
from scipy.linalg import svd
from sklearn.model_selection import train_test_split
import pandas as pd
from PreProcess import preprocess

In [5]:
class LinearRegressionWithNoise:
    def __init__(self, A, y, is_y_inconsistent=False, scale=0.5):
        """
        Initialize the Linear Regression model with the given dataset.
        
        Parameters:
        A (numpy.ndarray): Feature matrix.
        y (numpy.ndarray): Target variable.
        is_y_inconsistent (bool): Whether to introduce inconsistency in y.
        scale (float): Scale of inconsistency if applied.
        """
        self.A = np.hstack([np.ones((A.shape[0], 1)), A])  # Add intercept column
        self.y = y.reshape(-1, 1)
        self.is_y_inconsistent = is_y_inconsistent
        
        if self.is_y_inconsistent:
            self.y = self.make_y_inconsistent(scale)
            print("Using inconsistent y for training.")
        else:
            print("Using consistent y for training.")
        
        # Split data into training and testing sets (80% train, 20% test)
        self.A_train, self.A_test, self.y_train, self.y_test = train_test_split(
            self.A, self.y, test_size=0.2, random_state=42
        )
        
        self.is_full_col_rank = np.linalg.matrix_rank(self.A_train) == self.A_train.shape[1]
        
        if self.is_full_col_rank:
            print("Matrix A_train is full column rank. Proceeding with standard least squares solution.")
        else:
            print("Matrix A_train is not full column rank. Using SVD-based pseudo-inverse.")
        
    def make_y_inconsistent(self, scale):
        """
        Modifies y to ensure inconsistency by adding a component orthogonal to A.
        """
        U, S, Vt = svd(self.A, full_matrices=True)
        null_space_vectors = U[:, len(S):]

        if null_space_vectors.shape[1] == 0:
            print("Warning: A has full rank, hard to create inconsistency.")
            return self.y
        
        random_null_component = null_space_vectors @ np.random.randn(null_space_vectors.shape[1], 1)
        return self.y + scale * random_null_component
    
    def compute_pseudo_inverse(self, A):
        """
        Computes the pseudo-inverse of A.
        """
        rank = np.linalg.matrix_rank(A)
        if rank == A.shape[1]:
            try:
                return np.linalg.inv(A.T @ A) @ A.T
            except np.linalg.LinAlgError:
                print("Unexpected singular matrix, using SVD instead.")
                return self.compute_generalized_inverse(A)
        else:
            return self.compute_generalized_inverse(A)
    
    def compute_generalized_inverse(self, A):
        """
        Computes the Moore-Penrose pseudo-inverse using SVD.
        """
        U, S, Vt = svd(A, full_matrices=False)
        S_inv = np.diag([1/s if s > 1e-10 else 0 for s in S])
        return Vt.T @ S_inv @ U.T
    
    def solve_linear_regression(self):
        """
        Solves the linear regression problem using a manually computed pseudo-inverse.
        """
        A_pinv_train = self.compute_pseudo_inverse(self.A_train)
        return A_pinv_train @ self.y_train
    
    def correct_regression_coefficients(self, x0):
        """
        Computes corrected regression coefficients to handle inconsistency.
        """
        y_hat_train = self.A_train @ x0
        e_out = y_hat_train - self.y_train
        A_pinv_train = self.compute_pseudo_inverse(self.A_train)
        e_in = A_pinv_train @ e_out
        x_corrected = x0 + e_in
        mse_corrected = np.mean((self.y_train - self.A_train @ x_corrected) ** 2)
        return x_corrected, mse_corrected
    
    def run_regression(self):
        x0 = self.solve_linear_regression()
        print("Initial Linear Regression Coefficients (including intercept):")
        print(x0)
        
        if self.is_y_inconsistent:
            x_corrected, mse_corrected = self.correct_regression_coefficients(x0)
            print("Corrected Linear Regression Coefficients (including intercept):")
            print(x_corrected)
            print(f"Corrected Train MSE: {mse_corrected}")
            self.x_final = x_corrected
        else:
            y_hat_train = self.A_train @ x0
            mse_train = np.mean((self.y_train - y_hat_train) ** 2)
            print(f"Train MSE (Standard Regression - y is consistent): {mse_train}")
            self.x_final = x0
        
        # Predict on test set and evaluate
        y_hat_test = self.A_test @ self.x_final
        mse_test = np.mean((self.y_test - y_hat_test) ** 2)
        print(f"Test MSE: {mse_test}")


In [6]:
df = preprocess("housing_data.csv", "MEDV", ["RM", "AGE", "DIS", "LSTAT"])
y = df["MEDV"].values
df = df.drop(columns=["MEDV"])
A = df.to_numpy()

model = LinearRegressionWithNoise(A, y, is_y_inconsistent=True)
model.run_regression()

Using inconsistent y for training.
Matrix A_train is full column rank. Proceeding with standard least squares solution.
Initial Linear Regression Coefficients (including intercept):
[[22.02180094]
 [ 2.94234333]
 [-0.49567393]
 [-0.9987891 ]
 [-5.29356655]]
Corrected Linear Regression Coefficients (including intercept):
[[22.02180094]
 [ 2.94234333]
 [-0.49567393]
 [-0.9987891 ]
 [-5.29356655]]
Corrected Train MSE: 22.926613200707987
Test MSE: 21.92022512203619
