In [None]:
# pip install peft transformers datasets accelerate bitsandbytes

In [None]:
from datasets import load_dataset
import re
import random
from tqdm import tqdm

import torch
from torch.utils.data import DataLoader, Dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
from transformers import TrainingArguments, Trainer, DataCollatorForLanguageModeling
from peft import get_peft_model, LoraConfig, TaskType
from peft import prepare_model_for_kbit_training
from sklearn.metrics import accuracy_score, balanced_accuracy_score

# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Используемое устройство: {device}")

Используемое устройство: cuda


In [None]:
def sample_data(dataset, num_samples_per_class, labels, seed=None):
    """Draw a class-balanced random subset of ``dataset``.

    The dataset is scanned exactly once (so any iterable of examples works,
    including one-shot iterators), examples are bucketed by their integer
    ``"label"``, and ``num_samples_per_class`` examples are drawn without
    replacement from every bucket. The combined sample is shuffled once.

    :param dataset: iterable of mappings that each have a ``"label"`` key
    :param num_samples_per_class: number of examples to draw for each class
    :param labels: sequence of class names; only its length is used, and
        class ids are assumed to be ``0 .. len(labels) - 1``
    :param seed: optional seed for a private RNG; when ``None`` the global
        ``random`` module state is used
    :return: shuffled list of the sampled examples
    :raises ValueError: if a class has fewer than ``num_samples_per_class``
        examples (or the requested count is negative)
    """
    rng = random if seed is None else random.Random(seed)

    # One bucket per class id; examples with any other label are ignored.
    buckets = {class_id: [] for class_id in range(len(labels))}
    for example in dataset:
        bucket = buckets.get(example["label"])
        if bucket is not None:
            bucket.append(example)

    sampled_data = []
    for class_id, pool in buckets.items():
        if len(pool) < num_samples_per_class:
            raise ValueError(
                f"class {class_id} has only {len(pool)} examples, "
                f"cannot sample {num_samples_per_class}"
            )
        sampled_data.extend(rng.sample(pool, num_samples_per_class))

    # Shuffle once at the end so the classes are interleaved.
    rng.shuffle(sampled_data)
    return sampled_data

def format_instruction(example, label_names=None):
    """Turn a raw dataset example into an instruction-style record.

    :param example: mapping with a ``"text"`` string and an integer ``"label"``
    :param label_names: sequence of category names indexed by label id; when
        omitted, the module-level ``labels`` list is used, so that list must
        be defined before this function is called
    :return: dict with the keys ``"instruction"``, ``"text"`` and ``"output"``
        (the category name for the example's label)
    """
    if label_names is None:
        # Backward-compatible default: the notebook-wide category list.
        label_names = labels
    instruction = """
    Instruction: Determine the category of the given text (provided below).
    Choose exactly one category from the following options: World, Sports, Business, or Sci/Tech.
    Your output should be a single word representing the selected category."""
    return {
        "instruction": instruction,
        "text": example["text"],
        "output": label_names[example["label"]]
    }

def make_instruction(dct):
  """Render the full training prompt for one record.

  The prompt is the record's instruction, then the text after "Text:", then
  the gold answer after "Answer:".

  :param dct: mapping with the keys 'instruction', 'text' and 'output'
  :return: the rendered prompt string
  """
  # The layout (line breaks and indentation) is part of the prompt; it should
  # stay in sync with make_instruction_test, which is the same template
  # without the answer.
  return f"""
  {dct['instruction']}

    Text: {dct['text']}

    Answer: {dct['output']}"""

def make_instruction_test(dct):
  """Render the inference prompt for one record.

  Same layout as make_instruction, but the prompt stops right after
  "Answer:" so that the model has to produce the category itself.

  :param dct: mapping with the keys 'instruction' and 'text'
  :return: the rendered prompt string, ending in "Answer:"
  """
  return f"""
  {dct['instruction']}

    Text: {dct['text']}

    Answer:"""

class CustomDataset(Dataset):
    """Map-style dataset over a list of already tokenized examples.

    Each stored example is a mapping from field name to a tensor with a
    leading dimension of size one; that dimension is squeezed away on access
    and a copy of ``input_ids`` is exposed as ``labels``.
    """

    def __init__(self, tokenized_data):
        self.data = tokenized_data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        encoded = self.data[idx]
        sample = {}
        for field, tensor in encoded.items():
            sample[field] = tensor.squeeze(0)
        # Language-modelling target: the labels are a copy of the input ids.
        sample['labels'] = sample['input_ids'].clone()
        return sample

