In [16]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import random, ast
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer
from itertools import combinations
from scipy import stats
from torch import Tensor

import configuration
from dataset_class.data_preprocessing import *

In [137]:
""" Configuration for NER Pipeline """

class CFG:
    """
    Static configuration namespace for the NER training pipeline.

    Every option is a class attribute, so the class object itself is handed around as `cfg`.
    Note that the tokenizer is loaded while this class body executes.
    """
    # ---- Pipeline Setting ----
    train, test = True, False
    checkpoint_dir = './saved/model'
    resume, load_pretrained,  state_dict = True, False, '/'
    name = 'FBP3_Base_Train_Pipeline'
    loop = 'mpl_loop'
    dataset = 'FBPDataset'  # dataset_class.dataclass.py -> FBPDataset, MPLDataset
    model_arch = 'FBPModel'  # model.model.py -> FBPModel, MPLModel
    model = 'microsoft/deberta-v3-large'
    tokenizer = AutoTokenizer.from_pretrained(model)
    pooling = 'MeanPooling'  # mean, attention, max, weightedlayer, concat, conv1d, lstm

    # ---- Common Options ----
    wandb = True
    optuna = False  # if you want to tune hyperparameter, set True
    competition = 'FB3'
    seed = 42
    cfg_name = 'CFG'
    n_gpu = 1
    # Prefer CUDA, then Apple MPS, and only then fall back to CPU. The previous expression
    # selected 'mps' on every non-CUDA host, including machines that have no MPS backend.
    device = torch.device(
        'cuda:0' if torch.cuda.is_available()
        else 'mps' if torch.backends.mps.is_available()
        else 'cpu'
    )
    gpu_id = 0
    num_workers = 0

    # ---- Data Options ----
    n_folds = 5
    max_len = 2048  # longest tokenized training text measured in this notebook is 1778
    epochs = 180
    batch_size = 64
    val_batch_size = 64

    # ---- Gradient Options ----
    amp_scaler = False
    gradient_checkpoint = True  # NERModel calls gradient_checkpointing_enable() when set
    clipping_grad = True  # clip_grad_norm
    n_gradient_accumulation_steps = 1
    max_grad_norm = 1000

    # ---- Loss & Metrics Options ----
    loss_fn = 'SmoothL1Loss'
    # val_loss_fn = 'WeightedMSELoss'
    margin = 0.5
    reduction = 'mean'
    metrics = ['MCRMSE', 'f_beta', 'recall']

    # ---- Optimizer with LLRD Options ----
    optimizer = 'AdamW'  # options: SWA, AdamW
    llrd = True
    layerwise_lr = 5e-6
    layerwise_lr_decay = 0.9
    layerwise_weight_decay = 1e-2
    layerwise_adam_epsilon = 1e-6
    layerwise_use_bertadam = False
    betas = (0.9, 0.999)

    # ---- Scheduler Options ----
    scheduler = 'cosine_annealing'  # options handled by get_scheduler: cosine, linear, cosine_annealing
    batch_scheduler = True
    num_cycles = 0.5  # num_warmup_steps = 0
    warmup_ratio = 0.1  # options: 0.05, 0.1

    # ---- SWA Options ----
    swa = True
    swa_start = int(epochs*0.75)
    swa_lr = 1e-4
    anneal_epochs = 4
    anneal_strategy = 'cos'  # default = cos, available option: linear

    # ---- Model_Utils Options ----
    init_weight = 'xavier_normal'
    stop_mode = 'min'
    freeze = False
    num_freeze = 2
    reinit = True
    num_reinit = 0
    awp = False
    nth_awp_start_epoch = 10
    awp_eps = 1e-2
    awp_lr = 1e-4

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


In [139]:
def check_device() -> bool:
    """
    Report whether the Apple MPS backend can be used on this machine.
    Uses torch.backends.mps.is_available(), the documented availability check;
    torch.mps.is_available is not present in every torch 2.x release.
    Returns:
        True when the MPS backend is available, otherwise False
    """
    return torch.backends.mps.is_available()

def check_library(checker: bool) -> tuple:
    """
    Inspect the cuDNN backend when the current device is not MPS.
    1) checker == True
        - current device is mps, nothing to inspect, returns None
    2) checker == False
        - current device is cuda with cudnn
    Args:
        checker: result of check_device(), True for mps
    Returns:
        (cudnn available, cudnn enabled, cudnn version) when checker is False, otherwise None
    """
    if checker:
        return None
    _is_built = torch.backends.cudnn.is_available()
    # the original line read `cudnn.enabledtorch.backends.cudnn.enabled`, which raises AttributeError
    _is_enable = torch.backends.cudnn.enabled
    version = torch.backends.cudnn.version()
    return _is_built, _is_enable, version


def class2dict(cfg) -> dict:
    """ Collect every attribute of cfg whose name does not start with a double underscore """
    return {
        attr: getattr(cfg, attr)
        for attr in dir(cfg)
        if not attr.startswith('__')
    }


