In [None]:
#======================================================================================================
# sentence-bert STS 데이터셋을 가지고, 훈련 및 평가 예시
# => sentence-transformers 패키지를 이용하여 구현 함.(*pip install -U sentence-transformers 설치 필요)
#
# 도큐먼트 : https://www.sbert.net/index.html
# 소스참고 : https://github.com/BM-K/KoSentenceBERT-ETRI

# pip install -U sentence-transformers
#======================================================================================================

from torch.utils.data import DataLoader
import math
from sentence_transformers import models, losses
from sentence_transformers import SentencesDataset, LoggingHandler, SentenceTransformer, util, InputExample
from sentence_transformers.evaluation import EmbeddingSimilarityEvaluator
from datetime import datetime
import sys
import os
import gzip
import csv

sys.path.append('..')
from myutils import seed_everything, GPU_info, mlogging

# Create the notebook-wide logger through the project helper; both the logger
# name and the log-file name are "s-bert1".
logger = mlogging(loggername="s-bert1", logfilename="s-bert1")
# GPU_info() comes from myutils; presumably it reports the available GPU and
# returns the torch device to use -- TODO confirm against myutils.
device = GPU_info()
# Fix the random seed (111) via the project helper so runs are repeatable.
# NOTE(review): which RNGs are seeded (python / numpy / torch) is decided inside myutils -- verify.
seed_everything(111)

In [None]:
# ---------------------------------------------------------------------------
# Run configuration: model paths, dataset files and training hyper-parameters.
# (`os` and `models` are imported in the first cell of the notebook.)
# ---------------------------------------------------------------------------

# Path of the base BERT checkpoint that will be wrapped into a Sentence-BERT.
model_path = "../model/distilbert/distilbert-0331-TS-nli-0.1-10"

# Path where the trained Sentence-BERT is saved.
# Alternative (timestamped) naming scheme kept for reference:
#smodel_path = 'output/training_nli_'+model_name.replace("/", "-")+'-'+datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
smodel_path = "../model/sbert/sbert-distilbert-0331-TS-nli-0.1-10"

# Directory that receives the evaluator's result files (cosine similarity
# correlation etc.); it is the same directory as the saved model.
output_path = smodel_path
os.makedirs(output_path, exist_ok=True)

# Selects the train/dev corpus: 0 = KorSTS .tsv files, 1 = KLUE-STS .json files.
train_file_type = 1

if train_file_type == 0:
    train_file = '../korpora/korsts/tune_train.tsv'
    eval_file = '../korpora/korsts/tune_dev.tsv'
elif train_file_type == 1:
    train_file = '../korpora/klue-sts/klue-sts-v1.1_train.json'
    eval_file = '../korpora/klue-sts/klue-sts-v1.1_dev.json'
else:
    # Fail fast: without this, train_file/eval_file stay undefined and the
    # data-loading cells further down die with a confusing NameError.
    raise ValueError(f"Unsupported train_file_type: {train_file_type!r} (expected 0 or 1)")

# The final test always uses the KorSTS test split, whatever the training corpus is.
test_file = '../korpora/korsts/tune_test.tsv'

train_batch_size = 32
num_epochs = 100

# Load the transformer model together with its tokenizer.
# => The vocabulary files (vocab.txt, *.json) and the model files
#    (config.json, pytorch_model.bin) must live in the same directory.
word_embedding_model = models.Transformer(model_path, max_seq_length=128)

# Show the word-embedding module that was loaded.
print(word_embedding_model)

In [None]:
# Pooling policy that turns the per-token embeddings into one fixed-size
# sentence vector. Three strategies are available (CLS token, mean of the
# token embeddings, max over the token embeddings); only mean pooling is on.
embedding_dim = word_embedding_model.get_word_embedding_dimension()  # hidden size reported by the model
pooling_model = models.Pooling(
    embedding_dim,
    pooling_mode_cls_token=False,   # do not use the [CLS] embedding
    pooling_mode_max_tokens=False,  # do not use the element-wise max
    pooling_mode_mean_tokens=True,  # average the token embeddings
)

# Show the pooling module configuration.
print(pooling_model)

In [None]:
# Assemble the Sentence-BERT: the transformer encoder followed by the pooling layer.
sbert_modules = [word_embedding_model, pooling_model]
model = SentenceTransformer(modules=sbert_modules)
print(model)

In [None]:
import json

# Load the STS training data as InputExample(texts=[sentence1, sentence2], label=score).
# Gold scores are divided by 5.0 so that labels fall into the 0..1 range.
logger.info(f"Read STS train dataset=>{train_file}")

train_samples = []
count = 0

# KorSTS: one tab-separated "sentence1<TAB>sentence2<TAB>score" record per line.
if train_file_type == 0:
    with open(train_file, "rt", encoding="utf-8") as f:
        for line in f:
            # Fixed: the original unpacked into (text_a, text_a, score), which
            # dropped the first sentence, left text_b undefined and trained on
            # identical sentence pairs.
            text_a, text_b, score = line.split('\t')
            score = float(score.strip()) / 5.0  # scale to 0..1

            # Preview the first few records.
            if count < 5:
                print(f"{text_a}, {text_b}, {score}")

            train_samples.append(InputExample(texts=[text_a, text_b], label=score))
            count += 1

