---
title: "Restricted Boltzmann Machines"
subtitle: "Theory of RBMs and Applications"
author: "Jessica Wells and Jason Gerstenberger (Advisor: Dr. Cohen)"
date: '`r Sys.Date()`'
format:
  html:
    code-fold: true
course: Capstone Projects in Data Science IDC 6940
university: University of West Florida, Pensacola, FL
bibliography: references.bib # file contains bibtex for references
#always_allow_html: true # this allows to get PDF with HTML features
self-contained: true
execute: 
  warning: false
  message: false
editor: 
  markdown: 
    wrap: 72
---




## Introduction

### Background

Restricted Boltzmann Machines (RBM) are a type of neural network that has been around since the 1980s. As a reminder to the reader, machine learning is generally divided into 3 categories: supervised learning (examples: classification tasks, regression), unsupervised learning (examples: clustering, dimensionality reduction, generative modeling), and reinforcement learning (examples: gaming/robotics). RBMs are primarily used for unsupervised learning tasks like dimensionality reduction and feature extraction, which help prepare datasets for machine learning models that may later be trained using supervised learning. They also have other applications which will be discussed further later.

Like Hopfield networks, Boltzmann machines are undirected graphical models, but they are different in that they are stochastic and can have hidden units. Both models are energy-based, meaning they learn by minimizing an energy function for each model [@smolensky1986information]. Boltzmann machines use a sigmoid activation function, which allows for the model to be probabilistic.

In the "Restricted" Boltzmann Machine, there are no interactions between neurons in the visible layer or between neurons in the hidden layer, creating a bipartite graph of neurons. Below is a diagram taken from Goodfellow, et al. [@Goodfellow-et-al-2016] (p. 577) for visualization of the connections.

![](markov_net.png){width=60%}
<br>

Goodfellow, et al. discuss the expense in drawing samples for most undirected graphical models; however, the RBM allows for block Gibbs sampling (p. 578) where the network alternates between sampling all hidden units simultaneously (etc. for visible). Derivatives are also simplified by the fact that the energy function of the RBM is a linear function of it's parameters, which will be seen further in Methods.  

RBMs are trained using a process called Contrastive Divergence (CD) [@hinton2002training] where the weights are updated to minimize the difference between samples from the data and samples from the model. Learning rate, batch size, and number of hidden units are all hyperparameters that can affect the ability of the training to converge successfully and learn the underlying structure of the data.



### Applications

RBMs are probably best known for their success in collaborative filtering. The RBM model was used in the Netflix Prize competition to predict user ratings for movies, with the result that it outperformed the Singular Value Decomposition (SVD) method that was state-of-the-art at the time [@salakhutdinov2007restricted]. They have also been trained to recognize handwritten digits, such as the MNIST dataset [@hinton2002training].

RBMs have been successfully used to distinguish normal and anomalous network traffic. Their potential use in improving network security for companies in the future is promising. There is slow progress in network anomaly detection due to the difficulty of obtaining datasets for training and testing networks. Clients are often reluctant to divulge information that could potentially harm their networks. In a real-life dataset where one host had normal traffic and one was infected by a bot, discriminative RBM (DRBM) was able to successfully distinguish the normal from anomalous traffic. DRBM doesn't rely on knowing the data distribution ahead of time, which is useful, except that it also causes the DRBM to overfit. As a result, when trying to use the same trained RBM on the KDD '99 training dataset performance declined. [@fiore2013network]

RBMs can provide greatly improved classification of brain disorders in MRI images. Generative Adversarial Networks (GANs) use two neural networks: a generator which generates fake data, and a discriminator which tries to distinguish between real and fake data. Loss from the discriminator is backpropagated through the generator so that both part are trained simultaneously. The RBM-GAN uses RBM features from real MRI images as inputs to the generator. Features from the discriminator are then used as inputs to a classifier. [@aslan2023automated]

The many-body quantum wavefunction, which describes the quantum state of a system of particles is difficult to compute with classical computers. RBMs have been used to approximate it using variational Monte Carlo methods. [@melko2019restricted]

RBMs are notoriously slow to train. The process of computing the activation probability requires the calculation of vector dot products. Lean Constrastive Divergence (LCD) is a method which adds two techniques to speed up the process of training RBMs. The first is bounds-based filtering where upper and lower bounds of the probability select only a range of dot products to perform. Second, the delta product involves only recalculating the changed portions of the vector dot product. [@ning2018lcd]



## Methods


Below is the energy function of the RBM. 

