In [None]:
# Copyright 2023, Acadential, All rights reserved.

# 16-11. PyTorch로 구현된 BERT 모델 뜯어보기
이번 시간에는 BERT 모델이 어떻게 구현되어 있는지 심도있게 파헤쳐 보는 시간을 가져보겠습니다.

In [1]:
# Install transformers
!pip install transformers



In [6]:
from transformers.models.bert import (BertModel,
                                      BertConfig,
                                      BertForSequenceClassification,
                                      BertTokenizer)
from transformers.models.bert.modeling_bert import BertEmbeddings

In [7]:
config = BertConfig()

In [8]:
BertEmbeddings(config)

BertEmbeddings(
  (word_embeddings): Embedding(30522, 768, padding_idx=0)
  (position_embeddings): Embedding(512, 768)
  (token_type_embeddings): Embedding(2, 768)
  (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
  (dropout): Dropout(p=0.1, inplace=False)
)

# BertForSequenceClassification


In [9]:

# transformers.models.bert.BertForSequenceClassification을 한번 뜯어보자 
# 아래 코드는 transformers library에서 구현되어 있는 일부 코드를 발췌한 것임. 

import torch 
from torch import nn 
from typing import List, Optional, Tuple, Union
from transformers.models.bert import BertPreTrainedModel, BertModel
from transformers.modeling_outputs import SequenceClassifierOutput
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss


class BertForSequenceClassification(BertPreTrainedModel):
    # Init 함수에서는 BertConfig을 입력받습니다.
    # BertConfig에서는 Bert model의 구조에 대한 hyperparameter들이 정의되어 있습니다.
    # 예: Layer 개수, Hidden dim 크기, Attention head 개수, Dropout rate 등등.
    def __init__(self, config):
        super().__init__(config) 
        self.num_labels = config.num_labels
        self.config = config

        # BertModel은 Classification Layer가 필요로하는 hidden state를 출력하는 모델입니다.
        # Task에 상관없이 공통적으로 사용되는 Backbone architecture입니다.
        # 일반적으로 MLM Pre-trained된 모델의 BertModel을 가져와서 이것으로 weight initialize합니다.
        self.bert = BertModel(config)
        
        classifier_dropout = (
            config.classifier_dropout if config.classifier_dropout is not None else config.hidden_dropout_prob
        )
        self.dropout = nn.Dropout(classifier_dropout)
        
        # Task specific한 classifier layer
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

        # Initialize weights and apply final processing
        self.post_init()
    
    # Forward pass에서 주의깊게 봐야할 인자들은 input_ids, attention_masks, token_type_ids, labels입니다.
    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], SequenceClassifierOutput]:
        
        # input_ids: [batch_size, sequence_length] 은 문장을 token id로 변환한 것입니다. 
        # 저희가 RNN 실습하는 과정에서 tokenize된 문장들이 input_ids로 변환된 것을 이 인자로 입력해 줍니다.
        
        # attention_mask: [batch_size, sequence_length] 은 padding token을 masking하기 위한 인자입니다.
        # attention_mask은 padding token이 아닌 부분은 1, padding token은 0으로 구성됩니다.
        
        # token_type_ids: [batch_size, sequence_length] 은 문장의 앞뒤를 구분하기 위한 인자입니다.
        # token_type_ids는 문장의 앞뒤를 구분하기 위해 앞문장은 0, 뒷문장은 1로 구성됩니다.
        
        # labels: [batch_size, num_labels] 은 각 문장에 대한 label입니다.
        
        # 위 값들은 dataset이 tokenizer로 전처리 된 후에 field 값으로 저장되어 있습니다.
        
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        
        # BertModel의 forward pass를 통해 hidden state (output)를 얻습니다.
        # outputs: Tuple[torch.FloatTensor] = (last_hidden_state, pooler_output)
        outputs = self.bert(
            input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            position_ids=position_ids,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # BERT의 backbone architecture가 어떻게 작동하는지 아래 셀에서 더 자세하게 살펴보겠습니다.

        pooled_output = outputs[1]

        pooled_output = self.dropout(pooled_output)
        logits = self.classifier(pooled_output)

        loss = None
        if labels is not None:
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and (labels.dtype == torch.long or labels.dtype == torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                loss_fct = MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)
        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

## BERT 모델 (Backbone architecture)

```BertModel```가 구현된 코드는 Huggingface의 transformers.models.bert.modeling_bert에서 확인할 수 있습니다. \
아래 코드는 ```BertModel```의 구현 코드 중 일부입니다.

BertModel을 구성하는 요소들은 다음과 같습니다:
1. ```BertEmbeddings```: Token Embedding과 Positional Embedding
2. ```BertEncoder```: Encoder Layer를 여러번 쌓은 형태
3. ```BertPooler```: Encoder Layer의 마지막 hidden state를 이용해 문장의 특징을 추출




In [10]:
from transformers.models.bert.modeling_bert import BertEmbeddings, BertEncoder, BertPooler, BaseModelOutputWithPoolingAndCrossAttentions

class BertModel(BertPreTrainedModel):

    def __init__(self, config, add_pooling_layer=True):
        super().__init__(config)
        self.config = config

        # Embedding
        self.embeddings = BertEmbeddings(config)
        
        # Encoder
        self.encoder = BertEncoder(config)

        self.pooler = BertPooler(config) if add_pooling_layer else None

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        input_ids: Optional[torch.Tensor] = None,
        attention_mask: Optional[torch.Tensor] = None,
        token_type_ids: Optional[torch.Tensor] = None,
        position_ids: Optional[torch.Tensor] = None,
        head_mask: Optional[torch.Tensor] = None,
        inputs_embeds: Optional[torch.Tensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.Tensor] = None,
        past_key_values: Optional[List[torch.FloatTensor]] = None,
        use_cache: Optional[bool] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], BaseModelOutputWithPoolingAndCrossAttentions]:
        
        # 아래 method에서 
        # attention_mask가 1인 경우 (padding이 아닌 경우),
        # extended_attention_mask은 0으로 채워집니다.
        # attention_mask가 0인 경우 (padding인 경우),
        # extended_attention_mask은 torch.finfo(dtype).min (즉 -inf)으로 채워집니다.
        extended_attention_mask: torch.Tensor = self.get_extended_attention_mask(
            attention_mask,
            input_shape
        )
        
        # Embedding
        embedding_output = self.embeddings(
            input_ids=input_ids,
            position_ids=position_ids,
            token_type_ids=token_type_ids,
            inputs_embeds=inputs_embeds,
        )
        # BertEmbedding을 통해서 input_ids를 embedding_output으로 변환해줍니다.
        
        # Encoder
        encoder_outputs = self.encoder(
            embedding_output,
            attention_mask=extended_attention_mask,
            head_mask=head_mask,
            encoder_hidden_states=encoder_hidden_states,
            past_key_values=past_key_values,
            use_cache=use_cache,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # BertEncoder를 통해서 embedding_output을 encoder_outputs으로 변환해줍니다.
        # BertEncoder가 Classification Layer에서 필요로하는 feature을 추출하는 Backbone Architecture입니다.
        
        sequence_output = encoder_outputs[0]
        pooled_output = self.pooler(sequence_output) if self.pooler is not None else None
        # BertEncoder은 각 token마다 hidden state를 반환합니다.
        # 따라서 문장을 대표하는 feature을 추출하기 위해서 Pooling Layer을 사용합니다.

        if not return_dict:
            return (sequence_output, pooled_output) + encoder_outputs[1:]

        return BaseModelOutputWithPoolingAndCrossAttentions(
            last_hidden_state=sequence_output,
            pooler_output=pooled_output,
            past_key_values=encoder_outputs.past_key_values,
            hidden_states=encoder_outputs.hidden_states,
            attentions=encoder_outputs.attentions,
            cross_attentions=encoder_outputs.cross_attentions,
        )

## BertEmbedding

Bert Model을 구성하는 3가지 요소 중 하나인 Bert Embedding에 대해 알아보겠습니다.

Bert Embedding은 총 3가지로 구성되어 있습니다:
1. ```Word Embedding```: 각 단어 (token)을 벡터로 변환
2. ```Positional Embedding```: 토큰의 위치 정보를 벡터로 변환
3. ```Token Type Embedding```: 토큰의 종류 정보를 벡터로 변환. 토큰의 종류는 총 2가지가 있습니다.
    - ```START```, ```END```와 같은 special token
    - 일반적인 토큰

일반적인 경우에 Word Embedding, Positional Embedding, Token Type Embedding들을 다 더한 것이 Bert Embedding이 됩니다.

In [11]:
class BertEmbeddings(nn.Module):
    """Construct the embeddings from word, position and token_type embeddings."""

    def __init__(self, config):
        super().__init__()
        
        # Word Embedding
        self.word_embeddings = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=config.pad_token_id)
        
        # Position Embedding
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, config.hidden_size)
        
        # Token Type Embedding
        self.token_type_embeddings = nn.Embedding(config.type_vocab_size, config.hidden_size)

        self.LayerNorm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.position_embedding_type = getattr(config, "position_embedding_type", "absolute")

    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
    ) -> torch.Tensor:

        # Word Embedding 계산
        inputs_embeds = self.word_embeddings(input_ids)
        
        # Token Type Embedding 계산
        token_type_embeddings = self.token_type_embeddings(token_type_ids)
        
        # Word Embedding + Token Type Embedding
        embeddings = inputs_embeds + token_type_embeddings
        
        # Position Embedding 계산
        position_embeddings = self.position_embeddings(position_ids)
        
        # Word Embedding + Token Type Embedding + Position Embedding
        embeddings += position_embeddings
        
        embeddings = self.LayerNorm(embeddings)
        embeddings = self.dropout(embeddings)
        
        return embeddings

