In [None]:
'''
 * Copyright (c) 2018 Radhamadhab Dalai
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
'''

In [1]:
# Contrastive Divergence Learning



import numpy as np

class RBM:
    def __init__(self, num_visible, num_hidden):
        self.num_visible = num_visible
        self.num_hidden = num_hidden
        self.W = np.random.normal(0, 0.1, size=(num_visible, num_hidden))  # Weight matrix
        self.b = np.zeros(num_visible)  # Visible biases
        self.c = np.zeros(num_hidden)   # Hidden biases

    def sigmoid(self, x):
        return 1.0 / (1 + np.exp(-x))

    def sample_hidden(self, visible):
        hidden_activations = self.sigmoid(np.dot(visible, self.W) + self.c)
        hidden_states = np.random.binomial(1, hidden_activations)
        return hidden_activations, hidden_states

    def sample_visible(self, hidden):
        visible_activations = self.sigmoid(np.dot(hidden, self.W.T) + self.b)
        visible_states = np.random.binomial(1, visible_activations)
        return visible_activations, visible_states

    def contrastive_divergence(self, data, learning_rate=0.1, epochs=100, batch_size=10, k=1):
        num_samples = data.shape[0]
        for epoch in range(epochs):
            np.random.shuffle(data)
            for i in range(0, num_samples, batch_size):
                batch = data[i:i+batch_size]
                self.update_weights(batch, learning_rate, k)

            if epoch % 10 == 0:
                print(f"Epoch {epoch+1} complete. Free energy: {self.free_energy(data)}")

    def update_weights(self, batch, learning_rate, k):
        v0 = batch
        h0_prob, h0_sample = self.sample_hidden(v0)

        vk = v0
        hk = h0_sample
        for _ in range(k):
            vk_prob, vk_sample = self.sample_visible(hk)
            hk_prob, hk = self.sample_hidden(vk_sample)

        positive_grad = np.dot(v0.T, h0_prob)
        negative_grad = np.dot(vk_sample.T, hk_prob)

        self.W += learning_rate * (positive_grad - negative_grad) / batch.shape[0]
        self.b += learning_rate * np.mean(v0 - vk_sample, axis=0)
        self.c += learning_rate * np.mean(h0_prob - hk_prob, axis=0)

    def free_energy(self, v):
        vbias_term = np.dot(v, self.b)
        wx_b = np.dot(v, self.W) + self.c
        hidden_term = np.sum(np.log(1 + np.exp(wx_b)), axis=1)
        return -hidden_term - vbias_term


In [2]:
# Example usage
num_visible = 6
num_hidden = 3
rbm = RBM(num_visible, num_hidden)

# Assuming data is your training dataset, shape (num_samples, num_visible)
data = np.random.binomial(1, 0.5, size=(100, num_visible))  # Dummy data

rbm.contrastive_divergence(data, learning_rate=0.1, epochs=100, batch_size=10, k=1)


