# Project Phase2

Studnet number : 400101107 - 400101859

Student name : Sadra Khanjari - Parham Gilani

### Simulation Question Part 1

In [4]:
import numpy as np
from sklearn.datasets import fetch_openml
from sklearn.preprocessing import MinMaxScaler,Binarizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from scipy.special import expit as sigmoid
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.metrics import accuracy_score, classification_report
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader,Dataset
from sklearn.metrics import accuracy_score
import pandas as pd

Here is the first part of the simulation question. This is our Secure Restricted Boltzmann Machine (SRBM) implementation and as you can see it has a sample hidden layer and a sample visible layer. The visible layer is the input layer and the hidden layer is the output and it also has a train method that trains the model. There is a also a transform method that transforms the input to the output.

In [3]:
class SecureRBM:
    def __init__(self, n_visible: int, n_hidden: int):
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.W = np.random.randn(n_visible, n_hidden) * 0.01
        self.b = np.zeros(n_visible) 
        self.c = np.zeros(n_hidden)  

    def sample_hidden(self, visible: np.ndarray) -> np.ndarray:
        """Sample hidden units given visible units"""
        activation = np.dot(visible, self.W) + self.c
        probabilities = sigmoid(activation)
        return np.random.binomial(1, probabilities)

    def sample_visible(self, hidden: np.ndarray) -> np.ndarray:
        """Sample visible units given hidden units"""
        activation = np.dot(hidden, self.W.T) + self.b
        probabilities = sigmoid(activation)
        return np.random.binomial(1, probabilities)

    def train(self, data: np.ndarray, learning_rate: float = 0.1, epochs: int = 10):
        for epoch in range(epochs):
            for i in range(data.shape[0]):
                v0 = data[i]
                h0 = self.sample_hidden(v0)

                v1 = self.sample_visible(h0)
                h1 = self.sample_hidden(v1)

                positive_gradient = np.outer(v0, h0)
                negative_gradient = np.outer(v1, h1)

                self.W += learning_rate * (positive_gradient - negative_gradient)
                self.b += learning_rate * (v0 - v1)
                self.c += learning_rate * (h0 - h1)
            print(f"Epoch {epoch + 1}/{epochs} completed.")

    def transform(self, data: np.ndarray) -> np.ndarray:
        """Generate hidden representations for input data."""
        return sigmoid(np.dot(data, self.W) + self.c)

In [5]:
# Load and Preprocess MNIST Dataset
mnist = fetch_openml('mnist_784', version=1, as_frame=False)
X, y = mnist.data, mnist.target.astype(int)

# Normalize the data between 0 and 1
scaler = MinMaxScaler()
X = scaler.fit_transform(X)

# Split into train and test datasets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Train Secure Boltzmann Machine
n_visible = X_train.shape[1]
n_hidden = 128
secure_rbm = SecureRBM(n_visible=n_visible, n_hidden=n_hidden)
secure_rbm.train(X_train, learning_rate=0.1, epochs=5)

# Generate Features using RBM
train_features = secure_rbm.transform(X_train)
test_features = secure_rbm.transform(X_test)

# Classification Task
clf = LogisticRegression(max_iter=1000)
clf.fit(train_features, y_train)
predictions = clf.predict(test_features)

# Evaluate the Model
accuracy = accuracy_score(y_test, predictions)
print(f"Classification Accuracy using RBM Features: {accuracy:.4f}")

Epoch 1/5 completed.
Epoch 2/5 completed.
Epoch 3/5 completed.
Epoch 4/5 completed.
Epoch 5/5 completed.
Classification Accuracy using RBM Features: 0.9255


As you can see our model could learn with 0.9255 accuracy and we get same accuracy as in the last part.

### Simulation Part 2

Here is the part 2 of the simulation question. In this part we have implemented the Restricted Boltzmann Machine (RBM) with constrastive divergence (CD) algorithm which perform a Gibbs sampling to approximate the log-likelihood gradient.

