In [None]:
import math
import os
import pickle
import re
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import transformers
from accelerate import Accelerator
from datasets import load_dataset
from torch.optim import AdamW
from torch.utils.data import DataLoader
from tqdm.auto import tqdm
from transformers import AutoModelForMaskedLM
from transformers import AutoTokenizer
from transformers import DataCollatorForLanguageModeling
from transformers import Trainer
from transformers import TrainingArguments
from transformers import default_data_collator
from transformers import get_scheduler
from transformers import pipeline

import comprehension_model
import preprocessing

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
# Checkpoint identifier handed to the project-local `preprocessing` helpers.
model_checkpoint = "KBLab/bert-base-swedish-cased"
# Tokenizer built by the project helper for that checkpoint.
tokenizer =preprocessing.create_tokenizer(model_checkpoint)

In [None]:
# Masked-LM model created from `model_checkpoint`; later cells plot it as the
# "Baseline Model". It is moved to `device` right away.
model_kb = preprocessing.create_model_MLM(model_checkpoint)
model_kb=model_kb.to(device)

In [None]:
# NOTE(review): identical to the earlier tokenizer cell (same helper, same
# checkpoint); this re-creation looks redundant.
tokenizer =preprocessing.create_tokenizer(model_checkpoint)

In [None]:
# Load the pre-built language-modelling dataset from disk.
# SECURITY: pickle.load can execute arbitrary code; only load files you produced yourself.
with open("lm_dataset.pkl","rb") as f:
    lm_datasets= pickle.load(f)

In [None]:
# Load the pre-built validation dataset from disk.
# SECURITY: pickle.load can execute arbitrary code; only load files you produced yourself.
with open("valid_dataset.pkl","rb") as f:
    valid_dataset= pickle.load(f)

# Display the loaded object as the cell output.
valid_dataset

In [None]:
# NOTE(review): this cell rebinds `valid_dataset` twice, so it is not idempotent;
# re-running it applies remove_columns to the already-processed dataset.
valid_dataset=valid_dataset.remove_columns(["word_ids"])
# Masking collator from the project helper; 0.15 is presumably the masking
# probability — TODO confirm in `preprocessing`.
data_collator = preprocessing.data_collector_masking(tokenizer,0.15)
lm_dataset_bis = lm_datasets.remove_columns(["word_ids","token_type_ids"])

print(lm_dataset_bis["test"])
# Presumably applies the collator once so the evaluation masks stay fixed —
# TODO confirm in `preprocessing`.
eval_dataset = preprocessing.create_deterministic_eval_dataset(lm_dataset_bis["test"],data_collator)
valid_dataset=preprocessing.create_deterministic_eval_dataset(valid_dataset,data_collator)

In [None]:
from datasets import Dataset
#valid_dataset=valid_dataset.remove_columns(["word_ids"])
# NOTE(review): `valid_dataset` already went through
# create_deterministic_eval_dataset in the previous cell, so the 10,000-example
# subset is passed through that helper a second time — confirm this is intended.
data_collator = preprocessing.data_collector_masking(tokenizer,0.15)
small_valid_dataset = preprocessing.create_deterministic_eval_dataset(valid_dataset.select(range(10000)),data_collator)
# Batches of 64 built with the plain default collator (no additional masking collator).
small_valid_dataloader=preprocessing.create_dataloader(small_valid_dataset,64,default_data_collator)
                                                                      

In [None]:
# Word whose masked occurrences are studied in the following cells.
word = "Statsrådet"
# Vocabulary id of that word.
# NOTE(review): if the word is not a single vocabulary entry this is presumably
# the unknown-token id — verify before trusting the filtered dataset.
token_id = tokenizer.convert_tokens_to_ids(word)

In [None]:
def special_token(token,example):
    """Tell whether `token` appears among the example's ``'labels'`` entries."""
    labels = example['labels']
    return token in labels

In [None]:
# Keep only the validation examples whose labels contain `token_id`.
valid_filtered_dataset = valid_dataset.filter(lambda example : special_token(token_id,example))

In [None]:
# Display the filtered dataset as the cell output.
valid_filtered_dataset

In [None]:

# Dataloader over the filtered examples: batches of 64, default collator.
valid_filtered_dataloader=preprocessing.create_dataloader(valid_filtered_dataset,64,default_data_collator)

In [None]:
batch_size = 64
# The training loader uses the masking collator; the evaluation loaders below use the default collator.
train_dataloader = preprocessing.create_dataloader(lm_dataset_bis["train"],batch_size,data_collator)
def to_device(batch):
    """Return a copy of the batch dict with every value moved to `device`."""
    return {key: value.to(device) for key, value in batch.items()}

# NOTE(review): leftover progress/debug print.
print("ok")
eval_dataloader = preprocessing.create_dataloader(eval_dataset,batch_size,default_data_collator)
valid_dataloader=preprocessing.create_dataloader(valid_dataset,batch_size,default_data_collator)

In [None]:
# Masked-LM model loaded from a local training checkpoint (plotted as the
# "finetuned"/"Fine-tuned" model in later cells), moved to `device`.
model_hugging_face = AutoModelForMaskedLM.from_pretrained("finetuning_hugging_whitespace_bis-finetuned-imdb/checkpoint-2061000")
model_hugging_face=model_hugging_face.to(device)

