In [7]:
import h5py
import torch
from torch.utils.data import Dataset
from torch.utils.data import random_split

# Phase 0: Loading the dataset

In [5]:
class EEGTextH5Dataset(Dataset):
    """
    Map-style PyTorch Dataset that reads paired EEG / token-ID samples from an HDF5 file.

    The file is expected to hold two datasets with one row per sample:
    ``'eeg'`` (read as ``[idx, :, :]``) and ``'input_ids'`` (read as ``[idx]``).

    The read handle is opened lazily on the first ``__getitem__`` call so that
    each DataLoader worker process ends up with its own handle. No h5py object
    is kept on the instance between construction and first use, and the live
    handle is dropped when the dataset is pickled, so the dataset can be sent
    to worker processes.

    Args:
        h5_path (str): Path to the HDF5 file.
    """
    def __init__(self, h5_path):
        self.h5_path = h5_path
        self.h5_file = None  # Opened lazily in __getitem__, once per process

        # Open the file briefly just to record the EEG shape and sample count.
        # Nothing obtained from `f` is stored on `self`: such handles stop being
        # usable when the `with` block closes the file, and h5py objects cannot
        # be pickled.
        with h5py.File(self.h5_path, 'r') as f:
            self.eeg_data_shape = f['eeg'].shape
            self.n_samples = self.eeg_data_shape[0]

    def __len__(self):
        """Return the number of samples (first dimension of the ``'eeg'`` dataset)."""
        return self.n_samples

    def __getitem__(self, idx):
        """
        Read one sample.

        Args:
            idx (int): Row index into both HDF5 datasets.

        Returns:
            tuple: ``(eeg_tensor, text_tensor)`` where ``eeg_tensor`` is float32
            and ``text_tensor`` is int64.
        """
        # Open the file here if not already open. This is better for DataLoader with num_workers > 0.
        if self.h5_file is None:
            self.h5_file = h5py.File(self.h5_path, 'r')

        # Retrieve the EEG data and the token IDs stored at the same row
        eeg_sample = self.h5_file['eeg'][idx, :, :]
        text_sample = self.h5_file['input_ids'][idx]

        # Convert numpy arrays from h5py to PyTorch tensors
        eeg_tensor = torch.from_numpy(eeg_sample.astype('float32'))
        text_tensor = torch.from_numpy(text_sample.astype('int64'))

        return eeg_tensor, text_tensor

    def __getstate__(self):
        """Return picklable state: a copy of the attributes with the open handle removed."""
        state = self.__dict__.copy()
        state['h5_file'] = None  # the receiving process reopens the file on demand
        return state

    def close(self):
        """Close the lazily opened file handle, if any. Safe to call more than once."""
        if self.h5_file is not None:
            self.h5_file.close()
            self.h5_file = None


In [6]:

# --- How to use it ---
# 1. Define the path to your data file
# NOTE(review): this is an absolute, machine-specific path; anyone else running
# the notebook has to edit it before this cell works.
H5_FILE_PATH = "/home/poorna/data/eeg_dataset.h5" 

# 2. Instantiate the dataset
# This 'dataset' object is now ready for the next phases.
dataset = EEGTextH5Dataset(H5_FILE_PATH)

# Sanity check: report the sample count, then read sample 0 and show both tensor shapes.
print(f"Successfully loaded dataset with {len(dataset)} samples.")
eeg, text = dataset[0] # Test it
print("Sample 0 EEG shape:", eeg.shape)
print("Sample 0 Text shape:", text.shape)

Successfully loaded dataset with 28000 samples.
Sample 0 EEG shape: torch.Size([62, 400])
Sample 0 Text shape: torch.Size([64])


# Phase 1: Splitting the dataset

In [8]:
# Target share of the data for each subset
TRAIN_PCT = 0.8
VAL_PCT = 0.1
TEST_PCT = 0.1

# Total number of samples available
N = len(dataset) # This will be 28000

# Train and validation sizes are truncated to whole samples;
# the test subset receives whatever is left so the three sizes always add up to N.
n_train, n_val = int(N * TRAIN_PCT), int(N * VAL_PCT)
n_test = N - (n_train + n_val)

# Dedicated, seeded generator so the split is the same on every run
g = torch.Generator()
g.manual_seed(42)

# Randomly partition the dataset into three non-overlapping subsets
split_lengths = [n_train, n_val, n_test]
train_ds, val_ds, test_ds = random_split(dataset, split_lengths, generator=g)

print(f"Total samples: {N}")
print(f"Split sizes -> Train: {len(train_ds)}, Validation: {len(val_ds)}, Test: {len(test_ds)}")

Total samples: 28000
Split sizes -> Train: 22400, Validation: 2800, Test: 2800


# Phase 2: Custom collate_fn for Padding

In [9]:
import torch
from torch.nn.utils.rnn import pad_sequence

# --- You can run this code now ---

# IMPORTANT: Adjust these token IDs to match your specific tokenizer.
# We'll assume '0' is the padding token for now.
PAD_ID = 0
SOS_ID = 1  # Start Of Sentence
EOS_ID = 2  # End Of Sentence

