# Tacotron and Tacotron2 

模型结构上，tacotron2与tacotron基本类似，除了CBHG结构和Griffin结构上略有区别

### Tacotron

![Screen Shot 2021-11-03 at 7.56.41 PM.png](attachment:72746c31-215c-4ffb-b0d8-de177de03ddb.png)

### Tacotron2

![20200707165804125.png](attachment:468e8246-b1ae-4189-b42b-b6f07b361243.png)


![20200722181543278.png](attachment:40e15c19-6fa9-4c2a-87e0-8558201c483f.png)

### 重点区别：

1. encoder部分，使用了embedding + 3Conv_layers + Bi-directional_LSTM

2. attention使用的是location sensitive attention

3. Pre_net的dropout是一直设置为 true 的（据说是根据实验效果），post_net中最后一层一般为linear激活函数

4. 最后的 mel_output_post 是经过 post_net 和 linear_projection相加得到

5. 多帧预测方面，虽然Tacotron2没有使用多帧，但是实现原理类似

In [None]:
from .tacotron import Tacotron

def create_model(name, hparams):
    if name == 'tacotron':
        return Tacotron(hparams)
    else:
        raise Exception('Unknown model: ' + name)

## Model (Spectrogram Prediction Network 声谱预测网络）

![Screen Shot 2021-10-26 at 11.30.40 AM.png](attachment:c949b888-ced0-43cc-93b1-ac197d9d5b98.png)

也可以表示为：

![1611228354100-9fe31d3c-5046-4c93-b4da-06f0fec9e361.png](attachment:eb54aab4-1806-4f3c-ba27-b54c9ff20b65.png)

通常我们将RNN的输入称为'context(上下文)'，我们希望通过encoder来产生此上下文的向量。

- encoder输入一个序列或句子，然后将其压缩到一个固定长度的向量(向量也可以理解为一种形式的序列)中

- decoder使用固定长度的向量，利用attention关注向量的不同部分，将其解压成一个序列。

### 1. Character Embedding  

将文本转为one-hot向量，embedding之后的shape:[N, T, 256], 256为word_dim

In [None]:
from text.symbols import symbols

class Tacotron():
    def __init__(self, hparams):
        self._hparams = hparams

    def initialize(self, inputs, input_lengths, mel_targets=None, linear_targets=None):
        '''
        Initializes the model for inference.
        Sets "mel_outputs", "linear_outputs", and "alignments" fields.
        Args:
          inputs: int32 Tensor with shape [N, T_in] where N is batch size, T_in is number of steps in the input time series, and values are character IDs
          input_lengths: int32 Tensor with shape [N] where N is batch size and values are the lengths of each sequence in inputs.
          mel_targets: float32 Tensor with shape [N, T_out, M] where N is batch size, T_out is number of steps in the output time series, M is num_mels, 
                       and values are entries in the mel spectrogram. Only needed for training.
          linear_targets: float32 Tensor with shape [N, T_out, F] where N is batch_size, T_out is number of steps in the output time series, F is num_freq, 
                       and values are entries in the linear spectrogram. Only needed for training.
        '''
        with tf.variable_scope('inference') as scope:
            is_training = linear_targets is not None
            batch_size = tf.shape(inputs)[0]
            hp = self._hparams

        # Embeddings
        embedding_table = tf.get_variable('embedding', [len(symbols), hp.embed_depth], dtype=tf.float32,
        initializer=tf.truncated_normal_initializer(stddev=0.5))
        embedded_inputs = tf.nn.embedding_lookup(embedding_table, inputs)   # [N, T_in, embed_depth=256]

### 2. Pre-net

pre-net是一个3层的网络结构，其主要功能是对输入进行一系列的非线性的变换，这样有助于模型收敛和泛化。

它有两个隐藏层，层与层之间的连接均是全连接；第一层的隐藏单元数目与输入单元数目一致[N,T,256]->[N,T,256]，第二层的隐藏单元数目为第一层的一半[N,T,256]->[N,T,128]；两个隐藏层采用的激活函数均为ReLu，并保持0.5的dropout来提高泛化能力。

基于tensorflow实现prenet的代码：

In [None]:
# Encoder
#hp.prenet_depths=[256, 128]
prenet_outputs = prenet(embedded_inputs, is_training, hp.prenet_depths)    # [N, T_in, prenet_depths[-1]=128]

