# Library

In [1]:
import polars as pl
import gc
import pickle
from pathlib import Path, PosixPath
from tqdm.auto import tqdm
from collections import defaultdict, Counter

import sys

sys.path.append("..")

from src.utils import seed_everything, get_logger, get_config, TimeUtil
from src.preprocess import DataProvider1st

# Setup

In [2]:
# Command-line argument (experiment id; hard-coded in this notebook)
exp = "087"

In [3]:
# Build the config for this experiment from ../config, create a logger that
# is given config.output_path, and log the start of the run.
config = get_config(exp, config_dir=Path("../config"))
logger = get_logger(config.output_path)
logger.info(f"exp:{exp} start")

# Seed with the value stored in the config.
seed_everything(config.seed)

[ [32m2024-10-10 08:19:04[0m | [1mINFO ] exp:087 start[0m


In [4]:
# Switch the debug flag on; the bare expression on the last line is the cell's
# display value (the external-dataset setting).
config.debug = True
config.exter_dataset

[['nicholas', True], ['mpware', False], ['pjma', False]]

# Data

In [5]:
# TODO: mechanism that makes the target (presumably PII) appear at random positions
# TODO: decide where label smoothing should be applied

import numpy as np
import gc
import random
from typing import List, Literal
import torch
from omegaconf import DictConfig
from torch.utils.data import Dataset
from transformers import AutoTokenizer
import statistics
import itertools


from src.utils.competition_utils import (
    mapping_index_org2char,
    mapping_index_char2token,
    mapping_index_char2token_overlapped,
)


class DetectDataset(Dataset):
    """Token-level dataset that splits every document into overlapping windows.

    Each ``full_text`` is tokenized with ``max_length`` truncation and
    ``return_overflowing_tokens=True``, so one document can produce several
    windows; one dataset item corresponds to one window.

    An item is a tuple of CPU tensors
    ``(input_ids, attention_mask, positions_ratio, positions_abs)`` and, when
    ``data_type`` is ``"train"`` or ``"valid"``, a fifth ``token_labels`` tensor.

    Args:
        config: Reads ``max_length``, ``train_stride`` and ``eval_stride``.
        data: One dict per document with the keys ``document``, ``full_text``,
            ``tokens``, ``trailing_whitespace``, ``additional`` and, for
            ``"train"``/``"valid"``, ``labels``.
        tokenizer: Tokenizer called on the list of full texts.
        data_type: ``"train"`` selects ``config.train_stride``; anything else
            selects ``config.eval_stride``. Labels are only read for
            ``"train"`` and ``"valid"``.
    """

    # Divisor of the absolute-position feature; 3298 is the maximum token
    # length of the dataset (per the original author's note).
    MAX_DOC_TOKENS = 3298

    def __init__(
        self,
        config: DictConfig,
        data: List[dict],
        tokenizer: AutoTokenizer,
        data_type: Literal["train", "valid", "test"],
    ):
        self.config = config
        self.data_type = data_type
        self.tokenizer = tokenizer
        self.doc_ids = [d["document"] for d in data]
        self.full_texts = [d["full_text"] for d in data]
        self.org_tokens = [d["tokens"] for d in data]
        self.whitespaces = [d["trailing_whitespace"] for d in data]
        self.additionals = [d["additional"] for d in data]

        stride = config.train_stride if data_type == "train" else config.eval_stride
        tokens = tokenizer(
            self.full_texts,
            max_length=config.max_length,
            stride=stride,
            return_overflowing_tokens=True,
            return_offsets_mapping=True,
            truncation=True,
        )
        self.input_ids = tokens["input_ids"]
        self.attention_mask = tokens["attention_mask"]
        self.offset_mapping = tokens["offset_mapping"]
        self.overflow_mapping = tokens["overflow_to_sample_mapping"]

        # Per-window copies of the document-level metadata.
        # Note: external data uses str ids, so np.array turns every id into str.
        self.overlap_doc_ids = np.array(self.doc_ids)[self.overflow_mapping]
        self.overlap_additional = np.array(self.additionals)[self.overflow_mapping]

        org_tokens_idx = [list(range(len(d["tokens"]))) for d in data]
        org_tokens_len = {str(d["document"]): len(d["tokens"]) for d in data}

        # Map original-token indices to characters and then to tokenizer
        # tokens, one sequence per window; -1 is the fill value.
        # NOTE(review): the helper semantics are not visible here - confirm.
        char_org_idx = mapping_index_org2char(
            self.full_texts,
            self.org_tokens,
            org_tokens_idx,
            self.whitespaces,
            fill_val=-1,
        )
        token_org_idx = mapping_index_char2token_overlapped(
            char_org_idx,
            self.input_ids,
            self.offset_mapping,
            self.overflow_mapping,
            fill_val=-1,
        )

        # Relative position: original-token index / number of original tokens
        # in the document. Negative values (the -1 fill) are clipped to 0.
        self.positions_ratio = [
            np.clip(np.array(org_idx) / org_tokens_len[str(doc_id)], 0, None)
            for org_idx, doc_id in zip(token_org_idx, self.overlap_doc_ids)
        ]
        # Absolute position: original-token index / MAX_DOC_TOKENS, clipped at 0.
        self.positions_abs = [
            np.clip(np.array(org_idx) / self.MAX_DOC_TOKENS, 0, None)
            for org_idx in token_org_idx
        ]

        # Fetch the labels.
        if data_type in ["train", "valid"]:
            self.org_labels = [d["labels"] for d in data]
            org_labels_dict = {str(d["document"]): np.array(d["labels"]) for d in data}

            self.token_labels = []
            for org_idx, doc_id in zip(token_org_idx, self.overlap_doc_ids):
                # Fancy indexing copies, so the per-document array is untouched.
                label = org_labels_dict[str(doc_id)][org_idx]
                # Index -1 picked up the document's last label above; positions
                # whose index is -1 must keep -1 as their label instead.
                space_idx = np.where(np.array(org_idx) == -1)[0]
                label[space_idx] = -1
                self.token_labels.append(label)

    def __getitem__(self, idx: int):
        """Return the tensors of window ``idx`` (labels last for train/valid)."""
        item = (
            torch.tensor(self.input_ids[idx], dtype=torch.long, device="cpu"),
            torch.tensor(self.attention_mask[idx], dtype=torch.long, device="cpu"),
            torch.tensor(self.positions_ratio[idx], dtype=torch.float, device="cpu"),
            torch.tensor(self.positions_abs[idx], dtype=torch.float, device="cpu"),
        )
        if self.data_type in ["train", "valid"]:
            item += (torch.tensor(self.token_labels[idx], dtype=torch.long, device="cpu"),)
        return item

    def __len__(self):
        """Number of windows (not documents)."""
        return len(self.input_ids)

    def drop_first_only_data(self):
        """Remove the windows that are only meant for the first training stage.

        Used for additional training: only windows whose ``additional`` flag is
        truthy are kept. Every per-window attribute is filtered with the same
        mask (including the flag array itself), so the attributes stay aligned
        and calling this method again is a no-op.
        """
        assert self.data_type == "train"
        keep = [bool(flag) for flag in self.overlap_additional]

        # Python lists are filtered with itertools.compress.
        for attr in (
            "input_ids",
            "attention_mask",
            "positions_ratio",
            "positions_abs",
            "token_labels",
            "offset_mapping",
            "overflow_mapping",
        ):
            setattr(self, attr, list(itertools.compress(getattr(self, attr), keep)))

        # NumPy arrays are filtered with a boolean mask.
        keep_mask = np.array(keep, dtype=bool)
        self.overlap_doc_ids = np.asarray(self.overlap_doc_ids)[keep_mask]
        self.overlap_additional = np.asarray(self.overlap_additional)[keep_mask]


class DetectRandomDataset(Dataset):
    """Dataset that tokenizes every document as one untruncated sequence.

    Only the constructor is defined in this cell; ``__getitem__`` and
    ``__len__`` are not implemented here.

    Args:
        config: Stored on the instance; not read by the constructor.
        data: One dict per document with the keys ``document``, ``full_text``,
            ``tokens``, ``trailing_whitespace`` and ``additional``.
        tokenizer: Tokenizer called on the list of full texts.
        data_type: Stored on the instance.
    """

    def __init__(
        self,
        config: DictConfig,
        data: List[dict],
        tokenizer: AutoTokenizer,
        data_type: Literal["train", "valid", "test"],
    ):
        self.config = config
        self.data_type = data_type
        self.tokenizer = tokenizer
        self.doc_ids = [d["document"] for d in data]
        self.full_texts = [d["full_text"] for d in data]
        self.org_tokens = [d["tokens"] for d in data]
        self.whitespaces = [d["trailing_whitespace"] for d in data]
        self.additionals = [d["additional"] for d in data]

        tokens = tokenizer(self.full_texts, return_offsets_mapping=True)
        self.input_ids = tokens["input_ids"]
        self.attention_mask = tokens["attention_mask"]
        self.offset_mapping = tokens["offset_mapping"]
        # The tokenizer is called without return_overflowing_tokens=True, so
        # "overflow_to_sample_mapping" is not expected in its output. Each
        # document yields exactly one sequence, hence the identity mapping.
        if "overflow_to_sample_mapping" in tokens:
            self.overflow_mapping = tokens["overflow_to_sample_mapping"]
        else:
            self.overflow_mapping = list(range(len(self.input_ids)))

# Verification

In [6]:
# Create the "train" data provider, load the records and display their count.
dpr = DataProvider1st(config, "train")
data = dpr.load_data()
len(data)

400

In [12]:
# Tokenizer for the model referenced by config.model_path.
tokenizer = AutoTokenizer.from_pretrained(config.model_path)



In [13]:
# Build the windowed training dataset from the loaded records.
dataset = DetectDataset(config, data, tokenizer, "train")

In [14]:
# Keep handles for inspection: input_ids has one entry per window,
# additionals has one entry per document.
input_ids = dataset.input_ids
additionals = dataset.additionals

In [None]:
# Builds windows at random positions so that they contain PII.
# CV got worse with it, so this class is not used.
class FirstStageDatasetWithRandom(Dataset):
    """First-stage dataset with two window strategies.

    * ``get_window="overlap"``: documents are tokenized with ``max_length``
      truncation and overflowing tokens, one item per window.
    * ``get_window="random"`` (train only): documents are tokenized whole, one
      item per detected PII start. ``__getitem__`` cuts a window of at most
      ``config.max_length`` tokens at a random offset around either that PII
      start or a random token.

    NOTE(review): ``convert_org2char_labels``, ``convert_char2token_labels`` and
    ``convert_char2token_labels_overlapped`` are not imported in this cell; they
    must be in scope before labelled data can be built.

    Args:
        config: Reads ``max_length``, ``train_stride`` and ``eval_stride``.
        data: One dict per document with the keys ``document``, ``full_text``,
            ``tokens``, ``trailing_whitespace`` and, for train/valid, ``labels``.
        tokenizer: Tokenizer called on the list of full texts.
        data_type: ``"train"``, ``"valid"`` or ``"test"``.
        get_window: ``"overlap"`` (default) or ``"random"``.

    Raises:
        ValueError: If ``get_window`` is unknown, or ``"random"`` is requested
            for anything other than ``"train"``.
    """

    def __init__(
        self,
        config,
        data: List[dict],
        tokenizer: AutoTokenizer,
        data_type: Literal["train", "valid", "test"],
        get_window: Literal["overlap", "random"] = "overlap",
    ):
        if get_window not in ("overlap", "random"):
            raise ValueError(f"get_window must be 'overlap' or 'random', got {get_window!r}")
        if get_window == "random" and data_type != "train":
            # "random" is a train-only option.
            raise ValueError("get_window='random' is only supported for data_type='train'")

        self.config = config
        self.get_window = get_window
        self.data_type = data_type
        self.tokenizer = tokenizer
        self.doc_ids = [d["document"] for d in data]
        self.texts = [d["full_text"] for d in data]
        self.org_tokens = [d["tokens"] for d in data]
        self.whitespace = [d["trailing_whitespace"] for d in data]

        if get_window == "overlap":
            stride = config.train_stride if data_type == "train" else config.eval_stride
            tokens = tokenizer(
                self.texts,
                max_length=config.max_length,
                stride=stride,
                return_overflowing_tokens=True,
                return_offsets_mapping=True,
                truncation=True,
            )
        else:
            # Whole documents; windows are cut lazily in __getitem__.
            tokens = tokenizer(
                self.texts,
                return_offsets_mapping=True,
            )

        self.input_ids = tokens["input_ids"]
        self.attention_mask = tokens["attention_mask"]
        self.offset_mapping = tokens["offset_mapping"]
        if get_window == "overlap":
            self.overflow_mapping = tokens["overflow_to_sample_mapping"]
            self.overlap_doc_ids = np.array(self.doc_ids)[self.overflow_mapping]

        if self.data_type in ["train", "valid"]:
            self.org_labels = [d["labels"] for d in data]
            self.char_labels = convert_org2char_labels(self.texts, self.org_tokens, self.whitespace, self.org_labels)
            if get_window == "overlap":
                self.token_labels = convert_char2token_labels_overlapped(
                    self.char_labels, self.input_ids, self.offset_mapping, self.overflow_mapping
                )
            else:
                self.token_labels = convert_char2token_labels(
                    self.char_labels, self.input_ids, self.offset_mapping, fill_val=0
                )
                self.pii_locs = self.find_pii_locations(self.doc_ids, self.token_labels)

    def __getitem__(self, idx: int):
        """Return ``(input_ids, attention_mask[, token_labels])`` as CPU tensors."""
        if self.get_window == "overlap":
            item = (
                torch.tensor(self.input_ids[idx], dtype=torch.long, device="cpu"),
                torch.tensor(self.attention_mask[idx], dtype=torch.long, device="cpu"),
            )
            if self.data_type in ["train", "valid"]:
                item += (torch.tensor(self.token_labels[idx], dtype=torch.long, device="cpu"),)
            return item

        if self.get_window != "random":
            raise ValueError(f"unknown get_window: {self.get_window!r}")

        # "random" is train-only.
        if random.random() >= 0.50:
            # ~50%: anchor the window on the idx-th PII start (positive sample).
            global_idx, _, token_idx = self.pii_locs[idx]
        else:
            # ~50%: negative sampling - a random token of a random document,
            # which yields a mostly PII-free window.
            global_idx = random.randrange(len(self.input_ids))
            token_idx = random.randrange(len(self.input_ids[global_idx]))
        input_ids = self.input_ids[global_idx]
        attention_mask = self.attention_mask[global_idx]
        token_labels = self.token_labels[global_idx]

        # Put the anchor at a random offset inside the window, then shift the
        # window left if it would run past the end of the document.
        left_length = random.randint(0, self.config.max_length - 1)
        start_idx = max(0, token_idx - left_length)
        end_idx = start_idx + self.config.max_length
        excess_right_length = max(0, end_idx - len(input_ids))
        start_idx = max(0, start_idx - excess_right_length)
        end_idx = end_idx - excess_right_length
        assert start_idx <= token_idx and token_idx <= end_idx

        return (
            torch.tensor(input_ids[start_idx:end_idx], dtype=torch.long, device="cpu"),
            torch.tensor(attention_mask[start_idx:end_idx], dtype=torch.long, device="cpu"),
            torch.tensor(token_labels[start_idx:end_idx], dtype=torch.long, device="cpu"),
        )

    def __len__(self):
        """Number of windows ("overlap") or of PII starts ("random")."""
        if self.get_window == "overlap":
            return len(self.input_ids)
        if self.get_window == "random":
            return len(self.pii_locs)
        raise ValueError(f"unknown get_window: {self.get_window!r}")

    def find_pii_locations(self, doc_ids: List[int], token_labels: List[List[int]]) -> List[List[int]]:
        """Locate the start of every run of non-zero labels in each document.

        Args:
            doc_ids: Document ids, parallel to ``token_labels``.
            token_labels: One label sequence per document (list or array).

        Returns:
            A list of ``[document position, doc_id, token index]`` entries, one
            per position where the label switches from zero to non-zero.
        """
        pii_locs = []
        for i, (doc_id, token_label) in enumerate(zip(doc_ids, token_labels)):
            # asarray makes the comparison element-wise for plain lists too.
            is_pii_array = np.where(np.asarray(token_label) != 0, 1, 0)
            # A +1 step (with an implicit leading 0) marks the start of a run.
            run_starts = np.where(np.diff(is_pii_array, prepend=0) == 1)[0]
            for idx in run_starts:
                pii_locs.append([i, doc_id, idx])
        return pii_locs

In [None]:
import numpy as np
from typing import List, Tuple
from torch.utils.data import DataLoader
from transformers import AutoTokenizer

from src.torch.first_stage.dataset import FirstStageDataset
from src.torch.first_stage.collate_fn import CollateFn
from src.torch.data_utils import get_sampler


def get_train_loaders(
    config,
    data: List[dict],
    fold_array: np.ndarray = None,
) -> List[Tuple[DataLoader, DataLoader]]:
    """Build one ``(train_loader, valid_loader)`` pair per cross-validation fold.

    For fold ``k`` the validation set is every record whose ``fold_array`` entry
    equals ``k``; the training set is everything else.

    Args:
        config: Reads ``model_path``, ``add_newline_token``, ``n_fold``,
            ``use_fold``, ``train_batch`` and ``eval_batch``. When
            ``config.use_fold`` is not None, only the first ``use_fold`` folds
            are built.
        data: One dict per document.
        fold_array: Fold id of every record, same length as ``data``. Required
            despite the ``None`` default, which is kept only for signature
            compatibility.

    Returns:
        A list of ``(train_loader, valid_loader)`` tuples in fold order.

    Raises:
        ValueError: If ``fold_array`` is missing or its length differs from
            ``data``.
    """
    # Without this guard `None != fold` is a scalar comparison and np.where
    # silently produces train_idx=[0] and an empty validation split.
    if fold_array is None:
        raise ValueError("fold_array is required to split data into folds")
    fold_array = np.asarray(fold_array)  # element-wise comparison for lists too
    if len(fold_array) != len(data):
        raise ValueError(
            f"fold_array length ({len(fold_array)}) does not match data length ({len(data)})"
        )

    data = np.array(data)
    tokenizer = AutoTokenizer.from_pretrained(config.model_path)
    if config.add_newline_token:
        tokenizer.add_tokens(["\n", "\r"], special_tokens=True)
    collate_fn = CollateFn(tokenizer, is_train=True)

    dataloaders = []
    for fold in range(config.n_fold):
        if config.use_fold is not None and config.use_fold <= fold:
            break
        train_idx = np.where(fold_array != fold)[0]
        valid_idx = np.where(fold_array == fold)[0]
        train_data = data[train_idx]
        valid_data = data[valid_idx]

        train_dataset = FirstStageDataset(
            config,
            train_data,
            tokenizer,
            data_type="train",
        )
        valid_dataset = FirstStageDataset(config, valid_data, tokenizer, data_type="valid")
        train_sampler = get_sampler(train_dataset)
        train_loader = DataLoader(
            train_dataset,
            sampler=train_sampler,
            batch_size=config.train_batch,
            collate_fn=collate_fn,
            pin_memory=True,
            drop_last=True,
        )
        # Validation keeps every sample and the original order.
        valid_loader = DataLoader(
            valid_dataset,
            batch_size=config.eval_batch,
            collate_fn=collate_fn,
            shuffle=False,
            pin_memory=True,
            drop_last=False,
        )
        dataloaders.append((train_loader, valid_loader))
    return dataloaders


def get_full_train_loader(config, data: List[dict]) -> DataLoader:
    """Build a single training loader over all of ``data`` (no validation split).

    Reads ``model_path``, ``add_newline_token`` and ``train_batch`` from
    ``config``. Incomplete final batches are dropped.
    """
    tok = AutoTokenizer.from_pretrained(config.model_path)
    if config.add_newline_token:
        tok.add_tokens(["\n", "\r"], special_tokens=True)

    collator = CollateFn(tok, is_train=True)
    full_dataset = FirstStageDataset(config, data, tok, data_type="train")
    return DataLoader(
        full_dataset,
        sampler=get_sampler(full_dataset),
        batch_size=config.train_batch,
        collate_fn=collator,
        pin_memory=True,
        drop_last=True,
    )

In [None]:
import numpy as np
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import WeightedRandomSampler
from transformers import AutoTokenizer


class CollateFn:
    """Pad a batch of variable-length token sequences to a common length.

    Input ids are padded with the tokenizer's ``pad_token_id``, attention masks
    with 0 and, when ``is_train`` is true, labels with -1.
    """

    def __init__(self, tokenizer: AutoTokenizer, is_train: bool = True):
        self.tokenizer = tokenizer
        self.is_train = is_train

    def _pad_inputs(self, ids_seqs, mask_seqs):
        # Pad the two model inputs, batch dimension first.
        padded_ids = pad_sequence(ids_seqs, batch_first=True, padding_value=self.tokenizer.pad_token_id)
        padded_mask = pad_sequence(mask_seqs, batch_first=True, padding_value=0)
        return padded_ids, padded_mask

    def __call__(self, batch):
        """Collate ``batch``: 3-tuples per sample when training, 2-tuples otherwise."""
        if not self.is_train:
            ids_seqs, mask_seqs = zip(*batch)
            return self._pad_inputs(ids_seqs, mask_seqs)

        ids_seqs, mask_seqs, label_seqs = zip(*batch)
        padded_ids, padded_mask = self._pad_inputs(ids_seqs, mask_seqs)
        padded_labels = pad_sequence(label_seqs, batch_first=True, padding_value=-1)
        return padded_ids, padded_mask, padded_labels

In [None]:
import torch
import torch.nn as nn
from transformers import AutoModel, AutoConfig


class FirstStageModel(nn.Module):
    """Pretrained backbone followed by a small MLP classification head.

    The last ``config.use_hidden_states`` hidden states of the backbone are
    concatenated on the last dimension, passed through dropout and then through
    the head, which maps them to ``class_num`` outputs.

    Args:
        config: Reads ``model_path``, ``use_hidden_states``, ``hidden_dropout``,
            ``attention_dropout`` and ``dropout``.
        class_num: Size of the head's output dimension.
    """

    def __init__(self, config, class_num: int):
        super().__init__()
        self.config = config
        self.use_hidden_states = config.use_hidden_states  # how many backbone hidden states to use
        self.model_config = AutoConfig.from_pretrained(config.model_path)
        self.model_config.update(
            {
                "hidden_dropout_prob": config.hidden_dropout,
                "attention_probs_dropout_prob": config.attention_dropout,
                "output_hidden_states": True,
            }
        )
        hidden_size = self.model_config.hidden_size
        self.backbone = AutoModel.from_pretrained(config.model_path, config=self.model_config)
        self.head = nn.Sequential(
            nn.Linear(hidden_size * self.use_hidden_states, 128),
            nn.LayerNorm(128),
            nn.ReLU(),
            nn.Dropout(config.dropout),
            nn.Linear(128, class_num),
        )
        self.dropout = nn.Dropout(config.dropout)
        # _init_weights only handles leaf modules; calling it directly on the
        # nn.Sequential container is a no-op, so it must be applied recursively.
        self.head.apply(self._init_weights)

    # DeBERTa-style initialization
    def _init_weights(self, module):
        """Initialize a single Linear / Embedding / LayerNorm module in place."""
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.model_config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.model_config.initializer_range)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, input_ids, attention_mask):
        """Run the backbone and the head; returns the head output."""
        outputs = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
        # Concatenate the last N hidden states on the feature dimension.
        x = torch.cat(outputs.hidden_states[-self.use_hidden_states :], dim=-1)
        x = self.dropout(x)
        x = self.head(x)
        return x

    def reinit_layers(self, reinit_layer_num: int):
        """Re-initialize the pretrained weights of the last ``reinit_layer_num`` encoder layers."""
        for i in range(1, reinit_layer_num + 1):
            self.backbone.encoder.layer[-i].apply(self._init_weights)

    def freeze_layers(self, freeze_layer_num: int):
        """Freeze the first ``freeze_layer_num`` blocks of the backbone.

        Block 0 is the embedding module and block ``i`` (``i >= 1``) is encoder
        layer ``i - 1``; e.g. ``freeze_layer_num=3`` freezes the embeddings and
        encoder layers 0 and 1.
        """
        for i in range(freeze_layer_num):
            block = self.backbone.embeddings if i == 0 else self.backbone.encoder.layer[i - 1]
            for params in block.parameters():
                params.requires_grad = False

    def freeze_backbone(self, reinit_layer_num: int):
        """Freeze the backbone so that only the head is trained for the first steps.

        The last ``reinit_layer_num`` encoder layers (the ones ``reinit_layers``
        re-initializes) stay trainable.
        """
        for param in self.backbone.parameters():
            param.requires_grad = False

        # Re-initialized layers are counted from the end, matching reinit_layers.
        for i in range(1, reinit_layer_num + 1):
            for params in self.backbone.encoder.layer[-i].parameters():
                params.requires_grad = True

    def unfreeze_backbone(self, freeze_layer_num: int):
        """Make the backbone trainable again for the later training steps.

        The first ``freeze_layer_num`` blocks (see ``freeze_layers``) remain
        frozen for the whole training run.
        """
        for param in self.backbone.parameters():
            param.requires_grad = True

        self.freeze_layers(freeze_layer_num)


class FirstStageModelWithGRU(nn.Module):
    """Pretrained backbone with a bidirectional GRU branch before the head.

    The last ``config.use_hidden_states`` hidden states are concatenated and
    projected to ``hidden_size``. The projection is fed through a bidirectional
    GRU, and the projection and the normalized GRU output are concatenated,
    normalized again and mapped to ``class_num`` outputs by the head.

    Args:
        config: Reads ``model_path``, ``use_hidden_states``, ``hidden_dropout``,
            ``attention_dropout`` and ``dropout``.
        class_num: Size of the head's output dimension.
    """

    def __init__(self, config, class_num: int):
        super().__init__()
        self.config = config
        self.use_hidden_states = config.use_hidden_states  # how many backbone hidden states to use
        self.model_config = AutoConfig.from_pretrained(config.model_path)
        self.model_config.update(
            {
                "hidden_dropout_prob": config.hidden_dropout,
                "attention_probs_dropout_prob": config.attention_dropout,
                "output_hidden_states": True,
            }
        )
        hidden_size = self.model_config.hidden_size
        self.backbone = AutoModel.from_pretrained(config.model_path, config=self.model_config)
        # Two directions of hidden_size // 2 units each.
        self.gru = nn.GRU(hidden_size, hidden_size // 2, num_layers=1, bidirectional=True, batch_first=True)
        self.linear = nn.Sequential(
            nn.Linear(hidden_size * self.use_hidden_states, hidden_size),
            nn.LayerNorm(hidden_size),
            nn.ReLU(),
            nn.Dropout(config.dropout),
        )
        self.head = nn.Sequential(
            nn.Linear(hidden_size * 2, 128),
            nn.LayerNorm(128),
            nn.ReLU(),
            nn.Dropout(config.dropout),
            nn.Linear(128, class_num),
        )
        self.layernorm1 = nn.LayerNorm(hidden_size)
        self.layernorm2 = nn.LayerNorm(hidden_size * 2)
        self.dropout = nn.Dropout(config.dropout)
        # _init_weights only handles leaf modules; calling it directly on an
        # nn.Sequential container is a no-op, so it must be applied recursively.
        self.linear.apply(self._init_weights)
        self.head.apply(self._init_weights)
        # The GRU keeps PyTorch's default initialization; _gru_init_weights is
        # available but currently not applied.

    # DeBERTa-style initialization
    def _init_weights(self, module):
        """Initialize a single Linear / Embedding / LayerNorm module in place."""
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.model_config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=self.model_config.initializer_range)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()
        elif isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    # Tensorflow/Keras-like initialization for GRU
    def _gru_init_weights(self, module):
        """Xavier-uniform input weights, orthogonal recurrent weights, zero biases."""
        for name, p in module.named_parameters():
            if "weight_ih" in name:
                nn.init.xavier_uniform_(p.data)
            elif "weight_hh" in name:
                nn.init.orthogonal_(p.data)
            elif "bias" in name:
                p.data.fill_(0)

    def forward(self, input_ids, attention_mask):
        """Run backbone, projection, GRU branch and head; returns the head output."""
        outputs = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
        # Concatenate the last N hidden states on the feature dimension.
        x = torch.cat(outputs.hidden_states[-self.use_hidden_states :], dim=-1)
        x = self.linear(x)
        y, _ = self.gru(x)
        y = self.layernorm1(y)
        y = self.dropout(y)
        # Skip connection: projected features next to the GRU features.
        x = torch.cat([x, y], dim=-1)
        x = self.layernorm2(x)
        x = self.head(x)
        return x

    def reinit_layers(self, reinit_layer_num: int):
        """Re-initialize the pretrained weights of the last ``reinit_layer_num`` encoder layers."""
        for i in range(1, reinit_layer_num + 1):
            self.backbone.encoder.layer[-i].apply(self._init_weights)

    def freeze_layers(self, freeze_layer_num: int):
        """Freeze the first ``freeze_layer_num`` blocks of the backbone.

        Block 0 is the embedding module and block ``i`` (``i >= 1``) is encoder
        layer ``i - 1``; e.g. ``freeze_layer_num=3`` freezes the embeddings and
        encoder layers 0 and 1.
        """
        for i in range(freeze_layer_num):
            block = self.backbone.embeddings if i == 0 else self.backbone.encoder.layer[i - 1]
            for params in block.parameters():
                params.requires_grad = False

    def freeze_backbone(self, reinit_layer_num: int):
        """Freeze the backbone so that only the non-backbone parts train for the first steps.

        The last ``reinit_layer_num`` encoder layers (the ones ``reinit_layers``
        re-initializes) stay trainable.
        """
        for param in self.backbone.parameters():
            param.requires_grad = False

        # Re-initialized layers are counted from the end, matching reinit_layers.
        for i in range(1, reinit_layer_num + 1):
            for params in self.backbone.encoder.layer[-i].parameters():
                params.requires_grad = True

    def unfreeze_backbone(self, freeze_layer_num: int):
        """Make the backbone trainable again for the later training steps.

        The first ``freeze_layer_num`` blocks (see ``freeze_layers``) remain
        frozen for the whole training run.
        """
        for param in self.backbone.parameters():
            param.requires_grad = True

        self.freeze_layers(freeze_layer_num)


def get_model(config):
    """Instantiate the first-stage model described by ``config``.

    ``config.with_gru`` selects the GRU variant. The model is moved to
    ``config.device``; afterwards the last ``config.reinit_layer_num`` layers are
    re-initialized and the first ``config.freeze_layer_num`` blocks frozen,
    each only when the respective count is positive.
    """
    model_cls = FirstStageModelWithGRU if config.with_gru else FirstStageModel
    model = model_cls(config, config.class_num).to(config.device)

    n_reinit = config.reinit_layer_num
    if n_reinit > 0:
        model.reinit_layers(n_reinit)

    n_freeze = config.freeze_layer_num
    if n_freeze > 0:
        model.freeze_layers(n_freeze)
    return model

In [None]:
class DetectModel(nn.Module):
    """Placeholder for the detection model; nothing is implemented yet."""


class ClassifyModel(nn.Module):
    """Placeholder for the classification model; nothing is implemented yet."""