In [2]:
import time
import logging
import numpy as np
import matplotlib.pyplot as plt
from argparse import ArgumentParser

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("")
formatter = logging.Formatter('%(message)s')

class RestrictedBoltzmannMachine:
    def __init__(self, n_visible, n_hidden, activation, learning_rate=0.1, n_epochs=1000, batch_size=10, decay_rate=0.99):
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.learning_rate = learning_rate
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.decay_rate = decay_rate
        self.activation = activation

        # Initialize weights and biases
        self.weights = np.random.uniform(-0.1, 0.1, (n_visible, n_hidden))
        self.visible_bias = np.zeros(n_visible)
        self.hidden_bias = np.zeros(n_hidden)

    def activate(self, x):
        """Apply the selected activation function."""
        if self.activation == "sigmoid":
            return 1 / (1 + np.exp(-x))
        elif self.activation == "relu":
            return np.maximum(0, x)
        elif self.activation == "leaky_relu":
            return np.where(x > 0, x, 0.01 * x)
        else:
            raise ValueError("Unsupported activation function")
        
    def sample_probabilities(self, probs):
        """Sample binary states based on probabilities."""
        return (np.random.rand(*probs.shape) < probs).astype(np.float32)

    def contrastive_divergence(self, data):
        """Perform one step of contrastive divergence."""
        # Positive phase
        pos_hidden_probs = self.activate(np.dot(data, self.weights) + self.hidden_bias)
        pos_hidden_states = self.sample_probabilities(pos_hidden_probs)
        pos_associations = np.dot(data.T, pos_hidden_probs)

        # Negative phase
        neg_visible_probs = self.activate(np.dot(pos_hidden_states, self.weights.T) + self.visible_bias)
        neg_hidden_probs = self.activate(np.dot(neg_visible_probs, self.weights) + self.hidden_bias)
        neg_associations = np.dot(neg_visible_probs.T, neg_hidden_probs)

        # Update weights and biases
        self.weights += self.learning_rate * (pos_associations - neg_associations) / data.shape[0]
        self.visible_bias += self.learning_rate * np.mean(data - neg_visible_probs, axis=0)
        self.hidden_bias += self.learning_rate * np.mean(pos_hidden_probs - neg_hidden_probs, axis=0)

    def train(self, data):
        """Train the RBM using the provided data."""
        total_times = []
        errors = []
        for epoch in range(self.n_epochs):
            np.random.shuffle(data)
            start_time = time.time()
            for i in range(0, data.shape[0], self.batch_size):
                batch = data[i:i + self.batch_size]
                self.contrastive_divergence(batch)

            elapsed_time = time.time() - start_time
            error = np.mean((data - self.reconstruct(data)) ** 2)

            total_times.append(elapsed_time)
            errors.append(error)

            # Apply learning rate decay
            self.learning_rate *= self.decay_rate

            # Calculate reconstruction error
            if (epoch + 1) % 100 == 0:
                logger.info(f"Epoch {epoch + 1}/{self.n_epochs}, Reconstruction Error: {error:.4f}, Elapsed Time: {elapsed_time:.4f}")

        logger.info(f"Average Error: {np.mean(errors)} Average Epoch Time: {np.mean(total_times)}")

    def reconstruct(self, data):
        """Reconstruct visible units from hidden units."""
        hidden_probs = self.activate(np.dot(data, self.weights) + self.hidden_bias)
        visible_probs = self.activate(np.dot(hidden_probs, self.weights.T) + self.visible_bias)
        return visible_probs

    def visualize_weights(self):
        """Visualize weights as a heatmap."""
        plt.figure(figsize=(10, 8))
        plt.imshow(self.weights, cmap='viridis', aspect='auto')
        plt.colorbar(label="Weight Magnitude")
        plt.title("Weight Heatmap")
        plt.xlabel("Hidden Units")
        plt.ylabel("Visible Units")
        plt.show()

