Configuration 

In [None]:
# Settings shared by the cells below.
config = dict(
    # Passed to the model loader as max_seq_length and to the tokenizer as max_length.
    max_seq_length=128,
    # Batch size handed to Dataset.map when tokenizing the test files.
    batch_size=128,
    # Locations of the saved model and tokenizer.
    model_dir="drive/MyDrive/final",
    tokenizer_dir="drive/MyDrive/final",
    # Plain-text test files, loaded with the "text" dataset builder.
    test_normal="datasets/5k_bgl_normal.txt",
    test_abnormal="datasets/5k_bgl_abnormal.txt",
)

Load the model

In [None]:
from unsloth import FastModel
from transformers import AutoTokenizer
from peft import PeftModel, PeftConfig
# The tokenizer is loaded first because its length is passed to the model
# loader below as resize_model_vocab.
tokenizer = AutoTokenizer.from_pretrained(config["tokenizer_dir"])

# from_pretrained returns a pair; only the first element (the model) is kept.
model, _ = FastModel.from_pretrained(
    config["model_dir"],
    device_map="auto",
    load_in_4bit=True,
    max_seq_length=config["max_seq_length"],
    resize_model_vocab=len(tokenizer),
)

# NOTE(review): relies on for_inference returning the model object — confirm
# for the installed unsloth version.
model = FastModel.for_inference(model)

from transformers import DataCollatorForLanguageModeling

# Collator configured with masked-language-modelling disabled (mlm=False).
# NOTE(review): not referenced by any other cell visible in this notebook.
data_collator = DataCollatorForLanguageModeling(tokenizer=tokenizer, mlm=False)

Tokenize the testing datasets

In [None]:
from datasets import load_dataset

# Load both test files with the plain-text builder: normal first, then abnormal.
dataset_normal, dataset_abnormal = (
    load_dataset("text", data_files=config[path_key])
    for path_key in ("test_normal", "test_abnormal")
)


def tokenize_function(examples):
    """Tokenize the "text" entries of a batch with the notebook-level tokenizer.

    Truncation is enabled and capped at config["max_seq_length"]. Returns
    whatever the tokenizer call returns.
    """
    texts = examples["text"]
    max_len = config["max_seq_length"]
    return tokenizer(texts, max_length=max_len, truncation=True)

# Tokenize both datasets with identical settings, dropping the raw "text"
# column so only the tokenizer outputs remain.
tokenized_dataset_normal, tokenized_dataset_abnormal = (
    raw_dataset.map(
        tokenize_function,
        batched=True,
        batch_size=config["batch_size"],
        remove_columns=["text"],
    )
    for raw_dataset in (dataset_normal, dataset_abnormal)
)

Calculate the top-K miss rate of each testing sequence (note: set K to the value obtained in the get_k file)

In [None]:
import torch
from torch.utils.data import DataLoader
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
import numpy as np
import torch
import numpy as np
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm

def calculate_topk_miss_rate_optimized(sequences, model, tokenizer, top_k=10, batch_size=256):
    """Return, per sequence, the share of next-token targets absent from the model's top-k.

    Every sequence is split into inputs (all tokens but the last) and targets
    (all tokens but the first). A target is a hit when it appears among the
    ``top_k`` highest logits at its position; the miss rate of a sequence is
    ``1 - hits / number_of_targets``.

    Args:
        sequences: Sliceable collection of token-id sequences.
        model: Called as ``model(input_ids=..., attention_mask=...)``; the result
            must expose ``.logits``. The model must also provide ``.device``
            and ``.eval()`` (it is switched to eval mode here).
        tokenizer: Provides ``pad_token_id``, ``eos_token_id`` and ``unk_token_id``.
        top_k: Number of top-scoring candidates per position (capped at the
            size of the logits' last dimension).
        batch_size: Number of sequences per forward pass.

    Returns:
        ``np.ndarray`` holding one miss rate per input sequence, in input order.
        Sequences with two tokens or fewer are not scored and get 0.0. Any rate
        strictly between 0 and 0.01 is reported as 1.0.
    """
    device = model.device
    model.eval()
    all_miss_rates = []

    # Compare against None explicitly: `pad or eos` would throw away a valid
    # pad id of 0.
    pad_token_id = tokenizer.pad_token_id
    if pad_token_id is None:
        pad_token_id = tokenizer.eos_token_id
    if pad_token_id is None:
        # Padded positions are masked out and never scored, so any filler id works.
        pad_token_id = 0

    with torch.no_grad():
        for start in tqdm(range(0, len(sequences), batch_size), desc=f"Top-{top_k} Miss Rate"):
            batch_seqs = sequences[start:start + batch_size]
            batch_tensors = [torch.tensor(seq, dtype=torch.long) for seq in batch_seqs]

            # Unscored sequences keep this default of 0.0.
            result = [0.0] * len(batch_tensors)

            valid_indices = [idx for idx, t in enumerate(batch_tensors) if len(t) > 2]
            if not valid_indices:
                all_miss_rates.extend(result)
                continue

            scored = [batch_tensors[idx] for idx in valid_indices]
            inputs = [t[:-1] for t in scored]
            targets = [t[1:] for t in scored]

            input_ids = pad_sequence(inputs, batch_first=True, padding_value=pad_token_id).to(device)
            labels = pad_sequence(targets, batch_first=True, padding_value=-100).to(device)
            # Build the mask from the true lengths instead of `input_ids != pad`:
            # a real token that shares the pad id (e.g. EOS when pad falls back
            # to eos) must still be attended to.
            attention_mask = pad_sequence(
                [torch.ones_like(t) for t in inputs], batch_first=True, padding_value=0
            ).to(device)

            if device.type == "cuda":
                with torch.amp.autocast(device_type="cuda", dtype=torch.float16):
                    logits = model(input_ids=input_ids, attention_mask=attention_mask).logits
            else:
                # No mixed precision off-GPU; requesting float32 autocast there
                # only produces a warning and disables itself.
                logits = model(input_ids=input_ids, attention_mask=attention_mask).logits

            logits = logits.to(torch.float32)

            # Give the unknown token the lowest possible score.
            if tokenizer.unk_token_id is not None:
                logits[:, :, tokenizer.unk_token_id] = float('-inf')

            actual_k = min(top_k, logits.size(-1))
            topk_preds = torch.topk(logits, k=actual_k, dim=-1).indices  # [B, L, K]

            correct = (topk_preds == labels.unsqueeze(-1)).any(dim=-1)  # [B, L]
            valid = labels != -100  # padded target positions are excluded

            hits = (correct & valid).sum(dim=1)
            total = valid.sum(dim=1)
            miss_rate = 1.0 - (hits.float() / total.float())
            # Rates in the open interval (0, 0.01) are reported as 1.0;
            # ones_like keeps both branches of `where` in the same dtype.
            miss_rate = torch.where(
                (miss_rate > 0) & (miss_rate < 0.01),
                torch.ones_like(miss_rate),
                miss_rate,
            )

            # One transfer for the whole batch instead of one .item() per row.
            for idx, rate in zip(valid_indices, miss_rate.tolist()):
                result[idx] = rate

            all_miss_rates.extend(result)

    return np.array(all_miss_rates)


# Score the normal set first, then the abnormal set, with the same K.
# Despite the variable names, the values stored are top-k miss rates.
# Change top_k below to the K calculated in the training.
normal_perplexities, abnormal_perplexities = (
    calculate_topk_miss_rate_optimized(tokenized["train"]["input_ids"], model, tokenizer, top_k=177)
    for tokenized in (tokenized_dataset_normal, tokenized_dataset_abnormal)
)

Calculate results (every sequence with at least one miss is considered an anomaly)

In [None]:
import numpy as np
from sklearn.metrics import precision_recall_curve, f1_score, precision_score, recall_score


# Scores for every test sequence: normal ones first, abnormal ones after.
all_perplexities = np.concatenate((normal_perplexities, abnormal_perplexities))

# Ground truth in the same order: 0 = normal, 1 = abnormal.
normal_labels = np.zeros(len(normal_perplexities), dtype=int)
abnormal_labels = np.ones(len(abnormal_perplexities), dtype=int)
y_true = np.concatenate((normal_labels, abnormal_labels))

# A sequence is predicted abnormal when its score reaches the threshold.
threshold = 0.01
y_pred_fixed = (all_perplexities >= threshold).astype(int)

precision_fixed, recall_fixed, f1_fixed = (
    scorer(y_true, y_pred_fixed)
    for scorer in (precision_score, recall_score, f1_score)
)

print(
    f"Fixed Threshold: {threshold:.2f}",
    f"Precision: {precision_fixed:.4f}",
    f"Recall: {recall_fixed:.4f}",
    f"F1-Score: {f1_fixed:.4f}",
    sep="\n",
)