# Instruction-tuning LLMs for Native Language Identification on ICLE

This notebook contains the code for instruction-tuning open-source large language models (LLMs) on the task of Native Language Identification (NLI). The code is heavily inspired by [Unsloth](https://github.com/unslothai/unsloth) and its example notebooks on fine-tuning LLMs. We use the Unsloth library to perform 4-bit QLoRA fine-tuning on the ICLE-NLI dataset under stratified 5-fold cross-validation. We recommend running this notebook in Google Colaboratory with a GPU runtime to speed up the fine-tuning process.

In [None]:
# Install packages
!pip install imbalanced-learn
!pip install "unsloth[cu121-ampere-torch230] @ git+https://github.com/unslothai/unsloth.git"

In [None]:
# Mount Google Drive if using Google Colab
# The dataset CSV, the saved adapters and the result figures used later in this
# notebook are all addressed through paths below /content/drive/MyDrive.
# NOTE(review): `google.colab` is presumably only importable inside a Colab runtime;
# outside Colab this cell has to be skipped and the paths adjusted.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Import packages
from unsloth import FastLanguageModel
from datasets import load_dataset, Dataset
from pydantic import BaseModel, ValidationError, Field
from sklearn.model_selection import StratifiedKFold
import pandas as pd
import numpy as np
import torch
import os
from typing import Literal
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix, f1_score, accuracy_score
from collections import defaultdict
from trl import SFTTrainer
from transformers import TrainingArguments
from datasets import Dataset
from imblearn.under_sampling import RandomUnderSampler
from collections import Counter

# Settings shared by every FastLanguageModel.from_pretrained call in this notebook.
# `max_seq_length` is additionally passed to the SFTTrainer.
max_seq_length = 2048
# NOTE(review): bfloat16 is hard-coded here, whereas the TrainingArguments further down
# choose fp16/bf16 via torch.cuda.is_bf16_supported() — confirm the GPU supports bf16.
dtype = torch.bfloat16
load_in_4bit = True # we use 4bit quantization

🦥 Unsloth: Will patch your computer to enable 2x faster free finetuning.


In [None]:
# 4bit pre quantized models supported by Unsloth
# (reference list of alternatives; the checkpoint that is actually loaded is `model_name` below)
fourbit_models = [
    "unsloth/mistral-7b-bnb-4bit",
    "unsloth/llama-2-7b-bnb-4bit",
    "unsloth/gemma-7b-bnb-4bit",
    "unsloth/llama-3-8b-bnb-4bit",
    "unsloth/Phi-3-mini-4k-instruct"
]

# `run_name` labels the prediction column and the confusion-matrix image written
# at the end of the cross-validation cell.
run_name = 'finetuned_mistral'
model_name = "unsloth/mistral-7b-bnb-4bit"

# Initial load of the base model. The tokenizer returned here provides EOS_TOKEN for
# prompt formatting; the cross-validation loop loads a fresh model for every fold.
model, tokenizer = FastLanguageModel.from_pretrained(
    model_name = model_name,
    max_seq_length = max_seq_length,
    dtype = dtype,
    load_in_4bit = load_in_4bit,
    # token = "hf_...", # use one if using gated models like meta-llama/Llama-2-7b-hf
)

# Add LoRA adapters
# (kept for reference only: adapters are attached per fold inside the cross-validation loop)
# model = FastLanguageModel.get_peft_model(
#       model,
#       r = 16,
#       target_modules = ["q_proj", "k_proj", "v_proj", "o_proj",
#                       "gate_proj", "up_proj", "down_proj",],
#       lora_alpha = 16,
#       lora_dropout = 0,
#       bias = "none",
#       use_gradient_checkpointing = "unsloth",
#       random_state = 777,
#       use_rslora = False,
#       loftq_config = None,
#     )

==((====))==  Unsloth 2024.8: Fast Mistral patching. Transformers = 4.43.3.
   \\   /|    GPU: NVIDIA A100-SXM4-40GB. Max memory: 39.564 GB. Platform = Linux.
O^O/ \_/ \    Pytorch: 2.3.0+cu121. CUDA = 8.0. CUDA Toolkit = 12.1.
\        /    Bfloat16 = TRUE. FA [Xformers = 0.0.26.post1. FA2 = True]
 "-____-"     Free Apache license: http://github.com/unslothai/unsloth
Unsloth: Fast downloading is enabled - ignore downloading bars which are red colored!


In [None]:
def generate_text(prompt, max_new_tokens=10):
  """
  Generate text with the globally loaded ``model``/``tokenizer`` for an input prompt.

  :param prompt: input prompt
  :param max_new_tokens: upper bound on the number of tokens generated after the prompt
  :type prompt: str
  :type max_new_tokens: int
  :returns: the decoded output sequence with special tokens removed; callers strip
            the prompt part from it with ``clean_output``
  :rtype: str
  """
  # Keep the full encoding (not just input_ids) so the attention mask reaches generate()
  encoded = tokenizer(prompt, return_tensors="pt").to("cuda")
  outputs = model.generate(**encoded,
                           max_new_tokens=max_new_tokens,
                           # use EOS as the padding token id during generation
                           pad_token_id=tokenizer.eos_token_id,
                           )
  # Only one prompt is encoded per call, so the first sequence is the only one
  response = tokenizer.decode(outputs[0], skip_special_tokens=True)

  return response

def clean_output(output, eos_token, output_only=False):
  """
  Reduce a raw model generation to the answer it contains.

  The text after the last occurrence of ``eos_token`` is taken as the model's own
  output. Unless ``output_only`` is set, that text is normalised further: a JSON-like
  ``native_lang`` answer or a ``Class: ...`` answer is reduced to its label, and the
  first language name or class code found (in the order of the table below) is
  rewritten as ``{"native_lang":"<CODE>"}``.
  :param output: model generated output, prompt included
  :param eos_token: separator to split the model output on; only the text after its last occurrence is kept
  :param output_only, default False: if True, return the stripped text after the separator without further clean-up. mostly used for debugging
  :type output: str
  :type eos_token: str
  :type output_only: bool
  :returns: a ``{"native_lang":"<CODE>"}`` string when a language was recognised, otherwise the (partially cleaned) text
  """
  final_output = output.split(eos_token)[-1].strip()
  if output_only != False: # deliberately an equality test: only False (or 0) enables the clean-up below
    return final_output

  # (lowercase language name, class code); the order decides which entry wins
  # when several could match, and the bare 'sp' fallback is tried last.
  name_to_code = (
      ('arabic', 'ARA'),
      ('bulgarian', 'BUL'),
      ('chinese', 'CHI'),
      ('czech', 'CZE'),
      ("french", "FRE"),
      ("german", "GER"),
      ("hindi", "HIN"),
      ("italian", "ITA"),
      ("japanese", "JPN"),
      ("korean", 'KOR'),
      ("spanish", "SPA"),
      ("telugu", "TEL"),
      ("turkish", "TUR"),
      ("russian", "RUS"),
      ("english", "ENG"),
      ('sp', 'SPA'),
  )

  # JSON-style answer: the last '}'-delimited chunk of the whole output that
  # mentions native_lang provides the label.
  if '}' in final_output:
    for chunk in output.split("}"):
      if 'native_lang' not in chunk:
        continue
      code = chunk.split(":")[-1].strip().replace('"', '').replace('\n', '')
      final_output = '{"native_lang":"' + code + '"}'

  # "Class: XYZ" answer: keep what follows the last "Class:" marker.
  if 'Class:' in final_output:
    tail = output.split("Class:")[-1]
    final_output = tail.strip().replace('"', '').replace('\n', '').replace('.', '')

  # Map a language name (case-insensitive) or class code (case-sensitive) to JSON.
  for name, code in name_to_code:
    if name in final_output.lower() or code in final_output:
      final_output = '{"native_lang":"' + code + '"}'
      # The rewritten string can only match this same code again, so stop here.
      break

  return final_output

# Extra instruction that classify() puts into the retry prompt it builds after the
# model's answer was validated as "ENG".
prompt_retry_eng = '''
  You previously mistakenly predicted this text as "ENG" (English). The class is NOT English.
  Please classify the native language of the author of the text again.
  '''

def classify(texts, goldlabels, filter_token, closedopen_setting ='closed'):
  '''
  Predict the native language of every text with the currently loaded model.

  Each text is prompted up to five times. A reply that fails validation against
  the prediction schema, or that validates as "ENG", triggers another attempt whose
  prompt additionally contains the matching retry instruction. If no attempt yields
  a valid label, the prediction for that text is 'other'. The running accuracy is
  printed after every text.

  :param texts: texts to classify
  :param goldlabels: gold labels aligned with ``texts``; only used for the running accuracy
  :param filter_token: separator passed to ``clean_output`` to cut the prompt off the generation
  :param closedopen_setting: currently unused; kept so existing calls keep working
  :type texts: list
  :type goldlabels: list
  :type filter_token: str
  :type closedopen_setting: str
  :returns predictions: a list of model predictions, one per text
  '''
  sys_prompt = prompt_ICLE
  prompt_retry = prompt_retry_ICLE
  NLI_prediction = NLI_prediction_ICLE
  main_task_prompt = main_task_prompt_ICLE
  max_attempts = 5 # generations per text before giving up with 'other'
  predictions = []
  for count, (text, _gold) in enumerate(zip(texts, goldlabels), start=1):
    # NOTE(review): this template differs from `alpaca_prompt` used for fine-tuning
    # (no "###" section headers) — confirm that this is intended.
    prompt_head = "Instruction: " + sys_prompt + '\n\n' + main_task_prompt
    prompt_tail = '\n\nInput: '+ text + "\nResponse:"
    # Built once per text so that a retry prompt set below survives into the next
    # attempt instead of being overwritten by the plain prompt.
    fullprompt = prompt_head + prompt_tail
    response_dict = {'native_lang': 'other'} # fallback when every attempt fails
    for _attempt in range(max_attempts):
      output = generate_text(fullprompt) # generate output per text
      final_output = clean_output(output, filter_token)
      try:
        validated_response = NLI_prediction.model_validate_json(final_output) # use class to validate json string
      except ValidationError: # label missing or not an allowed class: retry with a reminder
        fullprompt = prompt_head + '\n' + prompt_retry + prompt_tail
        continue
      candidate = validated_response.model_dump() # dump validated response into dict
      if candidate['native_lang'] == "ENG": # reiterate prompt if model predicts english
        fullprompt = prompt_head + '\n' + prompt_retry_eng + prompt_tail
        continue
      response_dict = candidate
      break
    predictions.append(response_dict['native_lang'])
    print(count, response_dict)
    print('Accuracy:', "{:.2f}".format(accuracy_score(goldlabels[0:count], predictions)*100))
  return predictions

In [None]:
class NLI_prediction_ICLE(BaseModel):
  """Schema of one model answer: an object whose ``native_lang`` is one of the seven ICLE class codes."""
  # classify() validates the cleaned model output against this field and treats a
  # ValidationError as "answer not usable, retry".
  native_lang: Literal['BUL', 'CHI', 'CZE', 'FRE', 'JPN', 'RUS', 'SPA']

# Class codes of the ICLE setting; also the label order of the confusion matrix
# plotted at the end of the notebook.
all_labels_ICLE = ['BUL', 'CHI', 'CZE', 'FRE', 'JPN', 'RUS', 'SPA']

# System instruction that classify() places at the start of every inference prompt.
# NOTE(review): "Valid output formats" lists only four of the seven classes
# (FRE, JPN and RUS are missing) — confirm whether that is intentional.
prompt_ICLE = '''
  You are a forensic linguistics expert that reads English texts written by non-native authors to classify the native language of the author as one of:
  "BUL": Bulgarian
  "CHI": Chinese
  "CZE": Czech
  "FRE": French
  "JPN": Japanese
  "RUS": Russian
  "SPA": Spanish
  Use clues such as spelling errors, word choice, syntactic patterns, and grammatical errors to decide on the native language of the author.\n\n

  DO NOT USE ANY OTHER CLASS.
  IMPORTANT: Do not classify any input as "ENG" (English). English is an invalid choice.

  Valid output formats:
  Class: "BUL"
  Class: "CHI"
  Class: "CZE"
  Class: "SPA"
  '''

# Task sentence that classify() inserts between the system instruction and the input text.
# NOTE(review): it says "the text above", but classify() places the text after this
# sentence — confirm the wording.
main_task_prompt_ICLE = '''Classify the text above as one of BUL, CHI, CZE, FRE, JPN, RUS, or SPA. Do not output any other class - do NOT choose "ENG" (English). What is the closest native language of the author of this English text from the given list?'''

# Extra instruction that classify() puts into the retry prompt it builds after an
# answer failed schema validation.
prompt_retry_ICLE = '''
  Your classification is not in the list of possible languages.
  Please try again and choose only one of the following classes:
  BUL, CHI, CZE, FRE, JPN, RUS, or SPA
  '''

In [None]:
# CSV holding the ICLE essays; its 'text' and 'language' columns are read below.
# The cross-validation cell later writes the predictions back into this same file.
icle = "/content/drive/MyDrive/thesis_NLI/ICLE-NLI-results-final.csv"

# Training prompt template. formatting_prompts_func fills the two `{}` slots with the
# essay text (### Input) and its gold class code (### Response), in that order.
alpaca_prompt = '''
### Instruction:
You are a forensic linguistics expert that reads English texts written by non-native authors to classify the native language of the author as one of:
"BUL": Bulgarian
"CHI": Chinese
"CZE": Czech
"FRE": French
"JPN": Japanese
"RUS": Russian
"SPA": Spanish
Use clues such as spelling errors, word choice, syntactic patterns, and grammatical errors to decide on the native language of the author.\n\n

DO NOT USE ANY OTHER CLASS.
IMPORTANT: Do not classify any input as "ENG" (English). English is an invalid choice.

Valid output formats:
Class: "BUL"
Class: "CHI"
Class: "CZE"
Class: "SPA"

Classify the text below as one of BUL, CHI, CZE, FRE, JPN, RUS, or SPA. Do not output any other class - do NOT choose "ENG" (English). What is the closest native language of the author of this English text from the given list?

### Input:
{}

### Response:
{}'''

# Stratified 5-fold splitter; shuffling is seeded so the folds are the same on every run.
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=1)
df_icle = pd.read_csv(icle)
# Parallel lists: essay texts and their language labels (used to stratify the folds).
x = df_icle['text'].tolist()
y = df_icle['language'].tolist()