In [None]:
# Masked-LM model loaded from the local "exbert" checkpoint, moved to `device`.
model_exbert = AutoModelForMaskedLM.from_pretrained("exbert-finetuned-imdb/checkpoint-1271340")
model_exbert=model_exbert.to(device)

In [None]:
import transformers
# Load config and model from the local "pretraining_from_scratch" checkpoint.
# trust_remote_code=True lets from_pretrained run model code shipped with the
# checkpoint, so only use it with checkpoints you trust.
# Unlike the other models in this notebook, mosaicBert is not moved to `device` here.
config = transformers.BertConfig.from_pretrained("pretraining_from_scratch/checkpoint-3944175")
mosaicBert = AutoModelForMaskedLM.from_pretrained("pretraining_from_scratch/checkpoint-3944175",config=config,trust_remote_code=True)

In [None]:

  
# Apply preprocessing.get_context_with_mask to every filtered example
# (the exact transformation is defined in the project's `preprocessing` module).
valid_sentence_filtered = valid_filtered_dataset.map(lambda example : preprocessing.get_context_with_mask(example,token_id,tokenizer))

In [None]:
def get_embeddings_bis(model,dataloader, tokenizer,token_id):
    """Collect, per layer, the hidden state at the position labelled ``token_id``.

    Every example whose ``labels`` row contains ``token_id`` contributes one
    vector per layer (embedding output included), taken at the first position
    where the label equals ``token_id``, plus the model's arg-max prediction at
    that position. Examples without the token are skipped. The position is
    resolved per example, so batches larger than one are handled correctly.

    Args:
        model: masked-LM model exposing ``config.num_hidden_layers``; it is put
            in eval mode and moved to the notebook-level ``device``.
        dataloader: iterable of dict batches with ``input_ids``,
            ``attention_mask`` and ``labels`` tensors indexed as (batch, position).
        tokenizer: unused; kept so existing calls keep working.
        token_id: vocabulary id searched for in ``labels``.

    Returns:
        ``(embeddings, preds)`` where ``embeddings`` is a list with one
        ``(n_examples, hidden_size)`` numpy array per layer and ``preds`` is a
        list of 0-dim tensors holding the predicted token ids.
    """
    model.eval()
    model.to(device)
    layerwise_embeddings = [[] for _ in range(model.config.num_hidden_layers + 1)]
    preds = []
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        for batch in dataloader:
            tokens = {key: value.to(device) for key, value in batch.items()}
            is_target = tokens["labels"] == token_id
            rows = torch.nonzero(is_target.any(dim=1), as_tuple=True)[0]
            if rows.numel() == 0:
                continue
            # argmax over the 0/1 mask returns the first matching position of each row.
            positions = is_target[rows].int().argmax(dim=1)
            outputs = model(
                input_ids=tokens["input_ids"],
                attention_mask=tokens["attention_mask"],
                output_hidden_states=True,
            )
            # softmax is monotonic, so the arg-max of the logits is the prediction.
            preds.extend(outputs.logits[rows, positions].argmax(dim=-1))
            for i, hidden_state in enumerate(outputs.hidden_states):
                # Hidden state of the labelled (masked) position, not of [CLS].
                layerwise_embeddings[i].append(hidden_state[rows, positions, :].cpu().numpy())
    if not preds:
        # Nothing matched: return empty per-layer arrays instead of letting vstack raise.
        hidden_size = getattr(model.config, "hidden_size", 0)
        return [np.empty((0, hidden_size)) for _ in layerwise_embeddings], preds
    return [np.vstack(layer) for layer in layerwise_embeddings], preds


In [None]:
from sklearn.decomposition import PCA

# One filtered validation example, run through both models.
inputs = valid_sentence_filtered[3]
print(tokenizer.decode(inputs["input_ids"]))
# Add a batch dimension and move every field to `device`.
token = {key: torch.tensor(value, dtype=torch.long).unsqueeze(0).to(device) for key,value in inputs.items()}
# Inference only: no_grad avoids building an autograd graph for the two forward passes.
with torch.no_grad():
    outputs = model_hugging_face(input_ids=token["input_ids"],attention_mask=token["attention_mask"],labels=token["labels"],output_hidden_states=True)
    outputs2=model_kb(input_ids=token["input_ids"],attention_mask=token["attention_mask"],labels=token["labels"],output_hidden_states=True)
