In [1]:
import torch
import numpy as np

In [2]:
cd src/

/home/kunal/UKB/Neuro_GPT/NeuroGPT_UKB/src


In [3]:
from embedder import make 
from decoder import make_decoder

In [4]:
# Build the embedder and the decoder with the project's factory helpers, using
# their default arguments. Both objects are handed to Model(...) further down.
e = make.make_embedder()
d = make_decoder.make_decoder()

  from .autonotebook import tqdm as notebook_tqdm


In [5]:
from model import Model
# Assemble the full model from a checkpoint path plus the embedder `e` and decoder `d`.
# NOTE(review): the checkpoint path is relative, so this presumably only resolves
# because the earlier `cd src/` changed the working directory -- confirm.
test = Model('Neuro-GPT/pretrained_model/pytorch_model.bin',e,d)

In [6]:
def _print_public_attr_types(obj, prefix=""):
    """Print ``<prefix><name>: <type>`` for every public attribute of ``obj``.

    Names starting with an underscore are skipped. ``dir()`` can list names
    (for example properties) whose getter raises; such attributes are reported
    as unreadable instead of aborting the whole listing.
    """
    for attr in dir(obj):
        if attr.startswith('_'):
            continue
        try:
            value = getattr(obj, attr)
        except Exception as exc:
            print(f"{prefix}{attr}: <unreadable: {type(exc).__name__}>")
            continue
        print(f"{prefix}{attr}: {type(value)}")


def inspect_model_deeply(model):
    """
    Print the public attributes (and their types) of a model and its embedder.

    Three levels are listed when present: the model itself, ``model.embedder``
    and ``model.embedder.embed_model``. The bound ``forward`` method is printed
    last if the model has one.

    Args:
        model: Any object; only attributes reachable through ``dir()`` and
            ``getattr`` are inspected.

    Returns:
        None. Everything is written to stdout.
    """
    print("\nDetailed Model Inspection:")
    print("=========================")

    # Main model attributes
    print("\nTop-level attributes:")
    _print_public_attr_types(model)

    # Inspect embedder if it exists
    if hasattr(model, 'embedder'):
        print("\nEmbedder attributes:")
        embedder = model.embedder
        _print_public_attr_types(embedder, prefix="embedder.")

        # Inspect embed_model if it exists
        if hasattr(embedder, 'embed_model'):
            print("\nEmbed_model attributes:")
            _print_public_attr_types(embedder.embed_model, prefix="embedder.embed_model.")

    # Print the actual forward method if it exists
    if hasattr(model, 'forward'):
        print("\nForward method:")
        print(model.forward)

# Run the inspection on the model built above; the listing is printed to the cell output.
inspect_model_deeply(test)


Detailed Model Inspection:

Top-level attributes:
T_destination: <class 'typing.TypeVar'>
add_module: <class 'method'>
apply: <class 'method'>
bfloat16: <class 'method'>
buffers: <class 'method'>
call_super_init: <class 'bool'>
children: <class 'method'>
compile: <class 'method'>
compute_loss: <class 'method'>
cpu: <class 'method'>
cuda: <class 'method'>
decoder: <class 'decoder.gpt.GPTModel'>
double: <class 'method'>
dump_patches: <class 'bool'>
embedder: <class 'embedder.csm.CSMEmbedder'>
encoder: <class 'str'>
eval: <class 'method'>
extra_repr: <class 'method'>
float: <class 'method'>
forward: <class 'method'>
from_pretrained: <class 'method'>
ft_only_encoder: <class 'bool'>
get_buffer: <class 'method'>
get_extra_state: <class 'method'>
get_parameter: <class 'method'>
get_submodule: <class 'method'>
half: <class 'method'>
ipu: <class 'method'>
is_decoding_mode: <class 'bool'>
load_state_dict: <class 'method'>
modules: <class 'method'>
name: <class 'str'>
named_buffers: <class 'meth

In [None]:
import torch

def create_dummy_input(batch_size=1, num_chunks=4, input_size=1024):
    """
    Build a random batch dictionary for exercising the model.

    Args:
        batch_size: Number of samples in the batch.
        num_chunks: Number of chunks per sample.
        input_size: Size of the last dimension of each chunk. The original
            author notes that 1024 matches the embedder's first Linear layer
            (not verifiable from this notebook).

    Returns:
        dict with three tensors:
            'inputs'         -- standard-normal noise, shape
                                (batch_size, num_chunks, input_size)
            'attention_mask' -- all ones (nothing masked as padding), int64,
                                shape (batch_size, num_chunks)
            'labels'         -- zeros with the same shape and dtype as 'inputs'
    """
    full_shape = (batch_size, num_chunks, input_size)
    noise = torch.randn(*full_shape)

    return {
        'inputs': noise,
        # 1 marks a valid position, 0 would mark padding
        'attention_mask': torch.ones(full_shape[:2], dtype=torch.long),
        # placeholder targets only; they carry no information
        'labels': torch.zeros_like(noise),
    }

