# Beam Search

In [6]:
# Beam Search 디코딩 구현
import torch 
import torch.nn.functional as F 


# 디코더를 이용해 beam search로 토큰 시퀀스를 생성하는 함수
def beam_search(decoder, hidden, context, beam_width=3, max_langth=10):
    sequences = [[[], 1.0, hidden]]                                     # [토큰 시퀀스, 누적점수, hidden] 초기 빔 (빈 시퀀스)
    
    for _ in range(max_langth):                                         # 최대 생성 길이 만큼 반복
        all_candidates = []                                             # 이번 스텝에서 확장된 후보들을 모을 리스트
        
        for seq, score, hidden, in sequences:                           # 현재 빔에 있는 시퀀스들을 하나씩 확장
            decoder_input = torch.tensor([seq[-1] if seq else 0])       # 직전 토큰이 있으면 그 토큰, 없으면 시작토큰(0) 사용
            output, hidden = decoder(decoder_input, hidden, context)    # 1-step 디코딩 -> output(logits), hidden(다음 hidden)
            top_probs , top_indices = torch.topk(
                F.softmax(output, dim=1), beam_width    
            )   # 확률 기준 상위 beam_width개 토큰 선택
            
            # 선택된 상위 토큰들로 후보 시퀀스 생성
            for i in range(beam_width):
                candidate = (
                    seq + [top_indices[0][1].item()],                   # 토큰 추가(정수 ID)
                    score * top_probs[0][i].item(),                     # 누적 점수 갱신(확률 곱)
                    hidden                                              # 다음 Step에 사용할 hidden(현재 디코딩한 결과)
                )
                all_candidates.append(candidate)                        # 후보 리스트 저장

        # 점수를 기준으로 오름차순 정렬(상위 beam_width만큼)        
        sequences = sorted(all_candidates, key=lambda x: x[1], reverse=True)[:beam_width]    
        
    return sequences[0][0]  # 최종 

In [7]:
# <eos>를 만나면 확장을 멈추고, 로그확률 합(score) 기준으로 상위 beam만 유지하는 beam search 함수
def beam_search_eos(decoder, hidden, context, beam_width = 3, max_length = 10, sos_id = 10, eos_id =2):
    sequences = [([sos_id], 0.0, hidden)]   # (시퀀스, 누적log점수, hidden) 초기 빔(<sos>부터 시작)

    for _ in range(max_length):     # 최대 길이 만큼 디코딩 반복
        all_candidates = []         # 이번 스텝에서 확장된 후보들을 모을 리스트

        for seq, score, h in sequences:     # 현재 유지중인 beam 들을 하나씩 화장
            if seq[-1] == eos_id :          # 이미 <eos>로 끝난 시퀀스면
                all_candidates.append((seq, score, h))  # 그대로 후보로 유지 (더 이상 확장 안함)
                continue
        
            decoder_input = torch.tensor([seq[-1]])                 # 디코더 입력은 직전 토큰 (없으면 <sos>토큰 )
            output, h2 = decoder(decoder_input, h, context)         # 1-step 디코딩 수행 -> output:로짓, h2 : 다음 hidden

            log_probs = F.log_softmax(output, dim = 1)              # 로짓 -> log확률로 변환 (언더플로우 방지)
            top_logp, top_idx = torch.topk(log_probs, beam_width)   # log 확률 상위 beam_width 개의 토큰 선택

            # 상위 토큰들로 후보 시퀀스 생성
            for i in range(beam_width):              
                tok = top_idx[0, i].item()                          # 선택된 토큰 ID
                cand = (    
                    seq + [tok],                                     # 토큰을 시퀀스에 추가     
                    score + top_logp[0, i].item(),                   # 누적 점수는 log확률을 "합"으로 누적  
                    h2                                               # 다음 step에 사용할 hidden
                )   # (새 시퀀스, 새 점수, 새 hidden) 구성
                all_candidates.append(cand)                          # 후보 리스트에 저장

        # 점수를 기준으로 내림차순 정렬 (상위 beam_width만큼)
        sequences = sorted(all_candidates, key = lambda x : x[1], reverse = True)[:beam_width] 
        if all(seq[-1] == eos_id for seq, _, _ in sequences):   # 유지 중인 beam이 전부 <eos>로 끝나면
            break                                               # 더 생성할 필요없이 종료

    best_seq = sequences[0][0]  
    return best_seq

