This notebook performs the flowing two major operations:

1.   Generate single variable (+ bias) sample data with linear relationship 
2.   Perform gradient descent Linear Regression training.

The main purpose of this notebook is to demonstrate the working principle of linear regression for the single variable (+ bias) case which can be easily extended to n valiable case (polinomial regression).



In [1]:
import numpy as np
import pandas as pd
import random
import plotly.express as px

**Class to generate single variable (+ bias) linear data with random Gaussian noise**

In [2]:
class LinearDataWithRandomNoise:
  def __init__(self, intercept=0, slope=1, nb_of_sample=100):
    self.slope = slope
    self.intercept = intercept
    self.nb_of_sample = nb_of_sample
    self.data = self._generate_data()
  
  def _linear(self, x):
    return self.slope*x + self.intercept

  def _generate_data(self):
    errors = np.random.normal(scale=0.15, size=self.nb_of_sample)
    x = np.random.uniform(size=self.nb_of_sample)
    df = pd.DataFrame({'x': x, 'e': errors})
    df['y'] = df.apply(lambda x:self._linear(x.x) + x.e, axis=1)

    return df
  
  def get_data(self):
    return self.data.sort_values('x')

**Gradient descent Linear Regression Model**

In [3]:
class LinearRegressionGradientDescent:
    def __init__(self, intercept=0, 
                 slope=1, 
                 learning_rate=0.01, 
                 nb_iterations=100, 
                 log_results=True):
        self.beta_0 = intercept
        self.beta_1 = slope
        self.learning_rate = learning_rate
        self.nb_iterations = nb_iterations
        self.log_results = log_results
        self.intermediate_y = {}
        self.training_error = {}

    def _linear(self, x):
        return self.beta_0 + self.beta_1 * x

    def fit(self, X, y):
        self.intermediate_y['start'] = self._linear(X)
        for i in range(self.nb_iterations):
            gradient_beta_0 = np.sum(self.beta_0 + self.beta_1 * X - y)
            gradient_beta_1 = np.sum((self.beta_0 + self.beta_1 * X - y) * X)
            # we are minimizing, so graidents are made negative
            self.beta_0 -= self.learning_rate * gradient_beta_0
            self.beta_1 -= self.learning_rate * gradient_beta_1
            if self.log_results:
                y_hat = self._linear(X)
                self.training_error[i] = [0.5 * np.sum((y_hat - y) ** 2)]
                self.intermediate_y[i] = y_hat
                print('loss: {}'.format(self.training_error[i][0]))

    def get_model_params(self):
        return self.beta_0, self.beta_1

    def get_intermediate_y(self):
        return self.intermediate_y

    def get_training_loss(self):
        df = pd.DataFrame(self.training_error).T
        df = df.rename(columns={df.columns[0]: "squared_loss"})
        return df

In [4]:
def display_training_progression(train_data, intermediate_y, 
                                 initial_iterations=2):
    nb_data_points = len(intermediate_y) - 1
    mid_point_index = nb_data_points // 2
    fig = px.scatter(x=data.x, y=data.y)
    fig.add_scatter(x=data.x, y=intermediate_y['start'], name="start")
    for i in range(initial_iterations):
        fig.add_scatter(x=data.x, y=intermediate_y[i], 
                        name="model iter_{}".format(i))
    if mid_point_index > initial_iterations:
        fig.add_scatter(x=data.x, y=intermediate_y[mid_point_index], 
                        name="model (iter_{})".format(mid_point_index))
    if nb_data_points > initial_iterations:
        fig.add_scatter(x=data.x, y=intermediate_y[nb_data_points - 1],
                        name="model (iter_{})".format(nb_data_points - 1))
    fig.show()

**Generate some sample training data**

In [5]:
NB_OF_SAMPLE_DATA_POINTS = 100
PARAM_INTERCEPT = 0
PARAM_SLOPE = 1
data = LinearDataWithRandomNoise(intercept=PARAM_INTERCEPT, slope=PARAM_SLOPE,
                              nb_of_sample=NB_OF_SAMPLE_DATA_POINTS).get_data()
fig = px.scatter(x=data.x, y=data.y)
fig.show()

**Gradient descent training**

In [6]:
PARAM_INTERCEPT_START = 2
PARAM_SLOPE_START = -1
my_lr = LinearRegressionGradientDescent(intercept=PARAM_INTERCEPT_START, 
                                    slope=PARAM_SLOPE_START, nb_iterations=100)
my_lr.fit(data.x, data.y)
loss = my_lr.get_training_loss().squared_loss
fig = px.scatter(x=loss.index, y=loss).update_layout(
    xaxis_title="training iteration", yaxis_title="loss")
fig.show()

