<a href="https://colab.research.google.com/github/natsakh/IAD/blob/main/Pr_7/7_2_Encoder_IMDB_classification.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import nltk
from nltk.tokenize import word_tokenize
nltk.download('punkt')
nltk.download('punkt_tab')

import matplotlib.pyplot as plt
import numpy as np
import re, string
from collections import Counter

import math
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader, random_split
import random

torch.manual_seed(42)
random.seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


Device: cuda


In [2]:
# Hugging Face Datasets — бібліотека для завантаження датасетів.
#У Google Colab вона вже попередньо встановлена
# Якщо ви працюєте локально — попередньо виконайте:
#     pip install datasets
from datasets import load_dataset
#https://huggingface.co/datasets

In [26]:
ds = load_dataset("imdb")
#ds["train"], ds["test"]
#список словників — кожен елемент має ключі 'text' і 'label'

In [4]:
print(ds)

DatasetDict({
    train: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    test: Dataset({
        features: ['text', 'label'],
        num_rows: 25000
    })
    unsupervised: Dataset({
        features: ['text', 'label'],
        num_rows: 50000
    })
})


In [5]:
example = ds["train"][0]
print('text: ', example['text'])
print('label: ', example['label'])
#label = 1 → позитивний відгук, 0 → негативний.

text:  I rented I AM CURIOUS-YELLOW from my video store because of all the controversy that surrounded it when it was first released in 1967. I also heard that at first it was seized by U.S. customs if it ever tried to enter this country, therefore being a fan of films considered "controversial" I really had to see this for myself.<br /><br />The plot is centered around a young Swedish drama student named Lena who wants to learn everything she can about life. In particular she wants to focus her attentions to making some sort of documentary on what the average Swede thought about certain political issues such as the Vietnam War and race issues in the United States. In between asking politicians and ordinary denizens of Stockholm about their opinions on politics, she has sex with her drama teacher, classmates, and married men.<br /><br />What kills me about I AM CURIOUS-YELLOW is that 40 years ago, this was considered pornographic. Really, the sex and nudity scenes are few and far betwe

In [6]:
def clean_text(text):
    text = text.lower()                                       # до нижнього регістру
    text = re.sub(r"@\S+", " ", text)                         # прибрати згадки @user
    text = re.sub(r"http\S+", " ", text)                      # прибрати посилання
    text = re.sub(r"<.*?>", " ", text)                        # прибрати HTML-теги
    text = re.sub(r"[^a-z\s]", " ", text)                     # залишити лише букви (прибрати цифри, спецсимволи)
    text = re.sub(f"[{re.escape(string.punctuation)}]", " ", text)  # прибрати пунктуацію
    text = re.sub(r"\s+", " ", text)                          # замінити багато пробілів на один
    return text

In [7]:
cleaned_texts = [clean_text(ex["text"]) for ex in ds["train"]]
tokens = [word_tokenize(t) for t in cleaned_texts]

In [8]:
counter = Counter()
for tok_list in tokens:
    counter.update(tok_list)

In [9]:
# кількість унікальних слів
vocab_size = len(counter)

print(f"Кількість унікальних слів у train-наборі: {vocab_size:,}")

MAX_VOCAB = 20_000

Кількість унікальних слів у train-наборі: 73,206


In [10]:
# створюємо списки перетворень, створюємо словник
specials = ["[PAD]", "[UNK]"]
most_common = counter.most_common(MAX_VOCAB - len(specials))
itos = specials + [w for w, _ in most_common]       # index → string
stoi = {w: i for i, w in enumerate(itos)}           # string → index

PAD_IDX = stoi["[PAD]"]
UNK_IDX = stoi["[UNK]"]

print("Розмір словника:", len(stoi))
print("Приклад індексу:", stoi.get("movie", UNK_IDX))

Розмір словника: 20000
Приклад індексу: 18


