In [9]:
from datasets import Dataset
import torch
# train_df and test_df must already be loaded as pandas DataFrames before this cell runs.
# Dataset.from_pandas is used instead of wrapping pa.Table.from_pandas(...) by hand:
# pyarrow (`pa`) is only imported in a later cell, so the manual form fails on a fresh kernel.
train_dataset = Dataset.from_pandas(train_df)
test_dataset = Dataset.from_pandas(test_df)

In [None]:
# Load the previously saved model object from disk.
# NOTE(review): torch.load unpickles the file, which can execute arbitrary code;
# only load checkpoints that come from a trusted source.
# NOTE(review): the attribution loop further down passes `m`, not `model`, to
# return_prediction; confirm which object is intended.
model=torch.load('used_all_setfit_model.pt')

# The code below runs integrated gradients.


In [10]:
# !pip install -r requirements.txt
import sys
import pandas as pd

from sklearn.metrics import roc_auc_score

from datasets import load_dataset,Dataset
from sentence_transformers.losses import CosineSimilarityLoss
from sentence_transformers.SentenceTransformer import SentenceTransformer
from setfit import SetFitModel, SetFitTrainer
from tqdm.auto import tqdm

from setfit_ig.html_text_colorizer import WordImportanceColorsSetFit
from setfit_ig.integrated_gradients import integrated_gradients_on_text
from setfit_ig.model_head import SklearnToPyTorchLogisticRegression

from setfit_ig.setfit_extensions import SetFitGrad, SetFitModelWithTorchHead

from IPython.display import HTML, display

from sklearn.model_selection import train_test_split
import numpy as np
import torch
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
import pyarrow as pa

## SetFitGrad

In [11]:
import copy
from collections import OrderedDict, namedtuple

import torch
from sentence_transformers.SentenceTransformer import SentenceTransformer
from setfit import SetFitModel
from torch.autograd import grad
from torch.nn import Sequential



class SklearnToPyTorchLogisticRegression(torch.nn.Module):
    """
    PyTorch copy of a trained sklearn LogisticRegression model.

    The sklearn ``coef_`` array (n_classes x n_features) and ``intercept_``
    vector are copied into a single ``torch.nn.Linear`` layer, and a softmax
    over the last dimension turns the layer output into class probabilities.

    Args:
        sklearn_model: object exposing ``coef_`` and ``intercept_`` arrays.
        device: device the linear layer is moved to. Defaults to CUDA when it
            is available and to CPU otherwise.

    NOTE(review): for a binary sklearn model ``coef_`` has a single row, so the
    softmax is taken over one value and is always 1.0; this wrapper is only
    meaningful when ``coef_`` has one row per class.
    """

    def __init__(self, sklearn_model, device=None):
        super().__init__()

        # sklearn stores coef_ as (n_classes, n_features), which is exactly the
        # (out_features, in_features) layout of torch.nn.Linear.weight, so no
        # transpose is needed.
        coef = sklearn_model.coef_
        intercept = sklearn_model.intercept_

        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        # Layer size is derived from the fitted model rather than hard-coded.
        self.linear = torch.nn.Linear(coef.shape[1], coef.shape[0])
        with torch.no_grad():
            # copy_ converts sklearn's float64 values to the layer's dtype.
            self.linear.weight.copy_(torch.as_tensor(coef))
            self.linear.bias.copy_(torch.as_tensor(intercept))
        self.linear = self.linear.to(device)

    def forward(self, x):
        """Return class probabilities (softmax over the last dimension)."""
        out = self.linear(x)
        return torch.softmax(out, dim=-1)

    def predict(self, x):
        """
        Return the probabilities rounded to 0/1 integers, with size-1
        dimensions squeezed away.

        NOTE(review): this is a rounded probability vector, not a class index;
        it can be all zeros when no class exceeds 0.5.
        """
        y_pred = self.forward(x).round().squeeze().int()
        return y_pred

    def predict_proba(self, x):
        """Return the probabilities of every class, with size-1 dimensions squeezed away."""
        return self.forward(x).squeeze()