tokens = tokenizer.convert_ids_to_tokens(token["input_ids"].squeeze())
# Position whose label is the studied token.
index=inputs["labels"].index(token_id)
hidden_states = outputs.hidden_states
hidden_states_kb = outputs2.hidden_states
last_hidden_state = hidden_states[-1].squeeze().detach().cpu().numpy()
# Token predicted at that position by each model.
print(tokenizer.decode(torch.argmax(outputs.logits.squeeze()[index])))
print(tokenizer.decode(torch.argmax(outputs2.logits.squeeze()[index])))
def plot_pca_hidden_state(hidden_state,tokens,number):
    """Scatter the 2-component PCA projection of one layer's token vectors.

    Args:
        hidden_state: tensor that, once squeezed, has one row per token.
        tokens: token strings, in the same order as the rows.
        number: layer number shown in the title.
    """
    projector = PCA(n_components=2)
    points = projector.fit_transform(hidden_state.squeeze().detach().cpu().numpy())

    def colour_for(tok):
        # green: special tokens, purple: the masked token, cyan: context tokens
        if tok in ["[CLS]","[SEP]"]:
            return 'green'
        if tok == '[MASK]':
            return 'purple'
        return 'cyan'

    # Set up the figure and draw one labelled point per token.
    plt.figure(figsize=(14, 8))
    for position, tok in enumerate(tokens):
        x_coord = points[position, 0]
        y_coord = points[position, 1]
        plt.scatter(x_coord, y_coord, c=colour_for(tok), label=tok)
        plt.text(x_coord, y_coord, tok, fontsize=9)

    plt.xlabel("PC 1")
    plt.ylabel("PC 2")
    plt.title(f"PCA of hidden state for one sentence at layer {number} ")
    # Fixed three-entry legend (one entry per colour) instead of one entry per token.
    legend_entries = [
        plt.Line2D([0], [0], marker='o', color='w', markerfacecolor=colour, markersize=10, label=caption)
        for colour, caption in (
            ('cyan', 'Token de Contexte'),
            ('green', 'Token Spécial'),
            ('purple', 'Token Masqué'),
        )
    ]
    plt.legend(handles=legend_entries, loc='upper right')

    plt.show()

# One PCA figure per layer and per model. The layer count comes from the model
# outputs instead of a hard-coded 13, so other model depths work as well.
for j, (state_finetuned, state_kb) in enumerate(zip(hidden_states, hidden_states_kb)):
    print("hugging face")
    plot_pca_hidden_state(state_finetuned,tokens,j)
    print("kb")
    plot_pca_hidden_state(state_kb,tokens,j)

In [None]:
# For each of 13 layers: t-SNE of the concatenated baseline + fine-tuned
# embeddings, drawn as one scatter group per colour.
# NOTE(review): `TSNE`, `baseline_embeddings`, `finetuned_embeddings` and
# `colors` are not defined or imported anywhere in the visible notebook, so this
# cell cannot run on a fresh kernel — restore the cell(s) that produced them.
for layer in range(13):
        combined_embeddings = np.concatenate([baseline_embeddings[layer], finetuned_embeddings[layer]])
        # Fixed random_state keeps the projection reproducible.
        tsne = TSNE(n_components=2, random_state=42)
        X_tsne = tsne.fit_transform(combined_embeddings)
        plt.figure(figsize=(10, 8))
        for i, color in enumerate(['blue', 'red', 'orange', 'green']):
            # Rows of the projection whose entry in `colors` equals this colour.
            indices = [j for j, c in enumerate(colors) if c == color]
            plt.scatter(X_tsne[indices, 0], X_tsne[indices, 1], c=color, label=color, alpha=0.5, edgecolors='w', s=50)

        plt.title('t-SNE of Word Embeddings with Classification Results')
        plt.legend(loc='best')
        plt.show()

In [None]:
# Comparison of the weights of the baseline and the fine-tuned model

weights_kb = comprehension_model.get_model_weights(model_kb)
weights_finetuned = comprehension_model.get_model_weights(model_hugging_face)

# Parameters whose size-normalised difference norm exceeds this value are printed.
NORM_THRESHOLD = 0.04


def normalized_norm(diff):
    """Norm of `diff` (Frobenius for matrices) divided by its number of elements."""
    return np.linalg.norm(diff) / diff.size


weight_diffs = {key: weights_finetuned[key] - weights_kb[key] for key in weights_kb}
for key, diff in weight_diffs.items():
    if normalized_norm(diff) > NORM_THRESHOLD:
        print(key)

# The decoder parameters are read straight from the models and added (or
# overwritten) after the threshold report, as in the original cell.
weight_diffs["cls.predictions.decoder.bias"] = model_hugging_face.cls.predictions.decoder.bias.detach().cpu().numpy() - model_kb.cls.predictions.decoder.bias.detach().cpu().numpy()
weight_diffs["cls.predictions.decoder.weight"] = model_hugging_face.cls.predictions.decoder.weight.detach().cpu().numpy() - model_kb.cls.predictions.decoder.weight.detach().cpu().numpy()
norms = [normalized_norm(diff) for diff in weight_diffs.values()]

plt.figure(figsize=(10, 8))
plt.bar(range(len(norms)), norms, tick_label=list(weight_diffs.keys()))
plt.xticks(rotation=90)
# The plotted quantity is the norm divided by the element count, so label it as such.
plt.ylabel('Frobenius Norm of Weight Differences / Number of Elements')
plt.title('Comparison of Weight Changes in BERT Layers')
plt.show()

In [None]:
# Display the shape of the layer-0 self-attention key weight of the fine-tuned model.
model_hugging_face.state_dict()["bert.encoder.layer.0.attention.self.key.weight"].shape

In [None]:
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages


def save_weight_histogram(pdf, weights_scratch, weights_baseline, title):
    """Overlay two density histograms, append the figure to `pdf` and show it."""
    plt.figure(figsize=(10, 5))
    plt.hist(weights_scratch, bins=100, alpha=0.5, label='finetuned Model',density=True)
    plt.hist(weights_baseline, bins=100, alpha=0.5, label='Baseline Model',density=True)
    plt.title(title)
    plt.xlabel("Weight values")
    plt.ylabel("Frequency")
    plt.legend()
    pdf.savefig()
    plt.show()
    plt.close()


