In [30]:
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
import statsmodels.api as sm
from sklearn.linear_model import LinearRegression

In [70]:
from sklearn.base import BaseEstimator


class LinReg(BaseEstimator):
    def __init__(
        self,
        delta=1.0,
        gd_type="Momentum",
        tolerance=1e-4,
        max_iter=2000,
        w0=None,
        eta=1e-2,
        alpha=1e-3,
        epsilon=1e-2,
        reg_cf=1e-2,
        rms_prop_cf=1 - 1e-7,
    ):
        """
        gd_type: str
            'GradientDescent', 'StochasticDescent', 'Momentum', 'Adagrad'
        delta: float
            proportion of object in a batch (for stochastic GD)
        tolerance: float
            for stopping gradient descent
        max_iter: int
            maximum number of steps in gradient descent
        w0: np.array of shape (d)
            init weights
        eta: float
            learning rate
        alpha: float
            momentum coefficient
        reg_cf: float
            regularization coefficient
        epsilon: float
            numerical stability
        """

        self.delta = delta
        self.gd_type = gd_type
        self.tolerance = tolerance
        self.max_iter = max_iter
        self.w0 = w0
        self.alpha = alpha
        self.w = None
        self.eta = eta
        self.loss_history = (
            None  # list of loss function values at each training iteration
        )
        self.epsilon = epsilon
        self.reg_cf = reg_cf
        self.rms_prop_cf = rms_prop_cf

    def fit(self, X, y):
        """
        X: np.array of shape (l, d)
        y: np.array of shape (l)
        ---
        output: self
        """
        if self.gd_type == "GradientDescent":
            self.loss_history = []
            if self.w0 is None:
                self.w = np.random.normal(size=X.shape[1])
            else:
                self.w = self.w0
            for i in range(self.max_iter):
                self.loss_history.append(self.calc_loss(X, y))
                w_new = self.w - self.eta * self.calc_gradient(X, y)
                if (np.abs(w_new - self.w)).mean() < self.tolerance:
                    break
                self.w = w_new

        elif self.gd_type == "StochasticDescent":
            self.loss_history = []
            if self.w0 is None:
                self.w = np.random.normal(size=X.shape[1])
            else:
                self.w = self.w0
            for i in range(self.max_iter):
                X_sample = X.sample(frac=self.delta, random_state=i)
                y_sample = y.sample(frac=self.delta, random_state=i)
                self.loss_history.append(self.calc_loss(X_sample, y_sample))
                w_new = self.w - self.eta * self.calc_gradient(X_sample, y_sample)
                if (np.abs(w_new - self.w)).mean() < self.tolerance:
                    break
                self.w = w_new

        elif self.gd_type == "Momentum":
            self.loss_history = []
            if self.w0 is None:
                self.w = np.random.normal(size=X.shape[1])
            else:
                self.w = self.w0
            h = 0
            for i in range(self.max_iter):
                self.loss_history.append(self.calc_loss(X, y))
                h_new = self.alpha * h + self.eta * self.calc_gradient(X, y)
                w_new = self.w - h_new
                if (np.abs(w_new - self.w)).mean() < self.tolerance:
                    break
                self.w = w_new
                h = h_new

        elif self.gd_type == "RMSProp":
            self.loss_history = []
            if self.w0 is None:
                self.w = np.random.normal(size=X.shape[1])
            else:
                self.w = self.w0
            g = 0
            for i in range(self.max_iter):
                self.loss_history.append(self.calc_loss(X, y))
                g_new = (
                    self.rms_prop_cf * g
                    + (1 - self.rms_prop_cf) * (self.calc_gradient(X, y) ** 2)
                )
                w_new = self.w - (self.eta * self.calc_gradient(X, y)) / np.sqrt(
                    g_new + self.epsilon
                )
                if (np.abs(w_new - self.w)).mean() < self.tolerance:
                    break
                self.w = w_new
                g = g_new

        return self

    def predict(self, X):
        if self.w is None:
            raise Exception("Not trained yet")
        else:
            return X @ self.w

    def calc_gradient(self, X, y):
        """
        X: np.array of shape (l, d) (l can be equal to 1 if stochastic)
        y: np.array of shape (l)
        ---
        output: np.array of shape (d)
        """
        return (2 / X.shape[0]) * (np.array(X.T) @ ((np.array(X) @ self.w) - y)) - (
            2 * self.reg_cf * self.w
        )

    def calc_loss(self, X, y):
        """
        X: np.array of shape (l, d)
        y: np.array of shape (l)
        ---
        output: float
        """
        return mean_squared_error(self.predict(X), y)


**Test on seaborn diamonds' prices dataset**

In [9]:
data = sns.load_dataset('diamonds')

y = data.price
X = data.drop(['price'], axis=1)
columns = data.drop(['price'], axis=1).columns

Preprocessing

In [None]:
X.replace({"cut": {'Fair': 1, 'Good': 2, 'Very Good': 3, 'Premium': 4, 'Ideal': 5}}, inplace = True)
X.replace({"color": {'J': 1, 'I': 2, 'H': 3, 'G': 4, 'F': 5, 'E': 6, 'D': 7}}, inplace = True)
X.replace({"clarity": {'I1': 1, 'SI2': 2, 'SI1': 3, 'VS2': 4, 'VS1': 5, 'VVS2': 6, 'VVS1': 7, 'IF': 8}}, inplace = True)

In [11]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=17)

In [12]:
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
X_train = pd.DataFrame(X_train_scaled, index=X_train.index)
X_test = pd.DataFrame(X_test_scaled, index=X_test.index)

In [13]:
X_train_wconst = sm.add_constant(X_train)
X_test_wconst = sm.add_constant(X_test)

Default Gradient descent realization:

In [39]:
gd = LinReg(gd_type='GradientDescent')
gd.fit(X_train_wconst, y_train)
mean_absolute_error(gd.predict(X_test_wconst), y_test)

846.6513035125685

Stochastic Gradient descent realization:

In [33]:
sd = LinReg(max_iter=10000, gd_type='StochasticDescent', delta=0.05)
sd.fit(X_train_wconst, y_train)
mean_absolute_error(sd.predict(X_test_wconst), y_test)

811.4811135181791

Momentum Gradient descent realization:

In [40]:
momentum = LinReg(gd_type='Momentum', alpha=0.5)
momentum.fit(X_train_wconst, y_train)
mean_absolute_error(momentum.predict(X_test_wconst), y_test)

812.9202353028369

RMSProp Gradient descent realization:

In [71]:
rmsprop = LinReg(gd_type='RMSProp', eta=0.1)
rmsprop.fit(X_train_wconst, y_train)
mean_absolute_error(rmsprop.predict(X_test_wconst), y_test)

806.9510642812699

Compare with default sklearn linreg:

In [31]:
model = LinearRegression()
model.fit(np.array(X_train_wconst), y_train)
mean_absolute_error(model.predict(np.array(X_test_wconst)), y_test)

812.8555360600542

As we can see, my implementation is comparable in quality to sklearn's implementation