# Beam Search 原理及实现

## 原理
Beam search 是 seq2seq 模型在 decoder 过程中寻找次优解的一个方法。

Beam search 的原理比较简单，就是维护一个 beam size 大小的当前最优（即概率最大） sequence。然后每次 decode 下一个 token 的时候，就有 beam_size * vocab_size 种组合情况，继续从这些组合种找出 beam_size 个概率最大的序列，依次类推。

由于每次加一个 token，序列概率都倾向于变小（序列概率是连乘的），因此可以加一个 [length normalization](https://arxiv.org/pdf/1609.08144.pdf) 来消减这种倾向。

## 实现
在实现的时候，要用`tf.while_loop`来推进 decode 的进行。在 decode 过程中会维护两个 beam_size 大小的学列，一个用于存放已经到达 eos token 的序列，即已经完成的序列，一个用于存放还没有完成的序列。

主要需要考虑以下几点：
1. Decode 停止条件：
    * 达到最大 decode 长度，最大 decode 长度是认为设定的，可以根据不同的 task 决定。
    * 已经生成的序列的最低分比当前序列的最高分还高，即找不到更好预测序列了。
2. Decode 的一个 step:
    * 当前预测序列作为输入，得到下一个预测 token 的概率分布。
    * 找到 beam size 个序列概率最大的序列。
   
下面的代码是 official 实现的 transformer 中，beam search 的实现。代码基本都是抄下来的，不过抄的过程仍然受益匪浅: )

In [1]:
import tensorflow as tf
import numpy as np
print(tf.__version__)

2.0.0-beta1


In [54]:
INF = 1. * 1e7

class _StateKeys(object):
    """State 中的 key 的定义
    """
    #TODO: 每个加一个注释.
    CUR_INDEX = "CUR_INDEX"
    ALIVE_SEQ = "ALIVE_SEQ"
    ALIVE_LOG_PROBS = "ALIVE_LOG_PROBS"
    ALIVE_CACHE = "ALIVE_CACHE"
    FINISHED_SEQ = "FINISHED_SEQ"
    FINISHED_SCORES = "FINISHED_SCORES"
    FINISHED_FLAGS = "FINISHED_FLAGS"


