In [None]:
!pip install -q lm-polygraph

In [None]:
from IPython.display import clear_output
# Upstream repository, kept for reference only; the fork cloned on the next line is the one used.
# !git clone https://github.com/IINemo/lm-polygraph.git
!git clone https://github.com/sn0rkmaiden/lm-polygraph.git
# Switch the working directory to the fork's `src` folder for the rest of the session.
# NOTE(review): presumably this makes `lm_polygraph` import from the fork instead of the
# pip-installed package — confirm. Re-running this cell in the same session repeats the clone and cd.
%cd lm-polygraph/src
# Additional packages installed into the kernel environment.
%pip install transformers rouge-score datasets

In [3]:
import torch

# Hugging Face checkpoint evaluated in this notebook.
# Other checkpoints that can be swapped in:
#   "mistralai/Mistral-7B-Instruct-v0.2"
#   "NousResearch/Llama-2-7b-chat-hf"
#   "google/gemma-2-2b"
model_name = "HuggingFaceH4/zephyr-7b-beta"

# Use the GPU whenever torch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [4]:
# Number of prompts per batch; handed to the DataLoader that feeds the estimation loop.
batch_size = 1

In [5]:
from huggingface_hub import login
import os

# Provide your Hugging Face token through the HF_KEY environment variable
# (for example by running `%env HF_KEY=<your token>` in a cell that is not shared).
# The token is read, never written, so an existing value is not clobbered and no
# secret ends up hard-coded in the notebook.
hf_token = os.environ.get("HF_KEY", "").strip()
if hf_token:
    login(token=hf_token, add_to_git_credential=False)
else:
    # An empty token cannot authenticate; skip the call instead of sending it to the Hub.
    print("HF_KEY is not set; skipping Hugging Face login (only public models will be accessible).")

In [72]:
# Housekeeping before the model is loaded: run a garbage-collection pass, then ask
# torch to empty its CUDA cache.
import gc
gc.collect()
torch.cuda.empty_cache()

In [7]:
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,
    GenerationConfig,
    GemmaForCausalLM,
)

# Load the checkpoint with 8-bit weights. The quantization request goes through
# `quantization_config`, because passing `load_in_8bit` directly to `from_pretrained`
# is deprecated (see the warning this cell used to print).
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    quantization_config=BitsAndBytesConfig(load_in_8bit=True),
    device_map="cuda",
    torch_dtype=torch.float16,
    low_cpu_mem_usage=True,
)

tokenizer = AutoTokenizer.from_pretrained(model_name)
# Reuse the end-of-sequence token as the padding token so that `padding=True` works downstream.
tokenizer.pad_token = tokenizer.eos_token

# Generation defaults shipped with the checkpoint; forwarded to the greedy-generation calculator later.
generation_config = GenerationConfig.from_pretrained(model_name)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.
The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


Loading checkpoint shards:   0%|          | 0/8 [00:00<?, ?it/s]

In [15]:
# User questions to probe; each becomes a single-turn conversation.
questions = [
    "How many fingers on a coala's foot?",
    "Who sang a song Yesterday?",
    "Кто спел песню Кукла Колдуна?",
    "Translate into French: 'I want a small cup of coffee'",
    "Translate into Wjgnfnsdfjkn: 'I want a small cup of coffee'",
]

# One conversation per question, in the list-of-role/content-dicts format expected by chat templates.
messages = [[{"role": "user", "content": question}] for question in questions]

# Render each conversation to a prompt string. `add_generation_prompt=True` appends the
# template's assistant header so the model answers the question instead of continuing the
# user's turn.
chat_messages = [
    tokenizer.apply_chat_template(m, tokenize=False, add_generation_prompt=True)
    for m in messages
]

In [63]:
# @title
from typing import Dict, List, Tuple

import numpy as np

from lm_polygraph.stat_calculators.embeddings import get_embeddings_from_output
from lm_polygraph.stat_calculators.stat_calculator import StatCalculator
from lm_polygraph.utils.model import WhiteboxModel

class OutputWrapper:
    """
    Bare attribute holder for hidden states.

    Instances are filled in with the hidden states of one sampled generation and
    then handed to `get_embeddings_from_output`.
    """

    # Every slot defaults to None at class level; callers overwrite the ones they need per instance.
    hidden_states = encoder_hidden_states = decoder_hidden_states = None

