In [38]:
%matplotlib notebook


In [39]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import MinMaxScaler

# Load data set

In [40]:
from sklearn.datasets import load_boston
X, y = load_boston(return_X_y=True)
boston_df = pd.DataFrame(
    X, columns=['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']
)

In [41]:
boston_df.head(10)

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98
1,0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14
2,0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03
3,0.03237,0.0,2.18,0.0,0.458,6.998,45.8,6.0622,3.0,222.0,18.7,394.63,2.94
4,0.06905,0.0,2.18,0.0,0.458,7.147,54.2,6.0622,3.0,222.0,18.7,396.9,5.33
5,0.02985,0.0,2.18,0.0,0.458,6.43,58.7,6.0622,3.0,222.0,18.7,394.12,5.21
6,0.08829,12.5,7.87,0.0,0.524,6.012,66.6,5.5605,5.0,311.0,15.2,395.6,12.43
7,0.14455,12.5,7.87,0.0,0.524,6.172,96.1,5.9505,5.0,311.0,15.2,396.9,19.15
8,0.21124,12.5,7.87,0.0,0.524,5.631,100.0,6.0821,5.0,311.0,15.2,386.63,29.93
9,0.17004,12.5,7.87,0.0,0.524,6.004,85.9,6.5921,5.0,311.0,15.2,386.71,17.1


# Select one feature

In [42]:
boston_df = boston_df[['RM']]
boston_df['target'] = y
print(boston_df.head())
boston_df.describe()

      RM  target
0  6.575    24.0
1  6.421    21.6
2  7.185    34.7
3  6.998    33.4
4  7.147    36.2


Unnamed: 0,RM,target
count,506.0,506.0
mean,6.284634,22.532806
std,0.702617,9.197104
min,3.561,5.0
25%,5.8855,17.025
50%,6.2085,21.2
75%,6.6235,25.0
max,8.78,50.0


In [43]:
boston_df.plot.scatter('RM', 'target')

<IPython.core.display.Javascript object>

<AxesSubplot:xlabel='RM', ylabel='target'>

# Custom Linear Regression Classifier

In [44]:
def get_features(features= None):
    X, y = load_boston(return_X_y=True)

    if features is None:
        print ('Selecting all features')
        
    elif type(features) == int or (type(features) == list and len(features)==1):
        print (f'Selecting one feature: {features}')
        X= X[:,features].reshape(-1,1) # single column 
    elif type(features) == list: 
        print (f'Selecting features list: {features}')
        X = X[:,features]
    else: 
        print ('wrong format of parameter "features"')
        return

    return X, y

In [45]:
X, y = get_features(5)

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=2018)

Selecting one feature: 5


# Check loaded data

In [46]:
print ('X_train.shape= ',X_train.shape)
print ('y_train.shape= ',y_train.shape)
X_train[:10]

X_train.shape=  (379, 1)
y_train.shape=  (379,)


array([[6.009],
       [5.648],
       [5.885],
       [8.297],
       [6.471],
       [4.97 ],
       [6.63 ],
       [6.678],
       [5.454],
       [8.78 ]])

# Develop expresion of h

In [47]:
class Linear_Regression_1():
    def __init__(self):
        pass
            
    def h(self, b, w, X): 
        '''
        :param b -  float or ndarry of shape [m,1], m - number of samples
        :param w - ndarray of shape [1,m],  n - number of features
        :param X - ndarray of shape [m,n], m - number of samples, n - number of features
        '''
        assert (X.shape[1]== w.shape[1])

        h_res = b + w * X
        
        return h_res

In [48]:
#Check h
np.random.seed(2018)
b_check = np.random.randn()
w_check = np.random.randn(1,1)
X_check = np.random.randn(10,1)
print(f'b= {b_check}, \nw= {w_check}, \nX= \n{X_check}')
lin_reg_1 = Linear_Regression_1()
lin_reg_1.h(b_check, w_check, X_check)

