# Model

In [42]:


# https://www.kaggle.com/code/wuharlem/simple-bert-w-hinge-loss

from transformers import DebertaV2ForTokenClassification, DebertaV2Model
from typing import List, Optional, Tuple, Union
from transformers.modeling_outputs import TokenClassifierOutput
from torch.nn import CrossEntropyLoss
 
class CustomDeberta(DebertaV2ForTokenClassification):
    def __init__(self, config, class_weight=None):
        super().__init__(config)
        self.num_labels = config.num_labels

        self.deberta = DebertaV2Model(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)
        self.class_weight = torch.tensor(class_weight, dtype=torch.float32).to('cuda' if torch.cuda.is_available() else 'cpu')
        # Initialize weights and apply final processing
        self.post_init()
      
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple, TokenClassifierOutput]:
        r"""
        labels (`torch.LongTensor` of shape `(batch_size, sequence_length)`, *optional*):
            Labels for computing the token classification loss. Indices should be in `[0, ..., config.num_labels - 1]`.
        """
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.deberta(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]

        sequence_output = self.dropout(sequence_output)
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            labels = labels.to(logits.device)
            if self.class_weight is None:
                loss_fct = CrossEntropyLoss()
            else:
                loss_fct = CrossEntropyLoss(weight=self.class_weight)
                
            loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))

        if not return_dict:
            output = (logits,) + outputs[1:]
            return ((loss,) + output) if loss is not None else output

        return TokenClassifierOutput(
            loss=loss, logits=logits, hidden_states=outputs.hidden_states, attentions=outputs.attentions
        )

In [43]:
from transformers import AutoTokenizer

# https://huggingface.co/docs/transformers/tasks/token_classification
# model_checkpoint = "microsoft/deberta-v2-xlarge"
model_checkpoint = "microsoft/deberta-base"

tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)

In [44]:
import torch
import pandas as pd
import ast

print("CUDA: ", torch.cuda.is_available())

CUDA:  True


# Load data

In [45]:
ner_tags = [
"MAT",
"NMAT",
"DIMENSION",
"WEIGHT",
"TARGET_USER",
"PROPERTY",
"COLOR",
"SHAPE",
"SIZE",
]

processed_ner_tags = ['O']
for tag in ner_tags:
        processed_ner_tags.extend([f"B-{tag}", f"I-{tag}"])

print(processed_ner_tags)
print(len(processed_ner_tags))

ner_tags_2_number = {t: i for (i, t) in enumerate(processed_ner_tags)}
number_2_ner_tags = {t: i for (t, i) in enumerate(ner_tags_2_number)}

['O', 'B-MAT', 'I-MAT', 'B-NMAT', 'I-NMAT', 'B-DIMENSION', 'I-DIMENSION', 'B-WEIGHT', 'I-WEIGHT', 'B-TARGET_USER', 'I-TARGET_USER', 'B-PROPERTY', 'I-PROPERTY', 'B-COLOR', 'I-COLOR', 'B-SHAPE', 'I-SHAPE', 'B-SIZE', 'I-SIZE']
19


In [46]:
raw_dataset = pd.read_excel("../data/raw_data_restore_uppercase.xlsx")
raw_dataset.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3310 entries, 0 to 3309
Data columns (total 3 columns):
 #   Column    Non-Null Count  Dtype 
---  ------    --------------  ----- 
 0   sentence  3310 non-null   object
 1   locs      3310 non-null   object
 2   words     3310 non-null   object
dtypes: object(3)
memory usage: 77.7+ KB


In [47]:
from datasets import Dataset, DatasetDict, ClassLabel, Features, Value, Sequence

train_dataset = Dataset.from_pandas(raw_dataset)
val_dataset = Dataset.from_pandas(raw_dataset)
test_dataset = Dataset.from_pandas(raw_dataset)

raw_datasets = DatasetDict(
    {"train": train_dataset, "val": val_dataset, "test": test_dataset})