def collate_batch(batch):
    """
    Collates a list of samples into a padded batch.

    Args:
        batch (list): A list of tuples, where each tuple is (eeg_tensor, text_ids_tensor).
                      All EEG tensors must share one shape; text tensors are 1-D and
                      may differ in length.

    Returns:
        tuple: A tuple containing:
            - eeg_batch (torch.Tensor): Float tensor of stacked EEG signals, shape [B, ...].
            - text_padded (torch.Tensor): Long tensor of text IDs padded with PAD_ID, shape [B, T_max].
            - text_lengths (torch.Tensor): Long tensor with the element count of each
              text tensor as received (before padding here), shape [B].
    """
    eeg_list, text_list = [], []
    for eeg, txt in batch:
        eeg_list.append(eeg)
        text_list.append(txt.long()) # Ensure text IDs are LongTensors

    # Stack EEG signals (they are all the same size)
    eeg_batch = torch.stack(eeg_list, dim=0)

    # Store the length of each text sequence as it arrived in the batch
    text_lengths = torch.tensor([t.numel() for t in text_list], dtype=torch.long)

    # Pad the text sequences to the length of the longest sequence in the batch
    # `batch_first=True` makes the output shape [Batch, SequenceLength]
    text_padded = pad_sequence(text_list, batch_first=True, padding_value=PAD_ID)

    return eeg_batch.float(), text_padded, text_lengths

print("`collate_batch` function defined successfully.")

`collate_batch` function defined successfully.


# Phase 3: DataLoader Instantiation and Verification

In [10]:
from torch.utils.data import DataLoader

# --- You can run this code now ---

# Hyperparameters for the DataLoader
BATCH_SIZE = 32
NUM_WORKERS = 2 # Adjust this based on your machine's capability (0, 2, or 4 are common)

# Options common to both loaders; `collate_fn=collate_batch` plugs in the custom padding function.
loader_kwargs = dict(
    batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS,
    pin_memory=True,
    collate_fn=collate_batch,
)

# Training loader: reshuffled every epoch so batches differ between epochs.
train_loader = DataLoader(train_ds, shuffle=True, **loader_kwargs)

# Validation loader: fixed order so evaluation is comparable across epochs.
val_loader = DataLoader(val_ds, shuffle=False, **loader_kwargs)

print("DataLoaders created successfully.")
print("\n--- Verification Step ---")
print("Fetching one batch from the train_loader to inspect its shape...")

# Pull a single batch and unpack its three components
first_batch = next(iter(train_loader))
eeg_batch, text_padded_batch, text_lengths_batch = first_batch

# Report the shape of each tensor in the batch
print("\nEEG batch shape:", eeg_batch.shape)
print("Padded Text batch shape:", text_padded_batch.shape)
print("Text Lengths batch shape:", text_lengths_batch.shape)

DataLoaders created successfully.

--- Verification Step ---
Fetching one batch from the train_loader to inspect its shape...

EEG batch shape: torch.Size([32, 62, 400])
Padded Text batch shape: torch.Size([32, 64])
Text Lengths batch shape: torch.Size([32])


# Phase 4: Model Architecture (Encoder–Decoder with Attention)

In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F

# --- You can run this code now ---

class EEGEncoder(nn.Module):
    """
    Convolutional front end followed by a bidirectional GRU.

    Takes an EEG batch laid out as [B, C, T] and produces one feature vector
    per time step plus the GRU's final hidden state.

    Args:
        in_channels: Number of input channels C.
        conv_dim: Channel count produced by the second convolution (GRU input size).
        enc_hidden: Hidden size of each GRU direction.
        num_layers: Number of stacked GRU layers.
        dropout: Dropout probability used after each conv stage and between GRU layers.
    """
    def __init__(self, in_channels=62, conv_dim=128, enc_hidden=256, num_layers=2, dropout=0.2):
        super().__init__()

        def conv_stage(c_in, c_out):
            # Conv -> BatchNorm -> ReLU -> Dropout. Kernel 5 with padding 2 and
            # stride 1 leaves the time dimension unchanged.
            return [
                nn.Conv1d(c_in, c_out, kernel_size=5, stride=1, padding=2),
                nn.BatchNorm1d(c_out),
                nn.ReLU(inplace=True),
                nn.Dropout(dropout),
            ]

        # Two stages: in_channels -> 128 -> conv_dim
        self.conv = nn.Sequential(
            *conv_stage(in_channels, 128),
            *conv_stage(128, conv_dim),
        )

        # Inter-layer dropout only applies when the GRU is stacked
        inter_layer_dropout = dropout if num_layers > 1 else 0
        self.rnn = nn.GRU(
            input_size=conv_dim,
            hidden_size=enc_hidden,
            num_layers=num_layers,
            bidirectional=True,
            dropout=inter_layer_dropout,
            batch_first=False,  # forward() feeds the GRU time-major input
        )

    def forward(self, eeg):
        """
        Args:
            eeg: Tensor of shape [B, C, T].

        Returns:
            tuple: ``(enc_outputs, enc_hidden)`` with shapes
            [T, B, 2 * enc_hidden] and [2 * num_layers, B, enc_hidden].
        """
        features = self.conv(eeg)              # [B, conv_dim, T]
        features = features.permute(2, 0, 1)   # [T, B, conv_dim]
        return self.rnn(features)


