### Sentence Bert 논문 핵심 요약

- Bert 모델을 활용해 Sentence Embedding을 산출하는 모델 소개

- 데이터 유형(`Categorical Data`, `Numerical Data`)에 맞게 언어 모델을 Sentence Bert로 Fine-tuning하는 방법 소개


### Siamese Network(샴 네트워크)

- Bi eoncoder는 샴 네트워크 구조를 취함. 샴 네트워크(Siamese network)란 하나의 모델로 두 개의 output을 산출하는 구조를 의미함.

- 아래 구조를 보면 Bert 모델이 두 개 사용되는 것으로 나타나지만 실제로는 하나의 Bert 모델을 활용해 개별 문장의 output을 산출하게 됨.

    <img src ='../images/SBERT_Siamese_Network.png' alt='SBERT_Siamese_Network' width ='300px'/>


### Bi encoder 구현하기

- 여느 Fine-tuning 방법과 같이 last_hidden_state를 활용해 Bi-encoder 구현함.

- Bert 모델을 거친 문장은 문장의 Token 개수 만큼의 Embedding이 존재함. 이때 여러 개의 Embedding을 하나의 Embedding으로 통합하는 과정을 Pooling이라 하는데, 여러 개의 토큰 임베딩을 하나의 Embedding으로 변환하면 Sentence Embedding이 됨.

- Pooling 방법에는 [CLS] pooling, mean pooling, max pooling이 활용 가능하나 논문에서는 mean pooling이 가장 성능이 좋아 기본 값으로 활용함. 



    <img src ='../images/SBERT_Architecture.png' alt='SBERT_Architecture' width ='150px'/>


### Sentece Bert 구조 생성


In [6]:
from transformers import (
    ElectraModel,
    ElectraTokenizerFast,
    TrainingArguments,
    TrainerCallback,
    Trainer,
)
from torch.utils.data import DataLoader
from datasets import Dataset
from torch import nn
import pandas as pd
import torch


class SentenceBert(nn.Module):
    """
    Sentence Bert 논문을 읽고 관련 Repo를 참고해 만든 모델입니다.

    Huggingface Trainer API를 쉽게 활용할 수 있도록 모델을 일부 수정했습니다.

    Parameter
    ---------
    - model : Huggingface에서 제공하는 BaseModel을 활용해야 합니다.
    - tokenizer : 모델에 맞는 토크나이저를 사용해야하며 TokenizerFast를 통해 불러와야합니다.
    - model 및 tokenizer 설정이 없는 경우 "monologg/koelectra-base-v3-discriminator" 를 기본 모델로 사용합니다.
    - pooling_type : 논문에서 제시하는 Pooling 방법인 mean pooling, max pooling, CLS pooling을 지원하며 기본 설정 값은 mean입니다.

    """

    def __init__(self, model=None, pooling_type: str = "mean") -> None:
        super().__init__()
        name = "monologg/koelectra-base-v3-discriminator"
        self.model = model if model else ElectraModel.from_pretrained(name)

        if pooling_type in ["mean", "max", "cls"] and type(pooling_type) == str:
            self.pooling_type = pooling_type
        else:
            raise ValueError("'pooling_type' only ['mean','max','cls'] possible")

    def forward(self, **kwargs):
        attention_mask = kwargs["attention_mask"]
        last_hidden_state = self.model(**kwargs)["last_hidden_state"]

        if self.pooling_type == "cls":
            """[cls] token을 sentence embedding으로 활용"""
            result = last_hidden_state[:, 0]

        if self.pooling_type == "max":
            """문장 내 여러 토큰 중 가장 값이 큰 token만 추출하여 sentence embedding으로 활용"""

            num_of_tokens = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
            last_hidden_state[num_of_tokens == 0] = -1e9
            result = torch.max(last_hidden_state, 1)[0]

        if self.pooling_type == "mean":
            """문장 내 토큰을 평균하여 sentence embedding으로 활용"""

            num_of_tokens = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()

            sum_embeddings = torch.sum(last_hidden_state * num_of_tokens, 1)

            total_num_of_tokens = num_of_tokens.sum(1)
            total_num_of_tokens = torch.clamp(total_num_of_tokens, min=1e-9)

            result = sum_embeddings / total_num_of_tokens

        return {"sentence_embedding": result}


