# A Hierarchical Latent Vector Model for Learning Long-Term Structure in Music

# Abstract
VAE는 풍부한 의미를 담는 latent representation을 학습하는데에 좋은 모델이다. 그러나 latent code를 무시하고 생성을 하는 'posterior collapse' 문제가 존재한다. 이러한 기존 VAE의 문제점을 해결하고자 hierarchical decoder를 사용하여 모델이 latent code를 활용하도록 유도한다.


# 기존의 Recurrent VAE
<img src="https://ars.els-cdn.com/content/image/1-s2.0-S0020025519302786-gr3.jpg" width= 600 heigth=300>

## Encoder
인코더는 rnn cell로 이뤄져있다. 사후확률 $q_\lambda(z \mid x)$를 $p(z)$ 에 근사하도록 학습한다. 
인코딩의 결과물로 latent 분포를 나타내는 $\mu$ $\sigma$ 를 도출한다.

## Decoder
디코더는 rnn cell로 이뤄져있다. 인코더의 $z$를 받아서 output을 생성한다.

## 문제점
- latent code를 무시하는 경우가 있다.
- 하나의 latent vector에 전체 시퀀스 정보를 압축하기에, 짧은 시퀀스에서만 잘 작동하는 경우가 있다.
---

# MusicVAE Model
<img src="https://velog.velcdn.com/images/ann9902/post/c47ee5a6-c06d-4663-b2b0-8a3ac15eb222/image.png" width=600 heigth=300>

## Bidrectional Encoder
two-layer bidirectional LSTM을 사용한다. 양방향의 hidden state, $\vec{h}_T, \overleftarrow{h}_T$를 concat하여 최종적인 hidden state를 만들고, 이를 fullyconnected layer를 이용해 $\mu$와 $\sigma$ 를 만든다. 해당 과정의 수식은 아래와 같다. 

$\begin{aligned} & \mu=W_{h \mu} h_T+b_\mu \\ & \sigma=\log \left(\exp \left(W_{h \sigma} h_T+b_\sigma\right)+1\right)\end{aligned}$

LSTM의 hidden state size는 2048이고, latent dimension은 512를 가진다. bidirectional 인코더를 사용하기에 long-term 시퀀스를 parameterize하기에 용이하다. 

<img src="https://velog.velcdn.com/images/ann9902/post/f7f0fd9f-2ff3-4b46-bbe0-cda86e425ccc/image.png" width=600 heigth=300>

## Hierarchical Decoder
기존의 recurrent VAE는 인코더의 $Z$를 디코더의 initial state로 해서 output을 autoregressive하게 생성한다. 이는 long sequence를 잘 생성하지 못한다. 이는 latent code의 영향력이 뒤로 갈수록 손실되기 때문이라고 짐작된다. 이러한 문제를 해결하기 위해 hierarchical한 구조를 도입했다. 

생성하려고 하는 시퀀스가 u개의 subsequence로 나눠져있다고 가정하고, 
우선 z를 fully-connected layer와 tanh activation를 통과시켜 “conductor” RNN의 initial state를 얻는다. 그리고 conductor를 이용해서 u개의 임베딩(각 subsequence를 위한 것)을 만든다. 

만들어진 임베딩을 fully-connected layer와 tanh activation를 통과시켜 디코더의 마지막 RNN(이하 decoder RNN)의 initial state를 만든다. decoder RNN이 autoregressive하게 output을 생성하고 이를 softmax에 통과시켜 최종 output token을 예측한다. 이전의 output token과 현재의 conductor의 임베딩을 concat해서 현재 decoder RNN에 입력한다. 

앞서 설명한 모델도 여전히 “posterior collapse 문제가 발생할 수 있지만, 디코더의 스코프를 줄여서 latent code를 쓰도록 강제하는 것이 중요하다는 것을 발견했다. 이러한 제한은 CNN decoder에서는 receptive field를 줄여서 쉽게 가능하지만 RNN에서는 정해진 방법이 없다. 그렇기에 각 서브시퀀스의 RNN state를 conducter의 임베딩으로 initialize하여, 디코더가 long term한 정보를 얻기 위해서 latent code로부터 만들어진 conductor의 임베딩을 활용할 수밖에 없도록하는 방법을 채택했다.

<img src="https://velog.velcdn.com/images/ann9902/post/9e344f16-f875-412e-a332-9a8001218d3c/image.png" width=600 heigth=300>

## Reconstruction Quality
hierarchical한 구조를 사용하지 않을때보다 reconstruction accuracy가 높은 것을 볼 수 있다.

<img src="https://velog.velcdn.com/images/ann9902/post/03115e5d-62fa-4b15-bd4a-ce6eaf12aa5e/image.png" width=600 heigth=300>

# 구현된 코드 살펴보기 

## 처리 순서
1. midi → notesequence → tfRecord → 모델 input형태 순으로 데이터 처리를 한다. 
2. train
3. generate

각 단계에서 아래의 모듈을 사용한다.

midi → notesequence : `magenta\scripts\convert_dir_to_note_sequences.py`

tfRecord → model input  : `magenta\models\music_vae\preprocess_tfrecord.py`

train : `magenta\models\music_vae\music_vae_train.py`

generate : `magenta\models\music_vae\music_vae_generate.py`


1_midi_to_tf.sh > 2_tf_to_input.sh > 3_train.sh > 4_generate.sh 순서로 실행하면 된다.

아래에서 모듈의 주요부분을 살펴보자.


# 데이터 전처리

## midi -> notesequence(tfrecord)

scripts/convert_dir_to_note_sequences.py > def convert_midi, def convert_directory

midi_to_sequence_proto로 midi를 notesequence로 만들고, TFRecordWriter로 이를 tfrecord로 저장한다.

```python
def convert_midi(root_dir, sub_dir, full_file_path):
  """Converts a midi file to a sequence proto.

  Args:
    root_dir: A string specifying the root directory for the files being
        converted.
    sub_dir: The directory being converted currently.
    full_file_path: the full path to the file to convert.

  Returns:
    Either a NoteSequence proto or None if the file could not be converted.
  """
  try:
    sequence = midi_io.midi_to_sequence_proto( # notesequence로 만드는 부분
        tf.gfile.GFile(full_file_path, 'rb').read())
  except midi_io.MIDIConversionError as e:
    tf.logging.warning(
        'Could not parse MIDI file %s. It will be skipped. Error was: %s',
        full_file_path, e)
    return None
  sequence.collection_name = os.path.basename(root_dir)
  sequence.filename = os.path.join(sub_dir, os.path.basename(full_file_path))
  sequence.id = generate_note_sequence_id(
      sequence.filename, sequence.collection_name, 'midi') 
  tf.logging.info('Converted MIDI file %s.', full_file_path)
  return sequence
```

```python
def convert_directory(root_dir, output_file, recursive=False):
  """Converts files to NoteSequences and writes to `output_file`.

  Input files found in `root_dir` are converted to NoteSequence protos with the
  basename of `root_dir` as the collection_name, and the relative path to the
  file from `root_dir` as the filename. If `recursive` is true, recursively
  converts any subdirectories of the specified directory.

  Args:
    root_dir: A string specifying a root directory.
    output_file: Path to TFRecord file to write results to.
    recursive: A boolean specifying whether or not recursively convert files
        contained in subdirectories of the specified directory.
  """
  with tf.io.TFRecordWriter(output_file) as writer:
    convert_files(root_dir, '', writer, recursive)

```
## make model input

music_vae/preprocess_tfrecord.py 

DrumsConverter를 적용하여 notesequence를 model input으로 만든다. 

```python
class DrumsConverter(BaseNoteSequenceConverter):
  """Converter for legacy drums with either pianoroll or one-hot tensors.

  Inputs/outputs are either a "pianoroll"-like encoding of all possible drum
  hits at a given step, or a one-hot encoding of the pianoroll.

  The "roll" input encoding includes a final NOR bit (after the optional end
  token).
```

# 모델 구조

