For all the classes and methods in this assignment you will use the PyTorch library. You should use a double precision data type and the device is either "cpu" or "cuda".

In [237]:
import torch

In [238]:
device = torch.device("cpu")
dtype = torch.float64

## Question 1

Create your own PyTorch class that implements the method of SCAD regularization and variable selection (smoothly clipped absolute deviations) for linear models. Your development should be based on the following references:

https://andrewcharlesjones.github.io/journal/scad.html

https://www.jstor.org/stable/27640214?seq=1
            
Test your method on a real data set, and determine a variable selection based on features importance according to SCAD.

In [239]:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split as tts
from sklearn.metrics import r2_score
import pandas as pd
import numpy as np
import torch.nn as nn
import torch.optim as optim

In [240]:
class SCAD(nn.Module):

  def __init__(self, input_size, lambda_val = .5, a_val = .4):
    super(SCAD, self).__init__()
    self.input_size = input_size
    self.lambda_val = lambda_val
    self.a_val = a_val
    self.linear = nn.Linear(input_size, 1, bias = False, device = device, dtype = dtype)

  def forward(self, x):
    return self.linear(x)

  def loss(self, X, y, y_pred, coeffs):
    xT_x = torch.mm(torch.transpose(X, 0, 1), X)
    scad_deriv = self._scad_derivative(coeffs, self.lambda_val, self.a_val)
    mse_loss = nn.MSELoss(reduction='mean')(torch.flatten(y_pred), torch.flatten(y))
    return mse_loss + torch.norm(torch.matmul(torch.inverse(xT_x + torch.matmul(scad_deriv/(2*torch.abs(coeffs)),torch.eye(X.shape[1], dtype = dtype))),torch.matmul(torch.transpose(X, 0, 1), y)))


  def fit(self, X, y, num_epochs = 100, learning_rate = .05, verbose = True):
    optimizer = optim.Adam(self.parameters(), lr = learning_rate)

    for epoch in range(num_epochs):
      self.train()
      optimizer.zero_grad()
      coeffs = self.linear.weight[0]
      y_pred = self(X)
      loss = self.loss(X, y, y_pred, coeffs)
      loss.backward()
      optimizer.step()

      if verbose:
        print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")

  def get_coefficients(self,):
    return self.linear.weight

  def predict(self, X):
    self.eval()
    with torch.no_grad():
      y_pred = self(X)
    return y_pred

  def _scad_penalty(self, beta_hat, lambda_val, a_val):
    is_linear = (torch.abs(beta_hat) <= lambda_val)
    is_quadratic = torch.logical_and(lambda_val < torch.abs(beta_hat), torch.abs(beta_hat) <= a_val * lambda_val)
    is_constant = (a_val * lambda_val) < torch.abs(beta_hat)

    linear_part = lambda_val * torch.abs(beta_hat) * is_linear
    quadratic_part = (2 * a_val * lambda_val * torch.abs(beta_hat) - beta_hat**2 - lambda_val**2) / (2 * (a_val - 1)) * is_quadratic
    constant_part = (lambda_val**2 * (a_val + 1)) / 2 * is_constant
    return linear_part + quadratic_part + constant_part

  def _scad_derivative(self, beta_hat, lambda_val, a_val):
    return lambda_val * ((beta_hat <= lambda_val) + (a_val * lambda_val - beta_hat)*((a_val * lambda_val - beta_hat) > 0) / ((a_val - 1) * lambda_val) * (beta_hat > lambda_val))

In [241]:
data = pd.read_csv('drive/MyDrive/Adv. App. Machine Learning/concrete.csv')
data.head(2)

Unnamed: 0,cement,slag,ash,water,superplastic,coarseagg,fineagg,age,strength
0,540.0,0.0,0.0,162.0,2.5,1040.0,676.0,28,79.99
1,540.0,0.0,0.0,162.0,2.5,1055.0,676.0,28,61.89


In [242]:
x = torch.tensor(data.drop(columns=['strength']).values)
y = torch.tensor(data['strength'].values)

In [243]:
scad = SCAD(input_size = x.shape[1])
scad.fit(x, y, num_epochs = 100, verbose = False)

In [244]:
torch.set_printoptions(sci_mode = False)
scad.get_coefficients()

Parameter containing:
tensor([[ 0.0968,  0.0678,  0.0556,  0.0803,  0.3729, -0.0732,  0.0650,  0.0947]],
       dtype=torch.float64, requires_grad=True)

In [245]:
x_scad = data.drop(columns=['strength', 'cement', 'slag', 'coarseagg', 'fineagg', 'age']).values

In [246]:
xscad_train, xscad_test, yscad_train, yscad_test = tts(x_scad, y, test_size = .2, random_state = 4)
x_train, x_test, y_train, y_test = tts(x, y, test_size = .2, random_state = 4)

In [247]:
model = LinearRegression()
model.fit(x_train, y_train)
scad_pred = model.predict(x_test)
print(f"R^2: {model.score(x_test, y_test)}")

scad = SCAD(input_size = x.shape[1])
scad.fit(x_train, y_train, num_epochs = 500, verbose = False)
scad_pred = scad.predict(x_test)
print(f"SCAD R^2: {r2_score(y_test, scad_pred)}")