### Sbert 불러오기
name = "monologg/koelectra-base-v3-discriminator"
model = ElectraModel.from_pretrained(name)
tokenizer = ElectraTokenizerFast.from_pretrained(name)

sbert = SentenceBert(model=model, pooling_type="mean")

sen = "나는 어제 맥북을 샀다."

token = tokenizer(sen, return_tensors="pt")
PLM = model(**token)["last_hidden_state"]
sentence_embedding = sbert(**token)["sentence_embedding"]

print("Tokenizing 결과 \n", tokenizer.tokenize(sen))
print("")
print(f"PLM output shape => {PLM.shape} \nSbert output shape => {sentence_embedding.shape}")


Some weights of the model checkpoint at monologg/koelectra-base-v3-discriminator were not used when initializing ElectraModel: ['discriminator_predictions.dense.bias', 'discriminator_predictions.dense_prediction.bias', 'discriminator_predictions.dense.weight', 'discriminator_predictions.dense_prediction.weight']
- This IS expected if you are initializing ElectraModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing ElectraModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Tokenizing 결과 
 ['나', '##는', '어제', '맥', '##북', '##을', '샀', '##다', '.']

PLM output shape => torch.Size([1, 11, 768]) 
Sbert output shape => torch.Size([1, 768])


### 데이터 유형 별 Sentence Bert 학습 구조

- SentenceBert를 학습 시키는 방법은 학습 데이터의 유형에 따라 달라짐

- `Numerical Data`를 기반으로 Sentence Bert를 학습시키는 경우 학습 구조는 다음과 같음.

    <img src ='../images/SBERT_Siamese_Network.png' alt='SBERT_Siamese_Network' width ='300px'/>

<br/>

- `Categorical Data`를 기반으로 Sentence Bert를 학습시키는 경우 학습 구조는 다음과 같음.

    <img src ='../images/SBERT_SoftmaxLoss.png' alt='SBERT_SoftmaxLoss' width ='300px'/>


### Numerical Data 학습 시 Sbert 구조

- 논문에서는 Numerical Data 데이터로 STS 데이터를 활용함. 이 글에서는 `KorSTS` 데이터를 활용함.

- STS 데이터는 문장 2개와 문장의 유사도를 표현한 값으로 구성됨.

  ```python

  {
  'sen1': '비행기가 이륙하고 있다.',
  'sen2': '비행기가 이륙하고 있다.',
  'score': '5.000'
  }

  ```

- 학습이 완료된 이후에는 학습 구조에서 Sbert를 추출하여 활용함.

    <img src='../images/SBERT_Siamese_Network.png' alt='siamese' width='300px'>


In [7]:
class modelForRegressionTraining(nn.Module):
    def __init__(self, model, *inputs, **kwargs):
        super().__init__()

        # 학습을 수행할 모델 불러오기
        self.model = model

    def forward(self, features, answer):

        # Sentence 1, Sentence 2에 대한 Sentence Embedding 확보
        embeddings = [self.model(**input_data)["sentence_embedding"] for input_data in features]

        u, v = embeddings[0], embeddings[1]

        # Sentence 1, Sentence 2에 대한 Cosine Similarity 계산
        cos_score_transformation = nn.Identity()
        outputs = cos_score_transformation(torch.cosine_similarity(u, v))

        # label score Normalization
        answer = answer / 5  # 0 ~ 5 => 0 ~ 1

        loss_fct = nn.MSELoss()
        loss = loss_fct(outputs, answer.view(-1))

        return {"loss": loss}


