# Imports

In [None]:
from sentence_transformers import SentenceTransformer
from sentence_transformers.util import cos_sim
from transformers import BitsAndBytesConfig, AutoModel
import torch

from sklearn.decomposition import PCA
from sklearn.metrics import auc, roc_curve
from sklearn.preprocessing import scale
from sklearn.preprocessing import StandardScaler
from sklearn.metrics.pairwise import cosine_similarity
import matplotlib.pyplot as plt
import numpy as np
import random
import seaborn as sns
import itertools
import pickle
import requests
import gc
import os

import warnings
warnings.filterwarnings('ignore')

# utils

In [None]:
def get_data():
    """Load the two pickled input dictionaries from the working directory.

    Returns:
        tuple: ``(dict_characteristics, dict_concepts)`` as unpickled from
        ``dict_characteristics.p`` and ``dict_concepts_bias.p`` respectively.
    """
    # NOTE: unpickling can execute arbitrary code; only load files you trust.
    # Context managers make sure both file handles are closed again.
    with open("dict_characteristics.p", "rb") as handle:
        dict_characteristics = pickle.load(handle)
    with open("dict_concepts_bias.p", "rb") as handle:
        dict_concepts = pickle.load(handle)
    return dict_characteristics, dict_concepts

In [None]:
def get_model(model_string: str):
    """Return a locally loaded SentenceTransformer for ``model_string``, or ``None``.

    ``None`` is returned for the identifiers listed in ``not_loaded_locally``;
    ``get_embeddings`` reaches the Ollama tags and the Stella 1.5B model over
    HTTP instead (how ``"openai_model"`` is served is not visible here).
    Every other identifier is loaded on the CPU, except the Stella 400M model,
    which is placed on CUDA.
    """
    needs_remote_code_on_cpu = (
        "Alibaba-NLP/gte-Qwen2-1.5B-instruct",
        "Alibaba-NLP/gte-large-en-v1.5",
        "Alibaba-NLP/gte-base-en-v1.5",
        "nomic-ai/nomic-embed-text-v1",
    )
    needs_remote_code_on_gpu = ("dunzhang/stella_en_400M_v5",)
    not_loaded_locally = (
        "mistral:latest",
        "phi3:latest",
        "gemma2:latest",
        "qwen2:latest",
        "llama3.1:latest",
        "dunzhang/stella-en-1.5B-v5",
        "openai_model",
    )

    if model_string in needs_remote_code_on_cpu:
        return SentenceTransformer(model_string, trust_remote_code=True, device="cpu")
    if model_string in needs_remote_code_on_gpu:
        return SentenceTransformer(model_string, trust_remote_code=True, device="cuda")
    if model_string in not_loaded_locally:
        return None
    # Any other identifier is treated as a plain sentence-transformers model.
    return SentenceTransformer(model_string, device="cpu")

In [None]:
def get_embeddings(model, text_input, model_string):
    """Embed one or more texts with the backend selected by ``model_string``.

    Args:
        model: Object with an ``encode(texts, normalize_embeddings=True)``
            method; only used when ``model_string`` is not served over HTTP.
        text_input: A single string or a sequence of strings.
        model_string: Model identifier. The Ollama tags are embedded through
            ``http://localhost:11434/api/embeddings`` (one request per text),
            ``"dunzhang/stella-en-1.5B-v5"`` through
            ``http://localhost:8001/v1/embeddings`` (batches of 16), and
            anything else through ``model.encode``.

    Returns:
        np.ndarray: One embedding per input text, in input order.

    Raises:
        requests.HTTPError: If an embedding endpoint answers with an error
            status, instead of failing later on a missing JSON key.
    """
    if isinstance(text_input, str):
        text_input = [text_input]

    if model_string in ["mistral:latest", "phi3:latest", "gemma2:latest", "qwen2:latest", "llama3.1:latest"]:

        url = 'http://localhost:11434/api/embeddings'

        embeddings = []
        for text in text_input:
            response = requests.post(url, json={"model": model_string, "prompt": text})
            response.raise_for_status()
            embeddings.append(np.array(response.json()['embedding']))
        embeddings = np.array(embeddings)

    elif model_string == "dunzhang/stella-en-1.5B-v5":

        # here, we use port-forwarding for reaching A100 on K8S
        url = 'http://localhost:8001/v1/embeddings'

        max_chunk_size = 16
        embeddings_list = []
        for start in range(0, len(text_input), max_chunk_size):
            chunk = text_input[start:start + max_chunk_size]
            # Same payload the endpoint received before: the "s2s_query"
            # prompt plus the batch of texts under "input".
            payload = {"prompt_name": "s2s_query", "input": list(chunk)}

            response = requests.post(url, json=payload)
            response.raise_for_status()
            embeddings_list.append(
                np.array([val["embedding"] for val in response.json()["data"]])
            )

        # np.concatenate rejects an empty list, so an empty input yields an
        # empty array, like the Ollama branch above.
        embeddings = np.concatenate(embeddings_list) if embeddings_list else np.array([])

    else:
        embeddings = model.encode(list(text_input), normalize_embeddings=True)

    return np.array(embeddings)

In [None]:
def embed_neutral(dic_pairs, model, model_string):
    """Embed both sides of every concept pair and form their difference vectors.

    The keys of ``dic_pairs`` are the first concept's words and the values are
    the second concept's counterparts.

    Returns:
        tuple: ``(difference, first_embeddings, second_embeddings, first_words,
        second_words)`` where ``difference`` is first minus second embeddings.
    """
    key_words = np.array(list(dic_pairs.keys()))
    value_words = np.array(list(dic_pairs.values()))

    key_embeddings = get_embeddings(model, key_words, model_string)
    value_embeddings = get_embeddings(model, value_words, model_string)

    return (
        key_embeddings - value_embeddings,
        key_embeddings,
        value_embeddings,
        key_words,
        value_words,
    )

In [None]:
# Models evaluated by the notebook, in the order used for every per-model result.
model_list = [
    # Embedded through the HTTP endpoint on port 8001 (see get_embeddings).
    "dunzhang/stella-en-1.5B-v5",
    # Loaded locally through sentence-transformers (see get_model).
    "Alibaba-NLP/gte-large-en-v1.5",
    "mixedbread-ai/mxbai-embed-large-v1",
    "WhereIsAI/UAE-Large-V1",
    "intfloat/multilingual-e5-large-instruct",
    "avsolatorio/GIST-large-Embedding-v0",
    "BAAI/bge-large-en-v1.5",
    "llmrails/ember-v1",
    "nomic-ai/nomic-embed-text-v1",
    "sentence-transformers/all-mpnet-base-v2",
    "sentence-transformers/sentence-t5-xl",
    "sentence-transformers/all-MiniLM-L12-v2",
    "sentence-transformers/all-MiniLM-L6-v2",
    "FacebookAI/xlm-roberta-base",
    # Embedded through the local Ollama endpoint (see get_embeddings).
    "mistral:latest",
    "llama3.1:latest",
    "phi3:latest",
    "gemma2:latest",
    "qwen2:latest",
]

