<a href="https://colab.research.google.com/github/limseo12/SideProject-4/blob/main/tacotron2_pynb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torch
!pip install torchaudio

In [None]:
import os
import torch
import torch.nn as nn
import torch.utils.data as data
import torch.optim as optim
import torch.nn.functional as F
import torchaudio
from torch.utils.data import Dataset, DataLoader
import numpy as np
import unicodedata
import re
import time
from torch.autograd import Variable
from math import sqrt

print('Import 완료')

0. Hyperparameter 설정 및 device 확인\
batch size는 큰 값을 갖는게 좋음

In [None]:
class HParams():
    def __init__(self):
        self.n_mel_channels = 80    #Mel-spectrogram의 착수 80차

        ################################
        # Model Parameters             #
        ################################
        self.symbols_embedding_dim=512    #텍스트에서 인베딩, 룩업테이블을 통해서 임베딩한다

        # Encoder parameters
        self.encoder_kernel_size=5
        self.encoder_n_convolutions=3   #커널 사이즈가 5인 컨볼루션을 3개 쌓는다
        self.encoder_embedding_dim=512    #LSTM을 512로 쌓는게 뒤쪽에 나온다

        # Decoder parameters
        self.n_frames_per_step=1  # currently only 1 is supported   ,타코트론의 경우 멜을 출력으로 낼 때 하나 이상의 멜 스펙트롤을 뱉는다,타코트론2에서는 계산속도는 빠르나 성능이 떨어질 수 있기에 1로고정
        self.decoder_rnn_dim=1024   
        self.prenet_dim=256
        self.max_decoder_steps=1000
        self.gate_threshold=0.5
        self.p_attention_dropout=0.1
        self.p_decoder_dropout=0.1

        # Attention parameters
        self.attention_rnn_dim=1024
        self.attention_dim=128

        # Location Layer parameters
        self.attention_location_n_filters=32
        self.attention_location_kernel_size=31

        # Mel-post processing network parameters
        self.postnet_embedding_dim=512
        self.postnet_kernel_size=5
        self.postnet_n_convolutions=5

        ################################
        # Optimization Hyperparameters #
        ################################
        self.use_saved_learning_rate=False
        self.learning_rate=1e-3
        self.weight_decay=1e-6
        self.grad_clip_thresh=1.0
        self.batch_size=4
        self.mask_padding=True  # set model's padded outputs to padded values
    
    
hparams = HParams()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if torch.cuda.is_available():
  device = torch.device("cuda")
  print('GPU를 사용할 준비가 되었습니다.')
else:
  device = torch.device("cpu")
  print('CPU 모드입니다. GPU 설정으로 변경해주세요.')

1. Dataset (LJ Speech) 다운로드\
Public data : LJ-Speech, VCTK, LibriTTS, (blizzard challenge data)

In [None]:
LJSpeech_url = 'https://data.keithito.com/data/speech/LJSpeech-1.1.tar.bz2'   #오디오북을 녹음한 DB, 어떠한 모델을 학습해도 소리가 잘 나온다.VCTK는 데이터가 좀 더 작아 음질이 좋진않다,LibriTTS는 db사이즈가 너무 커서 일부만 사용하거나 작은 셋을 사용하는 걸 추천한다.
train_dataset = torchaudio.datasets.LJSPEECH("", url=LJSpeech_url, download=True)

print('Download 완료')

2. Dataset 전처리

In [None]:
vocab = " abcdefghijklmnopqrstuvwxyz'.?"  # P: Padding, E: EOS. 각각의 voca가 숫자로 변환되게 된다.
char2idx = {char: idx for idx, char in enumerate(vocab)}
idx2char = {idx: char for idx, char in enumerate(vocab)}


def text_normalize(text):   #영어 같은 경우에는 대문자 소문자가 있기에 소문자로 맞춰준다. 실제로는 이 코드 보다 좀 더 복잡하다.
    text = ''.join(char for char in unicodedata.normalize('NFD', text)
                   if unicodedata.category(char) != 'Mn')  # Strip accents

    text = text.lower()
    text = re.sub("[^{}]".format(vocab), " ", text)
    text = re.sub("[ ]+", " ", text)
    return text


