In [1]:
!pip install transformers pandas scikit-learn typeguard matplotlib

In [2]:
import torch
# Ask PyTorch whether a CUDA device is available; as the last expression of
# the cell, the boolean result is shown as the cell output.
torch.cuda.is_available()

In [15]:
from dataclasses import dataclass
from pathlib import Path

# Directory that holds the competition input CSV files.
dataset = '../input/nbme-score-clinical-patient-notes'


@dataclass
class Config:
    """Paths and hyper-parameters shared by the training and inference cells."""

    # features.csv -- columns: feature_num, case_num, feature_text
    features_path: Path = Path(dataset) / "features.csv"
    # patient_notes.csv -- columns: pn_num, case_num, pn_history
    patient_notes_path: Path = Path(dataset) / "patient_notes.csv"
    # train.csv -- columns: id, case_num, pn_num, feature_num, annotation, location
    train_path: Path = Path(dataset) / "train.csv"
    # test.csv -- the original note listed the same columns as train.csv;
    # NOTE(review): presumably annotation/location are absent here -- confirm.
    test_path: Path = Path(dataset) / "test.csv"
    # Output CSV written by test() -- columns: id, location
    submission_path: Path = Path("submission.csv")
    # Name or path handed to AutoModel / AutoTokenizer.from_pretrained
    model: str = '../input/deberta/base'
    # max_length passed to the tokenizer (inputs are padded / truncated to it)
    token_size: int = 416
    # Mini-batch size used by both data loaders
    batch_size: int = 8
    # Torch device string: 'cpu' or 'cuda'
    device: str = 'cuda'
    # Probability threshold above which a token is counted as part of a span
    span_thres: float = 0.4
    # Number of training epochs
    epochs: int = 10
    # When True, make_dataset() returns a 1000-row sample instead of all rows
    debug: bool = False

In [4]:

from typing import List, Tuple
from ast import literal_eval
import pandas as pd


def parse_location(location: str) -> List[Tuple[int, int]]:
    """
    Parse the string form of a `location` cell into (start, end) spans.

    The input is the repr of a list of strings; every string holds one or
    more "start end" pairs separated by ';'. All pairs are returned in order
    as one flat list.

    "['682 688;695 697']" -> [(682, 688), (695, 697)]
    "['1 2', '5 9']"      -> [(1, 2), (5, 9)]
    "[]"                  -> []

    Raises ValueError when a pair does not consist of exactly two integers.
    """
    spans: List[Tuple[int, int]] = []
    for entry in literal_eval(location):
        # One annotation can be discontinuous: "682 688;695 697".
        for piece in entry.split(";"):
            start, end = piece.split()
            spans.append((int(start), int(end)))
    return spans


