In [None]:
!nvidia-smi

In [None]:
! pip install tqdm
! pip install overrides
! pip install torch==1.7.0
! pip install transformers==3.5.1
! pip install pytorch-lightning==1.1.0
! wget https://github.com/KLUE-benchmark/KLUE/raw/main/klue_benchmark/klue-ner-v1.1/klue-ner-v1.1_train.tsv
! wget https://github.com/KLUE-benchmark/KLUE/raw/main/klue_benchmark/klue-ner-v1.1/klue-ner-v1.1_dev.tsv

In [None]:
import argparse
import dataclasses
import json
import logging
import math
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

import pytorch_lightning as pl
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset

# huggingface's transformers library
from transformers import AdamW, AutoConfig, AutoTokenizer, PretrainedConfig, PreTrainedTokenizer ,AutoModelForTokenClassification
from transformers.optimization import (
    Adafactor,
    get_cosine_schedule_with_warmup,
    get_cosine_with_hard_restarts_schedule_with_warmup,
    get_linear_schedule_with_warmup,
    get_polynomial_decay_schedule_with_warmup,
)

# the tqdm library used to show the iteration progress
import tqdm
import tqdm.notebook  # imported explicitly so that `tqdm.notebook` is guaranteed to be loaded
tqdmn = tqdm.notebook.tqdm

In [None]:
# Identifier of the pretrained checkpoint whose tokenizer is loaded below.
roberta_version = 'klue/roberta-large'
# Module-level tokenizer; _create_examples and _convert_features read this global directly.
tokenizer = AutoTokenizer.from_pretrained(roberta_version)

In [None]:
# Module-level logger named after the current module.
logger = logging.getLogger(__name__)

# Selects the sub-word marker stripped in _create_examples ("bert-wp" -> "##", "xlm-sp" -> "▁").
tokenizer_type ="bert-wp"
# Shared settings namespace; _convert_features reads max_seq_length, and
# _create_examples attaches `data` / `tokenizer` to it for non-training splits.
hparams = argparse.Namespace(max_seq_length=128)


@dataclass
class InputExample:
    """A single raw (untokenized) example.

    This is for YNAT, KLUE-NLI, KLUE-NER, and KLUE-RE.

    Args:
        guid: Unique id for the example.
        text_a: string. The untokenized text of the first sequence. For single
            sequence tasks, only this sequence must be specified.
        text_b: (Optional) string. The untokenized text of the second sequence.
            Only must be specified for sequence pair tasks.
        label: (Optional) The label of the example. A single string for
            classification-style tasks, or a list of tag strings for tagging
            (this file passes a list of tags for NER). This should be specified
            for train and dev examples, but not for test examples.
    """

    guid: str
    text_a: str
    text_b: Optional[str] = None
    label: Optional[Union[str, List[str]]] = None

    def to_dict(self) -> Dict[str, Union[str, List[str], None]]:
        """Return the fields of this example as a plain dictionary."""
        return dataclasses.asdict(self)

    def to_json_string(self) -> str:
        """Serializes this instance to a JSON string (2-space indent, trailing newline)."""
        return json.dumps(self.to_dict(), indent=2) + "\n"

@dataclass(frozen=True)
class InputFeatures:
    """A single set of features of data to feed into the pretrained models.

    This is for YNAT, KLUE-STS, KLUE-NLI, KLUE-NER, and KLUE-RE. Property names
    are same with the corresponding inputs to a model. Instances are frozen
    (fields cannot be reassigned after construction).

    Args:
        input_ids: Indices of input sequence tokens in the vocabulary.
        attention_mask: Mask to avoid performing attention on padding token indices.
            Mask values selected in ``[0, 1]``: Usually ``1`` for tokens that are NOT MASKED, ``0`` for MASKED (padded)
            tokens.
        token_type_ids: (Optional) Segment token indices to indicate first and second
            portions of the inputs. Only some models use them.
        label: (Optional) Label corresponding to the input. Int for classification problems,
            float for regression problems, list of ints (one per position) for tagging.
    """

    input_ids: List[int]
    attention_mask: Optional[List[int]] = None
    token_type_ids: Optional[List[int]] = None
    label: Optional[Union[int, float, List[int]]] = None

    def to_json_string(self) -> str:
        """Serializes this instance to a single-line JSON string followed by a newline."""
        return json.dumps(dataclasses.asdict(self)) + "\n"

