## Kiến trúc Transformer

In [1]:
# Import các thư viện cần thiết

import torch # type: ignore
import torchvision.transforms as transforms # type: ignore
from torch.utils.data import DataLoader, random_split # type: ignore
import torch.optim as optim # type: ignore
from torchvision.datasets import ImageFolder # type: ignore
from torch import nn # type: ignore
import math # type: ignore
import os # type: ignore

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [None]:
# 1. Input Embedding, Positional Encoding
class TokenAndPositionalEmbedding(nn.Module):
    """Sum of a learned token embedding and a learned positional embedding.

    Args:
        vocab_size: number of rows in the token embedding table.
        embed_dim: size of each embedding vector.
        max_length: number of rows in the positional embedding table; input
            sequences longer than this cannot be embedded.
        device: kept for backward compatibility and stored on ``self.device``;
            ``forward`` no longer relies on it.
    """

    def __init__(self, vocab_size, embed_dim, max_length, device='cpu'):
        super().__init__()
        self.device = device
        self.word_emb = nn.Embedding(
                                    num_embeddings = vocab_size,
                                    embedding_dim = embed_dim
                                    )
        self.pos_emb = nn.Embedding(
                                    num_embeddings = max_length,
                                    embedding_dim = embed_dim
                                    )

    def forward(self, x):
        """Embed a ``(N, seq_len)`` tensor of token ids.

        Returns:
            Tensor of shape ``(N, seq_len, embed_dim)``.
        """
        N, seq_len = x.size()
        # Build the position indices on the same device as the input so the
        # lookup works wherever the module was moved with ``.to(...)``,
        # independent of the ``device`` string given at construction time.
        positions = torch.arange(seq_len, device=x.device).expand(N, seq_len)
        return self.word_emb(x) + self.pos_emb(positions)


In [None]:
# 2. Encoder
class TransformerEncoderBlock(nn.Module):
    """One post-norm encoder layer: attention and feed-forward sub-layers.

    Each sub-layer output goes through dropout, is added to the sub-layer
    input (residual connection) and is then layer-normalized.

    Args:
        embed_dim: model width.
        num_heads: number of attention heads.
        ff_dim: hidden width of the feed-forward network.
        dropout: dropout probability used after both sub-layers.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim=embed_dim,
                                          num_heads=num_heads,
                                          batch_first=True)
        self.ffn = nn.Sequential(
            nn.Linear(in_features=embed_dim, out_features=ff_dim, bias=True),
            nn.ReLU(),
            nn.Linear(in_features=ff_dim, out_features=embed_dim, bias=True),
        )
        self.layernorm_1 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
        self.layernorm_2 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
        self.dropout_1 = nn.Dropout(p=dropout)
        self.dropout_2 = nn.Dropout(p=dropout)

    def forward(self, query, key, value):
        """Apply the attention sub-layer, then the feed-forward sub-layer.

        The residual connection of the attention sub-layer is taken from
        ``query``.
        """
        attended, _ = self.attn(query=query, key=key, value=value)
        after_attention = self.layernorm_1(query + self.dropout_1(attended))

        transformed = self.dropout_2(self.ffn(after_attention))
        return self.layernorm_2(after_attention + transformed)

class TransformerEncoder(nn.Module):
    """Token/position embedding followed by a stack of encoder blocks.

    Args:
        src_vocab_size: size of the source vocabulary.
        embed_dim: model width.
        max_length: number of positions the positional embedding supports.
        num_layers: how many ``TransformerEncoderBlock`` layers to stack.
        num_heads: attention heads per block.
        ff_dim: hidden width of each block's feed-forward network.
        dropout: dropout probability inside each block.
        device: forwarded to the embedding layer.
    """

    def __init__(self,
                 src_vocab_size,
                 embed_dim,
                 max_length,
                 num_layers,
                 num_heads,
                 ff_dim,
                 dropout=0.1,
                 device='cpu'):
        super().__init__()
        self.embedding = TokenAndPositionalEmbedding(
            src_vocab_size, embed_dim, max_length, device)
        blocks = [
            TransformerEncoderBlock(embed_dim, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ]
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Embed ``x`` and run it through every block as self-attention."""
        hidden = self.embedding(x)
        for block in self.layers:
            # query, key and value are all the current hidden states
            hidden = block(hidden, hidden, hidden)
        return hidden