def all_type_seed(cfg, checker: bool) -> None:
    """
    Seed python, numpy and torch random generators from cfg.seed
    Args:
        cfg: configuration object, only cfg.seed is read
        checker: True for the mps branch, False for the cuda/cudnn branch
    """
    seed = cfg.seed

    # interpreter-level and CPU generators
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

    if checker:
        # device == mps
        torch.mps.manual_seed(seed)
        return

    # device == cuda: seed the current GPU and all GPUs
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    # NOTE(review): deterministic=False with benchmark=True lets cuDNN auto-tune kernels,
    # so GPU runs may not be bit-reproducible even with fixed seeds; confirm this is intended
    torch.backends.cudnn.deterministic = False
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.enabled = True


def seed_worker(worker_id) -> None:
    """
    Re-seed numpy and python `random` from torch's initial seed, folded into 32 bits
    Args:
        worker_id: unused by the body
    """
    derived_seed = torch.initial_seed() % (2 ** 32)
    for reseed in (np.random.seed, random.seed):
        reseed(derived_seed)

os.environ['TOKENIZERS_PARALLELISM'] = "false"
os.environ['LRU_CACHE_CAPACITY'] = "1"
os.environ['AUTOCASTLONGTO_FLOAT16'] = "false"
check_library(True)  # with True this returns None, so the call has no effect
# Pick the seeding branch from the configured device instead of hard-coding the mps branch,
# so CUDA runs also execute the torch.cuda / cuDNN seeding section of all_type_seed.
_is_not_cuda = CFG.device.type != 'cuda'
all_type_seed(CFG, _is_not_cuda)

In [115]:
""" Load Train Dataset """
# path is relative to the notebook's working directory
train_csv_path = './dataset_class/data_folder/final_converted_train_df.csv'
train_df = pd.read_csv(train_csv_path)
train_df

Unnamed: 0,id,text,entities,fold
0,E1FA876D6E6C,"Dear Senator,\n\nI am writting this letter to ...","['O', 'O', 'B-Lead', 'I-Lead', 'I-Lead', 'I-Le...",0
1,8AC1D6E165CD,"Dear Principal, I believe in policy 2. Kids ar...","['O', 'O', 'B-Position', 'I-Position', 'I-Posi...",4
2,45EF6A4EDB1A,"Summer projects are no fun, but they are a gre...","['B-Lead', 'I-Lead', 'I-Lead', 'I-Lead', 'I-Le...",3
3,B0070361406D,"The author who wrote ""The challenge of Explori...","['B-Lead', 'I-Lead', 'I-Lead', 'I-Lead', 'I-Le...",0
4,839F4F7F7DD7,Our school systems have seen many changes as t...,"['B-Lead', 'I-Lead', 'I-Lead', 'I-Lead', 'I-Le...",4
...,...,...,...,...
15589,DB0E754610DB,In my opinion I think that the author did a gr...,"['B-Position', 'I-Position', 'I-Position', 'I-...",2
15590,709156BCF733,Dear Principal:\n\nI think students must have ...,"['O', 'O', 'B-Position', 'I-Position', 'I-Posi...",2
15591,A0F328ADB40C,I believe that students should not be allowed ...,"['B-Position', 'I-Position', 'I-Position', 'I-...",4
15592,688370E9E6F1,i think that technology is valuable because yo...,"['B-Lead', 'I-Lead', 'I-Lead', 'I-Lead', 'I-Le...",0


In [138]:
""" Max Sequence Length: 1778 for after tokenizing """
# token count of every training text; the cell output is the largest one
length_list = sequence_length(CFG, train_df['text'].tolist())
max(length_list)

  0%|          | 0/15594 [00:00<?, ?it/s]

1778

In [133]:
""" Data utils """

def sequence_length(cfg: configuration.CFG, text_list: list) -> list:
    """
    Tokenize every text and count the 1s in its attention mask
    Args:
        cfg: configuration.CFG, forwarded to ner_tokenizing
        text_list: list of raw texts
    Returns:
        list with one count per text, in input order
    """
    return [
        ner_tokenizing(cfg, text)['attention_mask'].count(1)
        for text in tqdm(text_list)
    ]

def check_null(df: pd.DataFrame) -> pd.Series:
    """ Count missing values per column of the input dataframe """
    missing_mask = df.isna()
    return missing_mask.sum()

def kfold(df: pd.DataFrame, cfg) -> pd.DataFrame:
    """
    KFold: add a 'fold' column holding the validation fold number of every row
    The input dataframe is modified in place and also returned.
    Args:
        df: dataframe to split
        cfg: configuration object, reads cfg.n_folds and cfg.seed
    """
    fold = KFold(
        n_splits=cfg.n_folds,
        shuffle=True,
        random_state=cfg.seed
    )
    df['fold'] = -1
    fold_col = df.columns.get_loc('fold')
    for num, (_, vx) in enumerate(fold.split(df)):
        # split() yields positional indices, so write through iloc; label-based .loc
        # only worked when the dataframe carried a default RangeIndex
        df.iloc[vx, fold_col] = int(num)
    return df