def baseline_counterparts(name, tensor):
    """Map one `mosaicBert` parameter to the `model_kb` entries it is compared with.

    Returns a list of ``(baseline_key, scratch_tensor, title_suffix)`` tuples:
    three entries (query/key/value slices) for a fused ``Wqkv`` parameter, one
    entry for the other mapped or identically named parameters, and an empty
    list for ``cls.predictions.decoder`` parameters, which are skipped.
    """
    parts = name.split('.')
    suffix = parts[-1]
    # Only "bert.encoder.layer.<n>...." names carry a layer index at position 3;
    # shorter names (e.g. "cls.predictions.bias") must not be indexed there.
    layer = parts[3] if len(parts) > 3 else None
    prefix = f"bert.encoder.layer.{layer}"
    if "attention.self.Wqkv" in name and ("weight" in name or "bias" in name):
        # The fused projection stacks query, key and value along dim 0; slicing
        # dim 0 works for both the 2-D weight and the 1-D bias.
        third = tensor.shape[0] // 3
        return [
            (f"{prefix}.attention.self.{part}.{suffix}", tensor[k * third:(k + 1) * third], f" ({part})")
            for k, part in enumerate(("query", "key", "value"))
        ]
    if "mlp.gated_layers" in name:
        return [(f"{prefix}.intermediate.dense.{suffix}", tensor, "")]
    if "mlp.wo" in name:
        return [(f"{prefix}.output.dense.{suffix}", tensor, "")]
    if "mlp.layernorm" in name:
        return [(f"{prefix}.output.LayerNorm.{suffix}", tensor, "")]
    if "cls.predictions.decoder" in name:
        return []
    return [(name, tensor, "")]


# Build the state dicts once instead of on every access.
scratch_state = mosaicBert.state_dict()
baseline_state = model_kb.state_dict()

with PdfPages('weight_distribution_scratch.pdf') as pdf:
    for name, _ in mosaicBert.named_parameters():
        print(name)
        for baseline_key, scratch_tensor, title_suffix in baseline_counterparts(name, scratch_state[name]):
            if baseline_key not in baseline_state:
                # Parameter without a counterpart in the baseline architecture.
                print(f"skipping {name}: no '{baseline_key}' in the baseline model")
                continue
            save_weight_histogram(
                pdf,
                scratch_tensor.flatten().cpu().numpy(),
                baseline_state[baseline_key].flatten().cpu().numpy(),
                f"Weight Distribution Comparison for {name}{title_suffix}",
            )

In [None]:
import torch.nn.functional as F

# Compare, for one filtered example, the [CLS]-position hidden states of the
# baseline and the fine-tuned model at every layer.
i=8
example = valid_sentence_filtered[i]
# Position whose label is the studied token.
index = example['labels'].index(token_id)
model_inputs = {
    key: torch.tensor(example[key], dtype=torch.long).unsqueeze(0).to(device)
    for key in ("input_ids", "attention_mask")
}
# The forward passes are run here directly: get_embeddings_bis expects a
# dataloader plus a token id and returns per-layer arrays, not the
# (hidden_states, logits) pair this cell needs.
with torch.no_grad():
    outputs_kb = model_kb(**model_inputs, output_hidden_states=True)
    outputs_finetuned = model_hugging_face(**model_inputs, output_hidden_states=True)
hidden_states1, logits1 = outputs_kb.hidden_states, outputs_kb.logits
hidden_states2, logits2 = outputs_finetuned.hidden_states, outputs_finetuned.logits
print(hidden_states1[0].shape)
print(token_id)
print(tokenizer.decode(example['input_ids']))
# Token predicted at the studied position by each model.
print(tokenizer.decode(torch.argmax(F.softmax(logits1.squeeze()[index], dim=-1)).item()))
print(tokenizer.decode(torch.argmax(F.softmax(logits2.squeeze()[index], dim=-1)).item()))
for j in range(len(hidden_states1)) :
    print(tokenizer.decode((example['labels'][index])))
    print('hidden layer ',j)
    plt.figure(figsize=(10,6))
    # [0][0]: first sequence of the batch, first position (the [CLS] token).
    plt.hist(hidden_states1[j][0][0].detach().cpu().numpy(), bins=100, alpha=0.5, label='Baseline Model cls')
    plt.hist(hidden_states2[j][0][0].detach().cpu().numpy(), bins=100, alpha=0.5, label='finetuned Model cls')
    plt.xlabel('weight')
    plt.ylabel('frequency')
    plt.legend()
    plt.show()

In [None]:
import numpy as np

# NOTE(review): absolute, user-specific path; consider a path relative to a
# configurable project directory.
checkpoint_directory = "/home/laurinemeier/swerick/finetuning/finetuning_hugging_whitespace-finetuned-imdb"
checkpoint_pattern = re.compile(r'checkpoint-(\d+)')
# Keep only entries that look like "checkpoint-<step>"; other files in the
# directory would otherwise make the sort key crash on a failed regex match.
checkpoint_steps = {}
for entry in os.listdir(checkpoint_directory):
    match = checkpoint_pattern.search(entry)
    if match:
        checkpoint_steps[entry] = int(match.group(1))
