In [None]:
# Install required packages
# All packages are installed unpinned, so the resolved versions depend on when
# this cell is run. nltk is explicitly upgraded to its latest release.
! pip install --upgrade nltk
! pip install openai evaluate transformers datasets pandarallel tiktoken
# flash-attn backs the attn_implementation="flash_attention_2" option used when
# the model is loaded; it is installed with build isolation disabled.
! pip install -U flash-attn --no-build-isolation

In [2]:
import json
import random
import textwrap
import time
from pathlib import Path

import numpy as np
import pandas as pd
import tiktoken
import torch
from google.colab import drive, userdata
from huggingface_hub import login
from sklearn import metrics
from transformers import AutoConfig, AutoModelForCausalLM, AutoTokenizer

In [None]:
# Authenticate against the Hugging Face Hub with the 'HF_TOKEN' value read from
# Colab user data, so the token itself never appears in the notebook.
login(token=userdata.get('HF_TOKEN'))

In [None]:
# Mount drive
# force_remount=True remounts even if the drive is already mounted, so the cell
# can be re-run safely. The listing below is a quick check that the mount worked.
drive.mount('/content/drive', force_remount=True)
!ls "/content/drive/My Drive"

In [5]:
# Set required variables
# Root folder of the dataset on the mounted Google Drive.
dataset_folder_root = Path('/content/drive/My Drive/DP/datasets/csfd')

# CSV files read later with pandas; the code below uses their 'label' and
# 'text' columns.
train_file_path = dataset_folder_root / 'train_top5_withoutOOC.csv'
test_file_path = dataset_folder_root / 'test_top5_withoutOOC.csv'

# Hugging Face Hub identifier of the model and tokenizer to load.
model_name = "mistralai/Mistral-7B-Instruct-v0.2"

In [None]:
# Load the causal LM in half precision using the FlashAttention-2 kernels
# (provided by the flash-attn package installed in the first cell).
# `torch.float16` requires `torch` to be imported in the imports cell; without
# that import this cell fails with a NameError on a fresh kernel.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,
    attn_implementation="flash_attention_2",
    device_map="auto",
)
# The tokenizer is used both for building prompts and for counting tokens when
# filtering the dataset.
tokenizer = AutoTokenizer.from_pretrained(model_name)

In [7]:
def count_tokens(text: str) -> int:
  """Return how many token ids the global `tokenizer` produces for `text`.

  Args:
    text: The string to tokenize.

  Returns:
    The length of `tokenizer(text).input_ids`.
  """
  return len(tokenizer(text).input_ids)


def is_token_count_valid(text: str) -> bool:
  """Tell whether `text` has a usable length measured in tokens.

  Args:
    text: The string to check.

  Returns:
    True when the token count is strictly between 56 and 512 (both bounds
    exclusive), False otherwise.
  """
  return 56 < count_tokens(text) < 512

def compute_metrics(ground_truth, predicted):
  """Score predictions against the true labels.

  Args:
    ground_truth: True labels.
    predicted: Predicted labels, aligned with `ground_truth`.

  Returns:
    A dict with the keys 'accuracy', 'f1', 'precision' and 'recall', each
    expressed as a percentage. F1, precision and recall are macro-averaged and
    computed with zero_division=0.
  """
  macro_options = {'average': 'macro', 'zero_division': 0}
  fractions = {
      'accuracy': metrics.accuracy_score(ground_truth, predicted),
      'f1': metrics.f1_score(ground_truth, predicted, **macro_options),
      'precision': metrics.precision_score(ground_truth, predicted, **macro_options),
      'recall': metrics.recall_score(ground_truth, predicted, **macro_options),
  }
  # Convert the 0..1 fractions to percentages.
  return {name: value * 100 for name, value in fractions.items()}

def evaluate_mistral(results: list[pd.DataFrame]):
  """Aggregate the metrics of several repetitions.

  Args:
    results: One DataFrame per repetition. Each must have a 'label' column
      (true labels) and an 'answer' column (predicted labels).

  Returns:
    A pair `(avg, std)`. Each element is a 4-tuple ordered as
    (accuracy, f1, precision, recall) holding the mean, respectively the
    standard deviation, across repetitions, rounded to 2 decimal places.
  """
  metric_order = ('accuracy', 'f1', 'precision', 'recall')
  per_rep = [compute_metrics(rep_df['label'], rep_df['answer']) for rep_df in results]

  # One list of per-repetition values for each metric, in `metric_order`.
  series_by_metric = [[rep[name] for rep in per_rep] for name in metric_order]

  avg = tuple(round(np.mean(values), 2) for values in series_by_metric)
  std = tuple(round(np.std(values), 2) for values in series_by_metric)
  return avg, std


