In [None]:
!pip install captum

In [None]:
!pip install ace_tools

In [None]:
import argparse
import os
import random
import warnings
import tarfile
import gdown

import numpy as np
import pandas as pd
from tqdm import tqdm
from dataclasses import dataclass

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset, Sampler
from torch.utils.tensorboard import SummaryWriter
import torch.nn.functional as F

import matplotlib.font_manager as fm
import matplotlib.pyplot as plt
from matplotlib.collections import QuadMesh
import seaborn as sn

import nltk
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from nltk.tokenize import word_tokenize

from sklearn.model_selection import train_test_split

from transformers import BertModel, BertForSequenceClassification
from transformers import AdamW, get_linear_schedule_with_warmup

In [None]:
# Release unused cached GPU memory held by PyTorch before the heavy cells run.
torch.cuda.empty_cache()

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertModel
from captum.attr import LayerIntegratedGradients, visualization as viz
import joblib

# Use the first CUDA GPU when one is available; otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Logistic Regression Classifier from the image
class LogisticRegression(nn.Module):
    """Two-layer feed-forward classification head.

    Despite the name this is a small MLP:
    dropout -> linear -> LeakyReLU(0.2) -> dropout -> linear.
    """

    def __init__(self, in_dim, hid_dim, out_dim, dropout=0):
        super().__init__()
        print(f'Logistic Regression classifier of dim ({in_dim} {hid_dim} {out_dim})')

        # The layer order fixes the Sequential indices, and therefore the
        # state-dict keys (nn.1.*, nn.4.*), so it must stay as it is.
        layers = [
            nn.Dropout(p=dropout),
            nn.Linear(in_dim, hid_dim, bias=True),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Dropout(p=dropout),
            nn.Linear(hid_dim, out_dim, bias=True),
        ]
        self.nn = nn.Sequential(*layers)

    def forward(self, x, return_feat=False):
        """Return the logits for ``x``; with ``return_feat`` also return ``x``."""
        logits = self.nn(x)
        return (logits, x) if return_feat else logits


# BertClassifier class from the image
class BertClassifier(nn.Module):
    """Encoder followed by a classification head fed with all token states."""

    FEAT_LEN = 768

    def __init__(self, raw_bert, classifier):
        super().__init__()
        self.bert = raw_bert
        self.fc = classifier

    def forward(self, input_ids, attention_mask=None):
        """Return ``(logits, features)`` for a batch of token ids.

        ``features`` is the encoder's ``last_hidden_state`` with every
        dimension after the batch axis flattened together, i.e. the states of
        all token positions, not only [CLS].
        """
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        flat_states = encoded.last_hidden_state.flatten(1)
        return self.fc(flat_states), flat_states

# Load the pre-trained BERT backbone and its matching tokenizer (loaded once;
# the later duplicate load of the same tokenizer was redundant).
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
extractor = BertModel.from_pretrained('bert-base-cased')
ngpus, dropout = torch.cuda.device_count(), 0.35
num_tokens, hidden_dim, out_dim = 256, 512, 100  # Adjust out_dim as necessary

# The head consumes the flattened hidden states of all `num_tokens` positions,
# hence the 768 * num_tokens input width.
classifier = LogisticRegression(768 * num_tokens, hidden_dim, out_dim, dropout=dropout)
model = BertClassifier(extractor, classifier)

# Load the fine-tuned weights. map_location remaps tensors saved from a GPU so
# the checkpoint also loads when `device` has fallen back to the CPU.
model_path = '/content/drive/MyDrive/msc_project/model/contrastive/club_try2/style_encoder_supcon_9.pt'
model.load_state_dict(torch.load(model_path, map_location=device))

model.to(device)
model.eval()  # disable dropout for deterministic predictions/attributions

# Define the forward function for prediction
def predict(inputs, attention_mask=None):
    """Run the classifier and return only its logits; the features are dropped."""
    return model(inputs, attention_mask=attention_mask)[0]

# Forward function for classification used by LayerIntegratedGradients
def classify_forward_func(inputs, attention_mask=None):
    """Forward callable handed to LayerIntegratedGradients; delegates to ``predict``."""
    return predict(inputs, attention_mask=attention_mask)

# Token ids used when building the attribution baseline: the pad id serves as
# the reference token, and the [SEP]/[CLS] ids identify the special tokens.
ref_token_id = tokenizer.pad_token_id
sep_token_id = tokenizer.sep_token_id
cls_token_id = tokenizer.cls_token_id

# def construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id):
#     text_ids = tokenizer.encode(text, add_special_tokens=False)
#     input_ids = [cls_token_id] + text_ids + [sep_token_id]
#     ref_input_ids = [cls_token_id] + [ref_token_id] * len(text_ids) + [sep_token_id]
#     return torch.tensor([input_ids], device=device), torch.tensor([ref_input_ids], device=device)

# def construct_attention_mask(input_ids):
#     return torch.ones_like(input_ids)

def construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id, max_length=256):
    """Tokenize ``text`` and build the matching baseline for Integrated Gradients.

    Args:
        text: Raw string to encode.
        ref_token_id: Id written into every non-special position of the baseline.
        sep_token_id: Id of the [SEP] token.
        cls_token_id: Id of the [CLS] token.
        max_length: Length every sequence is padded/truncated to.

    Returns:
        ``(input_ids, ref_input_ids, attention_mask)``, each of shape
        ``(1, max_length)`` and placed on ``device``.
    """
    encoded = tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',  # Pad to max length
        truncation=True,
        return_tensors="pt",
        return_attention_mask=True
    )

    input_ids = encoded['input_ids'].to(device)
    attention_mask = encoded['attention_mask'].to(device)

    # Keep [CLS]/[SEP] exactly where the tokenizer put them and replace every
    # other position with the reference token. Writing [SEP] at index -1 would
    # misplace it whenever the text is shorter than max_length, because the
    # real [SEP] sits right after the last word piece, before the padding.
    is_special = (input_ids == cls_token_id) | (input_ids == sep_token_id)
    ref_input_ids = torch.where(is_special, input_ids, torch.full_like(input_ids, ref_token_id))

    return input_ids, ref_input_ids, attention_mask


# Load the test dataframe and expose prompt/author as content/Target.
nlp_test = pd.read_csv('/content/drive/MyDrive/msc_project/data/diffusiondb/paraphrased/test_random100_label_1.csv')
nlp_test = nlp_test[['prompt', 'user_name']]
nlp_test.columns = ['content', 'Target']

delta_all = []
attributions_sum_all = []


def summarize_attributions(attributions):
    """Collapse per-dimension attributions into one L2-normalised score per token."""
    attributions = attributions.sum(dim=-1).squeeze(0)
    norm = torch.norm(attributions)
    # An all-zero attribution vector would otherwise turn into NaNs.
    return attributions / norm if norm > 0 else attributions


