### Setup

In [2]:
import torch
import torch.nn as nn
#from ignite.contrib.metrics.regression import R2Score
import torch.optim as optim
%config InlineBackend.figure_format = 'retina'
import numpy as np
import pandas as pd
from sklearn import linear_model
from sklearn.datasets import make_spd_matrix
from scipy.optimize import minimize
from scipy.linalg import toeplitz
from sklearn.model_selection import train_test_split as tts
from sklearn.preprocessing import StandardScaler, QuantileTransformer, MinMaxScaler
from sklearn.metrics import mean_absolute_error as mae, mean_squared_error as mse, r2_score as R2

In [3]:
device = torch.device("cpu")
dtype = torch.float64

#1)
### Create your own PyTorch class that implements the method of SCAD regularization and variable selection (smoothly clipped absolute deviations) for linear models. Your development should be based on the following references:

https://andrewcharlesjones.github.io/journal/scad.html

https://www.jstor.org/stable/27640214?seq=1

In [104]:
# scad penalty from the first link, changing numpy methods to pytorch.
def scad_penalty(beta_hat, lambda_val, a_val):
  is_linear = (torch.abs(beta_hat) <= lambda_val)
  is_quadratic = torch.logical_and(lambda_val < torch.abs(beta_hat), torch.abs(beta_hat) <= a_val * lambda_val)
  is_constant = (a_val * lambda_val) < torch.abs(beta_hat)

  linear_part = lambda_val * torch.abs(beta_hat) * is_linear
  quadratic_part = (2 * a_val * lambda_val * torch.abs(beta_hat) - beta_hat**2 - lambda_val**2) / (2 * (a_val - 1)) * is_quadratic
  constant_part = (lambda_val**2 * (a_val + 1)) / 2 * is_constant
  return linear_part + quadratic_part + constant_part

In [123]:
class SCADReg(nn.Module):


    def __init__(self, input_size, lamb = .1, a = 3.7):
        super(SCADReg, self).__init__()
        self.input_size = input_size
        self.a = a
        self.lamb = lamb

        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1).double()


    def forward(self, x):
        return self.linear(x)


    def scad_penalty(beta_hat, lambda_val, a_val):
      is_linear = (torch.abs(beta_hat) <= lambda_val)
      is_quadratic = torch.logical_and(lambda_val < torch.abs(beta_hat), torch.abs(beta_hat) <= a_val * lambda_val)
      is_constant = (a_val * lambda_val) < np.abs(beta_hat)

      linear_part = lambda_val * torch.abs(beta_hat) * is_linear
      quadratic_part = (2 * a_val * lambda_val * np.abs(beta_hat) - beta_hat**2 - lambda_val**2) / (2 * (a_val - 1)) * is_quadratic
      constant_part = (lambda_val**2 * (a_val + 1)) / 2 * is_constant
      return linear_part + quadratic_part + constant_part


    def loss(self, y_pred, y_true):
        mse_loss = nn.MSELoss()(y_pred, y_true)

        beta_hat = self.linear.weight
        scad_penalty_val = torch.mean(scad_penalty(beta_hat, self.lamb, self.a))

        loss = 0.5*mse_loss + scad_penalty_val
        return loss, scad_penalty_val


    def fit(self, X, y, num_epochs=10000, learning_rate=0.01):
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            self.train()
            optimizer.zero_grad()
            y_pred = self(X)
            loss, scad_penalty_val = self.loss(y_pred, y.reshape(-1,1))
            loss.backward()
            optimizer.step()

            if (epoch + 1) % 1000 == 0:
                print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")


    def predict(self, X):
      self.eval()
      with torch.no_grad():
          y_pred = self(X)
      return y_pred


    def get_coefficients(self):
        return self.linear.weight



###Test your method on a real data set, and determine a variable selection based on features importance according to SCAD.

In [106]:
# testing on the cement dataset
cement = pd.read_csv('/content/drive/MyDrive/W&M/F23/DATA301_Data/concrete.csv')
cement.head(3)

