In [6]:
import numpy as np
import pandas as pd

# Train test split

In [8]:
# Hàm tự định nghĩa để chia dữ liệu thành train/test
def custom_train_test_split(X, y, test_size=0.33, random_state=42):
    np.random.seed(random_state)
    n_samples = X.shape[0]
    indices = np.arange(n_samples)
    np.random.shuffle(indices)
    
    test_size = int(test_size * n_samples)
    train_indices = indices[test_size:]
    test_indices = indices[:test_size]
    
    X_train = X[train_indices]
    X_test = X[test_indices]
    y_train = y[train_indices]
    y_test = y[test_indices]
    
    return X_train, X_test, y_train, y_test

# Linear Regression model

In [13]:
class Model():
    def __init__(self, data, regression_type='standard', lr=0.01, epochs=100, batch_size=32, optimizer='batch', lambda_reg=0.01):
        # Chia dữ liệu thành train/test
        self.X_train, self.X_test, self.y_train, self.y_test = custom_train_test_split(
            data[:, :-1], data[:, -1], test_size=0.33, random_state=42)
        
        # Các tham số cơ bản
        self.lr = lr
        self.epochs = epochs
        self.batch_size = batch_size
        self.optimizer = optimizer
        self.regression_type = regression_type
        self.lambda_reg = lambda_reg  # Tham số điều chỉnh cho Ridge Regression
        
        # Chuẩn bị dữ liệu cho các công thức hồi quy
        self._prepare_features()
        
        # Khởi tạo trọng số
        self.weight = np.random.randn(self.X_train.shape[1], 1)
        
        # Adam parameters
        self.beta1 = 0.9
        self.beta2 = 0.999
        self.epsilon = 1e-8
        self.m = np.zeros_like(self.weight)
        self.v = np.zeros_like(self.weight)
        self.t = 0
    
    def _prepare_features(self):
        """Chuẩn bị dữ liệu đầu vào dựa trên loại hồi quy"""
        if self.regression_type == 'standard':
            pass
        elif self.regression_type == 'polynomial':
            self.X_train = np.vstack([self.X_train[:, 0]**2, self.X_train[:, 1], 
                                    self.X_train[:, 2]**2, self.X_train[:, 3]]).T
            self.X_test = np.vstack([self.X_test[:, 0]**2, self.X_test[:, 1], 
                                   self.X_test[:, 2]**2, self.X_test[:, 3]]).T
        elif self.regression_type == 'mixed':
            self.X_train = np.vstack([self.X_train[:, 0] + self.X_train[:, 1], 
                                    self.X_train[:, 2]**2, self.X_train[:, 3]]).T
            self.X_test = np.vstack([self.X_test[:, 0] + self.X_test[:, 1], 
                                   self.X_test[:, 2]**2, self.X_test[:, 3]]).T
        elif self.regression_type == 'interaction':
            self.X_train = np.vstack([self.X_train[:, 0] * self.X_train[:, 1], 
                                    self.X_train[:, 2]**2]).T
            self.X_test = np.vstack([self.X_test[:, 0] * self.X_test[:, 1], 
                                   self.X_test[:, 2]**2]).T
        else:
            raise ValueError("regression_type must be 'standard', 'polynomial', 'mixed', or 'interaction'")
    
    def predict(self, X):
        """Dự đoán giá trị y từ X và weights"""
        return np.dot(X, self.weight)
    
    def test_eval(self): 
        """Đánh giá lỗi trung bình tuyệt đối trên tập test"""
        y_hat = self.predict(self.X_test)
        return np.mean(np.abs(y_hat - self.y_test.reshape(-1, 1)))
    
    def gradient(self, X, y, y_hat):
        """Tính gradient cho hàm mất mát MSE (không có regularization)"""
        return 2 * np.dot(X.T, (y_hat - y.reshape(-1, 1))) / len(y)
    
    def ridge_gradient(self, X, y, y_hat):
        """Tính gradient cho Ridge Regression (có L2 regularization)"""
        grad = self.gradient(X, y, y_hat) + 2 * self.lambda_reg * self.weight
        return grad
    
    def batch_gradient_descent(self):
        """Batch Gradient Descent"""
        for epoch in range(self.epochs):
            y_hat = self.predict(self.X_train)
            grad = self.gradient(self.X_train, self.y_train, y_hat)
            self.weight -= self.lr * grad
            
            if epoch % 10 == 0:
                loss = np.mean((y_hat - self.y_train.reshape(-1, 1)) ** 2)
                print(f'Epoch {epoch}, Loss: {loss:.4f}')
    
    def stochastic_gradient_descent(self):
        """Stochastic Gradient Descent"""
        n_samples = len(self.X_train)
        indices = np.arange(n_samples)
        
        for epoch in range(self.epochs):
            np.random.shuffle(indices)
            total_loss = 0
            
            for idx in indices:
                X_i = self.X_train[idx:idx+1]
                y_i = self.y_train[idx:idx+1]
                y_hat = self.predict(X_i)
                grad = self.gradient(X_i, y_i, y_hat)
                self.weight -= self.lr * grad
                total_loss += np.mean((y_hat - y_i) ** 2)
            
            if epoch % 10 == 0:
                print(f'Epoch {epoch}, Average Loss: {total_loss/n_samples:.4f}')
    
    def adam_optimizer(self):
        """Adam Optimization"""
        for epoch in range(self.epochs):
            self.t += 1
            y_hat = self.predict(self.X_train)
            grad = self.gradient(self.X_train, self.y_train, y_hat)
            
            self.m = self.beta1 * self.m + (1 - self.beta1) * grad
            self.v = self.beta2 * self.v + (1 - self.beta2) * (grad ** 2)
            
            m_hat = self.m / (1 - self.beta1 ** self.t)
            v_hat = self.v / (1 - self.beta2 ** self.t)
            
            self.weight -= self.lr * m_hat / (np.sqrt(v_hat) + self.epsilon)
            
            if epoch % 10 == 0:
                loss = np.mean((y_hat - self.y_train.reshape(-1, 1)) ** 2)
                print(f'Epoch {epoch}, Loss: {loss:.4f}')
    
    def pseudo_inverse(self):
        """Pseudo-inverse method"""
        X = self.X_train
        y = self.y_train.reshape(-1, 1)
        self.weight = np.linalg.pinv(X.T @ X) @ X.T @ y
        loss = np.mean((self.predict(X) - y) ** 2)
        print(f'Pseudo-inverse Loss: {loss:.4f}')
    
    def ridge_analytical(self):
        """Ridge Regression với Analytical Solution"""
        X = self.X_train
        y = self.y_train.reshape(-1, 1)
        n_features = X.shape[1]
        I = np.eye(n_features)
        self.weight = np.linalg.inv(X.T @ X + self.lambda_reg * I) @ X.T @ y
        loss = np.mean((self.predict(X) - y) ** 2) + self.lambda_reg * np.sum(self.weight ** 2)
        print(f'Ridge Analytical Loss: {loss:.4f}')
    
    def ridge_gradient_descent(self):
        """Ridge Regression với Gradient Descent"""
        for epoch in range(self.epochs):
            y_hat = self.predict(self.X_train)
            grad = self.ridge_gradient(self.X_train, self.y_train, y_hat)
            self.weight -= self.lr * grad
            
            if epoch % 10 == 0:
                loss = np.mean((y_hat - self.y_train.reshape(-1, 1)) ** 2) + self.lambda_reg * np.sum(self.weight ** 2)
                print(f'Epoch {epoch}, Ridge Loss: {loss:.4f}')
    
    def least_squares(self):
        """Least Squares với np.linalg.lstsq"""
        X = self.X_train
        y = self.y_train.reshape(-1, 1)
        self.weight, residuals, rank, s = np.linalg.lstsq(X, y, rcond=None)
        loss = np.mean((self.predict(X) - y) ** 2) if residuals.size == 0 else residuals[0]
        print(f'Least Squares Loss: {loss:.4f}')
    
    def fit(self):
        """Huấn luyện mô hình với optimizer được chỉ định"""
        if self.optimizer == 'batch':
            self.batch_gradient_descent()
        elif self.optimizer == 'sgd':
            self.stochastic_gradient_descent()
        elif self.optimizer == 'adam':
            self.adam_optimizer()
        elif self.optimizer == 'pseudo':
            self.pseudo_inverse()
        elif self.optimizer == 'ridge_analytical':
            self.ridge_analytical()
        elif self.optimizer == 'ridge_gd':
            self.ridge_gradient_descent()
        elif self.optimizer == 'least_squares':
            self.least_squares()
        else:
            raise ValueError("Optimizer must be one of: 'batch', 'sgd', 'adam', 'pseudo', 'ridge_analytical', 'ridge_gd', 'least_squares'")

