# Classifications

In [1]:
import math
import time
import os
import csv
import numpy as np
import random
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_probability as tfp
from tensorflow_probability import distributions as tfd
from tensorflow_probability import edward2 as ed
from dbnn.dbnn import DBNN
from dbnn.och import OCH

In [2]:
# Switch between the compact on-screen preset (False) and the large preset
# intended for exported figures (True).
savefig = False

# Font settings shared by both presets.
plt.rcParams.update({
    "font.family": "serif",
    "mathtext.fontset": "dejavuserif",
})

if savefig:
    # Large preset: bigger canvas, fonts and line widths.
    plt.rcParams.update({
        "figure.figsize": (8, 8),
        "font.size": 30,
        "axes.labelsize": 53,
        "xtick.labelsize": 40,
        "ytick.labelsize": 40,
        "legend.fontsize": 28,
        "lines.linewidth": 4,
    })
else:
    # Compact preset; the only one that also sets the figure title size.
    plt.rcParams.update({
        "figure.figsize": (4, 4),
        "font.size": 15,
        "figure.titlesize": 25,
        "axes.labelsize": 20,
        "xtick.labelsize": 15,
        "ytick.labelsize": 15,
        "legend.fontsize": 13,
        "lines.linewidth": 2,
    })

Constants and hyperparameters:

In [3]:
# paths
CLASSIFICATION_FLAG = "classifications"
DATASET_PATH = 'datasets/'
# Prefix of the BNN weight checkpoints; the dataset flag is appended as "-<flag>".
BNN_PATH = 'models_checkpoints/bnn'
# Directory that receives the per-sample result CSV files.
DAT_PATH = "leaderboard/%s/" % CLASSIFICATION_FLAG

# experiments
seed = 30
tf.random.set_seed(seed)
np.random.seed(seed)
# `freeze` picks stored posterior draws with the stdlib `random` module, so it
# has to be seeded as well for the experiments to be reproducible.
random.seed(seed)
# Number of posterior weight draws recorded by `freeze`.
POSTERIOR_NO = 30

# och parameters (keyword arguments forwarded to the OCH constructors)
och_x1_params = {'k': 5, 'l': 5.0, 's': 1.0}
och_x_params = {'k': 10, 'l': 0.01, 's': 1.0}
och_y_params = {'k': 10, 'l': 0.01, 's': 1.0}

# style (one entry per compared method, in the order given by `labels`)
alpha = 0.12
colors = ["tab:blue", "tab:green", "tab:purple", "tab:red"]
labels = ["DNN", "MU", "DU", "DBNN"]
guide_linestyle=(0, (1, 1))
linestyles = [(0, (5, 1)), 'solid', (0, (5, 1)), 'solid']

In [4]:
def bnn_normal(model, xs, regularizer=None):
    """Build a Normal predictive distribution from the model output.

    The last axis of ``model(xs)`` is unstacked; column 0 becomes the mean and
    column 1, passed through softplus, becomes the scale.

    Args:
        model: callable returning a tensor whose last axis has at least two entries.
        xs: model input.
        regularizer: optional lower bound on the scale. The scale logit is
            clamped at the inverse softplus of this value, so the resulting
            scale never drops below it.

    Returns:
        A ``tfd.Normal`` whose ``loc`` and ``scale`` keep a trailing axis of size 1.
    """
    columns = [tf.expand_dims(column, axis=-1) for column in tf.unstack(model(xs), axis=-1)]
    if regularizer is not None:
        # log(exp(r) - 1) is the inverse of softplus evaluated at r.
        floor = math.log(math.exp(regularizer) - 1.)
        columns[1] = tf.math.maximum(columns[1], floor)
    mean, scale_logit = columns[0], columns[1]
    return tfd.Normal(loc=mean, scale=tf.math.softplus(scale_logit))

def bnn_categorical(model, xs):
    """Return a ``tfd.Categorical`` whose logits are the output of ``model(xs)``."""
    return tfd.Categorical(logits=model(xs))

In [5]:
def train_epoch(model, x_train, y_train, batch, optimizer, loss_ftn, *loss_metrics):
    """Run one pass over the training data in shuffled mini-batches.

    Args:
        model: model handed to ``train_step``.
        x_train, y_train: indexable sequences of per-sample tensors.
        batch: nominal batch size; the shuffled indices are cut into
            ``len(x_train) / batch + 1`` nearly equal parts.
        optimizer: optimizer handed to ``train_step``.
        loss_ftn: callable ``(model, xs, ys) -> tuple of losses``.
        *loss_metrics: metrics updated by ``train_step``, one per returned loss.
    """
    sample_no = len(x_train)
    shuffled = np.random.permutation(sample_no)
    chunks = np.array_split(shuffled, sample_no / batch + 1)

    for chunk in chunks:
        # array_split may produce empty parts; those are skipped.
        if len(chunk) == 0:
            continue
        x_batch = tf.stack([x_train[i] for i in chunk])
        y_batch = tf.stack([y_train[i] for i in chunk])
        train_step(model, x_batch, y_batch, optimizer, loss_ftn, *loss_metrics)
    
def train_step(model, x_batch, y_batch, optimizer, loss_ftn, *loss_metrics):
    """Apply one gradient update and record the resulting losses.

    Args:
        model: model exposing ``trainable_variables``.
        x_batch, y_batch: batched inputs and targets.
        optimizer: object exposing ``apply_gradients``.
        loss_ftn: callable ``(model, xs, ys) -> sequence of losses``; the first
            entry is the one that is differentiated.
        *loss_metrics: callables paired positionally with the returned losses.
    """
    with tf.GradientTape() as tape:
        loss_values = loss_ftn(model, x_batch, y_batch)

    # The variables are read only after the forward pass, as in the original.
    variables = model.trainable_variables
    grads = tape.gradient(loss_values[0], variables)
    optimizer.apply_gradients(zip(grads, model.trainable_variables))

    for value, metric in zip(loss_values, loss_metrics):
        metric(value)
        