## Bert Encoder



In [5]:
from transformers.models.bert.modeling_bert import BaseModelOutputWithPastAndCrossAttentions, BertLayer

class BertEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.config = config
        
        # 여러 개의 Bert Layer로 구성되어 있습니다.
        self.layer = nn.ModuleList([BertLayer(config) for _ in range(config.num_hidden_layers)])

    # BertEncoder는 BertEmbedding에서 출력된 embedding_output을 입력받습니다.
    # 즉, hidden_states는 embedding_output입니다.
    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
    ) -> Union[Tuple[torch.Tensor], BaseModelOutputWithPastAndCrossAttentions]:
        
        # BertEncoder는 BertLayer을 차례대로 통과시킵니다.
        for i, layer_module in enumerate(self.layer):

            layer_head_mask = head_mask[i] if head_mask is not None else None
            past_key_value = past_key_values[i] if past_key_values is not None else None

            # BertLayer에 대한 forward pass를 실행합니다.
            layer_outputs = layer_module(
                hidden_states,
                attention_mask,
                layer_head_mask,
                encoder_hidden_states,
                encoder_attention_mask,
                past_key_value,
            )

            hidden_states = layer_outputs[0]  # 각 layer의 출력값을 hidden_states로 정의하여 다음 Layer의 입력값으로 사용합니다.
        
        # 최종 BertLayer의 hidden_states을 출력하고,
        # BertModel에서는 BertPooler에 전달합니다.
        return BaseModelOutputWithPastAndCrossAttentions(
            last_hidden_state=hidden_states,
        )

