In [1]:
import numpy as np
from typing import Dict, List
import pandas as pd
# from transformers.utils import logging
from transformers import AutoModelForCausalLM
# logging.set_verbosity_error()
import os
import torch
from tqdm import tqdm

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
from lm_polygraph import WhiteboxModel
from transformers import AutoTokenizer


model_name = 'mistralai/Mistral-7B-v0.1'
hf_model = AutoModelForCausalLM.from_pretrained(model_name, token=TOKEN)
tokenizer = AutoTokenizer.from_pretrained(model_name,token=TOKEN)
model = WhiteboxModel(hf_model)

In [3]:
model = WhiteboxModel(hf_model, tokenizer)

In [6]:
import pandas as pd

semeval = pd.read_csv('texts/semeval_en.csv')

In [9]:
# from lm_polygraph.utils.manager import UEManager, estimate_uncertainty, UncertaintyOutput
from lm_polygraph.stat_calculators.stat_calculator import StatCalculator
from lm_polygraph.stat_calculators.embeddings import get_embeddings_from_output
from lm_polygraph.utils.dataset import Dataset
from lm_polygraph.utils.model import WhiteboxModel, BlackboxModel, Model
from lm_polygraph.utils.processor import Processor

In [29]:
from criteria import StopWordCriteria
stop_words = ["\n"]
stopping_criteria = StopWordCriteria(tokenizer=tokenizer, prompts=[], stop_words=stop_words)

In [24]:
class GreedyProbsCalculator(StatCalculator):
    """
    For Whitebox model (lm_polygraph.WhiteboxModel), at input texts batch calculates:
    * generation texts
    * tokens of the generation texts
    * probabilities distribution of the generated tokens
    * attention masks across the model (if applicable)
    * embeddings from the model
    """

    def __init__(self, n_alternatives: int = 2):
        self.n_alternatives = n_alternatives

    def __call__(
        self,
        dependencies: Dict[str, np.array],
        texts: List[str],
        model: WhiteboxModel,
        
        max_new_tokens: int = 40,
    ) -> Dict[str, np.ndarray]:
        """
        Calculates the statistics of probabilities at each token position in the generation.

        Parameters:
            dependencies (Dict[str, np.ndarray]): input statistics, can be empty (not used).
            texts (List[str]): Input texts batch used for model generation.
            model (Model): Model used for generation.
            max_new_tokens (int): Maximum number of new tokens at model generation. Default: 100.
        Returns:
            Dict[str, np.ndarray]: dictionary with the following items:
                - 'input_texts' (List[str]): input texts batch,
                - 'input_tokens' (List[List[int]]): tokenized input texts,
                - 'greedy_log_probs' (List[List[np.array]]): logarithms of autoregressive
                        probability distributions at each token,
                - 'greedy_texts' (List[str]): model generations corresponding to the inputs,
                - 'greedy_tokens' (List[List[int]]): tokenized model generations,
                - 'attention' (List[List[np.array]]): attention maps at each token, if applicable to the model,
                - 'greedy_log_likelihoods' (List[List[float]]): log-probabilities of the generated tokens.
        """
        batch: Dict[str, torch.Tensor] = model.tokenize(texts)
        # encodeds = model.tokenizer.apply_chat_template(texts, return_tensors="pt")
        # model_inputs = encodeds.to(model.device())
        batch = {k: v.to(model.device()) for k, v in batch.items()}
        # print(model_inputs)
        with torch.no_grad():
            out = model.generate(
                **batch,
                # model_inputs,
                output_scores=True,
                return_dict_in_generate=True,
                max_new_tokens=20,
                min_new_tokens=10,
                output_attentions=False,
                output_hidden_states=True,
                # suppress_tokens=(
                #     []
                #     if model.generation_parameters.allow_newlines
                #     else [
                #         t
                #         for t in range(len(model.tokenizer))
                #         if "\n" in model.tokenizer.decode([t])
                #     ]
                # ),
                stopping_criteria=[StopWordCriteria(tokenizer=tokenizer, prompts=texts, stop_words=stop_words)]
            )
            logits = torch.stack(out.scores, dim=1)

            sequences = out.sequences
            embeddings_encoder, embeddings_decoder = get_embeddings_from_output(
                out, batch, model.model_type
            )

        cut_logits = []
        cut_sequences = []
        cut_texts = []
        cut_alternatives = []
        for i in range(len(texts)):
            if model.model_type == "CausalLM":
                idx = batch["input_ids"].shape[1]
                seq = sequences[i, idx:].cpu()
            else:
                seq = sequences[i, 1:].cpu()
            length, text_length = len(seq), len(seq)
            for j in range(len(seq)):
                if seq[j] == model.tokenizer.eos_token_id:
                    length = j + 1
                    text_length = j
                    break
            cut_sequences.append(seq[:length].tolist())
            cut_texts.append(model.tokenizer.decode(seq[:text_length]))
            cut_logits.append(logits[i, :length, :].cpu().numpy())
            cut_alternatives.append([[] for _ in range(length)])
            for j in range(length):
                lt = logits[i, j, :].cpu().numpy()
                best_tokens = np.argpartition(lt, -self.n_alternatives)
                ln = len(best_tokens)
                best_tokens = best_tokens[ln - self.n_alternatives : ln]
                for t in best_tokens:
                    cut_alternatives[-1][j].append((t.item(), lt[t].item()))
                cut_alternatives[-1][j].sort(
                    key=lambda x: x[0] == cut_sequences[-1][j],
                    reverse=True,
                )

        ll = []
        for i in range(len(texts)):
            log_probs = cut_logits[i]
            tokens = cut_sequences[i]
            assert len(tokens) == len(log_probs)
            ll.append([log_probs[j, tokens[j]] for j in range(len(log_probs))])

        if model.model_type == "CausalLM":
            embeddings_dict = {
                "embeddings_decoder": embeddings_decoder,
            }
        elif model.model_type == "Seq2SeqLM":
            embeddings_dict = {
                "embeddings_encoder": embeddings_encoder,
                "embeddings_decoder": embeddings_decoder,
            }
        else:
            raise NotImplementedError

        result_dict = {
            "input_texts": texts,
            "input_tokens": batch["input_ids"].to("cpu").tolist(),
            "greedy_log_probs": cut_logits,
            "greedy_tokens": cut_sequences,
            "greedy_tokens_alternatives": cut_alternatives,
            "greedy_texts": cut_texts,
            "greedy_log_likelihoods": ll,
        }
        result_dict.update(embeddings_dict)
        logprobs = result_dict["greedy_log_probs"]
        entropies = []
        for s_lp in logprobs:
            entropies.append([])
            for lp in s_lp:
                mask = ~np.isinf(lp)
                entropies[-1].append(-np.sum(np.array(lp[mask]) * np.exp(lp[mask])))
        result_dict['entropy'] = entropies
        return result_dict

