In [None]:
# 1. Split text into tokens and convert them to IDs with the BERT tokenizer
## Prepare the tokenizer
import numpy as np 
from torch.utils.data import TensorDataset, random_split
from torch.utils.data import DataLoader, RandomSampler, SequentialSampler
from mlflow import log_metric, log_param, log_artifact
from transformers import BertJapaneseTokenizer, BertForSequenceClassification
import pytorch_lightning as pl
# Load the pretrained Japanese BERT tokenizer identified by this model name.
tokenizer = BertJapaneseTokenizer.from_pretrained('cl-tohoku/bert-base-japanese-whole-word-masking')
import torch

# Maximum token length. NOTE(review): not referenced in the visible code;
# calc_similarity passes the literal 209 to the tokenizer instead.
max_length = 209

from transformers import BertModel

# Transformers model identifier; the same string is passed to the tokenizer above.
modelname = 'cl-tohoku/bert-base-japanese-whole-word-masking'

def mean_pooling(model_output, attention_mask):
    """Average token embeddings over the positions selected by the mask.

    Args:
        model_output: Indexable model output whose first element holds the
            token embeddings.
        attention_mask: Mask tensor; it is expanded along a new last axis to
            the size of the token embeddings and cast to float.

    Returns:
        The mask-weighted sum of the token embeddings along dimension 1,
        divided by the mask sum along dimension 1 (clamped to at least 1e-9
        so an all-zero mask does not divide by zero).
    """
    hidden_states = model_output[0]
    weights = attention_mask.unsqueeze(-1).expand(hidden_states.size()).float()
    weighted_sum = torch.sum(hidden_states * weights, 1)
    weight_total = torch.clamp(weights.sum(1), min=1e-9)
    return weighted_sum / weight_total

