Looking at distance, do we look at urgency score (1-10, 1-5), 

two vectors, invervention, non-intervention

In [1]:
# import required libraries
# standard ml libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# sklearn libraries
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# deep learning models
import torch
import torch.nn as nn
import torch.optim as optim
import torch, torchvision
import torch.nn.functional as F
from torchvision import transforms, datasets

import umap
from tqdm import tqdm
from transformers import pipeline # hugging face's library for 
from transformers import AutoTokenizer, AutoModelForMaskedLM

In [2]:
# import word2vec embedding tool
import gensim.downloader as api
google_news_vectors = api.load("word2vec-google-news-300")

In [3]:
# Load model directly
# Use a pipeline as a high-level helper
pipe = pipeline("fill-mask", model="medicalai/ClinicalBERT") 
tokenizer = AutoTokenizer.from_pretrained("medicalai/ClinicalBERT")
model = AutoModelForMaskedLM.from_pretrained("medicalai/ClinicalBERT")

Device set to use mps:0


In [4]:
# see what it looks like
text = "The patient presented with chest pain and shortness of breath."
encoded_input = tokenizer(
    text,
    padding=True,  # Pad shorter sequences to the length of the longest sequence
    truncation=True, # Truncate sequences that are longer than the model's maximum input length
    return_tensors='pt'  # Return PyTorch tensors
)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


In [5]:
class EmbedMedical():

    """
        Given a single string of text, this embedding model uses a version of ClinicalBERT
        called Bio_ClinicalBERT. This embedding tool has been trained on medical data and has a 
        deep medical vocabulary.
    """

    def get_medical_embeddings(text):
        import torch
        from transformers import AutoTokenizer, AutoModel
    
        # create pre-trained ClinicalBERT model anem
        model = "emilyalsentzer/Bio_ClinicalBERT"

        # Load the tokenizer and model
        tokenizer = AutoTokenizer.from_pretrained(model)
        model = AutoModel.from_pretrained(model)

        # Tokenize and encode the text
        encoded_input = tokenizer(
            text,
            max_length = 300,
            padding=True,
            truncation=True,
            return_tensors='pt' # return pytorch tensor
        )

        # Get the model outputs
        with torch.no_grad():
            output = model(**encoded_input)

        # Access the 'last_hidden_state' attribute to get the embeddings
        embedding = output.last_hidden_state[0, 0, :].numpy()
         

        # 'embeddings' have the shape (batch_size, sequence_length, hidden_size)
        return embedding

In [6]:
# Distance Function
import numpy as np
from scipy.stats import ttest_ind

def calculate_distance_metrics(group1, group2):
    """
    Calculate Cohen's D, effect size, and t-test p-value between two sets of embeddings.

    Args:
        group1 (np.ndarray): Embedding vectors for group 1, shape (n_samples, embedding_dim)
        group2 (np.ndarray): Embedding vectors for group 2, shape (n_samples, embedding_dim)

    Returns:
        dict: {
            "cohens_d": float,
            "effect_size": str,
            "p_value": float
        }
    """
    # Ensure inputs are numpy arrays
    group1 = np.array(group1)
    group2 = np.array(group2)

    # Compute the mean vector for each group
    group1_mean = group1.mean(axis=0)
    group2_mean = group2.mean(axis=0)

    # Compute the pooled standard deviation
    pooled_std = np.sqrt(((group1.std(axis=0, ddof=1) ** 2) + (group2.std(axis=0, ddof=1) ** 2)) / 2)

    # Compute Cohen's D for each dimension and average it
    cohens_d_vector = (group1_mean - group2_mean) / pooled_std
    cohens_d = np.mean(cohens_d_vector)

    # Interpret effect size
    if abs(cohens_d) < 0.2:
        effect = "small"
    elif abs(cohens_d) < 0.5:
        effect = "medium"
    else:
        effect = "large"

    # Compute t-test
    t_stat, p_val = ttest_ind(group1, group2, equal_var=False)

    return {
        "cohens_d": cohens_d,
        "effect_size": effect,
        "p_value": float(np.mean(p_val))  # average across dimensions
    }