In [None]:
# PCA projection

def pca_on_neutral_embeddings(embeddings_neutral):
    """Standardize the difference embeddings and fit a PCA on them.

    At most 10 components are kept, fewer when the matrix has fewer rows or
    columns than that.

    Returns:
        tuple: ``(projected, fitted_pca, fitted_scaler)`` where ``projected``
        is the standardized input expressed in the PCA basis.
    """
    n_rows, n_cols = embeddings_neutral.shape[0], embeddings_neutral.shape[1]
    n_components = min(10, n_rows, n_cols)

    pca_neutral = PCA(n_components=n_components)
    scaler_neutral = StandardScaler()

    # fit() returns the estimator itself, so fit and transform can be chained.
    standardized = scaler_neutral.fit(embeddings_neutral).transform(embeddings_neutral)
    projected = pca_neutral.fit(standardized).transform(standardized)

    return projected, pca_neutral, scaler_neutral

# Which principal component (PC) separates the data best, and how strong is the concept direction?

In [None]:
def compute_bias_hardness(array):
    """Return twice the mean absolute distance of ``array`` from 0.5.

    For scores in [0, 1] the result lies in [0, 1]: it is 0 when every score
    equals 0.5 and 1 when every score is exactly 0 or 1.
    """
    distance_from_midpoint = np.abs(array - 0.5)
    return 2 * np.mean(distance_from_midpoint)

In [None]:
# this is the quality of the concept direction

In [None]:
def plot_scatter(x, y, texts, concept: str, attribute: str, model: str, concept_axis: int, auc: float):
    """Scatter the projected points on two principal components and save the plot.

    The concept axis is drawn horizontally. The vertical axis is PC 1 when the
    concept axis is PC 0, otherwise the PC just before the concept axis.
    Points with ``y == 1`` are orange, all others blue, and each point is
    labelled with the matching entry of ``texts``. The figure is written to
    ``plots/pc_projection/<concept>_<attribute>/model_<model>.pdf`` (with "/"
    in the model name replaced by "-") and then shown.
    """
    # Model identifiers contain "/", which would otherwise nest directories.
    model = model.replace("/", "-")
    file_path = f"plots/pc_projection/{concept}_{attribute}/model_{model}.pdf"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    c = np.full(len(y), "darkblue", dtype='<U16')
    c[y == 1] = "darkorange"

    i = concept_axis
    j = 1 if i == 0 else concept_axis - 1

    plt.plot()
    plt.grid(True)
    plt.scatter(x[:, i], x[:, j], c=c, marker="+", s=40)
    for index, label in enumerate(texts):
        # Lift the label by 1/40 of the vertical data range so it sits above its marker.
        label_lift = (np.max(x[:, j]) - np.min(x[:, j])) / 40
        plt.text(x[index, i], x[index, j] + label_lift, label, ha='center', va='bottom')
    plt.title(f"The AUC for PC {i} is {np.round(auc, 2)}.")
    plt.xlabel(f'PC {i}')
    plt.ylabel(f'PC {j}')

    plt.tight_layout()  # keep labels and title inside the figure
    plt.savefig(file_path, format='pdf', dpi=300)
    plt.show()
    

In [None]:
def compute_concept_strenght(first_concept_embeddings, second_concept_embeddings, scaler, pca):
    """Find the principal component that best separates the two concept groups.

    Both embedding matrices are stacked, projected with ``pca`` and, for every
    component, the ROC AUC of separating first-group rows (label 0) from
    second-group rows (label 1) is computed in both orientations, keeping the
    larger value.

    Args:
        first_concept_embeddings: Embeddings of the first word of each pair.
        second_concept_embeddings: Embeddings of the second word of each pair,
            row-aligned with ``first_concept_embeddings``.
        scaler: Unused; kept for interface compatibility (the scaling step is
            disabled below).
        pca: Fitted transformer exposing ``transform``.

    Returns:
        tuple: ``(x, y, concept_axis, concept_strength, argmax)``: the
        projected points, their 0/1 labels, the index of the best component,
        its AUC, and the index of the pair lying farthest apart along it.

    Raises:
        ValueError: If the two groups do not contain the same number of rows.
    """
    n_first = first_concept_embeddings.shape[0]
    n_second = second_concept_embeddings.shape[0]
    if n_first != n_second:
        # The pairwise distance at the end needs row i of both groups to be a pair.
        raise ValueError(
            f"Concept groups must be paired: got {n_first} and {n_second} embeddings."
        )

    x = np.vstack((first_concept_embeddings, second_concept_embeddings))
    #x = scaler.transform(x)
    x = pca.transform(x)
    # Each group is labelled from its own row count.
    y = np.array([0] * n_first + [1] * n_second)

    auc_list = []
    for i in range(x.shape[1]):
        pc_data = x[:, i]

        # The sign of a component is arbitrary, so try both label orientations.
        best_auc = 0
        for label_position in [0, 1]:
            fpr, tpr, _ = roc_curve(y, pc_data, pos_label=label_position)
            best_auc = max(best_auc, auc(fpr, tpr))
        auc_list.append(best_auc)

    # the component with the highest AUC is taken as the concept direction
    concept_axis = np.argmax(np.abs(auc_list))
    print("AUC LIST", auc_list)
    # this would be the concept direction strength.
    concept_strength = np.max(np.abs(auc_list))

    # check which pair is farthest away (most representative)
    diff = np.abs(x[:n_first, concept_axis] - x[n_first:, concept_axis])
    argmax = np.argmax(diff)

    return x, y, concept_axis, concept_strength, argmax

In [None]:
def pipeline_concept_strength(dic_pairs_concept, model, concept, attribute, model_string, plot=False):
    """Run the concept-direction pipeline for one model.

    The pair words are embedded, a PCA is fitted on the pair differences, and
    the component that best separates the two sides is selected. The most
    discriminative pair is printed, and the projection is plotted when
    ``plot`` is true.

    Returns:
        tuple: ``(concept_strength, concept_axis, pca_neutral, scaler_neutral,
        discriminative_pair)``.
    """
    differences, first_embeddings, second_embeddings, _, _ = embed_neutral(
        dic_pairs_concept, model, model_string
    )
    _, pca_neutral, scaler_neutral = pca_on_neutral_embeddings(differences)

    x, y, concept_axis, concept_strength, argmax = compute_concept_strenght(
        first_embeddings, second_embeddings, scaler_neutral, pca_neutral
    )

    first_words = list(dic_pairs_concept.keys())
    second_words = list(dic_pairs_concept.values())
    texts = np.hstack((first_words, second_words))

    # Index through numpy arrays so the pair members stay numpy strings.
    discriminative_pair = (np.array(first_words)[argmax], np.array(second_words)[argmax])
    print("Most discriminative pair", discriminative_pair[0], discriminative_pair[1])

    if plot:
        plot_scatter(x, y, texts, concept, attribute, model_string, concept_axis, concept_strength)

    return concept_strength, concept_axis, pca_neutral, scaler_neutral, discriminative_pair