In [None]:
# End-of-sequence token of the currently loaded tokenizer; formatting_prompts_func
# appends it to every training example.
EOS_TOKEN = tokenizer.eos_token # Must add EOS_TOKEN
def formatting_prompts_func(examples):
    """Render a batch of examples into complete training prompts.

    :param examples: batch with parallel "text" (essays) and "language" (gold labels) columns
    :returns: dict with a single "text" column holding one filled-in ``alpaca_prompt`` per example
    """
    # Must add EOS_TOKEN, otherwise your generation will go on forever!
    rendered = [
        alpaca_prompt.format(essay, label) + EOS_TOKEN
        for essay, label in zip(examples["text"], examples["language"])
    ]
    return { "text" : rendered, }

In [None]:
# Cross-validation driver. For every seed and every fold produced by `skf`:
#   1. load a fresh base model and attach new LoRA adapters,
#   2. undersample the training split and fine-tune on it,
#   3. save the adapters and classify the held-out split.
# After all folds the out-of-fold predictions are written to the results CSV and a
# confusion matrix is saved.
for ind, rs in enumerate([100]): # add more seeds to the list to repeat the whole cross-validation
  accuracies = []
  all_predictions = []
  all_test_index = []
  samples_per_class = 33 # training examples kept per language after undersampling
  # Arrays allow selecting a fold with its index array; converted once, not per fold
  x_array, y_array = np.array(x), np.array(y)
  for fold, (train_index, test_index) in enumerate(skf.split(x, y)):
    # Start every fold from the untouched base model so nothing learned on an
    # earlier fold (which contained this fold's test texts) carries over.
    model, tokenizer = FastLanguageModel.from_pretrained(
        model_name = model_name, # same checkpoint as configured in the model-loading cell
        max_seq_length = max_seq_length,
        dtype = dtype,
        load_in_4bit = load_in_4bit,
        )
    model = FastLanguageModel.get_peft_model(
        model,
        r = 16,
        target_modules = ["q_proj", "k_proj", "v_proj", "o_proj",
                      "gate_proj", "up_proj", "down_proj",],
        lora_alpha = 16,
        lora_dropout = 0,
        bias = "none",
        use_gradient_checkpointing = "unsloth",
        random_state = rs,
        use_rslora = False,
        loftq_config = None,
      )
    x_train_fold, x_test_fold = x_array[train_index], x_array[test_index]
    y_train_fold, y_test_fold = y_array[train_index], y_array[test_index]
    train_dict = {'text': x_train_fold, 'language': y_train_fold}

    # perform random undersampling to the same number of essays for every language
    rus = RandomUnderSampler(
        sampling_strategy={label: samples_per_class for label in all_labels_ICLE},
        random_state=0)
    train_df = pd.DataFrame(train_dict)
    X_resampled, y_resampled = rus.fit_resample(
        train_df.drop('language', axis=1), train_df['language'].tolist())
    sample_df = pd.DataFrame({'text': X_resampled['text'].tolist(), 'language': y_resampled})

    dataset = Dataset.from_pandas(sample_df)
    print(Counter(y_resampled))

    #uncomment the following code for the full training set
    # dataset = Dataset.from_dict(train_dict)
    dataset = dataset.map(formatting_prompts_func, batched = True,)
    trainer = SFTTrainer(
      model = model,
      tokenizer = tokenizer,
      train_dataset = dataset,
      dataset_text_field = "text",
      max_seq_length = max_seq_length,
      dataset_num_proc = 2,
      packing = False, # Can make training 5x faster for short sequences.
      args = TrainingArguments(
          per_device_train_batch_size = 4, # The batch size per GPU/TPU core/CPU for training.
          gradient_accumulation_steps = 4,
          warmup_steps = 5,
          #max_steps = 0,
          num_train_epochs=3,
          learning_rate = 1e-4, # originally 2e-4
          fp16 = not torch.cuda.is_bf16_supported(),
          bf16 = torch.cuda.is_bf16_supported(),
          logging_steps = 1,
          optim = "adamw_8bit",
          weight_decay = 0.01,
          lr_scheduler_type = "linear",
          seed = rs,
          output_dir = "outputs",
      ),
      )
    trainer_stats = trainer.train()

    # Local saving of this fold's adapters and tokenizer
    adapter_dir = f"/content/drive/MyDrive/thesis_NLI/ICLE/{run_name}_undersampled_{fold}"
    model.save_pretrained(adapter_dir)
    tokenizer.save_pretrained(adapter_dir)

    FastLanguageModel.for_inference(model)
    # classify() keeps only the text generated after this marker of the prompt
    response_marker = 'Response:'
    predictions = classify(x_test_fold, y_test_fold, response_marker)
    accuracy = float(accuracy_score(y_test_fold, predictions))

    print(f'-------------Run: {fold}')
    print(f'-------------Accuracy: {"{:.2f}".format(accuracy*100)}')
    accuracies.append(accuracy)
    all_test_index.extend(test_index.tolist())
    all_predictions.extend(predictions)

  # Average over the folds that were actually run instead of a hard-coded 5
  avg_acc = sum(accuracies) / len(accuracies)
  print(f"Average: {avg_acc*100}")
  print(f"standard dev: {np.std(accuracies)}")
  print(accuracies)

  #save results to CSV file
  # Every essay is in exactly one test fold, so ordering the out-of-fold predictions
  # by essay index restores the row order of the dataset.
  prediction_by_index = dict(zip(all_test_index, all_predictions))
  sorted_predictions = [prediction_by_index[index] for index in sorted(prediction_by_index)]
  results_df = pd.read_csv(icle)
  # Plain assignment appends the column, or overwrites it when the notebook is re-run
  # (or run for another seed); DataFrame.insert would raise on an existing column.
  results_df[run_name] = sorted_predictions
  results_df.to_csv(icle, index=False)

  # Predictions of 'other' are not among the labels and therefore not part of the matrix
  cm = confusion_matrix(y, sorted_predictions, labels=all_labels_ICLE)
  cm_display = ConfusionMatrixDisplay(cm, display_labels=all_labels_ICLE).plot()
  figure_dir = '/content/drive/MyDrive/thesis_NLI/ICLE_results'
  os.makedirs(figure_dir, exist_ok=True) # avoid losing the run to a missing folder
  cm_display.figure_.savefig(f'{figure_dir}/{run_name}.png')