Epoch 1 complete. Free energy: [-1.74022503 -1.92157537 -1.75184045 -1.803132   -2.1743541  -2.03300667
 -1.83326608 -1.77516069 -1.84183904 -1.92731382 -1.86881865 -2.1382904
 -2.00528705 -2.07877459 -1.91484734 -1.92266732 -1.8222435  -1.91102561
 -1.85732583 -2.03300667 -1.93297515 -1.74022503 -2.0028521  -1.78899529
 -1.89677059 -2.08686085 -2.00749652 -1.93751805 -2.03300667 -2.1743541
 -1.96309149 -2.04022157 -1.86881865 -2.17751814 -2.17751814 -1.96812387
 -2.0849842  -2.00528705 -1.71477736 -1.94760785 -2.03300667 -1.92266732
 -2.00528705 -1.85732583 -1.77516069 -2.08686085 -2.09555477 -1.69760517
 -1.75485715 -1.80893841 -1.92731382 -1.66408322 -1.87746942 -1.87746942
 -2.00528705 -1.9746386  -1.84183904 -1.69760517 -2.04637275 -2.1743541
 -2.03300667 -1.92157537 -2.08686085 -1.803132   -1.8222435  -1.75485715
 -2.0491449  -1.66408322 -1.79566977 -1.8311864  -1.76889929 -2.1382904
 -1.92157537 -1.8311864  -2.0849842  -1.93297515 -1.95645014 -1.9746386
 -2.0491449  -2.04637275 

## Algorithm 7.3: CD1 Fast RBM Learning Algorithm

1. **Input**: A training sample $ x_0 $, the number of hidden layer units m, learning rate $ \eta $, maximum training period T .

2. **Initialization**: 
   - Set the initial state of the visible layer unit $ x_1 = x_0 $.
   - Randomly initialize W , b, and c.

3. **For t = 1 to T do**
   1. **For \( j = 1 \) to \( m \) do** (for all hidden units)
      1. Calculate \( P(h_1^j = 1 | x_1) = \sigma \left(c_j + \sum_i W_{ij} x_1^i \right) \)
   2. **End for**
   
   3. Construct \( P(h_1 = 1 | x_1) = \left[ P(h_1^1 = 1 | x_1), \ldots, P(h_1^m = 1 | x_1) \right]^T \)

   4. **For \( i = 1 \) to \( n \) do** (for all visible units)
      1. Calculate \( P(x_2^i = 1 | h_1) = \sigma \left( b_i + \sum_j W_{ij} h_1^j \right) \)
   5. **End for**

   6. **For \( j = 1 \) to \( m \) do** (for all hidden units)
      1. Calculate \( P(h_2^j = 1 | x_2) = \sigma \left( c_j + \sum_i W_{ij} x_2^i \right) \)
   7. **End for**

   8. Construct \( P(h_2 = 1 | x_2) = \left[ P(h_2^1 = 1 | x_2), \ldots, P(h_2^m = 1 | x_2) \right]^T \)

   9. Update weights and biases:
      - \( W \leftarrow W + \eta \left( P(h_1 = 1 | x_1) x_1^T - P(h_2 = 1 | x_2) x_2^T \right) \)
      - \( b \leftarrow b + \eta (x_1 - x_2) \)
      - \( c \leftarrow c + \eta \left( P(h_1 = 1 | x_1) - P(h_2 = 1 | x_2) \right) \)

4. **End for**

5. **Output**: \( W \), \( b \), \( c \)


In [4]:
import numpy as np

class RBM:
    def __init__(self, n_visible, n_hidden, learning_rate=0.1):
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.learning_rate = learning_rate
        
        # Initialize weights and biases
        self.W = np.random.randn(n_visible, n_hidden) * 0.01
        self.b = np.zeros(n_visible)
        self.c = np.zeros(n_hidden)
    
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    
    def train(self, data, max_epochs=1000):
        for epoch in range(max_epochs):
            for x0 in data:
                # Positive phase
                h1_prob = self.sigmoid(self.c + np.dot(x0, self.W))
                h1 = (np.random.rand(self.n_hidden) < h1_prob).astype(np.float32)
                
                # Negative phase
                x2_prob = self.sigmoid(self.b + np.dot(h1, self.W.T))
                x2 = (np.random.rand(self.n_visible) < x2_prob).astype(np.float32)
                
                h2_prob = self.sigmoid(self.c + np.dot(x2, self.W))
                
                # Update weights and biases
                self.W += self.learning_rate * (np.outer(x0, h1_prob) - np.outer(x2, h2_prob))
                self.b += self.learning_rate * (x0 - x2)
                self.c += self.learning_rate * (h1_prob - h2_prob)
            
            if epoch % 100 == 0:
                error = np.mean(np.square(data - self.reconstruct(data)))
                print(f'Epoch {epoch}, Reconstruction error: {error}')
    
    def reconstruct(self, data):
        h = self.sigmoid(self.c + np.dot(data, self.W))
        reconstructed_data = self.sigmoid(self.b + np.dot(h, self.W.T))
        return reconstructed_data

# Example usage:
# Create some binary data for training
data = np.random.randint(2, size=(100, 6))

# Initialize RBM with 6 visible units and 2 hidden units
rbm = RBM(n_visible=6, n_hidden=2)

# Train RBM
rbm.train(data, max_epochs=1000)


Epoch 0, Reconstruction error: 0.25020727745176286
Epoch 100, Reconstruction error: 0.19526593913238244
Epoch 200, Reconstruction error: 0.19113734728849557
Epoch 300, Reconstruction error: 0.1792198589607437
Epoch 400, Reconstruction error: 0.18320924647445982
Epoch 500, Reconstruction error: 0.18139787772999202
Epoch 600, Reconstruction error: 0.17679489427426476
Epoch 700, Reconstruction error: 0.1784778090162915
Epoch 800, Reconstruction error: 0.18079410722245873
Epoch 900, Reconstruction error: 0.17693185260783437


In [5]:
# Multiple Restricted Boltzmann Machines
import numpy as np

class RBM:
    def __init__(self, n_visible, n_hidden, learning_rate=0.1):
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.learning_rate = learning_rate
        
        # Initialize weights and biases
        self.W = np.random.randn(n_visible, n_hidden) * 0.01
        self.b = np.zeros(n_visible)
        self.c = np.zeros(n_hidden)
    
    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))
    
    def train(self, data, max_epochs=1000):
        for epoch in range(max_epochs):
            for x0 in data:
                # Positive phase
                h1_prob = self.sigmoid(self.c + np.dot(x0, self.W))
                h1 = (np.random.rand(self.n_hidden) < h1_prob).astype(np.float32)
                
                # Negative phase
                x2_prob = self.sigmoid(self.b + np.dot(h1, self.W.T))
                x2 = (np.random.rand(self.n_visible) < x2_prob).astype(np.float32)
                
                h2_prob = self.sigmoid(self.c + np.dot(x2, self.W))
                
                # Update weights and biases
                self.W += self.learning_rate * (np.outer(x0, h1_prob) - np.outer(x2, h2_prob))
                self.b += self.learning_rate * (x0 - x2)
                self.c += self.learning_rate * (h1_prob - h2_prob)
            
            if epoch % 100 == 0:
                error = np.mean(np.square(data - self.reconstruct(data)))
                print(f'Epoch {epoch}, Reconstruction error: {error}')
    
    def reconstruct(self, data):
        h = self.sigmoid(self.c + np.dot(data, self.W))
        reconstructed_data = self.sigmoid(self.b + np.dot(h, self.W.T))
        return reconstructed_data

    def transform(self, data):
        return self.sigmoid(self.c + np.dot(data, self.W))

