<a href="https://colab.research.google.com/github/sonnyloweus/QuantumDynamicsAI/blob/main/BoltzmannEncoderDecoder.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import ast
import torch
import pandas as pd
import numpy as np
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
import math
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, random_split

In [None]:
# Colab-only setup: imports first, then mount Google Drive at /content/drive.
import os
from google.colab import drive

drive.mount('/content/drive')

# Folder below the mount point; the data file loaded later lives in this folder.
directory_path = '/content/drive/MyDrive/Quantum/'

Mounted at /content/drive


# Encoder Decoder Definition

In [None]:
class BoltzmannEncoderDecoder(nn.Module):
    """RBM-style stochastic encoder/decoder between two sets of binary units.

    Parameters held by the module:

    * ``W`` -- weights, shape ``(1, out_dim, in_dim)``
    * ``b`` -- bias of the encoded units ``w``, shape ``(1, out_dim)``
    * ``c`` -- bias of the input units ``x``, shape ``(1, in_dim)``

    Both conditionals are factorised Bernoulli distributions:

    * encoder: ``p(w = 1 | x) = sigmoid(b + W x)``
    * decoder: ``p(x = 1 | w) = sigmoid(c + W^T w)``

    Naming convention: methods without a trailing underscore are static and
    take the parameters explicitly; methods with a trailing underscore use the
    module's own ``W``, ``b`` and ``c``.
    """

    def __init__(self, batch_size, in_dim, out_dim):
        super().__init__()
        # Stored for reference only; no method of this class reads it.
        self.batch_size = batch_size
        self.in_dim = in_dim  # width of the input vector x
        self.out_dim = out_dim  # width of the encoded vector w

        self.b = nn.Parameter(torch.zeros((1, self.out_dim)))
        self.c = nn.Parameter(torch.zeros((1, self.in_dim)))
        self.W = nn.Parameter(torch.zeros((1, self.out_dim, self.in_dim)))
        self.init()

    def init(self):
        """Re-initialise ``b``, ``c`` and ``W`` with Xavier-uniform values."""
        torch.nn.init.xavier_uniform_(self.b)
        torch.nn.init.xavier_uniform_(self.c)
        torch.nn.init.xavier_uniform_(self.W, gain=1)

    @staticmethod
    def _encoder_logits(x, W, b, batch_size):
        """Return ``b + W x`` as a ``(batch_size, out_dim, 1)`` column."""
        return b.unsqueeze(-1) + W @ x.view(batch_size, -1, 1)

    @staticmethod
    def _decoder_logits(w, W, c, batch_size):
        """Return ``c + W^T w`` as a ``(batch_size, in_dim, 1)`` column."""
        return c.unsqueeze(-1) + torch.transpose(W, 1, 2) @ w.view(batch_size, -1, 1)

    @staticmethod
    def conditional_log_probability_x_given_w(w, x, W, c):
        """Return ``log p(x | w)`` with shape ``(batch, 1)``.

        For binary ``x``, ``logsigmoid((2x - 1) * logit)`` is the Bernoulli
        log-likelihood of each unit; the per-unit terms are summed.
        """
        batch_size = x.shape[0]
        signs = 2 * x.view(batch_size, -1, 1) - 1
        logits = BoltzmannEncoderDecoder._decoder_logits(w, W, c, batch_size)
        return torch.nn.functional.logsigmoid(signs * logits).sum(dim=-2)

    @staticmethod
    def conditional_log_probability_w_given_x(w, x, W, b):
        """Return ``log p(w | x)`` with shape ``(batch, 1)``."""
        batch_size = x.shape[0]
        signs = 2 * w.view(batch_size, -1, 1) - 1
        logits = BoltzmannEncoderDecoder._encoder_logits(x, W, b, batch_size)
        return torch.nn.functional.logsigmoid(signs * logits).sum(dim=-2)

    @staticmethod
    def encoder_sample(x, W, b):
        """Draw one binary sample ``w ~ p(w | x)``; returns ``(batch, out_dim)`` floats."""
        batch_size = x.shape[0]
        thresholds = torch.sigmoid(
            BoltzmannEncoderDecoder._encoder_logits(x, W, b, batch_size))
        # A unit is on when a uniform draw falls below its activation probability.
        return (torch.rand_like(thresholds) < thresholds).float().squeeze(-1)

    @staticmethod
    def decoder_sample(w, W, c):
        """Draw one binary sample ``x ~ p(x | w)``; returns ``(batch, in_dim)`` floats."""
        batch_size = w.shape[0]
        thresholds = torch.sigmoid(
            BoltzmannEncoderDecoder._decoder_logits(w, W, c, batch_size))
        return (torch.rand_like(thresholds) < thresholds).float().squeeze(-1)

    def conditional_log_probability_x_given_w_(self, w, x):
        """``log p(x | w)`` using this module's ``W`` and ``c``."""
        return BoltzmannEncoderDecoder.conditional_log_probability_x_given_w(w, x, self.W, self.c)

    def conditional_log_probability_w_given_x_(self, w, x):
        """``log p(w | x)`` using this module's ``W`` and ``b``."""
        return BoltzmannEncoderDecoder.conditional_log_probability_w_given_x(w, x, self.W, self.b)

    def conditional_log_probability_w_given_x_double_batched_(self, w, x):
        """``log p(w | x)`` for inputs with two leading batch axes, using own parameters.

        Returns a tensor of shape ``(w.shape[0], w.shape[1], 1)``.
        """
        return BoltzmannEncoderDecoder.conditional_log_probability_w_given_x_double_batched(
            w, x, self.W, self.b)

    # Declared static like its siblings: it takes no ``self``, so without the
    # decorator a call through an instance would bind the instance to ``w``.
    @staticmethod
    def conditional_log_probability_w_given_x_double_batched(w, x, W, b):
        """``log p(w | x)`` for inputs with two leading batch axes.

        The two leading axes are flattened, evaluated as a single batch and
        restored, giving shape ``(w.shape[0], w.shape[1], 1)``.
        """
        dim_0 = w.shape[0]
        dim_1 = w.shape[1]
        flat = BoltzmannEncoderDecoder.conditional_log_probability_w_given_x(
            w.reshape(dim_0 * dim_1, -1), x.reshape(dim_0 * dim_1, -1), W, b)
        return flat.reshape(dim_0, dim_1, -1)

    # simple since the conditional distribution is factorial
    def encoder_sample_(self, x):
        """Sample ``w ~ p(w | x)`` using this module's parameters."""
        return BoltzmannEncoderDecoder.encoder_sample(x, self.W, self.b)

    # simple since the conditional distribution is factorial
    def batched_encoder_sample_(self, x):
        """Sample ``w ~ p(w | x)`` for ``x`` with two leading batch axes, using own parameters."""
        return BoltzmannEncoderDecoder.batched_encoder_sample(x, self.W, self.b)

    # simple since the conditional distribution is factorial
    @staticmethod
    def batched_encoder_sample(x, W, b):
        """Sample ``w ~ p(w | x)`` for ``x`` with two leading batch axes.

        Returns a tensor of shape ``(x.shape[0], x.shape[1], out_dim)``.
        """
        dim_0 = x.shape[0]
        dim_1 = x.shape[1]
        flat = BoltzmannEncoderDecoder.encoder_sample(x.reshape(dim_0 * dim_1, -1), W, b)
        return flat.reshape(dim_0, dim_1, -1)

    def decoder_sample_(self, w):
        """Sample ``x ~ p(x | w)`` using this module's parameters."""
        return BoltzmannEncoderDecoder.decoder_sample(w, self.W, self.c)


