# Restricted Boltzmann Machines with `scikit-learn`

Let's spend today looking at Restricted Boltzmann Machines (RBM's).  

Recall that RBM's learn to associate the ground states of an energy function with the training data.  This is similar to the behavior of generative models like (variational) AutoEncoders, GAN's, and so on that learn latent variables that capture the features of the training data.  In the case of RBM's, the hidden nodes play the role of the latent space, while the visible nodes represent both the input and output layers.  

We'll start by implementing our own simple RBM, but then we'll use the `scikit-learn` implementation of RBM's to look at denoising and image reconstruction.  

## Simple RBM

In [None]:
%matplotlib inline
import numpy as np
import scipy 
import matplotlib.pyplot as plt
import matplotlib
from sklearn.neural_network import BernoulliRBM
from sklearn.model_selection import train_test_split

from joblib import dump, load

from tensorflow.keras.datasets import mnist

matplotlib.rcParams['figure.dpi']=300 # highres display

from IPython.display import clear_output
from time import sleep

from torchvision.datasets import MNIST

### Helper functions

Perform a single step of the Markov chain, going from visible units $v$ to hidden units $h$, according to biases $b$ and weights $w$.
    
$$z_j = b_j + \sum_i v_i w_{ij}$$
    
and 
    
$$P(h_j=1|v) = \sigma(z_j)$$
    
Note: you can go from $h$ to $v$, by inserting instead of $v$ the $h$, instead of $b$ the $c$, and instead of $w$ the transpose of $w$.

In [None]:
def BoltzmannStep(v,b,w,do_random_sampling=True):
    batchsize=np.shape(v)[0]
    hidden_dim=np.shape(w)[1]
    z=b+np.dot(v,w)
    
    # sigmoid!
    P=1/(np.exp(-z)+1)
    
    # now, the usual trick to obtain 0 or 1 according
    # to a given probability distribution:
    # just produce uniform (in [0,1]) random numbers and
    # check whether they are below the cutoff given by P
    if do_random_sampling:
        p=np.random.uniform(size=[batchsize,hidden_dim])
        return(np.array(p<=P,dtype='int'))
    else:
        return(P) # no binary random output, just the prob. distribution itself!

The function below performs one sequence of steps $v \to h \to v' \to h'$ of a Boltzmann machine, with the given weights $w$ and biases $b$ and $c$.

All the arrays have a shape `[batchsize,num_neurons]` (where `num_neurons` is `num_visible` for $v$ and `num_hidden` for $h$).

You can set `drop_h_prime` to `True` if you want to use this routine to generate arbitrarily long sequences by calling it repeatedly (then don't use $h'$).

In [None]:
def BoltzmannSequence(v,b,c,w,
                      drop_h_prime=False,
                      do_random_sampling=True,
                      do_not_sample_h_prime=False,
                      do_not_sample_v_prime=False):

    # update h based on v
    h=BoltzmannStep(v,c,w,do_random_sampling=do_random_sampling)

    # sample v based on h, get v'
    if do_not_sample_v_prime:
        v_prime=BoltzmannStep(h,b,np.transpose(w),do_random_sampling=False)
    else:
        v_prime=BoltzmannStep(h,b,np.transpose(w),do_random_sampling=do_random_sampling)
    
    # sample h based on v', get h'
    if not drop_h_prime:
        if do_not_sample_h_prime: # G. Hinton recommends not sampling in the v'->h' step (reduces noise)
            h_prime=BoltzmannStep(v_prime,c,w,do_random_sampling=False)
        else:
            h_prime=BoltzmannStep(v_prime,c,w,do_random_sampling=do_random_sampling)
    else:
        h_prime=np.zeros(np.shape(h))
        
    # return original v along with corresponding h, v', h'
    return(v,h,v_prime,h_prime)

Given a set of randomly selected training samples $v$ (of shape `[batchsize,num_neurons_visible]`), and given biases $b,c$ and weights $w$: update those biases and weights according to the contrastive-divergence update rules:

$$\Delta w_{ij} = \eta \left( \langle v_i h_j\rangle - \epsilon\langle v'_i h'_j\rangle \right)\\
\Delta c_i  = \eta \left( \langle v_i\rangle - \epsilon\langle v'_i\rangle\right)\\
\Delta b_j  = \eta \left( \langle h_j\rangle - \epsilon\langle h'_j\rangle\right)
$$

Returns `delta_b`, `delta_c`, `delta_w`.

In [None]:
def trainStep(v,b,c,w,
              eta,
              epsilon=1.,
              do_random_sampling=True,
              do_not_sample_h_prime=False,
              do_not_sample_v_prime=False):
    
    # update v, h, v', h' based on random sampling
    v,h,v_prime,h_prime=BoltzmannSequence(v,b,c,w,
                                          do_random_sampling=do_random_sampling,
                                          do_not_sample_h_prime=do_not_sample_h_prime,
                                          do_not_sample_v_prime=do_not_sample_v_prime)
    
    # Check difference of random sampling results from training data, update weights and biases accordingly
    #       Positive phase                                   Negative phase
    db=eta*(np.average(v,axis=0)                       - epsilon*np.average(v_prime,axis=0))
    dc=eta*(np.average(h,axis=0)                       - epsilon*np.average(h_prime,axis=0))
    dw=eta*(np.average(v[:,:,None]*h[:,None,:],axis=0) - epsilon*np.average(v_prime[:,:,None]*h_prime[:,None,:],axis=0))
 
    return (db, dc, dw)

Produce `batchsize` samples, of length `num_visible`. Returns array $v$ of shape `[batchsize,num_visible]`.

Within the set of visible nodes, randomly places segments of '1's of size 2*`max_distance`.  So if `max_distance` is the default, then it will place a segment of length 6.

In [None]:
def produce_samples_random_segment(batchsize,num_visible,max_distance=3,debug=False):
    random_pos=num_visible*np.random.uniform(size=batchsize)
    if debug:
        print(random_pos)
        print(random_pos[:,None])
    j=np.arange(0,num_visible)

    retval=np.array( np.abs(j[None,:]-random_pos[:,None])<=max_distance, dtype='int' )
    
    if debug:
        print(np.array( np.abs(j[None,:]-random_pos[:,None])))
    
    return( retval )

Let's just check to make sure we understand the samples that get produced with this function, using a batch size of 1:

In [None]:
produce_samples_random_segment(1,20,max_distance=3,debug=True)

### RBM Training

Now we're finally ready to do the training.  Initialize some hyperparameters and the assign random values as starting points for vectors $b$, $c$, and matrix $w$:

In [None]:
num_visible=20
num_hidden=10
eta=0.1
epsilon=1
nsteps=int(500/epsilon) # increase training sample size with unlearning rate
batchsize=50 
skipsteps=int(10/epsilon)

b=np.random.randn(num_visible)
c=np.random.randn(num_hidden)
w=np.random.randn(num_visible,num_hidden)

test_samples=np.zeros([num_visible,nsteps])

Now produce some random assortments of bits that will represent our training data, and train the RBM on those samples as we get them:

In [None]:
for j in range(nsteps):
    
    # produce the samples
    v=produce_samples_random_segment(batchsize,num_visible)
    
    # train the RBM on the samples
    db,dc,dw=trainStep(v,b,c,w,eta,epsilon=epsilon)
    
    # update the parameters for the next round of training
    b+=db
    c+=dc
    w+=dw
    
    # add the visible nodes from the sampling to the test samples...
    # not strictly necessary, but we'll visualize the test samples below.
    test_samples[:,j]=v[0,:]
    
    # plot some of the samples below
    if j%skipsteps==0 or j==nsteps-1:
        clear_output(wait=True)
        plt.figure(figsize=(10,1))
        plt.imshow(test_samples,origin='lower',aspect='auto',interpolation='none')
        plt.axis('off')
        plt.show()

Now: visualize the typical samples generated (from some starting point).  Run several times to continue this. It basically is a random walk through the space of all possible configurations, hopefully according to the probability distribution that has been learned in training.

Note in this case we're explicitly *not* updating $h$, since we're not training anymore!  We just allow the visible nodes $v$ to interact with the latent space $h$ to evolve $v$ as we step through.  This implies that we should get some continuous-ish behavior of the output.

In [None]:
nsteps=100
test_samples=np.zeros([num_visible,nsteps])

v_prime=np.zeros(num_visible)
h=np.zeros(num_hidden)
h_prime=np.zeros(num_hidden)

for j in range(nsteps):
    v,h,v_prime,h_prime=BoltzmannSequence(v,b,c,w,drop_h_prime=True) # step from v via h to v_prime!
    test_samples[:,j]=v[0,:]
    v=np.copy(v_prime) # use the new v as a starting point for next step!
    if j%skipsteps==0 or j==nsteps-1:
        clear_output(wait=True)
        plt.imshow(test_samples,origin='lower',interpolation='none')
        plt.show()

Note that this is entirely based on *sampling*, not backpropagation, so the algorithm will behave differently overall from the neural nets that we're used to.  

## RBM's in `scikit-learn`

We'll use the `scikit-learn` implementation of RBM's to look at denoising and image reconstruction.  Let's get some MNIST data to use for training and testing:

In [None]:
# Dataset
(X_train, X_test), (X_test, Y_test) = mnist.load_data()

X_train = X_train.reshape(-1, 784)/255
X_test = X_test.reshape(-1, 784)/255

# Apply a threshold to binarize the image.
X_train = np.where(X_train > 0.2, 1, 0)
X_test = np.where(X_test > 0.2, 1, 0)

# Split into training and validation sets
X_train, X_val = train_test_split(X_train, test_size=1/5,random_state=42)

Let's do our usual printout of a few figures in MNIST to remind ourselves of what the images are:

In [None]:
plt.figure(figsize=(8,4))

for i in range(10):
  plt.subplot(2,5,i+1)
  plt.xticks([])
  plt.yticks([])
  plt.grid(False)
  plt.imshow(X_train[i].reshape(28,28), cmap='Greys')
plt.tight_layout()

### Training

I pre-trained a model last night, since it takes a bit of time.  The `scikit-learn` RBM model evaluates a "pseudo-likelihood" as the equivalent of a loss function to determine convergence.

In [None]:
try:
    # If you have access to our pretrained model
    rbm = load('rbm.joblib')  
    print("RBM Reloaded")
except:
    rbm = BernoulliRBM(random_state=0, n_components=80,
                       verbose=True, batch_size=20, n_iter=60, learning_rate=0.01)

    rbm.fit(X_train)
    dump(rbm, 'rbm.joblib')

print("Training set Pseudo-Likelihood =", rbm.score_samples(X_train).mean())
print("Validation set Pseudo-Likelihood =", rbm.score_samples(X_val).mean())

### Denoising

Now let's do a de-noising example.  Take an example from the test set and make it noisy.

In [None]:
# Pick a random image from the test set
im_ind = 23
X_pick = X_test[im_ind]

# Choose 50 random pixels to flip
pick = np.random.choice(28 * 28, 50)
x_noisy = np.copy(X_pick)
x_noisy[pick] = ((X_pick[pick] + 1) % 2)

# Plotting the images
fig, ax = plt.subplots(1, 2, figsize=(6, 3))
ax[0].imshow(X_pick.reshape(28, 28), cmap='Greys')
ax[0].set_title('Original')
ax[1].imshow(x_noisy.reshape(28, 28), cmap='Greys')
ax[1].set_title('Corrupted')

for i in range(2):
    ax[i].set_xticks([])
    ax[i].set_yticks([])
plt.show()

Now pass the noisy image into the RBM, which has already been trained, so we're only going to do the Gibbs sampling here to update the visible nodes.  

In [None]:
# Perform the denoising
k_iter = 12  # Number of Gibbs Sampling Iterations
alpha = 0.9  # Decay factor for the averaging

# Gibb sampling steps.  Do the first iteration based on the noisy image:
v = rbm.gibbs(x_noisy)
x_final = np.zeros(784) + np.copy(v)
fig,ax=plt.subplots(1,k_iter,figsize=(20,10))
for i in range(k_iter):
    x_slice = np.where(x_final > 0.5*np.max(x_final),1,0)
    ax[i].imshow(x_slice.reshape(28,28),cmap='Greys')
    ax[i].set_xticks([])
    ax[i].set_yticks([])
    
    # update the image with each iteration.
    v = rbm.gibbs(v)
    
    # Averaging the images
    x_final += (alpha**(i+1))*v.astype(float) 
plt.show()

In [None]:
# Applying a threshold to binarize the image
x_final = np.where(x_final > 0.5*np.max(x_final), 1, 0)

plt.imshow(x_final.reshape(28, 28), cmap='Greys')
plt.show()

**Exercise**: play with the noise levels, test images, and more -- how far can we push this?

### Missing image reconstruction

Finally, let's see if we can reconstruct an image where part of the image is missing entirely.

In [None]:
# Pick another random image and set some parts of the image to zero.
im_ind = 30
X_missing = X_test[im_ind].copy().reshape(28,28)
X_missing[:,15:] = 0

# Image Reconstruction
k_iter = 100 # Number of Gibbs iterations
alpha = 0.9  # Decay factor

X_recon = np.zeros((28,13)) # Array to store the reconstruction

b = X_missing.copy().reshape(-1)
for i in range(k_iter):
    b = rbm.gibbs(b)
    X_recon += alpha**(i) * b.reshape(28,28)[:,15:]
    b.reshape(28,28)[:,:15] = X_missing[:,:15]

# Apply a threshold and complete the image
X_recon = np.where(X_recon > 0.5*np.max(X_recon), 1, 0)
X_complete = X_missing.copy()
X_complete[:,15:] = X_recon

# Plot the figures
fig, ax = plt.subplots(1, 3, figsize=(10, 3))
ax[0].imshow(X_test[im_ind].reshape(28, 28), cmap='Greys')
ax[0].set_title('Original')
ax[1].imshow(X_missing, cmap='Greys')
ax[1].set_title('Corrupted')
ax[2].imshow(X_complete.reshape(28,28), cmap='Greys')
ax[2].set_title('Reconstructed')
for i in range(3):
    ax[i].set_xticks([])
    ax[i].set_yticks([])
plt.show()

**Exercise**: How much of the image can we chop away?  Does it matter which part of the image we remove?  How many iterations do we need to get back to a good sample?