def prenet(inputs, is_training, layer_sizes, scope=None):
    '''Args:
    inputs:输入的tensor,[N,T,256],其中N：batchsize T:输入文本长度
    is_training:布尔值
    layer_size:prenet一共有两层隐藏层,第一层为256个hidden units,第二层为128个hidden units
    '''
    x = inputs
    drop_rate = 0.5 if is_training else 0.0
    with tf.variable_scope(scope or 'prenet'):
        for i, size in enumerate(layer_sizes):
        dense = tf.layers.dense(x, units=size, activation=tf.nn.relu, name='dense_%d' % (i+1))
        x = tf.layers.dropout(dense, rate=drop_rate, training=is_training, name='dropout_%d' % (i+1))
    return x

### 3. Encoder_cbhg

encoder_cbhg是输入端的cbhg,包括convolution bank, hignway network, BiGRU. CBHG最初源于机器翻译，主要用于提高模型的泛化能力。结构如下：

![Screen Shot 2021-10-26 at 11.30.40 AM.png](attachment:9d9e331c-beed-429d-98cc-d7c76800cdbf.png)

#### 3.1 Convolution Bank

这部分对于特征提取的思路是用 height=1, width=word_dim 的 filter 过滤得到一个 input_embedding 的特征值，然后相邻两个 word 一起过滤得到局部窗口为2的上下文特征值，即 2-gram,一直提取到 16-gram. 128个 filter 用于关注不同的特征值。

- 是 n-gram 的思想

- encoder_cbhg 使用的 kernel_size 为 [1,2,...,16] filter_size=128

- 每个 conv1d 结果按照 axis=-1 拼接起来后的 shape 为 [N, T, 128 \* 16]

#### 3.2 Maxpool1d

maxpool相当于特征筛选，挑选出相邻两个时间步更加突出的特征(更大的值),shape 为 [N, T, 128 \* 16]

#### 3.3 projection

对于encoder_cbhg projections = [128,input_channels] 由于 encoder_cbhg 的输入为 prenet [N, T,128]而 input_channels = inputs.get_shape()[2] 故而 encoder_cbhg projections = [128,128]

#### 3.4 residual connection

残差网络思想 [N, T, 128]

#### 3.5 HighWay Net

把输入同时放入到两个一层的全连接网络中，这两个网络的激活函数分别采用了ReLu和sigmoid函数，假定输入为input，ReLu的输出为output1，sigmoid的输出为output2，那么highway layer的输出为：output=output1∗output2+input∗（1−output2).

#### 3.6 BiGRU

双向拼接，所以shape=[N, T, 256]

In [None]:
def highwaynet(inputs, scope, depth):
    with tf.variable_scope(scope):
        H = tf.layers.dense(inputs, units=depth, activation=tf.nn.relu, name='H')
        T = tf.layers.dense(inputs, units=depth, activation=tf.nn.sigmoid, name='T', bias_initializer=tf.constant_initializer(-1.0))
        return H * T + inputs * (1.0 - T)

In [None]:
encoder_outputs = encoder_cbhg(prenet_outputs, input_lengths, is_training, hp.encoder_depth)  # encoder_outputs=[N, T, encoder_depth=256]

def encoder_cbhg(inputs, input_lengths, is_training, depth):
    input_channels = inputs.get_shape()[2]
    return cbhg(inputs, input_lengths, is_training, scope='encoder_cbhg', K=16, projections=[128, input_channels], depth=depth)

def cbhg(inputs, input_lengths, is_training, scope, K, projections, depth):
    '''
    Args:
        inputs:输入的tensor(也是pre-net的输出) [N, T, 128]
        input_lengths:输入的文本的长度,形状为[N],代表一个batch中每一条文本的长度组成的向量
        is_training:布尔值
        K:int,代表第一层卷积里卷积核的个数
        projections:代表第二个卷积层中每次卷积之后的输出长度
    '''
    with tf.variable_scope(scope):
        with tf.variable_scope('conv_bank'):
        # Convolution bank: concatenate on the last axis to stack channels from all convolutions
        conv_outputs = tf.concat([conv1d(inputs, k, 128, tf.nn.relu, is_training, 'conv1d_%d' % k) for k in range(1, K+1)], axis=-1) #K=16

    # Maxpooling:
    maxpool_output = tf.layers.max_pooling1d(conv_outputs, pool_size=2, strides=1, padding='same') #[N,T,128*16]

    # Two projection layers:
    proj1_output = conv1d(maxpool_output, 3, projections[0], tf.nn.relu, is_training, 'proj_1')
    proj2_output = conv1d(proj1_output, 3, projections[1], None, is_training, 'proj_2') # proj2 没有采用激活函数

    # Residual connection:
    highway_input = proj2_output + inputs

    half_depth = depth // 2
    assert half_depth*2 == depth, 'encoder and postnet depths must be even.'

    # Handle dimensionality mismatch:
    if highway_input.shape[2] != half_depth:
        highway_input = tf.layers.dense(highway_input, half_depth)

    # 4-layer HighwayNet:
    for i in range(4):
        highway_input = highwaynet(highway_input, 'highway_%d' % (i+1), half_depth)
    rnn_input = highway_input

    # Bidirectional RNN
    outputs, states = tf.nn.bidirectional_dynamic_rnn(GRUCell(half_depth), GRUCell(half_depth), rnn_input, sequence_length=input_lengths, dtype=tf.float32)
    return tf.concat(outputs, axis=2)  # Concat forward and backward

