Train a Deep Belief Network (DBN) using RBMs.
Fine-tune with supervised learning.

In [1]:
pip install numpy torch scikit-learn


Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5

In [16]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Dense, Activation, Input
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.utils import to_categorical

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

class RBM:
    def __init__(self, visible_size, hidden_size, learning_rate=0.01, batch_size=100, n_epochs=10):
        self.visible_size = visible_size
        self.hidden_size = hidden_size
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.n_epochs = n_epochs

        # Initialize weights and biases
        self.W = np.random.normal(0, 0.01, (visible_size, hidden_size))
        self.visible_bias = np.zeros(visible_size)
        self.hidden_bias = np.zeros(hidden_size)

    def sample_hidden(self, visible_states):
        """Compute hidden probabilities and sample hidden states."""
        hidden_activations = np.dot(visible_states, self.W) + self.hidden_bias
        hidden_probs = 1.0 / (1.0 + np.exp(-hidden_activations))
        hidden_states = (hidden_probs > np.random.random(hidden_probs.shape)).astype(float)
        return hidden_probs, hidden_states

    def sample_visible(self, hidden_states):
        """Compute visible probabilities and sample visible states."""
        visible_activations = np.dot(hidden_states, self.W.T) + self.visible_bias
        visible_probs = 1.0 / (1.0 + np.exp(-visible_activations))
        visible_states = (visible_probs > np.random.random(visible_probs.shape)).astype(float)
        return visible_probs, visible_states

    def contrastive_divergence(self, visible_data):
        """Perform one step of contrastive divergence learning."""
        # Positive phase
        pos_hidden_probs, pos_hidden_states = self.sample_hidden(visible_data)
        pos_associations = np.dot(visible_data.T, pos_hidden_probs)

        # Negative phase (reconstruction)
        neg_visible_probs, neg_visible_states = self.sample_visible(pos_hidden_states)
        neg_hidden_probs, neg_hidden_states = self.sample_hidden(neg_visible_probs)
        neg_associations = np.dot(neg_visible_probs.T, neg_hidden_probs)

        # Update weights and biases
        self.W += self.learning_rate * ((pos_associations - neg_associations) / len(visible_data))
        self.visible_bias += self.learning_rate * np.mean(visible_data - neg_visible_probs, axis=0)
        self.hidden_bias += self.learning_rate * np.mean(pos_hidden_probs - neg_hidden_probs, axis=0)

        # Compute reconstruction error
        error = np.mean((visible_data - neg_visible_probs) ** 2)
        return error

    def fit(self, data):
        """Train the RBM with the given data."""
        n_samples = data.shape[0]
        n_batches = n_samples // self.batch_size

        errors = []

        for epoch in range(self.n_epochs):
            epoch_error = 0
            np.random.shuffle(data)  # Shuffle data for each epoch

            for batch in range(n_batches):
                batch_start = batch * self.batch_size
                batch_end = (batch + 1) * self.batch_size
                batch_data = data[batch_start:batch_end]

                error = self.contrastive_divergence(batch_data)
                epoch_error += error

            avg_epoch_error = epoch_error / n_batches
            errors.append(avg_epoch_error)
            print(f"Epoch {epoch+1}/{self.n_epochs}, Error: {avg_epoch_error:.4f}")

        return errors

    def transform(self, data):
        """Transform data to hidden representation."""
        hidden_probs, _ = self.sample_hidden(data)
        return hidden_probs