class EnergyBasedModelEmbeddingDynamics(nn.Module):
    """Energy-based conditional model of a binary state ``w`` given ``z``.

    The energy is a two-layer ReLU network evaluated on ``cat(z, w)``; the
    unnormalised log-probability of ``w`` given ``z`` is the negated energy.
    Static methods receive the four network tensors ``W1, b1, W2, b2``
    explicitly, while methods whose name ends in ``_`` use the module's own
    parameters.
    """

    def __init__(self, dim, hidden_dim = None):
        super().__init__()
        self.dim = dim  # width of z (w is reshaped with the same width)
        width = 12 if hidden_dim is None else hidden_dim

        # The first layer consumes cat(z, w), hence 2 * dim input features.
        self.linear_1_weight = nn.Parameter(torch.zeros((width, 2 * self.dim)))
        self.linear_1_bias = nn.Parameter(torch.zeros((width)))
        self.linear_2_weight = nn.Parameter(torch.zeros((1, width)))
        self.linear_2_bias = nn.Parameter(torch.zeros((1)))
        self.init()

    def init(self):
        """Xavier-initialise both weight matrices (biases stay at zero)."""
        for weight in (self.linear_1_weight, self.linear_2_weight):
            torch.nn.init.xavier_uniform_(weight)

    def _net(self):
        """Return this module's network tensors as ``(W1, b1, W2, b2)``."""
        return (self.linear_1_weight, self.linear_1_bias,
                self.linear_2_weight, self.linear_2_bias)

    @staticmethod
    def _two_layer(features, W1, b1, W2, b2):
        """Apply linear -> ReLU -> linear to ``features``."""
        hidden = F.relu(F.linear(features, W1, bias=b1))
        return F.linear(hidden, W2, b2)

    def unnormalized_log_probs_w_given_z_double_batched_(self, z, w):
        """Negated energy for inputs with two leading batch axes, own parameters."""
        return EnergyBasedModelEmbeddingDynamics.unnormalized_log_probs_w_given_z_double_batched(
            z, w, *self._net())

    @staticmethod
    def energy_function_bilinear(i1, i2, W1, b1, W2, b2):
        """Energy computed from the flattened per-sample outer product of ``i1`` and ``i2``.

        ``W1`` must accept ``i1.shape[1] * i2.shape[1]`` input features.
        """
        rows = i1.shape[0]
        pairwise = torch.einsum('bi,bj->bij', i1, i2).view(rows, -1)
        return EnergyBasedModelEmbeddingDynamics._two_layer(pairwise, W1, b1, W2, b2)

    @staticmethod
    def energy_function_linear(i1, i2, W1, b1, W2, b2):
        """Energy computed from the concatenation of ``i1`` and ``i2`` along the last axis."""
        joined = torch.cat((i1, i2), dim = -1)
        return EnergyBasedModelEmbeddingDynamics._two_layer(joined, W1, b1, W2, b2)

    @staticmethod
    def _energy(z, w, W1, b1, W2, b2):
        """Energy used by every other method; currently the concatenation variant."""
        return EnergyBasedModelEmbeddingDynamics.energy_function_linear(z, w, W1, b1, W2, b2)

    @staticmethod
    def unnormalized_log_probs_w_given_z(z, w, W1, b1, W2, b2):
        """Return the negated energy of ``(z, w)``."""
        energy = EnergyBasedModelEmbeddingDynamics._energy(z, w, W1, b1, W2, b2)
        return -energy

    def unnormalized_log_probs_w_given_z_(self, z, w):
        """Negated energy of ``(z, w)`` using this module's parameters."""
        energy = EnergyBasedModelEmbeddingDynamics._energy(z, w, *self._net())
        return -energy

    @staticmethod
    def unnormalized_log_probs_w_given_z_double_batched(z, w, W1, b1, W2, b2):
        """Negated energy for ``z``/``w`` with two leading batch axes.

        Both inputs are flattened to rows of width ``z.shape[2]``; the result
        has shape ``(z.shape[0], z.shape[1], 1)``.
        """
        outer, inner, width = z.size(0), z.size(1), z.size(2)
        flat_energy = EnergyBasedModelEmbeddingDynamics._energy(
            z.reshape(-1, width), w.reshape(-1, width), W1, b1, W2, b2)
        return -flat_energy.reshape(outer, inner, 1)

    @staticmethod
    def expected_unnormalized_log_probs_w_given_z(z, w, W1, b1, W2, b2):
        """Negated energy averaged over the leading (sample) axis; shape ``(z.shape[1], 1)``."""
        n_samples, n_batch, width = z.size(0), z.size(1), z.size(2)
        flat_energy = EnergyBasedModelEmbeddingDynamics._energy(
            z.reshape(-1, width), w.reshape(-1, width), W1, b1, W2, b2)
        energy = flat_energy.reshape(n_samples, n_batch, 1)
        return -energy.mean(dim=0)

    # for small state spaces it is possible to compute the partition function exactly
    @staticmethod
    def log_partition_function(z, W1, b1, W2, b2):
        """Exact log-partition function per row of ``z``.

        Enumerates all ``2 ** dim`` binary vectors ``w`` (``dim = z.shape[-1]``)
        and log-sum-exps their unnormalised log-probabilities.
        """
        n_bits = z.shape[-1]
        n_states = 2 ** n_bits
        z_tiled = z.view(-1, 1, n_bits).expand(-1, n_states, -1)

        # Row k of ``bits`` holds the binary digits of k, least significant bit first.
        codes = torch.arange(0, n_states, device = z.device).unsqueeze(-1)
        masks = 2 ** torch.arange(n_bits, device = z.device)
        bits = codes.bitwise_and(masks).ne(0)
        all_w = bits.unsqueeze(0).expand(z_tiled.shape[0], -1, -1).float()

        scores = EnergyBasedModelEmbeddingDynamics.unnormalized_log_probs_w_given_z_double_batched(
            z_tiled, all_w, W1, b1, W2, b2)
        return torch.logsumexp(scores, dim=1)

    # the initial state is used to build the proposal distribution
    @staticmethod
    @torch.no_grad()
    def estimated_log_partition_function_better(z, initial_state, W, b, _, W1, b1, W2, b2, samples = 512):
        """Importance-sampling estimate of the log-partition function.

        Proposals ``w`` are drawn from the Boltzmann encoder conditioned on
        ``initial_state`` (parameters ``W``, ``b``); the ``_`` argument is unused.
        """
        z_rep = z.expand(samples, -1, -1)
        start_rep = initial_state.expand(samples, -1, -1)
        proposals = BoltzmannEncoderDecoder.batched_encoder_sample(start_rep, W, b)
        proposal_scores = BoltzmannEncoderDecoder.conditional_log_probability_w_given_x_double_batched(
            proposals, start_rep, W, b)
        model_scores = EnergyBasedModelEmbeddingDynamics.unnormalized_log_probs_w_given_z_double_batched(
            z_rep, proposals, W1, b1, W2, b2)
        return - math.log(samples) + torch.logsumexp(model_scores - proposal_scores, dim = 0)

    # importance sampling with a uniform proposal: probably good enough for training,
    # where a noisy gradient in roughly the right direction suffices, but not for evaluation
    @staticmethod
    @torch.no_grad()
    def estimated_log_partition_function(z, W1, b1, W2, b2, samples = 512):
        """Monte-Carlo estimate of the log-partition function using uniform binary proposals."""
        n_bits = z.shape[-1]
        z = z.expand(samples, -1, -1)
        # Each bit is on with probability one half.
        w = (torch.rand_like(z) < 0.5).float()
        scores = EnergyBasedModelEmbeddingDynamics.unnormalized_log_probs_w_given_z_double_batched(
            z, w, W1, b1, W2, b2)
        return n_bits*math.log(2) - math.log(samples) + torch.logsumexp(scores, dim = 0)

    def estimated_log_partition_function_(self, z):
        """Uniform-proposal estimate of the log-partition function, own parameters."""
        return EnergyBasedModelEmbeddingDynamics.estimated_log_partition_function(z, *self._net())

    @staticmethod
    def normalized_log_probabilities_w_given_z(z, w, W1, b1, W2, b2):
        """Exactly normalised ``log p(w | z)`` (enumerates every state of ``w``)."""
        cls = EnergyBasedModelEmbeddingDynamics
        score = cls.unnormalized_log_probs_w_given_z(z, w, W1, b1, W2, b2)
        return score - cls.log_partition_function(z, W1, b1, W2, b2)

    def normalized_log_probabilities_w_given_z_(self, z, w):
        """Exactly normalised ``log p(w | z)`` using this module's parameters."""
        return EnergyBasedModelEmbeddingDynamics.normalized_log_probabilities_w_given_z(
            z, w, *self._net())

    def estimated_normalized_log_probabilities_w_given_z_(self, z, w):
        """Approximately normalised ``log p(w | z)`` using this module's parameters."""
        return EnergyBasedModelEmbeddingDynamics.estimated_normalized_log_probabilities_w_given_z(
            z, w, *self._net())

    @staticmethod
    def estimated_normalized_log_probabilities_w_given_z(z, w, W1, b1, W2, b2):
        """``log p(w | z)`` normalised with the uniform-proposal partition estimate."""
        cls = EnergyBasedModelEmbeddingDynamics
        score = cls.unnormalized_log_probs_w_given_z(z, w, W1, b1, W2, b2)
        return score - cls.estimated_log_partition_function(z, W1, b1, W2, b2)

    @staticmethod
    def estimated_normalized_log_probabilities_w_given_z_better(z, w, x, W, b, _, W1, b1, W2, b2):
        """``log p(w | z)`` normalised with the encoder-proposal partition estimate.

        ``x`` conditions the proposal; the ``_`` argument is unused.
        """
        cls = EnergyBasedModelEmbeddingDynamics
        score = cls.unnormalized_log_probs_w_given_z(z, w, W1, b1, W2, b2)
        return score - cls.estimated_log_partition_function_better(z, x, W, b, None, W1, b1, W2, b2)

    @staticmethod
    def estimated_normalized_log_probabilities_w_given_z_better_(z, w, x, model, samples = 1024):
        """Same estimate as the ``_better`` variant, driven through a composite ``model``.

        ``model`` must expose ``encoder_decoder`` and ``embedding_dynamics``
        sub-modules providing the trailing-underscore methods called below.
        """
        z_rep = z.expand(samples, -1, -1)
        x_rep = x.expand(samples, -1, -1)

        # every w below is drawn from the proposal distribution
        proposals = model.encoder_decoder.batched_encoder_sample_(x_rep)
        proposal_scores = model.encoder_decoder.conditional_log_probability_w_given_x_double_batched_(
            proposals, x_rep)

        model_scores = model.embedding_dynamics.unnormalized_log_probs_w_given_z_double_batched_(
            z_rep, proposals)
        target_score = model.embedding_dynamics.unnormalized_log_probs_w_given_z_(z, w)
        return target_score + math.log(samples) - torch.logsumexp(model_scores - proposal_scores, dim = 0)