def remove_pad_tokens(tokens, attributions, pad_token_id):
    """Drop [PAD] positions from parallel token/attribution sequences.

    ``pad_token_id`` is kept for backward compatibility but is unused: the
    tokens are strings, so they are compared against ``tokenizer.pad_token``.
    """
    pad_token = tokenizer.pad_token
    kept = [(tok, attr) for tok, attr in zip(tokens, attributions) if tok != pad_token]
    filtered_tokens = [tok for tok, _ in kept]
    filtered_attributions = [attr for _, attr in kept]
    return filtered_tokens, filtered_attributions


# The explainer does not depend on the sample, so build it once.
lig = LayerIntegratedGradients(classify_forward_func, model.bert.embeddings)

# Run Layer Integrated Gradients on every sample and render the result.
for idx, row in tqdm(nlp_test.iterrows(), total=len(nlp_test)):
    text = row['content']
    label = row['Target']

    input_ids, ref_input_ids, attention_mask = construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id)

    # The prediction only needs a forward pass; skip autograd bookkeeping.
    with torch.no_grad():
        logits = predict(input_ids, attention_mask=attention_mask)

    # Explain the class the model actually predicted.
    target_label_idx = torch.argmax(logits).item()
    pred_prob = torch.softmax(logits[0], dim=0).max().item()

    # Compute attributions with respect to the BERT embeddings
    attributions, delta = lig.attribute(
        inputs=input_ids.long(),
        baselines=ref_input_ids,
        additional_forward_args=(attention_mask,),
        target=target_label_idx,
        return_convergence_delta=True
    )

    attributions_sum = summarize_attributions(attributions)

    # Keep a detached CPU copy so the list does not pin GPU memory.
    delta_cpu = delta.detach().cpu()
    delta_all.append(delta_cpu)

    all_tokens = tokenizer.convert_ids_to_tokens(input_ids[0].detach().tolist())
    filtered_tokens, filtered_attributions = remove_pad_tokens(all_tokens, attributions_sum.tolist(), tokenizer.pad_token_id)
    attributions_sum_all.append(filtered_attributions)

    # Create a visualization record without pad tokens
    word_attributions = torch.tensor(filtered_attributions)
    classification_vis = viz.VisualizationDataRecord(
        word_attributions,
        pred_prob,
        target_label_idx,
        label,
        str(label),
        word_attributions.sum(),
        filtered_tokens,
        delta_cpu
    )

    print(f'\033[1mVisualization For Sample {idx}\033[0m')
    viz.visualize_text([classification_vis])
    print()


In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertModel
from captum.attr import LayerIntegratedGradients, visualization as viz
import joblib

# Use the first CUDA GPU when one is available; otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Logistic Regression Classifier from the image
class LogisticRegression(nn.Module):
    """Two-layer feed-forward classification head.

    Despite the name this is a small MLP:
    dropout -> linear -> LeakyReLU(0.2) -> dropout -> linear.
    """

    def __init__(self, in_dim, hid_dim, out_dim, dropout=0):
        super().__init__()
        print(f'Logistic Regression classifier of dim ({in_dim} {hid_dim} {out_dim})')

        # The layer order fixes the Sequential indices, and therefore the
        # state-dict keys (nn.1.*, nn.4.*), so it must stay as it is.
        layers = [
            nn.Dropout(p=dropout),
            nn.Linear(in_dim, hid_dim, bias=True),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Dropout(p=dropout),
            nn.Linear(hid_dim, out_dim, bias=True),
        ]
        self.nn = nn.Sequential(*layers)

    def forward(self, x, return_feat=False):
        """Return the logits for ``x``; with ``return_feat`` also return ``x``."""
        logits = self.nn(x)
        return (logits, x) if return_feat else logits


# BertClassifier class from the image
class BertClassifier(nn.Module):
    """Encoder followed by a classification head fed with all token states."""

    FEAT_LEN = 768

    def __init__(self, raw_bert, classifier):
        super().__init__()
        self.bert = raw_bert
        self.fc = classifier

    def forward(self, input_ids, attention_mask=None):
        """Return ``(logits, features)`` for a batch of token ids.

        ``features`` is the encoder's ``last_hidden_state`` with every
        dimension after the batch axis flattened together, i.e. the states of
        all token positions, not only [CLS].
        """
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        flat_states = encoded.last_hidden_state.flatten(1)
        return self.fc(flat_states), flat_states

# Load the pre-trained BERT backbone and its matching tokenizer (loaded once;
# the later duplicate load of the same tokenizer was redundant).
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
extractor = BertModel.from_pretrained('bert-base-cased')
ngpus, dropout = torch.cuda.device_count(), 0.35
num_tokens, hidden_dim, out_dim = 256, 512, 100  # Adjust out_dim as necessary

# The head consumes the flattened hidden states of all `num_tokens` positions,
# hence the 768 * num_tokens input width.
classifier = LogisticRegression(768 * num_tokens, hidden_dim, out_dim, dropout=dropout)
model = BertClassifier(extractor, classifier)

# Load the fine-tuned weights. map_location remaps tensors saved from a GPU so
# the checkpoint also loads when `device` has fallen back to the CPU.
model_path = '/content/drive/MyDrive/msc_project/model/contrastive/club_try2/style_encoder_supcon_9.pt'
model.load_state_dict(torch.load(model_path, map_location=device))

model.to(device)
model.eval()  # disable dropout for deterministic predictions/attributions

# Define the forward function for prediction
def predict(inputs, attention_mask=None):
    """Run the classifier and return only its logits; the features are dropped."""
    return model(inputs, attention_mask=attention_mask)[0]

# Forward function for classification used by LayerIntegratedGradients
def classify_forward_func(inputs, attention_mask=None):
    """Forward callable handed to LayerIntegratedGradients; delegates to ``predict``."""
    return predict(inputs, attention_mask=attention_mask)

# Token ids used when building the attribution baseline: the pad id serves as
# the reference token, and the [SEP]/[CLS] ids identify the special tokens.
ref_token_id = tokenizer.pad_token_id
sep_token_id = tokenizer.sep_token_id
cls_token_id = tokenizer.cls_token_id

# def construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id):
#     text_ids = tokenizer.encode(text, add_special_tokens=False)
#     input_ids = [cls_token_id] + text_ids + [sep_token_id]
#     ref_input_ids = [cls_token_id] + [ref_token_id] * len(text_ids) + [sep_token_id]
#     return torch.tensor([input_ids], device=device), torch.tensor([ref_input_ids], device=device)

# def construct_attention_mask(input_ids):
#     return torch.ones_like(input_ids)

def construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id, max_length=256):
    """Tokenize ``text`` and build the matching baseline for Integrated Gradients.

    Args:
        text: Raw string to encode.
        ref_token_id: Id written into every non-special position of the baseline.
        sep_token_id: Id of the [SEP] token.
        cls_token_id: Id of the [CLS] token.
        max_length: Length every sequence is padded/truncated to.

    Returns:
        ``(input_ids, ref_input_ids, attention_mask)``, each of shape
        ``(1, max_length)`` and placed on ``device``.
    """
    encoded = tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',  # Pad to max length
        truncation=True,
        return_tensors="pt",
        return_attention_mask=True
    )

    input_ids = encoded['input_ids'].to(device)
    attention_mask = encoded['attention_mask'].to(device)

    # Keep [CLS]/[SEP] exactly where the tokenizer put them and replace every
    # other position with the reference token. Writing [SEP] at index -1 would
    # misplace it whenever the text is shorter than max_length, because the
    # real [SEP] sits right after the last word piece, before the padding.
    is_special = (input_ids == cls_token_id) | (input_ids == sep_token_id)
    ref_input_ids = torch.where(is_special, input_ids, torch.full_like(input_ids, ref_token_id))

    return input_ids, ref_input_ids, attention_mask


# Load the test dataframe and expose prompt/author as content/Target.
nlp_test = pd.read_csv('/content/drive/MyDrive/msc_project/data/diffusiondb/paraphrased/test_random100_label_1.csv')
nlp_test = nlp_test[['prompt', 'user_name']]
nlp_test.columns = ['content', 'Target']

delta_all = []
attributions_sum_all = []


def summarize_attributions(attributions):
    """Collapse per-dimension attributions into one L2-normalised score per token."""
    attributions = attributions.sum(dim=-1).squeeze(0)
    norm = torch.norm(attributions)
    # An all-zero attribution vector would otherwise turn into NaNs.
    return attributions / norm if norm > 0 else attributions


def remove_pad_tokens(tokens, attributions, pad_token_id):
    """Drop [PAD] positions from parallel token/attribution sequences.

    ``pad_token_id`` is kept for backward compatibility but is unused: the
    tokens are strings, so they are compared against ``tokenizer.pad_token``.
    """
    pad_token = tokenizer.pad_token
    kept = [(tok, attr) for tok, attr in zip(tokens, attributions) if tok != pad_token]
    filtered_tokens = [tok for tok, _ in kept]
    filtered_attributions = [attr for _, attr in kept]
    return filtered_tokens, filtered_attributions


# The explainer does not depend on the sample, so build it once.
lig = LayerIntegratedGradients(classify_forward_func, model.bert.embeddings)

# Run Layer Integrated Gradients on every sample, render it, and collect results.
for idx, row in tqdm(nlp_test.iterrows(), total=len(nlp_test)):
    text = row['content']
    label = row['Target']

    input_ids, ref_input_ids, attention_mask = construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id)

    # The prediction only needs a forward pass; skip autograd bookkeeping.
    with torch.no_grad():
        logits = predict(input_ids, attention_mask=attention_mask)

    # Explain the class the model actually predicted.
    target_label_idx = torch.argmax(logits).item()
    pred_prob = torch.softmax(logits[0], dim=0).max().item()

    # Compute attributions with respect to the BERT embeddings
    attributions, delta = lig.attribute(
        inputs=input_ids.long(),
        baselines=ref_input_ids,
        additional_forward_args=(attention_mask,),
        target=target_label_idx,
        return_convergence_delta=True
    )

    attributions_sum = summarize_attributions(attributions)

    # Store the convergence delta as a NumPy array so it pickles without torch/GPU state.
    delta_cpu = delta.detach().cpu().numpy()
    delta_all.append(delta_cpu)

    all_tokens = tokenizer.convert_ids_to_tokens(input_ids[0].detach().tolist())
    # `.tolist()` already yields plain Python floats, so no further conversion is needed.
    filtered_tokens, filtered_attributions = remove_pad_tokens(all_tokens, attributions_sum.tolist(), tokenizer.pad_token_id)
    attributions_sum_all.append(filtered_attributions)

    # Create a visualization record without pad tokens
    word_attributions = torch.tensor(filtered_attributions)
    classification_vis = viz.VisualizationDataRecord(
        word_attributions,
        pred_prob,
        target_label_idx,
        label,
        str(label),
        word_attributions.sum(),
        filtered_tokens,
        delta_cpu
    )

    print(f'\033[1mVisualization For Sample {idx}\033[0m')
    viz.visualize_text([classification_vis])
    print()

# Save the delta and attributions_sum_all locally
import joblib

joblib.dump(delta_all, 'delta_all.pkl')
joblib.dump(attributions_sum_all, 'attributions_sum_all.pkl')



In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertModel
from captum.attr import LayerIntegratedGradients, visualization as viz
import joblib

# Use the first CUDA GPU when one is available; otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Logistic Regression Classifier from the image
class LogisticRegression(nn.Module):
    """Two-layer feed-forward classification head.

    Despite the name this is a small MLP:
    dropout -> linear -> LeakyReLU(0.2) -> dropout -> linear.
    """

    def __init__(self, in_dim, hid_dim, out_dim, dropout=0):
        super().__init__()
        print(f'Logistic Regression classifier of dim ({in_dim} {hid_dim} {out_dim})')

        # The layer order fixes the Sequential indices, and therefore the
        # state-dict keys (nn.1.*, nn.4.*), so it must stay as it is.
        layers = [
            nn.Dropout(p=dropout),
            nn.Linear(in_dim, hid_dim, bias=True),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Dropout(p=dropout),
            nn.Linear(hid_dim, out_dim, bias=True),
        ]
        self.nn = nn.Sequential(*layers)

    def forward(self, x, return_feat=False):
        """Return the logits for ``x``; with ``return_feat`` also return ``x``."""
        logits = self.nn(x)
        return (logits, x) if return_feat else logits


# BertClassifier class from the image
class BertClassifier(nn.Module):
    """Encoder followed by a classification head fed with all token states."""

    FEAT_LEN = 768

    def __init__(self, raw_bert, classifier):
        super().__init__()
        self.bert = raw_bert
        self.fc = classifier

    def forward(self, input_ids, attention_mask=None):
        """Return ``(logits, features)`` for a batch of token ids.

        ``features`` is the encoder's ``last_hidden_state`` with every
        dimension after the batch axis flattened together, i.e. the states of
        all token positions, not only [CLS].
        """
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        flat_states = encoded.last_hidden_state.flatten(1)
        return self.fc(flat_states), flat_states