In [None]:
def conv1d(inputs, kernel_size, channels, activation, is_training, scope):
    with tf.variable_scope(scope):
        conv1d_output = tf.layers.conv1d(inputs, filters=channels, kernel_size=kernel_size, activation=activation, padding='same')
        return tf.layers.batch_normalization(conv1d_output, training=is_training)

### 4. Decoder

Tacotron2的Decoder部分是由以下五部分构成：

1. Prenet ： 2层Fc全连接

2. LocationSensitiveAttention

3. LSTM：两层ZoneOutLSTM组成

4. FrameProjection: 1层Dense , no activation, 为了调整输出维度

5. StopProjection : 1层Dense, activation = sigmoid

In [None]:
attention_cell = AttentionWrapper(GRUCell(hp.attention_depth),BahdanauAttention(hp.attention_depth, encoder_outputs),
                                  alignment_history=True, output_attention=False)                   # [N, T_in, attention_depth=256]

#Apply prenet before concatenation in AttentionWrapper.
attention_cell = DecoderPrenetWrapper(attention_cell, is_training, hp.prenet_depths)

# Concatenate attention context vector and RNN cell output into a 2*attention_depth=512D vector.
concat_cell = ConcatOutputAndAttentionWrapper(attention_cell)              # [N, T_in, 2*attention_depth=512]

decoder_cell = MultiRNNCell([OutputProjectionWrapper(concat_cell, hp.decoder_depth),ResidualWrapper(GRUCell(hp.decoder_depth)),
                             ResidualWrapper(GRUCell(hp.decoder_depth))], state_is_tuple=True)    # [N, T_in, decoder_depth=256]

# Project onto r mel spectrograms (predict r outputs at each RNN step):
output_cell = OutputProjectionWrapper(decoder_cell, hp.num_mels * hp.outputs_per_step)
decoder_init_state = output_cell.zero_state(batch_size=batch_size, dtype=tf.float32)

In [None]:
class DecoderPrenetWrapper(RNNCell):
    '''Runs RNN inputs through a prenet before sending them to the cell.'''
    def __init__(self, cell, is_training, layer_sizes):
        super(DecoderPrenetWrapper, self).__init__()
        self._cell = cell
        self._is_training = is_training
        self._layer_sizes = layer_sizes

    @property
    def state_size(self):
        return self._cell.state_size

    @property
    def output_size(self):
        return self._cell.output_size

    def call(self, inputs, state):
        prenet_out = prenet(inputs, self._is_training, self._layer_sizes, scope='decoder_prenet')
        return self._cell(prenet_out, state)

    def zero_state(self, batch_size, dtype):
        return self._cell.zero_state(batch_size, dtype)

In [None]:
class ConcatOutputAndAttentionWrapper(RNNCell):
    '''Concatenates RNN cell output with the attention context vector.

      This is expected to wrap a cell wrapped with an AttentionWrapper constructed with
      attention_layer_size=None and output_attention=False. Such a cell's state will include an
      "attention" field that is the context vector.
      '''
    def __init__(self, cell):
        super(ConcatOutputAndAttentionWrapper, self).__init__()
        self._cell = cell

    @property
    def state_size(self):
        return self._cell.state_size

    @property
    def output_size(self):
        return self._cell.output_size + self._cell.state_size.attention

    def call(self, inputs, state):
        output, res_state = self._cell(inputs, state)
        return tf.concat([output, res_state.attention], axis=-1), res_state

    def zero_state(self, batch_size, dtype):
        return self._cell.zero_state(batch_size, dtype)

