In [None]:
import subprocess
import sys


# Install the third-party dependencies into the interpreter that runs this
# kernel. `sys.executable -m pip` guarantees the packages land in the kernel's
# own environment (a bare `pip` found on PATH may belong to another Python),
# the argument list avoids going through a shell, and `check=True` reports a
# failed install here instead of as an ImportError in a later cell.
subprocess.run(
    [sys.executable, "-m", "pip", "install", "datasets", "nltk", "gensim", "einops", "evaluate"],
    check=True,
)
# Fetch the NLTK "punkt" resource with the same interpreter.
subprocess.run([sys.executable, "-m", "nltk.downloader", "punkt"], check=True)

In [None]:
import torch
import nltk
import einops
import evaluate
# Prefer the GPU when one is visible, but fall back to the CPU so the notebook
# still runs on machines without CUDA.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

from datasets import load_dataset

In [None]:
# Load the "wmt14" dataset in its "de-en" configuration; later cells read the
# 'train', 'validation' and 'test' splits from it.
wmt14 = load_dataset("wmt14", "de-en")

In [None]:
# Shared NLTK tokenizer and lemmatizer instances used by `tokenize_pipeline`.
tokenizer = nltk.WordPunctTokenizer()
lemmatizer = nltk.WordNetLemmatizer()

In [None]:
def tokenize_pipeline(sentence):
    """Turn a sentence into a list of lemmatized, purely alphabetic tokens.

    The sentence is split with the module-level ``tokenizer``; every token for
    which ``str.isalpha()`` is false is discarded, and each remaining token is
    passed through the module-level ``lemmatizer``.

    Returns:
        list: the lemmatized tokens, in their original order.
    """
    lemmas = []
    for word in tokenizer.tokenize(sentence):
        if word.isalpha():
            lemmas.append(lemmatizer.lemmatize(word))
    return lemmas

In [None]:
import nltk
import subprocess

# Make sure the WordNet corpus is downloaded and extracted under
# /kaggle/working/ and that NLTK searches that directory.
# NOTE(review): the lookup name 'wordnet.zip' is kept from the original; NLTK
# resource names are normally of the form 'corpora/wordnet'. Confirm whether
# (almost) always falling through to the download branch is intended.
try:
    nltk.data.find('wordnet.zip')
except LookupError:  # only "resource not found"; anything else should surface
    nltk.download('wordnet', download_dir='/kaggle/working/')
    # `-o` overwrites already-extracted files so that re-running the cell does
    # not stop at unzip's interactive overwrite prompt; `check=True` reports a
    # failed extraction right here.
    subprocess.run(
        ['unzip', '-o', '/kaggle/working/corpora/wordnet.zip', '-d', '/kaggle/working/corpora'],
        check=True,
    )
    # Register the download directory once, even when the cell is re-run.
    if '/kaggle/working/' not in nltk.data.path:
        nltk.data.path.append('/kaggle/working/')

from nltk.corpus import wordnet

In [None]:
import json
import kagglehub
# Download the pre-computed vocabularies and read them as JSON.
# The files are opened as UTF-8 explicitly so that non-ASCII words decode the
# same way regardless of the platform's default encoding.
tokenizedwordsandids_path = kagglehub.dataset_download('maksshan/tokenizedwordsandids')
with open(f"{tokenizedwordsandids_path}/all_tokenized_de_words", 'r', encoding='utf-8') as file1:
    data = json.load(file1)
all_tokenized_de_words = set(data)
with open(f"{tokenizedwordsandids_path}/all_tokenized_en_words", 'r', encoding='utf-8') as file2:
    data = json.load(file2)
all_tokenized_en_words = set(data)
# Word ids start at 16; ids 0, 1 and 2 are used elsewhere in this notebook for
# padding, begin-of-sequence and end-of-sequence.
# The words are sorted before numbering: iterating a set of strings follows
# hash order, which changes between interpreter runs, so without sorting the
# word -> id mapping (and therefore any saved checkpoint) would not be
# reproducible across kernel restarts.
en_words_to_ids = {word: idx + 16 for idx, word in enumerate(sorted(all_tokenized_en_words))}
de_words_to_ids = {word: idx + 16 for idx, word in enumerate(sorted(all_tokenized_de_words))}