class Collate():    #기존의 NVDIA 코드에서 조금 더 변형된 코드,
  def __init__(self):
    # n_mels=80은 몇 차에 멜 스펙트럼을 사용할 것인가,
    #win_length는 Short-Time Analysis를 하기위한 윈도우길이,타임축에서 씌우는 것
    #hop_length=256는 win_length를 256만큼 조금씩 이동시킨다,
    #f_min,f_max은 어느 정도의 바운더리를 설정한 것 , 값을 키우면 성능이 좋아지지만 계산할 때 안좋은 점이 있다.
    #n_fft는 fft사이즈이다,주파수 도메인에서 몇 차를 할 것인가
    self.wav_to_mel = torchaudio.transforms.MelSpectrogram(sample_rate=22050, n_mels=80, win_length=1024, hop_length=256, f_min=0.0, f_max=8000.0, n_fft=1024)

  def __call__(self, batch):
    # batch: N_batch * [wav, sample_rate, text, text_normalized]
    #소리가 안나오는 경우가 많아서 잘 확인 해보자.
    #긴 것 부터 차례대로 내려가는 경우에 계산이 효율적으로 되기에 솔팅을 한는 것을 볼 수 있다. 
    mel_list = []
    for data in batch:
      wav = data[0]
      mel_list.append(self.wav_to_mel(wav).squeeze())
    input_lengths, ids_sorted_decreasing = torch.sort(torch.LongTensor([len(data[3]) for data in batch]), dim=0, descending=True)
    mel_lengths, ids_sorted_mel = torch.sort(torch.LongTensor([mel.shape[1] for mel in mel_list]), dim=0, descending=True)

    max_input_len = input_lengths[0]
    max_target_len = mel_lengths[0]
    #제일 긴 단계에 따라서 패딩을 해주는 단계
    text_padded = torch.LongTensor(len(batch), max_input_len)
    mel_padded = torch.FloatTensor(len(batch), 80, max_target_len)
    #스탑토큰과 관련된 부분, 스탑토큰 , 멜과 관련된 것 , 텍스트 관련된 것도 3가지 다 준비한다.
    gate_padded = torch.FloatTensor(len(batch), max_target_len)
    output_lengths = torch.LongTensor(len(batch))

    text_padded.zero_()
    mel_padded.zero_()
    gate_padded.zero_()

    for i in range(len(ids_sorted_decreasing)):
        _, _, _, text = batch[ids_sorted_decreasing[i]]
        mel = mel_list[ids_sorted_decreasing[i]]
        mel = self.dynamic_range_compression(mel)
        mel_padded[i, :, :mel.size(1)] = mel
        #끝나는 지점을 1이 되게해서 확인 할 수 있도록 한다.
        gate_padded[i, mel.size(1)-1:] = 1
        #아웃풋 사이즈를 통하여 게이트 값들을 구하게 된다.
        output_lengths[i] = mel.size(1)
        text = text_normalize(text)
        text = [char2idx[char] for char in text]
        text_norm = torch.IntTensor(text)
        text_padded[i, :len(text)] = text_norm

    return text_padded, input_lengths, mel_padded, gate_padded, output_lengths

    #고주파 내용들이 너무 열화가 되서 고려가 안 될 수 있어서 그걸 살려주기 위해서 한 것.
  def dynamic_range_compression(self, x, C=1, clip_val=1e-5):
    return torch.log(torch.clamp(x, min=clip_val) * C)

collate_fn = Collate()
#파이 토키의 데이터로더를 사용하여 lj스피치를 받고 데이터를 처리하게 된다.
#barch_size같은 경우에는 LJ경우에 8만 되도 잘 되는 경우가 많은데 기본적으로 128정도면 성능이 잘 나온다 보면 된다.
#suffle은 당연히 해주고
#drop_last는 남은 것을 버릴 건지 아닐 건지 물어보는 건데 버리지 않고 다 사용하겠다는 False
train_loader = DataLoader(train_dataset, batch_size=hparams.batch_size, shuffle=True, drop_last=False, collate_fn=collate_fn)

print('Data 전처리 완료')

3. Tacotron2 모델 구현\
3-0) Basic model

In [None]:
def get_mask_from_lengths(lengths):
    max_len = torch.max(lengths).item()
    ids = torch.arange(0, max_len, out=torch.cuda.LongTensor(max_len))
    mask = (ids < lengths.unsqueeze(1)).bool()
    return mask

#텐서 x를 gpu에 올리는 과정.
def to_gpu(x):
    x = x.contiguous() #연산과정에서 메모리에 올려진 순서에 따라 발생할 수 있는 비효율성 혹은 최악의 경우 에러를 방지하기 위함
                        #예를들어 transpose의 경우 메모리에 올려진 순서가 연속성이 깨지게되는데 메모리 접근 성능이 비효율적으로됨

    if torch.cuda.is_available():
        x = x.cuda(non_blocking=True)
    return torch.autograd.Variable(x)

#리니어한 레이어 1나
class LinearNorm(torch.nn.Module):
  def __init__(self, in_dim, out_dim, bias=True, w_init_gain='linear'):
      super(LinearNorm, self).__init__()
      self.linear_layer = torch.nn.Linear(in_dim, out_dim, bias=bias)

      torch.nn.init.xavier_uniform_(
          self.linear_layer.weight,
          gain=torch.nn.init.calculate_gain(w_init_gain))

  def forward(self, x):
      return self.linear_layer(x)

#1d 컨볼루션을 한다고 생각하면 된다.
class ConvNorm(torch.nn.Module):
  def __init__(self, in_channels, out_channels, kernel_size=1, stride=1,
                padding=None, dilation=1, bias=True, w_init_gain='linear'):
      super(ConvNorm, self).__init__()
      if padding is None:
          assert(kernel_size % 2 == 1)
          padding = int(dilation * (kernel_size - 1) / 2)

      self.conv = torch.nn.Conv1d(in_channels, out_channels,
                                  kernel_size=kernel_size, stride=stride,
                                  padding=padding, dilation=dilation,
                                  bias=bias)

      torch.nn.init.xavier_uniform_(
          self.conv.weight, gain=torch.nn.init.calculate_gain(w_init_gain))

  def forward(self, signal):
      conv_signal = self.conv(signal)
      return conv_signal

3-1) Encoder

