In [None]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler as SS
from sklearn import linear_model
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split as tts
from sklearn.metrics import mean_squared_error as mse
from scipy import linalg
from scipy.interpolate import interp1d
import warnings
warnings.filterwarnings('ignore')
# The following imports are needed to write scikit-learn-compatible estimators.
from sklearn.base import BaseEstimator, RegressorMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted

In [86]:
def tricubic(x):
    """Tricube kernel: ``(1 - |x|**3)**3`` where ``|x| <= 1``, and 0 elsewhere.

    The expression is evaluated element-wise with NumPy, so ``x`` may be a
    scalar or an array; the result comes from ``np.where``.
    """
    magnitude = np.abs(x)
    inside_support = (1 - magnitude**3)**3
    return np.where(magnitude > 1, 0, inside_support)

In [None]:
def Gaussian(x):
    """Standard normal density, forced to 0 wherever ``|x| > 4``.

    ``x`` is used in NumPy arithmetic (``x**2``), so it should be a number
    or a NumPy array; the result comes from ``np.where``.
    """
    peak = 1/(np.sqrt(2*np.pi))
    density = peak*np.exp(-1/2*x**2)
    return np.where(np.abs(x) > 4, 0, density)

In [72]:
def Epanechnikov(x):
    """Epanechnikov kernel: ``3/4 * (1 - |x|**2)`` where ``|x| <= 1``, else 0.

    Evaluated element-wise with NumPy; the result comes from ``np.where``.
    """
    magnitude = np.abs(x)
    inside_support = 3/4*(1 - magnitude**2)
    return np.where(magnitude > 1, 0, inside_support)

In [81]:
def Quartic(x):
    """Quartic (biweight) kernel: ``15/16 * (1 - |x|**2)**2`` where ``|x| <= 1``, else 0.

    Evaluated element-wise with NumPy; the result comes from ``np.where``.
    """
    magnitude = np.abs(x)
    inside_support = 15/16*(1 - magnitude**2)**2
    return np.where(magnitude > 1, 0, inside_support)

In [None]:
import math

In [None]:
# Module-level LinearRegression instance shared across the notebook:
# Lowess.predict refits it for every query point, and the baseline cell near
# the end refits it on the raw training split. Any fitted state on `lm` is
# therefore whatever the most recent `lm.fit(...)` call left behind.
lm = linear_model.LinearRegression()

In [69]:
class Lowess:
    """Locally weighted linear regression on standardized features.

    For every query point a separate linear model is fitted to the training
    data, with each training row scaled by a kernel weight that depends on
    its Euclidean distance (in standardized feature space) to the query.

    Parameters
    ----------
    kernel : callable, default=Gaussian
        Called with a NumPy array of scaled distances and expected to return
        an array of weights of the same shape (the kernels defined in this
        notebook are all ``np.where``-based and satisfy this).
    tau : float, default=0.5
        Bandwidth. Distances are divided by ``2 * tau`` before the kernel is
        applied. Must be strictly positive.

    Attributes
    ----------
    scaler_ : StandardScaler fitted on the training features (set by ``fit``).
    xtrain_ : ndarray of shape (n_samples, n_features)
        Standardized training features (set by ``fit``).
    yhat_ : ndarray of shape (n_samples,)
        Training targets as passed to ``fit`` (name kept for compatibility).
    xtest_ : ndarray of shape (n_queries, n_features)
        Standardized query points of the most recent ``predict`` call.
    """

    def __init__(self, kernel = Gaussian, tau=0.5):
        self.kernel = kernel
        self.tau = tau

    def fit(self, x, y):
        """Store the standardized training data.

        Parameters
        ----------
        x : array-like of shape (n_samples, n_features) or (n_samples,)
            Training features; 1-D input is treated as a single feature.
        y : array-like of shape (n_samples,) or (n_samples, 1)
            Training targets.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``tau`` is not strictly positive or ``x`` and ``y`` disagree
            on the number of samples.
        """
        if not self.tau > 0:
            raise ValueError("tau must be strictly positive, got %r" % (self.tau,))

        x = np.asarray(x, dtype=float)
        if x.ndim < 2:
            x = x.reshape(-1, 1)
        y = np.asarray(y, dtype=float).ravel()
        if len(x) != len(y):
            raise ValueError(
                "x and y have inconsistent numbers of samples: %d != %d" % (len(x), len(y))
            )

        # The scaler is fitted once here, not in predict, so that repeated
        # predict calls see the same training representation.
        self.scaler_ = SS().fit(x)
        self.xtrain_ = self.scaler_.transform(x)
        self.yhat_ = y
        return self

    def kernel_function(self,k=-1):
        """Kernel weights of every training row with respect to one query point.

        Parameters
        ----------
        k : int, default=-1
            Row index into ``self.xtest_``. A negative value means
            ``self.xtest_`` itself is treated as a single query point.

        Returns
        -------
        ndarray of shape (n_samples,)
        """
        query = self.xtest_[k] if k >= 0 else self.xtest_
        offsets = self.xtrain_ - np.ravel(query)
        distances = np.sqrt(np.sum(offsets**2, axis=1))
        return self.kernel(distances/(2*self.tau))

    def weights_matrix(self):
        """Kernel weights for all query points currently stored in ``self.xtest_``.

        Returns a 1-D array when ``self.xtest_`` is a scalar, otherwise an
        array of shape (n_queries, n_samples) with one row per query point.
        """
        if np.isscalar(self.xtest_):
            return self.kernel_function()
        return np.array([self.kernel_function(i) for i in range(len(self.xtest_))])

    def predict(self,x_new):
        """Predict the target at each query point with a locally fitted line.

        Parameters
        ----------
        x_new : array-like of shape (n_queries, n_features)
            Query points in the original (unscaled) feature space. A scalar
            or 1-D array is reshaped to a single column, which is only
            meaningful for a model fitted on one feature; for other models
            the scaler rejects the mismatched feature count.

        Returns
        -------
        ndarray of shape (n_queries,), or a single value when ``x_new`` is a
        scalar.

        Notes
        -----
        The module-level ``lm`` estimator is refitted for every query point,
        so its fitted state is overwritten by this call.
        """
        # Validate before touching any fitted attribute.
        check_is_fitted(self)

        x_new = np.asarray(x_new, dtype=float)
        scalar_query = x_new.ndim == 0
        if x_new.ndim < 2:
            x_new = x_new.reshape(-1, 1)
        self.xtest_ = self.scaler_.transform(x_new)

        weights = self.weights_matrix()
        targets = self.yhat_.reshape(-1, 1)
        yest_test = np.zeros(len(self.xtest_))

        for i, row_weights in enumerate(weights):
            # Row-wise scaling; same product as np.diag(w).dot(...) without
            # materializing an n_samples x n_samples matrix.
            scale = row_weights.reshape(-1, 1)
            # NOTE(review): rows are scaled by w while LinearRegression's
            # intercept is left unscaled, which is not the same objective as
            # weighted least squares (e.g. lm.fit(X, y, sample_weight=w)).
            # Kept as-is so existing results do not change. When every weight
            # is zero the fit sees all-zero data and presumably predicts 0 -
            # confirm whether that is the intended fallback.
            lm.fit(scale*self.xtrain_, scale*targets)
            prediction = lm.predict(self.xtest_[i].reshape(1, -1))
            yest_test[i] = np.ravel(prediction)[0]

        return yest_test[0] if scalar_query else yest_test