# Test

In [15]:
# Dữ liệu mẫu để kiểm tra
np.random.seed(42)
X = np.array([[1, 2, 3, 4], [2, 3, 4, 5], [3, 4, 5, 6], [4, 5, 6, 7], [5, 6, 7, 8]])
y = np.array([10, 15, 20, 25, 30])
data = np.hstack((X, y.reshape(-1, 1)))

# Thử nghiệm với các optimizer
print("Standard Linear Regression with Least Squares:")
model1 = Model(data, regression_type='standard', optimizer='least_squares')
model1.fit()
print(f"Test Error: {model1.test_eval():.4f}\n")

print("Polynomial Regression with Ridge Gradient Descent:")
model2 = Model(data, regression_type='polynomial', optimizer='ridge_gd', epochs=50, lambda_reg=0.1)
model2.fit()
print(f"Test Error: {model2.test_eval():.4f}\n")

print("Mixed Terms Regression with Adam:")
model3 = Model(data, regression_type='mixed', optimizer='adam', epochs=50)
model3.fit()
print(f"Test Error: {model3.test_eval():.4f}\n")

print("Interaction Terms Regression with Least Squares:")
model4 = Model(data, regression_type='interaction', optimizer='least_squares')
model4.fit()
print(f"Test Error: {model4.test_eval():.4f}")

Standard Linear Regression with Least Squares:
Least Squares Loss: 0.0000
Test Error: 0.0000

Polynomial Regression with Ridge Gradient Descent:
Epoch 0, Ridge Loss: 1709.1865
Epoch 10, Ridge Loss: 66875953316229920293288211906560.0000
Epoch 20, Ridge Loss: 2649573376120226973124785068083143982742290483811234705571840.0000
Epoch 30, Ridge Loss: 104974041151222040900252131509142434569025650139747425359534151855930059494010010036338688.0000
Epoch 40, Ridge Loss: 4158990052864434194613150512262897968897169911416808947354917558347271639030506773867310224623022526030952831255576576.0000
Test Error: 5711401360752179090527393873408653663500458931260987569596839555607035904.0000

Mixed Terms Regression with Adam:
Epoch 0, Loss: 463.4508
Epoch 10, Loss: 288.2138
Epoch 20, Loss: 163.2111
Epoch 30, Loss: 85.7249
Epoch 40, Loss: 45.6176
Test Error: 5.5119

Interaction Terms Regression with Least Squares:
Least Squares Loss: 0.1287
Test Error: 0.0655
