In [51]:
import numpy as np
import pandas as pd
from scipy.stats import bernoulli
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
import random
from sklearn.model_selection import train_test_split
from torch import nn
from torch.nn import functional as F
from torch import optim as optim
import matplotlib.pyplot as plt
from dataset_script import make_training_set, make_easy_test_set, make_med_test_set
# from dataset_script import gen_random_unitary_trans, apply_random_unitary_trans -- doesn't work currently



In [52]:
X, y_easy_c1, y_easy_c2, y_easy_c3, y_med_c12, y_med_c13, y_med_c23 = make_training_set()
X_test_easy, y_easy_c1_test, y_easy_c2_test, y_easy_c3_test = make_easy_test_set()
X_test_med, y_med_c12_test, y_med_c13_test, y_med_c23_test = make_med_test_set()

In [53]:
def apply_random_unitary_trans(X: np.array, random_rotation: np.array):
    res = np.einsum('ijk,kl->ijl', X, random_rotation)
    return res


def gen_random_unitary_trans() -> np.array:
    input_dim = 3 # hard coded so doesn't need input array for dimensions
    random_rotation = np.random.randn(input_dim, input_dim)
    q, r = np.linalg.qr(random_rotation)
    random_rotation = q  # q is a unitary matrix
    return random_rotation

In [54]:
random_rotation = gen_random_unitary_trans() # input necessary 
X = apply_random_unitary_trans(X, random_rotation)
X_test_easy = apply_random_unitary_trans (X, random_rotation)
X_test_med = apply_random_unitary_trans(X, random_rotation)

In [55]:

# Convert to PyTorch tensors
X_tensor = torch.tensor(X, dtype=torch.float32)
y_easy_c1_tensor = torch.tensor(y_easy_c1, dtype=torch.long)
y_easy_c2_tensor = torch.tensor(y_easy_c2, dtype=torch.long)
y_easy_c3_tensor = torch.tensor(y_easy_c3, dtype=torch.long)

X_tensor = torch.tensor(X, dtype=torch.float32)
y_med_c12_tensor = torch.tensor(y_med_c12, dtype=torch.long)
y_med_c13_tensor = torch.tensor(y_med_c13, dtype=torch.long)
y_med_c23_tensor = torch.tensor(y_med_c23, dtype=torch.long)


X_test_easy_tensor = torch.tensor(X_test_easy, dtype=torch.float32)
y_easy_c1_test_tensor = torch.tensor(y_easy_c1_test, dtype=torch.long)
y_easy_c2_test_tensor = torch.tensor(y_easy_c2_test, dtype=torch.long)
y_easy_c3_test_tensor = torch.tensor(y_easy_c3_test, dtype=torch.long)

X_test_med_tensor = torch.tensor(X_test_med, dtype=torch.float32)
y_med_c12_test_tensor = torch.tensor(y_med_c12_test, dtype=torch.long)
y_med_c13_test_tensor = torch.tensor(y_med_c13_test, dtype=torch.long)
y_med_c23_test_tensor = torch.tensor(y_med_c23_test, dtype=torch.long)


    


In [56]:
def split_dataset(X, y, test_size=0.2, random_state=42):
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=test_size, random_state=random_state)
    return X_train, X_val, y_train, y_val

In [67]:
class WhiskerDataset(Dataset):
    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        if idx < 0 or idx >= len(self.X):
            raise IndexError(f"Index {idx} is out of bounds for the dataset of length {len(self.X)}")
        return self.X[idx], self.y[idx]

In [58]:
# Data preprocessing for easy tasks

# Split the dataset for easy tasks
X_train_c1, X_val_c1, y_train_c1, y_val_c1 = split_dataset(X_tensor, y_easy_c1_tensor)
X_train_c2, X_val_c2, y_train_c2, y_val_c2 = split_dataset(X_tensor, y_easy_c2_tensor)
X_train_c3, X_val_c3, y_train_c3, y_val_c3 = split_dataset(X_tensor, y_easy_c3_tensor)