class LuongAttention(nn.Module):
    """
    Multiplicative attention: encoder states are linearly projected to the
    decoder width and scored against the decoder state with a dot product.
    """
    def __init__(self, enc_dim, dec_dim):
        super().__init__()
        # Maps each encoder output (enc_dim) into the decoder hidden space (dec_dim)
        self.attn = nn.Linear(enc_dim, dec_dim)

    def forward(self, decoder_hidden, encoder_outputs):
        """
        Args:
            decoder_hidden: Tensor of shape [1, B, dec_dim].
            encoder_outputs: Tensor of shape [T_enc, B, enc_dim].

        Returns:
            tuple: ``(context, attn_weights)`` with shapes [B, 1, enc_dim] and [B, T_enc].
        """
        enc_batch_first = encoder_outputs.permute(1, 0, 2)     # [B, T_enc, enc_dim]
        keys = self.attn(encoder_outputs).permute(1, 0, 2)      # [B, T_enc, dec_dim]
        query = decoder_hidden.permute(1, 2, 0)                 # [B, dec_dim, 1]

        # One alignment score per encoder step, normalised over the time axis
        scores = torch.bmm(keys, query)                         # [B, T_enc, 1]
        attn_weights = F.softmax(scores, dim=1)

        # Context = attention-weighted sum of the (unprojected) encoder outputs
        context = torch.bmm(attn_weights.transpose(1, 2), enc_batch_first)  # [B, 1, enc_dim]

        return context, attn_weights.squeeze(-1)


class Decoder(nn.Module):
    """
    Attention-based GRU decoder that emits vocabulary logits one token at a time.

    Args:
        vocab_size: Size of the output vocabulary.
        emb_dim: Token embedding width.
        enc_hidden: Hidden size of each encoder direction (encoder is bidirectional).
        dec_hidden: Decoder GRU hidden size.
        num_layers: Number of stacked decoder GRU layers.
        pad_id: Token ID used as the embedding's padding index.
        dropout: Dropout probability for embeddings and between GRU layers.
    """
    def __init__(self, vocab_size, emb_dim=256, enc_hidden=256, dec_hidden=256, num_layers=2, 
                 pad_id=0, dropout=0.2):
        super().__init__()
        self.vocab_size = vocab_size

        # Encoder features are forward + backward states concatenated
        enc_dim = 2 * enc_hidden

        self.embedding = nn.Embedding(vocab_size, emb_dim, padding_idx=pad_id)
        self.attention = LuongAttention(enc_dim, dec_hidden)

        # The GRU consumes the token embedding concatenated with the attention context
        self.rnn = nn.GRU(
            input_size=emb_dim + enc_dim,
            hidden_size=dec_hidden,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=False,
        )
        self.fc_out = nn.Linear(dec_hidden, vocab_size)
        self.dropout = nn.Dropout(dropout)

        # Projects the concatenated encoder final state (2 * enc_hidden) to dec_hidden
        self.bridge = nn.Linear(enc_dim, dec_hidden)

    def init_hidden(self, enc_hidden):
        """
        Build the decoder's initial hidden state from the encoder's final state.

        Args:
            enc_hidden: Tensor of shape [2 * num_layers, B, H].

        Returns:
            Tensor of shape [num_layers, B, dec_hidden].
        """
        # Even rows are taken as the forward direction, odd rows as the backward one
        fwd_states, bwd_states = enc_hidden[0::2], enc_hidden[1::2]

        # Join the two directions per layer along the feature axis -> [num_layers, B, 2 * H]
        merged = torch.cat((fwd_states, bwd_states), dim=-1)

        return self.bridge(merged).tanh()

    def forward(self, token, decoder_hidden, encoder_outputs):
        """
        Run a single decoding step.

        Args:
            token: Tensor of shape [B] with the current input token IDs.
            decoder_hidden: Tensor of shape [num_layers, B, dec_hidden].
            encoder_outputs: Tensor of shape [T_enc, B, enc_dim].

        Returns:
            tuple: ``(prediction, hidden, attn_weights)`` where prediction is
            [B, vocab_size] and hidden is the updated decoder state.
        """
        step_token = token.unsqueeze(0)                              # [1, B]
        embedded = self.dropout(self.embedding(step_token))          # [1, B, emb_dim]

        # Attend using the top layer of the current decoder state
        top_hidden = decoder_hidden[-1].unsqueeze(0)
        context, attn_weights = self.attention(top_hidden, encoder_outputs)
        context = context.transpose(0, 1)                            # [B, 1, enc_dim] -> [1, B, enc_dim]

        rnn_input = torch.cat([embedded, context], dim=2)            # [1, B, emb_dim + enc_dim]
        output, hidden = self.rnn(rnn_input, decoder_hidden)

        prediction = self.fc_out(output.squeeze(0))                  # [B, vocab_size]

        return prediction, hidden, attn_weights