# Load the pre-trained BERT backbone and its matching tokenizer (loaded once;
# the later duplicate load of the same tokenizer was redundant).
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
extractor = BertModel.from_pretrained('bert-base-cased')
ngpus, dropout = torch.cuda.device_count(), 0.35
num_tokens, hidden_dim, out_dim = 256, 512, 100  # Adjust out_dim as necessary

# The head consumes the flattened hidden states of all `num_tokens` positions,
# hence the 768 * num_tokens input width.
classifier = LogisticRegression(768 * num_tokens, hidden_dim, out_dim, dropout=dropout)
model = BertClassifier(extractor, classifier)

# Load the fine-tuned weights. map_location remaps tensors saved from a GPU so
# the checkpoint also loads when `device` has fallen back to the CPU.
model_path = '/content/drive/MyDrive/msc_project/model/contrastive/club_try2/style_encoder_supcon_9.pt'
model.load_state_dict(torch.load(model_path, map_location=device))

model.to(device)
model.eval()  # disable dropout for deterministic predictions/attributions

# Define the forward function for prediction
def predict(inputs, attention_mask=None):
    """Run the classifier and return only its logits; the features are dropped."""
    return model(inputs, attention_mask=attention_mask)[0]

# Forward function for classification used by LayerIntegratedGradients
def classify_forward_func(inputs, attention_mask=None):
    """Forward callable handed to LayerIntegratedGradients; delegates to ``predict``."""
    return predict(inputs, attention_mask=attention_mask)

# Token ids used when building the attribution baseline: the pad id serves as
# the reference token, and the [SEP]/[CLS] ids identify the special tokens.
ref_token_id = tokenizer.pad_token_id
sep_token_id = tokenizer.sep_token_id
cls_token_id = tokenizer.cls_token_id

# def construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id):
#     text_ids = tokenizer.encode(text, add_special_tokens=False)
#     input_ids = [cls_token_id] + text_ids + [sep_token_id]
#     ref_input_ids = [cls_token_id] + [ref_token_id] * len(text_ids) + [sep_token_id]
#     return torch.tensor([input_ids], device=device), torch.tensor([ref_input_ids], device=device)

# def construct_attention_mask(input_ids):
#     return torch.ones_like(input_ids)

def construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id, max_length=256):
    """Tokenize ``text`` and build the matching baseline for Integrated Gradients.

    Args:
        text: Raw string to encode.
        ref_token_id: Id written into every non-special position of the baseline.
        sep_token_id: Id of the [SEP] token.
        cls_token_id: Id of the [CLS] token.
        max_length: Length every sequence is padded/truncated to.

    Returns:
        ``(input_ids, ref_input_ids, attention_mask)``, each of shape
        ``(1, max_length)`` and placed on ``device``.
    """
    encoded = tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',  # Pad to max length
        truncation=True,
        return_tensors="pt",
        return_attention_mask=True
    )

    input_ids = encoded['input_ids'].to(device)
    attention_mask = encoded['attention_mask'].to(device)

    # Keep [CLS]/[SEP] exactly where the tokenizer put them and replace every
    # other position with the reference token. Writing [SEP] at index -1 would
    # misplace it whenever the text is shorter than max_length, because the
    # real [SEP] sits right after the last word piece, before the padding.
    is_special = (input_ids == cls_token_id) | (input_ids == sep_token_id)
    ref_input_ids = torch.where(is_special, input_ids, torch.full_like(input_ids, ref_token_id))

    return input_ids, ref_input_ids, attention_mask


# Load the test dataframe and expose prompt/author as content/Target.
nlp_test = pd.read_csv('/content/drive/MyDrive/msc_project/data/diffusiondb/paraphrased/test_random100_label_1.csv')
nlp_test = nlp_test[['prompt', 'user_name']]
nlp_test.columns = ['content', 'Target']


from collections import defaultdict

# Dictionary to hold attribution scores per author
# Structure: {author_name: {token: [list_of_attributions]}}
author_attributions = defaultdict(lambda: defaultdict(list))
delta_all = []
attributions_sum_all = []


def summarize_attributions(attributions):
    """Collapse per-dimension attributions into one L2-normalised score per token."""
    attributions = attributions.sum(dim=-1).squeeze(0)
    norm = torch.norm(attributions)
    # An all-zero attribution vector would otherwise turn into NaNs.
    return attributions / norm if norm > 0 else attributions


def remove_pad_tokens(tokens, attributions, pad_token_id):
    """Drop [PAD] positions from parallel token/attribution sequences.

    ``pad_token_id`` is kept for backward compatibility but is unused: the
    tokens are strings, so they are compared against ``tokenizer.pad_token``.
    """
    pad_token = tokenizer.pad_token
    kept = [(tok, attr) for tok, attr in zip(tokens, attributions) if tok != pad_token]
    filtered_tokens = [tok for tok, _ in kept]
    filtered_attributions = [attr for _, attr in kept]
    return filtered_tokens, filtered_attributions


# The explainer does not depend on the sample, so build it once.
lig = LayerIntegratedGradients(classify_forward_func, model.bert.embeddings)

# Run Layer Integrated Gradients on every sample and accumulate attributions
for idx, row in tqdm(nlp_test.iterrows(), total=len(nlp_test)):
    text = row['content']
    author = row['Target']  # 'Target' corresponds to the author

    input_ids, ref_input_ids, attention_mask = construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id)

    # The prediction only needs a forward pass; skip autograd bookkeeping.
    with torch.no_grad():
        logits = predict(input_ids, attention_mask=attention_mask)

    # Explain the class the model actually predicted.
    target_label_idx = torch.argmax(logits).item()

    # Compute attributions with respect to the BERT embeddings
    attributions, delta = lig.attribute(
        inputs=input_ids.long(),
        baselines=ref_input_ids,
        additional_forward_args=(attention_mask,),
        target=target_label_idx,
        return_convergence_delta=True
    )

    attributions_sum = summarize_attributions(attributions)

    # Keep the convergence delta as a detached NumPy array (no GPU memory pinned).
    delta_cpu = delta.detach().cpu().numpy()
    delta_all.append(delta_cpu)

    all_tokens = tokenizer.convert_ids_to_tokens(input_ids[0].detach().tolist())
    # `.tolist()` already yields plain Python floats, so no further conversion is needed.
    filtered_tokens, filtered_attributions = remove_pad_tokens(all_tokens, attributions_sum.tolist(), tokenizer.pad_token_id)
    attributions_sum_all.append(filtered_attributions)

    # Accumulate attribution scores for each token for the current author
    for token, attribution in zip(filtered_tokens, filtered_attributions):
        author_attributions[author][token].append(attribution)

# Average attribution score for each token, per author:
# {author_name: {token: mean_attribution}}
author_avg_attributions = {}
for author, token_attributions in author_attributions.items():
    author_avg_attributions[author] = {
        token: np.mean(scores) for token, scores in token_attributions.items()
    }