def extract_answer(prediction, valid_categories):
    """
    Extract the predicted category from the model's decoded output.

    Only the text after the last "Answer:" marker is inspected. The decoded
    output normally still contains the prompt, and the prompt itself lists
    every category, so searching the whole string would always "find" one.
    If there is no marker, the whole prediction is inspected.

    :param prediction: decoded model output (may include the prompt)
    :param valid_categories: list of allowed category names
    :return: the matched category in lower case, or None if the answer part
        contains no valid category
    """
    if not prediction:
        return None

    normalized_categories = [cat.lower() for cat in valid_categories]

    # Keep only what follows the last "Answer:" marker.
    # Assumes the generation is short enough that the model does not emit a
    # second marker of its own.
    marker = "Answer:"
    marker_pos = prediction.rfind(marker)
    answer_part = prediction if marker_pos == -1 else prediction[marker_pos + len(marker):]

    # Preferred case: the first word of the answer is exactly a category.
    match = re.match(r"\s*([\w/]+)", answer_part)
    if match:
        extracted = match.group(1).lower()
        if extracted in normalized_categories:
            return extracted

    # Fallback: the category mentioned earliest within the answer part.
    answer_lower = answer_part.lower()
    hits = [
        (answer_lower.find(category), category)
        for category in normalized_categories
        if category in answer_lower
    ]
    if hits:
        return min(hits)[1]
    return None

## Загрузка AG News

In [None]:
# Category names. format_instruction looks a name up by the example's integer
# "label", so the order here must match the dataset's label ids.
labels = ["World", "Sports", "Business", "Sci/Tech"]

In [None]:
# Load the "ag_news" dataset and print a summary of its splits.
dataset = load_dataset("ag_news")

print(dataset)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md:   0%|          | 0.00/8.07k [00:00<?, ?B/s]

train-00000-of-00001.parquet:   0%|          | 0.00/18.6M [00:00<?, ?B/s]

test-00000-of-00001.parquet:   0%|          | 0.00/1.23M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/120000 [00:00<?, ? examples/s]

Generating test split:   0%|          | 0/7600 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 120000
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 7600
    })
})


In [None]:
# Fix the RNG so that the same subset is drawn on every Restart & Run All.
SEED = 42
random.seed(SEED)

# Class-balanced subsets: 400 training and 100 test examples per category.
train_samples = sample_data(dataset["train"], 400, labels)
test_samples = sample_data(dataset["test"], 100, labels)

# Attach the instruction and the textual category to every sampled example.
formatted_train = [format_instruction(sample) for sample in train_samples]
formatted_test = [format_instruction(sample) for sample in test_samples]

In [None]:
# Sanity check: show one fully rendered training prompt, gold answer included.
print(make_instruction(formatted_train[13]))


  
    Instruction: Determine the category of the given text (provided below). 
    Choose exactly one category from the following options: World, Sports, Business, or Sci/Tech. 
    Your output should be a single word representing the selected category.
  
    Text: New Zafi-D Worm on the Prowl, Alerts MicroWorld MicroWorld Software has cautioned Internet users about a new malicious worm known as the W32/Zafi-D. According to the antivirus and content security software provider, W32/Zafi-D behaves in a typical worm manner.
  
    Answer: Sci/Tech


In [None]:
# Sanity check: show one rendered inference prompt, which ends at "Answer:".
print(make_instruction_test(formatted_test[13]))


  
    Instruction: Determine the category of the given text (provided below). 
    Choose exactly one category from the following options: World, Sports, Business, or Sci/Tech. 
    Your output should be a single word representing the selected category.
  
    Text: The Red Sox Gaze Ahead After Much Looking Back The Boston Red Sox are already thinking about next year, the year after and, above all, how to avoid another eight-and-a-half-decade drought.
  
    Answer:


## Model downloading

In [None]:
# Checkpoint to fine-tune; the commented-out lines are alternative checkpoints.
# model_name = "EleutherAI/gpt-neo-125m"
model_name = "EleutherAI/gpt-neo-1.3B"
# model_name = "EleutherAI/gpt-neo-2.7B"
# model_name = "EleutherAI/gpt-j-6b"
# model_name = "EleutherAI/gpt-neox-20b"

# 4-bit NF4 quantization with double quantization; compute dtype is bfloat16.
# NOTE(review): the TrainingArguments cell sets fp16=True while the compute
# dtype here is bfloat16 — confirm that this mix is intended for the target GPU.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16
)

# Load the tokenizer and the quantized base model for the chosen checkpoint.
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name, quantization_config=bnb_config)

# Enable gradient checkpointing, then let PEFT prepare the quantized model
# for training.
model.gradient_checkpointing_enable()
model = prepare_model_for_kbit_training(model)

