In [3]:
import numpy as np
import matplotlib.pyplot as plt

## Random Features

In [4]:
from sklearn.base import BaseEstimator
from sklearn.exceptions import NotFittedError

# Random Fourier features class
class RFFKernel(BaseEstimator):
    """
    Approximates the feature map of a kernel by Monte Carlo approximation of 
    it's Fourier transform. Implements random Fourier features [1].
        
    Parameters
    ----------
    gamma : float
        Parameter of the Gaussian kernel exp(-gamma * w^2)
        
    D : int
        Number of Monte Carlo samples per feature
    
    [1] Rahimi, A., & Recht, B. (2008). Random features for large-scale kernel machines. 
        In Advances in neural information processing systems (pp. 1177-1184)
    """
    
    def __init__(self,  D=50):
        self.D = D
        self.is_fitted = False 
    
    def fit(self, X):
        """
        Draws D samples for direction w and random offset b
        
        X : Data {array, matrix}, shape (n_samples, n_dimension) 
        
        Returns
        -------
        self : object
            Returns the direction vector w, the offset b and the boolean
            fitted.
        """
        
        dimension = X.shape[1] # dimension of the data
        self.w_direction = np.random.normal(size=(self.D, dimension)) #*np.sqrt(2 * self.gamma)
        self.b_offset = np.random.uniform(0, 2*np.pi, size=(1, self.D))
        self.is_fitted = True
        
        return self
    
    def _transform(self, X):
        """
        Apply the approximate feature map to X.
        
        Parameters
        ----------
        X : Data {array, matrix}, shape (n_samples, n_features)
        
        Returns
        -------
        Z : array of transformed features, shape (n_samples, n_components [D])
        """
        
        Xw = X.dot(self.w_direction.T)
        
        Z = np.sqrt(2 / self.D) * np.cos(Xw + self.b_offset)
        
        return Z
    
    def approx_kernel(self, X, Y=None, gamma=1):
        """
        Computes the kernel gram matrix using the transformed Fourier features
        
        Parameters
        ----------
        X : Data {array, matrix}, shape (n_samples, n_features)
        
        Y : Data {array, matrix}, shape (n_samples, n_features)
        
        gamma: lengthscale {float} 
        
        Returns
        -------
        K : gram matrix (n_samples, n_samples)
        """
        if not self.is_fitted:
            raise NotFittedError('Must call .fit(X) before the kernel can be approximated.')
                
        Zx = self._transform(X) 
        if Y is not None:
            Zy = self._transform(Y)
            K = Zx.dot(Zy.T) * np.sqrt(2 * gamma)
        else:
            K = Zx.dot(Zx.T) * np.sqrt(2 * gamma)      

        return K

### KL Divergence

In [5]:
def kl_divergence(mu, cov):
    """
    Computes the KL divergence between a multivariate normal with mean (mu) and 
    covariance (cov) and a standard multivariate normal N(0,I)
    
    Parameters
    ----------
    mu: mean of the posterior {array}, shape (n_samples + 1)

    S: covariance of the posterior {matrix}, shape (n_samples + 1, n_samples + 1)

    Returns
    -------
    KL : value of KL divergence {float}
    
    """
    n = mu.shape[0]
    kl = 0.5 * (-np.log(np.linalg.det(cov)) - n + np.trace(cov) + np.dot(mu.T,mu))
    return kl

## Bayesian Regression Class

In [6]:
from scipy.optimize import minimize