class EnergyBasedEncoderDecoder(nn.Module):
    """Energy-based conditional model ``p(a | b)`` over binary vectors.

    The energy is a two-layer ReLU network evaluated on ``cat(a, b)``, so the
    widths of ``a`` and ``b`` must add up to ``in_dim + out_dim``.  Static
    methods take the network tensors ``W1, b1, W2, b2`` explicitly; methods
    whose name ends in ``_`` use the module's own parameters.
    """

    def __init__(self, in_dim, out_dim,  hidden_dim = None):
        super().__init__()
        self.in_dim = in_dim  # dimension of x
        self.out_dim = out_dim  # dimension of w
        if hidden_dim is None:
            hidden_dim = 12

        self.linear_1_weight = nn.Parameter(torch.zeros((hidden_dim, self.in_dim + self.out_dim)))
        self.linear_1_bias = nn.Parameter(torch.zeros((hidden_dim)))
        self.linear_2_weight = nn.Parameter(torch.zeros((1, hidden_dim)))
        self.linear_2_bias = nn.Parameter(torch.zeros((1)))
        self.init()

    def init(self):
        """Xavier-initialise both weight matrices (biases stay at zero)."""
        torch.nn.init.xavier_uniform_(self.linear_1_weight)
        torch.nn.init.xavier_uniform_(self.linear_2_weight)

    def _net(self):
        """Return this module's network tensors as ``(W1, b1, W2, b2)``."""
        return (self.linear_1_weight, self.linear_1_bias,
                self.linear_2_weight, self.linear_2_bias)

    def forward(self, input_a, input_b):
        """Return the energy of the pair ``(input_a, input_b)``."""
        return self._energy(input_a, input_b, *self._net())

    @staticmethod
    def energy_function_linear(a, b, W1, b1, W2, b2):
        """Energy: linear -> ReLU -> linear applied to ``cat(a, b)`` on the last axis."""
        temp = torch.nn.functional.linear(torch.cat((a, b), dim = -1), W1, bias=b1)
        temp = torch.nn.functional.relu(temp)
        return torch.nn.functional.linear(temp, W2, b2)

    @staticmethod
    def _energy(a, b, W1, b1, W2, b2):
        """Energy used by every other method."""
        return EnergyBasedEncoderDecoder.energy_function_linear(a, b, W1, b1, W2, b2)

    @staticmethod
    def unnormalized_log_probs_a_given_b(a, b, W1, b1, W2, b2):
        """Return the negated energy of ``(a, b)``."""
        return -EnergyBasedEncoderDecoder._energy(a, b, W1, b1, W2, b2)

    def unnormalized_log_probs_a_given_b_(self, a, b):
        """Negated energy of ``(a, b)`` using this module's parameters."""
        return EnergyBasedEncoderDecoder.unnormalized_log_probs_a_given_b(a, b, *self._net())

    @staticmethod
    def unnormalized_log_probs_a_given_b_double_batched(a, b, W1, b1, W2, b2):
        """Negated energy for ``a``/``b`` with two leading batch axes.

        Returns a tensor of shape ``(a.shape[0], a.shape[1], 1)``.
        """
        first_dim = a.shape[0]
        second_dim = a.shape[1]
        dim_a = a.shape[2]
        dim_b = b.shape[2]
        energy = EnergyBasedEncoderDecoder._energy(
            a.reshape(-1, dim_a), b.reshape(-1, dim_b), W1, b1, W2, b2
        ).reshape(first_dim, second_dim, 1)
        return -energy

    # for small state spaces it is possible to compute the partition function exactly
    @staticmethod
    def log_partition_function(b, W1, b1, W2, b2):
        """Exact log-partition function of ``p(. | b)`` for every row of ``b``.

        The sum runs over all binary vectors ``a``.  The width of ``a`` is
        whatever the first layer expects beyond the width of ``b``; it is not
        assumed to equal the width of ``b``.
        """
        dim_b = b.shape[-1]
        dim_a = W1.shape[-1] - dim_b
        n_states = 2 ** dim_a
        b_tiled = b.reshape(-1, 1, dim_b).expand(-1, n_states, -1)

        # Row k of ``bits`` holds the binary digits of k, least significant bit first.
        codes = torch.arange(0, n_states, device = b.device).unsqueeze(-1)
        bits = codes.bitwise_and(2 ** torch.arange(dim_a, device = b.device)).ne(0)
        A = bits.unsqueeze(0).expand(b_tiled.shape[0], -1, -1).float()

        log_probs = EnergyBasedEncoderDecoder.unnormalized_log_probs_a_given_b_double_batched(
            A, b_tiled, W1, b1, W2, b2)
        return torch.logsumexp(log_probs, dim=1)

    @staticmethod
    def conditional_log_probability_a_given_b(a, b, W1, b1, W2, b2):
        """Exactly normalised ``log p(a | b)`` with shape ``(batch, 1)``."""
        unnormalized = EnergyBasedEncoderDecoder.unnormalized_log_probs_a_given_b(a, b, W1, b1, W2, b2)
        return unnormalized - EnergyBasedEncoderDecoder.log_partition_function(b, W1, b1, W2, b2)

    def conditional_log_probability_a_given_b_(self, a, b):
        """Exactly normalised ``log p(a | b)`` using this module's parameters."""
        return EnergyBasedEncoderDecoder.conditional_log_probability_a_given_b(a, b, *self._net())

    def conditional_log_probability_a_given_b_double_batched_(self, a, b):
        """``log p(a | b)`` for inputs with two leading batch axes, own parameters.

        Returns a tensor of shape ``(a.shape[0], a.shape[1], 1)``.
        """
        dim_0 = a.shape[0]
        dim_1 = a.shape[1]
        # The static helper needs the network tensors as well as the data.
        flat = EnergyBasedEncoderDecoder.conditional_log_probability_a_given_b(
            a.reshape(dim_0 * dim_1, -1), b.reshape(dim_0 * dim_1, -1), *self._net())
        return flat.reshape(dim_0, dim_1, -1)

