# Improved Implementation for Stochastic Linear Regression
With L2 regularization.\
With efficiency improvements.\
It is recommended to check the cost after convergence.

In [1]:
import numpy as np

## Implementation

In [2]:
class LinearRegression:
    """Linear regression trained by per-sample gradient descent with L2 regularization.

    ``fit`` holds out the tail of the data as a validation set, updates the
    weights one sample at a time, and stops early once the validation cost
    has failed to improve by ``error_threshold`` for
    ``CONVERGENCE_PATIENCE`` consecutive epochs.
    """

    DEFAULT_EPOCHS = 1000
    DEFAULT_ALPHA = 0.01
    DEFAULT_LAMBDA = 0.0001
    DEFAULT_ERROR_THRESHOLD = 0.001
    DEFAULT_VALIDATION_SIZE = 0.2

    # Consecutive non-improving epochs tolerated before training stops.
    CONVERGENCE_PATIENCE = 10

    # True while the model is inside a run of non-improving epochs.
    COUNTING = False

    def compute_cost(self, y, y_, Lambda, W, m):
        """Return the mean squared error plus the L2 penalty.

        Parameters
        ----------
        y : array-like
            Target values.
        y_ : array-like
            Predicted values.
        Lambda : float
            L2 regularization rate.
        W : numpy.ndarray
            Current weights.
        m : int
            Sample count used to scale the penalty term.
        """
        return np.mean((y - y_) ** 2) + np.sum(W ** 2) * Lambda / (2 * m)

    def validation_split(self, X, y, validation_size=DEFAULT_VALIDATION_SIZE):
        """Split X and y into train and validation sets.

        The first ``1 - validation_size`` fraction of the rows (rounded down)
        becomes the training set and the remainder the validation set; the
        data is not shuffled.

        Returns
        -------
        tuple
            ``(X_train, y_train, X_val, y_val)``.
        """
        val = int(X.shape[0] * (1 - validation_size))
        return X[:val], y[:val], X[val:], y[val:]

    def log_current(self, k, num_out, output_limit, cost, vcost):
        """Print the epoch number with the training and validation costs."""
        print(f"({k//num_out}/{output_limit}) > Epoch: {k}",
              f"cost: {cost:.8f}",
              f"vCost: {vcost:.8f}")

    def convergence_test(self, current_cost, past_cost, error_threshold, k):
        """Update the early-stopping counter for epoch ``k``.

        An epoch counts as non-improving when the validation cost dropped by
        less than ``error_threshold``; this includes the case where the cost
        went up (possible overfitting). After ``CONVERGENCE_PATIENCE``
        consecutive non-improving epochs ``self.EXIT`` is set to True.

        Requires ``self.c`` and ``self.EXIT`` to be initialised (``fit`` does
        this before training).

        Returns
        -------
        bool
            True if this epoch was non-improving, False otherwise.
        """
        if past_cost - current_cost < error_threshold:
            self.COUNTING = True
            self.c += 1
            if self.c >= self.CONVERGENCE_PATIENCE:
                print(f"Past: {past_cost}, Curr: {current_cost}")
                print(f"\nEpoch: {k} vCost Converged with threshold: {error_threshold}. OR performance degraded.")
                self.EXIT = True
            return True

        # Real improvement: restart the run of non-improving epochs.
        self.COUNTING = False
        self.c = 0
        return False

    def single_step(self, Xi, yi, m, W, b, alpha, Lambda):
        """Perform one gradient-descent update from a single sample.

        ``W`` is updated in place and also returned.

        Parameters
        ----------
        Xi : numpy.ndarray
            Feature vector of the sample.
        yi : float
            Target value of the sample.
        m : int
            Training-set size; the weight update is scaled by ``alpha / m``.
        W : numpy.ndarray
            Current weights.
        b : float
            Current intercept.
        alpha : float
            Learning rate.
        Lambda : float
            L2 regularization rate.

        Returns
        -------
        tuple
            ``(W, b)`` after the update.
        """
        residual = yi - (np.dot(Xi, W) + b)

        # Descent direction: the data term pulls towards the target, the
        # L2 term shrinks the weights.
        dJ_dW = np.dot(residual, Xi) - Lambda * W
        dJ_db = np.mean(residual)

        W += dJ_dW * alpha / m
        b += dJ_db * alpha

        return W, b

    def fit(self, X, y,
            epochs=DEFAULT_EPOCHS,
            alpha=DEFAULT_ALPHA,
            Lambda=DEFAULT_LAMBDA,
            error_threshold=DEFAULT_ERROR_THRESHOLD,
            validation_size=DEFAULT_VALIDATION_SIZE,
            output_limit=10):
        """Fit the linear regression model to the given data.

        Parameters
        ----------
        X : array-like of shape (n_samples, n_features)
            Input features.

        y : array-like of shape (n_samples,)
            Target values.

        epochs : int, default=1000
            Number of complete iterations through X

        alpha : float, default=0.01
            Constant Learning Rate

        Lambda : float, default=0.0001
            Rate for l2 Regularization

        error_threshold : float, default=0.001
            Minimum per-epoch drop in validation cost that counts as an
            improvement for early stopping.

        validation_size : float, default=0.2
            Fraction of the rows (taken from the end) held out for validation.

        output_limit : int, default=10
            Number of iterations to show

        Returns
        -------
        W : numpy.ndarray
            The optimized weights.
        b : float
            The optimized intercept.

        Raises
        ------
        ValueError
            If ``output_limit`` is not positive or the training split is empty.
        """
        if output_limit <= 0:
            raise ValueError("Output limit should be greater than 0")

        X = np.asarray(X)
        y = np.asarray(y)

        # Log every `num_out` epochs. Clamp to 1 so that epochs < output_limit
        # does not cause a division by zero.
        num_out = max(1, epochs // output_limit)
        np.set_printoptions(precision=4)

        X, y, X_val, y_val = self.validation_split(X, y, validation_size)
        m, n = X.shape
        if m == 0:
            raise ValueError("Training split is empty; provide more samples")

        W = np.random.rand(n)
        b = np.random.rand()

        cost = self.compute_cost(y, np.dot(X, W) + b, Lambda, W, m)
        past_cost = self.compute_cost(y_val, np.dot(X_val, W) + b, Lambda, W, m)

        self.c = 0  # consecutive non-improving epochs
        self.EXIT = False  # set by convergence_test once patience runs out

        self.log_current(0, num_out, output_limit, cost, past_cost)  # Initial Out

        try:
            for k in range(1, epochs + 1):
                for Xi, yi in zip(X, y):
                    W, b = self.single_step(Xi, yi, m, W, b, alpha, Lambda)

                # Validation cost is needed every epoch for early stopping;
                # compute it once and reuse it for logging.
                current_cost = self.compute_cost(
                    y_val, np.dot(X_val, W) + b, Lambda, W, m)

                if k % num_out == 0:
                    cost = self.compute_cost(y, np.dot(X, W) + b, Lambda, W, m)
                    self.log_current(k, num_out, output_limit, cost, current_cost)

                if self.convergence_test(current_cost, past_cost, error_threshold, k) and self.EXIT:
                    return (W, b)
                past_cost = current_cost

        except KeyboardInterrupt:
            # Let the user stop a long run and still keep the current parameters.
            print(f"\nTerminated! Returned: Weights: {W}, Bias: {b}")
            return (W, b)
        return (W, b)

## Usage

In [3]:
# Sanity check on noise-free synthetic data: the fitted weights should
# approach 5.55 and 11.22, and the intercept should approach 50.
X = np.random.rand(1000, 2)
y = 5.55 * X[:, 0] + 11.22 * X[:, 1] + 50
m = LinearRegression()
m.fit(
    X,
    y,
    epochs=1000,
    alpha=0.1,
    error_threshold=0.0001,
    output_limit=10,
)

(0/10) > Epoch: 0 cost: 3296.22020254 vCost: 3318.95372763
(1/10) > Epoch: 100 cost: 2.51389679 vCost: 2.35991448
(2/10) > Epoch: 200 cost: 0.46209224 vCost: 0.43522069
(3/10) > Epoch: 300 cost: 0.08628342 vCost: 0.08155640
(4/10) > Epoch: 400 cost: 0.01658474 vCost: 0.01573418
Past: 0.00546643265440494, Curr: 0.005380941469931168

Epoch: 467 vCost Converged with threshold: 0.0001. OR performance degraded.


(array([ 5.41  , 11.0225]), 50.14072815374933)

In [4]:
from sklearn.datasets import make_regression

In [5]:

# Higher-dimensional check: 15 features, of which only 8 carry signal.
X, y = make_regression(
    n_samples=1000,
    n_features=15,
    n_informative=8,
)
m = LinearRegression()
m.fit(
    X,
    y,
    epochs=100,
    alpha=0.5,
    error_threshold=0.0001,
    output_limit=10,
)

(0/10) > Epoch: 0 cost: 24622.63849604 vCost: 23589.21474366
(1/10) > Epoch: 10 cost: 2.82533122 vCost: 3.08274955
(2/10) > Epoch: 20 cost: 0.00330751 vCost: 0.00350401
(3/10) > Epoch: 30 cost: 0.00195199 vCost: 0.00195039
Past: 0.0019418035479161198, Curr: 0.0019406333968708868

Epoch: 34 vCost Converged with threshold: 0.0001. OR performance degraded.


(array([-1.6195e-04,  8.0211e+01, -1.2919e-03,  7.1480e-04,  1.1354e-04,
         5.1150e+01,  8.8775e+01, -6.7683e-04,  5.8189e+01,  4.2497e+01,
         3.7029e-04,  7.0394e-01,  8.9075e+00,  6.6599e+01, -1.5054e-03]),
 -0.001320720106801069)

In [6]:
from sklearn.datasets import fetch_california_housing

In [7]:
# Load the California housing data and keep only the first 100 rows.
X, y = (arr[:100] for arr in fetch_california_housing(return_X_y=True))

In [8]:
# Fit on the housing subset loaded above, with a much smaller learning rate
# and more epochs than the synthetic runs.
m = LinearRegression()
m.fit(
    X,
    y,
    epochs=10000,
    alpha=0.0001,
    error_threshold=0.0001,
    output_limit=10,
)

(0/10) > Epoch: 0 cost: 600622.60615806 vCost: 1133748.22251306
(1/10) > Epoch: 1000 cost: 1.06732488 vCost: 2.44941944
(2/10) > Epoch: 2000 cost: 0.89299040 vCost: 2.30068927
(3/10) > Epoch: 3000 cost: 0.82875133 vCost: 2.14981130
(4/10) > Epoch: 4000 cost: 0.77588701 vCost: 2.02006497
(5/10) > Epoch: 5000 cost: 0.73191281 vCost: 1.90873490
Past: 1.887837853773696, Curr: 1.8877380058108637

Epoch: 5207 vCost Converged with threshold: 0.0001. OR performance degraded.


(array([ 2.1277e-01, -2.8978e-03,  4.4214e-01,  7.8433e-01,  5.4718e-04,
         1.1769e-01,  7.7065e-01,  2.6917e-01]),
 1.250649401879181)