In [2]:
from pathlib import Path
from torch.utils.data import Dataset, DataLoader
import torch
from torch import Tensor, nn
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel

import pandas as pd


In [3]:
!pip install lightning==2.4.0



In [4]:
import lightning as L

In [5]:
# Use the first CUDA device when one is visible, otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
# Seed PyTorch's global RNG; as the last expression, the returned generator is displayed.
torch.manual_seed(42)

<torch._C.Generator at 0x7a4c4403ebf0>

In [6]:
# Load the Reuters headlines table from a path relative to the working directory.
data = pd.read_csv("sample_data/reuters_headlines.csv")

In [7]:
from sklearn.model_selection import train_test_split
# Hold out 10% of the "Headlines" column; the fixed random_state makes the split repeatable.
train_texts, test_texts = train_test_split(list(data['Headlines']), test_size=0.1, random_state=42)

In [8]:
# Tokenizer and encoder weights are loaded from the same Hugging Face checkpoint;
# the encoder is moved to `device` right away.
model_name = "yiyanghkust/finbert-tone"
tokenizer = AutoTokenizer.from_pretrained(model_name)
bert_model = AutoModel.from_pretrained(model_name).to(device)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [9]:
class FinancialNewsDataset(Dataset):
    """Map-style dataset that serves stored headlines by position.

    Items are returned exactly as they were passed in; no tokenization or other
    preprocessing happens at this level.
    """

    def __init__(self, texts):
        # Any indexable, sized container of headlines (a plain list in this notebook).
        self.texts = texts

    def __getitem__(self, idx):
        return self.texts[idx]

    def __len__(self):
        return len(self.texts)

In [10]:
# Wrap both halves of the split so they can be fed to DataLoaders.
train_data = FinancialNewsDataset(train_texts)
test_data = FinancialNewsDataset(test_texts)

In [11]:
# Number of headlines in the training split.
len(train_texts)

29493

In [12]:
def collate_fn(
    tokenizer: AutoTokenizer, batch: list[str]
) -> "transformers.BatchEncoding":
    """Tokenize a list of raw headlines into one padded batch on ``device``.

    Args:
        tokenizer: Hugging Face tokenizer used to encode the texts.
        batch: Raw strings, as produced by ``FinancialNewsDataset.__getitem__``.

    Returns:
        The tokenizer's batch encoding, holding ``input_ids`` and ``attention_mask``
        tensors padded to the longest text in the batch (token type ids are not
        requested), moved to the module-level ``device``.
    """
    # Calling the tokenizer directly is the supported replacement for the
    # deprecated ``batch_encode_plus`` and takes the same arguments.
    encoded_batch = tokenizer(
        batch,
        padding="longest",
        return_tensors="pt",
        return_token_type_ids=False,
    )
    return encoded_batch.to(device)

In [13]:
# Tokenizer vocabulary size; EncoderDecoderModel uses len(tokenizer) as its default vocab_size.
len(tokenizer)

30873

In [14]:
# Both loaders tokenize lazily, one batch at a time, through collate_fn.
train_loader = DataLoader(
    train_data,
    batch_size=128,
    shuffle=True,
    collate_fn=lambda batch: collate_fn(tokenizer, batch),
)
# The held-out loader is iterated in a fixed order: shuffling evaluation data adds
# nothing and makes the inspected batches differ from run to run.
test_loader = DataLoader(
    test_data,
    batch_size=128,
    shuffle=False,
    collate_fn=lambda batch: collate_fn(tokenizer, batch),
)
# Keep one training batch around for the shape checks in the following cells.
batch = next(iter(train_loader))

In [15]:
class AttentionPooling(nn.Module):
    """Collapse a sequence of token vectors into one vector with learned attention.

    A linear scorer produces one scalar per token; the scores are softmax-normalised
    along the sequence axis and used as weights for a sum of linearly projected
    token vectors.

    Args:
        hidden_dim: Size of each incoming token vector.
        output_dim: Size of the pooled output vector.
    """

    def __init__(self, hidden_dim, output_dim):
        super().__init__()
        self.attn = nn.Linear(hidden_dim, 1)  # per-token attention score
        self.A_v = nn.Linear(hidden_dim, output_dim)  # value projection

    def forward(self, x, mask=None):
        """Pool ``x`` of shape (B, T, hidden_dim) into (B, output_dim).

        Args:
            x: Token vectors, (B, T, hidden_dim).
            mask: Optional (B, T) tensor; positions equal to 0 get zero weight.

        Returns:
            Tensor of shape (B, output_dim).
        """
        attn_scores = self.attn(x).squeeze(-1)  # (B, T)
        if mask is not None:
            # Use the most negative finite value of the score dtype instead of a
            # fixed -1e9, which cannot be represented in float16 and makes
            # masked_fill fail under half / mixed precision.
            fill_value = torch.finfo(attn_scores.dtype).min
            attn_scores = attn_scores.masked_fill(mask == 0, fill_value)
        attn_weights = torch.softmax(attn_scores, dim=1)  # (B, T)
        values = self.A_v(x)  # (B, T, output_dim)
        return torch.sum(attn_weights.unsqueeze(-1) * values, dim=1)  # (B, output_dim)


