In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
import matplotlib.pyplot as plt
from tqdm import tqdm
import time

# Set random seeds for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Constants
VISIBLE_UNITS = 784  # 28x28 MNIST images
HIDDEN_UNITS = 256   # Increased from 128
BATCH_SIZE = 128     # Increased from 64 for faster training
LEARNING_RATE = 0.01 # Reduced from 0.1 for more stable training
MOMENTUM = 0.9       # Added momentum for faster convergence
WEIGHT_DECAY = 1e-4  # Added regularization
EPOCHS = 15          # Increased from 10
CD_K = 5             # Increased CD-k steps from 1 to 5

# Data loading with normalization
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),  # MNIST mean and std
    transforms.Lambda(lambda x: x.view(-1))
])

# Use try-except for data download
try:
    train_data = datasets.MNIST(root="./data", train=True, transform=transform, download=True)
    test_data = datasets.MNIST(root="./data", train=False, transform=transform, download=True)
except Exception as e:
    print(f"Error downloading MNIST data: {e}")
    raise

# Data loaders with num_workers for parallel loading
train_loader = torch.utils.data.DataLoader(
    train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    pin_memory=True  # Faster data transfer to GPU if available
)

test_loader = torch.utils.data.DataLoader(
    test_data,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=4,
    pin_memory=True
)

# Check for GPU availability
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

class RBM(nn.Module):
    def __init__(self, visible_units, hidden_units):
        super(RBM, self).__init__()
        # Xavier/Glorot initialization for better convergence
        self.W = nn.Parameter(torch.randn(hidden_units, visible_units) / np.sqrt(visible_units))
        self.h_bias = nn.Parameter(torch.zeros(hidden_units))
        self.v_bias = nn.Parameter(torch.zeros(visible_units))

        # Track reconstruction error
        self.reconstruction_errors = []

    def free_energy(self, v):
        """Calculate the free energy of a visible vector"""
        v_bias_term = torch.matmul(v, self.v_bias)
        hidden_term = torch.sum(torch.log(1 + torch.exp(torch.matmul(v, self.W.t()) + self.h_bias)), dim=1)
        return -hidden_term - v_bias_term

    def sample_hidden(self, v):
        """Sample hidden units given visible units"""
        prob_h = torch.sigmoid(torch.matmul(v, self.W.t()) + self.h_bias)
        h_sample = torch.bernoulli(prob_h)
        return h_sample, prob_h

    def sample_visible(self, h):
        """Sample visible units given hidden units"""
        prob_v = torch.sigmoid(torch.matmul(h, self.W) + self.v_bias)
        v_sample = torch.bernoulli(prob_v)
        return v_sample, prob_v

    def contrastive_divergence(self, v_data, k=1):
        """Perform k steps of contrastive divergence"""
        # Positive phase
        h_data, h_prob_data = self.sample_hidden(v_data)

        # Negative phase (Gibbs sampling)
        v_model = v_data.clone()
        h_model = h_data.clone()

        for _ in range(k):
            v_model, v_prob_model = self.sample_visible(h_model)
            h_model, h_prob_model = self.sample_hidden(v_model)

        # Calculate gradients
        W_grad = torch.matmul(h_prob_data.t(), v_data) - torch.matmul(h_prob_model.t(), v_prob_model)
        h_bias_grad = torch.sum(h_prob_data - h_prob_model, dim=0)
        v_bias_grad = torch.sum(v_data - v_prob_model, dim=0)

        return v_prob_model, W_grad, h_bias_grad, v_bias_grad

    def forward(self, v):
        """Forward pass for feature extraction"""
        _, h_prob = self.sample_hidden(v)
        return h_prob

    def reconstruct(self, v):
        """Reconstruct visible units"""
        h_sample, _ = self.sample_hidden(v)
        _, v_prob = self.sample_visible(h_sample)
        return v_prob

def train_rbm(rbm, optimizer, epochs=EPOCHS):
    """Train the RBM model"""
    start_time = time.time()
    rbm.train()

    # Lists to track metrics
    epoch_losses = []
    reconstruction_errors = []

    for epoch in range(epochs):
        epoch_loss = 0.0
        epoch_recon_error = 0.0

        # Use tqdm for progress bar
        with tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}") as pbar:
            for data, _ in pbar:
                data = data.to(device)

                # Perform CD-k
                v_recon, W_grad, h_bias_grad, v_bias_grad = rbm.contrastive_divergence(data, k=CD_K)

                # Compute reconstruction error
                recon_error = torch.mean(torch.sum((data - v_recon)**2, dim=1))

                # Manual parameter update (alternative to backward)
                with torch.no_grad():
                    rbm.W.add_(LEARNING_RATE * W_grad / len(data))
                    rbm.h_bias.add_(LEARNING_RATE * h_bias_grad / len(data))
                    rbm.v_bias.add_(LEARNING_RATE * v_bias_grad / len(data))

                epoch_loss += recon_error.item()
                epoch_recon_error += recon_error.item()

                # Update progress bar
                pbar.set_postfix({"loss": f"{recon_error.item():.4f}"})

        avg_loss = epoch_loss / len(train_loader)
        avg_recon_error = epoch_recon_error / len(train_loader)
        epoch_losses.append(avg_loss)
        reconstruction_errors.append(avg_recon_error)

        print(f"Epoch {epoch+1}/{epochs}, Recon. Error: {avg_recon_error:.4f}")

    total_time = time.time() - start_time
    print(f"RBM training completed in {total_time:.2f} seconds")

    return epoch_losses, reconstruction_errors

def visualize_weights(rbm, num_weights=10):
    """Visualize RBM weights for selected hidden units"""
    plt.figure(figsize=(12, 6))
    for i in range(num_weights):
        plt.subplot(2, 5, i+1)
        weight = rbm.W[i].detach().cpu().numpy().reshape(28, 28)
        plt.imshow(weight, cmap='viridis')
        plt.axis('off')
        plt.title(f'Hidden Unit {i+1}')
    plt.tight_layout()
    plt.savefig('rbm_weights.png')
    plt.close()

def visualize_reconstruction(rbm, test_loader):
    """Visualize original and reconstructed images"""
    rbm.eval()
    plt.figure(figsize=(12, 6))

    # Get a batch of test data
    data, _ = next(iter(test_loader))
    data = data.to(device)

    # Reconstruct the data
    with torch.no_grad():
        recon = rbm.reconstruct(data)

    # Plot original and reconstructed images
    for i in range(5):
        # Original
        plt.subplot(2, 5, i+1)
        plt.imshow(data[i].cpu().numpy().reshape(28, 28), cmap='gray')
        plt.axis('off')
        plt.title(f'Original {i+1}')

        # Reconstructed
        plt.subplot(2, 5, i+6)
        plt.imshow(recon[i].cpu().numpy().reshape(28, 28), cmap='gray')
        plt.axis('off')
        plt.title(f'Recon {i+1}')

    plt.tight_layout()
    plt.savefig('rbm_reconstruction.png')
    plt.close()

def extract_features(rbm, data_loader):
    """Extract features using the trained RBM"""
    features, labels = [], []
    rbm.eval()

    with torch.no_grad():
        for data, target in tqdm(data_loader, desc="Extracting features"):
            data = data.to(device)
            h_prob = rbm(data)
            features.append(h_prob.cpu().numpy())
            labels.append(target.numpy())

    return np.vstack(features), np.hstack(labels)

# Initialize model and move to device
rbm = RBM(visible_units=VISIBLE_UNITS, hidden_units=HIDDEN_UNITS).to(device)

# Use SGD with momentum and weight decay
optimizer = optim.SGD(
    rbm.parameters(),
    lr=LEARNING_RATE,
    momentum=MOMENTUM,
    weight_decay=WEIGHT_DECAY
)

# Train RBM
losses, recon_errors = train_rbm(rbm, optimizer)

# Visualize weights and reconstructions
visualize_weights(rbm)
visualize_reconstruction(rbm, test_loader)

# Plot training metrics
plt.figure(figsize=(10, 4))
plt.plot(losses)
plt.title('RBM Training Loss')
plt.xlabel('Epoch')
plt.ylabel('Reconstruction Error')
plt.grid(True)
plt.savefig('rbm_training_loss.png')
plt.close()

# Extract features
print("Extracting features for training set...")
X_train, y_train = extract_features(rbm, train_loader)
print("Extracting features for test set...")
X_test, y_test = extract_features(rbm, test_loader)

print(f"Train features shape: {X_train.shape}, Train labels shape: {y_train.shape}")
print(f"Test features shape: {X_test.shape}, Test labels shape: {y_test.shape}")

# Train different classifiers and compare
print("\nTraining classifiers on RBM features:")

# 1. Logistic Regression
print("\n1. Logistic Regression")
start_time = time.time()
lr_clf = LogisticRegression(max_iter=2000, C=1.0, solver='lbfgs', multi_class='multinomial', n_jobs=-1)
lr_clf.fit(X_train, y_train)
lr_pred = lr_clf.predict(X_test)
lr_accuracy = accuracy_score(y_test, lr_pred)
print(f"Logistic Regression Accuracy: {lr_accuracy:.4f}")
print(f"Training time: {time.time() - start_time:.2f} seconds")
print("\nClassification Report:")
print(classification_report(y_test, lr_pred))

# 2. SVM (optional - can be computationally expensive)
from sklearn.svm import SVC
print("\n2. SVM Classifier")
start_time = time.time()
svm_clf = SVC(kernel='rbf', C=10, gamma='scale')
svm_clf.fit(X_train, y_train)
svm_pred = svm_clf.predict(X_test)
svm_accuracy = accuracy_score(y_test, svm_pred)
print(f"SVM Accuracy: {svm_accuracy:.4f}")
print(f"Training time: {time.time() - start_time:.2f} seconds")

