# 205 Scratch

Paper: *Learned Uncertainty-Aware (LUNA) Bases for Bayesian Regression using Multi-Headed Auxiliary Networks*

In [None]:
from autograd import numpy as np
from autograd import grad
from autograd.misc.optimizers import adam, sgd
from autograd import scipy as sp
import autograd.numpy.random as npr
import pandas as pd
import numpy
import matplotlib.pyplot as plt
import sys

In [None]:
class Feedforward:
    def __init__(self, architecture, random=None, weights=None):
        self.params = {'H': architecture['width'],
                       'L': architecture['hidden_layers'],
                       'D_in': architecture['input_dim'],
                       'D_out': architecture['output_dim'],
                       'activation_type': architecture['activation_fn_type'],
                       'activation_params': architecture['activation_fn_params']}

        self.D = (  (architecture['input_dim'] * architecture['width'] + architecture['width'])
                  + (architecture['output_dim'] * architecture['width'] + architecture['output_dim'])
                  + (architecture['hidden_layers'] - 1) * (architecture['width']**2 + architecture['width'])
                 )

        if random is not None:
            self.random = random
        else:
            self.random = np.random.RandomState(0)

        self.h = architecture['activation_fn']

        if weights is None:
            self.weights = self.random.normal(0, 1, size=(1, self.D))
        else:
            self.weights = weights

        self.objective_trace = np.empty((1, 1))
        self.weight_trace = np.empty((1, self.D))


    def forward(self, weights, x, final_layer_out=False):
        ''' Forward pass given weights and input '''
        H = self.params['H']
        D_in = self.params['D_in']
        D_out = self.params['D_out']

        assert weights.shape[1] == self.D

        if len(x.shape) == 2:
            assert x.shape[0] == D_in
            x = x.reshape((1, D_in, -1))
        else:
            assert x.shape[1] == D_in

        weights = weights.T

        #input to first hidden layer
        W = weights[:H * D_in].T.reshape((-1, H, D_in))
        b = weights[H * D_in:H * D_in + H].T.reshape((-1, H, 1))
        input = self.h(np.matmul(W, x) + b)
        index = H * D_in + H

        assert input.shape[1] == H

        #additional hidden layers
        for _ in range(self.params['L'] - 1):
            before = index
            W = weights[index:index + H * H].T.reshape((-1, H, H))
            index += H * H
            b = weights[index:index + H].T.reshape((-1, H, 1))
            index += H
            output = np.matmul(W, input) + b
            input = self.h(output)

            assert input.shape[1] == H

        #output layer #THIS IS JUST FOR FINAL LAYER, INPUT STORES VALUES OF FINAL ALYER
        W = weights[index:index + H * D_out].T.reshape((-1, D_out, H))
        b = weights[index + H * D_out:].T.reshape((-1, D_out, 1))
        output = np.matmul(W, input) + b
        assert output.shape[1] == self.params['D_out']

        #finallayer
        final_layer = np.array(input, copy=True) #autograd doesn't like np.copy

        if final_layer_out:
            return final_layer
        else:
            return output
    
    def make_objective(self, x_train, y_train, reg_param):

        def objective(W, t):
            squared_error = np.linalg.norm(y_train - self.forward(W, x_train), axis=1)**2
            if reg_param is None:
                sum_error = np.sum(squared_error)
                return sum_error
            else:
                mean_error = np.mean(squared_error) + reg_param * np.linalg.norm(W)
                return mean_error

        return objective, grad(objective)
    
    
    def fit(self, x_train, y_train, params, reg_param=None,):

        assert x_train.shape[0] == self.params['D_in']
        assert y_train.shape[0] == self.params['D_out']

        ### make objective function for training
        self.objective, self.gradient = self.make_objective(x_train, y_train, reg_param)

        ### set up optimization
        step_size = 0.01
        max_iteration = 5000
        check_point = 100
        weights_init = self.weights.reshape((1, -1))
        mass = None
        optimizer = 'adam' # DEFAULT. CHANGE IN PARAMS
        opt_gradient = self.gradient # DEFAULT. CHANGE IN PARAMS
        random_restarts = 5

        if 'step_size' in params.keys():
            step_size = params['step_size']
        if 'max_iteration' in params.keys():
            max_iteration = params['max_iteration']
        if 'check_point' in params.keys():
            self.check_point = params['check_point']
        if 'init' in params.keys():
            weights_init = params['init']
        if 'call_back' in params.keys():
            call_back = params['call_back']
        if 'mass' in params.keys():
            mass = params['mass']
        if 'optimizer' in params.keys():
            optimizer = params['optimizer']
        if  'opt_gradient' in params.keys():
            gradient = params['opt_gradient']
        if 'random_restarts' in params.keys():
            random_restarts = params['random_restarts']

        def call_back(weights, iteration, g):
            ''' Actions per optimization step '''
            objective = self.objective(weights, iteration)
            self.objective_trace = np.vstack((self.objective_trace, objective))
            self.weight_trace = np.vstack((self.weight_trace, weights))
            if iteration % check_point == 0:
                print("Iteration {} lower bound {}; gradient mag: {}".format(iteration, objective, np.linalg.norm(self.gradient(weights, iteration))))

        ### train with random restarts
        optimal_obj = 1e16
        optimal_weights = self.weights
        
        # AM 205: THIS IS THE ONLY PLACE YOU WILL NEED TO MODIFY CODE
        for i in range(random_restarts):

            
            if optimizer == 'adam':
                adam(opt_gradient, weights_init, step_size=step_size, num_iters=max_iteration, callback=call_back)
            
            local_opt = np.min(self.objective_trace[-100:])
            if local_opt < optimal_obj:
                opt_index = np.argmin(self.objective_trace[-100:])
                self.weights = self.weight_trace[-100:][opt_index].reshape((1, -1))
            weights_init = self.random.normal(0, 1, size=(1, self.D))

        self.objective_trace = self.objective_trace[1:]
        self.weight_trace = self.weight_trace[1:]