class SetFitGrad:
    """
    Wraps a SetFit model and splits its forward pass into stages so that
    gradients can be taken with respect to the token embeddings.

    Instead of passing a sentence and only getting the head's probabilities, a
    token-embedding tensor can be passed directly, and the gradient of the head
    output with respect to every token-embedding dimension is returned with it.

    The wrapped body must expose ``_modules["0"]._modules["auto_model"]`` with
    ``embeddings`` and ``encoder`` sub-modules; every other module of the body
    is chained after the encoder unchanged.

    NOTE(review): by default the gradient is taken of the *sum* of everything
    ``model_head.predict_proba`` returns (``grad_outputs`` is all ones). That
    is the intended quantity for a head that returns one probability per
    input. For a head that returns a full softmax distribution the sum is
    constant, so the gradient carries no signal; pass ``target_class`` to
    ``model_pass`` to differentiate a single class instead.
    """

    # Result record returned by model_pass; built once instead of on every call.
    _Output = namedtuple(
        "SetFitGrad",
        [
            "positive_class_probability",
            "token_embedding_gradients",
            "sentence_token_embedding",
            "attention_mask",
            "input_ids",
            "sentence_embedding",
        ],
    )

    def __init__(self, model: "SetFitModel", tokenizer=None, device=None):
        """
        Args:
            model: object exposing ``model_body`` and ``model_head``.
            tokenizer: optional callable mapping a sentence to tokenizer
                output; defaults to the body's tokenizer with padding,
                truncation and PyTorch tensors.
            device: optional device for tokenizer output and attention masks;
                defaults to the body's device.
        """
        self.model_body = model.model_body
        self.model_head = model.model_head

        # Compare against None: a plain truthiness test would silently discard
        # an explicit device index of 0.
        if device is not None:
            self.device = device
        else:
            self.device = self.model_body.device

        if tokenizer is not None:
            self.tokenizer = tokenizer
        else:
            self.tokenizer = lambda x: self.model_body.tokenizer(
                x, padding=True, truncation=True, return_tensors="pt"
            ).to(self.device)

        transformer = self.model_body._modules["0"]._modules["auto_model"]
        self.embedder = transformer._modules["embeddings"]
        self.encoder = transformer._modules["encoder"]

        # Everything after the transformer (module "0") runs as-is on the
        # feature dict built in model_pass.
        rest_of_processing = OrderedDict(
            {
                key: value
                for key, value in self.model_body._modules.items()
                if key != "0"
            }
        )
        self.rest_of_processing = Sequential(rest_of_processing)

    def model_pass(
        self,
        sentence_string: str = None,
        sentence_token_embedding=None,
        target_class: int = None,
    ):
        """
        Run the model and return probabilities plus token-embedding gradients.

        Args:
            sentence_string: raw sentence; used only when
                ``sentence_token_embedding`` is None.
            sentence_token_embedding: token embeddings to feed to the encoder
                directly. It must be part of an autograd graph, because the
                gradient is taken with respect to it.
            target_class: optional index into the last dimension of the head
                output. When given, only that entry is differentiated; when
                None, the sum of all head outputs is differentiated.

        Returns:
            A named tuple with fields ``positive_class_probability`` (the
            value returned by ``model_head.predict_proba``),
            ``token_embedding_gradients``, ``sentence_token_embedding``,
            ``attention_mask``, ``input_ids`` (None when embeddings were
            passed in) and ``sentence_embedding``.
        """
        if sentence_token_embedding is None:
            with torch.no_grad():
                sentence = self.tokenizer(sentence_string).to(device=self.device)
                sentence_token_embedding = self.embedder(
                    input_ids=sentence["input_ids"],
                    # token_type_ids=sentence["token_type_ids"],
                )

            attention_mask = sentence["attention_mask"]

            # The embeddings were created under no_grad, so they are a leaf
            # tensor; turn gradient tracking on explicitly.
            sentence_token_embedding.requires_grad = True
            input_ids = sentence["input_ids"]
        else:
            input_ids = None
            # Caller-supplied embeddings have no padding: attend to every
            # position along dimension 1.
            attention_mask_dim = sentence_token_embedding.shape[1:2]
            attention_mask = torch.ones(attention_mask_dim, device=self.device)

        encoder_output = self.encoder(
            sentence_token_embedding, attention_mask=attention_mask
        )

        features = {}
        features["token_embeddings"] = encoder_output[0]
        features["attention_mask"] = attention_mask
        features["input_ids"] = input_ids

        results = self.rest_of_processing(features)

        positive_class_probability = self.model_head.predict_proba(
            results["sentence_embedding"]
        )

        if target_class is None:
            grad_target = positive_class_probability
        else:
            grad_target = positive_class_probability[..., target_class]

        token_embedding_gradients = grad(
            outputs=grad_target,
            inputs=sentence_token_embedding,
            grad_outputs=torch.ones_like(grad_target),
            retain_graph=True,
        )[0].squeeze()

        return self._Output(
            positive_class_probability,
            token_embedding_gradients,
            sentence_token_embedding,
            attention_mask,
            input_ids,
            results["sentence_embedding"],
        )