In [3]:
class SecureRBM:
    def __init__(self, n_visible: int, n_hidden: int, n_classes: int):
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        self.n_classes = n_classes
        # Initialize weights and biases
        self.W = np.random.randn(n_visible, n_hidden) * 0.01
        self.U = np.random.randn(n_classes, n_hidden) * 0.01  # Class-hidden weights
        self.b = np.zeros(n_visible)  # Visible biases
        self.c = np.zeros(n_hidden)   # Hidden biases
        self.d = np.zeros(n_classes)  # Class biases

    def sample_hidden(self, visible: np.ndarray, label: np.ndarray = None) -> np.ndarray:
        """Sample hidden units given visible units and labels"""
        activation = np.dot(visible, self.W)
        if label is not None:
            activation += np.dot(label, self.U)
        activation += self.c
        probabilities = self.sigmoid(activation)
        return np.random.binomial(1, probabilities)

    def sample_visible(self, hidden: np.ndarray) -> np.ndarray:
        """Sample visible units given hidden units"""
        activation = np.dot(hidden, self.W.T) + self.b
        probabilities = self.sigmoid(activation)
        return np.random.binomial(1, probabilities)

    def sample_label(self, hidden: np.ndarray) -> np.ndarray:
        """Sample label given hidden units"""
        activation = np.dot(hidden, self.U.T) + self.d
        probabilities = self.softmax(activation)
        return np.random.multinomial(1, probabilities)

    def contrastive_divergence(self, v0: np.ndarray, y0: np.ndarray, k: int = 1) -> tuple:
        """Perform Contrastive Divergence with labels"""
        h0 = self.sample_hidden(v0, y0)

        vk = v0.copy()
        yk = y0.copy()
        
        for _ in range(k):
            hk = self.sample_hidden(vk, yk)
            vk = self.sample_visible(hk)
            yk = self.sample_label(hk)

        hk = self.sample_hidden(vk, yk)

        # Compute gradients
        dW = np.outer(v0, h0) - np.outer(vk, hk)
        dU = np.outer(y0, h0) - np.outer(yk, hk)
        db = v0 - vk
        dc = h0 - hk
        dd = y0 - yk

        return dW, dU, db, dc, dd

    def train(self, data: np.ndarray, labels: np.ndarray, learning_rate: float = 0.1, 
              epochs: int = 10, k: int = 1):
        """Train the RBM with labeled data"""
        for epoch in range(epochs):
            for i in range(data.shape[0]):
                v0 = data[i]
                y0 = labels[i]
                dW, dU, db, dc, dd = self.contrastive_divergence(v0, y0, k)

                # Update parameters
                self.W += learning_rate * dW
                self.U += learning_rate * dU
                self.b += learning_rate * db
                self.c += learning_rate * dc
                self.d += learning_rate * dd
            print(f"Epoch {epoch + 1}/{epochs} completed.")

    def compute_joint_probability(self, visible: np.ndarray) -> np.ndarray:
        """Compute p(x,y) for all possible classes"""
        hidden_probs = self.sigmoid(np.dot(visible, self.W) + self.c)
        class_probs = np.zeros(self.n_classes)
        
        for y in range(self.n_classes):
            y_one_hot = np.zeros(self.n_classes)
            y_one_hot[y] = 1
            class_activation = np.dot(y_one_hot, self.U)
            class_probs[y] = np.sum(hidden_probs * self.sigmoid(class_activation))
        
        return class_probs

    def classify(self, visible: np.ndarray) -> int:
        """Classify a visible input using p(y|x)"""
        joint_probs = self.compute_joint_probability(visible)
        return np.argmax(joint_probs)

    def softmax(self,x):
        """Compute softmax values for each set of scores in x."""
        e_x = np.exp(x - np.max(x))
        return e_x / e_x.sum()

    def sigmoid(self,x):
        """Compute sigmoid function"""
        return 1 / (1 + np.exp(-x))