# KLUE-STS: a JSON list of records with "sentence1", "sentence2" and labels.label.
elif train_file_type == 1:
    with open(train_file, "rt", encoding="utf-8") as f:
        datas = json.load(f)
        for data in datas:
            text_a = data["sentence1"]
            text_b = data["sentence2"]
            score = float(data["labels"]["label"]) / 5.0  # scale to 0..1

            # Preview the first few records.
            if count < 5:
                print(f"{text_a}, {text_b}, {score}")

            train_samples.append(InputExample(texts=[text_a, text_b], label=score))
            count += 1

print(train_samples[0:3])

In [None]:
# Define the training dataset, the data loader and the loss function.
# (The original cell built the dataset and the loader twice with identical
# arguments; the redundant second construction has been removed.)
train_dataset = SentencesDataset(train_samples, model=model)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=train_batch_size)

# Cosine-similarity loss between the two sentence embeddings and the gold label.
train_loss = losses.CosineSimilarityLoss(model=model)


In [None]:
# Load the development set that is used for evaluation during training.
# Each record is a sentence pair with a similarity score (5.0 = most similar);
# the score is divided by 5.0 so that labels fall into the 0..1 range.
logger.info(f"Read STS dev dataset=>{eval_file}")
dev_samples = []
count = 0

if train_file_type == 0:
    # KorSTS .tsv: "sentence1<TAB>sentence2<TAB>score" per line.
    with open(eval_file, 'rt', encoding='utf-8') as f:
        for line in f:
            text_a, text_b, score = line.split('\t')
            score = float(score.strip()) / 5.0

            # Preview the first few records.
            if count < 5:
                print(f"{text_a}, {text_b}, {score}")

            dev_samples.append(InputExample(texts=[text_a, text_b], label=score))
            count += 1

elif train_file_type == 1:
    # KLUE-STS .json: list of records with "sentence1", "sentence2", labels.label.
    with open(eval_file, "rt", encoding="utf-8") as f:
        for data in json.load(f):
            text_a, text_b = data["sentence1"], data["sentence2"]
            score = float(data["labels"]["label"]) / 5.0

            # Preview the first few records.
            if count < 5:
                print(f"{text_a}, {text_b}, {score}")

            dev_samples.append(InputExample(texts=[text_a, text_b], label=score))
            count += 1

print(dev_samples[0:3])

# Evaluator built from the dev pairs: it scores the similarity of the two
# sentence embeddings against the gold labels.
dev_evaluator = EmbeddingSimilarityEvaluator.from_input_examples(
    dev_samples,
    batch_size=train_batch_size,
    name='sts-dev',
)

In [None]:
# Estimated number of training steps over the whole run (samples * epochs / batch size).
total_steps = len(train_dataset) * num_epochs / train_batch_size

# Warm up the learning rate over the first 10% of the steps.
warmup_steps = math.ceil(total_steps * 0.1)

# Evaluation interval: 20% of the total steps (the factor in the code is 0.2).
# NOTE(review): this spans many epochs' worth of steps; if fit() counts
# evaluation_steps per epoch, no mid-epoch evaluation would trigger -- confirm
# against the installed sentence-transformers version.
evaluation_steps = int(total_steps * 0.2)

logger.info(f"model:{model_path}, smodel:{smodel_path}")
# Fixed: the original passed the misspelled name `num_peochs`, which raised a
# NameError before training could start.
logger.info("*batch_size: {}, epoch:{}, train_dataset:{}, Warmup-steps: {}, evaluation_step: {}".format(train_batch_size, num_epochs, len(train_dataset), warmup_steps, evaluation_steps))

# Train the model; it is written to smodel_path.
model.fit(train_objectives=[(train_dataloader, train_loss)],
          evaluator=dev_evaluator,
          epochs=num_epochs,
          evaluation_steps=evaluation_steps,
          warmup_steps=warmup_steps,
          output_path=smodel_path
          )


In [None]:
##############################################################################
#
# Reload the trained S-BERT from disk and measure its performance on the
# STS test split.
##############################################################################
import time

# Test file: "sentence1<TAB>sentence2<TAB>score" per line; scores scaled to 0..1.
test_samples = []
with open(test_file, 'rt', encoding='utf-8') as tsv:
    for row in tsv:
        sent_a, sent_b, gold = row.split('\t')
        test_samples.append(
            InputExample(texts=[sent_a, sent_b], label=float(gold.strip()) / 5.0)
        )

logger.info("\n")
logger.info("======================TEST===================")
logger.info("\n\n")
logger.info(f"model save path > {smodel_path}")

# The timer covers both loading the saved model and running the evaluation.
start = time.time()
model = SentenceTransformer(smodel_path)

test_evaluator = EmbeddingSimilarityEvaluator.from_input_examples(
    test_samples,
    batch_size=train_batch_size,
    name='sts-test',
    show_progress_bar=True,
)
result = test_evaluator(model, output_path=output_path)

logger.info("\n")
logger.info(f"model path: {smodel_path}")
logger.info(f'=== result: {result} ===')
logger.info(f'=== 처리시간: {time.time() - start:.3f} 초 ===')