checkpoint_files = sorted(checkpoint_steps, key=checkpoint_steps.get)
# Every 10th checkpoint, in training-step order.
selected_checkpoints = checkpoint_files[::10]
weight1 = model_kb.state_dict()["bert.embeddings.word_embeddings.weight"].flatten().cpu().numpy()
print("std kb", np.std(weight1))
for name in selected_checkpoints :
    print(name)
    model_hugging =AutoModelForMaskedLM.from_pretrained(os.path.join(checkpoint_directory, name))
    weights2 = model_hugging.state_dict()["bert.embeddings.word_embeddings.weight"].flatten().cpu().numpy()
    print(np.std(weights2))
    model_hugging.to(device)
    comprehension_model.plot_weight_distributions(model_hugging, model_kb, "bert.embeddings.word_embeddings.weight")

In [None]:
# Hidden states of both models over the small validation loader, obtained from
# the project helper (presumably one array per layer — TODO confirm in
# comprehension_model.get_embeddings).
hidden_states1 = comprehension_model.get_embeddings(model_kb, small_valid_dataloader, tokenizer)
hidden_states2 = comprehension_model.get_embeddings(model_hugging_face, small_valid_dataloader, tokenizer)

# One overlaid histogram of all values per entry of the returned sequences.
for i in range(len(hidden_states1)):
        plt.figure(figsize=(10, 5))
        plt.hist(hidden_states1[i].flatten(), bins=100, alpha=0.5, label='Baseline Model')
        plt.hist(hidden_states2[i].flatten(), bins=100, alpha=0.5, label='Fine-tuned Model')
        plt.title(f"Hidden States Distribution Comparison for Layer {i}")
        plt.xlabel("Hidden States Values")
        plt.ylabel("Frequency")
        plt.legend()
        plt.show()

In [None]:
# Evolution of a specific layer's weights across training checkpoints
checkpoint_directory = 'finetuning/finetuning_hugging_whitespace-finetuned-imdb'
# Fixed typo: the variable was referenced as `chekpoint_directory`, which raised a NameError.
comprehension_model.evolution_specific_layer_weight(checkpoint_directory)

Study of Word Embeddings

In [None]:

def plot_results(mean_similarities,x_label='Layer Number',y_label='Average Cosine Similarity',title='Average Layer-wise Cosine Similarity between hidden_states Across Validation Dataset'):
    """Draw `mean_similarities` as a blue line with round markers against its index.

    Args:
        mean_similarities: sequence of values, one per layer.
        x_label: x-axis label.
        y_label: y-axis label.
        title: figure title.
    """
    _, axis = plt.subplots(figsize=(10, 5))
    layer_positions = range(len(mean_similarities))
    axis.plot(layer_positions, mean_similarities, marker='o', linestyle='-', color='b')
    axis.set_xlabel(x_label)
    axis.set_ylabel(y_label)
    axis.set_title(title)
    axis.grid(True)
    plt.show()

# Layer-wise comparison of both models on the examples containing the studied token.
mean_similarities_hidden_states,mean_similarities_attention,diff_tot = comprehension_model.extract_and_compare_activations(model_kb, model_hugging_face, valid_filtered_dataloader,token_id)
print("Layer-wise cosine similarities:", mean_similarities_hidden_states)


plot_results(mean_similarities_hidden_states)
plot_results(mean_similarities_attention,'Attention Layer Number',title='Average Attention Layer-wise Cosine Similarity between Attention values Across Validation Dataset')
# The title is now an f-string on `word`; it used to show a literal "{words}" placeholder.
plot_results(diff_tot,'Layer Number',title=f'Average Norm difference between hidden states  Across Validation Dataset for token {word}')



In [None]:
import numpy as np
import torch
import matplotlib.pyplot as plt


def cosine_similarity(tensor1, tensor2):
    """Cosine similarity of two tensors, each treated as one flat vector.

    Args:
        tensor1, tensor2: tensors with the same number of elements.

    Returns:
        The similarity as a Python float.
    """
    # reshape (unlike view) also accepts non-contiguous tensors such as transposes.
    flat1 = tensor1.reshape(-1)
    flat2 = tensor2.reshape(-1)
    return torch.nn.functional.cosine_similarity(flat1, flat2, dim=0).item()


# Define the device
# NOTE(review): same selection as the `device` cell near the top of the notebook; redundant re-definition.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def plot_results(similarities, label, x_label='Layer Number', y_label='Average Cosine Similarity', title='Average Layer-wise Cosine Similarity between hidden_states Across Validation Dataset'):
    """Plot `similarities` against their index as a blue line with round markers.

    Args:
        similarities: sequence of values, one per layer transition.
        label: legend entry for the curve; pass a falsy value to omit the legend.
        x_label: x-axis label.
        y_label: y-axis label.
        title: figure title.
    """
    plt.figure(figsize=(10, 5))
    # `label` was previously accepted but never used.
    plt.plot(range(len(similarities)), similarities, marker='o', linestyle='-', color='b', label=label)
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.title(title)
    plt.grid(True)
    if label:
        plt.legend()
    plt.show()

