In [1]:
import numpy as np
import matplotlib.pyplot as plt
import time
from data_utils import load_dataset

In [2]:
testing = True

Below is a construction of the dictionary of basis functions. The mauna_loa training dataset has sinusoidal behaviour with an increasing, seemingly linear, vertical offset. Thus I have decided to use a combination of polynomial, sine, and cosine basis functions.

As polynomial regression has a tendency to overfit with higher order terms, I will use polynomial terms up to order 5 (??? should change this bc answer uses this... ???). As we are asked to have at least 200 basis functions, I will also use 100 sine functions and 100 cosine functions.

The sine and cosine functions increasing frequencies, i.e., they have frequencies of (w)(pi) with increasing w.

In [3]:
PI = np.pi
max_order = 5
max_w = 100

# Dict has 211 basis functions
Dict = []

p = [p for p in range(0,max_order+1)]
Dict += [lambda x, i=i: x**i for i in p]

w = [w for w in range(1,max_w+1)]
Dict += [lambda x, i=i: np.sin(i*PI*x) for i in w]
Dict += [lambda x, i=i: np.cos(i*PI*x) for i in w]

# https://www.reddit.com/r/learnpython/comments/zajla6/how_to_return_a_list_of_lambda_function/

In [4]:
# this stuff needs to be tested!!! --> working!!!

def J(phi_i, r_k):
    # phi_i is the ith column of the PHI matrix
    # where the ith column of the PHI matrix contains the mapping
    # of each training point x_{1-N} to the feature phi_i
    # the ith row of the PHI matrix is the mapping of x_i to all phi_{1-M}
    # i.e., PHI_ij = 
    return np.square(phi_i.T.dot(r_k)) / phi_i.T.dot(phi_i)

def mdl(N, l2_loss, k):
    return N/2 * np.log(l2_loss) + k/2 * np.log(N)

def greedy_regression_model(x_train, y_train, epsilon_r, testing=False):
    N = x_train.shape[0]
    k = 0
    I_selected = []
    I_candidates = np.arange(0,len(Dict))
    r_k = y_train # residual/training error vector

    MDL = np.inf
    # MDL will decrease as model complexity grows and then increase as overfitting occurs
    # so once MDL stops decreasing, we will stop the algorithm (stop at some relative growth, epsilon_r)
    # (or until no more basis functions to select)
    
    weights = None
    chosen_k = 0
    
    if testing:
        print("Initial:")
        print("I_selected: ", I_selected)
        print("I_candidates: ", I_candidates)
        print("MDL: ", MDL)
    
    while I_candidates.shape[0] > 0:
        # 1) increment k
        #    k is also equal to num fcns in the model after this iteration
        k += 1
        
        # 2) pick new basis fcn from Dict
        #    use orthogonal matching pursuit metric
        #    i.e., pick basis fcn from candidates that maximmizes J
        Js = []
        for i in I_candidates:
            Js.append(J(np.array(Dict[i](x_train)), r_k))
        i_k = I_candidates[np.argmax(Js)]
        
        # 3) add selected basis fcn to I_selected
        I_selected.append(i_k)
        
        # 4) delete selected basis fcn from I_candidates
        I_candidates = np.delete(I_candidates, i_k)
        
        # 5) use current I_selected model to solve for weights
        #    use SVD to compute weights
        PHI = np.empty((N, k))
        for i, fcn in enumerate(I_selected):
            PHI[:,i] = Dict[fcn](x_train).reshape(PHI[:,i].shape)

        U, sigma, V_t = np.linalg.svd(PHI, full_matrices=False, compute_uv=True, hermitian=False)
        weights = np.linalg.multi_dot([V_t.T, np.linalg.inv(np.diag(sigma)), U.T, y_train])
        
        # 6) update residual/training error vector
        y_hat = PHI.dot(weights)
        r_k = y_train - y_hat
        
        # 7) check MDL stopping criterion
        temp = mdl(N, np.sqrt(np.sum(np.square(r_k))), k)
        done = False
        if temp >= MDL:
            if abs(temp - MDL)/abs(MDL) > epsilon_r:
                done = True
        else:
            MDL = temp
            chosen_k = k
            
        if testing:
            print("\n")
            print("Iteration ", k
            print("chosen index: ". i_k)
            print("I_selected: ", I_selected)
            print("I_candidates: ", I_candidates)    
            print("PHI shape: ", PHI.shape)
            print("prev MDL: ", MDL)
            print("MDL: ", temp)
            print("chosen_k: ", chosen_k)
            
            plt.plot(x_train, y_train, label="actual")
            plt.plot(x_train, y_hat, label="pred")
            plt.xlabel("x")
            plt.ylabel("y")
            plt.legend(loc="best")
            plt.show()
        
        if done:
            break
    
    if testing:
        print("\nFinal:")
        print("num fcns selected: ", len(I_selected))
        print("chosen k: ", chosen_k)
        print("\n")

    return I_selected[:chosen_k], weights[:chosen_k]

SyntaxError: '(' was never closed (4204735121.py, line 79)

In [None]:
x_train, x_valid, x_test, y_train, y_valid, y_test = load_dataset('mauna_loa')
x_train = np.vstack([x_valid, x_train])
y_train = np.vstack([y_valid, y_train])

epsilon_r = 0.1

I_selected, weights = greedy_regression_model(x_train, y_train, epsilon_r, testing)
print("num features: ", len(I_selected))
print("num weights: ", len(weights))

In [None]:
PHI = np.empty((x_test.shape[0], len(I_selected)))
for i, fcn in enumerate(I_selected):
    PHI[:,i] = Dict[fcn](x_test).reshape(PHI[:,i].shape)

y_predict = PHI.dot(weights)
RMSE = np.sqrt(np.mean(np.square(y_test-y_predict)))

print("RMSE = ", RMSE)

plt.plot(x_test, y_test, label="actual")
plt.plot(x_test, y_predict, label="pred")
plt.xlabel("x")
plt.ylabel("y")
plt.legend(loc="best")
plt.show()