class SentenceBERT(pl.LightningModule):
    """Two-encoder sentence embedding model trained with a triplet loss.

    Anchor inputs are encoded by ``bert_sc1``; positive and negative inputs
    are both encoded by ``bert_sc2``. Each encoder output is reduced to one
    vector per input with ``mean_pooling``.
    """

    def __init__(self, model_name, lr):
        """Create both encoders and the loss.

        Args:
            model_name: Name of the Transformers model to load.
            lr: Learning rate used by the optimizer.
        """
        super().__init__()

        # Stores the constructor arguments so they are reachable as, for
        # example, self.hparams.lr; they are also saved automatically when a
        # checkpoint is created.
        self.save_hyperparameters()

        # Encoder applied to the anchor inputs.
        # NOTE(review): both encoders are moved to CUDA device index 1 here,
        # so constructing the model requires that device to exist.
        self.bert_sc1 = BertModel.from_pretrained(model_name)
        self.bert_sc1.cuda(1)

        # Encoder applied to both the positive and the negative inputs.
        self.bert_sc2 = BertModel.from_pretrained(model_name)
        self.bert_sc2.cuda(1)

        # Triplet margin loss measured with pairwise p=2 distance.
        # Alternative (disabled): distance_function=torch.nn.CosineSimilarity()
        pairwise_l2 = torch.nn.PairwiseDistance(p=2)
        self.triplet_loss = torch.nn.TripletMarginWithDistanceLoss(
            distance_function=pairwise_l2,
            margin=1.0,
        )
        self.cos = torch.nn.CosineSimilarity(dim=1, eps=1e-6)

    @staticmethod
    def _sentence_vector(encoder, input_ids, attention_mask, token_type_ids):
        """Run ``encoder`` on one tokenized input and mean-pool its output."""
        encoded = encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
        )
        return mean_pooling(encoded, attention_mask)

    def _triplet_loss_for(self, batch):
        """Return the triplet loss of the anchor/positive/negative inputs in ``batch``."""
        anchor = self._sentence_vector(
            self.bert_sc1,
            batch['input_ids_a'],
            batch['attention_mask_a'],
            batch['token_type_ids_a'],
        )
        positive = self._sentence_vector(
            self.bert_sc2,
            batch['input_ids_p'],
            batch['attention_mask_p'],
            batch['token_type_ids_p'],
        )
        negative = self._sentence_vector(
            self.bert_sc2,
            batch['input_ids_n'],
            batch['attention_mask_n'],
            batch['token_type_ids_n'],
        )
        return self.triplet_loss(anchor, positive, negative)

    def training_step(self, batch, batch_idx):
        """Return the loss for one training mini-batch.

        ``batch_idx`` is the mini-batch number and is not used.
        """
        loss = self._triplet_loss_for(batch)
        self.log('train_loss', loss)  # logged under the name 'train_loss'
        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the loss for one validation mini-batch."""
        val_loss = self._triplet_loss_for(batch)
        self.log('val_loss', val_loss)  # logged under the name 'val_loss'

    def configure_optimizers(self):
        """Return the optimizer used for training: Adam at the stored learning rate."""
        learning_rate = self.hparams.lr
        return torch.optim.Adam(self.parameters(), lr=learning_rate)

# 6-20
# Load the PyTorch Lightning model from its checkpoint.
model = SentenceBERT.load_from_checkpoint(
    './model/epoch=4-step=6396.ckpt'
) 

model.cuda(1)
# The model is only used for inference from here on, so switch it to
# evaluation mode explicitly instead of relying on whatever mode the restored
# sub-modules happen to be in (keeps dropout disabled and embeddings
# deterministic).
model.eval()

In [None]:
import json
from ast import literal_eval
import pickle

def calc_similarity(text):
    """Rank the stored sentence vectors by cosine similarity to ``text``.

    The stored vectors are unpickled from ``./sentenceVector.txt`` on every
    call. ``text`` (the utterance summary) is tokenized, encoded with
    ``model.bert_sc1`` and mean-pooled into a sentence vector, which is then
    compared with each stored vector.

    Args:
        text: The utterance summary to embed.

    Returns:
        A NumPy array holding the positions of the (at most) 7 highest
        similarities, most similar first. Positions index the stored vectors
        as long as each stored vector yields exactly one similarity value.
    """
    # NOTE(security): pickle.load can execute arbitrary code; only load a
    # vector file that was produced locally by trusted code.
    with open("./sentenceVector.txt", "rb") as vector_file:
        # assumes a sequence of tensors on the same device as the query
        # vector (CUDA device 1) — TODO confirm against the code that wrote it
        vector_lst = pickle.load(vector_file)

    # Sentence vector of the utterance summary.
    encoding_ = tokenizer(
        text,
        max_length=209,  # same value as the module-level max_length
        truncation=True,
        return_tensors='pt'
    )
    encoding_ = {k: v.cuda(1) for k, v in encoding_.items()}
    with torch.no_grad():
        query_vector = mean_pooling(
            model.bert_sc1(attention_mask=encoding_['attention_mask'],
                           input_ids=encoding_['input_ids'],
                           token_type_ids=encoding_['token_type_ids']),
            encoding_['attention_mask'])

    # Cosine similarity between the utterance-summary vector and the vector
    # of each sentence of the minutes. The similarity module is stateless, so
    # it is built once rather than once per stored vector.
    cos = torch.nn.CosineSimilarity(dim=1, eps=1e-6)
    sim_diffs = []
    for sentence_vector in vector_lst:
        similarity = cos(query_vector, sentence_vector)
        sim_diffs.extend(similarity.detach().to('cpu').numpy())

    # Take the top-n entries: sorting the negated scores ascending puts the
    # highest similarity first.
    n = 7
    return np.argsort(-np.array(sim_diffs))[:n]

In [None]:
# Evaluate retrieval on the training file: for every example that is not
# marked as non-entailed, count how many of the indices returned by
# calc_similarity fall inside the example's [StartingLine, EndingLine] range.
with open('../../data/PoliInfo3-FormalRun-FactVerification/PoliInfo3_FactVerification_Formal_Train.json', 'r', encoding='utf-8') as json_open:
    json_load = json.load(json_open)

correct_num = 0
num = 0
for v in json_load:
    if v["DocumentEntailment"] != False:
        np_simdiffs = calc_similarity(v["UtteranceSummary"])
        startingline = v["StartingLine"]
        endingline = v["EndingLine"]
        for i in np_simdiffs:
            num += 1
            # Returned indices are 0-based; presumably StartingLine/EndingLine
            # are 1-based line numbers, hence the +1 — verify against the data.
            if startingline <= i + 1 <= endingline:
                correct_num += 1

# Fraction of retrieved indices that hit the gold range. Report 0.0 instead
# of raising ZeroDivisionError when no example was evaluated.
accuracy = correct_num / num if num else 0.0
print(accuracy)