2. Coding problem: implement a gradient descent method for Ridge Regression by using the PyTorch library. Your implementation should be a class that has the required methods “.fit” and “.predict”. You should include an application of your code to a data set.

In [1]:
import torch
import numpy as np
import pandas as pd

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
data = pd.read_csv('drive/MyDrive/cars.csv')

In [3]:
data


Unnamed: 0,MPG,CYL,ENG,WGT
0,18.0,8,307.0,3504
1,15.0,8,350.0,3693
2,18.0,8,318.0,3436
3,16.0,8,304.0,3433
4,17.0,8,302.0,3449
...,...,...,...,...
387,27.0,4,140.0,2790
388,44.0,4,97.0,2130
389,32.0,4,135.0,2295
390,28.0,4,120.0,2625


In [4]:
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error

In [5]:
class RidgeRegression:
    def __init__(self, alpha=1.0, lr=0.01, iter=1000, tol=1e-4):
        self.alpha = alpha
        self.lr = lr
        self.iter = iter
        self.tol = tol
        self.weights = None

    def fit(self, x, y):
        x = torch.tensor(x, dtype=torch.float32)
        y = torch.tensor(y, dtype=torch.float32).reshape(-1, 1)
        samples, features = x.shape
        self.weights = torch.randn(features, 1, requires_grad=True)
        for _ in range(self.iter):
            pred = torch.matmul(x, self.weights)
            loss = torch.mean((pred - y) ** 2) + self.alpha * torch.sum(self.weights ** 2)
            loss.backward()
            with torch.no_grad():
                self.weights -= self.lr * self.weights.grad
                self.weights.grad.zero_()
            if torch.sum(torch.abs(loss - loss.detach())) < self.tol:
                break

    def predict(self, x):
        x = torch.tensor(x, dtype=torch.float32)
        matrix_mult = torch.matmul(x, self.weights).detach()
        return matrix_mult.numpy()

In [6]:
df = pd.DataFrame(data)
x = df[['CYL', 'ENG', 'WGT']].values
y = df['MPG'].values

scale = StandardScaler()
model = RidgeRegression()
x_scaled = scale.fit_transform(x)
x_train, x_test, y_train, y_test = train_test_split(x_scaled, y, test_size=0.2, random_state=42)
model.fit(x_train, y_train)
y_pred = model.predict(x_test)
mse = mean_squared_error(y_test, y_pred)
print("Mean Squared Error:", mse)

Mean Squared Error: 575.2460393540222


3. Complete the exercise provided in the Application to Locally Weighted Regression notebook and test the method on a data set, for example, the one provided in class.

In [7]:
# Libraries of functions need to be imported
import numpy as np
import pandas as pd
from sklearn import linear_model
from sklearn.preprocessing import StandardScaler, MinMaxScaler
from scipy.spatial import Delaunay
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import KFold, GridSearchCV
from sklearn.metrics import mean_squared_error as mse
from scipy import linalg
from scipy.interpolate import interp1d, LinearNDInterpolator, NearestNDInterpolator
from sklearn.decomposition import PCA

# the following line(s) are necessary if you want to make SKlearn compliant functions
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted

In [8]:
# Gaussian Kernel
def Gaussian(x):
  return np.where(np.abs(x)>4,0,1/(np.sqrt(2*np.pi))*np.exp(-1/2*x**2))

# this is the correct vectorized version
def Tricubic(x):
  return np.where(np.abs(x)>1,0,(1-np.abs(x)**3)**3)

# Epanechnikov Kernel
def Epanechnikov(x):
  return np.where(np.abs(x)>1,0,3/4*(1-np.abs(x)**2))

# Quartic Kernel
def Quartic(x):
  return np.where(np.abs(x)>1,0,15/16*(1-np.abs(x)**2)**2)

In [9]:
def dist(u,v):
  if len(v.shape)==1:
    v = v.reshape(1,-1)
    d = np.sqrt(np.sum((u-v)**2,axis=1))
  else:
    d = np.array([np.sqrt(np.sum((u-v[i])**2,axis=1)) for i in range(len(v))])
  return d

In [10]:
def kernel_function(xi,x0,kern, tau):
    return kern(dist(xi,x0)/(2*tau))

In [11]:
def weights_matrix(x,x_new,kern,tau):
  if np.isscalar(x_new):
    w = kernel_function(x,x_new,kern,tau)
  else:
    if len(x_new.shape)==1:
      n = 1
      w = kernel_function(x,x_new,kern,tau=0.05).reshape(1,-1)
    else:
      n = len(x_new)
      w = np.array([kernel_function(x,x_new[i],kern,tau) for i in range(n)]).reshape(n,len(x))
  return w

In [12]:
def weight_function(u,v,kern=Gaussian,tau=0.5):
    return kern(dist(u,v)/(2*tau))

In [13]:
class Lowess:
    def __init__(self, kernel = Gaussian, tau=0.05):
        self.kernel = kernel
        self.tau = tau

    def fit(self, x, y):
        kernel = self.kernel
        tau = self.tau
        self.xtrain_ = x
        self.yhat_ = y

    def predict(self, x_new):
        check_is_fitted(self)
        x = self.xtrain_
        y = self.yhat_
        lm = linear_model.Ridge(alpha=0.001)
        w = weight_function(x,x_new,self.kernel,self.tau)

        if np.isscalar(x_new):
          lm.fit(np.diag(w)@(x.reshape(-1,1)),np.diag(w)@(y.reshape(-1,1)))
          yest = lm.predict([[x_new]])[0][0]
        else:
          n = len(x_new)
          yest_test = np.zeros(n)
          #Looping through all x-points
          for i in range(n):
            lm.fit(np.diag(w[:,i])@x,np.diag(w[:,i])@y)
            yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
        return yest_test

In [14]:
x = data[['CYL', 'ENG', 'WGT']]
y = data['MPG']
scale = StandardScaler()
model = Lowess(kernel=Epanechnikov,tau=0.02)
xscaled = scale.fit_transform(x)
model.fit(xscaled,y)
yhat = model.predict(xscaled)
mse(yhat,y)

  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[i] = lm.predict(x_new[i].reshape(1,-1))
  yest_test[

3.227026887555525