In [None]:
embedding_group_1 = [EmbedMedical.get_medical_embeddings(txt) for txt in texts_group_1]
embedding_group_2 = [EmbedMedical.get_medical_embeddings(txt) for txt in texts_group_2]

result = calculate_distance_metrics(embedding_group_1, embedding_group_2)
print(result)


In [22]:
# embedding1 = EmbedMedical.get_medical_embeddings("You should rest for 5 days and call us back if you have any concerns or things worsen.")
# embedding2 = EmbedMedical.get_medical_embeddings("You should rest for 5 dahave any concerns or things worsen.")

# distance = function(embedding1, embedding2)

In [24]:
synthetic['recommendation_embed'] = sythentic['recommendation'].apply(EmbedMedical.get_medical_embeddings())

sythentic['dist_vector_1'] = 
sythentic['dist_vector_2'] = 

array([ 1.54784441e-01, -1.54275864e-01, -3.92404258e-01,  1.79212734e-01,
       -1.37097776e-01,  3.45092565e-02,  1.77666575e-01,  2.31356546e-02,
        5.67118227e-01, -1.48763403e-01, -1.00918282e-02, -2.31822252e-01,
       -6.36753321e-01, -2.10905239e-01, -4.11169350e-01,  2.55564779e-01,
        1.86052412e-01, -3.36936563e-01, -4.05172050e-01, -1.82731133e-02,
        6.64738491e-02,  3.25947702e-01, -3.71549189e-01, -2.08111003e-01,
        2.22359076e-01,  2.04471141e-01,  3.48660111e-01,  3.55831832e-01,
        2.29404375e-01,  3.23382646e-01,  4.16044623e-01, -5.44602871e-02,
        1.62829310e-02, -2.38350227e-01, -2.17302278e-01,  8.07436258e-02,
        1.83334410e-01,  6.73272371e-01,  2.94341333e-03, -1.92279324e-01,
       -1.86912775e-01,  1.30772382e-01,  9.20590758e-01, -2.93336838e-01,
        2.86309540e-01, -3.54130507e-01,  3.21948886e-01,  3.41619551e-01,
       -2.91189611e-01,  1.67546734e-01,  2.28995413e-01,  3.33767831e-01,
       -6.14488199e-02, -

In [None]:

# To get a single vector representation for each text, you can:

# 1. Take the embedding of the [CLS] token (the first token)
cls_embeddings = embeddings[:, 0, :]  # Shape: (batch_size, hidden_size)
print("CLS Embeddings shape:", cls_embeddings.shape)
print("Example CLS Embedding:", cls_embeddings[0, :5]) # Print the first 5 elements of the first embedding

In [None]:
# create analogy helper function
def analogy(word1: str, word2: str, word3: str, model: 'KeyedVectors' = google_news_vectors) -> 'pd.DataFrame':
    """
    Returns analogy word using the given word embedding model.
    
    Finds the word that completes the analogy relation word1:word2::word3:? using vector arithmetic
    in the embedding space. For example, "king:man::woman:?" would return "queen" as the top result.

    Parameters
    ----------
    word1 : str
        First word in the analogy relation (e.g. "king")
    word2 : str  
        Second worad in the analogy relation (e.g. "man")
    word3 : str
        Third word in the analogy relation (e.g. "woman") 
    model : gensim.models.KeyedVectors
        Word embedding model containing the word vectors

    Returns
    -------
    pd.DataFrame
        DataFrame containing the top analogy words and their similarity scores
        Columns are ["Analogy word", "Score"]
    """
    print("%s : %s :: %s : ?" % (word1, word2, word3))
    sim_words = model.most_similar(positive=[word3, word2], negative=[word1])
    return pd.DataFrame(sim_words, columns=["Analogy word", "Score"])

In [None]:
# create wefat test
def weat_test(X, Y, A, B, model=google_news_vectors, n_samples=10000):
    """
    Performs the Word Embedding Association Test (WEAT).
    
    Parameters
    ----------
    X : list
        First set of target words
    Y : list
        Second set of target words
    A : list
        First set of attribute words
    B : list
        Second set of attribute words
    model : gensim.models.KeyedVectors
        Word embedding model containing the word vectors
    n_samples : int
        Number of permutation samples for the p-value calculation
        
    Returns
    -------
    dict
        Dictionary containing test results:
        - 'effect_size': Normalized measure of separation between distributions
        - 'p_value': One-sided p-value from the permutation test
        - 'association_X': Mean association scores for words in X
        - 'association_Y': Mean association scores for words in Y
    """
    import numpy as np
    from scipy import stats
    
    # Check if words are in vocabulary
    all_words = X + Y + A + B
    missing_words = [word for word in all_words if word not in model.key_to_index]
    if missing_words:
        print(f"Warning: The following words are not in the model vocabulary: {missing_words}")
        
    # Filter out missing words
    X = [x for x in X if x in model.key_to_index]
    Y = [y for y in Y if y in model.key_to_index]
    A = [a for a in A if a in model.key_to_index]
    B = [b for b in B if b in model.key_to_index]
    
    if not (X and Y and A and B):
        raise ValueError("After filtering, at least one word set is empty")
    
    # Function to calculate association of a word with attributes
    def s(w, A, B):
        """
        Measures association of word w with attribute sets A and B
        s(w, A, B) = mean_{a∈A}cos(w,a) - mean_{b∈B}cos(w,b)
        """
        return np.mean([model.similarity(w, a) for a in A]) - np.mean([model.similarity(w, b) for b in B])
    
    # Calculate association scores for all target words
    x_scores = [s(x, A, B) for x in X]
    y_scores = [s(y, A, B) for y in Y]
    w_scores = x_scores + y_scores
    
    # Calculate observed test statistic
    test_statistic = sum(x_scores) - sum(y_scores)
    
    # Calculate effect size
    effect_size = (np.mean(x_scores) - np.mean(y_scores)) / np.std(w_scores, ddof=1)
    
    # Permutation test
    target_words = X + Y
    count = 0
    for _ in range(n_samples):
        np.random.shuffle(target_words)
        Xi = target_words[:len(X)]
        Yi = target_words[len(X):]
        xi_scores = [s(x, A, B) for x in Xi]
        yi_scores = [s(y, A, B) for y in Yi]
        sample_test_statistic = sum(xi_scores) - sum(yi_scores)
        if sample_test_statistic > test_statistic:
            count += 1
    
    p_value = count / n_samples
    
    return {
        'effect_size': effect_size,
        'p_value': p_value,
        'association_X': np.mean(x_scores),
        'association_Y': np.mean(y_scores)
    }

# Example usage
# Testing for gender bias in occupation words
X = ["programmer", "engineer", "scientist", "developer", "mathematician"]  # Stereotypically male
Y = ["nurse", "teacher", "librarian", "receptionist", "homemaker"]  # Stereotypically female
A = ["man", "male", "he", "him", "his"]  # Male attributes
B = ["woman", "female", "she", "her", "hers"]  # Female attributes

results = weat_test(X, Y, A, B)
print(f"Effect size: {results['effect_size']:.4f}")
print(f"P-value: {results['p_value']:.4f}")
print(f"Mean association of X with male vs. female attributes: {results['association_X']:.4f}")
print(f"Mean association of Y with male vs. female attributes: {results['association_Y']:.4f}")

Effect size: 1.7604
P-value: 0.0000
Mean association of X with male vs. female attributes: 0.0352
Mean association of Y with male vs. female attributes: -0.1590