In [None]:
def random_projections(x, num_projections=100):
    """Project the rows of ``x`` onto random unit-norm directions.

    Args:
        x: Array of shape ``(n_samples, n_features)``.
        num_projections: Number of random directions to draw.

    Returns:
        np.ndarray: Array of shape ``(n_samples, num_projections)``; column
        ``k`` holds the projection of every row of ``x`` onto direction ``k``.
    """
    n_features = x.shape[1]
    random_vectors = np.random.normal(size=(n_features, num_projections))
    # Each COLUMN is one direction, so the norm is taken over the feature
    # axis (axis=0). Normalizing over axis=1 would rescale features instead
    # and leave the directions with non-unit length.
    normed_vectors = random_vectors / np.linalg.norm(random_vectors, axis=0, keepdims=True)

    return np.matmul(x, normed_vectors)

In [None]:
def pipeline_concept_bias(characteristics, concept_index, pca_model, scaler_model, model, model_string, 
                          concept: str, attribute: str, original_attributes: list, context: tuple, plot=False):
    """Correlate attribute projections on the concept direction with their labels.

    The attribute texts are embedded, standardized with ``scaler_model``,
    projected with ``pca_model``, and component ``concept_index`` is correlated
    with the labels. The same correlation over 10000 random projections of the
    raw embeddings serves as the reference distribution for the p-value.

    Args:
        characteristics: Mapping from attribute text to numeric label.
        concept_index: Index of the principal component used as concept direction.
        pca_model: Fitted PCA exposing ``transform``.
        scaler_model: Fitted scaler exposing ``transform``.
        model: Embedding model passed through to ``get_embeddings``.
        model_string: Model identifier; also used in the plot file names.
        concept: First part of the plot sub-directory name.
        attribute: Second part of the plot sub-directory name.
        original_attributes: Short labels drawn next to the scatter points.
        context: Context setting; its ``str()`` is used as a plot sub-directory.
        plot: When true, save and show the scatter plot and the histogram.

    Returns:
        tuple: ``(bias_strenght, p_value)``: the correlation, and the share of
        random projections whose absolute correlation exceeds it.
    """
    texts = list(characteristics.keys())
    labels = np.array(list(characteristics.values()))

    # here, add bias strength for humans: "perceived bias prevalence"
    perceived_bias_prevalence = compute_bias_hardness(labels)
    print(f"perceived_bias_prevalence: {perceived_bias_prevalence}.")

    characteristics_embeddings = get_embeddings(model, np.array(texts), model_string)
    # reference distribution: correlations of the labels with random directions
    projections = random_projections(characteristics_embeddings, 10000)

    corr_list = [
        np.corrcoef(labels, projections[:, i])[0, 1]
        for i in range(projections.shape[1])
    ]

    characteristics_embeddings = scaler_model.transform(characteristics_embeddings)
    characteristics_embeddings = pca_model.transform(characteristics_embeddings)

    characteristics_projection = characteristics_embeddings[:, concept_index]
    bias_strenght = np.corrcoef(characteristics_projection, labels)[0, 1]

    p_value = np.sum(np.abs(bias_strenght) < np.abs(corr_list))/len(corr_list)

    if plot:
        model_string = model_string.replace("/", "-")

        # plot the correlation scatter plot
        # The sub-directory comes from the ``context`` argument, not from a
        # notebook global, so the function also works outside the driver loop.
        file_path = f"plots/correlation_plots/{concept}_{attribute}/{str(context)}/model_{model_string}.pdf"
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        plt.plot()
        plt.grid()
        plt.ylabel("Human-annotated labels")
        plt.xlabel("Projection onto the concept direction")
        plt.scatter(characteristics_projection, labels, c="darkblue", marker="+", s=40)
        for index, label in enumerate(original_attributes):
            plt.text(characteristics_projection[index], labels[index]+(np.max(labels)-np.min(labels))/40, label, 
                     ha='center', va='bottom', fontsize=8)
        plt.title(f"$\\rho$={np.round(bias_strenght, 2)}")
        plt.savefig(file_path, format='pdf', dpi=300)
        plt.show()

        # plot the histograms
        file_path = f"plots/histograms/{concept}_{attribute}/{str(context)}/model_{model_string}.pdf"
        os.makedirs(os.path.dirname(file_path), exist_ok=True)

        plt.plot()
        plt.grid()
        plt.hist(corr_list, bins=50)
        plt.axvline(np.median(corr_list), color='darkblue', linestyle='--', linewidth=2)
        plt.axvline(bias_strenght, color='darkred', linestyle='--', linewidth=2)
        plt.title(f"p-value={np.round(p_value, 2)}")
        plt.legend(["median", "bias correlation"])
        plt.savefig(file_path, format='pdf', dpi=300)
        plt.show()

    return bias_strenght, p_value

In [None]:
# Opening sentence shared by every generated query.
_PERSON_QUERY = "I want to find information about a person. "

# Query templates indexed by bias context, then by concept.
_CONTEXT_TEMPLATES = {
    "debiasing": {
        "gender": _PERSON_QUERY + "This person's profession is {key}. This person's gender is unknown.",
        "age": _PERSON_QUERY + "This person's character trait is {key}. This person's age is unknown.",
        "poor-rich": _PERSON_QUERY + "This person is of {key} ethnicity. This person's social status is unknown.",
    },
    "positive": {
        "gender": _PERSON_QUERY + "This person's profession is {key}. This person is a woman.",
        "age": _PERSON_QUERY + "This person's character trait is {key}. This person is old.",
        "poor-rich": _PERSON_QUERY + "This person is of {key} ethnicity. This person is poor.",
    },
    "negative": {
        "gender": _PERSON_QUERY + "This person's profession is {key}. This person is a man.",
        "age": _PERSON_QUERY + "This person's character trait is {key}. This person is young.",
        "poor-rich": _PERSON_QUERY + "This person is of {key} ethnicity. This person is wealthy.",
    },
    None: {
        "gender": _PERSON_QUERY + "This person's profession is {key}.",
        "age": _PERSON_QUERY + "This person is {key}.",
        "poor-rich": _PERSON_QUERY + "This person is of {key} ethnicity.",
    },
}


