In [3]:
## Forked from skraiii's notebook https://www.kaggle.com/skraiii

# Pseudo Labeling

In [None]:
from pathlib import Path

import numpy as np
import pandas as pd

# Root directory that holds the competition CSVs and the out-of-fold dump.
INPUT_DIR = Path("../../input")
test = pd.read_csv(INPUT_DIR / "test.csv")
train = pd.read_csv(INPUT_DIR / "train.csv")
patient_notes = pd.read_csv(INPUT_DIR / "patient_notes.csv")
features = pd.read_csv(INPUT_DIR / "features.csv")
# NOTE(security): unpickling executes arbitrary code from the file; only load
# an oof_df.pkl that comes from a trusted source.
oof = pd.read_pickle(INPUT_DIR / "exp038-nbme-microsoft-deberta-v3-large/oof_df.pkl")

In [None]:
# Plain Python list of every pn_num found in patient_notes.
pn_nums = patient_notes["pn_num"].tolist()

In [None]:
from sklearn.model_selection import StratifiedKFold

# Build the unlabeled corpus: every patient note whose pn_num never appears in
# the annotated training set, split into 5 folds stratified by case_num.
unique_train_pn_num = train.pn_num.unique().tolist()
# Series.isin does a hashed membership test, instead of scanning the Python
# list once per note, and reads pn_num straight from patient_notes so this cell
# no longer relies on a variable created in another cell.
mask_not_in_train = (~patient_notes["pn_num"].isin(unique_train_pn_num)).to_list()
ssl_df = patient_notes.loc[mask_not_in_train].copy()
skf = StratifiedKFold(n_splits=5, random_state=0, shuffle=True)
ssl_df["fold"] = -1
# Address the fold column by name rather than assuming it is the last column.
fold_col = ssl_df.columns.get_loc("fold")
for f, (t, v) in enumerate(skf.split(ssl_df.pn_history, ssl_df.case_num)):
    # v holds the positional row indices of the validation part of fold f.
    ssl_df.iloc[v, fold_col] = f
ssl_df.to_csv("corpus.csv", index=False)

In [None]:
# Pair every unlabeled note with each feature of the same case and write the
# result to disk.
# NOTE(review): the output file is named "pl_train" with no extension, while a
# later cell writes "pl_train.csv" -- confirm the missing ".csv" is intentional.
features.merge(ssl_df, on="case_num").to_csv("pl_train", index=False)

In [None]:
from numpy import ndarray
from sklearn.metrics import f1_score


def get_score(y_true: ndarray, y_pred: ndarray) -> float:
    """
    Score span predictions against ground-truth spans with span-level micro F1.

    The arguments are forwarded positionally to ``span_micro_f1`` in the order
    ``(y_true, y_pred)``.

    Args:
        y_true: Ground-truth spans, one list of [start, end] pairs per sample.
        y_pred: Predicted spans, one list of [start, end] pairs per sample.

    Returns:
        float: f1 score.
    """
    return span_micro_f1(y_true, y_pred)


def micro_f1(preds: list, truths: list) -> float:
    """
    Compute a single f1 score over several binary arrays at once.

    All per-sample arrays are glued end to end first, so every element counts
    equally regardless of which sample it came from (micro averaging).

    Args:
        preds (list of lists of ints): Predictions.
        truths (list of lists of ints): Ground truths.

    Returns:
        float: f1 score.
    """
    flat_preds = np.concatenate(preds)
    flat_truths = np.concatenate(truths)
    return f1_score(flat_truths, flat_preds)


def spans_to_binary(spans: list, length=None):
    """
    Converts spans to a binary array indicating whether each character is in the span.

    Args:
        spans (list of lists of two ints): Spans as [start, end) character offsets.
        length (int, optional): Size of the output array. When omitted, the
            largest offset found in ``spans`` is used; with no spans at all the
            result is an empty array.

    Returns:
        np array [length]: Binarized spans.
    """
    if length is None:
        # np.max raises on an empty sequence, so treat "no spans" as length 0.
        length = int(np.max(spans)) if len(spans) else 0
    binary = np.zeros(length)
    for start, end in spans:
        binary[start:end] = 1
    return binary


def span_micro_f1(preds, truths):
    """
    Character-level micro f1 between predicted and ground-truth spans.

    Each sample's spans are rasterised to a binary array whose length is the
    largest offset seen in either its prediction or its truth. Samples where
    both sides are empty are left out of the score.

    Args:
        preds (list of lists of two ints): Prediction spans.
        truths (list of lists of two ints): Ground truth spans.

    Returns:
        float: f1 score.
    """
    binary_pairs = []
    for pred_spans, truth_spans in zip(preds, truths):
        has_pred = len(pred_spans) > 0
        has_truth = len(truth_spans) > 0
        if not (has_pred or has_truth):
            continue
        pred_extent = np.max(pred_spans) if has_pred else 0
        truth_extent = np.max(truth_spans) if has_truth else 0
        n_chars = max(pred_extent, truth_extent)
        binary_pairs.append(
            (
                spans_to_binary(pred_spans, n_chars),
                spans_to_binary(truth_spans, n_chars),
            )
        )
    bin_preds = [pair[0] for pair in binary_pairs]
    bin_truths = [pair[1] for pair in binary_pairs]
    return micro_f1(bin_preds, bin_truths)

In [None]:
import torch
from torch import Tensor
from transformers.tokenization_utils import PreTrainedTokenizer


def create_label(
    tokenizer: PreTrainedTokenizer,
    max_len: int,
    text: str,
    annotation_length: int,
    location_list: list,
) -> Tensor:
    """
    Build a per-token label vector for ``text`` from character-level locations.

    Args:
        tokenizer: Tokenizer called with ``return_offsets_mapping=True``; its
            output must expose ``offset_mapping`` and ``sequence_ids()``.
        max_len: Length the encoding is padded to and the label is cut to.
        text: Text to tokenize.
        annotation_length: Number of annotations; when 0 no token is marked.
        location_list: Strings of the form "start end" or
            "start end;start end" giving character offsets.

    Returns:
        Tensor: float tensor with 1 for tokens inside a location, 0 for other
        tokens, and -1 for positions whose sequence id is not 0.
    """

    encoded = tokenizer(
        text,
        add_special_tokens=True,
        max_length=max_len,
        padding="max_length",
        return_offsets_mapping=True,
    )
    offset_mapping = encoded["offset_mapping"]
    # Positions that do not belong to sequence 0 (presumably special and
    # padding tokens -- TODO confirm) are flagged with -1.
    ignore_idxes = np.where(np.array(encoded.sequence_ids()) != 0)[0]
    label = np.zeros(len(offset_mapping))
    label[ignore_idxes] = -1
    if annotation_length != 0:
        for location in location_list:
            # One location string may hold several ";"-separated "start end" pairs.
            for loc in [s.split() for s in location.split(";")]:
                start_idx = -1
                end_idx = -1
                start, end = int(loc[0]), int(loc[1])
                for idx in range(len(offset_mapping)):
                    # First token starting strictly after `start`: the span
                    # begins at the token just before it.
                    if (start_idx == -1) & (start < offset_mapping[idx][0]):
                        start_idx = idx - 1
                    # First token whose end offset reaches `end`: the span
                    # stops right after it (exclusive bound).
                    if (end_idx == -1) & (end <= offset_mapping[idx][1]):
                        end_idx = idx + 1
                # No token starts after `start`: fall back to end_idx, which
                # makes the slice below empty.
                if start_idx == -1:
                    start_idx = end_idx
                if (start_idx != -1) & (end_idx != -1):
                    label[start_idx:end_idx] = 1
    # The tokenizer call does not request truncation, so cut to max_len here.
    return torch.tensor(label[:max_len], dtype=torch.float)


def get_char_probs(texts, predictions, tokenizer) -> list:
    """
    Spread token-level predictions over the characters of each text.

    Every character covered by a token receives that token's prediction.
    Characters lying between two consecutive tokens receive the mean of the
    two neighbouring token predictions.

    Args:
        texts: Sequence of strings.
        predictions: Per-text sequences of token predictions, aligned with the
            tokenizer output for the same text.
        tokenizer: Callable returning a mapping with an "offset_mapping" entry.

    Returns:
        list: One array per text, with one value per character.
    """
    char_probs = [np.zeros(len(text)) for text in texts]
    for probs, text, token_preds in zip(char_probs, texts, predictions):
        encoded = tokenizer(text, add_special_tokens=True, return_offsets_mapping=True)
        last_pred, last_end = 0, -1
        for (tok_start, tok_end), tok_pred in zip(
            encoded["offset_mapping"], token_preds
        ):
            probs[tok_start:tok_end] = tok_pred
            # Fill the gap left between the previous token and this one.
            if tok_start != last_end:
                probs[last_end:tok_start] = (tok_pred + last_pred) / 2
            last_pred, last_end = tok_pred, tok_end
    return char_probs