### KorSTS Data 불러오기


In [1]:
with open("../data/KorSTS/sts-train.tsv") as f:
    v = f.readlines()

## from list to dataframe
lst = [i.rstrip("\n").split("\t") for i in v]

data = pd.DataFrame(lst[1:], columns=lst[:1])
data = data[["sentence1", "sentence2", "score"]]
data.columns = ["sen1", "sen2", "score"]
data.head(3)


Unnamed: 0,sen1,sen2,score
0,비행기가 이륙하고 있다.,비행기가 이륙하고 있다.,5.0
1,한 남자가 큰 플루트를 연주하고 있다.,남자가 플루트를 연주하고 있다.,3.8
2,한 남자가 피자에 치즈를 뿌려놓고 있다.,한 남자가 구운 피자에 치즈 조각을 뿌려놓고 있다.,3.8


### Huggingface Datasets으로 변환

- 🤗Transformers와 호환을 위해 Dataframe을 🤗dataset으로 변환


In [2]:
train_data_set = Dataset.from_pandas(data)

train_data_set[0]


{'sen1': '비행기가 이륙하고 있다.', 'sen2': '비행기가 이륙하고 있다.', 'score': '5.000'}

### collator 구현

- 학습 구조에 맞는 input data 생성을 위한 custom collator 제작


In [None]:
def smart_batching_collate(batch):
    text_lst1 = []
    text_lst2 = []
    labels = []

    for example in batch:
        for k, v in example.items():
            if k == "sen1":
                text_lst1.append(v)
            if k == "sen2":
                text_lst2.append(v)
            if k == "score":
                labels.append(float(v))

    labels = torch.tensor(labels)

    sentence_features = []
    for items in [text_lst1, text_lst2]:
        token = tokenizer(items, return_tensors="pt", truncation=True, padding=True)
        sentence_features.append(token)

    return dict(features=sentence_features, answer=labels)


### 🤗Transformers Trainer를 활용해 학습


In [None]:
# Trainer Option 설정
training_args = TrainingArguments(
    output_dir="test_trainer",
    per_device_train_batch_size=4,
    logging_steps=10,
    eval_steps=100,
    num_train_epochs=2,
    remove_unused_columns=False,
)

# 학습 구조 불러오기
model_for_training = modelForRegressionTraining(sbert)

# Trainer 정의
trainer = Trainer(
    model=model_for_training,
    train_dataset=train_data_set,
    args=training_args,
    data_collator=smart_batching_collate,
)

trainer.train()


### Categorical Data 학습 시 Sbert 구조

- 논문에서는 Categorical Data로 NLI 데이터를 활용함. 이 글에서는 `KorNLI` 데이터 중 `snli_1.0_train.ko`를 활용함.

- KorNLI 데이터는 문장 2개와 문장의 관계를 Label로 표현함.

```python

  {
   'sen1': '그리고 그가 말했다, "엄마, 저 왔어요."',
   'sen2': '그는 학교 버스가 그를 내려주자마자 엄마에게 전화를 걸었다.',
   'gold_label': 'neutral'
   }

```

- 학습이 완료된 이후에는 학습 구조에서 Sbert를 추출하여 활용함.

    <img src='../images/SBERT_SoftmaxLoss.png' alt='siamese' width='300px'>


In [None]:
from torch import nn