# Data Processing

In [None]:
# Load and prepare the data
# Build the path from directory_path (set in the Drive-mount cell) so the
# project folder is spelled out in one place only.
data_path = os.path.join(directory_path, 'quantum_simulation_data.pkl')
# NOTE(security): unpickling can execute arbitrary code; only load pickle
# files that come from a trusted source.
data = pd.read_pickle(data_path)

def to_tensor(item):
    """Convert one stored state into a ``torch.Tensor``.

    Accepted inputs:

    * ``torch.Tensor`` -- returned as a detached copy;
    * ``str`` -- parsed as a Python literal (e.g. ``"[0, 1, 1]"``) first;
    * ``list``, ``tuple`` or ``numpy.ndarray`` -- converted directly.

    Raises:
        ValueError: if ``item`` is of any other type.
    """
    if isinstance(item, torch.Tensor):
        # Copy and detach so the result shares neither storage nor autograd
        # history with the input.
        return item.clone().detach()
    if isinstance(item, str):
        # literal_eval only evaluates Python literals, never arbitrary code.
        return torch.tensor(ast.literal_eval(item))
    if isinstance(item, (list, tuple, np.ndarray)):
        return torch.tensor(item)
    raise ValueError(
        f"Expected a list, tuple, str, numpy array or tensor, but got {type(item)}")