$$
E(v,h) = - \sum_{i} a_i v_i - \sum_{j} b_j h_j - \sum_{i} \sum_{j} v_i w_{i,j} h_j
$$ {#eq-energy}
where v<sub>i</sub> and h<sub>j</sub> represent visible and hidden units; a<sub>i</sub> and b<sub>j</sub> are the bias terms of the visible and hidden units; and each w<sub>{i,j}</sub> (weight) element represents the interaction between the visible and hidden units. [@fischer2012introduction]


It is well known neural networks are prone to overfitting and often techniques such as early stopping are employed to prevent it. Some methods to prevent overfitting in RBMs are weight decay (L2 regularization), dropout, dropconnect, and weight uncertainty [@zhang2018overview]. Dropout is a fairly well known concept in deep learning. For example, a dropout value of 0.3 added to a layer means 30% of neurons are dropped during training. This prevents the network from learning certain features too well. L2 regularization is also a commonly employed technique in deep learning. It assigns a penalty to large weights to allow for more generalization. Dropconnect is a method where a subset of weights within the network are randomly set to zero during training. Weight uncertainty is where each weight in the network has it's own probability distribution vice a fixed value. This addition allows the network to learn more useful features. 

If the learning rate is too high, training of the model may not converge. If it is too low, training may take a long time. To fully maximize the training of the model it is helpful to reduce the learning rate over time. This is known as learning rate decay. [@hinton2010practical]

##### Logistic Regression
One technique we explore is standardizing Fashion MNIST features/pixels, then training a RBM (unsupervised learning) to extract hidden features from the visible layer and then feed these features into the Logistic Regression Model (vice feeding the raw pixels). The hidden features from the RBM are standardized again before being used as input features for the logistic regression classifier. Then we use the trained logistic regression model to predict labels for test data, evaluating how well the RBM-derived features perform in a supervised classification task. It is helpful to remind the reader about the methodology behind Logistic Regression.

$$
P(Y = k | X) = \frac{e^{\beta_{0k} + \beta_k^T X}}{\sum_{l=1}^{K} e^{\beta_{0l} + \beta_l^T X}}
$$ {#eq-probability-lr}

Mathematically, the concept behind binary logistic regression is the logit (the natural logarithm of an odds ratio)[@peng2002introduction]. However, since we have 10 labels, our classification task falls into "Multinomial Logistic Regression." 



##### Below is our Process for creating the RBM:
Step 1: We first initialize the RBM with random weights and biases and set visible units to 784 and hidden units to 256. We also set the number of contrastive divergence steps (k) to 1. </br>
Step 2: Sample hidden units from visible. The math behind computing the hidden unit activations from the given input can be seen in [@eq-probability-rbm1] [@fischer2012introduction] where the probability is used to sample from the Bernoulli distribution. </br>
$$
p(H_i = 1 | \mathbf{v}) = \sigma \left( \sum_{j=1}^{m} w_{ij} v_j + c_i \right)
$$ {#eq-probability-rbm1}
Step 3: Sample visible units from hidden. The math behind computing visible unit activations from the hidden layer can be seen in [@eq-probability-rbm2] [@fischer2012introduction] Visible states are sampled using the Bernoulli distribution. This way we can see how well the RBM learned from the inputs. </br>
$$
p(V_j = 1 | \mathbf{h}) = \sigma \left( \sum_{i=1}^{n} w_{ij} h_i + b_j \right)
$$ {#eq-probability-rbm2}

Step 4: K=1 steps of Contrastive Divergence (Feed Forward, Feed Backward) which executes steps 2 and 3. Contrastive Divergence updates the RBM’s weights by minimizing the difference between the original input and the reconstructed input created by the RBM. </br>
Step 5: Free energy is computed. The free energy F is given by the logarithm of the partition function Z [@oh2020entropy] where the partition function is  <br>
$$
Z(\theta) \equiv \sum_{v,h} e^{-E(v,h; \theta)}
$$ {#eq-partition}
and the free energy function is <br>
$$
F(\theta) = -\ln Z(\theta)
$$ {#eq-free-energy}
where lower free energy means the RBM learned the visible state well.

Step 6: Train the RBM. Model weights updated via gradient descent.<br>
Step 7: Feature extraction for classification with LR. The hidden layer activations of the RBM are used as features for LR.

##### Hyperparameter Tuning

We use the Tree-structured Parzen Estimator algorithm from Optuna [@akiba2019optuna] to tune the hyperparameters of the RBM and the classifier models, and we use MLFlow [@zaharia2018accelerating] to record and visualize the results of the hyperparameter tuning process. The hyperparameters we tune include the learning rate, batch size, number of hidden units, and number of epochs.

<!-- 

-   Detail the models or algorithms used.

-   Justify your choices based on the problem and data.

*The common non-parametric regression model is*
$Y_i = m(X_i) + \varepsilon_i$*, where* $Y_i$ *can be defined as the sum
of the regression function value* $m(x)$ *for* $X_i$*. Here* $m(x)$ *is
unknown and* $\varepsilon_i$ *some errors. With the help of this
definition, we can create the estimation for local averaging i.e.*
$m(x)$ *can be estimated with the product of* $Y_i$ *average and* $X_i$
*is near to* $x$*. In other words, this means that we are discovering
the line through the data points with the help of surrounding data
points. The estimation formula is printed below [@R-base]:*

$$
M_n(x) = \sum_{i=1}^{n} W_n (X_i) Y_i  \tag{1}
$$$W_n(x)$ *is the sum of weights that belongs to all real numbers.
Weights are positive numbers and small if* $X_i$ *is far from* $x$*.*


*Another equation:*

$$
y_i = \beta_0 + \beta_1 X_1 +\varepsilon_i
$$

-->

## Analysis and Results

### Data Exploration and Visualization

We use the Fashion MNIST dataset from Zalando Research [@xiao2017/online]. The set includes 70,000 grayscale images of clothing items, 60,000 for training and 10,000 for testing. Each image is 28x28 pixels (784 pixels total). Each pixel has a value associated with it ranging from 0 (white) to 255 (very dark) -- whole numbers only. There are 785 columns in total as one column is dedicated to the label. 

![](fmnist.png){width=60%}
<br>

There are 10 labels in total:<br>

0 T-shirt/top<br>
1 Trouser<br>
2 Pullover<br>
3 Dress<br>
4 Coat<br>
5 Sandal<br>
6 Shirt<br>
7 Sneaker<br>
8 Bag<br>
9 Ankle boot<br>


Below we load the dataset.


In [None]:
import pandas as pd
import numpy as np
from sklearn.linear_model import LogisticRegression
import torch
import torchvision.datasets
import torchvision.models
import torchvision.transforms as transforms
import matplotlib.pyplot as plt



train_data = torchvision.datasets.FashionMNIST(
    root="./data", 
    train=True, 
    download=True, 
    transform=transforms.ToTensor()  # Converts to tensor but does NOT normalize
)

test_data = torchvision.datasets.FashionMNIST(
    root="./data", 
    train=False, 
    download=True, 
    transform=transforms.ToTensor()  
)

Get the seventh image to show a sample 


In [None]:
# Extract the first image (or choose any index)
image_tensor, label = train_data[6]  # shape: [1, 28, 28]

# Convert tensor to NumPy array
image_array = image_tensor.numpy().squeeze()  

# Plot the image
plt.figure(figsize=(5,5))
plt.imshow(image_array, cmap="gray")
plt.title(f"FashionMNIST Image (Label: {label})")
plt.axis("off")  # Hide axes
plt.show()

In [None]:
train_images = train_data.data.numpy()  # Raw pixel values (0-255)
train_labels = train_data.targets.numpy()
X = train_images.reshape(-1, 784)  # Flatten 28x28 images into 1D (60000, 784)

In [None]:
#print(train_images[:5])
flattened = train_images[:5].reshape(5, -1) 

# Create a DataFrame
df_flat = pd.DataFrame(flattened)
print(df_flat.head())
#train_df.info() #datatypes are integers

There are no missing values in the data.


In [None]:
print(np.isnan(train_images).any()) 

<b>There appears to be no class imbalance</b>


In [None]:
unique_labels, counts = np.unique(train_labels, return_counts=True)

# Print the counts sorted by label
for label, count in zip(unique_labels, counts):
    print(f"Label {label}: {count}")

In [None]:
print(f"X shape: {X.shape}")

t-SNE Visualization


In [None]:
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

# Run t-SNE to reduce dimensionality
#embeddings = TSNE(n_jobs=2).fit_transform(X)

tsne = TSNE(n_jobs=-1, random_state=42)  # Use -1 to use all available cores
embeddings = tsne.fit_transform(X) #use scikitlearn instead


# Create scatter plot
figure = plt.figure(figsize=(15,7))
plt.scatter(embeddings[:, 0], embeddings[:, 1], c=train_labels,
            cmap=plt.cm.get_cmap("jet", 10), marker='.')
plt.colorbar(ticks=range(10))
plt.clim(-0.5, 9.5)
plt.title("t-SNE Visualization of Fashion MNIST")
plt.show()

<!-- 

-   Describe your data sources and collection process.

-   Present initial findings and insights through visualizations.

-   Highlight unexpected patterns or anomalies.

-->


### Modeling and Results
<!--
-   Explain your data preprocessing and cleaning steps.

-   Present your key findings in a clear and concise manner.

-   Use visuals to support your claims.

-   **Tell a story about what the data reveals.**
-->

<b>Our Models</b><br>
1. Logistic Regression on Fashion MNIST Data<br>
2. Feed Forward Network on Fashion MNIST Data<br>
3. Convolutional Neural Network on Fashion MNIST Data<br>
4. Logistic Regression on RBM Hidden Features (of Fashion MNIST Data)<br>
5. Feed Forward Network on RBM Hidden Features (of Fashion MNIST Data)<br>


<b>Import Libraries and Re-load data for first 3 models</b>


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import datasets, transforms
import numpy as np
import mlflow
import optuna
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader

# Set device
device = torch.device("mps")

# Load Fashion-MNIST dataset again for the first 3 models
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.FashionMNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.FashionMNIST(root='./data', train=False, transform=transform, download=True)

In [None]:
# import mlflow
# 
# print("MLflow Tracking URI:", mlflow.get_tracking_uri())
# print("MLflow Artifact URI:", mlflow.get_artifact_uri())

<b>Model 1: Logistic Regression on Fashion MNIST Data</b>


In [None]:
CLASSIFIER = "LogisticRegression"  # Change for FNN, LogisticRegression, or CNN

# Set MLflow experiment name
if CLASSIFIER == "LogisticRegression":
    experiment = mlflow.set_experiment("pytorch-fmnist-lr-noRBM")
elif CLASSIFIER == "FNN":
    experiment = mlflow.set_experiment("pytorch-fmnist-fnn-noRBM")
elif CLASSIFIER == "CNN":
    experiment = mlflow.set_experiment("pytorch-fmnist-cnn-noRBM")

# Define CNN model
class FashionCNN(nn.Module):
    def __init__(self, filters1, filters2, kernel1, kernel2):
        super(FashionCNN, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=filters1, kernel_size=kernel1, padding=1),
            nn.BatchNorm2d(filters1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=filters1, out_channels=filters2, kernel_size=kernel2),
            nn.BatchNorm2d(filters2),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.fc1 = None
        self.drop = nn.Dropout2d(0.25)
        self.fc2 = nn.Linear(in_features=600, out_features=120)
        self.fc3 = nn.Linear(in_features=120, out_features=10)
        

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)
        if self.fc1 is None:
            self.fc1 = nn.Linear(out.shape[1], 600).to(x.device)
        out = self.fc1(out)
        out = self.drop(out)
        out = self.fc2(out)
        out = self.fc3(out)
        return out



        # Dynamically calculate flattened size
        out = out.view(out.size(0), -1)  # Flatten
        if self.fc1 is None:
            self.fc1 = nn.Linear(out.shape[1], 600).to(x.device)  # ✅ Update FC layer dynamically

        out = self.fc1(out)
        out = self.drop(out)
        out = self.fc2(out)
        out = self.fc3(out)
        return out




# Define Optuna objective function
def objective(trial):
    batch_size = trial.suggest_int("batch_size", 64, 256, step=32)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    mlflow.start_run(experiment_id=experiment.experiment_id)
    num_classifier_epochs = trial.suggest_int("num_classifier_epochs", 5, 5) 
    mlflow.log_param("num_classifier_epochs", num_classifier_epochs)

    if CLASSIFIER == "FNN":
        hidden_size = trial.suggest_int("fnn_hidden", 192, 384)
        learning_rate = trial.suggest_float("learning_rate", 0.0001, 0.0025)

        mlflow.log_param("classifier", "FNN")
        mlflow.log_param("fnn_hidden", hidden_size)
        mlflow.log_param("learning_rate", learning_rate)

        model = nn.Sequential(
            nn.Linear(784, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 10)
        ).to(device)


        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    elif CLASSIFIER == "CNN":
        filters1 = trial.suggest_int("filters1", 16, 64, step=16)
        filters2 = trial.suggest_int("filters2", 32, 128, step=32)
        kernel1 = trial.suggest_int("kernel1", 3, 5)
        kernel2 = trial.suggest_int("kernel2", 3, 5)
        learning_rate = trial.suggest_float("learning_rate", 0.0001, 0.0025)

        mlflow.log_param("classifier", "CNN")
        mlflow.log_param("filters1", filters1)
        mlflow.log_param("filters2", filters2)
        mlflow.log_param("kernel1", kernel1)
        mlflow.log_param("kernel2", kernel2)
        mlflow.log_param("learning_rate", learning_rate)

        model = FashionCNN(filters1, filters2, kernel1, kernel2).to(device)
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

      
    elif CLASSIFIER == "LogisticRegression":
        mlflow.log_param("classifier", "LogisticRegression")
    
        # Prepare data for Logistic Regression (Flatten 28x28 images to 784 features)
        train_features = train_dataset.data.view(-1, 784).numpy()
        train_labels = train_dataset.targets.numpy()
        test_features = test_dataset.data.view(-1, 784).numpy()
        test_labels = test_dataset.targets.numpy()
    
        # Normalize the pixel values to [0,1] for better convergence
        train_features = train_features / 255.0
        test_features = test_features / 255.0
    
    
        C = trial.suggest_float("C", 0.01, 10.0, log=True)  
        solver = "saga" 
    
        model = LogisticRegression(C=C, max_iter=num_classifier_epochs, solver=solver)
        model.fit(train_features, train_labels)
    
    
        predictions = model.predict(test_features)
        accuracy = accuracy_score(test_labels, predictions) * 100
        print(f"Logistic Regression Test Accuracy: {accuracy:.2f}%")
    
        mlflow.log_param("C", C)
        mlflow.log_metric("test_accuracy", accuracy)
        mlflow.end_run()
        return accuracy

    # Training Loop for FNN and CNN
    criterion = nn.CrossEntropyLoss()


    model.train()
    for epoch in range(num_classifier_epochs):
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images) if CLASSIFIER == "CNN" else model(images.view(images.size(0), -1))

            optimizer.zero_grad()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f"{CLASSIFIER} Epoch {epoch+1}: loss = {running_loss / len(train_loader):.4f}")

    # Model Evaluation
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images) if CLASSIFIER == "CNN" else model(images.view(images.size(0), -1))
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")

    mlflow.log_metric("test_accuracy", accuracy)
    mlflow.end_run()
    return accuracy

if __name__ == "__main__":
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=50)
    print(f"Best Parameters for {CLASSIFIER}:", study.best_params)
    print("Best Accuracy:", study.best_value)

![](lr_norbm.png){width=60%}




<b>Model 2: Feed Forward Network on Fashion MNIST Data</b>


In [None]:
CLASSIFIER = "FNN"  # Change for FNN, LogisticRegression, or CNN

# Set MLflow experiment name
if CLASSIFIER == "LogisticRegression":
    experiment = mlflow.set_experiment("pytorch-fmnist-lr-noRBM")
elif CLASSIFIER == "FNN":
    experiment = mlflow.set_experiment("pytorch-fmnist-fnn-noRBM")
elif CLASSIFIER == "CNN":
    experiment = mlflow.set_experiment("pytorch-fmnist-cnn-noRBM")

# Define CNN model
class FashionCNN(nn.Module):
    def __init__(self, filters1, filters2, kernel1, kernel2):
        super(FashionCNN, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=filters1, kernel_size=kernel1, padding=1),
            nn.BatchNorm2d(filters1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=filters1, out_channels=filters2, kernel_size=kernel2),
            nn.BatchNorm2d(filters2),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.fc1 = None
        self.drop = nn.Dropout2d(0.25)
        self.fc2 = nn.Linear(in_features=600, out_features=120)
        self.fc3 = nn.Linear(in_features=120, out_features=10)
        

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)
        if self.fc1 is None:
            self.fc1 = nn.Linear(out.shape[1], 600).to(x.device)
        out = self.fc1(out)
        out = self.drop(out)
        out = self.fc2(out)
        out = self.fc3(out)
        return out



        # Dynamically calculate flattened size
        out = out.view(out.size(0), -1)  # Flatten
        if self.fc1 is None:
            self.fc1 = nn.Linear(out.shape[1], 600).to(x.device)  # ✅ Update FC layer dynamically

        out = self.fc1(out)
        out = self.drop(out)
        out = self.fc2(out)
        out = self.fc3(out)
        return out




