# Решение задачи ABSA как классификации токена с несколькими лейблами

В данной задаче лейбл для токена не склеивается в один тег, а представляется в виде вектора нулей и единиц (multi-hot), где каждая позиция отвечает за определённый тег. Таким образом, лейбл может содержать несколько разных тегов, но один и тот же тег не может встречаться в нём несколько раз. Список тегов: ['O', 'B', 'I', 'E', 'S', 'POS', 'NEG', 'NEU']. Каждый тег может быть выбран не более одного раза. Это связано с особенностями функции потерь BCEWithLogitsLoss, которая работает с бинарными целевыми значениями (0 или 1) для каждого тега.

По сравнению с предыдущей задачей, помимо логики присваивания тега, я также поменяла функцию потерь и метрики оценки результатов классификации. В остальном код такой же, как и при одномерной классификации токенов.

In [1]:
#!g2.1
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings("ignore")

In [2]:
#!g2.1
from datasets import load_dataset
raw_datasets = load_dataset("alexcadillon/SemEval2014Task4", 'restaurants')
raw_datasets

Downloading builder script:   0%|          | 0.00/10.0k [00:00<?, ?B/s]

Downloading data files:   0%|          | 0/3 [00:00<?, ?it/s]

Downloading data:   0%|          | 0.00/35.7k [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/1.24M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/359k [00:00<?, ?B/s]

Extracting data files:   0%|          | 0/3 [00:00<?, ?it/s]

Generating trial split: 0 examples [00:00, ? examples/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

DatasetDict({
    trial: Dataset({
        features: ['sentenceId', 'text', 'aspectTerms', 'aspectCategories'],
        num_rows: 100
    })
    train: Dataset({
        features: ['sentenceId', 'text', 'aspectTerms', 'aspectCategories'],
        num_rows: 3041
    })
    test: Dataset({
        features: ['sentenceId', 'text', 'aspectTerms', 'aspectCategories'],
        num_rows: 800
    })
})

In [3]:
#!g2.1
sample = raw_datasets['train'][15]
sample

{'sentenceId': '3359',
 'text': 'The pizza is the best if you like thin crusted pizza.',
 'aspectTerms': [{'term': 'pizza',
   'polarity': 'positive',
   'from': '4',
   'to': '9'},
  {'term': 'thin crusted pizza',
   'polarity': 'neutral',
   'from': '34',
   'to': '52'}],
 'aspectCategories': [{'category': 'food', 'polarity': 'positive'}]}

In [4]:
#!g2.1
# Tag vocabulary: the position of a tag in this list is the index it occupies
# in the multi-hot label vector built for every token.
label_list = ['O', 'B', 'I', 'E', 'S', 'POS', 'NEG', 'NEU']

In [5]:
#!g2.1
# Two-way lookup between tag names and their positions in label_list.
id2label={i: l for i, l in enumerate(label_list)}
label2id={l: i for i, l in enumerate(label_list)}
label2id  # last expression of the cell: shows the mapping as the cell output

{'O': 0, 'B': 1, 'I': 2, 'E': 3, 'S': 4, 'POS': 5, 'NEG': 6, 'NEU': 7}

In [6]:
#!g2.1
from nltk.tokenize import word_tokenize

def preprocess_text(example):
    """Tokenize one example and tag every token with a [position, polarity] pair.

    Tag ids are indices into ``label_list``: position is 1 (B), 2 (I), 3 (E)
    or 4 (S); polarity is 5 (POS), 6 (NEG) or 7 (NEU). Tokens outside any
    aspect term get ``[0, 0]`` (O).

    Args:
        example: mapping with a ``'text'`` string and an ``'aspectTerms'``
            list of dicts that each carry ``'term'`` and ``'polarity'``.

    Returns:
        The same ``example`` object, mutated in place, with two new keys:
        ``'tokens'`` (output of ``word_tokenize`` on the text) and
        ``'ner_tag'`` (one ``[position, polarity]`` pair per token).

    Notes:
        Aspect words are matched to sentence tokens by exact string equality,
        taking the first still-unmatched aspect word; the ``'from'``/``'to'``
        character offsets are not used. Any polarity other than ``'positive'``
        or ``'negative'`` is mapped to NEU.
    """
    polarity_ids = {'positive': 5, 'negative': 6}
    neutral_id = 7

    # Aspect words still waiting to be matched, with their tags (parallel lists).
    ner_tag = []
    tokens = []
    for aspect in example['aspectTerms']:
        # Take the polarity from the aspect entry itself. Looking it up through
        # the position of the term text returns the first occurrence's polarity
        # whenever the same term appears twice with different sentiment.
        polarity_id = polarity_ids.get(aspect['polarity'], neutral_id)
        words = aspect['term'].split(' ')
        last = len(words) - 1
        for position, word in enumerate(words):
            if last == 0:
                boundary_id = 4  # S: single-word term
            elif position == 0:
                boundary_id = 1  # B: first word
            elif position == last:
                boundary_id = 3  # E: last word
            else:
                boundary_id = 2  # I: inner word
            ner_tag.append([boundary_id, polarity_id])
            tokens.append(word)

    ner_tag_list_fin = []
    tokens_fin = []
    for token in word_tokenize(example['text']):
        if token in tokens:
            # Consume the matched aspect word so that a later identical token
            # picks up the next pending tag (or O when none is left).
            match = tokens.index(token)
            ner_tag_list_fin.append(ner_tag.pop(match))
            tokens.pop(match)
        else:
            ner_tag_list_fin.append([0, 0])
        tokens_fin.append(token)

    example['ner_tag'] = ner_tag_list_fin
    example['tokens'] = tokens_fin

    return example

In [7]:
#!g2.1
preprocess_text(sample)

{'sentenceId': '3359',
 'text': 'The pizza is the best if you like thin crusted pizza.',
 'aspectTerms': [{'term': 'pizza',
   'polarity': 'positive',
   'from': '4',
   'to': '9'},
  {'term': 'thin crusted pizza',
   'polarity': 'neutral',
   'from': '34',
   'to': '52'}],
 'aspectCategories': [{'category': 'food', 'polarity': 'positive'}],
 'ner_tag': [[0, 0],
  [4, 5],
  [0, 0],
  [0, 0],
  [0, 0],
  [0, 0],
  [0, 0],
  [0, 0],
  [1, 7],
  [2, 7],
  [3, 7],
  [0, 0]],
 'tokens': ['The',
  'pizza',
  'is',
  'the',
  'best',
  'if',
  'you',
  'like',
  'thin',
  'crusted',
  'pizza',
  '.']}

In [8]:
#!g2.1
# Run preprocess_text over every example of every split and drop the raw
# columns listed below, leaving sentenceId plus the new tokens / ner_tag.
dataset = raw_datasets.map(
    preprocess_text,
    remove_columns = ['text', 'aspectTerms', 'aspectCategories']
)
dataset



Map:   0%|          | 0/100 [00:00<?, ? examples/s]

Map:   0%|          | 0/3041 [00:00<?, ? examples/s]

Map:   0%|          | 0/800 [00:00<?, ? examples/s]

DatasetDict({
    trial: Dataset({
        features: ['sentenceId', 'ner_tag', 'tokens'],
        num_rows: 100
    })
    train: Dataset({
        features: ['sentenceId', 'ner_tag', 'tokens'],
        num_rows: 3041
    })
    test: Dataset({
        features: ['sentenceId', 'ner_tag', 'tokens'],
        num_rows: 800
    })
})

In [9]:
#!g2.1
for token, ner_tag in zip(dataset['train'][50]['tokens'], dataset['train'][50]['ner_tag']):
    print(f'{token:_<40}{ner_tag}')

Three___________________________________[0, 0]
courses_________________________________[4, 7]
-_______________________________________[0, 0]
choices_________________________________[0, 0]
include_________________________________[0, 0]
excellent_______________________________[0, 0]
mussels_________________________________[4, 5]
,_______________________________________[0, 0]
puff____________________________________[1, 5]
pastry__________________________________[2, 5]
goat____________________________________[2, 5]
cheese__________________________________[3, 5]
and_____________________________________[0, 0]
salad___________________________________[1, 5]
with____________________________________[2, 5]
a_______________________________________[2, 5]
delicious_______________________________[2, 5]
dressing________________________________[3, 5]
,_______________________________________[0, 0]
and_____________________________________[0, 0]
a_______________________________________[0, 0]
hanger_______

In [10]:
#!g2.1
import os
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [11]:
#!g2.1
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-large-cased")

tokenizer_config.json:   0%|          | 0.00/29.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/762 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/213k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/436k [00:00<?, ?B/s]

In [12]:
#!g2.1
label_count = len(label_list)
def tokenize_and_align_labels(examples, label_all_tokens: bool = False):
    """Tokenize pre-split words and build one multi-hot label row per sub-token.

    Args:
        examples: batch with ``"tokens"`` (list of word lists) and
            ``"ner_tag"`` (per word, a list of active tag ids).
        label_all_tokens: when True, continuation sub-tokens of a word get the
            word's label; otherwise they get an all ``-100`` row.

    Returns:
        The tokenizer output with an extra ``"labels"`` entry: for every
        sequence, a list of rows of length ``label_count``. Special tokens
        always get an all ``-100`` row.
    """
    def ignored_row():
        # Fresh list each time, so rows never share the same object.
        return [-100] * label_count

    def multi_hot(active_ids):
        return [int(tag_id in active_ids) for tag_id in range(label_count)]

    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    labels = []
    for batch_pos, word_tags in enumerate(examples["ner_tag"]):
        aligned = []
        last_seen = None
        for word_idx in tokenized_inputs.word_ids(batch_index=batch_pos):
            if word_idx is None:
                # Special token: it belongs to no word.
                aligned.append(ignored_row())
            elif word_idx != last_seen or label_all_tokens:
                # First sub-token of a word, or every sub-token when requested.
                aligned.append(multi_hot(word_tags[word_idx]))
            else:
                aligned.append(ignored_row())
            last_seen = word_idx
        labels.append(aligned)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

In [13]:
#!g2.1
# Tokenize every split in batches and attach the aligned multi-hot labels;
# the word-level columns are dropped once they have been consumed.
tokenized_dataset = dataset.map(tokenize_and_align_labels, batched=True,
                               remove_columns = ['sentenceId', 'tokens', 'ner_tag'])
tokenized_dataset

Map:   0%|          | 0/100 [00:00<?, ? examples/s]

Map:   0%|          | 0/3041 [00:00<?, ? examples/s]

Map:   0%|          | 0/800 [00:00<?, ? examples/s]

DatasetDict({
    trial: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 100
    })
    train: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 3041
    })
    test: Dataset({
        features: ['input_ids', 'token_type_ids', 'attention_mask', 'labels'],
        num_rows: 800
    })
})