def convert_examples_to_features(
    examples: List[InputExample],
    tokenizer: "PreTrainedTokenizer",
    label_list: List[str],
    max_length: Optional[int] = None,
    task_mode: Optional[str] = None,
) -> List[InputFeatures]:
    """Converts dataset in InputExample to dataset in InputFeatures to feed into pretrained models.

    This is for YNAT, KLUE-STS, KLUE-NLI, and KLUE-NER.

    Args:
        examples: List of InputExample converted from the raw dataset.
        tokenizer: Tokenizer of the pretrained model.
        label_list: List of labels of the task.
        max_length: Maximum length of the input tokens. When ``None`` the
            tokenizer's own maximum length is used.
        task_mode: Task type; one of ``"classification"``, ``"regression"``
            or ``"tagging"``.

    Returns:
        features: List of InputFeatures for the task and model, in the same
            order as ``examples``.

    Raises:
        KeyError: If an example carries a label and ``task_mode`` is not one of
            the supported values (or the label is missing from ``label_list``).
    """
    if max_length is None:
        # `max_len` is the deprecated spelling of `model_max_length`; prefer the
        # current attribute and only fall back for tokenizers that lack it.
        max_length = getattr(tokenizer, "model_max_length", None)
        if max_length is None:
            max_length = tokenizer.max_len

    label_map = {label: i for i, label in enumerate(label_list)}

    def label_from_example(example: InputExample) -> Union[int, float, None, List[int]]:
        """Map one example's label to the numeric form required by ``task_mode``."""
        if example.label is None:
            return None
        if task_mode == "classification":
            return label_map[example.label]
        elif task_mode == "regression":
            return float(example.label)
        elif task_mode == "tagging":  # See KLUE paper: https://arxiv.org/pdf/2105.09680.pdf
            # Every position starts as "O"; position 0 ([CLS]) and the final
            # position stay "O" because at most max_length - 2 tags are copied,
            # shifted right by one.
            token_label = [label_map["O"]] * (max_length)
            for i, label in enumerate(example.label[: max_length - 2]):  # last [SEP] label -> 'O'
                token_label[i + 1] = label_map[label]  # first [CLS] label -> 'O'
            return token_label
        raise KeyError(task_mode)

    labels = [label_from_example(example) for example in examples]

    batch_encoding = tokenizer(
        [(example.text_a, example.text_b) for example in examples],
        max_length=max_length,
        padding="max_length",
        truncation=True,
    )

    features = []
    for i, label in enumerate(labels):
        # Slice the i-th row out of every field returned by the tokenizer.
        inputs = {k: batch_encoding[k][i] for k in batch_encoding}
        features.append(InputFeatures(**inputs, label=label))

    # Log the first few converted examples for inspection.
    for example, feature in zip(examples[:5], features):
        logger.info("*** Example ***")
        logger.info("guid: %s", example.guid)
        logger.info("features: %s", feature)

    return features



def get_labels() -> List[str]:
    """Return the tag vocabulary: a ``B-``/``I-`` pair per entity code, then ``"O"``.

    The position of a tag in the returned list is the integer id used for it.
    A new list is built on every call.
    """
    entity_codes = ("PS", "LC", "OG", "DT", "TI", "QT")
    tags = [f"{prefix}-{code}" for code in entity_codes for prefix in ("B", "I")]
    tags.append("O")
    return tags

