<a href="https://colab.research.google.com/github/priyanshsingh1765/CS626-Autumn-2024/blob/main/Course%20project/project_cafie.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import pandas as pd
from torch.nn.functional import softmax

#libraries for the models
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from transformers import AutoTokenizer, AutoModelForCausalLM

#for the gui
import ipywidgets as widgets
from IPython.display import Markdown, display
from IPython.display import clear_output

#CAFIE requirements
import re
from tqdm.notebook import tqdm
import math
import argparse

In [2]:
# Run on the GPU when PyTorch can see one, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Loading the language models:
- GPT-2 (Small)
- GPT-2 (Large)
- Pythia

In [3]:
# Each checkpoint is fetched once (tokenizer first, then weights) and kept in
# module-level variables that the prediction helpers look up by name.

# GPT-2 Large
gpt_2_l = "gpt2-large"
tokenizer_gpt_2_l, model_gpt_2_l = (
    GPT2Tokenizer.from_pretrained(gpt_2_l),
    GPT2LMHeadModel.from_pretrained(gpt_2_l),
)

# GPT-2 Small
gpt_2_s = "gpt2"
tokenizer_gpt_2_s, model_gpt_2_s = (
    GPT2Tokenizer.from_pretrained(gpt_2_s),
    GPT2LMHeadModel.from_pretrained(gpt_2_s),
)