def bnn_normal_loss_ftn(model, xs, ys, length, regularizer=None):
    """Loss for the Normal-output BNN: mean negative log-likelihood plus scaled KL.

    Args:
        model: model whose ``losses`` are summed as the KL term.
        xs, ys: batched inputs and targets.
        length: divisor applied to the summed ``model.losses``.
        regularizer: forwarded to ``bnn_normal``.

    Returns:
        Tuple ``(loss, nll)`` where ``loss = nll + kl``.
    """
    predictive = bnn_normal(model, xs, regularizer)
    nll = - tf.reduce_mean(predictive.log_prob(ys))
    kl = sum(model.losses) / length
    return nll + kl, nll

def bnn_categorical_loss_ftn(model, xs, ys, length):
    """Loss for the categorical BNN: mean negative log-likelihood plus scaled KL.

    Args:
        model: model whose ``losses`` are summed as the KL term.
        xs: batched inputs.
        ys: batched class labels with a trailing axis of size 1 (squeezed here).
        length: divisor applied to the summed ``model.losses``.

    Returns:
        Tuple ``(loss, nll)`` where ``loss = nll + kl``.
    """
    targets = tf.squeeze(ys, axis=-1)
    predictive = bnn_categorical(model, xs)
    nll = - tf.reduce_mean(predictive.log_prob(targets))
    kl = sum(model.losses) / length
    return nll + kl, nll

In [6]:
def test_step(model, xs, ys, sampling, runtime_metric, *loss_metrics):
    """Evaluate one batch and feed every metric, timing the whole step.

    Args:
        model: object handed to ``sampling``.
        xs, ys: batch inputs and targets.
        sampling: callable ``(model, xs) -> samples``.
        runtime_metric: callable receiving the elapsed wall-clock seconds.
        *loss_metrics: ``(loss_ftn, metric)`` pairs. ``loss_ftn(samples, ys)``
            yields a value; ``None`` values are dropped, list metrics get the
            value appended, any other metric is called with it.
    """
    started = time.time()
    samples = sampling(model, xs)
    for loss_ftn, metric in loss_metrics:
        value = loss_ftn(samples, ys)
        if value is None:
            continue
        if isinstance(metric, list):
            metric.append(value)
        else:
            metric(value)
    finished = time.time()
    runtime_metric(finished - started)
        
def test(model, x_test, y_test, batch, sampling, runtime_metric, *loss_metrics):
    """Evaluate the model over the test set in order, printing progress.

    Args:
        model: object handed to ``sampling`` through ``test_step``.
        x_test, y_test: indexable sequences of per-sample tensors.
        batch: nominal batch size; indices are cut into
            ``len(x_test) / batch + 1`` nearly equal consecutive parts.
        sampling: callable ``(model, xs) -> samples``.
        runtime_metric: callable receiving the per-step runtime in seconds.
        *loss_metrics: ``(loss_ftn, metric)`` pairs, see ``test_step``.
    """
    indexes_batch = np.array_split(range(len(x_test)), len(x_test) / batch + 1)
    # array_split may produce empty parts; those are dropped.
    indexes_batch = [indexes for indexes in indexes_batch if len(indexes) > 0]
    time1 = time.time()
    for i, indexes in enumerate(indexes_batch):
        xs = tf.stack([x_test[index] for index in indexes])
        ys = tf.stack([y_test[index] for index in indexes])
        test_step(model, xs, ys, sampling, runtime_metric, *loss_metrics)

        # Compare by value: `is 0` tests object identity, which only happens to
        # work through integer caching and raises a SyntaxWarning on Python 3.8+.
        if i % 100 == 0:
            # Only Keras metrics expose a running result; list accumulators are
            # reported with a placeholder.
            metrics_str = ", ".join([str(metric.result().numpy()) if isinstance(metric, tf.keras.metrics.Metric) else "non-tf.keras.metrics" for _, metric in loss_metrics])
            print("(%.1f sec) %d th iteration: %s" % (time.time() - time1, i, metrics_str))
            time1 = time.time()

def correct_from_samples(sws, ys):
    """Compare the labels with the arg-max class of the weighted sample sum.

    Args:
        sws: sequence of ``(sample, weight)`` pairs.
        ys: class labels compared against the int32 arg-max over the last axis.

    Returns:
        The element-wise result of ``ys == prediction``, or ``None`` when
        ``sws`` is empty.
    """
    if not sws:
        return None
    mixture = sum([s * w for s, w in sws])
    prediction = tf.math.argmax(mixture, axis=-1, output_type=tf.int32)
    return ys == prediction
            
def confidence_from_samples(sws):
    """Return the maximum over the last axis of the weighted sample sum.

    Args:
        sws: sequence of ``(sample, weight)`` pairs.

    Returns:
        The reduced maximum, or ``None`` when ``sws`` is empty.
    """
    if not sws:
        return None
    mixture = sum([s * w for s, w in sws])
    return tf.math.reduce_max(mixture, axis=-1)

def msec_from_samples(ys, classes_no, sws, threshold=0.0):
    """Mean squared error between one-hot labels and the weighted sample sum.

    The error is only reported when the prediction confidence (see
    ``confidence_from_samples``) reaches ``threshold``.

    Args:
        ys: integer class labels, one-hot encoded with depth ``classes_no``.
        classes_no: number of classes.
        sws: sequence of ``(sample, weight)`` pairs.
        threshold: minimum confidence required to report an error.

    Returns:
        The MSE tensor, or ``None`` when ``sws`` is empty / ``None`` or the
        confidence is below ``threshold``.
    """
    # Truthiness check instead of `len(sws) is not 0`, which compared an int by
    # identity. Checking first also avoids computing a confidence for no samples.
    if not sws:
        return None

    confidence = confidence_from_samples(sws)
    # NOTE(review): the confidence is used as a plain boolean here, which
    # presumably requires a single-element batch -- confirm against callers.
    if confidence >= threshold:
        targets = tf.one_hot(ys, classes_no)
        return tf.keras.losses.MSE(targets, sum([s * w for s, w in sws]))
    return None

