# Restricted Boltzmann Machine

<img src="files/RBM.png" />

The energy function is defined as follows:

$
\begin{align}
E(\textbf{v}, \textbf{h}) = -\sum\limits_i a_i v_i - \sum\limits_j b_j h_j - \sum\limits_i \sum\limits_j v_i w_{ij} h_j
\end{align}
$

Furthermore, the probability distributions are the following (with $\sigma$ being the logistic sigmoid):

$
\begin{align}
P(\textbf{v}) &= \frac{1}{Z} \sum\limits_{\textbf{h}} e^{-E(\textbf{v}, \textbf{h})}\\
P(h_j = \mbox{on} | \textbf{v}) &= \sigma(b_j + \sum\limits_{i=1}^m v_i w_{ij})\\
P(v_i = \mbox{on} | \textbf{h}) &= \sigma(a_i + \sum\limits_{j=1}^n h_j w_{ij})
\end{align}
$

In [227]:
% matplotlib inline
import numpy as np
import matplotlib.pyplot as plt
import itertools
import random

class RBM:
    """
    Model for a Restricted Boltzmann Machine.
    """
    
    def __init__(self, num_visible, num_hidden, w=None, a=None, b=None):
        """
        Initialize the Restricted Boltzmann Machine (RBM).
        
        :param num_visible: Number of visible cells.
        :param num_hidden: Number of hidden cells.
        :param w: Optional: initial weights between the visible units and the hidden units; (num_visible x num_hidden matrix).
        :param a: Optional: initial bias for the visible units; (1 x num_visible) matrix.
        :param b: Optional: initial bias for the hidden units; (1 x num_hidden) matrix.
        """
        self.num_visible, self.num_hidden, self.w, self.a, self.b = num_visible, num_hidden, w, a, b
        self.w = np.random.normal(0, 1, (num_visible, num_hidden)) if w is None else w
        self.a = np.random.normal(0, 1, (1, num_visible)) if a is None else a
        self.b = np.random.normal(0, 1, (1, num_hidden)) if b is None else b
        
    @staticmethod
    def logistic_sigmoid(x):
        return 1. / (1. + np.exp(-x))
    
    def calculate_normalizing_constant_bruteforce(self):
        Z = 0
        for v in itertools.product([-1, 1], repeat=self.num_visible):
            for h in itertools.product([-1, 1], repeat=self.num_hidden):
                Z += np.exp(-self.calculate_energy(np.matrix(v), np.matrix(h)))
        return Z
    
    def calculate_probability(self, v, normalizing_constant):
        p = 0.
        for h in itertools.product([-1, 1], repeat=self.num_hidden):
            p += np.exp(-self.calculate_energy(v, np.matrix(h)))
        return p / float(normalizing_constant)
    
    def calculate_energy(self, v, h):
        """
        Calculates E(v, h).
        
        :param v: (1 x num_visible) matrix containing the state of the visible units.
        :param h: (1 x num_hidden) matrix containing the state for the hidden units.
        """
        return -np.dot(self.a, v.T) - np.dot(self.b, h.T) - np.dot(v, np.dot(self.w, h.T))
    
    def calculate_probabilities_h_given_v(self, v):
        """
        Calculates p(h_j=1|v).
        
        :param v: (1 x num_visible) matrix containing the state of the visible units.
        """
        return RBM.logistic_sigmoid(self.b + np.dot(v, self.w))
    
    def calculate_probabilities_v_given_h(self, h):
        """
        Calculates p(v_i=1|h).
        
        :param h: (1 x num_hidden) matrix containing the state of the hidden units.
        """
        return RBM.logistic_sigmoid(self.a + np.dot(h, self.w.T))
    
    def train(self, samples, learning_rate=0.1):
        """
        Train using the Contrastive Divergence (CD-1) procedure.
        
        :param samples: (num_samples x num_visible) matrix containing num_samples training samples.
        :param learning_rate: The learning rate used in the CD-1 procedure.
        """
        # Step 1: Take a training sample v, compute the probabilities of the hidden units and sample a hidden activation vector h from this probability distribution.
        v = samples[np.random.randint(0, samples.shape[0]), :]
        _, h = self.generate_sample(v, None, 1)
        # Step 2: Compute the outer product of v and h and call this the positive gradient.
        pos_gradient = np.outer(v, h)
        # Step 3: From h, sample a reconstruction v' of the visible units, then resample the hidden activations h' from this. (Gibbs sampling step).
        v2, _ = self.generate_sample(v, h, 2)
        _, h2 = self.generate_sample(v2, h, 1)
        # Step 4: Compute the outer product of v' and h' and call this the negative gradient.
        neg_gradient = np.outer(v2, h2)
        # Step 5: Now apply the updates.
        dw = learning_rate * (pos_gradient - neg_gradient)
        da = learning_rate * (v - v2)
        db = learning_rate * (h - h2)
        self.w += dw
        self.a += da
        self.b += db
        return dw, da, db
    
    def generate_sample(self, v=None, h=None, num_rounds=200):
        current_phase = 0 if v is not None else 1
        v = v if v is not None else np.random.binomial(1, 0.5, (1, self.num_visible)) * 2 - 1
        h = h if h is not None else np.random.binomial(1, 0.5, (1, self.num_hidden)) * 2 - 1
        for _ in range(num_rounds):
            if current_phase == 0:
                p = self.calculate_probabilities_h_given_v(v)
                h = np.random.binomial(1, p) * 2 - 1
            else:
                p = self.calculate_probabilities_v_given_h(h)
                v = np.random.binomial(1, p) * 2 - 1
            current_phase = (current_phase + 1) % 2
        return v, h

In [239]:
model = RBM(2, 2)
for _ in range(10000):
    model.train(np.matrix([[1, -1], [-1, 1]]), 0.1)
    
print(model.w)
print(model.a)
print(model.b)

[[-6.0353503  -4.47074231]
 [ 5.78564685  4.04338838]]
[[-0.43825892  0.04460119]]
[[ 0.50491522  0.20497371]]


In [240]:
Z = model.calculate_normalizing_constant_bruteforce()
print(model.calculate_probability(np.matrix([-1, -1]), Z))
print(model.calculate_probability(np.matrix([-1, 1]), Z))
print(model.calculate_probability(np.matrix([1, -1]), Z))
print(model.calculate_probability(np.matrix([1, 1]), Z))

[[  3.80517118e-09]]
[[ 0.91571475]]
[[ 0.08428525]]
[[  1.16966751e-09]]


In [182]:
v_log = []
for _ in range(1000):
    v, _ = model.generate_sample()
    v_log.append([v[0, 0], v[0, 1]])
v_log = np.matrix(v_log)

import operator
counts = {}
for r in range(v_log.shape[0]):
    pattern = str(v_log[r, :].tolist())
    if not pattern in counts:
        counts[pattern] = 0
    counts[pattern] += 1
    
for pattern, count in sorted(counts.items(), key=operator.itemgetter(1), reverse=True):
    print("%s:\t%d" % (pattern, count))

[[1, -1]]:	525
[[-1, 1]]:	468
[[1, 1]]:	5
[[-1, -1]]:	2


<itertools.product at 0x7f829bb57ab0>