class BR():
    """
    Bayesian linear regression with radial basis functions where the kernel
    matrix is approximated with random features.
    
    Parameters
    ----------        
    num_rff_samples : int
        Number of Monte Carlo samples per feature
    
    """
    
    def __init__(self, num_rff_samples=2000):
        self.num_rff_samples = num_rff_samples
        self.is_fitted = False 
        
        # Instantiate RFF approximations to the design matrix
        self.kernel = RFFKernel(D=self.num_rff_samples)
        self.post_k_xs = RFFKernel(D=self.num_rff_samples)
        self.pre_k_xs = RFFKernel(D=self.num_rff_samples)
    
    def add_data(self, X, Y):
        """
        Adds new data to the regressor and fits the random features kernel.
        
        Parameters
        ----------        
        X : Data {array, matrix}, shape (n_samples, n_features)
        
        Y : Data {array, matrix}, shape (n_samples, n_features)

        Returns
        -------
        None
        
        """
        self.X = X
        self.Y = Y
        self.kernel.fit(X) 
        self.is_fitted = True
        
    def calculate_posterior(self, gamma=0.5, var_y=0.2, var_w=1):
        """
        Calculates the posterior over the weights given the data and the hyperparameters.
        
        Parameters
        ----------        
        gamma : {float} random feature lengthscale  
        
        var_y: {float} target variance 
        
        var_w: {float} prior weight variance  

        Returns
        -------
        mu: mean of the posterior {array}, shape (n_samples + 1)
        
        S: convariance of the posterior {matrix}, shape (n_samples + 1, n_samples + 1)
        
        phi: design matrix {matrix}, shape (n_samples, n_samples + 1)
        
        """
        self.var_y = var_y
        self.gamma = gamma
        
        if not self.is_fitted:
            raise NotFittedError('Must call BR.fit() with new data first.')
                 
        phi = np.append(np.ones((self.X.shape[0],1)), \
                        self.kernel.approx_kernel(self.X, self.X, gamma=self.gamma), axis=1)# Append ones for bias
        pre_w = 1/var_w * np.eye(len(self.X)+1) # prior covariance matrix
        # Mean and covariance matrix for weights given x and y
        self.S = np.linalg.inv((phi.T).dot(phi) / self.var_y + pre_w) # posterior distribution covariance matrix
        self.mu = self.S.dot(phi.T).dot(self.Y) / self.var_y # MAP weights to use in mean(y*)

        return self.mu, self.S, phi
    
    def optimise(self, maxiter=100, disp=False):
        """
        Optimises the variational upper bound using Nelder-Mead method.
        
        Parameters
        ----------        
        maxiter : {int} maximum number of iterations of miminize 
        
        disp: {bool} set to True to print convergence messages

        Returns
        -------
        gamma: {float} random feature lengthscale 
        
        var_y: {float} target variance 
        
        """
    
        if not self.is_fitted:
            raise NotFittedError('Must call BR.fit() before the kernel can be approximated.')
        
        opts={'maxiter': maxiter, 'disp': disp}
        result = minimize(self._upper_bound, [0, 0], method="Nelder-Mead", options=opts)
        print(result.message)
        return np.exp(result.x)
    
        
    def _upper_bound(self, x0):
        """
        Variational upper bound (VUB): -E[log p(Y|f(X))] + KL(q(w)||p(w))
        where -E[log p(Y|f(X))] is the expectation of the Gaussian likelihood,
        and KL(q(w)||p(w)) is the KL divergence between posterior and prior over
        the weights.
        
        Parameters
        ----------        
        x0 : {array}, shape (2,) (gamma, var_y)

        Returns
        -------
        VUB: {float} value of variational upper bound
        """   

        gamma, var_y = np.exp(x0)
        mu, S, phi = self.calculate_posterior(gamma=gamma, var_y=var_y)
        kl = kl_divergence(mu, S) # KL divergence between posterior and prior on the weights
        N = self.X.shape[0]
        log_p = -(N/2)*np.log(2*np.pi*var_y) - (1/(2*var_y))*((self.Y - phi.dot(mu))**2).mean() #likelihood
     
        return -log_p + kl
            
    def sample_weights(self, num_samples=1):
        """
        Draws a sample of the weights from the posterior over the weights.
        
        Parameters
        ----------        
        num_samples : {int} number of sets of weights to draw
        
        Returns
        -------
        weights : {array, matrix}, shape (n_samples + 1, num_samples)
        
        """
        
        return np.random.multivariate_normal(self.mu.squeeze(), self.S, num_samples)
    
    def post_one_step(self, xs, weights):
        """
        Predict for new values xs given a particular set of weights
        
        Parameters
        ----------        
        xs : {float, array} x position(s) for prediction
        
        weights : {array} set of weights drawn from the posterior 
         
        Returns
        -------
        ys : {float, array} predictions
        
        """
        
        self.post_k_xs.fit(xs)
        phi_xs = np.append(np.ones((len(xs),1)),
                           self.post_k_xs.approx_kernel(xs, self.X, self.gamma), axis=1)# Append ones for bias
        ys = phi_xs.dot(weights.T)
        
        return ys
    
    def pred_one_step(self, xs):
        """
        Predict for new values xs from the predictive distribution.
        
        Parameters
        ----------        
        xs : {float, array} x position(s) for prediction
                 
        Returns
        -------
        mu : {float, array} MAP of predictive, shape (len(xs),)
        
        stdev : {float, array} standard deviation of the predictive, shape (len(xs),) 
        
        """
        self.pre_k_xs.fit(xs)
        pred_phi_xs = np.append(np.ones((len(xs),1)), 
                                self.pre_k_xs.approx_kernel(xs, self.X, self.gamma), axis=1)# Append ones for bias
        mu = pred_phi_xs.dot(self.mu) # calculate mean(y*)
        stdev = (np.sum(pred_phi_xs.dot(self.S) * pred_phi_xs, axis = 1) + self.var_y) ** 0.5 # calculate Var(y*)^0.5
                
        return mu, stdev

## Data

In [11]:
import urllib.request
import os.path
from scipy.io import loadmat
from math import floor

if not os.path.isfile('elevators.mat'):
    print('Downloading \'elevators\' UCI dataset...')
    urllib.request.urlretrieve('https://drive.google.com/uc?export=download&id=1jhWL3YUHvXIaftia4qeAyDwVxo6j1alk', 'elevators.mat')

data = np.array(loadmat('elevators.mat')['data'])
X = data[:, :-1]
X = X - X.min(0)[0]
X = 2 * (X / X.max(0)[0]) - 1
y = data[:, -1]

dataX = X[0:1000,0:10]
datay = y[0:1000]

train_n = int(floor(0.8*len(dataX)))

train_x = dataX[:train_n, :]
train_y = datay[:train_n]

test_x = dataX[train_n:, :]
test_y = datay[train_n:]

## Add data and optimize

In [12]:
br = BR()
br.add_data(train_x, train_y)
br.optimise(maxiter=120,disp=True)

Optimization terminated successfully.
         Current function value: -2665.072824
         Iterations: 91
         Function evaluations: 173
Optimization terminated successfully.


array([9.18458169e-06, 7.25443008e-05])

### Predicts outputs for the test set

In [13]:
mu, stdev = br.pred_one_step(test_x)

In [17]:
print('Test MSE: {}'.format(((mu - test_y)**2).mean()))

Test MSE: 0.045809500546632444
