Import libraries:

In [1]:
import numpy as np
from scipy.stats import mode
from IPython import display
import seaborn as sns
sns.set_style('white')
import matplotlib.pyplot as plt
%matplotlib inline
from warnings import filterwarnings
filterwarnings('ignore')

In [2]:
import theano
import pymc3 as pm
import lasagne
import theano.tensor as T

floatX = theano.config.floatX

In [3]:
import sklearn
from sklearn import datasets
from sklearn.preprocessing import scale
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_moons, make_blobs, make_circles
from sklearn.metrics import accuracy_score

Some global variables:

In [4]:
ADVI_ITERS = 50000
N_SAMPLES = 500

# Bayesian Neural Networks in PyMC3

In [5]:
class PriorWeights(object):
    def __init__(self, mode='W', prior='gauss', **params):
        self.count = 0
        self.prior = prior
        self.mode = mode
        params.setdefault('std', 1.)
        params.setdefault('hyper', None)
        self.params = params
    def __call__(self, shape):
        self.count += 1
        if self.params['hyper'] is None:
            std = self.params['std']
        elif self.params['hyper'] == 'cauchy':
            std = pm.HalfCauchy('hyper_%s%d' % (self.mode, self.count), beta=1.)
        elif self.params['hyper'] == 'normal':
            std = pm.HalfNormal('hyper_%s%d' % (self.mode, self.count), sd=1.)
        elif self.params['hyper'] == 'invgamma':
            std = pm.InverseGamma('hyper_%s%d' % (self.mode, self.count), alpha=1., beta=1.)
        if self.prior == 'gauss':
            return pm.Normal('%s%d' % (self.mode, self.count), mu=0, sd=std, 
                         testval=np.random.normal(size=shape).astype(np.float64),
                         shape=shape)
        elif self.prior == 'laplace': 
            return pm.Laplace('%s%d' % (self.mode, self.count), mu=0, b=std, 
                         testval=np.random.normal(size=shape).astype(np.float64),
                         shape=shape)
        elif self.prior == 'cauchy': 
            return pm.Cauchy('%s%d' % (self.mode, self.count), alpha=1, beta=1., 
                         testval=np.random.normal(size=shape).astype(np.float64),
                         shape=shape)
        elif self.prior == 'flat':
            return pm.Flat('%s%d' % (self.mode, self.count), 
                           testval=np.random.normal(size=shape).astype(np.float64), 
                           shape=shape)

In [6]:
def build_ann(prior_b, prior_W, input_var, target_var, 
              input_shape, params=[5, 5, 2]):
    with pm.Model() as neural_network:
        l_in = lasagne.layers.InputLayer(shape=input_shape,
                                         input_var=input_var)
        n_hid1, n_hid2, n_classes = params
        l_hid1 = lasagne.layers.DenseLayer(
            l_in, num_units=n_hid1,
            nonlinearity=lasagne.nonlinearities.tanh,
            b=prior_b,
            W=prior_W
        )
        l_hid2 = lasagne.layers.DenseLayer(
            l_hid1, num_units=n_hid2,
            nonlinearity=lasagne.nonlinearities.tanh,
            b=prior_b,
            W=prior_W
        )
        l_out = lasagne.layers.DenseLayer(
            l_hid2, num_units=n_classes,
            nonlinearity=lasagne.nonlinearities.softmax,
            b=prior_b,
            W=prior_W
        )

        prediction = lasagne.layers.get_output(l_out)
        out = pm.Categorical('out', prediction, observed=target_var, 
                             total_size=y_train.shape[0])
    
    return neural_network

In [7]:
def create_minibatch(data):
    rng = np.random.RandomState(0)
    
    while True:
        ixs = rng.randint(len(data), size=100)
        yield data[ixs]

In [8]:
def get_prediction(samples_proba):
    return mode(np.argmax(sample_proba(X_test, 500), 
                          axis=-1), 
                axis=0).mode[0]

# Data generation

In [19]:
def plot_data(X, y, filename, visualize=False, save=True):
    fig, ax = plt.subplots(figsize=(8, 8))
    ax.scatter(X[y==0, 0], X[y==0, 1], edgecolors='k', label='Class 0')
    ax.scatter(X[y==1, 0], X[y==1, 1], color='r', edgecolors='k', label='Class 1')
    ax.set_xlabel('Feature 1', fontsize=23)
    ax.set_ylabel('Feature 2', fontsize=23)
    ax.set_title('Data', fontsize=31)
    plt.tight_layout()
    
    if save:
        plt.savefig('../pic/' + filename + '.png')
    
    if not visualize:
        plt.close()