# Bare expression: as the last line of a notebook cell its value is shown as the cell output.
data.shape

(64000, 2)

In [None]:
# Convert every stored state (string, list or tensor) into a tensor, in place.
data['Initial_State'] = [to_tensor(lst) for lst in data['Initial_State']]
data['Final_State'] = [to_tensor(lst) for lst in data['Final_State']]

print(data['Final_State'][0])

# Convert Series to list of tensors before stacking
initial_state_tensors = list(data['Initial_State'])
final_state_tensors = list(data['Final_State'])

# Each dataset item is an (initial_state, final_state) pair.
dataset = TensorDataset(torch.stack(initial_state_tensors), torch.stack(final_state_tensors))
print(dataset)

# Define the split sizes (80% train, 20% test)
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

# Seed the split so the same examples land in train/test on every run;
# otherwise results are not comparable between notebook sessions.
split_generator = torch.Generator().manual_seed(42)
train_dataset, test_dataset = random_split(
    dataset, [train_size, test_size], generator=split_generator)

# Mini-batch loaders: training batches are reshuffled each epoch, test batches are not.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# X_train, X_test, y_train, y_test = train_test_split(initial_state_tensors, final_state_tensors, test_size=0.2, random_state=42)
# train_loader = torch.utils.data.DataLoader(list(zip(X_train, y_train)), batch_size=32, shuffle=False)
# test_loader = torch.utils.data.DataLoader(list(zip(X_test, y_test)), batch_size=32, shuffle=False)