def _create_examples(file_path: str, dataset_type: str) -> List[InputExample]:
    """Loads the raw dataset and converts to InputExample.

    Since the ner dataset is tagged in character-level, subword-level token
    label should be aligned with the given unit. Here, we take the first
    character label for the token label.

    Args:
        file_path: Path of the TSV file. Documents are separated by blank
            lines; a line starting with ``##`` carries the guid, every other
            line is ``<character>\\t<tag>``.
        dataset_type: Split name. For anything other than ``"train"`` the
            original sentences/labels and the tokenizer are also stored on the
            module-level ``hparams``.

    Returns:
        One InputExample per document, with ``label`` holding one tag per
        sub-word token.

    Raises:
        ValueError: If the module-level ``tokenizer_type`` is neither
            ``"xlm-sp"`` nor ``"bert-wp"``.
    """
    is_training = dataset_type == "train"
    if tokenizer_type == "xlm-sp":
        strip_char = "▁"
    elif tokenizer_type == "bert-wp":
        strip_char = "##"
    else:
        raise ValueError("This code only supports XLMRobertaTokenizer & BertWordpieceTokenizer")

    examples = []
    ori_examples = []
    file_path = Path(file_path)
    # Decode explicitly as UTF-8 so the Korean text does not depend on the
    # platform's default encoding.
    raw_text = file_path.read_text(encoding="utf-8").strip()
    raw_docs = re.split(r"\n\t?\n", raw_text)
    for doc in raw_docs:
        original_clean_tokens = []  # clean tokens (bert clean func)
        original_clean_labels = []  # clean labels (bert clean func)
        sentence = ""
        for line in doc.split("\n"):
            if line[:2] == "##":
                # Header line: the guid is the first tab-separated field without the "##" marker.
                guid = line.split("\t")[0].replace("##", "")
                continue
            token, tag = line.split("\t")
            sentence += token
            if token == " ":
                # Spaces stay in the sentence but carry no label.
                continue
            original_clean_tokens.append(token)
            original_clean_labels.append(tag)
        # sentence: "안녕 하세요.."
        # original_clean_labels: one tag per non-space character [안, 녕, 하, 세, 요, ., .]
        sent_words = sentence.split(" ")
        # sent_words: [안녕, 하세요..]
        modi_labels = []
        char_idx = 0  # index into original_clean_labels of the next unconsumed character
        for word in sent_words:
            # e.g. 안녕, 하세요
            correct_syllable_num = len(word)
            tokenized_word = tokenizer.tokenize(word)
            # case1: syllable-level tokenizer --> [안, ##녕]
            # case2: wordpiece tokenizer --> [안녕]
            # case3: unk from either tokenizer --> [unk]
            # unk rule --> the whole word is turned into unk, except that symbols are split off
            contain_unk = tokenizer.unk_token in tokenized_word
            for piece in tokenized_word:
                piece = piece.replace(strip_char, "")
                if not piece:
                    # The token consisted only of the strip marker: label it "O".
                    modi_labels.append("O")
                    continue
                # The token takes the label of its first character.
                modi_labels.append(original_clean_labels[char_idx])
                if not contain_unk:
                    char_idx += len(piece)
            if contain_unk:
                # Token lengths are unreliable when the word contains unk, so
                # skip the whole word's characters at once.
                char_idx += correct_syllable_num

        text_a = sentence  # original sentence
        examples.append(InputExample(guid=guid, text_a=text_a, label=modi_labels))
        ori_examples.append({"original_sentence": text_a, "original_clean_labels": original_clean_labels})
    if not is_training:
        # Keep the character-level originals (and the tokenizer) on hparams for later evaluation.
        data = getattr(hparams, "data", {})
        data[dataset_type] = {"original_examples": ori_examples}
        setattr(hparams, "data", data)
        setattr(hparams, "tokenizer", tokenizer)
    return examples

def _convert_features(examples: List[InputExample]) -> List[InputFeatures]:
    """Turn examples into tagging features using the module-level tokenizer and hparams."""
    tagging_options = dict(
        label_list=get_labels(),
        max_length=hparams.max_seq_length,
        task_mode="tagging",
    )
    return convert_examples_to_features(examples, tokenizer, **tagging_options)

def _create_dataset(file_path: str, dataset_type: str) -> TensorDataset:
    """Read a TSV split and pack its features into a TensorDataset.

    The tensors are stored in the order: input ids, attention mask,
    token type ids, labels.
    """
    features = _convert_features(_create_examples(file_path, dataset_type))

    def as_long(rows):
        # All four columns are stored as int64 tensors.
        return torch.tensor(rows, dtype=torch.long)

    input_ids = as_long([feat.input_ids for feat in features])
    attention_mask = as_long([feat.attention_mask for feat in features])
    # Some model does not make use of token type ids (e.g. RoBERTa);
    # a feature without them contributes a plain 0 instead.
    token_type_ids = as_long(
        [feat.token_type_ids if feat.token_type_ids is not None else 0 for feat in features]
    )
    labels = as_long([feat.label for feat in features])

    return TensorDataset(input_ids, attention_mask, token_type_ids, labels)


# Build the TensorDataset for the downloaded training split; the result is not bound to a name here.
_create_dataset("./klue-ner-v1.1_train.tsv","train")

In [None]:
from overrides import overrides