# Define Optuna objective function
def objective(trial):
    batch_size = trial.suggest_int("batch_size", 64, 256, step=32)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    mlflow.start_run(experiment_id=experiment.experiment_id)
    num_classifier_epochs = trial.suggest_int("num_classifier_epochs", 5, 5) 
    mlflow.log_param("num_classifier_epochs", num_classifier_epochs)

    if CLASSIFIER == "FNN":
        hidden_size = trial.suggest_int("fnn_hidden", 192, 384)
        learning_rate = trial.suggest_float("learning_rate", 0.0001, 0.0025)

        mlflow.log_param("classifier", "FNN")
        mlflow.log_param("fnn_hidden", hidden_size)
        mlflow.log_param("learning_rate", learning_rate)

        model = nn.Sequential(
            nn.Linear(784, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 10)
        ).to(device)


        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    elif CLASSIFIER == "CNN":
        filters1 = trial.suggest_int("filters1", 16, 64, step=16)
        filters2 = trial.suggest_int("filters2", 32, 128, step=32)
        kernel1 = trial.suggest_int("kernel1", 3, 5)
        kernel2 = trial.suggest_int("kernel2", 3, 5)
        learning_rate = trial.suggest_float("learning_rate", 0.0001, 0.0025)

        mlflow.log_param("classifier", "CNN")
        mlflow.log_param("filters1", filters1)
        mlflow.log_param("filters2", filters2)
        mlflow.log_param("kernel1", kernel1)
        mlflow.log_param("kernel2", kernel2)
        mlflow.log_param("learning_rate", learning_rate)

        model = FashionCNN(filters1, filters2, kernel1, kernel2).to(device)
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

      
    elif CLASSIFIER == "LogisticRegression":
        mlflow.log_param("classifier", "LogisticRegression")
    
        # Prepare data for Logistic Regression (Flatten 28x28 images to 784 features)
        train_features = train_dataset.data.view(-1, 784).numpy()
        train_labels = train_dataset.targets.numpy()
        test_features = test_dataset.data.view(-1, 784).numpy()
        test_labels = test_dataset.targets.numpy()
    
        # Normalize the pixel values to [0,1] for better convergence
        train_features = train_features / 255.0
        test_features = test_features / 255.0
    
    
        C = trial.suggest_float("C", 0.01, 10.0, log=True)  
        solver = "saga" 
    
        model = LogisticRegression(C=C, max_iter=num_classifier_epochs, solver=solver)
        model.fit(train_features, train_labels)
    
    
        predictions = model.predict(test_features)
        accuracy = accuracy_score(test_labels, predictions) * 100
        print(f"Logistic Regression Test Accuracy: {accuracy:.2f}%")
    
        mlflow.log_param("C", C)
        mlflow.log_metric("test_accuracy", accuracy)
        mlflow.end_run()
        return accuracy

    # Training Loop for FNN and CNN
    criterion = nn.CrossEntropyLoss()


    model.train()
    for epoch in range(num_classifier_epochs):
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images) if CLASSIFIER == "CNN" else model(images.view(images.size(0), -1))

            optimizer.zero_grad()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f"{CLASSIFIER} Epoch {epoch+1}: loss = {running_loss / len(train_loader):.4f}")

    # Model Evaluation
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images) if CLASSIFIER == "CNN" else model(images.view(images.size(0), -1))
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")

    mlflow.log_metric("test_accuracy", accuracy)
    mlflow.end_run()
    return accuracy