class DBN:
    def __init__(self, layer_sizes, learning_rate=0.1, max_epochs=1000):
        self.layer_sizes = layer_sizes
        self.learning_rate = learning_rate
        self.max_epochs = max_epochs
        self.rbms = []
        self._build_rbms()
    
    def _build_rbms(self):
        for i in range(len(self.layer_sizes) - 1):
            rbm = RBM(self.layer_sizes[i], self.layer_sizes[i+1], self.learning_rate)
            self.rbms.append(rbm)
    
    def pretrain(self, data):
        input_data = data
        for i, rbm in enumerate(self.rbms):
            print(f'Training RBM layer {i+1}/{len(self.rbms)} with {rbm.n_visible} visible units and {rbm.n_hidden} hidden units')
            rbm.train(input_data, self.max_epochs)
            input_data = rbm.transform(input_data)
    
    def reconstruct(self, data):
        transformed_data = data
        for rbm in self.rbms:
            transformed_data = rbm.transform(transformed_data)
        reconstructed_data = transformed_data
        for rbm in reversed(self.rbms):
            reconstructed_data = rbm.reconstruct(reconstructed_data)
        return reconstructed_data

# Example usage:
# Create some binary data for training
data = np.random.randint(2, size=(100, 6))

# Define layer sizes: 6 visible units, 4 hidden units in the first RBM, and 3 hidden units in the second RBM
layer_sizes = [6, 4, 3]