In [None]:
class Encoder(nn.Module):
  """Encoder module:
      - Three 1-d convolution banks
      - Bidirectional LSTM
  """
  def __init__(self, hparams):
      super(Encoder, self).__init__()

      convolutions = []
      for _ in range(hparams.encoder_n_convolutions):
      ##시퀀셜하게 ConvNorm 을 쌓는다.    
          conv_layer = nn.Sequential(
              ConvNorm(hparams.encoder_embedding_dim,
                        hparams.encoder_embedding_dim,
                        kernel_size=hparams.encoder_kernel_size, stride=1,
                        padding=int((hparams.encoder_kernel_size - 1) / 2),
                        dilation=1, w_init_gain='relu'),
      #아까 보았던 대로 ConvNorm을 하고 배치노멀라이제이션을 한다.
             
              nn.BatchNorm1d(hparams.encoder_embedding_dim))
          convolutions.append(conv_layer)
      self.convolutions = nn.ModuleList(convolutions)
      #LSTM 그대로 사용, 인코더에서는 양방향 모두 사용하는 것이 성능향상에 도움이 되어 bidirectional을 사용한다.
      self.lstm = nn.LSTM(hparams.encoder_embedding_dim,
                          int(hparams.encoder_embedding_dim / 2), 1,
                          batch_first=True, bidirectional=True)

  def forward(self, x, input_lengths):
      for conv in self.convolutions:
      #액티베이션을 걸고 (F.rely) 드랍아웃 하는 형태  
          x = F.dropout(F.relu(conv(x)), 0.5, self.training)

      x = x.transpose(1, 2)

      # pytorch tensor are not reversible, hence the conversion
      input_lengths = input_lengths.cpu().numpy()
      #팩 패디드 시퀀스를 하게 되는 과정.병렬 처리로 계산을 좀 더 빨리 하기 위해서 시퀀스를 패킹하는 과정
      #길이에 따라 소팅하는 과정이라고 보면 될 것 같다
      x = nn.utils.rnn.pack_padded_sequence(
          x, input_lengths, batch_first=True) #병렬 처리를 위해 padded된 sequence들을 packing하는 과정, 일종의 길이에 따른 sorting 과정 더 빠른 연산 가능

      self.lstm.flatten_parameters()
      outputs, _ = self.lstm(x)
      #패드패킨스 시퀀스를 한번 더 진행한다.논문과 똑같이 인코더를 구하게 된다.
      outputs, _ = nn.utils.rnn.pad_packed_sequence(
          outputs, batch_first=True)

      return outputs
#forward와 inference로 나뉜 이유는 forward는 학습할 때 사용하고 inference는 실제 학습된 모델을 인퍼런스 할 때 나오는 것
#inference 같은 경우에는 배치사이즈가 1이라고 볼 수 있기 때문에 굳이 택 패지드 시퀀스가 들어가 있지 않은 형태로 구현이 되어 있는 것을 볼 수 있다.
  def inference(self, x):
      for conv in self.convolutions:
          x = F.dropout(F.relu(conv(x)), 0.5, self.training)

      x = x.transpose(1, 2)

      self.lstm.flatten_parameters()
      outputs, _ = self.lstm(x)

      return outputs

3-2) Attention

In [None]:
class LocationLayer(nn.Module):
  def __init__(self, attention_n_filters, attention_kernel_size,
                attention_dim):
      super(LocationLayer, self).__init__()
      padding = int((attention_kernel_size - 1) / 2)
      self.location_conv = ConvNorm(2, attention_n_filters,
                                    kernel_size=attention_kernel_size,
                                    padding=padding, bias=False, stride=1,
                                    dilation=1)
      self.location_dense = LinearNorm(attention_n_filters, attention_dim,
                                        bias=False, w_init_gain='tanh')

  def forward(self, attention_weights_cat):
      processed_attention = self.location_conv(attention_weights_cat)
      processed_attention = processed_attention.transpose(1, 2)
      processed_attention = self.location_dense(processed_attention)
      return processed_attention

#쿼리 메모리는 디코더 아웃풋, 수식에서 h값 이다 메모리 값은 인코더 인베딩에 시퀀스 수식에 있는 xn이다.
#어텐션 웨이트는 합성에서는 얼라이먼트라고 표현하는데 밑에 수식에서보이는 y값이라 보면 된다.
#c는 컨텍스트 벡터, 어텐션은 최종 출력이라고 수있다
class Attention(nn.Module):
  def __init__(self, attention_rnn_dim, embedding_dim, attention_dim,
                attention_location_n_filters, attention_location_kernel_size):
      super(Attention, self).__init__()
      
      self.query_layer = LinearNorm(attention_rnn_dim, attention_dim,
                                    bias=False, w_init_gain='tanh')
      self.memory_layer = LinearNorm(embedding_dim, attention_dim, bias=False,
                                      w_init_gain='tanh')
      self.v = LinearNorm(attention_dim, 1, bias=False)
      self.location_layer = LocationLayer(attention_location_n_filters,
                                          attention_location_kernel_size,
                                          attention_dim)
      self.score_mask_value = -float("inf")


  def get_alignment_energies(self, query, processed_memory,
                              attention_weights_cat):
      """
      PARAMS
      ------
      query: decoder output (batch, n_mel_channels * n_frames_per_step)
      processed_memory: processed encoder outputs (B, T_in, attention_dim)
      attention_weights_cat: cumulative and prev. att weights (B, 2, max_time)

      RETURNS
      -------
      alignment (batch, max_time)
      """
#에너지 펑션 ,e를 구하는 과정, 디코더 아웃풋에 웨이트를 하나 주고 로케이션 레이어는 앞의 수식에서의 f 와 관련된 내용.
#쿼리+ 어텐션+ 메모리 로 이루어지게 된다.
      processed_query = self.query_layer(query.unsqueeze(1))
      processed_attention_weights = self.location_layer(attention_weights_cat)
      energies = self.v(torch.tanh(
          processed_query + processed_attention_weights + processed_memory))

      energies = energies.squeeze(-1)
      return energies
#forward 부분에서 용어가 살짝다른데 에너지가 얼라이먼트라고 되어있고 어텐션웨이트가 얼라이먼트 라고 나와있는 것을 볼 수 있다.
  def forward(self, attention_hidden_state, memory, processed_memory,
              attention_weights_cat, mask):
      """
      PARAMS
      ------
      attention_hidden_state: attention rnn last output
      memory: encoder outputs
      processed_memory: processed encoder outputs
      attention_weights_cat: previous and cummulative attention weights
      mask: binary mask for padded data
      """
      alignment = self.get_alignment_energies(
          attention_hidden_state, processed_memory, attention_weights_cat)

      if mask is not None:
          alignment.data.masked_fill_(mask, self.score_mask_value)
