In [1]:
import os
import gc

import numpy as np
import pandas as pd
import random
import torch
import torch.nn as nn

import math
from collections import Counter
from typing import List, Union

from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Process-wide settings and shared constants for the scoring code below.
os.environ.update({
    'OMP_NUM_THREADS': '1',
    'TOKENIZERS_PARALLELISM': 'false',
})
# Label value that CrossEntropyLoss skips; used to mask padding positions.
PAD_TOKEN_LABEL_ID = nn.CrossEntropyLoss().ignore_index
# Use the GPU when torch can see one, otherwise fall back to the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
class ParticipantVisibleError(Exception):
    """Exception type reserved for errors that should be visible to participants."""


In [None]:
class PerplexityCalculator:
    """Computes per-text perplexity with a Hugging Face causal language model.

    The model and tokenizer are loaded once in ``__init__``; ``get_perplexity``
    then scores a single string or a list of strings in batches.
    """

    def __init__(self, model_path: str, load_in_8bit: bool = False, device_map: str = "auto"):
        """Load the model and tokenizer found at ``model_path``.

        Args:
            model_path: Path or identifier passed to ``from_pretrained``.
            load_in_8bit: When True, load the model through a bitsandbytes
                quantization config (CUDA only).
                NOTE(review): despite the flag name, the config below requests
                4-bit fp4 quantization. It is left unchanged so existing
                scores stay comparable -- confirm this is intended.
            device_map: Forwarded to ``AutoModelForCausalLM.from_pretrained``.

        Raises:
            ValueError: If quantization is requested without a CUDA device.
        """
        if load_in_8bit and DEVICE.type != "cuda":
            raise ValueError("8-bit quantization requires a CUDA device")

        if load_in_8bit:
            quant_config = BitsAndBytesConfig(
                load_in_4bit=True,
                bnb_4bit_quant_type="fp4",
                bnb_4bit_use_double_quant=False,
                bnb_4bit_compute_dtype=torch.float16,
            )
            self.model = AutoModelForCausalLM.from_pretrained(
                model_path,
                quantization_config=quant_config,
                device_map=device_map,
            )
        else:
            # Half precision on GPU, full precision on CPU.
            self.model = AutoModelForCausalLM.from_pretrained(
                model_path,
                torch_dtype=torch.float16 if DEVICE.type == 'cuda' else torch.float32,
                device_map=device_map,
            )

        # Force right padding: with left padding the BOS token of every
        # shorter sequence in a batch would sit after pad tokens and become a
        # prediction target, so a text's perplexity would depend on what else
        # is in its batch.
        self.tokenizer = AutoTokenizer.from_pretrained(model_path, padding_side="right")
        self.loss_fct = torch.nn.CrossEntropyLoss(reduction="none")
        self.model.eval()

    def get_perplexity(self, input_texts: Union[str, List[str]], batch_size: int = 32) -> Union[float, List[float]]:
        """Return the perplexity of one text or of each text in a list.

        Every text is wrapped in the tokenizer's BOS/EOS tokens, scored with
        the model, and its mean token-level cross-entropy is exponentiated.

        Args:
            input_texts: A single string or a list of strings.
            batch_size: Number of texts sent through the model per forward pass.

        Returns:
            A float when ``input_texts`` is a string, otherwise a list of
            floats in input order (an empty list for empty input).

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        is_single = isinstance(input_texts, str)
        texts = [input_texts] if is_single else list(input_texts)

        bos_token = self.tokenizer.bos_token
        eos_token = self.tokenizer.eos_token
        losses = []

        with torch.no_grad():
            for start in range(0, len(texts), batch_size):
                batch = texts[start:start + batch_size]
                # Special tokens are written into the string explicitly, so
                # the tokenizer must not add its own.
                tokenized_inputs = self.tokenizer(
                    [f"{bos_token}{text}{eos_token}" for text in batch],
                    return_tensors="pt",
                    padding=True,
                    truncation=True,
                    add_special_tokens=False,
                )
                tokenized_inputs = {k: v.to(DEVICE) for k, v in tokenized_inputs.items()}
                tokenized_inputs.pop("token_type_ids", None)

                outputs = self.model(**tokenized_inputs, use_cache=False)
                logits = outputs.logits

                # Build labels on a copy so input_ids is not modified. Prefer
                # the attention mask to locate padding: matching on
                # pad_token_id would also hide real tokens that share that id.
                input_ids = tokenized_inputs["input_ids"]
                attention_mask = tokenized_inputs.get("attention_mask")
                if attention_mask is not None:
                    pad_positions = attention_mask == 0
                else:
                    pad_positions = input_ids == self.tokenizer.pad_token_id
                labels = input_ids.masked_fill(pad_positions, PAD_TOKEN_LABEL_ID)

                # Position t predicts token t + 1.
                shift_logits = logits[..., :-1, :].contiguous()
                shift_labels = labels[..., 1:].contiguous()

                loss = self.loss_fct(
                    shift_logits.view(-1, shift_logits.size(-1)),
                    shift_labels.view(-1),
                )
                loss = loss.view(shift_labels.size(0), -1)
                # Ignored positions contribute zero loss, so dividing by the
                # count of real targets gives the per-sequence mean.
                valid_lengths = (shift_labels != PAD_TOKEN_LABEL_ID).sum(dim=-1)
                sequence_losses = torch.sum(loss, -1) / valid_lengths
                losses.extend(sequence_losses.cpu().tolist())

        return math.exp(losses[0]) if is_single else [math.exp(l) for l in losses]

    def clear_gpu_memory(self):
        """Drop the model and tokenizer and empty the CUDA cache.

        Does nothing when CUDA is unavailable. The instance cannot score
        text after this call because its attributes have been deleted.
        """
        if torch.cuda.is_available():
            del self.model
            del self.tokenizer
            gc.collect()
            torch.cuda.empty_cache()

    def score(self, solution: pd.DataFrame, submission: pd.DataFrame, row_id_column_name: str,
              model_path: str, load_in_8bit: bool = True, clear_mem: bool = True) -> float:
        """Validate a submission against the solution and return its mean perplexity.

        NOTE(review): this method does not use ``self``; it loads a fresh
        ``PerplexityCalculator`` from ``model_path``. It looks like a
        module-level function that was placed inside the class -- confirm.

        Args:
            solution: Frame with the row-id column and a ``text`` column.
            submission: Frame with the same row ids and a ``text`` column.
            row_id_column_name: Name of the row-id column in both frames.
            model_path: Model to load for scoring.
            load_in_8bit: Passed to the new ``PerplexityCalculator``.
            clear_mem: When True, release the new scorer's GPU memory afterwards.

        Returns:
            The mean perplexity over all submitted texts.

        Raises:
            ValueError: If row ids differ, or if any submitted text is not a
                permutation of the words of its solution text.
        """
        if not all(solution[row_id_column_name] == submission[row_id_column_name]):
            raise ValueError("Row IDs in the solution and submission do not match.")

        # Compare word multisets: a valid submission only reorders words.
        sol_counts = solution['text'].str.split().apply(Counter)
        sub_counts = submission['text'].str.split().apply(Counter)
        if not all(sol_counts == sub_counts):
            raise ValueError("Some submitted strings are not valid permutations of the solution strings.")

        scorer = PerplexityCalculator(model_path=model_path, load_in_8bit=load_in_8bit)
        perplexities = scorer.get_perplexity(submission["text"].tolist())

        if clear_mem:
            scorer.clear_gpu_memory()

        return float(np.mean(perplexities))


In [None]:
# Shared scorer used by the optimization code below. load_in_8bit keeps its
# default (False), so the weights are loaded without quantization.
scorer = PerplexityCalculator(
    model_path='/kaggle/input/gemma-2/transformers/gemma-2-9b/2',
)

In [None]:
# Default annealing schedule: start hot, multiply the temperature by
# `cooling_rate` after every `steps_per_temp` proposals, stop at `temp_end`.
temp_start = 10.0
temp_end = 0.5
cooling_rate = 0.95
steps_per_temp = 5


def calculate_valid_score(arrangement, score_fn=None, max_retries=100):
    """Score an arrangement, reshuffling it until the score is not NaN.

    Args:
        arrangement: Either a whitespace-separated string or a list of words.
            A list is shuffled IN PLACE when a retry is needed, so on return
            it holds exactly the order that produced the returned score. A
            string cannot be updated, so pass a list if the final order matters.
        score_fn: Callable mapping a text to a float score. Defaults to the
            notebook-level ``scorer.get_perplexity``.
        max_retries: Maximum number of reshuffles before giving up.

    Returns:
        The first non-NaN score obtained.

    Raises:
        RuntimeError: If every attempt scored NaN.
    """
    if score_fn is None:
        score_fn = scorer.get_perplexity

    # Always score a joined string and test a single float; shuffle a list,
    # because strings do not support in-place shuffling.
    words = arrangement.split() if isinstance(arrangement, str) else arrangement

    for _ in range(max_retries + 1):
        score = score_fn(" ".join(words))
        if not math.isnan(score):
            return score
        random.shuffle(words)

    raise RuntimeError(f"Score was NaN for {max_retries + 1} consecutive arrangements.")


def simulated_annealing_optimize(
        text: str,
        temp_start=temp_start,
        temp_end=temp_end,
        cooling_rate=cooling_rate,
        steps_per_temp=steps_per_temp,
        verbose=False,
        score_fn=None):
    """Search for a low-scoring word order of ``text`` by simulated annealing.

    Each step swaps two random words. A swap that lowers the score is always
    accepted; one that raises it by ``delta`` is accepted with probability
    ``exp(-delta / temp)``.

    Args:
        text: Whitespace-separated words to reorder.
        temp_start: Initial temperature.
        temp_end: The search stops once the temperature is no longer above this.
        cooling_rate: Factor applied to the temperature after each round.
        steps_per_temp: Number of swap proposals per temperature.
        verbose: When True, print the temperature and current score each round.
        score_fn: Callable mapping a text to a float score (lower is better).
            Defaults to the notebook-level ``scorer.get_perplexity``.

    Returns:
        ``(best_text, best_score)`` for the best arrangement seen.

    Raises:
        ValueError: If the schedule could never terminate (``temp_end <= 0``
            or ``cooling_rate`` outside the open interval (0, 1)).
    """
    if score_fn is None:
        score_fn = scorer.get_perplexity

    if temp_start > temp_end and (temp_end <= 0 or not 0 < cooling_rate < 1):
        raise ValueError("Annealing needs temp_end > 0 and 0 < cooling_rate < 1 to terminate.")

    current_words = text.split()
    # Pass the list so that a NaN-triggered reshuffle keeps current_words in
    # sync with the score that is returned.
    current_score = calculate_valid_score(current_words, score_fn=score_fn)
    best_score = current_score
    best_words = current_words.copy()

    n_words = len(current_words)
    if n_words < 2:
        # Nothing to swap; random.sample would raise on fewer than two items.
        return ' '.join(best_words), best_score

    temp = temp_start

    while temp > temp_end:
        for _ in range(steps_per_temp):
            i, j = random.sample(range(n_words), 2)
            neighbor = current_words.copy()
            neighbor[i], neighbor[j] = neighbor[j], neighbor[i]
            neighbor_score = score_fn(" ".join(neighbor))

            # A NaN neighbor cannot be compared; skip the proposal.
            if math.isnan(neighbor_score):
                continue
            delta = neighbor_score - current_score
            if delta < 0 or random.random() < math.exp(-delta / temp):
                current_score = neighbor_score
                current_words = neighbor
                if current_score < best_score:
                    best_score = current_score
                    best_words = current_words.copy()

        temp *= cooling_rate
        if verbose:
            print(f"Temperatur: {temp:.2f}, Current Score: {current_score:.2f}")

    return ' '.join(best_words), best_score


In [None]:
# Optimize every sample text and collect the results into a submission frame.
# Assumes the CSV has a `text` column holding one string per row -- confirm
# against the input file.
samples = pd.read_csv("/kaggle/input/santa-claude-output/submission.csv")

scores = []
records = []
# Iterate the text column directly: iterating rows would hand a whole row
# (a Series) to the optimizer, which expects a string.
for i, text in samples["text"].items():
    best_words, best_score = simulated_annealing_optimize(text)
    scores.append(best_score)
    records.append({"id": i, "text": best_words})

# Build the frame once from the records; `.iloc` cannot add rows to an empty
# DataFrame.
submission = pd.DataFrame(records, columns=["id", "text"])