if __name__ == "__main__":
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=50)
    print(f"Best Parameters for {CLASSIFIER}:", study.best_params)
    print("Best Accuracy:", study.best_value)

Test Accuracy by FNN Hidden Units

![](fnn_norbm.png){width=60%}


<b>Model 3: Convolutional Neural Network on Fashion MNIST Data</b><br>
Base code for CNN structure borrowed from [Kaggle](https://www.kaggle.com/code/pankajj/fashion-mnist-with-pytorch-93-accuracy)


In [None]:
CLASSIFIER = "CNN"  # Change for FNN, LogisticRegression, or CNN

# Set MLflow experiment name
if CLASSIFIER == "LogisticRegression":
    experiment = mlflow.set_experiment("pytorch-fmnist-lr-noRBM")
elif CLASSIFIER == "FNN":
    experiment = mlflow.set_experiment("pytorch-fmnist-fnn-noRBM")
elif CLASSIFIER == "CNN":
    experiment = mlflow.set_experiment("pytorch-fmnist-cnn-noRBM")

# Define CNN model
class FashionCNN(nn.Module):
    def __init__(self, filters1, filters2, kernel1, kernel2):
        super(FashionCNN, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=filters1, kernel_size=kernel1, padding=1),
            nn.BatchNorm2d(filters1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=filters1, out_channels=filters2, kernel_size=kernel2),
            nn.BatchNorm2d(filters2),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.fc1 = None
        self.drop = nn.Dropout2d(0.25)
        self.fc2 = nn.Linear(in_features=600, out_features=120)
        self.fc3 = nn.Linear(in_features=120, out_features=10)
        

    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.view(out.size(0), -1)
        if self.fc1 is None:
            self.fc1 = nn.Linear(out.shape[1], 600).to(x.device)
        out = self.fc1(out)
        out = self.drop(out)
        out = self.fc2(out)
        out = self.fc3(out)
        return out



        # Dynamically calculate flattened size
        out = out.view(out.size(0), -1)  # Flatten
        if self.fc1 is None:
            self.fc1 = nn.Linear(out.shape[1], 600).to(x.device)  # ✅ Update FC layer dynamically

        out = self.fc1(out)
        out = self.drop(out)
        out = self.fc2(out)
        out = self.fc3(out)
        return out




# Define Optuna objective function
def objective(trial):
    batch_size = trial.suggest_int("batch_size", 64, 256, step=32)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    mlflow.start_run(experiment_id=experiment.experiment_id)
    num_classifier_epochs = trial.suggest_int("num_classifier_epochs", 5, 5) 
    mlflow.log_param("num_classifier_epochs", num_classifier_epochs)

    if CLASSIFIER == "FNN":
        hidden_size = trial.suggest_int("fnn_hidden", 192, 384)
        learning_rate = trial.suggest_float("learning_rate", 0.0001, 0.0025)

        mlflow.log_param("classifier", "FNN")
        mlflow.log_param("fnn_hidden", hidden_size)
        mlflow.log_param("learning_rate", learning_rate)

        model = nn.Sequential(
            nn.Linear(784, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 10)
        ).to(device)


        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    elif CLASSIFIER == "CNN":
        filters1 = trial.suggest_int("filters1", 16, 64, step=16)
        filters2 = trial.suggest_int("filters2", 32, 128, step=32)
        kernel1 = trial.suggest_int("kernel1", 3, 5)
        kernel2 = trial.suggest_int("kernel2", 3, 5)
        learning_rate = trial.suggest_float("learning_rate", 0.0001, 0.0025)

        mlflow.log_param("classifier", "CNN")
        mlflow.log_param("filters1", filters1)
        mlflow.log_param("filters2", filters2)
        mlflow.log_param("kernel1", kernel1)
        mlflow.log_param("kernel2", kernel2)
        mlflow.log_param("learning_rate", learning_rate)

        model = FashionCNN(filters1, filters2, kernel1, kernel2).to(device)
        optimizer = optim.Adam(model.parameters(), lr=learning_rate)

      
    elif CLASSIFIER == "LogisticRegression":
        mlflow.log_param("classifier", "LogisticRegression")
    
        # Prepare data for Logistic Regression (Flatten 28x28 images to 784 features)
        train_features = train_dataset.data.view(-1, 784).numpy()
        train_labels = train_dataset.targets.numpy()
        test_features = test_dataset.data.view(-1, 784).numpy()
        test_labels = test_dataset.targets.numpy()
    
        # Normalize the pixel values to [0,1] for better convergence
        train_features = train_features / 255.0
        test_features = test_features / 255.0
    
    
        C = trial.suggest_float("C", 0.01, 10.0, log=True)  
        solver = "saga" 
    
        model = LogisticRegression(C=C, max_iter=num_classifier_epochs, solver=solver)
        model.fit(train_features, train_labels)
    
    
        predictions = model.predict(test_features)
        accuracy = accuracy_score(test_labels, predictions) * 100
        print(f"Logistic Regression Test Accuracy: {accuracy:.2f}%")
    
        mlflow.log_param("C", C)
        mlflow.log_metric("test_accuracy", accuracy)
        mlflow.end_run()
        return accuracy

    # Training Loop for FNN and CNN
    criterion = nn.CrossEntropyLoss()


    model.train()
    for epoch in range(num_classifier_epochs):
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images) if CLASSIFIER == "CNN" else model(images.view(images.size(0), -1))

            optimizer.zero_grad()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        print(f"{CLASSIFIER} Epoch {epoch+1}: loss = {running_loss / len(train_loader):.4f}")

    # Model Evaluation
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images) if CLASSIFIER == "CNN" else model(images.view(images.size(0), -1))
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100 * correct / total
    print(f"Test Accuracy: {accuracy:.2f}%")

    mlflow.log_metric("test_accuracy", accuracy)
    mlflow.end_run()
    return accuracy