Here is the training part where i take the data and try to train the model.

In [31]:
mnist = fetch_openml('mnist_784', version=1, as_frame=False)
X, y = mnist.data, mnist.target.astype(int)

# Normalize the data between 0 and 1
scaler = MinMaxScaler()
X = scaler.fit_transform(X)

# Binarize the data
binarizer = Binarizer(threshold=0.5)
X_binarized = binarizer.fit_transform(X)

# Split the data into train and test
X_train, X_test, y_train, y_test = train_test_split(X_binarized, y, test_size=0.2, random_state=42)

# Convert labels to one-hot encoding
onehot = OneHotEncoder(sparse_output=False)
y_train_onehot = onehot.fit_transform(y_train.reshape(-1, 1))

# Initialize RBM
n_visible = X_train.shape[1]  # 784 features
n_hidden = 256  # Number of hidden units
n_classes = 10   # Number of classes (digits 0-9)

# Create and train the RBM
rbm = SecureRBM(n_visible=n_visible, n_hidden=n_hidden, n_classes=n_classes)
rbm.train(
    data=X_train,
    labels=y_train_onehot,
    learning_rate=0.01,
    epochs=5,
    k=1
)

# Make predictions on test set
predictions = []
for sample in X_test:
    pred = rbm.classify(sample)
    predictions.append(pred)

# Convert predictions to numpy array
predictions = np.array(predictions)

# Calculate and print metrics
accuracy = accuracy_score(y_test, predictions)
print(f"\nAccuracy: {accuracy:.4f}")
print("\nClassification Report:")
print(classification_report(y_test, predictions))


Epoch 1/5 completed.
Epoch 2/5 completed.
Epoch 3/5 completed.
Epoch 4/5 completed.
Epoch 5/5 completed.

Accuracy: 0.9218

Classification Report:
              precision    recall  f1-score   support

           0       0.95      0.97      0.96      1343
           1       0.97      0.97      0.97      1600
           2       0.92      0.91      0.92      1380
           3       0.90      0.91      0.90      1433
           4       0.94      0.89      0.91      1295
           5       0.94      0.86      0.90      1273
           6       0.91      0.97      0.94      1396
           7       0.91      0.95      0.93      1503
           8       0.92      0.87      0.90      1357
           9       0.87      0.91      0.89      1420

    accuracy                           0.92     14000
   macro avg       0.92      0.92      0.92     14000
weighted avg       0.92      0.92      0.92     14000


Sample Predictions:
True: 8, Predicted: 8
True: 4, Predicted: 4
True: 8, Predicted: 8
True: 7

### Simulation Part 3

Here is the part 3 of the simulation question. In this part we have implemented the Discriminative Restricted Boltzmann Machine (DiscriminativeRBM) which is a supervised learning algorithm which has a free energy function that is used to calculate the gradient of the weights we used it base on our phase 1 code and we defined a method called train and evaluate which train our model then it will find its accuracy.

In [7]:
class DiscriminativeRBM(nn.Module):
    def __init__(self, input_size, hidden_size, num_categories):
        super(DiscriminativeRBM, self).__init__()
        self.weights = nn.Parameter(torch.randn(input_size, hidden_size) * 0.01)
        self.bias_h = nn.Parameter(torch.zeros(hidden_size))
        self.bias_y = nn.Parameter(torch.zeros(num_categories))
        self.weights_output = nn.Parameter(torch.randn(hidden_size, num_categories) * 0.01)

    def forward(self, x):
        hidden_probs = torch.sigmoid(torch.matmul(x, self.weights) + self.bias_h)
        output_logits = torch.matmul(hidden_probs, self.weights_output) + self.bias_y
        return output_logits