# Stand-alone pooling instance for a quick shape check: 768-dim tokens in, 1024-dim vector out.
at = AttentionPooling(768, 1024).to(device)

In [16]:
# Throwaway forward pass used only for shape inspection; no_grad keeps the encoder's
# autograd graph from being retained for as long as `x` stays alive.
with torch.no_grad():
    x = bert_model(batch['input_ids'], attention_mask=batch['attention_mask']).last_hidden_state

In [17]:
# Smoke test of the pooled shape. No mask is passed here, so padded positions
# also receive attention weight in this check.
at(x).shape

torch.Size([128, 1024])

In [18]:
import math


class EncoderDecoderModel(nn.Module):
    """Text autoencoder: frozen BERT encoder, attention pooling, Transformer decoder.

    Input tokens are encoded by ``bert_model`` (without gradients) and pooled into a
    single ``sent_dim`` vector per sequence. That vector is the only memory the
    decoder cross-attends to; the decoder's own input carries no token information,
    only sinusoidal position encodings.

    Args:
        bert_model: Encoder module, called as ``bert_model(input_ids, attention_mask=...)``;
            its ``.last_hidden_state`` is expected to have width ``hidden_dim``.
        hidden_dim: Width of the encoder's hidden states.
        num_layers: Number of stacked ``nn.TransformerDecoderLayer`` blocks.
        nhead: Attention heads per decoder layer.
        max_length: Number of positions precomputed in the positional-encoding buffer.
            Longer inputs are still supported; their encodings are computed on the fly.
        vocab_size: Size of the output vocabulary. The default is evaluated once, when
            the class is defined, from the module-level ``tokenizer``.
        dropout: Dropout probability inside the decoder layers.
        sent_dim: Width of the pooled sentence vector and of the decoder (``d_model``).
    """

    def __init__(self, bert_model, hidden_dim=768, num_layers=2, nhead=4, max_length=100, vocab_size=len(tokenizer),dropout=1e-4, sent_dim=2048):
        super().__init__()
        self.bert = bert_model  # encoder (BERT); only ever run under no_grad
        self.hidden_dim = hidden_dim
        self.max_length = max_length
        self.vocab_size = vocab_size
        self.sent_dim = sent_dim

        # Pools (B, T, hidden_dim) encoder states into one (B, sent_dim) vector.
        self.attn_pool = AttentionPooling(hidden_dim, sent_dim)

        # Decoder
        decoder_layer = nn.TransformerDecoderLayer(d_model=sent_dim, nhead=nhead, dropout=dropout, batch_first=True)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        # Output layer that predicts a token distribution per position
        self.fc_out = nn.Linear(sent_dim, vocab_size)

        # Precomputed sinusoidal position encodings, (1, max_length, sent_dim).
        # Kept as a regular (persistent) buffer so existing checkpoints still load.
        self.register_buffer("positional_encoding", self.sinusoidal_positional_encoding(max_length, sent_dim))

        # The encoder is used as a fixed feature extractor, so keep it deterministic.
        self.bert.eval()

    def train(self, mode=True):
        """Switch train/eval mode for the trainable parts; the encoder stays in eval.

        The encoder never receives gradients (see ``forward``), so enabling its
        dropout during training would only add noise to the decoder's inputs.
        """
        super().train(mode)
        self.bert.eval()
        return self

    def sinusoidal_positional_encoding(self, seq_length, hidden_dim):
        """Build a (1, seq_length, hidden_dim) table of sinusoidal position encodings.

        Even feature indices hold sines and odd indices hold cosines of
        ``position * 10000 ** (-i / hidden_dim)`` for even ``i``.
        """
        position = torch.arange(seq_length).unsqueeze(1).float()  # (T, 1)
        div_term = torch.exp(torch.arange(0, hidden_dim, 2).float() * (-math.log(10000.0) / hidden_dim))
        angles = position * div_term  # (T, ceil(H / 2))

        pe = torch.zeros(seq_length, hidden_dim)
        pe[:, 0::2] = torch.sin(angles)
        # For an odd hidden_dim there is one cosine column fewer than sine columns.
        pe[:, 1::2] = torch.cos(angles[:, : hidden_dim // 2])

        return pe.unsqueeze(0)  # (1, T, H)

    def forward(self, input_ids, attention_mask):
        """Reconstruct token logits for every position of the input.

        Args:
            input_ids: (B, T) token ids.
            attention_mask: (B, T); zeros mark padding positions.

        Returns:
            Logits of shape (B, T, vocab_size).
        """
        batch_size, seq_length = input_ids.shape  # (B, T)

        # Encoder pass (BERT); no gradients flow into the encoder.
        with torch.no_grad():
            encoded = self.bert(input_ids, attention_mask=attention_mask).last_hidden_state  # (B, T, H)

        # Attention-pool the token states into one global sentence vector.
        text_embedding = self.attn_pool(encoded, attention_mask)  # (B, S)

        # Decoder inputs are pure position encodings. Use the precomputed buffer when
        # it is long enough, otherwise compute encodings for the longer sequence
        # instead of failing on a shape mismatch.
        if seq_length <= self.positional_encoding.size(1):
            positions = self.positional_encoding[:, :seq_length, :]
        else:
            positions = self.sinusoidal_positional_encoding(seq_length, self.sent_dim).to(
                device=self.positional_encoding.device,
                dtype=self.positional_encoding.dtype,
            )
        # Materialise one copy per batch element; this inherits the buffer's
        # dtype/device, so the model also works after .half() / .double().
        tgt = positions.repeat(batch_size, 1, 1)  # (B, T, S)

        # Decoder pass: the pooled vector is a length-1 memory sequence.
        memory = text_embedding.unsqueeze(1)  # (B, 1, S)
        decoder_output = self.transformer_decoder(tgt, memory, tgt_key_padding_mask=(attention_mask==0))  # (B, T, S)

        # Project decoder states to vocabulary logits.
        token_logits = self.fc_out(decoder_output)  # (B, T, V)

        return token_logits  # (B, T, V)

In [19]:
class LightningAutoencoder(L.LightningModule):
    """Lightning wrapper that trains ``model`` to reconstruct its own input tokens.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` and returning
            logits of shape (B, T, V).
        learning_rate: Initial learning rate for Adam.
        pad_token_id: Token id excluded from the loss. Defaults to 0, the value
            that was previously hard-coded.
    """

    def __init__(self, model, learning_rate=1e-4, pad_token_id=0):
        super().__init__()
        self.model = model
        self.criterion = nn.CrossEntropyLoss(ignore_index=pad_token_id)
        self.learning_rate = learning_rate

    def forward(self, input_ids, attention_mask):
        """Delegate to the wrapped model; returns its logits unchanged."""
        return self.model(input_ids, attention_mask)

    def _shifted_loss(self, batch):
        """Compute the loss shared by the training and validation steps.

        Logits at position ``t`` are scored against the token at position ``t + 1``,
        so the last logit and the first token are dropped.

        Returns:
            ``(loss, logits, batch_size)`` where ``logits`` is (B, T - 1, V).
        """
        input_ids = batch["input_ids"]
        attention_mask = batch["attention_mask"]
        logits = self(input_ids, attention_mask)

        targets = input_ids[:, 1:].contiguous()  # targets shifted by one position
        logits = logits[:, :-1, :]  # drop the prediction at the last position

        loss = self.criterion(logits.reshape(-1, logits.size(-1)), targets.reshape(-1))
        return loss, logits, input_ids.size(0)

    def training_step(self, batch, batch_idx):
        """Return the reconstruction loss and log it as an epoch-level ``train_loss``."""
        loss, _, batch_size = self._shifted_loss(batch)
        # batch_size is passed explicitly so epoch averaging does not depend on
        # Lightning inferring it from the batch container.
        self.log("train_loss", loss, on_step=False, on_epoch=True, prog_bar=True, batch_size=batch_size)
        return loss

    def validation_step(self, batch, batch_idx):
        """Log an epoch-level ``val_loss`` and return the loss together with the logits."""
        val_loss, logits, batch_size = self._shifted_loss(batch)
        self.log("val_loss", val_loss, on_step=False, on_epoch=True, prog_bar=True, batch_size=batch_size)
        return {
            "loss": val_loss,
            "preds": logits,
        }

    def configure_optimizers(self):
        """Adam plus a step schedule that divides the learning rate by 10 at each milestone."""
        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate, weight_decay=1e-5)
        # Besides the optimizer, also create a schedule for the optimization step size.
        return {
            "optimizer": optimizer,
            "lr_scheduler": torch.optim.lr_scheduler.MultiStepLR(
                optimizer,
                milestones=[5, 10, 15],
                gamma=0.1,
            ),
        }


In [20]:
#!rm -rf lightning_logs

In [21]:
# Assemble the full training stack around a freshly loaded encoder.
# Note: this rebinds the module-level name `bert_model` to a new instance.
bert_model = AutoModel.from_pretrained(model_name).to(device)
encoder_decoder_model = EncoderDecoderModel(bert_model).to(device)
model = LightningAutoencoder(encoder_decoder_model).to(device)

In [22]:
# Smoke test on the cached batch: logits come back as (batch, seq_len, vocab_size).
model(batch['input_ids'], attention_mask=batch['attention_mask']).shape

torch.Size([128, 27, 30873])

In [23]:
from lightning.pytorch.loggers import TensorBoardLogger

In [24]:
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint

# Checkpoint policy: rank checkpoints by the logged "val_loss" (lower is better),
# keep the best two, and additionally keep the most recent one.
callbacks = [
    ModelCheckpoint(
        filename="{epoch}-{val_loss:.2f}",
        monitor="val_loss",
        mode="min",
        save_top_k=2,
        save_last=True,
    )
]

In [None]:
trainer = L.Trainer(
    max_epochs=15,
    accelerator="auto",
    logger=TensorBoardLogger(save_dir="."),
    callbacks=callbacks
)

# Pass the held-out loader as validation data so "val_loss" is logged every epoch.
# The ModelCheckpoint callback monitors that metric; without a validation loop
# during fit it never sees it and cannot rank checkpoints.
trainer.fit(model, train_loader, val_dataloaders=test_loader)
# Final standalone evaluation of the trained weights on the held-out data.
trainer.validate(model, test_loader)

INFO: Trainer already configured with model summary callbacks: [<class 'lightning.pytorch.callbacks.model_summary.ModelSummary'>]. Skipping setting a default `ModelSummary` callback.
INFO:lightning.pytorch.utilities.rank_zero:Trainer already configured with model summary callbacks: [<class 'lightning.pytorch.callbacks.model_summary.ModelSummary'>]. Skipping setting a default `ModelSummary` callback.
INFO: GPU available: True (cuda), used: True
INFO:lightning.pytorch.utilities.rank_zero:GPU available: True (cuda), used: True
INFO: TPU available: False, using: 0 TPU cores
INFO:lightning.pytorch.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO: HPU available: False, using: 0 HPUs
INFO:lightning.pytorch.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO:lightning.pytorch.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO: 
  | Name      | Type                | Params | Mode 
---------------------------

Training: |          | 0/? [00:00<?, ?it/s]

In [None]:
# Take one held-out batch and decode its token ids back to text, dropping special tokens.
batch = next(iter(test_loader))
tokenizer.batch_decode(batch['input_ids'], skip_special_tokens=True)

In [None]:
# Make sure the module is on `device` before it is called directly in the next cell.
# NOTE(review): presumably needed because the Trainer may leave the module on a
# different device once it finishes — confirm.
model.to(device)

In [None]:
# Inference only: switch off dropout and skip autograd bookkeeping so the
# reconstructions are deterministic and no graph is kept for the (B, T, V) logits.
model.eval()
with torch.no_grad():
    out = model(batch['input_ids'],batch['attention_mask'])
# Greedy choice: most likely token at every position.
out_tokens = out.argmax(dim=-1)
tokenizer.batch_decode(out_tokens, skip_special_tokens=True)

In [None]:
# Build a freshly constructed model and a logger-less Trainer, then run validation
# with weights restored from a saved checkpoint.
trainer = L.Trainer(accelerator="auto", logger=False)
bert_model = AutoModel.from_pretrained(model_name).to(device)
encoder_decoder_model = EncoderDecoderModel(bert_model).to(device)
model = LightningAutoencoder(encoder_decoder_model).to(device)

# NOTE(review): absolute, run-specific path (version_1, epoch 9, step 5010); it has to
# be updated by hand for every new run or environment.
last_checkpoint_path = Path("/content/lightning_logs/version_1/checkpoints/epoch=9-step=5010.ckpt")
# NOTE(review): validation runs on train_loader here, not test_loader — confirm this is intended.
trainer.validate(
    model,
    train_loader,
    ckpt_path=last_checkpoint_path,
)