class modelForClassificationTraining(nn.Module):
    def __init__(self, model, *inputs, **kwargs):
        super().__init__()

        # 학습할 모델 불러오기
        self.model = model

        # 모델 embed_size
        sentence_embedding_dimension = self.model.model.config.hidden_size

        # concat 해야하는 vector 개수(U,V, |U-V|)
        num_vectors_concatenated = 3

        # embed_size * 3 => 3 차원으로 축소시키는 classifier
        self.classifier = nn.Linear(num_vectors_concatenated * sentence_embedding_dimension, 3)

    def forward(self, features, answer):

        # Sentence Embedding 생성
        embeddings = [self.model(**input_data)["sentence_embedding"] for input_data in features]

        u, v = embeddings

        # U,V, |U-V| vector 병합
        vectors_concat = []
        vectors_concat.append(u)
        vectors_concat.append(v)
        vectors_concat.append(torch.abs(u - v))
        features = torch.cat(vectors_concat, 1)

        # 병합한 vector 차원 축소
        outputs = self.classifier(features)

        # Loss 계산
        loss_fct = nn.CrossEntropyLoss()
        loss = loss_fct(outputs, answer.view(-1))

        return {"loss": loss}


### KorNLI Data 불러오기


In [23]:
import pandas as pd

with open("../data/KorNLI/snli_1.0_train.ko.tsv") as f:
    v = f.readlines()

## from list to dataframe
lst = [i.rstrip("\n").split("\t") for i in v]

data = pd.DataFrame(lst[1:], columns=lst[:1])
data.columns = ["sen1", "sen2", "gold_label"]
data.head(3)


Unnamed: 0,sen1,sen2,gold_label
0,말을 탄 사람이 고장난 비행기 위로 뛰어오른다.,한 사람이 경쟁을 위해 말을 훈련시키고 있다.,neutral
1,말을 탄 사람이 고장난 비행기 위로 뛰어오른다.,한 사람이 식당에서 오믈렛을 주문하고 있다.,contradiction
2,말을 탄 사람이 고장난 비행기 위로 뛰어오른다.,사람은 야외에서 말을 타고 있다.,entailment


#### gold_label Encoding


In [24]:
label2int = {"contradiction": 0, "entailment": 1, "neutral": 2}

data["gold_label"] = data["gold_label"].replace(label2int).values

data.head(3)


Unnamed: 0,sen1,sen2,gold_label
0,말을 탄 사람이 고장난 비행기 위로 뛰어오른다.,한 사람이 경쟁을 위해 말을 훈련시키고 있다.,2
1,말을 탄 사람이 고장난 비행기 위로 뛰어오른다.,한 사람이 식당에서 오믈렛을 주문하고 있다.,0
2,말을 탄 사람이 고장난 비행기 위로 뛰어오른다.,사람은 야외에서 말을 타고 있다.,1


### Huggingface Dataset으로 불러오기


In [25]:
train_data_set = Dataset.from_pandas(data)

train_data_set[0]


{'sen1': '말을 탄 사람이 고장난 비행기 위로 뛰어오른다.',
 'sen2': '한 사람이 경쟁을 위해 말을 훈련시키고 있다.',
 'gold_label': 2}

### collator 구현


In [26]:
def smart_batching_collate(batch):
    text_lst1 = []
    text_lst2 = []
    labels = []

    for example in batch:
        for k, v in example.items():
            if k == "sen1":
                text_lst1.append(v)
            if k == "sen2":
                text_lst2.append(v)
            if k == "gold_label":
                labels.append(int(v))

    labels = torch.tensor(labels)

    sentence_features = []
    for items in [text_lst1, text_lst2]:
        tokenized = tokenizer(items, return_tensors="pt", truncation=True, padding=True)
        sentence_features.append(tokenized)

    return dict(features=sentence_features, answer=labels)


### 🤗Transformers Trainer를 활용해 학습


In [None]:
# Trainer Option 설정
training_args = TrainingArguments(
    output_dir="test_trainer",
    per_device_train_batch_size=4,
    logging_steps=10,
    eval_steps=100,
    num_train_epochs=2,
    remove_unused_columns=False,
)

# 학습 구조 불러오기
model_for_training = modelForClassificationTraining(sbert)

# Trainer 정의
trainer = Trainer(
    model=model_for_training,
    train_dataset=train_data_set,
    args=training_args,
    data_collator=smart_batching_collate,
)

trainer.train()