def compare_ffn_contributions(model_pre, model_post, dataloader):
    """Average cosine similarity between consecutive hidden states of two models.

    For every batch, each model's hidden states of layer ``n`` and ``n + 1`` are
    flattened over the whole batch and compared with ``cosine_similarity``; the
    per-batch values are then averaged over the dataloader.

    Args:
        model_pre: first model; called with ``**batch`` and
            ``output_hidden_states=True``. Put in eval mode by this function.
        model_post: second model, handled the same way.
        dataloader: iterable of dict batches of tensors; each batch is moved to
            the notebook-level ``device`` (the models are assumed to already be
            there — TODO confirm at the call site).

    Returns:
        ``(similarities_pre, similarities_post)``: two numpy arrays with one
        value per pair of consecutive hidden states (empty arrays when the
        dataloader yields no batch).
    """
    # Disable dropout so repeated runs give the same activations.
    model_pre.eval()
    model_post.eval()

    def consecutive_similarities(activations):
        # Similarity between each hidden state and the one right before it.
        return [
            cosine_similarity(upper.cpu(), lower.cpu())
            for lower, upper in zip(activations[:-1], activations[1:])
        ]

    similarities_pre = []
    similarities_post = []

    for batch in dataloader:
        batch = {k: v.to(device) for k, v in batch.items()}

        with torch.no_grad():
            pre_activations = model_pre(**batch, output_hidden_states=True).hidden_states
            post_activations = model_post(**batch, output_hidden_states=True).hidden_states

        similarities_pre.append(consecutive_similarities(pre_activations))
        similarities_post.append(consecutive_similarities(post_activations))

    if not similarities_pre:
        # Nothing to average: avoid numpy's NaN-with-warning on empty input.
        return np.array([]), np.array([])

    return np.mean(np.array(similarities_pre), axis=0), np.mean(np.array(similarities_post), axis=0)

# Example usage: model_kb is the "pre" model, model_hugging_face the "post" model.
mean_similarity_pr, mean_similarity_post = compare_ffn_contributions(model_kb, model_hugging_face, small_valid_dataloader)

# One curve per model; index n on the x-axis compares hidden state n+1 with hidden state n.
plot_results(mean_similarity_pr, label='Layer n+1 - Layer n', x_label='Layer n+1 - Layer n', y_label='Average Cosine Similarity', title='Average Layer-wise Cosine Similarity for Pre Model')
plot_results(mean_similarity_post, label='Layer n+1 - Layer n', x_label='Layer n+1 - Layer n', y_label='Average Cosine Similarity', title='Average Layer-wise Cosine Similarity for Post Model')


In [None]:
# Cosine similarity between layers
from datasets import Dataset
import torch
import numpy as np




def extract_and_compare_feed_forward_weights(model_pre, model_post, dataloader):
    """Cosine similarity between same-named parameters of two models.

    Four groups of parameters are compared, selected by substring of the
    parameter name. The returned dict names are historical and do not all
    describe their content:

    * ``similarities_attention``: ``cls.predictions.transform.dense.weight``
    * ``similarities_query``: ``cls.predictions.transform.dense.bias``
    * ``similarities_key``: ``attention.self.key.bias``
    * ``similarities_value``: ``attention.self.value.bias``

    Args:
        model_pre: reference model (source of the parameter names).
        model_post: model compared against it; parameters are matched by name
            and names missing from ``model_post`` are skipped.
        dataloader: unused; kept so existing calls keep working.

    Returns:
        The four dicts listed above, each mapping parameter name to similarity.
    """
    similarities_attention = {}
    similarities_query = {}
    similarities_key = {}
    similarities_value = {}
    # Use the models passed in (the notebook globals were read before) and pair
    # parameters by name rather than by position.
    params_post = dict(model_post.named_parameters())
    for name_base, param_base in model_pre.named_parameters():
        param_fine = params_post.get(name_base)
        if param_fine is None:
            continue
        if "cls.predictions.transform.dense.weight" in name_base :
            sim = cosine_similarity(param_base, param_fine)
            similarities_attention[name_base]=sim
            print(f"{name_base} - Cosine Similarity: {sim}")
        if  "cls.predictions.transform.dense.bias" in name_base:
            sim = cosine_similarity(param_base, param_fine)
            similarities_query[name_base]=sim
        if  "attention.self.key.bias" in name_base:
            sim = cosine_similarity(param_base, param_fine)
            similarities_key[name_base]=sim
        if  "attention.self.value.bias" in name_base:
            sim = cosine_similarity(param_base, param_fine)
            similarities_value[name_base]=sim
            print(f"{name_base} - Cosine Similarity: {sim}")

    return similarities_attention,similarities_query,similarities_key,similarities_value
       
def cosine_similarity(tensor1, tensor2):
    """Cosine similarity of two tensors, each treated as one flat vector.

    Args:
        tensor1, tensor2: tensors with the same number of elements.

    Returns:
        The similarity as a Python float.
    """
    # reshape (unlike view) also accepts non-contiguous tensors such as transposes.
    flat1 = tensor1.reshape(-1)
    flat2 = tensor2.reshape(-1)
    return torch.nn.functional.cosine_similarity(flat1, flat2, dim=0).item()