TypeError: 'int' object is not callable

# Develop expresion of Cost Function

In [None]:
class Linear_Regression_2():
    '''linear regression using gradient descent
    '''
    def __init__(self):
        pass


    def J (self, h, y):      
        '''
        :param h - ndarray of shape (m,1)
        :param y - ndarray of shape (m,1)
        :return expression for cost function 
        '''
        if h.shape !=y.shape:
            print('h.shape = {} does not match y.shape = {}.Expected {}'.format (h.shape, y.shape, (self.m,1)))
            raise Exception('Check assertion in J')    
   
        J_res = 1 / (2 * m) * np.sum((h - y) ** 2)
        
        return J_res   

In [None]:
np.random.seed(2019)
m = 10 
y_check= np.random.randn(m,1)
h_check= np.random.randn(m,1)
print(f'y= {y_check}, \nh= {h_check}')
lin_reg_2 = Linear_Regression_2()
lin_reg_2.m = m 
lin_reg_2.J(h_check, y_check)

# Develop expresion of Cost Function derivative

In [None]:
class Linear_Regression_3():
    def __init__(self, max_iter = 1e5, alpha = 1,eps = 1e-10, verbose= 0):
        pass        

    def h(self, b, w, X): 
        '''
        :param b -  float or ndarry of shape [m,1], m - number of samples
        :param w - ndarray of shape [1,m],  n - number of features
        :param X - ndarray of shape [m,n], m - number of samples, n - number of features
        '''
        assert (X.shape[1]== w.shape[1])

        h_res = b + w * X

        return h_res
        
    def J_derivative(self, params, X, y): 
        '''
        :param params - tuple (b,w), where w is the 2d ndarry of shape (1,n), n- number of features 
        :param X- ndarray of shape (m, n)
        :param y - ndarray of shape (m,1)
        :return tuple of derivatrives of cost function by b and w
        '''    
        b, w = params
        assert (w.shape == (1, self.n))                
        h_val = self.h(b, w, X)
        if  h_val.shape != (self.m, 1):
            print(f'h.shape = {h_val.shape}, but expected {(self.m, 1)}')
            raise Exception('Check assertion in J_derivative')

        dJ_b = (1 / m * (h_val - y)).sum()
        dJ_w = (1 / m * (h_val - y).T @ X).sum()
        
        return (dJ_b, dJ_w)

# Check cost function derivatives

In [None]:
np.random.seed(2020)
m = 10 
n = 1
X_check= np.random.randn(m,n)
y_check= np.random.randn(m,1)
b_check= np.random.randn()
w_check= np.random.randn(1,n)
params = b_check,w_check 
print(f'X= {X_check}, \ny= {y_check}, \nb= {b_check} \nw= {w_check}')

lin_reg_3 = Linear_Regression_3()
lin_reg_3.m = m 
lin_reg_3.n = n 
lin_reg_3.J_derivative(params, X_check, y_check)

# Develop gradient descent