if __name__ == "__main__":
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=50)
    print(f"Best Parameters for {CLASSIFIER}:", study.best_params)
    print("Best Accuracy:", study.best_value)

Test Accuracy Based on the Number of Filters in the First Conv2D Layer

![](filters1.png){width=60%}
Test Accuracy Based on the Number of Filters in the Second Conv2D Layer
![](filters2.png){width=60%}
Test Accuracy Based on Kernel Size in the First Conv2D Layer
![](kernelsize1.png){width=60%}
Test Accuracy Based on Kernel Size in the Second Conv2D Layer
![](kernelsize2.png){width=60%}


<b>Model 4: Logistic Regression on RBM Hidden Features (of Fashion MNIST Data)</b><br>


In [None]:
CLASSIFIER = 'LogisticRegression'

if CLASSIFIER == 'LogisticRegression':
    experiment = mlflow.set_experiment("pytorch-fmnist-lr-withrbm")
else:
    experiment = mlflow.set_experiment("pytorch-fmnist-fnn-withrbm")


class RBM(nn.Module):
    def __init__(self, n_visible=784, n_hidden=256, k=1):
        super(RBM, self).__init__()
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        # Initialize weights and biases
        self.W = nn.Parameter(torch.randn(n_hidden, n_visible) * 0.1)
        self.v_bias = nn.Parameter(torch.zeros(n_visible))
        self.h_bias = nn.Parameter(torch.zeros(n_hidden))
        self.k = k  # CD-k steps

    def sample_h(self, v):
        # Given visible v, sample hidden h
        p_h = torch.sigmoid(F.linear(v, self.W, self.h_bias))  # p(h=1|v)
        h_sample = torch.bernoulli(p_h)                        # sample Bernoulli
        return p_h, h_sample

    def sample_v(self, h):
        # Given hidden h, sample visible v
        p_v = torch.sigmoid(F.linear(h, self.W.t(), self.v_bias))  # p(v=1|h)
        v_sample = torch.bernoulli(p_v)
        return p_v, v_sample

    def forward(self, v):
        # Perform k steps of contrastive divergence starting from v
        v_k = v.clone()
        for _ in range(self.k):
            _, h_k = self.sample_h(v_k)    # sample hidden from current visible
            _, v_k = self.sample_v(h_k)    # sample visible from hidden
        return v_k  # k-step reconstructed visible

    def free_energy(self, v):
        # Compute the visible bias term for each sample in the batch
        vbias_term = (v * self.v_bias).sum(dim=1)  # shape: [batch_size]
        # Compute the activation of the hidden units
        wx_b = F.linear(v, self.W, self.h_bias)     # shape: [batch_size, n_hidden]
        # Compute the hidden term
        hidden_term = torch.sum(torch.log1p(torch.exp(wx_b)), dim=1)  # shape: [batch_size]
        # Return the mean free energy over the batch
        return - (vbias_term + hidden_term).mean()
    
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.FashionMNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.FashionMNIST(root='./data', train=False, transform=transform, download=True)