In [11]:
#кодуємо тексти у послідовності індексів
#для кожного токена беремо його індекс зі словника stoi,
#якщо слова немає — повертаємо індекс UNK_IDX.
def encode(tokens):
    return [stoi.get(t, UNK_IDX) for t in tokens]

encoded_texts = [encode(tok_list) for tok_list in tokens]
print(encoded_texts[0][:20])


[10, 1593, 10, 239, 1972, 4045, 38, 60, 366, 1097, 86, 5, 31, 2, 6974, 12, 3356, 8, 54, 8]


In [12]:
#Padding (вирівнюємо довжину)
#Нейронна мережа очікує тензори однакової довжини,
#тому короткі тексти “доповнюємо” <pad>, а надто довгі — обрізаємо

MAX_LEN = 100   # довжина послідовності
def pad_sequence(seq):
    seq = seq[:MAX_LEN] + [PAD_IDX] * max(0, MAX_LEN - len(seq))
    return torch.tensor(seq, dtype=torch.long)

X = torch.stack([pad_sequence(seq) for seq in encoded_texts])
print(X.shape)   # [N, MAX_LEN]


torch.Size([25000, 100])


In [13]:
# мітки (labels)
y = torch.tensor([ex["label"] for ex in ds["train"]], dtype=torch.float32)
print(y.shape)   # [N]

torch.Size([25000])


In [14]:
#TensorDataset і DataLoader
dataset = TensorDataset(X, y)

# розділяємо на train/val
VAL_FRAC = 0.2
val_sz = int(len(dataset) * VAL_FRAC)
train_sz = len(dataset) - val_sz
train_ds, val_ds = random_split(dataset, [train_sz, val_sz],
                                generator=torch.Generator().manual_seed(42))

train_loader = DataLoader(train_ds, batch_size=128, shuffle=True)
val_loader   = DataLoader(val_ds, batch_size=128)


In [15]:
# використовуємо ті самі clean_text, токенайзер/word_tokenize, stoi, PAD_IDX, MAX_LEN

# Підготовка X_test, y_test
cleaned_test = [clean_text(ex["text"]) for ex in ds["test"]]
tokens_test  = [word_tokenize(t) for t in cleaned_test]
encoded_test = [[stoi.get(tok, UNK_IDX) for tok in toks] for toks in tokens_test]

def pad_sequence(seq):
    seq = seq[:MAX_LEN] + [PAD_IDX] * max(0, MAX_LEN - len(seq))
    return torch.tensor(seq, dtype=torch.long)

X_test = torch.stack([pad_sequence(seq) for seq in encoded_test])
y_test = torch.tensor([ex["label"] for ex in ds["test"]], dtype=torch.float32)

# 2) TensorDataset + DataLoader
test_ds = TensorDataset(X_test, y_test)
test_loader = DataLoader(test_ds, batch_size=128, shuffle=False)  # на тесті shuffle=False


In [16]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    model.train()
    train_loss = 0.0

    for x, y in loader:
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        out = model(x)
        loss = criterion(out, y)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
    train_loss /= len(loader)

    return train_loss


In [17]:
def evaluate_one_epoch(model, loader, criterion, device):
    model.eval()
    valid_loss = 0.0

    with torch.no_grad():
      for x, y in loader:
          x, y = x.to(device), y.to(device)
          out = model(x)
          loss = criterion(out, y)
          valid_loss += loss.item()
    valid_loss /= len(loader)

    return valid_loss

In [18]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        # pe: [max_len, d_model]
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1)          # [max_len, 1]
        div_term = torch.exp(
            torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model)
        )
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)  # [1, max_len, d_model] – для додавання до batch
        self.register_buffer("pe", pe)

    def forward(self, x):
        # x: [B, T, d_model]
        x = x + self.pe[:, : x.size(1)]
        return x


