In [5]:
import os
# Expose the Hugging Face access token through the environment; a later cell
# reads it back with os.environ.get("HF_TOKEN").
# NOTE(review): "token" looks like a redacted placeholder. Do not commit a real
# token here -- prefer supplying it from outside the notebook (environment or a
# secrets store). This assignment also overwrites any HF_TOKEN already set.
os.environ["HF_TOKEN"] = "token"

In [None]:
!pip install -U bitsandbytes

In [1]:
import numpy as np
import torch
import transformers
import pandas as pd


# Shared, stateless loss/activation modules used by perplexity() and entropy().
# reduction="none" keeps one loss value per position so callers can apply their
# own masking and aggregation (mean or median).
ce_loss_fn = torch.nn.CrossEntropyLoss(reduction="none")
# Softmax over the last dimension (the vocabulary axis of the logits passed in below).
softmax_fn = torch.nn.Softmax(dim=-1)


def perplexity(encoding: transformers.BatchEncoding,
               logits: torch.Tensor,
               median: bool = False,
               temperature: float = 1.0):
    """Aggregate the next-token cross-entropy of each sequence in a batch.

    The logits at position ``t`` are scored against the token at position
    ``t + 1``; positions whose (shifted) attention mask is 0 are excluded.
    Note that the aggregated cross-entropy is returned as-is -- it is not
    exponentiated.

    Args:
        encoding: Tokenizer output providing ``input_ids`` and ``attention_mask``.
        logits: Model logits for ``encoding``; expected layout is
            (batch, sequence, vocab) given the ``transpose(1, 2)`` below.
        median: If True, take the per-sequence median over unmasked positions
            instead of the mask-weighted mean.
        temperature: Divisor applied to the logits before the loss.

    Returns:
        A NumPy array with one value per sequence.
    """
    # Drop the last position of the logits and the first token of the labels
    # so that each prediction lines up with the token that follows it.
    next_token_logits = logits[..., :-1, :].contiguous() / temperature
    targets = encoding.input_ids[..., 1:].contiguous()
    target_mask = encoding.attention_mask[..., 1:].contiguous()

    # CrossEntropyLoss expects the class dimension second: (batch, vocab, seq).
    token_losses = ce_loss_fn(next_token_logits.transpose(1, 2), targets)

    if not median:
        mean_loss = (token_losses * target_mask).sum(1) / target_mask.sum(1)
        return mean_loss.to("cpu").float().numpy()

    # NaN-out masked positions so nanmedian ignores them.
    masked_losses = token_losses.masked_fill(~target_mask.bool(), float("nan"))
    return np.nanmedian(masked_losses.cpu().float().numpy(), 1)


def entropy(p_logits: torch.Tensor,
            q_logits: torch.Tensor,
            encoding: transformers.BatchEncoding,
            pad_token_id: int,
            median: bool = False,
            sample_p: bool = False,
            temperature: float = 1.0):
    """Aggregate the per-position cross-entropy between two models' predictions.

    At every position the softmax of ``p_logits`` is used as the target
    distribution and ``q_logits`` as the prediction. Positions whose input id
    equals ``pad_token_id`` are excluded from the aggregate.

    Args:
        p_logits: Logits providing the target distribution.
        q_logits: Logits being scored against that target.
        encoding: Tokenizer output; only ``input_ids`` is read (for the mask).
        pad_token_id: Token id treated as padding.
        median: If True, take the per-sequence median over non-pad positions
            instead of the mask-weighted mean.
        sample_p: If True, draw a single token id per position from the target
            distribution and use it as a hard label instead of the full
            distribution.
        temperature: Divisor applied to both sets of logits.

    Returns:
        A NumPy array with one value per sequence.
    """
    n_vocab = p_logits.shape[-1]
    seq_len = q_logits.shape[-2]

    # Flatten to (batch * seq, vocab) so every position is one "sample".
    target = softmax_fn(p_logits / temperature).view(-1, n_vocab)
    if sample_p:
        target = torch.multinomial(target, replacement=True, num_samples=1).view(-1)

    prediction = (q_logits / temperature).view(-1, n_vocab)

    token_ce = ce_loss_fn(input=prediction, target=target).view(-1, seq_len)
    keep = (encoding.input_ids != pad_token_id).type(torch.uint8)

    if not median:
        mean_ce = (token_ce * keep).sum(1) / keep.sum(1)
        return mean_ce.to("cpu").float().numpy()

    # NaN-out padded positions so nanmedian ignores them.
    masked_ce = token_ce.masked_fill(~keep.bool(), float("nan"))
    return np.nanmedian(masked_ce.cpu().float().numpy(), 1)

In [2]:
from transformers import AutoTokenizer


