# BARTModel 분석

In [1]:
import math
import os
import random
import warnings
from typing import Any, Dict, Tuple, Union, Optional, List

import numpy as np
from overrides import overrides

import torch
import torch.nn as nn
import torch.utils.checkpoint
import torch.nn.functional as F

import transformers
from transformers import BartConfig, BartTokenizer, BartModel
from transformers.models.bart.modeling_bart import BartEncoder, BartDecoder
from transformers.utils import logging
from transformers.modeling_utils import PreTrainedModel

In [2]:
print(f"torch.__version__ == {torch.__version__}")
print(transformers.__version__)
print(torch.cuda.is_available())

torch.__version__ == 1.7.1+cu110
4.2.1
True


In [4]:
config = BartConfig()
bart = BartModel(config)

## BartModel.\_\_init\_\_
- BartPretrainedModel을 상속받음

In [11]:
bart.config.pad_token_id, bart.config.vocab_size

(1, 50265)

In [13]:
bart.shared # torch.nn.Embedding

Embedding(50265, 1024, padding_idx=1)

- `BartPretrainedModel`의 init_weight 메서드 실시, 뜯어보자
- `nn.Linear`, `nn.Embedding`의 경우 config의 std로 초기값 조정

In [16]:
class BartPretrainedModel(PreTrainedModel):
    """Common base of the BART modules: weight initialisation and a tiny dummy batch."""

    config_class = BartConfig
    base_model_prefix = "model"

    def _init_weights(self, module):
        """Initialise a single sub-module in place.

        ``nn.Linear`` and ``nn.Embedding`` weights are drawn from a normal
        distribution with mean 0 and standard deviation ``config.init_std``.
        Linear biases and the embedding row at ``padding_idx`` are zeroed.
        ``BartSinusoidalPositionalEmbedding`` modules are left untouched.
        """
        init_std = self.config.init_std
        if isinstance(module, nn.Linear):
            module.weight.data.normal_(mean=0.0, std=init_std)
            if module.bias is not None:
                module.bias.data.zero_()
        elif isinstance(module, BartSinusoidalPositionalEmbedding):
            # Checked before nn.Embedding on purpose: the sinusoidal table
            # subclasses nn.Embedding and must not be re-randomised.
            pass
        elif isinstance(module, nn.Embedding):
            module.weight.data.normal_(mean=0.0, std=init_std)
            if module.padding_idx is not None:
                module.weight.data[module.padding_idx].zero_()

    @property
    def dummy_inputs(self):
        """Return a 2x5 batch of token ids with a matching attention mask.

        The last position of the second row holds the pad token, so the mask
        is False there and True everywhere else.
        """
        pad_id = self.config.pad_token_id
        token_ids = torch.tensor(
            [[0, 6, 10, 4, 2], [0, 8, 12, 2, pad_id]],
            device=self.device,
        )
        return {
            "attention_mask": token_ids.ne(pad_id),
            "input_ids": token_ids,
        }

>### BartEncoder
>- `BartPretrainedModel` 객체를 동일하게 상속받음
>### BartEncoder.\_\_init\_\_

In [27]:
print(f"""
dropout: {bart.encoder.dropout},
layerdrop: {bart.config.encoder_layerdrop}
embed_dim: {bart.config.d_model},
embed_scale: {math.sqrt(bart.config.d_model) if config.scale_embedding else 1.0},
padding_idx: {bart.config.pad_token_id},
max_source_positions: {bart.config.max_position_embeddings}
""".strip())

dropout: 0.1,
layerdrop: 0.0
embed_dim: 1024,
embed_scale: 1.0,
padding_idx: 1,
max_source_positions: 1024


In [28]:
bart.encoder.embed_tokens # __init__에서 받아올 수 있음

Embedding(50265, 1024, padding_idx=1)

In [30]:
print(bart.config.static_position_embeddings) # 21.01.06 commit으로 삭제!
bart.encoder.embed_positions

False


BartLearnedPositionalEmbedding(1026, 1024, padding_idx=1)

- config.static_position_embeddings에 따라 어떤 객체를 사용할지 갈림
    - if True, `BartSinusoidalPositionalEmbedding`
    - else: `BartLearnedPositionalEmbedding`
- config.encoder_layers의 수만큼 EncoderLayer를 쌓음

In [17]:
config.encoder_layers

12

In [18]:
bart.encoder.layers[0] # 이 layer를 12개 쌓음