class Seq2Seq(nn.Module):
    """
    Ties an EEGEncoder and a Decoder together for training-time decoding.

    Args:
        vocab_size: Size of the output vocabulary.
        enc_hidden: Hidden size of each encoder GRU direction.
        dec_hidden: Decoder GRU hidden size.
        pad_id: Padding token ID passed to the decoder's embedding.
        dropout: Dropout probability shared by encoder and decoder.
    """
    def __init__(self, vocab_size, enc_hidden=256, dec_hidden=256, pad_id=0, dropout=0.2):
        super().__init__()
        self.encoder = EEGEncoder(enc_hidden=enc_hidden, dropout=dropout)
        self.decoder = Decoder(
            vocab_size=vocab_size,
            enc_hidden=enc_hidden,
            dec_hidden=dec_hidden,
            pad_id=pad_id,
            dropout=dropout,
        )

    def forward(self, eeg, target_text, teacher_forcing_ratio=0.5):
        """
        Decode ``target_text.shape[1] - 1`` steps, starting from the first target token.

        Args:
            eeg: EEG batch, [B, C, T].
            target_text: Token IDs, [B, T_out]; column 0 is fed as the first decoder input.
            teacher_forcing_ratio: Probability, drawn once per step for the whole
                batch, of feeding the ground-truth token instead of the prediction.

        Returns:
            Logits of shape [B, T_out - 1, vocab_size], aligned with ``target_text[:, 1:]``.
        """
        batch_size, target_len = eeg.shape[0], target_text.shape[1]

        encoder_outputs, encoder_hidden = self.encoder(eeg)
        decoder_hidden = self.decoder.init_hidden(encoder_hidden)

        # Decoding starts from the first column of the target (the start token)
        decoder_input = target_text[:, 0]

        # Time-major buffer for the per-step logits; row 0 is never written
        outputs = torch.zeros(target_len, batch_size, self.decoder.vocab_size, device=eeg.device)

        for t in range(1, target_len):
            step_logits, decoder_hidden, _ = self.decoder(decoder_input, decoder_hidden, encoder_outputs)
            outputs[t] = step_logits

            # One random draw per step decides the next input for the entire batch
            use_teacher = torch.rand(1).item() < teacher_forcing_ratio
            if use_teacher:
                decoder_input = target_text[:, t]
            else:
                decoder_input = step_logits.argmax(1)

        # Drop the unused first row, then switch to batch-first: [T-1, B, V] -> [B, T-1, V]
        return outputs[1:].permute(1, 0, 2)

# Phase 4.1: Model Verification (Smoke Test)

In [18]:
import torch

# --- You can run this code now ---

# 1. CRITICAL: Define your vocabulary size. 
#    Replace this placeholder with the actual number from your tokenizer.
VOCAB_SIZE = 30522  # <--- REPLACE THIS VALUE

# 2. Set up device and instantiate the model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Seq2Seq(vocab_size=VOCAB_SIZE, pad_id=PAD_ID).to(device)

print(f"Model instantiated successfully and moved to '{device}'.")
print(f"Total parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

# 3. Fetch one batch from the train_loader (already created in Phase 3)
eeg_batch, text_padded_batch, _ = next(iter(train_loader))

# 4. Move batch to the same device as the model
eeg_batch = eeg_batch.to(device)
text_padded_batch = text_padded_batch.to(device)

# 5. Perform a forward pass
print("\nPerforming one forward pass (smoke test)...")
# Run the check in eval mode: in training mode a forward pass updates the
# BatchNorm running statistics (even under no_grad) and applies dropout, so the
# smoke test would alter the freshly initialised model before training starts.
model.eval()
try:
    with torch.no_grad(): # We don't need to compute gradients for this test
        # The model expects the target text for teacher forcing, even if the ratio is low
        output_logits = model(eeg_batch, text_padded_batch, teacher_forcing_ratio=0.5)
finally:
    # Put the model back in training mode, the state it was created in
    model.train()

print("Smoke test PASSED.")
print("Output logits shape:", output_logits.shape)

Model instantiated successfully and moved to 'cuda'.
Total parameters: 19,001,146

Performing one forward pass (smoke test)...
Smoke test PASSED.
Output logits shape: torch.Size([32, 63, 30522])


# Phase 5: The Training Loop

In [19]:
import torch
import torch.nn as nn
from torch.optim import AdamW
import math
import time

# --- You can run this code now (Corrected Version) ---

# We assume you have the following variables defined:
# model, train_loader, val_loader, PAD_ID, device

# Token-level cross-entropy; target positions equal to PAD_ID are left out of the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_ID)
# AdamW over every model parameter (learning rate 1e-4, weight decay 1e-2).
optimizer = AdamW(model.parameters(), lr=1e-4, weight_decay=1e-2)

def format_time(seconds):
    """Render a duration given in seconds as ``"<minutes>m <seconds>s"``, both truncated to integers."""
    whole_minutes, remainder = divmod(seconds, 60)
    return f"{int(whole_minutes)}m {int(remainder)}s"

def train_one_epoch(model, loader, optimizer, criterion, clip_norm=1.0, teacher_forcing=0.5):
    """
    Train the model for one pass over ``loader``.

    Args:
        model: Module called as ``model(eeg, text, teacher_forcing_ratio=...)`` and
            returning logits of shape [B, T - 1, V].
        loader: Iterable yielding ``(eeg_batch, text_batch, lengths)`` tuples. It does
            not need to support ``len()``.
        optimizer: Optimizer over the model's parameters.
        criterion: Loss taking ``(logits [N, V], targets [N])``.
        clip_norm: Maximum gradient norm applied before each optimizer step.
        teacher_forcing: Teacher-forcing ratio forwarded to the model.

    Returns:
        float: Mean of the per-batch losses.

    Raises:
        ZeroDivisionError: If ``loader`` yields no batches.
    """
    model.train()

    # Use the device the model actually lives on instead of a notebook-level
    # `device` variable, which other cells rebind.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    n_batches = 0

    for eeg_b, txt_b, _ in loader:
        eeg_b = eeg_b.to(run_device)
        txt_b = txt_b.to(run_device)

        optimizer.zero_grad()
        logits = model(eeg_b, txt_b, teacher_forcing_ratio=teacher_forcing)

        output_dim = logits.shape[-1]
        # reshape (not view): the logits may be non-contiguous
        logits_flat = logits.reshape(-1, output_dim)

        # Targets are the text shifted by one: position t predicts token t + 1
        target_flat = txt_b[:, 1:].reshape(-1)
        loss = criterion(logits_flat, target_flat)

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
        optimizer.step()

        total_loss += loss.item()
        n_batches += 1

    return total_loss / n_batches

@torch.no_grad()
def evaluate(model, loader, criterion):
    """
    Compute the mean per-batch loss over ``loader`` without teacher forcing.

    The model is switched to eval mode and left in that mode on return.

    Args:
        model: Module called as ``model(eeg, text, teacher_forcing_ratio=0.0)`` and
            returning logits of shape [B, T - 1, V].
        loader: Iterable yielding ``(eeg_batch, text_batch, lengths)`` tuples. It does
            not need to support ``len()``.
        criterion: Loss taking ``(logits [N, V], targets [N])``.

    Returns:
        float: Mean of the per-batch losses.

    Raises:
        ZeroDivisionError: If ``loader`` yields no batches.
    """
    model.eval()

    # Use the device the model actually lives on instead of a notebook-level
    # `device` variable, which other cells rebind.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    total_loss = 0.0
    n_batches = 0

    for eeg_b, txt_b, _ in loader:
        eeg_b = eeg_b.to(run_device)
        txt_b = txt_b.to(run_device)

        # Ratio 0.0: the decoder always consumes its own previous prediction
        logits = model(eeg_b, txt_b, teacher_forcing_ratio=0.0)

        output_dim = logits.shape[-1]
        # reshape (not view): the logits may be non-contiguous
        logits_flat = logits.reshape(-1, output_dim)

        # Targets are the text shifted by one: position t predicts token t + 1
        target_flat = txt_b[:, 1:].reshape(-1)
        loss = criterion(logits_flat, target_flat)

        total_loss += loss.item()
        n_batches += 1

    return total_loss / n_batches

# Number of full passes over the training set
EPOCHS = 10
# Lowest validation loss seen so far; starts at infinity so the first epoch always checkpoints
best_val_loss = float('inf')

print("--- Starting Training (Corrected) ---")

for epoch in range(1, EPOCHS + 1):
    start_time = time.time()
    
    # One training pass (default teacher forcing and gradient clipping), then validation
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion)
    val_loss = evaluate(model, val_loader, criterion)
    
    # Wall-clock time covers both training and validation
    end_time = time.time()
    epoch_time = end_time - start_time
    
    # Perplexity is the exponential of the mean validation loss
    val_perplexity = math.exp(val_loss)
    
    print(f"Epoch {epoch:02d} | Time: {format_time(epoch_time)}")
    print(f"\tTrain Loss: {train_loss:.4f}")
    print(f"\t Val. Loss: {val_loss:.4f} | Val. Perplexity: {val_perplexity:7.4f}")
    
    # Checkpoint only on a strict improvement; the file is overwritten each time
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        torch.save(model.state_dict(), 'eeg2text-best-model.pt')
        print("\t-> New best model saved!")