raw_datasets

DatasetDict({
    train: Dataset({
        features: ['sentence', 'locs', 'words'],
        num_rows: 3310
    })
    val: Dataset({
        features: ['sentence', 'locs', 'words'],
        num_rows: 3310
    })
    test: Dataset({
        features: ['sentence', 'locs', 'words'],
        num_rows: 3310
    })
})

In [48]:
def assign_ner_tags_deberta(example):
    # print(example['sentence'])
    token_input = tokenizer(example['sentence'])
    example['tokens'] = tokenizer.convert_ids_to_tokens(
        token_input['input_ids'])

    # print(example['sentence'])
    # print(example['tokens'])
    # print("len: ", len(example['tokens']))

    ner_tags = [0 for token in example['tokens']]
    if str(type(example['locs'])) == "<class 'list'>":
        locs = example['locs']
    else:
        locs = ast.literal_eval(example['locs'])

    locs = [(int(loc[0]), int(loc[1]), loc[2]) for loc in locs]
    locs = sorted(locs)
    bg_id = 1
    pre_loc = 0
    text = example['sentence']
    for loc in locs:
        loc0 = int(loc[0])
        loc1 = int(loc[1])

        pre_text = text[pre_loc:loc0]
        token_input = tokenizer(pre_text)
        pre_token = tokenizer.convert_ids_to_tokens(
            token_input['input_ids'])

        # print(pre_text)
        # print(pre_token)
        # print("len: ", len(pre_token))

        bg_id = bg_id + len(pre_token) - 2
        pre_loc = loc1

        # print("bg_id: ", bg_id)

        word = example['sentence'][loc0: loc1]
        token_input = tokenizer(word)
        word_token = tokenizer.convert_ids_to_tokens(token_input['input_ids'])

        label_number = ner_tags_2_number[f"B-{loc[2]}"]
        ner_tags[bg_id] = label_number
        bg_id += 1
        for idx in range(bg_id, bg_id + len(word_token) - 3):
            ner_tags[idx] = label_number + 1
        bg_id = bg_id + len(word_token) - 3

        # visualize_ner_tags(example['tokens'], ner_tags)

    ner_tags[0] = -100
    ner_tags[-1] = -100
    return ner_tags

In [49]:
def tokenize_and_align_labels(examples):
    tokenized_inputs = tokenizer(
        examples["sentence"], truncation=True, is_split_into_words=False
    )
    tokenized_inputs["labels"] = assign_ner_tags_deberta(examples)
    return tokenized_inputs

In [50]:
from datasets import Dataset

list_input_ids = []
list_token_type_ids = []
list_attention_mask = []
list_labels = []

for index, row in raw_dataset.iterrows():
    try:
        label = assign_ner_tags_deberta(row)
        token_input = tokenizer(row['sentence'])
        list_input_ids.append(token_input['input_ids'])
        # list_token_type_ids.append(token_input['token_type_ids'])
        list_attention_mask.append(token_input['attention_mask'])
        list_labels.append(label)
    except Exception as error:
        # print(error)
        print(index)
        print(row['sentence'])

tokenized_datasets = pd.DataFrame()
tokenized_datasets['input_ids'] = pd.Series(list_input_ids)
# tokenized_datasets['token_type_ids'] = pd.Series(list_token_type_ids)
tokenized_datasets['attention_mask'] = pd.Series(list_attention_mask)
tokenized_datasets['labels'] = pd.Series(list_labels)

train_dataset = Dataset.from_pandas(tokenized_datasets)
val_dataset = Dataset.from_pandas(tokenized_datasets)
test_dataset = Dataset.from_pandas(tokenized_datasets)

tokenized_datasets = DatasetDict(
    {"train": train_dataset, "val": val_dataset, "test": test_dataset})
tokenized_datasets

