In [2]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, Dataset
from collections import Counter
import torch.nn.functional as F

import random
import math

# Seed Python's and PyTorch's random number generators for repeatable runs.
torch.manual_seed(42)
random.seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# Load the whole corpus; the context manager closes the file handle promptly
# instead of leaving it open until garbage collection.
with open("deu.txt", encoding="utf-8") as corpus_file:
    lines = corpus_file.read().split("\n")
print(lines[0])
print()

# Keep the first two tab-separated columns (English, German) of every line;
# any further columns are ignored and lines with fewer than two columns
# (e.g. a trailing empty line) are skipped.
pairs = []
for line in lines:
    parts = line.split("\t")
    if len(parts) < 2:
        continue
    eng = parts[0].strip()
    ger = parts[1].strip()
    pairs.append((eng, ger))

print(pairs[0])
print(len(pairs))
print()

# Print a few random pairs as a quick sanity check of the parsed data.
for _ in range(5):
    print(random.choice(pairs))

# Upper bound on the vocabulary size of each language, special tokens included.
MAX_VOCAB = 8_000
# Reserved tokens: padding, unknown word, beginning and end of sequence.
specials = [
    "[PAD]",
    "[UNK]",
    "[BOS]",
    "[EOS]",
]


# Tokenizer (simple)
def tokenize(text):
    """Lowercase *text* and split it on runs of whitespace.

    Punctuation is not separated from words, so "morning." is one token.
    """
    lowered = text.lower()
    # split() without a separator already ignores leading/trailing whitespace.
    return lowered.split()

# Build the vocabulary
def build_vocab(sentences):
    """Build token <-> id lookup tables from an iterable of sentences.

    Returns:
        (stoi, itos): ``itos`` lists the special tokens first, followed by
        the most frequent tokens, with at most ``MAX_VOCAB`` entries in
        total; ``stoi`` maps every token in ``itos`` to its index.
    """
    frequencies = Counter(
        token for sentence in sentences for token in tokenize(sentence)
    )

    # Reserve room for the specials, which always occupy the first ids.
    budget = MAX_VOCAB - len(specials)
    itos = list(specials)
    itos.extend(word for word, _count in frequencies.most_common(budget))
    stoi = {word: index for index, word in enumerate(itos)}

    return stoi, itos

# Split the pairs into one sentence list per language
eng_sentences = [english for english, _ in pairs]
ger_sentences = [german for _, german in pairs]

# Build a separate vocabulary for each language
eng_stoi, eng_itos = build_vocab(eng_sentences)
ger_stoi, ger_itos = build_vocab(ger_sentences)

# build_vocab puts the special tokens first, so their ids are the same in
# both vocabularies and looking them up in one of them is enough.
PAD_IDX, UNK_IDX, BOS_IDX, EOS_IDX = (
    eng_stoi[token] for token in ("[PAD]", "[UNK]", "[BOS]", "[EOS]")
)

print(f"ENG vocab size: {len(eng_stoi)}")
print(f"GER vocab size: {len(ger_stoi)}")

# Fixed lengths every source / target sequence is padded or truncated to.
MAX_LEN_SRC = 20
MAX_LEN_TGT = 20


def encode(text, stoi, add_specials=False):
    """Convert *text* into a list of token ids using the *stoi* mapping.

    Tokens missing from *stoi* become ``UNK_IDX``.  When *add_specials* is
    true the ids are wrapped as ``[BOS_IDX, ..., EOS_IDX]``.
    """
    ids = [stoi.get(token, UNK_IDX) for token in tokenize(text)]
    if not add_specials:
        return ids
    return [BOS_IDX, *ids, EOS_IDX]

def prepare_pair(eng, ger):
    """Encode one sentence pair into three fixed-length id lists.

    Returns:
        (src, tgt_in, tgt_out) where
        * src is the English sentence without BOS/EOS (MAX_LEN_SRC ids),
        * tgt_in is the German [BOS ... EOS] sequence without its last
          token (decoder input, MAX_LEN_TGT ids),
        * tgt_out is the same sequence without its first token, i.e. the
          decoder input shifted by one (MAX_LEN_TGT ids).
        Shorter sequences are right-padded with PAD_IDX, longer ones are
        truncated.
    """
    def fit(ids, length):
        # Cut to `length`, then right-pad with PAD_IDX up to `length`.
        return ids[:length] + [PAD_IDX] * (length - len(ids))

    source_ids = encode(eng, eng_stoi, add_specials=False)
    target_ids = encode(ger, ger_stoi, add_specials=True)

    return (
        fit(source_ids, MAX_LEN_SRC),
        fit(target_ids[:-1], MAX_LEN_TGT),
        fit(target_ids[1:], MAX_LEN_TGT),
    )


class TranslationDataset(Dataset):
    """Map-style dataset over (english, german) sentence pairs.

    The pairs are stored as raw strings and encoded each time an item is
    requested.
    """

    def __init__(self, pairs):
        self.pairs = pairs

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        """Return ``(src, tgt_in, tgt_out)`` as ``torch.long`` tensors."""
        english, german = self.pairs[idx]
        return tuple(
            torch.tensor(ids, dtype=torch.long)
            for ids in prepare_pair(english, german)
        )


# Train/validation split (no separate test set is carved out here)
random.shuffle(pairs)

total = len(pairs)
train_size = int(0.95 * total)  # first 95% for training, the rest for validation

train_pairs, val_pairs = pairs[:train_size], pairs[train_size:]

print(f"Train: {len(train_pairs)}")
print(f"Val: {len(val_pairs)}")

print(f"Total: {len(train_pairs) + len(val_pairs)}")

train_ds, val_ds = TranslationDataset(train_pairs), TranslationDataset(val_pairs)

BATCH_SIZE = 64

# Only the training loader shuffles its samples.
train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE)

# Pull a single batch to check the tensor shapes.
src, tgt_in, tgt_out = next(iter(train_loader))
print(f"src: {src.shape}")  # [B, MAX_LEN_SRC]
print(f"tgt_in: {tgt_in.shape}")  # [B, MAX_LEN_TGT]
print(f"tgt_out: {tgt_out.shape}")


def make_padding_mask(x, pad_idx=PAD_IDX):
    """Return a boolean mask marking the padding positions of *x*.

    x: [B, T] token ids.  The result has the same shape and is True wherever
    the id equals *pad_idx*, which is the convention used for the
    ``*_key_padding_mask`` arguments of the nn.Transformer modules.
    """
    is_padding = x == pad_idx
    return is_padding


def generate_subsequent_mask(size, device):
    """Build a [size, size] boolean causal mask on *device*.

    Entry (i, j) is True (attention forbidden) exactly when j > i, i.e. when
    the key position lies after the query position.
    """
    positions = torch.arange(size, device=device)
    # Comparing a row of key positions with a column of query positions
    # yields the strict upper triangle.
    return positions.unsqueeze(0) > positions.unsqueeze(1)


class TokenPositionalEmbedding(nn.Module):
    """Learned token embedding plus learned position embedding, then dropout.

    Args:
        vocab_size: number of rows in the token embedding table.
        d_model: embedding dimension.
        max_len: number of positions in the position embedding table.
        dropout: dropout probability applied to the summed embeddings.
    """

    def __init__(self, vocab_size, d_model, max_len=50, dropout=0.1):
        super().__init__()
        # PAD_IDX is registered as the padding index of the token table.
        self.token_emb = nn.Embedding(vocab_size, d_model, padding_idx=PAD_IDX)
        self.pos_emb = nn.Embedding(max_len, d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Embed token ids x of shape [B, T] into a [B, T, d_model] tensor."""
        batch, seq_len = x.shape
        positions = torch.arange(seq_len, device=x.device)
        # Same 0..T-1 position ids for every sequence in the batch.
        positions = positions.unsqueeze(0).expand(batch, seq_len)
        embedded = self.token_emb(x) + self.pos_emb(positions)
        return self.dropout(embedded)


class Seq2SeqTransformer(nn.Module):
    """Encoder-decoder Transformer mapping source ids to target-vocabulary logits.

    Args:
        src_vocab_size: size of the source vocabulary.
        tgt_vocab_size: size of the target vocabulary (also the logit width).
        d_model: embedding / hidden dimension.
        n_heads: number of attention heads per layer.
        num_layers: number of layers in the encoder and in the decoder.
        d_ff: width of the feed-forward sublayers.
        max_len_src: number of source positions that can be embedded.
        max_len_tgt: number of target positions that can be embedded.
        dropout: dropout probability used in embeddings and layers.
    """

    def __init__(
        self,
        src_vocab_size,
        tgt_vocab_size,
        d_model=64,
        n_heads=4,
        num_layers=2,
        d_ff=128,
        max_len_src=20,
        max_len_tgt=20,
        dropout=0.1,
    ):
        super().__init__()

        self.src_emb = TokenPositionalEmbedding(src_vocab_size, d_model, max_len_src, dropout)
        self.tgt_emb = TokenPositionalEmbedding(tgt_vocab_size, d_model, max_len_tgt, dropout)

        # Encoder and decoder layers share the same hyper-parameters.
        layer_kwargs = dict(
            d_model=d_model,
            nhead=n_heads,
            dim_feedforward=d_ff,
            dropout=dropout,
            batch_first=True,
        )
        encoder_layer = nn.TransformerEncoderLayer(**layer_kwargs)
        decoder_layer = nn.TransformerDecoderLayer(**layer_kwargs)

        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers)

        self.output_proj = nn.Linear(d_model, tgt_vocab_size)

    def forward(self, src, tgt_in):
        """Compute logits for every target position.

        src: [B, S] source token ids
        tgt_in: [B, T] decoder input token ids
        returns: [B, T, V_tgt] logits
        """
        # True at padding positions, for both sequences.
        src_pad = make_padding_mask(src)  # [B, S]
        tgt_pad = make_padding_mask(tgt_in)  # [B, T]

        enc_in = self.src_emb(src)  # [B, S, D]
        dec_in = self.tgt_emb(tgt_in)  # [B, T, D]

        memory = self.encoder(enc_in, src_key_padding_mask=src_pad)  # [B, S, D]

        # Causal mask: a target position may not attend to later positions.
        causal = generate_subsequent_mask(tgt_in.size(1), device=tgt_in.device)  # [T, T]

        hidden = self.decoder(
            dec_in,
            memory,
            tgt_mask=causal,
            tgt_key_padding_mask=tgt_pad,
            memory_key_padding_mask=src_pad,
        )  # [B, T, D]

        return self.output_proj(hidden)  # [B, T, V_tgt]


src_vocab_size, tgt_vocab_size = len(eng_stoi), len(ger_stoi)

# Build the network with 4 encoder and 4 decoder layers, then move it to the
# selected device.
model = Seq2SeqTransformer(
    src_vocab_size,
    tgt_vocab_size,
    d_model=64,
    n_heads=4,
    num_layers=4,
    d_ff=128,
    max_len_src=MAX_LEN_SRC,
    max_len_tgt=MAX_LEN_TGT,
)
model = model.to(device)


def count_parameters(model):
    """Return the total number of elements over all trainable parameters."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total


print(f"Trainable parameters: {count_parameters(model)}")

# Targets equal to PAD_IDX are ignored by the loss.
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=1e-3,
)


def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over *loader* and return the mean loss per token.

    Args:
        model: callable mapping ``(src, tgt_in)`` to logits of shape [B, T, V].
        loader: iterable of ``(src, tgt_in, tgt_out)`` batches.
        optimizer: optimizer that updates the model's parameters.
        criterion: loss over flattened logits/targets; it is expected to
            average over the non-padding targets, which is why each batch
            loss is weighted by its non-padding token count.
        device: device the batches are moved to.

    Returns:
        The token-weighted average of the batch losses, or NaN when the
        loader yields no non-padding target token (e.g. an empty loader).
    """
    model.train()
    total_loss = 0.0
    n_tokens = 0

    for src, tgt_in, tgt_out in loader:
        src = src.to(device)
        tgt_in = tgt_in.to(device)
        tgt_out = tgt_out.to(device)

        # Count the real (non-padding) targets once per batch.
        batch_tokens = (tgt_out != PAD_IDX).sum().item()
        if batch_tokens == 0:
            # A loss averaged over zero tokens is NaN; backpropagating it
            # would corrupt the weights, so skip such a batch entirely.
            continue

        optimizer.zero_grad()

        logits = model(src, tgt_in)  # [B, T, V]
        vocab = logits.size(-1)

        # Flatten batch and time so every token is one classification example.
        loss = criterion(logits.reshape(-1, vocab), tgt_out.reshape(-1))

        loss.backward()
        optimizer.step()

        total_loss += loss.item() * batch_tokens
        n_tokens += batch_tokens

    if n_tokens == 0:
        # Nothing was trained on: report "undefined" instead of dividing by zero.
        return float("nan")
    return total_loss / n_tokens


@torch.no_grad()
def evaluate(model, loader, criterion, device):
    """Compute the mean loss per token over *loader* without updating the model.

    Args:
        model: callable mapping ``(src, tgt_in)`` to logits of shape [B, T, V].
        loader: iterable of ``(src, tgt_in, tgt_out)`` batches.
        criterion: loss over flattened logits/targets; it is expected to
            average over the non-padding targets, which is why each batch
            loss is weighted by its non-padding token count.
        device: device the batches are moved to.

    Returns:
        The token-weighted average of the batch losses, or NaN when the
        loader yields no non-padding target token (e.g. an empty loader).
    """
    model.eval()
    total_loss = 0.0
    n_tokens = 0

    for src, tgt_in, tgt_out in loader:
        src = src.to(device)
        tgt_in = tgt_in.to(device)
        tgt_out = tgt_out.to(device)

        # Count the real (non-padding) targets once per batch.
        batch_tokens = (tgt_out != PAD_IDX).sum().item()
        if batch_tokens == 0:
            # A loss averaged over zero tokens is NaN and would turn the
            # whole epoch average into NaN.
            continue

        logits = model(src, tgt_in)  # [B, T, V]
        vocab = logits.size(-1)

        # Flatten batch and time so every token is one classification example.
        loss = criterion(logits.reshape(-1, vocab), tgt_out.reshape(-1))

        total_loss += loss.item() * batch_tokens
        n_tokens += batch_tokens

    if n_tokens == 0:
        # Nothing was evaluated: report "undefined" instead of dividing by zero.
        return float("nan")
    return total_loss / n_tokens


EPOCHS = 10

# Train for a fixed number of epochs and report both losses after each one.
for ep in range(1, EPOCHS + 1):
    train_loss = train_one_epoch(model, train_loader, optimizer, criterion, device)
    val_loss = evaluate(model, val_loader, criterion, device)
    report = (
        f"Epoch {ep}/{EPOCHS} - "
        f"train loss/token: {train_loss:.4f}  "
        f"val loss/token: {val_loss:.4f}"
    )
    print(report)


def encode_src_sentence(text, stoi, max_len):
    """Encode a source sentence as a [1, max_len] tensor of token ids.

    Args:
        text: raw sentence.
        stoi: token -> id mapping used for the lookup; tokens missing from
            it become ``UNK_IDX``.
        max_len: output length; longer sentences are truncated, shorter ones
            are right-padded with ``PAD_IDX``.

    Returns:
        A ``torch.long`` tensor of shape [1, max_len].
    """
    # Use the mapping supplied by the caller rather than a module-level
    # vocabulary, so the function honours its `stoi` argument.
    ids = [stoi.get(token, UNK_IDX) for token in tokenize(text)]
    ids = ids[:max_len] + [PAD_IDX] * (max_len - len(ids))
    return torch.tensor(ids, dtype=torch.long).unsqueeze(0)  # [1, S]


def decode_tgt_ids(ids, itos):
    """Turn a list of target ids into a space-joined string.

    PAD, BOS and EOS ids are dropped; every other id is looked up in *itos*.
    """
    return " ".join(
        itos[i] for i in ids if i not in (PAD_IDX, BOS_IDX, EOS_IDX)
    )


@torch.no_grad()
def translate(model, sentence, max_len=MAX_LEN_TGT):
    """Greedily translate an English *sentence* with *model*.

    Args:
        model: a Seq2SeqTransformer; it is switched to eval mode.
        sentence: raw English text.
        max_len: maximum number of tokens to generate.  It is capped at the
            size of the decoder's positional-embedding table, because longer
            decoder inputs cannot be embedded.

    Returns:
        The decoded target string with PAD/BOS/EOS removed, or ``""`` when
        *sentence* contains no tokens.
    """
    model.eval()
    device = next(model.parameters()).device

    # An empty sentence would encode to padding only, leaving the encoder
    # with every key masked; there is nothing to translate in that case.
    if not tokenize(sentence):
        return ""

    # 1) encode the English sentence
    src = encode_src_sentence(sentence, eng_stoi, MAX_LEN_SRC).to(device)  # [1, S]
    src_key_padding_mask = make_padding_mask(src)  # [1, S]

    # 2) run the encoder once; its output is reused at every decoding step
    enc_in = model.src_emb(src)  # [1, S, D]
    memory = model.encoder(enc_in, src_key_padding_mask=src_key_padding_mask)  # [1, S, D]

    # The decoder input at step k holds k + 1 tokens, so the number of steps
    # equals the longest input fed to the positional embedding.
    max_steps = min(max_len, model.tgt_emb.pos_emb.num_embeddings)

    # 3) start the decoder from BOS and extend one token at a time
    tgt_ids = [BOS_IDX]
    for _ in range(max_steps):
        tgt_tensor = torch.tensor(tgt_ids, dtype=torch.long, device=device).unsqueeze(0)  # [1, T]
        tgt_key_padding_mask = make_padding_mask(tgt_tensor)

        T = tgt_tensor.size(1)
        tgt_mask = generate_subsequent_mask(T, device=device)  # [T, T]

        dec_in = model.tgt_emb(tgt_tensor)  # [1, T, D]
        dec_out = model.decoder(
            dec_in,
            memory,
            tgt_mask=tgt_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=src_key_padding_mask,
        )  # [1, T, D]

        # Greedy choice: most likely token given the last decoder position.
        logits = model.output_proj(dec_out[:, -1, :])  # [1, V]
        next_id = logits.argmax(dim=-1).item()

        tgt_ids.append(next_id)

        if next_id == EOS_IDX:
            break

    # 4) convert the ids back to text
    return decode_tgt_ids(tgt_ids, ger_itos)


# Sample English sentences used to eyeball the trained model's output.
examples = [
    "Go.",
    "Hi!",
    "Good morning.",
    "Good night.",
    "I am fine.",
    "See you later.",
    "I forgot my keys at home.",
    "We are looking for a new apartment.",
    "She didn’t come because she was sick.",
]

# Print every source sentence followed by its greedy translation.
for s in examples:
    print(f"EN: {s}")
    print(f"GR: {translate(model, s)}")
    print()


Device: cpu
Go.	Geh.	CC-BY 2.0 (France) Attribution: tatoeba.org #2877272 (CM) & #8597805 (Roujin)

('Go.', 'Geh.')
324282

('Tom is fishing today.', 'Tom angelt heute.')
('Adjust your tie.', 'Richte deine Krawatte!')
('Who did you give my book to?', 'Wem hast du mein Buch gegeben?')
('This book is worth reading.', 'Es lohnt sich, dieses Buch zu lesen.')
('Tom is out shoveling snow.', 'Tom schippt draußen Schnee.')
ENG vocab size: 8000
GER vocab size: 8000
Train: 308067
Val: 16215
Total: 324282
src: torch.Size([64, 20])
tgt_in: torch.Size([64, 20])
tgt_out: torch.Size([64, 20])
Trainable parameters: 1881408


  output = torch._nested_tensor_from_mask(


Epoch 1/10 - train loss/token: 3.4507  val loss/token: 2.3313
Epoch 2/10 - train loss/token: 2.2529  val loss/token: 1.8451
Epoch 3/10 - train loss/token: 1.9202  val loss/token: 1.6535
Epoch 4/10 - train loss/token: 1.7531  val loss/token: 1.5482
Epoch 5/10 - train loss/token: 1.6437  val loss/token: 1.4766
Epoch 6/10 - train loss/token: 1.5655  val loss/token: 1.4176
Epoch 7/10 - train loss/token: 1.5040  val loss/token: 1.3829
Epoch 8/10 - train loss/token: 1.4558  val loss/token: 1.3437
Epoch 9/10 - train loss/token: 1.4164  val loss/token: 1.3132
Epoch 10/10 - train loss/token: 1.3818  val loss/token: 1.2994
EN: Go.
GR: geh los.

EN: Hi!
GR: [UNK]

EN: Good morning.
GR: guten morgen [UNK]

EN: Good night.
GR: gute nacht [UNK]

EN: I am fine.
GR: ich bin gut.

EN: See you later.
GR: wir sehen uns später.

EN: I forgot my keys at home.
GR: ich habe meine schlüssel zu hause vergessen.

EN: We are looking for a new apartment.
GR: wir suchen eine neue wohnung.

EN: She didn’t come beca