Import Libraries

In [3]:
import pandas as pd
from transformers import GPT2Tokenizer, GPT2LMHeadModel
import torch
import numpy as np
from collections import Counter

Read Data

In [4]:
# Load the train and test feature tables from their CSV files.
df_train, df_test = (
    pd.read_csv(csv_path)
    for csv_path in ('syntatical_train_features.csv', 'syntatical_test_features.csv')
)


In [5]:
# Summary statistics of per-document word counts (texts split on single spaces).
words_per_text = df_train["text"].map(lambda doc: len(doc.split(' ')))
words_per_text.describe()

Unnamed: 0,text
count,40944.0
mean,483.441383
std,720.079079
min,1.0
25%,343.0
50%,419.0
75%,511.0
max,41901.0


Calculating Perplexity

In [6]:
# Module-level tokenizer for the gpt2-large checkpoint.
model_name = "gpt2-large"
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
# Loading the model itself is left disabled here; calculate_dataset_perplexity
# loads its own tokenizer and model from the model name it is given.
# model = GPT2LMHeadModel.from_pretrained(model_name)
# model = model.to("cuda")
# model.eval()

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [7]:
# Quick check of the training frame's (rows, columns).
df_train.shape

(40944, 34)

In [8]:
import gc

In [9]:
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForCausalLM
from torch.utils.data import DataLoader