2
🐶 【 Non - Slip Bottom 】 : The rectangular bed comes with a non - slip bottom that ensures the bed doesn ' t move and keeps your pet safe .
11
Windmill Pattern Natural Finish , the boards ate 1 / 2 inch thick , the screws are coated steel , 1 / 2 - Inch long , they do not protrude as they are mounted beneath with a plastic collar
14
Use Indoors & Outdoors
16
IDEAL FOR MEDIUM SIZE PETS : Recommended for pets 25 lbs . and under ; Interior space measures 22 . 3 in . H x 21 . 1 in . W x 30 in . D
17
SECURE PET CRATE : Features a latch locking mechanism to keep your pet safely contained
25
GREAT LOOK - Pet bed is strawberry shape design , perfect for your adorable pets small dog , puppy , cat , pig , etcs .
28
GOOD GIFT - Dog house heater aslo is a great gift for friends who have raise lovely pets , for both cats and small dogs .
30
Perfect Size : Cat house measures - Φ 12 . 2 ” x 13 . 8 ” ( H ) ; Entry - way measures 7 . 5 ” ( W ) x 7 . 5 ” ( H ) ; Weight - 5lbs .
33
Privacy Place : This 

DatasetDict({
    train: Dataset({
        features: ['input_ids', 'attention_mask', 'labels'],
        num_rows: 2537
    })
    val: Dataset({
        features: ['input_ids', 'attention_mask', 'labels'],
        num_rows: 2537
    })
    test: Dataset({
        features: ['input_ids', 'attention_mask', 'labels'],
        num_rows: 2537
    })
})

In [51]:
from transformers import DataCollatorForTokenClassification

data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

In [52]:
import evaluate

metric = evaluate.load("seqeval")

In [53]:
import numpy as np

label_names = processed_ner_tags
id2label = {i: label for i, label in enumerate(label_names)}
label2id = {v: k for k, v in id2label.items()}