#에너지 펑션에 소프트맥스를 취해서 어텐션웨이트를 구한다.
      attention_weights = F.softmax(alignment, dim=1)
      #인코더 임베딩 값들과 매트릭스 멀티플케이션을 하여 콘텍스트 벡터를 얻는다.
      attention_context = torch.bmm(attention_weights.unsqueeze(1), memory) # bmm: batch matrix multiplication
      attention_context = attention_context.squeeze(1)
#출력에서는 컨텍스트 벡터와 얼라이 어텐션 레이트 (수식에서 봤던y값) 두개를 출력으로 내보내게 된다.
      return attention_context, attention_weights

3-3) Decoder

In [None]:
#프리넷 같은 경우에는 각각의 레이어를 쌓는 형태로 되어 있다.
class Prenet(nn.Module):
  def __init__(self, in_dim, sizes):
      super(Prenet, self).__init__()
      in_sizes = [in_dim] + sizes[:-1]
      self.layers = nn.ModuleList(
          [LinearNorm(in_size, out_size, bias=False)
            for (in_size, out_size) in zip(in_sizes, sizes)])
#각각의 레이어는 드롭아웃을 거치게 된다는 것을 알 수 있다.
#엑티베이션으로는 렐루를 사용하여 프리넷을 하게 된다.
  def forward(self, x):
      for linear in self.layers:
          x = F.dropout(F.relu(linear(x)), p=0.5, training=True)
      return x

##포스트넷 같은경우에는 1d컨볼루션을 쌓는 형태로 되어 있다.
class Postnet(nn.Module):
  """Postnet
      - Five 1-d convolution with 512 channels and kernel size 5
  """

  def __init__(self, hparams):
      super(Postnet, self).__init__()
      self.convolutions = nn.ModuleList()

      self.convolutions.append(
          nn.Sequential(
              ConvNorm(hparams.n_mel_channels, hparams.postnet_embedding_dim,
                        kernel_size=hparams.postnet_kernel_size, stride=1,
                        padding=int((hparams.postnet_kernel_size - 1) / 2),
                        dilation=1, w_init_gain='tanh'),
              nn.BatchNorm1d(hparams.postnet_embedding_dim))
      )

      for i in range(1, hparams.postnet_n_convolutions - 1):
          self.convolutions.append(
              nn.Sequential(
                  ConvNorm(hparams.postnet_embedding_dim,
                            hparams.postnet_embedding_dim,
                            kernel_size=hparams.postnet_kernel_size, stride=1,
                            padding=int((hparams.postnet_kernel_size - 1) / 2),
                            dilation=1, w_init_gain='tanh'),
                  nn.BatchNorm1d(hparams.postnet_embedding_dim))
          )

      self.convolutions.append(
          nn.Sequential(
              ConvNorm(hparams.postnet_embedding_dim, hparams.n_mel_channels,
                        kernel_size=hparams.postnet_kernel_size, stride=1,
                        padding=int((hparams.postnet_kernel_size - 1) / 2),
                        dilation=1, w_init_gain='linear'),
              nn.BatchNorm1d(hparams.n_mel_channels))
          )
#마지막 컨볼루션 레이어 전까지는 하이퍼 탄젠트를 엑티베이션으로 하는 1D컨볼루션을 쌓고 그때마다 드롭아웃을 수행하게 되고
#마지막 레이어에서는 그냥 멜을 리그레션 할 수 있는 형태로 리니어가 나오게 된다.
#레이어마다 드롭아웃을 전부 했었는데 드롭아웃에 관한건 코드에 따라서 많이 차이가 있는 것 같다.어느 코드는 인코더 부분에만 드롭아웃이 달려있다(논문에도 디테일하게 나와있지 않음,성능에 따라 사용하는 듯)
#노멀라이 제이션이나 드롭아웃을 군데군데 사용하면 성능이 좋아지는 것을 확인했다.옵티마이저하는 과정을 거쳐서 이것 저것 바꿔보며 실험을 해봐야 할 듯.
  def forward(self, x):
      for i in range(len(self.convolutions) - 1):
          x = F.dropout(torch.tanh(self.convolutions[i](x)), 0.5, self.training)
      x = F.dropout(self.convolutions[-1](x), 0.5, self.training)

      return x


class Decoder(nn.Module):
  def __init__(self, hparams):
      super(Decoder, self).__init__()
      #메스펙토그램을 몇차로 할 것인지
      self.n_mel_channels = hparams.n_mel_channels
      #몇개의 멜을 출력으로 할 것인지(타코트론2에서는 1)
      self.n_frames_per_step = hparams.n_frames_per_step
      #인코더 임베디드를 입력으로 받기 때문에 그 디멘션
      self.encoder_embedding_dim = hparams.encoder_embedding_dim
      
      self.attention_rnn_dim = hparams.attention_rnn_dim
      self.decoder_rnn_dim = hparams.decoder_rnn_dim
      self.prenet_dim = hparams.prenet_dim
      self.max_decoder_steps = hparams.max_decoder_steps
      self.gate_threshold = hparams.gate_threshold
      self.p_attention_dropout = hparams.p_attention_dropout
      self.p_decoder_dropout = hparams.p_decoder_dropout

      self.prenet = Prenet(
          hparams.n_mel_channels * hparams.n_frames_per_step,
          [hparams.prenet_dim, hparams.prenet_dim])

      self.attention_rnn = nn.LSTMCell(
          hparams.prenet_dim + hparams.encoder_embedding_dim,
          hparams.attention_rnn_dim)

      self.attention_layer = Attention(
          hparams.attention_rnn_dim, hparams.encoder_embedding_dim,
          hparams.attention_dim, hparams.attention_location_n_filters,
          hparams.attention_location_kernel_size)

      self.decoder_rnn = nn.LSTMCell(
          hparams.attention_rnn_dim + hparams.encoder_embedding_dim,
          hparams.decoder_rnn_dim, 1)