def group_kfold(
    df: pd.DataFrame,
    cfg: configuration.CFG,
    target_col: str = 'pct_rank',
    group_col: str = 'ancestor_id',
) -> pd.DataFrame:
    """
    GroupKFold: add a 'fold' column so that rows sharing a group never span two folds
    The input dataframe is modified in place and also returned.
    Args:
        df: dataframe to split
        cfg: configuration.CFG, reads cfg.n_folds
        target_col: column passed as y, default keeps the previous hard-coded name
        group_col: column passed as groups, default keeps the previous hard-coded name
    """
    fold = GroupKFold(
        n_splits=cfg.n_folds,
    )
    df['fold'] = -1
    fold_col = df.columns.get_loc('fold')
    for num, (_, vx) in enumerate(fold.split(X=df, y=df[target_col], groups=df[group_col])):
        # split() yields positional indices, so write through iloc rather than label-based .loc
        df.iloc[vx, fold_col] = int(num)
    return df

def ner_tokenizing(cfg: configuration.CFG, text: str):
    """
    Tokenize one text for the NER pipeline with cfg.tokenizer
    The result is padded/truncated to cfg.max_len and is not converted to tensors here,
    conversion is done in the Custom Dataset Class.
    Params:
        return_offsets_mapping:
            - requests (char_start, char_end) for each token
            - NOTE(review): presumably requires a fast (Rust) tokenizer, confirm for the chosen model
    Args:
        cfg: configuration.CFG, provides tokenizer and max_len
        text: text from dataframe or any other dataset, please pass str type
    Returns:
        whatever cfg.tokenizer returns for these options
    """
    tokenizer_options = dict(
        return_offsets_mapping=True,
        max_length=cfg.max_len,
        padding='max_length',
        truncation=True,
        return_tensors=None,
        add_special_tokens=True,
    )
    return cfg.tokenizer(text, **tokenizer_options)

def load_data(data_path: str) -> pd.DataFrame:
    """
    Read a csv file such as train.csv, test.csv, val.csv into a dataframe
    Args:
        data_path: location handed to pd.read_csv
    """
    return pd.read_csv(data_path)

def labels2ids():
    """
    Build the label -> id table for BIO style tags
    'O' is 0, then every discourse type contributes its 'B-' tag followed by its 'I-' tag:
    'B-Lead': 1, 'I-Lead': 2, 'B-Position': 3, ..., 'B-Concluding Statement': 13, 'I-Concluding Statement': 14
    """
    discourse_types = [
        'Lead', 'Position', 'Claim', 'Counterclaim', 'Rebuttal', 'Evidence', 'Concluding Statement'
    ]
    tags = ['O']
    for discourse in discourse_types:
        tags.extend((f'B-{discourse}', f'I-{discourse}'))
    return dict(zip(tags, range(len(tags))))


def ids2labels():
    """
    Build the id -> label table for BIO style tags
    0 is 'O', then every discourse type contributes its 'B-' tag followed by its 'I-' tag:
    1: 'B-Lead', 2: 'I-Lead', 3: 'B-Position', ..., 13: 'B-Concluding Statement', 14: 'I-Concluding Statement'
    """
    discourse_types = [
        'Lead', 'Position', 'Claim', 'Counterclaim', 'Rebuttal', 'Evidence', 'Concluding Statement'
    ]
    tags = ['O']
    for discourse in discourse_types:
        tags.extend((f'B-{discourse}', f'I-{discourse}'))
    return dict(enumerate(tags))

def split_mapping(unsplit):
    """
    Map every character position of the text to the index of the split() word covering it
    Positions that belong to no word (whitespace) keep the value -1.
    """
    char_to_word = np.full(len(unsplit), -1)
    cursor = 0
    for word_number, word in enumerate(unsplit.split()):
        # first occurrence of the word at or after the cursor
        cursor = unsplit.find(word, cursor)
        word_end = cursor + len(word)
        char_to_word[cursor:word_end] = word_number
        cursor = word_end
    return char_to_word

def sorted_quantile(array: list, q: float):
    """
    Linearly interpolated quantile of values that are already sorted
    The input is not sorted here, which avoids re-sorting for every sequence.
    Args:
        array: list of element, expected in ascending order
        q: cumulative probability of the wanted spot
    Reference:
        https://stackoverflow.com/questions/60467081/linear-interpolation-in-numpy-quantile
        https://www.kaggle.com/code/chasembowers/sequence-postprocessing-v2-67-lb/notebook
    """
    values = np.array(array)
    position = (len(values) - 1) * q
    lower = np.floor(position).astype(int)
    weight = position - lower
    # step to the next element only when the position falls between two elements
    upper = lower + (weight > 0).astype(int)
    low_value, high_value = values[lower], values[upper]
    return low_value + (high_value - low_value) * weight

