In [None]:
import os
import json
import pandas as pd
import numpy as np
from tqdm import tqdm
from scipy import stats
from scipy.spatial.distance import cosine

# Compatibility shim: on Python 3.10+ the names `MutableSet` / `MutableMapping` are
# re-attached to the top-level `collections` module from `collections.abc`.
# Presumably this is for a dependency that still looks them up as `collections.MutableSet`
# / `collections.MutableMapping` -- which dependency is not visible here (TODO confirm
# it is still needed).
import collections
import sys
if sys.version_info.major == 3 and sys.version_info.minor >= 10:
    # `from collections.abc import ...` also loads the `collections.abc` submodule,
    # which is what makes the `collections.abc.<name>` attribute access below work.
    from collections.abc import MutableSet
    collections.MutableSet = collections.abc.MutableSet
    from collections.abc import MutableMapping
    collections.MutableMapping = collections.abc.MutableMapping
else:
    # Older interpreters: import the names directly from `collections`.
    from collections import MutableSet
    from collections import MutableMapping

import transformers
from transformers import AutoTokenizer, AutoModelForCausalLM, AutoConfig, pipeline, BitsAndBytesConfig
from transformers import LlamaForCausalLM, LlamaTokenizer, LlamaConfig
from transformers import StoppingCriteria, StoppingCriteriaList
import torch
import torch.nn.functional as F

import matplotlib.pyplot as plt
import seaborn as sns

import string
import requests as rq
import gc
from huggingface_hub import login

In [None]:
def parse_answer(x):
    """Decode ``x`` as JSON, falling back to an empty answer record.

    Args:
        x: Text expected to contain a complete JSON document.

    Returns:
        Whatever ``json.loads`` produces for ``x``; if decoding fails for any
        reason, ``{'answer': '', 'explanation': ''}``.
    """
    try:
        decoded = json.loads(x)
    except Exception:
        # Best-effort parsing: any failure yields the empty placeholder record.
        decoded = {'answer': '', 'explanation': ''}
    return decoded

def clear_answer(x):
    """Return ``x`` with every ``[``, ``]`` and ``.`` character removed.

    Args:
        x: The string to clean.

    Returns:
        A copy of ``x`` without square brackets and periods.
    """
    # Third argument of str.maketrans lists the characters to delete.
    removal_table = str.maketrans('', '', '[].')
    cleaned = x.translate(removal_table)
    return cleaned

def parse_output(s):
    """Extract the first JSON object embedded in ``s``.

    Decoding starts at the first ``{`` and lets the JSON decoder find the end
    of the object. Unlike slicing up to the first ``}``, this copes with nested
    objects, with ``}`` characters inside string values, and with a stray ``}``
    that appears before the object.

    Args:
        s: Raw model output that may contain a JSON object surrounded by
            other text.

    Returns:
        The decoded object as a dict; ``{'answer': '', 'explanation': ''}``
        when ``s`` has no ``{``, the text after it is not valid JSON, or ``s``
        is not a string.
    """
    fallback = {'answer': '', 'explanation': ''}
    try:
        start = s.find('{')
        if start == -1:
            return fallback
        # raw_decode parses one JSON value starting at `start` and ignores
        # whatever follows it (e.g. the trailing " <end>" tag).
        parsed, _ = json.JSONDecoder().raw_decode(s, start)
    except Exception:
        # Deliberate best-effort: malformed output maps to the empty record.
        return fallback
    return parsed

# Release memory left over from earlier runs in this kernel.
gc.collect()
torch.cuda.empty_cache()