def calculate_dataset_perplexity(dataset, model_name, max_seq_len=1024, batch_size=100, sliding_window=512):
    """Score every text in ``dataset`` with a causal language model.

    Each text is tokenized without truncation. A text of at most
    ``max_seq_len`` tokens is scored as a single chunk. A longer text is cut
    into overlapping windows of ``max_seq_len`` tokens whose starts are
    ``sliding_window`` tokens apart, plus one tail window covering the last
    ``max_seq_len`` tokens when the stride does not land exactly on the end.
    Chunks are scored in batches of ``batch_size``; each batch contributes
    ``exp(loss)`` and a text's perplexity is the mean over its batches.

    Args:
        dataset: Iterable of texts (e.g. ``df["text"].values``).
        model_name: Name or path given to ``AutoTokenizer.from_pretrained``
            and ``AutoModelForCausalLM.from_pretrained``.
        max_seq_len: Maximum number of tokens per chunk.
        batch_size: Number of chunks per forward pass.
        sliding_window: Distance, in tokens, between consecutive window starts.

    Returns:
        ``(mean_perplexity, per_text)``. ``per_text`` holds exactly one value
        per input text, in input order, so it can be assigned straight to a
        DataFrame column; texts that could not be scored (non-string or blank
        text, no tokens, or only NaN losses) are reported as NaN.
        ``mean_perplexity`` is the mean of the non-NaN entries, or NaN when
        there are none.
    """
    # Local imports keep the function usable regardless of which other cells ran.
    import gc
    from torch.nn.utils.rnn import pad_sequence

    # Single source of truth for the device: the model and every batch go to the
    # same place, so the function also works on CPU-only machines.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load tokenizer and model
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    model.eval()  # Evaluation mode
    model = model.to(device)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    pad_id = tokenizer.pad_token_id

    def collate_fn(batch):
        # Pad to a common length and record which positions are real tokens.
        # The mask is built from the chunk lengths rather than by comparing with
        # pad_id, because pad_id may be the EOS id and EOS can occur in the text.
        padded = pad_sequence(batch, batch_first=True, padding_value=pad_id)
        lengths = torch.tensor([chunk.size(0) for chunk in batch])
        positions = torch.arange(padded.size(1)).unsqueeze(0)
        attention_mask = (positions < lengths.unsqueeze(1)).long()
        return padded, attention_mask

    all_perplexities = []
    for text in dataset:
        # Non-string entries (e.g. NaN from pandas) and blank texts cannot be
        # scored; keep a NaN placeholder so the output stays aligned with the input.
        if not isinstance(text, str) or len(text.strip()) == 0:
            print("Skipping empty text")
            all_perplexities.append(float("nan"))
            continue

        # Tokenize the text
        tokenized = tokenizer(text, return_tensors="pt", truncation=False)
        input_ids = tokenized["input_ids"].squeeze(0)
        n_tokens = len(input_ids)
        if n_tokens == 0:
            all_perplexities.append(float("nan"))
            continue

        if n_tokens <= max_seq_len:
            chunks = [input_ids]
        else:
            # Sliding window chunks
            last_start = n_tokens - max_seq_len
            starts = list(range(0, last_start + 1, sliding_window))
            chunks = [input_ids[start : start + max_seq_len] for start in starts]
            # Add the tail only when the stride did not already end on it;
            # otherwise the final window would be scored twice.
            if starts[-1] != last_start:
                chunks.append(input_ids[-max_seq_len:])

        # Create DataLoader for batch processing
        dataloader = DataLoader(chunks, batch_size=batch_size, collate_fn=collate_fn)

        text_perplexities = []
        for input_batch, attention_mask in dataloader:
            input_batch = input_batch.to(device)
            attention_mask = attention_mask.to(device)

            # Padding positions must not contribute to the loss.
            labels = input_batch.clone()
            labels[attention_mask == 0] = -100

            # Compute loss (cross-entropy)
            with torch.no_grad():
                outputs = model(input_batch, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss

            # Handle NaN loss
            if torch.isnan(loss):
                print(f"NaN loss encountered for batch. Skipping.")
                continue

            # Compute perplexity
            perplexity = torch.exp(torch.clamp(loss, max=100))  # Clamp for stability
            text_perplexities.append(perplexity.item())

        gc.collect()
        if device.type == "cuda":
            torch.cuda.empty_cache()

        # Average perplexity for this text (NaN when every batch was skipped)
        if text_perplexities:
            all_perplexities.append(float(np.mean(text_perplexities)))
        else:
            all_perplexities.append(float("nan"))

    valid_perplexities = [value for value in all_perplexities if not np.isnan(value)]
    if valid_perplexities:
        # Return average perplexity across the texts that were scored
        return np.mean(valid_perplexities), all_perplexities

    print("No valid texts or chunks processed.")
    return float("nan"), all_perplexities


In [None]:
model_name = "gpt2"  # Replace with your desired model
# Keep the dataset-level mean under a real name: unpacking into `_` would rebind
# the variable IPython uses for the most recent cell output.
gpt2_train_mean_ppl, gpt2_train_ppl = calculate_dataset_perplexity(
    df_train['text'].values, model_name, max_seq_len=1024, batch_size=50, sliding_window=512
)
df_train['gpt2_perplexity'] = gpt2_train_ppl


Token indices sequence length is longer than the specified maximum sequence length for this model (2031 > 1024). Running this sequence through the model will result in indexing errors


In [None]:
model_name = "gpt2"  # Replace with your desired model
# Keep the dataset-level mean under a real name: unpacking into `_` would rebind
# the variable IPython uses for the most recent cell output.
gpt2_test_mean_ppl, gpt2_test_ppl = calculate_dataset_perplexity(
    df_test['text'].values, model_name, max_seq_len=1024, batch_size=100, sliding_window=512
)
df_test['gpt2_perplexity'] = gpt2_test_ppl


In [9]:
import gc
# Manual cleanup between model runs: run Python's garbage collector, then ask
# PyTorch to empty its CUDA cache.
gc.collect()
torch.cuda.empty_cache()

In [8]:
# SECURITY: a Hugging Face access token used to be pasted into this cell. Treat
# that token as leaked and revoke it in the Hugging Face account settings.
# The `!export ...` form also had no effect: `!` commands run in a temporary
# subshell, so the variable never reached this kernel's environment.

In [9]:
import os
from getpass import getpass

# Never hardcode the access token in the notebook. Reuse a value already present
# in the environment; otherwise ask for it interactively (input is not echoed).
if not os.environ.get("HUGGING_FACE_HUB_TOKEN"):
    os.environ["HUGGING_FACE_HUB_TOKEN"] = getpass("Hugging Face access token: ")

In [None]:
model_name = "meta-llama/Llama-3.2-3B"

# Score both splits with the same model. The dataset-level means get real names
# because unpacking into `_` would rebind the variable IPython uses for the most
# recent cell output.
llama_train_mean_ppl, llama_train_ppl = calculate_dataset_perplexity(
    df_train['text'].values, model_name, max_seq_len=1024, batch_size=100, sliding_window=512
)
df_train['llama_perplexity'] = llama_train_ppl

llama_test_mean_ppl, llama_test_ppl = calculate_dataset_perplexity(
    df_test['text'].values, model_name, max_seq_len=1024, batch_size=100, sliding_window=512
)
df_test['llama_perplexity'] = llama_test_ppl

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Calculate Burstiness

In [None]:
# Tokenizers already loaded by calculate_burstiness, keyed by model name, so that
# scoring many texts does not reload the tokenizer on every call.
_BURSTINESS_TOKENIZERS = {}


def calculate_burstiness(model_name,text,window_size=1024,stride=512):
    """Measure how much token frequencies inside local windows differ from the whole text.

    The text is encoded with the tokenizer of ``model_name``. Windows of up to
    ``window_size`` tokens start every ``stride`` tokens (so the last windows may
    be shorter). For each window, the score is the sum, over the distinct tokens
    in that window, of ``|overall relative frequency - window relative frequency|``.

    Args:
        model_name: Name or path given to ``AutoTokenizer.from_pretrained``.
        text: Text to analyse.
        window_size: Maximum number of tokens per window.
        stride: Distance, in tokens, between consecutive window starts.

    Returns:
        The mean window score, or NaN when the text produces no tokens.
    """
    # Use the tokenizer that belongs to model_name (loaded once per model).
    tokenizer = _BURSTINESS_TOKENIZERS.get(model_name)
    if tokenizer is None:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        _BURSTINESS_TOKENIZERS[model_name] = tokenizer

    tokens = tokenizer.encode(text)
    if not tokens:
        # Nothing to measure; avoids taking the mean of an empty list.
        return float("nan")

    total = len(tokens)
    overall_freq = {token: count / total for token, count in Counter(tokens).items()}

    burstiness_scores = []
    for start_indx in range(0, total, stride):
        window_tokens = tokens[start_indx:min(start_indx + window_size, total)]
        window_len = len(window_tokens)
        window_freq = {token: count / window_len for token, count in Counter(window_tokens).items()}

        # Every token in the window also occurs in the full text, so the lookup
        # into overall_freq cannot miss.
        burstiness = sum(abs(overall_freq[token] - freq) for token, freq in window_freq.items())
        burstiness_scores.append(burstiness)
    return np.mean(burstiness_scores)