def objective(trial):
    num_rbm_epochs = trial.suggest_int("num_rbm_epochs", 5, 5)# 24, 33)
    batch_size = trial.suggest_int("batch_size", 192, 1024)
    rbm_lr = trial.suggest_float("rbm_lr", 0.05, 0.1)
    rbm_hidden = trial.suggest_int("rbm_hidden", 384, 8192)

    mlflow.start_run(experiment_id=experiment.experiment_id)
    if CLASSIFIER != 'LogisticRegression':
        fnn_hidden = trial.suggest_int("fnn_hidden", 192, 384)
        fnn_lr = trial.suggest_float("fnn_lr", 0.0001, 0.0025)
        mlflow.log_param("fnn_hidden", fnn_hidden)
        mlflow.log_param("fnn_lr", fnn_lr)

    num_classifier_epochs = trial.suggest_int("num_classifier_epochs", 5, 5)# 40, 60)

    mlflow.log_param("num_rbm_epochs", num_rbm_epochs)
    mlflow.log_param("batch_size", batch_size)
    mlflow.log_param("rbm_lr", rbm_lr)
    mlflow.log_param("rbm_hidden", rbm_hidden)
    mlflow.log_param("num_classifier_epochs", num_classifier_epochs)

    # Instantiate RBM and optimizer
    device = torch.device("mps")
    rbm = RBM(n_visible=784, n_hidden=rbm_hidden, k=1).to(device)
    optimizer = torch.optim.SGD(rbm.parameters(), lr=rbm_lr)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    rbm_training_failed = False
    # Training loop (assuming train_loader yields batches of images and labels)
    for epoch in range(num_rbm_epochs):
        total_loss = 0.0
        for images, _ in train_loader:
            # Flatten images and binarize
            v0 = images.view(-1, 784).to(rbm.W.device)      # shape [batch_size, 784]
            v0 = torch.bernoulli(v0)                        # sample binary input
            vk = rbm(v0)                                    # k-step CD reconstruction
            # Compute contrastive divergence loss (free energy difference)
            loss = rbm.free_energy(v0) - rbm.free_energy(vk)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}: avg free-energy loss = {total_loss/len(train_loader):.4f}")
        if np.isnan(total_loss):
            rbm_training_failed = True
            break

    if rbm_training_failed:
        accuracy = 0.0
    else:
        rbm.eval()  # set in evaluation mode if using any layers that behave differently in training
        features_list = []
        labels_list = []
        for images, labels in train_loader:
            v = images.view(-1, 784).to(rbm.W.device)
            v = v  # (optionally binarize or use raw normalized pixels)
            h_prob, h_sample = rbm.sample_h(v)  # get hidden activations
            features_list.append(h_prob.cpu().detach().numpy())
            labels_list.append(labels.numpy())
        train_features = np.concatenate(features_list)  # shape: [N_train, n_hidden]
        train_labels = np.concatenate(labels_list)

        # Convert pre-extracted training features and labels to tensors and create a DataLoader
        train_features_tensor = torch.tensor(train_features, dtype=torch.float32)
        train_labels_tensor = torch.tensor(train_labels, dtype=torch.long)
        train_feature_dataset = torch.utils.data.TensorDataset(train_features_tensor, train_labels_tensor)
        train_feature_loader = torch.utils.data.DataLoader(train_feature_dataset, batch_size=batch_size, shuffle=True)

            
        if CLASSIFIER == 'LogisticRegression':
            # add optuna tuning same as log reg without RBM features...
            lr_C = trial.suggest_float("lr_C", 0.01, 10.0, log=True)  
            mlflow.log_param("lr_C", lr_C)  # Log the chosen C value


            classifier = LogisticRegression(max_iter=num_classifier_epochs, C=lr_C, solver="saga") 
            classifier.fit(train_features, train_labels)            
            
        else:
            classifier = nn.Sequential(
                nn.Linear(rbm.n_hidden, fnn_hidden),
                nn.ReLU(),
                nn.Linear(fnn_hidden, 10)
            )

            # Move classifier to the same device as the RBM
            classifier = classifier.to(device)
            criterion = nn.CrossEntropyLoss()
            classifier_optimizer = torch.optim.Adam(classifier.parameters(), lr=fnn_lr)

            classifier.train()
            for epoch in range(num_classifier_epochs):
                running_loss = 0.0
                for features, labels in train_feature_loader:
                    features = features.to(device)
                    labels = labels.to(device)
                    
                    # Forward pass through classifier
                    outputs = classifier(features)
                    loss = criterion(outputs, labels)
                    
                    # Backpropagation and optimization
                    classifier_optimizer.zero_grad()
                    loss.backward()
                    classifier_optimizer.step()
                    
                    running_loss += loss.item()
                avg_loss = running_loss / len(train_feature_loader)
                print(f"Classifier Epoch {epoch+1}: loss = {avg_loss:.4f}")

        # Evaluate the classifier on test data.
        # Here we extract features from the RBM for each test image.
        if CLASSIFIER != 'LogisticRegression':
            classifier.eval()
            correct = 0
            total = 0
        features_list = []
        labels_list = []
        with torch.no_grad():
            for images, labels in test_loader:
                v = images.view(-1, 784).to(device)
                # Extract hidden activations; you can use either h_prob or h_sample.
                h_prob, _ = rbm.sample_h(v)
                if CLASSIFIER == 'LogisticRegression':
                    features_list.append(h_prob.cpu().detach().numpy())
                    labels_list.append(labels.numpy())
                else:
                    outputs = classifier(h_prob)
                    _, predicted = torch.max(outputs.data, 1)
                    total += labels.size(0)
                    correct += (predicted.cpu() == labels).sum().item()

        if CLASSIFIER == 'LogisticRegression':
            test_features = np.concatenate(features_list)
            test_labels = np.concatenate(labels_list)
            predictions = classifier.predict(test_features)
            accuracy = accuracy_score(test_labels, predictions) * 100
        else:
            accuracy = 100 * correct / total

        print(f"Test Accuracy: {accuracy:.2f}%")

    mlflow.log_metric("test_accuracy", accuracy)
    mlflow.end_run()

    return accuracy