def compute_metrics(eval_preds):
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis=-1)

    # Remove ignored index (special tokens) and convert to labels
    true_labels = [[label_names[l] for l in label if l != -100]
                   for label in labels]
    true_predictions = [
        [label_names[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    all_metrics = metric.compute(
        predictions=true_predictions, references=true_labels)
    return {
        "precision": all_metrics["overall_precision"],
        "recall": all_metrics["overall_recall"],
        "f1": all_metrics["overall_f1"],
        "accuracy": all_metrics["overall_accuracy"],
    }

In [54]:
from sklearn.utils.class_weight import compute_class_weight
from collections import Counter
import numpy as np

def get_class_weight(train_dataset, num_class):
    list_label = []
    for label in train_dataset['labels']:
        for cl in label:
            if cl != -100:
                list_label.append(cl)
    class_weight = compute_class_weight(class_weight='balanced', classes=np.arange(num_class), y=list_label)
    return class_weight

class_weight = get_class_weight(train_dataset, len(processed_ner_tags))
print(class_weight)
print(len(class_weight))

[6.25553276e-02 5.21506935e+00 4.32536978e+00 3.48417824e+01
 4.58000849e+01 1.08175439e+01 1.92580893e+00 5.67921053e+01
 3.59443704e+01 9.08673684e+00 1.68023980e+01 1.76482614e+00
 9.42607556e-01 3.83730441e+01 1.09215587e+02 5.35774578e+01
 5.67921053e+01 9.79174229e+01 2.10341131e+02]
19


In [55]:
from transformers import AutoModelForTokenClassification
from transformers.models.bert import modeling_bert
from transformers import DebertaV2ForTokenClassification

model = AutoModelForTokenClassification.from_pretrained(
    model_checkpoint,
    id2label=id2label,
    label2id=label2id,
    ignore_mismatched_sizes=True
)

# model = AutoModelForTokenClassification.from_pretrained(
#     model_checkpoint,
#     id2label=id2label,
#     label2id=label2id,
#     ignore_mismatched_sizes=True
# )

model.config

Some weights of the model checkpoint at microsoft/deberta-base were not used when initializing DebertaForTokenClassification: ['lm_predictions.lm_head.bias', 'lm_predictions.lm_head.dense.bias', 'lm_predictions.lm_head.LayerNorm.weight', 'lm_predictions.lm_head.LayerNorm.bias', 'lm_predictions.lm_head.dense.weight', 'deberta.embeddings.position_embeddings.weight']
- This IS expected if you are initializing DebertaForTokenClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DebertaForTokenClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DebertaForTokenClassification were not initialized from the model checkpoint at microsoft/deberta-base and are newly initial

DebertaConfig {
  "_name_or_path": "microsoft/deberta-base",
  "attention_probs_dropout_prob": 0.1,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 768,
  "id2label": {
    "0": "O",
    "1": "B-MAT",
    "2": "I-MAT",
    "3": "B-NMAT",
    "4": "I-NMAT",
    "5": "B-DIMENSION",
    "6": "I-DIMENSION",
    "7": "B-WEIGHT",
    "8": "I-WEIGHT",
    "9": "B-TARGET_USER",
    "10": "I-TARGET_USER",
    "11": "B-PROPERTY",
    "12": "I-PROPERTY",
    "13": "B-COLOR",
    "14": "I-COLOR",
    "15": "B-SHAPE",
    "16": "I-SHAPE",
    "17": "B-SIZE",
    "18": "I-SIZE"
  },
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "label2id": {
    "B-COLOR": 13,
    "B-DIMENSION": 5,
    "B-MAT": 1,
    "B-NMAT": 3,
    "B-PROPERTY": 11,
    "B-SHAPE": 15,
    "B-SIZE": 17,
    "B-TARGET_USER": 9,
    "B-WEIGHT": 7,
    "I-COLOR": 14,
    "I-DIMENSION": 6,
    "I-MAT": 2,
    "I-NMAT": 4,
    "I-PROPERTY": 12,
    "I-SHAPE": 16,
    "I-SIZE": 18,
    "I-TARGET_USER

In [56]:
from torch.utils.data import DataLoader

train_dataloader = DataLoader(
    tokenized_datasets["train"],
    shuffle=True,
    collate_fn=data_collator,
    batch_size=2,
)
eval_dataloader = DataLoader(
    tokenized_datasets["test"], collate_fn=data_collator, batch_size=2
)

In [57]:
from torch.optim import AdamW

optimizer = AdamW(model.parameters(), lr=2e-5)

In [58]:
from accelerate import Accelerator

accelerator = Accelerator()
model, optimizer, train_dataloader, eval_dataloader = accelerator.prepare(
    model, optimizer, train_dataloader, eval_dataloader
)

In [59]:
def postprocess(predictions, labels):
    predictions = predictions.detach().cpu().clone().numpy()
    labels = labels.detach().cpu().clone().numpy()

    # Remove ignored index (special tokens) and convert to labels
    true_labels = [[label_names[l] for l in label if l != -100] for label in labels]
    true_predictions = [
        [label_names[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    return true_labels, true_predictions

In [60]:
from transformers import get_scheduler

num_train_epochs = 15
num_update_steps_per_epoch = len(train_dataloader)
num_training_steps = num_train_epochs * num_update_steps_per_epoch

lr_scheduler = get_scheduler(
    "linear",
    optimizer=optimizer,
    num_warmup_steps=0,
    num_training_steps=num_training_steps,
)

In [61]:
from tqdm.auto import tqdm
import torch
from datetime import datetime
import os

date_time = datetime.now()
format_date = date_time.strftime('%Y-%m-%d')
format_time = date_time.strftime('%H:%M:%S')

print(f"Date: {format_date}")
print(f"Time: {format_time}")

# Replace with desire output dir
output_dir = f"../models/model_from_{format_date}/deberta-base_{format_time}"
progress_bar = tqdm(range(num_training_steps))

best_f1_score = 0

for epoch in range(num_train_epochs):
    # Training
    model.train()
    for batch in train_dataloader:
        outputs = model(**batch)
        loss = outputs.loss
        accelerator.backward(loss)

        optimizer.step()
        lr_scheduler.step()
        optimizer.zero_grad()
        progress_bar.update(1)

    # Evaluation
    model.eval()
    for batch in eval_dataloader:
        with torch.no_grad():
            outputs = model(**batch)

        predictions = outputs.logits.argmax(dim=-1)
        labels = batch["labels"]

        # Necessary to pad predictions and labels for being gathered
        predictions = accelerator.pad_across_processes(
            predictions, dim=1, pad_index=-100)
        labels = accelerator.pad_across_processes(
            labels, dim=1, pad_index=-100)

        predictions_gathered = accelerator.gather(predictions)
        labels_gathered = accelerator.gather(labels)

        true_predictions, true_labels = postprocess(
            predictions_gathered, labels_gathered)
        metric.add_batch(predictions=true_predictions, references=true_labels)

    results = metric.compute()
    print(
        f"epoch {epoch}:",
        {
            key: results[f"overall_{key}"]
            for key in ["precision", "recall", "f1", "accuracy"]
        },
    )

    # Save and upload
    accelerator.wait_for_everyone()
    unwrapped_model = accelerator.unwrap_model(model)
    if (epoch + 1) % 5 == 0:
        output_ckpt = os.path.join(output_dir, f'epoch_{epoch + 1}')
        print(f"Save model at epoch {epoch + 1}")
        tokenizer.save_pretrained(output_ckpt)
        unwrapped_model.save_pretrained(output_ckpt, save_function=accelerator.save)

Date: 2023-10-31
Time: 15:44:57


  0%|          | 0/19035 [00:00<?, ?it/s]

You're using a DebertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


# Inference

In [None]:
from transformers import pipeline
from collections import defaultdict

# Replace this with your own checkpoint
model_checkpoint = "../models/model_from_2023-10-31/deberta-base_11:38:46/epoch_25"
token_classifier = pipeline(
    "ner", model=model_checkpoint, aggregation_strategy="simple"
)

In [None]:
def preprocess_description(description):
    single_description = description.strip()
    new_description = []
    last_special = -1
    for idx, letter in enumerate(single_description):
        if not (('a' <= letter and letter <= 'z') or ('A' <= letter and letter <= 'Z') or ('0' <= letter and letter <= '9') or letter == ' '):
            pretext = single_description[last_special + 1:idx].strip()
            if pretext != '' and pretext != ' ':
                new_description.append(pretext)
            new_description.append(letter.strip())
            last_special = idx
        if idx == len(single_description) - 1:
            new_description.append(
                single_description[last_special + 1:idx + 1].strip())
    return " ".join(new_description)

In [None]:
import json

# Replace this description
description = '''
Unlike other mixed multi - layer memory foam pads that are used to cut corners , our solid memory foam pads are superior in quality and value .
'''

high_score_ans = defaultdict(set)
bullet_points = description.split("\n")
for bullet_point in bullet_points:
    bullet_point = preprocess_description(bullet_point)

    if bullet_point != "":
        print(bullet_point)

        results = token_classifier(bullet_point)
        for res in results:
            if res['word'].lower().strip() in ['durable', 'strong', 'heavy-duty', 'heavy duty', 'stability', 'versatile']:
                continue
            group = res['entity_group']
            if res['score'] >= 0.6:
                high_score_ans[group].add(res['word'].lower().strip())
                
new_high_score_ans = defaultdict(list)
for key_dict in high_score_ans.keys():
    new_high_score_ans[key_dict] = list(high_score_ans[key_dict])

print("-"*100)            
print(json.dumps(new_high_score_ans, sort_keys=True, indent=4))                

In [None]:
from transformers import AutoTokenizer

model_checkpoint = "microsoft/deberta-v2-xlarge"
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint)