In [1]:
#HSIC Bottleneck

In [2]:
from __future__ import division
from __future__ import print_function
from __future__ import absolute_import
from __future__ import unicode_literals

In [3]:
from keras import optimizers

Using TensorFlow backend.


In [4]:
import keras.backend as K
import tensorflow as tf

In [5]:
def kernel_matrix(x, sigma):
    """Pairwise Gaussian (RBF) kernel matrix over the leading axis of ``x``.

    Entry ``[i, j]`` is ``exp(-||x[i] - x[j]||^2 / (2 * sigma^2))``, where the
    squared distance is summed over every axis of ``x`` except the first.

    Args:
        x: backend tensor whose first axis indexes the samples.
        sigma: kernel bandwidth.

    Returns:
        A 2-D tensor with one row and one column per sample.
    """
    as_rows = K.expand_dims(x, 0)
    as_cols = K.expand_dims(x, 1)
    # The broadcast difference has one more axis than ``x``; axes 0 and 1
    # index the sample pair, everything from axis 2 on is feature content.
    feature_axes = tuple(range(2, K.ndim(x) + 1))
    squared_dist = K.sum(K.pow(as_rows - as_cols, 2), axis=feature_axes)
    return K.exp(-0.5 * squared_dist / sigma ** 2)

In [6]:
def hsic(Kx, Ky, m):
    """Empirical HSIC between two kernel matrices built on the same samples.

    The three terms are the expansion of ``trace(Kx H Ky H) / m**2`` with the
    centering matrix ``H = I - 1/m``; the final factor rescales the
    normalisation from ``1/m**2`` to ``1/(m-1)**2``.

    Args:
        Kx: square kernel matrix of the first variable.
        Ky: square kernel matrix of the second variable.
        m: number of samples, i.e. the side length of ``Kx`` and ``Ky``.

    Returns:
        A scalar tensor holding the HSIC estimate.
    """
    gram_product = K.dot(Kx, Ky)
    trace_term = tf.linalg.trace(gram_product) / m ** 2
    mean_term = K.mean(Kx) * K.mean(Ky)
    cross_term = 2 * K.mean(gram_product) / m
    centered = trace_term + mean_term - cross_term
    return centered * (m / (m-1))**2

In [7]:
class HSICBottleneckTrained(object):
    """Rewire a Keras model so that ``fit`` minimises an HSIC-bottleneck loss.

    For every layer whose name starts with ``"hsic"`` and that owns trainable
    weights, the per-layer objective

        hsic(Kz, Kx) - lambda_0 * hsic(Kz, Ky)

    is built from the layer output ``z``, the model input ``x`` and the target
    ``y``. Gradients of each layer's objective are taken with respect to that
    layer's own weights only. All other layers are marked non-trainable.

    The model is modified in place: its ``_collected_trainable_weights`` and
    ``total_loss`` are overwritten and its optimizer's ``get_gradients`` is
    replaced. It reads ``model.optimizer`` and ``model._feed_targets``, so the
    model presumably has to be compiled first (the notebook does so).

    Args:
        model: Keras model to rewire.
        batch_size: number of samples per batch. It is used as the sample
            count ``m`` in the HSIC normalisation, so every batch fed to the
            model is expected to contain exactly this many samples.
        lambda_0: weight of the ``hsic(Kz, Ky)`` term.
        sigma: bandwidth shared by all Gaussian kernels.
    """

    def __init__(self, model, batch_size, lambda_0, sigma):
        self.batch_size = batch_size
        input_x = model._feed_inputs[0]
        input_y = model._feed_targets[0]

        Kx = kernel_matrix(input_x, sigma)
        Ky = kernel_matrix(input_y, sigma)

        # Weight name -> gradient tensor of the owning layer's HSIC objective.
        param2grad = {}
        trainable_params = []
        total_loss = 0.

        for layer in model.layers:
            if layer.name.startswith("hsic"):
                params = layer.trainable_weights
                if not params:
                    continue
                hidden_z = layer.output

                Kz = kernel_matrix(hidden_z, sigma)
                loss = hsic(Kz, Kx, batch_size) - lambda_0 * hsic(Kz, Ky, batch_size)
                total_loss += loss
                trainable_params.extend(params)
                # Differentiate only w.r.t. this layer's own weights.
                grads = K.gradients(loss, params)
                for p, g in zip(params, grads):
                    param2grad[p.name] = g
            else:
                layer.trainable = False

        model._collected_trainable_weights = trainable_params
        model.total_loss = total_loss
        optim = model.optimizer

        def get_gradients(loss, params):
            """Return the precomputed per-layer gradients for ``params``.

            ``loss`` is ignored. Clipping is read from the optimizer
            (``optim``): inside this closure ``self`` is the wrapper object,
            which never carries ``clipnorm`` / ``clipvalue``.
            """
            grads = [param2grad[p.name] for p in params]
            clipnorm = getattr(optim, 'clipnorm', None)
            if clipnorm is not None and clipnorm > 0:
                norm = K.sqrt(sum([K.sum(K.square(g)) for g in grads]))
                grads = [optimizers.clip_norm(g, clipnorm, norm) for g in grads]
            clipvalue = getattr(optim, 'clipvalue', None)
            if clipvalue is not None and clipvalue > 0:
                grads = [K.clip(g, -clipvalue, clipvalue) for g in grads]
            return grads

        optim.get_gradients = get_gradients

        self.model = model

    def reshape(self, x):
        """Reshape ``x`` so that its first dimension equals ``self.batch_size``."""
        shape = list(K.int_shape(x))
        shape[0] = self.batch_size
        return K.reshape(x, tuple(shape))

    def __call__(self):
        """Return the rewired model."""
        return self.model
    
    