```python
CONFIG_MAP['hierdec-drum_4bar'] = Config(
    model=MusicVAE(
        lstm_models.BidirectionalLstmEncoder(),
        lstm_models.HierarchicalLstmDecoder(
            lstm_models.CategoricalLstmDecoder(),
            level_lengths=[8, 8],
            disable_autoregression=True)),
    hparams=merge_hparams(
        lstm_models.get_default_hparams(),
        HParams(
            batch_size=64,
            max_seq_len=64,  # 16*4
            z_size=512,
            enc_rnn_size=[2048, 2048],
            dec_rnn_size=[1024, 1024],
            free_bits=256,
            max_beta=0.2,
        )),
    note_sequence_augmenter=None,
    data_converter=data.DrumsConverter(
        max_bars=100,  # Truncate long drum sequences before slicing.
        slice_bars=4,  # 4 마디
        steps_per_quarter=4,
        roll_input=True),
    train_examples_path=None,
    eval_examples_path=None,
)
```

MusicVAE안에 인코더로 BidirectionalLstmEncoder, 디코더로 HierarchicalLstmDecoder, CategoricalLstmDecoder가 들어있다.

논문에서 conductor RNN가 HierarchicalLstmDecoder에 해당하는 것이고, decoder RNN이 CategoricalLstmDecoder이다.

max_seq_length인자를 64로 설정하면 16*4, 4마디를 생성할 수 있다. 

## 인코더
### Bidirectional Encoder

lstm_utils.py > def build_bidirectional_lstm

양방향 lstm을 만든다.

```python
def build_bidirectional_lstm(
    layer_sizes, dropout_keep_prob, residual, is_training):
  """Build the Tensorflow graph for a bidirectional LSTM."""

  cells_fw = []
  cells_bw = []
  for layer_size in layer_sizes:
    cells_fw.append(
        rnn_cell([layer_size], dropout_keep_prob, residual, is_training))
    cells_bw.append(
        rnn_cell([layer_size], dropout_keep_prob, residual, is_training))

  return cells_fw, cells_bw
```
---

lstm_models.py > BidirectionalLstmEncoder클래스 > def encode

양방향의 RNN 결과물을 concat하고 있다.

```python
  def encode(self, sequence, sequence_length):
    cells_fw, cells_bw = self._cells

    _, states_fw, states_bw = contrib_rnn.stack_bidirectional_dynamic_rnn(
        cells_fw,
        cells_bw,
        sequence,
        sequence_length=sequence_length,
        time_major=False,
        dtype=tf.float32,
        scope=self._name_or_scope)
    # Note we access the outputs (h) from the states since the backward
    # ouputs are reversed to the input order in the returned outputs.
    last_h_fw = states_fw[-1][-1].h
    last_h_bw = states_bw[-1][-1].h

    return tf.concat([last_h_fw, last_h_bw], 1)
```
---

base_models.py > class MusicVAE > def encode

앞서 만든 양방향 rnn 결과물로 $\mu$, $\sigma$ 를 만들고 최종적으로 latent code를 반환한다.

```python
  def encode(self, sequence, sequence_length, control_sequence=None):
    """Encodes input sequences into a MultivariateNormalDiag distribution.

    Args:
      sequence: A Tensor with shape `[num_sequences, max_length, input_depth]`
          containing the sequences to encode.
      sequence_length: The length of each sequence in the `sequence` Tensor.
      control_sequence: (Optional) A Tensor with shape
          `[num_sequences, max_length, control_depth]` containing control
          sequences on which to condition. These will be concatenated depthwise
          to the input sequences.

    Returns:
      A tfp.distributions.MultivariateNormalDiag representing the posterior
      distribution for each sequence.
    """
    hparams = self.hparams
    z_size = hparams.z_size

    sequence = tf.to_float(sequence)
    if control_sequence is not None:
      control_sequence = tf.to_float(control_sequence)
      sequence = tf.concat([sequence, control_sequence], axis=-1)
    encoder_output = self.encoder.encode(sequence, sequence_length) # 앞서 설명한 concat한 결과물

    mu = tf.layers.dense(
        encoder_output,
        z_size,
        name='encoder/mu',
        kernel_initializer=tf.random_normal_initializer(stddev=0.001))
    sigma = tf.layers.dense(
        encoder_output,
        z_size,
        activation=tf.nn.softplus,
        name='encoder/sigma',
        kernel_initializer=tf.random_normal_initializer(stddev=0.001))

    return ds.MultivariateNormalDiag(loc=mu, scale_diag=sigma) # latent code
``` 

