In [None]:
# General Libraries
#! pip install pandas numpy keras nltk spacy tensorflow torch


In [None]:
#! pip install scikit-learn

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
import os
from sklearn.metrics import f1_score, recall_score, accuracy_score
import numpy as np
import torch
from tqdm import tqdm

In [9]:
import tensorflow as tf

tf.keras.backend.clear_session()

# clear gpu memory using torch
import torch
torch.cuda.empty_cache()

# clear output
from IPython.display import clear_output
clear_output()

In [10]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#device = torch.device("cpu")
device

device(type='cuda')

In [15]:
test_path = ("/content/drive/MyDrive/ViHOS/data/Sequence_labeling_based_version/Word/train_BIO_Word.csv")

In [11]:
from transformers import (
    AutoModel, AutoConfig, XLMRobertaModel,
    AutoTokenizer, AutoModelForSequenceClassification
)

input_model = XLMRobertaModel.from_pretrained("xlm-roberta-base")
tokenizer = AutoTokenizer.from_pretrained("xlm-roberta-base")
input_model.resize_token_embeddings(len(tokenizer))

clear_output()

# Data

In [2]:
import pandas as pd
import transformers
import torch
import torch.nn as nn
import pandas as pd

#clear output
from IPython.display import clear_output
clear_output()

In [3]:
def prepare_data(file_path):
    df = pd.read_csv(file_path)

    # remove nan
    df = df.dropna()
    df = df.reset_index(drop=True)

    texts = df['Word'].tolist()
    spans = df['Tag'].tolist()

    # convert spans to binary representation
    binary_spans = []
    for span in spans:
        binary_span = []
        span = span.split(' ')
        for s in span:
            if s == 'O':
                binary_span.append(0)
            else:
                binary_span.append(1)
        binary_spans.append(binary_span)

    return texts, binary_spans

In [4]:
# Dataloader function
class TextDataset(torch.utils.data.Dataset):
    def __init__(self, texts, spans, tokenizer, max_len):
        self.texts = texts
        self.spans = spans
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        span = self.spans[idx]

        # Tokenize and prepare the input data
        encoding = self.tokenizer(text, truncation=True, padding='max_length', max_length=self.max_len, return_tensors='pt')

        # Convert tensors to appropriate shape
        input_ids = encoding['input_ids'].squeeze(0)
        attention_mask = encoding['attention_mask'].squeeze(0)

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'spans': torch.tensor(span)  # Ensure spans is converted to tensor
        }

def create_dataloader(texts, spans, batch_size, tokenizer, max_len, shuffle=True):
    dataset = TextDataset(texts, spans, tokenizer, max_len)
    # return texts
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)
    print(dataloader)
    return dataloader

In [16]:
batch_size = 32
test_dataloader = create_dataloader(*prepare_data(test_path), batch_size=batch_size, tokenizer = tokenizer, max_len=64)

<torch.utils.data.dataloader.DataLoader object at 0x78c485ece320>
<torch.utils.data.dataloader.DataLoader object at 0x78c57064d540>
<torch.utils.data.dataloader.DataLoader object at 0x78c57064c3a0>


In [None]:
def calculate_f1(preds, y):
    max_preds = preds.argmax(dim = 1, keepdim = True)
    return f1_score(y.cpu(), max_preds.cpu(), average='macro')

In [5]:
class MultiTaskModel(nn.Module):
    def __init__(self, input_model):
        super(MultiTaskModel, self).__init__()
        self.bert = input_model
        self.span_classifier = nn.Linear(768, 1)  
        self.dropout = nn.Dropout(0.1)

    def forward(self, input_ids, attention_mask):
        output = self.bert(input_ids=input_ids, attention_mask=attention_mask, return_dict=False)
        last_hidden_state = output[0]
        last_hidden_state = self.dropout(last_hidden_state)
        span_logits = self.span_classifier(last_hidden_state)

        span_logits = span_logits.mean(dim=1)  # Tính trung bình theo chiều 1 để giảm kích thước thành [batch_size, 1]
        span_logits = span_logits.unsqueeze(-1)  # Thêm chiều cuối cùng để có kích thước [batch_size, 1, 1]
        span_logits = torch.sigmoid(span_logits)

        return span_logits


# Load and test

In [12]:
# Load the model
device = torch.device("cpu")

model = MultiTaskModel(input_model=input_model)  # Reinitialize your model architecture
model.load_state_dict(torch.load("/content/drive/MyDrive/ViHOS/data/ViHos_40epoch.pth", map_location=torch.device('cpu')))
model.to(device)  # Move the model to the appropriate device (GPU or CPU)
model.eval()  # Set the model to evaluation mode