def get_results(char_probs: list, pn_histories: list, th: float = 0.5) -> list:
    """
    Turn per-character probabilities into "start end;start end" span strings.

    Args:
        char_probs: One array of per-character values per note.
        pn_histories: The note texts, aligned with ``char_probs``.
        th: Characters whose value is strictly greater than ``th`` are positive.

    Returns:
        list: One string per note; spans are joined with ";" and the string is
        empty when no span is produced.
    """
    label_strs = []
    for char_prob, pn_history in zip(char_probs, pn_histories):
        # Positive character positions, shifted by +1 so that the last index of
        # a run can serve directly as an exclusive end offset.
        pos_char_indices = np.where(char_prob > th)[0] + 1
        # If character 0 is positive, put 0 back in front of the shifted run.
        if len(pos_char_indices) > 0 and pos_char_indices[0] == 1:
            pos_char_indices = np.hstack([[0], pos_char_indices])
        # Group the indices into runs of consecutive integers.
        clustered_pos_char_indices = cluster_elements(xs=pos_char_indices)

        for i in range(len(clustered_pos_char_indices)):
            if len(clustered_pos_char_indices[i]) > 0:

                # Case: the first character is a space. The character just
                # before the run start is prepended only when it is not a space.
                target_idx = clustered_pos_char_indices[i][0] - 1
                if target_idx > -1 and pn_history[target_idx] != " ":
                    clustered_pos_char_indices[i] = np.hstack(
                        [[target_idx], clustered_pos_char_indices[i]]
                    )

                # Case: the run starts with "\r\n". Drop those two characters.
                if clustered_pos_char_indices[i][0] > 0 and clustered_pos_char_indices[
                    i
                ][0] + 2 < len(pn_history):
                    if (
                        pn_history[
                            clustered_pos_char_indices[i][
                                0
                            ] : clustered_pos_char_indices[i][0]
                            + 2
                        ]
                        == "\r\n"
                    ):
                        clustered_pos_char_indices[i] = clustered_pos_char_indices[i][
                            2:
                        ]

                # Case: the last two characters are "\n-". Drop them.
                # NOTE(review): if the trimming above emptied this run, the
                # [-1] lookup below raises IndexError -- confirm this cannot
                # happen with real data.
                target_idx = clustered_pos_char_indices[i][-1] - 2
                if target_idx > 0 and pn_history[target_idx : target_idx + 2] == "\n-":
                    clustered_pos_char_indices[i] = clustered_pos_char_indices[i][:-2]

        pos_char_spans = []
        # NOTE(review): only the first run is checked here, so if it ends up
        # empty every later run of this note is skipped as well -- confirm
        # that this is intended.
        if len(clustered_pos_char_indices[0]) != 0:
            for x in clustered_pos_char_indices:
                if len(x) > 0:
                    # The first and last index of the run become "start end".
                    pos_char_spans.append([x[0], x[-1]])

        label_strs.append(";".join([f"{x[0]} {x[1]}" for x in pos_char_spans]))

    return label_strs


def get_predictions(results):
    """
    Parse span strings into nested integer lists.

    Args:
        results: Iterable of strings such as "0 5;10 12"; an empty string
            means "no span".

    Returns:
        list: For every input string, a list of [start, end] integer pairs.
    """

    def _parse(label_str):
        # An empty string maps to an empty list of spans.
        if label_str == "":
            return []
        pieces = [chunk.split() for chunk in label_str.split(";")]
        return [[int(piece[0]), int(piece[1])] for piece in pieces]

    return [_parse(label_str) for label_str in results]


def cluster_elements(xs: list) -> list:
    """
    Split a sequence of integers into runs of consecutive values.

    Args:
        xs: Sequence of integers, read in the given order.

    Returns:
        list: List of runs; ``[[]]`` when ``xs`` is empty.
    """
    if len(xs) == 0:
        return [[]]

    groups = [[]]
    expected = xs[0]
    for value in xs:
        # A break in the sequence opens a new run.
        if value != expected:
            groups.append([])
        groups[-1].append(value)
        expected = value + 1
    return groups


import ast
from pandas import DataFrame