# # Example output: print the average attributions for each author
# for author, token_avg_attributions in author_avg_attributions.items():
#     print(f"\nAuthor: {author}")
#     for token, avg_attribution in token_avg_attributions.items():
#         print(f"Token: {token}, Average Attribution: {avg_attribution}")



In [None]:
# For every author, rank tokens by their mean attribution (highest first).
# Structure: {author_name: [(token, mean_attribution), ...]}
author_avg_attributions_sorted = {}

for author, token_attributions in author_attributions.items():
    # A dedicated name is used for the per-author table so that the global
    # `author_avg_attributions` mapping ({author: {token: mean}}) is not
    # overwritten with a single author's scores.
    token_avg_attributions = {
        token: np.mean(scores) for token, scores in token_attributions.items()
    }

    # Sort tokens by average attribution score in descending order
    author_avg_attributions_sorted[author] = sorted(
        token_avg_attributions.items(), key=lambda x: x[1], reverse=True
    )

# # Example output: print the sorted average attributions for each author
# for author, sorted_tokens in author_avg_attributions_sorted.items():
#     print(f"\nAuthor: {author}")
#     for token, avg_attribution in sorted_tokens:
#         print(f"Token: {token}, Average Attribution: {avg_attribution}")

In [None]:
# Example output: print the sorted average attributions for each author
# Show at most the ten highest-ranked tokens of every author.
for author, sorted_tokens in author_avg_attributions_sorted.items():
    print(f"\nAuthor: {author}")
    for token, avg_attribution in sorted_tokens[:10]:
        print(f"Token: {token}, Average Attribution: {avg_attribution}")

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from transformers import BertTokenizer, BertModel
from captum.attr import LayerIntegratedGradients, visualization as viz
import joblib

# Use the first CUDA GPU when one is available; otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Logistic Regression Classifier from the image
class LogisticRegression(nn.Module):
    """Two-layer feed-forward classification head.

    Despite the name this is a small MLP:
    dropout -> linear -> LeakyReLU(0.2) -> dropout -> linear.
    """

    def __init__(self, in_dim, hid_dim, out_dim, dropout=0):
        super().__init__()
        print(f'Logistic Regression classifier of dim ({in_dim} {hid_dim} {out_dim})')

        # The layer order fixes the Sequential indices, and therefore the
        # state-dict keys (nn.1.*, nn.4.*), so it must stay as it is.
        layers = [
            nn.Dropout(p=dropout),
            nn.Linear(in_dim, hid_dim, bias=True),
            nn.LeakyReLU(negative_slope=0.2, inplace=True),
            nn.Dropout(p=dropout),
            nn.Linear(hid_dim, out_dim, bias=True),
        ]
        self.nn = nn.Sequential(*layers)

    def forward(self, x, return_feat=False):
        """Return the logits for ``x``; with ``return_feat`` also return ``x``."""
        logits = self.nn(x)
        return (logits, x) if return_feat else logits


# BertClassifier class from the image
class BertClassifier(nn.Module):
    """Encoder followed by a classification head fed with all token states."""

    FEAT_LEN = 768

    def __init__(self, raw_bert, classifier):
        super().__init__()
        self.bert = raw_bert
        self.fc = classifier

    def forward(self, input_ids, attention_mask=None):
        """Return ``(logits, features)`` for a batch of token ids.

        ``features`` is the encoder's ``last_hidden_state`` with every
        dimension after the batch axis flattened together, i.e. the states of
        all token positions, not only [CLS].
        """
        encoded = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        flat_states = encoded.last_hidden_state.flatten(1)
        return self.fc(flat_states), flat_states

# Load the pre-trained BERT backbone and its matching tokenizer (loaded once;
# the later duplicate load of the same tokenizer was redundant).
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')
extractor = BertModel.from_pretrained('bert-base-cased')
ngpus, dropout = torch.cuda.device_count(), 0.35
num_tokens, hidden_dim, out_dim = 256, 512, 100  # Adjust out_dim as necessary

# The head consumes the flattened hidden states of all `num_tokens` positions,
# hence the 768 * num_tokens input width.
classifier = LogisticRegression(768 * num_tokens, hidden_dim, out_dim, dropout=dropout)
model = BertClassifier(extractor, classifier)

# Load the fine-tuned weights. map_location remaps tensors saved from a GPU so
# the checkpoint also loads when `device` has fallen back to the CPU.
model_path = '/content/drive/MyDrive/msc_project/model/contrastive/contrax/exp_data/diffusiondb100_cls_para_bert-base-cased_coe0.0_temp0.1_unit2_epoch30/diffusiondb100_cls_para_val0.72073_e24.pt'
model.load_state_dict(torch.load(model_path, map_location=device))

model.to(device)
model.eval()  # disable dropout for deterministic predictions/attributions

# Define the forward function for prediction
def predict(inputs, attention_mask=None):
    """Run the classifier and return only its logits; the features are dropped."""
    return model(inputs, attention_mask=attention_mask)[0]

# Forward function for classification used by LayerIntegratedGradients
def classify_forward_func(inputs, attention_mask=None):
    """Forward callable handed to LayerIntegratedGradients; delegates to ``predict``."""
    return predict(inputs, attention_mask=attention_mask)

# Token ids used when building the attribution baseline: the pad id serves as
# the reference token, and the [SEP]/[CLS] ids identify the special tokens.
ref_token_id = tokenizer.pad_token_id
sep_token_id = tokenizer.sep_token_id
cls_token_id = tokenizer.cls_token_id

# def construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id):
#     text_ids = tokenizer.encode(text, add_special_tokens=False)
#     input_ids = [cls_token_id] + text_ids + [sep_token_id]
#     ref_input_ids = [cls_token_id] + [ref_token_id] * len(text_ids) + [sep_token_id]
#     return torch.tensor([input_ids], device=device), torch.tensor([ref_input_ids], device=device)

# def construct_attention_mask(input_ids):
#     return torch.ones_like(input_ids)

def construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id, max_length=256):
    """Tokenize ``text`` and build the matching baseline for Integrated Gradients.

    Args:
        text: Raw string to encode.
        ref_token_id: Id written into every non-special position of the baseline.
        sep_token_id: Id of the [SEP] token.
        cls_token_id: Id of the [CLS] token.
        max_length: Length every sequence is padded/truncated to.

    Returns:
        ``(input_ids, ref_input_ids, attention_mask)``, each of shape
        ``(1, max_length)`` and placed on ``device``.
    """
    encoded = tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=max_length,
        padding='max_length',  # Pad to max length
        truncation=True,
        return_tensors="pt",
        return_attention_mask=True
    )

    input_ids = encoded['input_ids'].to(device)
    attention_mask = encoded['attention_mask'].to(device)

    # Keep [CLS]/[SEP] exactly where the tokenizer put them and replace every
    # other position with the reference token. Writing [SEP] at index -1 would
    # misplace it whenever the text is shorter than max_length, because the
    # real [SEP] sits right after the last word piece, before the padding.
    is_special = (input_ids == cls_token_id) | (input_ids == sep_token_id)
    ref_input_ids = torch.where(is_special, input_ids, torch.full_like(input_ids, ref_token_id))

    return input_ids, ref_input_ids, attention_mask