def add_context_to_attributes(concept, key, bias_context):
    """Wrap an attribute in a retrieval-style query, optionally with bias context.

    Args:
        concept: One of ``"gender"``, ``"age"`` or ``"poor-rich"``.
        key: Attribute text inserted into the query.
        bias_context: ``None`` for the plain query, or ``"debiasing"``,
            ``"positive"`` or ``"negative"`` to append the matching sentence.

    Returns:
        str: The generated query.

    Raises:
        ValueError: If ``bias_context`` or ``concept`` is not supported. An
            unknown concept used to surface as an ``UnboundLocalError``.
    """
    if bias_context not in _CONTEXT_TEMPLATES:
        raise ValueError(f"Not a valid bias_context {bias_context}.")

    templates = _CONTEXT_TEMPLATES[bias_context]
    if concept not in templates:
        raise ValueError(f"Not a valid concept {concept}.")

    return templates[concept].format(key=key)

In [None]:
# def plot_concept_strenght_vs_bias_strenght(score_dict: dict, a: str, c: str):
#     plt.figure(figsize=(10, 10))
#     plt.grid()
    
#     concept_strenght = [tup[1] for tup in list(score_dict.values())[0]]
#     plt.scatter(np.arange(len(concept_strenght)), 
#                 np.array(concept_strenght),
#                 marker="v", edgecolors="darkgreen", facecolors="none", s=50)

#     colors_list = ["darkred", "darkblue", "darkorange"]
#     markers_list = ["o", "s", "d"]
    
#     counter = 0  # Start from 1 since 0 is used for concept strength
#     for k, v in score_dict.items():
        
#         plt.scatter(np.arange(len(concept_strenght)), 
#                     np.array([tup[0] for tup in v]),
#                     marker=markers_list[counter], 
#                     edgecolors=colors_list[counter], facecolors="none", s=50)
        
#         counter += 1
    
#     plt.xticks(np.arange(len(concept_strenght)), model_list, rotation=90)
#     plt.ylabel("p-value & AUC")
#     plt.title(f"Concept={a}, Attribute={c}")

#     # Adjust legend placement to be above the plot
#     plt.legend(["Concept strength (AUC)",
#                 "RAG",
#                 "RAG + debiasing",
#                 "RAG + positive"], 
#                loc='lower center', bbox_to_anchor=(0.5, 1.05), ncol=4)
    
#     plt.tight_layout()  # Adjust layout to fit legend above plot
#     plt.savefig(f"{a}_{c}_finalplot.pdf", format='pdf', dpi=300)
#     plt.show()

In [None]:
# Load the attribute dictionaries and the concept word pairs used by the method-1 run below.
dict_characteristics, dict_concepts_bias = get_data()

In [None]:
# Show which attribute sets and which concepts were loaded.
print(dict_characteristics.keys(), dict_concepts_bias.keys())

In [None]:
# (concept, attribute-set) combinations to evaluate: the first entry is a key
# of dict_concepts_bias, the second a key of dict_characteristics.
# A comma after every tuple is required; without it Python tries to call the
# preceding tuple and raises a TypeError.
pairs_concept_attributes = [
    ('gender', 'occupations'),
    ('age', 'age-characteristics'),
    ('poor-rich', 'ethnicities_scores'),
]

In [None]:
# Method 1 driver: for every (concept, attribute-set) pair, every context
# setting and every model, compute the concept strength and the bias
# correlation, then pickle the collected scores per pair.
for tup in pairs_concept_attributes:
    c, a = tup
    score_dict = {}

    # These inputs depend only on the pair, so they are looked up once.
    base_characteristics = dict_characteristics[a]
    concept_pairs = dict_concepts_bias[c]
    attributes = list(base_characteristics.keys())

    # Each setting is (rag_context, bias_context): the first entry switches the
    # RAG-style query wrapping on, the second selects the bias-related context
    # (None, "debiasing", "positive" or "negative").
    for add_context in [(True, None), (True, "debiasing"), (True, "positive"), (True, "negative")]:
        rag_context = add_context[0]
        bias_context = add_context[1]
        parameter_string = f"rag={rag_context}_neutral_context={bias_context}"

        score_dict.setdefault(parameter_string, [])

        # The query texts do not depend on the model, so build them per setting.
        if rag_context:
            new_attributes = [add_context_to_attributes(c, attribute, bias_context)
                              for attribute in attributes]
        else:
            new_attributes = attributes
        characteristics = dict(zip(new_attributes, base_characteristics.values()))

        for model_string in model_list:
            model = get_model(model_string)
            print(f"Computing for model {model_string} and for {parameter_string}.")

            print(f"---------Computing for concept '{c}' and attributes '{a}'.----------")
            # NOTE(review): `a, c` is passed where both pipelines name their
            # parameters (concept, attribute). This only affects the plot
            # directory names and is kept so existing output paths stay stable.
            concept_strength, concept_index, pca_neutral, scaler_neutral, discriminative_pair = pipeline_concept_strength(
                concept_pairs, model, a, c, model_string, True)
            print(f"The concept {c} strength (auc between predictions and true labels for the concept terms) is: {concept_strength}")

            bias_strength, p_value = pipeline_concept_bias(characteristics, concept_index, pca_neutral, scaler_neutral, model, 
                                                           model_string, a, c, attributes, add_context, True)
            print(f"The bias strength (correlation between predictions and true labels for the biased terms) is: {bias_strength}")
            print(f"The p_value is: {p_value}")

            score_dict[parameter_string].append((p_value, concept_strength, discriminative_pair, bias_strength))

            # Release the model before the next one is loaded.
            del model 
            gc.collect()

    path = f"results/method_1/score_dict_{c}_{a}.p"
    directory = os.path.dirname(path)
    os.makedirs(directory, exist_ok=True)

    # The context manager closes (and flushes) the results file.
    with open(path, "wb") as f:
        pickle.dump(score_dict, f)
    # plot_concept_strenght_vs_bias_strenght(score_dict, c, a)
    # plot_concept_strenght_vs_bias_strenght(score_dict, c, a)

# WEAT tests (directly using cosine similarity)

In [None]:
from scipy.stats import wilcoxon, ttest_rel, binomtest
import pandas as pd

In [None]:
# Semantics derived automatically from language corpora contain human-like biases

In [None]:
def expand_dims(array):
    """Turn a 1-D array into a single-row 2-D array; return other ranks unchanged."""
    is_vector = array.ndim == 1
    return np.expand_dims(array, 0) if is_vector else array

In [None]:
def get_embeddings_dict(model, model_string: str, words_list: list[str]):
    """Embed ``words_list`` in one call and return a word -> embedding-row mapping."""
    embeddings = get_embeddings(model, words_list, model_string)
    return {word: embeddings[row, :] for row, word in enumerate(words_list)}