class BaseTransformer(pl.LightningModule):
    """Initializes a model, tokenizer and config for the task.

    Subclasses are expected to provide ``training_step``, ``validation_step``
    and ``_convert_outputs_to_preds``; the versions defined here raise
    ``NotImplementedError``.
    """

    # `config.model_type` values for which `is_use_token_type` returns True.
    USE_TOKEN_TYPE_MODELS = ["bert", "xlnet", "electra"]

    def __init__(
        self,
        hparams: argparse.Namespace,
        num_labels: Optional[int] = None,
        mode: str = "base",
        config: Optional[PretrainedConfig] = None,
        model_type: Optional[Any] = None,
        tokenizer: Optional[PreTrainedTokenizer] = None,
        metrics: Optional[Dict[str, Any]] = None,
        **config_kwargs: Any,
    ) -> None:
        """Build the config, tokenizer and model from ``hparams``.

        Args:
            hparams: Namespace with the settings read throughout this class
                (``output_dir``, ``cache_dir``, ``config_name``,
                ``model_name_or_path``, ``tokenizer_name``, ...).
            num_labels: Forwarded to ``AutoConfig.from_pretrained`` when given.
            mode: Not read inside this constructor.
            config: Pre-built config; when ``None`` one is loaded with AutoConfig.
            model_type: Object whose ``from_pretrained`` is called to build
                ``self.model`` (e.g. an ``AutoModelFor...`` class).
            tokenizer: Pre-built tokenizer; when ``None`` one is loaded with
                AutoTokenizer.
            metrics: Mapping of metric name to metric module. ``None`` (the
                default) means no metrics.
            **config_kwargs: Extra keyword arguments for ``AutoConfig.from_pretrained``.
        """
        super().__init__()

        # `data` is detached before save_hyperparameters and re-attached afterwards
        # (presumably to keep the raw examples out of the saved hyperparameters).
        data = getattr(hparams, "data", None)
        if data is not None:
            delattr(hparams, "data")
        self.save_hyperparameters(hparams)
        self.hparams.data = data

        self.step_count = 0
        self.output_dir = Path(self.hparams.output_dir)
        self.predictions: List[int] = []

        cache_dir = self.hparams.cache_dir if self.hparams.cache_dir else None
        if config is None:
            self.config = AutoConfig.from_pretrained(
                self.hparams.config_name if self.hparams.config_name else self.hparams.model_name_or_path,
                **({"num_labels": num_labels} if num_labels is not None else {}),
                cache_dir=cache_dir,
                **config_kwargs,
            )
        else:
            self.config: PretrainedConfig = config  # type: ignore[no-redef]

        # Copy optional dropout-related hparams onto the model config when they are set.
        extra_model_params = ("encoder_layerdrop", "decoder_layerdrop", "dropout", "attention_dropout")
        for p in extra_model_params:
            if getattr(self.hparams, p, None):
                assert hasattr(self.config, p), f"model config doesn't have a `{p}` attribute"
                setattr(self.config, p, getattr(self.hparams, p))

        if tokenizer is None:
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.hparams.tokenizer_name if self.hparams.tokenizer_name else self.hparams.model_name_or_path,
                cache_dir=cache_dir,
            )
        else:
            self.tokenizer = tokenizer
        self.model = model_type.from_pretrained(
            self.hparams.model_name_or_path,
            from_tf=bool(".ckpt" in self.hparams.model_name_or_path),
            config=self.config,
            cache_dir=cache_dir,
        )
        # A fresh dict per instance avoids sharing one mutable default between instances.
        self.metrics = nn.ModuleDict(metrics if metrics is not None else {})
        self.eval_dataset_type = "valid"

    def is_use_token_type(self) -> bool:
        """Return True when the config's model type is listed in USE_TOKEN_TYPE_MODELS."""
        return self.config.model_type in self.USE_TOKEN_TYPE_MODELS

    def get_lr_scheduler(self) -> Any:
        """Build the step-wise scheduler entry for the optimizer stored in ``self.opt``.

        NOTE(review): ``arg_to_scheduler`` is not defined in the visible part of
        this file; it is expected to map ``hparams.lr_scheduler`` to a schedule
        factory -- confirm it exists at module level.
        """
        get_schedule_func = arg_to_scheduler[self.hparams.lr_scheduler]

        scheduler = get_schedule_func(
            self.opt, num_warmup_steps=self.num_warmup_steps(), num_training_steps=self.total_steps()
        )
        scheduler = {"scheduler": scheduler, "interval": "step", "frequency": 1}
        return scheduler

    def configure_optimizers(self) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
        """Prepare optimizer and schedule (linear warmup and decay)"""
        # Parameters whose name contains one of these get no weight decay.
        no_decay = ["bias", "LayerNorm.weight"]
        optimizer_grouped_parameters = [
            {
                "params": [p for n, p in self.named_parameters() if not any(nd in n for nd in no_decay)],
                "weight_decay": self.hparams.weight_decay,
            },
            {
                "params": [p for n, p in self.named_parameters() if any(nd in n for nd in no_decay)],
                "weight_decay": 0.0,
            },
        ]
        if self.hparams.adafactor:
            optimizer = Adafactor(
                optimizer_grouped_parameters, lr=self.hparams.learning_rate, scale_parameter=False, relative_step=False
            )
        else:
            optimizer = AdamW(
                optimizer_grouped_parameters, lr=self.hparams.learning_rate, eps=self.hparams.adam_epsilon
            )
        # get_lr_scheduler reads the optimizer from self.opt, so it must be set first.
        self.opt = optimizer
        scheduler = self.get_lr_scheduler()
        return [optimizer], [scheduler]

    def training_step(self) -> Dict[str, torch.Tensor]:
        """Abstract training step; always raises NotImplementedError here.

        NOTE(review): the subclass in this file overrides this with
        ``(self, batch, batch_idx)`` -- the base signature does not match.
        """
        raise NotImplementedError

    def training_step_end(self, training_step_outputs: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
        """Reduce the step's ``"loss"`` entry to its mean."""
        # For DataParallel
        return {"loss": training_step_outputs["loss"].mean()}

    def validation_step(self, batch: List[torch.Tensor], batch_idx: int, data_type: str) -> Dict[str, torch.Tensor]:
        """Abstract validation step; always raises NotImplementedError here."""
        # return Format: (e.g. dictionary {"logits": logits, "labels": labels})
        raise NotImplementedError

    def validation_epoch_end(
        self, outputs: List[Dict[str, torch.Tensor]], data_type: str = "valid", write_predictions: bool = False
    ) -> None:
        """Convert the collected step outputs to predictions and update/log every metric.

        Args:
            outputs: Per-step dictionaries; each must contain ``"labels"``.
            data_type: Prefix used in the logged metric names.
            write_predictions: When ``True`` the predictions are also stored on
                ``self.predictions``.
        """
        preds = self._convert_outputs_to_preds(outputs)
        labels = torch.cat([output["labels"] for output in outputs], dim=0)

        if write_predictions is True:
            self.predictions = preds

        self._set_metrics_device()
        for k, metric in self.metrics.items():
            metric(preds, labels)
            self.log(f"{data_type}/{k}", metric, on_step=False, on_epoch=True, logger=True)

    def test_step(self, batch: List[torch.Tensor], batch_idx: int) -> Dict[str, torch.Tensor]:
        """Run ``validation_step`` under the currently selected evaluation split."""
        assert self.eval_dataset_type in {"valid", "test"}
        return self.validation_step(batch, batch_idx, data_type=self.eval_dataset_type)

    def test_epoch_end(self, outputs: List[Dict[str, torch.Tensor]]) -> None:
        """Run ``validation_epoch_end`` for the evaluation split and keep the predictions."""
        assert self.eval_dataset_type in {"valid", "test"}
        return self.validation_epoch_end(outputs, data_type=self.eval_dataset_type, write_predictions=True)

    def _convert_outputs_to_preds(self, outputs: List[Dict[str, torch.Tensor]]) -> Any:
        """Abstract hook turning step outputs into predictions; raises NotImplementedError here."""
        # outputs is output (dict, return object from validation_step) of list
        raise NotImplementedError

    def _set_metrics_device(self) -> None:
        """Assign this module's parameter device to every metric whose ``device`` is None."""
        device = next(self.parameters()).device
        for _, metric in self.metrics.items():
            if metric.device is None:
                metric.device = device

    def num_warmup_steps(self) -> Any:
        """Return the warmup step count.

        ``hparams.warmup_steps`` wins when set; otherwise it is derived from
        ``hparams.warmup_ratio`` (rounded up); otherwise it is 0.
        """
        num_warmup_steps = self.hparams.warmup_steps
        if num_warmup_steps is None and self.hparams.warmup_ratio is not None:
            num_warmup_steps = self.total_steps() * self.hparams.warmup_ratio
            num_warmup_steps = math.ceil(num_warmup_steps)

        if num_warmup_steps is None:
            num_warmup_steps = 0
        return num_warmup_steps

    def total_steps(self) -> Any:
        """The number of total training steps that will be run. Used for lr scheduler purposes."""
        num_devices = max(1, self.hparams.num_gpus)  # TODO: consider num_tpu_cores
        effective_batch_size = self.hparams.train_batch_size * self.hparams.accumulate_grad_batches * num_devices
        # True division: the result is a float, not rounded to a whole number of steps.
        return (self.hparams.dataset_size / effective_batch_size) * self.hparams.max_epochs

    @pl.utilities.rank_zero_only
    def on_save_checkpoint(self, checkpoint: Dict[str, Any]) -> None:
        """Save the model and tokenizer under ``<output_dir>/transformers``."""
        save_path = self.output_dir.joinpath("transformers")
        self.model.config.save_step = self.step_count
        self.model.save_pretrained(save_path)
        self.tokenizer.save_pretrained(save_path)

    @staticmethod
    def add_specific_args(parser: argparse.ArgumentParser, root_dir: str) -> argparse.ArgumentParser:
        """Register the model/optimizer command-line options on ``parser`` and return it.

        ``root_dir`` is accepted but not used here.

        NOTE(review): ``arg_to_scheduler_choices`` and ``arg_to_scheduler_metavar``
        are not defined in the visible part of this file -- confirm they exist
        at module level.
        """
        parser.add_argument(
            "--model_name_or_path",
            default=None,
            type=str,
            required=True,
            help="Path to pretrained model or model identifier from huggingface.co/models",
        )
        parser.add_argument(
            "--config_name", default="", type=str, help="Pretrained config name or path if not the same as model_name"
        )
        parser.add_argument(
            "--tokenizer_name",
            default=None,
            type=str,
            help="Pretrained tokenizer name or path if not the same as model_name",
        )
        parser.add_argument(
            "--cache_dir",
            default="",
            type=str,
            help="Where do you want to store the pre-trained models downloaded from s3",
        )
        parser.add_argument(
            "--encoder_layerdrop",
            type=float,
            help="Encoder layer dropout probability (Optional). Goes into model.config",
        )
        parser.add_argument(
            "--decoder_layerdrop",
            type=float,
            help="Decoder layer dropout probability (Optional). Goes into model.config",
        )
        parser.add_argument(
            "--dropout",
            type=float,
            help="Dropout probability (Optional). Goes into model.config",
        )
        parser.add_argument(
            "--attention_dropout",
            type=float,
            help="Attention dropout probability (Optional). Goes into model.config",
        )
        parser.add_argument("--learning_rate", default=5e-5, type=float, help="The initial learning rate for Adam.")
        parser.add_argument(
            "--lr_scheduler",
            default="linear",
            choices=arg_to_scheduler_choices,
            metavar=arg_to_scheduler_metavar,
            type=str,
            help="Learning rate scheduler",
        )
        parser.add_argument("--weight_decay", default=0.0, type=float, help="Weight decay if we apply some.")
        parser.add_argument("--adam_epsilon", default=1e-8, type=float, help="Epsilon for Adam optimizer.")
        parser.add_argument("--warmup_steps", default=None, type=int, help="Linear warmup over warmup_steps.")
        parser.add_argument("--warmup_ratio", default=None, type=float, help="Linear warmup over warmup_step ratio.")
        parser.add_argument("--num_train_epochs", dest="max_epochs", default=4, type=int)
        parser.add_argument("--adafactor", action="store_true")
        parser.add_argument("--verbose_step_count", default=100, type=int)
        return parser

class NERTransformer(BaseTransformer):
    """Token-classification module that evaluates predictions at character level.

    Training and validation steps feed batches of
    ``(input_ids, attention_mask, token_type_ids, labels)`` to the model; at the
    end of validation the sub-word predictions are expanded to one prediction
    per character before the metrics are updated.
    """

    mode = Mode.NamedEntityRecognition

    def __init__(self, hparams: Union[Dict[str, Any], argparse.Namespace], metrics: Optional[dict] = None) -> None:
        """Set up the tokenizer-related state and delegate the rest to BaseTransformer.

        Args:
            hparams: Settings as a Namespace, or as a dict (converted to a
                Namespace). Must provide ``tokenizer`` and ``num_labels``.
            metrics: Mapping of metric name to metric module. ``None`` (the
                default) means no metrics.
        """
        if isinstance(hparams, dict):
            hparams = argparse.Namespace(**hparams)

        self.tokenizer = hparams.tokenizer
        self.tokenizer_type = check_tokenizer_type(self.tokenizer)  # ["xlm-sp", "bert-wp", "other']
        # When unk, representing subword, is expanded to represent multiple
        # characters to align with character-level labels, this special
        # representation is used to represent characters from the second.
        # (e.g., 찝찝이 [UNK] --> 찝 [UNK] / 찝 [+UNK] / 이 [+UNK])
        self.in_unk_token = "[+UNK]"

        super().__init__(
            hparams,
            num_labels=hparams.num_labels,
            mode=self.mode,
            model_type=AutoModelForTokenClassification,
            # A fresh dict per instance avoids sharing one mutable default between instances.
            metrics={} if metrics is None else metrics,
        )

    @overrides
    def forward(self, **inputs: torch.Tensor) -> Any:
        """Forward all keyword inputs to the wrapped model."""
        return self.model(**inputs)

    @overrides
    def training_step(self, batch: List[torch.Tensor], batch_idx: int) -> dict:
        """Run the model on one batch, log ``train/loss`` and return the loss."""
        inputs = {"input_ids": batch[0], "attention_mask": batch[1], "labels": batch[3]}

        if self.is_use_token_type():
            inputs["token_type_ids"] = batch[2]
        outputs = self(**inputs)
        loss = outputs[0]

        self.log("train/loss", loss)
        return {"loss": loss}

    @overrides
    def validation_step(self, batch: List[torch.Tensor], batch_idx: int, data_type: str = "valid") -> dict:
        """Run the model on one batch, log ``<data_type>/loss`` and return logits and labels."""
        inputs = {"input_ids": batch[0], "attention_mask": batch[1], "labels": batch[3]}

        if self.is_use_token_type():
            inputs["token_type_ids"] = batch[2]

        outputs = self(**inputs)
        loss, logits = outputs[:2]

        self.log(f"{data_type}/loss", loss, on_step=False, on_epoch=True, logger=True)

        return {"logits": logits, "labels": inputs["labels"]}

    @overrides
    def validation_epoch_end(
        self, outputs: List[Dict[str, torch.Tensor]], data_type: str = "valid", write_predictions: bool = False
    ) -> None:
        """When validation step ends, either token- or character-level predicted
        labels are aligned with the original character-level labels and then
        evaluated.

        Args:
            outputs: Per-step dictionaries returned by ``validation_step``.
            data_type: Key into ``hparams.data`` for the original examples, and
                prefix of the logged metric names.
            write_predictions: When ``True`` the flattened character-level
                predictions are stored on ``self.predictions``.

        Raises:
            ValueError: If ``self.tokenizer_type`` is neither ``"xlm-sp"`` nor
                ``"bert-wp"``.
        """
        list_of_subword_preds = self._convert_outputs_to_preds(outputs)
        if self.tokenizer_type == "xlm-sp":
            strip_char = "▁"
        elif self.tokenizer_type == "bert-wp":
            strip_char = "##"
        else:
            raise ValueError("This code only supports XLMRobertaTokenizer & BertWordpieceTokenizer")

        original_examples = self.hparams.data[data_type]["original_examples"]
        list_of_character_preds = []
        list_of_originals = []
        label_list = self.hparams.label_list

        for subword_preds, example in zip(list_of_subword_preds, original_examples):
            original_sentence = example["original_sentence"]  # 안녕 하세요 ^^
            character_preds = [subword_preds[0].tolist()]  # [CLS]
            character_preds_idx = 1  # position of the next sub-word prediction to consume
            for word in original_sentence.split(" "):  # ['안녕', '하세요', '^^']
                if character_preds_idx >= self.hparams.max_seq_length - 1:
                    break
                subwords = self.tokenizer.tokenize(word)  # 안녕 -> [안, ##녕] / 하세요 -> [하, ##세요] / ^^ -> [UNK]
                if self.tokenizer.unk_token in subwords:  # case where the unk has to be expanded
                    unk_aligned_subwords = self.tokenizer_out_aligner(
                        word, subwords, strip_char
                    )  # [UNK] -> [UNK, +UNK]
                    unk_flag = False
                    for subword in unk_aligned_subwords:
                        if character_preds_idx >= self.hparams.max_seq_length - 1:
                            break
                        subword_pred = subword_preds[character_preds_idx].tolist()
                        subword_pred_label = label_list[subword_pred]
                        if subword == self.tokenizer.unk_token:
                            unk_flag = True
                            character_preds.append(subword_pred)
                            continue
                        elif subword == self.in_unk_token:
                            # Characters covered by the same unk repeat its prediction,
                            # switched to the matching "I-" tag for entity labels.
                            if subword_pred_label == "O":
                                character_preds.append(subword_pred)
                            else:
                                _, entity_category = subword_pred_label.split("-")
                                character_pred_label = "I-" + entity_category
                                character_pred = label_list.index(character_pred_label)
                                character_preds.append(character_pred)
                            continue
                        else:
                            if unk_flag:
                                character_preds_idx += 1
                                subword_pred = subword_preds[character_preds_idx].tolist()
                                character_preds.append(subword_pred)
                                unk_flag = False
                            else:
                                character_preds.append(subword_pred)
                                # also advance by one when the `+UNK` run ends, to move on to the next label
                                character_preds_idx += 1
                else:
                    for subword in subwords:
                        if character_preds_idx >= self.hparams.max_seq_length - 1:
                            break
                        subword = subword.replace(strip_char, "")  # xlm roberta: "▁" / others "##"
                        subword_pred = subword_preds[character_preds_idx].tolist()
                        subword_pred_label = label_list[subword_pred]
                        for char_pos in range(len(subword)):  # 안, 녕
                            if char_pos == 0:
                                # The first character keeps the sub-word's own prediction.
                                character_preds.append(subword_pred)
                            else:
                                # Later characters continue the entity with an "I-" tag.
                                if subword_pred_label == "O":
                                    character_preds.append(subword_pred)
                                else:
                                    _, entity_category = subword_pred_label.split("-")
                                    character_pred_label = "I-" + entity_category
                                    character_pred = label_list.index(character_pred_label)
                                    character_preds.append(character_pred)
                        character_preds_idx += 1
            character_preds.append(subword_preds[-1].tolist())  # [SEP] label
            list_of_character_preds.extend(character_preds)
            # Gold labels are truncated to the number of predicted characters and
            # wrapped with "O" for the two special positions.
            original_labels = ["O"] + example["original_clean_labels"][: len(character_preds) - 2] + ["O"]
            originals = []
            for label in original_labels:
                originals.append(label_list.index(label))
            assert len(character_preds) == len(originals)
            list_of_originals.extend(originals)

        self._set_metrics_device()

        if write_predictions is True:
            self.predictions = list_of_character_preds

        for k, metric in self.metrics.items():
            metric(list_of_character_preds, list_of_originals, label_list)
            self.log(f"{data_type}/{k}", metric, on_step=False, on_epoch=True, logger=True)

    def tokenizer_out_aligner(self, t_in: str, t_out: List[str], strip_char: str = "##") -> List[str]:
        """Aligns with character-level labels after tokenization.

        Args:
            t_in: The original word.
            t_out: The tokens produced for ``t_in`` by the tokenizer.
            strip_char: Sub-word marker removed before measuring token length.

        Returns:
            ``t_out`` with ``self.in_unk_token`` inserted for every extra
            character that an unk token stands for.

        Example:
            >>> t_in = "베쏭이,제5원소"
            >>> t_out = ['[UNK]', ',', '제', '##5', '##원', '##소']
            >>> tokenizer_out_aligner(t_in, t_out, strip_char="##")
            ['[UNK]', '[+UNK]', '[+UNK]', ',', '제', '##5', '##원', '##소']
            >>> t_in = "미나藤井美菜27가"
            >>> t_out = ['미나', '[UNK]', '[UNK]', '美', '[UNK]', '27', '##가']
            >>> tokenizer_out_aligner(t_in, t_out, strip_char="##")
            ['미나', '[UNK]', '[UNK]', '美', '[UNK]', '27', '##가']
        """
        t_out_new = []
        # i walks over the characters of t_in, j over the tokens of t_out.
        i, j = 0, 0
        UNK_flag = False
        while True:
            if i == len(t_in) and j == len(t_out) - 1:
                break
            # An unk token is counted as a single character per step.
            step_t_out = len(t_out[j].replace(strip_char, "")) if t_out[j] != self.tokenizer.unk_token else 1
            if UNK_flag:
                t_out_new.append(self.in_unk_token)
            else:
                t_out_new.append(t_out[j])
            if j < len(t_out) - 1 and t_out[j] == self.tokenizer.unk_token and t_out[j + 1] != self.tokenizer.unk_token:
                i += step_t_out
                UNK_flag = True
                # The unk ends once the next input character starts the next token.
                if t_in[i] == t_out[j + 1][0]:
                    j += 1
                    UNK_flag = False
            else:
                i += step_t_out
                j += 1
                UNK_flag = False
            if j == len(t_out):
                # Tokens are exhausted but the loop has not ended: stay on the last
                # token and emit the in-unk marker on the following iterations.
                UNK_flag = True
                j -= 1
        return t_out_new

    @overrides
    def _convert_outputs_to_preds(self, outputs: List[Dict[str, torch.Tensor]]) -> torch.Tensor:
        """Concatenate the step logits along dim 0 and take the argmax over dim 2."""
        logits = torch.cat([output["logits"] for output in outputs], dim=0)
        return torch.argmax(logits, dim=2)

    @staticmethod
    def add_specific_args(parser: argparse.ArgumentParser, root_dir: str) -> argparse.ArgumentParser:
        """Register BaseTransformer's command-line options on ``parser`` and return it."""
        BaseTransformer.add_specific_args(parser, root_dir)
        return parser