In [1]:
import torch
import torch.nn as nn
import pandas as pd
from transformers import BertTokenizer, BertModel, Trainer, TrainingArguments, XLNetTokenizer, XLNetModel
from torch.utils.data import Dataset, DataLoader, Subset
from sklearn.metrics import *
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import seaborn as sns




In [None]:
class XLNetRCNN(nn.Module):
    def __init__(self, xlnet_model_name='xlnet/xlnet-base-cased', device='cpu'):
        super(XLNetRCNN, self).__init__()
        self.xlnet_model = XLNetModel.from_pretrained(xlnet_model_name)
        self.lstm = nn.LSTM(
            input_size=768,
            hidden_size=256,
            num_layers=3,
            bias=True,
            batch_first=True,
            dropout=0.2,
            bidirectional=False
        )
        self.conv1d = nn.Conv1d(
            in_channels=256,
            out_channels=128,
            kernel_size=3,
            stride=1,
            padding=1
        )
        self.fc = nn.Linear(in_features=128, out_features=3)
        self.device = device

    def forward(self, input_ids, attention_mask=None, labels=None):
        outputs = self.xlnet_model(input_ids=input_ids, attention_mask=attention_mask)
        x = outputs.last_hidden_state
        x, _ = self.lstm(x)
        x = x.permute(0, 2, 1)
        x = self.conv1d(x)
        x = x.mean(dim=2)
        logits = self.fc(x)

        # Nếu có nhãn (labels), tính toán và trả về loss
        loss = None
        if labels is not None:
            loss_fct = nn.CrossEntropyLoss()
            loss = loss_fct(logits, labels)

        return (loss, logits) if loss is not None else logits

In [None]:
class CustomDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = str(self.texts[idx])
        label = self.labels[idx]

        encoding = self.tokenizer.encode_plus(
            text,
            add_special_tokens=True,
            max_length=self.max_length,
            padding='max_length',
            truncation=True,
            return_tensors='pt'
        )

        return {
            'input_ids': encoding['input_ids'].flatten(),
            'attention_mask': encoding['attention_mask'].flatten(),
            'labels': torch.tensor(label, dtype=torch.long)
        }

In [None]:
def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    accuracy = accuracy_score(labels, preds)
    precision = precision_score(labels, preds, average='weighted')
    recall = recall_score(labels, preds, average='weighted')
    f1 = f1_score(labels, preds, average='weighted')
    return {
        'accuracy': accuracy,
        'precision': precision,
        'recall': recall,
        'f1': f1
    }

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = XLNetRCNN(device=device)
tokenizer = XLNetTokenizer.from_pretrained('xlnet/xlnet-base-cased')
max_length = 512
batch_size = 32

In [None]:
# Dataset
with open('/kaggle/input/semeval/train_text.txt', 'r') as f:
  data = f.readlines()
  train_texts = []
  for line in data:
    train_texts.append(line.strip())

with open('/kaggle/input/semeval/train_labels.txt', 'r') as f:
  data = f.readlines()
  train_labels = []
  for line in data:
    train_labels.append(int(line.strip()))

with open('/kaggle/input/semeval/val_text.txt', 'r') as f:
  data = f.readlines()
  val_texts = []
  for line in data:
    val_texts.append(line.strip())

with open('/kaggle/input/semeval/val_labels.txt', 'r') as f:
  data = f.readlines()
  val_labels = []
  for line in data:
    val_labels.append(int(line.strip()))

with open('/kaggle/input/semeval/test_text.txt', 'r') as f:
  data = f.readlines()
  test_texts = []
  for line in data:
    test_texts.append(line.strip())

with open('/kaggle/input/semeval/test_labels.txt', 'r') as f:
  data = f.readlines()
  test_labels = []
  for line in data:
    test_labels.append(int(line.strip()))

In [None]:
# preprocessing text
# remove emoji
import re

def remove_emojis(text):
    emoji_pattern = re.compile(
        "["
        u"\U0001F600-\U0001F64F"  # emoticons
        u"\U0001F300-\U0001F5FF"  # symbols & pictographs
        u"\U0001F680-\U0001F6FF"  # transport & map symbols
        u"\U0001F700-\U0001F77F"  # alchemical symbols
        u"\U0001F780-\U0001F7FF"  # Geometric Shapes Extended
        u"\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
        u"\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
        u"\U0001FA00-\U0001FA6F"  # Chess Symbols
        u"\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
        u"\U00002702-\U000027B0"  # Dingbats
        u"\U000024C2-\U0001F251" 
        "]+", flags=re.UNICODE)
    
    return emoji_pattern.sub(r'', text)

# lower case
def lower_case(text:str):
    return text.lower()

# remove @user
def remove_mentions(text):
    return re.sub(r'@\w+', '', text)    

def remove_tag(text: str):
    return re.sub(r'#\w+', '', text)

def remove_urls(text):
    url_pattern = re.compile(r'http\S+|www\S+|https\S+')
    return url_pattern.sub(r'', text)

from bs4 import BeautifulSoup

def remove_html_tags(text):
    soup = BeautifulSoup(text, "html.parser")
    return soup.get_text()

def clean_text(data_text: list):
  clean = []
  for text in data_text:
    text = remove_emojis(text)
    text = lower_case(text)
    text = remove_mentions(text)
    text = remove_tag(text)
    text = remove_urls(text)
    text = remove_html_tags(text)

    clean.append(text)
  return clean

In [None]:
max_length = 512

train_clean = clean_text(train_texts)
val_clean = clean_text(val_texts)
test_clean = clean_text(test_texts)

train_dataset = CustomDataset(train_clean, train_labels, tokenizer, max_length=max_length)
val_dataset = CustomDataset(val_clean, val_labels, tokenizer, max_length=max_length)
test_dataset = CustomDataset(test_clean, test_labels, tokenizer, max_length=max_length)

In [None]:
training_args = TrainingArguments(
    output_dir='./results',
    eval_strategy='steps',
    save_strategy='steps',
    save_steps=500,
    save_total_limit=3,
    logging_dir='./logs',
    logging_steps=100,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    num_train_epochs=3,
    load_best_model_at_end=True,
    metric_for_best_model='accuracy',
    greater_is_better=True
)

# Khởi tạo Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    compute_metrics=compute_metrics
)

# Huấn luyện mô hình
trainer.train()

In [None]:
predictions = trainer.predict(test_dataset)
preds = predictions.predictions.argmax(-1)
labels = predictions.label_ids

In [None]:
trainer.save_model('./best_model')