def assert_tokenizer_consistency(model_id_1, model_id_2):
    """Verify that two model ids resolve to tokenizers with the same vocabulary.

    Both tokenizers are loaded with ``AutoTokenizer.from_pretrained`` and their
    ``vocab`` mappings are compared for equality.

    Raises:
        ValueError: If the two vocabularies differ.
    """
    first_vocab = AutoTokenizer.from_pretrained(model_id_1).vocab
    second_vocab = AutoTokenizer.from_pretrained(model_id_2).vocab

    if first_vocab == second_vocab:
        return
    raise ValueError(f"Tokenizers are not identical for {model_id_1} and {model_id_2}.")
        

In [3]:
from typing import Union

import os
import numpy as np
import torch
import transformers
from transformers import AutoModelForCausalLM, AutoTokenizer

# Inference-only notebook: turn autograd off globally for every cell that follows.
torch.set_grad_enabled(False)

huggingface_config = {
    # Only required for private models from Huggingface (e.g. LLaMA models)
    # Read from the HF_TOKEN environment variable; None when it is not set.
    "TOKEN": os.environ.get("HF_TOKEN", None)
}

# selected using Falcon-7B and Falcon-7B-Instruct at bfloat16
# NOTE(review): the Binoculars class below defaults to a different model pair
# and loads it 4-bit quantized, so these cut-offs may not transfer -- confirm
# or re-calibrate before relying on the predicted labels.
BINOCULARS_ACCURACY_THRESHOLD = 0.9015310749276843  # optimized for f1-score
BINOCULARS_FPR_THRESHOLD = 0.8536432310785527  # optimized for low-fpr [chosen at 0.01%]

# Device placement: first GPU when CUDA is available, otherwise CPU. The second
# device is a second GPU when one exists, otherwise it is the same as DEVICE_1.
DEVICE_1 = "cuda:0" if torch.cuda.is_available() else "cpu"
DEVICE_2 = "cuda:1" if torch.cuda.device_count() > 1 else DEVICE_1