print("\n--- Training Complete ---")

--- Starting Training (Corrected) ---
Epoch 01 | Time: 5m 41s
	Train Loss: 5.1224
	 Val. Loss: 4.5570 | Val. Perplexity: 95.2977
	-> New best model saved!
Epoch 02 | Time: 5m 41s
	Train Loss: 3.9754
	 Val. Loss: 4.6639 | Val. Perplexity: 106.0535
Epoch 03 | Time: 5m 42s
	Train Loss: 3.7021
	 Val. Loss: 4.5495 | Val. Perplexity: 94.5807
	-> New best model saved!
Epoch 04 | Time: 5m 48s
	Train Loss: 3.5305
	 Val. Loss: 4.5962 | Val. Perplexity: 99.1062
Epoch 05 | Time: 5m 52s
	Train Loss: 3.3806
	 Val. Loss: 4.5117 | Val. Perplexity: 91.0740
	-> New best model saved!
Epoch 06 | Time: 5m 46s
	Train Loss: 3.2657
	 Val. Loss: 4.4999 | Val. Perplexity: 90.0047
	-> New best model saved!
Epoch 07 | Time: 5m 45s
	Train Loss: 3.1515
	 Val. Loss: 4.5811 | Val. Perplexity: 97.6246
Epoch 08 | Time: 5m 44s
	Train Loss: 3.0507
	 Val. Loss: 4.5562 | Val. Perplexity: 95.2239
Epoch 09 | Time: 5m 45s
	Train Loss: 2.9680
	 Val. Loss: 4.6167 | Val. Perplexity: 101.1599
Epoch 10 | Time: 5m 44s
	Train Loss: 

In [22]:
import torch
from transformers import AutoTokenizer # To convert token IDs back to text

# --- You can run this code now (Corrected for local model) ---

# 1. Setup: Load Tokenizer and your best model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Set the path to the directory containing your local BERT model/tokenizer files.
# For example: "C:/Users/YourUser/models/bert-base-uncased" or "/home/user/models/bert-base-uncased"
# NOTE(review): absolute, machine-specific path.
LOCAL_MODEL_PATH = "/home/poorna/models/bert-base-uncased" # <--- REPLACE THIS PATH

print(f"Loading tokenizer from local path: {LOCAL_MODEL_PATH}")
# Load the tokenizer from your local folder instead of the Hugging Face Hub
tokenizer = AutoTokenizer.from_pretrained(LOCAL_MODEL_PATH)

# These assignments rebind the notebook-level VOCAB_SIZE / PAD_ID / SOS_ID / EOS_ID
# to the tokenizer's values; functions that read those globals see the new values.
VOCAB_SIZE = tokenizer.vocab_size
PAD_ID = tokenizer.pad_token_id
SOS_ID = tokenizer.cls_token_id # The tokenizer's [CLS] token serves as Start-Of-Sentence here
EOS_ID = tokenizer.sep_token_id # The tokenizer's [SEP] token serves as End-Of-Sentence here

