In [None]:
# установка petals
%pip install -q petals datasets wandb scikit-learn

In [None]:
# импорт необходимых библиотек
from datasets import load_dataset
import torch
from transformers import BloomTokenizerFast 
from petals import DistributedBloomForCausalLM
import re
import random
import ast
from collections import Counter
import json
from statistics import mode

In [None]:
MODEL_NAME = "bigscience/bloom-7b1-petals"

In [None]:
#загрузка датасета
dataset = load_dataset("gsm8k", 'main')

In [None]:
#подготовка токенайзера и модели 
tokenizer = BloomTokenizerFast.from_pretrained(MODEL_NAME)
model = DistributedBloomForCausalLM.from_pretrained(MODEL_NAME)
model = model.cuda()

In [None]:
#Помимо сравнения методов CoT и Self-Consistency CoT можно взять еще несколько "разделений": 
# а) данные с mathematical equation (текст с промежуточным подсчетом в задаче, например, 16 + 4 = 20, и без них 
# б) использовать один и тот же набор промптов для каждого примера в тесте и разные наборы промптов. 
# В связи с тем, что датасет содержит в себе reasoning steps, мы можем попробовать изменять набор промптов

In [None]:
#создание промптов из датасета БЕЗ mathematical equation
def prompt_without_equtation(dataset):
  question = dataset['question']
  answer = list(map(lambda x: re.sub('<<.*>>', '', x), dataset['answer']))
  answer = list(map(lambda x: re.sub('\n####', ' The answer is:', x), answer))
  input_list = []
  prompt = []
  for q in range(len(question)):
    prompt.append('\nQ: ' + question[q] + '\nA: ' + answer[q] + ' \n')
  return prompt

In [None]:
input_prompts = prompt_without_equtation(dataset['train'])

In [None]:
#Случай, где промпты фиксированные для каждого примера в тесте. Количество промптов определялось опираясь на статью https://arxiv.org/abs/2201.11903 
random.seed(42)
number_of_prompts = 8
sample_from_prompts = ' '.join(map(str, random.sample(input_prompts, number_of_prompts)))

In [None]:
#добавление промптов к инпуту  
def add_prompts_to_input(sample_from_prompts, test_question):
  test_input_with_prompts = sample_from_prompts + '\nQ: ' + test_question + '\nA: '
  return(test_input_with_prompts)

In [None]:
#тестовый датасет с фиксированными промптами БЕЗ mathematical equation
test_same_prompts = list(map(lambda x: add_prompts_to_input(sample_from_prompts, x), dataset['test']['question']))

In [None]:
#тестовый датасет с различными промптами БЕЗ mathematical equation
test_diff_prompts = list(map(lambda x: add_prompts_to_input(' '.join(map(str, random.sample(input_prompts, number_of_prompts))), x), dataset['test']['question']))

In [None]:
#создание промптов из датасета С mathematical equation
def prompt_with_equtation(dataset):
  question = dataset['question']
  answer = list(map(lambda x: re.sub('\n####', ' The answer is:', x), dataset['answer']))
  input_list = []
  prompt = []
  for q in range(len(question)):
    prompt.append('\nQ: ' + question[q] + '\nA: ' + answer[q] + ' \n')
  return prompt

In [None]:
input_prompts_with_equtation = prompt_with_equtation(dataset['train'])

In [None]:
#тестовый датасет с фиксированными промптами С mathematical equation
random.seed(42)
sample_from_prompts_with_equtation = ' '.join(map(str, random.sample(input_prompts_with_equtation, number_of_prompts)))
test_same_prompts_with_equtation = list(map(lambda x: add_prompts_to_input(sample_from_prompts_with_equtation, x), dataset['test']['question']))

In [None]:
#тестовый датасет с различными промптами С mathematical equation
test_diff_prompts_with_equtation = list(map(lambda x: add_prompts_to_input(' '.join(map(str, random.sample(input_prompts_with_equtation, number_of_prompts))), x), dataset['test']['question']))

In [None]:
#запускаем генерацию текста для 20 примеров из каждого набора данных
results_greedy = []
for i in [test_same_prompts[:20], test_diff_prompts[:20], test_same_prompts_with_equtation[:20], test_diff_prompts_with_equtation[:20]]:
  predicted_data = []
  for j in i:
    inputs = tokenizer(j, return_tensors="pt")["input_ids"].cuda()
    outputs = model.generate(inputs, max_new_tokens=100)
    predicted_data.append(tokenizer.decode(outputs[0]))
  results_greedy.append(predicted_data)

In [None]:
with open("results_greedy", "w") as fp:
  json.dump(results_greedy, fp)

In [None]:
#запускаем генерацию текста для 20 примеров с методом self_consistency (5 итераций генерации для каждого примера)

results_self_consistency = []
for i in [test_same_prompts[:20], test_diff_prompts[:20], test_same_prompts_with_equtation[:20], test_diff_prompts_with_equtation[:20]]:
  predicted_data = []
  for j in i:
    x = 0
    inputs = tokenizer(j, return_tensors="pt")["input_ids"].cuda()
    self_consistency_options = []
    while x <= 5:
      outputs = model.generate(inputs, max_new_tokens=100, temperature=0.7, do_sample=True)
      decoded_outputs = tokenizer.decode(outputs[0])
      self_consistency_options.append(decoded_outputs)
      x += 1
    predicted_data.append(self_consistency_options)
  results_self_consistency.append(predicted_data)

In [None]:
with open("results_self_consistency", "w") as fp:
  json.dump(results_self_consistency, fp)