# Create Dataset and DataLoader for each split (easy tasks)
train_dataset_c1 = WhiskerDataset(X_train_c1, y_train_c1)
val_dataset_c1 = WhiskerDataset(X_val_c1, y_val_c1)

train_dataset_c2 = WhiskerDataset(X_train_c2, y_train_c2)
val_dataset_c2 = WhiskerDataset(X_val_c2, y_val_c2)

train_dataset_c3 = WhiskerDataset(X_train_c3, y_train_c3)
val_dataset_c3 = WhiskerDataset(X_val_c3, y_val_c3)

train_dataloader_c1 = DataLoader(train_dataset_c1, batch_size=20, shuffle=True)
val_dataloader_c1 = DataLoader(val_dataset_c1, batch_size=20, shuffle=False)

train_dataloader_c2 = DataLoader(train_dataset_c2, batch_size=20, shuffle=True)
val_dataloader_c2 = DataLoader(val_dataset_c2, batch_size=20, shuffle=False)

train_dataloader_c3 = DataLoader(train_dataset_c3, batch_size=20, shuffle=True)
val_dataloader_c3 = DataLoader(val_dataset_c3, batch_size=20, shuffle=False)

# Create test Dataset and DataLoader for easy tasks
test_dataset_c1 = WhiskerDataset(X_test_easy_tensor, y_easy_c1_test_tensor)
test_dataloader_c1 = DataLoader(test_dataset_c1, batch_size=20, shuffle=False)

test_dataset_c2 = WhiskerDataset(X_test_easy_tensor, y_easy_c2_test_tensor)
test_dataloader_c2 = DataLoader(test_dataset_c2, batch_size=20, shuffle=False)

test_dataset_c3 = WhiskerDataset(X_test_easy_tensor, y_easy_c3_test_tensor)
test_dataloader_c3 = DataLoader(test_dataset_c3, batch_size=20, shuffle=False)

In [59]:
# Data preprocessing for medium tasks

# Split the dataset for medium tasks
X_train_c12, X_val_c12, y_train_c12, y_val_c12 = split_dataset(X_tensor, y_med_c12_tensor)
X_train_c13, X_val_c13, y_train_c13, y_val_c13 = split_dataset(X_tensor, y_med_c13_tensor)
X_train_c23, X_val_c23, y_train_c23, y_val_c23 = split_dataset(X_tensor, y_med_c23_tensor)

# Create Dataset and DataLoader for each split (medium tasks)
train_dataset_c12 = WhiskerDataset(X_train_c12, y_train_c12)
val_dataset_c12 = WhiskerDataset(X_val_c12, y_val_c12)

train_dataset_c13 = WhiskerDataset(X_train_c13, y_train_c13)
val_dataset_c13 = WhiskerDataset(X_val_c13, y_val_c13)

train_dataset_c23 = WhiskerDataset(X_train_c23, y_train_c23)
val_dataset_c23 = WhiskerDataset(X_val_c23, y_val_c23)

train_dataloader_c12 = DataLoader(train_dataset_c12, batch_size=20, shuffle=True)
val_dataloader_c12 = DataLoader(val_dataset_c12, batch_size=20, shuffle=False)

train_dataloader_c13 = DataLoader(train_dataset_c13, batch_size=20, shuffle=True)
val_dataloader_c13 = DataLoader(val_dataset_c13, batch_size=20, shuffle=False)

train_dataloader_c23 = DataLoader(train_dataset_c23, batch_size=20, shuffle=True)
val_dataloader_c23 = DataLoader(val_dataset_c23, batch_size=20, shuffle=False)

# Create test Dataset and DataLoader for medium tasks
test_dataset_c12 = WhiskerDataset(X_test_med_tensor, y_med_c12_test_tensor)
test_dataloader_c12 = DataLoader(test_dataset_c12, batch_size=20, shuffle=False)

test_dataset_c13 = WhiskerDataset(X_test_med_tensor, y_med_c13_test_tensor)
test_dataloader_c13 = DataLoader(test_dataset_c13, batch_size=20, shuffle=False)

test_dataset_c23 = WhiskerDataset(X_test_med_tensor, y_med_c23_test_tensor)
test_dataloader_c23 = DataLoader(test_dataset_c23, batch_size=20, shuffle=False)

In [60]:
device = "cuda" if torch.cuda.is_available() else "cpu"

# hyperparams

input_size = 3
sequence_len = 20
num_layers = 1
hidden_size = 60
num_classes = 2
batch_size = 20
num_epochs = 2 # variables num epochs
learning_rate = 0.005
weight_decay = 10 ** -10 # l2 regularization term
max_epochs_easy = 20
max_epochs_medium = 40
max_epochs_hard = 100

In [61]:

class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes, sequence_len):
        super(RNN, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.num_classes = num_classes
        self.sequence_len = sequence_len
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)

        # Forward propagate RNN
        out, _ = self.rnn(x, h0)
        noise = torch.randn_like(out) # strength of noise is always 1
        out += noise
        out = F.relu(out)

        

        # Pass the output of the last time step to the fully connected layer
        out = self.fc(out[:, -1, :])
        return out



In [62]:
def train_model(data_list: list[tuple]) -> None:
    for model, loss_fn, optimizer, task_label, train_dataloader, val_dataloader, test_dataloader in data_list:
        train_losses = []
        val_losses = []
        val_accuracies = []
        test_accuracies = []  # Initialize list for storing test accuracies

        target_loss = 1e-3
        num_epochs = max_epochs_easy  # Set the appropriate number of epochs based on the task complexity

        for epoch in range(num_epochs):
            # Training loopx
            model.train()
            train_loss = 0
            for data, target in train_dataloader:
                data = data.to(device)
                target = target.to(device)

                scores = model(data)
                loss = loss_fn(scores, target)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                train_loss += loss.item()
            
            # Calculate average training loss for the epoch
            train_loss /= len(train_dataloader)
            train_losses.append(train_loss)
            
            # Validation loop
            model.eval()
            val_loss = 0
            correct = 0
            total = 0
            with torch.no_grad():
                for data, target in val_dataloader:
                    data = data.to(device)
                    target = target.to(device)

                    scores = model(data)
                    loss = loss_fn(scores, target)
                    val_loss += loss.item()
                    _, predicted = torch.max(scores.data, 1)
                    total += target.size(0)
                    correct += (predicted == target).sum().item()
            
            # Calculate average validation loss and accuracy for the epoch
            val_loss /= len(val_dataloader)
            val_losses.append(val_loss)
            accuracy = 100 * correct / total
            val_accuracies.append(accuracy)
            
            print(f"Epoch [{epoch+1}/{num_epochs}], Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {accuracy:.2f}%")
            
            # Test loop (for test accuracy)
            model.eval()
            correct = 0
            total = 0
            with torch.no_grad():
                for data, target in test_dataloader:  # Using the test dataloader
                    data = data.to(device)
                    target = target.to(device)

                    scores = model(data)
                    _, predicted = torch.max(scores.data, 1)
                    total += target.size(0)
                    correct += (predicted == target).sum().item()
            
            # Calculate test accuracy
            test_accuracy = 100 * correct / total
            test_accuracies.append(test_accuracy)
            
            print(f"Test Accuracy: {test_accuracy:.2f}%")
            
            # Check for early stopping
            if train_loss <= target_loss:
                print(f"Target loss of {target_loss} reached at epoch {epoch+1}")
                break


        # Plot training and validation loss
        plt.figure(figsize=(10, 6))
        plt.plot(range(len(train_losses)), train_losses, label='Training Loss')
        plt.plot(range(len(val_losses)), val_losses, label='Validation Loss')
        plt.xlabel('Epoch')
        plt.ylabel('Loss')
        plt.title(task_label + ' Training and Validation Loss')
        plt.legend()
        plt.show()


In [63]:
model_easy_c1 = RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes, sequence_len=sequence_len).to(device)
model_easy_c2 = RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes, sequence_len=sequence_len).to(device)
model_easy_c3 = RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes, sequence_len=sequence_len).to(device)