#멜을 추정해야 되기 때문에 엑티베이션이 그냥 리니어이다
      self.linear_projection = LinearNorm(
          hparams.decoder_rnn_dim + hparams.encoder_embedding_dim,
          hparams.n_mel_channels * hparams.n_frames_per_step)
#게이트는 스탑토큰을 프레딕션 해야 하기 때문에 시그모이드로 출력을 받게 된다.
      self.gate_layer = LinearNorm(
          hparams.decoder_rnn_dim + hparams.encoder_embedding_dim, 1,
          bias=True, w_init_gain='sigmoid')
#첫 번째 디코더로 들어갈 0들을 만드는 작업.
  def get_go_frame(self, memory):
      """ Gets all zeros frames to use as first decoder input
      PARAMS
      ------
      memory: decoder outputs

      RETURNS
      -------
      decoder_input: all zeros frames
      """

      B = memory.size(0)
      decoder_input = Variable(memory.data.new(
          B, self.n_mel_channels * self.n_frames_per_step).zero_())
      return decoder_input
#어텐션 스테이트 디코더 스테이트들을 이 코드에서 정의해준다.
  def initialize_decoder_states(self, memory, mask):
      """ Initializes attention rnn states, decoder rnn states, attention
      weights, attention cumulative weights, attention context, stores memory
      and stores processed memory
      PARAMS
      ------
      memory: Encoder outputs
      mask: Mask for padded data if training, expects None for inference
      """
      B = memory.size(0)
      MAX_TIME = memory.size(1)

      self.attention_hidden = Variable(memory.data.new(
          B, self.attention_rnn_dim).zero_())
      self.attention_cell = Variable(memory.data.new(
          B, self.attention_rnn_dim).zero_())

      self.decoder_hidden = Variable(memory.data.new(
          B, self.decoder_rnn_dim).zero_())
      self.decoder_cell = Variable(memory.data.new(
          B, self.decoder_rnn_dim).zero_())

      self.attention_weights = Variable(memory.data.new(
          B, MAX_TIME).zero_())
      self.attention_weights_cum = Variable(memory.data.new(
          B, MAX_TIME).zero_())
      self.attention_context = Variable(memory.data.new(
          B, self.encoder_embedding_dim).zero_())

      self.memory = memory
      self.processed_memory = self.attention_layer.memory_layer(memory)
      self.mask = mask
#피쳐 푸싱에 관련된 내용이다. 타코트론 디코더를 학습할 때에 추정된 매를 다음번에 입력에 넣어주는게 아니라 정답 멜을 사용하는데
#그 부분을 담당하는게 파스 디코더 인풋이다.
  def parse_decoder_inputs(self, decoder_inputs):
      """ Prepares decoder inputs, i.e. mel outputs
      PARAMS
      ------
      decoder_inputs: inputs used for teacher-forced training, i.e. mel-specs

      RETURNS
      -------
      inputs: processed decoder inputs

      """
      # (B, n_mel_channels, T_out) -> (B, T_out, n_mel_channels)
      #추정된 것 대신 대신 넣는 것에 대해 구현되어 있다.
      #트랜스포즈는 디멘션을 맞춰주기 위해 진행되는 것이라 보면 된다.
      decoder_inputs = decoder_inputs.transpose(1, 2)
      decoder_inputs = decoder_inputs.view(
          decoder_inputs.size(0),
          int(decoder_inputs.size(1)/self.n_frames_per_step), -1)
      # (B, T_out, n_mel_channels) -> (T_out, B, n_mel_channels)
      decoder_inputs = decoder_inputs.transpose(0, 1)
      return decoder_inputs
# 이 부분은 디코더 아웃풋을 어떻게 파싱할지 하는 부분.
  def parse_decoder_outputs(self, mel_outputs, gate_outputs, alignments):
      """ Prepares decoder outputs for output
      PARAMS
      ------
      mel_outputs:
      gate_outputs: gate output energies
      alignments:

      RETURNS
      -------
      mel_outputs:
      gate_outpust: gate output energies
      alignments:
      """
      # (T_out, B) -> (B, T_out)
      alignments = torch.stack(alignments).transpose(0, 1)
      # (T_out, B) -> (B, T_out)
      gate_outputs = torch.stack(gate_outputs).transpose(0, 1)
      gate_outputs = gate_outputs.contiguous()
      # (T_out, B, n_mel_channels) -> (B, T_out, n_mel_channels)
      mel_outputs = torch.stack(mel_outputs).transpose(0, 1).contiguous()
      # decouple frames per step
      mel_outputs = mel_outputs.view(
          mel_outputs.size(0), -1, self.n_mel_channels)
      # (B, T_out, n_mel_channels) -> (B, n_mel_channels, T_out)
      mel_outputs = mel_outputs.transpose(1, 2)

      return mel_outputs, gate_outputs, alignments

  def decode(self, decoder_input):
      """ Decoder step using stored states, attention and memory
      PARAMS
      ------
      decoder_input: previous mel output

      RETURNS
      -------
      mel_output:
      gate_output: gate output energies
      attention_weights:
      """
      #기본적으로 디코더에 인풋으로 들어가는 것은 어텐션에 컨텍스트 벡터를 콘텍트에서 쓰게 된다. 컨텍트 하는 부분.
      #cell_input은 어텐션rnn 디코션 rnn 에 들어가는 인풋을 결정해 주는 것이다.
      #이 인풋은 전 시간의 디코더 아웃풋이 디코더 인풋으로 들어가고 어텐션 컨텍스트 벡터의 결과를 같이 컨켁트네이션을하여 같이 사용한다.
      cell_input = torch.cat((decoder_input, self.attention_context), -1)
      #어텐션 rnn을 통해서 한 번 LSTM을 통과하게된다, 어텐션 rnn이 들어가는 것을 볼 수 있다.
      self.attention_hidden, self.attention_cell = self.attention_rnn(
          cell_input, (self.attention_hidden, self.attention_cell))
      #dropout이 들어간다
      self.attention_hidden = F.dropout(
          self.attention_hidden, self.p_attention_dropout, self.training)

      attention_weights_cat = torch.cat(
          (self.attention_weights.unsqueeze(1),
            self.attention_weights_cum.unsqueeze(1)), dim=1)
      #어텐션레이어가 나오게 되는데 어텐션 레이어는 아까 말한대로 어텐션히든, ht가 입력으로 들어가고 메모리는 인코더 인베딩 시퀀스라 보면 된다.
      self.attention_context, self.attention_weights = self.attention_layer(
          self.attention_hidden, self.memory, self.processed_memory,
          attention_weights_cat, self.mask) #전시간의 어텐션을 넣어준다고 보면된다