def generate_numerals():
    """Generate 10x10 binary arrays representing the digits 0-7."""
    numerals = [
        # Digit 0
        np.array([
            [0, 1, 1, 1, 1, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [0, 1, 1, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ]),
        # Digit 1
        np.array([
            [0, 0, 1, 1, 0, 0, 0, 0, 0, 0],
            [0, 1, 1, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 1, 1, 1, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ]),
        # Digit 2
        np.array([
            [0, 1, 1, 1, 1, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 1, 0, 0, 0, 0, 0, 0],
            [0, 1, 1, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 1, 1, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ]),
        # Digit 3
        np.array([
            [0, 1, 1, 1, 1, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 1, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 1, 1, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [0, 1, 1, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ]),
        # Digit 4
        np.array([
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 1, 1, 0, 0, 0, 0, 0],
            [0, 1, 1, 0, 1, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 0, 0, 0, 0, 0],
            [1, 1, 1, 1, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ]),
        # Digit 5
        np.array([
            [1, 1, 1, 1, 1, 1, 0, 0, 0, 0],
            [1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 1, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 1, 1, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 1, 1, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [0, 1, 1, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ]),
        # Digit 6
        np.array([
            [0, 0, 1, 1, 1, 1, 0, 0, 0, 0],
            [0, 1, 1, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 1, 1, 1, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0, 0, 0],
            [1, 1, 0, 0, 0, 1, 1, 0, 0, 0],
            [1, 1, 0, 0, 0, 0, 1, 1, 0, 0],
            [0, 1, 1, 0, 0, 1, 1, 0, 0, 0],
            [0, 0, 1, 1, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ]),
        # Digit 7
        np.array([
            [1, 1, 1, 1, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 0, 1, 1, 0, 0, 0, 0],
            [0, 0, 0, 1, 1, 0, 0, 0, 0, 0],
            [0, 0, 1, 1, 0, 0, 0, 0, 0, 0],
            [0, 1, 1, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
            [1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
        ]),
    ]

    # Flatten each 10x10 array into a 1D array of size 100
    flattened_numerals = [numeral.flatten() for numeral in numerals]
    return np.array(flattened_numerals)

def add_custom_noise(data, noise_level=0.2):
    noisy_data = data.copy()
    noise = np.random.binomial(1, noise_level, data.shape)
    noisy_data = np.abs(noisy_data - noise)  # Flip bits based on noise
    return noisy_data

# Function to binarize the reconstructed data to ensure black and white output
def binarize_data(data, threshold=0.5):
    """Convert probabilities to binary values (0 or 1) based on a threshold."""
    return (data >= threshold).astype(np.float32)

# Function to calculate reconstruction error
def calculate_reconstruction_error(original, reconstructed):
    """Calculate the mean squared error between original and reconstructed data."""
    return np.mean((original - reconstructed) ** 2)

# Function to calculate Hamming distance
def hamming_distance(a, b):
    """Calculate the Hamming distance between two binary arrays."""
    return np.sum(a != b)

if __name__ == "__main__":
    parser = ArgumentParser()

    parser.add_argument('--n_visible', type=int, default=100)
    parser.add_argument('--n_hidden', type=int, default=150)
    parser.add_argument('--learning_rate', type=float, default=0.1)
    parser.add_argument('--n_epochs', type=int, default=1000)
    parser.add_argument('--batch_size', type=int, default=4)
    parser.add_argument('-o', '--output', default=None, type=str, help="Output for the metrics")
    parser.add_argument('--f', type=str, help="Kernel file (ignored in Jupyter)", default=None)  # Added to handle Jupyter arguments

    opts = parser.parse_args()

    if opts.output is not None:
        fileHandler = logging.FileHandler(opts.output)
        fileHandler.setFormatter(formatter)
        logger.addHandler(fileHandler)

    data = generate_numerals()
    logger.info(f"Shape of data: {data.shape}")  # Should print (8, 100)

    noisy_data = add_custom_noise(data, noise_level=0.01)  # Reduced noise level
    logger.info(f"Shape of noisy data: {noisy_data.shape}")  # Should also be (8, 100)

    activations = ["sigmoid", "relu", "leaky_relu"]
    results = []

    for activation in activations:
        logger.info(f"Testing activation function: {activation}")
        rbm = RestrictedBoltzmannMachine(
            n_visible=opts.n_visible,
            n_hidden=opts.n_hidden,
            learning_rate=opts.learning_rate,
            n_epochs=opts.n_epochs,
            batch_size=opts.batch_size,
            activation=activation
        )
        rbm.train(noisy_data)
        reconstructed_data = rbm.reconstruct(noisy_data)
        reconstructed_data = binarize_data(reconstructed_data)

        # Calculate reconstruction error
        reconstruction_error = calculate_reconstruction_error(data, reconstructed_data)

        # Evaluate accuracy using Hamming distance
        threshold = 30
        correct_reconstructions = 0
        for i in range(len(data)):
            distance = hamming_distance(data[i], reconstructed_data[i])
            if distance <= threshold:
                correct_reconstructions += 1

        accuracy = correct_reconstructions / len(data) * 100
        results.append((activation, reconstruction_error, accuracy))

    # Display results
    logger.info("Activation Function Results:")
    logger.info(f"{'Activation':<15} {'Reconstruction Error':<20} {'Accuracy (%)':<15}")
    for activation, error, acc in results:
        logger.info(f"{activation:<15} {error:<20.4f} {acc:<15.2f}")

INFO:root:Shape of data: (8, 100)
INFO:root:Shape of noisy data: (8, 100)
INFO:root:Testing activation function: sigmoid
INFO:root:Epoch 100/1000, Reconstruction Error: 0.0008, Elapsed Time: 0.0002
INFO:root:Epoch 200/1000, Reconstruction Error: 0.0002, Elapsed Time: 0.0002
INFO:root:Epoch 300/1000, Reconstruction Error: 0.0002, Elapsed Time: 0.0002
INFO:root:Epoch 400/1000, Reconstruction Error: 0.0001, Elapsed Time: 0.0002
INFO:root:Epoch 500/1000, Reconstruction Error: 0.0001, Elapsed Time: 0.0002
INFO:root:Epoch 600/1000, Reconstruction Error: 0.0001, Elapsed Time: 0.0002
INFO:root:Epoch 700/1000, Reconstruction Error: 0.0001, Elapsed Time: 0.0002
INFO:root:Epoch 800/1000, Reconstruction Error: 0.0001, Elapsed Time: 0.0002
INFO:root:Epoch 900/1000, Reconstruction Error: 0.0001, Elapsed Time: 0.0002
INFO:root:Epoch 1000/1000, Reconstruction Error: 0.0001, Elapsed Time: 0.0002
INFO:root:Average Error: 0.0025350922250412405 Average Epoch Time: 0.00018507933616638184
INFO:root:Testing 

### Overview of the Experiment

This experiment involves the design and training of a **Restricted Boltzmann Machine (RBM)**, a generative stochastic neural network that learns a probability distribution over its input data. The RBM is trained to reconstruct binary data representing numerals (digits 0-7) while exploring the effects of different activation functions on reconstruction accuracy and error.

---

### Background Information

#### **Restricted Boltzmann Machine (RBM) Design**
- RBMs work by maximizing the likelihood function, which requires estimating averages involving exponential terms. This computation is typically expensive, but techniques like Contrastive Divergence (CD) are used to approximate these averages efficiently.
- The RBM consists of two layers:
    - **Visible Units**: Represent the input data.
    - **Hidden Units**: Capture dependencies and features in the data.

#### **Weights and Biases**
- **Weights**: Represent the connections between visible and hidden units. These are learned during training.
- **Visible Bias**: Adjusts the activation of visible units.
- **Hidden Bias**: Adjusts the activation of hidden units.

---

### Activation Functions Used

1. **Sigmoid**:
     - Suitable for binary data as it outputs values between 0 and 1.
     - Helps in reconstructing data that falls within this range.
     - May suffer from vanishing gradient issues during training.

2. **ReLU (Rectified Linear Unit)**:
     - Speeds up training and is preferred for hidden layers.
     - Helps mitigate the vanishing gradient problem by allowing gradients to flow for positive inputs.

3. **Leaky ReLU**:
     - Addresses the "dying ReLU" problem by allowing a small gradient for negative inputs.
     - Prevents neurons from becoming inactive during training.

---

### Training Parameters

- **Learning Rate**: 0.1
- **Number of Epochs**: 1000
- **Batch Size**: 10
- **Decay Rate**: 0.99 (reduces the learning rate over time to stabilize training)

---

### Results Summary

The experiment evaluated the performance of three activation functions (`sigmoid`, `relu`, `leaky_relu`) based on reconstruction error and accuracy. The results are as follows:

| Activation Function | Reconstruction Error | Accuracy (%) |
|----------------------|----------------------|--------------|
| Sigmoid             | 0.22625             | 87.5         |
| ReLU                | 0.15375             | 87.5         |
| Leaky ReLU          | 0.2575              | 75.0         |

- **ReLU** achieved the lowest reconstruction error and highest accuracy, making it the most effective activation function for this dataset.
- **Leaky ReLU** had the highest reconstruction error and lowest accuracy, possibly due to its behavior with binary data.
- **Sigmoid** performed well, as expected for binary data, but was slightly outperformed by ReLU. 

---

### Conclusion

This experiment demonstrates the impact of activation functions on RBM performance. While sigmoid is suitable for binary data, ReLU's ability to mitigate vanishing gradients and speed up training made it the most effective choice for this task.