In [None]:
import os
os.environ["TOKENIZERS_PARALLELISM"] = "true"

from tqdm import tqdm
import pickle
import random
import datetime
import json

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
torch.set_grad_enabled(False)

from transformers import DataCollatorForTokenClassification
from transformers import AutoModel, AutoConfig
from transformers.modeling_outputs import TokenClassifierOutput
from transformers import AutoTokenizer, DebertaV2Tokenizer


class cfg:
    """Static settings shared by every cell of this inference notebook."""

    # Hugging Face checkpoint used for both the tokenizer and the backbone.
    # Alternatives noted by the author: microsoft/deberta-large, microsoft/deberta-v3-large
    pretrained_checkpoint = "microsoft/deberta-base"

    # Directory holding the per-fold weight files ('<fold>.pt'); the resulting
    # char-level logits pickle is written into this directory as well.
    model_dir = "/home/xm/workspace/output/1002"

    # Pickled table of (note, feature) rows to run inference on.
    data_path = "/home/xm/workspace/nbme-score-clinical-patient-notes/train_pl_all.pkl"

    # Number of samples per inference batch.
    batch_size = 128

def save_pickle(obj, path):
    """Serialize ``obj`` with pickle and write it to ``path``.

    Args:
        obj: Any picklable Python object.
        path: Destination file path; an existing file is overwritten.
    """
    with open(path, 'wb') as sink:
        pickle.dump(obj, file=sink)
        
# Load the pickled table of rows to score. Later cells read its 'fold', 'id',
# 'pn_history' and 'feature_text' columns. Only unpickle files from a trusted source.
pl_df = pd.read_pickle(cfg.data_path)
# NOTE(review): trim_offsets=False presumably keeps each token's leading whitespace inside
# its offset span instead of trimming it — confirm against the tokenizers documentation.
tokenizer = AutoTokenizer.from_pretrained(cfg.pretrained_checkpoint, trim_offsets=False)
# Bare last expression of the cell: the notebook renders the loaded table.
pl_df

In [None]:
# DataSet
def prepare_input(tokenizer, text, feature_text):
    """Tokenize a (note, feature) pair into a single model input.

    Args:
        tokenizer: Callable tokenizer; invoked with the two texts as a pair.
        text: The patient note.
        feature_text: The feature description paired with the note.

    Returns:
        Whatever ``tokenizer`` returns for the pair; special tokens are
        requested and offset mappings are not.
    """
    encode_options = {
        'add_special_tokens': True,       # ask the tokenizer to insert its special tokens
        'return_offsets_mapping': False,  # char offsets are not needed for the model input
    }
    return tokenizer(text, feature_text, **encode_options)



class NBMEDatasetInfer(Dataset):
    """Inference-only dataset yielding one tokenized (note, feature) pair per row.

    Args:
        tokenizer: Tokenizer forwarded to ``prepare_input`` for every item.
        df: Table with a 'feature_text' column and a 'pn_history' column.
    """

    def __init__(self, tokenizer, df):
        self.tokenizer = tokenizer
        # Pull both columns out as arrays once so item access is a plain index lookup.
        self.feature_texts = df['feature_text'].values  # feature descriptions
        self.pn_historys = df['pn_history'].values      # patient notes

    def __len__(self):
        """Number of (note, feature) rows."""
        return len(self.feature_texts)

    def __getitem__(self, item):
        """Tokenize row ``item``; the note goes first, the feature text second."""
        note = self.pn_historys[item]
        feature = self.feature_texts[item]
        return prepare_input(self.tokenizer, note, feature)

In [None]:
# Model
class NBMEModel(nn.Module):
    """Token-level binary classifier: pretrained backbone -> dropout -> linear head.

    The head emits one logit per token. When a ``label`` tensor is passed to
    ``forward``, a BCE-with-logits loss is also computed, averaged over the
    positions whose label is not -100.
    """

    def __init__(self, checkpoint):
        """Load the config and pretrained backbone for ``checkpoint`` and build the head."""
        super().__init__()
        # This config object is only read for hidden_size / initializer_range here;
        # the backbone below is loaded from the checkpoint without it.
        self.config = AutoConfig.from_pretrained(checkpoint, output_hidden_states=True)
        self.backbone = AutoModel.from_pretrained(checkpoint)
        self.dropout = nn.Dropout(0.1)
        self.classifier = nn.Linear(self.config.hidden_size, 1)  # one logit per token
        self._init_weights(self.classifier)

    def _init_weights(self, module):
        """Re-initialize ``module`` in place according to its layer type.

        Linear and Embedding weights are drawn from N(0, initializer_range);
        LayerNorm is reset to weight 1 / bias 0. Other module types are left untouched.
        """
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.bias is not None:
                module.bias.data.zero_()
            return

        if isinstance(module, nn.Embedding):
            # initializer_range is 0.02 according to the original author's note.
            module.weight.data.normal_(mean=0.0, std=self.config.initializer_range)
            if module.padding_idx is not None:
                # Keep the padding row at exactly zero.
                module.weight.data[module.padding_idx].zero_()
            return

        if isinstance(module, nn.LayerNorm):
            module.bias.data.zero_()
            module.weight.data.fill_(1.0)

    def forward(self, **inputs):
        """Run the backbone and the head on a batch.

        Args:
            **inputs: Keyword tensors for the backbone (the original author lists
                input_ids, token_type_ids, attention_mask), plus an optional
                ``label`` tensor that is consumed here and not forwarded.

        Returns:
            TokenClassifierOutput with ``logits`` from the linear head and
            ``loss`` set to ``None`` when no ``label`` was given.
        """
        has_label = 'label' in inputs
        backbone_kwargs = {name: tensor for name, tensor in inputs.items() if name != 'label'}

        # First element of the backbone output is the per-token sequence output
        # ([bs, seq_len, hidden_size] per the original author's note).
        hidden = self.backbone(**backbone_kwargs)[0]
        logits = self.classifier(self.dropout(hidden))

        loss = None
        if has_label:
            target = inputs['label'].view(-1, 1)
            # Unreduced loss first, so that ignored positions can be filtered out.
            per_token_loss = nn.BCEWithLogitsLoss(reduction="none")(logits.view(-1, 1), target.float())
            loss = torch.masked_select(per_token_loss, target != -100).mean()

        return TokenClassifierOutput(loss=loss, logits=logits)
        