In [None]:
class Linear_Regression_4():
    '''
    linear regression using gradient descent
    '''
    def __init__(self, max_iter = 1e5, alpha = 0.01, eps = 1e-10, verbose= 0):
        '''
        :param verbose: set 1 to display more details of J val changes
        '''
        self.max_iter = max_iter
        self.alpha = alpha
        self.eps = eps
        self.verbose = verbose       
        
    def h(self, b, w, X): 
        '''
        :param b -  float or ndarry of shape [m,1], m - number of samples
        :param w - ndarray of shape [1,m],  n - number of features
        :param X - ndarray of shape [m,n], m - number of samples, n - number of features
        '''
        assert (X.shape[1] == w.shape[1])

        h_res = b + w * X
        
        if h_res.shape != (X.shape[0],1):
            print('h.shape = {} but expected {}'.format (h_res.shape,  (self.m,1)))
            raise Exception('Check assertion in h')    
        return h_res

    def J(self, h, y):      
        '''
        :param h - ndarray of shape (m,1)
        :param y - ndarray of shape (m,1)
        :return expression for cost function 
        '''
        if h.shape !=y.shape:
            print('h.shape = {} does not match y.shape = {}.Expected {}'.format (h.shape, y.shape, (self.m,1)))
            raise Exception('Check assertion in J')   

        J_res = 1 / (2 * m) * np.sum((h - y) ** 2)

        return J_res
        
    def J_derivative(self, params, X, y): 
        '''
        :param params - tuple (b,w), where w is the 2d ndarry of shape (1,n), n- number of features 
        :param X- ndarray of shape (m, n)
        :param y - ndarray of shape (m,1)
        :return tuple of derivatrives of cost function by b and w
        '''
      
        b,w = params
        assert (w.shape == (1, self.n))                
        h_val = self.h(b, w, X)
        if  h_val.shape != (self.m, 1):
            print('h.shape = {}, but expected {}'.format (h_val.shape, (self.m, 1)))
            raise Exception('Check assertion in J_derivative')
        
        dJ_b = (1 / m * (h_val - y)).sum()
        dJ_w = (1 / m * (h_val - y).T @ X).sum()
        
        return (dJ_b, dJ_w)

    def fit(self, X, y):
        '''
        :param X - ndarray training set of shape [m,n], m - number of samples, n - number of features
        :param y - ndarray - 1d array 
        :return: True in case of successful fit 
        '''      
        if self.verbose: 
            print ('Running gradient descent with alpha = {}, eps= {}, max_iter= {}'.format(
                self.alpha, self.eps, self.max_iter))
        self.m, self.n = X.shape # number of samples, number of features  
        y = y.reshape(self.m, 1) # make it 2 d to make sure it corresponds to h_val
        b = 0 # init intercept with 0
        w = np.zeros(self.n).reshape(1,-1) # make sure it's shape is [1,n]
        params = (b, w)
        
        self.J_hist=[-1] # used for keeping J values. Init with -1 to avoid 0 at first iter
        continue_iter = True # flag to continue next iter (grad desc step)
        iter_number =0 # used for limit by max_iter

        while continue_iter:            
            dJ_b, dJ_w =  self.J_derivative(params, X, y)
            b = b - self.alpha * dJ_b
            w = w - self.alpha * dJ_w
            params = (b, w)
            
            # keep history of J values
            self.J_hist.append(self.J(self.h(b, w, X), y))
            if self.verbose:
                print ('b = {}, w= {}, J= {}'.format(b,w,self.J_hist[-1]))
            # check criteria of exit the loop (finish grad desc)
            if self.max_iter and iter_number> self.max_iter: # if max_iter is provided and limit succeeded
                continue_iter = False
            elif np.abs(self.J_hist[iter_number-1] - self.J_hist[iter_number])< self.eps: # if accuracy is succeeded
                continue_iter = False
            iter_number += 1
            
        # store the final params to further using 
        self.intercept_, self.coef_= params        
        return True

# Check gradient descent

In [None]:
np.random.seed(2021)
m = 10 
n = 1
X_check= np.random.randn(m,n)
y_check= np.random.randn(m,1)
print('X= {}, \ny= {}'.format(X_check, y_check))
lin_reg_4 = Linear_Regression_4(alpha = 1, max_iter = 5, verbose=1)
lin_reg_4.fit(X_check, y_check)

# Launch liner regression learning on real values.