# These names are rebound from the factory functions to the dicts they return.
# The callable() guard keeps the cell re-runnable: once rebound, a second plain
# `labels2ids()` call would raise TypeError because a dict is not callable.
labels2ids = labels2ids() if callable(labels2ids) else labels2ids
ids2labels = ids2labels() if callable(ids2labels) else ids2labels

In [105]:
""" Model Utils """
def freeze(module) -> None:
    """
    Turn off gradient tracking for every parameter of the module.

    [Example]
    freezing embeddings and first 2 layers of encoder
    1) freeze(model.embeddings)
    2) freeze(model.encoder.layer[:2])
    """
    for weight in module.parameters():
        weight.requires_grad_(False)

def reinit_topk(model, num_layers):
    """
    Re-initialize the last-k transformer Encoder layers with model._init_weights.
    Args:
        model: The target transformer model, must expose `encoder` and `_init_weights`.
        num_layers: The number of layers to be re-initialized, nothing happens for values <= 0.
    """
    if num_layers <= 0:
        return
    encoder = model.encoder
    # The rest of this notebook addresses the layer stack as `encoder.layer`;
    # `encoder.layers` is kept as a fallback for models that name it that way.
    encoder_layers = getattr(encoder, 'layer', None)
    if encoder_layers is None:
        encoder_layers = encoder.layers
    encoder_layers[-num_layers:].apply(model._init_weights)

In [106]:
""" Trainer Utils """
def get_optimizer_grouped_parameters(model, layerwise_lr, layerwise_weight_decay, layerwise_lr_decay):
    """
    Grouped Version: Layer-wise learning rate decay
    The first group holds the task specific parameters (names without "model").
    Then, from the last encoder layer down to the embeddings, every layer contributes
    one group with weight decay and one without, and the lr shrinks by layerwise_lr_decay per layer.
    Args:
        model: wrapper exposing the backbone as `model.model`, or the backbone itself
        layerwise_lr: lr of the task specific group and of the last encoder layer
        layerwise_weight_decay: weight decay for parameters outside of no_decay
        layerwise_lr_decay: multiplicative lr factor applied per layer
    Returns:
        list of parameter group dicts for a torch optimizer
    """
    no_decay = ("bias", "LayerNorm.bias", "LayerNorm.weight")

    # initialize lr for task specific layer
    optimizer_grouped_parameters = [{
        "params": [p for n, p in model.named_parameters() if "model" not in n],
        "weight_decay": 0.0,
        "lr": layerwise_lr,
    }]

    # The head filter above assumes backbone parameters are prefixed with "model", i.e. a wrapper;
    # the layer walk therefore has to start from the wrapped backbone when `embeddings` is not
    # available on the object itself.
    backbone = model if hasattr(model, 'embeddings') else model.model
    encoder_layers = getattr(backbone.encoder, 'layer', None)
    if encoder_layers is None:
        encoder_layers = backbone.encoder.layers

    # initialize lrs for every layer, last encoder layer first
    layers = [backbone.embeddings] + list(encoder_layers)
    layers.reverse()
    lr = layerwise_lr
    for layer in layers:
        decayed, undecayed = [], []
        for n, p in layer.named_parameters():
            (undecayed if any(nd in n for nd in no_decay) else decayed).append(p)
        optimizer_grouped_parameters += [
            {"params": decayed, "weight_decay": layerwise_weight_decay, "lr": lr},
            {"params": undecayed, "weight_decay": 0.0, "lr": lr},
        ]
        lr *= layerwise_lr_decay
    return optimizer_grouped_parameters

def collate(inputs):
    """
    Trim every entry of the batch to the longest sequence found in attention_mask
    The mapping is updated in place and returned; every value is sliced on its second axis.
    """
    longest = int(inputs["attention_mask"].sum(axis=1).max())
    for key in list(inputs.keys()):
        inputs[key] = inputs[key][:, :longest]
    return inputs


def get_swa_scheduler(cfg, optimizer):
    """
    SWA Scheduler: build torch.optim.swa_utils.SWALR from the SWA options of cfg
    Args:
        cfg: configuration object, reads swa_lr, anneal_epochs and anneal_strategy
        optimizer: optimizer wrapped by the scheduler
    """
    return torch.optim.swa_utils.SWALR(
        optimizer,
        swa_lr=cfg.swa_lr,
        anneal_epochs=cfg.anneal_epochs,
        anneal_strategy=cfg.anneal_strategy,
    )