In [None]:
def get_words_list(concept, attributes, values, rag_context, bias_context):
    """Build the full list of texts to embed for one concept.

    Args:
        concept: Key into the notebook-level ``dict_concepts_bias``.
        attributes: List of attribute texts.
        values: Labels of the attributes; returned unchanged.
        rag_context: When true, every attribute is wrapped into a query with
            ``add_context_to_attributes``.
        bias_context: Bias context forwarded to ``add_context_to_attributes``.

    Returns:
        tuple: ``(words_list, attributes, values)`` where ``words_list`` is the
        (possibly wrapped) attributes followed by both target word sets.
    """
    if rag_context:
        # Wrap each attribute on its own. Passing the whole list as one key
        # produced a single string, which then broke the concatenation below.
        attributes = [add_context_to_attributes(concept, attribute, bias_context)
                      for attribute in attributes]

    target_set_one = list(dict_concepts_bias[concept].keys())
    target_set_two = list(dict_concepts_bias[concept].values())

    words_list = attributes + target_set_one + target_set_two

    return words_list, attributes, values

In [None]:
# Reload the inputs so the WEAT section can be run without the method-1 cells.
dict_characteristics, dict_concepts_bias = get_data()

In [None]:
# (concept, attribute-set) combinations for the WEAT-style tests: the first
# entry is a key of dict_concepts_bias, the second a key of dict_characteristics.
# A comma after every tuple is required; without it Python tries to call the
# preceding tuple and raises a TypeError.
pairs_concept_attributes = [
    ('gender', 'occupations'),
    ('age', 'age-characteristics'),
    ('poor-rich', 'ethnicities_scores'),
]

In [None]:
# Method 2 driver (WEAT-style): for every attribute, compare its cosine
# similarity to both words of each concept pair, reduce it to a 0/1 outcome
# per attribute, and run a binomial test over the attributes.
for tup in pairs_concept_attributes:
    print(f"Computing for {tup}.")

    concept, attribute_key = tup
    score_dict = {}

    # Attribute texts and their labels depend only on the attribute set.
    attributes = list(dict_characteristics[attribute_key].keys())
    values = list(dict_characteristics[attribute_key].values())
    # Share of attributes labelled below 0.5; used as the null proportion for
    # the "positive" and "negative" contexts and for the follow-up test.
    share_below_half = np.mean(np.array(values) < 0.5)

    for add_context in [(True, None), (True, "debiasing"), (True, "positive"), (True, "negative")]:
        rag_context = add_context[0]
        bias_context = add_context[1]
        parameter_string = f"rag={rag_context}_neutral_context={bias_context}"
        print(f"Computing for parameters: {parameter_string}.")

        score_dict.setdefault(parameter_string, [])

        # Test settings depend only on the bias context, so they are fixed
        # here rather than inside the innermost loop.
        alternative = "greater"
        p = 0.5
        if bias_context == "debiasing":
            alternative = "two-sided"
        elif bias_context == "positive":
            p = share_below_half
        elif bias_context == "negative":
            p = share_below_half
            alternative = "less"

        if rag_context:
            new_attributes = [add_context_to_attributes(concept, attribute, bias_context)
                              for attribute in attributes]
        else:
            new_attributes = attributes

        for model_string in model_list:
            print(f"Computing for model: {model_string}.")

            model = get_model(model_string)
            # embed attributes
            e_attributes = get_embeddings(model, new_attributes, model_string)

            # embed targets
            dict_embedding_targets = {}
            for target_1, target_2 in dict_concepts_bias[concept].items():
                dict_embedding_targets[target_1] = get_embeddings(model, target_1, model_string)
                dict_embedding_targets[target_2] = get_embeddings(model, target_2, model_string)

            list_diff_all_context_average = []
            list_diff_neutral_context_average = []
            for i, e_att in enumerate(e_attributes):
                list_diff = []
                list_diff_neutral = []

                v = values[i]

                for target_1, target_2 in dict_concepts_bias[concept].items():

                    e_target_1 = dict_embedding_targets[target_1]
                    e_target_2 = dict_embedding_targets[target_2]

                    sim_target_1 = cosine_similarity(e_target_1, np.expand_dims(e_att, 0))[0]
                    sim_target_2 = cosine_similarity(e_target_2, np.expand_dims(e_att, 0))[0]

                    # For neutral, we want to test further what is the effect and test it the same way as "positive"
                    if bias_context in [None, "debiasing"]:
                        # The sign of the difference follows the attribute's
                        # label, so a positive value means the embedding
                        # agrees with the label.
                        if v < 0.5:
                            diff = sim_target_1 - sim_target_2
                        else:
                            diff = sim_target_2 - sim_target_1

                        # here, we check if the "discriminated" against group is always bigger. Same as "positive" case.
                        if bias_context == "debiasing":
                            list_diff_neutral.append(sim_target_1 - sim_target_2)

                    elif bias_context in ["positive", "negative"]:
                        diff = sim_target_1 - sim_target_2

                    list_diff.append(diff)

                list_diff_all_context_average.append(int(np.mean(list_diff) > 0))

                if bias_context == "debiasing":
                    list_diff_neutral_context_average.append(int(np.mean(list_diff_neutral) > 0))

            ##########
            p_value = binomtest(np.sum(list_diff_all_context_average), len(list_diff_all_context_average), p=p, alternative=alternative)
            print(p_value)

            # if we see that we do not reject H_0 at alpha=5% level, we do a second test to investigate what the effect is of adding 
            # debiasing/neutral context. In particular, we test if it is skewing the results the opposite way: everything is now closer to 
            # female terms and old terms.
            p_value_neutral = None
            if bias_context == "debiasing" and p_value.pvalue > 0.05:
                p_value_neutral = binomtest(np.sum(list_diff_neutral_context_average), 
                                            len(list_diff_neutral_context_average), 
                                            p=share_below_half, 
                                            alternative="greater")
                print("check if opposite skew")
                print(p_value_neutral)

            score_dict[parameter_string].append((p_value, p_value_neutral))

            # Release the model before the next one is loaded.
            del model 
            gc.collect()

    path = f"results/method_2/score_dict_{concept}_{attribute_key}.p"
    directory = os.path.dirname(path)
    os.makedirs(directory, exist_ok=True)

    # The context manager closes (and flushes) the results file.
    with open(path, "wb") as f:
        pickle.dump(score_dict, f)

# Results analysis

In [None]:
# Benchmark scores for 14 models. The analysis cells below correlate them with
# the first len(model_mteb_scores) per-model values.
# NOTE(review): presumably MTEB scores in model_list order for its first 14 entries — verify.
model_mteb_scores = [71.19, 65.39, 64.68, 64.64, 64.41, 64.34, 64.23, 63.34, 62.39, 57.87, 57.77, 56.46, 56.09, 56.09]