def extract_samples(df: pd.DataFrame, random_state=None) -> pd.DataFrame:
  """Draw two random texts per author: one to query, one to show as an example.

  Args:
    df: Frame with a 'label' column (author) and a 'text' column.
    random_state: Optional seed / random state forwarded to `Series.sample`
      for reproducible draws. The same value is used for every author. The
      default (None) keeps the draws unseeded.

  Returns:
    A DataFrame with the columns 'label', 'query_text' and 'example_text',
    one row per distinct author, sorted by 'label' with a fresh 0..n-1 index.

  Raises:
    ValueError: If an author has fewer than two texts, so two distinct texts
      cannot be drawn.
  """
  output_columns = ['label', 'query_text', 'example_text']

  # Collect plain records and build the frame once, instead of concatenating
  # (and re-sorting) a growing DataFrame on every iteration.
  records = []
  for author in df['label'].unique().tolist():
    author_texts = df.loc[df['label'] == author, 'text']
    if len(author_texts) < 2:
      raise ValueError(
          f"Author {author!r} has only {len(author_texts)} text(s); "
          "at least 2 are required."
      )
    # Sampling without replacement guarantees two different rows.
    query_text, example_text = author_texts.sample(2, random_state=random_state)
    records.append({
        'label': author,
        'query_text': query_text,
        'example_text': example_text,
    })

  result = pd.DataFrame(records, columns=output_columns)
  return result.sort_values(by=['label']).reset_index(drop=True)

def create_system_prompt() -> str:
  """Build the (Czech) instruction describing the expected response format.

  Returns:
    A multi-line string asking for a JSON object with two elements:
    "analysis" (the reasoning) and "answer" (the author ID of the analyzed
    text). The string has no trailing newline.
  """
  prompt_lines = [
      'Odpověz pomocí JSON objektu, který obsahuje dva prvky:',
      '{',
      '  "analysis": Odůvodnění tvojí odpovědi.',
      '  "answer": ID autora analyzovaného textu.',
      '}',
  ]
  return "\n".join(prompt_lines)

  # English-language variant of the same prompt (currently disabled):
  #
  # return textwrap.dedent("""\
  #   Respond with a JSON object including two key elements:
  #   {
  #     "analysis": Reasoning behind your answer.
  #     "answer": The query text's author ID.
  #   }"""
  # )

def create_prompt(query: str, examples: str) -> str:
  """Build the (Czech) authorship-attribution task prompt.

  Args:
    query: The text whose author should be determined.
    examples: The texts of the candidate authors, already serialized to a
      string by the caller.

  Returns:
    The task instructions followed by the query text and the candidate
    texts, with the two inputs enclosed in triple backticks.
  """
  instructions = (
      "S ohledem na sadu textů se známými autory a analyzovaný text "
      "urči autora analyzovaného textu. "
      "Analyzuj styly psaní vstupních textů, přičemž ignoruj rozdíly "
      "v tématu a obsahu. "
      "Zaměř se na jazykové rysy, jako jsou interpunkce, vzácná slova, "
      "přípony, kvantifikátory, humor, sarkasmus, typografické chyby "
      "a překlepy. "
      "Vstupní texty jsou ohraničeny trojitými zpětnými apostrofy. "
  )
  fenced_inputs = "\n\n".join([
      f"Analyzovaný text: {query}",
      f"Texty od potenciálních autorů: {examples}",
  ])
  return f"{instructions}```\n\n{fenced_inputs}\n```"

  # English-language variant of the same prompt (currently disabled):
  #
  # return "Given a set of texts with known authors and a query text, " + \
  #        "determine the author of the query text. Analyze the writing " + \
  #        "styles of the input texts, disregarding the differences in " + \
  #        "topic and content. Focus on linguistic features such as phrasal " + \
  #        "verbs, modal verbs, punctuation, rare words, affixes, quantities, " + \
  #        "humor, sarcasm, typographical errors, and misspellings. " + \
  #        "The input texts are delimited with triple backticks. ```\n\n" + \
  #        f"Query text: {query}\n\n" + \
  #        f"Texts from potential authors: {examples}\n\n```"