tensor([0., 0., 1., 0., 1., 0., 0., 0., 0., 1., 1., 1.])
<torch.utils.data.dataset.TensorDataset object at 0x7b37e0179690>


# Training and Evaluation

In [None]:
# Parameters
# Hyper-parameters passed positionally to the model constructor below.
input_dim = 12  # Assuming each bitstring has 12 bits
hidden_dim = 64
num_layers = 3
num_heads = 4
output_dim = 12  # Predicting 12-bit output
max_seq_len = 12  # Maximum length of the input sequence

# Initialize model
# NOTE(review): EnergyBasedQuantumTransformer is not defined in the visible part
# of this notebook -- confirm it is defined or imported before this cell runs.
model = EnergyBasedQuantumTransformer(input_dim, hidden_dim, num_layers, num_heads, output_dim, max_seq_len)

# # Example input: batch_size = 2, sequence length = 12
# input_bitstrings = torch.tensor([[0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1], [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0]], dtype=torch.float32)
# # Forward pass
# output_bitstrings = model.predict_binary(input_bitstrings)
# print("Predicted Output Bitstrings:", output_bitstrings)

In [None]:
def energy_loss(output, target):
    """Return the mean-squared error between ``output`` and ``target``.

    MSE is used as an example loss; a logits-based binary cross-entropy is
    kept below as a commented-out alternative.
    """
    # return F.binary_cross_entropy_with_logits(output, target)
    mse = F.mse_loss(input=output, target=target)
    return mse

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