Unnamed: 0,cement,slag,ash,water,superplastic,coarseagg,fineagg,age,strength
0,540.0,0.0,0.0,162.0,2.5,1040.0,676.0,28,79.99
1,540.0,0.0,0.0,162.0,2.5,1055.0,676.0,28,61.89
2,332.5,142.5,0.0,228.0,0.0,932.0,594.0,270,40.27


In [107]:
x = cement.loc[:,'cement':'age'].values
y = cement['strength'].values

In [108]:
xtrain, xtest, ytrain, ytest = tts(x,y,test_size=0.2,shuffle=True,random_state=123)

In [109]:
# I am using standard scaler because SCAD assumes that all values are centered around zero
# which is how the Standard Scaler scales the data.
scale = StandardScaler()

In [110]:
xtrain_scale = torch.tensor(scale.fit_transform(xtrain)).type(torch.DoubleTensor)
ytrain = torch.tensor(ytrain).type(torch.DoubleTensor)

In [111]:
model = SCADReg(a = 3.7, lamb = .7, input_size = 8)

In [112]:
model.fit(xtrain_scale, ytrain)

Epoch [1000/10000], Loss: 404.26740379002047
Epoch [2000/10000], Loss: 221.29938890299607
Epoch [3000/10000], Loss: 118.55526164592177
Epoch [4000/10000], Loss: 71.90273566424479
Epoch [5000/10000], Loss: 57.40822812910194
Epoch [6000/10000], Loss: 55.22370978268554
Epoch [7000/10000], Loss: 55.131841740552275
Epoch [8000/10000], Loss: 55.13138941162357
Epoch [9000/10000], Loss: 55.13138935134478
Epoch [10000/10000], Loss: 55.13138935134475


In [113]:
model.get_coefficients()

Parameter containing:
tensor([[11.7108,  8.1723,  4.9558, -3.5771,  1.8533,  0.7980,  0.8426,  7.1944]],
       dtype=torch.float64, requires_grad=True)

In [114]:
xtest_scale = torch.tensor(scale.fit_transform(xtest))
ytest = torch.tensor(ytest)

In [115]:
mse(model.predict(xtest_scale), ytest)

102.4760490974356

Based on the coefficients produced by the SCAD model, variables that should be included are cement, slag, ash, water, superplastic,	and age (variables 1, 2, 3, 4, 5, and 8). The SCAD model made the coefficients for coarseagg and fineagg (variables 6 and 7) close to zero (<1), indicating that these variables have little to no impact on predicting the strength of cement, and are not useful in the model.

---
#2)

### Based on the simulation design explained in class, generate 500 data sets where the input features have a strong correlation structure (you may consider a 0.9).



In [153]:
def make_correlated_features(num_samples,p,rho):
  vcor = []
  for i in range(p):
    vcor.append(rho**i)
  r = toeplitz(vcor)
  mu = np.repeat(0,p)
  x = np.random.multivariate_normal(mu, r, size=num_samples)
  return x

In [154]:
rho = 0.9
p = 20
n = 500
vcor = []
for i in range(p):
  vcor.append(rho**i)

In [155]:
x = make_correlated_features(n, p, rho)

###Apply ElasticNet, SqrtLasso and SCAD to check which method produces the best approximation of an ideal solution, such as a "betastar" you design with a sparsity pattern.

### Elastic Net