# Download the CyberMetric question set. The timeout keeps the cell from hanging
# indefinitely on a stalled connection, and raise_for_status() reports an HTTP error
# here instead of as a JSON decoding failure in a later cell.
url = "https://raw.githubusercontent.com/snrdrg/kgl/refs/heads/main/CyberMetric-10000-v1.json"
dataset = rq.get(url, timeout=60)
dataset.raise_for_status()
# Translation table that deletes every ASCII punctuation character.
translator = str.maketrans('', '', string.punctuation)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# The Hugging Face token (needs access to the LLAMA family models) is read from the
# HF_TOKEN environment variable so it is never stored in the notebook.
hf_token = os.environ.get("HF_TOKEN", "")
# An unset/empty token is passed as None so that login() prompts for one instead of
# rejecting an empty string.
login(hf_token or None)
print('complete')

In [None]:
class StoppingCriteriaEndWith(StoppingCriteria):
    """Stopping criterion that fires when the generated ids end with a given marker.

    The marker string is tokenized once (without special tokens) and compared
    against the trailing token ids of the first sequence in the batch.
    """

    def __init__(self, stop_token, tokenizer):
        """Store the tokenizer and the token ids of ``stop_token``."""
        self.tokenizer = tokenizer
        self.stop_token = tokenizer.encode(stop_token, add_special_tokens=False)

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        """Return True when row 0 of ``input_ids`` ends with the marker's token ids."""
        marker_length = len(self.stop_token)
        trailing_ids = input_ids[0, -marker_length:].tolist()
        return trailing_ids == self.stop_token


# Single-question setup: load the 8-bit model and build the prompt for question 0.
model_id = "meta-llama/Llama-3.2-3B-Instruct"

# Free whatever earlier cells left on the Python heap / CUDA cache before loading.
gc.collect()
torch.cuda.empty_cache()

quant_config = BitsAndBytesConfig(load_in_8bit=True)
config = AutoConfig.from_pretrained(model_id)

tokenizer = AutoTokenizer.from_pretrained(model_id)
data = json.loads(dataset.content)['questions']
# Reuse the EOS id as the padding id.
tokenizer.pad_token_id = tokenizer.eos_token_id
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=quant_config,
    torch_dtype=torch.float16,
    offload_folder="./offload",
    offload_state_dict=True,
)

# Index of the question used for this one-off run.
i = 0

rag_line = '<system>: Using <end> tag is STRICT rule. STRICT rule dont say anything after <end> tag. Give answer to question in following format: {"answer":"<Only letter of answer>", "explanation": "<Explanation>"} <end>'
# Each answer option is rendered as "<key>:<text>;" and appended in dict order.
answer_options = ''.join(key + ':' + text + ';' for key, text in data[i]['answers'].items())
input_text = (
    rag_line
    + '\n<user>:' + data[i]['question']
    + 'Possible answers:'
    + answer_options
    + '\n<assistant>: '
)

encoded_prompt = tokenizer.encode(input_text, return_tensors="pt", max_length=512, truncation=True)
input_ids = encoded_prompt.to(model.device)

# Pad id equals the EOS id (set above), so this marks every non-EOS prompt token as attended.
inputs_attention_mask = input_ids.ne(tokenizer.pad_token_id)
stop_token = " <end>"


In [None]:
# Generate an answer for the prepared prompt and keep the per-step logits.
generation_kwargs = dict(
    attention_mask=inputs_attention_mask,
    max_length=512,
    num_return_sequences=1,
    output_logits=True,
    return_dict_in_generate=True,
    # Stop as soon as the sequence ends with the token ids of `stop_token`.
    stopping_criteria=StoppingCriteriaList([StoppingCriteriaEndWith(stop_token, tokenizer)]),
    pad_token_id=tokenizer.eos_token_id,
)

# Inference only: no gradients are needed.
with torch.no_grad():
    output = model.generate(input_ids, **generation_kwargs)

generated_ids, logits = output.sequences, output.logits