In [None]:
# 3. Decoder
class TransformerDecoderBlock(nn.Module):
    """One post-norm decoder layer.

    Three sub-layers in order: self-attention over the target, cross-attention
    from the target onto the encoder output, and a feed-forward network. Each
    is followed by dropout, a residual addition and layer normalization.

    Args:
        embed_dim: model width.
        num_heads: number of attention heads (both attention modules).
        ff_dim: hidden width of the feed-forward network.
        dropout: dropout probability used after every sub-layer.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.attn = nn.MultiheadAttention(embed_dim=embed_dim,
                                          num_heads=num_heads,
                                          batch_first=True)
        self.cross_attn = nn.MultiheadAttention(embed_dim=embed_dim,
                                                num_heads=num_heads,
                                                batch_first=True)
        self.ffn = nn.Sequential(
            nn.Linear(in_features=embed_dim, out_features=ff_dim, bias=True),
            nn.ReLU(),
            nn.Linear(in_features=ff_dim, out_features=embed_dim, bias=True),
        )
        self.layernorm_1 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
        self.layernorm_2 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
        self.layernorm_3 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
        self.dropout_1 = nn.Dropout(p=dropout)
        self.dropout_2 = nn.Dropout(p=dropout)
        self.dropout_3 = nn.Dropout(p=dropout)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Run the three decoder sub-layers.

        Args:
            x: target-side hidden states.
            enc_output: encoder output used as key/value in cross-attention.
            src_mask: ``attn_mask`` passed to the cross-attention module.
            tgt_mask: ``attn_mask`` passed to the self-attention module.
        """
        # 1) self-attention over the target sequence
        self_attended, _ = self.attn(x, x, x, attn_mask=tgt_mask)
        hidden = self.layernorm_1(x + self.dropout_1(self_attended))

        # 2) cross-attention: queries from the decoder, keys/values from the encoder
        cross_attended, _ = self.cross_attn(hidden, enc_output, enc_output,
                                            attn_mask=src_mask)
        hidden = self.layernorm_2(hidden + self.dropout_2(cross_attended))

        # 3) position-wise feed-forward network
        transformed = self.dropout_3(self.ffn(hidden))
        return self.layernorm_3(hidden + transformed)

class TransformerDecoder(nn.Module):
    """Token/position embedding followed by a stack of decoder blocks.

    Args:
        tgt_vocab_size: size of the target vocabulary.
        embed_dim: model width.
        max_length: number of positions the positional embedding supports.
        num_layers: how many ``TransformerDecoderBlock`` layers to stack.
        num_heads: attention heads per block.
        ff_dim: hidden width of each block's feed-forward network.
        dropout: dropout probability inside each block.
        device: forwarded to the embedding layer.
    """

    def __init__(self,
                 tgt_vocab_size,
                 embed_dim,
                 max_length,
                 num_layers,
                 num_heads,
                 ff_dim,
                 dropout=0.1,
                 device='cpu'):
        super().__init__()
        self.embedding = TokenAndPositionalEmbedding(
            tgt_vocab_size, embed_dim, max_length, device)
        blocks = [
            TransformerDecoderBlock(embed_dim, num_heads, ff_dim, dropout)
            for _ in range(num_layers)
        ]
        self.layers = nn.ModuleList(blocks)

    def forward(self, x, enc_output, src_mask, tgt_mask):
        """Embed ``x`` and pass it through every decoder block in order.

        The same ``enc_output`` and masks are handed to each block.
        """
        hidden = self.embedding(x)
        for block in self.layers:
            hidden = block(hidden, enc_output, src_mask, tgt_mask)
        return hidden