MultiTaskModel(
  (bert): XLMRobertaModel(
    (embeddings): XLMRobertaEmbeddings(
      (word_embeddings): Embedding(250002, 768, padding_idx=1)
      (position_embeddings): Embedding(514, 768, padding_idx=1)
      (token_type_embeddings): Embedding(1, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): XLMRobertaEncoder(
      (layer): ModuleList(
        (0-11): 12 x XLMRobertaLayer(
          (attention): XLMRobertaAttention(
            (self): XLMRobertaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): XLMRobertaSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
            

In [13]:
def test(model, test_dataloader, device):
    model.eval()  # Set the model to evaluation mode
    span_preds = []
    span_targets = []

    for batch in tqdm(test_dataloader):
        # Extract components from the batch
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        spans = batch['spans'].float().to(device)  # Ensure spans are in float for loss computation

        with torch.no_grad():  # Disable gradient computation for inference
            span_logits = model(input_ids, attention_mask)  # Get model predictions

        # Append predictions and targets for later evaluation
        span_preds.append(span_logits.squeeze().cpu().numpy().flatten())
        span_targets.append(spans.cpu().numpy().flatten())

    # Concatenate all predictions and targets
    span_preds = np.concatenate(span_preds)
    span_targets = np.concatenate(span_targets)

    # Binarize predictions based on threshold
    span_preds = (span_preds > 0.5).astype(int)

    # Calculate metrics
    span_f1 = f1_score(span_targets, span_preds, average='macro')
    span_recall = recall_score(span_targets, span_preds, average='macro')
    span_accuracy = accuracy_score(span_targets, span_preds)

    print("Span F1 Score: {:.4f}".format(span_f1))
    print("Span Recall: {:.4f}".format(span_recall))
    print("Span Accuracy: {:.4f}".format(span_accuracy))


In [17]:
def create_subset(dataloader, subset_size=100):
    subset_texts = []
    subset_spans = []

    for i, batch in enumerate(dataloader):
        if i * dataloader.batch_size >= subset_size:
            break

        # Extract data based on batch structure
        input_ids = batch['input_ids']
        attention_mask = batch['attention_mask']
        spans = batch['spans']

        subset_texts.append({
            'input_ids': input_ids,
            'attention_mask': attention_mask
        })
        subset_spans.append(spans)

    # Convert lists to tensors if needed
    subset_texts = {
        'input_ids': torch.cat([x['input_ids'] for x in subset_texts], dim=0),
        'attention_mask': torch.cat([x['attention_mask'] for x in subset_texts], dim=0)
    }
    subset_spans = torch.cat(subset_spans, dim=0)

    return subset_texts, subset_spans

# Create a subset dataloader
def create_subset_dataloader(dataloader, subset_size=1000):
    subset_texts, subset_spans = create_subset(dataloader, subset_size)

    # Define a new dataset and dataloader
    class SubsetDataset(torch.utils.data.Dataset):
        def __init__(self, texts, spans):
            self.texts = texts
            self.spans = spans

        def __len__(self):
            return len(self.spans)

        def __getitem__(self, idx):
            return {
                'input_ids': self.texts['input_ids'][idx],
                'attention_mask': self.texts['attention_mask'][idx],
                'spans': self.spans[idx]
            }

    subset_dataset = SubsetDataset(subset_texts, subset_spans)
    subset_dataloader = torch.utils.data.DataLoader(
        subset_dataset, batch_size=dataloader.batch_size, shuffle=False, num_workers=4
    )
    return subset_dataloader

# Assuming test_dataloader is defined
subset_dataloader = create_subset_dataloader(test_dataloader, subset_size=1000)




In [None]:
# Example of using the model for inference
test(model, subset_dataloader, device)


  self.pid = os.fork()
100%|██████████| 32/32 [01:54<00:00,  3.56s/it]

Span F1 Score: 0.7199
Span Recall: 0.6888
Span Accuracy: 0.8564





## Sử dụng model để dự đoán từ hoặc cụm từ xấu/nhạy cảm/phân biệt,... được nhập từ người dùng

In [31]:
sentence = "clmn"

# Step 1: Tokenize the input sentence
encoding = tokenizer(sentence, truncation=True, padding='max_length', max_length=64, return_tensors='pt')

# Step 2: Prepare input tensors
input_ids = encoding['input_ids'].to(device)
attention_mask = encoding['attention_mask'].to(device)

# Step 3: Pass the tensors through the model
model.eval()  # Set the model to evaluation mode
with torch.no_grad():
    output = model(input_ids, attention_mask)

# Step 4: Interpret the output
# Assuming the model predicts binary spans (0 or 1)
span_logits = output.squeeze().cpu().numpy()
span_predictions = (span_logits > 0.5).astype(int)  # Convert probabilities to binary predictions

# Print the results
print("Input Sentence:", sentence)
print("Predicted Spans:", span_predictions)


Input Sentence: clmn
Predicted Spans: 1