# Train the Discriminative RBM model
def train(model, training_loader, testing_loader, num_epochs=10, learning_rate=0.01):
    loss_function = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    for epoch in range(num_epochs):
        model.train()
        total_loss = 0
        for inputs, labels in training_loader:
            inputs, labels = inputs.view(inputs.size(0), -1), labels
            optimizer.zero_grad()
            logits = model(inputs)
            loss = loss_function(logits, labels)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(training_loader):.4f}")

    return model

# Evaluate the Discriminative RBM model
def evaluate_accuracy(model, testing_loader):
    model.eval()
    all_predictions, all_true_labels = [], []
    with torch.no_grad():
        for inputs, targets in testing_loader:
            inputs = inputs.view(inputs.size(0), -1)
            logits = model(inputs)
            predictions = torch.argmax(logits, dim=1)
            all_predictions.extend(predictions.cpu().numpy())
            all_true_labels.extend(targets.cpu().numpy())

    accuracy = accuracy_score(all_true_labels, all_predictions)
    print(f"Test Accuracy: {accuracy:.4f}")
    accuracy = accuracy_score(all_true_labels, all_predictions)
    print(f"\nAccuracy: {accuracy:.4f}")

class MNISTBinaryDataset(Dataset):
    def __init__(self, inputs, targets):
        self.inputs = torch.tensor(inputs, dtype=torch.float32)
        self.targets = torch.tensor(targets, dtype=torch.long)

    def __len__(self):
        return len(self.inputs)

    def __getitem__(self, idx):
        return self.inputs[idx], self.targets[idx]

def load_mnist_binary():
    mnist = fetch_openml('mnist_784', version=1)
    X, y = mnist.data, mnist.target.astype(int)
    scaler = MinMaxScaler()
    X = scaler.fit_transform(X)
    binarizer = Binarizer(threshold=0.5)
    X_binarized = binarizer.fit_transform(X)
    X_train, X_test, y_train, y_test = train_test_split(X_binarized, y, test_size=0.2, random_state=42)

    y_train = y_train.to_numpy() if isinstance(y_train, pd.Series) else y_train
    y_test = y_test.to_numpy() if isinstance(y_test, pd.Series) else y_test
    
    train_dataset = MNISTBinaryDataset(X_train, y_train)
    test_dataset = MNISTBinaryDataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)

    return train_loader, test_loader



In [8]:
train_dataset, test_dataset = load_mnist_binary()

# Initialize and train Discriminative RBM
discriminative_rbm_model = DiscriminativeRBM(784, 128, 10)
trained_model = train(discriminative_rbm_model, train_dataset, test_dataset, num_epochs=10, learning_rate=0.01)

# Evaluate the trained model
evaluate_accuracy(trained_model, test_dataset)

Epoch 1/10, Loss: 0.2700
Epoch 2/10, Loss: 0.1062
Epoch 3/10, Loss: 0.0762
Epoch 4/10, Loss: 0.0576
Epoch 5/10, Loss: 0.0470
Epoch 6/10, Loss: 0.0402
Epoch 7/10, Loss: 0.0344
Epoch 8/10, Loss: 0.0339
Epoch 9/10, Loss: 0.0352
Epoch 10/10, Loss: 0.0270
Test Accuracy: 0.9658

Accuracy: 0.9658


As you can see by using free energy and DiscriminativeRBM we could get higher accuracy around 96.58%.

### Simulation Part 4
here we try to implement hybrid learning algorithm which is a combination of unsupervised and supervised learning. We use the unsupervised learning to pretrain the model and then we use the supervised learning to fine tune the model.

In [44]:
import numpy as np

class HybridDRBM:
    def __init__(self, n_visible, n_hidden, n_classes, alpha=0.5):
        # Xavier initialization for weights
        self.W = np.random.normal(0, np.sqrt(2.0/n_visible), (n_visible, n_hidden))
        self.U = np.random.normal(0, np.sqrt(2.0/n_classes), (n_classes, n_hidden))
        self.d = np.zeros(n_hidden)
        self.c = np.zeros(n_classes)

        # Momentum buffers for updates
        self.vW = np.zeros_like(self.W)
        self.vU = np.zeros_like(self.U)
        self.vd = np.zeros_like(self.d)
        self.vc = np.zeros_like(self.c)

        self.n_classes = n_classes
        self.alpha = alpha  # Hybrid learning coefficient

        # Clip weights to prevent divergence
        self.W = np.clip(self.W, -1, 1)

    def _relu(self, x):
        return np.maximum(0, x)

    def _stable_softmax(self, x):
        e_x = np.exp(x - np.max(x))
        return e_x / np.sum(e_x)

    def free_energy(self, v, y):
        wx = np.dot(v, self.W)
        uy = np.dot(y, self.U)
        hidden = self._relu(self.d + wx + uy)
        return -np.sum(hidden) - np.dot(y, self.c)

    def train(self, X, y, learning_rate=0.001, momentum=0.9, batch_size=32, epochs=10):
        n_samples = len(X)

        for epoch in range(epochs):
            indices = np.random.permutation(n_samples)
            X, y = X[indices], y[indices]

            for i in range(0, n_samples, batch_size):
                batch_X = X[i:i + batch_size]
                batch_y = y[i:i + batch_size]

                # Forward pass with ReLU
                h_input = self.d + np.dot(batch_X, self.W) + np.dot(batch_y, self.U)
                h_act = self._relu(h_input)

                # Compute gradients for supervised classification
                dW_class = np.dot(batch_X.T, h_act) / batch_size
                dU_class = np.dot(batch_y.T, h_act) / batch_size
                dd_class = np.mean(h_act, axis=0)
                dc_class = np.mean(batch_y, axis=0)

                # Compute gradients for unsupervised energy minimization
                dW_unsup = np.dot(batch_X.T, h_act) / batch_size
                dU_unsup = np.dot(batch_y.T, h_act) / batch_size
                dd_unsup = np.mean(h_act, axis=0)
                dc_unsup = np.mean(batch_y, axis=0)

                # Hybrid update: blend supervised and unsupervised gradients with alpha
                dW = self.alpha * dW_class + (1 - self.alpha) * dW_unsup
                dU = self.alpha * dU_class + (1 - self.alpha) * dU_unsup
                dd = self.alpha * dd_class + (1 - self.alpha) * dd_unsup
                dc = self.alpha * dc_class + (1 - self.alpha) * dc_unsup

                # Update with momentum
                self.vW = momentum * self.vW + learning_rate * dW
                self.vU = momentum * self.vU + learning_rate * dU
                self.vd = momentum * self.vd + learning_rate * dd
                self.vc = momentum * self.vc + learning_rate * dc

                self.W += self.vW
                self.U += self.vU
                self.d += self.vd
                self.c += self.vc

            print(f"Epoch {epoch + 1}/{epochs} completed")

    def predict(self, X):
        n_classes = self.U.shape[0]
        predictions = []
        
        for v in X:
            # Compute free energy for each class
            energies = np.array([
                -self.free_energy(v.reshape(1, -1), np.eye(n_classes)[c].reshape(1, -1))
                for c in range(n_classes)
            ]).flatten()  # Ensure a flat list of energy values
            
            prob = self._stable_softmax(energies)
            predictions.append(np.argmax(prob))
        
        return np.array(predictions)



Here is the training part.

In [45]:
# Learning 
mnist = fetch_openml('mnist_784', version=1, as_frame=False)
X, y = mnist.data, mnist.target.astype(int)
scaler = MinMaxScaler()
X = scaler.fit_transform(X)
binarizer = Binarizer(threshold=0.5)
X_binarized = binarizer.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X_binarized, y, test_size=0.2, random_state=42)
onehot = OneHotEncoder(sparse_output=False)
y_train_onehot = onehot.fit_transform(y_train.reshape(-1, 1))