In [None]:
# Load the mtcars CSV into a DataFrame.
# NOTE(review): the relative path starts with "drive/My Drive", presumably a
# Google Drive mount in Colab - confirm before running in another environment.
cars = pd.read_csv("drive/My Drive/DATA 440 Capstone/data/mtcars.csv")

In [74]:
def do_Kfold(model,X,y,k=10,scaler = None, random_state = 146):
    """Return the mean test-fold MSE of ``model`` under shuffled K-fold CV.

    Parameters
    ----------
    model : object with ``fit(X, y)`` and ``predict(X)``
        Refitted from scratch on the training part of every fold.
    X : array-like, indexed by sample along the first axis
        Feature matrix; converted with ``np.asarray``.
    y : array-like
        Targets; converted with ``np.asarray``.
    k : int, default=10
        Number of folds.
    scaler : object with ``fit_transform`` / ``transform``, optional
        If given, it is fitted on each training fold and applied to the
        matching test fold.
    random_state : int, default=146
        Seed for the shuffled split.

    Returns
    -------
    float
        Mean of the per-fold mean squared errors on the test folds.
    """
    from sklearn.model_selection import KFold

    # Positional indexing below needs arrays; this also accepts DataFrames/Series.
    X = np.asarray(X)
    y = np.asarray(y)

    kf = KFold(n_splits=k, shuffle=True, random_state=random_state)
    test_scores = []

    for idxTrain, idxTest in kf.split(X):
        Xtrain, Xtest = X[idxTrain], X[idxTest]
        ytrain, ytest = y[idxTrain], y[idxTest]

        if scaler is not None:
            # Fit on the training fold only so the test fold does not leak into the scaling.
            Xtrain = scaler.fit_transform(Xtrain)
            Xtest = scaler.transform(Xtest)

        model.fit(Xtrain, ytrain)
        # mean_squared_error takes (y_true, y_pred).
        test_scores.append(mse(ytest, model.predict(Xtest)))

    return np.mean(test_scores)

In [None]:
# Features: weight and horsepower; target: miles per gallon.
x = cars[['wt', 'hp']].to_numpy()
y = cars['mpg'].to_numpy()

In [None]:
# 10-fold cross-validated MSE of Lowess with its default kernel and tau.
do_Kfold(Lowess(), x, y, k=10)

(32, 2)

In [87]:
# Cross-validate the tricubic Lowess over 100 evenly spaced bandwidths in
# [0.01, 10]; each entry of `cost` is [cv_mse, tau], in ascending-tau order.
tau_range = np.linspace(0.01, 10, 100)
cost = []
for tau in tau_range:
    crossval = do_Kfold(Lowess(kernel=tricubic, tau=tau), x, y, k=10)
    cost += [[crossval, tau]]

In [88]:
# Pick the [cv_mse, tau] entry with the lowest cross-validated MSE.
# min() keeps the first minimal entry, i.e. the earliest one in `cost` on ties.
min_mse, min_tau = min(cost, key=lambda entry: entry[0])
print(min_mse,min_tau)

5.024128215467668 4.449999999999999


In [99]:
# Hold out 20% of the rows, fit the tricubic Lowess with the selected
# bandwidth on the rest, and report its MSE on the held-out rows.
xtrain, xtest, ytrain, ytest = tts(
    x, y, test_size=0.2, random_state=123, shuffle=True
)
model = Lowess(kernel=tricubic, tau=min_tau)
model.fit(xtrain, ytrain)
mse(model.predict(xtest), ytest)

1.6997942085141902

In [94]:
# Baseline: ordinary linear regression on the same training split, predicted
# on the held-out rows (fit returns the estimator, so the calls can be chained).
lm.fit(xtrain, ytrain).predict(xtest)

array([22.96278771, 22.8765268 , 20.54809227, 25.79538626, 22.00473509,
       27.33034789, 17.06393448])

In [95]:
# Display the held-out mpg values for side-by-side comparison with the
# linear-regression predictions shown in the previous cell.
ytest

array([24.4, 21.4, 18.1, 26. , 22.8, 30.4, 17.3])