In [8]:
class PostTrained(object):
    """Freeze every layer except the one named ``"output_layer"``.

    The ``trainable`` flags are changed on the given model in place; the
    notebook recompiles the model afterwards before fitting again.

    Args:
        model: object exposing a ``layers`` iterable whose items have ``name``
            and ``trainable`` attributes (a Keras model in this notebook).
    """

    def __init__(self, model):
        for layer in model.layers:
            layer.trainable = (layer.name == "output_layer")
        self.model = model

    def __call__(self):
        """Return the wrapped model."""
        # Return the instance's own model rather than whatever global happens
        # to be called ``model`` in the notebook namespace.
        return self.model

In [9]:
import keras.layers as L
from keras import models
import numpy as np

In [10]:
if __name__ == "__main__":

    # ---- configuration -------------------------------------------------
    SEED = 0                 # seeds the synthetic data only, not Keras/TensorFlow
    BATCH_SIZE = 256         # also the sample count used in the HSIC normalisation
    N_FEATURES = 25
    N_BATCHES_TOTAL = 400
    N_BATCHES_TRAIN = 360

    # ---- synthetic data: is the squared norm of a Gaussian vector > 25? --
    rng = np.random.RandomState(SEED)
    X = rng.standard_normal((BATCH_SIZE * N_BATCHES_TOTAL, N_FEATURES))
    y = np.uint8(np.sum(X ** 2, axis=-1) > 25.)
    num_train = BATCH_SIZE * N_BATCHES_TRAIN
    X_train = X[:num_train, :]
    y_train = y[:num_train]
    X_test  = X[num_train:, :]
    y_test  = y[num_train:]

    # HSICBottleneckTrained normalises with a fixed batch size, so a ragged
    # final batch would be scored with the wrong sample count.
    assert len(X_train) % BATCH_SIZE == 0 and len(X_test) % BATCH_SIZE == 0

    # ---- model: layers named "hsic*" get the HSIC objective --------------
    input_x = L.Input(shape=(N_FEATURES,))
    z1      = L.Dense(40, name="hsic_dense_1", activation="relu")(input_x)
    z2      = L.Dense(64, name="hsic_dense_2", activation="relu")(z1)
    z2      = L.Dropout(0.2)(z2)
    z3      = L.Dense(32, name="hsic_dense_3", activation="relu")(z2)
    output_x = L.Dense(1, name="output_layer", activation="sigmoid")(z3)

    model = models.Model(inputs=input_x, outputs=output_x)

    # ---- phase 1: train the hidden layers on the HSIC objective ----------
    model.compile(optimizers.SGD(0.001), loss="binary_crossentropy", metrics=["acc"])
    model = HSICBottleneckTrained(model, batch_size=BATCH_SIZE, lambda_0=100., sigma=10.)()
    model.fit(X_train, y_train, epochs=50, validation_data=(X_test, y_test), batch_size=BATCH_SIZE)

    # ---- phase 2: freeze the hidden layers, train only the output layer --
    model = PostTrained(model)()
    # Recompile so the new trainable flags and a fresh optimizer take effect.
    model.compile(optimizers.SGD(0.1), loss="binary_crossentropy", metrics=["acc"])
    model.fit(X_train, y_train, epochs=10, validation_data=(X_test, y_test), batch_size=BATCH_SIZE)


# Remove the __future__ feature names from the namespace. ``pop`` instead of
# ``del`` so that re-running this cell does not end in a NameError.
for _future_name in ("division", "print_function", "absolute_import", "unicode_literals"):
    globals().pop(_future_name, None)
del _future_name

Train on 92160 samples, validate on 10240 samples
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
Train on 92160 samples, validate on 10240 samples
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10


Epoch 10/10


In [None]:
# Train the model, run the model against validation data set, compare/evaluate the output results.
# Training Dataset: The sample of data used to fit the model.
# Validation Dataset: The validation dataset is used to determine when training should stop in order to avoid overfitting.


# Epoch: One epoch is one complete pass through the full training dataset, processed batch by batch.
# us or µs means Microseconds
# Step: A training step means using one batch size of training data to train the model.


# val_loss is the value of the cost function on the validation data
# loss is the value of the cost function on the training data

# acc is the accuracy on the training data
# val_acc is the accuracy on the validation data (here X_test, y_test).