In [None]:
class Linear_Regression():
    '''
    linear regression using gradient descent
    '''
    def __init__(self, max_iter = 1e5, alpha = 0.01, eps = 1e-10, verbose= 0):
        '''
        :param verbose: set 1 to display more details of J val changes
        '''
        self.max_iter = max_iter
        self.alpha = alpha
        self.eps = eps
        self.verbose = verbose       
        
    def h(self, b, w, X): 
        '''
        :param b -  float or ndarry of shape [m,1], m - number of samples
        :param w - ndarray of shape [1,m],  n - number of features
        :param X - ndarray of shape [m,n], m - number of samples, n - number of features
        '''
        assert (X.shape[1]== w.shape[1])

        h_res = b + w * X
        
        if h_res.shape != (X.shape[0],1):
            print(f'h.shape = {h_res.shape} but expected {(self.m,1)}')
            raise Exception('Check assertion in h')    
        return h_res

    def J(self, h, y):      
        '''
        :param h - ndarray of shape (m,1)
        :param y - ndarray of shape (m,1)
        :return expression for cost function 
        '''
        if h.shape != y.shape:
            print(f'h.shape = {h.shape} does not match y.shape = {y.shape}.Expected {(self.m, 1)}')
            raise Exception('Check assertion in J')   

        J_res = 1 / (2 * self.m) * np.sum((h - y) ** 2)
      
        return J_res
        
    def J_derivative(self, params, X, y):
        '''
        :param params - tuple (b,w), where w is the 2d ndarry of shape (1,n), n- number of features 
        :param X- ndarray of shape (m, n)
        :param y - ndarray of shape (m,1)
        :return tuple of derivatrives of cost function by b and w
        '''
      
        b, w = params
        assert (w.shape == (1, self.n))                
        h_val = self.h(b,w,X)
        if  h_val.shape != (self.m, 1):
            print(f'h.shape = {h_val.shape}, but expected {(self.m, 1)}')
            raise Exception('Check assertion in J_derivative')
                  
        dJ_b = (1 / self.m * (h_val - y)).sum()
        dJ_w = (1 / self.m * (h_val - y).T @ X).sum()
        
        return (dJ_b, dJ_w)

    def fit(self, X, y):
        '''
        :param X - ndarray training set of shape [m,n], m - number of samples, n - number of features
        :param y - ndarray - 1d array 
        :return: True in case of successful fit 
        '''      
        if self.verbose: 
            print(f'Running gradient descent with alpha = {self.alpha}, eps= {self.eps}, max_iter= {self.max_iter}')
        self.m, self.n = X.shape # number of samples, number of features  
        y = y.reshape(self.m, 1) # make it 2 d to make sure it corresponds to h_val
        b = 0 # init intercept with 0
        w = np.zeros(self.n).reshape(1,-1) # make sure it's shape is [1,n]
        params = (b, w)
        self.J_hist = [-1] # used for keeping J values. Init with -1 to avoid 0 at first iter
        continue_iter = True # flag to continue next iter (grad desc step)
        iter_number = 0 # used for limit by max_iter
        while continue_iter:
            dJ_b, dJ_w = self.J_derivative(params, X, y)
            b = b - self.alpha * dJ_b
            w = w - self.alpha * dJ_w
            params = (b, w)
            
            # keep history of J values
            self.J_hist.append(self.J(self.h(b, w, X), y))
            if self.verbose:
                print ('b = {}, w= {}, J= {}'.format(b, w, self.J_hist[-1]))
            # check criteria of exit the loop (finish grad desc)
            if self.max_iter and iter_number > self.max_iter: # if max_iter is provided and limit succeeded
                continue_iter = False
            elif np.abs(self.J_hist[iter_number-1] - self.J_hist[iter_number]) < self.eps: # if accuracy is succeeded
                continue_iter = False
            iter_number += 1

        # store the final params to further using 
        self.intercept_, self.coef_ = params        
        return True        
        
    def draw_cost_changes(self):        
        J_hist = self.J_hist[1:]
        plt.figure()
        plt.scatter(np.arange(0,len(J_hist)), J_hist, s=20, marker='.', c='b')
        plt.xlabel('Iterations')
        plt.ylabel('Cost function J value')
        title_str = f'Complited: {len(self.J_hist)-2}, alpha ={self.alpha}, max_iter={self.max_iter}, eps={self.eps}'
        # Note: len(J_hist)-2) due to first one is -1 (was not iteration), iter + 1  at the end  of the gradient loop
        plt.title(title_str)
 

    def predict(self, X): 
        '''
        :param X - ndarray of shape (?,n)
        :return 
        '''
        return self.h(self.intercept_, self.coef_, X)
        
   
    def score(self, X_test, y_test):
        '''
        :param X_test - ndarray testing set or any for prediction of shape [?,n], ? - number of samples, n - number of features
        :param y_test - ndarray - 1d array 
        :return R2 score of y_test and prediction for X_test
        '''
        z = self.predict(X_test)
        from sklearn.metrics import r2_score
        return (r2_score(y_test, z))