def get_model_outputs(model, batch):
    """
    Run one gradient-free forward pass of ``model`` on ``batch``.

    Side effects on ``model`` (they persist after the call):
        * ``model.eval()`` is invoked.
        * ``model.encoder`` is overwritten with ``None``; the original author
          notes this avoids "the string issue".
        * ``model.embedder.training_style`` is set to ``'decoding'`` and
          ``model.is_decoding_mode`` to ``True``, so the forward pass does not
          use the training-time masking path (per the original comment).

    Args:
        model: Callable model exposing ``eval()``, ``embedder``, ``encoder``
            and ``is_decoding_mode``.
        batch: The batch object passed straight to ``model(...)``.

    Returns:
        Whatever ``model(batch)`` returns.
    """
    model.eval()

    # Force the decoding configuration before the forward pass
    model.encoder = None
    model.embedder.training_style = 'decoding'
    model.is_decoding_mode = True

    with torch.no_grad():
        result = model(batch)
    return result

def test_model(model, device='cpu', batch_size=500, num_chunks=8, input_size=1024):
    """
    Smoke-test the model with a random dummy batch and print the tensor shapes.

    Args:
        model: Model to exercise; it is moved to ``device`` with ``model.to``.
        device: Target device for the model and every tensor in the batch.
        batch_size: Number of samples in the dummy batch (default 500, the
            value that used to be hard-coded).
        num_chunks: Number of chunks per sample (default 8, as before).
        input_size: Last dimension of each chunk (default 1024, as before).

    Returns:
        The mapping returned by ``get_model_outputs`` for the dummy batch.
    """
    # Move model to specified device
    model = model.to(device)

    # Create dummy input and put every tensor on the same device as the model
    batch = create_dummy_input(
        batch_size=batch_size, num_chunks=num_chunks, input_size=input_size
    )
    batch = {
        key: value.to(device) if isinstance(value, torch.Tensor) else value
        for key, value in batch.items()
    }

    # Forward pass (no gradients, decoding mode -- see get_model_outputs)
    outputs = get_model_outputs(model, batch)

    print(f"Input shape: {batch['inputs'].shape}")
    print(f"Attention mask shape: {batch['attention_mask'].shape}")
    print(f"Labels shape: {batch['labels'].shape}")
    print(f"Output keys: {outputs.keys()}")

    for key, value in outputs.items():
        if isinstance(value, torch.Tensor):
            print(f"{key} shape: {value.shape}")

    return outputs

def apply_model(model, input, device='cpu'):
    """
    Run the model on a caller-supplied batch and return its outputs.

    The previous version moved the model and tensors to ``device`` and then
    returned ``None`` without ever calling the model. It now finishes the job by
    delegating the forward pass to ``get_model_outputs``.
    NOTE(review): the intended behaviour is inferred from the function name and
    from ``test_model`` -- confirm.

    Args:
        model: Model to apply; it is moved to ``device`` with ``model.to``.
        input: Batch dictionary. Tensor values are moved to ``device`` in place,
            exactly as before, so the caller's dictionary is updated. (The name
            shadows the ``input`` builtin but is kept for compatibility with
            keyword callers.)
        device: Target device for the model and the tensors.

    Returns:
        The value returned by ``get_model_outputs(model, input)``.
    """
    # Move model to specified device
    model = model.to(device)
    # Move all tensors to the same device
    for key in input:
        if isinstance(input[key], torch.Tensor):
            input[key] = input[key].to(device)

    return get_model_outputs(model, input)

    

# Run the smoke test on the default device ('cpu'); the shapes printed below come from
# this call, and the returned mapping is kept in `outputs`.
outputs = test_model(test)

Input shape: torch.Size([500, 8, 1024])
Attention mask shape: torch.Size([500, 8])
Labels shape: torch.Size([500, 8, 1024])
Output keys: dict_keys(['outputs'])
outputs shape: torch.Size([500, 9, 768])


In [8]:
from torch.utils.data import Dataset
from typing import Dict
from scipy.io import loadmat
import numpy as np

