In [None]:
import pickle
import warnings
import numpy as np
from tqdm.auto import tqdm
from scipy import spatial
from scipy.io import loadmat
from scipy.stats import pearsonr
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression

# Suppress every warning category for the remainder of the kernel session.
# NOTE(review): this is a blanket filter, so numerical warnings raised by
# numpy/scipy/sklearn in the cells below are hidden too — consider narrowing
# it to the specific categories that are known to be noise.
warnings.filterwarnings("ignore")


In [None]:
def pairwise_accuracy(actual, predicted):
    """Return the fraction of item pairs whose predictions match better than swapped.

    For every unordered pair (first, second) of positions, the cosine
    distances of the correctly matched pairing
    (actual[first] vs predicted[first], actual[second] vs predicted[second])
    are summed and compared with the summed distances of the swapped pairing.
    The pair counts as correct only when the matched sum is strictly smaller.

    Parameters
    ----------
    actual, predicted : indexable sequences of 1-D vectors
        Only the first ``len(actual)`` entries of ``predicted`` are read.

    Returns
    -------
    float
        Correct pairs divided by the total number of pairs.

    Raises
    ------
    ZeroDivisionError
        When ``actual`` holds fewer than two items, so no pair exists.
    """
    cosine = spatial.distance.cosine
    n_items = len(actual)
    n_correct = 0
    n_pairs = 0

    for first in range(n_items):
        for second in range(first + 1, n_items):
            n_pairs += 1

            # Distance when each prediction is paired with its own target ...
            matched = cosine(actual[first], predicted[first]) + cosine(
                actual[second], predicted[second]
            )
            # ... versus when the two predictions are exchanged.
            swapped = cosine(actual[first], predicted[second]) + cosine(
                actual[second], predicted[first]
            )

            if matched < swapped:
                n_correct += 1

    return n_correct / n_pairs


In [None]:
def pearcorr(actual, predicted):
    """Return the mean Pearson correlation between matching rows.

    Row ``i`` of ``actual`` is correlated with row ``i`` of ``predicted``
    via ``np.corrcoef`` and the off-diagonal coefficient is kept; the
    result is the mean of those coefficients over ``len(actual)`` rows.
    """
    row_correlations = [
        np.corrcoef(actual[row], predicted[row])[0][1]
        for row in range(len(actual))
    ]
    return np.mean(row_correlations)


In [None]:
def generate_indices(data):
    """Flatten nine index fields of ``data["meta"]`` into lists of ints.

    The fields live under ``data["meta"][0][0][11][0]``; each one is
    iterated as a sequence of entries ``j`` whose ``j[0]`` is itself a
    sequence of values, and every value is converted with ``int``.

    Parameters
    ----------
    data : mapping
        Must provide a ``"meta"`` entry with the nesting described above
        (presumably the structure ``scipy.io.loadmat`` returns for the
        dataset's ``.mat`` files — confirm against the data).

    Returns
    -------
    tuple of 9 lists of int
        In this fixed order (field position in parentheses; the names are
        the ones the callers use, their meaning is not visible here):
        task (5), DMN (6), visual-body (9), visual-face (10),
        visual-object (11), visual-scene (12), visual (13),
        language-LH (7), language-RH (8).
    """
    # Look the shared container up once instead of once per field.
    roi_fields = data["meta"][0][0][11][0]

    def _flatten(position):
        # One field -> flat list of ints, preserving the stored order.
        return [int(k) for j in roi_fields[position] for k in j[0]]

    return (
        _flatten(5),   # Taskindices
        _flatten(6),   # DMNindices
        _flatten(9),   # Visualindices_body
        _flatten(10),  # Visualindices_face
        _flatten(11),  # Visualindices_object
        _flatten(12),  # Visualindices_scene
        _flatten(13),  # Visualindices
        _flatten(7),   # Languageindices_lh
        _flatten(8),   # Languageindices_rh
    )