class SequenceBeamSearch:
    def __init__(self, symbols_to_logits_fn, vocab_size, batch_size, beam_size, alpha, max_decode_length, eos_id):
        """
        """
        self.symbols_to_logits_fn = symbols_to_logits_fn
        self.vocab_size = vocab_size
        self.batch_size = batch_size
        self.beam_size = beam_size
        self.alpha = alpha
        self.max_decode_length = max_decode_length
        self.eos_id = eos_id
        
    def search(self, initial_ids, initial_cache):
        state, state_shapes = self._create_initial_state(initial_ids, initial_cache)
        
        finished_state = tf.while_loop(
            self._continue_search, self._search_step, loop_vars=[state],
            shape_invariants=[state_shapes], parallel_iterations=1, back_prop=False)
        finished_state = finished_state[0]

        alive_seq = finished_state[_StateKeys.ALIVE_SEQ]
        alive_log_probs = finished_state[_StateKeys.ALIVE_LOG_PROBS]
        finished_seq = finished_state[_StateKeys.FINISHED_SEQ]
        finished_scores = finished_state[_StateKeys.FINISHED_SCORES]
        finished_flags = finished_state[_StateKeys.FINISHED_FLAGS]
        
        # 由于有可能 finished_seq 里一个序列都没有，即没有任何一个序列走到了 eos token，这时候需要把
        # alive_seq 作为 backup.
        finished_cond = tf.reduce_any(finished_flags, 1, name="finished_cond")
        seq_cond = _expand_to_same_rank(finished_cond, finished_seq)
        score_cond = _expand_to_same_rank(finished_cond, finished_scores)
        finished_seq = tf.where(seq_cond, finished_seq, alive_seq)
        finished_scores = tf.where(score_cond, finished_scores, alive_log_probs)
        return finished_seq, finished_scores
        
    def _create_initial_state(self, initial_ids, initial_cache):
        """inital_ids: 预测时的初始 id (一般设为0)，维度为 (batch_size, )
        如果 batch_size 为 3， 则 initial_ids 为 [0, 0, 0]
        """
        # 当前 decode 到哪个位置，初始为 0
        cur_index = tf.constant(0)
        
        # 还没有 decode 完成的 sequence， 即没有decode 到 eos token.
        alive_seq = _expand_to_beam_size(initial_ids, self.beam_size)
        # (batch_size, beam_size, 1)
        alive_seq = tf.expand_dims(alive_seq, axis=2)
        
        # alive_log_probs 保存每个 batch 每个 beam 下的 sequence 的 log probability。
        # 初始化 sequence 的概率为1，即 log prob 为 0.
        # 维度为 (batch_size, beam_size)
        # 例如，当 batch size 为3， beam size 为4时，alive_log_probs 初始化为:
        # [[  0. -inf -inf -inf]
        #  [  0. -inf -inf -inf]
        #  [  0. -inf -inf -inf]]
        initial_log_probs = tf.constant([[0.] + [-float("inf")] * (self.beam_size - 1)])
        alive_log_probs = tf.tile(initial_log_probs, [self.batch_size, 1])
        
        # 将 cache 中保存的每一个变量都加一维 beam_size 维，使得不同 beam 下 cache 的变量不一样。
        alive_cache = tf.nest.map_structure(lambda t: _expand_to_beam_size(t, self.beam_size), initial_cache)
        
        # 初始化用户保存已经预测完成的 sequence 的变量。
        finished_seq = tf.zeros(tf.shape(alive_seq), tf.int32)
        # 初始化用户保存已经预测完成的 sequence 的 log probability.
        finished_scores = tf.ones([self.batch_size, self.beam_size]) * -INF
        # 初始化用户保存 sequence 是否已经预测完成的变量。
        finished_flags = tf.zeros([self.batch_size, self.beam_size], tf.bool)
        
        # 初始化 state，这个 state 命名是根据 tf.while_loop 来的（类比 rnn 中的初始化 state）。
        state = {
            _StateKeys.CUR_INDEX: cur_index,
            _StateKeys.ALIVE_SEQ: alive_seq,
            _StateKeys.ALIVE_LOG_PROBS: alive_log_probs,
            _StateKeys.ALIVE_CACHE: alive_cache,
            _StateKeys.FINISHED_SEQ: finished_seq,
            _StateKeys.FINISHED_SCORES: finished_scores,
            _StateKeys.FINISHED_FLAGS: finished_flags
        }
        
        # 在 tf.while_loop 为了保证正确性，每个 loop 都会检查 state 中变量的 shape 是不是和 shape_invariants 设置的 shape 保持一致。
        # 如果不一致，就会报错。因此，如果 state 中的变量在 loop 的时候 shape 会变，则需要把它设置的 general 一点，比如 None。
        # 另外，如果 dimension 的值会根据 state 的输入不同而不同，不能提前确定，也要设置成 None，比如 batch size.
        state_shape_invariants = {
            _StateKeys.CUR_INDEX: tf.TensorShape([]),
            _StateKeys.ALIVE_SEQ: tf.TensorShape([None, self.beam_size, None]),
            _StateKeys.ALIVE_LOG_PROBS: tf.TensorShape([None, self.beam_size]),
            _StateKeys.ALIVE_CACHE: tf.nest.map_structure(
                _get_shape_keep_last_dim, alive_cache),
            _StateKeys.FINISHED_SEQ: tf.TensorShape([None, self.beam_size, None]),
            _StateKeys.FINISHED_SCORES: tf.TensorShape([None, self.beam_size]),
            _StateKeys.FINISHED_FLAGS: tf.TensorShape([None, self.beam_size])
        }
        
        return state, state_shape_invariants
    
    def _continue_search(self, state):
        """判断 decode 是否应该停止，decode 停止条件有两个：
            1. 达到最大 decode 长度。
            2. 已经生成的序列的最低分比当前序列的最高分还高，即找不到更好预测序列了。
        """
        i = state[_StateKeys.CUR_INDEX]
        alive_log_probs = state[_StateKeys.ALIVE_LOG_PROBS]
        finished_scores = state[_StateKeys.FINISHED_SCORES]
        finished_flags = state[_StateKeys.FINISHED_FLAGS]
    
        not_at_max_decode_length = tf.less(i, self.max_decode_length)
        max_length_norm = _length_normalization(self.alpha, self.max_decode_length)
        
        # 为什么是取第0个beam，因为存的时候就是排序好的。
        best_alive_scores = alive_log_probs[:, 0] / max_length_norm
        
        finished_scores *= tf.cast(finished_flags, tf.float32)
        # 当前预测完成的序列的最低分, 维度 (batch_size, )
        lowest_finished_scores = tf.reduce_min(finished_scores, axis=1)
        
        # 如果某个batch一个已完成的序列都没有，则把分数设为一个最小值。
        finished_batches = tf.reduce_any(finished_flags, 1)
        lowest_finished_scores += (1.0 - tf.cast(finished_batches, tf.float32)) * -INF
        
        worst_finished_score_better_than_best_alive_score = tf.reduce_all(
            tf.greater(lowest_finished_scores, best_alive_scores)
        )

        return tf.logical_and(
            not_at_max_decode_length,
            tf.logical_not(worst_finished_score_better_than_best_alive_score)
        )
        
    def _search_step(self, state):
        # Step 1. 对于每一个 batch 的每一个 beam，都去 decode 下一个 token。并保留 beam_size * 2个概率最高的序列。
        # 保留 beam_size * 2 的目的是保证至少有 beam_size 个序列是还没 decode 完成的。例如假如每个 beam 都是 eos token 概率
        # 最高，多取一个可以保证能取到非 eos 的 token。
        new_seq, new_log_probs, new_cache = self._grow_alive_seq(state)

        # Step 2. 从 beam_size * 2 个概率最高的序列中，拿出 beam_size 个概率最高的，且还没有 decode 完成的序列。
        alive_state = self._get_new_alive_state(new_seq, new_log_probs, new_cache)
        
        # Step 3. 把新得到的已完成的序列与原来得到的已完成的序列拼在一起，得到新的 beam_size 个 log prob 最高的「已完成」序列。
        finished_state = self._get_new_finished_state(state, new_seq, new_log_probs)
        
        # Step 4. 更新 state.
        new_state = {_StateKeys.CUR_INDEX: state[_StateKeys.CUR_INDEX] + 1}
        new_state.update(alive_state)
        new_state.update(finished_state)
        return [new_state]
        
    def _get_new_finished_state(self, state, new_seq, new_log_probs):
        i = state[_StateKeys.CUR_INDEX]
        finished_seq = state[_StateKeys.FINISHED_SEQ]
        finished_scores = state[_StateKeys.FINISHED_SCORES]
        finished_flags = state[_StateKeys.FINISHED_FLAGS]
        
        length_norm = _length_normalization(self.alpha, i + 1)
        new_scores = new_log_probs / length_norm
        
        new_finished_flags = tf.equal(new_seq[:, :, -1], self.eos_id)
        new_scores += (1 - tf.cast(new_finished_flags, tf.float32)) * -INF
        
        finished_seq = tf.concat([finished_seq, tf.zeros([self.batch_size, self.beam_size, 1], tf.int32)], axis=2)
        
        finished_seq = tf.concat([finished_seq, new_seq], axis=1)
        finished_scores = tf.concat([finished_scores, new_scores], axis=1)
        finished_flags = tf.concat([finished_flags, new_finished_flags], axis=1)
        
        top_finished_seq, top_finished_scores, top_finished_flags = (
            _gather_topk_beams([finished_seq, finished_scores, finished_flags],
                               finished_scores, self.batch_size, self.beam_size))
        
        return {
            _StateKeys.FINISHED_SEQ: top_finished_seq,
            _StateKeys.FINISHED_SCORES: top_finished_scores,
            _StateKeys.FINISHED_FLAGS: top_finished_flags
        }
        
    def _grow_alive_seq(self, state):
        """ 对于还没有decode完成的每一个 sequence，继续decode下一个词，并保留 beam_size * 2 个序列概率最大的序列。
        Returns:
          topk_seq: 概率最大的topk个序列，shape: (batch_size, beam_size*2, i+2)
          topk_log_probs: topk个序列对应的log prob，shape: (batch_size, beam_size*2)
          new_cache: 序列对应的 attention 中的 k, v 等信息。
        """
        i = state[_StateKeys.CUR_INDEX]
        alive_seq = state[_StateKeys.ALIVE_SEQ]
        alive_log_probs = state[_StateKeys.ALIVE_LOG_PROBS]
        alive_cache = state[_StateKeys.ALIVE_CACHE]
        
        beams_to_keep = 2 * self.beam_size
        
        
        # 把 batch_size 和 beam_size 合并，以便喂到模型中。因为模型并不接受 beam_size 这一维
        flat_ids = _flatten_beam_dim(alive_seq)
        flat_cache = tf.nest.map_structure(_flatten_beam_dim, alive_cache)
        flat_logits, flat_cache = self.symbols_to_logits_fn(flat_ids, i, flat_cache)
        
        # shape: [batch_size, beam_size, vocab_size]
        logits = _unflatten_beam_dim(flat_logits, self.batch_size, self.beam_size)
        new_cache = tf.nest.map_structure(lambda t: _unflatten_beam_dim(t, self.batch_size, self.beam_size), flat_cache)
        
        # shape: [batch_size, beam_size, vocab_size] 即下一个词为词表中每个词的 log prob.
        candidate_log_probs = _log_prob_from_logits(logits)
        log_probs = candidate_log_probs + tf.expand_dims(alive_log_probs, axis=2)
        
        # 对于每个 batch，都有 beam_size * vocab_size 个 candidate 序列，我们需要从这些序列中找出 log prob 最高的 topk 个。
        flat_log_probs = tf.reshape(log_probs, [-1, self.beam_size * self.vocab_size])
        topk_log_probs, topk_indices = tf.nn.top_k(flat_log_probs, k=beams_to_keep)
        
        # shape: (batch_size, beams_to_keep)
        topk_beam_indices = topk_indices // self.vocab_size
        
        topk_seq, new_cache = _gather_beams(
            [alive_seq, new_cache], topk_beam_indices, self.batch_size,
            beams_to_keep)
        
        topk_word_ids = topk_indices % self.vocab_size
        # shape: (batch_size, beams_to_keep, 1)
        topk_word_ids = tf.expand_dims(topk_word_ids, axis=2)
        topk_seq = tf.concat([topk_seq, topk_word_ids], axis=2)
        return topk_seq, topk_log_probs, new_cache
    
    def _get_new_alive_state(self, new_seq, new_log_probs, new_cache):
        new_finished_flags = tf.equal(new_seq[:, :, -1], self.eos_id)
        new_log_probs += tf.cast(new_finished_flags, tf.float32) * -INF
        
        top_alive_seq, top_alive_log_probs, top_alive_cache = _gather_topk_beams(
            [new_seq, new_log_probs, new_cache], new_log_probs, self.batch_size, self.beam_size)
        
        return {
            _StateKeys.ALIVE_SEQ: top_alive_seq,
            _StateKeys.ALIVE_LOG_PROBS: top_alive_log_probs,
            _StateKeys.ALIVE_CACHE: top_alive_cache
        }
        