In [26]:
semeval.iloc[0].Text

"You now have to sign up for a extracurricular activity at school. You don't want to sign up for one because you already have a lot to do at home, you don't want to stress yourself out more than you already are. Why do you have to do this when you don't want to? Should kids have to do an extracurricular activity? I think they shouldn't  because it could cost money its their choice if they want to. In a recent survey that the Boston Globe did said that parents are less likely to sign up for an activity that cost over 100 dollars. I think this is the most important reason, parents shouldn't have to spend money. Maybe they are in a tough time and don't have a lot of money to spend on sports equipment. Most sports you sign up for you have to buy equipment or there is a sign up fee. If there is a free activity, sure they can do that, but most sports cost money. If the parents want to spend that money, they can. I'm not saying they shouldn't, I'm just saying it could cause problems for less 

In [None]:
greedy = GreedyProbsCalculator()
texts = [t for t in train.Text.values]
stat = {}

for calculator in [
    GreedyProbsCalculator()
]:
    stat.update(calculator(stat, texts, model))

In [None]:
def calculate_confidence(log_likelihoods):
    # Convert to probabilities
    probabilities = np.exp(log_likelihoods)
    # Normalize to [0,1] range if needed
    return probabilities / np.sum(probabilities)

In [None]:
def calculate_entropy_from_log_likelihoods(log_likelihoods):
    # Convert to probabilities first
    probabilities = np.exp(log_likelihoods)
    # Normalize probabilities
    probabilities = probabilities / np.sum(probabilities)
    # Calculate entropy
    return -np.sum(probabilities * log_likelihoods)

In [None]:
stats = []
ue_metrics = []
from tqdm import tqdm

for text in tqdm(train.Text.values):
    stat.update(calculator(stat, [text], model))
    log_likelihoods = stat['greedy_log_likelihoods']
    # print(log_likelihoods)
    if len(stat['greedy_texts'][0].strip()) > 2:
        log_likelihoods = stat['greedy_log_likelihoods'][0][:2]
        # print(log_likelihoods)
    msp = -np.sum(log_likelihoods)
    confidence = calculate_confidence(log_likelihoods)
    perplexity = np.exp(-np.mean(log_likelihoods))
    mean_token_entropy = calculate_entropy_from_log_likelihoods(log_likelihoods)
    stats.append(stat)
    ue_metrics.append({'msp': msp, 'perplexity': perplexity, 'confidence': confidence, 'entropy': mean_token_entropy})

In [55]:
ue_metrics

[{'msp': 1.3480228,
  'perplexity': 0.6740114,
  'entropy': array([1.9506748], dtype=float32)},
 {'msp': 0.17804258,
  'perplexity': 0.08902129,
  'entropy': array([0.7876212], dtype=float32)},
 {'msp': 2.8124993,
  'perplexity': 1.4062496,
  'entropy': array([3.1080236], dtype=float32)},
 {'msp': 0.56219023,
  'perplexity': 0.28109512,
  'entropy': array([1.1193432], dtype=float32)},
 {'msp': 5.953055,
  'perplexity': 2.9765275,
  'entropy': array([4.013937], dtype=float32)},
 {'msp': 2.1414292,
  'perplexity': 1.0707146,
  'entropy': array([2.0098016], dtype=float32)},
 {'msp': 2.1088276,
  'perplexity': 1.0544138,
  'entropy': array([3.380998], dtype=float32)},
 {'msp': 4.2997856,
  'perplexity': 2.1498928,
  'entropy': array([2.47094], dtype=float32)},
 {'msp': 0.8898622,
  'perplexity': 0.4449311,
  'entropy': array([1.5326345], dtype=float32)},
 {'msp': 1.7788925,
  'perplexity': 0.08894463,
  'entropy': array([0.5906372], dtype=float32)}]