Paper Summary

- Introduces how to build sentence embeddings with BERT.

- Introduces the architecture used to produce sentence embeddings for each type of training data (classification, regression).


### Sentence Embedding Architectures by Training Data Type


##### Loading the Model with Transformers


In [3]:
from transformers import ElectraModel, ElectraTokenizer

model = ElectraModel.from_pretrained("monologg/koelectra-base-v3-discriminator")
tokenizer = ElectraTokenizer.from_pretrained("monologg/koelectra-base-v3-discriminator")


Some weights of the model checkpoint at monologg/koelectra-base-v3-discriminator were not used when initializing ElectraModel: ['discriminator_predictions.dense_prediction.bias', 'discriminator_predictions.dense.bias', 'discriminator_predictions.dense.weight', 'discriminator_predictions.dense_prediction.weight']
- This IS expected if you are initializing ElectraModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing ElectraModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


### Building the Pooling Model

<img src ='../img/SBERT_Architecture.png' alt='SBERT_Architecture' />


In [4]:
import torch.nn as nn
import torch
from torch.utils.data import DataLoader


class modelWithPooling(nn.Module):
    def __init__(self, model, pooling_type="mean") -> None:
        super().__init__()

        self.model = model  # base model, e.g. BertModel, ElectraModel ...
        self.pooling_type = pooling_type  # which pooling strategy to use
        self.tokenizer = None

    def forward(self, **kwargs):
        features = self.model(**kwargs)
        # [batch_size, src_token, embed_size]
        attention_mask = kwargs["attention_mask"]

        last_hidden_state = features["last_hidden_state"]

        if self.pooling_type == "cls":
            # Use only the [CLS] token vector.
            result = last_hidden_state[:, 0]  # [batch_size, embed_size]

        elif self.pooling_type == "max":
            # Take the element-wise maximum over the tokens of each sentence.
            input_mask_expanded = (
                attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
            )
            # Set padding tokens to a large negative value (out of place, so the
            # model output itself is not modified).
            masked_hidden_state = last_hidden_state.masked_fill(
                input_mask_expanded == 0, -1e9
            )
            result = torch.max(masked_hidden_state, 1)[0]

        elif self.pooling_type == "mean":
            # Sum the token vectors of each sentence, then average.
            # Expand the padding mask to [batch_size, src_token, embed_size].
            input_mask_expanded = (
                attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
            )
            # Multiply padding positions by 0 and real tokens by 1, then sum
            # => [batch_size, embed_size]
            sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, 1)

            # Number of real tokens, used as the divisor for the mean.
            sum_mask = input_mask_expanded.sum(1)
            sum_mask = torch.clamp(sum_mask, min=1e-9)

            result = sum_embeddings / sum_mask

        else:
            raise ValueError(f"Unknown pooling_type: {self.pooling_type!r}")

        #  input.shape : [batch_size, src_token, embed_size] => output.shape : [batch_size, embed_size]
        return {"sentence_embedding": result}
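
To see why the mean-pooling branch multiplies by the attention mask, the following minimal sketch compares masked and naive averaging on a hand-made tensor. The tensor values and the helper name `masked_mean` are illustrative assumptions and are not part of the original notebook.

```python
import torch


def masked_mean(last_hidden_state: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
    """Average token vectors while ignoring padding positions (hypothetical helper)."""
    mask = attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
    summed = torch.sum(last_hidden_state * mask, dim=1)
    counts = torch.clamp(mask.sum(dim=1), min=1e-9)
    return summed / counts


# One sentence with 3 positions; the last position is padding.
hidden = torch.tensor([[[1.0, 2.0], [3.0, 4.0], [100.0, 100.0]]])  # [1, 3, 2]
mask = torch.tensor([[1, 1, 0]])  # [1, 3]

pooled = masked_mean(hidden, mask)
naive = hidden.mean(dim=1)

print(pooled)  # tensor([[2., 3.]])
print(naive)   # the padding vector leaks into this average
```

Without the mask, the padding vector dominates the average, which is the failure the `input_mask_expanded` multiplication guards against.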


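#### Sentence Embedding Architecture for Regression Data

> This section is not about fine-tuning the model on the STS task itself; it covers how to feed STS data into sentence-embedding training.

- The STS task outputs a sentence-pair similarity score in the range 0 to 5.

- Training on this kind of data requires the architecture below.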
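
As a small worked example of the regression target, the sketch below rescales a gold score from the 0 to 5 range to 0 to 1 and compares it with a cosine similarity using mean squared error. The two embedding vectors are made-up values chosen for easy arithmetic.

```python
import torch
import torch.nn.functional as F

# Made-up sentence embeddings for one pair (assumption for illustration).
emb_a = torch.tensor([[1.0, 0.0]])
emb_b = torch.tensor([[1.0, 1.0]])

# Cosine similarity between the two embeddings: 1 / sqrt(2), about 0.7071.
pred = F.cosine_similarity(emb_a, emb_b)

# Gold STS score on the 0-5 scale, rescaled to 0-1.
gold = torch.tensor([3.8]) / 5  # 0.76

loss = F.mse_loss(pred, gold)
print(pred.item(), gold.item(), loss.item())
```

Note that cosine similarity ranges over -1 to 1 while the rescaled labels only cover 0 to 1, so the loss pushes dissimilar pairs toward 0 rather than toward -1.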


### Model for Regression Training

<img src='../img/SBERT_Siamese_Network.png' alt='siamese' width='300px'>


In [5]:
from torch import nn


class modelForRegressionTraining(nn.Module):
    def __init__(self, model, *inputs, **kwargs):
        super().__init__()

        # Wrap the base model to be trained with a pooling layer
        self.model = modelWithPooling(model)

    def forward(self, features, answer):

        # Embeddings for sentence 1 and sentence 2
        embeddings = [self.model(**input_data)["sentence_embedding"] for input_data in features]

        # Cosine similarity between sentence 1 and sentence 2
        cos_score_transformation = nn.Identity()
        outputs = cos_score_transformation(torch.cosine_similarity(embeddings[0], embeddings[1]))

        # Normalize the label score
        answer = answer / 5  # 0 ~ 5 => 0 ~ 1

        loss_fct = nn.MSELoss()
        loss = loss_fct(outputs, answer.view(-1))

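        return {"loss": loss}


# Instantiate the model that is passed to the Trainer below
model_for_training = modelForRegressionTraining(model=model)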





### Loading Regression-Type Data (KorSTS)


In [1]:
import pandas as pd

with open("../data/KorSTS/sts-train.tsv", encoding="utf-8") as f:
    v = f.readlines()

## from list to dataframe
lst = [i.rstrip("\n").split("\t") for i in v]

# The first row holds the column names; pass it as a flat list
data = pd.DataFrame(lst[1:], columns=lst[0])
data = data[["sentence1", "sentence2", "score"]]
data.columns = ["sen1", "sen2", "score"]
data.head(3)


| | sen1 | sen2 | score |
|---|---|---|---|
| 0 | 비행기가 이륙하고 있다. | 비행기가 이륙하고 있다. | 5.0 |
| 1 | 한 남자가 큰 플루트를 연주하고 있다. | 남자가 플루트를 연주하고 있다. | 3.8 |
| 2 | 한 남자가 피자에 치즈를 뿌려놓고 있다. | 한 남자가 구운 피자에 치즈 조각을 뿌려놓고 있다. | 3.8 |
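
The manual `readlines`/`split` parsing above can also be handed to pandas directly. The sketch below is an alternative under the assumption that the file is tab-separated with a header row containing `sentence1`, `sentence2`, and `score`; it reads from an in-memory string with made-up rows so that it runs on its own.

```python
import csv
import io

import pandas as pd

# In-memory stand-in for a KorSTS-style TSV file (made-up rows, assumption).
tsv_text = (
    "genre\tsentence1\tsentence2\tscore\n"
    "demo\tA plane is taking off.\tAn airplane is taking off.\t5.000\n"
    "demo\tA man plays a flute.\tA man plays a \"big\" flute.\t3.800\n"
)

data = pd.read_csv(
    io.StringIO(tsv_text),
    sep="\t",
    quoting=csv.QUOTE_NONE,  # treat quote characters as ordinary text
)
data = data[["sentence1", "sentence2", "score"]]
data.columns = ["sen1", "sen2", "score"]

print(data.dtypes["score"])  # float64
```

Because `read_csv` infers a numeric dtype for `score`, the values arrive as floats rather than strings such as `'5.000'`.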


### Loading with Hugging Face Datasets


In [2]:
from datasets import Dataset

train_data_set = Dataset.from_pandas(data)

train_data_set[0]


{'sen1': '비행기가 이륙하고 있다.', 'sen2': '비행기가 이륙하고 있다.', 'score': '5.000'}

### Implementing the Collator


In [None]:
import torch


def smart_batching_collate(batch):
    # Split the batch into the two sentence lists and the float labels
    text_lst1 = [example["sen1"] for example in batch]
    text_lst2 = [example["sen2"] for example in batch]
    labels = torch.tensor([float(example["score"]) for example in batch])

    # Tokenize each side separately; padding=True pads to the longest sentence in the batch
    sentence_features = []
    for items in [text_lst1, text_lst2]:
        tokenized = tokenizer(items, return_tensors="pt", truncation=True, padding=True)
        sentence_features.append(tokenized)

    return dict(features=sentence_features, answer=labels)
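
Passing `padding=True` to the tokenizer pads each batch only to its own longest sentence, which keeps the batches in this collator compact. The sketch below reproduces that idea with plain PyTorch on made-up token ids; `pad_batch` is a hypothetical helper, and a real tokenizer would also add special tokens.

```python
import torch
from torch.nn.utils.rnn import pad_sequence


def pad_batch(token_id_lists, pad_id=0):
    """Pad variable-length id lists to the longest one and build the attention mask."""
    tensors = [torch.tensor(ids, dtype=torch.long) for ids in token_id_lists]
    input_ids = pad_sequence(tensors, batch_first=True, padding_value=pad_id)
    lengths = torch.tensor([len(ids) for ids in token_id_lists])
    # Position j is real (1) when j < length of that sentence, else padding (0).
    attention_mask = (torch.arange(input_ids.size(1))[None, :] < lengths[:, None]).long()
    return {"input_ids": input_ids, "attention_mask": attention_mask}


batch = pad_batch([[5, 6, 7], [8, 9]])
print(batch["input_ids"])       # tensor([[5, 6, 7], [8, 9, 0]])
print(batch["attention_mask"])  # tensor([[1, 1, 1], [1, 1, 0]])
```

The attention mask produced here is what the pooling model later uses to exclude padding from the sentence embedding.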


### Creating a Custom Trainer


In [None]:
from transformers import TrainingArguments, Trainer


training_args = TrainingArguments(
    output_dir="test_trainer",
    per_device_train_batch_size=4,
    logging_steps=10,
    eval_steps=100,
    num_train_epochs=2,
    remove_unused_columns=False,
)

trainer = Trainer(
    model=model_for_training,
    train_dataset=train_data_set,
    args=training_args,
    data_collator=smart_batching_collate,
)

trainer.train()


### Training Architecture for Classification Data


<img src='../img/SBERT_SoftmaxLoss.png' alt='siamese' width='300px'>


### Model for Classification Training


In [None]:
from torch import nn


class modelForClassificationTraining(nn.Module):
    def __init__(self, model, *inputs, **kwargs):
        super().__init__()

        # Wrap the base model to be trained with a pooling layer
        self.model = modelWithPooling(model)

        # Embedding size of the model
        sentence_embedding_dimension = self.model.model.config.hidden_size

        # Number of vectors to concatenate (u, v, |u - v|)
        num_vectors_concatenated = 3

        # Classifier that projects embed_size * 3 down to 3 classes
        self.classifier = nn.Linear(num_vectors_concatenated * sentence_embedding_dimension, 3)

    def forward(self, features, answer):

        """
        A Siamese network uses a single model to produce two outputs.
        Because each sentence is encoded separately, the two inputs cannot influence each other.
        A cross-encoder does the opposite: it joins the two sentences into a single input
        and lets the model assess their similarity internally.
        """

        # Embed each sentence separately
        embeddings = [self.model(**input_data)["sentence_embedding"] for input_data in features]

        rep_a, rep_b = embeddings

        # Concatenate the u, v, |u - v| vectors
        vectors_concat = []
        vectors_concat.append(rep_a)
        vectors_concat.append(rep_b)
        vectors_concat.append(torch.abs(rep_a - rep_b))

        features = torch.cat(vectors_concat, 1)

        # Project the concatenated vector down to class logits
        outputs = self.classifier(features)

        # Compute the loss
        loss_fct = nn.CrossEntropyLoss()
        loss = loss_fct(outputs, answer.view(-1))

        return {"loss": loss}


model_for_training = modelForClassificationTraining(model=model)
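
The shape bookkeeping in the classification head can be checked on random tensors. This is a minimal sketch assuming an embedding size of 8 and a batch of 4; `rep_a` and `rep_b` mirror the names in the class above, and their values are random stand-ins for pooled sentence embeddings.

```python
import torch
from torch import nn

torch.manual_seed(0)

batch_size, embed_size, num_labels = 4, 8, 3

# Stand-ins for the pooled sentence embeddings of each sentence in the pair.
rep_a = torch.randn(batch_size, embed_size)
rep_b = torch.randn(batch_size, embed_size)

# Concatenate (u, v, |u - v|) along the feature dimension.
features = torch.cat([rep_a, rep_b, torch.abs(rep_a - rep_b)], dim=1)

classifier = nn.Linear(3 * embed_size, num_labels)
logits = classifier(features)

labels = torch.tensor([0, 1, 2, 1])
loss = nn.CrossEntropyLoss()(logits, labels)

print(features.shape)  # torch.Size([4, 24])
print(logits.shape)    # torch.Size([4, 3])
```

The classifier input is three times the embedding size because three vectors are concatenated, which is why `nn.Linear` is built with `num_vectors_concatenated * sentence_embedding_dimension` input features.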


### Loading Classification-Type Data (KorNLI)


In [None]:
import pandas as pd

with open("data/KorNLI/snli_1.0_train.ko.tsv", encoding="utf-8") as f:
    v = f.readlines()

## from list to dataframe
lst = [i.rstrip("\n").split("\t") for i in v]

# The first row holds the column names; pass it as a flat list
data = pd.DataFrame(lst[1:], columns=lst[0])
data.columns = ["sen1", "sen2", "gold_label"]
data.head(3)


#### gold_label Encoding


In [None]:
label2int = {"contradiction": 0, "entailment": 1, "neutral": 2}

data["gold_label"] = data["gold_label"].replace(label2int).values

data.head(3)


### Loading with Hugging Face Datasets


In [None]:
from datasets import Dataset

train_data_set = Dataset.from_pandas(data)

train_data_set[0]


### Implementing the Collator


In [None]:
import torch


def smart_batching_collate(batch):
    # Split the batch into the two sentence lists and the integer class labels
    text_lst1 = [example["sen1"] for example in batch]
    text_lst2 = [example["sen2"] for example in batch]
    labels = torch.tensor([int(example["gold_label"]) for example in batch])

    # Tokenize each side separately; padding=True pads to the longest sentence in the batch
    sentence_features = []
    for items in [text_lst1, text_lst2]:
        tokenized = tokenizer(items, return_tensors="pt", truncation=True, padding=True)
        sentence_features.append(tokenized)

    return dict(features=sentence_features, answer=labels)


### Creating a Custom Trainer


In [None]:
from transformers import TrainingArguments, Trainer

training_args = TrainingArguments(
    output_dir="test_trainer",
    per_device_train_batch_size=4,
    logging_steps=10,
    eval_steps=100,
    num_train_epochs=2,
    remove_unused_columns=False,
)

trainer = Trainer(
    model=model_for_training,
    train_dataset=train_data_set,
    args=training_args,
    data_collator=smart_batching_collate,
)

trainer.train()


### Implementing Encode in the Pooling Model


In [None]:
import torch.nn as nn
import torch
from torch.utils.data import DataLoader


class modelWithPooling(nn.Module):
    def __init__(self, model, pooling_type="mean") -> None:
        super().__init__()

        self.model = model  # base model, e.g. BertModel, ElectraModel ...
        self.pooling_type = pooling_type  # which pooling strategy to use
        self.tokenizer = None

    # Encode: turn a list of sentences into sentence embeddings
    def encode(self, items: list, tokenizer=None, batch_size: int = 16):

        if tokenizer is not None:
            self.tokenizer = tokenizer

        if self.tokenizer is None:
            from transformers import AutoTokenizer

            print(f'Loading Tokenizer : "{self.model.config._name_or_path}"')
            self.tokenizer = AutoTokenizer.from_pretrained(
                self.model.config._name_or_path
            )

        def default_collater(items):
            token = self.tokenizer(
                items, padding=True, truncation=True, return_tensors="pt"
            )
            return {"sen": items, "token": token}

        data_loader = DataLoader(
            dataset=items, batch_size=batch_size, collate_fn=default_collater
        )

        output_lst = []
        sen_lst = []
        for data in data_loader:
            sen = data.pop("sen")
            sen_lst += sen
            token = data.pop("token")
            # Inference only, so skip gradient tracking
            with torch.no_grad():
                outputs = self.forward(**token)["sentence_embedding"]
            output_lst.append(outputs)

        return {"sen": sen_lst, "sentence_embedding": torch.cat(output_lst)}

    def forward(self, **kwargs):
        features = self.model(**kwargs)
        # [batch_size, src_token, embed_size]
        attention_mask = kwargs["attention_mask"]

        last_hidden_state = features["last_hidden_state"]

        if self.pooling_type == "cls":
            # Use only the [CLS] token vector.
            result = last_hidden_state[:, 0]  # [batch_size, embed_size]

        elif self.pooling_type == "max":
            # Take the element-wise maximum over the tokens of each sentence.
            input_mask_expanded = (
                attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
            )
            # Set padding tokens to a large negative value (out of place, so the
            # model output itself is not modified).
            masked_hidden_state = last_hidden_state.masked_fill(
                input_mask_expanded == 0, -1e9
            )
            result = torch.max(masked_hidden_state, 1)[0]

        elif self.pooling_type == "mean":
            # Sum the token vectors of each sentence, then average.
            # Expand the padding mask to [batch_size, src_token, embed_size].
            input_mask_expanded = (
                attention_mask.unsqueeze(-1).expand(last_hidden_state.size()).float()
            )
            # Multiply padding positions by 0 and real tokens by 1, then sum
            # => [batch_size, embed_size]
            sum_embeddings = torch.sum(last_hidden_state * input_mask_expanded, 1)

            # Number of real tokens, used as the divisor for the mean.
            sum_mask = input_mask_expanded.sum(1)
            sum_mask = torch.clamp(sum_mask, min=1e-9)

            result = sum_embeddings / sum_mask

        else:
            raise ValueError(f"Unknown pooling_type: {self.pooling_type!r}")

        #  input.shape : [batch_size, src_token, embed_size] => output.shape : [batch_size, embed_size]
        return {"sentence_embedding": result}
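
Once `encode` returns a `[num_sentences, embed_size]` tensor, pairwise similarity is a matrix product of the L2-normalized rows. The sketch below uses made-up embeddings rather than real model output, so the numbers only illustrate the mechanics.

```python
import torch
import torch.nn.functional as F

# Made-up embeddings for three sentences (assumption; real ones come from encode()).
embeddings = torch.tensor(
    [
        [1.0, 0.0],
        [2.0, 0.0],
        [0.0, 3.0],
    ]
)

# L2-normalize each row so that the dot product equals cosine similarity.
normalized = F.normalize(embeddings, p=2, dim=1)
similarity = normalized @ normalized.T  # [3, 3]

# Most similar other sentence for sentence 0 (mask out the diagonal first).
scores = similarity[0].clone()
scores[0] = float("-inf")
best = int(torch.argmax(scores))

print(similarity)
print(best)  # 1
```

Sentences 0 and 1 point in the same direction and therefore score 1.0 despite their different lengths, while sentence 2 is orthogonal to both and scores 0.0.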