In [None]:
# Create dataset suitable for prompts
# Only the author label and the text are needed downstream.
df = pd.read_csv(train_file_path).loc[:, ["label", "text"]]
print('Original shape:', df.shape)

# Keep only texts whose token count is within the accepted range.
has_valid_length = df["text"].apply(is_token_count_valid)
df = df[has_valid_length]
print('Shape after filtering:', df.shape)

In [None]:
# NOTE(review): retry_count / retry_delay are not used anywhere in this cell;
# they are kept in case other cells rely on them.
retry_count = 3
retry_delay = 60

# Number of independent repetitions; each one draws fresh samples.
reps = 3
rep_responses = []


def _parse_response(raw_text: str) -> dict:
  """Turn the model completion into a dict that always has an 'answer' key.

  The completion is first parsed as JSON as-is. If that fails, the span from
  the first '{' to the last '}' is tried, which covers completions where the
  JSON object is wrapped in extra prose or a code fence. Anything that does
  not yield a JSON object with an 'answer' key is reported as an error
  record: {'analysis': <raw completion>, 'answer': "error"}.
  """
  candidates = [raw_text.strip()]
  start, end = raw_text.find('{'), raw_text.rfind('}')
  if 0 <= start < end:
    candidates.append(raw_text[start:end + 1])

  for candidate in candidates:
    try:
      parsed = json.loads(candidate, strict=False)
    except json.JSONDecodeError:
      continue
    # Valid JSON that is not an object (or lacks the answer) is unusable too.
    if isinstance(parsed, dict) and 'answer' in parsed:
      return parsed

  print("Error while decoding response.")
  return {'analysis': raw_text, 'answer': "error"}


for _ in range(reps):
  responses = []

  samples = extract_samples(df)

  # All candidate authors' example texts, serialized once per repetition and
  # shared by every query of that repetition.
  example_texts = {row['label']: row['example_text'] for _, row in samples.iterrows()}
  examples = json.dumps(example_texts, ensure_ascii=False)

  # Iterate label and query together so the true label never has to be looked
  # up again by text (the previous lookup indexed a Series with ['label'],
  # which raises KeyError).
  for correct_label, query in zip(samples['label'], samples['query_text']):
    # The response-format instruction is the system message; the task and its
    # input texts are the user message.
    # NOTE(review): confirm the model's chat template accepts a system role.
    messages = [
      {"role": "system", "content": create_system_prompt()},
      {"role": "user", "content": create_prompt(query, examples)}
    ]
    model_inputs = tokenizer.apply_chat_template(messages, return_tensors="pt").to("cuda")
    generated_ids = model.generate(model_inputs, top_p=1.0, max_new_tokens=4096, do_sample=False, pad_token_id=tokenizer.eos_token_id)

    # generate() returns the prompt tokens followed by the new tokens. Decode
    # only the completion, without special tokens, so it can be parsed as JSON.
    completion_ids = generated_ids[:, model_inputs.shape[1]:]
    response_str = tokenizer.batch_decode(completion_ids, skip_special_tokens=True)[0]

    print(f'Response:\n{response_str}\n')
    print(f'Correct label: {correct_label}')

    response = _parse_response(response_str)

    # Compare labels and answers as strings: the model may answer 3 or "3",
    # and failed parses are recorded as "error". Mixed types would otherwise
    # break or distort the metric computation.
    response["answer"] = str(response["answer"]).strip()
    response["query_text"] = query
    response["example_texts"] = examples
    response["label"] = str(correct_label)
    responses.append(response)

  rep_responses.append(pd.DataFrame(responses))


In [None]:
# Mean and standard deviation of each metric across the repetitions; both
# tuples are ordered as (accuracy, f1, precision, recall).
avg, std = evaluate_mistral(rep_responses)

summary_line = '  acc: {}, f1: {}, precision: {}, recall: {}'
print('AVG:')
print(summary_line.format(*avg))
print('STD:')
print(summary_line.format(*std))