class Binoculars(object):
    """AI-text detector that compares an observer and a performer language model.

    A text's score is the value returned by ``perplexity`` (performer logits)
    divided by the value returned by ``entropy`` (observer vs. performer
    logits). Scores strictly below ``self.threshold`` are labelled
    "Most likely AI-generated", everything else "Most likely human-generated".
    """

    # Labels emitted by predict() and predict_with_confidence().
    _AI_LABEL = "Most likely AI-generated"
    _HUMAN_LABEL = "Most likely human-generated"
    # Steepness of the distance-to-confidence curve in predict_with_confidence().
    _CONFIDENCE_SCALE = 5.0

    def __init__(self,
                 observer_name_or_path: str = "google/gemma-2-2b",
                 performer_name_or_path: str = "google/gemma-2-2b-it",
                 use_bfloat16: bool = True,
                 max_token_observed: int = 512,
                 mode: str = "low-fpr",
                 ) -> None:
        """Load both models (4-bit quantized) and the shared tokenizer.

        Args:
            observer_name_or_path: Hub id or local path of the observer model.
            performer_name_or_path: Hub id or local path of the performer model.
            use_bfloat16: Accepted for interface compatibility only.
                NOTE(review): this flag is not consulted -- the 4-bit compute
                dtype is fixed to float16 below. Wire it up or drop it.
            max_token_observed: Truncation length applied when tokenizing.
            mode: Threshold preset, "low-fpr" or "accuracy" (see change_mode).

        Raises:
            ValueError: If the two tokenizers differ or ``mode`` is invalid.
        """
        assert_tokenizer_consistency(observer_name_or_path, performer_name_or_path)

        self.change_mode(mode)
        self.observer_model = self._load_quantized_model(observer_name_or_path, DEVICE_1)
        self.performer_model = self._load_quantized_model(performer_name_or_path, DEVICE_2)

        self.observer_model.eval()
        self.performer_model.eval()

        self.tokenizer = AutoTokenizer.from_pretrained(observer_name_or_path)
        if not self.tokenizer.pad_token:
            # Batched tokenization pads, so fall back to EOS when no pad token exists.
            self.tokenizer.pad_token = self.tokenizer.eos_token
        self.max_token_observed = max_token_observed

    @staticmethod
    def _load_quantized_model(name_or_path: str, device: str) -> "transformers.PreTrainedModel":
        """Load a causal LM onto ``device`` with 4-bit bitsandbytes quantization."""
        # Passing load_in_4bit / bnb_4bit_compute_dtype directly to
        # from_pretrained is deprecated; they belong in a BitsAndBytesConfig.
        quantization_config = transformers.BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_compute_dtype=torch.float16,
        )
        return AutoModelForCausalLM.from_pretrained(
            name_or_path,
            device_map={"": device},
            # NOTE(review): trust_remote_code executes code shipped with the
            # checkpoint; only use with model repositories you trust.
            trust_remote_code=True,
            quantization_config=quantization_config,
            token=huggingface_config["TOKEN"],
        )

    def change_mode(self, mode: str, custom_threshold: Union[float, None] = None) -> None:
        """Select the decision threshold.

        "low-fpr" and "accuracy" pick the corresponding module-level preset and
        take precedence; for any other ``mode`` the ``custom_threshold`` is
        used when given.

        Raises:
            ValueError: If ``mode`` is not a preset and no custom threshold is given.
        """
        if mode == "low-fpr":
            self.threshold = BINOCULARS_FPR_THRESHOLD
        elif mode == "accuracy":
            self.threshold = BINOCULARS_ACCURACY_THRESHOLD
        elif custom_threshold is not None:
            self.threshold = custom_threshold
        else:
            raise ValueError(f"Invalid mode: {mode}")

    def _tokenize(self, batch: list[str]) -> "transformers.BatchEncoding":
        """Tokenize ``batch`` into tensors on the observer model's device."""
        batch_size = len(batch)
        encodings = self.tokenizer(
            batch,
            return_tensors="pt",
            # A single text needs no padding; longer batches pad to the longest item.
            padding="longest" if batch_size > 1 else False,
            truncation=True,
            max_length=self.max_token_observed,
            return_token_type_ids=False).to(self.observer_model.device)
        return encodings

    @torch.inference_mode()
    def _get_logits(self, encodings: "transformers.BatchEncoding") -> tuple[torch.Tensor, torch.Tensor]:
        """Run both models and return ``(observer_logits, performer_logits)``.

        The encodings are moved to each model's device in turn before the call.
        """
        observer_logits = self.observer_model(**encodings.to(DEVICE_1)).logits
        performer_logits = self.performer_model(**encodings.to(DEVICE_2)).logits
        if DEVICE_1 != "cpu":
            torch.cuda.synchronize()
        return observer_logits, performer_logits

    def compute_score(self, input_text: Union[list[str], str]) -> Union[float, list[float]]:
        """Compute the Binoculars score for one text or a batch of texts.

        Returns:
            A float for a single string, otherwise a list of floats (one per
            text). An empty batch yields an empty list.
        """
        batch = [input_text] if isinstance(input_text, str) else input_text
        if len(batch) == 0:
            # Nothing to score; avoid handing an empty batch to the tokenizer.
            return []
        encodings = self._tokenize(batch)
        observer_logits, performer_logits = self._get_logits(encodings)
        # Keep labels/mask on the same device as the logits they are scored against.
        ppl = perplexity(encodings.to(performer_logits.device), performer_logits)
        x_ppl = entropy(observer_logits.to(DEVICE_1), performer_logits.to(DEVICE_1),
                        encodings.to(DEVICE_1), self.tokenizer.pad_token_id)
        binoculars_scores = (ppl / x_ppl).tolist()
        return binoculars_scores[0] if isinstance(input_text, str) else binoculars_scores

    def predict(self, input_text: Union[list[str], str]) -> Union[list[str], str]:
        """Label one text (returns a str) or a batch (returns a list of str)."""
        binoculars_scores = np.array(self.compute_score(input_text))
        pred = np.where(binoculars_scores < self.threshold,
                        self._AI_LABEL,
                        self._HUMAN_LABEL
                        ).tolist()
        return pred

    def predict_with_confidence(self, input_text: Union[list[str], str]) -> Union[list[tuple], tuple]:
        """Label text(s) and attach a heuristic confidence percentage.

        Confidence is ``(1 - exp(-k * |score - threshold|)) * 100`` with
        ``k = _CONFIDENCE_SCALE``: 0 at the threshold, approaching 100 as the
        score moves away from it.

        Returns:
            ``(label, confidence)`` for a single string, otherwise a list of
            such tuples in input order.
        """
        single_input = isinstance(input_text, str)
        # Treat the scalar case as a one-element batch so both paths share one formula.
        scores = np.atleast_1d(np.asarray(self.compute_score(input_text), dtype=float))

        distance_from_threshold = np.abs(scores - self.threshold)
        confidence = (1 - np.exp(-self._CONFIDENCE_SCALE * distance_from_threshold)) * 100
        labels = np.where(scores < self.threshold, self._AI_LABEL, self._HUMAN_LABEL)

        results = list(zip(labels.tolist(), confidence.tolist()))
        return results[0] if single_input else results