def _gather_topk_beams(nested, score_or_log_prob, batch_size, beam_size):
    _, topk_indexes = tf.nn.top_k(score_or_log_prob, k=beam_size)
    return _gather_beams(nested, topk_indexes, batch_size, beam_size)

def _expand_to_beam_size(tensor, beam_size):
    """给 tensor 添加一维 beam_size 的维度，添加到第一维。比如 tensor 是 (batch_size, ) 则输出是 (batch_size, beam_size) 
    例如: tensor = [1, 2, 3, 4], beam_size 是3，则结果为: 
    [
     [1, 1, 1]
     [2, 2, 2]
     [3, 3, 3]
     [4, 4, 4]
    ]
    """
    tensor = tf.expand_dims(tensor, axis=1)
    tile_dims = [1] * tensor.shape.ndims
    tile_dims[1] = beam_size
    
    return tf.tile(tensor, tile_dims)

def _get_shape_keep_last_dim(tensor):
    """只保留 shape 的最后一维，其它都设为 None。
    """
    shape_list = _shape_list(tensor)
    
    for i in range(len(shape_list) - 1):
        shape_list[i] = None
    
    # 这句话用在什么情况？
    if isinstance(shape_list[-1], tf.Tensor):
        shape_list[-1] = None
    
    return tf.TensorShape(shape_list)
    