# Pythia 70m
pythia_70m = "EleutherAI/pythia-70m"
tokenizer_pyth_70m, model_pyth_70m = (
    AutoTokenizer.from_pretrained(pythia_70m),
    AutoModelForCausalLM.from_pretrained(pythia_70m),
)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/666 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/3.25G [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/396 [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/2.11M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/99.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/567 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/166M [00:00<?, ?B/s]

# Tokenizer + model output function

In [4]:
def model_predict(model_name, input_text, no_of_next_words = 1):
    """Continue ``input_text`` with the selected base language model.

    Args:
        model_name: One of "GPT2-Large", "GPT2-Small" or "Pythia".
        input_text: Prompt to continue.
        no_of_next_words: Number of tokens to add after the prompt
            (``max_length`` is the prompt length plus this value).

    Returns:
        The decoded prompt plus continuation, with special tokens removed.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name == "GPT2-Large":
        model = model_gpt_2_l
        tokenizer = tokenizer_gpt_2_l

    elif model_name == "GPT2-Small":
        model = model_gpt_2_s
        tokenizer = tokenizer_gpt_2_s

    elif model_name == "Pythia":
        model = model_pyth_70m
        tokenizer = tokenizer_pyth_70m

    else:
        # Fail loudly instead of printing and then crashing on an unbound name.
        raise ValueError(f"Model not found: {model_name}")

    # Tokenize the input text and place it on the model's current device
    # (the CAFIE runner moves models to the GPU when one is available).
    input_tokens = tokenizer.encode(input_text, return_tensors="pt").to(model.device)

    # Generate text
    outputs = model.generate(
        input_tokens,
        max_length=input_tokens.shape[1] + no_of_next_words,
        num_return_sequences=1,
        pad_token_id=tokenizer.eos_token_id,
    )

    return tokenizer.decode(outputs[0], skip_special_tokens=True)

# Testing the model outputs

In [5]:
# Model selector
button_model = widgets.ToggleButtons(
    options=['GPT2-Large', 'GPT2-Small', 'Pythia'],
    description='Model:',
    disabled=False,
    button_style='info',  # 'success', 'info', 'warning', 'danger', or ''
)

# How many tokens to append to the prompt (1-15)
num_next_words = widgets.BoundedIntText(
    value=1,
    min=1,
    max=15,
    step=1,
    description='# next words:',
    disabled=False
)

# Prompt entry
input_text = widgets.Text(
    value="That woman works as a",
    placeholder="Type your input text here",
    description='Input Text:',
    disabled=False
)

# Button that triggers a prediction
diplay_output_button = widgets.Button(
    description='Predict next n words',
    disabled=False,
    button_style='warning', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click to get the prediction',
    icon='check'
)

# Area where the prediction is printed
output_area = widgets.Output()


def update_output(change):
    """Button callback: run the selected model on the current prompt and print the result."""
    with output_area:
        clear_output()  # drop whatever the previous click printed
        prediction = model_predict(
            button_model.value,
            input_text.value,
            num_next_words.value,
        )
        print(f"Prediction: {prediction}")


# The prediction is only refreshed when the button is clicked
diplay_output_button.on_click(update_output)

# Render the controls followed by the output area
display(button_model, num_next_words, input_text, diplay_output_button, output_area)

ToggleButtons(button_style='info', description='Model:', options=('GPT2-Large', 'GPT2-Small', 'Pythia'), value…

BoundedIntText(value=1, description='# next words:', max=15, min=1)

Text(value='That woman works as a', description='Input Text:', placeholder='Type your input text here')



Output()

# Getting the next token probabilities

In [6]:
def next_token_probabilities(model_name, input_text, top_k_pred = 5, no_of_next_words=1):
    """Return the ``top_k_pred`` most likely tokens for the last generated position.

    The model generates ``no_of_next_words`` tokens after ``input_text`` and the
    scores of the final generation step are turned into probabilities.

    Args:
        model_name: One of "GPT2-Large", "GPT2-Small" or "Pythia".
        input_text: Prompt to continue.
        top_k_pred: Number of candidate tokens to report.
        no_of_next_words: Number of generation steps; the reported distribution
            belongs to the last one.

    Returns:
        An object array of shape ``(top_k_pred, 2)``: column 0 holds the token
        text, column 1 its probability in percent, sorted from most to least likely.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name == "GPT2-Large":
        model = model_gpt_2_l
        tokenizer = tokenizer_gpt_2_l

    elif model_name == "GPT2-Small":
        model = model_gpt_2_s
        tokenizer = tokenizer_gpt_2_s

    elif model_name == "Pythia":
        model = model_pyth_70m
        tokenizer = tokenizer_pyth_70m

    else:
        raise ValueError("Model not found")

    # Keep the inputs on the same device as the model (the CAFIE runner may
    # have moved it to the GPU).
    input_tokens = tokenizer.encode(input_text, return_tensors="pt").to(model.device)

    outputs = model.generate(
        input_tokens,
        max_length=input_tokens.shape[1] + no_of_next_words,
        num_return_sequences=1,
        output_scores=True,
        return_dict_in_generate=True,
        pad_token_id=tokenizer.eos_token_id,
    )

    logits = outputs.scores[-1]  # scores of the last generation step
    probs = softmax(logits, dim=-1)

    top_tokens = torch.topk(probs, k=top_k_pred)
    top_token_ids = top_tokens.indices[0].tolist()
    top_probabilities = top_tokens.values[0].tolist()

    result = np.zeros((top_k_pred, 2), dtype=object)
    for row, (token_id, prob) in enumerate(zip(top_token_ids, top_probabilities)):
        # Decode every id on its own: decoding them as one string and splitting
        # on whitespace merges sub-word tokens, drops whitespace-only tokens and
        # misaligns tokens with their probabilities.
        text = tokenizer.decode([token_id])
        # Whitespace-only tokens (e.g. a newline) are shown via repr so the row is not blank.
        result[row, 0] = text.strip() or repr(text)
        result[row, 1] = prob * 100

    return result

In [7]:
# Model selector
button_model = widgets.ToggleButtons(
    options=['GPT2-Large', 'GPT2-Small', 'Pythia'],
    description='Model:',
    disabled=False,
    button_style='info',  # 'success', 'info', 'warning', 'danger', or ''
)

# How many of the most likely next tokens to list (1-15)
num_top_k_pred = widgets.BoundedIntText(
    value=5,
    min=1,
    max=15,
    step=1,
    description='# top_k_pred:',
    disabled=False
)

# Prompt entry
input_text = widgets.Text(
    value="That woman works as a",
    placeholder="Type your input text here",
    description='Input Text:',
    disabled=False
)

# Button that triggers the probability lookup
diplay_output_button = widgets.Button(
    description='Predict top n tokens',
    disabled=False,
    button_style='warning', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click to get the prediction',
    icon='check'
)

# Area where the probabilities are printed
output_area = widgets.Output()


def update_output(change=None):
    """Button callback: print the top-k next-token probabilities for the current prompt."""
    with output_area:
        clear_output()  # drop whatever the previous click printed
        prediction = next_token_probabilities(
            button_model.value,
            input_text.value,
            num_top_k_pred.value,
        )
        print(f"Prediction probabilities: \n {prediction}")


# The table is only refreshed when the button is clicked
diplay_output_button.on_click(update_output)

# Render the controls followed by the output area
display(button_model, num_top_k_pred, input_text, diplay_output_button, output_area)

ToggleButtons(button_style='info', description='Model:', options=('GPT2-Large', 'GPT2-Small', 'Pythia'), value…

BoundedIntText(value=5, description='# top_k_pred:', max=15, min=1)

Text(value='That woman works as a', description='Input Text:', placeholder='Type your input text here')



Output()

# The CAFIE model

Code for the model (Accessed from the official [Github for CAFIE](https://github.com/banerjeepragyan/CAFIE))

In [8]:
class ScoringAlgo:
    """Counterfactual re-weighting of a causal LM's next-token distribution (CAFIE).

    An instance bundles a model, its tokenizer, three index-aligned lists of
    sensitive words and the hyperparameters. Calling the instance samples
    ``sent_len`` tokens after ``prompt``; at every step the model's probabilities
    for the prompt are compared with those for a counterfactual prompt (sensitive
    words swapped with their counterparts) and re-weighted before sampling.

    The module-level ``device`` is used for all tensors and for the model.
    """

    def __init__(
        self,
        mdl,
        model_name_path,
        tokenizer,
        _do_sdb,
        ratio,
        scoring_function,
        threshold,
        lmbda,
        alpha_ratio,
        softmax_temperature,
        prompt,
        context,
        l1,
        l2,
        l3,
        act2=None,
        cnt2=None,
        sent_len=10,
        bias_type="none",
        batch_size=1,
        max_seq_length=128,
        gamma=1,
        words_to_ignore=None,
        context_type = 'ab',
    ):
        """Store the model, tokenizer, word lists and hyperparameters.

        Args:
            mdl: Causal language model; called as ``mdl(input_ids)``.
            model_name_path: Stored as ``model_name``; not read by this class.
            tokenizer: Tokenizer providing ``encode`` and ``decode``.
            _do_sdb: Stored as ``do_sdb``; not read by this class.
            ratio: Mixing ratio used by ``calculate_scores_1`` for its average.
            scoring_function: Stored as ``sf``; passed around but not branched on.
            threshold: Scores with absolute value below this are zeroed.
            lmbda: Scale applied to the scores inside ``tanh``.
            alpha_ratio: Weight of the re-weighted probabilities in the final mix.
            softmax_temperature: Temperature dividing logits before softmax.
            prompt: Text to continue.
            context: Optional context text; how it is used depends on ``context_type``.
            l1, l2, l3: Index-aligned lists of sensitive words.
            act2, cnt2: Extra word lists; stored and forwarded but unused by
                ``generate_sentences``. Default to fresh empty lists.
            sent_len: Number of tokens to sample.
            bias_type: Forwarded as ``bt``; unused by ``create_w2``.
            batch_size: When 1, ``max_seq_length`` is ignored (stored as ``None``).
            max_seq_length: Stored only when ``batch_size`` is not 1.
            gamma: Multiplier of the ``tanh`` term in the re-weighting.
            words_to_ignore: Words whose token ids are excluded from
                re-weighting. Defaults to a fresh empty list.
            context_type: One of 'ab', 'i', 'fill', 'rb'; anything else means no context.
        """
        self._intrasentence_model = mdl
        self.model_name = model_name_path
        self._tokenizer = tokenizer
        self._batch_size = batch_size
        self._max_seq_length = None if self._batch_size == 1 else max_seq_length
        self._bias_type = bias_type
        self.do_sdb = _do_sdb
        self.rat = ratio
        self.sf = scoring_function
        self.thres = threshold
        self.lmbd = lmbda
        self.alpha = alpha_ratio
        self.temperature = softmax_temperature
        self.prmpt = prompt
        self.cntxt = context
        self.w1_words = l1
        # None sentinels avoid sharing one mutable default list between instances.
        self.w1_words_2 = [] if act2 is None else act2
        self.w2_words = l2
        self.w3_wrds2 = l3
        self.w2_words_2 = [] if cnt2 is None else cnt2
        self.sl = sent_len
        self.gma = gamma
        self.words_ignore = [] if words_to_ignore is None else words_to_ignore
        self.ct = context_type

    def __call__(self):
        """Move the model to ``device`` and run ``generate_sentences`` without gradients.

        Returns:
            The 4-tuple returned by ``generate_sentences``.
        """
        with torch.no_grad():
            self._intrasentence_model.to(device)
            output, sc, sent, da = self.generate_sentences (self.prmpt, self.sl, self.w1_words, self.w2_words, self.w3_wrds2, self.w1_words_2, self.w2_words_2, self.cntxt, self._bias_type)
        return output, sc, sent, da

    def find_reqd_indices(self, arr1, arr2):
        """Pair up positions of two sequences, tolerating extra elements in ``arr2``.

        ``arr1`` is walked once while a cursor ``j`` advances through ``arr2``.
        Elements of ``arr2`` that do not occur anywhere in ``arr1`` are skipped;
        only the first element of each skipped run is recorded, paired with the
        current ``arr1`` position. Equal elements at the two cursors are recorded too.

        The recorded indices are mirrored (``len - 1 - index``) and sorted before
        being returned; callers in this class pass reversed sequences.

        Returns:
            Two lists of indices, for ``arr1`` and ``arr2`` respectively.
        """
        j=0
        matching_indices1 = []
        matching_indices2 = []
        for i in range(len(arr1)):
            c = 0
            while j < len(arr2) and arr2[j] not in arr1:
                if c==0:
                    matching_indices1.append(i)
                    matching_indices2.append(j)
                c+=1
                j+=1
            if j < len(arr2) and arr2[j]==arr1[i]:
                matching_indices1.append(i)
                matching_indices2.append(j)
                j+=1
        m1 = np.array(matching_indices1)
        m2 = np.array(matching_indices2)
        m1 = np.sort(len(arr1)-1-m1)
        m2 = np.sort(len(arr2)-1-m2)
        return m1.tolist(), m2.tolist()

    def generate_probabilities (self, input_ids, past, attention_mask, use_cache):
        """Run the model on ``input_ids`` and return its output in several forms.

        ``past``, ``attention_mask`` and ``use_cache`` are accepted but not used:
        the full sequence is re-fed on every call.

        Returns:
            A 4-tuple: temperature-scaled softmax of ``outputs[0][0, -1, :]``,
            ``outputs[1]``, temperature-scaled softmax of the whole ``outputs[0]``,
            and ``outputs[0]`` itself.
        """
        outputs = self._intrasentence_model(input_ids.to(device))
        ll = outputs[0][0, -1, :].to(device)
        probs = torch.nn.functional.softmax(ll/self.temperature, dim=-1).to(device)
        return probs, outputs[1], torch.nn.functional.softmax(outputs[0]/self.temperature, dim=-1), outputs[0]

    def calculate_new_pdf (self, probs_w1, scores, scres2, avg, thres, sf, lmbd, iil):
        """Re-weight ``probs_w1`` using two score tensors.

        Scores whose absolute value is below ``thres`` are zeroed, as are all
        entries at the last-axis indices ``iil``. Each score tensor yields a weight
        ``gamma * tanh(-lmbd * score) + 1``; the two weighted copies of ``probs_w1``
        are mixed in proportion to ``exp(norm(score))``. ``avg`` and ``sf`` are unused.

        Returns:
            ``(probs_new, (d1 + d2) / 2)`` where ``d1`` and ``d2`` are the two
            ``exp(norm)`` terms.
        """
        dont = torch.zeros(scores.shape).to(device)
        a = scores < thres
        b = scores > -thres
        c = a & b
        scores = torch.where(c, dont, scores)
        scores[:, :, iil] = 0
        a = scres2 < thres
        b = scres2 > -thres
        c = a & b
        scres2 = torch.where(c, dont, scres2)
        scres2[:, :, iil] = 0
        d1 = torch.exp(torch.linalg.vector_norm(scores))
        d2 = torch.exp(torch.linalg.vector_norm(scres2))

        wt1 = (self.gma*torch.tanh(-lmbd*scores))+1
        wt2 = (self.gma*torch.tanh(-lmbd*scres2))+1

        penalised_probs1 = torch.mul(wt1, probs_w1)
        penalised_probs2 = torch.mul(wt2, probs_w1)

        probs_new = (d1/(d1+d2))*penalised_probs1 + (d2/(d1+d2))*penalised_probs2
        return probs_new, (d1+d2)/2.

    def calculate_scores_1 (self, probs_w1, probs_w2, probs_w3, r):
        """Return ``(probs_w1 - probs_w2, r*probs_w1 + (1-r)*probs_w2, probs_w1 - probs_w3)``."""
        scores = probs_w1-probs_w2
        scres2 = probs_w1-probs_w3
        return scores, (r*probs_w1)+((1-r)*probs_w2), scres2

    def topk_process(self, outputs, k):
        """Zero, in place, every entry of each ``outputs[0][i]`` below its k-th largest value.

        Returns:
            The same ``outputs`` object.
        """
        for i in range(len(outputs[0])):
            logits = outputs[0][i]
            indices_to_remove = logits < torch.topk(logits, k)[0][..., -1, None]
            logits[indices_to_remove] = 0
            outputs[0][i] = logits
        return outputs

    def remove_bad(self, act, sdb, l):
        """Damp entries of ``act`` that do not exceed the last ``l`` positions of ``sdb``.

        Where ``act - sdb`` is positive the entry is kept; elsewhere it is scaled
        by ``exp(50 * delta)``. The result is passed through a temperature-scaled softmax.
        """
        s_a = sdb[:, -l:, :]
        delta = act - s_a
        pos_mask = delta > 0
        delta[pos_mask] = 1
        delta[~pos_mask] = torch.exp(50*delta[~pos_mask])
        p_hat = torch.mul(delta, act)
        p_sdb = torch.nn.functional.softmax(p_hat/self.temperature, dim=-1)
        return p_sdb

    def beam_search_decoder(self, data, k):
        """Beam search over ``data``, a sequence of per-step probability rows.

        Args:
            data: Iterable of rows; ``row[j]`` is the probability of symbol ``j``
                at that step (must be > 0, since ``math.log`` is applied).
            k: Beam width.

        Returns:
            Up to ``k`` ``[sequence, score]`` pairs, best first, where ``score``
            is the summed negative log-probability of the sequence.
        """
        sequences = [[list(), 0.0]]
        for row in data:
            all_candidates = list()
            for seq, score in sequences:
                for j in range(len(row)):
                    candidate = [seq + [j], score - math.log(row[j])]
                    all_candidates.append(candidate)
            # Prune after every step so the beam carries over to the next row
            # (pruning once after the loop would only ever see the last row).
            ordered = sorted(all_candidates, key=lambda tup:tup[1])
            sequences = ordered[:k]
        return sequences

    def create_w2 (self, w1_words, w2_words, w3_words, prompt, bt):
        """Build counterfactual variants of ``prompt`` by swapping sensitive words.

        The prompt is split on whitespace. A word found in one of the three lists
        is replaced by the same-index word of the other lists (list 1 -> lists 2
        and 3, list 2 -> lists 1 and 3, list 3 -> lists 1 and 2) and by "they" in
        the neutral variant. ``bt`` is unused.

        Returns:
            ``(prompt, prompt_w2, prompt_w3, prompt_neutrl, rel_words)``: the
            untouched prompt, the two swapped variants, the neutral variant and
            the space-joined sensitive words that were found.
        """
        prompt_w2_array = []
        prompt_w3_array = []
        prompt_neutrl_array = []
        rel_array = []
        for word in prompt.split():
            if word in w1_words:
                idx = w1_words.index(word)
                prompt_w2_array.append(w2_words[idx])
                prompt_w3_array.append(w3_words[idx])
            elif word in w2_words:
                idx = w2_words.index(word)
                prompt_w2_array.append(w1_words[idx])
                prompt_w3_array.append(w3_words[idx])
            elif word in w3_words:
                idx = w3_words.index(word)
                prompt_w2_array.append(w1_words[idx])
                prompt_w3_array.append(w2_words[idx])
            else:
                # Ordinary word: copied unchanged into every variant.
                prompt_w2_array.append(word)
                prompt_w3_array.append(word)
                prompt_neutrl_array.append(word)
                continue
            rel_array.append(word)
            prompt_neutrl_array.append("they")
        return (
            prompt,
            " ".join(prompt_w2_array),
            " ".join(prompt_w3_array),
            " ".join(prompt_neutrl_array),
            " ".join(rel_array),
        )

    def _encode_ids(self, text):
        """Tokenize ``text`` and return the ids as a CPU ``LongTensor`` with one row."""
        ids = self._tokenizer.encode(text, return_tensors="pt")
        # The round-trip through a Python list forces an int64 tensor;
        # presumably this matters for empty inputs - TODO confirm.
        return torch.LongTensor([ids.cpu().detach().numpy().tolist()[0]])

    def fill_blank_biased(self, context):
        """Replace the BLANK in ``context`` with the model's most likely next token.

        The text before the first "BLANK" (a single space when there is none) is
        fed to the model; the argmax token is decoded and inserted between that
        text and the last piece of the split.
        """
        r = re.split('(BLANK)', context)
        if r[0]=="":
            contxt = " "
        else:
            contxt = r[0]
        input_ids = self._encode_ids(contxt)
        outputs = self._intrasentence_model(input_ids.to(device))
        # dim is given explicitly; relying on the implicit dim is deprecated.
        ll = torch.argmax(torch.nn.functional.softmax(outputs[0][0, -1, :].to(device), dim=-1)).to(device)
        word = self._tokenizer.decode(ll)
        return contxt + word + r[-1]

    def generate_sentences (self, prompt, sent_len, w1_words, w2_words, w3_words, w1_words_2, w2_words_2, context, bt):
        """Sample ``sent_len`` tokens after ``prompt`` from the re-weighted distribution.

        Args:
            prompt: Text to continue.
            sent_len: Number of tokens to sample.
            w1_words, w2_words, w3_words: Index-aligned sensitive word lists.
            w1_words_2, w2_words_2: Unused.
            context: Context text, pre-processed according to ``self.ct``.
            bt: Forwarded to ``create_w2`` (unused there).

        Returns:
            ``(outputs_actual, score, sent, da)``: the mixed distribution of the
            last step, the mean of the collected per-token scores, the prompt plus
            the sampled text, and the second value returned by
            ``calculate_new_pdf`` at the last step. ``outputs_actual`` and ``da``
            are ``None`` when no step was run.
        """
        if self.ct == 'ab':
            r = re.split('(BLANK)', context)
            context = r[-1]
        elif self.ct == 'i':
            context = "Fill in the BLANK- " + context
        elif self.ct == 'fill':
            context = self.fill_blank_biased(context)
        elif self.ct == 'rb':
            context = context.replace("BLANK", "")
        else:
            context = ""
        prompt_w1, prompt_w2, prompt_w3, _, _ = self.create_w2 (w1_words, w2_words, w3_words, prompt, bt)
        contxt_w1, contxt_w2, contxt_w3, _, _ = self.create_w2 (w1_words, w2_words, w3_words, context, bt)

        input_ids_w1 = self._encode_ids(prompt_w1)
        input_ids_w2 = self._encode_ids(prompt_w2)
        input_ids_w3 = self._encode_ids(prompt_w3)
        attention_mask_w1 = input_ids_w1.new_ones(input_ids_w1.shape)
        attention_mask_w2 = input_ids_w2.new_ones(input_ids_w2.shape)
        attention_mask_w3 = input_ids_w3.new_ones(input_ids_w3.shape)

        # Number of prompt tokens, taken before any context is prepended.
        l = input_ids_w1.shape[1]

        input_ids_w1_c = self._encode_ids(contxt_w1)
        input_ids_w2_c = self._encode_ids(contxt_w2)
        input_ids_w3_c = self._encode_ids(contxt_w3)

        # Cache arguments are threaded through but ignored by generate_probabilities.
        past_w1, past_w2, past_w3, use_cache = None, None, None, True
        sent = prompt_w1

        reqd_indices_w1, reqd_indices_w2 = self.find_reqd_indices(torch.flip(input_ids_w1[0], dims=[-1]), torch.flip(input_ids_w2[0], dims=[-1]))
        reqd_in_cntx_w1, reqd_in_cntx_w2 = self.find_reqd_indices(torch.flip(input_ids_w1_c[0], dims=[-1]), torch.flip(input_ids_w2_c[0], dims=[-1]))
        # The w3 alignment simply reuses the w2 alignment.
        reqd_indices_w3 = reqd_indices_w2
        reqd_in_cntx_w3 = reqd_in_cntx_w2

        if self.ct != 'n':
            input_ids_w1 = torch.cat([input_ids_w1_c, input_ids_w1], dim=-1)
            input_ids_w2 = torch.cat([input_ids_w2_c, input_ids_w2], dim=-1)
            input_ids_w3 = torch.cat([input_ids_w3_c, input_ids_w3], dim=-1)

        # Token ids (of each word prefixed with a space) that must not be re-weighted.
        ignore_ids_list = [
            tok_id
            for word in self.words_ignore
            for tok_id in self._tokenizer.encode(' ' + word)
        ]

        # Shift prompt indices past the context, then prepend the context indices.
        reqd_indices_w1 = reqd_in_cntx_w1 + [x + len(input_ids_w1_c[0]) for x in reqd_indices_w1]
        reqd_indices_w2 = reqd_in_cntx_w2 + [x + len(input_ids_w2_c[0]) for x in reqd_indices_w2]
        reqd_indices_w3 = reqd_in_cntx_w3 + [x + len(input_ids_w3_c[0]) for x in reqd_indices_w3]

        # The last position (the one predicting the next token) is always needed.
        reqd_indices_w1.append(len(input_ids_w1[0])-1)
        reqd_indices_w2.append(len(input_ids_w2[0])-1)
        reqd_indices_w3.append(len(input_ids_w3[0])-1)

        len_w1 = len(input_ids_w1[0])
        len_w2 = len(input_ids_w2[0])
        len_w3 = len(input_ids_w3[0])

        jsp = []
        # Defined up front so the return statement is valid even when sent_len is 0.
        outputs_actual = None
        da = None

        for i in range(sent_len):
            probs_w1, past_w1, outputs_w1, ro_w1 = self.generate_probabilities (input_ids_w1, past_w1, attention_mask_w1, use_cache)
            probs_w2, past_w2, outputs_w2, ro_w2 = self.generate_probabilities (input_ids_w2, past_w2, attention_mask_w2, use_cache)
            probs_w3, past_w3, outputs_w3, ro_w3 = self.generate_probabilities (input_ids_w3, past_w3, attention_mask_w3, use_cache)

            outputs_w1 = outputs_w1.to(device)

            outputs_w1_trimmed = torch.index_select(outputs_w1.to(device), 1, torch.Tensor(reqd_indices_w1).int().to(device))
            outputs_w2_trimmed = torch.index_select(outputs_w2.to(device), 1, torch.Tensor(reqd_indices_w2).int().to(device))
            # NOTE(review): the w3 forward pass above is computed but its output is
            # not used; the w2 probabilities stand in for w3, so both score tensors
            # are identical. Confirm whether outputs_w3 / reqd_indices_w3 were intended.
            outputs_w3_trimmed = outputs_w2_trimmed
            ids_w1_trimmed = torch.index_select(input_ids_w1.to(device), -1, torch.Tensor(reqd_indices_w1).int().to(device)).to(device)
            scores, avg, scres2 = self.calculate_scores_1 (outputs_w1_trimmed, outputs_w2_trimmed, outputs_w3_trimmed, self.rat)

            # Score of each kept token under the distribution at the previous kept position.
            for idx in range(1,len(ids_w1_trimmed[0])):
                jsp.append(scores[0, idx-1, ids_w1_trimmed[0][idx]].item())

            probs_new, da = self.calculate_new_pdf(outputs_w1_trimmed, scores, scres2, avg, self.thres, self.sf, self.lmbd, ignore_ids_list)

            # Write the re-weighted rows back, then blend with the vanilla distribution.
            outputs_w1[0][torch.Tensor(reqd_indices_w1).long()] = probs_new.to(device)
            outputs_actual = self.alpha*outputs_w1[:,-l:,:] + (1-self.alpha)*torch.nn.functional.softmax(ro_w1[:, -l:, :]/self.temperature, dim=-1)
            logit_id = torch.multinomial(outputs_actual[0][-1], num_samples=1).to(device)[0]
            word = self._tokenizer.decode(logit_id)
            sent += word

            # The sampled token is appended to all three variants.
            new_token = logit_id.to(device).unsqueeze(-1).unsqueeze(-1)
            input_ids_w1 = torch.cat([input_ids_w1.to(device), new_token], dim=-1)
            attention_mask_w1 = torch.cat([attention_mask_w1, attention_mask_w1.new_ones((attention_mask_w1.shape[0], 1))], dim=-1)
            input_ids_w2 = torch.cat([input_ids_w2.to(device), new_token], dim=-1)
            attention_mask_w2 = torch.cat([attention_mask_w2, attention_mask_w2.new_ones((attention_mask_w2.shape[0], 1))], dim=-1)
            input_ids_w3 = torch.cat([input_ids_w3.to(device), new_token], dim=-1)
            attention_mask_w3 = torch.cat([attention_mask_w3, attention_mask_w3.new_ones((attention_mask_w3.shape[0], 1))], dim=-1)

            reqd_indices_w1.append(i+len_w1)
            reqd_indices_w2.append(i+len_w2)
            reqd_indices_w3.append(i+len_w3)

        # Avoid numpy's empty-mean warning when no score was collected.
        mean_score = np.mean(jsp) if jsp else float("nan")
        return outputs_actual, mean_score, sent, da

# Lists of sensitive words

In [9]:
def _read_word_list(path):
    """Return the lines of the text file at ``path`` as a list, without line endings.

    Blank lines are kept as empty strings so that the three word lists stay
    index-aligned with each other.
    """
    with open(path, "r") as f:
        # rstrip only removes the newline, so a final line without a trailing
        # newline keeps its last character (slicing with [:-1] would cut it off).
        return [line.rstrip("\n") for line in f]


word_path = "list_1.txt" #Path to the word list 1
list_1_words = _read_word_list(word_path)
word_path = "list_2.txt" #Path to the word list 2
list_2_words = _read_word_list(word_path)
word_path = "list_3.txt" #Path to the word list 3
list_3_words = _read_word_list(word_path)

# Model hyperparameters
- $\lambda$ ($\lambda \in [0, \infty)$) - Used to calculate the weight of the counterfactuals in the CAFIE PDF computation.
- $\alpha$ ($\alpha \in [0, 1]$) - Weight of the CAFIE PDF in the final output PDF.

# Generation function that calls the `ScoringAlgo` class

In [10]:
def generator(model, tokenizer, prompt, max_new_tokens=10, hyp_alpha = 0.99, hyp_lambda = 100):
    """Continue ``prompt`` with CAFIE-debiased sampling.

    Side effect: sets ``tokenizer.pad_token`` to the EOS token and
    ``tokenizer.padding_side`` to "left".

    Args:
        model: Causal language model handed to ``ScoringAlgo``.
        tokenizer: Matching tokenizer.
        prompt: Text to continue.
        max_new_tokens: Number of tokens to sample.
        hyp_alpha: Weight of the debiased distribution in the final mix.
        hyp_lambda: Scale of the counterfactual scores; passed on as a float so
            fractional values are not truncated.

    Returns:
        The prompt followed by the sampled continuation. If generation fails,
        the error is printed and the unchanged prompt is returned.
    """
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.padding_side = "left"

    try:
        runner = ScoringAlgo(
            mdl=model,
            model_name_path='g',
            tokenizer=tokenizer,
            _do_sdb=False, #add sdb prefix to the sentence
            ratio=0.5, #0.5 for avg
            scoring_function="tanh", #Other options- avg, jpdf, arctan, weight
            threshold=0, #bias threshold for scoring
            lmbda=float(hyp_lambda), #scoring hyperparameter (increasing it increases debiasing but reduces LM score)
            alpha_ratio=float(hyp_alpha), #new probs = alpha*debiased_probs + (1-alpha)*vanilla_probs
            softmax_temperature=1, #temperature for vanilla_probs in alpha
            prompt=prompt,
            context="",
            l1=list_1_words,
            l2=list_2_words,
            l3=list_3_words,
            sent_len=max_new_tokens,
            batch_size=1,
            max_seq_length=128,
            bias_type=None,
            words_to_ignore = []
        )
        _, _, gen_sent, _ = runner()
    except Exception as exc:
        # Best-effort fallback, but report what went wrong and let
        # KeyboardInterrupt / SystemExit propagate.
        gen_sent = prompt
        print(f"Error in generation: {exc!r}")
    return gen_sent

# Comparing CAFIE output to the base language model outputs

In [15]:
#MODEL PREDICT FUNCTION
def model_predict_base_cafie(model_name, use_model, input_text, alpha_hyp, lambda_hyp, no_of_next_words = 1):
    """Continue ``input_text`` with either the base model or its CAFIE-debiased variant.

    Args:
        model_name: One of "GPT2-Large", "GPT2-Small" or "Pythia".
        use_model: ``0``/``False`` for the plain model, anything else for CAFIE.
        input_text: Prompt to continue.
        alpha_hyp: CAFIE alpha, forwarded to ``generator``.
        lambda_hyp: CAFIE lambda, forwarded to ``generator``.
        no_of_next_words: Number of tokens to add after the prompt.

    Returns:
        The prompt followed by the generated continuation.

    Raises:
        ValueError: If ``model_name`` is not one of the supported names.
    """
    if model_name == "GPT2-Large":
        model = model_gpt_2_l
        tokenizer = tokenizer_gpt_2_l

    elif model_name == "GPT2-Small":
        model = model_gpt_2_s
        tokenizer = tokenizer_gpt_2_s

    elif model_name == "Pythia":
        model = model_pyth_70m
        tokenizer = tokenizer_pyth_70m

    else:
        # Fail loudly instead of printing and then crashing on an unbound name.
        raise ValueError(f"Model not found: {model_name}")

    if use_model == 0:
        # Plain generation. The inputs follow the model's device because an
        # earlier CAFIE run may have moved the model to the GPU.
        input_tokens = tokenizer.encode(input_text, return_tensors="pt").to(model.device)
        outputs = model.generate(
            input_tokens,
            max_length=input_tokens.shape[1] + no_of_next_words,
            num_return_sequences=1,
            pad_token_id=tokenizer.eos_token_id,
        )
        return tokenizer.decode(outputs[0], skip_special_tokens=True)

    # CAFIE path: the base generation is skipped because its result would be discarded.
    return generator(model, tokenizer, input_text, no_of_next_words, alpha_hyp, lambda_hyp)

###############################################################


#GUI PART
# Model selector
button_model = widgets.ToggleButtons(
    options=['GPT2-Large', 'GPT2-Small', 'Pythia'],
    description='Model:',
    disabled=False,
    button_style='info',  # 'success', 'info', 'warning', 'danger', or ''
)

# Unchecked: plain language model. Checked: CAFIE-debiased generation.
base_cafie = widgets.Checkbox(
    value=False,
    description='Apply CAFIE',
    disabled=False,
    indent=False
)

# How many tokens to append to the prompt (1-15)
num_next_words = widgets.BoundedIntText(
    value=5,
    min=1,
    max=15,
    step=1,
    description='# next words:',
    disabled=False
)

# Prompt entry
input_text = widgets.Text(
    value="That woman works as a",
    placeholder="Type your input text here",
    description='Input Text:',
    disabled=False
)

# Button that triggers a prediction
diplay_output_button = widgets.Button(
    description='Predict next n words',
    disabled=False,
    button_style='warning', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='Click to get the prediction',
    icon='check'
)

# CAFIE alpha: weight of the debiased distribution in the final mix
alpha_slider = widgets.FloatSlider(
    value=0.99,
    min=0,
    max=1,
    step=0.01,
    description='Alpha:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.2f',
)

# CAFIE lambda: scale of the counterfactual scores
lambda_slider = widgets.FloatSlider(
    value=100,
    min=0,
    max=1000,
    step=10,
    description='Lambda:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.1f',
)

# Area where the prediction is printed
output_area = widgets.Output()

def update_output(change=None):
    """Button callback: read the widget values, run the chosen generator and print the result."""
    with output_area:
        clear_output()  # Clear previous output
        # Get values from widgets
        selected_model = button_model.value
        use_model = base_cafie.value
        alpha_hyp = alpha_slider.value
        lambda_hyp = lambda_slider.value
        next_words = num_next_words.value
        text = input_text.value
        # Get prediction
        prediction = model_predict_base_cafie(selected_model, use_model, text, alpha_hyp, lambda_hyp, next_words)
        # Display prediction
        print(f"Prediction: {prediction}")

# The prediction is only refreshed when the button is clicked
diplay_output_button.on_click(update_output)

# Display the widgets
display(button_model, base_cafie, alpha_slider, lambda_slider, num_next_words, input_text, diplay_output_button, output_area)


ToggleButtons(button_style='info', description='Model:', options=('GPT2-Large', 'GPT2-Small', 'Pythia'), value…

Checkbox(value=False, description='Apply CAFIE', indent=False)

FloatSlider(value=0.99, continuous_update=False, description='Alpha:', max=1.0, step=0.01)

FloatSlider(value=100.0, continuous_update=False, description='Test:', max=1000.0, readout_format='.1f', step=…

BoundedIntText(value=5, description='# next words:', max=15, min=1)

Text(value='That woman works as a', description='Input Text:', placeholder='Type your input text here')



Output()