loss_fn_easy_c1 = nn.CrossEntropyLoss()
loss_fn_easy_c2 = nn.CrossEntropyLoss()
loss_fn_easy_c3 = nn.CrossEntropyLoss()

optim_easy_c1 = optim.Adam(params=model_easy_c1.parameters(), lr=learning_rate, weight_decay=weight_decay)
optim_easy_c2 = optim.Adam(params=model_easy_c2.parameters(), lr=learning_rate, weight_decay=weight_decay)
optim_easy_c3 = optim.Adam(params=model_easy_c3.parameters(), lr=learning_rate, weight_decay=weight_decay)

easy_data_list = [
(model_easy_c1, loss_fn_easy_c1, optim_easy_c1, "Easy Task 1", train_dataloader_c1, val_dataloader_c1, test_dataloader_c1), \
(model_easy_c2, loss_fn_easy_c2, optim_easy_c2, "Easy Task 2", train_dataloader_c2, val_dataloader_c2, test_dataloader_c2), \
(model_easy_c3, loss_fn_easy_c3, optim_easy_c3, "Easy Task 3", train_dataloader_c3, val_dataloader_c3, test_dataloader_c3)
]


In [64]:
model_med_c12 = RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes, sequence_len=sequence_len).to(device)
model_med_c13 = RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes, sequence_len=sequence_len).to(device)
model_med_c23 = RNN(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, num_classes=num_classes, sequence_len=sequence_len).to(device)

loss_fn_med_c12 = nn.CrossEntropyLoss()
loss_fn_med_c13 = nn.CrossEntropyLoss()
loss_fn_med_c23 = nn.CrossEntropyLoss()

optim_med_c12 = optim.Adam(params=model_med_c12.parameters(), lr=learning_rate, weight_decay=weight_decay)
optim_med_c13 = optim.Adam(params=model_med_c13.parameters(), lr=learning_rate, weight_decay=weight_decay)
optim_med_c23 = optim.Adam(params=model_med_c23.parameters(), lr=learning_rate, weight_decay=weight_decay)

medium_data_list = [
    (model_med_c12, loss_fn_med_c12, optim_med_c12, "Medium Task 1-2", train_dataloader_c12, val_dataloader_c12, test_dataloader_c12),
    (model_med_c13, loss_fn_med_c13, optim_med_c13, "Medium Task 1-3", train_dataloader_c13, val_dataloader_c13, test_dataloader_c13),
    (model_med_c23, loss_fn_med_c23, optim_med_c23, "Medium Task 2-3", train_dataloader_c23, val_dataloader_c23, test_dataloader_c23)
]

In [66]:
print(f"X_tensor length: {len(X_tensor)}")
print(f"y_easy_c1_tensor length: {len(y_easy_c1_tensor)}")

print(f"X_train_c1 length: {len(X_train_c1)}")
print(f"X_val_c1 length: {len(X_val_c1)}")
print(f"y_train_c1 length: {len(y_train_c1)}")
print(f"y_val_c1 length: {len(y_val_c1)}")

print(f"train_dataset_c1 length: {len(train_dataset_c1)}")
print(f"val_dataset_c1 length: {len(val_dataset_c1)}")

print(f"train_dataloader_c1 length: {len(train_dataloader_c1)}")
print(f"val_dataloader_c1 length: {len(val_dataloader_c1)}")

X_tensor length: 3200
y_easy_c1_tensor length: 3200
X_train_c1 length: 2560
X_val_c1 length: 640
y_train_c1 length: 2560
y_val_c1 length: 640
train_dataset_c1 length: 2560
val_dataset_c1 length: 640
train_dataloader_c1 length: 128
val_dataloader_c1 length: 32


In [65]:
train_model(easy_data_list)

Epoch [1/20], Train Loss: 0.0852, Val Loss: 0.0051, Val Accuracy: 100.00%


IndexError: index 320 is out of bounds for dimension 0 with size 320