МОДЕЛЬ

In [1]:
class Translator(nn.Module):
    def __init__(self, source_vocab_size, target_vocab_size, sp, tp, hidden_dim=512, n_heads=8, 
                 num_encoder_layers=6, num_decoder_layers=6, dropout=0.1):
        super(Translatorv3, self).__init__()
        self.n_heads = n_heads
        self.target_embeddings = nn.Embedding(target_vocab_size, hidden_dim)
        self.source_embeddings = nn.Embedding(source_vocab_size, hidden_dim)
        
        self.sp = sp
        self.tp = tp
        
        self.transformer = nn.Transformer(
            d_model=hidden_dim, 
            nhead=n_heads, 
            num_encoder_layers=num_encoder_layers, 
            num_decoder_layers=num_decoder_layers, 
            dropout=dropout, 
            batch_first=True
        )
        
        self.linear = nn.Linear(hidden_dim, hidden_dim * 2)
        self.projection = nn.Linear(hidden_dim * 2, target_vocab_size)
        
        self.non_lin = nn.ReLU()
        self.normalization = nn.LayerNorm(hidden_dim * 2)

    
    def generate_square_subsequent_mask(self, sz):
        mask = torch.triu(torch.ones(sz, sz) * float('-inf'), diagonal=1)
        return mask.bool() 

    def create_padding_mask(self, seq, pad_idx):
        return (seq == pad_idx).bool() 

    
    def forward(self, source, target):

        target_embeddings = self.target_embeddings(target)
        source_embeddings = self.source_embeddings(source)
        

        tgt_seq_len = target.size(1)
        tgt_mask = self.generate_square_subsequent_mask(tgt_seq_len).to(target.device)
        src_key_padding_mask = self.create_padding_mask(source, self.sp).to(source.device)
        tgt_key_padding_mask = self.create_padding_mask(target, self.tp).to(target.device)
        
        output = self.transformer(
            src=source_embeddings,
            tgt=target_embeddings,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_key_padding_mask,
            tgt_key_padding_mask=tgt_key_padding_mask
        )


        output = self.non_lin(self.normalization(self.linear(output)))
        projection = self.projection(output)
        
        return projection


NameError: name 'nn' is not defined

MAIN

In [2]:
data = read_json('train')
val_data = read_json('val')
_, _, _, _, _, eval_dataset = make_wordinddicts(val_data, tokenizer)
eval_dataloader = DataLoader(eval_dataset, batch_size = BATCH_SIZE, shuffle = True)
source_word2ind, source_ind2word, target_word2ind, target_ind2word, max_len, dataset = make_wordinddicts(data, tokenizer)
dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle = True)

model = Translatorv3(len(source_word2ind), len(target_word2ind), source_word2ind['<PAD>'], target_word2ind['<PAD>']).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=target_word2ind['<PAD>'])
optimizer = torch.optim.Adam(model.parameters(), weight_decay = 0.00001)


train_losses, val_losses = train_model(model, criterion, optimizer, dataloader, eval_dataloader, 5)
model.load_state_dict(torch.load('best_model.pth'))

from utils import translate
from tqdm import tqdm
data = read_json('test_no_reference')
result = []
for line in tqdm(data):
    sentence = [char for char in line['src']]
    translated_sentence = translate(model, sentence, source_word2ind, target_word2ind)
    # print(translated_sentence)
    result.append({'src': line['src'],
                   'dst': ''.join(translated_sentence)})


NameError: name 'read_json' is not defined

UTILS

In [None]:
import json
import torch
from WordDataset import WordDataset
from tqdm import tqdm
import torch.nn.functional as F
from nltk.translate.bleu_score import sentence_bleu
import numpy as np
MAX_LEN = 10


def read_json(filepath):
    try:
        data = []
        with open(filepath, 'r', encoding='utf-8') as f:
            for line in f:
                item = json.loads(line.strip())
                data.append(item)
        return data
        
    except (FileNotFoundError, json.JSONDecodeError) as e:
        print(f"Error reading or parsing the file: {e}")
        return None







def train_model(model, criterion, optimizer, dataloader, eval_dataloader, num_epoch, device=torch.device('cuda')):
    avg_losses_train = []
    avg_losses_val = []
    best_val_loss = float('inf')

    for epoch in range(1, num_epoch + 1):
        print(f'Epoch: {epoch}')
        model.train()
        train_losses = []
        for source, target, src_mask, tgt_mask in tqdm(dataloader):
            optimizer.zero_grad()
            
            # Подготовка данных
            source, target_input = source.to(device), target[:, :-1].to(device)
            target_output = target[:, 1:].to(device).flatten()

            src_mask, tgt_mask = src_mask.to(device), tgt_mask.to(device)

            # Прямой проход
            output = model(source, target_input)
            output = output.view(-1, output.size(-1))
    
            # Вычисление ошибки
            loss = criterion(output, target_output)
            loss.backward()
            optimizer.step()
            train_losses.append(loss.item())

        avg_train_loss = sum(train_losses) / len(train_losses)
        avg_losses_train.append(avg_train_loss)
        print(f'Average train loss: {avg_train_loss:.4f}')
        
        # Оценка на валидации
        model.eval()
        val_losses = []

        with torch.no_grad():
            for source, target, src_mask, tgt_mask in tqdm(eval_dataloader):
                source, target_input = source.to(device), target[:, :-1].to(device)
                target_output = target[:, 1:].to(device).flatten()

                src_mask, tgt_mask = src_mask.to(device), tgt_mask.to(device)

                output = model(source, target_input)
                output = output.view(-1, output.size(-1))
                loss = criterion(output, target_output)
                val_losses.append(loss.item())

        avg_val_loss = sum(val_losses) / len(val_losses)
        avg_losses_val.append(avg_val_loss)
        print(f'Average val loss: {avg_val_loss:.4f}')
        
        # Сохранение лучшей модели
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), 'best_model.pth')
            print('Model saved.')

    return avg_losses_train, avg_losses_val


