![](https://storage.googleapis.com/mle-courses-prod/users/61b6fa1ba83a7e37c8309756/private-files/0c1001d0-6458-11ef-9b72-9db6eacc12d1-RAGGraph___Neo4J___CamelAI__1_.png)

[ROFORMER: ENHANCED TRANSFORMER WITH ROTARY
POSITION EMBEDDING](https://arxiv.org/pdf/2104.09864v5)

RoFormer đưa thông tin vị trí trực tiếp vào **vector q** và **vector k**, thay vì phải tạo riêng một lớp positional embedding.

RoFormer xoay **vector q, k** một góc tỉ lệ với vị trí $m$ của token (góc $m\theta_i$ cho cặp chiều thứ $i$), nhờ đó tích vô hướng giữa $q_m$ và $k_n$ chỉ phụ thuộc vào vị trí tương đối $m - n$.

- Ví dụ: khi vị trí trong câu tăng từ m=1 lên m=2 và từ m=2 lên m=3, tại cùng một chỉ số chiều embedding (ví dụ i = 0), vector q đều quay thêm một góc giống nhau là $\theta_i$. Tương tự, vector k cũng quay thêm một góc giống nhau.


![](https://storage.googleapis.com/mle-courses-prod/users/61b6fa1ba83a7e37c8309756/private-files/7313bb20-6457-11ef-b0a7-998b84b38d43-Screen_Shot_2024_08_27_at_16.33.26.png)

1. **First Equation:**

   $$
   \{q, k\}_m = R_{\Theta, m} \{q, k\}
   $$

2. **Second Equation:**

   $$
   \{q, k\} = W_{\{q, k\}} x_m
   $$

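3. **Third Equation (Rotation Matrix — the 2×2 block acting on the $i$-th pair of dimensions; the full $R_{\Theta, m}$ is block-diagonal over $i = 0, \dots, d/2 - 1$):**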

   $$
   R_{\Theta, m} = \begin{pmatrix}
   \cos(m \theta_i) & -\sin(m \theta_i) \\
   \sin(m \theta_i) & \cos(m \theta_i)
   \end{pmatrix}
   $$

4. **Fourth Equation (Theta Calculation):**

   $$
   \theta_i = 10000^{-2i/d}
   $$


In [None]:
!pip install tiktoken

In [None]:
!gdown 1WTjIveEsM7XpN28xm6F1qgX57QxaoFI_

!gdown 1WbyeG8f-V7VmpKdQam-0tNg4x6XDWoML

In [None]:
!gdown 1isX7s6hChuJ4GHkVwP80C9DZtbfv1FXl

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import tiktoken
import pandas as pd
from sklearn.model_selection import train_test_split

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
# `device` is a notebook-wide global read by the dataset, model and inference cells.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
import pandas as pd

# Read the training CSV file into a DataFrame
train_df = pd.read_csv('/kaggle/working/train_set.csv')

# Display the first few rows of the training DataFrame
print("Training Set:")
train_df.head()


In [None]:
# Rotary Embedding and apply_rotary_pos_emb function
class RotaryEmbedding(nn.Module):
    """Table of rotary position angles ``m * theta_i``.

    ``theta_i = base ** (-2i / dim)`` is stored in the ``inv_freq`` buffer
    (one value per pair of feature dimensions, i.e. ``dim // 2`` values for
    an even ``dim``).

    Note that ``forward`` returns the raw angles, not their sine/cosine.

    Args:
        dim: Size of the feature dimension the angles are generated for.
        base: Base of the geometric frequency progression.
    """

    def __init__(self, dim, base=10000):
        super().__init__()
        exponents = torch.arange(0, dim, 2).float() / dim
        # A buffer (not a parameter): follows .to(device) but is never trained.
        self.register_buffer('inv_freq', 1.0 / (base ** exponents))

    def forward(self, seq_len):
        """Return the angle table for positions ``0 .. seq_len - 1``.

        Args:
            seq_len: Number of positions to generate.

        Returns:
            Tensor of shape ``[1, seq_len, 2 * len(inv_freq)]`` whose last
            axis holds the per-pair angles twice in a row
            (``[a_0 .. a_{h-1}, a_0 .. a_{h-1}]``).
        """
        positions = torch.arange(seq_len, device=self.inv_freq.device).type_as(self.inv_freq)
        # Outer product position x frequency -> [seq_len, dim // 2]
        angles = positions[:, None] * self.inv_freq[None, :]
        # Tile the angles twice along the feature axis, then add a leading
        # broadcast axis.
        return angles.repeat(1, 2).unsqueeze(0)

def apply_rotary_pos_emb(q, k, sinusoidal_pos):
    """Rotate query and key vectors by their position-dependent angles (RoPE).

    Each pair of features ``(x[2i], x[2i+1])`` at position ``m`` is rotated by
    the angle ``m * theta_i``. A true rotation preserves vector norms and makes
    the dot product ``q_m . k_n`` depend only on the offset ``m - n``.

    Args:
        q: Query tensor whose last two axes are ``(seq_len, dim)``.
        k: Key tensor with the same layout as ``q``.
        sinusoidal_pos: Table of raw rotation angles of shape
            ``[1, seq_len, dim]`` in which the ``dim // 2`` per-pair angles
            are repeated twice along the last axis (the layout returned by
            ``RotaryEmbedding.forward``).

    Returns:
        Tuple ``(q_rot, k_rot)`` with the same shapes as ``q`` and ``k``. The
        last axis is laid out as ``[rotated even features, rotated odd
        features]``; since ``q`` and ``k`` share that layout, their dot
        products are unaffected by the reordering.
    """
    # The table stores every angle twice; the first half has one angle per
    # feature pair. The angles must go through cos/sin to form a rotation --
    # using the raw angles as coefficients (as before) zeroes position 0 and
    # scales vectors arbitrarily.
    half = sinusoidal_pos.size(-1) // 2
    angles = sinusoidal_pos[..., :half]
    cos, sin = angles.cos(), angles.sin()

    def _rotate(x):
        # 2-D rotation of each (even, odd) feature pair.
        x_even, x_odd = x[..., 0::2], x[..., 1::2]
        return torch.cat(
            [x_even * cos - x_odd * sin, x_even * sin + x_odd * cos],
            dim=-1,
        )

    return _rotate(q), _rotate(k)

In [None]:
class TextDataset(Dataset):
    """Dataset of lower-cased titles and their numeric labels.

    Args:
        dataframe: DataFrame providing a ``title`` column (strings) and a
            ``label_numeric`` column.
        tokenizer: Object exposing ``encode(text)``; the result is converted
            to a ``torch.long`` tensor.
        device: Device on which the returned tensors are placed.
    """

    def __init__(self, dataframe, tokenizer, device):
        self.tokenizer = tokenizer
        self.device = device
        # Lower-case once up front instead of on every item access.
        self.titles = dataframe['title'].str.lower().values
        self.labels = dataframe['label_numeric'].values

    def __len__(self):
        return len(self.titles)

    def __getitem__(self, idx):
        """Return ``(input_ids, label)`` for row ``idx`` as long tensors on ``self.device``."""
        token_ids = self.tokenizer.encode(self.titles[idx])
        input_ids = torch.tensor(token_ids, dtype=torch.long, device=self.device)
        label = torch.tensor(self.labels[idx], dtype=torch.long, device=self.device)
        return input_ids, label

In [None]:
# Collate function that pads variable-length sequences into one batch tensor
def collate_fn(batch, pad_token_id=0):
    """Pad a list of ``(input_ids, label)`` pairs into batch tensors.

    Args:
        batch: Non-empty list of ``(input_ids, label)`` tuples, where
            ``input_ids`` is a 1-D tensor and ``label`` is a tensor. All
            tensors are expected to live on the same device; the outputs stay
            on that device.
        pad_token_id: Value used to right-pad shorter sequences. Defaults to
            ``0`` (the previous hard-coded value).
            NOTE(review): 0 may be a real token id in some vocabularies --
            pass the tokenizer's pad id when that matters.

    Returns:
        Tuple ``(input_ids, labels)``: ``input_ids`` has shape
        ``[batch, max_len]`` and ``labels`` is the stacked label tensor.

    Raises:
        ValueError: If ``batch`` is empty.
    """
    # Local import keeps this cell self-contained.
    from torch.nn.utils.rnn import pad_sequence

    if not batch:
        raise ValueError("collate_fn received an empty batch")

    sequences, labels = zip(*batch)
    input_ids = pad_sequence(
        list(sequences), batch_first=True, padding_value=pad_token_id
    )
    return input_ids, torch.stack(labels)

In [None]:
# Transformer Encoder Layer with Rotary Position Embedding
class TransformerEncoderLayer(nn.Module):
    """Post-norm Transformer encoder layer with rotary position embedding.

    Multi-head self-attention (queries and keys are passed through
    ``apply_rotary_pos_emb`` before the dot product) is followed by a
    position-wise feed-forward network. Each sub-layer output goes through
    dropout, a residual connection and layer normalization.

    Args:
        d_model: Feature size of the layer's input and output.
        num_heads: Number of attention heads; must divide ``d_model``.
        d_ff: Hidden size of the feed-forward network.
        dropout: Dropout probability applied to each sub-layer output.

    Raises:
        ValueError: If ``d_model`` is not divisible by ``num_heads``, or the
            per-head size is odd (rotary embedding rotates feature pairs).
    """

    def __init__(self, d_model, num_heads, d_ff, dropout=0.1):
        super(TransformerEncoderLayer, self).__init__()
        if d_model % num_heads != 0:
            raise ValueError(
                f"d_model ({d_model}) must be divisible by num_heads ({num_heads})"
            )
        self.num_heads = num_heads
        self.d_model = d_model
        self.depth = d_model // num_heads
        if self.depth % 2 != 0:
            raise ValueError(
                f"per-head size ({self.depth}) must be even for rotary embedding"
            )

        self.rotary_emb = RotaryEmbedding(self.depth)

        # Linear layers for Q, K, V matrices
        self.wq = nn.Linear(d_model, d_model)
        self.wk = nn.Linear(d_model, d_model)
        self.wv = nn.Linear(d_model, d_model)

        # Output linear transformation
        self.dense = nn.Linear(d_model, d_model)

        # Feed-forward network
        self.feed_forward = nn.Sequential(
            nn.Linear(d_model, d_ff),
            nn.ReLU(),
            nn.Linear(d_ff, d_model)
        )

        # Layer normalization and dropout
        self.layernorm1 = nn.LayerNorm(d_model)
        self.layernorm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def split_heads(self, x, batch_size):
        """Reshape ``(batch, seq_len, d_model)`` to ``(batch, num_heads, seq_len, depth)``."""
        x = x.view(batch_size, -1, self.num_heads, self.depth)
        return x.transpose(1, 2)

    def scaled_dot_product_attention(self, q, k, v, mask=None):
        """Compute softmax(q k^T / sqrt(depth)) v.

        Args:
            q, k, v: Tensors of shape ``(batch, num_heads, seq_len, depth)``.
            mask: Optional tensor broadcastable to the attention logits
                ``(batch, num_heads, seq_len, seq_len)``; positions where
                ``mask == 0`` are excluded from attention.

        Returns:
            Tuple ``(output, attention_weights)``.
        """
        # A plain Python scalar avoids allocating a tensor on every call.
        scale = k.size(-1) ** 0.5
        scaled_attention_logits = torch.matmul(q, k.transpose(-2, -1)) / scale

        if mask is not None:
            # Use the dtype's own minimum so the fill value stays
            # representable in reduced precision (a literal -1e9 does not fit
            # in float16).
            fill_value = torch.finfo(scaled_attention_logits.dtype).min
            scaled_attention_logits = scaled_attention_logits.masked_fill(
                mask == 0, fill_value
            )

        attention_weights = torch.nn.functional.softmax(scaled_attention_logits, dim=-1)
        output = torch.matmul(attention_weights, v)

        return output, attention_weights

    def forward(self, x, mask=None):
        """Run the encoder layer.

        Args:
            x: Input of shape ``(batch, seq_len, d_model)``.
            mask: Optional attention mask, see
                ``scaled_dot_product_attention``.

        Returns:
            Tensor of shape ``(batch, seq_len, d_model)``.
        """
        batch_size = x.size(0)
        seq_len = x.size(1)

        # Apply linear layers and split into heads
        q = self.split_heads(self.wq(x), batch_size)
        k = self.split_heads(self.wk(x), batch_size)
        v = self.split_heads(self.wv(x), batch_size)

        # Rotary position embedding is applied to q and k only; v is left as is
        sinusoidal_pos = self.rotary_emb(seq_len)
        q_rot, k_rot = apply_rotary_pos_emb(q, k, sinusoidal_pos)

        # Apply the custom scaled dot-product attention
        scaled_attention, _ = self.scaled_dot_product_attention(q_rot, k_rot, v, mask)

        # Transpose and reshape back to (batch_size, seq_len, d_model)
        scaled_attention = scaled_attention.transpose(1, 2).contiguous()
        concat_attention = scaled_attention.view(batch_size, -1, self.d_model)

        # Apply the final linear layer to combine the heads
        attn_output = self.dense(concat_attention)

        # Add & Norm
        x = self.layernorm1(x + self.dropout(attn_output))

        # Feed-forward
        ff_output = self.feed_forward(x)

        # Add & Norm
        x = self.layernorm2(x + self.dropout(ff_output))

        return x

In [None]:
class TransformerModel(nn.Module):
    """Sequence classifier: embedding -> encoder layers -> mean pool -> linear.

    Args:
        vocab_size: Number of rows in the token embedding table.
        embed_size: Size of each token embedding.
        d_model: Feature size used by the encoder layers and the classifier.
        num_heads: Attention heads per encoder layer.
        d_ff: Hidden size of each encoder layer's feed-forward network.
        output_size: Number of output logits.
        num_layers: Number of stacked encoder layers.
        dropout: Dropout probability (encoder layers and before the
            classifier).
    """

    def __init__(self, vocab_size, embed_size, d_model, num_heads, d_ff, output_size, num_layers, dropout=0.1):
        super(TransformerModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embed_size)
        # The encoder layers expect d_model features. Project the embeddings
        # when the sizes differ; Identity adds no parameters, so checkpoints
        # saved with embed_size == d_model keep the same state_dict keys.
        if embed_size == d_model:
            self.input_proj = nn.Identity()
        else:
            self.input_proj = nn.Linear(embed_size, d_model)
        self.encoder_layers = nn.ModuleList([
            TransformerEncoderLayer(d_model, num_heads, d_ff, dropout)
            for _ in range(num_layers)
        ])
        self.fc = nn.Linear(d_model, output_size)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=None):
        """Return classification logits of shape ``(batch_size, output_size)``.

        Args:
            x: Token ids of shape ``(batch_size, seq_len)``.
            mask: Optional attention mask forwarded unchanged to every
                encoder layer.
        """
        x = self.embedding(x)  # (batch_size, seq_len, embed_size)
        x = self.input_proj(x)  # (batch_size, seq_len, d_model)
        for layer in self.encoder_layers:
            x = layer(x, mask)  # (batch_size, seq_len, d_model)
        # Mean over all sequence positions (the mask is not used for pooling).
        x = x.mean(dim=1)  # (batch_size, d_model)
        x = self.fc(self.dropout(x))  # (batch_size, output_size)
        return x

In [None]:
# Load the training and validation splits
train_df = pd.read_csv('/kaggle/working/train_set.csv')
validation_df = pd.read_csv('/kaggle/working/validation_set.csv')

from transformers import AutoTokenizer

# Tokenizer published with the "vinai/phobert-base" checkpoint.
# NOTE(review): tiktoken is installed and imported earlier in the notebook,
# but this Hugging Face tokenizer is the one passed to the datasets and to
# inference below.
tokenizer = AutoTokenizer.from_pretrained("vinai/phobert-base")


In [None]:
# Initialize Datasets with device: every item is tokenized on access and
# returned as tensors already placed on `device`
train_dataset = TextDataset(train_df, tokenizer, device)
val_dataset = TextDataset(validation_df, tokenizer, device)

In [None]:
# Batches are padded to the longest sequence in the batch by collate_fn.
# Only the training data is shuffled.
batch_size = 32
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [None]:
# Initialize the Transformer model
vocab_size = tokenizer.vocab_size
embed_size = 128
d_model = 128
num_heads = 8
d_ff = 256
# One output logit per distinct label value in the training set
output_size = len(train_df['label_numeric'].unique())
num_layers = 8
dropout = 0.2

model = TransformerModel(vocab_size, embed_size, d_model, num_heads, d_ff, output_size, num_layers, dropout)
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss().to(device)
# The previous lr=0.5 is several hundred times larger than AdamW's default
# (1e-3) and makes training diverge; use a conventional small step size.
learning_rate = 1e-4
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

In [None]:
# State for checkpointing the best model during training.
# Kept in its own cell, so re-running only the training cell does not reset it.
best_val_accuracy = 0.0  # Best validation accuracy (in percent) seen so far
best_model_path = 'best_model.pth'  # File path to save the best model

In [None]:
# Train for a fixed number of epochs; after every epoch evaluate on the
# validation set and keep the weights with the best validation accuracy.
num_epochs = 150

for epoch in range(num_epochs):
    model.train()  # enable dropout
    train_loss = 0.0
    train_correct = 0
    train_total = 0

    for input_ids, labels in train_dataloader:
        # No need to move tensors to device here as we already did it in TextDataset and collate_fn
        # NOTE(review): no attention mask is passed, so padded positions take
        # part in attention and in the model's mean pooling.
        optimizer.zero_grad()
        outputs = model(input_ids)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Calculate training accuracy (predicted class = index of the largest logit)
        _, predicted = torch.max(outputs, 1)
        train_total += labels.size(0)
        train_correct += (predicted == labels).sum().item()
        train_loss += loss.item()

    # Calculate training metrics (loss is averaged over batches, accuracy over samples)
    avg_train_loss = train_loss / len(train_dataloader)
    train_accuracy = 100 * train_correct / train_total
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {avg_train_loss:.4f}, Training Accuracy: {train_accuracy:.2f}%")

    # Validation step
    model.eval()  # disable dropout
    val_correct = 0
    val_total = 0
    with torch.no_grad():
        for input_ids, labels in val_dataloader:
            # No need to move tensors to device here
            outputs = model(input_ids)
            _, predicted = torch.max(outputs, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    val_accuracy = 100 * val_correct / val_total
    print(f"Validation Accuracy after Epoch {epoch+1}: {val_accuracy:.2f}%")

    # Save model if validation accuracy improves (strictly; ties keep the earlier checkpoint)
    if val_accuracy > best_val_accuracy:
        best_val_accuracy = val_accuracy
        torch.save(model.state_dict(), best_model_path)
    
    print()  # Add a blank line for readability

In [None]:
# Restore the best checkpoint into the existing model (same architecture).
# map_location lets a checkpoint saved on GPU load on a CPU-only machine.
# Strict loading (the default) raises on missing or unexpected keys instead
# of silently leaving some weights at their current values.
model.load_state_dict(torch.load('best_model.pth', map_location=device))
model.eval()  # Set to evaluation mode if using for inference

In [None]:
import torch
import pandas as pd

def predict_sentence(model, tokenizer, sentence):
    """Predict the label of a single sentence with a trained model.

    The sentence is lower-cased, encoded with ``tokenizer.encode`` and fed to
    the model as a batch of size one.

    Parameters:
    - model: Trained classification model; called with a ``(1, seq_len)``
      tensor of token ids and expected to return ``(1, num_classes)`` logits.
    - tokenizer: Tokenizer exposing ``encode(text)``, the same one used for
      training.
    - sentence: Sentence to classify (text string).

    Returns:
    - predicted_label: Predicted label for the sentence (integer).
    """
    model.eval()

    # Run on whatever device the model lives on instead of relying on a
    # notebook-level global; fall back to CPU for a parameter-free model.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    token_ids = tokenizer.encode(sentence.lower())
    input_ids = torch.tensor(token_ids, dtype=torch.long, device=target_device).unsqueeze(0)

    with torch.no_grad():
        output = model(input_ids)

    # Index of the largest logit is the predicted class.
    return output.argmax(dim=1).item()

# Read the test set from the CSV file
test_df = pd.read_csv('/kaggle/working/test_set_public.csv')

# Predict a label for every title in the test set (one forward pass per row)
test_df['label_numeric'] = test_df['title'].apply(lambda x: predict_sentence(model, tokenizer, x))

# Rename the '_id' column to 'id' and drop the raw titles before export
test_df.rename(columns={'_id': 'id'}, inplace=True)

test_df = test_df.drop('title', axis=1)

# Save the results to a CSV file
test_df.to_csv('your_submissions.csv', index=False)


In [None]:
# Preview the first rows of the submission table
test_df.head()