# Linear Regression

In [None]:
#### links
# https://medium.com/analytics-vidhya/linear-regression-using-pandas-numpy-for-beginners-in-data-science-fe57157ed93d
# https://medium.com/berk-hakbilen/regression-in-machine-learning-90a5271a5a12
# https://builtin.com/data-science/train-test-split
# https://realpython.com/train-test-split-python-data/
# https://www.youtube.com/watch?v=nk2CQITm_eo
# https://www.youtube.com/watch?v=R15LjD8aCzc
# https://www.youtube.com/watch?v=i_LwzRVP7bg

In [None]:
%matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

from typing import Tuple, List

## Reading the data

In [None]:
# data source: https://www.kaggle.com/datasets/quantbruce/real-estate-price-prediction

path = './data/Real estate.csv'
full_data = pd.read_csv(path, index_col='No')
full_data.head()

In [None]:
# we drop the column 'X1 transaction date'
full_data.drop(labels='X1 transaction date', axis=1, inplace=True)	#inplace=True - modifies the original object

#### solution for the requirement at the bottom of the file..
#full_data.drop(labels=['X5 latitude', 'X6 longitude'], axis=1, inplace=True)

## Selecting subsets of training and testing

The initial dataset is split in two: 70% is used to train the model (that is, to determine the weights $\theta$), and the remaining 30% is used for testing, to see how well the model generalizes.

As the initial order of the data may disadvantage the model (in the data above, the first records are the oldest sales; the most recent ones are the last in the file), we shuffle the data before splitting it.
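
The shuffle-then-split idea can be sketched on a toy array. This is a minimal sketch, assuming a fixed seed for reproducibility (the notebook itself shuffles without a seed); the names `toy`, `rng` and `n_train` are illustrative and not part of the notebook.

```python
import numpy as np

# Hypothetical toy data: 10 records, 3 columns (not the real-estate dataset).
toy = np.arange(30, dtype=float).reshape(10, 3)

# A fixed seed makes the shuffle reproducible (an assumption made for this sketch).
rng = np.random.default_rng(42)
shuffled = toy[rng.permutation(len(toy))]

# 70% of the rows go to training, the rest to testing.
n_train = int(len(shuffled) * 0.7)
toy_train, toy_test = shuffled[:n_train], shuffled[n_train:]

print(toy_train.shape, toy_test.shape)
```

Every record ends up in exactly one of the two subsets, so the two lengths always add up to the length of the original data.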

In [None]:
from tabulate import tabulate

# randomly shuffle the data
full_data = full_data.sample(frac=1)
# get the percentage of train data ==> test data = 1 - train data
percent_train: float = 0.7
length_of_train: int = int(len(full_data) * percent_train)

# split the data into train and test
train: np.array = np.array(full_data[:length_of_train])
test: np.array = np.array(full_data[length_of_train:])

assert isinstance(train, np.ndarray), 'Train set should be an array'
assert isinstance(test, np.ndarray), 'Test set should be an array'
assert len(train) + len(test) == len(full_data)
assert train.shape[1] == test.shape[1] == len(full_data.columns), 'The number of columns should be the same'


print(len(full_data), "=", len(train), "+", len(test), "(length of full_data = length of train + length of test)\n")

print("--> full_data[4]:\n", tabulate([full_data.iloc[4]], headers='keys', tablefmt='plain-text', showindex=False))
print("--> train[4]:\n", train[4], '\n\n')

print("--> full_data[{}]=".format(length_of_train+3))
print(tabulate([full_data.iloc[length_of_train+3]], headers='keys', tablefmt='plain-text', showindex=False))
print("--> test[3]:\n", test[3])

####links
#https://www.statology.org/pandas-train-test/
#https://stackoverflow.com/questions/24147278/how-do-i-create-test-and-train-samples-from-one-dataframe-with-pandas

In [None]:
# split the train/test data into x_train/x_test and their 'y' validation (x columns are input, y columns are output)
# in our case the x columns are those with info from which we deduce the price (y column)
x_train, y_train = train[:, :-1], train[:, -1]
x_test, y_test = test[:, :-1], test[:, -1]
# y columns have a shape of (n,) so we need to reshape them to (n, 1) to be able to use them in the model
y_train = np.transpose([y_train])
y_test = np.transpose([y_test])

assert x_train.shape[0] == train.shape[0]
assert x_test.shape[0] == test.shape[0]
assert y_train.shape == (x_train.shape[0], 1), 'y_train must be a column vector'
assert y_test.shape == (x_test.shape[0], 1), 'y_test must be a column vector'


print("-> train:\n", train[1])
print("-> x_train + y_train:\n", x_train[1], y_train[1])
print("-> x_train.shape + y_train.shape: ", x_train.shape, y_train.shape)
print()
print("-> test:\n", test[1])
print("-> x_test + y_test:\n", x_test[1], y_test[1])
print("-> x_test.shape + y_test.shape: ", x_test.shape, y_test.shape)

The functions below are used for the convenient transformation of the data:

To scale the values of the training set between 0 and 1, the minimum and maximum of every attribute must be known. For a matrix of the form:
$$
\mathbf{X} = 
\begin{pmatrix}
x_0^{(1)} & x_1^{(1)} & x_2^{(1)} & \dots & x_n^{(1)}
\\
x_0^{(2)} & x_1^{(2)} & x_2^{(2)} & \dots & x_n^{(2)}
\\
\vdots & \vdots & \vdots	& \vdots & \vdots
\\
x_0^{(m)} & x_1^{(m)} & x_2^{(m)} & \dots & x_n^{(m)}
\end{pmatrix}
$$
we compute the minimum and maximum of every column, $min_i$ and $max_i$. Then, from every value we subtract the minimum of its column and divide the result by the difference between the maximum and the minimum of the same column. Note that a column may be constant (in the design matrix, the minimum and maximum of column 0 are both 1); such a column must be skipped, because the division would be by 0.
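
A small worked example of this scaling, as a hedged sketch: the 3x3 matrix is made up, and the vectorized `np.where` formulation is only one possible way to skip constant columns, not necessarily the one used in this notebook.

```python
import numpy as np

# Hypothetical 3x3 design matrix: column 0 is the constant column of ones.
X = np.array([[1.0, 10.0, 200.0],
              [1.0, 20.0, 400.0],
              [1.0, 40.0, 300.0]])

col_min = X.min(axis=0, keepdims=True)   # [[1, 10, 200]]
col_max = X.max(axis=0, keepdims=True)   # [[1, 40, 400]]
col_range = col_max - col_min            # [[0, 30, 200]]

# Constant columns (range 0) are left unchanged to avoid dividing by zero.
is_constant = (col_range == 0)
safe_range = np.where(is_constant, 1.0, col_range)
X_scaled = np.where(is_constant, X, (X - col_min) / safe_range)

print(X_scaled)
```

In the second column, for instance, the value 20 becomes $(20 - 10) / (40 - 10) = 1/3$, while the column of ones stays unchanged.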

## Preprocessing functions

In [None]:
def get_min_max_cols(mat: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """For a given matrix, calculate and return the maximums and minimums of each column.
    param mat: matrix with at least two rows
    return: minimums and maximums of columns."""
    # mins, maxes = [min(i) for i in mat.T], [max(i) for i in mat.T]
    mins, maxes = mat.min(axis=0, keepdims=True), mat.max(axis=0, keepdims=True) 
    # it is recommended to return the minimums and maximums as a matrix with one line.
    assert mins.shape == maxes.shape == (1, mat.shape[1])
    return mins, maxes

The design matrix has the value 1 in the first column to allow for a free (intercept) term. The rest of the columns are the data that was read.

In [None]:
def get_design_matrix(mat: np.ndarray) -> np.ndarray:
    """The function creates a design matrix from a given matrix. The design matrix has the same number of 
    rows as the given one, but with a column of ones added(at the beginning)"""
    result: np.ndarray=np.insert(mat, 0, 1, axis=1)
    assert result.shape == (mat.shape[0], mat.shape[1]+1)
    assert np.all(result[:, 0] == 1)
    return result

Implementing the scaling of data in the interval $[0, 1]$:

In [None]:
def scale_values(mat: np.ndarray, min_cols: np.ndarray, max_cols: np.ndarray) -> np.ndarray:
    """Being given the minimums and maximums of each column, scale the values of the given matrix between 0 and 1
    the formula is: (x - min) / (max - min). The minimums and maximums are calculated for each column separately."""
    
    assert isinstance(mat, np.ndarray) and mat.ndim == 2, 'mat must be a matrix'
    assert mat.shape[0] > 1, 'at least two rows are required'
    assert mat.shape[1] == min_cols.shape[-1] == max_cols.shape[-1]
    
    # constant columns (minimum equals maximum) are left unchanged,
    # because scaling them would mean dividing by 0
    
    result = mat.astype(float)  # work on a copy, so the caller's matrix is not modified
    for i_column in range(result.shape[1]):
        # use the given min/max (not those of mat), so the test set is handled consistently
        if min_cols[0, i_column] == max_cols[0, i_column]:
            continue
        result[:, i_column] = (result[:, i_column] - min_cols[0, i_column]) / (max_cols[0, i_column] - min_cols[0, i_column])
        
    assert result.shape == mat.shape, 'The shape of the matrix is preserved'
    return result

In [None]:
# code testing

_mat: np.ndarray = np.random.randn(1000, 1000) * 50
min_mat, max_mat = get_min_max_cols(_mat)
_mat_scaled: np.ndarray = scale_values(_mat, min_mat, max_mat)

epsilon: float = 1e-5  # a very small positive number; floating point calculations may give results that are not exactly 0 or 1

assert np.min(_mat_scaled) >= -epsilon, f'The matrix is not scaled in [0, 1]: its minimum is {np.min(_mat_scaled)}'
assert np.max(_mat_scaled) <= 1 + epsilon, f'The matrix is not scaled in [0, 1]: its maximum is {np.max(_mat_scaled)}'

_mat = get_design_matrix(_mat)  # add a column of ones
min_mat, max_mat = get_min_max_cols(_mat)
_mat_scaled = scale_values(_mat, min_mat, max_mat)

print(_mat_scaled)
print(np.min(_mat_scaled), np.max(_mat_scaled))

assert np.min(_mat_scaled) >= -epsilon, f'The matrix is not scaled in [0, 1]: its minimum is {np.min(_mat_scaled)}'
assert np.max(_mat_scaled) <= 1 + epsilon, f'The matrix is not scaled in [0, 1]: its maximum is {np.max(_mat_scaled)}'


The training set is preprocessed: design matrix -> scaling:

In [None]:
# getting the design matrix
x_train: np.ndarray = get_design_matrix(x_train)

# we calculate the minimums and maximums on column, from the training set
# these will be used for transforming the data from the test set
x_train_min, x_train_max = get_min_max_cols(x_train)

# we scale the data in the interval [0, 1], using scale_values
x_train_scaled: np.ndarray = scale_values(x_train, x_train_min, x_train_max)

epsilon: float = 1e-5  # a very small positive number

assert np.min(x_train_scaled) >= -epsilon, f'The matrix is not scaled in [0, 1]: its minimum is {np.min(x_train_scaled)}'
assert np.max(x_train_scaled) <= 1 + epsilon, f'The matrix is not scaled in [0, 1]: its maximum is {np.max(x_train_scaled)}'

For the test set the same type of preprocessing is done, with the observation that for scaling the values from `x_train_min` and `x_train_max` will be used:
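
Why the test values are not guaranteed to stay inside $[0, 1]$ can be seen in a minimal sketch, assuming a single made-up feature; the names `feat_train`, `feat_test`, `lo` and `hi` are illustrative only.

```python
import numpy as np

# Hypothetical one-feature example (values are made up for illustration).
feat_train = np.array([[2.0], [4.0], [6.0]])
feat_test = np.array([[1.0], [5.0], [8.0]])

# The statistics come from the training set only.
lo, hi = feat_train.min(axis=0), feat_train.max(axis=0)

# The test set is scaled with the training minimum and maximum.
feat_test_scaled = (feat_test - lo) / (hi - lo)
print(feat_test_scaled.ravel())  # values below 0 or above 1 are possible
```

A test value smaller than the training minimum maps below 0, and one larger than the training maximum maps above 1. This is expected and does not indicate an error.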

In [None]:
x_test: np.ndarray = get_design_matrix(x_test)

x_test_scaled: np.ndarray = scale_values(x_test, x_train_min, x_train_max)

print('The minimum of every column of the testing set should not differ much from 0, except for the first column')
print(f'column minimums for the testing set: {np.min(x_test_scaled, axis=0)}')

print('The maximum of every column of the testing set should not differ much from 1; the first column should be exactly 1')
print(f'column maximums for the testing set: {np.max(x_test_scaled, axis=0)}')

## Linear regression model

The prediction model is:
$$
h_\theta(X) = X \cdot \theta
$$
where $X$ is a design matrix and $\theta$ is a weight vector. 
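
As a tiny worked example of this product, here is a sketch with made-up numbers; `X` and `theta` below are hypothetical and unrelated to the dataset.

```python
import numpy as np

# Hypothetical design matrix (first column of ones) and weights.
X = np.array([[1.0, 2.0],
              [1.0, 3.0]])
theta = np.array([[0.5],
                  [2.0]])

# Each prediction is intercept + weight * feature: 0.5 + 2*2 and 0.5 + 2*3.
predictions = X @ theta
print(predictions.ravel())
```

The column of ones is what lets the first weight act as the free term of the model.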

In [None]:
def model_predict(theta: np.ndarray, x: np.ndarray) -> np.ndarray:
    """Makes predictions for the values from the given matrix x, using the given theta.
    :param theta: column matrix of weights
    :param x: design matrix. Every row contains the attribute values of one record.
    :return: column matrix of predictions, with the same number of rows as :param x:."""
    # the number of features in x equals the number of coefficients in theta
    assert x.shape[1] == theta.shape[0]
    # theta is a column vector
    assert theta.shape[1] == 1
    result: np.ndarray = np.dot(x, theta)
    assert result.shape == (x.shape[0], 1)
    return result

In [None]:
# verifying

np.random.seed(100)

_x: np.ndarray = np.random.rand(3, 2)
_y: np.ndarray = np.random.rand(3, 1)
_theta: np.ndarray = np.ones((2, 1))

assert np.allclose(model_predict(_theta, _x), np.array([[0.82177433],
       [1.26929372],
       [0.12628798]]))

## Loss function

The loss function measures how much the predictions differ from the known values (ground truth). Here we use half of the mean squared error (https://en.wikipedia.org/wiki/Mean_squared_error) as the error function:
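
Written out, with $m$ the number of records and $x^{(i)}$ the $i$-th row of the design matrix, half of the mean squared error is given below. The symbol $J$ is introduced here only for convenience and is an assumption of this note.

$$
J(\theta) = \frac{1}{2m} \sum_{i=1}^{m} \left(h_\theta(x^{(i)}) - y^{(i)}\right)^2 = \frac{1}{2m} \left\lVert \mathbf{X} \theta - \mathbf{y} \right\rVert^2
$$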

In [None]:
from sklearn.metrics import mean_squared_error

def loss(theta: np.ndarray, x: np.ndarray, y: np.ndarray) -> float:
    """Error function used for gradient descent.
    :param theta: column matrix of weights
    :param x: design matrix. Every row contains the attribute values of one record.
    :param y: output values, corresponding to every record from x
    :return: scalar value, representing the error"""
    
    m: int = x.shape[0]
    y_hat: np.ndarray = model_predict(theta, x)
    # result = mean_squared_error(y_hat, y)
    result: np.float64 = 1/(2*m) * np.sum((y_hat - y)**2)   # half of the MSE: the factor 1/2 cancels the 2 produced when the square is differentiated
    return result

# https://datagy.io/mean-squared-error-python/

In [None]:
np.random.seed(100)

_x: np.ndarray = np.random.rand(3, 2)
_y: np.ndarray = np.random.rand(3, 1)
_theta: np.ndarray = np.ones((2, 1))

assert np.allclose(loss(_theta, _x, _y), 0.03659284388808936)

## The gradient descent and training calculation

In [None]:
def gradient(theta: np.ndarray, x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Calculates the gradient of the loss function, for the given theta, x and y.
    :param theta: column matrix of weights
    :param x: design matrix with the input values
    :param y: output values
    :return: column vector with the gradient of the loss, of the same shape as theta"""
    
    assert theta.shape[0] == x.shape[1]
    m: int = x.shape[0]
    y_hat: np.ndarray = model_predict(theta, x)
    assert y_hat.shape == (m, 1)
    grad: np.ndarray = 1/m * np.dot(x.T, (y_hat - y))
    assert grad.shape == theta.shape
    return grad

# https://pythonocean.com/blogs/linear-regression-using-gradient-descent-python
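
The gradient formula used above, $\frac{1}{m} \mathbf{X}^t (\mathbf{X}\theta - \mathbf{y})$, can be sanity-checked against finite differences. This is a self-contained sketch on random toy data; the helpers `half_mse`, `analytic_gradient` and `numeric_gradient` are hypothetical names that re-implement the notebook's formulas so the block runs on its own.

```python
import numpy as np

def half_mse(theta, x, y):
    """Half of the mean squared error, matching the loss defined above."""
    m = x.shape[0]
    return float(np.sum((x @ theta - y) ** 2) / (2 * m))

def analytic_gradient(theta, x, y):
    """Closed-form gradient: (1/m) * X^T (X theta - y)."""
    m = x.shape[0]
    return x.T @ (x @ theta - y) / m

def numeric_gradient(theta, x, y, h=1e-6):
    """Central finite differences, one coordinate of theta at a time."""
    grad = np.zeros_like(theta)
    for j in range(theta.shape[0]):
        step = np.zeros_like(theta)
        step[j, 0] = h
        grad[j, 0] = (half_mse(theta + step, x, y) - half_mse(theta - step, x, y)) / (2 * h)
    return grad

rng = np.random.default_rng(0)
x_toy = np.hstack([np.ones((5, 1)), rng.random((5, 2))])  # toy design matrix
y_toy = rng.random((5, 1))
theta_toy = rng.random((3, 1))

diff = np.max(np.abs(analytic_gradient(theta_toy, x_toy, y_toy)
                     - numeric_gradient(theta_toy, x_toy, y_toy)))
print(f"largest difference between the two gradients: {diff:.2e}")
```

If the two gradients agree up to rounding error, the closed-form expression is consistent with the loss it is supposed to differentiate.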

In [None]:
def train(x: np.ndarray, y: np.ndarray, alpha: float = 0.1, max_iters: int = 10000, verbose: bool = True) -> Tuple[np.ndarray, List[float]]:
    """Trains the linear regression model on the given data x and y. Returns the final theta and a list of error values.
    :param x: design matrix with input values
    :param y: output values
    :param alpha: learning rate, default value is 0.1
    :param max_iters: the maximum number of allowed iterations. If the threshold is reached, the training stops.
    :param verbose: if True, prints the iteration number and the error value every 1000 iterations.
    :return: a tuple containing the final weights (theta) and a list of error values"""
    
    eps: float = 1e-6
    assert x.shape[0] == y.shape[0]
    assert np.all(x[:, 0] == 1), 'First column of x has to be all ones'
    m, n = x.shape
    theta: np.ndarray = np.zeros((n, 1))
    assert theta.shape == (n, 1)
    assert np.all(theta==0)
    
    losses: list = []
    losses.append(loss(theta, x, y))
    iters: int = 0
    while True:
        iters += 1
        theta = theta - alpha * gradient(theta, x, y)
        current_loss = loss(theta, x, y)
        losses.append(current_loss)
        if (np.abs(losses[-1] - losses[-2]) < eps) or (iters >= max_iters):
            break
        if np.isnan(current_loss) or np.isinf(current_loss):
            break
        if verbose and iters % 1000 == 0:
            print(f'Iteration {iters}; loss = {current_loss}')
    return theta, losses

In [None]:
theta, losses = train(x_train_scaled, y_train, alpha=0.5)

In [None]:
print(f'losses={losses}')

In [None]:
plt.plot(losses)
plt.xlabel('Epoch')
plt.ylabel('Error')
plt.show()

## Testing the model

In [None]:
# mean squared error function

def mse(y_true: np.ndarray, y_hat: np.ndarray) -> float:
    """Calculates mean squared error for the vectors y_true and y_hat.
    :param y_true: vector of true values
    :param y_hat: vector of predicted values
    :return: mean squared error"""
    
    return 1/y_true.shape[0] * np.sum((y_true - y_hat)**2)

### Testing the training set

In [None]:
y_hat_train: np.ndarray = model_predict(theta, x_train_scaled)
    
print(f'Mean squared error for the training set: {mse(y_train, y_hat_train)}')

In [None]:
print(f'Actual and predicted values, training set: {list(zip(y_train, y_hat_train))}')

### Testing the test set

In [None]:
y_hat_test: np.ndarray = model_predict(theta, x_test_scaled)
    
print(f'Mean squared error for the testing set: {mse(y_test, y_hat_test)}')

In [None]:
print(f'actual and predicted values, testing set: {list(zip(y_test, y_hat_test))}')

Requirements:
1. Find a value for alpha (learning rate) for which the model produces a list of ascending values for the loss function. Make a plot of the loss function.
2. Find a value for alpha > 0 for which the algorithm doesn't stop within `max_iters` iterations. Make a plot of the loss function.
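
How the learning rate shapes the loss curve can be illustrated on toy data. The sketch below is a simplified stand-in for the notebook's `train` (no stopping criterion), and the helper `run_gd` and the toy arrays are hypothetical. For this quadratic loss, steps larger than $2 / \lambda_{max}$, where $\lambda_{max}$ is the largest eigenvalue of $\frac{1}{m}\mathbf{X}^t\mathbf{X}$, generally make the iteration diverge.

```python
import numpy as np

def run_gd(x, y, alpha, n_iters=50):
    """Plain batch gradient descent on half-MSE; returns the loss after every step."""
    m, n = x.shape
    theta = np.zeros((n, 1))
    history = [float(np.sum((x @ theta - y) ** 2) / (2 * m))]
    for _ in range(n_iters):
        theta = theta - alpha * (x.T @ (x @ theta - y) / m)
        history.append(float(np.sum((x @ theta - y) ** 2) / (2 * m)))
    return history

rng = np.random.default_rng(1)
x_toy = np.hstack([np.ones((20, 1)), rng.random((20, 2))])  # toy design matrix
y_toy = x_toy @ np.array([[1.0], [2.0], [3.0]]) + 0.1 * rng.standard_normal((20, 1))

# Largest eigenvalue of X^T X / m, which bounds the usable step size.
lam_max = np.linalg.eigvalsh(x_toy.T @ x_toy / len(x_toy)).max()

stable = run_gd(x_toy, y_toy, alpha=1.0 / lam_max)
unstable = run_gd(x_toy, y_toy, alpha=2.5 / lam_max)

print(f"alpha = 1.0/lam_max: loss {stable[0]:.4f} -> {stable[-1]:.4f}")
print(f"alpha = 2.5/lam_max: loss {unstable[0]:.4f} -> {unstable[-1]:.4e}")
```

Plotting `stable` and `unstable` with `plt.plot` gives the two qualitatively different curves: one that decreases and one that blows up.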

In [None]:
# Solution for requirement 1
theta, losses = train(x_train_scaled, y_train, alpha = -0.0004, max_iters = 7300, verbose = True)
theta

In [None]:
plt.plot(losses)
plt.show()

In [None]:
losses  

In [None]:
# Solution for requirement 2
theta, losses = train(x_train_scaled, y_train, alpha=0.25, max_iters = 10000, verbose = True)
losses 

In [None]:
plt.plot(losses)
plt.show()

## The normal equation 

For linear regression the weights $\theta$ can be found by solving the following equation:
$$
{\theta}^{(min)} = \left(\mathbf{X}^t \mathbf{X}\right)^{-1}\mathbf{X}^t\cdot \mathbf{y}
$$
where $\mathbf{X}$ is the design matrix, $\mathbf{y}$ is the vector of ground truth values (known values).
To compute the pseudoinverse $\left(\mathbf{X}^t \mathbf{X}\right)^{-1}\mathbf{X}^t$ of the matrix $\mathbf{X}$ we use the function `np.linalg.pinv` from [NumPy](https://numpy.org/doc/stable/reference/generated/numpy.linalg.pinv.html).

Determine the weights vector using the pseudoinverse and compare it with the one obtained using gradient descent, for an $\alpha$ which makes the loss function decrease.

Note: unlike the gradient descent algorithm, the normal equation does not need the data to be scaled/normalized. For comparison purposes, however, use the scaled matrix `x_train_scaled`.
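
The pseudoinverse solution can be cross-checked on toy data. This is a hedged sketch: the random arrays are made up, and `np.linalg.lstsq` is used only as an independent way to solve the same least-squares problem, not as the method this notebook prescribes.

```python
import numpy as np

rng = np.random.default_rng(2)
x_toy = np.hstack([np.ones((30, 1)), rng.random((30, 3))])  # toy design matrix
y_toy = rng.random((30, 1))

# Normal equation via the Moore-Penrose pseudoinverse.
theta_pinv = np.linalg.pinv(x_toy) @ y_toy

# The same least-squares problem solved by np.linalg.lstsq (cross-check only).
theta_lstsq, *_ = np.linalg.lstsq(x_toy, y_toy, rcond=None)

print(np.linalg.norm(theta_pinv - theta_lstsq))
```

The two weight vectors should coincide up to rounding error, since both minimize the same sum of squared residuals.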

In [None]:
theta_normal: np.ndarray = np.linalg.pinv(x_train_scaled) @ y_train
print(f'The weights vector determined by the normal equation method is:\n{theta_normal}')

In [None]:
# we determine a weights vector with the gradient descent method, using the training set
theta_gradient, _ = train(x_train_scaled, y_train, alpha=0.05, max_iters=10000, verbose=False)
print(f'The weights vector determined by the gradient descent method is:\n{theta_gradient}')

In [None]:
print(f'The distance between the two determined vectors (by using the two previous methods): {np.linalg.norm(theta_gradient - theta_normal)}')

## The selection of attributes
In the initial dataset the latitude and longitude are present as columns `X5 latitude` and `X6 longitude`, which are the last two columns in x_train and x_test:

In [None]:
full_data.head()

We ask ourselves whether removing the two columns improves the performance of the prediction model. Implement and execute the following steps:
1. Delete the last two columns from x_train and x_test;
2. Re-run the preprocessing functions for the new x_train and x_test;
3. Re-train the model using the gradient descent algorithm and calculate the loss function on the training and testing sets. Compare the obtained values with the ones from the initial case.
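
Step 1 can be sketched with plain NumPy slicing. The array `x_raw` below is a made-up stand-in for a feature matrix whose last two columns play the role of latitude and longitude.

```python
import numpy as np

# Hypothetical feature matrix with 5 attribute columns (values are placeholders).
x_raw = np.arange(15, dtype=float).reshape(3, 5)

# Keep every column except the last two.
x_reduced = x_raw[:, :-2]

# Equivalent alternative: delete the columns by index.
x_reduced_alt = np.delete(x_raw, [3, 4], axis=1)

print(x_raw.shape, "->", x_reduced.shape)
```

The minimums and maximums of the columns must be recomputed afterwards, because statistics calculated for the full set of columns no longer match the shape of the reduced matrix.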