In [None]:

data = pd.read_csv('HW8_data.csv')
x_train = data['x'].values.reshape((1, -1))
y_train = data['y'].values.reshape((1, -1))
data

In [None]:
###relu activation
activation_fn_type = 'relu'
activation_fn = lambda x: np.maximum(np.zeros(x.shape), x)


###neural network model design choices
width = 5
hidden_layers = 1
input_dim = 1
output_dim = 1

architecture = {'width': width,
               'hidden_layers': hidden_layers,
               'input_dim': input_dim,
               'output_dim': output_dim,
               'activation_fn_type': 'relu',
               'activation_fn_params': 'rate=1',
               'activation_fn': activation_fn}

#set random state to make the experiments replicable
rand_state = 0
random = np.random.RandomState(rand_state)

#instantiate a Feedforward neural network object
nn = Feedforward(architecture, random=random)

In [None]:
###define design choices in gradient descent
params = {'step_size':1e-3, 
          'max_iteration':15000, 
          'random_restarts':1,
          'optimizer':'adam'}

#fit my neural network to minimize MSE on the given data
nn.fit(x_train, y_train, params)

In [None]:
#test x-values
x_test = np.linspace(-8, 8, 100).reshape((1, -1))
print(nn.weights.shape, x_test.shape)
#predict on the test x-values
y_test_pred = nn.forward(nn.weights, x_test)
#visualize the function learned by the neural network
plt.scatter(x_train.flatten(), y_train.flatten(), color='black', label='data')
plt.plot(x_test.flatten(), y_test_pred.flatten(), color='red', label='learned neural network function')
plt.legend(loc='best')
plt.show()

In [None]:
##
from scipy.stats import multivariate_normal

class NLM():

    def __init__(self,X,Y,prior_var,y_var,architecture, random_state):

        self.ff = Feedforward(architecture, random = random_state) #CHANGE
        
        #super().__init__(architecture)
        self.X = X # X training
        self.Y = Y # Y training data
        
        self.prior_var = prior_var
        self.y_var = y_var
        self.w_prior_pdf = lambda w : multivariate_normal(mean = np.zeros(shape = w.shape), cov = prior_variance*np.eye(w.shape[1])).pdf(w)
        self.w_prior_sampler = lambda : np.random.default_rng().multivariate_normal(mean = np.zeros(shape = w.shape), cov = prior_variance*np.eye(w.shape[1]))

    def train(self):
        
        # Fit Weights
        self.ff.fit(self.X, self.Y, params)

        # Transform X with Feature Map for Bayes Reg
        fm_x_matrix = self.ff.forward(self.ff.weights, self.X, final_layer_out=True)
        
        print(fm_x_matrix)
        # Conduct Bayes Reg on Final Layer 
        posterior_samples = self.get_bayes_lr_posterior(self.prior_var,
                                                                self.y_var,
                                                                fm_x_matrix.T, #fm_x_matrix, 
                                                                self.Y.T, 
                                                                x_test_matrix = None, 
                                                                samples=100)


        print("DONE")
        
        

    
    def get_bayes_lr_posterior(self, prior_var, noise_var, x_matrix, y_matrix, x_test_matrix=None, samples=100):
        '''Function to generate posterior predictive samples for Bayesian linear regression model'''
        prior_variance = np.diag(prior_var * np.ones(x_matrix.shape[1])) # make it 2 D
        prior_precision = np.linalg.inv(prior_variance)

        joint_precision = prior_precision + x_matrix.T.dot(x_matrix) / noise_var 
        joint_variance = np.linalg.inv(joint_precision)
        joint_mean = joint_variance.dot(x_matrix.T.dot(y_matrix)) / noise_var

        #sampling 100 points from the posterior
        posterior_samples = np.random.multivariate_normal(joint_mean.flatten(), joint_variance, size=samples)

        #take posterior predictive samples
        if x_test_matrix != None:
            posterior_predictions = np.dot(posterior_samples, x_test_matrix.T) 
            posterior_predictive_samples = posterior_predictions[np.newaxis, :, :] + np.random.normal(0, noise_var**0.5, size=(100, posterior_predictions.shape[0], posterior_predictions.shape[1]))
            posterior_predictive_samples = posterior_predictive_samples.reshape((100 * posterior_predictions.shape[0], posterior_predictions.shape[1]))
            return posterior_predictions, posterior_predictive_samples
        else:
            return posterior_samples

In [None]:

prior_var = 1.0
y_var = 2.0
test_nlm = NLM(x_train, y_train, prior_var,y_var, architecture, random_state = np.random.RandomState(0))

In [None]:
test_nlm.train()