## IG

In [12]:
from typing import List, Tuple

import numpy
import pandas as pd
import torch
from scipy.special import roots_legendre
from tqdm import tqdm


def integrated_gradients_on_text(
    sentence_string: str, grd: SetFitGrad, integration_steps: int = 100
):
    """
    Integrated-gradients attribution for one sentence.

    The baseline is an all-zero embedding, except for the first and last
    positions, which keep the sentence's own embeddings (they are assumed to
    be the special start/end tokens).

    Args:
        sentence_string: the sentence to explain.
        grd: SetFitGrad wrapper around the model.
        integration_steps: number of quadrature nodes used for the integral.

    Returns:
        A tuple ``(df_word_to_score, prob, grad_per_integration_step)``:
        a DataFrame with columns ``words`` (tokens without special tokens) and
        ``score`` (their attribution), the model output for the sentence, and
        the weighted gradients per integration step. Positive scores mark
        tokens that increase the differentiated output, negative scores the
        opposite.
    """
    device = grd.model_body.device

    prob, _, target_embed, _, input_ids, _ = grd.model_pass(
        sentence_string=sentence_string
    )

    # TODO encode an empty sentence instead of passing zeros.
    init_embed = torch.zeros_like(target_embed, device=device)

    # don't zero out [CLS] and [SEP] tokens
    init_embed[0, 0, :] = target_embed[0, 0, :]
    init_embed[0, -1, :] = target_embed[0, -1, :]

    (
        final_scores,
        grad_per_integration_step,
    ) = calculate_integrated_gradient_scores(
        grd, integration_steps, init_embed, target_embed
    )

    scores = pd.DataFrame(
        {
            "token": grd.model_body.tokenizer.convert_ids_to_tokens(
                input_ids.squeeze()
            ),
            "token_ids": input_ids.squeeze().to("cpu"),
            "attribution_score": final_scores,
        }
    )
    attribution = scores["attribution_score"]

    # Record every position at which each token id was scored. Looking scores
    # up by token id alone would return *all* occurrences of a repeated token,
    # and summing them would credit each occurrence with the total.
    positions_by_id = {}
    for position, token_id in enumerate(scores["token_ids"].tolist()):
        positions_by_id.setdefault(token_id, []).append(position)

    word_to_ids = construct_word_to_id_mapping(sentence_string, grd)

    occurrences_seen = {}
    word_to_score = []
    for word, token_id in word_to_ids:
        occurrence = occurrences_seen.get(token_id, 0)
        occurrences_seen[token_id] = occurrence + 1

        positions = positions_by_id.get(token_id, [])
        if occurrence >= len(positions):
            # This token was not scored (for example it was cut off by
            # tokenizer truncation), so there is nothing to report for it.
            continue
        word_to_score.append((word, attribution.iloc[positions[occurrence]]))

    # Explicit column names keep the frame well-formed even when it is empty.
    df_word_to_score = pd.DataFrame(
        word_to_score, columns=["words", "score"]
    ).dropna()

    return df_word_to_score, prob, grad_per_integration_step


