# Regularized linear regression

## Synthetic returns

In [None]:
import numpy as np
from sklearn.model_selection import train_test_split
from matplotlib import pyplot as plt 
import sys

In [None]:
def gen_data(n_feat, n_beta, n_obs, rho, noise, test_size):
    
    # Correlated normal random variables (ad-hoc)
    corr_mat = np.ones([n_feat, n_feat]) * rho
    np.fill_diagonal(corr_mat, 1)
    C = np.linalg.cholesky(corr_mat)
    X = np.matmul(np.random.normal(size=[n_obs, n_feat]), np.transpose(C))
    
    # Linearly dependent observations
    beta = np.random.binomial(1, np.min([1.0, n_beta/n_feat]), n_feat) 
    beta = beta / np.sum(beta)
    eps = noise * np.random.normal(size=[n_obs])
    y = np.matmul(X, beta) + eps
    
    # Train and test data
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=test_size, shuffle=False)
    
    return  x_train, x_test, y_train, y_test, beta

In [None]:
n_feat = 2
n_beta = 1
n_obs = 60*7
rho = 0.0
noise = 1.0
test_size = 0.2

np.random.seed(123)
x_train, x_test, y_train, y_test, beta_true = gen_data(n_feat, n_beta, n_obs, rho, noise, test_size)

In [None]:
plt.title("Target returns") 
plt.xlabel("time t") 
plt.ylabel("r_t") 
plt.plot(range(y_train.size), y_train, label='train') 
plt.plot(range(y_train.size, y_train.size + y_test.size), y_test, label='test') 
plt.legend()
plt.show()

In [None]:
plt.title("Feature variables") 
plt.xlabel("time t") 
plt.ylabel("x_t") 
plt.plot(np.concatenate((x_train, x_test)), label='train') 
plt.show()

## Ordinary least squares

In [None]:
def beta_closedform(X, y):
    Xtr = np.transpose(X)
    XtrX = np.matmul(Xtr, X)
    if np.linalg.det(XtrX) == 0.0:
        sys.exit('Matrix is not invertible')
    beta = np.matmul(np.linalg.inv(XtrX),  np.matmul(Xtr, y)) 
    return beta

def rmse(y_true, y_pred):
    return np.sqrt(np.mean((y_true - y_pred)**2))

In [None]:
n_feat = 100
n_beta = 10
n_obs = 60*7
rho = 0.0
noise = 1.0
test_size = 0.2

np.random.seed(123)
x_train, x_test, y_train, y_test, beta_true = gen_data(n_feat, n_beta, n_obs, rho, noise, test_size)

In [None]:
beta_hat = beta_closedform(x_train, y_train)
loss_train = rmse(y_train, np.matmul(x_train, beta_hat)) 
loss_test = rmse(y_test, np.matmul(x_test, beta_hat)) 

plt.title("OLS beta parameters") 
plt.xlabel("train") 
plt.ylabel("true") 
plt.plot(beta_hat, beta_true,"ob") 
plt.show()

print("Train RMSE: \t %.4f\nTest RMSE: \t %.4f" % (loss_train, loss_test))

In [None]:
plt.plot(beta_true, "ob")
plt.show()

## Average RMSE for different feature subsets

In [None]:
n_feat = 100
n_beta = 10
n_obs = 60*7
rho = 0.15
noise = 1.0
test_size = 0.2
n_seed = 10

res_train = np.zeros(shape=[n_seed, n_feat-1])
res_test = np.zeros(shape=[n_seed, n_feat-1])

for s in range(n_seed):
    
    np.random.seed(s)
    x_train, x_test, y_train, y_test, beta_true = gen_data(n_feat, n_beta, n_obs, rho, noise, test_size)
    
    for n in range(n_feat-1):
        nc = n + 1
        beta_hat = beta_closedform(x_train[:,:nc], y_train)
        loss_train = rmse(y_train, np.matmul(x_train[:,:nc], beta_hat)) 
        loss_test = rmse(y_test, np.matmul(x_test[:,:nc], beta_hat)) 
        res_train[s, n] = loss_train
        res_test[s, n] = loss_test
        
y_train = np.nanmean(res_train, axis=0)
y_test = np.nanmean(res_test, axis=0)
x = range(1, n_feat)