# Initialize DBN
dbn = DBN(layer_sizes, learning_rate=0.1, max_epochs=1000)

# Pretrain DBN
dbn.pretrain(data)

# Reconstruct data using the trained DBN
reconstructed_data = dbn.reconstruct(data)


Training RBM layer 1/2 with 6 visible units and 4 hidden units
Epoch 0, Reconstruction error: 0.25256792754457763
Epoch 100, Reconstruction error: 0.22537702293600176
Epoch 200, Reconstruction error: 0.2250872664629361
Epoch 300, Reconstruction error: 0.18005507358416376
Epoch 400, Reconstruction error: 0.16843223817423028
Epoch 500, Reconstruction error: 0.1582156608374342
Epoch 600, Reconstruction error: 0.16152565468936741
Epoch 700, Reconstruction error: 0.1619548377002168
Epoch 800, Reconstruction error: 0.16409116193986678
Epoch 900, Reconstruction error: 0.1594841465762499
Training RBM layer 2/2 with 4 visible units and 3 hidden units
Epoch 0, Reconstruction error: 0.05755355447779868
Epoch 100, Reconstruction error: 0.061761361568826105
Epoch 200, Reconstruction error: 0.056338898501938746
Epoch 300, Reconstruction error: 0.05641870923556357
Epoch 400, Reconstruction error: 0.05725537266683869
Epoch 500, Reconstruction error: 0.05796535462960344
Epoch 600, Reconstruction error:

ValueError: shapes (100,3) and (4,3) not aligned: 3 (dim 1) != 4 (dim 0)

