In [1]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.base import BaseEstimator

In [2]:
class LinearReg(BaseEstimator):
    def __init__(self, gd_type='stochastic', 
                 tolerance=1e-6, max_iter=10000, w0=None, eta=0.0001, alpha=0.001):
        """
        gd_type: 'full' or 'stochastic' or 'momentum'
        tolerance: for stopping gradient descent
        max_iter: maximum number of steps in gradient descent
        w0: np.array of shape (d) - init weights
        eta: learning rate
        alpha: momentum coefficient
        """
        self.gd_type = gd_type
        self.tolerance = tolerance
        self.max_iter = max_iter
        self.w0 = w0
        self.alpha = alpha
        self.w = None
        self.eta = eta
        self.loss_history = None # list of loss function values at each training iteration
    
    def fit(self, X, y):
        self.loss_history = []
        self.w = self.w0
        iters = 0
        euclidean = 1
        weights_history = [self.w.copy()]
        self.loss_history.append(self.calc_loss(X, y))
        
        if self.gd_type == 'full':
            while iters < self.max_iter and euclidean > self.tolerance:
                self.calc_gradient(X, y)
                self.loss_history.append(self.calc_loss(X, y))
                weights_history.append(self.w.copy())
                euclidean = np.linalg.norm(weights_history[iters] - weights_history[iters + 1])
                iters+=1            
            return self
        
        elif self.gd_type == 'stochastic':
            while iters < self.max_iter and euclidean > self.tolerance:
                sample = np.random.randint(0, X.shape[0], size=batch_size)
                X_i = X[sample].reshape(batch_size,X.shape[1])
                y_i = y[sample].reshape(batch_size,1)
                self.calc_gradient(X_i, y_i)
                self.loss_history.append(self.calc_loss(X_i, y_i))
                weights_history.append(self.w.copy())
                euclidean = np.linalg.norm(weights_history[iters] - weights_history[iters + 1])
                iters+=1
            return self
        
        elif self.gd_type == 'momentum':
            self.vel = 0
            while iters < self.max_iter and euclidean > self.tolerance:
                sample = np.random.randint(0, X.shape[0], size=batch_size)
                X_i = X[sample].reshape(batch_size,X.shape[1])
                y_i = y[sample].reshape(batch_size,1)
                self.calc_gradient(X_i, y_i)
                self.loss_history.append(self.calc_loss(X_i, y_i))
                weights_history.append(self.w.copy())
                euclidean = np.linalg.norm(weights_history[iters] - weights_history[iters + 1])
                iters+=1
            return self
        else:
            raise Exception("gd_type must be 'full' or 'stochastic' or 'momentum'")
                
    def predict(self, X):
        if self.w is None:
            raise Exception('Not trained yet')
        return X.dot(self.w)
    
    def calc_gradient(self, X, y):
        if self.gd_type == 'momentum':
            self.vel = 0.9 * self.vel + self.eta * np.dot(X.T, np.dot(X, self.w) - y) / len(y)
            self.w -= self.vel
            return self.w
        else:
            self.w -= 2 * self.eta * np.dot(X.T, np.dot(X, self.w) - y) / len(y)
            return self.w

    def calc_loss(self, X, y):
        predictions = X.dot(self.w)
        return np.mean((y-predictions)**2)

**Проверка работы алгоритма на синтетическом датасете:**

In [3]:
n_features = 50
n_objects = 100000
batch_size = 5

w_real = [np.random.randint(1,10) for i in range(n_features)]
X = 20 * np.random.rand(n_objects, n_features)
y = X.dot(w_real) + np.random.normal(0, 1, (n_objects))
y = y.reshape(-1,1)

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 420)

In [4]:
%%time
reg1 = LinearReg(gd_type='full')
reg1.w0 = np.random.randn(n_features, 1)
reg1.fit(X_train,y_train)
y_pred = reg1.predict(X_test)
print('Full GD results:')
print('Test MSE = %.4f' % mean_squared_error(y_test, y_pred))
print('Iterations taken:', len(reg1.loss_history))

Full GD results:
Test MSE = 0.9846
Iterations taken: 1767
Wall time: 43.6 s


In [5]:
%%time
reg2 = LinearReg(gd_type='stochastic')
reg2.w0 = np.random.randn(n_features, 1)
reg2.fit(X_train,y_train)
y_pred = reg2.predict(X_test)
print('SGD results:')
print('Test MSE = %.4f' % mean_squared_error(y_test, y_pred))
print('Iterations taken:', len(reg2.loss_history))

SGD results:
Test MSE = 1.0282
Iterations taken: 10001
Wall time: 1.05 s


In [6]:
%%time
reg3 = LinearReg(gd_type='momentum')
reg3.w0 = np.random.randn(n_features, 1)
reg3.fit(X_train, y_train)
y_pred = reg3.predict(X_test)
print('Momentum GD results:')
print('Test MSE = %.4f' % mean_squared_error(y_test, y_pred))
print('Iterations taken:', len(reg3.loss_history))

Momentum GD results:
Test MSE = 1.3919
Iterations taken: 10001
Wall time: 1.08 s