# Load the test dataframe
nlp_test = pd.read_csv('/content/drive/MyDrive/msc_project/data/diffusiondb/paraphrased/test_random100_label_1.csv')
nlp_test = nlp_test[['prompt', 'user_name']]
nlp_test.columns = ['content', 'Target']
# encoder_path = '/content/drive/MyDrive/label_encoder_80.pkl'
# label_encoder = joblib.load(encoder_path)


from collections import defaultdict

# Dictionary to hold attribution scores per author
# Structure: {author_name: {token: [list_of_attributions]}}
author_attributions = defaultdict(lambda: defaultdict(list))
delta_all = []  # one convergence delta (numpy array) per processed row
attributions_sum_all = []  # one list of per-token scores (padding removed) per processed row


def summarize_attributions(attributions):
    """Reduce embedding-level attributions to one normalised score per token.

    Sums over the last dimension, squeezes the leading batch dimension and
    divides by the L2 norm of the resulting vector.
    """
    attributions = attributions.sum(dim=-1).squeeze(0)
    norm = torch.norm(attributions)
    # An all-zero attribution vector would otherwise become NaN (0 / 0) and
    # poison the per-token averages computed from these scores.
    if norm == 0:
        return attributions
    return attributions / norm


def remove_pad_tokens(tokens, attributions, pad_token_id):
    """Drop padding positions from parallel token / attribution sequences.

    Args:
        tokens: Token strings for one sequence.
        attributions: Scores aligned with ``tokens``.
        pad_token_id: Vocabulary id of the padding token to filter out.

    Returns:
        ``(filtered_tokens, filtered_attributions)`` as two lists.
    """
    # Tokens are strings, so translate the id into its string form once.
    pad_token = tokenizer.convert_ids_to_tokens(pad_token_id)
    kept = [(token, attr) for token, attr in zip(tokens, attributions) if token != pad_token]
    filtered_tokens = [token for token, _ in kept]
    filtered_attributions = [attr for _, attr in kept]
    return filtered_tokens, filtered_attributions


# Layer Integrated Gradients with respect to the BERT embeddings.
# The explainer is stateless between calls, so build it once instead of per row.
lig = LayerIntegratedGradients(classify_forward_func, model.bert.embeddings)

# Run Layer Integrated Gradients on every sample and accumulate attributions
for idx, row in tqdm(nlp_test.iterrows(), total=len(nlp_test)):
    text = row['content']
    author = row['Target']  # 'Target' corresponds to the author

    input_ids, ref_input_ids, attention_mask = construct_input_ref_pair(text, ref_token_id, sep_token_id, cls_token_id)

    # Explain the class the model actually predicts; this pass needs no gradients.
    with torch.no_grad():
        logits = predict(input_ids, attention_mask=attention_mask)

    # Target label index
    target_label_idx = torch.argmax(logits).item()

    # Compute attributions with respect to the BERT embeddings
    attributions, delta = lig.attribute(
        inputs=input_ids.long(),
        baselines=ref_input_ids,
        additional_forward_args=(attention_mask,),
        target=target_label_idx,
        return_convergence_delta=True
    )

    attributions_sum = summarize_attributions(attributions)

    # Move delta to CPU and detach it
    delta_cpu = delta.cpu().detach().numpy()
    delta_all.append(delta_cpu)

    # Convert token ids to their string form
    indices = input_ids[0].detach().tolist()
    all_tokens = tokenizer.convert_ids_to_tokens(indices)

    # .tolist() already yields plain Python floats, so no further conversion is needed.
    filtered_tokens, filtered_attributions = remove_pad_tokens(all_tokens, attributions_sum.tolist(), tokenizer.pad_token_id)
    attributions_sum_all.append(filtered_attributions)

    # Accumulate attribution scores for each token for the current author
    for token, attribution in zip(filtered_tokens, filtered_attributions):
        author_attributions[author][token].append(attribution)

# Per-author ranking: {author: [(token, mean_attribution), ...]}, highest mean first
author_avg_attributions_sorted = {}

for author, token_attributions in author_attributions.items():
    # Mean attribution of every token recorded for this author
    author_avg_attributions = {
        token: np.mean(scores) for token, scores in token_attributions.items()
    }

    # Rank the tokens by their mean attribution, largest first
    author_avg_attributions_sorted[author] = sorted(
        author_avg_attributions.items(), key=lambda pair: pair[1], reverse=True
    )

# Example output: print the sorted average attributions for each author
for author, sorted_tokens in author_avg_attributions_sorted.items():
    print(f"\nAuthor: {author}")
    for token, avg_attribution in sorted_tokens:
        print(f"Token: {token}, Average Attribution: {avg_attribution}")


In [None]:
# Number of rows for which attributions were actually collected; slicing the
# texts to this length keeps the columns aligned even after a partial run.
n_rows = len(attributions_sum_all)

data = {
    'delta': delta_all,
    'attributions_sum': attributions_sum_all,
    # Keep the source text next to its scores: calculate_average_attributions_by_pos
    # reads row['content'] from this frame and fails with KeyError without it.
    'content': nlp_test['content'].tolist()[:n_rows],
}

# Convert the dictionary into a DataFrame
df = pd.DataFrame(data)

# Display the DataFrame (optional)
print(df)

# Save the DataFrame to a CSV file
df.to_csv('output.csv', index=False)

In [None]:
!pip install spacy
!python -m spacy download en_core_web_sm


In [None]:
import spacy
from transformers import BertTokenizer

# Load the spaCy pipeline; `nlp` is the module-level tagger that
# get_pos_tags and map_pos_to_bert_tokens call.
nlp = spacy.load("en_core_web_sm")

# Load BERT tokenizer.
# NOTE(review): this rebinds the notebook-wide `tokenizer` name — confirm
# 'bert-base-cased' matches the tokenizer the attributions were computed with.
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

def get_pos_tags(text):
    """Return ``(token_text, pos_tag)`` pairs for every spaCy token in ``text``.

    Tagging is done on the original, untokenized string with the
    module-level ``nlp`` pipeline.
    """
    tagged = []
    for spacy_token in nlp(text):
        tagged.append((spacy_token.text, spacy_token.pos_))
    return tagged