def make_wordinddicts(data, tokenizer):
    source = []
    target = []
    
    for line in data:
        t, s = line.keys()
        target.append(line[t].lower())
        source.append(line[s].lower())
    
    target_bag_of_words = []
    source_bag_of_words = []
    
    target_sentences = []
    source_sentences = []
    
    for i in range(len(target)):
        t_sent = target[i]
        s_sent = source[i]
        t_tokens = tokenizer.tokenize(t_sent.lower())
        s_tokens = [char for char in s_sent]
        
        target_bag_of_words.extend(t_tokens)
        source_bag_of_words.extend(s_tokens)
    
        target_sentences.append(t_tokens)
        source_sentences.append(s_tokens)
        
    
    special_symbols = ['<SOS>', '<EOS>', '<PAD>', '<UNK>']
    
    target_bag_of_words.extend(special_symbols)
    source_bag_of_words.extend(special_symbols)
    target_bag_of_words = set(target_bag_of_words)
    source_bag_of_words = set(source_bag_of_words)
    
    source_word2ind = {word: ind for ind, word in enumerate(source_bag_of_words)}
    target_word2ind = {word: ind for ind, word in enumerate(target_bag_of_words)}
    source_ind2word = {ind: word for ind, word in enumerate(source_bag_of_words)}
    target_ind2word = {ind: word for ind, word in enumerate(target_bag_of_words)}

    max_len = max(max([len(sentence) for sentence in target_sentences]), max([len(sentence) for sentence in source_sentences]))

    dataset = WordDataset(source_sentences, target_sentences, source_word2ind, target_word2ind, max_len = MAX_LEN)

    return source_word2ind, source_ind2word, target_word2ind, target_ind2word, max_len, dataset



def translate(model, sentence, source_word2ind, target_word2ind, device='cuda', max_length=MAX_LEN):
    """
    Переводит предложение, используя модель Translatorv3.
    """
    model.eval()
    
    source_ids = torch.tensor([[source_word2ind.get(word, source_word2ind['<UNK>']) 
                               for word in sentence]]).to(device)
    target_ids = torch.tensor([[target_word2ind['<SOS>']]]).to(device)
    
    with torch.no_grad():
        for _ in range(max_length):
            output = model(source_ids, target_ids)
            next_word_id = output[0, -1].argmax().item()
            
            target_ids = torch.cat([target_ids, 
                                  torch.tensor([[next_word_id]]).to(device)], dim=1)
            
            if next_word_id == target_word2ind['<EOS>']:
                break
    
    target_ind2word = {v: k for k, v in target_word2ind.items()}
    translated = [target_ind2word[idx.item()] for idx in target_ids[0][1:-1]]
    
    return ' '.join(translated)


def write_json(data):
    with open('output.jsonl', 'w', encoding='utf-8') as f:
        for item in data:
            json_line = json.dumps(item, ensure_ascii=False)
            f.write(json_line + '\n')



DATASET

In [None]:
class WordDataset(Dataset):
    def __init__(self, source, target, source_word2ind, target_word2ind, max_len=50):
        self.source_samples = source
        self.target_samples = target

        self.source_word2ind = source_word2ind
        self.target_word2ind = target_word2ind
        self.max_len = max_len

    def __len__(self):
        return len(self.source_samples)


    def __getitem__(self, idx):
        source_sentence = self.source_samples[idx][:self.max_len]
        target_sentence = self.target_samples[idx][:self.max_len]
        
        source_indices = [self.source_word2ind['<SOS>']] + \
                         [self.source_word2ind.get(word, self.source_word2ind['<UNK>']) for word in source_sentence] + \
                         [self.source_word2ind['<EOS>']]
        
        target_indices = [self.target_word2ind['<SOS>']] + \
                         [self.target_word2ind.get(word, self.target_word2ind['<UNK>']) for word in target_sentence] + \
                         [self.target_word2ind['<EOS>']]
        
        source_indices += [self.source_word2ind['<PAD>']] * (self.max_len + 2 - len(source_indices))
        target_indices += [self.target_word2ind['<PAD>']] * (self.max_len + 2 - len(target_indices))

        src_padding_mask = torch.tensor([token != self.source_word2ind['<PAD>'] for token in source_indices], dtype=torch.bool)
        tgt_padding_mask = torch.tensor([token != self.target_word2ind['<PAD>'] for token in target_indices], dtype=torch.bool)
        tgt_padding_mask = tgt_padding_mask[1:]
        
        source_tensor = torch.tensor(source_indices, dtype=torch.long)
        target_tensor = torch.tensor(target_indices, dtype=torch.long)
    
        return source_tensor, target_tensor, src_padding_mask, tgt_padding_mask