In [140]:
class ElasticNet(nn.Module):
    def __init__(self, input_size, alpha=1.0, l1_ratio=0.5):
        """
        Initialize the ElasticNet regression model.

        Args:
            input_size (int): Number of input features.
            alpha (float): Regularization strength. Higher values of alpha
                emphasize L1 regularization, while lower values emphasize L2 regularization.
            l1_ratio (float): The ratio of L1 regularization to the total
                regularization (L1 + L2). It should be between 0 and 1.

        """
        super(ElasticNet, self).__init__()
        self.input_size = input_size
        self.alpha = alpha
        self.l1_ratio = l1_ratio

        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1, device = device, dtype = dtype)

    def forward(self, x):
        """
        Forward pass of the ElasticNet model.

        Args:
            x (Tensor): Input data with shape (batch_size, input_size).

        Returns:
            Tensor: Predicted values with shape (batch_size, 1).

        """
        return self.linear(x)

    def loss(self, y_pred, y_true):
        """
        Compute the ElasticNet loss function.

        Args:
            y_pred (Tensor): Predicted values with shape (batch_size, 1).
            y_true (Tensor): True target values with shape (batch_size, 1).

        Returns:
            Tensor: The ElasticNet loss.

        """
        mse_loss = nn.MSELoss()(y_pred, y_true)
        l1_reg = torch.norm(self.linear.weight, p=1)
        l2_reg = torch.norm(self.linear.weight, p=2)

        loss = mse_loss + self.alpha * (
            self.l1_ratio * l1_reg + (1 - self.l1_ratio) * l2_reg
        )

        return loss

    def fit(self, X, y, num_epochs=10000, learning_rate=0.01):
        """
        Fit the ElasticNet model to the training data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).
            y (Tensor): Target values with shape (num_samples, 1).
            num_epochs (int): Number of training epochs.
            learning_rate (float): Learning rate for optimization.

        """
        optimizer = optim.SGD(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            self.train()
            optimizer.zero_grad()
            y_pred = self(X)
            loss = self.loss(y_pred, y)
            loss.backward()
            optimizer.step()

            if (epoch + 1) % 1000 == 0:
                print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")

    def predict(self, X):
        """
        Predict target values for input data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).

        Returns:
            Tensor: Predicted values with shape (num_samples, 1).

        """
        self.eval()
        with torch.no_grad():
            y_pred = self(X)
        return y_pred
    def get_coefficients(self):
        """
        Get the coefficients (weights) of the linear regression layer.

        Returns:
            Tensor: Coefficients with shape (output_size, input_size).

        """
        return self.linear.weight


### Sqrt Lasso

In [132]:
class sqrtLasso(nn.Module):
    def __init__(self, input_size, alpha=0.1):
        """
        Initialize the  regression model.


        """
        super(sqrtLasso, self).__init__()
        self.input_size = input_size
        self.alpha = alpha


        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1).double()

    def forward(self, x):
        """
        Forward pass of the model.

        Args:
            x (Tensor): Input data with shape (batch_size, input_size).

        Returns:
            Tensor: Predicted values with shape (batch_size, 1).

        """
        return self.linear(x)

    def loss(self, y_pred, y_true):
        """
        Compute the loss function.

        Args:
            y_pred (Tensor): Predicted values with shape (batch_size, 1).
            y_true (Tensor): True target values with shape (batch_size, 1).

        Returns:
            Tensor: The loss.

        """
        mse_loss = nn.MSELoss()(y_pred, y_true)
        l1_reg = torch.norm(self.linear.weight, p=1,dtype=torch.float64)
        # l2_reg = torch.norm(self.linear.weight, p=2,dtype=torch.float64)

        loss = (len(y_true)*mse_loss)**(1/2) + self.alpha * (l1_reg)

        return loss

    def fit(self, X, y, num_epochs=10000, learning_rate=0.01):
        """
        Fit the model to the training data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).
            y (Tensor): Target values with shape (num_samples, 1).
            num_epochs (int): Number of training epochs.
            learning_rate (float): Learning rate for optimization.

        """
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            self.train()
            optimizer.zero_grad()
            y_pred = self(X)
            loss = self.loss(y_pred, y)
            loss.backward()
            optimizer.step()

            if (epoch + 1) % 1000 == 0:
                print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")

    def predict(self, X):
        """
        Predict target values for input data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).

        Returns:
            Tensor: Predicted values with shape (num_samples, 1).

        """
        self.eval()
        with torch.no_grad():
            y_pred = self(X)
        return y_pred
    def get_coefficients(self):
        """
        Get the coefficients (weights) of the linear regression layer.

        Returns:
            Tensor: Coefficients with shape (output_size, input_size).

        """
        return self.linear.weight