In [None]:
class MultiRNNCell(RNNCell):
    """RNN cell composed sequentially of multiple simple cells."""

    def __init__(self, cells, state_is_tuple=True):
        """Create a RNN cell composed sequentially of a number of RNNCells.

        Args:
            cells: list of RNNCells that will be composed in this order.
            state_is_tuple: If True, accepted and returned states are n-tuples, where `n = len(cells)`.  
            If False, the states are all concatenated along the column axis. This latter behavior will 
            soon be deprecated.

        Raises:
            ValueError: if cells is empty (not allowed), or at least one of the cells
            returns a state tuple but the flag `state_is_tuple` is `False`.
    """
    super(MultiRNNCell, self).__init__()
    if not cells:
        raise ValueError("Must specify at least one cell for MultiRNNCell.")
    if not nest.is_sequence(cells):
        raise TypeError(f"cells must be a list or tuple, but saw: {cells}.")

    self._cells = cells
    self._state_is_tuple = state_is_tuple
    if not state_is_tuple:
        if any(nest.is_sequence(c.state_size) for c in self._cells):
            raise ValueError(f"Some cells return tuples of states, but the flag "
                             "state_is_tuple is not set.  State sizes are: %s"
                             % str([c.state_size for c in self._cells]))

    @property
    def state_size(self):
        if self._state_is_tuple:
            return tuple(cell.state_size for cell in self._cells)
        else:
            return sum([cell.state_size for cell in self._cells])

    @property
    def output_size(self):
        return self._cells[-1].output_size

    def zero_state(self, batch_size, dtype):
        with ops.name_scope(type(self).__name__ + "ZeroState", values=[batch_size]):
        if self._state_is_tuple:
            return tuple(cell.zero_state(batch_size, dtype) for cell in self._cells)
        else:
            # We know here that state_size of each cell is not a tuple and
            # presumably(大概) does not contain TensorArrays or anything else fancy
            return super(MultiRNNCell, self).zero_state(batch_size, dtype)

    def call(self, inputs, state):
        """Run this multi-layer cell on inputs, starting from state."""
        cur_state_pos = 0
        cur_inp = inputs
        new_states = []
        for i, cell in enumerate(self._cells):
            with vs.variable_scope(f"cell_{i}"):
                if self._state_is_tuple:
                    if not nest.is_sequence(state):
                        raise ValueError(f"Expected state to be a tuple of length {len(self.state_size)}, but received: {state}")
                    cur_state = state[i]
                else:
                    cur_state = array_ops.slice(state, [0, cur_state_pos],[-1, cell.state_size])
                    cur_state_pos += cell.state_size
                cur_inp, new_state = cell(cur_inp, cur_state)
                new_states.append(new_state)

        new_states = (tuple(new_states) if self._state_is_tuple else array_ops.concat(new_states, 1))

        return cur_inp, new_states

In [None]:
class ResidualWrapper(RNNCell):
  """RNNCell wrapper that ensures cell inputs are added to the outputs."""

  def __init__(self, cell, residual_fn=None):
    """Constructs a `ResidualWrapper` for `cell`.

    Args:
      cell: An instance of `RNNCell`.
      residual_fn: (Optional) The function to map raw cell inputs and raw cell
        outputs to the actual cell outputs of the residual network.
        Defaults to calling nest.map_structure on (lambda i, o: i + o), inputs
        and outputs.
    """
    self._cell = cell
    self._residual_fn = residual_fn

  @property
  def state_size(self):
    return self._cell.state_size

  @property
  def output_size(self):
    return self._cell.output_size

  def zero_state(self, batch_size, dtype):
    with ops.name_scope(type(self).__name__ + "ZeroState", values=[batch_size]):
      return self._cell.zero_state(batch_size, dtype)

  def __call__(self, inputs, state, scope=None):
    """Run the cell and then apply the residual_fn on its inputs to its outputs.

    Args:
      inputs: cell inputs.
      state: cell state.
      scope: optional cell scope.

    Returns:
      Tuple of cell outputs and new state.

    Raises:
      TypeError: If cell inputs and outputs have different structure (type).
      ValueError: If cell inputs and outputs have different structure (value).
    """
    outputs, new_state = self._cell(inputs, state, scope=scope)
    # Ensure shapes match
    def assert_shape_match(inp, out):
      inp.get_shape().assert_is_compatible_with(out.get_shape())
    def default_residual_fn(inputs, outputs):
      nest.assert_same_structure(inputs, outputs)
      nest.map_structure(assert_shape_match, inputs, outputs)
      return nest.map_structure(lambda inp, out: inp + out, inputs, outputs)
    res_outputs = (self._residual_fn or default_residual_fn)(inputs, outputs)
    return (res_outputs, new_state)