In [10]:
def plot_ppm(grid, ppc, filename, visualize=False):
    cmap = sns.diverging_palette(250, 12, s=85, l=25, as_cmap=True)
    fig, ax = plt.subplots(figsize=(10, 8))
    contour = ax.contourf(grid[0], grid[1], ppc[:, :, 1].mean(axis=0).reshape(100, 100), cmap=cmap)
    ax.scatter(X_test[y_pred==0, 0], X_test[y_pred==0, 1], edgecolors='k')
    ax.scatter(X_test[y_pred==1, 0], X_test[y_pred==1, 1], edgecolors='k', color='r')
    cbar = plt.colorbar(contour, ax=ax)
    _ = ax.set(xlim=(-3, 3), ylim=(-3, 3))
    ax.set_xlabel('Feature 1', fontsize=23)
    ax.set_ylabel('Feature 2', fontsize=23)
    ax.set_title('Posterior probability', fontsize=31);
    plt.tight_layout()
    plt.savefig('../pic/' + filename + '.png')
    
    if not visualize:
        plt.close()

In [11]:
def plot_uncertainty(grid, ppc, filename, visualize=False):
    cmap = sns.cubehelix_palette(light=1, as_cmap=True)
    fig, ax = plt.subplots(figsize=(10, 8))
    contour = ax.contourf(grid[0], grid[1], ppc[:, :, 1].std(axis=0).reshape(100, 100), cmap=cmap)
    ax.scatter(X_test[y_pred==0, 0], X_test[y_pred==0, 1], edgecolors='k')
    ax.scatter(X_test[y_pred==1, 0], X_test[y_pred==1, 1], edgecolors='k', color='r')
    cbar = plt.colorbar(contour, ax=ax)
    _ = ax.set(xlim=(-3, 3), ylim=(-3, 3));
    ax.set_xlabel('Feature 1', fontsize=23)
    ax.set_ylabel('Feature 2', fontsize=23)
    ax.set_title('Uncertainty', fontsize=31);
    plt.tight_layout()
    plt.savefig('../pic/' + filename + '.png')
    
    if not visualize:
        plt.close()

## Experiment design

In [12]:
design = [
    ['flat', None, 'flat', None], 
    ['flat', None, 'cauchy', None], 
    ['flat', None, 'laplace', None],
    ['flat', None, 'gauss', None],
    ['flat', None, 'gauss', 'cauchy'],
    ['flat', None, 'gauss', 'invgamma']
]

## Moons 

In [58]:
X, y = make_moons(noise=0.4, random_state=0, n_samples=1200)
X = scale(X)
X = X.astype(floatX)
y = y.astype(floatX)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.7)
input_shape = list(X_train.shape)
input_shape[0] = None
input_shape = tuple(input_shape)
data_name = 'moons'
plot_data(X, y, data_name + '/' + data_name + '_data')

In [59]:
for params in design:
    b_prior, b_hyper, w_prior, w_hyper = params
    minibatch_X = pm.generator(create_minibatch(X_train))
    minibatch_y = pm.generator(create_minibatch(y_train))
    neural_network_minibatch = build_ann(PriorWeights(mode='b', prior=b_prior, hyper=b_hyper), 
                                         PriorWeights(mode='W', prior=w_prior, hyper=w_hyper),
                                         minibatch_X, minibatch_y, 
                                         input_shape, params=[5, 5, 2])
    with neural_network_minibatch:
        inference = pm.ADVI()
        approx = pm.fit(ADVI_ITERS, method=inference)

    x = T.matrix('X')
    n = T.iscalar('n')
    theano.config.compute_test_value = 'off'
    _sample_proba = approx.sample_node(neural_network_minibatch.out.distribution.p, 
                                       size=n,
                                       more_replacements={minibatch_X:x})
    sample_proba = theano.function([x, n], _sample_proba)
    y_pred = get_prediction(sample_proba(X_test, N_SAMPLES))
    accuracy = accuracy_score(y_test, y_pred)
    grid = np.mgrid[-3:3:100j,-3:3:100j].astype(floatX)
    grid_2d = grid.reshape(2, -1).T
    dummy_out = np.ones(grid.shape[1], dtype=np.int8)
    ppc = sample_proba(grid_2d, N_SAMPLES)
    if b_hyper == None:
        b_hyper_ = 'none'
    else:
        b_hyper_ = b_hyper
    if w_hyper == None:
        w_hyper_ = 'none'
    else:
        w_hyper_ = w_hyper
    plot_ppm(grid, ppc, data_name + '/ppm_' + '_'.join([data_name,
                                                        'b', b_hyper_, b_prior,
                                                        'w', w_hyper_, w_prior,
                                                        'acc=%.3f' % accuracy]))
    plot_uncertainty(grid, ppc, data_name + '/uncertainty_' + '_'.join([data_name,
                                                                        'b', b_hyper_, b_prior,
                                                                        'w', w_hyper_, w_prior,
                                                                        'acc=%.3f' % accuracy]))
    display.clear_output(wait=True)