def mse_from_samples(ys, sws):
    """Mean squared error between ``ys`` and the weighted sample sum.

    Args:
        ys: targets, compared as-is (no one-hot encoding).
        sws: sequence of ``(sample, weight)`` pairs.

    Returns:
        The MSE tensor, or ``None`` when ``sws`` is empty.
    """
    if not sws:
        return None
    mixture = sum([s * w for s, w in sws])
    return tf.keras.losses.MSE(ys, mixture)

# Shared sparse categorical cross-entropy loss object, reused by cce_from_samples.
cce_object = tf.keras.losses.SparseCategoricalCrossentropy()

def cce_from_samples(ys, sws):
    """Sparse categorical cross-entropy of the weighted sample sum against ``ys``.

    Args:
        ys: integer class labels.
        sws: sequence of ``(sample, weight)`` pairs.

    Returns:
        The value produced by the module-level ``cce_object``, or ``None`` when
        ``sws`` is empty.
    """
    if not sws:
        return None
    mixture = sum([s * w for s, w in sws])
    return cce_object(ys, mixture)

In [7]:
# Layer names are fixed so that `freeze` can look the layers up by name.
LAYER_NAMES = DENSE_1, DENSE_2, DENSE_3 = "dense_1", "dense_2", "dense_3"

def create_bnn(classes_no, unit=50):
    """Build a three-layer Flipout network.

    Args:
        classes_no: number of units of the output layer (no activation).
        unit: number of units of each of the two ReLU hidden layers.

    Returns:
        A ``tf.keras.Sequential`` with layers named DENSE_1, DENSE_2, DENSE_3.
    """
    hidden_1 = tfp.layers.DenseFlipout(unit, activation=tf.nn.relu, name=DENSE_1)
    hidden_2 = tfp.layers.DenseFlipout(unit, activation=tf.nn.relu, name=DENSE_2)
    output = tfp.layers.DenseFlipout(classes_no, name=DENSE_3)
    return tf.keras.Sequential([hidden_1, hidden_2, output])

def freeze(bnn, input_dim, posterior_no):
    """Replace posterior sampling by replay from a fixed pool of recorded draws.

    First, every layer's kernel/bias tensor function is swapped for one that
    samples the distribution it receives and stores the draw. The network is
    then called ``posterior_no`` times on random normal inputs of shape
    ``(1, input_dim)`` to fill the pools. Finally the tensor functions are
    swapped for ones that return a randomly chosen stored draw.

    Args:
        bnn: model containing layers named DENSE_1, DENSE_2 and DENSE_3.
        input_dim: size of the last axis of the dummy inputs.
        posterior_no: number of recording passes; also the index range used
            when replaying (assumes every pool ends up holding at least
            ``posterior_no`` draws -- TODO confirm).
    """
    layers = [bnn.get_layer(name) for name in (DENSE_1, DENSE_2, DENSE_3)]
    kernel_pools = [[] for _ in layers]
    bias_pools = [[] for _ in layers]

    def recorder(pool):
        # Sample the distribution, remember the draw and pass it on.
        return lambda d: append(pool, d.sample())

    def replayer(pool):
        # Ignore the distribution and return one of the recorded draws.
        return lambda d: pool[random.randint(0, posterior_no - 1)]

    for layer, kernel_pool, bias_pool in zip(layers, kernel_pools, bias_pools):
        layer.kernel_posterior_tensor_fn = recorder(kernel_pool)
        layer.bias_posterior_tensor_fn = recorder(bias_pool)

    # Forward passes are run only for their recording side effect.
    for _ in range(posterior_no):
        bnn(tf.random.normal((1, input_dim)))

    for layer, kernel_pool, bias_pool in zip(layers, kernel_pools, bias_pools):
        layer.kernel_posterior_tensor_fn = replayer(kernel_pool)
        layer.bias_posterior_tensor_fn = replayer(bias_pool)
    
def append(xs, x):
    """Append ``x`` to ``xs`` and return ``x``, so the call can be used inside an expression."""
    xs.append(x)
    return x