def plot_results(similarities,label,x_label='Layer Number',y_label='Average Cosine Similarity',title='Average Layer-wise Cosine Similarity between hidden_states Across Validation Dataset'):
    """Plot `similarities` against their index as a blue line with round markers.

    Args:
        similarities: sequence of values to plot.
        label: either a string, used as the x-axis label, or an iterable with
            one name per value (e.g. ``dict.keys()``), used as x tick labels.
            ``None`` leaves the ticks numeric.
        x_label: x-axis label used when `label` is not a string.
        y_label: y-axis label.
        title: figure title.
    """
    positions = list(range(len(similarities)))
    plt.figure(figsize=(10, 5))
    plt.plot(positions, similarities, marker='o', linestyle='-', color='b')
    if isinstance(label, str):
        plt.xlabel(label)
    else:
        if label is not None:
            # One tick per value, named after the corresponding parameter.
            plt.xticks(positions, list(label), rotation=90)
        plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.title(title)
    plt.grid(True)
    plt.show()

mean_similarities_attention,mean_similarities_query,mean_similarities_key,mean_similarities_value = extract_and_compare_feed_forward_weights(model_kb, model_hugging_face, valid_dataloader)
# Values in insertion order, i.e. aligned with the .keys() passed as labels below.
similarity_attention=list(mean_similarities_attention.values())
similarity_query=list(mean_similarities_query.values())
similarity_key=list(mean_similarities_key.values())
similarity_value=list(mean_similarities_value.values())



# Titles name the parameters actually selected by extract_and_compare_feed_forward_weights.
plot_results(similarity_attention,label = mean_similarities_attention.keys(),title='layer wise cosine similarity between weights for cls.predictions.transform.dense.weight')
plot_results(similarity_query,label = mean_similarities_query.keys(),title='layer wise cosine similarity between weights for cls.predictions.transform.dense.bias')
plot_results(similarity_key,label = mean_similarities_key.keys(),title='layer wise cosine similarity between weights for attention.self.key.bias')
plot_results(similarity_value,label = mean_similarities_value.keys(),title='layer wise cosine similarity between weights for attention.self.value.bias')


In [None]:
def get_embeddings(text,model):
    """Return all hidden states of `model` for one text.

    The text is tokenized with the notebook-level ``tokenizer``, padded or
    truncated to 512 tokens, and run through the model without gradients.

    Args:
        text: input string.
        model: model accepting the tokenizer output and ``output_hidden_states=True``.

    Returns:
        ``outputs.hidden_states`` of the forward pass (tensors stay on the model's device).
    """
    inputs = tokenizer(text, return_tensors="pt", padding="max_length", truncation=True, max_length=512)
    # The tokenizer returns CPU tensors; move them to wherever the model's weights live.
    model_device = next(model.parameters()).device
    inputs = {name: tensor.to(model_device) for name, tensor in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs,output_hidden_states=True)

    return outputs.hidden_states

In [None]:
# Hidden states of the baseline model for the first five train / test texts.
# NOTE(review): `date_dataset` is not defined anywhere in the visible notebook —
# restore the cell that loads it.
embeddings_train = [get_embeddings(phrase,model_kb) for phrase in date_dataset["train"]["content"][:5]]
# Number of sentences, then number of hidden-state tensors per sentence.
print(len(embeddings_train))
print(len(embeddings_train[0]))
embeddings_test = [get_embeddings(phrase,model_kb) for phrase in date_dataset["test"]["content"][:5]]

In [None]:
#Edge probing : predicting noun  
import numpy as np 
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
        
def extract_and_classify(dataset, model,length):
    """Probe each layer of `model` with a logistic-regression classifier.

    Every text is embedded once with ``get_embeddings``; each layer's hidden
    state is mean-pooled over the token dimension (the same pooling as
    ``extract_features`` in this notebook) to get one fixed-size vector per
    text. A ``LogisticRegression`` is then fitted per layer on the train
    features and scored on the test features.

    Args:
        dataset: mapping with ``"train"`` and ``"test"`` splits, each exposing
            ``"content"`` (texts) and ``"reform_label"`` (labels) columns.
        model: model whose layers are probed; must expose ``config.num_hidden_layers``.
        length: number of leading examples used from each split.

    Returns:
        List of ``(layer_index, accuracy)`` tuples, embedding layer included.
    """

    def pooled_layer_features(texts):
        # -> array of shape (n_texts, n_layers + 1, hidden_size)
        per_text = []
        for text in texts:
            hidden_states = get_embeddings(text, model)
            per_text.append(np.stack([
                state.mean(dim=1).squeeze(0).detach().cpu().numpy()
                for state in hidden_states
            ]))
        return np.stack(per_text)

    # Embeddings are computed once, not once per probed layer.
    train_features = pooled_layer_features(dataset["train"]["content"][:length])
    test_features = pooled_layer_features(dataset["test"]["content"][:length])
    train_labels = dataset["train"]["reform_label"][:length]
    test_labels = dataset["test"]["reform_label"][:length]

    layer_accuracies = []
    for layer_index in range(model.config.num_hidden_layers + 1):  # Include the embedding layer
        clf = LogisticRegression()
        clf.fit(train_features[:, layer_index, :], train_labels)
        y_pred = clf.predict(test_features[:, layer_index, :])

        # Calculate accuracy
        accuracy = accuracy_score(test_labels, y_pred)
        layer_accuracies.append((layer_index, accuracy))

    return layer_accuracies


# Probe every layer of the baseline model using the first 5 examples of each split.
# NOTE(review): `date_dataset` is not defined anywhere in the visible notebook.
accuracies = extract_and_classify(date_dataset, model_kb,5)

# Output the accuracies for each layer
for layer, acc in accuracies:
    print(f"Layer {layer}: Accuracy {acc}")