In [None]:
class Linear_Regression_for_mult_features():
    '''
    linear regression using gradient descent
    '''
    def __init__(self, max_iter = 1e5, alpha = 0.01, eps = 1e-10, verbose= 0):
        '''Linear_Regression_mult_feature
        :param verbose: set 1 to display more details of J val changes
        '''
        self.max_iter = max_iter
        self.alpha = alpha
        self.eps = eps
        self.verbose = verbose       
        
    def h(self, b, w, X): 
        '''
        :param b -  float or ndarry of shape [m,1], m - number of samples
        :param w - ndarray of shape [1,m],  n - number of features
        :param X - ndarray of shape [m,n], m - number of samples, n - number of features
        '''
        assert (X.shape[1]== w.shape[1])
        
        h_res = b + X @ w.T
        
        return h_res

    def J(self, h, y, params, X):      
        '''
        :param h - ndarray of shape (m,1)
        :param y - ndarray of shape (m,1)
        :return expression for cost function 
        ''' 
        
        b, w = params
        J_res = 1 / (2 * self.m) * np.sum((self.h(b, w, X) - y) ** 2)
      
        return J_res
        
    def J_derivative(self, params, X, y):
        '''
        :param params - tuple (b,w), where w is the 2d ndarry of shape (1,n), n- number of features 
        :param X- ndarray of shape (m, n)
        :param y - ndarray of shape (m,1)
        :return tuple of derivatrives of cost function by b and w
        '''
      
        b, w = params
        assert (w.shape == (1, self.n))                
        h_val = self.h(b, w, X)
                  
        dJ_b = (1 / self.m * (h_val - y)).sum()
        dJ_w = 1 / self.m * (h_val - y).T @ X
        
        return (dJ_b, dJ_w)

    def fit(self, X, y):
        '''
        :param X - ndarray training set of shape [m,n], m - number of samples, n - number of features
        :param y - ndarray - 1d array 
        :return: True in case of successful fit 
        '''      
        if self.verbose: 
            print(f'Running gradient descent with alpha = {self.alpha}, eps= {self.eps}, max_iter= {self.max_iter}')
        self.m, self.n = X.shape # number of samples, number of features  
        y = y.reshape(self.m, 1) # make it 2 d to make sure it corresponds to h_val
        b = 0 # init intercept with 0
        w = np.zeros(self.n).reshape(1,-1) # make sure it's shape is [1,n]
        params = (b, w)
        self.J_hist = [-1] # used for keeping J values. Init with -1 to avoid 0 at first iter
        continue_iter = True # flag to continue next iter (grad desc step)
        iter_number = 0 # used for limit by max_iter
        while continue_iter:
            dJ_b, dJ_w = self.J_derivative(params, X, y)
            b = b - self.alpha / self.m * dJ_b
            w = w - self.alpha / self.m * dJ_w                
            params = (b, w)
            
            # keep history of J values
            self.J_hist.append(self.J(self.h(b, w, X), y, params, X))
            if self.verbose:
                print ('b = {}, w= {}, J= {}'.format(b, w, self.J_hist[-1]))
            # check criteria of exit the loop (finish grad desc)
            if self.max_iter and iter_number > self.max_iter: # if max_iter is provided and limit succeeded
                continue_iter = False
            elif np.abs(self.J_hist[iter_number-1] - self.J_hist[iter_number]) < self.eps: # if accuracy is succeeded
                continue_iter = False
            iter_number += 1

        # store the final params to further using 
        self.intercept_, self.coef_ = params        
        return True        
        
    def draw_cost_changes(self):        
        J_hist = self.J_hist[1:]
        plt.figure()
        plt.scatter(np.arange(0,len(J_hist)), J_hist, s=20, marker='.', c='b')
        plt.xlabel('Iterations')
        plt.ylabel('Cost function J value')
        title_str = f'Complited: {len(self.J_hist)-2}, alpha ={self.alpha}, max_iter={self.max_iter}, eps={self.eps}'
        # Note: len(J_hist)-2) due to first one is -1 (was not iteration), iter + 1  at the end  of the gradient loop
        plt.title(title_str)
 

    def predict(self, X): 
        '''
        :param X - ndarray of shape (?,n)
        :return 
        '''
        return self.h(self.intercept_, self.coef_, X)
        
   
    def score(self, X_test, y_test):
        '''
        :param X_test - ndarray testing set or any for prediction of shape [?,n], ? - number of samples, n - number of features
        :param y_test - ndarray - 1d array 
        :return R2 score of y_test and prediction for X_test
        '''
        z = self.predict(X_test)
        from sklearn.metrics import r2_score
        return (r2_score(y_test, z))

