Kernel Ridge Regression


# import

In [1]:
import numpy as np
import pandas as pd
import matplotlib.pylab as plt
import itertools
from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from tqdm import tqdm_notebook as tqdm
import time

# functions

In [2]:
def get_func(theta, x_obs, h, Kernel):
    """
    Parameters:
    --------------
    theta: np.array
        parameter
    x_obs: np.matrix
        observed x
    K: func
        kernel function (default: gauss kernel)
    h: double
        gauss range
        
    Returns:
    --------------
    func_linear_model: func
            predicting function
    """
    def func_linear_model(x):
        """
        Parameters:
        --------------
        x: np.array
            x
            
        Returns:
        --------------
        ans: double
            predicted x
        """
        n = theta.size

        assert x_obs.shape[0] == n, 'x_obs is invalid'

        ans = 0
        for j in range(n):
            ans += theta[j]*Kernel(x,x_obs[j])
        return ans
        
    return func_linear_model

def get_theta_opt(x_obs,y_obs,lam,Kernel):
    n = y_obs.size
    
    K_mat = np.zeros([n,n])
    
    list_ij = [i for i in range(n)]
    for i, j in itertools.product(list_ij, list_ij):
        x_i,x_j = x_obs[i,:],x_obs[j,:]
        K_mat[i,j] = Kernel(x_i,x_j)

    theta_opt = np.linalg.inv(K_mat@K_mat+lam*np.eye(n))@np.transpose(K_mat)@y_obs
    return theta_opt

def get_loss(x_val, y_val, func):
    loss = 0
    for i in range(y_val.size):
        x_pred = func(x_val[i,:])
        loss += (x_pred - y_val[i])**2
    return loss/y_val.size

def cross_validate(x_obs,y_obs,
                   x_test,y_test,
                   lam,k,h,
                   K='gauss',silence=True):
    """
    Parameters:
    --------------
    x_obs: np.matrix
        observed x
    y_obs: np.array
        observed y
    k: int
        number of split
    """
    n = y_obs.size
    
    # assert n%k==0, 'n%k is not 0.'
    
    if K=='gauss':
        Kernel = lambda x,c: np.exp(-np.abs(np.sum((x-c)**2))/(2*h**2))
    
    loss_list = np.zeros(n//k) # store errors calculated in each step
    func_list = [] # store functions calculated in each step
    
    if not silence:
        print('===========================')
        print('Parameters:\n')
        print('L2 regularization: lambda = {0}'.format(lam))
        print('gauss range: h = {0}'.format(h))
        print('# of data for validation: k = {0}'.format(k))
        print('===========================')
        print('Start calculating optimal parameters for each step.')
    
    for n_step in tqdm(range(n//k)):
        # Z_i
        x_val = x_obs[n_step*k:n_step*k+k,:]
        y_val = y_obs[n_step*k:n_step*k+k]
        
        # delete Z_i from all dataset
        x_obs_temp = np.delete(x_obs,np.s_[n_step*k:n_step*k+k],0)
        y_obs_temp = np.delete(y_obs,np.s_[n_step*k:n_step*k+k],0)
        
        # calculate optimal parameters with the train data, and define temporary function
        theta_opt_temp = get_theta_opt(x_obs_temp,y_obs_temp,lam,Kernel)
        func_temp = get_func(theta_opt_temp, x_obs_temp, h, Kernel)
        
        # calculate the error with the rest data
        loss_list[n_step] = get_loss(x_val, y_val, func_temp)
        
        func_list += [func_temp]
    
    # results
    def func_pred(x):
        ans = 0
        for i in range(len(func_list)):
            ans += func_list[i](x)
        return ans/len(func_list)
    if not silence:
        print('===========================')
        print('Finished learning. Calculating the train error...')
    train_loss = 0
    test_loss = 0
    for i in tqdm(range(n),desc='train data'):
        x_pred = func_pred(x_obs[i,:])
        train_loss += (x_pred-y_obs[i])**2
        
    for i in tqdm(range(len(y_test)),desc='test data'):
        x_pred = func_pred(x_test[i,:])
        test_loss += (x_pred-y_test[i])**2
        
    print('train loss:{0}'.format((train_loss/n)[0]))
    print('test loss:{0}'.format((test_loss/len(y_test))[0]))
    #return final_loss/n, loss_list

# dataset

In [3]:
# load dataset
boston = load_boston()
df_X = pd.DataFrame(boston.data, columns=boston.feature_names)
df_Y = pd.DataFrame(boston.target, columns=['target'])

X, Y = df_X.values, df_Y.values

In [4]:
df_X.head(3)

Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT
0,0.00632,18.0,2.31,0.0,0.538,6.575,65.2,4.09,1.0,296.0,15.3,396.9,4.98
1,0.02731,0.0,7.07,0.0,0.469,6.421,78.9,4.9671,2.0,242.0,17.8,396.9,9.14
2,0.02729,0.0,7.07,0.0,0.469,7.185,61.1,4.9671,2.0,242.0,17.8,392.83,4.03


In [5]:
df_Y.head(3)

Unnamed: 0,target
0,24.0
1,21.6
2,34.7


In [6]:
X_train, X_test, y_train, y_test = train_test_split(X, Y,train_size=0.8)



# learning

In [9]:
cross_validate(X_train,y_train,X_test,y_test,lam=0.00000001,k=80,h=100,silence=False)

Parameters:

L2 regularization: lambda = 1e-08
gauss range: h = 100
# of data for validation: k = 80
Start calculating optimal parameters for each step.


HBox(children=(IntProgress(value=0, max=5), HTML(value='')))


Finished learning. Calculating the train error...


HBox(children=(IntProgress(value=0, description='train data', max=404, style=ProgressStyle(description_width='…




HBox(children=(IntProgress(value=0, description='test data', max=102, style=ProgressStyle(description_width='i…


train loss:6.023894129723013
test loss:11.357379029212519