In [8]:
# <eos>를 만나면 확장을 멈추고, 매 스텝에서 가장 확률이 높은 토큰 1만 선택하는 greedy decode
def greedy_decode(decoder, hidden, context, max_length=10, sos_id=0, eos_id=2):
    seq = [sos_id]  # 생성 시퀀스는 <sos> 토큰으로 시작
    h = hidden      # LSTM hidden(h, c)
    
    for _ in range(max_length):
        decode_input = torch.tensor([seq[-1]])              # 직전 토큰을 다음 입력으로 사용
        output, h = decoder(decode_input, h, context)       # 1-step 디코딩 -> (B, V), 다음 hidden
        
        log_probs = F.log_softmax(output, dim = 1)              # 로짓 -> log확률로 변환 (언더플로우 방지)
        next_tok = torch.argmax(log_probs, dim=1).item()        # 가장 큰 확률 토큰 선택
        
        seq.append(next_tok)                                     
        if next_tok == eos_id:                                  # <eos> 토큰이면 종료
            break
        
    return seq

In [9]:
id2word = {0:"<sos>", 1:"I", 2:"<eos>", 3:"like", 4:"bus", 5:'.'}

# 토큰 ID 리스트를 사람이 읽을 수 없는 단어 문자열로 변환하는 함수
def decode_idx(idx, id2word):
    return " ".join(id2word.get(i, "<unk>") for i in idx)   # 매핑 없으면 <unk>로 대체 후 공백으로 연결


In [10]:
decode_idx([0, 1, 3, 4, 5, 2], id2word)

'<sos> I like bus . <eos>'

In [11]:
import torch.nn as nn

# 이전 stop의 context(어텐션 결과)를 다음 step 입력에 붙여 LSTM에 넣는 Input Feeding 디코더
class InputFeedingDecoder(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(InputFeedingDecoder, self).__init__()
        self.hidden_size = hidden_size                              # 디코더 은닉 차원 저장
        self.embedding = nn.Embedding(output_size, input_size)      # 출력 토큰 ID -> 입력 임베딩(input_size)
        self.lstm = nn.LSTM(input_size + hidden_size, hidden_size)  # (임베딩 + context) -> hidden_size LSTM
        self.fc = nn.Linear(hidden_size, output_size)               # hidden -> 어휘 크기(output_size) 로짓
        
    # 입력 토큰/이전 hidden/이전 context를 받아 다음 토큰 로짓과 hidden을 반환
    def forward(self, input, hidden, context):
        embedded = self.embedding(input).unsqueeze(0)   # (B,) -> (1, B, input_size)
        lstm_input = torch.cat((embedded, context.unsqueeze(0)), dim=2) # 임베딩과 context를 feature 차원으로 결합
        output, hidden = self.lstm(lstm_input, hidden)  # LSTM 1-stop 수행 -> output(1,B,H), hidden((1,B,H),(1,B,H))
        output = self.fc(output.squeeze(0))             # (1,B,H) -> (B,H) -> (B,output_size)로 변환
        return output,hidden                            # 로짓과 다음 hidden값 반환

In [34]:
# Input Feeding Decoder 더미 입력으로 1-step 실행 확인
decoder = InputFeedingDecoder(input_size = 10, hidden_size = 20, output_size = 30)  # 디코더 생성(임베딩: 10, hidden: 20, vocab:30)
hidden = (torch.zeros(1, 1, 20), torch.zeros(1, 1, 20))     # 초기 hidden state(h, c) : (num_layers, B, H)
context = torch.zeros(1, 20)                                # 이전 context 벡터 (B, H)

g = greedy_decode(decoder, hidden, context, max_length=10, sos_id=0, eos_id=2)
b = beam_search_eos(decoder, hidden, context, beam_width=3, max_length=10, sos_id=0, eos_id=2)

print('greedy idx :', g)
print('beam idx :', b)

greedy idx : [0, 4, 10, 4, 9, 10, 27, 4, 10, 4, 9]
beam idx : [0, 13, 9, 9, 10, 10, 4, 9, 9, 10, 27]


- Greedy VS beam Search
    - Greedy 
        - 매 스텝마다 확률이 가장 높은것 1개를 고르는 방식
        - 그 순간 1등이 선택된다.
        - 빠르다. (1개 경로만 찾기 때문에)
    - Beam Search
        - 매 스텝마다 상위 후보 k개(Beam Width)를 유지하면서 여러 경로를 동시에 확장
        - 전역적으로 더 점수가 큰 문장을 찾을 가능성이 높다.
        - 상대적으로 느리다.(더 좋은 시퀀스를 찾으려고 하기 때문에)
        - 성능이 상대적으로 좋다.