$$
\newcommand{\bx}{\mathbf{x}}
\newcommand{\bv}{\mathbf{v}}
\newcommand{\by}{\mathbf{y}}
\newcommand{\bz}{\mathbf{z}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\V}{\mathbb{V}}
\newcommand{\R}{\mathbb{R}}
\newcommand{\calN}{\mathcal{N}}
$$

## Simple implementation of a Helmholtz machine trained using the wake-sleep algorithm

In [5]:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from scipy.special import expit as sigmoid

sns.set(style="white", color_codes=True)

np.random.seed(42)

In [1]:
class World:
    def __init__(self):
        self._obs = np.array([
            np.array([[1, 0, 0],
                      [1, 0, 0],
                      [1, 0, 0],]).ravel(),
            np.array([[0, 1, 0],
                      [0, 1, 0],
                      [0, 1, 0],]).ravel(),
            np.array([[0, 0, 1],
                      [0, 0, 1],
                      [0, 0, 1],]).ravel(),
            np.array([[1, 1, 0],
                      [1, 1, 0],
                      [1, 1, 0],]).ravel(),
            np.array([[1, 0, 1],
                      [1, 0, 1],
                      [1, 0, 1],]).ravel(),
            np.array([[0, 1, 1],
                      [0, 1, 1],
                      [0, 1, 1],]).ravel(),
            np.array([[1, 1, 1],
                      [0, 0, 0],
                      [0, 0, 0],]).ravel(),
            np.array([[0, 0, 0],
                      [1, 1, 1],
                      [0, 0, 0],]).ravel(),
            np.array([[0, 0, 0],
                      [0, 0, 0],
                      [1, 1, 1],]).ravel(),
            np.array([[1, 1, 1],
                      [1, 1, 1],
                      [0, 0, 0],]).ravel(),
            np.array([[1, 1, 1],
                      [0, 0, 0],
                      [1, 1, 1],]).ravel(),
            np.array([[0, 0, 0],
                      [1, 1, 1],
                      [1, 1, 1],]).ravel(),
        ])
        self._probs = np.array([2/18, 2/18, 2/18, 2/18, 2/18, 2/18, 1/18, 1/18, 1/18, 1/18, 1/18, 1/18])
        
    def sample(self):
        idx = np.random.choice(self._obs.shape[0], p=self._probs)
        return np.expand_dims(self._obs[idx], axis=0)
    
    def valid_observations(self):
        return self._obs
    
    def kl(self, other_probs):
        return np.sum(self._probs * np.log(self._probs)) - np.sum(self._probs * np.log(other_probs))

In [2]:
class SigmoidLayer:
    def __init__(self, input_size, output_size, include_bias=False):
        self._i = input_size
        self._o = output_size
        self._include_bias = include_bias
        
        self._output = None
        self._initialise_params()
    
    def _initialise_params(self):
        self._W = np.zeros((self._o, self._i))
        if self._include_bias:
            self._b = np.zeros((self._o, 1))
        
    def _compute_activities(self, x):
        assert len(x.shape) == 2, "received input of shape {}".format(x.shape)
        assert x.shape[1] == self._i, "received input of shape {}, expected 2nd dim {}".format(x.shape, self._i)
        
        activity = x @ self._W.T
        if self._include_bias:
            activity += self._b.T
        return activity
    
    def _probs(self, x):
        return sigmoid(self._compute_activities(x))
    
    def _sample(self, x):
        probs = self._probs(x)
        return np.random.uniform(size=probs.shape) < probs
        
    def __call__(self, x):
        self._input = x
        self._output = self._sample(x)
        return self._output
    
    def state(self):
        assert self._input is not None
        assert self._output is not None

        return (self._input, self._output)
    
    def learn(self, true_input, target_output, learning_rate):
        prediction_error = target_output - self._probs(true_input)
        self._W += learning_rate * prediction_error.T @ true_input
        if self._include_bias:
            self._b += learning_rate * prediction_error.T @ np.ones((true_input.shape[0], 1))

In [29]:
class SBN:
    def __init__(self, layer_sizes, include_bias_layer=False):
        assert len(layer_sizes) >= 2
        self.layer_sizes = layer_sizes
        self.include_bias_layer = include_bias_layer
        self._build_network()
        
    def _build_network(self):
        self.layers = []
        if self.include_bias_layer:
            self.layers.append(SigmoidLayer(1, self.layer_sizes[0], True))
        for input_size, output_size in zip(self.layer_sizes[:-1], self.layer_sizes[1:]):
            self.layers.append(SigmoidLayer(input_size, output_size, True))
        
    def __call__(self, x):
        if self.include_bias_layer and x is None:
            x = np.zeros((1, 1))
        for layer in self.layers:
            x = layer(x)
        return x
    
    def state(self):
        ret = [self.layers[0].state()[0]]
        for layer in self.layers:
            ret.append(layer.state()[1])
        return ret
    
    def learn(self, true_inputs, targets, learning_rate):
        if self.include_bias_layer:
            true_inputs = [np.zeros((true_inputs[0].shape[0], 1))] + true_inputs
        assert len(self.layers) == len(true_inputs) == len(targets)
        
        for layer, true_input, target in zip(self.layers, true_inputs, targets):
            layer.learn(true_input, target, learning_rate)

In [30]:
class HM:
    def __init__(self, layer_sizes):
        assert len(layer_sizes) >= 2
        self._layer_sizes = layer_sizes
        self._generator_network = SBN(layer_sizes, True)
        self._recognition_network = SBN(layer_sizes[::-1])
        
    def _wake(self, observation, learning_rate):
        self._recognition_network(observation)
        targets = self._recognition_network.state()[::-1]
        self._generator_network.learn(targets[:-1], targets, learning_rate)
        
    def _sleep(self, learning_rate):
        self._generator_network(None)
        targets = self._generator_network.state()[1:][::-1]
        self._recognition_network.learn(targets[:-1], targets[1:], learning_rate)
            
    def _wake_sleep(self, observation, learning_rate):
        self._wake(observation, learning_rate)
        self._sleep(learning_rate)
        
    def learn(self, world, epochs, learning_rate):
        for _ in range(epochs):
            self._wake_sleep(world.sample(), learning_rate)
            
    def sample(self, num_samples=1):
        return self._generator_network(np.zeros((num_samples, 1)))
    
    def evaluate(self, world, num_samples=10000):
        """Computes KL divergence between world and model distributions."""
        assert num_samples > 0
        
        valid_obs = world.valid_observations()
        
        samples = self.sample(num_samples)
        
        hadamard_distances = valid_obs[:, None] - samples
        probs = np.mean(np.logical_not(np.any(hadamard_distances, axis=2)), axis=1)
        return world.kl(probs)

In [31]:
world = World()

In [32]:
hm = HM([1, 6, 9])

In [33]:
for _ in range(10):
    hm.learn(world, 5000, 0.05)
    kl = hm.evaluate(world, 10000)
    print("KL divergence: {}".format(kl))

KL divergence: 2.1264738787155206
KL divergence: 1.055114500264255
KL divergence: 0.7317817316180122
KL divergence: 0.5416706172079699
KL divergence: 0.4208250744547746
KL divergence: 0.35035638489690957
KL divergence: 0.3060245302060456
KL divergence: 0.266122195328812
KL divergence: 0.24376943987078192
KL divergence: 0.21922929582550488