In [None]:
from datasets import Dataset
# Keep only the first 100 000 training pairs. `Dataset.select` picks rows by
# index, so the whole 'translation' column does not have to be pulled into
# Python and re-wrapped in a new dataset just to take a prefix of it.
# `min(...)` keeps the cell working if the split has fewer rows than that.
train_split = wmt14['train']
a = train_split.select(range(min(100000, len(train_split))))

In [None]:
class TranslationallDataset(torch.utils.data.Dataset):
    """Fixed-length (German ids, English ids) tensor pairs built from a translation dataset.

    At construction time every row is tokenized and converted to ids through
    two ``dataset.map`` passes. Each item is then framed as
    ``[1] + ids + [2]``, truncated so the total length never exceeds
    ``max_len``, and right-padded with ``0``.

    Args:
        tokenizer: callable mapping a sentence string to a list of tokens.
        en_words_to_ids: token -> id mapping for the English side.
        de_words_to_ids: token -> id mapping for the German side.
        dataset: object exposing ``map``, ``__len__`` and ``__getitem__``
            whose rows provide ``row['translation']['en']`` and
            ``row['translation']['de']``.
        max_len: length of every returned sequence, including the leading
            ``1`` and trailing ``2`` markers.

    Raises:
        KeyError: during construction, if a token is missing from its vocabulary.
    """

    def __init__(self, tokenizer, en_words_to_ids,de_words_to_ids, dataset, max_len=64):
        self.tokenizer = tokenizer
        self.en_words_to_ids = en_words_to_ids
        self.de_words_to_ids = de_words_to_ids
        # Honour the caller's value (it used to be overwritten with 64).
        self.max_len = max_len

        def tokenizer_sentence(example):
            return {'tokensen': self.tokenizer(example['translation']['en']), 'tokensde': self.tokenizer(example['translation']['de']) }

        def convert_words_to_ids(example):
            return {'idsen': [self.en_words_to_ids[token] for token in example['tokensen']],'idsde': [self.de_words_to_ids[token] for token in example['tokensde']]}

        dataset = dataset.map(tokenizer_sentence)
        self.dataset = dataset.map(convert_words_to_ids)

    def __len__(self):
        return len(self.dataset)

    def _frame(self, ids):
        """Wrap ``ids`` as ``[1] + ids + [2]``, truncated and zero-padded to ``max_len``."""
        body = ids[:self.max_len - 2]
        padding = [0] * (self.max_len - 2 - len(body))
        return torch.tensor([1] + body + [2] + padding)

    def __getitem__(self, index):
        """Return ``(german_ids, english_ids)`` tensors for row ``index``."""
        # Fetch the row once; both language sides live in the same row.
        example = self.dataset[index]
        return self._frame(example['idsde']), self._frame(example['idsen'])

In [None]:
# Build the train / validation / test datasets with identical settings; only
# the underlying split differs (the training split is the reduced subset `a`).
train_dataset, valid_dataset, test_dataset = (
    TranslationallDataset(tokenize_pipeline, en_words_to_ids, de_words_to_ids, split)
    for split in (a, wmt14['validation'], wmt14['test'])
)

In [None]:
def collate_fn(item):
    """Stack a list of ``(first, second)`` tensor pairs into two batch tensors.

    Args:
        item: sequence of samples; element 0 and element 1 of every sample are
            tensors that ``torch.stack`` can combine.

    Returns:
        tuple: ``(stacked first elements, stacked second elements)``.
    """
    firsts = [sample[0] for sample in item]
    seconds = [sample[1] for sample in item]
    return torch.stack(firsts), torch.stack(seconds)

In [None]:
# One DataLoader per split, all configured the same way: batches of two
# samples, combined by `collate_fn`.
train_dataloader, valid_dataloader, test_dataloader = (
    torch.utils.data.DataLoader(split, batch_size=2, collate_fn=collate_fn)
    for split in (train_dataset, valid_dataset, test_dataset)
)

In [None]:
# Pull one batch from the training loader and display the shape of its first
# element as a quick sanity check.
example = next(iter(train_dataloader))
example[0].shape

# Model


In [None]:
class MultiHeadAttention(torch.nn.Module):
    """Multi-head scaled dot-product attention with learned projections.

    Queries, keys and values are each projected by a ``d_model -> d_model``
    linear layer, split into ``num_heads`` heads of width
    ``d_model // num_heads``, attended per head, merged back and passed
    through an output projection.
    """

    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads

        # Query, key, value and output projections, created in this order.
        for projection_name in ("W_q", "W_k", "W_v", "W_o"):
            setattr(self, projection_name, torch.nn.Linear(d_model, d_model))

    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        """Attend ``Q`` over ``K``/``V``; positions where ``mask == 0`` get a score of -1e9."""
        scores = (Q @ K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = scores.softmax(dim=-1)
        return weights @ V

    def split_heads(self, x):
        """Reshape ``(batch, seq, d_model)`` into ``(batch, num_heads, seq, d_k)``."""
        batch_size, seq_length, _ = x.size()
        per_head = x.view(batch_size, seq_length, self.num_heads, self.d_k)
        return per_head.transpose(1, 2)

    def combine_heads(self, x):
        """Inverse of ``split_heads``: back to ``(batch, seq, d_model)``."""
        batch_size, _, seq_length, _ = x.size()
        merged = x.transpose(1, 2).contiguous()
        return merged.view(batch_size, seq_length, self.d_model)

    def forward(self, Q, K, V, mask=None):
        """Project the inputs, attend per head and return the projected, merged result."""
        queries = self.split_heads(self.W_q(Q))
        keys = self.split_heads(self.W_k(K))
        values = self.split_heads(self.W_v(V))

        context = self.scaled_dot_product_attention(queries, keys, values, mask)
        return self.W_o(self.combine_heads(context))


In [None]:
import math
class MLP(torch.nn.Module):
    """Two-layer feed-forward block with a built-in residual connection.

    The input is expanded to ``2 * hidden_dim``, passed through ReLU,
    projected back to ``hidden_dim`` and added to the original input.
    """

    def __init__(self,hidden_dim:int):
        super().__init__()

        expanded_dim = 2 * hidden_dim
        self.linear_0 = torch.nn.Linear(hidden_dim, expanded_dim)
        self.linear_1 = torch.nn.Linear(expanded_dim, hidden_dim)
        self.relu = torch.nn.ReLU()

    def forward(self,hidden_state):
        """Return ``linear_1(relu(linear_0(hidden_state))) + hidden_state``."""
        expanded = self.linear_0(hidden_state)
        activated = self.relu(expanded)
        projected = self.linear_1(activated)
        return projected + hidden_state

In [None]:
class EncoderTransformerLayer(torch.nn.Module):
    """Encoder block: self-attention, then an MLP.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input and is normalised with its own LayerNorm.
    """

    def __init__(self, hidden_dim: int, num_heads:int):
        super().__init__()

        self.attention_layer = MultiHeadAttention(hidden_dim,num_heads)
        self.mlp_layer = MLP(hidden_dim)
        # `layer_norm` is not used in forward(); it is kept so the module's
        # set of parameters stays exactly as it was.
        self.layer_norm = torch.nn.LayerNorm(hidden_dim)
        self.dropout = torch.nn.Dropout(p=0.25)
        self.norm1, self.norm2 = (torch.nn.LayerNorm(hidden_dim) for _ in range(2))

    def forward(self, hidden_state,src_mask):
        """Apply the block to ``hidden_state``; ``src_mask`` is forwarded to the attention."""
        attended = self.attention_layer(hidden_state, hidden_state, hidden_state, mask=src_mask)
        after_attention = self.norm1(hidden_state + self.dropout(attended))

        fed_forward = self.mlp_layer(after_attention)
        return self.norm2(after_attention + self.dropout(fed_forward))


In [None]:
class DecoderTransformerLayer(torch.nn.Module):
    """Decoder block: masked self-attention, attention over the encoder output, then an MLP.

    Each sub-layer's output goes through dropout, is added to the sub-layer's
    input and is normalised with its own LayerNorm.
    """

    def __init__(self, hidden_dim: int, num_heads:int):
        super().__init__()

        # Attention within the translation (target side).
        self.self_attention = MultiHeadAttention(hidden_dim,num_heads)
        # Attention over the original sentence (encoder output).
        self.out_attention = MultiHeadAttention(hidden_dim,num_heads)
        self.mlp_layer = MLP(hidden_dim)
        # `layer_norm` is not used in forward(); it is kept so the module's
        # set of parameters stays exactly as it was.
        self.layer_norm = torch.nn.LayerNorm(hidden_dim)
        self.dropout = torch.nn.Dropout(p=0.25)
        self.norm1, self.norm2, self.norm3 = (torch.nn.LayerNorm(hidden_dim) for _ in range(3))

    def forward(self, inputs, encoder_layer_output,src_mask,tgt_mask):
        """Apply the block; ``tgt_mask`` masks the self-attention, ``src_mask`` the encoder attention."""
        self_attended = self.self_attention(inputs, inputs, inputs, mask=tgt_mask)
        state = self.norm1(inputs + self.dropout(self_attended))

        cross_attended = self.out_attention(state, encoder_layer_output, encoder_layer_output, mask=src_mask)
        state = self.norm2(state + self.dropout(cross_attended))

        fed_forward = self.mlp_layer(state)
        return self.norm3(state + self.dropout(fed_forward))

In [None]:
class Encoder(torch.nn.Module):
    """Source-side encoder: word + learned position embeddings, then three encoder layers.

    Args:
        de_dictionary_size: number of rows in the word embedding table.
        hidden_dim: embedding / hidden width.
        num_heads: attention heads per layer.
        max_seq_len: number of rows in the position embedding table; inputs
            longer than this cannot be embedded.
    """

    def __init__(self, de_dictionary_size: int, hidden_dim: int, num_heads,max_seq_len=64):
        super().__init__()

        self.word_embedding = torch.nn.Embedding(de_dictionary_size,hidden_dim)
        self.pos_embedding = torch.nn.Embedding(max_seq_len, hidden_dim)

        self.attention_layer0 = EncoderTransformerLayer(hidden_dim,num_heads)
        self.attention_layer1 = EncoderTransformerLayer(hidden_dim,num_heads)
        self.attention_layer2 = EncoderTransformerLayer(hidden_dim,num_heads)

    def forward(self, inputs,src_mask):
        """Encode ``inputs`` (integer ids; the last dimension is the sequence axis).

        ``src_mask`` is passed unchanged to every layer.
        """
        # Build the position indices on the same device as the ids instead of
        # relying on a module-level `device` variable, so the encoder works
        # wherever its inputs live.
        positions = torch.arange(inputs.size(-1), device=inputs.device)
        hidden_state = self.word_embedding(inputs) + self.pos_embedding(positions)
        hidden_state = self.attention_layer0(hidden_state,src_mask)
        hidden_state = self.attention_layer1(hidden_state,src_mask)
        hidden_state = self.attention_layer2(hidden_state,src_mask)

        return hidden_state
        

In [None]:
class Decoder(torch.nn.Module):
    """Target-side decoder: word + learned position embeddings, three decoder layers, output head.

    Args:
        en_dictionary_size: number of rows in the word embedding table and
            width of the output logits.
        hidden_dim: embedding / hidden width.
        num_heads: attention heads per layer.
        max_seq_len: number of rows in the position embedding table; inputs
            longer than this cannot be embedded.
    """

    def __init__(self, en_dictionary_size: int, hidden_dim: int, num_heads,max_seq_len=64):
        super().__init__()
        self.num_heads = num_heads
        self.word_embedding = torch.nn.Embedding(en_dictionary_size,hidden_dim)
        self.pos_embedding = torch.nn.Embedding(max_seq_len, hidden_dim)
        self.TransformerLayer = DecoderTransformerLayer(hidden_dim, num_heads)
        self.TransformerLayer2 = DecoderTransformerLayer(hidden_dim, num_heads)
        self.TransformerLayer3 = DecoderTransformerLayer(hidden_dim, num_heads)
        self.lm_head = torch.nn.Linear(hidden_dim, en_dictionary_size)

    def forward(self, inputs, encoder_output,src_mask,tgt_mask):
        """Return logits of width ``en_dictionary_size`` for every position of ``inputs``.

        ``inputs`` holds integer ids (the last dimension is the sequence
        axis); ``encoder_output`` and both masks are passed unchanged to
        every layer.
        """
        # Build the position indices on the same device as the ids instead of
        # relying on a module-level `device` variable.
        positions = torch.arange(inputs.size(-1), device=inputs.device)
        hidden_state = self.word_embedding(inputs) + self.pos_embedding(positions)
        hidden_state = self.TransformerLayer(hidden_state,encoder_output,src_mask,tgt_mask)
        hidden_state = self.TransformerLayer2(hidden_state,encoder_output,src_mask,tgt_mask)
        hidden_state = self.TransformerLayer3(hidden_state,encoder_output,src_mask,tgt_mask)

        return self.lm_head(hidden_state)

In [None]:
class TranslationModel(torch.nn.Module):
    """Encoder-decoder translation model.

    Args:
        de_dictionary_size: source vocabulary size passed to the encoder.
        en_dictionary_size: target vocabulary size passed to the decoder.
        hidden_dim: hidden width of both stacks.
        num_heads: attention heads per layer.
        pad_token: id treated as padding when the attention masks are built.
            Defaults to 0, the padding id used throughout this notebook.
    """

    def __init__(self, de_dictionary_size: int, en_dictionary_size: int, hidden_dim: int, num_heads:int, pad_token: int = 0):
        super().__init__()
        self.num_heads=num_heads
        # Stored on the model so mask creation does not depend on a global
        # that is only defined further down in the notebook.
        self.pad_token = pad_token
        self.encoder = Encoder(de_dictionary_size, hidden_dim,num_heads)
        self.decoder = Decoder(en_dictionary_size, hidden_dim,num_heads)

    def generate_mask(self, src, tgt):
        """Build boolean attention masks from ``(batch, seq)`` id tensors.

        Returns:
            tuple: ``src_mask`` of shape ``(batch, 1, 1, src_len)``, false at
            padded source positions, and ``tgt_mask`` of shape
            ``(batch, 1, tgt_len, tgt_len)``, false for future positions and
            for every entry of a padded target (query) row.
        """
        src_mask = (src != self.pad_token).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != self.pad_token).unsqueeze(1).unsqueeze(3)
        seq_length = tgt.size(1)
        # Lower-triangular (diagonal included) "no peeking ahead" mask, created
        # on the same device as the ids it is combined with.
        nopeak_mask = torch.ones(1, seq_length, seq_length, device=tgt.device).tril().bool()
        return src_mask, tgt_mask & nopeak_mask

    def forward(self, original_ids,translation_ids):
        """Return decoder logits for ``translation_ids`` given ``original_ids``."""
        # Move both id tensors to the model's own device *before* building the
        # masks, so ids, masks and weights always agree on the device.
        model_device = next(self.parameters()).device
        original_ids = original_ids.to(model_device)
        translation_ids = translation_ids.to(model_device)

        src_mask, tgt_mask = self.generate_mask(original_ids, translation_ids)
        encoder_output = self.encoder(original_ids, src_mask)
        return self.decoder(translation_ids, encoder_output, src_mask, tgt_mask)

In [None]:
# Pick the compute device name, release cached GPU memory, and display the choice.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
torch.cuda.empty_cache()
device

In [None]:
# Shell escape: show the GPU status reported by `nvidia-smi`.
!nvidia-smi

In [None]:
# Id 0 is the padding id written by the dataset; define it before anything
# that needs it.
PAD_TOKEN = 0

# NOTE(review): word ids start at 16 (see where en_words_to_ids and
# de_words_to_ids are built), so the largest id is len(vocabulary) + 15, while
# the embedding tables and the output layer below are sized len(vocabulary).
# The 16 highest ids would therefore index out of range. The sizes are left
# unchanged here because any code that assumes the output width equals
# len(all_tokenized_en_words) has to be updated together with them.
model = TranslationModel((len(all_tokenized_de_words)),(len(all_tokenized_en_words)),hidden_dim=54,num_heads=3)
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(),lr=1e-3)
# Targets are right-padded with PAD_TOKEN up to a fixed length; excluding those
# positions keeps the loss (and the gradients) from being dominated by the
# trivial "predict padding" task.
criterion = torch.nn.CrossEntropyLoss(ignore_index=PAD_TOKEN)


In [None]:
from functools import reduce

def get_num_of_params(
    model : torch.nn.Module
) -> int:
    """Return the total number of scalar elements across all parameters of ``model``.

    ``Tensor.numel()`` is used instead of multiplying the shape by hand, which
    also handles 0-dimensional (scalar) parameters, whose empty shape cannot
    be reduced without an initial value.
    """
    return sum(parameter.numel() for parameter in model.parameters())

