In [1]:
import random
import string
import json
import pandas as pd 
import re
random.seed(123)

In [2]:
import numpy as np
from tqdm import tqdm

In [3]:
def find_word_positions(text:str,
                        words:list[str]):
    # find all occurrences of word in text
    positions = []
    for word in words:
        for m in re.finditer(word, text):
            positions.extend(list(range(m.start(), m.end())))
    return positions

In [4]:
def add_noise(text, threshold, chars, words):
    target_choices = ["char", "word"]
    operation_choices = ["insert", "delete", "substitute"]  
    text_split = text.strip().split()
    noise_text_split = []
    for token in text_split:
        random_float = random.uniform(0, 1)
        if random_float <= threshold:
            target = random.choice(target_choices)
            operation = random.choice(operation_choices)
            if target == "char":
                # protext <answer> </answer> to be untouched by excluding their positions
                protected_positions = find_word_positions(text, ["<answer>", "</answer>", 'python_function', 'necessarily true', 'necessarily false', 'uncertain', 'yes', 'no', "<answer>yes</answer>", '0', '1', '2', '3', '4', '5' '6', '7', '8', '9', 'Python'])
                token_position = random.choice([_ for _ in range(len(token)) if _ not in protected_positions])
                if operation == "insert":
                    token = token[:token_position] + random.choice(chars) + token[token_position:]
                elif operation == "delete":
                    token = token[:token_position] + token[token_position + 1:]
                elif operation == "substitute":
                    token = token[:token_position] + random.choice(chars) + token[token_position + 1:]
            elif target == "word":
                if operation == "insert":
                    token = f"{random.choice(words)} {token}"
                elif operation == "delete":
                    token = ""
                elif operation == "substitute":
                    token = random.choice(words)
        if token:
            noise_text_split.append(str(token))
    return " ".join(noise_text_split)

In [5]:
frequency_data = pd.read_csv("./wordFrequency.csv")
words = frequency_data.lemma.drop_duplicates().tolist()
chars = string.ascii_letters

In [6]:
prompt_dic = {}
for task in ['comprehensive', 'algorithm', 'logic', 'math']:
    prompt_dic[task] = [ele['prompt'] for ele in json.load(open(f'../redial/redial_gold/{task}.json', 'r'))['vanilla']['original']]

In [7]:
perturbed_prompt_dic = dict()

for threshold in [0.02, 0.04, 0.06]:
    perturbed_prompt_dic[threshold] = dict()
    for task, prompts in prompt_dic.items():
        perturbed_prompt_dic[threshold][task] = {'vanilla':{'original':[]}}
        for prompt in prompts:
            perturbed_prompt_dic[threshold][task]['vanilla']['original'].append(add_noise(prompt, threshold, chars, words))

for threshold, task_dic in perturbed_prompt_dic.items():
    for task, data in task_dic.items():
        json.dump(data, open(f'../redial/perturbations/{threshold}/{task}.json', 'w'), indent=4)

# load model

In [None]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
# load your model and tokenizer
tokenizer = AutoTokenizer.from_pretrained("model",
                                          token='token')
model = AutoModelForCausalLM.from_pretrained("model",
                                             device_map='auto',
                                             token='token')

In [9]:
def calculate_perplexity(text:str,
                         model=model,
                         tokenizer=tokenizer):
    # Tokenize input text
    inputs = tokenizer(text, return_tensors='pt')
    
    # Move tensors to the appropriate device
    inputs = {key: value.to(model.device) for key, value in inputs.items()}
    
    # Get model outputs
    with torch.no_grad():
        outputs = model(**inputs, labels=inputs['input_ids'])
    
    # Calculate log-likelihood
    log_likelihood = outputs.loss.item()
    
    # Calculate perplexity
    perplexity = torch.exp(torch.tensor(log_likelihood))
    
    return perplexity.item()

In [None]:
ppl_dic = dict()

for threshold, data in perturbed_prompt_dic.items():
    ppl_dic[threshold] = dict()
    for task, prompts in data.items():
        ppl_dic[threshold][task] = list()
        for prompt in tqdm(prompts['vanilla']['original']):
            ppl_dic[threshold][task].append(calculate_perplexity(prompt))
    print(f"Threshold {threshold} done")
    # print mean ppl of the threshold data
    all_ppls = [v for val in ppl_dic[threshold].values() for v in val]
    print(f'Averaged ppl {round(np.mean(all_ppls), 1)}')