if __name__ == "__main__":
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=50)
    print(study.best_params)
    print(study.best_value)
    print(study.best_trial)

![](logreg_withrbm_C.png){width=60%}
Test Accuracy By Number of RBM Hidden Units
![](logreg_withrbm_hiddenunits.png){width=60%}

<b>Model 5: Feed Forward Network on RBM Hidden Features (of Fashion MNIST Data)</b><br>


In [None]:
CLASSIFIER = 'FNN'

if CLASSIFIER == 'LogisticRegression':
    experiment = mlflow.set_experiment("pytorch-fmnist-lr-withrbm")
else:
    experiment = mlflow.set_experiment("pytorch-fmnist-fnn-withrbm")


class RBM(nn.Module):
    def __init__(self, n_visible=784, n_hidden=256, k=1):
        super(RBM, self).__init__()
        self.n_visible = n_visible
        self.n_hidden = n_hidden
        # Initialize weights and biases
        self.W = nn.Parameter(torch.randn(n_hidden, n_visible) * 0.1)
        self.v_bias = nn.Parameter(torch.zeros(n_visible))
        self.h_bias = nn.Parameter(torch.zeros(n_hidden))
        self.k = k  # CD-k steps

    def sample_h(self, v):
        # Given visible v, sample hidden h
        p_h = torch.sigmoid(F.linear(v, self.W, self.h_bias))  # p(h=1|v)
        h_sample = torch.bernoulli(p_h)                        # sample Bernoulli
        return p_h, h_sample

    def sample_v(self, h):
        # Given hidden h, sample visible v
        p_v = torch.sigmoid(F.linear(h, self.W.t(), self.v_bias))  # p(v=1|h)
        v_sample = torch.bernoulli(p_v)
        return p_v, v_sample

    def forward(self, v):
        # Perform k steps of contrastive divergence starting from v
        v_k = v.clone()
        for _ in range(self.k):
            _, h_k = self.sample_h(v_k)    # sample hidden from current visible
            _, v_k = self.sample_v(h_k)    # sample visible from hidden
        return v_k  # k-step reconstructed visible

    def free_energy(self, v):
        # Compute the visible bias term for each sample in the batch
        vbias_term = (v * self.v_bias).sum(dim=1)  # shape: [batch_size]
        # Compute the activation of the hidden units
        wx_b = F.linear(v, self.W, self.h_bias)     # shape: [batch_size, n_hidden]
        # Compute the hidden term
        hidden_term = torch.sum(torch.log1p(torch.exp(wx_b)), dim=1)  # shape: [batch_size]
        # Return the mean free energy over the batch
        return - (vbias_term + hidden_term).mean()
    
transform = transforms.Compose([transforms.ToTensor()])
train_dataset = datasets.FashionMNIST(root='./data', train=True, transform=transform, download=True)
test_dataset = datasets.FashionMNIST(root='./data', train=False, transform=transform, download=True)