def get_scheduler(cfg, optimizer, len_train: int):
    """
    Select Scheduler Function
    Args:
        cfg: configuration object, reads scheduler, batch_size, epochs,
             n_gradient_accumulation_steps, warmup_ratio and num_cycles
        optimizer: optimizer driven by the scheduler
        len_train: number of training samples
    Returns:
        learning rate scheduler created by the matching transformers factory
    Raises:
        KeyError: cfg.scheduler is not one of 'cosine_annealing', 'cosine', 'linear'
    """
    # only AutoTokenizer is imported by name at the top of the notebook
    import transformers

    scheduler_dict = {
        'cosine_annealing': 'get_cosine_with_hard_restarts_schedule_with_warmup',
        'cosine': 'get_cosine_schedule_with_warmup',
        'linear': 'get_linear_schedule_with_warmup'
    }
    factory = getattr(transformers, scheduler_dict[cfg.scheduler])

    total_steps = int(len_train / cfg.batch_size * cfg.epochs / cfg.n_gradient_accumulation_steps)
    schedule_kwargs = {
        # step counts are integers, the previous product with warmup_ratio was a float
        'num_warmup_steps': int(total_steps * cfg.warmup_ratio),
        'num_training_steps': total_steps,
    }
    # the linear schedule has no num_cycles argument, passing it raised TypeError
    if cfg.scheduler != 'linear':
        schedule_kwargs['num_cycles'] = cfg.num_cycles

    lr_scheduler = factory(optimizer, **schedule_kwargs)
    return lr_scheduler

In [107]:
""" Loss & Metric Function """

class SmoothL1Loss(nn.Module):
    """
    Smooth L1 Loss in Pytorch
    Args:
        reduction: reduction mode forwarded to the torch loss
    """
    def __init__(self, reduction):
        super().__init__()
        self.reduction = reduction

    def forward(self, y_pred, y_true) -> Tensor:
        return F.smooth_l1_loss(y_pred, y_true, reduction=self.reduction)


# Cross-Entropy Loss
class CrossEntropyLoss(nn.Module):
    """
    Cross-Entropy Loss in Pytorch
    Args:
        reduction: reduction mode forwarded to the torch loss
    """
    def __init__(self, reduction):
        super().__init__()
        self.reduction = reduction

    def forward(self, y_pred, y_true) -> Tensor:
        return F.cross_entropy(y_pred, y_true, reduction=self.reduction)


# Binary Cross-Entropy Loss
class BinaryCrossEntropyLoss(nn.Module):
    """
    Binary Cross-Entropy Loss on logits in Pytorch
    Args:
        reduction: reduction mode forwarded to the torch loss
    """
    def __init__(self, reduction):
        super().__init__()
        self.reduction = reduction

    def forward(self, y_pred, y_true) -> Tensor:
        return F.binary_cross_entropy_with_logits(y_pred, y_true, reduction=self.reduction)

def recall(y_true, y_pred) -> float:
    """
    Actual positives that the model predicted to be positive, averaged over samples
    Every sample is a whitespace separated string that is compared as a set of tokens.
    A sample without any ground truth token scores 0 instead of producing NaN.
    Math:
        recall = tp / (tp + fn)
    Args:
        y_true: iterable of ground truth strings (e.g. pd.Series)
        y_pred: iterable of predicted strings, aligned with y_true
    Returns:
        mean recall rounded to 4 digits, 0.0 for empty input
    """
    true_sets = [set(x.split()) for x in y_true]
    pred_sets = [set(x.split()) for x in y_pred]
    tp = np.array([len(t & p) for t, p in zip(true_sets, pred_sets)], dtype=float)
    fn = np.array([len(t - p) for t, p in zip(true_sets, pred_sets)], dtype=float)
    if tp.size == 0:
        return 0.0
    denominator = tp + fn
    # guarded division: 0 / 0 would yield NaN and poison the mean
    score = np.divide(tp, denominator, out=np.zeros_like(tp), where=denominator > 0)
    return round(score.mean(), 4)

def precision(y_true, y_pred) -> float:
    """
    Actual positives among the model's positive predictions, averaged over samples
    Every sample is a whitespace separated string that is compared as a set of tokens.
    A sample without any predicted token scores 0 instead of producing NaN.
    Math:
        precision = tp / (tp + fp)
    Args:
        y_true: iterable of ground truth strings (e.g. pd.Series)
        y_pred: iterable of predicted strings, aligned with y_true
    Returns:
        mean precision rounded to 4 digits, 0.0 for empty input
    """
    true_sets = [set(x.split()) for x in y_true]
    pred_sets = [set(x.split()) for x in y_pred]
    tp = np.array([len(t & p) for t, p in zip(true_sets, pred_sets)], dtype=float)
    fp = np.array([len(p - t) for t, p in zip(true_sets, pred_sets)], dtype=float)
    if tp.size == 0:
        return 0.0
    denominator = tp + fp
    # guarded division: 0 / 0 would yield NaN and poison the mean
    score = np.divide(tp, denominator, out=np.zeros_like(tp), where=denominator > 0)
    return round(score.mean(), 4)

