![](https://substackcdn.com/image/fetch/f_auto,q_auto:good,fl_progressive:steep/https%3A%2F%2Fbucketeer-e05bbc84-baa3-437e-9518-adb32be77984.s3.amazonaws.com%2Fpublic%2Fimages%2Ffe697a9b-6a11-489b-9701-15d42d3feb15_2006x1206.png)

## マスク言語モデリング（MLM）：

入力文の15%のトークンをランダムに[MASK]に置き換える。　　　
- モデルはこれらのマスクされた単語を予測する。　　　
- 例えば、「[MASK]が[MASK]を飛び越える」という文から「犬」「柵」を予測します。   

-> これにより、BERTは文脈における単語の意味を理解できるようになります


## 次文予測（NSP）：

2つの文が与えられた時、文Bが文Aの実際の次の文であるかを予測する。
例：
- 文A：「男性は店に行きました」
- 文B：「彼は牛乳を買いました」（関連あり、IsNextと予測）
- 文B：「ペンギンは南極に住んでいます」（関連なし、NotNextと予測）

-> これにより、BERTは文と文の関係性を理解できるようになります

In [8]:
# パラメータ設定
maxlen = 30  
batch_size = 6  
max_pred = 5  
n_layers = 6  
n_heads = 12  
d_model = 768  
d_ff = 768*4  
d_k = d_v = 64  
n_segments = 2  
num_epochs = 40

##  パラメータ設定

1. データ処理関連:   
-  maxlen: 各文の最大トークン数, これより長い文は切り捨て, 短い文はパディング  
- batch_size: 1回の学習更新で処理するサンプル数
- max_pred: マスク予測の最大数, MLMタスクで予測する単語の最大数, 入力長の15%程度になるよう調整


2. モデルアーキテクチャ

- n_layers: モデルの深さ, Transformerレイヤーの数
- n_heads: マルチヘッドアテンションのヘッド数, 並列処理の度合い
- d_model: 表現力に関わる次元数, フィードフォワードネットワークの中間層の次元数, 通常、d_modelの4倍に設定
- d_ff, d_k, d_v: キーと値のベクトル次元数, d_modelをヘッド数で割った値に設定されることが多い
- n_segments: タスク固有の設定, NSPタスクで使用するセグメント種類数(文1と文2の区別)


3. 学習設定

- num_epochs: 学習の反復回数

In [9]:
# トークナイザの特殊トークンを確認
from transformers import BertTokenizer, BertModel
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
special_tokens = tokenizer.special_tokens_map

special_tokens



{'unk_token': '[UNK]',
 'sep_token': '[SEP]',
 'pad_token': '[PAD]',
 'cls_token': '[CLS]',
 'mask_token': '[MASK]'}

## 特殊トークン

1. [UNK] (Unknown Token)
未知の単語を表すトークンだ。BERTの語彙（ボキャブラリ）に存在しない単語が入力された場合、自動的にこのトークンに置き換えられる。    

2. [SEP] (Separator Token)
テキストのセクションを分けるためのトークンだ。   
例：2つの文を区別する際に使います。    
文1: これは文だ。[SEP]    
文2: こちらも別の文だ。    

3. [PAD] (Padding Token)
入力を一定の長さに揃えるためのトークンだ。短い文を埋める際に使用され、モデルには無視される。

4. [CLS] (Classification Token)
文全体の情報を要約するためのトークンだ。     
例：文分類タスクでは、このトークンの出力を使ってクラスを予測する。   

5. [MASK] (Mask Token)
マスクされたトークンを表す。     
主にBERTの事前学習（マスク付き言語モデル）で使用され、文中の一部を隠して予測するタスクで用いられる。

# Data Preprocess

In [10]:
import re
import numpy as np

text = (
    'Hello, how are you? I am Romeo.\n' 
    'Hello, Romeo My name is Juliet. Nice to meet you.\n'
    'Nice to meet you too. How are you today?\n'
    'Great. My baseball team won the competition.\n'
    'Oh Congratulations, Juliet\n'
    'Thank you Romeo'
)

# 文章を前処理：
# - 句読点などを削除
# - 小文字に変換
# - 改行で分割
sentences = re.sub("[.,!?-]", '', text.lower()).split('\n')  # filter '.', ',', '?', '!'

word_list = list(set(" ".join(sentences).split()))
# 特殊トークンを含む単語辞書の初期化
word_dict = {'[PAD]': 0, '[CLS]': 1, '[SEP]': 2, '[MASK]': 3}
for i, w in enumerate(word_list):
   word_dict[w] = i + 4
   number_dict = {i: w for i, w in enumerate(word_dict)}
vocab_size = len(word_dict)
number_dict, vocab_size

({0: '[PAD]',
  1: '[CLS]',
  2: '[SEP]',
  3: '[MASK]',
  4: 'competition',
  5: 'nice',
  6: 'oh',
  7: 'to',
  8: 'you',
  9: 'how',
  10: 'today',
  11: 'thank',
  12: 'am',
  13: 'baseball',
  14: 'name',
  15: 'i',
  16: 'won',
  17: 'congratulations',
  18: 'meet',
  19: 'great',
  20: 'juliet',
  21: 'the',
  22: 'hello',
  23: 'are',
  24: 'romeo',
  25: 'my',
  26: 'team',
  27: 'too',
  28: 'is'},
 29)

In [11]:
token_list = []
for sentence in sentences:
   arr = [word_dict[s] for s in sentence.split()]
   token_list.append(arr)
   print(sentence)
   # print(arr)

token_list

hello how are you i am romeo
hello romeo my name is juliet nice to meet you
nice to meet you too how are you today
great my baseball team won the competition
oh congratulations juliet
thank you romeo


[[22, 9, 23, 8, 15, 12, 24],
 [22, 24, 25, 14, 28, 20, 5, 7, 18, 8],
 [5, 7, 18, 8, 27, 9, 23, 8, 10],
 [19, 25, 13, 26, 16, 21, 4],
 [6, 17, 20],
 [11, 8, 24]]

In [None]:
from random import randrange, shuffle, randint, random


def make_batch():
   batch = []
   positive = negative = 0 # 連続する文の場合は正例、連続しない文の場合は負例、数をbatch_sizeの半分ずつ生成

   # バッチサイズの半分ずつになるまでループ
   while positive != batch_size/2 or negative != batch_size/2:
       # ランダムに2つの文を選択
       tokens_a_index, tokens_b_index= randrange(len(sentences)), randrange(len(sentences))

       # 選択された文のトークン列を取得
       tokens_a, tokens_b= token_list[tokens_a_index], token_list[tokens_b_index]

       # 入力形式の作成：[CLS] + 1番目の文 + [SEP] + 2番目の文 + [SEP]
       input_ids = [word_dict['[CLS]']] + tokens_a + [word_dict['[SEP]']] + tokens_b + [word_dict['[SEP]']]
       # セグメントID: 1番目の文には0、2番目の文には1を割り当て
       segment_ids = [0] * (1 + len(tokens_a) + 1) + [1] * (len(tokens_b) + 1)

        # === マスク言語モデル（MLM）の処理 ===
        # マスクする単語数を決定（文長の15%、ただし最大値と最小値で制限）
       n_pred =  min(max_pred, max(1, int(round(len(input_ids) * 0.15)))) 
       
        # マスク候補位置の選定（[CLS]と[SEP]以外の全ての位置）
       cand_maked_pos = [i for i, token in enumerate(input_ids)
                         if token != word_dict['[CLS]'] and token != word_dict['[SEP]']]
       
       shuffle(cand_maked_pos)
       masked_tokens, masked_pos = [], []

       # 選択された位置に対してマスク処理を実行
       for pos in cand_maked_pos[:n_pred]:
           masked_pos.append(pos)
           masked_tokens.append(input_ids[pos])
           if random() < 0.8:  # 80%の確率で[MASK]トークンに置き換え
               input_ids[pos] = word_dict['[MASK]'] # make mask
           elif random() < 0.5:  # 10%の確率でランダムな単語に置き換え
               index = randint(0, vocab_size - 1) # random index in vocabulary
               input_ids[pos] = word_dict[number_dict[index]] # replace


       # === パディング処理 ===
       # 入力系列を最大長(maxlen)まで0でパディング
       n_pad = maxlen - len(input_ids)
       input_ids.extend([0] * n_pad)
       segment_ids.extend([0] * n_pad)

       # マスクされたトークンも最大予測数(max_pred)まで0でパディング
       if max_pred > n_pred:
           n_pad = max_pred - n_pred
           masked_tokens.extend([0] * n_pad)
           masked_pos.extend([0] * n_pad)


        # === Next Sentence Prediction (NSP) のラベル付け ===
        # 連続する文の場合は正例として追加
       if tokens_a_index + 1 == tokens_b_index and positive < batch_size/2:
           batch.append([input_ids, segment_ids, masked_tokens, masked_pos, True]) # IsNext
           positive += 1

        # 連続しない文の場合は負例として追加
       elif tokens_a_index + 1 != tokens_b_index and negative < batch_size/2:
           batch.append([input_ids, segment_ids, masked_tokens, masked_pos, False]) # NotNext
           negative += 1
   return batch

In [32]:
# create a batch
batch = make_batch()

# バッチの内容を確認
for b in batch[:3]:
    input_ids, segment_ids, masked_tokens, masked_pos, isNext = b
    index = batch.index(b)
    print(f"Input IDs, batch[{index}][0]: {input_ids}")
    print(f"Segment IDs, batch[{index}][1]: {segment_ids}")
    print(f"Masked Tokens, batch[{index}][2]: {masked_tokens}")
    print(f"Masked Positions, batch[{index}][3]: {masked_pos}")
    print(f"Is Next, batch[{index}][4]: {isNext}")
    print()

Input IDs, batch[0][0]: [1, 5, 7, 18, 8, 27, 9, 23, 8, 10, 2, 6, 17, 3, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Segment IDs, batch[0][1]: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Masked Tokens, batch[0][2]: [20, 10, 0, 0, 0]
Masked Positions, batch[0][3]: [13, 9, 0, 0, 0]
Is Next, batch[0][4]: False

Input IDs, batch[1][0]: [1, 19, 25, 13, 26, 16, 21, 4, 2, 3, 17, 20, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Segment IDs, batch[1][1]: [0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Masked Tokens, batch[1][2]: [19, 6, 0, 0, 0]
Masked Positions, batch[1][3]: [1, 9, 0, 0, 0]
Is Next, batch[1][4]: True

Input IDs, batch[2][0]: [1, 6, 17, 20, 2, 11, 8, 24, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Segment IDs, batch[2][1]: [0, 0, 0, 0, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Masked Tokens, batch[2][2]: [8, 0, 0, 0, 0]


In [35]:
# マスクされたトークンのattention maskを作成
def get_attn_pad_mask(seq_q, seq_k):
    batch_size, len_q = seq_q.size()
    batch_size, len_k = seq_k.size()
    pad_attn_mask = seq_k.data.eq(0).unsqueeze(1)   # batch_size x 1 x len_k(=len_q), one is masking
    return pad_attn_mask.expand(batch_size, len_q, len_k)  # batch_size x len_q x len_k

In [36]:
# get_attn_pad_mask関数の動作確認
# get_attn_pad_mask(torch.tensor([batch[0][0]]), torch.tensor([batch[0][0]]))

seq_q = torch.tensor([[1, 2, 3, 0], [4, 5, 0, 0]])  # batch_size x len_q
seq_k = torch.tensor([[1, 2, 3, 0], [4, 5, 0, 0]])  # batch_size x len_k
pad_mask = get_attn_pad_mask(seq_q, seq_k)
print("queryの (seq_q):")
print(seq_q)
print("\nkeyの (seq_k):")
print(seq_k)
print("\n結果 (pad_mask):")
print(pad_mask)
# 結果のTrueのところは、パディングされた位置を示す

queryの (seq_q):
tensor([[1, 2, 3, 0],
        [4, 5, 0, 0]])

keyの (seq_k):
tensor([[1, 2, 3, 0],
        [4, 5, 0, 0]])

結果 (pad_mask):
tensor([[[False, False, False,  True],
         [False, False, False,  True],
         [False, False, False,  True],
         [False, False, False,  True]],

        [[False, False,  True,  True],
         [False, False,  True,  True],
         [False, False,  True,  True],
         [False, False,  True,  True]]])


In [37]:
import torch
import torch.utils.data as Data

# リスト　-> テンソル
batch = make_batch()
input_ids, segment_ids, masked_tokens, masked_pos, isNext = zip(*batch)
input_ids, segment_ids, masked_tokens, masked_pos, isNext = (
    torch.LongTensor(input_ids),
    torch.LongTensor(segment_ids),
    torch.LongTensor(masked_tokens),
    torch.LongTensor(masked_pos),
    torch.LongTensor(isNext)
)

# pytorchを使って、batch処理のため、データセットを作成
class MyDataSet(Data.Dataset):
    def __init__(self, input_ids, segment_ids, masked_tokens, masked_pos, isNext):
        self.input_ids = input_ids
        self.segment_ids = segment_ids
        self.masked_tokens = masked_tokens
        self.masked_pos = masked_pos
        self.isNext = isNext

    def __len__(self):
        return len(self.input_ids)

    def __getitem__(self, idx):
        return self.input_ids[idx], self.segment_ids[idx], self.masked_tokens[idx], self.masked_pos[idx], self.isNext[idx]
    
loader = Data.DataLoader(MyDataSet(input_ids, segment_ids, masked_tokens, masked_pos, isNext), batch_size, True)

# Model Architecture
![](https://humboldt-wi.github.io/blog/img/seminar/bert/bert_architecture.png)
もっと詳しいコードは[こちら](https://github.com/huggingface/transformers/blob/31d452c68b34c2567b62924ee0df40a83cbc52d5/src/transformers/models/bert/modeling_bert.py#L187)に参照してください。
BERTの全部のclassはこのサイトにあると考えられる: 
- `BertEmbeddings`: 単語、位置、およびトークンタイプの埋め込みを構築します。
- `BertSelfAttention`: 自己注意メカニズムを実装します。
- `BertSelfOutput`: デンスレイヤーを適用し、入力テンソルを加算します。
- `BertAttention`: 自己注意と出力レイヤーを組み合わせます。
- `BertIntermediate`: 活性化関数を持つデンスレイヤーを適用します。
- `BertOutput`: もう一つのデンスレイヤーを適用し、入力テンソルを加算します。
- `BertLayer`: 注意層と中間出力層を組み合わせます。
- `BertEncoder`: 複数のBertLayerインスタンスをスタックします。
- `BertPooler`: デンスレイヤーを適用し、Tanh活性化を行います。
- `BertPredictionHeadTransform`: デンスレイヤーと活性化関数を適用します。
- `BertLMPredictionHead`: マスクされた言語モデリングの予測を生成します。
- `BertOnlyMLMHead`: マスクされた言語モデリングヘッドのみを含みます。
- `BertOnlyNSPHead`: 次の文予測ヘッドのみを含みます。
- `BertPreTrainingHeads`: MLMとNSPヘッドを組み合わせます。
- `BertPreTrainedModel`: すべてのBERTモデルの基本クラスです。
- `BertForPreTrainingOutput`: 事前トレーニングの出力タイプです。
- `BertModel`: 特定のヘッドなしのメインBERTモデルです。
- `BertForPreTraining`: MLMとNSPヘッドを持つBERTモデルです。
- `BertLMHeadModel`: 言語モデリングヘッドを持つBERTモデルです。
- `BertForMaskedLM`: マスクされた言語モデリングヘッドを持つBERTモデルです。
- `BertForNextSentencePrediction`: 次の文予測のためのBERTモデルです。
- `BertForSequenceClassification`: シーケンス分類のためのBERTモデルです。
- `BertForMultipleChoice`: 複数選択タスクのためのBERTモデルです。
- `BertForTokenClassification`: トークン分類タスクのためのBERTモデルです。
- `BertForQuestionAnswering`: 質問応答タスクのためのBERTモデルです。

$$ BERT = Embedding + N \times EncoderLayer $$  

(Embedding -> (MultiHeadAttention + PoswiseFeedForwardNet) == EncoderLayer $\times$ N) == BERT

In [None]:
import torch.nn as nn


class Embedding(nn.Module):
    def __init__(self):
        super(Embedding, self).__init__()
        self.tok_embed = nn.Embedding(vocab_size, d_model)
        # transformers libraryのBERTはcos位置三角関数を使ってない
        # https://github.com/huggingface/transformers/blob/31d452c68b34c2567b62924ee0df40a83cbc52d5/src/transformers/models/bert/modeling_bert.py
        # hugging face コミュニティの説明
        # https://discuss.huggingface.co/t/why-positional-embeddings-are-implemented-as-just-simple-embeddings/585/5
        self.pos_embed = nn.Embedding(maxlen, d_model)
        self.seg_embed = nn.Embedding(n_segments, d_model) # トークンのセグメント情報をエンコード(文0 or 文1)
        self.norm = nn.LayerNorm(d_model)

    def forward(self, input_ids, segment_ids):
        seq_len = input_ids.size(1)
        # posは0から始まる, 0, 1, 2, 3, ..., seq_len-1
        pos = torch.arange(seq_len, dtype=torch.long)
        pos = pos.unsqueeze(0).expand_as(input_ids)
        embedding = self.tok_embed(input_ids) + self.pos_embed(pos) + self.seg_embed(segment_ids)
        
        return self.norm(embedding)

In [None]:
class MultiHeadAttention(nn.Module):
    def __init__(self):
        super(MultiHeadAttention, self).__init__()
        self.W_Q = nn.Linear(d_model, d_k * n_heads)
        self.W_K = nn.Linear(d_model, d_k * n_heads)
        self.W_V = nn.Linear(d_model, d_v * n_heads)

        
    def forward(self, Q, K, V, attn_mask):
        # q: [batch_size x len_q x d_model], k: [batch_size x len_k x d_model], v: [batch_size x len_k x d_model]
        residual, batch_size = Q, Q.size(0)
        # (B, S, D) -proj-> (B, S, D) -split-> (B, S, H, W) -trans-> (B, H, S, W)
        q_s = self.W_Q(Q).view(batch_size, -1, n_heads, d_k).transpose(1,2)
        k_s = self.W_K(K).view(batch_size, -1, n_heads, d_k).transpose(1,2)
        v_s = self.W_V(V).view(batch_size, -1, n_heads, d_v).transpose(1,2)
        attn_mask = attn_mask.unsqueeze(1).repeat(1, n_heads, 1, 1)

        # context: [batch_size x n_heads x len_q x d_v], attn: [batch_size x n_heads x len_q(=len_k) x len_k(=len_q)]
        context, attn = nn.functional.scaled_dot_product_attention(q_s, k_s, v_s, attn_mask)
        context = context.transpose(1, 2).contiguous().view(batch_size, -1, n_heads * d_v)
        output = nn.Linear(n_heads * d_v, d_model)(context)
        
        return nn.LayerNorm(d_model)(output + residual), attn

In [None]:
class PoswiseFeedForwardNet(nn.Module):
    def __init__(self):
        super(PoswiseFeedForwardNet, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)

    def forward(self, inputs):
        # geluは活性化関数、Reluの改良版、
        return self.fc2(nn.GELU()(self.fc1(inputs)))

In [None]:
class EncoderLayer(nn.Module):
    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.enc_self_attn = MultiHeadAttention()
        self.pos_ffn = PoswiseFeedForwardNet()
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)

    def forward(self, enc_inputs, enc_self_attn_mask):
        enc_outputs, attn = self.enc_self_attn(enc_inputs, enc_inputs, enc_inputs, enc_self_attn_mask) # enc_inputs to same Q,K,V
        enc_outputs = self.layer_norm1(enc_inputs + enc_outputs) # Add & Norm
        enc_outputs = self.pos_ffn(enc_outputs) # enc_outputs: [batch_size x len_q x d_model]
        enc_outputs = self.layer_norm2(enc_outputs + ffn_outputs) # Add & Norm
        
        return enc_outputs, attn

In [None]:
class BERT(nn.Module):
    def __init__(self):
        super(BERT, self).__init__()
        self.embedding = Embedding()
        self.layers = nn.ModuleList([EncoderLayer() for _ in range(n_layers)])
        self.fc = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.Dropout(0.5),
            nn.Tanh(), # BERTのpooling層はTanh
        )
        self.classifier = nn.Linear(d_model, 2)
        self.linear = nn.Linear(d_model, d_model)
        self.activ2 = nn.GELU()
        embed_weight = self.embedding.tok_embed.weight
        self.fc2 = nn.Linear(d_model, vocab_size, bias=False)
        self.fc2.weight = embed_weight

    def forward(self, input_ids, segment_ids, masked_pos):
        output = self.embedding(input_ids, segment_ids)
        enc_self_attn_mask = get_attn_pad_mask(input_ids, input_ids)
        for layer in self.layers:
            output, enc_self_attn = layer(output, enc_self_attn_mask)
        h_pooled = self.fc(output[:, 0])
        logits_clsf = self.classifier(h_pooled)

        masked_pos = masked_pos[:, :, None].expand(-1, -1, d_model)  # [batch_size, max_pred, d_model]
        h_masked = torch.gather(output, 1, masked_pos)
        h_masked = self.activ2(self.linear(h_masked))
        logits_lm = self.fc2(h_masked)
        
        return logits_lm, logits_clsf

![](../../image/BERT_arch.png)

In [None]:
model = BERT()
criteria = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

In [None]:
for epoch in range(num_epochs):
    for input_ids, segment_ids, masked_tokens, masked_pos, isNext in loader:
        logits_lm, logits_clsf = model(input_ids, segment_ids, masked_pos)
        loss_lm = criteria(logits_lm.transpose(1, 2), masked_tokens) # for masked LM
        loss_lm = (loss_lm.float()).mean()
        loss_clsf = criteria(logits_clsf, isNext) # for sentence classification
        loss = loss_lm + loss_clsf
        if (epoch + 1) % 10 == 0:
            print('Epoch:', '%04d' % (epoch + 1), 'cost =', '{:.6f}'.format(loss))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

# save model
# torch.save(model.state_dict(), 'bert.model')

In [None]:
input_ids, segment_ids, masked_tokens, masked_pos, isNext = batch[1]
print(text)
print([number_dict[w] for w in input_ids if number_dict[w] != '[PAD]'])

logits_lm, logits_clsf = model(torch.LongTensor([input_ids]), torch.LongTensor([segment_ids]), torch.LongTensor([masked_pos]))
logits_lm = logits_lm.data.max(2)[1][0].data.numpy()
print('masked tokens list : ', [pos for pos in masked_tokens if pos != 0])
print('predict masked tokens list : ', [pos for pos in logits_lm if pos != 0])

logits_clsf = logits_clsf.data.max(1)[1].data.numpy()[0]
print('isNext : ', True if isNext else False)
print('predict isNext : ', True if logits_clsf else False)