#어텐션웨이트들이 나올때마다 더해줘서 모노토닉한 부분을 억지로 조금이라도 넣어줄 수 있도록 하는 방식이 타코트론2에서 사용된 방식이다.
      self.attention_weights_cum += self.attention_weights
      decoder_input = torch.cat(
      #어텐션히든과 어텐션컨텍스트 두 가지 어텐션LSTM과 어텐션 자체에서 나온 컨텍스트 벡터 이렇게 2가지를 concat에서 사용하게된다.
          (self.attention_hidden, self.attention_context), -1)
      self.decoder_hidden, self.decoder_cell = self.decoder_rnn(
          decoder_input, (self.decoder_hidden, self.decoder_cell))
      self.decoder_hidden = F.dropout(
          self.decoder_hidden, self.p_decoder_dropout, self.training)
#리니어 프로젝션 멜을 뽑는 리니어 프로젝션과 스탑토큰 프레딕션을 할 때 디코더 LSTM값에 디코더히든과 어텐션콘텍스트벡터를 콘캣하게된다
      decoder_hidden_attention_context = torch.cat(
          (self.decoder_hidden, self.attention_context), dim=1)
      decoder_output = self.linear_projection(
          decoder_hidden_attention_context)
      gate_prediction = self.gate_layer(decoder_hidden_attention_context)
      ##최종적으로 디코더 아웃풋 멜이 나오게되고, gate는 스탑토큰,attention은얼라인과 관련된 이렇게 3 가지가 출력으로 나오게된다.
      return decoder_output, gate_prediction, self.attention_weights

  def forward(self, memory, decoder_inputs, memory_lengths):
      """ Decoder forward pass for training
      PARAMS
      ------
      memory: Encoder outputs
      decoder_inputs: Decoder inputs for teacher forcing. i.e. mel-specs
      memory_lengths: Encoder output lengths for attention masking.

      RETURNS
      -------
      mel_outputs: mel outputs from the decoder
      gate_outputs: gate outputs from the decoder
      alignments: sequence of attention weights from the decoder
      """
#학습시 0을 처음값으로 주고 그 인풋을 파싱한 값을 준비한다음 프리넷에 통과시켜 학습하게된다.
#실제 그라운드 트루 스멜 스펙트로그램을 인풋으로 넣어주는 과정이라 보면 된다.
      decoder_input = self.get_go_frame(memory).unsqueeze(0)
      decoder_inputs = self.parse_decoder_inputs(decoder_inputs)
      decoder_inputs = torch.cat((decoder_input, decoder_inputs), dim=0)
      decoder_inputs = self.prenet(decoder_inputs)

      self.initialize_decoder_states(
          memory, mask=~get_mask_from_lengths(memory_lengths))

      mel_outputs, gate_outputs, alignments = [], [], []
      while len(mel_outputs) < decoder_inputs.size(0) - 1:
          decoder_input = decoder_inputs[len(mel_outputs)]
          #디코드를통해 멜 아웃풋 게이트 아웃풋 어탠션 웨이트를 구하는 과정이라 보면된다.
          mel_output, gate_output, attention_weights = self.decode(
              decoder_input)
          mel_outputs += [mel_output.squeeze(1)]
          gate_outputs += [gate_output.squeeze(1)]
          alignments += [attention_weights]
      #구해진 값을 파싱을 통해 다시 정리를 하는 하는 과정
      mel_outputs, gate_outputs, alignments = self.parse_decoder_outputs(
          mel_outputs, gate_outputs, alignments)

      return mel_outputs, gate_outputs, alignments

  def inference(self, memory):
      """ Decoder inference
      PARAMS
      ------
      memory: Encoder outputs

      RETURNS
      -------
      mel_outputs: mel outputs from the decoder
      gate_outputs: gate outputs from the decoder
      alignments: sequence of attention weights from the decoder
      """
      #티쳐포싱을 더이상 쓸 수 없기 때문에 그냥 프리모드. 즉 전 시간의 디코더 인풋을 말문을 통해 받는 과정
      decoder_input = self.get_go_frame(memory)

      self.initialize_decoder_states(memory, mask=None)

      mel_outputs, gate_outputs, alignments = [], [], []
      
      while True: #free learning 모드
          decoder_input = self.prenet(decoder_input)
          mel_output, gate_output, alignment = self.decode(decoder_input)

          mel_outputs += [mel_output.squeeze(1)]
          gate_outputs += [gate_output]
          alignments += [alignment]
      #말문이 끝나는 과정은 스탑토큰의 결과가 쓰레쉬홀드를 넘을 경우 브레이크 해버리게 되는 형태로 합성이 이루어진다.
          if torch.sigmoid(gate_output.data) > self.gate_threshold:
              break
          elif len(mel_outputs) == self.max_decoder_steps:
              print("Warning! Reached max decoder steps")
              break
