# Prompt tuning for conspiracy theory identification on Twitter

The following Python notebook, executable in Google Colab, allows for efficient prompt tuning of large causal language models using PyTorch and Hugging Face's wonderful peft package. It applies prompt tuning to a small, labelled dataset of 500 examples of potential Bill Gates conspiracy theory propagation on Twitter. The code can be adapted to any binary classification task by replacing the dataset with custom data.

### Credits

This implementation borrows *heavily* from the [example code](https://github.com/huggingface/peft/blob/main/examples/sequence_classification/Prompt_Tuning.ipynb) on peft's GitHub.


### Speed-up

Using Google Colab's default settings, the model trains very slowly (< 1 iter per s.). To improve speed, consider the following options:


*   Purchase Google's premium A100 GPUs (recommended)
*   Reduce model size (e.g., bigscience/bloomz-1b7 --> bigscience/bloomz-560m)
*   Reduce prompt length (e.g., num_virtual_tokens = 256 --> 128)


### Notes:

*   All of the code, including function definitions, is contained in this Colab file to avoid linking Google Drive or git cloning


In [1]:
!pip install transformers torch datasets accelerate bitsandbytes peft sentencepiece

Collecting datasets
  Downloading datasets-2.18.0-py3-none-any.whl (510 kB)
Collecting accelerate
  Downloading accelerate-0.28.0-py3-none-any.whl (290 kB)
Collecting bitsandbytes
  Downloading bitsandbytes-0.43.0-py3-none-manylinux_2_24_x86_64.whl (102.2 MB)
Collecting peft
  Downloading peft-0.10.0-py3-none-any.whl (199 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.1.105 (from torch)
  Downloading nvidia_cuda_nvrtc_cu12-12.1.105-py3-none-manylinux1_x86_64.whl (23.7 MB)

In [2]:
import os
from collections import Counter

import numpy as np
import pandas as pd
import torch
from datasets import load_dataset, Dataset
from google.colab import drive
from peft import get_peft_config, get_peft_model, PromptTuningInit, PromptTuningConfig, TaskType, PeftType
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer
from transformers import default_data_collator, get_linear_schedule_with_warmup

# Define model and tokenizer names (the model and tokenizer themselves are instantiated later)
device = "cuda"
model_name_or_path = "bigscience/bloomz-1b7"  # Options: "bigscience/bloomz-560m", "bloomz-1b7", "bloomz-3b", "bloomz-7b1". Note: cannot execute with mixed precision
tokenizer_name_or_path = "bigscience/bloomz-1b7"
batch_size = 1  # Batch size for the training and eval dataloaders

Let's start by examining the data. There are 500 labelled examples, with a 'content' column and a 'label' column. These tweets were manually labelled by the research team as either endorsing a conspiracy theory involving Bill Gates (label == 1) or not (label == 0).

In [3]:
data_path = 'https://github.com/pvicinanza/llm_prompt_tuning_conspiracies/blob/9aa5db0e7d4d4a1443bbd93fb9a309f71e2d5e8e/data/labelled_gates.csv?raw=true'
tweets = pd.read_csv(data_path, index_col=0)
print(tweets.shape)
tweets.head(10)

(500, 2)


tweet_id,content,label
i1268563638165688320,even the uk govt and who disagree with you *in...,1
i1285059389024382976,its naziism all over again. that exactly what ...,1
i1249743872462401537,gates' vaccines have killed people. who's the ...,1
i1253061296418766848,"i dont this this will fly, the surgeon general...",1
i1271149485108928515,#billgates & his sidekick #drfauci r greed mon...,1
i1251687805220261891,santa clara is a hot spot seems gates likes th...,0
i1268558303895760896,#billgates admits there will be inadequate saf...,1
i1248377971284389897,wtf!!!! no wonder hospitals are fighting for v...,1
i1265606969823498240,who is bill gates?! must see documentary by ja...,0
i1231717016958009344,damnit! mother f*****! son of a b****! i can't...,0


Define the training parameters

In [4]:
# The initial prompt text, including a one-shot example
# The tweet to classify is appended to the prompt text after 'Tweet: '
prompt_text = '''Does the following tweets endorse a conspiracy theory involving bill gates? Common conspiracy hashtags include #agenda21 and #billgatesbioterrorist. Answer yes or no.

Tweet: Bill Gates is installing #5g towers to infect us with microradation #5gkills
Answer: Yes

Tweet: '''

do_lowercase=True                # Whether to lowercase the data. Found lowercase improves performance on Twitter data
dataset_name = 'labelled_gates'  # Name of the dataset (stored as csv)
text_column = 'content'          # Column name of text in dataset
label_column = 'label'           # Column name of classification label
text_label_column = "text_label" # Column name of text label
pos_label = ' Yes'               # Define positive label
neg_label = ' No'                # Define negative label
num_virtual_tokens = 256         # Number of tokens for prompt tuning
max_length = num_virtual_tokens + 256  # Max length for model

# Configuration settings for prompt tuning
peft_config = PromptTuningConfig(
    task_type=TaskType.CAUSAL_LM,
    prompt_tuning_init=PromptTuningInit.TEXT,
    num_virtual_tokens=num_virtual_tokens,
    prompt_tuning_init_text=prompt_text,
    tokenizer_name_or_path=model_name_or_path,
)
checkpoint_name = f"{dataset_name}_{model_name_or_path}_{peft_config.peft_type}_{peft_config.task_type}_v1.pt".replace(
    "/", "_"
)

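# Note: peft_config above was already built from the original-case prompt_text,
# so lowercasing prompt_text here does not change the initialization text
if do_lowercase: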
    prompt_text = prompt_text.lower()
    pos_label = pos_label.lower()
    neg_label = neg_label.lower()
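
To make the prompt layout concrete, here is a minimal sketch of how an instruction, a tweet, and the answer label line up as plain strings. The `build_example` helper, the shortened instruction, and the sample tweet are hypothetical and only for illustration. In this notebook the instruction text is not concatenated to each tweet as text; it is used to initialize the virtual tokens through `prompt_tuning_init_text`.

```python
# Illustrative only: shows the text layout implied by the prompt, not notebook code
instruction = "does the following tweet endorse a conspiracy theory? answer yes or no.\n\ntweet: "
pos_label, neg_label = " yes", " no"  # Leading space so the label follows "Answer:" cleanly


def build_example(tweet, label=None):
    """Return instruction + tweet + answer cue, optionally followed by the label."""
    text = instruction + tweet.lower() + "\nAnswer:"
    return text if label is None else text + label


example = build_example("Gates wants to microchip us all", pos_label)
print(example)
```

Without a label, the returned string ends at the answer cue, which is the form the model sees at inference time.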

In [5]:
# Map the numeric labels of the conspiracy tweets to text labels
tweets[text_label_column] = np.where(tweets[label_column] == 1, pos_label, np.where(tweets[label_column] == 0, neg_label, np.nan))
classes = [k.replace("_", " ") for k in tweets[text_label_column].unique()]

# Add the last part of the prompt
tweets[text_column] = tweets[text_column].apply(lambda x : x + '\nAnswer:')

# Split between classified and unclassified tweets
tweets_unclassified = tweets[tweets[label_column].isna()]
tweets = tweets[tweets[label_column].notnull()].reset_index(drop=True)   # Reset index necessary to avoid error in dataloader
tweets = tweets[[text_column, label_column, text_label_column]]

print(tweets.shape)
tweets.head()

(500, 3)


,content,label,text_label
0,even the uk govt and who disagree with you *in...,1,yes
1,its naziism all over again. that exactly what ...,1,yes
2,gates' vaccines have killed people. who's the ...,1,yes
3,"i dont this this will fly, the surgeon general...",1,yes
4,#billgates & his sidekick #drfauci r greed mon...,1,yes


In [6]:
# Split between train and eval data
# Optionally downsample the negative class to handle class imbalance
# (necessary for rare conspiracy theories)

percentage_train = 0.8  # Fraction of examples used for training
n_downsample = 0        # Number of negative training examples to drop

cut = int(len(tweets) * percentage_train)
tweets = tweets.sample(frac=1, random_state=42)

train_df = tweets[:cut]
train_pos = train_df[train_df['label'] == 1]
train_neg = train_df[train_df['label'] == 0]
train_neg = train_neg.sample(n = len(train_neg) - n_downsample, random_state=42)
train_df = pd.concat([train_pos, train_neg]).sample(frac=1, random_state=42).reset_index(drop=True)

train_dataset = Dataset.from_pandas(train_df)
eval_dataset = Dataset.from_pandas(tweets[cut:])

# Class imbalance
pos_percent = sum([i == pos_label for i in train_dataset[text_label_column]]) / len(train_dataset) * 100
print(f'Training: {pos_percent}% positive in {len(train_dataset)} examples')
pos_percent = sum([i == pos_label for i in eval_dataset[text_label_column]]) / len(eval_dataset) * 100
print(f'Eval: {pos_percent}% positive in {len(eval_dataset)} examples')

train_dataset

Training: 69.75% positive in 400 examples
Eval: 69.0% positive in 100 examples


Dataset({
    features: ['content', 'label', 'text_label'],
    num_rows: 400
})
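
As a rough illustration of the downsampling option, the sketch below drops a fixed number of negative examples and reshuffles, using only the standard library. The `downsample_negatives` helper and the toy rows are hypothetical; the notebook itself does this with pandas and leaves `n_downsample = 0`.

```python
import random


def downsample_negatives(rows, n_drop, seed=42):
    """Drop n_drop randomly chosen negative rows, keep all positives, then shuffle."""
    rng = random.Random(seed)
    positives = [r for r in rows if r["label"] == 1]
    negatives = [r for r in rows if r["label"] == 0]
    kept_negatives = rng.sample(negatives, len(negatives) - n_drop)
    combined = positives + kept_negatives
    rng.shuffle(combined)
    return combined


# Toy data: 3 positives and 7 negatives (made up, not from the dataset)
rows = [{"label": 1}] * 3 + [{"label": 0}] * 7
balanced = downsample_negatives(rows, n_drop=4)
pos_share = sum(r["label"] for r in balanced) / len(balanced)
print(len(balanced), pos_share)
```

Only the training split is downsampled in the notebook, so the eval metrics still reflect the original class balance.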

In [7]:
# Data preprocessing
tokenizer = AutoTokenizer.from_pretrained(model_name_or_path)
if tokenizer.pad_token_id is None:
    tokenizer.pad_token_id = tokenizer.eos_token_id
target_max_length = max([len(tokenizer(class_label)["input_ids"]) for class_label in classes])

def preprocess_function(examples):
    batch_size = len(examples[text_column])
    # Note: training inputs are wrapped as "content : <text> Label : ", whereas
    # evalModelFull (defined below) feeds the raw text column to the model
    inputs = [f"{text_column} : {x} Label : " for x in examples[text_column]]
    targets = [str(x) for x in examples[text_label_column]]
    model_inputs = tokenizer(inputs)
    labels = tokenizer(targets)

    # Append the target tokens (plus a pad token as end marker) to each input and
    # mask the input positions with -100 so the loss covers only the target
    for i in range(batch_size):
        sample_input_ids = model_inputs["input_ids"][i]
        label_input_ids = labels["input_ids"][i] + [tokenizer.pad_token_id]

        model_inputs["input_ids"][i] = sample_input_ids + label_input_ids
        labels["input_ids"][i] = [-100] * len(sample_input_ids) + label_input_ids
        model_inputs["attention_mask"][i] = [1] * len(model_inputs["input_ids"][i])

    # Left-pad every example to max_length, truncate, and convert to tensors
    for i in range(batch_size):
        sample_input_ids = model_inputs["input_ids"][i]
        label_input_ids = labels["input_ids"][i]
        model_inputs["input_ids"][i] = [tokenizer.pad_token_id] * (
            max_length - len(sample_input_ids)
        ) + sample_input_ids
        model_inputs["attention_mask"][i] = [0] * (max_length - len(sample_input_ids)) + model_inputs[
            "attention_mask"][i]
        labels["input_ids"][i] = [-100] * (max_length - len(sample_input_ids)) + label_input_ids
        model_inputs["input_ids"][i] = torch.tensor(model_inputs["input_ids"][i][:max_length])
        model_inputs["attention_mask"][i] = torch.tensor(model_inputs["attention_mask"][i][:max_length])
        labels["input_ids"][i] = torch.tensor(labels["input_ids"][i][:max_length])
    model_inputs["labels"] = labels["input_ids"]
    return model_inputs

def processDataset(dataset):
    return dataset.map(
        preprocess_function,
        batched=True,
        num_proc=1,
        load_from_cache_file=False,
        desc="Running tokenizer on dataset",
    )

train_dataset = processDataset(train_dataset)
eval_dataset = processDataset(eval_dataset)

train_dataloader = DataLoader(train_dataset, shuffle=True, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=True)
eval_dataloader = DataLoader(eval_dataset, collate_fn=default_data_collator, batch_size=batch_size, pin_memory=True)

Running tokenizer on dataset:   0%|          | 0/400 [00:00<?, ? examples/s]

Running tokenizer on dataset:   0%|          | 0/100 [00:00<?, ? examples/s]
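
The label masking and left padding done in `preprocess_function` can be sketched on plain lists of token ids. The `build_features` helper, the token ids, and the pad id below are made up for illustration; the value -100 is the default `ignore_index` of PyTorch's cross-entropy loss, so masked positions do not contribute to the loss.

```python
PAD_ID = 3           # Assumed pad token id, for illustration only
IGNORE_INDEX = -100  # Label value ignored by PyTorch's cross-entropy loss


def build_features(prompt_ids, target_ids, max_length):
    """Concatenate prompt and target, mask the prompt in the labels, then left-pad."""
    input_ids = prompt_ids + target_ids + [PAD_ID]
    labels = [IGNORE_INDEX] * len(prompt_ids) + target_ids + [PAD_ID]
    attention_mask = [1] * len(input_ids)

    # Left-pad to max_length (no padding is added if the example is already longer)
    n_pad = max_length - len(input_ids)
    input_ids = ([PAD_ID] * n_pad + input_ids)[:max_length]
    attention_mask = ([0] * n_pad + attention_mask)[:max_length]
    labels = ([IGNORE_INDEX] * n_pad + labels)[:max_length]
    return {"input_ids": input_ids, "attention_mask": attention_mask, "labels": labels}


features = build_features(prompt_ids=[11, 12, 13], target_ids=[21], max_length=8)
print(features)
```

Only the target token and the trailing pad token carry real labels, so the loss is computed on the answer alone.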

In [8]:
def evalModel(eval_preds, eval_labels, num_digits=3):
    '''
    Compute accuracy, precision, recall and f1 for model predictions
    @param eval_preds (list[str]) - List of model predictions
    @param eval_labels (list[str]) - List of ground truth for comparison
    @param num_digits (int) - Number of digits to round the metrics to

    @return dictionary holding metrics
    '''
    if do_lowercase:
        eval_preds = [i.lower() for i in eval_preds]
        eval_labels = [i.lower() for i in eval_labels]

    # Compute evaluation metrics
    true_pos, true_neg, false_pos, false_neg = 0, 0, 0, 0
    no_label = []
    for pred, label in zip(eval_preds, eval_labels):
        if pred == pos_label:  # Predict positive
            if label == pos_label:
                true_pos += 1
            else:
                false_pos += 1
        elif pred == neg_label:  # Predict negative
            if label == neg_label:
                true_neg += 1
            else:
                false_neg += 1
        else:   # Not predicting a classification option
            # print(f'{pred:} {label:}')
            no_label.append(pred)

    # Account for potential divide by zero errors
    if (true_pos + false_pos) == 0:
        precision = 0
    else:
        precision = true_pos / (true_pos + false_pos)
    if (true_pos + false_neg) == 0:
        recall = 0
    else:
        recall = true_pos / (true_pos + false_neg)
    if (precision + recall) == 0:
        f1 = 0
    else:
        f1 = 2 * (precision * recall) / (precision + recall)
    accuracy = (true_pos + true_neg) / len(eval_preds)

    if len(no_label):
        non_labelled = Counter(no_label)
        print(f'Non-label predictions: {non_labelled}')

    return ({'accuracy' : round(accuracy, num_digits),
            'f1' : round(f1, num_digits),
            'precision' : round(precision, num_digits),
            'recall' : round(recall, num_digits),
            'true_pos' : true_pos,
            'true_neg' : true_neg,
            'false_pos' : false_pos,
            'false_neg' : false_neg,
            'no_label' : no_label})
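
For reference, the metric formulas used in `evalModel` can be checked by hand on a small confusion matrix. The `binary_metrics` helper and the counts below are hypothetical. One difference to keep in mind: `evalModel` divides accuracy by the total number of predictions, so outputs that match neither label count against accuracy.

```python
def binary_metrics(true_pos, false_pos, false_neg, true_neg):
    """Precision, recall, F1 and accuracy from confusion-matrix counts."""
    precision = true_pos / (true_pos + false_pos) if (true_pos + false_pos) else 0
    recall = true_pos / (true_pos + false_neg) if (true_pos + false_neg) else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0
    accuracy = (true_pos + true_neg) / (true_pos + false_pos + false_neg + true_neg)
    return {"precision": precision, "recall": recall, "f1": f1, "accuracy": accuracy}


# Made-up counts for illustration
metrics = binary_metrics(true_pos=6, false_pos=2, false_neg=4, true_neg=8)
print(metrics)
```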

In [9]:
def evalModelFull(model, eval_dataset, do_lowercase=do_lowercase):
    results = []
    labels = []
    model.eval()
    for example in tqdm(eval_dataset):
        label = example[text_label_column]
        inputs = tokenizer(example[text_column], return_tensors="pt").to(device)
        outputs = model.generate(input_ids=inputs["input_ids"],
                                 attention_mask=inputs["attention_mask"],
                                 max_new_tokens=1,
                                 eos_token_id=3)[:, -1]  # Keep only the newly generated token

        # Note: Modify here if using a batch
        results.extend(tokenizer.batch_decode(outputs.detach().cpu().numpy(), skip_special_tokens=True))
        labels.append(label)
        # print(f'Prediction: {results[-1]} Label: {label}')
    return evalModel(results, labels)

## Single training of the model with optimal hyperparameters

In [10]:
# Declare model hyperparameters
# Note: Batch size declared earlier
patience = 1                 # Number of epochs after best accuracy before early stopping
lr = 3e-4                    # Somewhere between 3e-3 & 3e-4 is the sweet spot for this task
num_epochs = 5               # Number of training epochs
accumulation_steps = 2       # Gradient accumulation steps
early_stop_after_epoch = 5   # Minimum epoch index before early stopping can trigger (epochs run 0-4 here, so it never triggers)

In [11]:
# Create model
# Note: Loading in 8bit or with torch_dtype=torch.bfloat16/torch.float16
# results in nan. Train in 32 bit.
model = AutoModelForCausalLM.from_pretrained(model_name_or_path,
                                             #load_in_8bit=True,
                                             device_map='auto')
model = get_peft_model(model, peft_config)
model = model.to(device)
model.print_trainable_parameters()


trainable params: 524,288 || all params: 1,722,933,248 || trainable%: 0.030429965908928817
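
The trainable parameter count can be reproduced with simple arithmetic: prompt tuning learns one embedding vector per virtual token. The hidden size of 2048 is an assumption about bloomz-1b7 (it is consistent with the printed count); the other numbers come from the printout above.

```python
num_virtual_tokens = 256
hidden_size = 2048          # Assumed embedding width of bloomz-1b7
all_params = 1_722_933_248  # "all params" from the printout above

# One learned embedding vector per virtual token
trainable = num_virtual_tokens * hidden_size
trainable_percent = 100 * trainable / all_params
print(trainable, trainable_percent)
```

Halving `num_virtual_tokens` to 128 would halve the trainable parameters, which is one reason a shorter prompt trains faster.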


In [12]:
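# Declare optimizer and learning rate scheduler
# Note: lr_scheduler.step() runs once per optimizer step (every accumulation_steps batches),
# so fewer than num_training_steps scheduler steps are taken during training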
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
lr_scheduler = get_linear_schedule_with_warmup(
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=(len(train_dataloader) * num_epochs / batch_size),
)
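
To see what the linear schedule does with these settings, the sketch below re-implements the learning rate multiplier in plain Python. The `linear_schedule_factor` helper is a hypothetical stand-in that follows my understanding of `get_linear_schedule_with_warmup` (linear warmup, then linear decay to zero at `num_training_steps`); the batch counts are taken from this notebook's settings.

```python
def linear_schedule_factor(step, num_warmup_steps, num_training_steps):
    """Multiplier applied to the base learning rate at a given scheduler step."""
    if step < num_warmup_steps:
        return step / max(1, num_warmup_steps)
    return max(0.0, (num_training_steps - step) / max(1, num_training_steps - num_warmup_steps))


lr = 3e-4
num_batches, num_epochs, batch_size, accumulation_steps = 400, 5, 1, 2

# Value passed to the scheduler in the cell above
num_training_steps = num_batches * num_epochs / batch_size
# Scheduler steps actually taken: one per optimizer step
scheduler_steps_taken = (num_batches // accumulation_steps) * num_epochs

final_factor = linear_schedule_factor(scheduler_steps_taken, 0, num_training_steps)
print(final_factor, lr * final_factor)
```

Under these assumptions the learning rate ends training at half its initial value rather than at zero. Setting `num_training_steps` to the number of optimizer steps would make it decay fully.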

In [13]:
# Training and evaluation with gradient accumulation and early stopping
best_accuracy = 0
epoch_since_best = 0
results = []

for epoch in range(num_epochs):

    # Forward pass through the language model
    model.train()
    total_loss = 0
    for step, batch in enumerate(tqdm(train_dataloader)):
        batch = {k: v.to(device) for k, v in batch.items()}
        outputs = model(**batch)
        total_loss += outputs.loss.detach().float()
        loss = outputs.loss / accumulation_steps  # Divide by gradient accumulation steps to adjust loss
        loss.backward()

        if ((step + 1) % accumulation_steps == 0) or (step + 1 == len(train_dataloader)):
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

    # Compute total loss
    train_epoch_loss = total_loss / (len(train_dataloader) * batch_size)
    train_ppl = torch.exp(train_epoch_loss)

    # Evaluate the model
    model.eval()
    eval_metrics = evalModelFull(model, eval_dataset)
    print(f'''{epoch=}; training loss={train_epoch_loss.detach().cpu()}; accuracy={eval_metrics['accuracy']}; f1={eval_metrics['f1']}; precision={eval_metrics['precision']}; recall={eval_metrics['recall']}''')

    # Consider early stopping
    if (eval_metrics['accuracy'] >= best_accuracy):
        epoch_since_best = 0
        best_accuracy = eval_metrics['accuracy']
        best_metrics = eval_metrics
    else:
        epoch_since_best += 1
        if (epoch_since_best >= patience) and (epoch >= early_stop_after_epoch):
            print(f"Evaluation accuracy failing to improve. Executing early stopping with a patience of {patience}.\nFinal accuracy={best_accuracy}")

            # Add outcome to data
            best_metrics['learning rate'] = lr
            best_metrics['accumulation_steps'] = accumulation_steps
            best_metrics['best_epoch'] = epoch - patience
            results.append(best_metrics)
            break

100%|██████████| 400/400 [03:18<00:00,  2.02it/s]
100%|██████████| 100/100 [00:11<00:00,  8.35it/s]


epoch=0; training loss=2.0046310424804688; accuracy=0.7; f1=0.817; precision=0.705; recall=0.971


100%|██████████| 400/400 [03:17<00:00,  2.02it/s]
100%|██████████| 100/100 [00:10<00:00,  9.43it/s]


epoch=1; training loss=0.2594348192214966; accuracy=0.7; f1=0.741; precision=0.915; recall=0.623


100%|██████████| 400/400 [03:17<00:00,  2.02it/s]
100%|██████████| 100/100 [00:10<00:00,  9.42it/s]


epoch=2; training loss=0.19777758419513702; accuracy=0.8; f1=0.846; precision=0.902; recall=0.797


100%|██████████| 400/400 [03:17<00:00,  2.02it/s]
100%|██████████| 100/100 [00:10<00:00,  9.45it/s]


epoch=3; training loss=0.16156356036663055; accuracy=0.8; f1=0.857; precision=0.845; recall=0.87


100%|██████████| 400/400 [03:17<00:00,  2.02it/s]
100%|██████████| 100/100 [00:10<00:00,  9.51it/s]

epoch=4; training loss=0.08989445120096207; accuracy=0.79; f1=0.835; precision=0.914; recall=0.768
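
The early stopping rule can be replayed on the accuracies printed in the log above. The `best_epoch_with_patience` helper is a hypothetical, simplified version of the bookkeeping in the training loop: ties count as an improvement, and stopping is only allowed once `min_epoch` is reached.

```python
def best_epoch_with_patience(accuracies, patience=1, min_epoch=0):
    """Return (best_epoch, best_accuracy, stopped_at) for per-epoch accuracies."""
    best_accuracy, best_epoch, epochs_since_best = 0.0, None, 0
    for epoch, accuracy in enumerate(accuracies):
        if accuracy >= best_accuracy:  # Ties count as an improvement
            best_accuracy, best_epoch, epochs_since_best = accuracy, epoch, 0
        else:
            epochs_since_best += 1
            if epochs_since_best >= patience and epoch >= min_epoch:
                return best_epoch, best_accuracy, epoch
    return best_epoch, best_accuracy, None  # Ran out of epochs without stopping early


logged = [0.7, 0.7, 0.8, 0.8, 0.79]  # Accuracies printed in the log above
never_stops = best_epoch_with_patience(logged, patience=1, min_epoch=5)
stops_early = best_epoch_with_patience(logged, patience=1, min_epoch=0)
print(never_stops, stops_early)
```

With a minimum epoch of 5 and only five epochs (indexed 0 to 4), the rule never fires, which matches the full five-epoch run in the log.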





And that's all there is to it!

## Classify the full set of tweets

Note: This code is not functional as we cannot release the full dataset. It is included purely as a demonstration.

In [None]:
# Here's how to apply the model to a full unlabelled corpus

yes_ind = tokenizer.encode(pos_label)   # Token ids of the positive label
no_ind = tokenizer.encode(neg_label)    # Token ids of the negative label
softmax_func = torch.nn.Softmax(dim=0)

def getProbs(model, doc, tokenizer=tokenizer, yes_ind=yes_ind,
             no_ind=no_ind, softmax=softmax_func):
    '''
    Get predicted positive and negative model probabilities for a binary
    classification task.
    '''

    model.eval()

    with torch.no_grad():
        inputs = tokenizer(doc, return_tensors="pt").to(device)
        outputs = model.generate(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
            max_new_tokens=1,
            eos_token_id=3,
            return_dict_in_generate=True,
            output_scores=True
        )

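    # Softmax over the vocabulary scores of the single generated token
    token_probs = softmax(outputs.scores[0][0]).cpu().detach().numpy()

    # Probability assigned to the first token of each label
    prob_yes = round(token_probs[yes_ind][0], 3)
    prob_no = round(token_probs[no_ind][0], 3)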
    return ([prob_yes, prob_no])
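
The probability extraction in `getProbs` amounts to a softmax over the vocabulary followed by a lookup of the two label tokens. The sketch below shows this on a made-up five-token vocabulary; the scores and token ids are hypothetical. It also shows an optional renormalisation over the two labels, which `getProbs` does not perform, so its two probabilities need not sum to 1.

```python
import math


def softmax(logits):
    """Numerically stable softmax over a list of scores."""
    peak = max(logits)
    exps = [math.exp(x - peak) for x in logits]
    total = sum(exps)
    return [e / total for e in exps]


# Made-up scores over a 5-token vocabulary; ids 1 and 3 stand in for " yes" and " no"
logits = [0.1, 2.0, -1.0, 1.0, 0.3]
yes_id, no_id = 1, 3

probs = softmax(logits)
prob_yes, prob_no = probs[yes_id], probs[no_id]

# Optional: renormalise over the two labels so the pair sums to 1
prob_yes_binary = prob_yes / (prob_yes + prob_no)
print(round(prob_yes, 3), round(prob_no, 3), round(prob_yes_binary, 3))
```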

In [None]:
# Assuming the path below points to the full dataset
tweets = pd.read_csv(f'drive/MyDrive/Colab Notebooks/conspiracy_tweets/data/{dataset_name}.csv').rename(columns={'bert_processed' : 'content'})
tweets[text_label_column] = np.where(tweets['label'] == 1, pos_label, np.where(tweets['label'] == 0, neg_label, np.nan))

# Add the last part of the prompt
tweets[text_column] = tweets[text_column].apply(lambda x : x + '\nAnswer:')  # No trailing space: the labels already begin with one
print(len(tweets))

# Compute probabilities
probs = [getProbs(model, i) for i in tqdm(tweets[text_column])]
tweets = pd.concat([tweets, pd.DataFrame(probs, columns=['yes_prob', 'no_prob'])], axis=1)

# Output to csv
tweets.to_csv(f'drive/MyDrive/Colab Notebooks/conspiracy_tweets/data/model_output/{dataset_name}.csv', index=False)

# Save the prompt-tuned output
# Note that this does not save the full model, just the learned prompt embeddings!
dataname = dataset_name.split('_')[-1]
peft_model_id = f'drive/MyDrive/Colab Notebooks/conspiracy_tweets/data/model_output/{dataname}'
model.save_pretrained(peft_model_id)
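
Since only the prompt embeddings are saved, reloading requires the base model as well. The sketch below follows the usual peft loading pattern; the `load_prompt_tuned_model` helper is hypothetical, and it assumes the saved config records the base model name in `base_model_name_or_path`.

```python
def load_prompt_tuned_model(peft_model_id, device="cuda"):
    """Reload the base model and attach the saved prompt-tuning weights."""
    # Imported lazily so that defining this helper does not require the libraries
    from peft import PeftConfig, PeftModel
    from transformers import AutoModelForCausalLM

    # The saved adapter config is assumed to record which base model it was trained on
    config = PeftConfig.from_pretrained(peft_model_id)
    base_model = AutoModelForCausalLM.from_pretrained(config.base_model_name_or_path)
    model = PeftModel.from_pretrained(base_model, peft_model_id)
    return model.to(device).eval()
```

A call such as `model = load_prompt_tuned_model(peft_model_id)` would then return a model that can be passed to `getProbs`.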