# Check results

In [None]:
print('X_train.shape = ', X_train.shape)
print('y_train.shape = ', y_train.shape)
print (f'X_train = \n{X_train[:5,:]}')
lin_reg = Linear_Regression(alpha= 0.01, verbose=0, eps=1e-8)
lin_reg.fit(X_train, y_train)
lin_reg.draw_cost_changes()
print('R2 Score =', lin_reg.score(X_test, y_test))
print(f'b: {lin_reg.intercept_}, w = {lin_reg.coef_}')

# Draw scatter and prediction for one feature

In [None]:
if X_train.shape[1] > 1:
    raise Exception('Select single feature to plot')
plt.figure()
plt.scatter(X_train, y_train)
x_line = np.array([np.min(X_train), np.max(X_train)])
z_line = lin_reg.predict(x_line.reshape(-1,1))
plt.plot(x_line, z_line, '-', c='red')

In [None]:
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [None]:
lin_reg = Linear_Regression(alpha= 0.01, verbose=0, eps=1e-8)
lin_reg.fit(X_train_scaled, y_train)
print ('R2 Score =', lin_reg.score(X_test_scaled, y_test))
lin_reg.draw_cost_changes()
print (f'b: {lin_reg.intercept_}, w = {lin_reg.coef_}')

# Compare with sklearn

In [None]:
from sklearn.linear_model import LinearRegression
lin_reg_sklearn = LinearRegression().fit(X_train_scaled, y_train)
print('coef_', lin_reg_sklearn.coef_)
print ('R2 training Score =', lin_reg_sklearn.score(X_train_scaled, y_train))
print ('R2 Score =', lin_reg_sklearn.score(X_test_scaled, y_test))

# Run on real data

In [None]:
np.random.seed = 2021

X, y = load_boston(return_X_y=True)

X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=2018)
print ('X_train.shape = ', X_train.shape)
print ('y_train.shape = ', y_train.shape)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [None]:
# linear regression for mmultiple features with own implementaion
lin_reg = Linear_Regression_for_mult_features(alpha= 0.1, verbose=0, eps=1e-5, max_iter=100000)
lin_reg.fit(X_train_scaled, y_train)
lin_reg.draw_cost_changes()
print ('R2 training Score =', lin_reg.score(X_train_scaled, y_train))
print ('R2 Score =', lin_reg.score(X_test_scaled, y_test))
print (f'b: {lin_reg.intercept_}, w = {lin_reg.coef_}') 