def _gen_samples2(n_samples, model, batch, **kwargs):
    """
    Call `model.generate` `n_samples` times on the same batch and collect the results.

    Parameters:
        n_samples: how many times generation is repeated.
        model: object exposing `generate(**batch, **kwargs)` and a `model_type` attribute
            ("CausalLM" or "Seq2SeqLM").
        batch: tokenized inputs; must contain "input_ids".
        **kwargs: forwarded unchanged to `model.generate`.
    Returns:
        Tuple of three lists:
            - generated sequences, grouped by input (all samples of input 0, then input 1, ...),
            - the stacked generation scores of each sequence, in the same order,
            - one dict of hidden states per sampling round (left empty when `model_type`
              is neither "CausalLM" nor "Seq2SeqLM").
    """
    n_inputs = len(batch["input_ids"])
    scores_per_input = [[] for _ in range(n_inputs)]
    sequences_per_input = [[] for _ in range(n_inputs)]
    hidden_state_records = []

    with torch.no_grad():
        for _ in range(n_samples):
            generated = model.generate(**batch, **kwargs)
            # `scores` holds one tensor per generation step; stacking on dim 1 puts the step axis second.
            round_scores = torch.stack(generated.scores, dim=1)

            if model.model_type == "CausalLM":
                record = {"sample_embeddings_all_decoder": generated.hidden_states}
                hidden_state_records.append(record)
            elif model.model_type == "Seq2SeqLM":
                record = {
                    "sample_embeddings_all_encoder": generated.encoder_hidden_states,
                    "sample_embeddings_all_decoder": generated.decoder_hidden_states,
                }
                hidden_state_records.append(record)

            for row in range(n_inputs):
                sequences_per_input[row].append(generated.sequences[row])
                scores_per_input[row].append(round_scores[row])

    # Flatten input-major: every sample of the first input, then every sample of the second, ...
    flat_sequences = [seq for group in sequences_per_input for seq in group]
    flat_scores = [score for group in scores_per_input for score in group]
    return flat_sequences, flat_scores, hidden_state_records