## BertPooler

BertModel의 구성요소들 중 하나입니다. \
BertEncoder에서 출력한 ```hidden_states```를 입력으로 받아서, 대표값을 pooling하는 역할을 합니다. \
Pooling하는 방법은 첫번째 Token에 해당되는 ```hidden_states[:, 0]```를 FC Layer에 통과시켜서 얻습니다.

In [12]:
class BertPooler(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.dense = nn.Linear(config.hidden_size, config.hidden_size)
        self.activation = nn.Tanh()

    def forward(self, hidden_states: torch.Tensor) -> torch.Tensor:
        # We "pool" the model by simply taking the hidden state corresponding
        # to the first token.
        first_token_tensor = hidden_states[:, 0]
        pooled_output = self.dense(first_token_tensor)
        pooled_output = self.activation(pooled_output)
        return pooled_output


## BertLayer

Bert Encoder을 구성하는 각 레이어를 구현한 클래스입니다.


In [6]:
from transformers.models.bert.modeling_bert import BertAttention, BertIntermediate, BertOutput
from transformers.pytorch_utils import apply_chunking_to_forward

class BertLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.chunk_size_feed_forward = config.chunk_size_feed_forward
        self.seq_len_dim = 1
        
        # Self-Attention이 구현되어 있는 부분입니다.
        self.attention = BertAttention(config)
        
        self.intermediate = BertIntermediate(config)
        self.output = BertOutput(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        
        self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None
        
        # Self-Attention이 구현되어 있는 부분입니다.
        self_attention_outputs = self.attention(
            hidden_states,
            attention_mask,
            head_mask,
            output_attentions=output_attentions,
            past_key_value=self_attn_past_key_value,
        )
        attention_output = self_attention_outputs[0]
        outputs = self_attention_outputs[1:]  # add self attentions if we output attention weights

        layer_output = apply_chunking_to_forward(
            self.feed_forward_chunk, self.chunk_size_feed_forward, self.seq_len_dim, attention_output
        )
        outputs = (layer_output,) + outputs

        return outputs

    def feed_forward_chunk(self, attention_output):
        intermediate_output = self.intermediate(attention_output)
        layer_output = self.output(intermediate_output, attention_output)
        return layer_output

## BertAttention

In [7]:
from transformers.models.bert.modeling_bert import BertSelfAttention, BertSelfOutput
from transformers.modeling_utils import prune_linear_layer, find_pruneable_heads_and_indices


class BertAttention(nn.Module):
    def __init__(self, config, position_embedding_type=None):
        super().__init__()
        
        # Self-Attention을 위한 Layer입니다.
        self.self = BertSelfAttention(config, position_embedding_type=position_embedding_type)
        
        # Residual connection, FC Layer, Dropout, LayerNorm으로 구성되어 있습니다.
        self.output = BertSelfOutput(config)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        encoder_hidden_states: Optional[torch.FloatTensor] = None,
        encoder_attention_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        
        # SelfAttention이 수행되는 부분입니다.
        self_outputs = self.self(
            hidden_states,
            attention_mask,
            head_mask,
            encoder_hidden_states,
            encoder_attention_mask,
            past_key_value,
            output_attentions,
        )  # 아래 Cell을 통해서 Self-attention이 어떻게 구현되어 있는지 살펴보도록 하겠습니다.
        
        
        attention_output = self.output(self_outputs[0], hidden_states)
        outputs = (attention_output,) + self_outputs[1:]  # add attentions if we output them
        return outputs

## BertSelfAttention



In [8]:
import math 

class BertSelfAttention(nn.Module):
    def __init__(self, config, position_embedding_type=None):
        super().__init__()

        self.num_attention_heads = config.num_attention_heads
        self.attention_head_size = int(config.hidden_size / config.num_attention_heads)
        self.all_head_size = self.num_attention_heads * self.attention_head_size

        self.query = nn.Linear(config.hidden_size, self.all_head_size)
        self.key = nn.Linear(config.hidden_size, self.all_head_size)
        self.value = nn.Linear(config.hidden_size, self.all_head_size)

        self.dropout = nn.Dropout(config.attention_probs_dropout_prob)
        self.position_embedding_type = position_embedding_type or getattr(
            config, "position_embedding_type", "absolute"
        )
        if self.position_embedding_type == "relative_key" or self.position_embedding_type == "relative_key_query":
            self.max_position_embeddings = config.max_position_embeddings
            self.distance_embedding = nn.Embedding(2 * config.max_position_embeddings - 1, self.attention_head_size)

    def transpose_for_scores(self, x: torch.Tensor) -> torch.Tensor:
        # x의 shape는 (batch_size, seq_len, hidden_size)이다.
        # transpose_for_scores은 x를 (batch_size, seq_len, num_attention_heads, attention_head_size)로 바꾸어준다.
        new_x_shape = x.size()[:-1] + (self.num_attention_heads, self.attention_head_size)
        x = x.view(new_x_shape)
        return x.permute(0, 2, 1, 3)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.FloatTensor] = None,
        past_key_value: Optional[Tuple[Tuple[torch.FloatTensor]]] = None,
        output_attentions: Optional[bool] = False,
    ) -> Tuple[torch.Tensor]:
        mixed_query_layer = self.query(hidden_states)
        
        # transpose_for_scores은 각각의 Key, Query, Value 벡터들을 attention_head 개수로 나누어서 Multi-head로 만들어주는 역할을 한다.
        key_layer = self.transpose_for_scores(self.key(hidden_states))  # (batch_size, seq_len, num_attention_heads, attention_head_size)
        value_layer = self.transpose_for_scores(self.value(hidden_states))  # (batch_size, seq_len, num_attention_heads, attention_head_size)

        query_layer = self.transpose_for_scores(mixed_query_layer)

        # Take the dot product between "query" and "key" to get the raw attention scores.
        # query_layer shape: (batch_size, num_attention_heads, seq_len, attention_head_size)
        # key_layer.transpose(...) shape: (batch_size, num_attention_heads, attention_head_size, seq_len)
        # attention_scores shape: (batch_size, num_attention_heads, seq_len, seq_len)
        # 각 batch, head에 대해서 독립적으로 행렬곱을 수행하는 셈이다.
        attention_scores = torch.matmul(query_layer, key_layer.transpose(-1, -2))

        # Scaling
        attention_scores = attention_scores / math.sqrt(self.attention_head_size)
        
        # Padding에 해당되는 token들의 attention_mask은 -inf로 만들어주었다.
        # 따라서 softmax를 통해 attention_probs를 구할 때, padding에 해당되는 token들은 0에 가까운 값이 나오게 된다.
        if attention_mask is not None:
            # Apply the attention mask is (precomputed for all layers in BertModel forward() function)
            attention_scores = attention_scores + attention_mask

        # Normalize the attention scores to probabilities.
        attention_probs = nn.functional.softmax(attention_scores, dim=-1)

        # This is actually dropping out entire tokens to attend to, which might
        # seem a bit unusual, but is taken from the original Transformer paper.
        attention_probs = self.dropout(attention_probs)
        
        # Attention weight (probability)를 value_layer에 곱해준다.
        context_layer = torch.matmul(attention_probs, value_layer)

        context_layer = context_layer.permute(0, 2, 1, 3).contiguous()
        new_context_layer_shape = context_layer.size()[:-2] + (self.all_head_size,)
        context_layer = context_layer.view(new_context_layer_shape)

        outputs = (context_layer, attention_probs) if output_attentions else (context_layer,)

        return outputs