#학습할 때에는 브레이크 없이 쭉 값들은 받고 그 값을 로스펑션할대만 사용한다.
          decoder_input = mel_output

      mel_outputs, gate_outputs, alignments = self.parse_decoder_outputs(
          mel_outputs, gate_outputs, alignments)

      return mel_outputs, gate_outputs, alignments
#스탑토큰이 생각보다 빨리 끊기는 경우가 많은데,즉 '뭐뭐했습니.(다x)'에서 끝난다
#보통 뒤가 짤리는 경우는 앞뒤 사일런스를 타이트 하게 잘라서 타코트론이 학습할 때 마지막 발음을 사일런스로 취급하는 경우가 있다.
#데이터 프로세싱 할때 트레이닝을 너무 타이트 하게 하면 스탑토큰이 빨리 끊길 수 있다.

3-4) Tacotron2

In [None]:
#전체적인 타코트론2에 대한 구조
#각각의 인코더 디코더 포스트넷을 불러온다.
class Tacotron2(nn.Module):
  def __init__(self, hparams):
      super(Tacotron2, self).__init__()
      self.mask_padding = hparams.mask_padding
      self.n_mel_channels = hparams.n_mel_channels
      self.n_frames_per_step = hparams.n_frames_per_step
      self.embedding = nn.Embedding(
          len(vocab), hparams.symbols_embedding_dim)
      std = sqrt(2.0 / (len(vocab) + hparams.symbols_embedding_dim))
      val = sqrt(3.0) * std  # uniform bounds for std
      self.embedding.weight.data.uniform_(-val, val)
      self.encoder = Encoder(hparams)
      self.decoder = Decoder(hparams)
      self.postnet = Postnet(hparams)
#배치를 파싱 어느코드랑 다 비슷비슷하다
  def parse_batch(self, batch):
      text_padded, input_lengths, mel_padded, gate_padded, \
          output_lengths = batch
      text_padded = to_gpu(text_padded).long()
      input_lengths = to_gpu(input_lengths).long()
      max_len = torch.max(input_lengths.data).item()
      mel_padded = to_gpu(mel_padded).float()
      gate_padded = to_gpu(gate_padded).float()
      output_lengths = to_gpu(output_lengths).long()

      return (
          (text_padded, input_lengths, mel_padded, max_len, output_lengths),
          (mel_padded, gate_padded))
#매스크를 사용하여 패딩된걸 날린다.
  def parse_output(self, outputs, output_lengths=None):
      if self.mask_padding and output_lengths is not None:
          mask = ~get_mask_from_lengths(output_lengths)
          mask = mask.expand(self.n_mel_channels, mask.size(0), mask.size(1))
          mask = mask.permute(1, 0, 2)

          outputs[0].data.masked_fill_(mask, 0.0)
          outputs[1].data.masked_fill_(mask, 0.0)
          outputs[2].data.masked_fill_(mask[:, 0, :], 1e3)  # gate energies

      return outputs
#텍스트가 로딩을하게된다 mels, max_len등
  def forward(self, inputs):
      text_inputs, text_lengths, mels, max_len, output_lengths = inputs
      text_lengths, output_lengths = text_lengths.data, output_lengths.data
#이 임베딩은 학습가능한 루거테이블이라 보면된다.
      embedded_inputs = self.embedding(text_inputs).transpose(1, 2) #LUT
#인풋아웃풋통과
      encoder_outputs = self.encoder(embedded_inputs, text_lengths)

      mel_outputs, gate_outputs, alignments = self.decoder(
          encoder_outputs, mels, memory_lengths=text_lengths)
#여기있는 포스트넷은 레지듀얼커넥션을 가지게 되서 이렇게 표현한다.
      mel_outputs_postnet = self.postnet(mel_outputs)
      mel_outputs_postnet = mel_outputs + mel_outputs_postnet
#파스아웃풋을통해 매스함수를 씌워주게된다
      return self.parse_output(
          [mel_outputs, mel_outputs_postnet, gate_outputs, alignments],
          output_lengths)

  def inference(self, inputs):
      embedded_inputs = self.embedding(inputs).transpose(1, 2)
      encoder_outputs = self.encoder.inference(embedded_inputs)
      mel_outputs, gate_outputs, alignments = self.decoder.inference(
          encoder_outputs)

      mel_outputs_postnet = self.postnet(mel_outputs)
      mel_outputs_postnet = mel_outputs + mel_ouftputs_postnet

      outputs = self.parse_output(
          [mel_outputs, mel_outputs_postnet, gate_outputs, alignments])

      return outputs

4. 모델 선언 및 옵티마이저 설정

In [None]:
model = Tacotron2(hparams).to(device)
#위에서 설정했던 파라미터 값들.
learning_rate = hparams.learning_rate
#타코트론 같은 경우는 아담 옵티마이저를 사용한다라고 볼 수 있다.
#성능 자체는 아담이 제일 안정적으로 나온다, 구현상이나 코드에 따라서 어느정도 차이가 있을 수 있다.
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate,
                              weight_decay=hparams.weight_decay)

