In [48]:
import os
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
from tqdm import tqdm
from glob import glob
import pickle

from PIL import ImageColor, Image
import matplotlib.colors

from utils import NN_FEATURES_DIR, RESULTS_DIR, SUBJECTS, NUM_TEST_STIMULI, FMRI_SURFACE_LEVEL_DIR, STIM_INFO_PATH, COCO_IMAGES_DIR, STIMULI_IDS_PATH
from analyses.ridge_regression_decoding import NUM_CV_SPLITS, RIDGE_DECODER_OUT_DIR, calc_rsa, calc_rsa_images, calc_rsa_captions, get_fmri_data, pairwise_accuracy, \
ACC_MODALITY_AGNOSTIC, ACC_CAPTIONS, ACC_IMAGES, ACC_CROSS_IMAGES_TO_CAPTIONS, ACC_CROSS_CAPTIONS_TO_IMAGES, ACC_IMAGERY, ACC_IMAGERY_WHOLE_TEST, get_default_features, get_default_vision_features, get_default_lang_features, Standardize, IMAGE, CAPTION, get_distance_matrix, dist_mat_to_pairwise_acc, get_fmri_data_paths, get_nn_latent_data

from notebook_utils import add_avg_subject, create_result_graph, plot_metric_catplot, plot_metric, load_results_data, ACC_MEAN, ACC_CROSS_MEAN, PALETTE_BLACK_ONLY, METRICS_ERROR_ANALYSIS, get_data_default_feats, METRICS_BASE

from feature_extraction.feat_extraction_utils import CoCoDataset 

# Nearest Neighbors of test images

In [49]:
MODELS = ["imagebind"] # imagebind gpt2-large blip2
all_data = load_results_data(MODELS, metrics=METRICS_BASE+METRICS_ERROR_ANALYSIS, recompute_acc_scores=False)
data_default_feats = get_data_default_feats(all_data)

100%|██████████| 507/507 [00:07<00:00, 71.18it/s]


In [50]:
MODEL = MODELS[0]
df_model = data_default_feats[data_default_feats.model == MODEL]

train_latents = dict()
train_stim_ids = dict()
for subject in tqdm(SUBJECTS):
    assert len(df_model.vision_features.unique()) == 1
    features = df_model.features.values[0]
    # print(features)
    vision_features = df_model.vision_features.values[0]
    # print(vision_features)
    lang_features = df_model.lang_features.values[0]
    # print(lang_features)
    _, stim_ids, stim_types = get_fmri_data_paths(subject, "train")
    unique_train_stim_ids, idx_unique = np.unique(stim_ids, return_index=True)
    train_stim_ids[subject] = unique_train_stim_ids
    latents, _ = get_nn_latent_data(MODEL, features,
                                    vision_features,
                                    lang_features,
                                    stim_ids,
                                    stim_types,
                                    subject,
                                    "train"
                                    )
    train_latents[subject] = latents[idx_unique]

100%|██████████| 6/6 [00:33<00:00,  5.56s/it]


In [51]:
coco_ds = CoCoDataset(COCO_IMAGES_DIR, STIM_INFO_PATH, STIMULI_IDS_PATH, 'both')

In [52]:
MAX_HEIGHT = 500

def display_stimuli(coco_ids):
    # print(coco_ids)
    print("nearest neighbor captions: ")
    for coco_id in coco_ids:
        print(coco_ds.get_stimuli_by_coco_id(coco_id)[1], end="\n")

    print("nearest neighbor images: ")
    imgs = [np.array(coco_ds.get_img_by_coco_id(img_id)) for img_id in coco_ids]
    # min_height = np.min([np.array(im).shape[0] for im in imgs])
    imgs = [np.vstack((img, np.repeat(255, max(0, MAX_HEIGHT - img.shape[0]) * img.shape[1]* img.shape[2]).reshape((max(0, MAX_HEIGHT - img.shape[0]), img.shape[1], img.shape[2])).astype(img.dtype)))[:MAX_HEIGHT,:,:] for img in imgs]
    stacked = np.hstack(imgs)
    img = Image.fromarray(stacked)
    display(img)
    

In [53]:
results = pickle.load(open('/home/mitja/data/multimodal_decoding/decoders/images/sub-01/imagebind_avg_test_avg_vision_features_cls_lang_features_cls/results.p', 'rb'))

In [56]:
mean_preds = results['predictions'].cpu().numpy()
pickle.dump(mean_preds, 'preds.p')

TypeError: file must have a 'write' attribute

## With predictions averaged over all subjects:

In [None]:
NUM_SAMPLES = 5
N_NEIGHBORS = 5
# training_mode = 'modality-agnostic'
training_mode = 'images'

df = data_default_feats.copy()

df = df[df.model == MODEL]

df = df[df.training_mode == training_mode]

# mask = 'whole_brain'
# df = df[df['mask'] == mask]
df = df[df.surface == False]

# features_candidate_set = "avg"
# df_candidate_set = df[df.features == features_candidate_set].copy()

# df = df[df.features == features]

assert len(df[df.metric == "predictions"]) == len(SUBJECTS)

for stimulus_type in [IMAGE, CAPTION]:
    print(f"fMRI stimulus modality: {stimulus_type}")
    all_test_predictions = []
    for subject in SUBJECTS:
        df_subj = df[df.subject == subject]
        
        test_predictions = df_subj[df_subj.metric == "predictions"].value.item()
        test_latents = df_subj[df_subj.metric == "latents"].value.item()
        test_stimulus_ids = df_subj[df_subj.metric == "stimulus_ids"].value.item()
        stimulus_types = df_subj[df_subj.metric == "stimulus_types"].value.item()
    
        test_latents = test_latents[stimulus_types == stimulus_type]
        test_stimulus_ids = test_stimulus_ids[stimulus_types == stimulus_type]
        test_predictions = test_predictions[stimulus_types == stimulus_type]
        
        # candidate_set_latents = test_latents
        # candidate_set_latents_ids = test_stimulus_ids
        # ((x - self.mean) / self.std)
        # print(test_predictions.mean(axis=1).shape)
        # test_predictions = (test_predictions - test_predictions.mean(axis=1).reshape((70, 1)) ) / test_predictions.std(axis=1).reshape((70, 1))
        # preds_standardize = Standardize(test_predictions.mean(axis=0), test_predictions.std(axis=0))
        # test_predictions = preds_standardize(test_predictions)
        all_test_predictions.append(test_predictions)
    
    
    # test_predictions_averaged = np.mean(all_test_predictions, axis=0)
    test_predictions_averaged = mean_preds[stimulus_types == stimulus_type]
    preds_standardize = Standardize(test_predictions_averaged.mean(axis=0), test_predictions_averaged.std(axis=0)) #TODO
    test_predictions_averaged = preds_standardize(test_predictions_averaged)
    
    candidate_set_latents = np.concatenate((train_latents['sub-01'], test_latents))
    candidate_set_latents_ids = np.concatenate((train_stim_ids['sub-01'], test_stimulus_ids))
        
    dist_mat = get_distance_matrix(test_predictions_averaged, candidate_set_latents)
    # acc = dist_mat_to_pairwise_acc(dist_mat)
    # print(f"Pairwise acc: {acc:.2f}")
    np.random.seed(7)
    sampled_ids = np.random.choice(range(len(test_stimulus_ids)), NUM_SAMPLES, replace=False)
    test_stimulus_ids = test_stimulus_ids[sampled_ids]
    dist_mat = dist_mat[sampled_ids]
    
    for test_stimulus_id, nneighbors_row in zip(test_stimulus_ids, dist_mat):
        print(f"test stimulus: {test_stimulus_id}")
        if stimulus_type == CAPTION:
            print(coco_ds.get_stimuli_by_coco_id(test_stimulus_id)[1])
        else:
            img = coco_ds.get_img_by_coco_id(test_stimulus_id)
            new_width  = 400
            new_height = round(new_width * img.height / img.width)
            display(img.resize((new_width, new_height)))
        target_location = np.argwhere(candidate_set_latents_ids == test_stimulus_id)[0][0]
        nneighbors_ids = candidate_set_latents_ids[np.argsort(nneighbors_row)]
        rank = np.argwhere(nneighbors_ids == test_stimulus_id)[0][0]
        # all_ranks.append(rank)
        # display(f"distance to target: {nneighbors_row[target_location]:.3f} | rank: {rank} of {len(nneighbors_row)}")
        # display(f"nearest neighbors distances: {np.sort(nneighbors_row)[:10]}")
        display_stimuli(nneighbors_ids[:N_NEIGHBORS])
        print("\n")
        
    # print(f"mean rank: {np.mean(all_ranks)}")
    print("\n\n")

## Per-subject:

In [None]:
NUM_SUBJECTS = 2

training_mode = 'modality-agnostic'
# training_mode = 'images'

df = data_default_feats.copy()

df = df[df.model == MODEL]
df = df[df.training_mode == training_mode]
df = df[df.surface == False]

assert len(df[df.metric == "predictions"]) == len(SUBJECTS)

for subject in SUBJECTS[:NUM_SUBJECTS]:
    print(f"\n\nSubject: {subject}")

    for stimulus_type in [IMAGE, CAPTION]:
        all_ranks = []
        print(f"fMRI stimulus modality: {stimulus_type}")
        df_subj = df[df.subject == subject]
        
        test_predictions = df_subj[df_subj.metric == "predictions"].value.item()
        test_latents = df_subj[df_subj.metric == "latents"].value.item()
        test_stimulus_ids = df_subj[df_subj.metric == "stimulus_ids"].value.item()
        stimulus_types = df_subj[df_subj.metric == "stimulus_types"].value.item()
    
        test_latents_in_mod = test_latents[stimulus_types == stimulus_type]
        test_stimulus_ids_in_mod = test_stimulus_ids[stimulus_types == stimulus_type]
        test_predictions_in_mod = test_predictions[stimulus_types == stimulus_type]
       
        candidate_set_latents = np.concatenate((train_latents[subject], test_latents_in_mod))
        candidate_set_latents_ids = np.concatenate((train_stim_ids[subject], test_stimulus_ids_in_mod))
    
        preds_standardize = Standardize(test_predictions_in_mod.mean(axis=0), test_predictions_in_mod.std(axis=0))
        test_predictions_in_mod = preds_standardize(test_predictions_in_mod)
    
        # targets_standardize = Standardize(candidate_set_latents.mean(axis=0), candidate_set_latents.std(axis=0))
        # candidate_set_latents = targets_standardize(candidate_set_latents)
    
        dist_mat = get_distance_matrix(test_predictions_in_mod, candidate_set_latents) #, metric="euclidean"
        acc = dist_mat_to_pairwise_acc(dist_mat)
        print(f"Pairwise acc: {acc:.2f}")
        np.random.seed(7)
        sampled_ids = np.random.choice(range(len(test_stimulus_ids_in_mod)), NUM_SAMPLES, replace=False)
        test_stimulus_ids_in_mod = test_stimulus_ids_in_mod[sampled_ids]
        dist_mat = dist_mat[sampled_ids]
        
        for test_stimulus_id, nneighbors_row in zip(test_stimulus_ids_in_mod, dist_mat):
            print(f"test stimulus: {test_stimulus_id}")
            if stimulus_type == CAPTION:
                print(coco_ds.get_stimuli_by_coco_id(test_stimulus_id)[1])
            else:
                display(coco_ds.get_img_by_coco_id(test_stimulus_id))
            target_location = np.argwhere(candidate_set_latents_ids == test_stimulus_id)[0][0]
            nneighbors_ids = candidate_set_latents_ids[np.argsort(nneighbors_row)]
            rank = np.argwhere(nneighbors_ids == test_stimulus_id)[0][0]
            all_ranks.append(rank)
            # display(f"distance to target: {nneighbors_row[target_location]:.3f} | rank: {rank} of {len(nneighbors_row)}")
            # display(f"nearest neighbors distances: {np.sort(nneighbors_row)[:10]}")
            display_stimuli(nneighbors_ids[:N_NEIGHBORS])
            print("\n")
        
        # print(f"mean rank: {np.mean(all_ranks)}")
        print("\n\n")