# Build the network from the configured checkpoint, then move it to the GPU.
model = NBMEModel(cfg.pretrained_checkpoint)
model = model.cuda()

In [None]:
def get_char_logits(texts, predictions, tokenizer):
    """Spread token-level scores onto the characters of each note.

    Args:
        texts: Raw note strings, one per sample (the same note may appear
            several times, once per feature it is paired with).
        predictions: Per-sample sequences of token-level scores, aligned
            position by position with the tokenization of the note alone.
        tokenizer: Callable returning a mapping with an 'offset_mapping'
            entry of (start, end) character spans, one per token.

    Returns:
        A list with one float array per text, of length ``len(text)``.
        Characters not covered by any token span stay 0; where spans
        overlap, the later token's score wins.
    """
    char_scores = [np.zeros(len(note)) for note in texts]
    for scores, note, token_scores in zip(char_scores, texts, predictions):
        spans = tokenizer(
            note,
            add_special_tokens=True,      # keep special tokens so positions line up with the scores
            return_offsets_mapping=True,  # (start, end) character span for every token
        )['offset_mapping']
        # zip stops at the shorter sequence, so surplus scores or spans are ignored.
        for (start, end), value in zip(spans, token_scores):
            scores[start:end] = value
    return char_scores

In [None]:
# Per-fold inference: each fold's rows are scored with the checkpoint saved as '<fold>.pt',
# token logits are projected back onto note characters, and everything is collected by row id.
# Gradient tracking was already switched off globally next to the imports.
results = {}  # {id: char_logits}
for fold in range(5):
    pl_df_fold = pl_df[pl_df['fold'] == fold].reset_index(drop=True)  # rows of this fold
    pl_dataset_fold = NBMEDatasetInfer(tokenizer, pl_df_fold)
    dataset_len = len(pl_dataset_fold)  # number of samples in this fold
    if dataset_len == 0:
        # max() below would raise on an empty fold; there is nothing to score, so skip it.
        print(f'fold {fold}: no rows, skipped')
        continue

    # Longest tokenized sample of the fold. Every batch is right-padded to this width
    # so the per-batch arrays can be concatenated into one [n, maxlen] matrix.
    maxlen = max(len(x['input_ids']) for x in pl_dataset_fold)
    dataloader = DataLoader(pl_dataset_fold, batch_size=cfg.batch_size, shuffle=False,
                            collate_fn=DataCollatorForTokenClassification(tokenizer),
                            num_workers=4,
                            pin_memory=False)

    model.load_state_dict(torch.load(os.path.join(cfg.model_dir, f'{fold}.pt')))  # fold weights
    model.eval()  # disable dropout for inference
    preds = []
    # len(dataloader) is the exact batch count, including when the fold size is a
    # multiple of the batch size.
    for b in tqdm(dataloader, total=len(dataloader)):
        b = {k: v.cuda() for k, v in b.items()}
        # Drop only the trailing size-1 logit axis: [bs, seq_len, 1] -> [bs, seq_len].
        # A bare squeeze() would also remove the batch axis when the final batch holds a
        # single sample, and pred.shape[1] below would then raise IndexError.
        pred = model(**b).logits.squeeze(-1)
        # Right-pad the token axis up to maxlen with the filler value -100.
        pred = F.pad(input=pred, pad=(0, maxlen - pred.shape[1]), mode='constant', value=-100).cpu().numpy()
        preds.append(pred)
    preds = np.concatenate(preds, axis=0)  # [n, maxlen] token-level logits for the fold
    char_logits = get_char_logits(pl_df_fold['pn_history'].values, preds, tokenizer)  # char-level logits
    results.update({k: v for k, v in zip(pl_df_fold['id'], char_logits)})

print(f'{len(results)} results')
# results maps sample id -> char-level logits; persist it next to the model weights.
save_pickle(results, os.path.join(cfg.model_dir, 'pl_logits.pkl'))