def objective(trial):
    num_rbm_epochs = trial.suggest_int("num_rbm_epochs", 5, 5)# 24, 33)
    batch_size = trial.suggest_int("batch_size", 192, 1024)
    rbm_lr = trial.suggest_float("rbm_lr", 0.05, 0.1)
    rbm_hidden = trial.suggest_int("rbm_hidden", 384, 8192)

    mlflow.start_run(experiment_id=experiment.experiment_id)
    if CLASSIFIER != 'LogisticRegression':
        fnn_hidden = trial.suggest_int("fnn_hidden", 192, 384)
        fnn_lr = trial.suggest_float("fnn_lr", 0.0001, 0.0025)
        mlflow.log_param("fnn_hidden", fnn_hidden)
        mlflow.log_param("fnn_lr", fnn_lr)

    num_classifier_epochs = trial.suggest_int("num_classifier_epochs", 5, 5)# 40, 60)

    mlflow.log_param("num_rbm_epochs", num_rbm_epochs)
    mlflow.log_param("batch_size", batch_size)
    mlflow.log_param("rbm_lr", rbm_lr)
    mlflow.log_param("rbm_hidden", rbm_hidden)
    mlflow.log_param("num_classifier_epochs", num_classifier_epochs)

    # Instantiate RBM and optimizer
    device = torch.device("mps")
    rbm = RBM(n_visible=784, n_hidden=rbm_hidden, k=1).to(device)
    optimizer = torch.optim.SGD(rbm.parameters(), lr=rbm_lr)

    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    rbm_training_failed = False
    # Training loop (assuming train_loader yields batches of images and labels)
    for epoch in range(num_rbm_epochs):
        total_loss = 0.0
        for images, _ in train_loader:
            # Flatten images and binarize
            v0 = images.view(-1, 784).to(rbm.W.device)      # shape [batch_size, 784]
            v0 = torch.bernoulli(v0)                        # sample binary input
            vk = rbm(v0)                                    # k-step CD reconstruction
            # Compute contrastive divergence loss (free energy difference)
            loss = rbm.free_energy(v0) - rbm.free_energy(vk)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        print(f"Epoch {epoch+1}: avg free-energy loss = {total_loss/len(train_loader):.4f}")
        if np.isnan(total_loss):
            rbm_training_failed = True
            break

    if rbm_training_failed:
        accuracy = 0.0
    else:
        rbm.eval()  # set in evaluation mode if using any layers that behave differently in training
        features_list = []
        labels_list = []
        for images, labels in train_loader:
            v = images.view(-1, 784).to(rbm.W.device)
            v = v  # (optionally binarize or use raw normalized pixels)
            h_prob, h_sample = rbm.sample_h(v)  # get hidden activations
            features_list.append(h_prob.cpu().detach().numpy())
            labels_list.append(labels.numpy())
        train_features = np.concatenate(features_list)  # shape: [N_train, n_hidden]
        train_labels = np.concatenate(labels_list)

        # Convert pre-extracted training features and labels to tensors and create a DataLoader
        train_features_tensor = torch.tensor(train_features, dtype=torch.float32)
        train_labels_tensor = torch.tensor(train_labels, dtype=torch.long)
        train_feature_dataset = torch.utils.data.TensorDataset(train_features_tensor, train_labels_tensor)
        train_feature_loader = torch.utils.data.DataLoader(train_feature_dataset, batch_size=batch_size, shuffle=True)

            
        if CLASSIFIER == 'LogisticRegression':
            # add optuna tuning same as log reg without RBM features...
            lr_C = trial.suggest_float("lr_C", 0.01, 10.0, log=True)  
            mlflow.log_param("lr_C", lr_C)  # Log the chosen C value


            classifier = LogisticRegression(max_iter=num_classifier_epochs, C=lr_C, solver="saga") 
            classifier.fit(train_features, train_labels)            
            
        else:
            classifier = nn.Sequential(
                nn.Linear(rbm.n_hidden, fnn_hidden),
                nn.ReLU(),
                nn.Linear(fnn_hidden, 10)
            )

            # Move classifier to the same device as the RBM
            classifier = classifier.to(device)
            criterion = nn.CrossEntropyLoss()
            classifier_optimizer = torch.optim.Adam(classifier.parameters(), lr=fnn_lr)

            classifier.train()
            for epoch in range(num_classifier_epochs):
                running_loss = 0.0
                for features, labels in train_feature_loader:
                    features = features.to(device)
                    labels = labels.to(device)
                    
                    # Forward pass through classifier
                    outputs = classifier(features)
                    loss = criterion(outputs, labels)
                    
                    # Backpropagation and optimization
                    classifier_optimizer.zero_grad()
                    loss.backward()
                    classifier_optimizer.step()
                    
                    running_loss += loss.item()
                avg_loss = running_loss / len(train_feature_loader)
                print(f"Classifier Epoch {epoch+1}: loss = {avg_loss:.4f}")

        # Evaluate the classifier on test data.
        # Here we extract features from the RBM for each test image.
        if CLASSIFIER != 'LogisticRegression':
            classifier.eval()
            correct = 0
            total = 0
        features_list = []
        labels_list = []
        with torch.no_grad():
            for images, labels in test_loader:
                v = images.view(-1, 784).to(device)
                # Extract hidden activations; you can use either h_prob or h_sample.
                h_prob, _ = rbm.sample_h(v)
                if CLASSIFIER == 'LogisticRegression':
                    features_list.append(h_prob.cpu().detach().numpy())
                    labels_list.append(labels.numpy())
                else:
                    outputs = classifier(h_prob)
                    _, predicted = torch.max(outputs.data, 1)
                    total += labels.size(0)
                    correct += (predicted.cpu() == labels).sum().item()

        if CLASSIFIER == 'LogisticRegression':
            test_features = np.concatenate(features_list)
            test_labels = np.concatenate(labels_list)
            predictions = classifier.predict(test_features)
            accuracy = accuracy_score(test_labels, predictions) * 100
        else:
            accuracy = 100 * correct / total

        print(f"Test Accuracy: {accuracy:.2f}%")

    mlflow.log_metric("test_accuracy", accuracy)
    mlflow.end_run()

    return accuracy

if __name__ == "__main__":
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=50)
    print(study.best_params)
    print(study.best_value)
    print(study.best_trial)

Test Accuracy by RBM Hidden Units
![](fnn_withrbm_rbmhiddenunits.png){width=60%}
Test Accuracy by FNN Hidden Units
![](fnn_withrbm_fnnhiddenunits.png){width=60%}


### Conclusion

-   Summarize your key findings.
<style type="text/css">
.tg  {border-collapse:collapse;border-spacing:0;}
.tg td{border-color:black;border-style:solid;border-width:1px;font-family:Arial, sans-serif;font-size:14px;
  overflow:hidden;padding:10px 5px;word-break:normal;}
.tg th{border-color:black;border-style:solid;border-width:1px;font-family:Arial, sans-serif;font-size:14px;
  font-weight:normal;overflow:hidden;padding:10px 5px;word-break:normal;}
.tg .tg-baqh{text-align:center;vertical-align:top}
.tg .tg-lvth{font-size:16px;text-align:center;vertical-align:top}
.tg .tg-0lax{text-align:left;vertical-align:top}
</style>
<table class="tg"><thead>
  <tr>
    <th class="tg-lvth"><span style="font-weight:bold">Model</span></th>
    <th class="tg-lvth"><span style="font-weight:bold">Optuna Best Trial </span><br><span style="font-weight:bold">MLflow Test Accuracy (%)</span></th>
  </tr></thead>
<tbody>
  <tr>
    <td class="tg-baqh">Logistic Regression</td>
    <td class="tg-0lax">84.64%</td>
  </tr>
  <tr>
    <td class="tg-baqh">Feed Forward Network</td>
    <td class="tg-0lax">88.10%</td>
  </tr>
  <tr>
    <td class="tg-baqh">Convolutional Neural Network</td>
    <td class="tg-0lax">90.99%</td>
  </tr>
  <tr>
    <td class="tg-baqh">Logistic Regression (on RBM Hidden Features)</td>
    <td class="tg-0lax">86.95%</td>
  </tr>
  <tr>
    <td class="tg-baqh">Feed Forward Network (on RBM Hidden Features)</td>
    <td class="tg-0lax">86.98%</td>
  </tr>
</tbody>
</table>


-   Discuss the implications of your results.

## References