In [6]:
# Build the detector with its default arguments. The constructor loads both
# models and the tokenizer, so this cell is slow and downloads weights on a
# fresh environment.
bino = Binoculars()

tokenizer_config.json:   0%|          | 0.00/46.4k [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/4.24M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.5M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/636 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/47.0k [00:00<?, ?B/s]

tokenizer.model:   0%|          | 0.00/4.24M [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/17.5M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/636 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/818 [00:00<?, ?B/s]

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


model.safetensors.index.json:   0%|          | 0.00/24.2k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/3 [00:00<?, ?it/s]

model-00001-of-00003.safetensors:   0%|          | 0.00/4.99G [00:00<?, ?B/s]

model-00002-of-00003.safetensors:   0%|          | 0.00/4.98G [00:00<?, ?B/s]

model-00003-of-00003.safetensors:   0%|          | 0.00/481M [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/168 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/838 [00:00<?, ?B/s]

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


model.safetensors.index.json:   0%|          | 0.00/24.2k [00:00<?, ?B/s]

Downloading shards:   0%|          | 0/2 [00:00<?, ?it/s]

model-00001-of-00002.safetensors:   0%|          | 0.00/4.99G [00:00<?, ?B/s]

model-00002-of-00002.safetensors:   0%|          | 0.00/241M [00:00<?, ?B/s]

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

generation_config.json:   0%|          | 0.00/187 [00:00<?, ?B/s]

In [7]:
# Load the survey data into a DataFrame.
# NOTE(review): absolute Kaggle input path -- adjust when running elsewhere.
data_path = "/kaggle/input/traindataset/trainData.csv"
df = pd.read_csv(data_path)

In [11]:
import pandas as pd
from collections import Counter

def row_to_values_list(row, columns_to_include):
    """
    Collect the selected cells of a row as a list of strings.

    Cells for which ``pd.isna`` is true become the literal 'No response';
    every other cell is converted with ``str``. Order follows
    ``columns_to_include``.
    """
    cells = (row[column] for column in columns_to_include)
    return ['No response' if pd.isna(cell) else str(cell) for cell in cells]

def aggregation(predictions, verbose=True):
    """
    Perform majority-vote aggregation on predictions.

    Args:
        predictions: Iterable of tuples in the form ("label", confidence_score).
            Any label string is accepted.
        verbose: When True (the default), print each (label, confidence) pair
            as it is processed.

    Returns:
        A tuple with the most frequent label and the highest confidence score
        seen for that label (never lower than 0). When several labels are tied
        for most frequent, the one encountered first wins.

    Raises:
        ValueError: If ``predictions`` is empty.
    """
    label_counts = Counter()
    # Highest confidence seen per label; filled lazily so that labels are not
    # restricted to a fixed, hardcoded set.
    max_confidence = {}

    for label, confidence in predictions:
        if verbose:
            print(label,confidence)
        label_counts[label] += 1
        max_confidence[label] = max(max_confidence.get(label, 0), confidence)

    if not label_counts:
        raise ValueError("aggregation() requires at least one prediction")

    # Select the most common label
    most_common_label = label_counts.most_common(1)[0][0]

    return most_common_label, max_confidence[most_common_label]

def aiDetector(row_number, df, columns_of_interest, bino):
    """
    Classify every selected response in one survey row and aggregate the votes.

    Args:
        row_number: Positional index of the row to analyze (used with ``iloc``)
        df: DataFrame containing the survey data
        columns_of_interest: List of columns whose values are scored
        bino: Object exposing ``predict_with_confidence(text)`` returning
            a ``(label, confidence)`` pair

    Returns:
        Whatever ``aggregation`` returns for the per-response predictions.

    NOTE(review): missing cells are replaced by the text 'No response' and are
    scored and counted like any other response -- confirm this is intended.
    """
    def _classify(text):
        # Unpack explicitly so a malformed result fails here, not in aggregation.
        label, confidence = bino.predict_with_confidence(text)
        return (label, confidence)

    answers = row_to_values_list(df.iloc[row_number], columns_of_interest)
    return aggregation([_classify(answer) for answer in answers])

In [12]:
# Example usage
columns_of_interest = ['Likes About Concept', 'Dislikes About Concept', 'Concept Replacement Product', 'Replacement Product 1', 'Replacement Product 2', 'Replacement Product 3']
row_number = 4
result = aiDetector(row_number, df, columns_of_interest, bino)
result

Most likely AI-generated 17.70006455776596
Most likely human-generated 97.99496192050546
Most likely human-generated 99.68653231955265
Most likely human-generated 99.43385593792105
Most likely human-generated 99.99998764613231
Most likely human-generated 99.9999702623447


('Most likely human-generated', 99.99998764613231)