class EEGDatasetFromMAT(Dataset):
    """Dataset that serves fixed-length chunks cut from an array in a ``.mat`` file.

    The file must contain the variables ``data`` and ``data_labels``. ``data``
    may be a single 2-D recording or a stack of recordings indexed along
    axis 0. ``split_chunks`` windows a recording along its axis 1 (assumed to
    be time, with axis 0 being channels -- TODO confirm against the files in
    use).

    Args:
        mat_file_path: Path of the ``.mat`` file to load.
        chunk_len: Length of each chunk along axis 1.
        num_chunks: Number of chunks requested per recording.
        ovlp: Overlap between consecutive chunks.
        normalization: If true, ``__getitem__`` normalizes the recording
            before chunking.
    """

    def __init__(self, mat_file_path, chunk_len=500, num_chunks=8, ovlp=50, normalization=True):
        # Load the .mat file
        mat_data = loadmat(mat_file_path)
        self.data = mat_data['data']  # Assuming data is stored in a variable named 'data'
        self.labels = mat_data['data_labels'][0]  # Assuming labels are stored in 'data_labels'

        self.chunk_len = chunk_len
        self.num_chunks = num_chunks
        self.ovlp = ovlp
        self.do_normalization = normalization

    def __len__(self):
        """Return the number of recordings that ``__getitem__`` can serve."""
        # Must agree with __getitem__: a 2-D array is one recording, anything
        # else is indexed along axis 0. (The old code returned shape[1], which
        # is the axis split_chunks windows, not the number of items.)
        if self.data.ndim == 2:
            return 1
        return self.data.shape[0]

    def __getitem__(self, idx):
        """Return the chunks of recording ``idx`` (normalized first if enabled).

        Raises:
            IndexError: If ``idx`` does not address an existing recording.
        """
        if self.data.ndim == 2:
            # The whole array is the only recording. Indexing it with idx would
            # hand a 1-D row to split_chunks, which needs a 2-D input.
            if idx not in (0, -1):
                raise IndexError(
                    f"index {idx} is out of range for a dataset with 1 recording"
                )
            data_sample = self.data
        else:
            data_sample = self.data[idx]

        if self.do_normalization:
            data_sample = self.normalize(data_sample)
        return self.split_chunks(data_sample)

    def split_chunks(self, data, length=None, ovlp=None, num_chunks=None):
        """Cut ``data`` into windows of ``length`` along axis 1.

        Any argument left as ``None`` falls back to the value given to the
        constructor. Consecutive windows start ``length - ovlp`` apart.

        If ``num_chunks * length`` is at least ``data.shape[1]``, windows start
        at position 0 and only ``data.shape[1] // length`` of them are produced.
        Otherwise the first window starts at a position drawn with
        ``np.random.randint``, so results vary unless NumPy is seeded.

        Returns:
            ``np.ndarray`` stacking the windows along a new leading axis.
        """
        if length is None:
            length = self.chunk_len
        if ovlp is None:
            ovlp = self.ovlp
        if num_chunks is None:
            num_chunks = self.num_chunks

        all_chunks = []
        total_len = data.shape[1]
        actual_num_chunks = num_chunks

        if num_chunks * length > total_len - 1:
            # Not enough room for a random offset: start at the beginning and
            # keep only as many windows as fit.
            start_point = 0
            actual_num_chunks = total_len // length
        else:
            start_point = np.random.randint(0, total_len - num_chunks * length)

        for _ in range(actual_num_chunks):
            chunk = data[:, start_point: start_point + length]
            all_chunks.append(np.array(chunk))
            start_point += length - ovlp

        return np.array(all_chunks)

    def normalize(self, data):
        """Z-score ``data`` along its last axis (mean 0, unit standard deviation)."""
        mean = np.mean(data, axis=-1, keepdims=True)
        std = np.std(data, axis=-1, keepdims=True)
        # The tiny epsilon only guards against division by zero on constant rows.
        return (data - mean) / (std + 1e-25)
    
# Example usage
# NOTE(review): absolute, machine-specific path -- consider deriving it from a
# configurable data directory so the notebook runs elsewhere.
mat_file_path = '/home/kunal/UKB/Neuro_GPT/NeuroGPT_UKB/data/001_data.mat'
dataset = EEGDatasetFromMAT(mat_file_path)

# Chunk the whole array directly. This bypasses __getitem__, so no normalization is
# applied here. Unless the recording is too short for the requested chunks, the start
# offset is drawn with np.random, so the result changes between runs unless seeded.
chunks = dataset.split_chunks(dataset.data)

# Store chunks as a dictionary keyed by channel label; chunks[:, i] takes row i of
# every chunk. label[0] presumably unwraps the per-label array produced by loadmat --
# TODO confirm.
chunks_dict = {label[0]: chunks[:, i] for i, label in enumerate(dataset.labels)}
# Add a new key with the following label: 'Oz', it is the average of O1 and O2
chunks_dict['Oz'] = (chunks_dict['O1'] + chunks_dict['O2']) / 2
# Keep only the channels listed below, in that order.
# NOTE(review): 'F8' appears twice in the list (22 names). Dict keys are unique, so
# chunk_filtered ends up with 21 entries. The second 'F8' (between 'T8' and 'P7')
# looks like a typo for another channel, possibly 'F10' -- confirm the intended montage.
chunk_filtered = {key: chunks_dict[key] for key in ['Fp1', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'F9', 'T7', 'C3', 'Cz', 'C4', 'T8', 'F8', 'P7', 'P3', 'Pz', 'P4', 'P8', 'O1', 'Oz','O2']}

In [13]:
chunk_filtered['Oz'].shape

(8, 500)