# Import libraries

In [None]:
import pandas as pd
import numpy as np
import math
import matplotlib.pyplot as plt
import itertools as it
import time

# Train, test, verification split

In [None]:
def train_test_verification_split(X, y, split_tr, split_v, split_te):
    """Cut X and y into consecutive training, verification and test parts.

    Rows are taken in their existing order; nothing is shuffled here. Each
    fraction is turned into a row count by truncating ``X.shape[0] * fraction``
    to an integer, so rows left over by the truncation are not returned.

    Args:
        X: features; must expose ``shape`` and support ``X[a:b]`` row slicing.
        y: targets, sliced with the same boundaries as ``X``.
        split_tr: fraction of the rows used for training.
        split_v: fraction of the rows used for verification.
        split_te: fraction of the rows used for testing.

    Returns:
        The tuple ``(X_train, y_train, X_ver, y_ver, X_test, y_test)``.
    """
    n_rows = X.shape[0]

    # cumulative end index of each partition
    train_end = int(n_rows * split_tr)
    ver_end = train_end + int(n_rows * split_v)
    test_end = ver_end + int(n_rows * split_te)

    train_rows = slice(None, train_end)
    ver_rows = slice(train_end, ver_end)
    test_rows = slice(ver_end, test_end)

    return (
        X[train_rows], y[train_rows],
        X[ver_rows], y[ver_rows],
        X[test_rows], y[test_rows],
    )

# Read the data

### Run this for the toy problem

In [None]:
# Flag read by the plotting cell to decide whether the 1-D plots are drawn.
toy_problem = True
print("Using the toy problem...")

# Target function of the toy problem; the alternatives are kept for experiments.
f = lambda x: np.sin(0.9*x)
#f = lambda x: np.exp(x)
#f = lambda x: 2*x+4

# 100 evenly spaced inputs in [-5, 5] as a column vector, plus their targets
X_data = np.linspace(-5, 5, 100)[:, np.newaxis]
y_data = f(X_data)

# feature in the first column, target in the last; rows put in random order
df = pd.DataFrame(np.c_[X_data, y_data])
df = df.sample(frac=1).reset_index(drop=True)

# split off the target variable (the last column)
X = df.iloc[:, :-1]
y = df.iloc[:, -1]

# min-max scale every feature column to the range [0, 1]
X = (X - X.min()) / (X.max() - X.min())

# 60% training, 20% verification, 20% test
print("Splitting data...")
(X_train, y_train,
 X_ver, y_ver,
 X_test, y_test) = train_test_verification_split(X, y, 0.6, 0.2, 0.2)

# scatter plot of the complete (scaled) dataset
plt.plot(X, y, "r+")
plt.xlabel("X")
plt.ylabel("y")
plt.title("The whole dataset")
plt.show()

### Run this for the SARCOS dataset

In [None]:
# Flag read by the plotting cell; False skips the 1-D toy-problem plots.
toy_problem = False
print("Using the SARCOS dataset...")

# read the headerless csv file, then put its rows in random order
df = pd.read_csv("sarcos_inv.csv", header=None)
df = df.sample(frac=1).reset_index(drop=True)

# split off the target variable (the last column)
X = df.iloc[:, :-1]
y = df.iloc[:, -1]

# min-max scale every feature column to the range [0, 1]
X = (X - X.min()) / (X.max() - X.min())

# 60% training, 20% verification, 20% test
print("Splitting data...")
(X_train, y_train,
 X_ver, y_ver,
 X_test, y_test) = train_test_verification_split(X, y, 0.6, 0.2, 0.2)

# Utility class

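This class groups the error metrics used to evaluate the models: the root mean squared error (`score`), the summed squared error and the mean absolute error, each computed from the actual and the predicted values of the target variable.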

In [None]:
class util:
    """Error metrics comparing actual target values with predictions."""

    @staticmethod
    def score(actual, prediction):
        """Return the root mean squared error between actual and prediction."""
        residual = actual - prediction
        mean_squared = np.mean(residual ** 2)
        return math.sqrt(mean_squared)

    @staticmethod
    def squared_error(actual, prediction):
        """Return the sum of the squared differences."""
        residual = actual - prediction
        return np.sum(residual ** 2)

    @staticmethod
    def mean_absolute_error(actual, prediction):
        """Return the summed absolute differences divided by ``actual.shape[0]``."""
        abs_residual = np.fabs(actual - prediction)
        return np.sum(abs_residual) / actual.shape[0]

# Linear regression

This is my implementation of linear regression, fitted with batch gradient descent on standardised features.

In [None]:
class linear_regression():
    """Linear regression fitted by batch gradient descent.

    Features are standardised (zero mean, unit variance) with the statistics of
    the training data, and a bias column of ones is prepended. Whenever a
    gradient step would increase the cost, the learning rate is halved and the
    step is retried from the same parameters.

    Attributes set by ``__init__``:
        alpha: current learning rate; ``fit`` halves it in place when needed.
        n_iter: maximum number of gradient-descent iterations.
        convergence_threshold: ``fit`` stops once the absolute change in cost
            between two iterations is below this value. A negative value
            disables the check, so all ``n_iter`` iterations run.

    Attributes set by ``fit``:
        params: column vector of shape ``(n_features + 1, 1)``, bias first.
        mean_, std_: per-feature training mean and standard deviation.
        X_train, y_train: design matrix and target column used for fitting.
        n_exemplars, n_features: number of training rows and of features.
    """

    def __init__(self, alpha=0.1, n_iter=100000, convergence_threshold=1e-12):
        self.alpha = alpha
        self.n_iter = n_iter
        self.convergence_threshold = convergence_threshold

    def prepend_ones(self, X, mean=None, std=None):
        """Return the design matrix: a column of ones, then the standardised X.

        ``mean`` and ``std`` are forwarded to ``normalise``; when omitted, the
        statistics of ``X`` itself are used.
        """
        return np.c_[np.ones(X.shape[0]), self.normalise(X, mean, std)]

    def normalise(self, X, mean=None, std=None):
        """Standardise the columns of X and return the result as an ndarray.

        Args:
            X: array-like of features.
            mean: per-column mean to subtract; computed from ``X`` when None.
            std: per-column standard deviation to divide by; computed from
                ``X`` (population std) when None.

        A standard deviation of zero is replaced by one, so a constant column
        becomes a column of zeros instead of NaN or inf.
        """
        X = np.asarray(X, dtype=float)
        if mean is None:
            mean = np.mean(X, 0)
        if std is None:
            std = np.std(X, 0)
        std = np.where(np.asarray(std) == 0, 1.0, std)
        return (X - mean) / std

    def fit(self, X_train, y_train):
        """Estimate ``params`` from the training data by gradient descent.

        Args:
            X_train: 2-D array-like of shape ``(n_exemplars, n_features)``.
            y_train: 1-D array-like holding ``n_exemplars`` targets.

        Raises:
            ValueError: if the training set is empty or ``y_train`` does not
                hold exactly one target per row of ``X_train``.
        """
        features = np.asarray(X_train, dtype=float)
        # asarray + reshape works for ndarrays and for pandas Series alike;
        # ``series[:, np.newaxis]`` is rejected by current pandas
        targets = np.asarray(y_train, dtype=float).reshape(-1, 1)

        self.n_exemplars, self.n_features = features.shape[0], features.shape[1]
        if self.n_exemplars == 0 or targets.shape[0] != self.n_exemplars:
            raise ValueError(
                "X_train and y_train must be non-empty and have the same "
                "number of rows"
            )

        # remember the training statistics so predict() scales new data with them
        self.mean_ = np.mean(features, 0)
        self.std_ = np.std(features, 0)

        self.X_train = self.prepend_ones(features, self.mean_, self.std_)
        self.y_train = targets
        self.params = np.zeros((self.n_features + 1, 1))

        # iteratively update the parameters until convergence or n_iter is hit
        for _ in range(self.n_iter):
            self.params, converged = self._gradient_step(self.params)
            if converged:
                break

    def _gradient_step(self, w_current):
        """Take one descent step from ``w_current``.

        Returns:
            ``(w_new, converged)`` where ``converged`` is True when the cost
            changed by less than ``convergence_threshold``.
        """
        prediction = self.X_train @ w_current
        cost_current = self.cost(prediction)
        gradient = self.X_train.T @ (prediction - self.y_train)

        while True:
            w_new = w_current - (self.alpha / self.n_exemplars) * gradient
            cost_new = self.cost(self.X_train @ w_new)
            # Accept the step unless it made the cost worse. The loop ends at
            # the latest when alpha has shrunk so far that w_new == w_current.
            if not cost_new > cost_current:
                break
            self.alpha = self.alpha * 0.5

        converged = bool(
            np.fabs(cost_new - cost_current) < self.convergence_threshold
        )
        if converged:
            print("Converged!")
        return w_new, converged

    def predict(self, X):
        """Return a flat array of predictions for the rows of X.

        X is standardised with the mean and standard deviation stored by
        ``fit``, so the model must be fitted first.
        """
        y_pred = self.prepend_ones(X, self.mean_, self.std_) @ self.params
        return y_pred.flatten()

    def cost(self, prediction):
        """Return half the mean squared error of ``prediction`` against ``y_train``.

        ``prediction`` holds the predicted targets (not the residuals), shaped
        like ``self.y_train``.
        """
        return np.sum((prediction - self.y_train)**2) / (2 * self.y_train.shape[0])

In [None]:
# Fit the regressor on the training split, predict the test split and report
# the test errors. The timer covers construction, fitting and prediction.
print("This will only take a few seconds...")
start_time = time.time()

# initial learning rate 0.1, at most 100000 gradient-descent iterations
lr_regressor = linear_regression(alpha=0.1, n_iter=100000)
lr_regressor.fit(X_train, y_train)

y_pred = lr_regressor.predict(X_test)
# elapsed wall-clock time in seconds
elapsed_time = time.time() - start_time

print("Evaluating RMSE...\n")
# util.score is the root mean squared error
rmse_test = util.score(y_test, y_pred)

print("The RMSE for test set is {}".format(rmse_test))
print("The MAE for test set is {}".format(util.mean_absolute_error(y_test, y_pred)))
print("Time taken:", elapsed_time)

# Test time taken with no convergence check

In [None]:
# Same experiment as the previous cell, but with the convergence check disabled
# to measure the cost of running every iteration.
import time
start_time = time.time()

# fit() stops when an absolute cost difference falls below the threshold; an
# absolute value is never below -1, so all n_iter iterations are executed
lr_regressor = linear_regression(alpha=0.1, n_iter=100000, convergence_threshold=-1)
lr_regressor.fit(X_train, y_train)

y_pred = lr_regressor.predict(X_test)
# elapsed wall-clock time in seconds (construction + fit + prediction)
elapsed_time = time.time() - start_time

print("Evaluating RMSE...\n")
# util.score is the root mean squared error
rmse_test = util.score(y_test, y_pred)

print("The RMSE around test set is {}".format(rmse_test))
print("Time taken:", elapsed_time)

# Test the initial learning rates on the verification set

In [None]:
import time

# Fit one model per candidate initial learning rate and score each of them on
# the verification split, so the rates can be compared.
for alpha in [0.00001, 0.001, 0.01, 0.03, 0.1, 0.3]:
    start_time = time.time()

    # The candidate rate must actually be passed to the model. Sweep-specific
    # names keep the test-set model and predictions of the earlier cells intact.
    ver_regressor = linear_regression(alpha=alpha, n_iter=100000)
    ver_regressor.fit(X_train, y_train)

    y_pred_ver = ver_regressor.predict(X_ver)
    # elapsed wall-clock time in seconds (construction + fit + prediction)
    elapsed_time = time.time() - start_time

    print("Evaluating RMSE...\n")
    rmse_ver = util.score(y_ver, y_pred_ver)

    print("The RMSE on the verification set for alpha={} is {}".format(alpha, rmse_ver))
    print("Time taken:", elapsed_time)

# Plot to check that linear regression works for the toy problem

In [None]:
if toy_problem:
    def _opaque_legend():
        """Draw the legend of the current axes and make its handles opaque."""
        legend = plt.legend()
        # Matplotlib 3.7 renamed ``legendHandles`` to ``legend_handles`` and
        # later removed the old name; support both spellings.
        if hasattr(legend, "legend_handles"):
            handles = legend.legend_handles
        else:
            handles = legend.legendHandles
        for lh in handles:
            lh.set_alpha(1)
        return legend

    # Predict on the test split here, so the plots do not depend on which cell
    # last assigned the global ``y_pred``.
    y_pred_test = lr_regressor.predict(X_test)

    # Sort once by the single feature so the dashed lines run left to right.
    x_test = X_test.values.flatten()
    order = np.argsort(x_test)
    x_sorted = x_test[order]
    y_test_sorted = y_test.values[order]

    # Baseline that always predicts the mean of the training targets.
    mean_baseline = np.full(y_test.shape[0], np.mean(y_train.values))

    plt.plot(X_test, y_test, "r+")
    plt.title("The actual data")
    plt.show()

    plt.plot(X_test, y_pred_test, "r+")
    plt.title("The predicted data")
    plt.axis([0,1,-1,1])
    plt.show()

    plt.plot(X_test, y_test, "b+")
    plt.plot(x_sorted, y_test_sorted, "b--", alpha=0.3, label="actual data")
    plt.plot(X_test, y_pred_test, "r+")
    plt.plot(x_sorted, y_pred_test[order], "r--", alpha=0.3, label="predicted data")
    plt.title("Actual against predicted data")
    plt.xlabel("Independent variable X")
    plt.ylabel("Target variable y")
    leg = _opaque_legend()
    plt.show()

    plt.plot(x_sorted, y_test_sorted, "b+")
    plt.plot(x_sorted, y_test_sorted, "b--", alpha=0.3, label="actual data")
    plt.plot(X_test, mean_baseline, "g+")
    plt.plot(x_sorted, mean_baseline, "g--", alpha=0.3, label="mean data")
    plt.title("Actual against mean data")
    plt.xlabel("Independent variable X")
    plt.ylabel("Target variable y")
    leg = _opaque_legend()
    plt.show()