In [14]:
#!g2.1
# Return the listed columns as torch tensors; output_all_columns=True keeps the
# remaining columns (here token_type_ids) in the returned rows as well.
tokenized_dataset.set_format(
    "torch", columns=["input_ids", "attention_mask", "labels"], output_all_columns=True)

In [15]:
#!g2.1
for token, label in zip(tokenizer.convert_ids_to_tokens(tokenized_dataset['train'][10]['input_ids']), 
                        tokenized_dataset['train'][10]['labels']):
    print(f'{token:_<40}{label}')

[CLS]___________________________________tensor([-100, -100, -100, -100, -100, -100, -100, -100])
They____________________________________tensor([1, 0, 0, 0, 0, 0, 0, 0])
did_____________________________________tensor([1, 0, 0, 0, 0, 0, 0, 0])
not_____________________________________tensor([1, 0, 0, 0, 0, 0, 0, 0])
have____________________________________tensor([1, 0, 0, 0, 0, 0, 0, 0])
may_____________________________________tensor([0, 0, 0, 0, 1, 0, 1, 0])
##on____________________________________tensor([-100, -100, -100, -100, -100, -100, -100, -100])
##nai___________________________________tensor([-100, -100, -100, -100, -100, -100, -100, -100])
##se____________________________________tensor([-100, -100, -100, -100, -100, -100, -100, -100])
,_______________________________________tensor([1, 0, 0, 0, 0, 0, 0, 0])
forgot__________________________________tensor([1, 0, 0, 0, 0, 0, 0, 0])
our_____________________________________tensor([1, 0, 0, 0, 0, 0, 0, 0])
toast_______________________