plt.title("Average RMSE on train and test samples") 
plt.xlabel("number of features") 
plt.ylabel("RMSE") 
plt.plot(x, y_train, label='train') 
plt.plot(x, y_test, label='test') 
plt.legend()
plt.show()

## Regularized linear regression with TensorFlow

In [None]:
import tensorflow as tf

In [None]:
def linear_regression(X, beta):
    return tf.matmul(X, beta)

In [None]:
def prediction_error(y_pred, y_target):
    return tf.reduce_mean(tf.square(y_target - y_pred))

In [None]:
def regularization(W, reg_type):
    if reg_type == "OLS":
        loss = 0.0
    elif reg_type == 'LASSO':
        loss = tf.reduce_sum(tf.abs(W))
    elif reg_type == 'Ridge':
        loss = tf.reduce_sum(tf.square(W))
    else:
        print('Invalid regression_type parameter value', file=sys.stderr)
    return loss

In [None]:
def make_train_model(X, y, X_test, y_test, learning_rate, epochs, reg_type="OLS", lbd=0.0, log=True):

    # Change tensors format
    X, y, X_test, y_test = [data.astype('float32') for data in [X, y, X_test, y_test]]
    y = y.reshape([-1,1])
    y_test = y_test.reshape([-1,1])
    
    # Number of features
    num_features = np.size(X,1)
    
    # Define variable
    beta = tf.Variable(tf.ones([num_features, 1])/num_features, name="beta") 
    
    # Set optimizer
    optimizer = tf.optimizers.SGD(learning_rate)
    
    ## Loop
    for epoch in range(epochs):
        
        # Wrap computation inside a GradientTape for automatic differentiation
        with tf.GradientTape() as g:
            y_pred = linear_regression(X, beta)
            loss_train = prediction_error(y_pred, y) + lbd * regularization(beta, reg_type)
        
        # Compute gradients
        gradients = g.gradient(loss_train, [beta])

        # Update beta following gradients
        optimizer.apply_gradients(zip(gradients, [beta]))
        
        # Log
        if (log & (epoch % 10 == 0)):
            y_pred = linear_regression(X_test, beta)
            loss_test = prediction_error(y_pred, y_test) + pen_param * regularization(beta, reg_type)
            print("At epoch %d: \t Train loss: %.4f \t Test loss: %.4f" % (epoch, float(loss_train), float(loss_test)))
    
    # Compute last loss values
    y_pred = linear_regression(X, beta)
    loss_train = prediction_error(y_pred, y) + lbd * regularization(beta, reg_type)
    y_pred = linear_regression(X_test, beta)
    loss_test = prediction_error(y_pred, y_test) + pen_param * regularization(beta, reg_type)
    
    return beta.numpy(), loss_train, loss_test

In [None]:
n_feat = 300
n_beta = 50
n_obs = 60*7
rho = 0.0
noise = 1.0
test_size = 0.25

#np.random.seed(123)
x_train, x_test, y_train, y_test, beta_true = gen_data(n_feat, n_beta, n_obs, rho, noise, test_size)

In [None]:
# Parameters
learning_rate, epochs, reg_type, pen_param = [0.01, 100, "OLS", 0.0]

# Run
beta_hat, loss_train, loss_test = make_train_model(x_train, y_train, x_test, y_test, learning_rate, epochs, reg_type, pen_param)

plt.title("LS beta parameters") 
plt.xlabel("train") 
plt.ylabel("true") 
plt.plot(beta_hat, beta_true,"ob") 
plt.show()

plt.title("Histogram of parameter values") 
plt.hist(beta_hat)
plt.show()

## Lambda hyper-parameter

In [None]:
lambdas = np.linspace(0, 20, 20)
res_train = []
res_test = []

for lbd in lambdas:
    # Parameters
    learning_rate, epochs, reg_type, pen_param = [0.01, 100, "Ridge", lbd]
    # Run
    beta_hat, loss_train, loss_test = make_train_model(x_train, y_train, x_test, y_test, 
                                                       learning_rate, epochs, reg_type, pen_param,
                                                       log=False)
    # Save
    res_train.append(loss_train)
    res_test.append(loss_test)
    
plt.title("RMSE for different penalty parameter") 
plt.xlabel("lambda") 
plt.ylabel("RMSE") 
plt.plot(lambdas, res_train, label='train') 
plt.plot(lambdas, res_test, label='test') 
plt.legend()
plt.show()