tokenizer_config.json:   0%|          | 0.00/200 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.35k [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/798k [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/90.0 [00:00<?, ?B/s]

`low_cpu_mem_usage` was None, now default to True since model is quantized.


model.safetensors:   0%|          | 0.00/5.31G [00:00<?, ?B/s]

In [None]:
# print(f"Модель находится на устройстве: {next(model.parameters()).device}")

In [None]:
# Register a dedicated padding token and make the tokenizer use it.
tokenizer.add_special_tokens({"pad_token": "<PAD>"})
tokenizer.pad_token_id = tokenizer.convert_tokens_to_ids("<PAD>")

# The vocabulary grew by one token, so resize the embeddings and tell the
# model which id is padding.
model.resize_token_embeddings(len(tokenizer))
model.config.pad_token_id = tokenizer.pad_token_id

# Every example is truncated or padded to exactly this many tokens.
max_length=256

# Tokenize the data.
# Training prompts include the gold answer (make_instruction).
tokenized_train = [tokenizer(
    make_instruction(item),
    truncation=True,
    padding="max_length",
    return_tensors="pt",
    max_length=max_length
) for item in formatted_train]

# Test prompts stop at "Answer:" (make_instruction_test).
tokenized_test = [tokenizer(
    make_instruction_test(item),
    truncation=True,
    padding="max_length",
    return_tensors="pt",
    max_length=max_length
) for item in formatted_test]

The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`


In [None]:
# tokenizer.model_max_length

In [None]:
# # formatted_string = make_instruction()
# # inputs = tokenizer(formatted_string, return_tensors="pt", truncation=True, padding=True)
# inputs = tokenized_test[0]
# inputs = {key: val.to(device) for key, val in inputs.items()}
# print(formatted_test[0])

In [None]:
# outputs = model(**inputs)
# logits = outputs.logits

# # print(logits)

# predicted_token_ids = torch.argmax(logits, dim=-1)
# decoded_text = tokenizer.decode(predicted_token_ids[0], skip_special_tokens=True)
# # print(decoded_text)

## Model Training

In [None]:
def calculate_max_steps(num_samples, num_epochs, batch_size, grad_accumulation_steps):
    """Return the number of optimizer steps needed for ``num_epochs`` epochs.

    One optimizer step consumes ``batch_size * grad_accumulation_steps``
    samples; a trailing partial step at the end of an epoch is not counted.
    """
    samples_per_step = grad_accumulation_steps * batch_size
    return num_epochs * (num_samples // samples_per_step)

# Derive the sample count from the training set that was actually built, so
# the schedule stays correct if the per-class sample size changes.
num_samples = len(formatted_train)
num_epochs = 2
batch_size = 2
grad_accumulation_steps = 4

max_steps = calculate_max_steps(num_samples, num_epochs, batch_size, grad_accumulation_steps)
print(f"Max steps: {max_steps}")

Max steps: 400


In [None]:
# for name, module in model.named_modules():
#     print(name)

In [None]:
# LoRA target modules depend on the layer names of the architecture:
#   GPT-NeoX (gpt-neox-20b):  ["query_key_value"]
#   GPT-Neo / GPT-J:          ["q_proj", "k_proj", "v_proj", "out_proj"]
#   GPT-2 style models:       ["c_attn", "c_proj"]
# GPT-Neo has no `c_attn`, and its `c_proj` is the MLP output projection, so
# the GPT-2 names would leave the attention layers without adapters.

lora_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,  # causal language modelling
    r=8,                           # rank of the LoRA update matrices
    lora_alpha=32,                 # scaling factor
    lora_dropout=0.05,             # dropout applied inside the LoRA layers
    target_modules=["q_proj", "k_proj", "v_proj", "out_proj"],  # attention projections
    bias="none"
)

peft_model = get_peft_model(model, lora_config)
peft_model.print_trainable_parameters()

trainable params: 1,966,080 || all params: 1,317,543,936 || trainable%: 0.1492


In [None]:
# Reuse the values from the step-calculation cell so that the schedule cannot
# drift from the numbers that produced `max_steps`.
training_args = TrainingArguments(
    per_device_train_batch_size=batch_size,
    gradient_accumulation_steps=grad_accumulation_steps,
    max_steps=max_steps,  # a positive max_steps takes precedence over num_train_epochs
    warmup_steps=20,
    learning_rate=1e-4,
    fp16=True,
    logging_steps=5,
    output_dir="outputs",
    optim="paged_adamw_8bit",
)


trainer = Trainer(
    model=peft_model,
    train_dataset=CustomDataset(tokenized_train),
    eval_dataset=CustomDataset(tokenized_test),
    args=training_args,
    data_collator=DataCollatorForLanguageModeling(tokenizer, mlm=False),
)
# The KV cache is switched off for training; it must be switched back on
# before running generation.
peft_model.config.use_cache = False

In [None]:
# NOTE(review): a 40-character hex string that looked like an API key was
# pasted here as a comment and has been removed. If it was a real credential,
# revoke it, and supply secrets through the environment instead.

trainer.train()

  return fn(*args, **kwargs)


Step,Training Loss
5,11.8697
10,11.5847
15,11.5402
20,11.387
25,10.3325
30,9.574
35,8.3241
40,7.556
45,6.5717
50,6.1402




TrainOutput(global_step=400, training_loss=5.531119890213013, metrics={'train_runtime': 574.4054, 'train_samples_per_second': 5.571, 'train_steps_per_second': 0.696, 'total_flos': 5949462518169600.0, 'train_loss': 5.531119890213013, 'epoch': 2.0})

## Evaluation

In [None]:
# model_to_save = trainer.model.module if hasattr(trainer.model, 'module') else trainer.model  # Take care of distributed/parallel training
# model_to_save.save_pretrained("news_pretrained")

In [None]:
# lora_config = LoraConfig.from_pretrained('news_pretrained')
# model = get_peft_model(model, lora_config)

In [None]:
i = 77

text = make_instruction_test(formatted_test[i])

# Training leaves the model in train mode (LoRA dropout active) with the KV
# cache disabled; switch both back before generating.
peft_model.eval()
peft_model.config.use_cache = True

inputs = tokenizer(text, return_tensors="pt").to(device)
outputs = peft_model.generate(**inputs, max_new_tokens=3, pad_token_id=tokenizer.pad_token_id)
print(tokenizer.decode(outputs[0], skip_special_tokens=False))


  
    Instruction: Determine the category of the given text (provided below). 
    Choose exactly one category from the following options: World, Sports, Business, or Sci/Tech. 
    Your output should be a single word representing the selected category.
  
    Text: Musharraf meets Pope John Paul Pakistan President General Pervez Musharraf met Pope John Paul II, who urged him to adopt a  quot;spirit of dialogue and tolerance quot; in his region.
  
    Answer: World quot;


In [None]:
# Show the gold record for the example generated above, for comparison.
formatted_test[i]

{'instruction': '\n    Instruction: Determine the category of the given text (provided below). \n    Choose exactly one category from the following options: World, Sports, Business, or Sci/Tech. \n    Your output should be a single word representing the selected category.',
 'text': 'Musharraf meets Pope John Paul Pakistan President General Pervez Musharraf met Pope John Paul II, who urged him to adopt a  quot;spirit of dialogue and tolerance quot; in his region.',
 'output': 'World'}

In [None]:
# Training leaves the model in train mode (LoRA dropout active) with the KV
# cache disabled; switch both back so that evaluation is deterministic.
peft_model.eval()
peft_model.config.use_cache = True

# Generate a short continuation for every test prompt. Each decoded string
# still contains the prompt followed by the model's answer.
list_resp = []
for example in tqdm(formatted_test):
    prompt = make_instruction_test(example)

    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    outputs = peft_model.generate(**inputs,
                                  max_new_tokens=5,
                                  pad_token_id=tokenizer.pad_token_id)
    list_resp.append(tokenizer.decode(outputs[0], skip_special_tokens=False))

100%|██████████| 400/400 [03:20<00:00,  1.99it/s]


In [None]:
# extract_answer may return None when no category can be parsed. Map that to
# an explicit "unknown" class so the metrics never see mixed None/str labels;
# such predictions simply count as wrong.
list_preds = [extract_answer(pred, labels) or "unknown" for pred in list_resp]

# Gold categories in lower case, matching the output of extract_answer.
list_labels = [example['output'].lower() for example in formatted_test]

overall_accuracy = accuracy_score(list_labels, list_preds)
weighted_accuracy = balanced_accuracy_score(list_labels, list_preds)
print("Total Accuracy:", overall_accuracy)
print("Weighted Accuracy", weighted_accuracy)

0.875 0.875


In [None]:
# print(formatted_test[0])
# tokenized_test[0]

In [None]:
# print(make_instruction_test(formatted_test[0]))
# print()
# print(f"Right answer is: {formatted_test[0]['output']}")

In [None]:
# random.shuffle(formatted_train)

# # Печать первых 50 элементов
# for item in formatted_train[:50]:
#     print(make_instruction(item))
#     print('-------------------------------')