def _shape_list(tensor):
    shape = tensor.get_shape().as_list()
    dynamic_shape = tf.shape(tensor)
    for i in range(len(shape)):
        if shape[i] is None:
            shape[i] = dynamic_shape[i]
    return shape
    
def _length_normalization(alpha, length):
    """长度归一化，使得 beam search 给短的 sequence 一些惩罚。
    """
    return tf.pow(((5. + tf.cast(length, tf.float32)) / 6.), alpha)

def _flatten_beam_dim(tensor):
    """ 合并 batch_size 和 beam_size 这俩维到 batch_size * beam_size 一维。
    即 (batch_size, beam_size, ...) -> (batch_size * beam_size, ...)
    """
    shape = _shape_list(tensor)
    shape[0] *= shape[1]
    shape.pop(1)
    return tf.reshape(tensor, shape)

def _unflatten_beam_dim(tensor, batch_size, beam_size):
    """ 与 flatten_beam_dim 效果相反。
    即：(batch_size * beam_size, ...) -> (batch_size, beam_size, ...)
    """
    shape = _shape_list(tensor)
    new_shape = [batch_size, beam_size] + shape[1:]
    return tf.reshape(tensor, new_shape)

def _log_prob_from_logits(logits):
    """ 计算log概率： log(exp(xi) / sigma(exp(xj)))
    """
    return logits - tf.reduce_logsumexp(logits, axis=2, keepdims=True)