# Example text used to sanity-check the POS tagger
text = "The quick brown fox jumps over the lazy dog."

# Get POS tags for the original sentence (list of (word, tag) pairs)
pos_tags = get_pos_tags(text)
print("Original POS tags:", pos_tags)


def map_pos_to_bert_tokens(text, tokenizer):
    """Pair every BERT word-piece of ``text`` with a spaCy POS tag.

    The alignment is positional: each piece that starts a new word (no
    ``##`` prefix) consumes the next spaCy tag, and ``##`` continuation
    pieces inherit the tag of the word they belong to. The result always has
    exactly one entry per BERT token, in tokenizer order.

    When the BERT tokenizer produces more word starts than spaCy produced
    tokens, the surplus pieces are tagged ``"X"`` instead of being dropped.
    NOTE(review): because the alignment is by position only, tags drift if
    the two tokenizers split the text at different places — confirm this is
    acceptable for the data being analysed.

    Args:
        text: Raw input string.
        tokenizer: Object exposing ``tokenize(text)`` that returns word-pieces.

    Returns:
        List of ``(bert_token, pos_tag)`` tuples.
    """
    # Get POS tags using spaCy
    pos_tags = [token.pos_ for token in nlp(text)]

    # Tokenize using BERT
    bert_tokens = tokenizer.tokenize(text)

    mapped_tokens = []
    pos_pointer = 0
    # Tag of the word currently being emitted; "X" until a word start is seen
    # so that a leading "##" piece cannot wrap around to the last tag.
    current_pos = "X"

    for token in bert_tokens:
        if not token.startswith("##"):
            if pos_pointer < len(pos_tags):
                # Start of a new word: take the next spaCy tag
                current_pos = pos_tags[pos_pointer]
                pos_pointer += 1
            else:
                # spaCy ran out of tokens; keep the piece, mark the tag unknown
                current_pos = "X"
        # Continuation pieces fall through and reuse the current word's tag
        mapped_tokens.append((token, current_pos))

    return mapped_tokens

# Example sentence used to sanity-check the token/POS alignment
text = "The quick brown fox jumps over the lazy dog."

# Get BERT tokens and their corresponding POS tags (list of (token, tag) pairs)
bert_pos_mapping = map_pos_to_bert_tokens(text, tokenizer)
print("BERT tokens with POS tags:", bert_pos_mapping)


# Assuming you have a list of BERT tokens and attributions
def analyze_pos_attributions(text, attributions, tokenizer):
    """Print each BERT token of ``text`` with its POS tag and attribution.

    ``attributions`` is paired with the token/POS mapping position by
    position via ``zip``, so the shorter of the two determines how many
    lines are printed.
    """
    token_pos_pairs = map_pos_to_bert_tokens(text, tokenizer)

    for pair, attribution in zip(token_pos_pairs, attributions):
        token, pos = pair
        print(f"Token: {token}, POS: {pos}, Attribution: {attribution}")

# Example: Assuming you have a text and the attributions from your model
text = "The quick brown fox jumps over the lazy dog."
# Hand-written demo scores, not model output.
# NOTE(review): nine scores are given; if the sentence yields more BERT pieces
# (e.g. one for the final "."), zip() in analyze_pos_attributions silently
# skips the pieces without a score — confirm the intended length.
attributions = [0.1, 0.05, 0.03, 0.12, 0.5, 0.2, 0.1, 0.07, 0.08]
analyze_pos_attributions(text, attributions, tokenizer)


In [None]:
import pandas as pd
import numpy as np
import spacy
from transformers import BertTokenizer

# Load spaCy model for POS tagging; `nlp` is read as a module-level name by
# get_pos_tags and map_pos_to_bert_tokens.
nlp = spacy.load("en_core_web_sm")

# Load BERT tokenizer.
# NOTE(review): rebinds the notebook-wide `tokenizer`; confirm 'bert-base-cased'
# is the tokenizer the stored attributions were produced with.
tokenizer = BertTokenizer.from_pretrained('bert-base-cased')

# Function to get POS tags from original text using spaCy
def get_pos_tags(text):
    """Tag ``text`` with spaCy and return a list of ``(word, pos_tag)`` tuples."""
    parsed = nlp(text)
    return [(word.text, word.pos_) for word in parsed]

# Function to map POS tags to BERT tokens
def map_pos_to_bert_tokens(text, tokenizer):
    """Pair every BERT word-piece of ``text`` with a spaCy POS tag.

    The alignment is positional: each piece that starts a new word (no
    ``##`` prefix) consumes the next spaCy tag, and ``##`` continuation
    pieces inherit the tag of the word they belong to. The result always has
    exactly one entry per BERT token, so it can be zipped against per-token
    scores without losing the tail of the sequence.

    When the BERT tokenizer produces more word starts than spaCy produced
    tokens, the surplus pieces are tagged ``"X"`` instead of being dropped.
    NOTE(review): because the alignment is by position only, tags drift if
    the two tokenizers split the text at different places — confirm this is
    acceptable for the data being analysed.

    Args:
        text: Raw input string.
        tokenizer: Object exposing ``tokenize(text)`` that returns word-pieces.

    Returns:
        List of ``(bert_token, pos_tag)`` tuples.
    """
    pos_tags = [token.pos_ for token in nlp(text)]

    # Tokenize using BERT
    bert_tokens = tokenizer.tokenize(text)

    mapped_tokens = []
    pos_pointer = 0
    # Tag of the word currently being emitted; "X" until a word start is seen
    # so that a leading "##" piece cannot wrap around to the last tag.
    current_pos = "X"

    for token in bert_tokens:
        if not token.startswith("##"):
            if pos_pointer < len(pos_tags):
                # Start of a new word: take the next spaCy tag
                current_pos = pos_tags[pos_pointer]
                pos_pointer += 1
            else:
                # spaCy ran out of tokens; keep the piece, mark the tag unknown
                current_pos = "X"
        # Continuation pieces fall through and reuse the current word's tag
        mapped_tokens.append((token, current_pos))

    return mapped_tokens