In [None]:
#создадим список с ответами из тестового сплита – answers_test
test_dataset = dataset['test']['question'][:20]
answers_test = []
for i in dataset['test']['answer'][:20]:
    answers_test.append(i.split('\n#### ')[-1])

In [None]:
#откроем уже сохраненные данные
with open('results_greedy_test_same_prompts') as f:
    results_greedy_test_same_prompts = f.readlines()
with open('results_greedy_test_diff_prompts') as f:
    results_greedy_test_diff_prompts = f.readlines()   
with open('results_greedy_test_same_prompts_with_equtation') as f:
    results_greedy_test_same_prompts_with_equtation = f.readlines()  
with open('results_greedy_test_diff_prompts_with_equtation') as f:
    results_greedy_test_diff_prompts_with_equtation = f.readlines()  

In [None]:
#итого 4 набора данных для chain of thoughts 
data = [results_greedy_test_same_prompts, 
               results_greedy_test_diff_prompts, 
               results_greedy_test_same_prompts_with_equtation, 
               results_greedy_test_diff_prompts_with_equtation]

In [None]:
#преобразуем данные после сохранения через json.dump обратно в списки
def package_data(result_dataset):
    result_dataset = ast.literal_eval(result_dataset[0])
    result_dataset = result_dataset[0]
    return result_dataset

In [None]:
result_data = list(map(package_data, data))

In [None]:
#вытаскиваем из каждой генерации ответ для тестового примера. 
#Были случаи, когда модель вообще не давала ответ (не генерила The answer is:). Такие кейсы отнесем в УНК. 
def get_answer_from_generated_text(generated_text, question_from_test):
    answer = str()
    result = re.search(r'The answer is: \d+', generated_text.split(question_from_test)[-1])
    try: 
        answer = re.search(r'\d+', result.group(0)).group(0)
    except:
        answer = 'UNK'
    return answer

In [None]:
#получаем ответы из каждой генерации для каждого примера из каждого набора данных
all_answers = []
for dataset in result_data:
    results_chain_of_thoughts = []
    for example in range(len(dataset)):
        results_chain_of_thoughts.append(get_answer_from_generated_text(dataset[example], test_dataset[example]))
    all_answers.append(results_chain_of_thoughts)

In [None]:
#Посчитаем совпадения в ответах на тесте и в сгенерированных примерах. Yes – ответ совпал, no – не совпал. 
#UNK в данном случае также относиться к no, так как ответа на выходе мы не получаем
results_total_chain_of_thoughts = []
for answers in range(len(all_answers)):
    results_by_method = []
    for generated_answer in range(len(all_answers[answers])):
        if all_answers[answers][generated_answer] == answers_test[generated_answer]:
            results_by_method.append('yes')
        else:
            results_by_method.append('no')
    results_total_chain_of_thoughts.append(results_by_method)

In [None]:
#Считаем precision для каждого набора данных 
for i in results_total_chain_of_thoughts:
    precision = int(Counter(i)['yes'] / len(i) * 100)
    print(f'Precision: {precision}%')

In [None]:
#загружаем данные с self_consistency
with open('results_self_consistency') as f:
    results_self_consistency_test_same_prompts = f.readlines()
with open('results_self_consistency_test_diff_prompts') as f:
    results_self_consistency_test_diff_prompts = f.readlines()

In [None]:
data_self_consistency = [results_self_consistency_test_same_prompts,
                        results_self_consistency_test_diff_prompts]

In [None]:
result_data_self_consistency = list(map(package_data, data_self_consistency))

In [None]:
#вытасикваем сгенерированные ответы для каждого примера
all_results = []
for each_dataset in range(len(result_data_self_consistency)):
    answers_by_method_self_consistency = []
    for example in range(len(result_data_self_consistency[each_dataset])):
        generated_answer_for_each_test_text = list(map(lambda x: get_answer_from_generated_text(x, test_dataset[example]), result_data_self_consistency[each_dataset][example]))
        answers_by_method_self_consistency.append(generated_answer_for_each_test_text)
    all_results.append(answers_by_method_self_consistency)

In [None]:
with open("all_results_self_consistency", "w") as fp:
    json.dump(all_results, fp)

In [None]:
#считаем моду у каждого примера из генерации
all_answers_self_consistency = list(map((lambda x: list(map(mode, x))), all_results))

In [None]:
#также я попробовала немного другую реализацию maj vote
#у нас есть случаи, когда ответы состоят только из унков (тогда следует оставить унк как финальный вариант ответа)
#есть же случаи, где мода – унк, однако есть и ответы числовые. Кажется, что лучше взять рандомно какое-то число (в функции первое из списка), чем УНК

#ТЛДР: на выборке это не улучшает результат
def get_maj_answer(answers):
    while 'UNK' in answers: 
        answers.remove('UNK') 
    if answers:
        mode_answer = mode(answers)
    else:
        mode_answer = 'UNK'
    return mode_answer

In [None]:
#Посчитаем совпадения в ответах на тесте и в сгенерированных примерах. Yes – ответ совпал, no – не совпал. 
#UNK в данном случае также относиться к no, так как ответа на выходе мы не получаем
results_total_self_consistency = []
for answers in range(len(all_answers_self_consistency)):
    results_by_method = []
    for generated_answer in range(len(all_answers_self_consistency[answers])):
        if all_answers_self_consistency[answers][generated_answer] == answers_test[generated_answer]:
            results_by_method.append('yes')
        else:
            results_by_method.append('no')
    results_total_self_consistency.append(results_by_method)

In [None]:
#Считаем precision для каждого набора данных из self_consistency 
for i in results_total_self_consistency:
    precision = int(Counter(i)['yes'] / len(i) * 100)
    print(f'Precision: {precision}%')