In [13]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.linear_model import Ridge
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import StandardScaler
from scipy.stats import pearsonr
import torch
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import requests
from io import BytesIO
from tqdm import tqdm
import pickle
import re
import glob
from pycocotools.coco import COCO
from transformers import CLIPProcessor, CLIPModel
import warnings
warnings.filterwarnings('ignore')

In [14]:
# Set random seed for reproducibility
np.random.seed(42)
torch.manual_seed(42)

# Constants and paths
SUBJ = "subj04"
TRAINING_PATH = f"{SUBJ}/training_split"
IMAGES_PATH = f"{TRAINING_PATH}/training_images"
FMRI_PATH = f"{TRAINING_PATH}/training_fmri"
LH_FMRI_PATH = os.path.join(FMRI_PATH, "lh_training_fmri.npy")
RH_FMRI_PATH = os.path.join(FMRI_PATH, "rh_training_fmri.npy")
STIM_INFO_PATH = "nsd_stim_info_merged.csv"

In [15]:
# Load the CLIP model for multimodal embeddings
print("Loading CLIP model for multimodal embeddings...")
model_name = "openai/clip-vit-base-patch32"
clip_model = CLIPModel.from_pretrained(model_name)
clip_processor = CLIPProcessor.from_pretrained(model_name)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
clip_model = clip_model.to(device)

Loading CLIP model for multimodal embeddings...


In [16]:
def load_stimulus_info():
    """
    Load and preprocess stimulus information from CSV
    """
    stim_info = pd.read_csv(STIM_INFO_PATH, index_col=0)
    important_columns = ['cocoId', 'cocoSplit', 'nsdId']
    stim_info = stim_info[important_columns]
    return stim_info


In [17]:
def extract_nsd_id(image_path):
    """
    Extract nsd_id from the given image path.
    """
    match = re.search(r'nsd-(\d+)', image_path)
    if match:
        return int(match.group(1))
    else:
        raise ValueError("nsd_id not found in the image path")

In [18]:
def get_coco_captions(nsdId, stim_info):
    """
    Get COCO captions for a given NSD image ID
    """
    try:
        # Get COCO ID and split
        coco_id = stim_info[stim_info['nsdId'] == nsdId]['cocoId'].values[0]
        coco_split = stim_info[stim_info['nsdId'] == nsdId]['cocoSplit'].values[0]
        
        # Load COCO annotation data
        coco_annotation_file = f"captions_{coco_split}.json"
        coco_data = COCO(coco_annotation_file)
        coco_ann_ids = coco_data.getAnnIds(coco_id)
        coco_annotations = coco_data.loadAnns(coco_ann_ids)
        
        # Extract captions
        captions = [anno['caption'] for anno in coco_annotations]
        return captions
    except Exception as e:
        print(f"Error getting captions for nsdId {nsdId}: {e}")
        return []

In [None]:
def extract_image_features(image_paths):
    """
    Extract features from images using CLIP model
    """
    all_features = []
    
    for image_path in tqdm(image_paths, desc="Extracting image features"):
        # Load and preprocess the image
        image = Image.open(image_path).convert("RGB")
        inputs = clip_processor(images=image, return_tensors="pt").to(device)
        
        # Extract features
        with torch.no_grad():
            image_features = clip_model.get_image_features(**inputs)
            
        # Normalize the features
        image_features = image_features / image_features.norm(dim=1, keepdim=True)
        
        # Convert to numpy and add to the list
        all_features.append(image_features.cpu().numpy().flatten())
        # if(len(all_features) ==100):
        #     break
    
    return np.array(all_features)

In [8]:
def load_fmri_data():
    """
    Load fMRI data for both hemispheres
    """
    print("Loading fMRI data...")
    lh_fmri = np.load(LH_FMRI_PATH)
    rh_fmri = np.load(RH_FMRI_PATH)
    
    print(f"Left hemisphere data shape: {lh_fmri.shape}")
    print(f"Right hemisphere data shape: {rh_fmri.shape}")
    
    return lh_fmri, rh_fmri

In [9]:
def prepare_dataset():
    """
    Prepare dataset for training and testing
    """
    # Load stimulus info for caption mapping
    stim_info = load_stimulus_info()
    
    # Get all image paths
    image_paths = sorted(glob.glob(os.path.join(IMAGES_PATH, "*.png")))
    print(f"Found {len(image_paths)} images")
    
    # Extract image features
    image_features = extract_image_features(image_paths)
    print(f"Image features shape: {image_features.shape}")
    
    # Load fMRI data
    lh_fmri, rh_fmri = load_fmri_data()
    
    # Create image info with nsd_ids
    image_info = []
    for path in image_paths:
        nsd_id = extract_nsd_id(path)
        image_info.append({
            'path': path,
            'nsd_id': nsd_id
        })
    
    # Split data into train and test sets
    train_indices, test_indices = train_test_split(
        range(len(image_paths)), 
        test_size=0.2, 
        random_state=42
    )
    
    # Prepare train and test datasets
    X_train_lh = lh_fmri[train_indices]
    X_train_rh = rh_fmri[train_indices]
    X_test_lh = lh_fmri[test_indices]
    X_test_rh = rh_fmri[test_indices]
    
    y_train = image_features[train_indices]
    y_test = image_features[test_indices]
    
    train_info = [image_info[i] for i in train_indices]
    test_info = [image_info[i] for i in test_indices]
    
    return (X_train_lh, X_train_rh, y_train, train_info, 
            X_test_lh, X_test_rh, y_test, test_info, 
            stim_info)


In [10]:
def train_decoding_model(X_train, y_train, hemisphere):
    """
    Train ridge regression model for brain decoding
    """
    print(f"Training decoding model for {hemisphere} hemisphere...")
    
    # Standardize the features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    
    # Train Ridge regression model
    ridge = Ridge(alpha=100.0, random_state=42)
    ridge.fit(X_train_scaled, y_train)
    
    return ridge, scaler

In [11]:
def evaluate_model(model, scaler, X_test, y_true):
    """
    Evaluate model performance
    """
    # Scale test data
    X_test_scaled = scaler.transform(X_test)
    
    # Predict embeddings
    y_pred = model.predict(X_test_scaled)
    
    # Calculate cosine similarity between predicted and true embeddings
    cos_sim = np.zeros(len(y_true))
    for i in range(len(y_true)):
        cos_sim[i] = cosine_similarity([y_pred[i]], [y_true[i]])[0][0]
    
    avg_cos_sim = np.mean(cos_sim)
    print(f"Average cosine similarity: {avg_cos_sim:.4f}")
    
    return y_pred, cos_sim

In [12]:
def find_top_matches(y_pred, y_train, n=5):
    """
    Find top n matches in the training set for each predicted embedding
    """
    similarities = cosine_similarity(y_pred, y_train)
    
    top_indices = []
    top_similarities = []
    
    for i in range(similarities.shape[0]):
        # Get indices of top n matches
        top_n_indices = np.argsort(similarities[i])[::-1][:n]
        top_n_similarities = similarities[i][top_n_indices]
        
        top_indices.append(top_n_indices)
        top_similarities.append(top_n_similarities)
    
    return top_indices, top_similarities


In [24]:
def visualize_results(test_idx, top_train_indices, top_similarities, test_info, train_info, stim_info, n=5):
    """
    Visualize top matches for a given test image
    """
    test_path = test_info[test_idx]['path']
    test_nsd_id = test_info[test_idx]['nsd_id']
    test_image = Image.open(test_path)
    test_captions = get_coco_captions(test_nsd_id, stim_info)
    
    plt.figure(figsize=(15, 10))
    
    # Display test image
    plt.subplot(1, n+1, 1)
    plt.imshow(test_image)
    plt.title(f"Test Image (NSD ID: {test_nsd_id})")
    plt.axis('off')
    
    # Display top matches
    for i in range(n):
        train_idx = top_train_indices[test_idx][i]
        train_path = train_info[train_idx]['path']
        train_nsd_id = train_info[train_idx]['nsd_id']
        similarity = top_similarities[test_idx][i]
        
        train_image = Image.open(train_path)
        
        plt.subplot(1, n+1, i+2)
        plt.imshow(train_image)
        plt.title(f"Match {i+1}\nSim: {similarity:.4f}\nNSD ID: {train_nsd_id}")
        plt.axis('off')
    
    plt.tight_layout()
    plt.savefig(f"test_image_{test_idx}_matches.png")
    plt.close()
    
    # Print captions
    print(f"Test Image (NSD ID: {test_nsd_id}) Captions:")
    for i, caption in enumerate(test_captions):
        print(f"  {i+1}. {caption}")
    
    print("\nTop Matches Captions:")
    for i in range(n):
        train_idx = top_train_indices[test_idx][i]
        train_nsd_id = train_info[train_idx]['nsd_id']
        similarity = top_similarities[test_idx][i]
        train_captions = get_coco_captions(train_nsd_id, stim_info)
        
        print(f"Match {i+1} (Similarity: {similarity:.4f}, NSD ID: {train_nsd_id}):")
        for j, caption in enumerate(train_captions):
            print(f"  {j+1}. {caption}")
        print()

In [25]:
def get_model_coefficients(model_lh, model_rh):
    """
    Get model coefficients for brain regions
    """
    coeffs_lh = model_lh.coef_
    coeffs_rh = model_rh.coef_
    
    # Average across output dimensions
    avg_coeffs_lh = np.mean(np.abs(coeffs_lh), axis=0)
    avg_coeffs_rh = np.mean(np.abs(coeffs_rh), axis=0)
    
    return avg_coeffs_lh, avg_coeffs_rh

In [26]:
def plot_coefficient_importance(coeffs_lh, coeffs_rh):
    """
    Plot the importance of voxels based on model coefficients
    """
    plt.figure(figsize=(12, 6))
    
    plt.subplot(1, 2, 1)
    plt.hist(coeffs_lh, bins=50, alpha=0.7)
    plt.title("Left Hemisphere Coefficient Distribution")
    plt.xlabel("Average Absolute Coefficient Value")
    plt.ylabel("Count")
    
    plt.subplot(1, 2, 2)
    plt.hist(coeffs_rh, bins=50, alpha=0.7)
    plt.title("Right Hemisphere Coefficient Distribution")
    plt.xlabel("Average Absolute Coefficient Value")
    plt.ylabel("Count")
    
    plt.tight_layout()
    plt.savefig("coefficient_importance.png")
    plt.close()
    
    # Plot top voxels
    n_top = 100
    top_voxels_lh = np.argsort(coeffs_lh)[::-1][:n_top]
    top_voxels_rh = np.argsort(coeffs_rh)[::-1][:n_top]
    
    plt.figure(figsize=(12, 6))
    
    plt.subplot(1, 2, 1)
    plt.bar(range(n_top), coeffs_lh[top_voxels_lh])
    plt.title(f"Top {n_top} Voxels (Left Hemisphere)")
    plt.xlabel("Voxel Rank")
    plt.ylabel("Coefficient Value")
    
    plt.subplot(1, 2, 2)
    plt.bar(range(n_top), coeffs_rh[top_voxels_rh])
    plt.title(f"Top {n_top} Voxels (Right Hemisphere)")
    plt.xlabel("Voxel Rank")
    plt.ylabel("Coefficient Value")
    
    plt.tight_layout()
    plt.savefig("top_voxels.png")
    plt.close()
    
    return top_voxels_lh, top_voxels_rh

In [27]:
(X_train_lh, X_train_rh, y_train, train_info, 
    X_test_lh, X_test_rh, y_test, test_info, 
    stim_info) = prepare_dataset()

Found 8779 images


Extracting image features: 100%|██████████| 8779/8779 [05:03<00:00, 28.90it/s]


Image features shape: (8779, 512)
Loading fMRI data...
Left hemisphere data shape: (8779, 19004)
Right hemisphere data shape: (8779, 20544)


In [29]:
# Visualize the shapes of the datasets
print("X_train_lh shape:", X_train_lh.shape)
print("X_train_rh shape:", X_train_rh.shape)
print("y_train shape:", y_train.shape)
print("train_info shape:", train_info)
print("X_test_lh shape:", X_test_lh.shape)
print("X_test_rh shape:", X_test_rh.shape)
print("y_test shape:", y_test.shape)
print("test_info shape:", test_info)
print("stim_info shape:", stim_info)