Table of Contents
* [Occupancy Detection Data Set (Classification)](#Occupancy-Detection-Data-Set-(Classification))
* [EMG Data for Gestures Data Set (Classification)](#EMG-Data-for-Gestures-Data-Set-(Classification))
* [Localization Data for Person Activity Data Set (Classification)](#Localization-Data-for-Person-Activity-Data-Set-(Classification))

## [Occupancy Detection Data Set](https://archive.ics.uci.edu/ml/datasets/Occupancy+Detection+) (Classification)

In [8]:
# Dataset flag: used as the dataset sub-directory name and as the checkpoint suffix.
OCC_FLAG = "occupancy"
# Number of input features per sample (five columns are read by occupancy_data).
OCC_INPUT_DIM = 5
# Number of target classes.
OCC_CLASS_NO = 2

In [9]:
train_file_names, test_file_names = ["datatraining.txt"], ["datatest.txt", "datatest2.txt"]

def occupancy_data(file_name):
    """Read one occupancy CSV file into per-row feature and label tensors.

    Args:
        file_name: file name inside the ``DATASET_PATH/OCC_FLAG/`` directory.

    Returns:
        Tuple ``(x, y)`` of lists: ``x`` holds one float tensor per row built
        from columns 2..6, ``y`` holds one single-element integer tensor per
        row built from column 7. The first line of the file is skipped.
    """
    features, targets = [], []
    full_path = DATASET_PATH + "%s/" % OCC_FLAG + file_name
    with open(full_path, 'r') as csvfile:
        rows = csv.reader(csvfile, delimiter=',')
        next(rows)  # discard the header line
        for row in rows:
            values = [float(v) for v in row[2: 7]]
            features.append(tf.constant(values))
            targets.append(tf.constant([int(row[7])]))
    return features, targets

In [10]:
# Training data comes from the single training file.
x_train, y_train = occupancy_data(train_file_names[0])
# Load every test file, regroup the (x, y) pairs into a tuple of x lists and a
# tuple of y lists, then concatenate them along the first axis.
x_test, y_test = tuple(zip(*[occupancy_data(test_file_name) for test_file_name in test_file_names]))
x_test, y_test = tf.concat(x_test, 0), tf.concat(y_test, 0)

print("Train dataset: %d, Testset size: %d" % (len(x_train), len(x_test)))
print("x sample: ", x_train[0].numpy())
print("y sample: ", y_train[0].numpy())

Train dataset: 8143, Testset size: 12417
x sample:  [2.3180000e+01 2.7271999e+01 4.2600000e+02 7.2125000e+02 4.7929883e-03]
y sample:  [1]


### Train Models

In [11]:
# Train the occupancy BNN for `epochs` passes with mini-batches of size `batch`.
epochs, batch = 30, 3

bnn = create_bnn(OCC_CLASS_NO)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
# The KL term is divided by the training-set size.
loss_ftn = lambda model, xs, ys: bnn_categorical_loss_ftn(model, xs, ys, len(x_train))
# Running means of the total loss and of the NLL part; reset after each report.
train_loss = tf.keras.metrics.Mean(name='train_loss')
nll_loss = tf.keras.metrics.Mean(name='nll_loss')

for epoch in range(epochs):
    time1 = time.time()
    train_epoch(bnn, x_train, y_train, batch, optimizer, loss_ftn, train_loss, nll_loss)
    time2 = time.time()
        
    # Reporting interval of 1: this condition holds for every epoch.
    if (epoch + 1) % 1 == 0:
        template = '({} sec) Epoch {}, Loss: {}, NLL: {}'
        print(template.format(time2 - time1,
                              epoch + 1,
                              train_loss.result(),
                              nll_loss.result()))
        train_loss.reset_states()
        nll_loss.reset_states()
        
        
# Store the trained weights under "<BNN_PATH>-<OCC_FLAG>".
bnn.save_weights("%s-%s" % (BNN_PATH, OCC_FLAG), save_format='tf')

Instructions for updating:
Please use `layer.add_weight` method instead.


KeyboardInterrupt: 

### Test Models

Test MU:

In [12]:
# Evaluate one input at a time; every prediction combines `sample_no` draws.
batch, sample_no = 1, 30

bnn = create_bnn(OCC_CLASS_NO)
bnn.load_weights("%s-%s" % (BNN_PATH, OCC_FLAG))

# Each draw is a one-hot encoded class sampled from a fresh forward pass,
# paired with the uniform weight 1 / sample_no.
sampling = lambda model, xs: [(tf.one_hot(bnn_categorical(model, xs).sample(), OCC_CLASS_NO), 1 / sample_no) for _ in range(sample_no)]
msec_ftn = lambda sws, ys: msec_from_samples(ys, OCC_CLASS_NO, sws)
# Same error, but only reported when the confidence reaches 0.9.
msec_90_ftn = lambda sws, ys: msec_from_samples(ys, OCC_CLASS_NO, sws, 0.9)
cce_ftn = lambda sws, ys: cce_from_samples(ys, sws)
confidence_ftn = lambda sws, ys: confidence_from_samples(sws)
correct_ftn = lambda sws, ys: correct_from_samples(sws, ys).numpy()
estimation_ftn = lambda sws, ys: tf.math.argmax(sum([s * w for s, w in sws]), axis=-1).numpy()

runtime_metric = tf.keras.metrics.Mean(name='runtime')
mse_metric = tf.keras.metrics.Mean(name='mse')
mse_90_metric = tf.keras.metrics.Mean(name='mse-90')
cce_metric = tf.keras.metrics.Mean(name='cce')
# Plain lists collect the raw per-batch values instead of a running mean.
confidence_acc = []
correct_acc = []
estimation_acc = []

loss_metrics = (msec_ftn, mse_metric), (msec_90_ftn, mse_90_metric), (cce_ftn, cce_metric), (confidence_ftn, confidence_acc), (correct_ftn, correct_acc), (estimation_ftn, estimation_acc)

test(bnn, x_test, y_test, batch, sampling, runtime_metric, *loss_metrics)

# One placeholder per argument: the Coverage-90 value used to be passed to
# format() without a placeholder and was therefore silently dropped.
template = 'MU Runtime: {} (ms), MSE: {}, MSE-90: {}, NLL: {}, Confidence: {}, Coverage-90: {}'
print(template.format(runtime_metric.result() * 1000, 
                      mse_metric.result(),
                      mse_90_metric.result(),
                      cce_metric.result(),
                      np.mean(confidence_acc),
                      # fraction of test inputs whose confidence exceeds 0.9
                      len(np.argwhere(np.array(confidence_acc) > 0.9)) / len(x_test)))
runtime_metric.reset_states()
mse_metric.reset_states()
mse_90_metric.reset_states()
cce_metric.reset_states()

(1.2 sec) 0 th iteration: 6.3948846e-14, 6.3948846e-14, 1.1920928e-07, non-tf.keras.metrics, non-tf.keras.metrics, non-tf.keras.metrics


KeyboardInterrupt: 

Test DBNN:

In [13]:
# Evaluate one input at a time.
batch = 1

# Load the trained weights into a fresh network and fix its posterior draws.
dbnn_op = create_bnn(OCC_CLASS_NO)
dbnn_op.load_weights("%s-%s" % (BNN_PATH, OCC_FLAG))
freeze(dbnn_op, OCC_INPUT_DIM, POSTERIOR_NO)

x_dims, y_dims = [OCC_INPUT_DIM], [OCC_CLASS_NO]
och_x = OCH(**och_x_params, dims=x_dims, hash_no=1)
och_y = OCH(**och_y_params, dims=y_dims, hash_no=1)
# x_dims holds a single entry, so x_dims[1:] is an empty list here.
och_x_1 = OCH(**och_x1_params, dims=x_dims[1:], hash_no=3, cs=[])
# The wrapped operation draws a one-hot class from the frozen network for x[0].
dbnn = DBNN(lambda x: tf.one_hot(bnn_categorical(dbnn_op, x[0]).sample(), OCC_CLASS_NO), och_x_1, och_x, och_y)

def sampling(model, xs):
    # Feed the new input to the DBNN, then read what och_y.cws() returns
    # (presumably (sample, weight) pairs, as the metric functions expect -- TODO confirm).
    model.update(xs)
    return model.och_y.cws()

msec_ftn = lambda sws, ys: msec_from_samples(ys, OCC_CLASS_NO, sws)
# Same error, but only reported when the confidence reaches 0.9.
msec_90_ftn = lambda sws, ys: msec_from_samples(ys, OCC_CLASS_NO, sws, 0.9)
cce_ftn = lambda sws, ys: cce_from_samples(ys, sws)
confidence_ftn = lambda sws, ys: confidence_from_samples(sws)

runtime_metric = tf.keras.metrics.Mean(name='runtime')
mse_metric = tf.keras.metrics.Mean(name='mse')
mse_90_metric = tf.keras.metrics.Mean(name='mse-90')
cce_metric = tf.keras.metrics.Mean(name='cce')
# Plain list collecting the raw per-batch confidences.
confidence_acc = []

loss_metrics = (msec_ftn, mse_metric), (msec_90_ftn, mse_90_metric), (cce_ftn, cce_metric), (confidence_ftn, confidence_acc)

test(dbnn, x_test, y_test, batch, sampling, runtime_metric, *loss_metrics)

template = 'DBNN Runtime: {} (ms), MSE: {}, MSE-90: {}, NLL: {}, Confidence: {}, Coverage-90: {}'
print(template.format(runtime_metric.result() * 1000, 
                      mse_metric.result(),
                      mse_90_metric.result(),
                      cce_metric.result(),
                      np.mean(confidence_acc), 
                      # fraction of test inputs whose confidence exceeds 0.9
                      len(np.argwhere(np.array(confidence_acc) > 0.9)) / len(x_test)))
runtime_metric.reset_states()
mse_metric.reset_states()
mse_90_metric.reset_states()
cce_metric.reset_states()

(0.0 sec) 0 th iteration: 0.0, 0.0, 1.1920928e-07, non-tf.keras.metrics
(4.0 sec) 100 th iteration: 0.07424769, 0.022211295, 0.2681527, non-tf.keras.metrics


KeyboardInterrupt: 

## [EMG Data for Gestures Data Set](https://archive.ics.uci.edu/ml/datasets/EMG+data+for+gestures) (Classification)

In [14]:
# Dataset flag: used as the dataset sub-directory name and as the checkpoint suffix.
EMG_FLAG = "emg"
# Number of input features per sample (eight columns are read by emg_data).
EMG_INPUT_DIM = 8
# Number of target classes.
EMG_CLASS_NO = 8

In [15]:
def emg_data(dataset_no):
    """Read every file of one numbered EMG sub-directory.

    Args:
        dataset_no: directory number, formatted as two digits below
            ``DATASET_PATH/EMG_FLAG/``.

    Returns:
        Tuple ``(channels, labels)`` of lists: one float tensor per row built
        from columns 1..8 and one single-element integer tensor built from
        column 9. The first line of each file is skipped, and rows that do not
        have exactly 10 tab-separated fields are ignored.
    """
    folder = DATASET_PATH + "%s/" % EMG_FLAG + "%02d/" % dataset_no
    signals, classes = [], []
    for file_name in os.listdir(folder):
        with open(folder + file_name, 'r') as csvfile:
            rows = csv.reader(csvfile, delimiter='\t')
            next(rows)  # discard the header line
            for row in rows:
                if len(row) != 10:
                    continue
                signals.append(tf.constant([float(v) for v in row[1:9]]))
                classes.append(tf.constant([int(row[9])]))
    return signals, classes

In [19]:
# Sub-directories 1..32 form the training set, 33..36 the test set. Each call
# returns (x list, y list); zip(*) regroups them before concatenation.
x_train, y_train = tuple(zip(*[emg_data(i) for i in range(1, 33)]))
x_train, y_train = tf.concat(x_train, 0), tf.concat(y_train, 0)
x_test, y_test = tuple(zip(*[emg_data(i) for i in range(33, 37)]))
x_test, y_test = tf.concat(x_test, 0), tf.concat(y_test, 0)

print("train dataset: %d" % len(x_train))
print("x sample: ", x_train[0].numpy())
print("y sample: ", y_train[0].numpy())
print("test dataset: %d" % len(x_test))
print("x sample: ", x_test[0].numpy())
print("y sample: ", y_test[0].numpy())

train dataset: 3793345
x sample:  [ 1.e-05 -2.e-05 -1.e-05 -3.e-05  0.e+00 -1.e-05  0.e+00 -1.e-05]
y sample:  [0]
test dataset: 444562
x sample:  [ 0.0e+00 -2.0e-05 -8.0e-05 -1.7e-04 -1.0e-05  2.0e-05 -2.0e-05 -2.0e-05]
y sample:  [0]


### Train Models

Train BNN:

In [20]:
# Train the EMG BNN for `epochs` passes with mini-batches of size `batch`.
epochs, batch = 10, 3

bnn = create_bnn(EMG_CLASS_NO)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
# The KL term is divided by the training-set size.
loss_ftn = lambda model, xs, ys: bnn_categorical_loss_ftn(model, xs, ys, len(x_train))
# Running means of the total loss and of the NLL part; reset after each report.
train_loss = tf.keras.metrics.Mean(name='train_loss')
nll_loss = tf.keras.metrics.Mean(name='nll_loss')

for epoch in range(epochs):
    time1 = time.time()
    train_epoch(bnn, x_train, y_train, batch, optimizer, loss_ftn, train_loss, nll_loss)
    time2 = time.time()
        
    # Reporting interval of 1: this condition holds for every epoch.
    if (epoch + 1) % 1 == 0:
        template = '({} sec) Epoch {}, Loss: {}, NLL: {}'
        print(template.format(time2 - time1,
                              epoch + 1,
                              train_loss.result(),
                              nll_loss.result()))
        train_loss.reset_states()
        nll_loss.reset_states()
        
# Store the trained weights under "<BNN_PATH>-<EMG_FLAG>".
bnn.save_weights("%s-%s" % (BNN_PATH, EMG_FLAG), save_format='tf')

KeyboardInterrupt: 

### Test Models

Test MU:

In [21]:
# Evaluate one input at a time; every prediction combines `sample_no` draws.
batch, sample_no = 1, 30

bnn = create_bnn(EMG_CLASS_NO)
bnn.load_weights("%s-%s" % (BNN_PATH, EMG_FLAG))

# Each draw is a one-hot encoded class sampled from a fresh forward pass,
# paired with the uniform weight 1 / sample_no.
sampling = lambda model, xs: [(tf.one_hot(bnn_categorical(model, xs).sample(), EMG_CLASS_NO), 1 / sample_no) for _ in range(sample_no)]
msec_ftn = lambda sws, ys: msec_from_samples(ys, EMG_CLASS_NO, sws)
# Same error, but only reported when the confidence reaches 0.9.
msec_90_ftn = lambda sws, ys: msec_from_samples(ys, EMG_CLASS_NO, sws, 0.9)
cce_ftn = lambda sws, ys: cce_from_samples(ys, sws)
confidence_ftn = lambda sws, ys: confidence_from_samples(sws)
correct_ftn = lambda sws, ys: correct_from_samples(sws, ys)
# Predicted class index; -1 marks the case of no samples.
estimation_ftn = lambda sws, ys: tf.math.argmax(sum([s * w for s, w in sws]), axis=-1) if sws else -1

runtime_metric = tf.keras.metrics.Mean(name='runtime')
mse_metric = tf.keras.metrics.Mean(name='mse')
mse_90_metric = tf.keras.metrics.Mean(name='mse-90')
cce_metric = tf.keras.metrics.Mean(name='cce')
# Plain lists collect the raw per-batch values instead of a running mean.
confidence_acc = []
correct_acc = []
estimation_acc = []

loss_metrics = (msec_ftn, mse_metric), (msec_90_ftn, mse_90_metric), (cce_ftn, cce_metric), (confidence_ftn, confidence_acc), (correct_ftn, correct_acc), (estimation_ftn, estimation_acc)

test(bnn, x_test, y_test, batch, sampling, runtime_metric, *loss_metrics)

template = 'MU Runtime: {} (ms), MSE: {}, MSE-90: {}, NLL: {}, Confidence: {}±{}(%), Accuracy: {} (%), Estimations: {}, Coverage-90: {}'
print(template.format(runtime_metric.result() * 1000, 
                      mse_metric.result(),
                      mse_90_metric.result(),
                      cce_metric.result(),
                      np.mean(confidence_acc) * 100, 
                      np.std(confidence_acc) * 100,
                      np.mean(correct_acc) * 100,
                      # number of predictions per class index
                      np.bincount(np.array(estimation_acc).flatten()), 
                      # fraction of test inputs whose confidence exceeds 0.9
                      len(np.argwhere(np.array(confidence_acc) > 0.9)) / len(x_test)))
runtime_metric.reset_states()
mse_metric.reset_states()
mse_90_metric.reset_states()
cce_metric.reset_states()

(0.3 sec) 0 th iteration: 0.03, 0.0, 0.51082605, non-tf.keras.metrics, non-tf.keras.metrics, non-tf.keras.metrics
(14.9 sec) 100 th iteration: 0.028712858, 0.0, 0.45603973, non-tf.keras.metrics, non-tf.keras.metrics, non-tf.keras.metrics


KeyboardInterrupt: 

Test DBNN:

In [None]:
# Evaluate one input at a time.
batch = 1

# Load the trained weights into a fresh network and fix its posterior draws.
dbnn_op = create_bnn(EMG_CLASS_NO)
dbnn_op.load_weights("%s-%s" % (BNN_PATH, EMG_FLAG))
freeze(dbnn_op, EMG_INPUT_DIM, POSTERIOR_NO)

x_dims, y_dims = [EMG_INPUT_DIM], [EMG_CLASS_NO]
och_x = OCH(**och_x_params, dims=x_dims, hash_no=1)
och_y = OCH(**och_y_params, dims=y_dims, hash_no=1)
# x_dims holds a single entry, so x_dims[1:] is an empty list here.
och_x_1 = OCH(**och_x1_params, dims=x_dims[1:], hash_no=3, cs=[])
# The wrapped operation draws a one-hot class from the frozen network for x[0].
dbnn = DBNN(lambda x: tf.one_hot(bnn_categorical(dbnn_op, x[0]).sample(), EMG_CLASS_NO), och_x_1, och_x, och_y)

def sampling(model, xs):
    # Feed the new input to the DBNN, then read what och_y.cws() returns
    # (presumably (sample, weight) pairs, as the metric functions expect -- TODO confirm).
    model.update(xs)
    return model.och_y.cws()

# Every metric function yields None when no samples are available; test_step
# skips None values.
msec_ftn = lambda sws, ys: msec_from_samples(ys, EMG_CLASS_NO, sws) if sws else None
msec_90_ftn = lambda sws, ys: msec_from_samples(ys, EMG_CLASS_NO, sws, 0.9) if sws else None
cce_ftn = lambda sws, ys: cce_from_samples(ys, sws) if sws else None
confidence_ftn = lambda sws, ys: confidence_from_samples(sws) if sws else None
correct_ftn = lambda sws, ys: correct_from_samples(sws, ys) if sws else None
estimation_ftn = lambda sws, ys: tf.math.argmax(sum([s * w for s, w in sws]), axis=-1) if sws else None

runtime_metric = tf.keras.metrics.Mean(name='runtime')
mse_metric = tf.keras.metrics.Mean(name='mse')
mse_90_metric = tf.keras.metrics.Mean(name='mse-90')
cce_metric = tf.keras.metrics.Mean(name='cce')
# Plain lists collect the raw per-batch values instead of a running mean.
confidence_acc = []
correct_acc = []
estimation_acc = []

loss_metrics = (msec_ftn, mse_metric), (msec_90_ftn, mse_90_metric), (cce_ftn, cce_metric), (confidence_ftn, confidence_acc), (correct_ftn, correct_acc), (estimation_ftn, estimation_acc)

test(dbnn, x_test, y_test, batch, sampling, runtime_metric, *loss_metrics)

# The results belong to the DBNN evaluated above, so the line is labelled
# 'DBNN' like the other DBNN cells (it used to read 'DU').
template = 'DBNN Runtime: {} (ms), MSE: {}, MSE-90: {}, NLL: {}, Confidence: {}±{}(%), Accuracy: {} (%), Estimations: {}, Coverage-90: {}'
print(template.format(runtime_metric.result() * 1000, 
                      mse_metric.result(),
                      mse_90_metric.result(),
                      cce_metric.result(),
                      np.mean(confidence_acc) * 100, 
                      np.std(confidence_acc) * 100,
                      np.mean(correct_acc) * 100,
                      # number of predictions per class index
                      np.bincount(np.array(estimation_acc).flatten()), 
                      # fraction of test inputs whose confidence exceeds 0.9
                      len(np.argwhere(np.array(confidence_acc) > 0.9)) / len(x_test)))
runtime_metric.reset_states()
mse_90_metric.reset_states()
mse_metric.reset_states()
cce_metric.reset_states()

(0.1 sec) 0 th iteration: 0.0, 0.0, 8.344647e-07, non-tf.keras.metrics, non-tf.keras.metrics, non-tf.keras.metrics
(4.3 sec) 100 th iteration: 0.08339719, 0.0767046, 4.57181, non-tf.keras.metrics, non-tf.keras.metrics, non-tf.keras.metrics


## [Localization Data for Person Activity Data Set](https://archive.ics.uci.edu/ml/datasets/Localization+Data+for+Person+Activity) (Classification)

In [None]:
# Dataset flag: used as the dataset sub-directory name and as the checkpoint suffix.
LOC_FLAG = "localization"
# Number of input features per sample (tag index plus three float columns).
LOC_INPUT_DIM = 4
# Number of target classes; matches the length of ACTIVITY.
LOC_CLASS_NO = 11
# Tag identifiers; a row's tag is encoded as its index in this list.
TAG = ["010-000-024-033", "010-000-030-096", "020-000-033-111", "020-000-032-221"]
# Activity names; a row's label is encoded as its index in this list.
ACTIVITY = ["walking","falling","lying down","lying","sitting down","sitting","standing up from lying","on all fours","sitting on the ground","standing up from sitting","standing up from sitting on the ground"]

In [None]:
def localization_data():
    """Read the localization file into per-row feature and label tensors.

    Returns:
        Tuple ``(xs, ys)`` of lists: each feature tensor holds the index of the
        row's tag in ``TAG`` followed by the floats of columns 4..6; each label
        tensor holds the index of column 7 in ``ACTIVITY``.
    """
    source = DATASET_PATH + "%s/%s" % (LOC_FLAG, "ConfLongDemo_JSI.txt")
    features, targets = [], []
    with open(source, 'r') as csvfile:
        rows = csv.reader(csvfile, delimiter=',')
        # NOTE(review): the first line is discarded as a header -- confirm the
        # file really starts with one, otherwise a data row is lost.
        next(rows)
        for row in rows:
            tag_index = TAG.index(row[1])
            coordinates = [float(v) for v in row[4:7]]
            features.append(tf.constant([tag_index] + coordinates))
            targets.append(tf.constant([ACTIVITY.index(row[7])]))
    return features, targets

In [None]:
xs, ys = localization_data()
# The first 90% of the rows (in file order) are used for training, the rest for testing.
x_len, train_len = len(xs), int(len(xs) * 0.9)
x_train, y_train, x_test, y_test = xs[:train_len], ys[:train_len], xs[train_len:], ys[train_len:]

print("train dataset: %d, test datasets: %d" % (len(x_train), len(x_test)))
print("x sample: ", x_train[0].numpy())
print("y sample: ", y_train[0].numpy())

### Train Models

Train BNN:

In [None]:
# Train the localization BNN for `epochs` passes with mini-batches of size `batch`.
epochs, batch = 30, 3

bnn = create_bnn(LOC_CLASS_NO)
optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
# The KL term is divided by the training-set size.
loss_ftn = lambda model, xs, ys: bnn_categorical_loss_ftn(model, xs, ys, len(x_train))
# Running means of the total loss and of the NLL part; reset after each report.
train_loss = tf.keras.metrics.Mean(name='train_loss')
nll_loss = tf.keras.metrics.Mean(name='nll_loss')

for epoch in range(epochs):
    time1 = time.time()
    train_epoch(bnn, x_train, y_train, batch, optimizer, loss_ftn, train_loss, nll_loss)
    time2 = time.time()

    # Reporting interval of 1: this condition holds for every epoch.
    if (epoch + 1) % 1 == 0:
        # Report the measured epoch duration as the other training cells do;
        # it used to be timed but never printed.
        template = '({} sec) Epoch {}, Loss: {}, NLL: {}'
        print(template.format(time2 - time1,
                              epoch + 1,
                              train_loss.result(),
                              nll_loss.result()))
        train_loss.reset_states()
        nll_loss.reset_states()

# Store the trained weights under "<BNN_PATH>-<LOC_FLAG>".
bnn.save_weights("%s-%s" % (BNN_PATH, LOC_FLAG), save_format='tf')

### Test Models

Test BNN:

In [None]:
# Evaluate one input at a time; every prediction combines `sample_no` draws.
batch, sample_no = 1, 30

bnn = create_bnn(LOC_CLASS_NO)
bnn.load_weights("%s-%s" % (BNN_PATH, LOC_FLAG))

# Each draw is a one-hot encoded class sampled from a fresh forward pass,
# paired with the uniform weight 1 / sample_no.
sampling = lambda model, xs: [(tf.one_hot(bnn_categorical(model, xs).sample(), LOC_CLASS_NO), 1 / sample_no) for _ in range(sample_no)]
msec_ftn = lambda sws, ys: msec_from_samples(ys, LOC_CLASS_NO, sws)
# Same error, but only reported when the confidence reaches 0.9.
msec_90_ftn = lambda sws, ys: msec_from_samples(ys, LOC_CLASS_NO, sws, 0.9)
cce_ftn = lambda sws, ys: cce_from_samples(ys, sws)
confidence_ftn = lambda sws, ys: confidence_from_samples(sws)
correct_ftn = lambda sws, ys: correct_from_samples(sws, ys)
# Predicted class index; -1 marks the case of no samples.
estimation_ftn = lambda sws, ys: tf.math.argmax(sum([s * w for s, w in sws]), axis=-1) if sws else -1

runtime_metric = tf.keras.metrics.Mean(name='runtime')
mse_metric = tf.keras.metrics.Mean(name='mse')
mse_90_metric = tf.keras.metrics.Mean(name='mse-90')
cce_metric = tf.keras.metrics.Mean(name='cce')
# Plain lists collect the raw per-batch values instead of a running mean.
confidence_acc = []
correct_acc = []
estimation_acc = []

loss_metrics = (msec_ftn, mse_metric), (msec_90_ftn, mse_90_metric), (cce_ftn, cce_metric), (confidence_ftn, confidence_acc), (correct_ftn, correct_acc), (estimation_ftn, estimation_acc)

test(bnn, x_test, y_test, batch, sampling, runtime_metric, *loss_metrics)

template = 'MU Runtime: {} (ms), MSE: {}, MSE-90: {}, NLL: {}, Confidence: {}±{}(%), Accuracy: {} (%), Estimations: {}, Coverage-90: {}'
print(template.format(runtime_metric.result() * 1000, 
                      mse_metric.result(),
                      mse_90_metric.result(),
                      cce_metric.result(),
                      np.mean(confidence_acc) * 100, 
                      np.std(confidence_acc) * 100,
                      np.mean(correct_acc) * 100,
                      # number of predictions per class index
                      np.bincount(np.array(estimation_acc).flatten()), 
                      # fraction of test inputs whose confidence exceeds 0.9
                      len(np.argwhere(np.array(confidence_acc) > 0.9)) / len(x_test)))
runtime_metric.reset_states()
mse_metric.reset_states()
mse_90_metric.reset_states()
cce_metric.reset_states()

Test DBNN:

In [None]:
# Evaluate one input at a time.
batch = 1

# Load the trained weights into a fresh network and fix its posterior draws.
dbnn_op = create_bnn(LOC_CLASS_NO)
dbnn_op.load_weights("%s-%s" % (BNN_PATH, LOC_FLAG))
freeze(dbnn_op, LOC_INPUT_DIM, POSTERIOR_NO)

x_dims, y_dims = [LOC_INPUT_DIM], [LOC_CLASS_NO]
och_x = OCH(**och_x_params, dims=x_dims, hash_no=1)
och_y = OCH(**och_y_params, dims=y_dims, hash_no=1)
# x_dims holds a single entry, so x_dims[1:] is an empty list here.
och_x_1 = OCH(**och_x1_params, dims=x_dims[1:], hash_no=3, cs=[])
# The wrapped operation draws a one-hot class from the frozen network for x[0].
dbnn = DBNN(lambda x: tf.one_hot(bnn_categorical(dbnn_op, x[0]).sample(), LOC_CLASS_NO), och_x_1, och_x, och_y)

def sampling(model, xs):
    # Feed the new input to the DBNN, then read what och_y.cws() returns
    # (presumably (sample, weight) pairs, as the metric functions expect -- TODO confirm).
    model.update(xs)
    return model.och_y.cws()

# Every metric function yields None when no samples are available; test_step
# skips None values, which keeps the three accumulator lists aligned.
msec_ftn = lambda sws, ys: msec_from_samples(ys, LOC_CLASS_NO, sws) if sws else None
msec_90_ftn = lambda sws, ys: msec_from_samples(ys, LOC_CLASS_NO, sws, 0.9) if sws else None
cce_ftn = lambda sws, ys: cce_from_samples(ys, sws) if sws else None
confidence_ftn = lambda sws, ys: confidence_from_samples(sws) if sws else None
correct_ftn = lambda sws, ys: correct_from_samples(sws, ys) if sws else None
estimation_ftn = lambda sws, ys: tf.math.argmax(sum([s * w for s, w in sws]), axis=-1) if sws else None

runtime_metric = tf.keras.metrics.Mean(name='runtime')
mse_metric = tf.keras.metrics.Mean(name='mse')
mse_90_metric = tf.keras.metrics.Mean(name='mse-90')
cce_metric = tf.keras.metrics.Mean(name='cce')
# Plain lists collect the raw per-batch values instead of a running mean.
confidence_acc = []
correct_acc = []
estimation_acc = []

loss_metrics = (msec_ftn, mse_metric), (msec_90_ftn, mse_90_metric), (cce_ftn, cce_metric), (confidence_ftn, confidence_acc), (correct_ftn, correct_acc), (estimation_ftn, estimation_acc)

test(dbnn, x_test, y_test, batch, sampling, runtime_metric, *loss_metrics)

template = 'DBNN Runtime: {} (ms), MSE: {}, MSE-90: {}, NLL: {}, Confidence: {}±{}(%), Accuracy: {} (%), Estimations: {}, Coverage-90: {}'
print(template.format(runtime_metric.result() * 1000, 
                      mse_metric.result(),
                      mse_90_metric.result(),
                      cce_metric.result(),
                      np.mean(confidence_acc) * 100, 
                      np.std(confidence_acc) * 100,
                      np.mean(correct_acc) * 100,
                      # number of predictions per class index
                      np.bincount(np.array(estimation_acc).flatten()), 
                      # fraction of test inputs whose confidence exceeds 0.9
                      len(np.argwhere(np.array(confidence_acc) > 0.9)) / len(x_test)))
runtime_metric.reset_states()
mse_90_metric.reset_states()
mse_metric.reset_states()
cce_metric.reset_states()

# Create the output directory first so that the results of the long
# evaluation are not lost to a missing-directory error.
os.makedirs(DAT_PATH, exist_ok=True)

# Dump one CSV row per evaluated sample: confidence, correctness, predicted class.
with open(DAT_PATH + "%s-dbnn.csv" % LOC_FLAG, 'w', newline='') as csvfile:
    csvw = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
    for confidence_batch, correct_batch, estimation_batch in zip(confidence_acc, correct_acc, estimation_acc):
        for confidence, correct, estimation in zip(confidence_batch, correct_batch, estimation_batch):
            csvw.writerow([confidence.numpy(), correct[0].numpy() if correct is not None else None, estimation.numpy()])