def _gather_beams(nested, beam_indices, batch_size, new_beam_size):
    # 生成一个 batch_size * new_beam_size 的 tensor，每个 batch 下面都是对应的 batch 下标。
    # 例如 batch_size = 2, new_beam_size = 3, 则 batch_pos 为:
    # [[0, 0, 0],
    #  [1, ,1 ,1]]
    batch_pos = tf.range(batch_size * new_beam_size) // new_beam_size
    batch_pos = tf.reshape(batch_pos, [batch_size, new_beam_size])
    
    # 把 batch_pos 和 beam_indices 拼在一起，得到一个 (batch_size, new_beam_size, 2) 的指示下标。
    # 最后一维的每个元素都是一个 (batch下标, beam下标).
    # 这个是用于传给 tf.gather_nd 来获取对应下标的元素的。
    indices = tf.stack([batch_pos, beam_indices], axis=2)
    
    return tf.nest.map_structure(lambda state: tf.gather_nd(state, indices), nested)

def _expand_to_same_rank(tensor, target):
  if tensor.shape.rank is None:
    raise ValueError("Expect rank for tensor shape, but got None.")
  if target.shape.rank is None:
    raise ValueError("Expect rank for target shape, but got None.")

  with tf.name_scope("expand_rank"):
    diff_rank = target.shape.rank - tensor.shape.rank
    for _ in range(diff_rank):
      tensor = tf.expand_dims(tensor, -1)
    return tensor

In [55]:
batch_size = 2
beam_size = 3
vocab_size = 4
max_decode_length = 10

def symbols_to_logits_fn(ids, i, cache):
    logits = tf.ones([batch_size * beam_size, vocab_size])
    return logits, cache
    
def test_beam_search():
    initial_ids = tf.zeros([batch_size], dtype=tf.int32)
    initial_cache = {}
    searcher = SequenceBeamSearch(symbols_to_logits_fn, vocab_size, batch_size, beam_size, 0.6, max_decode_length, 99)
    seq, seq_score = searcher.search(initial_ids, initial_cache)
    print(f'seq: {seq.shape}')
    print(f'seq score: {seq_score.shape}')
    
test_beam_search()

seq: (2, 3, 11)
seq score: (2, 3)


可以看到，beam search 最终的输出结果有两个，分别是 batch size * beam size 个预测序列以及其对应的序列概率（其实是 log probability）。

序列长度可能不一样，因此0表示 padding。