BartEncoderLayer(
  (self_attn): BartAttention(
    (k_proj): Linear(in_features=1024, out_features=1024, bias=True)
    (v_proj): Linear(in_features=1024, out_features=1024, bias=True)
    (q_proj): Linear(in_features=1024, out_features=1024, bias=True)
    (out_proj): Linear(in_features=1024, out_features=1024, bias=True)
  )
  (self_attn_layer_norm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
  (fc1): Linear(in_features=1024, out_features=4096, bias=True)
  (fc2): Linear(in_features=4096, out_features=1024, bias=True)
  (final_layer_norm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
)

- layernorm을 어떻게 적용할지 보자

In [20]:
if config.normalize_embedding:
    print(BartLayerNorm(embed_dim))
else:
    print(nn.Identity())

LayerNorm((1024,), eps=1e-05, elementwise_affine=True)


In [21]:
if config.add_final_layer_norm:
    print(BartLayerNorm(config.d_model))
else:
    print(None)

None


In [22]:
def BartLayerNorm(
    normalized_shape: torch.Size, eps: float = 1e-5, elementwise_affine: bool = True
):
    """Build a layer-norm module, using apex's fused version when it is usable.

    The fused implementation is only attempted when CUDA is available. If
    importing or constructing it raises ``ImportError``, the function falls
    back to ``torch.nn.LayerNorm`` with the same arguments.
    """
    fused_norm = None
    if torch.cuda.is_available():
        try:
            from apex.normalization import FusedLayerNorm

            # Construction stays inside the try so an ImportError raised
            # while building the fused module also triggers the fallback.
            fused_norm = FusedLayerNorm(normalized_shape, eps, elementwise_affine)
        except ImportError:
            fused_norm = None

    if fused_norm is not None:
        return fused_norm
    return torch.nn.LayerNorm(normalized_shape, eps, elementwise_affine)

In [171]:
hidden_states= torch.randn(2, 5, 1024)

In [172]:
nn.LayerNorm(1024)(hidden_states)

tensor([[[-0.7331, -0.4191,  0.7951,  ...,  0.4873, -0.5494, -0.2944],
         [-0.7643,  1.8104, -0.0323,  ..., -0.4546,  0.5776, -0.7373],
         [-1.1619,  1.9948,  0.4805,  ..., -1.0691, -0.7803,  0.6411],
         [ 0.0236,  0.1118, -0.2880,  ..., -1.5818,  0.1992, -0.9446],
         [ 0.3735, -1.4478,  0.8767,  ...,  1.2091, -0.4567,  0.4698]],

        [[ 0.4276, -1.4758,  0.0165,  ...,  1.9631, -0.1555, -1.0019],
         [ 0.6768, -0.3537,  0.9676,  ..., -1.3469, -0.1781,  1.4861],
         [ 0.5480, -1.0024,  0.4656,  ...,  0.5370, -0.4840,  0.0959],
         [-1.5319,  0.8093, -0.3881,  ..., -0.5653, -0.3972,  0.5072],
         [-1.0625,  0.7415, -0.8247,  ...,  0.1221, -0.1593, -0.0284]]],
       grad_fn=<NativeLayerNormBackward>)

In [173]:
mean = hidden_states.mean(dim=-1)
mean = mean[:, :, None].expand((*mean.size(), 1024))
std = hidden_states.std(dim=-1, unbiased=False)
std = std[:, :, None].expand((*std.size(), 1024))

(hidden_states - mean) / std

tensor([[[-0.7331, -0.4191,  0.7951,  ...,  0.4873, -0.5494, -0.2944],
         [-0.7643,  1.8104, -0.0323,  ..., -0.4546,  0.5776, -0.7373],
         [-1.1619,  1.9948,  0.4805,  ..., -1.0691, -0.7803,  0.6411],
         [ 0.0236,  0.1118, -0.2880,  ..., -1.5819,  0.1992, -0.9446],
         [ 0.3735, -1.4478,  0.8767,  ...,  1.2091, -0.4567,  0.4698]],

        [[ 0.4276, -1.4758,  0.0165,  ...,  1.9631, -0.1555, -1.0019],
         [ 0.6768, -0.3537,  0.9676,  ..., -1.3469, -0.1781,  1.4862],
         [ 0.5480, -1.0024,  0.4656,  ...,  0.5370, -0.4840,  0.0959],
         [-1.5319,  0.8093, -0.3881,  ..., -0.5653, -0.3972,  0.5072],
         [-1.0625,  0.7415, -0.8247,  ...,  0.1221, -0.1593, -0.0284]]])

>### BartEncoder.forward
>#### Ch0. forward의 input
>```python
input_ids            = None,
attention_mask       = None,
inputs_embeds        = None,
output_attentions    = None,
output_hidden_states = None,
return_dict          = None,
>```

>#### Ch1. config으로 input setting

In [23]:
print(config.output_attentions)
print((config.output_hidden_states,))
print(config.use_return_dict)

False
(False,)
True


>- input을 아래의 코드로 처리
>```python
# retrieve input_ids and inputs_embeds
if input_ids is not None and inputs_embeds is not None:
    raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
elif input_ids is not None:
    input_shape = input_ids.size()
    input_ids = input_ids.view(-1, input_shape[-1])
elif inputs_embeds is not None:
    input_shape = inputs_embeds.size()[:-1]
else:
    raise ValueError("You have to specify either input_ids or inputs_embeds")
if inputs_embeds is None:
    inputs_embeds = self.embed_tokens(input_ids) * self.embed_scale
>```

In [52]:
bart.encoder.embed_positions

BartLearnedPositionalEmbedding(1026, 1024, padding_idx=1)

In [34]:
bart.encoder.embed_positions(bart.dummy_inputs['input_ids'].size()).size()

torch.Size([5, 1024])

torch.Size([2, 5])

>#### 아래의 과정으로 처리
>- input을 embedding (주어진 경우는 그냥 넘어감)
>- input_shape으로 position vector 얻음
>- hidden_states를 input_embeds + embeds_pos로 계산
>- hidden_states를 LayerNorm해주고 Dropout 실시
>- attention_mask가 None이 아니면 아래 코드로 expand (**디코더랑 처리가 조금 다름!**)
>- output_hidden_states, output_attentions이 True이면
    - 빈 tuple `()`로 초기화하고, 그렇지 않으면 None으로 둠

In [23]:
def _expand_mask(mask: torch.Tensor, dtype: torch.dtype, tgt_len: Optional[int]=None):
    bsz, src_len = mask.size()
    tgt_len = tgt_len if tgt_len is not None else src_len
    expanded_mask = mask[:, None, None, :].expand(bsz, 1, tgt_len, src_len).to(dtype)
    inverted_mask = 1.0 - expanded_mask
    return inverted_mask.masked_fill(inverted_mask.bool(), torch.finfo(dtype).min)

>- 그리고 나선, BartEncoderLayer별로 아래의 연산을 수행
>```python
for encoder_layer in self.layers:
    if output_hidden_states:
        encoder_states = encoder_states + (hidden_states,)
    # add LayerDrop (see https://arxiv.org/abs/1909.11556 for description)
    dropout_probability = random.uniform(0, 1)
    if self.training and (dropout_probability < self.layerdrop):  # skip the layer
        attn = None
    else:
        hidden_states, attn = encoder_layer(
            hidden_states, 
            attention_mask, 
            output_attentions=output_attentions
        )
    if output_attentions:
        all_attentions = all_attentions + (attn,)
>```
>- 그 다음, layer_norm이 None이 아니면 hidden_states를 layer normalization
>- output_hidden_states가 None이 아니면, encoder_states에 (hidden_states,)를 더해줌
>- return_dict가 True인지 False인지에 따라 출력 결과물이 달라짐
    - `False`: 
    ```python 
    tuple(
        v for v in [
            hidden_states, encoder_states, all_attentions
        ] if v is not None
    )
    ```
    - `True`:
    ```python
    BaseModelOutput(
        last_hidden_state=hidden_states, 
        hidden_states=encoder_states, 
        attentions=all_attentions
    )
    ```

---

>### BartDecoder
>- `BartPretrainedModel` 객체를 동일하게 상속받음
>### BartDecoder.\_\_init\_\_

In [62]:
dropout = config.dropout
layerdrop = config.decoder_layerdrop

embed_dim = config.d_model
embed_scale = math.sqrt(embed_dim) if config.scale_embedding else 1.0
padding_idx = config.pad_token_id
max_source_positions = config.max_position_embeddings

print(f"""
dropout: {dropout}
layerdrop: {layerdrop}
embed_dim: {embed_dim},
embed_scale: {embed_scale},
padding_idx: {padding_idx},
max_source_positions: {max_source_positions}
""".strip())

dropout: 0.1
layerdrop: 0.0
embed_dim: 1024,
embed_scale: 1.0,
padding_idx: 1,
max_source_positions: 1024


In [63]:
# Decoder 차별점
do_blenderbot_90_layernorm = config.do_blenderbot_90_layernorm  # layernorm variant
do_blenderbot_90_layernorm

False

In [64]:
embed_tokens: Optional[nn.Embedding] = None

# None이면
embed_tokens = nn.Embedding(config.vocab_size, embed_dim, padding_idx)

# None이 아니면
embed_tokens = embed_tokens

- config.static_position_embeddings에 따라 어떤 객체를 사용할지 갈림
    - if True, `BartSinusoidalPositionalEmbedding`
    - else: `BartLearnedPositionalEmbedding`
- config.decoder_layers의 수만큼 DecoderLayer를 쌓음

In [65]:
config.decoder_layers

12

In [66]:
bart.decoder.layers[0] # 이 layer를 12개 쌓음

BartDecoderLayer(
  (self_attn): BartAttention(
    (k_proj): Linear(in_features=1024, out_features=1024, bias=True)
    (v_proj): Linear(in_features=1024, out_features=1024, bias=True)
    (q_proj): Linear(in_features=1024, out_features=1024, bias=True)
    (out_proj): Linear(in_features=1024, out_features=1024, bias=True)
  )
  (self_attn_layer_norm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
  (encoder_attn): BartAttention(
    (k_proj): Linear(in_features=1024, out_features=1024, bias=True)
    (v_proj): Linear(in_features=1024, out_features=1024, bias=True)
    (q_proj): Linear(in_features=1024, out_features=1024, bias=True)
    (out_proj): Linear(in_features=1024, out_features=1024, bias=True)
  )
  (encoder_attn_layer_norm): LayerNorm((1024,), eps=1e-05, elementwise_affine=True)
  (fc1): Linear(in_features=1024, out_features=4096, bias=True)
  (fc2): Linear(in_features=4096, out_features=1024, bias=True)
  (final_layer_norm): LayerNorm((1024,), eps=1e-05, elementwis

In [67]:
if config.normalize_embedding:
    print(BartLayerNorm(embed_dim)) # config.d_model과 동일
else:
    print(nn.Identity())

LayerNorm((1024,), eps=1e-05, elementwise_affine=True)


In [68]:
if config.add_final_layer_norm:
    print(BartLayerNorm(config.d_model))
else:
    print(None)

None


>### BartDecoder.forward
>#### Ch0. forward의 input
>```python
input_ids=None,
attention_mask=None,
encoder_hidden_states=None,
encoder_attention_mask=None,
past_key_values=None,
inputs_embeds=None,
use_cache=None,
output_attentions=None,
output_hidden_states=None,
return_dict=None,
>```

In [69]:
print(config.output_attentions)
print((config.output_hidden_states,))
print(config.use_cache) # 차이점
print(config.use_return_dict)

False
(False,)
True
True


>- input을 아래의 코드로 처리
>```python
# retrieve input_ids and inputs_embeds
if input_ids is not None and inputs_embeds is not None:
    raise ValueError("You cannot specify both decoder_input_ids and decoder_inputs_embeds at the same time")
elif input_ids is not None:
    input_shape = input_ids.size()
    input_ids = input_ids.view(-1, input_shape[-1])
elif inputs_embeds is not None:
    input_shape = inputs_embeds.size()[:-1]
else:
    raise ValueError("You have to specify either decoder_input_ids or decoder_inputs_embeds")
if inputs_embeds is None:
    inputs_embeds = self.embed_tokens(input_ids) * self.embed_scale
>```

- Decoder 차이점

In [70]:
# past_key_values_length
past_key_values = None
past_key_values_length = past_key_values[0][0].shape[2] if past_key_values is not None else 0
past_key_values_length

0

- Decoder 차이점

In [71]:
attention_mask = bart.dummy_inputs['attention_mask']
input_shape = bart.dummy_inputs['input_ids'].size()
input_ids = bart.dummy_inputs['input_ids'].view(-1, input_shape[-1])
inputs_embeds = embed_tokens(input_ids) * embed_scale 

In [72]:
def _make_causal_mask(input_ids_shape: torch.Size, dtype: torch.dtype, past_key_values_length: int = 0):
    """
    Make causal mask used for bi-directional self-attention.
    """
    bsz, tgt_len = input_ids_shape
    mask = torch.full((tgt_len, tgt_len), float("-inf"))
    mask_cond = torch.arange(mask.size(-1))
    mask.masked_fill_(mask_cond < (mask_cond + 1).view(mask.size(-1), 1), 0)
    mask = mask.to(dtype)

    if past_key_values_length > 0:
        mask = torch.cat([torch.zeros(tgt_len, past_key_values_length, dtype=dtype), mask], dim=-1)
    return mask[None, None, :, :].expand(bsz, 1, tgt_len, tgt_len + past_key_values_length)

In [83]:
# Attention Mask 처리

# Create causal mask
# [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
combined_attention_mask = None
if input_shape[-1] > 1: # 걍 무조건 하는거나 다름없음
    combined_attention_mask = _make_causal_mask(
        input_shape, inputs_embeds.dtype, past_key_values_length=past_key_values_length
    )

In [84]:
combined_attention_mask.size()

torch.Size([2, 1, 5, 5])

In [85]:
# create decoder_padding_mask if not provided and needed
# 4.12.20 (PVP): Not a fan of this "magical" function that
# automatically creates attention_mask for padded tokens
# => this is inconsistent with other models
# => Pegasus uses the pad_token as decoder_start_token_id, so that this could
# pose some problems.
if (
    attention_mask is None
    and input_ids is not None
    and input_shape[-1] > 1
    and config.pad_token_id in input_ids
):
    # should be kept for backwards compatibility
    attention_mask = input_ids.ne(config.pad_token_id).to(torch.long)
    # never mask leading token, even if it is pad
    attention_mask[:, 0] = attention_mask[:, 1]
    if past_key_values_length > 0:
        attention_mask = torch.cat(
            [
                torch.ones(
                    (input_shape[0], past_key_values_length), dtype=torch.long, device=input_ids.device
                ),
                attention_mask,
            ],
            dim=-1,
        )

In [92]:
res = bart.encoder(**bart.dummy_inputs, return_dict=True, output_attentions=True, output_hidden_states=True)

In [97]:
res.attentions[-1].size()

torch.Size([2, 16, 5, 5])

In [None]:
# expand encoder attention mask
encoder_hidden_states = None
encoder_attention_mask = None

# BartModel에서 encoder의 결과값을 Decoder에 넣어줌!
if encoder_hidden_states is not None and encoder_attention_mask is not None:
    # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
    encoder_attention_mask = _expand_mask(
        encoder_attention_mask, inputs_embeds.dtype, tgt_len=input_shape[-1])
else:
    print('지금은 None!')

In [110]:
if attention_mask is not None and combined_attention_mask is not None:
    # [bsz, seq_len] -> [bsz, 1, tgt_seq_len, src_seq_len]
    combined_attention_mask = combined_attention_mask + _expand_mask(
        attention_mask, inputs_embeds.dtype, tgt_len=input_shape[-1]
    )

>#### 아래의 과정으로 처리! (Encoder와 유사)
>- input_shape으로 position vector 얻음, past_key_values_length도 넣어줌
>
>#### Decoder에서 다른 점!
>- do_blenderbot_90_layernorm이 True가 아니라면 Encoder와 동일하게 계산
    - hidden_states를 input_embeds + embeds_pos로 계산
    - hidden_states를 LayerNorm해줌
>- do_blenderbot_90_layernorm가 True면
    - inputs_embeds를 LayerNorm해주고 (이 결과값이 hidden_states)
    - hidden_states에 embeds_pos를 더해줌
>- 이 후, Dropout
>- 그리고 나서 아래 값들에 대해 Tuple을 할당
>```python
all_hidden_states = () if output_hidden_states else None
all_self_attns = () if output_attentions else None
all_cross_attentions = () if output_attentions else None
next_decoder_cache = () if use_cache else None
>```

>- 그리고 나선, BartDecoderLayer별로 아래의 연산을 수행
>```python
for idx, decoder_layer in enumerate(self.layers): # Add idx
    # add LayerDrop (see https://arxiv.org/abs/1909.11556 for description)
    if output_hidden_states:
        # encoder_states = encoder_states + (hidden_states,) # Encoder
        all_hidden_states += (hidden_states,)
    dropout_probability = random.uniform(0, 1)
    if self.training and (dropout_probability < self.layerdrop):  # skip the layer
        # attn = None
        # Encoder에선 if output_attentions: 구문을 도는데
        # Decoder에선 걍 continue
        continue
    hidden_states, layer_self_attn, present_key_value, layer_cross_attn = decoder_layer(
        hidden_states, 
        attention_mask=combined_attention_mask, 
        encoder_hidden_states=encoder_hidden_states,
        encoder_attention_mask=encoder_attention_mask,
        past_key_value=past_key_value,
        output_attentions=output_attentions,
    )
    # Decoder에서 추가된 부분
    if use_cache:
        next_decoder_cache += (present_key_value,)
    if output_attentions:
        # all_attentions = all_attentions + (attn,)
        all_self_attns += (layer_self_attn,)
        all_cross_attentions += (layer_cross_attn,)
if output_hidden_states: # add hidden states from the last decoder layer
    all_hidden_states += (hidden_states,)
>```
>- output_hidden_states가 None이 아니면, all_hidden_states에 (hidden_states,)를 더해줌
    - encoder_states였었음
>- 그 다음, layer_norm이 None이 아니면 hidden_states를 layer normalization
>- use_cache가 True면 next_decoder_cache를, 아니면 None을 next_cache에 할당
>- return_dict가 True인지 False인지에 따라 출력 결과물이 달라짐
    - `False`: 
    ```python 
    tuple(
        v for v in [
            hidden_states, next_cache, all_hidden_states, all_self_attns, all_cross_attentions
        ] if v is not None
    )
    ```
    - `True`:
    ```python
    BaseModelOutputWithPastAndCrossAttentions(
        last_hidden_state=hidden_states,
        past_key_values=next_cache,
        hidden_states=all_hidden_states,
        attentions=all_self_attns,
        cross_attentions=all_cross_attentions,
    )
    ```

---

## BartModel.forward

### Ch0. forward의 input

```python
input_ids              = None
attention_mask         = None
decoder_input_ids      = None
decoder_attention_mask = None
encoder_outputs        = None
past_key_values        = None
inputs_embeds          = None
decoder_inputs_embeds  = None
use_cache              = None
output_attentions      = None
output_hidden_states   = None
return_dict            = None
```

### 1. config으로 input setting

In [3]:
p(config.pad_token_id)
p(config.output_attentions)
p(config.output_hidden_states)
p(config.use_cache)
p(config.use_return_dict)

1
False
False
True
True


In [110]:
input_ids = bart.dummy_inputs['input_ids']

In [113]:
bart.encoder(input_ids).last_hidden_state.size()

torch.Size([2, 5, 1024])

In [125]:
decoder_output = bart.decoder(
    input_ids, encoder_hidden_states=bart.encoder(input_ids).last_hidden_state)

In [126]:
decoder_output.keys()

odict_keys(['last_hidden_state', 'past_key_values'])

In [127]:
decoder_output.last_hidden_state.size()

torch.Size([2, 5, 1024])

In [143]:
reconstruction = torch.randn(32, 10, 1024)
reconstruction = torch.softmax(reconstruction, dim=-1)

clean = torch.LongTensor(32, 10).random_(10000)

In [145]:
nn.CrossEntropyLoss()(reconstruction, clean)

ValueError: Expected target size (32, 1024), got torch.Size([32, 10])

In [130]:
set([v.size() for pkv in decoder_output.past_key_values for v in pkv])

{torch.Size([2, 16, 5, 64])}

In [102]:
def shift_tokens_right(input_ids: torch.Tensor, pad_token_id: int):
    """
    Shift input ids one token to the right, and wrap the last non pad token (usually <eos>).

    Args:
        input_ids: 2-D tensor ``[bsz, seq_len]`` of token ids; ``-100`` entries
            (ignored label positions) are treated as padding.
        pad_token_id: id of the padding token; must not be ``None``.

    Returns:
        A new tensor of the same shape whose first column holds each row's
        wrapped token and whose remaining columns are the input shifted right
        by one. ``input_ids`` itself is not modified.

    Note:
        The wrapped token is taken at index ``(number of non-pad tokens - 1)``,
        which is the last non-pad token only when padding sits on the right.
    """
    assert pad_token_id is not None, "self.model.config.pad_token_id has to be defined."

    prev_output_tokens = input_ids.clone()
    # replace possible -100 values in labels by `pad_token_id`
    prev_output_tokens.masked_fill_(prev_output_tokens == -100, pad_token_id)

    index_of_eos = (prev_output_tokens.ne(pad_token_id).sum(dim=1) - 1).unsqueeze(-1)
    # squeeze(-1) drops only the gathered column, so a batch of size 1 keeps
    # its batch axis instead of collapsing to a 0-dim tensor.
    decoder_start_tokens = prev_output_tokens.gather(1, index_of_eos).squeeze(-1)
    prev_output_tokens[:, 1:] = prev_output_tokens[:, :-1].clone()
    prev_output_tokens[:, 0] = decoder_start_tokens

    return prev_output_tokens

In [104]:
input_ids

tensor([[ 0,  6, 10,  4,  2],
        [ 0,  8, 12,  2,  1]])

In [107]:
from transformers import BartTokenizer

tokenizer = BartTokenizer.from_pretrained('facebook/bart-large')

In [109]:
tokenizer.eos_token_id

2

In [105]:
shift_tokens_right(input_ids, 1)

tensor([[ 2,  0,  6, 10,  4],
        [ 2,  0,  8, 12,  2]])

### Ch2. Encoder
#### Ch2.번외 Enc output -> BaseModelOutput setting

- return_dict가 True이고
- encoder_outputs이 BaseModelOutput 객체가 아니면
- 아래 코드로 형변환시켜줌

```python
encoder_outputs = BaseModelOutput(
    last_hidden_state=encoder_outputs[0],
    hidden_states=encoder_outputs[1] if len(encoder_outputs) > 1 else None,
    attentions=encoder_outputs[2] if len(encoder_outputs) > 2 else None,
)

```

### Ch3. Decoder

### Ch4. 최종 output
- return_dict가 False일 경우엔
    - decoder_outputs + encoder_outputs를 출력
- 그 외의 경우엔
    - Seq2SeqModelOutput에 결과값을 입력 후 출력

In [47]:
output = bart(**bart.dummy_inputs)

In [52]:
output.keys()

odict_keys(['last_hidden_state', 'past_key_values', 'encoder_last_hidden_state'])

In [54]:
output.last_hidden_state.size()

torch.Size([2, 5, 1024])

In [72]:
set([v.size() for pkv in output.past_key_values for v in pkv])

{torch.Size([2, 16, 5, 64])}

In [79]:
16 * 64, config.d_model, config.encoder_attention_heads, config.decoder_attention_heads

(1024, 1024, 16, 16)

In [74]:
output.encoder_last_hidden_state.size()

torch.Size([2, 5, 1024])

## Other Methods

In [None]:
class BartModel(BartPretrainedModel):
    """Sketch of ``BartModel`` that shows only its embedding / encoder / decoder accessors."""

    def __init__(self, config: BartConfig):
        # Construction is deliberately omitted in this sketch.
        pass

    @overrides
    def get_input_embeddings(self):
        """Return the embedding module stored on ``self.shared``."""
        return self.shared

    @overrides
    def set_input_embeddings(self, value):
        """Store ``value`` as ``self.shared`` and make encoder and decoder use it."""
        self.shared = value
        shared_embedding = self.shared
        self.encoder.embed_tokens = shared_embedding
        self.decoder.embed_tokens = shared_embedding

    def get_encoder(self):
        """Return ``self.encoder``."""
        return self.encoder

    def get_decoder(self):
        """Return ``self.decoder``."""
        return self.decoder

## 번외

### PreTrainedModel 분석

In [36]:
from transformers.configuration_utils import PretrainedConfig

# file_utils.py
DUMMY_INPUTS = [[7, 6, 0, 0, 1], [1, 2, 3, 0, 0], [0, 0, 0, 4, 5]]
DUMMY_MASK = [[1, 1, 1, 1, 1], [1, 1, 1, 0, 0], [0, 0, 0, 1, 1]]

pt_config = PretrainedConfig()

In [37]:
from transformers.modeling_utils import ModuleUtilsMixin
from transformers.generation_utils import GenerationMixin # Beam Search 파보쟈

In [38]:
pt_config.is_encoder_decoder, pt_config.tie_word_embeddings

(False, True)

In [39]:
config.is_encoder_decoder, config.tie_word_embeddings

(True, True)

In [40]:
config.pruned_heads, pt_config.pruned_heads

({}, {})

In [72]:
class PreTrainedModel(nn.Module, ModuleUtilsMixin, GenerationMixin):
    """Trimmed study copy of the base class for pretrained models.

    The embedding accessors, weight tying, weight initialisation and head
    pruning are spelled out. Embedding resizing and (de)serialisation are left
    as stubs whose bodies are ``pass``.
    """

    config_class = None
    base_model_prefix = ""
    _keys_to_ignore_on_load_missing = None
    _keys_to_ignore_on_load_unexpected = None
    _keys_to_ignore_on_save = None

    @property
    def dummy_inputs(self) -> Dict[str, torch.Tensor]:
        """Return a minimal batch built from the module-level ``DUMMY_INPUTS``."""
        return {"input_ids": torch.tensor(DUMMY_INPUTS)}

    def __init__(self, config: PretrainedConfig, *inputs, **kwargs):
        """Validate and store ``config``.

        Raises:
            ValueError: if ``config`` is not a ``PretrainedConfig`` instance.
        """
        super().__init__()
        if not isinstance(config, PretrainedConfig):
            cls_name = self.__class__.__name__
            raise ValueError(
                f"Parameter config in `{cls_name}(config)` should be an instance of class `PretrainedConfig`. "
                "To create a model from a pretrained model use "
                f"`model = {cls_name}.from_pretrained(PRETRAINED_MODEL_NAME)`"
            )
        # Save config and origin of the pretrained weights if given in model
        self.config = config
        self.name_or_path = config.name_or_path

    @property
    def base_model(self) -> nn.Module:
        """Return the attribute named ``base_model_prefix``, or ``self`` if there is none."""
        return getattr(self, self.base_model_prefix, self)

    def get_input_embeddings(self) -> nn.Module:
        """Delegate to the base model's ``get_input_embeddings``.

        Raises:
            NotImplementedError: if this object is its own base model, i.e. the
                subclass was expected to override this method.
        """
        base_model = getattr(self, self.base_model_prefix, self)
        if base_model is self:
            raise NotImplementedError
        return base_model.get_input_embeddings()

    def set_input_embeddings(self, value: nn.Module):
        """Delegate to the base model's ``set_input_embeddings``.

        Raises:
            NotImplementedError: if this object is its own base model.
        """
        base_model = getattr(self, self.base_model_prefix, self)
        if base_model is self:
            raise NotImplementedError
        base_model.set_input_embeddings(value)

    def get_output_embeddings(self) -> Optional[nn.Module]:
        """Return ``None``; models with output embeddings overwrite this."""
        return None

    def tie_weights(self):
        """Tie output to input embeddings and, if configured, encoder to decoder weights.

        Both steps are controlled by config flags: ``tie_word_embeddings`` for
        the embeddings, and ``is_encoder_decoder`` together with
        ``tie_encoder_decoder`` for the encoder/decoder pair.
        """
        output_embeddings = self.get_output_embeddings()
        if output_embeddings is not None and self.config.tie_word_embeddings:
            self._tie_or_clone_weights(output_embeddings, self.get_input_embeddings())

        if self.config.is_encoder_decoder and self.config.tie_encoder_decoder:
            # Work on the wrapped base model when this object has one; a local
            # name is used instead of rebinding `self`.
            target = self
            if hasattr(self, self.base_model_prefix):
                target = getattr(self, self.base_model_prefix)
            target._tie_encoder_decoder_weights(
                target.encoder, target.decoder, target.base_model_prefix)

    @staticmethod
    def _tie_encoder_decoder_weights(encoder: nn.Module, decoder: nn.Module, base_model_prefix: str):
        """Tie the decoder's weights to the encoder's.

        Outline of the full implementation, as noted by the notebook author:

        1. Check whether the encoder and decoder classes are the same; if not,
           make sure that all encoder weights are correctly initialized.
        2. Tie the weights recursively via ``tie_encoder_to_decoder_recursively``,
           which the notes describe as implemented internally.
        """
        uninitialized_encoder_weights: List[str] = []
        # NOTE(review): `tie_encoder_to_decoder_recursively` is not defined in
        # this excerpt, so calling this method as written raises NameError.
        tie_encoder_to_decoder_recursively(
            decoder, encoder, base_model_prefix, uninitialized_encoder_weights)

    def _tie_or_clone_weights(self, output_embeddings, input_embeddings):
        """
        Tie or clone module weights depending of whether we are using
        TorchScript or not
        """
        if self.config.torchscript:
            output_embeddings.weight = nn.Parameter(input_embeddings.weight.clone())
        else:
            output_embeddings.weight = input_embeddings.weight

        if getattr(output_embeddings, "bias", None) is not None:
            # Zero-pad the bias at the end so its length matches the number of
            # rows of the (possibly larger) tied weight.
            missing_rows = output_embeddings.weight.shape[0] - output_embeddings.bias.shape[0]
            output_embeddings.bias.data = torch.nn.functional.pad(
                output_embeddings.bias.data,
                (0, missing_rows),
                "constant",
                0,
            )
        if hasattr(output_embeddings, "out_features") and hasattr(input_embeddings, "num_embeddings"):
            output_embeddings.out_features = input_embeddings.num_embeddings

    def resize_token_embeddings(self, new_num_tokens: Optional[int] = None) -> torch.nn.Embedding:
        """Stub: body omitted in this study copy."""
        pass

    def _resize_token_embeddings(self, new_num_tokens):
        """Stub: body omitted in this study copy."""
        pass

    def _get_resized_embeddings(
        self, old_embeddings: torch.nn.Embedding, new_num_tokens: Optional[int] = None
    ) -> torch.nn.Embedding:
        """Stub: body omitted in this study copy."""
        pass

    def _get_resized_lm_head(
        self, old_lm_head: torch.nn.Linear, new_num_tokens: Optional[int] = None, transposed: Optional[bool] = False
    ) -> torch.nn.Linear:
        """Stub: body omitted in this study copy."""
        pass

    def init_weights(self):
        """
        Initializes and prunes weights if needed.
        """
        # Initialize weights: `apply` calls `_init_weights` on each sub-module
        self.apply(self._init_weights)

        # Prune heads if needed
        if self.config.pruned_heads:
            self.prune_heads(self.config.pruned_heads)

        # Tie weights if needed
        self.tie_weights()

    def prune_heads(self, heads_to_prune: Dict[int, List[int]]):
        """
        Prunes heads of the base model.
        Arguments:
            heads_to_prune (:obj:`Dict[int, List[int]]`):
                Dictionary with keys being selected layer indices (:obj:`int`) and associated values being the list of
                heads to prune in said layer (list of :obj:`int`). For instance {1: [0, 2], 2: [2, 3]} will prune heads
                0 and 2 on layer 1 and heads 2 and 3 on layer 2.
        """
        # save new sets of pruned heads as union of previously stored pruned heads and newly pruned heads
        for layer, heads in heads_to_prune.items():
            union_heads = set(self.config.pruned_heads.get(layer, [])) | set(heads)
            self.config.pruned_heads[layer] = list(union_heads)  # Unfortunately we have to store it as list for JSON

        self.base_model._prune_heads(heads_to_prune)

    def save_pretrained(self, save_directory: Union[str, os.PathLike]):
        """Stub: body omitted in this study copy."""
        pass

    @classmethod
    def from_pretrained(cls, pretrained_model_name_or_path: Optional[Union[str, os.PathLike]], *model_args, **kwargs):
        """Stub: body omitted in this study copy."""
        pass

In [68]:
from transformers.utils import logging

In [69]:
logger = logging.get_logger(__name__)

logger.info(
    f"{decoder.__class__} and {encoder.__class__} are not equal. In this case make sure that all encoder weights are correctly initialized."
)

In [58]:
bart.encoder.__class__ != bart.decoder.__class__

True

### nn.Module의 apply 메서드

In [1]:
from typing import TypeVar, Callable

T = TypeVar('T', bound='Module')


def apply(self: T, fn: Callable[['Module'], None]) -> T:
    """Call ``fn`` on every descendant module and finally on ``self``.

    Each direct child handles its own subtree through its ``apply`` method, so
    ``fn`` reaches children before their parent. Returns ``self``.
    """
    for child in self.children():
        # Recurse first: the subtree is visited before the node itself.
        child.apply(fn)
    fn(self)
    return self

### Encoder, Decoder의 embed_positions
- forward의 인자가 tensor가 아니라 torch.Size!

In [37]:
class BartSinusoidalPositionalEmbedding(nn.Embedding):
    """Fixed sinusoidal position table, looked up by position index rather than token id."""

    def __init__(self, num_positions: int, embedding_dim: int, padding_idx: Optional[int] = None):
        # `padding_idx` is accepted but not forwarded to nn.Embedding.
        super().__init__(num_positions, embedding_dim)
        self.weight = self._init_weight(self.weight)

    @staticmethod
    def _init_weight(out: nn.Parameter):
        """
        Fill ``out`` in place with sinusoidal values and return it with gradients disabled.

        Identical to the XLM create_sinusoidal_embeddings except features are not interleaved:
        the sin features occupy the first half of each row and the cos features the
        second half. For an odd dimension the sin half gets the extra column.
        """
        n_pos, dim = out.shape
        # One divisor per feature column; columns 2k and 2k+1 share a divisor.
        divisors = [np.power(10000, 2 * (col // 2) / dim) for col in range(dim)]
        angles = np.array(
            [[pos / divisor for divisor in divisors] for pos in range(n_pos)]
        )
        out.requires_grad = False  # set early to avoid an error in pytorch-1.8+
        half = (dim + 1) // 2
        out[:, :half] = torch.FloatTensor(np.sin(angles[:, 0::2]))
        out[:, half:] = torch.FloatTensor(np.cos(angles[:, 1::2]))
        out.detach_()
        return out

    @torch.no_grad()
    def forward(self, input_ids_shape: torch.Size, past_key_values_length: int = 0):
        """Return the table rows for one sequence.

        `input_ids_shape` is expected to be [bsz x seqlen]. Only the sequence
        length is used; positions start at ``past_key_values_length``.
        """
        _, seq_len = input_ids_shape[:2]
        first_position = past_key_values_length
        positions = torch.arange(
            first_position,
            first_position + seq_len,
            dtype=torch.long,
            device=self.weight.device,
        )
        return super().forward(positions)

$$\cfrac{pos}{10000^{\cfrac{2 \lfloor j/2 \rfloor}{d_{model}}}}$$

In [53]:
class BartLearnedPositionalEmbedding(nn.Embedding):
    """
    Trainable position-embedding table whose lookups are shifted by a fixed ``offset``.

    The underlying table is allocated with ``num_embeddings + offset`` rows, and position ``p`` is
    read from row ``p + offset``.
    """

    def __init__(
        self, num_embeddings: int, embedding_dim: int, padding_idx: int, offset: int
    ):
        # The offset is stored before the parent constructor runs; the table is then enlarged by
        # `offset` rows so that every shifted position still has a row.
        self.offset = offset
        assert padding_idx is not None, "`padding_idx` should not be None, but of type int"
        super().__init__(num_embeddings + offset, embedding_dim, padding_idx=padding_idx)

    def forward(self, input_ids_shape: torch.Size, past_key_values_length: int = 0):
        """
        Look up the embeddings of ``seq_len`` consecutive positions starting at ``past_key_values_length``.

        `input_ids_shape` is expected to be [bsz x seqlen]; only the sequence length is used, so the
        result has one row per position and no batch dimension.
        """
        _, seq_len = input_ids_shape[:2]
        start = past_key_values_length
        positions = torch.arange(start, start + seq_len, dtype=torch.long, device=self.weight.device)
        shifted = positions + self.offset
        return super().forward(shifted)

### BartEncoderLayer

In [None]:
class BartEncoderLayer(nn.Module):
    """
    One encoder block: a self-attention sub-layer followed by a two-layer feed-forward sub-layer.

    Each sub-layer output goes through dropout and is added back to its input (residual
    connection). ``config.normalize_before`` selects whether the layer norm runs on the sub-layer
    input (pre-norm) or on the result of the residual addition (post-norm).
    """

    def __init__(self, config: BartConfig):
        super().__init__()
        self.embed_dim = config.d_model

        # Plain (non-module) settings read from the config.
        self.normalize_before = config.normalize_before
        self.dropout = config.dropout
        self.activation_fn = ACT2FN[config.activation_function]
        self.activation_dropout = config.activation_dropout

        # Sub-modules, created in the same order as before so registration order is unchanged.
        self.self_attn = BartAttention(
            embed_dim=self.embed_dim,
            num_heads=config.encoder_attention_heads,
            dropout=config.attention_dropout,
        )
        self.self_attn_layer_norm = BartLayerNorm(self.embed_dim)
        self.fc1 = nn.Linear(self.embed_dim, config.encoder_ffn_dim)
        self.fc2 = nn.Linear(config.encoder_ffn_dim, self.embed_dim)
        self.final_layer_norm = BartLayerNorm(self.embed_dim)

    def forward(self, hidden_states: torch.Tensor, attention_mask: torch.Tensor, output_attentions: bool = False):
        """
        Run the self-attention and feed-forward sub-layers on ``hidden_states``.

        Args:
            hidden_states (:obj:`torch.FloatTensor`): input to the layer, documented upstream as having shape
                `(seq_len, batch, embed_dim)` (the layout is not checked in this block).
            attention_mask (:obj:`torch.FloatTensor`): attention mask of size
                `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
            output_attentions (:obj:`bool`): forwarded to the attention module to request the attention weights.

        Returns:
            A ``(hidden_states, attn_weights)`` tuple, where ``attn_weights`` is whatever the attention
            module returned as its second output.
        """
        pre_norm = self.normalize_before

        # ---- self-attention sub-layer ----
        shortcut = hidden_states
        if pre_norm:
            hidden_states = self.self_attn_layer_norm(hidden_states)
        # The attention module returns three values; the third one is not needed here.
        attn_output, attn_weights, _ = self.self_attn(
            hidden_states=hidden_states,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
        )
        hidden_states = shortcut + F.dropout(attn_output, p=self.dropout, training=self.training)
        if not pre_norm:
            hidden_states = self.self_attn_layer_norm(hidden_states)

        # ---- feed-forward sub-layer ----
        shortcut = hidden_states
        if pre_norm:
            hidden_states = self.final_layer_norm(hidden_states)
        ffn_hidden = self.fc1(hidden_states)
        ffn_hidden = self.activation_fn(ffn_hidden)
        ffn_hidden = F.dropout(ffn_hidden, p=self.activation_dropout, training=self.training)
        ffn_output = F.dropout(self.fc2(ffn_hidden), p=self.dropout, training=self.training)
        hidden_states = shortcut + ffn_output
        if not pre_norm:
            hidden_states = self.final_layer_norm(hidden_states)

        # Keep activations inside the finite range of the dtype when inf/nan values show up.
        # NOTE(review): clamping bounds +/-inf, but presumably leaves NaN entries as NaN - confirm.
        overflowed = torch.isinf(hidden_states).any() or torch.isnan(hidden_states).any()
        if overflowed:
            limit = torch.finfo(hidden_states.dtype).max - 1000
            hidden_states = torch.clamp(hidden_states, min=-limit, max=limit)
        return hidden_states, attn_weights

### BartDecoderLayer

In [None]:
class BartDecoderLayer(nn.Module):
    """
    One decoder block: self-attention, optional cross-attention over the encoder output, and a
    two-layer feed-forward sub-layer.

    Each sub-layer output goes through dropout and is added back to its input (residual
    connection). ``config.normalize_before`` selects whether the layer norm runs on the sub-layer
    input (pre-norm) or on the result of the residual addition (post-norm).
    """

    def __init__(self, config: BartConfig):
        super().__init__()
        self.embed_dim = config.d_model

        self.self_attn = BartAttention(
            embed_dim=self.embed_dim,
            num_heads=config.decoder_attention_heads,
            dropout=config.attention_dropout,
            is_decoder=True,
        )
        self.dropout = config.dropout
        self.activation_fn = ACT2FN[config.activation_function]
        self.activation_dropout = config.activation_dropout
        self.normalize_before = config.normalize_before

        self.self_attn_layer_norm = BartLayerNorm(self.embed_dim)
        self.encoder_attn = BartAttention(
            self.embed_dim,
            config.decoder_attention_heads,
            dropout=config.attention_dropout,
            is_decoder=True,
        )
        self.encoder_attn_layer_norm = BartLayerNorm(self.embed_dim)
        self.fc1 = nn.Linear(self.embed_dim, config.decoder_ffn_dim)
        self.fc2 = nn.Linear(config.decoder_ffn_dim, self.embed_dim)
        self.final_layer_norm = BartLayerNorm(self.embed_dim)

    def forward(
        self,
        hidden_states: torch.Tensor,
        attention_mask: Optional[torch.Tensor] = None,
        encoder_hidden_states: Optional[torch.Tensor] = None,
        encoder_attention_mask: Optional[torch.Tensor] = None,
        past_key_value: Optional[Tuple[torch.Tensor, ...]] = None,
        output_attentions: Optional[bool] = False,
    ):
        """
        Run self-attention, cross-attention (only when ``encoder_hidden_states`` is given) and the
        feed-forward sub-layer on ``hidden_states``.

        Args:
            hidden_states (:obj:`torch.FloatTensor`): input to the layer, documented upstream as having shape
                `(seq_len, batch, embed_dim)` (the layout is not checked in this block).
            attention_mask (:obj:`torch.FloatTensor`): attention mask of size
                `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
            encoder_hidden_states (:obj:`torch.FloatTensor`): cross attention input to the layer, documented
                upstream as having shape `(seq_len, batch, embed_dim)`. When ``None`` the cross-attention
                sub-layer is skipped.
            encoder_attention_mask (:obj:`torch.FloatTensor`): encoder attention mask of size
                `(batch, 1, tgt_len, src_len)` where padding elements are indicated by very large negative values.
            past_key_value (:obj:`Tuple(torch.FloatTensor)`): cached past key and value projection states. The
                first two entries are passed to self-attention and the last two to cross-attention.
            output_attentions (:obj:`bool`): forwarded to both attention modules to request the attention weights.

        Returns:
            A ``(hidden_states, self_attn_weights, present_key_value, cross_attn_weights)`` tuple.
            ``present_key_value`` is the self-attention cache, extended with the cross-attention cache
            when cross-attention ran. ``cross_attn_weights`` is ``None`` when cross-attention was skipped.
        """
        residual = hidden_states
        if self.normalize_before:
            hidden_states = self.self_attn_layer_norm(hidden_states)

        # Self-Attention Block
        # The self-attention cache is stored in the first two entries of `past_key_value`.
        self_attn_past_key_value = past_key_value[:2] if past_key_value is not None else None
        # `present_key_value` starts out as the cache returned by self-attention.
        hidden_states, self_attn_weights, present_key_value = self.self_attn(
            hidden_states=hidden_states,
            past_key_value=self_attn_past_key_value,
            attention_mask=attention_mask,
            output_attentions=output_attentions,
        )
        hidden_states = F.dropout(hidden_states, p=self.dropout, training=self.training)
        hidden_states = residual + hidden_states
        if not self.normalize_before:
            hidden_states = self.self_attn_layer_norm(hidden_states)

        # Cross-Attention Block (skipped entirely when there is no encoder output to attend to)
        cross_attn_weights = None
        if encoder_hidden_states is not None:
            residual = hidden_states
            if self.normalize_before:
                hidden_states = self.encoder_attn_layer_norm(hidden_states)

            # The cross-attention cache is stored in the last two entries of `past_key_value`.
            cross_attn_past_key_value = past_key_value[-2:] if past_key_value is not None else None
            hidden_states, cross_attn_weights, cross_attn_present_key_value = self.encoder_attn(
                hidden_states=hidden_states,
                key_value_states=encoder_hidden_states,
                attention_mask=encoder_attention_mask,
                past_key_value=cross_attn_past_key_value,
                output_attentions=output_attentions,
            )
            hidden_states = F.dropout(hidden_states, p=self.dropout, training=self.training)
            hidden_states = residual + hidden_states
            if not self.normalize_before:
                hidden_states = self.encoder_attn_layer_norm(hidden_states)

            # Append the cross-attention cache after the self-attention cache.
            present_key_value = present_key_value + cross_attn_present_key_value

        # Fully Connected Block
        residual = hidden_states
        if self.normalize_before:
            hidden_states = self.final_layer_norm(hidden_states)
        hidden_states = self.activation_fn(self.fc1(hidden_states))
        hidden_states = F.dropout(hidden_states, p=self.activation_dropout, training=self.training)
        hidden_states = self.fc2(hidden_states)
        hidden_states = F.dropout(hidden_states, p=self.dropout, training=self.training)
        hidden_states = residual + hidden_states
        if not self.normalize_before:
            hidden_states = self.final_layer_norm(hidden_states)

        return (
            hidden_states,
            self_attn_weights,
            present_key_value,
            cross_attn_weights,
        )