### Testing the models

In [156]:
beta =np.array([-1,2,3,0,0,0,0,2,-1,4])

In [157]:
beta = beta.reshape(-1,1)
beta_star = np.concatenate([beta,np.repeat(0,p-len(beta)).reshape(-1,1)],axis=0)

In [158]:
y = x@beta_star + 1.5*np.random.normal(size = (n,1))

In [159]:
xtrain, xtest, ytrain, ytest = tts(x,y,test_size=0.2,shuffle=True,random_state=123)

In [161]:
x_scaled = scale.fit_transform(xtrain)
xtest_tens = torch.tensor(x_scaled, device = device)
ytest_tens = torch.tensor(ytrain, device = device)

In [147]:
model_SCAD = SCADReg(input_size = x.size()[1])
model_EN = ElasticNet(input_size = x.size()[1])
model_SqrtLasso = sqrtLasso(input_size = x.size()[1])

In [164]:
model_SCAD = SCADReg(input_size = xtest_tens.size()[1])
model_SCAD.fit(xtest_tens, ytest_tens)
y_hat = model_SCAD.predict(xtest_tens)
mse_loss = nn.MSELoss()(y_hat, ytest_tens)

Epoch [1000/10000], Loss: 1.3151421674105745
Epoch [2000/10000], Loss: 1.0976174253130437
Epoch [3000/10000], Loss: 1.0733681561908968
Epoch [4000/10000], Loss: 1.072457815216144
Epoch [5000/10000], Loss: 1.0724529875031295
Epoch [6000/10000], Loss: 1.0724529777562744
Epoch [7000/10000], Loss: 1.0724524586594706
Epoch [8000/10000], Loss: 1.0724552779192258
Epoch [9000/10000], Loss: 1.072460027360491
Epoch [10000/10000], Loss: 1.0724646744753414


In [165]:
model_SqrtLasso = sqrtLasso(input_size = xtest_tens.size()[1])
model_SqrtLasso.fit(xtest_tens, ytest_tens)
y_hat = model_SqrtLasso.predict(xtest_tens)
mse_loss = nn.MSELoss()(y_hat, ytest_tens)

Epoch [1000/10000], Loss: 30.6751338270882
Epoch [2000/10000], Loss: 30.521426759661576
Epoch [3000/10000], Loss: 30.521385374205103
Epoch [4000/10000], Loss: 30.52153559974206
Epoch [5000/10000], Loss: 30.521616522122887
Epoch [6000/10000], Loss: 30.521543248149744
Epoch [7000/10000], Loss: 30.521732154593955
Epoch [8000/10000], Loss: 30.521734511637327
Epoch [9000/10000], Loss: 30.521807895736977
Epoch [10000/10000], Loss: 30.52156891463577


In [166]:
model_EN = ElasticNet(input_size = xtest_tens.size()[1])
model_EN.fit(xtest_tens, ytest_tens)
y_hat = model_EN.predict(xtest_tens)
mse_loss = nn.MSELoss()(y_hat, ytest_tens)

Epoch [1000/10000], Loss: 9.021857302383726
Epoch [2000/10000], Loss: 9.017577752421776
Epoch [3000/10000], Loss: 9.016047487422668
Epoch [4000/10000], Loss: 9.017165388247399
Epoch [5000/10000], Loss: 9.012702954895332
Epoch [6000/10000], Loss: 9.014681363381541
Epoch [7000/10000], Loss: 9.014390451034965
Epoch [8000/10000], Loss: 9.016199270106968
Epoch [9000/10000], Loss: 9.014235268118831
Epoch [10000/10000], Loss: 9.015929884126217


### The SCAD model produces the lowest MSE, followed by Elastic Net, and then Sqaure Root Lasso.