Average Loss = 145.17: 100%|███████████████████████████████████████████████████| 50000/50000 [00:19<00:00, 2525.18it/s]
Finished [100%]: Average Loss = 145.25


## Circles

In [73]:
X, y = make_circles(noise=0.333, random_state=0, n_samples=1000, factor=0.2)
X = scale(X)
X = X.astype(floatX)
y = y.astype(floatX)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.7)
input_shape = list(X_train.shape)
input_shape[0] = None
input_shape = tuple(input_shape)
data_name = 'circles'
plot_data(X, y, data_name + '/' + data_name + '_data')

In [74]:
for params in design:
    b_prior, b_hyper, w_prior, w_hyper = params
    minibatch_X = pm.generator(create_minibatch(X_train))
    minibatch_y = pm.generator(create_minibatch(y_train))
    neural_network_minibatch = build_ann(PriorWeights(mode='b', prior=b_prior, hyper=b_hyper), 
                                         PriorWeights(mode='W', prior=w_prior, hyper=w_hyper),
                                         minibatch_X, minibatch_y, 
                                         input_shape, params=[5, 5, 2])
    with neural_network_minibatch:
        inference = pm.ADVI()
        approx = pm.fit(ADVI_ITERS, method=inference)

    x = T.matrix('X')
    n = T.iscalar('n')
    theano.config.compute_test_value = 'off'
    _sample_proba = approx.sample_node(neural_network_minibatch.out.distribution.p, 
                                       size=n,
                                       more_replacements={minibatch_X:x})
    sample_proba = theano.function([x, n], _sample_proba)
    y_pred = get_prediction(sample_proba(X_test, N_SAMPLES))
    accuracy = accuracy_score(y_test, y_pred)
    grid = np.mgrid[-3:3:100j,-3:3:100j].astype(floatX)
    grid_2d = grid.reshape(2, -1).T
    dummy_out = np.ones(grid.shape[1], dtype=np.int8)
    ppc = sample_proba(grid_2d, N_SAMPLES)
    if b_hyper == None:
        b_hyper_ = 'none'
    else:
        b_hyper_ = b_hyper
    if w_hyper == None:
        w_hyper_ = 'none'
    else:
        w_hyper_ = w_hyper
    plot_ppm(grid, ppc, data_name + '/ppm_' + '_'.join([data_name,
                                                        'b', b_hyper_, b_prior,
                                                        'w', w_hyper_, w_prior,
                                                        'acc=%.3f' % accuracy]))
    plot_uncertainty(grid, ppc, data_name + '/uncertainty_' + '_'.join([data_name,
                                                                        'b', b_hyper_, b_prior,
                                                                        'w', w_hyper_, w_prior,
                                                                        'acc=%.3f' % accuracy]))
    display.clear_output(wait=True)

Average Loss = 148.31: 100%|███████████████████████████████████████████████████| 50000/50000 [00:21<00:00, 2358.97it/s]
Finished [100%]: Average Loss = 148.26


## Blobs

In [13]:
X, y = make_blobs(cluster_std=1.9, random_state=0, centers=2, n_samples=1000)
X = scale(X)
X = X.astype(floatX)
y = y.astype(floatX)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.7)
input_shape = list(X_train.shape)
input_shape[0] = None
input_shape = tuple(input_shape)
data_name = 'blobs'
plot_data(X, y, data_name + '/' + data_name + '_data')