X_train_lh shape: (7023, 19004)
X_train_rh shape: (7023, 20544)
y_train shape: (7023, 512)
train_info shape: [{'path': 'subj04/training_split/training_images/train-3535_nsd-28871.png', 'nsd_id': 28871}, {'path': 'subj04/training_split/training_images/train-0368_nsd-03237.png', 'nsd_id': 3237}, {'path': 'subj04/training_split/training_images/train-2869_nsd-23257.png', 'nsd_id': 23257}, {'path': 'subj04/training_split/training_images/train-6854_nsd-56850.png', 'nsd_id': 56850}, {'path': 'subj04/training_split/training_images/train-6145_nsd-50823.png', 'nsd_id': 50823}, {'path': 'subj04/training_split/training_images/train-7221_nsd-60200.png', 'nsd_id': 60200}, {'path': 'subj04/training_split/training_images/train-0836_nsd-06822.png', 'nsd_id': 6822}, {'path': 'subj04/training_split/training_images/train-1673_nsd-13496.png', 'nsd_id': 13496}, {'path': 'subj04/training_split/training_images/train-4082_nsd-33375.png', 'nsd_id': 33375}, {'path': 'subj04/training_split/training_images/train-7

In [3]:
# Train models for each hemisphere
model_lh, scaler_lh = train_decoding_model(X_train_lh, y_train, "left")

NameError: name 'X_train_lh' is not defined

In [None]:

# Train models for each hemisphere
model_lh, scaler_lh = train_decoding_model(X_train_lh, y_train, "left")
model_rh, scaler_rh = train_decoding_model(X_train_rh, y_train, "right")

# Evaluate models
print("\nEvaluating left hemisphere model:")
y_pred_lh, cos_sim_lh = evaluate_model(model_lh, scaler_lh, X_test_lh, y_test)

print("\nEvaluating right hemisphere model:")
y_pred_rh, cos_sim_rh = evaluate_model(model_rh, scaler_rh, X_test_rh, y_test)

# Average the predictions from both hemispheres
print("\nAveraging predictions from both hemispheres...")
y_pred_avg = (y_pred_lh + y_pred_rh) / 2.0

# Evaluate average predictions
cos_sim_avg = np.zeros(len(y_test))
for i in range(len(y_test)):
    cos_sim_avg[i] = cosine_similarity([y_pred_avg[i]], [y_test[i]])[0][0]

avg_cos_sim = np.mean(cos_sim_avg)
print(f"Average cosine similarity (combined): {avg_cos_sim:.4f}")

# Find top matches in training set
print("\nFinding top matches for predicted embeddings...")
top_indices, top_similarities = find_top_matches(y_pred_avg, y_train, n=5)

# Visualize results for a few test samples
print("\nVisualizing results...")
num_samples = min(5, len(test_info))
for i in range(num_samples):
    print(f"\nSample {i+1}:")
    visualize_results(i, top_indices, top_similarities, test_info, train_info, stim_info)

# Analyze model coefficients
print("\nAnalyzing model coefficients...")
avg_coeffs_lh, avg_coeffs_rh = get_model_coefficients(model_lh, model_rh)
top_voxels_lh, top_voxels_rh = plot_coefficient_importance(avg_coeffs_lh, avg_coeffs_rh)

# Save results
print("\nSaving results...")
results = {
    'y_pred_lh': y_pred_lh,
    'y_pred_rh': y_pred_rh,
    'y_pred_avg': y_pred_avg,
    'cos_sim_lh': cos_sim_lh,
    'cos_sim_rh': cos_sim_rh,
    'cos_sim_avg': cos_sim_avg,
    'top_indices': top_indices,
    'top_similarities': top_similarities,
    'top_voxels_lh': top_voxels_lh,
    'top_voxels_rh': top_voxels_rh
}

with open('brain_decoding_results.pkl', 'wb') as f:
    pickle.dump(results, f)

print("\nDone!")