num_epochs = 100
model.train()

# Fail early with a clear message instead of a NameError on `loss` after the
# first (empty) epoch.
if len(train_loader) == 0:
    raise ValueError("train_loader yields no batches; nothing to train on")

for epoch in range(num_epochs):
    # Accumulate per-batch losses so the log reports the epoch mean rather than
    # whichever batch happened to come last (matches the evaluation cell).
    epoch_loss_sum = 0.0
    num_batches = 0

    for initial, final in train_loader:
        optimizer.zero_grad()

        # Forward pass
        outputs = model(initial)

        # Compute loss
        loss = energy_loss(outputs, final)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        epoch_loss_sum += loss.item()
        num_batches += 1

    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {epoch_loss_sum / num_batches:.4f}')

TypeError: EnergyBasedEncoderDecoder.energy_function_linear() missing 1 required positional argument: 'b2'

In [None]:
model.eval()

# Evaluate on the held-out split. The notebook defines `train_loader` and
# `test_loader`; no `val_dataloader` exists, so the test loader is used here.
with torch.no_grad():
    total_loss = 0
    num_batches = len(test_loader)
    for initial, final in test_loader:
        # Call the model exactly as the training loop does: on the input only,
        # so the target is never fed to the model during evaluation.
        outputs = model(initial)
        loss = energy_loss(outputs, final)
        total_loss += loss.item()

    if num_batches == 0:
        raise ValueError("test_loader yields no batches; nothing to evaluate")

    # Mean of the per-batch losses.
    avg_loss = total_loss / num_batches
    print(f'Validation Loss: {avg_loss:.4f}')
    print(f'Validation Loss: {avg_loss:.4f}')