In the above discussion, the observed “visible” unit is denoted by the binary vector
$$\mathbf{x} = [x_1, \ldots, x_m]^T$$. Now, consider the case where we have a set of \( m \) user-rated
movies or commodities. Suppose the user rated movie \( i \) as \( k \), where \( k \in \{1, \ldots, K\} \).
Then the observed “visible” binary rating matrix is defined as
$$
\mathbf{X} = \begin{bmatrix}
x_{11} & x_{12} & \cdots & x_{1m} \\
x_{21} & x_{22} & \cdots & x_{2m} \\
\vdots & \vdots & \ddots & \vdots \\
x_{K1} & x_{K2} & \cdots & x_{Km}
\end{bmatrix} \in \mathbb{R}^{K \times m}
$$
where the \((k, i)\)th entry is given by
$$
x_{ik} = \begin{cases}
1, & \text{if the user rated movie } i \text{ as } k; \\
0, & \text{otherwise}.
\end{cases}
$$
Therefore, we have \( K \) restricted Boltzmann machines with binary hidden units
and softmax visible units. For each user, the RBM only includes softmax units for
the movies that user has rated. All of the \( K \) RBMs have the same binary values
of hidden (latent) variables \( \mathbf{h} \).
If using a conditional multinomial distribution (a “softmax”) for modeling each
column of the observed visible binary rating matrix \( \mathbf{X} \) and a conditional Bernoulli
distribution for modeling hidden user features \( \mathbf{h} \), then
$$
p(x_{ik} = 1 | \mathbf{h}) = \frac{\exp(b_{ik} + \sum_{j=1}^{F} h_j W_{ij}^k)}{\sum_{l=1}^{K} \exp(b_{il} + \sum_{j=1}^{F} h_j W_{ij}^l)},
$$
where \( W_{ij}^k \) is a symmetric interaction parameter between feature and rating \( k \) of
movie \( i \), \( b_{ik} \) is the bias of rating \( k \) for movie \( i \), and \( c_j \) is the bias of feature \( j \). Note
that the \( b_{ik} \) can be initialized to the logs of their respective base rates over all users.
The marginal distribution over the visible ratings \( \mathbf{X} \) is
$$
p(\mathbf{X}) = \sum_{\mathbf{h}} \frac{\exp(-E(\mathbf{X}, \mathbf{h}))}{\sum_{\mathbf{X}', \mathbf{h}'} \exp(-E(\mathbf{X}', \mathbf{h}'))}
$$
with an “energy” term given by
$$
E(\mathbf{X}, \mathbf{h}) = - \sum_{i=1}^{m} \sum_{j=1}^{F} \sum_{k=1}^{K} W_{ij}^k h_j x_{ik} + \sum_{i=1}^{m} \log(Z_i) - \sum_{i=1}^{m} \sum_{k=1}^{K} x_{ik} b_{ik} - \sum_{j=1}^{F} h_j c_j,
$$
where
$$
Z_i = \sum_{l=1}^{K} \exp(b_{il} + \sum_{j=1}^{F} h_j W_{ij}^l)
$$
is the normalization term that ensures that \( \sum_{l=1}^{K} p(x_i = 1|h) = 1 \). The movies with missing ratings do not make any
contribution to the energy function.
The symmetric interaction matrix \( \mathbf{W} \) is updated in gradient ascent form as \( \mathbf{W} \leftarrow \mathbf{W} + \eta \Delta \mathbf{W} \), where
$$
\Delta \mathbf{W} = \left\langle x_{ik} h_j \right\rangle_{\text{data}} - \left\langle x_{ik} h_j \right\rangle_{\text{model}},
$$
where the expectation \( \left\langle x_{ik} h_j \right\rangle_{\text{data}} \) defines the frequency with which movie \( i \) with
rating \( k \) and feature \( j \) are on together when the feature detectors are being driven
by the observed user-rating data from the training set using Eq. (7.6.42), and
\( \left\langle x_{ik} h_j \right\rangle_{\text{model}} \) is the corresponding frequency when the hidden units are being driven
by reconstructed images.
To avoid computing \( \left\langle \cdot \right\rangle_{\text{model}} \), Salakhutdinov et al. proposed to follow an
approximation to the gradient of a different objective function called “contrastive
divergence” (CD):
$$
\Delta \mathbf{W} = \left\langle x_{ik} h_j \right\rangle_{\text{data}} - \left\langle x_{ik} h_j \right\rangle_{T}.
$$
The expectation \( \left\langle \cdot \right\rangle_{T} \) represents a distribution of samples from running the Gibbs
sampler (Eqs. (7.6.41) and (7.6.42)), initialized at the data, for \( T \) full steps. \( T \)
is typically set to one at the beginning of learning and increased as the learning
converges.
Restricted Boltzmann machines discussed above are assumed to use binary
visible and hidden units, but many other types of unit can also be used. The main
use of other types of unit is for dealing with data that is not well-modeled by binary
(or logistic) visible units.
The following are two typical units that can be used in restricted Boltzmann
machines:
1. **Softmax and multinomial units**: For a binary unit, the probability of turning on is
given by the logistic sigmoid function of its total input \( x \):
$$
\sigma(x) = \frac{1}{1 + \exp(-x)}.
$$
The energy contributed by the unit is \( -x \) if it is on and 0 if it is off. This logistic
sigmoid function of two states can be generalized to \( K \) alternative states, i.e.,
$$
p_j = \frac{\exp(x_j)}{\sum_{i=1}^{K} \exp(x_i)},
$$
which is often called a “softmax” unit. A further generalization of the softmax
unit is to sample \( N \) times (with replacement) from the probability distribution
instead of just sampling once. The \( K \) different states can then have integer values
bigger than 1, but the values must add to \( N \). This is called a multinomial unit and
the learning rule is again unchanged.
2. **Gaussian visible units**: The binary visible units are replaced by linear units with
independent Gaussian noise. In this case, the energy function becomes
$$
E(\mathbf{x}, \mathbf{h}) = \sum_{j \in \text{visible}} \frac{(x_j - b_j)^2}{2\sigma_j^2} - \sum_{i \in \text{hidden}} c_i h_i - \sum_{i, j} \frac{h_i x_j W_{ij}}{\sigma_j},
$$
where \( \sigma_i \) is the standard deviation of the Gaussian noise for visible unit \( i \).

