In [74]:
import numpy as np
from sklearn.datasets import make_regression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import r2_score
from sklearn.linear_model import SGDRegressor

In [124]:
import numpy as np
class SGD_Regressor:
    m, n = 0, 0
    theta = 0
    yHat = 0
    grad = 0
    X = 0
    y = 0
    alpha = 0
    max_iter = 0

    def __init__(self, learning_rate=0.01, max_iter= 1000):
        self.alpha = learning_rate
        self.max_iter = max_iter

    def fit(self, X, y):
        self.X = np.array(X)
        self.y = np.array(y).reshape(-1, 1)

        self.m, self.n = self.X.shape
        self.X = np.append(self.X, np.ones((self.m,1)), axis=1)
        self.theta = np.zeros((self.n+1, 1))
        self.yHat = self.__calc_yHat(self.X)
        self.__calc_theta()
    
    def __calc_theta(self):
        for _ in range(self.max_iter):
            self.__calc_grad()
            new_theta = self.theta + self.alpha * self.grad
            if self.__converged(new_theta):
                break
            self.theta = new_theta
        else:
            print("doesn't converge")


    def __converged(self, new_theta):
        condition1 = abs(self.grad) <= 10**-2
        condition2 = abs(self.theta - new_theta) <= 10**-6
        return condition1.all() or condition2.all()

    def __calc_yHat(self, X:np.ndarray):
        yHat =  X @ self.theta
        return yHat
    
    def __calc_grad(self):
        self.yHat = self.__calc_yHat(self.X)
        self.grad = self.X.T @ (self.y - self.yHat)
    
    def predict(self, X:np.ndarray):
        m = X.shape[0]
        X = np.append(X, np.ones((m,1)), axis=1)
        yHat = self.__calc_yHat(X)
        return yHat

In [125]:
X, y = make_regression(n_samples=1500, n_features=10, noise=1, random_state=42)
X.shape, y.shape

((1500, 10), (1500,))

In [126]:
# split the data set into train and test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=0)
scaler = StandardScaler()
X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [135]:
my_SGD = SGD_Regressor(learning_rate=0.001)
my_SGD.fit(X_train, y_train)
my_SGD_y_train_preds = my_SGD.predict(X_train)

sk_SGD = SGDRegressor()
sk_SGD.fit(X_train, y_train)
sk_SGD_y_train_preds = sk_SGD.predict(X_train)

print(f"my-r2_score: {r2_score(y_train, my_SGD_y_train_preds)}")
print(f"sk-r2_score: {r2_score(y_train, sk_SGD_y_train_preds)}")

my-r2_score: 0.9999639157537062
sk-r2_score: 0.9999639034550705