get_num_of_params(model)

In [None]:
import numpy as np

In [None]:
from tqdm.auto import tqdm
def _batch_loss(X, y):
    """Teacher-forced loss for one batch.

    The model receives the target without its last token and is scored
    against the target without its first token. The logits are flattened
    using their own last dimension, so this does not hard-code the size of
    the output layer.
    """
    preds = model(X.to(device), y[:, :-1].to(device))
    return criterion(
        preds.reshape(-1, preds.size(-1)),
        y[:, 1:].reshape(-1).to(device),
    )


train_loss =[]
valid_loss =[]
epochs = 1
for epoch in tqdm(range(epochs)):
    # --- training pass ---
    model.train()
    train_loss_current = []
    # Iterating the loader directly (rather than enumerate(...)) lets tqdm
    # read its length and show real progress.
    for X, y in tqdm(train_dataloader, desc="train", leave=False):
        loss = _batch_loss(X, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_loss_current.append(loss.item())

    train_loss.append(np.mean(train_loss_current))

    # --- validation pass (no gradients, evaluation mode) ---
    valid_loss_current = []
    model.eval()
    with torch.inference_mode():
        for X, y in valid_dataloader:
            valid_loss_current.append(_batch_loss(X, y).item())
    valid_loss.append(np.mean(valid_loss_current))

    print(f'Эпоха - {epoch+1}, train_loss - {train_loss[-1]}, valid_loss - {valid_loss[-1]}')

In [None]:
#f= '/kaggle/working/DeepLByMeMax.model'
#torch.save(model.state_dict(), f)

In [None]:
#model.load_state_dict(torch.load('/kaggle/input/deppl/pytorch/default/1/DeepLByMeMax.model', weights_only=True,map_location=torch.device('cpu')))
# Switch the model to evaluation mode before it is used for generation below.
model.eval()

In [None]:
def prepross(original,max_len:int):
    """Convert a German sentence into a fixed-length tensor of token ids.

    The sentence is tokenized with ``tokenize_pipeline``, mapped through
    ``de_words_to_ids``, framed as ``[1] + ids + [2]`` and right-padded with
    ``0`` up to ``max_len``. Sentences that are too long are truncated so the
    result never exceeds ``max_len`` — the same framing the dataset class
    applies — since a longer sequence could not be position-embedded by a
    model whose position table has ``max_len`` rows.

    Args:
        original: the sentence to encode.
        max_len: length of the returned tensor.

    Returns:
        torch.Tensor: 1-D integer tensor of length ``max_len``.

    Raises:
        KeyError: if a token is not present in ``de_words_to_ids``.
    """
    tokenized_original = tokenize_pipeline(original)
    word_ids = [de_words_to_ids[token] for token in tokenized_original]
    idx_org = [1] + word_ids[:max_len - 2] + [2]
    # A non-positive repeat count yields an empty list, so no guard is needed.
    idx_org += [0] * (max_len - len(idx_org))
    return torch.tensor(idx_org)

In [None]:
def get_last_token_prediction(prefix, original, model):
    """Return the arg-max token id predicted by ``model`` at every position.

    ``original`` receives a leading batch dimension before being passed to the
    model together with ``prefix``. Despite the function's name, the arg-max
    over the last axis is returned for all positions of the model output, not
    only for the final one.
    """
    logits = model(original.unsqueeze(0), prefix)
    return logits.argmax(dim=-1)

In [None]:
MAX_SEQ_LEN = 64
BOS_TOKEN = 1
EOS_TOKEN = 2
# Called form of the decorator: the bare `@torch.inference_mode` spelling is
# not handled by every PyTorch release, whereas `inference_mode()` is.
@torch.inference_mode()
def generate(
    src : torch.Tensor
) -> torch.Tensor:
    """Greedily decode a translation for one encoded source sentence.

    Starting from ``BOS_TOKEN``, the module-level ``model`` is run on the
    tokens generated so far and the highest-scoring token at the last
    position is appended. Decoding stops once ``EOS_TOKEN`` is produced or
    the sequence holds ``MAX_SEQ_LEN`` tokens.

    Args:
        src: 1-D tensor of source token ids (no batch dimension).

    Returns:
        torch.Tensor: 1-D int64 tensor of generated ids, starting with
        ``BOS_TOKEN``.
    """
    # The source does not change between steps, so batch it and move it to the
    # device once instead of on every iteration.
    src_batch = src.unsqueeze(0).to(device)

    tokens = [BOS_TOKEN]
    while len(tokens) < MAX_SEQ_LEN and tokens[-1] != EOS_TOKEN:
        prefix = torch.tensor([tokens], dtype=torch.int64).to(device)
        logits = model(src_batch, prefix)
        tokens.append(logits[0, -1, :].argmax().item())
    return torch.tensor(tokens, dtype=torch.int64)

In [None]:
# Encode a sample German sentence into a length-64 id tensor and display it.
original = "Ich liebe dich"
org_preprossed = prepross(original,max_len=64)
org_preprossed

In [None]:
# Run greedy decoding on the encoded sentence and display the generated ids.
translated = generate(org_preprossed)
translated

In [None]:
def dec(tokens, words_to_ids=None):
    """Map token ids back to words.

    Args:
        tokens: iterable of ids (plain ints or elements of a 1-D tensor).
        words_to_ids: word -> id mapping to invert. Defaults to the
            module-level ``en_words_to_ids``.

    Returns:
        list: one entry per id — the word for that id, or ``None`` when no
        word has it (for example the padding / BOS / EOS ids).
    """
    if words_to_ids is None:
        words_to_ids = en_words_to_ids

    # Invert the mapping once instead of scanning the whole vocabulary for
    # every token. `setdefault` keeps the first word if two words share an id,
    # which is what a front-to-back scan would have returned.
    ids_to_words = {}
    for word, word_id in words_to_ids.items():
        ids_to_words.setdefault(word_id, word)

    return [ids_to_words.get(int(token)) for token in tokens]

In [None]:
d = dec(translated)
# BOS, EOS and any other id without a word decode to None; dropping those
# entries (instead of blindly slicing off the first and last token) keeps the
# final word when generation stopped at the length limit without an EOS, and
# avoids a TypeError from joining None values.
e = ' '.join(word for word in d if word is not None)

In [None]:
# Print the source sentence and its translation (the labels are in Russian:
# "Source text" and "Translation").
print(f'Исходный текст: {original}')
print(f'Перевод: {e}')