def calculate_integrated_gradient_scores(
    grd: SetFitGrad,
    num_of_integration_steps: int,
    init_embed: torch.Tensor,
    target_embed: torch.Tensor,
    max_alpha: float = 1.0,
):
    """
    Approximate the integrated-gradients integral with Gauss-Legendre quadrature.

    Args:
        grd: SetFitGrad wrapper; its ``model_pass`` supplies the gradients.
        num_of_integration_steps: number of quadrature nodes.
        init_embed: baseline embedding, 1 x number of tokens x embedding size.
        target_embed: sentence embedding, 1 x number of tokens x embedding size.
        max_alpha: upper end of the interpolation interval [0, max_alpha].

    Returns:
        ``(final_scores, weighted_grads_per_integration_step)``: the per-token
        scores (integrated values averaged over the embedding dimension) and
        the weighted gradient terms before they are summed over the nodes.
    """
    # Quadrature nodes and weights, defined on (-1, 1).
    nodes, quad_weights = roots_legendre(num_of_integration_steps)

    # Move the nodes from (-1, 1) onto (0, max_alpha) ...
    alphas = torch.tensor(
        numpy.interp(nodes, (-1, 1), (0, max_alpha)),
        device=grd.model_body.device,
    )
    # ... and rescale the weights for the new interval length.
    quad_weights = torch.tensor(quad_weights, device="cpu") * max_alpha / 2.0

    # One interpolated embedding per node: init + alpha * (target - init).
    scaled_delta = torch.einsum(
        "bp,bqr->bpqr", alphas[None, :], target_embed - init_embed
    ).squeeze()
    interpolated = (scaled_delta + init_embed).type(torch.float32)

    # Field 1 of the model_pass result holds the token-embedding gradients.
    step_gradients = grd.model_pass(sentence_token_embedding=interpolated)[1].cpu()

    delta_cpu = (target_embed - init_embed).cpu().detach()

    weighted_grads_per_integration_step = (
        step_gradients
        * delta_cpu[:, None, None, :]
        * quad_weights[None, :, None, None]
    )

    # Sum over the nodes: number of tokens x embedding dim.
    per_dimension_integrals = weighted_grads_per_integration_step.squeeze().sum(
        axis=0
    )

    token_scores = per_dimension_integrals.mean(axis=1)
    return token_scores, weighted_grads_per_integration_step


def construct_word_to_id_mapping(sentence_string, grd) -> List[Tuple[str, int]]:
    """
    Pair every token of the sentence with its vocabulary id.

    Args:
        sentence_string: the sentence to tokenize.
        grd: object whose ``model_body.tokenizer`` provides ``tokenize`` and
            ``encode``.

    Returns:
        A list of ``(token, token_id)`` pairs in sentence order. Special
        tokens are not included (``add_special_tokens=False``).
    """
    tokenizer = grd.model_body.tokenizer

    tokens = tokenizer.tokenize(sentence_string)
    token_ids = tokenizer.encode(sentence_string, add_special_tokens=False)

    return list(zip(tokens, token_ids))

## HTML

In [30]:
from typing import Tuple

import numpy
import pandas as pd
from matplotlib import cm
from matplotlib.colors import Normalize, rgb2hex


def hlstr(string: str, color="white") -> str:
    """
    Return HTML markup highlighting text with the desired background color.

    The text is HTML-escaped (``&``, ``<``, ``>``) so that tokens taken from
    arbitrary input text cannot break or inject markup when the result is
    rendered. Text without those characters is emitted unchanged.

    Args:
        string: the text to highlight.
        color: CSS color used as the span's background.

    Returns:
        A ``<span>`` element containing the text followed by one space.
    """
    import html

    safe_text = html.escape(str(string), quote=False)
    return f"<span style=background-color:{color}>{safe_text} </span>"