In [19]:
class TransformerEncoder_IMDB(nn.Module):
    def __init__(
        self,
        vocab_size,
        emb_dim,
        pad_idx,
        n_heads=4,
        n_layers=2,
        dim_feedforward=256,
        dropout=0.1,
        max_len=5000,
    ):
        super().__init__()
        self.pad_idx = pad_idx
        self.emb_dim = emb_dim

        # Слово → вектор
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=emb_dim,
            padding_idx=pad_idx
        )

        # Позиційні кодування
        self.pos_encoding = PositionalEncoding(d_model=emb_dim, max_len=max_len)

        # Один шар енкодера трансформера
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=emb_dim,
            nhead=n_heads,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True,   # очікуємо [B, T, D]
        )
        # Стек із кількох шарів
        self.encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=n_layers
        )

        # Класифікаційна "голова"
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(emb_dim, 1)

    def forward(self, x):
        # x: [B, T] – індекси слів

        # Створюємо маску паддінгу: True там, де PAD
        src_key_padding_mask = (x == self.pad_idx)   # [B, T], bool

        # Ембедінги + масштабуємо
        x = self.embedding(x) * math.sqrt(self.emb_dim)  # [B, T, D]

        # Додаємо позиційні кодування
        x = self.pos_encoding(x)  # [B, T, D]

        # Проганяємо через TransformerEncoder
        # src_key_padding_mask: True = token буде ігноруватись
        enc_out = self.encoder(
            x,
            src_key_padding_mask=src_key_padding_mask
        )  # [B, T, D]

        pooled = enc_out.mean(dim=1)   # [B, D]

        logits = self.fc(self.dropout(pooled))   # [B, 1]
        return logits.squeeze(1)                 # [B]



In [20]:
vocab_size = len(stoi)
emb_dim = 100
hidden_dim = 64
pad_idx = stoi["[PAD]"]

model_transformer = TransformerEncoder_IMDB(
    vocab_size=vocab_size,
    emb_dim=emb_dim,
    pad_idx=pad_idx,
    n_heads=4,          # 100 % 4 == 0 – ок
    n_layers=2,
    dim_feedforward=256,
    dropout=0.1,
    max_len=MAX_LEN     #  MAX_LEN=100
).to(device)


In [21]:
num_params = sum(p.numel() for p in model_transformer.parameters() if p.requires_grad)
print("Trainable params:", num_params)

Trainable params: 2184813


In [22]:
criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(model_transformer.parameters(), lr=1e-3)

In [23]:
# навчання
n_epochs = 7

for epoch in range(n_epochs):
   train_loss = train_one_epoch(model_transformer, train_loader, optimizer, criterion, device)
   val_loss = evaluate_one_epoch(model_transformer, val_loader, criterion, device)

   print(f"[{epoch+1:02d}] train_loss={train_loss:.4f} | val_loss={val_loss:.4f}")

  output = torch._nested_tensor_from_mask(


[01] train_loss=0.5930 | val_loss=0.5310
[02] train_loss=0.4855 | val_loss=0.5069
[03] train_loss=0.4297 | val_loss=0.4828
[04] train_loss=0.3882 | val_loss=0.4776
[05] train_loss=0.3574 | val_loss=0.5188
[06] train_loss=0.3282 | val_loss=0.4907
[07] train_loss=0.3038 | val_loss=0.5085


In [24]:
# Для фінальної перевірки якості
@torch.no_grad()
def evaluate(model, loader, criterion, device):
    model.eval()
    total_loss, total_acc, n = 0.0, 0, 0
    for xb, yb in loader:
        xb, yb = xb.to(device), yb.to(device)
        logits = model(xb)
        loss = criterion(logits, yb)
        preds = (torch.sigmoid(logits) >= 0.5).long()
        total_loss += loss.item() * xb.size(0)
        total_acc  += (preds == yb.long()).sum().item()
        n += xb.size(0)
    return total_loss/n, total_acc/n

In [25]:
test_loss, test_acc = evaluate(model_transformer, test_loader, criterion, device)
print(f"TEST  loss={test_loss:.4f}  acc={test_acc*100:.2f}%")

TEST  loss=0.5238  acc=76.77%