alphas = [0.01, 0.1, 1.0]

for alpha in alphas:
    model = HybridDRBM(n_visible=784, n_hidden=256, n_classes=10, alpha=alpha)
    print(f"Training Hybrid DRBM with alpha={alpha}...")
    model.train(X_train, y_train_onehot, learning_rate=0.01, batch_size=32, epochs=5)
    
    # Evaluate
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    print(f"\nAccuracy for alpha = {alpha}: {accuracy:.4f}")
    print("\nClassification Report:")
    print(classification_report(y_test, predictions, zero_division=1))




Training Hybrid DRBM with alpha=0.01...


  ret = umr_sum(arr, axis, dtype, out, keepdims, where=where)


Epoch 1/5 completed
Epoch 2/5 completed
Epoch 3/5 completed
Epoch 4/5 completed
Epoch 5/5 completed

Accuracy for alpha = 0.01: 0.0959

Classification Report:
              precision    recall  f1-score   support

           0       0.10      1.00      0.18      1343
           1       1.00      0.00      0.00      1600
           2       1.00      0.00      0.00      1380
           3       1.00      0.00      0.00      1433
           4       1.00      0.00      0.00      1295
           5       1.00      0.00      0.00      1273
           6       1.00      0.00      0.00      1396
           7       1.00      0.00      0.00      1503
           8       1.00      0.00      0.00      1357
           9       1.00      0.00      0.00      1420

    accuracy                           0.10     14000
   macro avg       0.91      0.10      0.02     14000
weighted avg       0.91      0.10      0.02     14000

Training Hybrid DRBM with alpha=0.1...


  ret = umr_sum(arr, axis, dtype, out, keepdims, where=where)


Epoch 1/5 completed
Epoch 2/5 completed
Epoch 3/5 completed
Epoch 4/5 completed
Epoch 5/5 completed

Accuracy for alpha = 0.1: 0.0959

Classification Report:
              precision    recall  f1-score   support

           0       0.10      1.00      0.18      1343
           1       1.00      0.00      0.00      1600
           2       1.00      0.00      0.00      1380
           3       1.00      0.00      0.00      1433
           4       1.00      0.00      0.00      1295
           5       1.00      0.00      0.00      1273
           6       1.00      0.00      0.00      1396
           7       1.00      0.00      0.00      1503
           8       1.00      0.00      0.00      1357
           9       1.00      0.00      0.00      1420

    accuracy                           0.10     14000
   macro avg       0.91      0.10      0.02     14000
weighted avg       0.91      0.10      0.02     14000

Training Hybrid DRBM with alpha=1.0...


  ret = umr_sum(arr, axis, dtype, out, keepdims, where=where)
  dW = self.alpha * dW_class + (1 - self.alpha) * dW_unsup
  dd = self.alpha * dd_class + (1 - self.alpha) * dd_unsup


Epoch 1/5 completed
Epoch 2/5 completed
Epoch 3/5 completed
Epoch 4/5 completed
Epoch 5/5 completed

Accuracy for alpha = 1.0: 0.0959

Classification Report:
              precision    recall  f1-score   support

           0       0.10      1.00      0.18      1343
           1       1.00      0.00      0.00      1600
           2       1.00      0.00      0.00      1380
           3       1.00      0.00      0.00      1433
           4       1.00      0.00      0.00      1295
           5       1.00      0.00      0.00      1273
           6       1.00      0.00      0.00      1396
           7       1.00      0.00      0.00      1503
           8       1.00      0.00      0.00      1357
           9       1.00      0.00      0.00      1420

    accuracy                           0.10     14000
   macro avg       0.91      0.10      0.02     14000
weighted avg       0.91      0.10      0.02     14000



As you can see our model could learn by adding hybrid learning and we could not get more accuracy by changing epochs and learning rate or adding momentum and this was our results.