# Instantiate a fresh model with the same architecture.
# This rebinds `model`, so any model object from earlier cells is no longer referenced by that name.
model = Seq2Seq(vocab_size=VOCAB_SIZE, pad_id=PAD_ID).to(device)

# Load the weights saved by the training loop at its lowest validation loss.
# NOTE(review): torch.load unpickles the file; only load checkpoints you produced
# yourself (consider weights_only=True if the installed torch version supports it).
model.load_state_dict(torch.load('eeg2text-best-model.pt', map_location=device))
print("Best model loaded successfully.")


# 2. Define the Inference Function (Greedy Decoding)
@torch.no_grad()
def generate_text(model, eeg_signal, max_len=100, sos_id=None, eos_id=None, device=None):
    """
    Greedy-decode a sequence of token IDs from a single EEG signal.

    Args:
        model: Object exposing ``encoder`` and ``decoder`` (with ``init_hidden``).
        eeg_signal: One unbatched EEG tensor; a batch dimension is added here.
        max_len (int): Maximum number of decoding steps.
        sos_id (int, optional): Start token fed at the first step. Defaults to the
            notebook-level ``SOS_ID``.
        eos_id (int, optional): Token that stops decoding. Defaults to the
            notebook-level ``EOS_ID``.
        device (optional): Device to run on. Defaults to the device of the model's
            first parameter (CPU if the model has no parameters).

    Returns:
        list[int]: Generated token IDs, without the start token and without the
        end token that terminated decoding.
    """
    if sos_id is None:
        sos_id = SOS_ID
    if eos_id is None:
        eos_id = EOS_ID
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval() # Set model to evaluation mode

    # Add a batch dimension and move to the correct device
    eeg_signal = eeg_signal.unsqueeze(0).to(device)

    # Encode once; every decoding step attends over the same encoder outputs
    encoder_outputs, encoder_hidden = model.encoder(eeg_signal)
    decoder_hidden = model.decoder.init_hidden(encoder_hidden)

    # Start with the <SOS> token
    input_token = torch.tensor([sos_id], device=device)

    generated_ids = []

    for _ in range(max_len):
        # Decode one step
        prediction, decoder_hidden, _attn = model.decoder(input_token, decoder_hidden, encoder_outputs)

        # Get the most likely token ID (greedy search)
        next_token_id = prediction.argmax(1)
        next_id = next_token_id.item()

        # If the model predicts the <EOS> token, stop generating
        if next_id == eos_id:
            break

        generated_ids.append(next_id)

        # The predicted token becomes the next input
        input_token = next_token_id

    return generated_ids

# 3. Run Inference on a sample from the test set
# Take the sample at index 10 of test_ds (its EEG tensor and reference token IDs)
eeg_sample, true_text_ids = test_ds[10]

print("\n--- Running Inference ---")
# Generate the predicted token IDs with greedy decoding
predicted_ids = generate_text(model, eeg_sample)

# Decode the IDs back to human-readable text, dropping the tokenizer's special tokens
true_text = tokenizer.decode(true_text_ids, skip_special_tokens=True)
predicted_text = tokenizer.decode(predicted_ids, skip_special_tokens=True)

print(f"\nGROUND TRUTH: {true_text}")
print(f"MODEL PREDICTION: {predicted_text}")

Loading tokenizer from local path: /home/poorna/models/bert-base-uncased
Best model loaded successfully.

--- Running Inference ---

GROUND TRUTH: two young elephants playfully wrestle in a grassy field.. tone : playful
MODEL PREDICTION: a person of a a a a a a a... tone : calm


In [23]:
import torch
from transformers import AutoTokenizer # To convert token IDs back to text

# --- You can run this code now (Corrected for local model) ---

# 1. Setup: Load Tokenizer and your best model
# NOTE(review): this cell repeats the setup of the previous inference cell verbatim.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Set the path to the directory containing your local BERT model/tokenizer files.
# For example: "C:/Users/YourUser/models/bert-base-uncased" or "/home/user/models/bert-base-uncased"
# NOTE(review): absolute, machine-specific path.
LOCAL_MODEL_PATH = "/home/poorna/models/bert-base-uncased" # <--- REPLACE THIS PATH

print(f"Loading tokenizer from local path: {LOCAL_MODEL_PATH}")
# Load the tokenizer from your local folder instead of the Hugging Face Hub
tokenizer = AutoTokenizer.from_pretrained(LOCAL_MODEL_PATH)

# Rebind the notebook-level vocabulary size and special-token IDs to the tokenizer's values
VOCAB_SIZE = tokenizer.vocab_size
PAD_ID = tokenizer.pad_token_id
SOS_ID = tokenizer.cls_token_id # The tokenizer's [CLS] token serves as Start-Of-Sentence here
EOS_ID = tokenizer.sep_token_id # The tokenizer's [SEP] token serves as End-Of-Sentence here

# Instantiate a fresh model with the same architecture (rebinds `model`)
model = Seq2Seq(vocab_size=VOCAB_SIZE, pad_id=PAD_ID).to(device)

# Load the weights saved by the training loop at its lowest validation loss.
# NOTE(review): torch.load unpickles the file; only load checkpoints you produced yourself.
model.load_state_dict(torch.load('eeg2text-best-model.pt', map_location=device))
print("Best model loaded successfully.")