In [6]:
import numpy as np

class RBM:
    def __init__(self, n_visible, n_hidden, learning_rate=0.01, n_iterations=1000):
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.learning_rate = learning_rate
        self.n_iterations = n_iterations
        self.W = np.random.randn(n_visible, n_hidden) * 0.1
        self.b = np.zeros(n_visible)
        self.c = np.zeros(n_hidden)

    def sigmoid(self, x):
        return 1 / (1 + np.exp(-x))

    def train(self, data):
        for epoch in range(self.n_iterations):
            for x in data:
                x = x.reshape((1, self.n_visible))  # Ensure x is a row vector

                # Positive phase
                h_prob = self.sigmoid(np.dot(x, self.W) + self.c)
                h_state = (h_prob > np.random.rand(1, self.n_hidden)).astype(np.float32)

                # Negative phase
                x_reconstructed_prob = self.sigmoid(np.dot(h_state, self.W.T) + self.b)
                x_reconstructed_state = (x_reconstructed_prob > np.random.rand(1, self.n_visible)).astype(np.float32)

                h_reconstructed_prob = self.sigmoid(np.dot(x_reconstructed_state, self.W) + self.c)

                # Update weights and biases
                self.W += self.learning_rate * (np.dot(x.T, h_prob) - np.dot(x_reconstructed_state.T, h_reconstructed_prob))
                self.b += self.learning_rate * (x - x_reconstructed_state).flatten()
                self.c += self.learning_rate * (h_prob - h_reconstructed_prob).flatten()

            if epoch % 100 == 0:
                error = np.mean((data - self.reconstruct(data)) ** 2)
                print(f'Epoch: {epoch}, Error: {error}')

    def reconstruct(self, data):
        h_prob = self.sigmoid(np.dot(data, self.W) + self.c)
        x_reconstructed_prob = self.sigmoid(np.dot(h_prob, self.W.T) + self.b)
        return x_reconstructed_prob

# Example usage
if __name__ == "__main__":
    # Generate some random binary data
    np.random.seed(0)
    data = np.random.randint(2, size=(100, 6))  # 100 samples, 6 visible units

    rbm = RBM(n_visible=6, n_hidden=3, learning_rate=0.1, n_iterations=1000)
    rbm.train(data)
    
    # Reconstruct the data
    reconstructed_data = rbm.reconstruct(data)
    print(reconstructed_data)


Epoch: 0, Error: 0.2516118916840019
Epoch: 100, Error: 0.2100911991787205
Epoch: 200, Error: 0.17097789024114401
Epoch: 300, Error: 0.1743201215459293
Epoch: 400, Error: 0.17543284017863472
Epoch: 500, Error: 0.16911788950219805
Epoch: 600, Error: 0.16847397695502725
Epoch: 700, Error: 0.1631449462565153
Epoch: 800, Error: 0.17004938994255508
Epoch: 900, Error: 0.1696976419625688
[[0.54273148 0.52167021 0.75622622 0.47379943 0.6223005  0.36373293]
 [0.5501198  0.52491673 0.76816804 0.47514922 0.62169175 0.3540986 ]
 [0.12268186 0.75464447 0.35179849 0.73827165 0.10278767 0.10575589]
 [0.43347516 0.55603284 0.67441495 0.51355194 0.53572903 0.33493186]
 [0.54263147 0.52833697 0.76519059 0.47838252 0.6158852  0.35026675]
 [0.54724742 0.52375173 0.76403208 0.47457224 0.6223705  0.35778298]
 [0.54016275 0.52280166 0.7545462  0.4750792  0.61944895 0.36223455]
 [0.54447509 0.52269214 0.75956962 0.47424302 0.62213869 0.36099225]
 [0.32168784 0.41862602 0.26427066 0.43654352 0.61493032 0.673944