def f_beta(y_true, y_pred, beta: float = 1) -> float:
    """
    F-beta score computed per sample and then averaged (beta == 1 gives F1)
    Every sample is a whitespace separated string that is compared as a set of tokens.
    Element Explanation:
        tp: true positive, token present in prediction and ground truth
        fp: false positive, token only present in prediction
        fn: false negative, token only present in ground truth
    Math:
        f_beta = (1 + beta ** 2) * precision * recall / (beta ** 2 * precision + recall)
        if you want to emphasize precision, set beta < 1, options: 0.3, 0.6
        if you want to emphasize recall, set beta > 1, options: 1.5, 2
    Every ratio with a zero denominator counts as 0, so a sample without any overlap
    scores 0 instead of turning the whole mean into NaN.
    Args:
        y_true: iterable of ground truth strings (e.g. pd.Series)
        y_pred: iterable of predicted strings, aligned with y_true
        beta: weight of recall against precision
    Returns:
        mean f_beta rounded to 4 digits, 0.0 for empty input
    Reference:
        https://blog.naver.com/PostView.naver?blogId=wideeyed&logNo=221531998840&parentCategoryNo=&categoryNo=2&
    """
    def _safe_ratio(numerator, denominator):
        return np.divide(numerator, denominator, out=np.zeros_like(numerator), where=denominator > 0)

    true_sets = [set(x.split()) for x in y_true]
    pred_sets = [set(x.split()) for x in y_pred]
    tp = np.array([len(t & p) for t, p in zip(true_sets, pred_sets)], dtype=float)
    fp = np.array([len(p - t) for t, p in zip(true_sets, pred_sets)], dtype=float)
    fn = np.array([len(t - p) for t, p in zip(true_sets, pred_sets)], dtype=float)
    if tp.size == 0:
        return 0.0

    f_precision = _safe_ratio(tp, tp + fp)
    f_recall = _safe_ratio(tp, tp + fn)
    score = _safe_ratio(
        (1 + beta ** 2) * f_precision * f_recall,
        beta ** 2 * f_precision + f_recall,
    )
    return round(score.mean(), 4)

def calc_overlap(row: pd.Series) -> list:
    """
    Overlap ratios between one prediction and one ground truth (gt), used to decide true positives
    Both strings are split on single spaces and compared as sets of tokens.
    Returns:
        [shared / number of gt tokens (Recall), shared / number of predicted tokens (Precision)]
    """
    pred_tokens = set(row.predictionstring_pred.split(' '))
    gt_tokens = set(row.predictionstring_gt.split(' '))
    shared = len(gt_tokens & pred_tokens)
    return [shared / len(gt_tokens), shared / len(pred_tokens)]


def _overlap_ratios(pred_string: str, gt_string: str) -> tuple:
    """ Return (shared / gt tokens, shared / pred tokens) for two space separated token strings """
    pred_tokens = set(pred_string.split(' '))
    gt_tokens = set(gt_string.split(' '))
    shared = len(gt_tokens & pred_tokens)
    return shared / len(gt_tokens), shared / len(pred_tokens)


def score_feedback_comp(pred_df: pd.DataFrame, gt_df: pd.DataFrame) -> float:
    """
    Function for scoring for competition
    Step 1:
        Outer-join predictions and ground truths on (id, class) so that every prediction
        is compared with every ground truth of the same class in the same document
    Step 2:
        A pair is a potential true positive when
        overlap / ground truth length >= 0.5 (Recall) and overlap / prediction length >= 0.5 (Precision).
        If multiple matches exist for one ground truth, the match with the highest overlap is taken.
    Step 3:
        Any unmatched predictions are false positives and any unmatched ground truths are false negatives.
        Rows created by the outer join for a missing partner carry a NaN id; these are dropped
        before counting, otherwise NaN would be counted as one extra false positive / false negative.
    Step 4:
        Micro F1 = tp / (tp + 0.5 * (fp + fn)), 0.0 when there is nothing to score
    Args:
        pred_df: predictions with columns id, class, predictionstring
        gt_df: ground truth with columns id, discourse_type, predictionstring
    Reference:
        https://www.kaggle.com/c/feedback-prize-2021/overview/evaluation
    """
    gt_df = gt_df[['id', 'discourse_type', 'predictionstring']].reset_index(drop=True).copy()
    pred_df = pred_df[['id', 'class', 'predictionstring']].reset_index(drop=True).copy()
    pred_df['pred_id'] = pred_df.index
    gt_df['gt_id'] = gt_df.index

    # Stage 1. Make dataframe
    joined = pred_df.merge(
        gt_df,
        left_on=['id', 'class'],
        right_on=['id', 'discourse_type'],
        how='outer',
        suffixes=('_pred', '_gt')
    )
    if joined.empty:
        return 0.0
    # ' ' splits into empty tokens only, so a missing partner never overlaps with a real string
    joined['predictionstring_gt'] = joined['predictionstring_gt'].fillna(' ')
    joined['predictionstring_pred'] = joined['predictionstring_pred'].fillna(' ')

    # Stage 2. Calculate Potential True Positive
    ratios = [
        _overlap_ratios(pred_string, gt_string)
        for pred_string, gt_string in zip(joined['predictionstring_pred'], joined['predictionstring_gt'])
    ]
    joined['overlap1'] = [ratio[0] for ratio in ratios]  # Recall
    joined['overlap2'] = [ratio[1] for ratio in ratios]  # Precision

    joined['potential_TP'] = (joined['overlap1'] >= 0.5) & (joined['overlap2'] >= 0.5)
    joined['max_overlap'] = joined[['overlap1', 'overlap2']].max(axis=1)
    candidates = joined.query('potential_TP')
    tp_pred_ids = candidates \
        .sort_values('max_overlap', ascending=False) \
        .groupby(['id', 'predictionstring_gt']).first()['pred_id'].values

    # Stage 3. Calculate False Positive / False Negative (sets give O(1) membership tests)
    tp_lookup = set(tp_pred_ids)
    fp_pred_ids = [p for p in joined['pred_id'].dropna().unique() if p not in tp_lookup]

    matched_gt_ids = set(candidates['gt_id'].unique())
    unmatched_gt_ids = [c for c in joined['gt_id'].dropna().unique() if c not in matched_gt_ids]

    # Stage 4. Calculate Micro F1-Score for Cross Validation
    tp = len(tp_pred_ids)
    fp = len(fp_pred_ids)
    fn = len(unmatched_gt_ids)
    denominator = tp + 0.5 * (fp + fn)
    if denominator == 0:
        return 0.0
    return tp / denominator