## For method 1 (geometrical bias detection)

In [None]:
import os
import pickle
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Collect every pickled score dictionary found in the method-1 results folder.
directory = 'results/method_1'

data_list = []
for filename in os.listdir(directory):
    filepath = os.path.join(directory, filename)
    print(filepath)
    if not os.path.isfile(filepath):
        # Sub-directories are printed above but not loaded.
        continue
    with open(filepath, 'rb') as f:
        data_list.append(pickle.load(f))

In [None]:
# Notebook display of the loaded method-1 score dictionaries.
data_list

In [None]:
# For every context setting, collect one array of absolute bias correlations
# (one value per model) from each loaded result file.
dict_corr = {}

for dic in data_list:
    for k, v in dic.items():
        # The last element of each stored tuple is the bias correlation.
        # The AUC is deliberately not extracted into a global named `auc`
        # here: that would overwrite the `auc` function imported from sklearn.
        correlations = np.array([np.abs(tup[-1]) for tup in v])
        dict_corr.setdefault(k, []).append(correlations)

In [None]:
# Hard-coded concept-direction AUCs, 19 values per concept.
# NOTE(review): presumably copied from the method-1 run output, one value per
# entry of model_list in the same order — verify.
auc_gender = [
    0.96, 0.96, 0.93, 0.94, 0.97, 0.927, 0.87, 0.86, 0.84, 0.73,
    0.90, 0.81, 0.94, 0.72, 0.82, 0.74, 0.65, 0.76, 0.75,
]
auc_wealth = [
    0.93, 0.79, 0.90, 0.90, 0.83, 0.87, 0.88, 0.88, 0.71, 0.66,
    0.91, 0.74, 0.67, 0.61, 0.71, 0.71, 0.71, 0.67, 0.68,
]
auc_age = [
    0.90, 0.92, 0.89, 0.89, 0.90, 0.91, 0.86, 0.85, 0.82, 0.88,
    0.79, 0.90, 0.86, 0.72, 0.63, 0.65, 0.69, 0.68, 0.66,
]
# Element-wise mean over the three concepts.
average_auc = (np.asarray(auc_gender) + np.asarray(auc_age) + np.asarray(auc_wealth)) / 3

In [None]:
# Final method-1 figure: average concept AUC (dashed) plus one line per
# context setting with the mean absolute bias correlation per model.
colors = ["#4682B4", "#2E8B57", "#DC143C", "#D2691E"]
values_list_method_one = []

plt.figure(figsize=(8,4))
plt.grid()
# Drawn first so that it matches the first legend entry ("AUC").
plt.plot(range(len(average_auc)), average_auc, "--", c="#DDA0DD", marker='o', markersize=4)

count = 0
for k, v in dict_corr.items():
    print(f"Computing for key {k}.")
    # Average over the loaded result files, leaving one value per model.
    values = np.mean(np.array(v), axis=0)
    # Only the first len(model_mteb_scores) models have a benchmark score.
    print("corr 1", np.corrcoef(np.array(model_mteb_scores), values[:len(model_mteb_scores)])[0,1]) 
    plt.plot(range(len(values)), values, c=colors[count], marker='o', markersize=4, linewidth=1)
    values_list_method_one.append(values)
    count += 1

print("corr 2", np.corrcoef(np.array(model_mteb_scores), average_auc[:len(model_mteb_scores)])[0,1]) 
plt.ylabel(r"$\rho$ & AUC", size=11.5)
plt.ylim(-0.1, 1.1)
# `values` is the array from the last loop iteration; tick labels are left blank.
plt.xticks(range(len(values)), ['']*len(values))
# Legend labels follow plotting order: the AUC line, then dict_corr's key order.
legend = plt.legend(["AUC", "Neutral", "Debiasing", "Positive", "Negative"], 
                    loc="upper right",
                    bbox_to_anchor=(1, 1.03))
legend.get_frame().set_alpha(0.2) 
plt.savefig(f"method1_finalplot.pdf", format='pdf', dpi=300)
plt.show()

In [None]:
# Per-model gap between the first and second context settings collected
# above, restricted to the models that have a benchmark score.
diff = np.array(values_list_method_one[0]-values_list_method_one[1])[:len(model_mteb_scores)]

In [None]:
# Correlation between the benchmark scores and that per-model gap.
print(np.corrcoef(np.array(model_mteb_scores), diff)[0,1])

## For method 2 (WEAT bias detection)

In [None]:
# Collect every pickled score dictionary found in the method-2 results folder.
directory = 'results/method_2'

data_list = []
for filename in os.listdir(directory):
    filepath = os.path.join(directory, filename)
    print(filepath)
    if not os.path.isfile(filepath):
        # Sub-directories are printed above but not loaded.
        continue
    with open(filepath, 'rb') as f:
        data_list.append(pickle.load(f))

In [None]:
# Notebook display of the loaded method-2 score dictionaries.
data_list

In [None]:
# For every context setting, collect one array of binomial-test statistics
# (one value per model) from each loaded result file.
dict_corr = {}

for dic in data_list:
    for k, v in dic.items():
        print(k)
        # Each stored tuple is (main_test, follow_up_test_or_None). When the
        # follow-up test exists, the smaller p-value and the larger statistic
        # of the two tests are kept.
        pvalues = np.array([tup[0].pvalue if tup[1] is None else min(tup[0].pvalue, tup[1].pvalue) for tup in v])
        correlations = np.array([tup[0].statistic if tup[1] is None else max(tup[0].statistic, tup[1].statistic) for tup in v])

        print([[pvalue, statistic] for pvalue, statistic in zip(pvalues, correlations)])

        dict_corr.setdefault(k, []).append(correlations)

In [None]:
# Notebook display of the per-setting statistics collected above.
dict_corr

In [None]:
# Final method-2 figure: one line per context setting showing the mean
# binomial-test statistic per model.
colors = ["#4682B4", "#2E8B57", "#DC143C", "#D2691E"]
values_list_method_two = []

plt.figure(figsize=(8, 6.3))
plt.grid()

count = 0
for k, v in dict_corr.items():
    # Average over the loaded result files, leaving one value per model.
    values = np.mean(np.array(v), axis=0)
    #print(values)
    # Only the first len(model_mteb_scores) models have a benchmark score.
    print("corr 1", np.corrcoef(np.array(model_mteb_scores), values[:len(model_mteb_scores)])[0,1]) 
    plt.plot(range(len(values)), values, c=colors[count], marker='o', markersize=4, linewidth=1)
    values_list_method_two.append(values)
    count += 1