In [None]:
# Hidden state at index 1 for every sentence of `embeddings_train` (built in an earlier cell).
train_embeddings =[sentence[1] for sentence in embeddings_train] 
# Length of the innermost dimension, i.e. the size of one token vector.
len(train_embeddings[0][0][0])

In [None]:
# Forward pass of the baseline model on one masked training text.
# NOTE(review): `date_dataset` is not defined anywhere in the visible notebook.
# Local names no longer shadow the builtin `input`.
text = date_dataset["train"]["content"][0]
encoded = tokenizer(text,return_tensors="pt", padding="max_length", truncation=True, max_length=512)
data_collator = preprocessing.data_collector_masking(tokenizer,0.15)
collated = data_collator([encoded])
# The collator stacks the single (1, seq) encoding into (1, 1, seq): drop the
# extra axis and move the tensors next to the model.
collated_inputs = {key: value.squeeze(1).to(device) for key, value in collated.items()}
# Inference only.
with torch.no_grad():
    output =model_kb(collated_inputs["input_ids"],attention_mask=collated_inputs["attention_mask"],labels =collated_inputs["labels"],output_hidden_states=True)
hidden_states = output.hidden_states



In [None]:
def extract_features(examples,model):
  """Add one mean-pooled feature vector per layer to a single example.

  The example's ``'content'`` text is tokenized (padded or truncated to 512
  tokens), passed through the notebook-level masking ``data_collator`` and fed
  to `model`. Each hidden state is averaged over the token dimension and stored
  under ``features_<layer index>``.

  NOTE(review): the collator is the masking collator, so the features are
  presumably computed on randomly masked input and may differ between runs —
  confirm that this is intended.

  Args:
    examples: one dataset row (called with ``batched=False``) holding a ``'content'`` text.
    model: model accepting ``input_ids``, ``attention_mask``, ``labels`` and
      ``output_hidden_states=True``.

  Returns:
    The same `examples` mapping with the ``features_<i>`` entries added.
  """
  encoded = tokenizer(examples['content'],return_tensors="pt", padding="max_length", truncation=True, max_length=512)
  collated = data_collator([encoded])
  # Drop the extra axis added by collating a single encoding and move the
  # tensors to the model's device.
  model_device = next(model.parameters()).device
  collated_inputs = {key: value.squeeze(1).to(model_device) for key, value in collated.items()}
  with torch.no_grad():
    output =model(collated_inputs["input_ids"],attention_mask=collated_inputs["attention_mask"],labels =collated_inputs["labels"],output_hidden_states=True)
  # add features of each layer
  for i, hidden_state in enumerate(output.hidden_states):
      examples[f'features_{i}'] = torch.mean(hidden_state, dim=1).cpu().detach().numpy()

  return examples

In [None]:
# Per-layer features of the fine-tuned model for train rows 100-149.
# NOTE(review): `date_dataset` is not defined anywhere in the visible notebook.
encoded_dataset_train_bis=Dataset.from_dict(date_dataset["train"][100:150]).map(lambda example :extract_features(example,model_hugging_face), batched=False)

In [None]:
# Per-layer features of the fine-tuned model for the first 100 test rows.
# NOTE(review): `date_dataset` is not defined anywhere in the visible notebook.
encoded_dataset_test_bis = Dataset.from_dict(date_dataset["test"][:100]).map(lambda example :extract_features(example,model_hugging_face), batched=False)

In [None]:
# Compare the layer-4 features of two encoded test sets.
# NOTE(review): `encoded_dataset_test` is not defined anywhere in the visible notebook.
encoded_dataset_test['features_4']==encoded_dataset_test_bis['features_4']

In [None]:
from sklearn.linear_model import LogisticRegression
from tqdm.notebook import tqdm
from sklearn.metrics import f1_score
def scores_linear_prob(train_dataset,test_dataset, num_layers=None):
    """Fit one logistic-regression probe per layer and report macro-F1 scores.

    Args:
        train_dataset: dataset with ``features_<i>`` columns and a
            ``reform_label`` column, used to fit the probes.
        test_dataset: dataset with the same columns, used for evaluation.
        num_layers: number of ``features_<i>`` columns to probe. Defaults to
            ``model_kb.config.num_hidden_layers + 1`` (the notebook-level
            baseline model), which is the previous behaviour.

    Returns:
        Dict mapping ``features_<i>`` to ``(train_macro_f1, test_macro_f1)``.
    """
    if num_layers is None:
        num_layers = model_kb.config.num_hidden_layers + 1

    # Label columns do not change between layers: read them once.
    train_labels = train_dataset['reform_label']
    test_labels = test_dataset['reform_label']

    scores = dict()
    for i in range(num_layers):
        # Stored features carry a singleton axis at dim 1 (squeezed here).
        train_features = torch.Tensor(train_dataset[f'features_{i}']).squeeze(1)
        test_features = torch.Tensor(test_dataset[f'features_{i}']).squeeze(1)
        lr_clf = LogisticRegression()
        lr_clf.fit(train_features, train_labels)

        train_preds = lr_clf.predict(train_features)
        test_preds = lr_clf.predict(test_features)
        training_f1 = f1_score(train_labels, train_preds, average='macro')
        test_f1 = f1_score(test_labels, test_preds, average='macro')

        scores[f'features_{i}'] = (training_f1, test_f1)

    return scores