def get_result(
    oof_df: DataFrame, tokenizer: PreTrainedTokenizer, max_len: int
) -> tuple:
    """
    Search the decision threshold that maximises the span score on ``oof_df``.

    Thresholds from 0.3 up to (but excluding) 0.7 are tried in steps of 0.005.

    Args:
        oof_df: Frame with "pn_history", "location" and the integer-named
            columns 0..max_len-1 that hold the token predictions.
        tokenizer: Tokenizer passed on to ``get_char_probs``.
        max_len: Number of token prediction columns to read.

    Returns:
        tuple: (best score, threshold that achieved it).
    """
    labels = create_labels_for_scoring(oof_df)
    token_probs = oof_df[list(range(max_len))].to_numpy()
    pn_histories = oof_df["pn_history"].to_list()
    char_probs = get_char_probs(oof_df["pn_history"].to_numpy(), token_probs, tokenizer)

    score = -100
    for raw_th in np.arange(0.3, 0.7, 0.005):
        th = np.round(raw_th, 4)
        span_strs = get_results(char_probs, pn_histories, th=th)
        candidate = get_score(labels, get_predictions(span_strs))
        if candidate > score:
            score, best_th = candidate, th
    print(f"Score: {score:<.4f} Best threshold:: {best_th}")
    return score, best_th


def create_labels_for_scoring(df: DataFrame):
    """
    Build ground-truth span lists from the ``location`` column of ``df``.

    Works for any index: rows are read in positional order, so the frame does
    not need a 0..n-1 index.

    Side effect: a ``location_for_create_labels`` column is written to ``df``.
    Each cell is ``[]`` for rows without locations, otherwise a one-element
    list holding all location strings joined with ";".

    Args:
        df: Frame whose ``location`` cells are lists of strings such as
            "0 1" or "5 8;10 12".

    Returns:
        list: For each row, a list of [start, end] integer pairs.
    """
    # example: ['0 1', '3 4'] -> ['0 1;3 4']
    joined_locations = [
        [";".join(spans)] if spans else [] for spans in df["location"]
    ]
    df["location_for_create_labels"] = joined_locations

    # create labels
    truths = []
    for location_list in joined_locations:
        truth = []
        if len(location_list) > 0:
            for loc in [s.split() for s in location_list[0].split(";")]:
                truth.append([int(loc[0]), int(loc[1])])
        truths.append(truth)
    return truths

In [None]:
import pandas as pd
from ast import literal_eval

# Reload the annotated training set and parse its stringified list columns.
train = pd.read_csv("../input/nbme-score-clinical-patient-notes/train.csv")
train["annotation"] = train["annotation"].map(literal_eval)
train["location"] = train["location"].map(literal_eval)
train["annotation_length"] = train["annotation"].map(len)
# reset_index() without drop keeps the previous index as an "index" column.
train = train.sort_values(by="id").reset_index()
# Rows that carry no annotation at all.
mask = train["annotation_length"].eq(0)

# The out-of-fold frame is ordered by id in the same way as train.
oof = pd.read_pickle("../input/exp038-nbme-microsoft-deberta-v3-large/oof_df.pkl")
oof = oof.sort_values(by="id").reset_index()

In [None]:
# Metadata columns to show for the rows selected by `mask`.
cols = [
    "id",
    "case_num",
    "pn_num",
    "feature_num",
    "annotation",
    "location",
    "feature_text",
    "pn_history",
    "annotation_length",
    "fold",
]
# `mask` was computed on `train`; using it on `oof` assumes both frames have
# the same rows in the same order -- TODO confirm.
oof.loc[mask, cols]

In [None]:
# Number of integer-named prediction columns (0..max_len-1) read from `oof`.
# NOTE(review): presumably the tokenizer max length used when the OOF
# predictions were produced -- confirm where 354 comes from.
max_len = 354

# Convert the token-level predictions of the rows selected by `mask` into
# per-character values.
# NOTE(review): `deberta_tokenizer` is not defined in any visible cell; it has
# to be created before this cell can run.
pn_histories = oof.loc[mask, "pn_history"].to_list()
char_probs = get_char_probs(
    pn_histories, oof.loc[mask, range(max_len)].to_numpy(), deberta_tokenizer
)

# Threshold the character values and parse the resulting span strings into
# [start, end] integer pairs.
th = 0.5
results = get_results(char_probs, pn_histories, th=th)
preds = get_predictions(results)

In [None]:
# Serialise each row's predicted spans as the string form of a list of
# "start end" items, e.g. [[10, 25], [30, 40]] -> "['10 25', '30 40']".
# The items are formatted from the parsed integers so that multi-digit end
# offsets are written out in full.
pl_locations = [str([f"{start} {end}" for start, end in x]) for x in preds]

print(len(pl_locations))

# Store the pseudo labels on the rows selected by `mask`, then export only the
# rows that received at least one span. "[]" is the serialised empty list.
oof.loc[mask, "location"] = pl_locations
x = oof.loc[mask]
# Keep the first 11 columns of the frame for the export.
x[x.location != "[]"].iloc[:, :11].to_csv("pl_train.csv", index=False)