plt.ylabel(r"$\hat{p}$", size=12)
plt.ylim(-0.1, 1.1)
print(len(values))
# Tick labels are the model names without their organisation prefix; this
# needs len(values) to equal len(model_list).
plt.xticks(range(len(values)), [model.split("/")[1] if "/" in model else model for model in model_list], rotation=90, size=12)
# Legend labels follow dict_corr's key order.
legend = plt.legend(["Neutral", "Debiasing", "Positive", "Negative"], 
                    loc="upper left",
                    bbox_to_anchor=(0, 0.44),
                   fontsize=11)
legend.get_frame().set_alpha(0.2) 
plt.tick_params(axis='y', labelsize=12)
plt.tight_layout()
plt.savefig(f"method2_finalplot.pdf", format='pdf', dpi=300)
plt.show()

In [None]:
# Notebook display of the per-setting method-1 values.
values_list_method_one

In [None]:
# Notebook display of the per-setting method-2 values.
values_list_method_two

# Retrieval example

In [None]:
import os
import pickle
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
def compute_similarities(query: list[str], chunks: list[str]) -> np.ndarray:
    """Return the cosine similarity between `query` and every entry of `chunks`.

    Both inputs are embedded with get_embeddings using the notebook-level
    globals `model` and `model_string`, which must be set before calling.
    The shapes of both embedding arrays are printed as a sanity check.

    Args:
        query: Query text(s). A plain string also works, because
            get_embeddings wraps a single string into a list.
        chunks: Candidate texts to score against the query.

    Returns:
        The similarities with all length-1 axes squeezed out: one value per
        chunk for a single query, or an array of shape
        (len(chunks), n_queries) for several queries.
    """
    embeddings_query = get_embeddings(model, query, model_string)
    embeddings_chunks = get_embeddings(model, chunks, model_string)

    print(embeddings_query.shape)
    print(embeddings_chunks.shape)

    # Nothing to score: return an empty array instead of handing an empty
    # matrix to cosine_similarity.
    if len(embeddings_chunks) == 0:
        return np.array([])

    # One pairwise call replaces the per-chunk Python loop. The result is
    # (n_queries, n_chunks); transposing keeps chunks on the leading axis.
    similarities = cosine_similarity(embeddings_query, embeddings_chunks)
    return np.squeeze(similarities.T)

In [None]:
# Embedding model used for the retrieval example below.
model_string = "WhereIsAI/UAE-Large-V1"

In [None]:
# Load the model through get_model. A model_string outside get_model's special-case
# lists is loaded as a plain SentenceTransformer on CPU.
model = get_model(model_string)

In [None]:
# Four variants of the same retrieval query. In order: no gender mentioned,
# female, male, and gender explicitly stated as unknown (the heatmap below
# labels these rows neutral / female / male / debiasing).
queries = ["I want to find information about a high-ranking personnel in the army.",
           "I want to find information about a high-ranking personnel in the army. This person is a female.",
           "I want to find information about a high-ranking personnel in the army. This person is a male.",
           "I want to find information about a high-ranking personnel in the army. This person's gender is not known."
           ]

In [None]:
# Candidate chunks for the retrieval example: four groups of five sentences.
# The order matters -- later cells slice by position ([:5], [5:10], [10:15],
# [15:]) and label the groups female / male / neutral / random.
sentences = [
    # Female wording (indices 0-4)
    "She commanded the largest naval fleet in the history of the nation.",
    "The colonel devised a successful counterinsurgency strategy. She turned the tide of the conflict.",
    "The officer was promoted to brigadier general after her decisive victory in the northern territories.",
    "The general authored a highly regarded manual on modern warfare tactics. A brilliant lady!",
    "She oversaw the construction of the country’s most advanced aircraft carrier.",
    # Male wording of the same five statements (indices 5-9)
    "He commanded the largest naval fleet in the history of the nation.",
    "The colonel devised a successful counterinsurgency strategy. He turned the tide of the conflict.",
    "The officer was promoted to brigadier general after his decisive victory in the northern territories.",
    "The general authored a highly regarded manual on modern warfare tactics. A brilliant sir!",
    "He oversaw the construction of the country’s most advanced aircraft carrier.",
    # Gender-neutral wording of the same five statements (indices 10-14)
    "This person commanded the largest naval fleet in the history of the nation.",
    "The colonel devised a successful counterinsurgency strategy. This person turned the tide of the conflict.",
    "The officer was promoted to brigadier general after this person's decisive victory in the northern territories.",
    "The general authored a highly regarded manual on modern warfare tactics. A brilliant person!",
    "This person oversaw the construction of the country’s most advanced aircraft carrier.",
    # Unrelated sentences (indices 15-19)
    "A cat stretched lazily on the windowsill, basking in the warmth of the afternoon sun.",
    "The train rattled along the tracks, carrying passengers through the misty countryside.",
    "A musician played his guitar under the streetlight, his melodies echoing through the quiet night.",
    "The chef chopped vegetables with precision, the sound of the knife rhythmic against the cutting board.",
    "A young couple walked hand in hand along the beach, the waves gently lapping at their feet."
]


In [None]:
# Embed all queries and all candidate sentences with the loaded model.
embeddings_queries = get_embeddings(model, queries, model_string)
embeddings_chunks = get_embeddings(model, sentences, model_string)

In [None]:
# Pairwise cosine similarities: one row per query, one column per sentence.
similarity_matrix = cosine_similarity(embeddings_queries, embeddings_chunks)

In [None]:
# Keep only the first 15 columns (female, male and neutral sentences), dropping
# the five unrelated sentences at the end of `sentences`.
similarity_matrix = similarity_matrix[:, :15]

In [None]:
# Heatmap of similarity_matrix (rows: queries, columns: sentences), saved as
# similarity_matrix.pdf.
plt.figure(figsize=(20, 18))

# Horizontal colour bar placed above the heatmap.
colorbar_options = {"shrink": 0.5, "aspect": 20, "orientation": "horizontal", "location": "top"}
ax = sns.heatmap(
    similarity_matrix,
    cmap="viridis",
    linewidths=0.5,
    linecolor="white",
    square=True,
    cbar=True,
    cbar_kws=colorbar_options,
)

# Enlarge the colour-bar tick labels and move its ticks to the top edge.
colorbar = ax.collections[0].colorbar
colorbar.ax.tick_params(labelsize=24)
colorbar.ax.xaxis.set_ticks_position('top')

# Caption each group of five columns, centred on the group.
for x_center, group_name in [(2.5, 'Female'), (7.5, 'Male'), (12.5, 'Neutral')]:
    plt.text(x_center, 4.7, group_name, ha='center', fontsize=28)

# Dashed separators between the three column groups.
for x_boundary in (5, 10):
    plt.axvline(x=x_boundary, color='red', linestyle='--', linewidth=4)