# (Keep all the model and tokenizer loading code from before)
# Just replace the generate_text function with this one

@torch.no_grad()
def beam_search_decode(model, eeg_signal, beam_width=5, max_len=100,
                       sos_id=None, eos_id=None, device=None):
    """
    Decode token IDs from a single EEG signal with beam search.

    Each beam is a tuple ``(sequence, score, hidden_state)`` where ``score`` is the
    sum of token log-probabilities. Scores are not length-normalised.

    Args:
        model: Object exposing ``encoder`` and ``decoder`` (with ``init_hidden``).
        eeg_signal: One unbatched EEG tensor; a batch dimension is added here.
        beam_width (int): Number of beams kept after every step (>= 1).
        max_len (int): Maximum number of decoding steps.
        sos_id (int, optional): Start token. Defaults to the notebook-level ``SOS_ID``.
        eos_id (int, optional): End token. Defaults to the notebook-level ``EOS_ID``.
        device (optional): Device to run on. Defaults to the device of the model's
            first parameter (CPU if the model has no parameters).

    Returns:
        list[int]: Token IDs of the best beam with the start token stripped. The
        end token is included when the beam finished.

    Raises:
        ValueError: If ``beam_width`` is smaller than 1.
    """
    if beam_width < 1:
        raise ValueError(f"beam_width must be >= 1, got {beam_width}")
    if sos_id is None:
        sos_id = SOS_ID
    if eos_id is None:
        eos_id = EOS_ID
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    eeg_signal = eeg_signal.unsqueeze(0).to(device)

    encoder_outputs, encoder_hidden = model.encoder(eeg_signal)
    decoder_hidden = model.decoder.init_hidden(encoder_hidden)

    # Start with the <SOS> token. A beam is a tuple of (sequence, score, hidden_state)
    beams = [([sos_id], 0.0, decoder_hidden)]

    for _ in range(max_len):
        candidates = []
        for seq, score, hidden in beams:
            # If the last token is <EOS>, this beam is finished; carry it over unchanged
            if seq[-1] == eos_id:
                candidates.append((seq, score, hidden))
                continue

            # Get the prediction for the next token
            input_token = torch.tensor([seq[-1]], device=device)
            prediction, new_hidden, _attn = model.decoder(input_token, hidden, encoder_outputs)

            # Log-probabilities over the vocabulary for this single-item batch
            log_probs = prediction.log_softmax(dim=-1).squeeze(0)

            # topk cannot return more entries than the vocabulary holds
            k = min(beam_width, log_probs.numel())
            top_log_probs, top_ids = torch.topk(log_probs, k)

            for token_log_prob, token_id in zip(top_log_probs.tolist(), top_ids.tolist()):
                candidates.append((seq + [token_id], score + token_log_prob, new_hidden))

        # Sort all candidates by their score (higher is better) and keep the top `beam_width`
        beams = sorted(candidates, key=lambda beam: beam[1], reverse=True)[:beam_width]

        # If the top beam has ended, we can stop
        if beams[0][0][-1] == eos_id:
            break

    # Return the token IDs from the best beam, without the initial <SOS> token
    return beams[0][0][1:]

# --- How to run it ---
# NOTE(review): `eeg_sample` and `true_text` are not defined in this cell; they must
# already exist in the kernel from the greedy-decoding inference cell.
print("\n--- Running Inference with Beam Search ---")
predicted_ids_beam = beam_search_decode(model, eeg_sample, beam_width=5)
# Convert the predicted IDs to text, dropping the tokenizer's special tokens
predicted_text_beam = tokenizer.decode(predicted_ids_beam, skip_special_tokens=True)

print(f"\nGROUND TRUTH: {true_text}")
print(f"BEAM SEARCH PREDICTION: {predicted_text_beam}")

Loading tokenizer from local path: /home/poorna/models/bert-base-uncased
Best model loaded successfully.

--- Running Inference with Beam Search ---

GROUND TRUTH: two young elephants playfully wrestle in a grassy field.. tone : playful
BEAM SEARCH PREDICTION: close - up of a vibrant coral reef.. tone : calm


In [24]:
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer
import random # Import the random library

# --- You can run this code now ---

# 1. Setup: Load Tokenizer and your best model
# NOTE(review): this cell repeats the setup of the two previous inference cells.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# NOTE(review): absolute, machine-specific path.
LOCAL_MODEL_PATH = "/home/poorna/models/bert-base-uncased"

print(f"Loading tokenizer from local path: {LOCAL_MODEL_PATH}")
tokenizer = AutoTokenizer.from_pretrained(LOCAL_MODEL_PATH)

# Rebind the notebook-level vocabulary size and special-token IDs to the tokenizer's values
VOCAB_SIZE = tokenizer.vocab_size
PAD_ID = tokenizer.pad_token_id
SOS_ID = tokenizer.cls_token_id
EOS_ID = tokenizer.sep_token_id

# Instantiate a fresh model with the same architecture
# Make sure your Seq2Seq class is defined in a previous cell
model = Seq2Seq(vocab_size=VOCAB_SIZE, pad_id=PAD_ID).to(device)

# Load the weights saved by the training loop at its lowest validation loss.
# NOTE(review): torch.load unpickles the file; only load checkpoints you produced yourself.
model.load_state_dict(torch.load('eeg2text-best-model.pt', map_location=device))
print("Best model loaded successfully.")