loss: 22.076139034499036
loss: 16.883328114717504
loss: 14.827780063137999
loss: 13.237968931854283
loss: 11.850183951927749
loss: 10.622251612897498
loss: 9.534234148047709
loss: 8.570050113277532
loss: 7.7155926628689535
loss: 6.958373480985164
loss: 6.287326632710303
loss: 5.6926456069421505
loss: 5.165639893553471
loss: 4.698607972310232
loss: 4.284724763983266
loss: 3.917941890385821
loss: 3.5928992849894077
loss: 3.3048468623486293
loss: 3.049575101626589
loss: 2.823353529786541
loss: 2.6228762054595993
loss: 2.4452134068043616
loss: 2.287768817337793
loss: 2.148241584062582
loss: 2.0245926934186182
loss: 1.9150151736857814
loss: 1.8179076883839185
loss: 1.731851134771072
loss: 1.6555879054566738
loss: 1.5880035100644743
loss: 1.528110288369143
loss: 1.4750329768946644
loss: 1.4279959180486816
loss: 1.3863117248705044
loss: 1.3493712357424508
loss: 1.3166346122653718
loss: 1.28762345020509
loss: 1.2619137882212499
loss: 1.2391299122100154
loss: 1.2189388647189352
loss: 1.20104557

**Display intermediate + final models**

In [7]:
print('learned model (intercept, slope): ', my_lr.get_model_params())
display_training_progression(data, my_lr.get_intermediate_y())

learned model (intercept, slope):  (0.05354849046746111, 0.8729966490454498)


**Batch Gradident descent**

In [34]:
class LinearRegressionBatchGradientDescent:
    def __init__(self, intercept=0, 
                 slope=1, 
                 learning_rate=0.01, 
                 batch_size=1,
                 nb_iterations=100, 
                 log_results=True):
        self.beta_0 = intercept
        self.beta_1 = slope
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.nb_iterations = nb_iterations
        self.log_results = log_results
        self.intermediate_y = {}
        self.training_error = {}

    def _linear(self, x):
        return self.beta_0 + self.beta_1 * x

    def fit(self, X, y):
        self.intermediate_y['start'] = self._linear(X)
        for i in range(self.nb_iterations):
            index_samples = random.sample(range(0, len(X)), self.batch_size)
            batch_X = np.array([X[idx] for idx in index_samples])
            batch_y = np.array([y[idx] for idx in index_samples])
            gradient_beta_0 = np.sum(self.beta_0 + self.beta_1 * batch_X - batch_y)
            gradient_beta_1 = np.sum((self.beta_0 + self.beta_1 * batch_X - batch_y) * batch_X)
            # we are minimizing, so graidents are made negative
            self.beta_0 -= self.learning_rate * gradient_beta_0
            self.beta_1 -= self.learning_rate * gradient_beta_1
            if self.log_results:
                y_hat = self._linear(X)
                self.training_error[i] = [0.5 * np.sum((y_hat - y) ** 2)]
                self.intermediate_y[i] = y_hat
                print('loss: {}'.format(self.training_error[i][0]))

    def get_model_params(self):
        return self.beta_0, self.beta_1

    def get_intermediate_y(self):
        return self.intermediate_y
    def get_training_loss(self):
        df = pd.DataFrame(self.training_error).T
        df = df.rename(columns={df.columns[0]: "squared_loss"})
        return df

In [41]:
random.seed(100)
BATCH_SIZE = 10
my_lr = LinearRegressionBatchGradientDescent(intercept=PARAM_INTERCEPT_START, 
                                    slope=PARAM_SLOPE_START, 
                                    batch_size = BATCH_SIZE,
                                    nb_iterations=300)
my_lr.fit(data.x, data.y)
loss = my_lr.get_training_loss().squared_loss
fig = px.scatter(x=loss.index, y=loss).update_layout(
    xaxis_title="training iteration", yaxis_title="loss")
fig.show()

loss: 47.28594255975534
loss: 41.99855873756983
loss: 35.39000488995733
loss: 31.571959038712883
loss: 28.88537511992401
loss: 25.709363284002553
loss: 25.172357087190235
loss: 22.865298223033314
loss: 21.5973373848757
loss: 20.142495254807738
loss: 20.309344808039217
loss: 19.76654209223014
loss: 20.179811023809535
loss: 19.73832990608406
loss: 19.364213401829762
loss: 19.148228772809777
loss: 18.755054826964244
loss: 18.442451927005862
loss: 18.366385585241243
loss: 17.708048994634822
loss: 17.373984029181038
loss: 17.113567763957864
loss: 16.81354067968243
loss: 16.577552080493426
loss: 16.46809845244259
loss: 16.01454121636361
loss: 15.882554751362576
loss: 15.665951535707414
loss: 15.551627576294173
loss: 15.246150256746951
loss: 15.122681350029737
loss: 14.917116796874474
loss: 14.76370124952635
loss: 14.591044914383348
loss: 14.475462507457761
loss: 14.361535695074004
loss: 14.099290491118401
loss: 14.026231166282765
loss: 13.976963250273139
loss: 13.867290734031553
loss: 13.668

**Display intermediate + final models**

In [42]:
print('learned model (intercept, slope): ', my_lr.get_model_params())
display_training_progression(data, my_lr.get_intermediate_y())

learned model (intercept, slope):  (0.24704168082193653, 0.4785484504686116)