# 3. Random Forest
from sklearn.ensemble import RandomForestClassifier
print("\n3. Random Forest Classifier")
start_time = time.time()
rf_clf = RandomForestClassifier(n_estimators=100, max_depth=20, n_jobs=-1)
rf_clf.fit(X_train, y_train)
rf_pred = rf_clf.predict(X_test)
rf_accuracy = accuracy_score(y_test, rf_pred)
print(f"Random Forest Accuracy: {rf_accuracy:.4f}")
print(f"Training time: {time.time() - start_time:.2f} seconds")

# Compare models
print("\nModel Comparison:")
models = {
    'Logistic Regression': lr_accuracy,
    'SVM': svm_accuracy,
    'Random Forest': rf_accuracy
}

for model, acc in sorted(models.items(), key=lambda x: x[1], reverse=True):
    print(f"{model}: {acc:.4f}")

# Experiment with different hidden unit counts (optional)
def experiment_hidden_units():
    """Experiment with different numbers of hidden units"""
    hidden_units_list = [64, 128, 256, 512]
    accuracies = []

    for h_units in hidden_units_list:
        print(f"\nTraining RBM with {h_units} hidden units")

        # Create and train RBM
        test_rbm = RBM(visible_units=VISIBLE_UNITS, hidden_units=h_units).to(device)
        train_rbm(test_rbm, optim.SGD(test_rbm.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM), epochs=5)

        # Extract features and train classifier
        X_train_exp, y_train_exp = extract_features(test_rbm, train_loader)
        X_test_exp, y_test_exp = extract_features(test_rbm, test_loader)

        lr_clf = LogisticRegression(max_iter=1000)
        lr_clf.fit(X_train_exp, y_train_exp)
        y_pred_exp = lr_clf.predict(X_test_exp)

        accuracy = accuracy_score(y_test_exp, y_pred_exp)
        accuracies.append(accuracy)
        print(f"Classification accuracy with {h_units} hidden units: {accuracy:.4f}")

    # Plot results
    plt.figure(figsize=(10, 6))
    plt.plot(hidden_units_list, accuracies, 'o-')
    plt.title('Classification Accuracy vs. Number of Hidden Units')
    plt.xlabel('Number of Hidden Units')
    plt.ylabel('Accuracy')
    plt.grid(True)
    plt.savefig('hidden_units_experiment.png')
    plt.close()

# Uncomment to run the experiment (takes time)
# experiment_hidden_units()

print("\nExecution complete!")

Epoch 1, Loss: 0.4811
Epoch 2, Loss: 0.4811
Epoch 3, Loss: 0.4810
Epoch 4, Loss: 0.4810
Epoch 5, Loss: 0.4811
Epoch 6, Loss: 0.4811
Epoch 7, Loss: 0.4809
Epoch 8, Loss: 0.4810
Epoch 9, Loss: 0.4810
Epoch 10, Loss: 0.4810
(60000, 128) (60000,)
(10000, 128) (10000,)
Classification Accuracy: 0.1339


In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

class RBM(nn.Module):
    def __init__(self, visible_units, hidden_units):
        super(RBM, self).__init__()
        self.W = nn.Parameter(torch.randn(hidden_units, visible_units) * 0.01)
        self.h_bias = nn.Parameter(torch.zeros(hidden_units))
        self.v_bias = nn.Parameter(torch.zeros(visible_units))

    def sample_hidden(self, v):
        prob_h = torch.sigmoid(torch.matmul(v, self.W.t()) + self.h_bias)
        return torch.bernoulli(prob_h), prob_h

    def sample_visible(self, h):
        prob_v = torch.sigmoid(torch.matmul(h, self.W) + self.v_bias)
        return torch.bernoulli(prob_v), prob_v

    def contrastive_divergence(self, v, k=1):
        v_sample = v
        for _ in range(k):
            h_sample, _ = self.sample_hidden(v_sample)
            v_sample, _ = self.sample_visible(h_sample)
        return v_sample

class StackRBM:
    def __init__(self, size=5):
        self.stack_size = size
        self.stack = torch.zeros((1, size))
        self.rbm = RBM(visible_units=size, hidden_units=size)
        self.optimizer = optim.SGD(self.rbm.parameters(), lr=0.1)

    def push(self, value):
        self.stack = torch.roll(self.stack, shifts=1, dims=1)
        self.stack[0, 0] = value
        self.train_rbm()

    def pop(self):
        top_value = self.stack[0, 0].item()
        self.stack[0, 0] = 0
        self.stack = torch.roll(self.stack, shifts=-1, dims=1)
        self.train_rbm()
        return top_value

    def train_rbm(self):
        v_sample = self.rbm.contrastive_divergence(self.stack)
        loss = torch.mean((self.stack - v_sample) ** 2)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def display_stack(self):
        print("Stack State:", self.stack.numpy())

stack_rbm = StackRBM(size=5)
stack_rbm.push(1)
stack_rbm.push(2)
stack_rbm.push(3)
stack_rbm.display_stack()
print("Popped:", stack_rbm.pop())
stack_rbm.display_stack()


Stack State: [[3. 2. 1. 0. 0.]]
Popped: 3.0
Stack State: [[2. 1. 0. 0. 0.]]