In [108]:
""" Debugging NERDataset """
# NOTE(review): the lines below are a commented-out scratch copy of the labelling loop; nothing in this cell is executed.
# test_text = train_df.text[0]
# test_token = ner_tokenizing(CFG, test_text)
# word_ids, offset_to_wordidx, offsets = test_token.word_ids(), split_mapping(test_text), test_token['offset_mapping']
# split_word_ids = np.full(len(word_ids), -1)

# len(offset_to_wordidx), offset_to_wordidx, offsets
# is_train = True 
# label_ids = []
# word_labels = ast.literal_eval(train_df.entities[0])
# for token_idx, word_idx in reversed(list(enumerate(word_ids))):
#     if word_idx is None:
#         """ for padding token """
#         if is_train:
#             label_ids.append(-100)
#     else:
#         if offsets[token_idx] != (0, 0):
#             # Choose the split word that shares the most characters with the token if any
#             split_idxs = offset_to_wordidx[offsets[token_idx][0]:offsets[token_idx][1]]
#             split_index = stats.mode(split_idxs[split_idxs != -1]).mode[0] if len(np.unique(split_idxs)) > 1 else split_idxs[0]
#             if split_index != -1:
#                 if is_train:
#                     label_ids.append(labels2ids[word_labels[split_index]])
#                 split_word_ids[token_idx] = split_index
#             else:
#                 # Even if we don't find a word, continue labeling 'I' tokens until a 'B' token is found
#                 if label_ids and label_ids[-1] != -100 and ids2labels[label_ids[-1]][0] == 'I':
#                     split_word_ids[token_idx] = split_word_ids[token_idx + 1]
#                     if is_train:
#                         label_ids.append(label_ids[-1])
#                 else:
#                     if is_train:
#                         label_ids.append(-100)
#         else:
#             if is_train:
#                 label_ids.append(-100)

' Debugging NERDataset '

In [109]:
""" Custom Dataset Class """