In [None]:
# Task names; interpolated into the feature-file and model-file paths below.
TASKS = [
    "bert", "coref", "ner", "nli", "paraphrase", "qa",
    "sa", "srl", "ss", "sum", "wsd",
]

# Region-of-interest keys; they match the keys of the dict returned by
# get_subject_data.
ROIS = [
    "language_lh", "language_rh",
    "vision_body", "vision_face", "vision_object", "vision_scene", "vision",
    "dmn", "task",
]

# Subject identifiers; used as directory names under the dataset root.
subjects = ["P01", "M02", "M04", "M07", "M15"]

# Layer labels: "block1" .. "block12" followed by "fc".
layers_bert = [f"block{number}" for number in range(1, 13)] + ["fc"]
layers_bart = ["fc"]


In [None]:
def get_subject_data(subject, data_dir="./pereira_dataset"):
    """Load one subject's recordings and slice them into per-ROI matrices.

    Reads ``data_384sentences.mat`` and ``data_243sentences.mat`` from
    ``<data_dir>/<subject>/``, stacks their ``examples_passagesentences``
    arrays row-wise (384-file rows first), and selects the columns that
    belong to each region of interest.

    Parameters
    ----------
    subject : str
        Sub-directory name of the subject, e.g. ``"P01"``.
    data_dir : str, optional
        Root directory of the dataset. Defaults to the location the
        notebook has always used.

    Returns
    -------
    dict
        Maps each ROI key (``"language_lh"``, ``"language_rh"``,
        ``"vision_body"``, ``"vision_face"``, ``"vision_object"``,
        ``"vision_scene"``, ``"vision"``, ``"dmn"``, ``"task"``) to a 2-D
        array holding all stacked rows and that ROI's columns.
    """
    data_pic1 = loadmat(f"{data_dir}/{subject}/data_384sentences.mat")
    data_pic2 = loadmat(f"{data_dir}/{subject}/data_243sentences.mat")

    # Only these two entries are consumed, so assemble them directly instead
    # of reading the 384-sentence file from disk a third time just to have a
    # dict to overwrite.
    data_pic = {
        "examples_passagesentences": np.concatenate(
            (
                data_pic1["examples_passagesentences"],
                data_pic2["examples_passagesentences"],
            ),
            axis=0,
        ),
        # generate_indices only reads meta[0][0], i.e. the 384-file record.
        "meta": np.concatenate((data_pic1["meta"], data_pic2["meta"]), axis=0),
    }

    (
        Taskindices,
        DMNindices,
        Visualindices_body,
        Visualindices_face,
        Visualindices_object,
        Visualindices_scene,
        Visualindices,
        Languageindices_lh,
        Languageindices_rh,
    ) = generate_indices(data_pic)

    roi_indices = {
        "language_lh": Languageindices_lh,
        "language_rh": Languageindices_rh,
        "vision_body": Visualindices_body,
        "vision_face": Visualindices_face,
        "vision_object": Visualindices_object,
        "vision_scene": Visualindices_scene,
        "vision": Visualindices,
        "dmn": DMNindices,
        "task": Taskindices,
    }

    examples = data_pic["examples_passagesentences"]
    # The stored indices are shifted down by one before use (presumably they
    # are 1-based MATLAB indices — confirm). The explicit integer dtype keeps
    # an empty ROI a valid (empty) column selector rather than a float array.
    return {
        roi: examples[:, np.asarray(indices, dtype=int) - 1]
        for roi, indices in roi_indices.items()
    }