# 2. Define the Inference Function (Beam Search)
@torch.no_grad()
def beam_search_decode(model, eeg_signal, beam_width=5, max_len=100,
                       sos_id=None, eos_id=None, device=None):
    """
    Decode token IDs from a single EEG signal with beam search.

    Each beam is a tuple ``(sequence, score, hidden_state)`` where ``score`` is the
    sum of token log-probabilities. Scores are not length-normalised.

    Args:
        model: Object exposing ``encoder`` and ``decoder`` (with ``init_hidden``).
        eeg_signal: One unbatched EEG tensor; a batch dimension is added here.
        beam_width (int): Number of beams kept after every step (>= 1).
        max_len (int): Maximum number of decoding steps.
        sos_id (int, optional): Start token. Defaults to the notebook-level ``SOS_ID``.
        eos_id (int, optional): End token. Defaults to the notebook-level ``EOS_ID``.
        device (optional): Device to run on. Defaults to the device of the model's
            first parameter (CPU if the model has no parameters).

    Returns:
        list[int]: Token IDs of the best beam with the start token stripped. The
        end token is included when the beam finished.

    Raises:
        ValueError: If ``beam_width`` is smaller than 1.
    """
    if beam_width < 1:
        raise ValueError(f"beam_width must be >= 1, got {beam_width}")
    if sos_id is None:
        sos_id = SOS_ID
    if eos_id is None:
        eos_id = EOS_ID
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    eeg_signal = eeg_signal.unsqueeze(0).to(device)

    encoder_outputs, encoder_hidden = model.encoder(eeg_signal)
    decoder_hidden = model.decoder.init_hidden(encoder_hidden)

    # Single starting beam holding only the start token
    beams = [([sos_id], 0.0, decoder_hidden)]

    for _ in range(max_len):
        candidates = []
        for seq, score, hidden in beams:
            # Finished beams are carried over unchanged and keep competing on score
            if seq[-1] == eos_id:
                candidates.append((seq, score, hidden))
                continue

            input_token = torch.tensor([seq[-1]], device=device)
            prediction, new_hidden, _attn = model.decoder(input_token, hidden, encoder_outputs)

            # Log-probabilities over the vocabulary for this single-item batch
            log_probs = prediction.log_softmax(dim=-1).squeeze(0)

            # topk cannot return more entries than the vocabulary holds
            k = min(beam_width, log_probs.numel())
            top_log_probs, top_ids = torch.topk(log_probs, k)

            for token_log_prob, token_id in zip(top_log_probs.tolist(), top_ids.tolist()):
                candidates.append((seq + [token_id], score + token_log_prob, new_hidden))

        # Keep the `beam_width` highest-scoring candidates
        beams = sorted(candidates, key=lambda beam: beam[1], reverse=True)[:beam_width]

        # Stop as soon as the best beam has produced the end token
        if beams[0][0][-1] == eos_id:
            break

    # Best beam without its leading start token
    return beams[0][0][1:]

# --- Run Inference on 10 Random Samples ---

# 3. Select up to NUM_SAMPLES distinct random indices from the test set
NUM_SAMPLES = 10
SAMPLE_SEED = 42  # fixed so the same examples are shown on every run
test_set_size = len(test_ds)
# random.sample raises ValueError when asked for more items than exist, so clamp
NUM_SAMPLES = min(NUM_SAMPLES, test_set_size)
# A private Random instance makes the selection reproducible without reseeding the global RNG
random_indices = random.Random(SAMPLE_SEED).sample(range(test_set_size), NUM_SAMPLES)

print(f"\n--- Running Inference on {NUM_SAMPLES} Random Samples ---")

# 4. Loop through the random indices and generate predictions
for i, idx in enumerate(random_indices):
    eeg_sample, true_text_ids = test_ds[idx]

    # Generate the predicted token IDs using beam search
    predicted_ids_beam = beam_search_decode(model, eeg_sample, beam_width=5)

    # Decode the IDs back to human-readable text
    true_text = tokenizer.decode(true_text_ids, skip_special_tokens=True)
    predicted_text_beam = tokenizer.decode(predicted_ids_beam, skip_special_tokens=True)
    
    # Print the results for this sample
    print(f"\n--- Sample {i+1}/{NUM_SAMPLES} (Index: {idx}) ---")
    print(f"GROUND TRUTH: {true_text}")
    print(f"MODEL PREDICTION: {predicted_text_beam}")

Loading tokenizer from local path: /home/poorna/models/bert-base-uncased
Best model loaded successfully.

--- Running Inference on 10 Random Samples ---

--- Sample 1/10 (Index: 2708) ---
GROUND TRUTH: a woman in a red jacket dances energetically in a room.. tone : energetic
MODEL PREDICTION: close - up of a vibrant coral reef.. tone : calm

--- Sample 2/10 (Index: 1736) ---
GROUND TRUTH: a rainbow crepe cake is sliced, revealing colorful layers.. tone : sweet
MODEL PREDICTION: close - up of a vibrant coral reef.. tone : calm

--- Sample 3/10 (Index: 1264) ---
GROUND TRUTH: a delicious mushroom and spinach pizza is presented on a metal surface.. tone : appetizing
MODEL PREDICTION: close - up of a vibrant coral reef.. tone : calm

--- Sample 4/10 (Index: 151) ---
GROUND TRUTH: a person plays a white kawai digital piano calmly.. tone : calm
MODEL PREDICTION: close - up of a vibrant coral reef.. tone : calm

--- Sample 5/10 (Index: 1217) ---
GROUND TRUTH: a man practices boxing in a gym, 