class DBN:
    def __init__(self, layer_sizes, learning_rate=0.01, batch_size=100, rbm_epochs=10):
        """
        Initialize a Deep Belief Network

        Args:
            layer_sizes: list with the size of each layer (including input layer)
            learning_rate: learning rate for RBMs
            batch_size: batch size for RBMs
            rbm_epochs: number of epochs for each RBM
        """
        self.layer_sizes = layer_sizes
        self.n_layers = len(layer_sizes) - 1  # Number of RBMs
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.rbm_epochs = rbm_epochs
        self.rbm_layers = []

        # Create RBM layers
        for i in range(self.n_layers):
            self.rbm_layers.append(
                RBM(layer_sizes[i], layer_sizes[i+1],
                    learning_rate=learning_rate,
                    batch_size=batch_size,
                    n_epochs=rbm_epochs)
            )

    def pretrain(self, data):
        """Perform greedy layer-wise pretraining."""
        print("Starting greedy layer-wise pretraining...")
        input_data = data

        for i, rbm in enumerate(self.rbm_layers):
            print(f"Training RBM layer {i+1}/{self.n_layers}")
            rbm.fit(input_data)

            # Transform data for the next RBM layer
            input_data = rbm.transform(input_data)

        print("Pretraining completed")

    def build_tensorflow_model(self, output_dim, activation='softmax'):
        """Build a TensorFlow model for fine-tuning."""
        model = Sequential()

        # Add layers from pretrained RBMs
        for i, rbm in enumerate(self.rbm_layers):
            if i == 0:
                model.add(Dense(rbm.hidden_size, input_dim=rbm.visible_size, activation='sigmoid'))
            else:
                model.add(Dense(rbm.hidden_size, activation='sigmoid'))

            # Set weights from pretrained RBM
            layer = model.layers[i]
            layer.set_weights([rbm.W, rbm.hidden_bias])

        # Add output layer
        model.add(Dense(output_dim, activation=activation))

        return model

    def fine_tune(self, x_train, y_train, x_test, y_test, learning_rate=0.001, epochs=20, batch_size=128):
        """Fine-tune the network with supervised learning."""
        # One-hot encode the labels
        output_dim = y_train.max() + 1
        y_train_one_hot = to_categorical(y_train, output_dim)
        y_test_one_hot = to_categorical(y_test, output_dim)

        # Build and compile the model
        model = self.build_tensorflow_model(output_dim)
        optimizer = Adam(learning_rate=learning_rate)
        model.compile(loss='categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

        print("Starting fine-tuning with supervised learning...")
        history = model.fit(
            x_train, y_train_one_hot,
            epochs=epochs,
            batch_size=batch_size,
            validation_data=(x_test, y_test_one_hot),
            verbose=1
        )

        print("Fine-tuning completed")
        return model, history


# Example usage with MNIST dataset
def run_mnist_example():
    print("Loading MNIST dataset...")
    (x_train, y_train), (x_test, y_test) = mnist.load_data()

    # Preprocess data
    x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
    x_test = x_test.reshape(-1, 784).astype('float32') / 255.0

    # Define DBN architecture
    layer_sizes = [784, 500, 500, 200]

    # Create and pretrain the DBN
    dbn = DBN(
        layer_sizes=layer_sizes,
        learning_rate=0.01,
        batch_size=100,
        rbm_epochs=5
    )

    # Pretrain with unsupervised learning
    dbn.pretrain(x_train)

    # Fine-tune with supervised learning
    model, history = dbn.fine_tune(
        x_train, y_train,
        x_test, y_test,
        learning_rate=0.001,
        epochs=10,
        batch_size=128
    )

    # Evaluate the model
    loss, accuracy = model.evaluate(x_test, to_categorical(y_test))
    print(f"Test accuracy: {accuracy:.4f}")

    return dbn, model, history


if __name__ == "__main__":
    dbn, model, history = run_mnist_example()

Loading MNIST dataset...
Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
[1m11490434/11490434[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 0us/step
Starting greedy layer-wise pretraining...
Training RBM layer 1/3
Epoch 1/5, Error: 0.0563
Epoch 2/5, Error: 0.0372
Epoch 3/5, Error: 0.0305
Epoch 4/5, Error: 0.0263
Epoch 5/5, Error: 0.0236
Training RBM layer 2/3
Epoch 1/5, Error: 0.0522
Epoch 2/5, Error: 0.0217
Epoch 3/5, Error: 0.0137
Epoch 4/5, Error: 0.0105
Epoch 5/5, Error: 0.0094
Training RBM layer 3/3
Epoch 1/5, Error: 0.0533
Epoch 2/5, Error: 0.0240
Epoch 3/5, Error: 0.0148
Epoch 4/5, Error: 0.0104
Epoch 5/5, Error: 0.0080
Pretraining completed
Starting fine-tuning with supervised learning...


  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/10
[1m469/469[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 21ms/step - accuracy: 0.1043 - loss: 2.3261 - val_accuracy: 0.1793 - val_loss: 2.2976
Epoch 2/10
[1m469/469[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 21ms/step - accuracy: 0.1057 - loss: 2.3033 - val_accuracy: 0.1382 - val_loss: 2.2981
Epoch 3/10
[1m469/469[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 24ms/step - accuracy: 0.1081 - loss: 2.3028 - val_accuracy: 0.1488 - val_loss: 2.2966
Epoch 4/10
[1m469/469[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m18s[0m 19ms/step - accuracy: 0.1075 - loss: 2.3027 - val_accuracy: 0.1717 - val_loss: 2.2960
Epoch 5/10
[1m469/469[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m11s[0m 20ms/step - accuracy: 0.1082 - loss: 2.3023 - val_accuracy: 0.1973 - val_loss: 2.2994
Epoch 6/10
[1m469/469[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m10s[0m 21ms/step - accuracy: 0.1094 - loss: 2.3017 - val_accuracy: 0.2021 - val_loss: 2.3010
Epoch 7/10
[1m4

In [24]:
import numpy as np
from sklearn.datasets import load_digits
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.neural_network import BernoulliRBM
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Load the Digits dataset (8x8 images of handwritten digits)
digits = load_digits()
X, y = digits.data, digits.target

# Train-test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y)

# Standardize features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# --- Layer-wise RBM Pretraining ---
rbm1 = BernoulliRBM(n_components=128, learning_rate=0.01, n_iter=20, verbose=1, random_state=42)
X_train_rbm1 = rbm1.fit_transform(X_train_scaled)

rbm2 = BernoulliRBM(n_components=64, learning_rate=0.01, n_iter=20, verbose=1, random_state=42)
X_train_rbm2 = rbm2.fit_transform(X_train_rbm1)

# --- Supervised Fine-Tuning with Logistic Regression ---
clf = LogisticRegression(max_iter=1000, solver='lbfgs', multi_class='multinomial', random_state=42)
clf.fit(X_train_rbm2, y_train)

# Transform test data through RBMs
X_test_rbm1 = rbm1.transform(X_test_scaled)
X_test_rbm2 = rbm2.transform(X_test_rbm1)

# Evaluate
y_pred = clf.predict(X_test_rbm2)
accuracy = accuracy_score(y_test, y_pred)
print(f"Stacked DBN (RBMs + LR) accuracy on Digits dataset: {accuracy:.4f}")

[BernoulliRBM] Iteration 1, pseudo-likelihood = -109.32, time = 0.15s
[BernoulliRBM] Iteration 2, pseudo-likelihood = -124.54, time = 0.32s
[BernoulliRBM] Iteration 3, pseudo-likelihood = -135.10, time = 0.27s
[BernoulliRBM] Iteration 4, pseudo-likelihood = -174.39, time = 0.25s
[BernoulliRBM] Iteration 5, pseudo-likelihood = -219.01, time = 0.16s
[BernoulliRBM] Iteration 6, pseudo-likelihood = -262.25, time = 0.20s
[BernoulliRBM] Iteration 7, pseudo-likelihood = -310.08, time = 0.19s
[BernoulliRBM] Iteration 8, pseudo-likelihood = -359.88, time = 0.19s
[BernoulliRBM] Iteration 9, pseudo-likelihood = -412.63, time = 0.14s
[BernoulliRBM] Iteration 10, pseudo-likelihood = -463.32, time = 0.18s
[BernoulliRBM] Iteration 11, pseudo-likelihood = -525.85, time = 0.24s
[BernoulliRBM] Iteration 12, pseudo-likelihood = -579.42, time = 0.19s
[BernoulliRBM] Iteration 13, pseudo-likelihood = -632.30, time = 0.25s
[BernoulliRBM] Iteration 14, pseudo-likelihood = -681.14, time = 0.08s
[BernoulliRBM] 



Stacked DBN (RBMs + LR) accuracy on Digits dataset: 0.7083