In [None]:
def create_ensemble(outputs, voxels, typ="lin_reg"):
    """Fit a combiner over base-model predictions and score it on a held-out split.

    The sentences are split 80/20 (``random_state=42``). One regressor is
    fitted on the rows of *all* training sentences stacked together, then
    used to predict each test sentence; the predictions are scored with
    ``pairwise_accuracy`` and ``pearcorr``.

    Parameters
    ----------
    outputs : array-like, 3-D, first axis = sentences
        Base-model predictions. Per sentence the regressor needs a matrix
        with one row per target value. The ``(n_sentences, n_models,
        n_voxels)`` layout is also accepted and is re-oriented
        automatically (see the comment in the body).
    voxels : array-like, shape (n_sentences, n_voxels)
        Target responses, one row per sentence.
    typ : str, optional
        Combiner type. Only ``"lin_reg"`` (ordinary least squares) is
        implemented.

    Returns
    -------
    (acc, corr)
        Pairwise accuracy and mean Pearson correlation on the test split.

    Raises
    ------
    ValueError
        If ``typ`` is not a supported combiner. Previously an unknown
        ``typ`` fell through and failed later with an unbound ``y_pred``.
    """
    if typ != "lin_reg":
        raise ValueError(
            f"Unsupported ensemble type {typ!r}; only 'lin_reg' is implemented"
        )

    outputs = np.asarray(outputs)
    voxels = np.array(voxels)

    # The regressor needs len(x_sentence) == len(y_sentence). If axis 1 does
    # not line up with the target length but axis 2 does, the input is in
    # (sentence, model, voxel) order: swap so each sentence becomes an
    # (n_voxels, n_models) design matrix. Inputs that already line up are
    # left untouched.
    if (
        outputs.ndim == 3
        and voxels.ndim == 2
        and outputs.shape[1] != voxels.shape[1]
        and outputs.shape[2] == voxels.shape[1]
    ):
        outputs = np.transpose(outputs, (0, 2, 1))

    x_train, x_test, y_train, y_test = train_test_split(
        outputs, voxels, test_size=0.2, random_state=42
    )

    # Fit ONCE on every training sentence. Calling fit() per sentence
    # discards the previous fit each time, leaving a model that has only
    # seen the last training sentence.
    reg = LinearRegression()
    reg.fit(np.concatenate(x_train, axis=0), np.concatenate(y_train, axis=0))

    y_pred = np.array([reg.predict(x_sentence) for x_sentence in x_test])

    acc = pairwise_accuracy(y_test, y_pred)
    corr = pearcorr(y_test, y_pred)

    return acc, corr


In [None]:
# Result containers. `output` maps metric name -> subject -> ROI -> score.
# `ensemble` only ever receives an empty dict per subject in this cell.
ensemble = {}
output = {}
output["2v2"] = {}
output["pear"] = {}

# NOTE(review): the [:1] / [:3] slices restrict the run to the first subject,
# the first ROI and the first three tasks — looks like a debugging subset;
# confirm before treating the results as final.
for subject in tqdm(subjects[:1]):
    ensemble[subject] = {}
    output["2v2"][subject] = {}
    output["pear"][subject] = {}
    fmri = get_subject_data(subject)
    for roi in ROIS[:1]:
        # One prediction array per task, collected in TASKS order.
        model_outputs = []
        for task in TASKS[:3]:
            print(f"{subject} {roi} {task}")
            data = np.load(f"./features/pereira_{task}.npy", allow_pickle=True)
            # Use only the last entry of the loaded array
            # (presumably the final layer's features — TODO confirm).
            feats = np.array(data[-1])
            # NOTE(review): pickle.load executes arbitrary code from the file;
            # only load model files from a trusted source.
            with open(f"./models/{task}_{roi}_{subject}.model", "rb") as f:
                model = pickle.load(f)

            model_output = model.predict(feats)
            model_outputs.append(model_output)
            # Drop the references before the next task is loaded.
            del model, data

        model_outputs = np.array(model_outputs)
        # Move the task axis from position 0 to position 1.
        model_outputs = np.transpose(model_outputs, (1, 0, 2))
        voxels = np.array(fmri[roi])
        acc, corr = create_ensemble(model_outputs, voxels)
        output["2v2"][subject][roi] = acc
        output["pear"][subject][roi] = corr


In [None]:
# with open(f"results/results_ensemble.pkl", "wb") as f:
#     pickle.dump(output, f)