def train_test_split(
    df: pd.DataFrame, test_ratio: float, random_state=None
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Shuffle `df` and split it into (train, test) frames.

    Args:
        df: frame to split; it is not modified.
        test_ratio: fraction of rows in [0.0, 1.0] that goes to the test part.
        random_state: optional seed forwarded to `DataFrame.sample`; pass a
            fixed value to get the same split on every run. The default
            (None) keeps the previous unseeded behaviour.

    Returns:
        (train, test), where train holds int((1 - test_ratio) * len(df)) rows
        and test holds the remaining rows.
    """
    assert 0.0 <= test_ratio <= 1.0
    shuffled = df.sample(frac=1, random_state=random_state)
    train_size = int((1.0 - test_ratio) * len(df))
    # Positional slicing: the shuffled frame keeps its original index labels.
    return shuffled.iloc[:train_size], shuffled.iloc[train_size:]


In [8]:
import numpy as np
import pandas as pd
from torch.utils.data import Dataset
from transformers import AutoTokenizer


def tokenize_and_add_labels(
    tokenizer: AutoTokenizer,
    data: pd.Series,
    config: Config,
) -> pd.DataFrame:
    """
    Tokenize one (feature_text, pn_history) pair and attach per-token labels.

    The object returned is whatever `tokenizer(...)` produced, extended with
    two extra entries:
      * "sequence_ids": the result of its `sequence_ids()` method
      * "labels": one value per token; -1 for tokens that are ignored,
        1.0 for tokens lying fully inside one of `data["location"]`'s spans,
        0.0 otherwise.

    NOTE(review): the declared return annotation is `pd.DataFrame`, but the
    tokenizer output is returned as-is -- confirm and adjust the annotation.
    """
    encoding = tokenizer(
        data["feature_text"],
        data["pn_history"],
        # truncate only the second sequence
        truncation="only_second",
        max_length=config.token_size,
        # pad up to max_length
        padding='max_length',
        return_token_type_ids=True,
        return_offsets_mapping=True
    )
    # input_ids: list of token ids
    # token_type_ids: mask telling which sentence a token belongs to (0, 1)
    labels = [0.0] * len(encoding["input_ids"])
    encoding["sequence_ids"] = encoding.sequence_ids()

    # Rows without a 'location' entry (test data) carry no targets at all.
    unlabeled = 'location' not in data.index

    pairs = zip(encoding["sequence_ids"], encoding["offset_mapping"])
    for position, (seq_id, span) in enumerate(pairs):
        # A falsy sequence id (None or 0) marks a token that gets no target.
        if unlabeled or not seq_id:
            labels[position] = -1
            continue

        tok_start, tok_end = span
        inside_feature = any(
            feat_start <= tok_start and tok_end <= feat_end
            for feat_start, feat_end in data["location"]
        )
        if inside_feature:
            labels[position] = 1.0

    encoding["labels"] = labels
    return encoding


def loopup(df, ref_df, key_label: str, value_label: str) -> pd.DataFrame:
    """
    Add column `value_label` to `df` by looking up `key_label` in `ref_df`.

    Every row of `df` receives the `value_label` of the `ref_df` row with the
    same `key_label`. Row order, row count and index of `df` are untouched.
    Keys missing from `ref_df` yield NaN; if `ref_df` repeats a key, its first
    occurrence is used.

    The previous implementation right-merged and assigned the result back by
    index; a right merge follows the key order of `ref_df`, so values ended
    up on the wrong rows.

    `df` is modified in place and also returned.
    """
    key_to_value = (
        ref_df.drop_duplicates(subset=key_label)
        .set_index(key_label)[value_label]
    )
    df[value_label] = df[key_label].map(key_to_value)
    return df


def join_dfs(df, features, patient_notes):
    """
    Attach 'feature_text' (keyed by 'feature_num' in `features`) and
    'pn_history' (keyed by 'pn_num' in `patient_notes`) to `df`.

    Both lookups go through `loopup`; its result for the second lookup is
    returned.
    """
    with_feature_text = loopup(df, features, 'feature_num', 'feature_text')
    return loopup(with_feature_text, patient_notes, 'pn_num', 'pn_history')


def make_test_dataset(config: Config) -> pd.DataFrame:
    """
    Load the test rows and join the feature text and patient note onto them.

    Reads `config.features_path`, `config.patient_notes_path` and
    `config.test_path`, and returns only the columns
    "id", "pn_history" and "feature_text".
    """
    feature_table = pd.read_csv(config.features_path)
    note_table = pd.read_csv(config.patient_notes_path)
    test_rows = pd.read_csv(config.test_path)

    joined = join_dfs(test_rows, feature_table, note_table)
    return joined[["id", "pn_history", "feature_text"]]


def make_dataset(config: Config) -> pd.DataFrame:
    """
    Load the training rows, join feature text / patient note onto them and
    parse the 'location' column into lists of (start, end) tuples.

    Returns the columns "id", "pn_history", "feature_text", "annotation" and
    "location". With `config.debug` set, a random sample of 1000 rows is
    returned instead of the full frame.
    """
    feature_table = pd.read_csv(config.features_path)
    note_table = pd.read_csv(config.patient_notes_path)
    train_rows = pd.read_csv(config.train_path)

    joined = join_dfs(train_rows, feature_table, note_table)
    joined['location'] = joined['location'].apply(parse_location)

    wanted_columns = ["id", "pn_history", "feature_text", "annotation", 'location']
    selected = joined[wanted_columns]

    if not config.debug:
        return selected
    return pd.DataFrame(selected.sample(n = 1000))


class QADataset(Dataset):
    """
    Torch dataset that tokenizes one row of `data` on every access.

    Each item is the tuple
    (input_ids, attention_mask, token_type_ids, labels, offset_mapping,
    sequence_ids) of numpy arrays built from `tokenize_and_add_labels`.
    """

    def __init__(self, data: pd.DataFrame, tokenizer: AutoTokenizer, config: Config):
        self.data, self.tokenizer, self.config = data, tokenizer, config

    def __len__(self):
        """Number of rows in the wrapped frame."""
        return len(self.data)

    def __getitem__(self, idx):
        """Tokenize the row at position `idx` and return its arrays."""
        row = self.data.iloc[idx]
        encoded = tokenize_and_add_labels(self.tokenizer, row, self.config)

        return (
            np.array(encoded["input_ids"]),
            np.array(encoded["attention_mask"]),
            np.array(encoded["token_type_ids"]),
            np.array(encoded["labels"]),
            np.array(encoded['offset_mapping']),
            # Cast to a float dtype; presumably this turns the None entries of
            # sequence_ids into NaN so the values can be batched -- TODO confirm.
            np.array(encoded['sequence_ids']).astype("float16"),
        )


In [9]:
import torch.nn as nn
from transformers import AutoModel

class QAModel(nn.Module):
    """
    Pretrained encoder followed by three linear layers that produce one logit
    per token.
    """

    def __init__(self, config: Config):
        super().__init__()

        # Encoder loaded from the name / path stored in the config.
        self.bert = AutoModel.from_pretrained(config.model)
        self.dropout = nn.Dropout(p=0.2)
        self.config = config
        # Head: 768 -> 512 -> 512 -> 1; the input width 768 is hard-coded and
        # assumed to equal the encoder's hidden size -- TODO confirm when
        # switching models.
        self.fc1 = nn.Linear(768, 512)
        self.fc2 = nn.Linear(512, 512)
        self.fc3 = nn.Linear(512, 1)

    def forward(
        self,
        input_ids,
        attention_mask,
        token_type_ids,
    ):
        """
        Run the encoder and the head; the trailing size-1 dimension of the
        head output is squeezed away before returning the logits.
        """
        encoded = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids
        )
        features = self.fc1(encoded.last_hidden_state)
        features = self.fc2(self.dropout(features))
        token_logits = self.fc3(self.dropout(features))
        return token_logits.squeeze(-1)


In [10]:
from dataclasses import dataclass
from itertools import chain
from typing import List, Tuple

import numpy as np
import torch
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from torch.utils.data import DataLoader
from tqdm import tqdm


@dataclass
class Metrics:
    """Evaluation scores produced by `calculate_char_cv`."""
    # value returned by sklearn's accuracy_score
    accuracy: float
    # precision from sklearn's precision_recall_fscore_support (average="binary")
    precision: float
    # recall from the same call
    recall: float
    # F1 score from the same call
    f1: float


def eval_model(
    epoch: int,
    config: Config,
    model: QAModel,
    dataloader: DataLoader,
    criterion,
    writer,
) -> Metrics:
    """
    Evaluate `model` on `dataloader` and return character-level metrics.

    how to evaluation
    https://www.kaggle.com/c/nbme-score-clinical-patient-notes/overview/evaluation

    Args:
        epoch: zero-based epoch number, used only to compute the logging step.
        config: supplies the device and the span threshold.
        model: model to evaluate; it is switched to eval mode.
        dataloader: yields batches of (input_ids, attention_mask,
            token_type_ids, labels, offset_mapping, sequence_ids).
        criterion: element-wise loss (called as criterion(logits, labels));
            positions whose label is not greater than -1 are masked out.
        writer: object with `add_scalar`; receives "valid/loss" per batch.

    Returns:
        Metrics computed by `calculate_char_cv` over all batches.
    """
    DEVICE = torch.device(config.device)
    model.eval()

    preds = []
    offsets = []
    seq_ids = []
    valid_labels = []

    # Evaluation needs no gradients; disabling autograd avoids building a
    # graph for every batch and keeps memory usage down.
    with torch.no_grad():
        for i, batch in enumerate(tqdm(dataloader)):
            (
                input_ids,
                attention_mask,
                token_type_ids,
                labels,
                offset_mapping,
                sequence_ids,
            ) = batch
            input_ids = input_ids.to(DEVICE)
            attention_mask = attention_mask.to(DEVICE)
            token_type_ids = token_type_ids.to(DEVICE)
            labels = labels.to(DEVICE)

            logits = model(
                input_ids, attention_mask, token_type_ids
            )

            loss = criterion(logits, labels)
            # Keep only positions that carry a real target (label > -1).
            loss = torch.masked_select(loss, labels > -1.0).mean()

            valid_loss = loss.item() * input_ids.size(0)
            writer.add_scalar("valid/loss", valid_loss, epoch * len(dataloader) + i)

            preds.append(logits.detach().cpu().numpy())
            offsets.append(offset_mapping.numpy())
            seq_ids.append(sequence_ids.numpy())
            valid_labels.append(labels.detach().cpu().numpy())

    preds = np.concatenate(preds, axis=0)
    offsets = np.concatenate(offsets, axis=0)
    seq_ids = np.concatenate(seq_ids, axis=0)
    valid_labels = np.concatenate(valid_labels, axis=0)

    location_preds = get_location_predictions(
        preds, offsets, seq_ids, config.span_thres
    )
    score = calculate_char_cv(location_preds, offsets, seq_ids, valid_labels)
    return score


def get_location_predictions(
    # NdArray['test_size', 'token_size'] of logits
    preds,
    # NdArray['test_size', 'token_size', 2]
    # [start index, end index] of every token
    offset_mapping,
    # NdArray['test_size', 'token_size']
    # question: 0, context: 1, otherwise: None or NaN
    sequence_ids,
    thres: float = 0.5,
) -> List[List[Tuple[int, int]]]:
    """
    preds -> spans: List['test_size', 'span_count']

    For every sample, consecutive context tokens whose sigmoid probability is
    above `thres` are merged into one (start, end) character span.

    Only tokens with sequence id 1 (context) are considered. Everything else
    is skipped: question tokens (0) and special / padding tokens, whose id is
    None when the ids come straight from the tokenizer and NaN once they have
    been cast to a float array. A span still open when the tokens run out is
    emitted as well.
    """
    all_spans: List[List[Tuple[int, int]]] = []
    for pred, offsets, seq_ids in zip(preds, offset_mapping, sequence_ids):
        # turn logits into probabilities
        probs = 1 / (1 + np.exp(-pred))

        start_idx = None
        end_idx = None
        current_preds: List[Tuple[int, int]] = []
        for pr, offset, seq_id in zip(probs, offsets, seq_ids):
            # `!= 1` is True for None, NaN and 0 alike, so every non-context
            # token is skipped regardless of how "no sequence" is encoded.
            if seq_id != 1:
                continue

            if pr > thres:
                if start_idx is None:
                    start_idx = int(offset[0])
                end_idx = int(offset[1])
            elif start_idx is not None:
                current_preds.append((start_idx, end_idx))
                start_idx = None

        # Flush a span that reaches the last context token.
        if start_idx is not None:
            current_preds.append((start_idx, end_idx))
        all_spans.append(current_preds)

    return all_spans


def calculate_char_cv(
    predictions: List[List[Tuple[int, int]]],
    # NdArray['test_size', 'token_size', 2]
    offset_mapping: np.ndarray,
    # NdArray['test_size', 'token_size']
    sequence_ids: np.ndarray,
    # NdArray['test_size', 'token_size']
    labels: np.ndarray,
) -> Metrics:
    """
    Compute the evaluation scores character by character.

    For every sample two 0/1 arrays are built, one from the token labels and
    one from the predicted spans; their length is the largest offset value of
    that sample. The arrays of all samples are concatenated and scored with
    sklearn.
    """
    all_labels = []
    all_preds = []
    samples = zip(predictions, offset_mapping, sequence_ids, labels)
    for spans, offsets, seq_ids, token_labels in samples:
        num_chars = max(chain.from_iterable(offsets))

        # ground-truth characters
        char_labels = np.zeros(num_chars)
        for (tok_start, tok_end), s_id, label in zip(offsets, seq_ids, token_labels):
            if s_id is not None and s_id != 0 and int(label) == 1:
                char_labels[tok_start:tok_end] = 1

        # predicted characters
        char_preds = np.zeros(num_chars)
        for start_idx, end_idx in spans:
            char_preds[start_idx:end_idx] = 1

        all_labels.extend(char_labels)
        all_preds.extend(char_preds)

    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, average="binary", labels=np.unique(all_preds)
    )
    accuracy: float = accuracy_score(all_labels, all_preds)

    return Metrics(accuracy, precision, recall, f1)


In [17]:
from typing import List, Tuple

import torch
import torch.nn as nn
from torch import optim
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm
from transformers import AutoTokenizer


def train_model(
    epoch: int,
    config: Config,
    model: QAModel,
    dataloader: DataLoader,
    optimizer,
    criterion,
    writer,
):
    """
    Run one training epoch over `dataloader`.

    For every batch: forward pass, element-wise loss averaged over the
    positions whose label is greater than -1, "train/loss" logged to
    `writer`, backward pass, gradient-norm clipping at 1.0 and one optimizer
    step.
    """
    model.train()
    device = config.device

    for step, batch in enumerate(tqdm(dataloader)):
        optimizer.zero_grad()
        # offset mapping and sequence ids are part of the batch but unused here
        input_ids, attention_mask, token_type_ids, labels, _, _ = batch

        # [batch, token_size]
        input_ids = input_ids.to(device)
        attention_mask = attention_mask.to(device)
        token_type_ids = token_type_ids.to(device)
        labels = labels.to(device)

        logits = model(input_ids, attention_mask, token_type_ids)
        per_token_loss = criterion(logits, labels)
        loss = torch.masked_select(per_token_loss, labels > -1.0).mean()

        global_step = epoch * len(dataloader) + step
        writer.add_scalar("train/loss", loss.item() * input_ids.size(0), global_step)

        loss.backward()
        # Clip the gradient norm to 1.0: guards against exploding gradients
        # and, per the original note, slightly improves F1.
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()


def train(config: Config):
    """
    train & save model

    Builds the dataset, holds out 30% of it for validation, fine-tunes a
    `QAModel` for `config.epochs` epochs and writes the CPU state dict to
    "model.pth". Loss and validation metrics are logged through a
    `SummaryWriter`, which is closed when training ends or fails.
    """
    df = make_dataset(config)
    # Local names avoid shadowing the train() / test() functions.
    train_df, valid_df = train_test_split(df, 0.3)
    print(f"train: {len(train_df)}, test: {len(valid_df)}")
    DEVICE = torch.device(config.device)
    model = QAModel(config).to(DEVICE)
    tokenizer = AutoTokenizer.from_pretrained(config.model)

    train_dataset = QADataset(train_df, tokenizer, config)
    test_dataset = QADataset(valid_df, tokenizer, config)

    train_dataloader = DataLoader(
        train_dataset, batch_size=config.batch_size, shuffle=True
    )
    test_dataloader = DataLoader(
        test_dataset, batch_size=config.batch_size, shuffle=False
    )

    # reduction="none" keeps the per-token losses so ignored positions can be
    # masked out before averaging.
    criterion = torch.nn.BCEWithLogitsLoss(reduction="none")

    lr = 1e-5
    optimizer = optim.AdamW(model.parameters(), lr=lr)

    writer = SummaryWriter()
    try:
        for epoch in range(config.epochs):
            print(f"epoch: {epoch + 1}/{config.epochs}")

            train_model(epoch, config, model, train_dataloader, optimizer, criterion, writer)
            metrics = eval_model(epoch, config, model, test_dataloader, criterion, writer)
            step = epoch * len(test_dataloader)
            writer.add_scalar("valid/f1", metrics.f1, step)
            print(f'f1: {metrics.f1}')
            writer.add_scalar('valid/acc', metrics.accuracy, step)
            writer.add_scalar('valid/prec', metrics.precision, step)
            writer.add_scalar('valid/recall', metrics.recall, step)
    finally:
        # Flush pending events and release the event file.
        writer.close()

    torch.save(model.to("cpu").state_dict(), "model.pth")


In [12]:

from typing import List, Tuple
import torch
import pandas as pd
from transformers import AutoTokenizer
import numpy as np
import matplotlib.pyplot as plt


def visualize(config: Config, logits: torch.Tensor):
    """
    Plot the per-token probability curve together with the span threshold.

    Args:
        config: supplies `span_thres`, drawn as a horizontal line.
        logits: 1-D logits of a single sample; a torch tensor or a numpy
            array (as returned by `predict`) is accepted.
    """
    assert len(logits.shape) == 1
    if isinstance(logits, torch.Tensor):
        logits = logits.detach().cpu().numpy()
    # Sigmoid over the whole vector. Indexing logits[0] here, as before,
    # produced a scalar and made len() below fail.
    preds = 1 / (1 + np.exp(-np.asarray(logits)))
    n = len(preds)

    plt.xlabel('token index')
    plt.ylabel('pr')
    plt.plot(range(n), preds)
    plt.hlines(config.span_thres, 0, n)
    plt.show()


def predict(
    config: Config,
    tokenizer: AutoTokenizer,
    model: QAModel,
    series: pd.Series,
) -> Tuple[np.ndarray, List[Tuple[int, int]]]:
    """
    One sample -> (logits, predicted spans).

    Args:
        config: supplies device, token size and span threshold.
        tokenizer: tokenizer passed on to `tokenize_and_add_labels`.
        model: model used for the forward pass; this function does not change
            its train / eval mode.
        series: row holding "feature_text" and "pn_history".

    Returns:
        The logits of the sample with shape ['token_size'] and the list of
        (start, end) spans from `get_location_predictions`.
    """
    DEVICE = torch.device(config.device)
    tokens = tokenize_and_add_labels(tokenizer, series, config)
    # add a batch dimension: ['token_size'] -> [1, 'token_size']
    input_ids = torch.LongTensor(np.array(tokens["input_ids"])) \
        .unsqueeze(0).to(DEVICE)
    # [1, 'token_size']
    attention_mask = torch.LongTensor(
        np.array(tokens["attention_mask"])).unsqueeze(0).to(DEVICE)
    # [1, 'token_size']
    token_type_ids = torch.LongTensor(
        np.array(tokens["token_type_ids"])).unsqueeze(0).to(DEVICE)

    # Inference only: skip autograd bookkeeping for the forward pass.
    with torch.no_grad():
        # [1, 'token_size']
        logits = model(input_ids, attention_mask, token_type_ids)
    assert logits.shape == (1, config.token_size)
    # torch.Tensor -> numpy.ndarray
    logits = logits.detach().cpu().numpy()

    # [1, 'token_size', 2] and [1, 'token_size']
    batched_offset_mapping = np.array(tokens['offset_mapping'])[np.newaxis, :]
    batched_sequence_ids = np.array(tokens['sequence_ids'])[np.newaxis, :]

    assert batched_offset_mapping.shape == (1, config.token_size, 2)
    assert batched_sequence_ids.shape == (1, config.token_size)

    spans = get_location_predictions(
        logits, batched_offset_mapping, batched_sequence_ids, config.span_thres)[0]
    return logits[0], spans


def test(config: Config):
    """
    Predict spans for every test row and write them to
    `config.submission_path`.

    The weights are loaded from "model.pth" onto the CPU, moved to
    `config.device` and put in eval mode. Each row's spans are joined as
    "start end" pairs separated by ';' and stored in the 'location' column
    next to the row's 'id'.
    """
    test_df = make_test_dataset(config)

    weights = torch.load('model.pth', map_location=torch.device('cpu'))
    model = QAModel(config)
    model.load_state_dict(weights)
    model.to(torch.device(config.device))
    model.eval()

    tokenizer = AutoTokenizer.from_pretrained(config.model)

    submission = test_df['id'].to_frame()

    for row_label, row in test_df.iterrows():
        _logits, spans = predict(config, tokenizer, model, row)
        encoded_spans = [f'{span[0]} {span[1]}' for span in spans]
        submission.loc[row_label, 'location'] = ';'.join(encoded_spans)
        print(spans)

    del model

    submission.to_csv(config.submission_path, index=False)


In [16]:
# Build the default configuration and run training; train() saves the
# resulting weights to "model.pth".
config = Config()
train(config)

In [None]:
# Run inference with the `config` created in the training cell; test() loads
# "model.pth" and writes the submission CSV.
test(config)