class SamplingGenerationCalculator2(StatCalculator):
    """
    Sampling-based stat calculator for a whitebox model (lm_polygraph.WhiteboxModel).

    For every input text it draws `samples_n` stochastic generations and reports:
    * the decoded sampled texts,
    * the token ids of each sample,
    * the per-token and summed score values of the sampled tokens,
    * one embedding per sample, taken from the middle hidden layer.
    """

    @staticmethod
    def meta_info() -> Tuple[List[str], List[str]]:
        """
        Describe what this calculator produces and what it requires.

        Returns:
            Tuple[List[str], List[str]]: names of the produced statistics, followed by
            the (empty) list of statistics this calculator depends on.
        """
        produced_stats = [
            "sample_log_probs",
            "sample_tokens",
            "sample_texts",
            "sample_log_likelihoods",
            "sample_embeddings",
        ]
        required_stats = []
        return produced_stats, required_stats

    def __init__(self, samples_n: int = 10):
        """
        Parameters:
            samples_n (int): number of generations sampled for every input text. Default: 10.
        """
        super().__init__()
        self.samples_n = samples_n

    def __call__(
        self,
        dependencies: Dict[str, np.array],
        texts: List[str],
        model: WhiteboxModel,
        max_new_tokens: int = 100,
    ) -> Dict[str, np.ndarray]:
        """
        Sample generations for a batch of texts and compute their statistics.

        Parameters:
            dependencies (Dict[str, np.ndarray]): previously computed statistics; not read here.
            texts (List[str]): batch of input texts to generate from.
            model (Model): model used for tokenization and generation.
            max_new_tokens (int): upper bound on generated tokens per sample. Default: 100.
        Returns:
            Dict[str, np.ndarray]: dictionary with the following items:
                - 'sample_log_likelihoods' (List[List[List[float]]]): per-token score of each
                  sampled token, for every sample of every input (the end-of-sequence token is excluded),
                - 'sample_log_probs' (List[List[float]]): sum of the per-token scores of each sample
                  (the end-of-sequence token, when reached, is included in the sum),
                - 'sample_tokens' (List[List[List[int]]]): token ids of each sample up to, but not
                  including, the end-of-sequence token,
                - 'sample_texts' (List[List[str]]): decoded 'sample_tokens',
                - 'sample_embeddings' (List[List[np.ndarray]]): middle-layer embedding of each sample
                  (last position when the extracted embeddings carry a token axis).
            NOTE(review): the scores are read straight from the generation output and are treated
            as log-probabilities — confirm that `model.generate` returns them in that form.
        """
        batch: Dict[str, torch.Tensor] = model.tokenize(texts)
        batch = {name: tensor.to(model.device()) for name, tensor in batch.items()}

        # When newlines are disallowed, ban every vocabulary entry whose decoded form contains one.
        if model.generation_parameters.allow_newlines:
            banned_token_ids = []
        else:
            banned_token_ids = [
                token_id
                for token_id in range(len(model.tokenizer))
                if "\n" in model.tokenizer.decode([token_id])
            ]

        sequences, logits, embeddings = _gen_samples2(
            self.samples_n,
            model,
            batch,
            output_scores=True,
            return_dict_in_generate=True,
            output_hidden_states=True,
            max_new_tokens=max_new_tokens,
            min_new_tokens=2,
            do_sample=True,
            num_beams=1,
            num_return_sequences=1,
            suppress_tokens=banned_token_ids,
        )

        # One bucket per input text; each bucket receives `samples_n` entries.
        n_inputs = len(texts)
        summed_scores = [[] for _ in range(n_inputs)]
        sampled_tokens = [[] for _ in range(n_inputs)]
        sampled_texts = [[] for _ in range(n_inputs)]
        token_scores = [[] for _ in range(n_inputs)]

        model_kind = model.model_type
        if model_kind == "Seq2SeqLM":
            # Drop the first position of every generated sequence for encoder-decoder models.
            sequences = [seq[1:] for seq in sequences]

        eos_id = model.tokenizer.eos_token_id
        for flat_idx, sample_scores in enumerate(logits):
            # Samples are laid out input-major, so consecutive groups of `samples_n` share an input.
            owner = flat_idx // self.samples_n
            # Causal models return prompt + continuation; skip the prompt part.
            prompt_len = len(batch["input_ids"][owner]) if model_kind == "CausalLM" else 0
            sample_seq = sequences[flat_idx]

            total, per_token, kept_ids = 0, [], []
            for step in range(len(sample_seq) - prompt_len):
                token_id = sample_seq[step + prompt_len].item()
                step_score = sample_scores[step][token_id].item()
                total += step_score
                if token_id == eos_id:
                    # The end-of-sequence score counts toward the total but is not listed per token.
                    break
                per_token.append(step_score)
                kept_ids.append(token_id)

            token_scores[owner].append(per_token)
            summed_scores[owner].append(total)
            sampled_tokens[owner].append(kept_ids)
            sampled_texts[owner].append(model.tokenizer.decode(kept_ids))

        holder = OutputWrapper()
        n_rows = len(batch["input_ids"])
        last_token_embeddings = [[] for _ in range(n_rows)]

        # Each record holds the hidden states of one sampling round over the whole batch.
        for record in embeddings:
            if model.model_type == "CausalLM":
                holder.hidden_states = record["sample_embeddings_all_decoder"]
            elif model.model_type == "Seq2SeqLM":
                holder.decoder_hidden_states = record["sample_embeddings_all_decoder"]
                holder.encoder_hidden_states = record["sample_embeddings_all_encoder"]

            _, round_embeddings = get_embeddings_from_output(
                holder,
                batch,
                model.model_type,
                level="token",
                hidden_layer=int(model.model.config.num_hidden_layers // 2),
            )

            for row in range(n_rows):
                # With a token axis present keep only the last position; otherwise take the row as is.
                if len(round_embeddings.shape) > 2:
                    chosen = round_embeddings[row, -1]
                else:
                    chosen = round_embeddings[row]
                last_token_embeddings[row].append(chosen.cpu().detach().numpy())

        return {
            "sample_log_likelihoods": token_scores,
            "sample_log_probs": summed_scores,
            "sample_tokens": sampled_tokens,
            "sample_texts": sampled_texts,
            "sample_embeddings": last_token_embeddings,
        }

In [69]:
from lm_polygraph.stat_calculators.infer_causal_lm_calculator import InferCausalLMCalculator
from lm_polygraph.stat_calculators.greedy_alternatives_nli import GreedyAlternativesNLICalculator
from lm_polygraph.utils.deberta import Deberta
from lm_polygraph.utils.model import WhiteboxModel
from lm_polygraph.model_adapters import WhiteboxModelBasic
from lm_polygraph.estimators import MaximumSequenceProbability, ClaimConditionedProbability, DegMat, SemanticEntropy, SAR
from lm_polygraph.stat_calculators.cross_encoder_similarity import CrossEncoderSimilarityMatrixCalculator
from lm_polygraph.stat_calculators.sample import SamplingGenerationCalculator
from lm_polygraph.stat_calculators.semantic_matrix import SemanticMatrixCalculator
from lm_polygraph.stat_calculators.semantic_classes import SemanticClassesCalculator

from torch.utils.data import DataLoader

# Two adapters around the same model/tokenizer pair; the estimation loop passes
# `model_adapter2` to the sampling calculator and `model_adapter` to the other calculators.
# NOTE(review): the third argument ({}) is presumably a set of generation parameters — confirm
# against WhiteboxModelBasic.
model_adapter = WhiteboxModelBasic(model, tokenizer, {})
model_adapter2 = WhiteboxModel(model, tokenizer)

# Greedy-generation calculator; tokenize=False because the loop supplies pre-tokenized
# inputs under "model_inputs".
calc_infer_llm = InferCausalLMCalculator(tokenize=False)
# NLI model shared by the NLI-based calculators below.
nli_model = Deberta(device=device)
nli_model.setup()

# Stat calculators, invoked one after another in the estimation loop.
calc_nli = GreedyAlternativesNLICalculator(nli_model=nli_model)
calc_samples = SamplingGenerationCalculator2()
# Only referenced from a commented-out line of the estimation loop.
calc_cross_encoder = CrossEncoderSimilarityMatrixCalculator()
calc_semantic_matrix = SemanticMatrixCalculator(nli_model=nli_model)
calc_semantic_classes = SemanticClassesCalculator()

# Generation arguments forwarded to the greedy-generation calculator.
args_generate = {"generation_config" : generation_config,
                 "max_new_tokens": 30}

# Uncertainty estimators evaluated on every batch.
estimators = [MaximumSequenceProbability(), ClaimConditionedProbability(), SemanticEntropy()]
# estimators = [MaximumSequenceProbability(),
#               ClaimConditionedProbability(),
#               DegMat(),
#               SemanticEntropy(),
#               SAR()]

# Batches of prompt strings in their original order; the identity collate_fn keeps each
# batch as a plain list of strings.
data_loader = DataLoader(chat_messages, batch_size=batch_size, shuffle=False, collate_fn=lambda x: x)

Some weights of the model checkpoint at microsoft/deberta-large-mnli were not used when initializing DebertaForSequenceClassification: ['config']
- This IS expected if you are initializing DebertaForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DebertaForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [None]:
# Position of the prompt currently being reported within `messages`. It advances once per
# prompt (not once per batch), so the printed question stays aligned with its output for
# any batch size.
question_idx = 0
for batch in data_loader:
    # Tokenize the prompt strings; these inputs seed the dependency dict used by the calculators.
    encoded = tokenizer(batch, padding=True, return_tensors="pt")
    deps = {"model_inputs": encoded}

    # Run the stat calculators in order; each one adds its results to `deps`.
    deps.update(calc_infer_llm(
        deps, texts=batch, model=model_adapter, args_generate=args_generate))
    deps.update(calc_nli(deps, texts=batch, model=model_adapter))
    deps.update(calc_samples(deps, texts=batch, model=model_adapter2))
    # deps.update(calc_cross_encoder(deps, texts=batch, model=model_adapter))
    deps.update(calc_semantic_matrix(deps, texts=batch, model=model_adapter))
    deps.update(calc_semantic_classes(deps, texts=batch, model=model_adapter))

    generated_texts = tokenizer.batch_decode(deps['greedy_tokens'])

    # One (estimator name, per-prompt scores) pair per estimator.
    ues = []
    for estimator in estimators:
        uncertainty_scores = estimator(deps)
        ues.append((str(estimator), uncertainty_scores))

    for i, text in enumerate(generated_texts):
        print(f'Question: {messages[question_idx][0]["content"]} \nOutput: {text}')
        for estimator_name, scores in ues:
            print(f"Uncertainty score by {estimator_name}: {scores[i]}")
        print()
        question_idx += 1