In [None]:
class StoppingCriteriaEndWith(StoppingCriteria):
    """Halt generation once the output ends with the token ids of a stop string.

    NOTE: this class is also defined in the earlier single-question cell;
    it is repeated here, presumably so this cell can run without that one.
    """

    def __init__(self, stop_token, tokenizer):
        """Keep the tokenizer and pre-compute the stop string's token ids (no special tokens)."""
        self.tokenizer = tokenizer
        self.stop_token = tokenizer.encode(stop_token, add_special_tokens=False)

    def __call__(self, input_ids: torch.LongTensor, scores: torch.FloatTensor, **kwargs) -> bool:
        """Compare the tail of the first sequence in ``input_ids`` with the stop ids."""
        stop_ids = self.stop_token
        tail = input_ids[0, -len(stop_ids):].tolist()
        return tail == stop_ids

# Full-dataset run: ask the model every question, record per-step logit statistics,
# and flush the results to CSV files in fixed-size chunks.
model_id = "meta-llama/Llama-3.2-3B-Instruct"

gc.collect()
torch.cuda.empty_cache()

quant_config = BitsAndBytesConfig(load_in_8bit=True)
config = AutoConfig.from_pretrained(model_id)

data = json.loads(dataset.content)['questions']
n = len(data)

# Single source of truth for the result schema, so every reset of `df` uses the same columns.
RESULT_COLUMNS = ['question', 'answer', 'right_answer', 'probs', 'medians', 'means', 'stds', 'modes', 'entropy', 'prev_dist']
# Columns that hold one Python list per row; they are stringified before writing to CSV.
LIST_COLUMNS = ['probs', 'medians', 'means', 'stds', 'modes', 'entropy', 'prev_dist']
CHUNK_SIZE = 1000  # number of rows written per CSV file
CHUNK_PREFIX = 'llama323b_8bit_cyber'


def _new_results_frame():
    """Return an empty results frame with the full column schema."""
    return pd.DataFrame(columns=RESULT_COLUMNS)


def _save_chunk(frame, chunk_idx):
    """Write ``frame`` to ``<CHUNK_PREFIX><chunk_idx>.csv`` with list columns stringified.

    The frame passed in is not modified; conversion happens on a copy.
    """
    print('Saving chunk', chunk_idx)
    serialised = frame.copy()
    for column in LIST_COLUMNS:
        # NOTE(review): str() of a list of NumPy scalars follows the installed NumPy's
        # scalar repr -- confirm the downstream reader parses it as expected.
        serialised[column] = serialised[column].apply(str)
    serialised.to_csv(CHUNK_PREFIX + str(chunk_idx) + '.csv', index=False)
    print('Saving complete, continue')


df = _new_results_frame()

tokenizer = AutoTokenizer.from_pretrained(model_id)
model = AutoModelForCausalLM.from_pretrained(
    model_id,
    quantization_config=quant_config,
    torch_dtype=torch.float16,
    offload_folder="./offload",
    offload_state_dict=True
)

print('device:', model.device)
tokenizer.pad_token_id = tokenizer.eos_token_id
f_idx = 0

# Loop-invariant pieces of the prompt / generation setup, built once.
rag_line = '<system>: Using <end> tag is STRICT rule. STRICT rule dont say anything after <end> tag. Give answer to question in following format: {"answer":"<Only letter of answer>", "explanation": "<Explanation>"} <end>'
stop_token = " <end>"
stopping_criteria = StoppingCriteriaList([StoppingCriteriaEndWith(stop_token, tokenizer)])