def colorize(attrs, cmap="PiYG"):
    """
    Compute hex colors for the attributions of a single instance.

    Attributions are bucketed into three fixed colormap positions:

    * ``0.77`` for positive values at or above the 75th percentile of the
      positive values,
    * ``0.6`` for the remaining positive values,
    * ``0.5`` for zero and negative values.

    NaN values are passed to the colormap unchanged. Inputs without any
    positive or without any negative values, and inputs containing zeros, are
    handled (every bucket is assigned from the original values, not from
    partially overwritten ones).

    Args:
        attrs: 1-D array-like of attribution scores.
        cmap: colormap name (or a colormap object).

    Returns:
        A list with one hex color string per attribution.
    """
    # TODO pass an option to have this absolute or relative colouring
    import matplotlib

    values = numpy.asarray(attrs, dtype=float)
    levels = values.copy()

    levels[values <= 0] = 0.5

    positive = values > 0
    if positive.any():
        # The threshold is a percentile of strictly positive values, so it is
        # positive itself and the ">=" mask can only select positive entries.
        strong_threshold = numpy.percentile(values[positive], 75)
        levels[positive] = 0.6
        levels[values >= strong_threshold] = 0.77

    norm = Normalize(vmin=0, vmax=1)

    # cm.get_cmap is deprecated in recent matplotlib releases; prefer the
    # colormap registry when it exists and fall back to the old call otherwise.
    registry = getattr(matplotlib, "colormaps", None)
    if isinstance(cmap, str) and registry is not None:
        colormap = registry[cmap]
    else:
        colormap = cm.get_cmap(cmap)

    # now compute hex values of colors
    return [rgb2hex(colormap(norm(level))) for level in levels]


class WordImportanceColorsSetFit:
    """
    Helper class to quickly colorize sentences based on SetFitGrad.

    TODO this class could probably be altered to work with other scores and
    other attribution methods.
    """

    # Byte-level BPE tokenizers put U+0120 in front of a token that starts a
    # new word. It is written as an escape so the literal cannot be corrupted
    # by a file-encoding round trip.
    WORD_START_MARKER = "\u0120"

    def __init__(self, scorer: SetFitGrad):
        self.scorer = scorer

    def show_colors_for_sentence(
        self, text: str, integration_steps: int = 100, cmap: str = "bwr"
    ) -> Tuple[str, pd.DataFrame, float, numpy.array]:
        """
        Score a sentence with integrated gradients and render it as HTML.

        Sub-word tokens are merged back into words: a token that starts with
        the word-start marker opens a new word, any other token is appended to
        the current word, and the scores of merged tokens are summed.

        Pass the first element of the result to IPython.display.HTML:

            from IPython.display import HTML
            HTML(colored_text)

        Returns:
            ``(colored_text, df_w2s, prob, grad_per_integration_step)`` where
            ``df_w2s`` has one row per merged word (columns ``words`` and
            ``score``) and ``prob`` is the detached model output.
        """
        df_w2s, prob, grad_per_integration_step = integrated_gradients_on_text(
            text, self.scorer, integration_steps=integration_steps
        )

        marker = self.WORD_START_MARKER
        merged_words = []
        merged_scores = []

        combined = ""
        total = 0
        has_tokens = False
        # zip pairs tokens and scores by position, so it does not depend on
        # the frame's index labels being 0..n-1.
        for word, score in zip(df_w2s['words'], df_w2s['score']):
            if word.startswith(marker):
                # Close the previous word, unless this is the very first token.
                if has_tokens:
                    merged_words.append(combined)
                    merged_scores.append(total)
                combined = word[len(marker):]
                total = score
            else:
                combined += word
                total += score
            has_tokens = True
        merged_words.append(combined)
        merged_scores.append(total)

        df_w2s = pd.DataFrame({"words": merged_words, "score": merged_scores})

        colors = colorize(df_w2s.score, cmap=cmap)
        return (
            " ".join(list(map(hlstr, merged_words, colors))),
            df_w2s,
            prob.detach(),
            grad_per_integration_step,
        )


In [14]:
class SklearnToPyTorchLogisticRegression(torch.nn.Module):
    """
    PyTorch copy of a trained sklearn LogisticRegression model.

    ``coef_`` (n_classes x n_features) and ``intercept_`` are copied into one
    ``torch.nn.Linear`` layer; a softmax over the last dimension turns the
    layer output into class probabilities.

    Args:
        sklearn_model: object exposing ``coef_`` and ``intercept_`` arrays.
        device: device the linear layer is moved to. Defaults to CUDA when it
            is available and to CPU otherwise, so the class also works on
            machines without a GPU.

    NOTE(review): for a binary sklearn model ``coef_`` has a single row, so the
    softmax is taken over one value and is always 1.0.
    """

    def __init__(self, sklearn_model, device=None):
        super().__init__()

        # Extract the parameters from the sklearn model
        coef = sklearn_model.coef_
        intercept = sklearn_model.intercept_

        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        # Initialize the PyTorch parameters
        self.linear = torch.nn.Linear(coef.shape[1], coef.shape[0])
        with torch.no_grad():
            # copy_ converts sklearn's float64 values to the layer's dtype.
            self.linear.weight.copy_(torch.as_tensor(coef))
            self.linear.bias.copy_(torch.as_tensor(intercept))
        self.linear = self.linear.to(device)

    def forward(self, x):
        """Return class probabilities (softmax over the last dimension)."""
        out = self.linear(x)
        return torch.softmax(out, dim=-1)

    def predict(self, x):
        """
        Return the probabilities rounded to 0/1 integers, with size-1
        dimensions squeezed away.

        NOTE(review): this is a rounded probability vector, not a class index;
        it can be all zeros when no class exceeds 0.5.
        """
        y_pred = self.forward(x).round().squeeze().int()
        return y_pred

    def predict_proba(self, x):
        """Return the probabilities of every class, with size-1 dimensions squeezed away."""
        return self.forward(x).squeeze()

In [15]:
def apply_html_styles_to_word(html_lines, filename):
    """
    Write HTML snippets to a Word document, one paragraph per snippet.

    Every text node of a snippet becomes exactly one run. The run is styled
    from the tags that enclose it:

    * ``<b>`` / ``<strong>`` make the run bold,
    * ``<i>`` / ``<em>`` make it italic,
    * the nearest ``<span>`` whose ``style`` contains ``color: #hex`` sets the
      font color. The pattern also matches ``background-color: #hex``, so a
      span that only sets a background color has that color applied to the
      font.

    Args:
        html_lines: iterable of HTML strings.
        filename: path the document is saved to.
    """
    # Imported locally: the notebook-level import only brings in BeautifulSoup.
    from bs4 import NavigableString

    # Create a new Word document
    doc = Document()

    for html in html_lines:
        # Parse the HTML content
        soup = BeautifulSoup(html, 'html.parser')

        # Create a new paragraph for each line of HTML
        p = doc.add_paragraph()

        # Walk only the text nodes. Adding a run for every tag *and* for the
        # text nodes inside it would write the same text several times.
        for node in soup.descendants:
            if not isinstance(node, NavigableString) or not node:
                continue

            run = p.add_run(str(node))

            color_applied = False
            for ancestor in node.parents:
                if ancestor.name in ["b", "strong"]:
                    run.bold = True
                elif ancestor.name in ["i", "em"]:
                    run.italic = True
                elif ancestor.name == "span" and not color_applied:
                    # Check for style attribute and apply styles if present
                    style = ancestor.attrs.get('style', '')
                    color_match = re.search(r'color:\s*(#[0-9a-fA-F]+)', style)
                    if not color_match:
                        continue
                    digits = color_match.group(1)[1:]
                    if len(digits) == 3:
                        # Expand CSS shorthand (#abc -> #aabbcc).
                        digits = "".join(ch * 2 for ch in digits)
                    if len(digits) == 6:
                        run.font.color.rgb = RGBColor(
                            *(int(digits[k:k + 2], 16) for k in (0, 2, 4))
                        )
                        color_applied = True

    # Save the document
    doc.save(filename)

## Output

In [15]:
from sklearn import preprocessing
import numpy as np
from bs4 import BeautifulSoup
from docx import Document
from docx.shared import RGBColor
import re
import torch
import gc

In [20]:
# Array loaded from disk; the attribution loop below reads preds[i] for test row i.
# NOTE(review): presumably the model's predicted labels for the test set, going
# by the file name; confirm that the row order matches test_dataset.
preds= np.load('full_setfit_preds.npy')

In [31]:
def map_color_(text,score):
    """
    Highlight the whitespace-separated words of ``text`` with colors derived
    from ``score`` ("bwr" colormap) and return the joined HTML.

    Words and colors are paired by position; pairing stops at the shorter of
    the two sequences.
    """
    palette = colorize(score, cmap="bwr")
    spans = [hlstr(word, shade) for word, shade in zip(text.split(), palette)]
    return " ".join(spans)

In [None]:
import warnings

# Install the warning filter once instead of on every iteration.
warnings.filterwarnings('ignore')

# Per-example results keyed by test-set row. 'colors', 'score' and
# 'new_prediction' keep the placeholder 0 when attribution fails for that row.
results_250={}
# Passed through to return_prediction unchanged.
# NOTE(review): `m` and `return_prediction` are not defined in the visible
# cells; they must exist in the kernel before this cell runs.
s=250
torch.cuda.empty_cache()
gc.collect()
for i in range(len(test_dataset)):
    results_250[i]={'colors':0,'score':0,'model_prediction':preds[i],'new_prediction':0}
    try:
        colors, d, p = return_prediction(m,test_dataset,preds,i,s)
        results_250[i]['colors']=colors
        # L2-normalize the word scores. (A stray expression on the undefined
        # name `raw_scores` used to sit here; it raised NameError, which the
        # bare except swallowed, so score and new_prediction were never stored.)
        results_250[i]['score']=preprocessing.normalize([d['score'].to_list()])
        results_250[i]['new_prediction']=p.argmax().item()
    except Exception as exc:
        # Keep going on per-example failures, but report what went wrong.
        # `except Exception` leaves KeyboardInterrupt able to stop the loop.
        print(i, repr(exc))
    # Release cached GPU memory between examples.
    torch.cuda.empty_cache()
    gc.collect()


In [33]:

for i in range(len(test_dataset)):
    score = results_250[i]['score']
    if isinstance(score, int):
        # Attribution failed for this row, so 'score' still holds the integer
        # placeholder and cannot be indexed; record that no span is available.
        results_250[i]['setfit_span']=None
        continue
    # score is a 1 x n_words array; row 0 holds the per-word scores.
    results_250[i]['setfit_span']=map_color_(test_dataset[i]['text'],score[0])


  cmap = cm.get_cmap(cmap)


In [34]:
# Persist the per-example results dict to disk with torch.save.
# NOTE(review): the next cell loads 'setfit_250_raw.pt', not this file; confirm
# which file is meant to be read back.
torch.save(results_250,'setfit_250_allelements.pt')

In [2]:
import torch
# Reload a saved results dict.
# NOTE(review): this reads 'setfit_250_raw.pt', while the save cell above writes
# 'setfit_250_allelements.pt'; confirm this is the intended file.
# NOTE(review): torch.load unpickles the file; only load files from a trusted source.
results_250 =torch.load('setfit_250_raw.pt')

In [None]:
# Render the highlighted sentence of every example; for examples that still
# hold the placeholder 0 instead of markup, print the example key instead.
for example_id, entry in results_250.items():
    markup = entry['colors']
    if markup != 0:
        display(HTML(markup))
    else:
        print(example_id)
    print('\n')