class NERDataset(Dataset):
    """
    Custom Dataset Class for NER Task
    Args:
        cfg: configuration.CFG
        df: dataframe with a 'text' column, and an 'entities' column when is_train is True
        is_train: if this param set False, no labels are built and word_ids is returned in addition
    """
    def __init__(self, cfg: configuration.CFG, df: pd.DataFrame, is_train: bool = True) -> None:
        self.cfg = cfg
        self.df = df
        self.tokenizer = ner_tokenizing
        # This notebook rebinds the module-level names `labels2ids` / `ids2labels` from the
        # factory functions to the dicts they return, so accept both forms here.
        self.labels2ids = labels2ids() if callable(labels2ids) else labels2ids  # label -> id
        self.ids2labels = ids2labels() if callable(ids2labels) else ids2labels  # id -> label
        self.is_train = is_train

    def __len__(self) -> int:
        return len(self.df)

    @staticmethod
    def _dominant_word_index(split_idxs) -> int:
        """
        Pick the split() word that shares the most characters with one token
        Args:
            split_idxs: word index per character of the token, -1 for characters outside any word
        Returns:
            most frequent word index (smallest on ties), -1 when no character belongs to a word
        """
        covered = split_idxs[split_idxs != -1]
        if len(covered) == 0:
            return -1
        # np.unique returns sorted values, argmax keeps the first maximum -> smallest index on ties.
        # This replaces stats.mode(...).mode[0], which fails once mode returns a scalar.
        values, counts = np.unique(covered, return_counts=True)
        return int(values[np.argmax(counts)])

    def __getitem__(self, item: int) -> dict:
        """
        1) Tokenizing input text with ner_tokenizing (offset mapping included)
        2) Create targets and mapping of tokens to split() words
            - Iterate in reverse, so a token without any word can inherit the 'I' label
              of the token that follows it in the text
            - Tokens without a word index or with offset (0, 0) are labelled -100
        3) Return the encoding with every value converted to torch.Tensor:
            - Train: tokenizer outputs + labels
            - Validation/Test: tokenizer outputs + labels (empty) + word_ids
        """
        # positional access: `item` is a position in [0, len(self)), not an index label
        text = self.df['text'].iloc[item]
        word_labels = ast.literal_eval(self.df['entities'].iloc[item]) if self.is_train else None

        # 1) Tokenizing input text
        encoding = self.tokenizer(
            self.cfg,
            text,
        )
        word_ids = encoding.word_ids()
        split_word_ids = np.full(len(word_ids), -1)
        offset_to_wordidx = split_mapping(text)  # word index per character of text
        offsets = encoding['offset_mapping']  # [(src, end), (src, end), ...]

        # 2) Find having same parent token and then label BIO NER Tags
        label_ids = []  # built back to front, only filled when is_train
        for token_idx, word_idx in reversed(list(enumerate(word_ids))):
            label = -100
            if word_idx is not None and tuple(offsets[token_idx]) != (0, 0):
                start, end = offsets[token_idx]
                split_index = self._dominant_word_index(offset_to_wordidx[start:end])
                if split_index != -1:
                    split_word_ids[token_idx] = split_index
                    if self.is_train:
                        label = self.labels2ids[word_labels[split_index]]
                elif label_ids and label_ids[-1] != -100 and self.ids2labels[label_ids[-1]][0] == 'I':
                    # Even if we don't find a word, continue labeling 'I' tokens until a 'B' token is found
                    split_word_ids[token_idx] = split_word_ids[token_idx + 1]
                    label = label_ids[-1]
            if self.is_train:
                label_ids.append(label)

        encoding['labels'] = list(reversed(label_ids))  # restore to original order
        if not self.is_train:
            encoding['word_ids'] = split_word_ids
        # convert in both modes, previously the validation/test path returned plain python lists
        for k, v in encoding.items():
            encoding[k] = torch.as_tensor(v)
        return encoding

In [110]:
""" Custom Model Class """

class NERModel(nn.Module):
    """
    Token classification model for the NER pipeline: pretrained backbone + linear head, no pooling layer
    The head has 15 outputs for B.I.O style tags:
    7 discourse classes x (B, I) = 14 classes, plus 1 class for O
    Args:
        cfg: configuration.CFG
    """
    def __init__(self, cfg: configuration.CFG) -> None:
        super().__init__()
        self.cfg = cfg
        self.auto_cfg = AutoConfig.from_pretrained(cfg.model, output_hidden_states=True)
        self.model = AutoModel.from_pretrained(cfg.model, config=self.auto_cfg)
        self.fc = nn.Linear(self.auto_cfg.hidden_size, 15)  # BIO Style NER Task

        # optional re-initialization of the head and of the last encoder layers
        if self.cfg.reinit:
            self._init_weights(self.fc)
            reinit_topk(self.model, cfg.num_reinit)

        # optional freezing of the embeddings and of the first encoder layers
        if self.cfg.freeze:
            freeze(self.model.embeddings)
            freeze(self.model.encoder.layer[:cfg.num_freeze])

        if self.cfg.gradient_checkpoint:
            self.model.gradient_checkpointing_enable()

    def _init_weights(self, module) -> None:
        """
        Initialize one module in place according to cfg.init_weight
        Linear: selected scheme for the weight (untouched for an unknown scheme), bias set to zero
        Embedding: normal init, padding row set to zero
        LayerNorm: weight one, bias zero
        """
        if isinstance(module, nn.Linear):
            scheme = self.cfg.init_weight
            if scheme == 'normal':
                module.weight.data.normal_(mean=0.0, std=self.auto_cfg.initializer_range)
            else:
                initializer = {
                    'xavier_uniform': nn.init.xavier_uniform_,
                    'xavier_normal': nn.init.xavier_normal_,
                    'kaiming_uniform': nn.init.kaiming_uniform_,
                    'kaiming_normal': nn.init.kaiming_normal_,
                    'orthogonal': nn.init.orthogonal_,
                }.get(scheme)
                if initializer is not None:
                    module.weight.data = initializer(module.weight.data)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.auto_cfg.initializer_range)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.LayerNorm):
            # same values as torch.nn.LayerNorm with elementwise_affine=True
            module.weight.data.fill_(1.0)
            module.bias.data.zero_()

    def feature(self, inputs: dict):
        """ Run the backbone, every entry of inputs is passed as keyword argument """
        return self.model(**inputs)

    def forward(self, inputs: dict[Tensor, Tensor, Tensor]) -> Tensor:
        """
        No Pooling Layer for word-level task
        Args:
            inputs: Dict type from AutoTokenizer
            => {input_ids: Tensor[], attention_mask: Tensor[], token_type_ids: Tensor[]}
        Returns:
            logits from the linear head applied to last_hidden_state
        """
        hidden_states = self.feature(inputs).last_hidden_state
        return self.fc(hidden_states)