In [None]:
# 4. Transformer
class Transformer(nn.Module):
    """Encoder-decoder Transformer that maps source/target ids to target logits.

    Args:
        src_vocab_size: size of the source vocabulary.
        tgt_vocab_size: size of the target vocabulary (also the logit width).
        embed_dim: model width.
        max_length: number of positions supported by the positional embeddings.
        num_layers: number of encoder layers and of decoder layers.
        num_heads: attention heads per layer.
        ff_dim: hidden width of the feed-forward networks.
        dropout: dropout probability.
        device: forwarded to the encoder/decoder and stored on ``self.device``.
    """

    def __init__(self,
                 src_vocab_size,
                 tgt_vocab_size,
                 embed_dim,
                 max_length,
                 num_layers,
                 num_heads,
                 ff_dim,
                 dropout=0.1,
                 device='cpu'):
        super().__init__()
        self.device = device
        self.encoder = TransformerEncoder(src_vocab_size,
                                          embed_dim,
                                          max_length,
                                          num_layers,
                                          num_heads,
                                          ff_dim,
                                          dropout=dropout,
                                          device=device
                                          )
        self.decoder = TransformerDecoder(tgt_vocab_size,
                                          embed_dim,
                                          max_length,
                                          num_layers,
                                          num_heads,
                                          ff_dim,
                                          dropout=dropout,
                                          device=device
                                          )
        self.fc = nn.Linear(embed_dim, tgt_vocab_size)

    def generate_mask(self, src, tgt):
        """Build the attention masks for one ``(src, tgt)`` batch.

        Returns:
            ``(src_mask, tgt_mask)`` where

            * ``src_mask`` is an all-``False`` boolean mask of shape
              ``(tgt_len, src_len)`` for cross-attention (nothing is masked);
            * ``tgt_mask`` is a float causal mask of shape
              ``(tgt_len, tgt_len)``: ``0.0`` on and below the diagonal,
              ``-inf`` above it, so a position cannot attend to later ones.
        """
        src_seq_len = src.shape[1]
        tgt_seq_len = tgt.shape[1]
        # Create the masks next to the data so they follow the inputs'
        # device rather than the constructor-time ``device`` string.
        mask_device = tgt.device

        # Cross-attention uses target positions as queries and source
        # positions as keys, hence (tgt_len, src_len) and not (src_len, src_len).
        src_mask = torch.zeros((tgt_seq_len, src_seq_len),
                               dtype=torch.bool, device=mask_device)

        # The original built this from torch.zeros, which turned every entry
        # into -inf and produced NaN attention weights.
        tgt_mask = torch.triu(
            torch.full((tgt_seq_len, tgt_seq_len), float('-inf'), device=mask_device),
            diagonal=1,
        )

        return src_mask, tgt_mask

    def forward(self, src, tgt):
        """Return logits of shape ``(N, tgt_len, tgt_vocab_size)``."""
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        enc_output = self.encoder(src)
        dec_output = self.decoder(tgt, enc_output, src_mask, tgt_mask)
        output = self.fc(dec_output)
        return output

In [None]:
# 5. Thử nghiệm
# Hyper-parameters for a quick shape check of the encoder-decoder model.
batch_size = 128
src_vocab_size = 1000
tgt_vocab_size = 2000
embed_dim = 200
max_length = 100
num_layers = 2
num_headers = 4  # number of attention heads
ff_dim = 256

model = Transformer(
    src_vocab_size=src_vocab_size,
    tgt_vocab_size=tgt_vocab_size,
    embed_dim=embed_dim,
    max_length=max_length,
    num_layers=num_layers,
    num_heads=num_headers,
    ff_dim=ff_dim,
)

# Random token ids drawn from {0, 1}, one full-length sequence per sample.
src = torch.randint(high=2, size=(batch_size, max_length), dtype=torch.int64)
tgt = torch.randint(high=2, size=(batch_size, max_length), dtype=torch.int64)

prediction = model(src, tgt)
prediction.shape  # batch_size x max_length x tgt_vocab_size

## Image Classification

In [None]:
# 1. Load Dataset
!gdown 1vSevps_hV5zhVf6aWuN8X7dd-qSAIgcc
!unzip ./flower_photos.zip

# load data
data_patch = "./flower_photos"
dataset = ImageFolder(root = data_patch)
num_samples = len(dataset)
classes = dataset.classes
num_classes = len(dataset.classes)

# split
TRAIN_RATIO, VALID_RATIO = 0.8, 0.1
n_train_examples = int(num_samples * TRAIN_RATIO)
n_valid_examples = int(num_samples * VALID_RATIO)
n_test_examples = num_samples - n_train_examples - n_valid_examples
train_dataset, valid_dataset, test_dataset = random_split(
                        dataset,
                        [n_train_examples, n_valid_examples, n_test_examples]
                        )

In [None]:
# 2. Preprocessing
from torch.utils.data import Dataset # type: ignore

IMG_SIZE = 224

# Training pipeline: resize + light augmentation, then scale to [-1, 1].
train_transforms = transforms.Compose([
                        transforms.Resize((IMG_SIZE, IMG_SIZE)),
                        transforms.RandomHorizontalFlip(),
                        # NOTE(review): RandomRotation takes degrees, so this
                        # rotates by at most +/-0.2 degrees - confirm intended.
                        transforms.RandomRotation(0.2),
                        transforms.ToTensor(),
                        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
                        ])

# Evaluation pipeline: deterministic, no augmentation.
test_transforms = transforms.Compose([
                        transforms.Resize((IMG_SIZE, IMG_SIZE)),
                        transforms.ToTensor(),
                        transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])
                        ])


class _TransformedSubset(Dataset):
    """Apply a split-specific transform on top of a dataset split.

    The three splits produced by ``random_split`` all reference the SAME
    underlying dataset object, so assigning ``split.dataset.transform`` for
    each split makes the last assignment win for every split. Wrapping each
    split keeps the transforms independent.
    """

    def __init__(self, subset, transform):
        # If ``subset`` is already one of these wrappers (cell re-run),
        # unwrap it so transforms are never stacked.
        self.subset = getattr(subset, 'subset', subset)
        self.transform = transform

    def __len__(self):
        return len(self.subset)

    def __getitem__(self, index):
        image, label = self.subset[index]
        if self.transform is not None:
            image = self.transform(image)
        return image, label


# apply: each split gets its own transform pipeline
train_dataset = _TransformedSubset(train_dataset, train_transforms)
valid_dataset = _TransformedSubset(valid_dataset, test_transforms)
test_dataset = _TransformedSubset(test_dataset, test_transforms)

In [None]:
# 3. Dataloader
BATCH_SIZE = 512

# Only the training loader is shuffled. Evaluation loaders keep a fixed order
# so batch composition (and therefore the per-batch loss average) is
# reproducible from run to run.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, num_workers=4, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE, num_workers=4, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, num_workers=4, shuffle=False)

In [None]:
# train epoch
import time