# Initialize dictionary to store attributions by POS tag
def calculate_average_attributions_by_pos(df):
    """Average per-token attribution scores grouped by POS tag.

    For every row, the text in ``content`` is mapped to ``(token, pos)``
    pairs with ``map_pos_to_bert_tokens`` and paired with the scores in
    ``attributions_sum``. Rows whose score count differs from the token
    count are reported and skipped.

    NOTE(review): score lists that still contain entries for [CLS]/[SEP]
    will not match a mapping built from ``tokenizer.tokenize`` — confirm how
    the ``attributions_sum`` column was produced.

    Args:
        df: DataFrame with a ``content`` column (text) and an
            ``attributions_sum`` column holding either a list of numbers or
            its string representation (as read back from a CSV file).

    Returns:
        Dict mapping each POS tag to the mean attribution of its tokens;
        empty when no row could be used.
    """
    import ast  # local import so the function does not depend on cell-level imports

    pos_attributions = {}

    # Loop over the rows of the dataframe
    for idx, row in df.iterrows():
        text = row['content']
        attributions_sum = row['attributions_sum']
        if isinstance(attributions_sum, str):
            # SECURITY: this used to be eval(), which executes arbitrary code
            # found in the cell and also fails on values that are already
            # lists. literal_eval only accepts Python literals.
            attributions_sum = ast.literal_eval(attributions_sum)
        print(len(attributions_sum))

        # Get BERT tokens and POS tags
        bert_pos_mapping = map_pos_to_bert_tokens(text, tokenizer)
        print(len(bert_pos_mapping))

        # Ensure we have the same length for BERT tokens and attributions
        if len(bert_pos_mapping) != len(attributions_sum):
            print(f"Warning: Mismatch in token count and attributions at row {idx}")
            continue

        # Accumulate attribution scores by POS tags
        for (token, pos), attribution in zip(bert_pos_mapping, attributions_sum):
            stats = pos_attributions.setdefault(pos, {'total_attr': 0, 'count': 0})
            stats['total_attr'] += attribution
            stats['count'] += 1

    # Calculate average attribution score for each POS tag
    # (count is at least 1 for every tag that made it into the dict)
    return {
        pos: stats['total_attr'] / stats['count']
        for pos, stats in pos_attributions.items()
    }

# Now apply the function to calculate average attributions for each POS tag.
# `df` must provide the 'content' and 'attributions_sum' columns that
# calculate_average_attributions_by_pos reads.
average_attributions_by_pos = calculate_average_attributions_by_pos(df)

# Print the results, one "TAG: mean" line per POS tag
print("Average attribution scores by POS tags:")
for pos, avg_attr in average_attributions_by_pos.items():
    print(f"{pos}: {avg_attr}")


In [None]:
import torch
import torch.nn.functional as F
from transformers import BertTokenizer, BertModel
from captum.attr import IntegratedGradients
import pandas as pd

# Check if GPU is available
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Load your trained content encoder and tokenizer
# NOTE(review): 'your_text_encoder' looks like a placeholder path — replace it
# with the real checkpoint, and confirm 'bert-base-uncased' is the tokenizer
# that checkpoint was trained with.
text_encoder = BertModel.from_pretrained('your_text_encoder').to(device)
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Load precomputed author embeddings (from training) onto the active device.
# map_location is required for the CPU fallback: without it, embeddings that
# were saved from a GPU fail to deserialize on a machine without CUDA.
author_embeddings = torch.load('author_embeddings.pth', map_location=device).to(device)

# Switch model to evaluation mode
text_encoder.eval()

# Function to compute text embeddings using your encoder
def get_text_embedding(inputs):
    """Encode a tokenized batch and mean-pool the encoder's last hidden state.

    ``inputs`` is a mapping of tensors; every value is moved to ``device``
    and the mapping is passed to ``text_encoder`` as keyword arguments. The
    last hidden state is averaged over dimension 1.
    """
    on_device = {name: tensor.to(device) for name, tensor in inputs.items()}
    last_hidden = text_encoder(**on_device).last_hidden_state
    return last_hidden.mean(dim=1)  # Averaging token embeddings

# Cosine similarity function
def cosine_similarity(text_embedding, author_id):
    """Return the cosine similarity between ``text_embedding`` and one author.

    The author vector is looked up in the module-level ``author_embeddings``
    by ``author_id`` and given a leading dimension with ``unsqueeze(0)``
    before the comparison.
    """
    reference = author_embeddings[author_id].unsqueeze(0)
    return F.cosine_similarity(text_embedding, reference)

# Forward function for computing cosine similarity
def forward_func(inputs, author_id):
    """Embed ``inputs`` and score the embedding against author ``author_id``.

    Composition of ``get_text_embedding`` and ``cosine_similarity``.
    """
    return cosine_similarity(get_text_embedding(inputs), author_id)

# Integrated Gradients setup
# Token ids are integers and cannot carry gradients, so the attribution is
# taken with respect to the output of the encoder's embedding layer instead
# of the raw ids.
from captum.attr import LayerIntegratedGradients


def _similarity_from_ids(input_ids, attention_mask, author_id):
    """Adapter for Captum: rebuild the dict that forward_func expects."""
    return forward_func({'input_ids': input_ids, 'attention_mask': attention_mask}, author_id)


ig = LayerIntegratedGradients(_similarity_from_ids, text_encoder.embeddings)

# Example test set with ground truth authors
test_set = [
    {"text": "Sample sentence 1.", "author_id": 0},
    {"text": "Sample sentence 2.", "author_id": 1},
    # Add more samples from your test set
]

# Store the results in a list of dictionaries
results = []

for sample in test_set:
    text = sample["text"]
    author_id = sample["author_id"]

    # Tokenize the input text
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True)

    # Move inputs to GPU
    inputs = {k: v.to(device) for k, v in inputs.items()}
    input_ids = inputs['input_ids']
    attention_mask = inputs['attention_mask']

    # Baseline: same length as the input, [CLS]/[SEP] kept in place and every
    # other position replaced by the padding token.
    is_special = (input_ids == tokenizer.cls_token_id) | (input_ids == tokenizer.sep_token_id)
    baseline_ids = torch.where(
        is_special,
        input_ids,
        torch.full_like(input_ids, tokenizer.pad_token_id),
    )

    # Compute attributions. The forward function returns one similarity value
    # per example, so no `target` index is given; the author is passed
    # explicitly rather than read from a global at call time.
    attributions = ig.attribute(
        inputs=input_ids,
        baselines=baseline_ids,
        additional_forward_args=(attention_mask, author_id),
    )

    # Collapse the embedding dimension to one score per token and move the
    # result back to CPU for processing
    attributions = attributions.sum(dim=-1).cpu()

    # Convert input IDs to tokens
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0].cpu().tolist())

    # Store the tokens and their corresponding attributions
    attributions_list = attributions.detach().numpy().tolist()[0]

    # Append the results
    results.append({
        "text": text,
        "author_id": author_id,
        "tokens": tokens,
        "attributions": attributions_list
    })

# Convert the results to a DataFrame for better visualization
df = pd.DataFrame(results)
# `tools` was never defined or imported in this notebook, so the previous
# display call raised NameError; print the frame instead.
print(df)

# You can also save it to a CSV file
df.to_csv('integrated_gradients_test_set_stage2.csv', index=False)