for i in tqdm(range(n)):
    gc.collect()
    torch.cuda.empty_cache()

    # Prompt: system rule, the question, then every option rendered as "<key>:<text>;".
    input_text = rag_line + '\n<user>:' + data[i]['question']
    input_text = input_text + 'Possible answers:'
    for k, v in data[i]['answers'].items():
        input_text = input_text + k + ':' + v + ';'
    input_text = input_text + '\n<assistant>: '

    input_ids = tokenizer.encode(input_text, return_tensors="pt", max_length=512, truncation=True).to(model.device)
    # Pad id equals the EOS id (set above), so this marks every non-EOS prompt token as attended.
    inputs_attention_mask = input_ids != tokenizer.pad_token_id

    with torch.no_grad():
        # NOTE(review): the prompt is truncated to 512 tokens and generation is capped with
        # max_length=512 as well -- confirm long prompts still leave room for an answer.
        output = model.generate(
            input_ids,
            max_length=512,
            num_return_sequences=1,
            output_logits=True,
            return_dict_in_generate=True,
            attention_mask=inputs_attention_mask,
            stopping_criteria=stopping_criteria,
            pad_token_id=tokenizer.eos_token_id
        )

    generated_ids = output.sequences
    logits = output.logits
    log_max = []         # argmax index of each step's logits (stored in the 'probs' column)
    log_med = []
    log_mean = []
    log_std = []
    log_mode = []
    log_entropy = []
    log_similarity = []  # 1 - cosine distance to the previous step's logits (stored as 'prev_dist')
    prev_logits = None

    for row in logits:
        # Work in float32: with half-precision logits the 1e-12 epsilon below would
        # underflow to zero and log(0) would turn the entropy into NaN.
        step = row.float()
        probs = F.softmax(step, dim=-1).cpu().numpy()
        entropy = -np.sum(probs * np.log(probs + 1e-12))

        # One device-to-host copy per step, shared by all statistics below.
        step_np = step.cpu().numpy()
        if prev_logits is not None:
            dist = 1 - cosine(step_np[0], prev_logits[0])
        else:
            # First step has no predecessor to compare with.
            dist = 0
        prev_logits = step_np

        # A single prompt is generated per call, so the last row is also the only row
        # (assumes each step's logits are (1, vocab) -- TODO confirm).
        lgt = step_np[-1]
        log_max.append(lgt.argmax())
        log_med.append(np.median(lgt))
        log_mean.append(lgt.mean())
        log_std.append(lgt.std())
        log_mode.append(stats.mode(lgt)[0])
        log_entropy.append(entropy)
        log_similarity.append(dist)

    generated_text = tokenizer.decode(generated_ids[0], skip_special_tokens=True)
    # Drop the echoed prompt when the decoded text starts with it.
    if generated_text.startswith(input_text):
        generated_text = generated_text[len(input_text):].strip()

    df.loc[i] = [data[i]['question'], generated_text, data[i]['solution'], log_max, log_med, log_mean, log_std, log_mode, log_entropy, log_similarity]
    if df.shape[0] >= CHUNK_SIZE:
        _save_chunk(df, f_idx)
        f_idx += 1
        # Start the next chunk; the old frame is released by the gc.collect() below.
        df = _new_results_frame()
    gc.collect()
    torch.cuda.empty_cache()

# Write whatever is left after the last full chunk.
if df.shape[0] > 0:
    _save_chunk(df, f_idx)
    # Advance the counter so a later save cannot overwrite the file just written.
    f_idx += 1
    df = _new_results_frame()

gc.collect()
torch.cuda.empty_cache()
print('Inference finished')

In [None]:
# Flush any rows still held in `df` to the next chunk file (presumably useful when the
# inference loop was interrupted before its own final save ran).
if df.shape[0] > 0:
    print('Saving chunk', f_idx)
    # Columns that hold one Python list per row; stringify them before writing to CSV.
    list_columns = ['probs', 'medians', 'means', 'stds', 'modes', 'entropy', 'prev_dist']
    for column in list_columns:
        df[column] = df[column].apply(str)
    df.to_csv('llama323b_8bit_cyber' + str(f_idx) + '.csv', index=False)
    # Advance the counter so running a save again cannot overwrite the file just written.
    f_idx += 1
    # Reset with the full 10-column schema: the inference loop assigns 10 values per row,
    # which would not fit a frame that only has the first four columns.
    df = pd.DataFrame(columns=['question', 'answer', 'right_answer'] + list_columns)
    print('Saving complete, continue')