5.목적함수정의

In [None]:
class Tacotron2Loss(nn.Module):
  def __init__(self):
      super(Tacotron2Loss, self).__init__()
#타겟자체는 두개가 있다, 멜타겟= 최종적인 출력, 게이트타겟=스탑토큰타겟, 
#둘 다 그래디언트가 필요없기 때문에 False로 정의해준다.
  def forward(self, model_output, targets):
      mel_target, gate_target = targets[0], targets[1]
      mel_target.requires_grad = False
      gate_target.requires_grad = False
      gate_target = gate_target.view(-1, 1)
#모델 아웃풋은 포스넷을 걸치지 않은 멜 아웃풋, 포스넷을 걸친 최종 아웃풋, 게이트 아웃풋 3가지를 사용하게된다.
      mel_out, mel_out_postnet, gate_out, _ = model_output
      gate_out = gate_out.view(-1, 1)
      #멜 로스 같은 경우에는 비포어 로스, 에프터 로스 이것을 MSE를 통해 더해준다
      mel_loss = nn.MSELoss()(mel_out, mel_target) + nn.MSELoss()(mel_out_postnet, mel_target)
      #게이트 로스도 바이너리 크로센트리를 사용하게 된다, 바이너리 크로센트리 엔트리는 BCE보다 안정적이여서 많이 쓴다
      gate_loss = nn.BCEWithLogitsLoss()(gate_out, gate_target) #binary cross entropy (BCE보다 안정적)
      #총 3가지 로스가 합쳐진 형태.
      return mel_loss + gate_loss

criterion = Tacotron2Loss()

6. 훈련

In [None]:
iteration = 0
#보통 데이터 사이즈에 따라서 에폭이 다르다, 100k정도만 돌아도 소리가 잘 난다,블리자드 같은경우 150정도 ,모델 사이즈나 데이터 사이즈에 따라서 유동적으로 잡자.
#중간중간에 샘플을 들어보면 굉장히 잘 나온다 정답 멜들이 계속 들어오기 때문에 하지만 테스트했을 때 잘 나올거라 생각하면 안된다.
#보통의경우는 얼라이먼트커브를 그린다.여기서는 없다.
#어텐션웨이트들을 플랏을 하게 되면 어텐션이 제대로 잡혔는지 안 잡혔는지 알 수 있다.
#엔드투엔드는 모노토닉하게 증가해야 된다고 말했는데 그것과 같이 잘 잡힌 것을 알 수 있고 만약 잡히지 않으면 어텐션 웨이트들이 어느 한 인코더값에 집중하지 못하고 날라가는 모습을 보이기도한다.
#합성에서는 러닝레이트가 줄어든다고해서 잘 합성이 되고있다고 보면안되고 중간중간 얼라이먼트와 여력이 된다면 실제 합성음을 중간중간 테스트 해보자
#합성처음하는 사람들이 러닝레이트가 잘 줄고있어서 학습이 잘 되고 있다고 착각하는 경우가 많다.

for epoch in range(100):
  print("Epoch: {}".format(epoch))
  for i, batch in enumerate(train_loader):
    start = time.perf_counter()
    model.zero_grad()
    x, y = model.parse_batch(batch)
    y_pred = model(x)
    loss = criterion(y_pred, y)
    #로스와 로스를 통한 백프로퍼기게이션을 통해 학습이 되는 단계가 이 부분이다
    reduced_loss = loss.item()  #[1.0] -> 1.0
    loss.backward()
    grad_norm = torch.nn.utils.clip_grad_norm_(
        model.parameters(), hparams.grad_clip_thresh)

    optimizer.step()

    duration = time.perf_counter() - start
    print("Train loss {} {:.6f} Grad Norm {:.6f} {:.2f}s/it".format(
        iteration, reduced_loss, grad_norm, duration))
    
    iteration += 1
    #타코트론 로스와 같은경우 초반엔 굉장히 빠르게 줄다 빠르게 줄지않는 구간이 금방 찾아온다.
    #보코더가 앤드투앤드에서 굉장히 중요한 부분을 차지한다.
    #그리핀림과 웨이브넷의 차이가 심한 것 처럼 뉴럴 보코더를 사용하기만해도 성능향상이 엄청나게 이루어진다
    #웨이브넷 같은경우에는 연산량문제가 커서 10분짜리 음성을 만드는데 몇십분이 걸리는 경우가 있다 굉장히 느리다.
    #현재에 굉장히 성능좋은 보코더가 많이나와서 찾아서 돌려보는 것을 추천한다.
    #앤드투앤드라는 의미는 텍스트부터 멜까지가 앤드투 앤드라고 보면되고 일반적으로는 조인 트레이닝 하지않고 웨이브넷 따로 트레이닝 한다.
    #웨이브넷 트레이닝 할 때에는 DB 가 달라도 어느 정도 성능이 나오지만 같은 DB를 추천한다.
    #멀티 스피커로 넘어가게되면 뉴럴 보코더가 문제가 될 수 있다. 여러 목소리로 합성을 시도해야 하기 떄문에.
    #그래서 컨디션해서 넣어주는 방식을 사용하거나 뉴럴보코더를 사용하지않고 그리핀림을 사용하는 방식으로 사용하면 될 것같다
    #그리핀림은 음질이 떨어지지만 복원하는 데에는 큰 문제가 없기 때문에 사용하는 것을 추천하고
    #성능을 올리고 싶다 하는 사람들은 웨이브넷이나 다른 보코더를 찾아 쓰는게 연구에 도움이 될 듯 하다.
    #NVIDIA 깃허브에 있는 코드를 참고하자.