In [None]:
for params in design:
    b_prior, b_hyper, w_prior, w_hyper = params
    minibatch_X = pm.generator(create_minibatch(X_train))
    minibatch_y = pm.generator(create_minibatch(y_train))
    neural_network_minibatch = build_ann(PriorWeights(mode='b', prior=b_prior, hyper=b_hyper), 
                                         PriorWeights(mode='W', prior=w_prior, hyper=w_hyper),
                                         minibatch_X, minibatch_y, 
                                         input_shape, params=[5, 5, 2])
    with neural_network_minibatch:
        inference = pm.ADVI()
        approx = pm.fit(ADVI_ITERS, method=inference)

    x = T.matrix('X')
    n = T.iscalar('n')
    theano.config.compute_test_value = 'off'
    _sample_proba = approx.sample_node(neural_network_minibatch.out.distribution.p, 
                                       size=n,
                                       more_replacements={minibatch_X:x})
    sample_proba = theano.function([x, n], _sample_proba)
    y_pred = get_prediction(sample_proba(X_test, N_SAMPLES))
    accuracy = accuracy_score(y_test, y_pred)
    grid = np.mgrid[-3:3:100j,-3:3:100j].astype(floatX)
    grid_2d = grid.reshape(2, -1).T
    dummy_out = np.ones(grid.shape[1], dtype=np.int8)
    ppc = sample_proba(grid_2d, N_SAMPLES)
    if b_hyper == None:
        b_hyper_ = 'none'
    else:
        b_hyper_ = b_hyper
    if w_hyper == None:
        w_hyper_ = 'none'
    else:
        w_hyper_ = w_hyper
    plot_ppm(grid, ppc, data_name + '/ppm_' + '_'.join([data_name,
                                                        'b', b_hyper_, b_prior,
                                                        'w', w_hyper_, w_prior,
                                                        'acc=%.3f' % accuracy]))
    plot_uncertainty(grid, ppc, data_name + '/uncertainty_' + '_'.join([data_name,
                                                                        'b', b_hyper_, b_prior,
                                                                        'w', w_hyper_, w_prior,
                                                                        'acc=%.3f' % accuracy]))
    display.clear_output(wait=True)

## Complex

In [83]:
size = 6000
X1, y1 = make_circles(noise=0.1, random_state=0, n_samples=3*size//4, factor=1.)
X2, y2 = make_circles(noise=0.1, random_state=0, n_samples=size, factor=0.6)
X3, y3 = make_circles(noise=0.1, random_state=0, n_samples=size//4, factor=0.2)

X = np.concatenate((X1[y1==1], X2[y2==1], X3[y3==1]), axis=0)
y = np.concatenate((np.ones(X1[y1==1].shape[0]), 
                    np.zeros(X2[y2==1].shape[0]), 
                    np.ones(X3[y3==1].shape[0])))
X = scale(X)
X = X.astype(floatX)
y = y.astype(floatX)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.7)
input_shape = list(X_train.shape)
input_shape[0] = None
input_shape = tuple(input_shape)
data_name = '3circles'
plot_data(X, y, data_name + '/' + data_name + '_data')

In [84]:
for params in design:
    b_prior, b_hyper, w_prior, w_hyper = params
    minibatch_X = pm.generator(create_minibatch(X_train))
    minibatch_y = pm.generator(create_minibatch(y_train))
    neural_network_minibatch = build_ann(PriorWeights(mode='b', prior=b_prior, hyper=b_hyper), 
                                         PriorWeights(mode='W', prior=w_prior, hyper=w_hyper),
                                         minibatch_X, minibatch_y, 
                                         input_shape, params=[15, 15, 2])
    with neural_network_minibatch:
        inference = pm.ADVI()
        approx = pm.fit(ADVI_ITERS, method=inference)

    x = T.matrix('X')
    n = T.iscalar('n')
    theano.config.compute_test_value = 'off'
    _sample_proba = approx.sample_node(neural_network_minibatch.out.distribution.p, 
                                       size=n,
                                       more_replacements={minibatch_X:x})
    sample_proba = theano.function([x, n], _sample_proba)
    y_pred = get_prediction(sample_proba(X_test, N_SAMPLES))
    accuracy = accuracy_score(y_test, y_pred)
    grid = np.mgrid[-3:3:100j,-3:3:100j].astype(floatX)
    grid_2d = grid.reshape(2, -1).T
    dummy_out = np.ones(grid.shape[1], dtype=np.int8)
    ppc = sample_proba(grid_2d, N_SAMPLES)
    if b_hyper == None:
        b_hyper_ = 'none'
    else:
        b_hyper_ = b_hyper
    if w_hyper == None:
        w_hyper_ = 'none'
    else:
        w_hyper_ = w_hyper
    plot_ppm(grid, ppc, data_name + '/ppm_' + '_'.join([data_name,
                                                        'b', b_hyper_, b_prior,
                                                        'w', w_hyper_, w_prior,
                                                        'acc=%.3f' % accuracy]))
    plot_uncertainty(grid, ppc, data_name + '/uncertainty_' + '_'.join([data_name,
                                                                        'b', b_hyper_, b_prior,
                                                                        'w', w_hyper_, w_prior,
                                                                        'acc=%.3f' % accuracy]))
    display.clear_output(wait=True)

Average Loss = 864.38: 100%|███████████████████████████████████████████████████| 50000/50000 [00:35<00:00, 1408.45it/s]
Finished [100%]: Average Loss = 864.34