In [16]:
#!g2.1
from transformers import DataCollatorForTokenClassification

data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)



In [17]:
#!g2.1
from transformers import AutoModelForTokenClassification, TrainingArguments, Trainer

In [18]:
#!g2.1
from typing import Optional
import torch
from torch import nn
class MultiLabelNERTrainer(Trainer):
    """Trainer that scores token logits with a multi-label BCE-with-logits loss.

    Args:
        class_weights: optional per-tag weights handed to
            ``nn.BCEWithLogitsLoss(weight=...)``; moved to the training device.
        *args, **kwargs: forwarded unchanged to ``Trainer``.
    """

    def __init__(self, *args, class_weights: Optional[torch.FloatTensor] = None, **kwargs):
        super().__init__(*args, **kwargs)
        if class_weights is not None:
            # Imported here because the surrounding cell does not import logging.
            import logging

            class_weights = class_weights.to(self.args.device)
            logging.info("Using multi-label classification with class weights %s", class_weights)
        self.loss_fct = nn.BCEWithLogitsLoss(weight=class_weights)

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the BCE loss over the labelled token positions only.

        Args:
            model: the model being trained; called as ``model(**inputs)``.
            inputs: batch dict; its ``"labels"`` entry is popped before the
                forward pass.
            return_outputs: when True, return ``(loss, outputs)``.
            num_items_in_batch: accepted and ignored, so the override stays
                callable by Trainer versions that pass this keyword.

        Returns:
            The scalar loss, or ``(loss, outputs)`` when ``return_outputs``.
        """
        labels = inputs.pop("labels")
        outputs = model(**inputs)

        # Keep positions whose whole label row is real (no -100): this drops
        # special tokens and ignored sub-tokens. Masking on the row keeps the
        # tag dimension, so a per-tag `weight` broadcasts over (n_tokens, n_tags).
        # NOTE(review): assumes a row is either fully -100 or fully labelled,
        # which is how tokenize_and_align_labels builds it.
        keep = (labels != -100).all(dim=-1)
        token_logits = outputs.logits[keep]
        token_labels = labels[keep].float()

        loss = self.loss_fct(token_logits, token_labels)

        return (loss, outputs) if return_outputs else loss

In [19]:
#!g2.1
from transformers import EvalPrediction
from sklearn.metrics import f1_score, roc_auc_score, accuracy_score
import torch

sigmoid = torch.nn.Sigmoid()

def compute_metrics(p, threshold=0.5):
    """Compute token-level multi-label metrics from raw logits.

    Args:
        p: pair ``(predictions, labels)``; both are indexed as
            (sequence, token, tag), with ``-100`` marking ignored label rows.
        threshold: sigmoid probability at or above which a tag counts as
            predicted.

    Returns:
        Dict with ``'f1'`` (macro-averaged), ``'roc_auc'`` (macro-averaged)
        and ``'accuracy'`` (exact match of the whole tag row).
    """
    predictions, labels = p
    predictions = np.asarray(predictions)
    labels = np.asarray(labels)

    # A token is scored when at least one entry of its label row is not -100.
    keep = (labels != -100).any(axis=-1)
    true_labels = labels[keep]

    probabilities = sigmoid(torch.as_tensor(predictions[keep], dtype=torch.float32))
    true_predictions = (probabilities >= threshold).numpy().astype(float)

    f1_macro_average = f1_score(y_true=true_labels, y_pred=true_predictions, average='macro')
    # NOTE(review): ROC AUC is computed on thresholded 0/1 predictions, as
    # before; passing the probabilities instead would give a threshold-free score.
    roc_auc = roc_auc_score(true_labels, true_predictions, average = 'macro')
    accuracy = accuracy_score(true_labels, true_predictions)

    metrics = {'f1': f1_macro_average,
               'roc_auc': roc_auc,
               'accuracy': accuracy}

    return metrics

In [20]:
#!g2.1
# Token-classification head sized from the tag vocabulary, so the model cannot
# drift out of sync with label_list.
model = AutoModelForTokenClassification.from_pretrained(
    "bert-large-cased", problem_type="multi_label_classification",
    num_labels=len(label_list), id2label=id2label, label2id=label2id)

# Evaluate after every epoch; no checkpoints are saved.
training_args = TrainingArguments(
    output_dir="token_class_model",
    learning_rate=5e-06,
    per_device_train_batch_size=1,
    per_device_eval_batch_size=1,
    num_train_epochs=7,
    weight_decay=0.1,
    evaluation_strategy="epoch",
    push_to_hub=False,
    save_strategy="no", 
    group_by_length=True,
    warmup_ratio=0.1,
    optim="adamw_torch",
    lr_scheduler_type="cosine",
)

# The custom trainer swaps in the multi-label BCE loss; the test split is used
# for the per-epoch evaluation.
trainer = MultiLabelNERTrainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_dataset["train"],
    eval_dataset=tokenized_dataset["test"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

trainer.train()

model.safetensors:   0%|          | 0.00/1.34G [00:00<?, ?B/s]

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-large-cased and are newly initialized: ['classifier.weight', 'classifier.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
You're using a BertTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


Epoch,Training Loss,Validation Loss,F1,Roc Auc,Accuracy
1,0.0594,0.051822,0.697294,0.832431,0.929171
2,0.0331,0.045367,0.733207,0.85539,0.937094
3,0.0231,0.043288,0.794379,0.869353,0.950008
4,0.0137,0.044566,0.821954,0.894868,0.955158
5,0.0059,0.050844,0.821994,0.892175,0.956584
6,0.0066,0.054592,0.8133,0.885446,0.956663
7,0.0034,0.054683,0.813375,0.885847,0.956821


TrainOutput(global_step=21287, training_loss=0.033653985380471194, metrics={'train_runtime': 1666.7125, 'train_samples_per_second': 12.772, 'train_steps_per_second': 12.772, 'total_flos': 760931237937024.0, 'train_loss': 0.033653985380471194, 'epoch': 7.0})

In [21]:
#!g2.1
trial = trainer.predict(tokenized_dataset['trial'])

In [22]:
#!g2.1
preds = trial.predictions[0]
label = trial.label_ids[0]

In [23]:
#!g2.1
def return_labels(preds, label, threshold=0.5):
    """Turn one sequence's logits into tag names, one entry per labelled token.

    Args:
        preds: per-token rows of logits for a single sequence.
        label: per-token label rows for the same sequence; entries equal to
            ``-100`` mark positions to skip.
        threshold: sigmoid probability at or above which a tag is predicted.

    Returns:
        A list with exactly one entry per token that has real labels:
        the tag name as a ``str`` when a single tag fires, a list of tag
        names when several fire, and an empty list when none does.
    """
    tags = []
    for pred, lbl in zip(preds, label):
        true_pred = [p for (p, l) in zip(pred, lbl) if l != -100]
        if not true_pred:
            # Special token or ignored sub-token: produces no entry.
            continue

        probabilities = sigmoid(torch.Tensor(true_pred)).tolist()
        names = [id2label[indx] for indx, prob in enumerate(probabilities)
                 if prob >= threshold]

        if len(names) == 1:
            tags.append(names[0])
        else:
            # Several tags, or none at all. The empty list keeps the output
            # aligned with the words; skipping the token would shift every
            # following tag onto the wrong word.
            tags.append(names)
    return tags

In [24]:
#!g2.1
raw_datasets['trial'][5]

{'sentenceId': '1609',
 'text': 'Service was quick.',
 'aspectTerms': [{'term': 'Service',
   'polarity': 'positive',
   'from': '0',
   'to': '7'}],
 'aspectCategories': [{'category': 'service', 'polarity': 'positive'}]}

In [25]:
#!g2.1
for token, ner_tag in zip(dataset['trial'][5]['tokens'], return_labels(trial.predictions[5], trial.label_ids[5])):
    print(f'{token:_<40}{ner_tag}')

Service_________________________________['S', 'POS']
was_____________________________________O
quick___________________________________O
._______________________________________O


In [26]:
#!g2.1
raw_datasets['trial'][99]

{'sentenceId': '3041',
 'text': "We've only eaten in the restaurant once, but we have ordered many times for dinner.",
 'aspectTerms': [{'term': 'dinner',
   'polarity': 'neutral',
   'from': '76',
   'to': '82'}],
 'aspectCategories': [{'category': 'anecdotes/miscellaneous',
   'polarity': 'neutral'}]}

In [27]:
#!g2.1
for token, ner_tag in zip(dataset['trial'][99]['tokens'], return_labels(trial.predictions[99], trial.label_ids[99])):
    print(f'{token:_<40}{ner_tag}')

We______________________________________O
've_____________________________________O
only____________________________________O
eaten___________________________________O
in______________________________________O
the_____________________________________O
restaurant______________________________O
once____________________________________O
,_______________________________________O
but_____________________________________O
we______________________________________O
have____________________________________O
ordered_________________________________O
many____________________________________O
times___________________________________O
for_____________________________________O
dinner__________________________________['S', 'NEU']
._______________________________________O


In [28]:
#!g2.1
raw_datasets['trial'][40]

{'sentenceId': '3170',
 'text': 'Good spreads, great beverage selections and bagels really tasty.',
 'aspectTerms': [{'term': 'spreads',
   'polarity': 'positive',
   'from': '5',
   'to': '12'},
  {'term': 'beverage selections',
   'polarity': 'positive',
   'from': '20',
   'to': '39'},
  {'term': 'bagels', 'polarity': 'positive', 'from': '44', 'to': '50'}],
 'aspectCategories': [{'category': 'food', 'polarity': 'positive'}]}

In [29]:
#!g2.1
for token, ner_tag in zip(dataset['trial'][40]['tokens'], return_labels(trial.predictions[40], trial.label_ids[40])):
    print(f'{token:_<40}{ner_tag}')

Good____________________________________O
spreads_________________________________['S', 'POS']
,_______________________________________O
great___________________________________O
beverage________________________________['B', 'POS']
selections______________________________['E', 'POS']
and_____________________________________O
bagels__________________________________['S', 'POS']
really__________________________________O
tasty___________________________________O
._______________________________________O


In [30]:
#!g2.1
for token, ner_tag, label in zip(dataset['trial'][40]['tokens'], return_labels(trial.predictions[40], trial.label_ids[40]),\
                                 dataset['trial'][40]['ner_tag']):
    print(f'{token:_<40}{ner_tag} {label}')

Good____________________________________O [0, 0]
spreads_________________________________['S', 'POS'] [4, 5]
,_______________________________________O [0, 0]
great___________________________________O [0, 0]
beverage________________________________['B', 'POS'] [1, 5]
selections______________________________['E', 'POS'] [3, 5]
and_____________________________________O [0, 0]
bagels__________________________________['S', 'POS'] [4, 5]
really__________________________________O [0, 0]
tasty___________________________________O [0, 0]
._______________________________________O [0, 0]


In [31]:
#!g2.1
trial = trainer.predict(tokenized_dataset['test'])

# Decode every test sequence into per-word tag names.
predictions = [
    return_labels(sample_logits, sample_labels)
    for sample_logits, sample_labels in zip(trial.predictions, trial.label_ids)
]

# Exact-match accuracy over gold aspect tokens only: a token counts as correct
# when both its position tag and its polarity tag are predicted.
# NOTE(review): the inner zip pairs gold tags with predictions by position, so
# it relies on return_labels yielding one entry per word - confirm.
count_total = 0
count_matches = 0
for label, prediction in zip(dataset['test']['ner_tag'], predictions):
    for lbl, pred in zip(label, prediction):
        if lbl == [0, 0]:
            continue
        count_total += 1
        if [id2label[item] for item in lbl] == pred:
            count_matches += 1

# Guard against a split without any aspect token instead of dividing by zero.
e2e_accuracy = count_matches / count_total if count_total else float('nan')
print(f'Accuracy on E2E ABSA: {e2e_accuracy}')

Accuracy on E2E ABSA: 0.7214640198511166


# Финальные выводы:

Результат классификации токенов получился немного лучше, чем при одномерных тегах, но точность извлечения и классификации токена не сильно поменялась. В следующем эксперименте выберу за основу другую модель - Albert - и посмотрю, как это повлияет на результат.