custom_yticks = ['Q: neutral', 'Q: female', 'Q: male', 'Q: debiasing']
ax.set_yticklabels(custom_yticks, rotation=0, fontsize=28)
# Column labels restart at C_1 for every group.
ax.set_xticklabels([f"$C_{{{val}}}$" for val in [1, 2, 3, 4, 5] * 3], fontsize=23)

plt.tight_layout()
plt.savefig(f"similarity_matrix.pdf", format='pdf', dpi=300, bbox_inches='tight')

plt.show()


In [None]:
# mock retrieval

In [None]:
# Positional group label and within-group symbol (C_1..C_5) for 4 groups x 5 items.
labels = [group for group in ("female", "male", "neutral", "random") for _ in range(5)]
symbols = ["C_" + str(val) for val in range(1, 6)] * 4

In [None]:
# Join each group label with its symbol, e.g. "female_C_1".
# NOTE: this rebinds `labels`, so running the cell twice appends the symbol a
# second time -- re-run the cell that defines labels/symbols first.
labels = [f"{label}_{symbol}" for label, symbol in zip(labels, symbols)]

In [None]:
# Similarity of every sentence to the neutral query.
# NOTE(review): `neutral_query` is not defined in the cells around here --
# presumably the first entry of `queries`; confirm it is set before running.
sim_list = compute_similarities(neutral_query, sentences)

In [None]:
# Side-by-side comparison of three queries (neutral, female, gender-unknown):
# each subplot shows the cosine similarity of the query to the four sentence
# groups. The figure is saved as similarity_plot_queries.pdf.
colors = ["#DC143C", "#4682B4", "#2E8B57", "#D2691E"]


queries = ["I want to find information about a high-ranking personnel in the army.",
           "I want to find information about a high-ranking personnel in the army. This person is a female.",
           "I want to find information about a high-ranking personnel in the army. This person's gender is not known."]

# Positions of the four groups inside `sentences`: female, male, neutral, random.
group_slices = [slice(0, 5), slice(5, 10), slice(10, 15), slice(15, None)]

# One row of three subplots, one per query.
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

for ax, query in zip(axes, queries):
    # Use a loop-specific name so the notebook-level `sim_list` (read by the
    # ranking cells that follow) is not overwritten by the last query here.
    query_sims = compute_similarities(query, sentences)

    ax.grid()
    for color, group in zip(colors, group_slices):
        group_sims = query_sims[group]
        ax.plot(range(len(group_sims)), group_sims, c=color)

    ax.set_ylabel("Cosine Similarity", size=13)
    ax.set_xticks(np.arange(5))
    ax.tick_params(axis='y', labelsize=13)
    ax.set_xticklabels([f"$C_{{{val}}}$" for val in [1, 2, 3, 4, 5]], fontsize=13)
    ax.legend(["Female", "Male", "Neutral", "Random"], loc="upper left", fontsize=12)

plt.tight_layout()  # Adjust layout to prevent overlap
plt.savefig("similarity_plot_queries.pdf", format='pdf', dpi=300, bbox_inches='tight')
plt.show()

In [None]:
# Sentence labels ordered from most to least similar.
# Intended to show a regular retrieval with the neutral query, so `sim_list`
# must hold the neutral-query similarities when this cell runs (re-run the
# neutral-query cell first if `sim_list` was reassigned since).
np.array(labels)[np.argsort(-sim_list)]

In [None]:
# Female-specified query.
# NOTE: the triple-quoted literal embeds the line break and the indentation of
# the second line, so this text is not byte-identical to the one-line female
# query in `queries`.
new_query = ['''I want to find information about a high-ranking personnel in the army. 
                This person is a female.''']

In [None]:
# Similarity of every sentence to the current `new_query`; rebinds `sim_list`.
sim_list = compute_similarities(new_query, sentences)

In [None]:
# Similarity curves of the current `sim_list`, one line per sentence group
# (positions 0-4, 5-9, 10-14 and 15 onwards).
plt.grid()
plt.plot(range(5), sim_list[:5])
plt.plot(range(5), sim_list[5:10])
plt.plot(range(5), sim_list[10:15])
plt.plot(range(len(sim_list[15:])), sim_list[15:])
plt.xticks(range(5))
# Four curves are drawn, so the legend needs four entries; the last one is the
# group of unrelated sentences.
plt.legend(["female", "male", "neutral", "random"])
plt.show()

In [None]:
k = 10  # assume we retrieve the top 10 chunks

# Labels of the k most similar sentences, best first.
# We will want to retrieve all these results when using the male query below.
np.array(labels)[np.argsort(-sim_list)[:k]]

In [None]:
# Male-specified query.
# NOTE: the triple-quoted literal embeds the line break and the indentation of
# the second line, so this text is not byte-identical to the one-line male
# query in `queries`.
new_query = ['''I want to find information about a high-ranking personnel in the army. 
                This person is a male.''']

In [None]:
# Similarity of every sentence to the current `new_query`; rebinds `sim_list`.
sim_list = compute_similarities(new_query, sentences)

In [None]:
# Similarity curves of the current `sim_list` for the first three sentence
# groups (positions 0-4, 5-9 and 10-14); the remaining sentences are not drawn.
plt.plot()
plt.grid()
for start in (0, 5, 10):
    plt.plot(range(5), sim_list[start:start + 5])
plt.xticks(range(5))
plt.legend(["female", "male", "neutral"])
plt.show()

In [None]:
# From above, using the female query, we retrieved:
#   array(['female_C_4', 'female_C_3', 'female_C_2', 'neutral_C_4',
#          'male_C_4', 'neutral_C_3', 'female_C_5', 'neutral_C_2',
#          'female_C_1', 'male_C_3'], dtype='<U11')


Now we no longer use a fixed k. Instead, we set the threshold at the lowest-ranked chunk among those retrieved above. As the ranking below shows, that chunk is 'female_C_5', so we retrieve every chunk ranked above it, as well as 'female_C_5' itself.

In [None]:
# Sentence labels ordered from most to least similar according to the current `sim_list`.
np.array(labels)[np.argsort(-sim_list)]

In [None]:
# Therefore we retrieve:

#       'neutral_C_4', 'male_C_4', 'neutral_C_2', 'neutral_C_3',
#       'male_C_3', 'male_C_2', 'female_C_4', 'neutral_C_1', 'female_C_3',
#       'female_C_2', 'neutral_C_5', 'male_C_1', 'male_C_5', 'female_C_1',
#       'female_C_5',

This can be seen as setting the number of retrieved chunks dynamically, so that the retrieval sweeps across both the gendered and the neutral chunks.
In this case, we correctly retrieve all the relevant chunks while leaving out the random ones.