# Hierarchical decoder

## conductor RNN

lstm_models.py > class HierarchicalLstmDecoder > def _hierarchical_decode

conductor RNN에서 인코더에서 넘어온 z를 받아 임베딩을 만든다.

```python
  def _hierarchical_decode(self, z, base_decode_fn):
    """Depth first decoding from `z`, passing final embeddings to base fn."""
    batch_size = z.shape[0]
    # Subtract 1 for the core decoder level.
    num_levels = len(self._level_lengths) - 1

    hparams = self.hparams
    batch_size = hparams.batch_size

    def recursive_decode(initial_input, path=None):
      """Recursive hierarchical decode function."""
      path = path or []
      level = len(path)

      if level == num_levels:
        with tf.variable_scope('core_decoder', reuse=tf.AUTO_REUSE):
          return base_decode_fn(initial_input, path)

      scope = tf.VariableScope(
          tf.AUTO_REUSE, 'decoder/hierarchical_level_%d' % level)
      num_steps = self._level_lengths[level]
      with tf.variable_scope(scope):
        state = lstm_utils.initial_cell_state_from_embedding( 
            self._hier_cells[level], initial_input, name='initial_state')
      if level not in self._disable_autoregression:
        # The initial input should be the same size as the tensors returned by
        # next level.
        if self._hierarchical_encoder:
          input_size = self._hierarchical_encoder.level(0).output_depth
        elif level == num_levels - 1:
          input_size = sum(tf.nest.flatten(self._core_decoder.state_size))
        else:
          input_size = sum(
              tf.nest.flatten(self._hier_cells[level + 1].state_size))
        next_input = tf.zeros([batch_size, input_size])
      lower_level_embeddings = []
      for i in range(num_steps):
        if level in self._disable_autoregression:
          next_input = tf.zeros([batch_size, 1])
        else:
          next_input = tf.concat([next_input, initial_input], axis=1)
        with tf.variable_scope(scope):
          output, state = self._hier_cells[level](next_input, state, scope)
        next_input = recursive_decode(output, path + [i])
        lower_level_embeddings.append(next_input) 
      if self._hierarchical_encoder:
        # Return the encoding of the outputs using the appropriate level of the
        # hierarchical encoder.
        enc_level = num_levels - level
        return self._hierarchical_encoder.level(enc_level).encode(
            sequence=tf.stack(lower_level_embeddings, axis=1),
            sequence_length=tf.fill([batch_size], num_steps))
      else:
        # Return the final state.
        return tf.concat(tf.nest.flatten(state), axis=-1)

    return recursive_decode(z)
```
---

## decoder RNN

lstm_models.CategoricalLstmDecoder()에 해당한다. 

lstm_models.py > class BaseLstmDecoder > def _decode

conductor의 임베딩을 받아 decoder RNN의 initial state를 만든다.

```python
  def _decode(self, z, helper, input_shape, max_length=None):
    """Decodes the given batch of latent vectors vectors, which may be 0-length.

    Args:
      z: Batch of latent vectors, sized `[batch_size, z_size]`, where `z_size`
        may be 0 for unconditioned decoding.
      helper: A seq2seq.Helper to use.
      input_shape: The shape of each model input vector passed to the decoder.
      max_length: (Optional) The maximum iterations to decode.

    Returns:
      results: The LstmDecodeResults.
    """
    initial_state = lstm_utils.initial_cell_state_from_embedding( # 임베딩으로부터 inintial state를 만들고 있다.
        self._dec_cell, z, name='decoder/z_to_initial_state')

    decoder = lstm_utils.Seq2SeqLstmDecoder(
        self._dec_cell,
        helper,
        initial_state=initial_state,
        input_shape=input_shape,
        output_layer=self._output_layer)
    final_output, final_state, final_lengths = contrib_seq2seq.dynamic_decode(
        decoder,
        maximum_iterations=max_length,
        swap_memory=True,
        scope='decoder')
    results = lstm_utils.LstmDecodeResults(
        rnn_input=final_output.rnn_input[:, :, :self._output_depth],
        rnn_output=final_output.rnn_output,
        samples=final_output.sample_id,
        final_state=final_state,
        final_sequence_lengths=final_lengths)

    return results
```