R^2: 0.5543960836555616
SCAD R^2: 0.5567211375252696


Slightly better!

## Question 2

Based on the simulation design explained in class, generate 500 data sets where the input features have a strong correlation structure (you may consider a 0.9) and apply ElasticNet, SqrtLasso and SCAD to check which method produces the best approximation of an ideal solution, such as a "betastar" you design with a sparsity pattern.

In [248]:
from scipy.linalg import toeplitz
from sklearn.linear_model import ElasticNet

In [249]:
class SqrtLasso(nn.Module):
    def __init__(self, input_size, alpha=0.1):
        """
        Initialize the  regression model.


        """
        super(SqrtLasso, self).__init__()
        self.input_size = input_size
        self.alpha = alpha


        # Define the linear regression layer
        self.linear = nn.Linear(input_size, 1,bias=False,device=device,dtype=dtype)

    def forward(self, x):
        """
        Forward pass of the model.

        Args:
            x (Tensor): Input data with shape (batch_size, input_size).

        Returns:
            Tensor: Predicted values with shape (batch_size, 1).

        """
        return self.linear(x)

    def loss(self, y_pred, y_true):
        """
        Compute the loss function.

        Args:
            y_pred (Tensor): Predicted values with shape (batch_size, 1).
            y_true (Tensor): True target values with shape (batch_size, 1).

        Returns:
            Tensor: The loss.

        """
        mse_loss = nn.MSELoss(reduction='mean')(y_pred, y_true)
        l1_reg = torch.norm(self.linear.weight, p=1,dtype=dtype)
        # l2_reg = torch.norm(self.linear.weight, p=2,dtype=torch.float64)

        loss = torch.sqrt(mse_loss) + self.alpha * (l1_reg)
        return torch.flatten(loss)

    def fit(self, X, y, num_epochs=500, learning_rate=0.01):
        """
        Fit the model to the training data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).
            y (Tensor): Target values with shape (num_samples, 1).
            num_epochs (int): Number of training epochs.
            learning_rate (float): Learning rate for optimization.

        """
        optimizer = optim.Adam(self.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):
            self.train()
            optimizer.zero_grad()
            y_pred = self(X)
            loss = self.loss(y_pred, y)
            loss.backward()
            optimizer.step()

            if (epoch + 1) % 100 == 0:
                print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item()}")

    def predict(self, X):
        """
        Predict target values for input data.

        Args:
            X (Tensor): Input data with shape (num_samples, input_size).

        Returns:
            Tensor: Predicted values with shape (num_samples, 1).

        """
        self.eval()
        with torch.no_grad():
            y_pred = self(X)
        return y_pred
    def get_coefficients(self):
        """
        Get the coefficients (weights) of the linear regression layer.

        Returns:
            Tensor: Coefficients with shape (output_size, input_size).

        """
        return self.linear.weight

In [250]:
# we want to define a function for generating x with a prescribed number of obsvervations, features and Toeplitz correlation structure.
def make_correlated_features(num_samples,p,rho):
  vcor = []
  for i in range(p):
    vcor.append(rho**i)
  r = toeplitz(vcor)
  mu = np.repeat(0,p)
  x = np.random.multivariate_normal(mu, r, size=num_samples)
  return x

In [251]:
rho = 0.9
p = 200
n = 500

x = make_correlated_features(n,p,rho)

In [252]:
beta = np.array([4,1,2,0,1,0,0,3,0,2])
beta = beta.reshape(-1,1)
betastar = np.concatenate([beta,np.repeat(0,p-len(beta)).reshape(-1,1)],axis=0)

In [253]:
y = x@betastar + 2*np.random.normal(size=(n,1))

In [254]:
y.shape, x.shape

((500, 1), (500, 200))

In [255]:
x = torch.tensor(x, device = device)
y = torch.tensor(y, device = device)

In [256]:
elastic = ElasticNet()
elastic.fit(x,y)
print(f"ElasticNet Coefficients: {elastic.coef_}")

sqrtlasso = SqrtLasso(input_size = x.shape[1])
sqrtlasso.fit(x,y)
sqrt_coeff = sqrtlasso.get_coefficients()
print(f"SqrtLasso Coefficients: {sqrt_coeff}")

scad = SCAD(input_size = x.shape[1])
scad.fit(x, y, num_epochs = 500, verbose = False)
print(f"SCAD Coefficients: {scad.get_coefficients()}")

ElasticNet Coefficients: [ 1.98884217  1.64770351  1.33194919  0.93181883  0.84064111  0.47513789
  0.6091949   1.01851896  0.84648865  0.86630733  0.44757007  0.27682252
  0.07205539  0.0282199   0.          0.          0.          0.
  0.          0.          0.          0.         -0.         -0.
 -0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.          0.          0.
  0.          0.          0.          0.         -0.         -0.
 -0.         -0.         -0.          0.         -0.         -0.
 -0.         -0.         -0.         -0.         -0.         -0.
 -0.          0.          0.          0.          0.          0.
  0.          0.          0.         -0.         -0.         -0.
 -0.         -0.         -0.         -0.         

It looks like the ElasticNet did a great job!

## Question 3

Host your project in your GitHub space.

https://willcameron2002.github.io/DATA441/