def train_epoch(model, optimizer, criterion, train_dataloader, device, epoch=0, log_interval=50):
    """Train ``model`` for one pass over ``train_dataloader``.

    Args:
        model: module called as ``model(inputs)`` to produce class scores.
        optimizer: optimizer stepped once per batch.
        criterion: loss called as ``criterion(predictions, labels)``.
        train_dataloader: iterable of ``(inputs, labels)`` batches that also
            supports ``len()``.
        device: device the batches are moved to.
        epoch: epoch number, used only in the progress message.
        log_interval: print running accuracy every this many batches.

    Returns:
        ``(epoch_loss, epoch_acc)``: the mean of the per-batch losses and the
        accuracy over ALL samples seen in the epoch. Both are ``0.0`` when the
        dataloader yields no batches.
    """
    model.train()
    # Epoch-wide counters are kept separately from the logging-window
    # counters: resetting a shared counter at each log made the returned
    # accuracy cover only the tail of the epoch (and divide by zero when the
    # final batch triggered a log).
    epoch_correct, epoch_count = 0, 0
    window_correct, window_count = 0, 0
    losses = []

    for idx, (inputs, labels) in enumerate(train_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        predictions = model(inputs)

        # compute loss
        loss = criterion(predictions, labels)
        losses.append(loss.item())

        # backward
        loss.backward()
        optimizer.step()

        batch_correct = (predictions.argmax(1) == labels).sum().item()
        batch_count = labels.size(0)
        epoch_correct += batch_correct
        epoch_count += batch_count
        window_correct += batch_correct
        window_count += batch_count

        if idx % log_interval == 0 and idx > 0:
            print(
                "| epoch {:3d} | {:5d}/{:5d} batches "
                "| accuracy {:8.3f}".format(
                    epoch, idx, len(train_dataloader), window_correct / window_count
                )
            )
            window_correct, window_count = 0, 0

    epoch_acc = epoch_correct / epoch_count if epoch_count else 0.0
    epoch_loss = sum(losses) / len(losses) if losses else 0.0
    return epoch_loss, epoch_acc

# evaluate
def evaluate_epoch(model, criterion, dataloader, device):
    """Evaluate ``model`` on every batch of ``dataloader`` without gradients.

    Args:
        model: module called as ``model(inputs)`` to produce class scores.
        criterion: loss called as ``criterion(predictions, labels)``.
        dataloader: iterable of ``(inputs, labels)`` batches.
        device: device the batches are moved to.

    Returns:
        ``(epoch_loss, epoch_acc)``: the unweighted mean of the per-batch
        losses and the fraction of correctly classified samples.
    """
    model.eval()
    batch_losses = []
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            scores = model(inputs)
            batch_losses.append(criterion(scores, labels).item())

            hits = scores.argmax(1) == labels
            n_correct += hits.sum().item()
            n_seen += labels.size(0)

    epoch_acc = n_correct / n_seen
    epoch_loss = sum(batch_losses) / len(batch_losses)
    return epoch_loss, epoch_acc

# train
def train(model, model_name, save_model, optimizer, criterion, train_dataloader, val_dataloader, num_epochs, device):
    """Train for ``num_epochs`` epochs and keep the best checkpoint.

    After every epoch the model is evaluated on ``val_dataloader``; whenever
    the validation loss improves, the weights are written to
    ``<save_model>/<model_name>.pt``. When training ends, the best saved
    weights (if any were saved) are loaded back into ``model``.

    Args:
        model: module to train.
        model_name: checkpoint file name without the ``.pt`` suffix.
        save_model: directory the checkpoint is written to.
        optimizer: optimizer passed to ``train_epoch``.
        criterion: loss passed to ``train_epoch`` / ``evaluate_epoch``.
        train_dataloader: training batches.
        val_dataloader: validation batches.
        num_epochs: number of epochs to run.
        device: device batches are moved to.

    Returns:
        ``(model, metrics)`` where ``model`` is in eval mode and ``metrics``
        holds per-epoch lists under the keys ``train_accuracy``,
        ``train_loss``, ``valid_accuracy``, ``valid_loss`` and ``time``.
    """
    train_accs, train_losses = [], []
    eval_accs, eval_losses = [], []
    best_loss_eval = float('inf')
    checkpoint_path = os.path.join(save_model, f'{model_name}.pt')
    checkpoint_saved = False
    times = []
    for epoch in range(1, num_epochs + 1):
        epoch_start_time = time.time()
        # Training - both helpers return (loss, accuracy), in that order.
        train_loss, train_acc = train_epoch(model, optimizer, criterion, train_dataloader, device, epoch)
        train_accs.append(train_acc)
        train_losses.append(train_loss)

        # Evaluation
        eval_loss, eval_acc = evaluate_epoch(model, criterion, val_dataloader, device)
        eval_accs.append(eval_acc)
        eval_losses.append(eval_loss)

        # Save best model: remember the new best so that later, worse epochs
        # do not overwrite the checkpoint.
        if eval_loss < best_loss_eval:
            best_loss_eval = eval_loss
            torch.save(model.state_dict(), checkpoint_path)
            checkpoint_saved = True

        epoch_time = time.time() - epoch_start_time
        times.append(epoch_time)

        # Print loss, acc end epoch
        print("-" * 59)
        print(
            "| End of epoch {:3d} | time: {:5.2f}s | Train Accuracy {:8.3f} | Train Loss {:8.3f}"
            "| Valid Accuracy {:8.3f} | Valid Loss {:8.3f}".format(
                epoch, epoch_time, train_acc, train_loss, eval_acc, eval_loss
            )
        )
        print("-" *59)

    # Load best model. Skipped when nothing was saved in this run (no epochs,
    # or the validation loss was never a comparable number) so that a stale
    # file from an earlier run is not loaded by accident.
    if checkpoint_saved:
        model.load_state_dict(torch.load(checkpoint_path, map_location=device))
    model.eval()
    metrics = {
            'train_accuracy' : train_accs,
            'train_loss' : train_losses,
            'valid_accuracy' : eval_accs,
            'valid_loss' : eval_losses,
            'time' : times
            }
    return model, metrics

In [None]:
# 4. Training from Scratch
# 4.1. Modeling
class TransformerEncoder(nn.Module):
    """Single post-norm encoder layer used by the vision model.

    NOTE(review): this rebinds the name ``TransformerEncoder`` that is defined
    earlier in this file with a different constructor signature (a full
    encoder stack); code that runs after this definition gets this class.

    Args:
        embed_dim: model width.
        num_heads: number of attention heads.
        ff_dim: hidden width of the feed-forward network.
        dropout: dropout probability used after both sub-layers.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, dropout=0.1):
        super().__init__()
        self.attn = nn.MultiheadAttention(
                                        embed_dim = embed_dim,
                                        num_heads = num_heads,
                                        batch_first = True
                                        )
        self.ffn = nn.Sequential(
                                nn.Linear(in_features=embed_dim, out_features=ff_dim, bias=True),
                                nn.ReLU(),
                                nn.Linear(in_features=ff_dim, out_features=embed_dim, bias=True)
                                )
        self.layernorm_1 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
        self.layernorm_2 = nn.LayerNorm(normalized_shape=embed_dim, eps=1e-6)
        self.dropout_1 = nn.Dropout(p=dropout)
        self.dropout_2 = nn.Dropout(p=dropout)

    def forward(self, query, key, value):
        """Apply attention then feed-forward, each with residual + layer norm.

        The residual connection of the attention sub-layer is taken from
        ``query``.
        """
        # The attention result was previously bound to ``att_output`` while the
        # next line read ``attn_output``, which raised UnboundLocalError.
        attn_output, _ = self.attn(query, key, value)
        attn_output = self.dropout_1(attn_output)
        out_1 = self.layernorm_1(query + attn_output)

        ffn_output = self.ffn(out_1)
        ffn_output = self.dropout_2(ffn_output)
        out_2 = self.layernorm_2(out_1 + ffn_output)
        return out_2

class PactchPositionEmbedding(nn.Module):
    """Split an image into patches, project them and add position embeddings.

    A strided convolution (kernel size == stride == ``patch_size``) turns each
    non-overlapping patch of a 3-channel image into one ``embed_dim`` vector;
    a learned positional embedding with ``(image_size // patch_size) ** 2``
    rows is then added.

    Args:
        image_size: side length of the (square) input images.
        embed_dim: size of each patch embedding.
        patch_size: side length of each square patch.
        device: kept for backward compatibility and stored on ``self.device``;
            ``forward`` no longer relies on it.
    """

    def __init__(self, image_size=224, embed_dim=512, patch_size=16, device='cpu'):
        super().__init__()
        self.conv1 = nn.Conv2d(
                            in_channels=3,
                            out_channels=embed_dim,
                            kernel_size=patch_size,
                            stride=patch_size,
                            bias=False
                            )
        scale = embed_dim ** -0.5
        self.positional_embedding = nn.Parameter(scale * torch.randn((image_size // patch_size) ** 2, embed_dim))
        self.device = device

    def forward(self, x):
        """Return patch embeddings of shape ``(N, num_patches, embed_dim)``."""
        x = self.conv1(x)                  # (N, embed_dim, rows, cols)
        x = x.flatten(2).transpose(1, 2)   # (N, rows * cols, embed_dim)

        # ``positional_embedding`` is a registered parameter, so it already
        # lives wherever the module was moved; re-sending it to the
        # constructor-time ``device`` on every call could put it on a
        # different device than ``x``.
        return x + self.positional_embedding

class VisionTransformerCls(nn.Module):
    """Small Vision-Transformer-style image classifier.

    Pipeline: patch + position embedding -> one encoder layer -> take the
    first token -> two-layer classification head.

    Args:
        image_size: side length of the (square) input images.
        embed_dim: model width.
        num_heads: attention heads in the encoder layer.
        ff_dim: hidden width of the encoder's feed-forward network.
        dropout: dropout probability (encoder and classification head).
        device: forwarded to the patch embedding layer.
        num_classes: number of output classes.
        patch_size: side length of each square patch.
    """

    def __init__(self,
                 image_size,
                 embed_dim,
                 num_heads,
                 ff_dim,
                 dropout=0.1,
                 device='cpu',
                 num_classes = 10,
                 patch_size = 16):
        super().__init__()
        self.embd_layer = PactchPositionEmbedding(image_size=image_size,
                                                  embed_dim=embed_dim,
                                                  patch_size=patch_size,
                                                  device=device
                                                )
        self.transformer_layer = TransformerEncoder(embed_dim, num_heads, ff_dim, dropout)

        self.fc1 = nn.Linear(in_features=embed_dim, out_features=20)
        self.fc2 = nn.Linear(in_features=20, out_features=num_classes)
        # ``forward`` reads ``self.dropout``; the layer used to be stored only
        # under the misspelled name ``drouout``, which made ``forward`` raise
        # AttributeError. The old name is kept as an alias.
        self.dropout = nn.Dropout(p=dropout)
        self.drouout = self.dropout
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return class logits of shape ``(N, num_classes)`` for images ``x``."""
        output = self.embd_layer(x)
        output = self.transformer_layer(output, output, output)
        # Use the first patch token as the image representation.
        output = output[:, 0, :]
        output = self.dropout(output)
        # ``self.relu`` was created but never applied; without a non-linearity
        # here fc1 followed by fc2 collapses into a single linear map.
        output = self.relu(self.fc1(output))
        output = self.dropout(output)
        output = self.fc2(output)
        return output

# 4.2. Training
image_size = 224
embed_dim = 512
num_heads = 4
ff_dim = 128
dropout = 0.1

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Keyword arguments are required here: the constructor's positional order is
# (..., dropout, device, num_classes, patch_size), so passing
# ``dropout, num_classes, device`` positionally swapped the class count and
# the device.
model = VisionTransformerCls(image_size=image_size,
                            embed_dim=embed_dim,
                            num_heads=num_heads,
                            ff_dim=ff_dim,
                            dropout=dropout,
                            device=device,
                            num_classes=num_classes)
model.to(device)

criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0005, weight_decay=0.0001)

num_epochs = 100
save_model = './model'
# The checkpoint directory must exist before train() writes into it.
os.makedirs(save_model, exist_ok=True)
model_name = 'vit_flowers'

model, metrics = train(
                        model, model_name, save_model, optimizer, criterion, train_loader,
                        valid_loader, num_epochs, device)

In [None]:
# 5. Fine Tuning
# 5.1. Modeling
from transformers import ViTForImageClassification # type: ignore

# Two-way mapping between class indices and class names for the model config.
id2label = dict(enumerate(classes))
label2id = {name: index for index, name in id2label.items()}

# Load the pretrained checkpoint with a classification head sized to our classes.
model = ViTForImageClassification.from_pretrained(
    'google/vit-base-patch16-224-in21k',
    num_labels=num_classes,
    id2label=id2label,
    label2id=label2id,
)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# 5.2. Metric
import evaluate # type: ignore
import numpy as np # type: ignore

# The loader function is ``evaluate.load`` (lower-case); ``evaluate.Load``
# does not exist and raised AttributeError.
metric = evaluate.load('accuracy')

def compute_metrics(eval_pred):
    """Compute accuracy from a ``(logits, labels)`` pair.

    Args:
        eval_pred: pair ``(logits, labels)``; the predicted class is the
            argmax of ``logits`` over the last axis.

    Returns:
        Whatever ``metric.compute`` returns for these predictions and labels.
    """
    logits, labels = eval_pred
    predictions = np.argmax(logits, axis=-1)
    return metric.compute(predictions=predictions, references=labels)

# 5.3. Trainer

import torch # type: ignore
from transformers import ViTImageProcessor, TrainingArguments, Trainer # type: ignore

# Image processor matching the pretrained checkpoint; handed to the Trainer below.
feature_extractor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224-in21k')
metric_name = 'accuracy'

# Evaluate and checkpoint once per epoch, and reload the best checkpoint
# (by ``metric_name``) when training finishes.
# NOTE(review): some of these argument names may differ across transformers
# releases - confirm against the installed version.
args = TrainingArguments(
    output_dir="vit_flowers",
    save_strategy='epoch',
    evaluation_strategy='epoch',
    learning_rate=2e-5,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    num_train_epochs=10,
    weight_decay=0.01,
    load_best_model_at_end=True,
    metric_for_best_model=metric_name,
    logging_dir='logs',
    remove_unused_columns=False,
)

def collate_fn(examples):
    """Batch a list of ``(image, label)`` examples.

    Returns:
        A dict with the stacked images under ``'pixel_values'`` and the labels
        as a tensor under ``'labels'``.
    """
    images = []
    targets = []
    for example in examples:
        images.append(example[0])
        targets.append(example[1])

    batch = {}
    batch['pixel_values'] = torch.stack(images)
    batch['labels'] = torch.tensor(targets)
    return batch

# Wire model, arguments, data splits, batching and metric into the Trainer.
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    data_collator=collate_fn,
    compute_metrics=compute_metrics,
    tokenizer=feature_extractor,
)

# 5.4. Training
trainer.train()

# Score the held-out test split with the trained model.
outputs = trainer.predict(test_dataset)
outputs.metrics